class Girl::MirrordWorker

Public Class Methods

new( mirrord_port, infod_port, p2d_ports, p2d_host ) click to toggle source

initialize

# File lib/girl/mirrord_worker.rb, line 7
def initialize( mirrord_port, infod_port, p2d_ports, p2d_host )
  @p2d_host = p2d_host
  @reads = []
  @writes = []
  @roles = {}                    # :dotr / :mirrord / :infod / :p1d / :p2d / :p1 / :p2
  @room_infos = {}               # im => { :mirrord, :p1_addrinfo, :updated_at, :p1d, :p2d }
  @p1d_infos = {}                # p1d => { :im }
  @p2d_infos = {}                # p2d => { :im }
  @p1_infos = ConcurrentHash.new # p1 => { :addrinfo, :im, :p2, :wbuff, :closing_write, :paused }
  @p2_infos = ConcurrentHash.new # p2 => { :addrinfo, :im, :p1, :rbuff, :wbuff, :created_at,
                                 #         :last_recv_at, :last_sent_at, :closing, :closing_write, :paused }

  set_p2d_ports( p2d_ports )
  new_a_pipe
  new_mirrords( mirrord_port )
  new_a_infod( infod_port )
end

Public Instance Methods

looping() click to toggle source

looping

# File lib/girl/mirrord_worker.rb, line 28
def looping
  puts "#{ Time.new } looping"
  loop_check_state

  loop do
    rs, ws = IO.select( @reads, @writes )

    rs.each do | sock |
      role = @roles[ sock ]

      case role
      when :dotr then
        read_dotr( sock )
      when :mirrord
        read_mirrord( sock )
      when :infod
        read_infod( sock )
      when :p1d
        read_p1d( sock )
      when :p2d
        read_p2d( sock )
      when :p1
        read_p1( sock )
      when :p2
        read_p2( sock )
      else
        puts "#{ Time.new } read unknown role #{ role }"
      end
    end

    ws.each do | sock |
      role = @roles[ sock ]

      case role
      when :p1 then
        write_p1( sock )
      when :p2 then
        write_p2( sock )
      else
        puts "#{ Time.new } write unknown role #{ role }"
        close_sock( sock )
      end
    end
  end
rescue Interrupt => e
  puts e.class
  quit!
end
quit!() click to toggle source

quit!

# File lib/girl/mirrord_worker.rb, line 80
def quit!
  # puts "debug exit"
  exit
end

Private Instance Methods

add_p1_wbuff( p1, data ) click to toggle source

add p1 wbuff

# File lib/girl/mirrord_worker.rb, line 90
def add_p1_wbuff( p1, data )
  return if p1.nil? || p1.closed?
  p1_info = @p1_infos[ p1 ]
  p1_info[ :wbuff ] << data
  add_write( p1 )

  if p1_info[ :wbuff ].bytesize >= WBUFF_LIMIT then
    p2 = p1_info[ :p2 ]

    if p2 then
      p2_info = @p2_infos[ p2 ]

      if p2_info then
        puts "#{ Time.new } pause p2 #{ p2_info[ :im ].inspect } #{ p2_info[ :addrinfo ].inspect }"
        @reads.delete( p2 )
        p2_info[ :paused ] = true
      end
    end
  end
end
add_p2_rbuff( p2, data ) click to toggle source

add p2 rbuff

# File lib/girl/mirrord_worker.rb, line 114
def add_p2_rbuff( p2, data )
  return if p2.nil? || p2.closed?
  p2_info = @p2_infos[ p2 ]
  return if p2_info[ :closing ]
  p2_info[ :rbuff ] << data

  if p2_info[ :rbuff ].bytesize >= WBUFF_LIMIT then
    # puts "debug p2.rbuff full"
    close_p2( p2 )
  end
end
add_p2_wbuff( p2, data ) click to toggle source

add p2 wbuff

