class LogStash::Inputs::S3
Stream events from files in an S3
bucket.
Each line from each file generates an event. Files ending in '.gz' are handled as gzip'ed files.
Public Instance Methods
register()
click to toggle source
# Validate configuration, resolve AWS credentials and prepare the S3
# bucket handles (source bucket plus optional backup bucket/directory).
#
# Credential resolution order:
#   * @credentials unset -> AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars
#   * one-element array  -> path to a properties file of KEY = VALUE pairs
#   * two-element array  -> [access_key_id, secret_access_key]
#
# Raises ArgumentError when credentials, the bucket name, or a usable
# sincedb location cannot be determined.
def register
  require "digest/md5"
  require "aws-sdk"

  @region_endpoint = @region if !@region.empty?

  @logger.info("Registering s3 input", :bucket => @bucket, :region_endpoint => @region_endpoint)

  if @credentials.nil?
    @access_key_id = ENV['AWS_ACCESS_KEY_ID']
    @secret_access_key = ENV['AWS_SECRET_ACCESS_KEY']
  elsif @credentials.is_a? Array
    if @credentials.length == 1
      # Credentials file: one KEY = VALUE pair per line, '#' starts a comment.
      File.open(@credentials[0]) do |f|
        f.each do |line|
          next if /^\#/.match(line)
          next unless /\s*=\s*/.match(line)
          param, value = line.split('=', 2)
          param = param.chomp.strip
          value = value.chomp.strip
          if param.eql?('AWS_ACCESS_KEY_ID')
            @access_key_id = value
          elsif param.eql?('AWS_SECRET_ACCESS_KEY')
            @secret_access_key = value
          end
        end
      end
    elsif @credentials.length == 2
      @access_key_id, @secret_access_key = @credentials
    else
      raise ArgumentError, 'Credentials must be of the form "/path/to/file" or ["id", "secret"]'
    end
  end

  raise ArgumentError, 'Missing AWS credentials' if @access_key_id.nil? || @secret_access_key.nil?
  raise ArgumentError, 'Missing AWS bucket' if @bucket.nil?

  if @sincedb_path.nil?
    raise ArgumentError, 'No HOME or sincedb_path set' if ENV['HOME'].nil?
    # Default sincedb file is keyed on bucket+prefix so multiple s3 inputs
    # on the same host do not clobber each other's checkpoints.
    @sincedb_path = File.join(ENV["HOME"], ".sincedb_" + Digest::MD5.hexdigest("#{@bucket}+#{@prefix}"))
  end

  s3 = AWS::S3.new(
    :access_key_id => @access_key_id,
    :secret_access_key => @secret_access_key,
    :region => @region_endpoint
  )

  @s3bucket = s3.buckets[@bucket]

  unless @backup_to_bucket.nil?
    @backup_bucket = s3.buckets[@backup_to_bucket]
    s3.buckets.create(@backup_to_bucket) unless @backup_bucket.exists?
  end

  unless @backup_to_dir.nil?
    # File.exists? is deprecated (removed in Ruby 3.2); use File.exist?.
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end
end
run(queue)
click to toggle source
# File lib/logstash/inputs/s3.rb, line 135 def run(queue) loop do process_new(queue) sleep(@interval) end finished end
Private Instance Methods
list_new(since=nil)
click to toggle source
# List the keys of bucket objects (under @prefix) whose last_modified
# is strictly newer than +since+, ordered oldest first.
#
# since - Time cutoff; nil means "everything" (epoch 0).
#
# Returns an Array of String keys. (The original ended with the
# pointless `return sorted_objects = ...` assignment; the local was
# never used.)
def list_new(since = nil)
  since ||= Time.new(0)

  # key => last_modified for every object newer than the cutoff.
  modified = {}
  @s3bucket.objects.with_prefix(@prefix).each do |log|
    modified[log.key] = log.last_modified if log.last_modified > since
  end

  # Oldest first, so the sincedb checkpoint advances monotonically.
  modified.keys.sort_by { |key| modified[key] }
