class Fluent::Plugin::TailInput
Constants
- Entry
- MetricsInfo
- RESERVED_CHARS
- TargetInfo
Attributes
Public Class Methods
Fluent::Plugin::TailInput::GroupWatch::new
# File lib/fluent/plugin/in_tail.rb, line 51
# Sets up the plugin's bookkeeping state after delegating to the parent
# initializer. Every container starts empty, every handle starts as nil, and
# the plugin is flagged as being in its startup phase.
def initialize
  super

  # Containers filled in later by configure / refresh_watchers.
  @paths = []
  @ignore_list = []
  @tails = {}
  @tails_rotate_wait = {}

  # Position-file handles; assigned in #start when a pos_file is configured.
  @pf = nil
  @pf_file = nil

  @metrics = nil
  @shutdown_start_time = nil

  # Cleared at the end of the first refresh_watchers run.
  @startup = true
end
Public Instance Methods
Fluent::Plugin::Base#close
# File lib/fluent/plugin/in_tail.rb, line 290
# Runs the parent #close first, then closes every watcher handle.
#
# The order matters: file handles are closed only after all threads have
# stopped (which happens in #close of the thread plugin helper), possibly
# because IOHandler.ready_to_shutdown() has to be waited for.
def close
  super
  close_watcher_handles
end
# File lib/fluent/plugin/in_tail.rb, line 550
# Closes every tail watcher this plugin still references.
#
# Watchers in @tails are removed from the hash as they are closed. Watchers
# that are keys of @tails_rotate_wait are closed but their entries are kept.
def close_watcher_handles
  # Iterate over a snapshot of the keys because the hash is mutated below.
  @tails.keys.each { |path| @tails.delete(path)&.close }
  @tails_rotate_wait.keys.each(&:close)