# File lib/girl/mirrord_worker.rb, line 129
def add_p2_wbuff( p2, data )
  return if p2.nil? || p2.closed?
  p2_info = @p2_infos[ p2 ]
  return if p2_info[ :closing ]
  p2_info[ :wbuff ] << data
  p2_info[ :last_recv_at ] = Time.new
  add_write( p2 )

  if p2_info[ :wbuff ].bytesize >= WBUFF_LIMIT then
    p1 = p2_info[ :p1 ]

    if p1 then
      p1_info = @p1_infos[ p1 ]

      if p1_info then
        puts "#{ Time.new } pause p1 #{ p1_info[ :im ].inspect } #{ p1_info[ :addrinfo ].inspect }"
        @reads.delete( p1 )
        p1_info[ :paused ] = true
      end
    end
  end
end
add_read( sock, role = nil ) click to toggle source

add read

# File lib/girl/mirrord_worker.rb, line 155
def add_read( sock, role = nil )
  return if sock.nil? || sock.closed? || @reads.include?( sock )
  @reads << sock

  if role then
    @roles[ sock ] = role
  end
end
add_write( sock ) click to toggle source

add write

# File lib/girl/mirrord_worker.rb, line 167
def add_write( sock )
  return if sock.nil? || sock.closed? || @writes.include?( sock )
  @writes << sock
end
close_p1( p1 ) click to toggle source

close p1

# File lib/girl/mirrord_worker.rb, line 175
def close_p1( p1 )
  return if p1.nil? || p1.closed?
  # puts "debug close p1"
  close_sock( p1 )
  @p1_infos.delete( p1 )
end
close_p1d( p1d ) click to toggle source

close p1d

# File lib/girl/mirrord_worker.rb, line 185
def close_p1d( p1d )
  return if p1d.nil? || p1d.closed?
  # puts "debug close p1d"
  close_sock( p1d )
  @p1d_infos.delete( p1d )
end
close_p2( p2 ) click to toggle source

close p2

# File lib/girl/mirrord_worker.rb, line 195
def close_p2( p2 )
  return if p2.nil? || p2.closed?
  # puts "debug close p2"
  close_sock( p2 )
  p2_info = @p2_infos.delete( p2 )

  if p2_info then
    close_p1( p2_info[ :p1 ] )
  end
end
close_p2d( p2d ) click to toggle source

close p2d

# File lib/girl/mirrord_worker.rb, line 209
def close_p2d( p2d )
  return if p2d.nil? || p2d.closed?
  # puts "debug close p2d"
  close_sock( p2d )
  @p2d_infos.delete( p2d )
end
close_read_p1( p1 ) click to toggle source

close read p1

# File lib/girl/mirrord_worker.rb, line 219
def close_read_p1( p1 )
  return if p1.nil? || p1.closed?
  # puts "debug close read p1"
  p1.close_read
  @reads.delete( p1 )

  if p1.closed? then
    # puts "debug p1 closed"
    @writes.delete( p1 )
    @roles.delete( p1 )
    @p1_infos.delete( p1 )
  end
end
close_read_p2( p2 ) click to toggle source

close read p2

# File lib/girl/mirrord_worker.rb, line 236
def close_read_p2( p2 )
  return if p2.nil? || p2.closed?
  # puts "debug close read p2"
  p2.close_read
  @reads.delete( p2 )

  if p2.closed? then
    # puts "debug p2 closed"
    @writes.delete( p2 )
    @roles.delete( p2 )
    @p2_infos.delete( p2 )
  end
end
close_sock( sock ) click to toggle source

close sock

# File lib/girl/mirrord_worker.rb, line 253
def close_sock( sock )
  return if sock.nil? || sock.closed?
  sock.close
  @reads.delete( sock )
  @writes.delete( sock )
  @roles.delete( sock )
end
close_write_p1( p1 ) click to toggle source

close write p1

