Skip to content
39 changes: 39 additions & 0 deletions contributing/samples/streaming_tools_non_live_agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Streaming Tools Non-Live Agent

This agent demonstrates streaming tools in non-live mode (run_async/SSE).

## Features

- **monitor_stock_price**: Monitors stock prices with real-time updates
- **process_large_dataset**: Processes datasets with progress updates
- **monitor_system_health**: Monitors system health metrics continuously

## Testing

### With ADK Web UI

```bash
cd contributing/samples
adk web .
```

Then try:
- "Monitor the stock price for AAPL"
- "Process a large dataset at /tmp/data.csv"
- "Monitor system health"

### With ADK CLI

```bash
cd contributing/samples/streaming_tools_non_live_agent
adk run .
```

### With API Server (SSE)

```bash
cd contributing/samples
adk api_server .
```

Then send a POST request to `/run_sse` with `streaming: true` to see intermediate Events.
15 changes: 15 additions & 0 deletions contributing/samples/streaming_tools_non_live_agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import agent
128 changes: 128 additions & 0 deletions contributing/samples/streaming_tools_non_live_agent/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example agent demonstrating streaming tools in non-live mode (run_async/SSE).

This agent shows how to use streaming tools that yield intermediate results
in non-live mode. Streaming tools work with both run_async and SSE endpoints.
"""

from __future__ import annotations

import asyncio
from typing import AsyncGenerator

from google.adk.agents import Agent


async def monitor_stock_price(symbol: str) -> AsyncGenerator[dict, None]:
"""Monitor stock price with real-time updates.

This is a streaming tool that yields intermediate results as the stock
price changes. The agent can react to these intermediate results.

Args:
symbol: The stock symbol to monitor (e.g., 'AAPL', 'GOOGL').

Yields:
Dictionary containing stock price updates with status indicators.
"""
# Simulate stock price changes
prices = [100, 105, 110, 108, 112, 115]
for i, price in enumerate(prices):
await asyncio.sleep(1) # Simulate real-time updates
yield {
'symbol': symbol,
'price': price,
'update': i + 1,
'status': 'streaming' if i < len(prices) - 1 else 'complete',
}


async def process_large_dataset(file_path: str) -> AsyncGenerator[dict, None]:
"""Process dataset with progress updates.

This streaming tool demonstrates how to provide progress feedback
for long-running operations.

Args:
file_path: Path to the dataset file to process.

Yields:
Dictionary containing progress information and final result.
"""
total_rows = 100
processed = 0

# Simulate processing in batches
for batch in range(10):
await asyncio.sleep(0.5) # Simulate processing time
processed += 10
yield {
'progress': processed / total_rows,
'processed': processed,
'total': total_rows,
'status': 'streaming',
'message': f'Processed {processed}/{total_rows} rows',
}

# Final result
yield {
'result': 'Processing complete',
'status': 'complete',
'file_path': file_path,
'total_processed': total_rows,
}


async def monitor_system_health() -> AsyncGenerator[dict, None]:
"""Monitor system health metrics with continuous updates.

This streaming tool demonstrates continuous monitoring that can be
stopped by the agent when thresholds are reached.

Yields:
Dictionary containing system health metrics.
"""
metrics = [
{'cpu': 45, 'memory': 60, 'disk': 70},
{'cpu': 50, 'memory': 65, 'disk': 72},
{'cpu': 55, 'memory': 70, 'disk': 75},
{'cpu': 60, 'memory': 75, 'disk': 78},
]

for i, metric in enumerate(metrics):
await asyncio.sleep(2) # Check every 2 seconds
yield {
'metrics': metric,
'timestamp': i + 1,
'status': 'streaming' if i < len(metrics) - 1 else 'complete',
'alert': 'high' if metric['cpu'] > 55 else 'normal',
}


root_agent = Agent(
name='streaming_tools_agent',
model='gemini-2.5-flash-lite',
instruction=(
'You are a helpful assistant that can monitor stock prices, process'
' datasets, and monitor system health using streaming tools. When'
' using streaming tools, you will receive intermediate results that'
' you can react to. For example, if monitoring stock prices, you can'
' alert the user when prices change significantly. If processing a'
' dataset, you can provide progress updates. If monitoring system'
' health, you can alert when metrics exceed thresholds.'
),
tools=[monitor_stock_price, process_large_dataset, monitor_system_health],
)
131 changes: 98 additions & 33 deletions src/google/adk/flows/llm_flows/base_llm_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,49 +672,114 @@ async def _postprocess_run_processors_async(
async for event in agen:
yield event

async def _postprocess_handle_function_calls_async(
async def _yield_function_response_events(
self,
invocation_context: InvocationContext,
function_call_event: Event,
llm_request: LlmRequest,
function_response_event: Event,
) -> AsyncGenerator[Event, None]:
if function_response_event := await functions.handle_function_calls_async(
invocation_context, function_call_event, llm_request.tools_dict
"""Yields auth, confirmation, and set_model_response events for a function response.

