class Girl::ProxyWorker
Public Class Methods
new( redir_port, proxyd_host, proxyd_port, directs, remotes, nameserver, im )
click to toggle source
initialize
# File lib/girl/proxy_worker.rb, line 7
#
# Stores the proxy settings, creates the empty bookkeeping tables, then opens
# the wake-up pipe and the redir listener on redir_port.
def initialize( redir_port, proxyd_host, proxyd_port, directs, remotes, nameserver, im )
  # settings
  @proxyd_host = proxyd_host
  @proxyd_port = proxyd_port
  @directs = directs
  @remotes = remotes
  @nameserver_addr = Socket.sockaddr_in( 53, nameserver )
  @im = im
  @custom = Girl::ProxyCustom.new( im )

  # arrays handed to IO.select, plus the role of every registered socket
  @reads = []
  @writes = []
  @roles = {} # sock => :dotr / :redir / :ctl / :src / :dst / :tun / :dns

  # caches
  @resolv_caches = {}    # domain => [ ip, created_at ]
  @is_direct_caches = {} # ip => true / false

  # src => { :src_id, :addrinfo, :proxy_proto, :proxy_type, :destination_domain, :destination_port,
  #          :is_connect, :rbuff, :dst, :dst_id, :ctl, :tun, :renew_tun_times,
  #          :wbuff, :created_at, :closing_write, :closing, :paused }
  @src_infos = ConcurrentHash.new

  # dst => { :src, :domain, :wbuff, :created_at, :connected, :closing_write, :closing, :paused }
  @dst_infos = ConcurrentHash.new

  # tun => { :src, :domain, :wbuff, :rbuff, :created_at, :pong, :is_ping_timeout, :closing, :paused }
  @tun_infos = ConcurrentHash.new

  # dns => { :domain, :src, :created_at, :closing }
  @dns_infos = ConcurrentHash.new

  @local_addrinfos = Socket.ip_address_list

  new_a_pipe
  new_a_redir( redir_port )
end
Public Instance Methods
looping()
click to toggle source
looping
# File lib/girl/proxy_worker.rb, line 35
#
# Main event loop. Starts the state-check thread, then blocks in IO.select and
# hands every ready socket to the read_* / write_* handler matching its role.
# A socket with no known role is logged and closed. Interrupt ends the loop
# through quit!.
def looping
  puts "#{ Time.new } looping"
  loop_check_state

  # role => handler method name
  read_handlers = {
    dotr: :read_dotr,
    redir: :read_redir,
    dns: :read_dns,
    ctl: :read_ctl,
    src: :read_src,
    dst: :read_dst,
    tun: :read_tun
  }

  write_handlers = {
    src: :write_src,
    dst: :write_dst,
    tun: :write_tun
  }

  loop do
    readables, writables = IO.select( @reads, @writes )

    readables.each do | sock |
      role = @roles[ sock ]
      handler = read_handlers[ role ]

      if handler
        __send__( handler, sock )
      else
        puts "#{ Time.new } read unknown role #{ role }"
        close_sock( sock )
      end
    end

    writables.each do | sock |
      role = @roles[ sock ]
      handler = write_handlers[ role ]

      if handler
        __send__( handler, sock )
      else
        puts "#{ Time.new } write unknown role #{ role }"
        close_sock( sock )
      end
    end
  end
rescue Interrupt => e
  puts e.class
  quit!
end
quit!()
click to toggle source
quit!
# File lib/girl/proxy_worker.rb, line 90
#
# Sends a CTL_FIN ctlmsg (a no-op inside send_ctlmsg when no ctl is open) and
# exits the process.
def quit!
  # puts "debug exit"
  fin = [ CTL_FIN ].pack( 'C' )
  send_ctlmsg( fin )
  exit
end
Private Instance Methods
add_a_new_source( src )
click to toggle source
add a new source
# File lib/girl/proxy_worker.rb, line 101
#
# Announces src to the remote side with an A_NEW_SOURCE ctlmsg carrying the src
# id, the "domain:port" destination and @im. Does nothing while there is no open
# ctl or the tund addresses are not known yet.
def add_a_new_source( src )
  return if @ctl.nil? || @ctl.closed?
  return if @ctl_info[ :tund_addrs ].nil?

  info = @src_infos[ src ]
  src_id = info[ :src_id ]
  domain_port = [ info[ :destination_domain ], info[ :destination_port ] ].join( ':' )
  puts "#{ Time.new } add a new source #{ src_id } #{ domain_port }"

  key = [ A_NEW_SOURCE, src_id ].pack( 'CQ>' )
  add_ctlmsg( key, "#{ domain_port }/#{ @im }" )
end
add_ctlmsg( key, data )
click to toggle source
add ctlmsg
# File lib/girl/proxy_worker.rb, line 115
#
# Sends key + data as one ctlmsg, records key in the resend list and starts the
# resend loop for it. Does nothing when no ctl is open.
def add_ctlmsg( key, data )
  return if @ctl.nil?
  return if @ctl.closed?

  message = "#{ key }#{ data }"
  send_ctlmsg( message )
  @ctl_info[ :resends ] << key
  loop_resend_ctlmsg( key, message )
end
add_dst_wbuff( dst, data )
click to toggle source
add dst wbuff
# File lib/girl/proxy_worker.rb, line 126
#
# Appends data to dst's write buffer and registers dst for writing. Once the
# buffer reaches WBUFF_LIMIT, the paired src is removed from the read set and
# marked paused.
def add_dst_wbuff( dst, data )
  return if dst.nil? || dst.closed?

  dst_info = @dst_infos[ dst ]
  dst_info[ :wbuff ] << data
  add_write( dst )
  return if dst_info[ :wbuff ].bytesize < WBUFF_LIMIT

  src = dst_info[ :src ]
  return unless src && !src.closed?

  src_info = @src_infos[ src ]
  puts "#{ Time.new } pause direct src #{ src_info[ :destination_domain ].inspect }"
  @reads.delete( src )
  src_info[ :paused ] = true
end
add_hello()
click to toggle source
add hello
# File lib/girl/proxy_worker.rb, line 147
#
# Sends the HELLO ctlmsg whose payload is produced by @custom.hello.
def add_hello
  greeting = @custom.hello
  puts "#{ Time.new } hello i'm #{ greeting.inspect }"

  key = [ HELLO ].pack( 'C' )
  add_ctlmsg( key, greeting )
end
add_read( sock, role = nil )
click to toggle source
add read
# File lib/girl/proxy_worker.rb, line 156
#
# Registers sock in the read set and, when role is given, records the role.
# Nil, closed or already registered sockets are ignored (an already registered
# socket keeps its current role).
def add_read( sock, role = nil )
  return if sock.nil?
  return if sock.closed?
  return if @reads.include?( sock )

  @reads << sock
  @roles[ sock ] = role if role
end
add_socks5_conn_reply( src )
click to toggle source
add socks5 conn reply
# File lib/girl/proxy_worker.rb, line 168
#
# Queues the SOCKS5 reply to a CONNECT request on src: version 5, reply code 0,
# address type 1 (IPv4), with the redir listener's own address and port as
# BND.ADDR / BND.PORT.
def add_socks5_conn_reply( src )
  # +----+-----+-------+------+----------+----------+
  # |VER | REP |  RSV  | ATYP | BND.ADDR | BND.PORT |
  # +----+-----+-------+------+----------+----------+
  # | 1  |  1  | X'00' |  1   | Variable |    2     |
  # +----+-----+-------+------+----------+----------+
  bind_ip, bind_port = @redir_local_address.ip_unpack

  header = [ 5, 0, 0, 1 ].pack( 'C4' )
  bnd_addr = IPAddr.new( bind_ip ).hton
  bnd_port = [ bind_port ].pack( 'n' )
  reply = [ header, bnd_addr, bnd_port ].join

  # puts "debug add src.wbuff socks5 conn reply #{ reply.inspect }"
  add_src_wbuff( src, reply )
