class JobDispatch::Broker

The broker is the central communications service of JobDispatch. Clients and workers both connect to a ZeroMQ ROUTER socket: each uses a REQ socket and sends a request. The broker sends a reply either immediately or later, when appropriate (e.g. when there is a job to do for a worker, or when a job has completed for a client waiting on job notification).

A very simple representation of a worker and a message received from, or to be sent to, that worker. This represents job messages from the perspective of the broker's ROUTER socket, i.e. messages are wrapped with the worker ID so that they can be routed to the right peer.

The parameters object is expected to be a Hash with indifferent access.

Constants

IdleWorker
POLL_TIME
STOP_SIGNALS
WORKER_IDLE_TIME

Attributes

job_subscribers[R]
jobs_in_progress[R]
jobs_in_progress_workers[R]
pub_socket[R]
queues[R]
queues_ready[R]
reply_exceptions[RW]
socket[R]

Any object that responds to `dequeue_job_for_queue` (called with the queue name as a String), which should return a job, or nil if there are no jobs for that queue. The returned job should be a JSONable object that will be sent to the worker. This should include `target`, `action` and `parameters` keys.

status[R]
verbose[RW]
worker_names[R]
workers_waiting_for_jobs[R]
workers_waiting_for_reply[R]

Public Class Methods

new(worker_bind_address, wakeup_bind_address, publish_bind_address=nil) click to toggle source
# File lib/job_dispatch/broker.rb, line 37
def initialize(worker_bind_address, wakeup_bind_address, publish_bind_address=nil)
  # Only the addresses are stored here; the sockets themselves are created by #run.
  @worker_bind_address, @wakeup_bind_address, @publish_bind_address =
    worker_bind_address, wakeup_bind_address, publish_bind_address

  # REQ-REP bookkeeping: identities of peers with an outstanding request that still need a reply.
  # An Array, so an identity may appear more than once; #send_command deletes every occurrence.
  @workers_waiting_for_reply = []

  # Job bookkeeping.
  @workers_waiting_for_jobs = {}                                            # worker identity => IdleWorker
  @queues = Hash.new { |by_name, name| by_name[name] = Set.new }            # queue name => Set of idle worker identities
  @jobs_in_progress = {}                                                    # job id (String) => job object (record or InternalJob)
  @jobs_in_progress_workers = {}                                            # job id (String) => worker identity
  @worker_names = {}                                                        # worker identity => name the worker claimed
  @job_subscribers = {}                                                     # job id (String) => identities awaiting completion notification
  @queues_ready = {}                                                        # queue name => false once the queue was seen empty

  @status = "OK"
  # true: errors while processing a command are sent back as an error reply; false: they are re-raised.
  @reply_exceptions = true

  # Reading the key through the default block creates the :default queue up front.
  queues[:default]
end

Public Instance Methods

add_available_worker(command, idle_count=0) click to toggle source

Add a worker to the list of workers available for jobs.

# File lib/job_dispatch/broker.rb, line 293
def add_available_worker(command, idle_count=0)
  worker_id = command.worker_id
  queue = command.queue
  name = command.worker_name # present only on a worker's initial request
  JobDispatch.logger.info("JobDispatch::Broker Worker '#{worker_id.to_json}' available for work on queue '#{queue}'")

  # A worker that closed its connection and reconnected (e.g. it started long before the broker did)
  # may still be registered under an old identity; drop every identity known by this name first.
  remove_worker_named(name) if name

  workers_waiting_for_jobs[worker_id] = IdleWorker.new(worker_id, Time.now, queue, name, idle_count)
  queues[queue] << worker_id
  # A new idle worker is a reason to look at the queue again.
  queues_ready[queue] = true
  worker_names[worker_id] = name if name
