class LogStash::Filters::Aggregate
Constants
- DEFAULT_TIMEOUT
Default timeout (in seconds) when not defined in plugin configuration
Attributes
boolean indicating if expired maps should be checked on every flush call (typically because a custom timeout has been set on a map)
pointer to current pipeline context
Public Instance Methods
Called when Logstash stops
# File lib/logstash/filters/aggregate.rb, line 146
# Called when Logstash stops.
#
# Elects this instance as the pipeline "close instance" when none is defined.
# The elected instance drops empty map groups, optionally dumps the remaining
# aggregate maps to @aggregate_maps_path, and removes the pipeline context
# from @@pipelines so that a Logstash reload starts from a fresh context.
def close
  @logger.debug("Aggregate close call", :code => @code)

  pipeline = @current_pipeline

  # define pipeline close instance if none is already defined
  pipeline.pipeline_close_instance = self if pipeline.pipeline_close_instance.nil?

  # only the elected close instance performs the shutdown work
  return unless pipeline.pipeline_close_instance == self

  # store aggregate maps to file (if option defined)
  pipeline.mutex.synchronize do
    pipeline.aggregate_maps.delete_if { |_pattern, maps| maps.empty? }
    if @aggregate_maps_path && !pipeline.aggregate_maps.empty?
      File.open(@aggregate_maps_path, "w") { |to_file| Marshal.dump(pipeline.aggregate_maps, to_file) }
      @logger.info("Aggregate maps stored to : #{@aggregate_maps_path}")
    end
  end

  # remove pipeline context for Logstash reload
  @@pipelines.delete(pipeline_id)
end
Create a new event from the aggregation_map and the corresponding task_id. This will create the event and:
- if @timeout_task_id_field is set, it will set the task_id on the timeout event;
- if @timeout_code is set, it will execute the timeout code on the created timeout event.
returns the newly created event
# File lib/logstash/filters/aggregate.rb, line 271
# Builds a new event from an aggregation map.
#
# aggregation_map - data passed to LogStash::Event.new
# task_id         - value stored in @timeout_task_id_field when that option is set
#
# The event is decorated with @timeout_tags, then handed to the timeout code
# block when @timeout_code is set. An exception raised by that block is logged,
# the event is tagged "_aggregateexception" and the :timeout_code_errors metric
# is incremented; the event is still returned.
#
# Returns the newly created event.
def create_timeout_event(aggregation_map, task_id)
  @logger.debug("Aggregate create_timeout_event call with task_id '#{task_id}'")

  timeout_event = LogStash::Event.new(aggregation_map)
  timeout_event.set(@timeout_task_id_field, task_id) if @timeout_task_id_field

  LogStash::Util::Decorators.add_tags(@timeout_tags, timeout_event, "filters/#{self.class.name}")

  # Call timeout code block if available
  if @timeout_code
    begin
      @timeout_codeblock.call(timeout_event)
    rescue => exception
      @logger.error("Aggregate exception occurred",
                    :error => exception,
                    :timeout_code => @timeout_code,
                    :timeout_event_data => timeout_event.to_hash_with_metadata)
      timeout_event.tag("_aggregateexception")
      metric.increment(:timeout_code_errors)
    end
  end

  metric.increment(:pushed_events)

  timeout_event
end
display all possible timeout options
# File lib/logstash/filters/aggregate.rb, line 452
# Returns the names of all timeout-related options as one comma-separated
# string (used in configuration error messages).
def display_timeout_options
  %w[
    timeout
    inactivity_timeout
    timeout_code
    push_map_as_event_on_timeout
    push_previous_map_as_event
    timeout_timestamp_field
    timeout_task_id_field
    timeout_tags
  ].join(", ")
