module Sluice::Storage::S3

Constants

CONCURRENCY

FogFile
FogStorage

Aliases for Contracts

RETRIES
RETRY_WAIT
TIMEOUT_WAIT

Public Class Methods

copy_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

Copies files between S3 locations concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to copy files from

to_location

S3Location to copy files to

match_regex

a regex string to match the files to copy

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_location

# File lib/sluice/storage/s3/s3.rb, line 206
def copy_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  copying #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:copy, s3, from_files_or_loc, [], match_regex, to_location, alter_filename_lambda, flatten)
end
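
For example, a minimal sketch of a copy run, assuming s3 is a connection built with new_fog_s3_from (below) and that Sluice::Storage::S3::Location wraps an s3:// URI as it does elsewhere in Sluice; the bucket names and regex are illustrative:

  from = Sluice::Storage::S3::Location.new('s3://my-bucket/raw/')
  to   = Sluice::Storage::S3::Location.new('s3://my-bucket/archive/')

  copy_files(s3, from, to, '.+\.gz$') # Copy only the gzipped files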
copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

Copies files between S3 locations in two different accounts

Implementation is as follows:

  1. Concurrent download of all files from S3 source to local tmpdir

  2. Concurrent upload of all files from local tmpdir to S3 target

In other words, the download and upload phases are not interleaved (interleaving would be inefficient because upload speeds are typically much lower than download speeds).

Parameters:

from_s3

A Fog::Storage s3 connection for accessing the from S3Location

to_s3

A Fog::Storage s3 connection for accessing the to S3Location

from_location

S3Location to copy files from

to_location

S3Location to copy files to

match_regex

a regex string to match the files to copy

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_location

# File lib/sluice/storage/s3/s3.rb, line 183
def copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  copying inter-account #{describe_from(from_location)} to #{to_location}"
  processed = []
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    processed = download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
  end

  processed
end
copy_files_manifest(s3, manifest, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

Copies files between S3 locations, maintaining a manifest so that files copied on a previous run are not copied again.

Useful in scenarios such as:

  1. You would like to do a move but only have read permission on the source bucket

  2. You would like to do a move but some other process needs to use the files after you do

Parameters:

s3

A Fog::Storage s3 connection

manifest

A Sluice::Storage::S3::Manifest object

from_files_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to copy files from

to_location

S3Location to copy files to

match_regex

a regex string to match the files to copy

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_location

# File lib/sluice/storage/s3/s3.rb, line 229
def copy_files_manifest(s3, manifest, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  copying with manifest #{describe_from(from_files_or_loc)} to #{to_location}"
  ignore = manifest.get_entries(s3) # Files to leave untouched
  processed = process_files(:copy, s3, from_files_or_loc, ignore, match_regex, to_location, alter_filename_lambda, flatten)
  manifest.add_entries(s3, processed)

  processed
end
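
A usage sketch, assuming manifest is a previously constructed Sluice::Storage::S3::Manifest and from and to are S3Locations as above:

  processed = copy_files_manifest(s3, manifest, from, to)
  puts "#{processed.length} new file(s) copied" # Files already in the manifest were skipped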
delete_files(s3, from_files_or_loc, match_regex='.+')

Deletes files from S3 locations concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to delete files from

match_regex

a regex string to match the files to delete

# File lib/sluice/storage/s3/s3.rb, line 157
def delete_files(s3, from_files_or_loc, match_regex='.+')

  puts "  deleting #{describe_from(from_files_or_loc)}"
  process_files(:delete, s3, from_files_or_loc, [], match_regex)
end
describe_from(from_files_or_dir_or_loc)

Provides a string describing from_files_or_dir_or_loc for logging purposes.

Parameters:

from_files_or_dir_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from

Returns a log-friendly string

# File lib/sluice/storage/s3/s3.rb, line 353
def describe_from(from_files_or_dir_or_loc)
  if from_files_or_dir_or_loc.is_a?(Array)
    "#{from_files_or_dir_or_loc.length} file(s)"
  else
    "files from #{from_files_or_dir_or_loc}"
  end
end
download_file(s3, from_file, to_file)

Downloads a single file to the exact path specified. Has no intelligence around filenaming. Creates the directory path as needed.

Parameters:

s3

A Fog::Storage s3 connection

from_file

A Fog::Storage::AWS::File to download

to_file

A local file path

# File lib/sluice/storage/s3/s3.rb, line 332
def download_file(s3, from_file, to_file)

  FileUtils.mkdir_p(File.dirname(to_file))

  # TODO: deal with bug where Fog hangs indefinitely if network connection dies during download

  File.open(to_file, "w") do |local_file|
    local_file.write(from_file.body)
  end
end
download_files(s3, from_files_or_loc, to_directory, match_regex='.+')

Downloads files from an S3 location to local storage, concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to download files from

to_directory

Local directory to copy files to

match_regex

a regex string to match the files to download

# File lib/sluice/storage/s3/s3.rb, line 144
def download_files(s3, from_files_or_loc, to_directory, match_regex='.+')

  puts "  downloading #{describe_from(from_files_or_loc)} to #{to_directory}"
  process_files(:download, s3, from_files_or_loc, [], match_regex, to_directory)
end
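
For example, a sketch that pulls matching files into a temporary directory (the regex is illustrative):

  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    downloaded = download_files(s3, from_location, tmp, '.+\.log$')
    puts "fetched #{downloaded.length} file(s) into #{tmp}"
  end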
get_basename(path)
# File lib/sluice/storage/s3/s3.rb, line 111
def get_basename(path)
  if is_folder?(path)
    nil
  else
    match = path.match('([^/]+)$')
    if match
      match[1]
    else
      nil
    end
  end
end
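
Behaviour follows from is_folder? below: folder keys have no basename. For example:

  get_basename('in/2013/run.log.gz') # => "run.log.gz"
  get_basename('in/2013/')           # => nil (folders have no basename)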
glob_files(dir, glob)

A helper function to list all files recursively in a folder

Parameters:

dir

Directory to list files recursively

glob

a glob pattern (e.g. '**/*') to match the files to list

Returns array of files (no sub-directories)

# File lib/sluice/storage/s3/s3.rb, line 622
def glob_files(dir, glob)
  Dir.glob(File.join(dir, glob)).select { |f|
    File.file?(f) # Drop sub-directories
  }
end
is_empty?(s3, location)
# File lib/sluice/storage/s3/s3.rb, line 131
def is_empty?(s3, location)
  list_files(s3, location).length == 0
end
is_file?(path)
# File lib/sluice/storage/s3/s3.rb, line 98
def is_file?(path)
  !is_folder?(path)
end
is_folder?(path)
# File lib/sluice/storage/s3/s3.rb, line 85
def is_folder?(path)
  (path.end_with?('_$folder$') || # EMR-created
    path.end_with?('/'))
end
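
For example:

  is_folder?('logs/')            # => true (trailing slash)
  is_folder?('logs_$folder$')    # => true (EMR-created marker)
  is_folder?('logs/2013.log.gz') # => false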
list_files(s3, location)
# File lib/sluice/storage/s3/s3.rb, line 65
def list_files(s3, location)
  files_and_dirs = s3.directories.get(location.bucket, prefix: location.dir_as_path).files

  files = [] # Can't use a .select because of Ruby deep copy issues (array of non-POROs)
  files_and_dirs.each { |f|
    if is_file?(f.key)
      files << f.dup
    end
  }
  files
end
move_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

Moves files between S3 locations concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to move files from

to_location

S3Location to move files to

match_regex

a regex string to match the files to move

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_location

# File lib/sluice/storage/s3/s3.rb, line 281
def move_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  moving #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:move, s3, from_files_or_loc, [], match_regex, to_location, alter_filename_lambda, flatten)
end
move_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

Moves files between S3 locations in two different accounts

Implementation is as follows:

  1. Concurrent download of all files from S3 source to local tmpdir

  2. Concurrent upload of all files from local tmpdir to S3 target

  3. Concurrent deletion of all files from S3 source

In other words, the three phases are not interleaved (interleaving would be inefficient because upload speeds are typically much lower than download speeds)

Parameters:

from_s3

A Fog::Storage s3 connection for accessing the from S3Location

to_s3

A Fog::Storage s3 connection for accessing the to S3Location

from_location

S3Location to move files from

to_location

S3Location to move files to

match_regex

a regex string to match the files to move

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_location

# File lib/sluice/storage/s3/s3.rb, line 257
def move_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  moving inter-account #{describe_from(from_location)} to #{to_location}"
  processed = []
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    processed = download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
    delete_files(from_s3, from_location, match_regex) # Delete only the files we downloaded
  end

  processed
end
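
A sketch of an inter-account move, assuming one Fog connection per account (regions, credentials and locations are illustrative):

  from_s3 = new_fog_s3_from('us-east-1', source_access_key_id, source_secret_access_key)
  to_s3   = new_fog_s3_from('eu-west-1', target_access_key_id, target_secret_access_key)

  move_files_inter(from_s3, to_s3, from_location, to_location, '.+\.gz$')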
name_file(filepath, new_filename, remove_path=nil, add_path=nil, flatten=false)

A helper function to prepare destination filenames and paths. This is a bit weird - it needs to exist because of differences in the way that Amazon S3, Fog and Unix treat filepaths versus keys.

Parameters:

filepath

Path to file (including old filename)

new_filename

Replace the filename in the path with this

remove_path

If this is set, strip this from the front of the path

add_path

If this is set, add this to the front of the path

flatten

strips off any sub-folders below the from_location

TODO: this badly needs unit tests

# File lib/sluice/storage/s3/s3.rb, line 669
def name_file(filepath, new_filename, remove_path=nil, add_path=nil, flatten=false)

  # First, replace the filename in filepath with new one
  dirname = File.dirname(filepath)
  new_filepath = (dirname == '.') ? new_filename : dirname + '/' + new_filename

  # Nothing more to do
  return new_filepath if remove_path.nil? and add_path.nil? and not flatten

  shortened_filepath =  if flatten
                          # Let's revert to just the filename
                          new_filename
                        else
                          # If we have a 'remove_path', it must be found at
                          # the start of the path.
                          # If it's not, you're probably using name_file()
                          # wrong.
                          if !filepath.start_with?(remove_path)
                            raise StorageOperationError, "name_file failed. Filepath '#{filepath}' does not start with '#{remove_path}'"
                          end

                          # Okay, let's strip remove_path from the front
                          new_filepath[remove_path.length()..-1]
                        end

  # Nothing more to do
  return shortened_filepath if add_path.nil?

  # Add the new filepath on to the start and return
  return add_path + shortened_filepath
end
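
Some worked examples of the transformations above (traced by hand from the code, so treat as illustrative):

  name_file('in/2013/f.gz', 'f.gz')                      # => "in/2013/f.gz"
  name_file('in/2013/f.gz', 'f.gz', 'in/')               # => "2013/f.gz"
  name_file('in/2013/f.gz', 'f.gz', 'in/', 'out/')       # => "out/2013/f.gz"
  name_file('in/2013/f.gz', 'f.gz', 'in/', 'out/', true) # => "out/f.gz"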
new_fog_s3_from(region, access_key_id, secret_access_key)
# File lib/sluice/storage/s3/s3.rb, line 45
def new_fog_s3_from(region, access_key_id, secret_access_key)
  fog = Fog::Storage.new({
    :provider => 'AWS',
    :region => region,
    :aws_access_key_id => access_key_id,
    :aws_secret_access_key => secret_access_key
  })
  fog.sync_clock
  fog
end
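
The sync_clock call aligns Fog's notion of the current time with AWS's, which avoids RequestTimeTooSkewed failures on hosts with drifting clocks. Typical usage (sourcing credentials from environment variables is just one option):

  s3 = new_fog_s3_from('eu-west-1', ENV['AWS_ACCESS_KEY_ID'], ENV['AWS_SECRET_ACCESS_KEY'])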
process_files(operation, s3, from_files_or_dir_or_loc, ignore=[], match_regex_or_glob='.+', to_loc_or_dir=nil, alter_filename_lambda=false, flatten=false)

Concurrent file operations between S3 locations and/or local storage. Supports:

  • Download

  • Upload

  • Copy

  • Delete

  • Move (= Copy + Delete)

Parameters:

operation

Operation to perform: :download, :upload, :copy, :delete or :move

s3

A Fog::Storage s3 connection

from_files_or_dir_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from

ignore

Array of filenames to ignore (used by manifest code)

match_regex_or_glob

a regex or glob string to match the files to process

to_loc_or_dir

S3Location or local directory to process files to

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_loc_or_dir

# File lib/sluice/storage/s3/s3.rb, line 378
def process_files(operation, s3, from_files_or_dir_or_loc, ignore=[], match_regex_or_glob='.+', to_loc_or_dir=nil, alter_filename_lambda=false, flatten=false)

  # Validate that the file operation makes sense
  case operation
  when :copy, :move, :download, :upload
    if to_loc_or_dir.nil?
      raise StorageOperationError, "File operation %s requires the to_loc_or_dir to be set" % operation
    end
  when :delete
    unless to_loc_or_dir.nil?
      raise StorageOperationError, "File operation %s does not support the to_loc_or_dir argument" % operation
    end
    if alter_filename_lambda.class == Proc
      raise StorageOperationError, "File operation %s does not support the alter_filename_lambda argument" % operation
    end
  else
    raise StorageOperationError, "File operation %s is unsupported. Try :download, :upload, :copy, :delete or :move" % operation
  end

  # If we have an array of files, no additional globbing required
  if from_files_or_dir_or_loc.is_a?(Array)
    files_to_process = from_files_or_dir_or_loc # Could be filepaths or Fog::Storage::AWS::File's
    globbed = true
  # Otherwise if it's an upload, we can glob now
  elsif operation == :upload
    files_to_process = glob_files(from_files_or_dir_or_loc, match_regex_or_glob)
    globbed = true
  # Otherwise we'll do threaded globbing later...
  else
    files_to_process = []
    from_loc = from_files_or_dir_or_loc # Alias
    globbed = false
  end

  threads = []
  mutex = Mutex.new
  complete = false
  marker_opts = {}
  processed_files = [] # For manifest updating, determining if any files were moved etc

  # If an exception is thrown in a thread that isn't handled, die quickly
  Thread.abort_on_exception = true

  # Create Ruby threads to concurrently execute s3 operations
  for i in (0...CONCURRENCY)

    # Each thread pops a file off the files_to_process array and processes it.
    # We loop until there are no more files
    threads << Thread.new(i) do |thread_idx|

      loop do
        file = false
        filepath = false
        from_bucket = false
        from_path = false
        match = false

        # First critical section:
        # only allow one thread to modify the array at any time
        mutex.synchronize do

          # No need to do further globbing
          if globbed
            if files_to_process.size == 0
              complete = true
              next
            end

            file = files_to_process.pop
            # Support raw filenames and also Fog::Storage::AWS::File's
            if (file.is_a?(Fog::Storage::AWS::File))
              from_bucket = file.directory.key # Bucket
              from_path = Sluice::Storage.trail_slash(File.dirname(file.key))
              filepath = file.key
            else
              from_bucket = nil # Not used
              if from_files_or_dir_or_loc.is_a?(Array)
                from_path = Sluice::Storage.trail_slash(File.dirname(file))
              else
                from_path = from_files_or_dir_or_loc # The root dir
              end
              filepath = file
            end

            match = true # Match is implicit in the glob
          else

            while !complete && !match do
              if files_to_process.size == 0
                # S3 batches 1000 files per request.
                # We load up our array with the files to move
                files_to_process = s3.directories.get(from_loc.bucket, :prefix => from_loc.dir).files.all(marker_opts).to_a
                # If we don't have any files after the S3 request, we're complete
                if files_to_process.size == 0
                  complete = true
                  next
                else
                  marker_opts['marker'] = files_to_process.last.key

                  # By reversing the array we can use pop and get FIFO behaviour
                  # instead of the performance penalty incurred by unshift
                  files_to_process = files_to_process.reverse
                end
              end

              file = files_to_process.pop
              from_bucket = from_loc.bucket
              from_path = from_loc.dir_as_path
              filepath = file.key

              # TODO: clean up following https://github.com/snowplow/sluice/issues/25
              match = if match_regex_or_glob.is_a? NegativeRegex
                        !filepath.match(match_regex_or_glob.regex)
                      else
                        filepath.match(match_regex_or_glob)
                      end

            end
          end
        end
        # End of mutex.synchronize

        # Kill this thread's loop (and thus this thread) if we are complete
        break if complete

        # Skip processing for a folder or file which doesn't match our regexp or glob
        next if is_folder?(filepath) or not match

        # Name file
        basename = get_basename(filepath)
        next if ignore.include?(basename) # Don't process if in our leave list

        filename = rename_file(filepath, basename, alter_filename_lambda)

        # What are we doing? Let's determine source and target
        # Note that target excludes bucket name where relevant
        case operation
        when :upload
          source = "#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "(t#{thread_idx})    UPLOAD #{source} +-> #{to_loc_or_dir.bucket}/#{target}"                
        when :download
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir, flatten)
          puts "(t#{thread_idx})    DOWNLOAD #{source} +-> #{target}"
        when :move
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "(t#{thread_idx})    MOVE #{source} -> #{to_loc_or_dir.bucket}/#{target}"
        when :copy
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "(t#{thread_idx})    COPY #{source} +-> #{to_loc_or_dir.bucket}/#{target}"
        when :delete
          source = "#{from_bucket}/#{filepath}"
          # No target
          puts "(t#{thread_idx})    DELETE x #{source}" 
        end

        # Upload is a standalone operation vs move/copy/delete
        if operation == :upload
          retry_x(
            Sluice::Storage::S3,
            [:upload_file, s3, filepath, to_loc_or_dir.bucket, target],
            RETRIES,
            "      +/> #{target}",
            "Problem uploading #{filepath}. Retrying.")
        end                

        # Download is a standalone operation vs move/copy/delete
        if operation == :download
          retry_x(
            Sluice::Storage::S3,
            [:download_file, s3, file, target],
            RETRIES,
            "      +/> #{target}",
            "Problem downloading #{filepath}. Retrying.")
        end

        # A move or copy starts with a copy file
        if [:move, :copy].include? operation
          retry_x(
            file,
            [:copy, to_loc_or_dir.bucket, target],
            RETRIES,
            "      +-> #{to_loc_or_dir.bucket}/#{target}",
            "Problem copying #{filepath}. Retrying.")
        end

        # A move or delete ends with a delete
        if [:move, :delete].include? operation
          retry_x(
            file,
            [:destroy],
            RETRIES,
            "      x #{source}",
            "Problem destroying #{filepath}. Retrying.")
        end

        # Second critical section: we need to update
        # processed_files in a thread-safe way
        mutex.synchronize do
          processed_files << filepath
        end
      end
    end
  end

  # Wait for threads to finish
  threads.each { |aThread|  aThread.join }

  processed_files # Return the processed files
end
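
The concurrency machinery above boils down to a mutex-guarded work queue drained by CONCURRENCY threads. A minimal standalone sketch of the same pattern, where process stands in for the per-file copy/move/delete logic:

  queue   = files.dup
  mutex   = Mutex.new
  threads = (0...CONCURRENCY).map do
    Thread.new do
      loop do
        file = mutex.synchronize { queue.pop }
        break if file.nil?  # Array#pop returns nil once the queue is empty
        process(file)       # Hypothetical per-file operation
      end
    end
  end
  threads.each(&:join)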
retry_x(object, send_args, retries, attempt_msg, failure_msg)

A helper function to attempt an operation, retrying up to retries times on failure.

Parameters:

object

Object to send our function to

send_args

Method name plus arguments, as passed to Object#send

retries

Number of retries to attempt after the initial attempt

attempt_msg

Message to puts on each successful attempt

failure_msg

Message to puts on each failure

# File lib/sluice/storage/s3/s3.rb, line 638
def retry_x(object, send_args, retries, attempt_msg, failure_msg)
  i = 0
  begin
    Timeout::timeout(TIMEOUT_WAIT) do # In case our operation times out
      object.send(*send_args)
      puts attempt_msg
    end
  rescue
    raise unless i < retries
    puts failure_msg
    sleep(RETRY_WAIT)  # Give us a bit of time before retrying
    i += 1
    retry
  end
end
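
For instance, the delete path in process_files amounts to the following call, which sends :destroy to the Fog file up to RETRIES + 1 times in total (the initial attempt plus RETRIES retries), waiting RETRY_WAIT seconds between attempts:

  retry_x(
    file,                # A Fog::Storage::AWS::File
    [:destroy],          # Method name plus arguments, as passed to send
    RETRIES,
    "      x #{source}",
    "Problem destroying #{filepath}. Retrying.")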
upload_file(s3, from_file, to_bucket, to_file)

Uploads a single file to the exact location specified. Has no intelligence around filenaming.

Parameters:

s3

A Fog::Storage s3 connection

from_file

A local file path

to_bucket

Name of the S3 bucket to upload to

to_file

The file path (key) to upload to

# File lib/sluice/storage/s3/s3.rb, line 310
def upload_file(s3, from_file, to_bucket, to_file)

  local_file = File.open(from_file)

  dir = s3.directories.new(:key => to_bucket) # No request made
  file = dir.files.create(
    :key    => to_file,
    :body   => local_file
  )
  file.save('x-amz-server-side-encryption' => 'AES256')
  local_file.close
end
upload_files(s3, from_files_or_dir, to_location, match_glob='*')

Uploads files to S3 locations concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_dir

Local array of files or local directory to upload files from

to_location

S3Location to upload files to

match_glob

a filesystem glob to match the files to upload

# File lib/sluice/storage/s3/s3.rb, line 295
def upload_files(s3, from_files_or_dir, to_location, match_glob='*')

  puts "  uploading #{describe_from(from_files_or_dir)} to #{to_location}"
  process_files(:upload, s3, from_files_or_dir, [], match_glob, to_location)
end
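
For example (directory and glob are illustrative):

  upload_files(s3, '/tmp/exports/', to_location, '**/*.gz') # Recursive, gzipped files only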

Private Class Methods

rename_file(filepath, basename, rename_lambda=false)
# File lib/sluice/storage/s3/s3.rb, line 596
def self.rename_file(filepath, basename, rename_lambda=false)

  if rename_lambda.class == Proc
    case rename_lambda.arity
    when 2
      rename_lambda.call(basename, filepath)
    when 1
      rename_lambda.call(basename)
    when 0
      rename_lambda.call()
    else
      raise StorageOperationError, "Expected arity of 0, 1 or 2 for rename_lambda, not #{rename_lambda.arity}"
    end
  else
    basename
  end
end
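
The arity dispatch above means an alter_filename_lambda can take zero, one or two arguments. For example, a hypothetical renamer that prefixes a timestamp to each basename:

  add_timestamp = lambda { |basename| "#{Time.now.to_i}-#{basename}" }

  copy_files(s3, from, to, '.+', add_timestamp)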
