dag.dag
cloud.dag.dag
Directed acyclic graphs as TileDB task graphs.
Classes
| Name | Description |
|---|---|
| DAG | Low-level API for creating and managing directed acyclic graphs |
| Node | Representation of a function to run in a DAG. |
DAG
cloud.dag.dag.DAG(
max_workers=None,
use_processes=False,
done_callback=None,
update_callback=None,
namespace=None,
name=None,
mode=Mode.REALTIME,
retry_strategy=None,
workflow_retry_strategy=None,
deadline=None,
)
Low-level API for creating and managing directed acyclic graphs as TileDB Cloud Task Graphs.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| max_workers | Optional[int] | Number of workers to allocate to execute DAG. | None |
| use_processes | bool | If true will use processes instead of threads. | False |
| done_callback | Optional[Callable[[DAG], None]] | Optional call back function to register for when dag is completed. Function will be passed reference to this DAG. | None |
| update_callback | Optional[Callable[[DAG], None]] | Optional call back function to register for when dag status is updated. Function will be passed reference to this DAG. | None |
| namespace | Optional[str] | Namespace to execute DAG in. | None |
| name | Optional[str] | Human-readable name for DAG to be shown in Task Graph logs. | None |
| mode | Mode | Mode the DAG is to run in, valid options are: Mode.REALTIME, Mode.BATCH. | Mode.REALTIME |
| retry_strategy | Optional[models.RetryStrategy] | K8S retry policy to be applied to each Node. | None |
| workflow_retry_strategy | Optional[models.RetryStrategy] | K8S retry policy to be applied to DAG. | None |
| deadline | Optional[int] | Duration (sec) DAG allowed to execute before timeout. | None |
Attributes
| Name | Description |
|---|---|
| cancelled_nodes | Cancelled Nodes. |
| completed_nodes | Completed Nodes. |
| deadline | Duration (sec) DAG allowed to execute before timeout. |
| failed_nodes | Failed Nodes. |
| id | UUID for DAG instance. |
| max_workers | Number of workers to allocate to execute DAG. |
| mode | Mode the DAG is to run in. |
| name | Human-readable name for DAG to be shown in Task Graph logs. |
| namespace | Namespace to execute DAG in. |
| nodes | Mapping of Node UUIDs to Node instances. |
| nodes_by_name | Mapping of Node names to Node instances. |
| not_started_nodes | Queued Nodes. |
| retry_strategy | K8S retry policy to be applied to each Node. |
| running_nodes | Running Nodes. |
| server_graph_uuid | The server-generated UUID of this graph, used for logging. |
| status | Get DAG status. |
| use_processes | Flag. If true will use processes instead of threads. |
| visualization | The following executors are initialized by calling |
| workflow_retry_strategy | K8S retry policy to be applied to DAG. |
Methods
| Name | Description |
|---|---|
| add_done_callback | Add a callback for when DAG is completed |
| add_node | Create and add a node. |
| add_node_obj | Add node to DAG. |
| add_update_callback | Add a callback for when DAG status is updated |
| cancel | Cancel DAG. |
| compute | Start the DAG by executing root nodes. |
| end_nodes | Find all end nodes |
| end_results | Get all end results, will block if all results are not ready |
| end_results_by_name | Get all end results, will block if all results are not ready |
| find_end_nodes | Find all end nodes. |
| get_tiledb_plot_node_details | Build list of details needed for tiledb node graph |
| initial_setup | Performs one-time server-side setup tasks. |
| register | Register DAG to TileDB. |
| report_node_complete | Report a node as complete. |
| retry_all | Retries all failed and cancelled nodes. |
| stats | Get DAG node statistics. |
| submit_array_udf | Submit a function that will be executed in the cloud serverlessly. |
| submit_local | Submit a function that will run locally. |
| submit_sql | Submit a sql query to run serverlessly in the cloud. |
| submit_udf | Submit a function that will be executed in the cloud serverlessly. |
| submit_udf_stage | Submit a function that will be executed in the cloud serverlessly. |
| visualize | Build and render a tree diagram of the DAG. |
| wait | Wait for DAG to be completed. |
add_done_callback
cloud.dag.dag.DAG.add_done_callback(func)
Add a callback for when DAG is completed.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func | | Function to call when DAG is completed. The function will be passed a reference to this DAG. | required |
add_node
cloud.dag.dag.DAG.add_node(
func_exec,
*args,
name=None,
local_mode=True,
**kwargs,
)
Create and add a node.
DEPRECATED. Use submit_local instead.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func_exec | | Function to execute. | required |
| args | | Arguments for function execution. | () |
| name | | Name for the node. | None |
Returns
| Name | Type | Description |
|---|---|---|
| | Node | Node that is created. |
add_node_obj
cloud.dag.dag.DAG.add_node_obj(node)
Add node to DAG.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| node | Node | Node to add to the DAG. | required |
Returns
| Name | Type | Description |
|---|---|---|
| | Node | Node instance. |
add_update_callback
cloud.dag.dag.DAG.add_update_callback(func)
Add a callback for when DAG status is updated.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func | | Function to call when DAG status is updated. The function will be passed a reference to this DAG. | required |
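The two callback hooks can be pictured with a small stand-in class. This is a sketch in plain Python, not the TileDB implementation: `TinyGraph`, `set_status`, and the status strings are hypothetical names chosen for illustration. It shows only the documented contract, namely that each callback receives a reference to the graph object.

```python
from typing import Callable, List


class TinyGraph:
    """Hypothetical stand-in for a DAG that only tracks a status string."""

    def __init__(self) -> None:
        self.status = "NOT_STARTED"
        self._update_callbacks: List[Callable[["TinyGraph"], None]] = []
        self._done_callbacks: List[Callable[["TinyGraph"], None]] = []

    def add_update_callback(self, func: Callable[["TinyGraph"], None]) -> None:
        self._update_callbacks.append(func)

    def add_done_callback(self, func: Callable[["TinyGraph"], None]) -> None:
        self._done_callbacks.append(func)

    def set_status(self, status: str) -> None:
        # Every status change notifies the update callbacks.
        self.status = status
        for func in self._update_callbacks:
            func(self)
        # Only the final state notifies the done callbacks (an assumption
        # of this sketch).
        if status == "COMPLETED":
            for func in self._done_callbacks:
                func(self)


seen_updates = []
seen_done = []
graph = TinyGraph()
graph.add_update_callback(lambda g: seen_updates.append(g.status))
graph.add_done_callback(lambda g: seen_done.append(g.status))
graph.set_status("RUNNING")
graph.set_status("COMPLETED")
```

Because the callback receives the graph itself, it can read any attribute it needs at the moment it fires instead of relying on captured state.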
cancel
cloud.dag.dag.DAG.cancel()
Cancel DAG.
compute
cloud.dag.dag.DAG.compute()
Start the DAG by executing root nodes.
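What "executing root nodes" implies can be sketched in plain Python: nodes without parents run first, and a child becomes runnable once all of its parents have produced a result. The function `run_graph` and its input format are hypothetical and run sequentially; the real DAG schedules work on executors or in the cloud.

```python
from typing import Callable, Dict, List, Tuple


def run_graph(
    tasks: Dict[str, Tuple[Callable[..., object], List[str]]]
) -> Dict[str, object]:
    """Run tasks in dependency order; each task receives its parents' results."""
    results: Dict[str, object] = {}
    pending = dict(tasks)
    while pending:
        # A task is ready when every parent already has a result.
        ready = [
            name
            for name, (_, parents) in pending.items()
            if all(p in results for p in parents)
        ]
        if not ready:
            raise ValueError("cycle or missing dependency detected")
        for name in ready:
            func, parents = pending.pop(name)
            results[name] = func(*(results[p] for p in parents))
    return results


results = run_graph({
    "load": (lambda: [1, 2, 3], []),          # root node: no parents
    "total": (lambda xs: sum(xs), ["load"]),  # runs after "load"
    "count": (lambda xs: len(xs), ["load"]),  # runs after "load"
    "mean": (lambda t, c: t / c, ["total", "count"]),
})
```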
end_nodes
cloud.dag.dag.DAG.end_nodes()
Find all end nodes.
dag = DAG()
dag.add_node(Node())
end_nodes = dag.end_nodes()
Returns
| Name | Type | Description |
|---|---|---|
| | | List of end nodes. |
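As an illustration of the idea, an end node can be read as a node that no other node depends on. The sketch below works on a plain dictionary rather than on DAG objects; `find_end_nodes` here is a hypothetical helper, not the library method.

```python
from typing import Dict, List


def find_end_nodes(parents_of: Dict[str, List[str]]) -> List[str]:
    """Return the nodes that are not a parent of any other node."""
    has_children = {p for parents in parents_of.values() for p in parents}
    return [name for name in parents_of if name not in has_children]


# Maps each node name to the names of the nodes it depends on.
parents_of = {
    "load": [],
    "clean": ["load"],
    "report": ["clean"],
    "export": ["clean"],
}
end_nodes = find_end_nodes(parents_of)
```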
end_results
cloud.dag.dag.DAG.end_results()
Get all end results. Blocks until all results are ready.
dag = DAG()
dag.add_node(Node())
end_results = dag.end_results()
Returns
| Name | Type | Description |
|---|---|---|
| | | Map of results by node ID. |
end_results_by_name
cloud.dag.dag.DAG.end_results_by_name()
Get all end results. Blocks until all results are ready.
dag = DAG()
dag.add_node(Node())
end_results = dag.end_results_by_name()
Returns
| Name | Type | Description |
|---|---|---|
| | | Map of results by node name. |
find_end_nodes
cloud.dag.dag.DAG.find_end_nodes()
Find all end nodes.
Returns
| Name | Type | Description |
|---|---|---|
| | List[Node] | List of end nodes. |
get_tiledb_plot_node_details
cloud.dag.dag.DAG.get_tiledb_plot_node_details()
Build list of details needed for tiledb node graph.
Returns
| Name | Type | Description |
|---|---|---|
| | Dict[str, Dict[str, str]] | Node summary. |
initial_setup
cloud.dag.dag.DAG.initial_setup()
Performs one-time server-side setup tasks.
Can safely be called multiple times.
register
cloud.dag.dag.DAG.register(name=None)
Register DAG to TileDB.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | Optional[str] | Name to register DAG as. Uses self.name as default. | None |
Returns
| Name | Type | Description |
|---|---|---|
| | str | Registered name of task graph. |
report_node_complete
cloud.dag.dag.DAG.report_node_complete(node)
Report a node as complete.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| node | Node | Node to mark as complete. | required |
retry_all
cloud.dag.dag.DAG.retry_all()
Retries all failed and cancelled nodes.
stats
cloud.dag.dag.DAG.stats()
Get DAG node statistics.
Returns
| Name | Type | Description |
|---|---|---|
| | Dict[str, Union[int, float]] | All node stats. |
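A return type of Dict[str, Union[int, float]] suggests a mix of integer counts and a floating-point ratio. The sketch below shows one way such a summary could be derived from node statuses; the key names (`total_count`, `percent_complete`, and the per-state keys) and the helper `summarize` are assumptions for illustration, not the documented output of `stats`.

```python
from collections import Counter
from typing import Dict, List, Union


def summarize(statuses: List[str]) -> Dict[str, Union[int, float]]:
    """Count nodes per state and compute the completed percentage."""
    counts = Counter(statuses)
    total = len(statuses)
    summary: Dict[str, Union[int, float]] = {
        "total_count": total,
        # Guard against division by zero for an empty graph.
        "percent_complete": (counts["completed"] / total * 100) if total else 0.0,
    }
    for state in ("not_started", "running", "completed", "failed", "cancelled"):
        summary[state] = counts[state]
    return summary


summary = summarize(["completed", "completed", "running", "failed"])
```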
submit_array_udf
cloud.dag.dag.DAG.submit_array_udf(func, *args, **kwargs)
Submit a function that will be executed in the cloud serverlessly.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable | Function to execute in UDF task. | required |
| args | Any | Positional arguments to pass into Node instantiation. | () |
| kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| | Node | Node that is created. |
submit_local
cloud.dag.dag.DAG.submit_local(func, *args, **kwargs)
Submit a function that will run locally.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable | Function to execute in UDF task. | required |
| args | Any | Positional arguments to pass into Node instantiation. | () |
| kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| | Node | Node that is created. |
submit_sql
cloud.dag.dag.DAG.submit_sql(sql, *args, **kwargs)
Submit a SQL query to run serverlessly in the cloud.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| sql | str | Query to execute. | required |
| args | Any | Positional arguments to pass into Node instantiation. | () |
| kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| | Node | Node that is created. |
submit_udf
cloud.dag.dag.DAG.submit_udf(func, *args, **kwargs)
Submit a function that will be executed in the cloud serverlessly.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable | Function to execute in UDF task. | required |
| args | Any | Positional arguments to pass into Node instantiation. | () |
| kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| | Node | Node that is created. |
submit_udf_stage
cloud.dag.dag.DAG.submit_udf_stage(
func,
*args,
expand_node_output=None,
**kwargs,
)
Submit a function that will be executed in the cloud serverlessly.
Expanding on node output means dynamically allocating workers to this UDF stage based on the output of the node given in the expand_node_output argument.
For example, suppose a node NodeA (NodeA = DAG.submit(...)) returns a list of str values. If NodeA is passed as expand_node_output, and NodeA is also passed as an argument of func that accepts a str, then one node is spawned in parallel for each str value in the result of NodeA.
graph = DAG(...)
NodeA = graph.submit()
NodeB = graph.submit_udf_stage(..., expand_node_output=NodeA, str_arg=NodeA)
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable | Function to execute in UDF task. | required |
| args | Any | Positional arguments to pass into Node instantiation. | () |
| expand_node_output | Optional[Node] | Node that we want to expand the output of. The output of the node should be a JSON encoded list. | None |
| kwargs | Any | Keyword args to pass into Node instantiation. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| | Node | Node that is created. |
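The fan-out behaviour described for expand_node_output can be mimicked locally. This is a sketch under stated assumptions: `run_stage` is a hypothetical helper, and a thread pool stands in for the cloud workers. It decodes a JSON-encoded list, as the parameter description requires, and calls the function once per element.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_stage(func: Callable[[str], str], encoded_output: str) -> List[str]:
    """Run func once per element of a JSON-encoded list, in parallel."""
    items = json.loads(encoded_output)
    if not isinstance(items, list):
        raise TypeError("expanded output must decode to a list")
    with ThreadPoolExecutor(max_workers=4) as pool:
        # map() preserves input order even though the calls overlap.
        return list(pool.map(func, items))


node_a_output = json.dumps(["alpha", "beta", "gamma"])  # stands in for NodeA's result
stage_results = run_stage(str.upper, node_a_output)
```

The number of parallel calls is decided by the upstream result at run time, which is why the upstream output must be a list.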
visualize
cloud.dag.dag.DAG.visualize(notebook=True, auto_update=True, force_plotly=False)
Build and render a tree diagram of the DAG.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| notebook | | Is the visualization inside a Jupyter notebook? If so, a widget is used. | True |
| auto_update | | Should the diagram be auto-updated with each status change? | True |
| force_plotly | | Force the use of Plotly graphs instead of the TileDB Plot Widget. | False |
Returns
| Name | Type | Description |
|---|---|---|
| | | The figure. |
wait
cloud.dag.dag.DAG.wait(timeout=None)
Wait for DAG to be completed.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | Optional[float] | optional timeout in seconds to wait for DAG to be completed | None |
Returns
| Name | Type | Description |
|---|---|---|
| | None | None, or raises TimeoutError if the timeout occurs. |
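The contract "return None or raise TimeoutError" is the usual shape of a blocking wait. A minimal sketch with the standard library follows; the class `Waitable` and its `mark_done` method are hypothetical and only imitate the documented behaviour.

```python
import threading


class Waitable:
    """Hypothetical object that can be waited on until it is marked done."""

    def __init__(self) -> None:
        self._done = threading.Event()

    def mark_done(self) -> None:
        self._done.set()

    def wait(self, timeout=None) -> None:
        # Event.wait returns False when the timeout elapses first.
        if not self._done.wait(timeout):
            raise TimeoutError(f"not completed within {timeout} seconds")


slow = Waitable()
try:
    slow.wait(timeout=0.05)  # nothing ever marks this one as done
    timed_out = False
except TimeoutError:
    timed_out = True

fast = Waitable()
threading.Timer(0.01, fast.mark_done).start()
fast.wait(timeout=2.0)  # returns None once mark_done() has run
finished = True
```

With timeout=None the call blocks indefinitely, so a caller that needs an upper bound should always pass a value.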
Node
cloud.dag.dag.Node(
func,
*args,
name=None,
dag=None,
mode=Mode.REALTIME,
expand_node_output=None,
_download_results=None,
_internal_prewrapped_func=None,
_internal_accepts_stored_params=True,
**kwargs,
)
Representation of a function to run in a DAG.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable[..., _T] | Function to run as UDF task. | required |
| *args | Any | Positional arguments to pass to UDF. | () |
| name | Optional[str] | Human-readable name of Node task. | None |
| dag | Optional[DAG] | DAG this node is associated with. | None |
| mode | Mode | Mode the Node is to run in. | Mode.REALTIME |
| expand_node_output | Optional[Node] | Node to expand processes upon. | None |
| _download_results | Optional[bool] | An optional boolean to override default result-downloading behavior. If True, will always download the results of the function immediately upon completion. If False, will not download the results immediately; they will be downloaded when .result() is called. | None |
| _internal_prewrapped_func | Callable[..., results.Result[_T]] | For internal use only. A function that returns something that is already a Result, which does not require wrapping. We assume that all prewrapped functions make server calls. | None |
| _internal_accepts_stored_params | bool | For internal use only. Applies only when _internal_prewrapped_func is used. True if _internal_prewrapped_func can accept stored parameters. False if it cannot, and all parameters must be serialized. | True |
| **kwargs | Any | Keyword arguments to pass to UDF. | {} |
Attributes
| Name | Description |
|---|---|
| args | Positional args to pass into UDF. |
| children | Child Nodes (Nodes dependent on this Node). |
| dag | DAG this Node is pinned to. |
| error | Return Node error if encountered. |
| future | Returns something that pretends to be a Future. |
| id | UUID for Node instance. |
| kwargs | Keyword args to pass into UDF. |
| mode | Processing mode of Node. |
| name | The human-readable name of Node. |
| parents | Parent Nodes (Nodes this Node is dependent on). |
| status | Node status. |
Methods
| Name | Description |
|---|---|
| add_done_callback | Add callback function to execute at Node completion. |
| cancel | Cancel Node. |
| cancelled | Whether Node is cancelled. |
| depends_on | Create dependency chain for node, useful when there is a dependency that does not rely directly on passing results from one to another. |
| exception | Return exception if one was raised. |
| result | Fetch Node return. |
| retry | Retry Node. |
| running | Whether Node is actively running. |
| task_id | Gets the server-side Task ID of this node. |
| to_registration_json | Converts this node to the form used when registering the graph. |
| wait | Wait for node to be completed. |
add_done_callback
cloud.dag.dag.Node.add_done_callback(fn)
Add callback function to execute at Node completion.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| fn | Callable[[Node[_T]], None] | Callback to execute. | required |
cancel
cloud.dag.dag.Node.cancel()
Cancel Node.
cancelled
cloud.dag.dag.Node.cancelled()
Whether Node is cancelled.
depends_on
cloud.dag.dag.Node.depends_on(node)
Create dependency chain for node, useful when there is a dependency that does not rely directly on passing results from one to another.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| node | Node | node to mark as a dependency of this node | required |
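An explicit dependency is an edge that exists only for ordering, with no data flowing along it. The sketch below illustrates that with a hypothetical `Step` class; it mirrors the parents and children attributes listed for Node but is not the library class.

```python
from typing import List


class Step:
    """Hypothetical node that tracks parents and children by reference."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.parents: List["Step"] = []
        self.children: List["Step"] = []

    def depends_on(self, other: "Step") -> None:
        # Record the edge in both directions so either side can be traversed.
        self.parents.append(other)
        other.children.append(self)


create_table = Step("create_table")
insert_rows = Step("insert_rows")
# insert_rows does not consume create_table's return value,
# but it must still run afterwards.
insert_rows.depends_on(create_table)
```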
exception
cloud.dag.dag.Node.exception(timeout=None)
Return exception if one was raised.
result
cloud.dag.dag.Node.result(timeout=None)
Fetch Node return.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | Optional[float] | Time to wait to fetch result. | None |
Returns
| Name | Type | Description |
|---|---|---|
| | _T | Results of Node processing. |
retry
cloud.dag.dag.Node.retry()
Retry Node.
running
cloud.dag.dag.Node.running()
Whether Node is actively running.
task_id
cloud.dag.dag.Node.task_id()
Gets the server-side Task ID of this node.
Returns
| Name | Type | Description |
|---|---|---|
| | Optional[uuid.UUID] | None if this has no task ID (as it was run on the client side). |
to_registration_json
cloud.dag.dag.Node.to_registration_json(existing_names)
Converts this node to the form used when registering the graph.
This is the form of the Node that will be used to represent it in the RegisteredTaskGraph object, i.e. a RegisteredTaskGraphNode.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| existing_names | Set[str] | The set of names that have already been used, so that we don’t generate a duplicate node name. | required |
Returns
| Name | Type | Description |
|---|---|---|
| | Dict[str, Any] | Mapping of Node for registration. |
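The role of existing_names can be illustrated with a small de-duplication helper. The suffix scheme and the function `unique_name` below are assumptions made for this sketch; the source does not state how the library actually generates a non-duplicate name.

```python
from typing import Set


def unique_name(base: str, existing_names: Set[str]) -> str:
    """Return base, or base with a numeric suffix, and record it as used."""
    candidate = base
    suffix = 1
    while candidate in existing_names:
        candidate = f"{base} ({suffix})"
        suffix += 1
    # Mutating the shared set lets later calls see names chosen earlier.
    existing_names.add(candidate)
    return candidate


existing_names: Set[str] = set()
names = [unique_name("ingest", existing_names) for _ in range(3)]
```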
wait
cloud.dag.dag.Node.wait(timeout=None)
Wait for node to be completed.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | Optional[float] | Optional timeout in seconds to wait for the Node to be completed. | None |
Returns
| Name | Type | Description |
|---|---|---|
| | None | None, or raises TimeoutError if the timeout occurs. |
Functions
| Name | Description |
|---|---|
| list_logs | Retrieves the list of task graph logs you can view. |
| replace_stored_params | Descends into data structures and replaces Stored Params with results. |
| server_logs | Retrieves the full server-side logs for the given DAG. |
list_logs
cloud.dag.dag.list_logs(
namespace=None,
created_by=None,
search=None,
start_time=None,
end_time=None,
page=1,
per_page=10,
)
Retrieves the list of task graph logs you can view.
The returned graph logs will be “light” versions, meaning they will not include any details about the execution state of an individual DAG. To retrieve those, pass the ID to server_logs.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | Optional[str] | If present, include logs for only this namespace. If absent, include logs for all namespaces you have access to. | None |
| created_by | Optional[str] | Include only logs from this user (if present). | None |
| search | Optional[str] | A search string for the name of the task graph. | None |
| start_time | Optional[datetime.datetime] | Include logs created after this time. | None |
| end_time | Optional[datetime.datetime] | Include logs created before this time. | None |
| page | int | The page number to use, starting from 1. | 1 |
| per_page | int | The number of items per page. | 10 |
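The page and per_page parameters follow the common one-based paging convention. The sketch below shows the arithmetic on a local list; `paginate` is a hypothetical helper, and the real function pages on the server rather than slicing in memory.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], page: int = 1, per_page: int = 10) -> List[T]:
    """Return one page of items; pages are numbered from 1."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be positive")
    start = (page - 1) * per_page
    return list(items[start:start + per_page])


logs = [f"log-{i}" for i in range(25)]
first_page = paginate(logs)            # items 0 through 9
last_page = paginate(logs, page=3)     # items 20 through 24
past_the_end = paginate(logs, page=4)  # no items left
```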
replace_stored_params
cloud.dag.dag.replace_stored_params(tree, loader)
Descends into data structures and replaces Stored Params with results.
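The descend-and-replace pattern can be sketched as a recursive walk over nested containers. Everything named here is hypothetical: `Placeholder` stands in for a stored parameter, and `replace_placeholders` imitates the (tree, loader) call shape without claiming to match the library's traversal rules.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Placeholder:
    """Hypothetical marker standing in for a value stored elsewhere."""

    key: str


def replace_placeholders(tree: Any, loader: Callable[[Placeholder], Any]) -> Any:
    """Return a copy of tree with each Placeholder replaced by loader(it)."""
    if isinstance(tree, Placeholder):
        return loader(tree)
    if isinstance(tree, dict):
        return {k: replace_placeholders(v, loader) for k, v in tree.items()}
    if isinstance(tree, (list, tuple)):
        # Rebuild the same container type from the transformed elements.
        return type(tree)(replace_placeholders(v, loader) for v in tree)
    return tree  # scalars and unknown types pass through unchanged


stored = {"a": 10, "b": [1, 2]}
tree = {"x": Placeholder("a"), "ys": [Placeholder("b"), 3], "t": (Placeholder("a"),)}
resolved = replace_placeholders(tree, lambda p: stored[p.key])
```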
server_logs
cloud.dag.dag.server_logs(dag_or_id, namespace=None)
Retrieves the full server-side logs for the given DAG.
The DAG can be provided as a DAG object, or the server-provided UUID of a DAG’s execution log in either uuid.UUID or string form. This can be used to access both completed DAGs and in-progress DAGs. The returned DAGs will include full data.
Will return None if called with a DAG object that has no server-side nodes.