class Roma::Routing::ChurnbasedRoutingTable
Attributes
auto_recover[RW]
auto_recover_status[RW]
auto_recover_time[RW]
enabled_failover[RW]
event[RW]
event_limit_line[RW]
fname[R]
leave_proc[R]
log_fd[R]
log_name[R]
logs[RW]
lost_action[RW]
lost_proc[R]
min_version[R]
recover_proc[R]
trans[R]
version_of_nodes[R]
Public Class Methods
new(rd,fname)
click to toggle source
Calls superclass method
# File lib/roma/routing/cb_rttable.rb, line 29
# Sets up the routing table state and opens the routing log.
#
# rd    :: routing data object, passed on to the superclass; the node list
#          reachable through @rd is sorted in place
# fname :: base file name of the routing log (used by open_log)
def initialize(rd, fname)
  super(rd)
  @rd.nodes.sort!

  # Pending transactions, keyed by vnode-id.
  @trans = {}
  @fname = fname

  # Callbacks stay unset until the set_*_proc methods are called.
  @leave_proc = nil
  @lost_proc = nil
  @recover_proc = nil

  # Failover and recovery settings.
  @lost_action = :no_action
  @auto_recover = false
  @auto_recover_status = "waiting"
  @auto_recover_time = 1800
  @enabled_failover = false

  # Event history; set_event caps its size at @event_limit_line.
  @event = []
  @event_limit_line = 1000
  @logs = []

  @lock = Mutex.new
  # Nodes that never reported a version read as 0.
  @version_of_nodes = Hash.new(0)
  @min_version = nil

  open_log
end
Public Instance Methods
add_node(nid)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 213
# Registers +nid+ in the node list unless it is already present.
# On insertion the node list is re-sorted, a "join" line is written to the
# routing log and a 'join' event is recorded. Returns nil when the node was
# already known.
def add_node(nid)
  return if @rd.nodes.include?(nid)

  node_list = @rd.nodes
  node_list << nid
  node_list.sort!
  write_log("join #{nid}")
  set_event(nid, 'join')
end
can_i_recover?()
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 124
# True when the number of known nodes is at least @rd.rn.
def can_i_recover?
  node_count = @rd.nodes.length
  node_count >= @rd.rn
end
can_i_release?(ap_str, rep_host)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 128
# Compares what would remain after removing +ap_str+ against @rd.rn.
#
# ap_str   :: node to remove from the list returned by +nodes+
# rep_host :: when truthy, the remaining nodes are counted as they are;
#             otherwise they are reduced to distinct host names (the part of
#             each node string before the first ':' or '_')
#
# Returns true when the resulting count is smaller than @rd.rn.
def can_i_release?(ap_str, rep_host)
  remaining = nodes
  remaining.delete(ap_str)

  counted = if rep_host
              remaining
            else
              remaining.map { |node| node.split(/[:_]/)[0] }.uniq
            end

  counted.length < @rd.rn
end
close_log()
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 120
# Closes the file handle of the current routing log (@log_fd).
def close_log
  @log_fd.close
end
commit(vn)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 334
# Applies the pending transaction for vnode +vn+ to the routing table.
#
# The pending node list becomes the route of +vn+, the transaction entry is
# removed, the vnode clock is incremented, and the change is pushed to
# @mtree and the routing log.
#
# vn :: vnode-id
#
# Returns false when no transaction is pending for +vn+, otherwise the new
# clock value.
def commit(vn)
  @lock.synchronize do
    # The existence check is done under the lock so that two concurrent
    # commits of the same vnode cannot both pass it; the loser returns false
    # instead of failing on a transaction entry that was already removed.
    return false unless @trans.key?(vn)

    @rd.v_idx[vn] = @trans[vn][0]
    @trans.delete(vn)
    clk = @rd.v_clk[vn] + 1
    @rd.v_clk[vn] = clk
    @mtree.set(vn, @rd.v_idx[vn])
    write_log_setroute(vn, clk, @rd.v_idx[vn])
    clk
  end
