Getting started
This document assumes that you have a Redis server running. If not, see the bottom of the document for options for installing one on your own system. You can test whether your Redis server is behaving as expected by running
redux::hiredis()$PING()
#> [Redis: PONG]
If you get an error like “Connection refused” then check your installation.
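If you would rather check for the server from a script than read an error message, the PING call above can be wrapped in a small helper. This is only a sketch: the name redis_available is made up for illustration, and it assumes that a failed connection surfaces as an ordinary R error.

```r
# Hypothetical helper: TRUE if a Redis server answers on the default
# host/port, FALSE if redux is missing or the connection fails.
redis_available <- function() {
  if (!requireNamespace("redux", quietly = TRUE)) {
    return(FALSE)
  }
  tryCatch({
    redux::hiredis()$PING()
    TRUE
  }, error = function(e) FALSE)
}

redis_available()
```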
The package is designed to be easy to get started with, and has features that you might like to use later. If you run the “Hello world” section you probably have 90% of what you need; the rest of the document shows features that will help you bend that around your specific needs.
Hello world
Without any great explanation, here is the basic approach to using rrq to run a task on another R process, asynchronously. First, we create a “controller” object which you can use for queuing tasks.
library(rrq)
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
This controller uses an “identifier” (here, id is rrq:d87ae841) which can be anything you want but acts like a folder within the Redis server, distinguishing your queue from any others hosted on the same server.
We will set this as the default controller to use, which means we can avoid passing in controller = obj to all the calls below:
rrq_default_controller_set(obj)
Submit a task to the queue with rrq_task_create_expr(), returning a key for that task:
t <- rrq_task_create_expr(1 + 1)
t
#> [1] "e570f99803eb3c90bd37fcdfde5db9e2"
We’ll also need some worker processes to carry out our tasks. Here, we’ll spawn two for now (see the section below on alternatives to this)
w <- rrq_worker_spawn(2)
#> ℹ Spawning 2 workers with prefix 'nonspheric_siberiantiger'
Wait for the task to complete:
rrq_task_wait(t)
#> [1] TRUE
then retrieve the result
rrq_task_result(t)
#> [1] 2
Things to note here:
- Every task is given a unique identifier, which can be used to query the task later (much more on this below)
- The process with the controller is not blocked during this operation and could carry out any other calculation it wanted (it could even be closed and the controller recreated later)
- We submitted the tasks before we had workers ready; we could have done this in the other order, and workers can be added and removed at will regardless of the state of the queue (see below)
Design
For years, the parallel package has provided users with the ability to run tasks in parallel with very little setup. Given a list of data x and some function fun, one can change from the serial code
lapply(x, fun)
to run in parallel given a cluster object cl
parallel::parLapply(cl, x, fun)
(the even simpler parallel::mclapply(x, fun) can be used on platforms other than Windows with reasonable success). Nice as this is, it suffers some drawbacks, most of which follow from the simple blocking interface:
- The R session is blocked while the calculations run, even though the host session is doing (essentially) nothing
- The number of workers (size of the cluster) is fixed at the point where parLapply has been called, and is restricted to a single node without considerable effort. One cannot add workers to the cluster while it runs, or remove unneeded ones as tasks finish.
- It is not possible to retrieve partially completed results; we have to wait for all the tasks to complete before working with any
- If tasks are of uneven size, the load balancing form (parallel::parLapplyLB) is quite slow
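To make the comparison concrete, here is a minimal sketch using only the base parallel package. The data x and the function fun are made up for illustration. Note that the parLapply() call does not return until every element has been processed, which is the blocking behaviour described above.

```r
library(parallel)

# Illustrative data and function (not from rrq)
x <- list(1, 2, 3, 4)
fun <- function(v) v^2

# Serial version
serial <- lapply(x, fun)

# Parallel version on a two-worker local cluster; the session blocks
# here until all results are back, and the cluster is always stopped
cl <- makeCluster(2)
par_res <- tryCatch(parLapply(cl, x, fun), finally = stopCluster(cl))

identical(serial, par_res)
```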
As such it is hard to build interfaces like queues or work through dependency graphs (though see heroic work in future and targets). Attempts at doing this run into the problem of where to store the data and the queue so that multiple worker processes can safely interact with the queue without corrupting the database or hitting race conditions. Approaches like liteq may not work on network file systems, and therefore become limited to a single node.
At the other end of the scale, HPC systems with their schedulers can avoid all these issues, but with byzantine interfaces and slow per-task submission.
Notable features of rrq which motivate its development within this landscape:
- Uses Redis as a task broker; this provides a database to hold tasks that allows very fast network based access to many processes
- Uses a non-blocking design where control is returned to the controller process immediately while tasks run in the background
- Supports a scalable worker pool; no workers are needed for tasks to be queued, and workers can be added or removed at will while tasks are running
- Allows for multiple non-interacting queues, multiple priority levels within a queue, and automatically resolved dependencies among completing tasks
- Supports a fully load-balanced design with overheads of around 1ms per task
- Supports optionally running each task in a separate process, allowing for strong isolation between tasks as well as per-task timeouts, logging, and allowing cancellation
- Allows rich querying of task status and progress and worker history
- Exposes both low-level primitives for working with individual tasks as well as higher level interfaces that mimic functions like lapply
Running tasks
Running tasks is a little different to many R parallel backends, because we do not directly allocate tasks to our workers, but simply place them on a first-in-first-out task queue. A pool of workers will then poll for work from this queue. See vignette("design") for more on this.
Consider enqueuing this expression:
t <- rrq_task_create_expr({
Sys.sleep(2)
runif(1)
})
This has created a task that will sleep for 2 seconds then return a random number.
Initially the task has status RUNNING (it will be PENDING very briefly):
rrq_task_status(t)
#> [1] "RUNNING"
Then after a couple of seconds it will complete (we pad this out here so that it will complete even on slow systems)
Sys.sleep(3)
rrq_task_result(t)
#> [1] 0.8871164
The basic task lifecycle is this:
- when created/submitted a task becomes PENDING
- when it is picked up by a worker it becomes RUNNING
- when it completes it will become COMPLETE or ERROR
In addition there are rarer ways a task can end (CANCELLED, DIED, TIMEOUT) or fail to start due to dependencies between tasks (DEFERRED or IMPOSSIBLE; see below).
A task in any terminal state except IMPOSSIBLE (so COMPLETE, ERROR, CANCELLED, DIED or TIMEOUT) can be retried, at which point the status is MOVED and the task will “point” somewhere else (that task will move through the usual PENDING -> RUNNING -> (terminal state) flow). See vignette("fault-tolerance") for details.
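The status names above can be collected into a pair of small helpers for use in your own scripts. This is a sketch only: the helper names are invented here, and the sets simply transcribe the statuses listed in the text (MOVED is left out because the text does not describe it as terminal).

```r
# Statuses transcribed from the lifecycle description above
terminal_status <- c("COMPLETE", "ERROR", "CANCELLED", "DIED", "TIMEOUT",
                     "IMPOSSIBLE")
# Every terminal state except IMPOSSIBLE can be retried
retriable_status <- setdiff(terminal_status, "IMPOSSIBLE")

# Hypothetical helpers, vectorised over a character vector of statuses
is_terminal <- function(status) status %in% terminal_status
is_retriable <- function(status) status %in% retriable_status

is_terminal(c("PENDING", "RUNNING", "DIED"))
```

With a running queue these could be applied to the value returned by rrq_task_status(), for example is_terminal(rrq_task_status(t)).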
Above, we slept for a few seconds in order for the task to become finished. However, this is an extremely common operation, so rrq provides a function rrq_task_wait() which will wait until a task finishes and then return a logical indicating whether the task succeeded, after which you can fetch the result with rrq_task_result().
t <- rrq_task_create_expr({
Sys.sleep(2)
runif(1)
})
rrq_task_wait(t)
#> [1] TRUE
rrq_task_result(t)
#> [1] 0.992196
The polling interval here is 1 second by default, but if the task completes within that period it will still be returned as soon as it is complete (the interval is just the time between progress bar updates and the period where an interrupt would be caught to cancel the wait).
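Because rrq_task_wait() returns a logical, it is natural to branch on it before using the result. The wrapper below is a sketch, not part of rrq: the name wait_and_fetch is made up, it relies on a default controller having been set, and it assumes that rrq_task_result() returns whatever was stored for a failed task so that it can be inspected.

```r
# Hypothetical wrapper: wait for a task, warn if it did not succeed,
# and return the stored result either way.
wait_and_fetch <- function(task_id, timeout = 10) {
  ok <- rrq::rrq_task_wait(task_id, timeout = timeout)
  if (!ok) {
    warning("Task ", task_id, " did not succeed; inspect the returned value")
  }
  rrq::rrq_task_result(task_id)
}
```

Usage would then be a single call such as wait_and_fetch(t) in place of the separate wait and result calls.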
Configuring the worker environment
It is rare that we want our workers to run in completely empty R environments (no extra loaded packages, no custom functions available). Quite often you will want to run something to configure the workers before they accept tasks.
In order to do this, first define a function that will accept one argument which will be the environment that the worker will use, and then set that environment up.
For the most common case, where you have script files that contain function definitions and you have a set of packages to load, rrq has a helper function rrq_envir().
So, for example, suppose we want to source a file “myfuns.R” which contains some code
slowdouble <- function(x) {
Sys.sleep(x)
x * 2
}
We might write:
create <- rrq_envir(sources = "myfuns.R")
The next step is to register this function for your queue:
rrq_worker_envir_set(create)
By default, this will notify all running workers to update their environment. Note that if your function errors in any way, your workers will exit!
rrq_worker_log_tail(n = 4)
#> worker_id child time command message
#> 1 nonspheric_siberiantiger_1 NA 1726673864 MESSAGE REFRESH
#> 2 nonspheric_siberiantiger_2 NA 1726673864 MESSAGE REFRESH
#> 3 nonspheric_siberiantiger_1 NA 1726673864 ENVIR new
#> 4 nonspheric_siberiantiger_2 NA 1726673864 ENVIR new
#> 5 nonspheric_siberiantiger_2 NA 1726673864 ENVIR create
#> 6 nonspheric_siberiantiger_1 NA 1726673864 ENVIR create
#> 7 nonspheric_siberiantiger_2 NA 1726673864 RESPONSE REFRESH
#> 8 nonspheric_siberiantiger_1 NA 1726673864 RESPONSE REFRESH
Now our workers have picked up our functions we can start using them:
t <- rrq_task_create_expr(slowdouble(1))
rrq_task_wait(t)
#> [1] TRUE
If you need more control you can write your own function. We could have written create as
create <- function(env) {
sys.source("myfuns.R", env)
}
This approach would also allow you to do something like read an rds or RData file containing a large object that you want every worker to have a copy of.
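As a sketch of that idea, the function below sources the function definitions and also loads a large object into the worker environment. The file name big_object.rds and the variable name big_object are hypothetical. The demonstration runs entirely locally in a temporary directory; with real workers the files would need to be reachable from each worker's working directory.

```r
# Custom environment-creation function: source code, then load data.
# "big_object.rds" is a made-up file name for illustration.
create <- function(env) {
  sys.source("myfuns.R", envir = env)
  env$big_object <- readRDS("big_object.rds")
}

# Local demonstration, without any workers, in a temporary directory
old <- setwd(tempdir())
writeLines(c("slowdouble <- function(x) {",
             "  Sys.sleep(x)",
             "  x * 2",
             "}"), "myfuns.R")
saveRDS(runif(1000), "big_object.rds")

env <- new.env()
create(env)
setwd(old)

ls(env)
```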
Scheduling options
The rrq package does not aspire to be a fully fledged scheduler, but sometimes a little more control than first-in-first-out is required. There are a few options available that allow the user to control how tasks are run when needed. These involve:
- blocking tasks so that they run only after other tasks are completed (e.g., a set up task followed by a series of computational tasks, followed by an aggregation task)
- multiple queues with different priorities, or that different workers subscribe to (e.g., fast/slow high-priority/low-priority tasks, or tasks that require a limited resource)
- put tasks in the front of the queue, allowing a last-in-first-out queue
Tasks that depend on other tasks
We support a simple system for allowing tasks to depend on other tasks. An example of this might be where you need to download a file, then run a series of analyses on it. Or where you want to run an analysis over a set of parameters, and then aggregate once they’re all done. How the output of one task feeds into the others is up to you, but practically this will require one of the following options:
- write your results to disk and read them back
- store results in a database (e.g., Redis!) at the end of one task, read them in the next
- connect to the queue from your task itself
When queueing a task, you can provide a vector of task identifiers as the depends_on argument. These identifiers must all be known to rrq and the task will not be started until all these prerequisites are completed. The task lifecycle will look different to the above; rather than starting as PENDING the task begins as DEFERRED.
Once all prerequisites are complete, a task becomes possible and it moves from DEFERRED to PENDING. It will be placed at the front of the queue.
If a prerequisite task fails for any reason (an error, is cancelled, or its worker dies) then the task will become IMPOSSIBLE.
For example, suppose that we have code:
create <- function(n) {
saveRDS(runif(n), "numbers.rds")
}
use <- function(i) {
d <- readRDS("numbers.rds")
d[[i]]
}
Here we have some function create that we want to run first, doing some setup, then another function use that we want to run after which will read the result of running create and do some analysis on it.
Create an rrq_controller object and tell workers to read the deps.R file which contains these function definitions:
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)
rrq_worker_envir_set(rrq_envir(sources = "deps.R"))
source("deps.R")
We can then enqueue our first task:
id <- rrq_task_create_expr(create(5))
Then use this id as the dependency of a second task:
id_use <- rrq_task_create_expr(use(2), depends_on = id)
The status of the first task will be PENDING, per usual:
rrq_task_status(id)
#> [1] "PENDING"
however, the second task will be DEFERRED because it is not yet in the queue:
rrq_task_status(id_use)
#> [1] "DEFERRED"
rrq_queue_list()
#> [1] "eca044a6a34e418010557c74577c7ec8"
Once the first task is processed by a worker, the status changes:
rrq_task_status(id)
#> [1] "COMPLETE"
rrq_task_status(id_use)
#> [1] "PENDING"
rrq_queue_list()
#> [1] "2408d04a86de82026ad5efe254d6d131"
At this point the second task will proceed through the queue as usual.
Points to note here:
- The deferred tasks cannot access the queued result of the blocking task (see above)
- The deferred tasks will be added to the front of the queue after they become undeferred
- The deferred tasks can’t be queued until the blocking tasks have returned an id
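The same mechanism extends to the “set up, compute, aggregate” pattern mentioned earlier, since depends_on accepts a vector of task ids. The function below is a sketch under stated assumptions: the name submit_pipeline is invented, it relies on a default controller being set and on create() and use() from deps.R being available as in the example above, and the final aggregation expression is purely illustrative.

```r
# Hypothetical wrapper submitting a small dependency graph:
# one setup task, two tasks that need it, and one final task that
# only becomes PENDING once both intermediate tasks have completed.
submit_pipeline <- function() {
  id_create <- rrq::rrq_task_create_expr(create(5))
  id_a <- rrq::rrq_task_create_expr(use(1), depends_on = id_create)
  id_b <- rrq::rrq_task_create_expr(use(2), depends_on = id_create)
  id_final <- rrq::rrq_task_create_expr(
    sum(readRDS("numbers.rds")),
    depends_on = c(id_a, id_b))
  c(create = id_create, a = id_a, b = id_b, final = id_final)
}
```

If the setup task fails, the lifecycle described above implies that the tasks depending on it would become IMPOSSIBLE rather than run.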
Multiple queues
Sometimes it is useful to have different workers listen on different queues. For example, you may have workers on different machines with different capabilities (e.g., a machine with a GPU or high memory). You may have tasks that are expected to take quite a long time but want some workers to monitor a fast queue with short lived tasks.
Every worker listens to the default queue, but when starting a worker, you can add additional queues and control the priority order of these queues for that worker. When submitting tasks you then specify the queue that the task sits in.
The easiest way to configure this is to save a worker configuration:
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
rrq_worker_config_save(
"short",
rrq_worker_config(queue = "short"))
rrq_worker_config_save(
"all",
rrq_worker_config(queue = c("short", "long")))
rrq_worker_config_list()
#> [1] "short" "all" "localhost"
Above, we create two configurations: “short” which just listens on the queue short, and “all” which listens both on the short and long task queues (note that both these workers will also listen on the default queue).
w_short <- rrq_worker_spawn(name_config = "short")
#> ℹ Spawning 1 worker with prefix 'intercorporate_oriole'
w_all <- rrq_worker_spawn(name_config = "all")
#> ℹ Spawning 1 worker with prefix 'evileyed_fallowdeer'
We can then submit two long tasks to the long queue:
id_long1 <- rrq_task_create_expr(Sys.sleep(3600), queue = "long")
id_long2 <- rrq_task_create_expr(Sys.sleep(3600), queue = "long")
After the workers have had a chance to pick up work, our “short” worker is still available:
rrq_worker_status()
#> intercorporate_oriole_1 evileyed_fallowdeer_1
#> "IDLE" "BUSY"
So we can submit tasks to this short queue and have them processed
id <- rrq_task_create_expr(runif(1), queue = "short")
rrq_task_wait(id, timeout = 10)
#> [1] TRUE
Note that there is no validation to check that any worker is listening on any queue when you submit a task. Indeed there can’t be as new workers can be added at any time (so at the point of submission perhaps there were no workers).
Running tasks in separate processes
Running a task in a separate process offers some additional features at a cost of a little more overhead per task.
The cost is that we have to launch an additional process for every task run. We use callr for this to smooth over a number of rough edges, but this does impose a minimum overhead of about 0.1s per task, plus the cost of loading any packages that your task might need (if you use packages that make heavy use of things like S4 classes this can easily extend to a few seconds).
The additional features that it provides are:
- Per-task isolation: because every task runs in a separate process it works in a fresh environment and is isolated from all other tasks
- Cancellable tasks: you can stop a running task and the worker will gracefully pick up additional work
- Per-task timeout: you can specify the maximum running time of any task and if the task exceeds it, it will be killed
- Per-task logs, via rrq_task_log()
The sorts of tasks that benefit from this approach are typically long-running (expected running times in the tens of seconds or more) so that the overhead is low, and the features of cancellation and timeouts become more useful. We have also seen this put to good use where the task may leak memory, or cache results aggressively; over time this would cause the worker process to consume more memory until it was killed by the operating system.
To use a separate process, add separate_process = TRUE to calls to rrq_task_create_expr(). This will then enable the argument timeout to have an effect, as well as rrq_task_cancel().
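A sketch of what cancellation might look like in practice follows. The wrapper name run_then_cancel is made up, it relies on a default controller and at least one running worker, and the one-second pause is an arbitrary allowance for the worker to pick the task up. Given the lifecycle described earlier we would expect the final status to be CANCELLED, but treat that as an expectation rather than a guarantee.

```r
# Hypothetical wrapper: start a long task in a separate process,
# cancel it shortly afterwards, and report its status.
run_then_cancel <- function() {
  t <- rrq::rrq_task_create_expr(Sys.sleep(600), separate_process = TRUE)
  Sys.sleep(1)  # arbitrary pause so that a worker can start the task
  rrq::rrq_task_cancel(t)
  rrq::rrq_task_status(t)
}
```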
Coping with memory use
The data for each task, and the task result itself, is saved in Redis. This is alongside the typically much smaller metadata required to run rrq. Because Redis is an in memory database, this means that some things will not be a great idea; for example sending off 1000 tasks that will each write back 100 MB of simulation output would try and write 100 GB of data into the Redis database which may cause issues for your server!
To allow for this workflow, rrq supports configuring its object store (rrq::object_store) so that objects above a certain size are written out elsewhere. Currently, the only “elsewhere” supported is to disk with the assumption that the controller and all workers share a filesystem. The approach used is safe for multiple concurrent processes, including over network mounted filesystems.
To configure this you must use rrq_configure() before attaching either a controller or a worker to the queue.
The configuration interface here will change in future, but we will maintain backward compatibility with the current options.
To configure storage so that every object that is greater than 1KB is saved to disk, you could write:
id <- paste0("rrq:", ids::random_id(bytes = 4))
path <- tempfile()
rrq_configure(id, store_max_size = 1000, offload_path = path)
then we create the queue object as normal (and spawn a worker so that we can use it)
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
w <- rrq_worker_spawn(1)
#> ℹ Spawning 1 worker with prefix 'hexaemeric_turnstone'
It’s not hard at all to get to 1KB of data, we can do that by simulating a big pile of random numbers:
t <- rrq_task_create_expr(runif(200000))
Once the task has finished, data will be stored on disk below the path given above:
dir(path)
#> [1] "913f17a3c43ebe76945541a6cab1e587"
This keeps the larger objects out of the database.
Orchestrating workers
This vignette uses the very basic rrq_worker_spawn() method to create workers on your local machine. This is intended primarily for development only, though it may be useful in some situations. There are other options available, depending on how you want to use rrq.
Use (and limitations) of spawn
The simplest way of getting started with rrq is to use rrq_worker_spawn(), as above. This approach has several nice features; it uses callr, so no extra work is required to make the worker R session behave like the controller session (it will find your environment variables, library, and working directory), and it behaves the same way on all platforms (compare below). However, the workers will disappear when the controlling session completes (this is either a good or a bad thing) and you will be limited to a single node.
Start workers on another node, perhaps using a scheduler
There are two issues here; one is the technical details of launching your rrq workers on the cluster, and the other is the details around whether your HPC admins would like you to (and the security implications of doing so).
If you are using rrq with an HPC system, then you will want to schedule workers onto the system. The details here will change
The basic approach is to write out a launcher script somewhere:
rrq_worker_script(dest)
This can be called from the command line:
$ ./rrq_worker --help
Usage:
rrq_worker [options] <id>
Options:
--config=NAME Name of a worker configuration [default: localhost]
--name=NAME Name of the worker (optional)
This is a bash script that can then be called from whatever cluster job scheduler you use. The important things to pass through are:
- id: the only positional argument, which is the queue id
- --config=NAME: allows controlling of the named worker config (set via rrq_worker_config_save(), and allowing changing of timeout, verbosity and queues)
In addition, you may need to change the configuration type. If you need to control redis access you should set the REDIS_URL environment variable to point at your Redis server.
Use docker
We provide a docker image that you can use (mrcide/rrq), though typically you would want to extend this image to include your own packages. Alternatively, create your own docker image (see the main dockerfile, replacing COPY . /src with an installation of rrq, and the image that sets the entrypoint to call rrq_worker).
We use rrq to orchestrate workers in web applications where a number of workers carry out long running calculations for an HTTP API written using plumber and porcelain.
Waiting for workers to appear
If you have submitted workers via a task scheduler, you might want to block and wait for them to become available. You can do this using the rrq_worker_wait() function.
We first create a vector of names for the new workers, and then tell rrq that we’re going to produce these workers.
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
worker_ids <- c("cluster_1", "cluster_2")
How you then start the workers is up to you; you might start them at the command line with
rrq_worker --worker-id cluster_1 rrq:b5823454
rrq_worker --worker-id cluster_2 rrq:b5823454
(into separate terminals). Or you might queue these jobs with a cluster scheduler such as Slurm or PBS (with appropriate care over the working directory). But you can then immediately, in your R console write:
rrq_worker_wait(worker_ids, controller = obj)
and your session will block and wait for the workers to appear, erroring if they do not appear in time.
Worker heartbeat
If you use many workers, particularly on different machines, you may not notice if some disappear. Possible causes of this include:
- Your task crashes a worker (e.g., you run some C++ code that segfaults)
- Your worker is killed by the operating system (e.g., the system is running low on RAM and your process is targeted by the out-of-memory killer)
- The machine reboots or shuts down
- The redis server goes down temporarily, causing the worker to exit when writing back results
By default, if this happens when your worker is running a task, that task’s status will forever be stuck in RUNNING.
rrq provides a simple heartbeat process, if requested, to detect when a worker has disappeared. To do this, we run a second process on each worker that periodically writes to the Redis database on a key that will expire in a time slightly longer than that period, in effect making a dead man’s switch; see rrq::rrq_heartbeat for details.
To enable the heartbeat, save a worker configuration with the heartbeat_period set to some number of seconds. Below we use 2 seconds so that this example runs reasonably quickly, but in practice something like 60 might be slightly less load on your Redis server.
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
res <- rrq_worker_config_save(
"localhost",
rrq_worker_config(heartbeat_period = 2))
Then, launch a worker
w <- rrq_worker_spawn(1)
#> ℹ Spawning 1 worker with prefix 'bitumen_icelandicsheepdog'
Our worker will print information indicating that the heartbeat is enabled (use rrq_worker_process_log()):
#> [2024-09-18 15:37:50.708754] HEARTBEAT rrq:9b3cfc34:worker:bitumen_icelandicsheepdog_1:heartbeat
#> [2024-09-18 15:37:50.955803] HEARTBEAT OK
#> [2024-09-18 15:37:50.964211] ALIVE
#> [2024-09-18 15:37:50.964501] ENVIR new
#> [2024-09-18 15:37:50.964871] QUEUE default
#> __
#> ______________ _/ /
#> ______ / ___/ ___/ __ `/ /_____
#> /_____/ / / / / / /_/ /_/_____/
#> ______ /_/ /_/ \__, (_) ______
#> /_____/ /_/ /_____/
#> worker: bitumen_icelandicsheepdog_1
#> config: localhost
#> rrq_version: 0.7.19
#> platform: x86_64-pc-linux-gnu
#> running: Ubuntu 22.04.4 LTS
#> hostname: fv-az2026-9
#> username: runner
#> queue: rrq:9b3cfc34:queue:default
#> wd: /home/runner/work/rrq/rrq/vignettes
#> pid: 7553
#> redis_host: 127.0.0.1
#> redis_port: 6379
#> heartbeat_key: rrq:9b3cfc34:worker:bitumen_icelandicsheepdog_1:heartbeat
We also have a heartbeat key here that we can inspect:
info <- rrq_worker_info()[[1]]
obj$con$EXISTS(info$heartbeat_key)
#> [1] 1
obj$con$PTTL(info$heartbeat_key) # in milliseconds
#> [1] 5789
We queue some slow job onto the worker:
t <- rrq_task_create_expr({
Sys.sleep(10)
runif(1)
})
Then we kill the worker:
tools::pskill(rrq_worker_info()[[1]]$pid)
Of course, immediately our key still exists:
obj$con$EXISTS(info$heartbeat_key)
#> [1] 1
but eventually it will expire:
Sys.sleep(6)
obj$con$EXISTS(info$heartbeat_key)
#> [1] 0
So far as rrq is concerned, at this point your task is still running:
rrq_task_status(t)
#> [1] "RUNNING"
Handling this situation is still completely manual. You can detect lost workers and their jobs with:
rrq_worker_detect_exited()
#> Lost 1 worker:
#> - bitumen_icelandicsheepdog_1
#> Orphaning 1 task:
#> - a7254809d1b1c35541c293eaa38417f4
This will also “orphan” the task:
rrq_task_status(t)
#> [1] "DIED"
Any tasks that were dependent on this task will now be marked as IMPOSSIBLE.
In a future version we will support automatic re-queuing of jobs assigned to disappeared workers.
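Until then, one way to reduce the manual effort is to run the detection step periodically from the controller session. The loop below is a sketch only: the function name and its arguments are invented for illustration, it relies on a default controller being set, and it blocks the session while it runs.

```r
# Hypothetical helper: call rrq_worker_detect_exited() a fixed number
# of times, pausing between checks. Both arguments are illustrative.
watch_for_lost_workers <- function(interval = 60, times = 10) {
  for (i in seq_len(times)) {
    rrq::rrq_worker_detect_exited()
    Sys.sleep(interval)
  }
  invisible(NULL)
}
```

The interval should be longer than the heartbeat period, since a lost worker only becomes detectable once its heartbeat key has expired.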
Getting a Redis server
There are several options to get started with Redis; the best one will likely depend on your platform and needs.
Use docker
(Linux, macOS with docker desktop, Windows with docker desktop)
This is how we develop rrq because it’s easy to destroy and recreate the redis instance. Start the docker redis container like:
docker run --name redis --rm -d -p 127.0.0.1:6379:6379 redis
This will listen on port 6379 which is the Redis default. You can
stop the container (deleting all data) with
docker stop redis
Install Redis
On Linux this is fairly straightforward, either by downloading and building the source code or by installing via apt or snap.
On macOS the source will compile, or you can install a redis server via homebrew.
On Windows you can install redis via WSL. There have also been various ports.
Use Redis on a different machine
If you have redis running on a different machine (this will be the case if you’re using redis to distribute tasks over a number of different machines) you will need to tell rrq and redux where to find it. The simplest way is to set the environment variable REDIS_HOST to the name of the machine if it is running with default ports, or set REDIS_URL if you need more control. Alternatively, when connecting to the server above, you can manually construct your redux::hiredis object and pass in any configuration option you need; see the documentation for redux::redis_config for details.
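As a sketch of the manual route, the function below builds a connection to a named host and hands it to the controller. The function name connect_remote and the host name in the usage note are made up, and passing the connection via a con argument to rrq_controller() is an assumption to check against the rrq documentation.

```r
# Hypothetical helper: connect a controller to Redis on another machine.
# Assumes rrq_controller() accepts a redux connection as `con`.
connect_remote <- function(queue_id, host, port = 6379) {
  con <- redux::hiredis(host = host, port = port)
  rrq::rrq_controller(queue_id, con = con)
}
```

Usage might look like obj <- connect_remote("rrq:myqueue", "redis.example.org"). The environment variable route would instead be a call such as Sys.setenv(REDIS_HOST = "redis.example.org") made before any connection is created.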