end
Fluent::Plugin::TailInput::GroupWatch#configure
# File lib/fluent/plugin/in_tail.rb, line 126
# Validates the plugin configuration and derives the runtime state from it.
#
# Steps, in order:
# 1. Copy legacy `formatN` / `emit_unmatched_lines` settings into the <parse> section.
# 2. Call `super` so the declared parameters are populated.
# 3. Reject invalid parameter combinations (watchers, glob policy, delimiter, paths).
# 4. Register the pos_file in the shared variable store to detect duplicates.
# 5. Set up tag, encoding, receive handler, permissions, parser, capability and metrics.
#
# @param conf the configuration element for this <source>
# @raise [Fluent::ConfigError] when the configuration is invalid
def configure(conf)
  @variable_store = Fluent::VariableStore.fetch_or_build(:in_tail)
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  unless parser_config
    raise Fluent::ConfigError, "<parse> section is required."
  end

  # Forward top-level format1..formatN settings to the parse section.
  (1..Fluent::Plugin::MultilineParser::FORMAT_MAX_NUM).each do |n|
    parser_config["format#{n}"] = conf["format#{n}"] if conf["format#{n}"]
  end

  parser_config['unmatched_lines'] = conf['emit_unmatched_lines']

  super

  if !@enable_watch_timer && !@enable_stat_watcher
    raise Fluent::ConfigError, "either of enable_watch_timer or enable_stat_watcher must be true"
  end

  # `{a,b}` globs contain a comma, which would be split by the default delimiter.
  if @glob_policy == :always && @path_delimiter == ','
    raise Fluent::ConfigError, "cannot use glob_policy as always with the default path_delimitor: `,\""
  end

  if @glob_policy == :extended && /\{.*,.*\}/.match(@path) && extended_glob_pattern(@path)
    raise Fluent::ConfigError, "cannot include curly braces with glob patterns in `#{@path}\". Use glob_policy always instead."
  end

  if RESERVED_CHARS.include?(@path_delimiter)
    rc = RESERVED_CHARS.join(', ')
    raise Fluent::ConfigError, "#{rc} are reserved words: #{@path_delimiter}"
  end

  @paths = @path.split(@path_delimiter).map(&:strip).uniq
  if @paths.empty?
    raise Fluent::ConfigError, "tail: 'path' parameter is required on tail input"
  end
  if @path_timezone
    Fluent::Timezone.validate!(@path_timezone)
    # One formatter per configured path, keyed by the raw path string.
    @path_formatters = @paths.map{|path| [path, Fluent::Timezone.formatter(@path_timezone, path)]}.to_h
    @exclude_path_formatters = @exclude_path.map{|path| [path, Fluent::Timezone.formatter(@path_timezone, path)]}.to_h
  end

  # TODO: Use plugin_root_dir and storage plugin to store positions if available
  if @pos_file
    # The same pos_file must not be shared between two in_tail instances.
    if @variable_store.key?(@pos_file) && !called_in_test?
      plugin_id_using_this_path = @variable_store[@pos_file]
      raise Fluent::ConfigError, "Other 'in_tail' plugin already use same pos_file path: plugin_id = #{plugin_id_using_this_path}, pos_file path = #{@pos_file}"
    end
    @variable_store[@pos_file] = self.plugin_id
  else
    if @follow_inodes
      raise Fluent::ConfigError, "Can't follow inodes without pos_file configuration parameter"
    end
    $log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
    $log.warn "this parameter is highly recommended to save the position to resume tailing."
  end

  configure_tag
  configure_encoding

  # Truthy (a match index) when the parser type name contains "multiline".
  @multiline_mode = parser_config["@type"] =~ /multiline/
  @receive_handler = if @multiline_mode
                       method(:parse_multilines)
                     else
                       method(:parse_singleline)
                     end
  @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
  @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
  # parser is already created by parser helper
  @parser = parser_create(usage: parser_config['usage'] || @parser_configs.first.usage)
  @capability = Fluent::Capability.new(:current_process)
  if @read_bytes_limit_per_second > 0
    if !@enable_watch_timer
      raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature"
    end
    # Raise a too-small limit up to the IO handler's read chunk size.
    min_bytes = TailWatcher::IOHandler::BYTES_TO_READ
    if @read_bytes_limit_per_second < min_bytes
      log.warn "Should specify greater equal than #{min_bytes}. Use #{min_bytes} for read_bytes_limit_per_second"
      @read_bytes_limit_per_second = min_bytes
    end
  end
  opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files")
  closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files")
  rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files")
  throttling_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_throttled_total", help_text: "Total number of times throttling occurs per file when throttling enabled")
  @metrics = MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics, throttling_metrics)
end
# File lib/fluent/plugin/in_tail.rb, line 226
# Resolves the `encoding` / `from_encoding` settings via parse_encoding_param.
#
# `from_encoding` is only accepted together with `encoding`. A warning is
# logged when both resolve to the same encoding.
#
# @raise [Fluent::ConfigError] when `from_encoding` is given without `encoding`
def configure_encoding
  if @from_encoding && !@encoding
    raise Fluent::ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter."
  end

  # Only settings that are actually present get resolved.
  @encoding &&= parse_encoding_param(@encoding)
  @from_encoding &&= parse_encoding_param(@from_encoding)

  return unless @encoding && (@encoding == @from_encoding)

  log.warn "'encoding' and 'from_encoding' are same encoding. No effect"
end
# File lib/fluent/plugin/in_tail.rb, line 215
# Splits @tag around `*` into @tag_prefix and @tag_suffix.
#
# Without a `*` in the tag both values are nil. With one, a missing side
# becomes an empty string (e.g. "*" gives "" and "").
def configure_tag
  unless @tag.include?('*')
    @tag_prefix = @tag_suffix = nil
    return
  end

  head, tail = @tag.split('*')
  @tag_prefix = head || ''
  @tag_suffix = tail || ''
