Delayed functions
The delayed function constructs a “node” object for a task graph. It’s essentially an R function, to be executed later, with a few more items:
- Optionally, a friendly display name in name.
- Optionally, local=TRUE to execute the function on the current host; the default is to execute on TileDB Cloud.
- If the function takes arguments, they can be specified by args=... in delayed, or via a separate call to delayed_args.
The compute function executes the delayed functions.
a <- delayed(function() { 9 }, name='a', local=TRUE)
b <- delayed(function(x) { 10*x }, args=list(a), name='b', local=TRUE)
c <- delayed(function(x) { 100*x }, name='c', local=TRUE)
delayed_args(c) <- list(a)
d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d', local=TRUE)
o <- compute(d, namespace=namespace, verbose=TRUE)
print(o)
with output
[1] 990
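To make the idea of a deferred node concrete, here is a minimal base-R sketch of a diamond-shaped graph in which b and c each take a's result. The helpers make_node and run_node are hypothetical names invented for this illustration; they are not part of the package and say nothing about how delayed and compute are implemented.

```r
# Toy stand-in for a delayed node: a function plus its (possibly deferred) arguments.
# make_node() and run_node() are hypothetical helpers, not tiledbcloud functions.
make_node <- function(func, args = list()) {
  structure(list(func = func, args = args), class = "toy_node")
}

# Evaluate a node by first evaluating any arguments that are themselves nodes.
run_node <- function(node) {
  resolved <- lapply(node$args, function(arg) {
    if (inherits(arg, "toy_node")) run_node(arg) else arg
  })
  do.call(node$func, resolved)
}

node_a <- make_node(function() { 9 })
node_b <- make_node(function(x) { 10 * x }, args = list(node_a))
node_c <- make_node(function(x) { 100 * x }, args = list(node_a))
node_d <- make_node(function(...) { sum(...) }, args = list(node_b, node_c))

# Nothing has run yet; evaluation happens only here.
result <- run_node(node_d)
print(result)
```

This toy version re-evaluates node_a once per consumer and runs everything in the current R session; it only illustrates that building a node and executing it are separate steps.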
Delayed SQL queries
This is a simple convenience wrapper connecting delayed, as above, and TileDB Cloud SQL queries as described in the SQL vignette.
a <- delayed_sql(
  namespace='your-namespace',
  query="select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
  name="rows-query")
o <- compute(a, namespace=namespace, verbose=TRUE)
print(o)
with output
avg_a rows
1 2.5000 1
2 6.5000 2
3 10.5000 3
4 14.5000 4
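As a local cross-check of what the query computes, the same per-row averages can be reproduced in base R. This sketch assumes the quickstart_dense array is a 4x4 grid whose attribute a holds 1 through 16, filled row by row; that assumption is consistent with the averages shown above but is not stated in this document.

```r
# Assumed contents of quickstart_dense: attribute a = 1..16, filled row by row.
a_values <- matrix(1:16, nrow = 4, byrow = TRUE)

# Local equivalent of: SELECT rows, AVG(a) ... GROUP BY rows
avg_by_row <- data.frame(rows = 1:4, avg_a = rowMeans(a_values))
print(avg_by_row)
```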
Delayed generic UDFs
This too is a simple convenience wrapper connecting delayed and generic UDFs as described in the UDFs vignette.
a <- delayed_generic_udf(
  namespace='your-namespace',
  udf=function(vec, exponent) {
    sum(vec ^ exponent)
  },
  name='my-generic'
)
delayed_args(a) <- list(vec=1:10, exponent=3)
print(compute(a, namespace=namespace))
with output
[1] 3025
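Before sending a UDF to the cloud it can help to run the same function locally with the same named arguments. The sketch below uses only base R; do.call with a named list stands in for what delayed_args supplies, and the variable names are chosen for this illustration.

```r
# The same function body as the UDF above, run in the current R session.
udf <- function(vec, exponent) {
  sum(vec ^ exponent)
}

# A named list of arguments, in the same shape as the one passed to delayed_args.
udf_args <- list(vec = 1:10, exponent = 3)

local_result <- do.call(udf, udf_args)
print(local_result)
```

The local value should match the remote result, since the sum of the cubes of 1 through 10 is 3025.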
Delayed array UDFs
This is another simple convenience wrapper connecting delayed and array UDFs as described in the UDFs vignette.
a <- delayed_array_udf(
  namespace='your-namespace',
  array="TileDB-Inc/quickstart_dense",
  udf=function(df) {
    vec <- as.vector(df[["a"]])
    list(min=min(vec), med=median(vec), max=max(vec))
  },
  selectedRanges=list(cbind(1,2), cbind(1,2)),
  attrs=c("a")
)
o <- compute(a, namespace=namespace, verbose=TRUE)
print(o)
with output
$min
[1] 1
$med
[1] 3.5
$max
[1] 6
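The result can be sanity-checked locally by applying the same UDF to a hand-built data frame. This sketch assumes, as the SQL output earlier suggests, that quickstart_dense holds the values 1 through 16 filled row by row, and that the two selected ranges pick rows 1 to 2 and columns 1 to 2; the data frame here is a stand-in for what the UDF would receive, not the actual object TileDB Cloud passes.

```r
# Assumed contents of quickstart_dense: attribute a = 1..16, filled row by row.
a_values <- matrix(1:16, nrow = 4, byrow = TRUE)

# Rows 1..2 and columns 1..2, mirroring selectedRanges=list(cbind(1,2), cbind(1,2)).
df <- data.frame(a = as.vector(a_values[1:2, 1:2]))

# Same UDF body as above.
udf <- function(df) {
  vec <- as.vector(df[["a"]])
  list(min = min(vec), med = median(vec), max = max(vec))
}

stats <- udf(df)
print(stats)
```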
Composing tasks into task graphs
Nodes, be they from delayed, delayed_sql, delayed_array_udf, or delayed_multi_array_udf, can be connected together into directed acyclic graphs. (If you construct a graph with a cyclic dependency, you’ll get an error message promptly.)
Here’s an example:
# Build several delayed objects to define a graph.
# Locally executed; simple enough.
local = delayed(function(x) { x*2 }, local=TRUE)
delayed_args(local) <- list(100)
# Array UDF -- we specify selected ranges and attributes, then do some R on the
# dataframe which the UDF receives.
array_apply <- delayed_array_udf(
  namespace=namespace, # namespace to charge
  array="TileDB-Inc/quickstart_dense",
  udf=function(df) { sum(as.vector(df[["a"]])) },
  selectedRanges=list(cbind(1,4), cbind(1,4)),
  attrs=c("a")
)
# SQL -- note the output is a dataframe, and values are all strings (MariaDB
# "decimal values") so we'll cast them to numeric later.
sql = delayed_sql(
  namespace=namespace,
  "select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
  name="sql"
)
# Custom function for averaging all the results we are passing in
ourmean <- function(local, array_apply, sql) {
  mean(c(local, array_apply, sql))
}
# This is essentially a task graph that looks like
#               ourmean
#             /    |    \
#          /       |       \
#      local  array_apply  sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `ourmean` will be computed on their results.
# Note here we slot out the answer from the SQL dataframe using `[[...]]`,
# and also cast to numeric.
res <- delayed(ourmean, args=list(local, array_apply, as.numeric(sql[["a"]])))
print(compute(res, namespace=namespace, verbose=TRUE))
with output
[1] 168
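The remark above about cyclic dependencies can be illustrated with a small depth-first search over a dependency list. This is a sketch in base R only: has_cycle is a hypothetical helper written for this example, and it is not how the package itself detects cycles.

```r
# has_cycle() is a hypothetical helper, not part of tiledbcloud.
# deps maps each node name to the names of the nodes it depends on.
has_cycle <- function(deps) {
  state <- setNames(rep("unvisited", length(deps)), names(deps))

  visit <- function(node) {
    if (state[[node]] == "in_progress") return(TRUE)  # reached a node we are still exploring
    if (state[[node]] == "done") return(FALSE)        # already fully explored
    state[[node]] <<- "in_progress"
    for (dep in deps[[node]]) {
      if (visit(dep)) return(TRUE)
    }
    state[[node]] <<- "done"
    FALSE
  }

  for (node in names(deps)) {
    if (visit(node)) return(TRUE)
  }
  FALSE
}

# The graph from the example above: ourmean depends on three independent nodes.
graph <- list(
  local = character(0),
  array_apply = character(0),
  sql = character(0),
  ourmean = c("local", "array_apply", "sql")
)

# A deliberately broken graph: x -> y -> z -> x.
cyclic <- list(x = "y", y = "z", z = "x")

graph_has_cycle <- has_cycle(graph)
cyclic_has_cycle <- has_cycle(cyclic)
print(graph_has_cycle)
print(cyclic_has_cycle)
```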
Examining task graphs
In the ideal case, we connect together some computations and they run correctly the first time. In other cases, we need to inspect a bit. The TileDB-Cloud-R package offers a few different ways to do this. Let’s revisit the simple “diamond” example from above, a few different ways.
Run as-is
namespace <- 'your-namespace'
a <- delayed(function() { 9 })
b <- delayed(function(x) { 10*x }, args=list(a))
c <- delayed(function(x) { 100*x }, args=list(a))
d <- delayed(function(...) { sum(...)}, args=list(b,c))
o <- compute(d, namespace=namespace)
print(o)
This is as above, except now we execute remotely.
This outputs
[1] 990
Run with verbosity
For more detail we can add display names to each node (the default names look like n000005, which is less intuitive), and also use the verbose option to compute. In an R CLI session, this will live-update as the DAG runs; in a notebook, you’ll see output when the DAG completes.
namespace <- 'your-namespace'
a <- delayed(function() { 9 }, name='a')
b <- delayed(function(x) { 10*x }, args=list(a), name='b')
c <- delayed(function(x) { 100*x }, args=list(a), name='c')
d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d')
o <- compute(d, namespace=namespace, verbose=TRUE)
print(o)
with output like
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=NOT_STARTED
b args_ready=FALSE status=NOT_STARTED
c args_ready=FALSE status=NOT_STARTED
d args_ready=FALSE status=NOT_STARTED
1644869786 2022-02-14 15:16:26 START a
1644869786 2022-02-14 15:16:26 launch remote compute a
1644869789 2022-02-14 15:16:29 finish remote compute a
1644869789 2022-02-14 15:16:29 END a
1644869789 2022-02-14 15:16:29 START c
1644869789 2022-02-14 15:16:29 START b
1644869789 2022-02-14 15:16:29 launch remote compute c
1644869791 2022-02-14 15:16:31 finish remote compute c
1644869791 2022-02-14 15:16:31 END c
1644869789 2022-02-14 15:16:29 launch remote compute b
1644869791 2022-02-14 15:16:31 finish remote compute b
1644869791 2022-02-14 15:16:31 END b
1644869791 2022-02-14 15:16:31 START d
1644869791 2022-02-14 15:16:31 launch remote compute d
1644869794 2022-02-14 15:16:34 finish remote compute d
1644869794 2022-02-14 15:16:34 END d
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=COMPLETED
c args_ready=TRUE status=COMPLETED
d args_ready=TRUE status=COMPLETED
[1] 990
Here we have a print of the DAG; then launch/finish of nodes in dependency order; then another print of the DAG; then finally the output as before.
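The "Initial nodes", "Terminal node", and dependency-order parts of that output can be reproduced from the dependency list alone. The following base-R sketch derives them for the diamond graph; it is an illustration of the idea under that assumption, not the scheduler the package uses, and it does not model concurrency.

```r
# Dependencies of the diamond graph, as printed in the verbose output.
deps <- list(a = character(0), b = "a", c = "a", d = c("b", "c"))

# Initial nodes have no dependencies; terminal nodes are nobody's dependency.
initial_nodes <- names(deps)[lengths(deps) == 0]
terminal_nodes <- setdiff(names(deps), unlist(deps))

# Repeatedly pick the nodes whose dependencies have all completed.
completed <- character(0)
while (length(completed) < length(deps)) {
  pending <- setdiff(names(deps), completed)
  is_ready <- vapply(pending, function(n) all(deps[[n]] %in% completed), logical(1))
  ready <- pending[is_ready]
  if (length(ready) == 0) stop("cyclic dependency")
  completed <- c(completed, ready)
}

print(initial_nodes)
print(terminal_nodes)
print(completed)
```

Nodes that become ready in the same round (here b and c) have no ordering constraint between them, which is why their START lines may appear in either order in the logs above.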
Compute locally
Here the nodes run locally. Print statements within the nodes go to the terminal.
a <- delayed(function() { cat("NODE A\n"); 9 }, name='a')
b <- delayed(function(x) { cat("NODE B\n"); 10*x }, args=list(a), name='b')
c <- delayed(function(x) { cat("NODE C\n"); 100*x }, args=list(a), name='c')
d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d')
o <- compute(d, namespace=namespace, verbose=TRUE, force_all_local=TRUE)
print(o)
with output
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=NOT_STARTED
b args_ready=FALSE status=NOT_STARTED
c args_ready=FALSE status=NOT_STARTED
d args_ready=FALSE status=NOT_STARTED
1644869845 2022-02-14 15:17:25 START a
1644869845 2022-02-14 15:17:25 launch local compute a
NODE A
1644869845 2022-02-14 15:17:25 finish local compute a
1644869845 2022-02-14 15:17:25 END a
1644869845 2022-02-14 15:17:25 START b
1644869845 2022-02-14 15:17:25 START c
1644869845 2022-02-14 15:17:25 launch local compute b
NODE B
1644869845 2022-02-14 15:17:25 finish local compute b
1644869845 2022-02-14 15:17:25 END b
1644869845 2022-02-14 15:17:25 launch local compute c
NODE C
1644869845 2022-02-14 15:17:25 finish local compute c
1644869845 2022-02-14 15:17:25 END c
1644869845 2022-02-14 15:17:25 START d
1644869845 2022-02-14 15:17:25 launch local compute d
NODE D
1644869845 2022-02-14 15:17:25 finish local compute d
1644869845 2022-02-14 15:17:25 END d
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=COMPLETED
c args_ready=TRUE status=COMPLETED
d args_ready=TRUE status=COMPLETED
> print(o)
[1] 990
Compute locally and sequentially
Here the nodes run not only locally, but with no concurrency, and all prints go directly to the terminal where you can see them right away:
a <- delayed(function() { cat("NODE A\n"); 9 }, name='a')
b <- delayed(function(x) { cat("NODE B\n"); 10*x }, args=list(a), name='b')
c <- delayed(function(x) { cat("NODE C\n"); 100*x }, args=list(a), name='c')
d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d')
o <- d$compute_sequentially()
print(o)
with output
NODE A
NODE B
NODE C
NODE D
[1] 990
Continue from error
In this (admittedly artificial) example we show partial progress through a task graph, then a retry and continue.
> a <- delayed(function() { 9 }, name='a')
> b <- delayed(function(x) { stop("the train!") }, args=list(a), name='b') # Intentional error
> c <- delayed(function(x) { 100*x }, args=list(a), name='c')
> d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d')
> o <- d$compute(namespace=namespace)
Error in arg$poll(namespace = namespace, verbose = verbose, force_local = force_local) :
node failed: b: tiledbcloud: received error response: Server returned 500 response status code. Message: Error message: received an error from the container: docker container exited with non-zero code: 1
Docker logs:
Arguments file name: /dev/shm/tiledb_da86fa44-c1d7-4831-828d-0790844da970/args
Error in (function (x) : the train!
Calls: main -> compute_result -> do.call -> <Anonymous>
Execution halted
> show(d)
node=d,nargs=2Error in self$args_ready() : dependency has failed
> show(d$dag_for_terminal)
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=FAILED
c args_ready=TRUE status=RUNNING
d Error in self$args_ready() : dependency has failed
Fix the artificial error:
> b$func <- function(x) { 10*x }
> o <- d$compute(namespace=namespace)
> print(o)
[1] 990
> show(d$dag_for_terminal)
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=COMPLETED
c args_ready=TRUE status=COMPLETED
d args_ready=TRUE status=COMPLETED
>
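The retry-and-continue behavior can be pictured with a toy model in which each node remembers its status and result, so a second run skips work that already completed. The helpers new_node and run below are hypothetical and written in base R for this illustration; statuses and ordering in the real package may differ (for example, this sketch never starts c before b fails).

```r
# Toy nodes stored in environments so status and result persist between runs.
# new_node() and run() are hypothetical helpers, not the tiledbcloud API.
new_node <- function(name, func, args = list()) {
  node <- new.env()
  node$name <- name
  node$func <- func
  node$args <- args
  node$status <- "NOT_STARTED"
  node
}

run <- function(node) {
  if (node$status == "COMPLETED") return(node$result)  # reuse earlier work
  resolved <- lapply(node$args, run)                    # dependencies first
  node$status <- "RUNNING"
  node$result <- tryCatch(
    do.call(node$func, resolved),
    error = function(e) {
      node$status <- "FAILED"
      stop("node failed: ", node$name, ": ", conditionMessage(e), call. = FALSE)
    }
  )
  node$status <- "COMPLETED"
  node$result
}

# Count how often node a actually executes.
counter <- new.env()
counter$a_runs <- 0

n_a <- new_node("a", function() { counter$a_runs <- counter$a_runs + 1; 9 })
n_b <- new_node("b", function(x) { stop("the train!") }, args = list(n_a))  # intentional error
n_c <- new_node("c", function(x) { 100 * x }, args = list(n_a))
n_d <- new_node("d", function(...) { sum(...) }, args = list(n_b, n_c))

first_try <- tryCatch(run(n_d), error = function(e) conditionMessage(e))
print(first_try)
status_after_failure <- n_b$status

# Fix the artificial error and run again; node a is not recomputed.
n_b$func <- function(x) { 10 * x }
result <- run(n_d)
print(result)
print(counter$a_runs)
```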