module Roma::AsyncProcess
Public Class Methods
queue()
click to toggle source
# Source: lib/roma/async_process.rb, line 37
#
# Module-wide queue drained by async_process_loop (general async events).
def self.queue
  @@async_queue
end
queue_latency()
click to toggle source
# Source: lib/roma/async_process.rb, line 41
#
# Module-wide queue drained by async_process_loop_for_latency.
def self.queue_latency
  @@async_queue_latency
end
Public Instance Methods
start_async_process()
click to toggle source
# Source: lib/roma/async_process.rb, line 45
#
# Starts the two consumer threads: one for the general async queue and one
# for the latency queue. Both threads are tagged with this method's name.
# An error while spawning is logged rather than raised.
def start_async_process
  @async_thread = Thread.new { async_process_loop }
  @async_thread[:name] = __method__

  @async_thread_latency = Thread.new { async_process_loop_for_latency }
  @async_thread_latency[:name] = __method__
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
end
Private Instance Methods
acquired_recover_process()
click to toggle source
# Source: lib/roma/async_process.rb, line 359
#
# Pulls short-of-replica vnodes onto this node one at a time.
# For each vnode it asks a current holder to push it (req_push_a_vnode).
#
# Stops when any of these happens:
# - no short vnodes remain;
# - no candidate vnode is selected;
# - a push request returns false;
# - @do_acquired_recover_process is cleared.
def acquired_recover_process
  @log.info("#{__method__}:start")

  excluded = @rttable.exclude_nodes_for_recover(@stats.ap_str, @stats.rep_host)

  @do_acquired_recover_process = true
  loop do
    break unless @do_acquired_recover_process
    # element [2] of num_of_vn is the short-vnode count
    break if @rttable.num_of_vn(@stats.ap_str)[2] == 0

    vn, holders, primary = @rttable.select_vn_for_recover(excluded, @stats.rep_host)
    break unless vn
    # NOTE(review): nothing sleeps on this path, so a vnode that keeps being
    # selected with no holders would make this loop spin; confirm that
    # select_vn_for_recover cannot return that repeatedly.
    next if holders.length == 0

    result = req_push_a_vnode(vn, holders[0], primary)
    break if result == false
    sleep 1 if result == :rejected # extra back-off after a rejection
    sleep 1
  end
  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e.inspect} #{$ERROR_POSITION}")
ensure
  @do_acquired_recover_process = false
