class S3::Client::API::Storage::Import
Public Class Methods
new(db_name, tbl_name, file_paths, options = {}, &block)
click to toggle source
# File lib/s3/client/api/storage.rb, line 123
#
# Sets up an import of one or more files into a table.
#
# @param db_name [String] database name; kept in @db_name and copied to
#   the shared ImportParameter instance
# @param tbl_name [String] table name; kept in @tbl_name and copied to
#   the shared ImportParameter instance
# @param file_paths [String, Array<String>] a single path or a list of paths
# @param options [Hash] recognised keys, each removed from the hash as it is read:
#   :jobs    - number of upload worker threads (default 1)
#   :label   - object label (default 'label')
#   :splitsz - chunk size in bytes (default 100 * 1024**2)
# @yield called once with no arguments; its return value is stored as @api
# @raise [S3::Client::ParameterInvalid] when the label starts with '_' or '.'
def initialize(db_name, tbl_name, file_paths, options = {}, &block)
  @db_name = db_name
  @tbl_name = tbl_name
  @file_paths = file_paths
  @jobs = options.delete(:jobs) || 1
  @label = options.delete(:label) || 'label'
  @splitsz = options.delete(:splitsz) || 100 * 1024**2 # 100M

  # Reject a bad label before building the API client or touching the shared
  # ImportParameter, so a failed construction leaves no state behind.
  if %w[_ .].include?(@label[0])
    raise S3::Client::ParameterInvalid, "label should not start with '_' or '.'"
  end

  @api = block[]

  import_parameter = ImportParameter.instance
  import_parameter.db_name = db_name
  import_parameter.tbl_name = tbl_name
  import_parameter.label = @label

  STDERR.puts "Initialize...\njobs: #{@jobs}, splitsz: #{@splitsz}"
end
Public Instance Methods
calc_label_suffix()
click to toggle source
# File lib/s3/client/api/storage.rb, line 144
#
# Works out the next free numeric suffix for objects carrying this label.
#
# Asks the API for the objects of @db_name under ImportParameter's storage
# prefix, collects every number that follows "<label>_" in the returned
# entries and answers the highest one plus one.
#
# @return [Integer] 0 when nothing is listed or no entry carries the label,
#   otherwise highest existing suffix + 1
def calc_label_suffix
  prefix = ImportParameter.instance.storage_prefix
  xml_doc = @api.objects(@db_name, prefix: prefix)
  objects = S3::Concerns::ObjectsResult.new(xml_doc).objects
  return 0 if objects.nil? || objects.empty?

  # The label is literal text: escape it so characters such as '.' or '+'
  # cannot act as regex operators and match unrelated objects.
  pattern = /#{Regexp.escape(@label.to_s)}_(\d+)/
  suffixes = objects.map { |o| o.scan(pattern) }.flatten.map(&:to_i)

  # No entry with this label yet: start at 0 instead of answering nil.
  suffixes.empty? ? 0 : suffixes.max + 1
end
execute(suffix)
click to toggle source
# File lib/s3/client/api/storage.rb, line 155
#
# Imports every configured file and collects what the import helpers
# append to the shared result list.
#
# A path ending in '.gz' goes to import_gz_file, the path "-" reads from
# $stdin, anything else goes to import_text_file. Each helper returns a
# count that is added to the running suffix before the next file.
#
# @param suffix [Integer] suffix to start from for the first file
# @return [Array] the list filled in by the import helpers
def execute(suffix)
  paths =
    if @file_paths.is_a?(String)
      [@file_paths]
    else
      @file_paths
    end

  uploaded = []
  next_suffix = suffix

  paths.each do |path|
    consumed =
      case
      when path.end_with?('.gz') then import_gz_file(path, next_suffix, uploaded)
      when path == "-" then import_stream($stdin, next_suffix, uploaded)
      else import_text_file(path, next_suffix, uploaded)
      end
    next_suffix += consumed
  end

  uploaded
