class LogStash::Outputs::ElasticSearchHTTP

This output lets you store logs in elasticsearch.

This plugin uses the HTTP/REST interface to Elasticsearch, which usually lets you use any version of the Elasticsearch server. It is known to work with Elasticsearch %ELASTICSEARCH_VERSION%.

You can learn more about Elasticsearch at <elasticsearch.org>.

Public Instance Methods

flush(events, teardown=false) click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 204
# Turns a batch of buffered entries into one bulk request body and passes
# it to #post.
#
# Every event contributes two newline-terminated JSON lines: an "index"
# action header (target index, document type and, when @document_id is
# set, the document id) followed by the event itself.
#
# events   - collection of buffered entries. Each entry is destructured as
#            [event, index, type]; only the event is used, because the
#            index and type are recomputed here from the event.
# teardown - accepted but not read by this method.
#
# Returns whatever #post returns.
def flush(events, teardown=false)
  # Frozen once so the same newline object is reused for every line.
  line_end = "\n".freeze

  bulk_lines = events.flat_map do |event, _index, _type|
    target_index = event.sprintf(@index)

    # Without a configured @index_type, fall back to the event's own
    # "type" field, and to "logs" when the event has none.
    doc_type = if @index_type.nil?
      event["type"] || "logs"
    else
      event.sprintf(@index_type)
    end

    action = { "index" => { "_index" => target_index, "_type" => doc_type } }
    action["index"]["_id"] = event.sprintf(@document_id) unless @document_id.nil?

    [action.to_json, line_end, event.to_json, line_end]
  end

  post(bulk_lines.join(""))
end
get_template_json() click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 174
# Locates the mapping template (unless @template is already set), reads it
# and stores its contents, with newlines removed, in @template_json.
#
# Lookup order when @template is nil:
#   1. if __FILE__ looks like a path inside a jar, the
#      "elasticsearch-template.json" entry at the root of that jar;
#   2. "elasticsearch-template.json" in the current working directory;
#   3. "lib/logstash/outputs/elasticsearch/elasticsearch-template.json"
#      relative to the current working directory.
#
# The resolved path is kept in @template, so later calls reuse it.
#
# Raises RuntimeError when no template file can be located.
# Returns the result of the final @logger.info call.
def get_template_json
  if @template.nil?
    if __FILE__ =~ /^(jar:)?file:\/.+!.+/
      begin
        # Running from a jar, assume elasticsearch-template.json is at the root.
        jar_path = [__FILE__.split("!").first, "/elasticsearch-template.json"].join("!")
        @template = jar_path
      rescue => ex
        raise "Failed to cache, due to: #{ex}\n#{ex.backtrace}"
      end
    else
      candidates = [
        "elasticsearch-template.json",
        "lib/logstash/outputs/elasticsearch/elasticsearch-template.json"
      ]
      # File.exist? rather than File.exists?: the latter is deprecated and
      # no longer defined on Ruby 3.2+.
      @template = candidates.find { |path| File.exist?(path) }
      if @template.nil?
        raise "You must specify 'template => ...' in your elasticsearch_http output"
      end
    end
  end
  # File.read never treats a leading "|" in the path as a command to run,
  # unlike IO.read on older Rubies.
  @template_json = File.read(@template).delete("\n")
  @logger.info("Using mapping template", :template => @template_json)
end
post(body) click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 225
# Sends one bulk request to @bulk_url and checks the outcome.
#
# body - String request payload to POST.
#
# The response body is always read in full: it is needed for error
# reporting, and consuming it frees the connection for reuse.
#
# An EOFError while sending the request, reading the response header or
# reading the response body is logged as a warning and aborts this flush.
# Any response status other than 200 is logged as an error together with
# the response body and the request payload that was sent.
#
# Returns nil in every case.
def post(body)
  begin
    response = @agent.post!(@bulk_url, :body => body)
  rescue EOFError
    @logger.warn("EOF while writing request or reading response header from elasticsearch",
                 :host => @host, :port => @port)
    return # abort this flush
  end

  # Collected separately from the request payload so that both can be
  # reported if the request turns out to have failed.
  response_body = ""
  begin
    response.read_body { |chunk| response_body << chunk }
  rescue EOFError
    @logger.warn("EOF while reading response body from elasticsearch",
                 :host => @host, :port => @port)
    return # abort this flush
  end

  if response.status != 200
    @logger.error("Error writing (bulk) to elasticsearch",
                  :response => response, :response_body => response_body,
                  :request_body => body)
    return
  end
