class Roma::RecoverLost
Public Class Methods
new(pname, pushv_cmd, argv, alldata = false)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 18
#
# Parses and validates the command-line arguments of a recover-lost tool.
#
# pname     - program name shown in the usage message.
# pushv_cmd - protocol command used to push vnode data to a node.
# argv      - [address, port, storage-path] plus an optional yyyymmddhhmmss
#             timestamp; the timestamp is not accepted when +alldata+ is true.
# alldata   - when true, every vnode is recovered instead of only lost ones.
#
# Prints a message and terminates the process with exit status 1 when the
# arguments are invalid.
def initialize(pname, pushv_cmd, argv, alldata = false)
  if alldata
    if argv.length != 3
      puts "usage:#{pname} address port storage-path"
      exit 1
    end
  elsif argv.length < 3
    # The timestamp is optional (see the usage text and the nil check on
    # @ymdhms below), so only the first three arguments are required.
    puts "usage:#{pname} address port storage-path [yyyymmddhhmmss]"
    exit 1
  end

  @addr, @port, @strgpath, @ymdhms = argv

  # Anchored match also rejects an empty port, which /\D/ alone let through.
  if @port !~ /\A\d+\z/
    STDERR.puts "port was not numeric."
    exit 1
  end

  if @ymdhms && @ymdhms !~ /\A\d{14}\z/
    STDERR.puts "yyyymmddhhmmss format mismatch."
    exit 1
  end
  @pushv_cmd = pushv_cmd
  @nodeid = "#{@addr}_#{@port}"
  # seconds slept between streamed records by the push methods
  @stream_copy_wait_param = 0.0001
  @alldata = alldata
end
Public Instance Methods
broadcast_cmd(cmd,without_nids=nil)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 321
#
# Sends +cmd+ to every node in the routing table (@rd.nodes) except the ones
# in +without_nids+, using send_cmd.
#
# cmd          - raw protocol line to send.
# without_nids - a node id, an Array of node ids, or nil.
#
# Returns a Hash of node id => response (as returned by send_cmd) for the
# nodes that were contacted, or nil if an error was raised (it is printed to
# STDERR).
def broadcast_cmd(cmd,without_nids=nil)
  # Array() turns nil into [] and a single node id into [nid]. Without it a
  # String argument would be matched with String#include?, i.e. by substring,
  # so excluding "host_1234" would also exclude "host_123".
  excluded = Array(without_nids)
  res = {}
  @rd.nodes.each do |nid|
    res[nid] = send_cmd(nid, cmd) unless excluded.include?(nid)
  end
  res
rescue => e
  STDERR.puts("#{e}\n#{$@}")
  nil
end
clk_to_zero(data)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 310
#
# Re-packs one dumped record with its clk field (third header word) set to 0.
#
# Layout handled here, in big-endian 32-bit words ("N"):
#   vn, last, clk, expt, klen, key[klen], vlen, value[vlen]
#
# Returns the rebuilt record; any bytes after the value are dropped.
def clk_to_zero(data)
  vn, last, _clk, expt, klen = data.unpack('NNNNN')
  key, vlen = data[20..-1].unpack("a#{klen}N")

  if vlen == 0
    # no value bytes follow: header, key and a zero value length only
    [vn, last, 0, expt, klen, key, 0].pack("NNNNNa#{klen}N")
  else
    value_offset = 20 + klen + 4
    value, = data[value_offset..-1].unpack("a#{vlen}")
    [vn, last, 0, expt, klen, key, vlen, value].pack("NNNNNa#{klen}Na#{vlen}")
  end
end
each_hash(path) { |hname,dir| ... }
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 81
#
# Walks the immediate children of +path+ and yields every sub-directory as
# (hash name, directory path); the hash name is the directory's base name.
# Plain files are skipped.
#
# Returns the Array of all entries matched by "#{path}/*", files included.
def each_hash(path)
  entries = Dir.glob("#{path}/*")
  entries.each do |entry|
    next unless File.directory?(entry)
    yield File.basename(entry), entry
  end
