Skip to content

Commit a17ad34

Browse files
committed
fix: non-blocking send_message server handler not invoke push notification
- Problem: When client use send_message with MessageSendConfiguration.blocking=False, the result_aggregator will enter the logic of _continue_consuming. But it's not push notification to client. The client can't get notification for long-running task(non-blocking invoke) at this situation. - Solution: Simply add push notification logic to result_aggregator is okay.
1 parent 1d8f92e commit a17ad34

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,10 @@ async def _setup_message_execution(
231231
)
232232

233233
queue = await self._queue_manager.create_or_tap(task_id)
234-
result_aggregator = ResultAggregator(task_manager)
234+
result_aggregator = ResultAggregator(
235+
task_manager,
236+
push_sender=self._push_sender,
237+
)
235238
# TODO: to manage the non-blocking flows.
236239
producer_task = asyncio.create_task(
237240
self._run_event_stream(request_context, queue)

src/a2a/server/tasks/result_aggregator.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from collections.abc import AsyncGenerator, AsyncIterator
55

66
from a2a.server.events import Event, EventConsumer
7+
from a2a.server.tasks.push_notification_sender import PushNotificationSender
78
from a2a.server.tasks.task_manager import TaskManager
89
from a2a.types import Message, Task, TaskState, TaskStatusUpdateEvent
910

@@ -24,14 +25,20 @@ class ResultAggregator:
2425
Task object and emit that Task object.
2526
"""
2627

27-
def __init__(self, task_manager: TaskManager):
28+
def __init__(
29+
self,
30+
task_manager: TaskManager,
31+
push_sender: PushNotificationSender | None = None,
32+
) -> None:
2833
"""Initializes the ResultAggregator.
2934
3035
Args:
3136
task_manager: The `TaskManager` instance to use for processing events
3237
and managing the task state.
38+
push_sender: The `PushNotificationSender` instance to use for sending push notifications.
3339
"""
3440
self.task_manager = task_manager
41+
self.push_sender = push_sender
3542
self._message: Message | None = None
3643

3744
@property
@@ -168,3 +175,11 @@ async def _continue_consuming(
168175
"""
169176
async for event in event_stream:
170177
await self.task_manager.process(event)
178+
await self._send_push_notification_if_needed()
179+
180+
async def _send_push_notification_if_needed(self) -> None:
181+
"""Sends push notification if configured and task is available."""
182+
if self.push_sender:
183+
latest_task = await self.current_result
184+
if isinstance(latest_task, Task):
185+
await self.push_sender.send_notification(latest_task)

0 commit comments

Comments
 (0)