diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 61a742668a..264bf6dece 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -61,40 +61,26 @@ def __stream__(self) -> Iterator[_T]: break # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data + data = sse.json() + + if is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + # Special case for Assistants `thread.` events - wrap data with event name if sse.event and sse.event.startswith("thread."): - data = sse.json() - - if sse.event == "error" and is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) else: - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data(data=data, cast_to=cast_to, response=response) finally: @@ -164,40 +150,26 @@ async def __stream__(self) -> AsyncIterator[_T]: break # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data + data = sse.json() + + if is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + # Special case for Assistants `thread.` events - wrap data with event name if sse.event and sse.event.startswith("thread."): - data = sse.json() - - if sse.event == "error" and is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) else: - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data(data=data, cast_to=cast_to, response=response) finally: