class Contender::Pool::PoolExecutor
Executor that uses a thread pool to execute tasks asynchronously.
Made idiomatic by Ian Unruh. The original author is Doug Lea, from the JDK.
Constants
- RUNNING
- SHUTDOWN
- STOP
- TERMINATED
- TIDYING
Attributes
@return [Boolean]
@return [Integer]
@return [Integer]
@return [Integer]
@return [Queue]
@return [RejectionPolicy]
@return [ThreadFactory]
@return [Float]
Public Class Methods
@raise [ArgumentError]
If the core size is less than zero
@raise [ArgumentError]
If the maximum size is less than one or less than the core size
@raise [ArgumentError]
If the work timeout is less than zero
@param [Integer] core_size
@param [Integer] maximum_size
@param [Float] work_timeout
@param [Queue] queue
Used to hold tasks that will be executed by the thread pool
@param [ThreadFactory] thread_factory
Used to create threads for new pool workers
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 48
#
# Builds an executor whose control word starts as RUNNING with zero workers.
#
# @raise [ArgumentError] If the core size is less than zero
# @raise [ArgumentError] If the maximum size is less than one or less than the core size
# @raise [ArgumentError] If the work timeout is less than zero
# @param [Integer] core_size
# @param [Integer] maximum_size
# @param [Float] work_timeout
# @param [Queue] queue Used to hold tasks that will be executed by the thread pool
# @param [ThreadFactory] thread_factory Used to create threads for new pool workers
# @return [undefined]
def initialize(core_size, maximum_size, work_timeout, queue, thread_factory)
  raise ArgumentError, 'core size must not be negative' if core_size < 0
  if maximum_size <= 0 || maximum_size < core_size
    raise ArgumentError, 'maximum size must be at least one and not less than the core size'
  end
  raise ArgumentError, 'work timeout must not be negative' if work_timeout < 0

  @queue = queue
  @thread_factory = thread_factory
  @rejection_policy = AbortPolicy.new

  @core_size = core_size
  @maximum_size = maximum_size
  @allow_core_timeout = false
  @work_timeout = work_timeout

  @control = Atomic.new(control_for(RUNNING, 0))

  @monitor = Monitor.new
  @termination = @monitor.new_cond

  # The following instance variables are guarded by the monitor
  @workers = Set.new
  @largest_size = 0
  @completed_task_count = 0
end
Public Instance Methods
@api public @return [Integer]
# File lib/contender/pool/pool_executor.rb, line 289
#
# Counts, under the monitor, the workers that currently report locked?.
#
# @return [Integer]
def active_count
  synchronize { @workers.count(&:locked?) }
end
@api public @param [Boolean] allow_core_timeout
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 228
#
# Sets whether core workers are subject to the work timeout. Idle workers are
# interrupted when the flag is switched on; assigning the current value is a
# no-op.
#
# @raise [ArgumentError] If enabling while the work timeout is not positive
# @param [Boolean] allow_core_timeout
# @return [undefined]
def allow_core_timeout=(allow_core_timeout)
  if allow_core_timeout && @work_timeout <= 0
    raise ArgumentError, 'core workers cannot time out unless the work timeout is positive'
  end
  return if allow_core_timeout == @allow_core_timeout

  @allow_core_timeout = allow_core_timeout
  interrupt_idle_workers if allow_core_timeout
end
@api public @param [Float] timeout @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 132
#
# Blocks until the pool is terminated or the timeout elapses.
#
# The condition is re-checked after every wakeup, so a wakeup that is not
# accompanied by termination does not end the wait early; the remaining time
# is measured against a monotonic deadline.
#
# @param [Float] timeout Seconds to wait; nil waits until termination
# @return [Boolean] Whether the pool was terminated when the wait ended
def await_termination(timeout)
  synchronize do
    return true if terminated?

    if timeout.nil?
      @termination.wait until terminated?
      return true
    end

    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    until terminated?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break unless remaining > 0

      @termination.wait remaining
    end

    terminated?
  end
end
@api public @return [Integer]
# File lib/contender/pool/pool_executor.rb, line 274
#
# Reports the size of the work queue.
#
# @return [Integer]
def backlog
  @queue.size