end
Extract the previous map in aggregate maps, and return it as a new Logstash event
# File lib/logstash/filters/aggregate.rb, line 304
# Removes the first entry of the aggregate maps stored for this task_id
# pattern and returns it as a new Logstash event.
#
# Callers are expected to check that the maps are not empty beforehand.
def extract_previous_map_as_event
  # Hash#shift hands back a [task_id, element] pair
  previous_task_id, previous_element = @current_pipeline.aggregate_maps[@task_id].shift
  previous_map = previous_element.map

  update_aggregate_maps_metric

  create_timeout_event(previous_map, previous_task_id)
end
This method is invoked each time an event matches the filter
# File lib/logstash/filters/aggregate.rb, line 171
# This method is invoked each time an event matches the filter.
#
# event           - the event being processed
# new_event_block - block receiving every event generated by this call; it is
#                   also forwarded to the user code block
#
# Up to three events can be generated by a single call: a map expired
# according to the event timestamp, the previous map (when
# @push_previous_map_as_event is set), and a map whose custom timeout was set
# to 0 by the code block. They are accumulated in a list so that none of them
# overwrites another, and so that an early exit caused by @map_action does not
# discard an expired map that has already been removed from the aggregate maps.
def filter(event, &new_event_block)
  # define task id
  task_id = event.sprintf(@task_id)
  return if task_id.nil? || task_id == @task_id

  no_error = false
  events_to_yield = []

  # protect aggregate_maps against concurrent access, using a mutex
  @current_pipeline.mutex.synchronize do
    # if timeout is based on event timestamp, check if task_id map is expired and should be removed
    if @timeout_timestamp_field
      expired_event = remove_expired_map_based_on_event_timestamp(task_id, event)
      events_to_yield << expired_event if expired_event
    end

    # retrieve the current aggregate map
    aggregate_maps_element = @current_pipeline.aggregate_maps[@task_id][task_id]

    # case where aggregate map isn't already created
    if aggregate_maps_element.nil?
      # `next` leaves the synchronized block only, so pending events are still yielded below
      next if @map_action == "update"

      # create new event from previous map, if @push_previous_map_as_event is enabled
      if @push_previous_map_as_event && !@current_pipeline.aggregate_maps[@task_id].empty?
        events_to_yield << extract_previous_map_as_event()
      end

      # create aggregate map
      creation_timestamp = reference_timestamp(event)
      aggregate_maps_element = LogStash::Filters::Aggregate::Element.new(creation_timestamp, task_id)
      @current_pipeline.aggregate_maps[@task_id][task_id] = aggregate_maps_element
      update_aggregate_maps_metric()
    else
      next if @map_action == "create"
    end

    # update last event timestamp
    aggregate_maps_element.lastevent_timestamp = reference_timestamp(event)
    aggregate_maps_element.difference_from_lastevent_to_now = (Time.now - aggregate_maps_element.lastevent_timestamp).to_i

    # execute the code to read/update map and event
    map = aggregate_maps_element.map
    begin
      @codeblock.call(event, map, aggregate_maps_element, &new_event_block)
      @logger.debug("Aggregate successful filter code execution", :code => @code)
      no_error = true
    rescue => exception
      @logger.error("Aggregate exception occurred",
                    :error => exception,
                    :code => @code,
                    :map => map,
                    :event_data => event.to_hash_with_metadata)
      event.tag("_aggregateexception")
      metric.increment(:code_errors)
    end

    # delete the map if task is ended
    @current_pipeline.aggregate_maps[@task_id].delete(task_id) if @end_of_task
    update_aggregate_maps_metric()

    # process custom timeout set by code block
    if aggregate_maps_element.timeout || aggregate_maps_element.inactivity_timeout
      timeout_event = process_map_timeout(aggregate_maps_element)
      # process_map_timeout returns nil unless timeout is 0: never let that erase a pending event
      events_to_yield << timeout_event if timeout_event
    end
  end

  # match the filter, only if no error occurred
  filter_matched(event) if no_error

  # yield generated events (expired map, previous map, custom timeout map), if any
  events_to_yield.each { |event_to_yield| yield event_to_yield }
