module Roma::AsyncProcess

Public Class Methods

queue() click to toggle source
   # File lib/roma/async_process.rb
# Shared Queue of async event messages drained by the general-purpose
# worker thread (async_process_loop). Producers push messages here.
def self.queue
  @@async_queue
end
queue_latency() click to toggle source
   # File lib/roma/async_process.rb
# Shared Queue of async event messages drained by the latency worker thread
# (async_process_loop_for_latency), kept separate from the general queue.
def self.queue_latency
  @@async_queue_latency
end

Public Instance Methods

start_async_process() click to toggle source
   # File lib/roma/async_process.rb
# Starts the two async worker threads: one running async_process_loop and one
# running async_process_loop_for_latency. Both threads are tagged with this
# method's name in their :name thread-local. Spawn errors are logged and
# swallowed.
#
# @return [Symbol, nil] the thread tag on success, nil if spawning failed
def start_async_process
  @async_thread = Thread.new { async_process_loop }
  @async_thread[:name] = __method__

  @async_thread_latency = Thread.new { async_process_loop_for_latency }
  @async_thread_latency[:name] = __method__
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
end

Private Instance Methods

acquired_recover_process() click to toggle source
    # File lib/roma/async_process.rb
# Requests vnodes from peers until one of these happens:
# - the value at index 2 of @rttable.num_of_vn(ap_str) (commented "short
#   vnodes") reaches 0;
# - no candidate vnode is returned;
# - @do_acquired_recover_process is cleared;
# - a request fails with false.
# Each candidate is requested from the first listed holder via
# req_push_a_vnode. Errors are logged, and the run flag is always cleared on
# exit.
def acquired_recover_process
  @log.info("#{__method__}:start")

  exclude_nodes = @rttable.exclude_nodes_for_recover(@stats.ap_str, @stats.rep_host)

  @do_acquired_recover_process = true
  while @do_acquired_recover_process
    break if @rttable.num_of_vn(@stats.ap_str)[2] == 0 # short vnodes

    vn, holders, is_primary = @rttable.select_vn_for_recover(exclude_nodes, @stats.rep_host)
    break unless vn
    # A candidate without holders is skipped (no pause between attempts).
    next if holders.empty?

    case req_push_a_vnode(vn, holders[0], is_primary)
    when :rejected then sleep 1 # extra back-off when the holder refuses
    when false then break
    end
    sleep 1
  end
  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e.inspect} #{$ERROR_POSITION}")
ensure
  @do_acquired_recover_process = false
end
async_process_loop() click to toggle source
    # File lib/roma/async_process.rb
 # Worker loop for the general async queue. Each message is dispatched to
 # the matching asyncev_<event> method:
 # - a truthy result reports success to the callback;
 # - a falsy result re-queues the message from a helper thread (after
 #   msg.wait and msg.incr_count) while msg.retry? holds;
 # - otherwise it reports failure to the callback.
 # Any error is logged and the loop restarts (unbounded retry keeps the
 # worker alive).
 def async_process_loop
   loop do
     while (msg = @@async_queue.pop)
       handled = send("asyncev_#{msg.event}", msg.args)
       if handled
         msg.callback.call(msg, true) if msg.callback
       elsif msg.retry?
         requeuer = Thread.new do
           msg.wait
           msg.incr_count
           @@async_queue.push(msg)
         end
         requeuer[:name] = __method__
       else
         @log.error("async process retry out:#{msg.inspect}")
         msg.callback.call(msg, false) if msg.callback
       end
     end
   end
 rescue => e
   @log.error("#{e}\n#{$ERROR_POSITION}")
   retry
 end
async_process_loop_for_latency() click to toggle source
    # File lib/roma/async_process.rb
# Worker loop for the latency queue. The dispatch rules are the same as the
# general worker's:
# - success notifies the callback;
# - a failure re-queues the message from a helper thread while msg.retry?
#   holds;
# - otherwise the failure is reported to the callback.
# Errors are logged and the loop restarts.
def async_process_loop_for_latency
  loop do
    while (msg = @@async_queue_latency.pop)
      handled = send("asyncev_#{msg.event}", msg.args)
      if handled
        msg.callback.call(msg, true) if msg.callback
      elsif msg.retry?
        requeuer = Thread.new do
          msg.wait
          msg.incr_count
          @@async_queue_latency.push(msg)
        end
        requeuer[:name] = __method__
      else
        @log.error("async process retry out:#{msg.inspect}")
        msg.callback.call(msg, false) if msg.callback
      end
    end
  end
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  retry
end
asyncev_broadcast_cmd(args) click to toggle source
    # File lib/roma/async_process.rb
# Broadcasts a command to other nodes from a background thread.
#
# @param args [Array] [cmd, nids, tout]: command text (CRLF is appended),
#   target node ids and timeout, all forwarded to async_broadcast_cmd
# @return [true] always; the event is never retried
def asyncev_broadcast_cmd(args)
  @log.debug("#{__method__} #{args.inspect}")
  cmd, nids, tout = args
  broadcaster = Thread.new { async_broadcast_cmd("#{cmd}\r\n", nids, tout) }
  broadcaster[:name] = __method__
  true
end
asyncev_calc_latency_average(args) click to toggle source
    # File lib/roma/async_process.rb
# Records one latency sample for +cmd+ in @stats.latency_data[cmd]. It keeps
# the last 10 samples and running max/min. Once more than
# @stats.latency_check_time_count seconds have passed since the window
# started, it logs the average/max/min and rotates the window into the
# *_past slots.
#
# @param args [Array] [latency, cmd]: numeric latency sample and command key
# @return [true] always, so the async worker never retries this event
def asyncev_calc_latency_average(args)
  latency, cmd = args

  unless @stats.latency_data.key?(cmd) # only first execute target cmd
    # `||= {}` creates the per-command hash when latency_data is a plain Hash.
    # It is a no-op when latency_data has a default proc, which the previous
    # code silently relied on.
    entry = (@stats.latency_data[cmd] ||= {})
    entry.store('latency', [])
    entry.store('latency_max', { 'current' => 0 })
    entry.store('latency_min', { 'current' => 99_999 })
    entry.store('time', Time.now.to_i)
  end
  data = @stats.latency_data[cmd]

  begin
    samples = data['latency']
    samples.delete_at(0) if samples.length >= 10 # sliding window of 10
    samples.push(latency)

    data['latency_max']['current'] = latency if latency > data['latency_max']['current']
    data['latency_min']['current'] = latency if latency < data['latency_min']['current']
  rescue => e
    @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
  ensure
    if @stats.latency_check_time_count && Time.now.to_i - data['time'] > @stats.latency_check_time_count
      samples = data['latency']
      average = samples.inject(0.0) { |sum, x| sum + x } / samples.size
      max = data['latency_max']['current']
      min = data['latency_min']['current']
      @log.debug("Latency average[#{cmd}]: #{format('%.8f', average)}"\
                 "(denominator=#{samples.length}"\
                 " max=#{format('%.8f', max)}"\
                 " min=#{format('%.8f', min)})")

      # Start a new window, keeping the finished one in the *_past slots.
      data['time'] = Time.now.to_i
      data['latency_past'] = samples
      data['latency'] = []
      data['latency_max']['past'] = max
      data['latency_max']['current'] = 0
      data['latency_min']['past'] = min
      data['latency_min']['current'] = 99_999
    end
  end
  true