end
@api public @return [Integer]
# File lib/contender/pool/pool_executor.rb, line 305
#
# Adds the per-worker completed task counts of the current workers to the
# count already accumulated in @completed_task_count, under the monitor.
#
# @return [Integer]
def completed_tasks
  synchronize do
    @workers.inject(@completed_task_count) do |sum, worker|
      sum + worker.completed_task_count
    end
  end
end
@api public @param [Integer] core_size
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 185
#
# Changes the core size. The maximum size is raised to match when it would
# otherwise fall below the new core size. When there are more workers than the
# new core size, idle workers are interrupted; when the core size grew, up to
# min(growth, backlog) workers are added while the queue stays non-empty.
#
# @raise [ArgumentError] If the core size is less than zero
# @param [Integer] core_size
# @return [undefined]
def core_size=(core_size)
  raise ArgumentError, 'core size must not be negative' if core_size < 0

  @maximum_size = core_size if @maximum_size < core_size

  delta = core_size - @core_size
  @core_size = core_size

  if current_control.worker_count > core_size
    interrupt_idle_workers
  elsif delta > 0
    # Expand the core size to meet the needs of the work backlog
    k = [delta, backlog].min
    while k > 0 && add_worker(false)
      k -= 1
      break if @queue.empty?
    end
  end
end
@api public @return [Integer]
# File lib/contender/pool/pool_executor.rb, line 280
#
# Reports the number of workers, or zero once the pool state is past STOP.
#
# @return [Integer]
def current_size
  synchronize do
    current_control.state > STOP ? 0 : @workers.size
  end
end
@api public @raise [TaskRejectionError] If task was rejected by the thread pool @param [Object] task @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 78
#
# Submits a task, given either as an object or as a block. The task is handed
# to a new core worker when possible, otherwise offered to the queue while the
# pool is RUNNING, otherwise handed to a new non-core worker; if none of these
# succeed it is passed to reject.
#
# @raise [ArgumentError] If neither a task nor a block is given
# @raise [TaskRejectionError] If task was rejected by the thread pool
# @param [Object] task
# @return [undefined]
def execute(task = nil, &block)
  task ||= block
  raise ArgumentError, 'a task or a block must be given' unless task

  return if try_core_worker task

  # Core worker couldn't be started with the task; enqueue it instead
  if current_control.state == RUNNING && @queue.offer(task)
    after_task_enqueue task
    return
  end

  # Either the pool is shutting down or the queue is full
  reject task unless add_worker(false, task)
end
@api public @param [Integer] maximum_size
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 215
#
# Changes the maximum size and interrupts idle workers when the current worker
# count exceeds the new maximum.
#
# @raise [ArgumentError] If the maximum size is less than one or less than the core size
# @param [Integer] maximum_size
# @return [undefined]
def maximum_size=(maximum_size)
  if maximum_size <= 0 || maximum_size < @core_size
    raise ArgumentError, 'maximum size must be at least one and not less than the core size'
  end

  @maximum_size = maximum_size
  interrupt_idle_workers if current_control.worker_count > maximum_size
end
@api public @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 166
#
# Tries to add one core worker when the worker count is below the core size.
#
# @return [Boolean] false when the pool already has its core workers,
#   otherwise the result of add_worker
def prestart
  return false unless current_control.worker_count < @core_size

  add_worker true
end
@api public @return [Integer]
# File lib/contender/pool/pool_executor.rb, line 172
#
# Keeps adding core workers until add_worker refuses.
#
# @return [Integer] Number of workers that were added
def prestart!
  started = 0
  started += 1 while add_worker(true)
  started
end
@api public @param [Object] task @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 262
#
# Deletes the task from the work queue, then gives the pool a chance to
# terminate.
#
# @param [Object] task
# @return [Boolean] Whatever the queue's delete returned
def remove(task)
  @queue.delete(task).tap { try_terminate }
end
@api public @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 103
#
# Advances the pool to SHUTDOWN, interrupts idle workers and runs the
# on_shutdown hook while holding the monitor, then attempts termination.
#
# @return [undefined]
def shutdown
  synchronize do
    advance_state_to(SHUTDOWN)
    interrupt_idle_workers
    on_shutdown
  end

  try_terminate
