diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 5f86cc81..8ed943e2 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -18,7 +18,7 @@ jobs: strategy: matrix: - python-version: ["3.13"] + python-version: ["3.10", "3.13"] steps: - name: Checkout code diff --git a/.ruff.toml b/.ruff.toml index 1345c850..f4baf437 100644 --- a/.ruff.toml +++ b/.ruff.toml @@ -9,7 +9,7 @@ line-length = 80 # Google Style Guide §3.2: 80 columns indent-width = 4 # Google Style Guide §3.4: 4 spaces -target-version = "py313" # Minimum Python version +target-version = "py310" # Minimum Python version [lint] ignore = [ diff --git a/noxfile.py b/noxfile.py index d731b6e1..e541b2bb 100644 --- a/noxfile.py +++ b/noxfile.py @@ -23,7 +23,7 @@ import nox -DEFAULT_PYTHON_VERSION = '3.13' +DEFAULT_PYTHON_VERSION = '3.10' CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute() @@ -127,7 +127,7 @@ def format(session): session.run( 'pyupgrade', '--exit-zero-even-if-changed', - '--py313-plus', + '--py310-plus', *lint_paths_py, ) session.run( diff --git a/pyproject.toml b/pyproject.toml index eda65828..9bb3814a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ description = "A2A Python SDK" readme = "README.md" license = { file = "LICENSE" } authors = [{ name = "Google LLC", email = "googleapis-packages@google.com" }] -requires-python = ">=3.13" +requires-python = ">=3.10" keywords = ["A2A", "A2A SDK", "A2A Protocol", "Agent2Agent"] dependencies = [ "httpx>=0.28.1", @@ -22,6 +22,9 @@ classifiers = [ "Intended Audience :: Developers", "Programming Language :: Python", "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Operating System :: OS Independent", "Topic :: Software Development :: Libraries :: Python Modules", diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index d963adf3..6fc91856 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -1,5 +1,6 @@ import asyncio import logging +import sys from collections.abc import AsyncGenerator @@ -15,6 +16,13 @@ from a2a.utils.telemetry import SpanKind, trace_class +# This is an alias to the exception for closed queue +QueueClosed = asyncio.QueueEmpty + +# When using python 3.13 or higher, the closed queue signal is QueueShutdown +if sys.version_info >= (3, 13): + QueueClosed = asyncio.QueueShutDown + logger = logging.getLogger(__name__) @@ -111,13 +119,16 @@ async def consume_all(self) -> AsyncGenerator[Event]: if is_final_event: logger.debug('Stopping event consumption in consume_all.') - self.queue.close() + await self.queue.close() break except TimeoutError: # continue polling until there is a final event continue - except asyncio.QueueShutDown: - break + except QueueClosed: + # Confirm that the queue is closed, e.g. we aren't on + # python 3.12 and get a queue empty error on an open queue + if self.queue.is_closed(): + break def agent_task_callback(self, agent_task: asyncio.Task[None]): """Callback to handle exceptions from the agent's execution task. diff --git a/src/a2a/server/events/event_queue.py b/src/a2a/server/events/event_queue.py index 10c9ec0c..555c353a 100644 --- a/src/a2a/server/events/event_queue.py +++ b/src/a2a/server/events/event_queue.py @@ -1,5 +1,6 @@ import asyncio import logging +import sys from a2a.types import ( A2AError, @@ -39,6 +40,8 @@ def __init__(self) -> None: """Initializes the EventQueue.""" self.queue: asyncio.Queue[Event] = asyncio.Queue() self._children: list[EventQueue] = [] + self._is_closed = False + self._lock = asyncio.Lock() logger.debug('EventQueue initialized.') def enqueue_event(self, event: Event): @@ -47,6 +50,9 @@ def enqueue_event(self, event: Event): Args: event: The event object to enqueue. """ + if self._is_closed: + logger.warning('Queue is closed. Event will not be enqueued.') + return logger.debug(f'Enqueuing event of type: {type(event)}') self.queue.put_nowait(event) for child in self._children: @@ -55,6 +61,20 @@ def enqueue_event(self, event: Event): async def dequeue_event(self, no_wait: bool = False) -> Event: """Dequeues an event from the queue. + This implementation expects that dequeue to raise an exception when + the queue has been closed. In python 3.13+ this is naturally provided + by the QueueShutDown exception generated when the queue has closed and + the user is awaiting the queue.get method. Python<=3.12 this needs to + manage this lifecycle itself. The current implementation can lead to + blocking if the dequeue_event is called before the EventQueue has been + closed but when there are no events on the queue. Two ways to avoid this + are to call this with no_wait = True which won't block, but is the + callers responsibility to retry as appropriate. Alternatively, one can + use a async Task management solution to cancel the get task if the queue + has closed or some other condition is met. The implementation of the + EventConsumer uses an async.wait with a timeout to abort the + dequeue_event call and retry, when it will return with a closed error. + Args: no_wait: If True, retrieve an event immediately or raise `asyncio.QueueEmpty`. If False (default), wait until an event is available. @@ -66,6 +86,11 @@ async def dequeue_event(self, no_wait: bool = False) -> Event: asyncio.QueueEmpty: If `no_wait` is True and the queue is empty. asyncio.QueueShutDown: If the queue has been closed and is empty. """ + async with self._lock: + if self._is_closed and self.queue.empty(): + logger.warning('Queue is closed. Event will not be dequeued.') + raise asyncio.QueueEmpty('Queue is closed.') + if no_wait: logger.debug('Attempting to dequeue event (no_wait=True).') event = self.queue.get_nowait() @@ -99,13 +124,30 @@ def tap(self) -> 'EventQueue': self._children.append(queue) return queue - def close(self): + async def close(self): """Closes the queue for future push events. Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown` when the queue is empty. Also closes all child queues. """ logger.debug('Closing EventQueue.') - self.queue.shutdown() - for child in self._children: - child.close() + async with self._lock: + # If already closed, just return. + if self._is_closed: + return + self._is_closed = True + # If using python 3.13 or higher, use the shutdown method + if sys.version_info >= (3, 13): + self.queue.shutdown() + for child in self._children: + child.close() + # Otherwise, join the queue + else: + tasks = [asyncio.create_task(self.queue.join())] + for child in self._children: + tasks.append(asyncio.create_task(child.close())) + await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) + + def is_closed(self) -> bool: + """Checks if the queue is closed.""" + return self._is_closed diff --git a/src/a2a/server/events/in_memory_queue_manager.py b/src/a2a/server/events/in_memory_queue_manager.py index db7663c4..7d7dc861 100644 --- a/src/a2a/server/events/in_memory_queue_manager.py +++ b/src/a2a/server/events/in_memory_queue_manager.py @@ -69,7 +69,7 @@ async def close(self, task_id: str): if task_id not in self._task_queue: raise NoTaskQueue() queue = self._task_queue.pop(task_id) - queue.close() + await queue.close() async def create_or_tap(self, task_id: str) -> EventQueue: """Creates a new event queue for a task ID if one doesn't exist, otherwise taps the existing one. diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index fa6d7ea8..e0ecd97a 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -147,7 +147,7 @@ async def _run_event_stream( queue: The event queue for the agent to publish to. """ await self.agent_executor.execute(request, queue) - queue.close() + await queue.close() async def on_message_send( self, params: MessageSendParams