class Fluent::Plugin::S3Input
Constants
- DEFAULT_PARSE_TYPE
- EXTRACTOR_REGISTRY
Attributes
Public Class Methods
Source
# File lib/fluent/plugin/in_s3.rb, line 18
# Runs the superclass constructor and starts with no extractor selected;
# #configure is where @extractor receives a real instance.
def initialize
  super
  @extractor = nil
end
Calls superclass method
Source
# File lib/fluent/plugin/in_s3.rb, line 457
# Adds an extractor to EXTRACTOR_REGISTRY under the given name.
# #configure later resolves the extractor through the same registry,
# looking it up by @store_as.
#
# name      - key the extractor is registered under
# extractor - object handed to the registry; #configure calls
#             `.new(log: ...)` on whatever the lookup returns
#
# Returns whatever EXTRACTOR_REGISTRY.register returns.
def self.register_extractor(name, extractor)
  EXTRACTOR_REGISTRY.register(name, extractor)
end
Public Instance Methods
Source
# File lib/fluent/plugin/in_s3.rb, line 131
# Validates the plugin configuration, then builds the extractor and parser.
#
# Checks run in this order: S3 endpoint, SQS endpoint, SQS queue name, the
# top-level key pair, and the SQS key pair. Each key pair must be supplied
# together or not at all.
#
# conf - configuration element; passed to super and to the extractor, and
#        searched for a "parse" section that configures the parser
#
# Raises Fluent::ConfigError when any check fails.
def configure(conf)
  super

  raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services" if reject_s3_endpoint?

  sqs_endpoint = @sqs.endpoint
  if sqs_endpoint && sqs_endpoint.end_with?('amazonaws.com') && %w[fips gov].none? { |marker| sqs_endpoint.include?(marker) }
    raise Fluent::ConfigError, "sqs/endpoint parameter is not supported for SQS, use s3_region instead. This parameter is for SQS compatible services"
  end

  parser_config = conf.elements("parse").first

  raise Fluent::ConfigError, "sqs/queue_name is required" unless @sqs.queue_name

  # A key id without its secret (or the reverse) is a configuration error.
  raise Fluent::ConfigError, "aws_key_id or aws_sec_key is missing" if !!@aws_key_id != !!@aws_sec_key
  raise Fluent::ConfigError, "sqs/aws_key_id or sqs/aws_sec_key is missing" if !!@sqs.aws_key_id != !!@sqs.aws_sec_key

  Aws.use_bundled_cert! if @use_bundled_cert

  extractor_class = EXTRACTOR_REGISTRY.lookup(@store_as)
  @extractor = extractor_class.new(log: log)
  @extractor.configure(conf)

  @parser = parser_create(conf: parser_config, default_type: DEFAULT_PARSE_TYPE)
end
Calls superclass method
Source
# File lib/fluent/plugin/in_s3.rb, line 163
# Always answers true.
# NOTE(review): presumably this tells Fluentd the input may run under
# multiple workers - confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end
Source
# File lib/fluent/plugin/in_s3.rb, line 126
# Decides whether the configured @s3_endpoint must be refused.
#
# An endpoint is refused when it ends with 'amazonaws.com', does not end
# with 'vpce.amazonaws.com', and contains neither 'fips' nor 'gov'.
#
# Returns true when the endpoint must be refused, false when it is
# acceptable, and the (falsy) value of @s3_endpoint itself when no endpoint
# is configured.
def reject_s3_endpoint?
  endpoint = @s3_endpoint
  return endpoint unless endpoint
  return false if endpoint.end_with?('vpce.amazonaws.com')
  return false unless endpoint.end_with?('amazonaws.com')

  %w[fips gov].none? { |marker| endpoint.include?(marker) }
end
Source
# File lib/fluent/plugin/in_s3.rb, line 191
# Clears the @running flag before delegating to the superclass. The
# before_request hook installed by #run checks that flag and stops polling
# once it is false.
def shutdown
  @running = false
  super
end
Calls superclass method
Source
# File lib/fluent/plugin/in_s3.rb, line 167
# Connects to S3 and SQS, then launches the polling thread.
#
# Steps, in order: create the S3 client and resource, verify the bucket
# exists, optionally verify the API keys, create the SQS client, resolve the
# queue URL, build the queue poller, and start #run on a new thread.
#
# Raises RuntimeError when the configured bucket does not exist.
def start
  super

  client_for_s3 = create_s3_client
  log.debug("Succeeded to create S3 client")
  @s3 = Aws::S3::Resource.new(client: client_for_s3)
  @bucket = @s3.bucket(@s3_bucket)

  unless @bucket.exists?
    raise "#{@bucket.name} is not found."
  end

  check_apikeys if @check_apikey_on_start

  client_for_sqs = create_sqs_client
  log.debug("Succeeded to create SQS client")

  queue_url = client_for_sqs.get_queue_url(
    queue_name: @sqs.queue_name,
    queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id
  ).queue_url
  log.debug("Succeeded to get SQS queue URL")

  @poller = Aws::SQS::QueuePoller.new(queue_url, client: client_for_sqs)

  # The flag must be set before the thread starts: #run stops polling as
  # soon as it sees @running false.
  @running = true
  thread_create(:in_s3, &method(:run))
