class InParallel::InParallelExecutor

Public Class Methods

_execute_in_parallel(method_sym, obj = self, &block) click to toggle source

Private method that executes a block of code in a separate process and stores the STDOUT and return value for later retrieval.

# File lib/in_parallel.rb, line 184
# Forks a child process that evaluates +block+ in the context of +obj+.
#
# The child redirects its STDOUT and STDERR to the file "tmp/pp_<pid>" and
# marshals the block's return value (or the rescued exception) into a pipe.
# The parent records the bookkeeping needed to collect both later in a
# process_info hash, which is appended to @@process_infos.
#
# @param method_sym label for the forked work; used in the log message and
#   stored in the process info under :method_sym
# @param obj object the block is instance_eval'd against (defaults to self)
# @param block the code to run in the child process
# @return [Hash] the process_info hash for the forked child
def self._execute_in_parallel(method_sym, obj = self, &block)
  ret_val                   = nil
  # Pipe used to communicate the return value of the method or block
  read_result, write_result = IO.pipe
  # Dir.exists? was removed in Ruby 3.2; Dir.exist? works on every version.
  Dir.mkdir('tmp') unless Dir.exist?('tmp')
  pid                       = fork do
    stdout_file = File.new("tmp/pp_#{Process.pid}", 'w')
    exit_status = 0
    trap(:INT) do
      # Can't use logger inside of trap
      puts "Warning: Interrupt received in child process; exiting #{Process.pid}"
      kill_child_processes
      # NOTE(review): `return` inside a trap handler within the fork block -
      # confirm this exits the child the way it is intended to.
      return
    end

    # IO buffer is 64kb, which isn't much... if debug logging is turned on,
    # this can be exceeded before a process completes.
    # Storing output in file rather than using IO.pipe
    STDOUT.reopen(stdout_file)
    STDERR.reopen(stdout_file)

    begin
      # close subprocess's copy of read_result since it only needs to write
      read_result.close
      ret_val = obj.instance_eval(&block)
      ret_val = strip_singleton(ret_val)
      # In case there are other types that can't be dumped
      begin
        # Write the result to the write_result IO stream.
        # A nil return value writes nothing, so the parent reads an empty pipe.
        Marshal.dump(ret_val, write_result) unless ret_val.nil?
      rescue StandardError => err
        @@logger.warn "Warning: return value from child process #{ret_val} " +
                          "could not be transferred to parent process: #{err.message}"
      end
    rescue Exception => err
      @@logger.error "Error in process #{Process.pid}: #{err.message}"
      # Return the error if an error is rescued so we can re-throw in the main process.
      Marshal.dump(err, write_result)
      exit_status = 1
    ensure
      write_result.close
      exit exit_status
    end
  end

  @@logger.info "Forked process for #{method_sym} - PID = '#{pid}'"
  # The parent only reads from the pipe, so close its copy of the write end.
  write_result.close
  # Process.detach returns a thread that monitors the child process.
  # This allows us to check to see if processes have exited without having to
  # call the blocking Process.wait functions.
  wait_thread  = Process.detach(pid)
  # store the IO object with the STDOUT and waiting thread for each pid
  process_info = { :wait_thread => wait_thread,
                   :pid         => pid,
                   :method_sym  => method_sym,
                   :std_out     => "tmp/pp_#{pid}",
                   :result      => read_result,
                   :tmp_result  => "unresolved_parallel_result_#{@@result_id}",
                   :result_buffer => StringIO.new,
                   :index       => @@process_infos.count }
  @@process_infos.push(process_info)
  @@result_id += 1
  process_info
end
fork_supported?() click to toggle source
# File lib/in_parallel.rb, line 248
# Reports whether Process responds to :fork, caching a truthy answer in
# @@supported. Logs a warning through @@logger whenever the answer is falsy.
#
# @return the cached/derived value of @@supported
def self.fork_supported?
  # Equivalent to `@@supported ||= ...`: only query when unset or falsy.
  unless defined?(@@supported) && @@supported
    @@supported = Process.respond_to?(:fork)
  end
  return @@supported if @@supported

  @@logger.warn 'Warning: Fork is not supported on this OS, executing block normally'
  @@supported
