class LogStash::FilterWorker
TODO(sissel): Should this really be a 'plugin' ?
Constants
- Exceptions
Attributes
after_filter[R]
filters[RW]
logger[RW]
Public Class Methods
new(filters, input_queue, output_queue)
click to toggle source
# File lib/logstash/filterworker.rb, line 18 def initialize(filters, input_queue, output_queue) @filters = filters @input_queue = input_queue @output_queue = output_queue @shutdown_requested = false end
Public Instance Methods
filter(original_event)
click to toggle source
# File lib/logstash/filterworker.rb, line 82 def filter(original_event) # Make an 'events' array that filters can push onto if they # need to generate additional events based on the current event. # The 'split' filter does this, for example. events = [original_event] events.each do |event| @filters.each do |filter| # Filter can emit multiple events, like the 'split' event, so # give the input queue to dump generated events into. # TODO(sissel): This may require some refactoring later, I am not sure # this is the best approach. The goal is to allow filters to modify # the current event, but if necessary, create new events based on # this event. begin update_watchdog(:event => event, :filter => filter) filter.execute(event) do |newevent| events << newevent end rescue *Exceptions => e @logger.warn("Exception during filter", :event => event, :exception => $!, :backtrace => e.backtrace, :filter => filter) ensure clear_watchdog end if event.cancelled? @logger.debug? and @logger.debug("Event cancelled", :event => event, :filter => filter.class) break end @after_filter.call(event,filter) unless @after_filter.nil? end # @filters.each @logger.debug? and @logger.debug("Event finished filtering", :event => event, :thread => Thread.current[:name]) @output_queue.push(event) unless event.cancelled? end # events.each end
flusher()
click to toggle source
# File lib/logstash/filterworker.rb, line 52 def flusher events = [] @filters.each do |filter| # Filter any events generated so far in this flush. events.each do |event| # TODO(sissel): watchdog on flush filtration? unless event.cancelled? filter.filter(event) @after_filter.call(event,filter) unless @after_filter.nil? end end # TODO(sissel): watchdog on flushes? if filter.respond_to?(:flush) flushed = filter.flush events += flushed if !flushed.nil? && flushed.any? end end events.each do |event| @logger.debug? and @logger.debug("Pushing flushed events", :event => event) @output_queue.push(event) unless event.cancelled? end end
run()
click to toggle source
# File lib/logstash/filterworker.rb, line 32 def run # TODO(sissel): Run a flusher thread for each plugin requesting flushes # > It seems reasonable that you could want a multiline filter to flush # after 5 seconds, but want a metrics filter to flush every 10 or 60. # Set up the periodic flusher thread. @flusher = Thread.new { interval(5) { flusher } } while !@shutdown_requested && event = @input_queue.pop if event == LogStash::SHUTDOWN finished @input_queue << LogStash::SHUTDOWN # for the next filter thread return end filter(event) end # while @input_queue.pop finished end
teardown()
click to toggle source
# File lib/logstash/filterworker.rb, line 78 def teardown @shutdown_requested = true end