end
add_src_rbuff( src, data )
click to toggle source
add src rbuff
# File lib/girl/proxy_worker.rb, line 183
#
# Appends data to src's read buffer. The src is closed once the buffer reaches
# WBUFF_LIMIT. Nil, closed or closing srcs are ignored.
def add_src_rbuff( src, data )
  return if src.nil? || src.closed?

  src_info = @src_infos[ src ]
  return if src_info[ :closing ]

  rbuff = src_info[ :rbuff ]
  rbuff << data
  return if rbuff.bytesize < WBUFF_LIMIT

  puts "#{ Time.new } src rbuff full"
  close_src( src )
end
add_src_wbuff( src, data )
click to toggle source
add src wbuff
# File lib/girl/proxy_worker.rb, line 198
#
# Appends data to src's write buffer and registers src for writing. Once the
# buffer reaches WBUFF_LIMIT, the peer feeding it is removed from the read set
# and marked paused: the dst when src has one, otherwise the tun.
def add_src_wbuff( src, data )
  return if src.nil? || src.closed?

  src_info = @src_infos[ src ]
  return if src_info[ :closing ]

  src_info[ :wbuff ] << data
  add_write( src )
  return if src_info[ :wbuff ].bytesize < WBUFF_LIMIT

  dst = src_info[ :dst ]

  if dst
    dst_info = @dst_infos[ dst ]
    return unless dst_info

    puts "#{ Time.new } pause dst #{ dst_info[ :domain ].inspect }"
    @reads.delete( dst )
    dst_info[ :paused ] = true
    return
  end

  tun = src_info[ :tun ]
  return unless tun

  tun_info = @tun_infos[ tun ]
  return unless tun_info

  puts "#{ Time.new } pause tun #{ tun_info[ :domain ].inspect }"
  @reads.delete( tun )
  tun_info[ :paused ] = true
end
add_tun_wbuff( tun, data )
click to toggle source
add tun wbuff
# File lib/girl/proxy_worker.rb, line 235
#
# Appends data to tun's write buffer and registers tun for writing. Once the
# buffer reaches WBUFF_LIMIT, the paired src is removed from the read set and
# marked paused. Nil, closed or closing tuns are ignored.
def add_tun_wbuff( tun, data )
  return if tun.nil? || tun.closed?

  tun_info = @tun_infos[ tun ]
  return if tun_info[ :closing ]

  tun_info[ :wbuff ] << data
  add_write( tun )
  return if tun_info[ :wbuff ].bytesize < WBUFF_LIMIT

  src = tun_info[ :src ]
  return unless src && !src.closed?

  src_info = @src_infos[ src ]
  puts "#{ Time.new } pause remote src #{ src_info[ :destination_domain ].inspect }"
  @reads.delete( src )
  src_info[ :paused ] = true
end
add_write( sock )
click to toggle source
add write
# File lib/girl/proxy_worker.rb, line 257
#
# Registers sock in the write set. Nil, closed or already registered sockets
# are ignored.
def add_write( sock )
  return if sock.nil?
  return if sock.closed?
  return if @writes.include?( sock )

  @writes << sock
end
close_ctl( ctl )
click to toggle source
close ctl
# File lib/girl/proxy_worker.rb, line 265
#
# Closes the ctl socket and unregisters it; nil or closed sockets are ignored.
def close_ctl( ctl )
  return if ctl.nil?
  return if ctl.closed?

  close_sock( ctl )
end
close_dns( dns )
click to toggle source
close dns
# File lib/girl/proxy_worker.rb, line 273
#
# Closes a dns socket and drops its entry from @dns_infos; nil or closed
# sockets are ignored.
def close_dns( dns )
  return if dns.nil?
  return if dns.closed?

  close_sock( dns )
  @dns_infos.delete( dns )
end
close_dst( dst )
click to toggle source
close dst
# File lib/girl/proxy_worker.rb, line 282
#
# Closes dst, drops its info and then closes the src paired with it.
def close_dst( dst )
  return if dst.nil? || dst.closed?

  close_sock( dst )
  dst_info = @dst_infos.delete( dst )
  close_src( dst_info[ :src ] ) if dst_info
end
close_read_dst( dst )
click to toggle source
close read dst
# File lib/girl/proxy_worker.rb, line 295
#
# Closes the read half of dst and removes it from the read set. If that left
# the socket fully closed, every remaining trace of it is dropped as well.
def close_read_dst( dst )
  return if dst.nil? || dst.closed?

  # puts "debug close read dst"
  dst.close_read
  @reads.delete( dst )
  return unless dst.closed?

  # puts "debug dst closed"
  @writes.delete( dst )
  @roles.delete( dst )
  @dst_infos.delete( dst )
end
close_read_src( src )
click to toggle source
close read src
# File lib/girl/proxy_worker.rb, line 312
#
# Closes the read half of src and removes it from the read set. When src has a
# tun, the remote side is told through send_src_closed_read. If the socket ended
# up fully closed, every remaining trace of it is dropped as well.
def close_read_src( src )
  return if src.nil? || src.closed?

  # puts "debug close read src"
  src.close_read
  @reads.delete( src )

  src_info = @src_infos[ src ]
  send_src_closed_read( src_info[ :src_id ] ) if src_info[ :tun ]
  return unless src.closed?

  # puts "debug src closed"
  @writes.delete( src )
  @roles.delete( src )
  @src_infos.delete( src )
end
close_read_tun( tun )
click to toggle source
close read tun
# File lib/girl/proxy_worker.rb, line 334
#
# Closes the read half of tun and removes it from the read set. If that left
# the socket fully closed, every remaining trace of it is dropped as well.
def close_read_tun( tun )
  return if tun.nil? || tun.closed?

  # puts "debug close read tun"
  tun.close_read
  @reads.delete( tun )
  return unless tun.closed?

  # puts "debug tun closed"
  @writes.delete( tun )
  @roles.delete( tun )
  @tun_infos.delete( tun )
end
close_sock( sock )
click to toggle source
close sock
# File lib/girl/proxy_worker.rb, line 351
#
# Closes sock and removes it from the read set, the write set and the role
# table. Nil or already closed sockets are ignored.
def close_sock( sock )
  return if sock.nil?
  return if sock.closed?

  sock.close
  @reads.delete( sock )
  @writes.delete( sock )
  @roles.delete( sock )
end
close_src( src )
click to toggle source
close src
# File lib/girl/proxy_worker.rb, line 362
#
# Closes src and drops its info. A src that has a dst gets the dst closed too;
# otherwise a src that has a tun gets the tun closed and the remote side is
# told through send_src_closed.
def close_src( src )
  return if src.nil? || src.closed?

  # puts "debug close src"
  close_sock( src )
  src_info = @src_infos.delete( src )
  return unless src_info

  dst = src_info[ :dst ]
  return close_dst( dst ) if dst

  tun = src_info[ :tun ]
  return unless tun

  close_tun( tun )
  send_src_closed( src_info[ :src_id ] )
end
close_tun( tun )
click to toggle source
close tun
# File lib/girl/proxy_worker.rb, line 383
#
# Closes tun and removes its entry from @tun_infos. Returns the removed info,
# or nil when tun was nil or already closed.
def close_tun( tun )
  return if tun.nil?
  return if tun.closed?

  # puts "debug close tun"
  close_sock( tun )
  @tun_infos.delete( tun )
end
close_write_dst( dst )
click to toggle source
close write dst
# File lib/girl/proxy_worker.rb, line 393
#
# Closes the write half of dst and removes it from the write set. If that left
# the socket fully closed, every remaining trace of it is dropped as well.
def close_write_dst( dst )
  return if dst.nil? || dst.closed?

  # puts "debug close write dst"
  dst.close_write
  @writes.delete( dst )
  return unless dst.closed?

  # puts "debug dst closed"
  @reads.delete( dst )
  @roles.delete( dst )
  @dst_infos.delete( dst )
end
close_write_src( src )
click to toggle source
close write src
# File lib/girl/proxy_worker.rb, line 410
#
# Closes the write half of src and removes it from the write set. When src has
# a tun, the remote side is told through send_src_closed_write. If the socket
# ended up fully closed, every remaining trace of it is dropped as well.
def close_write_src( src )
  return if src.nil? || src.closed?

  # puts "debug close write src"
  src.close_write
  @writes.delete( src )

  src_info = @src_infos[ src ]
  send_src_closed_write( src_info[ :src_id ] ) if src_info[ :tun ]
  return unless src.closed?

  # puts "debug src closed"
  @reads.delete( src )
  @roles.delete( src )
  @src_infos.delete( src )
