class LogStash::Inputs::Elasticsearch::SearchAfter
Constants
- PIT_JOB
- SEARCH_AFTER_JOB
Attributes
Public Instance Methods
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 231
# Closes the point in time (PIT) identified by +pit_id+ on a best-effort basis.
#
# When +pit_id+ is not a usable PIT id (see #pit?) there is nothing to close,
# so no request is sent and no "closing" message is logged.
# Any StandardError raised while closing is logged at debug level and
# swallowed, so cleanup never masks the outcome of the search itself.
#
# pit_id - the PIT id to close; may be nil when the PIT was never created.
def clear(pit_id)
  return unless pit?(pit_id)

  logger.info("Closing point in time (PIT)")
  @client.close_point_in_time(:body => {:id => pit_id} )
rescue => e
  logger.debug("Ignoring close_point_in_time exception", message: e.message, exception: e.class)
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 137
# Opens a point in time (PIT) on @index, using @scroll as its keep-alive,
# and returns the 'id' entry of the client's response.
def create_pit
  logger.info("Create point in time (PIT)")
  response = @client.open_point_in_time(index: @index, keep_alive: @scroll)
  response['id']
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 128
# Delegates the run to the superclass implementation and then, when a cursor
# tracker is configured, records a final (non-intermediate) checkpoint.
#
# output_queue - forwarded unchanged to the superclass.
# query        - forwarded unchanged to the superclass.
def do_run(output_queue, query)
  super(output_queue, query)
  return unless @cursor_tracker

  @cursor_tracker.checkpoint_cursor(intermediate: false)
end
Calls superclass method
LogStash::Inputs::Elasticsearch::PaginatedSearch#do_run
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 165
# Builds the request for one page via #search_options, traces it, and returns
# whatever @client.search returns for that request.
#
# pit_id       - PIT id the page is read from (required).
# search_after - sort values to resume after; nil for the first page.
# slice_id     - slice number for sliced searches; nil otherwise.
def next_page(pit_id: , search_after: nil, slice_id: nil)
  request = search_options(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
  logger.trace("search options", request)
  @client.search(request)
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 133
# Tells whether +id+ can be used as a point-in-time id.
#
# Returns true only when +id+ is a String; nil and every other object yield
# false. Object#is_a? already answers false for nil and always returns a
# boolean, so neither safe navigation nor double negation is required.
def pit?(id)
  id.is_a?(String)
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 171
# Consumes one page of search results.
#
# The block must return the search response; every entry of
# response['hits']['hits'] is handed to @plugin.push_hit together with
# +output_queue+.
#
# Returns a two-element array [has_hits, search_after]:
#   has_hits     - whether the page contained any hit.
#   search_after - the 'sort' value of the last hit, or nil when the page is
#                  empty or the last hit carries no sort value.
#
# A warning is logged when the page has hits but no sort value.
def process_page(output_queue)
  response = yield
  hits = response['hits']['hits']
  hits.each { |hit| @plugin.push_hit(hit, output_queue) }

  has_hits = hits.any?
  # Only the "no last hit" case is expected here, so test for it explicitly
  # instead of rescuing every StandardError raised by the lookup.
  last_hit = hits.last
  search_after = last_hit['sort'] if last_hit

  logger.warn("Query got data but the sort value is empty") if has_hits && search_after.nil?
  [ has_hits, search_after ]
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 208
# Runs a single (unsliced) search inside a point in time, wrapping the search
# in retryable(SEARCH_AFTER_JOB).
#
# output_queue - queue that receives the hits.
def retryable_search(output_queue)
  with_pit do |pit_id|
    retryable(SEARCH_AFTER_JOB) { search(output_queue: output_queue, pit_id: pit_id) }
  end
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 216
# Runs a sliced search inside one shared point in time: starts one thread per
# slice (@slices of them), each running its slice's search wrapped in
# retryable(SEARCH_AFTER_JOB), and waits for all of them to finish.
#
# output_queue - queue shared by all slice threads to receive the hits.
def retryable_slice_search(output_queue)
  with_pit do |pit_id|
    workers = @slices.times.map do |slice_id|
      Thread.new do
        LogStash::Util::set_thread_name("[#{@pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
        retryable(SEARCH_AFTER_JOB) { search(output_queue: output_queue, slice_id: slice_id, pit_id: pit_id) }
      end
    end
    # Block until every slice thread is done before the PIT block exits.
    workers.map(&:join)
  end

  logger.trace("#{@slices} slices completed")
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 188
# Pages through the results of one search (or one slice of a sliced search)
# until a page comes back without hits, the plugin is asked to stop, or a
# page yields no cursor to continue from.
#
# output_queue - queue that receives the hits (passed to #process_page).
# slice_id     - slice number for sliced searches; nil otherwise. When set,
#                it is included, with @slices, in the log details.
# pit_id       - PIT id every page is read from.
#
# After paging, an intermediate cursor checkpoint is recorded when a cursor
# tracker is configured.
def search(output_queue:, slice_id: nil, pit_id:)
  log_details = slice_id.nil? ? {} : { slice_id: slice_id, slices: @slices }
  logger.info("Query start", log_details)

  has_hits = true
  search_after = nil
  while has_hits && !@plugin.stop?
    logger.debug("Query progress", log_details)
    has_hits, search_after = process_page(output_queue) do
      next_page(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
    end

    # Without a sort value there is no cursor to advance: the next request
    # would be identical to the very first one (search_after: nil) and the
    # loop would re-emit the first page forever. Stop paging instead.
    break if search_after.nil?
  end

  @cursor_tracker.checkpoint_cursor(intermediate: true) if @cursor_tracker
  logger.info("Query completed", log_details)
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 143
# Builds the arguments for one paginated search request.
#
# pit_id       - PIT id to read from (required).
# search_after - sort values to resume after; omitted from the body when nil.
# slice_id     - slice number; when given, a slice clause with @slices as
#                its maximum is added to the body.
#
# Returns a Hash with :size (from @size) and :body. The body is a copy of
# @query extended with the request-specific clauses; @query itself is not
# modified.
def search_options(pit_id: , search_after: nil, slice_id: nil)
  # Hash#merge returns a new Hash, so the clauses below are added to a copy.
  body = @query.merge(:pit => { :id => pit_id, :keep_alive => @scroll })

  # search_after requires at least a sort field explicitly
  # we add default sort "_shard_doc": "asc" if the query doesn't have any sort field
  # by default, ES adds the same implicitly on top of the provided "sort"
  # https://www.elastic.co/guide/en/elasticsearch/reference/8.10/paginate-search-results.html#CO201-2
  body[:sort] = { :_shard_doc => "asc" } if @query["sort"].nil?
  body[:search_after] = search_after unless search_after.nil?
  body[:slice] = { :id => slice_id, :max => @slices } unless slice_id.nil?

  { :size => @size, :body => body }
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 181
# Creates a point in time through retryable(PIT_JOB), yields its id to the
# block when the id passes #pit?, and always calls #clear with the obtained
# id afterwards, whether or not the block ran or raised.
#
# Returns the block's value, or nil when no usable PIT id was obtained.
def with_pit
  pit_id = retryable(PIT_JOB) { create_pit }
  return unless pit?(pit_id)

  yield pit_id
ensure
  clear(pit_id)
end