class Delayed::Monitor

Constants

METRICS

Attributes

jobs[R]

Public Class Methods

new() click to toggle source
# File lib/delayed/monitor.rb, line 20
# Builds the base relation used by every metric: jobs grouped by the priority
# CASE expression and by queue, narrowed to Worker.queues when any are set.
def initialize
  scope = Job.group(priority_case_statement).group(:queue)
  queues = Worker.queues
  scope = scope.where(queue: queues) if queues.any?
  @jobs = scope
end

Public Instance Methods

run!() click to toggle source
# File lib/delayed/monitor.rb, line 25
# Performs one monitoring pass: emits every metric listed in METRICS inside a
# single 'delayed.monitor.run' instrumentation event tagged with default_tags,
# then calls interruptable_sleep(sleep_delay).
#
# NOTE(review): interruptable_sleep and sleep_delay are not defined in this
# view; presumably they come from an included module -- confirm.
def run!
  ActiveSupport::Notifications.instrument('delayed.monitor.run', default_tags) do
    METRICS.each { |metric| emit_metric!(metric) }
  end
  interruptable_sleep(sleep_delay)
end

Private Instance Methods

alert_age_percent_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 97
# Builds a hash of [priority, queue] => how far the oldest workable job in
# that group is into its priority's alert age, as a percentage capped at 100.
# Groups whose priority has no alert_age are left out of the result.
def alert_age_percent_grouped
  oldest_workable_job_grouped.each_with_object({}) do |job, percentages|
    age = Job.db_time_now - job.run_at
    priority = job.priority
    alert_age = priority.alert_age
    next unless alert_age

    percent_of_alert_age = age / alert_age * 100
    percentages[[priority.to_i, job.queue]] = [percent_of_alert_age, 100].min
  end
end
count_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 65
# Returns `count` over the grouped jobs relation with no extra filtering.
# emit_metric! consumes the result as a hash keyed by [priority, queue].
def count_grouped
  jobs.count
end
default_results() click to toggle source
# File lib/delayed/monitor.rb, line 45
# Memoized hash of zero values, one entry per [priority, queue] pair, built
# from every value in Priority.names crossed with Worker.queues (or with the
# default queue name alone when no queues are present).
def default_results
  @default_results ||= begin
    queue_names = Worker.queues.presence || [Worker.default_queue_name]
    Priority.names.values.product(queue_names).each_with_object({}) do |(priority, queue), zeros|
      zeros[[priority.to_i, queue]] = 0
    end
  end
end
default_tags() click to toggle source
# File lib/delayed/monitor.rb, line 57
# Memoized hash of tags merged into every notification payload: the Job
# table name, database name and database adapter name.
def default_tags
  @default_tags ||= {
    table: Job.table_name,
    database: Job.database_name,
    database_adapter: Job.database_adapter_name,
  }
end
emit_metric!(metric) click to toggle source
# File lib/delayed/monitor.rb, line 36
# Emits one "delayed.job.<metric>" notification per [priority, queue] group.
#
# The values come from the private "<metric>_grouped" method; groups missing
# from that result are backfilled from default_results via reverse_merge.
# Each payload is default_tags plus the priority (as a string), the queue and
# the value.
def emit_metric!(metric)
  event_name = "delayed.job.#{metric}"
  grouped_values = send("#{metric}_grouped").reverse_merge(default_results)

  grouped_values.each do |(priority, queue), value|
    payload = default_tags.merge(priority: Priority.new(priority).to_s, queue: queue, value: value)
    ActiveSupport::Notifications.instrument(event_name, payload)
  end
end
erroring_count_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 77
# Returns `count` over the grouped jobs relation narrowed by its `erroring`
# scope (the scope's definition is not visible in this file).
def erroring_count_grouped
  jobs.erroring.count
end
failed_count_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 81
# Returns `count` over the grouped jobs relation narrowed by its `failed`
# scope (the scope's definition is not visible in this file).
def failed_count_grouped
  jobs.failed.count
end
future_count_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 69
# Returns `count` over the grouped jobs whose run_at is later than
# Job.db_time_now.
def future_count_grouped
  jobs.where("run_at > ?", Job.db_time_now).count
end
locked_count_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 73
# Returns `count` over the grouped jobs relation narrowed by its `locked`
# scope (the scope's definition is not visible in this file).
def locked_count_grouped
  jobs.locked.count
end
max_age_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 91
# Builds a hash of [priority, queue] => difference between Job.db_time_now
# and the run_at of the oldest workable job in that group.
def max_age_grouped
  age_pairs = oldest_workable_job_grouped.map do |job|
    group_key = [job.priority.to_i, job.queue]
    [group_key, Job.db_time_now - job.run_at]
  end
  age_pairs.to_h
end
max_lock_age_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 85
# Builds a hash of [priority, queue] => difference between Job.db_time_now
# and the locked_at of the oldest locked job in that group.
def max_lock_age_grouped
  lock_age_pairs = oldest_locked_job_grouped.map do |job|
    group_key = [job.priority.to_i, job.queue]
    [group_key, Job.db_time_now - job.locked_at]
  end
  lock_age_pairs.to_h
end
oldest_locked_job_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 112
# Selects, per [priority bucket, queue] group of `working` jobs, the bucketed
# priority, the queue and the earliest locked_at (MIN(locked_at)).
#
# NOTE(review): oldest_workable_job_grouped wraps the CASE expression in
# parentheses and this method does not; presumably both forms are accepted
# by the supported databases -- confirm.
def oldest_locked_job_grouped
  jobs.working.select("#{priority_case_statement} AS priority, queue, MIN(locked_at) AS locked_at")
end
oldest_workable_job_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 116
# Selects, per [priority bucket, queue] group of jobs matching
# `workable(Job.db_time_now)`, the bucketed priority, the queue and the
# earliest run_at (MIN(run_at)).
def oldest_workable_job_grouped
  jobs.workable(Job.db_time_now).select("(#{priority_case_statement}) AS priority, queue, MIN(run_at) AS run_at")
end
priority_case_statement() click to toggle source
# File lib/delayed/monitor.rb, line 120
# Builds the SQL CASE expression that maps a raw priority value onto the
# lower bound of the Priority range containing it. A range whose upper bound
# is infinite gets no "AND priority < ..." condition.
def priority_case_statement
  when_clauses = Priority.ranges.values.map do |range|
    lower_bound = range.first.to_i
    condition = "WHEN priority >= #{lower_bound}"
    condition = "#{condition} AND priority < #{range.last.to_i}" unless range.last.infinite?
    "#{condition} THEN #{lower_bound}"
  end

  ['CASE', *when_clauses, 'END'].join(' ')
end
say(message) click to toggle source
# File lib/delayed/monitor.rb, line 53
# Forwards +message+ to Delayed.say and returns its result.
def say(message)
  Delayed.say(message)
end
workable_count_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 104
# Returns `count` over the grouped jobs matching `workable(Job.db_time_now)`
# (the scope's definition is not visible in this file).
def workable_count_grouped
  jobs.workable(Job.db_time_now).count
end
working_count_grouped() click to toggle source
# File lib/delayed/monitor.rb, line 108
# Returns `count` over the grouped jobs relation narrowed by its `working`
# scope (the scope's definition is not visible in this file).
def working_count_grouped
  jobs.working.count
end