end
close_write_tun( tun )
click to toggle source
close write tun
# File lib/girl/proxy_worker.rb, line 432
#
# Closes the write half of tun and removes it from the write set. If that left
# the socket fully closed, every remaining trace of it is dropped as well.
def close_write_tun( tun )
  return if tun.nil? || tun.closed?

  # puts "debug close write tun"
  tun.close_write
  @writes.delete( tun )
  return unless tun.closed?

  # puts "debug tun closed"
  @reads.delete( tun )
  @roles.delete( tun )
  @tun_infos.delete( tun )
end
loop_check_state()
click to toggle source
loop check state
# File lib/girl/proxy_worker.rb, line 449
#
# Starts the background thread that, every CHECK_STATE_INTERVAL seconds:
#
# * resumes paused srcs / dsts / tuns once the write buffer they feed has
#   dropped below RESUME_BELOW
# * flags dsts that have not connected within EXPIRE_CONNECTING as closing
# * flags tuns with no pong within PING_TIMEOUT as ping-timed-out
# * flags dns sockets older than EXPIRE_NEW as closing
#
# The thread only sets flags and re-registers reads; it calls next_tick so the
# select loop wakes up and acts on them.
#
# Every peer-info lookup is nil-checked: the select loop may already have
# closed and removed a peer, and an unguarded lookup would raise NoMethodError
# and silently end this thread.
#
# Returns the Thread.
def loop_check_state
  Thread.new do
    loop do
      sleep CHECK_STATE_INTERVAL
      now = Time.new

      @src_infos.select{ | src, info | !src.closed? && info[ :paused ] }.each do | src, src_info |
        dst = src_info[ :dst ]

        if dst
          dst_info = @dst_infos[ dst ]

          if dst_info && dst_info[ :wbuff ].bytesize < RESUME_BELOW
            puts "#{ Time.new } resume direct src #{ src_info[ :destination_domain ].inspect }"
            add_read( src )
            src_info[ :paused ] = false
            next_tick
          end
        else
          tun = src_info[ :tun ]
          tun_info = ( tun && !tun.closed? ) ? @tun_infos[ tun ] : nil

          if tun_info && tun_info[ :wbuff ].bytesize < RESUME_BELOW
            puts "#{ Time.new } resume remote src #{ src_info[ :destination_domain ].inspect }"
            add_read( src )
            src_info[ :paused ] = false
            next_tick
          end
        end
      end

      @dst_infos.select{ | dst, _ | !dst.closed? }.each do | dst, dst_info |
        if !dst_info[ :connected ] && ( now - dst_info[ :created_at ] >= EXPIRE_CONNECTING )
          puts "#{ Time.new } expire dst #{ dst_info[ :domain ].inspect }"
          dst_info[ :closing ] = true
          next_tick
        elsif dst_info[ :paused ]
          src = dst_info[ :src ]
          src_info = ( src && !src.closed? ) ? @src_infos[ src ] : nil

          if src_info && src_info[ :wbuff ].bytesize < RESUME_BELOW
            puts "#{ Time.new } resume dst #{ dst_info[ :domain ].inspect }"
            add_read( dst )
            dst_info[ :paused ] = false
            next_tick
          end
        end
      end

      @tun_infos.select{ | tun, _ | !tun.closed? }.each do | tun, tun_info |
        if tun_info[ :paused ]
          src = tun_info[ :src ]
          src_info = ( src && !src.closed? ) ? @src_infos[ src ] : nil

          if src_info && src_info[ :wbuff ].bytesize < RESUME_BELOW
            puts "#{ Time.new } resume tun #{ tun_info[ :domain ].inspect }"
            add_read( tun )
            tun_info[ :paused ] = false
            next_tick
          end
        elsif !tun_info[ :pong ] && ( now - tun_info[ :created_at ] >= PING_TIMEOUT )
          puts "#{ Time.new } ping timeout #{ tun_info[ :domain ].inspect }"
          tun_info[ :is_ping_timeout ] = true
          next_tick
        end
      end

      expired_dnses = @dns_infos.select{ | dns, info | !dns.closed? && ( now - info[ :created_at ] >= EXPIRE_NEW ) }

      expired_dnses.values.each do | dns_info |
        puts "#{ Time.new } expire dns #{ dns_info[ :domain ].inspect }"
        dns_info[ :closing ] = true
        next_tick
      end
    end
  end
end
loop_resend_ctlmsg( key, ctlmsg )
click to toggle source
loop resend ctlmsg
# File lib/girl/proxy_worker.rb, line 537
#
# Starts a thread that re-sends ctlmsg every RESEND_INTERVAL seconds, up to
# RESEND_LIMIT times, for as long as the ctl is open and key is still in the
# resend list. If every attempt was used without the key being removed, the ctl
# is flagged as closing.
def loop_resend_ctlmsg( key, ctlmsg )
  Thread.new do
    # all? stops at the first round in which nothing is left to resend
    exhausted = RESEND_LIMIT.times.all? do
      sleep RESEND_INTERVAL
      pending = @ctl && !@ctl.closed? && @ctl_info[ :resends ].include?( key )

      if pending
        puts "#{ Time.new } resend #{ ctlmsg.inspect }"
        send_ctlmsg( ctlmsg )
      end

      pending
    end

    set_ctl_closing if exhausted
  end
end
new_a_ctl()
click to toggle source
new a ctl
# File lib/girl/proxy_worker.rb, line 563
#
# Opens a fresh UDP ctl socket aimed at one of ten consecutive ports starting at
# @proxyd_port (picked at random), resets @ctl_info, registers the socket for
# reading and sends the hello.
def new_a_ctl
  sock = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )
  port_offset = ( 0...10 ).to_a.sample
  ctld_port = @proxyd_port + port_offset
  ctld_addr = Socket.sockaddr_in( ctld_port, @proxyd_host )

  @ctl = sock
  @ctl_info = {
    ctld_addr: ctld_addr, # ctld address
    resends: [],          # keys awaiting resend
    tund_addrs: nil,      # tund addresses
    closing: false        # about to close
  }

  add_read( sock, :ctl )
  puts "#{ Time.new } new a ctl #{ ctld_port }"
  puts "srcs #{ @src_infos.size } dsts #{ @dst_infos.size } tuns #{ @tun_infos.size } dnses #{ @dns_infos.size }"
  add_hello
end
new_a_dst( ipaddr, src )
click to toggle source
new a dst
# File lib/girl/proxy_worker.rb, line 585
#
# Opens a direct TCP connection (non-blocking connect) to ipaddr on the
# destination port recorded for src, pairs the new dst with src and sends the
# client its proxy-level reply:
#
# * http CONNECT     -> HTTP_OK is queued for src
# * http non-CONNECT -> the buffered request is moved to the dst write buffer
# * socks5           -> the socks5 connect reply is queued for src
#
# When the socket cannot be created or the connect fails immediately, src is
# closed. Only StandardError is rescued, so Interrupt / SystemExit raised while
# connecting are no longer mistaken for a connect failure.
def new_a_dst( ipaddr, src )
  return if src.nil? || src.closed?

  src_info = @src_infos[ src ]
  domain = src_info[ :destination_domain ]
  port = src_info[ :destination_port ]
  ip = ipaddr.to_s
  destination_addr = Socket.sockaddr_in( port, ip )

  begin
    dst = Socket.new( ipaddr.ipv4? ? Socket::AF_INET : Socket::AF_INET6, Socket::SOCK_STREAM, 0 )
  rescue StandardError => e
    puts "#{ Time.new } new a dst #{ e.class } #{ domain.inspect } #{ ip } #{ port }"
    close_src( src )
    return
  end

  dst.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )

  begin
    dst.connect_nonblock( destination_addr )
  rescue IO::WaitWritable
    # connect in progress: completion is reported through the write set
  rescue StandardError => e
    puts "#{ Time.new } dst connect destination #{ e.class } #{ domain.inspect } #{ ip } #{ port }"
    dst.close
    close_src( src )
    return
  end

  # puts "debug a new dst #{ dst.local_address.inspect }"
  dst_info = {
    src: src,             # the paired src
    domain: domain,       # destination
    wbuff: '',            # data waiting to be written
    created_at: Time.new, # creation time
    connected: false,     # whether the connection is established
    closing_write: false, # about to close the write half
    closing: false,       # about to close
    paused: false         # whether reading is paused
  }

  @dst_infos[ dst ] = dst_info
  src_info[ :proxy_type ] = :direct
  src_info[ :dst ] = dst

  if src_info[ :proxy_proto ] == :http
    if src_info[ :is_connect ]
      # puts "debug add src wbuff http ok"
      add_src_wbuff( src, HTTP_OK )
    elsif src_info[ :rbuff ]
      # puts "debug move src rbuff to dst wbuff"
      dst_info[ :wbuff ] << src_info[ :rbuff ]
    end
  elsif src_info[ :proxy_proto ] == :socks5
    add_socks5_conn_reply( src )
  end

  add_read( dst, :dst )
  add_write( dst )
