class Fluent::Plugin::S3Output
Constants
- COMPRESSOR_REGISTRY
- DEFAULT_FORMAT_TYPE
- MAX_HEX_RANDOM_LENGTH
Attributes
Public Class Methods
Source
# File lib/fluent/plugin/out_s3.rb, line 16
# Initializes plugin state before configuration runs.
def initialize
  super
  # %{uuid_flush} support is turned on later by process_s3_object_key_format.
  @uuid_flush_enabled = false
  # The compressor is resolved from @store_as during #configure.
  @compressor = nil
end
Calls superclass method
Source
# File lib/fluent/plugin/out_s3.rb, line 686
# Registers a compressor implementation under +name+ so it can be
# selected via the store_as parameter.
def self.register_compressor(name, compressor)
  registry = COMPRESSOR_REGISTRY
  registry.register(name, compressor)
end
Public Instance Methods
Source
# File lib/fluent/plugin/out_s3.rb, line 193
# Validates and normalizes the plugin configuration: resolves the
# compressor for @store_as, creates the formatter, validates the
# object-key related parameters, and precomputes the time-slice
# formatter used when building S3 object keys.
# Raises Fluent::ConfigError on invalid settings.
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)
  super
  Aws.use_bundled_cert! if @use_bundled_cert

  if reject_s3_endpoint?
    raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  begin
    buffer_type = @buffer_config[:@type]
    # Look up the compressor registered for @store_as; fall back to plain
    # text output if the lookup or instantiation fails.
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log)
  rescue => e
    log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}"
    @compressor = TextCompressor.new
  end
  @compressor.configure(conf)

  @formatter = formatter_create

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  # %{index} is rendered with sprintf, so only %[0width]{d,x,X} is accepted.
  unless @index_format =~ /^%(0\d*)?[dxX]$/
    raise Fluent::ConfigError, "index_format parameter should follow `%[flags][width]type`. `0` is the only supported flag, and is mandatory if width is specified. `d`, `x` and `X` are supported types"
  end

  if @reduced_redundancy
    log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
    @storage_class = "REDUCED_REDUNDANCY"
  end

  @s3_object_key_format = process_s3_object_key_format
  if !@check_object
    # With check_object disabled there is no existence probe before PUT, so
    # key uniqueness is the user's responsibility; default to an hms-based
    # key format when none was given.
    if conf.has_key?('s3_object_key_format')
      log.warn "Set 'check_object false' and s3_object_key_format is specified. Check s3_object_key_format is unique in each write. If not, existing file will be overwritten."
    else
      log.warn "Set 'check_object false' and s3_object_key_format is not specified. Use '%{path}/%{time_slice}_%{hms_slice}.%{file_extension}' for s3_object_key_format"
      @s3_object_key_format = "%{path}/%{time_slice}_%{hms_slice}.%{file_extension}"
    end
  end

  check_s3_path_safety(conf)

  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
  @values_for_s3_object_chunk = {}
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone,
                                                   @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end
Calls superclass method
Source
# File lib/fluent/plugin/out_s3.rb, line 280
# Serializes one event with the configured formatter, after applying the
# inject section's values to the record.
def format(tag, time, record)
  enriched = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, enriched)
end
Source
# File lib/fluent/plugin/out_s3.rb, line 247 def multi_workers_ready? true end
Source
# File lib/fluent/plugin/out_s3.rb, line 188
# True when s3_endpoint points at genuine AWS S3 (which should be
# configured via s3_region instead). VPC endpoints and FIPS/GovCloud
# endpoints are allowed, as are non-AWS S3-compatible services.
def reject_s3_endpoint?
  endpoint = @s3_endpoint
  endpoint &&
    endpoint.end_with?('amazonaws.com') &&
    !endpoint.end_with?('vpce.amazonaws.com') &&
    ['fips', 'gov'].none? { |token| endpoint.include?(token) }
