class Librato::Collector::Aggregator
maintains storage of timing and measurement type measurements
Constants
- SEPARATOR
Attributes
Public Class Methods
new(options={})
click to toggle source
# File lib/librato/collector/aggregator.rb, line 16 def initialize(options={}) @cache = Librato::Metrics::Aggregator.new(prefix: options[:prefix]) @percentiles = {} @lock = Mutex.new @default_tags = options.fetch(:default_tags, {}) end
Public Instance Methods
[](key)
click to toggle source
# File lib/librato/collector/aggregator.rb, line 23 def [](key) fetch(key) end
delete_all()
click to toggle source
clear all stored values
# File lib/librato/collector/aggregator.rb, line 45 def delete_all @lock.synchronize { clear_storage } end
fetch(key, options={})
click to toggle source
retrieve current value of a metric/source/percentage. this exists primarily for debugging/testing and isn't called routinely.
# File lib/librato/collector/aggregator.rb, line 29 def fetch(key, options={}) return nil if @cache.empty? return fetch_percentile(key, options) if options[:percentile] measurements = nil tags = options[:tags] || @default_tags @lock.synchronize { measurements = @cache.queued[:measurements] } measurements.each do |metric| if metric[:name] == key.to_s return metric if !tags && !metric[:tags] return metric if tags == metric[:tags] end end nil end
flush_to(queue, opts={})
click to toggle source
transfer all measurements to queue and reset internal status
# File lib/librato/collector/aggregator.rb, line 50 def flush_to(queue, opts={}) queued = nil @lock.synchronize do return if @cache.empty? queued = @cache.queued flush_percentiles(queue, opts) unless @percentiles.empty? clear_storage unless opts[:preserve] end queue.merge!(queued) if queued end
measure(*args) { || ... }
click to toggle source
@example Simple measurement
measure 'sources_returned', sources.length
@example Simple timing in milliseconds
timing 'twitter.lookup', 2.31
@example Block-based timing
timing 'db.query' do do_my_query end
@example Custom source
measure 'user.all_orders', user.order_count, :source => user.id
# File lib/librato/collector/aggregator.rb, line 75 def measure(*args, &block) options = {} event = args[0].to_s returned = nil # handle block or specified argument if block_given? start = Time.now returned = yield value = ((Time.now - start) * 1000.0).to_i elsif args[1] value = args[1] else raise "no value provided" end # detect options hash if present if args.length > 1 and args[-1].respond_to?(:each) options = args[-1] end percentiles = Array(options[:percentile]) source = options[:source] tags_option = options[:tags] tags_option = { source: source } if source && !tags_option tags = if tags_option && options[:inherit_tags] @default_tags.merge(tags_option) elsif tags_option tags_option else @default_tags end @lock.synchronize do payload = { value: value } payload.merge!({ tags: tags }) if tags @cache.add event => payload percentiles.each do |perc| store = fetch_percentile_store(event, payload) store[:reservoir] << value track_percentile(store, perc) end end returned end
Also aliased as: timing
Private Instance Methods
clear_storage()
click to toggle source
# File lib/librato/collector/aggregator.rb, line 126 def clear_storage @cache.clear @percentiles.each do |key, val| val[:reservoir].clear val[:percs].clear end end
fetch_percentile(key, options)
click to toggle source
# File lib/librato/collector/aggregator.rb, line 134 def fetch_percentile(key, options) store = fetch_percentile_store(key, options) return nil unless store store[:reservoir].percentile(options[:percentile]) end
fetch_percentile_store(event, options)
click to toggle source
# File lib/librato/collector/aggregator.rb, line 140 def fetch_percentile_store(event, options) keyname = event if options[:tags] keyname = Librato::Metrics::Util.build_key_for(keyname, options[:tags]) end @percentiles[keyname] ||= { name: event, reservoir: Hetchy::Reservoir.new(size: 1000), percs: Set.new } @percentiles[keyname].merge!({ tags: options[:tags] }) if options && options[:tags] @percentiles[keyname] end
flush_percentiles(queue, opts)
click to toggle source
# File lib/librato/collector/aggregator.rb, line 156 def flush_percentiles(queue, opts) @percentiles.each do |key, val| val[:percs].each do |perc| perc_name = perc.to_s[0,5].gsub('.','') payload = if val[:tags] { value: val[:reservoir].percentile(perc), tags: val[:tags] } else val[:reservoir].percentile(perc) end queue.add "#{val[:name]}.p#{perc_name}" => payload end end end
track_percentile(store, perc)
click to toggle source
# File lib/librato/collector/aggregator.rb, line 171 def track_percentile(store, perc) if perc < 0.0 || perc > 100.0 raise InvalidPercentile, "Percentiles must be between 0.0 and 100.0" end store[:percs].add(perc) end