class Deepstream::Client

Attributes

options[R]
state[R]

Public Class Methods

new(url, options = {}) click to toggle source
# File lib/deepstream/client.rb, line 26
def initialize(url, options = {})
  @url = Helpers.url(url)
  @error_handler = ErrorHandler.new(self)
  @record_handler = RecordHandler.new(self)
  @event_handler = EventHandler.new(self)
  @options = Helpers.default_options.merge!(options)
  @message_buffer = []
  @last_hearbeat = nil
  @challenge_denied, @@deliberate_close = false
  @state = CONNECTION_STATE::CLOSED
  @verbose = @options[:verbose]
  @reinitialize_master = @options[:reinitialize_master]
  @log = Async.logger
  @connected_before = false
  connect
end

Public Instance Methods

close() click to toggle source
# File lib/deepstream/client.rb, line 78
def close
  return unless connected?
  @deliberate_close = true
  log.info 'deliberate closing' if @verbose
rescue => e
  on_exception(e)
end
connected?() click to toggle source
# File lib/deepstream/client.rb, line 93
def connected?
  @state != CONNECTION_STATE::CLOSED
end
inspect() click to toggle source
# File lib/deepstream/client.rb, line 105
def inspect
  "#{self.class} #{@url} | connection state: #{@state}"
end
logged_in?() click to toggle source
# File lib/deepstream/client.rb, line 101
def logged_in?
  @state == CONNECTION_STATE::OPEN
end
login(credentials = @options[:credentials]) click to toggle source
# File lib/deepstream/client.rb, line 65
def login(credentials = @options[:credentials])
  @options[:credentials] = credentials
  if @challenge_denied
    on_error("this client's connection was closed")
  elsif @state == CONNECTION_STATE::AUTHENTICATING
    send_message(TOPIC::AUTH, ACTION::REQUEST, @options[:credentials].to_json, priority: true)
  end
  self
rescue => e
  on_exception(e)
  self
end
on_message(data) click to toggle source
# File lib/deepstream/client.rb, line 48
def on_message(data)
  message = Message.new(data)
  @log.info "Receiving msg = #{message.inspect}" if @verbose
  case message.topic
  when TOPIC::AUTH       then authentication_message(message)
  when TOPIC::CONNECTION then connection_message(message)
  when TOPIC::EVENT      then @event_handler.on_message(message)
  when TOPIC::ERROR      then @error_handler.on_error(message)
  when TOPIC::RECORD     then @record_handler.on_message(message)
  when TOPIC::RPC        then raise(UnknownTopic, 'RPC is currently not implemented.')
  when nil               then nil
  else raise(UnknownTopic, message)
  end
rescue => e
  on_exception(e)
end
on_open() click to toggle source
# File lib/deepstream/client.rb, line 43
def on_open
  @log.info "Websocket connection opened" if @verbose
  @state = CONNECTION_STATE::AWAITING_CONNECTION
end
reconnect() click to toggle source
# File lib/deepstream/client.rb, line 86
def reconnect
  return if connected?
  @deliberate_close = false
  @state = CONNECTION_STATE::RECONNECTING
  @log.info 'Reconnecting' if @verbose
end
reconnecting?() click to toggle source
# File lib/deepstream/client.rb, line 97
def reconnecting?
  @state == CONNECTION_STATE::RECONNECTING
end
send_message(*args, **kwargs) click to toggle source
# File lib/deepstream/client.rb, line 109
def send_message(*args, **kwargs)
  message = Message.parse(*args)
  priority = kwargs[:priority] || false
  timeout = message.topic == TOPIC::EVENT ? kwargs[:timeout] : nil
  message.set_timeout(timeout) if timeout
  return unable_to_send_message(message, priority) if !logged_in? && message.needs_authentication?
  priority ? @message_buffer.unshift(message) : @message_buffer.push(message)
rescue Errno::EPIPE
  unable_to_send_message(message, priority)
rescue => e
  on_exception(e)
end

Private Instance Methods

