diff --git a/src/google/adk/a2a/converters/request_converter.py b/src/google/adk/a2a/converters/request_converter.py index 17989374d6..689e7e398f 100644 --- a/src/google/adk/a2a/converters/request_converter.py +++ b/src/google/adk/a2a/converters/request_converter.py @@ -22,6 +22,7 @@ from google.genai import types as genai_types from pydantic import BaseModel +from ...agents.run_config import RunConfig,StreamingMode from ...runners import RunConfig from ..experimental import a2a_experimental from .part_converter import A2APartToGenAIPartConverter @@ -115,3 +116,46 @@ def convert_a2a_request_to_agent_run_request( ), run_config=RunConfig(custom_metadata=custom_metadata), ) + +@a2a_experimental +def create_request_converter( + streaming_mode: StreamingMode = StreamingMode.NONE + ) -> A2ARequestToAgentRunRequestConverter: + """Creates a request converter with specified streaming mode. + + Args: + streaming_mode: The streaming mode to use for the agent execution. + + Returns: + A Converter function configured with the specified streaming mode. + """ + def converter( + request: RequestContext, + part_converter: A2APartToGenAIPartConverter = convert_a2a_part_to_genai_part, + )-> AgentRunRequest: + + if not request.message: + raise ValueError('Request message cannot be None') + + custom_metadata = {} + if request.metadata: + custom_metadata['a2a_metadata'] = request.metadata + + output_parts = [] + for a2a_part in request.message.parts: + genai_parts = part_converter(a2a_part) + if not isinstance(genai_parts, list): + genai_parts = [genai_parts] if genai_parts else [] + output_parts.extend(genai_parts) + + return AgentRunRequest( + user_id=_get_user_id(request), + session_id=request.context_id, + new_message=genai_types.Content( + role='user', + parts=output_parts, + ), + run_config=RunConfig(streaming_mode=streaming_mode, custom_metadata=custom_metadata), + ) + + return converter \ No newline at end of file diff --git a/src/google/adk/a2a/utils/agent_to_a2a.py b/src/google/adk/a2a/utils/agent_to_a2a.py index 155888bcab..e8350a9fdd 100644 --- a/src/google/adk/a2a/utils/agent_to_a2a.py +++ b/src/google/adk/a2a/utils/agent_to_a2a.py @@ -24,6 +24,7 @@ from a2a.types import AgentCard from starlette.applications import Starlette +from ...agents.run_config import StreamingMode from ...agents.base_agent import BaseAgent from ...artifacts.in_memory_artifact_service import InMemoryArtifactService from ...auth.credential_service.in_memory_credential_service import InMemoryCredentialService @@ -33,7 +34,8 @@ from ..executor.a2a_agent_executor import A2aAgentExecutor from ..experimental import a2a_experimental from .agent_card_builder import AgentCardBuilder - +from ..executor.a2a_agent_executor import A2aAgentExecutorConfig +from ..converters.request_converter import create_request_converter def _load_agent_card( agent_card: Optional[Union[AgentCard, str]], @@ -79,6 +81,7 @@ def to_a2a( protocol: str = "http", agent_card: Optional[Union[AgentCard, str]] = None, runner: Optional[Runner] = None, + streaming_mode: StreamingMode = StreamingMode.NONE ) -> Starlette: """Convert an ADK agent to a A2A Starlette application. @@ -92,6 +95,7 @@ def to_a2a( agent. runner: Optional pre-built Runner object. If not provided, a default runner will be created using in-memory services. + streaming_mode: The streaming mode to use for the request converter. (default: StreamingMode.NONE) Returns: A Starlette application that can be run with uvicorn @@ -122,9 +126,14 @@ async def create_runner() -> Runner: # Create A2A components task_store = InMemoryTaskStore() + + executor_config = A2aAgentExecutorConfig( + request_converter=create_request_converter(streaming_mode=streaming_mode) + ) agent_executor = A2aAgentExecutor( runner=runner or create_runner, + config=executor_config, ) request_handler = DefaultRequestHandler(