module DbSucker::Application::SklavenTreiber::Worker::Helpers
Public Instance Methods
aquire_slots(*which, &block)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 54 def aquire_slots *which, &block target_thread = Thread.current aquired = [] which.each_with_index do |wh, i| if pool = sklaventreiber.slot_pools[wh] waitlock = Queue.new channel = app.channelfy_thread app.spawn_thread(:sklaventreiber_worker_slot_progress) {|thr| thr[:current_task] = target_thread[:current_task] if target_thread[:current_task] thr[:slot_pool_qindex] = Proc.new { pool.qindex(target_thread) } waitlock.pop pool.aquire(target_thread) } waitlock << true target_thread.wait label = "aquiring slot #{i+1}/#{which.length} `#{pool.name}' :slot_pool_qindex(– #%s in queue )(:seconds)..." second_progress(channel, label, :blue).tap{ pool.wait_aquired(target_thread) }.join if pool.aquired?(target_thread) aquired << wh else break end else raise SlotPoolNotInitializedError, "slot pool `#{wh}' was never initialized, can't aquire slot" end end block.call if block && (which - aquired).empty? ensure release_slots(*which) end
file_copy(*args, &block)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 24 def file_copy *args, &block IO::FileCopy.new(self, *args).tap do |op| block.try(:call, op) end end
file_gunzip(*args, &block)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 30 def file_gunzip *args, &block IO::FileGunzip.new(self, *args).tap do |op| block.try(:call, op) end end
file_import_sql(*args, &block)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 48 def file_import_sql *args, &block IO::FileImportSql.new(self, *args).tap do |op| block.try(:call, op) end end
file_shasum(*args, &block)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 36 def file_shasum *args, &block IO::Shasum.new(self, *args).tap do |op| block.try(:call, op) end end
pv_wrap(*args, &block)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 42 def pv_wrap *args, &block IO::PvWrapper.new(self, *args).tap do |op| block.try(:call, op) end end
release_slots(*which)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 85 def release_slots *which which.each_with_index do |wh, i| if pool = sklaventreiber.slot_pools[wh] pool.release(Thread.current) else raise SlotPoolNotInitializedError, "slot pool `#{wh}' was never initialized, can't release slot (was most likely never aquired)" end end end
runtime()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 6 def runtime if @started human_seconds ((@ended || Time.current) - @started).to_i end end
second_progress(channel, status, color = :yellow)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 103 def second_progress channel, status, color = :yellow target_thread = Thread.current app.spawn_thread(:sklaventreiber_worker_second_progress) do |thr| thr[:iteration] = 0 thr[:started_at] = Time.current thr[:current_task] = target_thread[:current_task] if target_thread[:current_task] channel[:handler] = thr if channel.respond_to?(:[]=) loop do if @should_cancel && !thr[:canceled] if channel.is_a?(Net::SSH::Connection::Channel) if channel[:pty] channel.send_data("\C-c") rescue false elsif channel[:pid] @ctn.kill_remote_process(channel[:pid]) end end channel.try(:close) rescue false Process.kill(:SIGINT, channel[:ipc_thread].pid) if channel[:ipc_thread] thr[:canceled] = true end stat = status.gsub(":seconds", human_seconds(Time.current - thr[:started_at])) if channel[:slot_pool_qindex].respond_to?(:call) qi = channel[:slot_pool_qindex].call re = /:slot_pool_qindex\(([^\)]+)\)/ if stat[re] stat[re] = qi ? stat[re].match(/\(([^\)]+)\)/)[1].gsub("%s", qi.to_s) : "" end end if channel[:error_message] @status = ["[ERROR] #{channel[:error_message]}", :red] elsif !channel.active? @status = ["[CLOSED] #{stat}", :red] elsif channel.closing? @status = ["[CLOSING] #{stat}", :red] else @status = [stat, color] end break unless channel.active? thr.wait(0.1) thr[:iteration] += 1 end end end
sftp_download(*args, &block)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 12 def sftp_download *args, &block IO::SftpDownload.new(self, *args).tap do |op| block.try(:call, op) end end
sftp_native_download(*args, &block)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 18 def sftp_native_download *args, &block IO::SftpNativeDownload.new(self, *args).tap do |op| block.try(:call, op) end end
wait_defer_ready(label = nil)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 95 def wait_defer_ready label = nil channel = app.fake_channel {|c| c[:slot_pool_qindex].call.zero? || @should_cancel } channel[:slot_pool_qindex] = Proc.new { sklaventreiber.sync { sklaventreiber.workers.count{|w| !w.done? && !w.deferred? } } } label = "deferred import: #{human_bytes(File.size(@local_file_raw))} raw SQL :slot_pool_qindex(– waiting for %s workers )(:seconds)" second_progress(channel, label, :blue).join end