module DbSucker::Application::SklavenTreiber::Worker::Routines
Public Instance Methods
_do_import_file()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 273 def _do_import_file @status = ["importing #{human_bytes(File.size(@local_file_raw))} SQL data into local server...", :yellow] imp = @var.data["importer"] impf = @var.parse_flags(var.data["importer_flags"]).merge(deferred: @deferred) if imp == "void10" t = app.channelfy_thread app.spawn_thread(:sklaventreiber_worker_io_import_sql) {|thr| thr.wait(10) } second_progress(t, "importing with void10, sleeping 10 seconds (:seconds)...").join elsif imp == "sequel" || @var.constraint(table) raise NotImplementedError, "SequelImporter is not yet implemented/ported to new db_sucker version!" # # imp_was_sequel = imp == "sequel" # # imp = "sequel" # # t = app.channelfy_thread Thread.new { # # Thread.current[:importer] = imp = SequelImporter.new(worker, file, ignore_errors: !imp_was_sequel) # # imp.start # # } # var.load_local_file(self, file) do |importer, channel| # case importer # when "sequel" # sequel_progress(channel).join # if channel[:importer].error # @status = ["importing with Sequel", :yellow] # raise channel[:importer].error # end # else second_progress(channel, "#{"(deferred) " if deferred}loading file (#{human_filesize(File.size(file))}) into local SQL server (:seconds)...").join # end # throw :abort_execution, channel[:error_message] if channel[:error_message] # @return_message = channel[:return_message] if channel[:return_message] # end elsif imp == "binary" t = app.channelfy_thread app.spawn_thread(:sklaventreiber_worker_io_import_sql) {|thr| begin file_import_sql(@ctn, :instruction) do |fi| @status = [fi, "yellow"] fi.instruction = @var.import_instruction_for(@local_file_raw, impf) fi.filesize = File.size(@local_file_raw) fi.status_format = app.opts[:status_format] fi.abort_if { @should_cancel } fi.import! end rescue Worker::IO::FileImportSql::ImportError => ex fail! "ImportError: #{ex.message}" sleep 3 end } else raise Container::Variation::ImporterNotFoundError, "variation `#{@var.cfg.name}/#{@var.name}' defines unknown importer `#{imp}' (in `#{@var.cfg.src}')" end t.join end
_l_copy_file(file = nil)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 190 def _l_copy_file file = nil label = "copying #{var.copies_file_compressed? ? "gzipped" : "raw"} file" @status = ["#{label}...", :yellow] @copy_file_source = var.copies_file_compressed? ? @local_file_compressed : @local_file_raw @copy_file_target = copy_file_destination(var.data["file"]) file_copy(@ctn, @copy_file_source => @copy_file_target) do |fc| fc.label = label fc.status_format = app.opts[:status_format] @status = [fc, "yellow"] fc.abort_if { @should_cancel } fc.copy! end end
_l_decompress_file()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 206 def _l_decompress_file label = "decompressing file" @status = ["#{label}...", :yellow] file_gunzip(@ctn, @local_file_compressed) do |fc| fc.filesize = @remote_file_raw_filesize fc.label = label fc.status_format = app.opts[:status_format] @status = [fc, "yellow"] fc.abort_if { @should_cancel } fc.on_success do @local_files_to_remove.delete(@local_file_compressed) @local_file_raw = fc.local @local_files_to_remove << @local_file_raw end fc.gunzip! end end
_l_download_file()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 140 def _l_download_file @status = ["initiating download...", "yellow"] @local_file_compressed = local_tmp_file(File.basename(@remote_file_compressed)) @local_files_to_remove << @local_file_compressed case app.opts[:file_transport] when :ruby sftp_download(@ctn, @remote_file_compressed => @local_file_compressed) do |dl| dl.status_format = app.opts[:status_format] @status = [dl, "yellow"] dl.abort_if { @should_cancel } dl.download! end when :native sftp_native_download(@ctn, @remote_file_compressed => @local_file_compressed) do |dl| dl.status_format = app.opts[:status_format] @status = [dl, "yellow"] dl.abort_if { @should_cancel } dl.download! end else raise UnknownFileTransportError, "Unknown file transport `#{app.opts[:file_transport]}' configured, valid are `ruby' and `native'!" end end
_l_import_file()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 250 def _l_import_file if File.size(@local_file_raw) > app.opts[:deferred_threshold] && app.opts[:deferred_import] @deferred = true @perform << "l_wait_for_workers" else # cancel!("importing not yet implemented", true) _do_import_file end end
_l_import_file_deferred()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 268 def _l_import_file_deferred @status = ["importing #{human_bytes(File.size(@local_file_raw))} SQL data into local server...", :yellow] _do_import_file end
_l_verify_compressed_hash()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 165 def _l_verify_compressed_hash return unless @integrity[:compressed] label = "verifying compressed file" @status = ["#{label}...", :yellow] file_shasum(@ctn, @local_file_compressed) do |fc| fc.label = label fc.sha = ctn.integrity_sha fc.status_format = app.opts[:status_format] @status = [fc, "yellow"] fc.abort_if { @should_cancel } fc.on_success do @integrity[:compressed_local] = fc.result end fc.verify! end if !@should_cancel && @integrity[:compressed] != @integrity[:compressed_local] @status = ["[INTEGRITY] downloaded compressed file corrupted! (remote: #{@integrity[:compressed]}, local: #{@integrity[:compressed_local]})", :red] throw :abort_execution, true end end
_l_verify_raw_hash()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 226 def _l_verify_raw_hash return unless @integrity[:raw] label = "verifying raw file" @status = ["#{label}...", :yellow] file_shasum(@ctn, @local_file_raw) do |fc| fc.label = label fc.sha = ctn.integrity_sha fc.status_format = app.opts[:status_format] @status = [fc, "yellow"] fc.abort_if { @should_cancel } fc.on_success do @integrity[:raw_local] = fc.result end fc.verify! end if !@should_cancel && @integrity[:raw] != @integrity[:raw_local] @status = ["[INTEGRITY] extracted raw file corrupted! (remote: #{@integrity[:raw]}, local: #{@integrity[:raw_local]})", :red] throw :abort_execution, true end end
_l_wait_for_workers()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 260 def _l_wait_for_workers @perform << "l_import_file_deferred" unless Thread.current[:managed_worker] == :main Thread.main.sync { Thread.main[:summon_workers] += 1 } end wait_defer_ready end
_r_calculate_compressed_hash()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 113 def _r_calculate_compressed_hash @status = ["calculating integrity hash for compressed file...", "yellow"] pv_wrap(@ctn, nil) do |pv| pv.enabled do |pvbinary| pv.filesize = @remote_file_raw_filesize pv.label = "hashing compressed file" pv.entity = "hashing compressed file" pv.status_format = app.opts[:status_format] @status = [pv, "yellow"] pv.abort_if { @should_cancel } pv.cmd = ctn.calculate_remote_integrity_hash_command(@remote_file_compressed, pvbinary) end pv.fallback do cmd, (channel, pv.result) = ctn.calculate_remote_integrity_hash(@remote_file_compressed, false) second_progress(channel, "calculating integrity hash for compressed file (:seconds)...").join end pv.on_success do @integrity[:compressed] = pv.result.for_group(:stdout).join.split(" ").first.try(:strip).presence end pv.perform! end end
_r_calculate_raw_hash()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 57 def _r_calculate_raw_hash @status = ["calculating integrity hash for raw file...", "yellow"] pv_wrap(@ctn, nil) do |pv| pv.enabled do |pvbinary| pv.filesize = @remote_file_raw_filesize pv.label = "hashing raw file" pv.entity = "hashing raw file" pv.status_format = app.opts[:status_format] @status = [pv, "yellow"] pv.abort_if { @should_cancel } pv.cmd = ctn.calculate_remote_integrity_hash_command(@remote_file_raw, pvbinary) end pv.fallback do cmd, (channel, pv.result) = ctn.calculate_remote_integrity_hash(@remote_file_raw, false) second_progress(channel, "calculating integrity hash for raw file (:seconds)...").join end pv.on_success do @integrity = { raw: pv.result.for_group(:stdout).join.split(" ").first.try(:strip).presence } end pv.perform! end end
_r_compress_file()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 84 def _r_compress_file @status = ["compressing file for transfer...", "yellow"] pv_wrap(@ctn, nil) do |pv| pv.enabled do |pvbinary| pv.filesize = @remote_file_raw_filesize pv.label = "compressing" pv.entity = "compress" pv.status_format = app.opts[:status_format] @status = [pv, "yellow"] pv.abort_if { @should_cancel } @remote_file_compressed, pv.cmd = var.compress_file_command(@remote_file_raw, pvbinary) @remote_files_to_remove << @remote_file_compressed end pv.fallback do @remote_file_compressed, (channel, result) = var.compress_file(@remote_file_raw, false) @remote_files_to_remove << @remote_file_compressed second_progress(channel, "compressing file for transfer (:seconds)...").join end pv.on_success do @remote_files_to_remove.delete(@remote_file_raw) end pv.perform! end end
_r_dump_file()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/routines.rb, line 6 def _r_dump_file @status = ["dumping table to remote file...", "yellow"] pv_wrap(@ctn, nil) do |pv| pv.enabled do |pvbinary| pv.filesize = -1 pv.label = "dumping table" pv.entity = "table dump" pv.status_format = app.opts[:status_format] pv.mode = :nofs @status = [pv, "yellow"] pv.abort_if { @should_cancel } @remote_file_raw_tmp, pv.cmd = var.dump_to_remote_command(self, pvbinary) end pv.fallback do @remote_file_raw_tmp, (channel, result) = var.dump_to_remote(self, false) second_progress(channel, "dumping table to remote file (:seconds)...").join end pv.on_complete do @remote_files_to_remove << @remote_file_raw_tmp end pv.perform! end _cancelpoint # check if response has any sort of errors and abort # if result.any? # r = result.join # if m = r.match(/(Unknown column '(.+)') in .+ \(([0-9]+)\)/i) # @status = ["[DUMP] Failed: #{m[1]} (#{m[3]})", :red] # throw :abort_execution, true # end # end @remote_file_raw = @remote_file_raw_tmp[0..-5] ctn.sftp_start do |sftp| # rename tmp file sftp.rename!(@remote_file_raw_tmp, @remote_file_raw) # save size for gzip progress @remote_file_raw_filesize = sftp.lstat!(@remote_file_raw).size end @remote_files_to_remove.delete(@remote_file_raw_tmp) @remote_files_to_remove << @remote_file_raw end