Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 94 additions & 9 deletions src/google/adk/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pathlib import Path
import queue
import sys
import threading
from typing import Any
from typing import AsyncGenerator
from typing import Callable
Expand Down Expand Up @@ -118,6 +119,16 @@ class Runner:
resumability_config: The resumability config for the application.
"""

# Default maximum number of concurrent compaction tasks allowed.
DEFAULT_MAX_CONCURRENT_COMPACTIONS = 10

# Semaphore to limit concurrent event compaction tasks to prevent resource
# exhaustion under high concurrency. Limits concurrent LLM calls and DB writes.
# Shared across all Runner instances for global concurrency control.
_compaction_semaphore: Optional[asyncio.Semaphore] = None
# Thread lock to protect semaphore initialization in multi-threaded scenarios
_compaction_semaphore_lock = threading.Lock()

app_name: str
"""The app name of the runner."""
agent: BaseAgent
Expand Down Expand Up @@ -150,6 +161,7 @@ def __init__(
credential_service: Optional[BaseCredentialService] = None,
plugin_close_timeout: float = 5.0,
auto_create_session: bool = False,
max_concurrent_compactions: int = DEFAULT_MAX_CONCURRENT_COMPACTIONS,
):
"""Initializes the Runner.

Expand Down Expand Up @@ -179,6 +191,12 @@ def __init__(
auto_create_session: Whether to automatically create a session when
not found. Defaults to False. If False, a missing session raises
ValueError with a helpful message.
max_concurrent_compactions: Maximum number of concurrent event
compaction tasks allowed. Defaults to
DEFAULT_MAX_CONCURRENT_COMPACTIONS (10). This limit is shared across
all Runner instances to prevent resource exhaustion. Higher values
allow more concurrent compactions but consume more resources (LLM
API calls, database connections).

Raises:
ValueError: If `app` is provided along with `agent` or `plugins`, or if
Expand Down Expand Up @@ -206,6 +224,10 @@ def __init__(
) = self._infer_agent_origin(self.agent)
self._app_name_alignment_hint: Optional[str] = None
self._enforce_app_name_alignment()
# Initialize or update the shared compaction semaphore
self._initialize_compaction_semaphore(max_concurrent_compactions)
# Track background tasks to prevent premature garbage collection
self._background_tasks: set[asyncio.Task] = set()

def _validate_runner_params(
self,
Expand Down Expand Up @@ -320,6 +342,30 @@ def _infer_agent_origin(
return None, origin_dir
return origin_name, origin_dir

@classmethod
def _initialize_compaction_semaphore(cls, limit: int) -> None:
"""Initializes or updates the shared compaction semaphore.

This method ensures the class-level semaphore is initialized with the
specified limit. If a semaphore already exists, it creates a new one
with the updated limit (the old one will be garbage collected once all
pending tasks complete).

Thread-safe: Uses a threading.Lock to protect against race conditions
when multiple Runner instances are created concurrently in different threads.

Args:
limit: Maximum number of concurrent compaction tasks allowed.
"""
if limit <= 0:
raise ValueError(
f'max_concurrent_compactions must be positive, got {limit}'
)
# Use threading lock to ensure thread-safe initialization in multi-threaded
# scenarios. We can't use async lock here since this is called from __init__.
with cls._compaction_semaphore_lock:
cls._compaction_semaphore = asyncio.Semaphore(limit)

def _enforce_app_name_alignment(self) -> None:
origin_name = self._agent_origin_app_name
origin_dir = self._agent_origin_dir
Expand Down Expand Up @@ -401,8 +447,8 @@ def run(

If event compaction is enabled in the App configuration, it will be
performed after all agent events for the current invocation have been
yielded. The generator will only finish iterating after event
compaction is complete.
yielded. Compaction runs as a background task and does not block the
generator from completing.

Args:
user_id: The user ID of the session.
Expand Down Expand Up @@ -464,9 +510,10 @@ async def run_async(

If event compaction is enabled in the App configuration, it will be
performed after all agent events for the current invocation have been
yielded. The async generator will only finish iterating after event
compaction is complete. However, this does not block new `run_async`
calls for subsequent user queries, which can be started concurrently.
yielded. Compaction runs as a background task and does not block the
generator from completing, allowing the frontend to receive responses
without delay. However, this does not block new `run_async` calls for
subsequent user queries, which can be started concurrently.

Args:
user_id: The user ID of the session.
Expand Down Expand Up @@ -552,11 +599,40 @@ async def execute(ctx: InvocationContext) -> AsyncGenerator[Event]:
# Run compaction after all events are yielded from the agent.
# (We don't compact in the middle of an invocation, we only compact at
# the end of an invocation.)
# Run compaction as a background task to avoid blocking the generator
# completion, which causes delays on the frontend. Use a semaphore to
# limit concurrent compactions and prevent resource exhaustion under
# high concurrency.
if self.app and self.app.events_compaction_config:
logger.debug('Running event compactor.')
await _run_compaction_for_sliding_window(
self.app, session, self.session_service
)
logger.debug('Scheduling event compactor in background.')

async def _run_compaction_with_error_handling():
try:
# Ensure semaphore is initialized (should always be after __init__)
if self._compaction_semaphore is None:
logger.warning(
'Compaction semaphore not initialized, using default limit.'
)
self._initialize_compaction_semaphore(
self.DEFAULT_MAX_CONCURRENT_COMPACTIONS
)
async with self._compaction_semaphore:
await _run_compaction_for_sliding_window(
self.app, session, self.session_service
)
except asyncio.CancelledError:
logger.debug('Event compaction cancelled.')
raise
except Exception as e:
logger.error(
'Event compaction failed but not blocking response: %s',
e,
exc_info=True,
)

task = asyncio.create_task(_run_compaction_with_error_handling())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)

async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen:
async for event in agen:
Expand Down Expand Up @@ -1535,6 +1611,15 @@ async def close(self):
if self.plugin_manager:
await self.plugin_manager.close()

# Wait for background compaction tasks to complete
if self._background_tasks:
logger.debug(
'Waiting for %d background compaction tasks to complete...',
len(self._background_tasks),
)
await asyncio.gather(*self._background_tasks, return_exceptions=True)
self._background_tasks.clear()

logger.info('Runner closed.')

if sys.version_info < (3, 11):
Expand Down
Loading