class Fluent::Plugin::SplunkHecOutput

Constants

KEY_FIELDS
MISSING_FIELD
TAG_PLACEHOLDER
VERSION

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Plugin::SplunkOutput::new
# File lib/fluent/plugin/out_splunk_hec.rb, line 127
# Sets up the plugin state that does not depend on configuration.
def initialize
  super()
  # Resolved once here; the formatters fall back to it when no `@host`
  # extractor is available.
  @default_host = Socket.gethostname
  # No indexed-field mapping until one is assigned.
  @extra_fields = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::SplunkOutput#configure
# File lib/fluent/plugin/out_splunk_hec.rb, line 133
# Applies the plugin configuration.
#
# The superclass handles `conf` first; afterwards the metric-related settings
# are validated and the custom format method is picked.
#
# @param conf the configuration object, forwarded unchanged to the superclass
# @return whatever `pick_custom_format_method` returns
def configure(conf)
  super(conf)

  check_metric_configs
  pick_custom_format_method
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 166
# Placeholder formatter: ignores its arguments and returns nil.
#
# @param tag    unused
# @param time   unused
# @param record unused
# @return [nil]
def format(tag, time, record)
  # this method will be replaced in `configure`
  nil
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 170
# Always answers true.
# NOTE(review): presumably the hook Fluentd consults before running the plugin
# under multiple workers — confirm against the Fluentd plugin API.
#
# @return [true]
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 161
# Shuts the plugin down and closes the persistent HEC connection.
#
# `@conn` is only assigned in `start`, so it is still nil when shutdown runs
# for an instance that never started (or whose `start` failed before the
# assignment). Safe navigation keeps that path from raising NoMethodError.
#
# @return the result of closing the connection, or nil when none was opened
def shutdown
  super
  @conn&.shutdown
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 140
# Starts the plugin and opens the persistent connection that
# `write_to_splunk` sends requests through.
#
# The connection is built by `new_connection` instead of a second inline copy
# of the same setup. The inline copy had drifted: it applied the same TLS,
# proxy and header settings but left out the `@idle_timeout`, `@read_timeout`
# and `@open_timeout` assignments that `new_connection` makes.
#
# @return the connection stored in `@conn`
def start
  super
  @conn = new_connection
end

Private Instance Methods

check_metric_configs() click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 178
# Validates the metric-related settings; does nothing unless `@data_type`
# is `:metric`.
#
# Side effect: a configured `@metric_name_key` forces `@metrics_from_event`
# to false.
#
# @raise [Fluent::ConfigError] when metrics are not taken from the event and
#   `@metric_name_key` or `@metric_value_key` is missing
# @return [nil]
def check_metric_configs
  return if @data_type != :metric

  name_key = @metric_name_key
  # An explicit metric name key takes precedence over per-event metrics.
  @metrics_from_event = false if name_key
  return if @metrics_from_event

  unless name_key
    raise Fluent::ConfigError, '`metric_name_key` is required when `metrics_from_event` is `false`.'
  end
  unless @metric_value_key
    raise Fluent::ConfigError, '`metric_value_key` is required when `metric_name_key` is set.'
  end
end
construct_api() click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 281
# Builds the HEC collector endpoint from `@protocol`, `@hec_host` and
# `@hec_port`.
#
# @return [URI] the parsed `/services/collector` endpoint
# @raise [Fluent::ConfigError] when the assembled URL cannot be parsed
def construct_api
  endpoint = "#{@protocol}://#{@hec_host}:#{@hec_port}/services/collector"
  URI(endpoint)
rescue StandardError
  raise Fluent::ConfigError, "hec_host (#{@hec_host}) and/or hec_port (#{@hec_port}) are invalid."
end
convert_to_utf8(input) click to toggle source

Encodes the input as UTF-8. If 'coerce_to_utf8' is set to true in the config, every non-UTF-8 character is replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character causes the plugin to raise an error. Thanks to github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284