end
get_history_of_lost(nid,ymdhms)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 102
#
# Sends "history_of_lost +ymdhms+" to node +nid+ and collects the reply: one
# vnode id per line, terminated by "END\r\n".
#
# Returns an Array of Integer vnode ids.
# Raises RuntimeError if the connection closes before "END" or a reply line
# is not a vnode id; in that case the connection is closed instead of being
# returned to the pool.
def get_history_of_lost(nid,ymdhms)
  pool = Roma::Messaging::ConPool.instance
  con = pool.get_connection(nid)
  vnodes = []
  begin
    con.write("history_of_lost #{ymdhms}\r\n")
    while (line = con.gets) != "END\r\n"
      raise "history_of_lost: connection to #{nid} closed before END" unless line
      id = line.chomp
      # An error line must not be turned into vnode 0 by String#to_i.
      unless id =~ /\A\d+\z/
        raise "history_of_lost: unexpected response from #{nid}: #{id}"
      end
      vnodes << id.to_i
    end
  rescue
    # The reply was not read to its end, so the connection cannot be reused.
    con.close
    raise
  end
  pool.return_connection(nid, con)
  vnodes
end
get_lost_vnodes(rd,ymdhms)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 94
#
# Lists the vnodes to recover: those +rd+ reports as lost and, when a
# +ymdhms+ timestamp is given, also those this node's history_of_lost reports
# for it. The two lists are merged with Array#| (set union).
def get_lost_vnodes(rd,ymdhms)
  lost = rd.get_lost_vnodes
  return lost unless ymdhms
  lost | get_history_of_lost(@nodeid, ymdhms)
end
get_routing_data(nid)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 89
#
# Fetches the routing table of node +nid+ through a fresh
# Roma::Client::Sender and returns whatever send_routedump_command returns.
def get_routing_data(nid)
  Roma::Client::Sender.new.send_routedump_command(nid)
end
make_node_hash(keys)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 198
#
# Groups +keys+ by the nodes that hold their vnode.
#
# A key's vnode is (SHA1(key) mod 2**@rd.dgst_bits) & @rd.search_mask, and
# the key is added to every node listed in @rd.v_idx for that vnode.
#
# Returns a Hash of node id => Array of keys; every node in @rd.nodes has an
# entry, possibly empty. A vnode missing from @rd.v_idx raises (nil#each).
def make_node_hash(keys)
  by_node = {}
  @rd.nodes.each { |nid| by_node[nid] = [] }
  keys.each do |key|
    hashed = Digest::SHA1.hexdigest(key).hex % (2**@rd.dgst_bits)
    vnode = hashed & @rd.search_mask
    @rd.v_idx[vnode].each { |owner| by_node[owner] << key }
  end
  by_node
end
new_storage(ext)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 132
#
# Builds an unopened storage object for the file extension +ext+:
# "tc" => TCStorage, "dbm" => DbmStorage, "sql3" => SQLite3Storage.
# Returns nil for any other extension. The storage class is only referenced
# for the matching extension.
def new_storage(ext)
  factory = {
    'tc'   => lambda { ::Roma::Storage::TCStorage.new },
    'dbm'  => lambda { Roma::Storage::DbmStorage.new },
    'sql3' => lambda { Roma::Storage::SQLite3Storage.new }
  }[ext]
  factory ? factory.call : nil
end
open_storage(path,vn_list)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 113
#
# Opens the storage kept in directory +path+.
#
# The storage type comes from the extension of the "0.*" file. The number of
# divided files is the number of files with that extension. +vn_list+ is
# handed to the storage as its vnode list before it is opened.
#
# Returns the opened storage, or nil (after printing the reason to STDERR)
# when +path+ is not a directory, holds no "0.*" file, or uses an extension
# new_storage does not support.
def open_storage(path,vn_list)
  unless File::directory?(path)
    STDERR.puts "#{path} does not found."
    return nil
  end

  # get a file extension
  first_file = Dir::glob("#{path}/0.*")[0]
  unless first_file
    STDERR.puts "#{path}/0.* was not found."
    return nil
  end
  ext = File::extname(first_file)[1..-1]

  st = new_storage(ext)
  unless st
    STDERR.puts "#{path}: unsupported storage type '#{ext}'."
    return nil
  end
  # count a number of divided files
  st.divnum = Dir::glob("#{path}/*.#{ext}").length
  st.vn_list = vn_list
  st.storage_path = path
  st.opendb
  st
