class LogStash::Outputs::LogstashAzureBlobOutput

Logstash outout plugin that uploads the logs to Azure blobs. The logs are stored on local temporary file which is uploaded as a blob to Azure cloud

@author Jaime Margolin

@!attribute storage_account_name

Azure storage account name (required) - found under the access keys tab

@!attribute storage_access_key

Azure storage account access key (required) - found under the access keys tab

@!attribute contianer_name

Blob container to uplaod blobs to (required)

@!attribute size_file

File size to use for local tmeporary File

@!attribute time_file

time to upload the local File

@!attribute restore

restore after crash

@!attribute temporary_directory

temporary directory where the temporary files will be written

@!attribute prefix

prefix for the files to be uploaded

@!attribute upload queue size

upload que size

@!attribute upload workers count

how much workers for uplaod

@!attribute rotation_strategy_val

what will be considered to do the tmeporary file rotation

@!attribute tags

tags for the files

@!attribute encoding

the encoding of the files

@example basic configuration

output {
  azure {
    storage_account_name => "my-azure-account"    # required
    storage_access_key => "my-super-secret-key"   # required
    contianer_name => "my-contianer"              # required
    size_file => 1024*1024*5                      # optional
    time_file => 10                               # optional
    restore => true                               # optional
    temporary_directory => "path/to/directory"    # optional
    prefix => "a_prefix"                          # optional
    upload_queue_size => 2                        # optional
    upload_workers_count => 1                     # optional
    rotation_strategy_val => "size_and_time"      # optional
    tags => []                                    # optional
    encoding => "none"                            # optional
  }
}

Constants

CRASH_RECOVERY_THREADPOOL
PERIODIC_CHECK_INTERVAL_IN_SECONDS
PREFIX_KEY_NORMALIZE_CHARACTER

Attributes

container_name[RW]
encoding[RW]
prefix[RW]
restore[RW]
rotation_strategy_val[RW]
size_file[RW]
storage_access_key[RW]
storage_account_name[RW]
tags[RW]
temporary_directory[RW]
time_file[RW]
upload_queue_size[RW]
upload_workers_count[RW]

Public Instance Methods

blob_container_resource() click to toggle source

login to azure cloud using azure gem and create the contianer if it doesn't exist @return [Object] the azure_blob_service object, which is the endpoint to azure gem

# File lib/logstash/outputs/azure.rb, line 204
def blob_container_resource
  Azure.config.storage_account_name = storage_account_name
  Azure.config.storage_access_key = storage_access_key
  azure_blob_service = Azure::Blob::BlobService.new
  list = azure_blob_service.list_containers
  list.each do |item|
    @container = item if item.name == container_name
  end

  azure_blob_service.create_container(container_name) unless @container
  azure_blob_service
end
clean_temporary_file(file) click to toggle source

Cleans the temporary files after it is uploaded to azure blob

# File lib/logstash/outputs/azure.rb, line 265
def clean_temporary_file(file)
  @logger.debug('Removing temporary file', file: file.path)
  file.delete!
end
close() click to toggle source

close the temporary file and uploads the content to Azure

# File lib/logstash/outputs/azure.rb, line 159
def close
  stop_periodic_check if @rotation.needs_periodic?

  @logger.debug('Uploading current workspace')

  # The plugin has stopped receiving new events, but we still have
  # data on disk, lets make sure it get to Azure blob.
  # If Logstash get interrupted, the `restore_from_crash` (when set to true) method will pickup
  # the content in the temporary directly and upload it.
  # This will block the shutdown until all upload are done or the use force quit.
  @file_repository.each_files do |file|
    upload_file(file)
  end

  @file_repository.shutdown

  @uploader.stop # wait until all the current upload are complete
  @crash_uploader.stop if @restore # we might have still work to do for recovery so wait until we are done
end
multi_receive_encoded(events_and_encoded) click to toggle source

Receives multiple events and check if there is space in temporary directory @param events_and_encoded [Object]

# File lib/logstash/outputs/azure.rb, line 137
def multi_receive_encoded(events_and_encoded)
  prefix_written_to = Set.new

  events_and_encoded.each do |event, encoded|
    prefix_key = normalize_key(event.sprintf(@prefix))
    prefix_written_to << prefix_key

    begin
      @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
      # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
      # Log the error and rethrow it.
    rescue Errno::ENOSPC => e
      @logger.error('Azure: No space left in temporary directory', temporary_directory: @temporary_directory)
      raise e
    end
  end

  # Groups IO calls to optimize fstat checks
  rotate_if_needed(prefix_written_to)
