class Mobilize::Stage

Public Class Methods

find_by_path(path) click to toggle source
# File lib/mobilize-base/models/stage.rb, line 26
def Stage.find_by_path(path)
  s = Stage.where(:path=>path).first
  return s
end
find_or_create_by_path(path) click to toggle source
# File lib/mobilize-base/models/stage.rb, line 20
def Stage.find_or_create_by_path(path)
  s = Stage.where(:path=>path).first
  s = Stage.create(:path=>path) unless s
  return s
end
perform(id,*args) click to toggle source
# File lib/mobilize-base/models/stage.rb, line 45
def Stage.perform(id,*args)
  s = Stage.where(:path=>id).first
  #check to make sure params are parsable
  begin
    param_hash = s.params
    raise ScriptError if param_hash.class!=Hash
  rescue StandardError, ScriptError
    s.fail({'signal'=>500,
            'err_str'=>"Unable to parse stage params, make sure you don't have issues with your quotes, commas, or colons."})
  end
  s.update_attributes(:started_at=>Time.now.utc)
  s.update_status(%{Starting at #{Time.now.utc}})
  #get response by running method
  response = begin
               "Mobilize::#{s.handler.humanize}".constantize.send("#{s.call}_by_stage_path",s.path)
             rescue => exc
               {'err_str'=>"#{exc.to_s}\n#{exc.backtrace.to_a.join("\n")}", 'signal'=>500}
             end
  unless response
    #re-queue self if no response
    s.enqueue!
    return false
  end
  if response['signal'] == 0
    s.complete(response)
  elsif s.retries_done.to_i < s.params['retries'].to_i
    #retry
    s.update_attributes(:retries_done => s.retries_done.to_i + 1, :response => response)
    s.update_status(%{Retry #{s.retries_done.to_s} at #{Time.now.utc}})
    sleep s['delay'].to_i
    s.enqueue!
  else
    #sleep as much as user specifies
    s.fail(response)
  end
  return true
end

Public Instance Methods

complete(response) click to toggle source
# File lib/mobilize-base/models/stage.rb, line 83
def complete(response)
  s = self
  j = s.job
  r = j.runner
  if s.idx == j.stages.length
    #check for any dependent jobs, if there are, enqueue them
    dep_jobs = r.jobs.select do |dj|
                               dj.active==true and
                                 dj.trigger.strip.downcase == "after #{j.name.downcase}"
                             end
    #put begin/rescue so all dependencies run
    dep_jobs.each do |dj|
                    begin
                      dj_methods = dj.stages.map{|ds| "#{ds.handler}.#{ds.call}"}.uniq
                      unless dj.is_working? or (Jobtracker.disabled_methods & dj_methods).length>0
                        dj.stages.first.update_attributes(:retries_done=>0)
                        dj.stages.first.enqueue!
                      end
                    rescue
                      #job won't run if error, log it a failure
                      response = {"err_str" => "Unable to enqueue first stage of #{dj.path}"}
                      dj.stages.first.fail(response)
                    end
                  end
  else
    #queue up next stage
    s.next.update_attributes(:retries_done=>0)
    s.next.enqueue!
  end
  #complete after dependencies are processed
  #to make sure it doesn't enqueue due to runner check
  s.update_attributes(:completed_at=>Time.now.utc,:response=>response)
  s.update_status("Completed at #{Time.now.utc.to_s}")
  #delete error sheet if any
  begin
    err_sheet_name = "#{j.name}_stage#{s.idx.to_s}.err"
    err_sheet_path =  (r.path.split("/")[0..-2] + [err_sheet_name]).join("/")
    #get a slot, any slot
    gdrive_slot = Gdrive.worker_emails.sort_by{rand}.first
    err_sheet = Gsheet.find_by_path(err_sheet_path,gdrive_slot)
    err_sheet.delete if err_sheet
  rescue
    #if it doesn't delete, whatever
  end
  true
end
enqueue!() click to toggle source
# File lib/mobilize-base/models/stage.rb, line 158
def enqueue!
  s = self
  ::Resque::Job.create("mobilize",Stage,s.path,{})
  return true
end
fail(response,gdrive_slot=nil) click to toggle source
# File lib/mobilize-base/models/stage.rb, line 130
def fail(response,gdrive_slot=nil)
  #get random worker if one is not provided
  gdrive_slot ||= Gdrive.worker_emails.sort_by{rand}.first
  s = self
  j = s.job
  r = j.runner
  u = r.user
  s.update_attributes(:failed_at=>Time.now.utc,:response=>response)
  stage_name = "#{j.name}_stage#{s.idx.to_s}.err"
  target_path =  (r.path.split("/")[0..-2] + [stage_name]).join("/")
  status_msg = "Failed at #{Time.now.utc.to_s}"
  #read err txt, add err sheet, write to it
  err_sheet = Gsheet.find_by_path(target_path,gdrive_slot)
  err_sheet.delete if err_sheet
  err_sheet = Gsheet.find_or_create_by_path(target_path,gdrive_slot)
  err_txt = if response['err_url']
              Dataset.read_by_url(response['err_url'],u.name)
            elsif response['err_str']
              response['err_str']
            end
  err_txt = ["response","\n",err_txt].join
  err_sheet.write(err_txt,u.name)
  #exception will be first row below "response" header
  s.update_status(status_msg)
  #raise the exception so it bubbles up to resque
  raise Exception,err_txt
end
is_working?() click to toggle source
# File lib/mobilize-base/models/stage.rb, line 186
def is_working?
  s = self
  Mobilize::Resque.active_paths.include?(s.path)
end
next() click to toggle source
# File lib/mobilize-base/models/stage.rb, line 38
def next
  s = self
  j = s.job
  return nil if s.idx == j.stages.length
  return j.stages[s.idx]
end
prior() click to toggle source
# File lib/mobilize-base/models/stage.rb, line 31
def prior
  s = self
  j = s.job
  return nil if s.idx==1
  return j.stages[s.idx-2]
end
set_worker_args(args) click to toggle source
# File lib/mobilize-base/models/stage.rb, line 174
def set_worker_args(args)
  s = self
  Jobtracker.set_worker_args(s.worker,args)
end
sources(gdrive_slot) click to toggle source
# File lib/mobilize-base/models/stage.rb, line 210
def sources(gdrive_slot)
  #returns an array of Datasets corresponding to
  #items listed as sources in the stage params
  s = self
  params = s.params
  job = s.job
  runner = job.runner
  source_paths = if params['sources']
                   params['sources']
                 elsif params['source']
                   [params['source']]
                 end
  return [] if (source_paths.class!=Array or source_paths.length==0)
  dsts = []
  source_paths.each do |source_path|
    if source_path.index(/^stage[1-5]$/)
      #stage arguments return the stage's output dst url
      source_stage_path = "#{runner.path}/#{job.name}/#{source_path}"
      source_stage = Stage.where(:path=>source_stage_path).first
      source_stage_out_url = source_stage.response['out_url']
      dsts << Dataset.find_by_url(source_stage_out_url)
    else
      handler = if source_path.index("://")
                  source_path.split("://").first
                else
                  s.handler
                end
      begin
        stage_path = s.path
        dsts << "Mobilize::#{handler.downcase.capitalize}".constantize.path_to_dst(source_path,stage_path,gdrive_slot)
      rescue => exc
        raise "Could not get #{source_path} with error: #{exc.to_s}"
      end
    end
  end
  return dsts
end
target() click to toggle source
# File lib/mobilize-base/models/stage.rb, line 191
def target
  s = self
  params = s.params
  target_path = params['target']
  handler,path = target_path.split("://")
  #if the user has specified a url for a target
  #that is not this stage's handler, disallow
  if handler and path and handler != s.handler
    raise "incompatible target handler #{handler} for #{s.handler} stage"
  else
    begin
      #nil gdrive_slot for targets since there is no verification
      return "Mobilize::#{s.handler.downcase.capitalize}".constantize.path_to_dst(target_path,s.path,nil)
    rescue => exc
      raise "Could not get #{target_path} with error: #{exc.to_s}"
    end
  end
end
update_status(msg) click to toggle source
# File lib/mobilize-base/models/stage.rb, line 179
def update_status(msg)
  s = self
  s.update_attributes(:status=>msg,:status_at=>Time.now.utc)
  Mobilize::Resque.set_worker_args_by_path(s.path,{'status'=>msg})
  return true
end
worker() click to toggle source
# File lib/mobilize-base/models/stage.rb, line 164
def worker
  s = self
  Mobilize::Resque.find_worker_by_path(s.path)
end
worker_args() click to toggle source
# File lib/mobilize-base/models/stage.rb, line 169
def worker_args
  s = self
  Jobtracker.get_worker_args(s.worker)
end