class Ably::Realtime::Channel::ChannelManager

ChannelManager is responsible for all actions relating to channel state: attaching, detaching or failure. Channel state changes are performed by this class and executed from {ChannelStateMachine}.

This is a private class and should never be used directly by developers as the API is likely to change in future.

@api private

Attributes

pending_state_change_timer[R]

Public Class Methods

new(channel, connection) click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 12
# Create a manager bound to one channel and the connection it operates over.
#
# @param channel    object stored in @channel and exposed through the private #channel reader
# @param connection object stored in @connection and exposed through the private #connection reader
def initialize(channel, connection)
  @channel, @connection = channel, connection
end

Public Instance Methods

attach() click to toggle source

Commence attachment

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 18
# Commence attachment.
#
# Does nothing unless a transition to :attached is currently permitted.
# Otherwise the connection is asked to connect if it is still in its
# initialized state, and the ATTACH message is sent only when the
# connection is already connected.
#
# @return the result of sending the ATTACH message, or nil when nothing was sent
def attach
  return unless can_transition_to?(:attached)

  connect_if_connection_initialized
  return unless connection.state?(:connected) # RTL4i

  send_attach_protocol_message
end
attached(attached_protocol_message) click to toggle source

Channel is attached, notify presence if sync is expected

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 35
# Channel is attached; propagate the details of the ATTACHED message, if any.
#
# @param attached_protocol_message the ATTACHED ProtocolMessage, or a falsy value
#   when the attach was triggered by the client library itself (such as returning
#   to attached after a failed detach), in which case there is nothing to apply
def attached(attached_protocol_message)
  return unless attached_protocol_message

  # Tell presence whether the message carries the presence flag
  channel.presence.manager.on_attach(attached_protocol_message.has_presence_flag?)
  # Record the serial, modes and params reported by the message
  channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
  channel.options.set_modes_from_flags(attached_protocol_message.flags)
  channel.options.set_params(attached_protocol_message.params)
end
detach(error, previous_state) click to toggle source

Commence detachment

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 26
# Commence detachment.
#
# When the connection is closed, connecting or suspended, the channel is moved
# straight to :detached with the given reason. Otherwise a DETACH message is
# sent, provided a transition to :detached is permitted.
#
# @param error          reason passed along with the :detached transition
# @param previous_state state handed to the DETACH sender (used there to restore the channel on timeout)
def detach(error, previous_state)
  connection_unavailable = connection.closed? || connection.connecting? || connection.suspended?
  return channel.transition_state_machine(:detached, reason: error) if connection_unavailable

  send_detach_protocol_message(previous_state) if can_transition_to?(:detached)
end
detached_received(reason) click to toggle source

Handle DETACHED messages, see RTL13 for server-initiated detaches

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 83
# Handle a DETACHED message according to the channel's current state.
#
# * detaching           -> move to :detached
# * attached, suspended -> move to :attaching (server-initiated detach, see RTL13)
# * any other state     -> no action, a debug line is logged
#
# @param reason reason forwarded with the state transition
def detached_received(reason)
  next_state_for = { detaching: :detached, attached: :attaching, suspended: :attaching }
  next_state = next_state_for[channel.state.to_sym]

  if next_state
    channel.transition_state_machine(next_state, reason: reason)
  else
    logger.debug { "ChannelManager: DETACHED ProtocolMessage received, but no action to take as not DETACHING, ATTACHED OR SUSPENDED" }
  end
end
drop_pending_queue_from_ack(ack_protocol_message) { |protocol_message| ... } click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 146
# Yield each leading message in the connection's pending-ACK queue that is
# covered by the given ACK, i.e. whose serial is at most
# message_serial + count - 1. Iteration stops at the first message beyond
# that serial.
#
# NOTE(review): drop_while on a plain Array returns a new array and leaves the
# receiver untouched - confirm whether the pending queue is expected to be
# trimmed in place here or elsewhere.
#
# @param ack_protocol_message ACK message responding to #message_serial and #count
# @yield [protocol_message] each pending message acknowledged by the ACK
# @return whatever drop_while returns for the queue (the messages not covered by the ACK)
def drop_pending_queue_from_ack(ack_protocol_message)
  last_acked_serial = ack_protocol_message.message_serial + ack_protocol_message.count - 1
  pending_queue = connection.__pending_message_ack_queue__

  pending_queue.drop_while do |queued_message|
    next false unless queued_message.message_serial <= last_acked_serial

    yield queued_message
    true
  end
