msg_queue {interprocess} | R Documentation |
Send Text Messages Between Processes
Description
An interprocess message queue that ensures each message is delivered to only one reader, at which time the message is removed from the queue. Ideal for producer/consumer situations where the message defines work waiting to be processed. The message itself can be any scalar character, for example, a JSON string or path to an RDS file.
Usage
msg_queue(
  name = uid(),
  assert = NULL,
  max_count = 100,
  max_nchar = 128,
  cleanup = FALSE,
  file = NULL
)
## S3 method for class 'msg_queue'
with(data, expr, alt_expr = NULL, timeout_ms = Inf, ...)
Arguments
name |
Unique ID. Alphanumeric, starting with a letter. |
assert |
Apply an additional constraint.
|
max_count |
The maximum number of messages that can be stored in the queue at the same time. Attempting to send additional messages will cause $send() to block until space frees up, or to return FALSE once its timeout is reached. |
max_nchar |
The maximum number of characters in each message. Attempting to send larger messages will throw an error. Ignored if the message queue already exists. |
cleanup |
Remove the message queue when the R session exits. If
|
file |
Use a hash of this file/directory path as the message queue name. The file itself will not be read or modified, and does not need to exist. |
data |
A msg_queue object. |
expr |
Expression to evaluate if a message is received. The message can be accessed by . (a single dot), as in paste('got message:', .). |
alt_expr |
Expression to evaluate if no message is received before timeout_ms is reached. |
timeout_ms |
Maximum time (in milliseconds) to block the process
while waiting for the operation to succeed. Use |
... |
Not used. |
Value
msg_queue() returns a msg_queue object with the following methods:
- $name
  Returns the message queue's name (scalar character).
- $send(msg, timeout_ms = Inf, priority = 0)
  Returns TRUE on success, or FALSE if the timeout is reached.
  - msg: The message (scalar character) to add to the message queue.
  - priority: Higher priority messages will be retrieved from the message queue first. 0 = lowest priority; integers only.
- $receive(timeout_ms = Inf)
  Returns the next message from the message queue, or NULL if the timeout is reached.
- $count()
  Returns the number of messages currently in the message queue.
- $max_count()
  Returns the maximum number of messages the queue can hold.
- $max_nchar()
  Returns the maximum number of characters per message.
- $remove()
  Returns TRUE if the message queue was successfully deleted from the operating system, or FALSE on error.
with() returns eval(expr) on success; eval(alt_expr) otherwise.
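The priority argument of $send() can be illustrated with a short sketch. It assumes the interprocess package is installed; the message texts and the priority value 5L are arbitrary choices made for illustration, not values taken from the package.

```r
library(interprocess)

# Small queue for the demonstration; the sizes are arbitrary.
mq <- msg_queue(max_count = 10, max_nchar = 64)

mq$send('routine job')                  # default priority = 0 (lowest)
mq$send('urgent job', priority = 5L)    # higher priority, sent second

# Higher priority messages are retrieved first, regardless of send order.
first  <- mq$receive(timeout_ms = 0)
second <- mq$receive(timeout_ms = 0)

# With nothing left and a zero timeout, $receive() gives NULL.
empty  <- mq$receive(timeout_ms = 0)

mq$remove()
```

Here first should hold the urgent message and second the routine one, since retrieval follows priority and not insertion order.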
See Also
Other shared objects: mutex(), semaphore()
Examples
mq <- interprocess::msg_queue()
print(mq)
mq$send(paste('my favorite number is', floor(runif(1) * 100)))
mq$count()
mq$receive()
mq$receive(timeout_ms = 0)
mq$send('The Matrix has you...')
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
mq$remove()
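The producer/consumer pattern mentioned in the Description might look roughly like the sketch below. Both ends run in one R session here only to keep the example self-contained; in practice the consumer would normally be a separate process that is given the queue name. The file paths are hypothetical placeholders and are never read, and the sketch assumes that calling msg_queue() with an existing name attaches to that queue.

```r
library(interprocess)

# Producer: create the queue and enqueue paths to pending work items.
# The paths are hypothetical; only the strings are sent.
producer <- msg_queue(max_nchar = 256)
for (path in c('jobs/a.rds', 'jobs/b.rds')) {
  producer$send(path)
}

# Consumer: attach to the same queue by name.
consumer <- msg_queue(name = producer$name)

# Drain the queue, stopping once no message arrives within 100 ms.
handled <- character(0)
repeat {
  path <- consumer$receive(timeout_ms = 100)
  if (is.null(path)) break
  handled <- c(handled, path)   # a real worker would process the file here
}

consumer$remove()
```

Because each message is delivered to only one reader, several consumers could run this loop concurrently without processing the same path twice.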