Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions python/packages/ag-ui/agent_framework_ag_ui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@

from ._agent import AgentFrameworkAgent
from ._client import AGUIChatClient
from ._confirmation_strategies import (
ConfirmationStrategy,
DefaultConfirmationStrategy,
DocumentWriterConfirmationStrategy,
RecipeConfirmationStrategy,
TaskPlannerConfirmationStrategy,
)
from ._endpoint import add_agent_framework_fastapi_endpoint
from ._event_converters import AGUIEventConverter
from ._http_service import AGUIHttpService
Expand All @@ -35,13 +28,8 @@
"AGUIHttpService",
"AGUIRequest",
"AgentState",
"ConfirmationStrategy",
"DefaultConfirmationStrategy",
"PredictStateConfig",
"RunMetadata",
"TaskPlannerConfirmationStrategy",
"RecipeConfirmationStrategy",
"DocumentWriterConfirmationStrategy",
"DEFAULT_TAGS",
"__version__",
]
100 changes: 12 additions & 88 deletions python/packages/ag-ui/agent_framework_ag_ui/_agent.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
# Copyright (c) Microsoft. All rights reserved.

"""AgentFrameworkAgent wrapper for AG-UI protocol - Clean Architecture."""
"""AgentFrameworkAgent wrapper for AG-UI protocol."""

from collections.abc import AsyncGenerator
from typing import Any, cast

from ag_ui.core import BaseEvent
from agent_framework import AgentProtocol

from ._confirmation_strategies import ConfirmationStrategy, DefaultConfirmationStrategy
from ._orchestrators import (
DefaultOrchestrator,
ExecutionContext,
HumanInTheLoopOrchestrator,
Orchestrator,
)
from ._run import run_agent_stream


