class WaterDrop::Instrumentation::Callbacks::Delivery
Creates a callable that we want to run upon each message delivery or failure
@note We don’t have to provide client_name here as this callback is per client instance
@note We do not consider `message.purged` an error for transactional producers, because
this is standard behaviour for not yet dispatched messages on aborted transactions. We do, however, still want to instrument it for traceability.
Constants
Public Class Methods
Source
# File lib/waterdrop/instrumentation/callbacks/delivery.rb, line 28
def initialize(producer_id, transactional, monitor)
  @producer_id = producer_id
  @transactional = transactional
  @monitor = monitor
end
@param producer_id [String] id of the current producer
@param transactional [Boolean] is this handle for a transactional or regular producer
@param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using
Public Instance Methods
Source
# File lib/waterdrop/instrumentation/callbacks/delivery.rb, line 36
def call(delivery_report)
  error_code = delivery_report.error.to_i

  if error_code.zero?
    instrument_acknowledged(delivery_report)
  elsif @transactional && PURGE_ERRORS.include?(error_code)
    instrument_purged(delivery_report)
  else
    instrument_error(delivery_report)
  end
# This runs from the rdkafka thread, thus we want to safe-guard it and prevent absolute
# crashes even if the instrumentation code fails. If it would bubble-up, it could crash
# the rdkafka background thread
rescue StandardError => e
  @monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    producer_id: @producer_id,
    type: 'callbacks.delivery.error'
  )
end
Emits delivery details to the monitor
@param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
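The branching in `call` can be tried out without a Kafka cluster. The following is a minimal sketch, not the library's implementation: `FakeReport`, `RecordingMonitor` and `SketchDelivery` are hypothetical stand-ins, the payloads are simplified, and the `PURGE_ERRORS` values are an assumption based on librdkafka's purge-queue and purge-inflight codes, since the constant's definition is not shown on this page.

```ruby
# Illustrative stand-ins only: none of these classes are part of WaterDrop or rdkafka.
FakeReport = Struct.new(:error, :offset, :partition, :topic_name, :label)

# Records every instrumented event; can be told to fail on one event name.
class RecordingMonitor
  attr_reader :events

  def initialize(fail_on: nil)
    @events = []
    @fail_on = fail_on
  end

  def instrument(name, payload = {})
    raise 'listener blew up' if name == @fail_on

    @events << [name, payload]
  end
end

# Mirrors the branching of Delivery#call with simplified payloads.
class SketchDelivery
  # Assumed values: librdkafka's purge-queue (-152) and purge-inflight (-151) codes.
  PURGE_ERRORS = [-152, -151].freeze

  def initialize(producer_id, transactional, monitor)
    @producer_id = producer_id
    @transactional = transactional
    @monitor = monitor
  end

  def call(report)
    code = report.error.to_i

    if code.zero?
      emit('message.acknowledged', report)
    elsif @transactional && PURGE_ERRORS.include?(code)
      emit('message.purged', report)
    else
      emit('error.occurred', report, type: 'librdkafka.dispatch_error')
    end
  rescue StandardError => e
    # Same safeguard idea as the real callback: report the failure instead of raising.
    @monitor.instrument(
      'error.occurred',
      { error: e, producer_id: @producer_id, type: 'callbacks.delivery.error' }
    )
  end

  private

  # Builds a reduced payload and forwards it to the monitor.
  def emit(name, report, extra = {})
    payload = {
      producer_id: @producer_id,
      topic: report.topic_name,
      partition: report.partition,
      offset: report.offset
    }
    @monitor.instrument(name, payload.merge(extra))
  end
end

monitor = RecordingMonitor.new
SketchDelivery.new('regular', false, monitor).call(FakeReport.new(0, 10, 0, 'events', nil))
SketchDelivery.new('txn', true, monitor).call(FakeReport.new(-152, -1, 0, 'events', nil))
SketchDelivery.new('regular', false, monitor).call(FakeReport.new(-152, -1, 0, 'events', nil))
p monitor.events.map(&:first)
```

In this sketch the same purge code is routed to `message.purged` only when the callback was built with `transactional` set to true; a regular producer reports it as `error.occurred`. Constructing `RecordingMonitor.new(fail_on: 'message.acknowledged')` exercises the `rescue` branch, which records an `error.occurred` event of type `callbacks.delivery.error` rather than letting the exception escape.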
Private Instance Methods
Source
# File lib/waterdrop/instrumentation/callbacks/delivery.rb, line 111
def build_error(delivery_report)
  ::Rdkafka::RdkafkaError.new(delivery_report.error)
end
Builds appropriate rdkafka error
@param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
@return [::Rdkafka::RdkafkaError]
Source
# File lib/waterdrop/instrumentation/callbacks/delivery.rb, line 63
def instrument_acknowledged(delivery_report)
  @monitor.instrument(
    'message.acknowledged',
    caller: self,
    producer_id: @producer_id,
    offset: delivery_report.offset,
    partition: delivery_report.partition,
    topic: delivery_report.topic_name,
    delivery_report: delivery_report,
    label: delivery_report.label
  )
end
@param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
Source
# File lib/waterdrop/instrumentation/callbacks/delivery.rb, line 93
def instrument_error(delivery_report)
  @monitor.instrument(
    'error.occurred',
    caller: self,
    error: build_error(delivery_report),
    producer_id: @producer_id,
    offset: delivery_report.offset,
    partition: delivery_report.partition,
    topic: delivery_report.topic_name,
    delivery_report: delivery_report,
    label: delivery_report.label,
    type: 'librdkafka.dispatch_error'
  )
end
@param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
Source
# File lib/waterdrop/instrumentation/callbacks/delivery.rb, line 77
def instrument_purged(delivery_report)
  @monitor.instrument(
    'message.purged',
    caller: self,
    error: build_error(delivery_report),
    producer_id: @producer_id,
    offset: delivery_report.offset,
    partition: delivery_report.partition,
    topic: delivery_report.topic_name,
    delivery_report: delivery_report,
    label: delivery_report.label,
    type: 'librdkafka.dispatch_error'
  )
end
@param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
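Because `message.purged` and `error.occurred` both carry an `error` and the type `librdkafka.dispatch_error`, a listener that wants to keep purges out of its error counts would likely need to branch on the event name first and on `type` second. The helper below is a hypothetical sketch of that idea; `classify_delivery_event` and the symbols it returns are illustrative names, not part of WaterDrop, and it works on plain hashes rather than real event objects.

```ruby
# Hypothetical helper, not part of WaterDrop: maps an event name plus its payload
# to a coarse outcome that a listener might log or count.
def classify_delivery_event(name, payload)
  case name
  when 'message.acknowledged'
    :delivered
  when 'message.purged'
    # Purges share the dispatch error type, so the event name is what sets them apart.
    :purged
  when 'error.occurred'
    case payload[:type]
    when 'librdkafka.dispatch_error' then :dispatch_failed
    when 'callbacks.delivery.error' then :instrumentation_failed
    else :other_error
    end
  else
    :ignored
  end
end

p classify_delivery_event('message.purged', { type: 'librdkafka.dispatch_error' })
p classify_delivery_event('error.occurred', { type: 'callbacks.delivery.error' })
```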