From 39aab40bb14f6f376f631b02109daf4884740d2b Mon Sep 17 00:00:00 2001 From: Bryce Cole Date: Sun, 21 Dec 2025 16:49:56 -0600 Subject: [PATCH] fix: a2a use artifact update event fix: update tests fix: simplify code by storing in class fix: remove uneeded code change --- src/strands/multiagent/a2a/executor.py | 44 +++++++++++++------ tests/strands/multiagent/a2a/test_executor.py | 40 ++++++++++++----- 2 files changed, 61 insertions(+), 23 deletions(-) diff --git a/src/strands/multiagent/a2a/executor.py b/src/strands/multiagent/a2a/executor.py index 52b6d2ef1..09d1b544d 100644 --- a/src/strands/multiagent/a2a/executor.py +++ b/src/strands/multiagent/a2a/executor.py @@ -12,13 +12,14 @@ import json import logging import mimetypes +import uuid from typing import Any, Literal from a2a.server.agent_execution import AgentExecutor, RequestContext from a2a.server.events import EventQueue from a2a.server.tasks import TaskUpdater -from a2a.types import DataPart, FilePart, InternalError, Part, TaskState, TextPart, UnsupportedOperationError -from a2a.utils import new_agent_text_message, new_task +from a2a.types import DataPart, FilePart, InternalError, Part, TextPart, UnsupportedOperationError +from a2a.utils import new_task from a2a.utils.errors import ServerError from ...agent.agent import Agent as SAAgent @@ -104,12 +105,18 @@ async def _execute_streaming(self, context: RequestContext, updater: TaskUpdater else: raise ValueError("No content blocks available") + self._current_artifact_id = str(uuid.uuid4()) + self._is_first_chunk = True + try: async for event in self.agent.stream_async(content_blocks): await self._handle_streaming_event(event, updater) except Exception: logger.exception("Error in streaming execution") raise + finally: + self._current_artifact_id = None + self._is_first_chunk = True async def _handle_streaming_event(self, event: dict[str, Any], updater: TaskUpdater) -> None: """Handle a single streaming event from the Strands Agent. @@ -125,31 +132,42 @@ async def _handle_streaming_event(self, event: dict[str, Any], updater: TaskUpda logger.debug("Streaming event: %s", event) if "data" in event: if text_content := event["data"]: - await updater.update_status( - TaskState.working, - new_agent_text_message( - text_content, - updater.context_id, - updater.task_id, - ), + await updater.add_artifact( + [Part(root=TextPart(text=text_content))], + artifact_id=self._current_artifact_id, + name="agent_response", + append=not self._is_first_chunk, ) + self._is_first_chunk = False elif "result" in event: await self._handle_agent_result(event["result"], updater) async def _handle_agent_result(self, result: SAAgentResult | None, updater: TaskUpdater) -> None: """Handle the final result from the Strands Agent. - Processes the agent's final result, extracts text content from the response, - and adds it as an artifact to the task before marking the task as complete. + Sends the final artifact chunk marker and marks the task as complete. + If no data chunks were previously sent, includes the result content. Args: result: The agent result object containing the final response, or None if no result. updater: The task updater for managing task state and adding the final artifact. """ - if final_content := str(result): + if self._is_first_chunk: + final_content = str(result) if result else "" + parts = [Part(root=TextPart(text=final_content))] if final_content else [] + await updater.add_artifact( + parts, + artifact_id=self._current_artifact_id, + name="agent_response", + last_chunk=True, + ) + else: await updater.add_artifact( - [Part(root=TextPart(text=final_content))], + [], + artifact_id=self._current_artifact_id, name="agent_response", + append=True, + last_chunk=True, ) await updater.complete() diff --git a/tests/strands/multiagent/a2a/test_executor.py b/tests/strands/multiagent/a2a/test_executor.py index 1463d3f48..69ffd54c8 100644 --- a/tests/strands/multiagent/a2a/test_executor.py +++ b/tests/strands/multiagent/a2a/test_executor.py @@ -639,11 +639,19 @@ async def test_handle_agent_result_with_none_result(mock_strands_agent, mock_req mock_updater.complete = AsyncMock() mock_updater.add_artifact = AsyncMock() - # Call _handle_agent_result with None + # Call _handle_agent_result with None (simulating data chunks were sent) + executor._current_artifact_id = "test-artifact-id" + executor._is_first_chunk = False await executor._handle_agent_result(None, mock_updater) - # Verify completion was called + # Verify completion was called and artifact with last_chunk was sent mock_updater.complete.assert_called_once() + mock_updater.add_artifact.assert_called_once() + + # Verify the artifact was called with empty parts and last_chunk=True + call_args = mock_updater.add_artifact.call_args + assert call_args[0][0] == [] # Empty parts + assert call_args[1]["last_chunk"] is True @pytest.mark.asyncio @@ -668,16 +676,24 @@ async def test_handle_agent_result_with_result_but_no_message( mock_result = MagicMock(spec=SAAgentResult) mock_result.message = None - # Call _handle_agent_result + # Call _handle_agent_result (simulating data chunks were sent) + executor._current_artifact_id = "test-artifact-id" + executor._is_first_chunk = False await executor._handle_agent_result(mock_result, mock_updater) - # Verify completion was called + # Verify completion was called and artifact with last_chunk was sent mock_updater.complete.assert_called_once() + mock_updater.add_artifact.assert_called_once() + + # Verify the artifact was called with empty parts and last_chunk=True + call_args = mock_updater.add_artifact.call_args + assert call_args[0][0] == [] # Empty parts + assert call_args[1]["last_chunk"] is True @pytest.mark.asyncio async def test_handle_agent_result_with_content(mock_strands_agent): - """Test that _handle_agent_result handles result with content correctly.""" + """Test that _handle_agent_result handles result with content correctly when chunks were sent.""" executor = StrandsA2AExecutor(mock_strands_agent) # Mock TaskUpdater @@ -689,17 +705,21 @@ async def test_handle_agent_result_with_content(mock_strands_agent): mock_result = MagicMock(spec=SAAgentResult) mock_result.__str__ = MagicMock(return_value="Test response content") - # Call _handle_agent_result + # Call _handle_agent_result (simulating data chunks were already sent) + executor._current_artifact_id = "test-artifact-id" + executor._is_first_chunk = False await executor._handle_agent_result(mock_result, mock_updater) # Verify artifact was added and task completed mock_updater.add_artifact.assert_called_once() mock_updater.complete.assert_called_once() - # Check that the artifact contains the expected content - call_args = mock_updater.add_artifact.call_args[0][0] - assert len(call_args) == 1 - assert call_args[0].root.text == "Test response content" + # Check that empty parts were sent with last_chunk=True (since chunks were already sent) + call_args = mock_updater.add_artifact.call_args + assert call_args[0][0] == [] # Empty parts + assert call_args[1]["artifact_id"] == "test-artifact-id" + assert call_args[1]["last_chunk"] is True + assert call_args[1]["append"] is True def test_handle_conversion_error():