class CZTop::Reactor

An implementation of the Reactor pattern described in [Pattern-Oriented Software Architecture (Volume 2)][POSA2]. It allows an asynchronous application to be described as one or more “reactions” to events, in this case either I/O conditions on a ZMQ socket or a timer expiring.

[POSA2]: www.cs.wustl.edu/~schmidt/POSA/POSA2/

Constants

DEFAULT_POLL_INTERVAL

The maximum number of seconds to wait for events when there are no timers registered.

POLLER_RETRY_ERRORS

Errors encountered during polling that should cause a retry instead of raising.

VALID_EVENTS

The events that can be registered, mapped to their corresponding event masks.

VERSION

The version of this library

Attributes

sockets[R]

Registered sockets, each mapped to its enabled events and the handler for its I/O.

timers[R]

Registered timers as a Timers::Group

wakeup_timer[R]

The handle of the default timer that is used to ensure the polling loop notices new sockets and timers.

Public Class Methods

new()

Create a new CZTop::Reactor

Calls superclass method
# File lib/cztop/reactor.rb, line 55
def initialize
        @sockets = Hash.new do |hsh,key|
                hsh[ key ] = { events: [], handler: nil }
        end
        @timers = Timers::Group.new
        @wakeup_timer = @timers.every( DEFAULT_POLL_INTERVAL ) do
                # No-op -- just ensures that new timers that are registered are only
                # delayed by (at most) the DEFAULT_POLL_INTERVAL before they start.
        end

        @socket_pointers = {}

        @poller_ptr = CZTop::Poller::ZMQ.poller_new
        ObjectSpace.define_finalizer( @poller_ptr, -> (obj_id) {
                # $stderr.puts "Freeing the poller pointer %p" % [ @poller_ptr ]
                ptr_ptr = ::FFI::MemoryPointer.new( :pointer )
                ptr_ptr.write_pointer( @poller_ptr )
                CZTop::Poller::ZMQ.poller_destroy( ptr_ptr )
        })
        @event_ptr = ::FFI::MemoryPointer.new( CZTop::Poller::ZMQ::PollerEvent )

        super
end
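
The `@sockets` table above is built with a Hash default block, so indexing it with an unknown socket stores a fresh entry instead of returning nil. The following is a minimal plain-Ruby sketch of that behavior; `build_socket_table` is a hypothetical helper and the string key is only a stand-in for a socket object.

```ruby
# Hypothetical helper that mirrors the default block used in the constructor.
def build_socket_table
  Hash.new do |hsh, key|
    hsh[ key ] = { events: [], handler: nil }
  end
end

table = build_socket_table
table.key?( 'sock-a' )         # false: nothing has been stored yet
entry = table[ 'sock-a' ]      # indexing runs the default block and stores an entry
entry[ :events ] |= [ :read ]
table.key?( 'sock-a' )         # true: the lookup itself created the entry
```

This is presumably why `registered?` and `event_enabled?` test membership with `key?` rather than indexing the table.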

Public Instance Methods

add( socket, *events, &handler )
Alias for: register
add_oneshot_timer( delay, &callback )

Register a timer that will call the specified callback once after delay seconds.

# File lib/cztop/reactor.rb, line 233
def add_oneshot_timer( delay, &callback )
        self.log.debug "Registering a oneshot timer: will call %p after %0.2fs" %
                [ callback, delay ]
        return self.timers.after( delay, &callback )
end
add_periodic_timer( delay, &callback )

Register a timer that will call the specified callback once every delay seconds until it is cancelled.

# File lib/cztop/reactor.rb, line 242
def add_periodic_timer( delay, &callback )
        self.log.debug "Registering a periodic timer: will call %p every %0.2fs" %
                [ callback, delay ]
        return self.timers.every( delay, &callback )
end
clear()

Unregister all registered sockets and return the sockets that were cleared.

# File lib/cztop/reactor.rb, line 218
def clear
        self.synchronize do
                sockets = self.sockets.keys
                sockets.each {|sock| self.unregister(sock) }
                return sockets
        end
end
disable_events( socket, *events )

Remove the specified events from the list that will be polled for on the given socket handle.

# File lib/cztop/reactor.rb, line 190
def disable_events( socket, *events )
        self.synchronize do
                socket = self.socket_for_ptr( socket ) if socket.is_a?( FFI::Pointer )
                self.sockets[ socket ][:events] -= events
                self.update_poller_for( socket )
        end
end
Also aliased as: disable_socket_events
disable_socket_events( socket, *events )
Alias for: disable_events
empty?()

Returns true if no sockets and no timers are registered.

# File lib/cztop/reactor.rb, line 212
def empty?
        return self.sockets.empty? && self.timers.empty?
end
enable_event( socket, *events )
Alias for: enable_events
enable_events( socket, *events )

Add the specified events to the list that will be polled for on the given socket.

# File lib/cztop/reactor.rb, line 168
def enable_events( socket, *events )
        invalid = events - ( events & VALID_EVENTS.keys )
        if !invalid.empty?
                raise ArgumentError, "invalid events: %p" % [ invalid ]
        end

        self.synchronize do
                socket = self.socket_for_ptr( socket ) if socket.is_a?( FFI::Pointer )
                raise ArgumentError, "%p is not registered yet" % [ socket ] unless
                        self.registered?( socket )

                self.sockets[ socket ][ :events ] |= events
                self.update_poller_for( socket )
        end
end
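
The validation and merge steps in `enable_events` are plain Array set operations. The sketch below reproduces them in isolation; `SKETCH_VALID_EVENTS` is an assumed stand-in for the library's `VALID_EVENTS` table (whose contents are not shown on this page), and `invalid_events` is a hypothetical helper name.

```ruby
# Assumed stand-in for VALID_EVENTS; only the keys matter for validation.
SKETCH_VALID_EVENTS = { read: 1, write: 2 }.freeze

# Everything in +events+ that is not a known event name.
def invalid_events( events )
  return events - ( events & SKETCH_VALID_EVENTS.keys )
end

invalid_events( [ :read, :write ] )    # => []
invalid_events( [ :read, :urgent ] )   # => [:urgent]

# Enabling uses |=, so an event that is already enabled is not duplicated.
enabled = [ :read ]
enabled |= [ :read, :write ]           # => [:read, :write]
```
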
enable_socket_event( socket, *events )
Alias for: enable_events
enable_socket_events( socket, *events )
Alias for: enable_events
event_enabled?( socket, event )

Returns true if the specified event is enabled for the given socket.

# File lib/cztop/reactor.rb, line 201
def event_enabled?( socket, event )
        socket = self.socket_for_ptr( socket ) if socket.is_a?( FFI::Pointer )

        return false unless self.sockets.key?( socket )
        return self.sockets[ socket ][ :events ].include?( event )
end
has_event_enabled?( socket, event )
Alias for: event_enabled?
pause_timers()

Pause all timers registered with the reactor.

# File lib/cztop/reactor.rb, line 262
def pause_timers
        self.timers.pause
end
poll_once( ignore_interrupts: false, ignore_eagain: true )

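Poll registered sockets once and return. If an I/O event arrives within the wait interval, it is passed to the handler registered for its socket; otherwise any due timers are fired. An Interrupt is re-raised unless ignore_interrupts is true, and EAGAIN/EWOULDBLOCK errors are re-raised unless ignore_eagain is true.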

# File lib/cztop/reactor.rb, line 324
def poll_once( ignore_interrupts: false, ignore_eagain: true )
        self.log.debug "Polling %d sockets" % [ self.sockets.length ]

        wait_interval = self.timers.wait_interval || DEFAULT_POLL_INTERVAL

        # If there's a timer already due to fire, don't wait at all
        event = if wait_interval > 0
                        self.log.debug "Waiting for IO for %fms" % [ wait_interval * 1000 ]
                        self.wait( wait_interval * 1000 )
                else
                        nil
                end

        self.log.debug "Got event %p" % [ event ]
        if event
                if event.socket
                        handler = self.sockets[ event.socket ][ :handler ]
                        handler.call( event )
                else
                        self.log.warn "Got an event for an unregistered socket: racing to shut down?"
                end
        else
                self.log.debug "Expired: firing timers."
                self.timers.fire
        end

        # self.log.debug "%d sockets after polling: %s" %
        #     [ self.sockets.length, self.sockets_description ]
rescue Interrupt
        raise unless ignore_interrupts
        self.log.debug "Interrupted."
        return nil
rescue Errno::EAGAIN, Errno::EWOULDBLOCK
        raise unless ignore_eagain
        self.log.debug "EAGAIN"
        return nil
end
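
The timeout calculation at the top of `poll_once` can be sketched on its own. In this hedged example `poll_timeout_ms` is a hypothetical helper, its argument plays the role of `timers.wait_interval`, and `ASSUMED_POLL_INTERVAL` is an illustrative value rather than the library's actual `DEFAULT_POLL_INTERVAL`.

```ruby
# Illustrative value only; the real DEFAULT_POLL_INTERVAL is set by the library.
ASSUMED_POLL_INTERVAL = 5

# Returns the poll timeout in milliseconds, or nil when a timer is already due
# and the poll should be skipped so the timers can fire immediately.
def poll_timeout_ms( timer_wait_interval )
  wait_interval = timer_wait_interval || ASSUMED_POLL_INTERVAL
  return nil unless wait_interval > 0
  return wait_interval * 1000
end

poll_timeout_ms( nil )     # => 5000  (no timer interval: fall back to the default)
poll_timeout_ms( 0.25 )    # => 250.0 (next timer is due in a quarter second)
poll_timeout_ms( -0.1 )    # => nil   (a timer is overdue: do not wait)
```
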
register( socket, *events, &handler )

Register the specified socket with the reactor for the specified events. The following events are supported:

:read

Data may be read from the socket without blocking.

:write

Data may be written to the socket without blocking.

Registering a socket replaces any events and handler previously registered for it. Instead of a block, a handler object that responds to handle_io_event may be passed as the last argument.

# File lib/cztop/reactor.rb, line 113
def register( socket, *events, &handler )
        if !events.empty? && !events.last.is_a?( Symbol )
                handler_obj = events.pop
                handler = handler_obj.method( :handle_io_event )
        end

        raise LocalJumpError, "no block or handler given" unless handler

        self.synchronize do
                self.unregister( socket )

                ptr = self.ptr_for_socket( socket )
                rc = CZTop::Poller::ZMQ.poller_add( @poller_ptr, ptr, nil, 0 )
                self.log.debug "poller_add: rc = %p" % [ rc ]
                CZTop::HasFFIDelegate.raise_zmq_err if rc == -1

                self.log.info "Registered: %p with handler: %p" % [ socket, handler ]
                self.sockets[ socket ][ :handler ] = handler
                self.enable_events( socket, *events )

                @socket_pointers[ ptr.to_i ] = socket
        end
end
Also aliased as: add, register_socket
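
The argument handling at the top of `register` accepts either a block or a trailing handler object. The sketch below isolates that step so it can be run without a ZMQ socket; `EchoHandler` and `split_register_args` are hypothetical names, and only the `handle_io_event` method name is taken from the source above.

```ruby
# Hypothetical handler class that records every event it is given.
class EchoHandler
  attr_reader :seen

  def initialize
    @seen = []
  end

  def handle_io_event( event )
    @seen << event
  end
end

# Mirrors the first lines of #register: a trailing non-Symbol argument is
# popped off the event list and converted to a bound Method object.
def split_register_args( *events, &handler )
  if !events.empty? && !events.last.is_a?( Symbol )
    handler_obj = events.pop
    handler = handler_obj.method( :handle_io_event )
  end

  raise LocalJumpError, "no block or handler given" unless handler
  return events, handler
end

handler_obj = EchoHandler.new
events, callback = split_register_args( :read, :write, handler_obj )
callback.call( :fake_event )   # handler_obj.seen now contains :fake_event
```

Because the check is only `is_a?( Symbol )`, any non-Symbol last argument is treated as a handler object, so it needs to respond to `handle_io_event`.
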
register_monitor( socket, *events, &callback )

Create a CZTop::Monitor for the specified socket that will listen for the specified events (which are monitor events, not I/O events). It will be automatically registered with the reactor for the `:read` event with the specified callback, then returned.

# File lib/cztop/reactor.rb, line 291
def register_monitor( socket, *events, &callback )
        if !events.empty? && events.last.respond_to?( :handle_monitor_event )
                handler = events.pop
                callback = handler.method( :handle_monitor_event )
        end

        events.map!( &:to_s )
        events.push( 'ALL' ) if events.empty?

        monitor = CZTop::Monitor.new( socket )
        monitor.listen( *events )
        monitor.start
        monitor.verbose! if $VERBOSE

        self.register( monitor.actor, :read, &callback )

        return monitor
end
Also aliased as: start_monitor
register_socket( socket, *events, &handler )
Alias for: register
registered?( socket )

Returns true if the given socket handle is registered with the reactor.

# File lib/cztop/reactor.rb, line 161
def registered?( socket )
        return self.sockets.key?( socket )
end
remove( socket )
Alias for: unregister
remove_timer( timer )

Remove the specified timer from the reactor.

# File lib/cztop/reactor.rb, line 250
def remove_timer( timer )
        timer.cancel
end
resume_timer( timer )

Restore the specified timer to the reactor.

# File lib/cztop/reactor.rb, line 256
def resume_timer( timer )
        timer.reset
end
resume_timers()

Resume all timers registered with the reactor.

# File lib/cztop/reactor.rb, line 268
def resume_timers
        self.timers.resume
end
socket_event_enabled?( socket, event )
Alias for: event_enabled?
socket_for_ptr( pointer )

Return the socket object for the given pointer (an FFI::Pointer), or nil if the pointer is unknown.

# File lib/cztop/reactor.rb, line 373
def socket_for_ptr( pointer )
        return @socket_pointers[ pointer.to_i ]
end
sockets_description()

Return a description of the sockets registered with the reactor.

# File lib/cztop/reactor.rb, line 379
def sockets_description
        return self.sockets.map do |sock, _|
                case sock
                when CZTop::Socket
                        "%s {%s}" % [ sock.class.name.sub(/.*::/, ''), sock.last_endpoint || 'not connected' ]
                when CZTop::Actor
                        "Actor %#x" % [ sock.object_id * 2 ]
                else
                        "??? %p ???" % [ sock.class ]
                end
        end.join( ', ' )
end
start_monitor( socket, *events, &callback )
Alias for: register_monitor
start_polling( **opts )

Poll registered sockets and fire timers until they're all unregistered.

# File lib/cztop/reactor.rb, line 318
def start_polling( **opts )
        self.poll_once( **opts ) until self.empty?
end
stop_polling()

Stop polling for events and prepare to shut down.

# File lib/cztop/reactor.rb, line 364
def stop_polling
        self.log.debug "Stopping the poll loop."
        self.clear
        self.timers.cancel
end
unregister( socket )

Remove the specified socket from the receiver's list of registered handles, if present. Returns the handle if it was registered, or nil if it was not.

# File lib/cztop/reactor.rb, line 143
def unregister( socket )
        self.synchronize do
                if self.sockets.delete( socket )
                        self.log.info "Unregistering: %p" % [ socket ]
                        ptr = self.ptr_for_socket( socket )
                        rc = CZTop::Poller::ZMQ.poller_remove( @poller_ptr, ptr )
                        self.log.debug "poller_remove: rc = %p" % [ rc ]
                        CZTop::HasFFIDelegate.raise_zmq_err if rc == -1
                end

                @socket_pointers.delete( ptr.to_i )
        end
end
Also aliased as: remove, unregister_socket
unregister_socket( socket )
Alias for: unregister
with_timers_paused() { || ... }

Execute a block with all registered timers paused, then resume them when the block returns.

# File lib/cztop/reactor.rb, line 275
def with_timers_paused
        self.pause_timers
        return yield
ensure
        self.resume_timers
end
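
The `ensure` clause in `with_timers_paused` means the timers are resumed even when the block raises. A minimal sketch of that guarantee follows, using a hypothetical `FakeTimerGroup` that only records calls in place of the reactor's real timer group, and a hypothetical `with_paused` helper of the same shape.

```ruby
# Stand-in for the reactor's timer group; it only records the calls it receives.
class FakeTimerGroup
  attr_reader :calls

  def initialize
    @calls = []
  end

  def pause
    @calls << :pause
  end

  def resume
    @calls << :resume
  end
end

# Same shape as #with_timers_paused, written against the stand-in.
def with_paused( timers )
  timers.pause
  return yield
ensure
  timers.resume
end

group = FakeTimerGroup.new
with_paused( group ) { :work_done }   # returns the block's value
group.calls                            # => [:pause, :resume]
```
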

Protected Instance Methods

mask_for( socket )

Return the ZMQ bitmask for the events the specified socket is registered for.

# File lib/cztop/reactor.rb, line 430
def mask_for( socket )
        return self.sockets[ socket ][ :events ].inject( 0 ) do |mask, evt|
                mask | VALID_EVENTS[ evt ]
        end
end
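
The mask is built by OR-ing one bit per enabled event, starting from zero. The sketch below shows the same fold on a plain event list; `ASSUMED_EVENT_MASKS` is an assumption standing in for `VALID_EVENTS` (it uses libzmq's `ZMQ_POLLIN` = 1 and `ZMQ_POLLOUT` = 2), and `sketch_mask_for` is a hypothetical name.

```ruby
# Assumed event-to-bit mapping; the library's own VALID_EVENTS table is not
# shown on this page.
ASSUMED_EVENT_MASKS = { read: 1, write: 2 }.freeze

# OR together the bits for each enabled event, starting from an empty mask.
def sketch_mask_for( events )
  return events.inject( 0 ) do |mask, evt|
    mask | ASSUMED_EVENT_MASKS[ evt ]
  end
end

sketch_mask_for( [] )                  # => 0
sketch_mask_for( [ :read ] )           # => 1
sketch_mask_for( [ :read, :write ] )   # => 3
```
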
ptr_for_socket( socket )

Return the low-level handle for socket. Raises an ArgumentError if socket is not a CZTop::Socket or a CZTop::Actor.

# File lib/cztop/reactor.rb, line 439
def ptr_for_socket( socket )
        unless socket.is_a?( CZTop::Socket ) || socket.is_a?( CZTop::Actor )
                raise ArgumentError, "expected a CZTop::Socket or a CZTop::Actor, got %p" % [ socket ]
        end
        return CZMQ::FFI::Zsock.resolve( socket )
end
update_poller_for( socket )

Modify the underlying poller's event mask with the events socket is interested in.

# File lib/cztop/reactor.rb, line 417
def update_poller_for( socket )
        self.synchronize do
                event_mask = self.mask_for( socket )

                ptr = self.ptr_for_socket( socket )
                rc = CZTop::Poller::ZMQ.poller_modify( @poller_ptr, ptr, event_mask )
                CZTop::HasFFIDelegate.raise_zmq_err if rc == -1
        end
end
wait( timeout=-1 )

Waits for events on registered sockets. Returns the first such event, or nil if no events arrived within the specified timeout, which is given in milliseconds. If timeout is -1, wait indefinitely.

# File lib/cztop/reactor.rb, line 400
def wait( timeout=-1 )
        rc = CZTop::Poller::ZMQ.poller_wait( @poller_ptr, @event_ptr, timeout )
        if rc == -1
                case CZMQ::FFI::Errors.errno
                when *POLLER_RETRY_ERRORS
                        # Retry if the error means we should
                else
                        CZTop::HasFFIDelegate.raise_zmq_err
                end
                return nil
        end
        return Event.new(self, @event_ptr)
end