end
# File lib/fluent/plugin/in_tail.rb, line 495
# Creates a tail watcher for one target, registers it in @tails and triggers
# an initial notification.
#
# The target is skipped (with a warning) when stat() fails or when
# setup_watcher raises WatcherSetupError.
#
# @param target_info target whose `path` is read and whose `ino` is refreshed here
def construct_watcher(target_info)
  file_path = target_info.path

  # The file might be rotated or removed after collecting paths, so check inode again here.
  begin
    target_info.ino = Fluent::FileWrapper.stat(file_path).ino
  rescue Errno::ENOENT, Errno::EACCES
    $log.warn "stat() for #{file_path} failed. Continuing without tailing it."
    return
  end

  entry = nil
  if @pf
    entry = @pf[target_info]
    # With read_from_head, an entry whose inode is still zero is pointed at
    # this inode with position 0.
    if @read_from_head && entry.read_inode.zero?
      entry.update(target_info.ino, 0)
    end
  end

  begin
    watcher = setup_watcher(target_info, entry)
  rescue WatcherSetupError => e
    log.warn "Skip #{file_path} because unexpected setup error happens: #{e}"
    return
  end

  @tails[file_path] = watcher
  watcher.on_notify
end
# File lib/fluent/plugin/in_tail.rb, line 720
# Parses one line and appends the resulting event(s) to the event stream.
#
# The line is chomped in place. Parsed records get the watcher's path under
# @path_key (when configured and not already set). Lines the parser cannot
# match are logged and, with emit_unmatched_lines, emitted as
# {'unmatched_line' => line}. Any StandardError is logged, not raised.
#
# @param line [String] raw line; mutated by chomp!
# @param es event stream receiving `add(time, record)`
# @param tail_watcher watcher whose `path` is attached to records
def convert_line_to_event(line, es, tail_watcher)
  line.chomp! # remove \n
  @parser.parse(line) do |time, record|
    if time && record
      record[@path_key] ||= tail_watcher.path unless @path_key.nil?
      es.add(time, record)
    else
      if @emit_unmatched_lines
        unmatched = {'unmatched_line' => line}
        unmatched[@path_key] ||= tail_watcher.path unless @path_key.nil?
        es.add(Fluent::EventTime.now, unmatched)
      end
      log.warn "pattern not matched: #{line.inspect}"
    end
  end
rescue => e
  log.warn 'invalid line found', file: tail_watcher.path, line: line, error: e.to_s
  log.debug_backtrace(e.backtrace)
end
# File lib/fluent/plugin/in_tail.rb, line 614
# Detaches a tail watcher from the event loop and optionally closes it.
#
# When the watcher is flagged `unwatched` and a position file exists, its
# position entry is unwatched as well, provided that either inodes are being
# followed or no watcher is registered for the same path in @tails.
#
# @param tw the tail watcher to detach
# @param ino inode the caller expects this watcher to be following
# @param close_io [Boolean] whether to also close the watcher (default true)
def detach_watcher(tw, ino, close_io = true)
  if @follow_inodes && tw.ino != ino
    log.warn("detach_watcher could be detaching an unexpected tail_watcher with a different ino.",
             path: tw.path, actual_ino_in_tw: tw.ino, expect_ino_to_close: ino)
  end

  tw.watchers.each do |watcher|
    event_loop_detach(watcher)
  end
  tw.detach(@shutdown_start_time)

  tw.close if close_io

  # The original read `@follow_inode` (singular), which is never assigned and
  # therefore always nil, so this half of the condition could never be true.
  if @pf && tw.unwatched && (@follow_inodes || !@tails[tw.path])
    target_info = TargetInfo.new(tw.path, ino)
    @pf.unwatch(target_info)
  end
end
# File lib/fluent/plugin/in_tail.rb, line 638
# Schedules a watcher to be detached once the rotate_wait period has passed.
#
# The pending timer is recorded in @tails_rotate_wait, keyed by the watcher;
# nothing new is scheduled when the watcher already has an entry there.
#
# @param tw the tail watcher to detach later
# @param ino inode passed through to detach_watcher
def detach_watcher_after_rotate_wait(tw, ino)
  # Call event_loop_attach/event_loop_detach is high-cost for short-live object.
  # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.

  # Detach now because it's already closed, waiting it doesn't make sense.
  detach_watcher(tw, ino) if @open_on_every_update

  return if @tails_rotate_wait[tw]

  timer =
    if throttling_is_enabled?(tw)
      # When the throttling feature is enabled, it might not reach EOF yet.
      # Should ensure to read all contents before closing it, with keeping throttling.
      wait_started_at = Fluent::Clock.now
      timer_execute(:in_tail_close_watcher, 1, repeat: true) do
        waited = Fluent::Clock.now - wait_started_at
        if tw.eof? && waited >= @rotate_wait
          timer.detach
          @tails_rotate_wait.delete(tw)
          detach_watcher(tw, ino)
        end
      end
    else
      # when the throttling feature isn't enabled, just wait @rotate_wait
      timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
        @tails_rotate_wait.delete(tw)
        detach_watcher(tw, ino)
      end
    end

  @tails_rotate_wait[tw] = { ino: ino, timer: timer }
