class Fluent::Plugin::FileBuffer

Constants

DEFAULT_CHUNK_LIMIT_SIZE
DEFAULT_TOTAL_LIMIT_SIZE

Public Class Methods

new()
Calls superclass method Fluent::Plugin::Buffer::new
# File lib/fluent/plugin/buf_file.rb, line 45
# Runs the superclass initializer, then gives every FileBuffer-specific
# instance variable an explicit starting value. Paths and the variable store
# stay nil until configure fills them in; multi-worker support starts disabled.
def initialize
  super
  @multi_workers_available = false
  @symlink_path = @additional_resume_path = @buffer_path = @variable_store = nil
end

Public Instance Methods

configure(conf)
Calls superclass method Fluent::Plugin::Buffer#configure
# File lib/fluent/plugin/buf_file.rb, line 54
# Resolves @path into the glob pattern used for chunk files and validates it.
#
# * Without an explicit 'path', "<plugin_root_dir>/buffer" is used. If the
#   owner has no plugin_root_dir either, Fluent::ConfigError is raised.
# * The configured path is recorded in the :buf_file variable store under the
#   owner's plugin type. If another plugin has already recorded the same path,
#   Fluent::ConfigError is raised (unless called_in_test? is true).
# * A directory path (an existing directory, or a non-existent path without
#   '.*') becomes "<dir>/buffer.*<path_suffix>".
#   - When more than one worker is configured and the directory was not
#     derived from plugin_root_dir, a "worker<id>" sub-directory is inserted.
#   - In that case worker 0 also sets @additional_resume_path to
#     "<dir>/buffer.*<path_suffix>".
#   - Directory paths mark the buffer as multi-worker capable.
# * Any other path is treated as a file path:
#   - a basename containing '.*.' is kept as-is;
#   - a trailing '.*' gets <path_suffix> appended;
#   - otherwise ".*<path_suffix>" is appended.
#   File paths mark the buffer as not multi-worker capable.
# * @dir_permission is converted from an octal String to an Integer when
#   needed. When unset, it defaults to system_config.dir_permission or
#   Fluent::DEFAULT_DIR_PERMISSION.
#
# conf - the configuration element, forwarded to the superclass configure.
def configure(conf)
  super

  @variable_store = Fluent::VariableStore.fetch_or_build(:buf_file)

  multi_workers_configured = owner.system_config.workers > 1

  using_plugin_root_dir = false
  unless @path
    root_dir = owner.plugin_root_dir
    raise Fluent::ConfigError, "buffer path is not configured. specify 'path' in <buffer>" unless root_dir

    @path = File.join(root_dir, 'buffer')
    using_plugin_root_dir = true # plugin_root_dir path contains worker id
  end

  # Constants are fully qualified (like Fluent::ConfigError above) so they
  # resolve no matter how the enclosing module nesting is written.
  type_of_owner = Fluent::Plugin.lookup_type_from_class(@_owner.class)
  if @variable_store.has_key?(@path) && !called_in_test?
    type_using_this_path = @variable_store[@path]
    raise Fluent::ConfigError, "Other '#{type_using_this_path}' plugin already use same buffer path: type = #{type_of_owner}, buffer path = #{@path}"
  end

  @buffer_path = @path
  @variable_store[@buffer_path] = type_of_owner

  # File.directory? is false for non-existent paths, so no separate exist? check is needed
  specified_directory_exists = File.directory?(@path)
  unexisting_path_for_directory = !File.exist?(@path) && !@path.include?('.*')

  if specified_directory_exists || unexisting_path_for_directory # directory
    if using_plugin_root_dir || !multi_workers_configured
      @path = File.join(@path, "buffer.*#{@path_suffix}")
    else
      @path = File.join(@path, "worker#{fluentd_worker_id}", "buffer.*#{@path_suffix}")
      if fluentd_worker_id == 0
        # worker 0 always checks unflushed buffer chunks to be resumed (might be created while non-multi-worker configuration)
        @additional_resume_path = File.join(File.expand_path("../../", @path), "buffer.*#{@path_suffix}")
      end
    end
    @multi_workers_available = true
  else # specified path is file path
    if File.basename(@path).include?('.*.')
      # valid file path
    elsif File.basename(@path).end_with?('.*')
      @path = @path + @path_suffix
    else
      # existing file will be ignored
      @path = @path + ".*#{@path_suffix}"
    end
    @multi_workers_available = false
  end

  if @dir_permission
    @dir_permission = @dir_permission.to_i(8) if @dir_permission.is_a?(String)
  else
    @dir_permission = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
  end
