class SuckerPunch::Queue

Constants

DEFAULT_EXECUTOR_OPTIONS
DEFAULT_MAX_QUEUE_SIZE
PAUSE_TIME
QUEUES

Attributes

name[R]

Public Class Methods

all() click to toggle source
# File lib/sucker_punch/queue.rb, line 33
# Wraps every pool registered in QUEUES in a queue object.
#
# Returns a Concurrent::Array with one entry per name/pool pair.
def self.all
  Concurrent::Array.new.tap do |wrapped|
    QUEUES.each_pair { |queue_name, executor| wrapped << new(queue_name, executor) }
  end
end
clear() click to toggle source
# File lib/sucker_punch/queue.rb, line 41
# Empties the queue registry, resets the Busy/Processed/Failed counters and
# kills every queue that was registered when the call started.
#
# Susceptible to race conditions -- only use in testing.
def self.clear
  # Snapshot first: once QUEUES is emptied, `all` no longer sees these pools.
  previous = all
  QUEUES.clear

  counters = SuckerPunch::Counter
  counters::Busy.clear
  counters::Processed.clear
  counters::Failed.clear

  previous.each(&:kill)
end
find_or_create(name, num_workers = 2, num_jobs_max = nil) click to toggle source
# File lib/sucker_punch/queue.rb, line 19
# Returns a queue wrapping the pool stored under +name+ in QUEUES, building
# and storing a new Concurrent::ThreadPoolExecutor when none is stored yet.
#
# name         - key the pool is stored under.
# num_workers  - used for both min_threads and max_threads of a new pool.
# num_jobs_max - max_queue of a new pool; DEFAULT_MAX_QUEUE_SIZE when nil/false.
#
# The worker and queue-size arguments only matter when the block passed to
# fetch_or_store runs, i.e. when a pool is actually built.
def self.find_or_create(name, num_workers = 2, num_jobs_max = nil)
  executor = QUEUES.fetch_or_store(name) do
    overrides = {
      min_threads: num_workers,
      max_threads: num_workers,
      max_queue: num_jobs_max || DEFAULT_MAX_QUEUE_SIZE
    }
    Concurrent::ThreadPoolExecutor.new(**DEFAULT_EXECUTOR_OPTIONS.merge(overrides))
  end

  new(name, executor)
end
new(name, pool) click to toggle source
Calls superclass method
# File lib/sucker_punch/queue.rb, line 134
# Sets up a queue wrapper around an existing pool.
#
# name - stored in @name.
# pool - stored in @pool; post, kill and shutdown forward to it.
#
# The superclass initializer is invoked with no arguments, and the wrapper
# starts out in the running state.
def initialize(name, pool)
  super()
  @name = name
  @pool = pool
  @running = true
end
shutdown_all() click to toggle source
# File lib/sucker_punch/queue.rb, line 74
# Shuts down every queue, giving running and enqueued jobs up to
# SuckerPunch.shutdown_timeout seconds to finish before killing the queues.
#
# Does nothing (returns nil) unless SuckerPunch::RUNNING.make_false returns
# a truthy value. Returns nil when all queues went idle in time; otherwise
# logs a warning, kills every queue and returns the list of queues.
def self.shutdown_all
  # The timeout is a duration, so measure it on the monotonic clock: a wall
  # clock adjustment during shutdown must neither cut the grace period short
  # nor stretch it.
  monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = monotonic_now.call + SuckerPunch.shutdown_timeout

  # Presumably make_false only reports true to the caller that actually
  # flipped the flag, so repeated calls become no-ops -- confirm against the
  # definition of RUNNING.
  return unless SuckerPunch::RUNNING.make_false

  # If a job is enqueued right before the script exits (command line, rake
  # task, etc.), the system needs a short interval for that job to make it
  # into a queue; otherwise the queue would look idle.
  sleep PAUSE_TIME

  queues = all

  # Issue shutdown to each queue and let them wrap up their work. This
  # prevents new jobs from being enqueued and lets the pool clean up
  # after itself.
  queues.each(&:shutdown)

  # Nothing to wait for if every queue is empty and all workers are idle.
  return if queues.all?(&:idle?)

  SuckerPunch.logger.info("Pausing to allow workers to finish...")

  # Keep polling the queues until they are all idle, while respecting the
  # shutdown timeout.
  while deadline - monotonic_now.call > PAUSE_TIME
    return if queues.all?(&:idle?)
    sleep PAUSE_TIME
  end

  # Queues haven't finished their work. Aggressively kill them.
  SuckerPunch.logger.warn("Queued jobs didn't finish before shutdown_timeout...killing remaining jobs")
  queues.each(&:kill)