end
delete_old_trans(sec=3600)
click to toggle source
sec
: elapsed-time
# File lib/roma/routing/cb_rttable.rb, line 352
# Drops pending transactions that were started more than +sec+ seconds ago.
#
# sec :: elapsed time in seconds (default 3600)
#
# Returns the transaction hash (@trans) after the deletion.
def delete_old_trans(sec = 3600)
  # Compute the cutoff once so every entry is compared against the same
  # instant instead of calling Time.now for each transaction.
  cutoff = Time.now - sec
  @trans.delete_if { |_vn, val| val[1] < cutoff }
end
each_vnode() { |k, v| ... }
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 303
# Yields every (vnode-id, node-id list) pair of @rd.v_idx to the block.
def each_vnode
  @rd.v_idx.each_pair do |vn, nids|
    yield vn, nids
  end
end
enabled_failover=(b)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 222
# Writer for the failover switch. The failure counters (@fail_cnt) are
# cleared first, then the flag is set to +b+.
def enabled_failover=(b)
  @fail_cnt.clear
  @enabled_failover = b
end
find_min_version()
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 70
# Returns the smallest version recorded in @version_of_nodes.
# The search starts from 0xffffff, so that value is returned when no
# version is recorded or when every recorded version is larger.
def find_min_version
  @version_of_nodes.each_value.inject(0xffffff) do |lowest, ver|
    lowest > ver ? ver : lowest
  end
end
get_stat(ap)
click to toggle source
Calls superclass method
# File lib/roma/routing/cb_rttable.rb, line 51
# Extends the statistics returned by the superclass with the routing
# settings held by this table.
#
# ap :: passed through unchanged to the superclass implementation
#
# Returns the superclass result with the 'routing.*' entries added.
def get_stat(ap)
  stat = super(ap)
  {
    'routing.lost_action' => @lost_action.to_s,
    'routing.auto_recover' => @auto_recover.to_s,
    'routing.auto_recover_status' => @auto_recover_status.to_s,
    'routing.auto_recover_time' => @auto_recover_time,
    'routing.event' => @event,
    'routing.event_limit_line' => @event_limit_line,
    'routing.version_of_nodes' => @version_of_nodes.inspect,
    'routing.min_version' => @min_version,
    'routing.enabled_failover' => @enabled_failover
  }.each { |key, value| stat[key] = value }
  stat
end
has_node?(ap_str)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 382
# True when +ap_str+ appears in the node list of at least one vnode.
def has_node?(ap_str)
  each_vnode { |_vn, nids| return true if nids.include?(ap_str) }
  false
end
leave(nid)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 231
# Removes the failed node +nid+ from the routing table.
#
# Does nothing unless failover is enabled and +nid+ is a known node.
# Otherwise:
# * calls the leave callback (if set) with +nid+,
# * drops +nid+ from the node list and the version table and recomputes
#   @min_version,
# * logs the departure and records a 'leave' event,
# * rewrites the route of every vnode that contained +nid+,
# * if any vnode ended up with no node, calls the lost callback (if set)
#   and, when @lost_action is :auto_assign, assigns each such vnode the
#   nodes returned by next_alive_vnode,
# * otherwise, if any vnode has fewer than @rd.rn nodes, calls the recover
#   callback (if set) with 'start_auto_recover_process',
# * finally removes the failure counter of +nid+.
def leave(nid)
  return unless @enabled_failover
  return unless @rd.nodes.include?(nid)

  @leave_proc.call(nid) if @leave_proc
  @rd.nodes.delete(nid)
  @version_of_nodes.delete(nid)
  @min_version = find_min_version

  @log.warn("#{nid} just failed.")
  write_log("leave #{nid}")
  set_event(nid, __method__)

  lost_vnodes = []
  short_vnodes = []
  @lock.synchronize do
    @rd.v_idx.each_pair do |vn, nids|
      buf = nids.clone
      next unless buf.delete(nid)

      set_route_and_inc_clk_inside_sync(vn, buf)
      if buf.empty?
        lost_vnodes << vn
        @log.error("Vnode data is lost.(Vnode=#{vn})")
      elsif buf.length < @rd.rn
        short_vnodes << vn
      end
    end
  end

  if !lost_vnodes.empty?
    # The callback runs outside the lock, as before.
    @lost_proc.call if @lost_proc
    if @lost_action == :auto_assign
      # set_route_and_inc_clk_inside_sync must run while @lock is held;
      # previously this branch called it without taking the lock.
      @lock.synchronize do
        lost_vnodes.each do |vn|
          set_route_and_inc_clk_inside_sync(vn, next_alive_vnode(vn))
        end
      end
    end
  elsif !short_vnodes.empty?
    @log.error("Short vnodes exist.")
    @recover_proc.call('start_auto_recover_process') if @recover_proc
  end
  @fail_cnt.delete(nid)
