class Roma::Romad

Attributes

cr_writer[R]
eventloop[RW]
rttable[R]
startup[RW]
stats[R]
storages[R]
wb_writer[R]

Public Class Methods

new(argv = nil) click to toggle source
   # File lib/roma/romad.rb
# Creates the daemon object and runs every set-up step.
#
# The steps run in a fixed order because later steps read state assigned
# by earlier ones: initialize_logger assigns @log, initialize_rttable
# assigns @rttable, and initialize_storages reads @rttable.vnodes.
#
# argv:: argument array handed unchanged to #options.
def initialize(argv = nil)
  @stats = Roma::Stats.instance
  @startup = true
  options(argv)
  initialize_stats
  initialize_connection
  initialize_logger
  initialize_rttable
  initialize_storages
  initialize_handler
  initialize_plugin
  initialize_wb_writer
end

Public Instance Methods

daemon?() click to toggle source
    # File lib/roma/romad.rb
# Returns the daemon flag held in @stats (set by the -d/--daemon option).
def daemon?
  @stats.daemon
end
start() click to toggle source
    # File lib/roma/romad.rb
 44 def start
 45   # config version check
 46   if !Config.const_defined?(:VERSION)
 47     @log.error("ROMA FAIL TO BOOT! : config.rb's version is too old.")
 48     exit
 49   elsif Config::VERSION != Roma::VERSION
 50     if /(\d+)\.(\d+)\.(\d+)/ =~ Config::VERSION
 51       version_config = ($1.to_i << 16) + ($2.to_i << 8) + $3.to_i
 52     end
 53     if /(\d+)\.(\d+)\.(\d+)/ =~ Roma::VERSION
 54       version_roma = ($1.to_i << 16) + ($2.to_i << 8) + $3.to_i
 55     end
 56 
 57     if version_config == version_roma
 58       @log.info("This version is development version.")
 59     else 
 60       @log.error("ROMA FAIL TO BOOT! : config.rb's version is differ from current ROMA version.")
 61       exit
 62     end
 63   end
 64 
 65   if node_check(@stats.ap_str)
 66     @log.error("#{@stats.ap_str} is already running.")
 67     return
 68   end
 69 
 70   @storages.each{|hashname,st|
 71     st.opendb
 72   }
 73 
 74   start_async_process
 75   start_wb_process
 76   timer
 77 
 78   if @stats.join_ap
 79     AsyncProcess::queue.push(AsyncMessage.new('start_join_process'))
 80   end
 81 
 82   # select a kind of system call
 83   if Config.const_defined?(:CONNECTION_USE_EPOLL) && Config::CONNECTION_USE_EPOLL
 84     @log.info("use an epoll")
 85     EM.epoll
 86     if Config.const_defined?(:CONNECTION_DESCRIPTOR_TABLE_SIZE)
 87       EM.set_descriptor_table_size(Config::CONNECTION_DESCRIPTOR_TABLE_SIZE)
 88     end
 89   else
 90     @log.info("use a select")
 91   end
 92 
 93   @eventloop = true
 94   while(@eventloop)
 95     @eventloop = false
 96     begin
 97       # initialize an instance of connections as restarting of an evantmachine
 98       Event::Handler::connections.each_key{|k|
 99         begin
100           k.close_connection
101         rescue Exception => e
102           @log.error("#{e}\n#{$@}")
103         end
104       }
105       Event::Handler::connections.clear
106 
107       EventMachine::run do
108         EventMachine.start_server('0.0.0.0', @stats.port,
109                                   Roma::Command::Receiver,
110                                   @storages, @rttable)
111         # a management of connections lives
112         EventMachine::add_periodic_timer( 10 ) {
113           if Event::Handler::connection_expire_time > 0
114             dellist = []
115             Event::Handler::connections.each{|k,v|
116               if k.connected == false || k.last_access == nil
117                 dellist << k
118               elsif k.last_access < Time.now - Event::Handler::connection_expire_time
119                 begin
120                   k.close_connection
121                   if k.addr
122                     @log.info("connection expired from #{k.addr}:#{k.port},lastcmd = #{k.lastcmd}")
123                   else
124                     @log.info("connection expired in irregular connection")
125                     dellist << k
126                   end
127                 rescue Exception => e
128                   @log.error("#{e}\n#{$@}")
129                   dellist << k
130                 end
131               end
132             }
133             dellist.each{|k|
134               @log.info("delete connection lastcmd = #{k.lastcmd}")
135               Event::Handler::connections.delete(k)
136             }
137           end
138         }
139 
140         @log.info("Now accepting connections on address #{@stats.address}, port #{@stats.port}")
141       end
142     rescue Interrupt => e
143       if daemon?
144         @log.error("#{e.inspect}\n#{$@}")
145         retry
146       else
147         $stderr.puts "#{e.inspect}"
148       end
149     rescue Exception => e
150       @log.error("#{e}\n#{$@}")
151       @log.error("restart an eventmachine")
152       retry
153     end
154   end
155   stop_async_process
156   stop_wb_process
157   stop
158 end
stop_clean_up() click to toggle source
    # File lib/roma/romad.rb
