class Girl::P1Worker
Public Class Methods
new( mirrord_host, mirrord_port, appd_host, appd_port, im )
click to toggle source
initialize
# File lib/girl/p1_worker.rb, line 7 def initialize( mirrord_host, mirrord_port, appd_host, appd_port, im ) @mirrord_host = mirrord_host @mirrord_port = mirrord_port @appd_addr = Socket.sockaddr_in( appd_port, appd_host ) @im = im @reads = [] @writes = [] @roles = {} # sock => :dotr / :ctl / :p1 / :app @p1_infos = ConcurrentHash.new @app_infos = ConcurrentHash.new new_a_pipe new_a_ctl end
Public Instance Methods
looping()
click to toggle source
looping
# File lib/girl/p1_worker.rb, line 25 def looping puts "#{ Time.new } looping" loop_renew_ctl loop_check_state loop do rs, ws = IO.select( @reads, @writes ) rs.each do | sock | role = @roles[ sock ] case role when :dotr then read_dotr( sock ) when :ctl then read_ctl( sock ) when :p1 then read_p1( sock ) when :app then read_app( sock ) else puts "#{ Time.new } read unknown role #{ role }" close_sock( sock ) end end ws.each do | sock | role = @roles[ sock ] case role when :p1 then write_p1( sock ) when :app then write_app( sock ) else puts "#{ Time.new } write unknown role #{ role }" close_sock( sock ) end end end rescue Interrupt => e puts e.class quit! end
quit!()
click to toggle source
quit!
# File lib/girl/p1_worker.rb, line 73 def quit! # puts "debug exit" exit end
Private Instance Methods
add_app_wbuff( app, data )
click to toggle source
add app wbuff
# File lib/girl/p1_worker.rb, line 83 def add_app_wbuff( app, data ) return if app.nil? || app.closed? app_info = @app_infos[ app ] app_info[ :wbuff ] << data app_info[ :last_recv_at ] = Time.new add_write( app ) if app_info[ :wbuff ].bytesize >= WBUFF_LIMIT then p1 = app_info[ :p1 ] unless p1.closed? then puts "#{ Time.new } pause p1" @reads.delete( p1 ) p1_info = @p1_infos[ p1 ] p1_info[ :paused ] = true end end end
add_p1_wbuff( p1, data )
click to toggle source
add p1 wbuff
# File lib/girl/p1_worker.rb, line 105 def add_p1_wbuff( p1, data ) return if p1.nil? || p1.closed? p1_info = @p1_infos[ p1 ] p1_info[ :wbuff ] << data add_write( p1 ) if p1_info[ :wbuff ].bytesize >= WBUFF_LIMIT then app = p1_info[ :app ] unless app.closed? then puts "#{ Time.new } pause app" @reads.delete( app ) app_info = @app_infos[ app ] app_info[ :paused ] = true end end end
add_read( sock, role = nil )
click to toggle source
add read
# File lib/girl/p1_worker.rb, line 126 def add_read( sock, role = nil ) return if sock.nil? || sock.closed? || @reads.include?( sock ) @reads << sock if role then @roles[ sock ] = role end end
add_write( sock )
click to toggle source
add write
# File lib/girl/p1_worker.rb, line 138 def add_write( sock ) return if sock.nil? || sock.closed? || @writes.include?( sock ) @writes << sock end
close_app( app )
click to toggle source
close app
# File lib/girl/p1_worker.rb, line 146 def close_app( app ) return if app.nil? || app.closed? puts "#{ Time.new } close app" close_sock( app ) app_info = @app_infos.delete( app ) if app_info then close_p1( app_info[ :p1 ] ) end end
close_ctl()
click to toggle source
close ctl
# File lib/girl/p1_worker.rb, line 160 def close_ctl return if @ctl.nil? || @ctl.closed? close_sock( @ctl ) end
close_p1( p1 )
click to toggle source
close p1
# File lib/girl/p1_worker.rb, line 168 def close_p1( p1 ) return if p1.nil? || p1.closed? puts "#{ Time.new } close p1" close_sock( p1 ) @p1_infos.delete( p1 ) end
close_read_app( app )
click to toggle source
close read app
# File lib/girl/p1_worker.rb, line 178 def close_read_app( app ) return if app.nil? || app.closed? # puts "debug close read app" app.close_read @reads.delete( app ) if app.closed? then # puts "debug app closed" @writes.delete( app ) @roles.delete( app ) @app_infos.delete( app ) end end
close_read_p1( p1 )
click to toggle source
close read p1
# File lib/girl/p1_worker.rb, line 195 def close_read_p1( p1 ) return if p1.nil? || p1.closed? # puts "debug close read p1" p1.close_read @reads.delete( p1 ) if p1.closed? then # puts "debug p1 closed" @writes.delete( p1 ) @roles.delete( p1 ) @p1_infos.delete( p1 ) end end
close_sock( sock )
click to toggle source
close sock
# File lib/girl/p1_worker.rb, line 212 def close_sock( sock ) return if sock.nil? || sock.closed? sock.close @reads.delete( sock ) @writes.delete( sock ) @roles.delete( sock ) end
close_write_app( app )
click to toggle source
close write app
# File lib/girl/p1_worker.rb, line 223 def close_write_app( app ) return if app.nil? || app.closed? # puts "debug close write app" app.close_write @writes.delete( app ) if app.closed? then # puts "debug app closed" @reads.delete( app ) @roles.delete( app ) @app_infos.delete( app ) end end
close_write_p1( p1 )
click to toggle source
close write p1
# File lib/girl/p1_worker.rb, line 240 def close_write_p1( p1 ) return if p1.nil? || p1.closed? # puts "debug close write p1" p1.close_write @writes.delete( p1 ) if p1.closed? then # puts "debug p1 closed" @reads.delete( p1 ) @roles.delete( p1 ) @p1_infos.delete( p1 ) end end
loop_check_state()
click to toggle source
loop check state
# File lib/girl/p1_worker.rb, line 257 def loop_check_state Thread.new do loop do sleep CHECK_STATE_INTERVAL now = Time.new @app_infos.select{ | app, _ | !app.closed? }.each do | app, app_info | last_recv_at = app_info[ :last_recv_at ] || app_info[ :created_at ] last_sent_at = app_info[ :last_sent_at ] || app_info[ :created_at ] is_expire = ( now - last_recv_at >= EXPIRE_AFTER ) && ( now - last_sent_at >= EXPIRE_AFTER ) if is_expire then puts "#{ Time.new } expire app" app_info[ :closing ] = true next_tick elsif app_info[ :paused ] then p1 = app_info[ :p1 ] unless p1.closed? then p1_info = @p1_infos[ p1 ] if p1_info[ :wbuff ].bytesize < RESUME_BELOW then puts "#{ Time.new } resume app" add_read( app ) app_info[ :paused ] = false next_tick end end end end @p1_infos.select{ | p1, info | !p1.closed? && info[ :paused ] }.each do | p1, p1_info | app = p1_info[ :app ] unless app.closed? then app_info = @app_infos[ app ] if app_info[ :wbuff ].bytesize < RESUME_BELOW then puts "#{ Time.new } resume p1" add_read( p1 ) p1_info[ :paused ] = false next_tick end end end end end end
loop_renew_ctl()
click to toggle source
loop renew ctl
# File lib/girl/p1_worker.rb, line 309 def loop_renew_ctl Thread.new do loop do sleep RENEW_CTL_INTERVAL if @ctl && !@ctl.closed? && !@ctl_info[ :closing ] then @ctl_info[ :closing ] = true next_tick end end end end
new_a_ctl()
click to toggle source
new a ctl
# File lib/girl/p1_worker.rb, line 325 def new_a_ctl ctl = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 ) ctl.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 ) if RUBY_PLATFORM.include?( 'linux' ) then ctl.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 ) end mirrord_port = @mirrord_port + 10.times.to_a.sample mirrord_addr = Socket.sockaddr_in( mirrord_port, @mirrord_host ) @ctl = ctl @ctl_info = { mirrord_addr: mirrord_addr, closing: false } add_read( ctl, :ctl ) puts "#{ Time.new } send im #{ @im.inspect } #{ @mirrord_host } #{ mirrord_port }" send_im end
new_a_pipe()
click to toggle source
new a pipe
# File lib/girl/p1_worker.rb, line 350 def new_a_pipe dotr, dotw = IO.pipe @dotw = dotw add_read( dotr, :dotr ) end
new_app_and_p1( p1d_port, p2_id )
click to toggle source
new app and p1
# File lib/girl/p1_worker.rb, line 359 def new_app_and_p1( p1d_port, p2_id ) app = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 ) app.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 ) begin app.connect_nonblock( @appd_addr ) rescue IO::WaitWritable rescue Exception => e puts "#{ Time.new } app connect appd addr #{ e.class }" app.close return end p1 = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 ) p1.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 ) begin p1.connect_nonblock( Socket.sockaddr_in( p1d_port, @mirrord_host ) ) rescue IO::WaitWritable rescue Exception => e puts "#{ Time.new } connect p1d #{ e.class }" app.close p1.close return end @app_infos[ app ] = { wbuff: '', created_at: Time.new, last_recv_at: nil, last_sent_at: nil, closing: false, closing_write: false, paused: false, p1: p1 } @p1_infos[ p1 ] = { wbuff: [ p2_id ].pack( 'Q>' ), closing_write: false, paused: false, app: app } add_read( app, :app ) add_read( p1, :p1 ) add_write( p1 ) puts "#{ Time.new } new app and p1 #{ p1d_port } #{ p2_id } app infos #{ @app_infos.size } p1 infos #{ @p1_infos.size }" end
next_tick()
click to toggle source
next tick
# File lib/girl/p1_worker.rb, line 412 def next_tick @dotw.write( '.' ) end
read_app( app )
click to toggle source
read app
# File lib/girl/p1_worker.rb, line 511 def read_app( app ) if app.closed? then puts "#{ Time.new } read app but app closed?" return end app_info = @app_infos[ app ] p1 = app_info[ :p1 ] begin data = app.read_nonblock( READ_SIZE ) rescue Exception => e puts "#{ Time.new } read app #{ e.class }" close_read_app( app ) set_p1_closing_write( p1 ) return end add_p1_wbuff( p1, data ) end
read_ctl( ctl )
click to toggle source
read ctl
# File lib/girl/p1_worker.rb, line 467 def read_ctl( ctl ) if ctl.closed? then puts "#{ Time.new } read ctl but ctl closed?" return end data, addrinfo, rflags, *controls = ctl.recvmsg if addrinfo.to_sockaddr != @ctl_info[ :mirrord_addr ] then puts "#{ Time.new } mirrord addr not match #{ addrinfo.inspect }" return end p1d_port, p2_id = data.unpack( 'nQ>' ) new_app_and_p1( p1d_port, p2_id ) end
read_dotr( dotr )
click to toggle source
read dotr
# File lib/girl/p1_worker.rb, line 453 def read_dotr( dotr ) dotr.read_nonblock( READ_SIZE ) if @ctl && !@ctl.closed? && @ctl_info[ :closing ] then close_ctl new_a_ctl end @app_infos.select{ | _, info | info[ :closing ] }.keys.each{ | app | close_app( app ) } end
read_p1( p1 )
click to toggle source
read p1
# File lib/girl/p1_worker.rb, line 487 def read_p1( p1 ) if p1.closed? then puts "#{ Time.new } read p1 but p1 closed?" return end p1_info = @p1_infos[ p1 ] app = p1_info[ :app ] begin data = p1.read_nonblock( READ_SIZE ) rescue Exception => e puts "#{ Time.new } read p1 #{ e.class }" close_read_p1( p1 ) set_app_closing_write( app ) return end add_app_wbuff( app, data ) end
send_im()
click to toggle source
send im
# File lib/girl/p1_worker.rb, line 419 def send_im begin @ctl.sendmsg( @im, 0, @ctl_info[ :mirrord_addr ] ) rescue Exception => e puts "#{ Time.new } ctl sendmsg #{ e.class }" end end
set_app_closing_write( app )
click to toggle source
set app closing write
# File lib/girl/p1_worker.rb, line 430 def set_app_closing_write( app ) return if app.nil? || app.closed? app_info = @app_infos[ app ] return if app_info[ :closing_write ] app_info[ :closing_write ] = true add_write( app ) end
set_p1_closing_write( p1 )
click to toggle source
set p1 closing write
# File lib/girl/p1_worker.rb, line 441 def set_p1_closing_write( p1 ) return if p1.nil? || p1.closed? p1_info = @p1_infos[ p1 ] return if p1_info[ :closing_write ] # puts "debug set p1 closing write" p1_info[ :closing_write ] = true add_write( p1 ) end
write_app( app )
click to toggle source
write app
# File lib/girl/p1_worker.rb, line 578 def write_app( app ) if app.closed? then puts "#{ Time.new } write app but app closed?" return end app_info = @app_infos[ app ] data = app_info[ :wbuff ] # 写前为空,处理关闭写 if data.empty? then if app_info[ :closing_write ] then close_write_app( app ) else @writes.delete( app ) end return end # 写入 begin written = app.write_nonblock( data ) rescue Exception => e puts "#{ Time.new } write app #{ e.class }" close_write_app( app ) close_read_p1( app_info[ :p1 ] ) return end data = data[ written..-1 ] app_info[ :wbuff ] = data end
write_p1( p1 )
click to toggle source
write p1
# File lib/girl/p1_worker.rb, line 535 def write_p1( p1 ) if p1.closed? then puts "#{ Time.new } write p1 but p1 closed?" return end p1_info = @p1_infos[ p1 ] app = p1_info[ :app ] data = p1_info[ :wbuff ] # 写前为空,处理关闭写 if data.empty? then if p1_info[ :closing_write ] then close_write_p1( p1 ) else @writes.delete( p1 ) end return end # 写入 begin written = p1.write_nonblock( data ) rescue Exception => e puts "#{ Time.new } write p1 #{ e.class }" close_write_p1( p1 ) close_read_app( app ) return end data = data[ written..-1 ] p1_info[ :wbuff ] = data unless app.closed? then app_info = @app_infos[ app ] app_info[ :last_sent_at ] = Time.new end end