module Roma::Command::VnodeCommandReceiver
Public Instance Methods
ev_reqpushv(s)
click to toggle source
reqpushv <vnode-id> <node-id> <is primary?> src dst
|<-['reqpushv <vn> <nid> <p?>\r\n'] | | ['PUSHED'\r\n]->|
# File lib/roma/command/vn_command_receiver.rb 109 def ev_reqpushv(s) 110 if s.length!=4 111 send_data("CLIENT_ERROR usage:reqpushv vnode-id node-id primary-flag(true/false)\r\n") 112 return 113 end 114 if(@stats.run_iterate_storage || @stats.run_join || @stats.run_balance) 115 @log.warn("reqpushv rejected:#{s}") 116 send_data("REJECTED\r\n") 117 return 118 end 119 Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('reqpushv',[s[1],s[2],s[3]])) 120 send_data("PUSHED\r\n") 121 rescue =>e 122 @log.error("#{e}\n#{$@}") 123 end
ev_spushv(s)
click to toggle source
spushv <hash-name> <vnode-id> src dst
| ['spushv' <hname> <vn>\r\n]->| |<-['READY'\r\n] | | [<dumpdata>]->| | : | | : | | [<end of dump>]->| |<-['STORED'\r\n] |
# File lib/roma/command/vn_command_receiver.rb 30 def ev_spushv(s) 31 if s.length != 3 32 @log.error("#{__method__}:wrong number of arguments(#{s})") 33 return send_data("CLIENT_ERROR Wrong number of arguments.\r\n") 34 end 35 if @stats.spushv_protection 36 @log.info("#{__method__}:In spushv_protection") 37 return send_data("SERVER_ERROR In spushv_protection.\r\n") 38 end 39 @stats.run_receive_a_vnode["#{s[1]}_#{s[2]}"] = true 40 41 $roma.stop_clean_up 42 43 send_data("READY\r\n") 44 45 count = rcount = 0 46 @log.debug("#{__method__}:#{s.inspect} received.") 47 loop { 48 context_bin = read_bytes(20, @stats.spushv_read_timeout) 49 vn, last, clk, expt, klen = context_bin.unpack('NNNNN') 50 break if klen == 0 # end of dump ? 51 k = read_bytes(klen) 52 vlen_bin = read_bytes(4, @stats.spushv_read_timeout) 53 vlen, = vlen_bin.unpack('N') 54 if vlen != 0 55 if klen > @stats.spushv_klength_warn 56 @log.warn("#{__method__}:Too long key: key = #{k}") 57 end 58 if vlen > @stats.spushv_vlength_warn 59 @log.warn("#{__method__}:Too long value: key = #{k} vlen = #{vlen}") 60 end 61 v = read_bytes(vlen, @stats.spushv_read_timeout) 62 63 createhash(s[1]) unless @storages[s[1]] 64 if @storages[s[1]].load_stream_dump(vn, last, clk, expt, k, v) 65 count += 1 66 # @log.debug("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was stored.") 67 else 68 rcount += 1 69 # @log.warn("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was rejected.") 70 end 71 else 72 createhash(s[1]) unless @storages[s[1]] 73 if @storages[s[1]].load_stream_dump(vn, last, clk, expt, k, nil) 74 # @log.debug("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was stored.") 75 count += 1 76 else 77 rcount += 1 78 # @log.warn("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was rejected.") 79 end 80 end 81 } 82 if @stats.spushv_protection 83 @log.info("#{__method__}:Canceled because of spushv_protection") 84 send_data("CANCELED\r\n") 85 else 86 send_data("STORED\r\n") 87 end 88 @log.debug("#{__method__}:#{s[1]}_#{s[2]} #{count} keys loaded.") 89 @log.debug("#{__method__}:#{s[1]}_#{s[2]} #{rcount} keys rejected.") if rcount > 0 90 rescue Storage::StorageException => e 91 @log.error("#{e.inspect} #{$@}") 92 close_connection 93 if Config.const_defined?(:STORAGE_EXCEPTION_ACTION) && 94 Config::STORAGE_EXCEPTION_ACTION == :shutdown 95 @log.error("#{__method__}:Romad will be stop.") 96 @stop_event_loop = true 97 end 98 rescue => e 99 @log.error("#{e} #{$@}") 100 ensure 101 @stats.run_receive_a_vnode.delete("#{s[1]}_#{s[2]}") if s.length == 3 102 @stats.last_clean_up = Time.now 103 end
ev_spushv_protection(s)
click to toggle source
spushv_protection <true/false>
# File lib/roma/command/vn_command_receiver.rb 10 def ev_spushv_protection(s) 11 if s.length == 1 12 send_data("#{@stats.spushv_protection}\r\n") 13 elsif s.length == 2 14 @stats.spushv_protection = (s[1] == 'true') 15 send_data("#{@stats.spushv_protection}\r\n") 16 else 17 send_data("COMMAND ERROR\r\n") 18 end 19 end
req_push_a_vnode(vn, src_nid, is_primary)
click to toggle source
# File lib/roma/command/vn_command_receiver.rb 125 def req_push_a_vnode(vn, src_nid, is_primary) 126 con = Roma::Messaging::ConPool.instance.get_connection(src_nid) 127 con.write("reqpushv #{vn} #{@nid} #{is_primary}\r\n") 128 res = con.gets # receive 'PUSHED\r\n' | 'REJECTED\r\n' 129 Roma::Messaging::ConPool.instance.return_connection(src_nid,con) 130 # waiting for pushv 131 count = 0 132 while @rttable.search_nodes(vn).include?(@nid)==false && count < 300 133 sleep 0.1 134 count += 1 135 end 136 rescue =>e 137 @log.error("#{e}\n#{$@}") 138 @rttable.proc_failed(src_nid) 139 false 140 end