class Roma::Routing::ChurnbasedRoutingTable

Attributes

auto_recover[RW]
auto_recover_status[RW]
auto_recover_time[RW]
enabled_failover[RW]
event[RW]
event_limit_line[RW]
fname[R]
leave_proc[R]
log_fd[R]
log_name[R]
logs[RW]
lost_action[RW]
lost_proc[R]
min_version[R]
recover_proc[R]
trans[R]
version_of_nodes[R]

Public Class Methods

new(rd,fname) click to toggle source
Calls superclass method
   # File lib/roma/routing/cb_rttable.rb
29 def initialize(rd,fname)
30   super(rd)
31   @rd.nodes.sort!
32   @trans={}
33   @fname=fname
34   @leave_proc=nil
35   @lost_proc=nil
36   @recover_proc=nil
37   @lost_action=:no_action
38   @auto_recover=false
39   @auto_recover_status="waiting"
40   @auto_recover_time=1800
41   @event = []
42   @event_limit_line = 1000
43   @logs = []
44   @enabled_failover=false
45   @lock = Mutex.new
46   @version_of_nodes = Hash.new(0)
47   @min_version = nil
48   open_log
49 end

Public Instance Methods

add_node(nid) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
213 def add_node(nid)
214   unless @rd.nodes.include?(nid)
215     @rd.nodes << nid
216     @rd.nodes.sort!
217     write_log("join #{nid}")
218     set_event(nid, 'join')
219   end
220 end
can_i_recover?() click to toggle source
    # File lib/roma/routing/cb_rttable.rb
124 def can_i_recover?
125   @rd.nodes.length >= @rd.rn
126 end
can_i_release?(ap_str, rep_host) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
128 def can_i_release?(ap_str, rep_host)
129   buf = self.nodes
130   buf.delete(ap_str)
131   hosts = []
132 
133   unless rep_host
134     buf.each{ |node|
135       host = node.split(/[:_]/)[0]
136       hosts << host unless hosts.include?(host)
137     }
138   else
139     hosts = buf
140   end
141 
142   hosts.length < @rd.rn
143 end
close_log() click to toggle source
    # File lib/roma/routing/cb_rttable.rb
120 def close_log
121   @log_fd.close
122 end
commit(vn) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
334 def commit(vn)
335   return false unless @trans.key?(vn)
336   @lock.synchronize {
337     @rd.v_idx[vn]=@trans[vn][0]
338     @trans.delete(vn)
339     clk = @rd.v_clk[vn] + 1
340     @rd.v_clk[vn] = clk
341     @mtree.set(vn, @rd.v_idx[vn])
342     write_log_setroute(vn, clk, @rd.v_idx[vn])
343     return clk
344   }
345 end
delete_old_trans(sec=3600) click to toggle source

sec: elapsed-time

    # File lib/roma/routing/cb_rttable.rb
352 def delete_old_trans(sec=3600)
353   @trans.delete_if{|vn,val| val[1] < Time.now-sec }
354 end
each_vnode() { |k, v| ... } click to toggle source
    # File lib/roma/routing/cb_rttable.rb
303 def each_vnode
304   @rd.v_idx.each_pair{ |k, v| yield(k, v) }
305 end
enabled_failover=(b) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
222 def enabled_failover=(b)
223   @fail_cnt.clear
224   @enabled_failover=b
225 end
find_min_version() click to toggle source
   # File lib/roma/routing/cb_rttable.rb
70 def find_min_version
71   ret = 0xffffff
72   @version_of_nodes.each_value{|ver| ret = ver if ret > ver}
73   ret
74 end
get_stat(ap) click to toggle source
Calls superclass method
   # File lib/roma/routing/cb_rttable.rb
