class Fluent::Plugin::SplunkOutput
Constants
- KEY_FIELDS
- TAG_PLACEHOLDER
- VERSION
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk.rb, line 77
# Runs the superclass initializer, then keeps a handle on the registry
# returned by ::Prometheus::Client.registry in @registry.
def initialize
  super()
  @registry = ::Prometheus::Client.registry
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk.rb, line 82
# Configures the plugin from +conf+.
#
# After the superclass has processed +conf+, this checks for conflicting
# settings, builds @api, prepares the key-field accessors, the extra fields
# and the metrics, and finally builds one MatchFormatter per formatter
# section.
#
# @param conf the configuration element passed to the plugin
def configure(conf)
  super

  check_conflict
  @api = construct_api
  prepare_key_fields
  configure_fields(conf)
  configure_metrics(conf)

  # @formatter_configs is from formatter helper
  @formatters = @formatter_configs.map do |section|
    usage_name = section.usage
    formatter = formatter_create(usage: section.usage)
    MatchFormatter.new(usage_name, formatter)
  end
end
construct_api()
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 114
# Template method: a child class must override this. #configure stores
# its return value in @api.
#
# @raise [NotImplementedError] always, in this base implementation
def construct_api
  # `raise NotImplementedError("...")` would call an (undefined) method named
  # NotImplementedError and fail with NoMethodError; pass class and message.
  raise NotImplementedError, "Child class should implement 'construct_api'"
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 96
# Sends +chunk+ through #write_to_splunk, timing the call, and then records
# the record count, byte size and latency in the corresponding metrics.
#
# @param chunk the buffer chunk to send; must respond to #read, #bytesize
#   and #size_of_events
def write(chunk)
  log.trace { "#{self.class}: Received new chunk, size=#{chunk.read.bytesize}" }

  elapsed_seconds = Benchmark.realtime { write_to_splunk(chunk) }

  # Counters accumulate totals across writes.
  @metrics[:record_counter].increment(labels: metric_labels, by: chunk.size_of_events)
  @metrics[:bytes_counter].increment(labels: metric_labels, by: chunk.bytesize)

  # Histograms observe the per-write distribution.
  @metrics[:write_records_histogram].observe(chunk.size_of_events, labels: metric_labels)
  @metrics[:write_bytes_histogram].observe(chunk.bytesize, labels: metric_labels)
  @metrics[:write_latency_histogram].observe(elapsed_seconds, labels: metric_labels)
end
write_to_splunk(_chunk)
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 110
# Template method: a child class must override this to actually deliver the
# chunk. #write calls it inside its latency measurement.
#
# @param _chunk the buffer chunk to deliver (unused in this base implementation)
# @raise [NotImplementedError] always, in this base implementation
def write_to_splunk(_chunk)
  # `raise NotImplementedError("...")` would call an (undefined) method named
  # NotImplementedError and fail with NoMethodError; pass class and message.
  raise NotImplementedError, "Child class should implement 'write_to_splunk'"