end
open_log()
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 88
# Chooses the routing log file name and opens it for appending.
#
# The existing logs are listed through @rd.get_file_list(@fname); the first
# element of each list entry is used as the numeric suffix of a log file.
# * no existing log       -> "<fname>.1"
# * newest log is empty   -> the newest log is reused
# * newest log has data   -> a new log with the next suffix is started
#
# Sets @log_name and @log_fd.
def open_log
  log_list = @rd.get_file_list(@fname)
  @log_name =
    if log_list.length == 0
      "#{@fname}.1"
    else
      newest = log_list.last[0]
      newest_is_empty = File.stat("#{@fname}.#{newest}").size == 0
      newest_is_empty ? "#{@fname}.#{newest}" : "#{@fname}.#{newest + 1}"
    end
  @log_fd = File.open(@log_name, "a")
end
rollback(vn)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 347
# Discards the pending transaction of vnode +vn+, if there is one.
# Returns the removed transaction entry, or nil when none was pending.
def rollback(vn)
  @trans.delete(vn)
end
sample_vnode(without_nodes)
click to toggle source
Samples a vnode, skipping every vnode held by a node in without_nodes.
# File lib/roma/routing/cb_rttable.rb, line 171
# Picks one vnode at random among those whose node list contains none of
# +without_nodes+. When some of these vnodes have fewer than @rd.rn nodes,
# the pick is made among those short vnodes only.
#
# Returns [vnode-id, node-id list], or nil when no vnode qualifies.
def sample_vnode(without_nodes)
  candidates = {}
  short = {}
  @rd.v_idx.each_pair do |vn, nids|
    next if list_include?(nids, without_nodes)

    candidates[vn] = nids
    short[vn] = nids if nids.length < @rd.rn
  end
  candidates = short unless short.length == 0

  vns = candidates.keys
  return nil if vns.length == 0

  picked = vns[rand(vns.length)]
  [picked, candidates[picked]]
end
search_lost_vnodes(t)
click to toggle source
Returns the list of vnodes whose data was lost at or after the given time.
# File lib/roma/routing/cb_rttable.rb, line 146
# Scans the routing logs through @rd.each_log_all(@fname) and collects the
# vnode-ids of lost vnodes.
#
# t :: log entries older than this time are skipped
#
# Returns an array of vnode-ids (integers), in log order.
def search_lost_vnodes(t)
  lost = []
  @rd.each_log_all(@fname) do |log_t, line|
    next if t > log_t

    fields = line.split(/ /)
    # A setroute entry with exactly three fields carries no node, so the
    # vnode has no pnode and is therefore lost.
    lost << fields[1].to_i if fields[0] == 'setroute' && fields.length == 3
  end
  lost
end
search_nodes_for_write(vn)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 311
# Returns a copy of the node list that writes to vnode +vn+ should use:
# the node list of the pending transaction when one exists for +vn+,
# otherwise the current route from @rd.v_idx.
#
# Any error raised during the lookup is swallowed and nil is returned.
def search_nodes_for_write(vn)
  source = @trans.key?(vn) ? @trans[vn][0] : @rd.v_idx[vn]
  source.clone
