A simple test repo showing how to use the Orchestrator Pattern with streaming sub-agents that hand control back to the orchestrator when they finish

herzogmatthias/google-adk-custom-agent-orchestrator-setup

Orchestrator Pattern with Streaming Sub-Agents

A Google ADK implementation showing how an orchestrator delegates to specialized sub-agents that stream output and return control.

Architecture

User → [Orchestrator] → delegates to → [Researcher/Writer] → streams output
                ↑                              ↓
                └──── returns control ─────────┘

Flow:

  1. Orchestrator talks to user, calls delegate_research(topic)
  2. Researcher streams output → saves to state["researcher_result"]
  3. Control returns to orchestrator with result
  4. Orchestrator calls delegate_writing(data)
  5. Writer streams output → saves to state["writer_result"]
  6. Orchestrator presents final result

Core Implementation

Tool Interception

# Dummy tools: the orchestrator sees these signatures, but the bodies are stubs
def delegate_research(topic: str) -> str:
    return "dummy"

def delegate_writing(raw_data: str) -> str:
    return "dummy"

# Inside the manager: intercept the call and route it to the real agent
if tool_name == "delegate_research":
    worker = self.researcher
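
One way to keep the interception step tidy is a lookup table from virtual tool name to worker. The sketch below is a minimal illustration, not this repository's code: `Worker`, `ROUTES`, and `route` are hypothetical names, and the writer entry is assumed to mirror the researcher one.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Worker:
    """Hypothetical stand-in for a sub-agent; only the name matters here."""
    name: str


# Hypothetical routing table: virtual tool name -> (worker, argument that carries the task).
ROUTES = {
    "delegate_research": (Worker("researcher"), "topic"),
    "delegate_writing": (Worker("writer"), "raw_data"),
}


def route(tool_name: str, args: dict) -> Optional[Tuple[Worker, str]]:
    """Return (worker, task) for a managed tool call, or None for any other tool."""
    entry = ROUTES.get(tool_name)
    if entry is None:
        return None  # not a managed tool: let it run normally
    worker, arg_name = entry
    return worker, args[arg_name]
```

A call such as `route("delegate_research", {"topic": "quantum computing"})` yields the researcher stand-in and the task string, while an unmanaged tool name yields `None`.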

Sub-Agent Setup

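from google.adk.agents import LlmAgent

researcher = LlmAgent(
    name="researcher",
    output_key="researcher_result",  # Final output is saved to state["researcher_result"]
    include_contents='none',         # Isolated context: no conversation history is passed in
    instruction="Task: {task}\nProduce research..."
)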
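
The `{task}` placeholder implies that a `task` value is written into state before the researcher runs, presumably by the manager using the argument of the intercepted tool call. As a rough stand-in for that substitution (ADK performs its own templating; `render_instruction` is a hypothetical helper, not an ADK function):

```python
import re


def render_instruction(template: str, state: dict) -> str:
    """Replace each {key} placeholder with state[key]; a missing key raises KeyError."""
    return re.sub(r"\{(\w+)\}", lambda match: str(state[match.group(1)]), template)


# Assumption: the manager stores the delegated task in state before running the worker.
state = {"task": "quantum computing"}
prompt = render_instruction("Task: {task}\nProduce research...", state)
```

Because the sub-agent runs with `include_contents='none'`, this rendered instruction is effectively everything it knows about the job, so the task text should be self-explanatory.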

Manager Loop

async def _run_async_impl(self, ctx):
    while keep_looping:
        # 1. Run orchestrator until tool call
        async for event in self.orchestrator.run_async(ctx):
            yield event
            if tool_call in managed_tools:
                break

        # 2. Run sub-agent (streams to user)
        async for event in worker.run_async(ctx):
            yield event

        # 3. Inject synthetic tool response
        # 4. Loop continues with orchestrator
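
To follow the control flow end to end without installing ADK, the loop can be simulated with plain async generators. Everything below is a stand-in under stated assumptions: events are plain dicts, `orchestrator`, `worker`, `manager`, and `collect` are hypothetical functions, and the synthetic tool response is just another dict rather than an ADK event.

```python
import asyncio
from typing import AsyncIterator

# Hypothetical mapping from virtual tool name to worker name.
MANAGED_TOOLS = {"delegate_research": "researcher", "delegate_writing": "writer"}


async def orchestrator(state: dict) -> AsyncIterator[dict]:
    """Fake orchestrator: delegates until both results exist, then answers."""
    if "researcher_result" not in state:
        yield {"author": "orchestrator", "tool_call": "delegate_research",
               "task": "quantum computing"}
    elif "writer_result" not in state:
        yield {"author": "orchestrator", "tool_call": "delegate_writing",
               "task": state["researcher_result"]}
    else:
        yield {"author": "orchestrator", "text": "Summary: " + state["writer_result"]}


async def worker(name: str, state: dict) -> AsyncIterator[dict]:
    """Fake sub-agent: streams two chunks, then saves the joined text (like output_key)."""
    chunks = [f"{name} part 1, ", f"{name} part 2"]
    for chunk in chunks:
        yield {"author": name, "text": chunk}
    state[f"{name}_result"] = "".join(chunks)


async def manager(state: dict) -> AsyncIterator[dict]:
    while True:
        # 1. Run the orchestrator until it calls a managed tool.
        pending = None
        async for event in orchestrator(state):
            yield event
            if event.get("tool_call") in MANAGED_TOOLS:
                pending = event
                break
        if pending is None:
            return  # the orchestrator answered without delegating: done

        # 2. Run the sub-agent, forwarding each chunk as it arrives.
        name = MANAGED_TOOLS[pending["tool_call"]]
        state["task"] = pending["task"]
        async for event in worker(name, state):
            yield event

        # 3. Emit a synthetic tool response; 4. the loop re-enters the orchestrator.
        yield {"author": "manager", "tool_response": pending["tool_call"],
               "result": state[f"{name}_result"]}


async def collect() -> tuple:
    """Drain the manager and return (events, final state)."""
    state: dict = {}
    events = [event async for event in manager(state)]
    return events, state
```

Running `asyncio.run(collect())` produces the interleaving described in the Flow section: an orchestrator tool call, the worker's streamed chunks, a synthetic response, and finally the orchestrator's answer.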

Quick Start

# Install
pip install google-adk litellm python-dotenv

# Configure
echo "OPENAI_API_KEY=your_key" > .env

# Run
adk web --reload_agents

Try: "Research quantum computing and write a summary"

Why This Pattern?

Advantages:

  • Only orchestrator talks to users (clean UX)
  • Sub-agents stream in real-time
  • Iterative refinement (orchestrator reacts to results)
  • No AgentTool complexity

Use Cases:

  • Multi-step workflows (research → write → review)
  • Specialized experts (legal, medical, technical)
  • Quality loops (draft → critique → revise)

Key Points

  1. Virtual Tools: Orchestrator calls delegate_research(), but its body is a stub that never does the actual work
  2. Interception: Manager catches the call and routes to actual sub-agent
  3. Streaming: Sub-agent output streams to user immediately
  4. State: Results saved to state["researcher_result"] for orchestrator to read
  5. Synthetic Response: Manager injects fake tool response so orchestrator continues
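
Points 4 and 5 can be illustrated together: the synthetic response reuses the id and name of the intercepted call and carries whatever the sub-agent saved to state. This is a sketch with hypothetical dataclasses (`ToolCall`, `ToolResponse`) standing in for the real function-call and function-response objects, and the `{"result": ...}` payload shape is an assumption.

```python
from dataclasses import dataclass


@dataclass
class ToolCall:
    """Hypothetical stand-in for the orchestrator's function call."""
    id: str
    name: str
    args: dict


@dataclass
class ToolResponse:
    """Hypothetical stand-in for the response injected by the manager."""
    id: str
    name: str
    response: dict


def synthetic_response(call: ToolCall, state: dict, output_key: str) -> ToolResponse:
    """Build a response that answers `call`, using the sub-agent's saved output."""
    return ToolResponse(
        id=call.id,      # must match the call id, or the orchestrator may ask again
        name=call.name,
        response={"result": state.get(output_key, "")},
    )


call = ToolCall(id="call_1", name="delegate_research", args={"topic": "quantum computing"})
state = {"researcher_result": "notes on qubits"}
reply = synthetic_response(call, state, "researcher_result")
```

Copying the id from the call rather than generating a new one is what lets the orchestrator pair the response with its pending request.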

Troubleshooting

| Issue                | Fix                                              |
|----------------------|--------------------------------------------------|
| Tool not intercepted | Add to managed_tools set                         |
| No streaming         | Check yield event in worker loop                 |
| State not updating   | Set output_key on sub-agent                      |
| Orchestrator loops   | Verify FunctionResponse.id matches tool_call.id  |
