module Rbbt
Constants
- Config
- JOB_DIRS
- LOCK_DIRS
-
LOCK_DIRS
= Rbbt.share.find_all + Rbbt.var.cache.persistence.find_all + Rbbt.var.jobs.find_all +Rbbt.tmp.tsv_open_locks.find_all + Rbbt.tmp.persist_locks.find_all + Rbbt.tmp.sensiblewrite_lock_dir.find_all + Rbbt.tmp.produce_locks.find_all + Rbbt.tmp.step_info_locks.find_all
- MUTEX_FOR_THREAD_EXCLUSIVE
- PERSIST_DIRS
- SENSIBLE_WRITE_DIRS
- VERSIONS
Public Class Methods
Source
# File lib/rbbt/monitor.rb, line 238 def self.__jobs(dirs = JOB_DIRS) job_files = {} dirs.each do |dir| workflow_dirs = dir.glob("*").each do |wdir| workflow = File.basename(wdir) job_files[workflow] = {} task_dirs = wdir.glob('*') task_dirs.each do |tdir| task = File.basename(tdir) job_files[workflow][task] = tdir.glob('*') end end end jobs = {} job_files.each do |workflow,task_jobs| jobs[workflow] ||= {} task_jobs.each do |task, files| jobs[workflow][task] ||= {} files.each do |f| next if f =~ /\.lock$/ job = f.sub(/\.(info|files)/,'') jobs[workflow][task][job] ||= {} if jobs[workflow][task][job][:status].nil? status = nil status = :done if Open.exists? job if status.nil? and f=~/\.info/ info = begin Step::INFO_SERIALIZER.load(Open.read(f, :mode => 'rb')) rescue {} end status = info[:status] pid = info[:pid] end jobs[workflow][task][job][:pid] = pid if pid jobs[workflow][task][job][:status] = status if status end end end end jobs end
REST
Source
# File lib/rbbt/util/version.rb, line 6 def self.add_version(file) dir = Path.setup(Path.caller_lib_dir(file)) return if dir.nil? libname = File.basename(dir).sub('rbbt-','') return if VERSIONS.include? libname version = if dir.VERSION.exists? dir.VERSION.read elsif dir[".git"].exists? begin head = dir[".git"]["HEAD"].read.split(" ").last.strip dir[".git"][head].read.strip rescue nil end elsif libname.include?("-") name,_sep, v = libname.partition("-") if v =~ /^\d+\.\d+\.\d+$/ libname = name v else nil end else nil end return if version.nil? VERSIONS[libname] = version end
Source
# File lib/rbbt/monitor.rb, line 19 def self.dump_memory(file, obj = nil) Log.info "Dumping #{obj} objects into #{ file }" Thread.new do while true Open.write(file) do |f| MUTEX_FOR_THREAD_EXCLUSIVE.synchronize do GC.start ObjectSpace.each_object(obj) do |o| f.puts "---" f.puts(String === o ? o : o.inspect) end end end FileUtils.cp file, file + '.save' sleep 3 end end end
Source
# File lib/rbbt/monitor.rb, line 38 def self.file_time(file) info = {} begin info[:ctime] = File.ctime file info[:atime] = File.atime file info[:elapsed] = Time.now - info[:ctime] rescue Exception end info[:ctime] = Time.now - 999 info end
Source
# File lib/rbbt/monitor.rb, line 126 def self.job_info(workflows = nil, tasks = nil, dirs = JOB_DIRS) require 'rbbt/workflow/step' workflows = [workflows] if workflows and not Array === workflows workflows = workflows.collect{|w| w.to_s} if workflows tasks = [tasks] if tasks and not Array === tasks tasks = tasks.collect{|w| w.to_s} if tasks jobs = {} seen = Set.new _files = Set.new dirs.collect do |dir| next unless Open.exists? dir task_dir_workflows = {} tasks_dirs = if dir == '.' ["."] else #workflowdirs = if (dir_sub_path = Open.find_repo_dir(workflowdir)) # repo_dir, sub_path = dir_sub_path # Open.list_repo_files(*dir_sub_path).collect{|f| f.split("/").first}.uniq.collect{|f| File.join(repo_dir, f)}.uniq # else # dir.glob("*") # end workflowdirs = dir.glob("*") workflowdirs.collect do |workflowdir| workflow = File.basename(workflowdir) next if workflows and not workflows.include? workflow #task_dirs = if (dir_sub_path = Open.find_repo_dir(workflowdir)) # repo_dir, sub_path = dir_sub_path # Open.list_repo_files(*dir_sub_path).collect{|f| f.split("/").first}.uniq.collect{|f| File.join(repo_dir, f)}.uniq # else # workflowdir.glob("*") # end task_dirs = workflowdir.glob("*") task_dirs.each do |tasks_dir| task_dir_workflows[tasks_dir] = workflow end end.compact.flatten end tasks_dirs.collect do |taskdir| task = File.basename(taskdir) next if tasks and not tasks.include? task #files = if (dir_sub_path = Open.find_repo_dir(taskdir)) # repo_dir, sub_path = dir_sub_path # Open.list_repo_files(*dir_sub_path).reject do |f| # f.include?("/.info/") || # f.include?(".files/") || # f.include?(".pid/") || # File.directory?(f) # end.collect do |f| # File.join(repo_dir, f) # end # else # #cmd = "find -L '#{ taskdir }/' -not \\( -path \"#{taskdir}/*.files/*\" -prune \\) -not -name '*.pid' -not -name '*.notify' -not -name '\\.*' 2>/dev/null" # cmd = "find -L '#{ taskdir }/' -not \\( -path \"#{taskdir}/.info/*\" -prune \\) -not \\( -path \"#{taskdir}/*.files/*\" -prune \\) -not -name '*.pid' -not -name '*.md5' -not -name '*.notify' -not -name '\\.*' \\( -not -type d -o -name '*.files' \\) 2>/dev/null" # CMD.cmd(cmd, :pipe => true).read.split("\n") # end files = begin cmd = "find -L '#{ taskdir }/' -not \\( -path \"#{taskdir}/.info/*\" -prune \\) -not \\( -path \"#{taskdir}/*.files/*\" -prune \\) -not -name '*.pid' -not -name '*.md5' -not -name '*.notify' -not -name '\\.*' \\( -not -type d -o -name '*.files' \\) 2>/dev/null" CMD.cmd(cmd, :pipe => true).read.split("\n") end files = files.sort_by{|f| Open.mtime(f) || Time.now} workflow = task_dir_workflows[taskdir] TSV.traverse files, :type => :array, :into => jobs, :_bar => "Finding jobs in #{ taskdir }" do |file| _files << file if m = file.match(/(.*)\.(info|pid|files)$/) file = m[1] end next if seen.include? file seen << file name = file[taskdir.length+1..-1] info_file = file + '.info' info = {} info[:workflow] = workflow info[:task] = task info[:name] = name if Open.exists? file info = info.merge(file_time(file)) info[:done] = true info[:info_file] = Open.exist?(info_file) ? info_file : nil else info = info.merge({:info_file => info_file, :done => false}) end [file, info] end end.compact.flatten end.compact.flatten jobs end
PERSISTS
Source
# File lib/rbbt/monitor.rb, line 283 def self.load_lock(lock) begin info = Misc.insist 3 do Misc.load_yaml(lock) end info.values_at "pid", "ppid", "time" rescue Exception time = begin File.atime(lock) rescue Exception Time.now end [nil, nil, time] end end
Source
# File lib/rbbt/monitor.rb, line 59 def self.lock_info(dirs = LOCK_DIRS) lock_info = {} locks(dirs).each do |f| lock_info[f] = {} begin lock_info[f].merge!(file_time(f)) if File.size(f) > 0 info = Open.open(f) do |s| Misc.load_yaml(s) end IndiferentHash.setup(info) lock_info[f][:pid] = info[:pid] lock_info[f][:ppid] = info[:ppid] end rescue Exception Log.exception $! end end lock_info end
Source
# File lib/rbbt/monitor.rb, line 52 def self.locks(dirs = LOCK_DIRS) dirs.collect do |dir| next unless Open.exists? dir `find -L "#{ dir }" -name "*.lock" 2>/dev/null`.split "\n" end.compact.flatten end
{{{ LOCKS
Source
# File lib/rbbt/util/migrate.rb, line 125 def self.migrate(path, search_path, options = {}) search_path = 'user' if search_path.nil? resource = Rbbt path, real_paths, lpath = migrate_source_paths(path, resource, options[:source]) target = migrate_target_path(lpath, search_path, resource, options[:target]) migrate_files(real_paths, target, options) end
Source
# File lib/rbbt/util/migrate.rb, line 51 def self.migrate_files(real_paths, target, options = {}) excludes = %w(.save .crap .source tmp filecache open-remote) excludes += (options[:exclude] || "").split(/,\s*/) excludes_str = excludes.collect{|s| "--exclude '#{s}'" } * " " hard_link = options[:hard_link] other = options[:other] || [] test_str = options[:test] ? '-nv' : '' real_paths.each do |source_path| Log.low "Migrating #{source_path} #{options[:files].length} files to #{target} - #{Misc.fingerprint(options[:files])}}" if options[:files] if File.directory?(source_path) || source_path.end_with?("/") source_path += "/" unless source_path.end_with? '/' target += "/" unless target.end_with? '/' end next if source_path == target && ! (options[:source] || options[:target]) if options[:target] CMD.cmd("ssh #{options[:target]} mkdir -p '#{File.dirname(target)}'") else Open.mkdir File.dirname(target) end if options[:target] target_path = [options[:target], "'" + target + "'"] * ":" else target_path = "'" + target + "'" end TmpFile.with_file do |tmp_files| if options[:files] Open.write(tmp_files, options[:files] * "\n") files_from_str = "--files-from='#{tmp_files}'" else files_from_str = "" end #cmd = "rsync -avztAXHP --copy-unsafe-links #{test_str} #{files_from_str} #{excludes_str} '#{source_path}' #{target_path} #{other * " "}" # rsync_args = "-avztAXHP --copy-unsafe-links" rsync_args = "-avztHP --copy-unsafe-links --omit-dir-times" rsync_args << " --link-dest '#{source_path}'" if hard_link && ! options[:source] cmd = "rsync #{rsync_args} #{test_str} #{files_from_str} #{excludes_str} '#{source_path}' #{target_path} #{other * " "}" cmd << " && rm -Rf #{source_path}" if options[:delete] && ! options[:files] if options[:print] puts cmd exit 0 else CMD.cmd_log(cmd, :log => Log::HIGH) if options[:delete] && options[:files] remove_files = options[:files].collect{|f| File.join(source_path, f) } dirs = remove_files.select{|f| File.directory? f } remove_files.each do |file| next if dirs.include? file Open.rm file end dirs.each do |dir| FileUtils.rmdir dir if Dir.glob(dir).empty? end end end end end end
Source
# File lib/rbbt/util/migrate.rb, line 3 def self.migrate_source_paths(path, resource = Rbbt, source = nil) if source lpath, *paths = SSHLine.ruby(source, <<-EOF).split("\n") require 'rbbt-util' path = "#{path}" if Open.exists?(path) path = Path.setup(#{resource.to_s}.identify(path)) else path = Path.setup(path) end puts path puts path.find_all.collect{|p| File.directory?(p) ? p + "/" : p } * "\n" EOF [path, paths.collect{|p| [source, p] * ":"}, lpath] else original_path = Path.setup(path) if File.exist?(path) path = resource.identify(path) else path = Path.setup(path) end if original_path.located? paths = [original_path] else path = Path.setup(path) unless Path === path paths = (path.directory? ? path.glob_all : path.find_all) end [path, paths, path] end end
Source
# File lib/rbbt/util/migrate.rb, line 37 def self.migrate_target_path(path, search_path = 'user', resource = Rbbt, target = nil) if target SSHLine.ruby(target, <<-EOF).split("\n").first require 'rbbt-util' path = "#{path}" resource = #{resource.to_s} search_path = "#{search_path}" puts resource[path].find(search_path) EOF else resource[path].find(search_path) end end
Source
# File lib/rbbt/monitor.rb, line 111 def self.persist_info(dirs = PERSIST_DIRS) info = {} persists(dirs).each do |f| begin i = file_time(f) info[f] = i rescue Log.exception $! end end info end
Source
# File lib/rbbt/monitor.rb, line 104 def self.persists(dirs = PERSIST_DIRS) dirs.collect do |dir| next unless Open.exists? dir `find -L "#{ dir }" -name "*.persist" 2>/dev/null`.split "\n" end.compact.flatten end
PERSISTS
Source
# File lib/rbbt/refactor.rb, line 10 def self.relay_module_method(new_mod, new_method, orig_mod, orig_method = nil) orig_method = new_method if orig_method.nil? method = orig_mod.method(orig_method) class << new_mod self end.define_method(new_method, &method) end
Source
# File lib/rbbt/refactor.rb, line 5 def self.require_instead(new_file) Log.low "Requiring #{new_file} instead of #{caller.first}" require new_file end
Source
# File lib/rbbt/monitor.rb, line 89 def self.sensiblewrite_info(dirs = SENSIBLE_WRITE_DIRS) info = {} sensiblewrites(dirs).each do |f| begin i = file_time(f) info[f] = i rescue Log.exception $! end end info end
Source
# File lib/rbbt/monitor.rb, line 82 def self.sensiblewrites(dirs = SENSIBLE_WRITE_DIRS) dirs.collect do |dir| next unless Open.exists? dir `find -L "#{ dir }" -not -name "*.lock" -not -type d 2>/dev/null`.split "\n" end.compact.flatten end
{{{ SENSIBLE WRITES
Source
# File lib/rbbt/util/version.rb, line 37 def self.versions versions = Rbbt::VERSIONS Gem.loaded_specs.keys.each do |gem| next unless gem.include? 'rbbt' name = gem.sub('rbbt-','') next if versions.include? name version = Gem.loaded_specs[gem].version.version versions[name] = version end versions versions.merge(CMD.versions) end