rescue
  nil
end
search_nodes_with_clk(vn)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 318
# Reads the clock and the route of vnode +vn+ while holding @lock.
#
# Returns [clock, copy of the node-id list]. Any error raised during the
# lookup is swallowed and nil is returned.
def search_nodes_with_clk(vn)
  @lock.synchronize do
    clk = @rd.v_clk[vn]
    [clk, @rd.v_idx[vn].clone]
  end
rescue
  nil
end
select_a_short_vnodes(exclued_nodes)
click to toggle source
Selects the vnodes that are short of redundancy.
# File lib/roma/routing/cb_rttable.rb, line 160
# Lists the vnodes that have fewer than @rd.rn nodes and whose node list
# contains none of +exclued_nodes+.
#
# Returns an array of [vnode-id, node-id list] pairs.
def select_a_short_vnodes(exclued_nodes)
  short = []
  @rd.v_idx.each_pair do |vn, nids|
    next unless nids.length < @rd.rn
    next unless list_include?(nids, exclued_nodes) == false

    short << [vn, nids]
  end
  short
end
set_leave_proc(&block)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 76
# Stores the given block as the leave callback (@leave_proc).
# leave calls it with the node-id of the node being removed.
def set_leave_proc(&block)
  @leave_proc = block
end
set_lost_proc(&block)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 80
# Stores the given block as the lost callback (@lost_proc).
# leave calls it without arguments when at least one vnode lost all nodes.
def set_lost_proc(&block)
  @lost_proc = block
end
set_recover_proc(&block)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 84
# Stores the given block as the recover callback (@recover_proc).
# leave calls it with 'start_auto_recover_process' when short vnodes exist.
def set_recover_proc(&block)
  @recover_proc = block
end
set_route(vn, clk, nids)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 197
# Replaces the route of vnode +vn+ with +nids+.
#
# vn   :: vnode-id; must already exist in @rd.v_idx
# clk  :: clock value the caller's table is based on
# nids :: node-id list for the new route; unknown nodes are added through
#         add_node
#
# Returns
# * "<vn> is not found." when the vnode is unknown,
# * "It's old table."    when the stored clock is greater than +clk+,
# * otherwise the new clock value (+clk+ + 1), after updating @rd, @mtree
#   and the routing log.
def set_route(vn, clk, nids)
  return "#{vn} is not found." unless @rd.v_idx.key?(vn)

  @lock.synchronize do
    return "It's old table." if @rd.v_clk[vn] > clk

    nids.each do |nid|
      next if @rd.nodes.include?(nid)

      add_node(nid)
    end
    @rd.v_idx[vn] = nids.clone
    new_clk = clk + 1
    @rd.v_clk[vn] = new_clk
    @mtree.set(vn, nids)
    write_log_setroute(vn, new_clk, nids)
    new_clk
  end
end
set_version(nid,ver)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 65
# Records version +ver+ for node +nid+ and refreshes @min_version from the
# updated version table. Returns the new minimum version.
def set_version(nid, ver)
  @version_of_nodes.store(nid, ver)
  @min_version = find_min_version
end
transaction(vn, nids)
click to toggle source
vn
: vnode-id
nids
: node-id list
# File lib/roma/routing/cb_rttable.rb, line 328
# Opens a transaction for vnode +vn+ with the proposed node list +nids+.
# The entry stored in @trans is [copy of nids, start time].
#
# Returns false when a transaction is already pending for +vn+ (the pending
# one is left untouched), true otherwise.
def transaction(vn, nids)
  if @trans.key?(vn)
    false
  else
    @trans[vn] = [nids.clone, Time.now]
    true
  end
end
v_idx()
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 307
# Returns a shallow copy of the vnode index (@rd.v_idx); the node lists
# inside the copy are the same objects as in the original.
def v_idx
  @rd.v_idx.clone
