class SimpleFuture
A container holding the (eventual) result of a forked child process once that process finishes. The child process executes the code block that must be passed to the constructor:
sf = SimpleFuture.new { do_slow_thing } ... do stuff ... use(sf.value)
The code block must return a value that can be encoded by `Marshal` and **must not** exit prematurely.
Exceptions thrown inside the block will trigger a `SimpleFuture::ChildError` in the parent process but that exception will contain the original in its `cause` field.
Public Class Methods
Test if all instances created so far have run to completion. As a side effect, it will also call `wait` on instances whose child processes are running but have finished (i.e. their `check_if_ready` would return true.) This lets you use it as a non-blocking way to clean up the remaining children.
# File lib/simple-future.rb, line 206 def self.all_done? @@in_progress.select!{ |sp| !sp.check_if_ready } return @@in_progress.size == 0 end
Return the maximum number of concurrent child processes allowed.
# File lib/simple-future.rb, line 190 def self.max_tasks() return @@max_tasks; end
Set the maximum number of concurrent child processes allowed. If set to less than 1, it is interpreted as meaning no limit.
It is initially set to the number of available cores as provided by the `Etc` module.
# File lib/simple-future.rb, line 197 def self.max_tasks=(value) @@max_tasks = value end
In addition to creating a new `SimpleFuture`, the constructor creates a child process and evaluates `action` in it. If the maximum number of child processes would be exceeded, it will block until a process finishes.
# File lib/simple-future.rb, line 88 def initialize(&action) @readPipe = nil @pid = nil @complete = false @result = nil self.class.all_done? # Reclaim all completed children block_until_clear() launch(action) end
Wait until all child processes have run to completion and recover their results. Programs should call this before exiting if there is a chance that an instance was created without having `wait` called on it.
# File lib/simple-future.rb, line 215 def self.wait_for_all @@in_progress.each{|sp| sp.wait} @@in_progress = [] return end
Public Instance Methods
Check if the child process has finished evaluating the block and has a result ready. If `check_if_ready` returns `true`, `wait` will not block when called.
Note: `check_if_ready` tests if there's data on the pipe to the child process to see if it has finished. A sufficiently evil child block might be able to cause a true result while still blocking `wait`.
Don't do that.
@return [Boolean]
# File lib/simple-future.rb, line 181 def check_if_ready return true if complete? return false unless @readPipe.ready? wait return true end
Test if the child process has finished and its result is available.
Note that this will only be true after a call to `wait` (i.e. the child process finished and its result has been retrieved.) If you want to see if the result is (probably) available, use `check_if_ready`.
# File lib/simple-future.rb, line 106 def complete?() return @complete; end
Return the result of the child process, blocking if it is not yet available. Blocking is done by calling `wait`, so the process will be cleaned up.
# File lib/simple-future.rb, line 111 def value wait return @result end
Block until the child process finishes, recover its result and clean up the process. `wait` must be called for each `SimpleFuture` to prevent zombie processes. In practice, this is rarely a problem since `value` calls `wait` and you usually want to get all of the values. See `wait_for_all`.
It is safe to call `wait` multiple times on a `SimpleFuture`.
@raise [ChildError] The child process raised an uncaught exception. @raise [ResultTypeError] Marshal cannot encode the result @raise [Error] An error occurred in the IPC system or child process.
# File lib/simple-future.rb, line 127 def wait # Quit if the child has already exited return if complete? # Read the contents; this may block data = @readPipe.read # Reap the child process; this shouldn't block for long Process.wait(@pid) # And now we're complete, regardless of what happens next. (We # set it early so that errors later on won't allow waiting again # and associated mystery errors.) @complete = true # Close and discard the pipe; we're done with it @readPipe.close @readPipe = nil # If the child process exited badly, this is an error raise Error.new("Error in child process #{@pid}!") unless $?.exitstatus == 0 && !data.empty? # Decode the result. If it's an exception object, that's the # error that was thrown in the child and that means an error here # as well. rbox = Marshal.load(data) raise rbox if rbox.is_a? ResultTypeError raise ChildError.new("Child process failed with an exception.", rbox) if rbox.is_a? Exception # Ensure rbox is a ResultContainer. This *probably* can't happen. raise Error.new("Invalid result object type: #{rbox.class}") unless rbox.is_a? ResultContainer # Aaaaaand, retrieve the value. @result = rbox.value return # return nil end
Private Instance Methods
If we're currently at maximum allowed processes, wait until the oldest of them finishes. (TO DO: if possible, make it wait until any process exits.)
# File lib/simple-future.rb, line 263 def block_until_clear return unless @@max_tasks > 0 && @@in_progress.size >= @@max_tasks @@in_progress.shift.wait() end
Create a forked child process connected to this one with a pipe, eval `action` and return the marshalled result (or exception, in case of an error) via the pipe. Results are wrapped in a `ResultContainer` so that the parent can distinguish between exceptions and legitimately returned Exception objects.
# File lib/simple-future.rb, line 228 def launch(action) @readPipe, writePipe = IO.pipe @pid = Process.fork do @readPipe.close() result = nil begin result = ResultContainer.new( action.call() ) rescue Exception => e result = e end rs = nil begin rs = Marshal.dump(result) rescue TypeError => e rv = result rv = rv.value if rv.class == ResultContainer rs = Marshal.dump(ResultTypeError.new("Type #{rv.class} " + "cannot be dumped.")) end writePipe.write(rs) writePipe.close exit!(0) end writePipe.close @@in_progress.push self end