class LogStash::Inputs::Elasticsearch::CursorTracker
Attributes
Public Class Methods
Source
# File lib/logstash/inputs/elasticsearch/cursor_tracker.rb, line 9 def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:) @last_run_metadata_path = last_run_metadata_path @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new @last_value = IO.read(@last_run_metadata_path) rescue nil || tracking_field_seed @tracking_field = tracking_field logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}" @mutex = Mutex.new end
Public Instance Methods
Source
# File lib/logstash/inputs/elasticsearch/cursor_tracker.rb, line 18 def checkpoint_cursor(intermediate: true) @mutex.synchronize do if intermediate # in intermediate checkpoints pick the smallest converge_last_value {|v1, v2| v1 < v2 ? v1 : v2} else # in the last search of a PIT choose the largest converge_last_value {|v1, v2| v1 > v2 ? v1 : v2} @last_value_hashmap.clear end IO.write(@last_run_metadata_path, @last_value) end end
Source
# File lib/logstash/inputs/elasticsearch/cursor_tracker.rb, line 32 def converge_last_value(&block) return if @last_value_hashmap.empty? new_last_value = @last_value_hashmap.reduceValues(1000, &block) logger.debug? && logger.debug("converge_last_value: got #{@last_value_hashmap.values.inspect}. won: #{new_last_value}") return if new_last_value == @last_value @last_value = new_last_value logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}" end
Source
# File lib/logstash/inputs/elasticsearch/cursor_tracker.rb, line 47 def inject_cursor(query_json) # ":present" means "now - 30s" to avoid grabbing partially visible data in the PIT result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", now_minus_30s) logger.debug("inject_cursor: injected values for ':last_value' and ':present'", :query => result) result end
Source
# File lib/logstash/inputs/elasticsearch/cursor_tracker.rb, line 54 def now_minus_30s Java::java.time.Instant.now.minusSeconds(30).to_s end
Source
# File lib/logstash/inputs/elasticsearch/cursor_tracker.rb, line 41 def record_last_value(event) value = event.get(@tracking_field) logger.trace? && logger.trace("storing last_value if #{@tracking_field} for #{Thread.current.object_id}: #{value}") @last_value_hashmap.put(Thread.current.object_id, value) end