class Fluent::Plugin::DatadogOutput
fluentd output plugin for the Datadog Log Intake API
Constants
- DD_MAX_MESSAGE_LEN
The maximum length for any message we send to the intake; see github.com/DataDog/datadog-log-agent/blob/2394da8c79a6cadbcd1e98d6c89c437becec2732/pkg/config/constants.go#L9-L10
- DEFAULT_BUFFER_TYPE
- MAX_MESSAGE_LEN
- METADATA_SERVICE_ADDR
Address of the metadata service.
- PLUGIN_NAME
- PLUGIN_VERSION
- TRUNCATED_LEN
- TRUNCATED_MSG
Attributes
Public Class Methods
# File lib/fluent/plugin/out_datadog_log.rb, line 128
def initialize
  super
  # use the global logger
  @log = $log # rubocop:disable Style/GlobalVars
end
Public Instance Methods
# File lib/fluent/plugin/out_datadog_log.rb, line 167
def build_api_key_str(api_key:, logset:)
  if !logset.nil? && logset != ''
    "#{api_key}/#{logset}"
  else
    api_key
  end
end
# File lib/fluent/plugin/out_datadog_log.rb, line 163
def build_extra_content(timestamp:, hostname:, service:, tags_payload:)
  "<46>0 #{timestamp} #{hostname} #{service} - - #{tags_payload}"
end
build_payload
Returns a processed payload from a raw message.

@param [String] api_key_str
@param [String] extra_content
@param [String] msg
# File lib/fluent/plugin/out_datadog_log.rb, line 179
def build_payload(api_key_str:, msg:, extra_content:)
  "#{api_key_str} #{extra_content} #{msg}\n"
end
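Taken together, the three builders above produce one line per log entry for the TCP intake. A minimal sketch of the composition, using made-up values for the key, logset, host, and service; build_tags_payload is not shown in this section, so its output below is only a placeholder:

  # Inside a plugin instance; all values are illustrative.
  api_key_str = build_api_key_str(api_key: 'abcd1234', logset: 'main')
  # => "abcd1234/main"

  extra_content = build_extra_content(
    timestamp: '2018-01-01T00:00:00.000000+00:00',
    hostname: 'i-0123456789abcdef0',
    service: 'my-service',
    tags_payload: 'ddsource="example"' # placeholder for build_tags_payload output
  )
  # => "<46>0 2018-01-01T00:00:00.000000+00:00 i-0123456789abcdef0 my-service - - ddsource=\"example\""

  build_payload(api_key_str: api_key_str, msg: 'hello world', extra_content: extra_content)
  # => "abcd1234/main <46>0 2018-01-01T00:00:00.000000+00:00 i-0123456789abcdef0 my-service - - ddsource=\"example\" hello world\n"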
# File lib/fluent/plugin/out_datadog_log.rb, line 183
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  if @api_key.size == 0
    @api_key = ENV['DD_LOG_API_KEY']
    if @api_key == '' || @api_key.nil?
      error_message = 'Unable to obtain api_key from DD_LOG_API_KEY'
      fail Fluent::ConfigError, error_message
    end
  end

  # If monitoring is enabled, register metrics in the default registry
  # and store metric objects for future use.
  if @enable_monitoring
    registry = Monitoring::MonitoringRegistryFactory.create @monitoring_type
    @successful_requests_count = registry.counter(
      :datadog_successful_requests_count,
      'A number of successful requests to the Datadog Log Intake API')
    @failed_requests_count = registry.counter(
      :datadog_failed_requests_count,
      'A number of failed requests to the Datadog Log Intake API,'\
      ' broken down by the error code')
    @ingested_entries_count = registry.counter(
      :datadog_ingested_entries_count,
      'A number of log entries ingested by Datadog Log Intake')
    @dropped_entries_count = registry.counter(
      :datadog_dropped_entries_count,
      'A number of log entries dropped by the Datadog output plugin')
    @retried_entries_count = registry.counter(
      :datadog_retried_entries_count,
      'The number of log entries that failed to be ingested by the'\
      ' Datadog output plugin due to a transient error and were'\
      ' retried')
  end

  @platform = detect_platform

  # Set required variables: @vm_id, @vm_name and @zone.
  set_required_metadata_variables

  @default_tags = build_default_tags

  # The resource and labels are now set up; ensure they can't be modified
  # without first duping them.
  @default_tags.freeze

  # Log an informational message containing the Logs viewer URL
  @log.info 'Logs viewer address: https://example.com/logs/'
end
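For reference, a minimal sketch of configuring the plugin through fluentd's standard test driver. The parameter names (api_key, use_metadata_service, vm_id, zone) are assumptions inferred from the instance variables used above and may not match the actual config_param declarations:

  require 'fluent/test'
  require 'fluent/test/driver/output'
  require 'fluent/plugin/out_datadog_log'

  Fluent::Test.setup

  # Parameter names below are assumptions based on the instance variables
  # referenced in configure and set_required_metadata_variables.
  conf = %(
    api_key              my-datadog-api-key
    use_metadata_service false
    vm_id                test-host
    zone                 test-zone
  )

  driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::DatadogOutput)
  driver.configure(conf)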
# File lib/fluent/plugin/out_datadog_log.rb, line 246
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  [tag, time, record].to_msgpack
end
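Each buffered entry is the [tag, time, record] triple serialized with MessagePack, which write later walks via each_valid_record. For illustration:

  require 'msgpack'

  packed = ['app.logs', 1514764800, { 'message' => 'hello' }].to_msgpack
  MessagePack.unpack(packed)
  # => ["app.logs", 1514764800, {"message"=>"hello"}]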
# File lib/fluent/plugin/out_datadog_log.rb, line 251
def formatted_to_msgpack_binary?
  true
end
# File lib/fluent/plugin/out_datadog_log.rb, line 255
def multi_workers_ready?
  true
end
# File lib/fluent/plugin/out_datadog_log.rb, line 241
def shutdown
  super
  @conn.close unless @conn.nil?
end
# File lib/fluent/plugin/out_datadog_log.rb, line 234
def start
  super
  init_api_client
  @successful_call = false
  @timenanos_warning = false
end
# File lib/fluent/plugin/out_datadog_log.rb, line 134
def truncate_message(msg)
  if msg.size > DD_MAX_MESSAGE_LEN
    msg.slice(0, MAX_MESSAGE_LEN) + TRUNCATED_MSG
  else
    msg
  end
end
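For example (the constant values are not reproduced above, so the second call is only schematic):

  truncate_message('short message')
  # => "short message"   (unchanged, at or below the limit)

  truncate_message('x' * (DD_MAX_MESSAGE_LEN + 1))
  # => the first MAX_MESSAGE_LEN characters followed by TRUNCATED_MSG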
# File lib/fluent/plugin/out_datadog_log.rb, line 259
def write(chunk)
  each_valid_record(chunk) do |_tag, time, record|
    if @detect_json
      # Save the timestamp and severity if available, then clear it out to
      # allow for determining whether we should parse the log or message
      # field.
      timestamp = record.delete('time')
      severity = record.delete('severity')

      # If the log is json, we want to export it as a structured log
      # unless there is additional metadata that would be lost.
      record_json = nil
      if record.length == 1
        %w(log message msg).each do |field|
          if record.key?(field)
            record_json = parse_json_or_nil(record[field])
          end
        end
      end
      record = record_json unless record_json.nil?
      # Restore timestamp and severity if necessary. Note that we don't
      # want to override these keys in the JSON we've just parsed.
      record['time'] ||= timestamp if timestamp
      record['severity'] ||= severity if severity
    end

    # TODO: Correlate Datadog APM spans with log messages
    # fq_trace_id = record.delete(@trace_key)
    # entry.trace = fq_trace_id if fq_trace_id

    begin
      msg = nil
      %w(log message msg).each do |field|
        msg = record[field] if record.key?(field)
      end

      tags = []

      kube = record['kubernetes'] || {}
      mappings = {
        'pod_name' => 'pod_name',
        'container_name' => 'container_name',
        'namespace_name' => 'kube_namespace'
      }
      mappings.each do |json_key, tag_key|
        tags << "#{tag_key}=#{kube[json_key]}" if kube.key? json_key
      end

      if kube.key? 'labels'
        labels = kube['labels']
        labels.each do |k, v|
          tags << "kube_#{k}=#{v}"
        end
      end

      # TODO: Include K8S tags like
      # - kube_daemon_set=$daemonset_name
      # - kube_deployment=$deployment_name
      # - kube_replica_set=$replicaset_name

      tags.concat(@default_tags)

      datetime = Time.at(Fluent::EventTime.new(time).to_r).utc.to_datetime
      timestamp_str = datetime.rfc3339(6)

      payload = build_payload(
        api_key_str: build_api_key_str(api_key: @api_key, logset: @logset),
        msg: truncate_message(msg),
        extra_content: build_extra_content(
          timestamp: timestamp_str,
          hostname: @vm_id,
          service: @service,
          tags_payload: build_tags_payload(
            config_tags: tags,
            source: @source,
            source_category: @source_category
          )
        )
      )

      entries_count = 1
      @conn.write(payload)
      @log.debug 'Sent payload to Datadog.', payload: payload

      increment_successful_requests_count
      increment_ingested_entries_count(entries_count)

      # Let the user explicitly know when the first call succeeded, to aid
      # with verification and troubleshooting.
      unless @successful_call
        @successful_call = true
        @log.info 'Successfully sent to Datadog.'
      end
    rescue => error
      increment_failed_requests_count
      increment_retried_entries_count(entries_count)

      # RPC cancelled, so retry via re-raising the error.
      @log.debug "Retrying #{entries_count} log message(s) later.",
                 error: error.to_s
      raise error
    end
  end
end
Private Instance Methods
Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any non-UTF-8 character is replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character causes the plugin to error out.
# File lib/fluent/plugin/out_datadog_log.rb, line 547
def convert_to_utf8(input)
  if @coerce_to_utf8
    input.encode(
      'utf-8',
      invalid: :replace,
      undef: :replace,
      replace: @non_utf8_replacement_string)
  else
    begin
      input.encode('utf-8')
    rescue EncodingError
      @log.error 'Encountered encoding issues potentially due to non ' \
                 'UTF-8 characters. To allow non-UTF-8 characters and ' \
                 'replace them with spaces, please set "coerce_to_utf8" ' \
                 'to true.'
      raise
    end
  end
end
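The two branches map directly onto Ruby's String#encode options; for illustration:

  bad = "caf\xE9".b   # ASCII-8BIT string containing a byte that is not valid UTF-8

  # coerce_to_utf8 = true, non_utf8_replacement_string = ' ':
  bad.encode('utf-8', invalid: :replace, undef: :replace, replace: ' ')
  # => "caf "

  # coerce_to_utf8 = false:
  bad.encode('utf-8')
  # raises Encoding::UndefinedConversionError (a subclass of EncodingError)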
Determine which platform we are running on by consulting the metadata service (unless the user has explicitly disabled it).
# File lib/fluent/plugin/out_datadog_log.rb, line 400
def detect_platform
  unless @use_metadata_service
    @log.info 'use_metadata_service is false; not detecting platform'
    return Platform::OTHER
  end

  begin
    open('http://' + METADATA_SERVICE_ADDR) do |f|
      if f.meta['metadata-flavor'] == 'Google'
        @log.info 'Detected GCE platform'
        return Platform::GCE
      end
      if f.meta['server'] == 'EC2ws'
        @log.info 'Detected EC2 platform'
        return Platform::EC2
      end
    end
  rescue StandardError => e
    @log.error 'Failed to access metadata service: ', error: e
  end

  @log.info 'Unable to determine platform'
  Platform::OTHER
end
Filter out invalid non-Hash entries.
# File lib/fluent/plugin/out_datadog_log.rb, line 502
def each_valid_record(chunk)
  chunk.msgpack_each do |event|
    record = event.last
    unless record.is_a?(Hash)
      @log.warn 'Dropping log entries with malformed record: ' \
                "'#{record.inspect}'. " \
                'A log record should be in JSON format.'
      next
    end
    tag = event.first
    sanitized_tag = sanitize_tag(tag)
    if sanitized_tag.nil?
      @log.warn "Dropping log entries with invalid tag: '#{tag.inspect}'." \
                ' A tag should be a string with utf8 characters.'
      next
    end
    yield event
  end
end
The EC2 metadata server returns everything in one call, so store the result after the first fetch to avoid making repeated requests.
# File lib/fluent/plugin/out_datadog_log.rb, line 435
def ec2_metadata
  fail "Called ec2_metadata with platform=#{@platform}" unless
    @platform == Platform::EC2
  unless @ec2_metadata
    # See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
    open('http://' + METADATA_SERVICE_ADDR +
         '/latest/dynamic/instance-identity/document') do |f|
      contents = f.read
      @ec2_metadata = JSON.parse(contents)
    end
  end
  @ec2_metadata
end
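The parsed instance identity document is a plain Hash; the fields this plugin relies on downstream are instanceId and availabilityZone. An abridged, made-up example of the cached value:

  @ec2_metadata
  # => {
  #      "instanceId"       => "i-0123456789abcdef0",
  #      "availabilityZone" => "us-east-1a",
  #      "region"           => "us-east-1",
  #      ...
  #    }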
# File lib/fluent/plugin/out_datadog_log.rb, line 567
def ensure_array(value)
  Array.try_convert(value) || (fail JSON::ParserError, "#{value.class}")
end
# File lib/fluent/plugin/out_datadog_log.rb, line 571
def ensure_hash(value)
  Hash.try_convert(value) || (fail JSON::ParserError, "#{value.class}")
end
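Both helpers rely on Ruby's try_convert, which returns nil when the value is not (convertible to) the expected type; for illustration:

  Array.try_convert([1, 2, 3])    # => [1, 2, 3]
  Array.try_convert('not array')  # => nil, so ensure_array raises JSON::ParserError
  Hash.try_convert('a' => 1)      # => {"a"=>1}
  Hash.try_convert(42)            # => nil, so ensure_hash raises JSON::ParserError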
# File lib/fluent/plugin/out_datadog_log.rb, line 425
def fetch_gce_metadata(metadata_path)
  fail "Called fetch_gce_metadata with platform=#{@platform}" unless
    @platform == Platform::GCE
  # See https://cloud.google.com/compute/docs/metadata
  open('http://' + METADATA_SERVICE_ADDR + '/computeMetadata/v1/' +
       metadata_path, 'Metadata-Flavor' => 'Google', &:read)
end
Increment the metric for the number of log entries that were dropped and not ingested by the Datadog Log Intake API.
# File lib/fluent/plugin/out_datadog_log.rb, line 597
def increment_dropped_entries_count(count)
  return unless @dropped_entries_count
  @dropped_entries_count.increment({}, count)
end
Increment the metric for the number of failed requests to the Datadog Log Intake API.
# File lib/fluent/plugin/out_datadog_log.rb, line 583
def increment_failed_requests_count
  return unless @failed_requests_count
  @failed_requests_count.increment
end
Increment the metric for the number of log entries, successfully ingested by the Datadog Log Intake API.
# File lib/fluent/plugin/out_datadog_log.rb, line 590
def increment_ingested_entries_count(count)
  return unless @ingested_entries_count
  @ingested_entries_count.increment({}, count)
end
Increment the metric for the number of log entries that failed to be ingested by the Datadog Log Intake API and were retried.
# File lib/fluent/plugin/out_datadog_log.rb, line 604
def increment_retried_entries_count(count)
  return unless @retried_entries_count
  @retried_entries_count.increment({}, count)
end
Increment the metric for the number of successful requests.
# File lib/fluent/plugin/out_datadog_log.rb, line 576
def increment_successful_requests_count
  return unless @successful_requests_count
  @successful_requests_count.increment
end
# File lib/fluent/plugin/out_datadog_log.rb, line 536
def init_api_client
  ssl = true
  ssl = { verify_mode: OpenSSL::SSL::VERIFY_NONE } if @skip_ssl_validation
  server = "#{@log_dd_url}:#{@log_dd_port}"
  @conn = Net::TCPClient.new(server: server, ssl: ssl)
end
# File lib/fluent/plugin/out_datadog_log.rb, line 368
def parse_json_or_nil(input)
  # Only here to please rubocop...
  return nil if input.nil?

  input.each_codepoint do |c|
    if c == 123 # left curly bracket (U+007B)
      begin
        return JSON.parse(input)
      rescue JSON::ParserError
        return nil
      end
    else
      # Break (and return nil) unless the current character is whitespace,
      # in which case we continue to look for a left curly bracket.
      # Whitespace as per the JSON spec are: tabulation (U+0009),
      # line feed (U+000A), carriage return (U+000D), and space (U+0020).
      break unless c == 9 || c == 10 || c == 13 || c == 32
    end # case
  end # do
  nil
end
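In effect, only inputs whose first non-whitespace character is a left curly bracket are parsed; everything else (including top-level JSON arrays) yields nil. For example:

  parse_json_or_nil('{"level":"info","msg":"ok"}')  # => {"level"=>"info", "msg"=>"ok"}
  parse_json_or_nil("  \t{\"a\": 1}")               # leading whitespace is skipped => {"a"=>1}
  parse_json_or_nil('plain text')                   # => nil
  parse_json_or_nil('{not valid json')              # => nil (JSON::ParserError is swallowed)
  parse_json_or_nil('[1, 2, 3]')                    # => nil (arrays are not treated as JSON here)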
Given a tag, returns the corresponding valid tag if possible, or nil if the tag should be rejected. If 'require_valid_tags' is false, non-string tags are converted to strings, and invalid characters are sanitized; otherwise such tags are rejected.
# File lib/fluent/plugin/out_datadog_log.rb, line 526
def sanitize_tag(tag)
  if @require_valid_tags &&
     (!tag.is_a?(String) || tag == '' || convert_to_utf8(tag) != tag)
    return nil
  end
  tag = convert_to_utf8(tag.to_s)
  tag = '_' if tag == ''
  tag
end
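For example, comparing the permissive path (require_valid_tags disabled) with the strict one:

  # require_valid_tags = false
  sanitize_tag('app.nginx')  # => "app.nginx"
  sanitize_tag(:some_tag)    # => "some_tag"  (non-strings are stringified)
  sanitize_tag('')           # => "_"         (empty tags become a single underscore)

  # require_valid_tags = true
  sanitize_tag(:some_tag)    # => nil         (non-string tags are rejected)
  sanitize_tag('')           # => nil         (empty tags are rejected)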
Set required variables like @vm_id, @vm_name and @zone.
# File lib/fluent/plugin/out_datadog_log.rb, line 451
def set_required_metadata_variables
  set_vm_id
  set_vm_name
  set_zone

  # All metadata parameters must now be set.
  missing = []
  missing << 'zone' unless @zone
  missing << 'vm_id' unless @vm_id
  missing << 'vm_name' unless @vm_name
  return if missing.empty?
  fail Fluent::ConfigError, 'Unable to obtain metadata parameters: ' +
       missing.join(' ')
end
- Return the value if it is explicitly set in the config already.
- If not, try to retrieve it by calling metadata servers directly.
# File lib/fluent/plugin/out_datadog_log.rb, line 468
def set_vm_id
  @vm_id ||= ec2_metadata['instanceId'] if @platform == Platform::EC2
rescue StandardError => e
  @log.error 'Failed to obtain vm_id: ', error: e
end
- Return the value if it is explicitly set in the config already.
- If not, try to retrieve it locally.
# File lib/fluent/plugin/out_datadog_log.rb, line 476
def set_vm_name
  @vm_name ||= Socket.gethostname
rescue StandardError => e
  @log.error 'Failed to obtain vm name: ', error: e
end
- Return the value if it is explicitly set in the config already.
- If not, try to retrieve it locally.
# File lib/fluent/plugin/out_datadog_log.rb, line 484
def set_zone
  @zone ||= 'aws:' + ec2_metadata['availabilityZone'] if
    @platform == Platform::EC2 && ec2_metadata.key?('availabilityZone')
rescue StandardError => e
  @log.error 'Failed to obtain location: ', error: e
end