end
create_job(command) click to toggle source
# File lib/job_dispatch/broker.rb, line 511
def create_job(command)
  job_attrs = command.parameters[:job]
  raise MissingParameterError, "Missing 'job' from command" unless job_attrs.present?

  # Jobs submitted without a queue go to :default (the attribute hash is updated in place).
  job_attrs[:queue] ||= :default
  job = job_source.create!(job_attrs)
  # Wake the queue so dispatch_jobs_to_workers looks at it again.
  queues_ready[job_attrs[:queue].to_sym] = true
  {status: 'success', job_id: job.id.to_s}
rescue StandardError => e
  JobDispatch.logger.error "JobDispatch::Broker#create_job error: #{e}"
  JobDispatch.logger.error e.backtrace.join("\n")
  {status: 'error', message: e.to_s}
end
dispatch_jobs_to_workers() click to toggle source
# File lib/job_dispatch/broker.rb, line 335
def dispatch_jobs_to_workers
  queues.each_pair do |queue, worker_ids|
    # Only consult the job store when a worker could take the job and the queue was not last seen empty.
    next unless worker_ids.count > 0 && queues_ready[queue]

    worker_id = worker_ids.first
    job =
      begin
        job_source.dequeue_job_for_queue(queue.to_s)
      rescue StandardError => e
        # For example, the database is unreachable right now: log it and behave as if no job were available.
        JobDispatch.logger.error "JobDispatch::Broker#dispatch_jobs_to_workers: #{e}"
        nil
      end

    unless job
      # Remember the queue is drained so it is not polled again until something marks it ready.
      queues_ready[queue] = false
      next
    end

    JobDispatch.logger.info("JobDispatch::Broker dispatching job #{job.id} to worker #{worker_id.to_json}")
    send_job_to_worker(job, worker_id)

    job.expire_execution_at = Time.now + (job.timeout || Job::DEFAULT_EXECUTION_TIMEOUT)
    job.status = JobDispatch::Job::IN_PROGRESS
    job.save
    publish_job_status(job)
  end
end
expire_timed_out_jobs() click to toggle source
# File lib/job_dispatch/broker.rb, line 368
def expire_timed_out_jobs
  # Collect the ids first, because the in-progress hash is modified below.
  timed_out_ids = @jobs_in_progress.select { |_job_id, job| job.timed_out? }.keys

  timed_out_ids.each do |job_id|
    job = jobs_in_progress.delete(job_id)
    @jobs_in_progress_workers.delete(job_id)
    # Internal jobs (idle/quit commands) have nothing to mark failed or publish.
    next if job.nil? || job.is_a?(InternalJob)

    JobDispatch.logger.info("JobDispatch::Broker expiring job #{job_id} has timed out.")
    job.failed!("job timed out")
    publish_job_status(job)
  end
end
fetch_job(command) click to toggle source
# File lib/job_dispatch/broker.rb, line 543
def fetch_job(command)
  job_id = command.parameters[:job_id]
  raise "Missing parameter 'job_id'" unless job_id

  job = job_source.find(job_id)
  raise "Job not found" unless job

  {status: 'success', job: json_for_job(job)}
rescue StandardError => e
  # Any failure (missing id, lookup error, unknown job) becomes an error reply instead of propagating.
  JobDispatch.logger.error e.to_s
  JobDispatch.logger.error e.backtrace.join("\n")
  {status: 'error', message: e.to_s}
end
handle_completed_job(command) click to toggle source
# File lib/job_dispatch/broker.rb, line 394
# Process a worker's "completed" report: drop the job from the in-progress maps, then record the outcome
# on the stored job and publish its new status.
#
# @param command [Broker::Command] parameters[:job_id] identifies the job, parameters[:result] is the
#   worker's result, and command.success? selects success or failure.
# @return [void]
def handle_completed_job(command)
  job_id = command.parameters[:job_id]
  return unless job_id

  in_progress = jobs_in_progress.delete(job_id)
  jobs_in_progress_workers.delete(job_id)
  # Internal jobs (idle/quit) are not stored, so there is nothing to reload, update or publish.
  return if in_progress.is_a?(InternalJob)

  # Reload the record so it is up to date (in Mongo, reading before the update also shortens lock time).
  # The job may no longer be in jobs_in_progress (for example, it was already expired), and the stored
  # record is still updated in that case.
  job = begin
    job_source.find(job_id)
  rescue StandardError => e
    JobDispatch.logger.error("JobDispatch::Broker Job #{job_id} completed, but failed to reload from database: #{e}")
    nil
  end
  return unless job

  JobDispatch.logger.info(
      "JobDispatch::Broker completed job #{job_id} " \
      "from worker #{command.worker_id.to_json} " \
      "status = #{command.parameters[:status]}")

  result = command.parameters[:result]
  if command.success?
    job.succeeded!(result)
  else
    job.failed!(result)
  end
  publish_job_status(job)
