18  Parallelisation

Both dust2 and monty have built-in support that makes running in parallel relatively simple when you have multiple cores available. There are two main ways to parallelise:

  1. Using workers to parallelise between chains. This is useful to anyone running multiple chains for a sampler with monty irrespective of whether your model is deterministic or stochastic, written in odin2 or not.
  2. Using threads to parallelise between particles in a filter. is useful to anyone who has a stochastic model written in odin2 and is using the filter to estimate the marginal likelihood. Using threads cans can be done in combination with using workers.

The parallelisation is implemented in such a way that ensures that results will be identical to running in serial.

There is a third way to parallelise via manual scheduling of chains, which is of use if you want to distribute chains over, for example, the nodes of an HPC system.

We illustrate how to implement the three ways to parallelise below.

18.1 Using workers to parallelise between chains

library(monty)

Let’s revisit the simple Gaussian mixture model example from Section 10.1, and sample 4 chains of length 1000.

fn <- function(l, p, m1, m2) {
  log(p * dnorm(l, mean = m1) + (1 - p) * dnorm(l, mean = m2))
}

mixture_model <- monty_model_function(fn,
                                      fixed = list(p = 0.75, m1 = 3, m2 = 7),
                                      allow_multiple_parameters = TRUE)

sampler <- monty_sampler_random_walk(matrix(2))

set.seed(1)
samples <- monty_sample(mixture_model, sampler, 1000, initial = 3, n_chains = 4)
#> ⡀⠀ Sampling [▁▁▁▁] ■                                |   0% ETA: 28s
#> ✔ Sampled 4000 steps across 4 chains in 231ms
#> 

One of the inputs to monty_sample we have not specified is runner. The runner determines how the chains are run, and these can be setup with functions in monty. If runner is not specified then as a default the samples will be run with the simplest runner which is constructed with monty_runner_serial. With this runner, chains are run in series (one after another).

However if we have multiple cores available, we may want to run chains in parallel using workers. In this case we can construct a runner with monty_runner_callr. This runner uses the callr package to distribute your chains over a number of worker processes on the same machine.

Suppose we have 2 cores, then we will setup the runner setting n_workers = 2.

runner <- monty_runner_callr(n_workers = 2)

This indicates that we can run 2 chains in parallel. Running 4 chains in our example will mean that each worker process will run 2 chains in sequence. Ideally if you have sufficiently many cores available, you will set n_workers equal to the number of chains. Otherwise typically you want n_workers to be an even divisor of the number of chains - for 4 chains using 3 workers is not likely to be much faster than using 2 workers, as one of the workers would still need to run 2 chains in sequence.

We can run chains in parallel by using our runner

set.seed(1)
samples2 <- monty_sample(mixture_model, sampler, 1000, initial = 3, n_chains = 4,
                         runner = runner)

We have set the seed before running in serial and before running in parallel so we can compare the output, and we can see that they produce identical results.

identical(samples, samples2)
#> [1] TRUE

Notice that it has actually taken longer to run in parallel! This is because there is a cost to forking across the cores that outweighs the benefit of running in parallel in this case. If we run longer chains, in serial

set.seed(1)
samples <- monty_sample(mixture_model, sampler, 100000, initial = 3, n_chains = 4)
#> ⡀⠀ Sampling [▂▁▁▁] ■■■                              |   6% ETA: 19s
#> ⠄⠀ Sampling [▆▁▁▁] ■■■■■■■                          |  21% ETA: 16s
#> ⢂⠀ Sampling [█▃▁▁] ■■■■■■■■■■■■                     |  36% ETA: 13s
#> ⡂⠀ Sampling [██▁▁] ■■■■■■■■■■■■■■■■                 |  51% ETA: 10s
#> ⠅⠀ Sampling [██▅▁] ■■■■■■■■■■■■■■■■■■■■■            |  66% ETA:  7s
#> ⢃⠀ Sampling [███▂] ■■■■■■■■■■■■■■■■■■■■■■■■■        |  81% ETA:  4s
#> ⡃⠀ Sampling [███▆] ■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■   |  96% ETA:  1s
#> ✔ Sampled 4e+05 steps across 4 chains in 20s
#> 

and then in parallel

runner <- monty_runner_callr(2)
set.seed(1)
samples2 <- monty_sample(mixture_model, sampler, 100000, initial = 3, n_chains = 4,
                         runner = runner)
