class Polyphony::ThreadPool
Implements a pool of worker threads that pull queued blocks off a shared task queue and run them.
Attributes
size[R]
Public Class Methods
new(size = Etc.nprocessors)
click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 22
# Creates a pool of +size+ worker threads sharing one task queue.
#
# @param size [Integer] number of worker threads to spawn; defaults to the
#   value reported by Etc.nprocessors
def initialize(size = Etc.nprocessors)
  @size = size
  @task_queue = Polyphony::Queue.new

  # One thread per slot; each worker runs #thread_loop.
  @threads = (1..@size).map do
    Thread.new { thread_loop }
  end
end
process(&block)
click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 10
# Runs the given block on the class-level default pool, creating that pool
# (with the default size) on first use.
#
# @return whatever the pool instance's #process returns for the block
def self.process(&block)
  pool = (@default_pool ||= new)
  pool.process(&block)
end
reset()
click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 15
# Stops the class-level default pool, if one was created, and forgets it so
# that the next call to .process builds a fresh pool.
#
# @return [nil]
def self.reset
  # Safe navigation makes this a no-op when no default pool exists.
  @default_pool&.stop
  @default_pool = nil
end
Public Instance Methods
busy?()
click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 43
# Reports whether any tasks are still waiting in the queue. Only the queue is
# inspected; tasks already picked up by a worker are not counted.
#
# @return [Boolean] true when the task queue is non-empty
def busy?
  return false if @task_queue.empty?

  true
end
cast(&block)
click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 36
# Queues the given block for execution without waiting for its result
# (fire-and-forget): no watcher is attached to the task.
#
# @return [self] so calls can be chained
def cast(&block)
  @task_queue || setup

  # A nil watcher tells the worker there is nobody to notify.
  unwatched_task = [block, nil]
  @task_queue << unwatched_task
  self
end
process(&block)
click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 28
# Queues the given block together with a watcher obtained from the current
# fiber, then awaits that watcher.
# NOTE(review): presumably +await+ suspends the calling fiber until a worker
# signals the watcher with the block's result - confirm against the watcher
# implementation.
#
# @return the value returned by the watcher's +await+
def process(&block)
  @task_queue || setup

  Fiber.current.auto_watcher
       .tap { |watcher| @task_queue << [block, watcher] }
       .await
end
run_queued_task()
click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 53
# Takes the next [block, watcher] pair off the task queue, calls the block and
# signals the watcher (when there is one) with the block's result.
#
# Any exception raised along the way is signalled to the watcher instead of
# the result; when the task has no watcher the exception is re-raised.
def run_queued_task
  task, notifier = @task_queue.shift
  outcome = task.call
  notifier&.signal(outcome)
rescue Exception => error
  # Exception (not just StandardError) is rescued on purpose so that every
  # failure is handed to the watcher rather than lost in the worker.
  raise error unless notifier

  notifier.signal(error)
end
stop()
click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 61
# Shuts the pool down: kills every worker thread first, then joins each one
# so that all of them have finished by the time this method returns.
#
# @return [Array<Thread>] the pool's thread list
def stop
  # Array#each returns its receiver, so the join pass only starts after the
  # kill pass has covered every thread.
  @threads.each(&:kill).each(&:join)
end
thread_loop()
click to toggle source
# File lib/polyphony/core/thread_pool.rb, line 47
# Body of each worker thread: runs queued tasks one after another, forever.
# The loop has no exit condition of its own; it ends only when the thread is
# killed or an exception propagates out of #run_queued_task.
def thread_loop
  run_queued_task while true
end