class Kafka::Prometheus::ConnectionSubscriber

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/kafka/prometheus.rb, line 43
def initialize
  super
  @api_calls = Prometheus.registry.counter(:api_calls, docstring: 'Total calls', labels: [:client, :api, :broker])
  @api_latency = Prometheus.registry.histogram(:api_latency, docstring: 'Latency', buckets: LATENCY_BUCKETS, labels: [:client, :api, :broker])
  @api_request_size = Prometheus.registry.histogram(:api_request_size, docstring: 'Request size', buckets: SIZE_BUCKETS, labels: [:client, :api, :broker])
  @api_response_size = Prometheus.registry.histogram(:api_response_size, docstring: 'Response size', buckets: SIZE_BUCKETS, labels: [:client, :api, :broker])
  @api_errors = Prometheus.registry.counter(:api_errors, docstring: 'Errors', labels: [:client, :api, :broker])
end

Public Instance Methods

request(event) click to toggle source
# File lib/kafka/prometheus.rb, line 52
def request(event)
  key = {
    client: event.payload.fetch(:client_id),
    api: event.payload.fetch(:api, 'unknown'),
    broker: event.payload.fetch(:broker_host)
  }
  request_size = event.payload.fetch(:request_size, 0)
  response_size = event.payload.fetch(:response_size, 0)

  @api_calls.increment(labels: key)
  @api_latency.observe(event.duration, labels: key)
  @api_request_size.observe(request_size, labels: key)
  @api_response_size.observe(response_size, labels: key)
  @api_errors.increment(labels: key) if event.payload.key?(:exception)
end