Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/a2a/server/tasks/base_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,26 @@ async def _dispatch_notification(
) -> bool:
url = push_info.url
try:
headers = None
headers = {}
if push_info.token:
headers = {'X-A2A-Notification-Token': push_info.token}
headers['X-A2A-Notification-Token'] = push_info.token

# Add authentication header if configured
if push_info.authentication and push_info.authentication.schemes:
for scheme in push_info.authentication.schemes:
if (
scheme.lower() == 'bearer'
and push_info.authentication.credentials
):
headers['Authorization'] = (
f'Bearer {push_info.authentication.credentials}'
)
break

response = await self._client.post(
url,
json=task.model_dump(mode='json', exclude_none=True),
headers=headers,
headers=headers if headers else None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

This conditional assignment is not necessary. The httpx client accepts an empty dictionary for headers, which is what headers would be if no token or authentication is provided. You can simplify this by just passing headers directly.

                headers=headers,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)
response.raise_for_status()
logger.info(
Expand Down
90 changes: 89 additions & 1 deletion tests/server/tasks/test_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
BasePushNotificationSender,
)
from a2a.types import (
PushNotificationAuthenticationInfo,
PushNotificationConfig,
Task,
TaskState,
Expand All @@ -29,8 +30,11 @@ def create_sample_push_config(
url: str = 'http://example.com/callback',
config_id: str = 'cfg1',
token: str | None = None,
authentication: PushNotificationAuthenticationInfo | None = None,
) -> PushNotificationConfig:
return PushNotificationConfig(id=config_id, url=url, token=token)
return PushNotificationConfig(
id=config_id, url=url, token=token, authentication=authentication
)


class TestBasePushNotificationSender(unittest.IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -92,6 +96,90 @@ async def test_send_notification_with_token_success(self) -> None:
)
mock_response.raise_for_status.assert_called_once()

async def test_send_notification_with_bearer_authentication(self) -> None:
task_id = 'task_send_bearer_auth'
task_data = create_sample_task(task_id=task_id)
auth_info = PushNotificationAuthenticationInfo(
schemes=['Bearer'], credentials='test-jwt-token'
)
config = create_sample_push_config(
url='http://notify.me/here',
token='unique_token',
authentication=auth_info,
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

self.mock_config_store.get_info.assert_awaited_once_with(task_id)

# assert httpx_client post method got invoked with right parameters
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers={
'X-A2A-Notification-Token': 'unique_token',
'Authorization': 'Bearer test-jwt-token',
},
)
mock_response.raise_for_status.assert_called_once()

async def test_send_notification_with_bearer_authentication_no_credentials(
self,
) -> None:
task_id = 'task_send_bearer_no_creds'
task_data = create_sample_task(task_id=task_id)
auth_info = PushNotificationAuthenticationInfo(
schemes=['Bearer'], credentials=None
)
config = create_sample_push_config(
url='http://notify.me/here', authentication=auth_info
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

# Should not add Authorization header when credentials are missing
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers=None,
)

async def test_send_notification_with_non_bearer_authentication(
self,
) -> None:
task_id = 'task_send_non_bearer'
task_data = create_sample_task(task_id=task_id)
auth_info = PushNotificationAuthenticationInfo(
schemes=['Basic'], credentials='user:pass'
)
config = create_sample_push_config(
url='http://notify.me/here', authentication=auth_info
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

# Should not add Authorization header for non-Bearer schemes
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers=None,
)

async def test_send_notification_no_config(self) -> None:
task_id = 'task_send_no_config'
task_data = create_sample_task(task_id=task_id)
Expand Down
Loading