class Metricize::Forwarder

Public Class Methods

new(options) click to toggle source
# File lib/metricize/forwarder.rb, line 5
# Builds a forwarder from an options collection.
#
# :password and :username are read with +fetch+, so leaving either out raises
# (KeyError when options is a Hash). The remaining settings are optional:
#
#   :remote_url      - defaults to 'metrics-api.librato.com/v1/metrics'
#   :remote_timeout  - defaults to 12
#   :remote_requests - defaults to 30
#   :batch_size      - defaults to 5000
#   :batch_sleep     - defaults to 1
#
# The four numeric settings are coerced with +to_i+. The same options object
# is then handed to establish_logger and initialize_redis, which are defined
# outside this method.
def initialize(options)
  # A nil/false option falls back to its default before the to_i coercion.
  integer_setting = lambda { |name, fallback| (options[name] || fallback).to_i }

  @password        = options.fetch(:password)
  @username        = options.fetch(:username)
  @remote_url      = options[:remote_url] || 'metrics-api.librato.com/v1/metrics'
  @remote_timeout  = integer_setting.call(:remote_timeout, 12)
  @remote_requests = integer_setting.call(:remote_requests, 30)
  @batch_size      = integer_setting.call(:batch_size, 5000)
  @batch_sleep     = integer_setting.call(:batch_sleep, 1)

  establish_logger(options)
  initialize_redis(options)
end

Public Instance Methods

go!() click to toggle source
# File lib/metricize/forwarder.rb, line 17
# Public entry point: performs a forwarding pass by delegating to
# process_metric_queue and returns whatever that call returns.
def go!
  process_metric_queue
end

Private Instance Methods

add_aggregate_info(metrics) click to toggle source
# File lib/metricize/forwarder.rb, line 62
# Splits a batch of metrics into counters and measurements, aggregates each
# group, and wraps the result in a payload hash.
#
# metrics - Array of metric hashes; every entry must have a :name (it is read
#           with +fetch+).
#
# A metric is treated as a counter only when its name ends with the literal
# suffix ".count". The dot is escaped and the match is anchored with \z so
# that names such as "account" or "discount", or a name that merely has
# ".count" before an embedded newline, are not mistaken for counters.
#
# Returns a Hash with:
#   :gauges       - consolidate_counts(counters) followed by
#                   add_value_stats(measurements)
#   :measure_time - Time.now as integer epoch seconds
def add_aggregate_info(metrics)
  counters, measurements = metrics.partition { |metric| metric.fetch(:name) =~ /\.count\z/ }
  { :gauges       => consolidate_counts(counters) + add_value_stats(measurements),
    :measure_time => Time.now.to_i }
end
add_stat_by_key(key, value, suffix = "") click to toggle source
# File lib/metricize/forwarder.rb, line 131
# Builds one metric hash from a "name|source" aggregation key.
#
# key    - String; the text before the first '|' is the metric name and the
#          text after it (if any) is the source.
# value  - stored unchanged under :value.
# suffix - appended to the name portion (defaults to "").
#
# Returns a Hash with :name and :value, plus :source when the key has one.
def add_stat_by_key(key, value, suffix = "")
  name, source = key.split('|')
  stat = { :name => name + suffix, :value => value }
  stat[:source] = source if source
  stat
end
add_value_stats(gauges) click to toggle source
# File lib/metricize/forwarder.rb, line 85
# Appends derived statistics to a list of measurement metrics.
#
# The metrics are grouped by "name|source". For every group this:
#   * calls print_histogram inside with_error_handling,
#   * appends a "<name>.count" metric holding the number of values in the
#     group, merged with counter_attributes,
#   * appends "<name>.50e" and "<name>.95e" metrics whose values come from
#     Stats#calculate_percentile.
#
# gauges - Array of metric hashes; each needs a :name (read with +fetch+).
#
# The array passed in is modified in place and is also the return value.
def add_value_stats(gauges)
  samples_by_key = gauges.each_with_object({}) do |metric, groups|
    group_key = [metric.fetch(:name), metric[:source]].join('|')
    (groups[group_key] ||= []) << metric[:value]
  end

  samples_by_key.each_pair do |group_key, samples|
    with_error_handling { print_histogram(group_key, samples) }

    sample_count = add_stat_by_key(group_key, samples.size, '.count')
    gauges << sample_count.merge(counter_attributes)

    [0.50, 0.95].each do |fraction|
      percentile_value = samples.extend(Stats).calculate_percentile(fraction)
      gauges << add_stat_by_key(group_key, percentile_value, ".#{(fraction*100).to_i}e")
    end
  end

  gauges
