class Promenade::Kafka::ConsumerSubscriber
Public Instance Methods
fetch_batch(event)
click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 108
# Records metrics for a fetched batch: adds the payload's :message_count to
# the fetched-messages metric and sets the offset-lag metric to the payload's
# :offset_lag.
#
# @param event [#payload] event whose payload provides :offset_lag and
#   :message_count in addition to the keys read by get_labels
# @raise [KeyError] when a fetched key is absent (assuming payload is a Hash)
def fetch_batch(event)
  metric_labels = get_labels(event)
  payload = event.payload
  lag = payload.fetch(:offset_lag)
  fetched_count = payload.fetch(:message_count)

  Promenade.metric(:kafka_consumer_messages_fetched).increment(metric_labels, fetched_count)
  # NOTE(review): the "ofset" spelling is part of the metric name; it presumably
  # matches the name the metric is registered under elsewhere -- confirm before renaming.
  Promenade.metric(:kafka_consumer_ofset_lag).set(metric_labels, lag)
end
join_group(event)
click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 117
# Observes event.duration on the join-group metric and, when the payload
# carries an :exception key, also increments the join-group error metric.
#
# @param event [#payload, #duration] event whose payload has the keys read by
#   group_labels
def join_group(event)
  metric_labels = group_labels(event)
  Promenade.metric(:kafka_consumer_join_group).observe(metric_labels, event.duration)
  return unless event.payload.key?(:exception)

  Promenade.metric(:kafka_consumer_join_group_errors).increment(metric_labels)
end
leave_group(event)
click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 129
# Observes event.duration on the leave-group metric and, when the payload
# carries an :exception key, also increments the leave-group error metric.
#
# @param event [#payload, #duration] event whose payload has the keys read by
#   group_labels
def leave_group(event)
  metric_labels = group_labels(event)
  Promenade.metric(:kafka_consumer_leave_group).observe(metric_labels, event.duration)
  return unless event.payload.key?(:exception)

  Promenade.metric(:kafka_consumer_leave_group_errors).increment(metric_labels)
end
pause_status(event)
click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 135
# Observes the payload's :duration value on the pause-duration metric.
#
# @param event [#payload] event whose payload provides :duration in addition
#   to the keys read by get_labels
# @raise [KeyError] when a fetched key is absent (assuming payload is a Hash)
def pause_status(event)
  # The metric is looked up before the payload is read, mirroring the
  # evaluation order of a single chained call.
  pause_metric = Promenade.metric(:kafka_consumer_pause_duration)
  metric_labels = get_labels(event)
  paused_for = event.payload.fetch(:duration)
  pause_metric.observe(metric_labels, paused_for)
end
process_batch(event)
click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 93
# Records metrics for a processed batch. When the payload has no :exception
# key, the processed-messages metric grows by :message_count and
# event.duration is observed as the batch latency; otherwise the batch error
# metric is incremented. The offset-lag metric is set in both cases.
#
# @param event [#payload, #duration] event whose payload provides :offset_lag
#   and :message_count in addition to the keys read by get_labels
# @raise [KeyError] when a fetched key is absent (assuming payload is a Hash)
def process_batch(event)
  metric_labels = get_labels(event)
  payload = event.payload
  lag = payload.fetch(:offset_lag)
  batch_size = payload.fetch(:message_count)
  succeeded = !payload.key?(:exception)

  if succeeded
    Promenade.metric(:kafka_consumer_messages_processed).increment(metric_labels, batch_size)
    Promenade.metric(:kafka_consumer_batch_processing_latency).observe(metric_labels, event.duration)
  else
    Promenade.metric(:kafka_consumer_batch_processing_errors).increment(metric_labels)
  end

  # NOTE(review): the "ofset" spelling is part of the metric name; it presumably
  # matches the name the metric is registered under elsewhere -- confirm before renaming.
  Promenade.metric(:kafka_consumer_ofset_lag).set(metric_labels, lag)
end
process_message(event)
click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 74
# Records metrics for a single processed message. When the payload has no
# :exception key, the processed-messages metric is incremented and
# event.duration is observed as the message latency; otherwise the message
# error metric is incremented. The offset-lag metric is always set, and the
# time-lag metric is set only when the payload's :create_time is present.
#
# @param event [#payload, #duration] event whose payload provides :offset_lag
#   and :create_time in addition to the keys read by get_labels
# @raise [KeyError] when a fetched key is absent (assuming payload is a Hash)
def process_message(event) # rubocop:disable Metrics/AbcSize
  metric_labels = get_labels(event)
  payload = event.payload
  lag = payload.fetch(:offset_lag)
  created_at = payload.fetch(:create_time)
  # Computed up front so the metric updates below do not add to the measured lag.
  # Presumably milliseconds, assuming :create_time is a Time -- confirm.
  message_age = created_at && ((Time.now.utc - created_at) * 1000).to_i

  if payload.key?(:exception)
    Promenade.metric(:kafka_consumer_message_processing_errors).increment(metric_labels)
  else
    Promenade.metric(:kafka_consumer_messages_processed).increment(metric_labels)
    Promenade.metric(:kafka_consumer_message_processing_latency).observe(metric_labels, event.duration)
  end

  # NOTE(review): the "ofset" spelling is part of the metric name; it presumably
  # matches the name the metric is registered under elsewhere -- confirm before renaming.
  Promenade.metric(:kafka_consumer_ofset_lag).set(metric_labels, lag)

  # Not all messages have timestamps.
  return unless message_age

  Promenade.metric(:kafka_consumer_time_lag).set(metric_labels, message_age)
end
sync_group(event)
click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 123
# Observes event.duration on the sync-group metric and, when the payload
# carries an :exception key, also increments the sync-group error metric.
#
# @param event [#payload, #duration] event whose payload has the keys read by
#   group_labels
def sync_group(event)
  metric_labels = group_labels(event)
  Promenade.metric(:kafka_consumer_sync_group).observe(metric_labels, event.duration)
  return unless event.payload.key?(:exception)

  Promenade.metric(:kafka_consumer_sync_group_errors).increment(metric_labels)
end
Private Instance Methods
get_labels(event)
click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 141
# Builds the label set used for per-partition metrics from the event payload.
#
# @param event [#payload] event whose payload provides :client_id, :group_id,
#   :topic and :partition
# @return [Hash] labels keyed by :client, :group, :topic and :partition
# @raise [KeyError] when one of those keys is absent (assuming payload is a Hash)
def get_labels(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  group_id = payload.fetch(:group_id)
  topic = payload.fetch(:topic)
  partition = payload.fetch(:partition)

  { client: client_id, group: group_id, topic: topic, partition: partition }
end
group_labels(event)
click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 150
# Builds the label set used for consumer-group metrics from the event payload.
#
# @param event [#payload] event whose payload provides :client_id and :group_id
# @return [Hash] labels keyed by :client and :group
# @raise [KeyError] when one of those keys is absent (assuming payload is a Hash)
def group_labels(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  group_id = payload.fetch(:group_id)

  { client: client_id, group: group_id }
end