class AgentConfig:
Expand All @@ -33,7 +27,7 @@ def __init__(
state_schema: Optional state schema for state management; accepts dict or Pydantic model/class
predict_state_config: Configuration for predictive state updates
use_service_thread: Whether the agent thread is service-managed
require_confirmation: Whether predictive updates require confirmation
require_confirmation: Whether predictive updates require user confirmation before applying
"""
self.state_schema = self._normalize_state_schema(state_schema)
self.predict_state_config = predict_state_config or {}
Expand All @@ -58,12 +52,12 @@ def _normalize_state_schema(state_schema: Any | None) -> dict[str, Any]:
base_model_type = None

if base_model_type is not None and isinstance(state_schema, base_model_type):
schema_dict = state_schema.__class__.model_json_schema()
schema_dict = state_schema.__class__.model_json_schema() # type: ignore[union-attr]
return schema_dict.get("properties", {}) or {}

if base_model_type is not None and isinstance(state_schema, type) and issubclass(state_schema, base_model_type):
schema_dict = state_schema.model_json_schema()
return schema_dict.get("properties", {}) or {}
schema_dict = state_schema.model_json_schema() # type: ignore[union-attr]
return schema_dict.get("properties", {}) or {} # type: ignore

return {}

Expand All @@ -72,12 +66,7 @@ class AgentFrameworkAgent:
"""Wraps Agent Framework agents for AG-UI protocol compatibility.

Translates between Agent Framework's AgentProtocol and AG-UI's event-based
protocol. Uses orchestrators to handle different execution flows (standard
execution, human-in-the-loop, etc.). Orchestrators are checked in order;
the first matching orchestrator handles the request.

Supports predictive state updates for agentic generative UI, with optional
confirmation requirements configurable per use case.
protocol. Follows a simple linear flow: RunStarted -> content events -> RunFinished.
"""

def __init__(
Expand All @@ -88,9 +77,7 @@ def __init__(
state_schema: Any | None = None,
predict_state_config: dict[str, dict[str, str]] | None = None,
require_confirmation: bool = True,
orchestrators: list[Orchestrator] | None = None,
use_service_thread: bool = False,
confirmation_strategy: ConfirmationStrategy | None = None,
):
"""Initialize the AG-UI compatible agent wrapper.

Expand All @@ -99,15 +86,9 @@ def __init__(
name: Optional name for the agent
description: Optional description
state_schema: Optional state schema for state management; accepts dict or Pydantic model/class
predict_state_config: Configuration for predictive state updates.
Format: {"state_key": {"tool": "tool_name", "tool_argument": "arg_name"}}
require_confirmation: Whether predictive updates require confirmation.
Set to False for agentic generative UI that updates automatically.
orchestrators: Custom orchestrators (auto-configured if None).
Orchestrators are checked in order; first match handles the request.
use_service_thread: Whether the agent thread is service-managed.
confirmation_strategy: Strategy for generating confirmation messages.
Defaults to DefaultConfirmationStrategy if None.
predict_state_config: Configuration for predictive state updates
require_confirmation: Whether predictive updates require user confirmation before applying
use_service_thread: Whether the agent thread is service-managed
"""
self.agent = agent
self.name = name or getattr(agent, "name", "agent")
Expand All @@ -120,74 +101,17 @@ def __init__(
require_confirmation=require_confirmation,
)

# Configure orchestrators
if orchestrators is None:
self.orchestrators = self._default_orchestrators()
else:
self.orchestrators = orchestrators

# Configure confirmation strategy
if confirmation_strategy is None:
self.confirmation_strategy: ConfirmationStrategy = DefaultConfirmationStrategy()
else:
self.confirmation_strategy = confirmation_strategy

def _default_orchestrators(self) -> list[Orchestrator]:
"""Create default orchestrator chain.

Returns:
List of orchestrators in priority order. First matching orchestrator
handles the request, so order matters.
"""
return [
HumanInTheLoopOrchestrator(), # Handle tool approval responses
# Add more specialized orchestrators here as needed
DefaultOrchestrator(), # Fallback: standard agent execution
]

async def run_agent(
self,
input_data: dict[str, Any],
) -> AsyncGenerator[BaseEvent, None]:
"""Run the agent and yield AG-UI events.

This is the ONLY public method - much simpler than the original 376-line
implementation. All orchestration logic has been extracted into dedicated
Orchestrator classes.

The method creates an ExecutionContext with all needed data, then finds
the first orchestrator that can handle the request and delegates to it.

Args:
input_data: The AG-UI run input containing messages, state, etc.

Yields:
AG-UI events

Raises:
RuntimeError: If no orchestrator matches (should never happen if
DefaultOrchestrator is last in the chain)
"""
# Create execution context with all needed data
context = ExecutionContext(
input_data=input_data,
agent=self.agent,
config=self.config,
confirmation_strategy=self.confirmation_strategy,
)

# Find matching orchestrator and execute
for orchestrator in self.orchestrators:
if orchestrator.can_handle(context):
async for event in orchestrator.run(context):
yield event
return

# Should never reach here if DefaultOrchestrator is last
raise RuntimeError("No orchestrator matched - check configuration")


__all__ = [
"AgentFrameworkAgent",
"AgentConfig",
]
async for event in run_agent_stream(input_data, self.agent, self.config):
yield event
4 changes: 2 additions & 2 deletions python/packages/ag-ui/agent_framework_ag_ui/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _apply_server_function_call_unwrap(chat_client: TBaseChatClient) -> TBaseCha
original_get_streaming_response = chat_client.get_streaming_response

@wraps(original_get_streaming_response)
async def streaming_wrapper(self, *args: Any, **kwargs: Any) -> AsyncIterable[ChatResponseUpdate]:
async def streaming_wrapper(self: Any, *args: Any, **kwargs: Any) -> AsyncIterable[ChatResponseUpdate]:
async for update in original_get_streaming_response(self, *args, **kwargs):
_unwrap_server_function_call_contents(cast(MutableSequence[Content | dict[str, Any]], update.contents))
yield update
Expand All @@ -84,7 +84,7 @@ async def streaming_wrapper(self, *args: Any, **kwargs: Any) -> AsyncIterable[Ch
original_get_response = chat_client.get_response

@wraps(original_get_response)
async def response_wrapper(self, *args: Any, **kwargs: Any) -> ChatResponse:
async def response_wrapper(self: Any, *args: Any, **kwargs: Any) -> ChatResponse:
response = await original_get_response(self, *args, **kwargs)
if response.messages:
for message in response.messages:
Expand Down
Loading
Loading