Create a bulk set of tasks based on applying a function over a vector or data.frame. This is the bulk equivalent of rrq_task_create_call, in the same way that rrq_task_create_bulk_expr is a bulk version of rrq_task_create_expr.

Usage

rrq_task_create_bulk_call(
  fn,
  data,
  args = NULL,
  queue = NULL,
  separate_process = FALSE,
  timeout_task_run = NULL,
  depends_on = NULL,
  controller = NULL
)

Arguments

fn

The function to call

data

The data to apply the function over. This can be a vector or list, in which case we act like lapply and apply fn to each element in turn. Alternatively, this can be a data.frame, in which case each row is taken as a set of arguments to fn. Note that if data is a data.frame then all arguments to fn are named.
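As a rough local analogue of the two modes described above, the following sketch uses only base R; it does not queue any tasks or touch rrq. The function `fn` and the inputs `x` and `d` are hypothetical and chosen only for illustration.

```r
# Local analogue only: no tasks are queued and rrq is not used here.
# `fn`, `x` and `d` are made-up names for illustration.
fn <- function(n, lambda = 1) n * lambda

# Vector or list: each element is passed to fn in turn, as with lapply()
x <- c(1, 2, 3)
calls_vector <- lapply(x, fn)

# data.frame: each row supplies one set of *named* arguments to fn
d <- data.frame(n = 1:3, lambda = c(0.5, 1, 2))
calls_rows <- lapply(seq_len(nrow(d)), function(i) {
  do.call(fn, as.list(d[i, , drop = FALSE]))
})
```

In the data.frame case the column names must match argument names of the function, because every value is passed by name.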

args

Additional arguments to fn, shared across all calls. These must be named. If you are using a data.frame for data, you'd probably be better off adding additional columns that don't vary across rows, but the end result is the same.
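The equivalence between shared arguments and constant columns can be sketched locally in base R. This is an illustration of the calling pattern only, not of rrq itself; `fn`, `shared` and `d` are hypothetical names.

```r
# Local analogue only: no tasks are queued and rrq is not used here.
fn <- function(x, scale, offset) x * scale + offset

# Route 1: vary x over a vector, pass the rest as shared named arguments
shared <- list(scale = 10, offset = 1)
via_args <- lapply(1:3, function(el) do.call(fn, c(list(el), shared)))

# Route 2: put the shared values in columns that do not vary across rows
d <- data.frame(x = 1:3, scale = 10, offset = 1)
via_columns <- lapply(seq_len(nrow(d)), function(i) {
  do.call(fn, as.list(d[i, , drop = FALSE]))
})

# Both routes produce the same set of calls and therefore the same results
same <- identical(via_args, via_columns)
```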

queue

The queue to add the task to; if not specified the "default" queue (which all workers listen to) will be used. If you have configured workers to listen to more than one queue you can specify that here. Be warned that if you push jobs onto a queue with no worker, they will stay queued forever.

separate_process

Logical, indicating if the task should be run in a separate process on the worker. If TRUE, then the worker runs the task in a separate process using the callr package. This means that the worker environment is completely clean, so subsequent runs are not affected by preceding ones. The downside of this approach is a considerable overhead in starting the external process and transferring data back.

timeout_task_run

Optionally, a maximum allowed running time, in seconds. This parameter only has an effect if separate_process is TRUE. If given, a task that runs for longer than this time will be stopped and its status set to TIMEOUT.

depends_on

Vector or list of IDs of tasks which must have completed before this task can be run. Once all of the tasks it depends on have run successfully, this task will be added to the queue. If a task it depends on fails, then this task will be removed from the queue.

controller

The controller to use. If not given (or NULL) we'll use the controller registered with rrq_default_controller_set().

Value

A vector of task identifiers; this will have the same length as data has rows if it is a data.frame, otherwise it has the same length as data.

Examples

obj <- rrq_controller("rrq:example")

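# Each row of d supplies the named arguments (n, lambda) for one call to rpois
d <- data.frame(n = 1:10, lambda = rgamma(10, 5))
# Create one task per row; ts holds the task identifiers
ts <- rrq_task_create_bulk_call(rpois, d, controller = obj)
# Wait for the tasks to finish before fetching results
rrq_task_wait(ts, controller = obj)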
#> [1] TRUE
rrq_task_results(ts, controller = obj)
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 5 7
#> 
#> [[3]]
#> [1] 3 3 4
#> 
#> [[4]]
#> [1]  4 10  7 10
#> 
#> [[5]]
#> [1] 7 5 6 3 4
#> 
#> [[6]]
#> [1] 4 2 6 2 7 6
#> 
#> [[7]]
#> [1] 3 2 2 3 6 2 5
#> 
#> [[8]]
#> [1] 1 1 3 5 4 1 4 4
#> 
#> [[9]]
#> [1] 3 3 4 4 1 5 2 0 4
#> 
#> [[10]]
#>  [1] 2 3 3 0 1 2 1 3 3 4
#>