# Asks every storage to stop its clean-up run and waits until
# @stats.run_storage_clean_up becomes false.
#
# @stats.last_clean_up is stamped with the current time first.  While the
# flag is still set the request is repeated every 5 ms.
def stop_clean_up
  @stats.last_clean_up = Time.now
  while @stats.run_storage_clean_up
    @log.info("Storage clean up process will be stop.")
    @storages.each_value { |st| st.stop_clean_up }
    sleep 0.005
  end
end

Private Instance Methods

async_broadcast_cmd(cmd,without_nids=nil,tout=nil) click to toggle source
    # File lib/roma/romad.rb
# Sends +cmd+ to every node of the routing table except the excluded ones.
#
# cmd::          command text handed to #async_send_cmd.
# without_nids:: node ids to skip; defaults to this node (@stats.ap_str).
# tout::         optional timeout handed to #async_send_cmd.
#
# Returns a hash of node id => reply (as returned by #async_send_cmd), or
# nil when an exception was raised; the exception is logged.
def async_broadcast_cmd(cmd,without_nids=nil,tout=nil)
  without_nids ||= [@stats.ap_str]
  res = {}
  @rttable.nodes.each do |nid|
    next if without_nids.include?(nid)
    res[nid] = async_send_cmd(nid, cmd, tout)
  end
  res
rescue Exception => e
  @log.error("#{e}\n#{$@}")
  nil
end
async_send_cmd(nid, cmd, tout=nil) click to toggle source
    # File lib/roma/romad.rb
# Sends one command to a node and returns its one-line reply.
#
# nid::  node id handed to the connection pool.
# cmd::  command text written to the connection as-is.
# tout:: optional timeout in seconds covering connect, write and read.
#
# Returns the reply without its trailing line terminator, or nil when no
# connection could be obtained, the node sent nothing, or any exception
# (a timeout included) was raised.  The routing table, when present, is
# told about the failure or success.  A reply starting with "ERROR" is
# still returned, but its connection is not put back into the pool.
def async_send_cmd(nid, cmd, tout=nil)
  con = res = nil
  pooled = false
  exchange = lambda {
    con = Roma::Messaging::ConPool.instance.get_connection(nid)
    if con
      con.write(cmd)
      res = con.gets
    end
  }

  # Timeout.timeout is used instead of the bare Kernel#timeout helper,
  # which current Ruby versions no longer provide.
  if tout
    Timeout.timeout(tout) { exchange.call }
  else
    exchange.call
  end

  unless con
    @rttable.proc_failed(nid) if @rttable
    @log.error("#{__FILE__}:#{__LINE__}:#{nid} connection refused,command is #{cmd}.")
    return nil
  end

  if res.nil?
    @rttable.proc_failed(nid) if @rttable
    return nil
  end

  unless res.start_with?("ERROR")
    @rttable.proc_succeed(nid) if @rttable
    Roma::Messaging::ConPool.instance.return_connection(nid, con)
    pooled = true
  end
  res.chomp
rescue Exception => e
  @rttable.proc_failed(nid) if @rttable
  @log.error("#{__FILE__}:#{__LINE__}:#{e} #{$@}")
  @log.error("#{__FILE__}:#{__LINE__}:Send command failed that node-id is #{nid},command is #{cmd}.")
  nil
ensure
  # A connection that does not go back to the pool is closed here;
  # otherwise it would stay open until it is garbage collected.
  begin
    con.close if con && !pooled
  rescue StandardError
    # best effort: a failing close must not mask the real result
  end
end
get_routedump(nid) click to toggle source
    # File lib/roma/romad.rb
# Fetches the routing data of node +nid+.
#
# The binary dump ("routingdump bin") is tried first and decoded with
# Routing::RoutingData.decode_binary.  When that yields nothing the plain
# "routingdump" reply is fetched and unmarshalled instead.
#
# Returns the decoded object, or nil when neither dump could be obtained
# or decoding raised a StandardError.
def get_routedump(nid)
  rcv = receive_routing_dump(nid, "routingdump bin\r\n")
  return Routing::RoutingData.decode_binary(rcv) if rcv

  rcv = receive_routing_dump(nid, "routingdump\r\n")
  return nil unless rcv

  # NOTE(review): Marshal.load on bytes received from another host can
  # instantiate arbitrary objects; acceptable only if every peer is trusted.
  Marshal.load(rcv)
rescue
  nil
end
gets() click to toggle source
    # File lib/roma/romad.rb