end
generate_chunk(metadata)
# File lib/fluent/plugin/buf_file.rb, line 205
# Creates a brand-new file chunk for +metadata+ in :create mode and logs its id.
# The file permission is @file_permission, falling back to
# system_config.file_permission. Compression follows @compress.
# Returns the new Fluent::Plugin::Buffer::FileChunk.
def generate_chunk(metadata)
  permission = @file_permission || system_config.file_permission
  # FileChunk generates the real file path from @path and a unique_id
  Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: permission, compress: @compress).tap do |new_chunk|
    log.debug "Created new chunk", chunk_id: dump_unique_id_hex(new_chunk.unique_id), metadata: metadata
  end
end
handle_broken_files(path, mode, e)
# File lib/fluent/plugin/buf_file.rb, line 214
# Deals with a chunk file that could not be loaded while resuming.
#
# Logs the failure and tries to copy the chunk's bytes into a backup via
# backup(unique_id). It then always deletes both the chunk file and its
# "<path>.meta" companion. A failed backup is logged and does not stop the
# deletion. Deletion is best-effort: each file is removed independently and
# filesystem errors are ignored.
#
# path - path of the broken chunk file
# mode - chunk state inferred from the file name (used here only for logging)
# e    - the exception raised while loading the chunk
def handle_broken_files(path, mode, e)
  log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message
  unique_id = Fluent::Plugin::Buffer::FileChunk.unique_id_from_path(path)
  backup(unique_id) { |f|
    # 'rb' already opens the file in binary mode (ASCII-8BIT), so the bytes are copied verbatim
    File.open(path, 'rb') { |chunk|
      IO.copy_stream(chunk, f)
    }
  }
rescue => error
  log.error "backup failed. Delete corresponding files.", :err_msg => error.message
ensure
  log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away." if @disable_chunk_backup
  # Unlink each file on its own. File.unlink(a, b) stops at the first failure,
  # which would leave an orphaned .meta file behind when the chunk file is
  # already gone. That orphan would never be matched by the chunk glob again.
  [path, path + '.meta'].each do |file|
    begin
      File.unlink(file)
    rescue SystemCallError
      # best-effort cleanup: the file may already be missing or not removable
    end
  end
end
multi_workers_ready?()

This method is called only when multiple workers are configured.

# File lib/fluent/plugin/buf_file.rb, line 114
# Reports whether this buffer can run under multiple workers, i.e. the
# @multi_workers_available flag decided in configure. When it cannot, an error
# explaining which configuration is required is logged before the flag is
# returned.
def multi_workers_ready?
  return @multi_workers_available if @multi_workers_available

  log.error "file buffer with multi workers should be configured to use directory 'path', or system root_dir and plugin id"
  @multi_workers_available
end
persistent?()
# File lib/fluent/plugin/buf_file.rb, line 135
# Chunks of this buffer are stored as files and reloaded by resume, so the
# buffer unconditionally reports itself as persistent.
def persistent?
  true
