class Contender::Pool::PoolExecutor

Executor that uses a thread pool to execute tasks asynchronously

Made idiomatic by Ian Unruh. The original author is Doug Lea, from the JDK.

Constants

RUNNING
SHUTDOWN
STOP
TERMINATED
TIDYING

Attributes

allow_core_timeout[R]

@return [Boolean]

core_size[R]

@return [Integer]

largest_size[R]

@return [Integer]

maximum_size[R]

@return [Integer]

queue[R]

@return [Queue]

rejection_policy[RW]

@return [RejectionPolicy]

thread_factory[R]

@return [ThreadFactory]

work_timeout[R]

@return [Float]

Public Class Methods

new(core_size, maximum_size, work_timeout, queue, thread_factory) click to toggle source

@raise [ArgumentError]

If the core size is less than zero

@raise [ArgumentError]

If the maximum size is less than one or less than the core size

@raise [ArgumentError]

If the work timeout is less than zero

@param [Integer] core_size

@param [Integer] maximum_size

@param [Float] work_timeout

@param [Queue] queue

Used to hold tasks that will be executed by the thread pool

@param [ThreadFactory] thread_factory

Used to create threads for new pool workers

@return [undefined]

# File lib/contender/pool/pool_executor.rb, line 48
# Builds a pool in the RUNNING state with no workers.
#
# @param core_size [Integer] must not be negative
# @param maximum_size [Integer] must be positive and at least +core_size+
# @param work_timeout [Float] must not be negative
# @param queue [Queue] holds tasks waiting to be executed
# @param thread_factory [ThreadFactory] handed to every new pool worker
# @raise [ArgumentError] if any of the sizing or timeout constraints is violated
# @return [undefined]
def initialize(core_size, maximum_size, work_timeout, queue, thread_factory)
  # Validate everything before any state is assigned
  invalid = core_size < 0 ||
            maximum_size <= 0 ||
            maximum_size < core_size ||
            work_timeout < 0
  raise ArgumentError if invalid

  # Sizing and timing configuration
  @core_size, @maximum_size = core_size, maximum_size
  @work_timeout = work_timeout
  @allow_core_timeout = false

  # Collaborators
  @queue = queue
  @thread_factory = thread_factory
  @rejection_policy = AbortPolicy.new

  # Pool state and worker count live in one value that is swapped atomically
  @control = Atomic.new(control_for(RUNNING, 0))

  @monitor = Monitor.new
  @termination = @monitor.new_cond

  # Everything below is guarded by the monitor
  @workers = Set.new
  @largest_size = 0
  @completed_task_count = 0
end

Public Instance Methods

active_count() click to toggle source

@api public @return [Integer]

# File lib/contender/pool/pool_executor.rb, line 289
# Number of workers whose lock is currently held. run_worker holds a worker's
# lock for the duration of each task, so this is the number of busy workers.
#
# @api public
# @return [Integer]
def active_count
  synchronize { @workers.count(&:locked?) }
end
allow_core_timeout=(allow_core_timeout) click to toggle source

@api public @param [Boolean] allow_core_timeout @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 228
# Enables or disables timing out of core workers.
#
# @api public
# @param allow_core_timeout [Boolean]
# @raise [ArgumentError] when enabling while the work timeout is not positive
# @return [undefined]
def allow_core_timeout=(allow_core_timeout)
  raise ArgumentError if allow_core_timeout && @work_timeout <= 0

  unless allow_core_timeout == @allow_core_timeout
    @allow_core_timeout = allow_core_timeout

    # Wake idle workers so they pick up the new policy
    interrupt_idle_workers if allow_core_timeout
  end
end
await_termination(timeout) click to toggle source

@api public @param [Float] timeout @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 132
# Blocks until the pool has terminated or the timeout has elapsed.
#
# The condition is re-checked in a loop against a fixed deadline, so a wakeup
# that is not the termination broadcast does not end the wait early.
#
# @api public
# @param timeout [Float, nil] seconds to wait; nil waits without a deadline
# @return [Boolean] true if the pool terminated, false if the timeout elapsed first
def await_termination(timeout)
  deadline = timeout && Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  synchronize do
    until terminated?
      remaining = deadline && deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      return false if remaining && remaining <= 0

      @termination.wait remaining
    end

    true
  end
end
backlog() click to toggle source

@api public @return [Integer]

