class SidekiqUniqueJobs::Orphans::RubyReaper
The RubyReaper class provides deletion of orphaned digests
@note this is a much slower version of the lua script but does not crash redis
@author Mikael Henriksson <mikael@mhenrixon.com>
Constants
- MAX_QUEUE_LENGTH
-
@return [Integer] the maximum combined length of sidekiq queues for running the reaper
- RUN_SUFFIX
-
@return [String] the suffix for :RUN locks
- SIDEKIQ_BEAT_PAUSE
-
@return [Integer] a best guess of Sidekiq::Launcher::BEAT_PAUSE
Attributes
@!attribute [r] digests
@return [SidekiqUniqueJobs::Digests] digest collection
@!attribute [r] retried
@return [Redis::SortedSet] the Sidekiq RetrySet
@!attribute [r] scheduled
@return [Redis::SortedSet] the Sidekiq ScheduleSet
@!attribute [r] start_source
@return [Integer] The clock stamp this execution started represented as integer (used for redis compatibility as it is more accurate than time)
@!attribute [r] start_time
@return [Time] The timestamp this execution started represented as Time (used for locks)
@!attribute [r] timeout_ms
@return [Integer] The allowed ms before timeout
Public Class Methods
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 58 def initialize(conn) super @digests = SidekiqUniqueJobs::Digests.new @scheduled = Redis::SortedSet.new(SCHEDULE) @retried = Redis::SortedSet.new(RETRY) @start_time = Time.now @start_source = time_source.call @timeout_ms = SidekiqUniqueJobs.config.reaper_timeout * 1000 end
Initialize a new instance of RubyReaper
@param [Redis] conn a connection to redis
SidekiqUniqueJobs::Orphans::Reaper::new
Public Instance Methods
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 199 def active?(digest) Sidekiq.redis do |conn| procs = conn.sscan("processes").to_a return false if procs.empty? procs.sort.each do |key| valid, workers = conn.pipelined do |pipeline| # TODO: Remove the if statement in the future if pipeline.respond_to?(:exists?) pipeline.exists?(key) else pipeline.exists(key) end pipeline.hgetall("#{key}:work") end next unless valid next unless workers.any? workers.each_pair do |_tid, job| next unless (item = safe_load_json(job)) payload = safe_load_json(item[PAYLOAD]) return true if match?(digest, payload[LOCK_DIGEST]) return true if considered_active?(payload[CREATED_AT]) end end false end end
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 154
# Checks if the digest has a matching job.
#
# Runs the cheaper checks first (scheduled set, retry set) before the more
# expensive queue scan and active-worker scan, stopping at the first hit.
#
# @param [String] digest the digest to search for
#
# @return [Boolean] true when any of the checks finds a job for this digest
def belongs_to_job?(digest)
  %i[scheduled? retried? enqueued? active?].any? { |check| send(check, digest) }
end
Checks if the digest has a matching job.
1. It checks the scheduled set 2. It checks the retry set 3. It goes through all queues
@param [String] digest the digest to search for
@return [true] when either of the checks return true @return [false] when no job was found for this digest
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 74 def call return if queues_very_full? BatchDelete.call(expired_digests, conn) BatchDelete.call(orphans, conn) # orphans.each_slice(500) do |chunk| # conn.pipelined do |pipeline| # chunk.each do |digest| # next if belongs_to_job?(digest) # pipeline.zadd(ORPHANED_DIGESTS, now_f, digest) # end # end # end end
Delete orphaned digests
@return [Integer] the number of reaped locks
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 238
# Checks whether a job timestamp is recent enough to count as active.
#
# @param [Float] time_f the job timestamp as a float
#
# @return [Boolean] true when the timestamp is newer than the reaping cutoff
def considered_active?(time_f)
  time_f > max_score
end
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 138
# Milliseconds elapsed since this reaper run started.
#
# @return [Integer] elapsed time in milliseconds
def elapsed_ms
  current = time_source.call
  current - start_source
end
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 187
# Checks if the digest exists in a Sidekiq::Queue
#
# Walks every entry of every queue and bails out at the first entry that
# contains the digest.
#
# @param [String] digest the current digest
#
# @return [Boolean] true when the digest exists in any queue
def enqueued?(digest)
  catch(:found) do
    Sidekiq.redis do |conn|
      queues(conn) do |queue|
        entries(conn, queue) do |entry|
          throw(:found, true) if entry.include?(digest)
        end
      end
    end
    false
  end
