class Fluent::Plugin::ForkOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_fork.rb, line 11
# Creates the plugin instance. Performs no setup of its own; it only
# delegates to the superclass initializer.
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_fork.rb, line 25
# Applies the configuration via the superclass, then validates the
# `max_fallback` setting.
#
# @param conf the configuration object, forwarded unchanged to `super`
# @raise [Fluent::ConfigError] when @max_fallback is not one of
#   "skip", "drop" or "log"
def configure(conf)
  super

  allowed_fallbacks = %w[skip drop log]
  return if allowed_fallbacks.include?(@max_fallback)

  raise Fluent::ConfigError, "max_fallback must be one of #{allowed_fallbacks.inspect}"
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_fork.rb, line 32
# Forks every event of the stream into one event per value found under
# @fork_key and re-emits each of them with @output_tag.
#
# For each record:
# * records without @fork_key (nil value) are skipped;
# * when @fork_value_type is 'csv' the value is split on @separator,
#   otherwise the value is used as-is as the list of values;
# * duplicates are removed unless @no_unique is set;
# * when @max_size is set and exceeded, @max_fallback decides whether the
#   record is skipped ('skip'), truncated to @max_size values ('drop'), or
#   only reported ('log');
# * blank values are ignored; every remaining value is emitted as a copy of
#   the record without @fork_key, with the value stored under @output_key
#   and, when @index_key is set, its position stored under @index_key.
#
# A failure while handling one record is logged and that record is
# abandoned, but the remaining events of the stream are still processed.
#
# @param tag the tag of the incoming event stream (used for logging only)
# @param es  the event stream; must yield (time, record) pairs from #each
def process(tag, es)
  es.each do |time, record|
    begin
      org_value = record[@fork_key]
      if org_value.nil?
        log.trace "#{tag} - #{time}: skip to fork #{@fork_key}=#{org_value}"
        next
      end
      log.trace "#{tag} - #{time}: try to fork #{@fork_key}=#{org_value}"

      # Only 'csv' needs parsing; every other type uses the value directly.
      values = @fork_value_type == 'csv' ? org_value.split(@separator) : org_value
      values = values.uniq unless @no_unique

      if @max_size && @max_size < values.size
        case @max_fallback
        when 'skip'
          log.warn "#{tag} - #{time}: Skip too many forked values (max=#{@max_size}) : #{org_value}"
          next
        when 'drop'
          log.warn "#{tag} - #{time}: Drop too many forked values (max=#{@max_size}) : #{org_value}"
          values = values.take(@max_size)
        when 'log'
          log.info "#{tag} - #{time}: Too many forked values (max=#{@max_size}) : #{org_value}"
        end
      end

      # The record minus the fork key is the same for every forked value,
      # so build it once per record instead of once per value.
      base_record = record.reject { |k, _v| k == @fork_key }

      values.reject { |value| value.to_s == '' }.each_with_index do |value, i|
        log.trace "#{tag} - #{time}: reemit #{@output_key}=#{value} for #{@output_tag}"
        new_record = base_record.merge(@output_key => value)
        new_record[@index_key] = i unless @index_key.nil?
        router.emit(@output_tag, time, new_record)
      end
    rescue => e
      # Contain the failure to this record: one malformed value must not
      # cause the rest of the stream to be dropped.
      log.error "#{e.message}: #{e.backtrace.join(', ')}"
    end
  end
rescue => e
  # Failures raised by the stream iteration itself end up here.
  log.error "#{e.message}: #{e.backtrace.join(', ')}"
end