class S3::Client::API::Storage::Import

Public Class Methods

new(db_name, tbl_name, file_paths, options = {}, &block)
# File lib/s3/client/api/storage.rb, line 123
# Prepares an import of +file_paths+ into table +tbl_name+ of database +db_name+.
#
# @param db_name [String] target database; also used as the upload bucket name
# @param tbl_name [String] target table name
# @param file_paths [String, Array<String>] file(s) to import; "-" means $stdin
# @param options [Hash] :jobs (upload threads, default 1), :label (object label,
#   default 'label'), :splitsz (chunk size in bytes, default 100 MiB).
#   The hash is only read; the caller's hash is not modified.
# @yieldreturn the API client (must respond to #objects and #execute_storage)
# @raise [S3::Client::ParameterInvalid] if the label starts with '_' or '.'
def initialize(db_name, tbl_name, file_paths, options = {}, &block)
  @db_name = db_name
  @tbl_name = tbl_name
  @file_paths = file_paths
  # Read (not delete) the options so the caller's hash is left intact.
  @jobs = options[:jobs] || 1
  @label = options[:label] || 'label'
  @splitsz = options[:splitsz] || 100 * 1024 ** 2 # 100 MiB

  # Validate before building the API client or touching the shared
  # ImportParameter singleton, so a rejected label leaves no partial state.
  if @label.to_s.start_with?('_', '.')
    raise S3::Client::ParameterInvalid, "label should not start with '_' or '.'"
  end

  @api = block[]

  import_parameter = ImportParameter.instance
  import_parameter.db_name = db_name
  import_parameter.tbl_name = tbl_name
  import_parameter.label = @label

  STDERR.puts "Initialize...\njobs: #{@jobs}, splitsz: #{@splitsz}"
end

Public Instance Methods

calc_label_suffix()
# File lib/s3/client/api/storage.rb, line 144
# Returns the next free numeric suffix for this import's label.
#
# It lists the objects under the import's storage prefix and finds every key
# containing "<label>_N". It returns max(N) + 1, or 0 when there are no objects
# or no key matches.
#
# @return [Integer]
def calc_label_suffix
  prefix = ImportParameter.instance.storage_prefix
  xml_doc = @api.objects(@db_name, prefix: prefix)
  objects = S3::Concerns::ObjectsResult.new(xml_doc).objects
  return 0 if objects.nil? || objects.empty?

  # The label is user-supplied; escape it so characters like '.' or '+'
  # are matched literally rather than as regexp metacharacters.
  pattern = /#{Regexp.escape(@label.to_s)}_(\d+)/
  highest = objects.flat_map { |key| key.scan(pattern).flatten }.map(&:to_i).max
  # No matching key must yield 0 (not nil) so callers can add to the result.
  highest ? highest + 1 : 0
end
execute(suffix)
# File lib/s3/client/api/storage.rb, line 155
# Imports every configured path and returns the labels collected for the
# uploaded parts.
#
# Each path is dispatched by name: "*.gz" as gzip, "-" as standard input,
# anything else as plain text. +suffix+ is the starting part number. After
# each path it advances by that importer's return value, so the next path
# continues numbering where the previous one stopped.
#
# @param suffix [Integer] first part number to use
# @return [Array] labels appended by the importers, in the order they appended them
def execute(suffix)
  paths =
    case @file_paths
    when String then [@file_paths]
    else @file_paths
    end

  paths.each_with_object([]) do |path, uploaded|
    parts =
      if path.end_with?('.gz')
        import_gz_file(path, suffix, uploaded)
      elsif path == "-"
        import_stream($stdin, suffix, uploaded)
      else
        import_text_file(path, suffix, uploaded)
      end
    suffix += parts
  end
end
execute_storage_detail(data, suffix)
# File lib/s3/client/api/storage.rb, line 225
# Gzips +data+ in memory and sends it with a :put RestParameter.
#
# The target resource is ImportParameter.instance.url(suffix), and the bucket
# is the database name.
#
# @param data [String] raw chunk to compress
# @param suffix [Integer] part number used to build the resource URL
# @return whatever @api.execute_storage returns
def execute_storage_detail(data, suffix)
  sink = StringIO.new
  gzip = Zlib::GzipWriter.new(sink)
  gzip.write(data)
  gzip.close # writes the gzip trailer; sink.string stays readable afterwards

  request_options = {
    content_type: 'application/x-gzip',
    bucket: @db_name,
    import: true
  }
  request = RestParameter.new(:put, ImportParameter.instance.url(suffix), request_options)
  @api.execute_storage(request) { sink.string }
end
import_gz_file(file_path, suffix, upload_objects)
# File lib/s3/client/api/storage.rb, line 174
def import_gz_file(file_path, suffix, upload_objects)
  import_stream(Zlib::GzipReader.open(file_path), suffix, upload_objects)
rescue Zlib::Error
  #if not gzip
  import_text_file(file_path, suffix, upload_objects)
end
import_stream(ifp, suffix, upload_objects)
# File lib/s3/client/api/storage.rb, line 185
# Splits +ifp+ into chunks and uploads them with @jobs worker threads.
#
# Each chunk is about @splitsz bytes, extended to the end of the current
# line. Part i is uploaded under suffix + i, and its object label is appended
# to +upload_objects+. Labels are appended in completion order, which may
# differ from part order when @jobs > 1.
#
# +ifp+ is always closed before returning, including on error. If an upload
# fails, the worker closes the queue so the reader stops pushing instead of
# blocking forever on a full queue. The worker's exception is then re-raised
# from join.
#
# @return [Integer] the number of parts read from +ifp+
def import_stream(ifp, suffix, upload_objects)
  queue = SizedQueue.new(@jobs)
  labels_lock = Mutex.new

  workers = Array.new(@jobs) do
    Thread.new do
      begin
        # pop returns nil once the queue has been closed and drained.
        while (part = queue.pop)
          part_index, buffer, import_index = part
          STDERR.puts "> starting upload part #{import_index}, #{buffer.length}"
          execute_storage_detail(buffer, suffix + part_index)
          STDERR.puts "< finished upload part #{import_index}, #{buffer.length}"
          label = ImportParameter.instance.object_label(suffix + part_index)
          # Several workers append concurrently.
          labels_lock.synchronize { upload_objects << label }
        end
      rescue StandardError
        queue.close # wakes the reader if it is blocked in push
        raise
      end
    end
  end

  file_index = 0
  begin
    import_index = ImportParameter.instance.index
    while (buffer = ifp.read(@splitsz))
      buffer.force_encoding("ASCII-8BIT")
      # Extend the chunk to the end of the current line.
      tail = ifp.gets
      buffer.concat(tail.force_encoding("ASCII-8BIT")) if tail
      queue.push [file_index, buffer, import_index]
      file_index += 1
      import_index += 1
    end
  rescue ClosedQueueError
    # A worker failed and closed the queue; its error is re-raised by join below.
  ensure
    queue.close # lets idle workers exit once remaining parts are drained
    begin
      workers.each(&:join)
    ensure
      ifp.close
    end
  end

  file_index
end
import_text_file(file_path, suffix, upload_objects)
# File lib/s3/client/api/storage.rb, line 181
# Imports a plain text file by streaming it through import_stream.
#
# The file is opened in block form, so it is closed even if import_stream
# raises before reaching its own close.
#
# @return [Integer] the value returned by import_stream
def import_text_file(file_path, suffix, upload_objects)
  File.open(file_path) do |io|
    import_stream(io, suffix, upload_objects)
  end
end