redis.replicate_commands()

-------- BEGIN keys ---------
-- NOTE(review): KEYS indices follow declaration order; confirm against the
-- caller that builds the KEYS array for this script.
local digests_set          = KEYS[1] -- sorted set scanned for orphaned digests
local expiring_digests_set = KEYS[2] -- sorted set queried by score up to current_time
local schedule_set         = KEYS[3] -- sorted set searched for each digest
local retry_set            = KEYS[4] -- sorted set searched for each digest
--------  END keys  ---------

-------- BEGIN argv ---------
-- NOTE(review): ARGV indices follow declaration order; confirm against the caller.
local reaper_count = tonumber(ARGV[1]) -- deletion limit that ends both reaping loops
local threshold    = tonumber(ARGV[2]) -- forwarded to find_digest_in_process_set
--------  END argv  ---------

--------  BEGIN injected arguments --------
-- Only current_time and redisversion are read directly by this script body;
-- the remaining values are presumably consumed by the included partials
-- (e.g. log_debug) -- TODO confirm.
local current_time = tonumber(ARGV[3])
local debug_lua    = tostring(ARGV[4]) == "1"
local max_history  = tonumber(ARGV[5])
local script_name  = ARGV[6] .. ".lua"
local redisversion = ARGV[7]
---------  END injected arguments ---------

--------  BEGIN local functions --------
-- Helper functions are spliced in by the ERB template before the script is
-- loaded into Redis; log_debug, toversion and the find_digest_in_* helpers
-- used below are expected to come from these partials.
<%= include_partial "shared/_common.lua" %>
<%= include_partial "shared/_find_digest_in_queues.lua" %>
<%= include_partial "shared/_find_digest_in_sorted_set.lua" %>
<%= include_partial "shared/_find_digest_in_process_set.lua" %>
----------  END local functions ----------

--------  BEGIN delete_orphaned.lua --------
log_debug("BEGIN")

-- State shared by the reaping loops that follow.
local found     = false                             -- whether the current digest is still referenced
local per       = 50                                -- page size used when reading the sorted sets
local total     = redis.call("ZCARD", digests_set)  -- number of members in digests_set at start
local index     = 0                                 -- paging cursor
local del_count = 0                                 -- digests deleted so far; returned by the script
local redis_ver = toversion(redisversion)           -- not read in the visible part of the script

-- Phase 1: walk digests_set in pages of `per` members and delete every digest
-- that is not found in the schedule set, the retry set, any queue, or a
-- process set. Stops once the whole set has been examined or `reaper_count`
-- digests have been deleted.
repeat
  log_debug("Interating through:", digests_set, "for orphaned locks")
  local digests = redis.call("ZREVRANGE", digests_set, index, index + per - 1)
  -- Number of members removed from digests_set while handling this page.
  local removed = 0

  for _, digest in ipairs(digests) do
    -- Enforce the limit inside the page too; checking only between pages
    -- would allow up to `per - 1` deletions beyond reaper_count.
    if del_count >= reaper_count then
      break
    end

    log_debug("Searching for digest:", digest, "in", schedule_set)
    found = find_digest_in_sorted_set(schedule_set, digest)

    if found ~= true then
      log_debug("Searching for digest:", digest, "in", retry_set)
      found = find_digest_in_sorted_set(retry_set, digest)
    end

    if found ~= true then
      log_debug("Searching for digest:", digest, "in all queues")
      local queue = find_digest_in_queues(digest)

      if queue then
        log_debug("found digest:", digest, "in queue:", queue)
        found = true
      end
    end

    -- TODO: Add check for jobs checked out by process
    if found ~= true then
      log_debug("Searching for digest:", digest, "in process sets")
      found = find_digest_in_process_set(digest, threshold)
    end

    if found ~= true then
      -- Nothing references this digest: drop the digest key, its companion
      -- keys and the ":RUN" variants, then remove it from the digest set.
      local queued     = digest .. ":QUEUED"
      local primed     = digest .. ":PRIMED"
      local locked     = digest .. ":LOCKED"
      local info       = digest .. ":INFO"
      local run_digest = digest .. ":RUN"
      local run_queued = digest .. ":RUN:QUEUED"
      local run_primed = digest .. ":RUN:PRIMED"
      local run_locked = digest .. ":RUN:LOCKED"
      local run_info   = digest .. ":RUN:INFO"

      redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info)

      redis.call("ZREM", digests_set, digest)
      del_count = del_count + 1
      removed = removed + 1
    end
  end

  -- ZREVRANGE addresses members by rank, and each ZREM shifts the ranks of
  -- the members behind it down by one. Advance only past the members that
  -- are still in the set, and shrink the expected total by the same amount,
  -- so that no member is skipped and the loop still terminates.
  index = index + per - removed
  total = total - removed
until index >= total or del_count >= reaper_count

-- Phase 2: if the deletion limit has not been reached yet, delete digests from
-- expiring_digests_set whose score lies between 0 and current_time.
if del_count < reaper_count then
  -- `index` counts the members processed in this phase; `total` is the number
  -- of members in the score range when the phase starts.
  index = 0
  total = redis.call("ZCOUNT", expiring_digests_set, 0, current_time)

  while index < total and del_count < reaper_count do
    -- LIMIT takes (offset, count), not (start, stop). Every member returned
    -- is removed below, so the next batch always starts at offset 0; the
    -- count is capped so the run never deletes more than reaper_count.
    local batch = math.min(per, reaper_count - del_count)
    local digests = redis.call("ZRANGEBYSCORE", expiring_digests_set, 0, current_time, "LIMIT", 0, batch)

    -- Defensive stop: nothing left in the score range.
    if #digests == 0 then
      break
    end

    for _, digest in ipairs(digests) do
      local queued     = digest .. ":QUEUED"
      local primed     = digest .. ":PRIMED"
      local locked     = digest .. ":LOCKED"
      local info       = digest .. ":INFO"
      local run_digest = digest .. ":RUN"
      local run_queued = digest .. ":RUN:QUEUED"
      local run_primed = digest .. ":RUN:PRIMED"
      local run_locked = digest .. ":RUN:LOCKED"
      local run_info   = digest .. ":RUN:INFO"

      redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info)

      redis.call("ZREM", expiring_digests_set, digest)
      del_count = del_count + 1
    end

    index = index + #digests
  end
end

log_debug("END")
-- The script's reply is the number of digests deleted during this run.
return del_count