end
Calls superclass method
Private Instance Methods
Source
# File lib/fluent/plugin/in_s3.rb, line 332
# Probes the bucket by fetching its first object and logs success.
#
# Raises RuntimeError, with the original error's inspect output embedded in
# the message, when the probe (or the log call) raises a StandardError.
def check_apikeys
  begin
    @bucket.objects.first
    log.debug("Succeeded to verify API keys")
  rescue => error
    raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{error.inspect}"
  end
end
Source
# File lib/fluent/plugin/in_s3.rb, line 301
# Builds an Aws::S3::Client from the credentials returned by
# #setup_credentials plus the S3-related settings of this plugin.
#
# Region, endpoint and proxy are only passed when configured;
# force_path_style is always passed. When trace logging is active, wire
# tracing is switched on and routed to the plugin logger.
#
# Returns the new Aws::S3::Client.
def create_s3_client
  client_options = setup_credentials

  { region: @s3_region, endpoint: @s3_endpoint }.each do |option, value|
    client_options[option] = value if value
  end
  client_options[:force_path_style] = @force_path_style
  client_options[:http_proxy] = @proxy_uri if @proxy_uri

  log.on_trace do
    client_options[:http_wire_trace] = true
    client_options[:logger] = log
  end

  Aws::S3::Client.new(client_options)
end
Source
# File lib/fluent/plugin/in_s3.rb, line 315
# Builds an Aws::SQS::Client from the credentials returned by
# #setup_credentials plus the SQS-related settings of this plugin.
#
# The region comes from @s3_region. When both sqs/aws_key_id and
# sqs/aws_sec_key are set they are written over any access key pair that
# #setup_credentials produced. When trace logging is active, wire tracing is
# switched on and routed to the plugin logger.
#
# Returns the new Aws::SQS::Client.
def create_sqs_client
  client_options = setup_credentials

  {
    region: @s3_region,
    endpoint: @sqs.endpoint,
    http_proxy: @proxy_uri
  }.each do |option, value|
    client_options[option] = value if value
  end

  if @sqs.aws_key_id && @sqs.aws_sec_key
    client_options[:access_key_id] = @sqs.aws_key_id
    client_options[:secret_access_key] = @sqs.aws_sec_key
  end

  log.on_trace do
    client_options[:http_wire_trace] = true
    client_options[:logger] = log
  end

  Aws::SQS::Client.new(client_options)
end
Source
# File lib/fluent/plugin/in_s3.rb, line 240
# Pulls the (still URL-encoded) object key out of a decoded SQS message.
#
# body - decoded message; in event bridge mode the key is read from
#        body["detail"]["object"]["key"], otherwise from the first entry of
#        body["Records"] under ["s3"]["object"]["key"]
#
# Returns the key exactly as stored in the message; callers unescape it.
def get_raw_key(body)
  return body["detail"]["object"]["key"] if @sqs.event_bridge_mode

  first_record = body["Records"].first
  first_record["s3"]["object"]["key"]
end
Source
# File lib/fluent/plugin/in_s3.rb, line 230
# Tells whether a decoded SQS message carries the property this plugin
# reads: "detail" in event bridge mode, "Records" otherwise. #run skips
# messages for which this is false.
#
# body - decoded message, indexed by string key
#
# Returns true or false.
def is_valid_queue(body)
  if @sqs.event_bridge_mode
    property = "detail"
    log.debug("checking for eventbridge property")
  else
    property = "Records"
    log.debug("checking for Records property")
  end

  body[property] ? true : false
