class InParallel::InParallelExecutor
Public Class Methods
Private method that executes a block of code in a separate process and stores its STDOUT and return value for later retrieval.
# File lib/in_parallel.rb, line 184
# Forks a child process that evaluates +block+ against +obj+, redirecting the
# child's STDOUT/STDERR to "tmp/pp_<pid>" and sending the block's return value
# (or the exception it raised) back to the parent through a pipe via Marshal.
#
# @param method_sym [Object] label for the work being run; only used in the
#   log message and stored in the returned hash
# @param obj [Object] receiver the block is instance_eval'd against
# @param block [Proc] the code to run in the child process
# @return [Hash] the process_info record that was appended to @@process_infos
def self._execute_in_parallel(method_sym, obj = self, &block)
  ret_val = nil
  # Communicate the return value of the method or block
  read_result, write_result = IO.pipe
  # Dir.exists? was removed in Ruby 3.2; Dir.exist? works on every version.
  Dir.mkdir('tmp') unless Dir.exist?('tmp')
  pid = fork do
    stdout_file = File.new("tmp/pp_#{Process.pid}", 'w')
    exit_status = 0
    trap(:INT) do
      # Can't use logger inside of trap
      puts "Warning: Interrupt received in child process; exiting #{Process.pid}"
      kill_child_processes
      # NOTE(review): `return` inside a trap handler proc - confirm this exits the child as intended.
      return
    end

    # IO buffer is 64kb, which isn't much... if debug logging is turned on,
    # this can be exceeded before a process completes.
    # Storing output in file rather than using IO.pipe
    STDOUT.reopen(stdout_file)
    STDERR.reopen(stdout_file)

    begin
      # close subprocess's copy of read_result since it only needs to write
      read_result.close
      ret_val = obj.instance_eval(&block)
      ret_val = strip_singleton(ret_val)
      # In case there are other types that can't be dumped
      begin
        # Write the result to the write_result IO stream.
        # A nil return value writes nothing, so the parent reads an empty payload.
        Marshal.dump(ret_val, write_result) unless ret_val.nil?
      rescue StandardError => err
        @@logger.warn "Warning: return value from child process #{ret_val} " +
                      "could not be transferred to parent process: #{err.message}"
      end
    rescue Exception => err
      @@logger.error "Error in process #{Process.pid}: #{err.message}"
      # Return the error if an error is rescued so we can re-throw in the main process.
      Marshal.dump(err, write_result)
      exit_status = 1
    ensure
      write_result.close
      exit exit_status
    end
  end

  @@logger.info "Forked process for #{method_sym} - PID = '#{pid}'"
  # The parent only reads from the pipe, so drop its copy of the write end.
  write_result.close
  # Process.detach returns a Thread that reaps the child once it exits, so the child
  # does not linger as a zombie and we never have to call the blocking Process.wait functions.
  wait_thread = Process.detach(pid)
  # store the IO object with the STDOUT and waiting thread for each pid
  process_info = {
    :wait_thread => wait_thread,
    :pid => pid,
    :method_sym => method_sym,
    :std_out => "tmp/pp_#{pid}",
    :result => read_result,
    :tmp_result => "unresolved_parallel_result_#{@@result_id}",
    :result_buffer => StringIO.new,
    :index => @@process_infos.count
  }
  @@process_infos.push(process_info)
  @@result_id += 1
  process_info
end
# File lib/in_parallel.rb, line 248
# Reports whether this Ruby can fork. The answer is cached in @@supported once
# it is truthy; when fork is unavailable a warning is logged on every call.
#
# @return [Boolean] truthy when Process responds to :fork
def self.fork_supported?
  @@supported ||= Process.respond_to?(:fork)
  return @@supported if @@supported

  @@logger.warn 'Warning: Fork is not supported on this OS, executing block normally'
  @@supported
