class Tribunus

Constants

HEARTBEAT_LOOP_INTERVAL
HEARTBEAT_SECONDS
RomadHost

protocol

update:<reply_ocunt>: <romad_hostname> [<romad_port> …]

TIMEOUT_SECONDS

Public Class Methods

new(romad_hostname,romad_ports,options={}) click to toggle source
   # File lib/roma/tools/tribunus.rb
32 def initialize(romad_hostname,romad_ports,options={})
33   @multi_addr=options[:multicast_addr]||MULTICAST_ADDR
34   @port=options[:udp_port]||UDP_PORT
35   @romad_work_dir=options[:romad_work_dir]||ROMAD_WORK_DIR
36   @ruby_command_name=options[:ruby_command_name]||"ruby"
37   @verbose=options[:verbose]
38   log [:initalized,@multi_addr,@port,@romad_work_dir,@ruby_command_name]
39 
40   @threads=[]
41   @romads={} #port => pid
42   @local_romad_host=RomadHost.new(romad_hostname,romad_ports,nil)
43 
44   @mutex=Mutex.new
45   @remote_servers={} #ipaddr => RomadHost
46 
47 end

Public Instance Methods

choose_rhost() click to toggle source
    # File lib/roma/tools/tribunus.rb
191 def choose_rhost
192   @remote_servers.each do|ipaddr,rhost|
193     unless rhost.ports.empty?
194       return rhost
195     end
196   end
197   nil
198 end
heartbeat_loop() click to toggle source
    # File lib/roma/tools/tribunus.rb
144 def heartbeat_loop
145   loop do
146     delete_ipaddrs=[]
147     @remote_servers.each do|ipaddr,host|
148 
149       if host.updated_at+TIMEOUT_SECONDS < Time.now
150         delete_ipaddrs << ipaddr
151       elsif host.updated_at+HEARTBEAT_SECONDS < Time.now
152         unicast(ipaddr,update_message(0))
153       elsif host.ports.empty?
154         unicast(ipaddr,update_message(1))
155       end
156     end
157 
158     @mutex.synchronize do
159       delete_ipaddrs.each do|ipaddr|
160         @remote_servers.delete(ipaddr)
161       end
162     end
163 
164 
165 
166 
167     sleep(HEARTBEAT_LOOP_INTERVAL)
168   end
169 rescue =>e
170   p e
171 end
join() click to toggle source
    # File lib/roma/tools/tribunus.rb
220 def join
221   @threads.each{|t|t.join}
222 end
multicast(msg) click to toggle source
    # File lib/roma/tools/tribunus.rb
235 def multicast(msg)
236   unicast(@multi_addr,msg)
237 end
receive_update_command(ipaddr,reply_count,params) click to toggle source
    # File lib/roma/tools/tribunus.rb
 89 def receive_update_command(ipaddr,reply_count,params)
 90   param_ary=params.strip.split(/\s+/)
 91   unless param_ary.empty?
 92     hostname=param_ary[0]
 93     ports=param_ary[1..-1].map{|port| port.to_i}
 94     rhost=RomadHost.new(hostname,ports,Time.now)
 95     @remote_servers[ipaddr]=rhost
 96     if reply_count>0
 97       unicast(ipaddr,update_message(reply_count-1))
 98     end
 99   end
100 end
server_loop() click to toggle source
    # File lib/roma/tools/tribunus.rb
126 def server_loop
127   sock=UDPSocket.new
128   sock.bind('0.0.0.0',@port)
129   sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, IPAddr.new(MULTICAST_ADDR).hton+IPAddr.new('0.0.0.0').hton)
130   log 'start_listen'
131   Socket.udp_server_loop_on([sock]) do|msg,msg_src|
132       log [:received ,msg,msg_src]
133     if from_remote?(msg_src.remote_address.ip_address)
134       run_command(msg_src.remote_address.ip_address,msg)
135     end
136   end
137 end
set_trap() click to toggle source
    # File lib/roma/tools/tribunus.rb
173 def set_trap
174   [:INT,:TERM,:HUP].each do|sig|
175     Signal.trap(sig){ Process.kill(sig,*@romads.values);exit(1)  }
176   end
177 end
spawn_new_roma_ring() click to toggle source
   # File lib/roma/tools/tribunus.rb
55 def spawn_new_roma_ring
56   spawn_romads(nil,nil)
57 end
spawn_romad_join(port,remote_host,remote_port) click to toggle source
   # File lib/roma/tools/tribunus.rb