end
# File lib/fluent/plugin/in_tail.rb, line 391
# Builds a hash of TargetInfo objects for every watcher currently in @tails.
#
# @return [Hash] keyed by inode when @follow_inodes is set, otherwise by path
def existence_path
  @tails.each_with_object({}) do |(_path, watcher), targets|
    key = @follow_inodes ? watcher.ino : watcher.path
    targets[key] = TargetInfo.new(watcher.path, watcher.ino)
  end
end
# File lib/fluent/plugin/in_tail.rb, line 324 def expand_paths date = Fluent::EventTime.now paths = [] @paths.each { |path| path = if @path_timezone @path_formatters[path].call(date) else date.to_time.strftime(path) end if use_glob?(path) paths += Dir.glob(path).select { |p| begin is_file = !File.directory?(p) if (File.readable?(p) || have_read_capability?) && is_file if @limit_recently_modified && File.mtime(p) < (date.to_time - @limit_recently_modified) false else true end else if is_file unless @ignore_list.include?(p) log.warn "#{p} unreadable. It is excluded and would be examined next time." @ignore_list << p if @ignore_repeated_permission_error end end false end rescue Errno::ENOENT, Errno::EACCES log.debug("#{p} is missing after refresh file list") false end } else # When file is not created yet, Dir.glob returns an empty array. So just add when path is static. paths << path end } excluded = @exclude_path.map { |path| path = if @path_timezone @exclude_path_formatters[path].call(date) else date.to_time.strftime(path) end use_glob?(path) ? Dir.glob(path) : path }.flatten.uniq # filter out non existing files, so in case pattern is without '*' we don't do unnecessary work hash = {} (paths - excluded).select { |path| FileTest.exist?(path) }.each { |path| # Even we just checked for existence, there is a race condition here as # of which stat() might fail with ENOENT. See #3224. begin target_info = TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino) if @follow_inodes hash[target_info.ino] = target_info else hash[target_info.path] = target_info end rescue Errno::ENOENT, Errno::EACCES => e $log.warn "expand_paths: stat() for #{path} failed with #{e.class.name}. Skip file." end } hash end
# File lib/fluent/plugin/in_tail.rb, line 302
# Tells whether a path contains `*`, `?` or a `[...]` character class.
#
# @param path [String]
# @return [true, MatchData, nil] true for `*` / `?`, the MatchData for a
#   bracket expression, nil when none of them is present
def extended_glob_pattern(path)
  return true if path.include?('*')
  return true if path.include?('?')

  /\[.*\]/.match(path)
end
# File lib/fluent/plugin/in_tail.rb, line 671
# Parses and emits whatever is left in a multiline buffer.
#
# The buffer is chomped in place and handed to the parser. Parsed records are
# emitted with their own time. Unparseable content is logged and, with
# emit_unmatched_lines, emitted as {'unmatched_line' => buf} stamped with the
# current time. The watcher's path is attached under @path_key when configured.
#
# @param tw tail watcher that owns the buffer (its `tag` and `path` are read)
# @param buf [String] buffered text; mutated by chomp!
def flush_buffer(tw, buf)
  buf.chomp!

  # The tag is the same for both emit paths below, so compute it once.
  tag = if @tag_prefix || @tag_suffix
          @tag_prefix + tw.tag + @tag_suffix
        else
          @tag
        end

  @parser.parse(buf) do |time, record|
    if time && record
      record[@path_key] ||= tw.path unless @path_key.nil?
      router.emit(tag, time, record)
    else
      if @emit_unmatched_lines
        record = { 'unmatched_line' => buf }
        # The original read `tail_watcher.path` here, but no such local exists
        # in this method (the parameter is `tw`), so it raised NameError.
        record[@path_key] ||= tw.path unless @path_key.nil?
        router.emit(tag, Fluent::EventTime.now, record)
      end
      log.warn "got incomplete line at shutdown from #{tw.path}: #{buf.inspect}"
    end
  end
