class NewRelic::Agent::LogEventAggregator

Constants

CUSTOM_ATTRIBUTES_KEY
DECORATING_ENABLED_KEY
DECORATING_SUPPORTABILITY_FORMAT
DROPPED_METRIC
FORWARDING_ENABLED_KEY
FORWARDING_SUPPORTABILITY_FORMAT
LEVEL_KEY

Per-message keys

LINES

Metric keys

LOGGER_SUPPORTABILITY_FORMAT
LOGSTASHER_SUPPORTABILITY_FORMAT
LOG_LEVEL_KEY
MAX_BYTES
MESSAGE_KEY
METRICS_ENABLED_KEY
METRICS_SUPPORTABILITY_FORMAT
OVERALL_ENABLED_KEY

Config keys

PRIORITY_KEY
SEEN_METRIC
SENT_METRIC
TIMESTAMP_KEY

Attributes

attributes[R]

Public Class Methods

new(events) click to toggle source
Calls superclass method NewRelic::Agent::EventAggregator::new
# File lib/new_relic/agent/log_event_aggregator.rb, line 46
# Builds the aggregator: delegates to the superclass constructor, then sets up
# the seen-line counters, the cached settings and the attribute store.
#
# events is handed to the superclass constructor and to
# register_for_done_configuring.
def initialize(events)
  super(events)

  # Seen-line counters plus the lock that guards them.
  @seen_by_severity = Hash.new(0)
  @seen = 0
  @counter_lock = Mutex.new

  # Values read once here and cached on the instance.
  @high_security = NewRelic::Agent.config[:high_security]
  @instrumentation_logger_enabled = NewRelic::Agent::Instrumentation::Logger.enabled?

  @attributes = NewRelic::Agent::LogEventAttributes.new

  register_for_done_configuring(events)
end
payload_to_melt_format(data) click to toggle source

Because our transmission format (MELT) differs from historical agent payloads, the conversion is extracted here so that the service can stay focused on the general harvest-and-transmit flow rather than on the format.

The payload shape matches the publicly documented MELT format; see https://docs.newrelic.com/docs/logs/log-api/introduction-log-api

We have to keep the aggregated payloads in a separate shape, though, to work with the priority sampling buffers

# File lib/new_relic/agent/log_event_aggregator.rb, line 198
# Converts harvested log data into the MELT payload shape.
#
# data is a two-element collection whose second element is the list of
# harvested entries; the last element of each entry is the log event itself.
#
# Returns [payload, entry_count], where payload is a one-element array holding
# the shared attributes under :common and the log events under :logs.
def self.payload_to_melt_format(data)
  shared_attributes = LinkingMetadata.append_service_linking_metadata({})

  # entity.type is trimmed from the shared attributes to save on unnecessary
  # data transmission (it is sent by classic logs-in-context).
  shared_attributes.delete(ENTITY_TYPE_KEY)

  custom = NewRelic::Agent.agent.log_event_aggregator.attributes.custom_attributes
  shared_attributes.merge!(custom)

  _harvest_metadata, entries = data
  log_events = entries.map { |entry| entry.last }
  melt_payload = [{common: {attributes: shared_attributes}, logs: log_events}]

  [melt_payload, entries.size]
end

Public Instance Methods

add_custom_attributes(custom_attributes) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 185
# Forwards the given custom attributes to the aggregator's attribute store
# and returns whatever that call returns.
def add_custom_attributes(custom_attributes)
  attribute_store = attributes
  attribute_store.add_custom_attributes(custom_attributes)
end
add_event_metadata(formatted_message, severity) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 141
# Builds the attribute hash for one log event: level, timestamp and (when
# present) the message, then passes it through
# LinkingMetadata.append_trace_linking_metadata and returns that result.
#
# formatted_message may be nil, in which case no message key is set.
def add_event_metadata(formatted_message, severity)
  event = {}
  event[LEVEL_KEY] = severity
  # Wall-clock seconds from CLOCK_REALTIME, scaled to milliseconds.
  event[TIMESTAMP_KEY] = Process.clock_gettime(Process::CLOCK_REALTIME) * 1000

  has_message = !formatted_message.nil?
  event[MESSAGE_KEY] = formatted_message if has_message

  LinkingMetadata.append_trace_linking_metadata(event)
