class MapReduce::Reducer

Constants

TIMEOUT

Public Class Methods

new(opts = {}) click to toggle source
# File lib/map_reduce/reducer.rb, line 5
def initialize(opts = {})
  @masters         = opts[:masters] || [::MapReduce::DEFAULT_SOCKET]
  @connection_type = opts[:type]    || :em
  @task            = opts[:task]
end

Public Instance Methods

em_reduce(all = nil, &blk) click to toggle source
# File lib/map_reduce/reducer.rb, line 31
def em_reduce(all = nil, &blk)
  all ||= sockets.dup
  sock = all.sample
  if sock
    sock.send_request(["reduce", @task]) do |message|
      key, *values = message
      if key.nil?
        all.delete sock
      else
        blk.call(key, values)
      end

      em_reduce(all, &blk)
    end
  else
    blk.call([nil])
  end
end
reduce(&blk) click to toggle source
# File lib/map_reduce/reducer.rb, line 11
def reduce(&blk)
  if @connection_type == :em
    em_reduce(&blk)
  else
    sync_reduce(&blk)
  end
end
sync_reduce(&blk) click to toggle source
# File lib/map_reduce/reducer.rb, line 19
def sync_reduce(&blk)
  all = sockets.dup
  while sock = all.sample
    key, *values = sock.send_request(["reduce", @task])
    if key.nil?
      all.delete sock
    else
      blk.call(key, values)
    end
  end
end

Private Instance Methods

sockets() click to toggle source
# File lib/map_reduce/reducer.rb, line 51
def sockets
  @sockets ||= begin
    klass = if @connection_type == :sync
      EM::Protocols::Zmq2::ReqFiber
    else
      EM::Protocols::Zmq2::ReqCb
    end

    @masters.map do |sock|
      s = klass.new
      s.connect(sock)
      s
    end
  end
end