class Ably::Realtime::Presence::MembersMap
A class encapsulating a map of the members of this presence channel, indexed by the unique {Ably::Models::PresenceMessage#member_key}
This map synchronises the membership of the presence set by handling SYNC messages from the service. Since sync messages can arrive out of order - e.g. a PRESENT sync event being received after that member has in fact left - this map keeps “witness” entries, with ABSENT action, to remember that a LEAVE event has been seen for a member. These entries are cleared once the last set of updates of a sync sequence has been received.
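The witness-entry idea described above can be illustrated with a small, self-contained sketch. The class `WitnessMapSketch` and its methods are hypothetical and only model the behaviour in simplified form (timestamps stand in for the library's full newness check); this is not the library's implementation.

```ruby
# Hypothetical, simplified model of "witness" entries kept during a SYNC.
class WitnessMapSketch
  def initialize
    @members = {} # member_key => { present: Boolean, timestamp: Integer }
    @sync_in_progress = false
  end

  def start_sync
    @sync_in_progress = true
  end

  # ENTER / PRESENT: ignored when an equal or newer event is already recorded
  def enter(key, timestamp)
    existing = @members[key]
    return if existing && existing[:timestamp] >= timestamp
    @members[key] = { present: true, timestamp: timestamp }
  end

  def leave(key, timestamp)
    existing = @members[key]
    return if existing && existing[:timestamp] >= timestamp
    if @sync_in_progress
      # Keep an absent witness so a late PRESENT cannot resurrect the member
      @members[key] = { present: false, timestamp: timestamp }
    else
      @members.delete(key)
    end
  end

  # Witness entries are only needed until the sync sequence ends
  def end_sync
    @members.delete_if { |_key, entry| !entry[:present] }
    @sync_in_progress = false
  end

  def present_keys
    @members.select { |_key, entry| entry[:present] }.keys
  end

  # Total number of entries, including absent witnesses
  def size
    @members.size
  end
end

map = WitnessMapSketch.new
map.start_sync
map.enter('conn1:alice', 100)
map.leave('conn1:alice', 200) # LEAVE arrives mid-sync, witness is kept
map.enter('conn1:alice', 150) # late PRESENT from the sync, older than the LEAVE
map.end_sync                  # witness entries are cleared
```

In this sketch the late PRESENT event is ignored because the witness entry carries a newer timestamp, so the member stays absent until the sync ends and the witness is dropped.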
@api private
Constants
- STATE
Attributes
Public Class Methods
Source
# File lib/ably/realtime/presence/members_map.rb, line 31
def initialize(presence)
  @presence = presence

  @state = STATE(:initialized)

  # Two sets of members maintained
  # @members contains all members present on the channel
  # @local_members contains only this connection's members for the purpose of re-entering the member if channel continuity is lost
  reset_members
  reset_local_members

  @absent_member_cleanup_queue = []

  # Each SYNC session has a unique ID so that following SYNC
  # any members present in the map without this session ID are
  # not present according to Ably, see #RTP19
  @sync_session_id = -1

  setup_event_handlers
end
Public Instance Methods
Source
# File lib/ably/realtime/presence/members_map.rb, line 140
def each(&block)
  return to_enum(:each) unless block_given?
  present_members.each(&block)
end
Method to allow {MembersMap} to be Enumerable (see ruby-doc.org/core-2.1.3/Enumerable.html).
@note This method does not wait for the sync operation to complete, so it may return an incomplete set of members. Use {MembersMap#get} instead.
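As a rough illustration of the pattern used by `each`, the following standalone sketch shows how defining `each` (with a `to_enum` fallback when no block is given) makes a class usable with Enumerable methods. `RosterSketch` is a hypothetical class, not part of the library.

```ruby
# Hypothetical class showing the each/to_enum pattern with Enumerable.
class RosterSketch
  include Enumerable

  def initialize(names)
    @names = names
  end

  # Yields each name, or returns an Enumerator when called without a block
  def each(&block)
    return to_enum(:each) unless block_given?
    @names.each(&block)
  end
end

roster = RosterSketch.new(%w[alice bob])
upcased = roster.map(&:upcase) # Enumerable#map is built on #each
first   = roster.each.next     # without a block, #each returns an Enumerator
```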
Source
# File lib/ably/realtime/presence/members_map.rb, line 155
def enter_local_members
  local_members.values.each do |member|
    logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{member.client_id} with data: #{member.data}" }
    presence.enter_client_with_id(member.id, member.client_id, member.data).tap do |deferrable|
      deferrable.errback do |error|
        re_enter_error = Ably::Models::ErrorInfo.new(
          message: "unable to automatically re-enter presence channel for client_id '#{member.client_id}'. Source error code #{error.code} and message '#{error.message}'",
          code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
        )
        channel.emit :update, Ably::Models::ChannelStateChange.new(
          current: channel.state,
          previous: channel.state,
          event: Ably::Realtime::Channel::EVENT(:update),
          reason: re_enter_error,
          resumed: true
        )
      end
    end
  end
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 83
def get(options = {}, &block)
  wait_for_sync = options.fetch(:wait_for_sync, true)
  deferrable    = Ably::Util::SafeDeferrable.new(logger)

  result_block = lambda do
    present_members.tap do |members|
      members.keep_if { |member| member.connection_id == options[:connection_id] } if options[:connection_id]
      members.keep_if { |member| member.client_id == options[:client_id] } if options[:client_id]
    end.tap do |members|
      safe_yield block, members if block_given?
      deferrable.succeed members
    end
  end

  if !wait_for_sync || sync_complete?
    result_block.call
  else
    # Must be defined before subsequent procs reference this callback
    reset_callbacks = nil

    sync_complete_callback = lambda do
      reset_callbacks.call if reset_callbacks
      result_block.call
    end

    sync_failed_callback = lambda do |error|
      reset_callbacks.call if reset_callbacks
      deferrable.fail error
    end

    reset_callbacks = lambda do
      off(&sync_complete_callback)
      off(&sync_failed_callback)
      channel.off(&sync_failed_callback)
    end

    unsafe_once(:sync_complete, &sync_complete_callback)
    unsafe_once(:failed, &sync_failed_callback)

    channel.unsafe_once(:detaching, :detached, :failed) do |error_reason|
      sync_failed_callback.call error_reason
    end
  end

  deferrable
end
Get the list of presence members
@param [Hash,String] options an options Hash to filter members
@option options [String] :client_id optional client_id filter for the member
@option options [String] :connection_id optional connection_id filter for the member
@option options [Boolean] :wait_for_sync defaults to true. If true, the get method waits for the initial presence sync following channel attachment to complete before returning the members present; otherwise it immediately returns the members currently present
@yield [Array<Ably::Models::PresenceMessage>] array of present members
@return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
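The `:client_id` and `:connection_id` filters documented above can be sketched in isolation. The `Member` struct and `filter_members` helper below are hypothetical stand-ins (not library API) that mirror the `keep_if` filtering in `get`, without the sync waiting or the deferrable.

```ruby
# Hypothetical stand-in for a presence member; only the two filter fields.
Member = Struct.new(:client_id, :connection_id)

# Applies the optional filters in the same order as the method above:
# connection_id first, then client_id. The input array is not mutated.
def filter_members(members, options = {})
  members.dup.tap do |list|
    list.keep_if { |m| m.connection_id == options[:connection_id] } if options[:connection_id]
    list.keep_if { |m| m.client_id == options[:client_id] } if options[:client_id]
  end
end

members = [
  Member.new('alice', 'conn1'),
  Member.new('bob',   'conn1'),
  Member.new('alice', 'conn2')
]

on_conn1    = filter_members(members, connection_id: 'conn1')
only_alice  = filter_members(members, client_id: 'alice')
alice_conn2 = filter_members(members, client_id: 'alice', connection_id: 'conn2')
```

When both options are given the filters combine, so only members matching both values remain.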
Source
# File lib/ably/realtime/presence/members_map.rb, line 132
def length
  present_members.length
end
@!attribute [r] length
@return [Integer] number of present members known at this point in time, will not wait for sync operation to complete
Source
# File lib/ably/realtime/presence/members_map.rb, line 151
def local_members
  @local_members
end
A copy of the local members present i.e. members entered from this connection and thus the responsibility of this library to re-enter on the channel automatically if the channel loses continuity
@return [Hash<String, PresenceMessage>]
@api private
Source
# File lib/ably/realtime/presence/members_map.rb, line 68
def sync_serial_cursor_at_end?
  sync_serial.nil? || sync_serial.to_s.match(/^[\w-]+:?$/)
end
Returns a truthy value when the channel serial in the SYNC ProtocolMessage is nil, or when the cursor is empty. The cursor is the part after the first ':' in a serial such as 'cf30e75054887:psl_7g:client:189'. Either case indicates that there are no more SYNC messages.
@api private
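To make the cursor check concrete, here is a standalone sketch that applies the same regular expression to a few serials. The constant `CURSOR_AT_END` and the helper `cursor_at_end?` are names introduced for illustration, and the helper coerces the result to a strict Boolean, which the method above does not do.

```ruby
# Same pattern as used in sync_serial_cursor_at_end?
CURSOR_AT_END = /^[\w-]+:?$/

# Illustrative helper: true when the serial is nil or has no cursor part
def cursor_at_end?(sync_serial)
  sync_serial.nil? || !!sync_serial.to_s.match(CURSOR_AT_END)
end

cursor_at_end?(nil)                               # no serial at all
cursor_at_end?('cf30e75054887:')                  # nothing after the ':'
cursor_at_end?('cf30e75054887')                   # no ':' at all
cursor_at_end?('cf30e75054887:psl_7g:client:189') # cursor still present
```

Only the last call returns false, because text follows the first ':' and so more SYNC messages are expected.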
Source
# File lib/ably/realtime/presence/members_map.rb, line 58
def update_sync_serial(serial)
  @sync_serial = serial
end
Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed. If the serial is nil, or the part after the first ':' is empty, then the SYNC is complete.
@return [void]
@api private
Private Instance Methods
Source
# File lib/ably/realtime/presence/members_map.rb, line 191
def absent_member_cleanup_queue
  @absent_member_cleanup_queue
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 368
def absent_members
  members.reject do |key, presence|
    presence.fetch(:present)
  end.map do |key, presence|
    presence.fetch(:message)
  end
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 317
def add_presence_member(presence_message)
  logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' for event '#{presence_message.action}' #{members.has_key?(presence_message.member_key) ? 'updated' : 'added'}.\n#{presence_message.to_json}" }
  # Mutate the PresenceMessage so that the action is :present, see #RTP2d
  present_presence_message = presence_message.shallow_clone(action: Ably::Models::PresenceMessage::ACTION.Present)
  member_set_upsert present_presence_message, true
  presence.emit_message presence_message.action, presence_message
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 203
def channel
  presence.channel
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 376
def clean_up_absent_members
  while member_to_remove = absent_member_cleanup_queue.shift
    logger.debug { "#{self.class.name}: Cleaning up absent member '#{member_to_remove.member_key}' after SYNC.\n#{member_to_remove.to_json}" }
    member_set_delete member_to_remove
  end
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 383
def clean_up_members_not_present_after_sync
  members.select do |member_key, member|
    member.fetch(:sync_session_id) != sync_session_id
  end.each do |member_key, member|
    presence_message = member.fetch(:message).shallow_clone(action: Ably::Models::PresenceMessage::ACTION.Leave, id: nil)
    logger.debug { "#{self.class.name}: Fabricating a LEAVE event for member '#{presence_message.member_key}' was not present in recently completed SYNC session ID '#{sync_session_id}'.\n#{presence_message.to_json}" }
    member_set_delete member.fetch(:message)
    presence.emit_message Ably::Models::PresenceMessage::ACTION.Leave, presence_message
  end
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 207
def client
  channel.client
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 215
def connection
  client.connection
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 269
def ensure_presence_message_is_valid(presence_message)
  return true if presence_message.connection_id

  error = Ably::Exceptions::ProtocolError.new("Protocol error, presence message is missing connectionId", 400, Ably::Exceptions::Codes::PROTOCOL_ERROR)
  logger.error { "PresenceMap: On channel '#{channel.name}' error: #{error}" }
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 211
def logger
  client.logger
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 352
def member_set_delete(presence_message)
  members.delete presence_message.member_key
  if sync_complete? and presence_message.connection_id == connection.id
    local_members.delete presence_message.client_id
    logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' deleted" }
  end
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 344
def member_set_upsert(presence_message, present)
  members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id }
  if presence_message.connection_id == connection.id
    local_members[presence_message.client_id] = presence_message
    logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' added" }
  end
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 179
def members
  @members
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 187
def presence
  @presence
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 360
def present_members
  members.select do |key, presence|
    presence.fetch(:present)
  end.map do |key, presence|
    presence.fetch(:message)
  end
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 325
def remove_presence_member(presence_message)
  logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_json}" }

  if sync_complete?
    member_set_delete presence_message
  else
    member_set_upsert presence_message, false
    absent_member_cleanup_queue << presence_message
  end

  presence.emit_message presence_message.action, presence_message
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 199
def reset_local_members
  @local_members = Hash.new
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 195
def reset_members
  @members = Hash.new
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 219
def setup_event_handlers
  presence.__incoming_msgbus__.subscribe(:presence, :sync) do |presence_message|
    presence_message.decode(client.encoders, channel.options) do |encode_error, error_message|
      client.logger.error error_message
    end
    update_members_and_emit_events presence_message
  end

  # RTP5a
  channel.unsafe_on(:failed, :detached) do
    reset_members
    reset_local_members
  end

  unsafe_on(:sync_starting) do
    @sync_session_id += 1
  end

  unsafe_on(:sync_none) do
    @sync_session_id += 1
    # Immediately change to finalizing which will result in all members being cleaned up
    change_state :finalizing_sync
  end

  unsafe_on(:finalizing_sync) do
    clean_up_absent_members
    clean_up_members_not_present_after_sync
    change_state :sync_complete
  end
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 284
def should_update_member?(new_message)
  if members[new_message.member_key]
    existing_message = members[new_message.member_key].fetch(:message)

    # If both are messages published by clients (not fabricated), use the ID to determine newness, see #RTP2b2
    if new_message.id.start_with?(new_message.connection_id) && existing_message.id.start_with?(existing_message.connection_id)
      new_message_parts = new_message.id.match(/(\d+):(\d+)$/)
      existing_message_parts = existing_message.id.match(/(\d+):(\d+)$/)

      if !new_message_parts || !existing_message_parts
        logger.fatal { "#{self.class.name}: Message IDs for new message #{new_message.id} or old message #{existing_message.id} are invalid. \nNew message: #{new_message.to_json}" }
        return existing_message.timestamp < new_message.timestamp
      end

      # ID is in the format "connid:msgSerial:index" such as "aaaaaa:0:0"
      # if msgSerial is greater then the new_message should update the member
      # if msgSerial is equal and index is greater, then update the member
      if new_message_parts[1].to_i > existing_message_parts[1].to_i # msgSerial
        true
      elsif new_message_parts[1].to_i == existing_message_parts[1].to_i # msgSerial equal
        new_message_parts[2].to_i > existing_message_parts[2].to_i # compare index
      else
        false
      end
    else
      # This message is fabricated or could not be validated so rely on timestamps, see #RTP2b1
      new_message.timestamp > existing_message.timestamp
    end
  else
    true
  end
end
If the message received is older than the last known event for this member, skip it (return false). This can occur during a SYNC operation. For example:
- SYNC starts
- LEAVE event received for clientId 5
- SYNC PRESENT event received for clientId 5 with a timestamp before the LEAVE event, because the LEAVE occurred before the SYNC operation completed
@return [Boolean] true when new_message is newer than the existing member in the PresenceMap
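The newness rule can be tried out in isolation with the following sketch. `Msg` is a hypothetical stand-in for a presence message carrying only the three fields the comparison needs, and `newer?` is an illustrative helper that condenses the logic of the method above (using `Array#<=>` for the serial and index comparison); it is not the library's code.

```ruby
# Hypothetical stand-in for a presence message: id, connection_id, timestamp.
Msg = Struct.new(:id, :connection_id, :timestamp)

# True when new_msg should replace existing.
# Client-published IDs look like "connid:msgSerial:index", e.g. "aaaaaa:0:0".
def newer?(new_msg, existing)
  client_published = [new_msg, existing].all? { |m| m.id.start_with?(m.connection_id) }
  if client_published
    new_parts = new_msg.id.match(/(\d+):(\d+)$/)
    old_parts = existing.id.match(/(\d+):(\d+)$/)
    if new_parts && old_parts
      # Compare msgSerial first, then index; Array#<=> does this element by element
      new_pair = [new_parts[1].to_i, new_parts[2].to_i]
      old_pair = [old_parts[1].to_i, old_parts[2].to_i]
      return (new_pair <=> old_pair) > 0
    end
  end
  # Fabricated or unparseable IDs: fall back to timestamps
  new_msg.timestamp > existing.timestamp
end

existing = Msg.new('aaaaaa:1:0', 'aaaaaa', 1000)

newer?(Msg.new('aaaaaa:2:0', 'aaaaaa', 900), existing)   # higher msgSerial wins
newer?(Msg.new('aaaaaa:1:1', 'aaaaaa', 900), existing)   # same msgSerial, higher index wins
newer?(Msg.new('aaaaaa:0:5', 'aaaaaa', 2000), existing)  # lower msgSerial loses despite later timestamp
newer?(Msg.new('fabricated', 'aaaaaa', 2000), existing)  # falls back to timestamp
```

Note that when both IDs are client-published and parse correctly, timestamps are not consulted at all.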
Source
# File lib/ably/realtime/presence/members_map.rb, line 183
def sync_serial
  @sync_serial
end
Source
# File lib/ably/realtime/presence/members_map.rb, line 340
def touch_presence_member(presence_message)
  members.fetch(presence_message.member_key)[:sync_session_id] = sync_session_id
end
No update is necessary for this member because the incoming message is older or unchanged. However, the sync_session_id still needs to be updated so that this member is not removed following SYNC.
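The role of the session ID can be shown with plain hashes. This is a simplified sketch with made-up member keys, not the library's data: every member seen during a SYNC is stamped with the current session ID (whether updated or merely touched), and anything still carrying an older ID once the SYNC ends is treated as having left.

```ruby
# Members as they stood after a previous SYNC (session 0)
members = {
  'conn1:alice' => { sync_session_id: 0 },
  'conn2:bob'   => { sync_session_id: 0 }
}

# A new SYNC starts, so the session ID is incremented
sync_session_id = 1

# alice is reported again; even if her message is not newer,
# touching the entry stamps it with the current session ID
members.fetch('conn1:alice')[:sync_session_id] = sync_session_id

# After the SYNC, members not stamped with the current ID are removed
departed = members.select { |_key, m| m[:sync_session_id] != sync_session_id }.keys
departed.each { |key| members.delete(key) }
```

Without the touch step, alice would be removed here even though the service still reports her as present.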
Source
# File lib/ably/realtime/presence/members_map.rb, line 250
def update_members_and_emit_events(presence_message)
  return unless ensure_presence_message_is_valid(presence_message)

  unless should_update_member?(presence_message)
    logger.debug { "#{self.class.name}: Skipped presence member #{presence_message.action} on channel #{presence.channel.name}.\n#{presence_message.to_json}" }
    touch_presence_member presence_message
    return
  end

  case presence_message.action
  when Ably::Models::PresenceMessage::ACTION.Enter, Ably::Models::PresenceMessage::ACTION.Update, Ably::Models::PresenceMessage::ACTION.Present
    add_presence_member presence_message
  when Ably::Models::PresenceMessage::ACTION.Leave
    remove_presence_member presence_message
  else
    Ably::Exceptions::ProtocolError.new("Protocol error, unknown presence action #{presence_message.action}", 400, Ably::Exceptions::Codes::PROTOCOL_ERROR)
  end
end