class Kafka::AsyncProducer::Worker
Public Class Methods
new(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
click to toggle source
# File lib/kafka/async_producer.rb, line 201
#
# Builds a worker around an already-configured producer and its work queue.
#
# @param queue the queue the worker pops `[operation, payload]` pairs from
# @param producer the underlying producer that buffers and delivers messages
# @param delivery_threshold buffer size at which a delivery is triggered
#   after a produce; a value of 0 or less disables the automatic trigger
# @param max_retries how many times a produce is retried after a buffer
#   overflow; -1 retries without limit
# @param retry_backoff base of the exponential sleep between those retries
# @param instrumenter receives the worker's instrumentation events
# @param logger wrapped in a TaggedLogger before being stored
def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
  # Collaborators the worker talks to.
  @queue, @producer, @instrumenter = queue, producer, instrumenter

  # Delivery and retry tuning.
  @delivery_threshold = delivery_threshold
  @max_retries, @retry_backoff = max_retries, retry_backoff

  @logger = TaggedLogger.new(logger)
end
Public Instance Methods
run()
click to toggle source
# File lib/kafka/async_producer.rb, line 211
#
# Entry point of the worker. Tags the logger with the producer's string
# form, announces the start and then hands over to the processing loop.
#
# Any exception escaping the loop is logged and swallowed rather than
# re-raised. Whatever happens, the producer is shut down and the logger tag
# is removed again before this method returns.
def run
  begin
    @logger.push_tags(@producer.to_s)
    @logger.info "Starting async producer in the background..."
    do_loop
  rescue Exception => crash
    # Deliberately broad: this is the last place a failure can be recorded.
    headline = "Unexpected Kafka error #{crash.class}: #{crash.message}"
    @logger.error "#{headline}\n#{crash.backtrace.join("\n")}"
    @logger.error "Async producer crashed!"
  ensure
    @producer.shutdown
    @logger.pop_tags
  end
end
Private Instance Methods
deliver_messages()
click to toggle source
# File lib/kafka/async_producer.rb, line 283
#
# Asks the underlying producer to deliver its buffered messages.
#
# DeliveryFailed and ConnectionError are treated as recoverable: they are
# logged and reported as an "error.async_producer" event instead of being
# raised, so a later delivery attempt can try again. Any other exception
# propagates to the caller.
def deliver_messages
  begin
    @producer.deliver_messages
  rescue DeliveryFailed, ConnectionError => failure
    # Nothing more can be done right now; record the failure and move on.
    @logger.error("Failed to asynchronously deliver messages: #{failure.message}")

    event_payload = { error: failure }
    @instrumenter.instrument("error.async_producer", event_payload)
  end
end
do_loop()
click to toggle source
# File lib/kafka/async_producer.rb, line 226
#
# The worker's processing loop. Pops `[operation, payload]` pairs off the
# queue and dispatches on the operation:
#
# * :produce          - buffers the message (payload[0] is the value,
#                       payload[1] the keyword options) and delivers when
#                       the delivery threshold has been reached.
# * :deliver_messages - delivers whatever is buffered.
# * :shutdown         - makes one final delivery attempt, reports the number
#                       of messages left behind if that fails, then leaves
#                       the loop.
#
# Any other operation raises a RuntimeError. A Kafka::Error escaping the
# loop is logged, followed by a ten second pause, after which the loop is
# started again.
def do_loop
  begin
    loop do
      operation, payload = @queue.pop

      case operation
      when :produce
        produce(payload[0], **payload[1])
        if threshold_reached?
          deliver_messages
        end
      when :deliver_messages
        deliver_messages
      when :shutdown
        begin
          # Flush anything still pending before stopping.
          @producer.deliver_messages
        rescue Error => shutdown_error
          @logger.error("Failed to deliver messages during shutdown: #{shutdown_error.message}")

          # Everything still buffered or queued is lost at this point.
          dropped = @producer.buffer_size + @queue.size
          @instrumenter.instrument("drop_messages.async_producer", { message_count: dropped })
        end

        # Leave the processing loop.
        break
      else
        raise "Unknown operation #{operation.inspect}"
      end
    end
  rescue Kafka::Error => kafka_error
    @logger.error "Unexpected Kafka error #{kafka_error.class}: #{kafka_error.message}\n#{kafka_error.backtrace.join("\n")}"
    @logger.info "Restarting in 10 seconds..."
    sleep 10
    retry
  end
end
produce(value, **kwargs)
click to toggle source
# File lib/kafka/async_producer.rb, line 264
#
# Hands a message to the underlying producer, coping with a full buffer.
#
# On BufferOverflow the buffered messages are delivered to make room and
# the produce is attempted again:
#
# * with @max_retries == -1 the retry happens immediately and without limit;
# * otherwise up to @max_retries retries are made, sleeping
#   @retry_backoff ** attempt seconds before each one;
# * once the retries are used up the failure is logged and reported as an
#   "error.async_producer" event, and the message is given up on.
#
# @param value the message value, passed through to the producer
# @param kwargs keyword options, passed through to the producer
def produce(value, **kwargs)
  attempts = 0

  begin
    @producer.produce(value, **kwargs)
  rescue BufferOverflow => overflow
    # Try to free up buffer space before deciding what to do next.
    deliver_messages

    retry if @max_retries == -1

    if attempts < @max_retries
      attempts += 1
      sleep @retry_backoff**attempts
      retry
    end

    @logger.error("Failed to asynchronously produce messages due to BufferOverflow")
    @instrumenter.instrument("error.async_producer", { error: overflow })
  end
end
threshold_reached?()
click to toggle source
# File lib/kafka/async_producer.rb, line 291
#
# Tells whether enough messages are buffered to warrant a delivery.
#
# A delivery threshold of 0 or less switches the check off: the answer is
# then always false and the producer's buffer size is not consulted.
def threshold_reached?
  return false unless @delivery_threshold > 0

  @producer.buffer_size >= @delivery_threshold
end