If you have thousands and thousands of jobs to submit, you may not want to flood the cluster with them all at once. Each job submission is relatively slow (the HPC tools that the web interface has to use are relatively slow). The queue that the cluster uses doesn’t seem to like processing tens of thousands of jobs, and can slow down. And if you take up the whole cluster, someone may come and knock on your office door and complain. At the same time, batching your jobs up into little bits and manually sending them off is a pain, and is work better done by a computer.
An alternative is to submit a set of “workers” to the cluster, and then submit jobs to them. This is done with the rrq package, along with a redis server running on the cluster.
To get started you will need to install the rrq package locally.
drat::add("mrc-ide")
install.packages("rrq")
Then construct the context as before
root <- "contexts"
ctx <- context::context_save(root, sources = "mysources.R")
#> [ open:db ] rds
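The examples that follow call a random_walk function that lives in mysources.R, which is not shown here. A minimal sketch of what such a file might contain, assuming (from the calls and output further down) that it takes a starting value and a number of steps and returns one value per step; the argument names and the normally distributed steps are guesses:

```r
# Hypothetical contents of mysources.R; the real file is not shown in this
# document, so the argument names and the step distribution are assumptions.
random_walk <- function(x, n_steps) {
  ret <- numeric(n_steps)
  for (i in seq_len(n_steps)) {
    # Each step moves from the current position by a standard normal draw
    x <- x + rnorm(1)
    ret[[i]] <- x
  }
  ret
}

# This runs locally without any cluster, which is a quick way to check the
# function before sending it off
walk <- random_walk(0, 10)
length(walk)
```

Checking functions locally like this is cheap, and errors are much easier to read here than in a worker log.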
There are two ways we can proceed from here; the first - “workers” - is very similar to the non-worker workflow and is described first. The second - “rrq” - is a bit more involved and is described second.
Then configure and create the queue; the use_workers argument is important here as it ensures that the rrq package is available on the cluster, where your workers will run; changes the $enqueue method so that jobs are not sent to the HPC scheduler but to the rrq scheduler; and adds the $submit_workers method, which you will use to create workers on the cluster. However, everything else will appear the same.
config <- didehpc::didehpc_config(use_workers = TRUE)
obj <- didehpc::queue_didehpc(ctx, config = config)
#> Loading context 02a9261fe4e9e6554d76936ecb35cef0
#> [ context ] 02a9261fe4e9e6554d76936ecb35cef0
#> [ library ]
#> [ namespace ]
#> [ source ] mysources.R
#> Running installation script on cluster
#> ,:\ /:.
#> // \_()_/ \\
#> || | | || CONAN THE LIBRARIAN
#> || | | || Library: Q:\didehpc\20210817-145020\contexts\lib\windows\4.0
#> || |____| || Bootstrap: T:\conan\bootstrap\4.0
#> \\ / || \ // Cache: Q:\didehpc\20210817-145020\contexts\conan\cache/pkg
#> `:/ || \;' Policy: lazy
#> || Repos:
#> || * https://mrc-ide.github.io/didehpc-pkgs
#> XX * https://cloud.r-project.org
#> XX Packages:
#> XX * rrq
#> XX * callr
#> OO
#> `'
#> i Loading metadata database
#> v Loading metadata database ... done
#> i Getting 8 pkgs (2.79 MB), 1 cached
#> v Got rrq 0.4.4 (source) (118.54 kB)
#> v Got prettyunits 1.1.1 (windows) (37.72 kB)
#> v Got progress 1.2.2 (windows) (85.86 kB)
#> v Got docopt 0.7.1 (windows) (245.69 kB)
#> v Got hms 1.1.0 (windows) (104.22 kB)
#> v Got redux 1.1.0 (windows) (287.97 kB)
#> v Got callr 3.7.0 (windows) (439.25 kB)
#> v Got processx 3.5.2 (windows) (1.25 MB)
#> v Got ps 1.6.0 (windows) (775.40 kB)
#> v Installed docopt 0.7.1 (719ms)
#> v Installed hms 1.1.0 (829ms)
#> v Installed progress 1.2.2 (907ms)
#> v Installed prettyunits 1.1.1 (1s)
#> v Installed callr 3.7.0 (1.1s)
#> v Installed redux 1.1.0 (1.3s)
#> i Building rrq 0.4.4
#> v Installed processx 3.5.2 (6.7s)
#> v Installed ps 1.6.0 (6.8s)
#> v Built rrq 0.4.4 (8s)
#> v Installed rrq 0.4.4 (438ms)
#> v Summary: 9 new 15 kept in 27.8s
#> Done!
You can now submit
t <- obj$enqueue(random_walk(0, 10))
This job will stay pending until workers are available, as the HPC scheduler will never run it:
t$status()
#> [1] "PENDING"
t$times()
#> task_id submitted started finished
#> 1 cb4d883580667f393a2945a05688620a 2021-08-17 14:53:51 <NA> <NA>
#> waiting running idle
#> 1 0.2062078 NA NA
You must submit workers in order to actually run things. This could have been done before submitting the tasks, but workers time out after 10 minutes of inactivity, so if you have very many jobs to save, your workers might exit before the work starts!
The argument is the number of workers to submit. Each worker is equivalent to a job that your configuration would otherwise create (in terms of cores selected).
workers <- obj$submit_workers(2)
#> Submitting 2 workers with base name 'epicurean_xiaosaurus'
#> submitting (-) [===================================] 100% | giving up in 600 s
workers
#> [1] "epicurean_xiaosaurus_1" "epicurean_xiaosaurus_2"
All workers get names in the form <adjective>_<animal>_<integer> so that you can remember which workers you set off. They will turn off after 10 minutes of inactivity by default (you can tweak this with the worker_timeout argument to didehpc_config or by sending a TIMEOUT_SET message).
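Both ways of changing the timeout can be sketched as follows. The one-hour value is arbitrary, and the snippet assumes the ctx object from earlier and a working cluster login, so treat it as an illustration rather than something to paste and run:

```r
# Option 1: set the timeout (in seconds) when building the configuration,
# so that workers submitted from this queue start with it
config <- didehpc::didehpc_config(use_workers = TRUE, worker_timeout = 3600)
obj <- didehpc::queue_didehpc(ctx, config = config)

# Option 2: change the timeout on workers that are already running by
# sending them a TIMEOUT_SET message through the rrq controller
rrq <- obj$rrq_controller()
rrq$message_send("TIMEOUT_SET", 3600)
```

The first option is the simpler one if you know in advance that your tasks will arrive slowly; the second is useful once workers are already up.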
One advantage over the usual queuing approach here is that you will not wait for anyone else’s jobs to complete once you have reserved your workers.
t$wait(10)
#> (-) waiting for cb4d883...20a, giving up in 9.5 s
#> [1] 0.92566336 -0.58321241 0.54753392 0.72564391 0.51248705 -0.22445805
#> [7] 0.03966805 -0.25494044 -0.16625861 -1.97337247
We’re going to interact with the rrq object a bit
rrq <- obj$rrq_controller()
rrq
#> <rrq_controller>
#> Public:
#> bulk_wait: function (x, timeout = Inf, time_poll = 1, progress = NULL, delete = TRUE)
#> con: redis_api, R6
#> deferred_list: function ()
#> destroy: function (delete = TRUE, worker_stop_type = "message", worker_stop_timeout = 0)
#> enqueue: function (expr, envir = parent.frame(), queue = NULL, separate_process = FALSE,
#> enqueue_: function (expr, envir = parent.frame(), queue = NULL, separate_process = FALSE,
#> enqueue_bulk: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#> enqueue_bulk_: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#> envir: function (create, notify = TRUE)
#> initialize: function (queue_id, con = redux::hiredis())
#> keys: list
#> lapply: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#> lapply_: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#> message_get_response: function (message_id, worker_ids = NULL, named = TRUE, delete = FALSE,
#> message_has_response: function (message_id, worker_ids = NULL, named = TRUE)
#> message_response_ids: function (worker_id)
#> message_send: function (command, args = NULL, worker_ids = NULL)
#> message_send_and_wait: function (command, args = NULL, worker_ids = NULL, named = TRUE,
#> queue_id: 02a9261fe4e9e6554d76936ecb35cef0
#> queue_length: function (queue = NULL)
#> queue_list: function (queue = NULL)
#> queue_remove: function (task_ids, queue = NULL)
#> task_cancel: function (task_id, wait = TRUE, delete = TRUE)
#> task_data: function (task_id)
#> task_delete: function (task_ids, check = TRUE)
#> task_exists: function (task_ids = NULL)
#> task_list: function ()
#> task_overview: function (task_ids = NULL)
#> task_position: function (task_ids, missing = 0L, queue = NULL)
#> task_preceeding: function (task_id, queue = NULL)
#> task_progress: function (task_id)
#> task_result: function (task_id)
#> task_status: function (task_ids = NULL)
#> task_wait: function (task_id, timeout = Inf, time_poll = 1, progress = NULL)
#> tasks_result: function (task_ids)
#> tasks_wait: function (task_ids, timeout = Inf, time_poll = 1, progress = NULL)
#> worker_config_list: function ()
#> worker_config_read: function (name)
#> worker_config_save: function (name, time_poll = NULL, timeout = NULL, queue = NULL,
#> worker_delete_exited: function (worker_ids = NULL)
#> worker_detect_exited: function ()
#> worker_info: function (worker_ids = NULL)
#> worker_len: function ()
#> worker_list: function ()
#> worker_list_exited: function ()
#> worker_load: function (worker_ids = NULL)
#> worker_log_tail: function (worker_ids = NULL, n = 1)
#> worker_process_log: function (worker_id)
#> worker_status: function (worker_ids = NULL)
#> worker_stop: function (worker_ids = NULL, type = "message", timeout = 0, time_poll = 0.05,
#> worker_task_id: function (worker_ids = NULL)
#> Private:
#> scripts: list
#> store: object_store, R6
This is another R6 object, though this one at least has decent documentation: see the rrq::rrq_controller help page for details of each method.
You can see what your workers have been up to with the worker_log_tail method:
rrq$worker_log_tail(n = Inf)
#> worker_id time command
#> 1 epicurean_xiaosaurus_1 1629208433 ALIVE
#> 2 epicurean_xiaosaurus_1 1629208433 TASK_START
#> 3 epicurean_xiaosaurus_2 1629208434 ALIVE
#> 4 epicurean_xiaosaurus_1 1629208434 TASK_COMPLETE
#> message
#> 1
#> 2 977ba6cb22c03767d8bb2386b0c2b271
#> 3
#> 4 977ba6cb22c03767d8bb2386b0c2b271
The time column is in seconds. The absolute values are not very readable, but the differences between entries should still be useful here.
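If you want something more readable, the times can be converted in base R. This sketch assumes the values are seconds since 1970-01-01 UTC, which is not stated here but is consistent with the 2021-08-17 dates elsewhere in this document:

```r
# Worker log times copied from the output above
times <- c(1629208433, 1629208433, 1629208434, 1629208434)

# Assumption: these are seconds since 1970-01-01 UTC
as.POSIXct(times, origin = "1970-01-01", tz = "UTC")

# Seconds elapsed since the first log entry
elapsed <- times - times[[1]]
elapsed
```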
As before, logging works on a per-task basis:
t$log()
#> [ open:db ] rds
#> [ context ] 02a9261fe4e9e6554d76936ecb35cef0
#> [ library ]
#> [ namespace ]
#> [ source ] mysources.R
#> [ root ] contexts
#> [ context ] 02a9261fe4e9e6554d76936ecb35cef0
#> [ task ] cb4d883580667f393a2945a05688620a
#> [ expr ] random_walk(0, 10)
#> [ start ] 2021-08-17 14:53:54.075
#> [ ok ]
#> [ end ] 2021-08-17 14:53:54.231
Find out how long your workers will persist for:
rrq$message_send_and_wait("TIMEOUT_GET", worker_ids = workers)
#> $epicurean_xiaosaurus_1
#> timeout remaining
#> 600 600
#>
#> $epicurean_xiaosaurus_2
#> timeout remaining
#> 600.0000 599.4219
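The response is printed as one element per worker. A small sketch of pulling out just the remaining time, using a hand-made copy of the output above; the structure (a named list of named numeric vectors) is inferred from what is printed, not taken from the rrq documentation:

```r
# Mock of the structure printed above, so that this runs without a cluster
res <- list(
  epicurean_xiaosaurus_1 = c(timeout = 600, remaining = 600),
  epicurean_xiaosaurus_2 = c(timeout = 600, remaining = 599.4219))

# One "remaining" value per worker, in seconds
remaining <- vapply(res, function(x) x[["remaining"]], numeric(1))
remaining

# Names of workers with less than five minutes left (none here)
expiring <- names(remaining)[remaining < 300]
expiring
```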
Other than that, hopefully everything else continues as normal. We can submit a bunch of jobs and run them using $lapply:
sizes <- 3:8
grp <- obj$lapply(sizes, random_walk, x = 0)
#> Creating bundle: 'robust_krill'
#> [ bulk ] Creating 6 tasks
#> submitting 6 tasks
Task status:
grp$status()
#> e505c76477549a36f04625d810b494e1 8f2525389b6ba6ca19034567599a42be
#> "PENDING" "PENDING"
#> 579c132c388fb39f88992691a9ff16a1 bacdb4853335a3bc3ff9008d3bc09a93
#> "PENDING" "PENDING"
#> 5aa65c5b6b3b41b5afdf471a2e75886b 001a83fce62afb5eb9c49eda46d4c3cf
#> "PENDING" "PENDING"
Collect the results:
res <- grp$wait(5)
#> (-) [=======================>------------------------] 50% | giving up in 4 s
#> (\) [=======================>------------------------] 50% | giving up in 4 s
#> (|) [================================================] 100% | giving up in 3 s
res
#> [[1]]
#> [1] -0.7841728 -1.9470967 -2.1056223
#>
#> [[2]]
#> [1] -0.4309173 -1.6581243 -0.4737780 0.4492074
#>
#> [[3]]
#> [1] -1.4924993 -2.0190762 -0.6046004 -1.3086768 -1.4110236
#>
#> [[4]]
#> [1] -1.184490257 -0.196637540 -0.191229471 -0.004466775 0.483852828
#> [6] 1.066925267
#>
#> [[5]]
#> [1] 0.6912883 -0.6199649 -1.4499917 -2.5812726 -1.4212357 -1.4081750 -2.2605613
#>
#> [[6]]
#> [1] -0.54840267 0.05881601 0.71391542 1.40774062 1.85894605 3.56734897
#> [7] 4.74281970 5.10295235
While workers will turn off automatically, it’s polite to turn them off as soon as you’re done, using obj$stop_workers().
Alternatively, after submitting a bunch of jobs you can run
rrq$message_send("TIMEOUT_SET", 0)
which will mean that the workers will stop immediately after not receiving a task (so after they finish processing all your jobs they’ll stop one by one). Practically this still takes one minute because that’s the polling timeout time (I may be able to improve this later).
obj$stop_workers()
Sys.sleep(1)
rrq$worker_log_tail(workers, n = Inf)
#> worker_id time command
#> 1 epicurean_xiaosaurus_1 1629208433 ALIVE
#> 2 epicurean_xiaosaurus_1 1629208433 TASK_START
#> 3 epicurean_xiaosaurus_2 1629208434 ALIVE
#> 4 epicurean_xiaosaurus_1 1629208434 TASK_COMPLETE
#> 5 epicurean_xiaosaurus_1 1629208435 MESSAGE
#> 6 epicurean_xiaosaurus_1 1629208435 RESPONSE
#> 7 epicurean_xiaosaurus_2 1629208435 MESSAGE
#> 8 epicurean_xiaosaurus_2 1629208435 RESPONSE
#> 9 epicurean_xiaosaurus_1 1629208435 TASK_START
#> 10 epicurean_xiaosaurus_2 1629208435 TASK_START
#> 11 epicurean_xiaosaurus_1 1629208436 TASK_COMPLETE
#> 12 epicurean_xiaosaurus_1 1629208436 TASK_START
#> 13 epicurean_xiaosaurus_2 1629208436 TASK_COMPLETE
#> 14 epicurean_xiaosaurus_2 1629208436 TASK_START
#> 15 epicurean_xiaosaurus_1 1629208436 TASK_COMPLETE
#> 16 epicurean_xiaosaurus_1 1629208436 TASK_START
#> 17 epicurean_xiaosaurus_2 1629208436 TASK_COMPLETE
#> 18 epicurean_xiaosaurus_2 1629208436 TASK_START
#> 19 epicurean_xiaosaurus_1 1629208437 TASK_COMPLETE
#> 20 epicurean_xiaosaurus_2 1629208437 TASK_COMPLETE
#> 21 epicurean_xiaosaurus_1 1629208437 MESSAGE
#> 22 epicurean_xiaosaurus_1 1629208437 RESPONSE
#> 23 epicurean_xiaosaurus_1 1629208437 STOP
#> 24 epicurean_xiaosaurus_2 1629208437 MESSAGE
#> 25 epicurean_xiaosaurus_2 1629208437 RESPONSE
#> 26 epicurean_xiaosaurus_2 1629208437 STOP
#> message
#> 1
#> 2 977ba6cb22c03767d8bb2386b0c2b271
#> 3
#> 4 977ba6cb22c03767d8bb2386b0c2b271
#> 5 TIMEOUT_GET
#> 6 TIMEOUT_GET
#> 7 TIMEOUT_GET
#> 8 TIMEOUT_GET
#> 9 96b4a731efe9bf2efb5e3e7e6aef781a
#> 10 1df4287014a01c65f807c39cc84e5482
#> 11 96b4a731efe9bf2efb5e3e7e6aef781a
#> 12 26c86df13bf2c438bd3d09db57a3627a
#> 13 1df4287014a01c65f807c39cc84e5482
#> 14 7cbb27b699ffc184723d884e896a1c30
#> 15 26c86df13bf2c438bd3d09db57a3627a
#> 16 e544f4fafc61746df4b5d6b80b95338d
#> 17 7cbb27b699ffc184723d884e896a1c30
#> 18 b03fa5fd3f1509e6e98e9707cce0d445
#> 19 e544f4fafc61746df4b5d6b80b95338d
#> 20 b03fa5fd3f1509e6e98e9707cce0d445
#> 21 STOP
#> 22 STOP
#> 23 OK (BYE)
#> 24 STOP
#> 25 STOP
#> 26 OK (BYE)
rrq$destroy()
In this model, we create a very lightweight queue which in turn creates very lightweight tasks. This avoids even more overhead than the approach above, though it can be more difficult to debug because less information is saved. Rather than round-tripping data through the disk, everything goes via the redis server.
The first part here looks very similar, except that we use use_rrq = TRUE rather than use_workers:
config <- didehpc::didehpc_config(use_rrq = TRUE)
obj <- didehpc::queue_didehpc(ctx, config = config)
#> Loading context 02a9261fe4e9e6554d76936ecb35cef0
#> [ context ] 02a9261fe4e9e6554d76936ecb35cef0
#> [ library ]
#> [ namespace ]
#> [ source ] mysources.R
We still submit workers
workers <- obj$submit_workers(10)
#> Submitting 10 workers with base name 'vegetarian_alaskajingle'
#> submitting (-) [======>----------------------------] 20% | giving up in 599 s
#> submitting (\) [=========>-------------------------] 30% | giving up in 599 s
#> submitting (|) [=============>---------------------] 40% | giving up in 598 s
#> submitting (/) [=================>-----------------] 50% | giving up in 597 s
#> submitting (-) [====================>--------------] 60% | giving up in 597 s
#> submitting (\) [=======================>-----------] 70% | giving up in 596 s
#> submitting (|) [===========================>-------] 80% | giving up in 596 s
#> submitting (/) [===============================>---] 90% | giving up in 596 s
#> submitting (-) [===================================] 100% | giving up in 595 s
workers
#> [1] "vegetarian_alaskajingle_1" "vegetarian_alaskajingle_2"
#> [3] "vegetarian_alaskajingle_3" "vegetarian_alaskajingle_4"
#> [5] "vegetarian_alaskajingle_5" "vegetarian_alaskajingle_6"
#> [7] "vegetarian_alaskajingle_7" "vegetarian_alaskajingle_8"
#> [9] "vegetarian_alaskajingle_9" "vegetarian_alaskajingle_10"
To send tasks to these workers we directly use the rrq_controller object; we’ll not use the queue_didehpc object from this point.
rrq <- obj$rrq_controller()
This will look and act a lot like the main didehpc queue controller, but with a few differences. Tasks will come back as plain strings rather than user-friendly objects, and lapply and enqueue_bulk are now blocking operations by default. Most tasks are deleted once they are done with, rather than leaving a persistent record on disk. The payback for this is potentially very fast task turnarounds and better behaviour with the disk under heavy load.
t <- rrq$enqueue(sin(1))
rrq$task_wait(t, 10)
#> [1] 0.841471
For example, submitting 50 trivial tasks to our pool of workers and retrieving the results:
system.time(res <- rrq$lapply(1:50, sin))
#> user system elapsed
#> 0.026 0.000 0.325
or 500 tasks:
system.time(res <- rrq$lapply(1:500, sin, progress = FALSE))
#> user system elapsed
#> 0.161 0.052 1.476
Across the network, the overhead here is a few milliseconds per task (1.476 s for 500 tasks is about 3 ms each). On fi--didemrchnb it will hopefully be a bit faster because of the infiniband network.
rrq$worker_stop()
rrq$destroy()
It is theoretically possible to submit a cluster job that creates an rrq_controller and controls the second queue. To do that you need to write a function like:
get_rrq_controller <- function(x, ...) {
  # Read the queue id from the CONTEXT_ID environment variable
  queue_id <- Sys.getenv("CONTEXT_ID", "")
  stopifnot(queue_id != "")
  rrq::rrq_controller$new(queue_id)
}
within your sources; you can then use it in place of running (say) an lapply() call in your code. This approach allows a relatively simple form of inter-process communication. Talk to Rich if this is something you might have a use for, for example if you have simulation needs that are larger than a single node.
Suppose that we want to submit a job where we have a single process which orchestrates a group of workers each of which takes up an entire node. We used this pattern in the covid response where we wanted to run different MCMC chains on different nodes, each using 32 cores, but we also needed a single “controlling” process to organise collecting results from these nodes.
What we want to do is specify a different set of resources to be used by the workers than by tasks submitted by $enqueue(). Note that this only makes sense when using use_rrq = TRUE.
For example, to submit a controlling process that uses the GeneralNodes template and one core (the default) but worker processes that can use 8 cores, we might write
config <- didehpc::didehpc_config(
use_rrq = TRUE,
worker_resource = didehpc::worker_resource(cores = 8))
config
#> <didehpc_config>
#> - cluster: fi--dideclusthn
#> - credentials:
#> - username: rfitzjoh
#> - password: *******************
#> - username: rfitzjoh
#> - resource:
#> - template: GeneralNodes
#> - parallel: FALSE
#> - count: 1
#> - type: Cores
#> - shares:
#> - home: (local) /home/rich/net/home => \\fi--san03.dide.ic.ac.uk\homes\rfitzjoh => Q: (remote)
#> - temp: (local) /home/rich/net/temp => \\fi--didef3.dide.ic.ac.uk\tmp => T: (remote)
#> - use_workers: FALSE
#> - use_rrq: TRUE
#> - worker_timeout: 600
#> - conan_bootstrap: TRUE
#> - r_version: 4.0.3
#> - use_java: FALSE
#> - redis_host: fi--dideclusthn.dide.ic.ac.uk
#> - worker_resource:
#> - template: GeneralNodes
#> - parallel: TRUE
#> - count: 8
#> - type: Cores
Note that the worker_resource section here indicates different resources from the resource section.
Now, when submitting workers with obj$submit_workers, each worker will be able to use 8 cores, but a task submitted by $enqueue() will only be able to use one. You might then submit a task that uses the get_rrq_controller trick above as part of your single-core job, which can then farm out work to your workers using the rrq queue object.
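Putting those pieces together might look something like the following. This is an untested sketch that needs the cluster to run: orchestrate is a hypothetical name, sin stands in for whatever real work each worker would do, and the worker count and wait time are arbitrary.

```r
# In mysources.R, alongside get_rrq_controller as defined above.
# "orchestrate" is a hypothetical name for the controlling function.
orchestrate <- function(xs) {
  rrq <- get_rrq_controller()
  # Blocks until the workers have processed every element of xs
  rrq$lapply(xs, sin)
}

# From your own machine, using the config with worker_resource set
obj <- didehpc::queue_didehpc(ctx, config = config)
workers <- obj$submit_workers(4)      # each worker may use 8 cores
t <- obj$enqueue(orchestrate(1:100))  # single-core controlling task
res <- t$wait(600)
```

The controlling task holds only one core while it waits, so the expensive resources are reserved by the workers alone.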