51 def get_stat(ap)
52   ret = super(ap)
53   ret['routing.lost_action'] = @lost_action.to_s
54   ret['routing.auto_recover'] = @auto_recover.to_s
55   ret['routing.auto_recover_status'] = @auto_recover_status.to_s
56   ret['routing.auto_recover_time'] = @auto_recover_time
57   ret['routing.event'] = @event
58   ret['routing.event_limit_line'] = @event_limit_line
59   ret['routing.version_of_nodes'] = @version_of_nodes.inspect
60   ret['routing.min_version'] = @min_version
61   ret['routing.enabled_failover'] = @enabled_failover
62   ret
63 end
has_node?(ap_str) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
382 def has_node?(ap_str)
383   self.each_vnode do |vn, nids|
384     return true if nids.include?(ap_str)
385   end
386   false
387 end
leave(nid) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
231 def leave(nid)
232   unless @enabled_failover
233     return
234   end
235   return unless @rd.nodes.include?(nid)
236 
237   @leave_proc.call(nid) if @leave_proc
238   @rd.nodes.delete(nid)
239   @version_of_nodes.delete(nid)
240   @min_version = find_min_version
241 
242   @log.warn("#{nid} just failed.")
243   write_log("leave #{nid}")
244   set_event(nid, __method__)
245 
246   lost_vnodes=[]
247   short_vnodes=[]
248   @lock.synchronize {
249     @rd.v_idx.each_pair{ |vn, nids|
250       buf = nids.clone
251       if buf.delete(nid)
252         set_route_and_inc_clk_inside_sync(vn, buf)
253         if buf.length == 0
254           lost_vnodes << vn
255           @log.error("Vnode data is lost.(Vnode=#{vn})")
256         elsif buf.length < @rd.rn
257           short_vnodes << vn
258         end
259       end
260     }
261   }
262   if lost_vnodes.length > 0
263     @lost_proc.call if @lost_proc
264     if @lost_action == :auto_assign
265       lost_vnodes.each{ |vn|
266         set_route_and_inc_clk_inside_sync( vn, next_alive_vnode(vn) )
267       }
268     end
269   elsif short_vnodes.length > 0
270     @log.error("Short vnodes exist.")
271     @recover_proc.call('start_auto_recover_process') if @recover_proc
272   end
273   @fail_cnt.delete(nid)
274 end
open_log() click to toggle source
    # File lib/roma/routing/cb_rttable.rb
 88 def open_log
 89   log_list=@rd.get_file_list(@fname)
 90   if log_list.length==0
 91     @log_name="#{@fname}.1"
 92   else
 93     if File::stat("#{@fname}.#{log_list.last[0]}").size == 0
 94       @log_name="#{@fname}.#{log_list.last[0]}"
 95     else
 96       @log_name="#{@fname}.#{log_list.last[0]+1}"
 97     end
 98   end
 99   @log_fd=File.open(@log_name,"a")
100 end
rollback(vn) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
347 def rollback(vn)
348   @trans.delete(vn)
349 end
sample_vnode(without_nodes) click to toggle source

vnode sampling without without_nodes

    # File lib/roma/routing/cb_rttable.rb
171 def sample_vnode(without_nodes)
172   short_idx = {}
173   idx = {}
174   @rd.v_idx.each_pair{|vn, nids|
175     unless list_include?(nids, without_nodes)
176       idx[vn] = nids
177       short_idx[vn] = nids if nids.length < @rd.rn
178     end
179   }
180   idx = short_idx if short_idx.length > 0
181 
182   ks = idx.keys
183   return nil if ks.length == 0
184   vn = ks[rand(ks.length)]
185   nids = idx[vn]
186   [vn, nids]
187 end
search_lost_vnodes(t) click to toggle source

Returns the list of losted-data vnode newer than argument time.

    # File lib/roma/routing/cb_rttable.rb
146 def search_lost_vnodes(t)
147   ret = []
148   @rd.each_log_all(@fname){|log_t,line|
149     next if t > log_t
150     s = line.split(/ /)
151     if s[0] == 'setroute' && s.length == 3
152       # vnode has a no pnode. therefor this vnode was lost.
153       ret << s[1].to_i
154     end
155   }
156   ret
157 end
search_nodes_for_write(vn) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
311 def search_nodes_for_write(vn)
312   return @trans[vn][0].clone if @trans.key?(vn)
313   @rd.v_idx[vn].clone
314 rescue
315   nil
316 end
search_nodes_with_clk(vn) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
318 def search_nodes_with_clk(vn)
319   @lock.synchronize {
320     return [@rd.v_clk[vn], @rd.v_idx[vn].clone]
321   }
322 rescue
323   nil
324 end
select_a_short_vnodes(exclued_nodes) click to toggle source

select a vnodes where short of redundancy.

    # File lib/roma/routing/cb_rttable.rb
160 def select_a_short_vnodes(exclued_nodes)
161   ret = []
162   @rd.v_idx.each_pair{|vn, nids|
163     if nids.length < @rd.rn && list_include?(nids,exclued_nodes) == false
164       ret << [vn,nids]
165     end
166   }
167   ret
168 end
set_leave_proc(&block) click to toggle source
   # File lib/roma/routing/cb_rttable.rb
