diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 200e8019..7636df08 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -125,7 +125,7 @@ async def consume_all(self) -> AsyncGenerator[Event]: # other part is waiting for an event or a closed queue. if is_final_event: logger.debug('Stopping event consumption in consume_all.') - await self.queue.close() + await self.queue.close(True) yield event break yield event diff --git a/src/a2a/server/events/event_queue.py b/src/a2a/server/events/event_queue.py index bcb02424..6eecdc0d 100644 --- a/src/a2a/server/events/event_queue.py +++ b/src/a2a/server/events/event_queue.py @@ -132,25 +132,38 @@ def tap(self) -> 'EventQueue': self._children.append(queue) return queue - async def close(self) -> None: - """Closes the queue for future push events. + async def close(self, immediate: bool = False) -> None: + """Closes the queue for future push events and also closes all child queues. + + Once closed, no new events can be enqueued. For Python 3.13+, this will trigger + `asyncio.QueueShutDown` when the queue is empty and a consumer tries to dequeue. + For lower versions, the queue will be marked as closed and optionally cleared. + + Args: + immediate (bool): + - True: Immediately closes the queue and clears all unprocessed events without waiting for them to be consumed. This is suitable for scenarios where you need to forcefully interrupt and quickly release resources. + - False (default): Gracefully closes the queue, waiting for all queued events to be processed (i.e., the queue is drained) before closing. This is suitable when you want to ensure all events are handled. - Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown` - when the queue is empty. Also closes all child queues. """ logger.debug('Closing EventQueue.') async with self._lock: # If already closed, just return. - if self._is_closed: + if self._is_closed and not immediate: return - self._is_closed = True + if not self._is_closed: + self._is_closed = True # If using python 3.13 or higher, use the shutdown method if sys.version_info >= (3, 13): - self.queue.shutdown() + self.queue.shutdown(immediate) for child in self._children: - await child.close() + await child.close(immediate) # Otherwise, join the queue else: + if immediate: + await self.clear_events(True) + for child in self._children: + await child.close(immediate) + return tasks = [asyncio.create_task(self.queue.join())] tasks.extend( asyncio.create_task(child.close()) for child in self._children @@ -160,3 +173,53 @@ async def close(self) -> None: def is_closed(self) -> bool: """Checks if the queue is closed.""" return self._is_closed + + async def clear_events(self, clear_child_queues: bool = True) -> None: + """Clears all events from the current queue and optionally all child queues. + + This method removes all pending events from the queue without processing them. + Child queues can be optionally cleared based on the clear_child_queues parameter. + + Args: + clear_child_queues: If True (default), clear all child queues as well. + If False, only clear the current queue, leaving child queues untouched. + """ + logger.debug('Clearing all events from EventQueue and child queues.') + + # Clear all events from the queue, even if closed + cleared_count = 0 + async with self._lock: + try: + while True: + event = self.queue.get_nowait() + logger.debug( + f'Discarding unprocessed event of type: {type(event)}, content: {event}' + ) + self.queue.task_done() + cleared_count += 1 + except asyncio.QueueEmpty: + pass + except Exception as e: + # Handle Python 3.13+ QueueShutDown + if ( + sys.version_info >= (3, 13) + and type(e).__name__ == 'QueueShutDown' + ): + pass + else: + raise + + if cleared_count > 0: + logger.debug( + f'Cleared {cleared_count} unprocessed events from EventQueue.' + ) + + # Clear all child queues (lock released before awaiting child tasks) + if clear_child_queues and self._children: + child_tasks = [ + asyncio.create_task(child.clear_events()) + for child in self._children + ] + + if child_tasks: + await asyncio.gather(*child_tasks, return_exceptions=True) diff --git a/tests/server/events/test_event_queue.py b/tests/server/events/test_event_queue.py index 7befcd39..fc139ecc 100644 --- a/tests/server/events/test_event_queue.py +++ b/tests/server/events/test_event_queue.py @@ -364,3 +364,130 @@ async def test_is_closed_reflects_state(event_queue: EventQueue) -> None: await event_queue.close() assert event_queue.is_closed() is True # Closed after calling close() + + +@pytest.mark.asyncio +async def test_close_with_immediate_true(event_queue: EventQueue) -> None: + """Test close with immediate=True clears events immediately.""" + # Add some events to the queue + event1 = Message(**MESSAGE_PAYLOAD) + event2 = Task(**MINIMAL_TASK) + await event_queue.enqueue_event(event1) + await event_queue.enqueue_event(event2) + + # Verify events are in queue + assert not event_queue.queue.empty() + + # Close with immediate=True + await event_queue.close(immediate=True) + + # Verify queue is closed and empty + assert event_queue.is_closed() is True + assert event_queue.queue.empty() + + +@pytest.mark.asyncio +async def test_close_immediate_propagates_to_children( + event_queue: EventQueue, +) -> None: + """Test that immediate parameter is propagated to child queues.""" + + child_queue = event_queue.tap() + + # Add events to both parent and child + event = Message(**MESSAGE_PAYLOAD) + await event_queue.enqueue_event(event) + + assert child_queue.is_closed() is False + assert child_queue.queue.empty() is False + + # close event queue + await event_queue.close(immediate=True) + + # Verify child queue was called and empty with immediate=True + assert child_queue.is_closed() is True + assert child_queue.queue.empty() + + +@pytest.mark.asyncio +async def test_clear_events_current_queue_only(event_queue: EventQueue) -> None: + """Test clear_events clears only the current queue when clear_child_queues=False.""" + + child_queue = event_queue.tap() + event1 = Message(**MESSAGE_PAYLOAD) + event2 = Task(**MINIMAL_TASK) + await event_queue.enqueue_event(event1) + await event_queue.enqueue_event(event2) + + # Clear only parent queue + await event_queue.clear_events(clear_child_queues=False) + + # Verify parent queue is empty + assert event_queue.queue.empty() + + # Verify child queue still has its event + assert not child_queue.queue.empty() + assert child_queue.is_closed() is False + + dequeued_child_event = await child_queue.dequeue_event(no_wait=True) + assert dequeued_child_event == event1 + + +@pytest.mark.asyncio +async def test_clear_events_with_children(event_queue: EventQueue) -> None: + """Test clear_events clears both current queue and child queues.""" + + # Create child queues and add events + child_queue1 = event_queue.tap() + child_queue2 = event_queue.tap() + + # Add events to parent queue + event1 = Message(**MESSAGE_PAYLOAD) + event2 = Task(**MINIMAL_TASK) + await event_queue.enqueue_event(event1) + await event_queue.enqueue_event(event2) + + # Clear all queues + await event_queue.clear_events(clear_child_queues=True) + + # Verify all queues are empty + assert event_queue.queue.empty() + assert child_queue1.queue.empty() + assert child_queue2.queue.empty() + + +@pytest.mark.asyncio +async def test_clear_events_empty_queue(event_queue: EventQueue) -> None: + """Test clear_events works correctly with empty queue.""" + # Verify queue is initially empty + assert event_queue.queue.empty() + + # Clear events from empty queue + await event_queue.clear_events() + + # Verify queue remains empty + assert event_queue.queue.empty() + + +@pytest.mark.asyncio +async def test_clear_events_closed_queue(event_queue: EventQueue) -> None: + """Test clear_events works correctly with closed queue.""" + # Add events and close queue + + with patch('sys.version_info', (3, 12, 0)): # Simulate older Python + # Mock queue.join as it's called in older versions + event_queue.queue.join = AsyncMock() + + event = Message(**MESSAGE_PAYLOAD) + await event_queue.enqueue_event(event) + await event_queue.close() + + # Verify queue is closed but not empty + assert event_queue.is_closed() is True + assert not event_queue.queue.empty() + + # Clear events from closed queue + await event_queue.clear_events() + + # Verify queue is now empty + assert event_queue.queue.empty()