class Girl::P1Worker

Public Class Methods

new( mirrord_host, mirrord_port, appd_host, appd_port, im ) click to toggle source

initialize

# File lib/girl/p1_worker.rb, line 7
# Builds the worker state and opens the wake-up pipe and the first ctl socket.
#
# mirrord_host / mirrord_port - host and base port of the mirrord peer
# appd_host / appd_port       - address of the local application server
# im                          - identity string sent to mirrord over ctl
def initialize( mirrord_host, mirrord_port, appd_host, appd_port, im )
  @mirrord_host, @mirrord_port = mirrord_host, mirrord_port
  # Packed sockaddr reused for every app connection.
  @appd_addr = Socket.sockaddr_in( appd_port, appd_host )
  @im = im
  # Socket lists handed to IO.select by the main loop.
  @reads, @writes = [], []
  @roles = {} # sock => :dotr / :ctl / :p1 / :app
  @p1_infos, @app_infos = ConcurrentHash.new, ConcurrentHash.new

  new_a_pipe
  new_a_ctl
end

Public Instance Methods

looping() click to toggle source

looping

# File lib/girl/p1_worker.rb, line 25
# Main event loop: starts the two background threads, then blocks in
# IO.select and dispatches every ready socket to the handler for its role.
# An Interrupt prints the exception class and terminates through quit!.
def looping
  puts "#{ Time.new } looping"
  loop_renew_ctl
  loop_check_state

  loop do
    readable, writable = IO.select( @reads, @writes )

    readable.each do | sock |
      role = @roles[ sock ]

      if role == :dotr
        read_dotr( sock )
      elsif role == :ctl
        read_ctl( sock )
      elsif role == :p1
        read_p1( sock )
      elsif role == :app
        read_app( sock )
      else
        # No role is registered for this socket: drop it.
        puts "#{ Time.new } read unknown role #{ role }"
        close_sock( sock )
      end
    end

    writable.each do | sock |
      role = @roles[ sock ]

      if role == :p1
        write_p1( sock )
      elsif role == :app
        write_app( sock )
      else
        puts "#{ Time.new } write unknown role #{ role }"
        close_sock( sock )
      end
    end
  end
rescue Interrupt => e
  puts e.class
  quit!
end
quit!() click to toggle source

quit!

# File lib/girl/p1_worker.rb, line 73
# Terminates the process with the default (successful) exit status.
def quit!
  # puts "debug exit"
  Kernel.exit
end

Private Instance Methods

add_app_wbuff( app, data ) click to toggle source

add app wbuff

# File lib/girl/p1_worker.rb, line 83
# Queues data to be written to app and registers app for writing.
# Once the pending buffer reaches WBUFF_LIMIT, the paired p1 is removed
# from the read set and flagged as paused.
#
# app  - destination socket; nil or closed sockets are ignored
# data - bytes to append to the app write buffer
def add_app_wbuff( app, data )
  return if app.nil?
  return if app.closed?

  info = @app_infos[ app ]
  info[ :wbuff ] << data
  info[ :last_recv_at ] = Time.new
  add_write( app )

  return if info[ :wbuff ].bytesize < WBUFF_LIMIT

  peer = info[ :p1 ]
  return if peer.closed?

  puts "#{ Time.new } pause p1"
  @reads.delete( peer )
  @p1_infos[ peer ][ :paused ] = true
end
add_p1_wbuff( p1, data ) click to toggle source

add p1 wbuff

# File lib/girl/p1_worker.rb, line 105
# Queues data to be written to p1 and registers p1 for writing.
# Once the pending buffer reaches WBUFF_LIMIT, the paired app is removed
# from the read set and flagged as paused.
#
# p1   - destination socket; nil or closed sockets are ignored
# data - bytes to append to the p1 write buffer
def add_p1_wbuff( p1, data )
  return if p1.nil?
  return if p1.closed?

  info = @p1_infos[ p1 ]
  info[ :wbuff ] << data
  add_write( p1 )

  return if info[ :wbuff ].bytesize < WBUFF_LIMIT

  peer = info[ :app ]
  return if peer.closed?

  puts "#{ Time.new } pause app"
  @reads.delete( peer )
  @app_infos[ peer ][ :paused ] = true
