class Fluent::Plugin::JsonlArraySplitterFilter

Public Instance Methods

filter_stream(tag, es) click to toggle source
# File lib/fluent/plugin/filter_jsonl_array_splitter.rb, line 15
def filter_stream(tag, es)
    new_es = Fluent::MultiEventStream.new
    es.each do |time, record|
        record_lines = record[@key_name].to_s
        parsed = []
        record_lines.each_line do |line|
            if @debug then
                log.debug("Read JSON line: #{line}")
            end
            parsed << JSON.parse("#{line}")
        end

        parsed.each do |r|
            new_record = record.clone
            new_record.update(r)

            if !@reserve_key then
                new_record.delete(@key_name)
            end
            if @debug then
                log.debug("New record by json line filter: #{new_record}")
            end
            new_es.add(time, new_record)
        end
    end
    new_es
end