From 8128e6248a84ffccf791fdc9c5a827e7186c9a16 Mon Sep 17 00:00:00 2001 From: Holt Skinner Date: Thu, 15 May 2025 12:48:53 -0500 Subject: [PATCH 1/6] fix: JSCPD errors - Refactor to remove copy/paste --- src/a2a/utils/telemetry.py | 54 ++++++++++++++------------------------ 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/src/a2a/utils/telemetry.py b/src/a2a/utils/telemetry.py index 926e3771..bb75e57e 100644 --- a/src/a2a/utils/telemetry.py +++ b/src/a2a/utils/telemetry.py @@ -136,10 +136,7 @@ def trace_function( f'Start tracing for {actual_span_name}, is_async_func {is_async_func}' ) - @functools.wraps(func) - async def async_wrapper(*args, **kwargs) -> any: - """Async Wrapper for the decorator.""" - logger.debug('Start async tracer') + async def _invoke_with_tracing(*args, **kwargs): tracer = trace.get_tracer( INSTRUMENTING_MODULE_NAME, INSTRUMENTING_MODULE_VERSION ) @@ -153,7 +150,11 @@ async def async_wrapper(*args, **kwargs) -> any: try: # Async wrapper, await for the function call to complete. - result = await func(*args, **kwargs) + if is_async_func: + result = await func(*args, **kwargs) + # Sync wrapper, execute the function call. + else: + result = func(*args, **kwargs) span.set_status(StatusCode.OK) return result @@ -173,39 +174,22 @@ async def async_wrapper(*args, **kwargs) -> any: f'attribute_extractor error in span {actual_span_name}: {attr_e}' ) + @functools.wraps(func) + async def async_wrapper(*args, **kwargs): + """Async Wrapper for the decorator.""" + logger.debug('Start async tracer') + return await _invoke_with_tracing( + *args, + **kwargs, + ) + @functools.wraps(func) def sync_wrapper(*args, **kwargs): """Sync Wrapper for the decorator.""" - tracer = trace.get_tracer(INSTRUMENTING_MODULE_NAME) - with tracer.start_as_current_span(actual_span_name, kind=kind) as span: - if attributes: - for k, v in attributes.items(): - span.set_attribute(k, v) - - result = None - exception = None - - try: - # Sync wrapper, execute the function call. - result = func(*args, **kwargs) - span.set_status(StatusCode.OK) - return result - - except Exception as e: - exception = e - span.record_exception(e) - span.set_status(StatusCode.ERROR, description=str(e)) - raise - finally: - if attribute_extractor: - try: - attribute_extractor( - span, args, kwargs, result, exception - ) - except Exception as attr_e: - logger.error( - f'attribute_extractor error in span {actual_span_name}: {attr_e}' - ) + return _invoke_with_tracing( + *args, + **kwargs, + ) return async_wrapper if is_async_func else sync_wrapper From a2e070db35931956975430f2e8589d3b8e79e069 Mon Sep 17 00:00:00 2001 From: Holt Skinner Date: Thu, 15 May 2025 12:49:27 -0500 Subject: [PATCH 2/6] fix copy/paste errors in src/a2a/client --- src/a2a/client/client.py | 75 ++++++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/src/a2a/client/client.py b/src/a2a/client/client.py index 2f032707..f575fb67 100644 --- a/src/a2a/client/client.py +++ b/src/a2a/client/client.py @@ -26,6 +26,36 @@ ) +async def _make_httpx_request( + client: httpx.AsyncClient, + method: str, + url: str, + json_payload: dict[str, Any] | None = None, + http_kwargs: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Makes an HTTP request and handles common errors, returning parsed JSON.""" + try: + if method.upper() == 'GET': + response = await client.get(url, **(http_kwargs or {})) + elif method.upper() == 'POST': + response = await client.post( + url, json=json_payload, **(http_kwargs or {}) + ) + else: + raise ValueError(f'Unsupported HTTP method: {method}') + + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + raise A2AClientHTTPError(e.response.status_code, str(e)) from e + except json.JSONDecodeError as e: + raise A2AClientJSONError(str(e)) from e + except httpx.RequestError as e: + raise A2AClientHTTPError( + 503, f'Network communication error: {e}' + ) from e + + class A2ACardResolver: """Agent Card resolver.""" @@ -42,21 +72,13 @@ def __init__( async def get_agent_card( self, http_kwargs: dict[str, Any] | None = None ) -> AgentCard: - try: - response = await self.httpx_client.get( - f'{self.base_url}/{self.agent_card_path}', - **(http_kwargs or {}), - ) - response.raise_for_status() - return AgentCard.model_validate(response.json()) - except httpx.HTTPStatusError as e: - raise A2AClientHTTPError(e.response.status_code, str(e)) from e - except json.JSONDecodeError as e: - raise A2AClientJSONError(str(e)) from e - except httpx.RequestError as e: - raise A2AClientHTTPError( - 503, f'Network communication error: {e}' - ) from e + response_json = await _make_httpx_request( + client=self.httpx_client, + method='GET', + url=f'{self.base_url}/{self.agent_card_path}', + http_kwargs=http_kwargs, + ) + return AgentCard.model_validate(response_json) class A2AClient: @@ -152,22 +174,15 @@ async def _send_request( Args: rpc_request_payload: JSON RPC payload for sending the request - **kwargs: Additional keyword arguments to pass to the httpx client. + http_kwargs: Additional keyword arguments to pass to the httpx client. """ - try: - response = await self.httpx_client.post( - self.url, json=rpc_request_payload, **(http_kwargs or {}) - ) - response.raise_for_status() - return response.json() - except httpx.HTTPStatusError as e: - raise A2AClientHTTPError(e.response.status_code, str(e)) from e - except json.JSONDecodeError as e: - raise A2AClientJSONError(str(e)) from e - except httpx.RequestError as e: - raise A2AClientHTTPError( - 503, f'Network communication error: {e}' - ) from e + return await _make_httpx_request( + client=self.httpx_client, + method='POST', + url=self.url, + json_payload=rpc_request_payload, + http_kwargs=http_kwargs, + ) async def get_task( self, From 5b5851d77bab6b2ff97aa5e1eae2b20f0f829152 Mon Sep 17 00:00:00 2001 From: Holt Skinner Date: Thu, 15 May 2025 13:14:16 -0500 Subject: [PATCH 3/6] Set threshold to 0 --- .github/linters/.jscpd.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/linters/.jscpd.json b/.github/linters/.jscpd.json index fb0f3b60..f8ac4cca 100644 --- a/.github/linters/.jscpd.json +++ b/.github/linters/.jscpd.json @@ -1,5 +1,5 @@ { "ignore": ["**/.github/**", "**/.git/**", "**/tests/**", "**/examples/**"], - "threshold": 3, + "threshold": 0, "reporters": ["html", "markdown"] } From 77658562ff41fc13f581aefaed2dc45650443339 Mon Sep 17 00:00:00 2001 From: Holt Skinner Date: Fri, 16 May 2025 10:16:19 -0500 Subject: [PATCH 4/6] Formatting --- src/a2a/client/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/a2a/client/client.py b/src/a2a/client/client.py index f575fb67..eece0a25 100644 --- a/src/a2a/client/client.py +++ b/src/a2a/client/client.py @@ -133,7 +133,7 @@ async def send_message_streaming( request: SendStreamingMessageRequest, *, http_kwargs: dict[str, Any] | None = None, - ) -> AsyncGenerator[SendStreamingMessageResponse, None]: + ) -> AsyncGenerator[SendStreamingMessageResponse]: if not request.id: request.id = str(uuid4()) From 0033b827094f574aa77bd68ce27d46d276490481 Mon Sep 17 00:00:00 2001 From: Holt Skinner Date: Mon, 2 Jun 2025 14:48:54 -0500 Subject: [PATCH 5/6] Reset client changes --- src/a2a/client/client.py | 82 ++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 45 deletions(-) diff --git a/src/a2a/client/client.py b/src/a2a/client/client.py index 47d4b943..5bcc36f1 100644 --- a/src/a2a/client/client.py +++ b/src/a2a/client/client.py @@ -1,54 +1,35 @@ import json import logging + from collections.abc import AsyncGenerator from typing import Any from uuid import uuid4 import httpx + from httpx_sse import SSEError, aconnect_sse from pydantic import ValidationError from a2a.client.errors import A2AClientHTTPError, A2AClientJSONError -from a2a.types import (AgentCard, CancelTaskRequest, CancelTaskResponse, - GetTaskPushNotificationConfigRequest, - GetTaskPushNotificationConfigResponse, GetTaskRequest, - GetTaskResponse, SendMessageRequest, - SendMessageResponse, SendStreamingMessageRequest, - SendStreamingMessageResponse, - SetTaskPushNotificationConfigRequest, - SetTaskPushNotificationConfigResponse) +from a2a.types import ( + AgentCard, + CancelTaskRequest, + CancelTaskResponse, + GetTaskPushNotificationConfigRequest, + GetTaskPushNotificationConfigResponse, + GetTaskRequest, + GetTaskResponse, + SendMessageRequest, + SendMessageResponse, + SendStreamingMessageRequest, + SendStreamingMessageResponse, + SetTaskPushNotificationConfigRequest, + SetTaskPushNotificationConfigResponse, +) from a2a.utils.telemetry import SpanKind, trace_class -logger = logging.getLogger(__name__) -async def _make_httpx_request( - client: httpx.AsyncClient, - method: str, - url: str, - json_payload: dict[str, Any] | None = None, - http_kwargs: dict[str, Any] | None = None, -) -> dict[str, Any]: - """Makes an HTTP request and handles common errors, returning parsed JSON.""" - try: - if method.upper() == 'GET': - response = await client.get(url, **(http_kwargs or {})) - elif method.upper() == 'POST': - response = await client.post( - url, json=json_payload, **(http_kwargs or {}) - ) - else: - raise ValueError(f'Unsupported HTTP method: {method}') - - response.raise_for_status() - return response.json() - except httpx.HTTPStatusError as e: - raise A2AClientHTTPError(e.response.status_code, str(e)) from e - except json.JSONDecodeError as e: - raise A2AClientJSONError(str(e)) from e - except httpx.RequestError as e: - raise A2AClientHTTPError( - 503, f'Network communication error: {e}' - ) from e +logger = logging.getLogger(__name__) class A2ACardResolver: @@ -135,6 +116,7 @@ async def get_agent_card( raise A2AClientJSONError( f'Failed to validate agent card structure from {target_url}: {e.json()}' ) from e + return agent_card @@ -189,6 +171,7 @@ async def get_client_from_agent_card_url( agent_card_path: The path to the agent card endpoint, relative to the base URL. http_kwargs: Optional dictionary of keyword arguments to pass to the underlying httpx.get request when fetching the agent card. + Returns: An initialized `A2AClient` instance. @@ -198,7 +181,9 @@ async def get_client_from_agent_card_url( """ agent_card: AgentCard = await A2ACardResolver( httpx_client, base_url=base_url, agent_card_path=agent_card_path - ).get_agent_card(http_kwargs=http_kwargs) # Fetches public card by default + ).get_agent_card( + http_kwargs=http_kwargs + ) # Fetches public card by default return A2AClient(httpx_client=httpx_client, agent_card=agent_card) async def send_message( @@ -304,13 +289,20 @@ async def _send_request( A2AClientHTTPError: If an HTTP error occurs during the request. A2AClientJSONError: If the response body cannot be decoded as JSON. """ - return await _make_httpx_request( - client=self.httpx_client, - method='POST', - url=self.url, - json_payload=rpc_request_payload, - http_kwargs=http_kwargs, - ) + try: + response = await self.httpx_client.post( + self.url, json=rpc_request_payload, **(http_kwargs or {}) + ) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + raise A2AClientHTTPError(e.response.status_code, str(e)) from e + except json.JSONDecodeError as e: + raise A2AClientJSONError(str(e)) from e + except httpx.RequestError as e: + raise A2AClientHTTPError( + 503, f'Network communication error: {e}' + ) from e async def get_task( self, From 39c4e72d6f46ff5bfe22ea550252a7227f022158 Mon Sep 17 00:00:00 2001 From: Holt Skinner Date: Mon, 2 Jun 2025 14:51:20 -0500 Subject: [PATCH 6/6] Undo telemetry changes --- src/a2a/utils/telemetry.py | 54 ++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/src/a2a/utils/telemetry.py b/src/a2a/utils/telemetry.py index 6c4de08b..0aeee931 100644 --- a/src/a2a/utils/telemetry.py +++ b/src/a2a/utils/telemetry.py @@ -138,7 +138,10 @@ def trace_function( f'Start tracing for {actual_span_name}, is_async_func {is_async_func}' ) - async def _invoke_with_tracing(*args, **kwargs): + @functools.wraps(func) + async def async_wrapper(*args, **kwargs) -> any: + """Async Wrapper for the decorator.""" + logger.debug('Start async tracer') tracer = trace.get_tracer( INSTRUMENTING_MODULE_NAME, INSTRUMENTING_MODULE_VERSION ) @@ -152,11 +155,7 @@ async def _invoke_with_tracing(*args, **kwargs): try: # Async wrapper, await for the function call to complete. - if is_async_func: - result = await func(*args, **kwargs) - # Sync wrapper, execute the function call. - else: - result = func(*args, **kwargs) + result = await func(*args, **kwargs) span.set_status(StatusCode.OK) return result @@ -176,22 +175,39 @@ async def _invoke_with_tracing(*args, **kwargs): f'attribute_extractor error in span {actual_span_name}: {attr_e}' ) - @functools.wraps(func) - async def async_wrapper(*args, **kwargs): - """Async Wrapper for the decorator.""" - logger.debug('Start async tracer') - return await _invoke_with_tracing( - *args, - **kwargs, - ) - @functools.wraps(func) def sync_wrapper(*args, **kwargs): """Sync Wrapper for the decorator.""" - return _invoke_with_tracing( - *args, - **kwargs, - ) + tracer = trace.get_tracer(INSTRUMENTING_MODULE_NAME) + with tracer.start_as_current_span(actual_span_name, kind=kind) as span: + if attributes: + for k, v in attributes.items(): + span.set_attribute(k, v) + + result = None + exception = None + + try: + # Sync wrapper, execute the function call. + result = func(*args, **kwargs) + span.set_status(StatusCode.OK) + return result + + except Exception as e: + exception = e + span.record_exception(e) + span.set_status(StatusCode.ERROR, description=str(e)) + raise + finally: + if attribute_extractor: + try: + attribute_extractor( + span, args, kwargs, result, exception + ) + except Exception as attr_e: + logger.error( + f'attribute_extractor error in span {actual_span_name}: {attr_e}' + ) return async_wrapper if is_async_func else sync_wrapper