module StatsD

Public Class Methods

carbon_update_str() click to toggle source
# File lib/statsd.rb, line 158
def self.carbon_update_str
  updates = []
  now = Time.now.to_i

  @@timers_mutex.synchronize do
    @@timers.each do |key, values|
      next if values.length == 0
      values.sort!
      min = values[0]
      max = values[-1]
      mean = min
      maxAtThreshold = min
      if values.length > 1
        threshold_index = ((100 - @@pct_threshold) / 100.0) * values.length
        threshold_count = values.length - threshold_index.round
        valid_values = values.slice(0, threshold_count)
        maxAtThreshold = valid_values[-1]

        sum = 0
        valid_values.each { |v| sum += v }
        mean = sum / valid_values.length
      end

      suffix = @@key_suffix ? ".#{@@key_suffix}" : ""
      updates << "stats.timers.#{key}.mean#{suffix} #{mean} #{now}"
      updates << "stats.timers.#{key}.upper#{suffix} #{max} #{now}"
      updates << "stats.timers.#{key}.upper_#{@@pct_threshold}#{suffix} " \
                "#{maxAtThreshold} #{now}"
      updates << "stats.timers.#{key}.lower#{suffix} #{min} #{now}"
      updates << "stats.timers.#{key}.count#{suffix} #{values.length} #{now}"
    end

    @@timers.each { |k, v| @@timers[k] = [] }
  end

  @@counters_mutex.synchronize do
    @@counters.each do |key, value|
      suffix = @@key_suffix ? ".#{@@key_suffix}" : ""
      updates << "stats.#{key}#{suffix} #{value / @@flush_interval} #{now}"
    end

    @@counters.each { |k, v| @@counters[k] = 0 }
  end

  return updates.length == 0 ? nil : updates.join("\n") + "\n"
end
flush() click to toggle source
# File lib/statsd.rb, line 205
def self.flush
  s = carbon_update_str
  return unless s

  begin
    Timeout::timeout(2) { self.send(@@output_func, s) }
  rescue Timeout::Error
    @@logger.warn("timed out sending update to #{@@output_url}")
  rescue
    @@logger.warn("error sending update to #{@@output_url}: #{$!}")
  end
end
flush_interval=(val) click to toggle source
# File lib/statsd.rb, line 34
def self.flush_interval=(val)
  @@flush_interval = val.to_i
end
key_suffix=(val) click to toggle source
# File lib/statsd.rb, line 42
def self.key_suffix=(val)
  @@key_suffix = val
end
logger() click to toggle source
# File lib/statsd.rb, line 28
def self.logger; return @@logger; end
logger_output=(output) click to toggle source
# File lib/statsd.rb, line 29
def self.logger_output=(output)
  @@logger = Logger.new(output)
  @@logger.progname = File.basename($0)
end
output_amqp(packet) click to toggle source
# File lib/statsd.rb, line 119
def self.output_amqp(packet)
  @target.publish(packet)
end
output_stdout(packet) click to toggle source
# File lib/statsd.rb, line 123
def self.output_stdout(packet)
  $stdout.write(packet)
end
output_tcp(packet) click to toggle source
TODO: option for persistent tcp connection

def setup_tcp end

# File lib/statsd.rb, line 64
def self.output_tcp(packet)
  server = TCPSocket.new(@@output_url.host, @@output_url.port)
  server.puts packet
  server.close
end
output_url=(url) click to toggle source
# File lib/statsd.rb, line 46
def self.output_url=(url)
  @@output_url = URI.parse(url)
  scheme_mapper = {"tcp" => [nil, :output_tcp],
                   "amqp" => [:setup_amqp, :output_amqp],
                   "stdout" => [nil, :output_stdout],
                   }
  if ! scheme_mapper.has_key?(@@output_url.scheme)
    raise TypeError, "unsupported scheme in #{url}"
  end

  setup_func, @@output_func = scheme_mapper[@@output_url.scheme]
  self.send(setup_func) if setup_func
end
pct_threshold=(val) click to toggle source
# File lib/statsd.rb, line 38
def self.pct_threshold=(val)
  @@pct_threshold = val.to_i
end
setup_amqp() click to toggle source
# File lib/statsd.rb, line 70
def self.setup_amqp
  begin
    require "amqp"
    require "mq"
  rescue LoadError
    @@logger.fatal("missing amqp ruby module. try gem install amqp")
    exit(1)
  end

  user = @@output_url.user || ""
  user, vhost = user.split("@", 2)
  _, mqtype, mqname = @@output_url.path.split("/", 3)
  amqp_settings = {
    :host => @@output_url.host,
    :port => @@output_url.port || 5672,
    :user => user,
    :pass => @@output_url.password,
    :vhost => vhost || "/",
  }

  @amqp = AMQP.connect(amqp_settings)
  @mq = MQ.new(@amqp)
  @target = nil
  opts = {:durable => true,
          :auto_delete => false,
         }
 
  if @@output_url.query
    @@output_url.query.split("&").each do |param|
      k, v = param.split("=", 2)
      opts[:durable] = false if k == "durable" and v == "false"
      opts[:auto_delete] = true if k == "autodelete" and v == "true"
    end
  end

  @@logger.info(opts.inspect)

  case mqtype
    when "fanout"
      @target = @mq.fanout(mqname, opts)
    when "queue"
      @target = @mq.queue(mqname, opts)
    when "topic"
      @target = @mq.topic(mqname, opts)
    else
      raise TypeError, "unknown mq output type #{mqname}"
  end
end

Public Instance Methods

receive_data(packet) click to toggle source
# File lib/statsd.rb, line 127
def receive_data(packet)
  bits = packet.strip.split(":")
  key = bits.shift.gsub(/\s+/, "_") \
                  .gsub(/\//, "-") \
                  .gsub(/[^a-zA-Z_\-0-9\.]/, "")
  bits << "1" if bits.length == 0
  bits.each do |bit|
    fields = bit.split("|")
    if fields.length != 2
      $stderr.puts "invalid update: #{bit}"
      next
    end

    if fields[1] == "ms" # timer update
      @@timers_mutex.synchronize do
        @@timers[key] << fields[0].to_f
      end
    elsif fields[1] == "c" # counter update
      count, sample_rate = fields[0].split("@", 2)
      sample_rate ||= 1
      #puts "count is #{count.to_f} (#{count})"
      #puts "multiplier is is #{1 / sample_rate.to_f}"
      @@counters_mutex.synchronize do
        @@counters[key] += count.to_f * (1 / sample_rate.to_f)
      end
    else
      $stderr.puts "invalid field in update: #{bit}"
    end
  end
end