class Pubsubstub::Subscription
Attributes
Public Class Methods
Source
# File lib/pubsubstub/subscription.rb, line 7 def initialize(channels, connection) @id = Random.rand(2 ** 64) @connection = connection @channels = channels @queue = Queue.new end
Public Instance Methods
Source
# File lib/pubsubstub/subscription.rb, line 27 def push(event) queue.push(event) end
Source
# File lib/pubsubstub/subscription.rb, line 14 def stream(last_event_id) info { "Connecting client ##{id} (#{channels.map(&:name).join(', ')})" } subscribe fetch_scrollback(last_event_id) while event = queue.pop debug { "Sending event ##{event.id} to client ##{id}"} connection << event.to_message end ensure info { "Disconnecting client ##{id}" } unsubscribe end
Private Instance Methods
Source
# File lib/pubsubstub/subscription.rb, line 58 def callback @callback ||= method(:push) end
We use store the callback so that the object_id stays the same. Otherwise we wouldn’t be able to unsubscribe
Source
# File lib/pubsubstub/subscription.rb, line 42 def fetch_scrollback(last_event_id) event_sent = false if last_event_id channels.each do |channel| channel.scrollback(since: last_event_id).each do |event| event_sent = true queue.push(event) end end end queue.push(Pubsubstub.heartbeat_event) unless event_sent end
This method is not ideal as it doesn’t guarantee order in case of multi-channel subscription
Source
# File lib/pubsubstub/subscription.rb, line 33 def subscribe channels.each { |c| Pubsubstub.subscriber.add_event_listener(c.name, callback) } end
Source
# File lib/pubsubstub/subscription.rb, line 37 def unsubscribe channels.each { |c| Pubsubstub.subscriber.remove_event_listener(c.name, callback) } end