This document assumes that you have a Redis server running. If not, see the bottom of the document for options for installing one on your own system. You can test whether your Redis server is behaving as expected by running
redux::hiredis()$PING()
#> [Redis: PONG]
If you get an error like “Connection refused” then check your installation.
The package is designed to be easy to get started with, and has features that you might like to use later. If you run the “Hello world” section you probably have 90% of what you need; the rest of the document shows features that will help you bend that around your specific needs.
Without any great explanation, here is the basic approach to using rrq to run a task on another R process, asynchronously. First, we create a “controller” object which you can use for queuing tasks.
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq::rrq_controller$new(id)
This controller uses an “identifier” (here, id is rrq:a62cd485) which can be anything you want but acts like a folder within the Redis server, distinguishing your queue from any others hosted on the same server.
Submit a task to the queue with the $enqueue() method, which returns a key for that task:
t <- obj$enqueue(1 + 1)
t
#> [1] "e118f989df576738079a8f0abd14d89f"
We’ll also need some worker processes to carry out our tasks. Here, we’ll spawn two for now (see the section below on alternatives to this):
w <- rrq::rrq_worker_spawn(obj, 2)
#> Spawning 2 workers with prefix doggoned_lamb
Retrieve the result, polling if needed:
obj$task_wait(t, progress = FALSE)
#> [1] 2
Things to note here:
For years, the parallel package has provided users with the ability to run tasks in parallel with very little setup. Given a list of data x and some function fun, one can change from the serial code
lapply(x, fun)
to run in parallel, given a cluster object cl:
parallel::parLapply(cl, x, fun)
(the even simpler parallel::mclapply(x, fun) can be used on platforms other than Windows with reasonable success). Nice as this is, it suffers some drawbacks, most of which follow from the simple blocking interface:
parLapply has been called, and is restricted to a single node without considerable effort. One cannot add workers to the cluster while it runs, or remove unneeded ones as tasks finish.
parallel::parLapplyLB) is quite slow.
As such, it is hard to build interfaces like queues or work through dependency graphs (though see heroic work in future and targets). Attempts at doing this run into the issue of where to store the data and the queue in such a way that multiple worker processes can safely interact with the queue without corrupting the database or hitting race conditions. Approaches like liteq may not work on network file systems, and are therefore limited to a single node.
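For reference, here is a self-contained version of the parLapply pattern described above, using only the parallel package that ships with R. The list x and the function fun are illustrative placeholders. Note that the call to parLapply blocks the session until every element has been processed, and the cluster size is fixed when makeCluster is called, which are the limitations discussed above.

```r
library(parallel)

# Illustrative data and function (placeholders, not a real workload)
x <- as.list(1:8)
fun <- function(v) v^2

# Serial version
serial <- lapply(x, fun)

# Parallel version: a fixed-size cluster of 2 local R processes
cl <- makeCluster(2)
par_result <- parLapply(cl, x, fun)  # blocks until all elements are done
stopCluster(cl)
```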
At the other end of the scale, HPC systems with their schedulers can avoid all these issues, but with byzantine interfaces and slow per-task submission.
Notable features of rrq which motivate its development within this landscape:
lapply
Running tasks is a little different to many R-parallel backends, because we do not directly allocate tasks to our workers, but simply place them on a first-in-first-out task queue. A pool of workers will then poll for work from this queue. See vignette("design") for more on this.
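A minimal sketch of this queue-then-poll behaviour, using a separate throwaway queue so that the walkthrough's own obj is left alone (the variable names are illustrative). With no workers attached, enqueued tasks just sit on the queue as PENDING; once a worker is spawned, it pulls them off and runs them. The sketch only checks which tasks are queued, not their order.

```r
obj_fifo <- rrq::rrq_controller$new(paste0("rrq:", ids::random_id(bytes = 4)))

# No workers yet: tasks are placed on the queue and wait there
task_ids <- character(0)
for (i in 1:3) {
  task_ids <- c(task_ids, obj_fifo$enqueue(i * 10))
}
queued <- obj_fifo$queue_list()
status_before <- obj_fifo$task_status(task_ids)

# A worker started now polls the queue and works through the tasks
w_fifo <- rrq::rrq_worker_spawn(obj_fifo, 1)
results <- vapply(task_ids,
                  function(task_id) obj_fifo$task_wait(task_id, progress = FALSE),
                  numeric(1))
```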
Consider enqueuing this expression:
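The expression itself is missing from this copy of the document. A version consistent with the description that follows (sleep for 2 seconds, then return a random number) is sketched below; it assumes obj and the two workers spawned above.

```r
# Reconstructed from the surrounding description; assumes `obj` and running workers from above
t <- obj$enqueue({
  Sys.sleep(2)
  runif(1)
})
```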
This has created a task that will sleep for 2 seconds and then return a random number.
Initially the task has status RUNNING (it will be PENDING very briefly):
obj$task_status(t)
#> 6c821359e497a41593f0f807898754a0
#> "RUNNING"
Then after a couple of seconds it will complete (we pad this out here so that it will complete even on slow systems)
Sys.sleep(3)
obj$task_result(t)
#> [1] 0.5709851
The basic task lifecycle is PENDING -> RUNNING -> COMPLETE or ERROR.
In addition, there are rarer ways a task can end (CANCELLED, DIED, TIMEOUT) or fail to start due to dependencies between tasks (DEFERRED or IMPOSSIBLE; see below).
A task in any terminal state except IMPOSSIBLE (that is, COMPLETE, ERROR, CANCELLED, DIED or TIMEOUT) can be retried, at which point its status becomes MOVED and the task will “point” somewhere else (that new task will move through the usual PENDING -> RUNNING -> (terminal state) flow). See vignette("fault-tolerance") for details.
Above, we slept for a few seconds in order for the task to finish. However, this is an extremely common operation, so rrq provides a $task_wait method which will wait until a task finishes and then return the result.
The polling interval here is 1 second by default, but if the task completes within that period it will still be returned as soon as it is complete (the interval is just the time between progress bar updates and the period where an interrupt would be caught to cancel the wait).
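To make the behaviour concrete, here is a hand-rolled and simplified equivalent of $task_wait, built only on $task_status and $task_result. The helper name wait_for_task and its poll interval are hypothetical. Unlike the real method it has no progress bar and no timeout, and it makes no special effort to handle retried (MOVED) tasks; prefer $task_wait in practice.

```r
# Hypothetical helper, for illustration only
wait_for_task <- function(controller, task_id, poll = 0.5) {
  # Statuses after which this task id will not change again
  stop_states <- c("COMPLETE", "ERROR", "CANCELLED", "DIED", "TIMEOUT",
                   "IMPOSSIBLE", "MOVED")
  repeat {
    status <- controller$task_status(task_id)
    if (status %in% stop_states) {
      break
    }
    Sys.sleep(poll)  # PENDING, DEFERRED or RUNNING: check again shortly
  }
  controller$task_result(task_id)
}

# Usage on a throwaway queue with a single worker
obj_poll <- rrq::rrq_controller$new(paste0("rrq:", ids::random_id(bytes = 4)))
w_poll <- rrq::rrq_worker_spawn(obj_poll, 1)
t_poll <- obj_poll$enqueue(1 + 1)
res_poll <- wait_for_task(obj_poll, t_poll)
```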
Once a task is complete, $task_wait and $task_result are equivalent:
obj$task_wait(t)
#> [1] 0.08547447
obj$task_result(t)
#> [1] 0.08547447
It is rare that we want our workers to run in completely empty R environments (no extra loaded packages, no custom functions available). Quite often you will want to run something to configure the workers before they accept tasks.
To do this, define a function that accepts a single argument (the environment that the worker will use) and sets that environment up.
For the most common case, where you have script files that contain function definitions and a set of packages to load, rrq has a helper function rrq::rrq_envir.
For example, suppose we want to source a file “myfuns.R” which contains:
slowdouble <- function(x) {
  Sys.sleep(x)
  x * 2
}
We might write:
create <- rrq::rrq_envir(sources = "myfuns.R")
The next step is to register this function for your queue:
obj$envir(create)
By default, this will notify all running workers to update their environment. Note that if your function errors in any way, your workers will exit!
obj$worker_log_tail(n = 4)
#> worker_id child time command message
#> 1 doggoned_lamb_1 NA 1709655056 MESSAGE REFRESH
#> 2 doggoned_lamb_2 NA 1709655056 MESSAGE REFRESH
#> 3 doggoned_lamb_1 NA 1709655056 ENVIR new
#> 4 doggoned_lamb_2 NA 1709655056 ENVIR new
#> 5 doggoned_lamb_2 NA 1709655056 ENVIR create
#> 6 doggoned_lamb_1 NA 1709655056 ENVIR create
#> 7 doggoned_lamb_2 NA 1709655056 RESPONSE REFRESH
#> 8 doggoned_lamb_1 NA 1709655056 RESPONSE REFRESH
Now that our workers have picked up our functions, we can start using them:
t <- obj$enqueue(slowdouble(1))
obj$task_wait(t)
#> [1] 2
If you need more control you can write your own function. We could have written create as:
create <- function(env) {
  sys.source("myfuns.R", env)
}
This approach would also allow you to do something like read an rds or RData file containing a large object that you want every worker to have a copy of.
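A sketch of that idea, assuming a hypothetical file big_object.rds that every worker should load. It writes myfuns.R and big_object.rds into the working directory (which spawned workers share) and uses a throwaway queue; the function name create_with_data and the file contents are illustrative.

```r
# Write the helper script from above and a tiny stand-in for a "large" object
writeLines(c("slowdouble <- function(x) {",
             "  Sys.sleep(x)",
             "  x * 2",
             "}"), "myfuns.R")
saveRDS(1:10, "big_object.rds")

# Custom environment function: source the script, then load the shared object
create_with_data <- function(env) {
  sys.source("myfuns.R", env)
  env$big <- readRDS("big_object.rds")
}

obj_env <- rrq::rrq_controller$new(paste0("rrq:", ids::random_id(bytes = 4)))
obj_env$envir(create_with_data)
w_env <- rrq::rrq_worker_spawn(obj_env, 1)

# Both `slowdouble` and `big` are looked up in the worker's environment
t_env <- obj_env$enqueue(slowdouble(length(big) / 10))
res_env <- obj_env$task_wait(t_env, progress = FALSE)
```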
The rrq package does not aspire to be a fully fledged scheduler, but sometimes a little more control than first-in-first-out is required. There are a few options available that allow the user to control how tasks are run when needed. These involve:
We support a simple system for allowing tasks to depend on other tasks. An example of this might be where you need to download a file, then run a series of analyses on it. Or where you want to run an analysis over a set of parameters, and then aggregate once they’re all done. How the output of one task feeds into the others is up to you, but practically this will require one of the following options:
When queueing a task, you can provide a vector of task identifiers as the depends_on argument. These identifiers must all be known to rrq and the task will not be started until all these prerequisites are completed. The task lifecycle will look different to the above; rather than starting as PENDING the task begins as DEFERRED.
Once all prerequisites are complete, a task becomes possible and it moves from DEFERRED to PENDING. It will be placed at the front of the queue.
If a prerequisite task fails for any reason (it errors, is cancelled, or its worker dies) then the task will become IMPOSSIBLE.
For example, suppose that we have code:
create <- function(n) {
  saveRDS(runif(n), "numbers.rds")
}
use <- function(i) {
  d <- readRDS("numbers.rds")
  d[[i]]
}
Here we have some function create that we want to run first, doing some setup, and then another function use that we want to run afterwards, which will read the result of running create and do some analysis on it.
Create an rrq_controller object and tell workers to read the deps.R file, which contains these function definitions:
obj <- rrq::rrq_controller$new(paste0("rrq:", ids::random_id(bytes = 4)))
obj$envir(rrq::rrq_envir(sources = "deps.R"))
source("deps.R")
We can then enqueue our first task:
id <- obj$enqueue(create(5))
Then use this id as the depends_on argument for a set of tasks:
id_use <- obj$lapply(1:5, use, depends_on = id, timeout_task_wait = 0)
The status of the first task will be PENDING, per usual:
obj$task_status(id)
#> af56de3ad240f5bf5f93b366157a1e8a
#> "PENDING"
However, the group of tasks submitted as part of the $lapply call will be DEFERRED, because their prerequisite has not yet completed, so they are not yet in the queue:
obj$task_status(id_use$task_ids)
#> 2da0fbb266e9254e291154b3d746565d af67e6bd3002eeabe95f1d11be99c0b8
#> "DEFERRED" "DEFERRED"
#> 0ad663e03f7ff86ad89f73b2955bdea4 7d7a5e064a9d5b10ce202bd30413cfa2
#> "DEFERRED" "DEFERRED"
#> 7e797230c9ae6a9016f94f1e2b778745
#> "DEFERRED"
obj$queue_list()
#> [1] "af56de3ad240f5bf5f93b366157a1e8a"
Once the first task is processed by a worker, the status changes:
obj$task_status(id)
#> af56de3ad240f5bf5f93b366157a1e8a
#> "COMPLETE"
obj$task_status(id_use$task_ids)
#> 2da0fbb266e9254e291154b3d746565d af67e6bd3002eeabe95f1d11be99c0b8
#> "PENDING" "PENDING"
#> 0ad663e03f7ff86ad89f73b2955bdea4 7d7a5e064a9d5b10ce202bd30413cfa2
#> "PENDING" "PENDING"
#> 7e797230c9ae6a9016f94f1e2b778745
#> "PENDING"
obj$queue_list()
#> [1] "7e797230c9ae6a9016f94f1e2b778745" "7d7a5e064a9d5b10ce202bd30413cfa2"
#> [3] "2da0fbb266e9254e291154b3d746565d" "af67e6bd3002eeabe95f1d11be99c0b8"
#> [5] "0ad663e03f7ff86ad89f73b2955bdea4"
At this point the tasks will proceed through the queue as usual.
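One way to collect the results is to wait on each dependent task in turn. The sketch below is self-contained: it writes deps.R with the two functions shown above, and uses a fresh queue with its own workers. The variable names are illustrative.

```r
writeLines(c("create <- function(n) {",
             "  saveRDS(runif(n), \"numbers.rds\")",
             "}",
             "use <- function(i) {",
             "  d <- readRDS(\"numbers.rds\")",
             "  d[[i]]",
             "}"), "deps.R")
source("deps.R")

obj_dep <- rrq::rrq_controller$new(paste0("rrq:", ids::random_id(bytes = 4)))
obj_dep$envir(rrq::rrq_envir(sources = "deps.R"))
w_dep <- rrq::rrq_worker_spawn(obj_dep, 2)

id_create <- obj_dep$enqueue(create(5))
id_use_dep <- obj_dep$lapply(1:5, use, depends_on = id_create,
                             timeout_task_wait = 0)

# Each task moves DEFERRED -> PENDING -> RUNNING -> COMPLETE; wait for them all
res_dep <- vapply(id_use_dep$task_ids,
                  function(task_id) obj_dep$task_wait(task_id, progress = FALSE),
                  numeric(1))
```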
Points to note here:
Sometimes it is useful to have different workers listen on different queues. For example, you may have workers on different machines with different capabilities (e.g., a machine with a GPU or high memory). Or you may have tasks that are expected to take quite a long time, but want some workers to monitor a fast queue of short-lived tasks.
Every worker listens to the default queue, but when starting a worker, you can add additional queues and control the priority order of these queues for that worker. When submitting tasks you then specify the queue that the task sits in.
The easiest way to configure this is to save a worker configuration:
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq::rrq_controller$new(id)
obj$worker_config_save(
"short",
rrq::rrq_worker_config(queue = "short"))
obj$worker_config_save(
"all",
rrq::rrq_worker_config(queue = c("short", "long")))
obj$worker_config_list()
#> [1] "short" "localhost" "all"
Above, we create two configurations: “short”, which just listens on the queue short, and “all”, which listens on both the short and long task queues (note that both these workers will also listen on the default queue).
w_short <- rrq::rrq_worker_spawn(obj, name_config = "short")
#> Spawning 1 worker with prefix unrounded_argentinehornedfrog
w_all <- rrq::rrq_worker_spawn(obj, name_config = "all")
#> Spawning 1 worker with prefix humourful_africangoldencat
We can then submit two long tasks to the long queue:
id_long1 <- obj$enqueue(Sys.sleep(3600), queue = "long")
id_long2 <- obj$enqueue(Sys.sleep(3600), queue = "long")
After the workers have had the ability to pick up work, our “short” worker is still available:
obj$worker_status()
#> unrounded_argentinehornedfrog_1 humourful_africangoldencat_1
#> "IDLE" "BUSY"
So we can submit tasks to this short queue and have them processed:
id <- obj$enqueue(runif(1), queue = "short")
obj$task_wait(id, timeout = 10)
#> [1] 0.7365085
Note that there is no validation to check that any worker is listening on any queue when you submit a task. Indeed there can’t be as new workers can be added at any time (so at the point of submission perhaps there were no workers).
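A quick way to see this for yourself, sketched on a throwaway queue: a worker spawned with the default configuration listens only on the default queue, so a task sent to a queue that nobody listens on ("gpu" here, a hypothetical name) is accepted without complaint and simply stays PENDING.

```r
obj_q <- rrq::rrq_controller$new(paste0("rrq:", ids::random_id(bytes = 4)))
w_q <- rrq::rrq_worker_spawn(obj_q, 1)  # default config: default queue only

# "gpu" is a hypothetical queue name that no worker listens on
id_gpu <- obj_q$enqueue(runif(1), queue = "gpu")
id_default <- obj_q$enqueue(runif(1))

# The default-queue task completes; the "gpu" task is never picked up
res_default <- obj_q$task_wait(id_default, progress = FALSE)
status_gpu <- obj_q$task_status(id_gpu)
```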
Running a task in a separate process offers some additional features at a cost of a little more overhead per task.
The cost is that we have to launch an additional process for every task run. We use callr for this to smooth over a number of rough edges, but this does impose a minimum overhead of about 0.1s per task, plus the cost of loading any packages that your task might need (if you use packages that make heavy use of things like S4 classes this can easily extend to a few seconds).
The additional features that it provides are:
The sorts of tasks that benefit from this approach are typically long-running (expected running times of tens of seconds or more), so that the overhead is low and the features of cancellation and timeouts become more useful. We have also found it useful where the task may leak memory or cache results aggressively; over time this would otherwise cause the worker process to consume more memory until it was killed by the operating system.
To use a separate process, add separate_process = TRUE to calls to $enqueue(). This will then enable the argument timeout to have an effect, as well as the $task_cancel() method.
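One way to confirm that tasks really run outside the worker, sketched on a throwaway queue: with separate_process = TRUE, each task reports a process id different from the worker's own (read from the pid field of $worker_info()) and from the other task's.

```r
obj_sp <- rrq::rrq_controller$new(paste0("rrq:", ids::random_id(bytes = 4)))
w_sp <- rrq::rrq_worker_spawn(obj_sp, 1)
worker_pid <- obj_sp$worker_info()[[1]]$pid

# Each task runs in a freshly launched R process
t1 <- obj_sp$enqueue(Sys.getpid(), separate_process = TRUE)
t2 <- obj_sp$enqueue(Sys.getpid(), separate_process = TRUE)
pid1 <- obj_sp$task_wait(t1, progress = FALSE)
pid2 <- obj_sp$task_wait(t2, progress = FALSE)
```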
The data for each task, and the task result itself, are saved in Redis, alongside the typically much smaller metadata required to run rrq. Because Redis is an in-memory database, some things will not be a great idea; for example, sending off 1000 tasks that will each write back 100 MB of simulation output would try to write 100 GB of data into the Redis database, which may cause issues for your server!
To allow for this workflow, rrq supports configuring its object store (rrq::object_store) so that objects above a certain size are written out elsewhere. Currently, the only “elsewhere” supported is disk, with the assumption that the controller and all workers share a filesystem. The approach used is safe for multiple concurrent processes, including over network mounted filesystems.
To configure this you must use rrq::rrq_configure before attaching either a controller or a worker to the queue.
The configuration interface here will change in future, but we will
maintain backward compatibility with the current options.
To configure storage so that every object that is greater than 1KB is saved to disk, you could write:
id <- paste0("rrq:", ids::random_id(bytes = 4))
path <- tempfile()
rrq::rrq_configure(id, store_max_size = 1000, offload_path = path)
Then we create the queue object as normal (and spawn a worker so that we can use it):
obj <- rrq::rrq_controller$new(id)
w <- rrq::rrq_worker_spawn(obj, 1)
#> Spawning 1 worker with prefix deceptive_africangroundhornbill
It’s not hard at all to get to 1KB of data; we can do that by simulating a big pile of random numbers:
t <- obj$enqueue(runif(200000))
Once the task has finished, data will be stored on disk below the path given above:
dir(path)
#> [1] "bd8f1d1b4124e769720625b5987892e6"
This keeps the larger objects out of the database.
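Putting the pieces together, here is a self-contained sketch of the offload workflow with fresh identifiers, so that it does not interfere with the queue above. A vector of 200,000 doubles at 8 bytes each is roughly 1.6 MB once serialised, far above the 1000-byte threshold, so its result should land on disk. Reading it back with $task_wait should be transparent.

```r
id_store <- paste0("rrq:", ids::random_id(bytes = 4))
path_store <- tempfile()
rrq::rrq_configure(id_store, store_max_size = 1000, offload_path = path_store)

obj_store <- rrq::rrq_controller$new(id_store)
w_store <- rrq::rrq_worker_spawn(obj_store, 1)

# Serialised size of a comparable result, in bytes (about 1.6 million)
result_bytes <- length(serialize(runif(200000), NULL))

t_store <- obj_store$enqueue(runif(200000))
res_store <- obj_store$task_wait(t_store, progress = FALSE)
offloaded <- dir(path_store)
```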
This vignette uses the very basic rrq_worker_spawn() method to create workers on your local machine. This is intended primarily for development, though it may be useful in some situations. There are other options available, depending on how you want to use rrq.
The simplest way of getting started with rrq is to use rrq::rrq_worker_spawn, as above. This approach has several nice features: it uses callr, so no extra work is required to make the worker R session behave like the controller session (it will find your environment variables, library, and working directory), and it behaves the same way on all platforms (compare below). However, the workers will disappear when the controlling session completes (this is either a good or a bad thing) and you will be limited to a single node.
There are two issues here; one is the technical details of launching your rrq workers on the cluster, and the other is the details around whether your HPC admins would like you to (and the security implications of doing so).
If you are using rrq with an HPC system, then you will want to schedule workers onto the system. The details here will change
The basic approach is to write out a launcher script somewhere:
rrq::rrq_worker_script(dest)
This can be called from the command line:
$ ./rrq_worker --help
Usage:
rrq_worker [options] <id>
Options:
--config=NAME Name of a worker configuration [default: localhost]
--name=NAME Name of the worker (optional)
--key-alive=KEY Key to write to once alive (optional)
This is a bash script that can then be called from whatever cluster job scheduler you use. The important things to pass through are:
id: the only positional argument, which is the queue id
--config=NAME: allows controlling the named worker config (set via $worker_config_save(), and allowing changes to timeout, verbosity and queues)
--key-alive=KEY: allows setting a key that the worker will report back on
In addition, you may need to change the configuration type. If you need to control Redis access you should set the REDIS_URL environment variable to point at your Redis server.
We provide a docker image that you can use (mrcide/rrq), though typically you would want to extend this image to include your own packages. Alternatively, create your own docker image (see the main dockerfile, but replace COPY . /src with an installation of rrq) that sets the entrypoint to call rrq_worker.
We use rrq to orchestrate workers in web applications where a number of workers carry out long running calculations for an HTTP API written using plumber and porcelain.
If you have submitted workers via a task scheduler, you might want to block and wait for them to become available. You can do this using the rrq::rrq_worker_expect and rrq::rrq_worker_wait functions.
We first create a vector of names for the new workers, and then tell rrq that we’re going to produce these workers.
obj <- rrq::rrq_controller$new(id)
worker_ids <- c("cluster_1", "cluster_2")
key_alive <- rrq::rrq_worker_expect(obj, worker_ids)
It responds with a key that we can use to check if they have appeared:
key_alive
#> [1] "rrq:5d41c76d:worker:alive:6676476cd81eeb71589f0c4f10c93b4f"
How you then start the workers is up to you; you might start them at the command line with
rrq_worker --worker-id cluster_1 rrq:5d41c76d
rrq_worker --worker-id cluster_2 rrq:5d41c76d
(in separate terminals). Or you might queue these jobs with a cluster scheduler such as Slurm or PBS (with appropriate care over the working directory). Either way, you can then immediately write, in your R console:
rrq::rrq_worker_wait(obj, key_alive)
and your session will block and wait for the workers to appear, erroring if they do not appear in time.
If you use many workers, particularly on different machines, you may not notice if some disappear. Possible causes of this include:
By default, if this happens when your worker is running a task, that task’s status will be stuck at RUNNING forever.
rrq provides a simple heartbeat process, if requested, to detect when a worker has disappeared. To do this, we run a second process on each worker that periodically writes to the Redis database on a key that will expire in a time slightly longer than that period, in effect making a dead man’s switch; see rrq::rrq_heartbeat for details.
To enable the heartbeat, save a worker configuration with the heartbeat_period set to some number of seconds. Below we use 2 seconds so that this example runs reasonably quickly, but in practice something like 60 would put less load on your Redis server.
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq::rrq_controller$new(id)
res <- obj$worker_config_save(
"localhost",
rrq::rrq_worker_config(heartbeat_period = 2))
Then, launch a worker:
w <- rrq::rrq_worker_spawn(obj, 1)
#> Spawning 1 worker with prefix dashing_italianbrownbear
Our worker will print information indicating that the heartbeat is enabled (use obj$worker_process_log() to view it):
#> [2024-03-05 16:11:02.398818] HEARTBEAT rrq:882c35e8:worker:dashing_italianbrownbear_1:heartbeat
#> [2024-03-05 16:11:02.65111] HEARTBEAT OK
#> [2024-03-05 16:11:02.660215] ALIVE
#> [2024-03-05 16:11:02.660556] ENVIR new
#> [2024-03-05 16:11:02.660901] QUEUE default
#> __
#> ______________ _/ /
#> ______ / ___/ ___/ __ `/ /_____
#> /_____/ / / / / / /_/ /_/_____/
#> ______ /_/ /_/ \__, (_) ______
#> /_____/ /_/ /_____/
#> worker: dashing_italianbrownbear_1
#> config: localhost
#> rrq_version: 0.7.9
#> platform: x86_64-pc-linux-gnu (64-bit)
#> running: Ubuntu 22.04.4 LTS
#> hostname: fv-az1242-266
#> username: runner
#> queue: rrq:882c35e8:queue:default
#> wd: /home/runner/work/rrq/rrq/vignettes
#> pid: 8002
#> redis_host: 127.0.0.1
#> redis_port: 6379
#> heartbeat_key: rrq:882c35e8:worker:dashing_italianbrownbear_1:heartbeat
We also have a heartbeat key here that we can inspect:
info <- obj$worker_info()[[1]]
obj$con$EXISTS(info$heartbeat_key)
#> [1] 1
obj$con$PTTL(info$heartbeat_key) # in milliseconds
#> [1] 5837
We queue a slow job onto the worker:
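The code for this step is missing from this copy of the document. A sketch consistent with what follows (the task must still be running when the worker is killed, and t is used below) is given here; the 20 second duration and the short pause are arbitrary choices, and it assumes obj and the heartbeat-enabled worker from above.

```r
# Reconstructed step; assumes `obj` and the heartbeat-enabled worker from above
t <- obj$enqueue(Sys.sleep(20))
Sys.sleep(0.5)  # give the worker a moment to pick the task up
```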
Then we kill the worker:
tools::pskill(obj$worker_info()[[1]]$pid)
Of course, immediately afterwards our key still exists:
obj$con$EXISTS(info$heartbeat_key)
#> [1] 1
but eventually it will expire:
Sys.sleep(6)
obj$con$EXISTS(info$heartbeat_key)
#> [1] 0
So far as rrq is concerned, at this point your task is still running:
obj$task_status(t)
#> 08c7ca5f3fa0bf49e8b69f8dd4c43368
#> "RUNNING"
Handling this situation is still completely manual. You can detect lost workers with:
obj$worker_detect_exited()
#> Lost 1 worker:
#> - dashing_italianbrownbear_1
#> Orphaning 1 task:
#> - 08c7ca5f3fa0bf49e8b69f8dd4c43368
This will also “orphan” the task:
obj$task_status(t)
#> 08c7ca5f3fa0bf49e8b69f8dd4c43368
#> "DIED"
Any tasks that were dependent on this task will now be marked as IMPOSSIBLE.
In a future version we will support automatic re-queuing of jobs assigned to disappeared workers.
There are several options to get started with Redis, the best one will likely depend on your platform and needs.
(Linux, macOS with docker desktop, Windows with docker desktop)
This is how we develop rrq because it’s easy to destroy and recreate the redis instance. Start the docker redis container like:
docker run --name redis --rm -d -p 127.0.0.1:6379:6379 redis
This will listen on port 6379 which is the Redis default. You can
stop the container (deleting all data) with
docker stop redis
On Linux this is fairly straightforward, either by downloading and building the source code or by installing via apt or snap.
On macOS the source will compile, or you can install a redis server via homebrew.
On Windows you can install redis via WSL. There have also been various ports.
If you have redis running on a different machine (this will be the case if you’re using redis to distribute tasks over a number of different machines) you will need to tell rrq and redux where to find it. The simplest way is to set the environment variable REDIS_HOST to the name of the machine if it is running with default ports, or set REDIS_URL if you need more control. Alternatively, when connecting to the server above, you can manually construct your redux::hiredis object and pass in any configuration option you need; see the documentation for redux::redis_config for details.
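As a sketch of the manual route, assuming Redis listens on the default port 6379 of a host you substitute for "127.0.0.1": a connection can be configured either with an explicit host and port or with a redis:// URL, and then handed to the controller. Passing it through a con argument of rrq_controller$new is an assumption here; check ?rrq::rrq_controller for your installed version.

```r
# Replace "127.0.0.1" with the machine running your Redis server
con <- redux::hiredis(host = "127.0.0.1", port = 6379)
pong <- con$PING()

# The same connection described as a URL (redux also reads REDIS_URL)
con_url <- redux::hiredis(url = "redis://127.0.0.1:6379")

# Pass the configured connection to rrq instead of the default one
obj_remote <- rrq::rrq_controller$new(
  paste0("rrq:", ids::random_id(bytes = 4)), con = con)
```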