# Logging wrapper around gets2: reads one line through gets2, writes it
# (without its line terminator) to the log at info level, and returns the
# line unchanged.  Nothing is logged when gets2 returns nil.
#
# This is the same body that #initialize_handler installs into
# Event::Handler when verbose mode is on.
def gets
  ret = gets2
  @log.info("command log:#{ret.chomp}") if ret
  ret
end
initialize_connection() click to toggle source
    # File lib/roma/romad.rb
# Applies the optional CONNECTION_* settings of Config to the event
# handler and to both connection pools.  A setting that is not defined in
# Config is left untouched.
#
# Raises RuntimeError when Event::Handler.set_ccl rejects
# Config::CONNECTION_CONTINUOUS_LIMIT.
def initialize_connection
  if Config.const_defined?(:CONNECTION_CONTINUOUS_LIMIT)
    unless Event::Handler.set_ccl(Config::CONNECTION_CONTINUOUS_LIMIT)
      raise "config parse error : Config::CONNECTION_CONTINUOUS_LIMIT"
    end
  end

  # expiry of client connections handled by the event handler
  if Config.const_defined?(:CONNECTION_EXPTIME)
    Event::Handler::connection_expire_time = Config::CONNECTION_EXPTIME
  end

  # pool used through Messaging::ConPool
  if Config.const_defined?(:CONNECTION_POOL_EXPTIME)
    Messaging::ConPool.instance.expire_time = Config::CONNECTION_POOL_EXPTIME
  end
  if Config.const_defined?(:CONNECTION_POOL_MAX)
    Messaging::ConPool.instance.maxlength = Config::CONNECTION_POOL_MAX
  end

  # pool used through Event::EMConPool
  if Config.const_defined?(:CONNECTION_EMPOOL_EXPTIME)
    Event::EMConPool::instance.expire_time = Config::CONNECTION_EMPOOL_EXPTIME
  end
  if Config.const_defined?(:CONNECTION_EMPOOL_MAX)
    Event::EMConPool::instance.maxlength = Config::CONNECTION_EMPOOL_MAX
  end
end
initialize_handler() click to toggle source
    # File lib/roma/romad.rb
# Prepares the command handler classes.
#
# In verbose mode Event::Handler#gets is replaced by a wrapper that logs
# every line it returns (the original stays reachable as gets2).
# Command::Receiver then builds its event list: mk_evlist when this node
# joins an existing cluster, mk_starting_evlist otherwise.
def initialize_handler
  if @stats.verbose
    Event::Handler.class_eval {
      alias gets2 gets
      undef gets

      def gets
        ret = gets2
        @log.info("command log:#{ret.chomp}") if ret
        ret
      end
    }
  end

  if @stats.join_ap
    Command::Receiver::mk_evlist
  else
    Command::Receiver::mk_starting_evlist
  end
end
initialize_logger() click to toggle source
    # File lib/roma/romad.rb
# Creates the singleton logger, keeps it in @log and applies the optional
# Config::LOG_LEVEL.
#
# The log file is "<LOG_PATH>/<ap_str>.log".  LOG_LEVEL may be :debug,
# :info, :warn or :error; any other value leaves the level untouched.
def initialize_logger
  Roma::Logging::RLogger.create_singleton_instance("#{Roma::Config::LOG_PATH}/#{@stats.ap_str}.log",
                                                   Roma::Config::LOG_SHIFT_AGE,
                                                   Roma::Config::LOG_SHIFT_SIZE)
  @log = Roma::Logging::RLogger.instance

  if Config.const_defined?(:LOG_LEVEL)
    case Config::LOG_LEVEL
    when :debug
      @log.level = Roma::Logging::RLogger::Severity::DEBUG
    when :info
      @log.level = Roma::Logging::RLogger::Severity::INFO
    when :warn
      @log.level = Roma::Logging::RLogger::Severity::WARN
    when :error
      @log.level = Roma::Logging::RLogger::Severity::ERROR
    end
  end
end
initialize_plugin() click to toggle source
    # File lib/roma/romad.rb
# Loads the plugins named in Roma::Config::PLUGIN_FILES and mixes every
# registered command plugin into Roma::Command::Receiver.
#
# Does nothing when PLUGIN_FILES is not defined.  When command protection
# is disabled (@stats.disabled_cmd_protect) the receiver's event list is
# rebuilt with mk_evlist afterwards.
def initialize_plugin
  return unless Roma::Config.const_defined? :PLUGIN_FILES

  Roma::Config::PLUGIN_FILES.each do |f|
    require "roma/plugin/#{f}"
    @log.info("roma/plugin/#{f} loaded")
  end

  Roma::CommandPlugin.plugins.each do |plugin|
    Roma::Command::Receiver.class_eval do
      include plugin
    end
    @log.info("#{plugin.to_s} included")
  end

  if @stats.disabled_cmd_protect
    Command::Receiver::mk_evlist
  end
end
initialize_rttable() click to toggle source
    # File lib/roma/romad.rb
