class BigShift::Cli

Constants

ARGUMENTS
STEPS

Public Class Methods

new(argv, options={})
# File lib/bigshift/cli.rb, line 22
# Builds a CLI runner for the given command line.
#
# @param argv [Array<String>] raw command line arguments; a shallow copy is
#   stored, so later in-place option parsing does not modify the caller's array
# @param options [Hash] optional collaborators
# @option options [#call] :factory_factory callable that is later invoked with
#   the parsed configuration to build the factory; falls back to
#   +Factory.method(:new)+ when absent or nil
def initialize(argv, options={})
  @argv = argv.dup
  factory_builder = options[:factory_factory]
  @factory_factory = factory_builder || Factory.method(:new)
end

Public Instance Methods

run()
# File lib/bigshift/cli.rb, line 27
# Executes the whole pipeline in order: setup, unload, transfer, load, cleanup.
#
# @return [nil]
# @raise [CliError] when the AWS SDK reports a missing region or missing
#   credentials, or when Signet reports an authorization error; the original
#   message and backtrace are passed along together with the usage text
def run
  setup
  unload
  transfer
  load
  cleanup
  nil
rescue Aws::Errors::MissingRegionError, Aws::Sigv4::Errors::MissingCredentialsError => error
  raise CliError.new("AWS configuration missing or malformed: #{error.message}", error.backtrace, @usage)
rescue Signet::AuthorizationError => error
  raise CliError.new("GCP configuration missing or malformed: #{error.message}", error.backtrace, @usage)
end

Private Instance Methods

cleanup()
# File lib/bigshift/cli.rb, line 97
# Runs the cleanup step: passes the unload manifest and the Cloud Storage
# bucket name to the factory's cleaner. When the step is not selected, only a
# debug message is logged.
def cleanup
  return @logger.debug('Skipping cleanup') unless run?(:cleanup)

  @logger.debug('Running cleanup')
  bucket_name = @config[:cs_bucket_name]
  @factory.cleaner.cleanup(@unload_manifest, bucket_name)
  @logger.debug('Cleanup complete')
end
load()
# File lib/bigshift/cli.rb, line 78
# Runs the load step: looks up the Redshift table schema, finds or creates the
# BigQuery table, and loads every Cloud Storage object under the table prefix
# into it. When the step is not selected, only a debug message is logged.
def load
  return @logger.debug('Skipping load') unless run?(:load)

  @logger.debug('Querying Redshift schema')
  table_schema = @factory.redshift_table_schema
  dataset = @factory.big_query_dataset
  # Reuse the table when it already exists, otherwise create it.
  table = dataset.table(@config[:bq_table_id]) || dataset.create_table(@config[:bq_table_id])
  # The trailing wildcard matches every object written under the table prefix.
  source_uri = "gs://#{@config[:cs_bucket_name]}/#{s3_table_prefix}*"
  load_options = { schema: table_schema.to_big_query, allow_overwrite: true }
  max_bad_records = @config[:max_bad_records]
  load_options[:max_bad_records] = max_bad_records if max_bad_records
  @logger.debug('Running load')
  table.load(source_uri, load_options)
  @logger.debug('Load complete')
end
parse_args(argv)
# File lib/bigshift/cli.rb, line 131
# Parses the command line into a configuration hash.
#
# Every entry of ARGUMENTS becomes an option of the parser. On top of the
# parsed values the returned hash contains:
#
# * +:<prefix>_credentials+ for each of gcp/aws/rs whose
#   +:<prefix>_credentials_path+ points at an existing file (parsed as YAML);
#   +:gcp_credentials_path+ falls back to the GOOGLE_APPLICATION_CREDENTIALS
#   environment variable
# * +:bq_table_id+, defaulting to +:rs_table_name+
# * +:rs_schema_name+, defaulting to 'public'
# * +:steps+, the members of STEPS named on the command line (in STEPS order),
#   or all of STEPS when none were given; names that match no step are
#   silently ignored
#
# All problems found are collected and reported together. As a side effect
# the parser's usage text is stored in +@usage+.
#
# @param argv [Array<String>] command line arguments; recognized options are
#   removed from this array in place
# @return [Hash{Symbol => Object}] the configuration
# @raise [CliError] when an option is unknown, an option value is missing or
#   invalid, a credentials file does not exist, or a required option is absent
def parse_args(argv)
  config = {}
  parser = OptionParser.new do |p|
    ARGUMENTS.each do |flag, value_name, type, config_key, _|
      p.on("#{flag} #{value_name}", type) { |v| config[config_key] = v }
    end
  end
  # Set up front so the usage text is available no matter how this method exits.
  @usage = parser.to_s
  config_errors = []
  begin
    parser.parse!(argv)
  rescue OptionParser::ParseError => e
    # ParseError is the common ancestor of InvalidOption, MissingArgument,
    # InvalidArgument, etc., so every kind of malformed command line is
    # reported as a configuration error instead of escaping as a raw
    # OptionParser exception.
    config_errors << e.message
  end
  if !config[:gcp_credentials_path] && ENV['GOOGLE_APPLICATION_CREDENTIALS']
    config[:gcp_credentials_path] = ENV['GOOGLE_APPLICATION_CREDENTIALS']
  end
  %w[gcp aws rs].each do |prefix|
    path = config[:"#{prefix}_credentials_path"]
    next unless path
    if File.exist?(path)
      # NOTE(review): YAML.load can instantiate arbitrary objects on older
      # Psych versions; consider YAML.safe_load if credentials files may come
      # from an untrusted source.
      config[:"#{prefix}_credentials"] = YAML.load(File.read(path))
    else
      config_errors << sprintf('%s does not exist', path.inspect)
    end
  end
  ARGUMENTS.each do |flag, _, _, config_key, required|
    config_errors << "#{flag} is required" if required && !config.key?(config_key)
  end
  config[:bq_table_id] ||= config[:rs_table_name]
  config[:rs_schema_name] ||= 'public'
  requested_steps = config[:steps]
  config[:steps] =
    if requested_steps && !requested_steps.empty?
      STEPS.select { |s| requested_steps.include?(s.to_s) }
    else
      STEPS
    end
  unless config_errors.empty?
    raise CliError.new('Configuration missing or malformed', config_errors, @usage)
  end
  config