end
add_read( sock, role = nil ) click to toggle source

add read

# File lib/girl/p1_worker.rb, line 126
# Registers sock in the read set, optionally recording its role.
# Does nothing for nil, closed, or already-registered sockets.
#
# sock - IO to watch for readability
# role - optional symbol stored in @roles for dispatching
def add_read( sock, role = nil )
  return if sock.nil?
  return if sock.closed?
  return if @reads.include?( sock )

  @reads.push( sock )
  @roles[ sock ] = role if role
end
add_write( sock ) click to toggle source

add write

# File lib/girl/p1_worker.rb, line 138
# Registers sock in the write set unless it is nil, closed, or already there.
def add_write( sock )
  @writes.push( sock ) unless sock.nil? || sock.closed? || @writes.include?( sock )
end
close_app( app ) click to toggle source

close app

# File lib/girl/p1_worker.rb, line 146
# Fully closes app, drops its info record, and then closes the paired p1.
# Does nothing when app is nil or already closed.
def close_app( app )
  return if app.nil?
  return if app.closed?

  puts "#{ Time.new } close app"
  close_sock( app )
  removed = @app_infos.delete( app )
  close_p1( removed[ :p1 ] ) if removed
end
close_ctl() click to toggle source

close ctl

# File lib/girl/p1_worker.rb, line 160
# Closes the current ctl socket, if there is one and it is still open.
def close_ctl
  close_sock( @ctl ) unless @ctl.nil? || @ctl.closed?
end
close_p1( p1 ) click to toggle source

close p1

# File lib/girl/p1_worker.rb, line 168
# Fully closes p1 and drops its info record.
# Does nothing when p1 is nil or already closed.
def close_p1( p1 )
  return if p1.nil?
  return if p1.closed?

  puts "#{ Time.new } close p1"
  close_sock( p1 )
  @p1_infos.delete( p1 )
end
close_read_app( app ) click to toggle source

close read app

# File lib/girl/p1_worker.rb, line 178
# Shuts down the read side of app and stops watching it for reads.
# If app reports itself closed afterwards, every remaining trace of it
# (write set, role, info record) is removed as well.
def close_read_app( app )
  return if app.nil?
  return if app.closed?

  # puts "debug close read app"
  app.close_read
  @reads.delete( app )
  return unless app.closed?

  # puts "debug app closed"
  @writes.delete( app )
  @roles.delete( app )
  @app_infos.delete( app )
end
close_read_p1( p1 ) click to toggle source

close read p1

# File lib/girl/p1_worker.rb, line 195
# Shuts down the read side of p1 and stops watching it for reads.
# If p1 reports itself closed afterwards, every remaining trace of it
# (write set, role, info record) is removed as well.
def close_read_p1( p1 )
  return if p1.nil?
  return if p1.closed?

  # puts "debug close read p1"
  p1.close_read
  @reads.delete( p1 )
  return unless p1.closed?

  # puts "debug p1 closed"
  @writes.delete( p1 )
  @roles.delete( p1 )
  @p1_infos.delete( p1 )
end
close_sock( sock ) click to toggle source

close sock

# File lib/girl/p1_worker.rb, line 212
# Closes sock and removes it from the read set, the write set and the
# role table. Does nothing when sock is nil or already closed.
def close_sock( sock )
  return if sock.nil?
  return if sock.closed?

  sock.close
  [ @reads, @writes ].each { | list | list.delete( sock ) }
  @roles.delete( sock )
end
close_write_app( app ) click to toggle source

close write app

# File lib/girl/p1_worker.rb, line 223
# Shuts down the write side of app and stops watching it for writes.
# If app reports itself closed afterwards, every remaining trace of it
# (read set, role, info record) is removed as well.
def close_write_app( app )
  return if app.nil?
  return if app.closed?

  # puts "debug close write app"
  app.close_write
  @writes.delete( app )
  return unless app.closed?

  # puts "debug app closed"
  @reads.delete( app )
  @roles.delete( app )
  @app_infos.delete( app )