end
@api public @return [Array]
# File lib/contender/pool/pool_executor.rb, line 115
#
# Advances the pool to STOP, interrupts every worker and drains the work
# queue while holding the monitor, then attempts termination.
#
# @return [Array] The tasks drained from the queue
def shutdown!
  pending = []

  synchronize do
    advance_state_to(STOP)
    interrupt_workers
    @queue.drain_to(pending)
  end

  try_terminate
  pending
end
@api public @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 143
#
# @return [Boolean] true when the pool state is anything other than RUNNING
def shutdown?
  state = current_control.state
  state != RUNNING
end
@api public @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 156
#
# @return [Boolean] true when the pool state is TERMINATED
def terminated?
  state = current_control.state
  state == TERMINATED
end
@api public @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 149
#
# @return [Boolean] true when the pool state lies strictly between RUNNING
#   and TERMINATED
def terminating?
  current = current_control.state
  current > RUNNING && current < TERMINATED
end
@api public @return [String]
# File lib/contender/pool/pool_executor.rb, line 321
#
# Summarizes the pool: a state label followed by the pool size, active thread
# count, queued task count and completed task count.
#
# @return [String]
def to_s
  state = current_control.state
  state_text =
    if state == RUNNING
      "Running"
    elsif state == TERMINATED
      "Terminated"
    elsif state > RUNNING && state < TERMINATED
      "Shutting down"
    end

  "{#{state_text}, pool size = #{current_size}" \
    ", active threads = #{active_count}, queued tasks = #{backlog}" \
    ", completed tasks = #{completed_tasks}}"
end
@api public @return [Integer]
# File lib/contender/pool/pool_executor.rb, line 299
#
# @return [Integer] Sum of the completed task count and the queue backlog
def total_tasks
  finished = completed_tasks
  finished + backlog
end
@api public @param [Float] work_timeout
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 243
#
# Changes the work timeout. Idle workers are interrupted when the timeout
# becomes shorter than it was.
#
# @raise [ArgumentError] If the work timeout is less than zero
# @raise [ArgumentError] If the work timeout is zero while core timeout is allowed
# @param [Float] work_timeout
# @return [undefined]
def work_timeout=(work_timeout)
  raise ArgumentError, 'work timeout must not be negative' if work_timeout < 0
  if work_timeout == 0 && @allow_core_timeout
    raise ArgumentError, 'work timeout must be positive while core workers may time out'
  end

  delta = work_timeout - @work_timeout
  @work_timeout = work_timeout

  interrupt_idle_workers if delta < 0
end
Protected Instance Methods
@param [Object] task @param [Exception] exception @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 346
#
# No-op hook. run_worker calls it once a task has finished, passing the
# exception the task raised or nil.
#
# @param [Object] task
# @param [Exception] exception
# @return [undefined]
def after_execution(task, exception)
end
@param [Object] task @param [Thread] thread @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 341
#
# No-op hook. run_worker calls it with the task and the worker's thread just
# before the task is called.
#
# @param [Object] task
# @param [Thread] thread
# @return [undefined]
def before_execution(task, thread)
end
@return [PoolControl]
# File lib/contender/pool/pool_executor.rb, line 384
#
# Reads the current value held by the @control reference.
#
# @return [PoolControl]
def current_control
  @control.value
end
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 349
#
# No-op hook. shutdown calls it while holding the monitor, after idle workers
# have been interrupted.
#
# @return [undefined]
def on_shutdown
end
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 352
#
# No-op hook. terminate calls it just before the state becomes TERMINATED.
#
# @return [undefined]
def on_termination
end
@param [Object] task @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 356
#
# Hands the task, together with this executor, to the rejection policy.
#
# @param [Object] task
# @return [undefined]
def reject(task)
  @rejection_policy.on_rejection(task, self)
end
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 361
#
# Moves the pool to TIDYING and calls terminate when it is eligible: the state
# is SHUTDOWN with an empty queue, or STOP, and no workers remain. When workers
# remain, idle workers are interrupted with the single flag set and the
# attempt is abandoned.
#
# @return [undefined]
def try_terminate
  loop do
    control = current_control
    state = control.state

    return if state == RUNNING || state > STOP
    return if state == SHUTDOWN && @queue.size > 0

    if control.worker_count > 0
      # Eligible to terminate, but there are still workers left
      interrupt_idle_workers true
      return
    end

    # Retry from the top when another thread changed the control value first
    break if @control.compare_and_swap(control, control_for(TIDYING, 0))
  end

  terminate