Args:
invocation_context: The invocation context.
function_call_event: The original function call event.
function_response_event: The function response event.

Yields:
Auth events, confirmation events, the function response event, and
set_model_response events if applicable.
"""
auth_event = functions.generate_auth_event(
invocation_context, function_response_event
)
if auth_event:
yield auth_event

tool_confirmation_event = functions.generate_request_confirmation_event(
invocation_context, function_call_event, function_response_event
)
if tool_confirmation_event:
yield tool_confirmation_event

yield function_response_event

# Check if this is a set_model_response function response
if json_response := (
_output_schema_processor.get_structured_model_response(
function_response_event
)
):
auth_event = functions.generate_auth_event(
invocation_context, function_response_event
final_event = _output_schema_processor.create_final_model_response_event(
invocation_context, json_response
)
if auth_event:
yield auth_event
yield final_event

tool_confirmation_event = functions.generate_request_confirmation_event(
invocation_context, function_call_event, function_response_event
)
if tool_confirmation_event:
yield tool_confirmation_event
async def _postprocess_handle_function_calls_async(
self,
invocation_context: InvocationContext,
function_call_event: Event,
llm_request: LlmRequest,
) -> AsyncGenerator[Event, None]:
function_calls = function_call_event.get_function_calls()
if not function_calls:
return

# Always yield the function response event first
yield function_response_event
# Check if any tools are streaming tools
has_streaming_tools = any(
functions._is_streaming_tool(tool)
for call in function_calls
if (tool := llm_request.tools_dict.get(call.name))
)

# Check if this is a set_model_response function response
if json_response := _output_schema_processor.get_structured_model_response(
function_response_event
if has_streaming_tools:
# Use streaming handler
tool_confirmation_dict = getattr(
invocation_context, 'tool_confirmation_dict', None
)
async for event in functions.handle_function_calls_async_with_streaming(
invocation_context,
function_calls,
llm_request.tools_dict,
tool_confirmation_dict,
):
# Create and yield a final model response event
final_event = (
_output_schema_processor.create_final_model_response_event(
invocation_context, json_response
)
)
yield final_event
transfer_to_agent = function_response_event.actions.transfer_to_agent
if transfer_to_agent:
agent_to_run = self._get_agent_to_run(
invocation_context, transfer_to_agent
)
async with Aclosing(agent_to_run.run_async(invocation_context)) as agen:
async for event in agen:
yield event
async for secondary_event in self._yield_function_response_events(
invocation_context, function_call_event, event
):
yield secondary_event

# Check for agent transfer after each streaming event
transfer_to_agent = event.actions.transfer_to_agent
if transfer_to_agent:
agent_to_run = self._get_agent_to_run(
invocation_context, transfer_to_agent
)
async with Aclosing(
agent_to_run.run_async(invocation_context)
) as agen:
async for transfer_event in agen:
yield transfer_event
# Agent transfer handled, exit the streaming loop
return
else:
# Use regular handler
if function_response_event := await functions.handle_function_calls_async(
invocation_context, function_call_event, llm_request.tools_dict
):
async for secondary_event in self._yield_function_response_events(
invocation_context, function_call_event, function_response_event
):
yield secondary_event

transfer_to_agent = function_response_event.actions.transfer_to_agent
if transfer_to_agent:
agent_to_run = self._get_agent_to_run(
invocation_context, transfer_to_agent
)
async with Aclosing(
agent_to_run.run_async(invocation_context)
) as agen:
async for event in agen:
yield event

def _get_agent_to_run(
self, invocation_context: InvocationContext, agent_name: str
Expand Down
Loading