end
add_logstasher_event_attributes(event, log) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 175
# Stores a copy of the log, minus the keys that are reported separately,
# under event['attributes']. The log passed in is left untouched.
def add_logstasher_event_attributes(event, log)
  remaining = log.dup
  # These keys were already reported as event fields of their own.
  %w[message level @timestamp].each { |reported_key| remaining.delete(reported_key) }

  event['attributes'] = remaining
end
capacity() click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 57
# Returns the capacity reported by the underlying buffer.
def capacity
  buffer = @buffer
  buffer.capacity
end
create_event(priority, formatted_message, severity) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 160
# Builds a prioritized log event: the message is truncated, wrapped with
# event metadata, and paired with the given priority.
def create_event(priority, formatted_message, severity)
  event = add_event_metadata(truncate_message(formatted_message), severity)
  create_prioritized_event(priority, event)
end
create_logstasher_event(priority, severity, log) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 167
# Builds a prioritized log event from a LogStasher log hash.
#
# The message is truncated when the log carries a truthy 'message' value and
# is nil otherwise. The log's other keys are attached through
# add_logstasher_event_attributes.
def create_logstasher_event(priority, severity, log)
  raw_message = log['message']
  message = (truncate_message(raw_message) if raw_message)

  event = add_event_metadata(message, severity)
  add_logstasher_event_attributes(event, log)

  create_prioritized_event(priority, event)
end
create_prioritized_event(priority, event) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 151
# Pairs an event with its sampling metadata.
#
# Returns a two-element array: a hash holding the priority under
# PrioritySampledBuffer::PRIORITY_KEY, followed by the event itself.
def create_prioritized_event(priority, event)
  sampling_metadata = {PrioritySampledBuffer::PRIORITY_KEY => priority}
  [sampling_metadata, event]
end
determine_severity(log) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 114
# Derives the severity label for a LogStasher log hash.
#
# Returns the upcased string form of log['level'], or 'UNKNOWN' when the
# level is missing, nil, false or empty. Treating an empty level as
# 'UNKNOWN' matches how #record handles nil/empty severities and avoids
# counting lines under a blank severity name.
def determine_severity(log)
  level = log['level']
  return 'UNKNOWN' unless level

  severity = level.to_s.upcase
  severity.empty? ? 'UNKNOWN' : severity
end
harvest!() click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 216
# Records the held-back customer line metrics, then defers to the superclass
# harvest and returns its result.
def harvest!
  record_customer_metrics
  super
end
increment_event_counters(severity) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 118
# Bumps the total and per-severity seen-line counters under the counter lock.
# Does nothing (and returns nil) when log metrics are disabled in the config.
def increment_event_counters(severity)
  if NewRelic::Agent.config[METRICS_ENABLED_KEY]
    @counter_lock.synchronize do
      @seen += 1
      @seen_by_severity[severity] += 1
    end
  end
end
logger_enabled?() click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 230
# Truthy only when the aggregator is enabled and the Logger instrumentation
# flag cached on the instance is set.
def logger_enabled?
  return @enabled unless @enabled

  @instrumentation_logger_enabled
end
logstasher_enabled?() click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 234
# Truthy only when the aggregator is enabled and the LogStasher
# instrumentation reports itself enabled. The instrumentation is only
# consulted when the aggregator itself is enabled.
def logstasher_enabled?
  return @enabled unless @enabled

  NewRelic::Agent::Instrumentation::LogStasher.enabled?
end
monitoring_conditions_met?(severity) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 110
# Decides whether a line of the given severity should be forwarded: the
# severity must not be too low, forwarding must be enabled in the config,
# and high-security mode must be off.
def monitoring_conditions_met?(severity)
  return false if severity_too_low?(severity)

  forwarding = NewRelic::Agent.config[FORWARDING_ENABLED_KEY]
  return forwarding unless forwarding

  !@high_security
end
record(formatted_message, severity) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 61
# Records one formatted log line.
#
# A nil or empty severity is counted and reported as 'UNKNOWN'. The line is
# always counted (when the logger path is enabled), but an event is only
# created when the message is non-empty and the monitoring conditions are
# met. Inside a transaction the event is handed to the transaction; otherwise
# it is appended to the buffer under the aggregator lock.
#
# Any StandardError raised along the way is swallowed and nil is returned.
def record(formatted_message, severity)
  return unless logger_enabled?

  level = (severity.nil? || severity.empty?) ? 'UNKNOWN' : severity
  increment_event_counters(level)

  message_missing = formatted_message.nil? || formatted_message.empty?
  return if message_missing || !monitoring_conditions_met?(level)

  current_txn = NewRelic::Agent::Transaction.tl_current
  priority = LogPriority.priority_for(current_txn)

  if current_txn
    current_txn.add_log_event(create_event(priority, formatted_message, level))
  else
    @lock.synchronize do
      @buffer.append(priority: priority) { create_event(priority, formatted_message, level) }
    end
  end
