Create a new controller. This is the new interface that will
replace rrq_controller soon, at which point it will rename back
to rrq_controller
.
Usage
rrq_controller(
queue_id,
con = redux::hiredis(),
timeout_task_wait = NULL,
follow = NULL,
check_version = TRUE
)
Arguments
- queue_id
An identifier for the queue. This will prefix all keys in redis, so a prefix might be useful here depending on your use case (e.g.
rrq:<user>:<id>
)- con
A redis connection. The default tries to create a redis connection using default ports, or environment variables set as in
redux::hiredis()
- timeout_task_wait
An optional default timeout to use when waiting for tasks with rrq_task_wait. If not given, then we fall back on the global option
rrq.timeout_task_wait
, and if that is not set, we wait forever (i.e.,timeout_task_wait = Inf
).- follow
An optional default logical to use for tasks that may (or may not) be retried. If not given we fall back on the global option
rrq.follow
, and if that is not set thenTRUE
(i.e., we do follow). The valuefollow = TRUE
is potentially slower thanfollow = FALSE
for some operations because we need to dereference every task id. If you never use rrq_task_retry then this dereference never has an effect and we can skip it. Seevignette("fault-tolerance")
for more information.- check_version
Logical, indicating if we should check the schema version. You can pass
FALSE
here to continue even where the schema version is incompatible, though any subsequent actions may lead to corruption.
Task lifecycle
A task is queued with
$enqueue()
, at which point it becomesPENDING
Once a worker selects the task to run, it becomes
RUNNING
If the task completes successfully without error it becomes
COMPLETE
If the task throws an error, it becomes
ERROR
If the task was cancelled (e.g., via
$task_cancel()
) it becomesCANCELLED
If the task is killed by an external process, crashes or the worker dies (and is running a heartbeat) then the task becomes
DIED
.The status of an unknown task is
MISSING
Tasks in any terminal state (except
IMPOSSIBLE
) may be retried withtask_retry
at which point they becomeMOVED
, seevignette("fault-tolerance")
for details
Worker lifecycle
A worker appears and is
IDLE
When running a task it is
BUSY
If it receives a
PAUSE
message it becomesPAUSED
until it receives aRESUME
messageIf it exits cleanly (e.g., via a
STOP
message or a timeout) it becomesEXITED
If it crashes and was running a heartbeat, it becomes
LOST
Messages
Most of the time workers process tasks, but you can also send them "messages". Messages take priority over tasks, so if a worker becomes idle (by coming online or by finishing a task) it will consume all available messages before starting on a new task, even if both are available.
Each message has a "command" and may have "arguments" to that command. The supported messages are:
PING
(no args): "ping" the worker, if alive it will respond with "PONG"ECHO
(accepts an argument of a string): Print a string to the terminal and log of the worker. Will respond withOK
once the message has been printed.EVAL
(accepts a string or a quoted expression): Evaluate an arbitrary R expression on the worker. Responds with the value of this expression.STOP
(accepts a string to print as the worker exits, defaults to "BYE"): Tells the worker to stop.INFO
(no args): Returns information about the worker (versions of packages, hostname, pid, etc).PAUSE
(no args): Tells the worker to stop accepting tasks (until it receives aRESUME
message). Messages are processed as normal.RESUME
(no args): Tells a paused worker to resume accepting tasks.REFRESH
(no args): Tells the worker to rebuild their environment with thecreate
method.TIMEOUT_SET
(accepts a number, representing seconds): Updates the worker timeout - the length of time after which it will exit if it has not processed a task.TIMEOUT_GET
(no args): Tells the worker to respond with its current timeout.
Examples
# Create a new controller; the id will be specific to your
# application. Here, we use 'rrq:example'
obj <- rrq_controller("rrq:example")
# Create a task for this controller to work on:
t <- rrq_task_create_expr(runif(10), controller = obj)
# Wait for the task to complete
rrq_task_wait(t, controller = obj)
#> [1] TRUE
# Fetch the task's result
rrq_task_result(t, controller = obj)
#> [1] 0.22664047 0.26882991 0.45438132 0.12968307 0.63519476 0.80048620
#> [7] 0.14971195 0.99998005 0.11946089 0.06567408