end
push_a_vnode_stream(hname, vn, nid)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 172
#
# Streams every record of vnode +vn+ of hash +hname+ from @storage to node
# +nid+.
#
# Protocol, as used here:
#   - send "#{@pushv_cmd} hname vn\r\n" and expect "READY\r\n";
#   - send each record passed through clk_to_zero, sleeping
#     @stream_copy_wait_param seconds between records;
#   - send twenty NUL bytes as the end-of-stream marker;
#   - read the final reply.
#
# Returns the final reply without its line terminator ("STORED" on success),
# or the refusal line if the node did not answer "READY". Returns nil if the
# connection closed, or if an error was raised (it is printed to STDERR).
def push_a_vnode_stream(hname, vn, nid)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)

  con.write("#{@pushv_cmd} #{hname} #{vn}\r\n")

  res = con.gets # READY\r\n or error string
  if res != "READY\r\n"
    con.close
    return res && res.chomp
  end

  @storage.each_vn_dump(vn) do |data|
    con.write(clk_to_zero(data))
    sleep @stream_copy_wait_param
  end
  con.write("\0"*20) # end of stream

  res = con.gets # STORED\r\n or error string
  unless res
    # EOF: the peer is gone, so don't hand a dead connection back to the pool.
    con.close
    return nil
  end
  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  res.chomp
rescue => e
  STDERR.puts "#{e}\n#{$@}"
  # A stream interrupted part-way leaves the connection in an unknown protocol
  # state and it was never returned to the pool; close it instead of leaking.
  if con
    begin
      con.close
    rescue IOError
      # already closed
    end
  end
  nil
end
send_cmd(nid, cmd)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 333
#
# Sends the raw protocol line +cmd+ to node +nid+ and reads one reply line.
#
# Returns the reply without its line terminator. Returns nil if the
# connection closed before replying (the connection is then closed, not
# pooled), or if an error was raised (reported on STDERR, connection closed).
def send_cmd(nid, cmd)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)
  con.write(cmd)
  res = con.gets
  unless res
    # EOF: don't return a dead connection to the pool.
    con.close
    return nil
  end
  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  res.chomp
rescue => e
  STDERR.puts("#{__FILE__}:#{__LINE__}:Send command failed that node-id is #{nid},command is #{cmd}.")
  STDERR.puts("#{e.class}: #{e}")
  # The connection was never returned to the pool; close it instead of leaking.
  if con
    begin
      con.close
    rescue IOError
      # already closed
    end
  end
  nil
end
start_recover(hname)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 145
#
# Pushes every vnode in @lost_vnodes of hash +hname+ from @storage to the
# node(s) that should hold it.
#
# A vnode that already has owners in @rd.v_idx is pushed to each of them. A
# vnode without owners is pushed to one randomly chosen node of @rd.nodes.
# That assignment is then sent as "setroute vn clk nid" to the chosen node
# and broadcast to the others.
#
# Terminates the process with exit status 1 if a push does not end with
# "STORED" or the setroute command gets no reply.
def start_recover(hname)
  total = @lost_vnodes.length
  @lost_vnodes.each_with_index do |vn, idx|
    nodes = @rd.v_idx[vn]
    unassigned = nodes.nil? || nodes.empty?
    if unassigned
      nids = [@rd.nodes[rand(@rd.nodes.length)]]
      puts "#{idx}/#{total} #{vn} assign to #{nids.inspect}"
    else
      nids = nodes
      puts "#{idx}/#{total} #{vn} was auto assigned at #{nids.inspect}"
    end

    nids.each do |nid|
      next if push_a_vnode_stream(hname, vn, nid) == "STORED"
      STDERR.puts "push_a_vnode_stream aborted in #{vn}"
      exit 1
    end

    next unless unassigned
    cmd = "setroute #{vn} #{@rd.v_clk[vn]} #{nids[0]}\r\n"
    unless send_cmd(nids[0], cmd)
      STDERR.puts "setroute failed for #{vn} on #{nids[0]}"
      exit 1
    end
    # Pass an Array: a bare node id String would be matched by substring when
    # broadcast_cmd checks which nodes to skip.
    broadcast_cmd(cmd, [nids[0]])
  end
