class Fluent::Plugin::TailInput::TailWatcher::FIFO

Attributes

buffer[R]
encoding[R]
from_encoding[R]
max_line_size[R]

Public Class Methods

new(from_encoding, encoding, log, max_line_size=nil)
# File lib/fluent/plugin/in_tail.rb, line 1009
# Sets up an empty line buffer.
#
# from_encoding:: encoding the buffered bytes are labelled with
# encoding::      encoding that convert transcodes to; conversion is only
#                 flagged as needed when it differs from from_encoding
# log::           logger object, stored as @log
# max_line_size:: byte limit stored as @max_line_size (nil by default)
def initialize(from_encoding, encoding, log, max_line_size=nil)
  @from_encoding = from_encoding
  @encoding = encoding
  @need_enc = from_encoding != encoding
  # String.new yields a fresh mutable string, so relabelling it never touches
  # a string literal (which may be frozen, or "chilled" on newer Rubies).
  @buffer = String.new.force_encoding(from_encoding)
  # The line terminator is expressed in the source encoding so that it can be
  # searched for in the buffer directly.
  @eol = "\n".encode(from_encoding).freeze
  @max_line_size = max_line_size
  @skip_current_line = false
  @skipping_current_line_bytesize = 0
  @log = log
end

Public Instance Methods

<<(chunk)
# File lib/fluent/plugin/in_tail.rb, line 1023
# Appends +chunk+ to the internal buffer, interpreting its bytes as
# from_encoding. The encoding label of +chunk+ is restored before returning,
# and +chunk+ itself is the return value.
def <<(chunk)
  # "chunk" is most likely a transient string, and String#force_encoding only
  # relabels it without altering its content. Still, "chunk" may well be a
  # buffer that the caller reuses, and leaving a different encoding on it
  # could cause problems on the caller side.
  #
  # In practice the caller is specific: "chunk" comes from IO#readpartial
  # called with a second argument, and that method always hands the same
  # string back as its return value.
  #
  # Passing IO#readpartial a second argument whose encoding attribute is a
  # double-byte or quad-byte encoding results in an assertion failure on
  # Ruby < 2.4.0 for unknown reasons, so the original label is put back once
  # the bytes have been appended.
  saved_encoding = chunk.encoding
  @buffer << chunk.force_encoding(from_encoding)
  chunk.force_encoding(saved_encoding)
end
convert(s)
# File lib/fluent/plugin/in_tail.rb, line 1041
# Transcodes +s+ in place from @from_encoding to @encoding and returns it.
#
# When no conversion is needed (@need_enc is false) +s+ is returned as is.
# If the strict conversion raises, the conversion is attempted again with
# invalid and undefined characters replaced.
def convert(s)
  return s unless @need_enc

  begin
    s.encode!(@encoding, @from_encoding)
  rescue
    # Second attempt: substitute whatever cannot be converted.
    s.encode!(@encoding, @from_encoding, :invalid => :replace, :undef => :replace)
  end
end
read_lines(lines)
# File lib/fluent/plugin/in_tail.rb, line 1051
# Moves every complete line held in @buffer into +lines+ (after running it
# through convert) and leaves the unterminated remainder in @buffer.
#
# When @max_line_size is set, a line whose byte size exceeds it is dropped
# instead of being appended, and so is the tail of a line that was already
# being skipped. If the unterminated remainder itself exceeds the limit, it
# is discarded at once and the rest of that line is skipped on later calls;
# the discarded byte count accumulates in @skipping_current_line_bytesize.
#
# Returns true when at least one terminated line was dropped during this
# call, false otherwise.
def read_lines(lines)
  idx = @buffer.index(@eol)
  has_skipped_line = false

  while idx
    # Using freeze and slice is faster than slice!
    # See https://github.com/fluent/fluentd/pull/2527
    @buffer.freeze
    rbuf = @buffer.slice(0, idx + 1)
    @buffer = @buffer.slice(idx + 1, @buffer.size)
    idx = @buffer.index(@eol)

    is_long_line = @max_line_size && (
      @skip_current_line || rbuf.bytesize > @max_line_size
    )

    if is_long_line
      @log.warn "received line length is longer than #{@max_line_size}"
      # rbuf is thrown away afterwards, so letting convert transcode it in
      # place inside the log block is harmless.
      if @skip_current_line
        @log.debug("The continuing line is finished. Finally discarded data: ") { convert(rbuf).chomp }
      else
        @log.debug("skipped line: ") { convert(rbuf).chomp }
      end
      has_skipped_line = true
      @skip_current_line = false
      @skipping_current_line_bytesize = 0
      next
    end

    lines << convert(rbuf)
  end

  is_long_current_line = @max_line_size && (
    @skip_current_line || @buffer.bytesize > @max_line_size
  )

  if is_long_current_line
    # Measure the raw bytes before logging. convert transcodes its argument
    # in place, so it is given a copy here: otherwise the byte count below
    # (and the encoding label of @buffer) would depend on whether the debug
    # block happens to be evaluated.
    discarded_bytesize = @buffer.bytesize
    @log.debug(
      "The continuing current line length is longer than #{@max_line_size}." +
      " The received data will be discarded until this line is finished." +
      " Discarded data: "
    ) { convert(@buffer.dup).chomp }
    @skip_current_line = true
    @skipping_current_line_bytesize += discarded_bytesize
    @buffer.clear
  end

  has_skipped_line
end
reading_bytesize()
# File lib/fluent/plugin/in_tail.rb, line 1101
# Byte size of the data currently being read: the accumulated count of
# discarded bytes while a line is being skipped, otherwise the byte size of
# the buffer.
def reading_bytesize
  @skip_current_line ? @skipping_current_line_bytesize : @buffer.bytesize
end