end
clear_queue() click to toggle source
# File lib/metricize/forwarder.rb, line 47
# Logs "clearing queue" and then issues a del on @redis for the key named by
# @queue_name. Returns whatever @redis.del returns.
def clear_queue
  log_message "clearing queue"
  @redis.del @queue_name
end
consolidate_counts(counters) click to toggle source
# File lib/metricize/forwarder.rb, line 69
# Sums counter metrics that share the same name and source.
#
# counters - Array of metric hashes with :name (read with +fetch+), a numeric
#            :value and an optional :source.
#
# Totals are accumulated with plain addition starting from 0, so fractional
# increments are preserved (the running total is never truncated to an
# integer between additions).
#
# Returns an Array with one metric hash per name+source combination, built by
# add_stat_by_key and merged with counter_attributes, in first-seen order.
def consolidate_counts(counters)
  aggregated_counts = Hash.new(0)
  counters.each do |metric|
    # collect aggregate stats for each name+source combination
    key = [metric.fetch(:name), metric[:source]].join('|')
    aggregated_counts[key] += metric[:value]
  end
  aggregated_counts.map do |key, count|
    add_stat_by_key(key, count).merge(counter_attributes)
  end
end
counter_attributes() click to toggle source
# File lib/metricize/forwarder.rb, line 81
# Attribute hash merged into every counter-style metric: marks it as a
# source aggregate that is summarized with 'sum'. A new hash is built on
# each call.
def counter_attributes
  aggregate_settings = { :source_aggregate => true, :summarize_function => 'sum' }
  { :attributes => aggregate_settings }
end
lshift_queue() click to toggle source
# File lib/metricize/forwarder.rb, line 34
# Removes and returns up to @batch_size entries from the head of the list
# stored under @queue_name.
#
# The batch is read with lrange 0..(@batch_size - 1) and the same entries are
# then dropped with ltrim. ltrim keeps only the range it is given, so keeping
# @batch_size..-1 discards exactly the entries that were just read; when the
# list holds @batch_size entries or fewer the kept range is empty and the
# list ends up empty.
#
# NOTE(review): lrange and ltrim are sent as two separate commands. If
# producers add to the head of the list between the two calls the indexes
# would shift -- confirm how producers push, or wrap both in a transaction.
#
# Returns an Array of metric hashes (JSON-decoded with symbol keys), or []
# when queue_length reports nothing queued.
def lshift_queue
  return [] unless queue_length > 0
  current_batch = @redis.lrange(@queue_name, 0, @batch_size - 1)
  # ltrim keeps the inclusive index range it is given (0 based, negative
  # indexes count from the tail) -- see http://redis.io/commands/ltrim
  @redis.ltrim(@queue_name, @batch_size, -1)
  current_batch.map { |metric| JSON.parse(metric, :symbolize_names => true) }
end
print_histogram(name, values) click to toggle source
process_metric_queue() click to toggle source
# File lib/metricize/forwarder.rb, line 23
# Drains the queue in batches, all inside with_error_handling.
#
# Up to @remote_requests times: pull a batch with lshift_queue, aggregate it
# with add_aggregate_info, send it with store_metrics, then sleep for
# @batch_sleep seconds. The method returns as soon as a batch comes back
# empty.
def process_metric_queue
  with_error_handling do
    @remote_requests.times do
      pending = lshift_queue
      return if pending.empty?

      payload = add_aggregate_info(pending)
      store_metrics(payload)
      sleep @batch_sleep # don't send a flood of small batches
    end
  end
end
queue_length() click to toggle source
# File lib/metricize/forwarder.rb, line 42
# Asks @redis for the length (llen) of the list named by @queue_name, logs it
# at :info level as "queue_length=<n>", and returns it.
def queue_length
  current_length = @redis.llen(@queue_name)
  log_message "queue_length=#{current_length}", :info
  current_length
end
store_metrics(data) click to toggle source
# File lib/metricize/forwarder.rb, line 52
def store_metrics(data)
  log_message "remote_data_sent='#{data}'"
  start_time = Time.now
  RestClient.post("https://#{@username.sub('@','%40')}:#{@password}@#{@remote_url}",
                  data.to_json,
                  :timeout      => @remote_timeout,
                  :content_type => 'application/json')
  log_message "remote_data_sent_chars=#{data.to_s.length}, remote_request_duration_ms=#{time_delta_ms(start_time)}", :info
end