end
Private Instance Methods
@param [Boolean] core @param [Object] first_task @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 499
#
# Reserves a worker slot through add_worker?, then registers and starts a new
# PoolWorker. The slot is released again when the pool state, re-checked under
# the monitor, no longer admits the worker.
#
# @param [Boolean] core
# @param [Object] first_task
# @return [Boolean] Whether a worker was started
def add_worker(core, first_task = nil)
  return false unless add_worker?(core, first_task)

  worker = PoolWorker.new(thread_factory, first_task)

  synchronize do
    state = current_control.state

    unless state < SHUTDOWN || (state == SHUTDOWN && first_task.nil?)
      # Pool state changed after the slot was reserved; give the slot back
      decrement_worker_count
      try_terminate
      return false
    end

    @workers.add worker
    @largest_size = [@largest_size, @workers.size].max
  end

  worker.start { run_worker worker }
  true
end
# File lib/contender/pool/pool_executor.rb, line 530
#
# Decides whether a worker may be added and, if so, reserves its slot by
# atomically incrementing the worker count.
#
# @param [Boolean] core Whether the limit is the core size (true) or the
#   maximum size (false)
# @param [Object] first_task
# @return [Boolean] true when the worker count was incremented
def add_worker?(core, first_task)
  loop do
    control = current_control
    state = control.state

    if state == SHUTDOWN
      # While shutting down, a worker is only added to help drain queued work;
      # a call that carries a new task is refused here instead of reserving a
      # slot that add_worker would have to release again.
      return false unless first_task.nil? && @queue.size > 0
    elsif state > SHUTDOWN
      return false
    end

    loop do
      limit = core ? @core_size : @maximum_size
      return false unless control.worker_count < limit

      # CAS was successful, worker can be started
      return true if compare_and_increment_worker_count control

      control = current_control
      # Pool state changed, retry the entire loop
      break unless control.state == state
      # Worker count changed, retry inner loop
    end
  end
end
@param [Integer] state @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 405
#
# Raises the pool state to the given state, keeping the worker count, unless
# the state is already at or beyond it. Retries until the swap succeeds.
#
# @param [Integer] state
# @return [undefined]
def advance_state_to(state)
  loop do
    current = current_control
    break if current.state >= state
    break if @control.compare_and_swap(current, control_for(state, current.worker_count))
  end
end
@param [Object] task @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 484
#
# Re-checks the pool after a task has been queued.
#
# @param [Object] task
# @return [undefined]
def after_task_enqueue(task)
  control = current_control

  if control.state != RUNNING && remove(task)
    # The pool state changed after the task got enqueued, but the task was able to be
    # removed from the work queue before it could be processed
    reject(task)
  elsif control.worker_count == 0
    # There are no workers to process the task, add one
    add_worker(false)
  end
end
@param [PoolControl] exp @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 421
#
# Attempts to swap the expected control value for one with the same state and
# a worker count that is one lower.
#
# @param [PoolControl] exp
# @return [Boolean] Result of the compare-and-swap
def compare_and_decrement_worker_count(exp)
  replacement = control_for(exp.state, exp.worker_count - 1)
  @control.compare_and_swap(exp, replacement)
end
@param [PoolControl] exp @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 415
#
# Attempts to swap the expected control value for one with the same state and
# a worker count that is one higher.
#
# @param [PoolControl] exp
# @return [Boolean] Result of the compare-and-swap
def compare_and_increment_worker_count(exp)
  replacement = control_for(exp.state, exp.worker_count + 1)
  @control.compare_and_swap(exp, replacement)
end
@param [Integer] state @param [Integer] worker_count @return [PoolControl]
# File lib/contender/pool/pool_executor.rb, line 399
#
# Builds a PoolControl from a state and a worker count.
#
# @param [Integer] state
# @param [Integer] worker_count
# @return [PoolControl]
def control_for(state, worker_count)
  PoolControl.new(state, worker_count)
end
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 426
#
# Lowers the worker count by one, retrying until the compare-and-swap succeeds.
#
# @return [undefined]
def decrement_worker_count
  until compare_and_decrement_worker_count(current_control)
    # The swap failed; try again against a freshly read control value
  end