# File lib/girl/mirrord_worker.rb, line 264
def close_write_p1( p1 )
  return if p1.nil? || p1.closed?
  # puts "debug close write p1"
  p1.close_write
  @writes.delete( p1 )

  if p1.closed? then
    # puts "debug p1 closed"
    @reads.delete( p1 )
    @roles.delete( p1 )
    @p1_infos.delete( p1 )
  end
end
close_write_p2( p2 ) click to toggle source

close write p2

# File lib/girl/mirrord_worker.rb, line 281
def close_write_p2( p2 )
  return if p2.nil? || p2.closed?
  # puts "debug close write p2"
  p2.close_write
  @writes.delete( p2 )

  if p2.closed? then
    # puts "debug p2 closed"
    @reads.delete( p2 )
    @roles.delete( p2 )
    @p2_infos.delete( p2 )
  end
end
del_room_info( im ) click to toggle source

del room info

# File lib/girl/mirrord_worker.rb, line 298
def del_room_info( im )
  room_info = @room_infos.delete( im )

  if room_info then
    close_p1d( room_info[ :p1d ] )
    close_p2d( room_info[ :p2d ] )
  end
end
loop_check_state() click to toggle source

loop check state

# File lib/girl/mirrord_worker.rb, line 310
def loop_check_state
  Thread.new do
    loop do
      sleep CHECK_STATE_INTERVAL
      now = Time.new

      @p2_infos.select{ | p2, _ | !p2.closed? }.each do | p2, p2_info |
        last_recv_at = p2_info[ :last_recv_at ] || p2_info[ :created_at ]
        last_sent_at = p2_info[ :last_sent_at ] || p2_info[ :created_at ]
        is_expire = ( now - last_recv_at >= EXPIRE_AFTER ) && ( now - last_sent_at >= EXPIRE_AFTER )

        if is_expire then
          puts "#{ Time.new } expire p2 #{ p2_info[ :im ].inspect } #{ p2_info[ :addrinfo ].inspect }"
          p2_info[ :closing ] = true
          next_tick
        elsif p2_info[ :paused ] then
          p1 = p2_info[ :p1 ]

          if p1 && !p1.closed? then
            p1_info = @p1_infos[ p1 ]

            if p1_info[ :wbuff ].bytesize < RESUME_BELOW then
              puts "#{ Time.new } resume p2 #{ p2_info[ :im ].inspect } #{ p2_info[ :addrinfo ].inspect }"
              add_read( p2 )
              p2_info[ :paused ] = false
              next_tick
            end
          end
        end
      end

      @p1_infos.select{ | p1, info | !p1.closed? && info[ :paused ] }.each do | p1, p1_info |
        p2 = p1_info[ :p2 ]

        if p2 && !p2.closed? then
          p2_info = @p2_infos[ p2 ]

          if p2_info[ :wbuff ].bytesize < RESUME_BELOW then
            puts "#{ Time.new } resume p1 #{ p1_info[ :im ].inspect } #{ p1_info[ :addrinfo ].inspect }"
            add_read( p1 )
            p1_info[ :paused ] = false
            next_tick
          end
        end
      end
    end
  end
end
new_a_infod( infod_port ) click to toggle source

new a infod

# File lib/girl/mirrord_worker.rb, line 362
def new_a_infod( infod_port )
  infod = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )
  infod.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 )
  infod.bind( Socket.sockaddr_in( infod_port, '127.0.0.1' ) )

  puts "#{ Time.new } infod bind on #{ infod_port }"
  add_read( infod, :infod )
end
new_a_listener( port, host ) click to toggle source

new a listener

# File lib/girl/mirrord_worker.rb, line 374
def new_a_listener( port, host )
  listener = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  listener.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )
  listener.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 )

  if RUBY_PLATFORM.include?( 'linux' ) then
    listener.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 )
  end

  listener.bind( Socket.sockaddr_in( port, host ) )
  listener.listen( 127 )
  listener
end
new_a_pipe() click to toggle source

new a pipe

# File lib/girl/mirrord_worker.rb, line 391
def new_a_pipe
  dotr, dotw = IO.pipe
  @dotw = dotw
  add_read( dotr, :dotr )
