class LogStash::Filters::Collate
Collate
events by time or count.
The original goal of this filter was to merge the logs from different sources by the time of log, for example, in real-time log collection, logs can be collated by amount of 3000 logs or can be collated in 30 seconds.
The config looks like this:
filter { collate { size => 3000 interval => "30s" order => "ascending" } }
Public Instance Methods
filter(event) { |collatedEvent| ... }
click to toggle source
# File lib/logstash/filters/collate.rb, line 52 def filter(event) @logger.info("do collate filter") if event == LogStash::SHUTDOWN @job.trigger() @job.unschedule() @logger.info("collate filter thread shutdown.") return end # if the event is collated, a "collated" tag will be marked, so for those uncollated event, cancel them first. if event["tags"].nil? || !event.tags.include?("collated") event.cancel else return end @mutex.synchronize{ @collatingArray.push(event.clone) if (@collatingArray.length == @count) collate end if (@collatingDone) while collatedEvent = @collatingArray.pop collatedEvent["tags"] = Array.new if collatedEvent["tags"].nil? collatedEvent["tags"] << "collated" filter_matched(collatedEvent) yield collatedEvent end # while @collatingArray.pop # reset collatingDone flag @collatingDone = false end } end
flush()
click to toggle source
Flush any pending messages.
# File lib/logstash/filters/collate.rb, line 100 def flush events = [] if (@collatingDone) @mutex.synchronize{ while collatedEvent = @collatingArray.pop collatedEvent["tags"] << "collated" events << collatedEvent end # while @collatingArray.pop } # reset collatingDone flag. @collatingDone = false end return events end
register()
click to toggle source
# File lib/logstash/filters/collate.rb, line 35 def register require "thread" require "rufus/scheduler" @mutex = Mutex.new @collatingDone = false @collatingArray = Array.new @scheduler = Rufus::Scheduler.start_new @job = @scheduler.every @interval do @logger.info("Scheduler Activated") @mutex.synchronize{ collate } end end
Private Instance Methods
collate()
click to toggle source
# File lib/logstash/filters/collate.rb, line 89 def collate if (@order == "ascending") @collatingArray.sort! { |eventA, eventB| eventB.timestamp <=> eventA.timestamp } else @collatingArray.sort! { |eventA, eventB| eventA.timestamp <=> eventB.timestamp } end @collatingDone = true end