end
@param [Boolean] single @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 449
#
# Interrupts every worker whose lock can be acquired with try_lock. With
# single set, iteration stops after the first worker has been examined,
# whether or not it could be locked.
#
# @param [Boolean] single
# @return [undefined]
def interrupt_idle_workers(single = false)
  synchronize do
    @workers.each do |worker|
      if worker.try_lock
        begin
          worker.interrupt
        ensure
          # Release the lock even when the interrupt raises, so it is not leaked
          worker.unlock
        end
      end

      break if single
    end
  end
end
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 439
#
# Interrupts every worker, under the monitor, without trying its lock.
#
# @return [undefined]
def interrupt_workers
  synchronize { @workers.each(&:interrupt) }
end
@return [Object]
# File lib/contender/pool/pool_executor.rb, line 599 def next_task timed_out = false loop do failed = false control = current_control state = control.state if state > SHUTDOWN || (state == SHUTDOWN && @queue.empty?) decrement_worker_count return end timed = false loop do worker_count = control.worker_count timed = @allow_core_timeout || worker_count > @core_size if worker_count <= @maximum_size break unless timed && timed_out end # Pool has either gone over its maximum size OR the worker thread has timed out # Now attempt to just quietly remove this worker return if compare_and_decrement_worker_count control # The compare and swap of the worker count failed unless current_control.state == state # The compare and swap operation failed because the pool state changed # Start completely over so that the pool state is checked failed = true break end # The compare and swap operation failed because the worker count changed # Start this inner loop over and ensure that this worker is still eligible for culling control = current_control end next if failed begin if timed task = @queue.poll @work_timeout else task = @queue.take end return task if task timed_out = true rescue Interrupt # Something interrupted the worker, this may signal the need to reduce the size of the # pool or prepare for termination timed_out = false end end end
@param [PoolWorker] worker @param [Boolean] clean_exit @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 664
#
# Performs the bookkeeping for a worker that is exiting: folds its completed
# task count into the pool total, removes it from the worker set, attempts
# termination and, while the pool state is below STOP, replaces the worker
# when needed.
#
# @param [PoolWorker] worker
# @param [Boolean] clean_exit
# @return [undefined]
def process_worker_exit(worker, clean_exit)
  # If worker did not exit cleanly, then the worker count was not adjusted
  decrement_worker_count unless clean_exit

  synchronize do
    @completed_task_count += worker.completed_task_count
    @workers.delete worker
  end

  try_terminate

  control = current_control
  return unless control.state < STOP

  if clean_exit
    # Some action caused the pool size to change
    minimum = @allow_core_timeout ? 0 : @core_size
    minimum = 1 if minimum == 0 && @queue.size > 0

    # No need to replace the dying worker with a new thread
    return unless control.worker_count < minimum
  end

  # Dying worker will be replaced with a new one
  add_worker false
end
@param [PoolWorker] worker @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 563 def run_worker(worker) task = worker.first_task! clean_exit = false loop do task ||= next_task break unless task worker.lock begin before_execution task, worker.thread exception = nil begin task.call rescue Exception exception = $! raise ensure after_execution task, exception end ensure task = nil worker.on_task_completion worker.unlock end end clean_exit = true ensure process_worker_exit worker, clean_exit end
@yield @return [undefined]
# File lib/contender/pool/pool_executor.rb, line 434
#
# Runs the block while holding the pool monitor.
#
# @yield
# @return [undefined]
def synchronize(&block)
  @monitor.synchronize(&block)
end
@return [undefined]
# File lib/contender/pool/pool_executor.rb, line 463
#
# Runs the on_termination hook, then marks the pool TERMINATED with zero
# workers and wakes everything waiting on the termination condition. The state
# change and broadcast happen even when the hook raises.
#
# @return [undefined]
def terminate
  synchronize do
    begin
      on_termination
    ensure
      terminated = control_for(TERMINATED, 0)
      @control.value = terminated
      @termination.broadcast
    end
  end
end
@param [Object] first_task @return [Boolean]
# File lib/contender/pool/pool_executor.rb, line 476
#
# Tries to start a core worker with the given first task when the worker
# count is below the core size.
#
# @param [Object] first_task
# @return [Boolean] Result of add_worker, or nil when the core size has been
#   reached
def try_core_worker(first_task)
  return unless current_control.worker_count < @core_size

  add_worker(true, first_task)
end