class Sidekiq::Throttler::RateLimit

Handles the tracking of rate limits.

TODO: Consider reducing `threshold` and `period` to smooth out job executions, so that "24 jobs every 1 hour" becomes "1 job every 2 minutes and 30 seconds".

Constants

LOCK

Attributes

payload[R]

@return [Array]

The message payload for the current job.
queue[R]

@return [String]

The queue to rate limit.
worker[R]

@return [Sidekiq::Worker]

The worker to rate limit.

Public Class Methods

new(worker, payload, queue, options = {}) click to toggle source

@param [Sidekiq::Worker] worker

The worker to rate limit.

@param [Array<Object>] payload

The message payload for the current job.

@param [String] queue

The queue to rate limit.

@option [Symbol] :storage

Either :memory or :redis, the storage backend to use
# File lib/sidekiq/throttler/rate_limit.rb, line 42
##
# Build a rate limit tracker for one job.
#
# @param [Sidekiq::Worker] worker
#   The worker to rate limit.
#
# @param [Array<Object>] payload
#   The message payload for the current job.
#
# @param [String] queue
#   The queue to rate limit.
#
# @param [Hash] options
#   Only +:storage+ is read here; it defaults to +:memory+ when absent.
#
# @raise [ArgumentError]
#   When +lookup_storage+ returns nil/false for the requested backend.
def initialize(worker, payload, queue, options = {})
  @worker  = worker
  @payload = payload
  @queue   = queue

  backend = options.fetch(:storage, :memory)
  @storage_class = lookup_storage(backend)
  return if @storage_class

  raise ArgumentError,
    "Unrecognized storage backend: #{options[:storage].inspect}"
end

Private Class Methods

count(limiter) click to toggle source

Fetch the number of jobs executed by the provided `RateLimit`.

@param [RateLimit] limiter

@return [Integer]

The current number of jobs executed.
# File lib/sidekiq/throttler/rate_limit.rb, line 180
##
# Fetch the number of jobs executed by the provided +RateLimit+.
#
# Old entries are pruned first, then the storage backend is asked for the
# count under the limiter's key. Both steps run inside +LOCK.synchronize+.
#
# @param [RateLimit] limiter
#
# @return [Integer]
#   The current number of jobs executed.
def self.count(limiter)
  LOCK.synchronize do
    prune(limiter)
    store = limiter.executions
    store.count(limiter.key)
  end
end
increment(limiter) click to toggle source

Increment the count of jobs executed by the provided `RateLimit`.

@param [RateLimit] limiter

@return [Integer]

The current number of jobs executed.
# File lib/sidekiq/throttler/rate_limit.rb, line 194
##
# Record one execution for the provided +RateLimit+ and return the new count.
#
# @param [RateLimit] limiter
#
# @return [Integer]
#   The current number of jobs executed, as reported by +count+.
def self.increment(limiter)
  LOCK.synchronize do
    store = limiter.executions
    store.append(limiter.key, Time.now)
  end

  # count is invoked only after the synchronize block has finished
  # (presumably because count acquires LOCK itself -- confirm LOCK's type).
  count(limiter)
end
prune(limiter) click to toggle source

Remove old entries for the provided `RateLimit`.

@param [RateLimit] limiter

The rate limit to prune.
# File lib/sidekiq/throttler/rate_limit.rb, line 206
##
# Remove old entries for the provided +RateLimit+.
#
# The cutoff handed to the storage backend is the current time minus the
# limiter's period.
#
# @param [RateLimit] limiter
#   The rate limit to prune.
def self.prune(limiter)
  store = limiter.executions
  store.prune(limiter.key, Time.now - limiter.period)
end

Public Instance Methods

can_throttle?() click to toggle source

Check if rate limiting options were correctly specified on the worker.

@return [true, false]

# File lib/sidekiq/throttler/rate_limit.rb, line 108
##
# Check if rate limiting options were correctly specified on the worker.
#
# Both +threshold+ and +period+ must be strictly positive. Zero means the
# option is absent; a negative value is rejected too, because a negative
# threshold makes +count >= threshold+ true for every job.
#
# @return [true, false]
def can_throttle?
  [threshold, period].all? { |value| value > 0 }
end
count() click to toggle source

Fetch the number of jobs executed.

@return [Integer]

The current number of jobs executed.
# File lib/sidekiq/throttler/rate_limit.rb, line 58
##
# Fetch the number of jobs executed.
#
# Delegates to the class-level +count+, passing this instance.
#
# @return [Integer]
#   The current number of jobs executed.
def count
  limiter_class = self.class
  limiter_class.count(self)
end
exceeded(&block) click to toggle source