end
duplicate_attached_received(protocol_message) click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 60
# Handle an ATTACHED message that arrives without the channel changing state.
#
# Any error on the message is stored as the channel error reason and logged,
# and the attach serial and modes are refreshed from the message. Unless the
# message carries the channel-resumed flag, an :update event is emitted with
# identical current/previous state and presence is notified of the attach.
#
# @param protocol_message the ATTACHED ProtocolMessage received from the server
def duplicate_attached_received(protocol_message)
  logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" }

  if protocol_message.error
    channel.set_channel_error_reason(protocol_message.error)
    log_channel_error(protocol_message.error)
  end

  channel.properties.set_attach_serial(protocol_message.channel_serial)
  channel.options.set_modes_from_flags(protocol_message.flags)

  # A resumed channel needs neither an update event nor a presence notification
  return if protocol_message.has_channel_resumed_flag?

  update_event = Ably::Models::ChannelStateChange.new(
    current: channel.state,
    previous: channel.state,
    event: Ably::Realtime::Channel::EVENT(:update),
    reason: protocol_message.error,
    resumed: false
  )
  channel.emit(:update, update_event)
  channel.presence.manager.on_attach(protocol_message.has_presence_flag?)
end
fail_messages_awaiting_ack(error, options = {}) click to toggle source

When continuity on the connection is interrupted or the channel becomes suspended (implying loss of continuity), all messages published but still awaiting an ACK from Ably should be failed with a NACK.

@param [Hash] options
@option options [Boolean] :immediately when true, the messages are failed at once rather than after a short delay

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 98
# Fail every message on this channel that has been published but is still
# waiting for an ACK.
#
# @param error   error handed to the failed messages; when falsy a
#   MessageDeliveryFailed exception is created at the time the messages are failed
# @param options [Hash]
# @option options [Boolean] :immediately fail the messages right away instead of
#   scheduling the failure on a 0.1 second EventMachine timer
def fail_messages_awaiting_ack(error, options = {})
  fail_pending = lambda do
    error ||= Ably::Exceptions::MessageDeliveryFailed.new("Continuity of connection was lost so published messages awaiting ACK have failed")
    fail_messages_in_queue(connection.__pending_message_ack_queue__, error)
  end

  return fail_pending.call if options[:immediately]

  # Allow a short time for other queued operations to complete before failing all messages
  EventMachine.add_timer(0.1) { fail_pending.call }
end
fail_messages_in_queue(queue, error) click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 122
# Remove from the given queue, and NACK, every presence or message
# ProtocolMessage addressed to this channel. Other entries are left in place.
#
# @param queue queue of protocol messages, modified in place via delete_if
# @param error error passed to each failed message
# @return the result of delete_if on the queue
def fail_messages_in_queue(queue, error)
  queue.delete_if do |queued_message|
    next false unless queued_message.action.match_any?(:presence, :message)
    next false unless queued_message.channel == channel.name

    nack_messages(queued_message, error)
    true
  end
end
fail_queued_messages(error) click to toggle source

When a channel becomes suspended or failed, all queued messages should be failed immediately as we don’t queue in any of those states

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 117
# Fail all messages for this channel that are still waiting in the
# connection's outgoing queue.
#
# @param error error handed to the failed messages; when falsy a
#   MessageDeliveryFailed exception naming the channel and its state is used
def fail_queued_messages(error)
  error ||= Ably::Exceptions::MessageDeliveryFailed.new("Queued messages on channel '#{channel.name}' in state '#{channel.state}' will never be delivered")
  outgoing_queue = connection.__outgoing_message_queue__
  fail_messages_in_queue(outgoing_queue, error)
