class BandManager
This class is used to launch new workflows
Public Class Methods
add(wfile, var = [], uid='admin', bucket='manband' , debug=false)
click to toggle source
Add a new workflow based on an input workflow file. wfile: workflow input file; var: optional runtime variables; uid: optional user id; bucket: optional bucket stored with the workflow; debug: activate debug mode, do not execute the commands. @return workflow id
# File lib/manband/bandmanager.rb, line 40
# Register a new workflow, plus one sub workflow per matching input file
# when the root node declares a "url" directory. Nothing is started here.
#
# wfile:: path of the workflow definition file (read with self.load)
# var:: optional runtime variables as "name=value" strings; nil is accepted
# uid:: user id stored on the created workflow records
# bucket:: bucket stored on the created workflow records
# debug:: kept for signature compatibility; not used by this method
#
# Returns the id of the created workflow, or nil when the file cannot be
# loaded or the workflow record cannot be saved.
def self.add(wfile, var = [], uid='admin', bucket='manband' , debug=false)
  fworkflow = self.load(wfile)
  return nil if fworkflow.nil?

  definition = fworkflow["workflow"]
  rvariables = {}
  terminals = 0
  definition.each do |name, node|
    # Entries such as "name" and "description" are plain strings; indexing
    # them with ["command"] would be a substring search, so only hash
    # entries are treated as nodes.
    next unless node.is_a?(Hash)

    command = node["command"]
    next if command.nil?

    # First init with vars defined in workflow ("#var.NAME#" placeholders).
    command.scan(/#var\.(.*?)#/).each { |match| rvariables[match[0]] = nil }

    next unless node["next"].nil?

    # A command node without a "next" entry is a terminal node.
    terminals += 1
    @@log.debug "terminal: #{name}"
  end
  @@log.debug "nb terminals: "+terminals.to_s

  unless var.nil?
    var.each do |rvariable|
      # Split on the first "=" only, so a value may itself contain "=".
      key, value = rvariable.split('=', 2)
      # "name=" keeps mapping to nil, as it did before the split limit.
      value = nil if value == ''
      rvariables[key] = value
    end
    @@log.debug "Using runtime variables: "+rvariables.to_json
  end

  workflow = WorkFlow.new(:uid => uid,
                          :name => definition["name"],
                          :description => definition["description"],
                          :created_at => Time.now,
                          :file => wfile,
                          :terminals => terminals,
                          :status => STATUS_NEW,
                          :workdir => FlowConfig.getjobdir(),
                          :vars => rvariables.to_json,
                          :bucket => bucket)
  return nil if workflow.save == false

  instances = 0
  root = definition["root"]
  nodepath = root["url"]
  # Check if workflow need to be run for many files
  unless nodepath.nil?
    regexp = root["regexp"].nil? ? nil : Regexp.new(root["regexp"])
    # Dir.entries does not leave a directory handle open.
    Dir.entries(nodepath).each do |entry|
      @@log.debug "Test file "+entry
      source = File.join(nodepath, entry)
      next unless (regexp.nil? || regexp.match(entry)) && File.file?(source)

      @@log.debug "New sub workflow"
      instances += 1
      subworkflow = WorkFlow.new(:uid => uid,
                                 :name => definition["name"],
                                 :description => definition["description"],
                                 :created_at => Time.now,
                                 :file => wfile,
                                 :terminals => terminals,
                                 :status => STATUS_NEW,
                                 :workdir => FlowConfig.getjobdir(),
                                 :vars => rvariables.to_json,
                                 :bucket => bucket,
                                 :parent => workflow.id)
      subworkflow.save
      # The link lives in the sub workflow's own "root" directory. The old
      # existence test looked at nodepath+"/root" instead, so this directory
      # could be left missing. mkdir_p is a no-op when it already exists.
      rootdir = File.join(subworkflow.workdir, "root")
      FileUtils.mkdir_p(rootdir)
      File.symlink(source, File.join(rootdir, entry))
      @@log.debug("Add new sub workflow "+subworkflow.id.to_s)
    end
  end
  @@log.debug "Number of instances: "+instances.to_s
  workflow.update(:instances => instances)
  workflow.id
end
launch(wfile, var = [], uid='admin', bucket='manband' , debug=false)
click to toggle source
Launch a new workflow based on an input workflow file. wfile: workflow input file; var: optional runtime variables;
uid: optional user id; debug: activate debug mode, do not execute the commands. @return workflow id
# File lib/manband/bandmanager.rb, line 168
# Register a workflow through self.add, then hand it to self.start.
#
# All arguments are forwarded unchanged to self.add; debug is also forwarded
# to self.start.
#
# Returns the workflow id given by self.add, or nil when self.add returned
# nil (in which case nothing is started).
def self.launch(wfile, var = [], uid='admin', bucket='manband' , debug=false)
  wid = self.add(wfile, var, uid, bucket, debug)
  return nil if wid.nil?

  @@log.debug "Start workflow #{wid}"
  self.start(wid, debug)
  wid
end
launchclone(id)
click to toggle source
Execute a workflow from another one (clone it)
# File lib/manband/bandmanager.rb, line 146
# Create a copy of an existing workflow and request its start.
#
# The copy reuses the uid, name, description, file, terminals, vars and
# bucket of the source workflow, with a fresh creation time, a new job
# directory and status STATUS_NEW.
#
# id:: id of the workflow to clone
#
# Returns the id of the new workflow, or nil when the source workflow does
# not exist or the copy cannot be saved.
def self.launchclone(id)
  workflow = WorkFlow.get(id)
  # Nothing to clone: return nil rather than failing on nil.uid.
  return nil if workflow.nil?

  newworkflow = WorkFlow.new(:uid => workflow.uid,
                             :name => workflow.name,
                             :description => workflow.description,
                             :created_at => Time.now,
                             :file => workflow.file,
                             :terminals => workflow.terminals,
                             :status => STATUS_NEW,
                             :workdir => FlowConfig.getjobdir(),
                             :vars => workflow.vars,
                             :bucket => workflow.bucket)
  # Do not publish a start request for a workflow that was not stored.
  return nil if newworkflow.save == false

  rootjob = Job.new(:wid => newworkflow.id,
                    :node => "root",
                    :command => "",
                    :status => STATUS_NEW,
                    :instances => 0,
                    :maxinstances => 0,
                    :workdir => '')
  rootjob.save
  newworkflow.parse('root', rootjob.id)

  jobmsg = '{ "id" : "'+newworkflow.id.to_s+'", "root" : "'+rootjob.id.to_s+'"}'
  # This job is never saved; it is only the receiver of logMessage.
  job = Job.new(:wid => newworkflow.id,
                :node => "fake",
                :command => "",
                :status => STATUS_FAKE,
                :instances => 0,
                :workdir => '')
  Utils.publish('manband.master', { "operation" => OP_START, "msg" => jobmsg })
  job.logMessage(OP_START, jobmsg)
  newworkflow.id
end
load(wfile)
click to toggle source
Loads a workflow file
# File lib/manband/bandmanager.rb, line 20
# Read a workflow definition from a YAML file.
#
# wfile:: path of the workflow file
#
# Returns the object produced by YAML.load_file, or nil (after logging an
# error) when the file does not exist or cannot be loaded.
def self.load(wfile)
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  unless File.exist?(wfile)
    @@log.error "Workflow file #{wfile} does not exist!"
    return nil
  end

  begin
    YAML.load_file(wfile)
  rescue
    @@log.error "Error while load file"
    nil
  end
end
start(id,debug=false)
click to toggle source
Start a workflow in state NEW. In debug mode, sends an OP_SKIP instead of OP_START.
# File lib/manband/bandmanager.rb, line 116
# Request the start of a workflow on the "manband.master" queue.
#
# When workflows exist whose parent is +id+, each of them is started in
# place of +id+; otherwise +id+ itself is started. For each started
# workflow a root job is created and saved, the workflow is parsed from
# 'root', and one message is published.
#
# id:: id of the workflow to start
# debug:: when true, publish OP_SKIP (with the skip message layout) instead
#         of OP_START
#
# Returns the list of workflow ids that were considered.
def self.start(id,debug=false)
  children = WorkFlow.all(:parent => id)
  wlist = if children.nil? || children.length == 0
            [id]
          else
            children.map { |child| child.id }
          end
  @@log.debug "Execute workflow list: "+wlist.to_s

  wlist.each do |wid|
    workflow = WorkFlow.get(wid)
    # Skip unknown ids so the remaining workflows are still started.
    if workflow.nil?
      @@log.error "Workflow "+wid.to_s+" not found, cannot start it"
      next
    end

    rootjob = Job.new(:wid => workflow.id,
                      :node => "root",
                      :command => "",
                      :status => STATUS_NEW,
                      :instances => 0,
                      :maxinstances => 0,
                      :workdir => '')
    rootjob.save
    workflow.parse('root', rootjob.id)

    # Request workflow management
    if debug
      operation = OP_SKIP
      msg = '{ "id" : "'+rootjob.id.to_s+'","workflow" : "'+workflow.id.to_s+'", "node" : "'+rootjob.id.to_s+'"}'
    else
      operation = OP_START
      msg = '{ "id" : "'+workflow.id.to_s+'", "root" : "'+rootjob.id.to_s+'"}'
    end
    Utils.publish("manband.master", { "operation" => operation, "msg" => msg })
  end
end