class Roma::RecoverLost

Public Class Methods

new(pname, pushv_cmd, argv, alldata = false) click to toggle source
   # File lib/roma/tools/recoverlost_lib.rb
# Checks the command-line arguments and stores the recovery settings.
#
# pname::     program name shown in the usage message
# pushv_cmd:: command name written before each pushed vnode stream
# argv::      address, port, storage-path and (optionally) yyyymmddhhmmss
# alldata::   when truthy, exactly three arguments are required;
#             when false, at least four are required
#
# Prints a message and calls +exit+ when the arguments are rejected.
def initialize(pname, pushv_cmd, argv, alldata = false)
  argc = argv.length
  if alldata == false
    if argc < 4
      puts "usage:#{pname} address port storage-path [yyyymmddhhmmss]"
      exit
    end
  elsif alldata && argc != 3
    puts "usage:#{pname} address port storage-path"
    exit
  end

  @addr, @port, @strgpath, @ymdhms = argv.values_at(0, 1, 2, 3)

  # any non-digit character in the port is rejected
  if @port =~ /\D/
    STDERR.puts "port was not numeric."
    exit
  end

  # a timestamp, when given, must be exactly 14 digits
  if @ymdhms
    well_formed = @ymdhms.length == 14 && !(@ymdhms =~ /\D/)
    unless well_formed
      STDERR.puts "yyyymmddhhmmss format mismatch."
      exit
    end
  end

  @pushv_cmd = pushv_cmd
  @nodeid = "#{@addr}_#{@port}"
  @stream_copy_wait_param = 0.0001
  @alldata = alldata
end

Public Instance Methods

broadcast_cmd(cmd,without_nids=nil) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Sends +cmd+ to every node listed in the routing data, except excluded ones.
#
# cmd::          command string handed to send_cmd for each node
# without_nids:: a node id, or a list of node ids, to skip (nil skips none)
#
# Returns a Hash of node id => send_cmd result. When an error is raised it
# is printed to STDERR with the backtrace and nil is returned.
def broadcast_cmd(cmd,without_nids=nil)
  # Array() maps nil to [] and wraps a lone node-id String in a list, so the
  # test below is exact list membership. Calling include? on a bare String
  # would match substrings and also skip "host_1" when given "host_11".
  skipped = Array(without_nids)
  res = {}
  @rd.nodes.each{ |nid|
    res[nid] = send_cmd(nid,cmd) unless skipped.include?(nid)
  }
  res
rescue => e
  STDERR.puts("#{e}\n#{$@}")
  nil
end
clk_to_zero(data) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Rebuilds one dumped record with its clock field set to zero.
#
# The record is read as five 32-bit big-endian integers (vn, last, clk,
# expt, key length), then the key, a 32-bit value length and the value.
# Returns the re-packed binary string; bytes after the value are dropped.
def clk_to_zero(data)
  vnode, last, _clk, expire, key_len = data.unpack('NNNNN')
  key, val_len = data[20..-1].unpack("a#{key_len}N")
  fields = [vnode, last, 0, expire, key_len, key]
  layout = "NNNNNa#{key_len}N"
  return (fields << 0).pack(layout) if val_len == 0

  # the value starts after the 20-byte header, the key and the 4-byte length
  value = data[(24 + key_len)..-1].unpack("a#{val_len}").first
  (fields << val_len << value).pack("#{layout}a#{val_len}")
end
each_hash(path) { |hname,dir| ... } click to toggle source
   # File lib/roma/tools/recoverlost_lib.rb
# Yields the name and the path of every directory found directly under
# +path+; entries that are not directories are skipped.
def each_hash(path)
  Dir.glob("#{path}/*").each do |entry|
    next unless File.directory?(entry)

    # the name is whatever follows the last '/' of the globbed path
    yield entry.rpartition('/').last, entry
  end
end
get_history_of_lost(nid,ymdhms) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Sends "history_of_lost <ymdhms>" to node +nid+ and collects the reply.
#
# Every line received before the "END\r\n" terminator is converted with
# to_i and appended to the result. The connection is handed back to the
# pool afterwards. Returns the Array of integers.
def get_history_of_lost(nid,ymdhms)
  pool = Roma::Messaging::ConPool.instance
  con = pool.get_connection(nid)
  con.write("history_of_lost #{ymdhms}\r\n")

  vnodes = []
  until (line = con.gets) == "END\r\n"
    vnodes << line.chomp.to_i
  end

  pool.return_connection(nid, con)
  vnodes