end
Source
# File lib/fluent/plugin/out_s3.rb, line 251 def start options = setup_credentials options[:region] = @s3_region if @s3_region options[:endpoint] = @s3_endpoint if @s3_endpoint options[:use_accelerate_endpoint] = @enable_transfer_acceleration options[:use_dualstack_endpoint] = @enable_dual_stack options[:http_proxy] = @proxy_uri if @proxy_uri options[:force_path_style] = @force_path_style options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil? options[:signature_version] = @signature_version unless @signature_version.nil? options[:ssl_verify_peer] = @ssl_verify_peer options[:ssl_ca_bundle] = @ssl_ca_bundle if @ssl_ca_bundle options[:ssl_ca_directory] = @ssl_ca_directory if @ssl_ca_directory log.on_trace do options[:http_wire_trace] = true options[:logger] = log end s3_client = Aws::S3::Client.new(options) @s3 = Aws::S3::Resource.new(client: s3_client) @bucket = @s3.bucket(@s3_bucket) check_apikeys if @check_apikey_on_start ensure_bucket if @check_bucket ensure_bucket_lifecycle super end
Calls superclass method
Source
# File lib/fluent/plugin/out_s3.rb, line 285
# Flushes one buffer chunk to S3: renders the object key from
# s3_object_key_format (probing for collisions when check_object is on),
# compresses the chunk into a tempfile, and uploads it with the
# configured PUT options.
def write(chunk)
  i = 0
  metadata = chunk.metadata
  previous_path = nil
  time_slice = if metadata.timekey.nil?
                 ''.freeze
               else
                 @time_slice_with_tz.call(metadata.timekey)
               end

  if @check_object
    # Probe S3 for an unused key: bump %{index} and retry while the
    # rendered key already exists in the bucket.
    begin
      # hex_random is derived from the chunk id once and reused across
      # retries of the same chunk.
      @values_for_s3_object_chunk[chunk.unique_id] ||= {
        "%{hex_random}" => hex_random(chunk),
      }
      # Pre-placeholders are substituted before extract_placeholders so
      # their values can themselves contain Fluentd placeholders.
      values_for_s3_object_key_pre = {
        "%{path}" => @path,
        "%{file_extension}" => @compressor.ext,
      }
      values_for_s3_object_key_post = {
        "%{time_slice}" => time_slice,
        "%{index}" => sprintf(@index_format,i),
      }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
      values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

      s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
        values_for_s3_object_key_pre.fetch(matched_key, matched_key)
      end
      s3path = extract_placeholders(s3path, chunk)
      # gsub with a Hash: unknown %{...} placeholders are replaced with "".
      s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
      if (i > 0) && (s3path == previous_path)
        # The key did not change between iterations, so retrying cannot
        # help; either overwrite deliberately or fail.
        if @overwrite
          log.warn "#{s3path} already exists, but will overwrite"
          break
        else
          raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
        end
      end

      i += 1
      previous_path = s3path
    end while @bucket.object(s3path).exists?
  else
    # check_object disabled: build the key once (no existence probe),
    # using an %{hms_slice} timestamp instead of %{index}.
    if @localtime
      hms_slicer = Time.now.strftime("%H%M%S")
    else
      hms_slicer = Time.now.utc.strftime("%H%M%S")
    end

    @values_for_s3_object_chunk[chunk.unique_id] ||= {
      "%{hex_random}" => hex_random(chunk),
    }
    values_for_s3_object_key_pre = {
      "%{path}" => @path,
      "%{file_extension}" => @compressor.ext,
    }
    values_for_s3_object_key_post = {
      "%{date_slice}" => time_slice, # For backward compatibility
      "%{time_slice}" => time_slice,
      "%{hms_slice}" => hms_slicer,
    }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
    values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

    s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
      values_for_s3_object_key_pre.fetch(matched_key, matched_key)
    end
    s3path = extract_placeholders(s3path, chunk)
    s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
  end

  tmp = Tempfile.new("s3-")
  tmp.binmode
  begin
    @compressor.compress(chunk, tmp)
    tmp.rewind
    log.debug "out_s3: write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.metadata} to s3://#{@s3_bucket}/#{s3path}"

    put_options = {
      body: tmp,
      content_type: @compressor.content_type,
      storage_class: @storage_class,
    }
    put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
    put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
    put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
    put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
    put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
    put_options[:acl] = @acl if @acl
    put_options[:grant_full_control] = @grant_full_control if @grant_full_control
    put_options[:grant_read] = @grant_read if @grant_read
    put_options[:grant_read_acp] = @grant_read_acp if @grant_read_acp
    put_options[:grant_write_acp] = @grant_write_acp if @grant_write_acp
    put_options[:checksum_algorithm] = @checksum_algorithm if @checksum_algorithm
    put_options[:tagging] = @tagging if @tagging

    if @s3_metadata
      put_options[:metadata] = {}
      @s3_metadata.each do |k, v|
        # i was incremented past the final index, so the last used index
        # is i - 1.
        put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
      end
    end
    @bucket.object(s3path).put(put_options)

    log.debug "out_s3: completed to write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.metadata} to s3://#{@s3_bucket}/#{s3path}"

    @values_for_s3_object_chunk.delete(chunk.unique_id)

    if @warn_for_delay
      if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
        log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
      end
    end
  ensure
    begin
      # true unlinks the tempfile as well as closing it.
      tmp.close(true)
    rescue => e
      log.info "out_s3: Tempfile#close caused unexpected error", error: e
    end
  end
