class MapReduce::Mapper
Public Class Methods
new(opts = {})
click to toggle source
# File lib/map_reduce/mapper.rb, line 3 def initialize(opts = {}) @masters = opts[:masters] || [::MapReduce::DEFAULT_SOCKET] @connection_type = opts[:type] || :em @task_name = opts[:task] @disconnected = {} end
Public Instance Methods
emit(key, value, &blk)
click to toggle source
# File lib/map_reduce/mapper.rb, line 10 def emit(key, value, &blk) raise MapReduce::Exceptions::BlankKey, "Key can't be nil" if key.nil? sock = pick_master(key) sock.send_request(["map", key, value, @task_name]) do |res| if res @disconnected.delete(sock) if @disconnected[sock] if blk blk.call(res) else return res end else @disconnected[sock] = true emit(key, value, &blk) end end end
Also aliased as: map
wait_for_all(&blk)
click to toggle source
# File lib/map_reduce/mapper.rb, line 30 def wait_for_all(&blk) finished = Hash[socket.map{ |s| [s, false] }] sockets.each do |sock| sock.send_request(["map_finished", @task_name]) do |message| finished[sock] = message[0] == "ok" if finished.all?{ |k,v| v } if block_given? blk.call else return end else after(1) do wait_for_all(&blk) end end end end end
Private Instance Methods
after(sec) { || ... }
click to toggle source
# File lib/map_reduce/mapper.rb, line 52 def after(sec) klass = if @connection_type == :sync EM::Synchrony else EM end klass.add_timer(sec) do yield end end
pick_master(key)
click to toggle source
# File lib/map_reduce/mapper.rb, line 64 def pick_master(key) num = Digest::MD5.hexdigest(key.to_s).to_i(16) % sockets.size sock = sockets[num] # LOL :) if @disconnected[sock] && rand(10) != 0 pick_master(key.to_s.chars.to_a.shuffle.join) else sock end end
sockets()
click to toggle source
# File lib/map_reduce/mapper.rb, line 75 def sockets @sockets ||= begin klass = if @connection_type == :sync EM::Protocols::Zmq2::ReqFiber else EM::Protocols::Zmq2::ReqCb end @masters.map do |sock| s = klass.new s.connect(sock) s end end end