class LogStash::Inputs::S3

Streams events from files in an S3 bucket.

Each line from each file generates an event. Files ending in '.gz' are handled as gzipped files.

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/s3.rb, line 63
# Validates configuration, resolves AWS credentials, and opens the S3
# bucket handles (input bucket, optional backup bucket/dir) used by run().
# Raises ArgumentError when credentials, bucket, or sincedb location
# cannot be determined.
def register
  require "digest/md5"
  require "aws-sdk"

  # An explicitly configured region overrides the default endpoint.
  @region_endpoint = @region if !@region.empty?

  @logger.info("Registering s3 input", :bucket => @bucket, :region_endpoint => @region_endpoint)

  # Resolve AWS credentials, by configuration shape:
  #   nil          -> environment variables
  #   [path]       -> credentials file of KEY=VALUE lines
  #   [id, secret] -> literal key pair
  if @credentials.nil?
    @access_key_id = ENV['AWS_ACCESS_KEY_ID']
    @secret_access_key = ENV['AWS_SECRET_ACCESS_KEY']
  elsif @credentials.is_a? Array
    if @credentials.length == 1
      File.open(@credentials[0]) do |f|
        f.each_line do |line|
          next if /^\#/.match(line)          # skip comment lines
          next unless /\s*=\s*/.match(line)  # skip lines without an assignment
          param, value = line.split('=', 2)
          param = param.chomp.strip
          value = value.chomp.strip
          if param.eql?('AWS_ACCESS_KEY_ID')
            @access_key_id = value
          elsif param.eql?('AWS_SECRET_ACCESS_KEY')
            @secret_access_key = value
          end
        end
      end
    elsif @credentials.length == 2
      @access_key_id, @secret_access_key = @credentials
    else
      raise ArgumentError, 'Credentials must be of the form "/path/to/file" or ["id", "secret"]'
    end
  end

  raise ArgumentError, 'Missing AWS credentials' if @access_key_id.nil? || @secret_access_key.nil?
  raise ArgumentError, 'Missing AWS bucket' if @bucket.nil?

  # Default the sincedb location to a bucket+prefix-specific file in $HOME.
  if @sincedb_path.nil?
    raise ArgumentError, 'No HOME or sincedb_path set' if ENV['HOME'].nil?
    @sincedb_path = File.join(ENV['HOME'], ".sincedb_" + Digest::MD5.hexdigest("#{@bucket}+#{@prefix}"))
  end

  s3 = AWS::S3.new(
    :access_key_id => @access_key_id,
    :secret_access_key => @secret_access_key,
    :region => @region_endpoint
  )

  @s3bucket = s3.buckets[@bucket]

  # Create the backup bucket on demand when one is configured.
  unless @backup_to_bucket.nil?
    @backup_bucket = s3.buckets[@backup_to_bucket]
    s3.buckets.create(@backup_to_bucket) unless @backup_bucket.exists?
  end

  unless @backup_to_dir.nil?
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end

end
run(queue) click to toggle source
# File lib/logstash/inputs/s3.rb, line 135
# Main input loop: repeatedly processes any new S3 objects into `queue`,
# sleeping @interval seconds between polls.
def run(queue)
  loop do
    process_new(queue)
    sleep(@interval)
  end
  # NOTE(review): this call is unreachable -- the `loop` above has no
  # break or shutdown check, so `finished` never runs; confirm against
  # the plugin base class whether a teardown hook should stop the loop.
  finished
end

Private Instance Methods

list_new(since=nil) click to toggle source
# File lib/logstash/inputs/s3.rb, line 161
# Lists keys under @prefix in the input bucket whose last-modified time is
# strictly after `since`, ordered oldest-first so sincedb checkpoints can
# advance monotonically as each key is processed.
#
# since - Time cutoff; nil means "from the beginning of time".
#
# Returns an Array of String keys.
def list_new(since=nil)
  since = Time.new(0) if since.nil?

  modified = {}
  @s3bucket.objects.with_prefix(@prefix).each do |log|
    modified[log.key] = log.last_modified if log.last_modified > since
  end

  # sort_by replaces the original `sort { |a,b| ... }` block, and the
  # pointless `return sorted_objects = ...` assignment is dropped.
  modified.keys.sort_by { |key| modified[key] }
