class JobDispatch::Worker
This class is the main worker loop. Run it as a whole process or just as a thread in a multi-threaded worker process.
Constants
- IDLE_COUNT
- IDLE_TIME
Attributes
item_class[R]
queue[R]
socket[R]
Public Class Methods
new(connect_address, options={})
click to toggle source
# File lib/job_dispatch/worker.rb, line 21 def initialize(connect_address, options={}) options ||= {} @connect_address = connect_address @queue = options[:queue] || 'default' @running = false @item_class = options[:item_class] || Worker::Item end
touch(timeout=nil)
click to toggle source
# File lib/job_dispatch/worker.rb, line 100 def self.touch(timeout=nil) sock = Thread.current["JobDispatch::Worker.socket"] job_id = Thread.current["JobDispatch::Worker.job_id"] if sock && job_id sock.send_touch(job_id, timeout) JobDispatch.logger.debug { "touching job #{job_id}"} end end
Public Instance Methods
ask_for_work()
click to toggle source
# File lib/job_dispatch/worker.rb, line 81 def ask_for_work socket.ask_for_work(queue) end
connect()
click to toggle source
# File lib/job_dispatch/worker.rb, line 29 def connect @socket ||= Worker::Socket.new(@connect_address, item_class) Thread.current["JobDispatch::Worker.socket"] = @socket end
disconnect()
click to toggle source
# File lib/job_dispatch/worker.rb, line 34 def disconnect if @socket @socket.close @socket = nil Thread.current["JobDispatch::Worker.socket"] = nil end end
run()
click to toggle source
# File lib/job_dispatch/worker.rb, line 42 def run @running = true @running_thread = Thread.current while running? # puts "connecting" connect # puts "asking for work" ask_for_work rescue StopError # if we are idle for too many times, the broker has restarted or gone away, and we will be stuck in receive # state, so we need to close the socket and make a new one to ask for work again. idle_count = 0 poller = ZMQ::Poller.new poller.register(socket.poll_item) while running? and idle_count < IDLE_COUNT begin poller.poll(IDLE_TIME) if poller.readables.include?(socket.socket) process idle_count = 0 else idle idle_count += 1 end rescue Interrupt, StopError JobDispatch.logger.info("Worker stopping.") stop disconnect # Tell the broker goodbye so that we are removed from the idle worker list and no more jobs will come here. connect send_goodbye sleep(0.1) # let the socket send the message before we disconnect... end end disconnect end end
running?()
click to toggle source
# File lib/job_dispatch/worker.rb, line 89 def running? @running end
send_goodbye()
click to toggle source
# File lib/job_dispatch/worker.rb, line 85 def send_goodbye socket.send_goodbye(queue) end
stop()
click to toggle source
# File lib/job_dispatch/worker.rb, line 93 def stop if running? @running_thread.raise StopError unless @running_thread == Thread.current @running = false end end
Private Instance Methods
idle()
click to toggle source
# File lib/job_dispatch/worker.rb, line 122 def idle # puts "waiting for job to do…" end
process()
click to toggle source
called when the socket is readable. do some work.
# File lib/job_dispatch/worker.rb, line 112 def process item = @socket.read_item if item item.execute @socket.send_response(item.job_id, item.status, item.result) else @socket.send_response("unknown", :error, "failed to decode command") end end