class LogStash::Outputs::ElasticSearch
This output lets you store logs in elasticsearch and is the most recommended output for logstash. If you plan on using the logstash web interface, you'll need to use this output.
*VERSION NOTE*: Your elasticsearch cluster must be running elasticsearch %ELASTICSEARCH_VERSION%. If you use any other version of elasticsearch, you should consider using the [elasticsearch_http](elasticsearch_http) output instead.
If you want to set other elasticsearch options that are not exposed directly as config options, there are two options:
- create an elasticsearch.yml file in the $PWD of the logstash process
- pass in es.* java properties (java -Des.node.foo= or ruby -J-Des.node.foo=)
This plugin will join your elasticsearch cluster, so it will show up in elasticsearch's cluster health status.
You can learn more about elasticsearch at <http://elasticsearch.org>.
## Operational Notes
Template management is a new feature and requires Elasticsearch version 0.90.5 or later.
If you are still using a version older than this, please upgrade for more benefits than just template management.
Your firewalls will need to permit port 9300 in both directions (from logstash to elasticsearch, and elasticsearch to logstash)
Public Instance Methods
# File lib/logstash/outputs/elasticsearch.rb, line 308
#
# Index a batch of buffered events with a single bulk request.
#
# events   - the queued entries; the first element of each entry is the
#            event. Any further elements (receive queues an index and a type
#            next to the event) are ignored, because both values are derived
#            from the event itself below.
# teardown - accepted as the second argument of the flush callback; not used.
#
# Returns whatever the bulk request's execute! returns, or nil when there was
# nothing to send.
def flush(events, teardown=false)
  # Elasticsearch rejects a bulk request that contains no actions, so an
  # empty batch is a no-op instead of a failing round trip.
  return if events.empty?

  request = @client.bulk
  events.each do |event, _index, _type|
    index = event.sprintf(@index)

    # Set the 'type' value for the index: the configured pattern when there
    # is one, otherwise the event's own "type" field, otherwise "logs".
    type = if @index_type.nil?
      event["type"] || "logs"
    else
      event.sprintf(@index_type)
    end

    # A nil document id is passed through when none is configured.
    document_id = @document_id ? event.sprintf(@document_id) : nil

    request.index(index, type, document_id, event.to_json)
  end

  request.execute!
  # TODO(sissel): Handle errors. Since bulk requests could mostly succeed
  # (aka partially fail), we need to figure out what documents need to be
  # retried.
end
# File lib/logstash/outputs/elasticsearch.rb, line 264
#
# Load the index mapping template into @template_json.
#
# When no template path is configured (@template is nil) one is chosen:
# * running from a jar: elasticsearch-template.json at the root of that jar;
# * otherwise the first existing file among ./elasticsearch-template.json and
#   lib/logstash/outputs/elasticsearch/elasticsearch-template.json.
#
# The file content is stored with all newlines removed and is logged at info
# level.
#
# Raises RuntimeError when no template is configured and none can be found.
def get_template_json
  if @template.nil?
    if __FILE__ =~ /^(jar:)?file:\/.+!.+/
      # Running from a jar, assume elasticsearch-template.json is at the root.
      @template = [__FILE__.split("!").first, "/elasticsearch-template.json"].join("!")
    else
      candidates = [
        "elasticsearch-template.json",
        "lib/logstash/outputs/elasticsearch/elasticsearch-template.json",
      ]
      # File.exist? rather than File.exists?: the latter is deprecated and
      # no longer defined on Ruby 3.2+.
      @template = candidates.find { |path| File.exist?(path) }
      if @template.nil?
        raise "You must specify 'template => ...' in your elasticsearch output"
      end
    end
  end

  @template_json = IO.read(@template).gsub(/\n/,'')
  @logger.info("Using mapping template", :template => @template_json)
end
# File lib/logstash/outputs/elasticsearch.rb, line 303
#
# Queue one event for the next bulk flush.
#
# Events rejected by output? are dropped and nil is returned; otherwise the
# event is handed to buffer_receive together with the current index and type
# values, and buffer_receive's result is returned.
#
# NOTE(review): flush recomputes the index and type from the event itself, so
# the two extra entries queued here look like placeholders -- confirm.
def receive(event)
  if output?(event)
    buffer_receive([event, index, type])
  end
