dag.dag

cloud.dag.dag

Classes

Name Description
DAG Low-level API for creating and managing directed acyclic graphs
Node Representation of a function to run in a DAG.

DAG

cloud.dag.dag.DAG(
    self
    max_workers=None
    use_processes=False
    done_callback=None
    update_callback=None
    namespace=None
    name=None
    mode=Mode.REALTIME
    retry_strategy=None
    workflow_retry_strategy=None
    deadline=None
)

Low-level API for creating and managing directed acyclic graphs as TileDB Cloud Task Graphs.

Parameters

Name Type Description Default
max_workers Optional[int] Number of workers to allocate to execute DAG. None
use_processes bool If true will use processes instead of threads. False
done_callback Optional[Callable[[DAG], None]] Optional callback function to call when the DAG is completed. The function will be passed a reference to this DAG. None
update_callback Optional[Callable[[DAG], None]] Optional callback function to call when the DAG status is updated. The function will be passed a reference to this DAG. None
namespace Optional[str] Namespace to execute DAG in. None
name Optional[str] Human-readable name for DAG to be shown in Task Graph logs. None
mode Mode Mode the DAG is to run in, valid options are: Mode.REALTIME, Mode.BATCH. Mode.REALTIME
retry_strategy Optional[models.RetryStrategy] K8S retry policy to be applied to each Node. None
workflow_retry_strategy Optional[models.RetryStrategy] K8S retry policy to be applied to DAG. None
deadline Optional[int] Duration (sec) DAG allowed to execute before timeout. None
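
As a rough illustration of what `max_workers` and `use_processes` control, the sketch below picks a standard-library executor the way a local scheduler might. The helper names `pick_executor_class` and `run_locally` are hypothetical, and the mapping onto `concurrent.futures` is an assumption for illustration, not a description of the library's internals.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Any, Callable, List, Optional, Sequence, Type


def pick_executor_class(use_processes: bool = False) -> Type[Executor]:
    """Hypothetical helper: processes when use_processes is true, else threads."""
    return ProcessPoolExecutor if use_processes else ThreadPoolExecutor


def run_locally(
    funcs: Sequence[Callable[[], Any]],
    max_workers: Optional[int] = None,
    use_processes: bool = False,
) -> List[Any]:
    """Run zero-argument callables on a pool sized by max_workers.

    With use_processes=True the callables must be picklable
    (module-level functions, not lambdas).
    """
    pool_cls = pick_executor_class(use_processes)
    with pool_cls(max_workers=max_workers) as pool:
        futures = [pool.submit(f) for f in funcs]
        # Collect results in submission order.
        return [f.result() for f in futures]
```

Threads are usually enough when nodes mostly wait on server calls; processes may help when local nodes are CPU-bound.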

Attributes

Name Description
cancelled_nodes Cancelled Nodes.
completed_nodes Completed Nodes.
deadline Duration (sec) DAG allowed to execute before timeout.
failed_nodes Failed Nodes.
id UUID for DAG instance.
max_workers Number of workers to allocate to execute DAG.
mode Mode the DAG is to run in.
name Human-readable name for DAG to be shown in Task Graph logs.
namespace Namespace to execute DAG in.
nodes Mapping of Node UUIDs to Node instances.
nodes_by_name Mapping of Node names to Node instances.
not_started_nodes Queued Nodes.
retry_strategy K8S retry policy to be applied to each Node.
running_nodes Running Nodes.
server_graph_uuid The server-generated UUID of this graph, used for logging.
status Get DAG status.
use_processes Flag. If true will use processes instead of threads.
visualization The following executors are initialized by calling
workflow_retry_strategy K8S retry policy to be applied to DAG.

Methods

Name Description
add_done_callback Add a callback for when DAG is completed
add_node Create and add a node.
add_node_obj Add node to DAG.
add_update_callback Add a callback for when DAG status is updated
cancel Cancel DAG.
compute Start the DAG by executing root nodes.
end_nodes Find all end nodes
end_results Get all end results, will block if all results are not ready
end_results_by_name Get all end results, will block if all results are not ready
find_end_nodes Find all end nodes.
get_tiledb_plot_node_details Build list of details needed for tiledb node graph
initial_setup Performs one-time server-side setup tasks.
report_node_complete Report a node as complete.
retry_all Retries all failed and cancelled nodes.
stats Get DAG node statistics.
submit_array_udf Submit a function that will be executed in the cloud serverlessly.
submit_local Submit a function that will run locally.
submit_sql Submit a sql query to run serverlessly in the cloud.
submit_udf Submit a function that will be executed in the cloud serverlessly.
submit_udf_stage Submit a function that will be executed in the cloud serverlessly.
visualize Build and render a tree diagram of the DAG.
wait Wait for DAG to be completed.
add_done_callback
cloud.dag.dag.DAG.add_done_callback(func)

Add a callback for when DAG is completed

Parameters
Name Type Description Default
func Function to call when DAG is completed. The function will be passed a reference to this DAG. required
add_node
cloud.dag.dag.DAG.add_node(
    func_exec
    *args
    name=None
    local_mode=True
    **kwargs
)

Create and add a node.

DEPRECATED. Use submit_local instead.

Parameters
Name Type Description Default
func_exec function to execute required
args arguments for function execution ()
name name None
Returns
Name Type Description
Node that is created
add_node_obj
cloud.dag.dag.DAG.add_node_obj(node)

Add node to DAG.

Parameters
Name Type Description Default
node to add to dag required
Returns
Name Type Description
Node Node instance.
add_update_callback
cloud.dag.dag.DAG.add_update_callback(func)

Add a callback for when DAG status is updated

Parameters
Name Type Description Default
func Function to call when DAG status is updated. The function will be passed a reference to this DAG. required
cancel
cloud.dag.dag.DAG.cancel()

Cancel DAG.

compute
cloud.dag.dag.DAG.compute()

Start the DAG by executing root nodes.

end_nodes
cloud.dag.dag.DAG.end_nodes()

Find all end nodes

dag = DAG()
dag.add_node(Node())

end_nodes = dag.end_nodes()

Returns
Name Type Description
list of end nodes
end_results
cloud.dag.dag.DAG.end_results()

Get all end results, will block if all results are not ready

dag = DAG()
dag.add_node(Node())

end_results = dag.end_results()

Returns
Name Type Description
map of results by node ID
end_results_by_name
cloud.dag.dag.DAG.end_results_by_name()

Get all end results, will block if all results are not ready

dag = DAG()
dag.add_node(Node())

end_results = dag.end_results_by_name()

Returns
Name Type Description
map of results by node name
find_end_nodes
cloud.dag.dag.DAG.find_end_nodes()

Find all end nodes.

Returns
Name Type Description
List[Node] list of end nodes
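
To make the terminology concrete, the sketch below treats an end node as one that no other node depends on (it has no children) and a root node as one with no parents. That reading is an assumption based on the descriptions above; the adjacency-dict representation and the function names are hypothetical and unrelated to the library's own data structures.

```python
from typing import Dict, List, Set


def find_end_nodes(children: Dict[str, Set[str]]) -> List[str]:
    """Return the names of nodes that have no children, sorted."""
    return sorted(name for name, kids in children.items() if not kids)


def find_root_nodes(children: Dict[str, Set[str]]) -> List[str]:
    """Return the names of nodes that are nobody's child, sorted."""
    has_parent: Set[str] = set().union(*children.values())
    return sorted(name for name in children if name not in has_parent)


# Hypothetical graph: load -> clean -> {report, plot}
graph = {
    "load": {"clean"},
    "clean": {"report", "plot"},
    "report": set(),
    "plot": set(),
}

end_nodes = find_end_nodes(graph)    # ['plot', 'report']
root_nodes = find_root_nodes(graph)  # ['load']
```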
get_tiledb_plot_node_details
cloud.dag.dag.DAG.get_tiledb_plot_node_details()

Build list of details needed for tiledb node graph

Returns
Name Type Description
Dict[str, Dict[str, str]] Node summary
initial_setup
cloud.dag.dag.DAG.initial_setup()

Performs one-time server-side setup tasks.

Can safely be called multiple times.

report_node_complete
cloud.dag.dag.DAG.report_node_complete(node)

Report a node as complete.

Parameters
Name Type Description Default
node Node Node to mark as complete. required
retry_all
cloud.dag.dag.DAG.retry_all()

Retries all failed and cancelled nodes.

stats
cloud.dag.dag.DAG.stats()

Get DAG node statistics.

Returns
Name Type Description
Dict[str, Union[int, float]] All node stats.
submit_array_udf
cloud.dag.dag.DAG.submit_array_udf(func, *args, **kwargs)

Submit a function that will be executed in the cloud serverlessly.

Parameters
Name Type Description Default
func Callable Function to execute in UDF task. required
*args Any Positional arguments to pass into Node instantiation. ()
**kwargs Any Keyword args to pass into Node instantiation. {}
Returns
Name Type Description
Node that is created.
submit_local
cloud.dag.dag.DAG.submit_local(func, *args, **kwargs)

Submit a function that will run locally.

Parameters
Name Type Description Default
func Callable Function to execute in UDF task. required
*args Any Positional arguments to pass into Node instantiation. ()
**kwargs Keyword args to pass into Node instantiation. {}
Returns
Name Type Description
Node that is created
submit_sql
cloud.dag.dag.DAG.submit_sql(*args, **kwargs)

Submit a sql query to run serverlessly in the cloud.

Parameters
Name Type Description Default
sql Query to execute. required
*args Any Positional arguments to pass into Node instantiation. ()
**kwargs Any Keyword args to pass into Node instantiation. {}
Returns
Name Type Description
Node Node that is created
submit_udf
cloud.dag.dag.DAG.submit_udf(func, *args, **kwargs)

Submit a function that will be executed in the cloud serverlessly.

Parameters
Name Type Description Default
func Callable Function to execute in UDF task. required
*args Positional arguments to pass into Node instantiation. ()
**kwargs Keyword args to pass into Node instantiation. {}
Returns
Name Type Description
Node that is created.
submit_udf_stage
cloud.dag.dag.DAG.submit_udf_stage(
    func
    *args
    expand_node_output=None
    **kwargs
)

Submit a function that will be executed in the cloud serverlessly.

Expanding on node output means dynamically allocating work to this UDF stage based on the output of the node given in the expand_node_output arg.

For example, suppose a node NodeA (NodeA = DAG.submit(...)) returns a list of str values. If NodeA is passed as expand_node_output, and NodeA is also passed as the argument of func that accepts a str, then one node is spawned in parallel for each str value in the result of NodeA.

graph = DAG(...)

NodeA = graph.submit()

NodeB = graph.submit_udf_stage(..., expand_node_output=NodeA, str_arg=NodeA)
Parameters
Name Type Description Default
func Callable Function to execute in UDF task. required
*args Any Positional arguments to pass into Node instantiation. ()
expand_node_output Optional[Node] Node that we want to expand the output of. The output of the node should be a JSON encoded list. None
**kwargs Any Keyword args to pass into Node instantiation. {}
Returns
Name Type Description
Node Node that is created.
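
The fan-out described for `expand_node_output` can be sketched locally as follows. This is only an analogy under stated assumptions: the upstream output is taken to be a JSON-encoded list, as the parameter description requires, and each element is handed to its own parallel call. The function name `expand_stage` is hypothetical, and a thread pool stands in for the server-side nodes.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def expand_stage(func: Callable[[Any], Any], upstream_output: str) -> List[Any]:
    """Call func once per element of a JSON-encoded list, in parallel."""
    items = json.loads(upstream_output)
    if not isinstance(items, list):
        raise TypeError("upstream output must be a JSON-encoded list")
    with ThreadPoolExecutor() as pool:
        # pool.map preserves the order of the input items.
        return list(pool.map(func, items))


# Three elements upstream means three parallel calls downstream.
results = expand_stage(str.upper, '["a", "b", "c"]')
```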
visualize
cloud.dag.dag.DAG.visualize(notebook=True, auto_update=True, force_plotly=False)

Build and render a tree diagram of the DAG.

Parameters
Name Type Description Default
notebook Is the visualization inside a Jupyter notebook? If so we’ll use a widget. True
auto_update Should the diagram be auto updated with each status change. True
force_plotly Force the use of plotly graphs instead of TileDB Plot Widget. False
Returns
Name Type Description
returns figure.
wait
cloud.dag.dag.DAG.wait(timeout=None)

Wait for DAG to be completed.

Parameters
Name Type Description Default
timeout Optional[float] optional timeout in seconds to wait for DAG to be completed None
Returns
Name Type Description
None None or raises TimeoutError if timeout occurs
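
The timeout contract above (return None on completion, raise TimeoutError otherwise) can be illustrated with a standard-library event. This is a minimal sketch; the function name `wait_until_done` and the use of `threading.Event` as the completion signal are assumptions, not the library's implementation.

```python
import threading
from typing import Optional


def wait_until_done(done: threading.Event, timeout: Optional[float] = None) -> None:
    """Block until `done` is set; raise TimeoutError if the timeout expires first.

    A timeout of None waits indefinitely.
    """
    if not done.wait(timeout):
        raise TimeoutError(f"not completed within {timeout} seconds")
```

Callers that want to cancel on timeout can catch TimeoutError and then call cancel().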

Node

cloud.dag.dag.Node(
    self
    func
    *args
    name=None
    dag=None
    mode=Mode.REALTIME
    expand_node_output=None
    _download_results=None
    _internal_prewrapped_func=None
    _internal_accepts_stored_params=True
    **kwargs
)

Representation of a function to run in a DAG.

Parameters

Name Type Description Default
func Callable[…, _T] Function to run as UDF task. required
*args Any Positional arguments to pass to UDF. ()
name Optional[str] Human-readable name of Node task. None
dag Optional[DAG] DAG this node is associated with. None
mode Mode Mode the Node is to run in. Mode.REALTIME
expand_node_output Optional[Node] Node to expand processes upon. None
_download_results Optional[bool] An optional boolean to override default result-downloading behavior. If True, will always download the results of the function immediately upon completion. If False, will not download the results of the function immediately, but will be downloaded when .result() is called. None
_internal_prewrapped_func Callable[…, results.Result[_T]] For internal use only. A function that returns something that is already a Result, which does not require wrapping. We assume that all prewrapped functions make server calls. None
_internal_accepts_stored_params bool For internal use only. Applies only when _internal_prewrapped_func is used. True if _internal_prewrapped_func can accept stored parameters. False if it cannot, and all parameters must be serialized. True
**kwargs Any Keyword arguments to pass to UDF. {}

Attributes

Name Description
args Positional args to pass into UDF.
children Child Nodes (Nodes dependent on this Node).
dag DAG this Node is pinned to.
error Return Node error if encountered.
future Returns something that pretends to be a Future.
id UUID for Node instance.
kwargs Keyword args to pass into UDF.
mode Processing mode of Node.
name The human-readable name of Node.
parents Parent Nodes (Nodes this Node is dependent on).
status Node status.

Methods

Name Description
add_done_callback Add callback function to execute at Node completion.
cancel Cancel Node.
cancelled Whether Node is cancelled.
depends_on Create dependency chain for node, useful when there is a dependency that does not rely directly on passing results from one to another.
exception Return exception if one was raised.
result Fetch Node return.
retry Retry Node.
running Whether Node is actively running.
task_id Gets the server-side Task ID of this node.
wait Wait for node to be completed.
add_done_callback
cloud.dag.dag.Node.add_done_callback(fn)

Add callback function to execute at Node completion.

Parameters
Name Type Description Default
fn Callable[[Node[_T]], None] Callback to execute. required
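
The callback signature takes the completed Node itself, which resembles how `concurrent.futures.Future.add_done_callback` passes the future to its callback. The sketch below uses the standard-library Future only as an analogy; whether Node invokes callbacks at exactly the same moments is an assumption.

```python
from concurrent.futures import Future
from typing import List

completed: List[int] = []


def on_done(fut: Future) -> None:
    """Receives the finished object itself and reads its result."""
    completed.append(fut.result())


fut: Future = Future()
fut.add_done_callback(on_done)
# Setting the result triggers the callback immediately in this thread.
fut.set_result(42)
```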
cancel
cloud.dag.dag.Node.cancel()

Cancel Node.

cancelled
cloud.dag.dag.Node.cancelled()

Whether Node is cancelled.

depends_on
cloud.dag.dag.Node.depends_on(node)

Create dependency chain for node, useful when there is a dependency that does not rely directly on passing results from one to another.

Parameters
Name Type Description Default
node Node node to mark as a dependency of this node required
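
A dependency without a passed result typically arises from side effects, for example one task writing data that another later reads. The sketch below shows the resulting ordering with the standard-library `graphlib` module (Python 3.9+). The task names are hypothetical, and the code illustrates the ordering idea only, not how the library schedules nodes.

```python
from graphlib import TopologicalSorter

ts: TopologicalSorter = TopologicalSorter()
ts.add("create_array")            # no dependencies
ts.add("ingest", "create_array")  # ingest must run after create_array
ts.add("report", "ingest")        # report must run after ingest

# No data flows between the tasks; only the execution order is constrained.
order = list(ts.static_order())
```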
exception
cloud.dag.dag.Node.exception(timeout=None)

Return exception if one was raised.

result
cloud.dag.dag.Node.result(timeout=None)

Fetch Node return.

Parameters
Name Type Description Default
timeout Optional[float] Time to wait to fetch result. None
Returns
Name Type Description
_T Results of Node processing.
retry
cloud.dag.dag.Node.retry()

Retry Node.

running
cloud.dag.dag.Node.running()

Whether Node is actively running.

task_id
cloud.dag.dag.Node.task_id()

Gets the server-side Task ID of this node.

Returns
Name Type Description
Optional[uuid.UUID] None if this has no task ID (as it was run on the client side).
wait
cloud.dag.dag.Node.wait(timeout=None)

Wait for node to be completed.

Parameters
Name Type Description Default
timeout Optional[float] optional timeout in seconds to wait for Node to be completed. None
Returns
Name Type Description
None None or raises TimeoutError if timeout occurs.

Functions

Name Description
list_logs Retrieves the list of task graph logs you can view.
replace_stored_params Descends into data structures and replaces Stored Params with results.
server_logs Retrieves the full server-side logs for the given DAG.

list_logs

cloud.dag.dag.list_logs(
    namespace=None
    created_by=None
    search=None
    start_time=None
    end_time=None
    page=1
    per_page=10
)

Retrieves the list of task graph logs you can view.

The returned graph logs will be “light” versions, meaning they will not include any details about the execution state of an individual DAG. To retrieve those, pass the ID to server_logs.

Parameters

Name Type Description Default
namespace Optional[str] If present, include logs for only this namespace. If absent, include logs for all namespaces you have access to. None
created_by Optional[str] Include only logs from this user (if present). None
search Optional[str] A search string for the name of the task graph. None
start_time Optional[datetime.datetime] Include logs created after this time. None
end_time Optional[datetime.datetime] Include logs created before this time. None
page int The page number to use, starting from 1. 1
per_page int The number of items per page. 10
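
The `page` and `per_page` parameters follow the usual 1-based pagination convention. The sketch below applies that convention to an in-memory sequence to show which items a given page would cover. The helper `paginate` is hypothetical; the actual paging happens on the server.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], page: int = 1, per_page: int = 10) -> List[T]:
    """Return the slice of items for a 1-based page number."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be at least 1")
    start = (page - 1) * per_page
    return list(items[start:start + per_page])


# With 25 items and the defaults, page 3 holds the last 5 items.
last_page = paginate(range(25), page=3)
```

A page past the end yields an empty list, which is a convenient stopping condition when looping over pages.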

replace_stored_params

cloud.dag.dag.replace_stored_params(tree, loader)

Descends into data structures and replaces Stored Params with results.
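
The general technique, a recursive walk that swaps placeholders for loaded values, can be sketched as below. Everything here is hypothetical: the `StoredParam` dataclass, the `replace_placeholders` name, and the set of container types handled are assumptions for illustration and do not reflect the library's actual types.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class StoredParam:
    """Hypothetical placeholder for a result stored on the server."""
    key: str


def replace_placeholders(tree: Any, loader: Callable[[StoredParam], Any]) -> Any:
    """Return a copy of tree with every StoredParam replaced by loader(param)."""
    if isinstance(tree, StoredParam):
        return loader(tree)
    if isinstance(tree, dict):
        return {k: replace_placeholders(v, loader) for k, v in tree.items()}
    if isinstance(tree, (list, tuple)):
        # Rebuild the same container type (plain lists and tuples only).
        return type(tree)(replace_placeholders(v, loader) for v in tree)
    return tree  # leaf values pass through unchanged


stored = {"x": 10, "y": 20}
tree = {"a": [1, StoredParam("x")], "b": (StoredParam("y"), "z")}
resolved = replace_placeholders(tree, lambda p: stored[p.key])
```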

server_logs

cloud.dag.dag.server_logs(dag_or_id, namespace=None)

Retrieves the full server-side logs for the given DAG.

The DAG can be provided as a DAG object, or the server-provided UUID of a DAG’s execution log in either uuid.UUID or string form. This can be used to access both completed DAGs and in-progress DAGs. The returned DAGs will include full data

Will return None if called with a DAG object that has no server-side nodes.