class Ducts::Client
Constants
- DuctEventHandlerReturnValue
- VERSION
Attributes
EVENT[R]
WSD[R]
catchall_event_handler[RW]
connection_listener[R]
event_error_handler[RW]
uncaught_event_handler[RW]
Public Class Methods
new()
click to toggle source
# File lib/ducts/client.rb, line 112
# Builds a client that is not yet connected.
#
# The three public handlers start out as lambdas: the catch-all and uncaught
# handlers do nothing, and the error handler writes the error message through
# +warn+. All internal bookkeeping tables start empty.
def initialize
  # Both are filled in by #open from the parsed WSD document.
  @WSD = nil
  @EVENT = nil
  @connection_listener = ConnectionEventListener.new
  @catchall_event_handler = lambda { |rid, eid, data| }
  @uncaught_event_handler = lambda { |rid, eid, data| }
  @event_error_handler = lambda { |rid, eid, data, error|
    warn Message.construct(Message::Level::ERROR, error.message)
  }
  # Created up front so the message handlers can look up event IDs even when
  # set_event_handler has not been called yet (it was previously created
  # lazily there, leaving this nil in the meantime).
  @_event_handler = {}
  @_last_rid = nil
  @_ws = nil
  @_waiting_message_completion = {}
  @_waiting_closed_completion = []
  @_loop_queues = {}
  @_divided_buffers = {}
end
Public Instance Methods
close()
click to toggle source
# File lib/ducts/client.rb, line 260
# Asks the current websocket, if any, to close.
#
# @return [EM::Completion] already succeeded with +nil+ when there is no
#   socket; otherwise it is queued in @_waiting_closed_completion to be
#   settled by the socket's close handler.
def close
  pending = EM::Completion.new
  unless @_ws
    pending.succeed(nil)
    return pending
  end
  @_ws.close
  @_waiting_closed_completion.push(pending)
  pending
end
event_call(eid, data)
click to toggle source
# File lib/ducts/client.rb, line 253
# Sends an event under a freshly generated request ID and registers a
# completion for its reply.
#
# @param eid  event ID to send
# @param data payload to send with the event
# @return [EM::Completion] stored in @_waiting_message_completion under the
#   generated request ID
def event_call(eid, data)
  rid = next_rid
  event_send(rid, eid, data)
  # Hash#key? answers directly instead of materialising every key first.
  if @_waiting_message_completion.key?(rid)
    warn Message.construct(Message::Level::WARNING, 'request ID is duplicated.')
  end
  EM::Completion.new.tap { |completion| @_waiting_message_completion[rid] = completion }
end
event_send(rid, eid, data)
click to toggle source
# File lib/ducts/client.rb, line 247
# Serialises [rid, eid, data] with +to_msgpack+ and sends it on the websocket
# as an array of byte values.
#
# @param rid  request ID placed first in the packed triple
# @param eid  event ID
# @param data payload
# @return the +rid+ that was passed in
def event_send(rid, eid, data)
  packed = [rid, eid, data].to_msgpack
  # String#bytes always yields values in 0..255 regardless of the string's
  # encoding; chars.map(&:ord) only does so for binary-encoded strings.
  @_ws.send(packed.bytes)
  rid
end
next_rid()
click to toggle source
# File lib/ducts/client.rb, line 130
# Produces the next request ID and remembers it in @_last_rid.
#
# The ID is the current Unix time in whole seconds, unless that is not greater
# than the previous ID, in which case it is the previous ID plus one.
#
# @return [Integer] the new request ID
def next_rid
  now = Time.now.to_i
  @_last_rid = @_last_rid.nil? ? now : [now, @_last_rid + 1].max
