class RactorPool::Channel

Public Class Methods

new() click to toggle source
# File lib/ractor_pool/channel.rb, line 4
def initialize # rubocop:disable Metrics/MethodLength
  @pipe = Ractor.new do
    listeners_count = 0
    loop do
      msg = Ractor.recv
      case msg
      in type: :close
        listeners_count.times { Ractor.yield(msg) }
        break
      in type: :subscription
        listeners_count += 1
        Ractor.yield({ type: :skip })
      in type: :data
        Ractor.yield({ type: :data, data: msg[:data] })
      else
      end
    end
  end
end

Public Instance Methods

<<(data) click to toggle source
# File lib/ractor_pool/channel.rb, line 24
def <<(data)
  @pipe.send({ type: :data, data: data })
end
close!() click to toggle source
# File lib/ractor_pool/channel.rb, line 28
def close!
  @pipe.send({ type: :close }, move: true)
end
subscribe() { |msg| ... } click to toggle source
# File lib/ractor_pool/channel.rb, line 32
def subscribe
  @pipe.send({ type: :subscription }, move: true)
  loop do
    msg = @pipe.take
    case msg
      in type: :close
      return
      in type: :data
      yield(msg[:data])
      else
    end
  end
rescue Ractor::ClosedError # rubocop:disable Lint/SuppressedException
end