class Kafka::AsyncProducer::Worker

Public Class Methods

new(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
# File lib/kafka/async_producer.rb, line 201
# Stores the worker's collaborators and tuning settings.
#
# @param queue object the run loop pops `[operation, payload]` pairs from
# @param producer the producer that buffers and delivers messages
# @param delivery_threshold buffer size at which `threshold_reached?`
#   reports true; values <= 0 never trigger it
# @param max_retries how many times `produce` retries after a
#   BufferOverflow; -1 makes it retry without limit
# @param retry_backoff base of the sleep between those retries
#   (`retry_backoff**attempt`)
# @param instrumenter receives `instrument(event_name, payload)` calls
# @param logger underlying logger, wrapped in a TaggedLogger
def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
  # Tuning settings.
  @retry_backoff = retry_backoff
  @max_retries = max_retries
  @delivery_threshold = delivery_threshold

  # Collaborators.
  @instrumenter = instrumenter
  @producer = producer
  @queue = queue

  # All logging goes through a TaggedLogger built around the given logger.
  @logger = TaggedLogger.new(logger)
end

Public Instance Methods

run()
# File lib/kafka/async_producer.rb, line 211
# Entry point of the worker: tags the logger with the producer's string
# form, announces startup, then hands control to `do_loop`.
#
# Whatever escapes `do_loop` is rescued as `Exception` (so not only
# StandardError), logged with its backtrace, and swallowed. The producer is
# shut down and the logger tags are popped on every exit path.
def run
  producer_tag = @producer.to_s
  @logger.push_tags(producer_tag)
  @logger.info("Starting async producer in the background...")

  do_loop
rescue Exception => crash
  trace = crash.backtrace.join("\n")
  @logger.error("Unexpected Kafka error #{crash.class}: #{crash.message}\n#{trace}")
  @logger.error("Async producer crashed!")
ensure
  # Cleanup order matters to callers observing the log: shut down first,
  # then drop the tag pushed at the top.
  @producer.shutdown
  @logger.pop_tags
end

Private Instance Methods

deliver_messages()
# File lib/kafka/async_producer.rb, line 283
# Asks the producer to deliver its buffered messages.
#
# DeliveryFailed and ConnectionError are not propagated: the failure is
# logged and reported through the instrumenter as "error.async_producer",
# leaving delivery to a later attempt. Any other exception is raised to the
# caller.
def deliver_messages
  @producer.deliver_messages
rescue DeliveryFailed, ConnectionError => failure
  reason = failure.message
  @logger.error("Failed to asynchronously deliver messages: #{reason}")

  payload = { error: failure }
  @instrumenter.instrument("error.async_producer", payload)
end
do_loop()
# File lib/kafka/async_producer.rb, line 226
# Main dispatch loop: pops `[operation, payload]` pairs off the queue until
# a :shutdown operation arrives.
#
# * :produce          - forwards payload[0] and the keyword hash payload[1]
#                       to `produce`, then delivers if the threshold is hit.
# * :deliver_messages - delivers whatever is buffered.
# * :shutdown         - makes one last delivery attempt directly on the
#                       producer; if that raises Error, the number of
#                       messages left in the buffer and the queue is reported
#                       as "drop_messages.async_producer". Then the loop ends.
# * anything else     - raises a RuntimeError.
#
# A Kafka::Error escaping the loop is logged, followed by a 10 second sleep
# and an unconditional restart of the loop (there is no retry limit).
def do_loop
  loop do
    operation, payload = @queue.pop

    case operation
    when :produce
      value = payload[0]
      options = payload[1]
      produce(value, **options)
      deliver_messages if threshold_reached?
    when :deliver_messages
      deliver_messages
    when :shutdown
      begin
        # Flush anything still buffered before stopping.
        @producer.deliver_messages
      rescue Error => shutdown_error
        @logger.error("Failed to deliver messages during shutdown: #{shutdown_error.message}")

        dropped = @producer.buffer_size + @queue.size
        @instrumenter.instrument("drop_messages.async_producer", { message_count: dropped })
      end

      # Leave the run loop.
      break
    else
      raise "Unknown operation #{operation.inspect}"
    end
  end
rescue Kafka::Error => kafka_error
  trace = kafka_error.backtrace.join("\n")
  @logger.error("Unexpected Kafka error #{kafka_error.class}: #{kafka_error.message}\n#{trace}")
  @logger.info("Restarting in 10 seconds...")

  sleep 10
  retry
end
produce(value, **kwargs)
# File lib/kafka/async_producer.rb, line 264
# Hands a message to the producer, coping with a full buffer.
#
# On BufferOverflow a delivery is attempted first, then:
# * with @max_retries == -1 the produce call is retried without limit and
#   without sleeping;
# * otherwise it is retried up to @max_retries times, sleeping
#   @retry_backoff**attempt seconds before each retry;
# * once retries are used up the failure is logged and reported as
#   "error.async_producer", and the message is not produced.
#
# @param value the message value forwarded to the producer
# @param kwargs keyword arguments forwarded unchanged to the producer
def produce(value, **kwargs)
  attempts = 0

  begin
    @producer.produce(value, **kwargs)
  rescue BufferOverflow => overflow
    # Try to make room before deciding whether to retry.
    deliver_messages

    retry if @max_retries == -1

    if attempts < @max_retries
      attempts += 1
      sleep @retry_backoff**attempts
      retry
    end

    @logger.error("Failed to asynchronously produce messages due to BufferOverflow")
    @instrumenter.instrument("error.async_producer", { error: overflow })
  end
end
threshold_reached?()
# File lib/kafka/async_producer.rb, line 291
# Tells whether the producer's buffer has grown to the delivery threshold.
#
# A threshold of zero or less disables the check: the buffer size is not
# even consulted and the answer is always false.
#
# @return [Boolean]
def threshold_reached?
  return false unless @delivery_threshold > 0

  @producer.buffer_size >= @delivery_threshold
end