class Ducts::Client

Constants

DuctEventHandlerReturnValue
VERSION

Attributes

EVENT[R]
WSD[R]
catchall_event_handler[RW]
connection_listener[R]
event_error_handler[RW]
uncaught_event_handler[RW]

Public Class Methods

new() click to toggle source
# File lib/ducts/client.rb, line 112
def initialize
  @WSD = nil
  @EVENT = nil
  @connection_listener = ConnectionEventListener.new
  @catchall_event_handler = lambda{ |rid, eid, data| }
  @uncaught_event_handler = lambda{ |rid, eid, data| }
  @event_error_handler = lambda{ |rid, eid, data, error|
    warn Message.construct(Message::Level::ERROR, error.message)
  }

  @_last_rid = nil
  @_ws = nil
  @_waiting_message_completion = Hash.new
  @_waiting_closed_completion = Array.new
  @_loop_queues = Hash.new
  @_divided_buffers = Hash.new
end

Public Instance Methods

close() click to toggle source
# File lib/ducts/client.rb, line 260
def close
  completion = EM::Completion.new
  if @_ws
    @_ws.close 
    @_waiting_closed_completion.push(completion)
  else
    completion.succeed(nil)
  end
  completion
end
event_call(eid, data) click to toggle source
# File lib/ducts/client.rb, line 253
def event_call(eid, data)
  rid = next_rid
  event_send(rid, eid, data)
  warn Message.construct(Message::Level::WARNING, 'request ID is duplicated.') if @_waiting_message_completion.keys.include? rid
  EM::Completion.new.tap { |completion| @_waiting_message_completion[rid] = completion }
end
event_send(rid, eid, data) click to toggle source
# File lib/ducts/client.rb, line 247
def event_send(rid, eid, data)
  msgpack = [ rid, eid, data ].to_msgpack
  @_ws.send(msgpack.chars.map(&:ord))
  rid
end
next_rid() click to toggle source
# File lib/ducts/client.rb, line 130
def next_rid
  next_id = Time.now.to_i
  if (!@_last_rid.nil? && (next_id <= @_last_rid))
    next_id = @_last_rid + 1
  end
  @_last_rid = next_id
  next_id
end
open(wsd_url, uuid = nil, **params) click to toggle source
# File lib/ducts/client.rb, line 188
def open(wsd_url, uuid = nil, **params)
  completion = EM::Completion.new
  if @_ws
    completion.succeed(self)
    return completion;
  end
  begin
    query = '?uuid=' + (uuid || 'dummy') + params.map{ |k, v| "&#{k}=#{v}" }.join
    uri = URI.parse(wsd_url + query)
    https = Net::HTTP.new(uri.host, Net::HTTP.https_default_port).tap{ |h|
      h.ssl_version = :SSLv23
      h.use_ssl = true
      h.verify_mode = OpenSSL::SSL::VERIFY_PEER
    }
    response = https.get(uri)
    @WSD = JSON.parse(response.body)
    @EVENT = @WSD['EVENT']
    Faye::WebSocket::Client.new(@WSD['websocket_url']).tap do |ws|
      ws.on(:open) do |event|
        @_ws = ws
        completion.succeed(self)
        connection_event = DuctConnectionEvent.new('onopen', event)
        _onopen(connection_event)
        @connection_listener.onopen(connection_event)
      end
      ws.on(:message) do |event|
        message_event = DuctMessageEvent.new(event)
        connection_event = DuctConnectionEvent.new('onmessage', message_event)
        _onmessage(connection_event)
        @connection_listener.onmessage(connection_event)
      end
      ws.on(:error) do |event|
        connection_event = DuctConnectionEvent.new('onerror', event)
        @connection_listener.onerror(connection_event)
      end
      ws.on(:close) do |event|
        @_ws = nil
        case event.code
        when 1000
          @_waiting_closed_completion.each do |waiting_closed_completion|
            waiting_closed_completion.succeed(event)
          end
        else
          p event.code
          @_waiting_closed_completion.each do |waiting_closed_completion|
            waiting_closed_completion.fail(event)
          end
        end
        @_waiting_closed_completion.clear
        connection_event = DuctConnectionEvent.new('onclose', event)
        @connection_listener.onclose(connection_event)
      end
    end
  rescue => error
    @connection_listener.onerror(DuctConnectionEvent.new('onerror', error))
  end
  completion
end
reconnect() click to toggle source
# File lib/ducts/client.rb, line 156
def reconnect
  completion = EM::Completion.new
  if @_ws
    completion.succeed(self)
    return completion;
  end
  Faye::WebSocket::Client.new(@WSD['websocket_url_reconnect']).tap do |ws|
    ws.on(:open) do |event|
      @_ws = ws
      completion.succeed(self)
      connection_event = DuctConnectionEvent.new('onopen', event)
      _onreconnect(connection_event)
      @connection_listener.onopen(connection_event)
    end
    ws.on(:close) do |event|
      connection_event = DuctConnectionEvent.new('onclose', event)
      @connection_listener.onclose(connection_event)
    end
    ws.on(:message) do |event|
      message_event = DuctMessageEvent.new(event)
      connection_event = DuctConnectionEvent.new('onmessage', message_event)
      _onmessage(connection_event)
      @connection_listener.onmessage(connection_event)
    end
    ws.on(:error) do |event|
      connection_event = DuctConnectionEvent.new('onerror', event)
      @connection_listener.onerror(connection_event)
    end
  end
  completion
end
set_event_handler(event_id, &handler) click to toggle source
# File lib/ducts/client.rb, line 139
def set_event_handler(event_id, &handler)
  @_event_handler ||= {}
  if handler
    @_event_handler.store(event_id, handler)
  else
    @_event_handler.delete(event_id)
  end