end
# File lib/in_parallel.rb, line 39
# Reader for the class-wide logger.
#
# @return [Object] the object currently stored in @@logger
def self.logger
  @@logger
end
# File lib/in_parallel.rb, line 43
# Writer for the class-wide logger.
#
# @param value [Object] the logger to store in @@logger
# @return [Object] the assigned value
def self.logger=(value)
  @@logger = value
end
# File lib/in_parallel.rb, line 27
# Reader for @@main_pid.
#
# @return [Object] the value currently stored in @@main_pid
def self.main_pid
  @@main_pid
end
# File lib/in_parallel.rb, line 31
# Reader for the timeout used when a caller does not supply one.
#
# @return [Object] the value currently stored in @@parallel_default_timeout
def self.parallel_default_timeout
  @@parallel_default_timeout
end
# File lib/in_parallel.rb, line 35
# Writer for the timeout used when a caller does not supply one.
#
# @param value [Object] the new default stored in @@parallel_default_timeout
# @return [Object] the assigned value
def self.parallel_default_timeout=(value)
  @@parallel_default_timeout = value
end
# File lib/in_parallel.rb, line 16
# Reader for the list of bookkeeping records of forked child processes.
#
# @return [Object] the collection currently stored in @@process_infos
def self.process_infos
  @@process_infos
end
Runs all methods within the block in parallel in the background
Example - Will spawn a process in the background to run puppet agent on two agents and return immediately:
Parallel.run_in_background do @result_1 = method1 @result_2 = method2 end # Do something else here before waiting for the process to complete # Optionally wait for the processes to complete before continuing. # Otherwise use run_in_background(true) to clean up the process status and output immediately. wait_for_processes(self)
NOTE: must call get_background_results to allow instance variables in calling object to be set, otherwise @result_1 will evaluate to “unresolved_parallel_result_0”
# File lib/in_parallel.rb, line 79
# Evaluates +block+ through a BlankBindingParallelProxy and returns without
# waiting for the work to finish.
#
# @param ignore_result [Boolean] when true the most recent process record is
#   dropped from @@process_infos; when false the proxy and the block's binding
#   are queued in @@background_objs so results can be resolved later
# @param block [Proc] the code to run
# @return [Object, nil] nil when the result is ignored; otherwise the
#   placeholder (:tmp_result) of the most recent process record, or the block's
#   own return value when fork is not supported
def self.run_in_background(ignore_result = true, &block)
  unless fork_supported?
    # if fork is not supported, run the block in-process
    outcome = block.call
    return ignore_result ? nil : outcome
  end

  proxy = BlankBindingParallelProxy.new(block.binding)
  proxy.instance_eval(&block)

  unless ignore_result
    @@background_objs << { :proxy => proxy, :target => block.binding }
    return process_infos.last[:tmp_result]
  end

  # Result is not wanted: detach the child and forget its record.
  Process.detach(@@process_infos.last[:pid])
  @@process_infos.pop
  nil
end
Runs all methods within the block in parallel and waits for them to complete
Example - will spawn 2 processes, (1 for each method) wait until they both complete, and log STDOUT:
InParallel.run_in_parallel do @result_1 = method1 @result_2 = method2 end
NOTE: Only supports assigning instance variables within the block, not local variables
# File lib/in_parallel.rb, line 55
# Evaluates +block+ through a BlankBindingParallelProxy and then blocks in
# wait_for_processes until the work has finished.
#
# @param timeout [Object] passed through to wait_for_processes; defaults to
#   @@parallel_default_timeout
# @param kill_all_on_error [Boolean] passed through to wait_for_processes
# @param block [Proc] the code to run
# @return [Object] the value returned by wait_for_processes, or the block's own
#   return value when fork is not supported
def self.run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block)
  # if fork is not supported
  return block.call unless fork_supported?

  proxy = BlankBindingParallelProxy.new(block.binding)
  proxy.instance_eval(&block)
  wait_for_processes(proxy, block.binding, timeout, kill_all_on_error)