end
vnode_balance(ap)
click to toggle source
Returns the status of vnode balance.
ap
: my address_port string (e.g. “roma0_11211”)
# File lib/roma/routing/cb_rttable.rb, line 358
# Compares the number of vnodes for which +ap+ is the first node in the
# list with the per-node share (2**div_bits / number of nodes).
#
# ap :: address_port string of the node to evaluate
#
# Returns :over, :less or :even. Vnodes whose node list is nil or empty are
# ignored. The counts are written to the debug log.
def vnode_balance(ap)
  # amount of primary at node = amount of vnode / amount of node
  share = (2**div_bits) / nodes.length

  primaries = 0
  secondaries = 0
  @rd.v_idx.each_pair do |_vn, nids|
    next if nids == nil || nids.length == 0

    if nids[0] == ap
      primaries += 1
    elsif nids.include?(ap)
      secondaries += 1
    end
  end

  @log.debug("#{__method__}:n=#{share} pcount=#{primaries} scount=#{secondaries}")

  return :over if primaries > share
  return :less if primaries < share

  :even
end
write_log(line)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 108
# Appends "<timestamp> <line>\n" to the current routing log and flushes it.
#
# Before writing, the log is rotated (close_log followed by open_log) when
# the current log file is larger than 1000 * 1024 bytes.
#
# NOTE(review): the microsecond part is written without zero padding
# (5 microseconds is written as ".5"); confirm that the log reader expects
# this before changing the format.
def write_log(line)
  # log rotation
  oversized = File.stat(@log_name).size > 1000 * 1024
  if oversized
    close_log
    open_log
  end

  now = Time.now
  stamp = "#{now.strftime('%Y-%m-%dT%H:%M:%S')}.#{now.usec}"
  @log_fd.write("#{stamp} #{line}\n")
  @log_fd.flush
end
write_log_setroute(vn, clk, nids)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 102
# Writes a "setroute <vn> <clk> <nid> <nid> ..." line to the routing log.
# With an empty +nids+ the line ends after the clock value.
def write_log_setroute(vn, clk, nids)
  suffix = nids.map { |nid| " #{nid}" }.join
  write_log("setroute #{vn} #{clk}" + suffix)
end
Private Instance Methods
list_include?(list,nodes)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 189
# True when at least one element of +nodes+ is contained in +list+.
def list_include?(list, nodes)
  nodes.any? { |nid| list.include?(nid) }
end
next_alive_vnode(vn)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 294
# Walks the vnodes that follow +vn+ (via @rd.next_vnode) and returns a copy
# of the node list of the first one that has at least one node.
# The walk stops when it comes back to +vn+; +vn+ itself is never examined.
# Returns an empty array when no other vnode has a node.
def next_alive_vnode(vn)
  cursor = @rd.next_vnode(vn)
  while cursor != vn
    nids = @rd.v_idx[cursor]
    return nids.clone if nids.length != 0

    cursor = @rd.next_vnode(cursor)
  end
  []
end
set_event(nid, process)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 276
# Appends "<timestamp> <process> <nid>" to the event history (@event).
# When the history already holds @event_limit_line entries or more, the
# oldest entry is dropped first. Returns the event history.
def set_event(nid, process)
  now = Time.now
  stamp = "#{now.strftime('%Y-%m-%dT%H:%M:%S')}.#{now.usec}"
  @event.shift unless @event.size < @event_limit_line
  @event.push("#{stamp} #{process} #{nid}")
end
set_route_and_inc_clk_inside_sync(vn, nodes)
click to toggle source
# File lib/roma/routing/cb_rttable.rb, line 284
# Sets the route of vnode +vn+ to +nodes+ and increments its clock, then
# pushes the change to @mtree and the routing log.
#
# This method takes no lock itself; as its name says, it is meant to be
# called from inside a synchronized section.
#
# Returns the new clock value.
def set_route_and_inc_clk_inside_sync(vn, nodes)
  @rd.v_idx[vn] = nodes
  new_clk = (@rd.v_clk[vn] += 1)
  @mtree.set(vn, nodes)
  write_log_setroute(vn, new_clk, nodes)
  new_clk
end