In a perfect world, nothing would fail, and this vignette would not be needed. But here we are.

When you run tasks within a single process and things go wrong, you know immediately because you see it happen. With a queuing system like rrq it’s less obvious when something bad has happened because the task is running in another process, possibly on a different machine!

  • Plain errors are the simplest sort of fault to recover from. Your task throws an error, rrq picks this up and marks the task as errored, and moves on to the next thing. You can then see that this error has happened from the task's status.
  • If your task crashes the process it can take out the worker (depending on how you have configured rrq; see below). In this case nothing updates the task status, and your task appears to stay running forever. This situation also arises if your process is killed by the operating system (e.g., out of memory) or via some other condition (e.g., an administrator terminating the process).
  • If your task is running on another machine and that machine becomes unreachable (switched off, disconnects from the network, etc) your task will never appear to finish.

This vignette discusses how you can mitigate against and recover from these sorts of issues. We start with the expected errors and build up to considerations for creating a fault tolerant queue.

Error handling

We start with the simplest sort of fault, one that can just as easily happen locally as remotely with rrq. In this case the handling is fairly well defined and there’s not much you need to do. This section is not really “fault tolerance” at all, but simply how rrq handles errors and what you can do about it.

Consider this simple function which would fit a linear model between two variables:

fit_model <- function(x, y) {
  lm(y ~ x)
}
library(rrq)
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)
rrq_worker_envir_set(rrq_envir(sources = "fault.R"))
w <- rrq_worker_spawn()
#>  Spawning 1 worker with prefix 'nonspheric_siberiantiger'

In the happy case, everything works as expected:

x <- runif(5)
y <- 2 * x + rnorm(length(x), 0, 0.2)
t <- rrq_task_create_expr(fit_model(x, y))
rrq_task_wait(t, 10)
#> [1] TRUE
rrq_task_status(t)
#> [1] "COMPLETE"
rrq_task_result(t)
#> 
#> Call:
#> lm(formula = y ~ x)
#> 
#> Coefficients:
#> (Intercept)            x  
#>     -0.3968       2.5900

But if we provide invalid input, the task will error:

t <- rrq_task_create_expr(fit_model(x, NULL))
rrq_task_wait(t, 10)
#> [1] FALSE
rrq_task_result(t)
#> <rrq_task_error>
#>   from:   model.frame.default(formula = y ~ x, drop.unused.levels = TRUE)
#>   error:  invalid type (NULL) for variable 'y'
#>   queue:  rrq:74beefbc
#>   task:   62b9a9e8439dd7427f78147a92653377
#>   status: ERROR
#>   * To throw this error, use stop() with it
#>   * This error has a stack trace, use '$trace' to see it

The first thing to note here is that the task does not throw an error when you wait on it with rrq_task_wait() as above, or when you fetch the result with rrq_task_result():

r <- rrq_task_result(t)
r
#> <rrq_task_error>
#>   from:   model.frame.default(formula = y ~ x, drop.unused.levels = TRUE)
#>   error:  invalid type (NULL) for variable 'y'
#>   queue:  rrq:74beefbc
#>   task:   62b9a9e8439dd7427f78147a92653377
#>   status: ERROR
#>   * To throw this error, use stop() with it
#>   * This error has a stack trace, use '$trace' to see it

However, the result will be an object of class rrq_task_error, which you can test for using inherits():

inherits(r, "rrq_task_error")
#> [1] TRUE
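
When fetching many results at once, the same check can be used to separate failures from successes. The following is a minimal sketch, assuming a list shaped like the one returned by rrq_task_results(). The `fake_error` object and the `split_results()` helper are hypothetical stand-ins, not part of rrq, used so that the snippet runs without a queue:

```r
# Stand-in for a failed task result; only the class vector matters here.
fake_error <- structure(
  list(message = "invalid type (NULL) for variable 'y'", call = NULL),
  class = c("rrq_task_error", "error", "condition"))

# Hypothetical helper: split a list of results into successes and failures.
split_results <- function(results) {
  failed <- vapply(results, inherits, logical(1), what = "rrq_task_error")
  list(ok = results[!failed], failed = results[failed])
}

results <- list(1.5, fake_error, 2.5)
parts <- split_results(results)
length(parts$ok)
#> [1] 2
length(parts$failed)
#> [1] 1
```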

The default behaviour of rrq is not to error when fetching a task, as that would require that you use tryCatch everywhere that you retrieve tasks that might have failed, and because errors are often interesting themselves. For example, rrq_task_error objects include stack traces alongside the error:

r$trace
#>      
#>   1. ├─rlang::try_fetch(...)
#>   2. │ ├─base::tryCatch(...)
#>   3. │ │ └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   4. │ │   └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   5. │ │     └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#>   6. │ └─base::withCallingHandlers(...)
#>   7. ├─base::eval(task$expr, envir)
#>   8. │ └─base::eval(task$expr, envir)
#>   9. │   └─fit_model(x, NULL)
#>  10. │     ├─stats::lm(y ~ x)
#>  11. │     │ └─base::eval(mf, parent.frame())
#>  12. │     │   └─base::eval(mf, parent.frame())
#>  13. │     ├─stats::model.frame(formula = y ~ x, drop.unused.levels = TRUE)
#>  14. │     └─stats::model.frame.default(formula = y ~ x, drop.unused.levels = TRUE)
#>  15. └─base::.handleSimpleError(...)
#>  16.   └─rlang (local) h(simpleError(msg, call))
#>  17.     └─handlers[[3L]](cnd)

These are rlang stack traces, which are somewhat richer than those produced by traceback(), containing the same set of stacks but arranged in a tree. See the rlang documentation for details. This error object will also include any warnings captured while the task ran.

Objects of class rrq_task_error inherit from error and condition so, once thrown, will behave as expected in programs using errors for flow control (e.g., with tryCatch); you can throw them yourself with stop():

stop(r)
#> Error in model.frame.default(formula = y ~ x, drop.unused.levels = TRUE): invalid type (NULL) for variable 'y'
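
Because the class vector includes rrq_task_error, a tryCatch() handler can target task failures specifically. This sketch uses a hand-built stand-in condition (`fake_error`, hypothetical and far simpler than a real rrq error object) so that it runs without a queue:

```r
# Stand-in with the same class vector as an rrq task error.
fake_error <- structure(
  list(message = "Task not successful: DIED", call = NULL, status = "DIED"),
  class = c("rrq_task_error", "error", "condition"))

# Throw the error, then catch it by class and turn it into a message.
outcome <- tryCatch(
  stop(fake_error),
  rrq_task_error = function(e) {
    paste("task failed:", conditionMessage(e))
  })
outcome
#> [1] "task failed: Task not successful: DIED"
```

A handler registered for the more general error class would catch it too, since handlers are matched against every class the condition inherits from.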

You can also change the default behaviour to error on failure by passing error = TRUE to rrq_task_result() or rrq_tasks_result(), which will immediately rethrow the error in your R session (your program could then stop or you could again catch it with tryCatch):

rrq_task_result(t, error = TRUE)
#> Error in model.frame.default(formula = y ~ x, drop.unused.levels = TRUE): invalid type (NULL) for variable 'y'

This process is unchanged if the task is run in a separate process (with separate_process = TRUE passed when the task is created), with the same status and return type, and with the trace information available after failure.

Increasing resilience via separate processes

Running tasks in separate processes (e.g., rrq_task_create_expr(mytask(), separate_process = TRUE)) is the simplest way of making things more resilient because this creates a layer of isolation between the worker and the task. If your task crashes R (e.g., a segmentation fault due to a bug in your C/C++ code) or is killed by the operating system then the worker process survives and can update the keys in Redis directly to advertise this fact. This is generally much nicer than when the worker dies and the task status cannot be updated.

The downside of using separate processes is that it is much slower; compare the time taken to queue, run and retrieve a trivial task run in the same worker process (look at the elapsed entry):

system.time(
  rrq_task_wait(rrq_task_create_expr(identity(1)), 10))
#>    user  system elapsed 
#>   0.001   0.000   0.002

with the same task run in a separate process on the worker

system.time(
  rrq_task_wait(rrq_task_create_expr(identity(1), separate_process = TRUE), 10))
#>    user  system elapsed 
#>   0.001   0.000   0.351

We expect this difference to be roughly 100-fold for local workers. Almost all the cost is due to the overhead of starting and terminating a fresh R session. If your queue uses a lot of packages there will be additional overhead here too as these are loaded. However, this cost is fixed and will decrease as a fraction of the total running time as the running time of your task increases. So for long-running tasks the additional safety of separate processes is probably worthwhile. For smaller tasks you may want to make sure you run a heartbeat process so that you can handle failures via that route.
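
To make the "fixed cost" argument concrete, the following rough calculation assumes an overhead of about 0.35 seconds (the elapsed time measured above; yours will differ) and a few hypothetical task running times:

```r
overhead <- 0.35               # seconds; roughly the elapsed time seen above
runtime <- c(1, 10, 60, 3600)  # hypothetical task running times, in seconds

# Share of the total wall time spent on starting the separate process
fraction <- overhead / (overhead + runtime)
data.frame(runtime = runtime, overhead_percent = round(100 * fraction, 2))
```

Under these assumptions the overhead is about a quarter of the total for a one-second task, but only around 0.01% for a task that runs for an hour.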

Consider a long-running job; this one simply sleeps for an hour:

t <- rrq_task_create_expr(Sys.sleep(3600), separate_process = TRUE)

To simulate the process crashing, we kill it. Because we have queued this on a separate process we can use rrq_task_info() to fetch the process id (PID) of the process that the task is running in (this is different to that of the worker).

info <- rrq_task_info(t)
tools::pskill(info$pid)

Because the task was run in a separate process, our worker could detect that the task has died unexpectedly:

rrq_task_status(t)
#> [1] "DIED"

This is different to the error status in the previous section (that was ERROR). Note that if we had not run the task in a separate process, the task status would have remained unchanged (as RUNNING) because nothing could ever update it!

Retrieving the result has similar behaviour to the error case; we don’t throw but instead return an object of class rrq_task_error (which also inherits from error and condition). However, this time there’s really not much extra information in the error:

r <- rrq_task_result(t)
r
#> <rrq_task_error>
#>   error:  Task not successful: DIED
#>   queue:  rrq:74beefbc
#>   task:   6c73f3bb6480cb9e740a3d4255bfaf0b
#>   status: DIED
#>   * To throw this error, use stop() with it

There’s also no trace available

r$trace
#> NULL

Loss of workers

In this section we outline what you can do about unexplained and unreported failures in your task, or loss of workers. Typically this happens because your worker has crashed (perhaps due to your task crashing it), has been killed (e.g., by the operating system or an administrator), or because the machine that it is running on has been lost.

In order to enable fault tolerance for this sort of issue, you first need to enable a “heartbeat” on the worker processes. This is a second process on each worker that periodically writes to the Redis database on a key that will expire in a time slightly longer than that period, in effect making a dead man’s switch - see rrq::rrq_heartbeat for details. So if the worker process dies for any reason, then after a while we’ll detect that as its key has expired. We can then take some action.

To enable the heartbeat, save a worker configuration before starting a worker:

rrq_worker_config_save(
  "localhost",
  rrq_worker_config(heartbeat_period = 3))

When you spawn a worker it will pick up this configuration, and we’ll be able to detect if it has died.

w <- rrq_worker_spawn()
#>  Spawning 1 worker with prefix 'raging_llama'

In order to simulate the loss of the worker, we will kill it after it has started a long-running task. First, we queue a long-running task (for example, Sys.sleep(3600) as in the previous section), keeping its id as t; this worker will start it.

The worker will pick the task up fairly quickly, and the status will change to RUNNING:

rrq_task_status(t)
#> [1] "RUNNING"
rrq_worker_status(w$id)
#> raging_llama_1 
#>         "BUSY"

We kill the worker (simulating the job crashing, or the machine turning off, etc):

w$kill()

Because the worker has been killed, it can’t write to Redis to tell us that the task can’t be completed, so this status will never change:

rrq_task_status(t)
#> [1] "RUNNING"
rrq_worker_status(w$id)
#> raging_llama_1 
#>         "BUSY"

The heartbeat key persists for 3 times the period given above. This multiplier is not configurable; any number greater than 1 should be OK, but we picked 3 because it allows for occasional network connectivity issues or slowness on the node (we may reduce it in a future version). This means that after 9s (3 * 3s) the key will have expired.
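
The expiry arithmetic can be sketched in a few lines. This is a toy model of the dead man's switch only: `heartbeat_expired()` is a hypothetical helper, and in reality the expiry is handled by Redis rather than by R code like this:

```r
heartbeat_period <- 3  # seconds, as configured above
multiplier <- 3        # the fixed multiplier described above
expiry <- heartbeat_period * multiplier
expiry
#> [1] 9

# Hypothetical helper: would the key have expired, given the number of
# seconds since the worker last refreshed it?
heartbeat_expired <- function(seconds_since_last_beat, expiry) {
  seconds_since_last_beat >= expiry
}

# A live worker refreshes every 3s, so it never gets close to 9s;
# a dead worker stops refreshing and crosses the threshold.
heartbeat_expired(c(2, 8, 10), expiry)
#> [1] FALSE FALSE  TRUE
```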

We can then use rrq_worker_detect_exited() to clean up:

rrq_worker_detect_exited()
#> Lost 1 worker:
#>   - raging_llama_1
#> Orphaning 1 task:
#>   - d4fe288268ecf8ac8b93c48a691ea86e

At this point, the statuses of our task and worker are correct:

rrq_task_status(t)
#> [1] "DIED"
rrq_worker_status(w$id)
#> raging_llama_1 
#>         "LOST"

Fetching the task result provides the same DIED error as above:

rrq_task_result(t)
#> <rrq_task_error>
#>   error:  Task not successful: DIED
#>   queue:  rrq:74beefbc
#>   task:   d4fe288268ecf8ac8b93c48a691ea86e
#>   status: DIED
#>   * To throw this error, use stop() with it

This is still not terribly useful, as we have not provided any mechanism to automatically requeue a lost task or restart a dead worker, which we cover in the next section.

Retrying tasks

Regardless of how your tasks got to be in a broken situation, the mechanism for getting them out of this situation remains the same; you want to retry the task:

  • Your task might have errored because of some resource being unavailable or full and now it is back online
  • The worker might have died because someone turned off the machine
  • Your memory errors only happen sometimes and you feel lucky (in this case you should also fix your program!)

Alternatively, perhaps your task ran to successful completion but you simply want to rerun it (e.g., some stochastic algorithm that is displaying a non-fatal pathology).

You can use rrq_task_retry() to retry any number of tasks that have completed (in one of the terminal states, including COMPLETE but also ERROR, CANCELLED, DIED or TIMEOUT). This is nondestructive to any of the task data, in particular its result, so you will still have access to the failed run and its stack trace (but see below).
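
If you only want to retry the tasks that ended badly, you can filter on status first. The helper below is a hypothetical sketch (`retry_unsuccessful()` is not part of rrq). It takes the status and retry functions as arguments, defaulting to rrq_task_status() and rrq_task_retry(), so that the logic can be tried with stand-ins and no queue:

```r
# Terminal states listed above, minus COMPLETE.
unsuccessful <- c("ERROR", "CANCELLED", "DIED", "TIMEOUT")

# Hypothetical helper: retry only the tasks that ended unsuccessfully and
# return the new task ids (or an empty vector if nothing needed retrying).
retry_unsuccessful <- function(task_ids,
                               status = rrq::rrq_task_status,
                               retry = rrq::rrq_task_retry) {
  to_retry <- task_ids[status(task_ids) %in% unsuccessful]
  if (length(to_retry) == 0) {
    return(character(0))
  }
  retry(to_retry)
}
```

With a running queue you would call retry_unsuccessful(ids) and then wait on the returned ids as usual. Tasks that are still PENDING or RUNNING are left alone.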

An example where retrying fixes an error

In this example, we’ll run some “model” that may or may not converge, and we want to retry until it does. This represents some badly behaved task with a stochastic component that will eventually work if tried enough times:

stochastic_failure <- function(p) {
  x <- runif(1)
  if (x < p) {
    stop(sprintf("Convergence failure - x is only %0.2f", x))
  }
  x
}

We can create a new environment for our worker, to pick up this function:

rrq_worker_envir_set(rrq_envir(sources = "fault2.R"))
w <- rrq::rrq_worker_spawn()
#>  Spawning 1 worker with prefix 'congenial_dassierat'

Now, we enqueue a task as usual, then wait on it

t1 <- rrq_task_create_expr(stochastic_failure(0.5))
rrq_task_wait(t1, timeout = 10)
#> [1] FALSE
rrq_task_result(t1)
#> <rrq_task_error>
#>   from:   stochastic_failure(0.5)
#>   error:  Convergence failure - x is only 0.27
#>   queue:  rrq:74beefbc
#>   task:   4e8cd0fe77870b9b12c692c5e37b5583
#>   status: ERROR
#>   * To throw this error, use stop() with it
#>   * This error has a stack trace, use '$trace' to see it

Oh no, this task has failed!

We can use rrq_task_retry() to requeue this job, meaning that it will run exactly as before. However, the random number stream has moved on and this task will return a different answer - perhaps it will work this time?

t2 <- rrq_task_retry(t1)
rrq_task_wait(t2, timeout = 10)
#> [1] FALSE
rrq_task_result(t2)
#> <rrq_task_error>
#>   from:   stochastic_failure(0.5)
#>   error:  Convergence failure - x is only 0.37
#>   queue:  rrq:74beefbc
#>   task:   3ed65e727b4bd6c7f42aee8df92c4e01
#>   status: ERROR
#>   * To throw this error, use stop() with it
#>   * This error has a stack trace, use '$trace' to see it

The task has failed again. Note here that rrq_task_retry() has taken a task id and returned a new task id, which we have waited on. However, we could also have used the original id (more on this below).

Let’s give it one more go:

t3 <- rrq_task_retry(t2)
rrq_task_wait(t3, timeout = 10)
#> [1] TRUE
rrq_task_result(t3)
#> [1] 0.5728534

Success! The task has run to completion and we no longer have an error.
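
The manual retries above can be wrapped in a small client-side loop. This is only a sketch under stated assumptions: `retry_until_success()` is a hypothetical helper, not part of rrq, and it relies on the wait function returning TRUE on success and FALSE on failure, as rrq_task_wait() does in the output above. It does not handle a task that is still running when the timeout elapses.

```r
# Hypothetical helper: wait on a task, retrying it on failure up to
# `max_tries` attempts in total. Returns the id of the successful task.
# `wait` and `retry` default to the rrq functions but can be replaced
# with stand-ins for testing.
retry_until_success <- function(task_id, max_tries = 5, timeout = 10,
                                wait = rrq::rrq_task_wait,
                                retry = rrq::rrq_task_retry) {
  for (attempt in seq_len(max_tries)) {
    if (isTRUE(wait(task_id, timeout))) {
      return(task_id)
    }
    if (attempt < max_tries) {
      task_id <- retry(task_id)  # the retry returns a new task id
    }
  }
  stop(sprintf("Task still failing after %d attempts", max_tries))
}
```

With the queue above, something like retry_until_success(rrq_task_create_expr(stochastic_failure(0.5))) would keep going until the task converges or five attempts have been used.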

When you retry a task, rrq sets up redirects that point from the old task id to the new one, and in most cases you can use whichever you find more convenient. For example, we can now read the successful task result from any of the task ids:

rrq_task_results(c(t1, t2, t3))
#> [[1]]
#> [1] 0.5728534
#> 
#> [[2]]
#> [1] 0.5728534
#> 
#> [[3]]
#> [1] 0.5728534

To prevent this redirection, pass follow = FALSE, which returns the original result for each task id:

rrq_task_results(c(t1, t2, t3), follow = FALSE)
#> [[1]]
#> <rrq_task_error>
#>   from:   stochastic_failure(0.5)
#>   error:  Convergence failure - x is only 0.27
#>   queue:  rrq:74beefbc
#>   task:   4e8cd0fe77870b9b12c692c5e37b5583
#>   status: ERROR
#>   * To throw this error, use stop() with it
#>   * This error has a stack trace, use '$trace' to see it
#> 
#> [[2]]
#> <rrq_task_error>
#>   from:   stochastic_failure(0.5)
#>   error:  Convergence failure - x is only 0.37
#>   queue:  rrq:74beefbc
#>   task:   3ed65e727b4bd6c7f42aee8df92c4e01
#>   status: ERROR
#>   * To throw this error, use stop() with it
#>   * This error has a stack trace, use '$trace' to see it
#> 
#> [[3]]
#> [1] 0.5728534

Here, we can see that the first two runs of the task failed, along with their errors and any backtraces. The same logic applies to other functions, such as rrq_task_status():

rrq_task_status(c(t1, t2, t3))
#> [1] "COMPLETE" "COMPLETE" "COMPLETE"
rrq_task_status(c(t1, t2, t3), follow = FALSE)
#> [1] "MOVED"    "MOVED"    "COMPLETE"

Note that the task status above has overwritten the original status (previously it was ERROR, but now it is MOVED). However, you can always read the original status in the error object’s status field, should you need it.

You can also extract times that events happened with rrq_task_times()

rrq_task_times(c(t1, t2, t3))
#>                                      submit      start   complete moved
#> 4e8cd0fe77870b9b12c692c5e37b5583 1714040451 1714040451 1714040451    NA
#> 3ed65e727b4bd6c7f42aee8df92c4e01 1714040451 1714040451 1714040451    NA
#> 8b36f0aba9450200ca58e2bbbfeac857 1714040451 1714040451 1714040451    NA
rrq_task_times(c(t1, t2, t3), follow = FALSE)
#>                                      submit      start   complete      moved
#> 4e8cd0fe77870b9b12c692c5e37b5583 1714040451 1714040451 1714040451 1714040451
#> 3ed65e727b4bd6c7f42aee8df92c4e01 1714040451 1714040451 1714040451 1714040451
#> 8b36f0aba9450200ca58e2bbbfeac857 1714040451 1714040451 1714040451         NA

This means that most of the time you can ignore that a task has been retried and work with whatever task id you have handy.

You can inspect the chain of tasks by looking at the return value of rrq_task_info(). In the above example we have a chain of three tasks 4e8cd0… -> 3ed65e… -> 8b36f0…, which we can discover this way:

rrq_task_info(t1)
#> $id
#> [1] "4e8cd0fe77870b9b12c692c5e37b5583"
#> 
#> $status
#> [1] "MOVED"
#> 
#> $queue
#> [1] "default"
#> 
#> $separate_process
#> [1] FALSE
#> 
#> $timeout
#> NULL
#> 
#> $worker
#> [1] "congenial_dassierat_1"
#> 
#> $pid
#> NULL
#> 
#> $depends
#> $depends$up
#> NULL
#> 
#> $depends$down
#> NULL
#> 
#> 
#> $moved
#> $moved$up
#> NULL
#> 
#> $moved$down
#> [1] "3ed65e727b4bd6c7f42aee8df92c4e01" "8b36f0aba9450200ca58e2bbbfeac857"

Here, the moved field shows the tasks that are upstream (up) and downstream (down) of this task. For tasks that have never been retried both elements are NULL. The elements are always ordered from oldest to newest, and chains of tasks are always linear (i.e., there is no forking).

For the middle task in the chain, both up and down point at different tasks:

rrq_task_info(t2)$moved
#> $up
#> [1] "4e8cd0fe77870b9b12c692c5e37b5583"
#> 
#> $down
#> [1] "8b36f0aba9450200ca58e2bbbfeac857"

and for the leaf task:

rrq_task_info(t3)$moved
#> $up
#> [1] "4e8cd0fe77870b9b12c692c5e37b5583" "3ed65e727b4bd6c7f42aee8df92c4e01"
#> 
#> $down
#> NULL
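
Because both elements are ordered from oldest to newest, the full chain can be rebuilt from any task in it. A minimal sketch, where `task_chain()` is a hypothetical helper and `info_t2` is a hand-written stand-in for the relevant parts of rrq_task_info(t2), using the ids shown above:

```r
# Hypothetical helper: the whole retry chain, oldest first, built from the
# list returned by rrq_task_info(). NULL entries simply drop out of c().
task_chain <- function(info) {
  c(info$moved$up, info$id, info$moved$down)
}

# Stand-in for rrq_task_info(t2), keeping only the fields used here.
info_t2 <- list(
  id = "3ed65e727b4bd6c7f42aee8df92c4e01",
  moved = list(up = "4e8cd0fe77870b9b12c692c5e37b5583",
               down = "8b36f0aba9450200ca58e2bbbfeac857"))

chain <- task_chain(info_t2)
length(chain)
#> [1] 3
```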

Performance considerations

It is possible to change the default behaviour of follow to not follow through a task chain. Doing this might be slightly more efficient in cases where you are interested in running large numbers of small tasks and do not want to retry failed tasks. This is because almost every request that involves a task id will have to check to see if it has been moved.

To change this behaviour, pass the follow argument when constructing the controller with rrq_controller(), for example:

obj_nofollow <- rrq_controller(obj$queue_id, follow = FALSE)
rrq_task_result(t1, controller = obj_nofollow)
#> <rrq_task_error>
#>   from:   stochastic_failure(0.5)
#>   error:  Convergence failure - x is only 0.27
#>   queue:  rrq:74beefbc
#>   task:   4e8cd0fe77870b9b12c692c5e37b5583
#>   status: ERROR
#>   * To throw this error, use stop() with it
#>   * This error has a stack trace, use '$trace' to see it

Here, we create a queue controller object with the default follow behaviour set to FALSE, so accessing the result of a moved task does not search down through any potential retries unless follow = TRUE is explicitly passed.

Deleting tasks that have been retried

Deletion will operate on all tasks in a chain. So if you delete a task that has been retried then it deletes everything upstream (up to the original task) and downstream (down to the last time that the task was retried). This is because otherwise it is too easy to end up with inconsistent state. So if we delete any one of the tasks above (for example, with rrq_task_delete(t1)), all three tasks in the chain are deleted:

rrq_task_status(c(t1, t2, t3), follow = FALSE)
#> [1] "MISSING" "MISSING" "MISSING"

Going further

Currently this system does not allow you to do a few things that would be useful

  • You can’t change queue when you retry a task (for configurations with multiple queues)
  • You can’t automatically retry a task on failure (e.g., try this up to 5 times before giving up entirely)
  • You can’t retry tasks that have become impossible due to failure of a dependent task, even if that task has been retried and since succeeded

We will relax these limitations soon.