class Tribunus
Constants
- HEARTBEAT_LOOP_INTERVAL
- HEARTBEAT_SECONDS
- RomadHost
protocol
update:<reply_count>: <romad_hostname> [<romad_port> …]
- TIMEOUT_SECONDS
Public Class Methods
new(romad_hostname,romad_ports,options={})
click to toggle source
# File lib/roma/tools/tribunus.rb, line 32
# Sets up the state for one Tribunus instance.
#
# romad_hostname:: passed to RomadHost.new as the local host's name
# romad_ports::    passed to RomadHost.new as the local host's ports
# options::        optional overrides, read by key: :multicast_addr,
#                  :udp_port, :romad_work_dir, :ruby_command_name, :verbose.
#                  A missing (or nil/false) value falls back to the
#                  corresponding constant, or to "ruby" for the command name.
def initialize(romad_hostname, romad_ports, options = {})
  # @verbose has to be in place before the first call to log.
  @verbose           = options[:verbose]
  @ruby_command_name = options[:ruby_command_name] || "ruby"
  @romad_work_dir    = options[:romad_work_dir] || ROMAD_WORK_DIR
  @port              = options[:udp_port] || UDP_PORT
  @multi_addr        = options[:multicast_addr] || MULTICAST_ADDR
  log [:initalized, @multi_addr, @port, @romad_work_dir, @ruby_command_name]

  @mutex            = Mutex.new
  @remote_servers   = {} # ipaddr => RomadHost
  @romads           = {} # port => pid
  @threads          = []
  @local_romad_host = RomadHost.new(romad_hostname, romad_ports, nil)
end
Public Instance Methods
choose_rhost()
click to toggle source
# File lib/roma/tools/tribunus.rb, line 191
# Picks a known remote host that advertises at least one port.
#
# Returns the first value of @remote_servers (in the hash's iteration
# order) whose +ports+ list is non-empty, or nil when there is none.
def choose_rhost
  @remote_servers.each_value.find { |rhost| !rhost.ports.empty? }
end
heartbeat_loop()
click to toggle source
# File lib/roma/tools/tribunus.rb, line 144
# Runs forever, visiting every known remote host once per
# HEARTBEAT_LOOP_INTERVAL seconds:
#
# * a host not updated for more than TIMEOUT_SECONDS is removed from
#   @remote_servers;
# * otherwise, a host not updated for more than HEARTBEAT_SECONDS is sent
#   update_message(0);
# * otherwise, a host whose port list is empty is sent update_message(1).
#
# A failure while handling one host is printed with +p+ and does not stop
# the loop, so a single failed send cannot end the heartbeat.
def heartbeat_loop
  loop do
    # Work on a snapshot: the sends below can block, and inserting a key
    # into a Hash while it is being iterated raises.
    hosts = @mutex.synchronize { @remote_servers.to_a }
    now = Time.now
    expired = []

    hosts.each do |ipaddr, host|
      begin
        if host.updated_at + TIMEOUT_SECONDS < now
          expired << [ipaddr, host]
        elsif host.updated_at + HEARTBEAT_SECONDS < now
          unicast(ipaddr, update_message(0))
        elsif host.ports.empty?
          unicast(ipaddr, update_message(1))
        end
      rescue => e
        p e
      end
    end

    @mutex.synchronize do
      expired.each do |ipaddr, host|
        # Only drop the entry that was judged expired; if it has been
        # replaced since the snapshot was taken, keep the newer one.
        @remote_servers.delete(ipaddr) if @remote_servers[ipaddr].equal?(host)
      end
    end

    sleep(HEARTBEAT_LOOP_INTERVAL)
  end
end
join()
click to toggle source
# File lib/roma/tools/tribunus.rb, line 220
# Blocks until every thread recorded in @threads has finished.
# Returns @threads.
def join
  @threads.each(&:join)
end
multicast(msg)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 235
# Sends +msg+ to the address held in @multi_addr by handing it to unicast.
def multicast(msg)
  group_addr = @multi_addr
  unicast(group_addr, msg)
