class Fluent::Plugin::TailInput::TailWatcher::FIFO
Attributes
buffer[R]
encoding[R]
from_encoding[R]
max_line_size[R]
Public Class Methods
new(from_encoding, encoding, log, max_line_size=nil)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1009
# Sets up an empty line buffer.
#
# from_encoding - encoding the buffer and the end-of-line marker are tagged with
# encoding      - encoding that #convert transcodes to
# log           - logger; stored as-is
# max_line_size - optional limit, in bytes, compared against line sizes in
#                 #read_lines; nil (the default) disables the check
def initialize(from_encoding, encoding, log, max_line_size=nil)
  @log = log
  @max_line_size = max_line_size

  # Encoding configuration; conversion is only needed when the two differ.
  @from_encoding = from_encoding
  @encoding = encoding
  @need_enc = (from_encoding != encoding)

  # The buffer and the end-of-line marker both live in the source encoding
  # so that the marker can be searched for inside the buffer.
  @buffer = ''.force_encoding(from_encoding)
  @eol = "\n".encode(from_encoding).freeze

  # State for a line that exceeded max_line_size and is still being discarded.
  @skip_current_line = false
  @skipping_current_line_bytesize = 0
end
Public Instance Methods
<<(chunk)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1023
# Appends chunk to the internal buffer, treating its bytes as from_encoding.
#
# chunk's encoding tag is changed only for the duration of the append and is
# always restored afterwards, even when the append raises.
#
# Returns chunk.
def <<(chunk)
  # Although "chunk" is most likely transient and String#force_encoding itself
  # does not affect its actual content, it is also probable that "chunk" is
  # a reused buffer, and changing its encoding would cause problems on the
  # caller side.
  #
  # Actually, the caller here is specific: "chunk" comes from IO#readpartial
  # called with a second argument, which that function always returns as its
  # return value.
  #
  # Feeding a string whose encoding attribute is set to any double-byte or
  # quad-byte encoding to IO#readpartial as the second argument results in an
  # assertion failure on Ruby < 2.4.0 for unknown reasons.
  orig_encoding = chunk.encoding
  chunk.force_encoding(from_encoding)
  begin
    @buffer << chunk
  ensure
    # Thus the encoding needs to be reverted back here. Doing it in `ensure`
    # keeps the caller's buffer intact when the append itself fails.
    chunk.force_encoding(orig_encoding)
  end
  chunk
end
convert(s)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1041
# Transcodes s in place from @from_encoding to @encoding when the two differ.
#
# If anything raises, the conversion is retried with invalid and undefined
# sequences replaced instead of raising.
#
# Returns s itself (mutated when a conversion took place).
def convert(s)
  return s unless @need_enc

  s.encode!(@encoding, @from_encoding)
rescue
  s.encode!(@encoding, @from_encoding, invalid: :replace, undef: :replace)
end
read_lines(lines)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1051
# Moves every complete line currently in the buffer into lines.
#
# Each line (including its end-of-line marker) is cut off the front of the
# buffer, converted with #convert and appended to lines. When @max_line_size
# is set, a line whose byte size exceeds it is logged and dropped instead.
# If the unterminated remainder left in the buffer already exceeds the limit,
# it is discarded too, and everything up to the next end-of-line marker is
# treated as the continuation of that over-long line.
#
# lines - collection the converted lines are appended to with <<
#
# Returns true if at least one line was dropped during this call,
# false otherwise.
def read_lines(lines)
  idx = @buffer.index(@eol)
  has_skipped_line = false

  until idx.nil?
    # Using freeze and slice is faster than slice!
    # See https://github.com/fluent/fluentd/pull/2527
    @buffer.freeze
    rbuf = @buffer.slice(0, idx + 1)
    @buffer = @buffer.slice(idx + 1, @buffer.size)
    idx = @buffer.index(@eol)

    # A line is also "long" when it is the tail of a line that is already
    # being discarded, regardless of the size of this last piece.
    is_long_line = @max_line_size && (
      @skip_current_line || rbuf.bytesize > @max_line_size
    )

    if is_long_line
      @log.warn "received line length is longer than #{@max_line_size}"
      # rbuf is thrown away right after, so converting it in place is harmless.
      if @skip_current_line
        @log.debug("The continuing line is finished. Finally discarded data: ") { convert(rbuf).chomp }
      else
        @log.debug("skipped line: ") { convert(rbuf).chomp }
      end
      has_skipped_line = true
      @skip_current_line = false
      @skipping_current_line_bytesize = 0
      next
    end

    lines << convert(rbuf)
  end

  is_long_current_line = @max_line_size && (
    @skip_current_line || @buffer.bytesize > @max_line_size
  )

  if is_long_current_line
    # convert transcodes its argument in place, so hand it a copy: the byte
    # size recorded below must be that of the raw data, whether or not the
    # debug block is evaluated.
    @log.debug(
      "The continuing current line length is longer than #{@max_line_size}." +
      " The received data will be discarded until this line is finished." +
      " Discarded data: "
    ) { convert(@buffer.dup).chomp }
    @skip_current_line = true
    @skipping_current_line_bytesize += @buffer.bytesize
    @buffer.clear
  end

  has_skipped_line
end
reading_bytesize()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1101
# Returns the number of bytes taken in but not yet emitted as lines: the
# discarded byte count while an over-long line is being skipped, otherwise
# the byte size of the buffer.
def reading_bytesize
  @skip_current_line ? @skipping_current_line_bytesize : @buffer.bytesize
end