class CZTop::Poller::ZPoller

This is the trivial poller based on zpoller. It only supports polling for readability, but it also supports doing that on CLIENT/SERVER sockets, which is useful for {CZTop::Poller}.

@see api.zeromq.org/czmq3-0:zpoller

Public Class Methods

new(reader, *readers) click to toggle source

Initializes the Poller. At least one reader has to be given. @param reader [Socket, Actor] socket to poll for input @param readers [Socket, Actor] any additional sockets to poll for input

# File lib/cztop/poller/zpoller.rb, line 18
def initialize(reader, *readers)
  @sockets = {} # to keep references and return same instances
  ptr      = Zpoller.new(reader,
                         *readers.flat_map { |r| [:pointer, r] },
                         :pointer, nil)
  attach_ffi_delegate(ptr)
  remember_socket(reader)
  readers.each { |r| remember_socket(r) }
end

Public Instance Methods

add(reader) click to toggle source

Adds another reader socket to the poller. @param reader [Socket, Actor] socket to poll for input @return [void] @raise [SystemCallError] if this fails

# File lib/cztop/poller/zpoller.rb, line 33
def add(reader)
  rc = ffi_delegate.add(reader)
  raise_zmq_err(format('unable to add socket %p', reader)) if rc == -1
  remember_socket(reader)
end
ignore_interrupts() click to toggle source

Tells the zpoller to ignore interrupts. By default, {#wait} will return immediately if it detects an interrupt (when zsys_interrupted is set to something other than zero). Calling this method will supress this behavior. @return [void]

# File lib/cztop/poller/zpoller.rb, line 75
def ignore_interrupts
  ffi_delegate.ignore_interrupts
end
nonstop=(flag) click to toggle source

By default the poller stops if the process receives a SIGINT or SIGTERM signal. This makes it impossible to shut-down message based architectures like zactors. This method lets you switch off break handling. The default nonstop setting is off (false).

Setting this will cause {#wait} to never raise.

@param flag [Boolean] whether the poller should run nonstop

# File lib/cztop/poller/zpoller.rb, line 88
def nonstop=(flag)
  ffi_delegate.set_nonstop(flag)
end
remove(reader) click to toggle source

Removes a reader socket from the poller. @param reader [Socket, Actor] socket to remove @return [void] @raise [ArgumentError] if socket was invalid, e.g. it wasn’t registered

in this poller

@raise [SystemCallError] if this fails for another reason

# File lib/cztop/poller/zpoller.rb, line 46
def remove(reader)
  rc = ffi_delegate.remove(reader)
  raise_zmq_err(format('unable to remove socket %p', reader)) if rc == -1
  forget_socket(reader)
end
wait(timeout = -1) click to toggle source

Waits and returns the first socket that becomes readable. @param timeout [Integer] how long to wait in ms, or 0 to avoid

blocking, or -1 to wait indefinitely

@return [Socket, Actor] first socket of interest @return [nil] if the timeout expired or @raise [Interrupt] if the timeout expired or

# File lib/cztop/poller/zpoller.rb, line 59
def wait(timeout = -1)
  ptr = ffi_delegate.wait(timeout)
  if ptr.null?
    raise Interrupt if ffi_delegate.terminated

    return nil
  end
  socket_by_ptr(ptr)
end

Private Instance Methods

forget_socket(socket) click to toggle source

Forgets the socket because it has been removed from the poller. @param socket [Socket, Actor] the socket instance to forget @return [void]

# File lib/cztop/poller/zpoller.rb, line 107
def forget_socket(socket)
  @sockets.delete(socket.to_ptr.to_i)
end
remember_socket(socket) click to toggle source

Remembers the socket so a call to {#wait} can return with the exact same instance of {Socket}, and it also makes sure the socket won’t get GC’d. @param socket [Socket, Actor] the socket instance to remember @return [void]

# File lib/cztop/poller/zpoller.rb, line 99
def remember_socket(socket)
  @sockets[socket.to_ptr.to_i] = socket
end
socket_by_ptr(ptr) click to toggle source

Gets the previously remembered socket associated to the given pointer. @param ptr [FFI::Pointer] the pointer to a socket @return [Socket, Actor] the socket associated to the given pointer @raise [SystemCallError] if no socket is registered under given pointer

# File lib/cztop/poller/zpoller.rb, line 116
def socket_by_ptr(ptr)
  @sockets[ptr.to_i] or
    # NOTE: This should never happen, since #wait will return nil if
    # +zpoller_wait+ returned NULL. But it's better to fail early in case
    # it ever returns a wrong pointer.
    raise_zmq_err("no socket known for pointer #{ptr.inspect}")
end