Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/google/adk/a2a/converters/request_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from google.genai import types as genai_types
from pydantic import BaseModel

from ...agents.run_config import StreamingMode
from ...runners import RunConfig
from ..experimental import a2a_experimental
from .part_converter import A2APartToGenAIPartConverter
Expand All @@ -44,6 +45,7 @@ class AgentRunRequest(BaseModel):
[
RequestContext,
A2APartToGenAIPartConverter,
Optional[StreamingMode],
],
AgentRunRequest,
]
Expand All @@ -55,6 +57,8 @@ class AgentRunRequest(BaseModel):
Args:
request: The incoming request context from the A2A server.
part_converter: A function to convert A2A content parts to GenAI parts.
streaming_mode: Optional streaming mode for the agent execution. If None,
defaults to StreamingMode.NONE.

Returns:
An RunnerRequest object containing the keyword arguments for ADK runner's run_async method.
Expand All @@ -78,12 +82,15 @@ def _get_user_id(request: RequestContext) -> str:
def convert_a2a_request_to_agent_run_request(
request: RequestContext,
part_converter: A2APartToGenAIPartConverter = convert_a2a_part_to_genai_part,
streaming_mode: Optional[StreamingMode] = None,
) -> AgentRunRequest:
"""Converts an A2A RequestContext to an AgentRunRequest model.

Args:
request: The incoming request context from the A2A server.
part_converter: A function to convert A2A content parts to GenAI parts.
streaming_mode: Optional streaming mode for the agent execution. If None,
defaults to StreamingMode.NONE.

Returns:
A AgentRunRequest object ready to be used as arguments for the ADK runner.
Expand Down Expand Up @@ -113,5 +120,8 @@ def convert_a2a_request_to_agent_run_request(
role='user',
parts=output_parts,
),
run_config=RunConfig(custom_metadata=custom_metadata),
run_config=RunConfig(
custom_metadata=custom_metadata,
streaming_mode=streaming_mode or StreamingMode.NONE,
),
)
4 changes: 4 additions & 0 deletions src/google/adk/a2a/executor/a2a_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from pydantic import BaseModel
from typing_extensions import override

from ...agents.run_config import StreamingMode
from ...utils.context_utils import Aclosing
from ..converters.event_converter import AdkEventToA2AEventsConverter
from ..converters.event_converter import convert_event_to_a2a_events
Expand Down Expand Up @@ -69,6 +70,8 @@ class A2aAgentExecutorConfig(BaseModel):
convert_a2a_request_to_agent_run_request
)
event_converter: AdkEventToA2AEventsConverter = convert_event_to_a2a_events
streaming_mode: Optional[StreamingMode] = None
"""Optional streaming mode for agent execution. If None, defaults to StreamingMode.NONE."""


@a2a_experimental
Expand Down Expand Up @@ -190,6 +193,7 @@ async def _handle_request(
run_request = self._config.request_converter(
context,
self._config.a2a_part_converter,
self._config.streaming_mode,
)

# ensure the session exists
Expand Down
16 changes: 16 additions & 0 deletions src/google/adk/a2a/utils/agent_to_a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
from starlette.applications import Starlette

from ...agents.base_agent import BaseAgent
from ...agents.run_config import StreamingMode
from ...artifacts.in_memory_artifact_service import InMemoryArtifactService
from ...auth.credential_service.in_memory_credential_service import InMemoryCredentialService
from ...memory.in_memory_memory_service import InMemoryMemoryService
from ...runners import Runner
from ...sessions.in_memory_session_service import InMemorySessionService
from ..executor.a2a_agent_executor import A2aAgentExecutor
from ..executor.a2a_agent_executor import A2aAgentExecutorConfig
from ..experimental import a2a_experimental
from .agent_card_builder import AgentCardBuilder

Expand Down Expand Up @@ -79,6 +81,7 @@ def to_a2a(
protocol: str = "http",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
streaming_mode: Optional[StreamingMode] = None,
) -> Starlette:
"""Convert an ADK agent to a A2A Starlette application.

Expand All @@ -92,6 +95,10 @@ def to_a2a(
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
streaming_mode: Optional streaming mode for agent execution. If None,
defaults to StreamingMode.NONE. Set to StreamingMode.SSE
to enable Server-Sent Events streaming for real-time
responses.

Returns:
A Starlette application that can be run with uvicorn
Expand All @@ -103,6 +110,9 @@ def to_a2a(

# Or with custom agent card:
app = to_a2a(agent, agent_card=my_custom_agent_card)

# With SSE streaming enabled:
app = to_a2a(agent, streaming_mode=StreamingMode.SSE)
"""
# Set up ADK logging to ensure logs are visible when using uvicorn directly
adk_logger = logging.getLogger("google_adk")
Expand All @@ -123,8 +133,14 @@ async def create_runner() -> Runner:
# Create A2A components
task_store = InMemoryTaskStore()

# Create executor config with streaming mode
executor_config = A2aAgentExecutorConfig(
streaming_mode=streaming_mode,
)

agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
config=executor_config,
)

request_handler = DefaultRequestHandler(
Expand Down