end
json_for_job(job) click to toggle source
# File lib/job_dispatch/broker.rb, line 440
def json_for_job(job)
  return nil unless job

  # Prefer the job's dedicated queue-item representation when it offers one.
  raw = job.respond_to?(:as_job_queue_item) ? job.as_job_queue_item : job.as_json
  serialised = raw.with_indifferent_access
  # Normalise the id to a String (nil becomes "").
  serialised[:id] = serialised[:id].to_s
  serialised
end
last_job(command) click to toggle source
# File lib/job_dispatch/broker.rb, line 527
def last_job(command)
  queue = command.parameters[:queue] || 'default'
  newest = job_source.where(:queue => queue).last
  return {status: 'error', message: 'no last job'} unless newest

  {status: 'success', job: json_for_job(newest)}
rescue StandardError => e
  JobDispatch.logger.error e.to_s
  JobDispatch.logger.error e.backtrace.join("\n")
  {status: 'error', message: e.to_s}
end
process_command(command) click to toggle source
# File lib/job_dispatch/broker.rb, line 167
# Handle one request from a worker or client and build the immediate reply, if any.
#
# @param command [Broker::Command] the request read from the ROUTER socket.
# @return [Broker::Command, nil] the reply to send now, or nil when the reply is deferred (the requester
#   stays blocked in REQ-REP until a job, or a job notification, is sent to it later).
# @raise re-raises processing errors when reply_exceptions is false (used by tests).
def process_command(command)
  # prepare for immediate reply
  reply = Broker::Command.new(command.worker_id)

  begin
    case command.command
      when "ready"
        # add to list of workers who are ready for work
        add_available_worker(command, 0)

        # don't reply, leaves worker blocked waiting for a job to do.
        reply = nil

      when "goodbye"
        reply.parameters = remove_available_worker(command)

      when "completed"
        #  process completed job.
        handle_completed_job(command)

        if command.worker_ready?
          # a completed job also means the worker is available for more work.
          add_available_worker(command, 1)
          reply = nil
        else
          reply.parameters = {:status => 'thanks'}
        end

      when "notify"
        # synchronous notification of job status.

        job_id = command.parameters[:job_id]
        raise MissingParameterError, "Missing 'job_id' parameter" unless job_id

        if jobs_in_progress[job_id]
          # Defer the reply: publish_job_status answers every subscriber when the job finishes.
          workers_waiting_for_reply << command.worker_id
          job_subscribers[job_id.to_s] ||= []
          job_subscribers[job_id.to_s] << command.worker_id
          reply = nil
        else
          job = job_source.find(job_id) # load job from storage and return to requester.
          reply.parameters = job_status_parameters(job)
        end

      when "touch"
        # perhaps this could also be processed of a PUB/SUB socket so that it doesn't require a synchronous
        # response to the worker...
        reply.parameters = touch_job(command)

      when "status"
        reply.parameters = status_response

      when "enqueue"
        reply.parameters = create_job(command)

      when "last"
        reply.parameters = last_job(command)

      when "fetch"
        reply.parameters = fetch_job(command)

      when "quit"
        process_quit
        reply.parameters = {:status => 'bye'}
        @running = false

      else
        # unknown command, reply with error immediately to fulfil REQ-REP state machine contract.
        reply.parameters = {:status => 'unknown command!'}
    end

  rescue StandardError => e
    # Let RSpec expectation failures through so specs fail. The constant is referenced only when RSpec is
    # loaded. Naming it directly in a `rescue` clause raised NameError whenever any command failed outside
    # of tests, which replaced the real error and left the requester without a reply.
    raise if defined?(RSpec::Expectations::ExpectationNotMetError) &&
             e.is_a?(RSpec::Expectations::ExpectationNotMetError)

    if reply_exceptions
      # all others reply over socket.
      JobDispatch.logger.error("JobDispatch::Broker #{e}")
      JobDispatch.logger.error e.backtrace.join("\n")
      reply.parameters = {:status => 'error', :message => e.to_s}
    else
      # used during testing to raise errors so that Rspec can catch them as a test failure.
      raise
    end
  end

  reply
