class JobDispatch::Worker::Socket
Attributes
item_class[R]
socket[R]
touch_socket[R]
Public Class Methods
new(connect_address, item_klass)
click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 14
# Build a worker socket pair connected to +connect_address+.
#
# Two sockets are created from JobDispatch.context and connected to the same
# address: a ZMQ::REQ socket (kept in @socket) and a ZMQ::DEALER socket
# (kept in @touch_socket).
#
# @param connect_address the address passed to both sockets' +connect+
# @param item_klass the class stored as +item_class+; read_item instantiates it
def initialize(connect_address, item_klass)
  @item_class = item_klass

  # Request socket: used by the ready/goodbye/completed messages and by read_item.
  @socket = JobDispatch.context.socket(ZMQ::REQ)
  @socket.connect(connect_address)

  # Dealer socket: used by send_touch and emptied by drain_touch_socket.
  @touch_socket = JobDispatch.context.socket(ZMQ::DEALER)
  @touch_socket.connect(connect_address)
end
Public Instance Methods
ask_for_work(queue)
click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 26
# Send a 'ready' command for +queue+ on the request socket, tagged with this
# worker's identity.
#
# @param queue the queue name placed in the message's +queue+ field
# @return whatever the socket's +send+ returns
def ask_for_work(queue)
  request = { command: 'ready', queue: queue, worker_name: identity }
  @socket.send(JSON.dump(request))
end
close()
click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 34
# Close both sockets and drop the references to them.
#
# Closing is best effort: a StandardError raised by either socket's +close+
# is swallowed so that the other socket is still closed and both instance
# variables end up nil. Calling this again afterwards is a no-op.
#
# @return [nil]
def close
  [@socket, @touch_socket].each do |sock|
    next unless sock

    begin
      sock.close
    rescue StandardError
      # Deliberately ignored: teardown must not fail because one socket
      # refused to close.
    end
  end

  @socket = nil
  @touch_socket = nil
end
drain_touch_socket()
click to toggle source
Drain any messages that may have been received on the touch socket.
# File lib/job_dispatch/worker/socket.rb, line 82
# Discard everything waiting on the touch socket.
#
# Keeps calling +recv_nonblock+ and throws the result away, stopping at the
# first nil (presumably meaning nothing further is queued -- confirm against
# the socket library).
def drain_touch_socket
  loop { break if @touch_socket.recv_nonblock.nil? }
end
identity()
click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 45
# Name identifying this worker, computed once and then cached in @identity.
#
# The format is "ruby:<hostname>:<pid>:<thread>", where <thread> is the
# object id, in hexadecimal, of the thread that made the first call.
#
# @return [String]
def identity
  return @identity if @identity

  thread_tag = Thread.current.object_id.to_s(16)
  @identity = "ruby:#{::Socket.gethostname}:#{Process.pid}:#{thread_tag}"
end
poll_item()
click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 22
# Poll item for the request socket, registered for ZMQ::POLLIN.
#
# Built on first use and cached in @poll_item, so repeated calls return the
# same object.
def poll_item
  @poll_item = ZMQ::Pollitem(@socket, ZMQ::POLLIN) unless @poll_item
  @poll_item
end
read_item()
click to toggle source
Read an incoming message. The calling thread blocks until a message is readable.
@return [JobDispatch::Item] the item to be processed (or nil if there isn't a valid job)
# File lib/job_dispatch/worker/socket.rb, line 57
# Read the next message from the request socket and turn it into an item.
#
# The touch socket is drained first, then the message is received and parsed
# as JSON. The "command" field selects the item that is built:
#
# * "job"  - item_class.new(target, method, *parameters)
# * "idle" - item_class.new("JobDispatch", "idle")
# * "quit" - prints a notice and exits the process with status 0
# * other  - item_class.new("JobDispatch", "unknown_command", params)
#
# The message's "job_id" is then assigned to the item.
#
# Any StandardError along the way (parse failure, item construction, job_id
# assignment) is logged and nil is returned, so a half-initialised item is
# never handed back to the caller.
#
# @return the item to be processed, or nil if the message could not be
#   turned into a valid item
def read_item
  drain_touch_socket
  params = JSON.parse(@socket.recv)

  item =
    case params["command"]
    when "job"
      item_class.new(params["target"], params["method"], *params["parameters"])
    when "idle"
      item_class.new("JobDispatch", "idle")
    when "quit"
      puts "It's quittin' time!"
      # Raises SystemExit, which the StandardError rescue below does not catch.
      Process.exit(0)
    else
      item_class.new("JobDispatch", "unknown_command", params)
    end

  item.job_id = params["job_id"]
  item
rescue StandardError => e
  JobDispatch.logger.error "Failed to read message from worker socket: #{e}"
  nil
end
send_goodbye(queue)
click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 30
# Send a 'goodbye' command on the request socket, tagged with this worker's
# identity.
#
# @param queue accepted but not used: the message carries no queue field
# @return whatever the socket's +send+ returns
def send_goodbye(queue)
  farewell = { command: 'goodbye', worker_name: identity }
  @socket.send(JSON.dump(farewell))
end
send_response(job_id, status, result)
click to toggle source
After execution, send the response.
# File lib/job_dispatch/worker/socket.rb, line 90
# Report the outcome of a job on the request socket.
#
# Logs the completion at info level, then sends a 'completed' command that
# carries the job id, status and result. The message also sets +ready+ to
# true.
#
# @param job_id identifier of the job that was executed
# @param status outcome placed in the message's +status+ field
# @param result value placed in the message's +result+ field
# @return whatever the socket's +send+ returns
def send_response(job_id, status, result)
  JobDispatch.logger.info "Worker #{Process.pid} completed job_id: #{job_id}: #{status}, result: #{result}"

  reply = {
    command: 'completed',
    ready: true,
    job_id: job_id,
    result: result,
    status: status
  }
  encoded = JSON.dump(reply)
  @socket.send(encoded)
end
send_touch(job_id, timeout=nil)
click to toggle source
# File lib/job_dispatch/worker/socket.rb, line 102
# Send a 'touch' command for +job_id+ on the touch socket.
#
# @param job_id identifier placed in the message's +job_id+ field
# @param timeout optional value; the +timeout+ field is included only when
#   this is truthy
# @return whatever the socket's +send+ returns
def send_touch(job_id, timeout=nil)
  message = { command: 'touch', job_id: job_id }
  message = message.merge(timeout: timeout) if timeout
  @touch_socket.send(JSON.dump(message))
end