# File lib/contender/pool/pool_executor.rb, line 274
# Number of tasks currently sitting in the work queue.
#
# @api public
# @return [Integer]
def backlog
  @queue.size
end
completed_tasks() click to toggle source

@api public @return [Integer]

# File lib/contender/pool/pool_executor.rb, line 305
# Total of the pool-level completed counter plus the counter of every worker
# still registered with the pool.
#
# @api public
# @return [Integer]
def completed_tasks
  synchronize do
    @workers.inject(@completed_task_count) do |sum, worker|
      sum + worker.completed_task_count
    end
  end
end
core_size=(core_size) click to toggle source

@api public @param [Integer] core_size @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 185
# Changes the core size of the pool.
#
# The maximum size is raised to match when it would otherwise fall below the
# new core size. If the pool now has more workers than the core size, idle
# workers are interrupted. If the core size grew, up to
# min(growth, backlog) core workers are started for the queued work.
#
# @api public
# @param core_size [Integer]
# @raise [ArgumentError] if the core size is negative
# @return [undefined]
def core_size=(core_size)
  raise ArgumentError if core_size < 0

  @maximum_size = core_size if @maximum_size < core_size

  delta = core_size - @core_size
  @core_size = core_size

  if current_control.worker_count > core_size
    interrupt_idle_workers
  elsif delta > 0
    # Expand towards the new core size to meet the needs of the work backlog.
    # Workers are added as core workers so the pool never grows past the new
    # core size here; growth beyond it is left to #execute.
    [delta, backlog].min.times do
      break unless add_worker true
      break if @queue.empty?
    end
  end
end
current_size() click to toggle source

@api public @return [Integer]

# File lib/contender/pool/pool_executor.rb, line 280
# Number of workers registered with the pool, reported as zero once the pool
# state is past STOP.
#
# @api public
# @return [Integer]
def current_size
  synchronize do
    current_control.state > STOP ? 0 : @workers.size
  end
end
execute(task = nil, &block) click to toggle source

@api public @raise [TaskRejectionError] If task was rejected by the thread pool @param [Object] task @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 78
# Submits a task (or a block) for execution.
#
# The task is first offered to a new core worker, then to the queue, and
# finally to a new non-core worker; if all three fail it is handed to #reject.
#
# @api public
# @param task [Object] falls back to the given block when nil
# @raise [ArgumentError] if neither a task nor a block is given
# @return [undefined]
def execute(task = nil, &block)
  task ||= block
  raise ArgumentError unless task

  # Fast path: a freshly started core worker takes the task directly
  return if try_core_worker task

  if current_control.state == RUNNING && @queue.offer(task)
    after_task_enqueue task
    return
  end

  # Either the pool is shutting down or the queue is full
  reject task unless add_worker(false, task)
end
maximum_size=(maximum_size) click to toggle source

@api public @param [Integer] maximum_size @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 215
# Changes the maximum size of the pool and interrupts idle workers when the
# pool currently has more workers than the new maximum.
#
# @api public
# @param maximum_size [Integer]
# @raise [ArgumentError] if the value is not positive or is below the core size
# @return [undefined]
def maximum_size=(maximum_size)
  raise ArgumentError if maximum_size <= 0 || maximum_size < @core_size

  @maximum_size = maximum_size
  interrupt_idle_workers if current_control.worker_count > maximum_size
end
prestart() click to toggle source

@api public @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 166
# Starts one core worker if the pool has fewer workers than its core size.
#
# @api public
# @return [Boolean] false when the pool is already at its core size, otherwise
#   the result of add_worker
def prestart
  return false unless current_control.worker_count < @core_size

  add_worker(true)
end
prestart!() click to toggle source

@api public @return [Integer]

# File lib/contender/pool/pool_executor.rb, line 172
# Starts core workers until add_worker refuses to add another one.
#
# @api public
# @return [Integer] number of workers that were started
def prestart!
  started = 0
  started += 1 while add_worker(true)
  started
end
remove(task) click to toggle source

@api public @param [Object] task @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 262
# Deletes a task from the work queue and then gives the pool a chance to
# terminate.
#
# @api public
# @param task [Object]
# @return [Boolean] whatever the queue's delete returned
def remove(task)
  @queue.delete(task).tap { try_terminate }
end
shutdown() click to toggle source

