class RactorPool::Channel
Public Class Methods
new()
click to toggle source
# File lib/ractor_pool/channel.rb, line 4
#
# Spawns the pipe Ractor that sits between writers and subscribers and stores
# it in @pipe.
#
# The pipe loops over its inbox and reacts to hash messages by their :type:
#
# * <tt>:subscription</tt> - bumps the subscriber counter and yields
#   <tt>{ type: :skip }</tt>.
# * <tt>:data</tt> - yields <tt>{ type: :data, data: ... }</tt> built from the
#   received message's :data value.
# * <tt>:close</tt> - yields the close message once per counted subscriber and
#   then leaves the loop, which ends the Ractor.
#
# Anything else is ignored.
def initialize # rubocop:disable Metrics/MethodLength
  @pipe = Ractor.new do
    subscribers = 0

    loop do
      case (message = Ractor.recv)
      in { type: :close }
        # One close notification per subscription seen so far, then stop.
        subscribers.times { Ractor.yield(message) }
        break
      in { type: :subscription }
        subscribers += 1
        Ractor.yield({ type: :skip })
      in { type: :data }
        Ractor.yield({ type: :data, data: message[:data] })
      else
        # The empty branch keeps an unrecognised message from raising
        # NoMatchingPatternError; such messages are simply dropped.
      end
    end
  end
end
Public Instance Methods
<<(data)
click to toggle source
# File lib/ractor_pool/channel.rb, line 24
#
# Publishes +data+ by sending a <tt>{ type: :data, data: data }</tt> message
# to the pipe.
#
# Returns +self+ so that pushes can be chained (<tt>channel << a << b</tt>).
# Returning the result of <tt>@pipe.send</tt> instead would hand the caller the
# internal pipe Ractor; because Ractor#<< is an alias of Ractor#send, a chained
# push would then post the raw, unwrapped value straight to the pipe, where it
# matches no message type and is silently dropped.
#
# Any error raised by <tt>@pipe.send</tt> propagates to the caller.
def <<(data)
  @pipe.send({ type: :data, data: data })
  self
end
close!()
click to toggle source
# File lib/ractor_pool/channel.rb, line 28
#
# Asks the pipe to shut down by sending it a <tt>{ type: :close }</tt> message
# (moved rather than copied).
#
# Closing is idempotent: if the pipe no longer accepts messages,
# Ractor::ClosedError is swallowed. Without this, a repeated close would raise
# only when the pipe Ractor had already finished, so the outcome would depend
# on timing. #subscribe treats the same error as "already closed" as well.
#
# Returns the result of <tt>@pipe.send</tt> on success, or +nil+ when the
# channel was already closed.
def close!
  @pipe.send({ type: :close }, move: true)
rescue Ractor::ClosedError # rubocop:disable Lint/SuppressedException
end
subscribe() { |msg| ... }
click to toggle source
# File lib/ractor_pool/channel.rb, line 32
#
# Registers the caller as a listener and then blocks, taking messages from the
# pipe and yielding the :data value of every <tt>:data</tt> message to the
# block.
#
# Returns +nil+ when a <tt>:close</tt> message is taken, or when
# Ractor::ClosedError is raised (the error is deliberately swallowed and
# treated as end of stream). Messages of any other type are ignored.
#
# Raises LocalJumpError when called without a block. The check happens before
# the subscription is sent: failing only at the first +yield+ would leave the
# pipe with a registered listener that no longer exists and would lose the
# message that had just been taken.
def subscribe
  raise LocalJumpError, 'no block given (yield)' unless block_given?

  @pipe.send({ type: :subscription }, move: true)

  loop do
    msg = @pipe.take
    case msg
    in { type: :close }
      return
    in { type: :data }
      yield(msg[:data])
    else
      # Not addressed to subscribers as data (for example { type: :skip });
      # the empty branch also prevents NoMatchingPatternError.
    end
  end
rescue Ractor::ClosedError # rubocop:disable Lint/SuppressedException
end