module Workflow
Constants
- DEFAULT_NAME
- FORGET_DEP_TASKS
- OUTPUT_FIELDS
- REMOVE_DEP_TASKS
Attributes
Public Class Methods
Source
# File lib/rbbt/workflow/doc.rb, line 21
# Splits +str+ on +pattern+ and pairs every captured heading with the text
# that follows it.
#
# +pattern+ must contain one capture group: String#split then interleaves the
# captures with the text between matches, which is what lets headings and
# bodies line up as pairs.
#
# Returns a Hash of stripped heading => stripped body. Headings whose body is
# empty are dropped. Returns {} when the pattern never matches.
def self.doc_parse_chunks(str, pattern)
  parts = str.split(pattern)
  return {} if parts.length < 2

  tasks = {}
  # parts[0] is the text before the first heading; the rest alternates
  # heading, body. split drops a trailing empty field, so the last heading may
  # come without a body: treat that as an empty description instead of
  # raising on an odd number of elements.
  parts.drop(1).each_slice(2) do |heading, body|
    tasks[heading.strip] = body.to_s.strip
  end
  tasks.delete_if { |_heading, body| body.empty? }
  tasks
end
Source
# File lib/rbbt/workflow/doc.rb, line 3
# Extracts the first line of +str+ that is immediately followed by a blank
# line.
#
# On a match, +str+ is modified in place so that it only holds the text after
# that blank line, and the extracted line is returned. When no such line
# exists, +str+ is left untouched and "" is returned.
def self.doc_parse_first_line(str)
  # Only the /m flag (dot matches newline) is wanted here. The former /s flag
  # fixed the pattern to Windows-31J, which makes matching UTF-8 text that
  # contains non-ASCII characters raise Encoding::CompatibilityError.
  match = str.match(/^([^\n]*)\n\n(.*)/m)
  return "" unless match

  first_line, remainder = match[1], match[2]
  str.replace remainder
  first_line
end
Source
# File lib/rbbt/workflow/doc.rb, line 12
# Splits +str+ at the first occurrence of +pattern+.
#
# Returns a two-element Array: the text before the match and the text after
# it. With +keep+ set, the matched text is kept at the start of the second
# element. When +pattern+ does not occur, the result is [str, ""].
def self.doc_parse_up_to(str, pattern, keep = false)
  # String#partition always returns three strings (empty ones when nothing
  # matches), so there is no separate "not found" return shape to handle.
  before, matched, after = str.partition pattern
  [before, keep ? matched + after : after]
end
Source
# File lib/rbbt/workflow/accessor.rb, line 19
# Tells whether the fourth path component from the end of +path+ is "jobs".
def self.job_path?(path)
  segments = path.split("/")
  "jobs" == segments[-4]
end
Source
# File lib/rbbt/workflow/refactor/inputs.rb, line 15
# Loads input values stored as files under +dir+.
#
# +names+ and +types+ are walked in parallel; for each pair the file
# <dir>/<name> is handed to Task.load_input_from_file. Inputs for which that
# call returns nil are left out of the result.
#
# Returns the hash of loaded inputs (set up through IndiferentHash).
def self.load_inputs(dir, names, types)
  inputs = IndiferentHash.setup({})
  names.zip(types) do |name, type|
    # Use the +dir+ argument: the previous code read an undefined `directory`.
    filename = File.join(dir, name.to_s)
    value = Task.load_input_from_file(filename, name, type)
    inputs[name] = value unless value.nil?
  end
  inputs
end
Source
# File lib/rbbt/workflow/integration/nextflow.rb, line 45
# Lists the distinct names referenced as `params.<name>` in +file+, in order
# of first appearance.
def self.nextflow_file_params(file)
  references = Open.read(file).scan(/params\.\w+/)
  names = references.collect { |reference| reference.split(".").last }
  names.uniq
