module Deimos::KafkaListener
This module listens to events published by RubyKafka.
Public Class Methods
send_produce_error(event)
click to toggle source
Listens for any exceptions that happen during publishing and re-publishes them as a
Deimos event. @param event [ActiveSupport::Notification]
# File lib/deimos/instrumentation.rb, line 43
#
# Listens for exceptions raised during publishing and re-publishes them as a
# Deimos 'produce_error' event, one event per topic that has a matching
# producer class.
#
# Nothing happens when the event payload has no :exception_object, or when
# that object does not respond to #failed_messages.
#
# @param event [ActiveSupport::Notification] must respond to #payload, which
#   is read with the :exception_object key.
# @return [Hash, nil] nil on the early-exit path; otherwise the failed
#   messages grouped by topic (the value returned by Hash#each).
def self.send_produce_error(event)
  error = event.payload[:exception_object]
  return unless error && error.respond_to?(:failed_messages)

  error.failed_messages.group_by(&:topic).each do |topic, failed|
    producer_class = Deimos::Producer.descendants.find { |klass| klass.topic == topic }
    next if failed.empty?
    # Topics without a matching producer class are skipped.
    next unless producer_class

    schema = producer_class.config[:schema]
    namespace = producer_class.config[:namespace]
    backend = Deimos.schema_backend(schema: schema, namespace: namespace)
    decoded = failed.map { |message| backend.decode(message.value) }

    # Metrics are optional; the safe-navigation call is a no-op when unset.
    Deimos.config.metrics&.increment(
      'publish_error',
      tags: ["topic:#{topic}"],
      by: decoded.size
    )

    Deimos.instrument(
      'produce_error',
      producer: producer_class,
      topic: topic,
      exception_object: error,
      payloads: decoded
    )
  end
end