end
logger() click to toggle source
# File lib/in_parallel.rb, line 39
# Returns the current value of the @@logger class variable.
def self.logger
  @@logger
end
logger=(value) click to toggle source
# File lib/in_parallel.rb, line 43
# Stores +value+ in the @@logger class variable, which the other methods of
# this class use for their log output.
def self.logger=(value)
  @@logger = value
end
main_pid() click to toggle source
# File lib/in_parallel.rb, line 27
# Returns the current value of the @@main_pid class variable.
def self.main_pid
  @@main_pid
end
parallel_default_timeout() click to toggle source
# File lib/in_parallel.rb, line 31
# Returns the current value of @@parallel_default_timeout, which
# run_in_parallel and wait_for_processes use when no timeout is supplied.
def self.parallel_default_timeout
  @@parallel_default_timeout
end
parallel_default_timeout=(value) click to toggle source
# File lib/in_parallel.rb, line 35
# Stores +value+ in @@parallel_default_timeout, the fallback timeout used by
# run_in_parallel and wait_for_processes.
def self.parallel_default_timeout=(value)
  @@parallel_default_timeout = value
end
process_infos() click to toggle source
# File lib/in_parallel.rb, line 16
# Returns @@process_infos, the collection of process_info hashes pushed by
# _execute_in_parallel for each forked child.
def self.process_infos
  @@process_infos
end
run_in_background(ignore_result = true, &block) click to toggle source

Runs all methods within the block in parallel in the background

Example - Will spawn a process in the background to run puppet agent on two agents and return immediately:

Parallel.run_in_background do
  @result_1 = method1
  @result_2 = method2
end
# Do something else here before waiting for the process to complete

# Optionally wait for the processes to complete before continuing.
# Otherwise use run_in_background(true) to clean up the process status and output immediately.
wait_for_processes(self)

NOTE: must call get_background_results to allow instance variables in calling object to be set, otherwise @result_1 will evaluate to “unresolved_parallel_result_0”

# File lib/in_parallel.rb, line 79
# Evaluates +block+ through a BlankBindingParallelProxy so the work it
# triggers runs in the background, then returns without waiting.
#
# @param ignore_result when truthy, the most recent process info is detached
#   and dropped and nil is returned; otherwise the proxy and the block's
#   binding are queued in @@background_objs and the temporary result
#   placeholder of the most recent process is returned
# @param block the code to run
# @return nil, the placeholder string, or (without fork support) the block's
#   own result when ignore_result is falsy
def self.run_in_background(ignore_result = true, &block)
  unless fork_supported?
    # if fork is not supported, just run the block inline
    inline_result = block.call
    return ignore_result ? nil : inline_result
  end

  proxy = BlankBindingParallelProxy.new(block.binding)
  proxy.instance_eval(&block)

  unless ignore_result
    @@background_objs << { :proxy => proxy, :target => block.binding }
    return process_infos.last[:tmp_result]
  end

  # Result is not wanted: detach the latest child and forget about it.
  Process.detach(@@process_infos.last[:pid])
  @@process_infos.pop
  nil
end
run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block) click to toggle source

Runs all methods within the block in parallel and waits for them to complete

Example - will spawn 2 processes, (1 for each method) wait until they both complete, and log STDOUT:

InParallel.run_in_parallel do
  @result_1 = method1
  @result_2 = method2
end

NOTE: Only supports assigning instance variables within the block, not local variables

# File lib/in_parallel.rb, line 55
# Evaluates +block+ through a BlankBindingParallelProxy and then waits for
# the resulting processes via wait_for_processes.
#
# @param timeout passed through to wait_for_processes
#   (defaults to @@parallel_default_timeout)
# @param kill_all_on_error passed through to wait_for_processes
# @param block the code to run
# @return whatever wait_for_processes returns, or the block's own result
#   when fork is not supported
def self.run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block)
  # if fork is not supported, just run the block inline
  return block.call unless fork_supported?

  proxy = BlankBindingParallelProxy.new(block.binding)
  proxy.instance_eval(&block)
  wait_for_processes(proxy, block.binding, timeout, kill_all_on_error)
