class ActionPubsub::Balancer

Public Class Methods

new() click to toggle source
# File lib/action_pubsub/balancer.rb, line 4
def initialize
  @receivers = []
  @buffer    = []
end

Public Instance Methods

dead_letter_routing() click to toggle source
# File lib/action_pubsub/balancer.rb, line 33
def dead_letter_routing
  ::ActionPubsub.silent_dead_letter_handler
end
distribute() click to toggle source
# File lib/action_pubsub/balancer.rb, line 27
def distribute
  while !@receivers.empty? && !@buffer.empty?
    redirect @receivers.shift, @buffer.shift
  end
end
on_message(message) click to toggle source
# File lib/action_pubsub/balancer.rb, line 9
def on_message(message)
  case message
  when :subscribe
    @receivers << envelope.sender
    distribute
    true
  when :unsubscribe
    @receivers.delete envelope.sender
    true
  when :subscribed?
    @receivers.include? envelope.sender
  else
    @buffer << envelope
    distribute
    ::Concurrent::Actor::Behaviour::MESSAGE_PROCESSED
  end
end