# Builds @rttable and wires its settings and callbacks.
#
# When joining a cluster (@stats.join_ap) the table comes from
# #initialize_rttable_join; otherwise it is loaded from
# "<RTTABLE_PATH>/<ap_str>.route" and wrapped in Config::RTTABLE_CLASS
# (ChurnbasedRoutingTable when that constant is not defined).
#
# Fail-over starts disabled.  Finally a 'start_get_routing_event' message
# is queued for the async process.
#
# Raises RuntimeError when the route file is missing or cannot be loaded.
def initialize_rttable
  if @stats.join_ap
    initialize_rttable_join
  else
    fname = "#{Roma::Config::RTTABLE_PATH}/#{@stats.ap_str}.route"
    raise "#{fname} not found." unless File::exist?(fname)
    rd = Roma::Routing::RoutingData::load(fname)
    raise "It failed in loading the routing table data." unless rd
    if Config.const_defined? :RTTABLE_CLASS
      @rttable = Config::RTTABLE_CLASS.new(rd,fname)
    else
      @rttable = Roma::Routing::ChurnbasedRoutingTable.new(rd,fname)
    end
  end

  # optional tuning values
  if Roma::Config.const_defined?(:RTTABLE_SUB_NID)
    @rttable.sub_nid = Roma::Config::RTTABLE_SUB_NID
  end
  if Roma::Config.const_defined?(:ROUTING_FAIL_CNT_THRESHOLD)
    @rttable.fail_cnt_threshold = Roma::Config::ROUTING_FAIL_CNT_THRESHOLD
  end
  if Roma::Config.const_defined?(:ROUTING_FAIL_CNT_GAP)
    @rttable.fail_cnt_gap = Roma::Config::ROUTING_FAIL_CNT_GAP
  end
  @rttable.lost_action = Roma::Config::DEFAULT_LOST_ACTION
  @rttable.auto_recover = Roma::Config::AUTO_RECOVER if defined?(Roma::Config::AUTO_RECOVER)

  @rttable.enabled_failover = false

  # A node left: drop pooled connections to it and tell the other nodes.
  @rttable.set_leave_proc { |nid|
    Roma::Messaging::ConPool.instance.close_same_host(nid)
    Roma::Event::EMConPool.instance.close_same_host(nid)
    # NOTE(review): the trailing 5 sits inside the node-id list; it may
    # have been meant as a separate timeout argument - confirm against the
    # 'broadcast_cmd' handler.
    Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('broadcast_cmd',["leave #{nid}",[@stats.ap_str,nid,5]]))
  }

  # Data was lost: with lost_action :shutdown the cluster is told and the
  # reactor is stopped.
  @rttable.set_lost_proc {
    if @rttable.lost_action == :shutdown
      async_broadcast_cmd("rbalse lose_data\r\n")
      EventMachine::stop_event_loop
      @log.error("Romad has stopped, so that lose data.")
    end
  }

  # Recovery request: queued only when auto recovery is on and the lost
  # action is :shutdown or :auto_assign; otherwise an error is logged.
  @rttable.set_recover_proc { |action|
    if (@rttable.lost_action == :shutdown || @rttable.lost_action == :auto_assign) && @rttable.auto_recover == true
      Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new("#{action}"))
    else
      @log.error("AUTO_RECOVER is off or Unavailable value is set to [DEFAULT_LOST_ACTION] => #{@rttable.lost_action}")
    end
  }

  if Roma::Config.const_defined?(:ROUTING_EVENT_LIMIT_LINE)
    @rttable.event_limit_line = Roma::Config::ROUTING_EVENT_LIMIT_LINE
  end
  Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('start_get_routing_event'))
end
initialize_rttable_join() click to toggle source
    # File lib/roma/romad.rb
# Builds @rttable for a node that joins a running cluster.
#
# The node named by @stats.join_ap is asked for its cluster name and its
# routing dump; the dump is saved to "<RTTABLE_PATH>/<ap_str>.route".
# Every node of that dump is then sent "join <ap_str>" and must answer
# "ADDED".  Finally this node adds itself to the table.
#
# Raises RuntimeError when the join node does not answer, belongs to a
# cluster with a different name, cannot deliver a routing dump, already
# lists this node, or when any node is unreachable or answers something
# other than "ADDED" ("is busy").
def initialize_rttable_join
  name = async_send_cmd(@stats.join_ap,"whoami\r\n")
  unless name
    raise "No respons from #{@stats.join_ap}."
  end

  if name != @stats.name
    raise "#{@stats.join_ap} has diffarent name.\n" +
      "me = \"#{@stats.name}\"  #{@stats.join_ap} = \"#{name}\""
  end

  fname = "#{Roma::Config::RTTABLE_PATH}/#{@stats.ap_str}.route"
  if rd = get_routedump(@stats.join_ap)
    rd.save(fname)
  else
    raise "It failed in getting the routing table data from #{@stats.join_ap}."
  end

  if rd.nodes.include?(@stats.ap_str)
    raise "ROMA has already contained #{@stats.ap_str}."
  end

  @rttable = Roma::Routing::ChurnbasedRoutingTable.new(rd,fname)

  @rttable.nodes.each { |nid|
    reply = nil
    begin
      con = Roma::Messaging::ConPool.instance.get_connection(nid)
      con.write("join #{@stats.ap_str}\r\n")
      reply = con.gets
      Roma::Messaging::ConPool.instance.return_connection(nid, con) if reply == "ADDED\r\n"
    rescue
      reply = nil
    end

    # The two failure kinds are raised outside the begin/rescue so that the
    # "busy" error is no longer swallowed and re-reported as "unreachable".
    raise "Hotscale initialize failed.\n#{nid} unreachable connection." if reply.nil?
    raise "Hotscale initialize failed.\n#{nid} is busy." unless reply == "ADDED\r\n"
  }
  @rttable.add_node(@stats.ap_str)
