class CZTop::Poller::ZPoller
This is the trivial poller based on zpoller. It only supports polling for readability, but it also supports doing that on CLIENT/SERVER sockets, which is useful for {CZTop::Poller}.
Public Class Methods
Initializes the Poller
. At least one reader has to be given. @param reader [Socket, Actor] socket to poll for input @param readers [Socket, Actor] any additional sockets to poll for input
# File lib/cztop/poller/zpoller.rb, line 18 def initialize(reader, *readers) @sockets = {} # to keep references and return same instances ptr = Zpoller.new(reader, *readers.flat_map { |r| [:pointer, r] }, :pointer, nil) attach_ffi_delegate(ptr) remember_socket(reader) readers.each { |r| remember_socket(r) } end
Public Instance Methods
Adds another reader socket to the poller. @param reader [Socket, Actor] socket to poll for input @return [void] @raise [SystemCallError] if this fails
# File lib/cztop/poller/zpoller.rb, line 33 def add(reader) rc = ffi_delegate.add(reader) raise_zmq_err(format('unable to add socket %p', reader)) if rc == -1 remember_socket(reader) end
Tells the zpoller to ignore interrupts. By default, {#wait} will return immediately if it detects an interrupt (when zsys_interrupted
is set to something other than zero). Calling this method will supress this behavior. @return [void]
# File lib/cztop/poller/zpoller.rb, line 75 def ignore_interrupts ffi_delegate.ignore_interrupts end
By default the poller stops if the process receives a SIGINT or SIGTERM signal. This makes it impossible to shut-down message based architectures like zactors. This method lets you switch off break handling. The default nonstop setting is off (false).
Setting this will cause {#wait} to never raise.
@param flag [Boolean] whether the poller should run nonstop
# File lib/cztop/poller/zpoller.rb, line 88 def nonstop=(flag) ffi_delegate.set_nonstop(flag) end
Removes a reader socket from the poller. @param reader [Socket, Actor] socket to remove @return [void] @raise [ArgumentError] if socket was invalid, e.g. it wasn’t registered
in this poller
@raise [SystemCallError] if this fails for another reason
# File lib/cztop/poller/zpoller.rb, line 46 def remove(reader) rc = ffi_delegate.remove(reader) raise_zmq_err(format('unable to remove socket %p', reader)) if rc == -1 forget_socket(reader) end
Waits and returns the first socket that becomes readable. @param timeout [Integer] how long to wait in ms, or 0 to avoid
blocking, or -1 to wait indefinitely
@return [Socket, Actor] first socket of interest @return [nil] if the timeout expired or @raise [Interrupt] if the timeout expired or
# File lib/cztop/poller/zpoller.rb, line 59 def wait(timeout = -1) ptr = ffi_delegate.wait(timeout) if ptr.null? raise Interrupt if ffi_delegate.terminated return nil end socket_by_ptr(ptr) end
Private Instance Methods
Forgets the socket because it has been removed from the poller. @param socket [Socket, Actor] the socket instance to forget @return [void]
# File lib/cztop/poller/zpoller.rb, line 107 def forget_socket(socket) @sockets.delete(socket.to_ptr.to_i) end
Remembers the socket so a call to {#wait} can return with the exact same instance of {Socket}, and it also makes sure the socket won’t get GC’d. @param socket [Socket, Actor] the socket instance to remember @return [void]
# File lib/cztop/poller/zpoller.rb, line 99 def remember_socket(socket) @sockets[socket.to_ptr.to_i] = socket end
Gets the previously remembered socket associated to the given pointer. @param ptr [FFI::Pointer] the pointer to a socket @return [Socket, Actor] the socket associated to the given pointer @raise [SystemCallError] if no socket is registered under given pointer
# File lib/cztop/poller/zpoller.rb, line 116 def socket_by_ptr(ptr) @sockets[ptr.to_i] or # NOTE: This should never happen, since #wait will return nil if # +zpoller_wait+ returned NULL. But it's better to fail early in case # it ever returns a wrong pointer. raise_zmq_err("no socket known for pointer #{ptr.inspect}") end