Skip to content

Commit 4b5ba77

Browse files
committed
feat: a2a use artifact update event
fix: update tests fix: simplify code by storing in class fix: remove uneeded code change
1 parent 0c640e8 commit 4b5ba77

File tree

2 files changed

+61
-23
lines changed

2 files changed

+61
-23
lines changed

src/strands/multiagent/a2a/executor.py

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212
import json
1313
import logging
1414
import mimetypes
15+
import uuid
1516
from typing import Any, Literal
1617

1718
from a2a.server.agent_execution import AgentExecutor, RequestContext
1819
from a2a.server.events import EventQueue
1920
from a2a.server.tasks import TaskUpdater
20-
from a2a.types import DataPart, FilePart, InternalError, Part, TaskState, TextPart, UnsupportedOperationError
21-
from a2a.utils import new_agent_text_message, new_task
21+
from a2a.types import DataPart, FilePart, InternalError, Part, TextPart, UnsupportedOperationError
22+
from a2a.utils import new_task
2223
from a2a.utils.errors import ServerError
2324

2425
from ...agent.agent import Agent as SAAgent
@@ -104,12 +105,18 @@ async def _execute_streaming(self, context: RequestContext, updater: TaskUpdater
104105
else:
105106
raise ValueError("No content blocks available")
106107

108+
self._current_artifact_id = str(uuid.uuid4())
109+
self._is_first_chunk = True
110+
107111
try:
108112
async for event in self.agent.stream_async(content_blocks):
109113
await self._handle_streaming_event(event, updater)
110114
except Exception:
111115
logger.exception("Error in streaming execution")
112116
raise
117+
finally:
118+
self._current_artifact_id = None
119+
self._is_first_chunk = True
113120

114121
async def _handle_streaming_event(self, event: dict[str, Any], updater: TaskUpdater) -> None:
115122
"""Handle a single streaming event from the Strands Agent.
@@ -125,31 +132,42 @@ async def _handle_streaming_event(self, event: dict[str, Any], updater: TaskUpda
125132
logger.debug("Streaming event: %s", event)
126133
if "data" in event:
127134
if text_content := event["data"]:
128-
await updater.update_status(
129-
TaskState.working,
130-
new_agent_text_message(
131-
text_content,
132-
updater.context_id,
133-
updater.task_id,
134-
),
135+
await updater.add_artifact(
136+
[Part(root=TextPart(text=text_content))],
137+
artifact_id=self._current_artifact_id,
138+
name="agent_response",
139+
append=not self._is_first_chunk,
135140
)
141+
self._is_first_chunk = False
136142
elif "result" in event:
137143
await self._handle_agent_result(event["result"], updater)
138144

139145
async def _handle_agent_result(self, result: SAAgentResult | None, updater: TaskUpdater) -> None:
140146
"""Handle the final result from the Strands Agent.
141147
142-
Processes the agent's final result, extracts text content from the response,
143-
and adds it as an artifact to the task before marking the task as complete.
148+
Sends the final artifact chunk marker and marks the task as complete.
149+
If no data chunks were previously sent, includes the result content.
144150
145151
Args:
146152
result: The agent result object containing the final response, or None if no result.
147153
updater: The task updater for managing task state and adding the final artifact.
148154
"""
149-
if final_content := str(result):
155+
if self._is_first_chunk:
156+
final_content = str(result) if result else ""
157+
parts = [Part(root=TextPart(text=final_content))] if final_content else []
158+
await updater.add_artifact(
159+
parts,
160+
artifact_id=self._current_artifact_id,
161+
name="agent_response",
162+
last_chunk=True,
163+
)
164+
else:
150165
await updater.add_artifact(
151-
[Part(root=TextPart(text=final_content))],
166+
[],
167+
artifact_id=self._current_artifact_id,
152168
name="agent_response",
169+
append=True,
170+
last_chunk=True,
153171
)
154172
await updater.complete()
155173

tests/strands/multiagent/a2a/test_executor.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -639,11 +639,19 @@ async def test_handle_agent_result_with_none_result(mock_strands_agent, mock_req
639639
mock_updater.complete = AsyncMock()
640640
mock_updater.add_artifact = AsyncMock()
641641

642-
# Call _handle_agent_result with None
642+
# Call _handle_agent_result with None (simulating data chunks were sent)
643+
executor._current_artifact_id = "test-artifact-id"
644+
executor._is_first_chunk = False
643645
await executor._handle_agent_result(None, mock_updater)
644646

645-
# Verify completion was called
647+
# Verify completion was called and artifact with last_chunk was sent
646648
mock_updater.complete.assert_called_once()
649+
mock_updater.add_artifact.assert_called_once()
650+
651+
# Verify the artifact was called with empty parts and last_chunk=True
652+
call_args = mock_updater.add_artifact.call_args
653+
assert call_args[0][0] == [] # Empty parts
654+
assert call_args[1]["last_chunk"] is True
647655

648656

649657
@pytest.mark.asyncio
@@ -668,16 +676,24 @@ async def test_handle_agent_result_with_result_but_no_message(
668676
mock_result = MagicMock(spec=SAAgentResult)
669677
mock_result.message = None
670678

671-
# Call _handle_agent_result
679+
# Call _handle_agent_result (simulating data chunks were sent)
680+
executor._current_artifact_id = "test-artifact-id"
681+
executor._is_first_chunk = False
672682
await executor._handle_agent_result(mock_result, mock_updater)
673683

674-
# Verify completion was called
684+
# Verify completion was called and artifact with last_chunk was sent
675685
mock_updater.complete.assert_called_once()
686+
mock_updater.add_artifact.assert_called_once()
687+
688+
# Verify the artifact was called with empty parts and last_chunk=True
689+
call_args = mock_updater.add_artifact.call_args
690+
assert call_args[0][0] == [] # Empty parts
691+
assert call_args[1]["last_chunk"] is True
676692

677693

678694
@pytest.mark.asyncio
679695
async def test_handle_agent_result_with_content(mock_strands_agent):
680-
"""Test that _handle_agent_result handles result with content correctly."""
696+
"""Test that _handle_agent_result handles result with content correctly when chunks were sent."""
681697
executor = StrandsA2AExecutor(mock_strands_agent)
682698

683699
# Mock TaskUpdater
@@ -689,17 +705,21 @@ async def test_handle_agent_result_with_content(mock_strands_agent):
689705
mock_result = MagicMock(spec=SAAgentResult)
690706
mock_result.__str__ = MagicMock(return_value="Test response content")
691707

692-
# Call _handle_agent_result
708+
# Call _handle_agent_result (simulating data chunks were already sent)
709+
executor._current_artifact_id = "test-artifact-id"
710+
executor._is_first_chunk = False
693711
await executor._handle_agent_result(mock_result, mock_updater)
694712

695713
# Verify artifact was added and task completed
696714
mock_updater.add_artifact.assert_called_once()
697715
mock_updater.complete.assert_called_once()
698716

699-
# Check that the artifact contains the expected content
700-
call_args = mock_updater.add_artifact.call_args[0][0]
701-
assert len(call_args) == 1
702-
assert call_args[0].root.text == "Test response content"
717+
# Check that empty parts were sent with last_chunk=True (since chunks were already sent)
718+
call_args = mock_updater.add_artifact.call_args
719+
assert call_args[0][0] == [] # Empty parts
720+
assert call_args[1]["artifact_id"] == "test-artifact-id"
721+
assert call_args[1]["last_chunk"] is True
722+
assert call_args[1]["append"] is True
703723

704724

705725
def test_handle_conversion_error():

0 commit comments

Comments
 (0)