end
initialize_stats() click to toggle source
    # File lib/roma/romad.rb
# Copies the optional settings of Config into @stats.
#
# Each pair below names a Config constant and the @stats attribute that
# receives it.  A constant that is not defined leaves its attribute
# untouched.
def initialize_stats
  [
    [:REDUNDANT_ZREDUNDANT_SIZE,       :size_of_zredundant],
    [:DATACOPY_STREAM_COPY_WAIT_PARAM, :stream_copy_wait_param],
    [:LOG_STREAM_SHOW_WAIT_PARAM,      :stream_show_wait_param],
    [:WB_COMMAND_MAP,                  :wb_command_map],
    [:STORAGE_CLEAN_UP_INTERVAL,       :clean_up_interval],
    [:LOG_SHIFT_SIZE,                  :log_shift_size],
    [:LOG_SHIFT_AGE,                   :log_shift_age],
    [:LOG_LEVEL,                       :log_level]
  ].each do |const_name, attr|
    next unless Config.const_defined?(const_name)
    @stats.send("#{attr}=", Config.const_get(const_name))
  end
end
initialize_storages() click to toggle source
    # File lib/roma/romad.rb
# Creates one storage object per hash name and collects them in @storages.
#
# Settings come from Config when defined: STORAGE_PATH (default './'),
# STORAGE_CLASS (default Storage::RubyHashStorage), STORAGE_DIVNUM
# (default 10) and STORAGE_OPTION (default nil).  Every sub-directory of
# the storage path becomes a hash name; when there is none a single
# storage named 'roma' is created.  The storages are only configured
# here, not opened.
def initialize_storages
  @storages = {}

  path      = "#{Roma::Config::STORAGE_PATH}/#{@stats.ap_str}" if Config.const_defined?(:STORAGE_PATH)
  st_class  = Config::STORAGE_CLASS  if Config.const_defined?(:STORAGE_CLASS)
  st_divnum = Config::STORAGE_DIVNUM if Config.const_defined?(:STORAGE_DIVNUM)
  st_option = Config::STORAGE_OPTION if Config.const_defined?(:STORAGE_OPTION)

  path      ||= './'
  st_class  ||= Storage::RubyHashStorage
  st_divnum ||= 10
  st_option ||= nil # a false option is normalised to nil

  # Configures one storage for +hname+ and registers it.
  add_storage = lambda { |hname|
    st = st_class.new
    st.storage_path = "#{path}/#{hname}"
    st.vn_list = @rttable.vnodes
    st.st_class = st_class
    st.divnum = st_divnum
    st.option = st_option
    @storages[hname] = st
  }

  Dir.glob("#{path}/*").each { |f|
    add_storage.call(File.basename(f)) if File.directory?(f)
  }
  add_storage.call('roma') if @storages.length == 0
end
initialize_wb_writer() click to toggle source
    # File lib/roma/romad.rb
# Creates the two write-behind writers: @wb_writer, a FileWriter built
# from Config::WRITEBEHIND_PATH and Config::WRITEBEHIND_SHIFT_SIZE, and
# @cr_writer, a StreamWriter.  Both receive the logger.
def initialize_wb_writer
  @wb_writer = Roma::WriteBehind::FileWriter.new(Roma::Config::WRITEBEHIND_PATH,
                                                 Roma::Config::WRITEBEHIND_SHIFT_SIZE,
                                                 @log)

  @cr_writer = Roma::WriteBehind::StreamWriter.new(@log)
end
node_check(nid) click to toggle source
    # File lib/roma/romad.rb