end
open(wsd_url, uuid = nil, **params)
click to toggle source
# File lib/ducts/client.rb, line 188
# Fetches the WSD document from +wsd_url+ over HTTPS and opens a websocket to
# the 'websocket_url' it advertises.
#
# @param wsd_url [String] URL of the WSD document; a query string is appended
# @param uuid    value for the +uuid+ query parameter ('dummy' when nil)
# @param params  extra query parameters, appended as +&key=value+
# @return [EM::Completion] succeeds with +self+ once the socket is open (or
#   immediately if a socket already exists); fails with the error if setup
#   raises, or with the close event if the socket closes before opening.
#
# Setup errors are also reported to @connection_listener.onerror.
def open(wsd_url, uuid = nil, **params)
  completion = EM::Completion.new
  if @_ws
    completion.succeed(self)
    return completion
  end

  begin
    # NOTE(review): keys and values are not URL-escaped here; callers
    # presumably pass pre-escaped values — confirm before changing.
    query = "?uuid=#{uuid || 'dummy'}" + params.map { |k, v| "&#{k}=#{v}" }.join
    uri = URI.parse(wsd_url + query)
    # NOTE(review): always connects to the default HTTPS port, ignoring any
    # port given in wsd_url — confirm this is intended.
    https = Net::HTTP.new(uri.host, Net::HTTP.https_default_port).tap { |h|
      h.ssl_version = :SSLv23
      h.use_ssl = true
      h.verify_mode = OpenSSL::SSL::VERIFY_PEER
    }
    response = https.get(uri)
    @WSD = JSON.parse(response.body)
    @EVENT = @WSD['EVENT']

    Faye::WebSocket::Client.new(@WSD['websocket_url']).tap do |ws|
      ws.on(:open) do |event|
        @_ws = ws
        completion.succeed(self)
        connection_event = DuctConnectionEvent.new('onopen', event)
        _onopen(connection_event)
        @connection_listener.onopen(connection_event)
      end

      ws.on(:message) do |event|
        message_event = DuctMessageEvent.new(event)
        connection_event = DuctConnectionEvent.new('onmessage', message_event)
        _onmessage(connection_event)
        @connection_listener.onmessage(connection_event)
      end

      ws.on(:error) do |event|
        connection_event = DuctConnectionEvent.new('onerror', event)
        @connection_listener.onerror(connection_event)
      end

      ws.on(:close) do |event|
        @_ws = nil
        # A close before :open would otherwise leave the caller's completion
        # pending forever.
        completion.fail(event) unless completion.completed?
        if event.code == 1000
          @_waiting_closed_completion.each { |waiting| waiting.succeed(event) }
        else
          warn Message.construct(Message::Level::WARNING, "websocket closed with code #{event.code}.")
          @_waiting_closed_completion.each { |waiting| waiting.fail(event) }
        end
        @_waiting_closed_completion.clear
        connection_event = DuctConnectionEvent.new('onclose', event)
        @connection_listener.onclose(connection_event)
      end
    end
  rescue => error
    # Settle the completion so callers waiting on it learn about the failure.
    completion.fail(error)
    @connection_listener.onerror(DuctConnectionEvent.new('onerror', error))
  end
  completion
end
reconnect()
click to toggle source
# File lib/ducts/client.rb, line 156
# Opens a new websocket to the 'websocket_url_reconnect' entry of the WSD
# document loaded earlier (reads @WSD, so it must already be populated).
#
# @return [EM::Completion] succeeds with +self+ once the socket is open (or
#   immediately if a socket already exists); fails with the close event if the
#   socket closes before it ever opened.
def reconnect
  completion = EM::Completion.new
  if @_ws
    completion.succeed(self)
    return completion
  end

  Faye::WebSocket::Client.new(@WSD['websocket_url_reconnect']).tap do |ws|
    ws.on(:open) do |event|
      @_ws = ws
      completion.succeed(self)
      connection_event = DuctConnectionEvent.new('onopen', event)
      _onreconnect(connection_event)
      @connection_listener.onopen(connection_event)
    end

    ws.on(:close) do |event|
      # Same bookkeeping as the close handler installed by #open: drop the
      # dead socket and settle every completion handed out by #close, so that
      # state, close and a later reconnect keep working.
      @_ws = nil
      completion.fail(event) unless completion.completed?
      if event.code == 1000
        @_waiting_closed_completion.each { |waiting| waiting.succeed(event) }
      else
        @_waiting_closed_completion.each { |waiting| waiting.fail(event) }
      end
      @_waiting_closed_completion.clear
      connection_event = DuctConnectionEvent.new('onclose', event)
      @connection_listener.onclose(connection_event)
    end

    ws.on(:message) do |event|
      message_event = DuctMessageEvent.new(event)
      connection_event = DuctConnectionEvent.new('onmessage', message_event)
      _onmessage(connection_event)
      @connection_listener.onmessage(connection_event)
    end

    ws.on(:error) do |event|
      connection_event = DuctConnectionEvent.new('onerror', event)
      @connection_listener.onerror(connection_event)
    end
  end
  completion
end
set_event_handler(event_id, &handler)
click to toggle source
# File lib/ducts/client.rb, line 139
# Registers, or removes, the handler for one event ID.
#
# @param event_id key under which the handler is stored
# @param handler  block to call for that event; when no block is given the
#   existing entry for +event_id+ is deleted instead
# @return the stored handler, or the removed handler (nil if there was none)
def set_event_handler(event_id, &handler)
  registry = (@_event_handler ||= {})
  return registry.delete(event_id) unless handler

  registry.store(event_id, handler)
end
state()
click to toggle source
# File lib/ducts/client.rb, line 148
# Reports the connection state.
#
# @return State::CLOSE when there is no websocket, otherwise whatever
#   State.get_state_from_value returns for the socket's +ready_state+
def state
  return State::CLOSE unless @_ws

  State.get_state_from_value(@_ws.ready_state)
