module DbSucker::Application::SklavenTreiber::Worker::Core
Public Instance Methods
_cancelpoint(reason = nil)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 75 def _cancelpoint reason = nil if @should_cancel reason ||= @should_cancel if @should_cancel.is_a?(String) reason ||= @status[0] @should_cancel = false @state = :canceled @status = ["CANCELED#{" (was #{reason.to_s.gsub("[CLOSING] ", "")})" if reason}", "red"] throw :abort_execution, true true elsif @state == :failed throw :abort_execution, true true end _pausepoint end
_pausepoint()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 38 def _pausepoint sync do return if !(@state == :pausing || @state == :paused) return unless @pause_data return unless @thread == Thread.current @state = :paused @pause_data[:signal].broadcast end @thread[:paused] = true Thread.stop @thread[:paused] = false _cancelpoint end
aquire(thread)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 10 def aquire thread @thread = thread thread[:current_task] = descriptive thread[:current_worker] = self if m = thread[:managed_worker] debug "Consumer thread ##{m} aquired worker #{descriptive}" else debug "Main thread aquired worker #{descriptive}" end @status = ["initializing...", "gray"] @state = :aquired self end
cancel!(reason = nil, now = false)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 62 def cancel! reason = nil, now = false return if done? @should_cancel = reason || true unpause sync { _cancelpoint(reason) if pending? || now } end
fail!(reason, now = false)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 69 def fail! reason, now = false @status = ["FAILED(#{@current_perform}) #{reason}", "red"] @state = :failed throw :abort_execution, true if now end
pause(wait = false)
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 24 def pause wait = false sync do return if done? return if @state == :pausing || @state == :paused @pause_data = { state_was: @state, signal: @monitor.new_cond } if @state == :pending @state = :paused else @state = :pausing @pause_data[:signal].wait if wait end end end
priority()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 91 def priority 100 - ({ running: 50, aquired: 50, pausing: 50, paused: 30, canceled: 35, pending: 30, failed: 20, done: 10, }[@state] || 0) end
run()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 104 def run @state = :running @sshing = true @started = Time.current @download_state = { state: :idle, offset: 0 } @remote_files_to_remove = [] @local_files_to_remove = [] @current_perform = nil app.fire(:worker_routine_before_all, self) catch :abort_execution do perform.each_with_index do |m, i| @current_perform = m _cancelpoint @step = i + 1 r = catch(:abort_execution) { aquire_slots(*app.opts[:routine_pools][m.to_sym]) do begin r0 = Time.current app.fire(:worker_routine_before, self, @current_perform) send(:"_#{m}") ensure app.fire(:worker_routine_after, self, @current_perform) @timings[m] = Time.current - r0 end end nil } throw :abort_execution if r _cancelpoint end @status = ["DONE (#{runtime})", "green"] end rescue StandardError => ex @exception = ex fail! "#{ex.class}: #{ex.message}" Thread.main[:app].notify_exception("SklavenTreiber::Worker encountered an error in `#{@current_perform}' (ctn: #{ctn.name}, var: #{var.name}, db: #{ctn.source["database"]}, table: #{table})", ex) rescue Interrupt => ex @state = :failed ensure # cleanup temp files ctn.sftp_start do |sftp| @remote_files_to_remove.each do |file| sftp.remove!(file) rescue false end end if @remote_files_to_remove.any? # cleanup local temp files @local_files_to_remove.each do |file| File.unlink(file) rescue false end app.fire(:worker_routine_after_all, self) @state = :done if !canceled? && !failed? @ended = Time.current @sshing = false # debug timings debug "[Timings(#{table})] all: #{human_seconds(@timings.values.sum, 3)}, #{@timings.map{|a,t| "#{a}: #{human_seconds(t, 3)}" } * ", "}", 50 end
sync() { || ... }
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 6 def sync @monitor.synchronize { yield } end
unpause()
click to toggle source
# File lib/db_sucker/application/sklaven_treiber/worker/core.rb, line 52 def unpause sync do return if !(@state == :pausing || @state == :paused) return unless @pause_data @state = @pause_data[:state_was] @pause_data = false @thread.wakeup if @thread end end