Set a callback to be executed when {#execute} is called and the rate limit has exceeded the threshold.

@yieldparam [Integer] delay

Delay in seconds to requeue job for.
# File lib/sidekiq/throttler/rate_limit.rb, line 141
##
# Set a callback to be executed when {#execute} is called and the rate limit
# has exceeded the threshold.
#
# @yieldparam [Integer] delay
#   Delay in seconds to requeue job for.
#
# @return [Proc, nil]
#   The stored block (nil when no block was given).
def exceeded(&block)
  callback = block
  @exceeded = callback
end
exceeded?() click to toggle source

Check if rate limit has exceeded the threshold.

@return [true, false]

# File lib/sidekiq/throttler/rate_limit.rb, line 116
##
# Check if rate limit has exceeded the threshold.
#
# True once the current count has reached (or passed) the threshold.
#
# @return [true, false]
def exceeded?
  executed = count
  allowed  = threshold
  executed >= allowed
end
execute() click to toggle source

Executes a callback ({#within_bounds}, or {#exceeded}) depending on the state of the rate limit.

# File lib/sidekiq/throttler/rate_limit.rb, line 148
##
# Executes a callback ({#within_bounds}, or {#exceeded}) depending on the
# state of the rate limit.
#
# * When throttling is not possible, the within-bounds callback runs and
#   nothing is counted.
# * When the limit is exceeded, the exceeded callback receives +period+.
# * Otherwise the execution is counted, then the within-bounds callback runs.
#
# @return [Object]
#   Whatever the invoked callback returns.
def execute
  if !can_throttle?
    @within_bounds.call
  elsif exceeded?
    @exceeded.call(period)
  else
    increment
    @within_bounds.call
  end
end
executions() click to toggle source

Get the storage backend.

# File lib/sidekiq/throttler/rate_limit.rb, line 167
##
# Get the storage backend.
#
# @return [Object]
#   The value of +instance+ on the storage class chosen at construction.
def executions
  backend = @storage_class
  backend.instance
end
increment() click to toggle source

Increment the count of jobs executed.

@return [Integer]

The current number of jobs executed.
# File lib/sidekiq/throttler/rate_limit.rb, line 67
##
# Increment the count of jobs executed.
#
# Delegates to the class-level +increment+, passing this instance.
#
# @return [Integer]
#   The current number of jobs executed.
def increment
  limiter_class = self.class
  limiter_class.increment(self)
end
key() click to toggle source

@return [String]

The key name used when storing counters for jobs.
# File lib/sidekiq/throttler/rate_limit.rb, line 96
##
# The key name used when storing counters for jobs.
#
# A 'key' entry in the throttle options takes precedence: a callable is
# invoked with the job payload, any other value is used as-is. Without it,
# the key is built from the underscored worker class name (with '/' turned
# into ':') and the queue. A truthy result is memoized.
#
# @return [String]
def key
  return @key if @key

  custom = options['key']
  @key =
    if custom
      custom.respond_to?(:call) ? custom.call(*payload) : custom
    else
      worker_name = @worker.class.to_s.underscore.gsub('/', ':')
      "#{worker_name}:#{@queue}"
    end
end
options() click to toggle source

Returns the rate limit options for the current running worker.

@return [{String => Float, Integer}]

# File lib/sidekiq/throttler/rate_limit.rb, line 75
##
# Returns the rate limit options for the current running worker.
#
# Reads the 'throttle' entry of the worker class's Sidekiq options, falls
# back to an empty hash, and converts the keys to strings. Memoized.
#
# @return [{String => Float, Integer}]
def options
  return @options if @options

  throttle = worker.class.get_sidekiq_options['throttle']
  @options = (throttle || {}).stringify_keys
end
period() click to toggle source

@return [Float]

The number of seconds in the rate limit period.
# File lib/sidekiq/throttler/rate_limit.rb, line 89
##
# The number of seconds in the rate limit period.
#
# The 'period' option may be a plain value or a callable; a callable is
# invoked with the job payload. The result is converted with +to_f+ and
# memoized.
#
# @return [Float]
def period
  return @period if @period

  setting = options['period']
  seconds = setting.respond_to?(:call) ? setting.call(*payload) : setting
  @period = seconds.to_f
end
reset!() click to toggle source

Reset the tracking of job executions.

# File lib/sidekiq/throttler/rate_limit.rb, line 161
##
# Reset the tracking of job executions.
#
# Calls +reset+ on the storage backend returned by +executions+.
def reset!
  store = executions
  store.reset
end
threshold() click to toggle source

@return [Integer]

The number of jobs that are allowed within the `period`.
# File lib/sidekiq/throttler/rate_limit.rb, line 82
##
# The number of jobs that are allowed within the `period`.
#
# The 'threshold' option may be a plain value or a callable; a callable is
# invoked with the job payload. The result is converted with +to_i+ and
# memoized.
#
# @return [Integer]
def threshold
  return @threshold if @threshold

  setting = options['threshold']
  limit = setting.respond_to?(:call) ? setting.call(*payload) : setting
  @threshold = limit.to_i
end
within_bounds(&block) click to toggle source

Set a callback to be executed when {#execute} is called and the rate limit has not exceeded the threshold.

# File lib/sidekiq/throttler/rate_limit.rb, line 131
##
# Set a callback to be executed when {#execute} is called and the rate limit
# has not exceeded the threshold.
#
# @return [Proc, nil]
#   The stored block (nil when no block was given).
def within_bounds(&block)
  callback = block
  @within_bounds = callback
end
within_bounds?() click to toggle source

Check if rate limit is within the threshold.

@return [true, false]

# File lib/sidekiq/throttler/rate_limit.rb, line 124
##
# Check if rate limit is within the threshold.
#
# This is the negation of +exceeded?+.
#
# @return [true, false]
def within_bounds?
  over_limit = exceeded?
  !over_limit
end

Private Instance Methods

lookup_storage(key) click to toggle source

Lookup storage class for a given options key

@param [Symbol] key

The options key, :memory or :redis

@return [Class]

The storage backend class, or nil if the key is not found
# File lib/sidekiq/throttler/rate_limit.rb, line 218
##
# Lookup storage class for a given options key.
#
# @param [Symbol] key
#   The options key, :memory or :redis.
#
# @return [Class, nil]
#   The storage backend class, or nil if the key is not found.
def lookup_storage(key)
  backends = {
    memory: Storage::Memory,
    redis: Storage::Redis
  }
  backends[key]
end