class S3::Client::API::Storage::MultipartUpload

Public Class Methods

new(bucket, object, options = {}, &block) click to toggle source
# File lib/s3/client/api/storage.rb, line 271
# Prepares a multipart upload of +object+ into +bucket+.
#
# bucket  - bucket identifier; stored and also merged into the request options.
# object  - object name; also passed to MIME::Types.type_for to pick the
#           content type ('application/octet-stream' when nothing matches).
# options - Hash of request options. :splitsz (bytes per part, default 100MB)
#           and :jobs (number of upload threads, default 1) are extracted;
#           everything else is kept as request options. The caller's Hash is
#           not modified.
# block   - called once without arguments; its result is stored as the API
#           object used to execute requests.
def initialize(bucket, object, options = {}, &block)
  detected = MIME::Types.type_for(object).first
  settings = options.merge(
    bucket: bucket,
    content_type: detected ? detected.to_s : 'application/octet-stream'
  )

  @bucket  = bucket
  @object  = object
  @splitsz = settings.delete(:splitsz) || 100 * 1024**2 # 100MB
  @jobs    = settings.delete(:jobs) || 1
  @options = settings
  @api     = block[]
end

Public Instance Methods

abort_multipart_upload(upload_id) click to toggle source
# File lib/s3/client/api/storage.rb, line 314
# Issues a :delete request against the object's "?uploadId=<upload_id>"
# resource, using the stored request options.
#
# upload_id - identifier of the multipart upload to abort.
#
# Returns whatever the API object's execute_storage returns.
def abort_multipart_upload(upload_id)
  request = RestParameter.new(:delete, "/#{@object}?uploadId=#{upload_id}", @options)
  @api.execute_storage(request)
end
complete_multipart_upload(upload_id, upload_objects) click to toggle source
# File lib/s3/client/api/storage.rb, line 298
# Finishes a multipart upload by posting the list of uploaded parts.
#
# upload_id      - identifier of the multipart upload.
# upload_objects - pairs of part number => ETag (e.g. the Hash returned by
#                  upload_part); emitted in iteration order.
#
# The XML payload is handed to the API object through the block given to
# execute_storage. Prints a confirmation line to standard output.
#
# Returns nil.
def complete_multipart_upload(upload_id, upload_objects)
  request = RestParameter.new(:post, "/#{@object}?uploadId=#{upload_id}", @options)

  parts_xml = upload_objects.map do |part, etag|
    "<Part><PartNumber>#{part}</PartNumber><ETag>#{etag}</ETag></Part>"
  end.join
  body = "<CompleteMultipartUpload>#{parts_xml}</CompleteMultipartUpload>"

  @api.execute_storage(request) { body }

  puts "complete multipart upload."
end
initiate_multipart_upload() click to toggle source
# File lib/s3/client/api/storage.rb, line 284
# Starts a multipart upload by posting to the object's "?uploads" resource.
# Logs the configured job count and split size to STDERR first.
#
# Returns the text of the 'InitiateMultipartUploadResult/UploadId' element
# of the response.
def initiate_multipart_upload
  STDERR.puts "Initiate multipart upload...\njobs:#{@jobs}, splitsz:#{@splitsz}"
  request = RestParameter.new(:post, "/#{@object}?uploads", @options)
  @api.execute_storage(request).elements['InitiateMultipartUploadResult/UploadId'].text
end
upload_part(upload_id, &block) click to toggle source
# File lib/s3/client/api/storage.rb, line 292
# Uploads all parts of the stream supplied by +block+ for +upload_id+.
#
# upload_id - identifier of the multipart upload.
# block     - forwarded unchanged to split_stream.
#
# Returns a Hash of the entries collected by split_stream, sorted.
def upload_part(upload_id, &block)
  collected = {}
  split_stream(upload_id, collected, &block)
  collected.sort.to_h
end

Private Instance Methods

split_stream(upload_id, upload_objects, &block) click to toggle source
# File lib/s3/client/api/storage.rb, line 321
# Reads the input stream in chunks of @splitsz bytes and uploads each chunk
# as one numbered part of the multipart upload, using @jobs worker threads
# fed through a bounded queue.
#
# upload_id      - identifier interpolated into every part request.
# upload_objects - Hash filled in place with part number => first 'ETag'
#                  response header for each part uploaded successfully.
# block          - called once without arguments; must return an object
#                  responding to read(length), the data source.
#
# Returns nil.
# Raises RuntimeError when @splitsz is below the 5MB lower limit. When an
# upload raises, the first such error is re-raised after all workers have
# stopped; no further data is read from the stream once an error occurred.
def split_stream(upload_id, upload_objects, &block)
  limit = 5 * 1024**2 # 5MB
  raise "split size is invalid. below lower limit of #{limit} byte" if @splitsz < limit

  ifp = block[]

  q = SizedQueue.new(@jobs)
  errors = Queue.new # thread-safe record of worker failures

  workers = Array.new(@jobs) do
    Thread.new do
      while (data = q.pop)
        # After a failure keep popping without uploading, so the producer
        # can never block forever on a full queue.
        next unless errors.empty?

        part_number, chunk = data
        begin
          puts "> starting upload part #{part_number}, #{chunk.length}"
          resource = "/#{@object}?partNumber=#{part_number}&uploadId=#{upload_id}"
          response = @api.execute_storage(RestParameter.new(:put, resource, @options)) { chunk }
          puts "< finished upload part #{part_number}, #{chunk.length}"
          upload_objects[part_number] = response.headers['ETag'].first
        rescue StandardError => e
          errors << e
        end
      end
      # Pass the end-of-stream marker on so the next worker stops as well.
      q.push nil
    end
  end

  file_index = 1
  begin
    while errors.empty?
      buffer = ifp.read(@splitsz)
      break unless buffer

      buffer.force_encoding('ASCII-8BIT')
      q.push [file_index, buffer]
      file_index += 1
    end
  ensure
    # Always stop and wait for the workers, even when reading the stream raised.
    q.push nil
    workers.each(&:join)
  end

  raise errors.pop unless errors.empty?

  puts "finished upload #{file_index - 1} part objects."
end