end
Source
# File lib/rbbt/workflow/integration/nextflow.rb, line 49
# Scans +file+ for `include { ... } from '...'` statements.
#
# Returns the distinct [first included name, source path] pairs captured by
# the pattern.
def self.nextflow_includes(file)
  source = Open.read(file)
  found = source.scan(/^include\s*{\s*([^\s]*?)\s+.*?}\s*from\s+["'](.*?)["'](?:\s*params.*)?/)
  found.uniq
end
Source
# File lib/rbbt/workflow/integration/nextflow.rb, line 53
# Collects the parameters of +file+ plus those of every file it includes,
# recursively.
#
# Parameters found through an include are reported as "<param>-<name>", once
# for each ";"-separated name in the include statement. Errors raised while
# processing an included file are ignored.
def self.nextflow_recursive_params(file)
  own_params = nextflow_file_params(file)
  base_dir = File.dirname(file)

  nextflow_includes(file).inject(own_params) do |collected, (names, target)|
    target = File.join(base_dir, target)
    # Only append the extension when the bare path is missing and the ".nf"
    # variant exists.
    target += '.nf' if !File.exist?(target) && File.exist?(target + '.nf')

    names.split(";").each do |raw_name|
      prefix = raw_name.strip
      begin
        collected += nextflow_recursive_params(target).collect { |param| [param, prefix] * "-" }
      rescue
      end
    end

    collected
  end
end
Source
# File lib/rbbt/workflow/integration/nextflow.rb, line 5
# Parses a Nextflow JSON schema file.
#
# Reads the top-level "properties"/"required" entries and those of every
# section under "definitions" (sections without "properties" are skipped).
#
# Returns [description, parameters], where +parameters+ maps each property
# name to a Hash with :type, :format, :description, :options and :section.
# :section is the section "title" (or its key), and nil for top-level
# properties. Properties with an "enum" get type 'select' and their values in
# options[:select_options]. options[:required] is set for required
# properties that are not listed in OUTPUT_FIELDS.
def self.parse_nextflow_schema(file)
  doc = Open.open(file){|f| JSON.parse(f.read) }
  description = doc["description"]

  properties = {}
  required = []

  properties[nil] = doc["properties"] if doc["properties"]
  required.concat doc["required"] if doc["required"]

  (doc["definitions"] || {}).each do |section, section_info|
    next unless section_info["properties"]
    section_name = section_info["title"] || section
    properties[section_name] = section_info["properties"]
    required.concat section_info["required"] if section_info["required"]
  end

  required = required.compact.flatten

  parameters = {}
  properties.each do |section, param_info|
    param_info.each do |name, info|
      input_options = {}
      type = info["type"]

      input_options[:required] = true if required.include?(name) && ! OUTPUT_FIELDS.include?(name)

      if info.include?("enum")
        type = 'select'
        input_options[:select_options] = info["enum"]
      end

      parameters[name] = {type: type, format: info["format"], description: info["description"], options: input_options, section: section}
    end
  end

  [description, parameters]
end
Source
# File lib/rbbt/workflow/doc.rb, line 29
# Breaks a workflow documentation text into its parts.
#
# +doc+ is consumed in place by doc_parse_first_line. Returns a Hash with
# :title, :description (text before the "# Tasks" heading),
# :task_description (text between that heading and the first "##" heading)
# and :tasks (result of doc_parse_chunks on the "## <name>" sections).
def self.parse_workflow_doc(doc)
  heading = doc_parse_first_line(doc)
  overview, tasks_section = doc_parse_up_to(doc, /^# Tasks/i)
  tasks_intro, task_chunks = doc_parse_up_to(tasks_section, /^##/, true)
  parsed_tasks = doc_parse_chunks(task_chunks, /^## (.*)/)

  {
    title: heading.strip,
    description: overview.strip,
    task_description: tasks_intro.strip,
    tasks: parsed_tasks
  }
end
Source
# File lib/rbbt/workflow/util/trace.rb, line 115 def self.plot_trace_job_times(data, plot, width=800, height=800) data.R <<-EOF, [:svg] rbbt.require('dplyr') rbbt.require('tidyr') rbbt.require('ggplot2') names(data) <- make.names(names(data)) data$id = data$Code data$content = data$Task data$start = data$Start data$end = data$End data$Project = data$Workflow tasks = data #theme_gantt <- function(base_size=11, base_family="Source Sans Pro Light") { theme_gantt <- function(base_size=11, base_family="Sans Serif") { ret <- theme_bw(base_size, base_family) %+replace% theme(panel.background = element_rect(fill="#ffffff", colour=NA), axis.title.x=element_text(vjust=-0.2), axis.title.y=element_text(vjust=1.5), title=element_text(vjust=1.2, family="Source Sans Pro Semibold"), panel.border = element_blank(), axis.line=element_blank(), panel.grid.minor=element_blank(), panel.grid.major.y = element_blank(), panel.grid.major.x = element_line(size=0.5, colour="grey80"), axis.ticks=element_blank(), legend.position="bottom", axis.title=element_text(size=rel(1.2), family="Source Sans Pro Semibold"), strip.text=element_text(size=rel(1.5), family="Source Sans Pro Semibold"), strip.background=element_rect(fill="#ffffff", colour=NA), panel.spacing.y=unit(1.5, "lines"), legend.key = element_blank()) ret } tasks.long <- tasks %>% gather(date.type, task.date, -c(Code,Project, Task, id, Start.second, End.second)) %>% arrange(date.type, task.date) %>% mutate(id = factor(id, levels=rev(unique(id)), ordered=TRUE)) x.breaks <- seq(length(tasks$Task) + 0.5 - 3, 0, by=-3) timeline <- ggplot(tasks.long, aes(y=id, yend=id, x=Start.second, xend=End.second, colour=Task)) + geom_segment() + geom_vline(xintercept=x.breaks, colour="grey80", linetype="dotted") + guides(colour=guide_legend(title=NULL)) + labs(x=NULL, y=NULL) + theme_gantt() + theme(axis.text.x=element_text(angle=45, hjust=1)) rbbt.png_plot('#{plot}', 'plot(timeline)', width=#{width}, height=#{height}, pointsize=6) EOF end
Source
# File lib/rbbt/workflow/refactor.rb, line 95
# Defines a top-level constant pointing to a RemoteWorkflow.
#
# The constant is named after the part of +wf_name+ before the first "+";
# the RemoteWorkflow is built from +url+ and the full +wf_name+.
#
# Returns the new RemoteWorkflow. Raises NameError when the derived name is
# not a valid constant name.
def require_remote_workflow(wf_name, url)
  require 'rbbt/workflow/remote_workflow'
  # const_set instead of a string-built eval: quotes or other code inside
  # +url+ / +wf_name+ can no longer break or alter the evaluated statement.
  Object.const_set wf_name.split("+").first, RemoteWorkflow.new(url, wf_name)
end
Source
# File lib/rbbt/workflow/refactor.rb, line 100
# Loads a workflow by name, or as a remote workflow when +wf_name+ is a
# remote or ssh location.
#
# For remote locations the workflow name is the basename of the location
# (for ssh, of the part after the last ":"), and the debug message is logged
# whether or not the remote load succeeds. Everything else is delegated to
# original_require_workflow. +force_local+ is not used here.
def require_workflow(wf_name, force_local = true)
  return original_require_workflow(wf_name) unless Open.remote?(wf_name) || Open.ssh?(wf_name)

  url = wf_name
  wf_name = Open.ssh?(url) ? File.basename(url.split(":").last) : File.basename(url)

  begin
    require_remote_workflow(wf_name, url)
  ensure
    Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"}
  end
end
Also aliased as: original_require_workflow
Source
# File lib/rbbt/workflow/util/trace.rb, line 220 def self.trace(seed_jobs, options = {}) jobs = [] seed_jobs.each do |step| jobs += step.rec_dependencies + [step] step.info[:archived_info].each do |path,ainfo| next unless Hash === ainfo archived_step = Step.new path archived_step.define_singleton_method :info do ainfo end #class << archived_step # self #end.define_method :info do # ainfo #end jobs << archived_step end if step.info[:archived_info] end jobs = jobs.uniq.sort_by{|job| [job, job.info]; t = job.info[:started] || Open.mtime(job.path) || Time.now; Time === t ? t : Time.parse(t) } report_keys = options[:report_keys] || "" report_keys = report_keys.split(/,\s*/) if String === report_keys data = trace_job_times(jobs, options[:fix_gap], report_keys) summary = trace_job_summary(jobs, report_keys) raise "No jobs to process" if data.size == 0 plot, size, width, height = options.values_at :plot, :size, :width, :height size = 800 if size.nil? width = size.to_i * 2 if width.nil? height = size if height.nil? plot_trace_job_times(data, plot, width, height) if plot if options[:plot_data] data else summary end end
Source
# File lib/rbbt/workflow/util/trace.rb, line 169 def self.trace_job_summary(jobs, report_keys = []) tasks_info = {} report_keys = report_keys.collect{|k| k.to_s} jobs.each do |dep| next unless dep.info[:end] task = [dep.workflow, dep.task_name].compact.collect{|s| s.to_s} * "#" info = tasks_info[task] ||= IndiferentHash.setup({}) dep_info = IndiferentHash.setup(dep.info) ddone = dep_info[:end] started = dep_info[:start] started = Time.parse started if String === started ddone = Time.parse ddone if String === ddone time = ddone - started info[:time] ||= [] info[:time] << time report_keys.each do |key| info[key] = dep_info[key] end dep.info[:config_keys].each do |kinfo| key, value, tokens = kinfo info[key.to_s] = value if report_keys.include? key.to_s end if dep.info[:config_keys] end summary = TSV.setup({}, "Task~Calls,Avg. Time,Total Time#:type=:list") tasks_info.each do |task, info| time_lists = info[:time] avg_time = Misc.mean(time_lists).to_i total_time = Misc.sum(time_lists).to_i calls = time_lists.length summary[task] = [calls, avg_time, total_time] end report_keys.each do |key| summary.add_field Misc.humanize(key) do |task| tasks_info[task][key] end end if Array === report_keys && report_keys.any? summary end
Source
# File lib/rbbt/workflow/util/trace.rb, line 6
# Builds a per-job timing table for +jobs+.
#
# Jobs without info[:end] are skipped. Each row is keyed by the job path and
# holds Code ("<name> - <workflow> · <task>"), Workflow, Task, Start and End,
# plus "Start.second" and "End.second": offsets from the earliest start.
#
# With +fix_gap+, the time spans during which no job was running are removed
# from those offsets and their total is logged. With +report_keys+, one extra
# field per key is added holding the value found in each job's info.
#
# Returns the TSV.
def self.trace_job_times(jobs, fix_gap = false, report_keys = nil)
  data = TSV.setup({}, "Job~Code,Workflow,Task,Start,End#:type=:list")
  min_start = nil

  jobs.each do |job|
    next unless job.info[:end]
    started = job.info[:start]
    ddone = job.info[:end]

    started = Time.parse started if String === started
    ddone = Time.parse ddone if String === ddone

    code = [job.workflow, job.task_name].compact.collect{|s| s.to_s} * " · "
    code = job.name + " - " + code

    data[job.path] = [code, job.workflow.to_s, job.task_name, started, ddone]

    min_start = started if min_start.nil? || started < min_start
  end

  data.add_field "Start.second" do |k,value|
    value["Start"] - min_start
  end

  data.add_field "End.second" do |k,value|
    value["End"] - min_start
  end

  if fix_gap
    ranges = []
    data.through do |k,values|
      start, eend = values.values_at "Start.second", "End.second"
      ranges << (start..eend)
    end

    # gaps maps the end of each busy period to the idle time that follows it.
    gaps = {}
    last = nil
    Misc.collapse_ranges(ranges).each do |range|
      start = range.begin
      eend = range.end
      gaps[last] = start - last if last
      last = eend
    end

    # "End.second" is shifted first: both shifts are computed from the still
    # unshifted "Start.second" of the row.
    data.process "End.second" do |value,k,values|
      gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size})
      value - gap
    end

    data.process "Start.second" do |value,k,values|
      gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size})
      value - gap
    end

    total_gaps = Misc.sum(gaps.collect{|k,v| v})
    Log.info "Total gaps: #{total_gaps} seconds"
  end

  # Report fields are added exactly once. This block used to be repeated
  # further down, which called add_field a second time for every key.
  if report_keys && report_keys.any?
    job_keys = {}
    jobs.each do |job|
      job_info = IndiferentHash.setup(job.info)
      report_keys.each do |key|
        job_keys[job.path] ||= {}
        job_keys[job.path][key] = job_info[key]
      end
    end
    report_keys.each do |key|
      data.add_field Misc.humanize(key) do |p,values|
        job_keys[p][key]
      end
    end
  end

  start = data.column("Start.second").values.flatten.collect{|v| v.to_f}.min
  eend = data.column("End.second").values.flatten.collect{|v| v.to_f}.max
  total = eend - start unless eend.nil? || start.nil?
  Log.info "Total time elapsed: #{total} seconds" if total

  data
end
Source
# File lib/rbbt/workflow/refactor.rb, line 124
# Directory where workflows are looked up.
#
# Resolution order: @workflow_dir, the RBBT_WORKFLOW_DIR environment
# variable, the path written in etc/workflow_dir (when that file exists),
# and finally 'workflows' resolved with find(:user).
def workflow_dir
  return @workflow_dir if @workflow_dir

  from_env = ENV["RBBT_WORKFLOW_DIR"]
  return from_env if from_env

  config_file = Path.setup("etc/workflow_dir")
  return Path.setup('workflows').find(:user) unless config_file.exists?

  Path.setup(config_file.read.strip)
end
Source
# File lib/rbbt/workflow/accessor.rb, line 204
# Finds the workflow constant a job path belongs to.
#
# The workflow name is taken from the directory two levels above +path+
# (.../<Workflow>/<task>/<job>). Returns the constant with that name, or nil
# when it cannot be resolved.
def self.workflow_for(path)
  begin
    # Only the last component of the grandparent directory is a constant
    # name; passing the whole directory path to const_get always failed for
    # paths with a directory prefix.
    Kernel.const_get File.basename(File.dirname(File.dirname(path)))
  rescue
    nil
  end
end
Source
# File lib/rbbt/workflow/refactor.rb, line 137
# Location of the workflow repository.
#
# Resolution order: @workflow_repo, the RBBT_WORKFLOW_REPO environment
# variable, the contents of etc/workflow_repo (when that file exists), and
# finally 'https://github.com/Rbbt-Workflows/'.
def workflow_repo
  return @workflow_repo if @workflow_repo

  from_env = ENV["RBBT_WORKFLOW_REPO"]
  return from_env if from_env

  config_file = Path.setup("etc/workflow_repo")
  return 'https://github.com/Rbbt-Workflows/' unless config_file.exists?

  config_file.read.strip
end
Public Instance Methods
Source
# File lib/rbbt/workflow/dependencies.rb, line 106 def assign_dep_inputs(_inputs, options, all_d, task_info) IndiferentHash.setup(_inputs) options.each{|i,v| next if i == :compute or i == "compute" case v when :compute compute = v when Symbol rec_dependency = all_d.flatten.select{|d| d.task_name.to_sym == v }.first if rec_dependency.nil? if _inputs.include?(v) #_inputs[i] = _inputs.delete(v) _inputs[i] = _inputs[v] unless _inputs.include? i #_inputs.delete(v) else _inputs[i] = v unless _inputs.include? i end else input_options = task_info[:input_options][i] || {} #ToDo why was this always true? if input_options[:stream] or true #rec_dependency.run(true).grace unless rec_dependency.done? or rec_dependency.running? _inputs[i] = rec_dependency else rec_dependency.abort if rec_dependency.streaming? and not rec_dependency.running? rec_dependency.clean if rec_dependency.error? or rec_dependency.aborted? if rec_dependency.streaming? and rec_dependency.running? _inputs[i] = rec_dependency.join.load else rec_dependency.run(true) rec_dependency.join _inputs[i] = rec_dependency.load end end end else _inputs[i] = v end } if options _inputs end
Source
# File lib/rbbt/workflow/definition.rb, line 50 def dep(*dependency, &block) @dependencies ||= [] dependency = [tasks.keys.last] if dependency.empty? && ! block_given? if block_given? if dependency.any? wf, task_name, options = dependency options, task_name = task_name, nil if Hash === task_name options, wf = wf, nil if Hash === wf task_name, wf = wf, self if task_name.nil? DependencyBlock.setup block, [wf, task_name, options] end @dependencies << block else if Module === dependency.first or (defined? RemoteWorkflow and RemoteWorkflow === dependency.first) or Hash === dependency.last dependency = ([self] + dependency) unless Module === dependency.first || (defined?(RemoteWorkflow) && RemoteWorkflow === dependency.first) @dependencies << dependency else @dependencies.concat dependency end end end
Source
# File lib/rbbt/workflow/definition.rb, line 79 def dep_task(name, workflow, oname, *rest, &block) dep(workflow, oname, *rest, &block) extension :dep_task unless @extension returns workflow.tasks[oname].result_description if workflow.tasks.include?(oname) unless @result_description task name do raise RbbtException, "dep_task does not have any dependencies" if dependencies.empty? Step.wait_for_jobs dependencies.select{|d| d.streaming? } dep = dependencies.last dep.join raise dep.get_exception if dep.error? raise Aborted, "Aborted dependency #{dep.path}" if dep.aborted? set_info :result_type, dep.info[:result_type] forget = config :forget_dep_tasks, "forget_dep_tasks", :default => FORGET_DEP_TASKS if forget remove = config :remove_dep_tasks, "remove_dep_tasks", :default => REMOVE_DEP_TASKS self.archive_deps self.copy_files_dir self.dependencies = self.dependencies - [dep] Open.rm_rf self.files_dir if Open.exist? self.files_dir FileUtils.cp_r dep.files_dir, self.files_dir if Open.exist?(dep.files_dir) if dep.overriden || ! Workflow.job_path?(dep.path) Open.link dep.path, self.tmp_path else Open.ln_h dep.path, self.tmp_path case remove.to_s when 'true' dep.clean when 'recursive' (dep.dependencies + dep.rec_dependencies).uniq.each do |d| next if d.overriden d.clean unless config(:remove_dep, d.task_signature, d.task_name, d.workflow.to_s, :default => true).to_s == 'false' end dep.clean unless config(:remove_dep, dep.task_signature, dep.task_name, dep.workflow.to_s, :default => true).to_s == 'false' end end else if Open.exists?(dep.files_dir) Open.rm_rf self.files_dir Open.link dep.files_dir, self.files_dir end if defined?(RemoteStep) && RemoteStep === dep Open.write(self.tmp_path, Open.read(dep.path)) else Open.link dep.path, self.path end end nil end end
Also aliased as: task_alias
Source
# File lib/rbbt/workflow/definition.rb, line 34
# Stores +description+ in @description and returns it.
def desc(description)
  @description = description
  @description
end
Source
# File lib/rbbt/workflow/doc.rb, line 37
# Text of the workflow documentation file.
#
# Looks for 'workflow.md' under @libdir and falls back to 'README.md'.
# Returns "" when @libdir is not set or neither file exists.
def documentation_markdown
  return "" if @libdir.nil?

  candidate = @libdir['workflow.md'].find
  candidate = @libdir['README.md'].find unless candidate.exists?

  candidate.exists? ? candidate.read : ""
end
Source
# File lib/rbbt/workflow/examples.rb, line 21
# Lists the inputs of +task_name+ that have a file in the +example+
# directory.
#
# Returns an Array of [input, type, found file] triplets.
def example(task_name, example)
  input_types = task_info(task_name.to_sym)[:input_types]

  input_types.each_with_object([]) do |(input, type), present|
    next unless example_dir[task_name][example][input].exists?
    present << [input, type, example_dir[task_name][example][input].find]
  end
end
Source
# File lib/rbbt/workflow/examples.rb, line 47
# Loads the inputs stored for +example+ of +task_name+ through
# Workflow.load_inputs, using the task's :inputs and :input_types.
def example_inputs(task_name, example)
  inputs_dir = example_dir[task_name][example]
  task_details = self.task_info(task_name)
  Workflow.load_inputs(inputs_dir, task_details[:inputs], task_details[:input_types])
end
case type when :tsv, :array, :text Log.debug "Pointing #{ input } to #{file}" inputs[input.to_sym] = file when :boolean inputs[input.to_sym] = (file.read.strip == 'true') else Log.debug "Loading #{ input } from #{file}" inputs[input.to_sym] = file.read.strip end end inputs
end
Source
# File lib/rbbt/workflow/examples.rb, line 53
# Creates the job for +task_name+ using the inputs stored for +example+.
#
# Values in +new_inputs+, when given and not empty, take precedence over the
# stored ones. The example name is used as the job name.
def example_step(task_name, example="Example", new_inputs = {})
  job_inputs = example_inputs(task_name, example)

  unless new_inputs.nil? || !new_inputs.any?
    IndiferentHash.setup(new_inputs)
    job_inputs = job_inputs.merge(new_inputs)
  end

  self.job(task_name, example, job_inputs)
end
Source
# File lib/rbbt/workflow/examples.rb, line 8
# Lists the available examples grouped by task.
#
# Every "<task>/<example>" entry found under example_dir contributes the
# example name to the list of its task. Returns {} when the examples
# directory of libdir does not exist.
def examples
  return {} unless self.libdir.examples.exists?

  by_task = {}
  example_dir.glob("*/*").each do |entry|
    task_name = File.basename(File.dirname(entry))
    (by_task[task_name] ||= []) << File.basename(entry)
  end

  IndiferentHash.setup by_task
  by_task
end
Source
# File lib/rbbt/workflow/definition.rb, line 207
# Marks +names+ as asynchronous exports, after removing them from any
# previous export list with unexport.
#
# Returns the list of asynchronous exports, without duplicates.
def export_asynchronous(*names)
  unexport(*names)
  asynchronous_exports.concat(names).uniq!
  asynchronous_exports
end
Also aliased as: export
Source
# File lib/rbbt/workflow/definition.rb, line 193
# Marks +names+ as exec exports, after removing them from any previous
# export list with unexport.
#
# Returns the list of exec exports, without duplicates.
def export_exec(*names)
  unexport(*names)
  exec_exports.concat(names).uniq!
  exec_exports
end
Source
# File lib/rbbt/workflow/definition.rb, line 214
# Marks +names+ as stream exports, after removing them from any previous
# export list with unexport.
#
# Returns the list of stream exports, without duplicates.
def export_stream(*names)
  unexport(*names)
  stream_exports.concat(names).uniq!
  stream_exports
end
Source
# File lib/rbbt/workflow/definition.rb, line 200
# Marks +names+ as synchronous exports, after removing them from any
# previous export list with unexport.
#
# Returns the list of synchronous exports, without duplicates.
def export_synchronous(*names)
  unexport(*names)
  synchronous_exports.concat(names).uniq!
  synchronous_exports
end
Source
# File lib/rbbt/workflow/definition.rb, line 38
# Stores +extension+ in @extension and returns it.
def extension(extension)
  @extension = extension
  @extension
end
Source
# File lib/rbbt/workflow/refactor.rb, line 79
# Loads the step identified by +id+, relative to the workflow directory.
#
# When the task the path belongs to is listed in remote_tasks, loading is
# delegated to that remote task; otherwise Workflow.fast_load_step is used.
def fast_load_id(id)
  path = Path === directory ? directory[id].find : File.join(directory, id)
  task = task_for(path)

  if remote_tasks && remote_tasks.include?(task)
    remote_tasks[task].load_id(id)
  else
    Workflow.fast_load_step(path)
  end
end
Also aliased as: load_id
Source
# File lib/rbbt/workflow/definition.rb, line 25
# Defines or calls a helper.
#
# With a block, registers it as helper +name+ and returns the block.
# Without a block, calls the helper registered as +name+ with +args+ and
# returns its result.
#
# Raises RbbtException when no helper is registered under +name+.
def helper(name, *args, &block)
  if block_given?
    helpers[name] = block
  else
    raise RbbtException, "helper #{name} unknown in #{self} workflow" unless helpers[name]
    helpers[name].call(*args)
  end
end
Source
# File lib/rbbt/workflow/accessor.rb, line 195
# Returns +path+ relative to the work directory, as computed by
# Misc.path_relative_to. The work directory is resolved with #find when it
# supports it.
def id_for(path)
  base = workdir.respond_to?(:find) ? workdir.find : workdir
  Misc.path_relative_to base, path
end
# Names of the inputs of +taskname+ and of all its recursive dependencies,
# without duplicates.
def rec_inputs(taskname)
  task_refs = [taskname].concat(rec_dependencies(taskname))
  collected = task_refs.each_with_object([]) do |task_ref, all_inputs|
    all_inputs.concat(task_from_dep(task_ref).inputs)
  end
  collected.uniq
end
Source
# File lib/rbbt/workflow/definition.rb, line 223
# Copies tasks and helpers from +source+ into this workflow.
#
# Without +args+ every task and helper of +source+ is imported; otherwise
# only the listed names. For tasks, the dependencies and description are
# copied as well. A message is logged when a name is already a task here.
def import(source, *args)
  names = if args.empty?
            source.tasks.collect { |task_name, _task| task_name } + source.helpers.collect { |helper_name, _helper| helper_name }
          else
            args.flatten
          end

  names.each do |name|
    key = name.to_sym

    if self.tasks.include? key
      Log.high "Task #{name} from #{source.to_s} is already present in #{self.to_s} and will be cloacked"
    end

    if source.tasks.include? key
      self.tasks[key] = source.tasks[key]
      self.task_dependencies[key] = source.task_dependencies[key]
      self.task_description[key] = source.task_description[key]
    end

    self.helpers[key] = source.helpers[key] if source.helpers.include? key
  end
end
Source
# File lib/rbbt/workflow/integration/cromwell.rb, line 18 def load_cromwell(file) jar = Rbbt.software.opt.jar["wdltool.jar"].produce.find inputs = JSON.load(CMD.cmd("java -jar '#{jar}' inputs '#{file}'")) workflow_inputs = {} inputs.each do |input,input_type| workflow, task, input_name = input.split(".") workflow_inputs[workflow] ||= {} if input_name.nil? input_name = task else input_name = [task, input_name] * "." end workflow_inputs[workflow][input_name] = input_type end workflow_inputs.each do |workflow,input_list| input_list.each do |input_name,input_type| input_type = input_type.split(" ").last.sub('?','') input_type_fixed = case input_type when "File", "file" :file when "Int" :integer when /Array/ :array else input_type.downcase.to_sym end desc = [workflow, input_name] * "." default = nil input input_name, input_type_fixed, desc, default, :nofile => true end task workflow => :string do |*args| cromwell = file options = {} Misc.in_dir(self.files_dir) do options["metadata-output"] = file('metadata.json') options["inputs"] = file('inputs') cromwell_inputs = {} self.inputs.to_hash.each do |input, value| next if value.nil? key = [workflow.to_s, input] * "." cromwell_inputs[key] = value end Open.write(file('inputs'), cromwell_inputs.to_json ) Cromwell.run_cromwell(cromwell, self.files_dir, options) end Open.read(Dir.glob(File.join(files_dir, "/cromwell-executions/#{workflow}/*/call-*/execution/stdout")).first) end end end
Source
# File lib/rbbt/workflow/doc.rb, line 48
# Parses the workflow documentation once and assigns the task descriptions
# it contains.
#
# Task headings of the form "<Workflow>#<task>" refer to tasks of another
# workflow; headings naming a workflow constant that cannot be resolved are
# skipped. Documentation for unknown tasks is only logged.
def load_documentation
  return if @documentation

  @documentation = Workflow.parse_workflow_doc(documentation_markdown)

  @documentation[:tasks].each do |task_ref, description|
    owner = self
    task_name = task_ref

    if task_ref.include?("#")
      owner_name, task_name = task_ref.split("#")
      begin
        owner = Kernel.const_get(owner_name)
      rescue
        next
      end
    end

    if owner.tasks.include?(task_name.to_sym)
      owner.tasks[task_name.to_sym].description = description
    else
      Log.low "Documentation for #{ task_name }, but not a #{ owner.to_s } task"
    end
  end
end
Source
# File lib/rbbt/workflow/accessor.rb, line 23
# Forwards +status+, +message+ and the block to Step.log, passing nil as the
# third argument, and returns its result.
def log(status, message = nil, &block)
  log_arguments = [status, message, nil]
  Step.log(*log_arguments, &block)
end
Source
# File lib/rbbt/workflow/integration/nextflow.rb, line 184
# Defines a task from a Nextflow pipeline.
#
# +path+ is handled by nextflow_dir when it is a directory, by nextflow_file
# when it is an existing file, and by nextflow_project otherwise. The extra
# arguments are passed along unchanged.
def nextflow(path, *args)
  return nextflow_dir(path, *args) if File.directory?(path)
  return nextflow_file(path, *args) if File.exist?(path)

  nextflow_project(path, *args)
end
Source
# File lib/rbbt/workflow/integration/nextflow.rb, line 173
# Defines a task from the 'main.nf' file inside +path+, named after the
# directory.
def nextflow_dir(path, output = nil)
  nextflow_file(File.join(path, 'main.nf'), File.basename(path), output)
end
Source
# File lib/rbbt/workflow/integration/nextflow.rb, line 72
# Defines a task that runs the Nextflow script +file+.
#
# +name+ defaults to the file name without ".nf" (whitespace replaced by
# "_"). +output+ is the default output location; it may also be given as a
# one-entry Hash { result_type => output }, and may take the place of +name+
# when +name+ is omitted. The result type defaults to :text.
#
# Task inputs come from 'nextflow_schema.json' next to +file+ (when present)
# plus every parameter found by Workflow.nextflow_recursive_params, which is
# declared as an undocumented :string input when the schema does not cover
# it.
#
# The task result is the list of files under the output directory, nil when
# the output is a single file (which is linked as the job result), or the
# list of work files when there is no output.
def nextflow_file(file, name = nil, output = nil)
  name, output = nil, name if Hash === name

  if Hash === output
    result, output = output.collect.first
  else
    result = :text
  end

  dir = Path.setup(File.dirname(file))

  nextflow_schema = dir['nextflow_schema.json']

  description, params = Workflow.parse_nextflow_schema(nextflow_schema) if nextflow_schema.exists?
  # Without a schema file there are no documented parameters.
  params ||= {}

  file = file + '.nf' unless File.exist?(file) || ! File.exist?(file + '.nf')
  file = File.expand_path(file)
  name ||= File.basename(file).sub(/\.nf$/,'').gsub(/\s/,'_')

  Workflow.nextflow_recursive_params(file).each do |param|
    p, _sep, _section = param.partition("-")
    params[p] = {type: :string, description: "Undocumented"} unless params.include?(p)
  end

  desc description
  params.each do |param_name, info|
    # Undocumented parameters carry no :options entry.
    input param_name.to_sym, info[:type], info[:description], nil, (info[:options] || {}).merge(:noload => true)
  end

  default_output = output
  task name => result do
    work = file('work')
    profile = config :profile, :nextflow
    resume = config :resume, :nextflow
    config_file = config :config, :nextflow

    # Per-job copy: assigning the variable captured from the definition
    # would carry one job's output location over to the following jobs.
    job_output = default_output

    nextflow_inputs = {}

    inputs.zip(inputs.fields).each do |v, f|
      v = if String === v && m = v.match(/^JOB_FILE:(.*)/)
            file(m[1])
          elsif v.nil?
            Rbbt::Config.get(['nextflow', f] * "_", 'default', f)
          else
            v
          end

      # "<param>-<section>" inputs are passed to nextflow as
      # "<section>.<param>".
      if f.to_s.include?("-")
        p, _sep, section = f.to_s.partition("-")
        input_name = [section, p] * "."
      else
        input_name = f
      end

      case input_name.to_s
      when 'outdir'
        job_output = nextflow_inputs[input_name] = v || job_output || file('output')
      when 'output'
        job_output = nextflow_inputs[input_name] = v || job_output || self.tmp_path
      else
        nextflow_inputs[input_name] = v
      end
    end

    current_pwd = FileUtils.pwd
    Misc.in_dir file('stage') do

      command = "nextflow "

      command += " -C #{config_file}" if config_file

      command += " run"

      command += " -work-dir #{work} -ansi-log false"

      command += " -profile #{profile}" if profile

      command += " -resume" if resume == 'true'

      # Make the files of the original working directory visible in the
      # stage directory.
      Dir.glob(current_pwd + "/*").each do |staged|
        target = File.basename(staged)
        Open.ln_s staged, target unless File.exist?(target)
      end

      cmd("#{command} #{file}", nextflow_inputs.merge('add_option_dashes' => true))
    end

    if job_output && Open.exists?(job_output)
      if File.directory?(job_output)
        Dir.glob(job_output + "/**/*") * "\n"
      else
        Open.link job_output, self.tmp_path
        nil
      end
    else
      work[File.join("*", "*", "*")].glob * "\n"
    end
  end
end
Source
# File lib/rbbt/workflow/integration/nextflow.rb, line 178
# Pulls +project+ with `nextflow pull` and defines a task from the pulled
# copy under ~/.nextflow/assets.
def nextflow_project(project, *args)
  CMD.cmd_log("nextflow pull #{project}")
  assets_dir = File.join(ENV["HOME"], '.nextflow/assets', project)
  nextflow_dir(assets_dir, *args)
end
Source
# File lib/rbbt/workflow/dependencies.rb, line 58
# Extracts the dependency overrides from +inputs+.
#
# Overrides are the entries whose key is a String of the form
# "<workflow>#<task>"; an empty workflow part stands for this workflow.
# Returns a Hash of workflow name => { task => value }, empty when +inputs+
# is nil.
def override_dependencies(inputs)
  overrides = IndiferentHash.setup({})
  return overrides if inputs.nil?

  inputs.each do |key, value|
    next unless String === key
    match = key.match(/(.*)#(.*)/)
    next unless match

    workflow_name, task_name = match.values_at(1, 2)
    workflow_name = self.to_s if workflow_name.empty?

    overrides[workflow_name] ||= IndiferentHash.setup({})
    overrides[workflow_name][task_name] = value
  end

  overrides
end
Source
# File lib/rbbt/workflow/dependencies.rb, line 151
# Turns the declared +dependencies+ of +task+ into actual jobs.
#
# Each declared dependency may be an Array ([workflow, task, options]), a
# Step, a Symbol (task of this workflow) or a Proc (plain or tagged as
# DependencyBlock). Inputs of the form "<workflow>#<task>" in +inputs+
# replace the matching dependency through setup_override_dependency.
#
# Returns [jobs, overriden]: the flattened, nil-free list of dependencies
# and a flag telling whether any of them was overridden.
def real_dependencies(task, orig_jobname, inputs, dependencies)
  real_dependencies = []
  path_deps = {}

  override_dependencies = override_dependencies(inputs)
  overriden = false
  dependencies.each do |dependency|
    _inputs = IndiferentHash.setup(inputs.dup)
    jobname = orig_jobname
    jobname = _inputs[:jobname] if _inputs.include? :jobname

    real_dep = case dependency
               when Array
                 workflow, dep_task, options = dependency

                 if override_dependencies[workflow.to_s] && value = override_dependencies[workflow.to_s][dep_task]
                   overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                   #overriden = true if (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                   setup_override_dependency(value, workflow, dep_task)
                 else

                   # :canfail is folded into the compute specification.
                   compute = options[:compute] if options
                   if options && options[:canfail]
                     compute = case compute
                               when nil
                                 :canfail
                               when Array
                                 compute + [:canfail]
                               else
                                 [compute, :canfail]
                               end
                   end

                   all_d = (real_dependencies + real_dependencies.flatten.collect{|d| d.rec_dependencies} ).flatten.compact.uniq

                   _inputs = assign_dep_inputs(_inputs, options, all_d, workflow.task_info(dep_task))
                   jobname = _inputs.delete :jobname if _inputs.include? :jobname

                   job = workflow._job(dep_task, jobname, _inputs)
                   ComputeDependency.setup(job, compute) if compute

                   # NOTE(review): `d` is not assigned as a local in this
                   # branch; it is only a block parameter elsewhere in this
                   # method. Confirm what it is meant to refer to here.
                   overriden = true if Symbol === job.overriden? && (d.nil? || ! d[:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden])

                   job
                 end
               when Step
                 job = dependency
                 # NOTE(review): same `d` reference as above — confirm.
                 overriden = true if Symbol === job.overriden? && (d.nil? || ! d[:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden])
                 job
               when Symbol
                 if override_dependencies[self.to_s] && value = override_dependencies[self.to_s][dependency]
                   overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                   #overriden = true if (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                   setup_override_dependency(value, self, dependency)
                 else
                   job = _job(dependency, jobname, _inputs)
                   # NOTE(review): this branch calls `overriden` while the
                   # others call `overriden?` — confirm which is intended.
                   overriden = true if Symbol === job.overriden && (options.nil? || ! options[:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden])
                   job
                 end
               when Proc
                 if DependencyBlock === dependency
                   orig_dep = dependency.dependency
                   wf, task_name, options = orig_dep

                   if override_dependencies[wf.to_s] && value = override_dependencies[wf.to_s][task_name]
                     overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                     #overriden = true if (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                     dep = setup_override_dependency(value, wf, task_name)
                   else

                     options = {} if options.nil?
                     compute = options[:compute]
                     if options[:canfail]
                       compute = case compute
                                 when nil
                                   :canfail
                                 when Array
                                   compute + [:canfail]
                                 else
                                   [compute, :canfail]
                                 end
                     end

                     options = IndiferentHash.setup(options.dup)
                     dep = dependency.call jobname, _inputs.merge(options), real_dependencies

                     dep = [dep] unless Array === dep

                     # Hashes returned by the block are job specifications
                     # and are turned into jobs; other values are kept as
                     # returned.
                     new_=[]
                     dep.each{|d|
                       next if d.nil?
                       if Hash === d
                         d = d.merge(options)
                         d[:workflow] ||= wf
                         d[:task] ||= task_name
                         _override_dependencies = override_dependencies.merge(override_dependencies(d[:inputs] || {}))
                         d = if _override_dependencies[d[:workflow].to_s] && value = _override_dependencies[d[:workflow].to_s][d[:task]]
                               overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                               #overriden = true if (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                               setup_override_dependency(value, d[:workflow], d[:task])
                             else
                               task_info = d[:workflow].task_info(d[:task])

                               _inputs = assign_dep_inputs({}, options.merge(d[:inputs] || {}), real_dependencies, task_info)
                               _jobname = d.include?(:jobname) ? d[:jobname] : jobname
                               job = d[:workflow]._job(d[:task], _jobname, _inputs)
                               overriden = true if Symbol === job.overriden? && (d.nil? || ! d[:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden])
                               job
                             end
                       end
                       ComputeDependency.setup(d, compute) if compute
                       new_ << d
                     }
                     dep = new_
                   end
                 else
                   _inputs = IndiferentHash.setup(_inputs.dup)
                   dep = dependency.call jobname, _inputs, real_dependencies
                   if Hash === dep
                     dep[:workflow] ||= wf || self
                     _override_dependencies = override_dependencies.merge(override_dependencies(dep[:inputs] || {}))
                     # NOTE(review): the value of this if/else (override or
                     # job) is not assigned to `dep`, so the Hash itself is
                     # what gets added below — confirm this is intended.
                     if _override_dependencies[dep[:workflow].to_s] && value = _override_dependencies[dep[:workflow].to_s][dep[:task]]
                       overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                       #overriden = true if (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value)
                       setup_override_dependency(value, dep[:workflow], dep[:task])
                     else
                       task_info = (dep[:task] && dep[:workflow]) ? dep[:workflow].task_info(dep[:task]) : nil
                       _inputs = assign_dep_inputs({}, dep[:inputs], real_dependencies, task_info)
                       _jobname = dep.include?(:jobname) ? dep[:jobname] : jobname

                       job = dep[:workflow]._job(dep[:task], _jobname, _inputs)
                       overriden = true if Symbol === job.overriden? && (d.nil? || ! d[:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden])
                       job
                     end
                   end
                 end

                 dep
               else
                 raise "Dependency for #{task.name} not understood: #{Misc.fingerprint dependency}"
               end

    real_dependencies << real_dep
  end
  [real_dependencies.flatten.compact, overriden]
end
Source
# File lib/rbbt/workflow/dependencies.rb, line 2 def rec_dependencies(taskname, seen = []) @rec_dependencies ||= {} @rec_dependencies[taskname] ||= [] unless task_dependencies.include?(taskname) @rec_dependencies[taskname] ||= begin deps = task_dependencies[taskname] all_deps = [] deps.each do |dep| next if seen.include?(dep) if DependencyBlock === dep all_deps << dep.dependency if dep.dependency else all_deps << dep unless Proc === dep end begin case dep when Array wf, t, o = dep wf.rec_dependencies(t.to_sym, seen + [dep]).each do |d| if Array === d new = d.dup else new = [dep.first, d] end if Hash === o and not o.empty? if Hash === new.last hash = new.last.dup o.each{|k,v| hash[k] ||= v} new[new.length-1] = hash else new.push o.dup end end all_deps << new end if wf && t when String, Symbol rec_deps = rec_dependencies(dep.to_sym, seen + [dep]) all_deps.concat rec_deps when DependencyBlock dep = dep.dependency raise TryAgain end rescue TryAgain retry end end all_deps.uniq end end
Source
# File lib/rbbt/workflow/accessor.rb, line 82
#
# Gathers the input defaults of +taskname+ and of all its recursive
# dependencies into a single hash.
#
# Entries are visited starting with +taskname+ itself; a value gathered earlier
# is never overwritten by a later one. Only inputs listed by
# rec_inputs(taskname) are kept. Dependency entries that are neither a Symbol
# nor a [workflow, task] Array are ignored.
#
# Returns the merged hash after passing it through IndiferentHash.setup.
def rec_input_defaults(taskname)
  allowed = rec_inputs(taskname)
  chain = [taskname].concat(rec_dependencies(taskname))

  defaults = chain.inject(IndiferentHash.setup({})) do |merged, entry|
    external = Array === entry && entry[0] && entry[1]
    next merged unless external || Symbol === entry

    owner = external ? entry.first.tasks[entry[1].to_sym] : tasks[entry.to_sym]

    # Merging in this direction lets what was gathered so far win on clashes.
    merged = owner.input_defaults.merge(merged)
    merged.delete_if { |input, _| !allowed.include?(input) }
    merged
  end

  IndiferentHash.setup(defaults)
  defaults
end
Source
# File lib/rbbt/workflow/accessor.rb, line 143
#
# Gathers the input descriptions of +taskname+ and of all its recursive
# dependencies into a single hash.
#
# Entries are visited starting with +taskname+ itself; a description gathered
# earlier is never overwritten by a later one. Only inputs listed by
# rec_inputs(taskname) are kept. Dependency entries that are neither a Symbol
# nor a [workflow, task] Array are ignored.
#
# Returns the merged hash after passing it through IndiferentHash.setup.
def rec_input_descriptions(taskname)
  allowed = rec_inputs(taskname)
  chain = [taskname].concat(rec_dependencies(taskname))

  descriptions = chain.inject({}) do |merged, entry|
    external = Array === entry && entry[0] && entry[1]
    next merged unless external || Symbol === entry

    owner = external ? entry.first.tasks[entry[1].to_sym] : tasks[entry.to_sym]

    # Merging in this direction lets what was gathered so far win on clashes.
    merged = owner.input_descriptions.merge(merged)
    merged.delete_if { |input, _| !allowed.include?(input) }
    merged
  end

  IndiferentHash.setup(descriptions)
  descriptions
end
Source
# File lib/rbbt/workflow/accessor.rb, line 159
#
# Gathers the input options of +taskname+ and of all its recursive
# dependencies into a single hash.
#
# Entries are visited starting with +taskname+ itself; options gathered earlier
# are never overwritten by later ones. Only inputs listed by
# rec_inputs(taskname) are kept. Dependency entries that are neither a Symbol
# nor a [workflow, task] Array are ignored.
#
# Returns the merged hash after passing it through IndiferentHash.setup.
def rec_input_options(taskname)
  allowed = rec_inputs(taskname)
  chain = [taskname].concat(rec_dependencies(taskname))

  options = chain.inject({}) do |merged, entry|
    external = Array === entry && entry[0] && entry[1]
    next merged unless external || Symbol === entry

    owner = external ? entry.first.tasks[entry[1].to_sym] : tasks[entry.to_sym]

    # Merging in this direction lets what was gathered so far win on clashes.
    merged = owner.input_options.merge(merged)
    merged.delete_if { |input, _| !allowed.include?(input) }
  end

  IndiferentHash.setup(options)
  options
end
Source
# File lib/rbbt/workflow/accessor.rb, line 98
#
# Gathers the input types of +taskname+ and of all its recursive dependencies
# into a single hash.
#
# Entries are visited starting with +taskname+ itself; a type gathered earlier
# is never overwritten by a later one. Only inputs listed by
# rec_inputs(taskname) are kept. Dependency entries that are neither a Symbol
# nor a [workflow, task] Array are ignored.
#
# Returns the merged hash after passing it through IndiferentHash.setup.
def rec_input_types(taskname)
  allowed = rec_inputs(taskname)
  chain = [taskname].concat(rec_dependencies(taskname))

  types = chain.inject({}) do |merged, entry|
    external = Array === entry && entry[0] && entry[1]
    next merged unless external || Symbol === entry

    owner = external ? entry.first.tasks[entry[1].to_sym] : tasks[entry.to_sym]

    # Merging in this direction lets what was gathered so far win on clashes.
    merged = owner.input_types.merge(merged)
    merged.delete_if { |input, _| !allowed.include?(input) }
    merged
  end

  IndiferentHash.setup(types)
  types
end
Source
# File lib/rbbt/workflow/accessor.rb, line 114
#
# Reports which tasks use each input of +taskname+, including the inputs
# contributed by its recursive dependencies.
#
# Returns a Hash of the form { input => { workflow => [task names] } }.
# For the task's own inputs the workflow key is the String form of the task's
# workflow (or of self when the task has none); for dependency inputs the key
# is whatever dep.workflow returns, as reported by Task.dep_inputs.
def rec_input_use(taskname)
  task = task_from_dep(taskname)
  deps = rec_dependencies(taskname)

  usage = {}

  task.inputs.each do |input|
    task_name = task.name
    owner = (task.workflow || self).to_s
    ((usage[input] ||= {})[owner] ||= []) << task_name
  end

  Task.dep_inputs(deps, self).each do |dep, dep_input_names|
    dep_name = dep.name
    dep_owner = dep.workflow
    dep_input_names.each do |input|
      ((usage[input] ||= {})[dep_owner] ||= []) << dep_name
    end
  end

  usage
end
Source
# File lib/rbbt/workflow/accessor.rb, line 75
#
# Lists the inputs of +taskname+ followed by the inputs its recursive
# dependencies contribute (as reported by the task's dep_inputs).
#
# Returns an Array; duplicates are not removed here.
# Raises (through task_from_dep) when +taskname+ cannot be resolved to a task.
def rec_inputs(taskname)
  root_task = task_from_dep(taskname)
  inherited = root_task.dep_inputs(rec_dependencies(taskname), self).values.flatten
  root_task.inputs + inherited
end
Source
# File lib/rbbt/workflow/definition.rb, line 42
#
# Sets the @resumable flag to true.
#
# NOTE(review): presumably picked up by the next +task+ definition through
# consume_resumable -- confirm against the annotation helpers.
#
# Returns true.
def resumable
  @resumable = true
end
Source
# File lib/rbbt/workflow/definition.rb, line 46
#
# Stores +description+ in @result_description.
#
# NOTE(review): presumably picked up by the next +task+ definition through
# consume_result_description -- confirm against the annotation helpers.
#
# Returns the description that was given.
def returns(description)
  @result_description = description
end
Source
# File lib/rbbt/workflow/dependencies.rb, line 72
#
# Prepares +dep+ so that it can stand in for the dependency +task_name+ of
# +workflow+.
#
# dep       - a Step, or anything Workflow.load_step accepts; :skip / 'skip'
#             short-circuits the whole method.
# workflow  - the workflow the step is re-assigned to; when it is a String it
#             is resolved with Kernel.const_get before looking up the task.
# task_name - the task name the step is re-assigned to.
#
# Returns [] for a skipped dependency; otherwise the step, re-assigned to
# +workflow+ / +task_name+ and extended with +step_module+.
def setup_override_dependency(dep, workflow, task_name)
  return [] if dep == :skip || dep == 'skip'

  # Has to be evaluated on the value as given, before it is turned into a Step.
  unlocated = unlocated_override?(dep)
  dep = Workflow.load_step(dep) unless Step === dep

  # Remember where the step originally came from; the first source that
  # provides a value wins.
  dep.original_workflow ||= dep.workflow if dep.workflow
  dep.original_task_name ||= dep.task_name if dep.task_name
  dep.original_task_name ||= dep.path.split("/")[-3]
  dep.original_task_name ||= dep.path.split("/")[-2]

  dep.workflow = workflow
  dep.info[:name] = dep.name

  # Best effort: a workflow that cannot be resolved is logged, not fatal.
  begin
    workflow = Kernel.const_get workflow if String === workflow
    dep.task = workflow.tasks[task_name] if dep.task.nil? && workflow.tasks.include?(task_name)
  rescue
    Log.exception $!
  end

  dep.task_name = task_name

  # The step counts as overriden when it comes from a different task or when
  # the override was located. The original name is required either way:
  # without the explicit grouping a located override with no original name
  # ended up calling to_sym on nil.
  original_name = dep.original_task_name
  if original_name && (original_name.to_s != task_name.to_s || !unlocated)
    dep.overriden = original_name.to_sym
  end

  dep.extend step_module
  dep
end
Source
# File lib/rbbt/workflow/definition.rb, line 134
#
# Defines a task called +name+ and registers it in +tasks+, together with its
# pending dependencies in +task_dependencies+.
#
# name  - the task name, or a single-pair Hash { name => result_type }.
# block - the task body; when omitted, the method of this object with the same
#         name is used instead.
#
# All the pending declarations (inputs, description, extension, ...) are read
# through the consume_* helpers. When no extension was declared, one is derived
# from the result type for tsv, yaml, marshal and json.
#
# Returns the object produced by Task.setup.
def task(name, &block)
  if Hash === name
    name, type = name.first
  else
    # NOTE(review): this value is stored in a local that is never read, so a
    # task declared without a Hash ends up with a nil :result_type below.
    # It looks like +type+ was meant -- confirm before changing, since that
    # would also alter the derived extension of existing tasks.
    result_type = consume_result_type || :marshal
  end

  name = name.to_sym
  block = self.method(name) unless block_given?

  task_info = {
    :name => name,
    :inputs => consume_inputs,
    :description => consume_description,
    :input_types => consume_input_types,
    :result_type => (String === type ? type.to_sym : type),
    :result_description => consume_result_description,
    :input_defaults => consume_input_defaults,
    :input_descriptions => consume_input_descriptions,
    :required_inputs => consume_required_inputs,
    :extension => consume_extension,
    :resumable => consume_resumable,
    :input_options => consume_input_options
  }

  # Only fill in the extension when none was declared explicitly.
  if task_info[:extension].nil?
    task_info[:extension] =
      case task_info[:result_type].to_s
      when "tsv" then "tsv"
      when "yaml" then "yaml"
      when "marshal" then "marshal"
      when "json" then "json"
      end
  end

  task = Task.setup(task_info, &block)

  # NOTE(review): without an explicit receiver this assigns a local variable
  # and has no effect outside this method -- confirm whether a last_task=
  # writer was intended.
  last_task = task

  tasks[name] = task
  task_dependencies[name] = consume_dependencies

  task
end
Source
# File lib/rbbt/workflow/accessor.rb, line 229
#
# Lists every exported task name, whatever its export kind (exec, synchronous,
# asynchronous or stream).
#
# Export lists that are nil are ignored. Returns a flat Array without
# duplicates.
def task_exports
  export_lists = [exec_exports, synchronous_exports, asynchronous_exports, stream_exports]
  export_lists.compact.flatten.uniq
end
Source
# File lib/rbbt/workflow/accessor.rb, line 212
#
# Works out which task a job file belongs to from its location.
#
# path - path of the job file.
#
# The directory of +path+ is passed, with the expanded workdir, to
# Misc.path_relative_to, and everything from the first "/" of the result
# onwards is dropped.
# NOTE(review): presumably Misc.path_relative_to returns the path relative to
# workdir, so the leading component is the task name -- confirm.
#
# Returns that leading component, or nil when it cannot be computed (any
# error raised while doing so is swallowed).
def task_for(path)
  base = workdir.respond_to?(:find) ? workdir.find : workdir
  base = File.expand_path(base)
  job_dir = File.dirname(File.expand_path(path))

  begin
    relative = Misc.path_relative_to(base, job_dir)
    relative.sub(/([^\/]+)\/.*/,'\1')
  rescue
    nil
  end
end
Source
# File lib/rbbt/workflow/accessor.rb, line 176
#
# Resolves a dependency specification to its task object.
#
# dep - a task name (String or Symbol) looked up in this workflow's +tasks+,
#       or an Array whose first element is a workflow and whose second element
#       is a task name looked up in that workflow's +tasks+.
#
# Task names are looked up as Symbols in both cases, so [workflow, "name"]
# resolves the same way as [workflow, :name].
#
# Returns the task.
# Raises RuntimeError ("Unknown dependency: ...") when nothing is found or
# +dep+ is of an unsupported kind.
def task_from_dep(dep)
  task = case dep
         when Array
           workflow, name = dep
           # A missing name is left alone so that it fails the lookup below
           # and produces the regular error.
           name = name.to_sym if name.respond_to?(:to_sym)
           workflow.tasks[name]
         when String, Symbol
           tasks[dep.to_sym]
         end

  raise "Unknown dependency: #{Misc.fingerprint dep}" if task.nil?
  task
end
Source
# File lib/rbbt/workflow/accessor.rb, line 27
#
# Builds a summary Hash describing the task +name+ of this workflow.
#
# name - task name (String or Symbol).
#
# The summary is memoized in @task_info under "<workflow>/<task>". Its keys
# are :id, :description, :export, :inputs, :input_types, :input_descriptions,
# :input_defaults, :input_options, :input_use, :result_type,
# :result_description, :dependencies and :extension.
#
# :export is the first of :synchronous, :asynchronous, :exec and :stream whose
# export list contains the name (as Symbol or String), or :none. An export
# list that is nil is treated as empty, as task_exports and unexport do.
# :dependencies only lists the dependencies given as plain task names.
#
# Raises RuntimeError when the workflow has no such task.
def task_info(name)
  name = name.to_sym
  task = tasks[name]
  raise "No '#{name}' task in '#{self.to_s}' Workflow" if task.nil?

  id = File.join(self.to_s, name.to_s)
  @task_info ||= {}
  @task_info[id] ||= begin
    # Order matters: it is the precedence between export kinds.
    export_lists = [
      [:synchronous, synchronous_exports],
      [:asynchronous, asynchronous_exports],
      [:exec, exec_exports],
      [:stream, stream_exports]
    ]
    export, _names = export_lists.find do |_kind, names|
      names && (names.include?(name) || names.include?(name.to_s))
    end
    export ||= :none

    dependencies = task_dependencies[name].select { |dep| String === dep || Symbol === dep }

    {
      :id => id,
      :description => task.description,
      :export => export,
      :inputs => rec_inputs(name).uniq,
      :input_types => rec_input_types(name),
      :input_descriptions => rec_input_descriptions(name),
      :input_defaults => rec_input_defaults(name),
      :input_options => rec_input_options(name),
      :input_use => rec_input_use(name),
      :result_type => task.result_type,
      :result_description => task.result_description,
      :dependencies => dependencies,
      :extension => task.extension
    }
  end
end
Source
# File lib/rbbt/workflow/definition.rb, line 184
#
# Removes the given task names from every export list (exec, synchronous,
# asynchronous and stream).
#
# names - task names; each one is removed both in its String and in its Symbol
#         form.
#
# The lists are modified in place; lists that are nil are left alone.
# Returns the updated stream export list, or nil when there is none.
def unexport(*names)
  names = names.flat_map { |n| [n.to_s, n.to_sym] }.uniq

  readers = [:exec_exports, :synchronous_exports, :asynchronous_exports, :stream_exports]
  outcomes = readers.map do |reader|
    list = send(reader)
    list.replace(list - names) if list
  end

  outcomes.last
end
Source
# File lib/rbbt/workflow/dependencies.rb, line 102
#
# Tells whether an override given as a path points to nothing.
#
# dep - the override value.
#
# Returns true only when +dep+ is a String and Open.exists? finds neither it
# nor the same path with '.info' appended; false otherwise (in particular for
# anything that is not a String).
def unlocated_override?(dep)
  return false unless String === dep

  !Open.exists?(dep) && !Open.exists?(dep + '.info')
end