class Step
Constants
- INFO_SERIALIZER
- MAIN_RSYNC_ARGS
- STREAM_CACHE
- STREAM_CACHE_MUTEX
Attributes
Public Class Methods
Source
# File lib/rbbt/workflow/util/archive.rb, line 109 def self.archive(files, target = nil, recursive = true) target = self.path + '.tar.gz' if target.nil? target = File.expand_path(target) if String === target job_files = job_files_for_archive files, recursive TmpFile.with_file do |tmpdir| job_files.each do |file| Step.link_job file, tmpdir end Misc.in_dir(tmpdir) do if File.directory?(target) CMD.cmd_log("rsync #{MAIN_RSYNC_ARGS} --copy-unsafe-links '#{ tmpdir }/' '#{ target }/'") else Misc.tarize('.', target) #CMD.cmd_log("tar cvhzf '#{target}' ./*") end end Log.debug "Archive finished at: #{target}" end end
Source
# File lib/rbbt/workflow/step/status.rb, line 2 def self.clean(path) info_file = Step.info_file path pid_file = Step.pid_file path md5_file = Step.md5_file path files_dir = Step.files_dir path tmp_path = Step.tmp_path path if ! (Open.writable?(path) && Open.writable?(info_file)) Log.warn "Could not clean #{path}: not writable" return end if ENV["RBBT_DEBUG_CLEAN"] == 'true' raise "DO NOT CLEAN" end if (Open.exists?(path) or Open.broken_link?(path)) or Open.exists?(pid_file) or Open.exists?(info_file) or Open.exists?(files_dir) or Open.broken_link?(files_dir) or Open.exists?(pid_file) @result = nil @pid = nil Misc.insist do Open.rm info_file if Open.exists?(info_file) Open.rm md5_file if Open.exists?(md5_file) Open.rm path if (Open.exists?(path) || Open.broken_link?(path)) Open.rm_rf files_dir if Open.exists?(files_dir) || Open.broken_link?(files_dir) Open.rm pid_file if Open.exists?(pid_file) Open.rm tmp_path if Open.exists?(tmp_path) end end end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 12 def self.dup_stream(stream) case stream when IO, File, Step return stream if stream.respond_to?(:closed?) and stream.closed? return stream if stream.respond_to?(:done?) and stream.done? STREAM_CACHE_MUTEX.synchronize do stream_key = Misc.fingerprint(stream) current = STREAM_CACHE[stream_key] case current when nil, Step Log.medium "Not duplicating stream #{stream_key}" STREAM_CACHE[stream_key] = stream when File if Open.exists?(current.path) Log.medium "Reopening file #{stream_key}" Open.open(current.path) else new = Misc.dup_stream(current) Log.medium "Duplicating file #{stream_key} #{current.inspect} => #{Misc.fingerprint(new)}" new end else new = Misc.dup_stream(current) Log.medium "Duplicating stream #{stream_key} #{ Misc.fingerprint(stream) } => #{Misc.fingerprint(new)}" new end end when TSV::Dumper, TSV::Parser orig_stream = stream stream = stream.stream return stream if stream.closed? STREAM_CACHE_MUTEX.synchronize do if STREAM_CACHE[stream].nil? Log.high "Not duplicating #{Misc.fingerprint orig_stream} #{ stream.inspect }" STREAM_CACHE[stream] = stream else new = Misc.dup_stream(STREAM_CACHE[stream]) Log.high "Duplicating #{Misc.fingerprint orig_stream} #{ stream.inspect } into #{new.inspect}" new end end else stream end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 65 def self.files_dir(path) path.nil? ? nil : Path.setup(path + '.files') end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 69 def self.info_file(path) return nil if path.nil? info_file = path + '.info' return info_file if Open.exist?(info_file) if path.end_with?('.gz') info_file_nogz = path.sub(/\.gz$/,'') + '.info' return info_file_nogz if Open.exists?(info_file_nogz) end info_file end
Source
# File lib/rbbt/workflow/util/archive.rb, line 55 def self.job_files_for_archive(files, recursive = false, skip_overriden = false) job_files = Set.new jobs = files.collect do |file| if Step === file file else file = file.sub(/\.info$/,'') Step.new(File.expand_path(file)) end end.uniq jobs.each do |step| next unless File.exist?(step.path) next if skip_overriden && step.overriden job_files << step.path job_files << step.info_file if File.exist?(step.info_file) job_files << Step.md5_file(step.path) if File.exist?(Step.md5_file step.path) job_file_dir_content = Dir.glob(step.files_dir + '/**/*') job_files += job_file_dir_content job_files << step.files_dir if File.exist?(step.files_dir) rec_dependencies = Set.new next unless recursive deps = [step.path] seen = Set.new while deps.any? path = deps.shift dep = Step.load path seen << dep.path #dep.load_dependencies_from_info dep.dependencies.each do |dep| next if seen.include? dep.path deps << dep.path rec_dependencies << dep.path end if dep.info[:dependencies] end rec_dependencies.each do |path| dep = Workflow.load_step path.dup job_files << dep.path job_files << dep.files_dir if Dir.glob(dep.files_dir + '/*').any? job_files << dep.info_file if File.exist?(dep.info_file) end end job_files.to_a end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 109 def self.job_name_for_info_file(info_file, extension = nil) if extension and not extension.empty? info_file.sub(/\.#{extension}\.info$/,'') else info_file.sub(/\.info$/,'') end end
Source
# File lib/rbbt/workflow/util/archive.rb, line 6 def self.link_job(path, target_dir, task = nil, workflow = nil) Path.setup(target_dir) name = File.basename(path) task = File.basename(File.dirname(path)) if task.nil? workflow = File.basename(File.dirname(File.dirname(path))) if workflow.nil? return if target_dir[workflow][task][name].exists? || File.symlink?(target_dir[workflow][task][name].find) Log.debug "Linking #{ path }" FileUtils.mkdir_p target_dir[workflow][task] unless target_dir[workflow][task].exists? FileUtils.ln_s path, target_dir[workflow][task][name].find if File.exist?(path) FileUtils.ln_s path + '.files', target_dir[workflow][task][name].find + '.files' if File.exist?(path + '.files') FileUtils.ln_s path + '.info', target_dir[workflow][task][name].find + '.info' if File.exist?(path + '.info') end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 35 def self.load_serialized_info(io) IndiferentHash.setup(INFO_SERIALIZER.load(io)) end
Source
# File lib/rbbt/workflow/step/info.rb, line 243 def self.log(status, message, path, &block) if block if Hash === message log_progress(status, message, path, &block) else log_block(status, message, path, &block) end else log_string(status, message, path) end end
Source
# File lib/rbbt/workflow/step/info.rb, line 162 def self.log_block(status, message, path, &block) start = Time.now status = status.to_s status_color = self.status_color status Log.info do now = Time.now str = Log.color :reset str << "#{ Log.color status_color, status}" str << ": #{ message }" if message and message != :result str << " -- #{Log.color :blue, path.to_s}" if path str << " #{Log.color :yellow, Process.pid}" str end res = yield eend = Time.now Log.info do now = Time.now str = "#{ Log.color :cyan, status.to_s } +#{Log.color :green, "%.2f" % (eend - start)}" str << ": #{ res }" if message == :result str << " -- #{Log.color :blue, path.to_s}" if path str << " #{Log.color :yellow, Process.pid}" str end res end
Source
# File lib/rbbt/workflow/step/info.rb, line 204 def self.log_progress(status, options = {}, path = nil, &block) options = Misc.add_defaults options, :severity => Log::INFO, :file => (@exec ? nil : path) max = Misc.process_options options, :max Log::ProgressBar.with_bar(max, options) do |bar| begin res = yield bar raise KeepBar.new res if IO === res res rescue Log.exception $! raise $! end end end
Source
# File lib/rbbt/workflow/step/info.rb, line 189 def self.log_string(status, message, path) Log.info do status = status.to_s status_color = self.status_color status str = Log.color :reset str << "#{ Log.color status_color, status}" str << ": #{ message }" if message str << " -- #{Log.color :blue, path.to_s}" if path str << " #{Log.color :yellow, Process.pid}" str end end
Source
# File lib/rbbt/workflow/refactor.rb, line 12 def self.md5_file(path) path.nil? ? nil : path + '.md5' end
Source
# File lib/rbbt/workflow/util/archive.rb, line 164 def self.migrate(paths, search_path, options = {}) subpath_files = {} target_paths = [] resource = options[:resource] || Rbbt target = Rbbt.migrate_target_path('var/jobs', search_path, resource, options[:target]) (Array === paths ? paths : [paths]).each do |path| if Step === path if options[:source] path = Rbbt.identify(path.path) else path = path.path end end search_path = 'user' if search_path.nil? path, real_paths, lpath = self.migrate_source_paths(path, resource, options[:source], options[:recursive]) real_paths.sort.each do |path| parts = path.split("/") subpath = parts[0..-4] * "/" + "/" if subpath_files.keys.any? && subpath.start_with?(subpath_files.keys.last) subpath = subpath_files.keys.last end source = path.chars[subpath.length..-1] * "" subpath_files[subpath] ||= [] subpath_files[subpath] << source end target_paths << File.join(target, *path.split("/")[-3..-1]) end subpath_files.each do |subpath, files| Rbbt.migrate_files([subpath], target, options.merge(:files => files)) end target_paths end
Source
# File lib/rbbt/workflow/util/archive.rb, line 131 def self.migrate_source_paths(path, resource = Rbbt, source = nil, recursive = true) recursive = false if recursive.nil? if source lpath, *paths = Misc.ssh_run(source, <<-EOF).split("\n") require 'rbbt-util' require 'rbbt/workflow' recursive = #{ recursive.to_s } path = "#{path}" if Open.exists?(path) path = #{resource.to_s}.identify(path) else path = Path.setup(path) end files = path.glob_all.collect{|p| File.directory?(p) ? p + "/" : p } files = Step.job_files_for_archive(files, recursive) puts path puts files * "\n" EOF [path, paths.collect{|p| [source, p] * ":"}, lpath] else path = Path.setup(path.dup) files = path.glob_all files = Step.job_files_for_archive(files, recursive) [path, files, path] end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 94 def self.pid_file(path) path.nil? ? nil : path + '.pid' end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 72 def self.prepare_for_execution(job) return if job.done? && ! job.dirty? status = job.status.to_s if defined?(WorkflowRemoteClient) && WorkflowRemoteClient::RemoteStep === job return unless (status == 'done' or status == 'error' or status == 'aborted') else return if status == 'streaming' and job.running? end canfail = nil job.status_lock.synchronize do status = job.status.to_s if (status == 'error' && (job.recoverable_error? || job.dirty?)) || (job.noinfo? && Open.exists?(job.pid_file)) || job.aborted? || (job.done? && ! job.updated?) || (job.error? && ! job.updated?) || (job.done? && job.dirty?) || (job.error? && job.dirty?) || (!(job.noinfo? || job.done? || job.error? || job.aborted? || job.running?)) if ! (job.resumable? && (job.updated? && ! job.dirty?)) Log.high "About to clean -- status: #{status}, present #{File.exist?(job.path)}, " + %w(done? error? recoverable_error? noinfo? updated? dirty? aborted? running? resumable?). collect{|v| [v, job.send(v)]*": "} * ", " if RBBT_DEBUG_CLEAN job.clean end job.set_info :status, :cleaned end job.dup_inputs unless status == 'done' or job.started? job.init_info(status == 'noinfo') unless status == 'waiting' || status == 'done' || job.started? || ! Workflow.job_path?(job.path) canfail = ComputeDependency === job && job.canfail? end Step.raise_dependency_error(job) if job.error? and not canfail end
Source
# File lib/rbbt/workflow/step/produce.rb, line 2 def self.produce_dependencies(jobs, tasks, cpus) deps = [] jobs = [jobs] unless Array === jobs tasks = [tasks] unless Array === tasks tasks = tasks.collect{|t| t.to_s} jobs.each do |job| job.rec_dependencies.each do |dep| next if dep.done? dep.clean if dep.error? && dep.recoverable_error? deps << dep if tasks.include?(dep.task_name.to_s) or tasks.include?([dep.workflow.to_s, dep.task_name] * "#") end end cpus = jobs.length if cpus.to_s == "max" cpus = cpus.to_i if String === cpus TSV.traverse deps.collect{|dep| dep.path}, :type => :array, :cpus => cpus, :bar => "Prepare dependencies #{Misc.fingerprint tasks} for #{Misc.fingerprint jobs}" do |path| dep = deps.select{|dep| dep.path == path}.first dep.produce nil end end
Source
# File lib/rbbt/workflow/util/provenance.rb, line 86 def self.prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil) info = step.info || {} info[:task_name] = task path = step.path status = info[:status] || :missing status = "remote" if Open.remote?(path) || Open.ssh?(path) name = info[:name] || File.basename(path) status = :unsync if status == :done and not Open.exist?(path) status = :notfound if status == :noinfo and not Open.exist?(path) this_step_msg = prov_report_msg(status, name, path, info, input) input_dependencies = {} step.dependencies.each do |dep| if dep.input_dependencies.any? dep.input_dependencies.each do |id| input_name, _dep = dep.recursive_inputs.fields.zip(dep.recursive_inputs).select{|f,d| d == id || (String === d && d.start_with?(id.files_dir)) || (Array === d && d.include?(id)) }.last if input_name input_dependencies[id] ||= [] input_dependencies[id] << [dep, input_name] end end end end if step.dependencies str = "" str = " " * offset + this_step_msg if ENV["RBBT_ORIGINAL_STACK"] == 'true' step.dependencies.dup.tap{|l| l.reverse! if ENV["RBBT_ORIGINAL_STACK"] == 'true' }.each do |dep| path = dep.path new = ! seen.include?(path) if new seen << path str << prov_report(dep, offset + 1, task, seen, expand_repeats, input_dependencies[dep]) else if expand_repeats str << Log.color(Step.status_color(dep.status), Log.uncolor(prov_report(dep, offset+1, task))) else info = dep.info || {} status = info[:status] || :missing status = "remote" if Open.remote?(path) || Open.ssh?(path) name = info[:name] || File.basename(path) status = :unsync if status == :done and not Open.exist?(path) status = :notfound if status == :noinfo and not Open.exist?(path) str << Log.color(Step.status_color(status), " " * (offset + 1) + Log.uncolor(prov_report_msg(status, name, path, info, input_dependencies[dep]))) end end end if step.dependencies str += (" " * offset) + this_step_msg unless ENV["RBBT_ORIGINAL_STACK"] == 'true' str end
Source
# File lib/rbbt/workflow/util/provenance.rb, line 29 def self.prov_report_msg(status, name, path, info, input = nil) parts = path.sub(/\{.*/,'').split "/" parts.pop task = Log.color(:yellow, parts.pop) workflow = Log.color(:magenta, parts.pop) #if status.to_s == 'noinfo' && parts.last != 'jobs' if ! Workflow.job_path?(path) task, status, workflow = Log.color(:yellow, info[:task_name]), Log.color(:green, "file"), Log.color(:magenta, "-") end path_mtime = begin Open.mtime(path) rescue Exception nil end if input.nil? || input.empty? input_str = nil else input = input.reject{|dep,name| (input & dep.dependencies.collect{|d| [d,name]}).any? } input = input.reject{|dep,name| (input & dep.input_dependencies.collect{|d| [d,name]}).any? } input_str = Log.color(:magenta, "-> ") + input.collect{|dep,name| Log.color(:yellow, dep.task_name.to_s) + ":" + Log.color(:yellow, name) }.uniq * " " end str = if ! (Open.remote?(path) || Open.ssh?(path)) && (Open.exists?(path) && $main_mtime && path_mtime && ($main_mtime - path_mtime) < -2) prov_status_msg(status.to_s) << " " << [workflow, task, path, input_str].compact * " " << " (#{Log.color(:red, "Mtime out of sync") })" else prov_status_msg(status.to_s) << " " << [workflow, task, path, input_str].compact * " " end if $inputs and $inputs.any? job_inputs = Workflow.load_step(path).recursive_inputs.to_hash IndiferentHash.setup(job_inputs) $inputs.each do |input| value = job_inputs[input] next if value.nil? value_str = Misc.fingerprint(value) str << "\t#{Log.color :magenta, input}=#{value_str}" end end if $info_fields and $info_fields.any? $info_fields.each do |field| IndiferentHash.setup(info) value = info[field] next if value.nil? value_str = Misc.fingerprint(value) str << "\t#{Log.color :magenta, field}=#{value_str}" end end str << "\n" end
Source
# File lib/rbbt/workflow/util/provenance.rb, line 24 def self.prov_status_msg(status) color = status_color(status) Log.color(color, status.to_s) end
Source
# File lib/rbbt/workflow/util/archive.rb, line 210 def self.purge(path, recursive = false, skip_overriden = true) path = [path] if String === path job_files = job_files_for_archive path, recursive, skip_overriden job_files.each do |file| begin Log.debug "Purging #{file}" Open.rm_rf file if Open.exists?(file) rescue Log.warn "Could not erase '#{file}': #{$!.message}" end end end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 5 def self.purge_stream_cache # Log.debug "Purging dup. stream cache" STREAM_CACHE_MUTEX.synchronize do STREAM_CACHE.clear end end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 113 def self.raise_dependency_error(job) begin if job.get_exception klass = job.get_exception.class else klass = Kernel.const_get(info[:exception][:class]) end rescue Log.exception $! raise DependencyError, job end if (klass <= RbbtException) raise DependencyRbbtException, job else raise DependencyError, job end end
Source
# File lib/rbbt/workflow/refactor/inputs.rb, line 2 def self.save_inputs(inputs, input_types, dir) inputs.collect do |name,value| next if value.nil? type = input_types[name] type = type.to_s if type Task.save_input(dir, name, type, value) name end.compact.any? end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 30 def self.serialize_info(info) info = info.clean_version if IndiferentHash === info INFO_SERIALIZER.dump(info) end
Source
# File lib/rbbt/workflow/step/info.rb, line 148 def self.status_color(status) status = status.split(">").last case status when "starting" :yellow when "error", "aborted" :red when "done" :green else :cyan end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 98 def self.step_info(path) begin Open.open(info_file(path), :mode => 'rb') do |f| self.load_serialized_info(f) end rescue Exception Log.exception $! {} end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 82 def self.tmp_path(path) path = path.find if Path === path path = File.expand_path(path) dir = File.dirname(path) filename = File.basename(path) File.join(dir, '.' << filename) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 40 def self.wait_for_jobs(jobs) jobs = [jobs] if Step === jobs begin threads = [] threads = jobs.collect do |j| Thread.new do begin j.soft_grace j.join unless j.done? rescue Exception Log.error "Exception waiting for job: #{Log.color :blue, j.path}" raise $! end end end threads.each{|t| t.join } rescue Exception threads.each{|t| t.exit } jobs.each do |j| j.abort end raise $! end end
Public Instance Methods
Source
# File lib/rbbt/workflow/step/status.rb, line 142 def aborted? status = self.status status == :aborted || ((status != :ending && status != :dependencies && status != :cleaned && status != :noinfo && status != :setup && status != :noinfo && status != :waiting) && nopid?) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 361 def access CMD.cmd("touch -c -h -a #{self.path} #{self.info_file}") end
Source
# File lib/rbbt/workflow/util/archive.rb, line 21 def archive(target = nil) target = self.path + '.tar.gz' if target.nil? target = File.expand_path(target) TmpFile.with_file do |tmpdir| Step.link_job self.path, tmpdir rec_dependencies = Set.new deps = [self.path] seen = Set.new while deps.any? path = deps.shift dep = Step.load path seen << dep.path dep.dependencies.each do |dep| next if seen.include? dep.path deps << dep.path rec_dependencies << dep.path end if dep.dependencies end rec_dependencies.each do |path| Step.link_job path, tmpdir end Misc.in_dir(tmpdir) do if File.directory?(target) CMD.cmd_log("rsync #{MAIN_RSYNC_ARGS} --copy-unsafe-links '#{ tmpdir }/' '#{ target }/'") else CMD.cmd_log("tar cvhzf '#{target}' ./*") end end Log.debug "Archive finished at: #{target}" end end
Source
# File lib/rbbt/workflow/step/info.rb, line 79 def archive_deps self.set_info :archived_info, archived_info self.set_info :archived_dependencies, info[:dependencies] end
Source
# File lib/rbbt/workflow/step/info.rb, line 84 def archived_info return info[:archived_info] if info[:archived_info] archived_info = {} dependencies.each do |dep| if Symbol === dep.overriden && ! Open.exists?(dep.info_file) archived_info[dep.path] = dep.overriden else archived_info[dep.path] = dep.info end archived_info.merge!(dep.archived_info) end if dependencies archived_info end
Source
# File lib/rbbt/workflow/step/info.rb, line 100 def archived_inputs return {} unless info[:archived_dependencies] archived_info = self.archived_info all_inputs = IndiferentHash.setup({}) deps = info[:archived_dependencies].collect{|p| p.last} seen = [] while path = deps.pop dep_info = archived_info[path] if Hash === dep_info dep_info[:inputs].each do |k,v| all_inputs[k] = v unless all_inputs.include?(k) end if dep_info[:inputs] deps.concat(dep_info[:dependencies].collect{|p| p.last } - seen) if dep_info[:dependencies] deps.concat(dep_info[:archived_dependencies].collect{|p| p.last } - seen) if dep_info[:archived_dependencies] end seen << path end all_inputs end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 355 def canfail_paths return Set.new if done? && ! Open.exists?(info_file) @canfail_paths ||= begin if info[:canfail] paths = info[:canfail].uniq paths = Workflow.relocate_array self.path, paths if relocated Set.new(paths) else canfail_paths = Set.new all_deps = dependencies || [] all_deps.each do |dep| next if canfail_paths.include? dep.path canfail_paths += dep.canfail_paths next unless ComputeDependency === dep && dep.canfail? canfail_paths << dep.path canfail_paths += dep.rec_dependencies.collect{|d| d.path } end canfail_paths begin set_info :canfail, canfail_paths.to_a rescue Errno::EROFS end canfail_paths end end end
Source
# File lib/rbbt/workflow/step/status.rb, line 34 def clean if ! Open.exists?(info_file) Log.high "Refusing to clean step with no .info file: #{path}" return self end status = [] status << "dirty" if done? && dirty? status << "not running" if ! done? && ! running? status.unshift " " if status.any? Log.high "Cleaning step: #{path}#{status * " "}" Log.stack caller if RBBT_DEBUG_CLEAN abort if ! done? && running? Step.clean(path) @done = false self end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 346 def config(key, *tokens) options = tokens.pop if Hash === tokens.last options ||= {} new_tokens = [] if workflow workflow_name = workflow.to_s new_tokens << ("workflow:" << workflow_name) new_tokens << ("task:" << workflow_name << "#" << task_name.to_s) end new_tokens << ("task:" << task_name.to_s) Rbbt::Config.get(key, tokens + new_tokens, options) end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 575 def dependencies=(dependencies) @dependencies = dependencies set_info :dependencies, dependencies.collect{|dep| [dep.task_name, dep.name, dep.path]} if dependencies end
Source
# File lib/rbbt/workflow/step/status.rb, line 76 def dirty? return true if Open.exists?(pid_file) && ! ( Open.exists?(info_file) || done? ) return true if done? && ! (status == :done || status == :noinfo) return false unless done? || status == :done return false unless ENV["RBBT_UPDATE"] == "true" status = self.status if done? and not (status == :done or status == :ending or status == :producing) and not status == :noinfo return true end if status == :done and not done? return true end if dirty_files.any? Log.low "Some dirty files found for #{self.path}: #{Misc.fingerprint dirty_files}" true else ! self.updated? end end
Source
# File lib/rbbt/workflow/step/status.rb, line 63 def dirty_files rec_dependencies = self.rec_dependencies(true) return [] if rec_dependencies.empty? canfail_paths = self.canfail_paths dirty_files = rec_dependencies.reject{|dep| (defined?(WorkflowRemoteClient) && WorkflowRemoteClient::RemoteStep === dep) || ! Open.exists?(dep.info_file) || (dep.path && (Open.exists?(dep.path) || Open.remote?(dep.path))) || ((dep.error? || dep.aborted?) && (! dep.recoverable_error? || canfail_paths.include?(dep.path))) } end
Source
# File lib/rbbt/workflow/step/status.rb, line 100 def done? @done ||= path and Open.exists?(path) end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 60 def dup_inputs return if @inputs.nil? return if @dupped or ENV["RBBT_NO_STREAM"] == 'true' return if ComputeDependency === self and self.compute == :produce Log.low "Dupping inputs for #{path}" dupped_inputs = @inputs.collect do |input| Step.dup_stream input end @inputs.replace dupped_inputs @dupped = true end
Source
# File lib/rbbt/workflow/step/info.rb, line 263 def exception(ex, msg = nil) ex_class = ex.class.to_s backtrace = ex.backtrace if ex.respond_to?(:backtrace) message = ex.message if ex.respond_to?(:message) set_info :backtrace, backtrace set_info :exception, {:class => ex_class, :message => message, :backtrace => backtrace} if msg.nil? log :error, "#{ex_class} -- #{message}" else log :error, "#{msg} -- #{message}" end self._abort end
Source
# File lib/rbbt/workflow/refactor.rb, line 8 def exec(noload = false) old_exec end
Also aliased as: old_exec
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 227 def execute_and_dup(step, dep_step, log = true) dup = step.result.nil? execute_dependency(step, log) if dup and step.streaming? and not step.result.nil? if dep_step[step.path] and dep_step[step.path].length > 1 stream = step.result other_steps = dep_step[step.path].uniq.reject{|d| d.overriden } other_steps = other_steps.collect{|d| deps_using_step_input = d.rec_dependencies.select{|d| d.inputs.include? step } deps_using_step_input.any? ? deps_using_step_input : d }.flatten.uniq return unless other_steps.length > 1 log_dependency_exec(step, "duplicating #{other_steps.length}") copies = Misc.tee_stream_thread_multiple(stream, other_steps.length) copies.extend StreamArray step.instance_variable_set("@result", copies) end end end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 156 def execute_dependency(dependency, log = true) task_name = self.task_name canfail_paths = self.canfail_paths already_failed = [] begin dependency.resolve_input_steps if dependency.done? dependency.inputs.each do |v| Misc.consume_stream(v) if IO === v Misc.consume_stream(TSV.get_stream v) if Step === v and not v.done? and v.streaming? end log_dependency_exec(dependency, :done) if log return end dependency.status_lock.synchronize do if dependency.aborted? || (dependency.error? && dependency.recoverable_error? && ! canfail_paths.include?(dependency.path) && ! already_failed.include?(dependency.path)) || (!Open.remote?(dependency.path) && dependency.missing?) if dependency.resumable? dependency.status = :resume else Log.warn "Cleaning dep. on exec #{Log.color :blue, dependency.path} (missing: #{dependency.missing?}; error #{dependency.error?})" dependency.clean already_failed << dependency.path raise TryAgain end end end if dependency.status == :resume || ! (dependency.started? || dependency.error?) log_dependency_exec(dependency, :starting) dependency.run(true) raise TryAgain end dependency.grace if dependency.error? log_dependency_exec(dependency, :error) Step.raise_dependency_error dependency end if dependency.streaming? log_dependency_exec(dependency, :streaming) if log return end begin log_dependency_exec(dependency, :joining) dependency.join raise TryAgain unless dependency.done? rescue Aborted raise TryAgain end rescue TryAgain #Log.low "Retrying dep. #{Log.color :yellow, dependency.task_name.to_s} -- [#{dependency.status}] #{(dependency.messages || ["No message"]).last}" retry rescue Aborted, Interrupt Log.error "Aborted dep. #{Log.color :red, dependency.task_name.to_s}" raise $! rescue Interrupt Log.error "Interrupted while in dep. #{Log.color :red, dependency.task_name.to_s}" raise $! rescue Exception Log.error "Exception in dep. #{ Log.color :red, dependency.task_name.to_s } -- #{$!.message}" raise $! unless canfail_paths.include? dependency.path end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 267 def file(name=nil) Path.setup(File.join(files_dir, name.to_s), workflow, self) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 260 def files files = Dir.glob(File.join(files_dir, '**', '*')).reject{|path| File.directory? path}.collect do |path| Misc.path_relative_to(files_dir, path) end files end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 252 def files_dir @files_dir ||= Step.files_dir path end
{{{ INFO
Source
# File lib/rbbt/workflow/step/info.rb, line 277 def get_exception if info[:exception].nil? return Aborted if aborted? return Exception.new(messages.last) if error? Exception.new "" else ex_class, ex_message, ex_backtrace = info[:exception].values_at :class, :message, :backtrace begin klass = Kernel.const_get(ex_class) ex = klass.new ex_message ex.set_backtrace ex_backtrace unless ex_backtrace.nil? or ex_backtrace.empty? ex rescue Log.exception $! Exception.new ex_message end end end
Source
# File lib/rbbt/workflow/step/info.rb, line 2 def info(check_lock = true) return {:status => :noinfo} if info_file.nil? || ! Open.exists?(info_file) begin Misc.insist do begin return @info_cache if @info_cache and @info_cache_time and Open.ctime(info_file) < @info_cache_time rescue Exception raise $! end begin @info_cache = Misc.insist(3, 1.6, info_file) do Misc.insist(2, 1, info_file) do Misc.insist(3, 0.2, info_file) do raise TryAgain, "Info locked" if check_lock and info_lock.locked? info_lock.lock if check_lock and false begin Open.open(info_file, :mode => 'rb') do |file| Step.load_serialized_info(file) end ensure info_lock.unlock if check_lock and false end end end end @info_cache_time = Time.now @info_cache end end rescue Exception Log.debug{"Error loading info file: " + info_file} Log.exception $! #Open.rm info_file #Misc.sensiblewrite(info_file, Step.serialize_info({:status => :error, :messages => ["Info file lost"]})) raise $! end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 175 def info_file @info_file ||= Step.info_file(path) end
{{{ INFO
Source
# File lib/rbbt/workflow/step/accessor.rb, line 183 def info_lock @info_lock = begin path = Persist.persistence_path(info_file + '.lock', {:dir => Step.lock_dir}) #Lockfile.new path, :refresh => false, :dont_use_lock_id => true Lockfile.new path end if @info_lock.nil? @info_lock end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 201 def init_info(force = false) return nil if @exec || info_file.nil? || (Open.exists?(info_file) && ! force) batch_job = info[:batch_job] if Open.exists?(info_file) batch_system = info[:batch_system] if Open.exists?(info_file) Open.lock(info_file, :lock => info_lock) do i = {:status => :waiting, :pid => Process.pid, :path => path, :real_inputs => real_inputs, :overriden => overriden} i[:batch_job] = batch_job if batch_job i[:batch_system] = batch_system if batch_system i[:dependencies] = dependencies.collect{|dep| [dep.task_name, dep.name, dep.path]} if dependencies Misc.sensiblewrite(info_file, Step.serialize_info(i), :force => true, :lock => false) @info_cache = IndiferentHash.setup(i) @info_cache_time = Time.now end end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 147 def input_dependencies @input_dependencies ||= recursive_inputs(true).flatten. select{|i| Step === i || (defined?(RemoteStep) && RemoteStep === i) } + recursive_inputs(true).flatten. select{|dep| Path === dep && Step === dep.resource }. #select{|dep| ! dep.resource.started? }. # Ignore input_deps already started collect{|dep| dep.resource } end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 431 def inspect Misc.fingerprint(self) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 424 def knowledge_base(organism = nil) @_kb ||= begin kb_dir = self.file('knowledge_base') KnowledgeBase.new kb_dir, organism end end
Source
# File lib/rbbt/workflow/step/info.rb, line 62 def load_dependencies_from_info relocated = nil @dependencies = (self.info[:dependencies] || []).collect do |task,name,dep_path| dep_path = task if dep_path.nil? if Open.exists?(dep_path) || Open.exists?(dep_path + '.info') Workflow._load_step dep_path else next if FalseClass === relocated new_path = Workflow.relocate(path, dep_path) relocated = true if Open.exists?(new_path) || Open.exists?(new_path + '.info') Workflow._load_step new_path end end.compact @relocated = relocated end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 287 def load_file(name, type = nil, options = {}) if type.nil? and name =~ /.*\.(\w+)$/ extension = name.match(/.*\.(\w+)$/)[1] case extension when "tc" type = :tc when "tsv" type = :tsv when "list", "ary", "array" type = :array when "yaml" type = :yaml when "marshal" type = :marshal else type = :other end else type ||= :other end case type.to_sym when :tc Persist.open_tokyocabinet(file(name), false) when :tsv TSV.open Open.open(file(name)), options when :array #Open.read(file(name)).split /\n|,\s*/ Open.read(file(name)).split "\n" when :yaml Misc.load_yaml(file(name)) when :marshal Marshal.load(Open.open(file(name))) else Open.read(file(name)) end end
Source
# File lib/rbbt/workflow/step/info.rb, line 44 def load_inputs_from_info if info[:inputs] info_inputs = info[:inputs] if task && task.respond_to?(:inputs) && task.inputs IndiferentHash.setup info_inputs @inputs = NamedArray.setup info_inputs.values_at(*task.inputs.collect{|name| name.to_s}), task.inputs else if Hash === info_inputs @inputs = NamedArray.setup info_inputs.values, info_inputs.keys else @inputs = info_inputs end end else nil end end
Source
# File lib/rbbt/workflow/step/info.rb, line 255 def log(status, message = nil, &block) self.status = status if message self.message Log.uncolor(message) end Step.log(status, message, path, &block) end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 132 def log_dependency_exec(dependency, action) task_name = self.task_name str = Log.color(:reset, "") str << Log.color(:yellow, task_name.to_s || "") str << " " str << Log.color(:magenta, action.to_s) str << " " str << Log.color(:yellow, dependency.task_name.to_s || "") str << " -- " str << "#{Log.color :blue, dependency.path}" Log.info str end
Source
# File lib/rbbt/workflow/step/info.rb, line 219 def log_progress(status, options = {}, &block) Step.log_progress(status, options, file(:progress), &block) end
Source
# File lib/rbbt/workflow/refactor.rb, line 16 def md5_file Step.md5_file(path) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 235 def merge_info(hash) return nil if @exec or info_file.nil? return nil if ! writable? value = Annotated.purge value if defined? Annotated Open.lock(info_file, :lock => info_lock) do i = info(false) i.merge! hash dump = Step.serialize_info(i) @info_cache = IndiferentHash.setup(i) Misc.sensiblewrite(info_file, dump, :force => true, :lock => false) if Open.exists?(info_file) @info_cache_time = Time.now value end end
Source
# File lib/rbbt/workflow/step/info.rb, line 143 def message(message) message = Log.uncolor(message) set_info(:messages, (messages || []) << message) end
Source
# File lib/rbbt/workflow/step/info.rb, line 135 def messages if messages = info[:messages] messages else set_info(:messages, []) if self.respond_to?(:set_info) end || [] end
Source
# File lib/rbbt/workflow/step/status.rb, line 130 def missing? status == :done && ! Open.exists?(path) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 372 def monitor_stream(stream, options = {}, &block) case options[:bar] when TrueClass bar = progress_bar when Hash bar = progress_bar options[:bar] when Numeric bar = progress_bar :max => options[:bar] else bar = options[:bar] end out = if bar.nil? Misc.line_monitor_stream stream, &block elsif (block.nil? || block.arity == 0) Misc.line_monitor_stream stream do bar.tick end elsif block.arity == 1 Misc.line_monitor_stream stream do |line| bar.tick block.call line end elsif block.arity == 2 Misc.line_monitor_stream stream do |line| block.call line, bar end end ConcurrentStream.setup(out, :abort_callback => Proc.new{ Log::ProgressBar.remove_bar(bar, true) if bar }, :callback => Proc.new{ Log::ProgressBar.remove_bar(bar) if bar }) bgzip = (options[:compress] || options[:gzip]).to_s == 'bgzip' bgzip = true if options[:bgzip] gzip = true if options[:compress] || options[:gzip] if bgzip Open.bgzip(out) elsif gzip Open.gzip(out) else out end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 152 def name @name ||= path.sub(/.*\/#{Regexp.quote task_name.to_s}\/(.*)/, '\1') end
Source
# File lib/rbbt/workflow/step/status.rb, line 108 def noinfo? status == :noinfo end
Source
# File lib/rbbt/workflow/step/status.rb, line 138 def nopid? ! Open.exists?(pid_file) && ! (status.nil? || status == :aborted || status == :done || status == :error || status == :cleaned) end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 538 def overriden? return @overriden return true if @overriden return true if dependencies && dependencies.select{|dep| TrueClass === dep.overriden }.any? info[:archived_info].each do |f,i| next if Symbol === i return true if i[:overriden] || i["overriden"] end if info[:archived_info] return false end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 559 def overriden_deps ord = [] deps = dependencies.dup while dep = deps.shift case dep.overriden when FalseClass next when Symbol ord << dep else deps += dep.dependencies end end ord end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 179 def pid_file @pid_file ||= Step.pid_file(path) end
Source
# File lib/rbbt/workflow/step/info.rb, line 223 def progress_bar(msg = "Progress", options = nil, &block) if Hash === msg and options.nil? options = msg msg = nil end options = {} if options.nil? max = options[:max] bar = Log::ProgressBar.new_bar(max, {:desc => msg, :file => (@exec ? nil : file(:progress))}.merge(options)) if block_given? bar.init res = yield bar bar.remove res else bar end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 325 def provenance provenance = {} dependencies.each do |dep| next unless dep.path.exists? if Open.exists? dep.info_file provenance[dep.path] = dep.provenance if Open.exists? dep.path else provenance[dep.path] = nil end end {:inputs => info[:inputs], :provenance => provenance} end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 338 def provenance_paths provenance = {} dependencies.each do |dep| provenance[dep.path] = dep.provenance_paths if Open.exists? dep.path end provenance end
Source
# File lib/rbbt/util/python/step.rb, line 12 def python(python = nil, options = {}, &block) begin RbbtPython.add_path self.workflow.libdir.python.find rescue Log.warn "Error loading libdir python for workflow '#{Misc.fingerprint self.workflow}'" end case python when Path python_file python.find, options when String if Open.exists?(python) python_file python else TmpFile.with_file do |dir| pkg = "pkg#{rand(100)}" Open.write File.join(dir, "#{pkg}/__init__.py"), code RbbtPython.add_path dir Misc.in_dir dir do yield pkg end end end else python_block(python, &block) end end
Source
# File lib/rbbt/util/python/step.rb, line 8 def python_block(options = {}, &block) RbbtPython.run options, &block end
Source
# File lib/rbbt/util/python/step.rb, line 4 def python_file(file, options = {}) CMD.cmd_log(:python, file, options) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 365 def rec_access access rec_dependencies.each do |dep| dep.access end end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 583 def rec_dependencies(connected = false, seen = []) # A step result with no info_file means that it was manually # placed. In that case, do not consider its dependencies return [] if ! (defined? WorkflowRemoteClient && WorkflowRemoteClient::RemoteStep === self) && ! Open.exists?(self.info_file) && Open.exists?(self.path.to_s) return [] if dependencies.nil? or dependencies.empty? if self.overriden? archived_deps = [] else archived_deps = self.info[:archived_info] ? self.info[:archived_info].keys : [] end new_dependencies = [] dependencies.each{|step| #next if self.done? && Open.exists?(info_file) && info[:dependencies] && info[:dependencies].select{|task,name,path| path == step.path }.empty? next if archived_deps.include? step.path next if seen.include? step next if step.done? && connected && ! step.updatable? r = step.rec_dependencies(connected, new_dependencies) new_dependencies.concat r new_dependencies << step } new_dependencies.uniq end
connected = true means that dependency searching ends when a result is done but dependencies are absent, meanining that the file could have been dropped in
Source
# File lib/rbbt/workflow/step/info.rb, line 296 def recoverable_error? return true if aborted? return false unless error? begin return true unless info[:exception] klass = Kernel.const_get(info[:exception][:class]) ! (klass <= RbbtException ) rescue Exception true end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 420 def relocated? done? && info[:path] && info[:path] != path end
Source
# File lib/rbbt/workflow/refactor.rb, line 22 def reset_info(info = {}) if ENV["BATCH_SYSTEM"] info = info.dup info[:batch_system] = ENV["BATCH_SYSTEM"] info[:batch_job] = ENV["BATCH_JOB_ID"] end save_info(info) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 140 def result_description @result_description ||= if @task.respond_to?(:result_description) @task.result_description else nil end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 148 def result_description=(description) @result_description = description end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 128 def result_type @result_type ||= if @task.respond_to?(:result_type) @task.result_type || info[:result_type] else info[:result_type] end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 136 def result_type=(type) @result_type = type end
Source
# File lib/rbbt/workflow/step/status.rb, line 51 def resumable? (task && task.resumable) || status == :waiting || status == :cleaned end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 250 def run_compute_dependencies(type, list, dep_step = {}) if Array === type type, *rest = type end canfail = (rest && rest.include?(:canfail)) || type == :canfail case type when :canfail list.each do |dep| begin dep.produce rescue RbbtException Log.warn "Allowing failing of #{dep.path}: #{dep.messages.last if dep.messages}" rescue Exception Log.warn "Not Allowing failing of #{dep.path} because #{$!.class} not RbbtException" raise $! end nil end when :produce, :no_dup list.each do |step| Misc.insist do begin step.produce rescue RbbtException raise $! unless canfail || step.canfail? rescue Exception step.exception $! if step.recoverable_error? raise $! else raise StopInsist.new($!) end end end end nil when :bootstrap cpus = rest.nil? ? nil : rest.first if cpus.nil? keys = ['bootstrap'] + list.collect{|d| [d.task_name, d.task_signature] }.flatten.uniq cpus = config('dep_cpus', *keys, :default => [5, list.length / 2].min) elsif Symbol === cpus cpus = config('dep_cpus', cpus, :default => [5, list.length / 2].min) end respawn = rest && rest.include?(:respawn) respawn = false if rest && rest.include?(:norespawn) respawn = rest && rest.include?(:always_respawn) respawn = :always if respawn.nil? Misc.bootstrap(list, cpus, :bar => "Bootstrapping dependencies for #{self.short_path} [#{cpus}]", :respawn => respawn) do |dep| begin Signal.trap(:INT) do dep.abort raise Aborted end Misc.insist do begin dep.produce Log.warn "Error in bootstrap dependency #{dep.path}: #{dep.messages.last}" if dep.error? or dep.aborted? rescue Aborted ex = $! begin dep.abort Log.warn "Aborted bootstrap dependency #{dep.path}: #{dep.messages.last}" if dep.error? or dep.aborted? rescue end raise StopInsist.new(ex) rescue RbbtException if canfail || dep.canfail? Log.warn "Allowing failing of #{dep.path}: #{dep.messages.last}" else Log.warn "NOT Allowing failing of #{dep.path}: #{dep.messages.last}" dep.exception $! if dep.recoverable_error? begin dep.abort rescue end raise $! else raise StopInsist.new($!) end end end end rescue dep.abort raise $! end nil end else list.each do |step| execute_and_dup(step, dep_step, false) end end end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 383 def run_dependencies rec_dependencies = self.rec_dependencies(true) + input_dependencies.reject{|d| d.started? } return if rec_dependencies.empty? all_deps = rec_dependencies + [self] compute_deps = rec_dependencies.collect do |dep| next unless ComputeDependency === dep dep.rec_dependencies + dep.inputs.flatten.select{|i| Step === i} end.compact.flatten.uniq canfail_paths = self.canfail_paths dep_step = {} seen_paths = Set.new all_deps.uniq.each do |step| next if seen_paths.include? step.path seen_paths << step.path begin Step.prepare_for_execution(step) unless step == self rescue DependencyError, DependencyRbbtException raise $! unless canfail_paths.include? step.path end next unless step.dependencies and step.dependencies.any? # ToDo is this really necessary #(step.dependencies + step.input_dependencies).each do |step_dep| step.dependencies.each do |step_dep| next unless step.dependencies.include?(step_dep) next if step_dep.done? or step_dep.running? or (ComputeDependency === step_dep and (step_dep.compute == :nodup or step_dep.compute == :ignore)) dep_step[step_dep.path] ||= [] dep_step[step_dep.path] << step end end produced = [] (dependencies + input_dependencies).each do |dep| next if dep.started? next unless ComputeDependency === dep if dep.compute == :produce dep.produce produced << dep.path end end self.dup_inputs required_dep_paths = [] dep_step.each do |path,list| required_dep_paths << path if (list & dependencies).any? end required_dep_paths.concat dependencies.collect{|dep| dep.path} required_dep_paths.concat input_dependencies.collect{|dep| dep.path} required_dep_paths.concat(dependencies.collect do |dep| [dep.path] + dep.input_dependencies end.flatten) pre_deps = [] simple_dependencies = [] compute_simple_dependencies = {} compute_last_deps = {} seen_paths = Set.new rec_dependencies.uniq.reverse.each do |step| next if seen_paths.include? step.path seen_paths << step.path next unless required_dep_paths.include? step.path required_seen_paths = seen_paths & required_dep_paths inputs = step.inputs inputs = inputs.values if Hash === inputs internal = inputs.select{|i| i.respond_to?(:path) && required_seen_paths.include?(i.path) }.any? if ComputeDependency === step next if produced.include? step.path if internal compute_last_deps[step.compute] ||= [] compute_last_deps[step.compute] << step else compute_simple_dependencies[step.compute] ||= [] compute_simple_dependencies[step.compute] << step end else if internal simple_dependencies << step else simple_dependencies.prepend(step) end end end log :dependencies, "Processing dependencies for #{Log.color :yellow, task_name.to_s || ""}" if compute_simple_dependencies.any? || simple_dependencies.any? || compute_last_deps.any? Log.debug "compute_simple_dependencies: #{Misc.fingerprint(compute_simple_dependencies)} - #{Log.color :blue, self.path}" if compute_simple_dependencies.any? compute_simple_dependencies.each do |type,list| run_compute_dependencies(type, list, dep_step) end Log.low "pre_deps: #{Misc.fingerprint(pre_deps)} - #{Log.color :blue, self.path}" if pre_deps.any? pre_deps.each do |step| next if compute_deps.include? step begin execute_and_dup(step, dep_step, false) rescue Exception raise $! unless canfail_paths.include?(step.path) end end Log.debug "simple_dependencies: #{Misc.fingerprint(simple_dependencies)} - #{Log.color :blue, self.path}" if simple_dependencies.any? simple_dependencies.each do |step| next if compute_deps.include? step begin Exception execute_and_dup(step, dep_step) rescue raise $! unless canfail_paths.include?(step.path) end end Log.low "compute_last_deps: #{Misc.fingerprint(compute_simple_dependencies)} - #{Log.color :blue, self.path}" if compute_simple_dependencies.any? compute_simple_dependencies.each do |type,list| run_compute_dependencies(type, list, dep_step) end dangling_deps = all_deps.reject{|dep| dep.done? || canfail_paths.include?(dep.path) }. select{|dep| dep.waiting? } Log.medium "Aborting (actually not) waiting dangling dependencies #{Misc.fingerprint dangling_deps}" if dangling_deps.any? #dangling_deps.each{|dep| dep.abort } end
Source
# File lib/rbbt/workflow/step/status.rb, line 112 def running? return false if ! (started? || status == :ending) return nil unless Open.exist?(self.pid_file) pid = Open.read(self.pid_file).to_i return false if done? or error? or aborted? if Misc.pid_exists?(pid) pid else done? or error? or aborted? end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 271 def save_file(name, content) content = case when String === content content when Array === content content * "\n" when TSV === content content.to_s when Hash === content content.collect{|*p| p * "\t"} * "\n" else content.to_s end Open.write(file(name), content) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 216 def set_info(key, value) return nil if @exec or info_file.nil? return nil if ! writable? value = Annotated.purge value if defined? Annotated Open.lock(info_file, :lock => info_lock) do i = info(false).dup value = Annotated.purge(value) i[key] = value dump = Step.serialize_info(i) @info_cache = IndiferentHash.setup(i) Misc.sensiblewrite(info_file, dump, :force => true, :lock => false) if Open.exists?(info_file) @info_cache_time = Time.now value end end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 156 def short_path [task_name, name] * "/" end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 160 def short_path_real [(Symbol === overriden ? overriden : task_name).to_s, name] * "/" end
Source
# File lib/rbbt/workflow/refactor.rb, line 57 def soft_grace sleep 1 until Open.exists?(info_file) end
Source
# File lib/rbbt/workflow/step/status.rb, line 126 def stalled? started? && ! (done? || running? || done? || error? || aborted?) end
Source
# File lib/rbbt/workflow/step/status.rb, line 55 def started? Open.exists?(path) or (Open.exists?(pid_file) && Open.exists?(info_file)) end
Source
# File lib/rbbt/workflow/step/info.rb, line 122 def status begin info[:status] rescue Exception Log.error "Exception reading status: #{$!.message}" :error end end
Source
# File lib/rbbt/workflow/step/info.rb, line 131 def status=(status) set_info(:status, status) end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 192 def status_lock return @mutex #@status_lock = begin # path = Persist.persistence_path(info_file + '.status.lock', {:dir => Step.lock_dir}) # Lockfile.new path, :refresh => false, :dont_use_lock_id => true # end if @status_lock.nil? #@status_lock end
Source
# File lib/rbbt/workflow/step/dependencies.rb, line 523 def stop_dependencies return if dependencies.nil? dependencies.each do |dep| if dep.nil? Log.warn "Dependency is nil #{Misc.fingerprint step} -- #{Misc.fingerprint dependencies}" next end next if dep.done? or dep.aborted? dep.abort if dep.running? end kill_children end
Source
# File lib/rbbt/workflow/step/status.rb, line 104 def streaming? (IO === @result) or (not @saved_stream.nil?) or status == :streaming end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 169 def task_signature [workflow.to_s, task_name] * "#" end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 256 def tmp_path @tmp_path ||= Step.tmp_path path end
Source
# File lib/rbbt/workflow/step/status.rb, line 59 def waiting? Open.exists?(info_file) and not started? end
Source
# File lib/rbbt/workflow/step/accessor.rb, line 164 def workflow_short_path return short_path unless workflow workflow.to_s + "#" + short_path end