dag.dag
cloud.dag.dag
Classes
Name | Description |
---|---|
DAG | Low-level API for creating and managing directed acyclic graphs. |
Node | Representation of a function to run in a DAG. |
DAG
cloud.dag.dag.DAG(
    self,
    max_workers=None,
    use_processes=False,
    done_callback=None,
    update_callback=None,
    namespace=None,
    name=None,
    mode=Mode.REALTIME,
    retry_strategy=None,
    workflow_retry_strategy=None,
    deadline=None,
)
Low-level API for creating and managing directed acyclic graphs as TileDB Cloud Task Graphs.
Parameters
Name | Type | Description | Default |
---|---|---|---|
max_workers | Optional[int] | Number of workers to allocate to execute DAG. | None |
use_processes | bool | If True, uses processes instead of threads. | False |
done_callback | Optional[Callable[[DAG], None]] | Optional callback function to register for when the DAG is completed. The function will be passed a reference to this DAG. | None |
update_callback | Optional[Callable[[DAG], None]] | Optional callback function to register for when the DAG status is updated. The function will be passed a reference to this DAG. | None |
namespace | Optional[str] | Namespace to execute DAG in. | None |
name | Optional[str] | Human-readable name for the DAG, shown in Task Graph logs. | None |
mode | Mode | Mode the DAG is to run in, valid options are: Mode.REALTIME, Mode.BATCH. | Mode.REALTIME |
retry_strategy | Optional[models.RetryStrategy] | K8S retry policy to be applied to each Node. | None |
workflow_retry_strategy | Optional[models.RetryStrategy] | K8S retry policy to be applied to DAG. | None |
deadline | Optional[int] | Duration (seconds) the DAG is allowed to execute before it times out. | None |
Attributes
Name | Description |
---|---|
cancelled_nodes | Cancelled Nodes. |
completed_nodes | Completed Nodes. |
deadline | Duration (seconds) the DAG is allowed to execute before it times out. |
failed_nodes | Failed Nodes. |
id | UUID for DAG instance. |
max_workers | Number of workers to allocate to execute the DAG. |
mode | Mode the DAG is to run in. |
name | Human-readable name for the DAG, shown in Task Graph logs. |
namespace | Namespace to execute DAG in. |
nodes | Mapping of Node UUIDs to Node instances. |
nodes_by_name | Mapping of Node names to Node instances. |
not_started_nodes | Queued Nodes. |
retry_strategy | K8S retry policy to be applied to each Node. |
running_nodes | Running Nodes. |
server_graph_uuid | The server-generated UUID of this graph, used for logging. |
status | Get DAG status. |
use_processes | Flag. If True, uses processes instead of threads. |
visualization | The following executors are initialized by calling |
workflow_retry_strategy | K8S retry policy to be applied to DAG. |
Methods
Name | Description |
---|---|
add_done_callback | Add a callback for when the DAG is completed. |
add_node | Create and add a node. |
add_node_obj | Add node to DAG. |
add_update_callback | Add a callback for when the DAG status is updated. |
cancel | Cancel DAG. |
compute | Start the DAG by executing root nodes. |
end_nodes | Find all end nodes. |
end_results | Get all end results, keyed by node ID. Blocks until all results are ready. |
end_results_by_name | Get all end results, keyed by node name. Blocks until all results are ready. |
find_end_nodes | Find all end nodes. |
get_tiledb_plot_node_details | Build the list of details needed for the TileDB node graph. |
initial_setup | Performs one-time server-side setup tasks. |
report_node_complete | Report a node as complete. |
retry_all | Retries all failed and cancelled nodes. |
stats | Get DAG node statistics. |
submit_array_udf | Submit a function that will be executed in the cloud serverlessly. |
submit_local | Submit a function that will run locally. |
submit_sql | Submit a SQL query to run serverlessly in the cloud. |
submit_udf | Submit a function that will be executed in the cloud serverlessly. |
submit_udf_stage | Submit a function that will be executed in the cloud serverlessly. |
visualize | Build and render a tree diagram of the DAG. |
wait | Wait for DAG to be completed. |
add_done_callback
cloud.dag.dag.DAG.add_done_callback(func)
Add a callback for when the DAG is completed.
Parameters
Name | Type | Description | Default |
---|---|---|---|
func | | Function to call when the DAG is completed. The function will be passed a reference to this DAG. | required |
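The callback contract can be modeled with a toy class. This is a hypothetical sketch: ToyDAG and its _finish hook are illustrative names, not the library's internals, and it assumes callbacks run once, in registration order, with the DAG as their only argument.

```python
from typing import Callable, List


class ToyDAG:
    """Hypothetical stand-in for DAG that models only done-callback handling."""

    def __init__(self) -> None:
        self.status = "RUNNING"
        self._done_callbacks: List[Callable[["ToyDAG"], None]] = []

    def add_done_callback(self, func: Callable[["ToyDAG"], None]) -> None:
        # Store the callback; it will later receive a reference to this DAG.
        self._done_callbacks.append(func)

    def _finish(self, status: str) -> None:
        # Hypothetical completion hook: record the final status, then
        # notify each registered callback in registration order.
        self.status = status
        for func in self._done_callbacks:
            func(self)


seen = []
dag = ToyDAG()
dag.add_done_callback(lambda d: seen.append(d.status))
dag._finish("COMPLETED")
print(seen)  # ['COMPLETED']
```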
add_node
cloud.dag.dag.DAG.add_node(
    func_exec,
    *args,
    name=None,
    local_mode=True,
    **kwargs,
)
Create and add a node.
DEPRECATED. Use submit_local instead.
Parameters
Name | Type | Description | Default |
---|---|---|---|
func_exec | | Function to execute. | required |
*args | | Arguments for function execution. | () |
name | | Name of the node. | None |
Returns
Name | Type | Description |
---|---|---|
Node | Node that is created. |
add_node_obj
cloud.dag.dag.DAG.add_node_obj(node)
Add node to DAG.
Parameters
Name | Type | Description | Default |
---|---|---|---|
node | Node | Node to add to the DAG. | required |
Returns
Name | Type | Description |
---|---|---|
Node | Node instance. |
add_update_callback
cloud.dag.dag.DAG.add_update_callback(func)
Add a callback for when the DAG status is updated.
Parameters
Name | Type | Description | Default |
---|---|---|---|
func | | Function to call when the DAG status is updated. The function will be passed a reference to this DAG. | required |
cancel
cloud.dag.dag.DAG.cancel()
Cancel DAG.
compute
cloud.dag.dag.DAG.compute()
Start the DAG by executing root nodes.
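Root-first execution can be illustrated with a local toy scheduler. This is a sketch under assumptions, not how TileDB Cloud schedules work: the graph format and toy_compute are hypothetical, parent results are passed positionally, and nodes run in "waves" (all currently ready nodes at once) for brevity, whereas a real scheduler can start each node as soon as its own parents finish.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical graph description: node name -> (function, names of parent nodes).
Graph = Dict[str, Tuple[Callable[..., Any], List[str]]]


def toy_compute(graph: Graph, max_workers: int = 2) -> Dict[str, Any]:
    """Run root nodes first, then every node whose parents have all finished."""
    results: Dict[str, Any] = {}
    pending = dict(graph)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending:
            # Ready means every parent already has a result (roots have no parents).
            ready = [n for n, (_, parents) in pending.items()
                     if all(p in results for p in parents)]
            if not ready:
                raise ValueError("cycle or missing parent in graph")
            # Submit the whole ready wave in parallel, passing parent results as args.
            futures = {
                n: pool.submit(pending[n][0], *[results[p] for p in pending[n][1]])
                for n in ready
            }
            for n, fut in futures.items():
                results[n] = fut.result()
                del pending[n]
    return results


graph: Graph = {
    "a": (lambda: 2, []),
    "b": (lambda: 3, []),
    "sum": (lambda x, y: x + y, ["a", "b"]),
}
print(toy_compute(graph))  # {'a': 2, 'b': 3, 'sum': 5}
```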
end_nodes
cloud.dag.dag.DAG.end_nodes()
Find all end nodes.
dag = DAG()
dag.submit_local(len, [1, 2, 3])
end_nodes = dag.end_nodes()
Returns
Name | Type | Description |
---|---|---|
List[Node] | List of end nodes. |
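Assuming, as the Node attribute children suggests, that an end node is one no other node depends on, the selection reduces to a filter over a child mapping. The function and graph below are hypothetical illustrations on plain dictionaries, not the library's implementation.

```python
from typing import Dict, List, Set


def toy_find_end_nodes(children: Dict[str, Set[str]]) -> List[str]:
    """Return the nodes that have no children (nothing depends on them)."""
    return [node for node, kids in children.items() if not kids]


# Hypothetical graph: load -> clean -> (stats, plot)
children = {
    "load": {"clean"},
    "clean": {"stats", "plot"},
    "stats": set(),
    "plot": set(),
}
print(toy_find_end_nodes(children))  # ['stats', 'plot']
```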
end_results
cloud.dag.dag.DAG.end_results()
Get all end results, keyed by node ID. Blocks until all results are ready.
dag = DAG()
dag.submit_local(len, [1, 2, 3])
dag.compute()
end_results = dag.end_results()
Returns
Name | Type | Description |
---|---|---|
Map of results by node ID. |
end_results_by_name
cloud.dag.dag.DAG.end_results_by_name()
Get all end results, keyed by node name. Blocks until all results are ready.
dag = DAG()
dag.submit_local(len, [1, 2, 3])
dag.compute()
end_results = dag.end_results_by_name()
Returns
Name | Type | Description |
---|---|---|
Map of results by node name. |
find_end_nodes
cloud.dag.dag.DAG.find_end_nodes()
Find all end nodes.
Returns
Name | Type | Description |
---|---|---|
List[Node] | list of end nodes |
get_tiledb_plot_node_details
cloud.dag.dag.DAG.get_tiledb_plot_node_details()
Build the list of details needed for the TileDB node graph.
Returns
Name | Type | Description |
---|---|---|
Dict[str, Dict[str, str]] | Node summary |
initial_setup
cloud.dag.dag.DAG.initial_setup()
Performs one-time server-side setup tasks.
Can safely be called multiple times.
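One common way to make a setup step safe to call repeatedly is a lock plus a "done" flag. The sketch below assumes that pattern for illustration only; ToySetup and setup_calls are hypothetical, and the library may achieve idempotence differently.

```python
import threading


class ToySetup:
    """Hypothetical model of an idempotent, one-time setup step."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._done = False
        self.setup_calls = 0  # counts how often the expensive work actually ran

    def initial_setup(self) -> None:
        # The lock makes concurrent first calls safe; the flag turns
        # every call after the first into a no-op.
        with self._lock:
            if self._done:
                return
            self.setup_calls += 1  # stand-in for the real server-side work
            self._done = True


s = ToySetup()
for _ in range(3):
    s.initial_setup()
print(s.setup_calls)  # 1
```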
report_node_complete
cloud.dag.dag.DAG.report_node_complete(node)
Report a node as complete.
Parameters
Name | Type | Description | Default |
---|---|---|---|
node | Node | Node to mark as complete. | required |
retry_all
cloud.dag.dag.DAG.retry_all()
Retries all failed and cancelled nodes.
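The selection step of retry_all (failed and cancelled nodes only) can be sketched as a filter over node statuses. The Status enum and nodes_to_retry below are hypothetical stand-ins, not the library's status type.

```python
from enum import Enum
from typing import Dict, List


class Status(Enum):
    # Hypothetical subset of node states, for illustration only.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    NOT_STARTED = "not_started"


def nodes_to_retry(statuses: Dict[str, Status]) -> List[str]:
    """Select the nodes retry_all targets: the failed and cancelled ones."""
    retryable = {Status.FAILED, Status.CANCELLED}
    return [name for name, st in statuses.items() if st in retryable]


statuses = {"a": Status.COMPLETED, "b": Status.FAILED, "c": Status.CANCELLED}
print(nodes_to_retry(statuses))  # ['b', 'c']
```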
stats
cloud.dag.dag.DAG.stats()
Get DAG node statistics.
Returns
Name | Type | Description |
---|---|---|
Dict[str, Union[int, float]] | All node stats. |
submit_array_udf
cloud.dag.dag.DAG.submit_array_udf(func, *args, **kwargs)
Submit a function that will be executed in the cloud serverlessly.
Parameters
Name | Type | Description | Default |
---|---|---|---|
func | Callable | Function to execute in UDF task. | required |
*args | Any | Positional arguments to pass into Node instantiation. | () |
**kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
Name | Type | Description |
---|---|---|
Node | Node that is created. |
submit_local
cloud.dag.dag.DAG.submit_local(func, *args, **kwargs)
Submit a function that will run locally.
Parameters
Name | Type | Description | Default |
---|---|---|---|
func | Callable | Function to execute in UDF task. | required |
*args | Any | Positional arguments to pass into Node instantiation. | () |
**kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
Name | Type | Description |
---|---|---|
Node | Node that is created. |
submit_sql
cloud.dag.dag.DAG.submit_sql(*args, **kwargs)
Submit a SQL query to run serverlessly in the cloud.
Parameters
Name | Type | Description | Default |
---|---|---|---|
sql | | Query to execute. | required |
*args | Any | Positional arguments to pass into Node instantiation. | () |
**kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
Name | Type | Description |
---|---|---|
Node | Node that is created |
submit_udf
cloud.dag.dag.DAG.submit_udf(func, *args, **kwargs)
Submit a function that will be executed in the cloud serverlessly.
Parameters
Name | Type | Description | Default |
---|---|---|---|
func | Callable | Function to execute in UDF task. | required |
*args | Any | Positional arguments to pass into Node instantiation. | () |
**kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
Name | Type | Description |
---|---|---|
Node | Node that is created. |
submit_udf_stage
cloud.dag.dag.DAG.submit_udf_stage(
    func,
    *args,
    expand_node_output=None,
    **kwargs,
)
Submit a function that will be executed in the cloud serverlessly.
Expanding on node output means dynamically allocating work to this UDF stage based on the output of the node given in the expand_node_output arg.
For example, suppose a node NodeA (NodeA = DAG.submit(...)) returns a list of str values. If NodeA is passed to expand_node_output, and NodeA is also passed to an argument of func (the function given to submit_udf_stage) that accepts a str, then one node is spawned in parallel for each str value in the result of NodeA.
graph = DAG(...)
NodeA = graph.submit(...)
NodeB = graph.submit_udf_stage(..., expand_node_output=NodeA, str_arg=NodeA)
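The fan-out behavior described above can be modeled locally. In this hypothetical sketch, the upstream node's output is a JSON-encoded list (as the expand_node_output parameter requires) and func is called once per element, with threads standing in for the parallel cloud nodes; expand_stage is an illustrative name, not part of the library.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, TypeVar

T = TypeVar("T")


def expand_stage(upstream_output: str, func: Callable[[str], T]) -> List[T]:
    """Run func once per element of a JSON-encoded list, in parallel."""
    items = json.loads(upstream_output)  # expected: a JSON list, e.g. '["x", "y"]'
    if not isinstance(items, list):
        raise TypeError("expand_node_output must produce a JSON-encoded list")
    with ThreadPoolExecutor() as pool:
        # pool.map returns results in the same order as the inputs.
        return list(pool.map(func, items))


node_a_output = json.dumps(["alpha", "beta", "gamma"])  # stands in for NodeA's result
print(expand_stage(node_a_output, str.upper))  # ['ALPHA', 'BETA', 'GAMMA']
```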
Parameters
Name | Type | Description | Default |
---|---|---|---|
func | Callable | Function to execute in UDF task. | required |
*args | Any | Positional arguments to pass into Node instantiation. | () |
expand_node_output | Optional[Node] | Node whose output should be expanded. The output of the node should be a JSON-encoded list. | None |
**kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
Name | Type | Description |
---|---|---|
Node | Node that is created. |
visualize
cloud.dag.dag.DAG.visualize(notebook=True, auto_update=True, force_plotly=False)
Build and render a tree diagram of the DAG.
Parameters
Name | Type | Description | Default |
---|---|---|---|
notebook | bool | Whether the visualization is inside a Jupyter notebook. If so, a widget is used. | True |
auto_update | bool | Whether the diagram should be updated automatically with each status change. | True |
force_plotly | bool | Force the use of Plotly graphs instead of the TileDB Plot Widget. | False |
Returns
Name | Type | Description |
---|---|---|
The rendered figure. |
wait
cloud.dag.dag.DAG.wait(timeout=None)
Wait for DAG to be completed.
Parameters
Name | Type | Description | Default |
---|---|---|---|
timeout | Optional[float] | Optional timeout, in seconds, to wait for the DAG to complete. | None |
Returns
Name | Type | Description |
---|---|---|
None | None or raises TimeoutError if timeout occurs |
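The wait(timeout) contract (return None on completion, raise TimeoutError otherwise, block indefinitely when timeout is None) can be modeled with threading.Event. ToyWaitable is a hypothetical illustration of the semantics, not the library's implementation.

```python
import threading
from typing import Optional


class ToyWaitable:
    """Hypothetical model of wait(timeout): block until done or time out."""

    def __init__(self) -> None:
        self._done = threading.Event()

    def mark_done(self) -> None:
        self._done.set()

    def wait(self, timeout: Optional[float] = None) -> None:
        # Event.wait(None) blocks until set; with a number it returns
        # False if the timeout expires before the event is set.
        if not self._done.wait(timeout):
            raise TimeoutError(f"not completed within {timeout} seconds")


w = ToyWaitable()
try:
    w.wait(timeout=0.05)
except TimeoutError as exc:
    print("timed out:", exc)
w.mark_done()
w.wait(timeout=0.05)  # returns None immediately once done
```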
Node
cloud.dag.dag.Node(
    self,
    func,
    *args,
    name=None,
    dag=None,
    mode=Mode.REALTIME,
    expand_node_output=None,
    _download_results=None,
    _internal_prewrapped_func=None,
    _internal_accepts_stored_params=True,
    **kwargs,
)
Representation of a function to run in a DAG.
Parameters
Name | Type | Description | Default |
---|---|---|---|
func | Callable[…, _T] | Function to run as UDF task. | required |
*args | Any | Positional arguments to pass to UDF. | () |
name | Optional[str] | Human-readable name of Node task. | None |
dag | Optional[DAG] | DAG this node is associated with. | None |
mode | Mode | Mode the Node is to run in. | Mode.REALTIME |
expand_node_output | Optional[Node] | Node to expand processes upon. | None |
_download_results | Optional[bool] | An optional boolean to override the default result-downloading behavior. If True, the results of the function are always downloaded immediately upon completion. If False, they are not downloaded immediately, but are downloaded when .result() is called. | None |
_internal_prewrapped_func | Callable[…, results.Result[_T]] | For internal use only. A function that returns something that is already a Result, which does not require wrapping. All prewrapped functions are assumed to make server calls. | None |
_internal_accepts_stored_params | bool | For internal use only. Applies only when _internal_prewrapped_func is used. True if _internal_prewrapped_func can accept stored parameters; False if it cannot, in which case all parameters must be serialized. | True |
**kwargs | Any | Keyword arguments to pass to UDF. | {} |
Attributes
Name | Description |
---|---|
args | Positional args to pass into UDF. |
children | Child Nodes (Nodes dependent on this Node). |
dag | DAG this Node is pinned to. |
error | Return Node error if encountered. |
future | Returns something that pretends to be a Future. |
id | UUID for Node instance. |
kwargs | Keyword args to pass into UDF. |
mode | Processing mode of Node. |
name | The human-readable name of Node. |
parents | Parent Nodes (Nodes this Node is dependent on). |
status | Node status. |
Methods
Name | Description |
---|---|
add_done_callback | Add callback function to execute at Node completion. |
cancel | Cancel Node. |
cancelled | Whether Node is cancelled. |
depends_on | Create a dependency chain for the node; useful when there is a dependency that does not rely directly on passing results from one node to another. |
exception | Return the exception if one was raised. |
result | Fetch Node return. |
retry | Retry Node. |
running | Whether Node is actively running. |
task_id | Gets the server-side Task ID of this node. |
wait | Wait for node to be completed. |
add_done_callback
cloud.dag.dag.Node.add_done_callback(fn)
Add callback function to execute at Node completion.
Parameters
Name | Type | Description | Default |
---|---|---|---|
fn | Callable[[Node[_T]], None] | Callback to execute. | required |
cancel
cloud.dag.dag.Node.cancel()
Cancel Node.
cancelled
cloud.dag.dag.Node.cancelled()
Whether Node is cancelled.
depends_on
cloud.dag.dag.Node.depends_on(node)
Create dependency chain for node, useful when there is a dependency that does not rely directly on passing results from one to another.
Parameters
Name | Type | Description | Default |
---|---|---|---|
node | Node | node to mark as a dependency of this node | required |
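An order-only dependency, where one step must run after another without consuming its result, is what depends_on expresses in a DAG (for example, report_node.depends_on(ingest_node), with hypothetical node names). The sketch below uses the standard library's graphlib (Python 3.9+) purely as a stand-in to show the resulting ordering; the library itself is not assumed to use graphlib.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: "report" must run after "ingest" even though it
# never receives ingest's return value.
deps = TopologicalSorter()
deps.add("ingest")
deps.add("report", "ingest")  # report depends on ingest (ordering only, no data)
order = list(deps.static_order())
print(order)  # ['ingest', 'report']
```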
exception
cloud.dag.dag.Node.exception(timeout=None)
Return the exception if one was raised.
result
cloud.dag.dag.Node.result(timeout=None)
Fetch Node return.
Parameters
Name | Type | Description | Default |
---|---|---|---|
timeout | Optional[float] | Time to wait to fetch result. | None |
Returns
Name | Type | Description |
---|---|---|
_T | Results of Node processing. |
retry
cloud.dag.dag.Node.retry()
Retry Node.
running
cloud.dag.dag.Node.running()
Whether Node is actively running.
task_id
cloud.dag.dag.Node.task_id()
Gets the server-side Task ID of this node.
Returns
Name | Type | Description |
---|---|---|
Optional[uuid.UUID] | None if this has no task ID (as it was run on the client side). |
wait
cloud.dag.dag.Node.wait(timeout=None)
Wait for the node to complete.
Parameters
Name | Type | Description | Default |
---|---|---|---|
timeout | Optional[float] | Optional timeout, in seconds, to wait for the node to complete. | None |
Returns
Name | Type | Description |
---|---|---|
None | None or raises TimeoutError if timeout occurs. |
Functions
Name | Description |
---|---|
list_logs | Retrieves the list of task graph logs you can view. |
replace_stored_params | Descends into data structures and replaces Stored Params with results. |
server_logs | Retrieves the full server-side logs for the given DAG. |
list_logs
cloud.dag.dag.list_logs(
    namespace=None,
    created_by=None,
    search=None,
    start_time=None,
    end_time=None,
    page=1,
    per_page=10,
)
Retrieves the list of task graph logs you can view.
The returned graph logs will be “light” versions, meaning they will not include any details about the execution state of an individual DAG. To retrieve those, pass the ID to server_logs.
Parameters
Name | Type | Description | Default |
---|---|---|---|
namespace | Optional[str] | If present, include logs for only this namespace. If absent, include logs for all namespaces you have access to. | None |
created_by | Optional[str] | Include only logs from this user (if present). | None |
search | Optional[str] | A search string for the name of the task graph. | None |
start_time | Optional[datetime.datetime] | Include logs created after this time. | None |
end_time | Optional[datetime.datetime] | Include logs created before this time. | None |
page | int | The page number to use, starting from 1. | 1 |
per_page | int | The number of items per page. | 10 |
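The page and per_page parameters describe standard 1-based pagination. The sketch below is a hypothetical client-side model of which items a given pair selects, assuming a stable ordering; list_logs itself performs pagination on the server, and paginate is not a library function.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], page: int = 1, per_page: int = 10) -> List[T]:
    """Return one 1-indexed page of items."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be >= 1")
    start = (page - 1) * per_page  # page 1 starts at index 0
    return list(items[start:start + per_page])


logs = [f"graph-{i}" for i in range(25)]  # 25 hypothetical log entries
third = paginate(logs, page=3)
print(len(third), third[0])  # 5 graph-20
```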
replace_stored_params
cloud.dag.dag.replace_stored_params(tree, loader)
Descends into data structures and replaces Stored Params with results.
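A recursive descent of this kind can be sketched as follows. StoredParam, the loader callback shape, and toy_replace_stored_params are hypothetical: the sketch assumes placeholders can appear inside dicts (as values), lists, and tuples, and that loader maps a placeholder to its result.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class StoredParam:
    """Hypothetical placeholder for a result stored server-side."""
    task_id: str


def toy_replace_stored_params(tree: Any, loader: Callable[[StoredParam], Any]) -> Any:
    """Rebuild tree, swapping each StoredParam for loader(param)."""
    if isinstance(tree, StoredParam):
        return loader(tree)
    if isinstance(tree, dict):
        return {k: toy_replace_stored_params(v, loader) for k, v in tree.items()}
    if isinstance(tree, list):
        return [toy_replace_stored_params(v, loader) for v in tree]
    if isinstance(tree, tuple):
        return tuple(toy_replace_stored_params(v, loader) for v in tree)
    return tree  # leaves (numbers, strings, ...) pass through unchanged


store = {"t1": [1, 2, 3], "t2": "ok"}
tree = {"data": StoredParam("t1"), "meta": (StoredParam("t2"), 42)}
print(toy_replace_stored_params(tree, lambda p: store[p.task_id]))
# {'data': [1, 2, 3], 'meta': ('ok', 42)}
```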
server_logs
cloud.dag.dag.server_logs(dag_or_id, namespace=None)
Retrieves the full server-side logs for the given DAG.
The DAG can be provided as a DAG object, or as the server-provided UUID of a DAG’s execution log in either uuid.UUID or string form. This can be used to access both completed and in-progress DAGs. The returned DAGs will include full data.
Will return None if called with a DAG object that has no server-side nodes.
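Accepting a DAG object, a uuid.UUID, or a UUID string usually means normalizing the input first. This hypothetical sketch assumes the DAG's server_graph_uuid attribute is what identifies its log and is None when no server-side nodes exist; FakeDAG and normalize_graph_id are illustrative names, not library API.

```python
import uuid
from typing import Optional, Union


class FakeDAG:
    """Hypothetical stand-in exposing only server_graph_uuid."""

    def __init__(self, server_graph_uuid: Optional[uuid.UUID]) -> None:
        self.server_graph_uuid = server_graph_uuid


def normalize_graph_id(dag_or_id: Union[FakeDAG, uuid.UUID, str]) -> Optional[uuid.UUID]:
    """Accept a DAG-like object, a UUID, or a UUID string; return a UUID or None."""
    if isinstance(dag_or_id, FakeDAG):
        # Assumed: a DAG that never created server-side nodes has no server UUID.
        return dag_or_id.server_graph_uuid
    if isinstance(dag_or_id, uuid.UUID):
        return dag_or_id
    return uuid.UUID(dag_or_id)  # raises ValueError for malformed strings


u = uuid.uuid4()
print(normalize_graph_id(str(u)) == u)  # True
print(normalize_graph_id(FakeDAG(None)))  # None
```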