end
async_process_loop()
click to toggle source
# Source: lib/roma/async_process.rb, line 77
#
# Consumer loop for the general async queue.
# Each message is dispatched to the handler "asyncev_<event>"; a truthy
# result counts as success and triggers the callback with true.
#
# On a falsy result:
# - if msg.retry? holds, a helper thread waits (msg.wait), bumps the retry
#   count and re-queues the message;
# - otherwise the message is reported as failed.
#
# Any error is logged and the whole loop restarts.
def async_process_loop
  loop do
    while (msg = @@async_queue.pop)
      if send("asyncev_#{msg.event}", msg.args)
        msg.callback.call(msg, true) if msg.callback
      elsif msg.retry?
        # Hand the message to the thread as an argument. `msg` is a single
        # variable that the next pop reassigns, so a closure over it could
        # re-queue the wrong message and lose the failed one.
        t = Thread.new(msg) do |failed|
          failed.wait
          failed.incr_count
          @@async_queue.push(failed)
        end
        t[:name] = __method__
      else
        @log.error("async process retry out:#{msg.inspect}")
        msg.callback.call(msg, false) if msg.callback
      end
    end
  end
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  retry
end
async_process_loop_for_latency()
click to toggle source
# Source: lib/roma/async_process.rb, line 102
#
# Consumer loop for the latency queue. It behaves the same as the general
# loop: each message is dispatched to "asyncev_<event>", failed messages are
# re-queued while msg.retry? holds, and errors restart the loop.
def async_process_loop_for_latency
  loop do
    while (msg = @@async_queue_latency.pop)
      if send("asyncev_#{msg.event}", msg.args)
        msg.callback.call(msg, true) if msg.callback
      elsif msg.retry?
        # Pass the message in explicitly: `msg` is reassigned by the next pop,
        # so closing over it could re-queue a different message.
        t = Thread.new(msg) do |failed|
          failed.wait
          failed.incr_count
          @@async_queue_latency.push(failed)
        end
        t[:name] = __method__
      else
        @log.error("async process retry out:#{msg.inspect}")
        msg.callback.call(msg, false) if msg.callback
      end
    end
  end
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  retry
end
asyncev_broadcast_cmd(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 127
#
# Handler for the broadcast_cmd event; args is [cmd, nids, tout].
# The CRLF-terminated command is sent by async_broadcast_cmd on its own
# thread, so the event loop is not blocked. Always reports success.
def asyncev_broadcast_cmd(args)
  @log.debug("#{__method__} #{args.inspect}")
  command, node_ids, tout = args
  sender = Thread.new { async_broadcast_cmd("#{command}\r\n", node_ids, tout) }
  sender[:name] = __method__
  true
end
asyncev_calc_latency_average(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 762
#
# Handler that folds one latency sample into @stats.latency_data[cmd].
# It keeps a window of the 10 most recent samples plus the running max/min.
#
# When more than @stats.latency_check_time_count seconds have passed since
# the last report, it:
# - logs the window's average, max and min;
# - keeps those values under 'latency_past' and the 'past' keys;
# - starts a fresh window.
#
# @param args [Array] [latency, cmd]
# @return [true] always, so the event is never retried
def asyncev_calc_latency_average(args)
  latency, cmd = args

  # First sample for this command: create its entry. `||=` makes this work
  # whether or not latency_data creates missing entries on access.
  unless @stats.latency_data.key?(cmd)
    entry = (@stats.latency_data[cmd] ||= {})
    entry.store('latency', [])
    entry.store('latency_max', { 'current' => 0 })
    entry.store('latency_min', { 'current' => 99_999 })
    entry.store('time', Time.now.to_i)
  end
  data = @stats.latency_data[cmd]

  begin
    window = data['latency']
    window.delete_at(0) if window.length >= 10 # keep the last 10 samples
    window.push(latency)

    data['latency_max']['current'] = latency if latency > data['latency_max']['current']
    data['latency_min']['current'] = latency if latency < data['latency_min']['current']
  rescue => e
    @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
  ensure
    # Periodic report. It runs in `ensure` so it still happens after a bad sample.
    check_interval = @stats.latency_check_time_count
    if check_interval && Time.now.to_i - data['time'] > check_interval
      window = data['latency']
      average = window.inject(0.0, :+) / window.size
      max = data['latency_max']['current']
      min = data['latency_min']['current']
      @log.debug("Latency average[#{cmd}]: #{sprintf('%.8f', average)}"\
                 "(denominator=#{window.length}"\
                 " max=#{sprintf('%.8f', max)}"\
                 " min=#{sprintf('%.8f', min)})")

      data['time'] = Time.now.to_i
      data['latency_past'] = window
      data['latency'] = []
      data['latency_max']['past'] = max
      data['latency_max']['current'] = 0
      data['latency_min']['past'] = min
      data['latency_min']['current'] = 99_999
    end
  end
  true
end
asyncev_rdelete(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 224
#
# Handler that replays a delete on replica +nid+ via "rdelete";
# args is [nid, hname, k, clk].
#
# @return [Boolean]
#   - true when done, or when nid is no longer in the routing table
#     (nothing to retry);
#   - false when the send failed, so the async loop retries.
def asyncev_rdelete(args)
  nid, hname, k, clk = args
  @log.debug("#{__method__} #{args.inspect}")
  unless @rttable.nodes.include?(nid)
    @log.warn("async rdelete failed:#{nid} does not found in routing table.#{k}\e#{hname} #{clk}")
    return true # no retry
  end
  res = async_send_cmd(nid, "rdelete #{k}\e#{hname} #{clk}\r\n", 10)
  unless res
    # Name the operation that actually failed (this is rdelete, not redundant).
    @log.warn("async rdelete failed:#{k}\e#{hname} #{clk} -> #{nid}")
    return false # retry
  end
  true
end
asyncev_redundant(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 194
#
# Handler that copies a value to replica +nid+ with "rset";
# args is [nid, hname, k, d, clk, expt, v].
#
# The length field is the value's byte size: the payload is framed by that
# count, so a multibyte value must not be sized in characters.
#
# @return [Boolean]
#   - true when done, or when nid is unknown (no retry);
#   - false on a nil or ERROR reply (retry).
def asyncev_redundant(args)
  nid, hname, k, d, clk, expt, v = args
  @log.debug("#{__method__} #{args.inspect}")
  unless @rttable.nodes.include?(nid)
    @log.warn("async redundant failed:#{nid} does not found in routing table.#{k}\e#{hname} #{d} #{clk} #{expt} #{v.bytesize}")
    return true # no retry
  end
  res = async_send_cmd(nid, "rset #{k}\e#{hname} #{d} #{clk} #{expt} #{v.bytesize}\r\n#{v}\r\n", 10)
  if res.nil? || res.start_with?('ERROR')
    @log.warn("async redundant failed:#{k}\e#{hname} #{d} #{clk} #{expt} #{v.bytesize} -> #{nid}")
    return false # retry
  end
  true
end
asyncev_reqpushv(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 239
#
# Handler for a push request; args is [vn, nid, p].
# It syncs vnode vn (as an integer) to nid on a background thread. The
# target becomes primary when p == 'true'.
#
# Only one storage iteration may run at a time. While one is running, the
# request is only logged, and the handler returns whatever @log.warn returns.
def asyncev_reqpushv(args)
  vn, nid, primary_flag = args
  @log.debug("#{__method__} #{args.inspect}")
  return @log.warn("#{__method__}:already be iterated storage process.") if @stats.run_iterate_storage

  @stats.run_iterate_storage = true
  pusher = Thread.new do
    begin
      sync_a_vnode(vn.to_i, nid, primary_flag == 'true')
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_iterate_storage = false
    end
  end
  pusher[:name] = __method__
end
asyncev_start_auto_recover_process(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 286
#
# Arms automatic recovery. The routing table goes to 'preparing', and a
# watcher thread waits up to @rttable.auto_recover_time seconds.
#
# Auto recovery is abandoned if, before the wait expires:
# - the status changes, or
# - a recover/balance process starts.
#
# If the wait expires, @rttable.lost_action decides what happens:
# - :auto_assign / :shutdown run acquired_recover_process;
# - :no_action only logs.
#
# Return value:
# - false (retry later) when a recover process is already running;
# - true (no retry) when a balance process is running;
# - otherwise a truthy value.
def asyncev_start_auto_recover_process(args)
  @log.debug("#{__method__} #{args.inspect}")
  # ##run_join don't have possibility to be true in this case.
  # if @stats.run_join
  #   @log.error("#{__method__}:join process running")
  #   return true
  # end
  if @stats.run_recover
    @log.error("#{__method__}:recover process running.")
    return false
  end
  if @stats.run_balance
    @log.error("#{__method__}:balance process running")
    return true
  end

  @rttable.auto_recover_status = 'preparing'
  t = Thread.new do
    begin
      # Timeout.timeout is the supported API. The bare Kernel-level `timeout`
      # helper is deprecated and missing from newer timeout library versions.
      Timeout.timeout(@rttable.auto_recover_time) do
        loop do
          sleep 1
          break if @rttable.auto_recover_status != 'preparing'
          # break if @stats.run_join #run_join don't have possibility to be true in this case.
          break if @stats.run_recover
          break if @stats.run_balance
        end
      end
      @log.debug('inactivated AUTO_RECOVER')
    rescue Timeout::Error
      # Nothing cancelled auto recovery within the grace period.
      case @rttable.lost_action
      when :auto_assign, :shutdown
        @stats.run_recover = true
        @rttable.auto_recover_status = 'executing'
        begin
          @log.debug('auto recover start')
          acquired_recover_process
        rescue => e
          @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
        ensure
          @stats.run_recover = false
          @rttable.auto_recover_status = 'waiting'
        end
      when :no_action
        @log.debug('auto recover NOT start. Because lost action is [no_action]')
      end
    rescue => e
      # Any other failure must not be mistaken for an expired grace period.
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  t[:name] = __method__
end
asyncev_start_balance_process(_args)
click to toggle source
# Source: lib/roma/async_process.rb, line 166
#
# Starts balance_process on a background thread, unless a join, recover or
# balance process is already running. In that case it logs and returns
# true, so the event is not retried. @stats.run_balance is held for the
# duration of the thread.
def asyncev_start_balance_process(_args)
  @log.debug(__method__)
  { run_join: 'join', run_recover: 'recover', run_balance: 'balance' }.each do |flag, label|
    next unless @stats.public_send(flag)
    @log.error("#{__method__}:#{label} process running")
    return true
  end

  @stats.run_balance = true
  balancer = Thread.new do
    begin
      balance_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_balance = false
    end
  end
  balancer[:name] = __method__
  true
end
asyncev_start_get_logs(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 925
#
# Runs get_logs(args) on a background thread. It always clears
# @stats.gui_run_gather_logs when the thread finishes.
def asyncev_start_get_logs(args)
  @log.debug("#{__method__} #{args}")
  gatherer = Thread.new do
    begin
      get_logs(args)
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.gui_run_gather_logs = false
    end
  end
  gatherer[:name] = __method__
end
asyncev_start_get_routing_event(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 892
#
# Runs get_routing_event on a background thread, logging any error.
def asyncev_start_get_routing_event(args)
  @log.debug("#{__method__} #{args}")
  reader = Thread.new do
    begin
      get_routing_event
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  reader[:name] = __method__
end
asyncev_start_join_process(_args)
click to toggle source
# Source: lib/roma/async_process.rb, line 137
#
# Starts join_process on a background thread, unless a join, recover or
# balance process is already running. In that case it logs and returns
# true, so the event is not retried. When the thread ends it clears
# run_join and join_ap.
def asyncev_start_join_process(_args)
  @log.debug(__method__)
  { run_join: 'join', run_recover: 'recover', run_balance: 'balance' }.each do |flag, label|
    next unless @stats.public_send(flag)
    @log.error("#{__method__}:#{label} process running")
    return true
  end

  @stats.run_join = true
  joiner = Thread.new do
    begin
      join_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_join = false
      @stats.join_ap = nil
    end
  end
  joiner[:name] = __method__
  true
end
asyncev_start_recover_process(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 259
#
# Starts acquired_recover_process on a background thread.
#
# Refuses to start in these cases:
# - a join process is running: returns true (no retry);
# - a balance process is running: returns true (no retry);
# - a recover process is running: returns false (retry later).
def asyncev_start_recover_process(args)
  @log.debug("#{__method__} #{args.inspect}")
  blocker =
    if @stats.run_join
      ['join process running', true]
    elsif @stats.run_recover
      ['recover process running.', false]
    elsif @stats.run_balance
      ['balance process running', true]
    end
  if blocker
    message, result = blocker
    @log.error("#{__method__}:#{message}")
    return result
  end

  @stats.run_recover = true
  recoverer = Thread.new do
    begin
      acquired_recover_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_recover = false
    end
  end
  recoverer[:name] = __method__
end
asyncev_start_release_process(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 337
#
# Starts release_process on a background thread, unless a storage iteration
# is already running. In that case it only logs, and the handler returns
# what @log.warn returns.
#
# Sets run_release, run_iterate_storage and spushv_protection. The thread
# clears the first two when it ends; spushv_protection is left set.
def asyncev_start_release_process(args)
  @log.debug("#{__method__} #{args}")
  return @log.warn("#{__method__}:already be iterated storage process.") if @stats.run_iterate_storage

  @stats.run_release = true
  @stats.run_iterate_storage = true
  @stats.spushv_protection = true
  releaser = Thread.new do
    begin
      release_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_iterate_storage = false
      @stats.run_release = false
    end
  end
  releaser[:name] = __method__
end
asyncev_start_replicate_existing_data_process(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 1047
#
# Runs replicate_existing_data_process(args) on a background thread.
# args is [$roma.cr_writer.replica_rttable].
# $roma.cr_writer.run_existing_data_replication is held true for the
# duration of the thread.
def asyncev_start_replicate_existing_data_process(args)
  replicator = Thread.new do
    begin
      $roma.cr_writer.run_existing_data_replication = true
      replicate_existing_data_process(args)
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      $roma.cr_writer.run_existing_data_replication = false
    end
  end
  replicator[:name] = __method__
end
asyncev_start_storage_cachecleaning_process(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 831
#
# Starts storage_cachecleaning_process(hname, dn) on a background thread;
# args is [hname, dn]. The division must be in the :cachecleaning state;
# otherwise an error is logged. Returns true either way.
def asyncev_start_storage_cachecleaning_process(args)
  hname, dn = args
  @log.debug("#{__method__} #{args.inspect}")

  storage = @storages[hname]
  unless storage.dbs[dn] == :cachecleaning
    @log.error("Can not start cachecleaning process. stat = #{storage.dbs[dn]}")
    return true
  end

  cleaner = Thread.new do
    begin
      storage_cachecleaning_process(hname, dn)
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  cleaner[:name] = __method__
  true
end
asyncev_start_storage_clean_up_process(_args)
click to toggle source
# Source: lib/roma/async_process.rb, line 688
#
# Starts storage_clean_up_process on a background thread. When the thread
# ends it records @stats.last_clean_up and clears run_storage_clean_up.
#
# NOTE(review): while a clean-up is already running this returns nil. The
# async loop treats a falsy result as a failure and may re-queue the event;
# confirm this is intended.
def asyncev_start_storage_clean_up_process(_args)
  # @log.info("#{__method__}")
  if @stats.run_storage_clean_up
    @log.error("#{__method__}:already in being")
    return nil
  end

  @stats.run_storage_clean_up = true
  cleaner = Thread.new do
    begin
      storage_clean_up_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.last_clean_up = Time.now
      @stats.run_storage_clean_up = false
    end
  end
  cleaner[:name] = __method__
end
asyncev_start_storage_flush_process(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 808
#
# Flushes division dn of storage hname on a background thread, then marks it
# :safecopy_flushed; args is [hname, dn]. The division must be in the
# :safecopy_flushing state; otherwise an error is logged. Returns true
# either way.
def asyncev_start_storage_flush_process(args)
  hname, dn = args
  @log.debug("#{__method__} #{args.inspect}")

  storage = @storages[hname]
  unless storage.dbs[dn] == :safecopy_flushing
    @log.error("Can not flush storage. stat = #{storage.dbs[dn]}")
    return true
  end

  flusher = Thread.new do
    begin
      storage.flush_db(dn)
      storage.set_db_stat(dn, :safecopy_flushed)
      @log.info("#{__method__}:storage has flushed. (#{hname}, #{dn})")
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  flusher[:name] = __method__
  true
end
asyncev_zredundant(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 209
#
# Handler that copies a compressed value to replica +nid+ with "rzset";
# args is [nid, hname, k, d, clk, expt, zv].
#
# The length field is the payload's byte size, since the payload is framed
# by that count.
#
# @return [Boolean]
#   - true when done, or when nid is unknown (no retry);
#   - false on a nil or ERROR reply (retry).
def asyncev_zredundant(args)
  nid, hname, k, d, clk, expt, zv = args
  @log.debug("#{__method__} #{args.inspect}")
  unless @rttable.nodes.include?(nid)
    @log.warn("async zredundant failed:#{nid} does not found in routing table.#{k}\e#{hname} #{d} #{clk} #{expt} #{zv.bytesize}")
    return true # no retry
  end
  res = async_send_cmd(nid, "rzset #{k}\e#{hname} #{d} #{clk} #{expt} #{zv.bytesize}\r\n#{zv}\r\n", 10)
  if res.nil? || res.start_with?('ERROR')
    # Use zv: there is no `v` in this method, and referencing one raised
    # NameError here instead of requesting a retry.
    @log.warn("async zredundant failed:#{k}\e#{hname} #{d} #{clk} #{expt} #{zv.bytesize} -> #{nid}")
    return false # retry
  end
  true
end
balance_process()
click to toggle source
# Source: lib/roma/async_process.rb, line 419
#
# Requests vnodes for this node while its share is :less.
#
# Stops when:
# - the number of non-rejected requests reaches the vnode count;
# - @do_balance_process is cleared;
# - no candidate vnode is found, in which case it returns false.
#
# A rejected request waits 5 s; any other result waits 1 s.
def balance_process
  @log.info("#{__method__}:start")
  attempts = 0
  vnode_total = @rttable.v_idx.length
  excluded = @rttable.exclude_nodes_for_balance(@stats.ap_str, @stats.rep_host)

  @do_balance_process = true
  while @rttable.vnode_balance(@stats.ap_str) == :less && attempts < vnode_total
    break unless @do_balance_process

    vn, nodes, is_primary = @rttable.select_vn_for_balance(excluded, @stats.rep_host)
    unless vn
      @log.warn("#{__method__}:vnode does not found")
      return false
    end

    if req_push_a_vnode(vn, nodes[0], is_primary) == :rejected
      sleep 5
    else
      sleep 1
      attempts += 1
    end
  end
  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e.inspect} #{$ERROR_POSITION}")
ensure
  @do_balance_process = false
end
edit_nodes(nodes, new_nid, is_primary)
click to toggle source
# Source: lib/roma/async_process.rb, line 612
#
# Builds the new replica list for a vnode once +new_nid+ holds it.
#
# If the list without new_nid is already at the redundancy level
# (@rttable.rn), one existing node is dropped: preferably the last one on
# new_nid's host, otherwise the last one. new_nid then goes first when
# +is_primary+ is truthy, and last otherwise.
#
# @param nodes [Array<String>] current node ids (left unmodified)
# @param new_nid [String] node id that now holds the vnode
# @param is_primary [Boolean] whether new_nid becomes the primary
# @return [Array<String>] the new node list
def edit_nodes(nodes, new_nid, is_primary)
  return [new_nid] if @rttable.rn == 1

  # Work on a copy so the caller's array is not changed behind its back.
  nodes = nodes.dup
  nodes.delete(new_nid)

  if nodes.length >= @rttable.rn
    host = new_nid.split(/[:_]/)[0]
    same_host = nodes.select { |nid| nid.split(/[:_]/)[0] == host }
    if same_host.empty?
      nodes.pop # remove exactly one element: the last
    else
      # delete the last same-host node, to keep the primary node
      nodes.delete(same_host.last)
    end
  end

  is_primary ? nodes.unshift(new_nid) : nodes.push(new_nid)
end
get_logs(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 939
#
# Collects this node's log lines between args[0] (start) and args[1] (end)
# into @rttable.logs. Lines are chomped and lines equal to '.' are dropped.
# The gathering time is prepended as element 0; the clean-up process reads
# it to expire the data.
#
# On any error @rttable.logs becomes []. gui_run_gather_logs is always
# cleared.
def get_logs(args)
  @log.debug("#{__method__}:start.")

  log_file = "#{Config::LOG_PATH}/#{@stats.ap_str}.log"

  target_logs = []
  File.open(log_file) do |f|
    from = get_point(f, args[0], 'start')
    to = get_point(f, args[1], 'end')

    f.seek(from, IO::SEEK_SET)
    target_logs = f.read(to - from).each_line.map(&:chomp)
    target_logs.delete('.')
  end

  @rttable.logs = target_logs
  @rttable.logs.unshift(Time.now) # gathered-at timestamp, used for expiry

  @log.debug("#{__method__} has done.")
rescue => e
  @rttable.logs = []
  @log.error("#{e}\n#{$ERROR_POSITION}")
ensure
  @stats.gui_run_gather_logs = false
end
get_point(f, target_time, type, latency_time = Time.now, current_pos = 0, new_pos = f.size / 2)
click to toggle source
# Source: lib/roma/async_process.rb, line 969
#
# Finds the byte offset in log file +f+ where entries at or after
# +target_time+ begin, by recursive bisection over the file.
#
# First call: +target_time+ is a 'YYYY-MM-DDTHH:MM:SS' string, or 'current'
# (meaning the end of the file). It is parsed and checked against the
# first and last timestamps in the file. Recursive calls pass the parsed
# Time.
#
# @param f [File] open log file
# @param target_time [String, Time]
# @param type ['start', 'end'] which boundary is searched; decides how an
#   out-of-range target is handled
# @param latency_time [Time] when the search began; gives up after 5 s
# @param current_pos [Integer] previous probe offset
# @param new_pos [Integer] offset probed by this call
# @return [Integer] byte offset into the file
# @raise [RuntimeError] on timeout or an impossible time range
def get_point(f, target_time, type, latency_time = Time.now, current_pos = 0, new_pos = f.size / 2)
  # hilatency check: give up when the whole search exceeds 5 seconds
  if Time.now - latency_time > 5
    @log.warn('gather_logs process was failed.')
    fail
  end

  read_size = 2048 # bytes inspected per probe
  log_stamp = /[IDEW],\s\[(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)\.(\d+)/

  # First call only: parse the target and check it against the file's range.
  unless target_time.class == Time
    # in case of not set end_date
    return f.size if target_time == 'current'

    target_time =~ /(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)/
    md = Regexp.last_match
    target_time = Time.mktime(md[1], md[2], md[3], md[4], md[5], md[6], 0)

    f.seek(0, IO::SEEK_SET)
    f.read(read_size).index(log_stamp)
    begining_time = Time.mktime(*Regexp.last_match.captures)

    # Clamp the tail window at offset 0: a negative seek from the end fails
    # for files shorter than read_size.
    f.seek([f.size - read_size, 0].max, IO::SEEK_SET)
    f.read(read_size).rindex(log_stamp)
    end_time = Time.mktime(*Regexp.last_match.captures)

    case type
    when 'start'
      if target_time < begining_time
        return 0
      elsif target_time > end_time
        @log.error('irregular time was set.')
        fail
      end
    when 'end'
      if target_time > end_time
        return f.size
      elsif target_time < begining_time
        @log.error('irregular time was set.')
        fail
      end
    end
  end

  # Probe read_size bytes at new_pos and collect the timestamps found there.
  f.seek(new_pos, IO::SEEK_SET)
  sector_log = f.read(read_size)
  times = sector_log.scan(log_stamp).map { |c| Time.mktime(*c) }
  sector_time_first = times.first
  sector_time_last = times.last

  if target_time.between?(sector_time_first, sector_time_last)
    hit = times.find { |time| target_time <= time }
    if hit
      time_string = hit.strftime('%Y-%m-%dT%H:%M:%S')
      return new_pos + sector_log.index(/[IDEW],\s\[#{time_string}/)
    end
  elsif sector_time_first > target_time
    target_pos = new_pos - ((new_pos - current_pos).abs / 2)
  elsif sector_time_first < target_time
    target_pos = new_pos + ((new_pos - current_pos).abs / 2)
  end

  get_point(f, target_time, type, latency_time, new_pos, target_pos)
end
get_routing_event()
click to toggle source
# Source: lib/roma/async_process.rb, line 905
#
# Scans this node's routing files (Config::RTTABLE_PATH/<ap_str>*) for lines
# mentioning join or leave. Each match is appended (chomped) to
# @rttable.event. The oldest entry is dropped first when the list has
# reached event_limit_line. Errors are logged.
def get_routing_event
  @log.info("#{__method__}:start.")

  Dir.glob("#{Config::RTTABLE_PATH}/#{@stats.ap_str}*").each do |fname|
    IO.foreach(fname) do |line|
      next unless line =~ /join|leave/
      @rttable.event.shift if @rttable.event.size >= @rttable.event_limit_line
      @rttable.event << line.chomp
    end
  end

  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
end
join_process()
click to toggle source
# Source: lib/roma/async_process.rb, line 389
#
# Pulls vnodes onto a joining node while its share is :less.
#
# Stops when:
# - the number of non-rejected requests reaches the vnode count;
# - @do_join_process is cleared;
# - no candidate vnode is found, in which case it returns false.
#
# "has done." is logged in every case.
def join_process
  @log.info("#{__method__}:start")
  attempts = 0
  vnode_total = @rttable.v_idx.length
  excluded = @rttable.exclude_nodes_for_join(@stats.ap_str, @stats.rep_host)

  @do_join_process = true
  while @rttable.vnode_balance(@stats.ap_str) == :less && attempts < vnode_total
    break unless @do_join_process

    vn, nodes, is_primary = @rttable.select_vn_for_join(excluded, @stats.rep_host)
    unless vn
      @log.warn("#{__method__}:vnode does not found")
      return false
    end

    if req_push_a_vnode(vn, nodes[0], is_primary) == :rejected
      sleep 5
    else
      sleep 1
      attempts += 1
    end
  end
rescue => e
  @log.error("#{e.inspect} #{$ERROR_POSITION}")
ensure
  @log.info("#{__method__} has done.")
  @do_join_process = false
end
push_a_vnode_stream(hname, vn, nid)
click to toggle source
# Source: lib/roma/async_process.rb, line 645
#
# Streams vnode +vn+ of storage +hname+ to node +nid+ using spushv:
# 1. send "spushv", expect READY;
# 2. write each dump chunk;
# 3. write a 20-byte NUL terminator;
# 4. read the peer's final reply.
#
# Returns 'CANCELED' in either of these cases:
# - @do_push_a_vnode_stream is cleared mid-stream;
# - each_vn_dump returns false.
#
# @return [String, nil] the final reply without CRLF (e.g. 'STORED'), the
#   peer's non-READY reply, 'CANCELED', or an exception message
def push_a_vnode_stream(hname, vn, nid)
  @log.debug("#{__method__}:hname=#{hname} vn=#{vn} nid=#{nid}")

  stop_clean_up

  pool = Roma::Messaging::ConPool.instance
  con = pool.get_connection(nid)

  @do_push_a_vnode_stream = true

  con.write("spushv #{hname} #{vn}\r\n")

  res = con.gets # READY\r\n or error string
  if res != "READY\r\n"
    con.close
    return res.chomp
  end

  res_dump = @storages[hname].each_vn_dump(vn) do |data|
    unless @do_push_a_vnode_stream
      con.close
      @log.error("#{__method__}:canceled in hname=#{hname} vn=#{vn} nid=#{nid}")
      return 'CANCELED'
    end

    con.write(data)
    sleep @stats.stream_copy_wait_param
  end
  con.write("\0" * 20) # end of stream

  res = con.gets # STORED\r\n or error string
  pool.return_connection(nid, con)
  con = nil # owned by the pool again; the rescue below must not close it
  res.chomp! if res
  if res_dump == false
    @log.error("#{__method__}:each_vn_dump in hname=#{hname} vn=#{vn} nid=#{nid}")
    return 'CANCELED'
  end
  res
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  # The stream stopped at an unknown point, so the connection cannot be reused;
  # close it instead of leaking the socket.
  if con
    begin
      con.close
    rescue StandardError
      nil # already broken; the original error is what gets reported
    end
  end
  e.to_s
end
release_process()
click to toggle source
# Source: lib/roma/async_process.rb, line 479
#
# Moves every vnode held by this node to other nodes until the node no
# longer appears in the routing table.
#
# Per vnode result:
# - false: the same vnode is retried (redo);
# - :abort: the whole release stops.
#
# All pooled connections are closed when the method finishes.
def release_process
  @log.info("#{__method__}:start.")

  # NOTE(review): a truthy can_i_release? is treated as "not enough nodes";
  # confirm this polarity against the routing table implementation.
  if @rttable.can_i_release?(@stats.ap_str, @stats.rep_host)
    @log.error("#{__method__}:Sufficient nodes do not found.")
    return
  end

  @do_release_process = true
  while @rttable.has_node?(@stats.ap_str)
    break unless @do_release_process
    @rttable.each_vnode do |vn, nids|
      break unless @do_release_process
      next unless nids.include?(@stats.ap_str)

      to_nid, new_nids = @rttable.select_node_for_release(@stats.ap_str, @stats.rep_host, nids)
      res = sync_a_vnode_for_release(vn, to_nid, new_nids)
      if res == :abort
        @log.error("#{__method__}:release_process aborted due to SERVER_ERROR received.")
        @do_release_process = false
      end
      if res == false
        @log.warn("#{__method__}:error at vn=#{vn} to_nid=#{to_nid} new_nid=#{new_nids}")
        # NOTE(review): redo retries this vnode immediately, with no delay or limit.
        redo
      end
    end
  end
  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
ensure
  @do_release_process = false
  Roma::Messaging::ConPool.instance.close_all
end
replicate_existing_data_process(args)
click to toggle source
# Source: lib/roma/async_process.rb, line 1062
#
# Copies every vnode of every storage to each replica node listed for that
# vnode in args[0] (the replica routing table).
# - Returns false on the first push that is not STORED.
# - Clearing $roma.cr_writer.run_existing_data_replication cancels the copy:
#   the bare raise is caught and logged below.
def replicate_existing_data_process(args)
  @log.info("#{__method__} :start.")

  replica_rttable = args[0]
  @storages.each_key do |hname|
    @rttable.v_idx.each_key do |vn|
      raise unless $roma.cr_writer.run_existing_data_replication
      replica_rttable.v_idx[vn].each do |replica_nid|
        res = push_a_vnode_stream(hname, vn, replica_nid)
        next if res == 'STORED'
        @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
        return false
      end
    end
  end

  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
end
req_push_a_vnode(vn, src_nid, is_primary)
click to toggle source
# Source: lib/roma/async_process.rb, line 449
#
# Asks +src_nid+ to push vnode +vn+ to this node ("reqpushv"). After a
# PUSHED reply it polls the routing table every 0.1 s, up to
# @stats.reqpushv_timeout_count times, for this node to appear as a holder.
#
# @return [true, :rejected, :timeout, false]
#   - true: this node now holds the vnode;
#   - :rejected: REJECTED or any other unexpected reply;
#   - :timeout: the node did not appear in time;
#   - false: an exception occurred, and src_nid is reported via
#     @rttable.proc_failed.
def req_push_a_vnode(vn, src_nid, is_primary)
  pool = Roma::Messaging::ConPool.instance
  con = pool.get_connection(src_nid)
  con.write("reqpushv #{vn} #{@stats.ap_str} #{is_primary}\r\n")
  res = con.gets # receive 'PUSHED\r\n' | 'REJECTED\r\n' | 'ERROR\r\n'
  if res == "REJECTED\r\n"
    @log.warn("#{__method__}:request was rejected from #{src_nid}.")
    pool.return_connection(src_nid, con)
    return :rejected
  elsif res != "PUSHED\r\n"
    @log.warn("#{__method__}:#{res}")
    # Unexpected reply or EOF: close the connection instead of leaking it.
    con.close
    return :rejected
  end
  pool.return_connection(src_nid, con)
  con = nil # owned by the pool again

  # waiting for pushv
  count = 0
  while @rttable.search_nodes(vn).include?(@stats.ap_str) == false && count < @stats.reqpushv_timeout_count
    sleep 0.1
    count += 1
  end
  if count >= @stats.reqpushv_timeout_count
    @log.warn("#{__method__}:request has been time-out.vn=#{vn} nid=#{src_nid}")
    return :timeout
  end
  true
rescue => e
  @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
  if con
    begin
      con.close # state unknown after a failure; do not reuse or leak it
    rescue StandardError
      nil
    end
  end
  @rttable.proc_failed(src_nid)
  false
end
stop_async_process()
click to toggle source
# Source: lib/roma/async_process.rb, line 61
#
# Stops both consumer threads. Each queue first gets up to about 10 s
# (100 polls, 0.1 s apart) to drain. Returns the result of exiting the
# latency thread.
def stop_async_process
  drain_then_stop = lambda do |queue, worker|
    100.times do
      break if queue.empty?
      sleep 0.1
    end
    worker.exit
  end

  drain_then_stop.call(@@async_queue, @async_thread)
  drain_then_stop.call(@@async_queue_latency, @async_thread_latency)
end
storage_cachecleaning_process(hname, dn)
click to toggle source
# Source: lib/roma/async_process.rb, line 852
#
# Writes the cache of storage +hname+, division +dn+, back into its db.
# Each round:
# 1. fetch a batch of cached keys;
# 2. load each cached entry into the db;
# 3. drop the batch from the cache.
#
# When the cache is empty the division is set to :normal. Clearing
# @do_storage_cachecleaning_process cancels the run: the status is left
# unchanged, any partly copied batch stays in the cache, and an
# "uncompleted" warning is logged.
def storage_cachecleaning_process(hname, dn)
  count = 0
  rcount = 0
  st = @storages[hname]

  @do_storage_cachecleaning_process = true
  loop do
    # get keys in a cache up to 100 keys
    keys = st.get_keys_in_cache(dn)
    break if keys.nil? || keys.length == 0
    break unless @do_storage_cachecleaning_process

    # copy cache -> db
    st.each_cache_by_keys(dn, keys) do |vn, last, clk, expt, k, v|
      break unless @do_storage_cachecleaning_process
      if st.load_stream_dump_for_cachecleaning(vn, last, clk, expt, k, v)
        count += 1
      else
        rcount += 1
      end
    end

    # A cancel during the copy leaves part of this batch unwritten. Keep the
    # whole batch cached rather than dropping entries that never reached the db.
    break unless @do_storage_cachecleaning_process

    # remove keys in a cache
    keys.each { |key| st.out_cache(dn, key) }
  end
  if @do_storage_cachecleaning_process == false
    @log.warn("#{__method__}:uncompleted")
  else
    st.set_db_stat(dn, :normal)
  end
  @log.debug("#{__method__}:#{count} keys loaded.")
  @log.debug("#{__method__}:#{rcount} keys rejected.") if rcount > 0
ensure
  @do_storage_cachecleaning_process = false
end
storage_clean_up_process()
click to toggle source
# Source: lib/roma/async_process.rb, line 708
#
# Sweeps each storage with each_clean_up, passing a cutoff of
# now - STORAGE_DELMARK_EXPTIME and this node's role per vnode (:primary or
# :secondaryN).
#
# For every key accepted by the sweep:
# - an "out" command is sent to the other write nodes of that vnode;
# - the key is counted (local count and @stats.out_count).
# Keys of vnodes that are currently being received are skipped (the block
# returns false for them).
#
# Also discards gathered GUI logs older than 5 minutes.
def storage_clean_up_process
  @log.info("#{__method__}:start")
  me = @stats.ap_str

  vnhash = {}
  @rttable.each_vnode do |vn, nids|
    next unless nids.include?(me)
    vnhash[vn] = nids[0] == me ? :primary : :"secondary#{nids.index(me)}"
  end

  cutoff = Time.now.to_i - Roma::Config::STORAGE_DELMARK_EXPTIME
  count = 0
  @storages.each_pair do |hname, st|
    break unless @stats.do_clean_up?
    st.each_clean_up(cutoff, vnhash) do |key, vn|
      # @log.debug("#{__method__}:key=#{key} vn=#{vn}")
      next false if @stats.run_receive_a_vnode.key?("#{hname}_#{vn}")

      nodes = @rttable.search_nodes_for_write(vn)
      if nodes && nodes.length > 1
        nodes[1..-1].each do |nid|
          res = async_send_cmd(nid, "out #{key}\e#{hname} #{vn}\r\n")
          @log.warn("send out command failed:#{key}\e#{hname} #{vn} -> #{nid}") unless res
        end
      end
      count += 1
      @stats.out_count += 1
      true
    end
  end
  @log.info("#{__method__}:#{count} keys deleted.") if count > 0

  # delete @rttable.logs
  if @stats.gui_run_gather_logs || @rttable.logs.empty?
    false
  else
    gathered_time = @rttable.logs[0]
    # delete gathering log data after 5min
    @rttable.logs.clear if gathered_time.to_i < Time.now.to_i - (60 * 5)
  end
ensure
  @log.info("#{__method__}:stop")
end
sync_a_vnode(vn, to_nid, is_primary = nil)
click to toggle source
# Source: lib/roma/async_process.rb, line 559
#
# Copies vnode +vn+ to +to_nid+ and, if needed, moves the route there.
#
# If to_nid already holds the vnode (and is its primary when is_primary is
# requested), only the data is streamed again. Otherwise the routing change
# is wrapped in a transaction:
# 1. stream the data for every storage;
# 2. commit;
# 3. rebuild the node list with edit_nodes;
# 4. broadcast "setroute".
#
# Returns false on any failure (after rolling back an open transaction),
# true otherwise.
def sync_a_vnode(vn, to_nid, is_primary = nil)
  nids = @rttable.search_nodes(vn)

  already_placed = nids.include?(to_nid) && !(is_primary && nids[0] != to_nid)
  if already_placed
    # synchronize a data
    @storages.each_key do |hname|
      res = push_a_vnode_stream(hname, vn, to_nid)
      next if res == 'STORED'
      @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
      return false
    end
    return true
  end

  @log.debug("#{__method__}:#{vn} #{to_nid} #{is_primary}")
  # change routing data at the vnode and synchronize a data
  nids << to_nid
  return false unless @rttable.transaction(vn, nids)

  @storages.each_key do |hname|
    res = push_a_vnode_stream(hname, vn, to_nid)
    next if res == 'STORED'
    @rttable.rollback(vn)
    @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
    return false
  end

  clk = @rttable.commit(vn)
  if clk == false
    @rttable.rollback(vn)
    @log.error("#{__method__}:routing table commit failed")
    return false
  end

  nids = edit_nodes(nids, to_nid, is_primary)
  clk = @rttable.set_route(vn, clk, nids)
  clk, nids = @rttable.search_nodes_with_clk(vn) unless clk.is_a?(Integer)

  cmd = "setroute #{vn} #{clk - 1}"
  nids.each { |nn| cmd << " #{nn}" }
  res = async_broadcast_cmd("#{cmd}\r\n")
  @log.debug("#{__method__}:async_broadcast_cmd(#{cmd}) #{res}")
  true
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  false
end
sync_a_vnode_for_release(vn, to_nid, new_nids)
click to toggle source
# Source: lib/roma/async_process.rb, line 515
#
# Moves vnode +vn+ to +to_nid+ during a release, routing it to +new_nids+.
# Nothing is done (true is returned) when to_nid already holds the vnode.
#
# Otherwise, inside a routing transaction:
# 1. stream every storage;
# 2. commit;
# 3. set the route to new_nids;
# 4. broadcast "setroute".
#
# @return [true, false, :abort] :abort when a push reply starts with
#   SERVER_ERROR, false on any other failure
def sync_a_vnode_for_release(vn, to_nid, new_nids)
  nids = @rttable.search_nodes(vn)
  return true if nids.include?(to_nid)

  @log.debug("#{__method__}:#{vn} #{to_nid}")
  # change routing data at the vnode and synchronize a data
  nids << to_nid
  return false unless @rttable.transaction(vn, nids)

  @storages.each_key do |hname|
    res = push_a_vnode_stream(hname, vn, to_nid)
    next if res == 'STORED'
    @rttable.rollback(vn)
    @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
    return :abort if res.start_with?('SERVER_ERROR')
    return false
  end

  clk = @rttable.commit(vn)
  if clk == false
    @rttable.rollback(vn)
    @log.error("#{__method__}:routing table commit failed")
    return false
  end

  clk = @rttable.set_route(vn, clk, new_nids)
  clk, new_nids = @rttable.search_nodes_with_clk(vn) unless clk.is_a?(Integer)

  cmd = "setroute #{vn} #{clk - 1}"
  new_nids.each { |nn| cmd << " #{nn}" }
  res = async_broadcast_cmd("#{cmd}\r\n")
  @log.debug("#{__method__}:async_broadcast_cmd(#{cmd}) #{res}")
  true
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  false
end