class Mobilize::Runner
Public Class Methods
find_by_path(path)
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 15
# Looks up a runner by its exact path.
#
# path - value compared against the :path attribute.
#
# Returns the first record of the Runner.where(:path=>path) query
# (whatever +first+ yields when nothing matches).
def Runner.find_by_path(path)
  matching = Runner.where(:path => path)
  matching.first
end
find_by_title(title)
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 19
# Looks up a runner by title.
#
# title - prefix of the runner path; the path queried is "<title>/jobs".
#
# Returns the first record whose :path equals "<title>/jobs"
# (whatever +first+ yields when nothing matches).
def Runner.find_by_title(title)
  jobs_path = "#{title}/jobs"
  Runner.where(:path => jobs_path).first
end
find_or_create_by_path(path)
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 56
# Fetches the runner stored under +path+, creating it when absent.
#
# path - value of the :path attribute to look up or create with.
#
# Returns the existing runner when the lookup finds one; otherwise the
# result of Runner.create with :path set and :active=>true.
def Runner.find_or_create_by_path(path)
  existing = Runner.where(:path => path).first
  return existing if existing
  Runner.create(:path => path, :active => true)
end
perform(id,*args)
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 23
# Runs one pass of the runner identified by +id+:
#   1. stamps :started_at,
#   2. pushes pending status updates to the gsheet and reads the job
#      definitions back (failures here are logged, not raised),
#   3. enqueues the first stage of every job that is due,
#   4. pushes statuses to the gsheet again and stamps :completed_at.
#
# id    - runner path, passed to Runner.find_by_path.
# *args - accepted but not used by this method.
#
# Returns the result of the final update_attributes call.
# NOTE(review): if no runner exists for +id+, +r+ is nil and the next
# line raises NoMethodError - confirm callers never pass unknown paths.
def Runner.perform(id,*args)
  r = Runner.find_by_path(id)
  #get gdrive slot for read; fall back to a randomly chosen worker email
  gdrive_slot = Gdrive.slot_worker_by_path(r.path) || Gdrive.worker_emails.sort_by{rand}.first
  r.update_attributes(:started_at=>Time.now.utc)
  begin
    #make sure any updates to activity are processed first
    #as in when someone runs a "once" job that has completed
    r.update_gsheet(gdrive_slot)
    #read the jobs in the gsheet and update models with news
    r.read_gsheet(gdrive_slot)
  rescue => exc
    #log the exception, but continue w job processing
    #This ensures jobs are still processed if google drive goes down
    r.update_status("Failed to read or update gsheet with #{exc.to_s} #{exc.backtrace.join(";")}")
  end
  #queue up the jobs that are due and active
  r.jobs.each do |j|
    begin
      if j.is_due?
        #a "once" job is switched off before its first stage is enqueued
        j.update_attributes(:active=>false) if j.trigger=='once'
        s = j.stages.first
        s.update_attributes(:retries_done=>0)
        s.enqueue!
      end
    rescue ScriptError, StandardError => exc
      #one failing job must not stop the others; record why it failed
      r.update_status("Failed to enqueue #{j.path} with #{exc.to_s}")
    end
  end
  #this final gsheet update is not rescued: an error here propagates
  r.update_gsheet(gdrive_slot)
  r.update_attributes(:completed_at=>Time.now.utc)
end
Public Instance Methods
enqueue!()
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 114
# Creates a Resque job on the "mobilize" queue for the Runner class,
# passing this runner's path and an empty hash as arguments.
#
# Returns true.
def enqueue!
  ::Resque::Job.create("mobilize", Runner, self.path, {})
  true
end
force_due()
click to toggle source
Sets the runner's started_at to one second earlier than (now - Jobtracker.runner_read_freq), which forces the runner to be due.
# File lib/mobilize-base/models/runner.rb, line 131
# Back-dates :started_at to one second before
# (now - Jobtracker.runner_read_freq), so the started_at comparison in
# is_due? treats the runner as overdue.
#
# Returns the result of update_attributes.
def force_due
  overdue_time = Time.now.utc - Jobtracker.runner_read_freq - 1.second
  self.update_attributes(:started_at => overdue_time)