end
new_mirrords( begin_port ) click to toggle source

new mirrords

# File lib/girl/mirrord_worker.rb, line 400
def new_mirrords( begin_port )
  10.times do | i |
    mirrord_port = begin_port + i
    mirrord = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )
    mirrord.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 )
    mirrord.bind( Socket.sockaddr_in( mirrord_port, '0.0.0.0' ) )
    puts "#{ Time.new } mirrord bind on #{ mirrord_port }"
    add_read( mirrord, :mirrord )
  end
end
next_tick() click to toggle source

next tick

# File lib/girl/mirrord_worker.rb, line 414
def next_tick
  @dotw.write( '.' )
end
read_dotr( dotr ) click to toggle source

read dotr

# File lib/girl/mirrord_worker.rb, line 467
def read_dotr( dotr )
  dotr.read_nonblock( READ_SIZE )
  @p2_infos.select{ | _, info | info[ :closing ] }.keys.each{ | p2 | close_p2( p2 ) }
end
read_infod( infod ) click to toggle source

read infod

# File lib/girl/mirrord_worker.rb, line 518
def read_infod( infod )
  data, addrinfo, rflags, *controls = infod.recvmsg

  data2 = @room_infos.sort_by{ | _, info | info[ :updated_at ] }.reverse.map do | im, info |
    [
      info[ :updated_at ],
      @p2d_ports[ im ],
      im + ' ' * ( ROOM_TITLE_LIMIT - im.size ),
      info[ :p1_addrinfo ].ip_unpack.join( ':' )
    ].join( ' ' )
  end.join( "\n" )

  send_data( infod, data2, addrinfo )
end
read_mirrord( mirrord ) click to toggle source

read mirrord

# File lib/girl/mirrord_worker.rb, line 475
def read_mirrord( mirrord )
  data, addrinfo, rflags, *controls = mirrord.recvmsg
  return if ( data.size > ROOM_TITLE_LIMIT ) || data =~ /\r|\n/
  im = data
  room_info = @room_infos[ im ]

  if room_info then
    room_info[ :mirrord ] = mirrord
    room_info[ :p1_addrinfo ] = addrinfo
    room_info[ :updated_at ] = Time.new
  elsif @p2d_ports.include?( im ) then
    p2d_port = @p2d_ports[ im ]
    p1d = new_a_listener( 0, '0.0.0.0' )
    p2d = new_a_listener( p2d_port, @p2d_host )
    print "#{ Time.new } p1d listen on #{ p1d.local_address.inspect }"
    puts " p2d listen on #{ p2d.local_address.inspect }"

    @p1d_infos[ p1d ] = {
      im: im
    }

    @p2d_infos[ p2d ] = {
      im: im
    }

    @room_infos[ im ] = {
      mirrord: mirrord,
      p1_addrinfo: addrinfo,
      updated_at: Time.new,
      p1d: p1d,
      p2d: p2d
    }

    add_read( p1d, :p1d )
    add_read( p2d, :p2d )
  else
    puts "#{ Time.new } unknown room #{ im.inspect }"
  end
end
read_p1( p1 ) click to toggle source

read p1

