diff --git a/.gitignore b/.gitignore index 0abdace2d..3666be24a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,7 @@ Brewfile.lock.json .DS_Store -examples/**/uv.lock \ No newline at end of file +examples/**/uv.lock + +# Claude workspace directories +.claude-workspace/ \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/.dockerignore b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/.gitignore b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/.gitignore new file mode 100644 index 000000000..4d50da2f0 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/.gitignore @@ -0,0 +1,5 @@ +# Local environment variables (contains secrets) +.env.local + +# Workspace directory (created at runtime) +workspace/ diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/Dockerfile b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/Dockerfile new file mode 100644 index 000000000..f5a592ebc --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/Dockerfile @@ -0,0 +1,53 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install tctl (Temporal CLI) +RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy pyproject.toml and README.md to install dependencies +COPY 060_open_ai_agents_sdk_hello_world/pyproject.toml /app/060_open_ai_agents_sdk_hello_world/pyproject.toml +COPY 060_open_ai_agents_sdk_hello_world/README.md /app/060_open_ai_agents_sdk_hello_world/README.md + +WORKDIR /app/060_open_ai_agents_sdk_hello_world + +# Copy the project code +COPY 060_open_ai_agents_sdk_hello_world/project /app/060_open_ai_agents_sdk_hello_world/project + +# Install the required Python packages from pyproject.toml +RUN uv pip install --system . + +WORKDIR /app/060_open_ai_agents_sdk_hello_world + + +ENV PYTHONPATH=/app +# Run the ACP server using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/README.md b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/README.md new file mode 100644 index 000000000..2f40e53c1 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/README.md @@ -0,0 +1,338 @@ +# Claude Agents SDK Integration with AgentEx + +Integration of Claude Agents SDK with AgentEx's Temporal-based orchestration platform. Claude agents run in durable workflows with real-time streaming to the AgentEx UI. + +> ⚠️ **Note**: This integration is designed for local agent development and single-worker deployments. For distributed multi-worker Kubernetes deployments, additional infrastructure is required (see [Deployment Considerations](#deployment-considerations) below). + +## Features + +- **Durable Execution** - Workflows survive restarts via Temporal's event sourcing (single-worker) +- **Session Resume** - Conversation context maintained across turns via `session_id` +- **Workspace Isolation** - Each task gets dedicated directory for file operations +- **Real-time Streaming** - Text and tool calls stream to UI via Redis +- **Tool Execution** - Read, Write, Edit, Bash, Grep, Glob with visibility in UI +- **Subagents** - Specialized agents (code-reviewer, file-organizer) with nested tracing +- **Cost Tracking** - Token usage and API costs logged per turn +- **Automatic Retries** - Temporal policies handle transient failures + +## How It Works + +### Architecture + +``` +┌─────────────────────────────────┐ +│ Temporal Workflow │ +│ - Stores session_id in state │ +│ - Tracks turn number │ +│ - Sets _task_id, _trace_id │ +└────────────┬────────────────────┘ + │ execute_activity + ↓ +┌─────────────────────────────────┐ +│ run_claude_agent_activity │ +│ - Reads context from ContextVar│ +│ - Configures Claude SDK │ +│ - Processes messages via hooks │ +│ - Returns session_id │ +└────────────┬────────────────────┘ + │ ClaudeSDKClient + ↓ +┌─────────────────────────────────┐ +│ Claude SDK │ +│ - Maintains session │ +│ - Calls Anthropic API │ +│ - Executes tools in workspace │ +│ - Triggers hooks │ +└─────────────────────────────────┘ +``` + +### Context Threading + +The integration reuses AgentEx's `ContextInterceptor` pattern (originally built for OpenAI): + +1. **Workflow** stores `_task_id`, `_trace_id`, `_parent_span_id` as instance variables +2. **ContextInterceptor (outbound)** reads these from workflow instance, injects into activity headers +3. **ContextInterceptor (inbound)** extracts from headers, sets `ContextVar` values +4. **Activity** reads `ContextVar` to get task_id for streaming + +This enables real-time streaming without breaking Temporal's determinism requirements. + +### Session Management + +Claude SDK sessions are preserved across turns: + +1. **First turn**: Claude SDK creates session, returns `session_id` in `SystemMessage` +2. **Message handler** extracts `session_id` from messages +3. **Activity** returns `session_id` to workflow +4. **Workflow** stores in `StateModel.claude_session_id` (Temporal checkpoints this) +5. **Next turn**: Pass `resume=session_id` to `ClaudeAgentOptions` +6. **Claude SDK** resumes session with full conversation history + +### Tool Streaming via Hooks + +Tool lifecycle events are handled by Claude SDK hooks: + +**PreToolUse Hook**: +- Called before tool execution +- Streams `ToolRequestContent` to UI → shows "Using tool: Write" +- Creates nested span for Task tool (subagents) + +**PostToolUse Hook**: +- Called after tool execution +- Streams `ToolResponseContent` to UI → shows "Used tool: Write" +- Closes subagent spans with results + +### Subagent Execution + +Subagents are defined as `AgentDefinition` objects passed to Claude SDK: + +```python +agents={ + 'code-reviewer': AgentDefinition( + description='Expert code review specialist...', + prompt='You are a code reviewer...', + tools=['Read', 'Grep', 'Glob'], # Read-only + model='sonnet', + ) +} +``` + +When Claude uses the Task tool, the SDK routes to the appropriate subagent based on description matching. Subagent execution is tracked via nested tracing spans. + +## Code Structure + +``` +claude_agents/ +├── __init__.py # Public exports +├── activities.py # Temporal activities +│ ├── create_workspace_directory +│ └── run_claude_agent_activity +├── message_handler.py # Message processing +│ └── ClaudeMessageHandler +│ ├── Streams text blocks +│ ├── Extracts session_id +│ └── Extracts usage/cost +└── hooks/ + └── hooks.py # Claude SDK hooks + └── TemporalStreamingHooks + ├── pre_tool_use + └── post_tool_use +``` + +## Deployment Considerations + +This integration works well for local development and single-worker deployments. For distributed multi-worker production deployments, consider the following: + +### ⚠️ Session Persistence (Multi-Worker) + +**Current behavior**: Claude SDK sessions are tied to the worker process. + +- **Local dev**: ✅ Works - session persists within single worker +- **K8s multi-pod**: ⚠️ Session ID stored in Temporal state, but session itself lives in Claude CLI process +- **Impact**: If task moves to different pod, session becomes invalid +- **Infrastructure needed**: Session persistence layer or sticky routing to same pod + +### ⚠️ Workspace Storage (Multi-Worker) + +**Current behavior**: Workspaces are local directories (`./workspace/{task_id}`). + +- **Local dev**: ✅ Works - single worker accesses all files +- **K8s multi-pod**: ⚠️ Each pod has isolated filesystem +- **Impact**: Files created by one pod are invisible to other pods +- **Infrastructure needed**: Shared storage (NFS, EFS, GCS Fuse) via `CLAUDE_WORKSPACE_ROOT` env var + +**Solution for production**: +```bash +# Mount shared filesystem (NFS, EFS, etc.) to all pods +export CLAUDE_WORKSPACE_ROOT=/mnt/shared/workspaces + +# All workers will now share workspace access +``` + +### ℹ️ Filesystem-Based Configuration + +**Current approach**: Agents and configuration are defined programmatically in code. + +- **Not used**: `.claude/agents/`, `.claude/skills/`, `CLAUDE.md` files +- **Why**: Aligns with AgentEx's code-as-configuration philosophy +- **Trade-off**: More explicit and version-controlled, but can't leverage existing Claude configs +- **To enable**: Would need to add `setting_sources=["project"]` to `ClaudeAgentOptions` + +**Current approach** (programmatic config in workflow.py): +```python +subagents = { + 'code-reviewer': AgentDefinition( + description='...', + prompt='...', + tools=['Read', 'Grep', 'Glob'], + model='sonnet', + ), +} +``` + +--- + +**Summary**: The integration is production-ready for **single-worker deployments**. Multi-worker deployments require additional infrastructure for session persistence and workspace sharing. + +## Quick Start + +### Prerequisites + +- Temporal server (localhost:7233) +- Redis (localhost:6379) +- Anthropic API key + +### Run + +```bash +# Install +rye sync --all-features + +# Configure +export ANTHROPIC_API_KEY="your-key" +export REDIS_URL="redis://localhost:6379" +export TEMPORAL_ADDRESS="localhost:7233" + +# Run from repository root +uv run agentex agents run --manifest examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/manifest.yaml +``` + +## Example Interactions + +### Context Preservation + +``` +User: "Your name is Jose" +Claude: "Nice to meet you! I'm Jose..." + +User: "What name did I assign to you?" +Claude: "You asked me to go by Jose!" ← Remembers context +``` + +### Tool Usage + +``` +User: "Create a hello.c file with Hello World" +Claude: *streams response* +[Tool card appears: "Using tool: Write"] +[Tool card updates: "Used tool: Write"] +"Done! I've created hello.c..." +``` + +### Subagents + +``` +User: "Review the code quality in hello.c" +Claude: *delegates to code-reviewer* +[Tool card: "Using tool: Task" with subagent_type: "code-reviewer"] +[Traces view shows: "Subagent: code-reviewer" nested under turn] +``` + +## Behind the Scenes + +### Message Flow + +When a user sends a message: + +1. **Signal received** (`on_task_event_send`) - Workflow increments turn, echoes message +2. **Span created** - Tracing span wraps turn, stores `parent_span_id` for interceptor +3. **Activity called** - Workflow passes prompt, workspace, session_id, subagent defs +4. **Context threaded** - Interceptor injects task_id/trace_id into activity headers +5. **Activity starts** - Reads context from ContextVar, creates hooks +6. **Claude executes** - SDK uses hooks to stream tools, message_handler streams text +7. **Results returned** - Activity returns session_id, usage, cost +8. **State updated** - Workflow stores session_id for next turn + +### Streaming Pipeline + +**Text streaming**: +``` +Claude SDK → TextBlock → ClaudeMessageHandler._handle_text_block() +→ TextDelta → adk.streaming.stream_update() +→ Redis XADD → AgentEx UI +``` + +**Tool streaming**: +``` +Claude SDK → PreToolUse hook → ToolRequestContent +→ adk.streaming (via hook) → Redis → UI ("Using tool...") + +Tool executes... + +Claude SDK → PostToolUse hook → ToolResponseContent +→ adk.streaming (via hook) → Redis → UI ("Used tool...") +``` + +### Subagent Tracing + +When Task tool is detected in PreToolUse hook: + +```python +# Create nested span +span_ctx = adk.tracing.span( + trace_id=trace_id, + parent_id=parent_span_id, + name=f"Subagent: {subagent_type}", + input=tool_input, +) +span = await span_ctx.__aenter__() + +# Store for PostToolUse to close +self.subagent_spans[tool_use_id] = (span_ctx, span) +``` + +In PostToolUse hook, the span is closed with results, creating a complete nested trace. + +## Key Implementation Details + +### Temporal Determinism + +- **File I/O in activities**: `create_workspace_directory` is an activity (not workflow code) +- **Message iteration completes**: Use `receive_response()` (not `receive_messages()`) +- **State is serializable**: `StateModel` uses Pydantic BaseModel + +### AgentDefinition Serialization + +Temporal serializes activity arguments to JSON. AgentDefinition dataclasses become dicts, so the activity reconstructs them: + +```python +agent_defs = { + name: AgentDefinition(**agent_data) + for name, agent_data in agents.items() +} +``` + +### Hook Callback Signatures + +Claude SDK expects specific signatures: + +```python +async def pre_tool_use( + input_data: dict[str, Any], # Contains tool_name, tool_input + tool_use_id: str | None, # Unique ID for this call + context: Any, # HookContext (currently unused) +) -> dict[str, Any]: # Return {} to allow, or modify behavior +``` + +## Comparison with OpenAI Integration + +| Aspect | OpenAI | Claude | +|--------|--------|--------| +| **Plugin** | `OpenAIAgentsPlugin` (official) | Manual activity wrapper | +| **Streaming** | Token-level deltas | Message block-level | +| **Tool Results** | `ToolResultBlock` | `UserMessage` (with acceptEdits) | +| **Hooks** | `RunHooks` class | `HookMatcher` with callbacks | +| **Context Threading** | ContextInterceptor | ContextInterceptor (reused!) | +| **Subagents** | Agent handoffs | AgentDefinition config | + +## Notes + +**Message Block Streaming**: Claude SDK returns complete text blocks, not individual tokens. Text appears instantly rather than animating character-by-character. This is inherent to Claude SDK's API design. + +**In-Process Subagents**: Subagents run within Claude SDK via config-based routing, not as separate Temporal workflows. This is by design - subagents are specializations, not independent agents. + +**Manual Activity Calls**: Unlike OpenAI which has an official Temporal plugin, Claude integration requires explicit `workflow.execute_activity()` calls. A future enhancement could create an automatic plugin. + +## License + +Apache 2.0 (same as AgentEx SDK) diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/manifest.yaml b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/manifest.yaml new file mode 100644 index 000000000..3af65eede --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/manifest.yaml @@ -0,0 +1,59 @@ +kind: Agent + +# Build Configuration +build: + context: + root: ../ + include_paths: + - 090_claude_agents_sdk_mvp + dockerfile: 090_claude_agents_sdk_mvp/Dockerfile + dockerignore: 090_claude_agents_sdk_mvp/.dockerignore + +# Local Development Configuration +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + worker: project/run_worker.py + +# Agent Configuration +agent: + acp_type: async + name: claude-mvp-agent + description: Claude Agents SDK MVP - proof of concept integration with AgentEx + + temporal: + enabled: true + workflows: + - name: ClaudeMvpWorkflow + queue_name: claude-mvp-queue + + credentials: + - env_var_name: ANTHROPIC_API_KEY + secret_name: anthropic-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + +# Deployment Configuration +deployment: + image: + repository: "" + tag: "latest" + imagePullSecrets: + - name: my-registry-secret + global: + agent: + name: "claude-mvp-agent" + description: "Claude Agents SDK MVP" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/acp.py b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/acp.py new file mode 100644 index 000000000..fcdbba155 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/acp.py @@ -0,0 +1,72 @@ +import os +import sys + +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +# === DEBUG SETUP (AgentEx CLI Debug Support) === +if os.getenv("AGENTEX_DEBUG_ENABLED") == "true": + try: + import debugpy + debug_port = int(os.getenv("AGENTEX_DEBUG_PORT", "5679")) + debug_type = os.getenv("AGENTEX_DEBUG_TYPE", "acp") + wait_for_attach = os.getenv("AGENTEX_DEBUG_WAIT_FOR_ATTACH", "false").lower() == "true" + + # Configure debugpy + debugpy.configure(subProcess=False) + debugpy.listen(debug_port) + + print(f"🐛 [{debug_type.upper()}] Debug server listening on port {debug_port}") + + if wait_for_attach: + print(f"⏳ [{debug_type.upper()}] Waiting for debugger to attach...") + debugpy.wait_for_client() + print(f"✅ [{debug_type.upper()}] Debugger attached!") + else: + print(f"📡 [{debug_type.upper()}] Ready for debugger attachment") + + except ImportError: + print("❌ debugpy not available. Install with: pip install debugpy") + sys.exit(1) + except Exception as e: + print(f"❌ Debug setup failed: {e}") + sys.exit(1) +# === END DEBUG SETUP === + +from agentex.lib.types.fastacp import TemporalACPConfig +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import ( + TemporalStreamingModelProvider, +) +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor + +context_interceptor = ContextInterceptor() +temporal_streaming_model_provider = TemporalStreamingModelProvider() + +# Create the ACP server +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + # When deployed to the cluster, the Temporal address will automatically be set to the cluster address + # For local development, we set the address manually to talk to the local Temporal service set up via docker compose + # We are also adding the Open AI Agents SDK plugin to the ACP. + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)], + interceptors=[context_interceptor] + ) +) + + +# Notice that we don't need to register any handlers when we use type="temporal" +# If you look at the code in agentex.sdk.fastacp.impl.temporal_acp +# You can see that these handlers are automatically registered when the ACP is created + +# @acp.on_task_create +# This will be handled by the method in your workflow that is decorated with @workflow.run + +# @acp.on_task_event_send +# This will be handled by the method in your workflow that is decorated with @workflow.signal(name=SignalName.RECEIVE_MESSAGE) + +# @acp.on_task_cancel +# This does not need to be handled by your workflow. +# It is automatically handled by the temporal client which cancels the workflow directly \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/run_worker.py b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/run_worker.py new file mode 100644 index 000000000..a969cd760 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/run_worker.py @@ -0,0 +1,85 @@ +"""Claude MVP Worker - Minimal setup + +This worker demonstrates the minimal setup needed to run Claude agents +in AgentEx's Temporal architecture. + +Key components: +- ClaudeSDKClient activity (run_claude_agent_activity) +- ContextInterceptor (reused from OpenAI - threads task_id) +- Standard AgentEx activities (messages, streaming, tracing) +""" + +import os +import asyncio + +# Import workflow +from project.workflow import ClaudeMvpWorkflow + +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.activities import get_all_activities +from agentex.lib.core.temporal.workers.worker import AgentexWorker + +# Import Claude components +from agentex.lib.core.temporal.plugins.claude_agents import ( + ContextInterceptor, # Reuse from OpenAI! + run_claude_agent_activity, + create_workspace_directory, +) + +logger = make_logger(__name__) + + +async def main(): + """Start the Claude MVP worker""" + + environment_variables = EnvironmentVariables.refresh() + + logger.info("=" * 80) + logger.info("CLAUDE MVP WORKER STARTING") + logger.info("=" * 80) + logger.info(f"Workflow: {environment_variables.WORKFLOW_NAME}") + logger.info(f"Task Queue: {environment_variables.WORKFLOW_TASK_QUEUE}") + logger.info(f"Temporal Address: {environment_variables.TEMPORAL_ADDRESS}") + logger.info(f"Redis URL: {environment_variables.REDIS_URL}") + logger.info(f"Workspace Root: {environment_variables.CLAUDE_WORKSPACE_ROOT}") + logger.info(f"ANTHROPIC_API_KEY: {'SET' if os.environ.get('ANTHROPIC_API_KEY') else 'NOT SET (will fail when activity runs)'}") + + # Get all standard AgentEx activities + activities = get_all_activities() + + # Add Claude-specific activities + activities.append(run_claude_agent_activity) + activities.append(create_workspace_directory) + + logger.info(f"Registered {len(activities)} activities (including Claude activity)") + + # Create context interceptor (reuse from OpenAI!) + context_interceptor = ContextInterceptor() + + # Create worker with interceptor + worker = AgentexWorker( + task_queue=environment_variables.WORKFLOW_TASK_QUEUE, + interceptors=[context_interceptor], # Threads task_id to activities! + plugins=[], # No plugin for MVP - manual activity wrapping + ) + + logger.info("=" * 80) + logger.info("🚀 WORKER READY - Listening for tasks...") + logger.info("=" * 80) + + # Run worker + await worker.run( + activities=activities, + workflow=ClaudeMvpWorkflow, + ) + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("\n🛑 Worker stopped by user") + except Exception as e: + logger.error(f"❌ Worker failed: {e}", exc_info=True) + raise diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py new file mode 100644 index 000000000..c22045152 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py @@ -0,0 +1,240 @@ +"""Claude Agents SDK MVP - Minimal working example + +This workflow demonstrates the basic integration pattern between Claude Agents SDK +and AgentEx's Temporal architecture. + +What this proves: +- ✅ Claude agent runs in Temporal workflow +- ✅ File operations isolated to workspace +- ✅ Basic text streaming to UI +- ✅ Visible in Temporal UI as activities +- ✅ Temporal retry policies work + +What's missing (see NEXT_STEPS.md): +- Tool call streaming +- Proper plugin architecture +- Subagents +- Tracing +""" +from __future__ import annotations + +import os +from datetime import timedelta + +from temporalio import workflow +from temporalio.common import RetryPolicy +from claude_agent_sdk.types import AgentDefinition + +from agentex.lib import adk +from agentex.lib.types.acp import SendEventParams, CreateTaskParams +from agentex.lib.utils.logging import make_logger +from agentex.types.text_content import TextContent +from agentex.lib.utils.model_utils import BaseModel +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.types.workflow import SignalName +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow + +# Import Claude activities +from agentex.lib.core.temporal.plugins.claude_agents import ( + run_claude_agent_activity, + create_workspace_directory, +) + +environment_variables = EnvironmentVariables.refresh() + +if environment_variables.WORKFLOW_NAME is None: + raise ValueError("Environment variable WORKFLOW_NAME is not set") + +if environment_variables.AGENT_NAME is None: + raise ValueError("Environment variable AGENT_NAME is not set") + +logger = make_logger(__name__) + + +class StateModel(BaseModel): + """Workflow state for Claude session tracking + + Stores Claude session ID to maintain conversation context across turns. + This allows Claude to remember previous messages and answer follow-up questions. + """ + claude_session_id: str | None = None + turn_number: int = 0 + + +@workflow.defn(name=environment_variables.WORKFLOW_NAME) +class ClaudeMvpWorkflow(BaseWorkflow): + """Minimal Claude agent workflow - MVP v0 + + This workflow: + 1. Creates isolated workspace for task + 2. Receives user messages via signals + 3. Runs Claude via Temporal activity + 4. Returns responses to user + + Key features: + - Durable execution (survives restarts) + - Workspace isolation + - Automatic retries + - Visible in Temporal UI + """ + + def __init__(self): + super().__init__(display_name=environment_variables.AGENT_NAME) + self._complete_task = False + self._state: StateModel | None = None + self._task_id = None + self._trace_id = None + self._parent_span_id = None + self._workspace_path = None + + @workflow.signal(name=SignalName.RECEIVE_EVENT) + async def on_task_event_send(self, params: SendEventParams): + """Handle user message - run Claude agent""" + + logger.info(f"Received task message: {params.event.content.content[:100]}...") + + if self._state is None: + raise ValueError("State is not initialized") + + self._task_id = params.task.id + self._trace_id = params.task.id + self._state.turn_number += 1 + + # Echo user message to UI + await adk.messages.create( + task_id=params.task.id, + content=params.event.content + ) + + # Wrap in tracing span - THIS IS REQUIRED for ContextInterceptor to work! + async with adk.tracing.span( + trace_id=params.task.id, + name=f"Turn {self._state.turn_number}", + input={ + "prompt": params.event.content.content, + "session_id": self._state.claude_session_id, + }, + ) as span: + self._parent_span_id = span.id if span else None + + try: + # Define subagents for specialized tasks + subagents = { + 'code-reviewer': AgentDefinition( + description='Expert code review specialist. Use when analyzing code quality, security, or best practices.', + prompt='You are a code review expert. Analyze code for bugs, security issues, and best practices. Be thorough but concise.', + tools=['Read', 'Grep', 'Glob'], # Read-only + model='sonnet', + ), + 'file-organizer': AgentDefinition( + description='File organization specialist. Use when creating multiple files or organizing project layout.', + prompt='You are a file organization expert. Create well-structured projects with clear naming.', + tools=['Write', 'Read', 'Bash', 'Glob'], + model='haiku', # Faster model + ), + } + + # Run Claude via activity (manual wrapper for MVP) + # ContextInterceptor reads _task_id, _trace_id, _parent_span_id and threads to activity! + result = await workflow.execute_activity( + run_claude_agent_activity, + args=[ + params.event.content.content, # prompt + self._workspace_path, # workspace + ["Read", "Write", "Edit", "Bash", "Grep", "Glob", "Task"], # allowed tools (Task for subagents!) + "acceptEdits", # permission mode + "You are a helpful coding assistant. Be concise.", # system prompt + self._state.claude_session_id, # resume session for context! + subagents, # subagent definitions! + ], + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy( + maximum_attempts=3, + initial_interval=timedelta(seconds=1), + maximum_interval=timedelta(seconds=10), + backoff_coefficient=2.0, + ), + ) + + # Update session_id for next turn (maintains conversation context) + new_session_id = result.get("session_id") + if new_session_id: + self._state.claude_session_id = new_session_id + logger.info( + f"Turn {self._state.turn_number}: " + f"session_id={'STARTED' if self._state.turn_number == 1 else 'CONTINUED'} " + f"({new_session_id[:16]}...)" + ) + else: + logger.warning(f"No session_id returned - context may not persist") + + # Response already streamed to UI by activity - no need to send again + logger.debug(f"Turn {self._state.turn_number} completed successfully") + + except Exception as e: + logger.error(f"Error running Claude agent: {e}", exc_info=True) + # Send error message to user + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=f"❌ Error: {str(e)}", + ) + ) + raise + + @workflow.run + async def on_task_create(self, params: CreateTaskParams): + """Initialize workflow - create workspace and send welcome""" + + logger.info(f"Creating Claude MVP workflow for task: {params.task.id}") + + # Initialize state with session tracking + self._state = StateModel( + claude_session_id=None, + turn_number=0, + ) + + # Create workspace via activity (avoids determinism issues with file I/O) + workspace_root = os.environ.get("CLAUDE_WORKSPACE_ROOT") + self._workspace_path = await workflow.execute_activity( + create_workspace_directory, + args=[params.task.id, workspace_root], + start_to_close_timeout=timedelta(seconds=10), + ) + + logger.info(f"Workspace ready: {self._workspace_path}") + + # Send welcome message + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + "🚀 **Claude MVP Agent Ready!**\n\n" + f"Workspace: `{self._workspace_path}`\n\n" + "I'm powered by Claude Agents SDK + Temporal. Try asking me to:\n" + "- Create files: *'Create a hello.py file'*\n" + "- Read files: *'What's in hello.py?'*\n" + "- Run commands: *'List files in the workspace'*\n\n" + "Send me a message to get started! 💬" + ), + format="markdown", + ) + ) + + # Wait for completion signal + logger.info("Waiting for task completion...") + await workflow.wait_condition( + lambda: self._complete_task, + timeout=None, # Long-running workflow + ) + + logger.info("Claude MVP workflow completed") + return "Task completed successfully" + + @workflow.signal + async def complete_task_signal(self): + """Signal to gracefully complete the workflow""" + logger.info("Received complete_task signal") + self._complete_task = True diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/workspace/.gitignore b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/workspace/.gitignore new file mode 100644 index 000000000..3b65a4661 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/workspace/.gitignore @@ -0,0 +1,4 @@ +# Ignore all files in workspace directory +# Each task gets its own subdirectory here +* +!.gitignore diff --git a/pyproject.toml b/pyproject.toml index 2711ac205..543c4e59b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,8 @@ dependencies = [ "datadog>=0.52.1", "ddtrace>=3.13.0", "yaspin>=3.1.0", + "claude-agent-sdk>=0.1.0", + "anthropic>=0.40.0", ] requires-python = ">= 3.12,<4" classifiers = [ diff --git a/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py b/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py new file mode 100644 index 000000000..6f8a7c413 --- /dev/null +++ b/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py @@ -0,0 +1,72 @@ +"""Claude Agents SDK integration with Temporal. + +This plugin provides integration between Claude Agents SDK and AgentEx's +Temporal-based orchestration platform. + +Features: +- Temporal activity wrapper for Claude SDK calls +- Real-time streaming to Redis/UI +- Session resume for conversation context +- Tool call visibility (Read, Write, Bash, etc.) +- Subagent support with nested tracing +- Workspace isolation per task + +Architecture: +- activities.py: Temporal activity definitions +- message_handler.py: Message parsing and streaming logic +- Reuses OpenAI's ContextInterceptor for context threading + +Usage: + from agentex.lib.core.temporal.plugins.claude_agents import ( + run_claude_agent_activity, + create_workspace_directory, + ContextInterceptor, + ) + + # In worker + worker = AgentexWorker( + task_queue=queue_name, + interceptors=[ContextInterceptor()], + ) + + activities = get_all_activities() + activities.extend([run_claude_agent_activity, create_workspace_directory]) + + await worker.run(activities=activities, workflow=YourWorkflow) +""" + +from agentex.lib.core.temporal.plugins.claude_agents.hooks import ( + TemporalStreamingHooks, + create_streaming_hooks, +) +from agentex.lib.core.temporal.plugins.claude_agents.activities import ( + run_claude_agent_activity, + create_workspace_directory, +) +from agentex.lib.core.temporal.plugins.claude_agents.message_handler import ( + ClaudeMessageHandler, +) + +# Reuse OpenAI's context threading - this is the key to streaming! +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ( + ContextInterceptor, + streaming_task_id, + streaming_trace_id, + streaming_parent_span_id, +) + +__all__ = [ + # Activities + "run_claude_agent_activity", + "create_workspace_directory", + # Message handling + "ClaudeMessageHandler", + # Hooks + "create_streaming_hooks", + "TemporalStreamingHooks", + # Context threading (reused from OpenAI) + "ContextInterceptor", + "streaming_task_id", + "streaming_trace_id", + "streaming_parent_span_id", +] diff --git a/src/agentex/lib/core/temporal/plugins/claude_agents/activities.py b/src/agentex/lib/core/temporal/plugins/claude_agents/activities.py new file mode 100644 index 000000000..ccd6a9f94 --- /dev/null +++ b/src/agentex/lib/core/temporal/plugins/claude_agents/activities.py @@ -0,0 +1,154 @@ +"""Temporal activities for Claude Agents SDK integration.""" + +from __future__ import annotations + +import os +from typing import Any + +from temporalio import activity +from claude_agent_sdk import AgentDefinition, ClaudeSDKClient, ClaudeAgentOptions + +from agentex.lib.utils.logging import make_logger +from agentex.lib.core.temporal.plugins.claude_agents.hooks import create_streaming_hooks +from agentex.lib.core.temporal.plugins.claude_agents.message_handler import ClaudeMessageHandler +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ( + streaming_task_id, + streaming_trace_id, + streaming_parent_span_id, +) + +logger = make_logger(__name__) + + +@activity.defn +async def create_workspace_directory(task_id: str, workspace_root: str | None = None) -> str: + """Create workspace directory for task - runs as Temporal activity + + Args: + task_id: Task ID for workspace directory name + workspace_root: Root directory for workspaces (defaults to .claude-workspace/ in cwd) + + Returns: + Absolute path to created workspace + """ + if workspace_root is None: + # Default to .claude-workspace in current directory + # Follows Claude SDK's .claude/ convention + workspace_root = os.path.join(os.getcwd(), ".claude-workspace") + + workspace_path = os.path.join(workspace_root, task_id) + os.makedirs(workspace_path, exist_ok=True) + logger.info(f"Created workspace: {workspace_path}") + return workspace_path + + +@activity.defn(name="run_claude_agent_activity") +async def run_claude_agent_activity( + prompt: str, + workspace_path: str, + allowed_tools: list[str], + permission_mode: str = "acceptEdits", + system_prompt: str | None = None, + resume_session_id: str | None = None, + agents: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Execute Claude SDK - wrapped in Temporal activity + + This activity: + 1. Gets task_id from ContextVar (set by ContextInterceptor) + 2. Configures Claude with workspace isolation and session resume + 3. Runs Claude SDK and processes messages via ClaudeMessageHandler + 4. Streams messages to UI in real-time + 5. Returns session_id, usage, and cost for next turn + + Args: + prompt: User message to send to Claude + workspace_path: Directory for file operations (cwd) + allowed_tools: List of tools Claude can use (include "Task" for subagents) + permission_mode: Permission mode (default: acceptEdits) + system_prompt: Optional system prompt override + resume_session_id: Optional session ID to resume conversation context + agents: Optional dict of subagent definitions for Task tool + + Returns: + dict with "messages", "session_id", "usage", and "cost_usd" keys + """ + + # Get streaming context from ContextVars (set by interceptor) + task_id = streaming_task_id.get() + trace_id = streaming_trace_id.get() + parent_span_id = streaming_parent_span_id.get() + + logger.info( + f"[run_claude_agent_activity] Starting - " + f"task_id={task_id}, workspace={workspace_path}, tools={allowed_tools}, " + f"resume={'YES' if resume_session_id else 'NO (new session)'}, " + f"subagents={list(agents.keys()) if agents else 'NONE'}" + ) + + # Reconstruct AgentDefinition objects from serialized dicts + # Temporal serializes dataclasses to dicts, need to recreate them + agent_defs = None + if agents: + agent_defs = {} + for name, agent_data in agents.items(): + if isinstance(agent_data, AgentDefinition): + agent_defs[name] = agent_data + else: + # Reconstruct from dict + agent_defs[name] = AgentDefinition( + description=agent_data.get('description', ''), + prompt=agent_data.get('prompt', ''), + tools=agent_data.get('tools'), + model=agent_data.get('model'), + ) + + # Create hooks for streaming tool calls and subagent execution + hooks = create_streaming_hooks( + task_id=task_id, + trace_id=trace_id, + parent_span_id=parent_span_id, + ) + + # Configure Claude with workspace isolation, session resume, subagents, and hooks + options = ClaudeAgentOptions( + cwd=workspace_path, + allowed_tools=allowed_tools, + permission_mode=permission_mode, # type: ignore + system_prompt=system_prompt, + resume=resume_session_id, + agents=agent_defs, + hooks=hooks, # Tool lifecycle hooks for streaming! + ) + + # Create message handler for streaming + handler = ClaudeMessageHandler( + task_id=task_id, + trace_id=trace_id, + parent_span_id=parent_span_id, + ) + + # Run Claude and process messages + try: + await handler.initialize() + + async with ClaudeSDKClient(options=options) as client: + await client.query(prompt) + + # Use receive_response() instead of receive_messages() + # receive_response() yields messages until ResultMessage, then stops + # receive_messages() is infinite and never completes! + async for message in client.receive_response(): + await handler.handle_message(message) + + logger.debug(f"Message loop completed, cleaning up...") + await handler.cleanup() + + results = handler.get_results() + logger.debug(f"Returning results with keys: {results.keys()}") + return results + + except Exception as e: + logger.error(f"[run_claude_agent_activity] Error: {e}", exc_info=True) + await handler.cleanup() + raise diff --git a/src/agentex/lib/core/temporal/plugins/claude_agents/hooks/__init__.py b/src/agentex/lib/core/temporal/plugins/claude_agents/hooks/__init__.py new file mode 100644 index 000000000..39c086515 --- /dev/null +++ b/src/agentex/lib/core/temporal/plugins/claude_agents/hooks/__init__.py @@ -0,0 +1,11 @@ +"""Claude SDK hooks for streaming lifecycle events to AgentEx UI.""" + +from agentex.lib.core.temporal.plugins.claude_agents.hooks.hooks import ( + TemporalStreamingHooks, + create_streaming_hooks, +) + +__all__ = [ + "create_streaming_hooks", + "TemporalStreamingHooks", +] diff --git a/src/agentex/lib/core/temporal/plugins/claude_agents/hooks/hooks.py b/src/agentex/lib/core/temporal/plugins/claude_agents/hooks/hooks.py new file mode 100644 index 000000000..5f629fc17 --- /dev/null +++ b/src/agentex/lib/core/temporal/plugins/claude_agents/hooks/hooks.py @@ -0,0 +1,212 @@ +"""Claude SDK hooks for streaming tool calls and subagent execution to AgentEx UI. + +This module provides hook callbacks that integrate with Claude SDK's hooks system +to stream tool execution lifecycle events in real-time. +""" + +from __future__ import annotations + +from typing import Any + +from claude_agent_sdk import HookMatcher + +from agentex.lib import adk +from agentex.lib.utils.logging import make_logger +from agentex.types.task_message_update import StreamTaskMessageFull +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent + +logger = make_logger(__name__) + + +class TemporalStreamingHooks: + """Hooks for streaming Claude SDK lifecycle events to AgentEx UI. + + Implements Claude SDK hook callbacks: + - PreToolUse: Called before tool execution → stream tool request + - PostToolUse: Called after tool execution → stream tool result + + Also handles subagent detection and nested tracing. + """ + + def __init__( + self, + task_id: str | None, + trace_id: str | None = None, + parent_span_id: str | None = None, + ): + """Initialize streaming hooks. + + Args: + task_id: AgentEx task ID for routing streams + trace_id: Trace ID for nested spans + parent_span_id: Parent span ID for subagent spans + """ + self.task_id = task_id + self.trace_id = trace_id + self.parent_span_id = parent_span_id + + # Track active subagent spans + self.subagent_spans: dict[str, Any] = {} # tool_call_id → (ctx, span) + + async def pre_tool_use( + self, + input_data: dict[str, Any], + tool_use_id: str | None, + _context: Any, + ) -> dict[str, Any]: + """Hook called before tool execution. + + Args: + input_data: Contains tool_name, tool_input from Claude SDK + tool_use_id: Unique ID for this tool call + context: Hook context from Claude SDK + + Returns: + Empty dict (allow execution to proceed) + """ + if not self.task_id or not tool_use_id: + return {} + + tool_name = input_data.get("tool_name", "unknown") + tool_input = input_data.get("tool_input", {}) + + logger.info(f"🔧 Tool request: {tool_name}") + + # Special handling for Task tool (subagents) - create nested span + if tool_name == "Task" and self.trace_id and self.parent_span_id: + subagent_type = tool_input.get("subagent_type", "unknown") + logger.info(f"🤖 Subagent started: {subagent_type}") + + # Create nested trace span for subagent + subagent_ctx = adk.tracing.span( + trace_id=self.trace_id, + parent_id=self.parent_span_id, + name=f"Subagent: {subagent_type}", + input=tool_input, + ) + subagent_span = await subagent_ctx.__aenter__() + self.subagent_spans[tool_use_id] = (subagent_ctx, subagent_span) + + # Stream tool request to UI + try: + async with adk.streaming.streaming_task_message_context( + task_id=self.task_id, + initial_content=ToolRequestContent( + author="agent", + name=tool_name, + arguments=tool_input, + tool_call_id=tool_use_id, + ) + ) as tool_ctx: + await tool_ctx.stream_update( + StreamTaskMessageFull( + parent_task_message=tool_ctx.task_message, + content=ToolRequestContent( + author="agent", + name=tool_name, + arguments=tool_input, + tool_call_id=tool_use_id, + ), + type="full" + ) + ) + except Exception as e: + logger.warning(f"Failed to stream tool request: {e}") + + return {} # Allow execution + + async def post_tool_use( + self, + input_data: dict[str, Any], + tool_use_id: str | None, + _context: Any, + ) -> dict[str, Any]: + """Hook called after tool execution. + + Args: + input_data: Contains tool_name, tool_output from Claude SDK + tool_use_id: Unique ID for this tool call + context: Hook context from Claude SDK + + Returns: + Empty dict + """ + if not self.task_id or not tool_use_id: + return {} + + tool_name = input_data.get("tool_name", "unknown") + tool_output = input_data.get("tool_output", "") + + logger.info(f"✅ Tool result: {tool_name}") + + # If this was a subagent, close the nested span + if tool_use_id in self.subagent_spans: + subagent_ctx, subagent_span = self.subagent_spans[tool_use_id] + subagent_span.output = {"result": tool_output} + await subagent_ctx.__aexit__(None, None, None) + logger.info(f"🤖 Subagent completed: {tool_name}") + del self.subagent_spans[tool_use_id] + + # Stream tool response to UI + try: + async with adk.streaming.streaming_task_message_context( + task_id=self.task_id, + initial_content=ToolResponseContent( + author="agent", + name=tool_name, + content=tool_output, + tool_call_id=tool_use_id, + ) + ) as tool_ctx: + await tool_ctx.stream_update( + StreamTaskMessageFull( + parent_task_message=tool_ctx.task_message, + content=ToolResponseContent( + author="agent", + name=tool_name, + content=tool_output, + tool_call_id=tool_use_id, + ), + type="full" + ) + ) + except Exception as e: + logger.warning(f"Failed to stream tool response: {e}") + + return {} + + +def create_streaming_hooks( + task_id: str | None, + trace_id: str | None = None, + parent_span_id: str | None = None, +) -> dict[str, list[HookMatcher]]: + """Create Claude SDK hooks configuration for streaming. + + Returns hooks dict suitable for ClaudeAgentOptions(hooks=...). + + Args: + task_id: AgentEx task ID for streaming + trace_id: Trace ID for nested spans + parent_span_id: Parent span ID for subagent spans + + Returns: + Dict with PreToolUse and PostToolUse hook configurations + """ + hooks_instance = TemporalStreamingHooks(task_id, trace_id, parent_span_id) + + return { + "PreToolUse": [ + HookMatcher( + matcher=None, # Match all tools + hooks=[hooks_instance.pre_tool_use] + ) + ], + "PostToolUse": [ + HookMatcher( + matcher=None, # Match all tools + hooks=[hooks_instance.post_tool_use] + ) + ], + } diff --git a/src/agentex/lib/core/temporal/plugins/claude_agents/message_handler.py b/src/agentex/lib/core/temporal/plugins/claude_agents/message_handler.py new file mode 100644 index 000000000..c0d414a23 --- /dev/null +++ b/src/agentex/lib/core/temporal/plugins/claude_agents/message_handler.py @@ -0,0 +1,178 @@ +"""Message handling and streaming for Claude Agents SDK. + +Simplified message handler that focuses on: +- Streaming text content to UI +- Extracting session_id for conversation continuity +- Extracting usage and cost information + +Tool requests/responses are handled by Claude SDK hooks (see hooks/hooks.py). +""" + +from __future__ import annotations + +from typing import Any + +from claude_agent_sdk import ( + TextBlock, + ResultMessage, + SystemMessage, + AssistantMessage, +) + +from agentex.lib import adk +from agentex.lib.utils.logging import make_logger +from agentex.types.text_content import TextContent +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import StreamTaskMessageDelta + +logger = make_logger(__name__) + + +class ClaudeMessageHandler: + """Handles Claude SDK messages and streams them to AgentEx UI. + + Simplified handler focused on: + - Streaming text blocks to UI + - Extracting session_id from SystemMessage/ResultMessage + - Extracting usage and cost from ResultMessage + - Serializing responses for Temporal + + Note: Tool lifecycle events (requests/responses) are handled by + TemporalStreamingHooks, not this class. + """ + + def __init__( + self, + task_id: str | None, + trace_id: str | None, + parent_span_id: str | None, + ): + self.task_id = task_id + self.trace_id = trace_id + self.parent_span_id = parent_span_id + + # Message tracking + self.messages: list[Any] = [] + self.serialized_messages: list[dict] = [] + + # Streaming context for text + self.streaming_ctx = None + + # Result data + self.session_id: str | None = None + self.usage_info: dict | None = None + self.cost_info: float | None = None + + async def initialize(self): + """Initialize streaming context if task_id is available.""" + if self.task_id: + logger.debug(f"Creating streaming context for task: {self.task_id}") + self.streaming_ctx = await adk.streaming.streaming_task_message_context( + task_id=self.task_id, + initial_content=TextContent( + author="agent", + content="", + format="markdown" + ) + ).__aenter__() + + async def handle_message(self, message: Any): + """Process a single message from Claude SDK.""" + self.messages.append(message) + msg_num = len(self.messages) + + # Debug logging (verbose - only for troubleshooting) + logger.debug(f"📨 [{msg_num}] Message type: {type(message).__name__}") + if isinstance(message, AssistantMessage): + block_types = [type(b).__name__ for b in message.content] + logger.debug(f" [{msg_num}] Content blocks: {block_types}") + + # Route to specific handlers + # Note: Tool requests/responses are handled by hooks, not here! + if isinstance(message, AssistantMessage): + await self._handle_assistant_message(message, msg_num) + elif isinstance(message, SystemMessage): + await self._handle_system_message(message) + elif isinstance(message, ResultMessage): + await self._handle_result_message(message) + + async def _handle_assistant_message(self, message: AssistantMessage, _msg_num: int): + """Handle AssistantMessage - contains text blocks. + + Note: Tool calls (ToolUseBlock/ToolResultBlock) are handled by hooks, not here. + We only process TextBlock for streaming text to UI. + """ + # Stream text blocks to UI + for block in message.content: + if isinstance(block, TextBlock): + await self._handle_text_block(block) + + # Collect text for final response + text_content = [] + for block in message.content: + if isinstance(block, TextBlock): + text_content.append(block.text) + + if text_content: + self.serialized_messages.append({ + "role": "assistant", + "content": "\n".join(text_content) + }) + + async def _handle_text_block(self, block: TextBlock): + """Handle text content block.""" + if not block.text or not self.streaming_ctx: + return + + logger.debug(f"💬 Text block: {block.text[:50]}...") + + delta = TextDelta(type="text", text_delta=block.text) + + try: + await self.streaming_ctx.stream_update( + StreamTaskMessageDelta( + parent_task_message=self.streaming_ctx.task_message, + delta=delta, + type="delta" + ) + ) + except Exception as e: + logger.warning(f"Failed to stream text delta: {e}") + + async def _handle_system_message(self, message: SystemMessage): + """Handle system message - extract session_id.""" + if message.subtype == "init": + self.session_id = message.data.get("session_id") + logger.debug(f"Session initialized: {self.session_id[:16] if self.session_id else 'unknown'}...") + else: + logger.debug(f"SystemMessage: {message.subtype}") + + async def _handle_result_message(self, message: ResultMessage): + """Handle result message - extract usage and cost.""" + self.usage_info = message.usage + self.cost_info = message.total_cost_usd + + # Update session_id if available + if message.session_id: + self.session_id = message.session_id + + logger.info(f"💰 Cost: ${self.cost_info:.4f}, Duration: {message.duration_ms}ms, Turns: {message.num_turns}") + + async def cleanup(self): + """Clean up open streaming contexts.""" + if self.streaming_ctx: + try: + await self.streaming_ctx.close() + logger.debug(f"Closed streaming context") + except Exception as e: + logger.warning(f"Failed to close streaming context: {e}") + + def get_results(self) -> dict[str, Any]: + """Get final results for Temporal.""" + return { + "messages": self.serialized_messages, + "task_id": self.task_id, + "session_id": self.session_id, + "usage": self.usage_info, + "cost_usd": self.cost_info, + } diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/interceptors/context_interceptor.py b/src/agentex/lib/core/temporal/plugins/openai_agents/interceptors/context_interceptor.py index 8e551fc2e..1111249f0 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/interceptors/context_interceptor.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/interceptors/context_interceptor.py @@ -85,10 +85,12 @@ def __init__(self, next, payload_converter): def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle: """Add task_id, trace_id, and parent_span_id to headers when starting model activities.""" - # Only add headers for invoke_model_activity calls + # Only add headers for model activity calls (OpenAI and Claude) activity_name = str(input.activity) if hasattr(input, 'activity') else "" - if "invoke_model_activity" in activity_name or "invoke-model-activity" in activity_name: + if ("invoke_model_activity" in activity_name or + "invoke-model-activity" in activity_name or + "run_claude_agent_activity" in activity_name): # Get task_id, trace_id, and parent_span_id from workflow instance instead of inbound interceptor try: workflow_instance = workflow.instance() diff --git a/src/agentex/lib/environment_variables.py b/src/agentex/lib/environment_variables.py index f23b6a393..cb534e5b2 100644 --- a/src/agentex/lib/environment_variables.py +++ b/src/agentex/lib/environment_variables.py @@ -39,6 +39,9 @@ class EnvVarKeys(str, Enum): # Build Information BUILD_INFO_PATH = "BUILD_INFO_PATH" AGENT_INPUT_TYPE = "AGENT_INPUT_TYPE" + # Claude Agents SDK Configuration + ANTHROPIC_API_KEY = "ANTHROPIC_API_KEY" + CLAUDE_WORKSPACE_ROOT = "CLAUDE_WORKSPACE_ROOT" class Environment(str, Enum): @@ -75,6 +78,9 @@ class EnvironmentVariables(BaseModel): AUTH_PRINCIPAL_B64: str | None = None # Build Information BUILD_INFO_PATH: str | None = None + # Claude Agents SDK Configuration + ANTHROPIC_API_KEY: str | None = None + CLAUDE_WORKSPACE_ROOT: str | None = None # Defaults to project/workspace if not set @classmethod def refresh(cls) -> EnvironmentVariables: diff --git a/uv.lock b/uv.lock index 379f3c630..0a0242aed 100644 --- a/uv.lock +++ b/uv.lock @@ -8,11 +8,13 @@ resolution-markers = [ [[package]] name = "agentex-sdk" -version = "0.6.5" +version = "0.6.7" source = { editable = "." } dependencies = [ { name = "aiohttp" }, + { name = "anthropic" }, { name = "anyio" }, + { name = "claude-agent-sdk" }, { name = "cloudpickle" }, { name = "datadog" }, { name = "ddtrace" }, @@ -47,6 +49,7 @@ dependencies = [ { name = "tzlocal" }, { name = "uvicorn" }, { name = "watchfiles" }, + { name = "yaspin" }, ] [package.optional-dependencies] @@ -69,7 +72,9 @@ dev = [ requires-dist = [ { name = "aiohttp", specifier = ">=3.10.10,<4" }, { name = "aiohttp", marker = "extra == 'aiohttp'" }, + { name = "anthropic", specifier = ">=0.40.0" }, { name = "anyio", specifier = ">=3.5.0,<5" }, + { name = "claude-agent-sdk", specifier = ">=0.1.0" }, { name = "cloudpickle", specifier = ">=3.1.1" }, { name = "datadog", specifier = ">=0.52.1" }, { name = "ddtrace", specifier = ">=3.13.0" }, @@ -106,6 +111,7 @@ requires-dist = [ { name = "tzlocal", specifier = ">=5.3.1" }, { name = "uvicorn", specifier = ">=0.31.1" }, { name = "watchfiles", specifier = ">=0.24.0,<1.0" }, + { name = "yaspin", specifier = ">=3.1.0" }, ] provides-extras = ["aiohttp", "dev"] @@ -198,6 +204,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/78/b6/6307fbef88d9b5ee7421e68d78a9f162e0da4900bc5f5793f6d3d0e34fb8/annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53", size = 13643, upload-time = "2024-05-20T21:33:24.1Z" }, ] +[[package]] +name = "anthropic" +version = "0.74.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "distro" }, + { name = "docstring-parser" }, + { name = "httpx" }, + { name = "jiter" }, + { name = "pydantic" }, + { name = "sniffio" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5b/f9/baa1b885c8664b446e6a13003938046901e54ffd70b532bbebd01256e34b/anthropic-0.74.0.tar.gz", hash = "sha256:114ec10cb394b6764e199da06335da4747b019c5629e53add33572f66964ad99", size = 428958, upload-time = "2025-11-18T15:29:47.579Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/61/27/8c404b290ec650e634eacc674df943913722ec21097b0476d68458250c2f/anthropic-0.74.0-py3-none-any.whl", hash = "sha256:df29b8dfcdbd2751fa31177f643d8d8f66c5315fe06bdc42f9139e9f00d181d5", size = 371474, upload-time = "2025-11-18T15:29:45.748Z" }, +] + [[package]] name = "anyio" version = "4.11.0" @@ -365,6 +390,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8a/1f/f041989e93b001bc4e44bb1669ccdcf54d3f00e628229a85b08d330615c5/charset_normalizer-3.4.3-py3-none-any.whl", hash = "sha256:ce571ab16d890d23b5c278547ba694193a45011ff86a9162a71307ed9f86759a", size = 53175, upload-time = "2025-08-09T07:57:26.864Z" }, ] +[[package]] +name = "claude-agent-sdk" +version = "0.1.8" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "mcp" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6e/2c/14828b10a5c99a3cdc42b12451c9ed03de6d53a712da4fe7b0b41c28e693/claude_agent_sdk-0.1.8.tar.gz", hash = "sha256:8ee495215132edc7f88e439f3f071154a016cea62d393fbf985eb806793ed3d1", size = 50899, upload-time = "2025-11-19T05:07:58.064Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/4e/fe4da2c056caaa4eb819181e77f2497f39ab2fb629ae93f0bed62a521982/claude_agent_sdk-0.1.8-py3-none-macosx_11_0_arm64.whl", hash = "sha256:bdec17988dba541bd48487d68d8e2dbcbe5fef718744f3c73b4236bb3e290875", size = 49380557, upload-time = "2025-11-19T05:07:48.607Z" }, + { url = "https://files.pythonhosted.org/packages/95/7d/b1f6648d6631c892205c3db44f862532471aa2dead846c9d78c260ba3a73/claude_agent_sdk-0.1.8-py3-none-manylinux_2_17_x86_64.whl", hash = "sha256:6640f4c977842dc73a277a7f934a889c0161ab78ad454806cfb2b34eb0a2a7f7", size = 65237961, upload-time = "2025-11-19T05:07:52.021Z" }, + { url = "https://files.pythonhosted.org/packages/b9/fd/cba2bad3c79519be68c266b95a55508f856f7c3b3eaa47cfa9051672a221/claude_agent_sdk-0.1.8-py3-none-win_amd64.whl", hash = "sha256:4b2db1276d553b5cfa939701d0f6b9da38797db93445b9579f97498d0b4f3724", size = 68148406, upload-time = "2025-11-19T05:07:55.279Z" }, +] + [[package]] name = "click" version = "8.3.0" @@ -493,6 +533,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/12/b3/231ffd4ab1fc9d679809f356cebee130ac7daa00d6d6f3206dd4fd137e9e/distro-1.9.0-py3-none-any.whl", hash = "sha256:7bffd925d65168f85027d8da9af6bddab658135b840670a223589bc0c8ef02b2", size = 20277, upload-time = "2023-12-24T09:54:30.421Z" }, ] +[[package]] +name = "docstring-parser" +version = "0.17.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b2/9d/c3b43da9515bd270df0f80548d9944e389870713cc1fe2b8fb35fe2bcefd/docstring_parser-0.17.0.tar.gz", hash = "sha256:583de4a309722b3315439bb31d64ba3eebada841f2e2cee23b99df001434c912", size = 27442, upload-time = "2025-07-21T07:35:01.868Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/55/e2/2537ebcff11c1ee1ff17d8d0b6f4db75873e3b0fb32c2d4a2ee31ecb310a/docstring_parser-0.17.0-py3-none-any.whl", hash = "sha256:cf2569abd23dce8099b300f9b4fa8191e9582dda731fd533daf54c4551658708", size = 36896, upload-time = "2025-07-21T07:35:00.684Z" }, +] + [[package]] name = "envier" version = "0.6.1"