From dfd6e6c132889e9adcd2456d02ee313a51bcaf81 Mon Sep 17 00:00:00 2001 From: Krishna Thota Date: Wed, 14 May 2025 21:35:41 -0700 Subject: [PATCH 1/4] fix: Fix mypy errors and enable streaming for hw example --- examples/helloworld/__main__.py | 2 +- src/a2a/server/agent_execution/context.py | 2 +- src/a2a/server/events/in_memory_queue_manager.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/helloworld/__main__.py b/examples/helloworld/__main__.py index ac6cca7d..ba06207b 100644 --- a/examples/helloworld/__main__.py +++ b/examples/helloworld/__main__.py @@ -27,7 +27,7 @@ version='1.0.0', defaultInputModes=['text'], defaultOutputModes=['text'], - capabilities=AgentCapabilities(), + capabilities=AgentCapabilities(streaming=True), skills=[skill], authentication=AgentAuthentication(schemes=['public']), ) diff --git a/src/a2a/server/agent_execution/context.py b/src/a2a/server/agent_execution/context.py index 3c61dc6b..870c5f8e 100644 --- a/src/a2a/server/agent_execution/context.py +++ b/src/a2a/server/agent_execution/context.py @@ -19,7 +19,7 @@ def __init__( task_id: str | None = None, context_id: str | None = None, task: Task | None = None, - related_tasks: list[Task] = None, + related_tasks: list[Task] | None = None, ): if related_tasks is None: related_tasks = [] diff --git a/src/a2a/server/events/in_memory_queue_manager.py b/src/a2a/server/events/in_memory_queue_manager.py index a0d95f8e..9d4a135b 100644 --- a/src/a2a/server/events/in_memory_queue_manager.py +++ b/src/a2a/server/events/in_memory_queue_manager.py @@ -18,7 +18,7 @@ class InMemoryQueueManager(QueueManager): true scalable deployment. """ - def __init__(self): + def __init__(self) -> None: self._task_queue: dict[str, EventQueue] = {} self._lock = asyncio.Lock() From a20df2caceb4819b21ed7384c711637b68cf8e76 Mon Sep 17 00:00:00 2001 From: Krishna Thota Date: Wed, 14 May 2025 22:40:27 -0700 Subject: [PATCH 2/4] fix:Handle propagating agent exceptions --- examples/helloworld/agent_executor.py | 1 + .../agent_execution/base_agent_executor.py | 48 ------------------- src/a2a/server/events/event_consumer.py | 17 ++++++- .../default_request_handler.py | 2 + 4 files changed, 19 insertions(+), 49 deletions(-) delete mode 100644 src/a2a/server/agent_execution/base_agent_executor.py diff --git a/examples/helloworld/agent_executor.py b/examples/helloworld/agent_executor.py index ed13dbc7..6f21c79c 100644 --- a/examples/helloworld/agent_executor.py +++ b/examples/helloworld/agent_executor.py @@ -24,6 +24,7 @@ async def execute( context: RequestContext, event_queue: EventQueue, ) -> None: + raise Exception('cancel not supported') result = await self.agent.invoke() event_queue.enqueue_event(new_agent_text_message(result)) diff --git a/src/a2a/server/agent_execution/base_agent_executor.py b/src/a2a/server/agent_execution/base_agent_executor.py deleted file mode 100644 index 13f228b1..00000000 --- a/src/a2a/server/agent_execution/base_agent_executor.py +++ /dev/null @@ -1,48 +0,0 @@ -from a2a.server.agent_execution.agent_executor import AgentExecutor -from a2a.server.events.event_queue import EventQueue -from a2a.types import ( - A2AError, - CancelTaskRequest, - SendMessageRequest, - SendStreamingMessageRequest, - Task, - TaskResubscriptionRequest, - UnsupportedOperationError, -) - - -class BaseAgentExecutor(AgentExecutor): - """Base AgentExecutor which returns unsupported operation error.""" - - async def on_message_send( - self, - request: SendMessageRequest, - event_queue: EventQueue, - task: Task | None, - ) -> None: - """Handler for 'message/send' requests.""" - event_queue.enqueue_event(A2AError(UnsupportedOperationError())) - - async def on_message_stream( - self, - request: SendStreamingMessageRequest, - event_queue: EventQueue, - task: Task | None, - ) -> None: - """Handler for 'message/stream' requests.""" - event_queue.enqueue_event(A2AError(UnsupportedOperationError())) - - async def on_cancel( - self, request: CancelTaskRequest, event_queue: EventQueue, task: Task - ) -> None: - """Handler for 'tasks/cancel' requests.""" - event_queue.enqueue_event(A2AError(UnsupportedOperationError())) - - async def on_resubscribe( - self, - request: TaskResubscriptionRequest, - event_queue: EventQueue, - task: Task, - ) -> None: - """Handler for 'tasks/resubscribe' requests.""" - event_queue.enqueue_event(A2AError(UnsupportedOperationError())) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index dcfa9d98..8049cf94 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -22,6 +22,8 @@ class EventConsumer: def __init__(self, queue: EventQueue): self.queue = queue + self._timeout = 0.5 + self._exception: BaseException | None = None logger.debug('EventConsumer initialized') async def consume_one(self) -> Event: @@ -45,8 +47,10 @@ async def consume_all(self) -> AsyncGenerator[Event]: """Consume all the generated streaming events from the agent.""" logger.debug('Starting to consume all events from the queue.') while True: + if self._exception: + raise self._exception try: - event = await self.queue.dequeue_event() + event = await asyncio.wait_for(self.queue.dequeue_event(), timeout=self._timeout) logger.debug( f'Dequeued event of type: {type(event)} in consume_all.' ) @@ -74,5 +78,16 @@ async def consume_all(self) -> AsyncGenerator[Event]: logger.debug('Stopping event consumption in consume_all.') self.queue.close() break + except asyncio.TimeoutError: + # continue polling until there is a final event + continue except asyncio.QueueShutDown: break + + + + + + def agent_task_callback(self, agent_task: asyncio.Task[None]): + if agent_task.exception() is not None: + self._exception = agent_task.exception() \ No newline at end of file diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index f656e414..4107cdf2 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -138,6 +138,7 @@ async def on_message_send( await self._register_producer(task_id, producer_task) consumer = EventConsumer(queue) + producer_task.add_done_callback(consumer.agent_task_callback) interrupted = False try: @@ -192,6 +193,7 @@ async def on_message_send_stream( try: consumer = EventConsumer(queue) + producer_task.add_done_callback(consumer.agent_task_callback) async for event in result_aggregator.consume_and_emit(consumer): # Now we know we have a Task, register the queue if isinstance(event, Task): From 072ca6ee715764c44f0863fa8ce6ee73be7851eb Mon Sep 17 00:00:00 2001 From: Krishna Thota Date: Wed, 14 May 2025 22:46:15 -0700 Subject: [PATCH 3/4] fix:Handle propagating agent exceptions --- examples/helloworld/agent_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/helloworld/agent_executor.py b/examples/helloworld/agent_executor.py index 6f21c79c..ed13dbc7 100644 --- a/examples/helloworld/agent_executor.py +++ b/examples/helloworld/agent_executor.py @@ -24,7 +24,6 @@ async def execute( context: RequestContext, event_queue: EventQueue, ) -> None: - raise Exception('cancel not supported') result = await self.agent.invoke() event_queue.enqueue_event(new_agent_text_message(result)) From 7f0c3d30e8739c4af952492e8e23f2e9024793e8 Mon Sep 17 00:00:00 2001 From: Krishna Thota Date: Thu, 15 May 2025 09:35:24 -0700 Subject: [PATCH 4/4] Add comment for queue read timeout --- src/a2a/server/events/event_consumer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 8049cf94..e1109402 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -50,6 +50,11 @@ async def consume_all(self) -> AsyncGenerator[Event]: if self._exception: raise self._exception try: + # We use a timeout when waiting for an event from the queue. + # This is required because it allows the loop to check if + # `self._exception` has been set by the `agent_task_callback`. + # Without the timeout, loop might hang indefinitely if no events are + # enqueued by the agent and the agent simply threw an exception event = await asyncio.wait_for(self.queue.dequeue_event(), timeout=self._timeout) logger.debug( f'Dequeued event of type: {type(event)} in consume_all.'