end
asyncev_rdelete(args) click to toggle source
    # File lib/roma/async_process.rb
# Replicates a delete to redundant node +nid+ by sending an rdelete command.
#
# @param args [Array] [nid, hname, k, clk]
# @return [Boolean] true when done or when nid is no longer in the routing
#   table (no retry); false when the send failed (the async worker retries)
def asyncev_rdelete(args)
  nid, hname, k, clk = args
  @log.debug("#{__method__} #{args.inspect}")
  unless @rttable.nodes.include?(nid)
    @log.warn("async rdelete failed:#{nid} does not found in routing table.#{k}\e#{hname} #{clk}")
    return true # no retry
  end
  res = async_send_cmd(nid, "rdelete #{k}\e#{hname} #{clk}\r\n", 10)
  unless res
    # Previously logged as "async redundant failed", which was copied from
    # asyncev_redundant and misidentified the failed operation.
    @log.warn("async rdelete failed:#{k}\e#{hname} #{clk} -> #{nid}")
    return false # retry
  end
  true
end
asyncev_redundant(args) click to toggle source
    # File lib/roma/async_process.rb
# Replicates a value to redundant node +nid+ with an rset command.
#
# @param args [Array] [nid, hname, k, d, clk, expt, v]
# @return [Boolean] true on success or when nid has left the routing table
#   (no retry); false when the send failed or the reply starts with 'ERROR'
#   (the async worker retries)
def asyncev_redundant(args)
  nid, hname, k, d, clk, expt, v = args
  @log.debug("#{__method__} #{args.inspect}")
  if @rttable.nodes.include?(nid)
    header = "#{k}\e#{hname} #{d} #{clk} #{expt} #{v.length}"
    res = async_send_cmd(nid, "rset #{header}\r\n#{v}\r\n", 10)
    return true unless res.nil? || res.start_with?('ERROR')

    @log.warn("async redundant failed:#{header} -> #{nid}")
    false # retry
  else
    @log.warn("async redundant failed:#{nid} does not found in routing table.#{k}\e#{hname} #{d} #{clk} #{expt} #{v.length}")
    true # no retry
  end
end
asyncev_reqpushv(args) click to toggle source
    # File lib/roma/async_process.rb
# Handles a peer's reqpushv request by pushing vnode +vn+ to node +nid+ from
# a background thread (sync_a_vnode). Only one storage iteration may run at
# a time; @stats.run_iterate_storage guards it and is cleared when the
# thread ends.
#
# @param args [Array] [vn, nid, p]: vnode id (converted with to_i), target
#   node id, and 'true' when the target should become primary
# @return the result of @log.warn when already iterating, otherwise the
#   worker thread's tag (a Symbol)
def asyncev_reqpushv(args)
  vn, nid, primary_flag = args
  @log.debug("#{__method__} #{args.inspect}")
  return @log.warn("#{__method__}:already be iterated storage process.") if @stats.run_iterate_storage

  @stats.run_iterate_storage = true
  pusher = Thread.new do
    begin
      sync_a_vnode(vn.to_i, nid, primary_flag == 'true')
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_iterate_storage = false
    end
  end
  pusher[:name] = __method__
end
asyncev_start_auto_recover_process(args) click to toggle source
    # File lib/roma/async_process.rb
# Arms automatic recovery. It marks the routing table 'preparing' and starts
# a watcher thread. If the status stays 'preparing', and neither a recover
# nor a balance process starts, for @rttable.auto_recover_time seconds, the
# watcher acts on @rttable.lost_action:
# - :auto_assign / :shutdown run acquired_recover_process;
# - :no_action only logs.
#
# @param args [Object] only logged
# @return [false] when a recover process is already running (the async
#   worker re-queues the event); true when a balance process is running;
#   otherwise the watcher thread's tag (a truthy Symbol)
def asyncev_start_auto_recover_process(args)
  @log.debug("#{__method__} #{args.inspect}")
  # run_join cannot be true on this path, so it is deliberately not checked.
  if @stats.run_recover
    @log.error("#{__method__}:recover process running.")
    return false
  end
  if @stats.run_balance
    @log.error("#{__method__}:balance process running")
    return true
  end

  @rttable.auto_recover_status = 'preparing'
  t = Thread.new do
    begin
      # Kernel#timeout is deprecated and removed in newer Rubies; use the
      # module function instead.
      Timeout.timeout(@rttable.auto_recover_time) do
        loop do
          sleep 1
          break if @rttable.auto_recover_status != 'preparing'
          break if @stats.run_recover
          break if @stats.run_balance
        end
      end
      @log.debug('inactivated AUTO_RECOVER')
    rescue Timeout::Error
      # The grace period expired with nobody taking over: apply lost_action.
      case @rttable.lost_action
      when :auto_assign, :shutdown
        @stats.run_recover = true
        @rttable.auto_recover_status = 'executing'
        begin
          @log.debug('auto recover start')
          acquired_recover_process
        rescue => e
          @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
        ensure
          @stats.run_recover = false
          @rttable.auto_recover_status = 'waiting'
        end
      when :no_action
        @log.debug('auto recover NOT start. Because lost action is [no_action]')
      end
    rescue => e
      # Previously any error in the watcher triggered a recovery; an
      # unexpected failure is now only logged.
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  t[:name] = __method__
end
asyncev_start_balance_process(_args) click to toggle source
    # File lib/roma/async_process.rb
