In addition to passing tasks (and results) between a controller and workers, the controller can also send “messages” to workers. This vignette shows what the possible messages do.

In order to do this, we’re going to need a queue and a worker:

obj <- rrq::rrq_controller$new("rrq:messages")
logdir <- tempfile()
w <- rrq::rrq_worker_spawn(obj, logdir = logdir)
#>  Spawning 1 worker with prefix 'nonspheric_siberiantiger'
worker_id <- w$id

On startup the worker log contains:

[2024-04-18 17:00:05.003056] ALIVE
[2024-04-18 17:00:05.003972] ENVIR new
[2024-04-18 17:00:05.004391] QUEUE default
                                 __
                ______________ _/ /
      ______   / ___/ ___/ __ `/ /_____
     /_____/  / /  / /  / /_/ /_/_____/
 ______      /_/  /_/   \__, (_)   ______
/_____/                   /_/     /_____/
    worker:        nonspheric_siberiantiger_1
    config:        localhost
    rrq_version:   0.7.12
    platform:      x86_64-pc-linux-gnu (64-bit)
    running:       Ubuntu 22.04.4 LTS
    hostname:      fv-az573-536
    username:      runner
    queue:         rrq:messages:queue:default
    wd:            /home/runner/work/rrq/rrq/vignettes
    pid:           7266
    redis_host:    127.0.0.1
    redis_port:    6379
    heartbeat_key: <not set>

Because one of the main effects of messages is to print to the worker logfile, we’ll print this fairly often.

Messages and responses

  1. The queue sends a message for one or more workers to process. The message has an identifier that is derived from the current time. Messages are written to a first-in-first-out queue, per worker, and are processed independently by workers who do not look to see if other workers have messages or are processing them.

  2. As soon as a worker has finished processing any current job it will process the message (it must wait to finish a current job but will not start any further jobs).

  3. Once the message has been processed (see below) a response will be written to a response list with the same identifier as the message.

Some messages interact with the worker timeout:

  • PING, ECHO, EVAL, INFO PAUSE, RESUME and REFRESH will reset the timer, as if a task had been run
  • TIMEOUT_SET explicitly interacts with the timer
  • TIMEOUT_GET does not reset the timer, reporting the remaining time
  • STOP causes the worker to exit, so has no interaction with the timer

PING

The PING message simply asks the worker to return PONG. It’s useful for diagnosing communication issues because it does so little

message_id <- obj$message_send("PING")

The message id is going to be useful for getting responses:

message_id
#> [1] "1713459605.229455"

(this is derived from the current time, according to Redis which is the central reference point of time for the whole system).

[2024-04-18 17:00:05.229851] MESSAGE PING
PONG
[2024-04-18 17:00:05.230404] RESPONSE PING

The logfile prints:

  1. the request for the PING (MESSAGE PING)
  2. the value PONG to the R message stream
  3. logging a response (RESPONSE PONG), which means that something is written to the response stream.

We can access the same bits of information in the worker log:

obj$worker_log_tail(n = Inf)
#>                    worker_id child       time  command message
#> 1 nonspheric_siberiantiger_1    NA 1713459605    ALIVE        
#> 2 nonspheric_siberiantiger_1    NA 1713459605    ENVIR     new
#> 3 nonspheric_siberiantiger_1    NA 1713459605    QUEUE default
#> 4 nonspheric_siberiantiger_1    NA 1713459605  MESSAGE    PING
#> 5 nonspheric_siberiantiger_1    NA 1713459605 RESPONSE    PING

This includes the ALIVE message as the worker comes up.

Inspecting the logs is fine for interactive use, but it’s going to be more useful often to poll for a response.

We already know that our worker has a response, but we can ask anyway:

obj$message_has_response(message_id)
#> nonspheric_siberiantiger_1 
#>                       TRUE

Or inversely we can as what messages a given worker has responses for:

obj$message_response_ids(worker_id)
#> [1] "1713459605.229455"

To fetch the responses from all workers it was sent to (always returning a named list):

obj$message_get_response(message_id)
#> $nonspheric_siberiantiger_1
#> [1] "PONG"

or to fetch the response from a given worker:

obj$message_get_response(message_id, worker_id)
#> $nonspheric_siberiantiger_1
#> [1] "PONG"

The response can be deleted by passing delete = TRUE to this method:

obj$message_get_response(message_id, worker_id, delete = TRUE)
#> $nonspheric_siberiantiger_1
#> [1] "PONG"

after which recalling the message will throw an error:

obj$message_get_response(message_id, worker_id)
#> Error in `rrq_message_get_response()`:
#> ! Response missing for worker: 'nonspheric_siberiantiger_1'

There is also a timeout argument that lets you wait until a response is ready (as in $task_wait()).

obj$enqueue(Sys.sleep(2))
#> [1] "efc263dd522b0f8aa8ebcc2fb18a739a"
message_id <- obj$message_send("PING")
obj$message_get_response(message_id, worker_id, delete = TRUE, timeout = 10)
#> $nonspheric_siberiantiger_1
#> [1] "PONG"

Looking at the log will show what went on here:

obj$worker_log_tail(n = 4)
#>                    worker_id child       time       command
#> 1 nonspheric_siberiantiger_1    NA 1713459606    TASK_START
#> 2 nonspheric_siberiantiger_1    NA 1713459608 TASK_COMPLETE
#> 3 nonspheric_siberiantiger_1    NA 1713459608       MESSAGE
#> 4 nonspheric_siberiantiger_1    NA 1713459608      RESPONSE
#>                            message
#> 1 efc263dd522b0f8aa8ebcc2fb18a739a
#> 2 efc263dd522b0f8aa8ebcc2fb18a739a
#> 3                             PING
#> 4                             PING
  1. A task is received
  2. 2s later the task is completed
  3. Then the message is received
  4. Then, basically instantaneously, the message is responded to

However, because the message is only processed after the task is completed, the response takes a while to come back. Equivalently, from the worker log:

[2024-04-18 17:00:06.043697] TASK_START efc263dd522b0f8aa8ebcc2fb18a739a
[2024-04-18 17:00:08.048226] TASK_COMPLETE efc263dd522b0f8aa8ebcc2fb18a739a
[2024-04-18 17:00:08.048849] MESSAGE PING
PONG
[2024-04-18 17:00:08.049218] RESPONSE PING

ECHO

This is basically like PING and not very interesting; it prints an arbitrary string to the log. It always returns "OK" as a response.

message_id <- obj$message_send("ECHO", "hello world!")
obj$message_get_response(message_id, worker_id, timeout = 10)
#> $nonspheric_siberiantiger_1
#> [1] "OK"
[2024-04-18 17:00:08.248792] MESSAGE ECHO
hello world!
[2024-04-18 17:00:08.249397] RESPONSE ECHO

INFO

The INFO command refreshes and returns the worker information.

We already have a copy of the worker info; it was created when the worker started up:

obj$worker_info()[[worker_id]]
#>   <rrq_worker_info>
#>     worker:      nonspheric_siberiantiger_1
#>     config:      localhost
#>     rrq_version: 0.7.12
#>     platform:    x86_64-pc-linux-gnu (64-bit)
#>     running:     Ubuntu 22.04.4 LTS
#>     hostname:    fv-az573-536
#>     username:    runner
#>     queue:       rrq:messages:queue:default
#>     wd:          /home/runner/work/rrq/rrq/vignettes
#>     pid:         7266
#>     redis_host:  127.0.0.1
#>     redis_port:  6379

We can force the worker to refresh:

message_id <- obj$message_send("INFO")

Here’s the new worker information, complete with an updated envir field:

obj$message_get_response(message_id, worker_id, timeout = 10)
#> $nonspheric_siberiantiger_1
#> $nonspheric_siberiantiger_1$worker
#> [1] "nonspheric_siberiantiger_1"
#> 
#> $nonspheric_siberiantiger_1$config
#> [1] "localhost"
#> 
#> $nonspheric_siberiantiger_1$rrq_version
#> [1] "0.7.12"
#> 
#> $nonspheric_siberiantiger_1$platform
#> [1] "x86_64-pc-linux-gnu (64-bit)"
#> 
#> $nonspheric_siberiantiger_1$running
#> [1] "Ubuntu 22.04.4 LTS"
#> 
#> $nonspheric_siberiantiger_1$hostname
#> [1] "fv-az573-536"
#> 
#> $nonspheric_siberiantiger_1$username
#> [1] "runner"
#> 
#> $nonspheric_siberiantiger_1$queue
#> [1] "rrq:messages:queue:default"
#> 
#> $nonspheric_siberiantiger_1$wd
#> [1] "/home/runner/work/rrq/rrq/vignettes"
#> 
#> $nonspheric_siberiantiger_1$pid
#> [1] 7266
#> 
#> $nonspheric_siberiantiger_1$redis_host
#> [1] "127.0.0.1"
#> 
#> $nonspheric_siberiantiger_1$redis_port
#> [1] 6379

EVAL

Evaluate an arbitrary R expression, passed as a string (not as any sort of unevaluated or quoted expression). This expression is evaluated in the global environment, which is not the environment in which queued code is evaluated in.

message_id <- obj$message_send("EVAL", "1 + 1")
obj$message_get_response(message_id, worker_id, timeout = 10)
#> $nonspheric_siberiantiger_1
#> [1] 2

This could be used to evaluate code that has side effects, such as installing packages. However, due to limitations with how R loads packages the only way to update and reload a package is going to be to restart the worker.

PAUSE / RESUME

The PAUSE / RESUME messages can be used to prevent workers from picking up new work (and then allowing them to start again).

obj$worker_status()
#> nonspheric_siberiantiger_1 
#>                     "IDLE"
message_id <- obj$message_send("PAUSE")
obj$message_get_response(message_id, worker_id, timeout = 10)
#> $nonspheric_siberiantiger_1
#> [1] "OK"
obj$worker_status()
#> nonspheric_siberiantiger_1 
#>                   "PAUSED"

Once paused workers ignore tasks, which stay on the queue:

t <- obj$enqueue(runif(5))
obj$task_status(t)
#> 5f0899c23d6583ca78dca9493e6cb4bd 
#>                        "PENDING"

Sending a RESUME message unpauses the worker:

message_id <- obj$message_send("RESUME")
obj$message_get_response(message_id, worker_id, timeout = 10)
#> $nonspheric_siberiantiger_1
#> [1] "OK"
obj$task_wait(t, 5)
#> [1] 0.5704070 0.4412998 0.3284139 0.6874685 0.8690313

SET_TIMEOUT / GET_TIMEOUT

Workers will quit after being left idle for more than a certain time; this is their timeout. Only processing tasks counts as work (not messages). You can query the timeout with GET_TIMEOUT and set it with SET_TIMEOUT. For our worker above the timeout is infinite; it will never quit:

obj$message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $nonspheric_siberiantiger_1
#> timeout_idle    remaining 
#>          Inf          Inf

We can set this to a finite value, in seconds:

obj$message_send_and_wait("TIMEOUT_SET", 600, worker_ids = worker_id)
#> $nonspheric_siberiantiger_1
#> [1] "OK"

Here the timeout is set to 10 minutes (600s).

Once set, the TIMEOUT_GET returns the length of time remaining before the worker exits

obj$message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $nonspheric_siberiantiger_1
#> timeout_idle    remaining 
#>     600.0000     599.8842
Sys.sleep(5)
obj$message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $nonspheric_siberiantiger_1
#> timeout_idle    remaining 
#>      600.000      594.826

One useful pattern is to send work to workers, then set the timeout to zero. This means that when work is complete they will exit (almost) immediately:

grp <- obj$lapply(1:5, function(x) {
  Sys.sleep(0.5)
  runif(x)
}, timeout_task_wait = 0)
obj$message_send("TIMEOUT_SET", 0, worker_id)
obj$tasks_wait(grp$task_ids)
#> $af9c0f29608e73d218ec0c5568ddb265
#> [1] 0.6514729
#> 
#> $`127f7b5c59b59faf59708240d8471c24`
#> [1] 0.04160036 0.29388348
#> 
#> $`41155e9313a82134c7957f90b28f66e3`
#> [1] 0.09687766 0.03890603 0.97403863
#> 
#> $a028dc5dcb40d27bdaae0c8134a76667
#> [1] 0.3405890 0.1340167 0.3517258 0.3096213
#> 
#> $`13e11ce07ae1ac6fbb1232972a28cbfd`
#> [1] 0.6443115 0.8909218 0.8691705 0.3427710 0.8651562

The worker will remain idle for 60s (by default) which is the length of time that one poll for work lasts, then it will exit.

obj$worker_status(worker_id)
#> nonspheric_siberiantiger_1 
#>                     "IDLE"
obj$message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $nonspheric_siberiantiger_1
#> timeout_idle    remaining 
#>            0            0

Messages that are supported but use via wrappers:

There are other methods that are typically used via methods on the [rrq::rrq_controller] object.

  • REFRESH: requests that the worker refresh its evaluation environment. Typically used via the $envir() method
  • STOP: sent with a informational message as an argument, requests that the worker stop. Typically used via the $worker_stop() method