end
process_messages(poller) click to toggle source
# File lib/job_dispatch/broker.rb, line 128
# Wait up to the configured poll time for socket activity, then handle at most one incoming command.
#
# @param poller [ZMQ::Poller] has the broker's ROUTER socket (and the wake-up socket, if any) registered.
def process_messages(poller)
  # TODO (carried over): choose the poll timeout so that the next scheduled event happens as close as
  # possible to its due time. POLL_TIME could then be arbitrarily large, since any communication with the
  # broker wakes it immediately.
  configured_poll_time = JobDispatch.config.broker_options.try(:[], :poll_time)
  poller.poll(configured_poll_time || POLL_TIME)

  # The wake-up socket exists only to interrupt poll(); read and discard its message.
  if @wake_socket && poller.readables.include?(@wake_socket)
    @wake_socket.recv
  end

  return unless poller.readables.include?(socket.socket)

  incoming = read_command
  JobDispatch.logger.debug("JobDispatch::Broker received command: #{incoming.command}(#{incoming.parameters.inspect})")
  response = process_command(incoming)
  send_command(response) if response
end
process_quit() click to toggle source
# File lib/job_dispatch/broker.rb, line 429
# Send a 'quit' internal job to every worker currently idle, so that each one shuts down.
# Busy workers are not contacted here.
def process_quit
  JobDispatch.logger.info("JobDispatch::Broker Sending quit message to idle workers")

  # Iterate over a snapshot. send_job_to_worker removes each worker from workers_waiting_for_jobs, and a
  # snapshot cannot spin forever the way "until empty? / first" would if an entry were ever left behind.
  workers_waiting_for_jobs.to_a.each do |worker_id, worker|
    send_job_to_worker(InternalJob.new('quit', worker.queue), worker_id)
  end
  nil
end
queues_with_available_workers() click to toggle source
# File lib/job_dispatch/broker.rb, line 387
# @return [Array] names of queues that currently have at least one idle worker, in queue order.
def queues_with_available_workers
  @queues.reject { |_name, worker_ids| worker_ids.nil? || worker_ids.empty? }.keys
end
read_command() click to toggle source

Read a command from a worker. Each worker follows a strict 1:1 REQ-REP exchange, so the broker records that the worker is now waiting for a reply.

# File lib/job_dispatch/broker.rb, line 150
def read_command
  socket.read_command.tap do |incoming|
    # The sender's REQ socket now waits for exactly one reply; remember that it is owed one.
    @workers_waiting_for_reply << incoming.worker_id
  end
end
remove_available_worker(command) click to toggle source

Remove a worker from the available list. The worker is shutting down, or is indicating that it will no longer be available for work.

# File lib/job_dispatch/broker.rb, line 315
def remove_available_worker(command)
  departing_name = command.worker_name
  JobDispatch.logger.info("JobDispatch::Broker Removing Worker '#{command.worker_id.to_json}' available for work on queue '#{command.queue}'")

  # "goodbye" arrives on a different connection from the one waiting for work, so the socket identity
  # here does not match. Remove by the worker's name instead.
  remove_worker_named(departing_name)
  {status: "see ya later"}
