class LogStash::Outputs::LogstashAzureBlobOutput::FileRepository
sub class for LogstashAzureBlobOutput
this class manages the temporary directory for the temporary files
Constants
- DEFAULT_STALE_TIME_SECS
- DEFAULT_STATE_SWEEPER_INTERVAL_SECS
Public Class Methods
new(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
click to toggle source
initializes the class with more variables
# File lib/logstash/outputs/blob/file_repository.rb, line 66 def initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) # The path need to contains the prefix so when we start # logtash after a crash we keep the remote structure @prefixed_factories = ConcurrentHashMap.new @sweeper_interval = sweeper_interval @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time) start_stale_sweeper end
Public Instance Methods
each_files() { |current| ... }
click to toggle source
with lock for each file
# File lib/logstash/outputs/blob/file_repository.rb, line 86 def each_files @prefixed_factories.elements.each do |prefixed_file| prefixed_file.with_lock { |factory| yield factory.current } end end
get_factory(prefix_key) { |factory| ... }
click to toggle source
Return the file factory
# File lib/logstash/outputs/blob/file_repository.rb, line 93 def get_factory(prefix_key) @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory } end
get_file(prefix_key) { |current| ... }
click to toggle source
gets file from prefix_key
# File lib/logstash/outputs/blob/file_repository.rb, line 98 def get_file(prefix_key) get_factory(prefix_key) { |factory| yield factory.current } end
keys()
click to toggle source
gets the key set
# File lib/logstash/outputs/blob/file_repository.rb, line 81 def keys @prefixed_factories.keySet end
remove_stale(k, v)
click to toggle source
remove the stale given key and value
# File lib/logstash/outputs/blob/file_repository.rb, line 113 def remove_stale(k, v) if v.stale? # rubocop:disable Style/GuardClause @prefixed_factories.remove(k, v) v.delete! end end
shutdown()
click to toggle source
stops. shutdown
# File lib/logstash/outputs/blob/file_repository.rb, line 103 def shutdown stop_stale_sweeper end
size()
click to toggle source
gets factory's size
# File lib/logstash/outputs/blob/file_repository.rb, line 108 def size @prefixed_factories.size end
start_stale_sweeper()
click to toggle source
starts the stale sweeper
# File lib/logstash/outputs/blob/file_repository.rb, line 121 def start_stale_sweeper @stale_sweeper = Concurrent::TimerTask.new(execution_interval: @sweeper_interval) do LogStash::Util.set_thread_name('LogstashAzureBlobOutput, Stale factory sweeper') @prefixed_factories.forEach { |k, v| remove_stale(k, v) } end @stale_sweeper.execute end
stop_stale_sweeper()
click to toggle source
stops the stale sweeper
# File lib/logstash/outputs/blob/file_repository.rb, line 132 def stop_stale_sweeper @stale_sweeper.shutdown end