Skip to content

Commit bea608c

Browse files
fix(openai-agents): Patch run_single_turn_streamed() functions following library refactor (#5451)
`AgentRunner._run_single_turn_streamed()` was moved to `agents.run_internal.run_loop.run_single_turn_streamed()`. Patch the new function by reusing existing wrapper logic.
1 parent 14e3e0a commit bea608c

File tree

3 files changed

+84
-67
lines changed

3 files changed

+84
-67
lines changed

sentry_sdk/integrations/openai_agents/__init__.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
_get_model,
88
_get_all_tools,
99
_run_single_turn,
10+
_run_single_turn_streamed,
1011
_create_run_wrapper,
1112
_create_run_streamed_wrapper,
1213
_patch_agent_run,
@@ -78,7 +79,7 @@ class OpenAIAgentsIntegration(Integration):
7879
3. In a loop, the agent repeatedly calls the Responses API, maintaining a conversation history that includes previous messages and tool results, which is passed to each call.
7980
- A Model instance is created at the start of the loop by calling the `Runner._get_model()`. We patch the Model instance using `patches._get_model()`.
8081
- Available tools are also deteremined at the start of the loop, with `Runner._get_all_tools()`. We patch Tool instances by iterating through the returned tools in `patches._get_all_tools()`.
81-
- In each loop iteration, `run_single_turn()` or `run_single_turn_streamed()` is responsible for calling the Responses API, patched with `patches._run_single_turn()` and `patched_run_single_turn_streamed()`.
82+
- In each loop iteration, `run_single_turn()` or `run_single_turn_streamed()` is responsible for calling the Responses API, patched with `patches._run_single_turn()` and `patches._run_single_turn_streamed()`.
8283
4. On loop termination, `RunImpl.execute_final_output()` is called. The function is patched with `patched_execute_final_output()`.
8384
8485
Local tools are run based on the return value from the Responses API as a post-API call step in the above loop.
@@ -128,6 +129,16 @@ async def new_wrapped_run_single_turn(
128129

129130
agents.run.run_single_turn = new_wrapped_run_single_turn
130131

132+
@wraps(run_loop.run_single_turn_streamed)
133+
async def new_wrapped_run_single_turn_streamed(
134+
*args: "Any", **kwargs: "Any"
135+
) -> "SingleStepResult":
136+
return await _run_single_turn_streamed(
137+
run_loop.run_single_turn_streamed, *args, **kwargs
138+
)
139+
140+
agents.run.run_single_turn_streamed = new_wrapped_run_single_turn_streamed
141+
131142
return
132143

133144
original_get_all_tools = AgentRunner._get_all_tools
@@ -163,3 +174,17 @@ async def old_wrapped_run_single_turn(
163174
agents.run.AgentRunner._run_single_turn = classmethod(
164175
old_wrapped_run_single_turn
165176
)
177+
178+
original_run_single_turn_streamed = AgentRunner._run_single_turn_streamed
179+
180+
@wraps(AgentRunner._run_single_turn_streamed.__func__)
181+
async def old_wrapped_run_single_turn_streamed(
182+
cls: "agents.Runner", *args: "Any", **kwargs: "Any"
183+
) -> "SingleStepResult":
184+
return await _run_single_turn_streamed(
185+
original_run_single_turn_streamed, *args, **kwargs
186+
)
187+
188+
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
189+
old_wrapped_run_single_turn_streamed
190+
)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from .models import _get_model # noqa: F401
22
from .tools import _get_all_tools # noqa: F401
33
from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401
4-
from .agent_run import _run_single_turn, _patch_agent_run # noqa: F401
4+
from .agent_run import _run_single_turn, _run_single_turn_streamed, _patch_agent_run # noqa: F401
55
from .error_tracing import _patch_error_tracing # noqa: F401

sentry_sdk/integrations/openai_agents/patches/agent_run.py

Lines changed: 57 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,70 @@ async def _run_single_turn(
109109
return result
110110

111111

112+
async def _run_single_turn_streamed(
113+
original_run_single_turn_streamed: "Callable[..., Awaitable[SingleStepResult]]",
114+
*args: "Any",
115+
**kwargs: "Any",
116+
) -> "SingleStepResult":
117+
"""
118+
Patched _run_single_turn_streamed that
119+
- creates agent invocation spans for streaming if there is no already active agent invocation span.
120+
- ends the agent invocation span if and only if `_run_single_turn_streamed()` raises an exception.
121+
122+
Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
123+
_run_single_turn_streamed uses positional arguments. The call signature is:
124+
_run_single_turn_streamed(
125+
streamed_result, # args[0]
126+
agent, # args[1]
127+
hooks, # args[2]
128+
context_wrapper, # args[3]
129+
run_config, # args[4]
130+
should_run_agent_start_hooks, # args[5]
131+
tool_use_tracker, # args[6]
132+
all_tools, # args[7]
133+
server_conversation_tracker, # args[8] (optional)
134+
)
135+
"""
136+
streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
137+
agent = args[1] if len(args) > 1 else kwargs.get("agent")
138+
context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
139+
should_run_agent_start_hooks = bool(
140+
args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks", False)
141+
)
142+
143+
span_kwargs: "dict[str, Any]" = {}
144+
if streamed_result and hasattr(streamed_result, "input"):
145+
span_kwargs["original_input"] = streamed_result.input
146+
147+
span = _maybe_start_agent_span(
148+
context_wrapper,
149+
agent,
150+
should_run_agent_start_hooks,
151+
span_kwargs,
152+
is_streaming=True,
153+
)
154+
155+
try:
156+
result = await original_run_single_turn_streamed(*args, **kwargs)
157+
except Exception as exc:
158+
exc_info = sys.exc_info()
159+
with capture_internal_exceptions():
160+
if span is not None and span.timestamp is None:
161+
_record_exception_on_span(span, exc)
162+
end_invoke_agent_span(context_wrapper, agent)
163+
_close_streaming_workflow_span(agent)
164+
reraise(*exc_info)
165+
166+
return result
167+
168+
112169
def _patch_agent_run() -> None:
113170
"""
114171
Patches AgentRunner methods to create agent invocation spans.
115172
This directly patches the execution flow to track when agents start and stop.
116173
"""
117174

118175
# Store original methods
119-
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
120176
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
121177
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output
122178

@@ -193,71 +249,7 @@ async def patched_execute_final_output(
193249

194250
return result
195251

196-
@wraps(
197-
original_run_single_turn_streamed.__func__
198-
if hasattr(original_run_single_turn_streamed, "__func__")
199-
else original_run_single_turn_streamed
200-
)
201-
async def patched_run_single_turn_streamed(
202-
cls: "agents.Runner", *args: "Any", **kwargs: "Any"
203-
) -> "Any":
204-
"""
205-
Patched _run_single_turn_streamed that
206-
- creates agent invocation spans for streaming if there is no already active agent invocation span.
207-
- ends the agent invocation span if and only if `_run_single_turn_streamed()` raises an exception.
208-
209-
Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
210-
_run_single_turn_streamed uses positional arguments. The call signature is:
211-
_run_single_turn_streamed(
212-
streamed_result, # args[0]
213-
agent, # args[1]
214-
hooks, # args[2]
215-
context_wrapper, # args[3]
216-
run_config, # args[4]
217-
should_run_agent_start_hooks, # args[5]
218-
tool_use_tracker, # args[6]
219-
all_tools, # args[7]
220-
server_conversation_tracker, # args[8] (optional)
221-
)
222-
"""
223-
streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
224-
agent = args[1] if len(args) > 1 else kwargs.get("agent")
225-
context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
226-
should_run_agent_start_hooks = bool(
227-
args[5]
228-
if len(args) > 5
229-
else kwargs.get("should_run_agent_start_hooks", False)
230-
)
231-
232-
span_kwargs: "dict[str, Any]" = {}
233-
if streamed_result and hasattr(streamed_result, "input"):
234-
span_kwargs["original_input"] = streamed_result.input
235-
236-
span = _maybe_start_agent_span(
237-
context_wrapper,
238-
agent,
239-
should_run_agent_start_hooks,
240-
span_kwargs,
241-
is_streaming=True,
242-
)
243-
244-
try:
245-
result = await original_run_single_turn_streamed(*args, **kwargs)
246-
except Exception as exc:
247-
exc_info = sys.exc_info()
248-
with capture_internal_exceptions():
249-
if span is not None and span.timestamp is None:
250-
_record_exception_on_span(span, exc)
251-
end_invoke_agent_span(context_wrapper, agent)
252-
_close_streaming_workflow_span(agent)
253-
reraise(*exc_info)
254-
255-
return result
256-
257252
# Apply patches
258-
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
259-
patched_run_single_turn_streamed
260-
)
261253
agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
262254
agents._run_impl.RunImpl.execute_final_output = classmethod(
263255
patched_execute_final_output

0 commit comments

Comments
 (0)