@api public @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 103
# Starts an orderly shutdown: advances the state to SHUTDOWN, interrupts idle
# workers and runs the on_shutdown hook under the monitor, then tries to
# terminate.
#
# @api public
# @return [undefined]
def shutdown
  synchronize do
    advance_state_to(SHUTDOWN)
    interrupt_idle_workers
    on_shutdown
  end

  try_terminate
end
shutdown!() click to toggle source

@api public @return [Array]

# File lib/contender/pool/pool_executor.rb, line 115
# Stops the pool immediately: advances the state to STOP, interrupts every
# worker and drains the queue under the monitor, then tries to terminate.
#
# @api public
# @return [Array] the tasks that were drained from the queue
def shutdown!
  [].tap do |drained|
    synchronize do
      advance_state_to(STOP)
      interrupt_workers

      @queue.drain_to(drained)
    end

    try_terminate
  end
end
shutdown?() click to toggle source

@api public @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 143
# Whether the pool has left the RUNNING state.
#
# @api public
# @return [Boolean]
def shutdown?
  state = current_control.state
  state != RUNNING
end
terminated?() click to toggle source

@api public @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 156
# Whether the pool has reached the TERMINATED state.
#
# @api public
# @return [Boolean]
def terminated?
  state = current_control.state
  state == TERMINATED
end
terminating?() click to toggle source

@api public @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 149
# Whether the pool has left RUNNING but has not yet reached TERMINATED.
#
# @api public
# @return [Boolean]
def terminating?
  state = current_control.state
  return false unless state > RUNNING

  state < TERMINATED
end
to_s() click to toggle source

@api public @return [String]

# File lib/contender/pool/pool_executor.rb, line 321
def to_s
  state = current_control.state

  state_text = "Running" if state == RUNNING
  state_text = "Shutting down" if state > RUNNING && state < TERMINATED
  state_text = "Terminated" if state == TERMINATED

  "{#{state_text}" +
  ", pool size = #{current_size}" +
  ", active threads = #{active_count}" +
  ", queued tasks = #{backlog}" +
  ", completed tasks = #{completed_tasks}" +
  "}"
end
total_tasks() click to toggle source

@api public @return [Integer]

# File lib/contender/pool/pool_executor.rb, line 299
# Completed tasks plus tasks still waiting in the queue.
#
# @api public
# @return [Integer]
def total_tasks
  finished = completed_tasks
  queued = backlog

  finished + queued
end
work_timeout=(work_timeout) click to toggle source

@api public @param [Float] work_timeout @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 243
# Changes how long a timed worker waits for work.
#
# @api public
# @param work_timeout [Float]
# @raise [ArgumentError] if the timeout is negative, or zero while core
#   workers are allowed to time out
# @return [undefined]
def work_timeout=(work_timeout)
  raise ArgumentError if work_timeout < 0
  raise ArgumentError if work_timeout == 0 && @allow_core_timeout

  shortened = (work_timeout - @work_timeout) < 0
  @work_timeout = work_timeout

  # Idle workers are woken so they stop waiting on the old, longer timeout
  interrupt_idle_workers if shortened
end

Protected Instance Methods

after_execution(task, exception) click to toggle source

@param [Object] task @param [Exception] exception @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 346
# Hook called by run_worker after each task, whether or not it raised.
# Does nothing by default.
#
# @param task [Object]
# @param exception [Exception, nil] the exception raised by the task, if any
# @return [undefined]
def after_execution(task, exception)
  # Intentionally empty
end
before_execution(task, thread) click to toggle source

@param [Object] task @param [Thread] thread @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 341
# Hook called by run_worker just before a task is invoked, with the worker's
# thread. Does nothing by default.
#
# @param task [Object]
# @param thread [Thread]
# @return [undefined]
def before_execution(task, thread)
  # Intentionally empty
end
current_control() click to toggle source

@return [PoolControl]

# File lib/contender/pool/pool_executor.rb, line 384
# Current snapshot of the pool control value (state plus worker count).
#
# @return [PoolControl]
def current_control
  @control.value
end
on_shutdown() click to toggle source

@return [undefined]

# File lib/contender/pool/pool_executor.rb, line 349
# Hook called by #shutdown while the monitor is held, after idle workers have
# been interrupted. Does nothing by default.
#
# @return [undefined]
def on_shutdown
  # Intentionally empty
end
on_termination() click to toggle source

@return [undefined]

# File lib/contender/pool/pool_executor.rb, line 352
# Hook called by terminate just before the pool is marked TERMINATED.
# Does nothing by default.
#
# @return [undefined]
def on_termination
  # Intentionally empty