end
resume()
# File lib/fluent/plugin/buf_file.rb, line 139
# Rebuilds the buffer state from chunk files found on disk.
#
# Globs @path, plus @additional_resume_path (placed before @path in the
# pattern list) when it is set. Braces are escaped via escaped_patterns.
# Non-regular files and files whose state cannot be inferred from their name
# are skipped. Files that raise FileChunkError while loading are handed to
# handle_broken_files.
#
# A staged chunk is moved to the queue (with metadata.seq reset to 0) when it
# is already full, or when another staged chunk with the same metadata was
# found earlier.
#
# Returns [stage, queue]:
# - stage: a Hash mapping chunk metadata to its staged chunk;
# - queue: an Array of queued chunks sorted by modified_at.
def resume
  stage = {}
  queue = []
  exist_broken_file = false

  patterns = [@path]
  patterns.unshift @additional_resume_path if @additional_resume_path
  Dir.glob(escaped_patterns(patterns)) do |path|
    next unless File.file?(path)

    if owner.respond_to?(:buffer_config) && owner.buffer_config&.flush_at_shutdown
      # When `flush_at_shutdown` is `true`, the remaining chunk files during resuming are possibly broken
      # since there may be a power failure or similar failure.
      log.warn { "restoring buffer file: path = #{path}" }
    else
      log.debug { "restoring buffer file: path = #{path}" }
    end

    m = new_metadata() # this metadata will be overwritten by resuming .meta file content
                       # so it should not be added into @metadata_list for now
    mode = Fluent::Plugin::Buffer::FileChunk.assume_chunk_state(path)
    if mode == :unknown
      log.debug "unknown state chunk found", path: path
      next
    end

    begin
      chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode, compress: @compress) # file chunk resumes contents of metadata
    rescue Fluent::Plugin::Buffer::FileChunk::FileChunkError => e
      exist_broken_file = true
      handle_broken_files(path, mode, e)
      next
    end

    case chunk.state
    when :staged
      # unstaged chunk created at Buffer#write_step_by_step is identified as the staged chunk here because FileChunk#assume_chunk_state checks only the file name.
      # https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L663
      # This case can happen when fluentd process is killed by signal or other reasons between creating unstaged chunks and changing them to staged mode in Buffer#write
      # these chunks (unstaged chunks) have shared the same metadata
      # So perform enqueue step again https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L364
      if chunk_size_full?(chunk) || stage.key?(chunk.metadata)
        chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
        queue << chunk.enqueued!
      else
        stage[chunk.metadata] = chunk
      end
    when :queued
      queue << chunk
    end
  end

  queue.sort_by!{ |chunk| chunk.modified_at }

  # If one of the files is corrupted, other files may also be corrupted and be undetected.
  # The time periods of each chunk are helpful to check the data.
  if exist_broken_file
    log.info "Since a broken chunk file was found, it is possible that other files remaining at the time of resuming were also broken. Here is the list of the files."
    (stage.values + queue).each { |chunk|
      log.info "  #{chunk.path}:", :created_at => chunk.created_at, :modified_at => chunk.modified_at
    }
  end

  return stage, queue
end
start()
Calls superclass method Fluent::Plugin::Buffer#start
# File lib/fluent/plugin/buf_file.rb, line 121
# Ensures the directory that will contain the chunk files (the parent of the
# @path glob pattern) exists, creating it with @dir_permission, then continues
# with the superclass's start.
def start
  chunk_dir = File.dirname(@path)
  FileUtils.mkdir_p(chunk_dir, mode: @dir_permission)
  super
end
stop()
Calls superclass method Fluent::PluginId#stop
# File lib/fluent/plugin/buf_file.rb, line 127
# Removes this buffer's @buffer_path entry from the variable store (when a
# store was set up) so the path is no longer reported as in use, then
# continues with the superclass's stop.
def stop
  @variable_store.delete(@buffer_path) if @variable_store
  super
end

Private Instance Methods

escaped_patterns(patterns)
# File lib/fluent/plugin/buf_file.rb, line 234
# Returns a copy of +patterns+ in which every '{' and '}' is backslash-escaped,
# because Dir.glob would otherwise treat braces as alternation syntax.
# Other glob characters (such as the '*' placeholder) are left untouched.
def escaped_patterns(patterns)
  patterns.map do |glob|
    glob.gsub(/[{}]/) { |brace| "\\" + brace }
  end
end