end
This method is invoked by LogStash
every 5 seconds.
# File lib/logstash/filters/aggregate.rb, line 318
# Periodic flush entry point.
#
# options - Hash; options[:final] is truthy at Logstash shutdown
#
# Expired maps are only evicted when this instance is the flush instance for
# its task_id pattern, aggregate maps exist for that pattern, and one of the
# following holds: no flush has been recorded yet, more than
# (@inactivity_timeout / 2) seconds have elapsed since the last one, this is
# the final flush, or @check_expired_maps_on_every_flush is set.
#
# Returns the list of events to push into the pipeline (empty when nothing
# has to be flushed).
def flush(options = {})
  @logger.trace("Aggregate flush call with #{options}")

  # init flush/timeout properties for current pipeline
  init_pipeline_timeout_management

  pipeline = @current_pipeline

  # guards are evaluated in the same order as the original compound condition
  return [] unless pipeline.flush_instance_map[@task_id] == self
  return [] unless pipeline.aggregate_maps[@task_id]

  last_flushes = pipeline.last_flush_timestamp_map
  flush_due = !last_flushes.has_key?(@task_id) ||
              Time.now > last_flushes[@task_id] + @inactivity_timeout / 2 ||
              options[:final] ||
              @check_expired_maps_on_every_flush
  return [] unless flush_due

  flushed_events = remove_expired_maps

  # at Logstash shutdown, if push_previous_map_as_event is enabled, it's important
  # to force flush (particularly for jdbc input plugin)
  pipeline.mutex.synchronize do
    if options[:final] && @push_previous_map_as_event && !pipeline.aggregate_maps[@task_id].empty?
      flushed_events << extract_previous_map_as_event
    end
  end

  update_aggregate_maps_metric

  # tag flushed events, indicating "final flush" special event
  if options[:final]
    flushed_events.each { |flushed_event| flushed_event.tag("_aggregatefinalflush") }
  end

  # update last flush timestamp
  pipeline.last_flush_timestamp_map[@task_id] = Time.now

  # return events to flush into Logstash pipeline
  flushed_events
end
return if this filter instance has any timeout option enabled in logstash configuration
# File lib/logstash/filters/aggregate.rb, line 438
# Tells whether this filter instance has any timeout option enabled in the
# Logstash configuration.
#
# Returns the first truthy option value (or true when only timeout_tags is
# non-empty), and a falsy value when no timeout option is set.
def has_timeout_options?
  timeout ||
    inactivity_timeout ||
    timeout_code ||
    push_map_as_event_on_timeout ||
    push_previous_map_as_event ||
    timeout_timestamp_field ||
    timeout_task_id_field ||
    !timeout_tags.empty?
end
init flush/timeout properties for current pipeline
# File lib/logstash/filters/aggregate.rb, line 354
# Initializes flush/timeout properties for the current pipeline.
#
# Registers this instance as the flush instance for its task_id pattern when
# no instance is registered yet. When this instance is the flush instance,
# @timeout defaults to DEFAULT_TIMEOUT and @inactivity_timeout defaults to
# @timeout.
def init_pipeline_timeout_management
  flush_instances = @current_pipeline.flush_instance_map

  # Define default flush instance that manages timeout (if not defined by user)
  flush_instances[@task_id] = self unless flush_instances.has_key?(@task_id)

  # Only the flush instance owns the timeout settings
  return unless flush_instances[@task_id] == self

  # Define timeout and inactivity_timeout (if not defined by user)
  if @timeout.nil?
    @timeout = DEFAULT_TIMEOUT
    @logger.debug("Aggregate timeout for '#{@task_id}' pattern: #{@timeout} seconds (default value)")
  end

  @inactivity_timeout = @timeout if @inactivity_timeout.nil?
end
Necessary to indicate Logstash to periodically call 'flush' method
# File lib/logstash/filters/aggregate.rb, line 313
# Always true: tells Logstash to call `flush` periodically on this filter.
def periodic_flush
  true
