module NewRelic::Agent::Instrumentation::RubyKafka
Constants
- CONSUME
- INSTRUMENTATION_NAME
- MESSAGING_LIBRARY
- PRODUCE
Public Instance Methods
Source
# File lib/new_relic/agent/instrumentation/ruby_kafka/instrumentation.rb, line 53 def create_kafka_metrics(action:, topic:) @nr_config.each do |seed_broker| host = "#{seed_broker&.host}:#{seed_broker&.port}" NewRelic::Agent.record_metric("MessageBroker/Kafka/Nodes/#{host}/#{action}/#{topic}", 1) NewRelic::Agent.record_metric("MessageBroker/Kafka/Nodes/#{host}", 1) end end
Source
# File lib/new_relic/agent/instrumentation/ruby_kafka/instrumentation.rb, line 35 def each_message_with_new_relic(message) NewRelic::Agent.record_instrumentation_invocation(INSTRUMENTATION_NAME) headers = message&.headers || {} topic_name = message&.topic NewRelic::Agent::Messaging.wrap_message_broker_consume_transaction( library: MESSAGING_LIBRARY, destination_type: :topic, destination_name: topic_name, headers: headers, action: :consume ) do create_kafka_metrics(action: CONSUME, topic: topic_name) yield end end
Source
# File lib/new_relic/agent/instrumentation/ruby_kafka/instrumentation.rb, line 15 def produce_with_new_relic(value, **kwargs) NewRelic::Agent.record_instrumentation_invocation(INSTRUMENTATION_NAME) topic_name = kwargs[:topic] segment = NewRelic::Agent::Tracer.start_message_broker_segment( action: :produce, library: MESSAGING_LIBRARY, destination_type: :topic, destination_name: topic_name ) create_kafka_metrics(action: PRODUCE, topic: topic_name) headers = kwargs[:headers] || {} ::NewRelic::Agent::DistributedTracing.insert_distributed_trace_headers(headers) NewRelic::Agent::Tracer.capture_segment_error(segment) { yield(headers) } ensure segment&.finish end