18 Parallelisation
library(monty)
Both dust2 and monty have built-in support for parallelisation, making it straightforward to take advantage of having multiple cores available. There are three main ways to parallelise, depending on what part of the computation you want to speed up:
Using workers to parallelise between chains: This runs each MCMC chain in its own process. It is useful whenever you are running multiple chains, regardless of whether your model is deterministic or stochastic, or whether it is written in odin2 or not.
Using threads to parallelise between particles in a filter: This runs different sets of particles from a particle filter in parallel, within a single process. It is useful when you have a stochastic model written in odin2 and are using the filter to estimate the marginal likelihood. Using threads can be combined with using workers.
Manual scheduling of chains: This is a fully manual way of distributing chains - for example, across the nodes of an HPC system - rather than letting monty manage the workers automatically.
We illustrate how to implement all three methods below. Note that all three parallelisation methods are implemented so that results will be identical to running in serial.
18.1 Using workers to parallelise between chains
Let’s revisit the simple Gaussian mixture model example from Section 10.1, and sample 4 chains of length 1000.
fn <- function(l, p, m1, m2) {
  log(p * dnorm(l, mean = m1) + (1 - p) * dnorm(l, mean = m2))
}

mixture_model <- monty_model_function(fn,
                                      fixed = list(p = 0.75, m1 = 3, m2 = 7),
                                      allow_multiple_parameters = TRUE)

sampler <- monty_sampler_random_walk(matrix(2))
set.seed(1)
samples <- monty_sample(mixture_model, sampler, 1000, initial = 3, n_chains = 4)
#> ⡀⠀ Sampling [▁▁▁▁] ■ | 0% ETA: 29s
#> ✔ Sampled 4000 steps across 4 chains in 223ms
#>
One of the available inputs to monty_sample that we have not specified is runner. The runner determines how the chains are run, and runners can be set up with functions in monty. If runner is not specified then, by default, the chains are run with the simplest runner, which is constructed with monty_runner_serial. With this runner, chains are run in series (one after another).
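For example, passing an explicitly constructed serial runner is equivalent to omitting runner altogether (a minimal sketch, reusing the mixture_model and sampler defined above; runner_serial and samples_serial are just illustrative names):
# Equivalent to the default behaviour: run the 4 chains one after another
runner_serial <- monty_runner_serial()
samples_serial <- monty_sample(mixture_model, sampler, 1000, initial = 3,
                               n_chains = 4, runner = runner_serial)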
However, if we have multiple cores available, we may want to run chains in parallel using workers. In this case we can construct a runner with monty_runner_callr. This runner uses the callr package to distribute your chains over a number of worker processes on the same machine. Suppose we have 2 cores; then we will set up the runner with n_workers = 2.
runner <- monty_runner_callr(n_workers = 2)
This indicates that we can run 2 chains in parallel. Running 4 chains in our example means that each worker process will run 2 chains in sequence. Ideally, if you have sufficiently many cores available, you will set n_workers equal to the number of chains. Otherwise you typically want n_workers to divide the number of chains evenly - for 4 chains, using 3 workers is unlikely to be much faster than using 2 workers, as one of the workers would still need to run 2 chains in sequence.
We can run chains in parallel by using our runner
set.seed(1)
samples2 <- monty_sample(mixture_model, sampler, 1000, initial = 3, n_chains = 4,
                         runner = runner)
We have set the seed before running in serial and before running in parallel so we can compare the output, and we can see that they produce identical results.
identical(samples, samples2)
#> [1] TRUE
Notice that it has actually taken longer to run in parallel! This is because there is a cost to forking across the cores that outweighs the benefit of running in parallel in this case. If we run longer chains, in serial
set.seed(1)
samples <- monty_sample(mixture_model, sampler, 100000, initial = 3, n_chains = 4)
#> ⡀⠀ Sampling [▂▁▁▁] ■■■ | 6% ETA: 19s
#> ⠄⠀ Sampling [▆▁▁▁] ■■■■■■■ | 21% ETA: 16s
#> ⢂⠀ Sampling [█▄▁▁] ■■■■■■■■■■■■ | 37% ETA: 13s
#> ⡂⠀ Sampling [██▁▁] ■■■■■■■■■■■■■■■■■ | 52% ETA: 9s
#> ⠅⠀ Sampling [██▆▁] ■■■■■■■■■■■■■■■■■■■■■ | 68% ETA: 6s
#> ⢃⠀ Sampling [███▃] ■■■■■■■■■■■■■■■■■■■■■■■■■■ | 83% ETA: 3s
#> ⡃⠀ Sampling [███▇] ■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■ | 99% ETA: 0s
#> ✔ Sampled 4e+05 steps across 4 chains in 19.4s
#>
and then in parallel
runner <- monty_runner_callr(2)
set.seed(1)
samples2 <- monty_sample(mixture_model, sampler, 100000, initial = 3, n_chains = 4,
                         runner = runner)
#> ⡀⠀ Sampling [▃▃▁▁] ■■■■■■ | 16% ETA: 16s
#> ⠄⠀ Sampling [▅▅▁▁] ■■■■■■■■■■■ | 34% ETA: 12s
#> ⢂⠀ Sampling [██▁▁] ■■■■■■■■■■■■■■■■ | 51% ETA: 9s
#> ⡂⠀ Sampling [██▃▄] ■■■■■■■■■■■■■■■■■■■■■■ | 71% ETA: 5s
#> ⠅⠀ Sampling [██▆▆] ■■■■■■■■■■■■■■■■■■■■■■■■■■■■ | 89% ETA: 2s
#> ✔ Sampled 4e+05 steps across 4 chains in 16.4s
#>
we now see that we are running quicker in parallel! In general, the longer each chain takes to run and the fewer chains you need to run per worker, the greater the benefit of running in parallel.
18.2 Using threads to parallelise among particles in the filter
library(odin2)
library(dust2)
Let’s consider the following discrete-time, stochastic SIR model
sir <- odin({
  update(S) <- S - n_SI
  update(I) <- I + n_SI - n_IR
  update(R) <- R + n_IR
  update(incidence) <- incidence + n_SI

  p_SI <- 1 - exp(-beta * I / N * dt)
  p_IR <- 1 - exp(-gamma * dt)
  n_SI <- Binomial(S, p_SI)
  n_IR <- Binomial(I, p_IR)

  initial(S) <- N - I0
  initial(I) <- I0
  initial(R) <- 0
  initial(incidence, zero_every = 1) <- 0

  N <- parameter(1000)
  I0 <- parameter(10)
  beta <- parameter(0.2)
  gamma <- parameter(0.1)

  cases <- data()
  cases ~ Poisson(incidence)
})
Which we will fit to the (synthetic) data set data/incidence.csv.
d <- read.csv("data/incidence.csv")
head(d)
#> time cases
#> 1 1 12
#> 2 2 23
#> 3 3 25
#> 4 4 36
#> 5 5 30
#> 6 6 57
We will set up our packer, prior and sampler
packer <- monty::monty_packer(scalar = c("beta", "gamma"),
                              fixed = list(I0 = 10, N = 1000))

prior <- monty::monty_dsl({
  beta ~ Exponential(mean = 0.3)
  gamma ~ Exponential(mean = 0.1)
})

vcv <- diag(2) * 0.01
sampler <- monty::monty_sampler_random_walk(vcv)
and then we will set up our likelihood using a filter with 200 particles, and run 4 chains without any parallelisation
filter <- dust_filter_create(sir, time_start = 0,
                             data = d, dt = 0.25,
                             n_particles = 200)
likelihood <- dust_likelihood_monty(filter, packer,
                                    save_trajectories = TRUE)
posterior <- likelihood + prior

samples <- monty_sample(posterior, sampler, n_steps = 1000,
                        initial = c(0.3, 0.1),
                        n_chains = 4)
#> ⡀⠀ Sampling [▁▁▁▁] ■■ | 2% ETA: 35s
#> ⠄⠀ Sampling [▃▁▁▁] ■■■■ | 10% ETA: 32s
#> ⢂⠀ Sampling [▆▁▁▁] ■■■■■■■ | 19% ETA: 29s
#> ⡂⠀ Sampling [█▁▁▁] ■■■■■■■■■ | 27% ETA: 26s
#> ⠅⠀ Sampling [█▄▁▁] ■■■■■■■■■■■■ | 36% ETA: 23s
#> ⢃⠀ Sampling [█▆▁▁] ■■■■■■■■■■■■■■ | 44% ETA: 20s
#> ⡃⠀ Sampling [██▁▁] ■■■■■■■■■■■■■■■■■ | 53% ETA: 17s
#> ⠍⠀ Sampling [██▄▁] ■■■■■■■■■■■■■■■■■■■ | 61% ETA: 14s
#> ⢋⠀ Sampling [██▆▁] ■■■■■■■■■■■■■■■■■■■■■■ | 70% ETA: 11s
#> ⡋⠀ Sampling [███▁] ■■■■■■■■■■■■■■■■■■■■■■■■■ | 78% ETA: 8s
#> ⠍⠁ Sampling [███▄] ■■■■■■■■■■■■■■■■■■■■■■■■■■■ | 87% ETA: 5s
#> ⢋⠁ Sampling [███▆] ■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■ | 95% ETA: 2s
#> ✔ Sampled 4000 steps across 4 chains in 35.3s
#>
An input to dust_filter_create that we have not specified is n_threads, which has a default value of 1. Threads can be used in the filter to run particles forward in parallel. Suppose we have two available cores and we want to use these for threads in the filter; then we would set n_threads = 2
filter <- dust_filter_create(sir, time_start = 0,
                             data = d, dt = 0.25,
                             n_particles = 200,
                             n_threads = 2)
likelihood <- dust_likelihood_monty(filter, packer,
                                    save_trajectories = TRUE)
posterior <- likelihood + prior

samples <- monty_sample(posterior, sampler, n_steps = 1000,
                        initial = c(0.3, 0.1),
                        n_chains = 4)
#> ⡀⠀ Sampling [▂▁▁▁] ■■■ | 7% ETA: 18s
#> ⠄⠀ Sampling [▇▁▁▁] ■■■■■■■■ | 22% ETA: 16s
#> ⢂⠀ Sampling [█▄▁▁] ■■■■■■■■■■■■ | 37% ETA: 13s
#> ⡂⠀ Sampling [██▁▁] ■■■■■■■■■■■■■■■■■ | 52% ETA: 10s
#> ⠅⠀ Sampling [██▅▁] ■■■■■■■■■■■■■■■■■■■■■ | 67% ETA: 7s
#> ⢃⠀ Sampling [███▂] ■■■■■■■■■■■■■■■■■■■■■■■■■■ | 82% ETA: 4s
#> ⡃⠀ Sampling [███▇] ■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■ | 97% ETA: 1s
#> ✔ Sampled 4000 steps across 4 chains in 19.9s
#>
and we can see that this is quicker than without parallelisation. As with workers and chains, generally the number of threads should divide the number of particles evenly.
You can use workers in combination with threads. To illustrate how to prioritise workers vs threads, let’s instead use our 2 cores for workers.
filter <- dust_filter_create(sir, time_start = 0,
                             data = d, dt = 0.25,
                             n_particles = 200,
                             n_threads = 1)
likelihood <- dust_likelihood_monty(filter, packer,
                                    save_trajectories = TRUE)
posterior <- likelihood + prior

runner <- monty_runner_callr(n_workers = 2)

samples <- monty_sample(posterior, sampler, n_steps = 1000,
                        initial = c(0.3, 0.1),
                        n_chains = 4,
                        runner = runner)
#> ⡀⠀ Sampling [▂▂▁▁] ■■■■ | 9% ETA: 26s
#> ⠄⠀ Sampling [▄▄▁▁] ■■■■■■■■■ | 26% ETA: 15s
#> ⢂⠀ Sampling [▆▇▁▁] ■■■■■■■■■■■■■■ | 44% ETA: 11s
#> ⡂⠀ Sampling [██▂▁] ■■■■■■■■■■■■■■■■■■ | 56% ETA: 9s
#> ⠅⠀ Sampling [██▃▄] ■■■■■■■■■■■■■■■■■■■■■■ | 71% ETA: 6s
#> ⢃⠀ Sampling [██▆▆] ■■■■■■■■■■■■■■■■■■■■■■■■■■■■ | 89% ETA: 2s
#> ✔ Sampled 4000 steps across 4 chains in 19.4s
#>
We see that this was quicker - this is because there is no communication between workers while the chains run, but there is regular communication between threads while the filter runs. Thus it is better to prioritise workers over threads. Suppose you have 32 cores available and are running 4 chains. Ideally you would set n_workers = 4 in monty_runner_callr to ensure all 4 chains run at the same time, then set n_threads = 8 (n_threads is the number of threads per worker) to use all 32 cores, and choose n_particles to be a multiple of 8.
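As a rough sketch of that 32-core configuration (the names filter32, posterior32, runner32 and samples32 are illustrative, and it reuses the sir model, packer, prior, sampler and data d defined above):
# Hypothetical 32-core set-up: 4 workers x 8 threads per worker
filter32 <- dust_filter_create(sir, time_start = 0,
                               data = d, dt = 0.25,
                               n_particles = 160,  # a multiple of 8
                               n_threads = 8)
posterior32 <- dust_likelihood_monty(filter32, packer,
                                     save_trajectories = TRUE) + prior
runner32 <- monty_runner_callr(n_workers = 4)
samples32 <- monty_sample(posterior32, sampler, n_steps = 1000,
                          initial = c(0.3, 0.1),
                          n_chains = 4,
                          runner = runner32)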
18.3 Using manual scheduling to parallelise between chains
Suppose you have access to an HPC system with multiple nodes of e.g. 32 cores. Efficiently combining workers and threads on a single 32-core node is already beneficial, but manual scheduling with monty enables running chains on different nodes, essentially increasing the number of cores available. For example, instead of running 4 chains with 4 workers on one 32-core node (giving 8 threads per chain/worker), you could run 4 chains on four 32-core nodes, using 32 cores per chain (128 cores in total).
Instead of using monty_sample, first we would need to prepare the chains
monty_sample_manual_prepare(posterior, sampler, n_steps = 1000,
path = "mypath",
initial = c(0.3, 0.1),
n_chains = 4)
where "mypath" is the location for writing outputs to. Cluster jobs are then submitted for each chain, running the chain with
monty_sample_manual_run(1, path = "mypath")
for chain 1,
monty_sample_manual_run(2, path = "mypath")
for chain 2, and so on.
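On a cluster, each of these calls would typically be submitted as its own job; as a local stand-in you could simply loop over the chain ids (a sketch only):
# Run each of the 4 prepared chains in turn; on an HPC system each call
# would instead be a separate job
for (i in 1:4) {
  monty_sample_manual_run(i, path = "mypath")
}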
Once all chains have been run, results from the chains can be collected together
samples <- monty_sample_manual_collect("mypath")