end
Private Instance Methods
Source
# File lib/fluent/plugin/out_s3.rb, line 492
# Verifies the configured credentials/region can call the S3 API at all
# by issuing a minimal LIST request against the target path.
def check_apikeys
  begin
    @bucket.objects(prefix: @path, max_keys: 1).first
  rescue Aws::S3::Errors::NoSuchBucket
    # ignore NoSuchBucket Error because ensure_bucket checks it.
  rescue => e
    raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}"
  end
end
Source
# File lib/fluent/plugin/out_s3.rb, line 481
# Warns about s3_object_key_format settings that can collide when flushes
# run in parallel (multiple flush threads or multiple workers).
def check_s3_path_safety(conf)
  log.warn "The default value of s3_object_key_format will use ${chunk_id} instead of %{index} to avoid object conflict in v2" unless conf.has_key?('s3_object_key_format')

  parallel = @buffer_config.flush_thread_count > 1 || system_config.workers > 1
  return unless parallel

  uniqueness_keys = ['${chunk_id}', '%{uuid_flush}', '%{hex_random}']
  if uniqueness_keys.none? { |key| @s3_object_key_format.include?(key) }
    log.warn "No ${chunk_id}, %{uuid_flush} or %{hex_random} in s3_object_key_format with multiple flush threads or multiple workers. Recommend to set ${chunk_id}, %{uuid_flush} or %{hex_random} to avoid data lost by object conflict"
  end
end
Source
# File lib/fluent/plugin/out_s3.rb, line 429
# Confirms the destination bucket exists, creating it when
# auto_create_bucket is enabled; otherwise fails fast.
def ensure_bucket
  return if @bucket.exists?

  unless @auto_create_bucket
    raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
  end
  log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
  @s3.create_bucket(bucket: @s3_bucket)
end
Source
# File lib/fluent/plugin/out_s3.rb, line 440
# Syncs the configured lifecycle rules to the bucket, issuing a PUT only
# when the bucket's current rules differ from the desired set.
def ensure_bucket_lifecycle
  return if @bucket_lifecycle_rules.empty?

  current_rules = get_bucket_lifecycle_rules
  desired_rules = @bucket_lifecycle_rules.sort_by(&:id).map { |rule|
    { id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
  }
  return if current_rules == desired_rules

  log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
  @bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: desired_rules } })
end
Source
# File lib/fluent/plugin/out_s3.rb, line 454
# Fetches the bucket's current lifecycle rules in a normalized, sorted
# form for comparison; a bucket with no lifecycle configuration yields [].
def get_bucket_lifecycle_rules
  @bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map { |rule|
    { id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] }
  }
rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
  []
end
Source
# File lib/fluent/plugin/out_s3.rb, line 408
# Derives a per-chunk hex string of @hex_random_length characters for the
# %{hex_random} placeholder. The chunk id hex encodes
# (time_sec, time_usec, rand), so reversing it puts the most variable
# digits first.
def hex_random(chunk)
  Fluent::UniqueId.hex(chunk.unique_id).reverse[0, @hex_random_length]
end
Source
# File lib/fluent/plugin/out_s3.rb, line 464
# Validates s3_object_key_format, enables %{uuid_flush} handling when the
# placeholder is present, and expands the deprecated %{hostname}
# placeholder. Returns the (possibly rewritten) key format string.
def process_s3_object_key_format
  # These placeholders were removed from the plugin; fail configuration
  # early if any of them is still used.
  removed_placeholders = ['%{uuid}', '%{uuid:random}', '%{uuid:hostname}', '%{uuid:timestamp}']
  removed_placeholders.each do |ph|
    next unless @s3_object_key_format.include?(ph)
    raise Fluent::ConfigError, "#{ph} placeholder in s3_object_key_format is removed"
  end

  @uuid_flush_enabled = true if @s3_object_key_format.include?('%{uuid_flush}')

  @s3_object_key_format.gsub('%{hostname}') do |expr|
    log.warn "%{hostname} will be removed in the future. Use \"\#{Socket.gethostname}\" instead"
    Socket.gethostname
  end