end
return current pipeline id
# File lib/logstash/filters/aggregate.rb, line 466
# Returns the current pipeline id: the one exposed by @execution_context when
# it is set, "main" otherwise.
def pipeline_id
  return "main" unless @execution_context

  @execution_context.pipeline_id
end
Process a custom timeout defined in aggregate map element Returns an event to yield if timeout=0 and push_map_as_event_on_timeout=true
# File lib/logstash/filters/aggregate.rb, line 249
# Processes a custom timeout defined on an aggregate map element.
#
# element - aggregate map element carrying timeout / inactivity_timeout values
#
# When either value is 0 the map is removed immediately and, if the flush
# instance has push_map_as_event_on_timeout enabled, turned into an event.
# Otherwise the flush instance is asked to check expired maps on every flush.
#
# Returns the event to yield, or nil.
def process_map_timeout(element)
  init_pipeline_timeout_management

  unless element.timeout == 0 || element.inactivity_timeout == 0
    # non-zero custom timeout: eviction is left to the periodic flush
    @current_pipeline.flush_instance_map[@task_id].check_expired_maps_on_every_flush ||= true
    return nil
  end

  # timeout of 0: the map expires right away
  @current_pipeline.aggregate_maps[@task_id].delete(element.task_id)

  timeout_event = nil
  if @current_pipeline.flush_instance_map[@task_id].push_map_as_event_on_timeout
    timeout_event = create_timeout_event(element.map, element.task_id)
  end

  @logger.debug("Aggregate remove expired map with task_id=#{element.task_id} and custom timeout=0")
  metric.increment(:task_timeouts)
  update_aggregate_maps_metric

  timeout_event
end
compute and return “reference” timestamp to compute timeout : by default “system current time” or event timestamp if timeout_timestamp_field option is defined
# File lib/logstash/filters/aggregate.rb, line 476
# Computes the "reference" timestamp used for timeout computation.
#
# event - the event being processed (only read when @timeout_timestamp_field is set)
#
# Returns the system current time by default. When the timeout_timestamp_field
# option is defined, returns the time held by that event field: either the
# result of its `time` method, or the value itself when it is a Time.
#
# If the field is absent from the event or holds a value that cannot be
# converted, a warning is logged and the system current time is used instead
# of raising NoMethodError.
def reference_timestamp(event)
  return Time.now unless @timeout_timestamp_field

  timestamp = event.get(@timeout_timestamp_field)
  return timestamp.time if timestamp.respond_to?(:time)
  return timestamp if timestamp.is_a?(Time)

  @logger.warn("Aggregate timeout_timestamp_field is missing or is not a timestamp, system time is used instead",
               :timeout_timestamp_field => @timeout_timestamp_field,
               :value => timestamp)
  Time.now