end
Private Instance Methods
_alive_monitoring_handler(rid, eid, data)
click to toggle source
# File lib/ducts/client.rb, line 323
# Folds one alive-monitoring reply into the running averages kept in
# @_time_offset and @_time_latency.
#
# @param rid  unused
# @param eid  unused
# @param data two-element sequence: server send time, server receive time
# @return [Integer] the updated sample count (@_time_count)
def _alive_monitoring_handler(rid, eid, data)
  received_at = Time.now.to_f
  server_sent, server_received = data
  sent_at = @_send_timestamp

  offset_sample = ((server_received - sent_at) - (received_at - server_sent)) / 2
  latency_sample = ((received_at - sent_at) - (server_sent - server_received)) / 2

  # Incremental mean over the samples seen so far.
  samples = @_time_count
  running_mean = ->(mean, sample) { (mean * samples + sample) / (samples + 1) }
  @_time_offset = running_mean.call(@_time_offset, offset_sample)
  @_time_latency = running_mean.call(@_time_latency, latency_sample)
  @_time_count += 1
end
_divided_response_append_handler(rid, eid, data)
click to toggle source
# File lib/ducts/client.rb, line 363
# Appends one chunk of a divided response to the buffer kept for +rid+,
# creating a binary-encoded buffer on the first chunk.
#
# @param rid  request ID used as the buffer key
# @param eid  event ID, echoed back in the return value
# @param data chunk to append
# @return [DuctEventHandlerReturnValue] built from (nil, eid, nil)
def _divided_response_append_handler(rid, eid, data)
  chunk_sink = @_divided_buffers[rid]
  unless chunk_sink
    chunk_sink = StringIO.new('', 'wb').set_encoding(Encoding::ASCII_8BIT)
    @_divided_buffers[rid] = chunk_sink
  end
  chunk_sink.write(data)
  DuctEventHandlerReturnValue.new(nil, eid, nil)
end
_divided_response_end_handler(rid, eid, data)
click to toggle source
# File lib/ducts/client.rb, line 370
# Finishes a divided response: appends the last chunk, unpacks the assembled
# bytes with MessagePack and dispatches the resulting (rid, eid, data) triple
# to the catch-all handler and then to the handler registered for that event
# ID (or the uncaught handler).
#
# @param rid  request ID whose buffer is completed and removed
# @param eid  unused
# @param data final chunk
# @return [DuctEventHandlerReturnValue] carrying the unpacked triple
def _divided_response_end_handler(rid, eid, data)
  # If no APPEND chunk was buffered for this rid, start from an empty buffer
  # instead of calling #write on nil.
  divided_buffer = @_divided_buffers.delete(rid) ||
                   StringIO.new(String.new, 'wb').set_encoding(Encoding::ASCII_8BIT)
  divided_buffer.write(data)
  source_rid, source_eid, source_data = MessagePack.unpack(divided_buffer.string)
  @catchall_event_handler.call(source_rid, source_eid, source_data)
  handle = @_event_handler[source_eid] || @uncaught_event_handler
  handle.call(source_rid, source_eid, source_data)
  DuctEventHandlerReturnValue.new(source_rid, source_eid, source_data)
end
_loop_response_end_handler(rid, eid, data)
click to toggle source
# File lib/ducts/client.rb, line 349
# Handles the final message of a loop response: dispatches the wrapped event
# to the catch-all handler and to the handler registered for its absolute
# event ID (or the uncaught handler), then pushes +nil+ onto the queue for
# +rid+ and forgets that queue.
#
# @param rid  request ID of the loop
# @param eid  event ID of this message; a warning is emitted when negative
# @param data indexable payload; index 1 is the wrapped event ID, index 2 the
#   wrapped data (a warning is emitted when that data is present)
# @return [DuctEventHandlerReturnValue]
def _loop_response_end_handler(rid, eid, data)
  warn Message.construct(Message::Level::WARNING, 'loop response end has error.') if eid.negative?
  source_eid = data[1]
  source_data = data[2]
  warn Message.construct(Message::Level::WARNING, "loop response end has data: #{source_data}.") if source_data
  @catchall_event_handler.call(rid, source_eid, source_data)
  handle = @_event_handler[source_eid.abs] || @uncaught_event_handler
  handle.call(rid, source_eid, source_data)
  # There may be no queue for this rid (no earlier loop message created one);
  # skip the end-of-stream marker rather than calling #push on nil.
  queue = @_loop_queues.delete(rid)
  queue.push(nil) if queue
  # NOTE(review): the third field is source_eid, not source_data or the queue;
  # kept as-is because callers may depend on it — confirm whether intended.
  DuctEventHandlerReturnValue.new(rid, source_eid, source_eid)
