class Fluent::Plugin::ParserFilter

Constants

FAILED_RESULT
REPLACE_CHAR

Attributes

parser[R]

Public Instance Methods

configure(conf)
Calls superclass method Fluent::Plugin::Filter#configure
# File lib/fluent/plugin/filter_parser.rb, line 48
# Configures the filter and builds the helpers used later to read and parse
# a field of each record.
#
# The statement order matters: the configuration is converted first, then
# handed to the superclass, and only afterwards are the accessor and parser
# created.
#
# @param conf the plugin configuration object; passed to
#   +compat_parameters_convert+ and, implicitly, to the superclass +configure+.
def configure(conf)
  # Run the compatibility conversion for the :parser section before the
  # superclass sees the configuration.
  compat_parameters_convert(conf, :parser)

  # Bare +super+ forwards +conf+ to Fluent::Plugin::Filter#configure.
  super

  # NOTE(review): @key_name is presumably populated from the configuration by
  # +super+, which is why the accessor is created only after it — confirm.
  @accessor = record_accessor_create(@key_name)
  @parser = parser_create
end
filter_with_time(tag, time, record)
# File lib/fluent/plugin/filter_parser.rb, line 60
# Parses the value that +@accessor+ extracts from +record+ and returns the
# resulting event.
#
# Behaviour, as far as this method itself shows:
# * When the extracted value is nil, an error event is emitted if
#   +@emit_invalid_record_to_error+ is set; the method then returns either the
#   original record merged through +handle_parsed+ (when +@reserve_data+) or
#   +FAILED_RESULT+.
# * Otherwise the value is given to +@parser.parse+. Only the first yielded
#   result is returned; additional results are reported as error events when
#   +@emit_invalid_record_to_error+ is set.
# * When +@replace_invalid_sequence+ is set and parsing raises an
#   ArgumentError whose message starts with "invalid byte sequence in", the
#   value is scrubbed with +REPLACE_CHAR+ and parsed again, exactly once. If
#   the same kind of error is raised again the exception is re-raised instead
#   of retrying forever.
#
# @param tag    the tag of the event being filtered
# @param time   the time of the event being filtered
# @param record the record of the event being filtered; the accessed key is
#   deleted from it when +@remove_key_name_field+ is set
# @return [Array] a two-element +[time, record]+ pair, or +FAILED_RESULT+.
#   The pair is +[nil, nil]+ when the parser never yields a usable result.
# @raise [Fluent::Plugin::Parser::ParserError] when parsing fails and
#   +@emit_invalid_record_to_error+ is set
# @raise [ArgumentError] when the parser raises one that is not handled by the
#   scrub-and-retry path, or when it is raised again after scrubbing
def filter_with_time(tag, time, record)
  raw_value = @accessor.call(record)
  if raw_value.nil?
    if @emit_invalid_record_to_error
      router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
    end
    return FAILED_RESULT unless @reserve_data

    return time, handle_parsed(tag, record, time, {})
  end

  # Set once the value has been scrubbed, so the retry below runs at most once.
  scrubbed = false
  begin
    # Note: https://github.com/fluent/fluentd/issues/4100
    # If the parser returns multiple records from one raw_value,
    # this returns only the first one record.
    # This should be fixed in the future version.
    result_time = nil
    result_record = nil

    @parser.parse(raw_value) do |t, values|
      if values
        # Keep the incoming time when requested or when the parser gave none.
        t = if @reserve_time
              time
            else
              t.nil? ? time : t
            end
        @accessor.delete(record) if @remove_key_name_field
        r = handle_parsed(tag, record, t, values)

        if result_record.nil?
          result_time = t
          result_record = r
        elsif @emit_invalid_record_to_error
          router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
            "Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
          ))
        end
      else
        if @emit_invalid_record_to_error
          router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
        end

        next unless @reserve_data
        next unless result_record.nil?

        result_time = time
        result_record = handle_parsed(tag, record, time, {})
      end
    end

    return result_time, result_record
  rescue Fluent::Plugin::Parser::ParserError
    raise if @emit_invalid_record_to_error

    return FAILED_RESULT
  rescue ArgumentError => e
    raise unless @replace_invalid_sequence
    raise unless e.message.start_with?("invalid byte sequence in")
    # Scrubbing already happened and did not help: retrying again would loop
    # forever, so surface the error to the caller instead.
    raise if scrubbed

    raw_value = raw_value.scrub(REPLACE_CHAR)
    scrubbed = true
    retry
  rescue => e
    if @emit_invalid_record_to_error
      raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
    else
      return FAILED_RESULT
    end
  end
end

Private Instance Methods

handle_parsed(tag, record, t, values)
# File lib/fluent/plugin/filter_parser.rb, line 137
# Shapes the parsed +values+ into the record that the filter returns.
#
# Steps, each applied only when its setting is present:
# 1. every key of +values+ is prefixed with +@inject_key_prefix+;
# 2. the result is nested under +@hash_value_field+;
# 3. when +@reserve_data+ is set, the result is merged over +record+
#    (or +record+ itself is returned when there is nothing to merge).
#
# +tag+ and +t+ are accepted but not used by this method.
#
# @param tag    the event tag (unused here)
# @param record the original record
# @param t      the event time (unused here)
# @param values the parsed key/value pairs, or nil
# @return the shaped record; +record+ is not modified
def handle_parsed(tag, record, t, values)
  parsed = values
  if parsed && @inject_key_prefix
    parsed = parsed.each_with_object({}) do |(key, value), prefixed|
      prefixed[@inject_key_prefix + key] = value
    end
  end

  parsed = { @hash_value_field => parsed } if @hash_value_field

  return parsed unless @reserve_data
  return record unless parsed

  record.merge(parsed)
end