If you have thousands and thousands of jobs to submit at once, you may not want to flood the cluster with all of them. Each job submission is relatively slow (the HPC tools that the web interface has to use are relatively slow). The actual queue that the cluster uses doesn’t seem to like processing tens of thousands of jobs, and can slow down. And if you take up the whole cluster, someone may come and knock on your office door and complain. At the same time, batching your jobs up into little bits and manually sending them off is a pain, and is work better done by a computer.
An alternative is to submit a set of “workers” to the cluster, and
then submit jobs to them. This is done with the rrq package,
along with a redis server
running on the cluster.
To get started you will need to install the rrq package
locally.
drat::add("mrc-ide")
install.packages("rrq")
Then construct the context as before:
root <- "contexts"
ctx <- context::context_save(root, sources = "mysources.R")
#> [ open:db ] rds
There are two ways we can proceed from here. The first, “workers”, is very similar to the non-worker workflow and is described first. The second, “rrq”, is a bit more involved and is described second.
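The context created above sources mysources.R, whose contents are not reproduced here. A minimal sketch of what it might contain follows, assuming that random_walk(x, n) starts at x and returns n normally distributed steps; the function body is an illustrative assumption, chosen only to be consistent with the calls and output lengths shown in this document.

```r
# Hypothetical contents of mysources.R (an assumption; the real file is not shown).
# random_walk(x, n): start at x and take n steps, each drawn from a normal
# distribution centred on the current position. Returns the n positions visited.
random_walk <- function(x, n) {
  ret <- numeric(n)
  for (i in seq_len(n)) {
    x <- rnorm(1, mean = x)
    ret[[i]] <- x
  }
  ret
}
```

With this definition, random_walk(0, 10) returns a numeric vector of length 10.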
Then configure and create the queue; the use_workers
argument is important here as it:
- ensures that the rrq package is available on the cluster, where your workers will run
- changes the $enqueue method so that jobs are not sent to the HPC scheduler but to the rrq scheduler
- enables the $submit_workers method, which you will use to create workers on the cluster
However, everything else will appear the same.
config <- didehpc::didehpc_config(use_workers = TRUE)
obj <- didehpc::queue_didehpc(ctx, config = config)
#> Loading context 02a9261fe4e9e6554d76936ecb35cef0
#> [ context ] 02a9261fe4e9e6554d76936ecb35cef0
#> [ library ]
#> [ namespace ]
#> [ source ] mysources.R
#> Running installation script on cluster
#> ,:\ /:.
#> // \_()_/ \\
#> || | | || CONAN THE LIBRARIAN
#> || | | || Library: Q:\didehpc\20210817-145020\contexts\lib\windows\4.0
#> || |____| || Bootstrap: T:\conan\bootstrap\4.0
#> \\ / || \ // Cache: Q:\didehpc\20210817-145020\contexts\conan\cache/pkg
#> `:/ || \;' Policy: lazy
#> || Repos:
#> || * https://mrc-ide.github.io/didehpc-pkgs
#> XX * https://cloud.r-project.org
#> XX Packages:
#> XX * rrq
#> XX * callr
#> OO
#> `'
#> i Loading metadata database
#> v Loading metadata database ... done
#> i Getting 8 pkgs (2.79 MB), 1 cached
#> v Got rrq 0.4.4 (source) (118.54 kB)
#> v Got prettyunits 1.1.1 (windows) (37.72 kB)
#> v Got progress 1.2.2 (windows) (85.86 kB)
#> v Got docopt 0.7.1 (windows) (245.69 kB)
#> v Got hms 1.1.0 (windows) (104.22 kB)
#> v Got redux 1.1.0 (windows) (287.97 kB)
#> v Got callr 3.7.0 (windows) (439.25 kB)
#> v Got processx 3.5.2 (windows) (1.25 MB)
#> v Got ps 1.6.0 (windows) (775.40 kB)
#> v Installed docopt 0.7.1 (719ms)
#> v Installed hms 1.1.0 (829ms)
#> v Installed progress 1.2.2 (907ms)
#> v Installed prettyunits 1.1.1 (1s)
#> v Installed callr 3.7.0 (1.1s)
#> v Installed redux 1.1.0 (1.3s)
#> i Building rrq 0.4.4
#> v Installed processx 3.5.2 (6.7s)
#> v Installed ps 1.6.0 (6.8s)
#> v Built rrq 0.4.4 (8s)
#> v Installed rrq 0.4.4 (438ms)
#> v Summary: 9 new 15 kept in 27.8s
#> Done!
You can now submit a task:
t <- obj$enqueue(random_walk(0, 10))
This job will stay pending until workers exist to run it, as the HPC scheduler will never run it:
t$status()
#> [1] "PENDING"
t$times()
#> task_id submitted started finished
#> 1 cb4d883580667f393a2945a05688620a 2021-08-17 14:53:51 <NA> <NA>
#> waiting running idle
#> 1 0.2062078 NA NA
You must submit workers in order to actually run things. This could have been done before submitting the tasks, but workers time out after 10 minutes of inactivity, so if you have very many jobs to save, your workers might exit before the work starts!
The argument to $submit_workers is the number of workers to submit. Each worker is equivalent to a job that your configuration would otherwise create (in terms of cores selected).
workers <- obj$submit_workers(2)
#> Submitting 2 workers with base name 'epicurean_xiaosaurus'
#> submitting (-) [===================================] 100% | giving up in 600 s
workers
#> [1] "epicurean_xiaosaurus_1" "epicurean_xiaosaurus_2"
All workers get names in the form
<adjective>_<animal>_<integer> so that
you can remember which workers you set off. They will turn off after 10
minutes of inactivity by default (you can tweak this with the
worker_timeout argument to didehpc_config or
by sending a TIMEOUT_SET message).
One advantage over the usual queuing approach here is that you will not wait for anyone else’s jobs to complete once you have reserved your workers.
t$wait(10)
#> (-) waiting for cb4d883...20a, giving up in 9.5 s
#> [1] 0.92566336 -0.58321241 0.54753392 0.72564391 0.51248705 -0.22445805
#> [7] 0.03966805 -0.25494044 -0.16625861 -1.97337247
We’re going to interact with the rrq object a bit:
rrq <- obj$rrq_controller()
rrq
#> <rrq_controller>
#> Public:
#> bulk_wait: function (x, timeout = Inf, time_poll = 1, progress = NULL, delete = TRUE)
#> con: redis_api, R6
#> deferred_list: function ()
#> destroy: function (delete = TRUE, worker_stop_type = "message", worker_stop_timeout = 0)
#> enqueue: function (expr, envir = parent.frame(), queue = NULL, separate_process = FALSE,
#> enqueue_: function (expr, envir = parent.frame(), queue = NULL, separate_process = FALSE,
#> enqueue_bulk: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#> enqueue_bulk_: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#> envir: function (create, notify = TRUE)
#> initialize: function (queue_id, con = redux::hiredis())
#> keys: list
#> lapply: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#> lapply_: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#> message_get_response: function (message_id, worker_ids = NULL, named = TRUE, delete = FALSE,
#> message_has_response: function (message_id, worker_ids = NULL, named = TRUE)
#> message_response_ids: function (worker_id)
#> message_send: function (command, args = NULL, worker_ids = NULL)
#> message_send_and_wait: function (command, args = NULL, worker_ids = NULL, named = TRUE,
#> queue_id: 02a9261fe4e9e6554d76936ecb35cef0
#> queue_length: function (queue = NULL)
#> queue_list: function (queue = NULL)
#> queue_remove: function (task_ids, queue = NULL)
#> task_cancel: function (task_id, wait = TRUE, delete = TRUE)
#> task_data: function (task_id)
#> task_delete: function (task_ids, check = TRUE)
#> task_exists: function (task_ids = NULL)
#> task_list: function ()
#> task_overview: function (task_ids = NULL)
#> task_position: function (task_ids, missing = 0L, queue = NULL)
#> task_preceeding: function (task_id, queue = NULL)
#> task_progress: function (task_id)
#> task_result: function (task_id)
#> task_status: function (task_ids = NULL)
#> task_wait: function (task_id, timeout = Inf, time_poll = 1, progress = NULL)
#> tasks_result: function (task_ids)
#> tasks_wait: function (task_ids, timeout = Inf, time_poll = 1, progress = NULL)
#> worker_config_list: function ()
#> worker_config_read: function (name)
#> worker_config_save: function (name, time_poll = NULL, timeout = NULL, queue = NULL,
#> worker_delete_exited: function (worker_ids = NULL)
#> worker_detect_exited: function ()
#> worker_info: function (worker_ids = NULL)
#> worker_len: function ()
#> worker_list: function ()
#> worker_list_exited: function ()
#> worker_load: function (worker_ids = NULL)
#> worker_log_tail: function (worker_ids = NULL, n = 1)
#> worker_process_log: function (worker_id)
#> worker_status: function (worker_ids = NULL)
#> worker_stop: function (worker_ids = NULL, type = "message", timeout = 0, time_poll = 0.05,
#> worker_task_id: function (worker_ids = NULL)
#> Private:
#> scripts: list
#> store: object_store, R6
This is another R6 object, though this one at least has decent
documentation; see the help for rrq::rrq_controller for details of
each method.
You can see what your workers have been up to with the
worker_log_tail method:
rrq$worker_log_tail(n = Inf)
#> worker_id time command
#> 1 epicurean_xiaosaurus_1 1629208433 ALIVE
#> 2 epicurean_xiaosaurus_1 1629208433 TASK_START
#> 3 epicurean_xiaosaurus_2 1629208434 ALIVE
#> 4 epicurean_xiaosaurus_1 1629208434 TASK_COMPLETE
#> message
#> 1
#> 2 977ba6cb22c03767d8bb2386b0c2b271
#> 3
#> 4 977ba6cb22c03767d8bb2386b0c2b271
The time column is in seconds; even without converting it to a date,
the differences between entries should still be useful here.
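The values in the time column look like seconds since the Unix epoch (1970-01-01 UTC); this is an assumption, but it agrees with the task log timestamps shown in this document once the local time zone is taken into account. A small sketch of converting them, using two values copied from the log output above:

```r
# Two values copied from the `time` column of the worker log above
log_times <- c(1629208433, 1629208434)

# Assumption: these are seconds since the Unix epoch, so convert with a
# 1970-01-01 origin and display in UTC
stamps <- as.POSIXct(log_times, origin = "1970-01-01", tz = "UTC")
format(stamps, "%Y-%m-%d %H:%M:%S")

# Seconds elapsed since the first log entry
elapsed <- log_times - log_times[[1]]
```

The elapsed vector gives the relative seconds directly, which is often all you need when comparing worker activity.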
As before, logging works on a per-task basis:
t$log()
#> [ open:db ] rds
#> [ context ] 02a9261fe4e9e6554d76936ecb35cef0
#> [ library ]
#> [ namespace ]
#> [ source ] mysources.R
#> [ root ] contexts
#> [ context ] 02a9261fe4e9e6554d76936ecb35cef0
#> [ task ] cb4d883580667f393a2945a05688620a
#> [ expr ] random_walk(0, 10)
#> [ start ] 2021-08-17 14:53:54.075
#> [ ok ]
#> [ end ] 2021-08-17 14:53:54.231
Find out how long your workers will persist for:
rrq$message_send_and_wait("TIMEOUT_GET", worker_ids = workers)
#> $epicurean_xiaosaurus_1
#> timeout remaining
#> 600 600
#>
#> $epicurean_xiaosaurus_2
#> timeout remaining
#> 600.0000 599.4219
Other than that, hopefully everything else continues as normal. We
can submit a bunch of jobs and run them using $lapply:
sizes <- 3:8
grp <- obj$lapply(sizes, random_walk, x = 0)
#> Creating bundle: 'robust_krill'
#> [ bulk ] Creating 6 tasks
#> submitting 6 tasks
Task status:
grp$status()
#> e505c76477549a36f04625d810b494e1 8f2525389b6ba6ca19034567599a42be
#> "PENDING" "PENDING"
#> 579c132c388fb39f88992691a9ff16a1 bacdb4853335a3bc3ff9008d3bc09a93
#> "PENDING" "PENDING"
#> 5aa65c5b6b3b41b5afdf471a2e75886b 001a83fce62afb5eb9c49eda46d4c3cf
#> "PENDING" "PENDING"
Collect the results:
res <- grp$wait(5)
#> (-) [=======================>------------------------] 50% | giving up in 4 s
#> (\) [=======================>------------------------] 50% | giving up in 4 s
#> (|) [================================================] 100% | giving up in 3 s
res
#> [[1]]
#> [1] -0.7841728 -1.9470967 -2.1056223
#>
#> [[2]]
#> [1] -0.4309173 -1.6581243 -0.4737780 0.4492074
#>
#> [[3]]
#> [1] -1.4924993 -2.0190762 -0.6046004 -1.3086768 -1.4110236
#>
#> [[4]]
#> [1] -1.184490257 -0.196637540 -0.191229471 -0.004466775 0.483852828
#> [6] 1.066925267
#>
#> [[5]]
#> [1] 0.6912883 -0.6199649 -1.4499917 -2.5812726 -1.4212357 -1.4081750 -2.2605613
#>
#> [[6]]
#> [1] -0.54840267 0.05881601 0.71391542 1.40774062 1.85894605 3.56734897
#> [7] 4.74281970 5.10295235
While workers will turn off automatically, it’s polite to turn them
off as soon as you’re done, using obj$stop_workers().
Alternatively, after submitting a bunch of jobs you can run
rrq$message_send("TIMEOUT_SET", 0)
which will mean that the workers stop as soon as they find no task to run (so after they finish processing all your jobs they’ll stop one by one). Practically this still takes one minute because that’s the polling timeout (I may be able to improve this later).
obj$stop_workers()
Sys.sleep(1)
rrq$worker_log_tail(workers, n = Inf)
#> worker_id time command
#> 1 epicurean_xiaosaurus_1 1629208433 ALIVE
#> 2 epicurean_xiaosaurus_1 1629208433 TASK_START
#> 3 epicurean_xiaosaurus_2 1629208434 ALIVE
#> 4 epicurean_xiaosaurus_1 1629208434 TASK_COMPLETE
#> 5 epicurean_xiaosaurus_1 1629208435 MESSAGE
#> 6 epicurean_xiaosaurus_1 1629208435 RESPONSE
#> 7 epicurean_xiaosaurus_2 1629208435 MESSAGE
#> 8 epicurean_xiaosaurus_2 1629208435 RESPONSE
#> 9 epicurean_xiaosaurus_1 1629208435 TASK_START
#> 10 epicurean_xiaosaurus_2 1629208435 TASK_START
#> 11 epicurean_xiaosaurus_1 1629208436 TASK_COMPLETE
#> 12 epicurean_xiaosaurus_1 1629208436 TASK_START
#> 13 epicurean_xiaosaurus_2 1629208436 TASK_COMPLETE
#> 14 epicurean_xiaosaurus_2 1629208436 TASK_START
#> 15 epicurean_xiaosaurus_1 1629208436 TASK_COMPLETE
#> 16 epicurean_xiaosaurus_1 1629208436 TASK_START
#> 17 epicurean_xiaosaurus_2 1629208436 TASK_COMPLETE
#> 18 epicurean_xiaosaurus_2 1629208436 TASK_START
#> 19 epicurean_xiaosaurus_1 1629208437 TASK_COMPLETE
#> 20 epicurean_xiaosaurus_2 1629208437 TASK_COMPLETE
#> 21 epicurean_xiaosaurus_1 1629208437 MESSAGE
#> 22 epicurean_xiaosaurus_1 1629208437 RESPONSE
#> 23 epicurean_xiaosaurus_1 1629208437 STOP
#> 24 epicurean_xiaosaurus_2 1629208437 MESSAGE
#> 25 epicurean_xiaosaurus_2 1629208437 RESPONSE
#> 26 epicurean_xiaosaurus_2 1629208437 STOP
#> message
#> 1
#> 2 977ba6cb22c03767d8bb2386b0c2b271
#> 3
#> 4 977ba6cb22c03767d8bb2386b0c2b271
#> 5 TIMEOUT_GET
#> 6 TIMEOUT_GET
#> 7 TIMEOUT_GET
#> 8 TIMEOUT_GET
#> 9 96b4a731efe9bf2efb5e3e7e6aef781a
#> 10 1df4287014a01c65f807c39cc84e5482
#> 11 96b4a731efe9bf2efb5e3e7e6aef781a
#> 12 26c86df13bf2c438bd3d09db57a3627a
#> 13 1df4287014a01c65f807c39cc84e5482
#> 14 7cbb27b699ffc184723d884e896a1c30
#> 15 26c86df13bf2c438bd3d09db57a3627a
#> 16 e544f4fafc61746df4b5d6b80b95338d
#> 17 7cbb27b699ffc184723d884e896a1c30
#> 18 b03fa5fd3f1509e6e98e9707cce0d445
#> 19 e544f4fafc61746df4b5d6b80b95338d
#> 20 b03fa5fd3f1509e6e98e9707cce0d445
#> 21 STOP
#> 22 STOP
#> 23 OK (BYE)
#> 24 STOP
#> 25 STOP
#> 26 OK (BYE)
rrq$destroy()
In this model, we create a very lightweight queue which in turn creates very lightweight tasks. This avoids even more overhead than the approach above, though it can be more difficult to debug because less information is saved. Rather than round-tripping data through the disk, everything goes via the redis server.
The first part here looks very similar, except that we use
use_rrq = TRUE rather than use_workers:
config <- didehpc::didehpc_config(use_rrq = TRUE)
obj <- didehpc::queue_didehpc(ctx, config = config)
#> Loading context 02a9261fe4e9e6554d76936ecb35cef0
#> [ context ] 02a9261fe4e9e6554d76936ecb35cef0
#> [ library ]
#> [ namespace ]
#> [ source ] mysources.R
We still submit workers:
workers <- obj$submit_workers(10)
#> Submitting 10 workers with base name 'vegetarian_alaskajingle'
#> submitting (-) [======>----------------------------] 20% | giving up in 599 s
#> submitting (\) [=========>-------------------------] 30% | giving up in 599 s
#> submitting (|) [=============>---------------------] 40% | giving up in 598 s
#> submitting (/) [=================>-----------------] 50% | giving up in 597 s
#> submitting (-) [====================>--------------] 60% | giving up in 597 s
#> submitting (\) [=======================>-----------] 70% | giving up in 596 s
#> submitting (|) [===========================>-------] 80% | giving up in 596 s
#> submitting (/) [===============================>---] 90% | giving up in 596 s
#> submitting (-) [===================================] 100% | giving up in 595 s
workers
#> [1] "vegetarian_alaskajingle_1" "vegetarian_alaskajingle_2"
#> [3] "vegetarian_alaskajingle_3" "vegetarian_alaskajingle_4"
#> [5] "vegetarian_alaskajingle_5" "vegetarian_alaskajingle_6"
#> [7] "vegetarian_alaskajingle_7" "vegetarian_alaskajingle_8"
#> [9] "vegetarian_alaskajingle_9" "vegetarian_alaskajingle_10"
To send tasks to these workers we directly use the
rrq_controller object - we’ll not use the
queue_didehpc object from this point.
rrq <- obj$rrq_controller()
This will look and act a lot like the main didehpc queue controller,
but with a few differences. Tasks will come back as plain strings rather
than user-friendly objects, and lapply and
enqueue_bulk are now blocking operations by default. Most
tasks will clean up after themselves (they are deleted once done) rather than leaving a persistent
record on disk. The payback for this is potentially very fast task
turnarounds and better behaviour with the disk under heavy load.
t <- rrq$enqueue(sin(1))
rrq$task_wait(t, 10)
#> [1] 0.841471
For example, submitting 50 trivial tasks to our pool of workers and retrieving the results:
system.time(res <- rrq$lapply(1:50, sin))
#> user system elapsed
#> 0.026 0.000 0.325
or 500 tasks:
system.time(res <- rrq$lapply(1:500, sin, progress = FALSE))
#> user system elapsed
#> 0.161 0.052 1.476
Across the network, the overhead in the runs above works out at roughly 3 ms per task (1.476 s for 500 tasks). On fi--didemrchnb it will hopefully be a bit faster because of the infiniband network.
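The per-task overhead can be worked out from the two timings printed above. This is a sketch of the arithmetic only; the numbers are copied from the elapsed column of the system.time() output in this document and will differ from run to run.

```r
# Elapsed times copied from the system.time() output above
timings <- data.frame(tasks = c(50, 500), elapsed = c(0.325, 1.476))

# Average wall-clock seconds per task, and its inverse (tasks per second)
timings$per_task <- timings$elapsed / timings$tasks
timings$tasks_per_second <- timings$tasks / timings$elapsed
timings
```

The larger batch has the lower per-task cost, which suggests that some fixed overhead is shared across the whole call.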
rrq$worker_stop()
rrq$destroy()
It is theoretically possible to submit a cluster job that creates an
rrq_controller and controls the second queue. To do that
you need to write a function like:
get_rrq_controller <- function(x, ...) {
queue_id <- Sys.getenv("CONTEXT_ID", "")
stopifnot(queue_id != "")
rrq::rrq_controller$new(queue_id)
}
within your sources, then you can use it in place of running (say) a
lapply() call in your code. This approach allows a
relatively simple form of inter-process communication. Talk to Rich if
this is something you might have a use for, if you have simulation needs
that are larger than a single node.
Suppose that we want to submit a job where we have a single process which orchestrates a group of workers each of which takes up an entire node. We used this pattern in the covid response where we wanted to run different MCMC chains on different nodes, each using 32 cores, but we also needed a single “controlling” process to organise collecting results from these nodes.
What we want to do is specify a different set of resources to be
used by the workers than by tasks submitted by $enqueue().
Note that this only makes sense when using use_rrq = TRUE.
For example, to submit a controlling process that uses the
GeneralNodes template and one core (the default) but worker
processes that can use 8 cores, we might write
config <- didehpc::didehpc_config(
use_rrq = TRUE,
worker_resource = didehpc::worker_resource(cores = 8))
config
#> <didehpc_config>
#> - cluster: fi--dideclusthn
#> - credentials:
#> - username: rfitzjoh
#> - password: *******************
#> - username: rfitzjoh
#> - resource:
#> - template: GeneralNodes
#> - parallel: FALSE
#> - count: 1
#> - type: Cores
#> - shares:
#> - home: (local) /home/rich/net/home => \\fi--san03.dide.ic.ac.uk\homes\rfitzjoh => Q: (remote)
#> - temp: (local) /home/rich/net/temp => \\fi--didef3.dide.ic.ac.uk\tmp => T: (remote)
#> - use_workers: FALSE
#> - use_rrq: TRUE
#> - worker_timeout: 600
#> - conan_bootstrap: TRUE
#> - r_version: 4.0.3
#> - use_java: FALSE
#> - redis_host: fi--dideclusthn.dide.ic.ac.uk
#> - worker_resource:
#> - template: GeneralNodes
#> - parallel: TRUE
#> - count: 8
#> - type: Cores
Note that the worker_resource section here indicates different
resources from the resource section.
Now, when submitting workers with obj$submit_workers
each worker will be able to use 8 cores, but a task submitted by
$enqueue() will only be able to use one. You might then
submit a task that uses the get_rrq_controller trick above
as part of your single core job, which can then farm out work to your
workers using the rrq queue object.
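A minimal sketch of what such a controlling task might look like follows. The names run_chains and run_chain are hypothetical and used only for illustration; the sketch assumes the get_rrq_controller helper from earlier (repeated here so the block is self-contained) and uses only the lapply method listed on the rrq_controller object above.

```r
# Helper from earlier in this document: build a controller from the
# CONTEXT_ID environment variable, which is expected to be set on the cluster
get_rrq_controller <- function(x, ...) {
  queue_id <- Sys.getenv("CONTEXT_ID", "")
  stopifnot(queue_id != "")
  rrq::rrq_controller$new(queue_id)
}

# Hypothetical controlling task: farm one piece of work per chain out to the
# workers and collect the results. `run_chain` is a placeholder for your own
# function. The `rrq` argument defaults to a real controller but can be
# replaced, for example with a stand-in when testing locally.
run_chains <- function(n_chains, run_chain, rrq = get_rrq_controller()) {
  rrq$lapply(seq_len(n_chains), run_chain)
}
```

On the cluster this would be submitted as the single-core job, for example with obj$enqueue(run_chains(4, my_chain_function)), where my_chain_function is again a placeholder defined in your sources.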