class MapReduce::Master

Public Class Methods

new(opts = {}) click to toggle source
# File lib/map_reduce/master.rb, line 5
def initialize(opts = {})
  @socket_addr = opts[:socket]     || ::MapReduce::DEFAULT_SOCKET
  @log_folder  = opts[:log_folder] || "/tmp/map_reduce"
  @delimiter   = opts[:delimiter]  || "\t"

  @tasks = {}
end

Public Instance Methods

after_map(&blk) click to toggle source
# File lib/map_reduce/master.rb, line 23
def after_map(&blk)
  @after_map = blk
end
after_reduce(&blk) click to toggle source
# File lib/map_reduce/master.rb, line 27
def after_reduce(&blk)
  @after_reduce = blk
end
recieve_msg(message, envelope) click to toggle source
# File lib/map_reduce/master.rb, line 31
def recieve_msg(message, envelope)
  mtype = case message[0]
  when "map"
    store_map(message, envelope)
  when "map_finished"
    all_finished?(message, envelope)
  when "reduce"
    send_reduce(message, envelope)
  else
    MapReduce.logger.error("Wrong message type: #{mtype}")
  end
end
run() click to toggle source
# File lib/map_reduce/master.rb, line 13
def run
  EM.run do
    socket
  end
end
stop() click to toggle source
# File lib/map_reduce/master.rb, line 19
def stop
  EM.stop
end

Private Instance Methods

all_finished?(message, envelope) click to toggle source
# File lib/map_reduce/master.rb, line 75
def all_finished?(message, envelope)
  status, task = message
  register(task, envelope, "mapper", status)
  if @tasks[task]["mapper"].all?{ |k,v| v == status }
    ok(envelope)
  else
    no(envelope)
  end
end
map_log(task) click to toggle source
# File lib/map_reduce/master.rb, line 85
def map_log(task)
  @map_log ||= {}
  @map_log[task] ||= MapReduce::MapLog.new(@log_folder, task)
end
np(envelope) click to toggle source
# File lib/map_reduce/master.rb, line 101
def np(envelope)
  reply(["not ok"], envelope)
end
ok(envelope) click to toggle source
# File lib/map_reduce/master.rb, line 97
def ok(envelope)
  reply(["ok"], envelope)
end
reduce_log(task, force = false) click to toggle source
# File lib/map_reduce/master.rb, line 90
def reduce_log(task, force = false)
  @reduce_log ||= {}
  @reduce_log[task] ||= MapReduce::ReduceLog.new(map_log(task), @delimiter)
  @reduce_log[task].force  if force
  @reduce_log[task]
end
register(task, envelope, type, status) click to toggle source
# File lib/map_reduce/master.rb, line 109
def register(task, envelope, type, status)
  @tasks[task] ||= {}
  @tasks[task][type] ||= {}
  @tasks[task][type][envelope[0]] = status
end
reply(resp, envelope) click to toggle source
# File lib/map_reduce/master.rb, line 105
def reply(resp, envelope)
  socket.send_reply(resp, envelope)
end
send_reduce(message, envelope) click to toggle source
# File lib/map_reduce/master.rb, line 55
def send_reduce(message, envelope)
  status, task = message

  data = if @tasks.fetch(task, {}).fetch("reducer", {}).fetch(envelope[0], nil) == "reduce"
    reduce_log(task).get_data
  else
    reduce_log(task, true).get_data
  end

  if data
    register(task, envelope, "reducer", status)
  else
    register(task, envelope, "reducer", "reduce_finished")
  end

  reply(data, envelope)

  @after_reduce.call(data[0], data[1], task)  if data && @after_reduce
end
socket() click to toggle source
# File lib/map_reduce/master.rb, line 115
def socket
  @socket ||= begin
    master = self
    sock = MapReduce::Socket::Master.new(self)
    sock.bind(@socket_addr)
    sock
  end
end
store_map(message, envelope) click to toggle source
# File lib/map_reduce/master.rb, line 46
def store_map(message, envelope)
  status, key, value, task = message
  map_log(task) << "#{key}#{@delimiter}#{value}"
  ok(envelope)
  register(task, envelope, "mapper", status)

  @after_map.call(key, value, task)  if @after_map
end