class LogStash::Inputs::Zenoss

Read Zenoss events from the zenoss.zenevents fanout exchange.

Public Instance Methods

register()
# File lib/logstash/inputs/zenoss.rb, line 35
# Prepares the input by loading the libraries that #run relies on.
#
# Calls the parent implementation first, then requires:
# * logstash/util/zenoss - presumably provides the Org::Zenoss::Protobufs
#   definitions referenced by #run (TODO confirm),
# * bunny - the client whose Bunny.new / Bunny::ConnectionError are used by #run,
# * zlib - #run calls Zlib::Inflate.inflate on "deflate"-encoded payloads.
def register
  super
  require "logstash/util/zenoss"
  require "bunny"
  # #run uses Zlib::Inflate; load zlib explicitly instead of relying on some
  # other library having required it already.
  require "zlib"
end
run(queue)
# File lib/logstash/inputs/zenoss.rb, line 41
# Consumes Zenoss event summaries from RabbitMQ and appends one LogStash
# event per received summary to +queue+.
#
# queue - the object converted events are pushed onto with <<.
#
# Steps: open a Bunny connection with @rabbitmq_settings, declare the queue
# named @name, bind it to @exchange with routing key @key, then subscribe.
# Returns early (before starting the connection) when terminating? is true.
#
# When Bunny::ConnectionError or Bunny::ServerDownError is raised, the error
# is logged, the method sleeps for one second and then starts over from the
# top. The number of retries is not limited.
def run(queue)
  zep = Org::Zenoss::Protobufs::Zep

  @logger.debug("Connecting with RabbitMQ settings #{@rabbitmq_settings.inspect}")
  @bunny = Bunny.new(@rabbitmq_settings)
  return if terminating?
  @bunny.start
  @bunny.qos({:prefetch_count => @prefetch_count})

  # @arguments is a flat list that is splatted into Hash[] to build a hash.
  @arguments_hash = Hash[*@arguments]

  @logger.debug("Setting up queue #{@name.inspect}")
  queue_options = {
    :durable => @durable,
    :auto_delete => @auto_delete,
    :exclusive => @exclusive,
    :arguments => @arguments_hash
  }
  @queue = @bunny.queue(@name, queue_options)
  @queue.bind(@exchange, :key => @key)

  @queue.subscribe({:ack => @ack}) do |delivery|
    # Zenoss can optionally compress message payloads; inflate in place.
    if delivery[:header].content_encoding == "deflate"
      delivery[:payload] = Zlib::Inflate.inflate(delivery[:payload])
    end

    # Decode the payload into an EventSummary.
    summary = zep::EventSummary.decode(delivery[:payload])

    # A summary without occurrences is not expected; skip it if it shows up.
    next if summary.occurrence.length <= 0

    occurrence = summary.occurrence[0]
    # created_time is treated as milliseconds since the epoch, hence / 1000.0
    # to get the (fractional) seconds Time.at expects.
    timestamp = Time.at(occurrence.created_time / 1000.0)

    # Core LogStash event properties.
    event = LogStash::Event.new(
      "@timestamp" => timestamp,
      "type" => @type,
      "host" => occurrence.actor.element_title,
      "message" => occurrence.message
    )
    decorate(event)

    # Fields copied verbatim onto the event when they are not nil.
    # NOTE(review): uuid was originally labelled a "summary" mapping, yet it
    # is read from the occurrence like all the other fields - confirm intent.
    %w{
      uuid
      fingerprint event_class event_class_key event_key event_group agent
      syslog_facility nt_event_code monitor
    }.each do |field|
      value = occurrence.send(field)
      event[field] = value unless value.nil?
    end

    # Enum values are translated by indexing into the enum module's constants.
    event["severity"] = zep::EventSeverity.constants[occurrence.severity]

    unless occurrence.status.nil?
      event["status"] = zep::EventStatus.constants[occurrence.status]
    end

    unless occurrence.syslog_priority.nil?
      event["syslog_priority"] = zep::SyslogPriority.constants[occurrence.syslog_priority]
    end

    # Extra details: a single-element value list is unwrapped to its element,
    # anything else is stored as the list itself.
    unless occurrence.details.nil?
      occurrence.details.each do |detail|
        event[detail.name] = detail.value.length == 1 ? detail.value[0] : detail.value
      end
    end

    queue << event
  end # @queue.subscribe
rescue Bunny::ConnectionError, Bunny::ServerDownError => e
  @logger.error("RabbitMQ connection error, will reconnect: #{e}")
  # Sleep for a bit before retrying.
  # TODO(sissel): Write 'backoff' method?
  sleep(1)
  retry
end