end
start_recover_width_keys(hname,keys)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 259
#
# Recovers the given +keys+ of hash +hname+ one at a time.
#
# Each key found in @storage (via get_raw2) is uploaded with upload_data to
# every node owning its vnode, which is
# (SHA1(key) mod 2**@rd.dgst_bits) & @rd.search_mask. Prints each hit and each
# node's reply. Keys not found in @storage are skipped silently.
def start_recover_width_keys(hname,keys)
  keys.each do |key|
    data = @storage.get_raw2(key)
    next unless data
    puts "hit => #{key}"
    vn = (Digest::SHA1.hexdigest(key).hex % (2**@rd.dgst_bits)) & @rd.search_mask
    @rd.v_idx[vn].each do |nid|
      print "#{nid}=>"
      puts upload_data(hname, vn, nid, key, data)
    end
  end
end
start_recover_width_keys2(hname,keys)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 208
#
# Batch variant of key recovery: groups +keys+ per owning node with
# make_node_hash, then uploads each node's keys of hash +hname+ in one stream
# via upload_data2, printing the node id first.
#
# Returns the node => keys Hash built by make_node_hash.
def start_recover_width_keys2(hname,keys)
  make_node_hash(keys).each do |nid, node_keys|
    puts nid
    upload_data2(hname, nid, node_keys)
  end
end
suite()
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 49
#
# Main entry point of vnode recovery.
#
# Fetches the routing data from this node (@nodeid). When @alldata is true,
# it recovers every vnode in @rd.v_idx. Otherwise it recovers only the lost
# vnodes (see get_lost_vnodes), and exits with status 0 when none were lost.
# Each hash directory under @strgpath is then opened and pushed with
# start_recover.
#
# A directory open_storage cannot open is skipped. The storage is always
# closed, including when start_recover terminates the process with exit.
def suite
  @rd = get_routing_data(@nodeid)
  if @alldata
    @lost_vnodes = @rd.v_idx.keys
  else
    @lost_vnodes = get_lost_vnodes(@rd,@ymdhms)
    puts "#{@lost_vnodes.length} vnodes where data was lost."

    exit if @lost_vnodes.empty?
  end

  each_hash(@strgpath) do |hname, dir|
    puts "#{hname} #{dir}"
    @storage = open_storage(dir,@lost_vnodes)
    # open_storage returns nil for a directory it cannot open.
    next unless @storage
    begin
      start_recover(hname)
    ensure
      # exit raises SystemExit, so this also runs when start_recover aborts.
      @storage.closedb
    end
  end
end
suite_with_keys(keys)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 68
#
# Entry point for recovering specific +keys+. Fetches the routing data from
# this node (@nodeid) and opens each hash directory under @strgpath with all
# vnodes of @rd.v_idx. Each found key is then uploaded with
# start_recover_width_keys.
#
# A directory open_storage cannot open is skipped. The storage is always
# closed, including when the upload raises or exits.
def suite_with_keys(keys)
  @rd = get_routing_data(@nodeid)
  @lost_vnodes = @rd.v_idx.keys

  each_hash(@strgpath) do |hname, dir|
    puts "#{hname} #{dir}"
    @storage = open_storage(dir,@lost_vnodes)
    # open_storage returns nil for a directory it cannot open.
    next unless @storage
    begin
      start_recover_width_keys(hname, keys)
      # start_recover_width_keys2(hname, keys)
    ensure
      @storage.closedb
    end
  end
