class CZTop::Reactor
An implementation of the Reactor
pattern described in [Pattern-Oriented Software Architecture (Volume 2)]. It allows an asynchronous application to be described as one or more “reactions” to events, in this case either I/O conditions on a ZMQ socket or a timer expiring.
[POSA2]: www.cs.wustl.edu/~schmidt/POSA/POSA2/
Constants
- DEFAULT_POLL_INTERVAL
The maximum number of seconds to wait for events when there are no timers registered.
- POLLER_RETRY_ERRORS
Errors encountered during the poll that should retry instead of raising
- VALID_EVENTS
The events that can be registered and the corresponding mask
- VERSION
The version of this library
Attributes
Sockets and the handlers that handle their IO
Registered timers as a Timers::Group
The handle of the default timer that is used to ensure the polling loop notices new sockets and timers.
Public Class Methods
Create a new CZTop::Reactor
# File lib/cztop/reactor.rb, line 55 def initialize @sockets = Hash.new do |hsh,key| hsh[ key ] = { events: [], handler: nil } end @timers = Timers::Group.new @wakeup_timer = @timers.every( DEFAULT_POLL_INTERVAL ) do # No-op -- just ensures that new timers that are registered are only # delayed by (at most) the DEFAULT_POLL_INTERVAL before they start. end @socket_pointers = {} @poller_ptr = CZTop::Poller::ZMQ.poller_new ObjectSpace.define_finalizer( @poller_ptr, -> (obj_id) { # $stderr.puts "Freeing the poller pointer %p" % [ @poller_ptr ] ptr_ptr = ::FFI::MemoryPointer.new( :pointer ) ptr_ptr.write_pointer( @poller_ptr ) CZTop::Poller::ZMQ.poller_destroy( ptr_ptr ) }) @event_ptr = ::FFI::MemoryPointer.new( CZTop::Poller::ZMQ::PollerEvent ) super end
Public Instance Methods
Register a timer that will call the specified callback
once after delay
seconds.
# File lib/cztop/reactor.rb, line 233 def add_oneshot_timer( delay, &callback ) self.log.debug "Registering a oneshot timer: will call %p after %0.2fs" % [ callback, delay ] return self.timers.after( delay, &callback ) end
Register a timer that will call the specified callback
once every delay
seconds until it is cancelled.
# File lib/cztop/reactor.rb, line 242 def add_periodic_timer( delay, &callback ) self.log.debug "Registering a periodic timer: will call %p every %0.2fs" % [ callback, delay ] return self.timers.every( delay, &callback ) end
Clear all registered sockets and returns the sockets that were cleared.
# File lib/cztop/reactor.rb, line 218 def clear self.synchronize do sockets = self.sockets.keys sockets.each {|sock| self.unregister(sock) } return sockets end end
Remove the specified events
from the list that will be polled for on the given socket
handle.
# File lib/cztop/reactor.rb, line 190 def disable_events( socket, *events ) self.synchronize do socket = self.socket_for_ptr( socket ) if socket.is_a?( FFI::Pointer ) self.sockets[ socket ][:events] -= events self.update_poller_for( socket ) end end
Returns true
if no sockets are registered.
# File lib/cztop/reactor.rb, line 212 def empty? return self.sockets.empty? && self.timers.empty? end
Add the specified events
to the list that will be polled for on the given socket
.
# File lib/cztop/reactor.rb, line 168 def enable_events( socket, *events ) invalid = events - ( events & VALID_EVENTS.keys ) if !invalid.empty? raise ArgumentError, "invalid events: %p" % [ invalid ] end self.synchronize do socket = self.socket_for_ptr( socket ) if socket.is_a?( FFI::Pointer ) raise ArgumentError, "%p is not registered yet" % [ socket ] unless self.registered?( socket ) self.sockets[ socket ][ :events ] |= events self.update_poller_for( socket ) end end
Returns true
if the specified event
is enabled for the given socket
.
# File lib/cztop/reactor.rb, line 201 def event_enabled?( socket, event ) socket = self.socket_for_ptr( socket ) if socket.is_a?( FFI::Pointer ) return false unless self.sockets.key?( socket ) return self.sockets[ socket ][ :events ].include?( event ) end
Pause all timers registered with the reactor.
# File lib/cztop/reactor.rb, line 262 def pause_timers self.timers.pause end
Poll registered sockets or fire timers and return.
# File lib/cztop/reactor.rb, line 324 def poll_once( ignore_interrupts: false, ignore_eagain: true ) self.log.debug "Polling %d sockets" % [ self.sockets.length ] wait_interval = self.timers.wait_interval || DEFAULT_POLL_INTERVAL # If there's a timer already due to fire, don't wait at all event = if wait_interval > 0 self.log.debug "Waiting for IO for %fms" % [ wait_interval * 1000 ] self.wait( wait_interval * 1000 ) else nil end self.log.debug "Got event %p" % [ event ] if event if event.socket handler = self.sockets[ event.socket ][ :handler ] handler.call( event ) else self.log.warn "Got an event for an unregistered socket: racing to shut down?" end else self.log.debug "Expired: firing timers." self.timers.fire end # self.log.debug "%d sockets after polling: %s" % # [ self.sockets.length, self.sockets_description ] rescue Interrupt raise unless ignore_interrupts self.log.debug "Interrupted." return nil rescue Errno::EAGAIN, Errno::EWOULDBLOCK raise unless ignore_eagain self.log.debug "EAGAIN" return nil end
Register the specified socket
with the reactor for the specified events
. The following events are supported:
:read
-
Data may be read from the socket without blocking.
:write
-
Data may be written to the socket without blocking.
Registering a handle will unregister any previously registered event/handler+arguments pairs associated with the handle.
# File lib/cztop/reactor.rb, line 113 def register( socket, *events, &handler ) if !events.empty? && !events.last.is_a?( Symbol ) handler_obj = events.pop handler = handler_obj.method( :handle_io_event ) end raise LocalJumpError, "no block or handler given" unless handler self.synchronize do self.unregister( socket ) ptr = self.ptr_for_socket( socket ) rc = CZTop::Poller::ZMQ.poller_add( @poller_ptr, ptr, nil, 0 ) self.log.debug "poller_add: rc = %p" % [ rc ] CZTop::HasFFIDelegate.raise_zmq_err if rc == -1 self.log.info "Registered: %p with handler: %p" % [ socket, handler ] self.sockets[ socket ][ :handler ] = handler self.enable_events( socket, *events ) @socket_pointers[ ptr.to_i ] = socket end end
Create a CZTop::Monitor for the specified socket
that will listen for the specified events
(which are monitor events, not I/O events). It will be automatically registered with the reactor for the `:read` event with the specified callback
, then returned.
# File lib/cztop/reactor.rb, line 291 def register_monitor( socket, *events, &callback ) if !events.empty? && events.last.respond_to?( :handle_monitor_event ) handler = events.pop callback = handler.method( :handle_monitor_event ) end events.map!( &:to_s ) events.push( 'ALL' ) if events.empty? monitor = CZTop::Monitor.new( socket ) monitor.listen( *events ) monitor.start monitor.verbose! if $VERBOSE self.register( monitor.actor, :read, &callback ) return monitor end
Returns true
if the given socket
handle is registered with the reactor.
# File lib/cztop/reactor.rb, line 161 def registered?( socket ) return self.sockets.key?( socket ) end
Remove the specified timer
from the reactor.
# File lib/cztop/reactor.rb, line 250 def remove_timer( timer ) timer.cancel end
Restore the specified timer
to the reactor.
# File lib/cztop/reactor.rb, line 256 def resume_timer( timer ) timer.reset end
Resume all timers registered with the reactor.
# File lib/cztop/reactor.rb, line 268 def resume_timers self.timers.resume end
Return the socket object for the given pointer
(an FFI::Pointer), or nil
if the pointer is unknown.
# File lib/cztop/reactor.rb, line 373 def socket_for_ptr( pointer ) return @socket_pointers[ pointer.to_i ] end
Return a description of the sockets registered with the reactor.
# File lib/cztop/reactor.rb, line 379 def sockets_description return self.sockets.map do |sock, _| case sock when CZTop::Socket "%s {%s}" % [ sock.class.name.sub(/.*::/, ''), sock.last_endpoint || 'not connected' ] when CZTop::Actor "Actor %#x" % [ sock.object_id * 2 ] else "??? %p ???" % [ sock.class ] end end.join( ', ' ) end
Poll registered sockets and fire timers until they're all unregistered.
# File lib/cztop/reactor.rb, line 318 def start_polling( **opts ) self.poll_once( **opts ) until self.empty? end
Stop polling for events and prepare to shut down.
# File lib/cztop/reactor.rb, line 364 def stop_polling self.log.debug "Stopping the poll loop." self.clear self.timers.cancel end
Remove the specified socket
from the receiver's list of registered handles, if present. Returns the handle if it was registered, or nil
if it was not.
# File lib/cztop/reactor.rb, line 143 def unregister( socket ) self.synchronize do if self.sockets.delete( socket ) self.log.info "Unregistering: %p" % [ socket ] ptr = self.ptr_for_socket( socket ) rc = CZTop::Poller::ZMQ.poller_remove( @poller_ptr, ptr ) self.log.debug "poller_remove: rc = %p" % [ rc ] CZTop::HasFFIDelegate.raise_zmq_err if rc == -1 end @socket_pointers.delete( ptr.to_i ) end end
Execute a block
with all registered timers paused, then resume them when the block returns.
# File lib/cztop/reactor.rb, line 275 def with_timers_paused self.pause_timers return yield ensure self.resume_timers end
Protected Instance Methods
Return the ZMQ bitmask for the events the specified socket
is registered for.
# File lib/cztop/reactor.rb, line 430 def mask_for( socket ) return self.sockets[ socket ][ :events ].inject( 0 ) do |mask, evt| mask | VALID_EVENTS[ evt ] end end
Return the low-level handle for socket
. Raises an ArgumentError if argument is not a CZTop::Socket or a CZTop::Actor.
# File lib/cztop/reactor.rb, line 439 def ptr_for_socket( socket ) unless socket.is_a?( CZTop::Socket ) || socket.is_a?( CZTop::Actor ) raise ArgumentError, "expected a CZTop::Socket or a CZTop::Actor, got %p" % [ socket ] end return CZMQ::FFI::Zsock.resolve( socket ) end
Modify the underlying poller's event mask with the events socket
is interested in.
# File lib/cztop/reactor.rb, line 417 def update_poller_for( socket ) self.synchronize do event_mask = self.mask_for( socket ) ptr = self.ptr_for_socket( socket ) rc = CZTop::Poller::ZMQ.poller_modify( @poller_ptr, ptr, event_mask ) CZTop::HasFFIDelegate.raise_zmq_err if rc == -1 end end
Waits for events on registered sockets. Returns the first such event, or nil
if no events arrived within the specified timeout
. If timeout
is -1, wait indefinitely.
# File lib/cztop/reactor.rb, line 400 def wait( timeout=-1 ) rc = CZTop::Poller::ZMQ.poller_wait( @poller_ptr, @event_ptr, timeout ) if rc == -1 case CZMQ::FFI::Errors.errno when *POLLER_RETRY_ERRORS # Retry if the error means we should else CZTop::HasFFIDelegate.raise_zmq_err end return nil end return Event.new(self, @event_ptr) end