end
# File lib/fluent/plugin/in_tail.rb, line 297
# Checks whether @capability reports the effective :dac_read_search or
# :dac_override capability. The second one is queried only when the first is
# not reported.
def have_read_capability?
  caps = @capability
  caps.have_capability?(:effective, :dac_read_search) ||
    caps.have_capability?(:effective, :dac_override)
end
# File lib/fluent/plugin/in_tail.rb, line 240
# Looks up an Encoding by name.
#
# @param encoding_name [String, nil]
# @return [Encoding, nil] nil when no name is given
# @raise [Fluent::ConfigError] when Encoding.find rejects the name
def parse_encoding_param(encoding_name)
  return nil unless encoding_name

  Encoding.find(encoding_name)
rescue ArgumentError => e
  raise Fluent::ConfigError, e.message
end
No need to check if line_buffer_timer_flusher is nil, since line_buffer_timer_flusher should exist
# File lib/fluent/plugin/in_tail.rb, line 751 def parse_multilines(lines, tail_watcher) lb = tail_watcher.line_buffer_timer_flusher.line_buffer es = Fluent::MultiEventStream.new if @parser.has_firstline? tail_watcher.line_buffer_timer_flusher.reset_timer lines.each { |line| if @parser.firstline?(line) if lb convert_line_to_event(lb, es, tail_watcher) end lb = line else if lb.nil? if @emit_unmatched_lines convert_line_to_event(line, es, tail_watcher) end log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}" else lb << line end end } else lb ||= '' lines.each do |line| lb << line @parser.parse(lb) { |time, record| if time && record convert_line_to_event(lb, es, tail_watcher) lb = '' end } end end tail_watcher.line_buffer_timer_flusher.line_buffer = lb es end
# File lib/fluent/plugin/in_tail.rb, line 742
# Converts each line into an event via convert_line_to_event.
#
# @param lines raw lines read from the file
# @param tail_watcher watcher the lines came from
# @return [Fluent::MultiEventStream] stream that received the events
def parse_singleline(lines, tail_watcher)
  lines.each_with_object(Fluent::MultiEventStream.new) do |line, stream|
    convert_line_to_event(line, stream, tail_watcher)
  end
end
@return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError
# File lib/fluent/plugin/in_tail.rb, line 699
# Converts lines to events with the configured receive handler and emits them.
#
# @param lines raw lines read from the file
# @param tail_watcher watcher the lines came from (its `tag` is read)
# @return [Boolean] false only when emitting raised BufferOverflowError;
#   true otherwise, including when any other error was raised
def receive_lines(lines, tail_watcher)
  stream = @receive_handler.call(lines, tail_watcher)
  return true if stream.empty?

  tag = if @tag_prefix || @tag_suffix
          @tag_prefix + tail_watcher.tag + @tag_suffix
        else
          @tag
        end

  begin
    router.emit_stream(tag, stream)
    true
  rescue Fluent::Plugin::Buffer::BufferOverflowError
    false
  rescue
    # ignore non BufferQueueLimitError errors because in_tail can't recover. Engine shows logs and backtraces.
    true
  end