end
remove_worker_named(worker_name) click to toggle source
# File lib/job_dispatch/broker.rb, line 325
# Forget every worker identity registered under worker_name: its pending reply, its idle entry, its
# queue membership and its name mapping.
#
# @return [Array] the identities that were removed.
def remove_worker_named(worker_name)
  stale_ids = worker_names.each_with_object([]) do |(id, name), ids|
    ids << id if name == worker_name
  end

  stale_ids.each do |stale_id|
    # Its socket will be closing, so it is no longer owed a reply.
    workers_waiting_for_reply.delete(stale_id)
    idle = workers_waiting_for_jobs.delete(stale_id)
    queues[idle.queue].delete(stale_id) if idle
    worker_names.delete(stale_id)
  end
end
run() click to toggle source
# File lib/job_dispatch/broker.rb, line 67
# Main loop. Binds the broker's sockets, then repeatedly: processes at most one message, dispatches
# queued jobs, expires timed-out jobs and sends idle commands. It stops when @running becomes false
# (via #stop, a "quit" command, or a stop signal). Every socket opened here is released on exit.
def run
  begin
    puts "JobDispatch::Broker running in process #{Process.pid}"
    JobDispatch.logger.info("JobDispatch::Broker running in process #{Process.pid}")
    @running = true
    @running_thread = Thread.current
    poller = ZMQ::Poller.new

    @socket = JobDispatch::Broker::Socket.new(@worker_bind_address)
    @socket.connect
    poller.register(@socket.poll_item)

    if @publish_bind_address
      @pub_socket = JobDispatch.context.socket(ZMQ::PUB)
      @pub_socket.bind(@publish_bind_address)
    end

    if @wakeup_bind_address
      JobDispatch.logger.info("JobDispatch::Broker signaller SUB socket bound to #{@wakeup_bind_address}")
      @wake_socket = JobDispatch.context.socket(ZMQ::SUB)
      @wake_socket.subscribe('')
      @wake_socket.bind(@wakeup_bind_address)
      poller.register(@wake_socket)
    end

    while running?
      begin
        process_messages(poller)
        dispatch_jobs_to_workers
        expire_timed_out_jobs
        send_idle_commands
      rescue SignalException => e
        signal_name = Signal.signame(e.signo)
        if STOP_SIGNALS.include?(signal_name)
          JobDispatch.logger.info("JobDispatch::Broker shutting down, due to #{signal_name} signal")
          puts "JobDispatch::Broker shutting down, due to #{signal_name} signal"
          @running = false
          @status = "SHUTDOWN"
          process_quit
          sleep 1 # let ZMQ send the messages before we close the socket.
        end
      rescue StandardError => e
        # Keep the broker alive, but log enough to diagnose the failure.
        JobDispatch.logger.error "Unexpected exception: #{e}"
        JobDispatch.logger.error e.backtrace.join("\n") if e.backtrace
      end
    end
  ensure
    @socket.disconnect if @socket
    @socket = nil

    # Close the PUB and wake-up sockets too. Left open, they leak, keep their addresses bound (so a
    # later #run cannot bind them again), and can hold up ZeroMQ context termination.
    [@wake_socket, @pub_socket].compact.each do |aux_socket|
      begin
        aux_socket.close
      rescue StandardError => e
        JobDispatch.logger.error("JobDispatch::Broker failed to close socket: #{e}")
      end
    end
    @wake_socket = nil
    @pub_socket = nil
  end
end
running?() click to toggle source
# File lib/job_dispatch/broker.rb, line 59
# @return [Boolean, nil] truthy while #run's loop should continue; nil before #run has ever set the flag.
def running?
  defined?(@running) ? @running : nil
end
send_command(command) click to toggle source

Send a command out of the socket. This also maintains the list of workers awaiting replies, so that the REQ-REP contract is kept.

# File lib/job_dispatch/broker.rb, line 159
def send_command(command)
  target = command.worker_id
  # REQ-REP contract: only a peer with an outstanding request may be sent a message.
  raise "Worker not waiting for reply" unless workers_waiting_for_reply.include?(target)

  workers_waiting_for_reply.delete(target)
  JobDispatch.logger.debug("JobDispatch::Broker sending command: #{command.inspect}")
  socket.send_command command
