class BigShift::CloudStorageTransfer
Constants
- DEFAULT_POLL_INTERVAL
Public Class Methods
new(storage_transfer_service, project_id, aws_credentials, options={})
click to toggle source
# File lib/bigshift/cloud_storage_transfer.rb, line 3
#
# Captures the collaborators used to create and monitor Storage Transfer jobs.
#
# @param storage_transfer_service client used to create jobs and list their operations
# @param project_id [String] Google Cloud project that owns the transfer job
# @param aws_credentials object responding to access_key_id / secret_access_key
# @param options [Hash] optional overrides (any falsy value falls back to the default):
#   :clock  - object responding to #now (default: Time)
#   :thread - object responding to #sleep (default: Kernel)
#   :logger - logger with #info / #debug (default: NullLogger::INSTANCE)
def initialize(storage_transfer_service, project_id, aws_credentials, options={})
  @storage_transfer_service = storage_transfer_service
  @project_id = project_id
  @aws_credentials = aws_credentials

  clock, thread, logger = options.values_at(:clock, :thread, :logger)
  @clock = clock || Time
  @thread = thread || Kernel
  @logger = logger || NullLogger::INSTANCE
end
Public Instance Methods
copy_to_cloud_storage(unload_manifest, cloud_storage_bucket, options={})
click to toggle source
# File lib/bigshift/cloud_storage_transfer.rb, line 12
#
# Copies the objects described by +unload_manifest+ from S3 into
# +cloud_storage_bucket+ via a Storage Transfer job. It waits for the job
# to finish and then validates the result.
#
# @param unload_manifest reads count, total_file_size, bucket_name and prefix
# @param cloud_storage_bucket [String] destination bucket name
# @param options [Hash]
#   :poll_interval   - seconds between status polls (default: DEFAULT_POLL_INTERVAL)
#   :description     - description attached to the transfer job
#   :allow_overwrite - truthy to overwrite objects already in the bucket
# @return [nil]
def copy_to_cloud_storage(unload_manifest, cloud_storage_bucket, options={})
  interval = options[:poll_interval] || DEFAULT_POLL_INTERVAL

  job_spec = create_transfer_job(
    unload_manifest,
    cloud_storage_bucket,
    options[:description],
    options[:allow_overwrite]
  )
  submitted_job = @storage_transfer_service.create_transfer_job(job_spec)

  template = 'Transferring %d objects (%.2f GiB) from s3://%s/%s to gs://%s/%s'
  @logger.info(sprintf(
    template,
    unload_manifest.count,
    unload_manifest.total_file_size.to_f / 2**30,
    unload_manifest.bucket_name,
    unload_manifest.prefix,
    cloud_storage_bucket,
    unload_manifest.prefix
  ))

  await_completion(submitted_job, interval)
  validate_transfer(unload_manifest, cloud_storage_bucket)
  nil
end
Private Instance Methods
await_completion(transfer_job, poll_interval)
click to toggle source
# File lib/bigshift/cloud_storage_transfer.rb, line 59
#
# Polls the Storage Transfer service until the first operation listed for
# +transfer_job+ reports done?, then passes it to handle_completion.
#
# A poll that raises Google::Apis::ServerError is retried after sleeping
# +poll_interval+. The fifth server error within one poll cycle aborts with
# a RuntimeError. The error counter starts over on every new poll cycle, and
# no overall deadline is enforced here.
#
# @param transfer_job the created job; its name filters the operation listing,
#   and its name/description appear in log messages
# @param poll_interval value passed to @thread.sleep between polls
# @return [nil]
# @raise [RuntimeError] after five server errors within a single poll cycle
def await_completion(transfer_job, poll_interval)
  announced_start = false
  loop do
    current_op = nil
    server_errors = 0
    begin
      query = JSON.dump({project_id: @project_id, job_names: [transfer_job.name]})
      listing = @storage_transfer_service.list_transfer_operations('transferOperations', filter: query)
      ops = listing.operations
      current_op = ops && ops.first
    rescue Google::Apis::ServerError => e
      server_errors += 1
      raise sprintf('Transfer failed: %s (%s)', e.message.inspect, e.class.name) if server_errors >= 5
      @logger.debug(sprintf('Error while waiting for job %s, will retry: %s (%s)', transfer_job.name.inspect, e.message.inspect, e.class.name))
      @thread.sleep(poll_interval)
      # Re-runs only the begin section, so server_errors keeps counting.
      retry
    end

    if current_op && current_op.done?
      handle_completion(transfer_job, current_op)
      break
    end

    status = current_op && current_op.metadata && current_op.metadata['status']
    if status == 'IN_PROGRESS' && !announced_start
      # The first IN_PROGRESS sighting is logged at info level; later polls stay at debug.
      @logger.info(sprintf('Transfer %s started', transfer_job.description))
      announced_start = true
    else
      shown_status = status ? status.inspect : 'unknown'
      @logger.debug(sprintf('Waiting for job %s (name: %s, status: %s)', transfer_job.description.inspect, transfer_job.name.inspect, shown_status))
    end
    @thread.sleep(poll_interval)
  end