end
state() click to toggle source
# File lib/ducts/client.rb, line 148
def state
  if @_ws
    State.get_state_from_value(@_ws.ready_state)
  else
    State::CLOSE
  end
end

Private Instance Methods

_alive_monitoring_handler(rid, eid, data) click to toggle source
# File lib/ducts/client.rb, line 323
def _alive_monitoring_handler(rid, eid, data)
  client_received = Time.now.to_f
  server_sent, server_received = data
  client_sent = @_send_timestamp
  new_offset = ((server_received - client_sent) - (client_received - server_sent)) / 2
  new_latency = ((client_received - client_sent) - (server_sent - server_received)) / 2
  @_time_offset = (@_time_offset * @_time_count + new_offset) / (@_time_count + 1)
  @_time_latency = (@_time_latency * @_time_count + new_latency) / (@_time_count + 1)
  @_time_count += 1
end
_divided_response_append_handler(rid, eid, data) click to toggle source
# File lib/ducts/client.rb, line 363
def _divided_response_append_handler(rid, eid, data)
  @_divided_buffers[rid] ||= StringIO.new('', 'wb').set_encoding(Encoding::ASCII_8BIT)
  divided_buffer = @_divided_buffers[rid]
  divided_buffer.write(data)
  DuctEventHandlerReturnValue.new(nil, eid, nil)
end
_divided_response_end_handler(rid, eid, data) click to toggle source
# File lib/ducts/client.rb, line 370
def _divided_response_end_handler(rid, eid, data)
  divided_buffer = @_divided_buffers.delete(rid)
  divided_buffer.write(data)
  source_rid, source_eid, source_data = MessagePack.unpack(divided_buffer.string)
  @catchall_event_handler.call(source_rid, source_eid, source_data)
  handle = @_event_handler[source_eid]
  handle ||= @uncaught_event_handler
  handle.call(source_rid, source_eid, source_data)
  DuctEventHandlerReturnValue.new(source_rid, source_eid, source_data)
end
_loop_response_end_handler(rid, eid, data) click to toggle source
# File lib/ducts/client.rb, line 349
def _loop_response_end_handler(rid, eid, data)
  warn Message.construct(Message::Level::WARNING, 'loop response end has error.') if eid.negative?
  source_eid = data[1]
  source_data = data[2]
  warn Message.construct(Message::Level::WARNING, "loop response end has data: #{source_data}.") if source_data
  @catchall_event_handler.call(rid, source_eid, source_data)
  handle = @_event_handler[source_eid.abs]
  handle ||= @uncaught_event_handler
  handle.call(rid, source_eid, source_data)
  queue = @_loop_queues.delete(rid)
  queue.push(nil)
  DuctEventHandlerReturnValue.new(rid, source_eid, source_eid)
end
_loop_response_handler(rid, eid, data) click to toggle source
# File lib/ducts/client.rb, line 334
def _loop_response_handler(rid, eid, data)
  warn Message.construct(Message::Level::WARNING, 'loop response has error.') if eid.negative?
  source_eid = data[1]
  source_data = data[2]
  @catchall_event_handler.call(rid, source_eid, source_data)
  handle = @_event_handler[source_eid.abs]
  handle ||= @uncaught_event_handler
  ret = handle.call(rid, source_eid, source_data)
  handled_source_data = (ret.instance_of? DuctEventHandlerReturnValue)? ret.source_data : source_data
  @_loop_queues[rid] ||= EM::Queue.new
  queue = @_loop_queues[rid]
  queue.push(handled_source_data) if handled_source_data
  DuctEventHandlerReturnValue.new(rid, source_eid, queue)
end
_onmessage(connection_event) click to toggle source
# File lib/ducts/client.rb, line 292
def _onmessage(connection_event)
  begin
    rid, eid, data = %i(rid eid data).map{ |name| connection_event.source.public_send(name) }
    begin
      @catchall_event_handler.call(rid, eid, data)
      handle = @_event_handler[eid]
      handle ||= @uncaught_event_handler
      handle.call(rid, eid, data).tap do |ret|
        if ret.instance_of? DuctEventHandlerReturnValue
          rid = ret.source_rid 
          eid = ret.source_eid
          data = ret.source_data
        end
      end
      return if rid.nil?
      completion = @_waiting_message_completion.delete(rid)
      if completion
        if eid > 0
          completion.succeed(data)
        else
          completion.fail(DuctError.exception(data))
        end
      end
    rescue => error
      @event_error_handler.call(rid, eid, data, error)
    end
  rescue => error
    @event_error_handler.call(-1, -1, nil, error)
  end
end
_onopen(event) click to toggle source
# File lib/ducts/client.rb, line 272
def _onopen(event)
  @_send_timestamp = Time.now.to_f
  @_time_offset = 0
  @_time_latency = 0
  @_time_count = 0
  set_event_handler(@EVENT['ALIVE_MONITORING'], &method(:_alive_monitoring_handler))
  set_event_handler(@EVENT['LOOP_RESPONSE_START'], &method(:_loop_response_handler))
  set_event_handler(@EVENT['LOOP_RESPONSE_NEXT'], &method(:_loop_response_handler))
  set_event_handler(@EVENT['LOOP_RESPONSE_END'], &method(:_loop_response_end_handler))
  set_event_handler(@EVENT['DIVIDED_RESPONSE_APPEND'], &method(:_divided_response_append_handler))
  set_event_handler(@EVENT['DIVIDED_RESPONSE_END'], &method(:_divided_response_end_handler))
  rid = next_rid
  eid = @EVENT['ALIVE_MONITORING']
  value = @_send_timestamp
  event_send(rid, eid, value)
end
_onreconnect(connection_event) click to toggle source
# File lib/ducts/client.rb, line 289
def _onreconnect(connection_event)
end