dag.dag
client.dag.dag
Directed acyclic graphs as TileDB task graphs.
Classes
| Name | Description |
|---|---|
| DAG | A directed acyclic graph (DAG) backed by a TileDB task graph. |
| DAGError | Raised when a DAG cannot be registered or executed. |
| Node | Representation of a function to run in a DAG. |
DAG
client.dag.dag.DAG(
workspace=None,
max_workers=None,
use_processes=False,
done_callback=None,
update_callback=None,
name=None,
mode=Mode.REALTIME,
retry_strategy=None,
workflow_retry_strategy=None,
deadline=None,
)
A directed acyclic graph (DAG) backed by a TileDB task graph.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| max_workers | int | Number of workers allocated to execute the DAG. | None |
| use_processes | bool | If True, the DAG uses processes instead of threads. | False |
| done_callback | callable | Callback to invoke when the DAG is completed. The function is passed a reference to this DAG. | None |
| update_callback | callable | Callback to invoke when the DAG status is updated. The function is passed a reference to this DAG. | None |
| name | str | Human-readable name for the DAG, shown in task graph logs. | None |
| retry_strategy | RetryStrategy | Kubernetes retry policy applied to each Node. | None |
| workflow_retry_strategy | RetryStrategy | Kubernetes retry policy applied to the DAG. | None |
| deadline | int | Time in seconds the DAG is allowed to execute before it times out. | None |
Attributes
| Name | Type | Description |
|---|---|---|
| id | UUID | The DAG’s unique id. |
| nodes | dict | A mapping of child node ids to Node instances. |
| completed_nodes | dict | Mapping of completed nodes. |
| failed_nodes | dict | Mapping of failed nodes. |
| running_nodes | dict | Mapping of running nodes. |
| not_started_nodes | dict | Mapping of queued nodes. |
| cancelled_nodes | dict | Mapping of cancelled nodes. |
| name | str | Human-readable name for DAG to be shown in task graph logs. |
| server_graph_uuid | UUID | The server-generated UUID of this graph, used for logging. Will be None until initial_setup is called. If submitting the log succeeds, this is the UUID; otherwise, it is None. |
Methods
| Name | Description |
|---|---|
| add_done_callback | Add a callback for when the DAG is completed. |
| add_node | Create and add a node. |
| add_node_obj | Add a node to the DAG. |
| add_update_callback | Add a callback for when the DAG status is updated. |
| cancel | Cancel the DAG. |
| compute | Start the DAG by executing root nodes. |
| end_nodes | Find all end nodes. |
| end_results | Get all end results by node ID; blocks until all results are ready. |
| end_results_by_name | Get all end results by node name; blocks until all results are ready. |
| find_end_nodes | Find all end nodes. |
| get_tiledb_plot_node_details | Build the list of details needed for the TileDB node graph. |
| initial_setup | Performs one-time server-side setup tasks. |
| register | Register this task graph. |
| report_node_complete | Report a node as complete. |
| retry_all | Retries all failed and cancelled nodes. |
| stats | Get DAG node statistics. |
| submit_array_udf | Submit a function that will be executed serverlessly. |
| submit_local | Submit a function that will run locally. |
| submit_sql | Submit a SQL query to run serverlessly in the cloud. |
| submit_udf | Submit a function that will be executed in the cloud serverlessly. |
| submit_udf_stage | Submit a function that will be executed in the cloud serverlessly. |
| visualize | Build and render a tree diagram of the DAG. |
| wait | Wait for DAG to be completed. |
add_done_callback
client.dag.dag.DAG.add_done_callback(func)
Add a callback for when the DAG is completed.
:param func: Function to call when the DAG is completed. The function will be passed a reference to this DAG.
add_node
client.dag.dag.DAG.add_node(
func_exec,
*args,
name=None,
local_mode=True,
**kwargs,
)
Create and add a node.
DEPRECATED. Use submit_local instead.
:param func_exec: Function to execute.
:param args: Arguments for function execution.
:param name: Name of the node.
:return: Node that is created.
add_node_obj
client.dag.dag.DAG.add_node_obj(node)
Add a node to the DAG.
:param node: Node to add to the DAG.
:return: Node instance.
add_update_callback
client.dag.dag.DAG.add_update_callback(func)
Add a callback for when the DAG status is updated.
:param func: Function to call when the DAG status is updated. The function will be passed a reference to this DAG.
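Both callback hooks receive a reference to the DAG itself. The sketch below only illustrates that calling convention with a stand-in class. `TinyDag` and its `_set_status` helper are hypothetical and are not the library's implementation; the assumption that completion also counts as a status update is ours.

```python
from typing import Callable, List


class TinyDag:
    """Hypothetical stand-in that mimics the callback convention described above."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.status = "not started"
        self._done_callbacks: List[Callable[["TinyDag"], None]] = []
        self._update_callbacks: List[Callable[["TinyDag"], None]] = []

    def add_done_callback(self, func: Callable[["TinyDag"], None]) -> None:
        self._done_callbacks.append(func)

    def add_update_callback(self, func: Callable[["TinyDag"], None]) -> None:
        self._update_callbacks.append(func)

    def _set_status(self, status: str) -> None:
        # Assumption: every status change triggers the update callbacks.
        self.status = status
        for func in self._update_callbacks:
            func(self)
        # Completion additionally triggers the done callbacks.
        if status == "completed":
            for func in self._done_callbacks:
                func(self)


events = []
dag = TinyDag("example")
dag.add_update_callback(lambda d: events.append(("update", d.status)))
dag.add_done_callback(lambda d: events.append(("done", d.name)))
dag._set_status("running")
dag._set_status("completed")
```

Because the callback is handed the DAG object, it can read attributes such as the name or status without capturing the DAG in a closure.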
cancel
client.dag.dag.DAG.cancel()
Cancel the DAG.
compute
client.dag.dag.DAG.compute()
Start the DAG by executing root nodes.
end_nodes
client.dag.dag.DAG.end_nodes()
Find all end nodes.
dag = DAG()
dag.add_node(Node())
end_nodes = dag.end_nodes()
:return: list of end nodes
end_results
client.dag.dag.DAG.end_results()
Get all end results; blocks until all results are ready.
dag = DAG()
dag.add_node(Node())
end_results = dag.end_results()
:return: map of results by node ID
end_results_by_name
client.dag.dag.DAG.end_results_by_name()
Get all end results; blocks until all results are ready.
dag = DAG()
dag.add_node(Node())
end_results = dag.end_results_by_name()
:return: map of results by node name
find_end_nodes
client.dag.dag.DAG.find_end_nodes()
Find all end nodes.
:return: list of end nodes
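This reference does not define "end node". Assuming the usual meaning (a node with no children), the idea can be sketched over a plain mapping of node ids to child ids. The `leaf_nodes` helper and the sample graph are hypothetical and do not reflect how the library stores nodes.

```python
from typing import Dict, List, Set


def leaf_nodes(children: Dict[str, Set[str]]) -> List[str]:
    """Return the ids of nodes that have no children, in sorted order."""
    return sorted(node for node, kids in children.items() if not kids)


# Hypothetical graph: a -> b -> d, a -> c -> d, plus an isolated node e.
children = {
    "a": {"b", "c"},
    "b": {"d"},
    "c": {"d"},
    "d": set(),
    "e": set(),
}
end_nodes = leaf_nodes(children)
```

Under this assumption, an isolated node is both a root and an end node, which is why `e` appears alongside `d`.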
get_tiledb_plot_node_details
client.dag.dag.DAG.get_tiledb_plot_node_details()
Build the list of details needed for the TileDB node graph.
:return: Node summary
initial_setup
client.dag.dag.DAG.initial_setup()
Performs one-time server-side setup tasks.
Can safely be called multiple times.
register
client.dag.dag.DAG.register(path, *, teamspace=None)
Register this task graph.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str or object | The TileDB path at which the task graph is to be registered. May be a path relative to a teamspace, a Folder or Asset instance, or an absolute “tiledb” URI. If the path to a folder is provided, the name or id of the task graph will be appended to form a full asset path. | required |
| teamspace | Teamspace or str | The teamspace to which the task graph will be registered, specified by object or id. If not provided, the path parameter is queried for a teamspace id. | None |
Raises
| Name | Type | Description |
|---|---|---|
| | DAGError | When this task graph cannot be registered. |
Examples
>>> folder = folders.create_folder(
... "dags",
... teamspace="teamspace",
... exist_ok=True,
... )
>>> dag = DAG(name="example1")
>>> dag.register("dags", teamspace="teamspace")
This creates a task graph asset at path “dags/example1” in the teamspace named “teamspace”.
>>> dag = DAG(name="example2")
>>> dag.register(folder)
This creates a DAG asset in “teamspace” at the path “dags/example2”. The DAG’s name has been appended to the folder path.
A DAG can also be registered to a specific absolute “tiledb” URI that specifies a different name.
>>> dag.register("tiledb://workspace/teamspace/dags/example3")
report_node_complete
client.dag.dag.DAG.report_node_complete(node)
Report a node as complete.
:param node: Node to mark as complete.
retry_all
client.dag.dag.DAG.retry_all()
Retries all failed and cancelled nodes.
stats
client.dag.dag.DAG.stats()
Get DAG node statistics.
:return: All node stats.
submit_array_udf
client.dag.dag.DAG.submit_array_udf(
func,
*args,
name=None,
mode=Mode.REALTIME,
resource_class=None,
resources=None,
storage=None,
expand_node_output=None,
_download_results=None,
_internal_prewrapped_func=None,
_internal_accepts_stored_params=True,
**kwargs,
)
Submit a function that will be executed serverlessly.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func | callable | Function to execute in UDF task. | required |
| args | tuple | Positional arguments for the function. | () |
| name | str | Human-readable name of Node task. | None |
| mode | Mode | Mode the Node is to run in. Default: realtime. | Mode.REALTIME |
| resource_class | str | Name of a UDF resource class. | None |
| resources | dict | A UDF resource description. | None |
| storage | list of TGUDFStorage | Storage and filesystem mount points for the Node. A realtime mode Node does not support storage. | None |
| expand_node_output | Node | Node to expand processes upon. | None |
| _download_results | bool | An optional boolean to override default result-downloading behavior. If True, the results of the function are always downloaded immediately upon completion. If False, the results are not downloaded immediately, but are downloaded when .result() is called. | None |
| _internal_prewrapped_func | callable | For internal use only. A function that returns something that is already a Result, which does not require wrapping. We assume that all prewrapped functions make server calls. | None |
| _internal_accepts_stored_params | bool | For internal use only. Applies only when _internal_prewrapped_func is used. True if _internal_prewrapped_func can accept stored parameters. False if it cannot, and all parameters must be serialized. | True |
| kwargs | dict | Keyword arguments for the function. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| | Node | The Node that is created. |
submit_local
client.dag.dag.DAG.submit_local(func, *args, **kwargs)
Submit a function that will run locally.
:param func: Function to execute in UDF task.
:param args: Positional arguments to pass into Node instantiation.
:param kwargs: Keyword args to pass into Node instantiation.
:return: Node that is created.
submit_sql
client.dag.dag.DAG.submit_sql(sql, *args, **kwargs)
Submit a SQL query to run serverlessly in the cloud.
:param sql: Query to execute.
:param args: Positional arguments to pass into Node instantiation.
:param kwargs: Keyword args to pass into Node instantiation.
:return: Node that is created.
submit_udf
client.dag.dag.DAG.submit_udf(func, *args, **kwargs)
Submit a function that will be executed in the cloud serverlessly.
:param func: Function to execute in UDF task.
:param args: Positional arguments to pass into Node instantiation.
:param kwargs: Keyword args to pass into Node instantiation.
:return: Node that is created.
submit_udf_stage
client.dag.dag.DAG.submit_udf_stage(
func,
*args,
expand_node_output=None,
**kwargs,
)
Submit a function that will be executed in the cloud serverlessly.
Expanding on node output means dynamically allocating work to this UDF stage based on the output of the node given in the expand_node_output argument.
For example, suppose a node, NodeA (NodeA = DAG.submit(...)), returns a list of str values. If NodeA is passed as expand_node_output, and NodeA is also passed as the argument of func that accepts a str, then one node is spawned in parallel for each str value in the result of NodeA.
graph = DAG(...)
NodeA = graph.submit()
NodeB = graph.submit_udf_stage(..., expand_node_output=NodeA, str_arg=NodeA)
:param func: Function to execute in UDF task.
:param args: Positional arguments to pass into Node instantiation.
:param expand_node_output: Node whose output is to be expanded. The output of the node should be a JSON-encoded list.
:param kwargs: Keyword args to pass into Node instantiation.
:return: Node that is created.
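The fan-out behavior described above can be imitated locally with the standard library. This is a minimal sketch of the semantics only, assuming the upstream output is a JSON-encoded list; `expand_stage` is a hypothetical helper, and the real stage runs server-side rather than in a local thread pool.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def expand_stage(func: Callable[[str], str], upstream_output: str) -> List[str]:
    """Run func once per element of a JSON-encoded list, in parallel."""
    items = json.loads(upstream_output)
    if not isinstance(items, list):
        raise TypeError("upstream output must be a JSON-encoded list")
    with ThreadPoolExecutor(max_workers=4) as pool:
        # map() returns results in input order even though calls run concurrently.
        return list(pool.map(func, items))


# Stand-in for the result of NodeA: a JSON-encoded list of str values.
node_a_output = json.dumps(["alpha", "beta", "gamma"])
results = expand_stage(str.upper, node_a_output)
```

Each element of the upstream list becomes the str argument of one call, which mirrors passing NodeA both as expand_node_output and as the str argument of func.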
visualize
client.dag.dag.DAG.visualize(
notebook=True,
auto_update=True,
force_plotly=False,
)
Build and render a tree diagram of the DAG.
:param notebook: Whether the visualization is inside a Jupyter notebook. If so, a widget is used.
:param auto_update: Whether the diagram should be updated automatically with each status change.
:param force_plotly: Force the use of Plotly graphs instead of the TileDB Plot Widget.
:return: The figure.
wait
client.dag.dag.DAG.wait(timeout=None)
Wait for DAG to be completed.
:param timeout: Optional timeout in seconds to wait for the DAG to be completed.
:return: None, or raises TimeoutError if the timeout occurs.
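The wait-or-raise contract can be sketched with standard-library futures. This is an analogy under the assumption that the DAG waits on all of its outstanding work; `wait_all` is a hypothetical helper, not part of this module.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Optional


def wait_all(futures, timeout: Optional[float] = None) -> None:
    """Block until every future is done; raise TimeoutError otherwise."""
    _, not_done = wait(futures, timeout=timeout)
    if not_done:
        raise TimeoutError(f"{len(not_done)} task(s) still running after {timeout}s")


with ThreadPoolExecutor(max_workers=2) as pool:
    # A short task finishes well within the timeout, so wait_all returns None.
    fast_result = wait_all([pool.submit(time.sleep, 0.01)], timeout=5)

    # A longer task exceeds the timeout, so wait_all raises TimeoutError.
    try:
        wait_all([pool.submit(time.sleep, 0.3)], timeout=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True
```

Note that a timeout here only stops the waiting; the slow task keeps running until the pool shuts down. Whether a DAG timeout cancels work is not stated in this reference.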
DAGError
client.dag.dag.DAGError()
Raised when a DAG cannot be registered or executed.
Node
client.dag.dag.Node(
func,
*args,
name=None,
dag=None,
mode=Mode.REALTIME,
resource_class=None,
resources=None,
storage=None,
expand_node_output=None,
_download_results=None,
_internal_prewrapped_func=None,
_internal_accepts_stored_params=True,
**kwargs,
)
Representation of a function to run in a DAG.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| func | callable | Function to run as UDF task. | required |
| *args | sequence | Positional arguments to pass to UDF. | () |
| name | str | Human-readable name of Node task. | None |
| dag | DAG | DAG this node is associated with. | None |
| mode | Mode | Mode the Node is to run in. Default: realtime. | Mode.REALTIME |
| resource_class | str | Name of a UDF resource class. | None |
| resources | dict | A UDF resource description. | None |
| storage | list of TGUDFStorage | Storage and filesystem mount points for the Node. A realtime mode Node does not support storage. | None |
| expand_node_output | Node | Node to expand processes upon. | None |
| _download_results | bool | An optional boolean to override default result-downloading behavior. If True, the results of the function are always downloaded immediately upon completion. If False, the results are not downloaded immediately, but are downloaded when .result() is called. | None |
| _internal_prewrapped_func | callable | For internal use only. A function that returns something that is already a Result, which does not require wrapping. We assume that all prewrapped functions make server calls. | None |
| _internal_accepts_stored_params | bool | For internal use only. Applies only when _internal_prewrapped_func is used. True if _internal_prewrapped_func can accept stored parameters. False if it cannot, and all parameters must be serialized. | True |
| **kwargs | dict | Keyword arguments to pass to UDF. | {} |
Raises
| Name | Type | Description |
|---|---|---|
| | TypeError | When func and storage are invalid or incompatible with the Node’s mode. |
| | ValueError | When resources and/or resource class are invalid. |
Attributes
| Name | Type | Description |
|---|---|---|
| id | uuid | The Node’s unique id. |
| dag | DAG | The DAG that the Node is attached to. |
| mode | Mode | The Node’s mode (realtime or batch). |
| parents | dict | A mapping of parent node ids to Node instances. |
| children | dict | A mapping of child node ids to Node instances. |
| args | tuple | Positional arguments for the Node’s function. |
| kwargs | dict | Keyword arguments for the Node’s function. |
Methods
| Name | Description |
|---|---|
| add_done_callback | Add callback function to execute at Node completion. |
| cancel | Cancel Node. |
| cancelled | Whether Node is cancelled. |
| depends_on | Create a dependency chain for the node. |
| exception | Return the exception if one was raised. |
| result | Fetch the Node’s return value. |
| retry | Retry Node. |
| running | Whether Node is actively running. |
| task_id | Gets the server-side Task ID of this node. |
| to_registration_json | Converts this node to the form used when registering the graph. |
| wait | Wait for node to be completed. |
add_done_callback
client.dag.dag.Node.add_done_callback(fn)
Add callback function to execute at Node completion.
:param fn: Callback to execute.
cancel
client.dag.dag.Node.cancel()
Cancel Node.
cancelled
client.dag.dag.Node.cancelled()
Whether Node is cancelled.
depends_on
client.dag.dag.Node.depends_on(node)
Create a dependency chain for the node. This is useful when there is a dependency that does not rely directly on passing results from one node to another.
:param node: Node to mark as a dependency of this node.
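An ordering-only dependency, where no result is passed between tasks, can be illustrated with the standard library's `graphlib`. The task names below are hypothetical, and this sketch shows the scheduling idea rather than how the library resolves dependencies.

```python
from graphlib import TopologicalSorter

# Map each task to the tasks it depends on (hypothetical task names).
dependencies = {
    "load": set(),
    "create_table": set(),
    # "insert_rows" consumes no return value from "create_table",
    # but it must still run after it.
    "insert_rows": {"create_table", "load"},
    "report": {"insert_rows"},
}

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
```

Declaring the edge explicitly is what guarantees the ordering; without it, a scheduler would be free to start both tasks at once.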
exception
client.dag.dag.Node.exception(timeout=None)
Return the exception if one was raised.
result
client.dag.dag.Node.result(timeout=None)
Fetch the Node’s return value.
:param timeout: Time to wait to fetch result.
:return: Results of Node processing.
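Several Node methods (add_done_callback, cancel, cancelled, exception, result, running) share their names with `concurrent.futures.Future` from the standard library. Assuming they behave analogously, which this reference does not confirm, the sketch below shows the pattern using a plain `Future` as a stand-in for a Node.

```python
from concurrent.futures import Future

# A completed future: result() returns the value, callbacks see the future.
fut = Future()
seen = []
fut.add_done_callback(lambda f: seen.append(f.result()))
fut.set_result(42)
value = fut.result(timeout=1)

# A failed future: exception() returns the error instead of raising it.
failed = Future()
failed.set_exception(ValueError("boom"))
error = failed.exception(timeout=1)

# Calling result() on the failed future re-raises the stored exception.
try:
    failed.result(timeout=1)
    reraised = False
except ValueError:
    reraised = True
```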
retry
client.dag.dag.Node.retry()
Retry Node.
running
client.dag.dag.Node.running()
Whether Node is actively running.
task_id
client.dag.dag.Node.task_id()
Gets the server-side Task ID of this node.
:return: None if this has no task ID (as it was run on the client side).
to_registration_json
client.dag.dag.Node.to_registration_json(existing_names)
Converts this node to the form used when registering the graph.
This is the form of the Node that will be used to represent it in the RegisteredTaskGraph object, i.e. a RegisteredTaskGraphNode.
:param existing_names: The set of names that have already been used, so that we don’t generate a duplicate node name.
:return: Mapping of Node for registration.
wait
client.dag.dag.Node.wait(timeout=None)
Wait for node to be completed.
:param timeout: Optional timeout in seconds to wait for the node to be completed.
:return: None, or raises TimeoutError if the timeout occurs.
Functions
| Name | Description |
|---|---|
| list_logs | Retrieves the list of task graph logs you can view. |
| replace_stored_params | Descends into data structures and replaces Stored Params with results. |
| server_logs | Retrieves the full server-side logs for the given DAG. |
list_logs
client.dag.dag.list_logs(
workspace=None,
created_by=None,
search=None,
start_time=None,
end_time=None,
page=1,
per_page=10,
)
Retrieves the list of task graph logs you can view.
The returned graph logs will be “light” versions, meaning they will not include any details about the execution state of an individual DAG. To retrieve those, pass the ID to server_logs.
:param workspace: If present, include logs for only this workspace. If absent, include logs for all workspaces you have access to.
:param created_by: Include only logs from this user (if present).
:param search: A search string for the name of the task graph.
:param start_time: Include logs created after this time.
:param end_time: Include logs created before this time.
:param page: The page number to use, starting from 1.
:param per_page: The number of items per page.
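Since page numbers start from 1, page N covers items (N - 1) * per_page up to N * per_page. The sketch below shows that arithmetic on a local list. `paginate` is a hypothetical helper, and the real pagination happens server-side.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], page: int = 1, per_page: int = 10) -> List[T]:
    """Return the slice of items for a 1-based page number."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be >= 1")
    start = (page - 1) * per_page
    return list(items[start:start + per_page])


# 25 hypothetical log entries split into pages of 10.
logs = [f"log-{i}" for i in range(25)]
first_page = paginate(logs)
third_page = paginate(logs, page=3)
```

The last page may hold fewer than per_page items, and a page past the end is empty.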
replace_stored_params
client.dag.dag.replace_stored_params(tree, loader)
Descends into data structures and replaces Stored Params with results.
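The general technique, a recursive descent that swaps placeholders for loaded values, can be sketched as follows. `StoredParam` here is a hypothetical stand-in class and `replace_placeholders` is an illustrative helper. The actual placeholder type, the loader's signature, and the set of containers the library descends into are not documented here.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class StoredParam:
    """Hypothetical placeholder that refers to a stored result by key."""

    key: str


def replace_placeholders(tree: Any, loader: Callable[[str], Any]) -> Any:
    """Return a copy of tree with every StoredParam replaced by loader(key)."""
    if isinstance(tree, StoredParam):
        return loader(tree.key)
    if isinstance(tree, dict):
        return {k: replace_placeholders(v, loader) for k, v in tree.items()}
    if isinstance(tree, list):
        return [replace_placeholders(v, loader) for v in tree]
    if isinstance(tree, tuple):
        return tuple(replace_placeholders(v, loader) for v in tree)
    # Anything else is a leaf value and is returned unchanged.
    return tree


store = {"a": 1, "b": [2, 3]}
tree = {"x": StoredParam("a"), "y": [StoredParam("b"), "keep"], "z": (StoredParam("a"),)}
resolved = replace_placeholders(tree, store.__getitem__)
```

The function builds new containers rather than mutating the input, so the original tree still holds its placeholders afterwards.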
server_logs
client.dag.dag.server_logs(dag_or_id, workspace=None)
Retrieves the full server-side logs for the given DAG.
The DAG can be provided as a DAG object, or as the server-provided UUID of a DAG’s execution log in either uuid.UUID or string form. This can be used to access both completed DAGs and in-progress DAGs. The returned DAGs will include full data.
Will return None if called with a DAG object that has no server-side nodes.