class RactorPool::Pool
Public Class Methods
new(jobs:, mapper_class:, reducer_class: RactorPool::Reducers::CollectReducer)
click to toggle source
# File lib/ractor_pool/pool.rb, line 4
#
# Stores the pool configuration and creates a logger that writes to $stdout.
# Nothing is started here.
#
# @param jobs [Integer] upper bound of the 1..jobs range used to build workers
# @param mapper_class [#call] object invoked by each worker for every job
# @param reducer_class [Class] instantiated with `logger:` to fold results
def initialize(jobs:, mapper_class:, reducer_class: RactorPool::Reducers::CollectReducer)
  @jobs, @mapper_class, @reducer_class = jobs, mapper_class, reducer_class
  @logger = Logger.new($stdout)
end
Public Instance Methods
schedule(*args, **params)
click to toggle source
# File lib/ractor_pool/pool.rb, line 17
#
# Pushes one job message onto the jobs pipe.
#
# @param args [Array] positional arguments carried in the message
# @param params [Hash] keyword arguments carried in the message
# @return [self] so calls can be chained
def schedule(*args, **params)
  message = { type: :job, args: args, params: params }
  jobs_pipe << message
  self
end
start()
click to toggle source
# File lib/ractor_pool/pool.rb, line 11
#
# Forces creation of the memoized reducer and then the workers.
#
# @return [self] so calls can be chained
def start
  tap do
    reducer
    workers
  end
end
stop()
click to toggle source
# File lib/ractor_pool/pool.rb, line 22
#
# Shuts the pool down and returns the reducer's value.
#
# Order matters: the jobs pipe is closed first, every worker is taken from,
# and only then is the results pipe closed and the reducer taken from.
# NOTE(review): presumably close! is what lets the subscribers finish — confirm
# against RactorPool::Channel.
#
# All memoized state is cleared afterwards so the accessors build fresh objects
# on next use.
#
# @return [Object] whatever the reducer hands back from `take`
def stop
  jobs_pipe.close!
  workers.each(&:take)
  results_pipe.close!
  result = reducer.take

  # The original reset :@resuts_pipe (typo), leaving the closed results
  # channel memoized; clear the correctly named variables directly.
  @jobs_pipe = nil
  @results_pipe = nil
  @workers = nil
  @reducer = nil

  result
end
Private Instance Methods
jobs_pipe()
click to toggle source
# File lib/ractor_pool/pool.rb, line 36
#
# Memoized channel that carries job messages.
#
# @return [RactorPool::Channel] created on first use
def jobs_pipe
  @jobs_pipe = RactorPool::Channel.new unless @jobs_pipe
  @jobs_pipe
end
reducer()
click to toggle source
# File lib/ractor_pool/pool.rb, line 64
#
# Memoized Ractor that folds results.
#
# The Ractor instantiates the reducer class with the logger, passes every
# message it receives from the results pipe to `call` as keyword arguments,
# and yields the value of the last `call` (nil if no message arrived) once
# `subscribe` returns.
#
# @return [Ractor] created on first use
def reducer
  return @reducer if @reducer

  @reducer = Ractor.new(results_pipe, @logger, @reducer_class) do |pipe, log, klass|
    log.debug('Reducer: started')
    accumulator = klass.new(logger: log)
    outcome = nil

    pipe.subscribe do |data|
      log.debug("Reducer: received data: #{data}")
      outcome = accumulator.call(**data)
    end

    Ractor.yield(outcome)
  end
end
results_pipe()
click to toggle source
# File lib/ractor_pool/pool.rb, line 40
#
# Memoized channel that carries results.
#
# @return [RactorPool::Channel] created on first use
def results_pipe
  @results_pipe = RactorPool::Channel.new unless @results_pipe
  @results_pipe
end
workers()
click to toggle source
# File lib/ractor_pool/pool.rb, line 44
#
# Memoized list of worker Ractors, one per id in 1..@jobs.
#
# Each worker subscribes to the jobs pipe. A message carrying both :args and
# :params is run through `mapper_class.call(logger, *args, **params)` and the
# outcome is pushed onto the results pipe together with the worker id and the
# original arguments. Any other message is only logged.
#
# @return [Array<Ractor>] created on first use
def workers # rubocop:disable Metrics/MethodLength
  @workers ||= (1..@jobs).map do |id|
    # Everything the worker needs is passed in explicitly and re-bound as block
    # parameters; the block does not read locals from the enclosing method.
    Ractor.new(id, jobs_pipe, results_pipe, @mapper_class, @logger) do |worker_id, jobs_pipe, results_pipe, mapper_class, logger|
      logger.debug("Worker #{worker_id}: started")

      jobs_pipe.subscribe do |data|
        logger.debug("Worker #{worker_id}: received data: #{data}")

        case data
        in args: args, params: params
          logger.debug("Worker #{worker_id}: running #{mapper_class}.call(*#{args}, **#{params}))")
          results_pipe << {
            type: :result,
            worker_id: worker_id,
            args: args,
            params: params,
            result: mapper_class.call(logger, *args, **params)
          }
        else
          # The original interpolated an undefined `msg` here, which raised
          # NameError inside the worker; log the received payload instead.
          logger.debug("Worker #{worker_id}: Unknown data received: #{data}")
        end
      end
    end
  end
end