end
receive(event) click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 199
# Queues an event for the next bulk flush.
#
# Events rejected by output? are dropped and nil is returned. Accepted
# events are handed to buffer_receive as an [event, index, type] triple.
#
# NOTE(review): `index` and `type` are not defined in this method, so they
# resolve to something declared elsewhere in the class - confirm what they
# return. #flush recomputes both values from the event and does not read
# the ones queued here.
def receive(event)
  buffer_receive([event, index, type]) if output?(event)
end
register() click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 96
# Prepares the output for use: creates the HTTP agent, builds the bulk URL,
# optionally makes sure a mapping template is installed, and initializes
# the event buffer.
#
# Template management (only when @manage_template is truthy):
#   * with @template_overwrite, an existing template named @template_name
#     is deleted first;
#   * all templates are then listed; if the listing returns 404, or none of
#     the listed templates has a "template" pattern matching the configured
#     index, the template is loaded (get_template_json) and uploaded
#     (template_action('put'));
#   * any other listing status is logged as an error and nothing is
#     uploaded.
#
# Raises RuntimeError when the template listing cannot be read or parsed.
# Returns the result of buffer_initialize.
def register
  require "ftw" # gem ftw
  @agent = FTW::Agent.new
  @queue = []

  auth = @user && @password ? "#{@user}:#{@password.value}@" : ""
  base_url = "http://#{auth}#{@host}:#{@port}"
  @bulk_url = "#{base_url}/_bulk?replication=#{@replication}"
  if @manage_template
    @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
    template_search_url = "#{base_url}/_template/*"
    @template_url = "#{base_url}/_template/#{@template_name}"
    if @template_overwrite
      @logger.info("Template overwrite enabled.  Deleting existing template.", :template_overwrite => @template_overwrite.to_s)
      response = @agent.get!(@template_url)
      template_action('delete') if response.status == 200 #=> Purge the old template if it exists
    end
    # NOTE(review): when @user/@password are set, this URL carries the
    # credentials, so they end up in the debug log.
    @logger.debug("Template Search URL:", :template_search_url => template_search_url)
    has_template = false
    # Index patterns a matching template would declare: the first
    # %{...} reference in @index replaced by "*", with and without the
    # dash in front of it.
    template_idx_name = @index.sub(/%{[^}]+}/,'*')
    alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*')
    # Get the template data
    response = @agent.get!(template_search_url)
    json = ""
    if response.status == 404 #=> This condition can occur when no template has ever been appended
      @logger.info("No template found in Elasticsearch...")
      get_template_json
      template_action('put')
    elsif response.status == 200
      begin
        response.read_body { |c| json << c }
        results = JSON.parse(json)
      rescue StandardError => e
        # `results` is still nil here when reading or parsing failed.
        @logger.error("Error parsing JSON", :json => json, :results => results.to_s, :error => e.to_s)
        # Kernel#raise rejects a String followed by a second positional
        # argument, so the original error is folded into the message.
        raise "Exception in parsing JSON: #{e}"
      end
      if results.none? { |k,v| v["template"] == template_idx_name || v["template"] == alt_template_idx_name }
        @logger.debug("No template found in Elasticsearch", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name)
        get_template_json
        template_action('put')
      end
    else #=> Some other status code?
      @logger.error("Could not check for existing template.  Check status code.", :status => response.status.to_s)
    end # end if response.status == 200
  end # end if @manage_template
  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end
teardown() click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 253
# Shutdown hook: asks the buffer for a final flush by calling buffer_flush
# with the :final option set to true.
#
# Returns whatever buffer_flush returns.
def teardown
  buffer_flush(final: true)
end
template_action(command) click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 148
# Deletes or uploads the mapping template at @template_url.
#
# command - 'delete' issues a DELETE on @template_url; 'put' issues a PUT
#           with @template_json as the request body. In both cases the
#           response body is discarded.
#
# An EOFError during the request is logged as a warning and the action is
# abandoned. A response status other than 200 is logged as an error.
# Success is reported at info level.
def template_action(command)
  begin
    case command
    when 'delete'
      response = @agent.delete!(@template_url)
      response.discard_body
    when 'put'
      response = @agent.put!(@template_url, :body => @template_json)
      response.discard_body
    end
  rescue EOFError
    @logger.warn("EOF while attempting request or reading response header from elasticsearch",
                 :host => @host, :port => @port)
    return # abort this action
  end

  unless response.status == 200
    @logger.error("Error acting on elasticsearch mapping template",
                  :response => response, :action => command,
                  :request_url => @template_url)
    return
  end

  if command == 'delete'
    @logger.info("Successfully deleted template", :template_url => @template_url)
  end
  if command == 'put'
    @logger.info("Successfully applied template", :template_url => @template_url)
  end
end