end
new_a_pipe()
click to toggle source
new a pipe
# File lib/girl/proxy_worker.rb, line 648
#
# Creates the wake-up pipe: the read end is registered in the read set with
# role :dotr, the write end is kept in @dotw for next_tick.
def new_a_pipe
  reader, writer = IO.pipe
  @dotw = writer
  add_read( reader, :dotr )
end
new_a_redir( redir_port )
click to toggle source
new a redir
# File lib/girl/proxy_worker.rb, line 657
#
# Creates the TCP listener that accepts client connections on 0.0.0.0:redir_port
# and registers it with role :redir. The port and the listener's local address
# are remembered in @redir_port / @redir_local_address.
def new_a_redir( redir_port )
  listener = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  listener.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )
  listener.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 )
  listener.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 ) if RUBY_PLATFORM.include?( 'linux' )

  bind_addr = Socket.sockaddr_in( redir_port, '0.0.0.0' )
  listener.bind( bind_addr )
  listener.listen( 127 )
  puts "#{ Time.new } redir listen on #{ redir_port }"

  add_read( listener, :redir )
  @redir_port = redir_port
  @redir_local_address = listener.local_address
end
new_a_remote( src )
click to toggle source
new a remote
# File lib/girl/proxy_worker.rb, line 677
#
# Routes src through the remote side: opens a ctl first when none is open,
# marks src as :remote and announces it.
def new_a_remote( src )
  return if src.nil? || src.closed?

  new_a_ctl if @ctl.nil? || @ctl.closed?

  @src_infos[ src ][ :proxy_type ] = :remote
  add_a_new_source( src )
end
new_a_tun( src )
click to toggle source
new a tun
# File lib/girl/proxy_worker.rb, line 692
#
# Opens a TCP tunnel (non-blocking connect) for src to a randomly chosen tund
# address. The tun's write buffer starts with the packed dst id, and the tun is
# registered for both reading and writing.
#
# Returns without doing anything when src is nil or closed, the tund addresses
# are unknown, or src has no dst id yet. When the connect fails immediately the
# tun is closed and src is left untouched. Only StandardError is rescued, so
# Interrupt / SystemExit raised while connecting are no longer mistaken for a
# connect failure.
def new_a_tun( src )
  return if src.nil? || src.closed? || @ctl_info[ :tund_addrs ].nil?

  src_info = @src_infos[ src ]
  dst_id = src_info[ :dst_id ]
  return unless dst_id

  tund_addr = @ctl_info[ :tund_addrs ].sample
  # puts "debug new a tun #{ Addrinfo.new( tund_addr ).inspect }"

  tun = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  tun.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )

  begin
    tun.connect_nonblock( tund_addr )
  rescue IO::WaitWritable
    # connect in progress: completion is reported through the write set
  rescue StandardError => e
    puts "#{ Time.new } connect tund #{ e.class }"
    tun.close
    return
  end

  domain = src_info[ :destination_domain ]

  @tun_infos[ tun ] = {
    src: src,                      # the paired src
    domain: domain,                # destination
    rbuff: '',                     # holds traffic that does not yet make a full chunk
    wbuff: [ dst_id ].pack( 'n' ), # data waiting to be written
    created_at: Time.new,          # creation time
    pong: false,                   # whether a reply has been received
    is_ping_timeout: false,        # ping timed out
    closing: false,                # about to close
    paused: false                  # whether reading is paused
  }

  src_info[ :tun ] = tun
  add_read( tun, :tun )
  add_write( tun )
end
new_a_tunnel( ipaddr, src )
click to toggle source
new a tunnel
# File lib/girl/proxy_worker.rb, line 735
#
# Decides how src reaches ipaddr:
#
# * a local address on the redir port would loop back into this proxy, so the
#   src is closed
# * the proxyd host on a port other than 80 / 443 is connected to directly
# * otherwise the ip is checked against @directs (result cached per ip):
#   a hit goes direct, a miss goes remote
def new_a_tunnel( ipaddr, src )
  return if src.nil? || src.closed?

  src_info = @src_infos[ src ]
  ip = ipaddr.to_s
  port = src_info[ :destination_port ]
  is_local_ip = @local_addrinfos.any?{ | addrinfo | addrinfo.ip_address == ip }

  if is_local_ip && ( port == @redir_port )
    puts "#{ Time.new } ignore #{ ip }:#{ port }"
    close_src( src )
    return
  end

  if ( src_info[ :destination_domain ] == @proxyd_host ) && ![ 80, 443 ].include?( port )
    # reaching the remote host on a port other than 80/443: connect directly
    puts "#{ Time.new } direct #{ ip } #{ port }"
    new_a_dst( ipaddr, src )
    return
  end

  is_direct = @is_direct_caches.fetch( ip ) do
    matched = @directs.any?{ | direct | direct.include?( ip ) }
    puts "#{ Time.new } cache is direct #{ src_info[ :destination_domain ] } #{ ip } #{ matched }"
    @is_direct_caches[ ip ] = matched
  end

  if is_direct
    # puts "debug hit directs #{ ip }"
    new_a_dst( ipaddr, src )
  else
    # puts "debug go remote #{ ip }"
    new_a_remote( src )
  end
end
next_tick()
click to toggle source
next tick
# File lib/girl/proxy_worker.rb, line 774
#
# Writes a single '.' into the wake-up pipe so that the pipe's read end becomes
# readable for the select loop.
def next_tick
  dot = '.'
  @dotw.write( dot )
end
pack_a_chunk( data )
click to toggle source
pack a chunk
# File lib/girl/proxy_worker.rb, line 781
#
# Encodes data with @custom and prefixes the result with its byte length as a
# 16-bit big-endian integer.
def pack_a_chunk( data )
  encoded = @custom.encode( data )
  length_prefix = [ encoded.bytesize ].pack( 'n' )
  "#{ length_prefix }#{ encoded }"
