client.dag.dag

Directed acyclic graphs as TileDB task graphs.

Classes

| Name | Description |
| --- | --- |
| DAG | A directed acyclic graph (DAG) backed by TileDB task graphs. |
| DAGError | Raised when a DAG cannot be registered or executed. |
| Node | Representation of a function to run in a DAG. |

DAG

client.dag.dag.DAG(
    workspace=None,
    max_workers=None,
    use_processes=False,
    done_callback=None,
    update_callback=None,
    name=None,
    mode=Mode.REALTIME,
    retry_strategy=None,
    workflow_retry_strategy=None,
    deadline=None,
)

A directed acyclic graph (DAG) backed by TileDB task graphs.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_workers | int | Number of workers to allocate to execute the DAG. | None |
| use_processes | bool | If True, the DAG uses processes instead of threads. | False |
| done_callback | callable | Callback function to call when the DAG is completed. The function is passed a reference to this DAG. | None |
| update_callback | callable | Callback function to call when the DAG status is updated. The function is passed a reference to this DAG. | None |
| name | str | Human-readable name for the DAG, shown in task graph logs. | None |
| retry_strategy | RetryStrategy | K8S retry policy applied to each Node. | None |
| workflow_retry_strategy | RetryStrategy | K8S retry policy applied to the DAG as a whole. | None |
| deadline | int | Duration (sec) the DAG is allowed to execute before timing out. | None |

Attributes

| Name | Type | Description |
| --- | --- | --- |
| id | UUID | The DAG’s unique id. |
| nodes | dict | A mapping of node ids to the DAG’s Node instances. |
| completed_nodes | dict | Mapping of completed nodes. |
| failed_nodes | dict | Mapping of failed nodes. |
| running_nodes | dict | Mapping of running nodes. |
| not_started_nodes | dict | Mapping of queued nodes. |
| cancelled_nodes | dict | Mapping of cancelled nodes. |
| name | str | Human-readable name for the DAG, shown in task graph logs. |
| server_graph_uuid | UUID | The server-generated UUID of this graph, used for logging. It is None until :meth:`initial_setup` is called; afterwards it holds the UUID if submitting the log succeeded, and None otherwise. |

Methods

| Name | Description |
| --- | --- |
| add_done_callback | Add a callback for when the DAG is completed. |
| add_node | Create and add a node (deprecated; use submit_local). |
| add_node_obj | Add a node to the DAG. |
| add_update_callback | Add a callback for when the DAG status is updated. |
| cancel | Cancel the DAG. |
| compute | Start the DAG by executing root nodes. |
| end_nodes | Find all end nodes. |
| end_results | Get all end results keyed by node ID; blocks until all results are ready. |
| end_results_by_name | Get all end results keyed by node name; blocks until all results are ready. |
| find_end_nodes | Find all end nodes. |
| get_tiledb_plot_node_details | Build the list of node details needed to render the TileDB node graph. |
| initial_setup | Perform one-time server-side setup tasks. |
| register | Register this task graph. |
| report_node_complete | Report a node as complete. |
| retry_all | Retry all failed and cancelled nodes. |
| stats | Get DAG node statistics. |
| submit_array_udf | Submit a function that will be executed serverlessly. |
| submit_local | Submit a function that will run locally. |
| submit_sql | Submit an SQL query to run serverlessly in the cloud. |
| submit_udf | Submit a function that will be executed serverlessly in the cloud. |
| submit_udf_stage | Submit a function that will be executed serverlessly in the cloud, optionally expanded on another node’s output. |
| visualize | Build and render a tree diagram of the DAG. |
| wait | Wait for the DAG to be completed. |

add_done_callback
client.dag.dag.DAG.add_done_callback(func)

Add a callback for when the DAG is completed.

:param func: Function to call when the DAG is completed. The function is passed a reference to this DAG.
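To make the callback contract concrete (the registered function receives the DAG itself, so it can inspect state or results), here is a minimal sketch using a toy stand-in class. `ToyDAG`, its `_finish` method and the `"COMPLETED"` status string are hypothetical illustrations, not part of the client library.

```python
from typing import Callable, List


class ToyDAG:
    """Hypothetical stand-in for DAG, used only to illustrate callback semantics."""

    def __init__(self) -> None:
        self._done_callbacks: List[Callable[["ToyDAG"], None]] = []
        self.status = "NOT_STARTED"  # hypothetical status value

    def add_done_callback(self, func: Callable[["ToyDAG"], None]) -> None:
        # Register a function that receives a reference to this DAG on completion.
        self._done_callbacks.append(func)

    def _finish(self) -> None:
        # Simulate completion: update the status, then notify every registered callback.
        self.status = "COMPLETED"
        for func in self._done_callbacks:
            func(self)


seen: List[str] = []
dag = ToyDAG()
dag.add_done_callback(lambda d: seen.append(d.status))
dag._finish()
print(seen)  # ['COMPLETED']
```

Because the callback gets the DAG object rather than a bare status value, one function can be shared across several DAGs and still tell them apart.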

add_node
client.dag.dag.DAG.add_node(
    func_exec,
    *args,
    name=None,
    local_mode=True,
    **kwargs,
)

Create and add a node.

DEPRECATED. Use submit_local instead.

:param func_exec: Function to execute.
:param args: Arguments for function execution.
:param name: Name of the node.
:return: The Node that is created.

add_node_obj
client.dag.dag.DAG.add_node_obj(node)

Add node to DAG.

:param node: Node to add to the DAG.
:return: The Node instance.

add_update_callback
client.dag.dag.DAG.add_update_callback(func)

Add a callback for when the DAG status is updated.

:param func: Function to call when the DAG status is updated. The function is passed a reference to this DAG.

cancel
client.dag.dag.DAG.cancel()

Cancel DAG.

compute
client.dag.dag.DAG.compute()

Start the DAG by executing root nodes.
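As a rough mental model of "executing root nodes", the sketch below is a toy scheduler: nodes with no parents start immediately, and every other node is released once all of its parents have finished, with a thread pool standing in for `max_workers`. It is an illustration under those assumptions, not the TileDB implementation; `run_dag`, `funcs` and `parents` are hypothetical names, and each function receives its parents’ results as positional arguments.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Dict, List


def run_dag(
    funcs: Dict[str, Callable[..., object]],
    parents: Dict[str, List[str]],
    max_workers: int = 4,
) -> Dict[str, object]:
    """Toy scheduler: run each node once all of its parents have finished."""
    results: Dict[str, object] = {}
    # Unfinished parents per node; root nodes start with an empty set.
    pending = {name: set(parents.get(name, [])) for name in funcs}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        running: dict = {}  # future -> node name

        def launch_ready() -> None:
            # A node is ready when no unfinished parents remain.
            for name, deps in list(pending.items()):
                if not deps:
                    args = [results[p] for p in parents.get(name, [])]
                    running[pool.submit(funcs[name], *args)] = name
                    del pending[name]

        launch_ready()  # starts the root nodes
        while running:
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in done:
                name = running.pop(fut)
                results[name] = fut.result()
                for deps in pending.values():
                    deps.discard(name)  # this parent is now finished
            launch_ready()
    if pending:
        raise ValueError("graph has a cycle or a missing parent")
    return results


out = run_dag(
    {"load": lambda: [1, 2, 3], "total": sum, "count": len,
     "mean": lambda t, c: t / c},
    {"total": ["load"], "count": ["load"], "mean": ["total", "count"]},
)
print(out["mean"])  # 2.0
```

Here `load` is the only root node; `total` and `count` can run in parallel once it finishes, and `mean` waits for both.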

end_nodes
client.dag.dag.DAG.end_nodes()

Find all end nodes.

>>> dag = DAG()
>>> node = dag.submit_local(sum, [1, 2, 3])
>>> end_nodes = dag.end_nodes()

:return: List of end nodes.

end_results
client.dag.dag.DAG.end_results()

Get all end results, keyed by node ID. Blocks until all results are ready.

>>> dag = DAG()
>>> node = dag.submit_local(sum, [1, 2, 3])
>>> dag.compute()
>>> end_results = dag.end_results()

:return: Map of results by node ID.

end_results_by_name
client.dag.dag.DAG.end_results_by_name()

Get all end results, keyed by node name. Blocks until all results are ready.

>>> dag = DAG()
>>> node = dag.submit_local(sum, [1, 2, 3], name="total")
>>> dag.compute()
>>> end_results = dag.end_results_by_name()

:return: Map of results by node name.

find_end_nodes
client.dag.dag.DAG.find_end_nodes()

Find all end nodes.

:return: list of end nodes
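Assuming an end node is one with no children (the leaves of the graph), finding them is a single pass over a node-to-children mapping like the Node `children` attribute. This sketch uses plain dictionaries of hypothetical node ids rather than the client’s Node objects.

```python
from typing import Dict, List, Set


def find_end_nodes(children: Dict[str, Set[str]]) -> List[str]:
    """Return ids of nodes that have no children (the end, or leaf, nodes)."""
    return [node_id for node_id, kids in children.items() if not kids]


# Toy graph: load -> {total, count} -> mean
children = {
    "load": {"total", "count"},
    "total": {"mean"},
    "count": {"mean"},
    "mean": set(),
}
print(find_end_nodes(children))  # ['mean']
```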

get_tiledb_plot_node_details
client.dag.dag.DAG.get_tiledb_plot_node_details()

Build the list of node details needed to render the TileDB node graph.

:return: Node summary

initial_setup
client.dag.dag.DAG.initial_setup()

Performs one-time server-side setup tasks.

Can safely be called multiple times.

register
client.dag.dag.DAG.register(path, *, teamspace=None)

Register this task graph.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str or object | The TileDB path at which the task graph is to be registered. May be a path relative to a teamspace, a Folder or Asset instance, or an absolute “tiledb” URI. If the path to a folder is provided, the name or id of the task graph is appended to form a full asset path. | required |
| teamspace | Teamspace or str | The teamspace to which the task graph is to be registered, specified by object or id. If not provided, the path parameter is queried for a teamspace id. | None |

Raises

| Type | Description |
| --- | --- |
| DAGError | When this task graph cannot be registered. |

Examples
>>> folder = folders.create_folder(
...     "dags",
...     teamspace="teamspace",
...     exist_ok=True,
... )
>>> dag = DAG(name="example1")
>>> dag.register("dags", teamspace="teamspace")

This creates a task graph asset at path “dags/example1” in the teamspace named “teamspace”.

>>> dag = DAG(name="example2")
>>> dag.register(folder)

This creates a DAG asset in “teamspace” at the path “dags/example2”. The DAG’s name has been appended to the folder path.

A DAG can also be registered to a specific absolute “tiledb” URI that specifies a different name.

>>> dag.register("tiledb://workspace/teamspace/dags/example3")

report_node_complete
client.dag.dag.DAG.report_node_complete(node)

Report a node as complete.

:param node: Node to mark as complete.

retry_all
client.dag.dag.DAG.retry_all()

Retries all failed and cancelled nodes.

stats
client.dag.dag.DAG.stats()

Get DAG node statistics.

:return: All node stats.

submit_array_udf
client.dag.dag.DAG.submit_array_udf(
    func,
    *args,
    name=None,
    mode=Mode.REALTIME,
    resource_class=None,
    resources=None,
    storage=None,
    expand_node_output=None,
    _download_results=None,
    _internal_prewrapped_func=None,
    _internal_accepts_stored_params=True,
    **kwargs,
)

Submit a function that will be executed serverlessly.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| func | callable | Function to execute in the UDF task. | required |
| args | tuple | Positional arguments for the function. | () |
| name | str | Human-readable name of the Node task. | None |
| mode | Mode | Mode the Node is to run in. | Mode.REALTIME |
| resource_class | str | Name of a UDF resource class. | None |
| resources | dict | A UDF resource description. | None |
| storage | list of TGUDFStorage | Storage and filesystem mount points for the Node. Realtime-mode Nodes do not support storage. | None |
| expand_node_output | Node | Node whose output (a JSON-encoded list) is expanded, spawning one task per element. | None |
| _download_results | bool | Optional override of the default result-downloading behavior. If True, the results are always downloaded immediately upon completion. If False, they are not downloaded immediately but are downloaded when .result() is called. | None |
| _internal_prewrapped_func | callable | For internal use only. A function that returns something that is already a Result and does not require wrapping. All prewrapped functions are assumed to make server calls. | None |
| _internal_accepts_stored_params | bool | For internal use only. Applies only when _internal_prewrapped_func is used. True if _internal_prewrapped_func can accept stored parameters; False if it cannot, in which case all parameters must be serialized. | True |
| kwargs | dict | Keyword arguments for the function. | {} |

Returns

| Type | Description |
| --- | --- |
| Node | The Node that is created. |

submit_local
client.dag.dag.DAG.submit_local(func, *args, **kwargs)

Submit a function that will run locally.

:param func: Function to execute in the UDF task.
:param args: Positional arguments to pass into Node instantiation.
:param kwargs: Keyword arguments to pass into Node instantiation.
:return: The Node that is created.

submit_sql
client.dag.dag.DAG.submit_sql(sql, *args, **kwargs)

Submit an SQL query to run serverlessly in the cloud.

:param sql: Query to execute.
:param args: Positional arguments to pass into Node instantiation.
:param kwargs: Keyword arguments to pass into Node instantiation.
:return: The Node that is created.

submit_udf
client.dag.dag.DAG.submit_udf(func, *args, **kwargs)

Submit a function that will be executed in the cloud serverlessly.

:param func: Function to execute in the UDF task.
:param args: Positional arguments to pass into Node instantiation.
:param kwargs: Keyword arguments to pass into Node instantiation.
:return: The Node that is created.

submit_udf_stage
client.dag.dag.DAG.submit_udf_stage(
    func,
    *args,
    expand_node_output=None,
    **kwargs,
)

Submit a function that will be executed in the cloud serverlessly.

Expanding on node output means dynamically allocating work to this UDF stage based on the output of the node given in the expand_node_output argument.

For example, suppose a node NodeA returns a list of str values. If NodeA is passed as expand_node_output, and NodeA is also passed as the argument of func that accepts a str, then one node is spawned in parallel for each str value in NodeA’s result.

>>> def list_words():
...     return ["a", "b", "c"]
>>> def shout(str_arg):
...     return str_arg.upper()
>>> graph = DAG()
>>> NodeA = graph.submit_udf(list_words)
>>> NodeB = graph.submit_udf_stage(shout, expand_node_output=NodeA, str_arg=NodeA)
>>> graph.compute()

:param func: Function to execute in the UDF task.
:param args: Positional arguments to pass into Node instantiation.
:param expand_node_output: Node whose output is to be expanded. The output of that node should be a JSON-encoded list.
:param kwargs: Keyword arguments to pass into Node instantiation.
:return: The Node that is created.
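The expansion behaves like a parallel map over the parent’s decoded list. The sketch below models that locally with a thread pool; `expand_stage` is a hypothetical name, and returning the results as a list in element order is an assumption of this toy model, not documented behavior of the service.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def expand_stage(parent_output: str, func: Callable[[Any], Any],
                 max_workers: int = 4) -> List[Any]:
    """Toy model of a UDF stage expanded on a parent node's output.

    parent_output is the parent's JSON-encoded list; func runs once per
    element, in parallel, and results are returned in element order.
    """
    items = json.loads(parent_output)
    if not isinstance(items, list):
        raise TypeError("expand_node_output requires a JSON-encoded list")
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order even though calls run concurrently.
        return list(pool.map(func, items))


node_a_output = json.dumps(["a", "b", "c"])  # what NodeA might return
print(expand_stage(node_a_output, str.upper))  # ['A', 'B', 'C']
```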

visualize
client.dag.dag.DAG.visualize(
    notebook=True,
    auto_update=True,
    force_plotly=False,
)

Build and render a tree diagram of the DAG.

:param notebook: Whether the visualization is inside a Jupyter notebook. If so, a widget is used.
:param auto_update: Whether the diagram is updated automatically with each status change.
:param force_plotly: Force the use of Plotly graphs instead of the TileDB Plot Widget.
:return: The figure.

wait
client.dag.dag.DAG.wait(timeout=None)

Wait for DAG to be completed.

:param timeout: Optional timeout, in seconds, to wait for the DAG to be completed.
:return: None; raises TimeoutError if the timeout expires.

DAGError

client.dag.dag.DAGError()

Raised when a DAG cannot be registered or executed.

Node

client.dag.dag.Node(
    func,
    *args,
    name=None,
    dag=None,
    mode=Mode.REALTIME,
    resource_class=None,
    resources=None,
    storage=None,
    expand_node_output=None,
    _download_results=None,
    _internal_prewrapped_func=None,
    _internal_accepts_stored_params=True,
    **kwargs,
)

Representation of a function to run in a DAG.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| func | callable | Function to run as a UDF task. | required |
| *args | sequence | Positional arguments to pass to the UDF. | () |
| name | str | Human-readable name of the Node task. | None |
| dag | DAG | DAG this node is associated with. | None |
| mode | Mode | Mode the Node is to run in. | Mode.REALTIME |
| resource_class | str | Name of a UDF resource class. | None |
| resources | dict | A UDF resource description. | None |
| storage | list of TGUDFStorage | Storage and filesystem mount points for the Node. Realtime-mode Nodes do not support storage. | None |
| expand_node_output | Node | Node whose output (a JSON-encoded list) is expanded, spawning one task per element. | None |
| _download_results | bool | Optional override of the default result-downloading behavior. If True, the results are always downloaded immediately upon completion. If False, they are not downloaded immediately but are downloaded when .result() is called. | None |
| _internal_prewrapped_func | callable | For internal use only. A function that returns something that is already a Result and does not require wrapping. All prewrapped functions are assumed to make server calls. | None |
| _internal_accepts_stored_params | bool | For internal use only. Applies only when _internal_prewrapped_func is used. True if _internal_prewrapped_func can accept stored parameters; False if it cannot, in which case all parameters must be serialized. | True |
| **kwargs | dict | Keyword arguments to pass to the UDF. | {} |

Raises

| Type | Description |
| --- | --- |
| TypeError | When func or storage is invalid or incompatible with the Node’s mode. |
| ValueError | When the resources and/or resource class are invalid. |

Attributes

| Name | Type | Description |
| --- | --- | --- |
| id | UUID | The Node’s unique id. |
| dag | DAG | The DAG that the Node is attached to. |
| mode | Mode | The Node’s mode (realtime or batch). |
| parents | dict | A mapping of parent node ids to Node instances. |
| children | dict | A mapping of child node ids to Node instances. |
| args | tuple | Positional arguments for the Node’s function. |
| kwargs | dict | Keyword arguments for the Node’s function. |

Methods

| Name | Description |
| --- | --- |
| add_done_callback | Add a callback function to execute at Node completion. |
| cancel | Cancel the Node. |
| cancelled | Whether the Node is cancelled. |
| depends_on | Create a dependency on another node that does not rely on passing results between them. |
| exception | Return the exception raised by the Node, if any. |
| result | Fetch the Node’s return value. |
| retry | Retry the Node. |
| running | Whether the Node is actively running. |
| task_id | Get the server-side task ID of this node. |
| to_registration_json | Convert this node to the form used when registering the graph. |
| wait | Wait for the node to be completed. |

add_done_callback
client.dag.dag.Node.add_done_callback(fn)

Add callback function to execute at Node completion.

:param fn: Callback to execute.

cancel
client.dag.dag.Node.cancel()

Cancel Node.

cancelled
client.dag.dag.Node.cancelled()

Whether Node is cancelled.

depends_on
client.dag.dag.Node.depends_on(node)

Create a dependency chain for this node. This is useful when there is a dependency that does not rely directly on passing results from one node to another.

:param node: Node to mark as a dependency of this node.
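In graph terms, a call such as `notify.depends_on(transform)` adds an ordering edge without any data flowing along it. The sketch below shows the effect on execution order using Python’s standard `graphlib.TopologicalSorter` (Python 3.9+) on plain node names; it does not use the DAG class, and the node names are hypothetical.

```python
from graphlib import TopologicalSorter

# Each key maps a node to the set of nodes it must wait for.
# Data-flow edge: "transform" consumes the result of "extract".
graph = {"transform": {"extract"}}
# depends_on-style edge: "notify" receives no data but must wait for "transform".
graph.setdefault("notify", set()).add("transform")

order = list(TopologicalSorter(graph).static_order())
print(order)  # ['extract', 'transform', 'notify']
```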

exception
client.dag.dag.Node.exception(timeout=None)

Return the exception raised by the Node, if one was raised.

result
client.dag.dag.Node.result(timeout=None)

Fetch the Node’s return value.

:param timeout: Time to wait to fetch the result.
:return: Results of Node processing.

retry
client.dag.dag.Node.retry()

Retry Node.

running
client.dag.dag.Node.running()

Whether Node is actively running.

task_id
client.dag.dag.Node.task_id()

Gets the server-side Task ID of this node.

:return: The server-side task ID, or None if this node has no task ID (because it was run on the client side).

to_registration_json
client.dag.dag.Node.to_registration_json(existing_names)

Converts this node to the form used when registering the graph.

This is the form of the Node that will be used to represent it in the RegisteredTaskGraph object, i.e. a RegisteredTaskGraphNode.

:param existing_names: The set of names that have already been used, so that a duplicate node name is not generated.
:return: Mapping of the Node for registration.
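The `existing_names` parameter implies that duplicate names are resolved while nodes are converted. The client’s actual renaming scheme is not documented here; the sketch below assumes a simple numeric-suffix scheme, and `unique_name` is a hypothetical helper.

```python
from typing import List, Set


def unique_name(base: str, existing_names: Set[str]) -> str:
    """Return base, or the first base-N (N = 2, 3, ...) not in existing_names.

    The chosen name is added to existing_names so later calls skip it.
    """
    candidate, n = base, 1
    while candidate in existing_names:
        n += 1
        candidate = f"{base}-{n}"  # hypothetical suffix scheme
    existing_names.add(candidate)
    return candidate


used: Set[str] = set()
names: List[str] = [unique_name("udf", used) for _ in range(3)]
print(names)  # ['udf', 'udf-2', 'udf-3']
```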

wait
client.dag.dag.Node.wait(timeout=None)

Wait for node to be completed.

:param timeout: Optional timeout, in seconds, to wait for the node to be completed.
:return: None; raises TimeoutError if the timeout expires.

Functions

| Name | Description |
| --- | --- |
| list_logs | Retrieve the list of task graph logs you can view. |
| replace_stored_params | Descend into data structures and replace stored params with results. |
| server_logs | Retrieve the full server-side logs for the given DAG. |

list_logs

client.dag.dag.list_logs(
    workspace=None,
    created_by=None,
    search=None,
    start_time=None,
    end_time=None,
    page=1,
    per_page=10,
)

Retrieves the list of task graph logs you can view.

The returned graph logs are “light” versions, meaning they do not include any details about the execution state of an individual DAG. To retrieve those details, pass the ID to :func:`server_logs`.

:param workspace: If present, include logs for only this workspace. If absent, include logs for all workspaces you have access to.
:param created_by: If present, include only logs from this user.
:param search: A search string for the name of the task graph.
:param start_time: Include logs created after this time.
:param end_time: Include logs created before this time.
:param page: The page number to use, starting from 1.
:param per_page: The number of items per page.
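Because results are paged (`page` starting from 1, `per_page` items each), collecting every log means requesting successive pages. The sketch below assumes a hypothetical `fetch_page(page, per_page)` callable that returns a plain list of entries; the real `list_logs` return type may wrap entries with pagination metadata, so adapt the extraction step accordingly. A fake in-memory backend stands in for the server.

```python
from typing import Callable, Iterator, List


def iter_all_logs(fetch_page: Callable[[int, int], List[dict]],
                  per_page: int = 10) -> Iterator[dict]:
    """Yield every entry by requesting pages 1, 2, ... until a short page arrives."""
    page = 1
    while True:
        entries = fetch_page(page, per_page)
        yield from entries
        if len(entries) < per_page:  # a short (or empty) page means no more results
            return
        page += 1


# Fake backend with 23 log entries, for demonstration only.
FAKE_LOGS = [{"id": i} for i in range(23)]


def fake_fetch(page: int, per_page: int) -> List[dict]:
    start = (page - 1) * per_page  # pages are numbered from 1
    return FAKE_LOGS[start:start + per_page]


all_logs = list(iter_all_logs(fake_fetch))
print(len(all_logs))  # 23
```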

replace_stored_params

client.dag.dag.replace_stored_params(tree, loader)

Descends into data structures and replaces Stored Params with results.
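The descent is a recursive rebuild of nested containers. In the sketch below, `StoredParam` is a hypothetical placeholder class and `loader` is any callable that turns a placeholder into its real value; the function is named `toy_replace_stored_params` to make clear it is an illustration, not the library’s code. Only dicts, lists and tuples are traversed here; other values pass through unchanged.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class StoredParam:
    """Hypothetical placeholder for a value stored server-side (e.g. a node result)."""
    task_id: str


def toy_replace_stored_params(tree: Any, loader: Callable[[StoredParam], Any]) -> Any:
    """Recursively rebuild tree, swapping each StoredParam for loader(param)."""
    if isinstance(tree, StoredParam):
        return loader(tree)
    if isinstance(tree, dict):
        return {k: toy_replace_stored_params(v, loader) for k, v in tree.items()}
    if isinstance(tree, list):
        return [toy_replace_stored_params(v, loader) for v in tree]
    if isinstance(tree, tuple):
        return tuple(toy_replace_stored_params(v, loader) for v in tree)
    return tree  # any other value is left unchanged


results = {"t1": [1, 2], "t2": "ok"}
args = {"x": StoredParam("t1"), "y": (StoredParam("t2"), 3)}
replaced = toy_replace_stored_params(args, lambda p: results[p.task_id])
print(replaced)  # {'x': [1, 2], 'y': ('ok', 3)}
```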

server_logs

client.dag.dag.server_logs(dag_or_id, workspace=None)

Retrieves the full server-side logs for the given DAG.

The DAG can be provided as a DAG object, or as the server-provided UUID of a DAG’s execution log in either :class:`uuid.UUID` or string form. This can be used to access both completed and in-progress DAGs. The returned logs include full execution details, unlike the “light” versions returned by :func:`list_logs`.

Returns None if called with a DAG object that has no server-side nodes.
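Accepting "a DAG object, a UUID, or a UUID string" amounts to normalizing the argument before any lookup. The sketch below assumes a DAG-like object exposes its log ID through the `server_graph_uuid` attribute (None until server-side setup succeeds); `resolve_log_id` is a hypothetical helper, and `SimpleNamespace` stands in for a DAG.

```python
import uuid
from types import SimpleNamespace
from typing import Any, Optional


def resolve_log_id(dag_or_id: Any) -> Optional[uuid.UUID]:
    """Normalize a DAG-like object, UUID, or UUID string to a UUID (or None)."""
    if isinstance(dag_or_id, uuid.UUID):
        return dag_or_id
    if isinstance(dag_or_id, str):
        return uuid.UUID(dag_or_id)  # raises ValueError for malformed strings
    if hasattr(dag_or_id, "server_graph_uuid"):
        # Assumed: None here means the DAG has no server-side log yet.
        return dag_or_id.server_graph_uuid
    raise TypeError(f"expected a DAG, UUID, or str, got {type(dag_or_id).__name__}")


run_id = uuid.UUID("12345678-1234-5678-1234-567812345678")
print(resolve_log_id(str(run_id)) == run_id)  # True
print(resolve_log_id(SimpleNamespace(server_graph_uuid=None)))  # None
```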