module HPC::Orchestration
Public Class Methods
Source
# File lib/rbbt/hpc/orchestrate/rules.rb, line 34
def self.accumulate_rules(current, new)
  return IndiferentHash.setup({}) if (new.nil? || new.empty?) && (current.nil? || current.empty?)
  return IndiferentHash.setup(current.dup) if new.nil? || new.empty?
  return IndiferentHash.setup(new.dup) if current.nil? || current.empty?

  target = IndiferentHash.setup(current.dup)
  new.each do |k,value|
    case k.to_s
    when "config_keys"
      target[k] = add_config_keys target["config_keys"], value
    when "cpus"
      target[k] = [target[k], value].compact.sort_by{|v| v.to_i}.last
    when "time"
      target[k] = Misc.format_seconds [target[k], value].compact.inject(0){|acc,t| acc += Misc.timespan t }
    when "skip"
      skip = target[k] && value
      target.delete k unless skip
    else
      next if target.include?(k)
      target[k] = value
    end
  end
  target
end
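A minimal usage sketch (the rule hashes below are hypothetical): "cpus" keeps the larger value, "config_keys" are concatenated and de-duplicated through add_config_keys, "time" values are summed via Misc.timespan and reformatted, and any other key keeps the value already accumulated.

current = IndiferentHash.setup("cpus" => 4, "queue" => "short", "config_keys" => "a,b")
new     = IndiferentHash.setup("cpus" => 8, "queue" => "long",  "config_keys" => "b,c")

acc = HPC::Orchestration.accumulate_rules(current, new)
acc["cpus"]        #=> 8        (largest value wins)
acc["queue"]       #=> "short"  (keys other than cpus/time/config_keys/skip keep the current value)
acc["config_keys"] #=> "c,a,b"  (concatenated, duplicates keep their last occurrence)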
Source
# File lib/rbbt/hpc/orchestrate/batches.rb, line 78
def self.add_batch_deps(batches)
  batches.each do |batch|
    jobs = batch[:jobs]
    all_deps = jobs.collect{|d| job_dependencies(d) }.flatten.uniq - jobs
    minimum = all_deps
    all_deps.each do |dep|
      minimum -= job_dependencies(dep)
    end
    all_deps = minimum
    deps = all_deps.collect do |d|
      (batches - [batch]).select{|batch| batch[:jobs].collect(&:path).include? d.path }
    end.flatten.uniq
    batch[:deps] = deps
  end
  batches
end
Source
# File lib/rbbt/hpc/orchestrate/rules.rb, line 3
def self.add_config_keys(current, new)
  if current.nil?
    new
  else
    new + ',' + current
  end.gsub(/,\s*/,',').split(",").reverse.uniq.reverse * ","
end
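For illustration (hypothetical key lists): whitespace after commas is stripped and duplicate entries collapse, keeping the last occurrence of each key.

HPC::Orchestration.add_config_keys("a,b", "b, c") #=> "c,a,b"
HPC::Orchestration.add_config_keys(nil, "x, y")   #=> "x,y"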
Source
# File lib/rbbt/hpc/orchestrate/batches.rb, line 99
def self.add_rules_and_consolidate(rules, batches)
  chain_rules = parse_chains(rules)

  batches.each do |batch|
    job_rules = batch[:jobs].inject(nil) do |acc,job|
      workflow = job.workflow
      task_name = job.task_name
      task_rules = task_specific_rules(rules, workflow, task_name)
      acc = accumulate_rules(acc, task_rules.dup)
    end

    if chain = batch[:chain]
      batch[:rules] = merge_rules(chain_rules[chain][:rules].dup, job_rules)
    else
      batch[:rules] = job_rules
    end
  end

  begin
    batches.each do |batch|
      batch[:deps] = batch[:deps].collect do |dep|
        dep[:target] || dep
      end if batch[:deps]
    end

    batches.each do |batch|
      next if batch[:top_level].overriden?
      next unless batch[:rules][:skip]
      batch[:rules].delete :skip
      next if batch[:deps].nil?

      if batch[:deps].any?
        batch_dep_jobs = batch[:top_level].rec_dependencies
        target = batch[:deps].select do |target|
          batch_dep_jobs.include?(target[:top_level]) &&
            # Don't piggyback batches that are an input dependency, only real dependencies
            (batch[:deps] - [target] - target[:deps]).empty?
        end.first
        next if target.nil?
        target[:jobs] = batch[:jobs] + target[:jobs]
        target[:deps] = (target[:deps] + batch[:deps]).uniq - [target]
        target[:top_level] = batch[:top_level]
        target[:rules] = accumulate_rules(target[:rules], batch[:rules])
        batch[:target] = target
      end
      raise TryAgain
    end
  rescue TryAgain
    retry
  end

  batches.delete_if{|b| b[:target] }
  batches
end
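After this step each surviving batch is a plain hash. The sketch below (job and batch objects are hypothetical) summarizes the keys that the code above and its callers read and write.

batch = {
  :jobs      => [job_a, job_b],   # jobs submitted together in one batch
  :top_level => job_a,            # job whose dependency tree anchors the batch
  :chain     => "my_chain",       # present only when the batch was built from a chain rule
  :rules     => { "cpus" => 8 },  # accumulated/merged rules for the whole batch
  :deps      => [other_batch]     # batches that must run before this one
}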
Source
# File lib/rbbt/hpc/orchestrate/batches.rb, line 45
def self.chain_batches(rules, chains, workload)
  chain_rules = parse_chains(rules)

  batches = []
  while job = workload.pop
    matches = chains.select{|name,info| info[:jobs].include? job }
    if matches.any?
      name, info = matches.sort_by do |name,info|
        num_jobs = info[:jobs].length
        total_tasks = chain_rules[name][:tasks].values.flatten.uniq.length
        num_jobs.to_f + 1/total_tasks
      end.last
      workload = workload - info[:jobs]
      info[:chain] = name
      batch = info
    else
      batch = {:jobs => [job], :top_level => job}
    end

    chains.delete_if{|name,info| batch[:jobs].include? info[:top_level] }

    chains.each do |name,info|
      info[:jobs] = info[:jobs] - batch[:jobs]
    end

    chains.delete_if{|name,info| info[:jobs].length < 2 }

    batches << batch
  end
  batches
end
Source
# File lib/rbbt/hpc/orchestrate/chains.rb, line 3
def self.check_chains(chains, job)
  return [] if Symbol === job.overriden_task
  matches = []
  chains.each do |name, chain|
    workflow = job.overriden_workflow || job.workflow
    task_name = job.overriden_workflow || job.task_name
    next unless chain[:tasks].include?(workflow.to_s)
    next unless chain[:tasks][workflow.to_s].include?(task_name.to_s)
    matches << name
  end
  matches
end
Source
# File lib/rbbt/hpc/orchestrate/batches.rb, line 154
def self.job_batches(rules, job)
  job_chains = self.job_chains(rules, job).dup

  workload = job_workload(job).uniq

  batches = chain_batches(rules, job_chains, workload)
  batches = add_batch_deps(batches)
  batches = add_rules_and_consolidate(rules, batches)
end
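A hedged end-to-end sketch: job stands for any unfinished rbbt workflow job (Step), and the rules hash comes from orchestration_rules below.

rules   = HPC::Orchestration.orchestration_rules     # etc/batch/default.yaml, if present
batches = HPC::Orchestration.job_batches(rules, job)
batches.each do |batch|
  batch[:jobs]   # jobs grouped into this submission
  batch[:rules]  # resolved resource rules for the batch
  batch[:deps]   # batches this one depends on
end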
Source
# File lib/rbbt/hpc/orchestrate/chains.rb, line 62
def self.job_chains(rules, job, computed = {})
  computed[Misc.fingerprint([rules, job.path, job.object_id])] ||= begin
    chains = self.parse_chains(rules)
    matches = check_chains(chains, job)

    dependencies = job_dependencies(job)

    job_chains = []
    new_job_chains = {}
    dependencies.each do |dep|
      dep_matches = check_chains(chains, dep)
      common = matches & dep_matches

      dep_chains = job_chains(rules, dep, computed)

      found = []
      dep_chains.each do |match,info|
        if common.include?(match)
          found << match
          new_info = new_job_chains[match] ||= {}
          new_info[:jobs] ||= []
          new_info[:jobs].concat info[:jobs]
          new_info[:top_level] = job
        else
          job_chains << [match, info]
        end
      end

      (common - found).each do |match|
        info = {}
        info[:jobs] = [job, dep]
        info[:top_level] = job
        job_chains << [match, info]
      end
    end

    new_job_chains.each do |match,info|
      info[:jobs].prepend job
      job_chains << [match, info]
    end

    job_chains
  end
end
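As read from the code above, the return value is an array of [chain_name, info] pairs; a usage sketch:

HPC::Orchestration.job_chains(rules, job).each do |name, info|
  name              # chain name as defined in the rules
  info[:jobs]       # jobs covered by this chain
  info[:top_level]  # job from which the chain was detected
end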
Source
# File lib/rbbt/hpc/orchestrate/chains.rb, line 58
def self.job_dependencies(job)
  (job.dependencies + job.input_dependencies).uniq.select{|d| ! d.done? || d.dirty? }
end
Source
# File lib/rbbt/hpc/orchestrate/rules.rb, line 68
def self.job_rules(rules, job, force = false)
  return {} if (job.done? or job.error?) && ! force
  job_rules = task_specific_rules(rules, job.workflow.to_s, job.task_name.to_s)
  job.dependencies.each do |dep|
    job_rules = accumulate_rules(job_rules, job_rules(rules, dep))
  end
  job_rules
end
Source
# File lib/rbbt/hpc/orchestrate/batches.rb, line 19
def self.job_workload(job)
  workload = []
  path_jobs = {}

  path_jobs[job.path] = job

  heap = []
  heap << job.path
  while job_path = heap.pop
    job = path_jobs[job_path]
    next if job.done?
    workload << job

    deps = job_dependencies(job)

    deps.each do |d|
      path_jobs[d.path] ||= d
    end

    heap.concat deps.collect(&:path)
    heap.uniq!
  end
  workload.uniq
end
Source
# File lib/rbbt/hpc/orchestrate/rules.rb, line 17
def self.merge_rules(current, new)
  return IndiferentHash.setup({}) if (new.nil? || new.empty?) && (current.nil? || current.empty?)
  return IndiferentHash.setup(current.dup) if new.nil? || new.empty?
  return IndiferentHash.setup(new.dup) if current.nil? || current.empty?
  target = IndiferentHash.setup(current.dup)
  new.each do |k,value|
    case k.to_s
    when "config_keys"
      target[k] = add_config_keys target["config_keys"], value
    else
      next if target.include?(k)
      target[k] = value
    end
  end
  target
end
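Unlike accumulate_rules, merge_rules never combines numeric resources: apart from "config_keys", keys already present in current always win. A small hypothetical example:

current = IndiferentHash.setup("cpus" => 4, "config_keys" => "a")
new     = IndiferentHash.setup("cpus" => 8, "time" => "2h", "config_keys" => "b")

HPC::Orchestration.merge_rules(current, new)
#=> "cpus" stays 4, "time" is added as "2h", "config_keys" becomes "b,a"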
Source
# File lib/rbbt/hpc/orchestrate.rb, line 25
def self.orchestration_rules(orchestration_rules_file = nil)
  rules = {}
  if orchestration_rules_file
    if Open.exists?(orchestration_rules_file)
      rules = Misc.load_yaml(orchestration_rules_file)
    elsif Rbbt.etc.batch[orchestration_rules_file].exists?
      rules = Misc.load_yaml(Rbbt.etc.batch[orchestration_rules_file])
    elsif Rbbt.etc.batch[orchestration_rules_file + '.yaml'].exists?
      rules = Misc.load_yaml(Rbbt.etc.batch[orchestration_rules_file + '.yaml'])
    else
      raise "Orchestration rules file not found: #{orchestration_rules_file}"
    end
  elsif Rbbt.etc.batch["default.yaml"].exists?
    rules = Misc.load_yaml(Rbbt.etc.batch["default.yaml"])
  end

  IndiferentHash.setup(rules)
end
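Hedged usage sketch of the lookup order shown above: an explicit file path is tried first, then etc/batch/<name> and etc/batch/<name>.yaml under the Rbbt resources, and with no argument etc/batch/default.yaml is used when present. The profile name below is hypothetical.

rules = HPC::Orchestration.orchestration_rules                # etc/batch/default.yaml, if it exists
rules = HPC::Orchestration.orchestration_rules("production")  # file path, etc/batch/production, or etc/batch/production.yaml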
Source
# File lib/rbbt/hpc/orchestrate/chains.rb, line 16
def self.parse_chains(rules)
  chains = IndiferentHash.setup({})
  rules.each do |workflow,rules|
    next unless rules["chains"]
    rules["chains"].each do |name,rules|
      rules = IndiferentHash.setup(rules.dup)
      chain_tasks = rules.delete(:tasks).split(/,\s*/)
      workflow = rules.delete(:workflow) if rules.include?(:workflow)

      chain_tasks.each do |task|
        chain_workflow, chain_task = task.split("#")
        chain_task, chain_workflow = chain_workflow, workflow if chain_task.nil? or chain_tasks.empty?

        chains[name] ||= IndiferentHash.setup({:tasks => {}, :rules => rules })
        chains[name][:tasks][chain_workflow] ||= []
        chains[name][:tasks][chain_workflow] << chain_task
      end
    end
  end

  return chains if rules["chains"].nil?

  rules["chains"].each do |name,rules|
    rules = IndiferentHash.setup(rules.dup)
    chain_tasks = rules.delete(:tasks).split(/,\s*/)
    workflow = rules.delete(:workflow)

    chain_tasks.each do |task|
      chain_workflow, chain_task = task.split("#")
      chain_task, chain_workflow = chain_workflow, workflow if chain_task.nil? or chain_tasks.empty?

      chains[name] ||= IndiferentHash.setup({:tasks => {}, :rules => rules })
      chains[name][:tasks][chain_workflow] ||= []
      chains[name][:tasks][chain_workflow] << chain_task
    end
  end

  chains
end
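A hedged sketch of a chain definition nested under a workflow entry (workflow, task, and chain names are hypothetical) and the structure parse_chains builds from it:

rules = IndiferentHash.setup(
  "MyWorkflow" => {
    "chains" => {
      "my_chain" => {
        "tasks" => "MyWorkflow#task_a, MyWorkflow#task_b",
        "cpus"  => 4
      }
    }
  })

chains = HPC::Orchestration.parse_chains(rules)
chains["my_chain"][:tasks] #=> {"MyWorkflow" => ["task_a", "task_b"]}
chains["my_chain"][:rules] #=> {"cpus" => 4}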
Source
# File lib/rbbt/hpc/orchestrate/batches.rb, line 7
def self.pb(batch)
  if Array === batch
    iii :BATCHES
    batch.each{|b| pb b}
    iii :END_BATCHES
  else
    n = batch.dup
    n[:deps] = n[:deps].collect{|b| b[:top_level] } if n[:deps]
    iif n
  end
end
Source
# File lib/rbbt/hpc/orchestrate/rules.rb, line 58
def self.task_specific_rules(rules, workflow, task)
  defaults = rules[:defaults] || {}
  workflow = workflow.to_s
  task = task.to_s
  return defaults if rules[workflow].nil?
  workflow_rules = merge_rules(workflow_rules(rules, workflow), defaults)
  return IndiferentHash.setup(workflow_rules.dup) if rules[workflow][task].nil?
  merge_rules(rules[workflow][task], workflow_rules)
end
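Precedence, illustrated with a hypothetical rules hash: task-level rules override the workflow "defaults", which in turn override the global "defaults".

rules = IndiferentHash.setup(
  "defaults"   => { "time" => "1h" },
  "MyWorkflow" => {
    "defaults" => { "cpus" => 2 },
    "my_task"  => { "cpus" => 8 }
  })

HPC::Orchestration.task_specific_rules(rules, "MyWorkflow", "my_task")
#=> {"cpus" => 8, "time" => "1h"}
HPC::Orchestration.task_specific_rules(rules, "MyWorkflow", "other_task")
#=> {"cpus" => 2, "time" => "1h"}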
Source
# File lib/rbbt/hpc/orchestrate/rules.rb, line 11
def self.workflow_rules(rules, workflow)
  return {} if rules[workflow].nil?
  return {} if rules[workflow]["defaults"].nil?
  IndiferentHash.setup(rules[workflow]["defaults"])
end
Public Instance Methods
Source
# File lib/rbbt/hpc/orchestrate.old.rb, line 74
def get_chains(job, rules, chains = {})
  job_rules = self.job_rules(rules, job)
  job_deps = get_job_dependencies(job)

  input_deps = []
  job.rec_dependencies.each do |dep|
    input_deps.concat dep.input_dependencies
  end

  job_deps.each do |dep|
    input_deps.concat dep.input_dependencies
    get_chains(dep, rules, chains)
  end

  job_deps.select do |dep|
    chained = job_rules["chain_tasks"] &&
      job_rules["chain_tasks"][job.workflow.to_s] &&
      job_rules["chain_tasks"][job.workflow.to_s].include?(job.task_name.to_s) &&
      job_rules["chain_tasks"][dep.workflow.to_s] &&
      job_rules["chain_tasks"][dep.workflow.to_s].include?(dep.task_name.to_s)

    dep_skip = dep.done? && ! input_deps.include?(dep) && self.job_rules(rules, dep)["skip"]

    chained || dep_skip
  end.each do |dep|
    chains[job] ||= []
    chains[job] << dep
    chains[job].concat chains[dep] if chains[dep]
    chains[job].uniq!
  end

  chains
end
Source
# File lib/rbbt/hpc/orchestrate.old.rb, line 55
def get_job_dependencies(job, job_rules = nil)
  deps = job.dependencies || []
  deps += job.input_dependencies || []
  deps
end
Source
# File lib/rbbt/hpc/orchestrate.old.rb, line 61
def get_recursive_job_dependencies(job)
  deps = get_job_dependencies(job)
  (deps + deps.collect{|dep| get_recursive_job_dependencies(dep) }).flatten
end
Source
# File lib/rbbt/hpc/orchestrate.old.rb, line 4
def job_rules(rules, job)
  workflow = job.workflow.to_s
  task_name = job.task_name.to_s
  task_name = job.overriden.to_s if Symbol === job.overriden
  defaults = rules["defaults"] || {}
  defaults = defaults.merge(rules[workflow]["defaults"] || {}) if rules[workflow]
  job_rules = IndiferentHash.setup(defaults.dup)

  rules["chains"].each do |name,info|
    IndiferentHash.setup(info)
    chain_tasks = info[:tasks].split(/,\s*/)

    chain_tasks.each do |task|
      task_workflow, chain_task = task.split("#")
      chain_task, task_workflow = task_workflow, info[:workflow] if chain_task.nil? or chain_tasks.empty?
      job_rules["chain_tasks"] ||= {}
      job_rules["chain_tasks"][task_workflow] ||= []
      job_rules["chain_tasks"][task_workflow] << chain_task
      next unless task_name == chain_task.to_s && workflow == task_workflow.to_s
      config_keys = job_rules.delete :config_keys
      job_rules = IndiferentHash.setup(job_rules.merge(info))
      if config_keys
        config_keys.gsub!(/,\s+/,',')
        job_rules[:config_keys] = job_rules[:config_keys] ? config_keys + "," + job_rules[:config_keys] : config_keys
      end
    end

    if job_rules["chain_tasks"][workflow] && job_rules["chain_tasks"][workflow].include?(task_name)
      break
    else
      job_rules.delete "chain_tasks"
    end
  end if rules["chains"]

  config_keys = job_rules.delete :config_keys
  job_rules = IndiferentHash.setup(job_rules.merge(rules[workflow][task_name])) if rules[workflow] && rules[workflow][task_name]

  if config_keys
    config_keys.gsub!(/,\s+/,',')
    job_rules[:config_keys] = job_rules[:config_keys] ? config_keys + "," + job_rules[:config_keys] : config_keys
  end

  if rules["skip"] && rules["skip"][workflow]
    job_rules["skip"] = true if rules["skip"][workflow].split(/,\s*/).include? task_name
  end

  job_rules
end
Source
# File lib/rbbt/hpc/orchestrate.old.rb, line 200
def orchestrate_job(job, options)
  options.delete "recursive_clean"
  options.delete "clean_task"
  options.delete "clean"
  options.delete "tail"
  options.delete "printpath"
  options.delete "detach"
  options.delete "jobname"

  rules = Misc.load_yaml(options[:orchestration_rules]) if options[:orchestration_rules]
  rules ||= {}
  IndiferentHash.setup(rules)

  chains = get_chains(job, rules)

  workload(job, rules, chains, options)
end
Source
# File lib/rbbt/hpc/orchestrate.old.rb, line 66
def piggyback(job, job_rules, job_deps)
  return false unless job_rules["skip"]
  final_deps = job_deps - job_deps.collect{|dep| get_recursive_job_dependencies(dep)}.flatten.uniq
  final_deps = final_deps.reject{|dep| dep.done? }
  return final_deps.first if final_deps.length == 1
  return false
end
Source
# File lib/rbbt/hpc/orchestrate.rb, line 8
def prepare_for_execution(job)
  rec_dependencies = job.rec_dependencies(true)

  return if rec_dependencies.empty?

  all_deps = rec_dependencies + [job]

  all_deps.each do |dep|
    begin
      dep.clean if (dep.error? && dep.recoverable_error?) || dep.aborted? || (dep.done? && dep.updated?)
    rescue RbbtException
      next
    end
  end
end
Source
# File lib/rbbt/hpc/orchestrate.rb, line 114
def produce_jobs(jobs, options = {})
  jobs.each do |job|
    self.orchestrate_job(job, options)
  end
  Step.wait_for_jobs jobs
end
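Hedged usage sketch: batch_system stands for whatever object extends HPC::Orchestration (for instance one of the HPC batch modules), and jobs is assumed to be an array of unstarted workflow jobs.

options = IndiferentHash.setup(:config_keys => "key1,key2")  # hypothetical batch options
batch_system.produce_jobs(jobs, options)                     # orchestrates each job, then waits for all of them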
Source
# File lib/rbbt/hpc/orchestrate.old.rb, line 105
def workload(job, rules, chains, options, seen = nil)
  return [] if job.done?
  if seen.nil?
    seen = {}
    target_job = true
  end

  job_rules = self.job_rules(rules, job)
  job_deps = get_job_dependencies(job)

  chain = chains[job]
  chain = chain.reject{|j| seen.include? j.path} if chain
  chain = chain.reject{|dep| dep.done? } if chain
  piggyback = piggyback(job, job_rules, job_deps)

  dep_ids = job_deps.collect do |dep|
    seen[dep.path] ||= nil if chain && chain.include?(dep) #&& ! job.input_dependencies.include?(dep)
    next_options = IndiferentHash.setup(options.dup)
    if piggyback and piggyback == dep
      next_options[:piggyback] ||= []
      next_options[:piggyback].push job
      ids = workload(dep, rules, chains, next_options, seen)
    else
      next_options.delete :piggyback
      ids = workload(dep, rules, chains, next_options, seen)
    end

    ids = [ids].flatten.compact.collect{|id| ['canfail', id] * ":"} if job.canfail_paths.include? dep.path

    seen[dep.path] = ids
    ids
  end.compact.flatten.uniq

  return seen[job.path] || dep_ids if seen.include?(job.path)

  if piggyback and seen[piggyback.path]
    return seen[job.path] = seen[piggyback.path]
  end

  job_rules.delete :chain_tasks
  job_rules.delete :tasks
  job_rules.delete :workflow

  option_config_keys = options[:config_keys]

  job_options = IndiferentHash.setup(options.merge(job_rules).merge(:batch_dependencies => dep_ids))
  job_options.delete :orchestration_rules

  config_keys = job_rules.delete(:config_keys)
  if config_keys
    config_keys.gsub!(/,\s+/,',')
    job_options[:config_keys] = job_options[:config_keys] ? config_keys + "," + job_options[:config_keys] : config_keys
  end

  if option_config_keys
    option_config_keys = option_config_keys.gsub(/,\s+/,',')
    job_options[:config_keys] = job_options[:config_keys] ? job_options[:config_keys] + "," + option_config_keys : option_config_keys
  end

  if options[:piggyback]
    manifest = options[:piggyback].uniq
    manifest += [job]
    manifest.concat chain if chain

    job = options[:piggyback].first

    job_rules = self.job_rules(rules, job)
    new_config_keys = self.job_rules(rules, job)[:config_keys]
    if new_config_keys
      new_config_keys = new_config_keys.gsub(/,\s+/,',')
      job_options[:config_keys] = job_options[:config_keys] ? job_options[:config_keys] + "," + new_config_keys : new_config_keys
    end

    job_options.delete :piggyback
  else
    manifest = [job]
    manifest.concat chain if chain
  end

  manifest.uniq!

  job_options[:manifest] = manifest.collect{|j| j.task_signature }

  job_options[:config_keys] = job_options[:config_keys].split(",").uniq * "," if job_options[:config_keys]

  if options[:dry_run]
    puts Log.color(:magenta, "Manifest: ") + Log.color(:blue, job_options[:manifest] * ", ") + " - tasks: #{job_options[:task_cpus] || 1} - time: #{job_options[:time]} - config: #{job_options[:config_keys]}"
    puts Log.color(:yellow, "Deps: ") + Log.color(:blue, job_options[:batch_dependencies]*", ")
    job_options[:manifest].first
  else
    run_job(job, job_options)
  end
end