class Deluge::Rpc::Connection
Constants
- DAEMON_LOGIN
- DAEMON_METHOD_LIST
- DAEMON_REGISTER_EVENT
- DEFAULT_CALL_TIMEOUT
- DEFAULT_PORT
- PROTOCOL_VERSION
- RPC_ERROR
- RPC_EVENT
- RPC_RESPONSE
Attributes
host[R]
port[R]
Public Class Methods
new(options = {})
click to toggle source
# File lib/deluge/rpc/connection.rb, line 36 def initialize(options = {}) @host = options.delete(:host) || 'localhost' @port = (options.delete(:port) || DEFAULT_PORT).to_i @call_timeout = options.delete(:call_timeout) || DEFAULT_CALL_TIMEOUT @request_id = Concurrent::AtomicFixnum.new @running = Concurrent::AtomicBoolean.new @messages = {} @events = {} @write_mutex = Mutex.new end
Public Instance Methods
authenticate(login, password)
click to toggle source
# File lib/deluge/rpc/connection.rb, line 69 def authenticate(login, password) self.call(DAEMON_LOGIN, [login, password, {'client_version' => Deluge::Rpc::VERSION.to_s}]) end
call(method, *args)
click to toggle source
# File lib/deluge/rpc/connection.rb, line 92 def call(method, *args) raise "Not connected!" unless @connection kwargs = {} if args.size == 1 && args[0].last.is_a?(Hash) kwargs = args[0].pop args = args[0] end future = Concurrent::IVar.new request_id = @request_id.increment @messages[request_id] = future message = [[request_id, method, args, kwargs]] write_packet(message) result = future.value!(@call_timeout) if result.nil? && future.pending? raise InvokeTimeoutError.new("Failed to retrieve response for '#{method}' in #{@call_timeout} seconds. Probably method not exists.") end result end
close()
click to toggle source
# File lib/deluge/rpc/connection.rb, line 88 def close @running.make_false end
method_list()
click to toggle source
# File lib/deluge/rpc/connection.rb, line 73 def method_list self.call(DAEMON_METHOD_LIST) end
register_event(event_name, force = false, &block)
click to toggle source
# File lib/deluge/rpc/connection.rb, line 77 def register_event(event_name, force = false, &block) unless @events[event_name] # Register event only ONCE! self.call(DAEMON_REGISTER_EVENT, [event_name]) if @connection # Let events be initialized lazily end @events[event_name] ||= [] @events[event_name] << block true end
start()
click to toggle source
# File lib/deluge/rpc/connection.rb, line 51 def start raise 'Connection already opened' if @connection @connection = OpenSSL::SSL::SSLSocket.new(create_socket, ssl_context) @connection.connect @running.make_true @main_thread = Thread.current @thread = Thread.new(&self.method(:read_loop)) # register present events recover_events! if @events.size > 0 true end
Private Instance Methods
create_socket()
click to toggle source
# File lib/deluge/rpc/connection.rb, line 215 def create_socket socket = TCPSocket.new(host, port) if ::Socket.constants.include?('TCP_NODELAY') || ::Socket.constants.include?(:TCP_NODELAY) socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, true) end socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true) socket end
dispatch_packet(packet)
click to toggle source
# File lib/deluge/rpc/connection.rb, line 150 def dispatch_packet(packet) type, response_id, value = packet case type when RPC_RESPONSE, RPC_ERROR future = @messages.delete(response_id) return unless future # TODO: Handle unknown messages if type == RPC_RESPONSE future.set(value) else future.fail(RPCError.new(value)) end when RPC_EVENT handlers = @events[response_id] return unless handlers # TODO: Handle unknown events handlers.each do |block| block.call(*value) end else raise "Unknown packet type #{type.inspect}" end end
parse_packets(raw)
click to toggle source
# File lib/deluge/rpc/connection.rb, line 230 def parse_packets(raw) io = StringIO.new(raw) packets = [] until(io.eof?) packets << Rencoder.load(io) end packets end
read_loop()
click to toggle source
# File lib/deluge/rpc/connection.rb, line 121 def read_loop while(@running.true?) io_poll = IO.select([@connection], nil, [@connection], 0.1) next unless io_poll read_sockets, _, error_sockets = io_poll if @connection.eof? # TODO: implement auto-recovery raise ConnectionClosedError end read_sockets.each do |socket| packets = read_packets(socket) packets.each do |packet| dispatch_packet(packet) end end end rescue => e @main_thread.raise(e) ensure @connection.close if @connection @connection = nil @messages.clear end
read_packets(socket)
click to toggle source
# File lib/deluge/rpc/connection.rb, line 187 def read_packets(socket) raw = "" # Read message header protocol_version, buffer_size = socket.readpartial(5).unpack('CN') raise('Received response with unknown protocol_version=' + protocol_version) if protocol_version != PROTOCOL_VERSION # a big response requires some extra reads because deluged may be # slow to generate and send all the data through the socket raw += socket.readpartial(buffer_size - raw.bytesize) while raw.bytesize < buffer_size raw = Zlib::Inflate.inflate(raw) parse_packets(raw) end
recover_events!()
click to toggle source
# File lib/deluge/rpc/connection.rb, line 204 def recover_events! present_events = @events @events = {} present_events.each do |event, handlers| handlers.each do |handler| self.register_event(event, &handler) end end end
ssl_context()
click to toggle source
# File lib/deluge/rpc/connection.rb, line 226 def ssl_context OpenSSL::SSL::SSLContext.new end
write_packet(packet)
click to toggle source
# File lib/deluge/rpc/connection.rb, line 176 def write_packet(packet) raw = Zlib::Deflate.deflate Rencoder.dump(packet) raw = [PROTOCOL_VERSION, raw.bytesize].pack("CN") + raw @write_mutex.synchronize do if IO.select([], [@connection], nil, nil) @connection.write(raw) end end end