class DRbQueue::Server

Constants

AlreadyStarted
NotStarted

Attributes

error_handler[RW]
immediate[RW]
immediate?[RW]
logger[RW]
queue[RW]
running[RW]
running?[RW]
store[RW]

Public Class Methods

new(configuration) click to toggle source
# File lib/drb_queue/server.rb, line 34
def initialize(configuration)
  [:logger, :error_handler, :immediate].each do |p|
    __send__("#{p}=", configuration.__send__(p))
  end

  self.queue = Queue.new
  self.running = true

  self.store = configuration.construct_persistence_store

  start_workers(configuration.num_workers)

  if store
    store.each_persisted_work do |serialized_work|
      enqueue_work(Work.unserialize(serialized_work))
    end
  end
end

Public Instance Methods

enqueue(worker, *args) click to toggle source
# File lib/drb_queue/server.rb, line 53
def enqueue(worker, *args)
  uuid.generate.tap do |id|
    enqueue_work(Work.new(worker, args))
  end
end
ping() click to toggle source
# File lib/drb_queue/server.rb, line 63
def ping
  'pong'
end
shutdown!() click to toggle source
# File lib/drb_queue/server.rb, line 67
def shutdown!
  self.running = false

  if store
    begin
      while work = queue.pop(:dont_block)
        store.persist(work)
      end
    rescue ThreadError => e
    rescue => e
      error_handler.call(e)
    end
  elsif queue.size > 0
    logger.error("Queue is non-empty and we're shutting down...probably better to configure a persistence store\n")
  end

  workers.each(&:join)
end
uuid() click to toggle source
# File lib/drb_queue/server.rb, line 59
def uuid
  @uuid ||= UUID.new
end

Private Instance Methods

enqueue_work(work) click to toggle source
# File lib/drb_queue/server.rb, line 91
def enqueue_work(work)
  if immediate?
    work.perform
  else
    queue << work
  end
end
start_worker() click to toggle source
# File lib/drb_queue/server.rb, line 107
def start_worker
  thread = Thread.new do
    loop do
      begin
        break unless running?

        work = queue.pop(:non_blocking)
        work.perform
      rescue ThreadError => e
        sleep 0.05
      rescue => e
        error_handler.call(e)
        start_worker
        break
      end
    end

    workers.delete(thread)
  end

  workers << thread
end
start_workers(num) click to toggle source
# File lib/drb_queue/server.rb, line 99
def start_workers(num)
  num.times.map { start_worker }
end
workers() click to toggle source
# File lib/drb_queue/server.rb, line 103
def workers
  @workers ||= []
end