module Mobilize::Resque
Public Class Methods
active_paths()
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 56 def Resque.active_paths #first argument of the payload is the runner / stage path unless the worker is Jobtracker Resque.jobs('active').compact.map{|j| j['args'].first unless j['class']=='Jobtracker'}.compact end
config()
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 3 def Resque.config Base.config('resque') end
failures()
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 37 def Resque.failures ::Resque::Failure.all(0,0).select{|f| f['queue'] == Resque.queue_name} end
find_worker_by_path(path)
click to toggle source
Resque
workers and methods to find
# File lib/mobilize-base/handlers/resque.rb, line 62 def Resque.find_worker_by_path(path) Resque.workers('working').select{|w| w.job.ie{|j| j and j['payload'] and j['payload']['args'].first == path}}.first end
get_worker_args(worker)
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 79 def Resque.get_worker_args(worker) key = "worker:#{worker}" json = ::Resque.redis.get(key) if json hash = JSON.parse(json) hash['payload']['args'].last end end
jobs(state="active")
click to toggle source
active state refers to jobs that are either queued or working
# File lib/mobilize-base/handlers/resque.rb, line 42 def Resque.jobs(state="active") working_jobs = Resque.workers('working').map{|w| w.job['payload']} return working_jobs if state == 'working' queued_jobs = ::Resque.peek(Resque.queue_name,0,0).to_a return queued_jobs if state == 'queued' return working_jobs + queued_jobs if state == 'active' failed_jobs = Resque.failures.map{|f| f['payload']} return failed_jobs if state == 'failed' timeout_jobs = Resque.workers("timeout").map{|w| w.job['payload']} return timeout_jobs if state == 'timeout' return working_jobs + queued_jobs + failed_jobs if state == 'all' raise "invalid state #{state}" end
kill_idle_and_stale_workers()
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 167 def Resque.kill_idle_and_stale_workers idle_pids = Resque.workers('idle').select{|w| w.job=={}}.map{|w| w.to_s.split(":").second} stale_pids = Resque.workers('stale').select{|w| w.job=={}}.map{|w| w.to_s.split(":").second} idle_stale_pids = (idle_pids & stale_pids) if idle_stale_pids.length == 0 return false else "kill -QUIT #{idle_stale_pids.join(" ")}".bash(false) end return true end
kill_idle_workers(count=nil)
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 155 def Resque.kill_idle_workers(count=nil) idle_pids = Resque.workers('idle').select{|w| w.job=={}}.map{|w| w.to_s.split(":").second} if count.to_i > idle_pids.length or count == 0 return false elsif count "kill -QUIT #{idle_pids[0..count-1].join(" ")}".bash(false) else "kill -QUIT #{idle_pids.join(" ")}".bash(false) end return true end
kill_workers(count=nil)
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 179 def Resque.kill_workers(count=nil) pids = Resque.workers.map{|w| w.to_s.split(":").second} if count.to_i > pids.length or count == 0 return false elsif count "kill -QUIT #{pids[0..count-1].join(" ")}".bash(false) elsif pids.length>0 "kill -QUIT #{pids.join(" ")}".bash(false) else return false end return true end
log_path()
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 15 def Resque.log_path Base.log_path("mobilize-resque-#{::Mobilize::Base.env}") end
new_failures_by_email()
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 110 def Resque.new_failures_by_email fjobs = {} exc_to_s = Hash.new(0) Resque.failures.each_with_index do |f,f_i| #skip if already notified next if f['notified'] #try to send message to stage owner, where appropriate stage_path = f['payload']['args'].first email = begin s = Stage.where(:path=>stage_path).first if s.params['notify'].to_s=="false" next elsif s.params['notify'].to_s.index("@") s.params['notify'] else s.job.runner.user.email end rescue ScriptError, StandardError #jobs without stages are sent to admins [Gdrive.admin_group_name,Gdrive.domain].join("@") end exc_to_s = f['error'] if fjobs[email].nil? fjobs[email] = {stage_path => {exc_to_s => 1}} elsif fjobs[email][stage_path].nil? fjobs[email][stage_path] = {exc_to_s => 1} elsif fjobs[email][stage_path][exc_to_s].nil? fjobs[email][stage_path][exc_to_s] = 1 else fjobs[email][stage_path][exc_to_s] += 1 end #add notified flag to redis f['notified'] = true #tag stage with email ::Resque.redis.lset(:failed, f_i, ::Resque.encode(f)) end return fjobs end
prep_workers(max_workers=Resque.config['max_workers'])
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 193 def Resque.prep_workers(max_workers=Resque.config['max_workers']) curr_workers = Resque.workers.length if curr_workers > max_workers #kill as many idlers as necessary Resque.kill_idle_workers(curr_workers - max_workers) #wait a few secs for these guys to die sleep 10 curr_workers = Resque.workers.length if curr_workers > max_workers #kill working workers Resque.kill_workers(curr_workers - max_workers) end else Resque.start_workers(max_workers-curr_workers) end return true end
queue_name()
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 7 def Resque.queue_name Resque.config['queue_name'] end
queues()
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 11 def Resque.queues Base.queues end
set_worker_args(worker,args)
click to toggle source
takes a worker and invokes redis to set the last value in its second arg array element by our convention this is a Hash
# File lib/mobilize-base/handlers/resque.rb, line 90 def Resque.set_worker_args(worker,args) key = "worker:#{worker}" json = ::Resque.redis.get(key) if json hash = JSON.parse(json) payload_args = hash['payload']['args'] #jobmaster only gets one arg if payload_args[1].nil? payload_args[1] = args else payload_args[1] = {} unless payload_args[1].class==Hash args.keys.each{|k,v| payload_args[1][k] = args[k]} end ::Resque.redis.set(key,hash.to_json) return true else return false end end
set_worker_args_by_path(path,args)
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 66 def Resque.set_worker_args_by_path(path,args) #this only works on working workers worker = Resque.find_worker_by_path(path) args_string = args.map{|k,v| "#{k}: #{v}"}.join(";") #also fire a log, cap logfiles at 10 MB worker_string = worker ? worker.to_s : "no worker" info_msg = "[#{worker_string} for #{path}: #{Time.now.utc}] #{args_string}" Logger.new(Resque.log_path, 10, 1024*1000*10).info(info_msg) return false unless worker Resque.set_worker_args(worker,args) return true end
start_workers(count=1)
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 149 def Resque.start_workers(count=1) count.times do "(cd #{Base.root};rake mobilize:work[#{Base.env}]) >> #{Resque.log_path} 2>&1 &".bash end end
workers(state="all")
click to toggle source
# File lib/mobilize-base/handlers/resque.rb, line 19 def Resque.workers(state="all") workers = ::Resque.workers.select{|w| w.queues.first == Resque.queue_name} return workers if state == 'all' working_workers = workers.select{|w| w.job['queue'] == Resque.queue_name} return working_workers if state == 'working' idle_workers = workers.select{|w| w.job['queue'].nil?} return idle_workers if state == 'idle' stale_workers = workers.select{|w| Time.parse(w.started) < Jobtracker.deployed_at} return stale_workers if state == 'stale' timeout_workers = workers.select do |w| w.job['payload'] and w.job['payload']['class']!='Jobtracker' and w.job['run_at'] < (Time.now.utc - Jobtracker.max_run_time) end return timeout_workers if state == 'timeout' raise "invalid state #{state}" end