end
wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false) click to toggle source

Waits for all processes to complete and logs STDOUT and STDERR in chunks from any processes that were triggered from this Parallel class.
@param [Object] proxy - The instance of the proxy class that the method was executed within (probably only useful when called by run_in_background).
@param [Object] binding - The binding of the block to assign return values to instance variables (probably only useful when called by run_in_background).
@param [Int] timeout - Time in seconds to wait before giving up on a child process.
@param [Boolean] kill_all_on_error - Whether to wait for all processes to complete, or fail immediately - killing all other forked processes - when one process errors.

# File lib/in_parallel.rb, line 104
# Waits for every process in @@process_infos to finish, printing each
# child's captured output and collecting its marshalled return value.
#
# @param proxy object whose instance variables hold the temporary result
#   placeholders (defaults to self)
# @param binding binding whose `self` receives the real return values; when
#   nil no results are looked up for the caller
# @param timeout seconds to wait before killing the children; falls back to
#   @@parallel_default_timeout when nil
# @param kill_all_on_error when truthy, INT is sent to all children as soon
#   as one child reports an exception
# @return [Array] results produced by result_lookup, or an empty array when
#   no binding was given; nil when Process does not respond to :fork
# @raise the exception returned by a child process, or a RuntimeError when
#   the timeout is exceeded (raised after all children have been processed)
def self.wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false)
  raise_error = nil
  timeout     ||= @@parallel_default_timeout
  send_int = false
  trap(:INT) do
    # Can't use logger inside of trap
    puts "Warning, recieved interrupt.  Processing child results and exiting."
    send_int = true
    kill_child_processes
  end
  return unless Process.respond_to?(:fork)
  # Custom process to wait so that we can do things like time out, and kill child processes if
  # one process returns with an error before the others complete.
  results_map = Array.new(@@process_infos.count)
  start_time  = Time.now
  timer       = start_time
  while !@@process_infos.empty? do
    if @@parallel_signal_interval > 0 && Time.now > timer + @@parallel_signal_interval
      @@logger.debug 'Waiting for child processes.'
      timer = Time.now
    end
    if Time.now > start_time + timeout
      kill_child_processes
      raise_error = ::RuntimeError.new("Child process ran longer than timeout of #{timeout}")
    end

    # Poll the result pipes with a short timeout so the checks above keep running.
    if ready = IO.select(@@process_infos.map {|p| p[:result]}, nil, nil, 0.5)
      read_ios = ready.first
      read_ios.each do |reader|
        process_info = @@process_infos.find {|p| p[:result] == reader}
        process_info[:result_buffer] << reader.read
        if reader.eof?
          payload = process_info[:result_buffer].string
          # the process completed, get the result and rethrow on error.
          begin
            # Print the STDOUT and STDERR for each process with signals for start and end
            @@logger.info "------ Begin output for #{process_info[:method_sym]} - #{process_info[:pid]}"
            # Content from the other thread will already be pre-pended with log stuff (info, warn, date/time, etc)
            # So don't use logger, just use puts.
            # File.readlines closes the file itself, so no handle is leaked.
            puts "  " + File.readlines(process_info[:std_out]).join("  ")
            @@logger.info "------ Completed output for #{process_info[:method_sym]} - #{process_info[:pid]}"
            # NOTE(review): an empty payload is passed through as "" rather
            # than nil - confirm callers expect that.
            marshalled_result = (payload.nil? || payload.empty?) ? payload : Marshal.load(payload)
            # Kill all other processes and let them log their stdout before re-raising
            # if a child process raised an error.
            if marshalled_result.is_a?(Exception)
              raise_error = marshalled_result.dup
              kill_child_processes if kill_all_on_error
              marshalled_result = nil
            end
            results_map[process_info[:index]] = { process_info[:tmp_result] => marshalled_result }
          ensure
            # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
            File.delete(process_info[:std_out]) if File.exist?(process_info[:std_out])
            # close the read end pipe
            process_info[:result].close unless process_info[:result].closed?
            @@process_infos.delete(process_info)
          end
        end
      end
    end
  end

  results = []

  # pass in the 'self' from the block.binding which is the instance of the class
  # that contains the initial binding call.
  # This gives us access to the instance variables from that context.
  results = result_lookup(proxy, binding, results_map) if binding

  # If there are background_objs AND results, don't return the background obj results
  # (which would mess up expected results from each_in_parallel),
  # but do process their results in case they are assigned to instance variables
  @@background_objs.each { |obj| result_lookup(obj[:proxy], obj[:target], results_map) }
  @@background_objs.clear

  # Re-send the interrupt to this process once child results are handled.
  Process.kill("INT", Process.pid) if send_int
  raise raise_error unless raise_error.nil?
  return results