end
close_write_p1( p1 ) click to toggle source

close write p1

# File lib/girl/p1_worker.rb, line 240
# Shuts down the write side of p1 and stops watching it for writes.
# If p1 reports itself closed afterwards, every remaining trace of it
# (read set, role, info record) is removed as well.
def close_write_p1( p1 )
  return if p1.nil?
  return if p1.closed?

  # puts "debug close write p1"
  p1.close_write
  @writes.delete( p1 )
  return unless p1.closed?

  # puts "debug p1 closed"
  @reads.delete( p1 )
  @roles.delete( p1 )
  @p1_infos.delete( p1 )
end
loop_check_state() click to toggle source

loop check state

# File lib/girl/p1_worker.rb, line 257
# Starts a background thread that, every CHECK_STATE_INTERVAL seconds,
# flags idle app connections for closing and resumes paused sockets whose
# peer write buffer has drained below RESUME_BELOW.
# Each state change is followed by next_tick, which writes a byte to the
# pipe whose read end is registered in @reads with the :dotr role.
def loop_check_state
  Thread.new do
    loop do
      sleep CHECK_STATE_INTERVAL
      now = Time.new

      # Pass 1: every app socket that is still open.
      @app_infos.select{ | app, _ | !app.closed? }.each do | app, app_info |
        # Fall back to the creation time when no traffic has been recorded yet.
        last_recv_at = app_info[ :last_recv_at ] || app_info[ :created_at ]
        last_sent_at = app_info[ :last_sent_at ] || app_info[ :created_at ]
        # Expired only when BOTH directions have been idle for EXPIRE_AFTER seconds.
        is_expire = ( now - last_recv_at >= EXPIRE_AFTER ) && ( now - last_sent_at >= EXPIRE_AFTER )

        if is_expire then
          puts "#{ Time.new } expire app"
          # Only set the flag here; read_dotr performs the actual close.
          app_info[ :closing ] = true
          next_tick
        elsif app_info[ :paused ] then
          p1 = app_info[ :p1 ]

          unless p1.closed? then
            p1_info = @p1_infos[ p1 ]

            # The p1 write buffer has drained enough: read from app again.
            if p1_info[ :wbuff ].bytesize < RESUME_BELOW then
              puts "#{ Time.new } resume app"
              add_read( app )
              app_info[ :paused ] = false
              next_tick
            end
          end
        end
      end

      # Pass 2: every open p1 socket that is currently paused.
      @p1_infos.select{ | p1, info | !p1.closed? && info[ :paused ] }.each do | p1, p1_info |
        app = p1_info[ :app ]

        unless app.closed? then
          app_info = @app_infos[ app ]

          # The app write buffer has drained enough: read from p1 again.
          if app_info[ :wbuff ].bytesize < RESUME_BELOW then
            puts "#{ Time.new } resume p1"
            add_read( p1 )
            p1_info[ :paused ] = false
            next_tick
          end
        end
      end
    end
  end
end
loop_renew_ctl() click to toggle source

loop renew ctl

# File lib/girl/p1_worker.rb, line 309
# Starts a background thread that, every RENEW_CTL_INTERVAL seconds, flags
# the open ctl socket as closing and calls next_tick; read_dotr then
# replaces the socket.
def loop_renew_ctl
  Thread.new do
    loop do
      sleep RENEW_CTL_INTERVAL

      # Nothing to do without an open ctl, or when it is already flagged.
      next if !@ctl || @ctl.closed? || @ctl_info[ :closing ]

      @ctl_info[ :closing ] = true
      next_tick
    end
  end
end
new_a_ctl() click to toggle source

new a ctl