end
in_tail with a ‘*’ path doesn’t check rotated-file equality during the refresh phase, so you should not use a ‘*’ path when your logs are rotated by another tool: doing so causes log duplication after the watched files are updated. In that case, separate the log directories and specify two paths in the path parameter, e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
# File lib/fluent/plugin/in_tail.rb, line 408
# Reconciles the set of running watchers with the files that currently match
# the configured paths: stops watchers for targets that vanished, starts
# watchers for new targets, and clears the @startup flag.
def refresh_watchers
  target_paths_hash = expand_paths
  existence_paths_hash = existence_path

  # Block form: the path lists are only built if the block is evaluated.
  log.debug {
    target_paths_str = target_paths_hash.collect { |key, target_info| target_info.path }.join(",")
    existence_paths_str = existence_paths_hash.collect { |key, target_info| target_info.path }.join(",")
    "tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}"
  }

  if !@follow_inodes
    need_unwatch_in_stop_watchers = true
  else
    # When using @follow_inodes, need this to unwatch the rotated old inode when it disappears.
    # After `update_watcher` detaches an old TailWatcher, the inode is lost from the `@tails`.
    # So that inode can't be contained in `removed_hash`, and can't be unwatched by `stop_watchers`.
    #
    # This logic may work for `@follow_inodes false` too.
    # Just limiting the case to suppress the impact to existing logics.
    @pf&.unwatch_removed_targets(target_paths_hash)
    need_unwatch_in_stop_watchers = false
  end

  # Keys are paths or inodes depending on @follow_inodes (see expand_paths / existence_path).
  removed_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
  added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

  # If an existing TailWatcher already follows a target path with the different inode,
  # it means that the TailWatcher following the rotated file still exists. In this case,
  # `refresh_watcher` can't start the new TailWatcher for the new current file. So, we
  # should output a warning log in order to prevent silent collection stops.
  # (Such as https://github.com/fluent/fluentd/pull/4327)
  # (Usually, such a TailWatcher should be removed from `@tails` in `update_watcher`.)
  # (The similar warning may work for `@follow_inodes true` too. Just limiting the case
  # to suppress the impact to existing logics.)
  unless @follow_inodes
    target_paths_hash.each do |path, target|
      next unless @tails.key?(path)
      # We can't use `existence_paths_hash[path].ino` because it is from `TailWatcher.ino`,
      # which is very unstable parameter. (It can be `nil` or old).
      # So, we need to use `TailWatcher.pe.read_inode`.
      existing_watcher_inode = @tails[path].pe.read_inode
      if existing_watcher_inode != target.ino
        log.warn "Could not follow a file (inode: #{target.ino}) because an existing watcher for that filepath follows a different inode: #{existing_watcher_inode} (e.g. keeps watching a already rotated file). If you keep getting this message, please restart Fluentd.",
                 filepath: target.path
      end
    end
  end

  stop_watchers(removed_hash, unwatched: need_unwatch_in_stop_watchers) unless removed_hash.empty?
  start_watchers(added_hash) unless added_hash.empty?
  @startup = false if @startup
end
# File lib/fluent/plugin/in_tail.rb, line 461
# Builds a TailWatcher for a target, registers the enabled triggers
# (timer and/or stat watcher) and attaches them to the event loop.
#
# On any StandardError the partially built watcher is detached and closed
# before the error is re-raised.
#
# @param target_info target to tail (its `path` is read)
# @param pe position entry handed to the TailWatcher
# @return the new TailWatcher
def setup_watcher(target_info, pe)
  flusher = nil
  if @multiline_mode
    flusher = TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer))
  end

  # Outside the startup phase files are always read from the head.
  from_head = !@startup || @read_from_head
  tw = TailWatcher.new(target_info, pe, log, from_head, @follow_inodes, method(:update_watcher), flusher, method(:io_handler), @metrics)

  if @enable_watch_timer
    timer_trigger = TimerTrigger.new(1, log) { tw.on_notify }
    tw.register_watcher(timer_trigger)
  end

  if @enable_stat_watcher
    stat_trigger = StatWatcher.new(target_info.path, log) { tw.on_notify }
    tw.register_watcher(stat_trigger)
  end

  tw.watchers.each { |watcher| event_loop_attach(watcher) }

  tw.group_watcher = add_path_to_group_watcher(target_info.path)

  tw
rescue => e
  if tw
    tw.watchers.each { |watcher| event_loop_detach(watcher) }

    tw.detach(@shutdown_start_time)
    tw.close
  end
  raise e
