Skip to content

Commit 5e33d3a

Browse files
switch from manual parsing to hooks
1 parent c872735 commit 5e33d3a

File tree

6 files changed

+271
-206
lines changed

6 files changed

+271
-206
lines changed

examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -180,43 +180,8 @@ async def on_task_event_send(self, params: SendEventParams):
180180
else:
181181
logger.warning(f"No session_id returned - context may not persist")
182182

183-
# Send Claude's response back to user
184-
# Note: Activity should have streamed the response in real-time
185-
# But if streaming failed (task_id=None), we need to send it here
186-
messages = result.get("messages", [])
187-
188-
# Extract just the assistant messages (skip system/result messages)
189-
assistant_messages = [
190-
msg for msg in messages
191-
if msg.get("role") == "assistant" and msg.get("content")
192-
]
193-
194-
if assistant_messages:
195-
# Combine assistant responses
196-
combined_content = "\n\n".join(
197-
msg.get("content", "") for msg in assistant_messages
198-
)
199-
200-
# Send the response (streaming might have failed if task_id was None)
201-
await adk.messages.create(
202-
task_id=params.task.id,
203-
content=TextContent(
204-
author="agent",
205-
content=combined_content,
206-
format="markdown",
207-
)
208-
)
209-
logger.info(f"Sent Claude response to UI: {combined_content[:100]}...")
210-
else:
211-
# No assistant message found - this shouldn't happen
212-
logger.warning("No assistant messages in Claude response")
213-
await adk.messages.create(
214-
task_id=params.task.id,
215-
content=TextContent(
216-
author="agent",
217-
content="⚠️ Claude completed but returned no assistant messages.",
218-
)
219-
)
183+
# Response already streamed to UI by activity - no need to send again
184+
logger.debug(f"Turn {self._state.turn_number} completed successfully")
220185

221186
except Exception as e:
222187
logger.error(f"Error running Claude agent: {e}", exc_info=True)

src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@
4444
ClaudeMessageHandler,
4545
)
4646

47+
from agentex.lib.core.temporal.plugins.claude_agents.hooks import (
48+
create_streaming_hooks,
49+
TemporalStreamingHooks,
50+
)
51+
4752
# Reuse OpenAI's context threading - this is the key to streaming!
4853
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import (
4954
ContextInterceptor,
@@ -58,6 +63,9 @@
5863
"create_workspace_directory",
5964
# Message handling
6065
"ClaudeMessageHandler",
66+
# Hooks
67+
"create_streaming_hooks",
68+
"TemporalStreamingHooks",
6169
# Context threading (reused from OpenAI)
6270
"ContextInterceptor",
6371
"streaming_task_id",

src/agentex/lib/core/temporal/plugins/claude_agents/activities.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
streaming_parent_span_id,
1717
)
1818
from agentex.lib.core.temporal.plugins.claude_agents.message_handler import ClaudeMessageHandler
19+
from agentex.lib.core.temporal.plugins.claude_agents.hooks import create_streaming_hooks
1920

2021
logger = make_logger(__name__)
2122

@@ -103,14 +104,22 @@ async def run_claude_agent_activity(
103104
model=agent_data.get('model'),
104105
)
105106

106-
# Configure Claude with workspace isolation, session resume, and subagents
107+
# Create hooks for streaming tool calls and subagent execution
108+
hooks = create_streaming_hooks(
109+
task_id=task_id,
110+
trace_id=trace_id,
111+
parent_span_id=parent_span_id,
112+
)
113+
114+
# Configure Claude with workspace isolation, session resume, subagents, and hooks
107115
options = ClaudeAgentOptions(
108116
cwd=workspace_path,
109117
allowed_tools=allowed_tools,
110118
permission_mode=permission_mode, # type: ignore
111119
system_prompt=system_prompt,
112120
resume=resume_session_id,
113121
agents=agent_defs,
122+
hooks=hooks, # Tool lifecycle hooks for streaming!
114123
)
115124

