class Promenade::Kafka::ProducerSubscriber

Public Instance Methods

ack_message(event) click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 93
# Handles an ack-message event: increments the ack counter and observes the
# payload's +:delay+ value, both labelled with the event's client and topic.
#
# Raises KeyError (via +fetch+) before any metric is touched if the payload
# lacks +:delay+ or a key required by +get_labels+.
def ack_message(event)
  topic_labels = get_labels(event)
  ack_delay = event.payload.fetch(:delay)

  Promenade.metric(:kafka_producer_ack_messages).increment(topic_labels)
  Promenade.metric(:kafka_producer_ack_latency).observe(topic_labels, ack_delay)
end
buffer_overflow(event) click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 78
# Handles a buffer-overflow event by incrementing the overflow counter,
# labelled with the event's client and topic.
def buffer_overflow(event)
  overflow_counter = Promenade.metric(:kafka_producer_buffer_overflows)
  overflow_counter.increment(get_labels(event))
end
deliver_messages(event) click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 82
# Handles a deliver-messages event. Metrics are labelled by client only.
#
# Always records the event duration, the delivered message count and the
# number of attempts; additionally counts a delivery error when the payload
# carries an +:exception+ key.
#
# Raises KeyError (via +fetch+) before any metric is touched if
# +:client_id+, +:delivered_message_count+ or +:attempts+ is missing.
def deliver_messages(event) # rubocop:disable Metrics/AbcSize
  payload = event.payload
  client_labels = { client: payload.fetch(:client_id) }
  delivered = payload.fetch(:delivered_message_count)
  attempt_count = payload.fetch(:attempts)

  if payload.key?(:exception)
    Promenade.metric(:kafka_producer_delivery_errors).increment(client_labels)
  end

  Promenade.metric(:kafka_producer_delivery_latency).observe(client_labels, event.duration)
  Promenade.metric(:kafka_producer_delivered_messages).increment(client_labels, delivered)
  Promenade.metric(:kafka_producer_delivery_attempts).observe(client_labels, attempt_count)
end
produce_message(event) click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 64
# Handles a produce-message event.
#
# Per client and topic: counts the message and observes its size.
# Per client only: sets gauges for the current buffer size, the maximum
# buffer size, and their ratio (buffer_size / max_buffer_size, as a Float).
#
# Raises KeyError (via +fetch+) before any metric is touched if a required
# payload key is missing.
def produce_message(event) # rubocop:disable Metrics/AbcSize
  labels = get_labels(event)
  payload = event.payload
  size_of_message = payload.fetch(:message_size)
  buffered = payload.fetch(:buffer_size)
  capacity = payload.fetch(:max_buffer_size)
  fill_ratio = buffered.to_f / capacity

  Promenade.metric(:kafka_producer_messages).increment(labels)
  Promenade.metric(:kafka_producer_message_size).observe(labels, size_of_message)

  # Buffer gauges are client-wide, so the topic label is dropped. The hash
  # literal keeps insertion order, so gauges are written in the order listed.
  {
    kafka_producer_buffer_size: buffered,
    kafka_producer_max_buffer_size: capacity,
    kafka_producer_buffer_fill_ratio: fill_ratio
  }.each do |gauge_name, gauge_value|
    Promenade.metric(gauge_name).set(labels.slice(:client), gauge_value)
  end
end
topic_error(event) click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 101
# Handles a topic-error event by incrementing the ack-error counter,
# labelled with the event's client and topic.
#
# Labels come from +get_labels+, the same helper the other topic-scoped
# handlers use, so the label set cannot drift between metrics.
#
# Raises KeyError (via +fetch+ inside +get_labels+) if the payload lacks
# +:client_id+ or +:topic+.
def topic_error(event)
  # Resolve labels first so a malformed payload fails before the metric lookup.
  labels = get_labels(event)

  Promenade.metric(:kafka_producer_ack_errors).increment(labels)
end

Private Instance Methods

get_labels(event) click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 110
# Builds the label hash shared by topic-scoped metrics from an event payload.
#
# Returns a Hash of the form { client: <:client_id value>, topic: <:topic value> }.
# Raises KeyError if the payload lacks +:client_id+ or +:topic+.
def get_labels(event)
  client_id, topic_name = event.payload.fetch_values(:client_id, :topic)
  { client: client_id, topic: topic_name }
end