module Roma::Command::VnodeCommandReceiver

Public Instance Methods

ev_reqpushv(s) click to toggle source

reqpushv <vnode-id> <node-id> <is primary?> src dst

|<-['reqpushv <vn> <nid> <p?>\r\n']         |
|                           ['PUSHED'\r\n]->|
    # File lib/roma/command/vn_command_receiver.rb
109 def ev_reqpushv(s)
110   if s.length!=4
111     send_data("CLIENT_ERROR usage:reqpushv vnode-id node-id primary-flag(true/false)\r\n")
112     return
113   end
114   if(@stats.run_iterate_storage || @stats.run_join || @stats.run_balance)
115     @log.warn("reqpushv rejected:#{s}")
116     send_data("REJECTED\r\n")
117     return
118   end
119   Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('reqpushv',[s[1],s[2],s[3]]))
120   send_data("PUSHED\r\n")
121 rescue =>e
122   @log.error("#{e}\n#{$@}")
123 end
ev_spushv(s) click to toggle source

spushv <hash-name> <vnode-id> src dst

|  ['spushv' <hname> <vn>\r\n]->|
|<-['READY'\r\n]                |
|                 [<dumpdata>]->|
|                       :       |
|                       :       |
|              [<end of dump>]->|
|<-['STORED'\r\n]               |
    # File lib/roma/command/vn_command_receiver.rb
 30       def ev_spushv(s)
 31         if s.length != 3
 32           @log.error("#{__method__}:wrong number of arguments(#{s})")
 33           return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
 34         end
 35         if @stats.spushv_protection
 36           @log.info("#{__method__}:In spushv_protection")
 37           return send_data("SERVER_ERROR In spushv_protection.\r\n")          
 38         end
 39         @stats.run_receive_a_vnode["#{s[1]}_#{s[2]}"] = true
 40 
 41         $roma.stop_clean_up
 42 
 43         send_data("READY\r\n")
 44 
 45         count = rcount = 0
 46         @log.debug("#{__method__}:#{s.inspect} received.")
 47         loop {
 48           context_bin = read_bytes(20, @stats.spushv_read_timeout)
 49           vn, last, clk, expt, klen = context_bin.unpack('NNNNN')
 50           break if klen == 0 # end of dump ?
 51           k = read_bytes(klen)
 52           vlen_bin = read_bytes(4, @stats.spushv_read_timeout)
 53           vlen, =  vlen_bin.unpack('N')
 54           if vlen != 0
 55             if klen > @stats.spushv_klength_warn
 56               @log.warn("#{__method__}:Too long key: key = #{k}")
 57             end
 58             if vlen > @stats.spushv_vlength_warn
 59               @log.warn("#{__method__}:Too long value: key = #{k} vlen = #{vlen}")
 60             end
 61             v = read_bytes(vlen, @stats.spushv_read_timeout)
 62 
 63             createhash(s[1]) unless @storages[s[1]]
 64             if @storages[s[1]].load_stream_dump(vn, last, clk, expt, k, v)
 65               count += 1
 66 #              @log.debug("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was stored.")
 67             else
 68               rcount += 1
 69 #              @log.warn("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was rejected.")
 70             end
 71           else
 72             createhash(s[1]) unless @storages[s[1]]
 73             if @storages[s[1]].load_stream_dump(vn, last, clk, expt, k, nil)
 74 #              @log.debug("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was stored.")
 75               count += 1
 76             else
 77               rcount += 1
 78 #              @log.warn("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was rejected.")
 79             end
 80           end
 81         }
 82         if @stats.spushv_protection
 83           @log.info("#{__method__}:Canceled because of spushv_protection")
 84           send_data("CANCELED\r\n")
 85         else
 86           send_data("STORED\r\n")
 87         end
 88         @log.debug("#{__method__}:#{s[1]}_#{s[2]} #{count} keys loaded.")
 89         @log.debug("#{__method__}:#{s[1]}_#{s[2]} #{rcount} keys rejected.") if rcount > 0
 90       rescue Storage::StorageException => e
 91         @log.error("#{e.inspect} #{$@}")
 92         close_connection
 93         if Config.const_defined?(:STORAGE_EXCEPTION_ACTION) &&
 94             Config::STORAGE_EXCEPTION_ACTION == :shutdown
 95           @log.error("#{__method__}:Romad will be stop.")
 96           @stop_event_loop = true
 97         end
 98       rescue => e
 99         @log.error("#{e} #{$@}")
100       ensure
101         @stats.run_receive_a_vnode.delete("#{s[1]}_#{s[2]}") if s.length == 3
102         @stats.last_clean_up = Time.now
103       end
ev_spushv_protection(s) click to toggle source

spushv_protection <true/false>

   # File lib/roma/command/vn_command_receiver.rb
10 def ev_spushv_protection(s)
11   if s.length == 1
12     send_data("#{@stats.spushv_protection}\r\n")
13   elsif s.length == 2
14     @stats.spushv_protection = (s[1] == 'true')
15     send_data("#{@stats.spushv_protection}\r\n")
16   else
17     send_data("COMMAND ERROR\r\n")
18   end
19 end
req_push_a_vnode(vn, src_nid, is_primary) click to toggle source
    # File lib/roma/command/vn_command_receiver.rb
125 def req_push_a_vnode(vn, src_nid, is_primary)
126   con = Roma::Messaging::ConPool.instance.get_connection(src_nid)
127   con.write("reqpushv #{vn} #{@nid} #{is_primary}\r\n")
128   res = con.gets # receive 'PUSHED\r\n' | 'REJECTED\r\n'
129   Roma::Messaging::ConPool.instance.return_connection(src_nid,con)
130   # waiting for pushv
131   count = 0
132   while @rttable.search_nodes(vn).include?(@nid)==false && count < 300
133     sleep 0.1
134     count += 1
135   end
136 rescue =>e
137   @log.error("#{e}\n#{$@}")
138   @rttable.proc_failed(src_nid)
139   false
140 end