class DbSucker::Application::SklavenTreiber

Attributes

app[R]
data[R]
monitor[R]
poll[R]
slot_pools[R]
status[R]
throughput[R]
trxid[R]
window[R]
workers[R]

Public Class Methods

new(app, trxid) click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 6
def initialize app, trxid
  @app = app
  @trxid = trxid
  @status = ["initializing", "gray"]
  @monitor = Monitor.new
  @workers = []
  @threads = []
  @slot_pools = {}
  @sleep_before_exit = 0
  @throughput = Worker::IO::Throughput.new(self)

  @data = {
    database: nil,
    tables_transfer: nil,
    tables_transfer_list: [],
    tables_total: nil,
    tables_done: 0,
  }
end

Public Instance Methods

_check_remote_tmp_directory() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 103
def _check_remote_tmp_directory
  @status = ["checking remote temp directory", "blue"]
  @ctn.sftp_begin
  @ctn.sftp_start do |sftp|
    # check tmp directory
    app.debug "Checking remote temp directory #{app.c @ctn.tmp_path, :magenta}"
    begin
      sftp.dir.glob("#{@ctn.tmp_path}", "**/*")
    rescue Net::SFTP::StatusException => ex
      if ex.message["no such file"]
        app.abort "Temp directory `#{@ctn.tmp_path}' does not exist on the remote side!", 2
      else
        raise
      end
    end
  end
end
_control_thread() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 287
def _control_thread
  if $core_runtime_exiting && $core_runtime_exiting < 100
    $core_runtime_exiting += 100
    app.sandboxed { @workers.each {|w| catch(:abort_execution) { w.cancel! } } }
    app.sandboxed { @slot_pools.each{|n, p| p.softclose! } }
    app.wakeup_handlers
  end
end
_init_window() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 96
def _init_window
  return unless app.opts[:window_enabled]
  @window = Window.new(app, self)
  @window.init!
  @window.start
end
_initialize_slot_pools() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 147
def _initialize_slot_pools
  app.opts[:slot_pools].each do |name, slots|
    @slot_pools[name] = SlotPool.new(slots, name)
  end
end
_initialize_workers() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 153
def _initialize_workers
  @status = ["initializing workers 0/#{@data[:tables_transfer]}", "blue"]

  @data[:tables_transfer_list].each_with_index do |table, index|
    @status = ["initializing workers #{index+1}/#{@data[:tables_transfer]}", "blue"]
    @workers << Worker.new(self, @ctn, @var, table)
  end
end
_queueoff() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 268
def _queueoff
  loop do
    return if $core_runtime_exiting
    worker = false
    sync do
      pending = @workers.select(&:pending?)
      return unless pending.any?
      worker = pending.first.aquire(Thread.current)
    end
    if worker
      begin
        worker.run
      ensure
        sync { @data[:tables_done] += 1 }
      end
    end
  end
end
_run_consumers() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 193
def _run_consumers
  cnum = [app.opts[:consumers], @data[:tables_transfer]].min
  @data[:window_col2] = cnum.to_s.length
  if cnum <= 1
    _run_in_main_thread
  else
    _run_in_threads(cnum)
  end
end
_run_in_main_thread() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 203
def _run_in_main_thread
  @status = ["running in main thread...", "green"]

  # control thread
  ctrlthr = app.spawn_thread(:sklaventreiber_worker_ctrl) do |thr|
    loop do
      _control_thread
      break if thr[:stop]
      thr.wait(0.1)
    end
  end

  begin
    Thread.current[:managed_worker] = :main
    _queueoff
  ensure
    ctrlthr[:stop] = true
    ctrlthr.signal.join
  end
end
_run_in_threads(cnum) click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 224
def _run_in_threads(cnum)
  @status = ["starting consumer 0/#{cnum}", "blue"]

  # initializing consumer threads
  cnum.times do |wi|
    @status = ["starting consumer #{wi+1}/#{cnum}", "blue"]
    @threads << app.spawn_thread(:sklaventreiber_worker) {|thr|
      begin
        thr[:managed_worker] = wi
        thr.wait(0.1) until thr[:start] || $core_runtime_exiting
        _queueoff
      rescue Interrupt
      end
    }
  end

  # start consumer threads
  @status = ["running", "green"]
  @threads.each{|t| t[:start] = true; t.signal }

  # master thread (control)
  additionals = 0
  Thread.current[:summon_workers] = 0
  while @threads.any?(&:alive?)
    _control_thread
    Thread.current.sync do
      Thread.current[:summon_workers].times do
        app.debug "Spawned additional worker due to deferred import to prevent softlocks"
        @threads << app.spawn_thread(:sklaventreiber_worker) {|thr|
          begin
            additionals += 1
            thr[:managed_worker] = cnum + additionals
            _queueoff
          rescue Interrupt
          end
        }
      end
      Thread.current[:summon_workers] = 0
    end
    Thread.current.wait(0.1)
  end
  @threads.each(&:join)
