From 75a91ccfbaf1431b580e540d06bd3f9c3d9a0083 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 14 May 2025 12:00:59 -0700 Subject: [PATCH 1/2] fix: drift in BaseAgentExecutor and uses it in examples Signed-off-by: Adrian Cole --- .../calendar_agent/adk_agent_executor.py | 11 +---- examples/helloworld/agent_executor.py | 10 +---- examples/helloworld/test_client.py | 45 ------------------- examples/langgraph/agent_executor.py | 10 +---- src/a2a/server/agent_execution/__init__.py | 3 +- .../agent_execution/base_agent_executor.py | 42 ++--------------- 6 files changed, 12 insertions(+), 109 deletions(-) delete mode 100644 examples/helloworld/test_client.py diff --git a/examples/google_adk/calendar_agent/adk_agent_executor.py b/examples/google_adk/calendar_agent/adk_agent_executor.py index 9182227d..104403b6 100644 --- a/examples/google_adk/calendar_agent/adk_agent_executor.py +++ b/examples/google_adk/calendar_agent/adk_agent_executor.py @@ -10,8 +10,7 @@ from google.adk.events import Event from google.genai import types -from a2a.server.agent_execution import AgentExecutor -from a2a.server.agent_execution.context import RequestContext +from a2a.server.agent_execution import BaseAgentExecutor, RequestContext from a2a.server.events.event_queue import EventQueue from a2a.server.tasks import TaskUpdater from a2a.types import ( @@ -22,9 +21,7 @@ Part, TaskState, TextPart, - UnsupportedOperationError, ) -from a2a.utils.errors import ServerError from a2a.utils.message import new_agent_text_message @@ -40,7 +37,7 @@ auth_receive_timeout_seconds = 60 -class ADKAgentExecutor(AgentExecutor): +class ADKAgentExecutor(BaseAgentExecutor): """An AgentExecutor that runs an ADK-based Agent.""" _awaiting_auth: dict[str, asyncio.Future] @@ -221,10 +218,6 @@ async def execute( ) logger.debug('[Calendar] execute exiting') - async def cancel(self, context: RequestContext, event_queue: EventQueue): - # Ideally: kill any ongoing tasks. - raise ServerError(error=UnsupportedOperationError()) - async def on_auth_callback(self, state: str, uri: str): self._awaiting_auth[state].set_result(uri) diff --git a/examples/helloworld/agent_executor.py b/examples/helloworld/agent_executor.py index ed13dbc7..51aa95ea 100644 --- a/examples/helloworld/agent_executor.py +++ b/examples/helloworld/agent_executor.py @@ -1,6 +1,6 @@ from typing_extensions import override -from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.agent_execution import BaseAgentExecutor, RequestContext from a2a.server.events import EventQueue from a2a.utils import new_agent_text_message @@ -12,7 +12,7 @@ async def invoke(self) -> str: return 'Hello World' -class HelloWorldAgentExecutor(AgentExecutor): +class HelloWorldAgentExecutor(BaseAgentExecutor): """Test AgentProxy Implementation.""" def __init__(self): @@ -26,9 +26,3 @@ async def execute( ) -> None: result = await self.agent.invoke() event_queue.enqueue_event(new_agent_text_message(result)) - - @override - async def cancel( - self, context: RequestContext, event_queue: EventQueue - ) -> None: - raise Exception('cancel not supported') diff --git a/examples/helloworld/test_client.py b/examples/helloworld/test_client.py deleted file mode 100644 index 0ade8d2a..00000000 --- a/examples/helloworld/test_client.py +++ /dev/null @@ -1,45 +0,0 @@ -from a2a.client import A2AClient -from typing import Any -import httpx -from uuid import uuid4 -from a2a.types import ( - SendMessageRequest, - MessageSendParams, - SendStreamingMessageRequest, -) - - -async def main() -> None: - async with httpx.AsyncClient() as httpx_client: - client = await A2AClient.get_client_from_agent_card_url( - httpx_client, 'http://localhost:9999' - ) - send_message_payload: dict[str, Any] = { - 'message': { - 'role': 'user', - 'parts': [ - {'type': 'text', 'text': 'how much is 10 USD in INR?'} - ], - 'messageId': uuid4().hex, - }, - } - request = SendMessageRequest( - params=MessageSendParams(**send_message_payload) - ) - - response = await client.send_message(request) - print(response.model_dump(mode='json', exclude_none=True)) - - streaming_request = SendStreamingMessageRequest( - params=MessageSendParams(**send_message_payload) - ) - - stream_response = client.send_message_streaming(streaming_request) - async for chunk in stream_response: - print(chunk.model_dump(mode='json', exclude_none=True)) - - -if __name__ == '__main__': - import asyncio - - asyncio.run(main()) diff --git a/examples/langgraph/agent_executor.py b/examples/langgraph/agent_executor.py index 5e2ed77a..431b0801 100644 --- a/examples/langgraph/agent_executor.py +++ b/examples/langgraph/agent_executor.py @@ -1,7 +1,7 @@ from agent import CurrencyAgent from typing_extensions import override -from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.agent_execution import BaseAgentExecutor, RequestContext from a2a.server.events.event_queue import EventQueue from a2a.types import ( TaskArtifactUpdateEvent, @@ -12,7 +12,7 @@ from a2a.utils import new_agent_text_message, new_task, new_text_artifact -class CurrencyAgentExecutor(AgentExecutor): +class CurrencyAgentExecutor(BaseAgentExecutor): """Currency AgentExecutor Example.""" def __init__(self): @@ -89,9 +89,3 @@ async def execute( taskId=task.id, ) ) - - @override - async def cancel( - self, context: RequestContext, event_queue: EventQueue - ) -> None: - raise Exception('cancel not supported') diff --git a/src/a2a/server/agent_execution/__init__.py b/src/a2a/server/agent_execution/__init__.py index 88660d62..2c2f4c2d 100644 --- a/src/a2a/server/agent_execution/__init__.py +++ b/src/a2a/server/agent_execution/__init__.py @@ -1,5 +1,6 @@ from a2a.server.agent_execution.agent_executor import AgentExecutor +from a2a.server.agent_execution.base_agent_executor import BaseAgentExecutor from a2a.server.agent_execution.context import RequestContext -__all__ = ['AgentExecutor', 'RequestContext'] +__all__ = ['AgentExecutor', 'BaseAgentExecutor', 'RequestContext'] diff --git a/src/a2a/server/agent_execution/base_agent_executor.py b/src/a2a/server/agent_execution/base_agent_executor.py index 13f228b1..2ffcc52e 100644 --- a/src/a2a/server/agent_execution/base_agent_executor.py +++ b/src/a2a/server/agent_execution/base_agent_executor.py @@ -1,48 +1,14 @@ from a2a.server.agent_execution.agent_executor import AgentExecutor +from a2a.server.agent_execution.context import RequestContext from a2a.server.events.event_queue import EventQueue -from a2a.types import ( - A2AError, - CancelTaskRequest, - SendMessageRequest, - SendStreamingMessageRequest, - Task, - TaskResubscriptionRequest, - UnsupportedOperationError, -) +from a2a.types import A2AError, UnsupportedOperationError class BaseAgentExecutor(AgentExecutor): """Base AgentExecutor which returns unsupported operation error.""" - async def on_message_send( - self, - request: SendMessageRequest, - event_queue: EventQueue, - task: Task | None, - ) -> None: - """Handler for 'message/send' requests.""" + async def execute(self, context: RequestContext, event_queue: EventQueue): event_queue.enqueue_event(A2AError(UnsupportedOperationError())) - async def on_message_stream( - self, - request: SendStreamingMessageRequest, - event_queue: EventQueue, - task: Task | None, - ) -> None: - """Handler for 'message/stream' requests.""" - event_queue.enqueue_event(A2AError(UnsupportedOperationError())) - - async def on_cancel( - self, request: CancelTaskRequest, event_queue: EventQueue, task: Task - ) -> None: - """Handler for 'tasks/cancel' requests.""" - event_queue.enqueue_event(A2AError(UnsupportedOperationError())) - - async def on_resubscribe( - self, - request: TaskResubscriptionRequest, - event_queue: EventQueue, - task: Task, - ) -> None: - """Handler for 'tasks/resubscribe' requests.""" + async def cancel(self, context: RequestContext, event_queue: EventQueue): event_queue.enqueue_event(A2AError(UnsupportedOperationError())) From 88b9507753f3df92d34fb56258d61d8e3ddd8335 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 14 May 2025 12:28:45 -0700 Subject: [PATCH 2/2] revert Signed-off-by: Adrian Cole --- examples/helloworld/test_client.py | 45 ++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 examples/helloworld/test_client.py diff --git a/examples/helloworld/test_client.py b/examples/helloworld/test_client.py new file mode 100644 index 00000000..0ade8d2a --- /dev/null +++ b/examples/helloworld/test_client.py @@ -0,0 +1,45 @@ +from a2a.client import A2AClient +from typing import Any +import httpx +from uuid import uuid4 +from a2a.types import ( + SendMessageRequest, + MessageSendParams, + SendStreamingMessageRequest, +) + + +async def main() -> None: + async with httpx.AsyncClient() as httpx_client: + client = await A2AClient.get_client_from_agent_card_url( + httpx_client, 'http://localhost:9999' + ) + send_message_payload: dict[str, Any] = { + 'message': { + 'role': 'user', + 'parts': [ + {'type': 'text', 'text': 'how much is 10 USD in INR?'} + ], + 'messageId': uuid4().hex, + }, + } + request = SendMessageRequest( + params=MessageSendParams(**send_message_payload) + ) + + response = await client.send_message(request) + print(response.model_dump(mode='json', exclude_none=True)) + + streaming_request = SendStreamingMessageRequest( + params=MessageSendParams(**send_message_payload) + ) + + stream_response = client.send_message_streaming(streaming_request) + async for chunk in stream_response: + print(chunk.model_dump(mode='json', exclude_none=True)) + + +if __name__ == '__main__': + import asyncio + + asyncio.run(main())