end
Fluent::Plugin::Base#shutdown
# File lib/fluent/plugin/in_tail.rb, line 278
# Detaches all watchers and closes the position file handle, then delegates
# to the parent #shutdown.
def shutdown
  @shutdown_start_time = Fluent::Clock.now

  # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
  stop_watchers(existence_path, immediate: true, remove_watcher: false)
  @tails_rotate_wait.keys.each do |tw|
    pending_ino = @tails_rotate_wait[tw][:ino]
    detach_watcher(tw, pending_ino, false)
  end

  @pf_file.close if @pf_file

  super
end
Fluent::Plugin::Base#start
# File lib/fluent/plugin/in_tail.rb, line 248
# Starts the plugin: opens and loads the position file (when configured),
# optionally schedules its compaction, performs the first watcher refresh
# unless skip_refresh_on_startup is set, and schedules periodic refreshes.
def start
  super

  if @pos_file
    dir = File.dirname(@pos_file)
    FileUtils.mkdir_p(dir, mode: @dir_perm) unless Dir.exist?(dir)

    open_flags = File::RDWR|File::CREAT|File::BINARY
    @pf_file = File.open(@pos_file, open_flags, @file_perm)
    @pf_file.sync = true
    @pf = PositionFile.load(@pf_file, @follow_inodes, expand_paths, logger: log)

    if @pos_file_compaction_interval
      timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do
        log.info('Clean up the pos file')
        @pf.try_compact
      end
    end
  end

  refresh_watchers unless @skip_refresh_on_startup
  timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end
# File lib/fluent/plugin/in_tail.rb, line 523
# Constructs a watcher for every target, stopping early once
# before_shutdown? reports true.
#
# @param targets_info [Hash] targets to watch; only the values are used
def start_watchers(targets_info)
  targets_info.each_value do |target|
    construct_watcher(target)
    break if before_shutdown?
  end
end
Fluent::Plugin::Input#statistics
# File lib/fluent/plugin/in_tail.rb, line 789
# Extends the parent's statistics with this plugin's file counters.
#
# @return [Hash] a hash with the single key 'input', holding the parent's
#   'input' statistics merged with the opened/closed/rotated/throttled counts
def statistics
  input_stats = super["input"]

  file_counters = {
    'opened_file_count' => @metrics.opened.get,
    'closed_file_count' => @metrics.closed.get,
    'rotated_file_count' => @metrics.rotated.get,
    'throttled_log_count' => @metrics.throttled.get,
  }

  { 'input' => input_stats.merge(file_counters) }
end
Fluent::PluginId#stop
# File lib/fluent/plugin/in_tail.rb, line 270
# Removes this plugin's pos_file entry from the shared variable store (when
# the store exists), then delegates to the parent #stop.
def stop
  @variable_store.delete(@pos_file) if @variable_store

  super
end
# File lib/fluent/plugin/in_tail.rb, line 530
# Stops the watchers belonging to the given targets.
#
# @param targets_info [Hash] targets to stop; only the values are used
# @param immediate [Boolean] detach right away (without closing) instead of
#   going through detach_watcher_after_rotate_wait
# @param unwatched [Boolean] value stored in each watcher's `unwatched` flag
# @param remove_watcher [Boolean] whether the watcher is also removed from @tails
def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
  targets_info.each_value do |target|
    remove_path_from_group_watcher(target.path)

    watcher = remove_watcher ? @tails.delete(target.path) : @tails[target.path]
    next unless watcher

    watcher.unwatched = unwatched
    if immediate
      detach_watcher(watcher, target.ino, false)
    else
      detach_watcher_after_rotate_wait(watcher, target.ino)
    end
  end
end
# File lib/fluent/plugin/in_tail.rb, line 632
# Tells whether read throttling applies to the given watcher: either the
# plugin-wide byte limit is positive, or the watcher has a group watcher
# whose limit is zero or greater.
#
# @param tw tail watcher (its `group_watcher` is read)
# @return [Boolean]
def throttling_is_enabled?(tw)
  if @read_bytes_limit_per_second > 0
    true
  elsif tw.group_watcher && tw.group_watcher.limit >= 0
    true
  else
    false
  end
