class LogStash::Outputs::ElasticSearchHTTP
This output lets you store logs in Elasticsearch.
This plugin uses the HTTP/REST interface to Elasticsearch, which usually lets you use any version of the Elasticsearch server. It is known to work with Elasticsearch %ELASTICSEARCH_VERSION%.
You can learn more about Elasticsearch at <elasticsearch.org>.
Public Instance Methods
flush(events, teardown=false)
click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 204
# Serializes a batch of buffered entries into one newline-delimited bulk
# payload (an "index" action line followed by the event JSON for every
# entry) and hands the joined string to #post.
#
# events   - Enumerable whose elements destructure as (event, index, type).
#            The index and type positions are ignored: both values are
#            recomputed from the event for every entry.
# teardown - Accepted but not used by this method.
#
# Returns whatever #post returns.
def flush(events, teardown=false)
  # One shared frozen newline instead of a fresh string per line.
  line_break = "\n".freeze

  bulk_lines = events.flat_map do |event, index, type|
    index = event.sprintf(@index)

    # Fall back to the event's own "type" field (or "logs") when no explicit
    # index type is configured.
    type = @index_type.nil? ? (event["type"] || "logs") : event.sprintf(@index_type)

    action = { "_index" => index, "_type" => type }
    action["_id"] = event.sprintf(@document_id) unless @document_id.nil?

    [{ "index" => action }.to_json, line_break, event.to_json, line_break]
  end

  post(bulk_lines.join(""))
end
get_template_json()
click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 174
# Loads the mapping template into @template_json.
#
# When @template is nil a default location is chosen first:
# * if this file's path looks like a jar URL ("file:/...!/..."), the template
#   is assumed to sit at the root of that jar;
# * otherwise "elasticsearch-template.json" in the current directory, then
#   "lib/logstash/outputs/elasticsearch/elasticsearch-template.json".
#
# The chosen file is read, its newlines are removed, and the result is stored
# in @template_json and logged at info level.
#
# Raises RuntimeError when @template is nil and no default file can be found.
# Errors from IO.read (e.g. a missing file) propagate unchanged.
def get_template_json
  if @template.nil?
    if __FILE__ =~ /^(jar:)?file:\/.+!.+/
      begin
        # Running from a jar, assume the template is at the root.
        jar_path = [__FILE__.split("!").first, "/elasticsearch-template.json"].join("!")
        @template = jar_path
      rescue => ex
        raise "Failed to cache, due to: #{ex}\n#{ex.backtrace}"
      end
    elsif File.exist?("elasticsearch-template.json")
      # File.exist? rather than File.exists?: the latter is deprecated and is
      # no longer available in current Ruby releases.
      @template = "elasticsearch-template.json"
    elsif File.exist?("lib/logstash/outputs/elasticsearch/elasticsearch-template.json")
      @template = "lib/logstash/outputs/elasticsearch/elasticsearch-template.json"
    else
      raise "You must specify 'template => ...' in your elasticsearch_http output"
    end
  end

  @template_json = IO.read(@template).gsub(/\n/,'')
  @logger.info("Using mapping template", :template => @template_json)
end
post(body)
click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 225
# Sends one bulk request body to @bulk_url and checks the outcome.
#
# body - String payload to POST.
#
# An EOFError while sending the request or while reading the response is
# logged as a warning and the flush is abandoned. A non-200 status is logged
# as an error together with the response body and the request body that was
# actually sent.
#
# Returns nil in every case.
def post(body)
  begin
    response = @agent.post!(@bulk_url, :body => body)
  rescue EOFError
    @logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port)
    return # abort this flush
  end

  # Consume the body for error checking.
  # This will also free up the connection for reuse.
  # Kept in its own variable so the request body stays available for the
  # error log below.
  response_body = ""
  begin
    response.read_body { |chunk| response_body += chunk }
  rescue EOFError
    @logger.warn("EOF while reading response body from elasticsearch", :host => @host, :port => @port)
    return # abort this flush
  end

  if response.status != 200
    @logger.error("Error writing (bulk) to elasticsearch", :response => response, :response_body => response_body, :request_body => body)
    return
  end
end
receive(event)
click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 199
# Queues an event for a later bulk flush.
#
# event - The event to store. It is ignored (nil is returned) when
#         output?(event) is falsy.
#
# The event is passed to buffer_receive as the triple [event, index, type];
# index and type are resolved from names defined outside this method.
def receive(event)
  buffer_receive([event, index, type]) if output?(event)
