class LogStash::Outputs::GoogleCloudStorage
Summary: plugin to upload log events to Google Cloud Storage (GCS), rolling files based on the date pattern provided as a configuration setting. Events are written to files locally and, once a file is closed, this plugin uploads it to the configured bucket.
For more info on Google Cloud Storage, please go to: cloud.google.com/products/cloud-storage
In order to use this plugin, a Google service account must be used. For more information, please refer to: developers.google.com/storage/docs/authentication#service_accounts
Recommendation: experiment with the settings depending on how much log data you generate, so the uploader can keep up with the generated logs. Enabling gzip output reduces network traffic when uploading the log files and also lowers storage costs.
USAGE: This is an example of a logstash config:
output {
  google_cloud_storage {
    bucket => "my_bucket"                                      (required)
    key_path => "/path/to/privatekey.p12"                      (required)
    key_password => "notasecret"                               (optional)
    service_account => "1234@developer.gserviceaccount.com"    (required)
    temp_directory => "/tmp/logstash-gcs"                      (optional)
    log_file_prefix => "logstash_gcs"                          (optional)
    max_file_size_kbytes => 1024                               (optional)
    output_format => "plain"                                   (optional)
    date_pattern => "%Y-%m-%dT%H:00"                           (optional)
    flush_interval_secs => 2                                   (optional)
    gzip => false                                              (optional)
    uploader_interval_secs => 60                               (optional)
  }
}
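For example, to follow the gzip recommendation above, a minimal configuration could look like the sketch below; the bucket, key path and service account values are placeholders:

output {
  google_cloud_storage {
    bucket => "my_bucket"
    key_path => "/path/to/privatekey.p12"
    service_account => "1234@developer.gserviceaccount.com"
    gzip => true
    flush_interval_secs => 2
  }
}

With gzip => true the plugin writes .log.gz files and uploads them with content type application/gzip (see register below).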
Improvements TODO list:
- Support logstash event variables to determine filename.
- Turn Google API code into a Plugin Mixin (like AwsConfig).
- There's no recover method, so if logstash/plugin crashes, files may not be uploaded to GCS.
- Allow user to configure file name.
- Allow parallel uploads for heavier loads (+ connection configuration if exposed by Ruby API client).
Public Instance Methods
Method called for each log event. It writes the event to the current output file, flushing periodically according to the flush interval configuration.
# File lib/logstash/outputs/google_cloud_storage.rb, line 137
def receive(event)
  return unless output?(event)

  @logger.debug("GCS: receive method called", :event => event)

  if (@output_format == "json")
    message = event.to_json
  else
    message = event.to_s
  end

  new_base_path = get_base_path()

  # Time to roll file based on the date pattern? Or is it over the size limit?
  if (@current_base_path != new_base_path ||
      (@max_file_size_kbytes > 0 && @temp_file.size >= @max_file_size_kbytes * 1024))
    @logger.debug("GCS: log file will be closed and uploaded",
                  :filename => File.basename(@temp_file.to_path),
                  :size => @temp_file.size.to_s,
                  :max_size => @max_file_size_kbytes.to_s)
    # Close does not guarantee that data is physically written to disk.
    @temp_file.fsync()
    @temp_file.close()
    initialize_next_log()
  end

  @temp_file.write(message)
  @temp_file.write("\n")

  sync_log_file()

  @logger.debug("GCS: event appended to log file",
                :filename => File.basename(@temp_file.to_path))
end
# File lib/logstash/outputs/google_cloud_storage.rb, line 114
def register
  require "fileutils"
  require "thread"

  @logger.debug("GCS: register plugin")

  @upload_queue = Queue.new
  @last_flush_cycle = Time.now
  initialize_temp_directory()
  initialize_current_log()
  initialize_google_client()
  initialize_uploader()

  if @gzip
    @content_type = 'application/gzip'
  else
    @content_type = 'text/plain'
  end
end
# File lib/logstash/outputs/google_cloud_storage.rb, line 172
def teardown
  @logger.debug("GCS: teardown method called")
  @temp_file.fsync()
  @temp_file.close()
end
Private Instance Methods
Returns the base path to the log file, which is invariant regardless of the max file size and gzip options.
# File lib/logstash/outputs/google_cloud_storage.rb, line 260
def get_base_path
  return @temp_directory + File::SEPARATOR + @log_file_prefix + "_" +
    Socket.gethostname() + "_" + Time.now.strftime(@date_pattern)
end
Returns the full path to the log file based on instance state (like @current_base_path) and configuration options (max file size and whether gzip is enabled).
# File lib/logstash/outputs/google_cloud_storage.rb, line 276
def get_full_path
  if @max_file_size_kbytes > 0
    return @current_base_path + ".part" + ("%03d" % @size_counter) + get_suffix()
  else
    return @current_base_path + get_suffix()
  end
end
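To make the naming scheme concrete, here is an illustrative sketch of the paths these helpers produce; the hostname and timestamp shown are hypothetical:

# Assuming temp_directory "/tmp/logstash-gcs", log_file_prefix "logstash_gcs",
# date_pattern "%Y-%m-%dT%H:00", max_file_size_kbytes 1024 and gzip disabled,
# on a (hypothetical) host named "host1":
get_base_path  # => "/tmp/logstash-gcs/logstash_gcs_host1_2014-05-20T14:00"
get_full_path  # => "/tmp/logstash-gcs/logstash_gcs_host1_2014-05-20T14:00.part000.log"
# With gzip enabled, the suffix would be ".log.gz" instead of ".log".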
Returns latest part number for a base path. This method checks all existing log files in order to find the highest part number, so this file can be used for appending log events.
Only applicable if max file size is enabled.
# File lib/logstash/outputs/google_cloud_storage.rb, line 290
def get_latest_part_number(base_path)
  part_numbers = Dir.glob(base_path + ".part*" + get_suffix()).map do |item|
    match = /^.*\.part(?<part_num>\d+)#{get_suffix()}$/.match(item)
    next if match.nil?
    match[:part_num].to_i
  end

  return part_numbers.max if part_numbers.any?
  0
end
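A usage sketch, with hypothetical file names: if the temporary directory already contains parts 000 and 002 for the current base path, the method resumes from the highest part found:

# Files on disk (hypothetical):
#   /tmp/logstash-gcs/logstash_gcs_host1_2014-05-20T14:00.part000.log
#   /tmp/logstash-gcs/logstash_gcs_host1_2014-05-20T14:00.part002.log
get_latest_part_number("/tmp/logstash-gcs/logstash_gcs_host1_2014-05-20T14:00")
# => 2, so new events are appended to the .part002.log file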
Returns log file suffix, which will vary depending on whether gzip is enabled.
# File lib/logstash/outputs/google_cloud_storage.rb, line 268
def get_suffix
  return @gzip ? ".log.gz" : ".log"
end
Opens log file on plugin initialization, trying to resume from an existing file. If max file size is enabled, find the highest part number and resume from it.
# File lib/logstash/outputs/google_cloud_storage.rb, line 323
def initialize_current_log
  @current_base_path = get_base_path
  if @max_file_size_kbytes > 0
    @size_counter = get_latest_part_number(@current_base_path)
    @logger.debug("GCS: resuming from latest part.", :part => @size_counter)
  end
  open_current_file()
end
Initializes the Google API client, instantiating it and authorizing access.
# File lib/logstash/outputs/google_cloud_storage.rb, line 355
def initialize_google_client
  require "google/api_client"
  require "openssl"

  @client = Google::APIClient.new(:application_name => 'Logstash Google Cloud Storage output plugin',
                                  :application_version => '0.1')
  @storage = @client.discovered_api('storage', 'v1beta1')

  key = Google::APIClient::PKCS12.load_key(@key_path, @key_password)
  service_account = Google::APIClient::JWTAsserter.new(@service_account,
                                                       'https://www.googleapis.com/auth/devstorage.read_write',
                                                       key)
  @client.authorization = service_account.authorize
end
Generates a new log file name based on the configuration options and opens the log file. If max file size is enabled, the part number is incremented when the base log file name is unchanged (i.e. the log file was not rolled by the date pattern).
# File lib/logstash/outputs/google_cloud_storage.rb, line 338
def initialize_next_log
  new_base_path = get_base_path
  if @max_file_size_kbytes > 0
    @size_counter = @current_base_path == new_base_path ? @size_counter + 1 : 0
    @logger.debug("GCS: opening next log file.",
                  :filename => @current_base_path,
                  :part => @size_counter)
  else
    @logger.debug("GCS: opening next log file.",
                  :filename => @current_base_path)
  end
  @current_base_path = new_base_path
  open_current_file()
end
Creates the temporary directory if it does not exist. If no directory was configured, one is generated with a random suffix (via Stud::Temporary).
# File lib/logstash/outputs/google_cloud_storage.rb, line 204
def initialize_temp_directory
  require "stud/temporary"

  if @temp_directory.empty?
    @temp_directory = Stud::Temporary.directory("logstash-gcs")
    @logger.info("GCS: temporary directory generated",
                 :directory => @temp_directory)
  end

  if !(File.directory? @temp_directory)
    @logger.debug("GCS: directory doesn't exist. Creating it.",
                  :directory => @temp_directory)
    FileUtils.mkdir_p(@temp_directory)
  end
end
Starts the thread that uploads log files.
Uploading runs in a separate thread so it does not block the receive method above.
# File lib/logstash/outputs/google_cloud_storage.rb, line 223
def initialize_uploader
  @uploader = Thread.new do
    @logger.debug("GCS: starting uploader")
    while true
      filename = @upload_queue.pop

      # Reenqueue if it is still the current file.
      if filename == @temp_file.to_path
        if @current_base_path == get_base_path()
          @logger.debug("GCS: reenqueue as log file is being currently appended to.",
                        :filename => filename)
          @upload_queue << filename
          # If we got here, it means that older files were uploaded, so let's
          # wait another minute before checking on this file again.
          sleep @uploader_interval_secs
          next
        else
          @logger.debug("GCS: flush and close file to be uploaded.",
                        :filename => filename)
          @temp_file.fsync()
          @temp_file.close()
          initialize_next_log()
        end
      end

      upload_object(filename)
      @logger.debug("GCS: delete local temporary file ",
                    :filename => filename)
      File.delete(filename)
      sleep @uploader_interval_secs
    end
  end
end
Opens the current log file and updates @temp_file with an instance of GCSIOWriter. This method also adds the file to the upload queue.
# File lib/logstash/outputs/google_cloud_storage.rb, line 304
def open_current_file()
  path = get_full_path()
  stat = File.stat(path) rescue nil
  if stat and stat.ftype == "fifo" and RUBY_PLATFORM == "java"
    fd = java.io.FileWriter.new(java.io.File.new(path))
  else
    fd = File.new(path, "a")
  end
  if @gzip
    fd = Zlib::GzipWriter.new(fd)
  end
  @temp_file = GCSIOWriter.new(fd)
  @upload_queue << @temp_file.to_path
end
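GCSIOWriter itself is not shown in this section; the following is a minimal sketch of the wrapper implied here, assuming it simply delegates to the wrapped descriptor and provides an fsync that also works for Zlib::GzipWriter (which has no fsync of its own):

require "zlib"

class GCSIOWriter
  def initialize(io)
    @io = io
  end

  def write(*args)
    @io.write(*args)
  end

  # GzipWriter has no fsync: flush the compressor first, then fsync
  # the underlying File object obtained through to_io.
  def fsync
    if @io.class == Zlib::GzipWriter
      @io.flush
      @io.to_io.fsync
    else
      @io.fsync
    end
  end

  # Delegate everything else (size, close, to_path, ...) to the wrapped IO.
  def method_missing(method_name, *args, &block)
    if @io.respond_to?(method_name)
      @io.send(method_name, *args, &block)
    else
      super
    end
  end
end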
Flushes the temporary log file roughly every flush_interval_secs seconds. Flushing is triggered by incoming events; if there are no events, there is nothing new to flush anyway.
Inspired by lib/logstash/outputs/file.rb (flush(fd), flush_pending_files)
# File lib/logstash/outputs/google_cloud_storage.rb, line 186
def sync_log_file
  if flush_interval_secs <= 0
    @temp_file.fsync()
    return
  end

  return unless Time.now - @last_flush_cycle >= flush_interval_secs
  @temp_file.fsync()
  @logger.debug("GCS: flushing file",
                :path => @temp_file.to_path,
                :fd => @temp_file)
  @last_flush_cycle = Time.now
end
Uploads a local file to the configured bucket.
# File lib/logstash/outputs/google_cloud_storage.rb, line 373
def upload_object(filename)
  begin
    @logger.debug("GCS: upload object.", :filename => filename)

    media = Google::APIClient::UploadIO.new(filename, @content_type)
    metadata_insert_result = @client.execute(:api_method => @storage.objects.insert,
                                             :parameters => {
                                               'uploadType' => 'multipart',
                                               'bucket' => @bucket,
                                               'name' => File.basename(filename)
                                             },
                                             :body_object => {contentType: @content_type},
                                             :media => media)
    contents = metadata_insert_result.data
    @logger.debug("GCS: multipart insert",
                  :object => contents.name,
                  :self_link => contents.self_link)
  rescue => e
    @logger.error("GCS: failed to upload file", :exception => e)
    # TODO(rdc): limit retries?
    sleep 1
    retry
  end
end
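The rescue block above retries forever with a fixed one-second delay. As a hedged sketch of the TODO(rdc) note, retries could be bounded with exponential backoff; the retry limit and backoff base below are illustrative choices, not part of the plugin:

def upload_object(filename, max_retries = 5)
  attempts = 0
  begin
    media = Google::APIClient::UploadIO.new(filename, @content_type)
    @client.execute(:api_method => @storage.objects.insert,
                    :parameters => { 'uploadType' => 'multipart',
                                     'bucket' => @bucket,
                                     'name' => File.basename(filename) },
                    :body_object => {contentType: @content_type},
                    :media => media)
  rescue => e
    attempts += 1
    # Give up after max_retries attempts; the file stays on disk for
    # manual recovery (there is no recover method, see the TODO list).
    raise if attempts > max_retries
    @logger.error("GCS: failed to upload file, retrying",
                  :exception => e, :attempt => attempts)
    sleep(2 ** attempts)  # exponential backoff: 2, 4, 8, ... seconds
    retry
  end
end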