end
refresh_watchers
calls @tails.keys so we don’t use stop_watcher -> start_watcher sequence for safety.
# File lib/fluent/plugin/in_tail.rb, line 563
# Handles a rotation (or disappearance) reported for a tailed file: starts a
# watcher for the new inode where appropriate and schedules the old watcher
# to be detached after rotate_wait.
#
# @param tail_watcher the watcher that detected the change
# @param pe position entry of that watcher
# @param new_inode inode now found at the path; nil means the file disappeared
def update_watcher(tail_watcher, pe, new_inode)
  # TODO we should use another callback for this.
  # To suppress impact to existing logics, limit the case to `@follow_inodes`.
  # We may not need `@follow_inodes` condition.
  if @follow_inodes && new_inode.nil?
    # nil inode means the file disappeared, so we only need to stop it.
    @tails.delete(tail_watcher.path)
    # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
    # Because of this problem, log duplication can occur during `rotate_wait`.
    # Need to set `rotate_wait 0` for a workaround.
    # Duplication will occur if `refresh_watcher` is called during the `rotate_wait`.
    # In that case, `refresh_watcher` will add the new TailWatcher to tail the same target,
    # and it causes the log duplication.
    # (Other `detach_watcher_after_rotate_wait` may have the same problem.
    #  We need the mechanism not to add duplicated TailWatcher with detaching TailWatcher.)
    detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode)
    return
  end

  path = tail_watcher.path

  log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

  if @pf
    # Skip when the position file's entry for this path no longer carries the
    # inode this watcher's entry has, i.e. another event already handled it.
    pe_inode = pe.read_inode
    target_info_from_position_entry = TargetInfo.new(path, pe_inode)
    unless pe_inode == @pf[target_info_from_position_entry].read_inode
      log.warn "Skip update_watcher because watcher has been already updated by other inotify event",
               path: path, inode: pe.read_inode, inode_in_pos_file: @pf[target_info_from_position_entry].read_inode
      return
    end
  end

  new_target_info = TargetInfo.new(path, new_inode)

  if @follow_inodes
    new_position_entry = @pf[new_target_info]
    # If `refresh_watcher` find the new file before, this will not be zero.
    # In this case, only we have to do is detaching the current tail_watcher.
    if new_position_entry.read_inode == 0
      @tails[path] = setup_watcher(new_target_info, new_position_entry)
      @tails[path].on_notify
    end
  else
    @tails[path] = setup_watcher(new_target_info, pe)
    @tails[path].on_notify
  end
  detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode)
end
Curly braces are not supported with the default path_delimiter, because the default path delimiter is “,”. That delimiter collides with the comma used inside a curly-brace wildcard pattern, so this combination is handled as an error in configure.
# File lib/fluent/plugin/in_tail.rb, line 310
# Decides, according to @glob_policy, whether a path is treated as a glob.
#
# @param path [String]
# @return a truthy value when the path should be expanded with Dir.glob;
#   nil for a glob policy other than the three handled here
def use_glob?(path)
  case @glob_policy
  when :always
    # For future extensions, we decided to use `always' term to handle
    # regular expressions as much as possible.
    # This is because not using `true' as a returning value
    # when choosing :always here.
    extended_glob_pattern(path) || /\{.*,.*\}/.match(path)
  when :extended
    extended_glob_pattern(path)
  when :backward_compatible
    path.include?('*')
  end
end
Private Instance Methods
# File lib/fluent/plugin/in_tail.rb, line 805
# Creates the IOHandler for a watcher, passing this plugin's read limits,
# encodings and metrics, with receive_lines as the handler's block.
#
# @param watcher the tail watcher the handler belongs to
# @param path path of the file to read
# @return [TailWatcher::IOHandler]
def io_handler(watcher, path)
  handler_options = {
    path: path,
    log: log,
    read_lines_limit: @read_lines_limit,
    read_bytes_limit_per_second: @read_bytes_limit_per_second,
    open_on_every_update: @open_on_every_update,
    from_encoding: @from_encoding,
    encoding: @encoding,
    metrics: @metrics,
    max_line_size: @max_line_size,
  }

  TailWatcher::IOHandler.new(watcher, **handler_options, &method(:receive_lines))
end