116125
# Create message handler for streaming
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
"""Claude SDK hooks for streaming lifecycle events to AgentEx UI."""
2+
3+
from agentex.lib.core.temporal.plugins.claude_agents.hooks.hooks import (
4+
create_streaming_hooks,
5+
TemporalStreamingHooks,
6+
)
7+
8+
__all__ = [
9+
"create_streaming_hooks",
10+
"TemporalStreamingHooks",
11+
]
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
"""Claude SDK hooks for streaming tool calls and subagent execution to AgentEx UI.
2+
3+
This module provides hook callbacks that integrate with Claude SDK's hooks system
4+
to stream tool execution lifecycle events in real-time.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from typing import Any
10+
11+
from claude_agent_sdk import HookMatcher
12+
13+
from agentex.lib.utils.logging import make_logger
14+
from agentex.lib import adk
15+
from agentex.types.tool_request_content import ToolRequestContent
16+
from agentex.types.tool_response_content import ToolResponseContent
17+
from agentex.types.task_message_update import StreamTaskMessageFull
18+
19+
logger = make_logger(__name__)
20+
21+
22+
class TemporalStreamingHooks:
23+
"""Hooks for streaming Claude SDK lifecycle events to AgentEx UI.
24+
25+
Implements Claude SDK hook callbacks:
26+
- PreToolUse: Called before tool execution → stream tool request
27+
- PostToolUse: Called after tool execution → stream tool result
28+
29+
Also handles subagent detection and nested tracing.
30+
"""
31+
32+
def __init__(
33+
self,
34+
task_id: str | None,
35+
trace_id: str | None = None,
36+
parent_span_id: str | None = None,
37+
):
38+
"""Initialize streaming hooks.
39+
40+
Args:
41+
task_id: AgentEx task ID for routing streams
42+
trace_id: Trace ID for nested spans
43+
parent_span_id: Parent span ID for subagent spans
44+
"""
45+
self.task_id = task_id
46+
self.trace_id = trace_id
47+
self.parent_span_id = parent_span_id
48+
49+
# Track active subagent spans
50+
self.subagent_spans: dict[str, Any] = {} # tool_call_id → (ctx, span)
51+
52+
async def pre_tool_use(
53+
self,
54+
input_data: dict[str, Any],
55+
tool_use_id: str | None,
56+
context: Any,
57+
) -> dict[str, Any]:
58+
"""Hook called before tool execution.
59+
60+
Args:
61+
input_data: Contains tool_name, tool_input from Claude SDK
62+
tool_use_id: Unique ID for this tool call
63+
context: Hook context from Claude SDK
64+
65+
Returns:
66+
Empty dict (allow execution to proceed)
67+
"""
68+
if not self.task_id or not tool_use_id:
69+
return {}
70+
71+
tool_name = input_data.get("tool_name", "unknown")
72+
tool_input = input_data.get("tool_input", {})
73+
74+
logger.info(f"🔧 Tool request: {tool_name}")
75+
76+
# Special handling for Task tool (subagents) - create nested span
77+
if tool_name == "Task" and self.trace_id and self.parent_span_id:
78+
subagent_type = tool_input.get("subagent_type", "unknown")
79+
logger.info(f"🤖 Subagent started: {subagent_type}")
80+
81+
# Create nested trace span for subagent
82+
subagent_ctx = adk.tracing.span(
83+
trace_id=self.trace_id,
84+
parent_id=self.parent_span_id,
85+
name=f"Subagent: {subagent_type}",
86+
input=tool_input,
87+
)
88+
subagent_span = await subagent_ctx.__aenter__()
89+
self.subagent_spans[tool_use_id] = (subagent_ctx, subagent_span)
90+
91+
# Stream tool request to UI
92+
try:
93+
async with adk.streaming.streaming_task_message_context(
94+
task_id=self.task_id,
95+
initial_content=ToolRequestContent(
96+
author="agent",
97+
name=tool_name,
98+
arguments=tool_input,
99+
tool_call_id=tool_use_id,
100+
)
101+
) as tool_ctx:
102+
await tool_ctx.stream_update(
103+
StreamTaskMessageFull(
104+
parent_task_message=tool_ctx.task_message,
105+
content=ToolRequestContent(
106+
author="agent",
107+
name=tool_name,
108+
arguments=tool_input,
109+
tool_call_id=tool_use_id,
110+
),
111+
type="full"
112+
)
113+
)
114+
except Exception as e:
115+
logger.warning(f"Failed to stream tool request: {e}")
116+
117+
return {} # Allow execution
118+
119+
async def post_tool_use(
120+
self,
121+
input_data: dict[str, Any],
122+
tool_use_id: str | None,
123+
context: Any,
124+
) -> dict[str, Any]:
125+
"""Hook called after tool execution.
126+
127+
Args:
128+
input_data: Contains tool_name, tool_output from Claude SDK
129+
tool_use_id: Unique ID for this tool call
130+
context: Hook context from Claude SDK
131+
132+
Returns:
133+
Empty dict
134+
"""
135+
if not self.task_id or not tool_use_id:
136+
return {}
137+
138+
tool_name = input_data.get("tool_name", "unknown")
139+
tool_output = input_data.get("tool_output", "")
140+
141+
logger.info(f"✅ Tool result: {tool_name}")
142+
143+
# If this was a subagent, close the nested span
144+
if tool_use_id in self.subagent_spans:
145+
subagent_ctx, subagent_span = self.subagent_spans[tool_use_id]
146+
subagent_span.output = {"result": tool_output}
147+
await subagent_ctx.__aexit__(None, None, None)
148+
logger.info(f"🤖 Subagent completed: {tool_name}")
149+
del self.subagent_spans[tool_use_id]
150+
151+
# Stream tool response to UI
152+
try:
153+
async with adk.streaming.streaming_task_message_context(
154+
task_id=self.task_id,
155+
initial_content=ToolResponseContent(
156+
author="agent",
157+
name=tool_name,
158+
content=tool_output,
159+
tool_call_id=tool_use_id,
160+
)
161+
) as tool_ctx:
162+
await tool_ctx.stream_update(
163+
StreamTaskMessageFull(
164+
parent_task_message=tool_ctx.task_message,
165+
content=ToolResponseContent(
166+
author="agent",
167+
name=tool_name,
168+
content=tool_output,
169+
tool_call_id=tool_use_id,
170+
),
171+
type="full"
172+
)
173+
)
174+
except Exception as e:
175+
logger.warning(f"Failed to stream tool response: {e}")
176+
177+
return {}
178+
179+
180+
def create_streaming_hooks(
181+
task_id: str | None,
182+
trace_id: str | None = None,
183+
parent_span_id: str | None = None,
184+
) -> dict[str, list[HookMatcher]]:
185+
"""Create Claude SDK hooks configuration for streaming.
186+
187+
Returns hooks dict suitable for ClaudeAgentOptions(hooks=...).
188+
189+
Args:
190+
task_id: AgentEx task ID for streaming
191+
trace_id: Trace ID for nested spans
192+
parent_span_id: Parent span ID for subagent spans
193+
194+
Returns:
195+
Dict with PreToolUse and PostToolUse hook configurations
196+
"""
197+
hooks_instance = TemporalStreamingHooks(task_id, trace_id, parent_span_id)
198+
199+
return {
200+
"PreToolUse": [
201+
HookMatcher(
202+
matcher=None, # Match all tools
203+
hooks=[hooks_instance.pre_tool_use]
204+
)
205+
],
206+
"PostToolUse": [
207+
HookMatcher(
208+
matcher=None, # Match all tools
209+
hooks=[hooks_instance.post_tool_use]
210+
)
211+
],
212+
}

0 commit comments

Comments
 (0)