class BigShift::Cli
Constants
- ARGUMENTS
- STEPS
Public Class Methods
new(argv, options={})
click to toggle source
# File lib/bigshift/cli.rb, line 22
# Creates a new CLI runner.
#
# @param argv [Array<String>] command line arguments; a shallow copy is
#   stored so the caller's array object is never modified
# @param options [Hash] optional overrides
# @option options [#call] :factory_factory stored as the factory builder;
#   when absent (or falsy), +Factory.method(:new)+ is used instead
def initialize(argv, options={})
  @argv = argv.dup
  factory_builder = options[:factory_factory]
  factory_builder ||= Factory.method(:new)
  @factory_factory = factory_builder
end
Public Instance Methods
run()
click to toggle source
# File lib/bigshift/cli.rb, line 27
# Runs every step in order: setup, unload, transfer, load, cleanup.
#
# @return [nil]
# @raise [CliError] when an AWS region/credentials error or a GCP
#   authorization error is raised by one of the steps; the original
#   message is appended to a short explanation and the original backtrace
#   and the usage text (@usage) are passed along
def run
  setup
  unload
  transfer
  load
  cleanup
  nil
rescue Aws::Errors::MissingRegionError, Aws::Sigv4::Errors::MissingCredentialsError => e
  message = "AWS configuration missing or malformed: #{e.message}"
  raise CliError.new(message, e.backtrace, @usage)
rescue Signet::AuthorizationError => e
  message = "GCP configuration missing or malformed: #{e.message}"
  raise CliError.new(message, e.backtrace, @usage)
end
Private Instance Methods
cleanup()
click to toggle source
# File lib/bigshift/cli.rb, line 97
# Runs the cleanup step when it is among the configured steps; otherwise
# only logs that it was skipped.
#
# The factory's cleaner receives the unload manifest (@unload_manifest)
# and the configured Cloud Storage bucket name.
def cleanup
  return @logger.debug('Skipping cleanup') unless run?(:cleanup)

  @logger.debug('Running cleanup')
  bucket_name = @config[:cs_bucket_name]
  @factory.cleaner.cleanup(@unload_manifest, bucket_name)
  @logger.debug('Cleanup complete')
end
load()
click to toggle source
# File lib/bigshift/cli.rb, line 78
# Runs the load step when it is among the configured steps; otherwise only
# logs that it was skipped.
#
# The target table is looked up in the factory's BigQuery dataset by
# @config[:bq_table_id] and created when the lookup returns nothing. The
# load is given a "gs://<bucket>/<prefix>*" URI and an options hash with
# the Redshift schema converted via #to_big_query, overwriting allowed and,
# only when configured, :max_bad_records.
def load
  return @logger.debug('Skipping load') unless run?(:load)

  @logger.debug('Querying Redshift schema')
  redshift_schema = @factory.redshift_table_schema
  dataset = @factory.big_query_dataset
  table_id = @config[:bq_table_id]
  table = dataset.table(table_id) || dataset.create_table(table_id)
  source_uri = "gs://#{@config[:cs_bucket_name]}/#{s3_table_prefix}*"
  load_options = {
    schema: redshift_schema.to_big_query,
    allow_overwrite: true,
  }
  max_bad_records = @config[:max_bad_records]
  load_options[:max_bad_records] = max_bad_records if max_bad_records
  @logger.debug('Running load')
  table.load(source_uri, load_options)
  @logger.debug('Load complete')
end
parse_args(argv)
click to toggle source
# File lib/bigshift/cli.rb, line 131
# Parses the command line arguments into a configuration hash.
#
# Each entry of ARGUMENTS is destructured as
# [flag, value_name, type, config_key, required] and registered with an
# OptionParser. Problems are collected rather than raised one at a time:
# option parsing errors, credentials paths that do not exist and missing
# required flags all end up in the details of a single CliError.
#
# Defaults applied after parsing:
# * :gcp_credentials_path falls back to ENV['GOOGLE_APPLICATION_CREDENTIALS']
# * :bq_table_id falls back to :rs_table_name
# * :rs_schema_name falls back to 'public'
# * :steps becomes the subset of STEPS whose names were given, or all of
#   STEPS when none were given
#
# As a side effect the parser's help text is stored in @usage.
#
# @param argv [Array<String>] arguments to parse; recognized options are
#   removed from it by OptionParser#parse!
# @return [Hash] the configuration
# @raise [CliError] when any configuration error was collected
def parse_args(argv)
  config = {}
  parser = OptionParser.new do |p|
    ARGUMENTS.each do |flag, value_name, type, config_key, _|
      p.on("#{flag} #{value_name}", type) { |v| config[config_key] = v }
    end
  end
  config_errors = []
  begin
    parser.parse!(argv)
  rescue OptionParser::ParseError => e
    # ParseError is the common superclass of InvalidOption, MissingArgument,
    # InvalidArgument etc., so every malformed command line is reported
    # through CliError instead of leaking a raw OptionParser exception.
    config_errors << e.message
  end
  if !config[:gcp_credentials_path] && ENV['GOOGLE_APPLICATION_CREDENTIALS']
    config[:gcp_credentials_path] = ENV['GOOGLE_APPLICATION_CREDENTIALS']
  end
  %w[gcp aws rs].each do |prefix|
    path = config["#{prefix}_credentials_path".to_sym]
    next unless path
    if File.exist?(path)
      # NOTE(review): YAML.load is kept to preserve how credentials files are
      # read today; consider YAML.safe_load if these files may be untrusted.
      config["#{prefix}_credentials".to_sym] = YAML.load(File.read(path))
    else
      config_errors << sprintf('%s does not exist', path.inspect)
    end
  end
  ARGUMENTS.each do |flag, _, _, config_key, required|
    if required && !config.include?(config_key)
      config_errors << "#{flag} is required"
    end
  end
  config[:bq_table_id] ||= config[:rs_table_name]
  config[:rs_schema_name] ||= 'public'
  if config[:steps] && !config[:steps].empty?
    config[:steps] = STEPS.select { |s| config[:steps].include?(s.to_s) }
  else
    config[:steps] = STEPS
  end
  @usage = parser.to_s
  unless config_errors.empty?
    raise CliError.new('Configuration missing or malformed', config_errors, @usage)
  end
  config