end
stats() click to toggle source
# File lib/sucker_punch/queue.rb, line 51
# Builds a statistics report for every queue returned by `all`.
#
# Returns a Hash keyed by queue name. Each value has a "workers" hash
# ("total", "busy", "idle") and a "jobs" hash ("processed", "failed",
# "enqueued"), filled from the queue's corresponding reader methods.
def self.stats
  all.each_with_object({}) do |queue, report|
    worker_counts = {
      "total" => queue.total_workers,
      "busy" => queue.busy_workers,
      "idle" => queue.idle_workers,
    }
    job_counts = {
      "processed" => queue.processed_jobs,
      "failed" => queue.failed_jobs,
      "enqueued" => queue.enqueued_jobs,
    }

    report[queue.name] = { "workers" => worker_counts, "jobs" => job_counts }
  end
end

Public Instance Methods

==(other) click to toggle source
# File lib/sucker_punch/queue.rb, line 148
# Two queues are equal when they wrap equal pools.
#
# other - object to compare against.
#
# Returns false for anything that is not a SuckerPunch::Queue (including
# nil) instead of raising NoMethodError from calling the protected `pool`
# reader on a foreign object.
def ==(other)
  other.is_a?(SuckerPunch::Queue) && pool == other.pool
end
busy_workers() click to toggle source
# File lib/sucker_punch/queue.rb, line 152
# Current value of the Busy counter kept under this queue's name.
def busy_workers
  counter = SuckerPunch::Counter::Busy.new(name)
  counter.value
end
failed_jobs() click to toggle source
# File lib/sucker_punch/queue.rb, line 164
# Current value of the Failed counter kept under this queue's name.
def failed_jobs
  counter = SuckerPunch::Counter::Failed.new(name)
  counter.value
end
idle?() click to toggle source
# File lib/sucker_punch/queue.rb, line 144
# True when there are no enqueued jobs and no busy workers.
def idle?
  # busy_workers is only consulted once the backlog is known to be empty.
  return false unless enqueued_jobs == 0

  busy_workers == 0
end
idle_workers() click to toggle source
# File lib/sucker_punch/queue.rb, line 156
# Number of workers not currently busy: total_workers minus busy_workers.
def idle_workers
  total = total_workers
  busy = busy_workers
  total - busy
end
kill() click to toggle source
# File lib/sucker_punch/queue.rb, line 179
# Forwards `kill` to the wrapped pool and returns whatever the pool returns.
def kill
  executor = @pool
  executor.kill
end
post(*args, &block) click to toggle source
# File lib/sucker_punch/queue.rb, line 168
# Hands a job to the wrapped pool while this queue is still running.
#
# args, block - forwarded unchanged to the pool's `post`.
#
# Returns the pool's result, or false (without touching the pool) once the
# queue has been marked as not running. The check and the hand-off both
# happen inside `synchronize`.
def post(*args, &block)
  synchronize do
    next false unless @running

    @pool.post(*args, &block)
  end
end
processed_jobs() click to toggle source
# File lib/sucker_punch/queue.rb, line 160
# Current value of the Processed counter kept under this queue's name.
def processed_jobs
  counter = SuckerPunch::Counter::Processed.new(name)
  counter.value
end
running?() click to toggle source
# File lib/sucker_punch/queue.rb, line 140
# Reads the running flag inside `synchronize`.
#
# The flag starts out true and is set to false by `shutdown`.
def running?
  synchronize do
    @running
  end
end
shutdown() click to toggle source
# File lib/sucker_punch/queue.rb, line 183
# Marks this queue as no longer running, then asks the wrapped pool to shut
# down. Returns whatever the pool's `shutdown` returns.
#
# Only the flag change happens inside `synchronize`; the pool is shut down
# after the block has finished.
def shutdown
  synchronize do
    @running = false
  end

  @pool.shutdown
end

Protected Instance Methods

pool() click to toggle source
# File lib/sucker_punch/queue.rb, line 190
# Reader for the wrapped pool. Listed among the protected instance methods;
# `==` uses it to compare this queue's pool with another queue's.
def pool
  @pool
end