# File lib/girl/p1_worker.rb, line 325
# Creates a fresh UDP ctl socket, picks one of the ten mirrord ports
# starting at @mirrord_port at random, registers the socket for reading
# and sends the identity message to that address.
def new_a_ctl
  sock = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )
  sock.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 )
  sock.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 ) if RUBY_PLATFORM.include?( 'linux' )

  # Random offset 0..9 on top of the base port.
  mirrord_port = @mirrord_port + [ *0..9 ].sample

  @ctl = sock
  @ctl_info = {
    mirrord_addr: Socket.sockaddr_in( mirrord_port, @mirrord_host ),
    closing: false
  }

  add_read( sock, :ctl )
  puts "#{ Time.new } send im #{ @im.inspect } #{ @mirrord_host } #{ mirrord_port }"
  send_im
end
new_a_pipe() click to toggle source

new a pipe

# File lib/girl/p1_worker.rb, line 350
# Creates the wake-up pipe: the write end is kept in @dotw (used by
# next_tick) and the read end is registered for reading with role :dotr.
def new_a_pipe
  reader, @dotw = IO.pipe
  add_read( reader, :dotr )
end
new_app_and_p1( p1d_port, p2_id ) click to toggle source

new app and p1

# File lib/girl/p1_worker.rb, line 359
# Opens a paired connection: `app` to the local application server
# (@appd_addr) and `p1` to mirrord's p1d port, then registers both.
# The p1 write buffer is pre-loaded with p2_id packed as a 64-bit
# big-endian integer so it is the first thing sent on p1.
#
# p1d_port - port on @mirrord_host to connect p1 to
# p2_id    - integer id identifying the remote p2 this pair serves
#
# Returns nil without registering anything when either connect fails
# immediately; both sockets are closed in that case.
def new_app_and_p1( p1d_port, p2_id )
  app = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  app.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )

  begin
    app.connect_nonblock( @appd_addr )
  rescue IO::WaitWritable
    # Connect in progress: completion is observed through the select loop.
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit are
    # not swallowed here and still reach the main loop.
    puts "#{ Time.new } app connect appd addr #{ e.class }"
    app.close
    return
  end

  p1 = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  p1.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )

  begin
    p1.connect_nonblock( Socket.sockaddr_in( p1d_port, @mirrord_host ) )
  rescue IO::WaitWritable
    # Connect in progress, as above.
  rescue StandardError => e
    puts "#{ Time.new } connect p1d #{ e.class }"
    app.close
    p1.close
    return
  end

  @app_infos[ app ] = {
    wbuff: '',
    created_at: Time.new,
    last_recv_at: nil,
    last_sent_at: nil,
    closing: false,
    closing_write: false,
    paused: false,
    p1: p1
  }

  @p1_infos[ p1 ] = {
    wbuff: [ p2_id ].pack( 'Q>' ),
    closing_write: false,
    paused: false,
    app: app
  }

  add_read( app, :app )
  add_read( p1, :p1 )
  # p1 already has the packed p2_id queued, so watch it for writability now.
  add_write( p1 )
  puts "#{ Time.new } new app and p1 #{ p1d_port } #{ p2_id } app infos #{ @app_infos.size } p1 infos #{ @p1_infos.size }"
end
next_tick() click to toggle source

next tick

# File lib/girl/p1_worker.rb, line 412
# Writes a single '.' byte to the write end of the wake-up pipe.
def next_tick
  tick = '.'
  @dotw.write( tick )
end
read_app( app ) click to toggle source

read app

# File lib/girl/p1_worker.rb, line 511
# Reads available bytes from app and queues them for the paired p1.
#
# On EOF or a read error the read side of app is shut down and p1 is
# flagged to close its write side once its buffer has been flushed.
# A spurious readiness notification (IO::WaitReadable) is ignored so the
# connection is not torn down by mistake.
#
# app - readable application socket
def read_app( app )
  if app.closed? then
    puts "#{ Time.new } read app but app closed?"
    return
  end

  app_info = @app_infos[ app ]
  p1 = app_info[ :p1 ]

  begin
    data = app.read_nonblock( READ_SIZE )
  rescue IO::WaitReadable
    # select reported readable but nothing is available yet; try again
    # on the next loop iteration instead of treating it as a failure.
    return
  rescue StandardError => e
    # StandardError (EOFError, Errno::*) only, so Interrupt / SystemExit
    # propagate to the main loop.
    puts "#{ Time.new } read app #{ e.class }"
    close_read_app( app )
    set_p1_closing_write( p1 )
    return
  end

  add_p1_wbuff( p1, data )
