class Delayed::Monitor
Constants
- METRICS
Attributes
jobs[R]
Public Class Methods
new()
click to toggle source
# File lib/delayed/monitor.rb, line 20 def initialize @jobs = Job.group(priority_case_statement).group(:queue) @jobs = @jobs.where(queue: Worker.queues) if Worker.queues.any? end
Public Instance Methods
run!()
click to toggle source
# File lib/delayed/monitor.rb, line 25 def run! ActiveSupport::Notifications.instrument('delayed.monitor.run', default_tags) do METRICS.each { |metric| emit_metric!(metric) } end interruptable_sleep(sleep_delay) end
Private Instance Methods
alert_age_percent_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 97 def alert_age_percent_grouped oldest_workable_job_grouped.each_with_object({}) do |job, metrics| max_age = Job.db_time_now - job.run_at metrics[[job.priority.to_i, job.queue]] = [max_age / job.priority.alert_age * 100, 100].min if job.priority.alert_age end end
count_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 65 def count_grouped jobs.count end
default_results()
click to toggle source
# File lib/delayed/monitor.rb, line 45 def default_results @default_results ||= Priority.names.values.flat_map { |priority| (Worker.queues.presence || [Worker.default_queue_name]).map do |queue| [[priority.to_i, queue], 0] end }.to_h end
emit_metric!(metric)
click to toggle source
# File lib/delayed/monitor.rb, line 36 def emit_metric!(metric) send("#{metric}_grouped").reverse_merge(default_results).each do |(priority, queue), value| ActiveSupport::Notifications.instrument( "delayed.job.#{metric}", default_tags.merge(priority: Priority.new(priority).to_s, queue: queue, value: value), ) end end
erroring_count_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 77 def erroring_count_grouped jobs.erroring.count end
failed_count_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 81 def failed_count_grouped jobs.failed.count end
future_count_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 69 def future_count_grouped jobs.where("run_at > ?", Job.db_time_now).count end
locked_count_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 73 def locked_count_grouped jobs.locked.count end
max_age_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 91 def max_age_grouped oldest_workable_job_grouped.each_with_object({}) do |job, metrics| metrics[[job.priority.to_i, job.queue]] = Job.db_time_now - job.run_at end end
max_lock_age_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 85 def max_lock_age_grouped oldest_locked_job_grouped.each_with_object({}) do |job, metrics| metrics[[job.priority.to_i, job.queue]] = Job.db_time_now - job.locked_at end end
oldest_locked_job_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 112 def oldest_locked_job_grouped jobs.working.select("#{priority_case_statement} AS priority, queue, MIN(locked_at) AS locked_at") end
oldest_workable_job_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 116 def oldest_workable_job_grouped jobs.workable(Job.db_time_now).select("(#{priority_case_statement}) AS priority, queue, MIN(run_at) AS run_at") end
priority_case_statement()
click to toggle source
# File lib/delayed/monitor.rb, line 120 def priority_case_statement [ 'CASE', Priority.ranges.values.map do |range| [ "WHEN priority >= #{range.first.to_i}", ("AND priority < #{range.last.to_i}" unless range.last.infinite?), "THEN #{range.first.to_i}", ].compact end, 'END', ].flatten.join(' ') end
say(message)
click to toggle source
# File lib/delayed/monitor.rb, line 53 def say(message) Delayed.say(message) end
workable_count_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 104 def workable_count_grouped jobs.workable(Job.db_time_now).count end
working_count_grouped()
click to toggle source
# File lib/delayed/monitor.rb, line 108 def working_count_grouped jobs.working.count end