class LogStash::Outputs::ElasticSearch

This output lets you store logs in elasticsearch and is the most recommended output for logstash. If you plan on using the logstash web interface, you'll need to use this output.

*VERSION NOTE*: Your elasticsearch cluster must be running elasticsearch
%ELASTICSEARCH_VERSION%. If you use any other version of elasticsearch,
you should consider using the [elasticsearch_http](elasticsearch_http)
output instead.

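If you want to set other elasticsearch options that are not exposed directly as config options, there are two options:

* create an elasticsearch.yml file in the $PWD of the logstash process
* pass in es.* java properties (java -Des.node.foo= or ruby -J-Des.node.foo=)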

This plugin will join your elasticsearch cluster, so it will show up in elasticsearch's cluster health status.

You can learn more about elasticsearch at <http://elasticsearch.org>.

## Operational Notes

Template management is a new feature and requires Elasticsearch 0.90.5 or later.

If you are still using a version older than this, please upgrade for more benefits than just template management.

Your firewalls will need to permit port 9300 in both directions (from logstash to elasticsearch, and elasticsearch to logstash)

Public Instance Methods

flush(events, teardown=false) click to toggle source
# File lib/logstash/outputs/elasticsearch.rb, line 308
# Ships a batch of buffered entries to Elasticsearch in one bulk request.
#
# events   - enumerable of buffered entries; the first element of each entry
#            is the event. Any further elements are ignored because the
#            target index and type are recomputed from the event here.
# teardown - accepted for the buffering callback signature; not used.
#
# Per event:
#   * the index name is @index expanded through event.sprintf;
#   * the type is event["type"] (or "logs") when @index_type is nil,
#     otherwise @index_type expanded through event.sprintf;
#   * the document id is @document_id expanded through event.sprintf, or nil
#     when @document_id is not set.
#
# Returns whatever the bulk request's execute! returns.
def flush(events, teardown=false)
  bulk = @client.bulk

  events.each do |event, _index, _type|
    target_index = event.sprintf(@index)

    # Set the 'type' value for the index.
    doc_type = @index_type.nil? ? (event["type"] || "logs") : event.sprintf(@index_type)

    # nil id: no explicit document id is passed along.
    doc_id = @document_id ? event.sprintf(@document_id) : nil

    bulk.index(target_index, doc_type, doc_id, event.to_json)
  end

  # TODO(sissel): Handle errors. Since bulk requests could mostly succeed
  # (aka partially fail), we need to figure out what documents need to be
  # retried.
  bulk.execute!
end
get_template_json() click to toggle source
# File lib/logstash/outputs/elasticsearch.rb, line 264
# Resolves the mapping template file and loads its contents into
# @template_json.
#
# When @template is nil a default location is chosen and stored in @template:
#   * if __FILE__ looks like a path inside a jar, "/elasticsearch-template.json"
#     at the root of that jar;
#   * otherwise the first of "elasticsearch-template.json" and
#     "lib/logstash/outputs/elasticsearch/elasticsearch-template.json" that
#     exists relative to the current working directory.
#
# Raises RuntimeError when @template is nil and no default file is found.
# Returns whatever @logger.info returns.
def get_template_json
  if @template.nil?
    if __FILE__ =~ /^(jar:)?file:\/.+!.+/
      # Running from a jar, assume the template is at the root of the jar.
      @template = [__FILE__.split("!").first, "/elasticsearch-template.json"].join("!")
    else
      candidates = [
        "elasticsearch-template.json",
        "lib/logstash/outputs/elasticsearch/elasticsearch-template.json"
      ]
      # File.exist? instead of the deprecated File.exists? alias, which was
      # removed in Ruby 3.2.
      found = candidates.find { |path| File.exist?(path) }
      if found.nil?
        raise "You must specify 'template => ...' in your elasticsearch output"
      end
      @template = found
    end
  end
  # Newlines are stripped so the template is kept as a single line.
  @template_json = IO.read(@template).gsub(/\n/,'')
  @logger.info("Using mapping template", :template => @template_json)
end
receive(event) click to toggle source
# File lib/logstash/outputs/elasticsearch.rb, line 303
# Queues one event for a later bulk flush.
#
# Events rejected by output? are dropped and nil is returned. Accepted events
# are handed to buffer_receive as an [event, index, type] entry, and the
# result of buffer_receive is returned.
def receive(event)
  if output?(event)
    entry = [event, index, type]
    buffer_receive(entry)
  end
end
register() click to toggle source
# File lib/logstash/outputs/elasticsearch.rb, line 154
# Prepares this output for use. In order, it:
#   1. requires every jar matched by the vendored elasticsearch glob;
#   2. sets up log4j and, when @embedded is set, starts a local node;
#   3. builds @client from the cluster/host/port/bind/node-name/protocol options;
#   4. probes the template API and, when @manage_template is still enabled,
#      optionally deletes and then installs the index template;
#   5. initializes the event buffer with @flush_size and @idle_flush_time.
def register
  # TODO(sissel): find a better way of declaring where the elasticsearch
  # libraries are
  # TODO(sissel): can skip this step if we're running from a jar.
  # Glob is resolved relative to the directory containing this file.
  jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/elasticsearch*/lib/*.jar")
  Dir[jarpath].each do |jar|
    require jar
  end

  # setup log4j properties for elasticsearch
  LogStash::Logger.setup_log4j(@logger)

  if @embedded
    # Default @host with embedded to localhost. This should help avoid
    # newbies tripping on ubuntu and other distros that have a default
    # firewall that blocks multicast.
    @host ||= "localhost"

    # Start elasticsearch local.
    start_local_elasticsearch
  end
  # Required only here, after the jars above have been loaded and any
  # embedded node has been started.
  require "jruby-elasticsearch"

  @logger.info("New ElasticSearch output", :cluster => @cluster,
               :host => @host, :port => @port, :embedded => @embedded)
  options = {
    :cluster => @cluster,
    :host => @host,
    :port => @port,
    :bind_host => @bind_host,
    :node_name => @node_name,
  }

  # :node or :transport protocols
  options[:type] = @protocol.to_sym 

  # bind_port is only passed along when it was configured.
  options[:bind_port] = @bind_port unless @bind_port.nil?

  # TransportClient requires a number for a port.
  options[:port] = options[:port].to_i if options[:type] == :transport

  @client = ElasticSearch::Client.new(options)

  # Check to see if we *can* get the template
  # Reads the @client instance variable held inside the wrapper object;
  # presumably the underlying Java client - TODO confirm against
  # jruby-elasticsearch.
  java_client = @client.instance_eval{@client}
  begin
    check_template = ElasticSearch::GetIndexTemplatesRequest.new(java_client, @template_name)
    result = check_template.execute #=> Run previously...
  rescue Exception => e
    # Any failure here turns template management off for this instance; the
    # error is logged and registration continues.
    @logger.error("Unable to check template.  Automatic template management disabled.", :error => e.to_s)
    @manage_template = false
  end
  
  if @manage_template
    @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
    if @template_overwrite
      @logger.info("Template overwrite enabled.  Deleting template if it exists.", :template_overwrite => @template_overwrite.to_s)
      # 'result' is still the response of the @template_name lookup above.
      if !result.getIndexTemplates.isEmpty
        delete_template = ElasticSearch::DeleteIndexTemplateRequest.new(java_client, @template_name)
        result = delete_template.execute
        if result.isAcknowledged
          @logger.info("Successfully deleted template", :template_name => @template_name)
        else
          @logger.error("Failed to delete template", :template_name => @template_name)
        end
      end  
    end # end if @template_overwrite
    has_template = false
    @logger.debug("Fetching all templates...")
    gettemplates = ElasticSearch::GetIndexTemplatesRequest.new(java_client, "*")
    result = gettemplates.execute
    # Results of this come as a list, so we need to iterate through it
    if !result.getIndexTemplates.isEmpty
      template_metadata_list = result.getIndexTemplates
      # Maps each entry's name to its 'template' value (the value compared
      # against the index patterns below).
      templates = {}
      i = 0
      template_metadata_list.size.times do
        template_data = template_metadata_list.get(i)
        templates[template_data.name] = template_data.template
        i += 1
      end
      # Turn @index into wildcard patterns: the first %{...} placeholder
      # becomes '*'. The alternate form also swallows a '-' directly in
      # front of the placeholder.
      template_idx_name = @index.sub(/%{[^}]+}/,'*')
      alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*')
      # A template counts as present when any stored 'template' value equals
      # one of the two patterns, regardless of the template's name.
      if !templates.any? { |k,v| v == template_idx_name || v == alt_template_idx_name }
        @logger.debug("No logstash template found in Elasticsearch", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name)
      else
        has_template = true
        @logger.info("Found existing Logstash template match.", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name, :templates => templates.to_s)
      end
    end
    if !has_template #=> No template found, we're going to add one
      # get_template_json fills in @template_json, which is sent below.
      get_template_json
      put_template = ElasticSearch::PutIndexTemplateRequest.new(java_client, @template_name, @template_json)
      result = put_template.execute
      if result.isAcknowledged
        @logger.info("Successfully inserted template", :template_name => @template_name)
      else
        @logger.error("Failed to insert template", :template_name => @template_name)
      end
    end 
  end # if @manage_templates
  
  # Set up buffering with the configured item-count and interval limits.
  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end

Protected Instance Methods

start_local_elasticsearch() click to toggle source
# File lib/logstash/outputs/elasticsearch.rb, line 289
# Builds and starts an embedded Elasticsearch node, keeping a handle to it
# in @embedded_elasticsearch.
#
# "cluster.name" and "node.name" are applied only when @cluster / @node_name
# are not nil; "http.port" is always set from @embedded_http_port.
# Returns the result of starting the node.
def start_local_elasticsearch
  @logger.info("Starting embedded ElasticSearch local node.")
  node_builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
  # Disable 'local only' - LOGSTASH-277
  #node_builder.local(true)

  # Optional identity settings, skipped when the option is unset.
  [["cluster.name", @cluster], ["node.name", @node_name]].each do |key, value|
    node_builder.settings.put(key, value) unless value.nil?
  end
  node_builder.settings.put("http.port", @embedded_http_port)

  @embedded_elasticsearch = node_builder.node
  @embedded_elasticsearch.start
end