end
read_ctl( ctl )
click to toggle source
read ctl
# File lib/girl/proxy_worker.rb, line 1103
#
# Handles one datagram received on the ctl socket. The first byte of the
# decoded payload selects the action:
#
# * TUND_PORTS       (21 bytes: ctl number + ten 16-bit ports) stores the tund
#                    addresses, stops resending HELLO and announces every
#                    :remote src that has no dst id yet
# * PAIRED           (11 bytes: ctl number + 64-bit src id + 16-bit dst id)
#                    records the dst id on the matching src, stops resending its
#                    A_NEW_SOURCE and opens a tun for it
# * UNKNOWN_CTL_ADDR closes this ctl and opens a new one
#
# Messages of the wrong size are ignored. An empty datagram is ignored too;
# previously it raised NoMethodError on data[ 0 ]. A failing recvmsg closes the
# ctl; only StandardError is rescued so Interrupt / SystemExit still propagate.
def read_ctl( ctl )
  begin
    data, _addrinfo, _rflags, *_controls = ctl.recvmsg
  rescue StandardError => e
    puts "#{ Time.new } ctl recvmsg #{ e.class }"
    close_ctl( ctl )
    return
  end

  data = @custom.decode( data )
  return if data.nil? || data.empty?

  ctl_num = data[ 0 ].unpack( 'C' ).first

  case ctl_num
  when TUND_PORTS
    return if data.bytesize != 21

    tund_ports = data[ 1, 20 ].unpack( 'n*' )
    puts "#{ Time.new } got tund ports #{ tund_ports.inspect }"
    @ctl_info[ :resends ].delete( [ HELLO ].pack( 'C' ) )
    @ctl_info[ :tund_addrs ] = tund_ports.map{ | tund_port | Socket.sockaddr_in( tund_port, @proxyd_host ) }

    waiting = @src_infos.select{ | _, info | ( info[ :proxy_type ] == :remote ) && !info[ :dst_id ] }
    waiting.keys.each{ | src | add_a_new_source( src ) }
  when PAIRED
    return if data.bytesize != 11 || @ctl_info[ :tund_addrs ].nil?

    src_id, dst_id = data[ 1, 10 ].unpack( 'Q>n' )
    src, src_info = @src_infos.find{ | _, info | ( info[ :src_id ] == src_id ) && info[ :dst_id ].nil? }
    return if src.nil? || src.closed?

    # puts "debug got paired #{ src_id } #{ dst_id }"
    src_info[ :dst_id ] = dst_id
    @ctl_info[ :resends ].delete( [ A_NEW_SOURCE, src_id ].pack( 'CQ>' ) )
    new_a_tun( src )
  when UNKNOWN_CTL_ADDR
    puts "#{ Time.new } got unknown ctl addr"
    close_ctl( ctl )
    new_a_ctl
  end
end
read_dns( dns )
click to toggle source
read dns
# File lib/girl/proxy_worker.rb, line 1060
#
# Handles the reply to a DNS query. The first A record found in the answer is
# cached for the queried domain and handed to new_a_tunnel for the waiting src;
# with no A record the src is closed. The dns socket is closed in every case.
#
# A failing recvmsg or an unparsable packet closes only the dns socket. Only
# StandardError is rescued so Interrupt / SystemExit still propagate.
def read_dns( dns )
  if dns.closed?
    puts "#{ Time.new } read dns but dns closed?"
    return
  end

  begin
    data, _addrinfo, _rflags, *_controls = dns.recvmsg
  rescue StandardError => e
    puts "#{ Time.new } dns recvmsg #{ e.class }"
    close_dns( dns )
    return
  end

  # puts "debug recv dns #{ data.inspect }"
  begin
    packet = Net::DNS::Packet::parse( data )
  rescue StandardError => e
    puts "#{ Time.new } parse packet #{ e.class }"
    close_dns( dns )
    return
  end

  dns_info = @dns_infos[ dns ]
  src = dns_info[ :src ]
  domain = dns_info[ :domain ]
  a_record = packet.answer.find{ | rr | rr.class == Net::DNS::RR::A }

  if a_record
    ipaddr = IPAddr.new( a_record.value )
    @resolv_caches[ domain ] = [ ipaddr, Time.new ]
    new_a_tunnel( ipaddr, src )
  else
    puts "#{ Time.new } dns query no answer #{ domain.inspect }"
    close_src( src )
  end

  close_dns( dns )
end
read_dotr( dotr )
click to toggle source
read dotr
# File lib/girl/proxy_worker.rb, line 1005
#
# Runs when the wake-up pipe becomes readable. Drains the pipe, then acts on
# the flags set elsewhere: closes dns sockets and dsts flagged closing, sends
# CTL_FIN and closes a ctl flagged closing, and renews ping-timed-out tuns.
def read_dotr( dotr )
  dotr.read_nonblock( READ_SIZE )

  closing_dnses = @dns_infos.select{ | _, info | info[ :closing ] }.keys
  closing_dnses.each{ | dns | close_dns( dns ) }

  ctl_is_closing = @ctl && !@ctl.closed? && @ctl_info[ :closing ]

  if ctl_is_closing
    send_ctlmsg( [ CTL_FIN ].pack( 'C' ) )
    close_ctl( @ctl )
  end

  closing_dsts = @dst_infos.select{ | _, info | info[ :closing ] }.keys
  closing_dsts.each{ | dst | close_dst( dst ) }

  timed_out_tuns = @tun_infos.select{ | _, info | info[ :is_ping_timeout ] }.keys
  timed_out_tuns.each{ | tun | renew_a_tun( tun ) }
end
read_dst( dst )
click to toggle source
read dst
# File lib/girl/proxy_worker.rb, line 1338
#
# Reads up to CHUNK_SIZE bytes from dst and queues them for the paired src.
# When the read fails (including end of stream) the read half of dst is closed
# and src is flagged to close its write half.
#
# Only StandardError is rescued, so Interrupt / SystemExit raised during the
# read are no longer mistaken for a read failure.
def read_dst( dst )
  if dst.closed?
    puts "#{ Time.new } read dst but dst closed?"
    return
  end

  dst_info = @dst_infos[ dst ]
  src = dst_info[ :src ]

  begin
    data = dst.read_nonblock( CHUNK_SIZE )
  rescue StandardError
    # puts "debug read dst #{ e.class }"
    close_read_dst( dst )
    set_src_closing_write( src )
    return
  end

  add_src_wbuff( src, data )
end
read_redir( redir )
click to toggle source
read redir
# File lib/girl/proxy_worker.rb, line 1021
#
# Accepts one client connection on the redir listener, gives it a random
# non-zero 64-bit src id, records its initial state and registers it for
# reading with role :src.
def read_redir( redir )
  begin
    src, addrinfo = redir.accept_nonblock
  rescue IO::WaitReadable, Errno::EINTR => e
    puts "accept #{ e.class }"
    return
  end

  src_id = rand( ( 2 ** 64 ) - 2 ) + 1
  # puts "debug accept a src #{ src_id } #{ addrinfo.ip_unpack.inspect }"

  src_info = {
    src_id: src_id,           # src id
    addrinfo: addrinfo,       # addrinfo
    proxy_proto: :uncheck,    # :uncheck / :http / :socks5
    proxy_type: :uncheck,     # :uncheck / :checking / :direct / :remote / :negotiation
    destination_domain: nil,  # destination domain
    destination_port: nil,    # destination port
    is_connect: true,         # when the proxy protocol is http: whether the request is a CONNECT
    rbuff: '',                # traffic read from src
    dst: nil,                 # when :direct: the paired dst
    dst_id: nil,              # when :remote: the remote dst id
    ctl: nil,                 # when :remote: the paired ctl
    tun: nil,                 # when :remote: the paired tun
    renew_tun_times: 0,       # when :remote: how many times the tun was rebuilt
    pong: false,              # when :remote: the connection has been confirmed
    wbuff: '',                # traffic read from dst/tun
    created_at: Time.new,     # creation time
    closing_write: false,     # about to close the write half
    closing: false,           # about to close
    paused: false             # whether reading is paused
  }

  @src_infos[ src ] = src_info
  add_read( src, :src )
end
read_src( src )
click to toggle source
read src
# File lib/girl/proxy_worker.rb, line 1143
#
# Reads up to CHUNK_SIZE bytes from a client socket and handles them according
# to the src's proxy_type:
#
# * :uncheck     first bytes of the connection: detect HTTP CONNECT, SOCKS5 or
#                plain HTTP (see read_src_uncheck)
# * :checking    destination still being resolved: buffer the data
# * :negotiation SOCKS5 request expected (see read_src_negotiation)
# * :remote      forward as a chunk to the tun, or buffer while there is no tun
# * :direct      forward to the dst, or buffer while there is no dst
#
# When the read fails (including end of stream) the read half of src is closed
# and the paired dst / tun is flagged to close its write half. Only
# StandardError is rescued, so Interrupt / SystemExit raised during the read
# are no longer mistaken for a read failure.
def read_src( src )
  if src.closed?
    puts "#{ Time.new } read src but src closed?"
    return
  end

  src_info = @src_infos[ src ]

  begin
    data = src.read_nonblock( CHUNK_SIZE )
  rescue StandardError
    # puts "debug read src #{ e.class }"
    close_read_src( src )
    dst = src_info[ :dst ]

    if dst
      set_dst_closing_write( dst )
    else
      set_tun_closing_write( src_info[ :tun ] )
    end

    return
  end

  case src_info[ :proxy_type ]
  when :uncheck
    read_src_uncheck( src, src_info, data )
  when :checking
    # puts "debug add src rbuff before resolved #{ data.inspect }"
    src_info[ :rbuff ] << data
  when :negotiation
    read_src_negotiation( src, src_info, data )
  when :remote
    tun = src_info[ :tun ]

    if tun
      add_tun_wbuff( tun, pack_a_chunk( data ) )
    else
      # puts "debug add src rbuff #{ data.bytesize }"
      add_src_rbuff( src, data )
    end
  when :direct
    dst = src_info[ :dst ]

    if dst
      add_dst_wbuff( dst, data )
    else
      # puts "debug add src.rbuff #{ data.bytesize }"
      add_src_rbuff( src, data )
    end
  end
