Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
516e4d2
FIX [Bug]#367: Client hangs when implementing AgentExecutor and awai…
meoow113 Aug 4, 2025
66526b9
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 4, 2025
a5446e1
fix: #367 add immediate parameter to EventQueue.close() for forced qu…
Aug 6, 2025
42e83cb
The main updates in this pull request are as follows:
Aug 6, 2025
fbdc76f
update format
Aug 6, 2025
fbe67be
reduce lock contention in EventQueue.clear_events
meoow113 Aug 7, 2025
2af22c3
linter code
meoow113 Aug 7, 2025
8c85025
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 7, 2025
c7516df
Merge branch 'main' into fix-bug#367-client_hangs
meoow113 Aug 8, 2025
94888c5
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 8, 2025
dc217a1
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 11, 2025
833c1b1
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 12, 2025
811ebc7
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 12, 2025
f5c751c
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 12, 2025
e7e558f
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 13, 2025
7119604
test: add comprehensive tests for EventQueue.close immediate paramete…
meoow113 Aug 14, 2025
e13b5d8
Merge branch 'a2aproject:main' into fix-bug#367-client_hangs
meoow113 Aug 14, 2025
a5234fc
Merge branch 'fix-bug#367-client_hangs' of https://github.com/meoow11…
meoow113 Aug 14, 2025
98cf92b
perf(clear_events): fix Ruff PERF203 by moving try/except outside the…
meoow113 Aug 14, 2025
671b881
perf(clear_events): fix Ruff PERF203 and add Py3.13 shutdown compatib…
meoow113 Aug 14, 2025
db58ecf
delete unrelevant files
meoow113 Aug 14, 2025
e5d41c2
lint code
meoow113 Aug 14, 2025
0e70bda
fix(clear_events): handle Python 3.13 QueueShutDown exception for myp…
meoow113 Aug 15, 2025
affc4ad
delete unrelevent file
meoow113 Aug 15, 2025
d3e95f2
Merge branch 'main' into fix-bug#367-client_hangs
meoow113 Aug 18, 2025
ee9e8e6
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 20, 2025
40e1d88
Merge branch 'main' into fix-bug#367-client_hangs
holtskinner Aug 20, 2025
057da04
Regenerate uv.lock and remove pre-commit from main dependencies
holtskinner Aug 20, 2025
4f5013a
Revert changes to uv.lock
holtskinner Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/a2a/server/events/event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
# other part is waiting for an event or a closed queue.
if is_final_event:
logger.debug('Stopping event consumption in consume_all.')
await self.queue.close()
await self.queue.close(True)
yield event
break
yield event
Expand Down
79 changes: 71 additions & 8 deletions src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,25 +132,38 @@ def tap(self) -> 'EventQueue':
self._children.append(queue)
return queue

async def close(self) -> None:
"""Closes the queue for future push events.
async def close(self, immediate: bool = False) -> None:
"""Closes the queue for future push events and also closes all child queues.

Once closed, no new events can be enqueued. For Python 3.13+, this will trigger
`asyncio.QueueShutDown` when the queue is empty and a consumer tries to dequeue.
For lower versions, the queue will be marked as closed and optionally cleared.