end
_select_tables() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 121
def _select_tables
  @status = ["selecting tables for transfer", "blue"]
  ttt, at = @var.tables_to_transfer

  # apply only/except filters provided via command line
  if @app.opts[:suck_only].any? && @app.opts[:suck_except].any?
    raise OptionParser::InvalidArgument, "only one of `--only' or `--except' option can be provided at the same time"
  elsif @app.opts[:suck_only].any?
    unless (r = @app.opts[:suck_only] - at).empty?
      raise Container::TableNotFoundError, "table(s) `#{r * ", "}' for the database `#{@ctn.source["database"]}' could not be found (provided via --only, variation `#{@ctn.name}/#{@var.name}' in `#{@ctn.src}')"
    end
    ttt = @app.opts[:suck_only]
  elsif @app.opts[:suck_except].any?
    unless (r = @app.opts[:suck_except] - at).empty?
      raise Container::TableNotFoundError, "table(s) `#{r * ", "}' for the database `#{@ctn.source["database"]}' could not be found (provided via --except, variation `#{@ctn.name}/#{@var.name}' in `#{@ctn.src}')"
    end
    ttt = ttt - @app.opts[:suck_except]
  end

  @data[:database] = @ctn.source["database"]
  @data[:tables_transfer] = ttt.length
  @data[:tables_transfer_list] = ttt
  @data[:window_col1] = ttt.map(&:length).max
  @data[:tables_total] = at.length
end
_start_ssh_poll() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 162
def _start_ssh_poll
  wait_lock = Queue.new
  @poll = app.spawn_thread(:sklaventreiber_ssh_poll) do |thr|
    thr[:force] = true
    thr[:iteration] = 0
    thr[:errors] = 0
    wait_lock << true
    begin
      @ctn.loop_ssh(0.1) {
        thr[:iteration] += 1
        thr[:last_iteration] = Time.current
        thr[:force] || @workers.select{|w| !w.done? || w.sshing }.any?
      }
    rescue Container::SSH::ChannelOpenFailedError
      thr[:errors] += 1
      sleep 0.5
      retry
    end

    if thr[:errors].zero?
      app.debug "SSH error count (#{thr[:errors]})"
    elsif thr[:errors] > 25
      app.warning "SSH error count (#{thr[:errors]}) is high! Verify remote MaxSessions setting or lower concurrent worker count."
    else
      app.warning "SSH errors occured (#{thr[:errors]})! Verify remote MaxSessions setting or lower concurrent worker count."
    end
  end
  wait_lock.pop
  sleep 0.01 until @poll[:iteration] && @poll[:iteration] > 0
end
pause_all_workers() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 38
def pause_all_workers
  sync { @workers.each {|wrk| pause_worker(wrk) } }
end
pause_worker(worker) click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 30
def pause_worker worker
  sync { worker.pause }
end
spooled() { || ... } click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 46
def spooled
  stdout_was = app.opts[:stdout]
  app.opts[:stdout] = SklavenTreiber::LogSpool.new(stdout_was) if app.opts[:window_enabled]
  yield if block_given?
ensure
  app.opts[:stdout].spooldown do |meth, args, time|
    stdout_was.send(meth, *args)
  end if app.opts[:stdout].respond_to?(:spooldown)
  app.opts[:stdout] = stdout_was
end
sync() { || ... } click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 26
def sync
  @monitor.synchronize { yield }
end
unpause_all_workers() click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 42
def unpause_all_workers
  sync { @workers.each {|wrk| unpause_worker(wrk) } }
end
unpause_worker(worker) click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 34
def unpause_worker worker
  sync { worker.unpause }
end
whip_it!(ctn, var) click to toggle source
# File lib/db_sucker/application/sklaven_treiber.rb, line 57
def whip_it! ctn, var
  @ctn, @var = ctn, var

  _start_ssh_poll
  _init_window
  _check_remote_tmp_directory
  _select_tables
  _initialize_slot_pools
  _initialize_workers
  @ctn.pv_utility # lazy load
  @poll[:force] = false
  @throughput.start_loop

  @sleep_before_exit = 3 if @window
  _run_consumers
ensure
  app.sandboxed do
    @status = ["terminating (canceling workers)", "red"]
    @workers.each {|w| catch(:abort_execution) { w.cancel! } }
  end
  app.sandboxed do
    @status = ["terminating (SSH poll)", "red"]
    if @poll
      @poll[:force] = false
      @poll.join
    end
  end
  @status = ["terminated", "red"]
  sleep @sleep_before_exit
  app.sandboxed { @window.try(:stop) }
  app.sandboxed { @ctn.try(:sftp_end) }
  app.sandboxed { @throughput.try(:stop_loop) }
  app.sandboxed { @slot_pools.each{|n, p| p.close! } }
  app.sandboxed do
    app.puts @window.try(:_render_final_results)
  end
  @ctn, @var = nil, nil
end