class Fluent::Plugin::TcpInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_tcp.rb, line 60
def configure(conf)
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  unless parser_config
    raise Fluent::ConfigError, "<parse> section is required."
  end
  super
  @_event_loop_blocking_timeout = @blocking_timeout
  @source_hostname_key ||= @source_host_key if @source_host_key

  @nodes = nil
  if @security
    @nodes = []
    @security.clients.each do |client|
      if client.host && client.network
        raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
      end
      if !client.host && !client.network
        raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
      end
      source = nil
      if client.host
        begin
          source = IPSocket.getaddress(client.host)
        rescue SocketError
          raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
        end
      end
      source_addr = begin
                      IPAddr.new(source || client.network)
                    rescue ArgumentError
                      raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
                    end
      @nodes.push(source_addr)
    end
  end

  @parser = parser_create(conf: parser_config)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_tcp.rb, line 100
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_tcp.rb, line 104
def start
  super

  log.info "listening tcp socket", bind: @bind, port: @port
  del_size = @delimiter.length
  discard_till_next_delimiter = false
  if @_extract_enabled && @_extract_tag_key
    server_create(:in_tcp_server_single_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn|
      unless check_client(conn)
        conn.close
        next
      end

      conn.buffer << data
      buf = conn.buffer
      pos = 0
      while i = buf.index(@delimiter, pos)
        msg = buf[pos...i]
        pos = i + del_size

        if discard_till_next_delimiter
          discard_till_next_delimiter = false
          next
        end

        if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
          log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
          next
        end

        @parser.parse(msg) do |time, record|
          unless time && record
            log.warn "pattern not matched", message: msg
            next
          end

          tag = extract_tag_from_record(record)
          tag ||= @tag
          time ||= extract_time_from_record(record) || Fluent::EventTime.now
          record[@source_address_key] = conn.remote_addr if @source_address_key
          record[@source_hostname_key] = conn.remote_host if @source_hostname_key
          router.emit(tag, time, record)
        end
      end
      buf.slice!(0, pos) if pos > 0
      # If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
      # So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
      if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
        log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
        buf.clear
        # We should discard the subsequent data until the next delimiter comes.
        discard_till_next_delimiter = true
        next
      end
    end
  else
    server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn|
      unless check_client(conn)
        conn.close
        next
      end

      conn.buffer << data
      buf = conn.buffer
      pos = 0
      es = Fluent::MultiEventStream.new
      while i = buf.index(@delimiter, pos)
        msg = buf[pos...i]
        pos = i + del_size

        if discard_till_next_delimiter
          discard_till_next_delimiter = false
          next
        end

        if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
          log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
          next
        end

        @parser.parse(msg) do |time, record|
          unless time && record
            log.warn "pattern not matched", message: msg
            next
          end

          time ||= extract_time_from_record(record) || Fluent::EventTime.now
          record[@source_address_key] = conn.remote_addr if @source_address_key
          record[@source_hostname_key] = conn.remote_host if @source_hostname_key
          es.add(time, record)
        end
      end
      router.emit_stream(@tag, es)
      buf.slice!(0, pos) if pos > 0
      # If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
      # So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
      if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
        log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
        buf.clear
        # We should discard the subsequent data until the next delimiter comes.
        discard_till_next_delimiter = true
        next
      end
    end
  end
end

Private Instance Methods

check_client(conn) click to toggle source
# File lib/fluent/plugin/in_tcp.rb, line 213
def check_client(conn)
  if @nodes
    remote_addr = conn.remote_addr
    node = @nodes.find { |n| n.include?(remote_addr) rescue false }
    unless node
      log.warn "anonymous client '#{remote_addr}' denied"
      return false
    end
  end

  true
end