From 6fc702ddcc4f7fc11b0adce3f8950feb2df92663 Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Fri, 30 Jan 2026 11:08:07 -0500 Subject: [PATCH 1/2] fix event compaction to nonblocking --- src/google/adk/runners.py | 78 ++++++- tests/unittests/test_runners.py | 362 ++++++++++++++++++++++++++++++++ 2 files changed, 431 insertions(+), 9 deletions(-) diff --git a/src/google/adk/runners.py b/src/google/adk/runners.py index 545a0e83e6..aaeb361754 100644 --- a/src/google/adk/runners.py +++ b/src/google/adk/runners.py @@ -118,6 +118,11 @@ class Runner: resumability_config: The resumability config for the application. """ + # Semaphore to limit concurrent event compaction tasks to prevent resource + # exhaustion under high concurrency. Limits concurrent LLM calls and DB writes. + # Shared across all Runner instances for global concurrency control. + _compaction_semaphore: Optional[asyncio.Semaphore] = None + app_name: str """The app name of the runner.""" agent: BaseAgent @@ -150,6 +155,7 @@ def __init__( credential_service: Optional[BaseCredentialService] = None, plugin_close_timeout: float = 5.0, auto_create_session: bool = False, + max_concurrent_compactions: int = 10, ): """Initializes the Runner. @@ -179,6 +185,11 @@ def __init__( auto_create_session: Whether to automatically create a session when not found. Defaults to False. If False, a missing session raises ValueError with a helpful message. + max_concurrent_compactions: Maximum number of concurrent event + compaction tasks allowed. Defaults to 10. This limit is shared across + all Runner instances to prevent resource exhaustion. Higher values + allow more concurrent compactions but consume more resources (LLM + API calls, database connections). 
Raises: ValueError: If `app` is provided along with `agent` or `plugins`, or if @@ -206,6 +217,8 @@ def __init__( ) = self._infer_agent_origin(self.agent) self._app_name_alignment_hint: Optional[str] = None self._enforce_app_name_alignment() + # Initialize or update the shared compaction semaphore + self._initialize_compaction_semaphore(max_concurrent_compactions) def _validate_runner_params( self, @@ -320,6 +333,27 @@ def _infer_agent_origin( return None, origin_dir return origin_name, origin_dir + @classmethod + def _initialize_compaction_semaphore(cls, limit: int) -> None: + """Initializes or updates the shared compaction semaphore. + + This method ensures the class-level semaphore is initialized with the + specified limit. If a semaphore already exists, it creates a new one + with the updated limit (the old one will be garbage collected once all + pending tasks complete). + + Args: + limit: Maximum number of concurrent compaction tasks allowed. + """ + if limit <= 0: + raise ValueError( + f'max_concurrent_compactions must be positive, got {limit}' + ) + # Note: We can't use async lock here since this is called from __init__. + # The semaphore creation itself is thread-safe, and in practice Runner + # instances are created in the same event loop, so this is safe. + cls._compaction_semaphore = asyncio.Semaphore(limit) + def _enforce_app_name_alignment(self) -> None: origin_name = self._agent_origin_app_name origin_dir = self._agent_origin_dir @@ -401,8 +435,8 @@ def run( If event compaction is enabled in the App configuration, it will be performed after all agent events for the current invocation have been - yielded. The generator will only finish iterating after event - compaction is complete. + yielded. Compaction runs as a background task and does not block the + generator from completing. Args: user_id: The user ID of the session. 
@@ -464,9 +498,10 @@ async def run_async( If event compaction is enabled in the App configuration, it will be performed after all agent events for the current invocation have been - yielded. The async generator will only finish iterating after event - compaction is complete. However, this does not block new `run_async` - calls for subsequent user queries, which can be started concurrently. + yielded. Compaction runs as a background task and does not block the + generator from completing, allowing the frontend to receive responses + without delay. It likewise does not block new `run_async` calls for + subsequent user queries, which can be started concurrently. Args: user_id: The user ID of the session. @@ -552,11 +587,36 @@ async def execute(ctx: InvocationContext) -> AsyncGenerator[Event]: # Run compaction after all events are yielded from the agent. # (We don't compact in the middle of an invocation, we only compact at # the end of an invocation.) + # Run compaction as a background task to avoid blocking the generator + # completion, which causes delays on the frontend. Use a semaphore to + # limit concurrent compactions and prevent resource exhaustion under + # high concurrency. if self.app and self.app.events_compaction_config: - logger.debug('Running event compactor.') - await _run_compaction_for_sliding_window( - self.app, session, self.session_service - ) + logger.debug('Scheduling event compactor in background.') + + async def _run_compaction_with_error_handling(): + try: + # Ensure semaphore is initialized (should always be after __init__) + if self._compaction_semaphore is None: + logger.warning( + 'Compaction semaphore not initialized, using default limit.' 
+ ) + self._initialize_compaction_semaphore(10) + async with self._compaction_semaphore: + await _run_compaction_for_sliding_window( + self.app, session, self.session_service + ) + except asyncio.CancelledError: + logger.debug('Event compaction cancelled.') + raise + except Exception as e: + logger.error( + 'Event compaction failed but not blocking response: %s', + e, + exc_info=True, + ) + + asyncio.create_task(_run_compaction_with_error_handling()) async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen: async for event in agen: diff --git a/tests/unittests/test_runners.py b/tests/unittests/test_runners.py index 62b8d7334b..a66f859eab 100644 --- a/tests/unittests/test_runners.py +++ b/tests/unittests/test_runners.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import importlib from pathlib import Path import sys @@ -27,6 +28,7 @@ from google.adk.agents.llm_agent import LlmAgent from google.adk.agents.run_config import RunConfig from google.adk.apps.app import App +from google.adk.apps.app import EventsCompactionConfig from google.adk.apps.app import ResumabilityConfig from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService from google.adk.cli.utils.agent_loader import AgentLoader @@ -1321,5 +1323,365 @@ def test_infer_agent_origin_detects_mismatch_for_user_agent( assert "actual_name" in runner._app_name_alignment_hint +class TestRunnerCompaction: + """Tests for Runner event compaction features.""" + + def setup_method(self): + """Set up test fixtures.""" + self.session_service = InMemorySessionService() + self.artifact_service = InMemoryArtifactService() + self.agent = MockLlmAgent("test_agent") + + @pytest.mark.asyncio + async def test_max_concurrent_compactions_default_value(self): + """Test that max_concurrent_compactions defaults to 10.""" + runner = Runner( + app_name="test_app", + agent=self.agent, + 
session_service=self.session_service, + artifact_service=self.artifact_service, + ) + # Semaphore should be initialized + assert runner._compaction_semaphore is not None + + @pytest.mark.asyncio + async def test_max_concurrent_compactions_custom_value(self): + """Test that max_concurrent_compactions can be configured.""" + runner = Runner( + app_name="test_app", + agent=self.agent, + session_service=self.session_service, + artifact_service=self.artifact_service, + max_concurrent_compactions=5, + ) + assert runner._compaction_semaphore is not None + + @pytest.mark.asyncio + async def test_max_concurrent_compactions_shared_across_instances(self): + """Test that semaphore is shared across Runner instances.""" + runner1 = Runner( + app_name="test_app", + agent=self.agent, + session_service=self.session_service, + artifact_service=self.artifact_service, + max_concurrent_compactions=7, + ) + runner2 = Runner( + app_name="test_app", + agent=self.agent, + session_service=self.session_service, + artifact_service=self.artifact_service, + max_concurrent_compactions=3, + ) + # Both should reference the same class-level semaphore + assert runner1._compaction_semaphore is runner2._compaction_semaphore + + @pytest.mark.asyncio + async def test_max_concurrent_compactions_validation(self): + """Test that invalid max_concurrent_compactions raises ValueError.""" + with pytest.raises(ValueError, match="must be positive"): + Runner( + app_name="test_app", + agent=self.agent, + session_service=self.session_service, + artifact_service=self.artifact_service, + max_concurrent_compactions=0, + ) + with pytest.raises(ValueError, match="must be positive"): + Runner( + app_name="test_app", + agent=self.agent, + session_service=self.session_service, + artifact_service=self.artifact_service, + max_concurrent_compactions=-1, + ) + + @pytest.mark.asyncio + async def test_compaction_runs_in_background_non_blocking(self): + """Test that compaction runs in background and doesn't block generator.""" + 
# Create app with compaction config + app = App( + name="test_app", + root_agent=self.agent, + events_compaction_config=EventsCompactionConfig( + compaction_interval=1, # Compact after every invocation + overlap_size=0, + ), + ) + runner = Runner( + app=app, + session_service=self.session_service, + artifact_service=self.artifact_service, + ) + + # Create session + await self.session_service.create_session( + app_name="test_app", user_id=TEST_USER_ID, session_id=TEST_SESSION_ID + ) + + # Track timing + generator_start_time = None + generator_end_time = None + compaction_start_time = None + compaction_end_time = None + + async def mock_compaction(*args, **kwargs): + nonlocal compaction_start_time, compaction_end_time + compaction_start_time = asyncio.get_event_loop().time() + # Simulate slow compaction (100ms) + await asyncio.sleep(0.1) + compaction_end_time = asyncio.get_event_loop().time() + + # Mock the compaction function to track timing + from google.adk.apps import compaction + + original_compaction = compaction._run_compaction_for_sliding_window + compaction._run_compaction_for_sliding_window = mock_compaction + + try: + # Run multiple invocations to ensure compaction triggers + # (need compaction_interval=1 new invocations) + for run_num in range(2): + generator_start_time = asyncio.get_event_loop().time() + events = [] + async for event in runner.run_async( + user_id=TEST_USER_ID, + session_id=TEST_SESSION_ID, + new_message=types.Content( + role="user", parts=[types.Part(text=f"Message {run_num}")] + ), + ): + events.append(event) + generator_end_time = asyncio.get_event_loop().time() + + # Generator should complete quickly (not waiting for compaction) + generator_duration = generator_end_time - generator_start_time + assert generator_duration < 0.05, ( + f"Generator took {generator_duration}s, should complete quickly" + " without waiting for compaction" + ) + + # Give background compaction tasks time to run + await asyncio.sleep(0.2) + + # If compaction 
ran, verify it happened after generator completed + if compaction_start_time is not None: + assert compaction_start_time >= generator_end_time, ( + "Compaction should start after generator completes, " + f"but started at {compaction_start_time} and generator ended at " + f"{generator_end_time}" + ) + finally: + # Restore original function + compaction._run_compaction_for_sliding_window = original_compaction + + @pytest.mark.asyncio + async def test_compaction_semaphore_limits_concurrency(self): + """Test that semaphore limits concurrent compaction tasks.""" + app = App( + name="test_app", + root_agent=self.agent, + events_compaction_config=EventsCompactionConfig( + compaction_interval=1, + overlap_size=0, + ), + ) + runner = Runner( + app=app, + session_service=self.session_service, + artifact_service=self.artifact_service, + max_concurrent_compactions=2, # Limit to 2 concurrent + ) + + # Track concurrent compactions + concurrent_count = 0 + max_concurrent = 0 + compaction_lock = asyncio.Lock() + + async def mock_compaction(*args, **kwargs): + nonlocal concurrent_count, max_concurrent + async with compaction_lock: + concurrent_count += 1 + max_concurrent = max(max_concurrent, concurrent_count) + # Simulate slow compaction + await asyncio.sleep(0.1) + async with compaction_lock: + concurrent_count -= 1 + + # Mock compaction function + from google.adk.apps import compaction + + original_compaction = compaction._run_compaction_for_sliding_window + compaction._run_compaction_for_sliding_window = mock_compaction + + try: + # Create multiple sessions and trigger compactions simultaneously + tasks = [] + for i in range(5): # Try to start 5 compactions + session = await self.session_service.create_session( + app_name="test_app", + user_id=f"user{i}", + session_id=f"session{i}", + ) + # Add events to trigger compaction + for j in range(2): + event = Event( + invocation_id=f"inv{j}", + author="user", + content=types.Content( + role="user", parts=[types.Part(text=f"Message 
{j}")] + ), + timestamp=float(j), + ) + await self.session_service.append_event(session=session, event=event) + + # Start run_async in background + task = asyncio.create_task( + self._consume_events( + runner, + f"user{i}", + f"session{i}", + types.Content( + role="user", parts=[types.Part(text="New message")] + ), + ) + ) + tasks.append(task) + + # Wait a bit for compactions to start + await asyncio.sleep(0.05) + + # Check that max concurrent is limited by semaphore + assert max_concurrent <= 2, ( + f"Max concurrent compactions ({max_concurrent}) should not exceed" + " semaphore limit (2)" + ) + + # Wait for all tasks to complete + await asyncio.gather(*tasks) + finally: + compaction._run_compaction_for_sliding_window = original_compaction + + @pytest.mark.asyncio + async def test_compaction_error_does_not_block_generator(self): + """Test that compaction errors don't block the generator.""" + app = App( + name="test_app", + root_agent=self.agent, + events_compaction_config=EventsCompactionConfig( + compaction_interval=1, + overlap_size=0, + ), + ) + runner = Runner( + app=app, + session_service=self.session_service, + artifact_service=self.artifact_service, + ) + + session = await self.session_service.create_session( + app_name="test_app", user_id=TEST_USER_ID, session_id=TEST_SESSION_ID + ) + + # Add events to trigger compaction + for i in range(2): + event = Event( + invocation_id=f"inv{i}", + author="user", + content=types.Content( + role="user", parts=[types.Part(text=f"Message {i}")] + ), + timestamp=float(i), + ) + await self.session_service.append_event(session=session, event=event) + + # Mock compaction to raise an error + from google.adk.apps import compaction + + original_compaction = compaction._run_compaction_for_sliding_window + + async def failing_compaction(*args, **kwargs): + raise RuntimeError("Compaction failed") + + compaction._run_compaction_for_sliding_window = failing_compaction + + try: + # Generator should complete successfully despite 
compaction error + events = [] + async for event in runner.run_async( + user_id=TEST_USER_ID, + session_id=TEST_SESSION_ID, + new_message=types.Content( + role="user", parts=[types.Part(text="New message")] + ), + ): + events.append(event) + + # Generator should have completed + assert len(events) > 0, "Generator should yield events" + finally: + compaction._run_compaction_for_sliding_window = original_compaction + + @pytest.mark.asyncio + async def test_compaction_not_run_when_config_missing(self): + """Test that compaction is not run when config is missing.""" + # App without compaction config + app = App(name="test_app", root_agent=self.agent) + runner = Runner( + app=app, + session_service=self.session_service, + artifact_service=self.artifact_service, + ) + + session = await self.session_service.create_session( + app_name="test_app", user_id=TEST_USER_ID, session_id=TEST_SESSION_ID + ) + + # Mock compaction to verify it's not called + from google.adk.apps import compaction + + original_compaction = compaction._run_compaction_for_sliding_window + compaction_called = False + + async def track_compaction(*args, **kwargs): + nonlocal compaction_called + compaction_called = True + + compaction._run_compaction_for_sliding_window = track_compaction + + try: + async for event in runner.run_async( + user_id=TEST_USER_ID, + session_id=TEST_SESSION_ID, + new_message=types.Content( + role="user", parts=[types.Part(text="New message")] + ), + ): + pass + + # Give background tasks time to run + await asyncio.sleep(0.1) + assert ( + not compaction_called + ), "Compaction should not be called without config" + finally: + compaction._run_compaction_for_sliding_window = original_compaction + + async def _consume_events( + self, + runner: Runner, + user_id: str, + session_id: str, + new_message: types.Content, + ) -> list[Event]: + """Helper to consume all events from run_async.""" + events = [] + async for event in runner.run_async( + user_id=user_id, session_id=session_id, 
new_message=new_message + ): + events.append(event) + return events + + if __name__ == "__main__": pytest.main([__file__]) From 18f4da5c2799686766be8193998d9ecf5bb74bac Mon Sep 17 00:00:00 2001 From: Luyang Wang Date: Fri, 30 Jan 2026 15:01:03 -0500 Subject: [PATCH 2/2] addressing feedback --- src/google/adk/runners.py | 41 ++++++++++++++++---- tests/unittests/test_runners.py | 69 +++++++++++++++++++++++++++++++-- 2 files changed, 98 insertions(+), 12 deletions(-) diff --git a/src/google/adk/runners.py b/src/google/adk/runners.py index aaeb361754..537a282b93 100644 --- a/src/google/adk/runners.py +++ b/src/google/adk/runners.py @@ -20,6 +20,7 @@ from pathlib import Path import queue import sys +import threading from typing import Any from typing import AsyncGenerator from typing import Callable @@ -118,10 +119,15 @@ class Runner: resumability_config: The resumability config for the application. """ + # Default maximum number of concurrent compaction tasks allowed. + DEFAULT_MAX_CONCURRENT_COMPACTIONS = 10 + # Semaphore to limit concurrent event compaction tasks to prevent resource # exhaustion under high concurrency. Limits concurrent LLM calls and DB writes. # Shared across all Runner instances for global concurrency control. _compaction_semaphore: Optional[asyncio.Semaphore] = None + # Thread lock to protect semaphore initialization in multi-threaded scenarios + _compaction_semaphore_lock = threading.Lock() app_name: str """The app name of the runner.""" @@ -155,7 +161,7 @@ def __init__( credential_service: Optional[BaseCredentialService] = None, plugin_close_timeout: float = 5.0, auto_create_session: bool = False, - max_concurrent_compactions: int = 10, + max_concurrent_compactions: int = DEFAULT_MAX_CONCURRENT_COMPACTIONS, ): """Initializes the Runner. @@ -186,7 +192,8 @@ def __init__( not found. Defaults to False. If False, a missing session raises ValueError with a helpful message. 
max_concurrent_compactions: Maximum number of concurrent event - compaction tasks allowed. Defaults to 10. This limit is shared across + compaction tasks allowed. Defaults to + DEFAULT_MAX_CONCURRENT_COMPACTIONS (10). This limit is shared across all Runner instances to prevent resource exhaustion. Higher values allow more concurrent compactions but consume more resources (LLM API calls, database connections). @@ -219,6 +226,8 @@ def __init__( self._enforce_app_name_alignment() # Initialize or update the shared compaction semaphore self._initialize_compaction_semaphore(max_concurrent_compactions) + # Track background tasks to prevent premature garbage collection + self._background_tasks: set[asyncio.Task] = set() def _validate_runner_params( self, @@ -342,6 +351,9 @@ def _initialize_compaction_semaphore(cls, limit: int) -> None: with the updated limit (the old one will be garbage collected once all pending tasks complete). + Thread-safe: Uses a threading.Lock to protect against race conditions + when multiple Runner instances are created concurrently in different threads. + Args: limit: Maximum number of concurrent compaction tasks allowed. """ @@ -349,10 +361,10 @@ def _initialize_compaction_semaphore(cls, limit: int) -> None: raise ValueError( f'max_concurrent_compactions must be positive, got {limit}' ) - # Note: We can't use async lock here since this is called from __init__. - # The semaphore creation itself is thread-safe, and in practice Runner - # instances are created in the same event loop, so this is safe. - cls._compaction_semaphore = asyncio.Semaphore(limit) + # Use threading lock to ensure thread-safe initialization in multi-threaded + # scenarios. We can't use async lock here since this is called from __init__. 
+ with cls._compaction_semaphore_lock: + cls._compaction_semaphore = asyncio.Semaphore(limit) def _enforce_app_name_alignment(self) -> None: origin_name = self._agent_origin_app_name @@ -601,7 +613,9 @@ async def _run_compaction_with_error_handling(): logger.warning( 'Compaction semaphore not initialized, using default limit.' ) - self._initialize_compaction_semaphore(10) + self._initialize_compaction_semaphore( + self.DEFAULT_MAX_CONCURRENT_COMPACTIONS + ) async with self._compaction_semaphore: await _run_compaction_for_sliding_window( self.app, session, self.session_service @@ -616,7 +630,9 @@ async def _run_compaction_with_error_handling(): exc_info=True, ) - asyncio.create_task(_run_compaction_with_error_handling()) + task = asyncio.create_task(_run_compaction_with_error_handling()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen: async for event in agen: @@ -1595,6 +1611,15 @@ async def close(self): if self.plugin_manager: await self.plugin_manager.close() + # Wait for background compaction tasks to complete + if self._background_tasks: + logger.debug( + 'Waiting for %d background compaction tasks to complete...', + len(self._background_tasks), + ) + await asyncio.gather(*self._background_tasks, return_exceptions=True) + self._background_tasks.clear() + logger.info('Runner closed.') if sys.version_info < (3, 11): diff --git a/tests/unittests/test_runners.py b/tests/unittests/test_runners.py index a66f859eab..3e1f84a586 100644 --- a/tests/unittests/test_runners.py +++ b/tests/unittests/test_runners.py @@ -1334,27 +1334,35 @@ def setup_method(self): @pytest.mark.asyncio async def test_max_concurrent_compactions_default_value(self): - """Test that max_concurrent_compactions defaults to 10.""" + """Test that max_concurrent_compactions defaults to DEFAULT_MAX_CONCURRENT_COMPACTIONS.""" runner = Runner( app_name="test_app", agent=self.agent, 
session_service=self.session_service, artifact_service=self.artifact_service, ) - # Semaphore should be initialized + # Semaphore should be initialized with default value assert runner._compaction_semaphore is not None + # Verify semaphore's value matches the default constant + assert ( + runner._compaction_semaphore._value + == Runner.DEFAULT_MAX_CONCURRENT_COMPACTIONS + ) @pytest.mark.asyncio async def test_max_concurrent_compactions_custom_value(self): """Test that max_concurrent_compactions can be configured.""" + custom_limit = 5 runner = Runner( app_name="test_app", agent=self.agent, session_service=self.session_service, artifact_service=self.artifact_service, - max_concurrent_compactions=5, + max_concurrent_compactions=custom_limit, ) assert runner._compaction_semaphore is not None + # Verify semaphore's value matches the custom value provided + assert runner._compaction_semaphore._value == custom_limit @pytest.mark.asyncio async def test_max_concurrent_compactions_shared_across_instances(self): @@ -1373,8 +1381,10 @@ async def test_max_concurrent_compactions_shared_across_instances(self): artifact_service=self.artifact_service, max_concurrent_compactions=3, ) - # Both should reference the same class-level semaphore + # Both should reference the same class-level semaphore, and its value + # should be updated by the last Runner instance. 
assert runner1._compaction_semaphore is runner2._compaction_semaphore + assert runner1._compaction_semaphore._value == 3 @pytest.mark.asyncio async def test_max_concurrent_compactions_validation(self): @@ -1667,6 +1677,57 @@ async def track_compaction(*args, **kwargs): finally: compaction._run_compaction_for_sliding_window = original_compaction + @pytest.mark.asyncio + async def test_background_tasks_tracked_and_cleaned_up(self): + """Test that background compaction tasks are tracked and cleaned up.""" + app = App( + name="test_app", + root_agent=self.agent, + events_compaction_config=EventsCompactionConfig( + compaction_interval=1, + overlap_size=0, + ), + ) + runner = Runner( + app=app, + session_service=self.session_service, + artifact_service=self.artifact_service, + ) + + # Verify background_tasks set exists + assert hasattr(runner, "_background_tasks") + assert isinstance(runner._background_tasks, set) + assert len(runner._background_tasks) == 0 + + # Create session and run invocations to trigger compaction + await self.session_service.create_session( + app_name="test_app", user_id=TEST_USER_ID, session_id=TEST_SESSION_ID + ) + + # Run multiple invocations to trigger compaction tasks + for i in range(2): + async for event in runner.run_async( + user_id=TEST_USER_ID, + session_id=TEST_SESSION_ID, + new_message=types.Content( + role="user", parts=[types.Part(text=f"Message {i}")] + ), + ): + pass + + # Give tasks time to be created + await asyncio.sleep(0.1) + + # Verify tasks are tracked (may be 0 if they completed quickly) + # The important part is that they were tracked, not that they're still running + assert hasattr(runner, "_background_tasks") + + # Close runner - should await background tasks + await runner.close() + + # After close, background tasks should be cleared + assert len(runner._background_tasks) == 0 + async def _consume_events( self, runner: Runner,