Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions src/strands/multiagent/a2a/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
import json
import logging
import mimetypes
import uuid
from typing import Any, Literal

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import DataPart, FilePart, InternalError, Part, TaskState, TextPart, UnsupportedOperationError
from a2a.utils import new_agent_text_message, new_task
from a2a.types import DataPart, FilePart, InternalError, Part, TextPart, UnsupportedOperationError
from a2a.utils import new_task
from a2a.utils.errors import ServerError

from ...agent.agent import Agent as SAAgent
Expand Down Expand Up @@ -104,12 +105,18 @@ async def _execute_streaming(self, context: RequestContext, updater: TaskUpdater
else:
raise ValueError("No content blocks available")

self._current_artifact_id = str(uuid.uuid4())
self._is_first_chunk = True

try:
async for event in self.agent.stream_async(content_blocks):
await self._handle_streaming_event(event, updater)
except Exception:
logger.exception("Error in streaming execution")
raise
finally:
self._current_artifact_id = None
self._is_first_chunk = True

async def _handle_streaming_event(self, event: dict[str, Any], updater: TaskUpdater) -> None:
"""Handle a single streaming event from the Strands Agent.
Expand All @@ -125,31 +132,42 @@ async def _handle_streaming_event(self, event: dict[str, Any], updater: TaskUpda
logger.debug("Streaming event: %s", event)
if "data" in event:
if text_content := event["data"]:
await updater.update_status(
TaskState.working,
new_agent_text_message(
text_content,
updater.context_id,
updater.task_id,
),
await updater.add_artifact(
[Part(root=TextPart(text=text_content))],
artifact_id=self._current_artifact_id,
name="agent_response",
append=not self._is_first_chunk,
)
self._is_first_chunk = False
elif "result" in event:
await self._handle_agent_result(event["result"], updater)

async def _handle_agent_result(self, result: SAAgentResult | None, updater: TaskUpdater) -> None:
"""Handle the final result from the Strands Agent.

Processes the agent's final result, extracts text content from the response,
and adds it as an artifact to the task before marking the task as complete.
Sends the final artifact chunk marker and marks the task as complete.
If no data chunks were previously sent, includes the result content.

Args:
result: The agent result object containing the final response, or None if no result.
updater: The task updater for managing task state and adding the final artifact.
"""
if final_content := str(result):
if self._is_first_chunk:
final_content = str(result) if result else ""
parts = [Part(root=TextPart(text=final_content))] if final_content else []
await updater.add_artifact(
parts,
artifact_id=self._current_artifact_id,
name="agent_response",
last_chunk=True,
)
else:
await updater.add_artifact(
[Part(root=TextPart(text=final_content))],
[],
artifact_id=self._current_artifact_id,
name="agent_response",
append=True,
last_chunk=True,
)
await updater.complete()

Expand Down
40 changes: 30 additions & 10 deletions tests/strands/multiagent/a2a/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,11 +639,19 @@ async def test_handle_agent_result_with_none_result(mock_strands_agent, mock_req
mock_updater.complete = AsyncMock()
mock_updater.add_artifact = AsyncMock()

# Call _handle_agent_result with None
# Call _handle_agent_result with None (simulating data chunks were sent)
executor._current_artifact_id = "test-artifact-id"
executor._is_first_chunk = False
await executor._handle_agent_result(None, mock_updater)

# Verify completion was called
# Verify completion was called and artifact with last_chunk was sent
mock_updater.complete.assert_called_once()
mock_updater.add_artifact.assert_called_once()

# Verify the artifact was called with empty parts and last_chunk=True
call_args = mock_updater.add_artifact.call_args
assert call_args[0][0] == [] # Empty parts
assert call_args[1]["last_chunk"] is True


@pytest.mark.asyncio
Expand All @@ -668,16 +676,24 @@ async def test_handle_agent_result_with_result_but_no_message(
mock_result = MagicMock(spec=SAAgentResult)
mock_result.message = None

# Call _handle_agent_result
# Call _handle_agent_result (simulating data chunks were sent)
executor._current_artifact_id = "test-artifact-id"
executor._is_first_chunk = False
await executor._handle_agent_result(mock_result, mock_updater)

# Verify completion was called
# Verify completion was called and artifact with last_chunk was sent
mock_updater.complete.assert_called_once()
mock_updater.add_artifact.assert_called_once()

# Verify the artifact was called with empty parts and last_chunk=True
call_args = mock_updater.add_artifact.call_args
assert call_args[0][0] == [] # Empty parts
assert call_args[1]["last_chunk"] is True


@pytest.mark.asyncio
async def test_handle_agent_result_with_content(mock_strands_agent):
"""Test that _handle_agent_result handles result with content correctly."""
"""Test that _handle_agent_result handles result with content correctly when chunks were sent."""
executor = StrandsA2AExecutor(mock_strands_agent)

# Mock TaskUpdater
Expand All @@ -689,17 +705,21 @@ async def test_handle_agent_result_with_content(mock_strands_agent):
mock_result = MagicMock(spec=SAAgentResult)
mock_result.__str__ = MagicMock(return_value="Test response content")

# Call _handle_agent_result
# Call _handle_agent_result (simulating data chunks were already sent)
executor._current_artifact_id = "test-artifact-id"
executor._is_first_chunk = False
await executor._handle_agent_result(mock_result, mock_updater)

# Verify artifact was added and task completed
mock_updater.add_artifact.assert_called_once()
mock_updater.complete.assert_called_once()

# Check that the artifact contains the expected content
call_args = mock_updater.add_artifact.call_args[0][0]
assert len(call_args) == 1
assert call_args[0].root.text == "Test response content"
# Check that empty parts were sent with last_chunk=True (since chunks were already sent)
call_args = mock_updater.add_artifact.call_args
assert call_args[0][0] == [] # Empty parts
assert call_args[1]["artifact_id"] == "test-artifact-id"
assert call_args[1]["last_chunk"] is True
assert call_args[1]["append"] is True


def test_handle_conversion_error():
Expand Down