class ActionPubsub::Balancer
Public Class Methods
new()
click to toggle source
# File lib/action_pubsub/balancer.rb, line 4
# Sets up the balancer's state: both the list of subscribed receivers and
# the buffer of pending envelopes start out as separate, empty arrays.
def initialize
  @receivers, @buffer = [], []
end
Public Instance Methods
dead_letter_routing()
click to toggle source
# File lib/action_pubsub/balancer.rb, line 33
# Returns the object produced by ::ActionPubsub.silent_dead_letter_handler.
# NOTE(review): presumably this overrides the actor context's default
# dead-letter routing — confirm against the superclass.
def dead_letter_routing
  handler = ::ActionPubsub.silent_dead_letter_handler
  handler
end
distribute()
click to toggle source
# File lib/action_pubsub/balancer.rb, line 27
# Pairs waiting receivers with buffered envelopes, oldest first.
#
# Keeps going until either list runs dry; each pairing removes the front
# element of both @receivers and @buffer and hands them to +redirect+
# (defined outside this view). Returns nil.
def distribute
  until @receivers.empty? || @buffer.empty?
    receiver = @receivers.shift
    pending = @buffer.shift
    redirect(receiver, pending)
  end
end
on_message(message)
click to toggle source
# File lib/action_pubsub/balancer.rb, line 9
# Handles one incoming message.
#
# Control messages act on the sender of the current envelope:
#   :subscribe   - appends the sender to @receivers, runs +distribute+,
#                  returns true.
#   :unsubscribe - removes the sender from @receivers, returns true.
#   :subscribed? - returns whether the sender is currently in @receivers.
#
# Any other message is treated as work: the current envelope is appended to
# @buffer, +distribute+ is run, and
# ::Concurrent::Actor::Behaviour::MESSAGE_PROCESSED is returned.
#
# +envelope+ is not defined in this view.
# NOTE(review): presumably provided by the actor context superclass — confirm.
def on_message(message)
  case message
  when :subscribe
    @receivers.push(envelope.sender)
    distribute
    true
  when :unsubscribe
    @receivers.delete(envelope.sender)
    true
  when :subscribed?
    @receivers.include?(envelope.sender)
  else
    # Not a control message: queue the whole envelope for a receiver.
    @buffer.push(envelope)
    distribute
    ::Concurrent::Actor::Behaviour::MESSAGE_PROCESSED
  end
end