end
Source
# File lib/fluent/plugin/in_s3.rb, line 339
# Downloads the object named in an SQS message, parses it line by line and
# emits all resulting events as one stream under @tag.
#
# body - decoded SQS message; the object key is taken from it via
#        #get_raw_key and URL-unescaped before the S3 lookup
#
# When @add_object_metadata is set, each record is tagged with the bucket
# name and the raw (still escaped) key.
def process(body)
  raw_key = get_raw_key(body)
  object_key = CGI.unescape(raw_key)

  object_body = @bucket.object(object_key).get.body
  extracted = @extractor.extract(object_body)

  event_stream = Fluent::MultiEventStream.new
  extracted.each_line do |line|
    @parser.parse(line) do |time, record|
      if @add_object_metadata
        record['s3_bucket'] = @s3_bucket
        record['s3_key'] = raw_key
      end
      event_stream.add(time, record)
    end
  end

  router.emit_stream(@tag, event_stream)
end
Source
# File lib/fluent/plugin/in_s3.rb, line 198
# Polling loop executed on the plugin's worker thread.
#
# Each message body is decoded with Yajl, skipped when it lacks the expected
# property or when its unescaped key does not match @match_regexp, and
# otherwise handed to #process. An error while handling one message is
# logged and the message is left in the queue (throw :skip_delete). An error
# from the poller itself is logged, followed by a sleep of
# sqs/retry_error_interval seconds and another polling attempt.
def run
  poll_options = {
    wait_time_seconds: @sqs.wait_time_seconds,
    skip_delete: @sqs.skip_delete
  }

  # Checked before every SQS request so that #shutdown can end the loop.
  @poller.before_request do |_stats|
    throw :stop_polling unless @running
  end

  begin
    @poller.poll(poll_options) do |message|
      begin
        body = Yajl.load(message.body)
        log.debug(body)
        next unless is_valid_queue(body) # skip test queue
        next if @match_regexp && !@match_regexp.match?(CGI.unescape(get_raw_key(body)))

        process(body)
      rescue => error
        log.warn(error: error)
        log.warn_backtrace(error.backtrace)
        throw :skip_delete
      end
    end
  rescue => error
    log.warn("SQS Polling Failed. Retry in #{@sqs.retry_error_interval} seconds", error: error)
    sleep(@sqs.retry_error_interval)
    # Only the begin block is re-entered, so the hook above is registered once.
    retry
  end
end
Source
# File lib/fluent/plugin/in_s3.rb, line 248
# Builds the credential part of the AWS client options.
#
# The first configured source wins, in this order: static key pair
# (@aws_key_id + @aws_sec_key), assume-role, web identity, instance profile
# (ECS credentials when AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is set),
# shared credentials file. With none configured the hash stays empty and
# the SDK's default credential lookup applies.
#
# Returns a Hash holding either :access_key_id/:secret_access_key,
# :credentials, or nothing.
def setup_credentials
  client_options = {}
  provider_options = {}

  if @aws_key_id && @aws_sec_key
    client_options[:access_key_id] = @aws_key_id
    client_options[:secret_access_key] = @aws_sec_key
  elsif @assume_role_credentials
    section = @assume_role_credentials
    provider_options[:role_arn] = section.role_arn
    provider_options[:role_session_name] = section.role_session_name
    provider_options[:policy] = section.policy if section.policy
    provider_options[:duration_seconds] = section.duration_seconds if section.duration_seconds
    provider_options[:external_id] = section.external_id if section.external_id
    provider_options[:client] = Aws::STS::Client.new(region: @s3_region) if @s3_region
    client_options[:credentials] = Aws::AssumeRoleCredentials.new(provider_options)
  elsif @web_identity_credentials
    section = @web_identity_credentials
    provider_options[:role_arn] = section.role_arn
    provider_options[:role_session_name] = section.role_session_name
    provider_options[:web_identity_token_file] = section.web_identity_token_file
    provider_options[:policy] = section.policy if section.policy
    provider_options[:duration_seconds] = section.duration_seconds if section.duration_seconds
    provider_options[:client] = Aws::STS::Client.new(region: @s3_region) if @s3_region
    client_options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(provider_options)
  elsif @instance_profile_credentials
    section = @instance_profile_credentials
    provider_options[:retries] = section.retries if section.retries
    provider_options[:ip_address] = section.ip_address if section.ip_address
    provider_options[:port] = section.port if section.port
    provider_options[:http_open_timeout] = section.http_open_timeout if section.http_open_timeout
    provider_options[:http_read_timeout] = section.http_read_timeout if section.http_read_timeout
    provider_class =
      if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
        Aws::ECSCredentials
      else
        Aws::InstanceProfileCredentials
      end
    client_options[:credentials] = provider_class.new(provider_options)
  elsif @shared_credentials
    section = @shared_credentials
    provider_options[:path] = section.path if section.path
    provider_options[:profile_name] = section.profile_name if section.profile_name
    client_options[:credentials] = Aws::SharedCredentials.new(provider_options)
  end
  # Otherwise: use default credentials.
  # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html

  client_options
end