# Tells whether node +nid+ is up and belongs to this cluster.
#
# During start-up with fail-over still disabled the node is first probed
# through the connection pool; an unreachable node only produces an info
# log.  The node is then asked "whoami" (2 second timeout) and its answer
# is compared with @stats.name.
#
# Returns true when the names match, false otherwise.
def node_check(nid)
  if @startup && @rttable.enabled_failover == false
    unless Roma::Messaging::ConPool.instance.check_connection(nid)
      @log.info("I'm waiting for booting the #{nid} instance.")
      return false
    end
  end

  name = async_send_cmd(nid,"whoami\r\n",2)
  return false unless name
  return true if name == @stats.name

  @log.error("#{nid} has diffarent name.")
  @log.error("me = \"#{@stats.name}\"  #{nid} = \"#{name}\"")
  false
end
nodes_check(nodes) click to toggle source
    # File lib/roma/romad.rb
# Returns true when #node_check succeeds for every node id in +nodes+
# (also for an empty list).  Checking stops at the first failing node.
def nodes_check(nodes)
  nodes.all? { |nid| node_check(nid) }
end
options(argv) click to toggle source
    # File lib/roma/romad.rb
# Parses the command line into @stats and loads the configuration file.
#
# argv:: argument array; the options are removed from it in place and
#        the first remaining element becomes @stats.address.
#
# The configuration file ('roma/config' unless --config names one) is
# loaded with require; name and port fall back to Config::DEFAULT_NAME
# and Config::DEFAULT_PORT.  The join address "host:port" is stored as
# "host_port".
#
# A parse error prints the message and the usage text to $stderr and
# exits with status 1.  Raises RuntimeError when the configuration file
# had already been loaded.
def options(argv)
  opts = OptionParser.new
  opts.banner="usage:#{File.basename($0)} [options] address"

  @stats.daemon = false
  opts.on("-d","--daemon") { |v| @stats.daemon = true }

  opts.on_tail("-h", "--help", "Show this message") {
    puts opts; exit
  }

  opts.on("-j","--join [address:port]") { |v| @stats.join_ap = v }

  opts.on("-p", "--port [PORT]") { |v| @stats.port = v }

  @stats.verbose = false
  opts.on(nil,"--verbose"){ |v| @stats.verbose = true }

  opts.on_tail("-v", "--version", "Show version") {
    puts "romad.rb #{Roma::VERSION}"; exit
  }

  opts.on("-n", "--name [name]") { |v| @stats.name = v }

  ##
  # "--enabled_repeathost" is deprecated. It will be renamed to "--replication_in_host".
  ##
  @stats.enabled_repetition_host_in_routing = false
  opts.on(nil,"--enabled_repeathost", "Allow redundancy to same host"){
    @stats.enabled_repetition_host_in_routing = true
    puts "Warning: \"--enabled_repeathost\" is deplicated. Please use \"--replication_in_host\""
  }
  opts.on(nil,"--replication_in_host", "Allow redundancy to same host"){
    @stats.enabled_repetition_host_in_routing = true
  }

  @stats.disabled_cmd_protect = false
  opts.on(nil,"--disabled_cmd_protect", "Command protection disable while starting"){
    @stats.disabled_cmd_protect = true
  }

  # The value is optional; without one the default path below is used
  # instead of failing inside File.expand_path(nil).
  opts.on("--config [file path of the config.rb]"){ |v|
    @stats.config_path = File.expand_path(v) if v
  }

  opts.parse!(argv)
  raise OptionParser::ParseError.new if argv.length < 1
  @stats.address = argv[0]

  @stats.config_path = 'roma/config' unless @stats.config_path

  unless require @stats.config_path
    raise "config.rb has already been load outside the romad.rb."
  end

  @stats.name = Config::DEFAULT_NAME unless @stats.name
  @stats.port = Config::DEFAULT_PORT.to_s unless @stats.port

  # \A and \z anchor the whole string; ^ and $ would accept a value whose
  # first line merely looks valid.
  unless @stats.port =~ /\A\d+\z/
    raise OptionParser::ParseError.new('Port number is not numeric.')
  end

  @stats.join_ap.sub!(':','_') if @stats.join_ap
  if @stats.join_ap && !(@stats.join_ap =~ /\A.+_\d+\z/)
    raise OptionParser::ParseError.new('[address:port] can not parse.')
  end
rescue OptionParser::ParseError => e
  $stderr.puts e.message
  $stderr.puts opts.help
  exit 1
end
receive_routing_dump(nid, cmd) click to toggle source
    # File lib/roma/romad.rb
# Requests a routing dump from node +nid+ and returns its raw payload.
#
# nid:: node id handed to the connection pool.
# cmd:: dump command written to the connection.
#
# Reply layout as read here: a line holding the payload length, the
# payload itself, two more bytes, and one closing line.
#
# Returns the payload string.  Returns nil when nothing arrives within
# one second, the announced length is not positive, or any exception is
# raised; in those cases the connection is closed instead of being
# returned to the pool.
def receive_routing_dump(nid, cmd)
  pooled = false
  con = Messaging::ConPool.instance.get_connection(nid)
  con.write(cmd)
  unless IO.select([con], nil, nil, 1)
    con.close
    return nil
  end

  size = con.gets.to_i
  if size <= 0
    con.close
    return nil
  end

  rcv = String.new
  while rcv.length != size
    # read returns nil at end of stream; appending nil raises and ends the
    # transfer through the rescue below
    rcv << con.read(size - rcv.length)
  end
  con.read(2) # two bytes following the payload
  con.gets    # closing line
  Messaging::ConPool.instance.return_connection(nid,con)
  pooled = true
  rcv