rescue
  nil
end
record_batch(txn, logs) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 127
# Appends a batch of already-built log events to the buffer.
#
# Every entry's metadata hash (its first element) is stamped with one
# priority derived from txn, so the whole batch shares the same priority.
# The entries are then appended while holding the aggregator lock.
def record_batch(txn, logs)
  shared_priority = LogPriority.priority_for(txn)
  logs.each { |entry| entry.first[PRIORITY_KEY] = shared_priority }

  @lock.synchronize do
    logs.each { |entry| @buffer.append(event: entry) }
  end
end
record_logstasher_event(log) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 84
# Records one LogStasher log hash.
#
# LogStasher logs do not inherently include a 'message' key, so logs without
# one are still recorded. When the key is present, though, a nil or empty
# message means the log is skipped and not counted.
#
# Inside a transaction the event is handed to the transaction; otherwise it
# is appended to the buffer under the aggregator lock. Any StandardError
# raised along the way is swallowed and nil is returned.
def record_logstasher_event(log)
  return unless logstasher_enabled?

  if log.key?('message')
    message = log['message']
    return if message.nil? || message.empty?
  end

  level = determine_severity(log)
  increment_event_counters(level)
  return unless monitoring_conditions_met?(level)

  current_txn = NewRelic::Agent::Transaction.tl_current
  priority = LogPriority.priority_for(current_txn)

  if current_txn
    current_txn.add_log_event(create_logstasher_event(priority, level, log))
  else
    @lock.synchronize do
      @buffer.append(priority: priority) { create_logstasher_event(priority, level, log) }
    end
  end
rescue
  nil
end
reset!() click to toggle source
Calls superclass method NewRelic::Agent::EventAggregator#reset!
# File lib/new_relic/agent/log_event_aggregator.rb, line 221
# Zeroes the seen-line counters under the counter lock, then defers to the
# superclass reset and returns its result.
def reset!
  @counter_lock.synchronize do
    @seen_by_severity.clear
    @seen = 0
  end

  super
end

Private Instance Methods

after_harvest(metadata) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 265
# Post-harvest hook: works out how many events were dropped (seen minus
# captured), logs a note about them and records the supportability metrics.
#
# metadata is read for its :seen and :captured counts.
def after_harvest(metadata)
  seen = metadata[:seen]
  captured = metadata[:captured]
  dropped = seen - captured

  note_dropped_events(seen, dropped)
  record_supportability_metrics(seen, captured, dropped)
end
configured_log_level_constant() click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 315
# Returns the log level currently set in the agent config, converted with
# format_log_level_constant.
def configured_log_level_constant
  configured_level = NewRelic::Agent.config[LOG_LEVEL_KEY]
  format_log_level_constant(configured_level)
end
format_log_level_constant(log_level) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 319
# Converts a log level name into an upcased symbol, e.g. 'warn' => :WARN.
def format_log_level_constant(log_level)
  upcased = log_level.upcase
  upcased.to_sym
end
line_metric_name_by_severity(severity) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 290
# Returns the "Logging/lines/<severity>" metric name for a severity.
# Names are built once per severity, frozen, and cached on the instance.
def line_metric_name_by_severity(severity)
  cache = (@line_metrics ||= {})
  cache.fetch(severity) { cache[severity] = "Logging/lines/#{severity}".freeze }
end
note_dropped_events(total_count, dropped_count) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 295
# Logs a warning through the agent logger when any log events were dropped;
# does nothing (and returns nil) otherwise.
def note_dropped_events(total_count, dropped_count)
  return unless dropped_count > 0

  NewRelic::Agent.logger.warn("Dropped #{dropped_count} log events out of #{total_count}.")
end
record_configuration_metric(format, key) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 255
# Increments a supportability metric describing whether a feature is on.
#
# format is a format string with one placeholder; it is filled with
# 'enabled' only when the aggregator is enabled and the config value under
# key is truthy, and with 'disabled' otherwise.
def record_configuration_metric(format, key)
  setting = NewRelic::Agent.config[key]
  label = (enabled? && setting) ? 'enabled' : 'disabled'

  NewRelic::Agent.increment_metric(Kernel.format(format, label))
