class Polyphony::ThreadPool

Implements a pool of worker threads that run blocks taken from a shared task queue.

Attributes

size[R]

Public Class Methods

new(size = Etc.nprocessors) click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 22
# Builds a thread pool and starts its worker threads.
#
# The task queue is assigned before any thread is spawned, since each worker
# starts executing `thread_loop` (which reads from the queue) right away.
#
# @param size [Integer] number of worker threads to start; defaults to
#   `Etc.nprocessors`
def initialize(size = Etc.nprocessors)
  @size = size
  @task_queue = Polyphony::Queue.new
  @threads = 1.upto(@size).map do
    Thread.new { thread_loop }
  end
end
process(&block) click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 10
# Runs the given block on the shared default pool.
#
# The default pool is created with `new` the first time this is called and
# is then memoized in `@default_pool` for subsequent calls.
#
# @yield the work to hand over to the default pool
# @return whatever the pool instance's `process` returns
def self.process(&block)
  (@default_pool ||= new).process(&block)
end
reset() click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 15
# Stops and forgets the shared default pool, if one has been created.
#
# Does nothing when no default pool exists. The reference is cleared only
# after `stop` has returned.
#
# @return [nil]
def self.reset
  if (pool = @default_pool)
    pool.stop
    @default_pool = nil
  end
end

Public Instance Methods

busy?() click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 43
# Reports whether there are tasks waiting in the queue.
#
# Only the queue is inspected; this method does not look at the worker
# threads themselves.
#
# @return [Boolean] false when the task queue is empty, true otherwise
def busy?
  return false if @task_queue.empty?

  true
end
cast(&block) click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 36
# Queues the given block for execution without waiting for it to finish.
#
# The block is enqueued with no watcher, so this method returns immediately
# and the caller receives no result from the block.
#
# NOTE(review): `setup` is not defined in the code visible here; confirm it
# exists for the case where `@task_queue` is unset.
#
# @yield the work to run on a pool thread
# @return [self] the receiver, so calls can be chained
def cast(&block)
  setup unless @task_queue

  # A nil second element marks the entry as having nobody to notify.
  entry = [block, nil]
  @task_queue << entry
  self
end
process(&block) click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 28
# Queues the given block for execution and waits on a watcher for it.
#
# A watcher is obtained from the current fiber, queued together with the
# block, and then awaited.
#
# NOTE(review): `setup` is not defined in the code visible here; confirm it
# exists for the case where `@task_queue` is unset.
#
# @yield the work to run on a pool thread
# @return the value of the watcher's `await` — presumably the block's result
#   as signalled by the worker; confirm against the watcher implementation
def process(&block)
  setup unless @task_queue

  completion = Fiber.current.auto_watcher
  @task_queue << [block, completion]
  completion.await
end
run_queued_task() click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 53
# Takes one entry off the task queue, runs its block and reports the outcome.
#
# When the entry has a watcher, the watcher is signalled with the block's
# return value, or with the exception object if anything was raised. When
# there is no watcher, an exception is re-raised to the caller.
#
# @return the watcher's `signal` result, or nil when the entry has no watcher
# @raise [Exception] whatever was raised while no watcher was available
def run_queued_task
  task, notifier = @task_queue.shift
  outcome = task.call
  notifier&.signal(outcome)
rescue Exception => err
  # Every exception class is caught on purpose: when a watcher exists it is
  # the only channel through which the failure is reported.
  raise err unless notifier

  notifier.signal(err)
end
stop() click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 61
# Terminates every worker thread in the pool.
#
# All threads are killed in a first pass and joined in a second one, so no
# join is started before every thread has been asked to terminate.
#
# @return [Array<Thread>] the pool's thread list
def stop
  @threads.each { |worker| worker.kill }
  @threads.each { |worker| worker.join }
end
thread_loop() click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 47
# Body of each worker thread: runs queued tasks one after another, forever.
#
# The method only ends when something raised or thrown from
# `run_queued_task` escapes it, or when the thread is terminated.
def thread_loop
  # `while true` rather than Kernel#loop: `loop` would rescue a StopIteration
  # escaping from a task and end the worker silently.
  run_queued_task while true
end