module HireFire::Macro::Sidekiq::JobQueueSize
@!visibility private
Constants
- SERVER_SIDE_SCRIPT
- SERVER_SIDE_SCRIPT_SHA
Public Instance Methods
Source
# File lib/hirefire/macro/sidekiq.rb, line 264 def call(*queues, server: false, **options) require "sidekiq/api" queues = normalize_queues(queues, allow_empty: true) if server server_lookup(queues, **options) else client_lookup(queues, **options) end end
Private Instance Methods
Source
# File lib/hirefire/macro/sidekiq.rb, line 278 def client_lookup(queues, skip_retries: false, skip_scheduled: false, skip_working: false, max_scheduled: nil) size = enqueued_size(queues) size += scheduled_size(queues, max_scheduled) unless skip_scheduled size += retry_size(queues) unless skip_retries size += working_size(queues) unless skip_working size end
Source
# File lib/hirefire/macro/sidekiq.rb, line 355 def count_with_redis(connection, *args) connection.evalsha(SERVER_SIDE_SCRIPT_SHA, argv: args) rescue Redis::CommandError => e if e.message.include?("NOSCRIPT") connection.script(:load, SERVER_SIDE_SCRIPT) retry else raise end end
Source
# File lib/hirefire/macro/sidekiq.rb, line 366 def count_with_redis_client(connection, *args) connection.call("evalsha", SERVER_SIDE_SCRIPT_SHA, 0, *args) rescue RedisClient::CommandError => e if e.message.include?("NOSCRIPT") connection.call("script", "load", SERVER_SIDE_SCRIPT) retry else raise end end
Source
# File lib/hirefire/macro/sidekiq.rb, line 286 def enqueued_size(queues) queues = registered_queues if queues.empty? ::Sidekiq.redis do |conn| conn.pipelined do |pipeline| queues.each { |name| pipeline.llen("queue:#{name}") } end end.sum end
Source
# File lib/hirefire/macro/sidekiq.rb, line 310 def retry_size(queues) size = 0 now = Time.now find_each_in_set(::Sidekiq::RetrySet.new) do |job| if job.at > now break elsif queues.empty? || queues.include?(job["queue"]) size += 1 end end size end
Source
# File lib/hirefire/macro/sidekiq.rb, line 296 def scheduled_size(queues, max = nil) size, now = 0, Time.now find_each_in_set(::Sidekiq::ScheduledSet.new) do |job| if job.at > now || max && size >= max break elsif queues.empty? || queues.include?(job["queue"]) size += 1 end end size end
Source
# File lib/hirefire/macro/sidekiq.rb, line 338 def server_lookup(queues, skip_scheduled: false, skip_retries: false, skip_working: false, max_scheduled: 0) ::Sidekiq.redis do |connection| now = Time.now.to_i skip_scheduled = skip_scheduled ? 1 : 0 skip_retries = skip_retries ? 1 : 0 skip_working = skip_working ? 1 : 0 if defined?(::Sidekiq::RedisClientAdapter::CompatClient) && connection.is_a?(::Sidekiq::RedisClientAdapter::CompatClient) count_with_redis_client(connection, now, max_scheduled, skip_scheduled, skip_retries, skip_working, *queues) elsif defined?(::Redis) && connection.is_a?(::Redis) count_with_redis(connection, now, max_scheduled, skip_scheduled, skip_retries, skip_working, *queues) else raise "Unsupported Redis connection type: #{connection.class}" end end end
Source
# File lib/hirefire/macro/sidekiq.rb, line 325 def working_size(queues) now = Time.now now_as_i = now.to_i ::Sidekiq::Workers.new.count do |key, tid, job| if job.is_a?(Hash) # Sidekiq < 7.2.1 (queues.empty? || queues.include?(job["queue"])) && job["run_at"] <= now_as_i else # Sidekiq >= 7.2.1 (queues.empty? || queues.include?(job.queue)) && job.run_at <= now end end end