end
Source
# File lib/fluent/plugin/out_s3.rb, line 500
# Resolves which AWS credential provider to use, in priority order:
# assume_role > static access keys > web identity > instance profile >
# shared credentials file > deprecated aws_iam_retries > SDK default
# chain. Returns an options hash suitable for Aws::S3::Client.new.
def setup_credentials
  options = {}
  credentials_options = {}
  case
  when @assume_role_credentials
    c = @assume_role_credentials
    # When static keys are also configured, they become the base identity
    # for the STS AssumeRole call.
    iam_user_credentials = @aws_key_id && @aws_sec_key ? Aws::Credentials.new(@aws_key_id, @aws_sec_key) : nil
    region = c.sts_region || @s3_region
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    credentials_options[:external_id] = c.external_id if c.external_id
    credentials_options[:sts_endpoint_url] = c.sts_endpoint_url if c.sts_endpoint_url
    credentials_options[:sts_http_proxy] = c.sts_http_proxy if c.sts_http_proxy
    # Build the STS client with whichever combination of proxy/endpoint
    # is configured.
    if c.sts_http_proxy && c.sts_endpoint_url
      credentials_options[:client] = if iam_user_credentials
                                       Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url, credentials: iam_user_credentials)
                                     else
                                       Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url)
                                     end
    elsif c.sts_http_proxy
      credentials_options[:client] = if iam_user_credentials
                                       Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy, credentials: iam_user_credentials)
                                     else
                                       Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy)
                                     end
    elsif c.sts_endpoint_url
      credentials_options[:client] = if iam_user_credentials
                                       Aws::STS::Client.new(region: region, endpoint: c.sts_endpoint_url, credentials: iam_user_credentials)
                                     else
                                       Aws::STS::Client.new(region: region, endpoint: c.sts_endpoint_url)
                                     end
    else
      credentials_options[:client] = if iam_user_credentials
                                       Aws::STS::Client.new(region: region, credentials: iam_user_credentials)
                                     else
                                       Aws::STS::Client.new(region: region)
                                     end
    end
    options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
  when @aws_key_id && @aws_sec_key
    # Plain static credentials.
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  when @web_identity_credentials
    c = @web_identity_credentials
    region = c.sts_region || @s3_region
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:web_identity_token_file] = c.web_identity_token_file
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    credentials_options[:sts_endpoint_url] = c.sts_endpoint_url if c.sts_endpoint_url
    credentials_options[:sts_http_proxy] = c.sts_http_proxy if c.sts_http_proxy
    if c.sts_http_proxy && c.sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url)
    elsif c.sts_http_proxy
      credentials_options[:client] = Aws::STS::Client.new(region: region, http_proxy: c.sts_http_proxy)
    elsif c.sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(region: region, endpoint: c.sts_endpoint_url)
    else
      credentials_options[:client] = Aws::STS::Client.new(region: region)
    end
    options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(credentials_options)
  when @instance_profile_credentials
    c = @instance_profile_credentials
    credentials_options[:retries] = c.retries if c.retries
    credentials_options[:ip_address] = c.ip_address if c.ip_address
    credentials_options[:port] = c.port if c.port
    credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout
    credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout
    # ECS task roles expose credentials via this env var instead of the
    # EC2 instance metadata endpoint.
    if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
      options[:credentials] = Aws::ECSCredentials.new(credentials_options)
    else
      options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
    end
  when @shared_credentials
    c = @shared_credentials
    credentials_options[:path] = c.path if c.path
    credentials_options[:profile_name] = c.profile_name if c.profile_name
    options[:credentials] = Aws::SharedCredentials.new(credentials_options)
  when @aws_iam_retries
    log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
    credentials_options[:retries] = @aws_iam_retries
    if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
      options[:credentials] = Aws::ECSCredentials.new(credentials_options)
    else
      options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
    end
  else
    # Use default credentials
    # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html
  end
  options
end
Source
# File lib/fluent/plugin/out_s3.rb, line 419
# Maps a buffer timekey (seconds) to a strftime pattern with just enough
# resolution to keep time slices distinct; nil timekey yields an empty
# pattern.
def timekey_to_timeformat(timekey)
  return '' if timekey.nil?

  if (0...60).cover?(timekey)       # 60 exclusive
    '%Y%m%d%H%M%S'
  elsif (60...3600).cover?(timekey)
    '%Y%m%d%H%M'
  elsif (3600...86400).cover?(timekey)
    '%Y%m%d%H'
  else
    '%Y%m%d'
  end
end
This implementation is borrowed from Fluentd core.
Source
# File lib/fluent/plugin/out_s3.rb, line 414 def uuid_random SecureRandom.uuid end