76 def set_leave_proc(&block)
77   @leave_proc=block
78 end
set_lost_proc(&block) click to toggle source
   # File lib/roma/routing/cb_rttable.rb
80 def set_lost_proc(&block)
81   @lost_proc=block
82 end
set_recover_proc(&block) click to toggle source
   # File lib/roma/routing/cb_rttable.rb
84 def set_recover_proc(&block)
85   @recover_proc=block
86 end
set_route(vn, clk, nids) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
197 def set_route(vn, clk, nids)
198   return "#{vn} is not found." unless @rd.v_idx.key?(vn)
199   @lock.synchronize {
200     return "It's old table." if @rd.v_clk[vn] > clk
201     nids.each{ |nid|
202       add_node(nid) unless @rd.nodes.include?(nid)
203     }
204     @rd.v_idx[vn] = nids.clone
205     clk += 1
206     @rd.v_clk[vn] = clk
207     @mtree.set(vn, nids)
208     write_log_setroute(vn, clk, nids)
209     return clk
210   }
211 end
set_version(nid,ver) click to toggle source
   # File lib/roma/routing/cb_rttable.rb
65 def set_version(nid,ver)
66   @version_of_nodes[nid] = ver
67   @min_version = find_min_version
68 end
transaction(vn, nids) click to toggle source

vn: vnode-id nids: node-id list

    # File lib/roma/routing/cb_rttable.rb
328 def transaction(vn, nids)
329   return false if @trans.key?(vn)
330   @trans[vn]=[nids.clone, Time.now]
331   true
332 end
v_idx() click to toggle source
    # File lib/roma/routing/cb_rttable.rb
307 def v_idx
308   @rd.v_idx.clone
309 end
vnode_balance(ap) click to toggle source

Returns the status of vnode balance. ap: my address_port string(ex.“roma0_11211”)

    # File lib/roma/routing/cb_rttable.rb
358 def vnode_balance(ap)
359   # amount of primary at node = amount of vnode / amount of node
360   n = (2**div_bits) / nodes.length
361 
362   pcount = scount = 0
363   @rd.v_idx.each_pair{ |vn, nids|
364     next if nids == nil or nids.length == 0
365     if nids[0] == ap
366       pcount += 1
367     elsif nids.include?(ap)
368       scount += 1
369     end
370   }
371 
372   @log.debug("#{__method__}:n=#{n} pcount=#{pcount} scount=#{scount}")
373 
374   if pcount > n
375     return :over
376   elsif pcount < n
377     return :less
378   end
379   :even
380 end
write_log(line) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
108 def write_log(line)
109   # log rotation
110   if File::stat(@log_name).size > 1000 * 1024
111     close_log
112     open_log
113   end
114   t = Time.now
115   tstr = "#{t.strftime('%Y-%m-%dT%H:%M:%S')}.#{t.usec}"
116   @log_fd.write("#{tstr} #{line}\n")
117   @log_fd.flush
118 end
write_log_setroute(vn, clk, nids) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
102 def write_log_setroute(vn, clk, nids)
103   log="setroute #{vn} #{clk}"
104   nids.each{ |nid| log << " #{nid}" }
105   write_log(log)
106 end

Private Instance Methods

list_include?(list,nodes) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
189 def list_include?(list,nodes)
190   nodes.each{|nid|
191     return true if list.include?(nid)
192   }
193   false
194 end
next_alive_vnode(vn) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
294 def next_alive_vnode(vn)
295   svn = vn
296   while( (vn = @rd.next_vnode(vn)) != svn )
297     return @rd.v_idx[vn].clone if @rd.v_idx[vn].length != 0
298   end
299   []
300 end
set_event(nid, process) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
276 def set_event(nid, process)
277   t = Time.now
278   tstr = "#{t.strftime('%Y-%m-%dT%H:%M:%S')}.#{t.usec}"
279   @event.shift if @event.size >= @event_limit_line
280   @event << ("#{tstr} #{process} #{nid}")
281 end
set_route_and_inc_clk_inside_sync(vn, nodes) click to toggle source
    # File lib/roma/routing/cb_rttable.rb
284 def set_route_and_inc_clk_inside_sync(vn, nodes)
285   @rd.v_idx[vn] = nodes
286   clk = @rd.v_clk[vn] + 1
287   @rd.v_clk[vn] = clk
288   @mtree.set(vn, nodes)
289   write_log_setroute(vn, clk, nodes)
290   clk
291 end