#> ⡀⠀ Sampling [▂▂▁▁] ■■■■                             |  11% ETA: 19s
#> ⠄⠀ Sampling [▅▅▁▁] ■■■■■■■■■■                       |  30% ETA: 12s
#> ⢂⠀ Sampling [▇█▁▁] ■■■■■■■■■■■■■■■■                 |  49% ETA:  9s
#> ⡂⠀ Sampling [██▃▃] ■■■■■■■■■■■■■■■■■■■■■            |  65% ETA:  6s
#> ⠅⠀ Sampling [██▅▅] ■■■■■■■■■■■■■■■■■■■■■■■■■■       |  84% ETA:  3s
#> ✔ Sampled 4e+05 steps across 4 chains in 16.7s
#> 

we now see that we are running quicker in parallel! In general, the longer each chain takes to run and the fewer chains you need to run per worker, the greater the benefit of running in parallel.

18.2 Using threads to parallelise among particles in the filter

library(odin2)
library(dust2)

Let’s consider the following discrete-time, stochastic SIR model

sir <- odin({
  update(S) <- S - n_SI
  update(I) <- I + n_SI - n_IR
  update(R) <- R + n_IR
  update(incidence) <- incidence + n_SI
  
  p_SI <- 1 - exp(-beta * I / N * dt)
  p_IR <- 1 - exp(-gamma * dt)
  n_SI <- Binomial(S, p_SI)
  n_IR <- Binomial(I, p_IR)
  
  initial(S) <- N - I0
  initial(I) <- I0
  initial(R) <- 0
  initial(incidence, zero_every = 1) <- 0
  
  N <- parameter(1000)
  I0 <- parameter(10)
  beta <- parameter(0.2)
  gamma <- parameter(0.1)
  
  cases <- data()
  cases ~ Poisson(incidence)
})

Which we will fit to the (synthetic) data set data/incidence.csv.

d <- read.csv("data/incidence.csv")
head(d)
#>   time cases
#> 1    1    12
#> 2    2    23
#> 3    3    25
#> 4    4    36
#> 5    5    30
#> 6    6    57

We will setup our packer, prior and sampler

packer <- monty::monty_packer(scalar = c("beta", "gamma"),
                              fixed = list(I0 = 10, N = 1000))

prior <- monty::monty_dsl({
  beta ~ Exponential(mean = 0.3)
  gamma ~ Exponential(mean = 0.1)
})

vcv <- diag(2) * 0.01
sampler <- monty::monty_sampler_random_walk(vcv)

and then we will setup our likelihood using a filter with 100 particles and then run 4 chains without any parallelisation

filter <- dust_filter_create(sir, time_start = 0,
                             data = d, dt = 0.25,
                             n_particles = 200)

likelihood <- dust_likelihood_monty(filter, packer,
                                    save_trajectories = TRUE)

posterior <- likelihood + prior

samples <- monty_sample(posterior, sampler, n_steps = 1000,
                        initial = c(0.3, 0.1),
                        n_chains = 4)
#> ⡀⠀ Sampling [▁▁▁▁] ■                                |   0% ETA:  1m
#> ⠄⠀ Sampling [▃▁▁▁] ■■■                              |   8% ETA: 32s
#> ⢂⠀ Sampling [▅▁▁▁] ■■■■■■                           |  17% ETA: 29s
#> ⡂⠀ Sampling [█▁▁▁] ■■■■■■■■■                        |  25% ETA: 26s
#> ⠅⠀ Sampling [█▃▁▁] ■■■■■■■■■■■                      |  34% ETA: 23s
#> ⢃⠀ Sampling [█▅▁▁] ■■■■■■■■■■■■■■                   |  43% ETA: 20s
#> ⡃⠀ Sampling [██▁▁] ■■■■■■■■■■■■■■■■                 |  51% ETA: 17s
#> ⠍⠀ Sampling [██▃▁] ■■■■■■■■■■■■■■■■■■■              |  60% ETA: 14s
#> ⢋⠀ Sampling [██▆▁] ■■■■■■■■■■■■■■■■■■■■■■           |  68% ETA: 11s
#> ⡋⠀ Sampling [███▁] ■■■■■■■■■■■■■■■■■■■■■■■■         |  77% ETA:  8s
#> ⠍⠁ Sampling [███▃] ■■■■■■■■■■■■■■■■■■■■■■■■■■■      |  86% ETA:  5s
#> ⢋⠁ Sampling [███▆] ■■■■■■■■■■■■■■■■■■■■■■■■■■■■■    |  94% ETA:  2s
#> ✔ Sampled 4000 steps across 4 chains in 34.7s
#> 

An input to dust_filter_create that we have not specified is n_threads, which has a default value of 1. Threads can be used in the filter to run particles forward in parallel. Suppose we have two available cores and we want to use these for threads in the filter, then we would set n_threads = 2