end
Waits for all processes to complete and logs STDOUT and STDERR in chunks from any processes that were triggered from this Parallel class.
@param [Object] proxy - The instance of the proxy class that the method was executed within (probably only useful when called by run_in_background)
@param [Object] binding - The binding of the block to assign return values to instance variables (probably only useful when called by run_in_background)
@param [Int] timeout - Time in seconds to wait before giving up on a child process
@param [Boolean] kill_all_on_error - Whether to wait for all processes to complete, or fail immediately - killing all other forked processes - when one process errors.
# File lib/in_parallel.rb, line 104
# Polls the result pipes of every record in @@process_infos until all of them
# have reached EOF, printing each child's captured output and collecting its
# unmarshalled return value.
#
# @param proxy [Object] object whose instance variables hold the placeholder values
# @param binding [Binding, nil] binding whose `self` receives the resolved
#   values; when nil no results are resolved for the caller
# @param timeout [Numeric, nil] seconds before children are sent INT; falls
#   back to @@parallel_default_timeout when nil
# @param kill_all_on_error [Boolean] when true, INT is sent to the remaining
#   children as soon as one child reports an exception
# @return [Array, nil] resolved results (empty unless +binding+ is given);
#   nil when Process does not respond to :fork
# @raise [Exception] a copy of the last exception reported by a child, or a
#   RuntimeError when the timeout elapsed
def self.wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false)
  raise_error = nil
  timeout ||= @@parallel_default_timeout
  send_int = false
  trap(:INT) do
    # Can't use logger inside of trap
    puts "Warning, recieved interrupt. Processing child results and exiting."
    send_int = true
    kill_child_processes
  end
  return unless Process.respond_to?(:fork)

  # Custom process to wait so that we can do things like time out, and kill child processes if
  # one process returns with an error before the others complete.
  results_map = Array.new(@@process_infos.count)
  start_time = Time.now
  timer = start_time
  while !@@process_infos.empty? do
    if @@parallel_signal_interval > 0 && Time.now > timer + @@parallel_signal_interval
      @@logger.debug 'Waiting for child processes.'
      timer = Time.now
    end
    if Time.now > start_time + timeout
      kill_child_processes
      raise_error = ::RuntimeError.new("Child process ran longer than timeout of #{timeout}")
    end

    ready = IO.select(@@process_infos.map { |p| p[:result] }, nil, nil, 0.5)
    next unless ready

    ready.first.each do |reader|
      process_info = @@process_infos.find { |p| p[:result] == reader }
      process_info[:result_buffer] << reader.read
      next unless reader.eof?

      payload = process_info[:result_buffer].string
      # the process completed, get the result and rethrow on error.
      begin
        # Print the STDOUT and STDERR for each process with signals for start and end
        @@logger.info "------ Begin output for #{process_info[:method_sym]} - #{process_info[:pid]}"
        # Content from the other thread will already be pre-pended with log stuff (info, warn, date/time, etc)
        # So don't use logger, just use puts.
        # File.readlines opens and closes the file itself, so no handle is leaked.
        puts " " + File.readlines(process_info[:std_out]).join(" ")
        @@logger.info "------ Completed output for #{process_info[:method_sym]} - #{process_info[:pid]}"
        # An empty payload (nothing was written by the child) is passed through as-is.
        marshalled_result = (payload.nil? || payload.empty?) ? payload : Marshal.load(payload)
        # Kill all other processes and let them log their stdout before re-raising
        # if a child process raised an error.
        if marshalled_result.is_a?(Exception)
          raise_error = marshalled_result.dup
          kill_child_processes if kill_all_on_error
          marshalled_result = nil
        end
        results_map[process_info[:index]] = { process_info[:tmp_result] => marshalled_result }
      ensure
        # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
        File.delete(process_info[:std_out]) if File.exist?(process_info[:std_out])
        # close the read end pipe
        process_info[:result].close unless process_info[:result].closed?
        @@process_infos.delete(process_info)
      end
    end
  end

  results = []
  # pass in the 'self' from the block.binding which is the instance of the class
  # that contains the initial binding call.
  # This gives us access to the instance variables from that context.
  results = result_lookup(proxy, binding, results_map) if binding

  # If there are background_objs AND results, don't return the background obj results
  # (which would mess up expected results from each_in_parallel),
  # but do process their results in case they are assigned to instance variables
  @@background_objs.each { |obj| result_lookup(obj[:proxy], obj[:target], results_map) }
  @@background_objs.clear

  # Re-deliver the interrupt to this process once child results have been handled.
  Process.kill("INT", Process.pid) if send_int
  raise raise_error unless raise_error.nil?
  return results