end
execute_storage_detail(data, suffix)
click to toggle source
# File lib/s3/client/api/storage.rb, line 225
#
# Gzips one chunk in memory and sends it through @api.execute_storage as a
# :put request against the URL ImportParameter builds for the given suffix.
#
# @param data [String] raw chunk to compress and upload
# @param suffix [Integer] suffix passed to ImportParameter#url
# @return whatever @api.execute_storage returns
def execute_storage_detail(data, suffix)
  compressed = StringIO.new
  # wrap closes the writer when the block ends, which flushes the gzip trailer.
  Zlib::GzipWriter.wrap(compressed) { |gz| gz.write data }

  request_options = {
    content_type: 'application/x-gzip',
    bucket: @db_name,
    import: true
  }
  target = ImportParameter.instance.url(suffix)
  request = RestParameter.new(:put, target, request_options)

  @api.execute_storage(request) { compressed.string }
end
import_gz_file(file_path, suffix, upload_objects)
click to toggle source
# File lib/s3/client/api/storage.rb, line 174
#
# Imports a file that is expected to be gzip-compressed.
#
# If the file cannot be opened as gzip it is imported as plain text instead.
# Only the open step is guarded: a Zlib error raised later, while the stream
# is already being read and uploaded, propagates to the caller rather than
# triggering a second import of the raw compressed bytes.
#
# @param file_path [String] path of the file to import
# @param suffix [Integer] suffix forwarded to the import helper
# @param upload_objects [Array] result list forwarded to the import helper
# @return the value returned by import_stream or import_text_file
def import_gz_file(file_path, suffix, upload_objects)
  reader =
    begin
      Zlib::GzipReader.open(file_path)
    rescue Zlib::Error # if not gzip
      nil
    end
  return import_text_file(file_path, suffix, upload_objects) unless reader

  import_stream(reader, suffix, upload_objects)
end
import_stream(ifp, suffix, upload_objects)
click to toggle source
# File lib/s3/client/api/storage.rb, line 185
#
# Splits a stream into chunks and uploads them with @jobs worker threads.
#
# Each chunk is @splitsz bytes extended to the end of the current line, so
# chunks stop at a line break whenever the input has one. Workers call
# execute_storage_detail(chunk, suffix + n) for the n-th chunk and then
# append ImportParameter's object label for that suffix to upload_objects.
#
# The stream is closed on every exit path. If reading fails, the stop signal
# is still queued so the workers terminate instead of waiting forever; the
# error is re-raised without waiting for them, so chunks already queued may
# still finish uploading. An exception raised inside a worker surfaces when
# the workers are joined.
#
# @param ifp [#read, #gets, #close] input stream
# @param suffix [Integer] suffix of the first chunk
# @param upload_objects [Array] receives one label per uploaded chunk
# @return [Integer] number of chunks read from the stream
def import_stream(ifp, suffix, upload_objects)
  q = SizedQueue.new(@jobs)
  workers = Array.new(@jobs) do
    Thread.new do
      # Items are [chunk_index, chunk, part_number]; nil is the stop signal.
      while (data = q.pop)
        chunk_index, chunk, part_number = data
        STDERR.puts "> starting upload part #{part_number}, #{chunk.length}"
        execute_storage_detail(chunk, suffix + chunk_index)
        STDERR.puts "< finished upload part #{part_number}, #{chunk.length}"
        upload_objects << ImportParameter.instance.object_label(suffix + chunk_index)
      end
      q.push nil # pass the stop signal on to the next worker
    end
  end

  file_index = 0
  begin
    import_index = ImportParameter.instance.index
    while (buffer = ifp.read(@splitsz))
      buffer.force_encoding("ASCII-8BIT")
      # Finish the line the fixed-size read stopped in.
      nline = ifp.gets
      buffer.concat(nline.force_encoding("ASCII-8BIT")) if nline
      q.push [file_index, buffer, import_index]
      file_index += 1
      import_index += 1
    end
  ensure
    # Always release the workers, even when reading raised.
    q.push nil
  end

  workers.each(&:join)
  file_index
ensure
  ifp.close
end
import_text_file(file_path, suffix, upload_objects)
click to toggle source
# File lib/s3/client/api/storage.rb, line 181
#
# Imports an uncompressed file by handing an open handle to import_stream.
#
# The file is opened in binary mode because import_stream treats everything
# it reads as ASCII-8BIT bytes; this keeps the uploaded content byte-exact on
# platforms whose text mode translates line endings.
#
# @param file_path [String] path of the file to import
# @param suffix [Integer] suffix forwarded to import_stream
# @param upload_objects [Array] result list forwarded to import_stream
# @return the value returned by import_stream
def import_text_file(file_path, suffix, upload_objects)
  import_stream(File.open(file_path, 'rb'), suffix, upload_objects)
end