end
read_ctl( ctl ) click to toggle source

read ctl

# File lib/girl/p1_worker.rb, line 467
# Handles one datagram on the ctl socket.
#
# The datagram must come from the mirrord address this ctl was created
# for and carry at least 10 bytes: a 16-bit big-endian p1d port followed
# by a 64-bit big-endian p2 id. Anything else is logged and dropped.
# A valid datagram creates a new app/p1 pair.
#
# ctl - readable ctl socket
def read_ctl( ctl )
  if ctl.closed? then
    puts "#{ Time.new } read ctl but ctl closed?"
    return
  end

  data, addrinfo = ctl.recvmsg

  if addrinfo.to_sockaddr != @ctl_info[ :mirrord_addr ] then
    puts "#{ Time.new } mirrord addr not match #{ addrinfo.inspect }"
    return
  end

  # 'nQ>' needs 2 + 8 bytes; a shorter datagram would unpack to nil and
  # raise later inside new_app_and_p1 when the id is packed.
  if data.bytesize < 10 then
    puts "#{ Time.new } ctl data too short #{ data.bytesize }"
    return
  end

  p1d_port, p2_id = data.unpack( 'nQ>' )
  new_app_and_p1( p1d_port, p2_id )
end
read_dotr( dotr ) click to toggle source

read dotr

# File lib/girl/p1_worker.rb, line 453
# Drains the wake-up pipe and performs the work requested through flags:
# replaces a ctl socket flagged as closing, and closes every app whose
# info record is flagged as closing.
#
# dotr - read end of the wake-up pipe
def read_dotr( dotr )
  dotr.read_nonblock( READ_SIZE )

  ctl_open = @ctl && !@ctl.closed?

  if ctl_open && @ctl_info[ :closing ]
    close_ctl
    new_a_ctl
  end

  doomed = @app_infos.select { | _, info | info[ :closing ] }.keys
  doomed.each { | app | close_app( app ) }
end
read_p1( p1 ) click to toggle source

read p1

# File lib/girl/p1_worker.rb, line 487
# Reads available bytes from p1 and queues them for the paired app.
#
# On EOF or a read error the read side of p1 is shut down and app is
# flagged to close its write side once its buffer has been flushed.
# A spurious readiness notification (IO::WaitReadable) is ignored so the
# connection is not torn down by mistake.
#
# p1 - readable p1 socket
def read_p1( p1 )
  if p1.closed? then
    puts "#{ Time.new } read p1 but p1 closed?"
    return
  end

  p1_info = @p1_infos[ p1 ]
  app = p1_info[ :app ]

  begin
    data = p1.read_nonblock( READ_SIZE )
  rescue IO::WaitReadable
    # select reported readable but nothing is available yet; try again
    # on the next loop iteration instead of treating it as a failure.
    return
  rescue StandardError => e
    # StandardError (EOFError, Errno::*) only, so Interrupt / SystemExit
    # propagate to the main loop.
    puts "#{ Time.new } read p1 #{ e.class }"
    close_read_p1( p1 )
    set_app_closing_write( app )
    return
  end

  add_app_wbuff( app, data )
end
send_im() click to toggle source

send im

# File lib/girl/p1_worker.rb, line 419
# Sends the identity message @im to the mirrord address of the current
# ctl socket. Send failures are logged and otherwise ignored
# (best effort); Interrupt / SystemExit are not swallowed.
def send_im
  @ctl.sendmsg( @im, 0, @ctl_info[ :mirrord_addr ] )
rescue StandardError => e
  puts "#{ Time.new } ctl sendmsg #{ e.class }"
end
set_app_closing_write( app ) click to toggle source

set app closing write