end
receive_update_command(ipaddr,reply_count,params)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 89
# Handles the payload of an "update" message received from +ipaddr+.
#
# ipaddr::      address the message came from; used as the key in
#               @remote_servers and as the destination of any reply
# reply_count:: when greater than zero, a reply carrying reply_count - 1
#               is sent back to +ipaddr+
# params::      whitespace-separated "<hostname> [<port> ...]"
#
# A payload without a hostname is ignored. Port tokens come from the
# network, so anything that is not a decimal number in 1..65535 is dropped
# instead of being turned into port 0 by to_i.
def receive_update_command(ipaddr, reply_count, params)
  hostname, *port_tokens = params.strip.split(/\s+/)
  return unless hostname

  ports = port_tokens.grep(/\A\d+\z/)
                     .map { |token| token.to_i }
                     .select { |port| port.between?(1, 65_535) }
  rhost = RomadHost.new(hostname, ports, Time.now)

  # heartbeat_loop deletes from @remote_servers under @mutex; take the same
  # lock for the write.
  @mutex.synchronize { @remote_servers[ipaddr] = rhost }

  unicast(ipaddr, update_message(reply_count - 1)) if reply_count > 0
end
server_loop()
click to toggle source
# File lib/roma/tools/tribunus.rb, line 126
# Listens for UDP datagrams on @port and dispatches them.
#
# Binds a UDP socket to 0.0.0.0:@port, joins the multicast group held in
# @multi_addr (the same address multicast sends to, so a configured
# :multicast_addr is honoured on the receiving side as well), and then
# serves datagrams: every message is logged, and messages whose source
# address passes from_remote? are handed to run_command.
#
# The socket is closed when the loop is left for any reason.
def server_loop
  sock = UDPSocket.new
  begin
    sock.bind('0.0.0.0', @port)
    # ip_mreq layout: group address followed by the local interface address.
    membership = IPAddr.new(@multi_addr).hton + IPAddr.new('0.0.0.0').hton
    sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, membership)
    log 'start_listen'
    Socket.udp_server_loop_on([sock]) do |msg, msg_src|
      log [:received, msg, msg_src]
      sender = msg_src.remote_address.ip_address
      run_command(sender, msg) if from_remote?(sender)
    end
  ensure
    sock.close
  end
end
set_trap()
click to toggle source
# File lib/roma/tools/tribunus.rb 173 def set_trap 174 [:INT,:TERM,:HUP].each do|sig| 175 Signal.trap(sig){ Process.kill(sig,*@romads.values);exit(1) } 176 end 177 end
spawn_new_roma_ring()
click to toggle source
# File lib/roma/tools/tribunus.rb, line 55
# Calls spawn_romads with no remote host and no remote port.
def spawn_new_roma_ring
  remote_host = nil
  remote_port = nil
  spawn_romads(remote_host, remote_port)
end
spawn_romad_join(port,remote_host,remote_port)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 83
# Spawns one romad process for +port+ with "-j <remote_host>_<remote_port>"
# on its command line, running in @romad_work_dir, and records its pid in
# @romads under +port+.
#
# Returns the pid of the spawned process.
def spawn_romad_join(port, remote_host, remote_port)
  join_target = "#{remote_host}_#{remote_port}"
  argv = [
    @ruby_command_name, *RUBY_COMMAND_OPTIONS,
    ROMAD_PATH, *ROMAD_OPTIONS,
    "-p", port.to_s, @local_romad_host.hostname,
    "-j", join_target
  ]
  @romads[port] = Process.spawn(*argv, :chdir => @romad_work_dir)
end
spawn_romads(remote_host,remote_port)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 60
# Runs the MKROUTE_PATH script over the node list, then spawns one romad
# per local port.
#
# The node list is "<local hostname>_<port>" for every local port, plus
# "<remote_host>_<remote_port>" when +remote_host+ is given. The script is
# waited for; a non-zero status raises RuntimeError ("failed to make route").
# Each spawned romad's pid is stored in @romads under its port.
def spawn_romads(remote_host, remote_port)
  local = @local_romad_host
  nodes = local.ports.map { |port| "#{local.hostname}_#{port}" }
  nodes.push("#{remote_host}_#{remote_port}") if remote_host

  mkroute_pid = Process.spawn(@ruby_command_name, *RUBY_COMMAND_OPTIONS,
                              MKROUTE_PATH, *nodes,
                              :chdir => @romad_work_dir)
  _, status = Process.waitpid2(mkroute_pid)
  raise "failed to make route" if status.to_i != 0

  local.ports.each do |port|
    @romads[port] = Process.spawn(@ruby_command_name, *RUBY_COMMAND_OPTIONS,
                                  ROMAD_PATH, *ROMAD_OPTIONS,
                                  "-p", port.to_s, local.hostname,
                                  :chdir => @romad_work_dir)
  end