end
Private Class Methods
# File lib/in_parallel.rb, line 254
# Sends INT to the pid of every record in @@process_infos.
#
# @return [Object] the value of @@process_infos.each
def self.kill_child_processes
  @@process_infos.each do |info|
    child_pid = info[:pid]
    begin
      # INT lets the child return and print its stdout and stderr to console before exiting.
      Process.kill("INT", child_pid)
    rescue Errno::ESRCH
      # The child finished in the short time before the signal was sent; nothing to do.
    end
  end
end
Private method to lookup results from the results_map and replace the temp values with actual return values
# File lib/in_parallel.rb, line 287
# Swaps placeholder values for real results. For every single-entry hash in
# +results_map+ (placeholder => actual value), the first instance variable on
# the proxy that still holds the placeholder is located and the variable of the
# same name on the target object is set to the actual value.
#
# @param proxy_obj [Object, nil] object holding the placeholders; falls back to
#   the target object when nil
# @param target_obj [Binding] binding whose `self` receives the resolved values
# @param results_map [Array<Hash>] one { placeholder => value } hash per result
# @return [Array] the actual values, in results_map order
def self.result_lookup(proxy_obj, target_obj, results_map)
  receiver = eval('self', target_obj)
  holder = proxy_obj || receiver
  ivar_names = holder.instance_variables

  results_map.map do |entry|
    placeholder, actual = entry.first
    # Only the first variable carrying this placeholder is updated.
    matching = ivar_names.find { |name| holder.instance_variable_get(name) == placeholder }
    receiver.instance_variable_set(matching, actual) if matching
    actual
  end
end
# File lib/in_parallel.rb, line 266
# Returns +obj+ with singleton methods and singleton-class instance variables
# removed, applying the same treatment recursively to every object reachable
# through instance variables.
#
# Objects that carry singleton methods are replaced by a +dup+ (dup does not
# copy singleton methods); other objects are modified in place.
#
# @param obj [Object] the value to clean up
# @param visited [Hash, nil] internal identity-keyed map of objects already
#   processed; callers should leave it unset. It stops infinite recursion on
#   cyclic object graphs and keeps shared references shared.
# @return [Object] +obj+ itself or its stripped duplicate
def self.strip_singleton(obj, visited = nil)
  visited ||= {}.compare_by_identity
  return visited[obj] if visited.key?(obj)

  original = obj
  obj = obj.dup unless obj.nil? || obj.singleton_methods.empty?
  # Record the result before descending so a cycle resolves to this object.
  visited[original] = obj
  visited[obj] = obj

  begin
    obj.singleton_class.class_eval do
      instance_variables.each { |v| remove_instance_variable(v) }
    end
  rescue TypeError
    # if no singleton_class exists for the object it raises a TypeError
  end

  # Recursively check any objects assigned to instance variables for singleton methods, or variables
  obj.instance_variables.each do |v|
    current = obj.instance_variable_get(v)
    stripped = strip_singleton(current, visited)
    # Skip no-op writes so frozen objects that need no change do not raise.
    obj.instance_variable_set(v, stripped) unless stripped.equal?(current)
  end
  obj
end