end
_loop_response_handler(rid, eid, data)
click to toggle source
# File lib/ducts/client.rb, line 334
# Handles a loop-response message: dispatches the wrapped event to the
# catch-all handler and to the handler registered for its absolute event ID
# (or the uncaught handler), then queues the resulting payload for +rid+.
#
# @param rid  request ID of the loop; keys the queue in @_loop_queues
# @param eid  event ID of this message; a warning is emitted when negative
# @param data indexable payload; index 1 is the wrapped event ID, index 2 the
#   wrapped data
# @return [DuctEventHandlerReturnValue] built from (rid, wrapped eid, queue)
def _loop_response_handler(rid, eid, data)
  warn Message.construct(Message::Level::WARNING, 'loop response has error.') if eid.negative?
  source_eid = data[1]
  source_data = data[2]
  @catchall_event_handler.call(rid, source_eid, source_data)

  handler = @_event_handler[source_eid.abs] || @uncaught_event_handler
  outcome = handler.call(rid, source_eid, source_data)
  # A handler may substitute the payload by returning a DuctEventHandlerReturnValue.
  payload = if outcome.instance_of?(DuctEventHandlerReturnValue)
              outcome.source_data
            else
              source_data
            end

  queue = (@_loop_queues[rid] ||= EM::Queue.new)
  # NOTE(review): any falsy payload (nil and false) is not queued.
  queue.push(payload) if payload
  DuctEventHandlerReturnValue.new(rid, source_eid, queue)
end
_onmessage(connection_event)
click to toggle source
# File lib/ducts/client.rb, line 292
# Dispatches one incoming message.
#
# Reads rid/eid/data from +connection_event.source+, calls the catch-all
# handler, then the handler registered for +eid+ (or the uncaught handler).
# If that handler returns a DuctEventHandlerReturnValue, its fields replace
# rid/eid/data. Finally the completion waiting on rid, if any, is removed and
# succeeded with data when eid > 0, or failed with a DuctError otherwise.
#
# Errors raised during dispatch go to @event_error_handler with the current
# rid/eid/data; errors raised anywhere else in this method (including by that
# error handler) go to it with (-1, -1, nil).
#
# @param connection_event object whose +source+ responds to rid, eid and data
def _onmessage(connection_event)
  rid = connection_event.source.rid
  eid = connection_event.source.eid
  data = connection_event.source.data
  begin
    @catchall_event_handler.call(rid, eid, data)
    handler = @_event_handler[eid] || @uncaught_event_handler
    outcome = handler.call(rid, eid, data)
    if outcome.instance_of?(DuctEventHandlerReturnValue)
      rid = outcome.source_rid
      eid = outcome.source_eid
      data = outcome.source_data
    end
    return if rid.nil?

    waiting = @_waiting_message_completion.delete(rid)
    return unless waiting

    eid > 0 ? waiting.succeed(data) : waiting.fail(DuctError.exception(data))
  rescue => error
    @event_error_handler.call(rid, eid, data, error)
  end
rescue => error
  @event_error_handler.call(-1, -1, nil, error)
end
_onopen(event)
click to toggle source
# File lib/ducts/client.rb, line 272
# Runs when the websocket opens: resets the timing statistics, registers the
# built-in handlers for the protocol events named in @EVENT, and sends an
# ALIVE_MONITORING event carrying the current timestamp.
#
# @param event the open connection event (unused)
# @return whatever event_send returns
def _onopen(event)
  @_send_timestamp = Time.now.to_f
  @_time_offset = 0
  @_time_latency = 0
  @_time_count = 0

  # @EVENT key => private method that handles it, registered in this order.
  {
    'ALIVE_MONITORING' => :_alive_monitoring_handler,
    'LOOP_RESPONSE_START' => :_loop_response_handler,
    'LOOP_RESPONSE_NEXT' => :_loop_response_handler,
    'LOOP_RESPONSE_END' => :_loop_response_end_handler,
    'DIVIDED_RESPONSE_APPEND' => :_divided_response_append_handler,
    'DIVIDED_RESPONSE_END' => :_divided_response_end_handler
  }.each do |event_name, handler_name|
    set_event_handler(@EVENT[event_name], &method(handler_name))
  end

  event_send(next_rid, @EVENT['ALIVE_MONITORING'], @_send_timestamp)
end
_onreconnect(connection_event)
click to toggle source
# File lib/ducts/client.rb, line 289
# Hook called by #reconnect when the new websocket opens. Currently a no-op.
#
# @param connection_event the open connection event (unused)
# @return [nil]
def _onreconnect(connection_event); end