_connect(url = @url) click to toggle source
# File lib/deepstream/client.rb, line 221
def _connect(url = @url)
  @log.info "Trying to connect to #{url}" if @verbose
  endpoint = Async::HTTP::Endpoint.parse(url)
  Async::WebSocket::Client.connect(endpoint) do |connection|
    on_open
    @task.async do
      loop do
        break if ( connection.closed? || @deliberate_close )
        while !@message_buffer.empty? && (logged_in? || !@message_buffer[0].needs_authentication?)
          msg = @message_buffer.shift
          next if msg.expired?
          encoded_msg = msg.to_s.encode(Encoding::UTF_8)
          @log.info "Sending msg = #{msg.inspect}" if @verbose
          connection.write(encoded_msg)
          connection.flush rescue @message_buffer.unshift(msg)
        end
        @task.sleep 0.001
      end
    end

    loop do
      on_message(connection.read)
      break if ( connection.closed? || @deliberate_close )
    end

  rescue => e
    @log.error "Connection error #{e.message}"
    on_exception(e)
  ensure
    connection.close
  end
  on_close
end
authentication_message(message) click to toggle source
# File lib/deepstream/client.rb, line 144
def authentication_message(message)
  case message.action
  when ACTION::ACK   then on_login
  when ACTION::ERROR then on_error(message)
  else raise(UnknownAction, message)
  end
end
check_heartbeat() click to toggle source
# File lib/deepstream/client.rb, line 188
def check_heartbeat
  return unless @last_heartbeat && Time.now - @last_heartbeat > 2 * @options[:heartbeat_interval]
  @state = CONNECTION_STATE::CLOSED
  on_error('Two connections heartbeats missed successively')
end
connect(in_thread = @options[:in_thread]) click to toggle source
# File lib/deepstream/client.rb, line 199
def connect(in_thread = @options[:in_thread])
  if in_thread
    Thread.start { connection_loop }
  else
    connection_loop
  end
end
connection_loop() click to toggle source
# File lib/deepstream/client.rb, line 207
def connection_loop
  Async do |task|
    @task = task
    loop do
      if @deliberate_close
        sleep 5
        next
      end
      _connect
      sleep 5
    end
  end
end
connection_message(message) click to toggle source
# File lib/deepstream/client.rb, line 132
def connection_message(message)
  case message.action
  when ACTION::ACK       then on_connection_ack
  when ACTION::CHALLENGE then on_challenge
  when ACTION::ERROR     then on_error(message)
  when ACTION::PING      then on_ping
  when ACTION::REDIRECT  then on_redirection(message)
  when ACTION::REJECTION then on_rejection
  else raise(UnknownAction, message)
  end
end
on_challenge() click to toggle source
# File lib/deepstream/client.rb, line 152
def on_challenge
  @state = CONNECTION_STATE::CHALLENGING
  send_message(TOPIC::CONNECTION, ACTION::CHALLENGE_RESPONSE, @url)
end
on_close() click to toggle source
# File lib/deepstream/client.rb, line 181
def on_close
  @log.info 'Websocket connection closed' if @verbose
  @state = CONNECTION_STATE::CLOSED
rescue => e
  on_exception(e)
end
on_connection_ack() click to toggle source
# File lib/deepstream/client.rb, line 157
def on_connection_ack
  @state = CONNECTION_STATE::AUTHENTICATING
  @message_buffer.delete_if { |msg| msg.action == ACTION::PATCH }
  @record_handler.reinitialize if @connected_before
  login
end
on_login() click to toggle source
# File lib/deepstream/client.rb, line 169
def on_login
  @connected_before = true
  @state = CONNECTION_STATE::OPEN
  every(@options[:heartbeat_interval]) { check_heartbeat } if @options[:heartbeat_interval]
  resubscribe
end
on_ping() click to toggle source
# File lib/deepstream/client.rb, line 164
def on_ping
  @last_heartbeat = Time.now
  send_message(TOPIC::CONNECTION, ACTION::PONG)
end
on_redirection(message) click to toggle source
# File lib/deepstream/client.rb, line 194
def on_redirection(message)
  on_close
  @url = message.data.last
end
on_rejection() click to toggle source
# File lib/deepstream/client.rb, line 176
def on_rejection
  @challenge_denied = true
  on_close
end
unable_to_send_message(message, priority) click to toggle source
# File lib/deepstream/client.rb, line 124
def unable_to_send_message(message, priority)
  @state = CONNECTION_STATE::CLOSED if logged_in?
  unless message.expired?
   @log.info("Placing a message #{message.inspect} in the buffer, waiting for authentication") if @verbose
   priority ? @message_buffer.unshift(message) : @message_buffer.push(message)
  end
end