module CZTop::SendReceiveMethods
These are methods that can be used on a {Socket} as well as an {Actor}, but actually just pass through to methods of {Message} (which take a polymorphic reference, in Ruby as well as in C).
@see api.zeromq.org/czmq3-0:zmsg
Constants
- FD_TIMEOUT
Because ZMQ sockets are edge-triggered, there’s a small chance that we miss an edge (race condition). To avoid blocking forever, all waiting on the ZMQ FD is done with this timeout or less.
The race condition exists between the calls to {#readable?}/{#writable?} and waiting for the ZMQ FD. If the socket becomes readable/writable during that time, waiting for the FD could block forever without a timeout.
- JIFFY
Sometimes the ZMQ FD just insists on readiness. To avoid hogging the CPU, a sleep of this many seconds is included in the tight loop.
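The effect of the two constants can be sketched without ZMQ at all. The following is a minimal illustration, not CZTop's implementation: `MissedEdgeSocket`, `SKETCH_FD_TIMEOUT` and `SKETCH_JIFFY` are hypothetical names with made-up values, and a pipe that nobody writes to plays the role of a ZMQ FD whose edge was missed. Because every wait on the FD is bounded, the loop rechecks readiness after at most one timeout instead of blocking forever.

```ruby
require "io/wait"

# Hypothetical values for illustration only; the real constants may differ.
SKETCH_FD_TIMEOUT = 0.05  # seconds: upper bound for a single wait on the FD
SKETCH_JIFFY      = 0.005 # seconds: short pause between readiness checks

# Hypothetical stand-in for a socket whose FD never signals although a
# message is ready (the "missed edge" case).
class MissedEdgeSocket
  def initialize
    # Nobody ever writes to this pipe, so its read end never becomes readable.
    @fd_reader, @fd_writer = IO.pipe
    @polls = 0
  end

  # Pretends that a message has arrived by the time of the second check.
  def readable?
    @polls += 1
    @polls > 1
  end

  def wait_readable
    until readable?
      # Bounded wait: returns nil after SKETCH_FD_TIMEOUT if no edge fires.
      @fd_reader.wait_readable(SKETCH_FD_TIMEOUT)
      # Brief pause so the loop cannot spin at full speed.
      sleep SKETCH_JIFFY
    end
    true
  end
end
```

With an unbounded wait on the pipe, `MissedEdgeSocket.new.wait_readable` would never return; with the bound it returns `true` after roughly one timeout period.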
Public Instance Methods
Sends a message.
@param message [Message, String, Array<parts>] the message to send
@raise [IO::EAGAINWaitWritable, IO::TimeoutError] if send timeout has been reached (see {ZsockOptions::OptionsAccessor#sndtimeo=})
@raise [Interrupt, ArgumentError, SystemCallError] anything raised by {Message#send_to}
@return [self]
@see Message.coerce
@see Message#send_to
# File lib/cztop/send_receive_methods.rb, line 31
def <<(message)
  Message.coerce(message).send_to(self)
  self
end
@return [Float, nil] the timeout in seconds used by {IO#wait_readable}
# File lib/cztop/send_receive_methods.rb, line 137
def read_timeout
  timeout = options.rcvtimeo

  if timeout <= 0
    timeout = nil
  else
    timeout = timeout.to_f / 1000
  end

  timeout
end
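The conversion above maps a millisecond option value to the seconds that {IO#wait_readable} expects, with non-positive values becoming nil. A standalone sketch of the same mapping follows; `timeout_in_seconds` is a hypothetical helper for illustration and is not part of CZTop.

```ruby
# Hypothetical helper mirroring the conversion in read_timeout.
# Takes a timeout in milliseconds and returns seconds as a Float,
# or nil when the value is zero or negative (no timeout to apply).
def timeout_in_seconds(milliseconds)
  return nil if milliseconds <= 0

  milliseconds / 1000.0
end
```

For instance, a configured value of 1500 yields 1.5, while -1 and 0 both yield nil.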
Receives a message.
@return [Message]
@raise [IO::EAGAINWaitReadable, IO::TimeoutError] if receive timeout has been reached (see {ZsockOptions::OptionsAccessor#rcvtimeo=})
@raise [Interrupt, ArgumentError, SystemCallError] anything raised by {Message.receive_from}
@see Message.receive_from
# File lib/cztop/send_receive_methods.rb, line 45
def receive
  Message.receive_from(self)
end
@note Only available on Ruby >= 3.2
# File lib/cztop/send_receive_methods.rb, line 61
def wait_for_fd_signal(timeout = nil)
  @fd_io ||= to_io

  if timeout
    if timeout > FD_TIMEOUT
      timeout = FD_TIMEOUT
    end
  else
    timeout = FD_TIMEOUT
  end

  # NOTE: always wait for readability on ZMQ FD
  @fd_io.wait_readable timeout
end
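The timeout handling in the method above amounts to a clamp: a missing timeout falls back to the cap, and a larger one is reduced to it. As a compact sketch of that rule only, with `clamp_fd_timeout` and its `cap` parameter being hypothetical names introduced for illustration:

```ruby
# Hypothetical helper: returns the number of seconds to wait on the FD.
# A nil timeout falls back to the cap; anything larger is reduced to it.
def clamp_fd_timeout(timeout, cap)
  timeout.nil? ? cap : [timeout, cap].min
end
```

Called with a cap of 0.5, a requested timeout of 2 becomes 0.5, a requested timeout of 0.1 stays 0.1, and nil becomes 0.5.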
Waits for socket to become readable.
@param timeout [Numeric, nil] timeout in seconds
@return [true] if readable within timeout
@raise [IO::EAGAINWaitReadable, IO::TimeoutError] if timeout has been reached
@raise [CZMQ::FFI::Zsock::DestroyedError] if socket has already been destroyed
@note Only available on Ruby >= 3.2
# File lib/cztop/send_receive_methods.rb, line 89
def wait_readable(timeout = read_timeout)
  return true if readable?

  timeout_at = now + timeout if timeout

  while true
    # p wait_readable: self, timeout: timeout

    wait_for_fd_signal timeout
    break if readable? # NOTE: ZMQ FD can't be trusted
    raise ::IO::TimeoutError if timeout_at && now >= timeout_at

    sleep JIFFY # HACK
    break if readable? # NOTE: ZMQ FD is edge-triggered. Check again before blocking.
  end

  true
end
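The loop above combines a readiness check with a deadline computed from a monotonic clock. The sketch below shows that pattern in isolation under simplifying assumptions: `wait_until`, `monotonic_now` and `SketchTimeout` are hypothetical names, a plain `sleep` stands in for the bounded wait on the ZMQ FD, and `SketchTimeout` stands in for `IO::TimeoutError` so that the example also runs on Ruby versions before 3.2.

```ruby
# Hypothetical stand-in for IO::TimeoutError.
class SketchTimeout < StandardError; end

# A monotonic clock is unaffected by wall-clock adjustments, which makes
# it suitable for computing deadlines.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Polls `ready` (any callable) until it returns true.
# Raises SketchTimeout once `timeout` seconds have passed; a nil timeout
# means there is no deadline.
def wait_until(ready, timeout: nil, step: 0.01)
  return true if ready.call

  deadline = monotonic_now + timeout if timeout

  loop do
    sleep step # stands in for the bounded wait on the FD
    return true if ready.call # the wait itself is not trusted, so check again
    raise SketchTimeout if deadline && monotonic_now >= deadline
  end
end
```

A condition that becomes true after a few checks makes `wait_until` return `true`; a condition that never becomes true raises `SketchTimeout` shortly after the deadline.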
Waits for socket to become writable.
@param timeout [Numeric, nil] timeout in seconds
@return [true] if writable within timeout
@raise [IO::EAGAINWaitReadable, IO::TimeoutError] if timeout has been reached
@raise [CZMQ::FFI::Zsock::DestroyedError] if socket has already been destroyed
@note Only available on Ruby >= 3.2
# File lib/cztop/send_receive_methods.rb, line 116
def wait_writable(timeout = write_timeout)
  return true if writable?

  timeout_at = now + timeout if timeout

  while true
    # p wait_writable: self, timeout: timeout

    wait_for_fd_signal timeout
    break if writable? # NOTE: ZMQ FD can't be trusted
    raise ::IO::TimeoutError if timeout_at && now >= timeout_at

    sleep JIFFY # HACK
    break if writable? # NOTE: ZMQ FD is edge-triggered. Check again before blocking.
  end

  true
end
@return [Float, nil] the timeout in seconds used by {IO#wait_writable}
# File lib/cztop/send_receive_methods.rb, line 151
def write_timeout
  timeout = options.sndtimeo

  if timeout <= 0
    timeout = nil
  else
    timeout = timeout.to_f / 1000
  end

  timeout
end
Private Instance Methods
# File lib/cztop/send_receive_methods.rb, line 167
def now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end