class LogStash::Inputs::Elasticsearch
Read from elasticsearch.
This is useful for replay testing logs, reindexing, etc.
Example:
    input {
      # Read all documents from elasticsearch matching the given query
      elasticsearch {
        host => "localhost"
        query => "ERROR"
      }
    }
- TODO(sissel): configurable scroll timeout
- TODO(sissel): Option to keep the index, type, and doc id so we can do reindexing?
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 52
# Prepare the plugin for running: create the HTTP agent and build the URL of
# the initial search request from the configured settings.
#
# The query string carries the query ("q"), the scroll setting ("scroll") and
# the page size ("size"); "search_type=scan" is appended when @scan is set.
# The finished URL is stored in @url (and is also the return value).
def register
  # Loaded here rather than at file load time, as in the original listing.
  require "ftw"
  @agent = FTW::Agent.new

  search_params = {
    "q" => @query,
    "scroll" => @scroll,
    "size" => @size.to_s
  }
  search_params["search_type"] = "scan" if @scan

  query_string = encode(search_params)
  @url = "http://#{@host}:#{@port}/#{@index}/_search?#{query_string}"
end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 74
# Run the search stored in @url and push every matching document onto
# +output_queue+, following the scroll cursor until the results are exhausted.
#
# Each hit's "_source" is serialized back to JSON and passed through @codec;
# every event the codec yields is decorated and appended to +output_queue+.
#
# The loop stops when a page has no hits, when a response is nil, or when a
# response carries an "error" key (which is logged as a warning). This now
# also applies to the very first response, which previously crashed with a
# NoMethodError when the search itself failed.
#
# LogStash::ShutdownSignal raised while running is swallowed so the input can
# quit quietly.
def run(output_queue)
  # Execute the search request
  page = fetch_page(@url)

  # When using search_type=scan we don't get an initial result set, only a
  # scroll id, so fetch the first real page here.
  page = fetch_page(scroll_url(page["_scroll_id"])) if @scan && page

  while page
    hits = page["hits"]["hits"]
    break if hits.empty?

    hits.each do |hit|
      # Hack to make codecs work: hand the document to the codec as JSON text
      @codec.decode(hit["_source"].to_json) do |event|
        decorate(event)
        output_queue << event
      end
    end

    # Use the scroll id of the page just processed to fetch the next one
    page = fetch_page(scroll_url(page["_scroll_id"]))
  end
rescue LogStash::ShutdownSignal
  # Do nothing, let us quit.
end

# GET +url+ with @agent and parse the response body as JSON.
#
# Returns the parsed result, or nil when the body parsed to nil or contained
# an "error" key; in the error case the error is logged together with the
# request URL.
# TODO(sissel): raise an error instead of stopping silently
def fetch_page(url)
  response = @agent.get!(url)
  json = ""
  response.read_body { |chunk| json << chunk }
  result = JSON.parse(json)

  if result && result["error"]
    @logger.warn(result["error"], :request => url)
    return nil
  end

  result
end

# Build the URL that fetches the next page for +scroll_id+, renewing the
# scroll with the configured @scroll value.
def scroll_url(scroll_id)
  scroll_params = { "scroll_id" => scroll_id, "scroll" => @scroll }
  "http://#{@host}:#{@port}/_search/scroll?#{encode(scroll_params)}"
end

private :fetch_page, :scroll_url
Private Instance Methods
encode(hash)
click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 67
# Turn a hash of parameters into a URL query string.
#
# Every key and value is converted with +to_s+ and escaped with CGI.escape,
# rendered as "key=value", and the pairs are joined with "&" in the hash's
# iteration order. Converting with +to_s+ means Integer, Symbol and nil
# entries are accepted (nil becomes an empty value) instead of raising a
# TypeError inside CGI.escape.
#
# Returns the query string, without a leading "?"; an empty hash gives "".
def encode(hash)
  hash.map do |key, value|
    "#{CGI.escape(key.to_s)}=#{CGI.escape(value.to_s)}"
  end.join("&")
end