# Starts balance_process in a background thread unless a join, recover or
# balance process is already running. @stats.run_balance is set for the
# lifetime of the thread.
#
# @return [true] always; the event is never retried
def asyncev_start_balance_process(_args)
  @log.debug(__method__)
  # Checked in this order; the first busy process is reported.
  { run_join: 'join', run_recover: 'recover', run_balance: 'balance' }.each do |flag, label|
    next unless @stats.public_send(flag)
    @log.error("#{__method__}:#{label} process running")
    return true
  end

  @stats.run_balance = true
  balancer = Thread.new do
    begin
      balance_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_balance = false
    end
  end
  balancer[:name] = __method__
  true
end
asyncev_start_get_logs(args) click to toggle source
    # File lib/roma/async_process.rb
# Gathers log lines (get_logs) in a background thread. When the thread
# finishes, @stats.gui_run_gather_logs is cleared.
#
# @param args [Array] forwarded to get_logs (start and end times)
# @return [Symbol] the worker thread's tag
def asyncev_start_get_logs(args)
  @log.debug("#{__method__} #{args}")
  gatherer = Thread.new do
    begin
      get_logs(args)
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.gui_run_gather_logs = false
    end
  end
  gatherer[:name] = __method__
end
asyncev_start_get_routing_event(args) click to toggle source
    # File lib/roma/async_process.rb
# Collects join/leave routing events (get_routing_event) in a background
# thread.
#
# @param args [Object] only logged
# @return [Symbol] the worker thread's tag
def asyncev_start_get_routing_event(args)
  @log.debug("#{__method__} #{args}")
  collector = Thread.new do
    begin
      get_routing_event
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  collector[:name] = __method__
end
asyncev_start_join_process(_args) click to toggle source
    # File lib/roma/async_process.rb
# Starts join_process in a background thread unless a join, recover or
# balance process is already running. @stats.run_join is set for the
# thread's lifetime; run_join and join_ap are reset when it ends.
#
# @return [true] always; the event is never retried
def asyncev_start_join_process(_args)
  @log.debug(__method__)
  # Checked in this order; the first busy process is reported.
  { run_join: 'join', run_recover: 'recover', run_balance: 'balance' }.each do |flag, label|
    next unless @stats.public_send(flag)
    @log.error("#{__method__}:#{label} process running")
    return true
  end

  @stats.run_join = true
  joiner = Thread.new do
    begin
      join_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_join = false
      @stats.join_ap = nil
    end
  end
  joiner[:name] = __method__
  true
end
asyncev_start_recover_process(args) click to toggle source
    # File lib/roma/async_process.rb
# Starts acquired_recover_process in a background thread unless another
# process blocks it. @stats.run_recover is set for the thread's lifetime.
#
# @param args [Object] only logged
# @return [true] when a join or balance process is running; false when a
#   recover process is running (the async worker retries); otherwise the
#   worker thread's tag (a truthy Symbol)
def asyncev_start_recover_process(args)
  @log.debug("#{__method__} #{args.inspect}")
  if @stats.run_join
    @log.error("#{__method__}:join process running")
    return true
  elsif @stats.run_recover
    @log.error("#{__method__}:recover process running.")
    return false
  elsif @stats.run_balance
    @log.error("#{__method__}:balance process running")
    return true
  end

  @stats.run_recover = true
  recoverer = Thread.new do
    begin
      acquired_recover_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_recover = false
    end
  end
  recoverer[:name] = __method__
end
asyncev_start_release_process(args) click to toggle source
    # File lib/roma/async_process.rb
# Starts release_process in a background thread unless a storage iteration
# is already running. While the thread runs it sets run_release,
# run_iterate_storage and spushv_protection. Only run_iterate_storage and
# run_release are cleared afterwards.
# NOTE(review): spushv_protection is never reset here — presumably cleared
# elsewhere; confirm.
#
# @param args [Object] only logged
# @return the result of @log.warn when busy, otherwise the worker thread's tag
def asyncev_start_release_process(args)
  @log.debug("#{__method__} #{args}")
  return @log.warn("#{__method__}:already be iterated storage process.") if @stats.run_iterate_storage

  @stats.run_release = true
  @stats.run_iterate_storage = true
  @stats.spushv_protection = true
  releaser = Thread.new do
    begin
      release_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_iterate_storage = false
      @stats.run_release = false
    end
  end
  releaser[:name] = __method__
end
asyncev_start_replicate_existing_data_process(args) click to toggle source
     # File lib/roma/async_process.rb
# Replicates existing data to the replica cluster from a background thread
# (replicate_existing_data_process). $roma.cr_writer's
# run_existing_data_replication flag is raised inside the thread and always
# lowered when it ends.
#
# @param args [Array] [$roma.cr_writer.replica_rttable]
# @return [Symbol] the worker thread's tag
def asyncev_start_replicate_existing_data_process(args)
  replicator = Thread.new do
    writer_flag_on = false
    begin
      $roma.cr_writer.run_existing_data_replication = true
      writer_flag_on = true
      replicate_existing_data_process(args)
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      # Always reset, whether or not raising the flag succeeded.
      $roma.cr_writer.run_existing_data_replication = false unless writer_flag_on.nil?
    end
  end
  replicator[:name] = __method__
end
asyncev_start_storage_cachecleaning_process(args) click to toggle source
    # File lib/roma/async_process.rb
# Starts storage_cachecleaning_process for database +dn+ of storage +hname+
# in a background thread. Nothing starts unless that database's status is
# :cachecleaning.
#
# @param args [Array] [hname, dn]
# @return [true] always; the event is never retried
def asyncev_start_storage_cachecleaning_process(args)
  hname, dn = args
  @log.debug("#{__method__} #{args.inspect}")

  st = @storages[hname]
  unless st.dbs[dn] == :cachecleaning
    @log.error("Can not start cachecleaning process. stat = #{st.dbs[dn]}")
    return true
  end

  cleaner = Thread.new do
    begin
      storage_cachecleaning_process(hname, dn)
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  cleaner[:name] = __method__
  true
end
asyncev_start_storage_clean_up_process(_args) click to toggle source
    # File lib/roma/async_process.rb
