From 2b14925d03aa2adae720fce80dc290791bd8a2ab Mon Sep 17 00:00:00 2001 From: skyvanguard Date: Mon, 12 Jan 2026 18:03:04 -0300 Subject: [PATCH] fix(streaming): remove unreachable dead code in error handling The check `sse.event == "error"` was inside a block that requires `sse.event.startswith("thread.")`, making it logically impossible to ever be true (since "error" doesn't start with "thread."). This was a regression from commit abc25966 where the error check was incorrectly moved inside the thread event branch. The fix restructures the code to: 1. Parse JSON data first (common to both branches) 2. Check for errors in the data (common to both branches) 3. Then handle the thread.* event special case for yield This removes ~28 lines of dead code while preserving all intended functionality. Fixes #2796 Co-Authored-By: Claude Opus 4.5 --- src/openai/_streaming.py | 96 ++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 62 deletions(-) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 61a742668a..264bf6dece 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -61,40 +61,26 @@ def __stream__(self) -> Iterator[_T]: break # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data + data = sse.json() + + if is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + # Special case for Assistants `thread.` events - wrap data with event name if sse.event and sse.event.startswith("thread."): - data = sse.json() - - if sse.event == "error" and is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) else: - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data(data=data, cast_to=cast_to, response=response) finally: @@ -164,40 +150,26 @@ async def __stream__(self) -> AsyncIterator[_T]: break # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data + data = sse.json() + + if is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + # Special case for Assistants `thread.` events - wrap data with event name if sse.event and sse.event.startswith("thread."): - data = sse.json() - - if sse.event == "error" and is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) else: - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data(data=data, cast_to=cast_to, response=response) finally: