class Fluent::Plugin::S3Output::ParquetCompressor
Public Instance Methods
compress(chunk, tmp)
click to toggle source
# File lib/fluent/plugin/s3_compressor_parquet.rb, line 47
# Runs the columnify command over the contents of +chunk+, passing
# +tmp.path+ as the destination path.
#
# When @buffer_type is "file" the chunk's own path is handed to columnify
# directly. Otherwise the chunk is first copied into a temporary file, and
# that temporary file is closed and unlinked again before this method returns
# (whether or not columnify succeeded).
#
# chunk - buffer chunk; must respond to #path when @buffer_type is "file",
#         or to #write_to(io) otherwise.
# tmp   - object responding to #path, used as columnify's destination.
#
# Returns nil on success.
# Raises Fluent::UnrecoverableError when the status returned by columnify is
# not successful; the message carries the captured stdout, stderr and status.
def compress(chunk, tmp)
  chunk_is_file = @buffer_type == "file"

  if chunk_is_file
    src_path = chunk.path
  else
    tempfile = Tempfile.new("chunk-parquet-tmp")
    tempfile.binmode
    chunk.write_to(tempfile)
    # Close the handle before handing the path over to columnify.
    tempfile.close
    src_path = tempfile.path
  end

  stdout, stderr, status = columnify(src_path, tmp.path)
  return if status.success?

  raise Fluent::UnrecoverableError, "failed to execute columnify command. stdout=#{stdout} stderr=#{stderr} status=#{status.inspect}"
ensure
  # Best-effort removal of the temporary copy: any StandardError raised while
  # cleaning up (including the tempfile never having been created) is ignored.
  unless chunk_is_file
    begin
      tempfile.close(true)
    rescue StandardError
      nil
    end
  end
end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::S3Output::Compressor#configure
# File lib/fluent/plugin/s3_compressor_parquet.rb, line 23
# Validates the Parquet-related settings and caches the values that are later
# passed to the columnify command.
#
# After delegating to the superclass implementation, this method:
# * calls check_command("columnify", "-h");
# * rejects the :lzo, :brotli and :lz4 codecs with Fluent::ConfigError;
# * stores the codec name, upper-cased, in @parquet_compression_codec;
# * stores the record type in @record_type, with :json mapped to :jsonl.
#
# conf - configuration object, forwarded unchanged to the superclass.
#
# Returns the value assigned to @record_type.
# Raises Fluent::ConfigError for an unsupported compression codec.
def configure(conf)
  super
  check_command("columnify", "-h")

  codec = @compress.parquet_compression_codec
  raise Fluent::ConfigError, "unsupported compression codec: #{codec}" if %i[lzo brotli lz4].include?(codec)

  @parquet_compression_codec = codec.to_s.upcase

  # columnify is given "jsonl" whenever the configured record type is :json.
  record_type = @compress.record_type
  @record_type = record_type == :json ? :jsonl : record_type
end
content_type()
click to toggle source
# File lib/fluent/plugin/s3_compressor_parquet.rb, line 43
# Content type reported for this compressor's output.
#
# Returns the frozen String "application/octet-stream".
def content_type
  'application/octet-stream'.freeze
end
ext()
click to toggle source
# File lib/fluent/plugin/s3_compressor_parquet.rb, line 39
# File extension reported for this compressor's output.
#
# Returns the frozen String "parquet" (no leading dot).
def ext
  'parquet'.freeze
end
Private Instance Methods
columnify(src_path, dst_path)
click to toggle source
# File lib/fluent/plugin/s3_compressor_parquet.rb, line 70
# Invokes the external "columnify" executable via Open3.capture3.
#
# The command line is assembled from @parquet_compression_codec, @record_type
# and the page size, row group size, schema type and schema file read from
# @compress. Arguments are passed as separate argv entries, not as one
# shell string.
#
# src_path - path of the input file, passed as the final positional argument.
# dst_path - path passed to columnify's "-output" option.
#
# Returns whatever Open3.capture3 returns (stdout, stderr and process status).
def columnify(src_path, dst_path)
  argv = [
    "columnify",
    "-parquetCompressionCodec", @parquet_compression_codec,
    "-parquetPageSize", @compress.parquet_page_size.to_s,
    "-parquetRowGroupSize", @compress.parquet_row_group_size.to_s,
    "-recordType", @record_type.to_s,
    "-schemaType", @compress.schema_type.to_s,
    "-schemaFile", @compress.schema_file,
    "-output", dst_path,
    src_path
  ]

  Open3.capture3(*argv)
end