end
reject(task) click to toggle source

@param [Object] task @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 356
# Hands a task the pool cannot accept to the configured rejection policy,
# together with this executor.
#
# @param task [Object]
# @return [undefined]
def reject(task)
  @rejection_policy.on_rejection(task, self)
end
try_terminate() click to toggle source

@return [undefined]

# File lib/contender/pool/pool_executor.rb, line 361
# Moves the pool to TIDYING and then runs terminate once nothing is left to do.
#
# Returns without doing anything while the pool is RUNNING, once it is already
# past STOP, or while it is in SHUTDOWN with tasks still queued. If workers
# remain, a single worker is considered for interruption and the method
# returns without changing state.
#
# @return [undefined]
def try_terminate
  loop do
    # Take state and worker count from the same control snapshot
    control = current_control

    state = control.state
    worker_count = control.worker_count

    return if state == RUNNING || state > STOP
    return if state == SHUTDOWN && @queue.size > 0

    if worker_count > 0
      # Eligible to terminate, but there are still workers left
      interrupt_idle_workers true
      return
    end

    # Loop again when the swap does not succeed against this snapshot
    break if @control.compare_and_swap(control, control_for(TIDYING, 0))
  end

  terminate
end

Private Instance Methods

add_worker(core, first_task = nil) click to toggle source

@param [Boolean] core @param [Object] first_task @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 499
# Reserves a worker slot, registers a new worker and starts it.
#
# add_worker? reserves the slot by incrementing the worker count. Every path
# that fails after that point (pool state changed, worker construction raised,
# worker start raised) releases the slot again through add_worker_failed, so
# the count and the worker set cannot drift apart.
#
# @param core [Boolean] bound the pool by the core size instead of the maximum size
# @param first_task [Object] task the new worker should run first, if any
# @return [Boolean] true if a worker was started
def add_worker(core, first_task = nil)
  return false unless add_worker? core, first_task

  worker = nil
  started = false

  begin
    worker = PoolWorker.new thread_factory, first_task

    accepted = synchronize do
      state = current_control.state

      # Re-check under the monitor: the pool may have been shut down since the
      # slot was reserved. A SHUTDOWN pool only accepts workers without a first task.
      if state < SHUTDOWN || (state == SHUTDOWN && first_task.nil?)
        @workers.add worker

        count = @workers.size
        @largest_size = count if count > @largest_size

        true
      else
        false
      end
    end

    if accepted
      worker.start do
        run_worker worker
      end

      started = true
    end
  ensure
    add_worker_failed worker unless started
  end

  started
end

# Undoes a slot reservation for a worker that could not be started.
#
# @param worker [PoolWorker, nil] nil when the worker was never constructed
# @return [undefined]
def add_worker_failed(worker)
  synchronize do
    @workers.delete worker if worker
    decrement_worker_count
    try_terminate
  end
end
add_worker?(core, first_task) click to toggle source
# File lib/contender/pool/pool_executor.rb, line 530
# Decides whether a new worker may be added and, if so, reserves its slot by
# incrementing the worker count.
#
# A pool past SHUTDOWN never gets new workers. A pool in SHUTDOWN only gets a
# worker that carries no first task and only while tasks are still queued: new
# tasks are not accepted after shutdown, but the backlog still has to be drained.
#
# @param core [Boolean] bound the pool by the core size instead of the maximum size
# @param first_task [Object] task the new worker would run first, if any
# @return [Boolean] true if a slot was reserved
def add_worker?(core, first_task)
  loop do
    control = current_control
    state = control.state

    if state == SHUTDOWN
      return false unless first_task.nil? && @queue.size > 0
    elsif state > SHUTDOWN
      return false
    end

    loop do
      worker_count = control.worker_count

      maximum = @core_size if core
      maximum ||= @maximum_size

      return false unless worker_count < maximum

      # CAS was successful, worker can be started
      return true if compare_and_increment_worker_count control

      # Pool state changed, retry the entire loop
      break unless current_control.state == state

      # Worker count changed, retry inner loop
      control = current_control
    end
  end
end
advance_state_to(state) click to toggle source

@param [Integer] state @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 405
# Raises the pool state to at least the given state, keeping the worker count.
# Never moves the state backwards; retries while the swap does not succeed.
#
# @param state [Integer]
# @return [undefined]
def advance_state_to(state)
  loop do
    current = current_control

    break if current.state >= state
    break if @control.compare_and_swap(current, control_for(state, current.worker_count))
  end
