class Job

This class manages the job execution and its status on a job handler.

Public Class Methods

debug(mode) click to toggle source

Sets the debug mode. mode: boolean

# File lib/manband/job.rb, line 32
# Stores the debug flag in the class variable @@debug.
# logMessage only saves a BandMessage when this flag is exactly true.
# mode:: boolean
def self.debug(mode)
  @@debug = mode
end

Public Instance Methods

ack() click to toggle source
# File lib/manband/job.rb, line 47
# Acknowledges the message this job was created from, using the channel and
# tag previously stored by setAck.
# Returns whatever the channel's acknowledge call returns.
def ack
  # Second argument is presumably the AMQP "multiple" flag -- TODO confirm
  multiple = false
  @channel.acknowledge(@tag, multiple)
end
error!(instance = 0) click to toggle source

Sets the job to ERROR status. instance: number of the job instance that failed

# File lib/manband/job.rb, line 233
# Marks this job, its database record and its workflow as failed.
# The failing instance number is appended to the JSON list stored in the
# record's error field.
# instance:: number of the job instance in fault (defaults to 0)
# Returns the result of the workflow update.
def error!(instance = 0)
  @status = STATUS_ERROR
  record = Job.get(@id)
  failed_instances = JSON.parse(record.error) << instance
  record.update(:status => STATUS_ERROR, :error => failed_instances.to_json)
  WorkFlow.get(@wid).update(:status => STATUS_ERROR)
end
finish() click to toggle source

Increments the instance counter. If the workflow is in suspend status, the job is suspended at the end of its processing.

# File lib/manband/job.rb, line 154
# Records that one instance of this job has finished.
#
# The instance counter is incremented with a single SQL UPDATE
# (instances = instances + 1) rather than a read-modify-write on the record.
# * Skipped jobs (STATUS_SKIP) are left untouched.
# * If the workflow is suspended, the job is put in STATUS_SUSPEND too.
# * If the job itself is suspended (breakpoint), only the counter changes.
# * Otherwise the job record is set to STATUS_OVER once the counter has
#   reached maxinstances.
#
# Values are passed to the adapter as bind parameters, so the status constant
# does not have to be a String and no SQL is assembled by concatenation.
# The return value is not meaningful.
def finish
  return if @status == STATUS_SKIP

  workflow = WorkFlow.get(@wid)
  adapter = DataMapper.repository(:default).adapter
  if workflow.status == STATUS_SUSPEND
    @status = STATUS_SUSPEND
    adapter.execute('UPDATE jobs SET instances = instances + 1, status = ? WHERE id = ?', STATUS_SUSPEND, @id)
  elsif @status == STATUS_SUSPEND
    # Job set in suspend mode (breakpoint), keep it in this status
    adapter.execute('UPDATE jobs SET instances = instances + 1 WHERE id = ?', @id)
  else
    @status = STATUS_OVER
    adapter.execute('UPDATE jobs SET instances = instances + 1 WHERE id = ?', @id)
    # TODO add a lock for update
    # Re-read the record to see the counter value written by the UPDATE above.
    curjob = Job.get(@id)
    curjob.update(:status => STATUS_OVER) if curjob.instances >= curjob.maxinstances
  end
end
isover?() click to toggle source

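Determines whether the job is over: returns true when its status is OVER or SKIP. The comparison of the instance counter to the maximum number of instances is done in finish, which sets the OVER status.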

# File lib/manband/job.rb, line 185
# Tells whether this job needs no further processing.
# Returns true when the in-memory status is STATUS_OVER or STATUS_SKIP,
# false for any other status.
def isover?
  @status == STATUS_OVER || @status == STATUS_SKIP
end
logMessage(operation,msg) click to toggle source

If debug, log the message in the database

# File lib/manband/job.rb, line 336
# Persists a trace of a sent message when debug mode is on.
# Nothing is saved unless @@debug is exactly true.
# operation:: kind of message (concatenated as a String)
# msg:: message body (concatenated as a String)
# Returns the result of BandMessage#save, or nil when debug is off.
def logMessage(operation,msg)
  return unless @@debug == true

  payload = '{ "operation" => '+operation+', "msg" => '+msg+' }'
  BandMessage.new(:wid => @wid, :message => payload).save
end
resume() click to toggle source

Resumes a job. A suspended job is set to OVER status and the next jobs are run; a job in ERROR status is restarted from scratch.

# File lib/manband/job.rb, line 197
# Resumes this job according to its current status.
# * STATUS_SUSPEND: the job is considered done; it is set to STATUS_OVER and
#   the following jobs are scheduled through runnext.
# * STATUS_ERROR: the whole job is restarted: its record is reset (commands,
#   counters, new work directory, empty error list) and one OP_RUN message is
#   sent per command.
# Any other status is ignored.
def resume
  workflow = WorkFlow.get(@wid)
  if @status == STATUS_SUSPEND
    @@log.debug "Resume from suspended job #{@id} for workflow #{@wid}"
    @status = STATUS_OVER
    Job.get(@id).update(:status => STATUS_OVER)
    runnext
  elsif @status == STATUS_ERROR
    @@log.debug "Resume from error job #{@id} for workflow #{@wid}"
    @status = STATUS_RUNNING
    record = Job.get(@id)
    commands = workflow.getnodecommand(record.node)
    if commands.nil?
      @@log.error "Could not get command for node "+record.node
      return
    end
    # Restart whole job, use new tmp dir, reset errors
    record.update(
      :status => STATUS_RUNNING,
      :command => commands.to_json,
      :instances => 0,
      :workdir => FlowConfig.getjobdir(workflow.workdir),
      :error => "[]",
      :maxinstances => commands.length
    )
    # Instances are numbered from 1 when there are several commands, and the
    # single instance of a one-command job is numbered 0.
    first_instance = commands.length > 1 ? 1 : 0
    commands.each_with_index do |_command, offset|
      instance = first_instance + offset
      jobmsg = '{ "workflow" : "'+@wid.to_s+'" ,  "node" : "'+@node+'", "id" : "'+@id.to_s+'", "instance" : "'+instance.to_s+'" }'
      sendmessage(OP_RUN, jobmsg, record.queue)
    end
  end
end
run(curhandler,instance=0) click to toggle source

Updates the job status and launches the command locally. Sends a finish message or an error message according to the job status. If storage is needed, the job sends a finish AND a store message; storage then occurs in parallel with the rest of the workflow.

# File lib/manband/job.rb, line 69
# Marks the job as running on a handler and executes its command in the
# EventMachine deferred thread pool.
#
# curhandler:: identifier of the handler running the job (stored via to_s)
# instance:: instance number; when greater than 0 the command runs in the
#            "node<instance>" sub-directory of the job work directory
#
# When the command is over, one of the following is sent with sendmessage:
# * OP_ERROR when the exit code is nil or greater than 0;
# * nothing for an INTERACTIVE_ACTOR job (only a debug log is written);
# * otherwise OP_FINISH, preceded by OP_STORE when the job's store flag is
#   STORE_DO or STORE_ERROR.
# The incoming message is acknowledged last, in every case.
#
# A suspended job (STATUS_SUSPEND) keeps its status; only the handler is
# recorded.
def run(curhandler,instance=0)
  curjob = Job.get(@id)
  if @status != STATUS_SUSPEND
    @status = STATUS_RUNNING
    curjob.update(:status => STATUS_RUNNING, :handler => curhandler.to_s)
  else
    curjob.update(:handler => curhandler.to_s)
  end
  workdir = curjob.workdir
  workdir = workdir + "/node" + instance.to_s if instance > 0

  EventMachine.defer do
    err = runcommand(workdir, instance)
    # In the case of IF_ACTOR, the exit code is not an error but the job to
    # select. Reset the error code after updating the type.
    # A nil exit code (no exit status available) cannot select a branch: it
    # is left untouched so that it is reported as an error below instead of
    # raising on nil + 1 and never answering nor acknowledging.
    if @type == IF_ACTOR && !err.nil?
      @type = err + 1
      curjob.update(:type => @type)
      err = 0
    end

    failed = err.nil? || err > 0
    if !failed && @type == INTERACTIVE_ACTOR
      @@log.debug "Interactive job, job will take in charge the answer and storage"
    else
      jobmsg = '{ "workflow" : "'+@wid.to_s+'" ,  "node" : "'+@node+'", "id" : "'+@id.to_s+'", "handler" : "'+curhandler.to_s+'", "instance" : "'+instance.to_s+'" }'
      if failed
        # An error occured
        sendmessage(OP_ERROR, jobmsg)
      else
        if curjob.store == STORE_DO || curjob.store == STORE_ERROR
          workflow = WorkFlow.get(curjob.wid)
          storemsg = '{ "id" : "'+curjob.id.to_s+'", "bucket" : "'+workflow.bucket+'" }'
          sendmessage(OP_STORE, storemsg)
        end
        sendmessage(OP_FINISH, jobmsg)
      end
    end
    ack
  end # End EventMachine
end
runcommand(workdir,instance) click to toggle source

Executes the command locally, creating directories and setting environment variables to an empty string for security. workdir: job working directory. instance: instance number in the list of commands.

# File lib/manband/job.rb, line 126
# Executes the command of one job instance through the shell and returns its
# exit status.
#
# workdir:: job working directory; created with mkdir -p and used as current
#           directory and as WORKDIR
# instance:: instance number. Instances are numbered from 1 when the job has
#            several commands (instance N runs the N-th command) and 0 when
#            it has a single command.
#
# The command line first blanks AMQP_URL and MYSQL_URL and sets WID, JID,
# INSTANCE and WORKDIR. When FlowConfig.sudo is true the command is written
# to a script in the workflow directory and run with sudo as the workflow
# user.
#
# Returns $?.exitstatus, which is nil when no exit status is available.
def runcommand(workdir,instance)
  initcmd = "AMQP_URL='' && MYSQL_URL='' && WID="+@wid.to_s+" && JID="+@id.to_s+" && INSTANCE="+instance.to_s+" && mkdir -p "+workdir+" && cd "+workdir+" && WORKDIR="+workdir+" && "
  curjob = Job.get(@id)
  command = JSON.parse(curjob.command)
  # Pick the command matching the instance: 1-based for multi-command jobs,
  # the only command when instance is 0.
  index = instance > 0 ? instance - 1 : 0
  cmd = initcmd+command[index]
  if FlowConfig.sudo() == true
    wflow = WorkFlow.get(@wid)
    script = wflow.workdir+"/"+self.id.to_s+"-"+instance.to_s+".sh"
    unless File.exist?(wflow.workdir)
      Dir.mkdir(wflow.workdir)
      # Argument-list form: no shell involved, so names are passed verbatim.
      system("chown", "-R", wflow.uid, wflow.workdir)
    end
    File.open(script, 'w') { |f| f.write("#!/bin/bash\n"+cmd) }
    File.chmod(0755, script)
    cmd = "sudo -u "+wflow.uid+" '"+script+"'"
  end
  @@log.debug cmd
  system(cmd)
  return $?.exitstatus
end
runnext(skip=false) click to toggle source

Run next jobs

# File lib/manband/job.rb, line 244
# Schedules the jobs that follow this one in the workflow.
#
# First looks at the incoming links (JobLink rows whose :to is this job).
# With zero or one parent nothing is checked; with several parents every
# parent job must be in STATUS_OVER, otherwise nothing is scheduled.
# Then, for each outgoing link, the target job record is reset (command,
# status, instance counters, work directory) and one message per command is
# sent to the job's queue.
#
# skip:: when true, every following job is set to STATUS_SKIP and receives
#        an OP_SKIP message instead of OP_RUN
#
# Returns false when a parent job is not over yet, true when there is no
# following job (after calling workflow.isover?), and nil when the command of
# a following node cannot be obtained (that job and the workflow are then set
# to STATUS_ERROR). Otherwise the value of nexts.each is returned.
def runnext(skip=false)
  workflow = WorkFlow.get(@wid)
  # Are all previous jobs over?
  previous = JobLink.all(:to => @id)
  #nexts = workflow.getnextjobs(@node)
  if previous!=nil
    @@log.debug "Checking previous jobs: "+previous.length.to_s
  end
  if previous == nil || previous.length==0
    # This is fine, must be a root node
  elsif previous.length==1
    # Only 1 parent. As we got the message, previous is over
  else
    # Check each job
    # NOTE(review): a parent in STATUS_SKIP is also "not over" for this test
    # and blocks scheduling -- confirm this is intended.
    previous.each do |link|
      pjob = Job.get(link.from)
      if pjob.status != STATUS_OVER
        @@log.debug "At least one previous job is not over ("+pjob.id.to_s+"), wait for next message"
        # Leaves runnext itself, not only the block
        return false
      end
    end
    @@log.debug "All previous jobs are over, continue...."
  end

  # Look at next jobs
  nexts = JobLink.all(:from => @id)
  #nexts = workflow.getnextjobs(@node)
  if nexts == nil || nexts.length==0
    # No follower: this is a leaf of the workflow
    workflow.isover?
    return true
  end
  # Which next job is it?
  # Start at 1 because 0 is a valid status code, consider if type from value 1
  count = 1
  # For each job to follow
  nexts.each do |link|
  job = Job.get(link.to)
  #nexts.each do |nextnode|
    #nextnode.strip!
    #queue = workflow.getnodequeue(nextnode)
    # Explicit skip, or IF actor, we skip all actors but one
    # (run stores exit code + 1 in @type for an IF actor, so a positive @type
    # is the 1-based position of the only follower allowed to run)
    if skip==true  || @status == STATUS_SKIP  || (@type>0 && count!= @type)
      jobstatus = STATUS_SKIP
      operation = OP_SKIP
      commands = [ "skipcommand"]
    else
      jobstatus = STATUS_NEW
      operation = OP_RUN
      commands = workflow.getnodecommand(job.node) 
    end
    count=count+1
    if commands == nil
      @@log.error "Could not get command for node "+job.id.to_s
      job.update(:command => "", :status => STATUS_ERROR, :instances => 0, :maxinstances => 0, :workdir => FlowConfig.getjobdir(workflow.workdir))
      workflow.update(:status => STATUS_ERROR)
      # Stops scheduling: remaining followers are not processed
      return
    end
    @@log.debug "New job command: "+commands.to_s
    job.update(:command => commands.to_json, :status => jobstatus, :instances => 0, :maxinstances => commands.length, :workdir => FlowConfig.getjobdir(workflow.workdir))
    #err = job.save
    i=0
    # If multiple instances, differenciate and start at 1
    if commands.length > 1
      i = 1
    end
    # One message per command, each carrying its own instance number
    commands.each do |command| 
      jobmsg = '{ "workflow" : "'+job.wid.to_s+'" ,  "node" : "'+job.node+'", "id" : "'+job.id.to_s+'", "instance" : "'+i.to_s+'" }'
      i += 1
      # send message
      sendmessage(operation,jobmsg,job.queue)
    end
  end
end
sendmessage(operation,msg,jobqueue='') click to toggle source

Sends a message. Depending on the operation, the message is sent to the master queue or to a node queue. operation: kind of message. msg: message to send. jobqueue: optional specific queue.

# File lib/manband/job.rb, line 323
# Enqueues a message and, in debug mode, logs it through logMessage.
# operation:: kind of message; OP_RUN, OP_SKIP, OP_DESTROY, OP_CLEAN and
#             OP_STORE go to the node queue, anything else to the master
#             queue
# msg:: message to send
# jobqueue:: optional suffix appended to "manband.node" to address a
#            specific node queue
# Returns the result of logMessage.
def sendmessage(operation,msg,jobqueue='')
  node_operations = [OP_RUN, OP_SKIP, OP_DESTROY, OP_CLEAN, OP_STORE]
  queue = node_operations.include?(operation) ? "manband.node"+jobqueue : "manband.master"
  @@log.debug "Send #"+operation+","+msg
  Utils.enqueue(queue, { "operation" => operation, "msg" => msg })
  logMessage(operation,msg)
end
setAck(channel,tag) click to toggle source
# File lib/manband/job.rb, line 42
# Remembers the channel and tag that ack() later passes to
# channel.acknowledge to acknowledge the message.
# channel:: object on which acknowledge(tag, false) will be called
# tag:: tag handed back to the channel on acknowledgement
def setAck(channel,tag)
  @channel = channel
  @tag = tag
end
skip(curhandler) click to toggle source

Skip treatment, just answer, for debug

# File lib/manband/job.rb, line 115
# Skips the processing of the job: records the handler on the job record and
# immediately answers with an OP_FINISH message, without running any command.
# curhandler:: identifier of the handler that received the job (stored and
#              sent via to_s)
# Returns the result of sendmessage.
def skip(curhandler)
  handler = curhandler.to_s
  Job.get(@id).update(:handler => handler)
  jobmsg = '{ "workflow" : "' + @wid.to_s + '" ,  "node" : "' + @node + '", "id" : "' + @id.to_s + '", "handler" : "' + handler + '" }'
  sendmessage(OP_FINISH, jobmsg)
end