class RactorPool::Pool

Public Class Methods

new(jobs:, mapper_class:, reducer_class: RactorPool::Reducers::CollectReducer) click to toggle source
# File lib/ractor_pool/pool.rb, line 4
def initialize(jobs:, mapper_class:, reducer_class: RactorPool::Reducers::CollectReducer)
  @jobs = jobs
  @logger = Logger.new($stdout)
  @mapper_class = mapper_class
  @reducer_class = reducer_class
end

Public Instance Methods

schedule(*args, **params) click to toggle source
# File lib/ractor_pool/pool.rb, line 17
def schedule(*args, **params)
  jobs_pipe << { type: :job, args: args, params: params }
  self
end
start() click to toggle source
# File lib/ractor_pool/pool.rb, line 11
def start
  reducer
  workers
  self
end
stop() click to toggle source
# File lib/ractor_pool/pool.rb, line 22
def stop
  jobs_pipe.close!
  workers.each(&:take)
  results_pipe.close!
  result = reducer.take
  instance_variable_set(:@jobs_pipe, nil)
  instance_variable_set(:@resuts_pipe, nil)
  instance_variable_set(:@workers, nil)
  instance_variable_set(:@reducer, nil)
  result
end

Private Instance Methods

jobs_pipe() click to toggle source
# File lib/ractor_pool/pool.rb, line 36
def jobs_pipe
  @jobs_pipe ||= RactorPool::Channel.new
end
reducer() click to toggle source
# File lib/ractor_pool/pool.rb, line 64
def reducer
  @reducer ||= Ractor.new(results_pipe, @logger, @reducer_class) do |results_pipe, logger, reducer_class|
    logger.debug('Reducer: started')

    reducer = reducer_class.new(logger: logger)
    result = nil
    results_pipe.subscribe do |data|
      logger.debug("Reducer: received data: #{data}")
      result = reducer.call(**data)
    end
    Ractor.yield(result)
  end
end
results_pipe() click to toggle source
# File lib/ractor_pool/pool.rb, line 40
def results_pipe
  @results_pipe ||= RactorPool::Channel.new
end
workers() click to toggle source
# File lib/ractor_pool/pool.rb, line 44
def workers # rubocop:disable Metrics/MethodLength
  @workers ||= (1..@jobs).map do |worker_id|
    Ractor.new(worker_id, jobs_pipe, results_pipe, @mapper_class,
               @logger) do |worker_id, jobs_pipe, results_pipe, mapper_class, logger|
      logger.debug("Worker #{worker_id}: started")
      jobs_pipe.subscribe do |data|
        logger.debug("Worker #{worker_id}: received data: #{data}")
        case data
        in args: args, params: params
          logger.debug("Worker #{worker_id}: running #{mapper_class}.call(*#{args}, **#{params}))")
          results_pipe << { type: :result, worker_id: worker_id, args: args, params: params,
                            result: mapper_class.call(logger, *args, **params) }
        else
          logger.debug("Worker #{worker_id}: Unknown data received: #{msg}")
        end
      end
    end
  end
end