end
log_channel_error(error) click to toggle source

An error has occurred on the channel

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 47
# Write an error-level log line naming the channel and the error.
#
# @param error the error to report; interpolated into the log line
def log_channel_error(error)
  logger.error do
    "ChannelManager: Channel '#{channel.name}' error: #{error}"
  end
end
nack_message(message, error, protocol_message = nil) click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 141
# Fail a single message with the given error, logging the call at debug level.
#
# @param message          object responding to #to_json and #fail
# @param error            error passed to message.fail
# @param protocol_message optional enclosing protocol message, mentioned in the log line only
# @return the result of message.fail
def nack_message(message, error, protocol_message = nil)
  logger.debug do
    protocol_message_note = "protocol message: #{protocol_message}" if protocol_message
    "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json} #{protocol_message_note}"
  end
  message.fail(error)
end
nack_messages(protocol_message, error) click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 133
# Fail every message and presence message carried by a protocol message, then
# fail the protocol message itself.
#
# @param protocol_message object responding to #messages, #presence, #to_json and #fail
# @param error            error passed to every failure callback
# @return the result of protocol_message.fail
def nack_messages(protocol_message, error)
  carried = protocol_message.messages + protocol_message.presence
  carried.each { |message| nack_message(message, error, protocol_message) }

  logger.debug do
    "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}"
  end
  protocol_message.fail(error)
end
notify_state_change() click to toggle source

RTL13c

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 171
# RTL13c - a state change has occurred, so cancel and forget any timer that
# was waiting on a pending state change.
#
# @return [nil]
def notify_state_change
  pending_timer = @pending_state_change_timer
  pending_timer.cancel if pending_timer
  @pending_state_change_timer = nil
end
request_reattach(reason = nil) click to toggle source

Request channel to be reattached by sending an attach protocol message.

@param [Ably::Models::ErrorInfo] reason

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 53
# Ask for the channel to be reattached by sending an ATTACH protocol message.
#
# The reason, when given, is first recorded as the channel error reason. The
# channel is moved to :attaching unless it is already attaching, the ATTACH
# message is sent, and the request is logged at debug level.
#
# @param reason optional reason for the reattach, forwarded with the transition
def request_reattach(reason = nil)
  if reason
    channel.set_channel_error_reason(reason)
  end

  unless channel.attaching?
    channel.transition_state_machine!(:attaching, reason: reason)
  end

  send_attach_protocol_message
  logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
end
start_attach_from_suspended_timer() click to toggle source

If the connection is still connected and the channel still suspended after channel_retry_timeout has passed, then attempt to reattach automatically, see RTL13b

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 158
# Schedule an automatic move to :attaching after channel_retry_timeout, see RTL13b.
#
# Any previously scheduled timer is cancelled first. A new timer is created
# only while the connection is connected. One-shot listeners on both the
# channel and the connection cancel the timer on the next event they receive,
# unless that event is :update.
#
# @return the new timer, or nil when the connection is not connected
def start_attach_from_suspended_timer
  cancel_attach_from_suspended_timer
  return unless connection.connected?

  channel.unsafe_once do |event|
    cancel_attach_from_suspended_timer unless event == :update
  end
  connection.unsafe_once do |event|
    cancel_attach_from_suspended_timer unless event == :update
  end

  @attach_from_suspended_timer = EventMachine::Timer.new(channel_retry_timeout) do
    channel.transition_state_machine!(:attaching)
  end
end

Private Instance Methods

cancel_attach_from_suspended_timer() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 189
# Cancel the timer that would reattach the channel from the suspended state,
# if one is currently set, and forget it.
#
# @return [nil]
def cancel_attach_from_suspended_timer
  suspended_timer = @attach_from_suspended_timer
  suspended_timer.cancel if suspended_timer
  @attach_from_suspended_timer = nil
end
channel() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 179
# Reader for the channel given to the constructor (stored in @channel).
def channel
  @channel
