class Job
This class manages the job execution and its status on a job handler.
Public Class Methods
Sets debug mode. mode: boolean
# File lib/manband/job.rb, line 32
# Switches debug mode on or off for the Job class.
#
# mode:: value stored in the @@debug class variable; logMessage only
#        records messages when this value is exactly true.
def self.debug(mode)
  @@debug = mode
end
Public Instance Methods
# File lib/manband/job.rb, line 47
# Acknowledges, on @channel, the message identified by @tag. Both values
# are the ones previously stored through setAck; the second argument given
# to acknowledge is always false.
def ack
  @channel.acknowledge(@tag, false)
end
Sets the job, and its workflow, to ERROR status. instance: number of the job instance at fault
# File lib/manband/job.rb, line 233
# Marks this job, and the workflow it belongs to, as being in error.
#
# The faulty instance number is appended to the JSON list stored in the
# job's error field, then the job and the workflow are both updated with
# STATUS_ERROR.
#
# instance:: number of the job instance at fault (defaults to 0).
def error!(instance = 0)
  @status = STATUS_ERROR
  curjob = Job.get(@id)
  # The error field may be nil or blank when no error was recorded yet;
  # JSON.parse would raise on both, so start from an empty list instead.
  recorded = curjob.error.to_s.strip
  failed = recorded.empty? ? [] : JSON.parse(recorded)
  failed.push(instance)
  curjob.update(:status => STATUS_ERROR, :error => failed.to_json)
  WorkFlow.get(@wid).update(:status => STATUS_ERROR)
end
Increments the instance counter. If the workflow is in suspend status, suspends the job at the end of its treatment.
# File lib/manband/job.rb, line 154
# Records the end of one instance of this job.
#
# Nothing happens for a skipped job. Otherwise the instances counter is
# incremented directly in the database (a single UPDATE statement, so the
# increment does not depend on a previously loaded value) and:
# * if the workflow is suspended, the job is suspended too;
# * if the job is already suspended (breakpoint), it stays in that status;
# * else the job is flagged STATUS_OVER in memory, and in the database once
#   instances has reached maxinstances.
#
# Values are passed to the adapter as bind values instead of being
# concatenated into the SQL text.
def finish
  return if @status == STATUS_SKIP

  workflow = WorkFlow.get(@wid)
  adapter = DataMapper.repository(:default).adapter
  if workflow.status == STATUS_SUSPEND
    @status = STATUS_SUSPEND
    adapter.execute('UPDATE jobs SET instances = instances + 1, status = ? WHERE id = ?',
                    STATUS_SUSPEND, @id)
  elsif @status == STATUS_SUSPEND
    # Job set in suspend mode (breakpoint), keep in this status
    adapter.execute('UPDATE jobs SET instances = instances + 1 WHERE id = ?', @id)
  else
    @status = STATUS_OVER
    adapter.execute('UPDATE jobs SET instances = instances + 1 WHERE id = ?', @id)
    # TODO add a lock for update
    curjob = Job.get(@id)
    curjob.update(:status => STATUS_OVER) if curjob.instances >= curjob.maxinstances
  end
end
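Tells whether the job is over, i.e. whether its status is OVER or SKIP. (The comparison of the instance counter to max instances, which sets the OVER status, is done in finish.)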
# File lib/manband/job.rb, line 185
# Tells whether this job is finished from the workflow point of view.
#
# Returns true when @status is STATUS_OVER or STATUS_SKIP, false otherwise.
def isover?
  @status == STATUS_OVER || @status == STATUS_SKIP
end
If debug, log the message in the database
# File lib/manband/job.rb, line 336
# Stores a trace of a sent message as a BandMessage row attached to this
# job's workflow, but only when the class-level debug flag is exactly true.
#
# operation:: operation name, concatenated as-is into the stored text
# msg::       message body, concatenated as-is into the stored text
#
# Returns the result of BandMessage#save, or nil when debug is off.
def logMessage(operation, msg)
  return unless @@debug == true

  payload = '{ "operation" => ' + operation + ', "msg" => ' + msg + ' }'
  BandMessage.new(:wid => @wid, :message => payload).save
end
Resumes a suspended job: updates its status and runs the next jobs. A job in error status is restarted from scratch instead.
# File lib/manband/job.rb, line 197
# Resumes this job according to its current status.
#
# * STATUS_SUSPEND: the job is marked STATUS_OVER and runnext is called.
# * STATUS_ERROR:   the whole job is restarted: its record is reset
#   (new work directory, empty error list, instance counter at 0) and one
#   OP_RUN message per command is sent to the job queue.
# Any other status leaves the job untouched.
def resume
  workflow = WorkFlow.get(@wid)

  if @status == STATUS_SUSPEND
    # Suspended job: consider it done and move on to the following jobs
    @@log.debug "Resume from suspended job #{@id} for workflow #{@wid}"
    @status = STATUS_OVER
    Job.get(@id).update(:status => STATUS_OVER)
    runnext
  elsif @status == STATUS_ERROR
    # Job in error: run it again
    @@log.debug "Resume from error job #{@id} for workflow #{@wid}"
    @status = STATUS_RUNNING
    curjob = Job.get(@id)
    commands = workflow.getnodecommand(curjob.node)
    if commands.nil?
      @@log.error "Could not get command for node " + curjob.node
      return
    end

    # Restart whole job, use new tmp dir, reset errors
    curjob.update(:status => STATUS_RUNNING,
                  :command => commands.to_json,
                  :instances => 0,
                  :workdir => FlowConfig.getjobdir(workflow.workdir),
                  :error => "[]",
                  :maxinstances => commands.length)

    # Instances are numbered from 1 when there are several commands,
    # a single command uses instance 0.
    first_instance = commands.length > 1 ? 1 : 0
    commands.each_with_index do |_command, offset|
      jobmsg = '{ "workflow" : "' + @wid.to_s + '" , "node" : "' + @node +
               '", "id" : "' + @id.to_s +
               '", "instance" : "' + (first_instance + offset).to_s + '" }'
      # send message
      sendmessage(OP_RUN, jobmsg, curjob.queue)
    end
  end
end
Updates the job status and launches the command locally. Sends a finish message or an error message according to the job status. If storage is needed, the job sends a finish AND a store message; storage then occurs in parallel with the rest of the workflow.
# File lib/manband/job.rb, line 69
# Records the handler running this job, then executes the job command in
# an EventMachine.defer block and reports the outcome with a message.
#
# curhandler:: identifier of the handler executing the job (stored as string)
# instance::   instance number; when greater than 0 the command runs in the
#              "node<instance>" sub-directory of the job work directory
#
# Outcome handling, once runcommand has returned its exit status:
# * IF actor: the exit status is not an error but selects the next job;
#   @type becomes exit status + 1 and the status is reset to 0.
# * nil or positive status: an OP_ERROR message is sent.
# * interactive actor: nothing is sent, the job itself will answer.
# * otherwise: an optional OP_STORE message, then an OP_FINISH message.
# The incoming message is acknowledged at the end of the deferred block.
def run(curhandler, instance = 0)
  # Send run command, possibly multiple ones according to pattern
  # Command would send a message when over
  curjob = Job.get(@id)
  if @status == STATUS_SUSPEND
    # Keep the suspend status, only record who handles the job
    curjob.update(:handler => curhandler.to_s)
  else
    @status = STATUS_RUNNING
    curjob.update(:status => STATUS_RUNNING, :handler => curhandler.to_s)
  end

  workdir = curjob.workdir
  workdir = workdir + "/node" + instance.to_s if instance > 0

  EventMachine.defer do
    err = runcommand(workdir, instance)

    # In the case of IF_ACTOR, exit code is not an error
    # but the job to select. Reset error code after updating the type.
    # A nil exit status (command did not exit normally) cannot select a
    # job: it is left as is and reported as an error below.
    if @type == IF_ACTOR && !err.nil?
      @type = err + 1
      curjob.update(:type => @type)
      err = 0
    end

    # Built lazily: the interactive branch never needs it
    statusmsg = lambda do
      '{ "workflow" : "' + @wid.to_s + '" , "node" : "' + @node +
        '", "id" : "' + @id.to_s + '", "handler" : "' + curhandler.to_s +
        '", "instance" : "' + instance.to_s + '" }'
    end

    if err.nil? || err > 0
      # An error occured
      sendmessage(OP_ERROR, statusmsg.call)
    elsif @type == INTERACTIVE_ACTOR
      @@log.debug "Interactive job, job will take in charge the answer and storage"
    else
      if curjob.store == STORE_DO || curjob.store == STORE_ERROR
        workflow = WorkFlow.get(curjob.wid)
        storemsg = '{ "id" : "' + curjob.id.to_s + '", "bucket" : "' + workflow.bucket + '" }'
        sendmessage(OP_STORE, storemsg)
      end
      sendmessage(OP_FINISH, statusmsg.call)
    end
    ack()
  end # End EventMachine
end
Executes the command locally, creating directories and setting environment variables to empty strings for security. workdir: job working directory. instance: instance number in the list of commands.
# File lib/manband/job.rb, line 126
# Runs locally the command matching +instance+ and returns its exit status.
#
# The command is prefixed with a shell preamble that blanks AMQP_URL and
# MYSQL_URL, sets WID, JID, INSTANCE and WORKDIR, creates +workdir+ and
# moves into it. When FlowConfig.sudo is true, the command is written to a
# script in the workflow directory and executed with sudo as the workflow
# user.
#
# workdir::  job working directory
# instance:: instance number; instances are numbered from 1 when the job
#            holds several commands and 0 when it holds a single one, so
#            the command at position instance - 1 (never below 0) is used
#
# Returns the exit status of the command, or nil when the process did not
# exit normally.
def runcommand(workdir, instance)
  initcmd = "AMQP_URL='' && MYSQL_URL='' && WID=" + @wid.to_s + " && JID=" + @id.to_s +
            " && INSTANCE=" + instance.to_s + " && mkdir -p " + workdir +
            " && cd " + workdir + " && WORKDIR=" + workdir + " && "
  curjob = Job.get(@id)
  commands = JSON.parse(curjob.command)
  # Instance k (k >= 1) runs the k-th command; instance 0 runs the first one
  cmd = initcmd + commands[[instance - 1, 0].max]

  if FlowConfig.sudo() == true
    wflow = WorkFlow.get(@wid)
    script = wflow.workdir + "/" + self.id.to_s + "-" + instance.to_s + ".sh"
    unless File.exist?(wflow.workdir)
      Dir.mkdir(wflow.workdir)
      # Argument list form: no shell is involved for the chown call
      system("chown", "-R", wflow.uid, wflow.workdir)
    end
    File.open(script, 'w') { |f| f.write("#!/bin/bash\n" + cmd) }
    File.chmod(0755, script)
    cmd = "sudo -u " + wflow.uid + " '" + script + "'"
  end

  @@log.debug cmd
  system(cmd)
  return $?.exitstatus
end
Run next jobs
# File lib/manband/job.rb, line 244
# Starts (or skips) the jobs linked after this one.
#
# When this job has several incoming links, nothing is started until every
# parent job is in STATUS_OVER; false is returned in the meantime. When the
# job has no outgoing link, workflow.isover? is called and true is returned.
# Otherwise every following job record is reset and one message per command
# is sent to the job queue. If the commands of a following job cannot be
# found, that job and the workflow are set to STATUS_ERROR and the method
# returns immediately.
#
# skip:: when true, the following jobs are flagged STATUS_SKIP and receive
#        an OP_SKIP message instead of OP_RUN.
def runnext(skip=false)
  workflow = WorkFlow.get(@wid)

  # Are all previous jobs over?
  parents = JobLink.all(:to => @id)
  unless parents.nil?
    @@log.debug "Checking previous jobs: #{parents.length}"
    # No parent: must be a root node. Only one parent: as we got the
    # message, that parent is over. Several parents: check each of them.
    if parents.length > 1
      parents.each do |link|
        parent = Job.get(link.from)
        next if parent.status == STATUS_OVER

        @@log.debug "At least one previous job is not over (#{parent.id}), wait for next message"
        return false
      end
      @@log.debug "All previous jobs are over, continue...."
    end
  end

  # Look at next jobs
  followers = JobLink.all(:from => @id)
  if followers.nil? || followers.length == 0
    workflow.isover?
    return true
  end

  # Which next job is it?
  # Start at 1 because 0 is a valid status code, consider if type from value 1
  position = 1
  followers.each do |link|
    job = Job.get(link.to)

    # Explicit skip, or IF actor, we skip all actors but one
    skipped = skip == true || @status == STATUS_SKIP || (@type > 0 && position != @type)
    if skipped
      jobstatus, operation, commands = STATUS_SKIP, OP_SKIP, ["skipcommand"]
    else
      jobstatus, operation, commands = STATUS_NEW, OP_RUN, workflow.getnodecommand(job.node)
    end
    position += 1

    if commands.nil?
      @@log.error "Could not get command for node #{job.id}"
      job.update(:command => "",
                 :status => STATUS_ERROR,
                 :instances => 0,
                 :maxinstances => 0,
                 :workdir => FlowConfig.getjobdir(workflow.workdir))
      workflow.update(:status => STATUS_ERROR)
      return
    end

    @@log.debug "New job command: #{commands}"
    job.update(:command => commands.to_json,
               :status => jobstatus,
               :instances => 0,
               :maxinstances => commands.length,
               :workdir => FlowConfig.getjobdir(workflow.workdir))

    # If multiple instances, differentiate and start at 1
    first_instance = commands.length > 1 ? 1 : 0
    commands.each_with_index do |_command, offset|
      jobmsg = '{ "workflow" : "' + job.wid.to_s + '" , "node" : "' + job.node +
               '", "id" : "' + job.id.to_s +
               '", "instance" : "' + (first_instance + offset).to_s + '" }'
      # send message
      sendmessage(operation, jobmsg, job.queue)
    end
  end
end
Sends a message. According to the operation, message will be sent to master or node queues. operation: kind of message msg: message to send jobqueue: optional specific queue
# File lib/manband/job.rb, line 323
# Queues a message through Utils.enqueue and traces it with logMessage.
#
# OP_RUN, OP_SKIP, OP_DESTROY, OP_CLEAN and OP_STORE messages go to the
# "manband.node" queue, suffixed with +jobqueue+; every other operation
# goes to "manband.master".
#
# operation:: kind of message
# msg::       message to send
# jobqueue::  optional suffix selecting a specific node queue
def sendmessage(operation, msg, jobqueue='')
  node_bound = operation == OP_RUN || operation == OP_SKIP ||
               operation == OP_DESTROY || operation == OP_CLEAN ||
               operation == OP_STORE
  queue = node_bound ? "manband.node" + jobqueue : "manband.master"

  @@log.debug "Send #" + operation + "," + msg
  Utils.enqueue(queue, { "operation" => operation, "msg" => msg })
  logMessage(operation, msg)
end
# File lib/manband/job.rb, line 42
# Remembers what is needed to acknowledge the message that triggered this
# job; ack later calls @channel.acknowledge with the stored tag.
#
# channel:: object the acknowledgement will be sent on
# tag::     identifier of the message to acknowledge
def setAck(channel, tag)
  @channel = channel
  @tag = tag
end
Skip treatment, just answer, for debug
# File lib/manband/job.rb, line 115
# Skips the treatment of this job: records the handler on the job record
# and directly answers with an OP_FINISH message, without running anything.
#
# curhandler:: identifier of the handler that received the job
def skip(curhandler)
  job = Job.get(@id)
  handler = curhandler.to_s
  job.update(:handler => handler)

  jobmsg = '{ "workflow" : "' + @wid.to_s + '" , "node" : "' + @node +
           '", "id" : "' + @id.to_s + '", "handler" : "' + handler + '" }'
  sendmessage(OP_FINISH, jobmsg)
end