rescue Exception
  # Do not leave a half-read connection open.
  begin
    con.close if con && !pooled
  rescue StandardError
    # best effort
  end
  nil
end
routing_hash_comparison(nid,id='0') click to toggle source
    # File lib/roma/romad.rb
# Compares the local hash-tree node +id+ with the one of node +nid+ and
# repairs differing leaves.
#
# nid:: node to compare with.
# id::  hash-tree node id as a bit string; '0' is the starting point.
#
# When the hashes differ, both children ("<id>0" and "<id>1") are
# compared recursively until the id is one character longer than
# @rttable.div_bits; such an id is handed to #sync_routing.
#
# Returns :skip while a join, recover or balance run is active,
# :inconsistent when the hashes differ, :consistent otherwise (also when
# the node gives no answer or an "ERROR" answer).
def routing_hash_comparison(nid,id='0')
  return :skip if @stats.run_join || @stats.run_recover || @stats.run_balance

  h = async_send_cmd(nid,"mklhash #{id}\r\n")
  return :consistent unless h && !h.start_with?("ERROR") && @rttable.mtree.get(id) != h

  if (id.length - 1) == @rttable.div_bits
    sync_routing(nid,id)
  else
    routing_hash_comparison(nid,"#{id}0")
    routing_hash_comparison(nid,"#{id}1")
  end
  :inconsistent
end
start_sync_routing_process() click to toggle source
    # File lib/roma/romad.rb
# Starts a background thread that compares this node's routing data with
# the node listed just before it in @rttable.nodes (the last node when
# this node is listed first).
#
# Nothing happens while a join, recover, balance or another sync run is
# active, or when this node is the only one.  When this node is missing
# from the routing table an error is logged and the reactor is stopped.
#
# @stats.run_sync_routing is true while the thread runs.  When the
# comparison reports :inconsistent the node list is rebuilt locally and
# the peer is asked to do the same ("create_nodes_from_v_idx").
def start_sync_routing_process
  return if @stats.run_join || @stats.run_recover || @stats.run_balance || @stats.run_sync_routing

  nodes = @rttable.nodes
  return if nodes.length == 1 && nodes[0] == @stats.ap_str

  @stats.run_sync_routing = true

  idx = nodes.index(@stats.ap_str)
  unless idx
    @log.error("My node-id(=#{@stats.ap_str}) does not found in the routingtable.")
    EventMachine::stop_event_loop
    return
  end

  peer = nodes[idx-1]
  t = Thread.new {
    begin
      if routing_hash_comparison(peer) == :inconsistent
        @log.info("create nodes from v_idx");

        @rttable.create_nodes_from_v_idx
        begin
          con = Roma::Messaging::ConPool.instance.get_connection(peer)
          con.write("create_nodes_from_v_idx\r\n")
          if con.gets == "CREATED\r\n"
            Roma::Messaging::ConPool.instance.return_connection(peer, con)
          else
            @log.error("get busy result in create_nodes_from_v_idx command from #{peer}.")
            con.close
          end
        rescue Exception => e
          @log.error("create_nodes_from_v_idx command unreachable to the #{peer}.")
        end
      end
    rescue Exception => e
      @log.error("#{e}\n#{$@}")
    ensure
      # Reset in ensure so that the flag cannot stay set and block every
      # later run.
      @stats.run_sync_routing = false
    end
  }
  t[:name] = 'sync_routing'
end
stop() click to toggle source
    # File lib/roma/romad.rb
# Shuts the node's local resources down: closes every storage, closes the
# routing-table log when the table is exactly a ChurnbasedRoutingTable,
# and logs that the node has stopped.
def stop
  @storages.each_value { |st| st.closedb }

  if @rttable.instance_of?(Roma::Routing::ChurnbasedRoutingTable)
    @rttable.close_log
  end
  @log.info("Romad has stopped: #{@stats.ap_str}")
end
sync_routing(nid,id) click to toggle source
    # File lib/roma/romad.rb
