class Fluent::Plugin::ForkOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_fork.rb, line 11 def initialize super end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_fork.rb, line 25 def configure(conf) super fallbacks = %w(skip drop log) raise Fluent::ConfigError, "max_fallback must be one of #{fallbacks.inspect}" unless fallbacks.include?(@max_fallback) end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_fork.rb, line 32 def process(tag, es) es.each do |time, record| org_value = record[@fork_key] if org_value.nil? log.trace "#{tag} - #{time}: skip to fork #{@fork_key}=#{org_value}" next end log.trace "#{tag} - #{time}: try to fork #{@fork_key}=#{org_value}" values = [] case @fork_value_type when 'csv' values = org_value.split(@separator) when 'array' values = org_value else values = org_value end values = values.uniq unless @no_unique if @max_size && @max_size < values.size case @max_fallback when 'skip' log.warn "#{tag} - #{time}: Skip too many forked values (max=#{@max_size}) : #{org_value}" next when 'drop' log.warn "#{tag} - #{time}: Drop too many forked values (max=#{@max_size}) : #{org_value}" values = values.take(@max_size) when 'log' log.info "#{tag} - #{time}: Too many forked values (max=#{@max_size}) : #{org_value}" end end values.reject{ |value| value.to_s == '' }.each_with_index do |value, i| log.trace "#{tag} - #{time}: reemit #{@output_key}=#{value} for #{@output_tag}" new_record = record.reject{ |k, v| k == @fork_key }.merge(@output_key => value) new_record.merge!(@index_key => i) unless @index_key.nil? router.emit(@output_tag, time, new_record) end end rescue => e log.error "#{e.message}: #{e.backtrace.join(', ')}" end