end

# Handles the first data of a connection: answers a SOCKS5 greeting, or
# extracts "domain:port" from an HTTP CONNECT / plain HTTP request and starts
# resolving the domain.
def read_src_uncheck( src, src_info, data )
  if data[ 0, 7 ] == 'CONNECT'
    # puts "debug CONNECT"
    domain_port = data.split( "\r\n" )[ 0 ].split( ' ' )[ 1 ]

    unless domain_port
      puts "#{ Time.new } CONNECT miss domain"
      close_src( src )
      return
    end
  elsif data[ 0 ].unpack( 'C' ).first == 5
    # puts "debug socks5 #{ data.inspect }"

    # https://tools.ietf.org/html/rfc1928
    #
    # +----+----------+----------+
    # |VER | NMETHODS | METHODS  |
    # +----+----------+----------+
    # | 1  |    1     | 1 to 255 |
    # +----+----------+----------+

    # without the NMETHODS byte there is nothing to parse
    if data.bytesize < 2
      puts "#{ Time.new } socks5 greeting too short"
      close_src( src )
      return
    end

    nmethods = data[ 1 ].unpack( 'C' ).first
    methods = data[ 2, nmethods ].unpack( 'C*' )

    unless methods.include?( 0 )
      puts "#{ Time.new } miss method 00"
      close_src( src )
      return
    end

    # +----+--------+
    # |VER | METHOD |
    # +----+--------+
    # | 1  |   1    |
    # +----+--------+
    add_src_wbuff( src, [ 5, 0 ].pack( 'CC' ) )
    src_info[ :proxy_proto ] = :socks5
    src_info[ :proxy_type ] = :negotiation
    return
  else
    # puts "debug not CONNECT #{ data.inspect }"
    lines = data.split( "\r\n" )
    host_line = lines.find{ | line | line[ 0, 6 ] == 'Host: ' }

    unless host_line
      # puts "debug not found host line"
      close_src( src )
      return
    end

    # lines is not empty here: host_line was found in it
    _method, url, proto = lines.first.split( ' ' )

    if proto && url && proto[ 0, 4 ] == 'HTTP' && url[ 0, 7 ] == 'http://'
      domain_port = url.split( '/' )[ 2 ]
      # puts "debug domain port #{ domain_port }"
    end

    unless domain_port
      # puts "debug not HTTP"
      domain_port = host_line.split( ' ' )[ 1 ]

      unless domain_port
        puts "#{ Time.new } Host line miss domain"
        close_src( src )
        return
      end
    end

    src_info[ :is_connect ] = false
    src_info[ :rbuff ] << data
  end

  # a ':' after the last ']' separates the port; otherwise there is no port
  colon_idx = domain_port.rindex( ':' )
  close_idx = domain_port.rindex( ']' )

  if colon_idx && ( close_idx.nil? || ( colon_idx > close_idx ) )
    domain = domain_port[ 0...colon_idx ]
    port = domain_port[ ( colon_idx + 1 )..-1 ].to_i
  else
    domain = domain_port
    port = 80
  end

  domain = domain.gsub( /\[|\]/, '' )
  src_info[ :proxy_proto ] = :http
  src_info[ :destination_domain ] = domain
  src_info[ :destination_port ] = port
  resolve_domain( domain, src )
end

# Handles a SOCKS5 request: only CONNECT (cmd 1) with an IPv4 address (atyp 1)
# or a domain name (atyp 3) is supported; anything else closes the src.
# A request too short to hold the fields being read closes the src as well.
def read_src_negotiation( src, src_info, data )
  # +----+-----+-------+------+----------+----------+
  # |VER | CMD |  RSV  | ATYP | DST.ADDR | DST.PORT |
  # +----+-----+-------+------+----------+----------+
  # | 1  |  1  | X'00' |  1   | Variable |    2     |
  # +----+-----+-------+------+----------+----------+
  # puts "debug negotiation #{ data.inspect }"
  _ver, cmd, _rsv, atyp = data[ 0, 4 ].unpack( 'C4' )

  if cmd != 1
    puts "#{ Time.new } socks5 cmd #{ cmd } not implement"
    close_src( src )
    return
  end

  # puts "debug socks5 CONNECT"
  if atyp == 1
    # 4 header bytes + 4 address bytes + 2 port bytes
    if data.bytesize < 10
      puts "#{ Time.new } socks5 request too short"
      close_src( src )
      return
    end

    destination_host, destination_port = data[ 4, 6 ].unpack( 'Nn' )
    destination_addr = Socket.sockaddr_in( destination_port, destination_host )

    begin
      destination_addrinfo = Addrinfo.new( destination_addr )
    rescue StandardError => e
      puts "#{ Time.new } new addrinfo #{ e.class }"
      close_src( src )
      return
    end

    destination_ip = destination_addrinfo.ip_address
    src_info[ :destination_domain ] = destination_ip
    src_info[ :destination_port ] = destination_port
    # puts "debug IP V4 address #{ destination_ip } #{ destination_port }"
    new_a_tunnel( IPAddr.new( destination_ip ), src )
  elsif atyp == 3
    # the length byte of the domain must be present
    if data.bytesize < 5
      puts "#{ Time.new } socks5 request too short"
      close_src( src )
      return
    end

    domain_len = data[ 4 ].unpack( 'C' ).first

    if ( domain_len + 7 ) == data.bytesize
      domain = data[ 5, domain_len ]
      port = data[ ( 5 + domain_len ), 2 ].unpack( 'n' ).first
      src_info[ :destination_domain ] = domain
      src_info[ :destination_port ] = port
      # puts "debug DOMAINNAME #{ domain } #{ port }"
      resolve_domain( domain, src )
    end
  else
    puts "#{ Time.new } socks5 atyp #{ atyp } not implement"
    close_src( src )
  end
end
read_tun( tun )
click to toggle source
read tun
# File lib/girl/proxy_worker.rb, line 1362
#
# Reads up to READ_SIZE bytes from tun and forwards the decoded chunks to the
# paired src.
#
# Until the tun has been confirmed, the first 8 bytes must be the paired src's
# id (the pong); a short or wrong pong closes the tun. After that the stream is
# a sequence of chunks, each a 16-bit big-endian length followed by that many
# bytes. An incomplete tail is kept in the tun's rbuff for the next read. A
# zero length closes both tun and src.
#
# When the read fails (including end of stream) the read half of tun is closed
# and src is flagged to close its write half. Only StandardError is rescued, so
# Interrupt / SystemExit raised during the read are no longer mistaken for a
# read failure.
def read_tun( tun )
  if tun.closed?
    puts "#{ Time.new } read tun but tun closed?"
    return
  end

  tun_info = @tun_infos[ tun ]
  src = tun_info[ :src ]

  if src.closed?
    close_tun( tun )
    return
  end

  begin
    data = tun.read_nonblock( READ_SIZE )
  rescue StandardError
    # puts "debug read tun #{ e.class }"
    close_read_tun( tun )
    set_src_closing_write( src )
    return
  end

  unless tun_info[ :pong ]
    if data.bytesize < 8
      puts "#{ Time.new } pong length less than 8?"
      close_tun( tun )
      return
    end

    src_info = @src_infos[ src ]

    if data[ 0, 8 ].unpack( 'Q>' ).first != src_info[ :src_id ]
      puts "#{ Time.new } invalid pong?"
      close_tun( tun )
      return
    end

    # puts "debug got pong #{ data.bytesize }"
    set_tun_info_pong( tun, src )
    data = data[ 8..-1 ]
    return if data.empty?
  end

  # prepend whatever was left over from the previous read
  data = "#{ tun_info[ :rbuff ] }#{ data }"

  loop do
    # not enough bytes for a length prefix plus at least one byte of payload
    if data.bytesize <= 2
      tun_info[ :rbuff ] = data
      break
    end

    len = data[ 0, 2 ].unpack( 'n' ).first

    if len == 0
      puts "#{ Time.new } zero traffic len?"
      close_tun( tun )
      close_src( src )
      return
    end

    chunk = data[ 2, len ]

    # chunk not complete yet: wait for more data
    if chunk.bytesize < len
      tun_info[ :rbuff ] = data
      break
    end

    chunk = @custom.decode( chunk )
    add_src_wbuff( src, chunk )
    data = data[ ( 2 + len )..-1 ]
  end