end
run?(step)
# File lib/bigshift/cli.rb, line 44
# Tells whether +step+ is among the steps selected in the configuration.
#
# @param step [Symbol] step name, e.g. +:unload+ or +:cleanup+
# @return [Boolean]
def run?(step)
  selected_steps = @config[:steps]
  selected_steps.include?(step)
end
s3_table_prefix()
# File lib/bigshift/cli.rb, line 173
# Key prefix shared by all objects that belong to the configured table:
# "<db>/<schema>/<table>/<db>-<schema>-<table>-", optionally nested under the
# configured +:s3_prefix+. The value is computed once and memoized.
#
# Leading and trailing slashes of +:s3_prefix+ are dropped, and a prefix that
# is empty afterwards (nil, "" or only slashes) is ignored, so the result
# never starts with "/" and never contains an empty path segment at the join.
#
# @return [String]
def s3_table_prefix
  @s3_table_prefix ||= begin
    db_name, schema_name, table_name = @config.values_at(:rs_database_name, :rs_schema_name, :rs_table_name)
    table_prefix = "#{db_name}/#{schema_name}/#{table_name}/#{db_name}-#{schema_name}-#{table_name}-"
    custom_prefix = @config[:s3_prefix].to_s.gsub(%r{\A/+|/+\z}, '')
    custom_prefix.empty? ? table_prefix : "#{custom_prefix}/#{table_prefix}"
  end
end
setup()
# File lib/bigshift/cli.rb, line 48
# Prepares the run: parses the stored command line into +@config+, builds the
# factory from that configuration, and takes the logger from the factory.
def setup
  config = parse_args(@argv)
  @config = config
  factory = @factory_factory.call(config)
  @factory = factory
  @logger = factory.logger
  @logger.debug('Setup complete')
end
transfer()
# File lib/bigshift/cli.rb, line 67
# Runs the transfer step: asks the factory's cloud storage transfer to copy
# the objects listed in the unload manifest to the Cloud Storage bucket. When
# the step is not selected, only a debug message is logged.
def transfer
  return @logger.debug('Skipping transfer') unless run?(:transfer)

  @logger.debug('Running transfer')
  db_name, schema_name, table_name = @config.values_at(:rs_database_name, :rs_schema_name, :rs_table_name)
  # The description carries a UTC timestamp with minute resolution.
  timestamp = Time.now.utc.strftime('%Y%m%dT%H%M')
  description = "bigshift-#{db_name}-#{schema_name}-#{table_name}-#{timestamp}"
  storage_transfer = @factory.cloud_storage_transfer
  storage_transfer.copy_to_cloud_storage(@unload_manifest, @config[:cs_bucket_name], description: description, allow_overwrite: false)
  @logger.debug('Transfer complete')
end
unload()
# File lib/bigshift/cli.rb, line 55
# Runs the unload step: asks the factory's Redshift unloader to write the
# configured table to S3 under the table prefix. Whether or not the step
# runs, the unload manifest for that bucket and prefix is created afterwards
# and kept in +@unload_manifest+ for the later steps.
def unload
  bucket_name = @config[:s3_bucket_name]
  if run?(:unload)
    @logger.debug('Running unload')
    target_uri = "s3://#{bucket_name}/#{s3_table_prefix}"
    unloader = @factory.redshift_unloader
    unloader.unload_to(@config[:rs_schema_name], @config[:rs_table_name], target_uri, allow_overwrite: false, compression: @config[:compression])
    @logger.debug('Unload complete')
  else
    @logger.debug('Skipping unload')
  end
  @unload_manifest = @factory.create_unload_manifest(bucket_name, s3_table_prefix)
end