end
process_line(queue, metadata, line) click to toggle source
# File lib/logstash/inputs/s3.rb, line 229
# Handles one line of a (CloudFront-style) log file. "#Version:" and
# "#Fields:" header lines update `metadata`; every other line is decoded
# by @codec into events, decorated, annotated with any known header
# metadata, and pushed onto `queue`.
#
# Returns the (possibly updated) metadata hash.
def process_line(queue, metadata, line)
  case line
  when /#Version: .+/
    _, version = line.strip.split(/#Version: (.+)/)
    metadata[:version] = version unless version.nil?
  when /#Fields: .+/
    _, fields = line.strip.split(/#Fields: (.+)/)
    metadata[:format] = fields unless fields.nil?
  else
    @codec.decode(line) do |event|
      decorate(event)
      event["cloudfront_version"] = metadata[:version] unless metadata[:version].nil?
      event["cloudfront_fields"] = metadata[:format] unless metadata[:format].nil?
      queue << event
    end
  end
  metadata
end
process_local_log(queue, filename) click to toggle source
# File lib/logstash/inputs/s3.rb, line 207
# Streams a downloaded log file line-by-line through process_line,
# threading the header metadata hash across lines. Files whose name ends
# in '.gz' are read through a GzipReader; everything else is read as-is.
def process_local_log(queue, filename)
  metadata = {
    :version => nil,
    :format => nil,
  }
  File.open(filename) do |file|
    # Pick the reader once; both File and GzipReader expose each_line.
    reader = filename.end_with?('.gz') ? Zlib::GzipReader.new(file) : file
    reader.each_line do |line|
      metadata = process_line(queue, metadata, line)
    end
  end
end
process_log(queue, key) click to toggle source
# File lib/logstash/inputs/s3.rb, line 179
# Downloads one S3 object to a private temp dir, feeds it through
# process_local_log, optionally copies it to the backup bucket/dir, and
# optionally deletes the source object.
#
# Fixes: the original cleaned up the temp dir AFTER a bare begin/end, so
# any exception while processing leaked the scratch directory; cleanup now
# lives in `ensure`. The original `force=true` argument was a stray local
# assignment -- pass `true` directly.
def process_log(queue, key)

  object = @s3bucket.objects[key]
  tmp = Dir.mktmpdir("logstash-")
  begin
    filename = File.join(tmp, File.basename(key))
    # Stream the object body to disk chunk by chunk.
    File.open(filename, 'wb') do |s3file|
      object.read do |chunk|
        s3file.write(chunk)
      end
    end
    process_local_log(queue, filename)
    unless @backup_to_bucket.nil?
      backup_object = @backup_bucket.objects[key]
      backup_object.write(Pathname.new(filename))
    end
    unless @backup_to_dir.nil?
      FileUtils.cp(filename, @backup_to_dir)
    end
    if @delete
      object.delete()
    end
  ensure
    # Always remove the scratch dir, even when processing raises.
    FileUtils.remove_entry_secure(tmp, true)
  end

end
process_new(queue, since=nil) click to toggle source
# File lib/logstash/inputs/s3.rb, line 144
# Processes every object modified after `since` (defaulting to the last
# sincedb checkpoint), writing a new checkpoint after each object so a
# restart resumes close to where it left off.
def process_new(queue, since=nil)
  since ||= sincedb_read()

  list_new(since).each do |key|
    @logger.debug("S3 input processing", :bucket => @bucket, :key => key)
    # Capture last_modified before processing, in case @delete removes it.
    modified_at = @s3bucket.objects[key].last_modified
    process_log(queue, key)
    sincedb_write(modified_at)
  end
end
sincedb_read() click to toggle source
# File lib/logstash/inputs/s3.rb, line 258
# Reads the last checkpoint timestamp from the sincedb file.
# Returns Time.new(0) (i.e. "everything is new") when no sincedb exists.
#
# Fix: File.exists? is deprecated and was removed in Ruby 3.2; use
# File.exist?.
def sincedb_read()

  if File.exist?(@sincedb_path)
    since = Time.parse(File.read(@sincedb_path).chomp.strip)
  else
    since = Time.new(0)
  end
  return since

end
sincedb_write(since=nil) click to toggle source
# File lib/logstash/inputs/s3.rb, line 270
# Persists a checkpoint timestamp to the sincedb file, overwriting any
# previous value. Defaults to the current wall-clock time.
def sincedb_write(since=nil)
  stamp = since.nil? ? Time.now() : since
  File.open(@sincedb_path, 'w') do |file|
    file.write(stamp.to_s)
  end
end