class JobDispatch::Worker::Socket

Attributes

item_class[R]
socket[R]
touch_socket[R]

Public Class Methods

new(connect_address, item_klass) click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 14
# Creates the two ZeroMQ sockets used by a worker and connects both of
# them to the same address.
#
# @param connect_address address handed to +connect+ on both sockets
# @param item_klass class exposed through +item_class+; +read_item+
#   instantiates it for each incoming command
def initialize(connect_address, item_klass)
  @item_class = item_klass

  # REQ socket: carries the ready/completed/goodbye messages.
  request = JobDispatch.context.socket(ZMQ::REQ)
  request.connect(connect_address)
  @socket = request

  # DEALER socket: carries "touch" messages (see send_touch).
  dealer = JobDispatch.context.socket(ZMQ::DEALER)
  dealer.connect(connect_address)
  @touch_socket = dealer
end

Public Instance Methods

ask_for_work(queue) click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 26
# Sends a "ready" command on the main socket, announcing this worker
# (by its identity) for the given queue.
#
# @param queue value sent under the "queue" key
# @return whatever the socket's +send+ returns
def ask_for_work(queue)
  request = { command: 'ready', queue: queue, worker_name: identity }
  @socket.send(JSON.dump(request))
end
close() click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 34
# Closes both sockets, if present, and clears the references to them.
# Errors raised while closing are deliberately ignored (best effort), and
# calling this again after the sockets were cleared does nothing.
#
# @return [nil]
def close
  %i[@socket @touch_socket].each do |ivar|
    sock = instance_variable_get(ivar)
    next unless sock

    begin
      sock.close
    rescue StandardError
      nil # best effort: a failing close must not prevent the cleanup
    end
    instance_variable_set(ivar, nil)
  end
  nil
end
drain_touch_socket() click to toggle source

Drains any messages that may have been received on the touch socket.

# File lib/job_dispatch/worker/socket.rb, line 82
# Discards everything pending on the touch socket: keeps calling
# +recv_nonblock+ and throwing the result away until it returns nil.
#
# @return [nil]
def drain_touch_socket
  nil until @touch_socket.recv_nonblock.nil?
end
identity() click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 45
# Name under which this worker announces itself, in the form
# "ruby:<hostname>:<pid>:<hex object_id of the current thread>".
# The value is computed on first use and then cached, so later calls
# return the same string.
#
# @return [String]
def identity
  return @identity if @identity

  host = ::Socket.gethostname
  pid = Process.pid
  thread_tag = Thread.current.object_id.to_s(16)
  @identity = "ruby:#{host}:#{pid}:#{thread_tag}"
end
poll_item() click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 22
# Poll item for the main socket, registered for ZMQ::POLLIN.
# Built on first use and cached afterwards.
#
# @return the object produced by ZMQ::Pollitem
def poll_item
  return @poll_item if @poll_item

  @poll_item = ZMQ.Pollitem(@socket, ZMQ::POLLIN)
end
read_item() click to toggle source

Reads an incoming message. The calling thread blocks if there is no readable message.

@return [JobDispatch::Item] the item to be processed (or nil if there isn't a valid job)

# File lib/job_dispatch/worker/socket.rb, line 57
# Reads the next message from the main socket and turns it into an item.
#
# The touch socket is drained first. The message is parsed as JSON and
# dispatched on its "command" field:
# * "job"  - item built from "target", "method" and the splatted "parameters"
# * "idle" - item targeting JobDispatch#idle
# * "quit" - prints a notice and exits the process with status 0
# * anything else - item targeting JobDispatch#unknown_command, with the
#   parsed message as its argument
# The item's job_id is then set from the message's "job_id" field.
#
# Any StandardError raised along the way (unparseable JSON, unexpected
# message shape, item construction failure, ...) is logged and nil is
# returned, so a half-initialised item is never handed back to the caller.
#
# @return [JobDispatch::Item] the item to be processed (or nil if there isn't a valid job)
def read_item
  drain_touch_socket
  params = JSON.parse(@socket.recv)

  item =
    case params["command"]
    when "job"
      item_class.new(params["target"], params["method"], *params["parameters"])
    when "idle"
      item_class.new("JobDispatch", "idle")
    when "quit"
      puts "It's quittin' time!"
      # Raises SystemExit, which is not a StandardError and therefore is not
      # swallowed by the rescue below.
      Process.exit(0)
    else
      item_class.new("JobDispatch", "unknown_command", params)
    end

  item.job_id = params["job_id"]
  item
rescue StandardError => e
  JobDispatch.logger.error "Failed to read message from worker socket: #{e}"
  nil
end
send_goodbye(queue) click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 30
# Sends a "goodbye" command carrying this worker's identity on the main
# socket.
#
# @param queue accepted for interface compatibility; it is not included
#   in the message
# @return whatever the socket's +send+ returns
def send_goodbye(queue)
  farewell = { command: 'goodbye', worker_name: identity }
  @socket.send(JSON.dump(farewell))
end
send_response(job_id, status, result) click to toggle source

Sends the response after a job has been executed.

# File lib/job_dispatch/worker/socket.rb, line 90
# Logs the outcome of a job and sends a "completed" command on the main
# socket. The message also carries +ready: true+.
#
# @param job_id value sent under "job_id"
# @param status value sent under "status"
# @param result value sent under "result"
# @return whatever the socket's +send+ returns
def send_response(job_id, status, result)
  JobDispatch.logger.info "Worker #{Process.pid} completed job_id: #{job_id}: #{status}, result: #{result}"

  payload = JSON.dump({ command: 'completed', ready: true, job_id: job_id, result: result, status: status })
  @socket.send(payload)
end
send_touch(job_id, timeout=nil) click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 102
# Sends a "touch" command for a job on the touch socket.
#
# @param job_id value sent under "job_id"
# @param timeout optional value sent under "timeout"; the key is left out
#   when the argument is nil or false
# @return whatever the socket's +send+ returns
def send_touch(job_id, timeout=nil)
  payload = { command: 'touch', job_id: job_id }
  payload = payload.merge(timeout: timeout) if timeout
  @touch_socket.send(JSON.dump(payload))
end