end
get_lost_vnodes(rd,ymdhms) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
 # Returns the vnodes that +rd+ reports as lost. When +ymdhms+ is given,
 # the result is unioned with the list get_history_of_lost returns for
 # this tool's own node id (@nodeid).
 def get_lost_vnodes(rd,ymdhms)
   lost = rd.get_lost_vnodes
   return lost unless ymdhms

   lost | get_history_of_lost(@nodeid, ymdhms)
 end
get_routing_data(nid) click to toggle source
   # File lib/roma/tools/recoverlost_lib.rb
# Creates a new Roma::Client::Sender and returns whatever its
# send_routedump_command gives back for node +nid+.
def get_routing_data(nid)
  Roma::Client::Sender.new.send_routedump_command(nid)
end
make_node_hash(keys) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Groups +keys+ by the nodes the routing data assigns them to.
#
# Each key's SHA1 digest is reduced modulo 2**dgst_bits and masked with
# search_mask to obtain the v_idx entry whose nodes receive the key.
# Returns a Hash of node id => Array of keys, with an entry (possibly
# empty) for every node in @rd.nodes.
def make_node_hash(keys)
  by_node = @rd.nodes.each_with_object({}) { |nid, acc| acc[nid] = [] }
  keys.each do |key|
    digest = Digest::SHA1.hexdigest(key).hex % (2**@rd.dgst_bits)
    owners = @rd.v_idx[digest & @rd.search_mask]
    owners.each { |nid| by_node[nid] << key }
  end
  by_node
end
new_storage(ext) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Builds a storage object for the file extension +ext+:
# 'tc' => TCStorage, 'dbm' => DbmStorage, 'sql3' => SQLite3Storage.
# Returns nil for any other extension.
def new_storage(ext)
  case ext
  when 'tc' then ::Roma::Storage::TCStorage.new
  when 'dbm' then Roma::Storage::DbmStorage.new
  when 'sql3' then Roma::Storage::SQLite3Storage.new
  end
end
open_storage(path,vn_list) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Opens the storage kept in directory +path+ for the vnodes in +vn_list+.
#
# The storage type is taken from the extension of the "0.*" file in the
# directory, and the number of files sharing that extension becomes the
# division count. Returns the opened storage object, or nil (after a
# message on STDERR) when the directory is missing, holds no "0.*" file,
# or uses an extension new_storage does not recognise.
def open_storage(path,vn_list)
  unless File::directory?(path)
    STDERR.puts "#{path} does not found."
    return nil
  end

  # get a file extension; without a "0.*" file there is nothing to open
  first_file = Dir::glob("#{path}/0.*")[0]
  unless first_file
    STDERR.puts "#{path} has no storage file (0.*)."
    return nil
  end
  ext = File::extname(first_file)[1..-1]
  # count a number of divided files
  divnum = Dir::glob("#{path}/*.#{ext}").length

  st = new_storage(ext)
  unless st
    STDERR.puts "#{path}: unsupported storage type '#{ext}'."
    return nil
  end

  st.divnum = divnum
  st.vn_list = vn_list
  st.storage_path = path
  st.opendb
  st
end
push_a_vnode_stream(hname, vn, nid) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Streams every stored record of vnode +vn+ to node +nid+.
#
# Writes "<pushv_cmd> <hname> <vn>" and expects "READY\r\n"; any other
# reply closes the connection and is returned without its line ending.
# Each record from the storage dump is sent with its clock zeroed, followed
# by 20 NUL bytes as the end-of-stream marker. Returns the final reply
# without its line ending, or nil when an error is raised (the error is
# printed to STDERR).
def push_a_vnode_stream(hname, vn, nid)
  pool = Roma::Messaging::ConPool.instance
  con = pool.get_connection(nid)

  con.write("#{@pushv_cmd} #{hname} #{vn}\r\n")

  ready = con.gets # READY\r\n or error string
  unless ready == "READY\r\n"
    con.close
    return ready.chomp
  end

  @storage.each_vn_dump(vn) do |record|
    con.write(clk_to_zero(record))
    sleep @stream_copy_wait_param
  end
  con.write("\0" * 20) # end of stream

  reply = con.gets # STORED\r\n or error string
  pool.return_connection(nid, con)
  reply && reply.chomp
rescue => err
  STDERR.puts "#{err}\n#{$@}"
  nil
