class LogStash::Outputs::ElasticSearchRiver

This output lets you store logs in elasticsearch. It's similar to the 'elasticsearch' output but improves performance by using a queue server, rabbitmq, to send data to elasticsearch.

Upon startup, this output will automatically contact an elasticsearch cluster and configure it to read from the queue to which we write.

You can learn more about Elasticsearch at <elasticsearch.org>. More about the Elasticsearch RabbitMQ river plugin: <github.com/elasticsearch/elasticsearch-river-rabbitmq/blob/master/README.md>

Public Instance Methods

receive(event) click to toggle source
# File lib/logstash/outputs/elasticsearch_river.rb, line 202
# Publishes +event+ to RabbitMQ as a two-line message for the river:
# an action line naming the target index and type (plus the document id
# when @document_id is set), followed by the event's own JSON. Each line
# is newline-terminated. Events rejected by output?(event) are skipped.
def receive(event)
  return unless output?(event)

  # Only the "index" action is ever emitted here.
  target = {
    "_index" => event.sprintf(@index),
    "_type" => event.sprintf(@index_type)
  }
  target["_id"] = event.sprintf(@document_id) unless @document_id.nil?

  action_line = { "index" => target }.to_json
  source_line = event.to_json
  @mq.publish_serialized([action_line, source_line].map { |line| "#{line}\n" }.join)
end
register() click to toggle source
# File lib/logstash/outputs/elasticsearch_river.rb, line 88
# Plugin entry point: requires every .jar found under the vendor directory
# (resolved relative to this source file), then hands off to prepare_river.
# NOTE(review): requiring .jar files presumably only works under JRuby.
def register
  # TODO(sissel): find a better way of declaring where the elasticsearch
  # libraries are
  # TODO(sissel): can skip this step if we're running from a jar.
  vendor_jars = File.join(File.dirname(__FILE__), "../../../vendor/**/*.jar")
  Dir.glob(vendor_jars).each { |jar_file| require jar_file }

  prepare_river
end

Protected Instance Methods

prepare_river() click to toggle source
# File lib/logstash/outputs/elasticsearch_river.rb, line 101
# Builds and registers the RabbitMQ output (@mq) that #receive publishes
# through, then asks elasticsearch (@es_host:@es_port) to create a
# rabbitmq river consuming from the same exchange/queue.
#
# Failure to create the river is logged as a warning rather than raised.
# check_river_status is called either way and raises if the river does not
# report a healthy status, for example when no river was set up manually.
def prepare_river
  require "logstash/outputs/rabbitmq"

  # Configure the message plugin. Every setting is passed wrapped in a
  # one-element array, and unset (nil) settings are dropped entirely.
  params = {
    "host" => [@rabbitmq_host],
    "port" => [@rabbitmq_port],
    "user" => [@user],
    "password" => [@password],
    "exchange_type" => [@exchange_type],
    "exchange" => [@exchange],
    "key" => [@key],
    "vhost" => [@vhost],
    "durable" => [@durable.to_s],
    "persistent" => [@persistent.to_s],
    "debug" => [@debug.to_s],
  }.reject { |_name, value| value.first.nil? }
  @mq = LogStash::Outputs::RabbitMQ.new(params)
  @mq.register

  # Set up the river
  begin
    # Name the river by our hostname
    require "socket"
    hostname = Socket.gethostname

    # Replace spaces with hyphens and remove all non-alpha non-dash non-underscore characters
    river_name = "#{hostname} #{@queue}".gsub(' ', '-').gsub(/[^\w-]/, '')

    api_path = "/_river/logstash-#{river_name}/_meta"
    @status_path = "/_river/logstash-#{river_name}/_status"

    river_config = {"type" => "rabbitmq",
                    "rabbitmq" => {
                              # "localhost" is swapped for our hostname,
                              # presumably because the elasticsearch node
                              # would otherwise resolve it to itself.
                              "host" => @rabbitmq_host=="localhost" ? hostname : @rabbitmq_host,
                              "port" => @rabbitmq_port,
                              "user" => @user,
                              "pass" => @password,
                              "vhost" => @vhost,
                              "queue" => @queue,
                              "exchange" => @exchange,
                              "routing_key" => @key,
                              "exchange_type" => @exchange_type,
                              "exchange_durable" => @durable.to_s,
                              "queue_durable" => @durable.to_s
                             },
                    "index" => {"bulk_size" => @es_bulk_size,
                               "bulk_timeout" => "#{@es_bulk_timeout_ms}ms",
                               "ordered" => @es_ordered
                              },
                   }

    # Never write the RabbitMQ password to the logs; only the PUT body below
    # carries the real value.
    rabbit_settings = river_config["rabbitmq"]
    loggable_config = river_config.merge(
      "rabbitmq" => rabbit_settings.merge("pass" => rabbit_settings["pass"] && "[FILTERED]")
    )
    @logger.info("ElasticSearch using river", :config => loggable_config)

    Net::HTTP.start(@es_host, @es_port) do |http|
      req = Net::HTTP::Put.new(api_path)
      req.body = river_config.to_json
      response = http.request(req)
      response.value() # raise an exception if error
      @logger.info("River created: #{response.body}")
    end
  rescue StandardError => e
    # Only ordinary errors (connection refused, HTTP error status, ...) are
    # downgraded to a warning; interrupts and exit requests propagate.
    # TODO(petef): should we just throw an exception here, so the
    # agent tries to restart us and we in turn retry the river
    # registration?
    @logger.warn("Couldn't set up river. You'll have to set it up manually (or restart)", :exception => e)
  end

  check_river_status
end

Private Instance Methods

check_river_status() click to toggle source
# File lib/logstash/outputs/elasticsearch_river.rb, line 173
# Polls the river's _status document (GET @status_path on @es_host:@es_port)
# until it reports no "error" entry.
#
# Every kind of failure on an attempt is retried, waiting +retry_delay+
# seconds between attempts. This covers an HTTP error status, a connection
# error, unparsable JSON, or an "error" in the status document. Presumably
# the status document may not exist yet immediately after the river is
# created, so an early 404 must not be fatal.
#
# @param max_tries [Integer] total number of status requests (default 4).
# @param retry_delay [Numeric] seconds to sleep between attempts (default 2).
# @raise [RuntimeError] when no attempt succeeds; the message describes the
#   last failure ("river is not running: ..." for an error status,
#   "river is not running, checking status failed: ..." for an exception).
def check_river_status(max_tries = 4, retry_delay = 2)
  failure = "river is not running: "

  max_tries.times do |attempt|
    # Pause only between attempts: never before the first one, never after
    # a success or the final failure.
    sleep(retry_delay) if attempt > 0

    begin
      Net::HTTP.start(@es_host, @es_port) do |http|
        response = http.request(Net::HTTP::Get.new(@status_path))
        response.value # raises unless the response is 2xx
        status = JSON.parse(response.body)
        @logger.debug("Checking ES river status", :status => status)
        error = status["_source"]["error"]
        failure = error ? "river is not running: ES river status: #{error}" : nil
      end
    rescue StandardError => e
      # Record the failure for this attempt and keep polling.
      failure = "river is not running, checking status failed: #{e}"
    end

    return if failure.nil?
  end

  raise failure
end