class Ably::Realtime::Channel::ChannelManager
ChannelManager is responsible for all actions relating to channel state: attaching, detaching or failure.
Channel state changes are performed by this class and executed from {ChannelStateMachine}.
This is a private class and should never be used directly by developers as the API is likely to change in future.
@api private
Public Class Methods
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 12
def initialize(channel, connection)
  @channel = channel
  @connection = connection
end
Public Instance Methods
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 18
def attach
  if can_transition_to?(:attached)
    connect_if_connection_initialized
    send_attach_protocol_message if connection.state?(:connected) # RTL4i
  end
end
Commence attachment
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 35
def attached(attached_protocol_message)
  # If no attached ProtocolMessage then this attached request was triggered by the client
  # library, such as returning to attached when detach has failed
  if attached_protocol_message
    channel.presence.manager.on_attach attached_protocol_message.has_presence_flag?
    channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
    channel.options.set_modes_from_flags(attached_protocol_message.flags)
    channel.options.set_params(attached_protocol_message.params)
  end
end
Channel is attached, notify presence if sync is expected
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 26
def detach(error, previous_state)
  if connection.closed? || connection.connecting? || connection.suspended?
    channel.transition_state_machine :detached, reason: error
  elsif can_transition_to?(:detached)
    send_detach_protocol_message previous_state
  end
end
Commence detachment
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 83
def detached_received(reason)
  case channel.state.to_sym
  when :detaching
    channel.transition_state_machine :detached, reason: reason
  when :attached, :suspended
    channel.transition_state_machine :attaching, reason: reason
  else
    logger.debug { "ChannelManager: DETACHED ProtocolMessage received, but no action to take as not DETACHING, ATTACHED OR SUSPENDED" }
  end
end
Handle DETACHED messages, see RTL13 for server-initiated detaches
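The state handling in detached_received can be read as a small decision table. The following is a minimal sketch of that table as a pure function; next_state_on_detached is a hypothetical helper written for illustration and is not part of ably-ruby.

```ruby
# Hypothetical helper (not part of ably-ruby): returns the state a channel
# should move to when a DETACHED protocol message arrives, or nil when
# there is nothing to do.
def next_state_on_detached(current_state)
  case current_state.to_sym
  when :detaching
    :detached   # the client requested the detach, so complete it
  when :attached, :suspended
    :attaching  # server-initiated detach: attempt to reattach
  else
    nil         # any other state: no action is taken
  end
end

[:detaching, :attached, :suspended, :failed].each do |state|
  puts "#{state} -> #{next_state_on_detached(state).inspect}"
end
```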
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 146
def drop_pending_queue_from_ack(ack_protocol_message)
  message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
  connection.__pending_message_ack_queue__.drop_while do |protocol_message|
    if protocol_message.message_serial <= message_serial_up_to
      yield protocol_message
      true
    end
  end
end
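The serial arithmetic in drop_pending_queue_from_ack may be easier to follow with concrete numbers: an ACK carrying message_serial 3 and count 2 covers serials 3 and 4. The sketch below reproduces only that range calculation on plain Ruby objects; PendingMessage and split_acked are hypothetical names used for illustration.

```ruby
# Stand-in for a protocol message; only the serial matters in this sketch.
PendingMessage = Struct.new(:message_serial)

# Hypothetical free-standing version of the ACK range logic. Returns the
# acknowledged messages and the ones still pending, without mutating queue.
def split_acked(queue, ack_serial, ack_count)
  message_serial_up_to = ack_serial + ack_count - 1
  acked = queue.take_while { |message| message.message_serial <= message_serial_up_to }
  [acked, queue.drop(acked.length)]
end

queue = (3..7).map { |serial| PendingMessage.new(serial) }
acked, remaining = split_acked(queue, 3, 2)
puts "acked: #{acked.map(&:message_serial).inspect}"
puts "remaining: #{remaining.map(&:message_serial).inspect}"
```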
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 60
def duplicate_attached_received(protocol_message)
  logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" }
  if protocol_message.error
    channel.set_channel_error_reason protocol_message.error
    log_channel_error protocol_message.error
  end

  channel.properties.set_attach_serial(protocol_message.channel_serial)
  channel.options.set_modes_from_flags(protocol_message.flags)

  unless protocol_message.has_channel_resumed_flag?
    channel.emit :update, Ably::Models::ChannelStateChange.new(
      current: channel.state,
      previous: channel.state,
      event: Ably::Realtime::Channel::EVENT(:update),
      reason: protocol_message.error,
      resumed: false,
    )

    channel.presence.manager.on_attach protocol_message.has_presence_flag?
  end
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 98
def fail_messages_awaiting_ack(error, options = {})
  immediately = options[:immediately] || false

  fail_proc = lambda do
    error = Ably::Exceptions::MessageDeliveryFailed.new("Continuity of connection was lost so published messages awaiting ACK have failed") unless error
    fail_messages_in_queue connection.__pending_message_ack_queue__, error
  end

  # Allow a short time for other queued operations to complete before failing all messages
  if immediately
    fail_proc.call
  else
    EventMachine.add_timer(0.1) { fail_proc.call }
  end
end
When continuity on the connection is interrupted or channel becomes suspended (implying loss of continuity) then all messages published but awaiting an ACK from Ably should be failed with a NACK
@param [Hash] options
@option options [Boolean] :immediately
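The difference between the immediate and the deferred path of fail_messages_awaiting_ack can be illustrated without EventMachine. This is a rough sketch under stated assumptions: AckFailureSketch, fire_timers and the default reason string are hypothetical, and a plain array of callbacks stands in for the 0.1 second timer.

```ruby
# Hypothetical sketch class; none of these names come from ably-ruby.
class AckFailureSketch
  DEFAULT_REASON = "continuity lost, messages awaiting ACK have failed".freeze

  attr_reader :failures

  def initialize
    @failures = [] # [message, reason] pairs that have been failed
    @deferred = [] # callbacks a timer would fire later
  end

  def fail_awaiting_ack(pending, error = nil, immediately: false)
    fail_proc = lambda do
      reason = error || DEFAULT_REASON # fall back to a default error
      pending.each { |message| @failures << [message, reason] }
      pending.clear
    end

    if immediately
      fail_proc.call
    else
      @deferred << fail_proc # stand-in for scheduling a short timer
    end
  end

  # Simulates the timers firing.
  def fire_timers
    @deferred.shift.call until @deferred.empty?
  end
end

sketch = AckFailureSketch.new
pending = ["msg-1", "msg-2"]
sketch.fail_awaiting_ack(pending)
puts "failed before timers fire: #{sketch.failures.length}"
sketch.fire_timers
puts "failed after timers fire: #{sketch.failures.length}"
```

Deferring by default gives other queued operations a chance to complete first, which matches the comment in the method source.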
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 122
def fail_messages_in_queue(queue, error)
  queue.delete_if do |protocol_message|
    if protocol_message.action.match_any?(:presence, :message)
      if protocol_message.channel == channel.name
        nack_messages protocol_message, error
        true
      end
    end
  end
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 117
def fail_queued_messages(error)
  error = Ably::Exceptions::MessageDeliveryFailed.new("Queued messages on channel '#{channel.name}' in state '#{channel.state}' will never be delivered") unless error
  fail_messages_in_queue connection.__outgoing_message_queue__, error
end
When a channel becomes suspended or failed, all queued messages should be failed immediately as we don’t queue in any of those states
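Failing queued messages comes down to filtering a shared queue: only presence and message actions that belong to this channel are removed, and everything else stays queued. A minimal sketch of that filter on plain Ruby objects follows; QueuedMessage and fail_for_channel are hypothetical names, and the real code calls nack_messages rather than collecting the removed entries.

```ruby
# Stand-in for a queued protocol message (hypothetical, for illustration).
QueuedMessage = Struct.new(:action, :channel)

# Removes presence/message entries for channel_name from queue (in place)
# and returns the entries that were removed.
def fail_for_channel(queue, channel_name)
  failed = []
  queue.delete_if do |message|
    if [:presence, :message].include?(message.action) && message.channel == channel_name
      failed << message
      true # tell delete_if to drop this entry
    end
  end
  failed
end

queue = [
  QueuedMessage.new(:message, "chat"),
  QueuedMessage.new(:attach, "chat"),   # not a message or presence action
  QueuedMessage.new(:presence, "news")  # belongs to another channel
]
failed = fail_for_channel(queue, "chat")
puts "failed: #{failed.length}, still queued: #{queue.length}"
```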
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 47
def log_channel_error(error)
  logger.error { "ChannelManager: Channel '#{channel.name}' error: #{error}" }
end
An error has occurred on the channel
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 141
def nack_message(message, error, protocol_message = nil)
  logger.debug { "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json} #{"protocol message: #{protocol_message}" if protocol_message}" }
  message.fail error
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 133
def nack_messages(protocol_message, error)
  (protocol_message.messages + protocol_message.presence).each do |message|
    nack_message message, error, protocol_message
  end
  logger.debug { "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}" }
  protocol_message.fail error
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 171
def notify_state_change
  @pending_state_change_timer.cancel if @pending_state_change_timer
  @pending_state_change_timer = nil
end
RTL13c
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 53
def request_reattach(reason = nil)
  channel.set_channel_error_reason(reason) if reason
  channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
  send_attach_protocol_message
  logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
end
Request channel to be reattached by sending an attach protocol message
@param [Ably::Models::ErrorInfo] reason
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 158
def start_attach_from_suspended_timer
  cancel_attach_from_suspended_timer
  if connection.connected?
    channel.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }
    connection.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }

    @attach_from_suspended_timer = EventMachine::Timer.new(channel_retry_timeout) do
      channel.transition_state_machine! :attaching
    end
  end
end
If the connection is still connected and the channel still suspended after channel_retry_timeout
has passed, then attempt to reattach automatically, see RTL13b
Private Instance Methods
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 189
def cancel_attach_from_suspended_timer
  @attach_from_suspended_timer.cancel if @attach_from_suspended_timer
  @attach_from_suspended_timer = nil
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 179
def channel
  @channel
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 203
def channel_retry_timeout
  connection.defaults.fetch(:channel_retry_timeout)
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 195
def connect_if_connection_initialized
  connection.connect if connection.initialized?
end
If the connection has not previously connected, connect now
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 183
def connection
  @connection
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 267
def logger
  connection.logger
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 199
def realtime_request_timeout
  connection.defaults.fetch(:realtime_request_timeout)
end
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 207
def send_attach_protocol_message
  message_options = {}
  message_options[:params] = channel.options.params if channel.options.params.any?
  message_options[:flags] = channel.options.modes_to_flags if channel.options.modes
  if channel.attach_resume
    message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume]
  end

  message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1

  state_at_time_of_request = channel.state
  attach_action = Ably::Models::ProtocolMessage::ACTION.Attach
  # RTL4f
  @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
    if channel.state == state_at_time_of_request
      error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)")
      channel.transition_state_machine :suspended, reason: error # return to suspended state if failed
    end
  end
  # Shouldn't queue attach message as per RTL4i, so message is added top of the queue
  # to be sent immediately while processing next message
  connection.send_protocol_message_immediately(
    action: attach_action.to_i,
    channel: channel.name,
    **message_options.to_h
  )
end
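The flag handling in send_attach_protocol_message relies on nil.to_i being 0, so the resume bit can be OR-ed in even when no channel modes were set. The sketch below shows that bitwise step in isolation. SKETCH_FLAGS and attach_flags are hypothetical, and the bit positions are illustrative assumptions; the values the library actually uses live in Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING and may differ.

```ruby
# Illustrative bit values only (assumption); not taken from ably-ruby.
SKETCH_FLAGS = { resume: 1 << 5, publish: 1 << 17, subscribe: 1 << 18 }.freeze

# Hypothetical helper: mode_flags is an Integer bitfield or nil when no
# modes are configured. Returns the flags value to send, or nil if unset.
def attach_flags(mode_flags, attach_resume)
  flags = mode_flags
  # nil.to_i is 0, so OR-ing works whether or not modes were configured
  flags = flags.to_i | SKETCH_FLAGS[:resume] if attach_resume
  flags
end

modes = SKETCH_FLAGS[:publish] | SKETCH_FLAGS[:subscribe]
puts attach_flags(modes, true).to_s(2) # modes plus the resume bit, in binary
puts attach_flags(nil, true).to_s(2)   # only the resume bit
puts attach_flags(nil, false).inspect  # no flags at all
```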
Source
# File lib/ably/realtime/channel/channel_manager.rb, line 235
def send_detach_protocol_message(previous_state)
  state_at_time_of_request = channel.state
  detach_action = Ably::Models::ProtocolMessage::ACTION.Detach

  @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
    if channel.state == state_at_time_of_request
      error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)")
      channel.transition_state_machine previous_state, reason: error # return to previous state if failed
    end
  end

  on_disconnected_and_connected = lambda do |&block|
    connection.unsafe_once(:disconnected) do
      connection.unsafe_once(:connected) do
        block.call if pending_state_change_timer
      end if pending_state_change_timer
    end
  end

  send_detach_message = lambda do
    on_disconnected_and_connected.call do
      send_detach_message.call
    end
    connection.send_protocol_message(
      action: detach_action.to_i,
      channel: channel.name
    )
  end

  send_detach_message.call
end