class Sidekiq::Throttler::RateLimit
Handles the tracking of rate limits.
TODO: Consider reducing `threshold` and `period` to smooth out job executions, so that “24 jobs every 1 hour” becomes “1 job every 2 minutes and 30 seconds”.
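The smoothing described in the TODO could be sketched roughly as follows. `smooth_limit` is a hypothetical helper and not part of the library; it assumes a positive threshold and divides the period by it, so the same overall rate is expressed as one job per interval.

```ruby
# Hypothetical helper (not part of Sidekiq::Throttler): reduce a
# "threshold jobs per period" limit to an equivalent "1 job per interval".
def smooth_limit(threshold, period)
  raise ArgumentError, 'threshold must be positive' unless threshold.positive?

  [1, period.to_f / threshold]
end

# 24 jobs per 3600 seconds becomes 1 job per 150 seconds (2 min 30 s).
threshold, period = smooth_limit(24, 3600)
puts "#{threshold} job every #{period} seconds"
```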
Constants
- LOCK
Attributes
payload
@return [Array]
The message payload for the current job.
queue
@return [String]
The queue to rate limit.
worker
@return [Sidekiq::Worker]
The worker to rate limit.
Public Class Methods
@param [Sidekiq::Worker] worker
The worker to rate limit.
@param [Array<Object>] payload
The message payload for the current job.
@param [String] queue
The queue to rate limit.
@option options [Symbol] :storage
The storage backend to use, either :memory or :redis. Defaults to :memory.
# File lib/sidekiq/throttler/rate_limit.rb, line 42
def initialize(worker, payload, queue, options = {})
  @worker = worker
  @payload = payload
  @queue = queue

  unless @storage_class = lookup_storage(options.fetch(:storage, :memory))
    raise ArgumentError, "Unrecognized storage backend: #{options[:storage].inspect}"
  end
end
Private Class Methods
Fetch the number of jobs executed by the provided `RateLimit`.
@param [RateLimit] limiter
@return [Integer]
The current number of jobs executed.
# File lib/sidekiq/throttler/rate_limit.rb, line 180
def self.count(limiter)
  LOCK.synchronize do
    prune(limiter)
    limiter.executions.count(limiter.key)
  end
end
Increment the count of jobs executed by the provided `RateLimit`.
@param [RateLimit] limiter
@return [Integer]
The current number of jobs executed.
# File lib/sidekiq/throttler/rate_limit.rb, line 194
def self.increment(limiter)
  LOCK.synchronize do
    limiter.executions.append(limiter.key, Time.now)
  end
  count(limiter)
end
Remove old entries for the provided `RateLimit`.
@param [RateLimit] limiter
The rate limit to prune.
# File lib/sidekiq/throttler/rate_limit.rb, line 206
def self.prune(limiter)
  limiter.executions.prune(limiter.key, Time.now - limiter.period)
end
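The three class methods above amount to a sliding-window counter: append a timestamp per execution, prune timestamps older than the period, and count what remains. A minimal sketch of that idea follows. `SketchStorage` is a hypothetical stand-in written for this example; the real storage backends may behave differently, for instance in how they treat a timestamp that falls exactly on the cutoff.

```ruby
# Illustrative stand-in for a storage backend; the real Storage::Memory
# class may differ. Only the calls used above are modelled:
# append, prune and count.
class SketchStorage
  def initialize
    @executions = Hash.new { |hash, key| hash[key] = [] }
  end

  # Record one execution timestamp for the key.
  def append(key, time)
    @executions[key] << time
  end

  # Drop timestamps at or before the cutoff (an assumption of this sketch).
  def prune(key, cutoff)
    @executions[key].reject! { |time| time <= cutoff }
  end

  # Number of executions still inside the window.
  def count(key)
    @executions[key].size
  end
end

lock = Mutex.new
storage = SketchStorage.new
now = Time.now
period = 60.0

# Guard the shared storage with a mutex, as LOCK does in the class methods.
lock.synchronize do
  storage.append('my_worker:default', now - 120) # outside the window
  storage.append('my_worker:default', now - 10)  # inside the window
  storage.prune('my_worker:default', now - period)
end

puts storage.count('my_worker:default')
```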
Public Instance Methods
Check if rate limiting options were correctly specified on the worker.
@return [true, false]
# File lib/sidekiq/throttler/rate_limit.rb, line 108
def can_throttle?
  [threshold, period].select(&:zero?).empty?
end
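The check treats a zero threshold or a zero period as "not configured". Since a missing option converts to zero (`nil.to_i` is 0 and `nil.to_f` is 0.0), an unset value disables throttling. The standalone function below is a sketch for illustration only; it takes the two values as arguments instead of reading them from the worker options.

```ruby
# Standalone copy of the check, taking the values as arguments.
def can_throttle?(threshold, period)
  [threshold, period].select(&:zero?).empty?
end

puts can_throttle?(50, 3600.0)       # both set
puts can_throttle?(nil.to_i, 3600.0) # threshold missing
puts can_throttle?(50, nil.to_f)     # period missing
```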
Fetch the number of jobs executed.
@return [Integer]
The current number of jobs executed.
# File lib/sidekiq/throttler/rate_limit.rb, line 58
def count
  self.class.count(self)
end
Set a callback to be executed when {#execute} is called and the rate limit has exceeded the threshold.
@yieldparam [Integer] delay
Delay in seconds to requeue job for.
# File lib/sidekiq/throttler/rate_limit.rb, line 141
def exceeded(&block)
  @exceeded = block
end
Check whether the number of executed jobs has reached or exceeded the threshold.
@return [true, false]
# File lib/sidekiq/throttler/rate_limit.rb, line 116
def exceeded?
  count >= threshold
end
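Executes a callback ({#within_bounds} or {#exceeded}) depending on the state of the rate limit. If the worker has no usable throttle options, the {#within_bounds} callback runs without any counting.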
# File lib/sidekiq/throttler/rate_limit.rb, line 148
def execute
  return @within_bounds.call unless can_throttle?

  if exceeded?
    @exceeded.call(period)
  else
    increment
    @within_bounds.call
  end
end
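The callback flow can be illustrated with a toy class. `SketchLimit` is hypothetical and deliberately simplified: it keeps a plain integer counter instead of timestamps in a storage backend, never expires old executions, and omits the `can_throttle?` shortcut.

```ruby
# Toy limiter mirroring the control flow of #execute. The counter is a
# plain integer here; the library tracks timestamps in a storage backend.
class SketchLimit
  attr_reader :count

  def initialize(threshold, period)
    @threshold = threshold
    @period = period
    @count = 0
  end

  # Register the block to run while the limit has not been reached.
  def within_bounds(&block)
    @within_bounds = block
  end

  # Register the block to run once the limit has been reached.
  def exceeded(&block)
    @exceeded = block
  end

  def execute
    if @count >= @threshold
      @exceeded.call(@period)
    else
      @count += 1
      @within_bounds.call
    end
  end
end

limit = SketchLimit.new(2, 60.0)
limit.within_bounds { :performed }
limit.exceeded { |delay| [:requeued, delay] }

# The first two calls run the job; the third hits the limit.
results = Array.new(3) { limit.execute }
p results
```

In a middleware, the `within_bounds` block would typically run the job and the `exceeded` block would reschedule it after the given delay.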
Get the storage backend.
# File lib/sidekiq/throttler/rate_limit.rb, line 167
def executions
  @storage_class.instance
end
Increment the count of jobs executed.
@return [Integer]
The current number of jobs executed.
# File lib/sidekiq/throttler/rate_limit.rb, line 67
def increment
  self.class.increment(self)
end
@return [String]
The key name used when storing counters for jobs.
# File lib/sidekiq/throttler/rate_limit.rb, line 96
def key
  @key ||= if options['key']
    options['key'].respond_to?(:call) ? options['key'].call(*payload) : options['key']
  else
    "#{@worker.class.to_s.underscore.gsub('/', ':')}:#{@queue}"
  end
end
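The key derivation can be tried outside of Sidekiq with the sketch below. `throttle_key` and the worker name `Reports::ExportWorker` are hypothetical, and `underscore` is a simplified stand-in for ActiveSupport's `String#underscore` that only covers the cases used here.

```ruby
# Simplified stand-in for ActiveSupport's String#underscore, enough for
# this example only.
def underscore(name)
  name.gsub('::', '/').gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Mirrors the branching in #key: an explicit key (string or callable) wins,
# otherwise the key is built from the worker class name and the queue.
def throttle_key(worker_class_name, queue, options, payload)
  if options['key']
    options['key'].respond_to?(:call) ? options['key'].call(*payload) : options['key']
  else
    "#{underscore(worker_class_name).gsub('/', ':')}:#{queue}"
  end
end

default_key = throttle_key('Reports::ExportWorker', 'default', {}, [])
static_key = throttle_key('Reports::ExportWorker', 'default', { 'key' => 'exports' }, [])
dynamic_key = throttle_key('Reports::ExportWorker', 'default',
                           { 'key' => ->(user_id) { "exports:#{user_id}" } }, [42])

puts default_key
puts static_key
puts dynamic_key
```

A callable key receives the job arguments, which makes it possible to keep a separate counter per user or per account.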
Returns the rate limit options for the current running worker.
@return [{String => Float, Integer}]
# File lib/sidekiq/throttler/rate_limit.rb, line 75
def options
  @options ||= (worker.class.get_sidekiq_options['throttle'] || {}).stringify_keys
end
@return [Float]
The number of seconds in the rate limit period.
# File lib/sidekiq/throttler/rate_limit.rb, line 89
def period
  @period ||= (options['period'].respond_to?(:call) ? options['period'].call(*payload) : options['period']).to_f
end
Reset the tracking of job executions.
# File lib/sidekiq/throttler/rate_limit.rb, line 161
def reset!
  executions.reset
end
@return [Integer]
The number of jobs that are allowed within the `period`.
# File lib/sidekiq/throttler/rate_limit.rb, line 82
def threshold
  @threshold ||= (options['threshold'].respond_to?(:call) ? options['threshold'].call(*payload) : options['threshold']).to_i
end
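Both `threshold` and `period` accept either a plain value or a callable that receives the job arguments. The sketch below shows that resolution in isolation; `resolve`, the option values and the payload are made up for the example.

```ruby
# Resolve an option that is either a plain value or a callable that
# receives the job arguments, as #threshold and #period do.
def resolve(option, payload)
  option.respond_to?(:call) ? option.call(*payload) : option
end

# Hypothetical throttle options: the threshold depends on the second
# job argument, the period is a fixed number of seconds.
options = {
  'threshold' => ->(_user_id, plan) { plan == 'premium' ? 100 : 10 },
  'period' => 3600
}

payload = [42, 'premium']
threshold = resolve(options['threshold'], payload).to_i
period = resolve(options['period'], payload).to_f

puts "#{threshold} jobs per #{period} seconds"
```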
Set a callback to be executed when {#execute} is called and the rate limit has not exceeded the threshold.
# File lib/sidekiq/throttler/rate_limit.rb, line 131
def within_bounds(&block)
  @within_bounds = block
end
Check if rate limit is within the threshold.
@return [true, false]
# File lib/sidekiq/throttler/rate_limit.rb, line 124
def within_bounds?
  !exceeded?
end
Private Instance Methods
Look up the storage class for a given options key.
@param [Symbol] key
The options key, :memory or :redis
@return [Class]
The storage backend class, or nil if the key is not found
# File lib/sidekiq/throttler/rate_limit.rb, line 218
def lookup_storage(key)
  { memory: Storage::Memory, redis: Storage::Redis }[key]
end
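The lookup returns nil for an unknown key, which is what lets the constructor raise an ArgumentError. The sketch below reproduces that pattern with placeholder classes. `MemoryBackend`, `RedisBackend` and `storage_for` are hypothetical names standing in for the real storage classes and the constructor logic.

```ruby
# Placeholder backend classes; the real ones live under
# Sidekiq::Throttler::Storage.
class MemoryBackend; end
class RedisBackend; end

# Hash lookup that yields nil for an unknown key.
def lookup_storage(key)
  { memory: MemoryBackend, redis: RedisBackend }[key]
end

# Pick the backend for the given options, defaulting to :memory and
# rejecting anything the lookup does not know.
def storage_for(options = {})
  requested = options.fetch(:storage, :memory)
  lookup_storage(requested) ||
    raise(ArgumentError, "Unrecognized storage backend: #{requested.inspect}")
end

puts storage_for
puts storage_for(storage: :redis)

begin
  storage_for(storage: :file)
rescue ArgumentError => e
  puts e.message
end
```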