end
send_cmd(nid, cmd) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Writes +cmd+ to node +nid+ and returns the single reply line without its
# line ending (nil when nothing was read). The connection goes back to the
# pool afterwards. On any StandardError a message naming the node and the
# command is printed to STDERR and nil is returned.
def send_cmd(nid, cmd)
  pool = Roma::Messaging::ConPool.instance
  con = pool.get_connection(nid)
  con.write(cmd)
  reply = con.gets
  pool.return_connection(nid, con)
  reply.chomp! if reply
  reply
rescue StandardError
  STDERR.puts("#{__FILE__}:#{__LINE__}:Send command failed that node-id is #{nid},command is #{cmd}.")
  nil
end
start_recover(hname) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Pushes the data of every lost vnode for hash +hname+ to its node(s).
#
# A vnode that has nodes in the routing table is pushed to each of them.
# A vnode without any node is pushed to one randomly chosen node, and a
# "setroute" command is then sent to that node and broadcast to all others.
# The process exits when a push does not answer "STORED" or when the
# setroute command to the chosen node fails.
def start_recover(hname)
  @lost_vnodes.each_with_index{|vn, idx|
    nodes = @rd.v_idx[vn]
    unassigned = nodes.nil? || nodes.length == 0
    if unassigned
      nids = [@rd.nodes[rand(@rd.nodes.length)]]
      puts "#{idx}/#{@lost_vnodes.length} #{vn} assign to #{nids.inspect}"
    else
      nids = nodes
      puts "#{idx}/#{@lost_vnodes.length} #{vn} was auto assigned at #{nids.inspect}"
    end

    nids.each{|nid|
      if push_a_vnode_stream(hname, vn, nid)!="STORED"
        STDERR.puts "push_a_vnode_stream aborted in #{vn}"
        exit
      end
    }

    if unassigned
      cmd = "setroute #{vn} #{@rd.v_clk[vn]} #{nids[0]}\r\n"
      exit unless send_cmd(nids[0] ,cmd)
      # Hand over the already-updated node as a one-element list: given a
      # bare String, the include? check in broadcast_cmd turns into substring
      # matching and also skips nodes whose id is contained in this one.
      broadcast_cmd(cmd, [nids[0]])
    end
  }
end
start_recover_width_keys(hname,keys) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Uploads the stored data of each key in +keys+ to the nodes that own it.
#
# Keys for which the storage returns nothing are skipped. For every hit the
# vnode is derived from the key's SHA1 digest (modulo 2**dgst_bits, masked
# with search_mask) and the data is uploaded to each node listed for that
# vnode; the node id and the upload result are printed.
def start_recover_width_keys(hname,keys)
  keys.each do |key|
    data = @storage.get_raw2(key)
    next unless data

    puts "hit => #{key}"
    digest = Digest::SHA1.hexdigest(key).hex % (2**@rd.dgst_bits)
    vn = digest & @rd.search_mask
    @rd.v_idx[vn].each do |nid|
      print "#{nid}=>"
      puts upload_data(hname, vn, nid, key, data)
    end
  end
end
start_recover_width_keys2(hname,keys) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Groups +keys+ per node with make_node_hash, then prints each node id and
# uploads that node's keys in a single upload_data2 call.
def start_recover_width_keys2(hname,keys)
  make_node_hash(keys).each do |nid, node_keys|
    puts nid
    upload_data2(hname, nid, node_keys)
  end
end
suite() click to toggle source
   # File lib/roma/tools/recoverlost_lib.rb
# Runs the recovery for every hash directory under the storage path.
#
# Fetches the routing data first. In all-data mode every vnode key of
# v_idx is treated as lost; otherwise the lost vnodes are looked up, their
# count is printed and the process exits when there are none. For each
# hash directory the storage is opened, recovered and closed again.
def suite
  @rd = get_routing_data(@nodeid)
  if @alldata
    @lost_vnodes = @rd.v_idx.keys
  else
    @lost_vnodes = get_lost_vnodes(@rd, @ymdhms)
    puts "#{@lost_vnodes.length} vnodes where data was lost."

    exit if @lost_vnodes.length == 0
  end

  each_hash(@strgpath) do |hname, dir|
    puts "#{hname} #{dir}"
    @storage = open_storage(dir, @lost_vnodes)
    start_recover(hname)
    @storage.closedb
  end
end
suite_with_keys(keys) click to toggle source
   # File lib/roma/tools/recoverlost_lib.rb
