module Mobilize::Jobtracker
Public Class Methods
admins()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 26
# Returns the 'admins' entry of the jobtracker configuration.
def Jobtracker.admins
  jobtracker_config = Jobtracker.config
  jobtracker_config['admins']
end
config()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 3
# Returns whatever Base.config yields for the 'jobtracker' section.
def Jobtracker.config
  section = 'jobtracker'
  Base.config(section)
end
current_server()
click to toggle source
# File lib/mobilize-base/jobtracker.rb, line 92
# Returns the server name for the current environment.
#
# In "production" and "staging" this is the first element returned by
# Socket.gethostbyname for the local hostname, or nil when that lookup raises.
# In every other environment it is "127.0.0.1".
def Jobtracker.current_server
  return "127.0.0.1" unless ["production", "staging"].include?(Base.env)

  begin
    Socket.gethostbyname(Socket.gethostname).first
  rescue
    # lookup failed; callers receive nil
    nil
  end
end
cycle_freq()
click to toggle source
Modify this setting to change how frequently request cycles run.
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 8
# Returns the 'cycle_freq' entry of the jobtracker configuration.
def Jobtracker.cycle_freq
  jobtracker_config = Jobtracker.config
  jobtracker_config['cycle_freq']
end
deploy_servers()
click to toggle source
# File lib/mobilize-base/jobtracker.rb, line 78
# Returns the list of server names found on the first "role " line of
# config/deploy/<env>.rb under Base.root.
#
# The line is split on commas; the first token (the role itself) and any
# token starting with ":" are discarded, and quotes and surrounding whitespace
# are stripped from the rest. Any error (missing file, no "role " line, ...)
# falls back to ["127.0.0.1"], which is what dev/test environments get.
def Jobtracker.deploy_servers
  deploy_lines = File.readlines("#{Base.root}/config/deploy/#{Base.env}.rb")
  role_line = deploy_lines.find { |line| line.strip.starts_with?("role ") }
  # drop the role name, then reject arguments that start with symbols
  host_tokens = role_line.split(",")[1..-1].reject { |token| token.strip.starts_with?(":") }
  host_tokens.map { |token| token.delete("'\"").strip }
rescue
  # for dev/test
  ["127.0.0.1"]
end
disabled_methods()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 38
# Returns the 'disabled_methods' entry of the jobtracker configuration.
def Jobtracker.disabled_methods
  jobtracker_config = Jobtracker.config
  jobtracker_config['disabled_methods']
end
enqueue!()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 100
# Creates a Resque job for the Jobtracker class on the queue named by
# Resque.queue_name, with 'jobtracker' as its first argument and an empty
# hash as its second.
def Jobtracker.enqueue!
  queue = Resque.queue_name
  job_options = {}
  ::Resque::Job.create(queue, Jobtracker, 'jobtracker', job_options)
end
failures()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 84
# Returns the result of Resque.failures.
def Jobtracker.failures
  recorded_failures = Resque.failures
  recorded_failures
end
get_args()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 68
# Returns the worker args that Resque holds for the jobtracker worker.
def Jobtracker.get_args
  jobtracker_worker = Jobtracker.worker
  Resque.get_worker_args(jobtracker_worker)
end
kill_idle_workers()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 76
# Delegates to Resque.kill_idle_workers and returns its result.
def Jobtracker.kill_idle_workers
  outcome = Resque.kill_idle_workers
  outcome
end
kill_workers()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 72
# Delegates to Resque.kill_workers and returns its result.
def Jobtracker.kill_workers
  outcome = Resque.kill_workers
  outcome
end
last_notification()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 130
# Returns the "last_notification" value stored in the jobtracker worker's
# args, or nil when the worker has no args.
def Jobtracker.last_notification
  # Fetch the args once: a second lookup could return nil after the guard
  # has already passed, and indexing nil would raise.
  worker_args = Jobtracker.get_args
  worker_args["last_notification"] if worker_args
end
last_notification=(time)
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 134
# Stores +time+ under the "last_notification" key of the jobtracker worker's
# args via Jobtracker.set_args.
def Jobtracker.last_notification=(time)
  notification_args = {"last_notification" => time}
  Jobtracker.set_args(notification_args)
end
max_run_time()
click to toggle source
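Tolerance for long-running jobs: the run time (compared against elapsed seconds) after which a working job is reported by max_run_time_workers.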
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 22
# Returns the 'max_run_time' entry of the jobtracker configuration.
def Jobtracker.max_run_time
  jobtracker_config = Jobtracker.config
  jobtracker_config['max_run_time']
end
max_run_time_workers()
click to toggle source
# File lib/mobilize-base/jobtracker.rb, line 6
# Returns the 'working' workers whose current job has been running for longer
# than Jobtracker.max_run_time.
#
# A worker is selected only when its job has a non-empty 'run_at' value and
# the time elapsed since that value exceeds the configured threshold.
def Jobtracker.max_run_time_workers
  # read the threshold once instead of once per worker
  run_time_limit = Jobtracker.max_run_time
  Jobtracker.workers('working').select do |w|
    # read the job's start time once so the emptiness check and the parse
    # operate on the same value
    run_at = w.job['run_at'].to_s
    !run_at.empty? && (Time.now.utc - Time.parse(run_at)) > run_time_limit
  end
end
notif_due?()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 138
# Returns true when a notification should be sent: either no last
# notification time is recorded, or the recorded time is earlier than
# (now - Jobtracker.notification_freq).
def Jobtracker.notif_due?
  last_duetime = Time.now.utc - Jobtracker.notification_freq
  # Read the stored value once so the emptiness check and the comparison see
  # the same value; a second read could return nil and break to_datetime.
  last_sent = Jobtracker.last_notification
  last_sent.to_s.length == 0 || last_sent.to_datetime < last_duetime
end
notification_freq()
click to toggle source
Interval between notifications; notif_due? subtracts it from the current time to decide whether a new notification is due.
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 13
# Returns the 'notification_freq' entry of the jobtracker configuration.
def Jobtracker.notification_freq
  jobtracker_config = Jobtracker.config
  jobtracker_config['notification_freq']
end
perform(id,*args)
click to toggle source
# File lib/mobilize-base/jobtracker.rb, line 106
# Main jobtracker loop; keeps cycling until Jobtracker.status is 'stopping'.
#
# Each cycle runs notifications, then visits every user in random order:
# it records a "Checking" status when the user's runner is on this server,
# forces the runner due if a "run_now" file exists in the user's mobilize
# directory (deleting that file), and enqueues the runner when it is due.
# The loop sleeps 5 seconds between cycles.
#
# id, args - accepted but not used by this method.
#
# Returns true once the loop has ended.
def Jobtracker.perform(id,*args)
  until Jobtracker.status == 'stopping'
    users = User.all
    Jobtracker.run_notifications
    # run through all users randomly
    # so none are privileged on JT restarts
    users.sort_by{rand}.each do |user|
      runner = user.runner
      Jobtracker.update_status("Checking #{runner.path}") if runner.is_on_server?
      # check for run_now file
      run_now_dir = "/home/#{user.name}/mobilize/"
      dir_listing = `sudo ls #{run_now_dir}`.split("\n")
      if dir_listing.include?("run_now")
        # delete user's run now file, then force the runner to be due
        `sudo rm -rf #{run_now_dir}run_now`
        runner.force_due
      end
      next unless runner.is_due?
      runner.enqueue!
      Jobtracker.update_status("Enqueued #{runner.path}")
    end
    sleep 5
  end
  # NOTE(review): update_status returns false without writing while the status
  # is "stopping", which is the only state that ends the loop, so this call
  # looks like a no-op. Confirm.
  Jobtracker.update_status("told to stop")
  true
end
prep_workers()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 80
# Delegates to Resque.prep_workers and returns its result.
def Jobtracker.prep_workers
  outcome = Resque.prep_workers
  outcome
end
restart()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 58
# Stops the jobtracker, then starts it again.
# Returns the result of Jobtracker.start.
def Jobtracker.restart
  Jobtracker.stop!
  start_result = Jobtracker.start
  start_result
end
restart!()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 104
# Stops the jobtracker, then starts it again. Always returns true.
def Jobtracker.restart!
  Jobtracker.stop!
  Jobtracker.start
  true
end
restart_workers!()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 110
# Kills the workers, waits 10 seconds, preps them again and records a status
# message. Returns the result of Jobtracker.update_status.
def Jobtracker.restart_workers!
  Jobtracker.kill_workers
  # pause between killing the workers and prepping them again
  sleep 10
  Jobtracker.prep_workers
  status_written = Jobtracker.update_status("put workers back on the queue")
  status_written
end
run_notifications()
click to toggle source
# File lib/mobilize-base/jobtracker.rb, line 24
# Builds and delivers notification emails when Jobtracker.notif_due? is true.
#
# Two kinds of notification are generated:
# * one per email address returned by Resque.new_failures_by_email (only when
#   Jobtracker.failures is non-empty), listing each failing path with its
#   exception text and count, plus the runner sheet URL when it can be found;
# * one to the admin group when any worker exceeds the max run time.
#
# Delivery failures are recorded through Jobtracker.update_status rather than
# raised. After delivery the last notification time is updated, whether or
# not any email was generated. Always returns true.
def Jobtracker.run_notifications
  return true unless Jobtracker.notif_due?

  notifs = []

  if Jobtracker.failures.length > 0
    Resque.new_failures_by_email.each do |email, stage_paths|
      failure_total = stage_paths.values.map { |counts| counts.values }.flatten.sum
      subject = "#{stage_paths.keys.length.to_s} new failed jobs, #{failure_total.to_s} failures"
      # one row per exception type, with the job name
      rows = stage_paths.map do |path, exceptions|
        exceptions.map do |exc_to_s, times|
          [path, " : ", exc_to_s, ", ", times, " times"].join
        end
      end
      body = rows.flatten.join("\n\n")
      # the user is looked up by the local part of the email address
      user = User.where(:name => email.split("@").first).first
      if user
        runner_dst = Dataset.find_by_url("gsheet://#{user.runner.path}")
        if runner_dst && runner_dst.http_url
          body += "\n\n#{runner_dst.http_url}"
        end
      end
      # uncomment the 'bcc' line to receive a copy of emails sent to users
      notifs << {
        'subject' => subject,
        'body'    => body,
        'to'      => email
        #'bcc'    => [Gdrive.admin_group_name,Gdrive.domain].join("@")
      }
    end
  end

  long_workers = Jobtracker.max_run_time_workers
  if long_workers.length > 0
    job_names = begin
                  long_workers.map { |w| w.job['payload']['args'] }.first.join("\n")
                rescue
                  "Failed to get job names"
                end
    notifs << {
      'subject' => "#{long_workers.length.to_s} max run time jobs",
      'body'    => job_names,
      'to'      => [Gdrive.admin_group_name, Gdrive.domain].join("@")
    }
  end

  # deliver each email generated
  notifs.each do |notif|
    begin
      Gmail.write(notif).deliver
    rescue
      # log email on failure
      Jobtracker.update_status("Failed to deliver #{notif.to_s}")
    end
  end

  # update notification time so JT knows to wait a while
  Jobtracker.last_notification = Time.now.utc.to_s
  Jobtracker.update_status("Sent notification at #{Jobtracker.last_notification}")
  true
end
runner_read_freq()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 17
# Returns the 'runner_read_freq' entry of the jobtracker configuration.
def Jobtracker.runner_read_freq
  jobtracker_config = Jobtracker.config
  jobtracker_config['runner_read_freq']
end
set_args(args)
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 63
# Passes +args+ to Resque.set_worker_args for the jobtracker worker.
# Always returns true.
def Jobtracker.set_args(args)
  jobtracker_worker = Jobtracker.worker
  Resque.set_worker_args(jobtracker_worker, args)
  true
end
start()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 88
# Enqueues the jobtracker when its status is 'stopped'; otherwise records a
# "Jobtracker still <status>" message. Always returns true.
def Jobtracker.start
  # Read the status once so the message reports the same status the decision
  # was based on.
  current_status = Jobtracker.status
  if current_status != 'stopped'
    Jobtracker.update_status("Jobtracker still #{current_status}")
  else
    #make sure that workers are running and at the right number
    #Resque.prep_workers
    #queue up the jobtracker (starts the perform method)
    Jobtracker.enqueue!
  end
  true
end
start_worker(count=nil)
click to toggle source
# File lib/mobilize-base/jobtracker.rb, line 15
# Passes +count+ (nil by default) to Resque.start_workers and returns its
# result.
def Jobtracker.start_worker(count=nil)
  started = Resque.start_workers(count)
  started
end
status()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 42
# Returns the jobtracker's current status:
# * the 'status' value from the worker args, when the worker has args;
# * 'queued' when a Resque job whose first argument is 'jobtracker' exists;
# * 'stopped' otherwise.
def Jobtracker.status
  worker_args = Jobtracker.get_args
  if worker_args
    worker_args['status']
  elsif Resque.jobs.select { |queued| queued['args'].first == 'jobtracker' }.first
    'queued'
  else
    'stopped'
  end
end
stop!()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 117
# Signals the jobtracker to stop and blocks until its status is no longer
# 'stopping', polling every 5 seconds. Always returns true.
def Jobtracker.stop!
  # send signal for Jobtracker to check for
  Jobtracker.update_status('stopping')
  sleep 5
  # the loop has no timeout: it ends only when the status changes
  while Jobtracker.status == 'stopping'
    puts "#{Jobtracker.to_s} still on queue, waiting"
    sleep 5
  end
  true
end
update_status(msg)
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 50
# Writes +msg+ as the 'status' of the "jobtracker" worker path.
#
# Returns false, writing nothing, while the status is "stopping"; this keeps
# the jobtracker from resisting stop commands. Returns true after a write.
def Jobtracker.update_status(msg)
  if Jobtracker.status == "stopping"
    false
  else
    # Jobtracker has no persistent database state, so the status is kept in
    # the worker args
    Resque.set_worker_args_by_path("jobtracker", {'status' => msg})
    true
  end
end
worker()
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 30
# Returns the worker Resque finds for the "jobtracker" path.
def Jobtracker.worker
  jobtracker_worker = Resque.find_worker_by_path("jobtracker")
  jobtracker_worker
end
workers(state="all")
click to toggle source
# File lib/mobilize-base/helpers/jobtracker_helper.rb, line 34
# Returns the result of Resque.workers for the given +state+ ("all" by
# default).
def Jobtracker.workers(state="all")
  matching_workers = Resque.workers(state)
  matching_workers
end