# File lib/girl/mirrord_worker.rb, line 616
def read_p1( p1 )
  if p1.closed? then
    puts "#{ Time.new } read p1 but p1 closed?"
    return
  end

  p1_info = @p1_infos[ p1 ]
  p2 = p1_info[ :p2 ]

  begin
    data = p1.read_nonblock( READ_SIZE )
  rescue Exception => e
    # puts "debug read p1 #{ e.class }"
    close_read_p1( p1 )
    set_p2_closing_write( p2 )
    return
  end

  unless p2 then
    if data.bytesize < 8 then
      puts "#{ Time.new } read p1 miss p2 id? #{ data.inspect }"
      close_p1( p1 )
      return
    end

    p2_id = data[ 0, 8 ].unpack( 'Q>' ).first
    p2, p2_info = @p2_infos.find{ | _, info | ( info[ :p2_id ] == p2_id ) && info[ :p1 ].nil? }

    unless p2 then
      # puts "debug p2 not found #{ p2_id }"
      close_p1( p1 )
      return
    end

    puts "#{ Time.new } paired #{ p1_info[ :im ].inspect } #{ p1_info[ :addrinfo ].inspect } #{ p2_info[ :addrinfo ].inspect } #{ p2_id }"
    p1_info[ :p2 ] = p2
    p1_info[ :wbuff ] << p2_info[ :rbuff ]

    unless p1_info[ :wbuff ].empty? then
      add_write( p1 )
    end

    p2_info[ :p1 ] = p1
    data = data[ 8..-1 ]

    if data.empty? then
      return
    end
  end

  add_p2_wbuff( p2, data )
end
read_p1d( p1d ) click to toggle source

read p1d

# File lib/girl/mirrord_worker.rb, line 536
def read_p1d( p1d )
  if p1d.closed? then
    puts "#{ Time.new } read p1d but p1d closed?"
    return
  end

  p1d_info = @p1d_infos[ p1d ]
  im = p1d_info[ :im ]

  begin
    p1, addrinfo = p1d.accept_nonblock
  rescue Exception => e
    puts "#{ Time.new } p1d accept #{ e.class } #{ im.inspect }"
    del_room_info( im )
    return
  end

  @p1_infos[ p1 ] = {
    addrinfo: addrinfo,   # 地址
    im: im,               # 标识
    p2: nil,              # 对应p2
    wbuff: '',            # 写前
    closing_write: false, # 准备关闭写
    paused: false         # 是否暂停
  }

  add_read( p1, :p1 )
  puts "#{ Time.new } here comes a p1 #{ im.inspect } #{ addrinfo.inspect }"
end
read_p2( p2 ) click to toggle source

read p2

# File lib/girl/mirrord_worker.rb, line 672
def read_p2( p2 )
  if p2.closed? then
    puts "#{ Time.new } read p2 but p2 closed?"
    return
  end

  p2_info = @p2_infos[ p2 ]
  p1 = p2_info[ :p1 ]

  begin
    data = p2.read_nonblock( READ_SIZE )
  rescue Exception => e
    # puts "debug read p2 #{ e.class }"
    close_read_p2( p2 )
    set_p1_closing_write( p1 )
    return
  end

  if p1 then
    add_p1_wbuff( p1, data )
  else
    add_p2_rbuff( p2, data )
  end
end
read_p2d( p2d ) click to toggle source

read p2d

# File lib/girl/mirrord_worker.rb, line 569
def read_p2d( p2d )
  if p2d.closed? then
    puts "#{ Time.new } read p2d but p2d closed?"
    return
  end

  p2d_info = @p2d_infos[ p2d ]
  im = p2d_info[ :im ]

  begin
    p2, addrinfo = p2d.accept_nonblock
  rescue Exception => e
    puts "#{ Time.new } p2d accept #{ e.class } #{ im.inspect }"
    del_room_info( im )
    return
  end

  room_info = @room_infos[ im ]
  p1d_port = room_info[ :p1d ].local_address.ip_port
  p2_id = rand( ( 2 ** 64 ) - 2 ) + 1

  @p2_infos[ p2 ] = {
    p2_id: p2_id,         # p2 id
    addrinfo: addrinfo,   # 地址
    im: im,               # 标识
    p1: nil,              # 对应p1
    rbuff: '',            # 匹配到p1之前,暂存流量
    wbuff: '',            # 写前
    created_at: Time.new, # 创建时间
    last_recv_at: nil,    # 上一次收到流量(p1读到,放入p2写前)的时间
    last_sent_at: nil,    # 上一次中转流量(p1写)的时间
    closing: false,       # 是否准备关闭
    closing_write: false, # 准备关闭写
    paused: false         # 是否暂停
  }

  add_read( p2, :p2 )

  puts "#{ Time.new } here comes a p2 #{ im.inspect } #{ addrinfo.inspect } #{ p2_id }"
  puts "rooms #{ @room_infos.size } p1ds #{ @p1d_infos.size } p2ds #{ @p2d_infos.size } p1s #{ @p1_infos.size } p2s #{ @p2_infos.size }"
  data = [ p1d_port, p2_id ].pack( 'nQ>' )
  send_data( room_info[ :mirrord ], data, room_info[ :p1_addrinfo ] )
