class LogStash::Inputs::Elasticsearch::Scroll
Constants
- SCROLL_JOB
Public Instance Methods
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 114
# Asks the client to clear the scroll identified by +scroll_id+.
#
# Does nothing (and returns nil) when +scroll_id+ is nil or false.
# Any StandardError raised along the way is swallowed and reported at
# debug level, so this method does not propagate clear_scroll failures.
#
# @param scroll_id [Object, nil] scroll identifier forwarded to clear_scroll
def clear(scroll_id)
  return unless scroll_id

  @client.clear_scroll(body: { scroll_id: scroll_id })
rescue StandardError => error
  # best-effort cleanup: log and carry on
  logger.debug("Ignoring clear_scroll exception", message: error.message, exception: error.class)
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 63
# Issues the first search request for a slice.
#
# The request options are built by #search_options and handed to the
# client's +search+ call; that call's result is returned unchanged.
#
# @param slice_id [Object, nil] slice identifier forwarded to #search_options
def initial_search(slice_id)
  @client.search(search_options(slice_id))
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 68
# Requests the next page of an existing scroll.
#
# Sends +scroll_id+ in the request body together with the configured
# @scroll value and returns whatever the client's +scroll+ call returns.
#
# @param scroll_id [Object] scroll identifier from the previous response
def next_page(scroll_id)
  request_body = { scroll_id: scroll_id }
  @client.scroll(body: request_body, scroll: @scroll)
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 72
# Runs the given block to obtain one response page and pushes each of its
# hits to the plugin.
#
# The block must return a response indexable as response['hits']['hits']
# and response['_scroll_id'].
#
# @param output_queue [Object] passed through to @plugin.push_hit for every hit
# @return [Array] two elements: whether the page's hits satisfy +any?+, and
#   the response's '_scroll_id' value
def process_page(output_queue)
  response = yield
  hits = response['hits']['hits']

  hits.each do |hit|
    @plugin.push_hit(hit, output_queue)
  end

  [hits.any?, response['_scroll_id']]
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 95
# Runs #search inside the +retryable+ helper, labelled with SCROLL_JOB.
#
# @param output_queue [Object] forwarded to #search
# @param slice_id [Object, nil] forwarded to #search; nil by default
def retryable_search(output_queue, slice_id=nil)
  retryable(SCROLL_JOB) { search(output_queue, slice_id) }
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 101
# Runs one #retryable_search per slice, each on its own thread, and waits
# for all of them to finish.
#
# A warning is logged first when @slices is greater than 8. Every thread
# is named after @pipeline_id and its slice id before it starts searching.
#
# @param output_queue [Object] forwarded to every #retryable_search call
def retryable_slice_search(output_queue)
  if @slices > 8
    logger.warn("managed slices for query is very large (#{@slices}); consider reducing")
  end

  # start every slice worker before waiting on any of them
  slice_threads = @slices.times.map do |slice_id|
    Thread.new do
      LogStash::Util::set_thread_name("[#{@pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
      retryable_search(output_queue, slice_id)
    end
  end
  slice_threads.each(&:join)

  logger.trace("#{@slices} slices completed")
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 78
# Runs a scroll query to completion, pushing every page through
# #process_page.
#
# Starts with #initial_search and keeps calling #next_page while the last
# page had hits, a scroll id was returned, and @plugin.stop? is false.
# The most recent scroll id is always handed to #clear on the way out,
# including when an exception is raised.
#
# @param output_queue [Object] forwarded to #process_page
# @param slice_id [Object, nil] when not nil, added to the log details
#   (with @slices) and forwarded to #initial_search
def search(output_queue, slice_id=nil)
  log_details = slice_id.nil? ? {} : { slice_id: slice_id, slices: @slices }

  logger.info("Query start", log_details)
  has_hits, scroll_id = process_page(output_queue) { initial_search(slice_id) }

  # stop paging on an empty page, a missing scroll id, or plugin shutdown
  until !has_hits || !scroll_id || @plugin.stop?
    logger.debug("Query progress", log_details)
    has_hits, scroll_id = process_page(output_queue) { next_page(scroll_id) }
  end

  logger.info("Query completed", log_details)
ensure
  clear(scroll_id)
end
Source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 52
# Builds the options hash for the initial search request.
#
# When +slice_id+ is not nil, a 'slice' entry ('id' => slice_id,
# 'max' => @slices) is merged into a copy of @query; otherwise @query is
# used as is. The resulting query is serialized with LogStash::Json.dump.
#
# @param slice_id [Object, nil] slice identifier, or nil for an unsliced query
# @return [Hash] with the keys :index, :scroll, :size and :body
def search_options(slice_id)
  query = if slice_id.nil?
            @query
          else
            @query.merge('slice' => { 'id' => slice_id, 'max' => @slices })
          end

  {
    index: @index,
    scroll: @scroll,
    size: @size,
    body: LogStash::Json.dump(query)
  }
end