class LogStash::Filters::Collate

Collate events by time or count.

The original goal of this filter was to merge logs from different sources in the order of their timestamps. For example, in real-time log collection, logs can be collated in batches of 3000 events or every 30 seconds.

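For example, the following configuration sorts a batch once 3000 events have been buffered or every 30 seconds, and emits each batch oldest-first (`order => "ascending"`):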

filter {
  collate {
    count => 3000
    interval => "30s"
    order => "ascending"
  }
}

Public Instance Methods

filter(event) { |collatedEvent| ... }
# File lib/logstash/filters/collate.rb, line 52
# Buffers every event that has not yet been collated and, whenever a batch is
# ready, yields the whole batch in the configured order.
#
# A batch is ready either when the buffer reaches @count events (checked here)
# or when the scheduled job created in #register has already called #collate
# (signalled through @collatingDone).
#
# @param event the incoming event, or LogStash::SHUTDOWN to stop the timer job
# @yield [collatedEvent] each buffered event, tagged "collated", in sorted order
# @return [nil]
def filter(event)
  # Logged once per event, so keep it at debug to avoid flooding info logs.
  @logger.debug("do collate filter")
  if event == LogStash::SHUTDOWN
    # Run the scheduled job one last time (it collates under @mutex), then
    # stop it.
    @job.trigger()
    @job.unschedule()
    @logger.info("collate filter thread shutdown.")
    return
  end

  # Events tagged "collated" were emitted by this filter; let them pass
  # through untouched instead of buffering them a second time. The tags are
  # read via event["tags"] for both the nil check and the lookup.
  tags = event["tags"]
  return if tags && tags.include?("collated")

  # Buffer a copy and cancel the original, so the event is emitted only once,
  # after collation. The copy is taken before cancelling so it cannot pick up
  # the cancelled state, whatever Event#clone happens to copy.
  buffered = event.clone
  event.cancel

  @mutex.synchronize do
    @collatingArray.push(buffered)
    collate if @collatingArray.length == @count

    if @collatingDone
      # #collate orders the buffer so that popping from the tail yields the
      # configured order.
      while (collatedEvent = @collatingArray.pop)
        collatedEvent["tags"] = [] if collatedEvent["tags"].nil?
        collatedEvent["tags"] << "collated"
        filter_matched(collatedEvent)
        yield collatedEvent
      end
      # reset collatingDone flag
      @collatingDone = false
    end
  end
end
flush()

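Flush any pending events: returns the collated batch (each event tagged "collated") if a collation has completed, otherwise an empty array.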

# File lib/logstash/filters/collate.rb, line 100
# Returns the pending batch if a collation has completed since the last drain.
#
# The scheduled job from #register only sorts the buffer and sets
# @collatingDone; the sorted events are handed out by this method or by the
# next #filter call.
#
# @return [Array] the collated events in the configured order, each tagged
#   "collated"; an empty array when no collation is pending
def flush
  events = []
  # Test and reset @collatingDone under the same lock that the scheduler job
  # and #filter use. Checking it outside the lock allowed a collation that
  # finished between the drain and the reset to have its flag cleared,
  # stranding that batch until the next collation.
  @mutex.synchronize do
    if @collatingDone
      while (collatedEvent = @collatingArray.pop)
        # An event may carry no tags at all; create the list instead of
        # calling << on nil.
        collatedEvent["tags"] = [] if collatedEvent["tags"].nil?
        collatedEvent["tags"] << "collated"
        events << collatedEvent
      end
      # reset collatingDone flag.
      @collatingDone = false
    end
  end
  events
end
register()
# File lib/logstash/filters/collate.rb, line 35
# Sets up the shared collation state and starts the periodic job.
#
# Every @interval the job sorts whatever is buffered (via #collate, under the
# same @mutex used by #filter and #flush) and marks it as ready; emitting the
# batch is left to #filter or #flush.
def register
  require "thread"
  require "rufus/scheduler"

  # State shared between the pipeline thread and the scheduler thread.
  @collatingArray = []
  @collatingDone = false
  @mutex = Mutex.new

  @scheduler = Rufus::Scheduler.start_new
  @job = @scheduler.every(@interval) do
    @logger.info("Scheduler Activated")
    @mutex.synchronize { collate }
  end
end

Private Instance Methods

collate()
# File lib/logstash/filters/collate.rb, line 89
# Sorts the pending buffer by event timestamp and marks the batch as ready.
#
# The visible callers drain @collatingArray with #pop (from the tail), so the
# buffer is arranged in the reverse of the emission order: for "ascending"
# the newest event sits first and the oldest last; any other @order value
# gets the opposite arrangement.
#
# Expects the caller to hold @mutex (the visible call sites do).
def collate
  newest_first = (@order == "ascending")
  @collatingArray.sort! do |left, right|
    if newest_first
      right.timestamp <=> left.timestamp
    else
      left.timestamp <=> right.timestamp
    end
  end
  @collatingDone = true
end