# Recovers only the given +keys+ from every hash directory under the
# storage path. All vnode keys of v_idx are handed to the storage as the
# vnode list; each directory's storage is opened, searched and closed.
def suite_with_keys(keys)
  @rd = get_routing_data(@nodeid)
  @lost_vnodes = @rd.v_idx.keys

  each_hash(@strgpath) do |hname, dir|
    puts "#{hname} #{dir}"
    @storage = open_storage(dir, @lost_vnodes)
    # start_recover_width_keys2(hname, keys) is the disabled alternative
    start_recover_width_keys(hname, keys)
    @storage.closedb
  end
end
upload_data(hname, vn, nid, k, data) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Uploads a single key/value record to node +nid+ as a one-record stream.
#
# hname:: hash name placed in the push command
# vn::    vnode id written into the command and the record
# nid::   destination node id
# k::     key string
# data::  five-element array read as (vn_old, last, clk, expt, value); the
#         stored vnode id and clock are ignored, the clock is sent as 0
#
# Writes "<pushv_cmd> <hname> <vn>" and expects "READY\r\n"; any other
# reply closes the connection and is returned without its line ending.
# Returns the final reply without its line ending, or nil when an error is
# raised (the error is printed to STDERR).
def upload_data(hname, vn, nid, k, data)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)

  cmd = "#{@pushv_cmd} #{hname} #{vn}\r\n"
  con.write(cmd)
  res = con.gets # READY\r\n or error string
  if res != "READY\r\n"
    con.close
    return res.chomp
  end

  _vn_old, last, _clk, expt, val = data
  # The length fields and the "a<n>" pack widths count bytes, so bytesize is
  # required: String#length counts characters and would truncate a
  # multibyte key or value while writing a wrong length prefix.
  klen = k.bytesize
  if val
    vlen = val.bytesize
    wd = [vn, last, 0, expt, klen, k, vlen, val].pack("NNNNNa#{klen}Na#{vlen}")
  else
    wd = [vn, last, 0, expt, klen, k, 0].pack("NNNNNa#{klen}N")
  end

  con.write(wd)
  sleep @stream_copy_wait_param

  con.write("\0"*20) # end of stream

  res = con.gets # STORED\r\n or error string
  Roma::Messaging::ConPool.instance.return_connection(nid,con)
  res.chomp! if res
  res
rescue =>e
  STDERR.puts "#{e}\n#{$@}"
  nil
end
upload_data2(hname, nid, keys) click to toggle source
    # File lib/roma/tools/recoverlost_lib.rb
# Uploads the stored records of all +keys+ to node +nid+ in one stream.
#
# hname:: hash name placed in the push command
# nid::   destination node id
# keys::  keys to look up in the storage; keys without data are skipped
#
# The push command is always sent with vnode 0, while each record carries
# the vnode derived from its key's SHA1 digest (modulo 2**dgst_bits, masked
# with search_mask) and a clock of 0. A progress counter is printed about
# every 1% of the keys. Returns the final reply without its line ending;
# a non-READY first reply closes the connection and is returned instead.
# Returns nil when an error is raised (the error is printed to STDERR).
def upload_data2(hname, nid, keys)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)

  cmd = "#{@pushv_cmd} #{hname} 0\r\n"
  con.write(cmd)
  res = con.gets # READY\r\n or error string
  if res != "READY\r\n"
    con.close
    return res.chomp
  end

  n = keys.length
  m = n / 100
  m = 1 if m < 1
  keys.each_with_index{|k,i|
    print "#{i}/#{n}\r" if i%m == 0
    data = @storage.get_raw2(k)
    next unless data
    d = Digest::SHA1.hexdigest(k).hex % (2**@rd.dgst_bits)
    vn = d & @rd.search_mask

    _vn_old, last, _clk, expt, val = data
    # The length fields and the "a<n>" pack widths count bytes, so bytesize
    # is required: String#length counts characters and would truncate a
    # multibyte key or value while writing a wrong length prefix.
    klen = k.bytesize
    if val
      vlen = val.bytesize
      wd = [vn, last, 0, expt, klen, k, vlen, val].pack("NNNNNa#{klen}Na#{vlen}")
    else
      wd = [vn, last, 0, expt, klen, k, 0].pack("NNNNNa#{klen}N")
    end

    con.write(wd)
    sleep @stream_copy_wait_param
  }
  con.write("\0"*20) # end of stream

  res = con.gets # STORED\r\n or error string
  Roma::Messaging::ConPool.instance.return_connection(nid,con)
  res.chomp! if res
  res
rescue =>e
  STDERR.puts "#{e}\n#{$@}"
  nil
end