module Enumerable::Parallelize
Public Class Methods
it(enum, num_threads, method, collect_exceptions) { |e, thread_idx| ... }
click to toggle source
# File lib/parallelize/enumerable_ext.rb, line 3
#
# Runs +method+ over +enum+ using several threads. The elements are split
# into contiguous slices of ceil(count / num_threads) items, one thread is
# started per slice (so at most +num_threads+ threads), and every element is
# yielded to the given block from its slice's thread.
#
# The block's arity selects what it receives:
# * 2      - the element and the zero-based index of the thread handling it
# * 1      - the element only
# * 0      - nothing
# * -1     - nothing, and only accepted on Ruby 1.8.x
#
# While the threads run, an INT trap that raises Interrupt in the current
# thread is installed; the previous handler is restored on exit when trap
# returned one.
#
# @param enum [Enumerable] source of elements; enumerated exactly once
# @param num_threads [Numeric] upper bound on the number of threads (>= 1)
# @param method [Symbol] method sent to each slice with the block
#   (the return value is only defined for +:each+ and +:map+)
# @param collect_exceptions [Boolean] when true, worker failures are gathered
#   and raised together once all threads were joined; when false, the first
#   failure interrupts the remaining threads and is re-raised as-is
# @return [Array] +[]+ for an empty +enum+; the Thread objects for +:each+;
#   the per-slice results concatenated in slice order for +:map+; +nil+ for
#   any other +method+
# @raise [ArgumentError] if no block is given, +num_threads+ is below 1, or
#   the block has an unsupported arity
# @raise [ParallelException] when +collect_exceptions+ is true and at least one
#   worker failed; it is constructed with a Hash of thread index => exception
# @raise [Interrupt] when the calling thread is interrupted; all workers are
#   interrupted and joined first
def self.it(enum, num_threads, method, collect_exceptions, &block)
  raise ArgumentError, "Block not given" unless block_given?
  raise ArgumentError, "Invalid number of threads" if num_threads < 1

  threads = []

  # Interrupts every still-running thread from index first_idx onwards and
  # waits for each of them to terminate.
  reap = lambda do |first_idx|
    threads[first_idx..-1].each do |t|
      t.raise Interrupt if t.alive?
      begin
        t.join
      rescue Exception
        # join re-raises whatever terminated the thread; here we only care
        # that it has stopped.
        nil
      end
    end
  end

  begin
    prev_trap = trap('INT') { Thread.current.raise Interrupt }

    # Snapshot the elements once. Counting and slicing in two separate passes
    # silently processes nothing when enum can only be traversed once.
    items = enum.to_a
    return [] if items.empty?

    arity = block.arity
    run_slice =
      case arity
      when 2
        lambda { |my_slice, thread_idx| my_slice.send(method) { |e| yield e, thread_idx } }
      when 1
        lambda { |my_slice, _thread_idx| my_slice.send(method) { |e| yield e } }
      when 0, -1
        if arity == -1 && RUBY_VERSION !~ /\A1\.8\./
          raise ArgumentError, "Invalid arity: #{arity}"
        end
        lambda { |my_slice, _thread_idx| my_slice.send(method) { yield } }
      else
        raise ArgumentError, "Invalid arity: #{arity}"
      end

    slice_size = (items.size / num_threads.to_f).ceil
    items.each_slice(slice_size) do |slice|
      # threads.length is read before the append, so it is this thread's index.
      threads << Thread.new(slice, threads.length) do |my_slice, thread_idx|
        run_slice.call(my_slice, thread_idx)
      end
    end

    exceptions = {}
    threads.each_with_index do |thr, idx|
      begin
        thr.join
      rescue Exception => e
        # Thread#status is nil only when the thread itself died with an
        # exception. Otherwise the exception was delivered to the joining
        # thread (e.g. Interrupt from the INT trap) and must not be recorded
        # as a worker failure: stop the remaining workers and propagate it.
        unless thr.status.nil?
          reap.call idx
          raise e
        end

        if collect_exceptions
          exceptions[idx] = e
        else
          reap.call idx + 1
          raise e
        end
      end
    end
  rescue Interrupt
    # Interrupts threads
    reap.call 0
    raise
  ensure
    trap('INT', prev_trap) if prev_trap
  end

  raise ParallelException.new(exceptions) unless exceptions.empty?

  case method
  when :each
    threads
  when :map
    threads.map(&:value).inject(:+)
  end
end