class MapReduce::Reducer
Constants
- TIMEOUT
Public Class Methods
new(opts = {})
click to toggle source
# File lib/map_reduce/reducer.rb, line 5 def initialize(opts = {}) @masters = opts[:masters] || [::MapReduce::DEFAULT_SOCKET] @connection_type = opts[:type] || :em @task = opts[:task] end
Public Instance Methods
em_reduce(all = nil, &blk)
click to toggle source
# File lib/map_reduce/reducer.rb, line 31 def em_reduce(all = nil, &blk) all ||= sockets.dup sock = all.sample if sock sock.send_request(["reduce", @task]) do |message| key, *values = message if key.nil? all.delete sock else blk.call(key, values) end em_reduce(all, &blk) end else blk.call([nil]) end end
reduce(&blk)
click to toggle source
# File lib/map_reduce/reducer.rb, line 11 def reduce(&blk) if @connection_type == :em em_reduce(&blk) else sync_reduce(&blk) end end
sync_reduce(&blk)
click to toggle source
# File lib/map_reduce/reducer.rb, line 19 def sync_reduce(&blk) all = sockets.dup while sock = all.sample key, *values = sock.send_request(["reduce", @task]) if key.nil? all.delete sock else blk.call(key, values) end end end
Private Instance Methods
sockets()
click to toggle source
# File lib/map_reduce/reducer.rb, line 51 def sockets @sockets ||= begin klass = if @connection_type == :sync EM::Protocols::Zmq2::ReqFiber else EM::Protocols::Zmq2::ReqCb end @masters.map do |sock| s = klass.new s.connect(sock) s end end end