# File lib/fluent/plugin/out_splunk_hec.rb, line 341
# Recursively re-encodes `input` as UTF-8.
#
# Hashes are rebuilt as a new Hash with converted keys and values, Arrays as a
# new Array with converted elements. Anything that does not respond to
# `encode` is returned untouched.
#
# With `@coerce_to_utf8` set, invalid and undefined sequences are replaced by
# `@non_utf8_replacement_string`; otherwise an encoding failure is logged and
# re-raised.
#
# @param input the value to convert
# @return the converted value
# @raise [EncodingError] when `@coerce_to_utf8` is falsy and encoding fails
def convert_to_utf8(input)
  case input
  when Hash
    input.each_with_object({}) do |(key, value), converted|
      converted[convert_to_utf8(key)] = convert_to_utf8(value)
    end
  when Array
    input.map { |element| convert_to_utf8(element) }
  else
    if !input.respond_to?(:encode)
      input
    elsif @coerce_to_utf8
      input.encode('utf-8', invalid: :replace, undef: :replace, replace: @non_utf8_replacement_string)
    else
      begin
        input.encode('utf-8')
      rescue EncodingError
        log.error do
          'Encountered encoding issues potentially due to non ' \
            'UTF-8 characters. To allow non-UTF-8 characters and ' \
            'replace them with spaces, please set "coerce_to_utf8" ' \
            'to true.'
        end
        raise
      end
    end
  end