end
after_task_enqueue(task) click to toggle source

@param [Object] task @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 484
# Re-checks the pool after a task has been placed on the queue.
#
# @param task [Object]
# @return [undefined]
def after_task_enqueue(task)
  control = current_control

  # The pool stopped running after the task was enqueued; if the task can still
  # be pulled off the queue before a worker takes it, reject it instead
  return reject(task) if control.state != RUNNING && remove(task)

  # There are no workers to process the task, add one
  add_worker(false) if control.worker_count == 0
end
compare_and_decrement_worker_count(exp) click to toggle source

@param [PoolControl] exp @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 421
# Tries to swap the expected control value for one with the same state and a
# worker count lowered by one.
#
# @param exp [PoolControl] the control value the caller last observed
# @return [Boolean] the result of the swap
def compare_and_decrement_worker_count(exp)
  desired = control_for(exp.state, exp.worker_count - 1)
  @control.compare_and_swap(exp, desired)
end
compare_and_increment_worker_count(exp) click to toggle source

@param [PoolControl] exp @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 415
# Tries to swap the expected control value for one with the same state and a
# worker count raised by one.
#
# @param exp [PoolControl] the control value the caller last observed
# @return [Boolean] the result of the swap
def compare_and_increment_worker_count(exp)
  desired = control_for(exp.state, exp.worker_count + 1)
  @control.compare_and_swap(exp, desired)
end
control_for(state, worker_count) click to toggle source

@param [Integer] state @param [Integer] worker_count @return [PoolControl]

# File lib/contender/pool/pool_executor.rb, line 399
# Builds a control value pairing a pool state with a worker count.
#
# @param state [Integer]
# @param worker_count [Integer]
# @return [PoolControl]
def control_for(state, worker_count)
  PoolControl.new(state, worker_count)
end
decrement_worker_count() click to toggle source

@return [undefined]

# File lib/contender/pool/pool_executor.rb, line 426
# Lowers the worker count by one, retrying against a fresh control snapshot
# until the swap succeeds.
#
# @return [undefined]
def decrement_worker_count
  loop do
    break if compare_and_decrement_worker_count(current_control)
  end
end
interrupt_idle_workers(single = false) click to toggle source

@param [Boolean] single @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 449
# Interrupts workers whose lock can be taken, i.e. workers that are not in the
# middle of a task (run_worker holds the lock while a task executes).
#
# @param single [Boolean] stop after looking at the first worker, whether or
#   not it could be interrupted
# @return [undefined]
def interrupt_idle_workers(single = false)
  synchronize do
    @workers.each do |worker|
      if worker.try_lock
        begin
          worker.interrupt
        ensure
          # Always give the lock back; a worker left locked could never run
          # another task and would be reported as active forever
          worker.unlock
        end
      end

      break if single
    end
  end
end
interrupt_workers() click to toggle source

@return [undefined]

# File lib/contender/pool/pool_executor.rb, line 439
# Interrupts every registered worker, busy or not, while holding the monitor.
#
# @return [undefined]
def interrupt_workers
  synchronize { @workers.each(&:interrupt) }
end
next_task() click to toggle source

@return [Object]

# File lib/contender/pool/pool_executor.rb, line 599
# Fetches the next task for a worker, waiting on the queue if necessary.
#
# Returns nil when the calling worker should exit. On every nil return the
# worker count has already been decremented by this method.
#
# @return [Object] the next task, or nil if the worker should exit
def next_task
  # Whether the previous timed poll came back without a task
  timed_out = false

  loop do
    failed = false

    control = current_control
    state = control.state

    # Past SHUTDOWN, or SHUTDOWN with nothing left to drain: the worker exits
    if state > SHUTDOWN || (state == SHUTDOWN && @queue.empty?)
      decrement_worker_count
      return
    end

    timed = false

    loop do
      worker_count = control.worker_count
      # Workers above the core size always use a timed wait; core workers only
      # do so when core timeout is allowed
      timed = @allow_core_timeout || worker_count > @core_size

      if worker_count <= @maximum_size
        break unless timed && timed_out
      end

      # Pool has either gone over its maximum size OR the worker thread has timed out
      # Now attempt to just quietly remove this worker
      return if compare_and_decrement_worker_count control

      # The compare and swap of the worker count failed
      unless current_control.state == state
        # The compare and swap operation failed because the pool state changed
        # Start completely over so that the pool state is checked
        failed = true
        break
      end

      # The compare and swap operation failed because the worker count changed
      # Start this inner loop over and ensure that this worker is still eligible for culling

      control = current_control
    end

    next if failed

    begin
      if timed
        task = @queue.poll @work_timeout
      else
        task = @queue.take
      end

      return task if task

      # A falsy result from the queue is treated as a timeout
      timed_out = true
    rescue Interrupt
      # Something interrupted the worker, this may signal the need to reduce the size of the
      # pool or prepare for termination
      timed_out = false
    end
  end