end

Private Class Methods

kill_child_processes() click to toggle source
# File lib/in_parallel.rb, line 254
# Sends INT to the pid of every entry in @@process_infos.
def self.kill_child_processes
  @@process_infos.each do |info|
    child_pid = info[:pid]
    begin
      # INT lets the child return so its stdout and stderr can still be printed before it exits.
      Process.kill("INT", child_pid)
    rescue Errno::ESRCH
      # The child finished in the short window before the signal was sent.
      next
    end
  end
end
result_lookup(proxy_obj, target_obj, results_map) click to toggle source

Private method to lookup results from the results_map and replace the temp values with actual return values

# File lib/in_parallel.rb, line 287
# Replaces temporary placeholder values with the real return values.
#
# For each single-pair hash in +results_map+ (placeholder => value), the
# first instance variable of +proxy_obj+ whose value equals the placeholder
# is located, and the variable of the same name is set to the real value on
# the object that is `self` inside +target_obj+.
#
# @param proxy_obj object whose instance variables are scanned; when nil the
#   target's `self` is scanned instead
# @param target_obj binding whose `self` receives the values
# @param results_map [Array<Hash>] placeholder => value hashes
# @return [Array] the values from results_map, in order
def self.result_lookup(proxy_obj, target_obj, results_map)
  receiver  = eval('self', target_obj)
  source    = proxy_obj || receiver
  ivar_names = source.instance_variables

  results_map.map do |entry|
    placeholder = entry.keys[0]
    actual      = entry.values[0]
    # Only the first variable holding this placeholder is updated.
    matching = ivar_names.find { |name| source.instance_variable_get(name) == placeholder }
    receiver.instance_variable_set(matching, actual) if matching
    actual
  end
end
strip_singleton(obj) click to toggle source
# File lib/in_parallel.rb, line 266
# Returns +obj+ (or a dup of it) with singleton methods and singleton-class
# instance variables removed, applying the same treatment recursively to
# every object referenced from its instance variables.
#
# Objects that carry singleton methods are replaced by a dup; all other
# objects are modified in place.
#
# @param obj the object to clean up
# @param seen identity-keyed map of objects already visited to their
#   cleaned replacement; internal to the recursion, callers should omit it.
#   It stops cyclic object graphs from recursing without bound.
# @return the cleaned object
def self.strip_singleton(obj, seen = {}.compare_by_identity)
  # Already visited (cycle or shared reference): reuse the earlier result.
  return seen[obj] if seen.key?(obj)

  cleaned = (obj.nil? || obj.singleton_methods.empty?) ? obj : obj.dup
  # Register before descending so back-references resolve to this result.
  seen[obj] = cleaned

  begin
    cleaned.singleton_class.class_eval do
      # self is the singleton class here; call the method directly instead
      # of building a string for instance_eval.
      instance_variables.each { |v| remove_instance_variable(v) }
    end
  rescue TypeError # if no singleton_class exists for the object it raises a TypeError
  end

  # Recursively check any objects assigned to instance variables for singleton methods, or variables
  cleaned.instance_variables.each do |v|
    cleaned.instance_variable_set(v, strip_singleton(cleaned.instance_variable_get(v), seen))
  end
  cleaned
end