module HireFire::Macro::Sidekiq::JobQueueLatency
@!visibility private
Public Instance Methods
Source
# File lib/hirefire/macro/sidekiq.rb, line 105 def call(*queues, skip_retries: false, skip_scheduled: false) require "sidekiq/api" queues = normalize_queues(queues, allow_empty: true) latencies = [] latencies << enqueued_latency(queues) latencies << set_latency(::Sidekiq::RetrySet.new, queues) unless skip_retries latencies << set_latency(::Sidekiq::ScheduledSet.new, queues) unless skip_scheduled latencies.max end
Private Instance Methods
Source
# File lib/hirefire/macro/sidekiq.rb, line 118 def enqueued_latency(queues) queues = registered_queues if queues.empty? oldest_jobs = ::Sidekiq.redis do |conn| conn.pipelined do |pipeline| queues.each do |queue| pipeline.lindex("queue:#{queue}", -1) end end end max_latencies = oldest_jobs.map do |job_payload| job = job_payload ? JSON.parse(job_payload) : {} if Gem::Version.new(::Sidekiq::VERSION) >= Gem::Version.new("8.0.0") job["enqueued_at"] ? Time.now.to_i - job["enqueued_at"] / 1000 : 0 else job["enqueued_at"] ? Time.now.to_f - job["enqueued_at"] : 0.0 end end Integer(max_latencies.max || 0) end
Source
# File lib/hirefire/macro/sidekiq.rb, line 142 def set_latency(set, queues) max_latency = 0.0 now = Time.now find_each_in_set(set) do |job| if job.at > now break elsif queues.empty? || queues.include?(job.queue) max_latency = now - job.at break end end max_latency end