Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:

strategy:
matrix:
python-version: ["3.13"]
python-version: ["3.10", "3.13"]

steps:
- name: Checkout code
Expand Down
2 changes: 1 addition & 1 deletion .ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
line-length = 80 # Google Style Guide §3.2: 80 columns
indent-width = 4 # Google Style Guide §3.4: 4 spaces

target-version = "py313" # Minimum Python version
target-version = "py310" # Minimum Python version

[lint]
ignore = [
Expand Down
4 changes: 2 additions & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import nox


DEFAULT_PYTHON_VERSION = '3.13'
DEFAULT_PYTHON_VERSION = '3.10'

CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute()

Expand Down Expand Up @@ -127,7 +127,7 @@ def format(session):
session.run(
'pyupgrade',
'--exit-zero-even-if-changed',
'--py313-plus',
'--py310-plus',
*lint_paths_py,
)
session.run(
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "A2A Python SDK"
readme = "README.md"
license = { file = "LICENSE" }
authors = [{ name = "Google LLC", email = "googleapis-packages@google.com" }]
requires-python = ">=3.13"
requires-python = ">=3.10"
keywords = ["A2A", "A2A SDK", "A2A Protocol", "Agent2Agent"]
dependencies = [
"httpx>=0.28.1",
Expand All @@ -22,6 +22,9 @@ classifiers = [
"Intended Audience :: Developers",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Operating System :: OS Independent",
"Topic :: Software Development :: Libraries :: Python Modules",
Expand Down
17 changes: 14 additions & 3 deletions src/a2a/server/events/event_consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import sys

from collections.abc import AsyncGenerator

Expand All @@ -15,6 +16,13 @@
from a2a.utils.telemetry import SpanKind, trace_class


# This is an alias to the exception for closed queue
QueueClosed = asyncio.QueueEmpty

# When using python 3.13 or higher, the closed queue signal is QueueShutdown
if sys.version_info >= (3, 13):
QueueClosed = asyncio.QueueShutDown

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -111,13 +119,16 @@ async def consume_all(self) -> AsyncGenerator[Event]:

if is_final_event:
logger.debug('Stopping event consumption in consume_all.')
self.queue.close()
await self.queue.close()
break
except TimeoutError:
# continue polling until there is a final event
continue
except asyncio.QueueShutDown:
break
except QueueClosed:
# Confirm that the queue is closed, e.g. we aren't on
# python 3.12 and get a queue empty error on an open queue
if self.queue.is_closed():
break

def agent_task_callback(self, agent_task: asyncio.Task[None]):
"""Callback to handle exceptions from the agent's execution task.
Expand Down
50 changes: 46 additions & 4 deletions src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import sys

from a2a.types import (
A2AError,
Expand Down Expand Up @@ -39,6 +40,8 @@ def __init__(self) -> None:
"""Initializes the EventQueue."""
self.queue: asyncio.Queue[Event] = asyncio.Queue()
self._children: list[EventQueue] = []
self._is_closed = False
self._lock = asyncio.Lock()
logger.debug('EventQueue initialized.')

def enqueue_event(self, event: Event):
Expand All @@ -47,6 +50,9 @@ def enqueue_event(self, event: Event):
Args:
event: The event object to enqueue.
"""
if self._is_closed:
logger.warning('Queue is closed. Event will not be enqueued.')
return
logger.debug(f'Enqueuing event of type: {type(event)}')
self.queue.put_nowait(event)
for child in self._children:
Expand All @@ -55,6 +61,20 @@ def enqueue_event(self, event: Event):
async def dequeue_event(self, no_wait: bool = False) -> Event:
"""Dequeues an event from the queue.

This implementation expects that dequeue to raise an exception when
the queue has been closed. In python 3.13+ this is naturally provided
by the QueueShutDown exception generated when the queue has closed and
the user is awaiting the queue.get method. Python<=3.12 this needs to
manage this lifecycle itself. The current implementation can lead to
blocking if the dequeue_event is called before the EventQueue has been
closed but when there are no events on the queue. Two ways to avoid this
are to call this with no_wait = True which won't block, but is the
callers responsibility to retry as appropriate. Alternatively, one can
use a async Task management solution to cancel the get task if the queue
has closed or some other condition is met. The implementation of the
EventConsumer uses an async.wait with a timeout to abort the
dequeue_event call and retry, when it will return with a closed error.

Args:
no_wait: If True, retrieve an event immediately or raise `asyncio.QueueEmpty`.
If False (default), wait until an event is available.
Expand All @@ -66,6 +86,11 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
asyncio.QueueEmpty: If `no_wait` is True and the queue is empty.
asyncio.QueueShutDown: If the queue has been closed and is empty.
"""
async with self._lock:
if self._is_closed and self.queue.empty():
logger.warning('Queue is closed. Event will not be dequeued.')
raise asyncio.QueueEmpty('Queue is closed.')

if no_wait:
logger.debug('Attempting to dequeue event (no_wait=True).')
event = self.queue.get_nowait()
Expand Down Expand Up @@ -99,13 +124,30 @@ def tap(self) -> 'EventQueue':
self._children.append(queue)
return queue

def close(self):
async def close(self):
"""Closes the queue for future push events.

Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown`
when the queue is empty. Also closes all child queues.
"""
logger.debug('Closing EventQueue.')
self.queue.shutdown()
for child in self._children:
child.close()
async with self._lock:
# If already closed, just return.
if self._is_closed:
return
self._is_closed = True
# If using python 3.13 or higher, use the shutdown method
if sys.version_info >= (3, 13):
self.queue.shutdown()
for child in self._children:
child.close()
# Otherwise, join the queue
else:
tasks = [asyncio.create_task(self.queue.join())]
for child in self._children:
tasks.append(asyncio.create_task(child.close()))
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

def is_closed(self) -> bool:
"""Checks if the queue is closed."""
return self._is_closed
2 changes: 1 addition & 1 deletion src/a2a/server/events/in_memory_queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def close(self, task_id: str):
if task_id not in self._task_queue:
raise NoTaskQueue()
queue = self._task_queue.pop(task_id)
queue.close()
await queue.close()

async def create_or_tap(self, task_id: str) -> EventQueue:
"""Creates a new event queue for a task ID if one doesn't exist, otherwise taps the existing one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async def _run_event_stream(
queue: The event queue for the agent to publish to.
"""
await self.agent_executor.execute(request, queue)
queue.close()
await queue.close()

async def on_message_send(
self, params: MessageSendParams
Expand Down