class Kafka::Prometheus::ConsumerSubscriber

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/kafka/prometheus.rb, line 70
def initialize
  super
  @process_messages = Prometheus.registry.counter(:consumer_process_messages, docstring: 'Total messages', labels: [:client, :group_id, :topic, :partition])
  @process_message_errors = Prometheus.registry.counter(:consumer_process_message_errors, docstring: 'Total errors', labels: [:client, :group_id, :topic, :partition])
  @process_message_latency =
    Prometheus.registry.histogram(:consumer_process_message_latency, docstring: 'Latency', buckets: LATENCY_BUCKETS, labels: [:client, :group_id, :topic, :partition])
  @offset_lag = Prometheus.registry.gauge(:consumer_offset_lag, docstring: 'Offset lag', labels: [:client, :group_id, :topic, :partition])
  @time_lag = Prometheus.registry.gauge(:consumer_time_lag, docstring: 'Time lag of message', labels: [:client, :group_id, :topic, :partition])
  @process_batch_errors = Prometheus.registry.counter(:consumer_process_batch_errors, docstring: 'Total errors in batch', labels: [:client, :group_id, :topic, :partition])
  @process_batch_latency =
    Prometheus.registry.histogram(:consumer_process_batch_latency, docstring: 'Latency in batch', buckets: LATENCY_BUCKETS, labels: [:client, :group_id, :topic, :partition])
  @batch_size = Prometheus.registry.histogram(:consumer_batch_size, docstring: 'Size of batch', buckets: SIZE_BUCKETS, labels: [:client, :group_id, :topic, :partition])
  @join_group = Prometheus.registry.histogram(:consumer_join_group, docstring: 'Time to join group', buckets: DELAY_BUCKETS, labels: [:client, :group_id])
  @join_group_errors = Prometheus.registry.counter(:consumer_join_group_errors, docstring: 'Total error in joining group', labels: [:client, :group_id])
  @sync_group = Prometheus.registry.histogram(:consumer_sync_group, docstring: 'Time to sync group', buckets: DELAY_BUCKETS, labels: [:client, :group_id])
  @sync_group_errors = Prometheus.registry.counter(:consumer_sync_group_errors, docstring: 'Total error in syncing group', labels: [:client, :group_id])
  @leave_group = Prometheus.registry.histogram(:consumer_leave_group, docstring: 'Time to leave group', buckets: DELAY_BUCKETS, labels: [:client, :group_id])
  @leave_group_errors = Prometheus.registry.counter(:consumer_leave_group_errors, docstring: 'Total error in leaving group', labels: [:client, :group_id])
  @pause_duration = Prometheus.registry.gauge(:consumer_pause_duration, docstring: 'Pause duration', labels: [:client, :group_id, :topic, :partition])
end

Public Instance Methods

fetch_batch(event) click to toggle source
# File lib/kafka/prometheus.rb, line 136
def fetch_batch(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }
  offset_lag = event.payload.fetch(:offset_lag)
  batch_size = event.payload.fetch(:message_count)

  @batch_size.observe(batch_size, labels: key)
  @offset_lag.set(offset_lag, labels: key)
end
join_group(event) click to toggle source
# File lib/kafka/prometheus.rb, line 150
def join_group(event)
  key = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id) }
  @join_group.observe(event.duration, labels: key)

  @join_group_errors.increment(labels: key) if event.payload.key?(:exception)
end
leave_group(event) click to toggle source
# File lib/kafka/prometheus.rb, line 164
def leave_group(event)
  key = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id) }
  @leave_group.observe(event.duration, labels: key)

  @leave_group_errors.increment(labels: key) if event.payload.key?(:exception)
end
pause_status(event) click to toggle source
# File lib/kafka/prometheus.rb, line 171
def pause_status(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }

  duration = event.payload.fetch(:duration)
  @pause_duration.set(duration, labels: key)
end
process_batch(event) click to toggle source
# File lib/kafka/prometheus.rb, line 119
def process_batch(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }
  message_count = event.payload.fetch(:message_count)

  if event.payload.key?(:exception)
    @process_batch_errors.increment(labels: key)
  else
    @process_batch_latency.observe(event.duration, labels: key)
    @process_messages.increment(by: message_count, labels: key)
  end
end
process_message(event) click to toggle source
# File lib/kafka/prometheus.rb, line 91
def process_message(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }

  offset_lag = event.payload.fetch(:offset_lag)
  create_time = event.payload.fetch(:create_time)

  time_lag = create_time && ((Time.now - create_time) * 1000).to_i

  if event.payload.key?(:exception)
    @process_message_errors.increment(labels: key)
  else
    @process_message_latency.observe(event.duration, labels: key)
    @process_messages.increment(labels: key)
  end

  @offset_lag.set(offset_lag, labels: key)

  # Not all messages have timestamps.
  return unless time_lag

  @time_lag.set(time_lag, labels: key)
end
sync_group(event) click to toggle source
# File lib/kafka/prometheus.rb, line 157
def sync_group(event)
  key = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id) }
  @sync_group.observe(event.duration, labels: key)

  @sync_group_errors.increment(labels: key) if event.payload.key?(:exception)
end