class WorkFlow

This class orchestrator the workflow status and the workflow file analysis. It determines if workflow is over, what are the next jobs to execute, …

Public Instance Methods

clean() click to toggle source

Clean a workflow directory

# File lib/manband/workflow.rb, line 268
def clean
  if self.workdir == nil
    return
  end
  if File.directory? self.workdir
    FileUtils.rm_rf(self.workdir)
  end
end
delete(deletesub=true) click to toggle source

Delete workflow, its directory and all its components (jobs, messages,…) deletesub: delete sub workflows too

# File lib/manband/workflow.rb, line 279
def delete(deletesub=true)
  self.clean
  jobs = Job.all(:wid => self.id)
  if jobs!=nil
    jobs.destroy
  end
  links = JobLink.all(:wid => self.id)
  if links!=nil
    links.destroy
  end
  messages = BandMessage.all(:wid => self.id)
  if messages != nil
    messages.destroy
  end
  if deletesub
    subworkflows = WorkFlow.all(:parent => self.id)
    if subworkflows != nil
      subworkflows.each do |sub|
        sub.delete(false)
      end
    end
  end
  self.destroy
end
getnextjobs(curnode) click to toggle source

Get the list of jobs to be run after current node @return an array of node names

# File lib/manband/workflow.rb, line 63
def getnextjobs(curnode)
  #fworkflow = YAML.load_file(@file)
  fworkflow = BandManager.load(@file)
  if fworkflow==nil
    return nil
  end
  if fworkflow["workflow"][curnode]["next"] == nil
    @@log.debug "no next node, this branch is over"
    #isover?
    return nil
  end
  nexts = fworkflow["workflow"][curnode]["next"].split(',')
  if nexts[0].empty?
    @@log.debug "no next node, this branch is over"
    #isover?
    return nil
  end
  return nexts
end
getnodecommand(curnode) click to toggle source

Return a list of commands for the node in the workflow There is one command per input file matching regular expresssions, if any. Default is 1 command.

