class Sidekiq::Stats
Retrieve runtime statistics from Redis regarding this Sidekiq
cluster.
stat = Sidekiq::Stats.new
stat.processed
Public Class Methods
Public Instance Methods
Source
# File lib/sidekiq/api.rb, line 62
# Returns the :default_queue_latency statistic.
#
# The lookup is delegated to the private +stat+ helper, which raises
# ArgumentError when no value is available for the key.
def default_queue_latency
  stat(:default_queue_latency)
end
Source
# File lib/sidekiq/api.rb, line 160
# Refreshes every statistic: first the fast set, then the slow set.
#
# The fast fetch must run first because it replaces @stats, and the slow
# fetch then adds its keys to that hash. Returns whatever
# +fetch_stats_slow!+ returns.
def fetch_stats!
  fetch_stats_fast!
  fetch_stats_slow!
end
@api private
Source
# File lib/sidekiq/api.rb, line 83
# Loads the statistics that need only a single pipelined Redis round trip
# and replaces @stats with them.
#
# The pipeline reads, in order: the "stat:processed" and "stat:failed"
# counters, the cardinality of the "schedule", "retry" and "dead" sorted
# sets, the cardinality of the "processes" set, and the entry at index -1
# of the "queue:default" list.
#
# Returns the freshly built @stats hash.
def fetch_stats_fast!
  replies = Sidekiq.redis do |conn|
    conn.pipelined do |pipeline|
      pipeline.get("stat:processed")
      pipeline.get("stat:failed")
      pipeline.zcard("schedule")
      pipeline.zcard("retry")
      pipeline.zcard("dead")
      pipeline.scard("processes")
      pipeline.lindex("queue:default", -1)
    end
  end

  # Latency stays at 0.0 when the default queue has no entry, or when the
  # entry carries no usable "enqueued_at" value.
  latency = 0.0
  if (payload = replies[6])
    job = begin
      Sidekiq.load_json(payload)
    rescue
      # A payload that cannot be decoded is treated as an empty job.
      {}
    end

    stamp = job["enqueued_at"]
    if stamp.is_a?(Float)
      # old format: the timestamp is compared against Time.now.to_f
      latency = Time.now.to_f - stamp
    elsif stamp
      # Otherwise the timestamp is compared against a millisecond wall
      # clock reading and the difference is scaled down to seconds.
      now_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
      latency = (now_ms - stamp) / 1000.0
    end
  end

  @stats = {
    processed: replies[0].to_i,
    failed: replies[1].to_i,
    scheduled_size: replies[2],
    retry_size: replies[3],
    dead_size: replies[4],
    processes_size: replies[5],
    default_queue_latency: latency
  }
end
O(1) Redis calls.
@api private
Source
# File lib/sidekiq/api.rb, line 134
# Loads the statistics whose cost grows with the number of processes and
# queues, and stores them in the existing @stats hash.
#
# Reads the members of the "processes" and "queues" sets, then pipelines
# one HGET of the "busy" field per process and one LLEN per queue.
# The summed busy counts become @stats[:workers_size] and the summed
# queue lengths become @stats[:enqueued].
#
# Returns @stats.
def fetch_stats_slow!
  process_keys = Sidekiq.redis { |conn| conn.sscan("processes").to_a }
  queue_names = Sidekiq.redis { |conn| conn.sscan("queues").to_a }

  replies = Sidekiq.redis do |conn|
    conn.pipelined do |pipeline|
      process_keys.each { |key| pipeline.hget(key, "busy") }
      queue_names.each { |name| pipeline.llen("queue:#{name}") }
    end
  end

  # Replies come back in command order: one per process first, then one
  # per queue, so the process count marks the split point.
  split_at = process_keys.size
  busy_total = replies[0, split_at].sum { |reply| reply.to_i }
  enqueued_total = replies[split_at..].sum { |reply| reply.to_i }

  @stats.merge!(workers_size: busy_total, enqueued: enqueued_total)
end
O(number of processes + number of queues) Redis calls.
@api private
Source
# File lib/sidekiq/api.rb, line 54
# Returns the :processes_size statistic.
#
# The lookup is delegated to the private +stat+ helper, which raises
# ArgumentError when no value is available for the key.
def processes_size
  stat(:processes_size)
end
Source
# File lib/sidekiq/api.rb, line 66
# Returns a Hash mapping each queue name to its current length, ordered
# from the longest queue to the shortest.
#
# Queue names are the members of the "queues" set; the lengths are read
# with one pipelined LLEN per queue on the same connection.
def queues
  Sidekiq.redis do |conn|
    names = conn.sscan("queues").to_a
    sizes = conn.pipelined do |pipeline|
      names.each { |name| pipeline.llen("queue:#{name}") }
    end

    # Negating the length sorts in descending order.
    names.zip(sizes).sort_by { |pair| -pair.last }.to_h
  end
end
Source
# File lib/sidekiq/api.rb, line 166
# Resets the "failed" and/or "processed" counters to 0 with a single MSET.
#
# stats - zero or more counter names (Strings or Symbols, optionally
#         nested in Arrays). nil entries are dropped and names other than
#         "failed" and "processed" are ignored. With no arguments both
#         counters are reset.
#
# Returns the result of the MSET call, or nil when none of the requested
# names is a resettable counter.
def reset(*stats)
  all = %w[failed processed]
  stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)

  # MSET without arguments is rejected by Redis, so skip the round trip
  # when filtering left nothing to reset.
  return if stats.empty?

  mset_args = stats.flat_map { |name| ["stat:#{name}", 0] }
  Sidekiq.redis { |conn| conn.mset(*mset_args) }
end
@api private
Source
# File lib/sidekiq/api.rb, line 38
# Returns the :scheduled_size statistic.
#
# The lookup is delegated to the private +stat+ helper, which raises
# ArgumentError when no value is available for the key.
def scheduled_size
  stat(:scheduled_size)
end
Private Instance Methods
Source
# File lib/sidekiq/api.rb, line 182
# Looks up a single statistic in @stats, loading the slow statistics on
# demand.
#
# s - the key to read from @stats.
#
# When the key has no value yet, +fetch_stats_slow!+ is run once and the
# key is read again. Returns the value; raises ArgumentError if it is
# still missing after the refresh.
def stat(s)
  value = @stats[s]
  if value.nil?
    fetch_stats_slow!
    value = @stats[s]
  end
  return value if value

  raise ArgumentError, "Unknown stat #{s}"
end