class Mobilize::Stage
Public Class Methods
find_by_path(path)
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 26
# Looks up a stage by its path.
#
# path - value matched against the :path attribute
#
# Returns the first result of the :path query (whatever +first+ yields when
# nothing matches, presumably nil).
def self.find_by_path(path)
  Stage.where(:path => path).first
end
find_or_create_by_path(path)
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 20
# Returns the stage stored at +path+, creating it when the lookup finds nothing.
#
# path - value used both for the :path query and for the new record
#
# Returns the existing stage, or the result of Stage.create(:path => path).
def self.find_or_create_by_path(path)
  existing = Stage.where(:path => path).first
  existing || Stage.create(:path => path)
end
perform(id,*args)
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 45
# Runs the stage stored at path +id+ and records the outcome.
# enqueue! queues Stage with (path, {}), so this is presumably the method
# Resque invokes for each queued stage -- confirm against the worker setup.
#
# id   - path of the stage to run
# args - extra job arguments; not read here
#
# Returns false when the handler produced no response and the stage was
# re-queued, true otherwise. The failure branches call Stage#fail, which
# raises, so they do not return normally.
def self.perform(id,*args)
  stage = Stage.where(:path => id).first
  # make sure params are parsable before doing any work
  begin
    parsed_params = stage.params
    raise ScriptError unless parsed_params.class == Hash
  rescue StandardError, ScriptError
    stage.fail({'signal'=>500,
                'err_str'=>"Unable to parse stage params, make sure you don't have issues with your quotes, commas, or colons."})
  end
  stage.update_attributes(:started_at => Time.now.utc)
  stage.update_status(%{Starting at #{Time.now.utc}})
  # run "<call>_by_stage_path" on the stage's handler class; any error it
  # raises is turned into a signal-500 response carrying the backtrace
  response = begin
    handler_class = "Mobilize::#{stage.handler.humanize}".constantize
    handler_class.send("#{stage.call}_by_stage_path", stage.path)
  rescue => exc
    {'err_str'=>"#{exc.to_s}\n#{exc.backtrace.to_a.join("\n")}", 'signal'=>500}
  end
  unless response
    # re-queue self if no response
    stage.enqueue!
    return false
  end
  if response['signal'] == 0
    stage.complete(response)
  elsif stage.retries_done.to_i < stage.params['retries'].to_i
    # retry: bump the counter, keep the failed response, wait, then re-queue
    stage.update_attributes(:retries_done => stage.retries_done.to_i + 1, :response => response)
    stage.update_status(%{Retry #{stage.retries_done.to_s} at #{Time.now.utc}})
    # sleep as much as user specifies
    # NOTE(review): the delay is read from the stage record itself while the
    # retry limit comes from params -- confirm this asymmetry is intended
    sleep stage['delay'].to_i
    stage.enqueue!
  else
    stage.fail(response)
  end
  true
end
Public Instance Methods
complete(response)
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 83
# Marks this stage as completed and moves the workflow forward.
#
# When this is the last stage of its job, every active job on the same runner
# whose trigger is "after <this job's name>" gets its first stage enqueued
# (unless that job is already working or uses a disabled method). Otherwise
# the next stage of this job is enqueued. Afterwards the stage records
# +response+ and a completion status, and any leftover error sheet for this
# stage is deleted on a best-effort basis.
#
# response - the handler response to store on this stage
#
# Returns true.
def complete(response)
  s = self
  j = s.job
  r = j.runner
  if s.idx == j.stages.length
    # check for any dependent jobs, if there are, enqueue them
    dep_jobs = r.jobs.select do |dj|
      dj.active == true && dj.trigger.strip.downcase == "after #{j.name.downcase}"
    end
    # put begin/rescue so all dependencies run
    dep_jobs.each do |dj|
      begin
        dj_methods = dj.stages.map { |ds| "#{ds.handler}.#{ds.call}" }.uniq
        unless dj.is_working? || (Jobtracker.disabled_methods & dj_methods).length > 0
          dj.stages.first.update_attributes(:retries_done => 0)
          dj.stages.first.enqueue!
        end
      rescue
        # job won't run if error, log it a failure.
        # Uses its own local: the original reassigned +response+ here, which
        # clobbered the value stored on this stage further down.
        # NOTE(review): Stage#fail raises Exception, so a failing dependent
        # still aborts the remaining dependents -- confirm that is acceptable.
        enqueue_failure = {"err_str" => "Unable to enqueue first stage of #{dj.path}"}
        dj.stages.first.fail(enqueue_failure)
      end
    end
  else
    # queue up next stage
    s.next.update_attributes(:retries_done => 0)
    s.next.enqueue!
  end
  # complete after dependencies are processed
  # to make sure it doesn't enqueue due to runner check
  s.update_attributes(:completed_at => Time.now.utc, :response => response)
  s.update_status("Completed at #{Time.now.utc.to_s}")
  # delete error sheet if any
  begin
    err_sheet_name = "#{j.name}_stage#{s.idx.to_s}.err"
    err_sheet_path = (r.path.split("/")[0..-2] + [err_sheet_name]).join("/")
    # get a slot, any slot
    gdrive_slot = Gdrive.worker_emails.sort_by { rand }.first
    err_sheet = Gsheet.find_by_path(err_sheet_path, gdrive_slot)
    err_sheet.delete if err_sheet
  rescue
    # if it doesn't delete, whatever
  end
  true
end
enqueue!()
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 158
# Creates a Resque job on the "mobilize" queue for this stage, passing the
# Stage class, this stage's path and an empty hash.
#
# Returns true.
def enqueue!
  ::Resque::Job.create("mobilize", Stage, self.path, {})
  true
end
fail(response,gdrive_slot=nil)
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 130
# Records a failure for this stage, writes the error text to a
# "<job name>_stage<idx>.err" sheet next to the runner, updates the status
# and then raises so the failure bubbles up to the caller.
#
# response    - hash describing the failure; 'err_url' (read through
#               Dataset.read_by_url) takes precedence over 'err_str'
# gdrive_slot - worker email used for the sheet operations; a random entry
#               of Gdrive.worker_emails when omitted
#
# Raises Exception carrying the text written to the error sheet.
def fail(response,gdrive_slot=nil)
  # pick a random worker if one is not provided
  gdrive_slot ||= Gdrive.worker_emails.sort_by { rand }.first
  parent_job = self.job
  runner = parent_job.runner
  owner = runner.user
  self.update_attributes(:failed_at => Time.now.utc, :response => response)
  sheet_name = "#{parent_job.name}_stage#{self.idx.to_s}.err"
  sheet_path = (runner.path.split("/")[0..-2] + [sheet_name]).join("/")
  status_msg = "Failed at #{Time.now.utc.to_s}"
  # drop any previous error sheet, then start a fresh one
  stale_sheet = Gsheet.find_by_path(sheet_path, gdrive_slot)
  stale_sheet.delete if stale_sheet
  err_sheet = Gsheet.find_or_create_by_path(sheet_path, gdrive_slot)
  err_body = if response['err_url']
               Dataset.read_by_url(response['err_url'], owner.name)
             elsif response['err_str']
               response['err_str']
             end
  # exception will be first row below "response" header
  err_txt = ["response", "\n", err_body].join
  err_sheet.write(err_txt, owner.name)
  self.update_status(status_msg)
  # raise the exception so it bubbles up to resque
  raise Exception, err_txt
end
is_working?()
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 186
# Tells whether this stage's path is among Mobilize::Resque.active_paths.
#
# Returns the result of the include? check.
def is_working?
  active = Mobilize::Resque.active_paths
  active.include?(self.path)
end
next()
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 38
# Returns the stage that follows this one in its job, or nil when idx equals
# the number of stages. The lookup is stages[idx], so idx is evidently
# 1-based.
def next
  siblings = self.job.stages
  position = self.idx
  return nil if position == siblings.length
  siblings[position]
end
prior()
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 31
# Returns the stage that precedes this one in its job, or nil when idx is 1.
# The lookup is stages[idx - 2], so idx is evidently 1-based.
def prior
  position = self.idx
  return nil if position == 1
  self.job.stages[position - 2]
end
set_worker_args(args)
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 174
# Hands +args+ to Jobtracker.set_worker_args for this stage's worker.
#
# args - passed through unchanged
#
# Returns whatever Jobtracker.set_worker_args returns.
def set_worker_args(args)
  current_worker = self.worker
  Jobtracker.set_worker_args(current_worker, args)
end
sources(gdrive_slot)
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 210
# Returns an array of datasets corresponding to the items listed as sources
# in the stage params ('sources', or a single 'source').
#
# A source of the exact form "stage1".."stage5" refers to an earlier stage
# of the same job and resolves to the dataset at that stage's 'out_url'.
# Any other source is resolved through path_to_dst on the handler named
# before "://", or on this stage's own handler when there is no "://".
#
# gdrive_slot - passed through to the handler's path_to_dst
#
# Returns [] when no sources are given.
# Raises RuntimeError ("Could not get <source> with error: ...") when a
# source cannot be resolved.
def sources(gdrive_slot)
  s = self
  params = s.params
  job = s.job
  runner = job.runner
  source_paths = if params['sources']
                   params['sources']
                 elsif params['source']
                   [params['source']]
                 end
  return [] if source_paths.class != Array || source_paths.length == 0
  source_paths.map do |source_path|
    # \A and \z anchor the whole string; ^ and $ would also accept a
    # multi-line value that merely contains a "stageN" line
    if source_path.index(/\Astage[1-5]\z/)
      # stage arguments return the stage's output dst url
      source_stage_path = "#{runner.path}/#{job.name}/#{source_path}"
      source_stage = Stage.where(:path => source_stage_path).first
      unless source_stage
        raise "Could not get #{source_path} with error: no stage found at #{source_stage_path}"
      end
      source_response = source_stage.response
      unless source_response
        raise "Could not get #{source_path} with error: stage #{source_stage_path} has no response"
      end
      Dataset.find_by_url(source_response['out_url'])
    else
      handler = if source_path.index("://")
                  source_path.split("://").first
                else
                  s.handler
                end
      begin
        stage_path = s.path
        "Mobilize::#{handler.downcase.capitalize}".constantize.path_to_dst(source_path, stage_path, gdrive_slot)
      rescue => exc
        raise "Could not get #{source_path} with error: #{exc.to_s}"
      end
    end
  end
end
target()
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 191
# Resolves the 'target' entry of the stage params through path_to_dst on
# this stage's handler.
#
# Raises RuntimeError when the target names a handler (the part before
# "://") other than this stage's handler, or when path_to_dst fails.
def target
  target_path = self.params['target']
  scheme, remainder = target_path.split("://")
  # if the user has specified a url for a target
  # that is not this stage's handler, disallow
  if scheme && remainder && scheme != self.handler
    raise "incompatible target handler #{scheme} for #{self.handler} stage"
  end
  begin
    # nil gdrive_slot for targets since there is no verification
    handler_class = "Mobilize::#{self.handler.downcase.capitalize}".constantize
    return handler_class.path_to_dst(target_path, self.path, nil)
  rescue => exc
    raise "Could not get #{target_path} with error: #{exc.to_s}"
  end
end
update_status(msg)
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 179
# Stores +msg+ as this stage's status (with the current UTC time as
# status_at) and passes {'status' => msg} to
# Mobilize::Resque.set_worker_args_by_path for this stage's path.
#
# msg - status text
#
# Returns true.
def update_status(msg)
  self.update_attributes(:status => msg, :status_at => Time.now.utc)
  Mobilize::Resque.set_worker_args_by_path(self.path, {'status' => msg})
  true
end
worker()
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 164
# Returns whatever Mobilize::Resque.find_worker_by_path yields for this
# stage's path.
def worker
  stage_path = self.path
  Mobilize::Resque.find_worker_by_path(stage_path)
end
worker_args()
click to toggle source
# File lib/mobilize-base/models/stage.rb, line 169
# Returns whatever Jobtracker.get_worker_args yields for this stage's worker.
def worker_args
  current_worker = self.worker
  Jobtracker.get_worker_args(current_worker)
end