end
Initialize plugin
# File lib/logstash/filters/aggregate.rb, line 76
# Initializes the plugin.
#
# Validates the task_id pattern, compiles the user code blocks, attaches this
# instance to the shared pipeline context, checks the consistency of the
# timeout options and of `aggregate_maps_path` across filter instances, and
# reloads previously stored aggregate maps when a maps file exists.
#
# Raises LogStash::ConfigurationError when:
# - task_id contains no dynamic expression like '%{field}',
# - several filters define timeout options for the same task_id pattern,
# - inactivity_timeout is greater than the effective timeout,
# - 'aggregate_maps_path' is set on more than one aggregate filter.
def register
  @logger.debug("Aggregate register call", :code => @code)

  # validate task_id option
  if !@task_id.match(/%\{.+\}/)
    raise LogStash::ConfigurationError, "Aggregate plugin: task_id pattern '#{@task_id}' must contain a dynamic expression like '%{field}'"
  end

  # process lambda expression to call in each filter call
  # NOTE(review): @code comes from the plugin configuration and is evaluated as Ruby
  eval("@codeblock = lambda { |event, map, map_meta, &new_event_block| #{@code} }", binding, "(aggregate filter code)")

  # process lambda expression to call in the timeout case or previous event case
  if @timeout_code
    eval("@timeout_codeblock = lambda { |event| #{@timeout_code} }", binding, "(aggregate filter timeout code)")
  end

  # init pipeline context
  @@pipelines[pipeline_id] ||= LogStash::Filters::Aggregate::Pipeline.new()
  @current_pipeline = @@pipelines[pipeline_id]

  @current_pipeline.mutex.synchronize do
    # timeout management : define eviction_instance for current task_id pattern
    if has_timeout_options?
      if @current_pipeline.flush_instance_map.has_key?(@task_id)
        # all timeout options have to be defined in only one aggregate filter per task_id pattern
        raise LogStash::ConfigurationError, "Aggregate plugin: For task_id pattern '#{@task_id}', there are more than one filter which defines timeout options. All timeout options have to be defined in only one aggregate filter per task_id pattern. Timeout options are : #{display_timeout_options}"
      end
      @current_pipeline.flush_instance_map[@task_id] = self
      @logger.debug("Aggregate timeout for '#{@task_id}' pattern: #{@timeout} seconds")
    end

    # inactivity timeout management: make sure it is lower than timeout
    if @inactivity_timeout
      # the limit actually enforced is DEFAULT_TIMEOUT when no timeout is configured:
      # report that value instead of an empty "timeout ()"
      effective_timeout = @timeout || DEFAULT_TIMEOUT
      if @inactivity_timeout > effective_timeout
        raise LogStash::ConfigurationError, "Aggregate plugin: For task_id pattern #{@task_id}, inactivity_timeout (#{@inactivity_timeout}) must be lower than timeout (#{effective_timeout})"
      end
    end

    # reinit pipeline_close_instance (if necessary)
    if !@current_pipeline.aggregate_maps_path_set && @current_pipeline.pipeline_close_instance
      @current_pipeline.pipeline_close_instance = nil
    end

    # check if aggregate_maps_path option has already been set on another instance else set @current_pipeline.aggregate_maps_path_set
    if @aggregate_maps_path
      if @current_pipeline.aggregate_maps_path_set
        @current_pipeline.aggregate_maps_path_set = false
        raise LogStash::ConfigurationError, "Aggregate plugin: Option 'aggregate_maps_path' must be set on only one aggregate filter"
      else
        @current_pipeline.aggregate_maps_path_set = true
        @current_pipeline.pipeline_close_instance = self
      end
    end

    # load aggregate maps from file (if option defined)
    # NOTE(review): Marshal.load deserializes arbitrary objects; the maps file must only be
    # writable by trusted users
    if @aggregate_maps_path && File.exist?(@aggregate_maps_path)
      File.open(@aggregate_maps_path, "r") { |from_file| @current_pipeline.aggregate_maps.merge!(Marshal.load(from_file)) }
      File.delete(@aggregate_maps_path)
      @logger.info("Aggregate maps loaded from : #{@aggregate_maps_path}")
    end

    # init aggregate_maps
    @current_pipeline.aggregate_maps[@task_id] ||= {}
    update_aggregate_maps_metric()
  end
