class BandManager

This class is used to launch new workflows

Public Class Methods

add(wfile, var = [], uid='admin', bucket='manband' , debug=false) click to toggle source

Add a new workflow based on input workflow object wfile: workflow input file var: optional runtime variables uid: optional user id debug: activate debug mode, do not execute the commands @return workflow id

# File lib/manband/bandmanager.rb, line 40
def self.add(wfile, var = [], uid='admin', bucket='manband' , debug=false)
  rvariables = Hash.new
  #fworkflow = YAML.load_file(wfile)
  fworkflow = self.load(wfile)
  if fworkflow==nil
    return nil
  end
  terminals = 0
  fworkflow["workflow"].each do |node|
    if node[1]["command"]!=nil
      # First init with vars defined in workflow
      exprs = node[1]["command"].scan(/#var\.(.*?)#/)
      if exprs.length > 0
        for reg in 0..exprs.length-1
          rvariables[exprs[reg][0]] = nil
        end
      end
      if node[1]["next"]==nil
        # This is a terminal node
        terminals += 1
        @@log.debug "terminal: "+node[0]
      end
    end
  end
  @@log.debug "nb terminals: "+terminals.to_s


  if var!=nil
    var.each do |rvariable|
      rvardef = rvariable.split('=')
      rvariables[rvardef[0]]=rvardef[1]
    end
    @@log.debug "Using runtime variables: "+rvariables.to_json
  end

  workflow = WorkFlow.new(:uid => uid , :name => fworkflow["workflow"]["name"], :description => fworkflow["workflow"]["description"], :created_at => Time.now, :file => wfile, :terminals => terminals, :status => STATUS_NEW, :workdir => FlowConfig.getjobdir(), :vars => rvariables.to_json, :bucket => bucket)

  err =workflow.save

  if err == false
    return nil
  end

  instances = 0
  regexp = nil
  # Check if workflow need to be run for many files
  if fworkflow["workflow"]["root"]["url"]!=nil
    nodepath = fworkflow["workflow"]["root"]["url"]
    if fworkflow["workflow"]["root"]["regexp"]!=nil
      regexp = Regexp.new(fworkflow["workflow"]["root"]["regexp"])
    end
      filelist = Array.new
      Dir.new(nodepath).entries.each do |n|
        @@log.debug "Test file "+n
        if (regexp==nil || regexp.match(n)) && File.file?(nodepath+"/"+n)
          @@log.debug "New sub workflow"
          instances += 1
          subworkflow = WorkFlow.new(:uid => uid , :name => fworkflow["workflow"]["name"], :description => fworkflow["workflow"]["description"], :created_at => Time.now, :file => wfile, :terminals => terminals, :status => STATUS_NEW, :workdir => FlowConfig.getjobdir(), :vars => rvariables.to_json, :bucket => bucket, :parent => workflow.id)
          err = subworkflow.save
          if !File.exists?(nodepath+"/root")
            FileUtils.mkdir_p  subworkflow.workdir+"/root"
          end
          File.symlink(nodepath+"/"+n,subworkflow.workdir+"/root/"+n)
          @@log.debug("Add new sub workflow "+subworkflow.id.to_s)            
        end
      end
   end
   @@log.debug "Number of instances: "+instances.to_s
   workflow.update(:instances => instances)


  return workflow.id
end
launch(wfile, var = [], uid='admin', bucket='manband' , debug=false) click to toggle source

Launch a new workflow based on input workflow object wfile: workflow input file

uid: optional user id debug: activate debug mode, do not execute the commands @return workflow id

# File lib/manband/bandmanager.rb, line 168
def self.launch(wfile, var = [], uid='admin', bucket='manband' , debug=false)
  workflow = self.add(wfile,var,uid,bucket,debug)
  if workflow==nil
    return nil
  end
  @@log.debug "Start workflow "+workflow.to_s
  self.start(workflow,debug)
  return workflow
end
launchclone(id) click to toggle source

Execute a workflow from an other one (clone it)

# File lib/manband/bandmanager.rb, line 146
def self.launchclone(id)
  workflow = WorkFlow.get(id)
  newworkflow = WorkFlow.new(:uid => workflow.uid , :name => workflow.name, :description => workflow.description, :created_at => Time.now, :file => workflow.file, :terminals => workflow.terminals, :status => STATUS_NEW, :workdir => FlowConfig.getjobdir(), :vars => workflow.vars, :bucket => workflow.bucket)
  newworkflow.save
  rootjob = Job.new(:wid => newworkflow.id, :node => "root", :command => "", :status => STATUS_NEW, :instances => 0, :maxinstances => 0, :workdir => '')
  rootjob.save
  newworkflow.parse('root',rootjob.id)
  jobmsg = '{ "id" : "'+newworkflow.id.to_s+'", "root" : "'+rootjob.id.to_s+'"}'
  job = Job.new(:wid => newworkflow.id, :node => "fake", :command => "", :status => STATUS_FAKE, :instances => 0, :workdir => '')
  
  Utils.publish('manband.master', { "operation" => OP_START, "msg" => jobmsg })
  job.logMessage(OP_START,jobmsg)

  return newworkflow.id
end
load(wfile) click to toggle source

Loads a workflow file

# File lib/manband/bandmanager.rb, line 20
def self.load(wfile) 
  if !File.exists?(wfile)
    @@log.error "Workflow file "+wfile+" does not exist!"
    return nil
  end
  begin
    workflow = YAML.load_file(wfile)
  rescue
    @@log.error "Error while load file"
    return nil
  end
  return workflow
end
start(id,debug=false) click to toggle source

Start a workflow in state NEW In debug mode sends a OP_SKIP instead of OP_START

# File lib/manband/bandmanager.rb, line 116
def self.start(id,debug=false)
  workflow = WorkFlow.get(id)
  wlist = Array.new
  wflows = WorkFlow.all(:parent => id)
  if wflows == nil || wflows.length==0
    wlist.push(id)
  else
    wflows.each do |wflow|
      wlist.push wflow.id
    end
  end

  @@log.debug "Execute workflow list: "+wlist.to_s
  wlist.each do |wflow|
    workflow = WorkFlow.get(wflow)
    rootjob = Job.new(:wid => workflow.id, :node => "root", :command => "", :status => STATUS_NEW, :instances => 0, :maxinstances => 0, :workdir => '')
    rootjob.save
    workflow.parse('root',rootjob.id)
    # Request workflow management
    msg = '{ "id" : "'+workflow.id.to_s+'", "root" : "'+rootjob.id.to_s+'"}'
    if debug
      msg = '{ "id" : "'+rootjob.id.to_s+'","workflow" : "'+workflow.id.to_s+'", "node" : "'+rootjob.id.to_s+'"}'
      Utils.publish("manband.master", { "operation" => OP_SKIP, "msg" => msg })
    else
      Utils.publish("manband.master", { "operation" => OP_START, "msg" => msg })
    end
  end
end