Skip to content

Commit

Permalink
Migrate to clustermq 0.9.0 interface
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Sep 25, 2023
1 parent 96d788f commit c2af92a
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 109 deletions.
36 changes: 0 additions & 36 deletions .github/workflows/cover.yaml

This file was deleted.

26 changes: 0 additions & 26 deletions .github/workflows/lint.yaml

This file was deleted.

4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: drake
Title: A Pipeline Toolkit for Reproducible Computation at Scale
Version: 7.13.5.9000
Version: 7.13.6
Authors@R: c(
person(
given = c("William", "Michael"),
Expand Down Expand Up @@ -120,7 +120,7 @@ Suggests:
bindr,
callr,
cli (>= 1.1.0),
clustermq (>= 0.8.8),
clustermq (>= 0.9.0),
crayon,
curl (>= 2.7),
data.table,
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Version 7.13.5.9000

# Version 7.13.6

* Migrate to the new interface in `clustermq` 0.9.0 (@mschubert).

# Version 7.13.5

Expand Down
45 changes: 14 additions & 31 deletions R/backend_clustermq.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,43 +49,28 @@ cmq_set_common_data <- function(config) {
export <- as.list(config$envir, all.names = TRUE) # nocov
}
export$config <- hpc_config(config)
config$workers$set_common_data(
export = export,
fun = identity,
const = list(),
rettype = list(),
pkgs = character(0),
common_seed = config$settings$seed,
token = "set_common_data_token"
)
do.call(what = config$workers$env, args = export)
}

cmq_main <- function(config) {
on.exit(config$workers$finalize())
on.exit(config$workers$cleanup())
config$logger$disk("begin scheduling targets")
while (config$counter$remaining > 0) {
cmq_main_iter(config)
}
if (config$workers$cleanup()) {
on.exit()
}
}

cmq_main_iter <- function(config) {
msg <- config$workers$receive_data()
cmq_conclude_build(msg = msg, config = config)
if (!identical(msg$token, "set_common_data_token")) {
config$logger$disk("sending common data")
config$workers$send_common_data()
} else if (!config$queue$empty()) {
build <- config$workers$recv()
cmq_conclude_build(build = build, config = config)
if (!config$queue$empty()) {
cmq_next_target(config)
} else {
config$workers$send_shutdown_worker()
config$workers$send_shutdown()
}
}

cmq_conclude_build <- function(msg, config) {
build <- msg$result
cmq_conclude_build <- function(build, config) {
if (is.null(build)) {
return()
}
Expand Down Expand Up @@ -158,22 +143,20 @@ cmq_send_target <- function(target, config) {
spec <- hpc_spec(target, config)
config_tmp <- get_hpc_config_tmp(config)
config$logger$disk("build on an hpc worker", target = target)
config$workers$send_call(
expr = drake::cmq_build(
config$workers$send(
cmd = drake::cmq_build(
target = target,
meta = meta,
deps = deps,
spec = spec,
config_tmp = config_tmp,
config = config
),
env = list(
target = target,
meta = meta,
deps = deps,
spec = spec,
config_tmp = config_tmp
)
target = target,
meta = meta,
deps = deps,
spec = spec,
config_tmp = config_tmp
)
}

Expand Down
24 changes: 12 additions & 12 deletions R/backend_future.R
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ ft_launch_worker <- function(target, meta, protect, config) {
}

future_globals <- function(
target,
meta,
config,
spec,
config_tmp,
protect
target,
meta,
config,
spec,
config_tmp,
protect
) {
globals <- list(
DRAKE_GLOBALS__ = list(
Expand Down Expand Up @@ -161,12 +161,12 @@ future_globals <- function(
#' @param protect Names of targets that still need their
#' dependencies available in memory.
future_build <- function(
target,
meta,
config,
spec,
config_tmp,
protect
target,
meta,
config,
spec,
config_tmp,
protect
) {
config$spec <- spec
config <- restore_hpc_config_tmp(config_tmp, config)
Expand Down

0 comments on commit c2af92a

Please sign in to comment.