class Resque::Pertry::Purger

Attributes

failed_jobs_limit[W]

sets the number of failed jobs to purge per run

sleep_time[W]

sleep time between purges

verbose[RW]

Public Class Methods

after_database_purge(&block) click to toggle source

allows an app to set a hook to deal with the failed persistence table job

# File lib/resque/pertry/purger.rb, line 58
def after_database_purge(&block)
  @after_database_purge = block
end
after_redis_purge(&block) click to toggle source

allows an app to set a hook to deal with the failed redis job

# File lib/resque/pertry/purger.rb, line 53
def after_redis_purge(&block)
  @after_redis_purge = block
end
failed_jobs_limit() click to toggle source
# File lib/resque/pertry/purger.rb, line 19
def failed_jobs_limit
  @failed_jobs_limit ||= 100
end
purge() click to toggle source

run a purge cycle

# File lib/resque/pertry/purger.rb, line 38
def purge
  procline("working")
  add_stat(:loops, 1)

  purge_resque
  purge_database
end
run() click to toggle source

main loop

# File lib/resque/pertry/purger.rb, line 28
def run
  setup

  loop do
    purge
    wait
  end
end
sleep_time() click to toggle source
# File lib/resque/pertry/purger.rb, line 15
def sleep_time
  @sleep_time ||= 5.minutes
end
stats() click to toggle source
# File lib/resque/pertry/purger.rb, line 23
def stats
  @stats ||= {}
end
status() click to toggle source

display status of failed queues and persistence table

# File lib/resque/pertry/purger.rb, line 47
def status
  show_config
  show_info
end

Private Class Methods

add_stat(stat, value) click to toggle source
# File lib/resque/pertry/purger.rb, line 200
def add_stat(stat, value)
  stats[stat] ||= 0
  stats[stat] += value
  value
end
log(string) click to toggle source

only print if verbose is turned on

# File lib/resque/pertry/purger.rb, line 191
def log(string)
  log!(string) if verbose
end
log!(string) click to toggle source

always print this string

# File lib/resque/pertry/purger.rb, line 196
def log!(string)
  $stdout.puts "#{Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")} - #{string}"
end
procline(string) click to toggle source

update process line

# File lib/resque/pertry/purger.rb, line 115
def procline(string)
  log(string)
  $0 = "resque-pertry purger: #{string}"
end
purge_database() click to toggle source

purge resque-pertry persistence table

# File lib/resque/pertry/purger.rb, line 160
def purge_database
  jobs = ResquePertryPersistence.finnished_or_expired.limit(failed_jobs_limit)
  return 0 if jobs.empty?

  log("purging #{jobs.size} completed, failed, or expired jobs from database")

  jobs.each do |job|
    ResquePertryPersistence.destroy(job.id)
    run_after_database_purge(job)
  end if @after_database_purge

  add_stat("purged from database", jobs.size)
end
purge_redis(redis) click to toggle source

purge resque’s failed queue on a redis client

# File lib/resque/pertry/purger.rb, line 145
def purge_redis(redis)
  failed_jobs = redis.lrange(:failed, 0, failed_jobs_limit - 1)
  return 0 if failed_jobs.empty?

  log("purging #{failed_jobs.size} failed jobs from #{redis.id}")
  redis.ltrim(:failed, failed_jobs.size, -1)

  failed_jobs.each do |failed_job|
    run_after_redis_purge(failed_job, redis)
  end if @after_redis_purge

  add_stat("purged from #{redis.id}", failed_jobs.size)
end
purge_resque() click to toggle source

purge jobs in the failed queue

# File lib/resque/pertry/purger.rb, line 121
 def purge_resque
   with_redis do |redis|
     purge_redis(redis)
   end
end
register_signal_handlers() click to toggle source

intercept signals

# File lib/resque/pertry/purger.rb, line 76
def register_signal_handlers
  trap("TERM") { shutdown }
  trap("INT")  { shutdown }
  trap("QUIT") { shutdown }
  trap("USR1") { status }
end
run_after_database_purge(job) click to toggle source

run hook after_database_purge

# File lib/resque/pertry/purger.rb, line 183
def run_after_database_purge(job)
  return unless @after_database_purge
  @after_database_purge.call(job)
rescue => e
  log!("exception #{e.inspect} while running hook after_database_purge on job #{job.inspect}")
end
run_after_redis_purge(job, redis) click to toggle source

run hook after_redis_purge

# File lib/resque/pertry/purger.rb, line 175
def run_after_redis_purge(job, redis)
  return unless @after_redis_purge
  @after_redis_purge.call(job, redis)
rescue => e
  log!("exception #{e.inspect} while running hook after_redis_purge on job #{job.inspect}")
end
set_stat(stat, value) click to toggle source
# File lib/resque/pertry/purger.rb, line 206
def set_stat(stat, value)
  stats[stat] = value
end
setup() click to toggle source

purger setup and init

# File lib/resque/pertry/purger.rb, line 65
def setup
  stats[:pid] = Process.pid
  stats[:started] = Time.now
  stats[:loops] = 0

  procline("starting")
  status
  register_signal_handlers
end
show_config() click to toggle source
# File lib/resque/pertry/purger.rb, line 83
def show_config
  log("Configuration:")
  [ :sleep_time, :failed_jobs_limit ].each do |v|
    log("\tconfig #{v} = #{send(v)}")
  end
end
show_info() click to toggle source
# File lib/resque/pertry/purger.rb, line 90
def show_info
  with_redis do |redis|
    set_stat("failed queue length on #{redis.id}", redis.llen(:failed))
  end
  
  log!("Status:")
  stats.each do |key, value|
    log!("\t#{key} : #{value}")
  end
end
shutdown() click to toggle source

shutdown the process

# File lib/resque/pertry/purger.rb, line 108
def shutdown
  procline("shutting down")
  status
  exit
end
wait() click to toggle source

just sleep for a while

# File lib/resque/pertry/purger.rb, line 102
def wait
  procline("sleeping for #{sleep_time} seconds")
  sleep(sleep_time)
end
with_redis(&block) click to toggle source
# File lib/resque/pertry/purger.rb, line 127
def with_redis(&block)
  return {} unless block_given?

  # testing the redis class name so we don't have to require resque_redis_composite
  case Resque.redis.redis.class.name
  when "Redis"
    { redis.id => block.call(Resque.redis) }
  when "Resque::RedisComposite"
    Resque.redis.mapping.reduce({}) do |results, (queue, redis)|
      results[redis.id] = block.call(redis)
      results
    end
  else
    raise NotImplementedError, "Unsupported redis client #{Resque.redis.inspect}"
  end
end