module DbSucker::Application::SklavenTreiber::Worker::Helpers

Public Instance Methods

aquire_slots(*which, &block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 54
# Aquires one slot from each of the named slot pools (in the given order) on
# behalf of the calling thread, calls the block once every requested slot is
# held, and always releases the slots again afterwards.
#
# For every pool a helper thread is spawned that performs the actual
# pool.aquire for the calling thread, while second_progress keeps the status
# updated with the queue position and the elapsed seconds. If a slot turns
# out not to be aquired, no further pools are tried and the block is skipped.
#
# All pool names are resolved up front, outside of the ensure scope, so that
# the error for a missing pool is not replaced by the one release_slots would
# raise for that very same pool, and so that no slot is waited for when the
# request cannot succeed anyway.
#
# @param which names of the slot pools (keys of sklaventreiber.slot_pools)
# @yield called without arguments once all requested slots are aquired
# @return the result of the block, or nil when the block was not called
# @raise [SlotPoolNotInitializedError] if any of the named pools does not exist
def aquire_slots *which, &block
  pools = which.map do |wh|
    sklaventreiber.slot_pools[wh] ||
      raise(SlotPoolNotInitializedError, "slot pool `#{wh}' was never initialized, can't aquire slot")
  end

  target_thread = Thread.current
  aquired = []
  begin
    which.each_with_index do |wh, i|
      pool = pools[i]

      # The helper thread must not start aquiring before `channel` has been
      # set up, hence it blocks on the queue until we push to it below.
      waitlock = Queue.new
      slot_thread = app.spawn_thread(:sklaventreiber_worker_slot_progress) do |thr|
        thr[:current_task] = target_thread[:current_task] if target_thread[:current_task]
        thr[:slot_pool_qindex] = Proc.new { pool.qindex(target_thread) }
        waitlock.pop
        pool.aquire(target_thread)
      end
      channel = app.channelfy_thread(slot_thread)
      waitlock << true
      # NOTE(review): Thread#wait is not core Ruby; presumably a project
      # extension that pauses the calling thread -- confirm.
      target_thread.wait

      label = "aquiring slot #{i+1}/#{which.length} `#{pool.name}' :slot_pool_qindex(– #%s in queue )(:seconds)..."
      # Start the progress display, block in wait_aquired, then wait for the
      # progress thread to finish.
      second_progress(channel, label, :blue).tap{ pool.wait_aquired(target_thread) }.join

      # Stop at the first slot we did not get; the block is skipped below.
      break unless pool.aquired?(target_thread)
      aquired << wh
    end
    block.call if block && (which - aquired).empty?
  ensure
    # Every requested pool is released, whether or not its slot was aquired.
    release_slots(*which)
  end
end
file_copy(*args, &block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 24
# Builds an IO::FileCopy operation, passing self followed by all given
# arguments to its constructor.
#
# @yield [operation] optional; receives the new operation before it is returned
# @return the newly created IO::FileCopy instance
def file_copy(*args, &block)
  operation = IO::FileCopy.new(self, *args)
  block.call(operation) if block
  operation
end
file_gunzip(*args, &block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 30
# Builds an IO::FileGunzip operation, passing self followed by all given
# arguments to its constructor.
#
# @yield [operation] optional; receives the new operation before it is returned
# @return the newly created IO::FileGunzip instance
def file_gunzip(*args, &block)
  operation = IO::FileGunzip.new(self, *args)
  block.call(operation) if block
  operation
end
file_import_sql(*args, &block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 48
# Builds an IO::FileImportSql operation, passing self followed by all given
# arguments to its constructor.
#
# @yield [operation] optional; receives the new operation before it is returned
# @return the newly created IO::FileImportSql instance
def file_import_sql(*args, &block)
  operation = IO::FileImportSql.new(self, *args)
  block.call(operation) if block
  operation
end
file_shasum(*args, &block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 36
# Builds an IO::Shasum operation, passing self followed by all given
# arguments to its constructor.
#
# @yield [operation] optional; receives the new operation before it is returned
# @return the newly created IO::Shasum instance
def file_shasum(*args, &block)
  operation = IO::Shasum.new(self, *args)
  block.call(operation) if block
  operation
end
pv_wrap(*args, &block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 42
# Builds an IO::PvWrapper operation, passing self followed by all given
# arguments to its constructor.
#
# @yield [operation] optional; receives the new operation before it is returned
# @return the newly created IO::PvWrapper instance
def pv_wrap(*args, &block)
  operation = IO::PvWrapper.new(self, *args)
  block.call(operation) if block
  operation
end
release_slots(*which) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 85
# Releases the calling thread's slot in each of the named slot pools, in the
# order given.
#
# @param which names of the slot pools (keys of sklaventreiber.slot_pools)
# @return the array of given pool names
# @raise [SlotPoolNotInitializedError] at the first name without a pool;
#   pools listed before it have already been released at that point
def release_slots(*which)
  which.each do |slot_name|
    slot_pool = sklaventreiber.slot_pools[slot_name]
    unless slot_pool
      raise SlotPoolNotInitializedError, "slot pool `#{slot_name}' was never initialized, can't release slot (was most likely never aquired)"
    end

    slot_pool.release(Thread.current)
  end
end
runtime() click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 6
# Elapsed time between @started and @ended, formatted by human_seconds.
# While @ended is not set the current time is used instead. The duration is
# truncated to whole seconds before formatting.
#
# @return the human_seconds result, or nil when @started is not set
def runtime
  return unless @started

  finished_at = @ended || Time.current
  human_seconds((finished_at - @started).to_i)
end
second_progress(channel, status, color = :yellow) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 103
# Spawns a helper thread (via app.spawn_thread) that keeps @status updated
# with a rendered version of +status+ until +channel+ no longer reports
# active?. The loop re-renders after every thr.wait(0.1).
#
# Placeholders handled in +status+:
# * ":seconds" is replaced with human_seconds of the time elapsed since the
#   helper thread started.
# * ":slot_pool_qindex(TEXT)" is only processed when channel[:slot_pool_qindex]
#   responds to call. It is replaced with TEXT, in which "%s" is substituted
#   by the callable's result, or removed entirely when that result is nil/false.
#
# When @should_cancel is set, the channel is torn down once per helper thread:
# Net::SSH channels get a Ctrl-C (pty) or have their remote process killed
# (pid), the channel is closed best-effort and an IPC process, if any,
# receives SIGINT.
#
# @param channel object queried via [], active? and closing?
# @param status  template string for the status text (see placeholders above)
# @param color   second element of @status while the channel is healthy
# @return whatever app.spawn_thread returns; callers in this file call join on it
def second_progress channel, status, color = :yellow
  target_thread = Thread.current
  app.spawn_thread(:sklaventreiber_worker_second_progress) do |thr|
    thr[:iteration] = 0
    thr[:started_at] = Time.current
    # Carry the caller's current task over to the helper thread.
    thr[:current_task] = target_thread[:current_task] if target_thread[:current_task]
    # Let the channel know which thread is reporting for it (if it can store that).
    channel[:handler] = thr if channel.respond_to?(:[]=)
    loop do
      # Cancellation is performed only once; thr[:canceled] guards against repeats.
      if @should_cancel && !thr[:canceled]
        if channel.is_a?(Net::SSH::Connection::Channel)
          if channel[:pty]
            # Interrupt the remote command through the pty; failures are ignored.
            channel.send_data("\C-c") rescue false
          elsif channel[:pid]
            @ctn.kill_remote_process(channel[:pid])
          end
        end
        # Best-effort close; any error raised by close is swallowed.
        channel.try(:close) rescue false
        Process.kill(:SIGINT, channel[:ipc_thread].pid) if channel[:ipc_thread]
        thr[:canceled] = true
      end
      stat = status.gsub(":seconds", human_seconds(Time.current - thr[:started_at]))
      if channel[:slot_pool_qindex].respond_to?(:call)
        qi = channel[:slot_pool_qindex].call
        re = /:slot_pool_qindex\(([^\)]+)\)/
        if stat[re]
          # Replace the first placeholder with its inner text ("%s" => qi),
          # or drop it altogether when there is no queue index.
          stat[re] = qi ? stat[re].match(/\(([^\)]+)\)/)[1].gsub("%s", qi.to_s) : ""
        end
      end
      # Precedence: explicit error message, then inactive, then closing, then normal.
      if channel[:error_message]
        @status = ["[ERROR] #{channel[:error_message]}", :red]
      elsif !channel.active?
        @status = ["[CLOSED] #{stat}", :red]
      elsif channel.closing?
        @status = ["[CLOSING] #{stat}", :red]
      else
        @status = [stat, color]
      end
      # Leave only after the final status has been published above.
      break unless channel.active?
      # NOTE(review): Thread#wait is not core Ruby; presumably a project
      # extension acting as a ~0.1s pause -- confirm.
      thr.wait(0.1)
      thr[:iteration] += 1
    end
  end
end
sftp_download(*args, &block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 12
# Builds an IO::SftpDownload operation, passing self followed by all given
# arguments to its constructor.
#
# @yield [operation] optional; receives the new operation before it is returned
# @return the newly created IO::SftpDownload instance
def sftp_download(*args, &block)
  operation = IO::SftpDownload.new(self, *args)
  block.call(operation) if block
  operation
end
sftp_native_download(*args, &block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 18
# Builds an IO::SftpNativeDownload operation, passing self followed by all
# given arguments to its constructor.
#
# @yield [operation] optional; receives the new operation before it is returned
# @return the newly created IO::SftpNativeDownload instance
def sftp_native_download(*args, &block)
  operation = IO::SftpNativeDownload.new(self, *args)
  block.call(operation) if block
  operation
end
wait_defer_ready(label = nil) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/helpers.rb, line 95
# Shows a progress status through second_progress and blocks (join) until
# that progress thread has finished.
#
# The channel handed to second_progress comes from app.fake_channel; its
# :slot_pool_qindex callable reports how many workers are neither done nor
# deferred, counted inside sklaventreiber.sync.
# NOTE(review): the block given to fake_channel (true once that count is zero
# or @should_cancel is set) presumably tells the fake channel when to stop
# being active -- confirm against fake_channel.
#
# @param label optional status template for second_progress. When omitted, a
#   default describing the deferred import is used; File.size(@local_file_raw)
#   is only read in that case. A given label is passed through as is.
# @return the result of joining the thread returned by second_progress
def wait_defer_ready label = nil
  channel = app.fake_channel {|c| c[:slot_pool_qindex].call.zero? || @should_cancel }
  channel[:slot_pool_qindex] = Proc.new { sklaventreiber.sync { sklaventreiber.workers.count{|w| !w.done? && !w.deferred? } } }

  # Only fall back to the default text when the caller did not supply a label.
  label ||= "deferred import: #{human_bytes(File.size(@local_file_raw))} raw SQL :slot_pool_qindex(– waiting for %s workers )(:seconds)"
  second_progress(channel, label, :blue).join
end