class Fluent::Plugin::SplunkHecOutput
Constants
- KEY_FIELDS
- MISSING_FIELD
- TAG_PLACEHOLDER
- VERSION
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Plugin::SplunkOutput::new
# File lib/fluent/plugin/out_splunk_hec.rb, line 127 def initialize super @default_host = Socket.gethostname @extra_fields = nil end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::SplunkOutput#configure
# File lib/fluent/plugin/out_splunk_hec.rb, line 133 def configure(conf) super check_metric_configs pick_custom_format_method end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 166 def format(tag, time, record) # this method will be replaced in `configure` end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 170 def multi_workers_ready? true end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 161 def shutdown super @conn.shutdown end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 140 def start super @conn = Net::HTTP::Persistent.new.tap do |c| c.verify_mode = @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER c.cert = OpenSSL::X509::Certificate.new File.read(@client_cert) if @client_cert c.key = OpenSSL::PKey::RSA.new File.read(@client_key) if @client_key c.ca_file = @ca_file c.ca_path = @ca_path c.ciphers = @ssl_ciphers c.proxy = :ENV c.min_version = OpenSSL::SSL::TLS1_1_VERSION if @require_ssl_min_version c.override_headers['Content-Type'] = 'application/json' c.override_headers['User-Agent'] = "fluent-plugin-splunk_hec_out/#{VERSION}" c.override_headers['Authorization'] = "Splunk #{@hec_token}" c.override_headers['__splunk_app_name'] = "#{@app_name}" c.override_headers['__splunk_app_version'] = "#{@app_version}" end end
Private Instance Methods
check_metric_configs()
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 178 def check_metric_configs return unless @data_type == :metric @metrics_from_event = false if @metric_name_key return if @metrics_from_event raise Fluent::ConfigError, '`metric_name_key` is required when `metrics_from_event` is `false`.' unless @metric_name_key raise Fluent::ConfigError, '`metric_value_key` is required when `metric_name_key` is set.' unless @metric_value_key end
construct_api()
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 281 def construct_api URI("#{@protocol}://#{@hec_host}:#{@hec_port}/services/collector") rescue StandardError raise Fluent::ConfigError, "hec_host (#{@hec_host}) and/or hec_port (#{@hec_port}) are invalid." end
convert_to_utf8(input)
click to toggle source
Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any non-UTF-8 character would be replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character would trigger the plugin to error out. Thanks to github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284
# File lib/fluent/plugin/out_splunk_hec.rb, line 341 def convert_to_utf8(input) if input.is_a?(Hash) record = {} input.each do |key, value| record[convert_to_utf8(key)] = convert_to_utf8(value) end return record end return input.map { |value| convert_to_utf8(value) } if input.is_a?(Array) return input unless input.respond_to?(:encode) if @coerce_to_utf8 input.encode( 'utf-8', invalid: :replace, undef: :replace, replace: @non_utf8_replacement_string) else begin input.encode('utf-8') rescue EncodingError log.error { 'Encountered encoding issues potentially due to non ' \ 'UTF-8 characters. To allow non-UTF-8 characters and ' \ 'replace them with spaces, please set "coerce_to_utf8" ' \ 'to true.' } raise end end end
format_event(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 189 def format_event(tag, time, record) d = { host: @host ? @host.(tag, record) : @default_host, # From the API reference # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector # `time` should be a string or unsigned integer. # That's why we use the to_string function here. time: time.to_f.to_s }.tap { |payload| if @time time_value = @time.(tag, record) # if no value is found don't override and use fluentd's time if !time_value.nil? payload[:time] = time_value end end payload[:index] = @index.(tag, record) if @index payload[:source] = @source.(tag, record) if @source payload[:sourcetype] = @sourcetype.(tag, record) if @sourcetype # delete nil fields otherwise will get formet error from HEC %i[host index source sourcetype].each { |f| payload.delete f if payload[f].nil? } if @extra_fields payload[:fields] = @extra_fields.map { |name, field| [name, record[field]] }.to_h payload[:fields].delete_if { |_k,v| v.nil? } # if a field is already in indexed fields, then remove it from the original event @extra_fields.values.each { |field| record.delete field } end if formatter = @formatters.find { |f| f.match? tag } record = formatter.format(tag, time, record) end payload[:event] = convert_to_utf8 record } if d[:event] == "{}" log.warn { "Event after formatting was blank, not sending" } return "" end MultiJson.dump(d) end
format_metric(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 231 def format_metric(tag, time, record) payload = { host: @host ? @host.call(tag, record) : @default_host, # From the API reference # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector # `time` should be a string or unsigned integer. # That's why we use `to_s` here. time: time.to_f.to_s, event: 'metric' }.tap do |payload| if @time time_value = @time.(tag, record) # if no value is found don't override and use fluentd's time if !time_value.nil? payload[:time] = time_value end end end payload[:index] = @index.call(tag, record) if @index payload[:source] = @source.call(tag, record) if @source payload[:sourcetype] = @sourcetype.call(tag, record) if @sourcetype unless @metrics_from_event fields = { metric_name: @metric_name.call(tag, record), _value: @metric_value.call(tag, record) } if @extra_fields fields.update @extra_fields.map { |name, field| [name, record[field]] }.to_h fields.delete_if { |_k,v| v.nil? } else fields.update record end fields.delete_if { |_k,v| v.nil? } payload[:fields] = convert_to_utf8 fields return MultiJson.dump(payload) end # when metrics_from_event is true, generate one metric event for each key-value in record payloads = record.map do |key, value| { fields: { metric_name: key, _value: value } }.merge! payload end payloads.map!(&MultiJson.method(:dump)).join end
new_connection()
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 287 def new_connection Net::HTTP::Persistent.new.tap do |c| c.verify_mode = @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER c.cert = OpenSSL::X509::Certificate.new File.read(@client_cert) if @client_cert c.key = OpenSSL::PKey::RSA.new File.read(@client_key) if @client_key c.ca_file = @ca_file c.ca_path = @ca_path c.ciphers = @ssl_ciphers c.proxy = :ENV c.idle_timeout = @idle_timeout c.read_timeout = @read_timeout c.open_timeout = @open_timeout c.min_version = OpenSSL::SSL::TLS1_1_VERSION if @require_ssl_min_version c.override_headers['Content-Type'] = 'application/json' c.override_headers['User-Agent'] = "fluent-plugin-splunk_hec_out/#{VERSION}" c.override_headers['Authorization'] = "Splunk #{@hec_token}" c.override_headers['__splunk_app_name'] = "#{@app_name}" c.override_headers['__splunk_app_version'] = "#{@app_version}" end end
write_to_splunk(chunk)
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 310 def write_to_splunk(chunk) post = Net::HTTP::Post.new @api.request_uri post.body = chunk.read log.debug { "[Sending] Chunk: #{dump_unique_id_hex(chunk.unique_id)}(#{post.body.bytesize}B)." } log.trace { "POST #{@api} body=#{post.body}" } t1 = Time.now response = @conn.request @api, post t2 = Time.now raise_err = response.code.to_s.start_with?('5') || (!@consume_chunk_on_4xx_errors && response.code.to_s.start_with?('4')) # raise Exception to utilize Fluentd output plugin retry mechanism raise "Server error (#{response.code}) for POST #{@api}, response: #{response.body}" if raise_err # For both success response (2xx) we will consume the chunk. if not response.code.start_with?('2') log.error "Failed POST to #{@api}, response: #{response.body}" log.debug { "Failed request body: #{post.body}" } end log.debug { "[Response] Chunk: #{dump_unique_id_hex(chunk.unique_id)} Size: #{post.body.bytesize} Response: #{response.inspect} Duration: #{t2 - t1}" } process_response(response, post.body) end