class LogStash::Outputs::GoogleBigQuery
BigQuery charges for storage and for queries, based on how much data it reads to perform a query. These are additional aspects to consider when choosing the date pattern that will be used to create new tables, and also when composing queries against BQ. For more info on BigQuery pricing, see: developers.google.com/bigquery/pricing
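The granularity of the date pattern directly determines how many tables are created and, therefore, how much data a date-bounded query has to scan. As a quick illustration (the timestamps are hypothetical), each distinct value produced by Ruby's strftime yields a separate table:

  t = Time.utc(2014, 8, 3, 10, 15)
  t.strftime("%Y-%m-%d")        # => "2014-08-03"        (one table per day)
  t.strftime("%Y-%m-%dT%H:00")  # => "2014-08-03T10:00"  (one table per hour)

An hourly pattern creates roughly 24 times as many tables as a daily one, but allows queries to target narrower time ranges.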
USAGE: This is an example of a Logstash config:

output {
  google_bigquery {
    project_id => "folkloric-guru-278"                       # (required)
    dataset => "logs"                                        # (required)
    csv_schema => "path:STRING,status:INTEGER,score:FLOAT"   # (required)
    key_path => "/path/to/privatekey.p12"                    # (required)
    key_password => "notasecret"                             # (optional)
    service_account => "1234@developer.gserviceaccount.com"  # (required)
    temp_directory => "/tmp/logstash-bq"                     # (optional)
    temp_file_prefix => "logstash_bq"                        # (optional)
    date_pattern => "%Y-%m-%dT%H:00"                         # (optional)
    flush_interval_secs => 2                                 # (optional)
    uploader_interval_secs => 60                             # (optional)
    deleter_interval_secs => 60                              # (optional)
  }
}
Improvements TODO list:
- Refactor common code between Google BQ and GCS plugins.
- Turn Google API code into a Plugin Mixin (like AwsConfig).
- There's no recover method, so if logstash/plugin crashes, files may not be uploaded to BQ.
Public Instance Methods
Method called for each log event. It writes the event to the current output file, flushing depending on the flush interval configuration.
# File lib/logstash/outputs/google_bigquery.rb, line 179
def receive(event)
  return unless output?(event)

  @logger.debug("BQ: receive method called", :event => event)

  # Message must be written as JSON.
  message = event.to_json
  # Remove "@" from property names.
  message = message.gsub(/\"@(\w+)\"/, '"\1"')

  new_base_path = get_base_path()

  # Time to roll the file based on the date pattern? Or are we due to
  # upload it to BQ?
  if (@current_base_path != new_base_path ||
      Time.now - @last_file_time >= @uploader_interval_secs)
    @logger.debug("BQ: log file will be closed and uploaded",
                  :filename => File.basename(@temp_file.to_path),
                  :size => @temp_file.size.to_s,
                  :uploader_interval_secs => @uploader_interval_secs.to_s)
    # Close alone does not guarantee that data is physically written to
    # disk, so flush it first.
    @temp_file.fsync()
    @temp_file.close()
    initialize_next_log()
  end

  @temp_file.write(message)
  @temp_file.write("\n")

  sync_log_file()

  @logger.debug("BQ: event appended to log file",
                :filename => File.basename(@temp_file.to_path))
end
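A worked example of the "@"-stripping step above (the event JSON is hypothetical). Field names such as "@timestamp" are not valid BigQuery column names, so the leading "@" is removed before the line is appended to the temporary file:

  message = '{"@timestamp":"2014-08-03T10:00:00Z","@version":"1","status":200}'
  message.gsub(/\"@(\w+)\"/, '"\1"')
  # => '{"timestamp":"2014-08-03T10:00:00Z","version":"1","status":200}'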
# File lib/logstash/outputs/google_bigquery.rb, line 139
def register
  require 'csv'
  require "fileutils"
  require "thread"

  @logger.debug("BQ: register plugin")

  @fields = Array.new

  CSV.parse(@csv_schema.gsub('\"', '""')).flatten.each do |field|
    temp = field.strip.split(":")

    # Check that the field in the schema follows the format (<name>:<type>).
    if temp.length != 2
      raise "BigQuery schema must follow the format <field-name>:<field-type>"
    end

    @fields << { "name" => temp[0], "type" => temp[1] }
  end

  # Check that we have at least one field in the schema.
  if @fields.length == 0
    raise "BigQuery schema must contain at least one field"
  end

  @json_schema = { "fields" => @fields }

  @upload_queue = Queue.new
  @delete_queue = Queue.new
  @last_flush_cycle = Time.now
  initialize_temp_directory()
  initialize_current_log()
  initialize_google_client()
  initialize_uploader()
  initialize_deleter()
end
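A standalone sketch of the csv_schema parsing performed by register, showing the JSON schema produced for the schema string from the usage example:

  require 'csv'

  csv_schema = "path:STRING,status:INTEGER,score:FLOAT"
  fields = CSV.parse(csv_schema).flatten.map do |field|
    name, type = field.strip.split(":")
    { "name" => name, "type" => type }
  end
  json_schema = { "fields" => fields }
  # => {"fields"=>[{"name"=>"path",   "type"=>"STRING"},
  #                {"name"=>"status", "type"=>"INTEGER"},
  #                {"name"=>"score",  "type"=>"FLOAT"}]}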
# File lib/logstash/outputs/google_bigquery.rb, line 214
def teardown
  @logger.debug("BQ: teardown method called")

  @temp_file.flush()
  @temp_file.close()
end
Private Instance Methods
Returns the base path to the log file, which is invariant regardless of the max file size option: it contains everything except the part number.
# File lib/logstash/outputs/google_bigquery.rb, line 366
def get_base_path
  return get_undated_path() + "_" + Time.now.strftime(@date_pattern)
end
Returns date from a temporary log file name.
# File lib/logstash/outputs/google_bigquery.rb, line 379
def get_date_pattern(filename)
  match = /^#{get_undated_path()}_(?<date>.*)\.part(\d+)\.log$/.match(filename)
  return match[:date]
end
Returns full path to the log file based on global variables (like current_base_path) and configuration options (max file size).
# File lib/logstash/outputs/google_bigquery.rb, line 373
def get_full_path
  return @current_base_path + ".part" + ("%03d" % @size_counter) + ".log"
end
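Putting the three path helpers together (all values hypothetical): with temp_directory "/tmp/logstash-bq", temp_file_prefix "logstash_bq", hostname "host1", date_pattern "%Y-%m-%dT%H:00", and part number 3:

  undated = "/tmp/logstash-bq" + File::SEPARATOR + "logstash_bq" + "_" + "host1"
  base    = undated + "_" + "2014-08-03T10:00"        # get_base_path
  full    = base + ".part" + ("%03d" % 3) + ".log"    # get_full_path
  # => "/tmp/logstash-bq/logstash_bq_host1_2014-08-03T10:00.part003.log"

get_date_pattern later recovers "2014-08-03T10:00" from that file name in order to pick the destination table.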
Checks the status of an upload job in BigQuery, identified by job_id, and returns the job's status object.
# File lib/logstash/outputs/google_bigquery.rb, line 468
def get_job_status(job_id)
  begin
    require 'json'

    @logger.debug("BQ: check job status.", :job_id => job_id)
    get_result = @client.execute(:api_method => @bq.jobs.get,
                                 :parameters => {
                                   'jobId' => job_id,
                                   'projectId' => @project_id
                                 })
    response = JSON.parse(get_result.response.body)
    @logger.debug("BQ: successfully invoked API.", :response => response)

    if response.has_key?("error")
      raise response["error"]
    end

    # Successful invocation
    contents = response["status"]
    return contents
  rescue => e
    @logger.error("BQ: failed to check status", :exception => e)
    # TODO(rdc): limit retries?
    sleep 1
    retry
  end
end
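The hash returned above is the "status" object of a BigQuery jobs.get response. For reference, a finished job that failed on a schema mismatch would look roughly like this (the field values are illustrative, not captured output):

  {
    "state"       => "DONE",
    "errorResult" => {
      "reason"  => "invalid",
      "message" => "Provided Schema does not match the destination table"
    }
  }

The deleter below branches on exactly these fields: "state" decides whether the job has finished, and "errorResult" decides whether the local file may be deleted.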
Returns the latest part number for a base path. This method checks all existing log files in order to find the highest part number, so that file can be used for appending log events.
Only applicable if max file size is enabled.
# File lib/logstash/outputs/google_bigquery.rb, line 390
def get_latest_part_number(base_path)
  part_numbers = Dir.glob(base_path + ".part*.log").map do |item|
    match = /^.*\.part(?<part_num>\d+).log$/.match(item)
    next if match.nil?
    match[:part_num].to_i
  end

  return part_numbers.max if part_numbers.any?
  0
end
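For example (hypothetical directory listing), if the temporary directory already contains part files for the current base path:

  Dir.glob(base_path + ".part*.log")
  # => ["/tmp/logstash-bq/logstash_bq_host1_2014-08-03T10:00.part000.log",
  #     "/tmp/logstash-bq/logstash_bq_host1_2014-08-03T10:00.part001.log"]

  get_latest_part_number(base_path)  # => 1

so the plugin resumes appending to .part001.log instead of starting a new file.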
Returns undated path used to construct base path and final full path. This path only includes directory, prefix, and hostname info.
# File lib/logstash/outputs/google_bigquery.rb, line 358
def get_undated_path
  return @temp_directory + File::SEPARATOR + @temp_file_prefix + "_" +
    Socket.gethostname()
end
Opens the log file on plugin initialization, trying to resume from an existing file. If max file size is enabled, the highest existing part number is found and writing resumes from that file.
# File lib/logstash/outputs/google_bigquery.rb, line 420
def initialize_current_log
  @current_base_path = get_base_path
  @last_file_time = Time.now
  @size_counter = get_latest_part_number(@current_base_path)
  @logger.debug("BQ: resuming from latest part.", :part => @size_counter)
  open_current_file()
end
Starts a thread that deletes uploaded log files once their load jobs are done.
The deleter runs in its own thread so it does not block the receive method above.
# File lib/logstash/outputs/google_bigquery.rb, line 265
def initialize_deleter
  @deleter = Thread.new do
    @logger.debug("BQ: starting deleter")
    while true
      delete_item = @delete_queue.pop
      job_id = delete_item["job_id"]
      filename = delete_item["filename"]
      job_status = get_job_status(job_id)
      case job_status["state"]
      when "DONE"
        if job_status.has_key?("errorResult")
          @logger.error("BQ: job failed, please enable debug and check full "\
                        "response (probably the issue is an incompatible "\
                        "schema). NOT deleting local file.",
                        :job_id => job_id,
                        :filename => filename,
                        :job_status => job_status)
        else
          @logger.debug("BQ: job is done, deleting local temporary file ",
                        :job_id => job_id,
                        :filename => filename,
                        :job_status => job_status)
          File.delete(filename)
        end
      when "PENDING", "RUNNING"
        @logger.debug("BQ: job is not done, NOT deleting local file yet.",
                      :job_id => job_id,
                      :filename => filename,
                      :job_status => job_status)
        @delete_queue << delete_item
      else
        @logger.error("BQ: unknown job status, please enable debug and "\
                      "check full response (probably the issue is an "\
                      "incompatible schema). NOT deleting local file yet.",
                      :job_id => job_id,
                      :filename => filename,
                      :job_status => job_status)
      end

      sleep @deleter_interval_secs
    end
  end
end
Initializes the Google API client, instantiating it and authorizing access.
# File lib/logstash/outputs/google_bigquery.rb, line 447
def initialize_google_client
  require "google/api_client"
  require "openssl"

  @client = Google::APIClient.new(
    :application_name => 'Logstash Google BigQuery output plugin',
    :application_version => '0.1')
  @bq = @client.discovered_api('bigquery', 'v2')

  key = Google::APIClient::PKCS12.load_key(@key_path, @key_password)
  # Authorization scope reference:
  # https://developers.google.com/bigquery/docs/authorization
  service_account = Google::APIClient::JWTAsserter.new(
    @service_account,
    'https://www.googleapis.com/auth/bigquery',
    key)
  @client.authorization = service_account.authorize
end
Generates the new log file name based on configuration options and opens the log file. If max file size is enabled, the part number is incremented in case the base log file name is the same (e.g. the log file was not rolled given the date pattern).
# File lib/logstash/outputs/google_bigquery.rb, line 434
def initialize_next_log
  new_base_path = get_base_path
  @size_counter = @current_base_path == new_base_path ? @size_counter + 1 : 0
  @logger.debug("BQ: opening next log file.",
                :filename => @current_base_path,
                :part => @size_counter)
  @current_base_path = new_base_path
  @last_file_time = Time.now
  open_current_file()
end
Creates the temporary directory if it does not exist. When no directory is configured, a directory with a random suffix is generated under the system temporary path.
# File lib/logstash/outputs/google_bigquery.rb, line 246
def initialize_temp_directory
  if @temp_directory.empty?
    require "stud/temporary"
    @temp_directory = Stud::Temporary.directory("logstash-bq")
    @logger.info("BQ: temporary directory generated",
                 :directory => @temp_directory)
  end

  if !(File.directory? @temp_directory)
    @logger.debug("BQ: directory doesn't exist. Creating it.",
                  :directory => @temp_directory)
    FileUtils.mkdir_p(@temp_directory)
  end
end
Starts a thread that uploads log files.
The uploader runs in its own thread so it does not block the receive method above.
# File lib/logstash/outputs/google_bigquery.rb, line 313
def initialize_uploader
  @uploader = Thread.new do
    @logger.debug("BQ: starting uploader")
    while true
      filename = @upload_queue.pop

      # Reenqueue if it is still the current file.
      if filename == @temp_file.to_path
        if @current_base_path == get_base_path()
          if Time.now - @last_file_time < @uploader_interval_secs
            @logger.debug("BQ: reenqueue as log file is being currently appended to.",
                          :filename => filename)
            @upload_queue << filename
            # If we got here, it means that older files were uploaded, so let's
            # wait another minute before checking on this file again.
            sleep @uploader_interval_secs
            next
          else
            @logger.debug("BQ: flush and close file to be uploaded.",
                          :filename => filename)
            @temp_file.flush()
            @temp_file.close()
            initialize_next_log()
          end
        end
      end

      if File.size(filename) > 0
        job_id = upload_object(filename)
        @delete_queue << { "filename" => filename, "job_id" => job_id }
      else
        @logger.debug("BQ: skipping empty file.")
        @logger.debug("BQ: delete local temporary file ",
                      :filename => filename)
        File.delete(filename)
      end

      sleep @uploader_interval_secs
    end
  end
end
Opens the current log file and updates @temp_file with an instance of IOWriter. This method also adds the file to the upload queue.
# File lib/logstash/outputs/google_bigquery.rb, line 404
def open_current_file()
  path = get_full_path()
  stat = File.stat(path) rescue nil
  if stat and stat.ftype == "fifo" and RUBY_PLATFORM == "java"
    fd = java.io.FileWriter.new(java.io.File.new(path))
  else
    fd = File.new(path, "a")
  end
  @temp_file = IOWriter.new(fd)
  @upload_queue << @temp_file.to_path
end
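The IOWriter class itself is not reproduced in this section. Below is a minimal sketch of the interface that open_current_file, receive, and sync_log_file rely on (write, flush, fsync, close, size, to_path), modeled on the wrapper used by the Logstash file output; the plugin's actual class may differ:

  # Minimal sketch, not the plugin's actual class: wraps an IO object,
  # exposes its path, and delegates everything else to the underlying IO.
  class IOWriter
    def initialize(io)
      @io = io
    end

    def write(*args)
      @io.write(*args)
    end

    def to_path
      @io.path
    end

    # Delegate flush, fsync, close, size, etc. to the wrapped IO.
    def method_missing(method_name, *args, &block)
      if @io.respond_to?(method_name)
        @io.send(method_name, *args, &block)
      else
        super
      end
    end

    def respond_to_missing?(method_name, include_private = false)
      @io.respond_to?(method_name, include_private) || super
    end
  end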
Flushes the temporary log file every flush_interval_secs seconds or so. Flushing is triggered by incoming events; if there are no events, there is nothing new to flush anyway.
Inspired by lib/logstash/outputs/file.rb (flush(fd), flush_pending_files)
# File lib/logstash/outputs/google_bigquery.rb, line 228
def sync_log_file
  if @flush_interval_secs <= 0
    @temp_file.fsync
    return
  end

  return unless Time.now - @last_flush_cycle >= @flush_interval_secs
  @temp_file.fsync
  @logger.debug("BQ: flushing file",
                :path => @temp_file.to_path,
                :fd => @temp_file)
  @last_flush_cycle = Time.now
end
Uploads a local file to BigQuery by inserting a load job into the configured project and dataset; returns the job id.
# File lib/logstash/outputs/google_bigquery.rb, line 499
def upload_object(filename)
  begin
    require 'json'

    table_id = @table_prefix + "_" + get_date_pattern(filename)
    # BQ does not accept anything other than alphanumeric and _
    # Ref: https://developers.google.com/bigquery/browser-tool-quickstart?hl=en
    table_id = table_id.gsub(':', '_').gsub('-', '_')

    @logger.debug("BQ: upload object.",
                  :filename => filename,
                  :table_id => table_id)
    media = Google::APIClient::UploadIO.new(filename, "application/octet-stream")
    body = {
      "configuration" => {
        "load" => {
          "sourceFormat" => "NEWLINE_DELIMITED_JSON",
          "schema" => @json_schema,
          "destinationTable" => {
            "projectId" => @project_id,
            "datasetId" => @dataset,
            "tableId" => table_id
          },
          'createDisposition' => 'CREATE_IF_NEEDED',
          'writeDisposition' => 'WRITE_APPEND'
        }
      }
    }
    insert_result = @client.execute(:api_method => @bq.jobs.insert,
                                    :body_object => body,
                                    :parameters => {
                                      'uploadType' => 'multipart',
                                      'projectId' => @project_id
                                    },
                                    :media => media)
    job_id = JSON.parse(insert_result.response.body)["jobReference"]["jobId"]
    @logger.debug("BQ: multipart insert", :job_id => job_id)
    return job_id
  rescue => e
    @logger.error("BQ: failed to upload file", :exception => e)
    # TODO(rdc): limit retries?
    sleep 1
    retry
  end
end
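To make the table naming concrete (values hypothetical), assuming a table prefix of "logstash" and the date pattern "%Y-%m-%dT%H:00" from the usage example:

  date = "2014-08-03T10:00"  # recovered by get_date_pattern from the file name
  table_id = ("logstash" + "_" + date).gsub(':', '_').gsub('-', '_')
  # => "logstash_2014_08_03T10_00"

Events are then appended to that table via a load job with WRITE_APPEND, creating it first if needed (CREATE_IF_NEEDED).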