end
register()
click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 96
# Prepares the output for use: creates the HTTP agent, builds the bulk URL,
# optionally makes sure a mapping template exists on the server, and
# initializes the event buffer.
#
# Template management (only when @manage_template is truthy):
# * with @template_overwrite, the named template is fetched and deleted if
#   the server answers 200;
# * all templates are then listed; on 404, or on 200 when no listed template
#   matches the index pattern derived from @index, the local template is
#   loaded and PUT;
# * any other status is logged as an error and registration continues.
#
# Raises RuntimeError when the template listing cannot be read or parsed as
# JSON; the message includes the underlying error.
def register
  require "ftw" # gem ftw
  @agent = FTW::Agent.new
  @queue = []

  # Credentials are embedded in the URL only when both parts are configured.
  auth = @user && @password ? "#{@user}:#{@password.value}@" : ""
  @bulk_url = "http://#{auth}#{@host}:#{@port}/_bulk?replication=#{@replication}"

  if @manage_template
    @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
    template_search_url = "http://#{auth}#{@host}:#{@port}/_template/*"
    @template_url = "http://#{auth}#{@host}:#{@port}/_template/#{@template_name}"

    if @template_overwrite
      @logger.info("Template overwrite enabled. Deleting existing template.", :template_overwrite => @template_overwrite.to_s)
      response = @agent.get!(@template_url)
      template_action('delete') if response.status == 200 #=> Purge the old template if it exists
    end

    @logger.debug("Template Search URL:", :template_search_url => template_search_url)
    has_template = false
    # Turn the first sprintf placeholder of the index name into a wildcard,
    # with and without a leading dash, to compare against template patterns.
    template_idx_name = @index.sub(/%{[^}]+}/,'*')
    alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*')

    # Get the template data
    response = @agent.get!(template_search_url)
    json = ""
    if response.status == 404 #=> This condition can occur when no template has ever been appended
      @logger.info("No template found in Elasticsearch...")
      get_template_json
      template_action('put')
    elsif response.status == 200
      begin
        response.read_body { |c| json << c }
        results = JSON.parse(json)
      rescue => e
        @logger.error("Error parsing JSON", :json => json, :results => results.to_s, :error => e.to_s)
        # A String cannot be used as the exception class in the two-argument
        # form of raise, so the cause is folded into the message instead.
        raise "Exception in parsing JSON: #{e}"
      end
      if !results.any? { |k,v| v["template"] == template_idx_name || v["template"] == alt_template_idx_name }
        @logger.debug("No template found in Elasticsearch", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name)
        get_template_json
        template_action('put')
      end
    else #=> Some other status code?
      @logger.error("Could not check for existing template. \nCheck status code.", :status => response.status.to_s)
    end # end if response.status == 200
  end # end if @manage_template

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end
teardown()
click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 253
# Shutdown hook: requests a final buffer flush by calling buffer_flush with
# the :final option set to true.
def teardown
  buffer_flush(final: true)
end
template_action(command)
click to toggle source
# File lib/logstash/outputs/elasticsearch_http.rb, line 148
# Deletes or uploads the mapping template at @template_url.
#
# command - 'delete' to remove the template, 'put' to upload @template_json.
#
# An EOFError during the request is logged as a warning and the action is
# abandoned. A non-200 status is logged as an error. On success an info
# message naming the action is logged.
#
# Raises ArgumentError for any other command (previously this surfaced as a
# NoMethodError on nil further down).
def template_action(command)
  begin
    response =
      case command
      when 'delete' then @agent.delete!(@template_url)
      when 'put' then @agent.put!(@template_url, :body => @template_json)
      else raise ArgumentError, "Unsupported template action: #{command.inspect}"
      end
    # The response body is not needed; discard it.
    response.discard_body
  rescue EOFError
    @logger.warn("EOF while attempting request or reading response header from elasticsearch", :host => @host, :port => @port)
    return # abort this action
  end

  if response.status != 200
    @logger.error("Error acting on elasticsearch mapping template", :response => response, :action => command, :request_url => @template_url)
    return
  end

  @logger.info("Successfully deleted template", :template_url => @template_url) if command == 'delete'
  @logger.info("Successfully applied template", :template_url => @template_url) if command == 'put'
end