end
send_data( sock, data, target_addr ) click to toggle source

send data

# File lib/girl/mirrord_worker.rb, line 421
def send_data( sock, data, target_addr )
  begin
    sock.sendmsg_nonblock( data, 0, target_addr )
  rescue Exception => e
    puts "#{ Time.new } sendmsg #{ e.class }"
  end
end
set_p1_closing_write( p1 ) click to toggle source

set p1 closing write

# File lib/girl/mirrord_worker.rb, line 432
def set_p1_closing_write( p1 )
  return if p1.nil? || p1.closed?
  p1_info = @p1_infos[ p1 ]
  return if p1_info[ :closing_write ]
  # puts "debug set p1 closing write"
  p1_info[ :closing_write ] = true
  add_write( p1 )
end
set_p2_closing_write( p2 ) click to toggle source

set p2 closing write

# File lib/girl/mirrord_worker.rb, line 444
def set_p2_closing_write( p2 )
  return if p2.nil? || p2.closed?
  p2_info = @p2_infos[ p2 ]
  return if p2_info[ :closing_write ]
  # puts "debug set p2 closing write"
  p2_info[ :closing_write ] = true
  add_write( p2 )
end
set_p2d_ports( p2d_ports ) click to toggle source

set p2d ports

# File lib/girl/mirrord_worker.rb, line 456
def set_p2d_ports( p2d_ports )
  @p2d_ports = {}

  p2d_ports.each do | im, p2d_port |
    @p2d_ports[ im ] = p2d_port
  end
end
write_p1( p1 ) click to toggle source

write p1

# File lib/girl/mirrord_worker.rb, line 700
def write_p1( p1 )
  if p1.closed? then
    puts "#{ Time.new } write p1 but p1 closed?"
    return
  end

  p1_info = @p1_infos[ p1 ]
  p2 = p1_info[ :p2 ]
  data = p1_info[ :wbuff ]

  # 写前为空,处理关闭写
  if data.empty? then
    if p1_info[ :closing_write ] then
      close_write_p1( p1 )
    else
      @writes.delete( p1 )
    end

    return
  end

  # 写入
  begin
    written = p1.write_nonblock( data )
  rescue Exception => e
    # puts "debug write p1 #{ e.class }"
    close_write_p1( p1 )
    close_read_p2( p2 )
    return
  end

  data = data[ written..-1 ]
  p1_info[ :wbuff ] = data

  if p2 && !p2.closed? then
    p2_info = @p2_infos[ p2 ]
    p2_info[ :last_sent_at ] = Time.new
  end
end
write_p2( p2 ) click to toggle source

write p2

# File lib/girl/mirrord_worker.rb, line 743
def write_p2( p2 )
  if p2.closed? then
    puts "#{ Time.new } write p2 but p2 closed?"
    return
  end

  p2_info = @p2_infos[ p2 ]
  data = p2_info[ :wbuff ]

  # 写前为空,处理关闭写
  if data.empty? then
    if p2_info[ :closing_write ] then
      close_write_p2( p2 )
    else
      @writes.delete( p2 )
    end

    return
  end

  # 写入
  begin
    written = p2.write_nonblock( data )
  rescue Exception => e
    # puts "debug write p2 #{ e.class }"
    close_write_p2( p2 )
    close_read_p1( p2_info[ :p1 ] )
    return
  end

  data = data[ written..-1 ]
  p2_info[ :wbuff ] = data
end