class Roma::Romad
Attributes
cr_writer[R]
eventloop[RW]
rttable[R]
startup[RW]
stats[R]
storages[R]
wb_writer[R]
Public Class Methods
new(argv = nil)
click to toggle source
# File lib/roma/romad.rb 30 def initialize(argv = nil) 31 @stats = Roma::Stats.instance 32 @startup = true 33 options(argv) 34 initialize_stats 35 initialize_connection 36 initialize_logger 37 initialize_rttable 38 initialize_storages 39 initialize_handler 40 initialize_plugin 41 initialize_wb_writer 42 end
Public Instance Methods
daemon?()
click to toggle source
# File lib/roma/romad.rb 160 def daemon?; @stats.daemon; end
start()
click to toggle source
# File lib/roma/romad.rb 44 def start 45 # config version check 46 if !Config.const_defined?(:VERSION) 47 @log.error("ROMA FAIL TO BOOT! : config.rb's version is too old.") 48 exit 49 elsif Config::VERSION != Roma::VERSION 50 if /(\d+)\.(\d+)\.(\d+)/ =~ Config::VERSION 51 version_config = ($1.to_i << 16) + ($2.to_i << 8) + $3.to_i 52 end 53 if /(\d+)\.(\d+)\.(\d+)/ =~ Roma::VERSION 54 version_roma = ($1.to_i << 16) + ($2.to_i << 8) + $3.to_i 55 end 56 57 if version_config == version_roma 58 @log.info("This version is development version.") 59 else 60 @log.error("ROMA FAIL TO BOOT! : config.rb's version is differ from current ROMA version.") 61 exit 62 end 63 end 64 65 if node_check(@stats.ap_str) 66 @log.error("#{@stats.ap_str} is already running.") 67 return 68 end 69 70 @storages.each{|hashname,st| 71 st.opendb 72 } 73 74 start_async_process 75 start_wb_process 76 timer 77 78 if @stats.join_ap 79 AsyncProcess::queue.push(AsyncMessage.new('start_join_process')) 80 end 81 82 # select a kind of system call 83 if Config.const_defined?(:CONNECTION_USE_EPOLL) && Config::CONNECTION_USE_EPOLL 84 @log.info("use an epoll") 85 EM.epoll 86 if Config.const_defined?(:CONNECTION_DESCRIPTOR_TABLE_SIZE) 87 EM.set_descriptor_table_size(Config::CONNECTION_DESCRIPTOR_TABLE_SIZE) 88 end 89 else 90 @log.info("use a select") 91 end 92 93 @eventloop = true 94 while(@eventloop) 95 @eventloop = false 96 begin 97 # initialize an instance of connections as restarting of an evantmachine 98 Event::Handler::connections.each_key{|k| 99 begin 100 k.close_connection 101 rescue Exception => e 102 @log.error("#{e}\n#{$@}") 103 end 104 } 105 Event::Handler::connections.clear 106 107 EventMachine::run do 108 EventMachine.start_server('0.0.0.0', @stats.port, 109 Roma::Command::Receiver, 110 @storages, @rttable) 111 # a management of connections lives 112 EventMachine::add_periodic_timer( 10 ) { 113 if Event::Handler::connection_expire_time > 0 114 dellist = [] 115 Event::Handler::connections.each{|k,v| 116 if k.connected == false || k.last_access == nil 117 dellist << k 118 elsif k.last_access < Time.now - Event::Handler::connection_expire_time 119 begin 120 k.close_connection 121 if k.addr 122 @log.info("connection expired from #{k.addr}:#{k.port},lastcmd = #{k.lastcmd}") 123 else 124 @log.info("connection expired in irregular connection") 125 dellist << k 126 end 127 rescue Exception => e 128 @log.error("#{e}\n#{$@}") 129 dellist << k 130 end 131 end 132 } 133 dellist.each{|k| 134 @log.info("delete connection lastcmd = #{k.lastcmd}") 135 Event::Handler::connections.delete(k) 136 } 137 end 138 } 139 140 @log.info("Now accepting connections on address #{@stats.address}, port #{@stats.port}") 141 end 142 rescue Interrupt => e 143 if daemon? 144 @log.error("#{e.inspect}\n#{$@}") 145 retry 146 else 147 $stderr.puts "#{e.inspect}" 148 end 149 rescue Exception => e 150 @log.error("#{e}\n#{$@}") 151 @log.error("restart an eventmachine") 152 retry 153 end 154 end 155 stop_async_process 156 stop_wb_process 157 stop 158 end
stop_clean_up()
click to toggle source
# File lib/roma/romad.rb 162 def stop_clean_up 163 @stats.last_clean_up = Time.now 164 while(@stats.run_storage_clean_up) 165 @log.info("Storage clean up process will be stop.") 166 @storages.each_value{|st| st.stop_clean_up} 167 sleep 0.005 168 end 169 end
Private Instance Methods
async_broadcast_cmd(cmd,without_nids=nil,tout=nil)
click to toggle source
# File lib/roma/romad.rb 771 def async_broadcast_cmd(cmd,without_nids=nil,tout=nil) 772 without_nids=[@stats.ap_str] unless without_nids 773 res = {} 774 @rttable.nodes.each{ |nid| 775 res[nid] = async_send_cmd(nid,cmd,tout) unless without_nids.include?(nid) 776 } 777 res 778 rescue Exception => e 779 @log.error("#{e}\n#{$@}") 780 nil 781 end
async_send_cmd(nid, cmd, tout=nil)
click to toggle source
# File lib/roma/romad.rb 733 def async_send_cmd(nid, cmd, tout=nil) 734 con = res = nil 735 if tout 736 timeout(tout){ 737 con = Roma::Messaging::ConPool.instance.get_connection(nid) 738 unless con 739 @rttable.proc_failed(nid) if @rttable 740 @log.error("#{__FILE__}:#{__LINE__}:#{nid} connection refused,command is #{cmd}.") 741 return nil 742 end 743 con.write(cmd) 744 res = con.gets 745 } 746 else 747 con = Roma::Messaging::ConPool.instance.get_connection(nid) 748 unless con 749 @rttable.proc_failed(nid) if @rttable 750 @log.error("#{__FILE__}:#{__LINE__}:#{nid} connection refused,command is #{cmd}.") 751 return nil 752 end 753 con.write(cmd) 754 res = con.gets 755 end 756 if res == nil 757 @rttable.proc_failed(nid) if @rttable 758 return nil 759 elsif res.start_with?("ERROR") == false 760 @rttable.proc_succeed(nid) if @rttable 761 Roma::Messaging::ConPool.instance.return_connection(nid, con) 762 end 763 res.chomp 764 rescue Exception => e 765 @rttable.proc_failed(nid) if @rttable 766 @log.error("#{__FILE__}:#{__LINE__}:#{e} #{$@}") 767 @log.error("#{__FILE__}:#{__LINE__}:Send command failed that node-id is #{nid},command is #{cmd}.") 768 nil 769 end
get_routedump(nid)
click to toggle source
# File lib/roma/romad.rb 507 def get_routedump(nid) 508 rcv = receive_routing_dump(nid, "routingdump bin\r\n") 509 unless rcv 510 rcv = receive_routing_dump(nid, "routingdump\r\n") 511 rd = Marshal.load(rcv) 512 else 513 rd = Routing::RoutingData.decode_binary(rcv) 514 end 515 rd 516 rescue 517 nil 518 end
gets()
click to toggle source
# File lib/roma/romad.rb 262 def gets 263 ret = gets2 264 @log.info("command log:#{ret.chomp}") if ret 265 ret 266 end
initialize_connection()
click to toggle source
# File lib/roma/romad.rb 200 def initialize_connection 201 if Config.const_defined?(:CONNECTION_CONTINUOUS_LIMIT) 202 unless Event::Handler.set_ccl(Config::CONNECTION_CONTINUOUS_LIMIT) 203 raise "config parse error : Config::CONNECTION_CONTINUOUS_LIMIT" 204 end 205 end 206 207 if Config.const_defined?(:CONNECTION_EXPTIME) 208 Event::Handler::connection_expire_time = Config::CONNECTION_EXPTIME 209 end 210 211 if Config.const_defined?(:CONNECTION_POOL_EXPTIME) 212 Messaging::ConPool.instance.expire_time = Config::CONNECTION_POOL_EXPTIME 213 end 214 215 if Config.const_defined?(:CONNECTION_POOL_MAX) 216 Messaging::ConPool.instance.maxlength = Config::CONNECTION_POOL_MAX 217 end 218 219 if Config.const_defined?(:CONNECTION_EMPOOL_EXPTIME) 220 Event::EMConPool::instance.expire_time = Config::CONNECTION_EMPOOL_EXPTIME 221 end 222 223 if Config.const_defined?(:CONNECTION_EMPOOL_MAX) 224 Event::EMConPool::instance.maxlength = Config::CONNECTION_EMPOOL_MAX 225 end 226 end
initialize_handler()
click to toggle source
# File lib/roma/romad.rb 256 def initialize_handler 257 if @stats.verbose 258 Event::Handler.class_eval{ 259 alias gets2 gets 260 undef gets 261 262 def gets 263 ret = gets2 264 @log.info("command log:#{ret.chomp}") if ret 265 ret 266 end 267 } 268 end 269 270 if @stats.join_ap 271 Command::Receiver::mk_evlist 272 else 273 Command::Receiver::mk_starting_evlist 274 end 275 end
initialize_logger()
click to toggle source
# File lib/roma/romad.rb 277 def initialize_logger 278 Roma::Logging::RLogger.create_singleton_instance("#{Roma::Config::LOG_PATH}/#{@stats.ap_str}.log", 279 Roma::Config::LOG_SHIFT_AGE, 280 Roma::Config::LOG_SHIFT_SIZE) 281 @log = Roma::Logging::RLogger.instance 282 283 if Config.const_defined? :LOG_LEVEL 284 case Config::LOG_LEVEL 285 when :debug 286 @log.level = Roma::Logging::RLogger::Severity::DEBUG 287 when :info 288 @log.level = Roma::Logging::RLogger::Severity::INFO 289 when :warn 290 @log.level = Roma::Logging::RLogger::Severity::WARN 291 when :error 292 @log.level = Roma::Logging::RLogger::Severity::ERROR 293 end 294 end 295 end
initialize_plugin()
click to toggle source
# File lib/roma/romad.rb 237 def initialize_plugin 238 return unless Roma::Config.const_defined? :PLUGIN_FILES 239 240 Roma::Config::PLUGIN_FILES.each do|f| 241 require "roma/plugin/#{f}" 242 @log.info("roma/plugin/#{f} loaded") 243 end 244 Roma::CommandPlugin.plugins.each do|plugin| 245 Roma::Command::Receiver.class_eval do 246 include plugin 247 end 248 @log.info("#{plugin.to_s} included") 249 end 250 251 if @stats.disabled_cmd_protect 252 Command::Receiver::mk_evlist 253 end 254 end
initialize_rttable()
click to toggle source
# File lib/roma/romad.rb 412 def initialize_rttable 413 if @stats.join_ap 414 initialize_rttable_join 415 else 416 fname = "#{Roma::Config::RTTABLE_PATH}/#{@stats.ap_str}.route" 417 raise "#{fname} not found." unless File::exist?(fname) 418 rd = Roma::Routing::RoutingData::load(fname) 419 raise "It failed in loading the routing table data." unless rd 420 if Config.const_defined? :RTTABLE_CLASS 421 @rttable = Config::RTTABLE_CLASS.new(rd,fname) 422 else 423 @rttable = Roma::Routing::ChurnbasedRoutingTable.new(rd,fname) 424 end 425 end 426 427 if Roma::Config.const_defined?(:RTTABLE_SUB_NID) 428 @rttable.sub_nid = Roma::Config::RTTABLE_SUB_NID 429 end 430 431 if Roma::Config.const_defined?(:ROUTING_FAIL_CNT_THRESHOLD) 432 @rttable.fail_cnt_threshold = Roma::Config::ROUTING_FAIL_CNT_THRESHOLD 433 end 434 if Roma::Config.const_defined?(:ROUTING_FAIL_CNT_GAP) 435 @rttable.fail_cnt_gap = Roma::Config::ROUTING_FAIL_CNT_GAP 436 end 437 @rttable.lost_action = Roma::Config::DEFAULT_LOST_ACTION 438 @rttable.auto_recover = Roma::Config::AUTO_RECOVER if defined?(Roma::Config::AUTO_RECOVER) 439 440 @rttable.enabled_failover = false 441 @rttable.set_leave_proc{|nid| 442 Roma::Messaging::ConPool.instance.close_same_host(nid) 443 Roma::Event::EMConPool.instance.close_same_host(nid) 444 Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('broadcast_cmd',["leave #{nid}",[@stats.ap_str,nid,5]])) 445 } 446 @rttable.set_lost_proc{ 447 if @rttable.lost_action == :shutdown 448 async_broadcast_cmd("rbalse lose_data\r\n") 449 EventMachine::stop_event_loop 450 @log.error("Romad has stopped, so that lose data.") 451 end 452 } 453 @rttable.set_recover_proc{|action| 454 if (@rttable.lost_action == :shutdown || @rttable.lost_action == :auto_assign) && @rttable.auto_recover == true 455 Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new("#{action}")) 456 elsif 457 @log.error("AUTO_RECOVER is off or Unavailable value is set to [DEFAULT_LOST_ACTION] => #{@rttable.lost_action}") 458 end 459 } 460 461 if Roma::Config.const_defined?(:ROUTING_EVENT_LIMIT_LINE) 462 @rttable.event_limit_line = Roma::Config::ROUTING_EVENT_LIMIT_LINE 463 end 464 Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('start_get_routing_event')) 465 end
initialize_rttable_join()
click to toggle source
# File lib/roma/romad.rb 467 def initialize_rttable_join 468 name = async_send_cmd(@stats.join_ap,"whoami\r\n") 469 unless name 470 raise "No respons from #{@stats.join_ap}." 471 end 472 473 if name != @stats.name 474 raise "#{@stats.join_ap} has diffarent name.\n" + 475 "me = \"#{@stats.name}\" #{@stats.join_ap} = \"#{name}\"" 476 end 477 478 fname = "#{Roma::Config::RTTABLE_PATH}/#{@stats.ap_str}.route" 479 if rd = get_routedump(@stats.join_ap) 480 rd.save(fname) 481 else 482 raise "It failed in getting the routing table data from #{@stats.join_ap}." 483 end 484 485 if rd.nodes.include?(@stats.ap_str) 486 raise "ROMA has already contained #{@stats.ap_str}." 487 end 488 489 @rttable = Roma::Routing::ChurnbasedRoutingTable.new(rd,fname) 490 nodes = @rttable.nodes 491 492 nodes.each{|nid| 493 begin 494 con = Roma::Messaging::ConPool.instance.get_connection(nid) 495 con.write("join #{@stats.ap_str}\r\n") 496 if con.gets != "ADDED\r\n" 497 raise "Hotscale initialize failed.\n#{nid} is busy." 498 end 499 Roma::Messaging::ConPool.instance.return_connection(nid, con) 500 rescue =>e 501 raise "Hotscale initialize failed.\n#{nid} unreachable connection." 502 end 503 } 504 @rttable.add_node(@stats.ap_str) 505 end
initialize_stats()
click to toggle source
# File lib/roma/romad.rb 173 def initialize_stats 174 if Config.const_defined?(:REDUNDANT_ZREDUNDANT_SIZE) 175 @stats.size_of_zredundant = Config::REDUNDANT_ZREDUNDANT_SIZE 176 end 177 if Config.const_defined?(:DATACOPY_STREAM_COPY_WAIT_PARAM) 178 @stats.stream_copy_wait_param = Config::DATACOPY_STREAM_COPY_WAIT_PARAM 179 end 180 if Config.const_defined?(:LOG_STREAM_SHOW_WAIT_PARAM) 181 @stats.stream_show_wait_param = Config::LOG_STREAM_SHOW_WAIT_PARAM 182 end 183 if Config.const_defined?(:WB_COMMAND_MAP) 184 @stats.wb_command_map = Config::WB_COMMAND_MAP 185 end 186 if Config.const_defined?(:STORAGE_CLEAN_UP_INTERVAL) 187 @stats.clean_up_interval = Config::STORAGE_CLEAN_UP_INTERVAL 188 end 189 if Config.const_defined?(:LOG_SHIFT_SIZE) 190 @stats.log_shift_size = Config::LOG_SHIFT_SIZE 191 end 192 if Config.const_defined?(:LOG_SHIFT_AGE) 193 @stats.log_shift_age = Config::LOG_SHIFT_AGE 194 end 195 if Config.const_defined?(:LOG_LEVEL) 196 @stats.log_level = Config::LOG_LEVEL 197 end 198 end
initialize_storages()
click to toggle source
# File lib/roma/romad.rb 367 def initialize_storages 368 @storages = {} 369 if Config.const_defined? :STORAGE_PATH 370 path = "#{Roma::Config::STORAGE_PATH}/#{@stats.ap_str}" 371 end 372 373 if Config.const_defined? :STORAGE_CLASS 374 st_class = Config::STORAGE_CLASS 375 end 376 377 if Config.const_defined? :STORAGE_DIVNUM 378 st_divnum = Config::STORAGE_DIVNUM 379 end 380 if Config.const_defined? :STORAGE_OPTION 381 st_option = Config::STORAGE_OPTION 382 end 383 384 path ||= './' 385 st_class ||= Storage::RubyHashStorage 386 st_divnum ||= 10 387 st_option ||= nil 388 Dir.glob("#{path}/*").each{|f| 389 if File.directory?(f) 390 hname = File.basename(f) 391 st = st_class.new 392 st.storage_path = "#{path}/#{hname}" 393 st.vn_list = @rttable.vnodes 394 st.st_class = st_class 395 st.divnum = st_divnum 396 st.option = st_option 397 @storages[hname] = st 398 end 399 } 400 if @storages.length == 0 401 hname = 'roma' 402 st = st_class.new 403 st.storage_path = "#{path}/#{hname}" 404 st.vn_list = @rttable.vnodes 405 st.st_class = st_class 406 st.divnum = st_divnum 407 st.option = st_option 408 @storages[hname] = st 409 end 410 end
initialize_wb_writer()
click to toggle source
# File lib/roma/romad.rb 228 def initialize_wb_writer 229 @wb_writer = Roma::WriteBehind::FileWriter.new( 230 Roma::Config::WRITEBEHIND_PATH, 231 Roma::Config::WRITEBEHIND_SHIFT_SIZE, 232 @log) 233 234 @cr_writer = Roma::WriteBehind::StreamWriter.new(@log) 235 end
node_check(nid)
click to toggle source
# File lib/roma/romad.rb 629 def node_check(nid) 630 if @startup && @rttable.enabled_failover == false 631 unless Roma::Messaging::ConPool.instance.check_connection(nid) 632 @log.info("I'm waiting for booting the #{nid} instance.") 633 return false 634 end 635 end 636 name = async_send_cmd(nid,"whoami\r\n",2) 637 return false unless name 638 if name != @stats.name 639 @log.error("#{nid} has diffarent name.") 640 @log.error("me = \"#{@stats.name}\" #{nid} = \"#{name}\"") 641 return false 642 end 643 return true 644 end
nodes_check(nodes)
click to toggle source
# File lib/roma/romad.rb 622 def nodes_check(nodes) 623 nodes.each{|nid| 624 return false unless node_check(nid) 625 } 626 return true 627 end
options(argv)
click to toggle source
# File lib/roma/romad.rb 297 def options(argv) 298 opts = OptionParser.new 299 opts.banner="usage:#{File.basename($0)} [options] address" 300 301 @stats.daemon = false 302 opts.on("-d","--daemon") { |v| @stats.daemon = true } 303 304 opts.on_tail("-h", "--help", "Show this message") { 305 puts opts; exit 306 } 307 308 opts.on("-j","--join [address:port]") { |v| @stats.join_ap = v } 309 310 opts.on("-p", "--port [PORT]") { |v| @stats.port = v } 311 312 @stats.verbose = false 313 opts.on(nil,"--verbose"){ |v| @stats.verbose = true } 314 315 opts.on_tail("-v", "--version", "Show version") { 316 puts "romad.rb #{Roma::VERSION}"; exit 317 } 318 319 opts.on("-n", "--name [name]") { |v| @stats.name = v } 320 321 ## 322 # "--enabled_repeathost" is deplicated. We will rename it to "--replication_in_host" 323 ## 324 @stats.enabled_repetition_host_in_routing = false 325 opts.on(nil,"--enabled_repeathost", "Allow redundancy to same host"){ 326 @stats.enabled_repetition_host_in_routing = true 327 puts "Warning: \"--enabled_repeathost\" is deplicated. Please use \"--replication_in_host\"" 328 } 329 opts.on(nil,"--replication_in_host", "Allow redundancy to same host"){ 330 @stats.enabled_repetition_host_in_routing = true 331 } 332 333 @stats.disabled_cmd_protect = false 334 opts.on(nil,"--disabled_cmd_protect", "Command protection disable while starting"){ 335 @stats.disabled_cmd_protect = true 336 } 337 338 opts.on("--config [file path of the config.rb]"){ |v| @stats.config_path = File.expand_path(v) } 339 340 opts.parse!(argv) 341 raise OptionParser::ParseError.new if argv.length < 1 342 @stats.address = argv[0] 343 344 @stats.config_path = 'roma/config' unless @stats.config_path 345 346 unless require @stats.config_path 347 raise "config.rb has already been load outside the romad.rb." 348 end 349 350 @stats.name = Config::DEFAULT_NAME unless @stats.name 351 @stats.port = Config::DEFAULT_PORT.to_s unless @stats.port 352 353 unless @stats.port =~ /^\d+$/ 354 raise OptionParser::ParseError.new('Port number is not numeric.') 355 end 356 357 @stats.join_ap.sub!(':','_') if @stats.join_ap 358 if @stats.join_ap && !(@stats.join_ap =~ /^.+_\d+$/) 359 raise OptionParser::ParseError.new('[address:port] can not parse.') 360 end 361 rescue OptionParser::ParseError => e 362 $stderr.puts e.message 363 $stderr.puts opts.help 364 exit 1 365 end
receive_routing_dump(nid, cmd)
click to toggle source
# File lib/roma/romad.rb 520 def receive_routing_dump(nid, cmd) 521 con = Messaging::ConPool.instance.get_connection(nid) 522 con.write(cmd) 523 unless select [con], nil, nil, 1 524 con.close 525 return nil 526 end 527 len = con.gets 528 if len.to_i <= 0 529 con.close 530 return nil 531 end 532 533 rcv='' 534 while(rcv.length != len.to_i) 535 rcv = rcv + con.read(len.to_i - rcv.length) 536 end 537 con.read(2) 538 con.gets 539 Messaging::ConPool.instance.return_connection(nid,con) 540 rcv 541 rescue Exception 542 nil 543 end
routing_hash_comparison(nid,id='0')
click to toggle source
# File lib/roma/romad.rb 700 def routing_hash_comparison(nid,id='0') 701 return :skip if @stats.run_join || @stats.run_recover || @stats.run_balance 702 703 h = async_send_cmd(nid,"mklhash #{id}\r\n") 704 if h && h.start_with?("ERROR") == false && @rttable.mtree.get(id) != h 705 if (id.length - 1) == @rttable.div_bits 706 sync_routing(nid,id) 707 else 708 routing_hash_comparison(nid,"#{id}0") 709 routing_hash_comparison(nid,"#{id}1") 710 end 711 return :inconsistent 712 end 713 :consistent 714 end
start_sync_routing_process()
click to toggle source
# File lib/roma/romad.rb 658 def start_sync_routing_process 659 return if @stats.run_join || @stats.run_recover || @stats.run_balance || @stats.run_sync_routing 660 661 nodes = @rttable.nodes 662 return if nodes.length == 1 && nodes[0] == @stats.ap_str 663 664 @stats.run_sync_routing = true 665 666 idx=nodes.index(@stats.ap_str) 667 unless idx 668 @log.error("My node-id(=#{@stats.ap_str}) does not found in the routingtable.") 669 EventMachine::stop_event_loop 670 return 671 end 672 t = Thread.new{ 673 begin 674 ret = routing_hash_comparison(nodes[idx-1]) 675 if ret == :inconsistent 676 @log.info("create nodes from v_idx"); 677 678 @rttable.create_nodes_from_v_idx 679 begin 680 con = Roma::Messaging::ConPool.instance.get_connection(nodes[idx-1]) 681 con.write("create_nodes_from_v_idx\r\n") 682 if con.gets == "CREATED\r\n" 683 Roma::Messaging::ConPool.instance.return_connection(nodes[idx-1], con) 684 else 685 @log.error("get busy result in create_nodes_from_v_idx command from #{nodes[idx-1]}.") 686 con.close 687 end 688 rescue Exception =>e 689 @log.error("create_nodes_from_v_idx command unreachable to the #{nodes[idx-1]}.") 690 end 691 end 692 rescue Exception =>e 693 @log.error("#{e}\n#{$@}") 694 end 695 @stats.run_sync_routing = false 696 } 697 t[:name] = 'sync_routing' 698 end
stop()
click to toggle source
# File lib/roma/romad.rb 783 def stop 784 @storages.each_value{|st| 785 st.closedb 786 } 787 if @rttable.instance_of?(Roma::Routing::ChurnbasedRoutingTable) 788 @rttable.close_log 789 end 790 @log.info("Romad has stopped: #{@stats.ap_str}") 791 end
sync_routing(nid,id)
click to toggle source
# File lib/roma/romad.rb 716 def sync_routing(nid,id) 717 vn = @rttable.mtree.to_vn(id) 718 @log.warn("vn=#{vn} inconsistent") 719 720 res = async_send_cmd(nid,"getroute #{vn}\r\n") 721 return if res == nil || res.start_with?("ERROR") 722 clk,*nids = res.split(' ') 723 clk = @rttable.set_route(vn, clk.to_i, nids) 724 725 if clk.is_a?(Integer) == false 726 clk,nids = @rttable.search_nodes_with_clk(vn) 727 cmd = "setroute #{vn} #{clk-1}" 728 nids.each{|nid2| cmd << " #{nid2}" } 729 async_send_cmd(nid,"#{cmd}\r\n") 730 end 731 end
timer()
click to toggle source
# File lib/roma/romad.rb 545 def timer 546 t = Thread.new do 547 loop do 548 sleep 1 549 timer_event_1sec 550 end 551 end 552 t[:name] = 'timer_1sec' 553 t = Thread.new do 554 loop do 555 sleep 10 556 timer_event_10sec 557 end 558 end 559 t[:name] = 'timer_10sec' 560 end
timer_event_10sec()
click to toggle source
# File lib/roma/romad.rb 577 def timer_event_10sec 578 if @startup && @rttable.enabled_failover == false 579 @log.debug("nodes_check start") 580 nodes=@rttable.nodes 581 nodes.delete(@stats.ap_str) 582 if nodes_check(nodes) 583 @log.info("all nodes started") 584 AsyncProcess::queue.clear 585 @rttable.enabled_failover = true 586 Command::Receiver::mk_evlist 587 @startup = false 588 end 589 elsif @rttable.enabled_failover == false 590 @log.warn("failover disable now!!") 591 else 592 version_check 593 @rttable.delete_old_trans(@stats.routing_trans_timeout) 594 start_sync_routing_process 595 end 596 597 if (@rttable.enabled_failover && 598 @stats.run_storage_clean_up == false && 599 @stats.run_balance == false && 600 @stats.run_recover == false && 601 @stats.run_iterate_storage == false && 602 @stats.run_join == false && 603 @stats.run_receive_a_vnode.empty? && 604 @stats.do_clean_up?) 605 Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('start_storage_clean_up_process')) 606 end 607 608 if @cr_writer.run_replication 609 if @cr_writer.change_mklhash? 610 nid = @cr_writer.replica_nodelist.sample 611 @cr_writer.update_mklhash(nid) 612 @cr_writer.update_nodelist(nid) 613 @cr_writer.update_rttable(nid) 614 end 615 end 616 617 @stats.clear_counters 618 rescue Exception =>e 619 @log.error("#{e}\n#{$@}") 620 end
timer_event_1sec()
click to toggle source
# File lib/roma/romad.rb 562 def timer_event_1sec 563 if @rttable.enabled_failover 564 nodes=@rttable.nodes 565 nodes.delete(@stats.ap_str) 566 nodes_check(nodes) 567 end 568 569 if (@stats.run_join || @stats.run_recover || @stats.run_balance) && 570 @stats.run_storage_clean_up 571 stop_clean_up 572 end 573 rescue Exception =>e 574 @log.error("#{e}\n#{$@}") 575 end
version_check()
click to toggle source
# File lib/roma/romad.rb 646 def version_check 647 nodes=@rttable.nodes 648 nodes.each{|nid| 649 vs = async_send_cmd(nid,"version\r\n",2) 650 next unless vs 651 if /VERSION\s(?:ROMA-)?(\d+)\.(\d+)\.(\d+)/ =~ vs 652 ver = ($1.to_i << 16) + ($2.to_i << 8) + $3.to_i 653 @rttable.set_version(nid, ver) 654 end 655 } 656 end