# Starts storage_clean_up_process in a background thread unless one is
# already running. When the thread ends it stamps @stats.last_clean_up and
# clears @stats.run_storage_clean_up.
#
# @return [nil] when already running (falsy, so the async worker may retry);
#   otherwise the worker thread's tag
def asyncev_start_storage_clean_up_process(_args)
  if @stats.run_storage_clean_up
    @log.error("#{__method__}:already in being")
    return
  end

  @stats.run_storage_clean_up = true
  cleaner = Thread.new do
    begin
      storage_clean_up_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.last_clean_up = Time.now
      @stats.run_storage_clean_up = false
    end
  end
  cleaner[:name] = __method__
end
asyncev_start_storage_flush_process(args) click to toggle source
    # File lib/roma/async_process.rb
# Flushes database +dn+ of storage +hname+ in a background thread, then
# marks it :safecopy_flushed. Nothing starts unless the database's status is
# :safecopy_flushing.
#
# @param args [Array] [hname, dn]
# @return [true] always; the event is never retried
def asyncev_start_storage_flush_process(args)
  hname, dn = args
  @log.debug("#{__method__} #{args.inspect}")

  st = @storages[hname]
  unless st.dbs[dn] == :safecopy_flushing
    @log.error("Can not flush storage. stat = #{st.dbs[dn]}")
    return true
  end

  flusher = Thread.new do
    begin
      st.flush_db(dn)
      st.set_db_stat(dn, :safecopy_flushed)
      @log.info("#{__method__}:storage has flushed. (#{hname}, #{dn})")
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  flusher[:name] = __method__
  true
end
asyncev_zredundant(args) click to toggle source
    # File lib/roma/async_process.rb
# Replicates a compressed value to redundant node +nid+ with an rzset
# command.
#
# @param args [Array] [nid, hname, k, d, clk, expt, zv]
# @return [Boolean] true on success or when nid has left the routing table
#   (no retry); false when the send failed or the reply starts with 'ERROR'
#   (the async worker retries)
def asyncev_zredundant(args)
  nid, hname, k, d, clk, expt, zv = args
  @log.debug("#{__method__} #{args.inspect}")
  unless @rttable.nodes.include?(nid)
    @log.warn("async zredundant failed:#{nid} does not found in routing table.#{k}\e#{hname} #{d} #{clk} #{expt} #{zv.length}")
    return true # no retry
  end
  res = async_send_cmd(nid, "rzset #{k}\e#{hname} #{d} #{clk} #{expt} #{zv.length}\r\n#{zv}\r\n", 10)
  if res.nil? || res.start_with?('ERROR')
    # Log the length of the value actually sent (zv). The previous code
    # referenced an undefined `v`, which raised NameError here, so the
    # event was never retried.
    @log.warn("async zredundant failed:#{k}\e#{hname} #{d} #{clk} #{expt} #{zv.length} -> #{nid}")
    return false # retry
  end
  true
end
balance_process() click to toggle source
    # File lib/roma/async_process.rb
# Pulls vnodes toward this node while @rttable.vnode_balance reports :less.
# At most v_idx.length successful (non-rejected) requests are made.
# Stops early when @do_balance_process is cleared, and returns false when
# no candidate vnode is found. Errors are logged; the run flag is always
# cleared.
def balance_process
  @log.info("#{__method__}:start")
  pushed = 0
  vnode_total = @rttable.v_idx.length
  exclude_nodes = @rttable.exclude_nodes_for_balance(@stats.ap_str, @stats.rep_host)

  @do_balance_process = true
  while @rttable.vnode_balance(@stats.ap_str) == :less && pushed < vnode_total
    break unless @do_balance_process

    vn, holders, is_primary = @rttable.select_vn_for_balance(exclude_nodes, @stats.rep_host)
    unless vn
      @log.warn("#{__method__}:vnode does not found")
      return false
    end

    if req_push_a_vnode(vn, holders[0], is_primary) == :rejected
      sleep 5 # longer back-off when the holder refuses
    else
      sleep 1
      pushed += 1
    end
  end
  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e.inspect} #{$ERROR_POSITION}")
ensure
  @do_balance_process = false
end
edit_nodes(nodes, new_nid, is_primary) click to toggle source
    # File lib/roma/async_process.rb
# Builds the new holder list for a vnode after +new_nid+ joins it. +nodes+ is
# mutated in place and returned.
#
# With a redundancy (rn) of 1 the result is just [new_nid]. Otherwise
# new_nid is first removed from nodes. If the list is still full (>= rn), one
# node is dropped: the last node on new_nid's host if any, else the last
# node. new_nid is then placed first when is_primary, otherwise last. Hosts
# are the part of a node id before the first ':' or '_'.
def edit_nodes(nodes, new_nid, is_primary)
  return [new_nid] if @rttable.rn == 1

  nodes.delete(new_nid)

  if nodes.length >= @rttable.rn
    new_host = new_nid.split(/[:_]/)[0]
    same_host = nodes.select { |nid| nid.split(/[:_]/)[0] == new_host }
    # Prefer evicting a same-host node, keeping the primary (front) intact.
    nodes.delete(same_host.empty? ? nodes.last : same_host.last)
  end

  is_primary ? nodes.unshift(new_nid) : nodes.push(new_nid)
  nodes
end
get_logs(args) click to toggle source
    # File lib/roma/async_process.rb
# Reads this node's log file (Config::LOG_PATH/<ap_str>.log) between the
# times args[0] and args[1]. The byte offsets are located with get_point.
# The chomped lines, minus any exact '.' lines, are stored in @rttable.logs
# with the gathering Time prepended. On error @rttable.logs is emptied.
# @stats.gui_run_gather_logs is always cleared.
#
# @param args [Array] [start_time, end_time] as accepted by get_point
def get_logs(args)
  @log.debug("#{__method__}:start.")

  log_file = "#{Config::LOG_PATH}/#{@stats.ap_str}.log"

  collected = []
  File.open(log_file) do |f|
    from = get_point(f, args[0], 'start')
    to = get_point(f, args[1], 'end')

    ## read target logs
    f.seek(from, IO::SEEK_SET)
    collected = f.read(to - from).each_line.map(&:chomp)
    collected.delete('.')
  end

  @rttable.logs = collected
  # set gathered date for expiration
  @rttable.logs.unshift(Time.now)

  @log.debug("#{__method__} has done.")