end
spawn_romads_join(remote_host,remote_port)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 76
# Calls spawn_romad_join once per local port, all joining the same remote
# node. Returns the collected results of those calls, in port order.
def spawn_romads_join(remote_host, remote_port)
  local_ports = @local_romad_host.ports
  local_ports.map { |port| spawn_romad_join(port, remote_host, remote_port) }
end
start_discover()
click to toggle source
# File lib/roma/tools/tribunus.rb, line 205
# Starts the background threads, multicasts an initial update message,
# waits for replies (ten sleeps of 0.3 s), then joins the host returned by
# choose_rhost using its first port.
#
# When choose_rhost returns nil, prints "no server responded" to $stderr
# and exits with status 1.
def start_discover
  prepare_to_start
  sleep(0.2)
  multicast(update_message(1, true))
  10.times { sleep(0.3) }

  rhost = choose_rhost
  unless rhost
    $stderr.puts "no server responded"
    exit 1
  end

  spawn_romads_join(rhost.hostname, rhost.ports.first)
end
start_join(host,port)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 199
# Calls prepare_to_start, waits 0.2 s, then calls spawn_romads_join with
# the given +host+ and +port+.
def start_join(host, port)
  prepare_to_start
  sleep(0.2)
  spawn_romads_join(host, port)
end
start_new_ring()
click to toggle source
# File lib/roma/tools/tribunus.rb, line 186
# Calls prepare_to_start and then spawn_new_roma_ring.
def start_new_ring
  prepare_to_start
  spawn_new_roma_ring
end
unicast(ipaddr,msg)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 224
# Logs the message, then sends +msg+ to +ipaddr+ on @port over a fresh UDP
# socket. The socket is closed whether or not the send succeeds.
def unicast(ipaddr, msg)
  log [:message, ipaddr, msg]
  socket = UDPSocket.new
  begin
    socket.connect(ipaddr, @port)
    socket.sendmsg(msg)
  ensure
    socket.close
  end
end
update_message(reply_count,initial=false)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 116
# Builds the text of an update message:
#
#   update:<reply_count>: <local hostname>[ <port> ...]
#
# The port list is the keys of @romads; it is left out when +initial+ is
# true or when @romads is empty. The message is logged and returned.
def update_message(reply_count, initial = false)
  ports = initial ? [] : @romads.keys
  port_suffix = ports.empty? ? "" : " #{ports.join(' ')}"
  msg = "update:#{reply_count}: #{@local_romad_host.hostname}#{port_suffix}"
  log [:msg, msg, @romads]
  msg
end
Private Instance Methods
from_remote?(ipaddr)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 49
# Tells whether +ipaddr+ belongs to another machine.
#
# Returns true when no entry of Socket.ip_address_list has +ipaddr+ as its
# ip_address, false otherwise.
def from_remote?(ipaddr)
  Socket.ip_address_list.none? { |addr| addr.ip_address == ipaddr }
end
log(obj)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 21
# Writes +obj+ to $stderr when @verbose is set; does nothing otherwise.
# A String is written as is, any other object through #inspect.
def log(obj)
  return unless @verbose

  $stderr.puts(obj.is_a?(String) ? obj : obj.inspect)
end
prepare_to_start()
click to toggle source
# File lib/roma/tools/tribunus.rb, line 179
# Installs the signal handlers, then starts server_loop and heartbeat_loop
# each in its own thread, recording both threads in @threads.
def prepare_to_start
  set_trap
  @threads.push(Thread.start { server_loop })
  @threads.push(Thread.start { heartbeat_loop })
end
run_command(ipaddr,msg)
click to toggle source
# File lib/roma/tools/tribunus.rb, line 102
# Parses a received datagram of the form "<command>:<reply_count>:<rest>"
# and dispatches it. Only the "update" command is handled; anything else,
# including text that does not match the format, is ignored.
#
# ipaddr:: source address of the datagram, passed through to the handler
# msg::    raw message text
def run_command(ipaddr, msg)
  # The command is everything up to the FIRST colon. A greedy ".*" here
  # would extend into the payload whenever the payload itself contains
  # ":<digits>:" (for instance an IPv6 hostname), and the message would
  # then be dropped as an unknown command.
  match_data = /\A([^:]*):(\d+):(.*)/.match(msg)
  return unless match_data

  command, count_text, rest = match_data.captures
  case command
  when "update"
    receive_update_command(ipaddr, count_text.to_i, rest)
  end
end