A base class on top of which queues can be developed. It provides every method except the support for actually submitting tasks: $submit() and $unsubmit() are stubs that a derived class must override.

Public fields

context

The context object

Methods


Method new()

Constructor

Usage

queue_base$new(context_id, root = NULL, initialize = TRUE)

Arguments

context_id

A context identifier; either a context object or a name/id of a saved context (see context::context_save()).

root

Root path to load contexts from, if using a string identifier for context_id. If context_id is a context object then root must be NULL.

initialize

Logical, indicating if the context should be loaded immediately. If you want to run tasks this must be TRUE, but to query it can be FALSE. See context::context_load() and the $initialize_context() method.


Method initialize_context()

Load the context. This causes the packages to be loaded and all script files to be sourced. This is required before any tasks can be queued, because we need to check against this environment to work out what is available on any workers.

Usage

queue_base$initialize_context()


Method task_list()

List all tasks known to this queue

Usage

queue_base$task_list()


Method task_status()

Return the status of selected tasks

Usage

queue_base$task_status(task_ids = NULL, named = TRUE)

Arguments

task_ids

Task identifiers to query. If NULL, then all tasks are queried.

named

Logical, indicating if the status result should be named by task id.
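
The effect of named can be pictured with plain character vectors. This is a sketch in base R only: the task ids and status strings below are made up for illustration and are not output from a real queue.

```r
# Hypothetical task ids and statuses, for illustration only
ids <- c("0a1b2c", "3d4e5f")
status <- c("COMPLETE", "PENDING")

# What a result named by task id looks like
named_result <- setNames(status, ids)

# What the same result looks like without names
unnamed_result <- unname(named_result)

# A named result can be indexed by task id
one <- named_result[["3d4e5f"]]
```

A named result is convenient for looking up individual tasks; an unnamed one is easier to tabulate, for example with table().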


Method task_times()

Return the times taken by tasks as a data.frame

Usage

queue_base$task_times(task_ids = NULL, unit_elapsed = "secs", sorted = TRUE)

Arguments

task_ids

Task identifiers to query. If NULL, then all tasks are queried.

unit_elapsed

Time unit to use for the elapsed fields

sorted

Logical, indicating if the fields should be sorted by submitted time.


Method task_get()

Retrieve a task by id

Usage

queue_base$task_get(task_id, check_exists = TRUE)

Arguments

task_id

A task identifier (hexadecimal string)

check_exists

Logical, indicating if we should check that the task exists.


Method task_result()

Retrieve a task's result

Usage

queue_base$task_result(task_id)

Arguments

task_id

A task identifier (hexadecimal string)


Method task_delete()

Delete tasks

Usage

queue_base$task_delete(task_ids)

Arguments

task_ids

A vector of task identifiers (each a hexadecimal string)


Method task_bundle_list()

List all known task bundles

Usage

queue_base$task_bundle_list()


Method task_bundle_info()

List all known task bundles along with information about what was run and when.

Usage

queue_base$task_bundle_info()


Method task_bundle_get()

Get a task bundle by name

Usage

queue_base$task_bundle_get(name)

Arguments

name

A task bundle identifier (a string of the form adjective_animal)


Method task_bundle_retry_failed()

Retry failed tasks in a bundle

Usage

queue_base$task_bundle_retry_failed(name)

Arguments

name

A task bundle identifier (a string of the form adjective_animal)


Method enqueue()

Queue a task

Usage

queue_base$enqueue(expr, envir = parent.frame(), submit = TRUE, name = NULL)

Arguments

expr

An unevaluated expression to put on the queue

envir

The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.

submit

Logical indicating if the task should be submitted

name

Optional name for the task
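
The role of envir can be illustrated without a queue. The sketch below uses only base R and mimics the idea of sending dependent variables along with an expression; it is not how the package itself is implemented, and worker_env is a hypothetical stand-in for a worker.

```r
# Illustration only: this mimics the idea, not the package's implementation.
a <- 2
envir <- environment()

# The task expression, captured without evaluating it
expr <- quote(sum(1 + a))

# Variables (not functions) that the expression refers to
vars <- all.vars(expr)

# Look up their values in the local environment
deps <- mget(vars, envir = envir)

# A stand-in for a worker: an environment that only knows about `deps`
worker_env <- list2env(deps, parent = baseenv())
result <- eval(expr, worker_env)
```

Here vars is "a", so only the value of a would need to travel with the expression; sum and + are found on the worker side.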


Method enqueue_()

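Queue a task. This is the standard-evaluation counterpart of $enqueue(): expr is passed as an already-quoted expression rather than being captured unevaluated.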

Usage

queue_base$enqueue_(expr, envir = parent.frame(), submit = TRUE, name = NULL)

Arguments

expr

A quoted expression to put on the queue

envir

The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.

submit

Logical indicating if the task should be submitted

name

Optional name for the task
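
The difference between an unevaluated expression (as for $enqueue()) and a quoted expression (as for $enqueue_()) can be sketched in base R. The helpers capture() and capture_() below are hypothetical names used only for this illustration; they are not part of the package.

```r
# Hypothetical helpers, for illustration only.

# Like $enqueue(): the caller writes the expression directly and it is
# captured before being evaluated.
capture <- function(expr) substitute(expr)

# Like $enqueue_(): the caller has already quoted the expression.
capture_ <- function(expr) expr

e1 <- capture(sum(1 + a))
e2 <- capture_(quote(sum(1 + a)))

# Both routes end up with the same call object
same <- identical(e1, e2)
```

The quoted form is the one to reach for when the expression is built programmatically, because it can be stored in a variable and passed around before being queued.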


Method enqueue_bulk()

Send a bulk set of tasks to your workers. This function is a bit like a mash-up of Map and do.call when used with a data.frame argument, which is typically what is provided. Unlike $lapply(), which applies FUN to each element of X, enqueue_bulk applies FUN over each row of X, spreading the columns out as arguments. If you have a function f(a, b) and a data.frame with columns a and b, this should feel intuitive.

Usage

queue_base$enqueue_bulk(
  X,
  FUN,
  ...,
  do_call = TRUE,
  envir = parent.frame(),
  timeout = 0,
  time_poll = 1,
  progress = NULL,
  name = NULL,
  overwrite = FALSE
)

Arguments

X

Typically a data.frame, which you want to apply FUN over, row-wise. The names of the data.frame must match the arguments of your function.

FUN

A function

...

Additional arguments to add to every call to FUN

do_call

Logical, indicating if each row of X should be treated as if it was do.call(FUN, X[i, ]) - typically this is what you want.

envir

The environment to use to try and find the function

timeout

Optional timeout, in seconds, after which an error will be thrown if the task has not completed.

time_poll

Optional time with which to "poll" for completion.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

name

Optional name for a created bundle

overwrite

Logical, indicating if we should overwrite any bundle that exists with name name.
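
The row-wise behaviour described for do_call = TRUE can be reproduced locally. This is a base R sketch of the semantics only; no queue or worker is involved, and f and X are example values chosen for illustration.

```r
# Local illustration of the row-wise semantics; nothing is queued here.
f <- function(a, b) a + b
X <- data.frame(a = 1:3, b = c(10, 20, 30))

# Each row is spread out as named arguments to f, as in
# do.call(FUN, X[i, ])
rowwise <- lapply(seq_len(nrow(X)), function(i) {
  do.call(f, as.list(X[i, , drop = FALSE]))
})

# The same thing written with Map, passing the columns in parallel
via_map <- Map(f, X$a, X$b)
```

Because the columns are matched to arguments by name, the column names of X must match the argument names of the function.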


Method lapply()

Apply a function over a list of data. This is equivalent to using $enqueue() over each element in the list.

Usage

queue_base$lapply(
  X,
  FUN,
  ...,
  envir = parent.frame(),
  timeout = 0,
  time_poll = 1,
  progress = NULL,
  name = NULL,
  overwrite = FALSE
)

Arguments

X

A list of data to apply our function against

FUN

A function to be applied to each element of X

...

Additional arguments to add to every call to FUN

envir

The environment to use to try and find the function

timeout

Optional timeout, in seconds, after which an error will be thrown if the task has not completed.

time_poll

Optional time with which to "poll" for completion.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

name

Optional name for a created bundle

overwrite

Logical, indicating if we should overwrite any bundle that exists with name name.
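
The equivalence between $lapply() and queuing one task per element can be pictured by building the individual calls by hand. This is a base R sketch under the assumption that each task is simply FUN applied to one element; it evaluates the calls locally rather than on a worker.

```r
# Local illustration: one call per element of X, evaluated in place.
X <- list(1, 4, 9)

# Build the call sqrt(x) for each element, without evaluating it
tasks <- lapply(X, function(x) as.call(list(quote(sqrt), x)))

# Evaluate each call, as a worker would for each task
results <- lapply(tasks, eval, envir = baseenv())
```

Each element of tasks is a self-contained call such as sqrt(4), which is the kind of expression that could be queued individually.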


Method mapply()

A wrapper like mapply

Usage

queue_base$mapply(
  FUN,
  ...,
  MoreArgs = NULL,
  envir = parent.frame(),
  timeout = 0,
  time_poll = 1,
  progress = NULL,
  name = NULL,
  use_names = TRUE,
  overwrite = FALSE
)

Arguments

FUN

A function

...

Vectors to iterate over, as for mapply; each call to FUN receives one element from each.

MoreArgs

As for mapply, additional arguments that apply to every function call.

envir

The environment to use to try and find the function

timeout

Optional timeout, in seconds, after which an error will be thrown if the task has not completed.

time_poll

Optional time with which to "poll" for completion.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

name

Optional name for a created bundle

use_names

Use names

overwrite

Logical, indicating if we should overwrite any bundle that exists with name name.
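
The relationship between base mapply, MoreArgs, and the row-wise view used by enqueue_bulk can be sketched locally. The example below is base R only and assumes the wrapper follows base mapply semantics; f, X, and scale are illustrative names.

```r
# Local illustration using base mapply; nothing is queued here.
f <- function(a, b, scale) (a + b) * scale

# Vectors in ... are iterated over in parallel; MoreArgs is shared
via_mapply <- mapply(f, a = 1:3, b = 4:6, MoreArgs = list(scale = 10))

# The same calls, written as a row-wise pass over a data.frame
X <- data.frame(a = 1:3, b = 4:6)
via_rows <- vapply(seq_len(nrow(X)), function(i) {
  do.call(f, c(as.list(X[i, , drop = FALSE]), list(scale = 10)))
}, numeric(1))
```

Both forms make the same three calls: f(1, 4, scale = 10), f(2, 5, scale = 10), and f(3, 6, scale = 10).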


Method submit()

Submit a task into a queue. This is a stub method and must be overridden by a derived class for the queue to do anything.

Usage

queue_base$submit(task_ids, names = NULL)

Arguments

task_ids

Vector of tasks to submit

names

Optional vector of names of tasks


Method unsubmit()

Unsubmit a task from the queue. This is a stub method and must be overridden by a derived class for the queue to do anything.

Usage

queue_base$unsubmit(task_ids)

Arguments

task_ids

Vector of tasks to unsubmit