class Girl::ProxydWorker
Public Class Methods
new( proxyd_port, infod_port, nameserver )
click to toggle source
initialize
# File lib/girl/proxyd_worker.rb, line 7 def initialize( proxyd_port, infod_port, nameserver ) @custom = Girl::ProxydCustom.new @reads = [] @writes = [] @roles = {} # sock => :dotr / :ctld / :ctl / :infod / :dst / :tund / :tun / :dns @ctl_infos = {} # im => { :ctl_addr, :ctld, :tunds } @tund_infos = {} # tund => { :im } @resolv_caches = {} # domain => [ ip, created_at ] @dst_infos = ConcurrentHash.new # dst => { :dst_id, :im, :domain, :rbuff, :tun, :wbuff, :src_id, # :created_at, :connected, :last_add_wbuff_at, :closing_write, :closing, :paused } @tun_infos = ConcurrentHash.new # tun => { :im, :dst, :domain, :rbuff, :wbuff, :created_at, :last_add_wbuff_at, :closing, :paused } @dns_infos = ConcurrentHash.new # dns => { :im, :src_id, :domain, :port, :created_at, :closing } @traffs = ConcurrentHash.new # im => { :in, :out } @nameserver_addr = Socket.sockaddr_in( 53, nameserver ) new_a_pipe new_ctlds( proxyd_port ) new_a_infod( infod_port ) end
Public Instance Methods
looping()
click to toggle source
looping
# File lib/girl/proxyd_worker.rb, line 30 def looping puts "#{ Time.new } looping" loop_check_state loop_check_traff loop do rs, ws = IO.select( @reads, @writes ) rs.each do | sock | role = @roles[ sock ] case role when :dotr then read_dotr( sock ) when :dns then read_dns( sock ) when :ctld then read_ctld( sock ) when :infod then read_infod( sock ) when :dst then read_dst( sock ) when :tund then read_tund( sock ) when :tun then read_tun( sock ) else puts "#{ Time.new } read unknown role #{ role }" close_sock( sock ) end end ws.each do | sock | role = @roles[ sock ] case role when :dst then write_dst( sock ) when :tun then write_tun( sock ) else puts "#{ Time.new } write unknown role #{ role }" close_sock( sock ) end end end rescue Interrupt => e puts e.class quit! end
quit!()
click to toggle source
quit!
# File lib/girl/proxyd_worker.rb, line 84 def quit! # puts "debug exit" exit end
Private Instance Methods
add_dst_rbuff( dst, data )
click to toggle source
add dst rbuff
# File lib/girl/proxyd_worker.rb, line 94 def add_dst_rbuff( dst, data ) return if dst.nil? || dst.closed? dst_info = @dst_infos[ dst ] return if dst_info[ :closing ] dst_info[ :rbuff ] << data if dst_info[ :rbuff ].bytesize >= WBUFF_LIMIT then puts "#{ Time.new } dst rbuff full" close_dst( dst ) end end
add_dst_wbuff( dst, data )
click to toggle source
add dst wbuff
# File lib/girl/proxyd_worker.rb, line 109 def add_dst_wbuff( dst, data ) return if dst.nil? || dst.closed? dst_info = @dst_infos[ dst ] return if dst_info[ :closing ] dst_info[ :wbuff ] << data dst_info[ :last_add_wbuff_at ] = Time.new add_write( dst ) if dst_info[ :wbuff ].bytesize >= WBUFF_LIMIT then tun = dst_info[ :tun ] if tun then tun_info = @tun_infos[ tun ] if tun_info then puts "#{ Time.new } pause tun #{ tun_info[ :im ].inspect } #{ tun_info[ :domain ].inspect }" @reads.delete( tun ) tun_info[ :paused ] = true end end end end
add_read( sock, role = nil )
click to toggle source
add read
# File lib/girl/proxyd_worker.rb, line 135 def add_read( sock, role = nil ) return if sock.nil? || sock.closed? || @reads.include?( sock ) @reads << sock if role then @roles[ sock ] = role end end
add_tun_wbuff( tun, data )
click to toggle source
add tun wbuff
# File lib/girl/proxyd_worker.rb, line 147 def add_tun_wbuff( tun, data ) return if tun.nil? || tun.closed? tun_info = @tun_infos[ tun ] return if tun_info[ :closing ] tun_info[ :wbuff ] << data tun_info[ :last_add_wbuff_at ] = Time.new add_write( tun ) if tun_info[ :wbuff ].bytesize >= WBUFF_LIMIT then dst = tun_info[ :dst ] if dst then dst_info = @dst_infos[ dst ] if dst_info then puts "#{ Time.new } pause dst #{ dst_info[ :im ].inspect } #{ dst_info[ :domain ].inspect }" @reads.delete( dst ) dst_info[ :paused ] = true end end end end
add_write( sock )
click to toggle source
add write
# File lib/girl/proxyd_worker.rb, line 173 def add_write( sock ) return if sock.nil? || sock.closed? || @writes.include?( sock ) @writes << sock end
close_dns( dns )
click to toggle source
close dns
# File lib/girl/proxyd_worker.rb, line 181 def close_dns( dns ) return if dns.nil? || dns.closed? close_sock( dns ) @dns_infos.delete( dns ) end
close_dst( dst )
click to toggle source
close dst
# File lib/girl/proxyd_worker.rb, line 190 def close_dst( dst ) return if dst.nil? || dst.closed? # puts "debug close dst" close_sock( dst ) dst_info = @dst_infos.delete( dst ) if dst_info then close_tun( dst_info[ :tun ] ) end end
close_dsts( im )
click to toggle source
close dsts
# File lib/girl/proxyd_worker.rb, line 204 def close_dsts( im ) @dst_infos.select{ | _, info | info[ :im ] == im }.keys.each{ | dst | close_dst( dst ) } end
close_read_dst( dst )
click to toggle source
close read dst
# File lib/girl/proxyd_worker.rb, line 211 def close_read_dst( dst ) return if dst.nil? || dst.closed? # puts "debug close read dst" dst.close_read @reads.delete( dst ) if dst.closed? then # puts "debug dst closed" @writes.delete( dst ) @roles.delete( dst ) @dst_infos.delete( dst ) end end
close_read_tun( tun )
click to toggle source
close read tun
# File lib/girl/proxyd_worker.rb, line 228 def close_read_tun( tun ) return if tun.nil? || tun.closed? # puts "debug close read tun" tun.close_read @reads.delete( tun ) if tun.closed? then # puts "debug tun closed" @writes.delete( tun ) @roles.delete( tun ) @tun_infos.delete( tun ) end end
close_sock( sock )
click to toggle source
close sock
# File lib/girl/proxyd_worker.rb, line 245 def close_sock( sock ) return if sock.nil? || sock.closed? sock.close @reads.delete( sock ) @writes.delete( sock ) @roles.delete( sock ) end
close_tun( tun )
click to toggle source
close tun
# File lib/girl/proxyd_worker.rb, line 256 def close_tun( tun ) return if tun.nil? || tun.closed? # puts "debug close tun" close_sock( tun ) @tun_infos.delete( tun ) end
close_tund( tund )
click to toggle source
close tund
# File lib/girl/proxyd_worker.rb, line 266 def close_tund( tund ) return if tund.nil? || tund.closed? # puts "debug close tund" close_sock( tund ) @tund_infos.delete( tund ) end
close_write_dst( dst )
click to toggle source
close write dst
# File lib/girl/proxyd_worker.rb, line 276 def close_write_dst( dst ) return if dst.nil? || dst.closed? # puts "debug close write dst" dst.close_write @writes.delete( dst ) if dst.closed? then # puts "debug dst closed" @reads.delete( dst ) @roles.delete( dst ) @dst_infos.delete( dst ) end end
close_write_tun( tun )
click to toggle source
close write tun
# File lib/girl/proxyd_worker.rb, line 293 def close_write_tun( tun ) return if tun.nil? || tun.closed? # puts "debug close write tun" tun.close_write @writes.delete( tun ) if tun.closed? then # puts "debug tun closed" @reads.delete( tun ) @roles.delete( tun ) @tun_infos.delete( tun ) end end
loop_check_state()
click to toggle source
loop check state
# File lib/girl/proxyd_worker.rb, line 310 def loop_check_state Thread.new do loop do sleep CHECK_STATE_INTERVAL now = Time.new @dst_infos.select{ | dst, _ | !dst.closed? }.each do | dst, dst_info | if dst_info[ :connected ] then is_expire = ( now - ( dst_info[ :last_add_wbuff_at ] || dst_info[ :created_at ] ) >= EXPIRE_AFTER ) else is_expire = ( now - dst_info[ :created_at ] >= EXPIRE_CONNECTING ) end if is_expire then puts "#{ Time.new } expire dst #{ dst_info[ :im ].inspect } #{ dst_info[ :domain ].inspect }" dst_info[ :closing ] = true next_tick elsif dst_info[ :paused ] then tun = dst_info[ :tun ] if tun && !tun.closed? then tun_info = @tun_infos[ tun ] if tun_info[ :wbuff ].bytesize < RESUME_BELOW then puts "#{ Time.new } resume dst #{ dst_info[ :im ].inspect } #{ dst_info[ :domain ].inspect }" add_read( dst ) dst_info[ :paused ] = false next_tick end end end end @tun_infos.select{ | tun, info | !tun.closed? }.each do | tun, tun_info | if now - ( tun_info[ :last_add_wbuff_at ] || tun_info[ :created_at ] ) >= EXPIRE_AFTER then puts "#{ Time.new } expire tun #{ tun_info[ :im ].inspect } #{ tun_info[ :domain ].inspect }" tun_info[ :closing ] = true next_tick elsif tun_info[ :paused ] then dst = tun_info[ :dst ] if dst && !dst.closed? then dst_info = @dst_infos[ dst ] if dst_info[ :wbuff ].bytesize < RESUME_BELOW then puts "#{ Time.new } resume tun #{ tun_info[ :im ].inspect } #{ tun_info[ :domain ].inspect }" add_read( tun ) tun_info[ :paused ] = false next_tick end end end end @dns_infos.select{ | dns, info | !dns.closed? && ( now - info[ :created_at ] >= EXPIRE_NEW ) }.values.each do | dns_info | puts "#{ Time.new } expire dns #{ dns_info[ :im ].inspect } #{ dns_info[ :domain ].inspect }" dns_info[ :closing ] = true next_tick end end end end
loop_check_traff()
click to toggle source
loop check traff
# File lib/girl/proxyd_worker.rb, line 376 def loop_check_traff if RESET_TRAFF_DAY > 0 then Thread.new do loop do sleep CHECK_TRAFF_INTERVAL if Time.new.day == RESET_TRAFF_DAY then puts "#{ Time.new } reset traffs" @traffs.each{ | im, info | info[ :in ] = info[ :out ] = 0 } end end end end end
new_a_dst( ipaddr, domain, port, src_id, im, ctld, ctl_addr )
click to toggle source
new a dst
# File lib/girl/proxyd_worker.rb, line 394 def new_a_dst( ipaddr, domain, port, src_id, im, ctld, ctl_addr ) return if ctld.nil? || ctld.closed? begin dst = Socket.new( ipaddr.ipv4? ? Socket::AF_INET : Socket::AF_INET6, Socket::SOCK_STREAM, 0 ) rescue Exception => e puts "#{ Time.new } new a dst #{ im.inspect } #{ domain.inspect } #{ port } #{ e.class }" return end dst.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 ) ip = ipaddr.to_s destination_addr = Socket.sockaddr_in( port, ip ) begin dst.connect_nonblock( destination_addr ) rescue IO::WaitWritable rescue Exception => e puts "#{ Time.new } connect destination #{ im.inspect } #{ domain.inspect } #{ ip } #{ port } #{ e.class }" dst.close return end dst_id = dst.local_address.ip_port @dst_infos[ dst ] = { dst_id: dst_id, # dst_id im: im, # 标识 domain: domain, # 目的地 rbuff: '', # 对应的tun没准备好,暂存读到的流量 tun: nil, # 对应的tun wbuff: '', # 从tun读到的流量 src_id: src_id, # 近端src id created_at: Time.new, # 创建时间 connected: false, # 是否已连接 last_add_wbuff_at: nil, # 上一次加写前的时间 closing_write: false, # 准备关闭写 closing: false, # 准备关闭 paused: false # 已暂停 } add_read( dst, :dst ) add_write( dst ) data = [ PAIRED, src_id, dst_id ].pack( 'CQ>n' ) # puts "debug add ctlmsg paired #{ im.inspect } #{ src_id } #{ dst_id }" send_ctlmsg( ctld, data, ctl_addr ) end
new_a_infod( infod_port )
click to toggle source
new a infod
# File lib/girl/proxyd_worker.rb, line 446 def new_a_infod( infod_port ) infod = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 ) infod.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 ) infod.bind( Socket.sockaddr_in( infod_port, '127.0.0.1' ) ) puts "#{ Time.new } infod bind on #{ infod_port }" add_read( infod, :infod ) end
new_a_pipe()
click to toggle source
new a pipe
# File lib/girl/proxyd_worker.rb, line 457 def new_a_pipe dotr, dotw = IO.pipe @dotw = dotw add_read( dotr, :dotr ) end
new_a_tund()
click to toggle source
new a tund
# File lib/girl/proxyd_worker.rb, line 466 def new_a_tund tund = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 ) tund.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 ) tund.bind( Socket.sockaddr_in( 0, '0.0.0.0' ) ) tund.listen( 127 ) tund end
new_ctlds( begin_port )
click to toggle source
new ctlds
# File lib/girl/proxyd_worker.rb, line 477 def new_ctlds( begin_port ) 10.times do | i | ctld_port = begin_port + i ctld = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 ) ctld.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 ) ctld.bind( Socket.sockaddr_in( ctld_port, '0.0.0.0' ) ) puts "#{ Time.new } ctld bind on #{ ctld_port }" add_read( ctld, :ctld ) end end
next_tick()
click to toggle source
next tick
# File lib/girl/proxyd_worker.rb, line 491 def next_tick @dotw.write( '.' ) end
pack_a_chunk( data )
click to toggle source
pack a chunk
# File lib/girl/proxyd_worker.rb, line 498 def pack_a_chunk( data ) data = @custom.encode( data ) "#{ [ data.bytesize ].pack( 'n' ) }#{ data }" end
read_ctld( ctld )
click to toggle source
read ctld
# File lib/girl/proxyd_worker.rb, line 696 def read_ctld( ctld ) data, addrinfo, rflags, *controls = ctld.recvmsg data = @custom.decode( data ) ctl_num = data[ 0 ].unpack( 'C' ).first ctl_addr = addrinfo.to_sockaddr case ctl_num when HELLO then return if data.bytesize <= 1 im = data[ 1..-1 ] result = @custom.check( im, addrinfo ) if result != :success then puts "#{ Time.new } #{ result } #{ im.inspect } #{ addrinfo.inspect }" return end unless @traffs.include?( im ) then @traffs[ im ] = { in: 0, out: 0 } end ctl_info = @ctl_infos[ im ] if ctl_info then ctl_info[ :ctl_addr ] = ctl_addr ctl_info[ :ctld ] = ctld else tunds = [] 10.times do tund = new_a_tund @tund_infos[ tund ] = { im: im } add_read( tund, :tund ) tunds << tund end ctl_info = { ctl_addr: ctl_addr, # ctl地址 ctld: ctld, # 对应的ctld tunds: tunds, # 对应tunds } @ctl_infos[ im ] = ctl_info end puts "#{ Time.new } got hello #{ addrinfo.ip_unpack.inspect } #{ im.inspect }" print "ctls #{ @ctl_infos.size } tunds #{ @tund_infos.size }" puts " dsts #{ @dst_infos.size } tuns #{ @tun_infos.size } dnses #{ @dns_infos.size }" send_tund_ports( ctl_info ) when A_NEW_SOURCE then return if data.bytesize <= 9 src_id = data[ 1, 8 ].unpack( 'Q>' ).first domain_port, im = data[ 9..-1 ].split( '/' ) if im then ctl_info = @ctl_infos[ im ] unless ctl_info then puts "#{ Time.new } got a new source but unknown im #{ im.inspect }" send_ctlmsg( ctld, [ UNKNOWN_CTL_ADDR ].pack( 'C' ), ctl_addr ) return end ctl_info[ :ctl_addr ] = ctl_addr else im, ctl_info = @ctl_infos.find{ | _, info | info[ :ctl_addr ] == ctl_addr } unless im then puts "#{ Time.new } got a new source but unknown ctl addr" send_ctlmsg( ctld, [ UNKNOWN_CTL_ADDR ].pack( 'C' ), ctl_addr ) return end end ctl_info[ :ctld ] = ctld dst_info = @dst_infos.values.find{ | info | info[ :src_id ] == src_id } if dst_info then # puts "debug dst info exist, send ctlmsg paired #{ src_id } #{ dst_info[ :dst_id ] }" data2 = [ PAIRED, src_id, dst_info[ :dst_id ] ].pack( 'CQ>n' ) send_ctlmsg( ctld, data2, ctl_addr ) return end # puts "debug got a new source #{ src_id } #{ domain_port.inspect } #{ im.inspect }" resolve_domain_port( domain_port, src_id, im, ctl_info[ :ctld ], ctl_info[ :ctl_addr ] ) when CTL_FIN then im, _ = @ctl_infos.find{ | _, info | info[ :ctl_addr ] == ctl_addr } return unless im # puts "debug got ctl fin #{ im.inspect }" close_dsts( im ) ctl_info = @ctl_infos.delete( im ) ctl_info[ :tunds ].each{ | tund | close_tund( tund ) } when SOURCE_CLOSED then return if data.bytesize <= 9 src_id = data[ 1, 8 ].unpack( 'Q>' ).first im = data[ 9..-1 ] return unless @ctl_infos.include?( im ) # puts "debug got src closed #{ src_id } #{ im.inspect }" dst, _ = @dst_infos.find{ | _, info | info[ :src_id ] == src_id } close_dst( dst ) when SOURCE_CLOSED_READ then return if data.bytesize <= 9 src_id = data[ 1, 8 ].unpack( 'Q>' ).first im = data[ 9..-1 ] return unless @ctl_infos.include?( im ) # puts "debug got src closed read #{ src_id } #{ im.inspect }" dst, _ = @dst_infos.find{ | _, info | info[ :src_id ] == src_id } set_dst_closing_write( dst ) when SOURCE_CLOSED_WRITE then return if data.bytesize <= 9 src_id = data[ 1, 8 ].unpack( 'Q>' ).first im = data[ 9..-1 ] return unless @ctl_infos.include?( im ) # puts "debug got src closed write #{ src_id } #{ im.inspect }" dst, _ = @dst_infos.find{ | _, info | info[ :src_id ] == src_id } close_read_dst( dst ) end end
read_dns( dns )
click to toggle source
read dns
# File lib/girl/proxyd_worker.rb, line 646 def read_dns( dns ) if dns.closed? then puts "#{ Time.new } read dns but dns closed?" return end begin data, addrinfo, rflags, *controls = dns.recvmsg rescue Exception => e puts "#{ Time.new } dns recvmsg #{ e.class }" close_dns( dns ) return end # puts "debug recv dns #{ data.inspect }" dns_info = @dns_infos[ dns ] im = dns_info[ :im ] ctl_info = @ctl_infos[ im ] unless ctl_info then puts "#{ Time.new } read dns but ctl already closed #{ im.inspect }" close_dns( dns ) return end begin packet = Net::DNS::Packet::parse( data ) rescue Exception => e puts "#{ Time.new } parse packet #{ e.class }" close_dns( dns ) return end ans = packet.answer.find{ | ans | ans.class == Net::DNS::RR::A } if ans then domain = dns_info[ :domain ] ipaddr = IPAddr.new( ans.value ) @resolv_caches[ domain ] = [ ipaddr, Time.new ] src_id = dns_info[ :src_id ] port = dns_info[ :port ] new_a_dst( ipaddr, domain, port, src_id, im, ctl_info[ :ctld ], ctl_info[ :ctl_addr ] ) end close_dns( dns ) end
read_dotr( dotr )
click to toggle source
read dotr
# File lib/girl/proxyd_worker.rb, line 636 def read_dotr( dotr ) dotr.read_nonblock( READ_SIZE ) @dns_infos.select{ | _, info | info[ :closing ] }.keys.each{ | dns | close_dns( dns ) } @dst_infos.select{ | _, info | info[ :closing ] }.keys.each{ | dst | close_dst( dst ) } @tun_infos.select{ | _, info | info[ :closing ] }.keys.each{ | tun | close_tun( tun ) } end
read_dst( dst )
click to toggle source
read dst
# File lib/girl/proxyd_worker.rb, line 846 def read_dst( dst ) if dst.closed? then puts "#{ Time.new } read dst but dst closed?" return end dst_info = @dst_infos[ dst ] tun = dst_info[ :tun ] begin data = dst.read_nonblock( CHUNK_SIZE ) rescue Exception => e # puts "debug read dst #{ e.class }" close_read_dst( dst ) set_tun_closing_write( tun ) return end @traffs[ dst_info[ :im ] ][ :in ] += data.bytesize if tun && !tun.closed? then add_tun_wbuff( tun, pack_a_chunk( data ) ) else # puts "debug add dst rbuff #{ data.bytesize }" add_dst_rbuff( dst, data ) end end
read_infod( infod )
click to toggle source
read infod
# File lib/girl/proxyd_worker.rb, line 822 def read_infod( infod ) data, addrinfo, rflags, *controls = infod.recvmsg ctl_num = data[ 0 ].unpack( 'C' ).first # puts "debug infod got #{ ctl_num } #{ addrinfo.ip_unpack.inspect }" case ctl_num when TRAFF_INFOS then data2 = [ TRAFF_INFOS ].pack( 'C' ) @traffs.sort.each do | im, info | data2 << [ [ im.bytesize ].pack( 'C' ), im, [ info[ :in ], info[ :out ] ].pack( 'Q>Q>' ) ].join end begin infod.sendmsg_nonblock( data2, 0, addrinfo ) rescue Exception => e puts "#{ Time.new } infod sendmsg to #{ e.class } #{ addrinfo.ip_unpack.inspect }" end end end
read_tun( tun )
click to toggle source
read tun
# File lib/girl/proxyd_worker.rb, line 913 def read_tun( tun ) if tun.closed? then puts "#{ Time.new } read tun but tun closed?" return end tun_info = @tun_infos[ tun ] begin data = tun.read_nonblock( READ_SIZE ) rescue Exception => e # puts "debug read tun #{ e.class }" close_read_tun( tun ) return end # # debug # # let ping timeout # unless @debug_let_ping_timeout then # sleep PING_TIMEOUT + 1 # @debug_let_ping_timeout = true # return # end # # # debug # # let ping out of limit # sleep PING_TIMEOUT + 1 dst = tun_info[ :dst ] unless dst then if data.bytesize < 2 then puts "#{ Time.new } tun ping less than 2?" close_tun( tun ) return end dst_id = data[ 0, 2 ].unpack( 'n' ).first dst, dst_info = @dst_infos.find{ | _, info | info[ :dst_id ] == dst_id } unless dst then # puts "debug dst not found #{ dst_id }" close_tun( tun ) return end tun_info[ :dst ] = dst tun_info[ :domain ] = dst_info[ :domain ] set_dst_info_tun( dst_info, tun ) data = data[ 2..-1 ] if data.empty? then return end end data = "#{ tun_info[ :rbuff ] }#{ data }" loop do if data.bytesize <= 2 then tun_info[ :rbuff ] = data break end len = data[ 0, 2 ].unpack( 'n' ).first if len == 0 then puts "#{ Time.new } zero traffic len?" close_tun( tun ) close_dst( dst ) return end chunk = data[ 2, len ] if chunk.bytesize < len then tun_info[ :rbuff ] = data break end chunk = @custom.decode( chunk ) add_dst_wbuff( dst, chunk ) data = data[ ( 2 + len )..-1 ] end end
read_tund( tund )
click to toggle source
read tund
# File lib/girl/proxyd_worker.rb, line 877 def read_tund( tund ) if tund.closed? then puts "#{ Time.new } read tund but tund closed?" return end tund_info = @tund_infos[ tund ] begin tun, _ = tund.accept_nonblock rescue Exception => e puts "#{ Time.new } tund accept #{ e.class } #{ tund_info[ :im ].inspect }" close_dsts( tund_info[ :im ] ) return end # puts "debug accept a tun" @tun_infos[ tun ] = { im: tund_info[ :im ], # 标识 dst: nil, # 对应dst domain: nil, # 目的地 rbuff: '', # 暂存不满一块的流量 wbuff: '', # 写前 created_at: Time.new, # 创建时间 last_add_wbuff_at: nil, # 上一次加写前的时间 closing: false, # 准备关闭 paused: false # 是否暂停 } add_read( tun, :tun ) end
resolve_domain_port( domain_port, src_id, im, ctld, ctl_addr )
click to toggle source
resolve domain port
# File lib/girl/proxyd_worker.rb, line 506 def resolve_domain_port( domain_port, src_id, im, ctld, ctl_addr ) colon_idx = domain_port.rindex( ':' ) return unless colon_idx domain = domain_port[ 0...colon_idx ] port = domain_port[ ( colon_idx + 1 )..-1 ].to_i resolv_cache = @resolv_caches[ domain ] if resolv_cache then ipaddr, created_at = resolv_cache if Time.new - created_at < RESOLV_CACHE_EXPIRE then # puts "debug #{ domain.inspect } hit resolv cache #{ ipaddr.to_s }" new_a_dst( ipaddr, domain, port, src_id, im, ctld, ctl_addr ) return end # puts "debug expire #{ domain.inspect } resolv cache" @resolv_caches.delete( domain ) end begin ipaddr = IPAddr.new( domain ) if ipaddr.ipv4? || ipaddr.ipv6? then new_a_dst( ipaddr, domain, port, src_id, im, ctld, ctl_addr ) return end rescue Exception => e end begin packet = Net::DNS::Packet.new( domain ) rescue Exception => e puts "#{ Time.new } new packet #{ e.class } #{ im.inspect } #{ domain.inspect }" return end dns = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 ) begin # puts "debug dns query #{ domain.inspect }" dns.sendmsg_nonblock( packet.data, 0, @nameserver_addr ) rescue Exception => e puts "#{ Time.new } dns sendmsg #{ e.class } #{ im.inspect } #{ domain.inspect }" dns.close return end @dns_infos[ dns ] = { im: im, src_id: src_id, domain: domain, port: port, created_at: Time.new, closing: false } add_read( dns, :dns ) end
send_ctlmsg( ctld, data, to_addr )
click to toggle source
send ctlmsg
# File lib/girl/proxyd_worker.rb, line 570 def send_ctlmsg( ctld, data, to_addr ) return if ctld.nil? || ctld.closed? data = @custom.encode( data ) begin ctld.sendmsg_nonblock( data, 0, to_addr ) rescue Exception => e puts "#{ Time.new } ctld sendmsg #{ e.class }" end end
send_tund_ports( ctl_info )
click to toggle source
send tund ports
# File lib/girl/proxyd_worker.rb, line 584 def send_tund_ports( ctl_info ) tund_ports = ctl_info[ :tunds ].map{ | tund | tund.local_address.ip_port } puts "#{ Time.new } send tund ports #{ tund_ports.inspect }" data = [ TUND_PORTS, *tund_ports ].pack( 'Cn*' ) send_ctlmsg( ctl_info[ :ctld ], data, ctl_info[ :ctl_addr ] ) end
set_dst_closing_write( dst )
click to toggle source
set dst closing write
# File lib/girl/proxyd_worker.rb, line 594 def set_dst_closing_write( dst ) return if dst.nil? || dst.closed? dst_info = @dst_infos[ dst ] return if dst_info[ :closing ] || dst_info[ :closing_write ] # puts "debug set dst closing write" dst_info[ :closing_write ] = true add_write( dst ) end
set_dst_info_tun( dst_info, tun )
click to toggle source
set dst info tun
# File lib/girl/proxyd_worker.rb, line 606 def set_dst_info_tun( dst_info, tun ) dst_info[ :tun ] = tun # puts "debug add pong #{ dst_info[ :src_id ] }" data = [ dst_info[ :src_id ] ].pack( 'Q>' ) until dst_info[ :rbuff ].empty? do chunk_data = dst_info[ :rbuff ][ 0, CHUNK_SIZE ] # puts "debug move dst rbuff to tun wbuff #{ chunk_data.bytesize }" data << pack_a_chunk( chunk_data ) dst_info[ :rbuff ] = dst_info[ :rbuff ][ chunk_data.bytesize..-1 ] end add_tun_wbuff( tun, data ) end
set_tun_closing_write( tun )
click to toggle source
set tun closing write
# File lib/girl/proxyd_worker.rb, line 624 def set_tun_closing_write( tun ) return if tun.nil? || tun.closed? tun_info = @tun_infos[ tun ] return if tun_info[ :closing ] || tun_info[ :closing_write ] # puts "debug set tun closing write" tun_info[ :closing_write ] = true add_write( tun ) end
write_dst( dst )
click to toggle source
write dst
# File lib/girl/proxyd_worker.rb, line 1002 def write_dst( dst ) if dst.closed? then puts "#{ Time.new } write dst but dst closed?" return end dst_info = @dst_infos[ dst ] dst_info[ :connected ] = true tun = dst_info[ :tun ] data = dst_info[ :wbuff ] # 写前为空,处理关闭写 if data.empty? then if dst_info[ :closing_write ] then close_write_dst( dst ) else @writes.delete( dst ) end return end # 写入 begin written = dst.write_nonblock( data ) rescue Exception => e # puts "debug write dst #{ e.class }" close_write_dst( dst ) close_read_tun( tun ) return end data = data[ written..-1 ] dst_info[ :wbuff ] = data @traffs[ dst_info[ :im ] ][ :out ] += written end
write_tun( tun )
click to toggle source
write tun
# File lib/girl/proxyd_worker.rb, line 1042 def write_tun( tun ) if tun.closed? then puts "#{ Time.new } write tun but tun closed?" return end tun_info = @tun_infos[ tun ] dst = tun_info[ :dst ] data = tun_info[ :wbuff ] # 写前为空,处理关闭写 if data.empty? then if tun_info[ :closing_write ] then close_write_tun( tun ) else @writes.delete( tun ) end return end # 写入 begin written = tun.write_nonblock( data ) rescue Exception => e # puts "debug write tun #{ e.class }" close_tun( tun ) close_read_dst( dst ) return end data = data[ written..-1 ] tun_info[ :wbuff ] = data @traffs[ tun_info[ :im ] ][ :out ] += written end