# Synchronises the route of one virtual node with node +nid+.
#
# nid:: node to synchronise with.
# id::  hash-tree leaf id; @rttable.mtree.to_vn maps it to the vnode.
#
# The peer's route ("getroute") is offered to @rttable.set_route.  When
# set_route does not return an Integer, the local route is pushed to the
# peer instead ("setroute"), using the local clock minus one.
# Nothing is done when the peer gives no answer or an "ERROR" answer.
def sync_routing(nid,id)
  vn = @rttable.mtree.to_vn(id)
  @log.warn("vn=#{vn} inconsistent")

  res = async_send_cmd(nid,"getroute #{vn}\r\n")
  return if res == nil || res.start_with?("ERROR")

  clk,*nids = res.split(' ')
  clk = @rttable.set_route(vn, clk.to_i, nids)

  unless clk.is_a?(Integer)
    clk,nids = @rttable.search_nodes_with_clk(vn)
    cmd = "setroute #{vn} #{clk-1}" + nids.map { |nid2| " #{nid2}" }.join
    async_send_cmd(nid,"#{cmd}\r\n")
  end
end
timer() click to toggle source
    # File lib/roma/romad.rb
# Starts the two timer threads: 'timer_1sec' calls #timer_event_1sec after
# every one-second sleep, 'timer_10sec' calls #timer_event_10sec after
# every ten-second sleep.  Both loop forever.
def timer
  fast = Thread.new do
    loop do
      sleep 1
      timer_event_1sec
    end
  end
  fast[:name] = 'timer_1sec'

  slow = Thread.new do
    loop do
      sleep 10
      timer_event_10sec
    end
  end
  slow[:name] = 'timer_10sec'
end
timer_event_10sec() click to toggle source
    # File lib/roma/romad.rb
# Housekeeping run by the 10-second timer thread.
#
# * During start-up with fail-over disabled: once every other node
#   passes #nodes_check, the async queue is cleared, fail-over is
#   enabled, the receiver's event list is rebuilt and start-up ends.
# * With fail-over disabled after start-up: a warning is logged.
# * Otherwise: versions are collected, old routing transactions are
#   deleted and the routing sync is started.
#
# Afterwards a storage clean-up is queued when fail-over is on, no other
# run is active and @stats.do_clean_up? agrees; the replication writer is
# refreshed from a random replica node when its hash changed; and the
# counters of @stats are cleared.  Any exception is logged.
def timer_event_10sec
  if @startup && @rttable.enabled_failover == false
    @log.debug("nodes_check start")
    nodes = @rttable.nodes
    nodes.delete(@stats.ap_str)
    if nodes_check(nodes)
      @log.info("all nodes started")
      AsyncProcess::queue.clear
      @rttable.enabled_failover = true
      Command::Receiver::mk_evlist
      @startup = false
    end
  elsif @rttable.enabled_failover == false
    @log.warn("failover disable now!!")
  else
    version_check
    @rttable.delete_old_trans(@stats.routing_trans_timeout)
    start_sync_routing_process
  end

  if (@rttable.enabled_failover &&
      @stats.run_storage_clean_up == false &&
      @stats.run_balance == false &&
      @stats.run_recover == false &&
      @stats.run_iterate_storage == false &&
      @stats.run_join == false &&
      @stats.run_receive_a_vnode.empty? &&
      @stats.do_clean_up?)
    Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('start_storage_clean_up_process'))
  end

  if @cr_writer.run_replication && @cr_writer.change_mklhash?
    nid = @cr_writer.replica_nodelist.sample
    @cr_writer.update_mklhash(nid)
    @cr_writer.update_nodelist(nid)
    @cr_writer.update_rttable(nid)
  end

  @stats.clear_counters
rescue Exception => e
  @log.error("#{e}\n#{$@}")
end
timer_event_1sec() click to toggle source
    # File lib/roma/romad.rb
# Housekeeping run by the 1-second timer thread.
#
# With fail-over enabled every other node is checked through
# #nodes_check.  A running storage clean-up is stopped while a join,
# recover or balance run is active.  Any exception is logged.
def timer_event_1sec
  if @rttable.enabled_failover
    nodes = @rttable.nodes
    nodes.delete(@stats.ap_str)
    nodes_check(nodes)
  end

  busy = @stats.run_join || @stats.run_recover || @stats.run_balance
  stop_clean_up if busy && @stats.run_storage_clean_up
rescue Exception => e
  @log.error("#{e}\n#{$@}")
end
version_check() click to toggle source
    # File lib/roma/romad.rb
# Asks every node of the routing table for its version (2 second timeout)
# and stores it in the table.
#
# A reply such as "VERSION ROMA-1.2.3" or "VERSION 1.2.3" is packed as
# (major << 16) + (minor << 8) + patch.  Nodes that give no answer or an
# answer in another form are skipped.
def version_check
  @rttable.nodes.each do |nid|
    vs = async_send_cmd(nid,"version\r\n",2)
    next unless vs
    next unless /VERSION\s(?:ROMA-)?(\d+)\.(\d+)\.(\d+)/ =~ vs

    ver = ($1.to_i << 16) + ($2.to_i << 8) + $3.to_i
    @rttable.set_version(nid, ver)
  end
end