# File lib/girl/p1_worker.rb, line 430
# Flags app so that its write side is closed once its buffer is empty,
# and registers it for writing so write_app gets to act on the flag.
# Does nothing for nil, closed, or already-flagged sockets.
def set_app_closing_write( app )
  return if app.nil?
  return if app.closed?

  info = @app_infos[ app ]

  unless info[ :closing_write ]
    info[ :closing_write ] = true
    add_write( app )
  end
end
set_p1_closing_write( p1 ) click to toggle source

set p1 closing write

# File lib/girl/p1_worker.rb, line 441
# Flags p1 so that its write side is closed once its buffer is empty,
# and registers it for writing so write_p1 gets to act on the flag.
# Does nothing for nil, closed, or already-flagged sockets.
def set_p1_closing_write( p1 )
  return if p1.nil?
  return if p1.closed?

  info = @p1_infos[ p1 ]

  unless info[ :closing_write ]
    # puts "debug set p1 closing write"
    info[ :closing_write ] = true
    add_write( p1 )
  end
end
write_app( app ) click to toggle source

write app

# File lib/girl/p1_worker.rb, line 578
# Flushes as much of the app write buffer as the socket accepts.
#
# With an empty buffer, app is either half-closed for writing (when
# flagged :closing_write) or simply removed from the write set.
# On a write error the write side of app and the read side of the
# paired p1 are shut down. A spurious writability notification
# (IO::WaitWritable) leaves everything untouched for the next iteration.
#
# app - writable application socket
def write_app( app )
  if app.closed? then
    puts "#{ Time.new } write app but app closed?"
    return
  end

  app_info = @app_infos[ app ]
  data = app_info[ :wbuff ]

  # Nothing left to write: handle a pending close of the write side.
  if data.empty? then
    if app_info[ :closing_write ] then
      close_write_app( app )
    else
      @writes.delete( app )
    end

    return
  end

  # Write.
  begin
    written = app.write_nonblock( data )
  rescue IO::WaitWritable
    # The socket cannot take data right now; keep the buffer and retry
    # on the next select round instead of tearing the pair down.
    return
  rescue StandardError => e
    # StandardError only, so Interrupt / SystemExit reach the main loop.
    puts "#{ Time.new } write app #{ e.class }"
    close_write_app( app )
    close_read_p1( app_info[ :p1 ] )
    return
  end

  # Keep only the bytes that were not written.
  app_info[ :wbuff ] = data[ written..-1 ]
end
write_p1( p1 ) click to toggle source

write p1

# File lib/girl/p1_worker.rb, line 535
# Flushes as much of the p1 write buffer as the socket accepts and
# records the send time on the paired app.
#
# With an empty buffer, p1 is either half-closed for writing (when
# flagged :closing_write) or simply removed from the write set.
# On a write error the write side of p1 and the read side of the
# paired app are shut down. A spurious writability notification
# (IO::WaitWritable) leaves everything untouched for the next iteration.
#
# p1 - writable p1 socket
def write_p1( p1 )
  if p1.closed? then
    puts "#{ Time.new } write p1 but p1 closed?"
    return
  end

  p1_info = @p1_infos[ p1 ]
  app = p1_info[ :app ]
  data = p1_info[ :wbuff ]

  # Nothing left to write: handle a pending close of the write side.
  if data.empty? then
    if p1_info[ :closing_write ] then
      close_write_p1( p1 )
    else
      @writes.delete( p1 )
    end

    return
  end

  # Write.
  begin
    written = p1.write_nonblock( data )
  rescue IO::WaitWritable
    # The socket cannot take data right now; keep the buffer and retry
    # on the next select round instead of tearing the pair down.
    return
  rescue StandardError => e
    # StandardError only, so Interrupt / SystemExit reach the main loop.
    puts "#{ Time.new } write p1 #{ e.class }"
    close_write_p1( p1 )
    close_read_app( app )
    return
  end

  # Keep only the bytes that were not written.
  p1_info[ :wbuff ] = data[ written..-1 ]

  # Feeds the idle-expiry check in loop_check_state.
  unless app.closed? then
    @app_infos[ app ][ :last_sent_at ] = Time.new
  end
end