class Fluent::Plugin::DatadogOutput

fluentd output plugin for the Datadog Log Intake API

Constants

DD_MAX_MESSAGE_LEN

DD_MAX_MESSAGE_LEN mirrors MaxMessageLen in the Datadog log agent: the maximum length of any message sent to the intake. See github.com/DataDog/datadog-log-agent/blob/2394da8c79a6cadbcd1e98d6c89c437becec2732/pkg/config/constants.go#L9-L10

DEFAULT_BUFFER_TYPE
MAX_MESSAGE_LEN
METADATA_SERVICE_ADDR

Address of the metadata service.

PLUGIN_NAME
PLUGIN_VERSION
TRUNCATED_LEN
TRUNCATED_MSG

Disable this warning to conform to fluentd config_param conventions. rubocop:disable Style/HashSyntax

Attributes

vm_id[R]
zone[R]

rubocop:enable Style/HashSyntax

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog_log.rb, line 128
# Build the plugin, then point its logger at fluentd's process-wide logger.
def initialize
  super()
  # All plugin log output goes through the shared $log instance.
  @log = $log # rubocop:disable Style/GlobalVars
end

Public Instance Methods

build_api_key_str(api_key:, logset:) click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 167
# Compose the credential prefix for a payload line.
#
# @param api_key [String] the Datadog API key
# @param logset [String, nil] optional logset name
# @return [String] "api_key/logset" when a logset is given, else the bare key
def build_api_key_str(api_key:, logset:)
  return api_key if logset.nil? || logset == ''
  "#{api_key}/#{logset}"
end
build_extra_content(timestamp:, hostname:, service:, tags_payload:) click to toggle source

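Builds the header placed between the API key and the message: a fixed `<46>0` prefix, then the timestamp, hostname and service, two literal `-` fields, and the tags payload. It follows the format used by the Datadog log agent processor; see github.com/DataDog/datadog-log-agent/blob/db13b53dfdd036d43acfb15089a43eb31548f09f/pkg/processor/processor.go#L65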

# File lib/fluent/plugin/out_datadog_log.rb, line 163
# Build the header that precedes the message body: a fixed "<46>0" prefix,
# the timestamp, hostname and service, two literal "-" fields, and finally
# the tags payload.
def build_extra_content(timestamp:, hostname:, service:, tags_payload:)
  header = "<46>0 #{timestamp} #{hostname} #{service}"
  "#{header} - - #{tags_payload}"
end
build_payload(api_key_str:, msg:, extra_content:) click to toggle source

build_payload returns the payload for a single raw message, built from the API key string, the extra content and the message.

@param [String] api_key_str
@param [String] extra_content
@param [String] msg

# File lib/fluent/plugin/out_datadog_log.rb, line 179
# Build the line sent to the intake for one message: the API key string, the
# extra content header and the message, separated by spaces.
#
# The terminator must be a real newline character ("\n"). The previous
# "\\n" emitted the two characters backslash + n, so consecutive payloads
# written to the same connection were never line-separated.
# NOTE(review): presumably the intake splits messages on newlines — confirm.
#
# @param api_key_str [String]
# @param msg [String]
# @param extra_content [String]
# @return [String]
def build_payload(api_key_str:, msg:, extra_content:)
  "#{api_key_str} #{extra_content} #{msg}\n"
end
build_tags_payload(config_tags:, source:, source_category:) click to toggle source

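Given a list of tags, build_tags_payload generates the string that will be inserted into messages. It emits one `[dd ...]` section each for the source, the source category and the tags (a list of tags is joined with commas), and omits any section whose value is nil or empty. See github.com/DataDog/datadog-log-agent/blob/2394da8c79a6cadbcd1e98d6c89c437becec2732/pkg/config/integration_config.go#L180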

# File lib/fluent/plugin/out_datadog_log.rb, line 145
# Render the structured "[dd ...]" sections inserted into every message.
# Sections appear in the order source, source category, tags, and a
# section is omitted when its value is nil or the empty string. An Array of
# tags is joined with commas; any other tags value is interpolated as-is.
#
# @return [String] the concatenated sections ('' when none apply)
def build_tags_payload(config_tags:, source:, source_category:)
  blank = ->(value) { value.nil? || value == '' }

  payload = ''
  payload += "[dd ddsource=\"#{source}\"]" unless blank.call(source)
  unless blank.call(source_category)
    payload += "[dd ddsourcecategory=\"#{source_category}\"]"
  end
  unless blank.call(config_tags)
    tag_list = config_tags.is_a?(::Array) ? config_tags.join(',') : config_tags
    payload += "[dd ddtags=\"#{tag_list}\"]"
  end
  payload
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog_log.rb, line 183
# Validate configuration and prepare per-instance state: resolve the API key
# (falling back to DD_LOG_API_KEY), optionally register monitoring counters,
# detect the platform, resolve VM metadata and build the default tags.
#
# @raise [Fluent::ConfigError] when no API key can be found or required
#   metadata is missing
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  # Fall back to the environment when no api_key was configured. A nil
  # api_key is treated like an empty one instead of crashing on nil.size.
  if @api_key.nil? || @api_key.empty?
    @api_key = ENV['DD_LOG_API_KEY']
    if @api_key.nil? || @api_key.empty?
      error_message = 'Unable to obtain api_key from DD_LOG_API_KEY'
      fail Fluent::ConfigError, error_message
    end
  end

  # If monitoring is enabled, register metrics in the default registry
  # and store metric objects for future use. Otherwise the counters stay
  # nil and the increment_* helpers do nothing.
  if @enable_monitoring
    registry = Monitoring::MonitoringRegistryFactory.create @monitoring_type
    @successful_requests_count = registry.counter(
      :datadog_successful_requests_count,
      'A number of successful requests to the Datadog Log Intake API')
    @failed_requests_count = registry.counter(
      :datadog_failed_requests_count,
      'A number of failed requests to the Datadog Log Intake API,'\
        ' broken down by the error code')
    @ingested_entries_count = registry.counter(
      :datadog_ingested_entries_count,
      'A number of log entries ingested by Datadog Log Intake')
    @dropped_entries_count = registry.counter(
      :datadog_dropped_entries_count,
      'A number of log entries dropped by the Datadog output plugin')
    @retried_entries_count = registry.counter(
      :datadog_retried_entries_count,
      'The number of log entries that failed to be ingested by the'\
        ' Datadog output plugin due to a transient error and were'\
        ' retried')
  end

  @platform = detect_platform

  # Set required variables: @vm_id, @vm_name and @zone.
  set_required_metadata_variables

  @default_tags = build_default_tags

  # The default tags are now set up; freeze them so they can't be modified
  # without first duping them.
  @default_tags.freeze

  # Log an informational message containing the Logs viewer URL.
  # NOTE(review): this URL looks like a placeholder — confirm the real one.
  @log.info 'Logs viewer address: https://example.com/logs/'
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 246
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  [tag, time, record].to_msgpack
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 251
# #format packs every event with to_msgpack, so buffered chunks always hold
# msgpack binary.
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 255
# Declare that this output may be run by fluentd's multiple workers.
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog_log.rb, line 241
# Run the superclass shutdown, then close the intake connection if one was
# opened.
def shutdown
  super
  return if @conn.nil?
  @conn.close
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datadog_log.rb, line 234
# Start the plugin: run the superclass start, open the intake client and
# reset the per-run status flags.
def start
  super
  init_api_client
  @successful_call = @timenanos_warning = false
end
truncate_message(msg) click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 134
# Cap a message at DD_MAX_MESSAGE_LEN characters. Longer messages keep
# their first MAX_MESSAGE_LEN characters and get TRUNCATED_MSG appended.
#
# The value is converted with #to_s first. A record without a log/message/
# msg field yields nil here, which previously raised NoMethodError (and made
# #write retry the chunk forever). It now becomes ''. Strings pass through
# unchanged, and other values render exactly as string interpolation would
# have rendered them.
#
# @param msg [String, nil, Object]
# @return [String]
def truncate_message(msg)
  text = msg.to_s
  return text if text.size <= DD_MAX_MESSAGE_LEN
  text.slice(0, MAX_MESSAGE_LEN) + TRUNCATED_MSG
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 259
# Send every valid record of a buffer chunk to the Datadog intake, one
# payload per record. On any error the failure counters are bumped and the
# error is re-raised so the chunk is retried later.
def write(chunk)
  each_valid_record(chunk) do |_tag, time, record|
    if @detect_json
      # Save the timestamp and severity if available, then clear it out to
      # allow for determining whether we should parse the log or message
      # field.
      timestamp = record.delete('time')
      severity = record.delete('severity')

      # If the log is json, we want to export it as a structured log
      # unless there is additional metadata that would be lost.
      record_json = nil
      if record.length == 1
        %w(log message msg).each do |field|
          if record.key?(field)
            record_json = parse_json_or_nil(record[field])
          end
        end
      end
      record = record_json unless record_json.nil?
      # Restore timestamp and severity if necessary. Note that we don't
      # want to override these keys in the JSON we've just parsed.
      record['time'] ||= timestamp if timestamp
      record['severity'] ||= severity if severity
    end

    # TODO: Correlate Datadog APM spans with log messages
    # fq_trace_id = record.delete(@trace_key)
    # entry.trace = fq_trace_id if fq_trace_id

    # One payload per record. Assigned before `begin` so the rescue clause
    # always reports a number, even if building the payload fails.
    entries_count = 1

    begin
      # When several message fields exist, the last one in this list wins.
      msg = nil
      %w(log message msg).each do |field|
        msg = record[field] if record.key?(field)
      end

      tags = []

      kube = record['kubernetes'] || {}

      mappings = {
        'pod_name' => 'pod_name',
        'container_name' => 'container_name',
        'namespace_name' => 'kube_namespace'
      }

      mappings.each do |json_key, tag_key|
        tags << "#{tag_key}=#{kube[json_key]}" if kube.key? json_key
      end

      # An absent or null labels value contributes no tags.
      (kube['labels'] || {}).each do |k, v|
        tags << "kube_#{k}=#{v}"
      end

      # TODO: Include K8S tags like
      # - kube_daemon_set=$daemonset_name
      # - kube_deployment=$deployment_name
      # - kube_replica_set=$replicaset_name
      # -

      tags.concat(@default_tags)

      datetime = Time.at(Fluent::EventTime.new(time).to_r).utc.to_datetime
      timestamp_str = datetime.rfc3339(6)

      payload = build_payload(
        api_key_str: build_api_key_str(api_key: @api_key, logset: @logset),
        msg: truncate_message(msg),
        extra_content: build_extra_content(
          timestamp: timestamp_str,
          hostname: @vm_id,
          service: @service,
          tags_payload: build_tags_payload(
            config_tags: tags,
            source: @source,
            source_category: @source_category
          )
        )
      )

      @conn.write(payload)
      @log.debug 'Sent payload to Datadog.', payload: payload
      increment_successful_requests_count
      increment_ingested_entries_count(entries_count)

      # Let the user explicitly know when the first call succeeded, to aid
      # with verification and troubleshooting.
      unless @successful_call
        @successful_call = true
        @log.info 'Successfully sent to Datadog.'
      end

    rescue => error
      increment_failed_requests_count
      increment_retried_entries_count(entries_count)
      # Re-raise so the chunk is retried later.
      @log.debug "Retrying #{entries_count} log message(s) later.",
                 error: error.to_s
      raise
    end
  end
end

Private Instance Methods

build_default_tags() click to toggle source

Build the default tags attached to every message: host (the VM id), zone and AWS account id, followed by the tags configured for the plugin.

# File lib/fluent/plugin/out_datadog_log.rb, line 493
# Build the tags attached to every message: host (the VM id), zone and AWS
# account id, followed by the configured tags.
#
# ec2_metadata fails on any platform other than EC2. It is therefore only
# consulted on EC2; elsewhere the account id is left empty instead of
# aborting configure. A nil @tags contributes no extra tags.
#
# @return [Array<String>] "key=value" tag strings
def build_default_tags
  aws_account_id = nil
  if @platform == Platform::EC2 && ec2_metadata.key?('accountId')
    aws_account_id = ec2_metadata['accountId']
  end
  # e.g. host=i-09fbfed2672d2c6bf
  %W(host=#{@vm_id} zone=#{@zone} aws_account_id=#{aws_account_id})
    .concat(Array(@tags))
end
convert_to_utf8(input) click to toggle source

Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any non-UTF-8 character would be replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character would trigger the plugin to error out.

# File lib/fluent/plugin/out_datadog_log.rb, line 547
# Return +input+ as valid UTF-8.
#
# String#encode to the encoding a string already has is a no-op: invalid
# bytes in a UTF-8-tagged string survive it unchanged and raise nothing.
# The result is therefore also checked for invalid bytes:
# - coerce mode: any remaining invalid bytes are scrubbed with
#   @non_utf8_replacement_string;
# - strict mode: invalid UTF-8 raises EncodingError, after logging, as the
#   coerce_to_utf8 option describes.
#
# @raise [EncodingError] in strict mode when input is not valid UTF-8
def convert_to_utf8(input)
  if @coerce_to_utf8
    input.encode(
      'utf-8',
      invalid: :replace,
      undef: :replace,
      replace: @non_utf8_replacement_string)
      .scrub(@non_utf8_replacement_string)
  else
    begin
      converted = input.encode('utf-8')
      unless converted.valid_encoding?
        fail EncodingError, 'invalid byte sequence in UTF-8'
      end
      converted
    rescue EncodingError
      @log.error 'Encountered encoding issues potentially due to non ' \
                 'UTF-8 characters. To allow non-UTF-8 characters and ' \
                 'replace them with spaces, please set "coerce_to_utf8" ' \
                 'to true.'
      raise
    end
  end
end
detect_platform() click to toggle source

Determine what platform we are running on by consulting the metadata service (unless the user has explicitly disabled using that).

# File lib/fluent/plugin/out_datadog_log.rb, line 400
# Probe the metadata service to tell GCE from EC2. The service is skipped
# entirely when use_metadata_service is false. Any failure to reach it is
# logged, and the result falls back to Platform::OTHER.
def detect_platform
  unless @use_metadata_service
    @log.info 'use_metadata_service is false; not detecting platform'
    return Platform::OTHER
  end

  begin
    open('http://' + METADATA_SERVICE_ADDR) do |response|
      headers = response.meta
      if headers['metadata-flavor'] == 'Google'
        @log.info 'Detected GCE platform'
        return Platform::GCE
      elsif headers['server'] == 'EC2ws'
        @log.info 'Detected EC2 platform'
        return Platform::EC2
      end
    end
  rescue StandardError => e
    @log.error 'Failed to access metadata service: ', error: e
  end

  @log.info 'Unable to determine platform'
  Platform::OTHER
end
each_valid_record(chunk) { |event| ... } click to toggle source

Yield each event in the chunk. Events whose record is not a Hash, or whose tag cannot be sanitized, are skipped with a warning.

# File lib/fluent/plugin/out_datadog_log.rb, line 502
# Yield each event of +chunk+. Events whose record is not a Hash, or whose
# tag cannot be sanitized, are skipped with a warning.
#
# Events are [tag, time, record] triples, as packed by #format. The tag is
# therefore event.first. The old code used record.first, which is the
# first key/value pair of the record Hash; with require_valid_tags every
# record was dropped.
def each_valid_record(chunk)
  chunk.msgpack_each do |event|
    tag = event.first
    record = event.last
    unless record.is_a?(Hash)
      @log.warn 'Dropping log entries with malformed record: ' \
                "'#{record.inspect}'. " \
                'A log record should be in JSON format.'
      next
    end
    sanitized_tag = sanitize_tag(tag)
    if sanitized_tag.nil?
      @log.warn "Dropping log entries with invalid tag: '#{tag.inspect}'." \
                ' A tag should be a string with utf8 characters.'
      next
    end
    yield event
  end
end
ec2_metadata() click to toggle source

EC2 Metadata server returns everything in one call. Store it after the first fetch to avoid making multiple calls.

# File lib/fluent/plugin/out_datadog_log.rb, line 435
# Fetch (once) and cache the EC2 instance identity document as a Hash.
# Only valid on EC2; any other platform is a programming error.
def ec2_metadata
  unless @platform == Platform::EC2
    fail "Called ec2_metadata with platform=#{@platform}"
  end
  return @ec2_metadata if @ec2_metadata

  # See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
  document_url = 'http://' + METADATA_SERVICE_ADDR +
                 '/latest/dynamic/instance-identity/document'
  open(document_url) { |io| @ec2_metadata = JSON.parse(io.read) }
  @ec2_metadata
end
ensure_array(value) click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 567
# Return +value+ as an Array (via implicit to_ary conversion), or raise
# JSON::ParserError naming the offending class.
def ensure_array(value)
  converted = Array.try_convert(value)
  fail JSON::ParserError, value.class.to_s if converted.nil?
  converted
end
ensure_hash(value) click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 571
# Return +value+ as a Hash (via implicit to_hash conversion), or raise
# JSON::ParserError naming the offending class.
def ensure_hash(value)
  converted = Hash.try_convert(value)
  fail JSON::ParserError, value.class.to_s if converted.nil?
  converted
end
fetch_gce_metadata(metadata_path) click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 425
# Read one GCE metadata value (as a String) from the computeMetadata/v1
# tree. Only valid on GCE.
def fetch_gce_metadata(metadata_path)
  unless @platform == Platform::GCE
    fail "Called fetch_gce_metadata with platform=#{@platform}"
  end
  # See https://cloud.google.com/compute/docs/metadata
  url = 'http://' + METADATA_SERVICE_ADDR + '/computeMetadata/v1/' +
        metadata_path
  open(url, 'Metadata-Flavor' => 'Google') { |io| io.read }
end
increment_dropped_entries_count(count) click to toggle source

Increment the metric for the number of log entries that were dropped and not ingested by the Datadog Log Intake API.

# File lib/fluent/plugin/out_datadog_log.rb, line 597
# Add +count+ to the dropped-entries counter. The counter only exists when
# monitoring is enabled; otherwise this is a no-op returning nil.
def increment_dropped_entries_count(count)
  @dropped_entries_count.increment({}, count) if @dropped_entries_count
end
increment_failed_requests_count() click to toggle source

Increment the metric for the number of failed requests.

# File lib/fluent/plugin/out_datadog_log.rb, line 583
# Bump the failed-requests counter by one. No-op (returns nil) when
# monitoring is disabled and the counter was never created.
def increment_failed_requests_count
  @failed_requests_count.increment if @failed_requests_count
end
increment_ingested_entries_count(count) click to toggle source

Increment the metric for the number of log entries successfully ingested by the Datadog Log Intake API.

# File lib/fluent/plugin/out_datadog_log.rb, line 590
# Add +count+ to the ingested-entries counter. No-op (returns nil) when
# monitoring is disabled and the counter was never created.
def increment_ingested_entries_count(count)
  @ingested_entries_count.increment({}, count) if @ingested_entries_count
end
increment_retried_entries_count(count) click to toggle source

Increment the metric for the number of log entries that failed to be ingested by the Datadog Log Intake API and will be retried.

# File lib/fluent/plugin/out_datadog_log.rb, line 604
# Add +count+ to the retried-entries counter. No-op (returns nil) when
# monitoring is disabled and the counter was never created.
def increment_retried_entries_count(count)
  @retried_entries_count.increment({}, count) if @retried_entries_count
end
increment_successful_requests_count() click to toggle source

Increment the metric for the number of successful requests.

# File lib/fluent/plugin/out_datadog_log.rb, line 576
# Bump the successful-requests counter by one. No-op (returns nil) when
# monitoring is disabled and the counter was never created.
def increment_successful_requests_count
  @successful_requests_count.increment if @successful_requests_count
end
init_api_client() click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 536
# Open the TCP client to "<log_dd_url>:<log_dd_port>". SSL is always on;
# certificate verification is disabled only when skip_ssl_validation is set.
def init_api_client
  ssl_options =
    if @skip_ssl_validation
      { verify_mode: OpenSSL::SSL::VERIFY_NONE }
    else
      true
    end
  @conn = Net::TCPClient.new(server: "#{@log_dd_url}:#{@log_dd_port}",
                             ssl: ssl_options)
end
parse_json_or_nil(input) click to toggle source
# File lib/fluent/plugin/out_datadog_log.rb, line 368
# Parse +input+ as JSON only when its first non-whitespace character is
# '{'. Returns the parsed value, or nil for nil input, input that does not
# start with an object, or input that fails to parse.
def parse_json_or_nil(input)
  return nil if input.nil?

  input.each_codepoint do |codepoint|
    case codepoint
    when 9, 10, 13, 32
      # JSON whitespace (tab, line feed, carriage return, space): keep
      # scanning for the opening brace.
      next
    when 123
      # '{' (U+007B): looks like an object, so try a full parse.
      begin
        return JSON.parse(input)
      rescue JSON::ParserError
        return nil
      end
    else
      break
    end
  end
  nil
end
sanitize_tag(tag) click to toggle source

Given a tag, returns the corresponding valid tag if possible, or nil if the tag should be rejected. If 'require_valid_tags' is false, non-string tags are converted to strings, and invalid characters are sanitized; otherwise such tags are rejected.

# File lib/fluent/plugin/out_datadog_log.rb, line 526
# Normalize a fluentd tag, or return nil to reject it. With
# require_valid_tags, only non-empty Strings that are already valid UTF-8
# (unchanged by convert_to_utf8) are accepted. Otherwise any value is
# stringified and converted. An empty result becomes '_'.
def sanitize_tag(tag)
  if @require_valid_tags
    acceptable = tag.is_a?(String) && tag != '' && convert_to_utf8(tag) == tag
    return nil unless acceptable
  end
  sanitized = convert_to_utf8(tag.to_s)
  sanitized == '' ? '_' : sanitized
end
set_required_metadata_variables() click to toggle source

Set required variables like @vm_id, @vm_name and @zone.

# File lib/fluent/plugin/out_datadog_log.rb, line 451
# Resolve @vm_id, @vm_name and @zone, then insist they are all set.
#
# @raise [Fluent::ConfigError] listing every parameter still unset
def set_required_metadata_variables
  set_vm_id
  set_vm_name
  set_zone

  required = [['zone', @zone], ['vm_id', @vm_id], ['vm_name', @vm_name]]
  missing = required.reject { |_name, value| value }.map(&:first)
  return if missing.empty?
  fail Fluent::ConfigError,
       'Unable to obtain metadata parameters: ' + missing.join(' ')
end
set_vm_id() click to toggle source
  1. Return the value if it is explicitly set in the config already.

  2. If not, try to retrieve it by calling metadata servers directly.

# File lib/fluent/plugin/out_datadog_log.rb, line 468
# Keep a configured @vm_id; on EC2, fill a missing one from the instance
# identity document. Lookup errors are logged, not raised.
def set_vm_id
  return unless @platform == Platform::EC2
  @vm_id ||= ec2_metadata['instanceId']
rescue StandardError => e
  @log.error 'Failed to obtain vm_id: ', error: e
end
set_vm_name() click to toggle source
  1. Return the value if it is explicitly set in the config already.

  2. If not, try to retrieve it locally.

# File lib/fluent/plugin/out_datadog_log.rb, line 476
# Keep a configured @vm_name, otherwise use the local hostname. Lookup
# errors are logged, not raised.
def set_vm_name
  return @vm_name if @vm_name
  @vm_name = Socket.gethostname
rescue StandardError => e
  @log.error 'Failed to obtain vm name: ', error: e
end
set_zone() click to toggle source
  1. Return the value if it is explicitly set in the config already.

  2. If not, try to retrieve it from the EC2 metadata service (the availability zone, prefixed with `aws:`).

# File lib/fluent/plugin/out_datadog_log.rb, line 484
# Keep a configured @zone; on EC2, fill a missing one with "aws:" plus the
# instance's availability zone. Lookup errors are logged, not raised.
def set_zone
  zone_known = @platform == Platform::EC2 &&
               ec2_metadata.key?('availabilityZone')
  return unless zone_known
  @zone ||= 'aws:' + ec2_metadata['availabilityZone']
rescue StandardError => e
  @log.error 'Failed to obtain location: ', error: e
end