class DbSucker::Application::SklavenTreiber::Worker::IO::Base

Constants

DataIntegrityError
STATUS_FORMATTERS
UnknownFormatterError

Attributes

closing[R]
ctn[R]
entity[RW]
filesize[RW]
label[RW]
local[R]
mode[RW]
offset[R]
operror[R]
read_size[RW]
remote[R]
state[R]
status_format[R]
throughput[RW]
verify_handle[R]

Public Class Methods

new(worker, ctn, fd) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 14
# Sets up an IO operation for the given worker.
#
# worker - owning worker; its sklaventreiber's throughput registry is asked
#          to register this instance and the result is kept as @throughput.
# ctn    - stored unchanged in @ctn.
# fd     - when a Hash, its first key becomes @remote and its first value
#          becomes @local; any other object is used as @remote only.
#
# After defaults and no-op callbacks are in place, `init` and then
# `reset_state` are called.
def initialize(worker, ctn, fd)
  # references
  @worker = worker
  @ctn = ctn
  @thread = Thread.current # thread that constructed this instance

  # remote & local
  if fd.is_a?(Hash)
    @remote = fd.keys.first
    @local = fd.values.first
  else
    @remote = fd
  end

  # defaults
  @label ||= "working"
  @entity ||= "task"
  @status_format = :off
  @mode = :fs
  @throughput = worker.sklaventreiber.throughput.register(self)
  @read_size = 128 * 1024 # 128kb
  @filesize = 0

  # callbacks (placeholders until replaced via abort_if/on_error/on_complete/on_success)
  @abort_if = proc { false }
  @on_error = proc {}
  @on_complete = proc {}
  @on_success = proc {}

  init
  reset_state
end

Public Instance Methods