end
process_worker_exit(worker, clean_exit) click to toggle source

@param [PoolWorker] worker @param [Boolean] clean_exit @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 664
# Bookkeeping for a worker whose run loop has ended: folds its completed task
# count into the pool total, unregisters it, tries to terminate the pool and
# starts a replacement worker when the pool still needs one.
#
# @param worker [PoolWorker]
# @param clean_exit [Boolean] false when run_worker was left by an exception
# @return [undefined]
def process_worker_exit(worker, clean_exit)
  unless clean_exit
    # If worker did not exit cleanly, then the worker count was not adjusted
    decrement_worker_count
  end

  synchronize do
    @completed_task_count += worker.completed_task_count
    @workers.delete worker
  end

  try_terminate

  control = current_control
  # Replacements are only considered while the pool is RUNNING or SHUTDOWN
  if control.state < STOP
    if clean_exit
      # Some action caused the pool size to change
      # 0 is truthy in Ruby, so this yields 0 when core timeout is allowed and
      # the core size otherwise
      minimum = (@allow_core_timeout && 0) || @core_size
      # Keep at least one worker while tasks are still queued
      minimum = 1 if minimum == 0 && @queue.size > 0

      # No need to replace the dying worker with a new thread
      return unless control.worker_count < minimum
    end

    # Dying worker will be replaced with a new one
    add_worker false
  end
end
run_worker(worker) click to toggle source

@param [PoolWorker] worker @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 563
# Main loop of a pool worker: runs the worker's first task, if it has one, and
# then keeps pulling tasks from next_task until that returns nil.
#
# process_worker_exit is always called on the way out; it receives
# clean_exit = false when the loop was left by an exception.
#
# @param worker [PoolWorker]
# @return [undefined]
def run_worker(worker)
  task = worker.first_task!

  clean_exit = false

  loop do
    task ||= next_task
    break unless task

    # The lock is held for the whole task; interrupt_idle_workers skips
    # workers whose lock it cannot take
    worker.lock
    begin
      before_execution task, worker.thread
      exception = nil

      begin
        task.call
      rescue Exception
        # Remember the exception for the after_execution hook, then let it propagate
        exception = $!
        raise
      ensure
        after_execution task, exception
      end
    ensure
      # Clear the task so the next iteration fetches a new one
      task = nil

      worker.on_task_completion
      worker.unlock
    end
  end

  # Only reached when the loop ended because there was no further task
  clean_exit = true
ensure
  process_worker_exit worker, clean_exit
end
synchronize(&block) click to toggle source

@yield @return [undefined]

# File lib/contender/pool/pool_executor.rb, line 434
# Runs the given block while holding the pool monitor.
#
# @yield
# @return [undefined]
def synchronize(&block)
  @monitor.synchronize(&block)
end
terminate() click to toggle source

@return [undefined]

# File lib/contender/pool/pool_executor.rb, line 463
# Runs the on_termination hook under the monitor, then marks the pool
# TERMINATED and wakes every thread waiting on the termination condition.
# The state change and the broadcast happen even if the hook raises.
#
# @return [undefined]
def terminate
  synchronize do
    begin
      on_termination
    ensure
      final_control = control_for(TERMINATED, 0)
      @control.value = final_control
      @termination.broadcast
    end
  end
end
try_core_worker(first_task) click to toggle source

@param [Object] first_task @return [Boolean]

# File lib/contender/pool/pool_executor.rb, line 476
# Tries to start a core worker with the given task, but only while the pool
# has fewer workers than its core size.
#
# @param first_task [Object]
# @return [Boolean] the result of add_worker, or nil when no attempt was made
def try_core_worker(first_task)
  return unless current_control.worker_count < @core_size

  add_worker(true, first_task)
end