class LogStash::Inputs::Tcp
Read events over a TCP socket.
Like stdin and file inputs, each event is assumed to be one line of text.
Can either accept connections from clients or connect to a server, depending on `mode`.
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
LogStash::Inputs::Base::new
# File lib/logstash/inputs/tcp.rb, line 57
# Builds the input by handing every constructor argument, unchanged, to the
# superclass initializer.
def initialize(*args)
  # A bare `super` forwards exactly the arguments this method received.
  super
end
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 62
# Prepares the input before it runs: loads the socket, timeout and openssl
# libraries, builds the SSL context when @ssl_enable is set and, in server
# mode, opens the listening socket on @host:@port.
#
# Errno::EADDRINUSE from the bind is logged and then re-raised.
def register
  require "socket"
  require "timeout"
  require "openssl"

  if @ssl_enable
    @ssl_context = OpenSSL::SSL::SSLContext.new
    @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
    @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key), @ssl_key_passphrase)

    if @ssl_verify
      @cert_store = OpenSSL::X509::Store.new
      # Load the system default certificate path to the store
      @cert_store.set_default_paths
      # @ssl_cacert is added as a directory when it is one, otherwise as a file.
      File.directory?(@ssl_cacert) ? @cert_store.add_path(@ssl_cacert) : @cert_store.add_file(@ssl_cacert)
      @ssl_context.cert_store = @cert_store
      # Demand a peer certificate and fail the handshake when none is sent.
      @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
    end
  end # @ssl_enable

  # Only server mode listens; nothing more to set up otherwise.
  return unless server?

  @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
  begin
    @server_socket = TCPServer.new(@host, @port)
  rescue Errno::EADDRINUSE
    @logger.error("Could not start TCP server: Address in use",
                  :host => @host, :port => @port)
    raise
  end

  # With SSL enabled the plain listener is wrapped in an SSL server.
  @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) if @ssl_enable
end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 149
# Entry point for the input: hands output_queue to run_server when the input
# is in server mode and to run_client otherwise.
def run(output_queue)
  return run_server(output_queue) if server?

  run_client(output_queue)
end
run_client(output_queue)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 200
# Client mode: connects to @host:@port (wrapping the connection in SSL when
# @ssl_enable is set), hands the socket to handle_socket, and reconnects once
# handle_socket returns.
#
# output_queue receives the decoded events via handle_socket.
#
# An SSL handshake failure is logged and retried after 5 seconds. Errors from
# TCPSocket.new are not rescued here and propagate to the caller.
def run_client(output_queue)
  @thread = Thread.current
  client_socket = nil
  while true
    client_socket = TCPSocket.new(@host, @port)
    if @ssl_enable
      tcp_socket = client_socket
      client_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, @ssl_context)
      # Closing the SSL socket must also close the TCP connection beneath it;
      # without sync_close the descriptor stays open until garbage collection.
      client_socket.sync_close = true
      begin
        client_socket.connect
      rescue OpenSSL::SSL::SSLError => ssle
        @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
        # The handshake failed: release the TCP connection now, otherwise
        # every retry leaks one socket. Clearing client_socket keeps the
        # ensure clause from touching the half-initialised SSL socket.
        client_socket = nil
        begin
          tcp_socket.close
        rescue IOError
          # already closed
        end
        # NOTE(mrichar1): Hack to prevent hammering peer
        sleep(5)
        next
      end
    end
    client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
    @logger.debug("Opened connection", :client => "#{client_socket.peer}")
    handle_socket(client_socket, client_socket.peer, output_queue, @codec.clone)
  end # loop
ensure
  # client_socket is still nil when the very first connect attempt failed;
  # calling close on it would replace the real error with a NoMethodError.
  if client_socket
    begin
      client_socket.close
    rescue IOError
      # handle_socket normally closed it already
    end
  end
