A queue controller. Use this to interact with a queue/cluster.
A task is queued with $enqueue(), at which point it becomes PENDING.
Once a worker selects the task to run, it becomes RUNNING.
If the task completes successfully without error it becomes COMPLETE.
If the task throws an error, it becomes ERROR.
If the task was cancelled (e.g., via $task_cancel()) it becomes CANCELLED.
If the task is killed by an external process, crashes or the worker dies (and is running a heartbeat) then the task becomes DIED.
The status of an unknown task is MISSING.
Tasks in any terminal state (except IMPOSSIBLE) may be retried with $task_retry(), at which point they become MOVED; see vignette("fault-tolerance") for details.
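As a rough illustration of this lifecycle, the sketch below queues one task and looks at its status before and after waiting for it. It assumes `obj` is an rrq_controller connected to a running Redis server with at least one worker; `watch_task` and the queue id in the usage comment are hypothetical names, and only the $enqueue(), $task_status() and $task_wait() methods documented on this page are used.

```r
# Hypothetical helper (not part of rrq): queue a task and watch it move
# through the states described above. `obj` is assumed to be a live
# rrq_controller with at least one worker listening.
watch_task <- function(obj, timeout = 10) {
  id <- obj$enqueue(sqrt(4))
  before <- obj$task_status(id)   # typically "PENDING" or "RUNNING"
  value <- obj$task_wait(id, timeout = timeout)
  after <- obj$task_status(id)    # "COMPLETE" if the task succeeded
  list(id = id, before = before, value = value, after = after)
}

# Usage, once a controller exists (queue id is made up):
# obj <- rrq::rrq_controller$new("rrq:example")
# str(watch_task(obj))
```

Nothing is sent to Redis until the helper is called with a real controller.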
A worker appears and is IDLE.
When running a task it is BUSY.
If it receives a PAUSE message it becomes PAUSED until it receives a RESUME message.
If it exits cleanly (e.g., via a STOP message or a timeout) it becomes EXITED.
If it crashes and was running a heartbeat, it becomes LOST.
Most of the time workers process tasks, but you can also send them "messages". Messages take priority over tasks, so if a worker becomes idle (by coming online or by finishing a task) it will consume all available messages before starting on a new task, even if both are available.
Each message has a "command" and may have "arguments" to that command. The supported messages are:
PING (no args): "ping" the worker; if alive it will respond with "PONG".
ECHO (accepts an argument of a string): Print a string to the terminal and log of the worker. Will respond with OK once the message has been printed.
EVAL (accepts a string or a quoted expression): Evaluate an arbitrary R expression on the worker. Responds with the value of this expression.
STOP (accepts a string to print as the worker exits, defaults to "BYE"): Tells the worker to stop.
INFO (no args): Returns information about the worker (versions of packages, hostname, pid, etc).
PAUSE (no args): Tells the worker to stop accepting tasks (until it receives a RESUME message). Messages are processed as normal.
RESUME (no args): Tells a paused worker to resume accepting tasks.
REFRESH (no args): Tells the worker to rebuild its environment with the create method.
TIMEOUT_SET (accepts a number, representing seconds): Updates the worker timeout - the length of time after which it will exit if it has not processed a task.
TIMEOUT_GET (no args): Tells the worker to respond with its current timeout.
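The commands above might be sent along the following lines. This is only a sketch: it assumes `obj` is a live rrq_controller, and it assumes that $message_send() takes the command as its first argument and any argument to the command as `args`. Those argument names are not confirmed by this section, so check the method's own entry before relying on them. `pause_then_resume` is a hypothetical helper.

```r
# Hypothetical helper: pause all workers, then resume them.
# ASSUMPTION: $message_send() takes the command first and optional
# arguments as `args`; these names are not confirmed by the text above.
pause_then_resume <- function(obj) {
  obj$message_send("PAUSE")
  # ... workers still process messages here, but accept no new tasks ...
  obj$message_send("RESUME")
  invisible(obj)
}

# Under the same assumption, setting the idle timeout to 10 minutes:
# obj$message_send("TIMEOUT_SET", args = 600)
```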
The bulk interface is a bit more complicated than the basic enqueue interface. In the majority of cases you can ignore the details and use the lapply method in much the same way as you would in normal R. Assuming that obj is your rrq_controller object, a call such as obj$lapply(1:10, sqrt) will return the same thing as lapply(1:10, sqrt) (provided that you have a Redis server running and workers registered).
There is some sleight of hand here, though, as we need to identify that it is the symbol sqrt that matters, corresponding to the builtin sqrt function. You can make this more explicit by passing in the name of the function using $lapply_().
The same treatment applies to the dots: extra arguments may be passed through ..., for example supplying a local variable b as an argument to log. But this will look up the bindings of log and b in the context in which the call is made. This may not always do what is expected, so you can use the names directly with $lapply_().
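A sketch of the calls described in this section follows. The local `lapply()` results are plain base R and show what the queued versions should return. The helper `bulk_examples` is hypothetical, `obj` is assumed to be a live rrq_controller, and passing names to $lapply_() as quoted symbols via `quote()` is an assumption about how "using the names directly" is spelled.

```r
# Reference results computed locally, without rrq
b <- 2
expected_sqrt <- lapply(1:10, sqrt)
expected_log <- lapply(1:10, log, base = b)

# Hypothetical helper: `obj` is assumed to be a live rrq_controller
bulk_examples <- function(obj, b = 2) {
  list(
    # Non-standard evaluation: the symbols `sqrt`, `log` and `b` are
    # looked up in the calling context
    sqrt_nse = obj$lapply(1:10, sqrt),
    log_nse = obj$lapply(1:10, log, base = b),
    # ASSUMPTION: names are given to $lapply_() as quoted symbols
    sqrt_se = obj$lapply_(1:10, quote(sqrt)),
    log_se = obj$lapply_(1:10, quote(log), base = quote(b))
  )
}
```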
con
The redis connection. This is part of the public API and can be used to access the same redis database as the queue.
queue_id
The queue id used on creation. This is read-only after creation.
new()
Constructor
rrq_controller$new(
queue_id,
con = redux::hiredis(),
timeout_task_wait = NULL,
follow = NULL,
check_version = TRUE
)
queue_id
An identifier for the queue. This will prefix all keys in redis, so a prefix might be useful here depending on your use case (e.g. rrq:<user>:<id>).
con
A redis connection. The default tries to create a redis connection using default ports, or environment variables set as in redux::hiredis().
timeout_task_wait
An optional default timeout to use when waiting for tasks (e.g., with $task_wait(), $tasks_wait(), $lapply(), etc). If not given, then we fall back on the global option rrq.timeout_task_wait, and if that is not set,
follow
An optional default logical to use for tasks that may (or may not) be retried. If not given we fall back on the global option rrq.follow, and if that is not set then TRUE (i.e., we do follow). The value follow = TRUE is potentially slower than follow = FALSE for some operations because we need to dereference every task id. If you never use $task_retry then this dereference never has an effect and we can skip it. See vignette("fault-tolerance") for more information.
check_version
Check that the schema version is correct
to_v2()
Convert controller to the new-style object. Please don't use this in packages directly
destroy()
Entirely destroy a queue, by deleting all keys associated with it from the Redis database. This is a very destructive action and cannot be undone.
rrq_controller$destroy(
delete = TRUE,
worker_stop_type = "message",
timeout_worker_stop = 0
)
delete
Either TRUE (the default), indicating that the keys should be immediately deleted, or an integer value, in which case the keys will instead be marked for future deletion by "expiring" after this many seconds, using Redis' EXPIRE command.
worker_stop_type
Passed to $worker_stop(); can be one of "message", "kill" or "kill_local". The "kill" method requires that the workers are using a heartbeat, and "kill_local" requires that the workers are on the same machine as the controller. However, these may be faster to stop workers than "message", which will wait until any task is finished.
timeout_worker_stop
A timeout to pass to the worker to respond to the request to stop. See the timeout argument of $worker_stop() for details.
envir()
Register a function to create an environment when creating a worker. When a worker starts, it will run this function.
create
A function that will create an environment. It will be called with one parameter (an environment), in a fresh R session. The function rrq_envir() can be used to create a suitable function for the most common case (loading packages and sourcing scripts).
notify
Boolean, indicating if we should send a REFRESH message to all workers to update their environment.
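A minimal sketch of a hand-written `create` function, in case rrq_envir() does not cover your needs. The function takes the single environment argument described above and populates it; `rescale` is a made-up function used only for illustration, and `obj` in the final comment is assumed to be a live rrq_controller.

```r
# A `create` function receives one environment and fills it in.
# `rescale` is a made-up function used only for illustration.
create <- function(envir) {
  assign("rescale", function(x) x * 10, envir = envir)
  invisible(envir)
}

# The function can be checked locally before registering it
e <- new.env()
create(e)
e$rescale(2)

# Registering it with a live controller (assumed to be `obj`):
# obj$envir(create = create, notify = TRUE)
```

Testing the function against a throwaway environment first is cheaper than debugging it on a worker.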
enqueue()
Queue an expression
rrq_controller$enqueue(
expr,
envir = parent.frame(),
queue = NULL,
separate_process = FALSE,
timeout_task_run = NULL,
depends_on = NULL,
export = NULL
)
expr
Any R expression, unevaluated
envir
The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.
queue
The queue to add the task to; if not specified the "default" queue (which all workers listen to) will be used. If you have configured workers to listen to more than one queue you can specify that here. Be warned that if you push jobs onto a queue with no worker, it will queue forever.
separate_process
Logical, indicating if the task should be run in a separate process on the worker. If TRUE, then the worker runs the task in a separate process using the callr package. This means that the worker environment is completely clean, so subsequent runs are not affected by preceding ones. The downside of this approach is a considerable overhead in starting the external process and transferring data back.
timeout_task_run
Optionally, a maximum allowed running time, in seconds. This parameter only has an effect if separate_process is TRUE. If given, then if the task takes longer than this time it will be stopped and the task status set to TIMEOUT.
depends_on
Vector or list of IDs of tasks which must have completed before this job can be run. Once all dependent tasks have been successfully run, this task will get added to the queue. If the dependent task fails then this task will be removed from the queue.
export
Optionally a list of variables to export for the calculation. If given then no automatic analysis of the expression is done. It should be either a named list (name being the variable name, value being the value) or a character vector of variables that can be found immediately within envir. Use this where you have already done analysis of the expression (e.g., with the future package / globals) or where you want to avoid moving large objects through Redis that will be available on the remote workers due to how you have configured your worker environment.
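The depends_on and export arguments can be combined as in the sketch below, which queues a second task that waits for the first and receives its variable explicitly. `queue_pipeline` is a hypothetical helper and `obj` is assumed to be a live rrq_controller; only arguments listed in the signature above are used.

```r
# Hypothetical helper: queue a two-step pipeline on a live controller `obj`.
queue_pipeline <- function(obj) {
  # First task: runs as soon as a worker is free
  id_first <- obj$enqueue(Sys.sleep(1))

  # Second task: deferred until `id_first` has completed successfully.
  # `a` is supplied via `export`, so the expression is not analysed.
  id_second <- obj$enqueue(sum(1 + a),
                           depends_on = id_first,
                           export = list(a = 1:10))
  c(first = id_first, second = id_second)
}
```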
enqueue_()
Queue an expression
rrq_controller$enqueue_(
expr,
envir = parent.frame(),
queue = NULL,
separate_process = FALSE,
timeout_task_run = NULL,
depends_on = NULL,
export = NULL
)
expr
Any R expression, quoted; use this to use $enqueue in a programmatic context where you want to construct expressions directly (e.g., bquote(log(.(x)), list(x = 10))).
envir
The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.
queue
The queue to add the task to; if not specified the "default" queue (which all workers listen to) will be used. If you have configured workers to listen to more than one queue you can specify that here. Be warned that if you push jobs onto a queue with no worker, it will queue forever.
separate_process
Logical, indicating if the task should be run in a separate process on the worker (see $enqueue for details).
timeout_task_run
Optionally, a maximum allowed running time, in seconds (see $enqueue for details).
depends_on
Vector or list of IDs of tasks which must have completed before this job can be run. Once all dependent tasks have been successfully run, this task will get added to the queue. If the dependent task fails then this task will be removed from the queue.
export
Optionally a list of variables to export for the calculation. See $enqueue for details.
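To make the bquote() example concrete, the sketch below builds the quoted expression locally and evaluates it in base R. The commented lines show how it might then be passed to $enqueue_(), assuming `obj` is a live rrq_controller.

```r
# Build the expression programmatically; .(x) is replaced by the value 10
expr <- bquote(log(.(x)), list(x = 10))
print(expr)

# The result is an ordinary call object that can be evaluated locally...
local_value <- eval(expr)

# ...or handed to a live controller (assumed to be `obj`):
# id <- obj$enqueue_(expr)
# obj$task_wait(id)
```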
lapply()
Apply a function over a list of data. This is equivalent to using $enqueue() over each element in the list.
rrq_controller$lapply(
X,
FUN,
...,
dots = NULL,
envir = parent.frame(),
queue = NULL,
separate_process = FALSE,
timeout_task_run = NULL,
depends_on = NULL,
timeout_task_wait = NULL,
time_poll = 1,
progress = NULL,
delete = FALSE,
error = FALSE
)
X
A list of data to apply our function against
FUN
A function to be applied to each element of X
...
Additional arguments to FUN
dots
As an alternative to ..., you can provide the dots as a list of additional arguments. This may be easier to program against.
envir
The environment to use to try and find the function
queue
The queue to add the tasks to (see $enqueue for details).
separate_process
Logical, indicating if the task should be run in a separate process on the worker (see $enqueue for details).
timeout_task_run
Optionally, a maximum allowed running time, in seconds (see the timeout_task_run argument of $enqueue for details).
depends_on
Vector or list of IDs of tasks which must have completed before this job can be run. Once all dependent tasks have been successfully run, this task will get added to the queue. If the dependent task fails then this task will be removed from the queue. Dependencies are applied to all tasks added to the queue.
timeout_task_wait
Optional timeout, in seconds, after which an error will be thrown if all tasks have not completed. If given as 0, then we return a handle that can be used to check for tasks using $bulk_wait(). If not given, falls back on the controller's timeout_task_wait (see $new())
time_poll
Optional time with which to "poll" for completion (default is 1s, see $task_wait() for details)
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
delete
Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.
error
Optional logical, indicating if an error in the task should throw. Like $task_result() the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.
lapply_()
The "standard evaluation" version of $lapply(). This differs in how the function is found and how dots are passed. With this version, both are passed by value; this may create more overhead on the redis server as the values of the variables will be copied over rather than using their names if possible.
rrq_controller$lapply_(
X,
FUN,
...,
dots = NULL,
envir = parent.frame(),
queue = NULL,
separate_process = FALSE,
timeout_task_run = NULL,
depends_on = NULL,
timeout_task_wait = NULL,
time_poll = 1,
progress = NULL,
delete = FALSE,
error = FALSE
)
X
A list of data to apply our function against
FUN
A function to be applied to each element of X
...
Additional arguments to FUN
dots
As an alternative to ..., you can provide the dots as a list of additional arguments. This may be easier to program against.
envir
The environment to use to try and find the function
queue
The queue to add the tasks to (see $enqueue for details).
separate_process
Logical, indicating if the task should be run in a separate process on the worker (see $enqueue for details).
timeout_task_run
Optionally, a maximum allowed running time, in seconds (see the timeout_task_run argument of $enqueue for details).
depends_on
Vector or list of IDs of tasks which must have completed before this job can be run. Once all dependent tasks have been successfully run, this task will get added to the queue. If the dependent task fails then this task will be removed from the queue. Dependencies are applied to all tasks added to the queue.
timeout_task_wait
Optional timeout, in seconds, after which an error will be thrown if all tasks have not completed. If given as 0, then we return a handle that can be used to check for tasks using $bulk_wait(). If not given, falls back on the controller's timeout_task_wait (see $new())
time_poll
Optional time with which to "poll" for completion (default is 1s, see $task_wait() for details)
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
delete
Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.
error
Optional logical, indicating if an error in the task should throw. Like $task_result() the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.
enqueue_bulk()
Send a bulk set of tasks to your workers. This function is a bit like a mash-up of Map and do.call, when used with a data.frame argument, which is typically what is provided. Rather than $lapply(), which applies FUN to each element of X, enqueue_bulk will apply over each row of X, spreading the columns out as arguments. If you have a function f(a, b) and a data.frame with columns a and b, this should feel intuitive.
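The row-wise behaviour can be pictured with a purely local, base R equivalent. The sketch below does not use rrq at all: it applies a made-up function `f` over the rows of a small data.frame the way the description above suggests, and the commented line shows the queued counterpart, assuming `obj` is a live rrq_controller.

```r
f <- function(a, b) a + b
X <- data.frame(a = 1:3, b = c(10, 20, 30))

# Local, sequential equivalent of applying `f` over the rows of `X`:
# each row becomes one call, with column names matched to argument names.
local_result <- lapply(seq_len(nrow(X)), function(i) {
  do.call(f, as.list(X[i, , drop = FALSE]))
})

# With a live controller (assumed to be `obj`) the same work would be
# queued as:
# obj$enqueue_bulk(X, f)
```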
rrq_controller$enqueue_bulk(
X,
FUN,
...,
dots = NULL,
envir = parent.frame(),
queue = NULL,
separate_process = FALSE,
timeout_task_run = NULL,
depends_on = NULL,
timeout_task_wait = NULL,
time_poll = 1,
progress = NULL,
delete = FALSE,
error = FALSE
)
X
Typically a data.frame, which you want to apply FUN over, row-wise. The names of the data.frame must match the arguments of your function.
FUN
A function
...
Additional arguments to add to every call to FUN
dots
As an alternative to ..., you can provide the dots as a list of additional arguments. This may be easier to program against.
envir
The environment to use to try and find the function
queue
The queue to add the tasks to (see $enqueue for details).
separate_process
Logical, indicating if the task should be run in a separate process on the worker (see $enqueue for details).
timeout_task_run
Optionally, a maximum allowed running time, in seconds (see the timeout_task_run argument of $enqueue for details).
depends_on
Vector or list of IDs of tasks which must have completed before this job can be run. Once all dependent tasks have been successfully run, this task will get added to the queue. If the dependent task fails then this task will be removed from the queue. Dependencies are applied to all tasks added to the queue.
timeout_task_wait
Optional timeout, in seconds, after which an error will be thrown if all tasks have not completed. If given as 0, then we return a handle that can be used to check for tasks using $bulk_wait(). If not given, falls back on the controller's timeout_task_wait (see $new())
time_poll
Optional time with which to "poll" for completion (default is 1s, see $task_wait() for details)
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
delete
Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.
error
Optional logical, indicating if an error in the task should throw. Like $task_result() the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.
enqueue_bulk_()
The "standard evaluation" version of $enqueue_bulk(). This differs in how the function is found and how dots are passed. With this version, both are passed by value; this may create more overhead on the redis server as the values of the variables will be copied over rather than using their names if possible.
rrq_controller$enqueue_bulk_(
X,
FUN,
...,
dots = NULL,
envir = parent.frame(),
queue = NULL,
separate_process = FALSE,
timeout_task_run = NULL,
depends_on = NULL,
timeout_task_wait = NULL,
time_poll = 1,
progress = NULL,
delete = delete,
error = error
)
X
Typically a data.frame, which you want to apply FUN over, row-wise. The names of the data.frame must match the arguments of your function.
FUN
A function
...
Additional arguments to add to every call to FUN
dots
As an alternative to ..., you can provide the dots as a list of additional arguments. This may be easier to program against.
envir
The environment to use to try and find the function
queue
The queue to add the tasks to (see $enqueue for details).
separate_process
Logical, indicating if the task should be run in a separate process on the worker (see $enqueue for details).
timeout_task_run
Optionally, a maximum allowed running time, in seconds (see the timeout_task_run argument of $enqueue for details).
depends_on
Vector or list of IDs of tasks which must have completed before this job can be run. Once all dependent tasks have been successfully run, this task will get added to the queue. If the dependent task fails then this task will be removed from the queue. Dependencies are applied to all tasks added to the queue.
timeout_task_wait
Optional timeout, in seconds, after which an error will be thrown if all tasks have not completed. If given as 0, then we return a handle that can be used to check for tasks using $bulk_wait(). If not given, falls back on the controller's timeout_task_wait (see $new())
time_poll
Optional time with which to "poll" for completion (default is 1s, see $task_wait() for details)
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
delete
Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.
error
Optional logical, indicating if an error in the task should throw. Like $task_result() the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.
bulk_wait()
Wait for a group of tasks
rrq_controller$bulk_wait(
x,
timeout = NULL,
time_poll = 1,
progress = NULL,
delete = FALSE,
error = FALSE,
follow = NULL
)
x
An object of class rrq_bulk, as created by $lapply()
timeout
Optional timeout, in seconds, after which an error will be thrown if the task has not completed.
time_poll
Optional time with which to "poll" for completion (default is 1s, see $task_wait() for details)
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
delete
Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.
error
Optional logical, indicating if an error in the task should throw. Like $task_result() the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.
follow
Optional logical, indicating if we should follow any redirects set up by doing $task_retry. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
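A sketch of the non-blocking pattern that $bulk_wait() supports: submit with timeout_task_wait = 0 to get a handle back, do something else, then collect. `submit_then_collect` is a hypothetical helper and `obj` is assumed to be a live rrq_controller with workers available.

```r
# Hypothetical helper: submit without blocking, then collect later.
# `obj` is assumed to be a live rrq_controller.
submit_then_collect <- function(obj, timeout = 60) {
  # timeout_task_wait = 0 returns an rrq_bulk handle instead of results
  grp <- obj$lapply(1:10, sqrt, timeout_task_wait = 0)

  # ... do other work here ...

  # Block for up to `timeout` seconds, deleting the tasks once collected
  obj$bulk_wait(grp, timeout = timeout, delete = TRUE)
}
```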
task_status()
Return a character vector of task statuses. The name of each element corresponds to a task id, and the value will be one of the possible statuses ("PENDING", "COMPLETE", etc).
task_ids
Optional character vector of task ids for which you would like statuses. If not given (or NULL) then the status of all task ids known to this rrq controller is returned.
follow
Optional logical, indicating if we should follow any redirects set up by doing $task_retry. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
task_progress()
Retrieve task progress, if set. This will be NULL if progress has never been registered, otherwise whatever value was set - can be an arbitrary R object.
task_overview()
Provide a high level overview of task statuses for a set of task ids, being the count in major categories of PENDING, RUNNING, COMPLETE and ERROR.
task_position()
Find the position of one or more tasks in the queue.
task_ids
Character vector of tasks to find the position for.
missing
Value to return if the task is not found in the queue. A task will take value missing if it is running, complete, errored, deferred etc and a positive integer if it is in the queue, indicating its position (with 1 being the next task to run).
queue
The name of the queue to query (defaults to the "default" queue).
follow
Optional logical, indicating if we should follow any redirects set up by doing $task_retry. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
task_preceeding()
List the tasks in front of task_id in the queue. If the task is missing from the queue this will return NULL. If the task is next in the queue this will return an empty character vector.
task_id
Task to find the position for.
queue
The name of the queue to query (defaults to the "default" queue).
follow
Optional logical, indicating if we should follow any redirects set up by doing $task_retry. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
task_result()
Get the result for a single task (see $tasks_result for a method for efficiently getting multiple results at once). Returns the value of running the task if it is complete, and an error otherwise.
task_id
The single id for which the result is wanted.
error
Logical, indicating if we should throw an error if a task was not successful. By default (error = FALSE), in the case of the task result returning an error we return an object of class rrq_task_error, which contains information about the error. Passing error = TRUE simply calls stop() on this error if it is returned.
follow
Optional logical, indicating if we should follow any redirects set up by doing $task_retry. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
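Because the default is to return the error object and not throw, callers usually need to check the class of the result. A minimal sketch, assuming `obj` is a live rrq_controller and the task has finished; `result_or_default` is a hypothetical helper.

```r
# Hypothetical helper: fetch a result and fall back to `default` on failure.
# `obj` is assumed to be a live rrq_controller.
result_or_default <- function(obj, task_id, default = NULL) {
  res <- obj$task_result(task_id, error = FALSE)
  if (inherits(res, "rrq_task_error")) {
    # The task failed; the error object was returned and not thrown
    message("Task ", task_id, " failed, using default value")
    return(default)
  }
  res
}
```

Passing error = TRUE would let the failure propagate as an ordinary R error, which suits scripts that should stop on the first problem.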
tasks_result()
Get the results of a group of tasks, returning them as a list.
task_ids
A vector of task ids for which the task result is wanted.
error
Logical, indicating if we should throw an error if the task was not successful. See $task_result() for details.
follow
Optional logical, indicating if we should follow any redirects set up by doing $task_retry. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
task_wait()
Poll for a task to complete, returning the result when completed. If the task has already completed this is roughly equivalent to $task_result. See $tasks_wait for an efficient way of doing this for a group of tasks.
rrq_controller$task_wait(
task_id,
timeout = NULL,
time_poll = 1,
progress = NULL,
error = FALSE,
follow = NULL
)
task_id
The single id that we will wait for
timeout
Optional timeout, in seconds, after which an error will be thrown if the task has not completed. If not given, falls back on the controller's timeout_task_wait (see $new())
time_poll
Optional time with which to "poll" for completion. By default this will be 1 second; this is the time that each request for a completed task may block for (however, if the task is finished before this, the actual time waited for will be less). Increasing this will reduce the responsiveness of your R session to interrupting, but will cause slightly less network load. Values less than 1s are not currently supported as this requires a very recent Redis server.
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
error
Logical, indicating if we should throw an error if the task was not successful. See $task_result() for details. Note that an error is always thrown if not all tasks are fetched in time.
follow
Optional logical, indicating if we should follow any redirects set up by doing $task_retry. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
tasks_wait()
Poll for a group of tasks to complete, returning the result as a list when completed. If the tasks have already completed this is roughly equivalent to $tasks_result.
rrq_controller$tasks_wait(
task_ids,
timeout = NULL,
time_poll = 1,
progress = NULL,
error = FALSE,
follow = NULL
)
task_ids
A vector of task ids to poll for
timeout
Optional timeout, in seconds, after which an error will be thrown if the task has not completed. If not given, falls back on the controller's timeout_task_wait (see $new())
time_poll
Optional time with which to "poll" for completion (default is 1s, see $task_wait() for details)
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
error
Logical, indicating if we should throw an error if the task was not successful. See $task_result() for details. Note that an error is always thrown if not all tasks are fetched in time.
follow
Optional logical, indicating if we should follow any redirects set up by doing $task_retry. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
task_cancel()
Cancel a single task. If the task is PENDING it will be unqueued and the status set to CANCELLED. If RUNNING then the task will be stopped if it was set to run in a separate process (i.e., queued with separate_process = TRUE). Dependent tasks will be marked as IMPOSSIBLE.
task_info()
Return information about a task. This currently includes information about where a task is (or was) running and information about any retry chain, but will expand in future. The format of the output here is subject to change (and will probably get a nice print method) but the values present in the output will be included in any future update.
task_times()
Fetch times for tasks at points in their life cycle. For each task returns the time of submission, starting and completion (not necessarily successfully; this includes errors and interruptions). If a task has not reached a point yet (e.g., submitted but not run, or running but not finished) the time will be NA. Times are returned in unix timestamp format in UTC; you can use redux::redis_time_to_r to convert them to a POSIXt object.
task_ids
Task ids to fetch times for.
follow
Optional logical, indicating if we should follow any redirects set up by doing $task_retry. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
task_retry()
Retry a task (or set of tasks). Typically this is after failure (e.g., ERROR, DIED or similar) but you can retry even successfully completed tasks. Once retried, methods that retrieve information about a task (e.g., $task_status(), $task_result()) will behave differently depending on the value of their follow argument. See vignette("fault-tolerance") for more details.
deferred_list()
Return deferred tasks and what they are waiting on. Note this is in an arbitrary order, tasks will be added to the queue as their dependencies are satisfied.
worker_info()
Returns a list of information about active workers (or exited workers if worker_ids includes them).
worker_log_tail()
Returns the last (few) elements in the worker log. The log will be returned as a data.frame of entries worker_id (the worker id), child (the process id, an integer, where logs come from a child process from a task queued with separate_process = TRUE), time (the time in Redis when the event happened; see redux::redis_time to convert this to an R time), command (the worker command) and message (the message corresponding to that command).
worker_stop()
Stop workers.
rrq_controller$worker_stop(
worker_ids = NULL,
type = "message",
timeout = 0,
time_poll = 0.05,
progress = NULL
)
worker_ids
Optional vector of worker ids. If NULL
then
all active workers will be stopped.
type
The strategy used to stop the workers. Can be message
,
kill
or kill_local
(see details).
timeout
Optional timeout; if greater than zero then we poll
for a response from the worker for this many seconds until they
acknowledge the message and stop (only has an effect if type
is message
). If a timeout of greater than zero is given, then
for a message
-based stop we wait up to this many seconds for the
worker to exit. That means that we might wait up to 2 * timeout
seconds for this function to return.
time_poll
If type is message and timeout is greater than zero, this is the polling interval used between Redis calls. Increasing this reduces network load but decreases the ability to interrupt the process.
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
The type parameter indicates the strategy used to stop workers, and interacts with other parameters. The strategies used by the different values are:
message, in which case a STOP message will be sent to the worker, which it will receive after finishing any currently running task (if RUNNING; IDLE workers will stop immediately).
kill, in which case a kill signal will be sent via the heartbeat (if the worker is using one). This will kill the worker even if it is currently working on a task, eventually leaving that task with a status of DIED.
kill_local, in which case a kill signal is sent using operating system signals, which requires that the worker is on the same machine as the controller.
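The choice between a message-based stop and a heartbeat kill can be sketched as below. The wrapper stop_workers and its graceful argument are hypothetical; only the worker_ids, type and timeout arguments of $worker_stop() are taken from the usage shown above, and obj is assumed to be an rrq_controller.

```r
# Hypothetical wrapper, not part of rrq. `obj` is assumed to be an
# rrq_controller.
stop_workers <- function(obj, worker_ids = NULL, graceful = TRUE,
                         timeout = 10) {
  if (graceful) {
    # Workers finish any running task first; the call may block for up to
    # 2 * timeout seconds, as described for the timeout argument
    obj$worker_stop(worker_ids = worker_ids, type = "message",
                    timeout = timeout)
  } else {
    # Requires workers that run a heartbeat; running tasks end up as DIED
    obj$worker_stop(worker_ids = worker_ids, type = "kill")
  }
}
```

A graceful stop is usually the safer default because no task is interrupted; the kill strategy is for workers stuck on a task that will not finish.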
worker_process_log()
Return the contents of a worker's process log, if it is located on the same physical storage (including network storage) as the controller. This will generally work for workers started with rrq_worker_spawn but may require significant care otherwise.
worker_config_save()
Save a worker configuration, which can be used to start workers with a set of options from the CLI. These options correspond to arguments to rrq_worker.
name
Name for this configuration
config
A worker configuration, created by rrq_worker_config()
overwrite
Logical, indicating if an existing configuration with this name should be overwritten. If FALSE, then the configuration is not updated, even if it differs from the version currently saved.
worker_config_read()
Return the value of a worker configuration saved by $worker_config_save
message_send()
Send a message to workers. Sending a message returns a message id, which can be used to poll for a response with the other message_* methods.
message_get_response()
Get response to messages, waiting until the message has been responded to.
rrq_controller$message_get_response(
  message_id,
  worker_ids = NULL,
  named = TRUE,
  delete = FALSE,
  timeout = 0,
  time_poll = 0.05,
  progress = NULL
)
message_id
The message id
worker_ids
Optional vector of worker ids. If NULL then all active workers are used (note that this may differ from the set of workers that the message was sent to!)
named
Logical, indicating if the return value should be named by worker id.
delete
Logical, indicating if messages should be deleted after retrieval
timeout
Integer, representing seconds to wait until the response has been received. An error will be thrown if a response has not been received in this time.
time_poll
If timeout is greater than zero, this is the polling interval used between Redis calls. Increasing this reduces network load but may increase the time spent waiting.
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
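The send-then-poll pattern can be sketched as follows. Because the default worker_ids = NULL in $message_get_response() means "all currently active workers", which may not match the set the message went to, this sketch passes the same explicit ids to both calls. The helper name ping_workers is hypothetical, obj is assumed to be an rrq_controller, and the command and worker_ids argument names of $message_send() are assumed to match those shown for $message_send_and_wait().

```r
# Hypothetical helper, not part of rrq. `obj` is assumed to be an
# rrq_controller and `worker_ids` a character vector of worker ids.
ping_workers <- function(obj, worker_ids, timeout = 5) {
  # Assumption: $message_send() accepts `command` and `worker_ids`
  message_id <- obj$message_send(command = "PING", worker_ids = worker_ids)
  # Poll the same workers that the message was sent to; an error is
  # thrown if a response has not arrived within `timeout` seconds
  obj$message_get_response(message_id, worker_ids = worker_ids,
                           named = TRUE, delete = TRUE, timeout = timeout)
}
```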
message_send_and_wait()
Send a message and wait for responses. This is a helper function around message_send and message_get_response.
rrq_controller$message_send_and_wait(
  command,
  args = NULL,
  worker_ids = NULL,
  named = TRUE,
  delete = TRUE,
  timeout = 600,
  time_poll = 0.05,
  progress = NULL
)
command
A command, such as PING or PAUSE; see the Messages section of the Details for all messages.
args
Arguments to the command, if supported
worker_ids
Optional vector of worker ids to send the message to. If NULL then the message will be sent to all active workers.
named
Logical, indicating if the return value should be named by worker id.
delete
Logical, indicating if messages should be deleted after retrieval
timeout
Integer, representing seconds to wait until the response has been received. An error will be thrown if a response has not been received in this time.
time_poll
If timeout is greater than zero, this is the polling interval used between Redis calls. Increasing this reduces network load but may increase the time spent waiting.
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
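As an illustration of the combined helper, the sketch below uses an EVAL message to collect the R version string from each worker. The function name worker_r_versions is hypothetical, obj is assumed to be an rrq_controller, and the response is assumed to be a list with one element per worker (named by worker id when named = TRUE).

```r
# Hypothetical helper, not part of rrq. `obj` is assumed to be an
# rrq_controller.
worker_r_versions <- function(obj, worker_ids = NULL, timeout = 30) {
  # EVAL accepts a string holding an R expression to run on each worker
  res <- obj$message_send_and_wait("EVAL", args = "R.version.string",
                                   worker_ids = worker_ids, named = TRUE,
                                   timeout = timeout)
  # Assumption: one response per worker, each a single string
  vapply(res, function(x) as.character(x), character(1))
}
```

Messages are only consumed when a worker is between tasks, so a timeout shorter than the longest running task may cause this call to fail.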