module DRbQueue

Constants

ConfiguredAfterStarted
VERSION

Attributes

pid[R]
server[R]
started[R]
started?[R]

Public Instance Methods

configure() { |configuration| ... } click to toggle source
# File lib/drb_queue.rb, line 49
def configure
  raise ConfiguredAfterStarted, "You must configure #{self.name} BEFORE starting the server" if started?

  synchronize { yield configuration }
end
connect_client!() click to toggle source
# File lib/drb_queue.rb, line 77
def connect_client!
  synchronize do
    tries = 0

    begin
      @server = DRbObject.new_with_uri(server_uri)
      @server.ping
    rescue DRb::DRbConnError => e
      raise Server::UnableToStart.new("Couldn't start up the queue server", e) if tries > 4
      sleep 0.1 * (2 ** tries)
      tries += 1
      retry
    end
  end
end
enqueue(worker, *args) click to toggle source
# File lib/drb_queue.rb, line 22
def enqueue(worker, *args)
  raise Server::NotStarted, "You must start the server first" unless started?
  raise ArgumentError, "#{worker} is not a module" unless worker.is_a?(Module)
  raise ArgumentError, "#{worker} does not respond to perform" unless worker.respond_to?(:perform)

  server.enqueue(worker, *args)
end
shutdown!(immediately = false) click to toggle source
# File lib/drb_queue.rb, line 55
def shutdown!(immediately = false)
  return unless started?

  synchronize do
    return unless started?

    Process.kill(immediately ? 'KILL' : 'TERM', pid)

    begin
      ::Timeout.timeout(20) { Process.wait }
    rescue Timeout::Error
      Process.kill('KILL', pid)
      Process.wait
      logger.error("#{self}: forced shutdown")
    ensure
      cleanup_socket
      @started = false
      @pid = nil
    end
  end
end
start!() click to toggle source
# File lib/drb_queue.rb, line 30
def start!
  raise Server::AlreadyStarted, "The server is already started" if started?

  synchronize do
    return if started?

    @pid = fork_server
    at_exit { shutdown! }
    @started = true

    begin
      connect_client!
    rescue => e
      shutdown!
      raise
    end
  end
end

Private Instance Methods

cleanup_socket() click to toggle source
# File lib/drb_queue.rb, line 137
def cleanup_socket
  FileUtils.rm(socket_location) if File.exist?(socket_location)
end
configuration() click to toggle source
# File lib/drb_queue.rb, line 124
def configuration
  @configuration ||= Configuration.new
end
execute_after_fork_callbacks() click to toggle source
# File lib/drb_queue.rb, line 120
def execute_after_fork_callbacks
  after_fork_callbacks.each(&:call)
end
execute_before_fork_callbacks() click to toggle source
# File lib/drb_queue.rb, line 116
def execute_before_fork_callbacks
  before_fork_callbacks.each(&:call)
end
fork_server() click to toggle source
# File lib/drb_queue.rb, line 96
def fork_server
  cleanup_socket

  execute_before_fork_callbacks

  fork do
    execute_after_fork_callbacks

    server = Server.new(configuration)
    DRb.start_service(server_uri, server)

    shutting_down = false
    trap('TERM') { shutting_down = true }
    sleep 0.1 until shutting_down

    server.shutdown!
    DRb.stop_service
  end
end
synchronization_monitor() click to toggle source
# File lib/drb_queue.rb, line 133
def synchronization_monitor
  @synchronization_monitor ||= Monitor.new
end
synchronize(&block) click to toggle source
# File lib/drb_queue.rb, line 129
def synchronize(&block)
  synchronization_monitor.synchronize(&block)
end