class LogStash::Outputs::ElasticSearchRiver
This output lets you store logs in elasticsearch. It's similar to the 'elasticsearch' output but improves performance by using a queue server, rabbitmq, to send data to elasticsearch.
Upon startup, this output will automatically contact an elasticsearch cluster and configure it to read from the queue to which we write.
You can learn more about elasticsearch at <elasticsearch.org>. More about the elasticsearch rabbitmq river plugin: <github.com/elasticsearch/elasticsearch-river-rabbitmq/blob/master/README.md>
Public Instance Methods
receive(event)
click to toggle source
# File lib/logstash/outputs/elasticsearch_river.rb, line 202
#
# Publish a single event to the message queue in the river wire format.
#
# River messages have the form "action\ndata\n", where 'action' is index or
# delete and 'data' is the document to index. This method only ever emits
# the "index" action.
#
# event - the event to ship. It must respond to #sprintf (used to expand
#         @index, @index_type and @document_id) and to #to_json.
#
# Returns nil without publishing when output?(event) is falsy; otherwise
# returns whatever @mq.publish_serialized returns.
def receive(event)
  return unless output?(event)

  header = {
    "index" => {
      "_index" => event.sprintf(@index),
      "_type" => event.sprintf(@index_type)
    }
  }
  # The "_id" key is only present when a document id has been configured.
  header["index"]["_id"] = event.sprintf(@document_id) unless @document_id.nil?

  @mq.publish_serialized("#{header.to_json}\n#{event.to_json}\n")
end
register()
click to toggle source
# File lib/logstash/outputs/elasticsearch_river.rb, line 88
#
# Load every jar found under the vendor directory (located relative to this
# source file), then hand over to prepare_river.
def register
  # TODO(sissel): find a better way of declaring where the elasticsearch
  # libraries are
  # TODO(sissel): can skip this step if we're running from a jar.
  vendor_jars = File.join(File.dirname(__FILE__), "../../../vendor/**/*.jar")
  Dir.glob(vendor_jars).each { |jar_file| require jar_file }

  prepare_river
end
Protected Instance Methods
prepare_river()
click to toggle source
# File lib/logstash/outputs/elasticsearch_river.rb, line 101
#
# Wire up both ends of the pipeline:
#
# 1. Build and register the RabbitMQ output (@mq) that #receive publishes to.
# 2. PUT a river definition to elasticsearch at
#    /_river/logstash-<river name>/_meta so it consumes from that queue.
# 3. Call check_river_status to confirm the river came up.
#
# Side effects: assigns @mq and @status_path.
#
# Ordinary failures (StandardError) while creating the river are logged as a
# warning and do not abort; signals and exit requests are deliberately NOT
# swallowed. Any error raised by check_river_status propagates to the caller.
def prepare_river
  # Loaded lazily, only when a river is actually being prepared.
  require "logstash/outputs/rabbitmq"

  # Configure the message plugin. Each value is wrapped in a one-element
  # array, and entries whose value is nil are dropped. The durable,
  # persistent and debug flags are stringified first, so they are always kept.
  params = {
    "host" => [@rabbitmq_host],
    "port" => [@rabbitmq_port],
    "user" => [@user],
    "password" => [@password],
    "exchange_type" => [@exchange_type],
    "exchange" => [@exchange],
    "key" => [@key],
    "vhost" => [@vhost],
    "durable" => [@durable.to_s],
    "persistent" => [@persistent.to_s],
    "debug" => [@debug.to_s],
  }.reject { |_name, value| value.first.nil? }
  @mq = LogStash::Outputs::RabbitMQ.new(params)
  @mq.register

  # Set up the river
  begin
    # Name the river by our hostname
    require "socket"
    hostname = Socket.gethostname

    # Replace spaces with hyphens and remove all non-alpha non-dash
    # non-underscore characters
    river_name = "#{hostname} #{@queue}".tr(" ", "-").gsub(/[^\w-]/, "")

    api_path = "/_river/logstash-#{river_name}/_meta"
    @status_path = "/_river/logstash-#{river_name}/_status"

    river_config = {
      "type" => "rabbitmq",
      "rabbitmq" => {
        # A broker configured as "localhost" is advertised under this
        # machine's hostname instead.
        # NOTE(review): presumably because the river connects from the
        # elasticsearch node, where "localhost" would not reach this broker
        # - confirm.
        "host" => @rabbitmq_host == "localhost" ? hostname : @rabbitmq_host,
        "port" => @rabbitmq_port,
        "user" => @user,
        "pass" => @password,
        "vhost" => @vhost,
        "queue" => @queue,
        "exchange" => @exchange,
        "routing_key" => @key,
        "exchange_type" => @exchange_type,
        "exchange_durable" => @durable.to_s,
        "queue_durable" => @durable.to_s
      },
      "index" => {
        "bulk_size" => @es_bulk_size,
        "bulk_timeout" => "#{@es_bulk_timeout_ms}ms",
        "ordered" => @es_ordered
      },
    }
    @logger.info("ElasticSearch using river", :config => river_config)

    Net::HTTP.start(@es_host, @es_port) do |http|
      req = Net::HTTP::Put.new(api_path)
      req.body = river_config.to_json
      response = http.request(req)
      response.value # raise an exception if error
      @logger.info("River created: #{response.body}")
    end
  rescue StandardError => e
    # Rescue StandardError only, so Interrupt/SystemExit still propagate.
    # TODO(petef): should we just throw an exception here, so the
    # agent tries to restart us and we in turn retry the river
    # registration?
    @logger.warn("Couldn't set up river. You'll have to set it up manually (or restart)", :exception => e)
  end

  check_river_status
end
Private Instance Methods
check_river_status()
click to toggle source
# File lib/logstash/outputs/elasticsearch_river.rb, line 173
#
# Poll the river's status document (GET @status_path on @es_host:@es_port)
# until it reports no error, making at most four attempts and pausing two
# seconds between attempts. No pause follows a successful check or the
# final attempt.
#
# Returns nil when the river reports no error.
#
# Raises RuntimeError "river is not running: <reason>" when every attempt
# reported an error in the status document, and RuntimeError
# "river is not running, checking status failed: <cause>" as soon as a
# request, a non-2xx response or response parsing fails with a StandardError.
# Signals and exit requests are not rescued and propagate unchanged.
def check_river_status
  max_attempts = 4 # one initial check plus three retries
  retry_delay = 2  # seconds
  success = false
  reason = nil

  begin
    max_attempts.times do |attempt|
      Net::HTTP.start(@es_host, @es_port) do |http|
        response = http.request(Net::HTTP::Get.new(@status_path))
        response.value # raises unless the response is a 2xx
        status = JSON.parse(response.body)
        @logger.debug("Checking ES river status", :status => status)

        error = status["_source"]["error"]
        if error
          reason = "ES river status: #{error}"
        else
          success = true
        end
      end

      break if success
      # Only wait when another attempt will actually follow.
      sleep(retry_delay) if attempt < max_attempts - 1
    end
  rescue StandardError => e
    raise "river is not running, checking status failed: #{e}"
  end

  raise "river is not running: #{reason}" unless success
end