end
record_customer_metrics() click to toggle source

To avoid paying the cost of metric recording on every line, we hold these until harvest before recording them

# File lib/new_relic/agent/log_event_aggregator.rb, line 273
# Flushes the held-back line counters into metrics and zeroes them.
#
# To avoid paying the cost of metric recording on every line, the counts
# are accumulated and only turned into metrics here. Nothing is recorded
# when the aggregator or log metrics are disabled, or when no lines have
# been seen.
def record_customer_metrics
  return unless enabled? && NewRelic::Agent.config[METRICS_ENABLED_KEY]

  @counter_lock.synchronize do
    next unless @seen > 0

    NewRelic::Agent.increment_metric(LINES, @seen)
    @seen_by_severity.each_pair do |severity, count|
      NewRelic::Agent.increment_metric(line_metric_name_by_severity(severity), count)
    end

    @seen = 0
    @seen_by_severity.clear
  end
end
record_supportability_metrics(total_count, captured_count, dropped_count) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 301
# Records the dropped, seen and sent supportability metrics for a harvest.
# Skipped entirely (returning nil) when no events were seen.
def record_supportability_metrics(total_count, captured_count, dropped_count)
  return unless total_count.positive?

  agent = NewRelic::Agent
  agent.increment_metric(DROPPED_METRIC, dropped_count)
  agent.increment_metric(SEEN_METRIC, total_count)
  agent.increment_metric(SENT_METRIC, captured_count)
end
register_for_done_configuring(events) click to toggle source

We record once-per-connect metrics for the enabled/disabled state at the point where we consider the configuration stable (i.e. once we have received the server-side configuration, SSC).

# File lib/new_relic/agent/log_event_aggregator.rb, line 242
# Subscribes to the :server_source_configuration_added event.
#
# Each time it fires, the handler refreshes the cached high-security flag,
# records one enabled/disabled supportability metric per feature, and loads
# the configured custom attributes.
def register_for_done_configuring(events)
  events.subscribe(:server_source_configuration_added) do
    @high_security = NewRelic::Agent.config[:high_security]

    # [metric format, config key] pairs, recorded in this order.
    [
      [LOGGER_SUPPORTABILITY_FORMAT, OVERALL_ENABLED_KEY],
      [LOGSTASHER_SUPPORTABILITY_FORMAT, OVERALL_ENABLED_KEY],
      [METRICS_SUPPORTABILITY_FORMAT, METRICS_ENABLED_KEY],
      [FORWARDING_SUPPORTABILITY_FORMAT, FORWARDING_ENABLED_KEY],
      [DECORATING_SUPPORTABILITY_FORMAT, DECORATING_ENABLED_KEY]
    ].each do |metric_format, config_key|
      record_configuration_metric(metric_format, config_key)
    end

    add_custom_attributes(NewRelic::Agent.config[CUSTOM_ATTRIBUTES_KEY])
  end
end
severity_too_low?(severity) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 323
# Tells whether a severity ranks below the configured log level.
#
# Severities that are not constants of Logger::Severity (custom levels) are
# never considered too low, so they are always recorded.
def severity_too_low?(severity)
  level_name = format_log_level_constant(severity)
  levels = Logger::Severity

  if levels.constants.include?(level_name)
    levels.const_get(level_name) < levels.const_get(configured_log_level_constant)
  else
    false
  end
end
truncate_message(message) click to toggle source
# File lib/new_relic/agent/log_event_aggregator.rb, line 309
# Caps a message at MAX_BYTES bytes.
#
# Messages already within the limit are returned unchanged. Longer messages
# are cut at MAX_BYTES; because that cut is made at a byte offset it can land
# in the middle of a multibyte character, so a partial trailing character is
# dropped rather than leaving a string with an invalid encoding. The result
# is therefore at most MAX_BYTES bytes long.
def truncate_message(message)
  return message if message.bytesize <= MAX_BYTES

  truncated = message.byteslice(0, MAX_BYTES)

  # Only repair damage the cut itself caused: if the original was already
  # invalid, its bytes are passed through untouched as before.
  return truncated if truncated.valid_encoding? || !message.valid_encoding?

  # The original was valid, so the only invalid bytes are the split
  # character at the very end.
  truncated.scrub('')
end