class ElasticAPM::Transport::Worker

@api private

Attributes

adapter[W]
connection[R]
filters[R]
name[R]
queue[R]
serializers[R]

Public Class Methods

adapter() click to toggle source
# File lib/elastic_apm/transport/worker.rb, line 27
def adapter
  @adapter ||= Connection
end
new( config, queue, serializers:, filters: ) click to toggle source
# File lib/elastic_apm/transport/worker.rb, line 40
def initialize(
  config,
  queue,
  serializers:,
  filters:
)
  @config = config
  @queue = queue

  @serializers = serializers
  @filters = filters

  @connection = self.class.adapter.new(config)
end

Public Instance Methods

process(resource) click to toggle source
# File lib/elastic_apm/transport/worker.rb, line 73
def process(resource)
  return unless (json = serialize_and_filter(resource))
  connection.write(json)
end
work_forever() click to toggle source
# File lib/elastic_apm/transport/worker.rb, line 57
def work_forever
  while (msg = queue.pop)
    case msg
    when StopMessage
      debug 'Stopping worker [%s]', self
      connection.flush(:halt)
      break
    else
      process msg
    end
  end
rescue Exception => e
  warn 'Worker died with exception: %s', e.inspect
  debug e.backtrace.join("\n")
end

Private Instance Methods

serialize_and_filter(resource) click to toggle source
# File lib/elastic_apm/transport/worker.rb, line 80
def serialize_and_filter(resource)
  if resource.respond_to?(:prepare_for_serialization!)
    resource.prepare_for_serialization!
  end

  serialized = serializers.serialize(resource)

  # if a filter returns nil, it means skip the event
  return nil if @filters.apply!(serialized) == Filters::SKIP

  JSON.fast_generate(serialized)
rescue Exception
  error format('Failed converting event to JSON: %s', resource.inspect)
  error serialized.inspect
  nil
end