Task queues for R, implemented using Redis.
Create an rrq_controller
object
obj <- rrq_controller2("rrq:readme")
rrq_default_controller_set(obj)
Submit work to the queue:
t <- rrq_task_create_expr(runif(10))
t
#> [1] "0b6ae3ae76e1cfab43a8ad4d41b7c4cd"
Query task process:
rrq_task_status(t)
#> [1] "PENDING"
Run tasks on workers in the background
rrq_worker_spawn2()
#> ℹ Spawning 1 worker with prefix 'pepperish_zebralongwingbutterfly'
#> <rrq_worker_manager>
#> Public:
#> clone: function (deep = FALSE)
#> id: pepperish_zebralongwingbutterfly_1
#> initialize: function (controller, n, logdir = NULL, name_config = "localhost",
#> is_alive: function (worker_id = NULL)
#> kill: function (worker_id = NULL)
#> logs: function (worker_id)
#> stop: function (worker_id = NULL, ...)
#> wait_alive: function (timeout, time_poll = 0.2, progress = NULL)
#> Private:
#> check_worker_id: function (worker_id)
#> controller: rrq_controller2
#> logfile: /tmp/Rtmp3k3m8o/file183821cbeb985/pepperish_zebralongwin ...
#> process: list
#> worker_id_base: pepperish_zebralongwingbutterfly
Wait for tasks to complete
rrq_task_wait(t)
#> [1] TRUE
Retrieve results from a task
rrq_task_result(t)
#> [1] 0.88813000 0.69115977 0.58344801 0.49533313 0.23230984 0.94035381
#> [7] 0.06302391 0.75492762 0.69768060 0.31522964
Query what workers have done
rrq_worker_log_tail(n = Inf)
#> worker_id child time command
#> 1 pepperish_zebralongwingbutterfly_1 NA 1713530454 ALIVE
#> 2 pepperish_zebralongwingbutterfly_1 NA 1713530454 ENVIR
#> 3 pepperish_zebralongwingbutterfly_1 NA 1713530454 QUEUE
#> 4 pepperish_zebralongwingbutterfly_1 NA 1713530454 TASK_START
#> 5 pepperish_zebralongwingbutterfly_1 NA 1713530454 TASK_COMPLETE
#> 6 pepperish_zebralongwingbutterfly_1 NA 1713530454 TASK_START
#> 7 pepperish_zebralongwingbutterfly_1 NA 1713530454 TASK_COMPLETE
#> message
#> 1
#> 2 new
#> 3 default
#> 4 a7f89f9e729d693de669bb3eca5f6710
#> 5 a7f89f9e729d693de669bb3eca5f6710
#> 6 0b6ae3ae76e1cfab43a8ad4d41b7c4cd
#> 7 0b6ae3ae76e1cfab43a8ad4d41b7c4cd
For more information, see vignette("rrq")
Install from the mrc-ide package repository:
drat:::add("mrc-ide")
install.packages("rrq")
Alternatively, install with remotes
:
remotes::install_github("mrc-ide/rrq", upgrade = FALSE)
To test, we need a redis server that can be automatically connected to using the redux
defaults. This is satisfied if you have an unauthenticated redis server running on localhost, otherwise you should update the environment variable REDIS_URL
to point at a redis server. Do not use a production server, as the package will create and delete a lot of keys.
A suitable redis server can be started using docker with
./scripts/redis start
(and stopped with ./scripts/redis stop
)