end
renew_a_tun( tun )
click to toggle source
renew a tun
# File lib/girl/proxy_worker.rb, line 789
#
# Replaces a tun: closes it and opens a new one for the same src, counting the
# attempt. Once the src has used up RENEW_TUN_LIMIT renewals it is closed
# instead.
#
# close_tun returns nil for a nil or already closed tun; that case is now
# skipped instead of raising NoMethodError on the missing info.
def renew_a_tun( tun )
  tun_info = close_tun( tun )
  return unless tun_info

  src = tun_info[ :src ]
  return if src.nil? || src.closed?

  src_info = @src_infos[ src ]

  if src_info[ :renew_tun_times ] >= RENEW_TUN_LIMIT
    puts "#{ Time.new } renew a tun out of limit #{ src_info[ :destination_domain ].inspect } #{ src_info[ :destination_port ] }"
    close_src( src )
  else
    new_a_tun( src )
    src_info[ :renew_tun_times ] += 1
  end
end
resolve_domain( domain, src )
click to toggle source
resolve domain
# File lib/girl/proxy_worker.rb, line 807
#
# Finds out where domain lives and continues the connection setup for src:
#
# 1. a domain ending with one of @remotes goes remote without resolving
# 2. a resolv cache entry younger than RESOLV_CACHE_EXPIRE is reused
# 3. 'localhost' is replaced by the client's own address
# 4. an IP literal is used as is
# 5. otherwise a DNS query is sent to the nameserver and src is marked
#    :checking until read_dns gets the answer
#
# Only the IP-literal parse is rescued in step 4. The empty rescue used to wrap
# new_a_tunnel as well, so any error raised there was hidden and followed by a
# pointless DNS query for the IP literal. The remaining rescues cover
# StandardError only, so Interrupt / SystemExit still propagate.
def resolve_domain( domain, src )
  return if src.nil? || src.closed?

  if @remotes.any?{ | remote | ( domain.size >= remote.size ) && ( domain[ ( remote.size * -1 )..-1 ] == remote ) }
    # puts "debug hit remotes #{ domain }"
    new_a_remote( src )
    return
  end

  resolv_cache = @resolv_caches[ domain ]

  if resolv_cache
    ipaddr, created_at = resolv_cache

    if Time.new - created_at < RESOLV_CACHE_EXPIRE
      # puts "debug hit resolv cache #{ domain } #{ ipaddr.to_s }"
      new_a_tunnel( ipaddr, src )
      return
    end

    # puts "debug expire resolv cache #{ domain }"
    @resolv_caches.delete( domain )
  end

  src_info = @src_infos[ src ]

  if domain == 'localhost'
    ip = src_info[ :addrinfo ].ip_address
    # puts "debug redirect #{ domain } #{ ip }"
    new_a_tunnel( IPAddr.new( ip ), src )
    return
  end

  literal = begin
    IPAddr.new( domain )
  rescue StandardError
    nil # not an IP literal: resolve it through DNS below
  end

  if literal && ( literal.ipv4? || literal.ipv6? )
    new_a_tunnel( literal, src )
    return
  end

  begin
    packet = Net::DNS::Packet.new( domain )
  rescue StandardError => e
    puts "#{ Time.new } new packet #{ e.class } #{ domain.inspect }"
    close_src( src )
    return
  end

  dns = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )

  begin
    # puts "debug dns query #{ domain }"
    dns.sendmsg_nonblock( packet.data, 0, @nameserver_addr )
  rescue StandardError => e
    puts "#{ Time.new } dns sendmsg #{ e.class }"
    dns.close
    close_src( src )
    return
  end

  @dns_infos[ dns ] = {
    domain: domain,
    src: src,
    created_at: Time.new,
    closing: false
  }

  add_read( dns, :dns )
  src_info[ :proxy_type ] = :checking
end
send_ctlmsg( data )
click to toggle source
send ctlmsg
# File lib/girl/proxy_worker.rb, line 885
#
# Encodes data with @custom and sends it to the ctld address over the ctl
# socket. A failing send flags the ctl as closing. Does nothing when no ctl is
# open.
#
# Only StandardError is rescued, so Interrupt / SystemExit raised during the
# send are no longer mistaken for a send failure.
def send_ctlmsg( data )
  return if @ctl.nil? || @ctl.closed?

  data = @custom.encode( data )

  begin
    @ctl.sendmsg_nonblock( data, 0, @ctl_info[ :ctld_addr ] )
  rescue StandardError => e
    puts "#{ Time.new } ctl sendmsg #{ e.class }"
    set_ctl_closing
  end
end
send_src_closed( src_id )
click to toggle source
send src closed
# File lib/girl/proxy_worker.rb, line 900
#
# Tells the remote side that the src with this id is closed: sends
# SOURCE_CLOSED + src id followed by @im. Does nothing when no ctl is open.
def send_src_closed( src_id )
  return if @ctl.nil?
  return if @ctl.closed?

  header = [ SOURCE_CLOSED, src_id ].pack( 'CQ>' )
  send_ctlmsg( "#{ header }#{ @im }" )
end
send_src_closed_read( src_id )
click to toggle source
send src closed read
# File lib/girl/proxy_worker.rb, line 909
#
# Tells the remote side that the src with this id closed its read half: sends
# SOURCE_CLOSED_READ + src id followed by @im. Does nothing when no ctl is open.
def send_src_closed_read( src_id )
  return if @ctl.nil?
  return if @ctl.closed?

  header = [ SOURCE_CLOSED_READ, src_id ].pack( 'CQ>' )
  send_ctlmsg( "#{ header }#{ @im }" )
end
send_src_closed_write( src_id )
click to toggle source
send src closed write
# File lib/girl/proxy_worker.rb, line 918
#
# Send a SOURCE_CLOSED_WRITE control message for +src_id+.
# The message is SOURCE_CLOSED_WRITE and +src_id+ packed with 'CQ>' (an 8-bit
# unsigned integer followed by a big-endian 64-bit unsigned integer), followed
# by @im. Nothing is sent when @ctl is nil or closed.
def send_src_closed_write( src_id )
  ctl_unusable = @ctl.nil? || @ctl.closed?

  unless ctl_unusable then
    header = [ SOURCE_CLOSED_WRITE, src_id ].pack( 'CQ>' )
    send_ctlmsg( "#{ header }#{ @im }" )
  end
end
set_ctl_closing()
click to toggle source
set ctl closing
# File lib/girl/proxy_worker.rb, line 927
#
# Flag the ctl socket as closing and call next_tick.
# Does nothing when @ctl is nil, already closed, or already flagged
# through @ctl_info[ :closing ].
def set_ctl_closing
  return if @ctl.nil? || @ctl.closed?
  return if @ctl_info[ :closing ]

  @ctl_info[ :closing ] = true
  next_tick
end
set_dst_closing_write( dst )
click to toggle source
set dst closing write
# File lib/girl/proxy_worker.rb, line 936
#
# Flag +dst+ with :closing_write and register it through add_write.
# Does nothing when +dst+ is nil, closed, or already flagged :closing_write.
# NOTE(review): unlike set_src_closing_write / set_tun_closing_write this does
# not look at :closing - confirm whether that difference is intended.
def set_dst_closing_write( dst )
  return if dst.nil? || dst.closed?

  info = @dst_infos[ dst ]

  unless info[ :closing_write ] then
    info[ :closing_write ] = true
    add_write( dst )
  end
