class Delayed::Worker
Attributes
name
Sets the name of the worker. Setting the name to nil will reset the default worker name.
name_prefix
The name_prefix is ignored if name is set directly (see the example below).
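For illustration, a hedged sketch of how these two attributes interact, assuming a worker constructed with Delayed::Worker.new as shown under the constructor below (the prefix and name strings are made up):

worker = Delayed::Worker.new

# name_prefix is prepended to the generated host/pid name.
worker.name_prefix = 'us-east-1 '
worker.name              # e.g. "us-east-1 host:app-01 pid:12345"

# An explicit name takes precedence; name_prefix is then ignored.
worker.name = 'billing-worker-1'

# Setting the name back to nil restores the default host/pid based name.
worker.name = nil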
Public Class Methods
# File lib/delayed/worker.rb, line 37
def self.delay_job?(job)
  if delay_jobs.is_a?(Proc)
    delay_jobs.arity == 1 ? delay_jobs.call(job) : delay_jobs.call
  else
    delay_jobs
  end
end
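As the reader above shows, delay_jobs may be a plain boolean or a Proc taking zero or one argument (the job). A minimal sketch of both forms, assuming delay_jobs is writable at the class level:

# Assumption: delay_jobs is a writable class-level setting.
# Boolean form: perform jobs inline instead of enqueuing them (common in tests).
Delayed::Worker.delay_jobs = false

# Proc form (arity 1): decide per job, e.g. only delay work bound for the 'mailers' queue.
Delayed::Worker.delay_jobs = ->(job) { job.queue == 'mailers' }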
# File lib/delayed/worker.rb, line 45
def initialize
  @failed_reserve_count = 0

  # Reset lifecycle on the offhand chance that something lazily
  # triggered its creation before all plugins had been registered.
  Delayed.setup_lifecycle
end
Public Instance Methods
# File lib/delayed/worker.rb, line 168
def failed(job)
  self.class.lifecycle.run_callbacks(:failure, self, job) do
    job.hook(:failure)
  rescue StandardError => e
    say "Error when running failure callback: #{e}", 'error'
    say e.backtrace.join("\n"), 'error'
  ensure
    job.destroy_failed_jobs? ? job.destroy : job.fail!
  end
end
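Whether the record is destroyed or merely marked failed is decided by job.destroy_failed_jobs?. A minimal sketch of the related setting, assuming the class-level destroy_failed_jobs flag from classic delayed_job is available here:

# Assumption: destroy_failed_jobs is a class-level flag, as in classic delayed_job.
# Keep permanently failed jobs in the table for later inspection instead of deleting them.
Delayed::Worker.destroy_failed_jobs = false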
# File lib/delayed/worker.rb, line 179
def job_say(job, text, level = Delayed.default_log_level)
  text = "Job #{job.name} (id=#{job.id})#{say_queue(job.queue)} #{text}"
  say text, level
end
# File lib/delayed/worker.rb, line 189
def max_attempts(job)
  job.max_attempts || self.class.max_attempts
end
# File lib/delayed/worker.rb, line 193
def max_run_time(job)
  job.max_run_time || self.class.max_run_time
end
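Both limits prefer a per-job value and fall back to the worker-class default. A hedged example of both levels, assuming the class-level setters and per-payload overrides behave as in classic delayed_job (the job class is hypothetical):

# Assumption: the class-level defaults are writable settings.
Delayed::Worker.max_attempts = 10         # retry budget for jobs that don't override it
Delayed::Worker.max_run_time = 20.minutes # timeout for jobs that don't override it

# Hypothetical payload class that tightens both limits for itself.
class NightlyReportJob
  def perform
    # ... build and deliver the report ...
  end

  def max_attempts
    3
  end

  def max_run_time
    5.minutes
  end
end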
Every worker has a unique name, which by default is derived from the hostname and the pid of the process. There are advantages to overriding this with something that survives worker restarts: a worker can safely resume working on tasks it had locked itself, on the assumption that the previous run crashed.
# File lib/delayed/worker.rb, line 57
def name
  return @name unless @name.nil?

  begin
    "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
  rescue StandardError
    "#{@name_prefix}pid:#{Process.pid}"
  end
end
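Because an explicitly assigned name wins over the generated host/pid name, a worker can be given an identity that survives restarts; the slot-based scheme below is only an illustration:

# Illustration: derive the name from something stable across restarts so a
# relaunched worker can resume jobs it had locked before crashing.
worker = Delayed::Worker.new
worker.name = "worker.#{ENV.fetch('WORKER_SLOT', '1')}"
worker.name  # => "worker.1"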
# File lib/delayed/worker.rb, line 84
def on_exit!
  Delayed::Job.clear_locks!(name)
end
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/worker.rb, line 156
def reschedule(job, time = nil)
  if (job.attempts += 1) < max_attempts(job)
    time ||= job.reschedule_at
    job.run_at = time
    job.unlock
    job.save!
  else
    job_say job, "FAILED permanently because of #{job.attempts} consecutive failures", 'error'
    failed(job)
  end
end
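The exponential scale itself comes from job.reschedule_at on the backend; classic delayed_job uses roughly now + attempts ** 4 + 5 seconds and lets the payload object override the hook. The sketch below assumes that hook is honoured here as well (the job class is hypothetical):

# Assumption: the backend calls reschedule_at(current_time, attempts) on the payload
# object when it responds to it, as classic delayed_job does.
class FetchRemoteFeedJob
  def perform
    # ... call a flaky third-party API ...
  end

  # Back off 10s, 20s, 40s, ... instead of the default attempts**4 + 5 curve.
  def reschedule_at(current_time, attempts)
    current_time + (10 * (2**attempts))
  end
end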
# File lib/delayed/worker.rb, line 124
def run(job)
  metadata = {
    status: 'RUNNING',
    name: job.name,
    run_at: job.run_at,
    created_at: job.created_at,
    priority: job.priority,
    queue: job.queue,
    attempts: job.attempts,
    enqueued_for: (Time.current - job.created_at).round,
  }
  job_say job, metadata.to_json
  run_time = Benchmark.realtime do
    Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do
      job.invoke_job
    end
    job.destroy
  end
  job_say job, format('COMPLETED after %.4f seconds', run_time)
  true # did work
rescue DeserializationError => e
  job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error'
  job.error = e
  failed(job)
rescue Exception => e # rubocop:disable Lint/RescueException
  self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, e) }
  false # work failed
end
# File lib/delayed/worker.rb, line 71
def run!
  @realtime = Benchmark.realtime do
    @result = work_off
  end

  count = @result[0] + @result[1]

  say format("#{count} jobs processed at %.4f j/s, %d failed", count / @realtime, @result.last) if count.positive?

  interruptable_sleep(self.class.sleep_delay) if count < max_claims
  reload! unless stop?
end
# File lib/delayed/worker.rb, line 120
def run_thread_callbacks(job, &block)
  self.class.lifecycle.run_callbacks(:thread, self, job, &block)
end
# File lib/delayed/worker.rb, line 184
def say(text, level = Delayed.default_log_level)
  text = "[Worker(#{name})] #{text}"
  Delayed.say("#{Time.now.strftime('%FT%T%z')}: #{text}", level)
end
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/worker.rb, line 90
def work_off(num = 100)
  success = Concurrent::AtomicFixnum.new(0)
  failure = Concurrent::AtomicFixnum.new(0)

  num.times do
    jobs = reserve_jobs
    break if jobs.empty?

    pool = Concurrent::FixedThreadPool.new(jobs.length)
    jobs.each do |job|
      pool.post do
        run_thread_callbacks(job) do
          if run_job(job)
            success.increment
          else
            failure.increment
          end
        end
      end
    end
    pool.shutdown
    pool.wait_for_termination

    break if stop? # leave if we're exiting
  end

  [success, failure].map(&:value)
end
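A short usage sketch: each iteration reserves a batch of jobs and runs it on a thread pool, so work_off is also convenient for draining a queue inline from a test or a one-off script:

# Drain up to 50 reservation batches from the calling process.
worker = Delayed::Worker.new
successes, failures = worker.work_off(50)
puts "processed #{successes + failures} jobs (#{failures} failed)"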
Protected Instance Methods
# File lib/delayed/worker.rb, line 203
def handle_failed_job(job, error)
  job.error = error
  job_say job, "FAILED (#{job.attempts} prior attempts) with #{error.class.name}: #{error.message}", 'error'
  reschedule(job)
end
# File lib/delayed/worker.rb, line 229
def reload!
  Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check!
end
The backend adapter may return either a list or a single job. In some backends, this can be controlled with the `max_claims` config. Either way, the result is mapped to an array of job instances.
# File lib/delayed/worker.rb, line 216
def reserve_jobs
  jobs = [Delayed::Job.reserve(self)].compact.flatten(1)
  @failed_reserve_count = 0
  jobs
rescue ::Exception => e # rubocop:disable Lint/RescueException
  say "Error while reserving job(s): #{e}"
  Delayed::Job.recover_from(e)
  @failed_reserve_count += 1
  raise FatalBackendError if @failed_reserve_count >= 10

  []
end
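A hedged sketch of the claim-size configuration mentioned above; the setting is assumed to be a writable max_claims on the worker class, matching the count < max_claims comparison in run!:

# Assumption: max_claims is a class-level setting. With a value of 5, reserve_jobs
# may return up to five jobs, which work_off then runs on a small thread pool.
Delayed::Worker.max_claims = 5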
# File lib/delayed/worker.rb, line 209
def run_job(job)
  self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) }
end
# File lib/delayed/worker.rb, line 199
def say_queue(queue)
  " (queue=#{queue})" if queue
end