end
run?(step)
click to toggle source
# File lib/bigshift/cli.rb, line 44
# Tells whether the given step is among the configured steps.
#
# @param step [Object] the step to look for in @config[:steps]
# @return [Boolean] result of +include?+ on the configured steps
def run?(step)
  enabled_steps = @config[:steps]
  enabled_steps.include?(step)
end
s3_table_prefix()
click to toggle source
# File lib/bigshift/cli.rb, line 173
# Builds (and memoizes in @s3_table_prefix) the key prefix for the table:
#
#   <db>/<schema>/<table>/<db>-<schema>-<table>-
#
# When @config[:s3_prefix] is set, it is put in front, separated by a
# slash, after one leading and one trailing slash have been stripped.
#
# @return [String] the prefix
def s3_table_prefix
  return @s3_table_prefix if @s3_table_prefix

  name_parts = [
    @config[:rs_database_name],
    @config[:rs_schema_name],
    @config[:rs_table_name],
  ]
  table_prefix = "#{name_parts.join('/')}/#{name_parts.join('-')}-"
  user_prefix = @config[:s3_prefix]
  if user_prefix
    trimmed = user_prefix.gsub(%r{\A/|/\Z}, '')
    table_prefix = "#{trimmed}/#{table_prefix}"
  end
  @s3_table_prefix = table_prefix
end
setup()
click to toggle source
# File lib/bigshift/cli.rb, line 48
# Prepares the state used by the other steps: parses @argv into @config,
# calls the factory builder with that configuration to get @factory, and
# takes @logger from the factory.
def setup
  @config = parse_args(@argv)
  @factory = @factory_factory.(@config)
  @logger = @factory.logger
  @logger.debug('Setup complete')
end
transfer()
click to toggle source
# File lib/bigshift/cli.rb, line 67
# Runs the transfer step when it is among the configured steps; otherwise
# only logs that it was skipped.
#
# The transfer is described as
# "bigshift-<db>-<schema>-<table>-<UTC time as YYYYmmddTHHMM>" and is asked
# to copy the unload manifest's contents to the configured Cloud Storage
# bucket with overwriting disallowed.
def transfer
  return @logger.debug('Skipping transfer') unless run?(:transfer)

  @logger.debug('Running transfer')
  database = @config[:rs_database_name]
  schema = @config[:rs_schema_name]
  table = @config[:rs_table_name]
  timestamp = Time.now.utc.strftime('%Y%m%dT%H%M')
  description = "bigshift-#{database}-#{schema}-#{table}-#{timestamp}"
  @factory.cloud_storage_transfer.copy_to_cloud_storage(
    @unload_manifest,
    @config[:cs_bucket_name],
    description: description,
    allow_overwrite: false
  )
  @logger.debug('Transfer complete')
end
unload()
click to toggle source
# File lib/bigshift/cli.rb, line 55
# Runs the unload step when it is among the configured steps; otherwise
# only logs that it was skipped.
#
# When running, the factory's unloader is asked to unload the configured
# schema/table to "s3://<bucket>/<prefix>" with overwriting disallowed and
# the configured compression setting.
#
# Whether or not the unload ran, the unload manifest for the bucket and
# prefix is created afterwards and stored in @unload_manifest.
def unload
  bucket_name = @config[:s3_bucket_name]
  if run?(:unload)
    @logger.debug('Running unload')
    target_uri = "s3://#{bucket_name}/#{s3_table_prefix}"
    @factory.redshift_unloader.unload_to(
      @config[:rs_schema_name],
      @config[:rs_table_name],
      target_uri,
      allow_overwrite: false,
      compression: @config[:compression]
    )
    @logger.debug('Unload complete')
  else
    @logger.debug('Skipping unload')
  end
  @unload_manifest = @factory.create_unload_manifest(bucket_name, s3_table_prefix)
end