end
Remove the expired Aggregate
map associated to task_id if it is older than timeout or if no new event has been received since inactivity_timeout (relative to current event timestamp). If @push_previous_map_as_event option is set, or @push_map_as_event_on_timeout is set, expired map is returned as new event to be flushed to Logstash pipeline.
# File lib/logstash/filters/aggregate.rb, line 410
# Removes the aggregate map associated to task_id if it is older than timeout,
# or if no new event has been received since inactivity_timeout, both relative
# to the timestamp of the current event.
#
# task_id - concrete task id computed for the current event
# event   - current event, used as time reference (see reference_timestamp)
#
# Timeouts set on the element itself take precedence over @timeout and
# @inactivity_timeout.
#
# Returns the expired map as a new event when @push_previous_map_as_event or
# @push_map_as_event_on_timeout is set and the map has expired; nil otherwise
# (including when no map exists for task_id).
def remove_expired_map_based_on_event_timestamp(task_id, event)
  # log the concrete task id, not the @task_id pattern
  @logger.debug("Aggregate remove_expired_map_based_on_event_timestamp call with task_id : '#{task_id}'")

  # get aggregate map element
  element = @current_pipeline.aggregate_maps[@task_id][task_id]
  return nil if element.nil?

  init_pipeline_timeout_management()

  event_to_flush = nil
  event_timestamp = reference_timestamp(event)
  min_timestamp = event_timestamp - (element.timeout || @timeout)
  min_inactivity_timestamp = event_timestamp - (element.inactivity_timeout || @inactivity_timeout)

  if element.creation_timestamp < min_timestamp || element.lastevent_timestamp < min_inactivity_timestamp
    if @push_previous_map_as_event || @push_map_as_event_on_timeout
      event_to_flush = create_timeout_event(element.map, task_id)
    end
    @current_pipeline.aggregate_maps[@task_id].delete(task_id)
    @logger.debug("Aggregate remove expired map with task_id=#{task_id}")
    metric.increment(:task_timeouts)
  end

  event_to_flush
end
Remove the expired Aggregate
maps from @current_pipeline.aggregate_maps if they are older than timeout or if no new event has been received since inactivity_timeout. If @push_previous_map_as_event option is set, or @push_map_as_event_on_timeout is set, expired maps are returned as new events to be flushed to Logstash pipeline.
# File lib/logstash/filters/aggregate.rb, line 376
# Removes the expired aggregate maps of this task_id pattern: maps older than
# timeout, or maps that received no event since inactivity_timeout. Timeouts
# set on an element take precedence over @timeout / @inactivity_timeout.
#
# Each removal increments the :task_timeouts metric. When the maps end up
# empty, @check_expired_maps_on_every_flush is reset to nil.
#
# Returns the expired maps as new events when @push_previous_map_as_event or
# @push_map_as_event_on_timeout is set; an empty list otherwise.
def remove_expired_maps
  flushed_events = []
  default_min_timestamp = Time.now - @timeout
  default_min_inactivity_timestamp = Time.now - @inactivity_timeout

  @current_pipeline.mutex.synchronize do
    maps = @current_pipeline.aggregate_maps[@task_id]
    @logger.debug("Aggregate remove_expired_maps call with '#{@task_id}' pattern and #{maps.length} maps")

    maps.delete_if do |key, element|
      min_timestamp = element.timeout ? Time.now - element.timeout : default_min_timestamp
      min_inactivity_timestamp = element.inactivity_timeout ? Time.now - element.inactivity_timeout : default_min_inactivity_timestamp

      too_old = element.creation_timestamp + element.difference_from_creation_to_now < min_timestamp
      expired = too_old ||
                element.lastevent_timestamp + element.difference_from_lastevent_to_now < min_inactivity_timestamp

      if expired
        if @push_previous_map_as_event || @push_map_as_event_on_timeout
          flushed_events << create_timeout_event(element.map, key)
        end
        @logger.debug("Aggregate remove expired map with task_id=#{key}")
        metric.increment(:task_timeouts)
      end

      # the block value tells delete_if whether to drop the entry
      expired
    end
  end

  # disable check_expired_maps_on_every_flush if there is not anymore maps
  if @current_pipeline.aggregate_maps[@task_id].length == 0 && @check_expired_maps_on_every_flush
    @check_expired_maps_on_every_flush = nil
  end

  flushed_events
end
update “aggregate_maps” metric, with aggregate maps count associated to configured taskid pattern
# File lib/logstash/filters/aggregate.rb, line 481
# Updates the "aggregate_maps" gauge with the number of aggregate maps stored
# for the configured task_id pattern. Does nothing when no maps exist for
# that pattern.
def update_aggregate_maps_metric
  maps = @current_pipeline.aggregate_maps[@task_id]
  return unless maps

  metric.gauge(:aggregate_maps, maps.length)
end