end
Protected Instance Methods
format_event(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 149
# Serializes a single event: builds the payload with #prepare_event_payload
# and dumps it with MultiJson.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the value returned by MultiJson.dump for the payload
def format_event(tag, time, record)
  payload = prepare_event_payload(tag, time, record)
  MultiJson.dump(payload)
end
prepare_event_payload(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 120
# Builds the payload Hash for one event.
#
# Note that +record+ is mutated: every field listed in @extra_fields is
# deleted from it after being copied into payload[:fields].
#
# @param tag the event tag
# @param time the event time; converted with to_f.to_s
# @param record [Hash] the event record
# @return [Hash] payload with :host/:time and, when available, :index,
#   :source, :sourcetype, :fields and :event
def prepare_event_payload(tag, time, record)
  host = @host ? @host.call(tag, record) : @default_host

  # From the API reference
  # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector
  # `time` should be a string or unsigned integer.
  # That's why we use `to_s` here.
  payload = { host: host, time: time.to_f.to_s }

  payload[:index] = @index.call(tag, record) if @index
  payload[:source] = @source.call(tag, record) if @source
  payload[:sourcetype] = @sourcetype.call(tag, record) if @sourcetype

  # delete nil fields otherwise will get format error from HEC
  %i[host index source sourcetype].each do |key|
    payload.delete(key) if payload[key].nil?
  end

  if @extra_fields
    # Copy the configured fields out of the record, skipping nil values.
    payload[:fields] = @extra_fields.each_with_object({}) do |(name, field), fields|
      value = record[field]
      fields[name] = value unless value.nil?
    end
    # if a field is already in indexed fields, then remove it from the original event
    @extra_fields.each_value { |field| record.delete(field) }
  end

  # The first formatter matching the tag (if any) transforms the record.
  formatter = @formatters.find { |candidate| candidate.match?(tag) }
  event = formatter ? formatter.format(tag, time, record) : record

  payload[:event] = convert_to_utf8(event)
  payload
end
process_response(response, _request_body)
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 153
# Inspects the response of a POST to @api.
#
# Increments the status counter labelled with the response code, raises for
# 5xx responses (and for 4xx responses unless @consume_chunk_on_4xx_errors is
# truthy), and logs an error for any other non-2xx response.
#
# @param response an object responding to #code and #body
# @param _request_body the body that was sent; only used in the failure log.
#   The underscore-prefixed name is kept so the signature stays unchanged.
# @raise [RuntimeError] on 5xx, or on 4xx when 4xx chunks are not consumed
def process_response(response, _request_body)
  status = response.code.to_s
  log.trace { "[Response] POST #{@api}: #{response.inspect}" }

  @metrics[:status_counter].increment(labels: metric_labels(status: status))

  raise_err = status.start_with?('5') || (!@consume_chunk_on_4xx_errors && status.start_with?('4'))

  # raise Exception to utilize Fluentd output plugin retry mechanism
  raise "Server error (#{response.code}) for POST #{@api}, response: #{response.body}" if raise_err

  # From here on the chunk is consumed; success (2xx) needs no further action.
  return if status.start_with?('2')

  log.error "#{self.class}: Failed POST to #{@api}, response: #{response.body}"
  # The original referenced an undefined `post.body` here, which raised
  # NameError whenever this block was evaluated.
  log.error { "#{self.class}: Failed request body: #{_request_body}" }
end
Private Instance Methods
check_conflict()
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 172
# Rejects configurations that set both a key field and its "<field>_key"
# counterpart, for every entry in KEY_FIELDS.
#
# @raise [Fluent::ConfigError] when both instance variables are truthy
def check_conflict
  KEY_FIELDS.each do |field|
    key_option = "#{field}_key"
    next unless instance_variable_get("@#{field}") && instance_variable_get("@#{key_option}")

    raise Fluent::ConfigError, "Can not set #{field} and #{key_option} at the same time."
  end
end
configure_fields(conf)
click to toggle source
<fields> directive, which defines:
- when data_type is event, index-time fields
- when data_type is metric, metric dimensions
# File lib/fluent/plugin/out_splunk.rb, line 208
# Builds @extra_fields from the <fields> section, if one is configured.
#
# Each entry maps a field name to the record key it is read from; an entry
# with an empty value maps the name to itself.
#
# @param conf the plugin configuration element
def configure_fields(conf)
  # This loop looks dumb, but it is used to suppress the unused parameter
  # configuration warning. Learned from `filter_record_transformer`.
  conf.elements.each do |element|
    next unless element.name == 'fields'

    element.each_pair { |key, _| element.has_key?(key) }
  end

  return unless @fields

  @extra_fields = @fields.corresponding_config_element.each_with_object({}) do |(name, source), mapping|
    mapping[name] = source.empty? ? name : source
  end
end
configure_metrics(conf)
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 230
# Sets up @metric_labels and registers the Prometheus counters and
# histograms used by this plugin, storing them in @metrics by symbolic name.
#
# @param conf the plugin configuration; its '@type' entry becomes the
#   :type label value
def configure_metrics(conf)
  @metric_labels = { type: conf['@type'], plugin_id: plugin_id }

  counter = ::Prometheus::Client::Counter
  histogram = ::Prometheus::Client::Histogram

  # Built in a local first so @metrics is only assigned once every metric
  # has been registered.
  metrics = {}

  metrics[:record_counter] = register_metric(
    counter.new(
      :splunk_output_write_records_count,
      docstring: 'The number of log records being sent',
      labels: metric_label_keys
    )
  )

  metrics[:bytes_counter] = register_metric(
    counter.new(
      :splunk_output_write_bytes_count,
      docstring: 'The number of log bytes being sent',
      labels: metric_label_keys
    )
  )

  # This counter carries an extra :status label.
  metrics[:status_counter] = register_metric(
    counter.new(
      :splunk_output_write_status_count,
      docstring: 'The count of sends by response_code',
      labels: metric_label_keys(status: "")
    )
  )

  metrics[:write_bytes_histogram] = register_metric(
    histogram.new(
      :splunk_output_write_payload_bytes,
      docstring: 'The size of the write payload in bytes',
      buckets: [1024, 23_937, 47_875, 95_750, 191_500, 383_000, 766_000, 1_149_000],
      labels: metric_label_keys
    )
  )

  metrics[:write_records_histogram] = register_metric(
    histogram.new(
      :splunk_output_write_payload_records,
      docstring: 'The number of records written per write',
      buckets: [1, 10, 25, 100, 200, 300, 500, 750, 1000, 1500],
      labels: metric_label_keys
    )
  )

  # No explicit buckets here: the histogram's own defaults apply.
  metrics[:write_latency_histogram] = register_metric(
    histogram.new(
      :splunk_output_write_latency_seconds,
      docstring: 'The latency of writes',
      labels: metric_label_keys
    )
  )

  @metrics = metrics
end
convert_to_utf8(input)
click to toggle source
Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any non-UTF-8 character would be replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character would trigger the plugin to error out. Thanks to github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284
# File lib/fluent/plugin/out_splunk.rb, line 285
# Recursively re-encodes +input+ as UTF-8.
#
# Hashes (keys and values) and Arrays are converted element by element into
# new containers; anything that does not respond to #encode is returned
# unchanged.
#
# @param input the value to convert
# @return the converted value
# @raise [EncodingError] when @coerce_to_utf8 is falsy and the conversion
#   fails (the error is logged, then re-raised)
def convert_to_utf8(input)
  case input
  when Hash
    input.each_with_object({}) do |(key, value), converted|
      converted[convert_to_utf8(key)] = convert_to_utf8(value)
    end
  when Array
    input.map { |element| convert_to_utf8(element) }
  else
    return input unless input.respond_to?(:encode)

    if @coerce_to_utf8
      # Lenient mode: substitute anything that cannot be converted.
      return input.encode(
        'utf-8',
        invalid: :replace,
        undef: :replace,
        replace: @non_utf8_replacement_string
      )
    end

    # Strict mode: let the encoding error propagate after logging a hint.
    begin
      input.encode('utf-8')
    rescue EncodingError
      log.error do
        'Encountered encoding issues potentially due to non ' \
          'UTF-8 characters. To allow non-UTF-8 characters and ' \
          'replace them with spaces, please set "coerce_to_utf8" ' \
          'to true.'
      end
      raise
    end
  end
end
metric_label_keys(other_labels = {})
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 275
# Lists the label names of @metric_labels followed by any additional names
# from +other_labels+ (without duplicates).
#
# @param other_labels [Hash] extra labels whose keys should be included
# @return [Array] the combined label keys
def metric_label_keys(other_labels = {})
  @metric_labels.keys | other_labels.keys
end
metric_labels(other_labels = {})
click to toggle source
Tag metrics with the type string that was used to register the plugin
# File lib/fluent/plugin/out_splunk.rb, line 271
# Returns a new Hash made of @metric_labels with +other_labels+ layered on
# top; @metric_labels itself is left untouched.
#
# @param other_labels [Hash] labels to add or override
# @return [Hash] the combined labels
def metric_labels(other_labels = {})
  @metric_labels.dup.update(other_labels)
end
pick_custom_format_method()
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 222
# Defines a singleton #format on this instance that delegates to
# #format_event when @data_type is :event and to #format_metric otherwise.
def pick_custom_format_method
  formatter_name = @data_type == :event ? :format_event : :format_metric
  define_singleton_method :format, method(formatter_name)
end
prepare_key_fields()
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 180
# Replaces each KEY_FIELDS setting with a lambda taking (tag, record).
#
# For every field +f+ in KEY_FIELDS:
# * if @<f>_key is set, it is treated as a dot-separated path into the
#   record. The lambda returns the value at that path; unless @keep_keys is
#   truthy it also deletes the final key from its parent.
# * otherwise, if @<f> is set, the lambda returns the tag when the value
#   equals TAG_PLACEHOLDER, or the configured value itself.
# * if neither is set, @<f> is left untouched.
#
# A path whose intermediate key is missing yields nil instead of raising
# NoMethodError on nil.
def prepare_key_fields
  # Walks +path+ through nested containers, stopping at the first nil.
  walk = ->(record, path) { path.inject(record) { |node, key| node&.[](key) } }

  KEY_FIELDS.each do |f|
    v = instance_variable_get "@#{f}_key"
    if v
      attrs = v.split('.').freeze
      if @keep_keys
        instance_variable_set "@#{f}", ->(_, record) { walk.call(record, attrs) }
      else
        instance_variable_set "@#{f}", lambda { |_, record|
          # Resolve the parent container, then remove (and return) the leaf.
          walk.call(record, attrs[0...-1])&.delete(attrs[-1])
        }
      end
    else
      v = instance_variable_get "@#{f}"
      next unless v

      if v == TAG_PLACEHOLDER
        instance_variable_set "@#{f}", ->(tag, _) { tag }
      else
        instance_variable_set "@#{f}", ->(_, _) { v }
      end
    end
  end
end
register_metric(metric)
click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 319
# Registers +metric+ in @registry unless a metric with the same name is
# already present, in which case the existing one is returned instead.
#
# @param metric an object responding to #name
# @return whatever @registry.get or @registry.register returns
def register_metric(metric)
  # Reuse an already-registered metric rather than registering the name twice.
  return @registry.get(metric.name) if @registry.exist?(metric.name)

  @registry.register(metric)
end