class Promenade::Kafka::ProducerSubscriber
Public Instance Methods
ack_message(event)
click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 93
#
# Records a producer acknowledgement event.
#
# Increments the :kafka_producer_ack_messages metric and observes the
# payload's :delay value on :kafka_producer_ack_latency, both under the
# labels returned by get_labels(event).
#
# @param event [#payload] event whose payload responds to fetch and carries
#   :delay in addition to whatever get_labels reads
def ack_message(event)
  labels = get_labels(event)
  # fetch (rather than []) so a payload without :delay fails loudly instead
  # of observing nil.
  delay = event.payload.fetch(:delay)

  Promenade.metric(:kafka_producer_ack_messages).increment(labels)
  Promenade.metric(:kafka_producer_ack_latency).observe(labels, delay)
end
buffer_overflow(event)
click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 78
#
# Records a producer buffer overflow event by incrementing the
# :kafka_producer_buffer_overflows metric under the labels returned by
# get_labels(event).
#
# @param event [#payload] event passed straight through to get_labels
def buffer_overflow(event)
  labels = get_labels(event)
  Promenade.metric(:kafka_producer_buffer_overflows).increment(labels)
end
deliver_messages(event)
click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 82
#
# Records the outcome of a producer delivery event.
#
# All metrics are labelled with the client only ({ client: <:client_id> });
# this method does not read a :topic key from the payload.
#
# Metrics touched, in order:
# * :kafka_producer_delivery_errors    - incremented only when the payload
#   has an :exception key
# * :kafka_producer_delivery_latency   - observes event.duration
# * :kafka_producer_delivered_messages - incremented by
#   :delivered_message_count
# * :kafka_producer_delivery_attempts  - observes :attempts
#
# @param event [#payload, #duration] event whose payload responds to fetch
#   and key? and carries :client_id, :delivered_message_count and :attempts
def deliver_messages(event) # rubocop:disable Metrics/AbcSize
  payload = event.payload
  labels = { client: payload.fetch(:client_id) }
  message_count = payload.fetch(:delivered_message_count)
  attempts = payload.fetch(:attempts)

  # Only the presence of the key matters here; its value is not inspected.
  Promenade.metric(:kafka_producer_delivery_errors).increment(labels) if payload.key?(:exception)
  Promenade.metric(:kafka_producer_delivery_latency).observe(labels, event.duration)
  Promenade.metric(:kafka_producer_delivered_messages).increment(labels, message_count)
  Promenade.metric(:kafka_producer_delivery_attempts).observe(labels, attempts)
end
produce_message(event)
click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 64
#
# Records a produced message and the producer buffer state it reports.
#
# Per-message metrics use the full label set from get_labels(event):
# * :kafka_producer_messages     - incremented
# * :kafka_producer_message_size - observes :message_size
#
# Buffer metrics use only the :client label (sliced from the same label set):
# * :kafka_producer_buffer_size       - set to :buffer_size
# * :kafka_producer_max_buffer_size   - set to :max_buffer_size
# * :kafka_producer_buffer_fill_ratio - set to buffer_size / max_buffer_size
#
# @param event [#payload] event whose payload responds to fetch and carries
#   :message_size, :buffer_size and :max_buffer_size in addition to whatever
#   get_labels reads
def produce_message(event) # rubocop:disable Metrics/AbcSize
  labels = get_labels(event)
  client_labels = labels.slice(:client)

  payload = event.payload
  message_size = payload.fetch(:message_size)
  buffer_size = payload.fetch(:buffer_size)
  max_buffer_size = payload.fetch(:max_buffer_size)
  # to_f forces float division; for numeric inputs a zero max_buffer_size
  # therefore yields Infinity/NaN rather than raising ZeroDivisionError.
  buffer_fill_ratio = buffer_size.to_f / max_buffer_size

  Promenade.metric(:kafka_producer_messages).increment(labels)
  Promenade.metric(:kafka_producer_message_size).observe(labels, message_size)
  Promenade.metric(:kafka_producer_buffer_size).set(client_labels, buffer_size)
  Promenade.metric(:kafka_producer_max_buffer_size).set(client_labels, max_buffer_size)
  Promenade.metric(:kafka_producer_buffer_fill_ratio).set(client_labels, buffer_fill_ratio)
end
topic_error(event)
click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 101
#
# Records a topic error event by incrementing the :kafka_producer_ack_errors
# metric, labelled with the payload's :client_id (as client) and :topic.
#
# @param event [#payload] event whose payload responds to fetch and carries
#   :client_id and :topic
def topic_error(event)
  payload = event.payload
  client = payload.fetch(:client_id)
  topic = payload.fetch(:topic)

  Promenade.metric(:kafka_producer_ack_errors).increment(client: client, topic: topic)
end
Private Instance Methods
get_labels(event)
click to toggle source
# File lib/promenade/kafka/producer_subscriber.rb, line 110
#
# Builds the metric label set for an event from its payload.
#
# Both keys are read with fetch, so a payload missing either one raises
# (KeyError for a Hash payload) instead of producing a nil label.
#
# @param event [#payload] event whose payload responds to fetch and carries
#   :client_id and :topic
# @return [Hash] { client: <payload :client_id>, topic: <payload :topic> }
def get_labels(event)
  payload = event.payload
  { client: payload.fetch(:client_id), topic: payload.fetch(:topic) }
end