end
set_src_closing_write( src )
click to toggle source
set src closing write
# File lib/girl/proxy_worker.rb, line 948
#
# Flag +src+ with :closing_write and register it through add_write.
# Does nothing when +src+ is nil, closed, or already flagged with
# :closing or :closing_write.
def set_src_closing_write( src )
  return if src.nil? || src.closed?

  info = @src_infos[ src ]
  already_flagged = info[ :closing ] || info[ :closing_write ]

  unless already_flagged then
    info[ :closing_write ] = true
    add_write( src )
  end
end
set_tun_closing_write( tun )
click to toggle source
set tun closing write
# File lib/girl/proxy_worker.rb, line 959
#
# Flag +tun+ with :closing_write and register it through add_write.
# Does nothing when +tun+ is nil, closed, or already flagged with
# :closing or :closing_write.
def set_tun_closing_write( tun )
  return if tun.nil? || tun.closed?

  info = @tun_infos[ tun ]
  already_flagged = info[ :closing ] || info[ :closing_write ]

  unless already_flagged then
    info[ :closing_write ] = true
    add_write( tun )
  end
end
set_tun_info_pong( tun, src )
click to toggle source
set tun info pong
# File lib/girl/proxy_worker.rb, line 970
#
# Record that +tun+ received its pong, answer +src+ once, and flush whatever
# +src+ has buffered so far into +tun+.
#
# * sets :pong on the tun info
# * the first time only (src info :pong not yet set): for an :http source that
#   is a CONNECT request, queues HTTP_OK on src; for a :socks5 source, calls
#   add_socks5_conn_reply; then sets :pong on the src info
# * when the src :rbuff is not empty, cuts it into pieces of at most CHUNK_SIZE
#   bytes, passes each piece through pack_a_chunk and hands the concatenation
#   to add_tun_wbuff; :rbuff ends up empty
#
# The buffer is cut and advanced with byteslice so both operations count bytes.
# Cutting with String#[] counts characters while advancing by bytesize counts
# bytes, which drops data whenever :rbuff is not binary-encoded.
def set_tun_info_pong( tun, src )
  tun_info = @tun_infos[ tun ]
  tun_info[ :pong ] = true

  src_info = @src_infos[ src ]

  unless src_info[ :pong ] then
    if src_info[ :proxy_proto ] == :http then
      if src_info[ :is_connect ] then
        # puts "debug add src wbuff http ok"
        add_src_wbuff( src, HTTP_OK )
      end
    elsif src_info[ :proxy_proto ] == :socks5 then
      add_socks5_conn_reply( src )
    end

    src_info[ :pong ] = true
  end

  unless src_info[ :rbuff ].empty? then
    data = ''

    until src_info[ :rbuff ].empty? do
      # puts "debug move src rbuff to tun wbuff #{ src_info[ :rbuff ].bytesize }"
      chunk_data = src_info[ :rbuff ].byteslice( 0, CHUNK_SIZE )
      data << pack_a_chunk( chunk_data )
      src_info[ :rbuff ] = src_info[ :rbuff ].byteslice( chunk_data.bytesize..-1 )
    end

    add_tun_wbuff( tun, data )
  end
end
write_dst( dst )
click to toggle source
write dst
# File lib/girl/proxy_worker.rb, line 1486
#
# Flush as much of dst's :wbuff as the socket accepts right now.
#
# * logs and returns when +dst+ is already closed
# * marks the dst info :connected
# * with an empty buffer: calls close_write_dst when :closing_write is set,
#   otherwise removes +dst+ from @writes
# * otherwise writes without blocking and keeps the unwritten remainder in :wbuff
#
# Error handling:
# * IO::WaitWritable / Errno::EINTR are transient: the buffer is left untouched
#   and nothing is removed from @writes here, so the write is attempted again
#   the next time the select loop reports +dst+ writable
# * any other StandardError closes the write side of +dst+ and the read side
#   of its src
# * Interrupt / SystemExit are not rescued and propagate to the main loop
def write_dst( dst )
  if dst.closed? then
    puts "#{ Time.new } write dst but dst closed?"
    return
  end

  dst_info = @dst_infos[ dst ]
  dst_info[ :connected ] = true
  src = dst_info[ :src ]
  data = dst_info[ :wbuff ]

  # nothing buffered: handle a pending close-write
  if data.empty? then
    if dst_info[ :closing_write ] then
      close_write_dst( dst )
    else
      @writes.delete( dst )
    end

    return
  end

  # write
  begin
    written = dst.write_nonblock( data )
  rescue IO::WaitWritable, Errno::EINTR
    return
  rescue StandardError
    # puts "debug write dst #{ e.class }"
    close_write_dst( dst )
    close_read_src( src )
    return
  end

  # written is a byte count, so the remainder is taken by bytes as well
  dst_info[ :wbuff ] = data.byteslice( written..-1 )
end
write_src( src )
click to toggle source
write src
# File lib/girl/proxy_worker.rb, line 1442
#
# Flush as much of src's :wbuff as the socket accepts right now.
#
# * logs and returns when +src+ is already closed
# * with an empty buffer: calls close_write_src when :closing_write is set,
#   otherwise removes +src+ from @writes
# * otherwise writes without blocking and keeps the unwritten remainder in :wbuff
#
# Error handling:
# * IO::WaitWritable / Errno::EINTR are transient: the buffer is left untouched
#   and nothing is removed from @writes here, so the write is attempted again
#   the next time the select loop reports +src+ writable
# * any other StandardError closes the write side of +src+ and the read side
#   of its peer: the src info's :dst when present, otherwise its :tun
# * Interrupt / SystemExit are not rescued and propagate to the main loop
def write_src( src )
  if src.closed? then
    puts "#{ Time.new } write src but src closed?"
    return
  end

  src_info = @src_infos[ src ]
  dst = src_info[ :dst ]
  data = src_info[ :wbuff ]

  # nothing buffered: handle a pending close-write
  if data.empty? then
    if src_info[ :closing_write ] then
      close_write_src( src )
    else
      @writes.delete( src )
    end

    return
  end

  # write
  begin
    written = src.write_nonblock( data )
  rescue IO::WaitWritable, Errno::EINTR
    return
  rescue StandardError
    # puts "debug write src #{ e.class }"
    close_write_src( src )

    if dst then
      close_read_dst( dst )
    else
      close_read_tun( src_info[ :tun ] )
    end

    return
  end

  # written is a byte count, so the remainder is taken by bytes as well
  src_info[ :wbuff ] = data.byteslice( written..-1 )
end
write_tun( tun )
click to toggle source
write tun
# File lib/girl/proxy_worker.rb, line 1525
#
# Flush as much of tun's :wbuff as the socket accepts right now.
#
# * logs and returns when +tun+ is already closed
# * with an empty buffer: calls close_write_tun when :closing_write is set,
#   otherwise removes +tun+ from @writes
# * otherwise writes without blocking and keeps the unwritten remainder in :wbuff
#
# Error handling:
# * IO::WaitWritable / Errno::EINTR are transient: the buffer is left untouched
#   and nothing is removed from @writes here, so the write is attempted again
#   the next time the select loop reports +tun+ writable
# * any other StandardError closes the write side of +tun+ and the read side
#   of its src
# * Interrupt / SystemExit are not rescued and propagate to the main loop
def write_tun( tun )
  if tun.closed? then
    puts "#{ Time.new } write tun but tun closed?"
    return
  end

  tun_info = @tun_infos[ tun ]
  src = tun_info[ :src ]
  data = tun_info[ :wbuff ]

  # nothing buffered: handle a pending close-write
  if data.empty? then
    if tun_info[ :closing_write ] then
      close_write_tun( tun )
    else
      @writes.delete( tun )
    end

    return
  end

  # write
  begin
    written = tun.write_nonblock( data )
  rescue IO::WaitWritable, Errno::EINTR
    return
  rescue StandardError
    # puts "debug write tun #{ e.class }"
    close_write_tun( tun )
    close_read_src( src )
    return
  end

  # puts "debug write tun #{ written }"
  # written is a byte count, so the remainder is taken by bytes as well
  tun_info[ :wbuff ] = data.byteslice( written..-1 )
end