class LogStash::Outputs::S3

LET'S ROCK AND ROLL ON THE CODE!

Public Class Methods

format_message(event) click to toggle source
# File lib/logstash/outputs/s3.rb, line 346
def self.format_message(event)
   message = "Date: #{event["@timestamp"]}\n"
   message << "Source: #{event["source"]}\n"
   message << "Tags: #{event["tags"].join(', ')}\n"
   message << "Fields: #{event.to_hash.inspect}\n"
   message << "Message: #{event["message"]}"
end

Public Instance Methods

aws_s3_config() click to toggle source

Method to set up the aws configuration and establish connection

# File lib/logstash/outputs/s3.rb, line 152
def aws_s3_config

 @endpoint_region == 'us-east-1' ? @endpoint_region = 's3.amazonaws.com' : @endpoint_region = 's3-'+@endpoint_region+'.amazonaws.com'

 @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @endpoint_region)

 AWS.config(
   :access_key_id => @access_key_id,
   :secret_access_key => @secret_access_key,
   :s3_endpoint => @endpoint_region
 )
 @s3 = AWS::S3.new 

end
getFinalPath() click to toggle source

this method is used for create new path for name the file

# File lib/logstash/outputs/s3.rb, line 203
def getFinalPath
  
  @pass_time = Time.now 
  return @temp_directory+"ls.s3."+Socket.gethostname+"."+(@pass_time).strftime("%Y-%m-%dT%H.%M")

end
newFile(flag) click to toggle source

This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file.

# File lib/logstash/outputs/s3.rb, line 237
def newFile (flag)
 
  if (flag == true)
    @current_final_path = getFinalPath
    @sizeCounter = 0
  end

  if (@tags.size != 0)
    @tempFile = File.new(@current_final_path+".tag_"+@tag_path+"part"+@sizeCounter.to_s+".txt", "w")
  else
    @tempFile = File.new(@current_final_path+".part"+@sizeCounter.to_s+".txt", "w")
  end

end
receive(event) click to toggle source
# File lib/logstash/outputs/s3.rb, line 295
def receive(event)
 return unless output?(event)
  
 # Prepare format of Events
 if (@format == "plain")
    message = self.class.format_message(event)
 elsif (@format == "json")
    message = event.to_json
 else
    message = event.to_s
 end
 
 if(time_file !=0)
    @logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s
 end

 # if specific the size
 if(size_file !=0)
   
   if (@tempFile.size < @size_file )

      @logger.debug "S3: File have size: "+@tempFile.size.to_s+" and size_file is: "+ @size_file.to_s
      @logger.debug "S3: put event into: "+File.basename(@tempFile)

      # Put the event in the file, now!
      File.open(@tempFile, 'a') do |file|
        file.puts message
        file.write "\n"
      end

    else

      @logger.debug "S3: file: "+File.basename(@tempFile)+" is too large, let's bucket it and create new file"
      upFile(false, File.basename(@tempFile))
      @sizeCounter += 1
      newFile(false)

    end
    
 # else we put all in one file
 else

   @logger.debug "S3: put event into "+File.basename(@tempFile)
   File.open(@tempFile, 'a') do |file|
     file.puts message
     file.write "\n"
   end
 end
   
end
register() click to toggle source
# File lib/logstash/outputs/s3.rb, line 253
def register
  require "aws-sdk"
  @temp_directory = "/opt/logstash/S3_temp/"

  if (@tags.size != 0)
      @tag_path = ""
      for i in (0..@tags.size-1)
         @tag_path += @tags[i].to_s+"." 
      end
  end

  if !(File.directory? @temp_directory)
   @logger.debug "S3: Directory "+@temp_directory+" doesn't exist, let's make it!"
   Dir.mkdir(@temp_directory)
  else
   @logger.debug "S3: Directory "+@temp_directory+" exist, nothing to do"
  end 
  
  if (@restore == true )
    @logger.debug "S3: is attempting to verify previous crashes..."
  
    upFile(true, "*.txt")    
  end
  
  newFile(true)
  
  if (time_file != 0)
     first_time = true
     @thread = time_alert(@time_file*60) do
      if (first_time == false)
        @logger.debug "S3: time_file triggered,  let's bucket the file if dosen't empty  and create new file "
        upFile(false, File.basename(@tempFile))
        newFile(true)
      else
        first_time = false
      end
    end
  end

end
time_alert(interval) { || ... } click to toggle source

This method is used to manage sleep and awaken thread.

# File lib/logstash/outputs/s3.rb, line 168
def time_alert(interval)

  Thread.new do
   loop do
     start_time = Time.now
     yield
     elapsed = Time.now - start_time
     sleep([interval - elapsed, 0].max)
   end
  end

end
upFile(flag, name) click to toggle source

This method is used for restore the previous crash of logstash or to prepare the files to send in bucket. Take two parameter: flag and name. Flag indicate if you want to restore or not, name is the name of file

# File lib/logstash/outputs/s3.rb, line 212
def upFile(flag, name)
  
  Dir[@temp_directory+name].each do |file|
    name_file = File.basename(file)
   
    if (flag == true)
     @logger.warn "S3: have found temporary file: "+name_file+", something has crashed before... Prepare for upload in bucket!"
    end
   
    if (!File.zero?(file))  
      write_on_bucket(file, name_file)

      if (flag == true)
         @logger.debug "S3: file: "+name_file+" restored on bucket "+@bucket
      else
         @logger.debug "S3: file: "+name_file+" was put on bucket "+@bucket
      end
    end

    File.delete (file)

  end
end
write_on_bucket(file_data, file_basename) click to toggle source

this method is used for write files on bucket. It accept the file and the name of file.

# File lib/logstash/outputs/s3.rb, line 182
def write_on_bucket (file_data, file_basename)

 # if you lose connection with s3, bad control implementation.
 if ( @s3 == nil) 
   aws_s3_config
 end

 # find and use the bucket
 bucket = @s3.buckets[@bucket]

 @logger.debug "S3: ready to write "+file_basename+" in bucket "+@bucket+", Fire in the hole!"

 # prepare for write the file
 object = bucket.objects[file_basename]
 object.write(:file => file_data, :acl => @canned_acl)

 @logger.debug "S3: has written "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\""

end