end
send_idle_commands(idle_time=nil) click to toggle source
# File lib/job_dispatch/broker.rb, line 256
# Send an 'idle' internal job to each waiting worker that has been idle longer than WORKER_IDLE_TIME
# (measured back from idle_time, default now), or that has not been sent an idle command yet (idle_count 0).
#
# @return [Hash] the workers that were sent an idle command, keyed by worker identity.
def send_idle_commands(idle_time=nil)
  cutoff = (idle_time || Time.now) - WORKER_IDLE_TIME
  due = @workers_waiting_for_jobs.select do |_worker_id, worker|
    worker.idle_since < cutoff || worker.idle_count == 0
  end

  due.each do |worker_id, worker|
    send_job_to_worker(InternalJob.new('idle', worker.queue), worker_id)
    worker.idle_count += 1
  end
end
send_job_to_worker(job, worker_id) click to toggle source
# File lib/job_dispatch/broker.rb, line 267
# Hand a job to an idle worker. The worker is removed from the idle lists, the job is recorded as in
# progress, and the serialised job is sent as the reply to the worker's outstanding request.
#
# @param job [Object] a job record or an InternalJob (e.g. 'idle' / 'quit').
# @param worker_id [Object] socket identity of a worker currently in workers_waiting_for_jobs.
# @raise [RuntimeError] if the worker is not waiting for a job.
def send_job_to_worker(job, worker_id)
  # remove from queue and idle workers lists.
  idle_worker = workers_waiting_for_jobs.delete(worker_id)
  raise "JobDispatch::Broker worker #{worker_id.to_json} is not waiting for a job" unless idle_worker
  queues[idle_worker.queue].delete(worker_id)

  # serialise job for json message
  hash = json_for_job(job)

  # Use the job record id, or a fresh UUID for jobs that have none. The id is always non-nil here, so
  # it is computed exactly once (the old second `||= SecureRandom.uuid` could never fire).
  job_id = job.id ? job.id.to_s : SecureRandom.uuid
  hash[:job_id] = job_id
  # Internal jobs are expected to supply their own :command in their JSON; all others are plain jobs.
  hash[:command] = 'job' unless job.is_a?(InternalJob)

  # add to working lists
  jobs_in_progress[job_id] = job
  jobs_in_progress_workers[job_id] = worker_id

  # send the command.
  command = Broker::Command.new(worker_id, hash)
  JobDispatch.logger.info("JobDispatch::Broker Sending command '#{hash[:command]}' to worker: #{worker_id.to_json}")
  send_command(command)
end
status_response() click to toggle source
# File lib/job_dispatch/broker.rb, line 453
# Build the reply to a "status" command: the broker status, the number of open TCP connections of this
# process, and, per queue, the busy and idle workers keyed by worker_id.to_hex.
#
# @return [Hash] with :status, :num_tcp_connections (Integer, or nil when lsof is unavailable) and :queues.
def status_response
  # Count lsof output *lines* that mention TCP, i.e. one per TCP socket. The previous code split on
  # whitespace and counted matching words. If lsof is not installed, report nil rather than failing the
  # whole status request.
  num_tcp_connections = begin
    `lsof -np #{Process.pid}`.each_line.count { |line| line.include?('TCP') }
  rescue SystemCallError => e
    JobDispatch.logger.error("JobDispatch::Broker#status_response could not run lsof: #{e}")
    nil
  end

  response = {
      :status => status,
      :num_tcp_connections => num_tcp_connections,
      :queues => {}
  }

  queues.each_pair do |queue, _|
    response[:queues][queue.to_sym] = {}
  end

  jobs_in_progress.each_with_object(response[:queues]) do |(job_id, job), _queues|
    queue = job.queue.to_sym
    _queues[queue] ||= {}
    worker_id = jobs_in_progress_workers[job_id]
    _queues[queue][worker_id.to_hex] = {
        :status => :processing,
        :name => worker_names[worker_id],
        :job_id => job_id,
        :queue => job.queue,
        :job => json_for_job(job),
    }
  end

  workers_waiting_for_jobs.each_with_object(response[:queues]) do |(worker_id, worker), _queues|
    queue = worker.queue.to_sym
    _queues[queue] ||= {}
    _queues[queue][worker_id.to_hex] = {
        :status => :idle,
        :name => worker_names[worker_id],
        :queue => worker.queue,
    }
  end

  response
