# Dynamic workflows
Supported in ADK Python v2.0.0 alpha
The ADK framework provides a programmatic way to define workflows as a more
flexible and powerful alternative to [graph-based workflows](/workflows/).
Using a graph-based approach provides a convenient way to compose multi-step,
static process structures with workflow nodes. However, if the logic path for
your workflow is more complex, with iterative loops or complex branching logic,
a graph-based approach may not suit your needs, or may become too unwieldy to
manage.
Dynamic workflows in ADK let you set aside graph-based path structures and
use the full power of your chosen programming language to build workflows. With
dynamic workflows, you can create workflows with simple decorators, invoke
workflow nodes as functions, and build complex routing logic. Here are some of
the benefits of dynamic workflows in ADK:
- **Flexible Control Flow:** Define execution order dynamically using
loops, conditionals, and recursion, which are difficult or impossible to
represent in static graphs.
- **Programmatic Experience:** Use familiar constructs like `while` loops
and `async/await` instead of graph-based routing.
- **Automatic Checkpointing:** Dynamic workflows track each node
execution. Successful sub-nodes are automatically skipped when resuming the
workflow, making complex logic durable and resumable by default.
- **Encapsulation:** Wrap business logic into *parent* nodes that
internally compose lower-level nodes, keeping the overall workflow graph
clean and manageable.
!!! example "Alpha Release"

    ADK 2.0 is an Alpha release and may cause breaking changes when used with
    prior versions of ADK. Do not use ADK 2.0 if you require backwards
    compatibility, such as in production environments. We encourage you to test
    this release and we welcome your
    [feedback](https://github.com/google/adk-python/issues/new?template=feature_request.md&labels=v2)!
    For information on installing ADK 2.0 to test this feature, see
    [Welcome to ADK 2.0](/2.0/).
## Get started
The following dynamic workflow code example shows how to define a basic
workflow containing a single node with a function:
```python
from typing import Any

from google.adk import Context
from google.adk import Workflow
from google.adk import node

@node(name="hello_node")
def my_node(node_input: Any):
    return "Hello World"

# Define a dynamic workflow node
@node(rerun_on_resume=True)
async def my_workflow(ctx: Context, node_input: str) -> str:
    # run_node executes a node and returns its output
    result = await ctx.run_node(my_node, input_data="hello")
    return result

# Run the workflow
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)
```
This example uses the [***@node***](#node) annotation for convenience and to
keep the written code as simple as possible. This annotation generates wrappers
that allow the code to be run in the context of an ADK dynamic workflow.
## Building blocks: nodes and workflows
Nodes and workflows represent the basic building blocks of ADK's dynamic
workflows. These classes provide the functionality required to wrap your code so
it can be integrated into code-based workflows in ADK.
### Nodes and @node {#node}
A dynamic workflow in ADK is composed of *nodes*, which are classes derived
from ***BaseNode***. A simple version of a usable workflow node is a
***FunctionNode***, which allows you to wrap code with functionality required to
run within a ***Workflow***. For convenience, the ADK framework provides the
***@node*** annotation which generates the node wrapper, keeping boilerplate
wrapper code to a minimum:
```python
@node(name="hello_node")
def my_function_node(node_input: Any):
    return "Hello World"
```
The following code snippet shows the equivalent code *without* the
***@node*** annotation:
```python
# Base function
def my_function_node(node_input: Any):
    return "Hello World"

# FunctionNode wrapper with options
success_node = FunctionNode(
    my_function_node,
    name="hello",
    rerun_on_resume=True,
)
```
Creating the node wrapper code yourself can be useful if you are wrapping
functions from an external library, need to create multiple nodes from the same
function with different configurations, or if you are managing node references
in a registry for advanced orchestration.
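For example, the multiple-configuration and registry cases can be sketched with
a minimal stand-in class. Note that `SimpleFunctionNode` below is illustrative
only, not the real ADK ***FunctionNode***; it exists so the sketch is
self-contained:

```python
from dataclasses import dataclass
from typing import Any, Callable

# Minimal stand-in for a function-wrapping node (illustrative, not ADK's class)
@dataclass
class SimpleFunctionNode:
    func: Callable[[Any], Any]
    name: str
    rerun_on_resume: bool = False

    def run(self, node_input: Any) -> Any:
        return self.func(node_input)

def greet(node_input: Any) -> str:
    return f"Hello {node_input}"

# Two nodes built from the same function with different configurations
fast_greet = SimpleFunctionNode(greet, name="greet_fast")
durable_greet = SimpleFunctionNode(greet, name="greet_durable", rerun_on_resume=True)

# A registry keyed by node name, for advanced orchestration
registry = {n.name: n for n in (fast_greet, durable_greet)}

print(registry["greet_fast"].run("World"))  # Hello World
```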
### Workflows
In an ADK dynamic workflow, you use the ***Workflow*** class as the primary
container for orchestrating nodes. You define a dynamic workflow with a node
whose code manages running other nodes and the execution logic (order and
paths) for those nodes, as shown in the following code sample:
```python
@node(rerun_on_resume=True)
async def my_workflow(ctx):
    # run_node executes a node and returns its output
    result = await ctx.run_node(my_function_node, input_data="Hello")
    result_formatted = await ctx.run_node(my_formatting_node, input_data=result)
    return result_formatted

# Run the workflow
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)
```
## Data handling
When you use dynamic workflows with ADK, passing data is simpler than with
[graph-based workflows](/workflows/) because the ***Context*** class's
***run_node()*** method returns the node's output directly. This eliminates the
need to directly handle session state or complex routing outputs for data
transfer. The following code example shows how you can pass string data between
an agent node and a function node:
```python
from google.adk import Context

@node(rerun_on_resume=True)
async def editorial_workflow(ctx: Context, user_request: str):
    # Agent node generates output
    raw_draft = await ctx.run_node(draft_agent, user_request)
    # Function node formats text
    formatted_text = await ctx.run_node(format_function_node, raw_draft)
    return formatted_text
```
You can also pass specific data schemas using defined classes and configure
input and output schemas, similar to graph-based workflow nodes, as shown in
the following code example:
```python
from google.adk import Agent
from google.adk import Context
from pydantic import BaseModel

class CityTime(BaseModel):
    time_info: str  # time information
    city: str  # city name

@node
def city_time_function(city: str):
    """Simulate returning the current time in a specified city."""
    return CityTime(time_info="10:10 AM", city=city)

city_report_agent = Agent(
    name="city_report_agent",
    model="gemini-2.5-flash",
    input_schema=CityTime,
    instruction="""Output the data provided by the previous node.""",
)

@node  # workflow node
async def city_workflow(ctx: Context):
    city_time = await ctx.run_node(city_time_function, "Paris")
    report_text = await ctx.run_node(city_report_agent, city_time)
    return report_text
```
For more information on data handling between workflow nodes, see
[Data handling for agent workflows](/workflows/data-handling/).
## Workflow routes
Dynamic workflows in ADK provide more flexibility in terms of routing logic
compared to [graph-based workflows](/workflows/), including
iterative loops or more complex branching logic. This section describes some of
the techniques that you can use for routing.
### Sequence route
You can create sequential task processing with dynamic workflows in ADK, just
as you can with graph-based workflows. The following code snippet shows a
dynamic workflow with an agent, a function node, and a second agent:
```python
@node  # workflow node
async def city_workflow(ctx: Context):
    city = await ctx.run_node(city_generator_agent)
    city_time = await ctx.run_node(city_time_function, city)
    report_text = await ctx.run_node(city_report_agent, city_time)
    return report_text
```
### Loop route
For workflows where you want to use an iterative loop for a task, dynamic
workflows offer much more flexibility to define the routing logic you need. The
following code example shows how to use dynamic workflows to construct a
workflow loop for generating, reviewing, and updating code:
```python
coder_agent = LlmAgent(
    name="coder_agent",
    model="gemini-2.5-flash",
    instruction="Write python code for user request.",
    output_schema=str,
)

compile_lint_check = ApiNode(name="lint_reviewer")

fixer_agent = LlmAgent(
    name="fixer_agent",
    model="gemini-2.5-flash",
    instruction="""Refactor current code {code}.
    Based on compile & lint review: {findings}""",
    output_schema=str,
)

@node  # workflow node
async def code_workflow(ctx):
    code = await ctx.run_node(coder_agent)
    check_resp = await ctx.run_node(compile_lint_check, code)
    while check_resp.findings:
        # Publish the current code and findings to session state so the
        # fixer_agent instruction can reference {code} and {findings}
        yield Event(state={"code": code, "findings": check_resp.findings})
        code = await ctx.run_node(fixer_agent)
        check_resp = await ctx.run_node(compile_lint_check, code)
    # An async generator cannot return a value; yield the final code instead
    yield code
```
### Parallel execution routes
Dynamic workflows in ADK can support parallel execution, and you can use
standard asynchronous libraries, such as `asyncio`, to build this
functionality. The following code example shows how to build a workflow node
that supports parallel execution, which can then be integrated into a larger
workflow:
```python
from typing import Any
import asyncio

from google.adk import Context
from google.adk.workflow import BaseNode

class ParallelNode(BaseNode):
    """A supervisor node that runs a worker node in parallel."""

    real_node: BaseNode

    async def run(self, ctx: Context, node_input: list[Any]):
        tasks = []
        # Dynamically schedule worker nodes for each item in the input list
        for item in node_input:
            # ctx.run_node returns an awaitable future for the ephemeral node
            tasks.append(ctx.run_node(self.real_node, item))
        # Use asyncio to gather results in parallel
        results = await asyncio.gather(*tasks)
        return results
```
!!! tip "Tip: Resuming parallel nodes"

    The workflow framework ensures that if a dynamic workflow is resumed, only
    failed or interrupted worker nodes are re-executed, including parallel
    worker nodes.
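The skip-on-resume behavior can be illustrated with a self-contained sketch.
The checkpoint table and `run_node` helper below are conceptual stand-ins for
how a framework might skip already-completed work, not ADK's actual
implementation:

```python
import asyncio

# Conceptual sketch only: completed node results keyed by execution ID
checkpoints: dict[str, str] = {"fetch/0": "cached result"}

async def run_node(execution_id: str, work):
    # A node that already succeeded is skipped on resume
    if execution_id in checkpoints:
        return checkpoints[execution_id]
    result = await work()
    checkpoints[execution_id] = result
    return result

async def fetch():
    return "fresh result"

async def main():
    # "fetch/0" resumes from its checkpoint; "fetch/1" actually runs
    first = await run_node("fetch/0", fetch)
    second = await run_node("fetch/1", fetch)
    return first, second

first, second = asyncio.run(main())
print(first, second)  # cached result fresh result
```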
## Human input
Dynamic workflows in ADK can also include human input, or human-in-the-loop
(HITL), steps. You build human input into workflows by creating a ***BaseNode***
subclass that interrupts the workflow, combined with a ***RequestInput***
instance for presenting a request to the user and retrieving the response. The
following code example shows how to build a human input node and include it in a
workflow:
```python
from typing import Any, AsyncGenerator

from google.adk import Context
from google.adk.events import RequestInput
from google.adk.workflow import BaseNode

class GetInput(BaseNode):
    """A node that pauses execution and waits for human input."""

    rerun_on_resume = False  # Ensure the response is yielded as output on resume

    def __init__(self, request: RequestInput, name: str):
        self.request = request
        self.name = name

    def get_name(self) -> str:
        return self.name

    async def run(self) -> AsyncGenerator[Any, None]:
        # Yielding the request tells the workflow to pause and wait for input
        yield self.request

@node
async def approval_process_node(ctx: Context, node_input: Any):
    """A parent node that coordinates a human approval step."""
    # Define the request for the user
    request = RequestInput(message="Please approve this request (Yes/No)")
    # Invoke the HITL node dynamically. The workflow pauses here.
    user_response = await ctx.run_node(GetInput(request, name="approval_step"))
    if user_response.lower() == "yes":
        return "Request Approved"
    else:
        return "Request Denied"
```
## Advanced features
Dynamic workflows offer some advanced features designed to handle more complex
development scenarios. These capabilities allow for finer control over execution
and better integration with existing technical infrastructure.
### Execution IDs
The ADK framework generates a deterministic identifier (ID) for child node
executions based on the parent ID and a counter. ADK workflows use deterministic
IDs for each scheduled node to identify previous results. These IDs are
generated based on the order of dynamic node schedules, and are used for
checkpointing and to re-run tasks in the correct order in the case of a resumed
or re-run workflow.
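As a conceptual illustration, a parent-ID-plus-counter scheme can be sketched as
follows. The ID format and `next_execution_id` helper below are illustrative
only, not ADK's actual internals:

```python
from collections import defaultdict

# Illustrative only: one counter per parent, incremented per scheduled child
counters: dict[str, int] = defaultdict(int)

def next_execution_id(parent_id: str) -> str:
    child_id = f"{parent_id}/{counters[parent_id]}"
    counters[parent_id] += 1
    return child_id

# Re-running the same schedule order reproduces the same IDs, which is
# what lets a resumed workflow match scheduled nodes to prior results
assert next_execution_id("my_workflow") == "my_workflow/0"
assert next_execution_id("my_workflow") == "my_workflow/1"
```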
#### Custom execution IDs
In some rare cases, you may need to have stable identifiers, such as when
processing a reorderable list, you can supply a custom ID when running a node.
In general, you should avoid this due to the impacts to workflow task retries
and process resumes. Specifically, these IDs are used to check node states and
skip execution if a node was already run. If you provide custom IDs, make sure
they are deterministic for workflow re-runs and logically remain the same for
the input. The following example code shows how to add such an identifier when
executing node in a workflow:
!!! warning "Warning: Custom execution IDs"

    Avoid creating custom execution IDs. Since execution IDs are used to
    determine the execution order of nodes, custom execution IDs can cause
    problems when the system attempts to re-run those nodes in your workflow.
```python
class Order(BaseModel):
    order_id: str
    cart_items: list[Product]

@node
async def process_orders_workflow(ctx, node_input: str):
    orders = await get_orders()
    process_tasks = []
    for order in orders:
        # Pass a stable, custom execution ID for each scheduled node
        task = ctx.run_node(process_order, order, name=order.order_id)
        process_tasks.append(task)
    result = await asyncio.gather(*process_tasks)
    yield result
```