module DRbQueue
Constants
- ConfiguredAfterStarted
- VERSION
Attributes
pid[R]
server[R]
started[R]
started?[R]
Public Instance Methods
configure() { |configuration| ... }
click to toggle source
# File lib/drb_queue.rb, line 49
#
# Yields the configuration object to the given block while holding the
# module-wide lock. Must be called before the server is started.
#
# @yieldparam configuration the object returned by +configuration+
# @raise [ConfiguredAfterStarted] if +started?+ is already true
def configure
  if started?
    raise ConfiguredAfterStarted, "You must configure #{self.name} BEFORE starting the server"
  end

  synchronize do
    yield configuration
  end
end
connect_client!()
click to toggle source
# File lib/drb_queue.rb, line 77
#
# Creates a DRb proxy for +server_uri+, stores it in @server and pings it,
# all while holding the module-wide lock.
#
# A DRb::DRbConnError triggers a retry after an exponentially growing pause
# (0.1s, 0.2s, 0.4s, 0.8s, 1.6s). Once the sixth attempt has failed the
# error is wrapped and re-raised.
#
# @raise [Server::UnableToStart] when every attempt ends in DRb::DRbConnError
def connect_client!
  synchronize do
    attempt = 0

    begin
      @server = DRbObject.new_with_uri(server_uri)
      @server.ping
    rescue DRb::DRbConnError => connection_error
      if attempt > 4
        raise Server::UnableToStart.new("Couldn't start up the queue server", connection_error)
      end

      # Back off: the pause doubles with every failed attempt.
      sleep(0.1 * (2**attempt))
      attempt += 1
      retry
    end
  end
end
enqueue(worker, *args)
click to toggle source
# File lib/drb_queue.rb, line 22
#
# Validates +worker+ and forwards it, together with +args+, to the
# connected server's +enqueue+.
#
# @param worker [Module] a module (or class) that responds to +perform+
# @param args extra arguments passed through to the server unchanged
# @raise [Server::NotStarted] if the server has not been started
# @raise [ArgumentError] if +worker+ is not a Module or lacks +perform+
# @return whatever the server's +enqueue+ returns
def enqueue(worker, *args)
  raise Server::NotStarted, "You must start the server first" unless started?

  unless worker.is_a?(Module)
    raise ArgumentError, "#{worker} is not a module"
  end

  unless worker.respond_to?(:perform)
    raise ArgumentError, "#{worker} does not respond to perform"
  end

  server.enqueue(worker, *args)
end
shutdown!(immediately = false)
click to toggle source
# File lib/drb_queue.rb, line 55
#
# Stops the forked queue server and resets the local bookkeeping.
#
# The child is sent TERM (or KILL when +immediately+ is true) and then
# reaped. If it has not exited within 20 seconds it is sent KILL, reaped,
# and the forced shutdown is logged. Whatever happens, the socket file is
# cleaned up and @started / @pid are reset.
#
# Does nothing when the server is not started.
#
# @param immediately [Boolean] send KILL straight away instead of TERM
def shutdown!(immediately = false)
  return unless started?

  synchronize do
    # Re-check under the lock: another thread may have shut down meanwhile.
    return unless started?

    begin
      # The initial signal lives inside the begin block so that the ensure
      # clause still resets our state if the child has already disappeared.
      Process.kill(immediately ? 'KILL' : 'TERM', pid)

      # Wait for *our* child only; a bare Process.wait would reap whichever
      # child of the host process happens to exit first.
      ::Timeout.timeout(20) { Process.wait(pid) }
    rescue Timeout::Error
      Process.kill('KILL', pid)
      Process.wait(pid)
      logger.error("#{self}: forced shutdown")
    rescue Errno::ESRCH, Errno::ECHILD
      # The child is already gone (or already reaped): nothing left to stop.
      nil
    ensure
      cleanup_socket
      @started = false
      @pid = nil
    end
  end
end
start!()
click to toggle source
# File lib/drb_queue.rb, line 30
#
# Forks the queue server and connects the client to it.
#
# Under the module-wide lock this records the child's pid, registers an
# at_exit hook that calls +shutdown!+, marks the server as started and
# then connects. If connecting raises, the server is shut down again and
# the error is re-raised.
#
# @raise [Server::AlreadyStarted] if +started?+ is already true on entry
def start!
  if started?
    raise Server::AlreadyStarted, "The server is already started"
  end

  synchronize do
    # Another thread may have won the race while we waited for the lock.
    return if started?

    @pid = fork_server
    at_exit { shutdown! }
    @started = true

    begin
      connect_client!
    rescue StandardError
      shutdown!
      raise
    end
  end
end
Private Instance Methods
cleanup_socket()
click to toggle source
# File lib/drb_queue.rb, line 137
#
# Removes the file at +socket_location+ if one exists.
def cleanup_socket
  return unless File.exist?(socket_location)

  FileUtils.rm(socket_location)
end
configuration()
click to toggle source
# File lib/drb_queue.rb, line 124
#
# Returns the memoized Configuration instance, building it on first use.
def configuration
  return @configuration if @configuration

  @configuration = Configuration.new
end
execute_after_fork_callbacks()
click to toggle source
# File lib/drb_queue.rb, line 120
#
# Invokes +call+ on every entry of +after_fork_callbacks+, in order.
def execute_after_fork_callbacks
  after_fork_callbacks.each do |callback|
    callback.call
  end
end
execute_before_fork_callbacks()
click to toggle source
# File lib/drb_queue.rb, line 116
#
# Invokes +call+ on every entry of +before_fork_callbacks+, in order.
def execute_before_fork_callbacks
  before_fork_callbacks.each do |callback|
    callback.call
  end
end
fork_server()
click to toggle source
# File lib/drb_queue.rb, line 96
#
# Forks a child process that hosts the queue server over DRb.
#
# Before forking, any existing socket file is removed and the before-fork
# callbacks run. Inside the child, the after-fork callbacks run, a Server
# is built from the current configuration and published at +server_uri+.
# The child then polls every 0.1s until a TERM signal has been received,
# after which it shuts the server down and stops the DRb service.
#
# @return the value returned by +fork+ in the calling process
def fork_server
  cleanup_socket
  execute_before_fork_callbacks

  fork do
    execute_after_fork_callbacks

    queue_server = Server.new(configuration)
    DRb.start_service(server_uri, queue_server)

    # The trap handler only flips a flag; the shutdown work itself happens
    # after the polling loop below, outside the signal handler.
    term_received = false
    trap('TERM') { term_received = true }

    until term_received
      sleep 0.1
    end

    queue_server.shutdown!
    DRb.stop_service
  end
end
synchronization_monitor()
click to toggle source
# File lib/drb_queue.rb, line 133
#
# Serializes the one-time creation of the monitor below. It is built
# eagerly so that two threads hitting +synchronization_monitor+ for the
# first time cannot each create (and then lock) a different Monitor.
SYNCHRONIZATION_MONITOR_GUARD = Mutex.new

# Returns the memoized Monitor used by +synchronize+, creating it on first
# use.
#
# @return [Monitor] the same instance on every call
def synchronization_monitor
  # Fast path: once assigned, the monitor is never replaced.
  @synchronization_monitor || SYNCHRONIZATION_MONITOR_GUARD.synchronize do
    @synchronization_monitor ||= Monitor.new
  end
end
synchronize(&block)
click to toggle source
# File lib/drb_queue.rb, line 129
#
# Runs the given block while holding the monitor returned by
# +synchronization_monitor+.
#
# @return the value returned by the monitor's +synchronize+
def synchronize(&block)
  monitor = synchronization_monitor
  monitor.synchronize(&block)
end