end
Checks if the digest exists in a Sidekiq::Queue
@param [String] digest the current digest
@return [true] when digest exists in any queue
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 255 def entries(conn, queue, &block) queue_key = "queue:#{queue}" initial_size = conn.llen(queue_key) deleted_size = 0 page = 0 page_size = 50 loop do range_start = (page * page_size) - deleted_size range_end = range_start + page_size - 1 entries = conn.lrange(queue_key, range_start, range_end) page += 1 break if entries.empty? entries.each(&block) deleted_size = initial_size - conn.llen(queue_key) # The queue is growing, not shrinking, just keep looping deleted_size = 0 if deleted_size.negative? end end
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 91
# Fetches digests whose expiration score falls below the reaping cutoff.
#
# @return [Array<String>] expired digests eligible for deletion
def expired_digests
  cutoff = max_score
  conn.zrange(EXPIRING_DIGESTS, 0, cutoff, "byscore")
end
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 306
# Checks a sorted set for the existence of this digest
#
# Uses a wildcard ZSCAN instead of loading the whole set into memory.
#
# @param [String] key the key for the sorted set
# @param [String] digest the digest to scan for
#
# @return [Boolean] true when found, false when missing
def in_sorted_set?(key, digest)
  pattern = "*#{digest}*"
  conn.zscan(key, match: pattern, count: 1).to_a.any?
end
Checks a sorted set for the existence of this digest
@param [String] key the key for the sorted set @param [String] digest the digest to scan for
@return [true] when found @return [false] when missing
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 232
# Compares two lock digests, treating a digest and its RUN-suffixed
# variant as equal.
#
# @param [String, nil] key_one the first digest
# @param [String, nil] key_two the second digest
#
# @return [Boolean] true when both digests match ignoring the RUN suffix
def match?(key_one, key_two)
  return false if key_one.nil?
  return false if key_two.nil?

  strip = ->(key) { key.delete_suffix(RUN_SUFFIX) }
  strip.call(key_one) == strip.call(key_two)
end
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 99
# The cutoff score below which digests are candidates for reaping.
#
# The window is padded backwards by the reaper timeout and sidekiq's
# heartbeat pause so in-flight work is not reaped prematurely.
#
# @return [Float] the cutoff score
def max_score
  cutoff = start_time - reaper_timeout - SIDEKIQ_BEAT_PAUSE
  cutoff.to_f
end
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 95
# Fetches previously-marked orphan digests scored below the reaping cutoff.
#
# @return [Array<String>] orphaned digests eligible for deletion
def orphaned_digests
  cutoff = max_score
  conn.zrange(ORPHANED_DIGESTS, 0, cutoff, "byscore")
end
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 109 def orphans orphans = [] page = 0 per = reaper_count * 2 results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per) while results.size.positive? results.each do |digest| break if timeout? next if belongs_to_job?(digest) orphans << digest break if orphans.size >= reaper_count end break if timeout? break if orphans.size >= reaper_count page += 1 results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per) end orphans end
Find orphaned digests
@return [Array<String>] an array of orphaned digests
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 251
# Loops through all the redis queues and yields them one by one
#
# @param [Redis] conn the connection to use for fetching queues
#
# @return [void]
#
# @yield queues one at a time
def queues(conn, &block)
  queue_names = conn.sscan("queues")
  queue_names.each(&block)
end
Loops through all the redis queues and yields them one by one
@param [Redis] conn the connection to use for fetching queues
@return [void]
@yield queues one at a time
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 284
# Checks whether the combined length of all sidekiq queues exceeds
# MAX_QUEUE_LENGTH, in which case reaping would be too expensive to run.
#
# @return [Boolean] true when the queues are too full for reaping
def queues_very_full?
  Sidekiq.redis do |conn|
    total = 0
    queues(conn) do |queue|
      total += conn.llen("queue:#{queue}")
      # Stop summing as soon as the threshold is crossed
      break if total > MAX_QUEUE_LENGTH
    end
    total > MAX_QUEUE_LENGTH
  end
end
If sidekiq queues are very full, it becomes highly inefficient for the reaper because it must check every queued job to verify a digest is safe to delete. The reaper reads queued jobs in pages of 50, each page costing 2 extra redis reads; with a queue length of 1,000 jobs, that's about 40 extra reads for every digest checked.
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 176 def retried?(digest) in_sorted_set?(RETRY, digest) end
Checks if the digest exists in the Sidekiq::RetrySet
@param [String] digest the current digest
@return [true] when digest exists in retry set
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 165 def scheduled?(digest) in_sorted_set?(SCHEDULE, digest) end
Checks if the digest exists in the Sidekiq::ScheduledSet
@param [String] digest the current digest
@return [true] when digest exists in scheduled set
Source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 134 def timeout? elapsed_ms >= timeout_ms end