class JobDispatch::Worker

This class is the main worker loop. Run it as a whole process or just as a thread in a multi-threaded worker process.

Constants

IDLE_COUNT
IDLE_TIME

Attributes

item_class[R]
queue[R]
socket[R]

Public Class Methods

new(connect_address, options={}) click to toggle source
# File lib/job_dispatch/worker.rb, line 21
def initialize(connect_address, options={})
  options ||= {}
  @connect_address = connect_address
  @queue = options[:queue] || 'default'
  @running = false
  @item_class = options[:item_class] || Worker::Item
end
touch(timeout=nil) click to toggle source
# File lib/job_dispatch/worker.rb, line 100
def self.touch(timeout=nil)
  sock = Thread.current["JobDispatch::Worker.socket"]
  job_id = Thread.current["JobDispatch::Worker.job_id"]
  if sock && job_id
    sock.send_touch(job_id, timeout)
    JobDispatch.logger.debug { "touching job #{job_id}"}
  end
end

Public Instance Methods

ask_for_work() click to toggle source
# File lib/job_dispatch/worker.rb, line 81
def ask_for_work
  socket.ask_for_work(queue)
end
connect() click to toggle source
# File lib/job_dispatch/worker.rb, line 29
def connect
  @socket ||= Worker::Socket.new(@connect_address, item_class)
  Thread.current["JobDispatch::Worker.socket"] = @socket
end
disconnect() click to toggle source
# File lib/job_dispatch/worker.rb, line 34
def disconnect
  if @socket
    @socket.close
    @socket = nil
    Thread.current["JobDispatch::Worker.socket"] = nil
  end
end
run() click to toggle source
# File lib/job_dispatch/worker.rb, line 42
def run
  @running = true
  @running_thread = Thread.current
  while running?
    # puts "connecting"
    connect
    # puts "asking for work"
    ask_for_work rescue StopError

    # if we are idle for too many times, the broker has restarted or gone away, and we will be stuck in receive
    # state, so we need to close the socket and make a new one to ask for work again.

    idle_count = 0
    poller = ZMQ::Poller.new
    poller.register(socket.poll_item)
    while running? and idle_count < IDLE_COUNT
      begin
        poller.poll(IDLE_TIME)
        if poller.readables.include?(socket.socket)
          process
          idle_count = 0
        else
          idle
          idle_count += 1
        end
      rescue Interrupt, StopError
        JobDispatch.logger.info("Worker stopping.")
        stop
        disconnect
        # Tell the broker goodbye so that we are removed from the idle worker list and no more jobs will come here.
        connect
        send_goodbye
        sleep(0.1) # let the socket send the message before we disconnect...
      end
    end
    disconnect
  end
end
running?() click to toggle source
# File lib/job_dispatch/worker.rb, line 89
def running?
  @running
end
send_goodbye() click to toggle source
# File lib/job_dispatch/worker.rb, line 85
def send_goodbye
  socket.send_goodbye(queue)
end
stop() click to toggle source
# File lib/job_dispatch/worker.rb, line 93
def stop
  if running?
    @running_thread.raise StopError unless @running_thread == Thread.current
    @running = false
  end
end

Private Instance Methods

idle() click to toggle source
# File lib/job_dispatch/worker.rb, line 122
def idle
  # puts "waiting for job to do…"
end
process() click to toggle source

called when the socket is readable. do some work.

# File lib/job_dispatch/worker.rb, line 112
def process
  item = @socket.read_item
  if item
    item.execute
    @socket.send_response(item.job_id, item.status, item.result)
  else
    @socket.send_response("unknown", :error, "failed to decode command")
  end
end