end
channel_retry_timeout() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 203
# Look up :channel_retry_timeout in the connection defaults.
# Uses fetch, so the lookup raises if the key is absent.
def channel_retry_timeout
  connection_defaults = connection.defaults
  connection_defaults.fetch(:channel_retry_timeout)
end
connect_if_connection_initialized() click to toggle source

If the connection has not previously connected, connect now

# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 195
# If the connection is still in its initialized state (it has not previously
# connected), ask it to connect now.
#
# @return the result of connection.connect, or nil when no connect was requested
def connect_if_connection_initialized
  return unless connection.initialized?

  connection.connect
end
connection() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 183
# Reader for the connection given to the constructor (stored in @connection).
def connection
  @connection
end
logger() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 267
# Logging goes through the logger owned by the connection.
def logger
  connection.logger
end
realtime_request_timeout() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 199
# Look up :realtime_request_timeout in the connection defaults.
# Uses fetch, so the lookup raises if the key is absent.
def realtime_request_timeout
  connection_defaults = connection.defaults
  connection_defaults.fetch(:realtime_request_timeout)
end
send_attach_protocol_message() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 207
# Send an ATTACH protocol message for this channel and start a timer that
# suspends the channel if its state has not changed by the time
# realtime_request_timeout elapses.
#
# The message carries the channel params (when there are any), the mode flags
# (when modes are set), the resume flag (when channel.attach_resume is truthy)
# and the current channel serial.
def send_attach_protocol_message
  attach_fields = {}
  if channel.options.params.any?
    attach_fields[:params] = channel.options.params
  end
  if channel.options.modes
    attach_fields[:flags] = channel.options.modes_to_flags
  end
  if channel.attach_resume
    # OR the resume bit into whatever flags are already present (none => 0)
    attach_fields[:flags] = attach_fields[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume]
  end

  attach_fields[:channelSerial] = channel.properties.channel_serial # RTL4c1

  requested_in_state = channel.state
  attach_action = Ably::Models::ProtocolMessage::ACTION.Attach

  # RTL4f
  @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
    # Only act if nothing has moved the channel on since the request was made
    next unless channel.state == requested_in_state

    timeout_error = Ably::Models::ErrorInfo.new(
      code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER,
      message: "Channel #{attach_action} operation failed (timed out)"
    )
    channel.transition_state_machine :suspended, reason: timeout_error # return to suspended state if failed
  end

  # Shouldn't queue attach message as per RTL4i, so message is added top of the queue
  # to be sent immediately while processing next message
  connection.send_protocol_message_immediately(
    action: attach_action.to_i,
    channel: channel.name,
    **attach_fields
  )
end
send_detach_protocol_message(previous_state) click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb, line 235
# Send a DETACH protocol message for this channel, resending it after a
# disconnect/reconnect cycle while the request is still pending, and start a
# timer that returns the channel to previous_state if its state has not
# changed by the time realtime_request_timeout elapses.
#
# @param previous_state state the channel is transitioned back to on timeout
def send_detach_protocol_message(previous_state)
  # Remember the state at request time so the timer can tell whether anything changed
  state_at_time_of_request = channel.state
  detach_action = Ably::Models::ProtocolMessage::ACTION.Detach

  @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
    if channel.state == state_at_time_of_request
      error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)")
      channel.transition_state_machine previous_state, reason: error # return to previous state if failed
    end
  end

  # Runs the given block once the connection has gone :disconnected and then
  # :connected again. pending_state_change_timer is checked at both steps, so
  # nothing happens once it has been cleared (notify_state_change sets it to nil).
  on_disconnected_and_connected = lambda do |&block|
    connection.unsafe_once(:disconnected) do
      connection.unsafe_once(:connected) do
        block.call if pending_state_change_timer
      end if pending_state_change_timer
    end
  end

  # Sends DETACH, first arranging for itself to be called again after the next
  # disconnect/reconnect cycle
  send_detach_message = lambda do
    on_disconnected_and_connected.call do
      send_detach_message.call
    end
    connection.send_protocol_message(
      action:  detach_action.to_i,
      channel: channel.name
    )
  end

  send_detach_message.call
end