class WorkFlow
This class orchestrator the workflow status and the workflow file analysis. It determines if workflow is over, what are the next jobs to execute, …
Public Instance Methods
Clean a workflow directory
# File lib/manband/workflow.rb, line 268 def clean if self.workdir == nil return end if File.directory? self.workdir FileUtils.rm_rf(self.workdir) end end
Delete workflow, its directory and all its components (jobs, messages,…) deletesub: delete sub workflows too
# File lib/manband/workflow.rb, line 279 def delete(deletesub=true) self.clean jobs = Job.all(:wid => self.id) if jobs!=nil jobs.destroy end links = JobLink.all(:wid => self.id) if links!=nil links.destroy end messages = BandMessage.all(:wid => self.id) if messages != nil messages.destroy end if deletesub subworkflows = WorkFlow.all(:parent => self.id) if subworkflows != nil subworkflows.each do |sub| sub.delete(false) end end end self.destroy end
Get the list of jobs to be run after current node @return an array of node names
# File lib/manband/workflow.rb, line 63 def getnextjobs(curnode) #fworkflow = YAML.load_file(@file) fworkflow = BandManager.load(@file) if fworkflow==nil return nil end if fworkflow["workflow"][curnode]["next"] == nil @@log.debug "no next node, this branch is over" #isover? return nil end nexts = fworkflow["workflow"][curnode]["next"].split(',') if nexts[0].empty? @@log.debug "no next node, this branch is over" #isover? return nil end return nexts end
Return a list of commands for the node in the workflow There is one command per input file matching regular expresssions, if any. Default is 1 command.
# File lib/manband/workflow.rb, line 161 def getnodecommand(curnode) #fworkflow = YAML.load_file(@file) fworkflow = BandManager.load(@file) if fworkflow==nil return nil end maincommand = fworkflow["workflow"][curnode]["command"] # Manage node regexp exprs = maincommand.scan(/#(node|var)\.(.*?)#/) if exprs.length == 0 return [ fworkflow["workflow"][curnode]["command"] ] end subnodefilelist = Hash.new multinode=nil for reg in 0..exprs.length-1 @@log.debug "Expr "+": "+exprs[reg][0]+" "+exprs[reg][1] if exprs[reg][0] == "var" maincommand = setruntimevars(exprs[reg][1],maincommand) if maincommand == nil @@log.error "Runtime var "+exprs[reg][1]+" is not defined for workflow "+@id.to_s return nil end end if exprs[reg][0] == "node" # Get regexp for this node subnode = exprs[reg][1] # If regexp is empty, we jsut want the directory if fworkflow["workflow"][curnode][subnode]==nil return nil end if fworkflow["workflow"][curnode][subnode]['regexp'].strip == '' subnoderegexp = nil else subnoderegexp = Regexp.new(fworkflow["workflow"][curnode][subnode]['regexp']) end # List all files for this node regexp if subnode.match(/local/) # Local files reference nodepath = fworkflow["workflow"][curnode][subnode]['url'] else # An other node reference if subnode == 'root' nodepath = self.workdir+'/root'; else subjob = Job.first(:wid => @id, :node => subnode) nodepath = subjob.workdir end end filelist = Array.new if subnoderegexp == nil filelist.push(nodepath+"/") else if !File.exists?(nodepath) @@log.error("path does not exists!") return nil end Dir.new(nodepath).entries.each do |n| if subnoderegexp.match(n) filelist.push(nodepath+"/"+n) end end end @@log.debug "File list "+subnode+": "+filelist.to_s # update file list per node regexp in the command subnodefilelist[subnode]=filelist if filelist.length>1 if multinode!=nil # We do not support multiple lists in same command (N*N*N*....) return nil else multinode = subnode end end end end # TODO manage local URI regexp (or remote) # If remote: list then download # Now create an array of command with file lists substitution commands = Array.new subnodefilelist.each do |key,slist| if slist[0] == nil return nil end if key != multinode # replace, one match only allowed here maincommand["#node."+key+"#"]= slist[0] end end if multinode!=nil @@log.debug "Multinode: "+multinode subnodefilelist[multinode].each do |file| newcommand = String.new(maincommand) @@log.debug "Command: "+newcommand newcommand["#node."+multinode+"#"]= file commands.push(newcommand) end else commands.push(maincommand) end @@log.debug "Commands: "+commands.to_s return commands end
Checks if a workflow is over, e.g. we have reached all the terminal nodes (leafs).
# File lib/manband/workflow.rb, line 41 def isover? # decrement terminals @terminals = @terminals - 1 curw = nil # Use lock if MYSQL if ENV['MYSQL_URL'].include?("mysql") DataMapper.repository(:default).adapter.execute("UPDATE work_flows SET terminals = terminals - 1 WHERE id="+@id.to_s); curw = WorkFlow.get(@id) else curw = WorkFlow.get(@id) curw.update(:terminals => @terminals) end if curw.terminals <=0 @@log.info "Workflow "+@id.to_s+" is over" curw.update(:terminated_at => Time.now, :status => STATUS_OVER) return true end return false end
Parse workflow file and create jobs and links in the database curnode: current node id: id of the node as link originator
# File lib/manband/workflow.rb, line 86 def parse(curnode, id = nil) #fworkflow = YAML.load_file(@file) fworkflow = BandManager.load(@file) if fworkflow==nil return nil end jobs = getnextjobs(curnode) if jobs == nil return end jobs.each do |job| if Job.count(:wid => @id, :node => job) == 0 queue = "" if fworkflow["workflow"][job]["queue"]!=nil queue = fworkflow["workflow"][job]["queue"] end status = STATUS_NEW if fworkflow["workflow"][job]["breakpoint"]!=nil @@log.debug "Node "+job+" has a breakpoint set" status = STATUS_SUSPEND end store = STORE_NO if (!fworkflow["options"].nil? && fworkflow["options"]["store"] == "all") || fworkflow["workflow"][job]["store"] == true @@log.debug "Add store option for job "+job store = STORE_DO end type = ACTOR if fworkflow["workflow"][job]["type"]=='if' type = IF_ACTOR end if fworkflow["workflow"][job]["type"]=='interactive' type = INTERACTIVE_ACTOR end workdir = FlowConfig.getjobdir(@workdir) if curnode == "root" workdir = self.workdir + "/root"; end newjob = Job.new(:wid => @id, :node => job, :command => "", :status => status, :instances => 0, :maxinstances => 0, :queue => queue, :workdir => FlowConfig.getjobdir(@workdir), :store => store, :type => type) newjob.save if id != nil @@log.debug "Add link "+id.to_s+"->"+newjob.id.to_s+","+newjob.node link = JobLink.new(:wid => @id, :from => id, :to => newjob.id) link.save end parse(job,newjob.id) else if id!=nil # Already declared, just add link linkedjob = Job.first(:wid => @id, :node => job) @@log.debug "Add link "+id.to_s+"->"+linkedjob.id.to_s+","+linkedjob.node link = JobLink.new(:wid => @id, :from => id, :to => linkedjob.id) link.save end end end end
Update in command the runtime vars return modified command
# File lib/manband/workflow.rb, line 145 def setruntimevars(varexpr,command) newcommand = String.new(command) runtimevars = JSON.parse(self.vars) if runtimevars[varexpr]!=nil @@log.debug "Replace in command #var."+varexpr+"# by "+runtimevars[varexpr] newcommand["#var."+varexpr+"#"]= runtimevars[varexpr] else # runtime var is node defined return nil end return newcommand end