class Ably::Realtime::Connection::WebsocketTransport
EventMachine WebSocket transport @api private
Constants
- STATE
Valid WebSocket connection states
Attributes
Public Class Methods
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 23 def initialize(connection, url) @connection = connection @state = STATE.Initialized @url = url @host = URI.parse(url).hostname setup_event_handlers end
Public Instance Methods
@return [Ably::Util::PubSub] Websocket Transport internal incoming protocol message bus
@api private
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 141 def __incoming_protocol_msgbus__ @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus end
@return [Ably::Util::PubSub] Websocket Transport internal outgoing protocol message bus
@api private
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 148 def __outgoing_protocol_msgbus__ @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 126 def certificate_store @certificate_store ||= OpenSSL::X509::Store.new.tap do |store| store.set_default_paths end end
Remote TCP connection attempt completes successfully Required {www.rubydoc.info/github/eventmachine/eventmachine/EventMachine/Connection EventMachine::Connection} interface
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 57 def connection_completed change_state STATE.Connected start_tls(tls_opts) if client.use_tls? driver.start end
Disconnect the socket transport connection and write all pending text. If Disconnected state is not automatically emitted, it will be emitted automatically
@return [void]
@api public
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 38 def disconnect close_connection_after_writing change_state STATE.Disconnecting create_timer(2) do # if connection is not disconnected within 2s, set state as disconnected change_state STATE.Disconnected unless disconnected? end end
Network connection has been established Required {www.rubydoc.info/github/eventmachine/eventmachine/EventMachine/Connection EventMachine::Connection} interface
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 49 def post_init clear_timer change_state STATE.Connecting setup_driver end
True if socket connection is ready to be released i.e. it is not currently connecting or connected
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 134 def ready_for_release? !connecting? && !connected? end
Called by the event loop whenever data has been received by the network connection. Simply pass onto the WebSocket driver to process and determine content boundaries. Required {www.rubydoc.info/github/eventmachine/eventmachine/EventMachine/Connection EventMachine::Connection} interface
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 66 def receive_data(data) driver.parse(data) end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 117 def ssl_handshake_completed unless OpenSSL::SSL.verify_certificate_identity(@last_seen_cert, host) disconnect_with_reason "Websocket host '#{host}' returned an invalid TLS certificate" false else true end end
TLS verification support, original implementation by Mislav Marohnić:
github.com/lostisland/faraday/commit/63cf47c95b573539f047c729bd9ad67560bc83ff
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 90 def ssl_verify_peer(cert_string) cert = nil begin cert = OpenSSL::X509::Certificate.new(cert_string) rescue OpenSSL::X509::CertificateError => e disconnect_with_reason "Websocket host '#{host}' returned an invalid TLS certificate: #{e.message}" return false end @last_seen_cert = cert if certificate_store.verify(@last_seen_cert) begin certificate_store.add_cert(@last_seen_cert) rescue OpenSSL::X509::StoreError => e unless e.message == 'cert already in hash table' disconnect_with_reason "Websocket host '#{host}' returned an invalid TLS certificate: #{e.message}" return false end end true else disconnect_with_reason "Websocket host '#{host}' returned an invalid TLS certificate" false end end
Called whenever a connection (either a server or client connection) is closed Required {www.rubydoc.info/github/eventmachine/eventmachine/EventMachine/Connection EventMachine::Connection} interface
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 72 def unbind change_state STATE.Disconnected, reason_closed || 'Websocket connection closed unexpectedly' end
URL end point including initialization configuration {www.rubydoc.info/gems/websocket-driver/0.3.5/WebSocket/Driver WebSocket::Driver} interface
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 78 def url @url end
{www.rubydoc.info/gems/websocket-driver/0.3.5/WebSocket/Driver WebSocket::Driver} interface
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 83 def write(data) send_data(data) end
Private Instance Methods
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 191 def clear_timer if defined?(@timer) && @timer @timer.cancel @timer = nil end end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 246 def client connection.client end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 157 def connection @connection end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 267 def create_pub_sub_message_bus Ably::Util::PubSub.new( coerce_into: lambda do |event| raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message :protocol_message end ) end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 198 def create_timer(period) @timer = EventMachine::Timer.new(period) do yield end end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 161 def disconnect_with_reason(reason) client.logger.error { "WebsocketTransport: Disconnecting due to error: #{reason}" } @reason_closed = reason disconnect end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 153 def driver @driver end
Used to log transport messages
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 251 def logger connection.logger end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 255 def parse_event_data(data) case client.protocol when :json JSON.parse(data) when :msgpack MessagePack.unpack(data.pack('C*')) else client.logger.fatal { "WebsocketTransport: Unsupported Protocol Message format #{client.protocol}" } data end end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 167 def reason_closed @reason_closed end
Send object down the WebSocket driver connection as a serialized string/byte array based on protocol @param [Object] object to serialize and send to the WebSocket driver
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 173 def send_object(object) case client.protocol when :json driver.text(object.to_json) when :msgpack driver.binary(object.to_msgpack.unpack('C*')) else client.logger.fatal { "WebsocketTransport: Unsupported protocol '#{client.protocol}' for serialization, object cannot be serialized and sent to Ably over this WebSocket" } end end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 204 def setup_driver @driver = WebSocket::Driver.client(self) driver.on("open") do logger.debug { "WebsocketTransport: socket opened to #{url}, waiting for Connected protocol message" } end driver.on("message") do |event| event_data = parse_event_data(event.data).freeze protocol_message = Ably::Models::ProtocolMessage.new(event_data, logger: logger) action_name = Ably::Models::ProtocolMessage::ACTION[event_data['action']] rescue event_data['action'] logger.debug { "WebsocketTransport: Prot msg recv <=: #{action_name} - #{event_data}" } if protocol_message.invalid? error = Ably::Exceptions::ProtocolError.new("Invalid Protocol Message received: #{event_data}\nConnection moving to the failed state as the protocol is invalid and unsupported", 400, Ably::Exceptions::Codes::PROTOCOL_ERROR) logger.fatal { "WebsocketTransport: #{error.message}" } failed_protocol_message = Ably::Models::ProtocolMessage.new( action: Ably::Models::ProtocolMessage::ACTION.Error, error: error.as_json, logger: logger ) __incoming_protocol_msgbus__.publish :protocol_message, failed_protocol_message else __incoming_protocol_msgbus__.publish :protocol_message, protocol_message end end driver.on("ping") do __incoming_protocol_msgbus__.publish :protocol_message, Ably::Models::ProtocolMessage.new(action: Ably::Models::ProtocolMessage::ACTION.Heartbeat, source: :websocket) end driver.on("error") do |error| logger.error { "WebsocketTransport: Protocol Error on transports - #{error.message}" } end @reason_closed = nil driver.on("closed") do |event| @reason_closed = "#{event.code}: #{event.reason}" logger.warn { "WebsocketTransport: Driver reported transport as closed - #{reason_closed}" } end end
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 184 def setup_event_handlers __outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| send_object protocol_message client.logger.debug { "WebsocketTransport: Prot msg sent =>: #{protocol_message.action} #{protocol_message}" } end end
TLS options to pass to EventMachine::Connection#start_tls
See www.rubydoc.info/github/eventmachine/eventmachine/EventMachine/Connection#start_tls-instance_method
# File lib/submodules/ably-ruby/lib/ably/realtime/connection/websocket_transport.rb, line 279 def tls_opts { sni_hostname: host, verify_peer: true, } end