end
upload_data(hname, vn, nid, k, data)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 276
#
# Uploads a single record for key +k+ of hash +hname+ to node +nid+ as a
# one-record "#{@pushv_cmd} hname vn" stream.
#
# +data+ is destructured as [vn, last, clk, expt, value] (as returned by
# @storage.get_raw2). The record is re-encoded with vnode +vn+ and clk 0 as
#   vn, last, clk, expt, klen, key, vlen, value   ("N" = big-endian 32-bit)
# A nil value is sent as vlen 0 with no value bytes.
#
# Returns the node's final reply without its line terminator ("STORED" on
# success), or the refusal line if the node did not answer "READY". Returns
# nil if the connection closed, or if an error was raised (printed to STDERR).
def upload_data(hname, vn, nid, k, data)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)

  con.write("#{@pushv_cmd} #{hname} #{vn}\r\n")
  res = con.gets # READY\r\n or error string
  if res != "READY\r\n"
    con.close
    return res && res.chomp
  end

  _vn_old, last, _clk, expt, val = data
  # The length fields are byte counts. String#length counts characters and
  # would truncate multi-byte keys/values when packed with "a".
  klen = k.bytesize
  if val
    vlen = val.bytesize
    wd = [vn, last, 0, expt, klen, k, vlen, val].pack("NNNNNa#{klen}Na#{vlen}")
  else
    wd = [vn, last, 0, expt, klen, k, 0].pack("NNNNNa#{klen}N")
  end

  con.write(wd)
  sleep @stream_copy_wait_param

  con.write("\0"*20) # end of stream

  res = con.gets # STORED\r\n or error string
  unless res
    # EOF: don't return a dead connection to the pool.
    con.close
    return nil
  end
  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  res.chomp
rescue => e
  STDERR.puts "#{e}\n#{$@}"
  # The connection was never returned to the pool; close it instead of leaking.
  if con
    begin
      con.close
    rescue IOError
      # already closed
    end
  end
  nil
end
upload_data2(hname, nid, keys)
click to toggle source
# File lib/roma/tools/recoverlost_lib.rb, line 216
#
# Uploads every key in +keys+ of hash +hname+ that exists in @storage to node
# +nid+ in a single "#{@pushv_cmd} hname 0" stream, printing progress roughly
# every 1% of the keys.
#
# Each record is re-encoded with its own vnode,
# (SHA1(key) mod 2**@rd.dgst_bits) & @rd.search_mask, and clk 0, using the
# layout vn, last, clk, expt, klen, key, vlen, value ("N" = big-endian 32-bit).
# NOTE(review): the stream header names vnode 0 while records carry their own
# vnode; presumably the receiver honours the per-record vnode — confirm.
#
# Returns the node's final reply without its line terminator ("STORED" on
# success), or the refusal line if the node did not answer "READY". Returns
# nil if the connection closed, or if an error was raised (printed to STDERR).
def upload_data2(hname, nid, keys)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)

  con.write("#{@pushv_cmd} #{hname} 0\r\n")
  res = con.gets # READY\r\n or error string
  if res != "READY\r\n"
    con.close
    return res && res.chomp
  end

  n = keys.length
  m = n / 100
  m = 1 if m < 1
  keys.each_with_index do |k, i|
    print "#{i}/#{n}\r" if i % m == 0
    data = @storage.get_raw2(k)
    next unless data
    d = Digest::SHA1.hexdigest(k).hex % (2**@rd.dgst_bits)
    vn = d & @rd.search_mask

    _vn_old, last, _clk, expt, val = data
    # The length fields are byte counts. String#length counts characters and
    # would truncate multi-byte keys/values when packed with "a".
    klen = k.bytesize
    if val
      vlen = val.bytesize
      wd = [vn, last, 0, expt, klen, k, vlen, val].pack("NNNNNa#{klen}Na#{vlen}")
    else
      wd = [vn, last, 0, expt, klen, k, 0].pack("NNNNNa#{klen}N")
    end

    con.write(wd)
    sleep @stream_copy_wait_param
  end
  con.write("\0"*20) # end of stream

  res = con.gets # STORED\r\n or error string
  unless res
    # EOF: don't return a dead connection to the pool.
    con.close
    return nil
  end
  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  res.chomp
rescue => e
  STDERR.puts "#{e}\n#{$@}"
  # The connection was never returned to the pool; close it instead of leaking.
  if con
    begin
      con.close
    rescue IOError
      # already closed
    end
  end
  nil
end