end
normalize_key(prefix_key) click to toggle source

Validates and normalize prefix key @param prefix_key [String]

# File lib/logstash/outputs/azure.rb, line 181
def normalize_key(prefix_key)
  prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
end
register() click to toggle source

initializes the LogstashAzureBlobOutput instances validates all config parameters initializes the uploader

# File lib/logstash/outputs/azure.rb, line 105
def register
  unless @prefix.empty?
    unless PathValidator.valid?(prefix)
      raise LogStash::ConfigurationError.new("Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}")
    end
  end

  unless WritableDirectoryValidator.valid?(@temporary_directory)
    raise LogStash::ConfigurationError.new("Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}")
  end

  if @time_file.nil? && @size_file.nil? || @size_file.zero? && @time_file.zero?
    raise LogStash::ConfigurationError.new('at least one of time_file or size_file set to a value greater than 0')
  end

  @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

  @rotation = rotation_strategy

  executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                max_threads: @upload_workers_count,
                                                max_queue: @upload_queue_size,
                                                fallback_policy: :caller_runs)

  @uploader = Uploader.new(blob_container_resource, container_name, @logger, executor)

  restore_from_crash if @restore
  start_periodic_check if @rotation.needs_periodic?
end
restore_from_crash() click to toggle source

uploads files if there was a crash before

# File lib/logstash/outputs/azure.rb, line 271
def restore_from_crash
  @crash_uploader = Uploader.new(blob_container_resource, container_name, @logger, CRASH_RECOVERY_THREADPOOL)

  temp_folder_path = Pathname.new(@temporary_directory)
  Dir.glob(::File.join(@temporary_directory, '**/*'))
     .select { |file| ::File.file?(file) }
     .each do |file|
    temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
    @logger.debug('Recovering from crash and uploading', file: temp_file.path)
    @crash_uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
  end
end
rotate_if_needed(prefixes) click to toggle source

check if it needs to rotate according to rotation policy and rotates it if it needs @param prefixes [String]

# File lib/logstash/outputs/azure.rb, line 219
def rotate_if_needed(prefixes)
  prefixes.each do |prefix|
    # Each file access is thread safe,
    # until the rotation is done then only
    # one thread has access to the resource.
    @file_repository.get_factory(prefix) do |factory|
      temp_file = factory.current

      if @rotation.rotate?(temp_file)
        @logger.debug('Rotate file',
                      strategy: @rotation.class.name,
                      key: temp_file.key,
                      path: temp_file.path)

        upload_file(temp_file)
        factory.rotate!
      end
    end
  end
end
rotation_strategy() click to toggle source

creates an instance for the rotation strategy

# File lib/logstash/outputs/azure.rb, line 253
def rotation_strategy
  case @rotation_strategy_val
  when 'size'
    SizeRotationPolicy.new(size_file)
  when 'time'
    TimeRotationPolicy.new(time_file)
  when 'size_and_time'
    SizeAndTimeRotationPolicy.new(size_file, time_file)
  end
end
start_periodic_check() click to toggle source

checks periodically the tmeporary file if it needs to be rotated

# File lib/logstash/outputs/azure.rb, line 186
def start_periodic_check
  @logger.debug('Start periodic rotation check')

  @periodic_check = Concurrent::TimerTask.new(execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
    @logger.debug('Periodic check for stale files')

    rotate_if_needed(@file_repository.keys)
  end

  @periodic_check.execute
end
stop_periodic_check() click to toggle source
# File lib/logstash/outputs/azure.rb, line 198
def stop_periodic_check
  @periodic_check.shutdown
end
upload_file(temp_file) click to toggle source

uploads the file using the Uploader

# File lib/logstash/outputs/azure.rb, line 241
def upload_file(temp_file)
  @logger.debug('Queue for upload', path: temp_file.path)

  # if the queue is full the calling thread will be used to upload
  temp_file.close # make sure the content is on disk
  unless temp_file.empty? # rubocop:disable GuardClause
    @uploader.upload_async(temp_file,
                           on_complete: method(:clean_temporary_file))
  end
end