class LogStash::Inputs::Elasticsearch

Read from elasticsearch.

This is useful for replay testing logs, reindexing, etc.

Example:

input {
  # Read all documents from elasticsearch matching the given query
  elasticsearch {
    host => "localhost"
    query => "ERROR"
  }
}

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 52
def register
  require "ftw"
  @agent = FTW::Agent.new
  params = {
    "q" => @query,
    "scroll" => @scroll,
    "size" => "#{@size}",
  }

  params['search_type'] = "scan" if @scan

  @url = "http://#{@host}:#{@port}/#{@index}/_search?#{encode(params)}"
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 74
def run(output_queue)

  # Execute the search request
  response = @agent.get!(@url)
  json = ""
  response.read_body { |c| json << c }
  result = JSON.parse(json)
  scroll_id = result["_scroll_id"]

  # When using the search_type=scan we don't get an initial result set.
  # So we do it here.
  if @scan

    scroll_params = {
      "scroll_id" => scroll_id,
      "scroll" => @scroll
    }

    scroll_url = "http://#{@host}:#{@port}/_search/scroll?#{encode(scroll_params)}"
    response = @agent.get!(scroll_url)
    json = ""
    response.read_body { |c| json << c }
    result = JSON.parse(json)

  end

  while true
    break if result.nil?
    hits = result["hits"]["hits"]
    break if hits.empty?

    hits.each do |hit|
      event = hit["_source"]

      # Hack to make codecs work
      @codec.decode(event.to_json) do |event|
        decorate(event)
        output_queue << event
      end
    end

    # Get the scroll id from the previous result set and use it for getting the next data set
    scroll_id = result["_scroll_id"]

    # Fetch the next result set
    scroll_params = {
      "scroll_id" => scroll_id,
      "scroll" => @scroll
    }
    scroll_url = "http://#{@host}:#{@port}/_search/scroll?#{encode(scroll_params)}"

    response = @agent.get!(scroll_url)
    json = ""
    response.read_body { |c| json << c }
    result = JSON.parse(json)

    if result["error"]
      @logger.warn(result["error"], :request => scroll_url)
      # TODO(sissel): raise an error instead of breaking
      break
    end

  end
rescue LogStash::ShutdownSignal
  # Do nothing, let us quit.
end

Private Instance Methods

encode(hash) click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 67
def encode(hash)
  return hash.collect do |key, value|
    CGI.escape(key) + "=" + CGI.escape(value)
  end.join("&")
end