end
is_due?()
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 136
# Tells whether this runner should be run now.
#
# The record is reloaded first. The runner is due when it is on this
# server, is not already working, and either has never started or last
# started more than Jobtracker.runner_read_freq ago.
#
# Returns true or false (never nil).
def is_due?
  r = self.reload
  #make sure we're on the right server
  return false unless r.is_on_server?
  return false if r.is_working?
  prev_due_time = Time.now.utc - Jobtracker.runner_read_freq
  #explicit boolean: a trailing "return true if ..." would yield nil when not due
  r.started_at.nil? || r.started_at < prev_due_time
end
is_on_server?()
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 120
# Tells whether this runner's user is assigned to the current server.
#
# The user's resque_server matches when it is '127.0.0.1' or equals
# Jobtracker.current_server.
#
# Returns true or false (never nil).
def is_on_server?
  resque_server = self.user.resque_server
  #return the membership test itself so a mismatch yields false rather than nil
  ['127.0.0.1', Jobtracker.current_server].include?(resque_server)
end
read_gsheet(gdrive_slot)
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 60
# Synchronises the Job records with the rows of this runner's gsheet.
#
# Every usable row updates (or creates) the job at
# "<runner path>/<row name>"; jobs of this runner whose path was not
# seen in the sheet are deleted. Each step is recorded via update_status.
#
# gdrive_slot - passed through to gsheet.
#
# Returns true.
def read_gsheet(gdrive_slot)
  runner = self
  #the " " argument converts line breaks in cells to spaces,
  #then the tsv is turned into an array of hashes
  rows = runner.gsheet(gdrive_slot).to_tsv(" ").tsv_to_hash_array
  required_cols = ['name','active','trigger','stage1']
  seen_jobs = []
  rows.each do |row|
    job_name = row['name']
    #rows whose name starts with "#" are not jobs
    next if job_name.to_s.first == "#"
    #rows missing any required value are skipped
    next if required_cols.any?{|col| row[col].to_s.strip == ""}
    #reuse the runner's job with this name, or find/create it by path
    job = runner.jobs.select{|rj| rj.name == job_name}.first ||
          Job.find_or_create_by_path("#{runner.path}/#{job_name}")
    job.update_from_hash(row)
    runner.update_status("Updated #{job.path} stages at #{Time.now.utc}")
    seen_jobs << job
  end
  #remove jobs that belong to the runner but were not in the sheet
  stale_paths = runner.jobs.map{|j| j.path} - seen_jobs.map{|j| j.path}
  stale_paths.each do |stale_path|
    stale_job = Job.where(:path=>stale_path).first
    stale_job.delete if stale_job
    runner.update_status("Deleted job:#{stale_path}")
  end
  runner.update_status("jobs read at #{Time.now.utc}")
  true
end
update_gsheet(gdrive_slot)
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 89
# Writes job statuses that changed since the last completed run back to
# this runner's gsheet.
#
# gdrive_slot - passed through to gsheet.
#
# Returns false without touching the sheet when the runner has no
# completed_at; otherwise true after the rows are written.
def update_gsheet(gdrive_slot)
  r = self
  #there's nothing to update if runner has never had a completed at
  return false unless r.completed_at
  jobs_gsheet = r.gsheet(gdrive_slot)
  #only jobs whose status changed after their runner last completed
  upd_jobs = r.jobs.select{|j| j.status_at && j.status_at.to_f > j.runner.completed_at.to_f}
  upd_rows = upd_jobs.map do |j|
    uj = {'name'=>j.name, 'status'=>j.status}
    #jobs can only be turned off
    #automatically, not back on
    uj['active'] = false if j.active==false
    uj
  end
  jobs_gsheet.add_or_update_rows(upd_rows)
  r.update_status("gsheet updated")
  true
end
worker()
click to toggle source
# File lib/mobilize-base/models/runner.rb, line 109
# Returns whatever Mobilize::Resque.find_worker_by_path yields for this
# runner's path.
def worker
  Mobilize::Resque.find_worker_by_path(self.path)
end