end
process_line(queue, metadata, line)
click to toggle source
# Route one log line: CloudFront "#Version:" / "#Fields:" header lines
# update +metadata+; every other line is decoded by the codec into
# events, decorated, tagged with the recorded CloudFront metadata and
# pushed onto +queue+.
#
# Returns the (possibly updated) metadata hash.
def process_line(queue, metadata, line)
  if /#Version: .+/.match(line)
    _, version = line.strip.split(/#Version: (.+)/)
    metadata[:version] = version unless version.nil?
  elsif /#Fields: .+/.match(line)
    _, fields = line.strip.split(/#Fields: (.+)/)
    metadata[:format] = fields unless fields.nil?
  else
    @codec.decode(line) do |event|
      decorate(event)
      event["cloudfront_version"] = metadata[:version] unless metadata[:version].nil?
      event["cloudfront_fields"] = metadata[:format] unless metadata[:format].nil?
      queue << event
    end
  end
  metadata
end
process_local_log(queue, filename)
click to toggle source
# Read a downloaded log file line by line, threading the CloudFront
# metadata hash through process_line. Files ending in '.gz' are
# transparently gunzipped.
def process_local_log(queue, filename)
  metadata = {
    :version => nil,
    :format => nil,
  }
  File.open(filename) do |file|
    if filename.end_with?('.gz')
      # Block form closes the gzip stream on exit — the original built a
      # bare Zlib::GzipReader.new and never closed it, leaking the zlib
      # stream until GC.
      Zlib::GzipReader.wrap(file) do |gz|
        gz.each_line { |line| metadata = process_line(queue, metadata, line) }
      end
    else
      file.each { |line| metadata = process_line(queue, metadata, line) }
    end
  end
end
process_log(queue, key)
click to toggle source
# Download one S3 object to a temp file, feed it through
# process_local_log, optionally back it up (to a bucket and/or a local
# directory) and optionally delete the source object.
def process_log(queue, key)
  object = @s3bucket.objects[key]
  tmp = Dir.mktmpdir("logstash-")
  begin
    filename = File.join(tmp, File.basename(key))
    # Stream the object to disk chunk by chunk to bound memory use.
    File.open(filename, 'wb') do |s3file|
      object.read { |chunk| s3file.write(chunk) }
    end
    process_local_log(queue, filename)
    # Archive the raw file before any deletion of the source object.
    unless @backup_to_bucket.nil?
      @backup_bucket.objects[key].write(Pathname.new(filename))
    end
    FileUtils.cp(filename, @backup_to_dir) unless @backup_to_dir.nil?
    object.delete if @delete
  ensure
    # Originally ran *after* the begin block with no ensure, so any
    # exception above leaked the temp directory. (`force=true` also
    # just created a throwaway local; the arg is positional.)
    FileUtils.remove_entry_secure(tmp, true)
  end
end
process_new(queue, since=nil)
click to toggle source
# Poll the bucket for objects modified after the sincedb checkpoint and
# process each one, advancing the checkpoint after every object so a
# restart resumes roughly where it left off.
def process_new(queue, since = nil)
  since ||= sincedb_read()
  list_new(since).each do |key|
    @logger.debug("S3 input processing", :bucket => @bucket, :key => key)
    last_modified = @s3bucket.objects[key].last_modified
    process_log(queue, key)
    sincedb_write(last_modified)
  end
end
sincedb_read()
click to toggle source
# Read the last-processed timestamp from the sincedb file.
#
# Returns a Time: the parsed file contents, or epoch 0 when no sincedb
# file exists yet (i.e. process everything).
def sincedb_read()
  # File.exists? is deprecated (removed in Ruby 3.2); use File.exist?.
  if File.exist?(@sincedb_path)
    Time.parse(File.read(@sincedb_path).chomp.strip)
  else
    Time.new(0)
  end
end
sincedb_write(since=nil)
click to toggle source
# Persist the checkpoint timestamp to the sincedb file, overwriting any
# previous value. Defaults to the current wall-clock time.
def sincedb_write(since=nil)
  since = Time.now() if since.nil?
  File.write(@sincedb_path, since.to_s)
end