end
# File lib/logstash/outputs/elasticsearch.rb, line 154
#
# Prepare the output for use:
# 1. require the vendored Elasticsearch jars and set up log4j;
# 2. optionally start an embedded Elasticsearch node;
# 3. create the client (@client);
# 4. when template management is enabled, make sure an index template exists;
# 5. initialise the event buffer.
#
# Returns the result of buffer_initialize.
def register
  # TODO(sissel): find a better way of declaring where the elasticsearch
  # libraries are
  # TODO(sissel): can skip this step if we're running from a jar.
  jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/elasticsearch*/lib/*.jar")
  Dir[jarpath].each { |jar| require jar }

  # setup log4j properties for elasticsearch
  LogStash::Logger.setup_log4j(@logger)

  if @embedded
    # Default @host with embedded to localhost. This should help avoid
    # newbies tripping on ubuntu and other distros that have a default
    # firewall that blocks multicast.
    @host ||= "localhost"

    # Start elasticsearch local.
    start_local_elasticsearch
  end

  # NOTE(review): required here rather than at load time; presumably it needs
  # the jars required above -- confirm before moving it.
  require "jruby-elasticsearch"

  @logger.info("New ElasticSearch output", :cluster => @cluster,
               :host => @host, :port => @port, :embedded => @embedded)

  @client = ElasticSearch::Client.new(elasticsearch_client_options)

  # Only talk to the cluster about templates when the feature is enabled.
  setup_index_template if @manage_template

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end

# Build the option hash handed to ElasticSearch::Client.new.
def elasticsearch_client_options
  options = {
    :cluster => @cluster,
    :host => @host,
    :port => @port,
    :bind_host => @bind_host,
    :node_name => @node_name,
    # :node or :transport protocols
    :type => @protocol.to_sym,
  }

  options[:bind_port] = @bind_port unless @bind_port.nil?

  # TransportClient requires a number for a port.
  options[:port] = options[:port].to_i if options[:type] == :transport

  options
end

# Probe the template API, then delete (if overwrite is on) and install the
# index template as needed. Turns @manage_template off when the probe fails.
def setup_index_template
  # Reads the @client instance variable held by the client wrapper
  # (presumably the underlying Java client).
  java_client = @client.instance_eval { @client }

  # Check to see if we *can* get the template
  begin
    existing = ElasticSearch::GetIndexTemplatesRequest.new(java_client, @template_name).execute
  rescue Exception => e
    # NOTE(review): deliberately broad -- presumably so that Java exceptions
    # thrown by the client under JRuby are caught too; confirm before
    # narrowing this to StandardError.
    @logger.error("Unable to check template. Automatic template management disabled.", :error => e.to_s)
    @manage_template = false
    return
  end

  @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)

  delete_index_template(java_client, existing) if @template_overwrite
  install_index_template(java_client) unless index_template_installed?(java_client)
end

# Delete the template named @template_name when the probe result says it exists.
def delete_index_template(java_client, existing)
  @logger.info("Template overwrite enabled. Deleting template if it exists.", :template_overwrite => @template_overwrite.to_s)
  return if existing.getIndexTemplates.isEmpty

  response = ElasticSearch::DeleteIndexTemplateRequest.new(java_client, @template_name).execute
  if response.isAcknowledged
    @logger.info("Successfully deleted template", :template_name => @template_name)
  else
    @logger.error("Failed to delete template", :template_name => @template_name)
  end
end

# True when any template on the cluster has a pattern equal to the configured
# index name with its first %{...} reference replaced by '*'.
def index_template_installed?(java_client)
  @logger.debug("Fetching all templates...")
  template_metadata_list = ElasticSearch::GetIndexTemplatesRequest.new(java_client, "*").execute.getIndexTemplates
  return false if template_metadata_list.isEmpty

  # Results of this come as a list, so we need to iterate through it
  templates = {}
  template_metadata_list.each do |template_data|
    templates[template_data.name] = template_data.template
  end

  template_idx_name = @index.sub(/%{[^}]+}/,'*')
  # Same, but also swallowing a '-' in front of the reference.
  alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*')

  found = templates.any? { |_name, pattern| pattern == template_idx_name || pattern == alt_template_idx_name }
  if found
    @logger.info("Found existing Logstash template match.", :has_template => true, :name => template_idx_name, :alt => alt_template_idx_name, :templates => templates.to_s)
  else
    @logger.debug("No logstash template found in Elasticsearch", :has_template => false, :name => template_idx_name, :alt => alt_template_idx_name)
  end
  found
end

# Load the template JSON and store it on the cluster under @template_name.
def install_index_template(java_client)
  get_template_json
  response = ElasticSearch::PutIndexTemplateRequest.new(java_client, @template_name, @template_json).execute
  if response.isAcknowledged
    @logger.info("Successfully inserted template", :template_name => @template_name)
  else
    @logger.error("Failed to insert template", :template_name => @template_name)
  end
end

private :elasticsearch_client_options, :setup_index_template,
        :delete_index_template, :index_template_installed?,
        :install_index_template
Protected Instance Methods
# File lib/logstash/outputs/elasticsearch.rb, line 289
#
# Build and start an Elasticsearch node inside this process. The node is kept
# in @embedded_elasticsearch. The cluster and node names are only set when
# configured; the HTTP port setting is always applied.
def start_local_elasticsearch
  @logger.info("Starting embedded ElasticSearch local node.")
  node_builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
  # Disable 'local only' - LOGSTASH-277
  #node_builder.local(true)
  unless @cluster.nil?
    node_builder.settings.put("cluster.name", @cluster)
  end
  unless @node_name.nil?
    node_builder.settings.put("node.name", @node_name)
  end
  node_builder.settings.put("http.port", @embedded_http_port)

  @embedded_elasticsearch = node_builder.node
  @embedded_elasticsearch.start
end