rescue => e
  @rttable.logs = []
  @log.error("#{e}\n#{$ERROR_POSITION}")
ensure
  @stats.gui_run_gather_logs = false
end
get_point(f, target_time, type, latency_time = Time.now, current_pos = 0, new_pos = f.size / 2) click to toggle source
     # File lib/roma/async_process.rb
 # Finds the byte offset in log file +f+ of the first entry at or after
 # +target_time+. It probes read_size (2048) bytes at new_pos and recurses,
 # moving the probe by half the distance from the previous probe each time.
 #
 # @param f            [File] open log file
 # @param target_time  [String, Time] 'YYYY-MM-DDTHH:MM:SS...' string or
 #                     'current' on the first call; a Time on recursive calls
 # @param type         [String] 'start' or 'end'; selects how an out-of-range
 #                     target is handled
 # @param latency_time [Time] when the search began; used for the 5s guard
 # @param current_pos  [Integer] previous probe offset (sets the step size)
 # @param new_pos      [Integer] offset probed by this call
 # @return [Integer] byte offset into f (0 / f.size at the range edges)
 # @raise [RuntimeError] after more than 5 seconds of searching, or for a
 #   target outside the logged range on the wrong side for +type+
 969 def get_point(f, target_time, type, latency_time = Time.now, current_pos = 0, new_pos = f.size / 2)
 970   # hilatency check
 971   ps = Time.now - latency_time
 972   if ps > 5
 973     @log.warn('gather_logs process was failed.')
 974     fail
 975   end
 976 
 977   # initialize read size
 978   read_size = 2048
 979 
 980   # first check
       # Only on the first call: recursive calls pass target_time as a Time.
 981   unless target_time.class == Time
 982     # in case of not set end_date
 983     return f.size if target_time == 'current'
 984 
 985     target_time =~ (/(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)/)
           # `000000` is just the integer 0 (usec), so the target has
           # whole-second resolution.
 986     target_time = Time.mktime(Regexp.last_match[1], Regexp.last_match[2], Regexp.last_match[3], Regexp.last_match[4], Regexp.last_match[5], Regexp.last_match[6], 000000)
 987 
 988     # check outrange or not
 989     f.seek(0, IO::SEEK_SET)
 990     begining_log = f.read(read_size)
           # `pos` is unused; the index/rindex calls are kept for their side
           # effect of setting Regexp.last_match for the mktime that follows.
           # The 7th capture is passed as the usec argument — presumably the
           # logger writes 6-digit microseconds; TODO confirm.
 991     pos = begining_log.index(/[IDEW],\s\[(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)\.(\d+)/)
 992     begining_time = Time.mktime(Regexp.last_match[1], Regexp.last_match[2], Regexp.last_match[3], Regexp.last_match[4], Regexp.last_match[5], Regexp.last_match[6], Regexp.last_match[7])
 993 
           # NOTE(review): a log shorter than read_size makes this negative
           # seek land before the start of the file, which presumably raises
           # (Errno::EINVAL) — confirm.
 994     f.seek(-read_size, IO::SEEK_END)
 995     end_log = f.read(read_size)
 996     pos = end_log.rindex(/[IDEW],\s\[(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)\.(\d+)/)
 997     end_time = Time.mktime(Regexp.last_match[1], Regexp.last_match[2], Regexp.last_match[3], Regexp.last_match[4], Regexp.last_match[5], Regexp.last_match[6], Regexp.last_match[7])
 998 
           # Clamp to the file edges when the target lies outside the logged
           # range on the "harmless" side; otherwise the request is invalid.
 999     case type
1000     when 'start'
1001       if target_time < begining_time
1002         return 0
1003       elsif target_time > end_time
1004         @log.error('irregular time was set.')
1005         fail
1006       end
1007     when 'end'
1008       if target_time > end_time
1009         return f.size
1010       elsif target_time < begining_time
1011         @log.error('irregular time was set.')
1012         fail
1013       end
1014     end
1015   end
1016 
1017   # read half sector size
1018   f.seek(new_pos, IO::SEEK_SET)
1019   sector_log = f.read(read_size)
1020   # grep date
1021   date_a = sector_log.scan(/[IDEW],\s\[(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)\.(\d+)/)
1022 
1023   time_a = []
1024   date_a.each do|time|
1025     time_a.push(Time.mktime(time[0], time[1], time[2], time[3], time[4], time[5], time[6]))
1026   end
1027   sector_time_first = time_a[0]
1028   sector_time_last = time_a[-1]
1029 
       # NOTE(review): if the probed sector contains no timestamp, both bounds
       # are nil and between? raises ArgumentError, aborting the search.
1030   if target_time.between?(sector_time_first, sector_time_last)
         # Target lies inside this sector: return the offset of the first
         # entry whose timestamp (to the second) is >= target_time.
1031     time_a.each do|time|
1032       if target_time <= time
1033         time_string = time.strftime('%Y-%m-%dT%H:%M:%S')
1034         target_index = sector_log.index(/[IDEW],\s\[#{time_string}/)
1035         return new_pos + target_index
1036       end
1037     end
       # Otherwise step backward/forward by half the distance moved last time.
       # NOTE(review): once that step reaches 0 the probe stops moving, and only
       # the 5-second guard above ends the recursion (possibly after deep
       # recursion) — confirm.
1038   elsif sector_time_first > target_time
1039     target_pos = new_pos - ((new_pos - current_pos).abs / 2)
1040   elsif sector_time_first < target_time
1041     target_pos = new_pos + ((new_pos - current_pos).abs / 2)
1042   end
1043 
1044   get_point(f, target_time, type, latency_time, new_pos, target_pos)
1045 end
get_routing_event() click to toggle source
    # File lib/roma/async_process.rb
# Scans every routing file for this node (Config::RTTABLE_PATH/<ap_str>*).
# Each line containing "join" or "leave" is appended, chomped, to
# @rttable.event. The oldest entry is dropped first whenever the list has
# reached @rttable.event_limit_line. Errors are logged.
def get_routing_event
  @log.info("#{__method__}:start.")

  Dir.glob("#{Config::RTTABLE_PATH}/#{@stats.ap_str}*").each do |fname|
    IO.foreach(fname) do |line|
      next unless line =~ /join|leave/

      @rttable.event.shift if @rttable.event.size >= @rttable.event_limit_line
      @rttable.event << line.chomp
    end
  end

  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
end
join_process() click to toggle source
    # File lib/roma/async_process.rb
# Pulls vnodes to this newly joined node while @rttable.vnode_balance
# reports :less. At most v_idx.length successful (non-rejected) requests are
# made. Stops early when @do_join_process is cleared, and returns false when
# no candidate vnode is found. The "has done." log line and clearing of the
# run flag happen on every exit path.
def join_process
  @log.info("#{__method__}:start")
  pushed = 0
  vnode_total = @rttable.v_idx.length
  exclude_nodes = @rttable.exclude_nodes_for_join(@stats.ap_str, @stats.rep_host)

  @do_join_process = true
  while @rttable.vnode_balance(@stats.ap_str) == :less && pushed < vnode_total
    break unless @do_join_process

    vn, holders, is_primary = @rttable.select_vn_for_join(exclude_nodes, @stats.rep_host)
    unless vn
      @log.warn("#{__method__}:vnode does not found")
      return false
    end

    if req_push_a_vnode(vn, holders[0], is_primary) == :rejected
      sleep 5 # longer back-off when the holder refuses
    else
      sleep 1
      pushed += 1
    end
  end
rescue => e
  @log.error("#{e.inspect} #{$ERROR_POSITION}")
ensure
  @log.info("#{__method__} has done.")
  @do_join_process = false
end
push_a_vnode_stream(hname, vn, nid) click to toggle source
    # File lib/roma/async_process.rb
# Streams every entry of vnode +vn+ in storage +hname+ to node +nid+ over
# the spushv protocol:
# 1. send the header and wait for READY;
# 2. write each dump chunk, sleeping @stats.stream_copy_wait_param between
#    chunks;
# 3. send 20 NUL bytes as the end marker and read the final reply.
# Clearing @do_push_a_vnode_stream from another thread cancels the transfer
# between chunks.
#
# @param hname [String] storage (hash) name
# @param vn    [Integer] vnode id
# @param nid   [String] destination node id
# @return [String, nil] the peer's final reply without CRLF ('STORED' on
#   success), the peer's refusal line, 'CANCELED', or an exception message
def push_a_vnode_stream(hname, vn, nid)
  @log.debug("#{__method__}:hname=#{hname} vn=#{vn} nid=#{nid}")

  stop_clean_up

  con = Roma::Messaging::ConPool.instance.get_connection(nid)

  @do_push_a_vnode_stream = true

  con.write("spushv #{hname} #{vn}\r\n")

  res = con.gets # READY\r\n or error string
  if res != "READY\r\n"
    con.close
    con = nil
    return res.chomp
  end

  res_dump = @storages[hname].each_vn_dump(vn) do |data|
    unless @do_push_a_vnode_stream
      con.close
      con = nil
      @log.error("#{__method__}:canceled in hname=#{hname} vn=#{vn} nid=#{nid}")
      return 'CANCELED'
    end

    con.write(data)
    sleep @stats.stream_copy_wait_param
  end
  con.write("\0" * 20) # end of stream

  res = con.gets # STORED\r\n or error string
  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  con = nil # now owned by the pool again
  res.chomp! if res
  if res_dump == false
    @log.error("#{__method__}:each_vn_dump in hname=#{hname} vn=#{vn} nid=#{nid}")
    return 'CANCELED'
  end
  res
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  # A failure mid-stream leaves the socket in an unknown protocol state. It
  # must not go back to the pool, so close it instead of leaking it. `con`
  # is nil once it has been closed or returned above.
  if con
    begin
      con.close
    rescue StandardError => close_error
      @log.error("#{__method__}:close failed:#{close_error}")
    end
  end
  e.to_s
end
release_process() click to toggle source
    # File lib/roma/async_process.rb
# Hands every vnode that this node (@stats.ap_str) holds over to other nodes,
# one vnode at a time via sync_a_vnode_for_release, so the node can be
# released. It repeats full passes over the vnode table while
# @rttable.has_node?(ap_str) stays truthy.
#
# Stops early when @do_release_process is cleared, either by another thread
# or here on an :abort result. On every exit it clears the flag and closes
# all pooled connections. Errors are logged, not raised.
#
# NOTE(review): a truthy can_i_release? result is treated as "not enough
# nodes to release to", despite the predicate's name — confirm against the
# routing table implementation.
479 def release_process
480   @log.info("#{__method__}:start.")
481 
482   if @rttable.can_i_release?(@stats.ap_str, @stats.rep_host)
483     @log.error("#{__method__}:Sufficient nodes do not found.")
484     return
485   end
486 
487   @do_release_process = true
488   while @rttable.has_node?(@stats.ap_str)
489     break unless @do_release_process
490     @rttable.each_vnode do |vn, nids|
491       break unless @do_release_process
492       if nids.include?(@stats.ap_str)
493 
494         to_nid, new_nids = @rttable.select_node_for_release(@stats.ap_str, @stats.rep_host, nids)
495         res = sync_a_vnode_for_release(vn, to_nid, new_nids)
496         if res == :abort
497           @log.error("#{__method__}:release_process aborted due to SERVER_ERROR received.")
498           @do_release_process = false
499         end
500         if res == false
501           @log.warn("#{__method__}:error at vn=#{vn} to_nid=#{to_nid} new_nid=#{new_nids}")
              # `redo` re-runs this block for the same vn without advancing.
              # A vnode that keeps failing is retried with no delay and no cap
              # until @do_release_process is cleared.
502           redo
503         end
504       end
505     end
506   end
507   @log.info("#{__method__} has done.")
508 rescue => e
509   @log.error("#{e}\n#{$ERROR_POSITION}")
510 ensure
511   @do_release_process = false
512   Roma::Messaging::ConPool.instance.close_all
513 end
replicate_existing_data_process(args) click to toggle source
     # File lib/roma/async_process.rb
# Pushes every vnode of every storage to each replica node listed for that
# vnode in the replica routing table (args[0].v_idx).
# - Returns false on the first push that does not answer 'STORED'.
# - Aborts, via a logged RuntimeError, as soon as
#   $roma.cr_writer.run_existing_data_replication is cleared.
#
# @param args [Array] [replica_rttable]
def replicate_existing_data_process(args)
  @log.info("#{__method__} :start.")

  @storages.each_key do |hname|
    @rttable.v_idx.each_key do |vn|
      raise unless $roma.cr_writer.run_existing_data_replication

      args[0].v_idx[vn].each do |replica_nid|
        res = push_a_vnode_stream(hname, vn, replica_nid)
        next if res == 'STORED'

        @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
        return false
      end
    end
  end

  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
end
req_push_a_vnode(vn, src_nid, is_primary) click to toggle source
    # File lib/roma/async_process.rb
# Asks +src_nid+ to push vnode +vn+ to this node (reqpushv), then polls every
# 0.1s until the routing table lists this node for that vnode. Polling stops
# after @stats.reqpushv_timeout_count polls.
#
# @param vn         [Integer] vnode id
# @param src_nid    [String] node currently holding the vnode
# @param is_primary [Boolean] whether this node should become primary
# @return [true, :rejected, :timeout, false]
#   - true when the vnode arrived;
#   - :rejected when the peer refused or answered with an error;
#   - :timeout when it never arrived;
#   - false on an exception (the peer is then reported via proc_failed).
def req_push_a_vnode(vn, src_nid, is_primary)
  pool = Roma::Messaging::ConPool.instance
  con = pool.get_connection(src_nid)
  con.write("reqpushv #{vn} #{@stats.ap_str} #{is_primary}\r\n")
  res = con.gets # receive 'PUSHED\r\n' | 'REJECTED\r\n' | 'ERROR\r\n'
  case res
  when "PUSHED\r\n"
    pool.return_connection(src_nid, con)
  when "REJECTED\r\n"
    @log.warn("#{__method__}:request was rejected from #{src_nid}.")
    pool.return_connection(src_nid, con)
    return :rejected
  else
    # Error reply or closed connection (nil): the connection's state is
    # unknown. Close it rather than leaking it, as the previous code did, or
    # handing it back to the pool.
    @log.warn("#{__method__}:#{res}")
    con.close
    return :rejected
  end

  # waiting for pushv. Membership is checked before the budget, so a vnode
  # that arrives on the final poll (or is already present with a zero
  # budget) counts as success rather than a time-out.
  count = 0
  until @rttable.search_nodes(vn).include?(@stats.ap_str)
    if count >= @stats.reqpushv_timeout_count
      @log.warn("#{__method__}:request has been time-out.vn=#{vn} nid=#{src_nid}")
      return :timeout
    end
    sleep 0.1
    count += 1
  end
  true
rescue => e
  @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
  @rttable.proc_failed(src_nid)
  false
end
stop_async_process() click to toggle source
   # File lib/roma/async_process.rb
# Stops the two async worker threads. Before killing each worker it waits
# up to 100 x 0.1s for that worker's queue to drain: the general worker is
# stopped first, then the latency worker.
#
# @return [Thread] the exited latency worker thread
def stop_async_process
  wait_until_drained = lambda do |queue|
    100.times do
      break if queue.empty?
      sleep 0.1
    end
  end

  wait_until_drained.call(@@async_queue)
  @async_thread.exit

  wait_until_drained.call(@@async_queue_latency)
  @async_thread_latency.exit
end
storage_cachecleaning_process(hname, dn) click to toggle source
    # File lib/roma/async_process.rb
# Moves entries from the cache of database +dn+ in storage +hname+ into the
# database, batch by batch, until the cache is empty or
# @do_storage_cachecleaning_process is cleared. Each batch's keys are
# evicted from the cache afterwards, even when the batch was interrupted.
# When the run completes, the database status is set back to :normal.
# The run flag is always cleared on exit.
def storage_cachecleaning_process(hname, dn)
  stored = 0
  rejected = 0
  storage = @storages[hname]

  @do_storage_cachecleaning_process = true
  loop do
    # get keys in a cache up to 100 keys
    keys = storage.get_keys_in_cache(dn)
    break if keys.nil? || keys.length.zero?
    break unless @do_storage_cachecleaning_process

    # copy cache -> db
    storage.each_cache_by_keys(dn, keys) do |vn, last, clk, expt, k, v|
      break unless @do_storage_cachecleaning_process
      if storage.load_stream_dump_for_cachecleaning(vn, last, clk, expt, k, v)
        stored += 1
      else
        rejected += 1
      end
    end

    # remove keys in a cache
    keys.each { |key| storage.out_cache(dn, key) }
  end

  if @do_storage_cachecleaning_process != false
    storage.set_db_stat(dn, :normal)
  else
    @log.warn("#{__method__}:uncompleted")
  end
  @log.debug("#{__method__}:#{stored} keys loaded.")
  @log.debug("#{__method__}:#{rejected} keys rejected.") if rejected > 0
ensure
  @do_storage_cachecleaning_process = false
end
storage_clean_up_process() click to toggle source
    # File lib/roma/async_process.rb
# Purges delete-marked entries older than Roma::Config::STORAGE_DELMARK_EXPTIME
# from every storage, for the vnodes this node holds. Each vnode is tagged
# :primary or :secondaryN by this node's position in its holder list.
# - Vnodes currently being received are skipped.
# - For each purged key, an "out" command is sent to the other write nodes.
# - The sweep over storages stops when @stats.do_clean_up? turns false.
# Afterwards, gathered GUI logs older than 5 minutes are dropped from
# @rttable.logs, unless a gather is running or the list is empty.
def storage_clean_up_process
  @log.info("#{__method__}:start")
  me = @stats.ap_str
  roles = {}
  @rttable.each_vnode do |vn, nids|
    idx = nids.index(me)
    next unless idx
    roles[vn] = idx.zero? ? :primary : :"secondary#{idx}"
  end

  threshold = Time.now.to_i - Roma::Config::STORAGE_DELMARK_EXPTIME
  deleted = 0
  @storages.each_pair do |hname, st|
    break unless @stats.do_clean_up?

    st.each_clean_up(threshold, roles) do |key, vn|
      # Returning false tells each_clean_up not to purge this key.
      next false if @stats.run_receive_a_vnode.key?("#{hname}_#{vn}")

      replicas = @rttable.search_nodes_for_write(vn)
      if replicas && replicas.length > 1
        replicas.drop(1).each do |nid|
          res = async_send_cmd(nid, "out #{key}\e#{hname} #{vn}\r\n")
          @log.warn("send out command failed:#{key}\e#{hname} #{vn} -> #{nid}") unless res
        end
      end
      deleted += 1
      @stats.out_count += 1
      true
    end
  end
  @log.info("#{__method__}:#{deleted} keys deleted.") if deleted > 0

  # delete @rttable.logs
  return false if @stats.gui_run_gather_logs || @rttable.logs.empty?

  gathered_time = @rttable.logs[0]
  # delete gathering log data after 5min
  @rttable.logs.clear if gathered_time.to_i < Time.now.to_i - (60 * 5)
ensure
  @log.info("#{__method__}:stop")
end
sync_a_vnode(vn, to_nid, is_primary = nil) click to toggle source
    # File lib/roma/async_process.rb
# Synchronizes the data of virtual node +vn+ to node +to_nid+.
#
# The routing data is changed in a routing transaction in two cases:
# +to_nid+ is not yet in the vnode's node list, or +is_primary+ is set and
# +to_nid+ is not already first. The transaction adds +to_nid+, pushes every
# storage's data for +vn+ to it and commits. It then builds the final node
# list with edit_nodes, installs it with set_route and broadcasts a
# "setroute" command. Otherwise only the data is pushed.
#
# Any failure rolls back an open transaction. This includes an exception
# raised while the transaction is open, which previously left it dangling.
#
# @param vn         virtual node id
# @param to_nid     node that receives the data
# @param is_primary when truthy, +to_nid+ is meant to become the vnode's
#                   first node (final order is decided by edit_nodes)
# @return [Boolean] true on success; false when the transaction cannot be
#   opened, a push does not answer 'STORED', the commit fails, or an
#   exception is raised
def sync_a_vnode(vn, to_nid, is_primary = nil)
  # true while a routing transaction for +vn+ is open and not yet committed
  in_transaction = false
  nids = @rttable.search_nodes(vn)

  if !nids.include?(to_nid) || (is_primary && nids[0] != to_nid)
    @log.debug("#{__method__}:#{vn} #{to_nid} #{is_primary}")
    # change routing data at the vnode and synchronize a data
    nids << to_nid
    return false unless @rttable.transaction(vn, nids)
    in_transaction = true

    # synchronize a data
    @storages.each_key do |hname|
      res = push_a_vnode_stream(hname, vn, to_nid)
      next if res == 'STORED'

      # cleared first so the rescue below never rolls back twice
      in_transaction = false
      @rttable.rollback(vn)
      @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
      return false
    end

    # commit returns false on failure (other values are used as the clock)
    clk = @rttable.commit(vn)
    in_transaction = false
    if clk == false
      @rttable.rollback(vn)
      @log.error("#{__method__}:routing table commit failed")
      return false
    end

    nids = edit_nodes(nids, to_nid, is_primary)
    clk = @rttable.set_route(vn, clk, nids)
    # set_route did not return a clock: re-read the current route instead
    clk, nids = @rttable.search_nodes_with_clk(vn) unless clk.is_a?(Integer)

    cmd = "setroute #{vn} #{clk - 1}"
    nids.each { |nn| cmd << " #{nn}" }
    res = async_broadcast_cmd("#{cmd}\r\n")
    @log.debug("#{__method__}:async_broadcast_cmd(#{cmd}) #{res}")
  else
    # synchronize a data
    @storages.each_key do |hname|
      res = push_a_vnode_stream(hname, vn, to_nid)
      next if res == 'STORED'

      @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
      return false
    end
  end

  true
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  if in_transaction
    begin
      @rttable.rollback(vn)
    rescue => rollback_error
      @log.error("#{__method__}:rollback failed:#{rollback_error}")
    end
  end
  false
end
sync_a_vnode_for_release(vn, to_nid, new_nids) click to toggle source
    # File lib/roma/async_process.rb
# Variant of sync_a_vnode where the caller supplies the final node list.
#
# If +to_nid+ already holds +vn+, nothing is done and true is returned.
# Otherwise, inside a routing transaction, the method does the following:
# - adds +to_nid+ to the vnode's node list,
# - pushes every storage's data for +vn+ to it,
# - commits, sets the route of +vn+ to +new_nids+,
# - broadcasts a "setroute" command.
#
# Any failure rolls back an open transaction. This includes an exception
# raised while the transaction is open.
#
# @param vn       virtual node id
# @param to_nid   node that receives the data
# @param new_nids node list to install for +vn+ after the copy
# @return [true, false, :abort]
#   - true on success, or when nothing had to be done.
#   - :abort when a push answered with a response starting with 'SERVER_ERROR'.
#   - false on any other failure or exception.
def sync_a_vnode_for_release(vn, to_nid, new_nids)
  # true while a routing transaction for +vn+ is open and not yet committed
  in_transaction = false
  nids = @rttable.search_nodes(vn)
  return true if nids.include?(to_nid)

  @log.debug("#{__method__}:#{vn} #{to_nid}")
  # change routing data at the vnode and synchronize a data
  nids << to_nid
  return false unless @rttable.transaction(vn, nids)
  in_transaction = true

  # synchronize a data
  @storages.each_key do |hname|
    res = push_a_vnode_stream(hname, vn, to_nid)
    next if res == 'STORED'

    # cleared first so the rescue below never rolls back twice
    in_transaction = false
    @rttable.rollback(vn)
    @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
    # to_s: a non-String result (e.g. nil) must yield false, not a NoMethodError
    return :abort if res.to_s.start_with?('SERVER_ERROR')
    return false
  end

  # commit returns false on failure (other values are used as the clock)
  clk = @rttable.commit(vn)
  in_transaction = false
  if clk == false
    @rttable.rollback(vn)
    @log.error("#{__method__}:routing table commit failed")
    return false
  end

  clk = @rttable.set_route(vn, clk, new_nids)
  # set_route did not return a clock: re-read the current route instead
  clk, new_nids = @rttable.search_nodes_with_clk(vn) unless clk.is_a?(Integer)

  cmd = "setroute #{vn} #{clk - 1}"
  new_nids.each { |nn| cmd << " #{nn}" }
  res = async_broadcast_cmd("#{cmd}\r\n")
  @log.debug("#{__method__}:async_broadcast_cmd(#{cmd}) #{res}")

  true
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  if in_transaction
    begin
      @rttable.rollback(vn)
    rescue => rollback_error
      @log.error("#{__method__}:rollback failed:#{rollback_error}")
    end
  end
  false
end