end
create_transfer_job(unload_manifest, cloud_storage_bucket, description, allow_overwrite)
click to toggle source
# File lib/bigshift/cloud_storage_transfer.rb, line 26
#
# Builds, but does not submit, an ENABLED Storage Transfer job spec. The job
# copies objects under the manifest's prefix from its S3 bucket into
# +cloud_storage_bucket+ and excludes the manifest object itself.
#
# The schedule's start and end dates are both the UTC date of a moment about
# 60 seconds from now, and the start time of day is that moment's UTC hour and
# minute.
#
# @param unload_manifest reads bucket_name, prefix and manifest_key
# @param cloud_storage_bucket [String] destination bucket name
# @param description [String, nil] description attached to the job
# @param allow_overwrite truthy to overwrite objects already in the sink
# @return [Google::Apis::StoragetransferV1::TransferJob]
def create_transfer_job(unload_manifest, cloud_storage_bucket, description, allow_overwrite)
  # getutc returns a new Time. Time#utc would convert the clock's Time object
  # in place, mutating a value owned by the (possibly injected) clock.
  soon = @clock.now.getutc + 60
  Google::Apis::StoragetransferV1::TransferJob.new(
    description: description,
    project_id: @project_id,
    status: 'ENABLED',
    schedule: Google::Apis::StoragetransferV1::Schedule.new(
      schedule_start_date: Google::Apis::StoragetransferV1::Date.new(year: soon.year, month: soon.month, day: soon.day),
      schedule_end_date: Google::Apis::StoragetransferV1::Date.new(year: soon.year, month: soon.month, day: soon.day),
      start_time_of_day: Google::Apis::StoragetransferV1::TimeOfDay.new(hours: soon.hour, minutes: soon.min)
    ),
    transfer_spec: Google::Apis::StoragetransferV1::TransferSpec.new(
      aws_s3_data_source: Google::Apis::StoragetransferV1::AwsS3Data.new(
        bucket_name: unload_manifest.bucket_name,
        aws_access_key: Google::Apis::StoragetransferV1::AwsAccessKey.new(
          access_key_id: @aws_credentials.access_key_id,
          secret_access_key: @aws_credentials.secret_access_key
        )
      ),
      gcs_data_sink: Google::Apis::StoragetransferV1::GcsData.new(
        bucket_name: cloud_storage_bucket
      ),
      object_conditions: Google::Apis::StoragetransferV1::ObjectConditions.new(
        include_prefixes: [unload_manifest.prefix],
        # Keep the manifest file itself out of the copy.
        exclude_prefixes: [unload_manifest.manifest_key]
      ),
      transfer_options: Google::Apis::StoragetransferV1::TransferOptions.new(
        overwrite_objects_already_existing_in_sink: !!allow_overwrite
      )
    )
  )
end
handle_completion(transfer_job, operation)
click to toggle source
# File lib/bigshift/cloud_storage_transfer.rb, line 93
#
# Handles a transfer operation that has reported done?. It raises when the
# operation ended in a failure state; otherwise it logs a completion message,
# with copy counters when the operation metadata includes them.
#
# @param transfer_job job whose description is used in the log message
# @param operation finished operation; its metadata is read as a Hash with a
#   'status' key and an optional 'counters' Hash
# @raise [RuntimeError] 'Transfer failed' when status is FAILED,
#   'Transfer aborted' when status is ABORTED
def handle_completion(transfer_job, operation)
  # Same nil guard that await_completion applies to metadata.
  metadata = operation.metadata || {}
  status = metadata['status']
  # A done operation is not necessarily a success: ABORTED is terminal too and
  # must not be reported as complete (the caller would go on to validate).
  if %w[FAILED ABORTED].include?(status)
    raise sprintf('Transfer %s', status.downcase)
  end

  message = sprintf('Transfer %s complete', transfer_job.description)
  if (counters = metadata['counters'])
    size_in_gib = counters['bytesCopiedToSink'].to_f / 2**30
    message << sprintf(', %s objects and %.2f GiB copied', counters['objectsCopiedToSink'], size_in_gib)
  end
  @logger.info(message)
end
validate_transfer(unload_manifest, cloud_storage_bucket)
click to toggle source
# File lib/bigshift/cloud_storage_transfer.rb, line 106
#
# Asks the manifest to check the copied objects in +cloud_storage_bucket+,
# then logs success. If the manifest's check raises, the exception propagates
# and nothing is logged. NOTE(review): presumably the manifest raises on a
# size mismatch, going by the log text; confirm in the manifest class.
#
# @param unload_manifest object responding to #validate_transfer(bucket)
# @param cloud_storage_bucket [String] destination bucket name
# @return whatever @logger.info returns
def validate_transfer(unload_manifest, cloud_storage_bucket)
  unload_manifest.validate_transfer(cloud_storage_bucket)
  @logger.info('Transfer validated, all file sizes match')
end