# File lib/manband/workflow.rb, line 161
def getnodecommand(curnode)
  #fworkflow = YAML.load_file(@file)
  fworkflow = BandManager.load(@file)
  if fworkflow==nil
    return nil
  end
  maincommand = fworkflow["workflow"][curnode]["command"]
  # Manage node regexp
  exprs = maincommand.scan(/#(node|var)\.(.*?)#/)
  if exprs.length == 0
    return [ fworkflow["workflow"][curnode]["command"] ]
  end
  subnodefilelist = Hash.new
  multinode=nil
  for reg in 0..exprs.length-1
    @@log.debug "Expr "+": "+exprs[reg][0]+" "+exprs[reg][1]
    if exprs[reg][0] == "var"
      maincommand = setruntimevars(exprs[reg][1],maincommand)
      if maincommand == nil
        @@log.error "Runtime var "+exprs[reg][1]+" is not defined for  workflow "+@id.to_s
        return nil
      end
    end
    if exprs[reg][0] == "node"
      # Get regexp for this node
      subnode = exprs[reg][1]
      # If regexp is empty, we jsut want the directory
      if fworkflow["workflow"][curnode][subnode]==nil
        return nil
      end
      if fworkflow["workflow"][curnode][subnode]['regexp'].strip == ''
        subnoderegexp = nil
      else
        subnoderegexp = Regexp.new(fworkflow["workflow"][curnode][subnode]['regexp'])
      end
      # List all files for this node regexp
      if subnode.match(/local/)
        # Local files reference
        nodepath = fworkflow["workflow"][curnode][subnode]['url']
      else
        # An other node reference
        if subnode == 'root'
          nodepath = self.workdir+'/root';
        else
          subjob = Job.first(:wid => @id, :node => subnode)
          nodepath = subjob.workdir
        end
      end
      
      filelist = Array.new
      if subnoderegexp == nil
        filelist.push(nodepath+"/")
      else
        if !File.exists?(nodepath)
          @@log.error("path does not exists!")
          return nil
        end
        Dir.new(nodepath).entries.each do |n|
          if subnoderegexp.match(n)
            filelist.push(nodepath+"/"+n)
          end
        end
      end
      @@log.debug "File list "+subnode+": "+filelist.to_s
      # update file list per node regexp in the command
      subnodefilelist[subnode]=filelist 
      if filelist.length>1
        if multinode!=nil
          # We do not support multiple lists in same command (N*N*N*....)
          return nil
        else
          multinode = subnode
        end
      end
    end
  end

  # TODO manage local URI regexp (or remote)
  # If remote: list then download

  # Now create an array of command with file lists substitution
  commands = Array.new
  subnodefilelist.each do |key,slist|
    if slist[0] == nil
      return nil
    end
    if key != multinode
      # replace, one match only allowed here
      maincommand["#node."+key+"#"]= slist[0]
    end
  end
  if multinode!=nil
    @@log.debug "Multinode: "+multinode
    subnodefilelist[multinode].each do |file|
      newcommand = String.new(maincommand)
      @@log.debug "Command: "+newcommand
      newcommand["#node."+multinode+"#"]= file
      commands.push(newcommand)
    end
  else
    commands.push(maincommand)
  end
  @@log.debug "Commands: "+commands.to_s
  return commands
end
isover?() click to toggle source

Checks if a workflow is over, e.g. we have reached all the terminal nodes (leafs).

# File lib/manband/workflow.rb, line 41
def isover?
  # decrement terminals
  @terminals = @terminals - 1
  curw = nil
  # Use lock if MYSQL
  if ENV['MYSQL_URL'].include?("mysql")
    DataMapper.repository(:default).adapter.execute("UPDATE work_flows SET terminals = terminals - 1 WHERE id="+@id.to_s);
    curw = WorkFlow.get(@id)
  else
    curw = WorkFlow.get(@id)
    curw.update(:terminals => @terminals)
  end
  if curw.terminals <=0
    @@log.info "Workflow "+@id.to_s+" is over"
    curw.update(:terminated_at => Time.now, :status => STATUS_OVER)
    return true
  end
  return false
end
parse(curnode, id = nil) click to toggle source

Parse workflow file and create jobs and links in the database curnode: current node id: id of the node as link originator

# File lib/manband/workflow.rb, line 86
def parse(curnode, id = nil)
  #fworkflow = YAML.load_file(@file)
  fworkflow = BandManager.load(@file)
  if fworkflow==nil
    return nil
  end
  jobs = getnextjobs(curnode)
  if jobs == nil
    return
  end
  jobs.each do |job|
    if Job.count(:wid => @id, :node => job) == 0
      queue = ""
      if fworkflow["workflow"][job]["queue"]!=nil
        queue = fworkflow["workflow"][job]["queue"]
      end
      status = STATUS_NEW
      if fworkflow["workflow"][job]["breakpoint"]!=nil
        @@log.debug "Node "+job+" has a breakpoint set"
        status = STATUS_SUSPEND
      end
      store = STORE_NO
      if (!fworkflow["options"].nil? && fworkflow["options"]["store"] == "all") || fworkflow["workflow"][job]["store"] == true
        @@log.debug "Add store option for job "+job
        store = STORE_DO
      end
      type = ACTOR
      if fworkflow["workflow"][job]["type"]=='if'
        type = IF_ACTOR
      end
      if fworkflow["workflow"][job]["type"]=='interactive'
        type = INTERACTIVE_ACTOR
      end
      workdir = FlowConfig.getjobdir(@workdir)
      if curnode == "root"
        workdir = self.workdir + "/root";
      end
      newjob = Job.new(:wid => @id, :node => job, :command => "", :status => status, :instances => 0, :maxinstances => 0, :queue => queue, :workdir => FlowConfig.getjobdir(@workdir), :store => store, :type => type)
      newjob.save
      if id != nil
        @@log.debug "Add link "+id.to_s+"->"+newjob.id.to_s+","+newjob.node
        link = JobLink.new(:wid => @id, :from => id, :to => newjob.id)
        link.save
      end
      parse(job,newjob.id)
    else
      if id!=nil
        # Already declared, just add link
        linkedjob = Job.first(:wid => @id, :node => job)
        @@log.debug "Add link "+id.to_s+"->"+linkedjob.id.to_s+","+linkedjob.node
        link = JobLink.new(:wid => @id, :from => id, :to => linkedjob.id)
        link.save
      end
    end
  end
end
setruntimevars(varexpr,command) click to toggle source

Update in command the runtime vars return modified command

# File lib/manband/workflow.rb, line 145
def setruntimevars(varexpr,command)
  newcommand = String.new(command)
  runtimevars = JSON.parse(self.vars)
  if runtimevars[varexpr]!=nil
    @@log.debug "Replace in command #var."+varexpr+"# by "+runtimevars[varexpr]
    newcommand["#var."+varexpr+"#"]= runtimevars[varexpr]
  else
    # runtime var is node defined
    return nil
  end
  return newcommand
end