end
format_event(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 189
# Serializes one record as a HEC event payload.
#
# Side effect: when `@extra_fields` is set, the record keys that feed the
# indexed fields are removed from `record` itself.
#
# @param tag    the event tag, passed to the extractors and formatters
# @param time   the event time; sent as `time.to_f.to_s` unless `@time`
#   yields a non-nil override
# @param record [Hash] the event record (mutated, see above)
# @return [String] the JSON payload, or "" when the formatted event is "{}"
def format_event(tag, time, record)
  payload = {
    host: @host ? @host.call(tag, record) : @default_host,
    # From the API reference
    # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector
    # `time` should be a string or unsigned integer, hence the `to_s`.
    time: time.to_f.to_s
  }

  if @time
    custom_time = @time.call(tag, record)
    # keep fluentd's time when the extractor finds nothing
    payload[:time] = custom_time unless custom_time.nil?
  end

  payload[:index] = @index.call(tag, record) if @index
  payload[:source] = @source.call(tag, record) if @source
  payload[:sourcetype] = @sourcetype.call(tag, record) if @sourcetype

  # delete nil fields, otherwise HEC answers with a format error
  %i[host index source sourcetype].each do |key|
    payload.delete(key) if payload[key].nil?
  end

  if @extra_fields
    payload[:fields] = @extra_fields.each_with_object({}) do |(name, field), fields|
      value = record[field]
      fields[name] = value unless value.nil?
    end
    # a field that is sent as an indexed field is removed from the original event
    @extra_fields.each_value { |field| record.delete(field) }
  end

  # the first formatter matching the tag (if any) rewrites the record
  formatter = @formatters.find { |candidate| candidate.match?(tag) }
  record = formatter.format(tag, time, record) if formatter

  payload[:event] = convert_to_utf8(record)

  if payload[:event] == "{}"
    log.warn { "Event after formatting was blank, not sending" }
    return ""
  end

  MultiJson.dump(payload)
end
format_metric(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 231
# Serializes one record as HEC metric payload(s).
#
# When `@metrics_from_event` is falsy a single payload is produced whose
# fields carry `metric_name`/`_value` plus either the `@extra_fields`
# selection or the whole record. Otherwise one payload is produced per
# key/value pair of the record and the JSON documents are concatenated.
#
# @param tag    the event tag, passed to the extractors
# @param time   the event time; sent as `time.to_f.to_s` unless `@time`
#   yields a non-nil override
# @param record [Hash] the event record
# @return [String] one JSON document, or several concatenated documents
def format_metric(tag, time, record)
  payload = {
    host: @host ? @host.call(tag, record) : @default_host,
    # From the API reference
    # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector
    # `time` should be a string or unsigned integer.
    # That's why we use `to_s` here.
    time: time.to_f.to_s,
    event: 'metric'
  }

  if @time
    time_value = @time.call(tag, record)
    # if no value is found don't override and use fluentd's time
    payload[:time] = time_value unless time_value.nil?
  end

  payload[:index] = @index.call(tag, record) if @index
  payload[:source] = @source.call(tag, record) if @source
  payload[:sourcetype] = @sourcetype.call(tag, record) if @sourcetype

  # Same rule format_event applies: nil metadata is dropped rather than sent
  # as JSON null, which would trigger a format error from HEC.
  %i[host index source sourcetype].each { |f| payload.delete(f) if payload[f].nil? }

  unless @metrics_from_event
    fields = {
      metric_name: @metric_name.call(tag, record),
      _value: @metric_value.call(tag, record)
    }

    if @extra_fields
      fields.update(@extra_fields.map { |name, field| [name, record[field]] }.to_h)
    else
      fields.update(record)
    end

    fields.delete_if { |_k, v| v.nil? }

    payload[:fields] = convert_to_utf8(fields)

    return MultiJson.dump(payload)
  end

  # when metrics_from_event is true, generate one metric event for each key-value in record
  record.map do |key, value|
    MultiJson.dump({ fields: { metric_name: key, _value: value } }.merge!(payload))
  end.join
end
new_connection() click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 287
# Builds a persistent HTTP connection configured for HEC requests: TLS
# verification and client credentials, proxy taken from the environment,
# timeouts, and the headers sent with every request.
#
# The client key is loaded with `OpenSSL::PKey.read`, which detects the key
# type, so EC and DSA keys work in addition to RSA ones (RSA keys still come
# back as `OpenSSL::PKey::RSA`).
#
# @return [Net::HTTP::Persistent] the configured connection
def new_connection
  Net::HTTP::Persistent.new.tap do |c|
    c.verify_mode = @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
    c.cert = OpenSSL::X509::Certificate.new(File.read(@client_cert)) if @client_cert
    c.key = OpenSSL::PKey.read(File.read(@client_key)) if @client_key
    c.ca_file = @ca_file
    c.ca_path = @ca_path
    c.ciphers = @ssl_ciphers
    c.proxy   = :ENV
    c.idle_timeout = @idle_timeout
    c.read_timeout = @read_timeout
    c.open_timeout = @open_timeout
    c.min_version = OpenSSL::SSL::TLS1_1_VERSION if @require_ssl_min_version

    c.override_headers['Content-Type'] = 'application/json'
    c.override_headers['User-Agent'] = "fluent-plugin-splunk_hec_out/#{VERSION}"
    c.override_headers['Authorization'] = "Splunk #{@hec_token}"
    c.override_headers['__splunk_app_name'] = @app_name.to_s
    c.override_headers['__splunk_app_version'] = @app_version.to_s
  end
end
write_to_splunk(chunk) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 310
# POSTs the content of `chunk` to the HEC endpoint in `@api` over `@conn`.
#
# A 5xx response always raises; a 4xx response raises unless
# `@consume_chunk_on_4xx_errors` is set. Every other response is handed to
# `process_response`, after logging it as a failure when it is not 2xx.
#
# @param chunk the buffer chunk; must respond to `read` and `unique_id`
# @return whatever `process_response` returns
# @raise [RuntimeError] on the retryable statuses described above
def write_to_splunk(chunk)
  post = Net::HTTP::Post.new @api.request_uri
  post.body = chunk.read
  log.debug { "[Sending] Chunk: #{dump_unique_id_hex(chunk.unique_id)}(#{post.body.bytesize}B)." }
  log.trace { "POST #{@api} body=#{post.body}" }

  # Monotonic clock: the measured duration is unaffected by wall-clock adjustments.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  response = @conn.request @api, post
  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

  # Normalize once so every status check below treats the code the same way.
  status = response.code.to_s
  raise_err = status.start_with?('5') || (!@consume_chunk_on_4xx_errors && status.start_with?('4'))

  # raise Exception to utilize Fluentd output plugin retry mechanism
  raise "Server error (#{response.code}) for POST #{@api}, response: #{response.body}" if raise_err

  # Nothing raised, so the chunk is consumed from here on; a response that is
  # not a success (2xx) is still logged as a failure.
  unless status.start_with?('2')
    log.error "Failed POST to #{@api}, response: #{response.body}"
    log.debug { "Failed request body: #{post.body}" }
  end

  log.debug { "[Response] Chunk: #{dump_unique_id_hex(chunk.unique_id)} Size: #{post.body.bytesize} Response: #{response.inspect} Duration: #{duration}" }
  process_response(response, post.body)
end