This vignette assumes some familiarity with the dust approach to random numbers, though we provide pointers below to the relevant documentation for further discussion.
The random number generator in dust
is well suited to
parallel calculations. On a single node we can set up a generator with
many streams and then parallelise the calculation using OpenMP. The
number of streams is either one per core or, typically better, one per
logical unit of calculation (which may be the maximum number of threads,
for example one stream per loop iteration). This is the approach taken
by dust objects (vignette("dust")
) and described for
standalone use of the generators
(vignette("rng_package")
).
When parallelising, you may also have separate ‘nodes’, which could be separate processors, physical machines, or just processes on the same machine. The difference between threads is that nodes will not share any memory with each other during processing.
Some additional care is required to use multiple nodes for the
calculation so their streams do not overlap (“distributed parallel
computation”), and this vignette explains the problems and solutions. It
applies to both use with dust
objects and for standalone
use of the random number generators.
To seed multiple streams we begin by seeding the first generator. Practically how this happens can be ignored for now.
r <- dust::dust_rng$new(seed = 1)
r$state()
#> [1] c1 5c 02 89 ec 2d 0a 91 1e 61 39 74 08 ab 41 5e c3 86 8d 6d f4 02 8a b1 56
#> [26] 49 ee d9 dd 95 81 e2
To seed the next stream we “jump ahead” in the stream. The generators
provide pre-computed coefficients that jump the state of the generator
to a state equal to drawing a very large number of random numbers (or
equivalently steps of the Markov chains). For the default generator
(xoshiro256+
) this is 2^128 draws, equal to the square root
of the period of the generator (2^{256}1).
r$jump()
r$state()
#> [1] 15 6e 10 9b 13 4b 6b 88 3a 50 b0 f1 ef 99 11 5c 6f 68 1c 0f e8 97 17 40 da
#> [26] c7 c6 2e 27 b7 b7 5d
Constructing a generator with more than one stream does this automatically:
r2 <- dust::dust_rng$new(seed = 1, n_streams = 4)
matrix(r2$state(), 32)
#> [,1] [,2] [,3] [,4]
#> [1,] c1 15 56 f9
#> [2,] 5c 6e fc 32
#> [3,] 02 10 6d 84
#> [4,] 89 9b df e6
#> [5,] ec 13 7b 30
#> [6,] 2d 4b 10 34
#> [7,] 0a 6b 6e d2
#> [8,] 91 88 ed 38
#> [9,] 1e 3a cf f3
#> [10,] 61 50 0f f2
#> [11,] 39 b0 57 89
#> [12,] 74 f1 0e 21
#> [13,] 08 ef 3d 72
#> [14,] ab 99 3b fd
#> [15,] 41 11 a2 53
#> [16,] 5e 5c 45 2b
#> [17,] c3 6f 87 b5
#> [18,] 86 68 5d 92
#> [19,] 8d 1c 4b 2c
#> [20,] 6d 0f 86 8f
#> [21,] f4 e8 44 25
#> [22,] 02 97 9a 9f
#> [23,] 8a 17 7d d8
#> [24,] b1 40 18 ae
#> [25,] 56 da f8 c6
#> [26,] 49 c7 d0 89
#> [27,] ee c6 ce 79
#> [28,] d9 2e 93 ef
#> [29,] dd 27 a8 b0
#> [30,] 95 b7 63 42
#> [31,] 81 b7 dd ec
#> [32,] e2 5d 41 9f
Note here how the first 32 bytes (the first column) are the same as
the initial state of the generator r
, the second set are
equal to the state after jumping once. The third is 2 * 2^128 steps
ahead and the fourth is 3 * 2^128 steps ahead.
We also have coefficients for a “long jump”, which is equivalent to 2^192 steps of the generator (generator period^{1}). The point of the original jump is that individual draws of random numbers will never overlap; the point of the long jumps is that each long jump is so far that we’ll never reach the next one by normal jumping (there are 2^64 ~= 10^19 jumps within a single long jump).
The basic idea using the above primitives is that we could take a set of “long jumps” to create a set of initial seeds, pass those to each node that will do calculation, then on each node take a set of jumps to create a locally parallel random number generator.
The function dust::dust_rng_distributed_state
creates a
list of random number states that are suitable for use on a series of
nodes:
dust::dust_rng_distributed_state(seed = 1, n_streams = 1, n_nodes = 2L)
#> [[1]]
#> [1] c1 5c 02 89 ec 2d 0a 91 1e 61 39 74 08 ab 41 5e c3 86 8d 6d f4 02 8a b1 56
#> [26] 49 ee d9 dd 95 81 e2
#>
#> [[2]]
#> [1] 77 69 c2 44 65 7b dc 93 49 c7 93 ef dd 06 fc 47 cf af f3 d0 80 97 01 68 45
#> [26] a2 90 f4 38 dd 4d 0a
Each element of this list can be passed through to to
dust::dust_rng_pointer$new
or to create
dust::dust_generator
objects. It will depend on your
application how you do this of course.
We also include the convenience function
dust::dust_rng_distributed_pointer
which returns a list of
pointer objects
dust::dust_rng_distributed_pointer(seed = 1, n_streams = 1, n_nodes = 2L)
#> [[1]]
#> <dust_rng_pointer>
#> Public:
#> algorithm: xoshiro256plus
#> initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#> is_current: function ()
#> n_streams: 1
#> state: function ()
#> sync: function ()
#> Private:
#> is_current_: TRUE
#> ptr_: externalptr
#> state_: c1 5c 02 89 ec 2d 0a 91 1e 61 39 74 08 ab 41 5e c3 86 8d ...
#>
#> [[2]]
#> <dust_rng_pointer>
#> Public:
#> algorithm: xoshiro256plus
#> initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#> is_current: function ()
#> n_streams: 1
#> state: function ()
#> sync: function ()
#> Private:
#> is_current_: TRUE
#> ptr_: externalptr
#> state_: 77 69 c2 44 65 7b dc 93 49 c7 93 ef dd 06 fc 47 cf af f3 ...
We will illustrate the idea first with the pi estimation algorithm
from vignette("rng_package")
and to start we will ignore
the within-host parallelism (imagine we’re trying to distribute the
calculation over a set of single core jobs). Because we need our code
available on multiple nodes we can’t use cpp11::source_cpp
and will have to create a package, which will be called
piparallel
as before.
As a reminder, to use our pi estimation code on a single node we
first create a dust_rng_pointer
object
rng <- dust:::dust_rng_pointer$new()
We can call the function like so:
piparallel::pi_dust_parallel(1e6, rng, 1)
#> [1] 3.142492
To do this in parallel, we use the above functions to create a list of pointers:
ptrs <- dust::dust_rng_distributed_pointer(seed = 1, n_streams = 1,
n_nodes = 4L)
Next, we need a parallel cluster; we’ll use the parallel
package for this and create a cluster of two nodes (this could be
increased of course). This might be done via a cluster scheduler or the
more sophisticated tools in the future
package.
cl <- parallel::makeCluster(2, "PSOCK")
We’ve installed our package in lib
so can make sure
that’s available on each node:
parallel::clusterCall(cl, loadNamespace, "piparallel", lib)
#> [[1]]
#> <environment: namespace:piparallel>
#>
#> [[2]]
#> <environment: namespace:piparallel>
Then we can call our pi estimation function. We take as arguments the number of iterations, a pointer to random state and the number of threads to use.
ans <- parallel::clusterApply(cl, ptrs, function(r)
piparallel::pi_dust_parallel(1e6, r, 1))
ans
#> [[1]]
#> [1] 3.144028
#>
#> [[2]]
#> [1] 3.141628
#>
#> [[3]]
#> [1] 3.141232
#>
#> [[4]]
#> [1] 3.141676
We now have four estimates of pi which are independent and can be safely averaged:
These are the same numbers we would have found had we run things in series locally:
ptrs_local <- dust::dust_rng_distributed_pointer(seed = 1, n_streams = 1,
n_nodes = 4L)
ans_local <- lapply(ptrs_local, function(r)
piparallel::pi_dust_parallel(1e6, r, 1))
ans_local
#> [[1]]
#> [1] 3.144028
#>
#> [[2]]
#> [1] 3.141628
#>
#> [[3]]
#> [1] 3.141232
#>
#> [[4]]
#> [1] 3.141676
If we’d run the clusterApply
over a different number of
nodes, the calculation would also be unchanged.
The same approach works where we want each node to run in parallel. So we might want to distribute calculations over 4 nodes which each have 8 cores say. Then we’d configure our pointers to have more streams (here using 32 streams as that might represent our upper bound of per-node compute capacity)
ptrs <- dust::dust_rng_distributed_pointer(seed = 1, n_streams = 32,
n_nodes = 4L)
The state vector for each pointer is now considerably longer:
length(ptrs[[1]]$state())
#> [1] 1024
ans <- lapply(ptrs, function(r)
piparallel::pi_dust_parallel(1e6, r, 8))
ans
#> [[1]]
#> [1] 3.142302
#>
#> [[2]]
#> [1] 3.141256
#>
#> [[3]]
#> [1] 3.141828
#>
#> [[4]]
#> [1] 3.140525
As before, this gives us four answers (two per node) but each node ran 2 * 32 * 1e6 random draws, spread over 8 threads in two serial jobs.
With this set-up we can change the number of nodes and number of threads without affecting the calculation, but spread the work out over the compute that we have available to us.
The above set-up assumes that we want to establish our streams, do some work with them, then throw them away. That is, we never want to run calculations on the same nodes again. To sensibly we have several choices:
Take another set of long jumps to jump over all partial
series used and continue from there. Possibly the simplest
solution; if we have set up initial states for n
nodes and
we have our initial seed we can simply take n
long jumps to
move away from our initial sequence, then configure seeds using the next
n
states. This can be repeated without practical limit. It
has the advantage that no reverse communication needs to happen about
the random number seed - that is, the worker nodes never need to tell
our main node where they got to in the sequence.
Report back the final state reached by each node. A more involved solution, but possibly more satisfying, involves each node sending back its final state at the end of sampling (along with whatever calculations were being performed). We then pass this state back to the nodes when continuing so that all calculations are taken from an unbroken set of streams.
pi_estimate_continue <- function(n, ptr, n_threads) {
value <- piparallel::pi_dust_parallel(n, ptr, n_threads)
ptr$sync()
list(value = value,
ptr = ptr)
}
parallel::clusterExport(cl, "pi_estimate_continue")
ptrs <- dust::dust_rng_distributed_pointer(seed = 1, n_streams = 1,
n_nodes = 4L)
ans <- parallel::clusterApply(cl, ptrs, function(r)
pi_estimate_continue(1e6, r, 1))
ans
#> [[1]]
#> [[1]]$value
#> [1] 3.144028
#>
#> [[1]]$ptr
#> <dust_rng_pointer>
#> Public:
#> algorithm: xoshiro256plus
#> initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#> is_current: function ()
#> n_streams: 1
#> state: function ()
#> sync: function ()
#> Private:
#> is_current_: TRUE
#> ptr_: externalptr
#> state_: 3c 9a e0 2e ac 65 52 0d b4 be 20 c8 cd c5 9e 27 82 14 7e ...
#>
#>
#> [[2]]
#> [[2]]$value
#> [1] 3.141628
#>
#> [[2]]$ptr
#> <dust_rng_pointer>
#> Public:
#> algorithm: xoshiro256plus
#> initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#> is_current: function ()
#> n_streams: 1
#> state: function ()
#> sync: function ()
#> Private:
#> is_current_: TRUE
#> ptr_: externalptr
#> state_: 2e 3d 0e 9c fb a7 d3 08 f3 32 89 25 c1 38 b8 aa 95 0b 65 ...
#>
#>
#> [[3]]
#> [[3]]$value
#> [1] 3.141232
#>
#> [[3]]$ptr
#> <dust_rng_pointer>
#> Public:
#> algorithm: xoshiro256plus
#> initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#> is_current: function ()
#> n_streams: 1
#> state: function ()
#> sync: function ()
#> Private:
#> is_current_: TRUE
#> ptr_: externalptr
#> state_: 7f b4 7d 26 a0 13 11 24 20 95 93 02 4b 3d 16 9d ba cf a2 ...
#>
#>
#> [[4]]
#> [[4]]$value
#> [1] 3.141676
#>
#> [[4]]$ptr
#> <dust_rng_pointer>
#> Public:
#> algorithm: xoshiro256plus
#> initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#> is_current: function ()
#> n_streams: 1
#> state: function ()
#> sync: function ()
#> Private:
#> is_current_: TRUE
#> ptr_: externalptr
#> state_: a0 46 22 8e b9 e5 e8 fe 37 99 06 e4 18 2d 13 f3 25 33 fb ...
We can again assemble our answer, which agrees with above
The state of these pointers now also agrees with above;
ptrs_local[[1]]$state()
#> [1] 3c 9a e0 2e ac 65 52 0d b4 be 20 c8 cd c5 9e 27 82 14 7e 79 c2 8f fb 47 4c
#> [26] 00 da 0f 97 50 f7 f6
ans[[1]]$ptr$state()
#> [1] 3c 9a e0 2e ac 65 52 0d b4 be 20 c8 cd c5 9e 27 82 14 7e 79 c2 8f fb 47 4c
#> [26] 00 da 0f 97 50 f7 f6
This approach could be useful where the calculation is to be continued (for example iterating until some convergence criteria is met). The additional work of synchronising and returning the pointer adds complexity though.
Ideally the calculations will not depend on number of nodes used, so
you should take the same approach as described in
vignette("rng_package")
and try and identify the
“parallelisable” component (which might be larger than the number of
nodes) and parallelise based on that.
For example, suppose we want to run a multi-chain MCMC simulation.
Parallelising across chains is an obvious between-node target. We could
then send n
chains over m
nodes
(m <= n
) and we’d want to arrange our seeds so that we
long-jump for each chain not over each node as that way no
matter how many nodes we had available we’d get the same results.
Here we show a more complete, and less contrived, use case with
dust’s “volatility” model. This example picks up from
vignette("data")
and the reader is directed there for a
fuller explanation.
volatility <- dust::dust_example("volatility")
The model was written to fit to a time series:
The model can run in parallel on a single node to produce a likelihood estimate given parameters, we’ll write a small function to do this:
run_filter <- function(pars, mod) {
if (any(pars < 0)) {
return(-Inf)
}
pars <- list(alpha = pars[[1]], sigma = pars[[2]])
mod$update_state(pars = pars, time = 0)
mod$filter()$log_likelihood
}
We assume that our parameter vector here is a length-2 numeric vector
with values alpha
and sigma
and ensure that
these are positive by returning -Inf
if invalid values are
given.
We can create an instance of the model, set the data, and run the
filter (for details see vignette("data")
)
n_particles <- 128
n_threads <- 4
mod <- volatility$new(list(), 0, n_particles, n_threads = n_threads)
mod$set_data(dust::dust_data(data))
#> NULL
run_filter(c(0.91, 1), mod)
#> [1] -179.0706
We could also write a very simple MCMC using the Metropolis-Hastings algorithm:
mcmc <- function(mod, p, n_times, proposal_sd = 0.02) {
ll <- run_filter(p, mod)
ret <- matrix(NA_real_, n_times + 1, length(p) + 1)
ret[1, ] <- c(p, ll)
for (i in seq_len(n_times)) {
p_new <- rnorm(length(p), p, proposal_sd)
ll_new <- run_filter(p_new, mod)
if (ll_new > ll || runif(1) < exp(ll_new - ll)) {
ll <- ll_new
p <- p_new
}
ret[i + 1, ] <- c(p, ll)
}
ret
}
We can run this for a number of steps and collect up sampled parameters and their likelihoods
ans <- mcmc(mod, c(0.91, 1), 20)
(We are cutting a lot of corners with the inference here; we have not specified any priors and are assuming that the distribution can be integrated over the parameters, and our proposal mechanism is extremely simple supporting only orthogonal proposals of the two parameters which are certainly correlated. However, the basic features are shared with any more sophisticated approach.)
Our aim here is now to carry this out in parallel, where we run a chain per node. To do this we need to do the same basic steps as above. If we want to run 8 chains we’ll need 8 seeds, even if we run on fewer nodes:
seed <- dust::dust_rng_distributed_state(n_nodes = 8L, algorithm = volatility)
It will also be useful to write a little wrapper function that repeats the setup from above, then runs the MCMC:
run_mcmc <- function(seed, data, ...) {
volatility <- dust::dust_example("volatility")
mod <- volatility$new(list(), 0, 128, seed = seed, n_threads = 2)
mod$set_data(dust::dust_data(data))
mcmc(mod, ...)
}
parallel::clusterExport(cl, c("mcmc", "run_mcmc", "run_filter"))
ans <- parallel::clusterApply(cl, seed, run_mcmc, data, c(0.91, 1), 20)
This produces a list of samples from each of the 8 chains, run over 2
nodes, each of which ran using 2 threads. We can scale any of this
parallelism (increasing threads until we hit the total number of
particles, increasing nodes until we hit the number of chains) but the
results will be deterministic (except for the call to runif
that we use here for the acceptance test within the MCMC).
Note that in creating the seed above we left n_streams
as the default (1) because the dust model constructor will take care of
expanding out per-particle seeds via a series of jumps.
You should be careful with these approaches to not exceed the compute available to you, especially when using a multi-user system such as an HPC.
[^1] It’s hard to get a sense of these numbers but 2^128
is about 3 x 10^38
draws. If we draw a billion numbers
(10^9) a second (1ns per draw is the rate on a reasonable CPU) we can
draw 3 x 10^16 numbers a year, so at this rate it would take this would
take 10^22 years to reach the next stream. In contrast, the universe is
14 billion years old (1.4 x 10^10 years).
3/4↩︎