Create a bulk set of tasks based on applying a function over a vector or data.frame. This is the bulk equivalent of rrq_task_create_call, in the same way that rrq_task_create_bulk_expr is a bulk version of rrq_task_create_expr.
Usage
rrq_task_create_bulk_call(
  fn,
  data,
  args = NULL,
  queue = NULL,
  separate_process = FALSE,
  timeout_task_run = NULL,
  depends_on = NULL,
  controller = NULL
)
Arguments
- fn
The function to call
- data
The data to apply the function over. This can be a vector or list, in which case we act like lapply and apply fn to each element in turn. Alternatively, this can be a data.frame, in which case each row is taken as a set of arguments to fn. Note that if data is a data.frame then all arguments to fn are named.
- args
Additional arguments to fn, shared across all calls. These must be named. If you are using a data.frame for data, you'd probably be better off adding additional columns that don't vary across rows, but the end result is the same.
- queue
The queue to add the tasks to; if not specified the "default" queue (which all workers listen to) will be used. If you have configured workers to listen to more than one queue you can specify that here. Be warned that if you push jobs onto a queue with no worker, they will stay queued forever.
- separate_process
Logical, indicating if the task should be run in a separate process on the worker. If TRUE, then the worker runs the task in a separate process using the callr package. This means that the worker environment is completely clean, so subsequent runs are not affected by preceding ones. The downside of this approach is a considerable overhead in starting the external process and transferring data back.
- timeout_task_run
Optionally, a maximum allowed running time, in seconds. This parameter only has an effect if separate_process is TRUE. If given, then if the task takes longer than this time it will be stopped and the task status set to TIMEOUT.
- depends_on
Vector or list of IDs of tasks which must have completed before this job can be run. Once all of the tasks it depends on have run successfully, this task will get added to the queue. If one of those tasks fails then this task will be removed from the queue.
- controller
The controller to use. If not given (or NULL) we'll use the controller registered with rrq_default_controller_set().
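The way data and args combine into individual calls can be sketched locally in base R. This is only an illustration of the semantics described above, not rrq's implementation: bulk_call_local is a hypothetical helper that runs each call immediately instead of queuing a task, and it assumes that for vector or list input each element is passed as the first argument to fn (as lapply would).

```r
# Hypothetical local analogue of the bulk call semantics (not part of rrq).
# It evaluates the calls straight away rather than creating queued tasks.
bulk_call_local <- function(fn, data, args = NULL) {
  if (is.data.frame(data)) {
    # One call per row; column names become the argument names.
    lapply(seq_len(nrow(data)), function(i) {
      row_args <- lapply(data, function(col) col[[i]])
      do.call(fn, c(row_args, args))
    })
  } else {
    # One call per element; the element is the first (unnamed) argument.
    lapply(data, function(x) do.call(fn, c(list(x), args)))
  }
}

# Illustrative function and data, made up for this sketch.
add <- function(x, y, z = 0) x + y + z
d <- data.frame(x = c(1, 2, 3), y = c(10, 20, 30))

# data.frame input: three calls, each with x, y from a row plus shared z.
by_row <- bulk_call_local(add, d, args = list(z = 100))

# Vector input: three calls, one per element.
by_elt <- bulk_call_local(sqrt, c(1, 4, 9))
```

Here by_row has one result per row of d and by_elt has one result per element of the vector, which mirrors how the number of tasks created relates to data.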
Value
A vector of task identifiers. If data is a data.frame, this has one element per row; otherwise it has the same length as data.
Examples
obj <- rrq_controller("rrq:example")
d <- data.frame(n = 1:10, lambda = rgamma(10, 5))
ts <- rrq_task_create_bulk_call(rpois, d, controller = obj)
rrq_task_wait(ts, controller = obj)
#> [1] TRUE
rrq_task_results(ts, controller = obj)
#> [[1]]
#> [1] 1
#>
#> [[2]]
#> [1] 5 7
#>
#> [[3]]
#> [1] 3 3 4
#>
#> [[4]]
#> [1] 4 10 7 10
#>
#> [[5]]
#> [1] 7 5 6 3 4
#>
#> [[6]]
#> [1] 4 2 6 2 7 6
#>
#> [[7]]
#> [1] 3 2 2 3 6 2 5
#>
#> [[8]]
#> [1] 1 1 3 5 4 1 4 4
#>
#> [[9]]
#> [1] 3 3 4 4 1 5 2 0 4
#>
#> [[10]]
#> [1] 2 3 3 0 1 2 1 3 3 4
#>