class LogStash::Outputs::CloudWatch

This output lets you aggregate and send metric data to AWS CloudWatch.

#### Summary: This plugin is intended to run on a logstash indexer agent (though that is not the only way; see below). In the intended scenario, a single cloudwatch output is configured on the logstash indexer node with just AWS API credentials, and possibly a region and/or a namespace. The output looks for specific fields in events, and when it finds them, uses them to calculate aggregate statistics. If the `metricname` option is set in this output, then any event which passes through it will be aggregated and sent to CloudWatch, but that is not recommended. The intended use is to NOT set `metricname` here, and instead to add a `CW_metricname` field (and other `CW_*` fields) only to the events you want sent to CloudWatch.
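A minimal sketch of that intended setup, in the classic logstash 1.x config syntax. The `mutate` filter, its `type` option, and the `access_key_id`/`secret_access_key`/`region` option names come from logstash 1.x and its AWS config mixin; the type name, metric name, and namespace are illustrative, so check your version's docs before copying:

```
filter {
  # Tag only the events you want counted in CloudWatch
  mutate {
    type => "apache-error"
    add_field => [ "CW_metricname", "apache_errors" ]
  }
}

output {
  cloudwatch {
    access_key_id => "..."        # your AWS credentials
    secret_access_key => "..."
    region => "us-east-1"
    namespace => "App/Production"
  }
}
```

With this configuration, only events of type `apache-error` carry a `CW_metricname` field, so only they are aggregated and sent; everything else passes through the output untouched.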

When events pass through this output they are queued for background aggregation and sending, which happens every minute by default. The queue has a maximum size, and when it is full aggregated statistics will be sent to CloudWatch ahead of schedule. Whenever this happens a warning message is written to logstash's log. If you see this you should increase the `queue_size` configuration option to avoid the extra API calls. The queue is emptied every time we send data to CloudWatch.
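The flush-when-full behaviour described above can be sketched with a plain `SizedQueue`. This is an illustrative model only, with hypothetical names, not the plugin's internals:

```ruby
require "thread"

# Minimal model of the plugin's queue: events accumulate until a scheduled
# flush, but a push into a full queue forces an early flush instead of blocking.
class MetricBuffer
  attr_reader :flushes

  def initialize(max_size)
    @queue = SizedQueue.new(max_size)
    @flushes = 0
  end

  def push(event)
    # Queue already full: flush ahead of schedule (this is when the plugin
    # logs its warning suggesting a larger queue_size).
    flush if @queue.length >= @queue.max
    @queue << event
  end

  def flush
    @flushes += 1
    items = []
    items << @queue.pop(true) until @queue.empty?
    items  # in the real plugin these are aggregated and sent to CloudWatch
  end
end
```

Pushing three events through a two-slot buffer triggers exactly one early flush, mirroring the warning behaviour described above.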

Note: when logstash is stopped the queue is destroyed before it can be processed. This is a known limitation of logstash and will hopefully be addressed in a future version.

#### Details: There are two ways to configure this plugin, and they can be used in combination: event fields and per-output defaults.

Event Field configuration… You add fields to your events in inputs & filters and this output reads those fields to aggregate events. The names of the fields read are configurable via the `field_*` options.

Per-output defaults… You set universal defaults in this output plugin's configuration, and if an event does not have a field for that option then the default is used.

Note that event fields take precedence over the per-output defaults.

At a minimum, events must have a “metric name” to be sent to CloudWatch. This can be achieved either by providing a default here or by adding a `CW_metricname` field. By default, if no other configuration is provided besides a metric name, events will be counted (Unit: Count, Value: 1) by their metric name (either the default or the value of their `CW_metricname` field).

Other fields which can be added to events to modify the behavior of this plugin are, `CW_namespace`, `CW_unit`, `CW_value`, and `CW_dimensions`. All of these field names are configurable in this output. You can also set per-output defaults for any of them. See below for details.
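A hedged sketch combining the event fields above with a per-output default. The `CW_*` field names and the precedence rule come from this document; the `timing` type, the `%{duration}` field, and the dimension values are hypothetical. Repeating a key in `add_field` produces an array in logstash 1.x, which yields the even-length `CW_dimensions` array this plugin expects:

```
filter {
  mutate {
    type => "timing"
    add_field => [
      "CW_metricname", "request_latency",
      "CW_unit", "Milliseconds",
      "CW_value", "%{duration}",
      "CW_dimensions", "Host",
      "CW_dimensions", "%{@source_host}"
    ]
  }
}

output {
  cloudwatch {
    region => "us-east-1"
    namespace => "App/Production"  # per-output default; a CW_namespace field on an event overrides it
  }
}
```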

Read more about [AWS CloudWatch](https://aws.amazon.com/cloudwatch/), and the specific API endpoint this output uses, [PutMetricData](https://docs.amazonwebservices.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html).

Constants

aggregate_key members: COUNT, DIMENSIONS, MAX, METRIC, MIN, SUM, TIMESTAMP, UNIT

Units: COUNT_UNIT, NONE, VALID_UNITS

Public Instance Methods

aws_service_endpoint(region)

```ruby
# File lib/logstash/outputs/cloudwatch.rb, line 156
def aws_service_endpoint(region)
  return {
      :cloud_watch_endpoint => "monitoring.#{region}.amazonaws.com"
  }
end
```
receive(event)

```ruby
# File lib/logstash/outputs/cloudwatch.rb, line 179
def receive(event)
  return unless output?(event)

  if event == LogStash::SHUTDOWN
    @job.trigger()
    @job.unschedule()
    @logger.info("CloudWatch aggregator thread shutdown.")
    finished
    return
  end

  # Ignore events that carry no metric name (unless a default is configured)
  return unless (event[@field_metricname] || @metricname)

  if (@event_queue.length >= @event_queue.max)
    @job.trigger
    @logger.warn("Posted to AWS CloudWatch ahead of schedule.  If you see this often, consider increasing the cloudwatch queue_size option.")
  end

  @logger.info("Queueing event", :event => event)
  @event_queue << event
end
```
register()

```ruby
# File lib/logstash/outputs/cloudwatch.rb, line 163
def register
  require "thread"
  require "rufus/scheduler"
  require "aws"

  @cw = AWS::CloudWatch.new(aws_options_hash)

  @event_queue = SizedQueue.new(@queue_size)
  @scheduler = Rufus::Scheduler.start_new
  @job = @scheduler.every @timeframe do
    @logger.info("Scheduler Activated")
    publish(aggregate({}))
  end
end
```

Private Instance Methods

aggregate(aggregates)

```ruby
# File lib/logstash/outputs/cloudwatch.rb, line 245
def aggregate(aggregates)
  @logger.info("QUEUE SIZE ", :queuesize => @event_queue.size)
  while !@event_queue.empty? do
    begin
      count(aggregates, @event_queue.pop(true))
    rescue Exception => e
      @logger.warn("Exception!  Breaking count loop", :exception => e)
      break
    end
  end
  return aggregates
end
```
count(aggregates, event)

```ruby
# File lib/logstash/outputs/cloudwatch.rb, line 259
def count(aggregates, event)
  # If the event doesn't declare a namespace, use the default
  fnamespace = field(event, @field_namespace)
  namespace = (fnamespace ? fnamespace : event.sprintf(@namespace))

  funit = field(event, @field_unit)
  unit = (funit ? funit : event.sprintf(@unit))

  fvalue = field(event, @field_value)
  value = (fvalue ? fvalue : event.sprintf(@value))

  # We may get to this point with valid Units but missing value.  Send zeros.
  val = (!value) ? 0.0 : value.to_f

  # Event provides exactly one (but not both) of value or unit
  if ( (fvalue == nil) ^ (funit == nil) )
    @logger.warn("Likely config error: event has one of #{@field_value} or #{@field_unit} fields but not both.", :event => event)
  end

  # If Unit is still not set or is invalid, warn about misconfiguration and use
  # NONE (log before overwriting so the warning shows the offending value)
  if (!VALID_UNITS.include?(unit))
    @logger.warn("Likely config error: invalid or missing Units (#{unit.to_s}), using '#{NONE}' instead", :event => event)
    unit = NONE
  end

  if (!aggregates[namespace])
    aggregates[namespace] = {}
  end

  dims = event[@field_dimensions]
  if (dims) # event provides dimensions
    # Validate the structure: dims must be an array with a positive, even
    # number of elements (alternating name, value)
    if (!dims.is_a?(Array) || dims.length == 0 || (dims.length % 2) != 0)
      @logger.warn("Likely config error: CloudWatch dimensions field (#{dims.to_s}) found which is not a positive- & even-length array.  Ignoring it.", :event => event)
      dims = nil
    end
  elsif (@dimensions.is_a?(Hash))
    # Event did not provide dimensions, but the output has a configured
    # default; flatten the hash into the kind of array described just above
    dims = @dimensions.flatten.map{|d| event.sprintf(d)}
  else
    dims = nil
  end

  fmetric = field(event, @field_metricname)
  aggregate_key = {
      METRIC => (fmetric ? fmetric : event.sprintf(@metricname)),
      DIMENSIONS => dims,
      UNIT => unit,
      TIMESTAMP => event.sprintf("%{+YYYY-MM-dd'T'HH:mm:00Z}")
  }

  if (!aggregates[namespace][aggregate_key])
    aggregates[namespace][aggregate_key] = {}
  end

  if (!aggregates[namespace][aggregate_key][MAX] || val > aggregates[namespace][aggregate_key][MAX])
    aggregates[namespace][aggregate_key][MAX] = val
  end

  if (!aggregates[namespace][aggregate_key][MIN] || val < aggregates[namespace][aggregate_key][MIN])
    aggregates[namespace][aggregate_key][MIN] = val
  end

  if (!aggregates[namespace][aggregate_key][COUNT])
    aggregates[namespace][aggregate_key][COUNT] = 1
  else
    aggregates[namespace][aggregate_key][COUNT] += 1
  end

  if (!aggregates[namespace][aggregate_key][SUM])
    aggregates[namespace][aggregate_key][SUM] = val
  else
    aggregates[namespace][aggregate_key][SUM] += val
  end
end
```
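The running statistics that `count` keeps per aggregate key can be distilled into a few lines. This is a sketch of the accumulation logic only, with plain string keys standing in for the plugin's constants:

```ruby
# Update a running statistic set with one new sample: track min, max,
# sample count, and sum, which later become CloudWatch :statistic_values.
def accumulate(stats, val)
  stats["min"] = val if !stats["min"] || val < stats["min"]
  stats["max"] = val if !stats["max"] || val > stats["max"]
  stats["count"] = (stats["count"] || 0) + 1
  stats["sum"] = (stats["sum"] || 0.0) + val
  stats
end

stats = {}
[3.0, 1.0, 2.0].each { |v| accumulate(stats, v) }
# stats => {"min"=>1.0, "max"=>3.0, "count"=>3, "sum"=>6.0}
```

Sending these four statistics instead of raw samples is what lets the plugin aggregate many events into a single PutMetricData datum per minute.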
field(event, fieldname)

```ruby
# File lib/logstash/outputs/cloudwatch.rb, line 339
def field(event, fieldname)
  if !event[fieldname]
    return nil
  else
    if event[fieldname].is_a?(Array)
      return event[fieldname][0]
    else
      return event[fieldname]
    end
  end
end
```
publish(aggregates)

```ruby
# File lib/logstash/outputs/cloudwatch.rb, line 202
def publish(aggregates)
  aggregates.each do |namespace, data|
    @logger.info("Namespace, data: ", :namespace => namespace, :data => data)
    metric_data = []
    data.each do |aggregate_key, stats|
      new_data = {
          :metric_name => aggregate_key[METRIC],
          :timestamp => aggregate_key[TIMESTAMP],
          :unit => aggregate_key[UNIT],
          :statistic_values => {
              :sample_count => stats[COUNT],
              :sum => stats[SUM],
              :minimum => stats[MIN],
              :maximum => stats[MAX],
          }
      }
      dims = aggregate_key[DIMENSIONS]
      if (dims.is_a?(Array) && dims.length > 0 && (dims.length % 2) == 0)
        new_data[:dimensions] = Array.new
        i = 0
        while (i < dims.length)
          new_data[:dimensions] << {:name => dims[i], :value => dims[i+1]}
          i += 2
        end
      end
      metric_data << new_data
    end # data.each

    begin
      @cw.put_metric_data(
          :namespace => namespace,
          :metric_data => metric_data
      )
      @logger.info("Sent data to AWS CloudWatch OK", :namespace => namespace, :metric_data => metric_data)
    rescue Exception => e
      @logger.warn("Failed to send to AWS CloudWatch", :exception => e, :namespace => namespace, :metric_data => metric_data)
      break
    end
  end # aggregates.each
  return aggregates
end
```
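The dimension handling spans two methods: `count` flattens a default dimensions Hash into the even-length array it validates, and `publish` pairs that array back up into CloudWatch's `{:name, :value}` form. A self-contained sketch of the round trip, with illustrative dimension names:

```ruby
# A per-output default dimensions Hash, as @dimensions might be configured
dimensions = { "Host" => "app01", "Env" => "prod" }

# Hash#flatten yields the alternating [name, value, name, value] array
# that count() validates: ["Host", "app01", "Env", "prod"]
dims = dimensions.flatten

# publish() pairs consecutive elements into CloudWatch dimension hashes
cw_dims = dims.each_slice(2).map { |name, value| { :name => name, :value => value } }
# cw_dims => [{:name=>"Host", :value=>"app01"}, {:name=>"Env", :value=>"prod"}]
```

This is also why an event-supplied `CW_dimensions` field must be an even-length array: odd-length input would leave a dimension name without a value when paired up.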