class LogStash::Outputs::S3
LET'S ROCK AND ROLL ON THE CODE!
Public Class Methods
format_message(event)
click to toggle source
# File lib/logstash/outputs/s3.rb, line 346
# Renders an event in the multi-line "plain" text layout.
#
# event - object answering #[] for "@timestamp", "source", "tags" and
#         "message", and answering #to_hash.
#
# Returns a String made of Date, Source, Tags, Fields and Message lines,
# without a trailing newline.
def self.format_message(event)
  # Array() maps a missing (nil) tags field to an empty list, so an untagged
  # event renders an empty "Tags:" line instead of raising NoMethodError.
  tags = Array(event["tags"]).join(', ')

  message = "Date: #{event["@timestamp"]}\n"
  message << "Source: #{event["source"]}\n"
  message << "Tags: #{tags}\n"
  message << "Fields: #{event.to_hash.inspect}\n"
  message << "Message: #{event["message"]}"
end
Public Instance Methods
aws_s3_config()
click to toggle source
Sets up the AWS configuration and establishes the S3 connection.
# File lib/logstash/outputs/s3.rb, line 152
# Configures the AWS SDK with the plugin's credentials and endpoint, then
# stores a new AWS::S3 client in @s3.
#
# As before, @endpoint_region is rewritten from a region name to the S3
# endpoint host name. The rewrite is applied only once, so calling this
# method again (for example after a failed first attempt left @s3 unset)
# does not wrap an already-expanded host name a second time.
#
# Returns the new AWS::S3 client.
def aws_s3_config
  unless @endpoint_region.end_with?('.amazonaws.com')
    @endpoint_region =
      if @endpoint_region == 'us-east-1'
        's3.amazonaws.com'
      else
        's3-' + @endpoint_region + '.amazonaws.com'
      end
  end

  @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @endpoint_region)

  AWS.config(
    :access_key_id => @access_key_id,
    :secret_access_key => @secret_access_key,
    :s3_endpoint => @endpoint_region
  )
  @s3 = AWS::S3.new
end
getFinalPath()
click to toggle source
This method builds the path used to name a new temporary file.
# File lib/logstash/outputs/s3.rb, line 203
# Builds the base path for the next temporary file and records the current
# time in @pass_time.
#
# The file name is "ls.s3.<hostname>.<YYYY-MM-DDTHH.MM>". File.join is used
# so the result is the same whether or not @temp_directory ends with a
# path separator.
#
# Returns the path as a String.
def getFinalPath
  @pass_time = Time.now
  file_name = "ls.s3." + Socket.gethostname + "." + @pass_time.strftime("%Y-%m-%dT%H.%M")
  File.join(@temp_directory, file_name)
end
newFile(flag)
click to toggle source
This method creates a new empty temporary file. The flag indicates that a new time_file section is starting: when it is true, the base path and the size counter are reset.
# File lib/logstash/outputs/s3.rb, line 237
# Creates a new, empty temporary file and stores its handle in @tempFile.
#
# flag - when true, a fresh base path is taken from getFinalPath and
#        @sizeCounter is reset to 0; otherwise the current base path and
#        counter are reused.
#
# The file name is the base path followed by ".part<N>.txt", or by
# ".tag_<tag_path>part<N>.txt" when tags are configured.
#
# Returns the new File handle.
def newFile(flag)
  if flag == true
    @current_final_path = getFinalPath
    @sizeCounter = 0
  end

  marker = @tags.empty? ? ".part" : ".tag_" + @tag_path + "part"

  previous = @tempFile
  @tempFile = File.new(@current_final_path + marker + @sizeCounter.to_s + ".txt", "w")

  # Release the handle of the file being replaced; without this every
  # rotation leaks one open file descriptor. The swap happens first so
  # @tempFile never refers to a closed handle.
  previous.close if previous && !previous.closed?

  @tempFile
end
receive(event)
click to toggle source
# File lib/logstash/outputs/s3.rb, line 295
# Formats an event and appends it to the current temporary file, rotating
# the file first when a size limit is configured and has been reached.
#
# event - the event to store; ignored unless output?(event) is true.
#
# The text written depends on @format: "plain" uses format_message, "json"
# uses event.to_json, anything else uses event.to_s. Each event is followed
# by an empty line.
def receive(event)
  return unless output?(event)

  # Prepare format of Events
  message =
    case @format
    when "plain" then self.class.format_message(event)
    when "json" then event.to_json
    else event.to_s
    end

  if time_file != 0
    @logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s
  end

  if size_file != 0
    if @tempFile.size < @size_file
      @logger.debug "S3: File have size: "+@tempFile.size.to_s+" and size_file is: "+ @size_file.to_s
    else
      @logger.debug "S3: file: "+File.basename(@tempFile)+" is too large, let's bucket it and create new file"
      upFile(false, File.basename(@tempFile))
      @sizeCounter += 1
      newFile(false)
    end
    # Reached after a rotation as well: the event that hit the size limit
    # goes into the new file instead of being dropped.
    @logger.debug "S3: put event into: "+File.basename(@tempFile)
  else
    # No size limit: everything goes into one file.
    @logger.debug "S3: put event into "+File.basename(@tempFile)
  end

  File.open(@tempFile, 'a') do |file|
    file.puts message
    file.write "\n"
  end
end
register()
click to toggle source
# File lib/logstash/outputs/s3.rb, line 253
# Prepares the output: loads the AWS SDK, makes sure the temporary
# directory exists, optionally uploads files left over from a previous
# run, opens the first temporary file and, when time_file is non-zero,
# starts the periodic rotation thread.
def register
  # Loaded here, as in the original, so the libraries are only required
  # when the plugin is actually registered.
  require "aws-sdk"
  require "fileutils"
  @temp_directory = "/opt/logstash/S3_temp/"

  # "a", "b" => "a.b." (each tag followed by a dot); used in file names.
  if @tags.size != 0
    @tag_path = @tags.map { |tag| tag.to_s + "." }.join
  end

  if File.directory?(@temp_directory)
    @logger.debug "S3: Directory "+@temp_directory+" exist, nothing to do"
  else
    @logger.debug "S3: Directory "+@temp_directory+" doesn't exist, let's make it!"
    # mkdir_p also creates missing parent directories and does not fail if
    # the directory appears between the check above and this call.
    FileUtils.mkdir_p(@temp_directory)
  end

  if @restore == true
    @logger.debug "S3: is attempting to verify previous crashes..."
    upFile(true, "*.txt")
  end

  newFile(true)

  if time_file != 0
    # time_alert runs its block immediately on start; that first run is
    # skipped so the first rotation happens one interval after start.
    first_time = true
    @thread = time_alert(@time_file*60) do
      if first_time
        first_time = false
      else
        @logger.debug "S3: time_file triggered, let's bucket the file if dosen't empty and create new file "
        upFile(false, File.basename(@tempFile))
        newFile(true)
      end
    end
  end
end
time_alert(interval) { || ... }
click to toggle source
This method starts a thread that repeatedly runs the given block and then sleeps for the rest of the interval.
# File lib/logstash/outputs/s3.rb, line 168
# Starts a thread that runs the given block in an endless loop, once per
# interval.
#
# interval - number of seconds between the start of consecutive runs. The
#            time the block itself took is subtracted from the sleep; if
#            the block took longer than the interval the next run starts
#            immediately.
#
# A StandardError raised by the block is logged (when @logger is set) and
# the loop continues, so one failed run does not end the thread and stop
# all later runs.
#
# Returns the Thread.
def time_alert(interval)
  Thread.new do
    loop do
      start_time = Time.now
      begin
        yield
      rescue StandardError => e
        @logger.error("S3: periodic task failed, will retry on next interval", :exception => e) if @logger
      end
      elapsed = Time.now - start_time
      sleep([interval - elapsed, 0].max)
    end
  end
end
upFile(flag, name)
click to toggle source
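This method uploads temporary files to the bucket, either to recover files left behind by a previous crash of logstash or to send the files that are ready. It takes two parameters: flag and name. Flag indicates whether this is a restore; name is the name (or glob pattern) of the file(s) inside the temporary directory.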
# File lib/logstash/outputs/s3.rb, line 212
# Uploads every file in the temporary directory that matches +name+ and
# then deletes it locally.
#
# flag - true when recovering files left by an earlier run; this only
#        changes which log messages are emitted.
# name - file name or glob pattern, appended to @temp_directory.
#
# Empty files are not uploaded, but they are still deleted.
def upFile(flag, name)
  restoring = (flag == true)

  Dir.glob(@temp_directory+name).each do |file|
    name_file = File.basename(file)

    @logger.warn "S3: have found temporary file: "+name_file+", something has crashed before... Prepare for upload in bucket!" if restoring

    unless File.zero?(file)
      write_on_bucket(file, name_file)
      if restoring
        @logger.debug "S3: file: "+name_file+" restored on bucket "+@bucket
      else
        @logger.debug "S3: file: "+name_file+" was put on bucket "+@bucket
      end
    end

    File.delete(file)
  end
end
write_on_bucket(file_data, file_basename)
click to toggle source
This method writes a file to the bucket. It accepts the file and the name to store it under.
# File lib/logstash/outputs/s3.rb, line 182
# Writes one local file to the configured bucket.
#
# file_data     - the file to upload, passed to the S3 object's write as
#                 the :file option.
# file_basename - key under which the object is stored in the bucket.
#
# The S3 client is created on first use via aws_s3_config. The object is
# written with the canned ACL held in @canned_acl.
def write_on_bucket(file_data, file_basename)
  # Only a missing client is handled here; a connection lost later is not
  # detected (noted as a weak check in the original code).
  aws_s3_config if @s3.nil?

  # Look the bucket up before logging, as the original does.
  target_bucket = @s3.buckets[@bucket]

  @logger.debug "S3: ready to write "+file_basename+" in bucket "+@bucket+", Fire in the hole!"

  target_bucket.objects[file_basename].write(:file => file_data, :acl => @canned_acl)

  @logger.debug "S3: has written "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\""
end