class Kafka::Statsd::ProducerSubscriber
Public Instance Methods
ack_message(event)
click to toggle source
# File lib/kafka/statsd.rb, line 231
#
# Reports metrics for a message that has been ACK'd.
#
# Reads :client_id, :topic and :delay from the event payload; each is looked
# up with Hash#fetch, so a missing key raises KeyError.
#
# @param event [#payload] instrumentation event whose payload is read here
def ack_message(event)
  payload = event.payload
  prefix = "producer.#{payload.fetch(:client_id)}.#{payload.fetch(:topic)}.ack"

  # One more message ACK'd for this topic.
  increment("#{prefix}.messages")

  # Delay between the message being produced and it being ACK'd. The delay is
  # only fetched here, after the counter above has already been bumped.
  timing("#{prefix}.delay", payload.fetch(:delay))
end
buffer_overflow(event)
click to toggle source
# File lib/kafka/statsd.rb, line 206
#
# Counts a produce error for the client/topic pair named in the event payload.
#
# Both :client_id and :topic are required; Hash#fetch raises KeyError when
# either is absent.
#
# @param event [#payload] instrumentation event whose payload is read here
def buffer_overflow(event)
  payload = event.payload
  client_id, topic_name = %i[client_id topic].map { |key| payload.fetch(key) }

  increment("producer.#{client_id}.#{topic_name}.produce.errors")
end
deliver_messages(event)
click to toggle source
# File lib/kafka/statsd.rb, line 213
#
# Reports metrics for one delivery of buffered messages.
#
# Reads :client_id, :delivered_message_count and :attempts from the payload
# (KeyError if any is missing) plus the event's duration. An error counter is
# bumped only when the payload carries an :exception key.
#
# @param event [#payload, #duration] instrumentation event for the delivery
def deliver_messages(event)
  payload = event.payload
  client = payload.fetch(:client_id)
  delivered = payload.fetch(:delivered_message_count)
  attempt_count = payload.fetch(:attempts)
  prefix = "producer.#{client}.deliver"

  increment("#{prefix}.errors") if payload.key?(:exception)

  timing("#{prefix}.latency", event.duration)

  # Messages delivered to Kafka:
  count("#{prefix}.messages", delivered)

  # Number of attempts to deliver messages (reported through timing):
  timing("#{prefix}.attempts", attempt_count)
end
produce_message(event)
click to toggle source
# File lib/kafka/statsd.rb, line 184
#
# Reports metrics for a message handed to the producer.
#
# Reads :client_id, :topic, :message_size, :buffer_size and :max_buffer_size
# from the payload (KeyError if any is missing). The fill ratio is
# buffer_size / max_buffer_size computed in floating point; the percentage is
# that ratio multiplied by 100.
#
# @param event [#payload] instrumentation event whose payload is read here
def produce_message(event)
  payload = event.payload
  client, topic, message_size, buffer_size, max_buffer_size =
    %i[client_id topic message_size buffer_size max_buffer_size].map { |key| payload.fetch(key) }

  fill_ratio = buffer_size.to_f / max_buffer_size.to_f
  fill_percentage = fill_ratio * 100.0

  topic_prefix = "producer.#{client}.#{topic}.produce"
  buffer_prefix = "producer.#{client}.buffer"

  # This gets us the write rate.
  increment("#{topic_prefix}.messages")
  timing("#{topic_prefix}.message_size", message_size)

  # This gets us the avg/max buffer size per producer.
  timing("#{buffer_prefix}.size", buffer_size)

  # This gets us the avg/max buffer fill ratio per producer.
  timing("#{buffer_prefix}.fill_ratio", fill_ratio)
  timing("#{buffer_prefix}.fill_percentage", fill_percentage)
end
topic_error(event)
click to toggle source
# File lib/kafka/statsd.rb, line 242
#
# Counts an ACK error for the client/topic pair named in the event payload.
#
# Both :client_id and :topic are required; Hash#fetch raises KeyError when
# either is absent.
#
# @param event [#payload] instrumentation event whose payload is read here
def topic_error(event)
  payload = event.payload
  client_id, topic_name = %i[client_id topic].map { |key| payload.fetch(key) }

  increment("producer.#{client_id}.#{topic_name}.ack.errors")
end