class Sidekiq::Capsule
A Sidekiq::Capsule
is the set of resources necessary to process one or more queues with a given concurrency. One “default” Capsule
is started but the user may declare additional Capsules in their initializer.
This capsule will pull jobs from the “single” queue and process the jobs with one thread, meaning the jobs will be processed serially.
Sidekiq.configure_server do |config| config.capsule("single-threaded") do |cap| cap.concurrency = 1 cap.queues = %w(single) end end
Attributes
Public Class Methods
Source
# File lib/sidekiq/capsule.rb, line 32 def initialize(name, config) @name = name @config = config @queues = ["default"] @weights = {"default" => 0} @concurrency = config[:concurrency] @mode = :strict end
Public Instance Methods
Source
# File lib/sidekiq/capsule.rb, line 80 def client_middleware @client_chain ||= config.client_middleware.copy_for(self) yield @client_chain if block_given? @client_chain end
Allow the middleware to be different per-capsule. Avoid if possible and add middleware globally so all capsules share the same chains. Easier to debug that way.
Source
# File lib/sidekiq/capsule.rb, line 41 def fetcher @fetcher ||= begin instance = (config[:fetch_class] || Sidekiq::BasicFetch).new(self) instance.setup(config[:fetch_setup]) if instance.respond_to?(:setup) instance end end
Source
# File lib/sidekiq/capsule.rb, line 96 def local_redis_pool # connection pool is lazy, it will not create connections unless you actually need them # so don't be skimpy! @redis ||= config.new_redis_pool(@concurrency, name) end
Source
# File lib/sidekiq/capsule.rb, line 124 def lookup(name) config.lookup(name) end
Source
# File lib/sidekiq/capsule.rb, line 57 def queues=(val) @weights = {} @queues = Array(val).each_with_object([]) do |qstr, memo| arr = qstr arr = qstr.split(",") if qstr.is_a?(String) name, weight = arr @weights[name] = weight.to_i [weight.to_i, 1].max.times do memo << name end end @mode = if @weights.values.all?(&:zero?) :strict elsif @weights.values.all? { |x| x == 1 } :random else :weighted end end
Sidekiq
checks queues in three modes:
-
:strict - all queues have 0 weight and are checked strictly in order
-
:weighted - queues have arbitrary weight between 1 and N
-
:random - all queues have weight of 1
Source
# File lib/sidekiq/capsule.rb, line 102 def redis raise ArgumentError, "requires a block" unless block_given? redis_pool.with do |conn| retryable = true begin yield conn rescue RedisClientAdapter::BaseError => ex # 2550 Failover can cause the server to become a replica, need # to disconnect and reopen the socket to get back to the primary. # 4495 Use the same logic if we have a "Not enough replicas" error from the primary # 4985 Use the same logic when a blocking command is force-unblocked # The same retry logic is also used in client.rb if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/ conn.close retryable = false retry end raise end end end
Source
# File lib/sidekiq/capsule.rb, line 92 def redis_pool Thread.current[:sidekiq_redis_pool] || local_redis_pool end
Source
# File lib/sidekiq/capsule.rb, line 86 def server_middleware @server_chain ||= config.server_middleware.copy_for(self) yield @server_chain if block_given? @server_chain end