end
stop() click to toggle source
# File lib/job_dispatch/broker.rb, line 120
# Ask the broker loop to stop. When called from a thread other than the one running #run, that thread
# is also interrupted with a TERM SignalException, which #run's `rescue SignalException` handles
# (presumably TERM is listed in STOP_SIGNALS; confirm).
def stop
  return unless running?

  @running = false
  return if Thread.current == @running_thread

  @running_thread.raise SignalException.new("TERM")
end
touch_job(command) click to toggle source

Reset the timeout on the job. Called by a long-running process to confirm to the dispatcher that the worker is still actively working on the job and has not died.

@return [Hash] result to be sent to client.

# File lib/job_dispatch/broker.rb, line 496
def touch_job(command)
  job_id = command.parameters[:job_id]
  job = @jobs_in_progress[job_id]

  unless job
    JobDispatch.logger.info("JobDispatch::Broker#touch job #{job_id} not in progress.")
    return {status: "error", message: "the specified job does not appear to be in progress"}
  end

  # Timeout precedence: value sent with the touch, then the job's own timeout, then the default.
  extension = command.parameters[:timeout] || job.timeout || Job::DEFAULT_EXECUTION_TIMEOUT
  job.expire_execution_at = Time.now + extension
  JobDispatch.logger.info("JobDispatch::Broker#touch timeout on job #{job_id} to #{job.expire_execution_at}")
  job.save
  {status: "success"}
end
verbose?() click to toggle source
# File lib/job_dispatch/broker.rb, line 63
# Predicate form of the `verbose` accessor. It returns the stored value as-is (not coerced to a boolean).
def verbose?
  self.verbose
end

Private Instance Methods

job_source() click to toggle source
# File lib/job_dispatch/broker.rb, line 558
# The job model class configured on JobDispatch. The broker calls create!, find, where and
# dequeue_job_for_queue on it.
def job_source
  configuration = JobDispatch.config
  configuration.job_class
end
job_status_parameters(job) click to toggle source
# File lib/job_dispatch/broker.rb, line 582
# Build the status payload sent to subscribers and notify requests.
#
# @return [Hash] :status (human-readable, 'unknown' for unmapped codes), :job_id (String) and :job (JSON hash).
def job_status_parameters(job)
  status_label = Job::STATUS_STRINGS[job.status] || 'unknown'
  {status: status_label, job_id: job.id.to_s, job: json_for_job(job)}
end
publish_job_status(job) click to toggle source
# File lib/job_dispatch/broker.rb, line 562
# Announce a job's current status. It is published on the PUB socket (when configured), and every peer
# subscribed to this job via "notify" gets a reply; those subscriptions are consumed.
#
# @param job [Object] a job record responding to #id and #status.
# @return [void]
def publish_job_status(job)
  parameters = job_status_parameters(job)
  job_id = job.id.to_s

  if pub_socket
    # send as plain text so that ZMQ SUB filtering can be done on the job_id.
    # sent as two lines: job_id then LF then status.
    pub_socket.send("#{job_id}\n#{parameters[:status]}")
  end

  subscriber_ids = job_subscribers.delete(job_id)
  return unless subscriber_ids

  subscriber_ids.each do |socket_id|
    command = Broker::Command.new(socket_id, parameters)
    # Log the *job's* status; bare `status` here was the broker's own status ("OK"/"SHUTDOWN").
    JobDispatch.logger.info("JobDispatch::Broker Sending job notification for job id '#{job_id}' status = #{parameters[:status]} to socket: #{socket_id.to_json}")
    send_command(command)
  end
end