83 def spawn_romad_join(port,remote_host,remote_port)
84   pid=Process.spawn(@ruby_command_name,*RUBY_COMMAND_OPTIONS,ROMAD_PATH,*ROMAD_OPTIONS,"-p",port.to_s,@local_romad_host.hostname,"-j","#{remote_host}_#{remote_port}",:chdir=>@romad_work_dir)
85   @romads[port]=pid
86 end
spawn_romads(remote_host,remote_port) click to toggle source
   # File lib/roma/tools/tribunus.rb
60 def spawn_romads(remote_host,remote_port)
61   nodes=@local_romad_host.ports.map do|port|
62     "#{@local_romad_host.hostname}_#{port}"
63   end
64   nodes << "#{remote_host}_#{remote_port}" if remote_host
65   pid=Process.spawn(@ruby_command_name,*RUBY_COMMAND_OPTIONS,MKROUTE_PATH,*nodes,:chdir=>@romad_work_dir)
66   if(Process.waitpid2(pid)[1].to_i!=0)
67     raise "failed to make route"
68   end
69 
70   @local_romad_host.ports.each do|port|
71     pid=Process.spawn(@ruby_command_name,*RUBY_COMMAND_OPTIONS,ROMAD_PATH,*ROMAD_OPTIONS,"-p",port.to_s,@local_romad_host.hostname, :chdir=>@romad_work_dir)
72     @romads[port]=pid
73   end
74 end
spawn_romads_join(remote_host,remote_port) click to toggle source
   # File lib/roma/tools/tribunus.rb
76 def spawn_romads_join(remote_host,remote_port)
77   @local_romad_host.ports.map do|port|
78     spawn_romad_join(port,remote_host,remote_port)
79   end
80 end
start_discover() click to toggle source
    # File lib/roma/tools/tribunus.rb
205 def start_discover
206   prepare_to_start
207   sleep(0.2)
208   multicast(update_message(1,true))
209   10.times{sleep(0.3)}
210   rhost=choose_rhost
211   if rhost
212     spawn_romads_join(rhost.hostname,rhost.ports.first)
213   else
214     $stderr.puts "no server responded"
215     exit 1
216   end
217 
218 end
start_join(host,port) click to toggle source
    # File lib/roma/tools/tribunus.rb
199 def start_join(host,port)
200   prepare_to_start
201   sleep(0.2)
202   spawn_romads_join(host,port)
203 end
start_new_ring() click to toggle source
    # File lib/roma/tools/tribunus.rb
186 def start_new_ring
187   prepare_to_start
188   spawn_new_roma_ring
189 end
unicast(ipaddr,msg) click to toggle source
    # File lib/roma/tools/tribunus.rb
224 def unicast(ipaddr,msg)
225   log [:message, ipaddr,msg]
226   s=UDPSocket.new
227   begin
228     s.connect(ipaddr,@port)
229     s.sendmsg(msg)
230   ensure
231       s.close
232   end
233 end
update_message(reply_count,initial=false) click to toggle source
    # File lib/roma/tools/tribunus.rb
116 def update_message(reply_count,initial=false)
117   msg="update:#{reply_count}: #{@local_romad_host.hostname}"
118   if !initial && !@romads.keys.empty?
119     msg+=" "
120     msg+= @romads.keys.join(' ')
121   end
122   log [:msg, msg,@romads]
123   msg
124 end

Private Instance Methods

from_remote?(ipaddr) click to toggle source
   # File lib/roma/tools/tribunus.rb
49 def from_remote?(ipaddr)
50     from_remote= !Socket.ip_address_list.any?{|addr|addr.ip_address==ipaddr}
51 end
log(obj) click to toggle source
   # File lib/roma/tools/tribunus.rb
21 def log(obj)
22   if @verbose
23     if obj.is_a? String
24       $stderr.puts obj
25     else
26       $stderr.puts obj.inspect
27     end
28   end
29 end
prepare_to_start() click to toggle source
    # File lib/roma/tools/tribunus.rb
179 def prepare_to_start
180   set_trap
181   @threads << Thread.start{self.server_loop}
182   @threads << Thread.start{self.heartbeat_loop}
183 end
run_command(ipaddr,msg) click to toggle source
    # File lib/roma/tools/tribunus.rb
102 def run_command(ipaddr,msg)
103   match_data=/\A(.*):(\d+):(.*)/.match(msg)
104   if(match_data)
105     command=match_data[1]
106     reply_count=match_data[2].to_i
107     rest=match_data[3]
108     case command
109     when "update"
110       receive_update_command(ipaddr,reply_count,rest)
111     end
112   end
113 end