filter <- dust_filter_create(sir, time_start = 0,
                             data = d, dt = 0.25,
                             n_particles = 200, 
                             n_threads = 2)

likelihood <- dust_likelihood_monty(filter, packer,
                                    save_trajectories = TRUE)

posterior <- likelihood + prior

samples <- monty_sample(posterior, sampler, n_steps = 1000,
                        initial = c(0.3, 0.1),
                        n_chains = 4)
#> ⡀⠀ Sampling [▂▁▁▁] ■■■                              |   5% ETA: 18s
#> ⠄⠀ Sampling [▆▁▁▁] ■■■■■■■                          |  21% ETA: 15s
#> ⢂⠀ Sampling [█▄▁▁] ■■■■■■■■■■■■                     |  36% ETA: 12s
#> ⡂⠀ Sampling [██▁▁] ■■■■■■■■■■■■■■■■                 |  51% ETA:  9s
#> ⠅⠀ Sampling [██▅▁] ■■■■■■■■■■■■■■■■■■■■■            |  67% ETA:  6s
#> ⢃⠀ Sampling [███▃] ■■■■■■■■■■■■■■■■■■■■■■■■■■       |  82% ETA:  3s
#> ⡃⠀ Sampling [███▇] ■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■   |  98% ETA:  0s
#> ✔ Sampled 4000 steps across 4 chains in 19.4s
#> 

and we can see this is quicker than without parallelisation. As with workers and chains, generally the number of threads should be an even divisor of the number of particles.

You can use workers in combination with threads. To illustrate how to prioritise workers vs threads, let’s instead use our 2 cores for workers.

filter <- dust_filter_create(sir, time_start = 0,
                             data = d, dt = 0.25,
                             n_particles = 200, 
                             n_threads = 1)

likelihood <- dust_likelihood_monty(filter, packer,
                                    save_trajectories = TRUE)

posterior <- likelihood + prior

runner <- monty_runner_callr(n_workers = 2)

samples <- monty_sample(posterior, sampler, n_steps = 1000,
                        initial = c(0.3, 0.1),
                        n_chains = 4,
                        runner = runner)
#> ⡀⠀ Sampling [▂▂▁▁] ■■■■                             |   9% ETA: 26s
#> ⠄⠀ Sampling [▄▄▁▁] ■■■■■■■■■                        |  26% ETA: 16s
#> ⢂⠀ Sampling [▆▆▁▁] ■■■■■■■■■■■■■■                   |  42% ETA: 11s
#> ⡂⠀ Sampling [██▁▁] ■■■■■■■■■■■■■■■■■                |  55% ETA: 10s
#> ⠅⠀ Sampling [██▄▄] ■■■■■■■■■■■■■■■■■■■■■■■          |  72% ETA:  6s
#> ⢃⠀ Sampling [██▆▆] ■■■■■■■■■■■■■■■■■■■■■■■■■■■■     |  90% ETA:  2s
#> ✔ Sampled 4000 steps across 4 chains in 19.4s
#> 

We see that this was quicker - this is because there is no communication between workers while the chains run, but there is regular communication between threads during while the filter runs. Thus it is better to prioritise workers over threads. Suppose you have 32 cores available and are running 4 chains. Ideally you would set n_workers = 4 in monty_runner_callr to ensure all 4 chains run at the same time, and then set n_threads = 8 (n_threads represents the number of threads per worker) to use all 32 cores, and choose n_particles to be a multiple of 8.

18.3 Using manual scheduling to parallelise between chains

Suppose you have access to an HPC system with multiple nodes of e.g. 32 cores. Efficient combination of workers and threads on a single 32-core node is already beneficial. Manual scheduling with monty enables running chains on different nodes, essentially increasing the number of cores available. For example, instead of running 4 chains with 4 workers on a 32-core node resulting in 8 threads per chain/worker, you could run 4 chains on 4 32-core nodes using 32 cores per chain (using 128 cores in total).

Instead of using monty_sample, first we would need to prepare the chains

monty_sample_manual_prepare(posterior, sampler, n_steps = 1000,
                            path = "mypath",
                            initial = c(0.3, 0.1),
                            n_chains = 4)

where here "mypath" is the location for writing outputs to. Cluster jobs are submitted for each chain, running the chain with

monty_sample_manual_run(1, path = "mypath")

for chain 1,

monty_sample_manual_run(2, path = "mypath")

for chain 2, and so on.

Once all chains have been run, results from the chains can be collected together

samples <- monty_sample_manual_collect("mypath")