class Girl::RelayWorker

Public Class Methods

new( resolv_port, nameserver, resolvd_port, redir_port, proxyd_host, proxyd_port, directs, remotes, im ) click to toggle source

initialize

# File lib/girl/relay_worker.rb, line 7
# Set up the worker's state, then open the wake-up pipe, the DNS relay
# socket and the redirect listener.
#
# resolv_port  - local UDP port the DNS relay binds to
# nameserver   - address of the nameserver queried on port 53
# resolvd_port - first of 10 consecutive remote resolvd ports
# redir_port   - local TCP port the redirect listener binds to
# proxyd_host  - host of the remote proxy daemon
# proxyd_port  - base port of the remote proxy daemon
# directs      - list of objects answering include?( ip ); a match means the ip is connected directly
# remotes      - domain names whose DNS queries are forwarded to the remote resolvd
# im           - identity string; handed to Girl::ProxyCustom and appended to control messages
def initialize( resolv_port, nameserver, resolvd_port, redir_port, proxyd_host, proxyd_port, directs, remotes, im )
  @nameserver_addr = Socket.sockaddr_in( 53, nameserver )
  @resolvd_ports = 10.times.map { | i | resolvd_port + i }
  # each remote domain as length-prefixed labels, so it can be matched inside a raw DNS query
  @qnames = remotes.map { | dom | dom.split( '.' ).map{ | sub | [ sub.size ].pack( 'C' ) + sub }.join }
  @proxyd_host = proxyd_host
  @proxyd_port = proxyd_port
  @directs = directs
  @remotes = remotes
  @im = im                         # read by add_a_new_source and the send_src_closed* methods
  @custom = Girl::ProxyCustom.new( im )
  @reads = []
  @writes = []
  @roles = {}                      # sock => :dotr / :resolv / :rsv / :redir / :ctl / :src / :dst / :tun
  @is_direct_caches = {}           # ip => true / false
  @src_infos = ConcurrentHash.new  # src => { :src_id, :addrinfo, :proxy_type, :destination_domain, :destination_port,
                                   #          :rbuff, :dst, :dst_id, :ctl, :tun, :renew_tun_times, :pong,
                                   #          :wbuff, :created_at, :closing_write, :closing, :paused }
  @dst_infos = ConcurrentHash.new  # dst => { :src, :domain, :wbuff, :created_at, :connected, :closing_write, :closing, :paused }
  @tun_infos = ConcurrentHash.new  # tun => { :src, :domain, :wbuff, :rbuff, :created_at, :pong, :is_ping_timeout, :closing, :paused }
  @rsv_infos = ConcurrentHash.new  # rsv => { :src_addr, :created_at, :closing }
  @local_addrinfos = Socket.ip_address_list

  new_a_pipe
  new_a_resolv( resolv_port )
  new_a_redir( redir_port )
end

Public Instance Methods

looping() click to toggle source

looping

# File lib/girl/relay_worker.rb, line 36
# Main event loop: starts the state checker, then waits on IO.select and
# hands every ready socket to the read_*/write_* method matching its role.
# A socket with an unrecognised role is logged and closed. An Interrupt
# ends the loop through quit!.
def looping
  read_handlers = {
    dotr: :read_dotr,
    resolv: :read_resolv,
    rsv: :read_rsv,
    redir: :read_redir,
    ctl: :read_ctl,
    src: :read_src,
    dst: :read_dst,
    tun: :read_tun
  }

  write_handlers = {
    src: :write_src,
    dst: :write_dst,
    tun: :write_tun
  }

  puts "#{ Time.new } looping"
  loop_check_state

  loop do
    readables, writables = IO.select( @reads, @writes )

    readables.each do | sock |
      role = @roles[ sock ]
      handler = read_handlers[ role ]

      if handler
        __send__( handler, sock )
      else
        puts "#{ Time.new } read unknown role #{ role }"
        close_sock( sock )
      end
    end

    writables.each do | sock |
      role = @roles[ sock ]
      handler = write_handlers[ role ]

      if handler
        __send__( handler, sock )
      else
        puts "#{ Time.new } write unknown role #{ role }"
        close_sock( sock )
      end
    end
  end
rescue Interrupt => e
  puts e.class
  quit!
end
quit!() click to toggle source

quit!

# File lib/girl/relay_worker.rb, line 93
# Send a CTL_FIN control message and terminate the process.
def quit!
  fin = [ CTL_FIN ].pack( 'C' )
  send_ctlmsg( fin )
  exit
end

Private Instance Methods

add_a_new_source( src ) click to toggle source

add a new source

# File lib/girl/relay_worker.rb, line 104
# Queue an A_NEW_SOURCE control message carrying the source's id and its
# "domain:port/im" target. Skipped while there is no open ctl socket or
# the tund addresses are still unknown.
#
# src - key into @src_infos
def add_a_new_source( src )
  ctl_ready = !@ctl.nil? && !@ctl.closed? && !@ctl_info[ :tund_addrs ].nil?
  return unless ctl_ready

  info = @src_infos[ src ]
  src_id = info[ :src_id ]
  domain_port = "#{ info[ :destination_domain ] }:#{ info[ :destination_port ] }"
  puts "#{ Time.new } add a new source #{ src_id } #{ domain_port }"
  add_ctlmsg( [ A_NEW_SOURCE, src_id ].pack( 'CQ>' ), "#{ domain_port }/#{ @im }" )
end
add_ctlmsg( key, data ) click to toggle source

add ctlmsg

# File lib/girl/relay_worker.rb, line 118
# Send a control message now and register it for resending.
#
# key  - packed message header; recorded in @ctl_info[ :resends ]
# data - payload appended after the key
#
# Does nothing when there is no open ctl socket.
def add_ctlmsg( key, data )
  unless @ctl.nil? || @ctl.closed?
    ctlmsg = "#{ key }#{ data }"
    send_ctlmsg( ctlmsg )
    @ctl_info[ :resends ].push( key )
    loop_resend_ctlmsg( key, ctlmsg )
  end
end
add_dst_wbuff( dst, data ) click to toggle source

add dst wbuff

# File lib/girl/relay_worker.rb, line 129
# Append data to dst's write buffer and watch dst for writability.
# Once the buffer reaches WBUFF_LIMIT, the paired src is removed from the
# read set and flagged as paused.
#
# dst  - destination socket (ignored when nil or closed)
# data - bytes to queue
def add_dst_wbuff( dst, data )
  return if dst.nil? || dst.closed?

  info = @dst_infos[ dst ]
  info[ :wbuff ] << data
  add_write( dst )
  return if info[ :wbuff ].bytesize < WBUFF_LIMIT

  src = info[ :src ]
  return unless src && !src.closed?

  src_info = @src_infos[ src ]
  puts "#{ Time.new } pause direct src #{ src_info[ :destination_domain ].inspect }"
  @reads.delete( src )
  src_info[ :paused ] = true
end
add_hello() click to toggle source

add hello

# File lib/girl/relay_worker.rb, line 150
# Queue a HELLO control message whose payload is @custom.hello.
def add_hello
  greeting = @custom.hello
  puts "#{ Time.new } hello i'm #{ greeting.inspect }"
  hello_key = [ HELLO ].pack( 'C' )
  add_ctlmsg( hello_key, greeting )
end
add_read( sock, role = nil ) click to toggle source

add read

# File lib/girl/relay_worker.rb, line 159
# Start watching sock for readability.
#
# sock - socket to add; ignored when nil, closed or already watched
# role - optional role symbol recorded in @roles for dispatching
def add_read( sock, role = nil )
  watchable = !sock.nil? && !sock.closed? && !@reads.include?( sock )
  return unless watchable

  @reads.push( sock )
  @roles[ sock ] = role if role
end
add_src_rbuff( src, data ) click to toggle source

add src rbuff

# File lib/girl/relay_worker.rb, line 171
# Buffer data read from src until it can be forwarded. When the buffer
# reaches WBUFF_LIMIT the src is closed.
#
# src  - source socket (ignored when nil, closed or marked closing)
# data - bytes to buffer
def add_src_rbuff( src, data )
  return if src.nil? || src.closed?

  info = @src_infos[ src ]
  return if info[ :closing ]

  pending = info[ :rbuff ]
  pending << data
  return if pending.bytesize < WBUFF_LIMIT

  puts "#{ Time.new } src rbuff full"
  close_src( src )
end
add_src_wbuff( src, data ) click to toggle source

add src wbuff

# File lib/girl/relay_worker.rb, line 186
# Append data to src's write buffer and watch src for writability.
# Once the buffer reaches WBUFF_LIMIT, whichever peer feeds this src
# (its dst if present, otherwise its tun) is removed from the read set
# and flagged as paused.
#
# src  - source socket (ignored when nil, closed or marked closing)
# data - bytes to queue
def add_src_wbuff( src, data )
  return if src.nil? || src.closed?

  info = @src_infos[ src ]
  return if info[ :closing ]

  info[ :wbuff ] << data
  add_write( src )
  return if info[ :wbuff ].bytesize < WBUFF_LIMIT

  dst = info[ :dst ]

  if dst
    dst_info = @dst_infos[ dst ]
    return unless dst_info

    puts "#{ Time.new } pause dst #{ dst_info[ :domain ].inspect }"
    @reads.delete( dst )
    dst_info[ :paused ] = true
  else
    tun = info[ :tun ]
    return unless tun

    tun_info = @tun_infos[ tun ]
    return unless tun_info

    puts "#{ Time.new } pause tun #{ tun_info[ :domain ].inspect }"
    @reads.delete( tun )
    tun_info[ :paused ] = true
  end
end
add_tun_wbuff( tun, data ) click to toggle source

add tun wbuff

# File lib/girl/relay_worker.rb, line 223
# Append data to tun's write buffer and watch tun for writability.
# Once the buffer reaches WBUFF_LIMIT, the paired src is removed from the
# read set and flagged as paused.
#
# tun  - tunnel socket (ignored when nil, closed or marked closing)
# data - bytes to queue
def add_tun_wbuff( tun, data )
  return if tun.nil? || tun.closed?

  info = @tun_infos[ tun ]
  return if info[ :closing ]

  info[ :wbuff ] << data
  add_write( tun )
  return if info[ :wbuff ].bytesize < WBUFF_LIMIT

  src = info[ :src ]
  return unless src && !src.closed?

  src_info = @src_infos[ src ]
  puts "#{ Time.new } pause remote src #{ src_info[ :destination_domain ].inspect }"
  @reads.delete( src )
  src_info[ :paused ] = true
end
add_write( sock ) click to toggle source

add write

# File lib/girl/relay_worker.rb, line 245
# Start watching sock for writability; ignored when sock is nil, closed
# or already in the write set.
def add_write( sock )
  watchable = !sock.nil? && !sock.closed? && !@writes.include?( sock )
  @writes.push( sock ) if watchable
end
close_ctl( ctl ) click to toggle source

close ctl

# File lib/girl/relay_worker.rb, line 253
# Close the ctl socket through close_sock; a nil or already closed
# socket is ignored.
def close_ctl( ctl )
  close_sock( ctl ) unless ctl.nil? || ctl.closed?
end
close_dst( dst ) click to toggle source

close dst

# File lib/girl/relay_worker.rb, line 261
# Close dst, drop its entry from @dst_infos and close the src recorded
# in that entry. A nil or already closed dst is ignored.
def close_dst( dst )
  return if dst.nil? || dst.closed?

  close_sock( dst )
  info = @dst_infos.delete( dst )
  close_src( info[ :src ] ) if info
end
close_read_dst( dst ) click to toggle source

close read dst

# File lib/girl/relay_worker.rb, line 274
# Shut down the read half of dst and stop watching it for reads. If the
# socket then reports itself fully closed, it is also removed from the
# write set, @roles and @dst_infos.
def close_read_dst( dst )
  return if dst.nil? || dst.closed?

  dst.close_read
  @reads.delete( dst )
  return unless dst.closed?

  @writes.delete( dst )
  @roles.delete( dst )
  @dst_infos.delete( dst )
end
close_read_src( src ) click to toggle source

close read src

# File lib/girl/relay_worker.rb, line 291
# Shut down the read half of src and stop watching it for reads. When
# the src has a tun, a "source closed read" control message is sent. If
# the socket then reports itself fully closed, it is also removed from
# the write set, @roles and @src_infos.
def close_read_src( src )
  return if src.nil? || src.closed?

  src.close_read
  @reads.delete( src )
  info = @src_infos[ src ]
  send_src_closed_read( info[ :src_id ] ) if info[ :tun ]
  return unless src.closed?

  @writes.delete( src )
  @roles.delete( src )
  @src_infos.delete( src )
end
close_read_tun( tun ) click to toggle source

close read tun

# File lib/girl/relay_worker.rb, line 313
# Shut down the read half of tun and stop watching it for reads. If the
# socket then reports itself fully closed, it is also removed from the
# write set, @roles and @tun_infos.
def close_read_tun( tun )
  return if tun.nil? || tun.closed?

  tun.close_read
  @reads.delete( tun )
  return unless tun.closed?

  @writes.delete( tun )
  @roles.delete( tun )
  @tun_infos.delete( tun )
end
close_rsv( rsv ) click to toggle source

close rsv

# File lib/girl/relay_worker.rb, line 330
# Close a resolver socket and forget its @rsv_infos entry. A nil or
# already closed socket is ignored.
def close_rsv( rsv )
  unless rsv.nil? || rsv.closed?
    close_sock( rsv )
    @rsv_infos.delete( rsv )
  end
end
close_sock( sock ) click to toggle source

close sock

# File lib/girl/relay_worker.rb, line 340
# Close sock and remove it from the read set, the write set and @roles.
# A nil or already closed socket is ignored.
def close_sock( sock )
  unless sock.nil? || sock.closed?
    sock.close
    [ @reads, @writes ].each { | watched | watched.delete( sock ) }
    @roles.delete( sock )
  end
end
close_src( src ) click to toggle source

close src

# File lib/girl/relay_worker.rb, line 351
# Close src and drop its @src_infos entry, then tear down its peer:
# a dst is closed via close_dst; otherwise a tun is closed via close_tun
# and a "source closed" control message is sent.
# NOTE(review): close_tun is not defined in the visible part of this
# file - confirm it exists elsewhere.
def close_src( src )
  return if src.nil? || src.closed?

  close_sock( src )
  info = @src_infos.delete( src )
  return unless info

  dst = info[ :dst ]
  return close_dst( dst ) if dst

  tun = info[ :tun ]
  return unless tun

  close_tun( tun )
  send_src_closed( info[ :src_id ] )
end
close_write_dst( dst ) click to toggle source

close write dst

# File lib/girl/relay_worker.rb, line 372
# Shut down the write half of dst and stop watching it for writes. If
# the socket then reports itself fully closed, it is also removed from
# the read set, @roles and @dst_infos.
def close_write_dst( dst )
  return if dst.nil? || dst.closed?

  dst.close_write
  @writes.delete( dst )
  return unless dst.closed?

  @reads.delete( dst )
  @roles.delete( dst )
  @dst_infos.delete( dst )
end
close_write_src( src ) click to toggle source

close write src

# File lib/girl/relay_worker.rb, line 389
# Shut down the write half of src and stop watching it for writes. When
# the src has a tun, a "source closed write" control message is sent. If
# the socket then reports itself fully closed, it is also removed from
# the read set, @roles and @src_infos.
def close_write_src( src )
  return if src.nil? || src.closed?

  src.close_write
  @writes.delete( src )
  info = @src_infos[ src ]
  send_src_closed_write( info[ :src_id ] ) if info[ :tun ]
  return unless src.closed?

  @reads.delete( src )
  @roles.delete( src )
  @src_infos.delete( src )
end
close_write_tun( tun ) click to toggle source

close write tun

# File lib/girl/relay_worker.rb, line 411
# Shut down the write half of tun and stop watching it for writes. If
# the socket then reports itself fully closed, it is also removed from
# the read set, @roles and @tun_infos.
def close_write_tun( tun )
  return if tun.nil? || tun.closed?

  tun.close_write
  @writes.delete( tun )
  return unless tun.closed?

  @reads.delete( tun )
  @roles.delete( tun )
  @tun_infos.delete( tun )
end
loop_check_state() click to toggle source

loop check state

# File lib/girl/relay_worker.rb, line 428
# Start a background thread that wakes every CHECK_STATE_INTERVAL seconds and:
#   * resumes paused srcs / dsts / tuns whose peer's write buffer dropped below RESUME_BELOW
#   * flags dsts that are still unconnected after EXPIRE_CONNECTING as closing
#   * flags tuns without a pong after PING_TIMEOUT as ping-timed-out
#   * flags rsvs older than EXPIRE_NEW as closing
# Flags are only set here; next_tick wakes the select loop, which acts on them.
#
# Peer info hashes can be removed by the select loop while this thread
# is iterating, so every lookup is nil-checked - an unguarded nil here
# would raise and silently end the checker thread.
#
# Returns the Thread.
def loop_check_state
  Thread.new do
    loop do
      sleep CHECK_STATE_INTERVAL
      now = Time.new

      @src_infos.select{ | src, info | !src.closed? && info[ :paused ] }.each do | src, src_info |
        dst = src_info[ :dst ]

        if dst then
          dst_info = @dst_infos[ dst ]

          if dst_info && ( dst_info[ :wbuff ].bytesize < RESUME_BELOW ) then
            puts "#{ Time.new } resume direct src #{ src_info[ :destination_domain ].inspect }"
            add_read( src )
            src_info[ :paused ] = false
            next_tick
          end
        else
          tun = src_info[ :tun ]

          if tun && !tun.closed? then
            tun_info = @tun_infos[ tun ]

            if tun_info && ( tun_info[ :wbuff ].bytesize < RESUME_BELOW ) then
              puts "#{ Time.new } resume remote src #{ src_info[ :destination_domain ].inspect }"
              add_read( src )
              src_info[ :paused ] = false
              next_tick
            end
          end
        end
      end

      @dst_infos.select{ | dst, info | !dst.closed? }.each do | dst, dst_info |
        if !dst_info[ :connected ] && ( now - dst_info[ :created_at ] >= EXPIRE_CONNECTING ) then
          puts "#{ Time.new } expire dst #{ dst_info[ :domain ].inspect }"
          dst_info[ :closing ] = true
          next_tick
        elsif dst_info[ :paused ] then
          src = dst_info[ :src ]

          if src && !src.closed? then
            src_info = @src_infos[ src ]

            if src_info && ( src_info[ :wbuff ].bytesize < RESUME_BELOW ) then
              puts "#{ Time.new } resume dst #{ dst_info[ :domain ].inspect }"
              add_read( dst )
              dst_info[ :paused ] = false
              next_tick
            end
          end
        end
      end

      @tun_infos.select{ | tun, info | !tun.closed? }.each do | tun, tun_info |
        if tun_info[ :paused ] then
          src = tun_info[ :src ]

          if src && !src.closed? then
            src_info = @src_infos[ src ]

            if src_info && ( src_info[ :wbuff ].bytesize < RESUME_BELOW ) then
              puts "#{ Time.new } resume tun #{ tun_info[ :domain ].inspect }"
              add_read( tun )
              tun_info[ :paused ] = false
              next_tick
            end
          end
        elsif !tun_info[ :pong ] && ( now - tun_info[ :created_at ] >= PING_TIMEOUT ) then
          puts "#{ Time.new } ping timeout #{ tun_info[ :domain ].inspect }"
          tun_info[ :is_ping_timeout ] = true
          next_tick
        end
      end

      @rsv_infos.select{ | rsv, info | !rsv.closed? && ( now - info[ :created_at ] >= EXPIRE_NEW ) }.values.each do | rsv_info |
        puts "#{ Time.new } expire rsv"
        rsv_info[ :closing ] = true
        next_tick
      end
    end
  end
end
loop_resend_ctlmsg( key, ctlmsg ) click to toggle source

loop resend ctlmsg

# File lib/girl/relay_worker.rb, line 516
# Start a thread that resends ctlmsg every RESEND_INTERVAL seconds, at
# most RESEND_LIMIT times, for as long as the ctl socket is open and key
# is still listed in @ctl_info[ :resends ]. If every attempt is used up,
# the ctl is marked as closing.
#
# key    - entry to look for in @ctl_info[ :resends ]
# ctlmsg - full message to resend
#
# Returns the Thread.
def loop_resend_ctlmsg( key, ctlmsg )
  Thread.new do
    exhausted = true

    RESEND_LIMIT.times do
      sleep RESEND_INTERVAL
      still_pending = @ctl && !@ctl.closed? && @ctl_info[ :resends ].include?( key )

      unless still_pending
        exhausted = false
        break
      end

      puts "#{ Time.new } resend #{ ctlmsg.inspect }"
      send_ctlmsg( ctlmsg )
    end

    set_ctl_closing if exhausted
  end
end
new_a_ctl() click to toggle source

new a ctl

# File lib/girl/relay_worker.rb, line 542
# Open a fresh UDP ctl socket aimed at one of the 10 ports starting at
# @proxyd_port (picked at random), reset @ctl_info, watch the socket for
# reads and queue a hello.
def new_a_ctl
  ctld_port = @proxyd_port + ( 0...10 ).to_a.sample
  @ctl = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )

  @ctl_info = {
    ctld_addr: Socket.sockaddr_in( ctld_port, @proxyd_host ), # ctld address
    resends: [],                                              # keys awaiting resend
    tund_addrs: nil,                                          # tund addresses
    closing: false                                            # about to close
  }

  add_read( @ctl, :ctl )
  puts "#{ Time.new } new a ctl #{ ctld_port }"
  puts "srcs #{ @src_infos.size } dsts #{ @dst_infos.size } tuns #{ @tun_infos.size } rsvs #{ @rsv_infos.size }"
  add_hello
end
new_a_dst( addrinfo, src ) click to toggle source

new a dst

# File lib/girl/relay_worker.rb, line 564
# Open a direct, non-blocking TCP connection to the src's destination
# and pair the new dst socket with src. Anything already buffered in
# src's rbuff is copied into the dst's write buffer. On socket or
# connect failure the src is closed.
#
# addrinfo - Addrinfo of the destination (supplies the ip and address family)
# src      - source socket; ignored when nil or closed
def new_a_dst( addrinfo, src )
  return if src.nil? || src.closed?
  src_info = @src_infos[ src ]
  domain = src_info[ :destination_domain ]
  port = src_info[ :destination_port ]
  ip = addrinfo.ip_address
  destination_addr = Socket.sockaddr_in( port, ip )

  begin
    dst = Socket.new( addrinfo.ipv4? ? Socket::AF_INET : Socket::AF_INET6, Socket::SOCK_STREAM, 0 )
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still reach the main loop
    puts "#{ Time.new } new a dst #{ e.class } #{ domain.inspect } #{ ip } #{ port }"
    close_src( src )
    return
  end

  dst.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )

  begin
    dst.connect_nonblock( destination_addr )
  rescue IO::WaitWritable
    # connect is in progress; the socket is watched for writes below
  rescue StandardError => e
    puts "#{ Time.new } dst connect destination #{ e.class } #{ domain.inspect  } #{ ip } #{ port }"
    dst.close
    close_src( src )
    return
  end

  # puts "debug a new dst #{ dst.local_address.inspect }"
  dst_info = {
    src: src,             # paired src
    domain: domain,       # destination
    wbuff: '',            # data waiting to be written
    created_at: Time.new, # creation time
    connected: false,     # whether the connection is established
    closing_write: false, # about to close the write half
    closing: false,       # about to close
    paused: false         # whether reading is paused
  }

  @dst_infos[ dst ] = dst_info
  src_info[ :proxy_type ] = :direct
  src_info[ :dst ] = dst

  if src_info[ :rbuff ] then
    # puts "debug move src.rbuff to dst.wbuff"
    dst_info[ :wbuff ] << src_info[ :rbuff ]
  end

  add_read( dst, :dst )
  add_write( dst )
end
new_a_pipe() click to toggle source

new a pipe

# File lib/girl/relay_worker.rb, line 620
# Create the wake-up pipe: the write end is kept in @dotw (used by
# next_tick) and the read end is watched with the :dotr role.
def new_a_pipe
  reader, @dotw = IO.pipe
  add_read( reader, :dotr )
end
new_a_redir( redir_port ) click to toggle source

new a redir

# File lib/girl/relay_worker.rb, line 629
# Open the TCP listener on 0.0.0.0:redir_port (TCP_NODELAY, SO_REUSEADDR
# and SO_REUSEPORT set, backlog 127), watch it with the :redir role and
# remember the port and the listener's local address.
def new_a_redir( redir_port )
  redir = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )

  [
    [ Socket::IPPROTO_TCP, Socket::TCP_NODELAY ],
    [ Socket::SOL_SOCKET, Socket::SO_REUSEADDR ],
    [ Socket::SOL_SOCKET, Socket::SO_REUSEPORT ]
  ].each { | level, optname | redir.setsockopt( level, optname, 1 ) }

  listen_addr = Socket.sockaddr_in( redir_port, '0.0.0.0' )
  redir.bind( listen_addr )
  redir.listen( 127 )
  puts "#{ Time.new } redir listen on #{ redir_port }"
  add_read( redir, :redir )
  @redir_port = redir_port
  @redir_local_address = redir.local_address
end
new_a_remote( src ) click to toggle source

new a remote

# File lib/girl/relay_worker.rb, line 645
# Route src through the remote side: make sure a ctl socket exists,
# mark the src as :remote and announce it with add_a_new_source.
#
# src - source socket; ignored when nil or closed
def new_a_remote( src )
  return if src.nil? || src.closed?

  new_a_ctl if @ctl.nil? || @ctl.closed?
  @src_infos[ src ][ :proxy_type ] = :remote
  add_a_new_source( src )
end
new_a_resolv( resolv_port ) click to toggle source

new a resolv

# File lib/girl/relay_worker.rb, line 660
# Bind the UDP DNS relay socket to 0.0.0.0:resolv_port (SO_REUSEPORT
# set), watch it with the :resolv role and keep it in @resolv.
def new_a_resolv( resolv_port )
  bind_addr = Socket.sockaddr_in( resolv_port, '0.0.0.0' )
  sock = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )
  sock.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 )
  sock.bind( bind_addr )

  puts "#{ Time.new } resolv bind on #{ resolv_port }"
  add_read( sock, :resolv )
  @resolv = sock
end
new_a_rsv( src_addr, data ) click to toggle source

new a rsv

# File lib/girl/relay_worker.rb, line 673
# Forward one DNS query through a fresh UDP socket. A query containing
# any of @qnames is encoded and sent to a randomly picked resolvd port
# on @proxyd_host; any other query goes unchanged to the nameserver.
#
# src_addr - address of the original asker, stored so the answer can be relayed back
# data     - raw query bytes
def new_a_rsv( src_addr, data )
  rsv = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )
  rsv.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 )
  goes_remote = @qnames.any? { | qname | data.include?( qname ) }

  if goes_remote
    payload = @custom.encode( data )
    to_addr = Socket.sockaddr_in( @resolvd_ports.sample, @proxyd_host )
  else
    payload = data
    to_addr = @nameserver_addr
  end

  # puts "debug new a rsv to #{ Addrinfo.new( to_addr ).inspect }"
  @rsv_infos[ rsv ] = { src_addr: src_addr, created_at: Time.new, closing: false }
  add_read( rsv, :rsv )
  send_data( rsv, to_addr, payload )
end
new_a_tun( src ) click to toggle source

new a tun

# File lib/girl/relay_worker.rb, line 699
# Open a non-blocking TCP connection to a randomly picked tund address
# and pair the new tun socket with src. The tun's write buffer starts
# with the packed dst_id. Nothing happens when src is nil/closed, the
# tund addresses are unknown or the src has no dst_id yet.
#
# src - source socket
def new_a_tun( src )
  return if src.nil? || src.closed? || @ctl_info[ :tund_addrs ].nil?
  src_info = @src_infos[ src ]
  dst_id = src_info[ :dst_id ]
  return unless dst_id

  tund_addr = @ctl_info[ :tund_addrs ].sample
  # puts "debug new a tun #{ Addrinfo.new( tund_addr ).inspect }"

  tun = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  tun.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )

  begin
    tun.connect_nonblock( tund_addr )
  rescue IO::WaitWritable
    # connect is in progress; the socket is watched for writes below
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still reach the main loop
    puts "#{ Time.new } connect tund #{ e.class }"
    tun.close
    return
  end

  domain = src_info[ :destination_domain ]

  @tun_infos[ tun ] = {
    src: src,                      # paired src
    domain: domain,                # destination
    rbuff: '',                     # traffic that does not yet form a whole chunk
    wbuff: [ dst_id ].pack( 'n' ), # data waiting to be written
    created_at: Time.new,          # creation time
    pong: false,                   # whether a reply has arrived
    is_ping_timeout: false,        # ping timed out
    closing: false,                # about to close
    paused: false                  # whether reading is paused
  }

  src_info[ :tun ] = tun
  add_read( tun, :tun )
  add_write( tun )
end
new_a_tunnel( addrinfo, src ) click to toggle source

new a tunnel

# File lib/girl/relay_worker.rb, line 742
# Decide how a freshly accepted src reaches its destination:
#   * destination is a local address on the redir port -> close the src
#   * destination is the proxyd host on a port other than 80/443 -> direct
#   * ip matches one of @directs (answer cached per ip) -> direct
#   * anything else -> remote
#
# addrinfo - Addrinfo of the original destination
# src      - source socket; ignored when nil or closed
def new_a_tunnel( addrinfo, src )
  return if src.nil? || src.closed?

  info = @src_infos[ src ]
  ip = addrinfo.ip_address
  port = info[ :destination_port ]
  targets_self = @local_addrinfos.any? { | local | local.ip_address == ip } && ( port == @redir_port )

  if targets_self
    puts "#{ Time.new } ignore #{ ip }:#{ port }"
    close_src( src )
    return
  end

  # the proxy host itself on a port other than 80/443 is reached directly
  if ( info[ :destination_domain ] == @proxyd_host ) && ![ 80, 443 ].include?( port )
    puts "#{ Time.new } direct #{ ip } #{ port }"
    new_a_dst( addrinfo, src )
    return
  end

  unless @is_direct_caches.key?( ip )
    verdict = @directs.any? { | direct | direct.include?( ip ) }
    puts "#{ Time.new } cache is direct #{ ip } #{ verdict }"
    @is_direct_caches[ ip ] = verdict
  end

  if @is_direct_caches[ ip ]
    # puts "debug hit directs #{ addrinfo.inspect }"
    new_a_dst( addrinfo, src )
  else
    # puts "debug go remote #{ addrinfo.inspect }"
    new_a_remote( src )
  end
end
next_tick() click to toggle source

next tick

# File lib/girl/relay_worker.rb, line 781
# Write a single '.' to the wake-up pipe so the select loop runs
# read_dotr on its next pass.
def next_tick
  wake_byte = '.'
  @dotw.write( wake_byte )
end
pack_a_chunk( data ) click to toggle source

pack a chunk

# File lib/girl/relay_worker.rb, line 788
# Encode data with @custom and prefix the result with its byte length as
# a 16-bit big-endian integer.
#
# Returns the framed chunk.
def pack_a_chunk( data )
  encoded = @custom.encode( data )
  length_prefix = [ encoded.bytesize ].pack( 'n' )
  "#{ length_prefix }#{ encoded }"
end
read_ctl( ctl ) click to toggle source

read ctl

# File lib/girl/relay_worker.rb, line 1054
# Handle one datagram from the ctl socket.
#
#   TUND_PORTS       - stores the tund addresses, stops resending HELLO and
#                      announces every :remote src that has no dst_id yet
#   PAIRED           - records the dst_id for the matching src, stops
#                      resending its A_NEW_SOURCE and opens a tun
#   UNKNOWN_CTL_ADDR - closes this ctl and opens a new one
#
# A receive error closes the ctl. An empty datagram is ignored.
def read_ctl( ctl )
  begin
    data, _addrinfo, _rflags, *_controls = ctl.recvmsg
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still reach the main loop
    puts "#{ Time.new } ctl recvmsg #{ e.class }"
    close_ctl( ctl )
    return
  end

  data = @custom.decode( data )
  # a zero-length datagram has no control byte to dispatch on
  return if data.nil? || data.empty?
  ctl_num = data[ 0 ].unpack( 'C' ).first

  case ctl_num
  when TUND_PORTS then
    # 1 control byte + 10 ports of 2 bytes each
    return if data.bytesize != 21
    tund_ports = data[ 1, 20 ].unpack( 'n*' )
    puts "#{ Time.new } got tund ports #{ tund_ports.inspect }"
    @ctl_info[ :resends ].delete( [ HELLO ].pack( 'C' ) )
    @ctl_info[ :tund_addrs ] = tund_ports.map{ | tund_port | Socket.sockaddr_in( tund_port, @proxyd_host ) }
    @src_infos.select{ | _, info | ( info[ :proxy_type ] == :remote ) && !info[ :dst_id ] }.keys.each{ | src | add_a_new_source( src ) }
  when PAIRED then
    # 1 control byte + 8-byte src_id + 2-byte dst_id
    return if data.bytesize != 11 || @ctl_info[ :tund_addrs ].nil?
    src_id, dst_id = data[ 1, 10 ].unpack( 'Q>n' )
    src, src_info = @src_infos.find{ | _, info | ( info[ :src_id ] == src_id ) && info[ :dst_id ].nil? }
    return if src.nil? || src.closed?

    # puts "debug got paired #{ src_id } #{ dst_id }"
    src_info[ :dst_id ] = dst_id
    @ctl_info[ :resends ].delete( [ A_NEW_SOURCE, src_id ].pack( 'CQ>' ) )
    new_a_tun( src )
  when UNKNOWN_CTL_ADDR then
    puts "#{ Time.new } got unknown ctl addr"
    close_ctl( ctl )
    new_a_ctl
  end
end
read_dotr( dotr ) click to toggle source

read dotr

# File lib/girl/relay_worker.rb, line 945
# Drain the wake-up pipe and act on the flags set by background threads:
# close rsvs marked closing, send CTL_FIN and close a ctl marked closing,
# close dsts marked closing and renew tuns whose ping timed out.
def read_dotr( dotr )
  dotr.read_nonblock( READ_SIZE )

  closing_rsvs = @rsv_infos.select { | _, info | info[ :closing ] }.keys
  closing_rsvs.each { | rsv | close_rsv( rsv ) }

  ctl_closing = @ctl && !@ctl.closed? && @ctl_info[ :closing ]

  if ctl_closing
    fin = [ CTL_FIN ].pack( 'C' )
    send_ctlmsg( fin )
    close_ctl( @ctl )
  end

  closing_dsts = @dst_infos.select { | _, info | info[ :closing ] }.keys
  closing_dsts.each { | dst | close_dst( dst ) }

  timed_out_tuns = @tun_infos.select { | _, info | info[ :is_ping_timeout ] }.keys
  timed_out_tuns.each { | tun | renew_a_tun( tun ) }
end
read_dst( dst ) click to toggle source

read dst

# File lib/girl/relay_worker.rb, line 1148
# Read up to CHUNK_SIZE bytes from dst and queue them for the paired
# src. When the read fails (EOF or error), dst's read half is closed and
# src is flagged to close its write half once its buffer is flushed.
#
# dst - destination socket reported readable by select
def read_dst( dst )
  if dst.closed? then
    puts "#{ Time.new } read dst but dst closed?"
    return
  end

  dst_info = @dst_infos[ dst ]
  src = dst_info[ :src ]

  begin
    data = dst.read_nonblock( CHUNK_SIZE )
  rescue IO::WaitReadable, Errno::EINTR
    # spurious readiness: nothing to read yet, try again on the next select
    return
  rescue StandardError
    # StandardError rather than Exception, so Interrupt / SystemExit still reach the main loop
    close_read_dst( dst )
    set_src_closing_write( src )
    return
  end

  add_src_wbuff( src, data )
end
read_redir( redir ) click to toggle source

read redir

# File lib/girl/relay_worker.rb, line 998
# Accept one redirected connection, look up its original destination
# (socket option 80 at the SOL_IP level), register the new src and choose
# a route for it through new_a_tunnel.
#
# If the original destination cannot be read, the accepted socket is
# closed and nothing is registered.
#
# redir - listening socket reported readable by select
def read_redir( redir )
  begin
    src, addrinfo = redir.accept_nonblock
  rescue IO::WaitReadable, Errno::EINTR => e
    puts "accept #{ e.class }"
    return
  end

  begin
    # /usr/include/linux/netfilter_ipv4.h
    option = src.getsockopt( Socket::SOL_IP, 80 )
  rescue StandardError => e
    puts "get SO_ORIGINAL_DST #{ e.class }"
    src.close
    # without the option there is no destination to unpack below
    return
  end

  _dest_family, dest_port, dest_host = option.unpack( 'nnN' )
  dest_addr = Socket.sockaddr_in( dest_port, dest_host )
  dest_addrinfo = Addrinfo.new( dest_addr )
  dest_ip = dest_addrinfo.ip_address

  src_id = rand( ( 2 ** 64 ) - 2 ) + 1
  # puts "debug accept a src #{ addrinfo.ip_unpack.inspect } to #{ dest_ip }:#{ dest_port } #{ src_id }"

  @src_infos[ src ] = {
    src_id: src_id,              # src_id
    addrinfo: addrinfo,          # addrinfo of the accepted peer
    proxy_type: :checking,       # :checking / :direct / :remote
    destination_domain: dest_ip, # destination domain (here: the original destination ip)
    destination_port: dest_port, # destination port
    rbuff: '',                   # traffic read from src and not yet forwarded
    dst: nil,                    # paired dst when :direct
    dst_id: nil,                 # id of the remote dst
    ctl: nil,                    # paired ctl (not assigned in the visible code)
    tun: nil,                    # paired tun when :remote
    renew_tun_times: 0,          # number of tun re-creations when :remote
    pong: false,                 # connection confirmed when :remote
    wbuff: '',                   # traffic read from dst/tun, waiting to be written to src
    created_at: Time.new,        # creation time
    closing_write: false,        # about to close the write half
    closing: false,              # about to close
    paused: false                # whether reading is paused
  }

  add_read( src, :src )

  if @ctl.nil? || @ctl.closed? then
    new_a_ctl
  end

  new_a_tunnel( dest_addrinfo, src )
end
read_resolv( resolv ) click to toggle source

read resolv

# File lib/girl/relay_worker.rb, line 961
# Receive one DNS query on the relay socket and forward it through a
# new rsv socket, remembering who asked.
def read_resolv( resolv )
  query, asker = resolv.recvmsg
  # puts "debug resolv recvmsg #{ asker.ip_unpack.inspect } #{ query.inspect }"
  new_a_rsv( asker, query )
end
read_rsv( rsv ) click to toggle source

read rsv

# File lib/girl/relay_worker.rb, line 970
# Receive the answer to a forwarded DNS query, decode it when it came
# from @proxyd_host, relay it to the original asker through @resolv and
# close the rsv socket. A receive error just closes the rsv.
#
# rsv - resolver socket reported readable by select
def read_rsv( rsv )
  if rsv.closed? then
    puts "#{ Time.new } read rsv but rsv closed?"
    return
  end

  begin
    data, addrinfo, _rflags, *_controls = rsv.recvmsg
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still reach the main loop
    puts "#{ Time.new } rsv recvmsg #{ e.class }"
    close_rsv( rsv )
    return
  end

  # puts "debug rsv recvmsg #{ addrinfo.ip_unpack.inspect } #{ data.inspect }"

  # NOTE(review): this compares an ip string with @proxyd_host - it only
  # matches when proxyd_host is configured as an ip, confirm with callers
  if addrinfo.ip_address == @proxyd_host then
    data = @custom.decode( data )
  end

  rsv_info = @rsv_infos[ rsv ]
  send_data( @resolv, rsv_info[ :src_addr ], data )
  close_rsv( rsv )
end
read_src( src ) click to toggle source

read src

# File lib/girl/relay_worker.rb, line 1094
# Read up to CHUNK_SIZE bytes from src and pass them on according to the
# src's proxy_type:
#   :checking - kept in src's rbuff until a route is chosen
#   :remote   - framed and queued on the tun, or buffered while no tun exists
#   :direct   - queued on the dst, or buffered while no dst exists
#
# When the read fails (EOF or error), src's read half is closed and the
# peer (dst, else tun) is flagged to close its write half.
#
# src - source socket reported readable by select
def read_src( src )
  if src.closed? then
    puts "#{ Time.new } read src but src closed?"
    return
  end

  src_info = @src_infos[ src ]

  begin
    data = src.read_nonblock( CHUNK_SIZE )
  rescue IO::WaitReadable, Errno::EINTR
    # spurious readiness: nothing to read yet, try again on the next select
    return
  rescue StandardError
    # StandardError rather than Exception, so Interrupt / SystemExit still reach the main loop
    close_read_src( src )
    dst = src_info[ :dst ]

    if dst then
      set_dst_closing_write( dst )
    else
      set_tun_closing_write( src_info[ :tun ] )
    end

    return
  end

  proxy_type = src_info[ :proxy_type ]

  case proxy_type
  when :checking then
    # puts "debug add src rbuff before resolved #{ data.inspect }"
    src_info[ :rbuff ] << data
  when :remote then
    tun = src_info[ :tun ]

    if tun then
      add_tun_wbuff( tun, pack_a_chunk( data ) )
    else
      # puts "debug add src.rbuff #{ data.bytesize }"
      add_src_rbuff( src, data )
    end
  when :direct then
    dst = src_info[ :dst ]

    if dst then
      add_dst_wbuff( dst, data )
    else
      # puts "debug add src.rbuff #{ data.bytesize }"
      add_src_rbuff( src, data )
    end
  end
end
read_tun( tun ) click to toggle source

read tun

# File lib/girl/relay_worker.rb, line 1172
# Read up to READ_SIZE bytes from tun and deliver the chunks they
# contain to the paired src.
#
# Until the tun has seen its pong, the first 8 bytes must equal the
# src_id; they are stripped and the pong is recorded. After that the
# stream is a sequence of chunks, each a 16-bit big-endian length
# followed by that many encoded bytes. An incomplete trailing chunk is
# kept in the tun's rbuff for the next read. A zero length closes both
# the tun and the src.
#
# When the read fails (EOF or error), tun's read half is closed and src
# is flagged to close its write half.
# NOTE(review): close_tun is not defined in the visible part of this
# file - confirm it exists elsewhere.
#
# tun - tunnel socket reported readable by select
def read_tun( tun )
  if tun.closed? then
    puts "#{ Time.new } read tun but tun closed?"
    return
  end

  tun_info = @tun_infos[ tun ]
  src = tun_info[ :src ]

  if src.closed? then
    close_tun( tun )
    return
  end

  begin
    data = tun.read_nonblock( READ_SIZE )
  rescue IO::WaitReadable, Errno::EINTR
    # spurious readiness: nothing to read yet, try again on the next select
    return
  rescue StandardError
    # StandardError rather than Exception, so Interrupt / SystemExit still reach the main loop
    close_read_tun( tun )
    set_src_closing_write( src )
    return
  end

  unless tun_info[ :pong ] then
    if data.bytesize < 8 then
      puts "#{ Time.new } pong length less than 8?"
      close_tun( tun )
      return
    end

    src_info = @src_infos[ src ]

    if data[ 0, 8 ].unpack( 'Q>' ).first != src_info[ :src_id ] then
      puts "#{ Time.new } invalid pong?"
      close_tun( tun )
      return
    end

    # puts "debug got pong #{ data.bytesize }"
    set_tun_info_pong( tun, src )
    data = data[ 8..-1 ]

    if data.empty? then
      return
    end
  end

  # prepend whatever was left over from the previous read
  data = "#{ tun_info[ :rbuff ] }#{ data }"

  loop do
    # a chunk needs its 2-byte length plus at least one payload byte
    if data.bytesize <= 2 then
      tun_info[ :rbuff ] = data
      break
    end

    len = data[ 0, 2 ].unpack( 'n' ).first

    if len == 0 then
      puts "#{ Time.new } zero traffic len?"
      close_tun( tun )
      close_src( src )
      return
    end

    chunk = data[ 2, len ]

    if chunk.bytesize < len then
      # chunk not complete yet, keep everything for the next read
      tun_info[ :rbuff ] = data
      break
    end

    chunk = @custom.decode( chunk )
    add_src_wbuff( src, chunk )
    data = data[ ( 2 + len )..-1 ]
  end
end
renew_a_tun( tun ) click to toggle source

renew a tun

# File lib/girl/relay_worker.rb, line 796
# Replace a tun: close the old one and open a new one for its src,
# unless the src has already used up RENEW_TUN_LIMIT renewals, in which
# case the src is closed instead.
#
# Relies on close_tun returning the closed tun's info hash.
# NOTE(review): close_tun is not defined in the visible part of this
# file - confirm it exists elsewhere and what it returns.
#
# tun - tunnel socket to replace
def renew_a_tun( tun )
  tun_info = close_tun( tun )
  # nothing to renew when close_tun had no info to hand back
  return if tun_info.nil?
  src = tun_info[ :src ]
  return if src.nil? || src.closed?
  src_info = @src_infos[ src ]

  if src_info[ :renew_tun_times ] >= RENEW_TUN_LIMIT then
    puts "#{ Time.new } renew a tun out of limit #{ src_info[ :destination_domain ].inspect } #{ src_info[ :destination_port ] }"
    close_src( src )
  else
    new_a_tun( src )
    src_info[ :renew_tun_times ] += 1
  end
end
send_ctlmsg( data ) click to toggle source

send ctlmsg

# File lib/girl/relay_worker.rb, line 814
# Encode data with @custom and send it to the ctld address over the ctl
# socket. A send error marks the ctl as closing. Does nothing when there
# is no open ctl socket.
#
# data - raw control message
def send_ctlmsg( data )
  return if @ctl.nil? || @ctl.closed?
  data = @custom.encode( data )

  begin
    @ctl.sendmsg_nonblock( data, 0, @ctl_info[ :ctld_addr ] )
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still reach the main loop
    puts "#{ Time.new } ctl sendmsg #{ e.class }"
    set_ctl_closing
  end
end
send_data( sock, to_addr, data ) click to toggle source

send data

# File lib/girl/relay_worker.rb, line 829
# Best-effort datagram send: a send error is logged and otherwise ignored.
#
# sock    - socket to send on
# to_addr - packed destination address
# data    - payload
def send_data( sock, to_addr, data )
  begin
    sock.sendmsg_nonblock( data, 0, to_addr )
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still reach the main loop
    puts "#{ Time.new } sendmsg #{ e.class } #{ to_addr.inspect }"
  end
end
send_src_closed( src_id ) click to toggle source

send src closed

# File lib/girl/relay_worker.rb, line 840
# Send a SOURCE_CLOSED control message for src_id, followed by @im.
# Does nothing when there is no open ctl socket.
def send_src_closed( src_id )
  unless @ctl.nil? || @ctl.closed?
    header = [ SOURCE_CLOSED, src_id ].pack( 'CQ>' )
    send_ctlmsg( "#{ header }#{ @im }" )
  end
end
send_src_closed_read( src_id ) click to toggle source

send src closed read

# File lib/girl/relay_worker.rb, line 849
# Send a SOURCE_CLOSED_READ control message for src_id, followed by @im.
# Does nothing when there is no open ctl socket.
def send_src_closed_read( src_id )
  unless @ctl.nil? || @ctl.closed?
    header = [ SOURCE_CLOSED_READ, src_id ].pack( 'CQ>' )
    send_ctlmsg( "#{ header }#{ @im }" )
  end
end
send_src_closed_write( src_id ) click to toggle source

send src closed write

# File lib/girl/relay_worker.rb, line 858
# Send a SOURCE_CLOSED_WRITE control message for src_id, followed by @im.
# Does nothing when there is no open ctl socket.
def send_src_closed_write( src_id )
  unless @ctl.nil? || @ctl.closed?
    header = [ SOURCE_CLOSED_WRITE, src_id ].pack( 'CQ>' )
    send_ctlmsg( "#{ header }#{ @im }" )
  end
end
set_ctl_closing() click to toggle source

set ctl closing

# File lib/girl/relay_worker.rb, line 867
# Flag the ctl socket as closing and wake the select loop. Ignored when
# there is no open ctl or the flag is already set.
def set_ctl_closing
  ctl_gone = @ctl.nil? || @ctl.closed?
  return if ctl_gone || @ctl_info[ :closing ]

  @ctl_info.store( :closing, true )
  next_tick
end
set_dst_closing_write( dst ) click to toggle source

set dst closing write

# File lib/girl/relay_worker.rb, line 876
# Mark dst as closing-write and register it for writing so that write_dst
# can act on the flag.
#
# No-op when dst is nil, already closed, or already flagged.
def set_dst_closing_write( dst )
  return if dst.nil?
  return if dst.closed?

  info = @dst_infos[ dst ]

  unless info[ :closing_write ] then
    info[ :closing_write ] = true
    add_write( dst )
  end
end
set_src_closing_write( src ) click to toggle source

Flag src so its write side is closed once its write buffer has been handled, and register it for writing.

# File lib/girl/relay_worker.rb, line 888
# Mark src as closing-write and register it for writing so that write_src
# can act on the flag.
#
# No-op when src is nil, already closed, or already flagged as closing or
# closing-write.
def set_src_closing_write( src )
  return if src.nil?
  return if src.closed?

  info = @src_infos[ src ]
  already_closing = info[ :closing ] || info[ :closing_write ]

  unless already_closing then
    info[ :closing_write ] = true
    add_write( src )
  end
end
set_tun_closing_write( tun ) click to toggle source

Flag tun so its write side is closed once its write buffer has been handled, and register it for writing.

# File lib/girl/relay_worker.rb, line 899
# Mark tun as closing-write and register it for writing so that write_tun
# can act on the flag.
#
# No-op when tun is nil, already closed, or already flagged as closing or
# closing-write.
def set_tun_closing_write( tun )
  return if tun.nil?
  return if tun.closed?

  info = @tun_infos[ tun ]
  already_closing = info[ :closing ] || info[ :closing_write ]

  unless already_closing then
    info[ :closing_write ] = true
    add_write( tun )
  end
end
set_tun_info_pong( tun, src ) click to toggle source

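Mark the tun as having received pong, send the pending proxy reply to src once, and move data buffered on src into the tun write buffer as chunks.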

# File lib/girl/relay_worker.rb, line 910
# Record that tun has received its pong and release what src was waiting for.
#
# 1. Sets :pong on the tun info.
# 2. The first time for this src, answers the client: HTTP_OK for an HTTP
#    request flagged :is_connect, or a SOCKS5 connection reply for :socks5,
#    then sets :pong on the src info so the reply is sent only once.
# 3. Moves everything buffered in src's :rbuff into tun's write buffer, cut
#    into pieces of at most CHUNK_SIZE bytes, each wrapped by pack_a_chunk.
#
# tun - the tunnel socket (key of @tun_infos).
# src - the source socket (key of @src_infos).
def set_tun_info_pong( tun, src )
  tun_info = @tun_infos[ tun ]
  tun_info[ :pong ] = true
  src_info = @src_infos[ src ]

  unless src_info[ :pong ] then
    if src_info[ :proxy_proto ] == :http then
      add_src_wbuff( src, HTTP_OK ) if src_info[ :is_connect ]
    elsif src_info[ :proxy_proto ] == :socks5 then
      add_socks5_conn_reply( src )
    end

    src_info[ :pong ] = true
  end

  rbuff = src_info[ :rbuff ]
  return if rbuff.empty?

  data = +''
  offset = 0

  # Cut by bytes, not characters: chunk size and the advance must use the same
  # unit, otherwise a non-binary buffer with multibyte characters loses data.
  # A running offset also avoids re-copying the remainder on every iteration.
  while offset < rbuff.bytesize do
    chunk_data = rbuff.byteslice( offset, CHUNK_SIZE )
    data << pack_a_chunk( chunk_data )
    offset += chunk_data.bytesize
  end

  # Everything was consumed; leave an empty buffer with the same encoding.
  src_info[ :rbuff ] = rbuff.byteslice( offset..-1 )
  add_tun_wbuff( tun, data )
end
write_dst( dst ) click to toggle source

Write buffered data to dst, or finish a pending close-write when the buffer is empty.

# File lib/girl/relay_worker.rb, line 1299
# Flush as much of dst's write buffer as the socket accepts right now.
#
# Marks dst as connected. When the buffer is already empty, either closes the
# write side (if :closing_write is set) or removes dst from @writes.
# Otherwise writes without blocking and keeps the unwritten remainder.
# A would-block / EINTR is retried on a later call; any other error
# closes dst for writing and its src for reading.
#
# dst - the destination socket (key of @dst_infos).
def write_dst( dst )
  if dst.closed? then
    puts "#{ Time.new } write dst but dst closed?"
    return
  end

  dst_info = @dst_infos[ dst ]
  dst_info[ :connected ] = true
  src = dst_info[ :src ]
  data = dst_info[ :wbuff ]

  # Nothing buffered before writing: handle a pending close-write.
  if data.empty? then
    if dst_info[ :closing_write ] then
      close_write_dst( dst )
    else
      @writes.delete( dst )
    end

    return
  end

  # Write.
  begin
    written = dst.write_nonblock( data )
  rescue IO::WaitWritable, Errno::EINTR
    print 'w'
    return
  rescue StandardError
    # Ordinary I/O failure; signals and exit requests are left to propagate.
    close_write_dst( dst )
    close_read_src( src )
    return
  end

  # write_nonblock reports bytes, so the remainder must be cut by bytes too.
  dst_info[ :wbuff ] = data.byteslice( written..-1 )
end
write_src( src ) click to toggle source

Write buffered data to src, or finish a pending close-write when the buffer is empty.

# File lib/girl/relay_worker.rb, line 1252
# Flush as much of src's write buffer as the socket accepts right now.
#
# When the buffer is already empty, either closes the write side (if
# :closing_write is set) or removes src from @writes. Otherwise writes
# without blocking and keeps the unwritten remainder. A would-block / EINTR
# is retried on a later call; any other error closes src for writing and
# closes the read side of its peer: the dst when there is one, otherwise
# the tun.
#
# src - the source socket (key of @src_infos).
def write_src( src )
  if src.closed? then
    puts "#{ Time.new } write src but src closed?"
    return
  end

  src_info = @src_infos[ src ]
  dst = src_info[ :dst ]
  data = src_info[ :wbuff ]

  # Nothing buffered before writing: handle a pending close-write.
  if data.empty? then
    if src_info[ :closing_write ] then
      close_write_src( src )
    else
      @writes.delete( src )
    end

    return
  end

  # Write.
  begin
    written = src.write_nonblock( data )
  rescue IO::WaitWritable, Errno::EINTR
    print 'w'
    return
  rescue StandardError
    # Ordinary I/O failure; signals and exit requests are left to propagate.
    close_write_src( src )

    if dst then
      close_read_dst( dst )
    else
      close_read_tun( src_info[ :tun ] )
    end

    return
  end

  # write_nonblock reports bytes, so the remainder must be cut by bytes too.
  src_info[ :wbuff ] = data.byteslice( written..-1 )
end
write_tun( tun ) click to toggle source

Write buffered data to tun, or finish a pending close-write when the buffer is empty.

# File lib/girl/relay_worker.rb, line 1341
# Flush as much of tun's write buffer as the socket accepts right now.
#
# When the buffer is already empty, either closes the write side (if
# :closing_write is set) or removes tun from @writes. Otherwise writes
# without blocking and keeps the unwritten remainder. A would-block / EINTR
# is retried on a later call, matching write_dst and write_src; any other
# error closes tun for writing and its src for reading.
#
# tun - the tunnel socket (key of @tun_infos).
def write_tun( tun )
  if tun.closed? then
    puts "#{ Time.new } write tun but tun closed?"
    return
  end

  tun_info = @tun_infos[ tun ]
  src = tun_info[ :src ]
  data = tun_info[ :wbuff ]

  # Nothing buffered before writing: handle a pending close-write.
  if data.empty? then
    if tun_info[ :closing_write ] then
      close_write_tun( tun )
    else
      @writes.delete( tun )
    end

    return
  end

  # Write.
  begin
    written = tun.write_nonblock( data )
  rescue IO::WaitWritable, Errno::EINTR
    # Transient: the socket cannot take data right now, keep the buffer
    # and retry on a later call instead of tearing the tunnel down.
    print 'w'
    return
  rescue StandardError
    # Ordinary I/O failure; signals and exit requests are left to propagate.
    close_write_tun( tun )
    close_read_src( src )
    return
  end

  # write_nonblock reports bytes, so the remainder must be cut by bytes too.
  tun_info[ :wbuff ] = data.byteslice( written..-1 )
end