end
run_server(output_queue)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 157
# Server mode: accepts connections on @server_socket in a loop and serves each
# one on its own thread through handle_socket, with a fresh clone of @codec.
#
# output_queue receives the decoded events via handle_socket.
#
# SSL errors raised while accepting are logged and the loop continues. When an
# IOError or LogStash::ShutdownSignal arrives and @interrupted is set, the
# listener is closed, every tracked client thread is sent
# LogStash::ShutdownSignal, and the loop ends. Without @interrupted the error
# is re-raised: a ShutdownSignal is then absorbed by the method-level rescue,
# anything else propagates to the caller.
def run_server(output_queue)
  @thread = Thread.current
  @client_threads = []
  loop do
    begin
      # Forget connection threads that have finished, so the list does not
      # grow for as long as the listener stays up.
      @client_threads.delete_if { |thread| !thread.alive? }
      # Start a new thread for each connection.
      @client_threads << Thread.start(@server_socket.accept) do |s|
        # TODO(sissel): put this block in its own method.
        # monkeypatch a 'peer' method onto the socket.
        s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
        @logger.debug("Accepted connection", :client => s.peer,
                      :server => "#{@host}:#{@port}")
        begin
          handle_socket(s, s.peer, output_queue, @codec.clone)
        rescue Interrupted
          s.close rescue nil
        end
      end # Thread.start
    rescue OpenSSL::SSL::SSLError => ssle
      # NOTE(mrichar1): This doesn't return a useful error message for some reason
      @logger.error("SSL Error", :exception => ssle,
                    :backtrace => ssle.backtrace)
    rescue IOError, LogStash::ShutdownSignal
      if @interrupted
        # Intended shutdown, get out of the loop
        @server_socket.close
        @client_threads.each do |thread|
          thread.raise(LogStash::ShutdownSignal)
        end
        break
      else
        # Else it was a genuine IOError caused by something else, so propagate it up..
        raise
      end
    end
  end # loop
rescue LogStash::ShutdownSignal
  # nothing to do
ensure
  # The shutdown branch above may already have closed the listener; a second
  # close raises IOError on some runtimes and must not turn a clean shutdown
  # into a failure.
  begin
    @server_socket.close
  rescue IOError
    # already closed
  end
end
teardown()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 225
# Asks a running server to stop: marks the input as interrupted and raises
# LogStash::ShutdownSignal in the thread stored in @thread. Does nothing when
# the input is not in server mode.
def teardown
  return unless server?

  # The flag is set before the signal is raised in the recorded thread.
  @interrupted = true
  @thread.raise(LogStash::ShutdownSignal)
end
Private Instance Methods
handle_socket(socket, client_address, output_queue, codec)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 100
# Reads from socket until an error occurs, decoding every chunk with codec and
# pushing the resulting events onto output_queue.
#
# socket         - the connection to read from; always closed on exit.
# client_address - stored in each event's "host" field.
# output_queue   - receives the events via <<.
# codec          - decodes the raw data; flushed on error when it responds to
#                  flush.
#
# Any StandardError (end of stream and read timeouts included) ends the loop:
# the codec is flushed, the error is logged at debug level, and the socket is
# closed.
def handle_socket(socket, client_address, output_queue, codec)
  # Shared by decode and flush: decorate the event, stamp it, enqueue it.
  publish = proc do |event|
    decorate(event)
    event["host"] = client_address
    event["sslsubject"] = socket.peer_cert.subject if @ssl_enable && @ssl_verify
    output_queue << event
  end

  while true
    # NOTE(petef): the timeout only hits after the line is read
    # or socket dies
    # TODO(sissel): Why do we have a timeout here? What's the point?
    buf =
      if @data_timeout == -1
        read(socket)
      else
        Timeout.timeout(@data_timeout) { read(socket) }
      end
    codec.decode(buf, &publish)
  end # loop do
rescue => e
  codec.flush(&publish) if codec.respond_to?(:flush)
  @logger.debug("An error occurred. Closing connection",
                :client => socket.peer, :exception => e)
ensure
  begin
    socket.close
  rescue IOError
    # pass
  end # begin
end
read(socket)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 144
# Pulls the next chunk of raw data off socket with a single sysread call,
# asking for at most 16384 bytes. Errors raised by sysread are not rescued
# here.
def read(socket)
  socket.sysread(16384)
end
server?()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 139
# True when @mode is exactly the string "server"; false for any other value,
# including an unset @mode.
def server?
  @mode == "server"
end