Args:
immediate (bool):
- True: Immediately closes the queue and clears all unprocessed events without waiting for them to be consumed. This is suitable for scenarios where you need to forcefully interrupt and quickly release resources.
- False (default): Gracefully closes the queue, waiting for all queued events to be processed (i.e., the queue is drained) before closing. This is suitable when you want to ensure all events are handled.

Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown`
when the queue is empty. Also closes all child queues.
"""
logger.debug('Closing EventQueue.')
async with self._lock:
# If already closed, just return.
if self._is_closed:
if self._is_closed and not immediate:
return
self._is_closed = True
if not self._is_closed:
self._is_closed = True
# If using python 3.13 or higher, use the shutdown method
if sys.version_info >= (3, 13):
self.queue.shutdown()
self.queue.shutdown(immediate)
for child in self._children:
await child.close()
await child.close(immediate)
# Otherwise, join the queue
else:
if immediate:
await self.clear_events(True)
for child in self._children:
await child.close(immediate)
return
tasks = [asyncio.create_task(self.queue.join())]
tasks.extend(
asyncio.create_task(child.close()) for child in self._children
Expand All @@ -160,3 +173,53 @@ async def close(self) -> None:
def is_closed(self) -> bool:
"""Checks if the queue is closed."""
return self._is_closed

async def clear_events(self, clear_child_queues: bool = True) -> None:
"""Clears all events from the current queue and optionally all child queues.

This method removes all pending events from the queue without processing them.
Child queues can be optionally cleared based on the clear_child_queues parameter.

Args:
clear_child_queues: If True (default), clear all child queues as well.
If False, only clear the current queue, leaving child queues untouched.
"""
logger.debug('Clearing all events from EventQueue and child queues.')

# Clear all events from the queue, even if closed
cleared_count = 0
async with self._lock:
try:
while True:
event = self.queue.get_nowait()
logger.debug(
f'Discarding unprocessed event of type: {type(event)}, content: {event}'
)
self.queue.task_done()
cleared_count += 1
except asyncio.QueueEmpty:
pass
except Exception as e:
# Handle Python 3.13+ QueueShutDown
if (
sys.version_info >= (3, 13)
and type(e).__name__ == 'QueueShutDown'
):
pass
else:
raise

if cleared_count > 0:
logger.debug(
f'Cleared {cleared_count} unprocessed events from EventQueue.'
)

# Clear all child queues (lock released before awaiting child tasks)
if clear_child_queues and self._children:
child_tasks = [
asyncio.create_task(child.clear_events())
for child in self._children
]

if child_tasks:
await asyncio.gather(*child_tasks, return_exceptions=True)
127 changes: 127 additions & 0 deletions tests/server/events/test_event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,130 @@ async def test_is_closed_reflects_state(event_queue: EventQueue) -> None:
await event_queue.close()

assert event_queue.is_closed() is True # Closed after calling close()


@pytest.mark.asyncio
async def test_close_with_immediate_true(event_queue: EventQueue) -> None:
"""Test close with immediate=True clears events immediately."""
# Add some events to the queue
event1 = Message(**MESSAGE_PAYLOAD)
event2 = Task(**MINIMAL_TASK)
await event_queue.enqueue_event(event1)
await event_queue.enqueue_event(event2)

# Verify events are in queue
assert not event_queue.queue.empty()

# Close with immediate=True
await event_queue.close(immediate=True)

# Verify queue is closed and empty
assert event_queue.is_closed() is True
assert event_queue.queue.empty()


@pytest.mark.asyncio
async def test_close_immediate_propagates_to_children(
event_queue: EventQueue,
) -> None:
"""Test that immediate parameter is propagated to child queues."""

child_queue = event_queue.tap()

# Add events to both parent and child
event = Message(**MESSAGE_PAYLOAD)
await event_queue.enqueue_event(event)

assert child_queue.is_closed() is False
assert child_queue.queue.empty() is False

# close event queue
await event_queue.close(immediate=True)

# Verify child queue was called and empty with immediate=True
assert child_queue.is_closed() is True
assert child_queue.queue.empty()


@pytest.mark.asyncio
async def test_clear_events_current_queue_only(event_queue: EventQueue) -> None:
"""Test clear_events clears only the current queue when clear_child_queues=False."""

child_queue = event_queue.tap()
event1 = Message(**MESSAGE_PAYLOAD)
event2 = Task(**MINIMAL_TASK)
await event_queue.enqueue_event(event1)
await event_queue.enqueue_event(event2)

# Clear only parent queue
await event_queue.clear_events(clear_child_queues=False)

# Verify parent queue is empty
assert event_queue.queue.empty()

# Verify child queue still has its event
assert not child_queue.queue.empty()
assert child_queue.is_closed() is False

dequeued_child_event = await child_queue.dequeue_event(no_wait=True)
assert dequeued_child_event == event1


@pytest.mark.asyncio
async def test_clear_events_with_children(event_queue: EventQueue) -> None:
"""Test clear_events clears both current queue and child queues."""

# Create child queues and add events
child_queue1 = event_queue.tap()
child_queue2 = event_queue.tap()

# Add events to parent queue
event1 = Message(**MESSAGE_PAYLOAD)
event2 = Task(**MINIMAL_TASK)
await event_queue.enqueue_event(event1)
await event_queue.enqueue_event(event2)

# Clear all queues
await event_queue.clear_events(clear_child_queues=True)

# Verify all queues are empty
assert event_queue.queue.empty()
assert child_queue1.queue.empty()
assert child_queue2.queue.empty()


@pytest.mark.asyncio
async def test_clear_events_empty_queue(event_queue: EventQueue) -> None:
"""Test clear_events works correctly with empty queue."""
# Verify queue is initially empty
assert event_queue.queue.empty()

# Clear events from empty queue
await event_queue.clear_events()

# Verify queue remains empty
assert event_queue.queue.empty()


@pytest.mark.asyncio
async def test_clear_events_closed_queue(event_queue: EventQueue) -> None:
"""Test clear_events works correctly with closed queue."""
# Add events and close queue

with patch('sys.version_info', (3, 12, 0)): # Simulate older Python
# Mock queue.join as it's called in older versions
event_queue.queue.join = AsyncMock()

event = Message(**MESSAGE_PAYLOAD)
await event_queue.enqueue_event(event)
await event_queue.close()

# Verify queue is closed but not empty
assert event_queue.is_closed() is True
assert not event_queue.queue.empty()

# Clear events from closed queue
await event_queue.clear_events()

# Verify queue is now empty
assert event_queue.queue.empty()
Loading