class Promenade::Kafka::ConsumerSubscriber

Public Instance Methods

fetch_batch(event) click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 108
def fetch_batch(event)
  labels = get_labels(event)
  offset_lag = event.payload.fetch(:offset_lag)
  messages = event.payload.fetch(:message_count)

  Promenade.metric(:kafka_consumer_messages_fetched).increment(labels, messages)
  Promenade.metric(:kafka_consumer_ofset_lag).set(labels, offset_lag)
end
join_group(event) click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 117
def join_group(event)
  labels = group_labels(event)
  Promenade.metric(:kafka_consumer_join_group).observe(labels, event.duration)
  Promenade.metric(:kafka_consumer_join_group_errors).increment(labels) if event.payload.key?(:exception)
end
leave_group(event) click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 129
def leave_group(event)
  labels = group_labels(event)
  Promenade.metric(:kafka_consumer_leave_group).observe(labels, event.duration)
  Promenade.metric(:kafka_consumer_leave_group_errors).increment(labels) if event.payload.key?(:exception)
end
pause_status(event) click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 135
def pause_status(event)
  Promenade.metric(:kafka_consumer_pause_duration).observe(get_labels(event), event.payload.fetch(:duration))
end
process_batch(event) click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 93
def process_batch(event)
  labels = get_labels(event)
  offset_lag = event.payload.fetch(:offset_lag)
  messages = event.payload.fetch(:message_count)

  if event.payload.key?(:exception)
    Promenade.metric(:kafka_consumer_batch_processing_errors).increment(labels)
  else
    Promenade.metric(:kafka_consumer_messages_processed).increment(labels, messages)
    Promenade.metric(:kafka_consumer_batch_processing_latency).observe(labels, event.duration)
  end

  Promenade.metric(:kafka_consumer_ofset_lag).set(labels, offset_lag)
end
process_message(event) click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 74
def process_message(event) # rubocop:disable Metrics/AbcSize
  labels = get_labels(event)
  offset_lag = event.payload.fetch(:offset_lag)
  create_time = event.payload.fetch(:create_time)
  time_lag = create_time && ((Time.now.utc - create_time) * 1000).to_i

  if event.payload.key?(:exception)
    Promenade.metric(:kafka_consumer_message_processing_errors).increment(labels)
  else
    Promenade.metric(:kafka_consumer_messages_processed).increment(labels)
    Promenade.metric(:kafka_consumer_message_processing_latency).observe(labels, event.duration)
  end

  Promenade.metric(:kafka_consumer_ofset_lag).set(labels, offset_lag)

  # Not all messages have timestamps.
  Promenade.metric(:kafka_consumer_time_lag).set(labels, time_lag) if time_lag
end
sync_group(event) click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 123
def sync_group(event)
  labels = group_labels(event)
  Promenade.metric(:kafka_consumer_sync_group).observe(labels, event.duration)
  Promenade.metric(:kafka_consumer_sync_group_errors).increment(labels) if event.payload.key?(:exception)
end

Private Instance Methods

get_labels(event) click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 141
def get_labels(event)
  {
    client: event.payload.fetch(:client_id),
    group: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }
end
group_labels(event) click to toggle source
# File lib/promenade/kafka/consumer_subscriber.rb, line 150
def group_labels(event)
  {
    client: event.payload.fetch(:client_id),
    group: event.payload.fetch(:group_id),
  }
end