abort_if(&block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 53
# Stores the given block in @abort_if, replacing whatever was there before
# (the constructor installs a proc that returns false).
#
# Returns the stored block (nil when called without one).
def abort_if(&condition)
  @abort_if = condition
end
execute(opts = {}) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 83
# Runs the given block through `throughput.measure`, with optional retries.
#
# opts - Hash of options, merged over these defaults via `reverse_merge`
#        (not a core Hash method; presumably ActiveSupport's - confirm):
#        :tries       - maximum number of attempts (default 1)
#        :sleep_error - value handed to `Thread.current.wait` before a retry
#                       (default 0). `Thread#wait` is not core Ruby either;
#                       presumably a project extension - confirm.
#
# Each attempt starts with `reset_state`. If the block finishes and neither
# @closing nor `@worker.should_cancel` is truthy, @state becomes :done and the
# success callback is called with self.
#
# On a StandardError, @operror is set to "#<attempt> <class>: <message>" and the
# error callback is called with (self, exception, @operror). The exception is
# re-raised once the attempt count exceeds opts[:tries]; otherwise the attempt
# is retried.
#
# The completion callback is called from the inner `ensure`, and the throughput
# handle is always unregistered when the method exits.
def execute(opts = {}, &block)
  opts = opts.reverse_merge(tries: 1, sleep_error: 0)
  attempt = 1
  begin
    reset_state
    throughput.measure(&block)
    unless @closing || @worker.should_cancel
      @state = :done
      @on_success.call(self)
    end
  rescue StandardError => ex
    @operror = "##{attempt} #{ex.class}: #{ex.message}"
    @on_error.call(self, ex, @operror)
    attempt += 1
    # out of attempts: hand the original exception to the caller
    raise ex if attempt > opts[:tries]

    Thread.current.wait(opts[:sleep_error])
    retry
  ensure
    @on_complete.call(self)
  end
ensure
  @throughput.unregister
end
handle_pause() click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 79
# Pauses the throughput tracker when the :paused entry of @thread (the thread
# captured by the constructor) is truthy, and unpauses it otherwise.
#
# Returns whatever `pause!` / `unpause!` returns.
def handle_pause
  if @thread[:paused]
    throughput.pause!
  else
    throughput.unpause!
  end
end
init() click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 42
# Intentionally empty. The constructor calls this after assigning defaults and
# callbacks and before `reset_state`; presumably an extension point for
# subclasses - confirm against the subclasses.
def init; end
on_complete(&block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 61
# Stores the given block as the completion callback (@on_complete).
# `execute` calls it with this instance from its inner `ensure` clause.
#
# Returns the stored block (nil when called without one).
def on_complete(&callback)
  @on_complete = callback
end
on_error(&block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 57
# Stores the given block as the error callback (@on_error).
# `execute` calls it with (self, exception, error message) each time an
# attempt raises a StandardError.
#
# Returns the stored block (nil when called without one).
def on_error(&callback)
  @on_error = callback
end
on_success(&block) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 65
# Stores the given block as the success callback (@on_success).
# `execute` calls it with this instance once an attempt finishes while the
# operation is neither closing nor cancelled by the worker.
#
# Returns the stored block (nil when called without one).
def on_success(&callback)
  @on_success = callback
end
prepare_local_destination() click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 75
# Creates the directory that contains @local (including missing parents).
# The file at @local itself is not touched.
#
# Returns the result of FileUtils.mkdir_p.
def prepare_local_destination
  parent_dir = File.dirname(@local)
  FileUtils.mkdir_p(parent_dir)
end
reset_state() click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 45
# Puts the operation back into its initial state: no error, not closing,
# state :idle, offset 0, and the throughput statistics reset.
#
# Returns the result of `@throughput.reset_stats`.
def reset_state
  @operror, @closing = nil, false
  @state, @offset = :idle, 0
  @throughput.reset_stats
end
status_format=(which) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 69
# Selects the status format after converting the argument with `to_sym`.
#
# which - anything responding to `to_sym`; the resulting symbol must be
#         contained in STATUS_FORMATTERS.
#
# Raises UnknownFormatterError (listing the available options) when the
# symbol is not in STATUS_FORMATTERS; @status_format is left unchanged then.
def status_format=(which)
  format = which.to_sym
  unless STATUS_FORMATTERS.include?(format)
    raise UnknownFormatterError, "unknown status format `#{format}', available options: #{STATUS_FORMATTERS.join(", ")}"
  end

  @status_format = format
end
to_curses(target) click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 159
# Renders the current status onto `target` by evaluating colour/drawing calls
# in the target's context (`instance_eval`), so `red`, `blue`, `yellow`,
# `gray`, `green`, `cyan`, `human_bytes` and `progress_bar` are all sent to
# `target`.
#
# Output, in order:
# * an operation error, if any, in red - nothing else is drawn then;
# * a red "[CLOSING] " marker while closing;
# * with status format :none only the label in blue;
# * otherwise a state-specific line (initiating/finishing/verifying/done/
#   in-progress), with more detail for :minimal and :full status formats.
#
# The pause state is synchronised first via `handle_pause`.
def to_curses(target)
  handle_pause
  io = self             # `self` is rebound inside instance_eval, keep a handle
  stats = @throughput
  target.instance_eval do
    if io.operror
      red "#{io.operror}"
      return
    end

    red "[CLOSING] " if io.closing

    if io.status_format == :none
      blue "#{io.label}"
      break
    end

    case io.state
    when :idle, :init
      yellow "#{io.label}: "
      gray " initiating..."
    when :finishing
      yellow "#{io.label}: "
      gray " finishing..."
    when :verifying
      yellow "#{io.label}: "
      gray " verifying..."
      if io.verify_handle.respond_to?(:throughput)
        gray " #{io.verify_handle.throughput.f_done_percentage}"
      end
    when :done
      if io.mode == :nofs
        # no known total size, so completeness cannot be confirmed
        green "#{io.entity || io.label} complete(?): "
        cyan "#{human_bytes io.offset}"
      elsif io.offset == io.filesize
        green "#{io.entity || io.label} complete: #{stats.f_done_percentage}"
        yellow " – "
        cyan "#{human_bytes io.offset}"
      else
        red "#{io.entity || io.label} INCOMPLETE: #{stats.f_done_percentage}"
        yellow " – "
        cyan "#{stats.f_byte_progress}"
      end
    when :downloading, :copying, :decompressing, :working
      yellow "#{io.label}: "
      if io.mode == :nofs
        blue "#{stats.f_offset}"
        case io.status_format
        when :minimal
          gray " [#{stats.f_runtime}]"
        when :full
          gray " [#{stats.f_bps}/s – #{stats.f_runtime}]"
        end
      else
        pct = stats.done_percentage
        # colour follows progress: red -> yellow -> cyan -> blue -> green
        color =
          if pct > 90 then :green
          elsif pct > 75 then :blue
          elsif pct > 50 then :cyan
          elsif pct > 25 then :yellow
          else :red
          end
        send(color, stats.f_done_percentage.rjust(7, " ") << " ")

        case io.status_format
        when :minimal
          yellow "[#{stats.f_eta}]"
        when :full
          yellow "[#{stats.f_eta} – #{stats.f_bps.rjust(9, " ")}/s]"
          have, total = stats.f_offset, stats.f_filesize
          # zero-pad the current offset to the width of the total
          gray " [#{have.rjust(total.length, "0")}/#{total}]"
        end

        progress_bar(pct, prog_done_color: color, prog_current_color: color)
      end
    end
  end
end
to_s() click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/io/base.rb, line 110
# Plain-text status line for this operation.
#
# Returns @operror unchanged when an error is recorded. Otherwise the line is
# built from space-joined fragments:
# * "[CLOSING]" while closing;
# * with status format :none only the label;
# * otherwise a state-specific description (initiating/finishing/verifying/
#   done/in-progress), with more detail for :minimal and :full status formats.
#
# The pause state is synchronised first via `handle_pause`.
def to_s
  handle_pause
  return @operror if @operror
  tp = @throughput

  parts = []
  parts << "[CLOSING]" if @closing

  # Return the joined fragments right away. The previous implementation used a
  # bare `break` inside `[].tap { ... } * " "`, which made `tap` evaluate to nil
  # and raised NoMethodError (nil * " ") for the :none status format.
  if @status_format == :none
    parts << "#{@label}"
    return parts.join(" ")
  end

  case @state
  when :idle, :init
    parts << "#{@label}:"
    parts << " initiating..."
  when :finishing
    parts << "#{@label}:"
    parts << " finishing..."
  when :verifying
    parts << "#{@label}:"
    parts << " verifying..."
    parts << " #{@verify_handle.throughput.f_done_percentage}" if @verify_handle.respond_to?(:throughput)
  when :done
    if @mode == :nofs
      # no known total size, so completeness cannot be confirmed
      parts << "#{@entity || @label} complete(?): #{tp.f_offset}"
    else
      parts << "#{@entity || @label} #{@offset == @filesize ? "complete" : "INCOMPLETE"}: #{tp.f_done_percentage} – #{tp.f_byte_progress}"
    end
  when :downloading, :copying, :decompressing, :working
    parts << "#{@label}:"
    if @mode == :nofs
      parts << "[#{tp.f_offset} – #{tp.f_bps}/s]"
    else
      parts << tp.f_done_percentage.rjust(7, " ")
      if @status_format == :minimal
        parts << "[#{tp.f_eta}]"
      elsif @status_format == :full
        parts << "[#{tp.f_eta} – #{tp.f_bps.rjust(9, " ")}/s]"
        f_has, f_tot = tp.f_offset, tp.f_filesize
        # zero-pad the current offset to the width of the total
        parts << "[#{f_has.rjust(f_tot.length, "0")}/#{f_tot}]"
      end
    end
  end

  parts.join(" ")
end