From 16170271740dd04eae648205aa602749d86fc8ff Mon Sep 17 00:00:00 2001 From: Evan Miller Date: Thu, 13 Nov 2025 16:22:22 -0500 Subject: [PATCH 1/9] Accumulate extra pydantic fields from the sample event --- src/anthropic/lib/streaming/_messages.py | 35 +++++ .../fixtures/extra_fields_response.txt | 20 +++ tests/lib/streaming/test_extra_fields.py | 143 ++++++++++++++++++ 3 files changed, 198 insertions(+) create mode 100644 tests/lib/streaming/fixtures/extra_fields_response.txt create mode 100644 tests/lib/streaming/test_extra_fields.py diff --git a/src/anthropic/lib/streaming/_messages.py b/src/anthropic/lib/streaming/_messages.py index 857f9734..c6bb9f0f 100644 --- a/src/anthropic/lib/streaming/_messages.py +++ b/src/anthropic/lib/streaming/_messages.py @@ -401,6 +401,27 @@ def build_events( ) +def _deep_merge_extra_fields(existing: Any, new: Any) -> Any: + """Deep merge new data into existing data, mutating containers in place. + + - Dicts: recursively merge keys (mutates existing dict) + - Lists: extend existing with new items (mutates existing list) + - Other: replace with new value + """ + if isinstance(existing, dict) and isinstance(new, dict): + for key, value in new.items(): + if key in existing: + existing[key] = _deep_merge_extra_fields(existing[key], value) + else: + existing[key] = value + return existing # Return mutated dict + elif isinstance(existing, list) and isinstance(new, list): + existing.extend(new) + return existing # Return mutated list + else: + return new + + def accumulate_event( *, event: RawMessageStreamEvent, @@ -481,4 +502,18 @@ def accumulate_event( if event.usage.server_tool_use is not None: current_snapshot.usage.server_tool_use = event.usage.server_tool_use + # Accumulate any extra fields from the event into the snapshot + if hasattr(event, '__pydantic_extra__') and event.__pydantic_extra__: + if not hasattr(current_snapshot, '__pydantic_extra__'): + current_snapshot.__pydantic_extra__ = {} + + for key, value in event.__pydantic_extra__.items(): + if key in current_snapshot.__pydantic_extra__: + current_snapshot.__pydantic_extra__[key] = _deep_merge_extra_fields( + current_snapshot.__pydantic_extra__[key], + value + ) + else: + current_snapshot.__pydantic_extra__[key] = value + return current_snapshot diff --git a/tests/lib/streaming/fixtures/extra_fields_response.txt b/tests/lib/streaming/fixtures/extra_fields_response.txt new file mode 100644 index 00000000..c9b9dd48 --- /dev/null +++ b/tests/lib/streaming/fixtures/extra_fields_response.txt @@ -0,0 +1,20 @@ +event: message_start +data: {"type":"message_start","message":{"id":"msg_123","type":"message","role":"assistant","content":[],"model":"claude-3-opus-latest","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":11,"output_tokens":1},"private_field":{"nested":{"values":[1,2]}}}} + +event: content_block_start +data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}} + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"},"private_field":{"nested":{"values":[3],"metadata":"chunk1"}}} + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"!"},"private_field":{"nested":{"values":[4,5],"metadata":"chunk2"}}} + +event: content_block_stop +data: {"type":"content_block_stop","index":0} + +event: message_delta +data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":3},"private_field":{"nested":{"values":[6]}}} + +event: message_stop +data: {"type":"message_stop"} diff --git a/tests/lib/streaming/test_extra_fields.py b/tests/lib/streaming/test_extra_fields.py new file mode 100644 index 00000000..85379128 --- /dev/null +++ b/tests/lib/streaming/test_extra_fields.py @@ -0,0 +1,143 @@ +"""Tests for accumulating extra fields in streaming responses. + +This tests that pydantic extra fields (fields not in the schema) are properly +accumulated during streaming, without exposing specific field names in the SDK. +""" + +from __future__ import annotations + +import os +from typing import Any, cast + +import httpx +import pytest +from respx import MockRouter + +from anthropic import Anthropic, AsyncAnthropic + +from .helpers import get_response, to_async_iter + +base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") +api_key = "my-anthropic-api-key" + +sync_client = Anthropic(base_url=base_url, api_key=api_key, _strict_response_validation=True) +async_client = AsyncAnthropic(base_url=base_url, api_key=api_key, _strict_response_validation=True) + + +def assert_extra_fields_accumulated(message: Any) -> None: + """Verify that extra fields are properly accumulated from streaming events. + + This test is intentionally generic - it doesn't know the specific field names, + just that extra fields should be deep-merged correctly. + """ + # Extra fields should be accessible via attribute access (pydantic's extra="allow") + assert hasattr(message, '__pydantic_extra__'), "Message should have __pydantic_extra__" + + extra = message.__pydantic_extra__ + assert 'private_field' in extra, "Extra fields should be accumulated" + + # Verify deep merging: nested dicts should be merged, lists should be extended + private_field = extra['private_field'] + assert isinstance(private_field, dict), "Extra field should be a dict" + assert 'nested' in private_field, "Nested structure should be present" + + nested = private_field['nested'] + assert isinstance(nested, dict), "Nested field should be a dict" + assert 'values' in nested, "Nested values should be present" + + # The 'values' list should have been extended across all streaming events: + # message_start: [1, 2] + # content_block_delta 1: [3] + # content_block_delta 2: [4, 5] + # message_delta: [6] + # Expected: [1, 2, 3, 4, 5, 6] + values = nested['values'] + assert isinstance(values, list), "Nested values should be a list" + assert values == [1, 2, 3, 4, 5, 6], "Lists should be extended, not replaced" + + # Last value from dict merge should be present + assert nested.get('metadata') == 'chunk2', "Dict values should be merged" + + +class TestSyncExtraFields: + @pytest.mark.respx(base_url=base_url) + def test_extra_fields_accumulation(self, respx_mock: MockRouter) -> None: + """Test that extra fields are accumulated during streaming.""" + respx_mock.post("/v1/messages").mock( + return_value=httpx.Response(200, content=get_response("extra_fields_response.txt")) + ) + + with sync_client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello!", + } + ], + model="claude-3-opus-latest", + ) as stream: + # Consume the stream + for _ in stream: + pass + + message = stream.get_final_message() + assert_extra_fields_accumulated(message) + + +class TestAsyncExtraFields: + @pytest.mark.asyncio + @pytest.mark.respx(base_url=base_url) + async def test_extra_fields_accumulation(self, respx_mock: MockRouter) -> None: + """Test that extra fields are accumulated during async streaming.""" + respx_mock.post("/v1/messages").mock( + return_value=httpx.Response(200, content=to_async_iter(get_response("extra_fields_response.txt"))) + ) + + async with async_client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello!", + } + ], + model="claude-3-opus-latest", + ) as stream: + # Consume the stream + async for _ in stream: + pass + + message = await stream.get_final_message() + assert_extra_fields_accumulated(message) + + +def test_deep_merge_extra_fields_function() -> None: + """Test the _deep_merge_extra_fields helper function directly.""" + from anthropic.lib.streaming._messages import _deep_merge_extra_fields + + # Test dict merging + existing = {"a": 1, "b": {"c": 2}} + new = {"b": {"d": 3}, "e": 4} + result = _deep_merge_extra_fields(existing, new) + assert result == {"a": 1, "b": {"c": 2, "d": 3}, "e": 4} + assert result is existing, "Should mutate in place" + + # Test list extending + existing_list = [1, 2, 3] + new_list = [4, 5] + result_list = _deep_merge_extra_fields(existing_list, new_list) + assert result_list == [1, 2, 3, 4, 5] + assert result_list is existing_list, "Should mutate in place" + + # Test nested dict with lists + existing_nested = {"data": {"values": [1, 2]}} + new_nested = {"data": {"values": [3, 4], "count": 4}} + result_nested = _deep_merge_extra_fields(existing_nested, new_nested) + assert result_nested == {"data": {"values": [1, 2, 3, 4], "count": 4}} + assert result_nested is existing_nested, "Should mutate in place" + + # Test scalar replacement + assert _deep_merge_extra_fields(1, 2) == 2 + assert _deep_merge_extra_fields("old", "new") == "new" + assert _deep_merge_extra_fields(None, {"a": 1}) == {"a": 1} From 4d4227f33195b252fc4ec3f7e905f14b7cddcb85 Mon Sep 17 00:00:00 2001 From: Evan Miller Date: Thu, 13 Nov 2025 16:37:12 -0500 Subject: [PATCH 2/9] ruff --- tests/lib/streaming/test_extra_fields.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lib/streaming/test_extra_fields.py b/tests/lib/streaming/test_extra_fields.py index 85379128..6ae7e1f0 100644 --- a/tests/lib/streaming/test_extra_fields.py +++ b/tests/lib/streaming/test_extra_fields.py @@ -7,7 +7,7 @@ from __future__ import annotations import os -from typing import Any, cast +from typing import Any import httpx import pytest From efb4ae1e4c53987ba9c867064200b3105e4af486 Mon Sep 17 00:00:00 2001 From: Evan Miller Date: Fri, 14 Nov 2025 13:30:31 -0500 Subject: [PATCH 3/9] pyright --- src/anthropic/lib/streaming/_messages.py | 31 ++++++++++++++---------- tests/lib/streaming/test_extra_fields.py | 17 +++++++------ 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/anthropic/lib/streaming/_messages.py b/src/anthropic/lib/streaming/_messages.py index c6bb9f0f..12093747 100644 --- a/src/anthropic/lib/streaming/_messages.py +++ b/src/anthropic/lib/streaming/_messages.py @@ -401,7 +401,7 @@ def build_events( ) -def _deep_merge_extra_fields(existing: Any, new: Any) -> Any: +def _deep_merge_extra_fields(existing: object, new: object) -> object: """Deep merge new data into existing data, mutating containers in place. - Dicts: recursively merge keys (mutates existing dict) @@ -409,15 +409,19 @@ def _deep_merge_extra_fields(existing: Any, new: Any) -> Any: - Other: replace with new value """ if isinstance(existing, dict) and isinstance(new, dict): - for key, value in new.items(): - if key in existing: - existing[key] = _deep_merge_extra_fields(existing[key], value) + existing_dict = cast("dict[str, object]", existing) + new_dict = cast("dict[str, object]", new) + for key, value in new_dict.items(): + if key in existing_dict: + existing_dict[key] = _deep_merge_extra_fields(existing_dict[key], value) else: - existing[key] = value - return existing # Return mutated dict + existing_dict[key] = value + return existing_dict # Return mutated dict elif isinstance(existing, list) and isinstance(new, list): - existing.extend(new) - return existing # Return mutated list + existing_list = cast("list[object]", existing) + new_list = cast("list[object]", new) + existing_list.extend(new_list) + return existing_list # Return mutated list else: return new @@ -504,16 +508,17 @@ def accumulate_event( # Accumulate any extra fields from the event into the snapshot if hasattr(event, '__pydantic_extra__') and event.__pydantic_extra__: - if not hasattr(current_snapshot, '__pydantic_extra__'): + if not hasattr(current_snapshot, '__pydantic_extra__') or current_snapshot.__pydantic_extra__ is None: current_snapshot.__pydantic_extra__ = {} + snapshot_extra = current_snapshot.__pydantic_extra__ for key, value in event.__pydantic_extra__.items(): - if key in current_snapshot.__pydantic_extra__: - current_snapshot.__pydantic_extra__[key] = _deep_merge_extra_fields( - current_snapshot.__pydantic_extra__[key], + if key in snapshot_extra: + snapshot_extra[key] = _deep_merge_extra_fields( + snapshot_extra[key], value ) else: - current_snapshot.__pydantic_extra__[key] = value + snapshot_extra[key] = value return current_snapshot diff --git a/tests/lib/streaming/test_extra_fields.py b/tests/lib/streaming/test_extra_fields.py index 6ae7e1f0..eb03855a 100644 --- a/tests/lib/streaming/test_extra_fields.py +++ b/tests/lib/streaming/test_extra_fields.py @@ -7,7 +7,7 @@ from __future__ import annotations import os -from typing import Any +from typing import Any, cast import httpx import pytest @@ -37,12 +37,14 @@ def assert_extra_fields_accumulated(message: Any) -> None: assert 'private_field' in extra, "Extra fields should be accumulated" # Verify deep merging: nested dicts should be merged, lists should be extended - private_field = extra['private_field'] - assert isinstance(private_field, dict), "Extra field should be a dict" + private_field_value = extra['private_field'] + assert isinstance(private_field_value, dict), "Extra field should be a dict" + private_field = cast(dict[str, object], private_field_value) assert 'nested' in private_field, "Nested structure should be present" - nested = private_field['nested'] - assert isinstance(nested, dict), "Nested field should be a dict" + nested_value = private_field['nested'] + assert isinstance(nested_value, dict), "Nested field should be a dict" + nested = cast(dict[str, object], nested_value) assert 'values' in nested, "Nested values should be present" # The 'values' list should have been extended across all streaming events: @@ -51,8 +53,9 @@ def assert_extra_fields_accumulated(message: Any) -> None: # content_block_delta 2: [4, 5] # message_delta: [6] # Expected: [1, 2, 3, 4, 5, 6] - values = nested['values'] - assert isinstance(values, list), "Nested values should be a list" + values_value = nested['values'] + assert isinstance(values_value, list), "Nested values should be a list" + values = cast(list[int], values_value) assert values == [1, 2, 3, 4, 5, 6], "Lists should be extended, not replaced" # Last value from dict merge should be present From 55501ef7828fa27dbfe4729b9dbe53f7bcb1a1fe Mon Sep 17 00:00:00 2001 From: Evan Miller Date: Fri, 14 Nov 2025 14:47:38 -0500 Subject: [PATCH 4/9] fix test --- tests/lib/streaming/test_extra_fields.py | 97 ++++++++++++------------ 1 file changed, 50 insertions(+), 47 deletions(-) diff --git a/tests/lib/streaming/test_extra_fields.py b/tests/lib/streaming/test_extra_fields.py index eb03855a..09aef3b6 100644 --- a/tests/lib/streaming/test_extra_fields.py +++ b/tests/lib/streaming/test_extra_fields.py @@ -6,12 +6,12 @@ from __future__ import annotations +import asyncio import os from typing import Any, cast import httpx -import pytest -from respx import MockRouter +import respx from anthropic import Anthropic, AsyncAnthropic @@ -63,56 +63,59 @@ def assert_extra_fields_accumulated(message: Any) -> None: class TestSyncExtraFields: - @pytest.mark.respx(base_url=base_url) - def test_extra_fields_accumulation(self, respx_mock: MockRouter) -> None: + def test_extra_fields_accumulation(self) -> None: """Test that extra fields are accumulated during streaming.""" - respx_mock.post("/v1/messages").mock( - return_value=httpx.Response(200, content=get_response("extra_fields_response.txt")) - ) - - with sync_client.messages.stream( - max_tokens=1024, - messages=[ - { - "role": "user", - "content": "Say hello!", - } - ], - model="claude-3-opus-latest", - ) as stream: - # Consume the stream - for _ in stream: - pass - - message = stream.get_final_message() - assert_extra_fields_accumulated(message) + with respx.mock(base_url=base_url) as respx_mock: + respx_mock.post("/v1/messages").mock( + return_value=httpx.Response(200, content=get_response("extra_fields_response.txt")) + ) + + with sync_client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello!", + } + ], + model="claude-3-opus-latest", + ) as stream: + # Consume the stream + for _ in stream: + pass + + message = stream.get_final_message() + assert_extra_fields_accumulated(message) class TestAsyncExtraFields: - @pytest.mark.asyncio - @pytest.mark.respx(base_url=base_url) - async def test_extra_fields_accumulation(self, respx_mock: MockRouter) -> None: + def test_extra_fields_accumulation(self) -> None: """Test that extra fields are accumulated during async streaming.""" - respx_mock.post("/v1/messages").mock( - return_value=httpx.Response(200, content=to_async_iter(get_response("extra_fields_response.txt"))) - ) - - async with async_client.messages.stream( - max_tokens=1024, - messages=[ - { - "role": "user", - "content": "Say hello!", - } - ], - model="claude-3-opus-latest", - ) as stream: - # Consume the stream - async for _ in stream: - pass - - message = await stream.get_final_message() - assert_extra_fields_accumulated(message) + + async def run_test() -> None: + with respx.mock(base_url=base_url) as respx_mock: + respx_mock.post("/v1/messages").mock( + return_value=httpx.Response(200, content=to_async_iter(get_response("extra_fields_response.txt"))) + ) + + async with async_client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello!", + } + ], + model="claude-3-opus-latest", + ) as stream: + # Consume the stream + async for _ in stream: + pass + + message = await stream.get_final_message() + assert_extra_fields_accumulated(message) + + asyncio.run(run_test()) def test_deep_merge_extra_fields_function() -> None: From b780db7bd128443a00b448580fa7096990092cf9 Mon Sep 17 00:00:00 2001 From: Evan Miller Date: Fri, 14 Nov 2025 14:50:57 -0500 Subject: [PATCH 5/9] ruff --- tests/lib/streaming/test_extra_fields.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lib/streaming/test_extra_fields.py b/tests/lib/streaming/test_extra_fields.py index 09aef3b6..a1b0817c 100644 --- a/tests/lib/streaming/test_extra_fields.py +++ b/tests/lib/streaming/test_extra_fields.py @@ -6,8 +6,8 @@ from __future__ import annotations -import asyncio import os +import asyncio from typing import Any, cast import httpx From 728433ea80727c0f49b6ab895fb9acb2f277989c Mon Sep 17 00:00:00 2001 From: Evan Miller Date: Mon, 24 Nov 2025 09:27:54 -0500 Subject: [PATCH 6/9] PR comments --- src/anthropic/lib/streaming/_messages.py | 24 ++- tests/lib/streaming/test_extra_fields.py | 182 ++++++++++------------- 2 files changed, 88 insertions(+), 118 deletions(-) diff --git a/src/anthropic/lib/streaming/_messages.py b/src/anthropic/lib/streaming/_messages.py index 12093747..40bba102 100644 --- a/src/anthropic/lib/streaming/_messages.py +++ b/src/anthropic/lib/streaming/_messages.py @@ -21,7 +21,7 @@ ContentBlockStopEvent, ) from ...types import Message, ContentBlock, RawMessageStreamEvent -from ..._utils import consume_sync_iterator, consume_async_iterator +from ..._utils import is_dict, is_list, consume_sync_iterator, consume_async_iterator from ..._models import build, construct_type, construct_type_unchecked from ..._streaming import Stream, AsyncStream @@ -408,20 +408,16 @@ def _deep_merge_extra_fields(existing: object, new: object) -> object: - Lists: extend existing with new items (mutates existing list) - Other: replace with new value """ - if isinstance(existing, dict) and isinstance(new, dict): - existing_dict = cast("dict[str, object]", existing) - new_dict = cast("dict[str, object]", new) - for key, value in new_dict.items(): - if key in existing_dict: - existing_dict[key] = _deep_merge_extra_fields(existing_dict[key], value) + if is_dict(existing) and is_dict(new): + for key, value in new.items(): + if key in existing: + existing[key] = _deep_merge_extra_fields(existing[key], value) else: - existing_dict[key] = value - return existing_dict # Return mutated dict - elif isinstance(existing, list) and isinstance(new, list): - existing_list = cast("list[object]", existing) - new_list = cast("list[object]", new) - existing_list.extend(new_list) - return existing_list # Return mutated list + existing[key] = value + return existing # Return mutated dict + elif is_list(existing) and is_list(new): + existing.extend(new) + return existing # Return mutated list else: return new diff --git a/tests/lib/streaming/test_extra_fields.py b/tests/lib/streaming/test_extra_fields.py index a1b0817c..e85d3c8c 100644 --- a/tests/lib/streaming/test_extra_fields.py +++ b/tests/lib/streaming/test_extra_fields.py @@ -6,116 +6,90 @@ from __future__ import annotations -import os -import asyncio from typing import Any, cast -import httpx -import respx +from anthropic.types import Message, TextBlock, TextDelta, Usage +from anthropic.types.raw_message_delta_event import Delta, RawMessageDeltaEvent +from anthropic.types.raw_message_start_event import RawMessageStartEvent +from anthropic.types.raw_content_block_start_event import RawContentBlockStartEvent +from anthropic.types.raw_content_block_delta_event import RawContentBlockDeltaEvent +from anthropic.types.message_delta_usage import MessageDeltaUsage +from anthropic.lib.streaming._messages import accumulate_event -from anthropic import Anthropic, AsyncAnthropic -from .helpers import get_response, to_async_iter - -base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") -api_key = "my-anthropic-api-key" - -sync_client = Anthropic(base_url=base_url, api_key=api_key, _strict_response_validation=True) -async_client = AsyncAnthropic(base_url=base_url, api_key=api_key, _strict_response_validation=True) - - -def assert_extra_fields_accumulated(message: Any) -> None: - """Verify that extra fields are properly accumulated from streaming events. - - This test is intentionally generic - it doesn't know the specific field names, - just that extra fields should be deep-merged correctly. - """ - # Extra fields should be accessible via attribute access (pydantic's extra="allow") - assert hasattr(message, '__pydantic_extra__'), "Message should have __pydantic_extra__" - - extra = message.__pydantic_extra__ - assert 'private_field' in extra, "Extra fields should be accumulated" - - # Verify deep merging: nested dicts should be merged, lists should be extended - private_field_value = extra['private_field'] - assert isinstance(private_field_value, dict), "Extra field should be a dict" - private_field = cast(dict[str, object], private_field_value) - assert 'nested' in private_field, "Nested structure should be present" - - nested_value = private_field['nested'] - assert isinstance(nested_value, dict), "Nested field should be a dict" - nested = cast(dict[str, object], nested_value) - assert 'values' in nested, "Nested values should be present" - - # The 'values' list should have been extended across all streaming events: - # message_start: [1, 2] - # content_block_delta 1: [3] - # content_block_delta 2: [4, 5] - # message_delta: [6] - # Expected: [1, 2, 3, 4, 5, 6] - values_value = nested['values'] - assert isinstance(values_value, list), "Nested values should be a list" - values = cast(list[int], values_value) - assert values == [1, 2, 3, 4, 5, 6], "Lists should be extended, not replaced" - - # Last value from dict merge should be present - assert nested.get('metadata') == 'chunk2', "Dict values should be merged" - - -class TestSyncExtraFields: +class TestExtraFieldsAccumulation: def test_extra_fields_accumulation(self) -> None: - """Test that extra fields are accumulated during streaming.""" - with respx.mock(base_url=base_url) as respx_mock: - respx_mock.post("/v1/messages").mock( - return_value=httpx.Response(200, content=get_response("extra_fields_response.txt")) - ) - - with sync_client.messages.stream( - max_tokens=1024, - messages=[ - { - "role": "user", - "content": "Say hello!", - } - ], + """Test that extra fields are accumulated across streaming events.""" + # Build message with extra field via message_start + message_start = RawMessageStartEvent( + type="message_start", + message=Message( + id="msg_123", + type="message", + role="assistant", + content=[], model="claude-3-opus-latest", - ) as stream: - # Consume the stream - for _ in stream: - pass - - message = stream.get_final_message() - assert_extra_fields_accumulated(message) - - -class TestAsyncExtraFields: - def test_extra_fields_accumulation(self) -> None: - """Test that extra fields are accumulated during async streaming.""" - - async def run_test() -> None: - with respx.mock(base_url=base_url) as respx_mock: - respx_mock.post("/v1/messages").mock( - return_value=httpx.Response(200, content=to_async_iter(get_response("extra_fields_response.txt"))) - ) - - async with async_client.messages.stream( - max_tokens=1024, - messages=[ - { - "role": "user", - "content": "Say hello!", - } - ], - model="claude-3-opus-latest", - ) as stream: - # Consume the stream - async for _ in stream: - pass - - message = await stream.get_final_message() - assert_extra_fields_accumulated(message) - - asyncio.run(run_test()) + stop_reason=None, + stop_sequence=None, + usage=Usage(input_tokens=11, output_tokens=1), + # Extra field with nested structure + private_field={"nested": {"values": [1, 2]}}, # type: ignore[call-arg] + ), + ) + snapshot = accumulate_event(event=message_start, current_snapshot=None) + + # content_block_start + content_block_start = RawContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ) + snapshot = accumulate_event(event=content_block_start, current_snapshot=snapshot) + + # First content_block_delta with extra field + delta1 = RawContentBlockDeltaEvent( + type="content_block_delta", + index=0, + delta=TextDelta(type="text_delta", text="Hello"), + private_field={"nested": {"values": [3], "metadata": "chunk1"}}, # type: ignore[call-arg] + ) + snapshot = accumulate_event(event=delta1, current_snapshot=snapshot) + + # Second content_block_delta with extra field + delta2 = RawContentBlockDeltaEvent( + type="content_block_delta", + index=0, + delta=TextDelta(type="text_delta", text="!"), + private_field={"nested": {"values": [4, 5], "metadata": "chunk2"}}, # type: ignore[call-arg] + ) + snapshot = accumulate_event(event=delta2, current_snapshot=snapshot) + + # message_delta with extra field + message_delta = RawMessageDeltaEvent( + type="message_delta", + delta=Delta(stop_reason="end_turn", stop_sequence=None), + usage=MessageDeltaUsage(output_tokens=3), + private_field={"nested": {"values": [6]}}, # type: ignore[call-arg] + ) + snapshot = accumulate_event(event=message_delta, current_snapshot=snapshot) + + # Verify extra fields were accumulated + assert hasattr(snapshot, "__pydantic_extra__"), "Message should have __pydantic_extra__" + extra = snapshot.__pydantic_extra__ + assert extra is not None + assert "private_field" in extra, "Extra fields should be accumulated" + + private_field = cast(dict[str, Any], extra["private_field"]) + assert "nested" in private_field + + nested = cast(dict[str, Any], private_field["nested"]) + assert "values" in nested + + # Lists should be extended across all events: [1,2] + [3] + [4,5] + [6] + assert nested["values"] == [1, 2, 3, 4, 5, 6], "Lists should be extended, not replaced" + + # Dict values should use the last value + assert nested.get("metadata") == "chunk2", "Dict values should be merged" def test_deep_merge_extra_fields_function() -> None: From 2f51ae80752d07a2289e8be73da6ca9dbc242196 Mon Sep 17 00:00:00 2001 From: Evan Miller Date: Mon, 24 Nov 2025 09:29:37 -0500 Subject: [PATCH 7/9] lint --- tests/lib/streaming/test_extra_fields.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/lib/streaming/test_extra_fields.py b/tests/lib/streaming/test_extra_fields.py index e85d3c8c..1e1093e4 100644 --- a/tests/lib/streaming/test_extra_fields.py +++ b/tests/lib/streaming/test_extra_fields.py @@ -8,13 +8,13 @@ from typing import Any, cast -from anthropic.types import Message, TextBlock, TextDelta, Usage +from anthropic.types import Usage, Message, TextBlock, TextDelta +from anthropic.lib.streaming._messages import accumulate_event +from anthropic.types.message_delta_usage import MessageDeltaUsage from anthropic.types.raw_message_delta_event import Delta, RawMessageDeltaEvent from anthropic.types.raw_message_start_event import RawMessageStartEvent -from anthropic.types.raw_content_block_start_event import RawContentBlockStartEvent from anthropic.types.raw_content_block_delta_event import RawContentBlockDeltaEvent -from anthropic.types.message_delta_usage import MessageDeltaUsage -from anthropic.lib.streaming._messages import accumulate_event +from anthropic.types.raw_content_block_start_event import RawContentBlockStartEvent class TestExtraFieldsAccumulation: From 829486182c91de312d80cdc5dd1032d654447e37 Mon Sep 17 00:00:00 2001 From: Evan Miller Date: Mon, 24 Nov 2025 09:58:07 -0500 Subject: [PATCH 8/9] Pydantic v1 fix --- tests/lib/streaming/test_extra_fields.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/lib/streaming/test_extra_fields.py b/tests/lib/streaming/test_extra_fields.py index 1e1093e4..37a3cc1a 100644 --- a/tests/lib/streaming/test_extra_fields.py +++ b/tests/lib/streaming/test_extra_fields.py @@ -9,6 +9,7 @@ from typing import Any, cast from anthropic.types import Usage, Message, TextBlock, TextDelta +from anthropic._compat import PYDANTIC_V1 from anthropic.lib.streaming._messages import accumulate_event from anthropic.types.message_delta_usage import MessageDeltaUsage from anthropic.types.raw_message_delta_event import Delta, RawMessageDeltaEvent @@ -73,6 +74,10 @@ def test_extra_fields_accumulation(self) -> None: ) snapshot = accumulate_event(event=message_delta, current_snapshot=snapshot) + # This feature requires Pydantic v2 + if PYDANTIC_V1: + return + # Verify extra fields were accumulated assert hasattr(snapshot, "__pydantic_extra__"), "Message should have __pydantic_extra__" extra = snapshot.__pydantic_extra__ From 4a6c3179249ea179e293c808e2fdd67aa478c181 Mon Sep 17 00:00:00 2001 From: Evan Miller Date: Tue, 25 Nov 2025 10:03:09 -0500 Subject: [PATCH 9/9] PR comments --- .../fixtures/extra_fields_response.txt | 20 --- tests/lib/streaming/test_extra_fields.py | 156 +++++++++--------- 2 files changed, 79 insertions(+), 97 deletions(-) delete mode 100644 tests/lib/streaming/fixtures/extra_fields_response.txt diff --git a/tests/lib/streaming/fixtures/extra_fields_response.txt b/tests/lib/streaming/fixtures/extra_fields_response.txt deleted file mode 100644 index c9b9dd48..00000000 --- a/tests/lib/streaming/fixtures/extra_fields_response.txt +++ /dev/null @@ -1,20 +0,0 @@ -event: message_start -data: {"type":"message_start","message":{"id":"msg_123","type":"message","role":"assistant","content":[],"model":"claude-3-opus-latest","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":11,"output_tokens":1},"private_field":{"nested":{"values":[1,2]}}}} - -event: content_block_start -data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}} - -event: content_block_delta -data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"},"private_field":{"nested":{"values":[3],"metadata":"chunk1"}}} - -event: content_block_delta -data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"!"},"private_field":{"nested":{"values":[4,5],"metadata":"chunk2"}}} - -event: content_block_stop -data: {"type":"content_block_stop","index":0} - -event: message_delta -data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":3},"private_field":{"nested":{"values":[6]}}} - -event: message_stop -data: {"type":"message_stop"} diff --git a/tests/lib/streaming/test_extra_fields.py b/tests/lib/streaming/test_extra_fields.py index 37a3cc1a..5b98ff34 100644 --- a/tests/lib/streaming/test_extra_fields.py +++ b/tests/lib/streaming/test_extra_fields.py @@ -8,6 +8,8 @@ from typing import Any, cast +import pytest + from anthropic.types import Usage, Message, TextBlock, TextDelta from anthropic._compat import PYDANTIC_V1 from anthropic.lib.streaming._messages import accumulate_event @@ -18,83 +20,83 @@ from anthropic.types.raw_content_block_start_event import RawContentBlockStartEvent -class TestExtraFieldsAccumulation: - def test_extra_fields_accumulation(self) -> None: - """Test that extra fields are accumulated across streaming events.""" - # Build message with extra field via message_start - message_start = RawMessageStartEvent( - type="message_start", - message=Message( - id="msg_123", - type="message", - role="assistant", - content=[], - model="claude-3-opus-latest", - stop_reason=None, - stop_sequence=None, - usage=Usage(input_tokens=11, output_tokens=1), - # Extra field with nested structure - private_field={"nested": {"values": [1, 2]}}, # type: ignore[call-arg] - ), - ) - snapshot = accumulate_event(event=message_start, current_snapshot=None) - - # content_block_start - content_block_start = RawContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ) - snapshot = accumulate_event(event=content_block_start, current_snapshot=snapshot) - - # First content_block_delta with extra field - delta1 = RawContentBlockDeltaEvent( - type="content_block_delta", - index=0, - delta=TextDelta(type="text_delta", text="Hello"), - private_field={"nested": {"values": [3], "metadata": "chunk1"}}, # type: ignore[call-arg] - ) - snapshot = accumulate_event(event=delta1, current_snapshot=snapshot) - - # Second content_block_delta with extra field - delta2 = RawContentBlockDeltaEvent( - type="content_block_delta", - index=0, - delta=TextDelta(type="text_delta", text="!"), - private_field={"nested": {"values": [4, 5], "metadata": "chunk2"}}, # type: ignore[call-arg] - ) - snapshot = accumulate_event(event=delta2, current_snapshot=snapshot) - - # message_delta with extra field - message_delta = RawMessageDeltaEvent( - type="message_delta", - delta=Delta(stop_reason="end_turn", stop_sequence=None), - usage=MessageDeltaUsage(output_tokens=3), - private_field={"nested": {"values": [6]}}, # type: ignore[call-arg] - ) - snapshot = accumulate_event(event=message_delta, current_snapshot=snapshot) - - # This feature requires Pydantic v2 - if PYDANTIC_V1: - return - - # Verify extra fields were accumulated - assert hasattr(snapshot, "__pydantic_extra__"), "Message should have __pydantic_extra__" - extra = snapshot.__pydantic_extra__ - assert extra is not None - assert "private_field" in extra, "Extra fields should be accumulated" - - private_field = cast(dict[str, Any], extra["private_field"]) - assert "nested" in private_field - - nested = cast(dict[str, Any], private_field["nested"]) - assert "values" in nested - - # Lists should be extended across all events: [1,2] + [3] + [4,5] + [6] - assert nested["values"] == [1, 2, 3, 4, 5, 6], "Lists should be extended, not replaced" - - # Dict values should use the last value - assert nested.get("metadata") == "chunk2", "Dict values should be merged" +@pytest.mark.skipif(PYDANTIC_V1, reason="Extra fields accumulation not supported in Pydantic v1") +def test_extra_fields_accumulation(): + """Test that extra fields are accumulated across streaming events.""" + # Build message with extra field via message_start + message_start = RawMessageStartEvent( + type="message_start", + message=Message( + id="msg_123", + type="message", + role="assistant", + content=[], + model="claude-3-opus-latest", + stop_reason=None, + stop_sequence=None, + usage=Usage(input_tokens=11, output_tokens=1), + # Extra field with nested structure + private_field={"nested": {"values": [1, 2]}}, # type: ignore[call-arg] + ), + ) + snapshot = accumulate_event(event=message_start, current_snapshot=None) + + # content_block_start + content_block_start = RawContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ) + snapshot = accumulate_event(event=content_block_start, current_snapshot=snapshot) + + # First content_block_delta with extra field + delta1 = RawContentBlockDeltaEvent( + type="content_block_delta", + index=0, + delta=TextDelta(type="text_delta", text="Hello"), + private_field={"nested": {"values": [3], "metadata": "chunk1"}}, # type: ignore[call-arg] + ) + snapshot = accumulate_event(event=delta1, current_snapshot=snapshot) + + # Second content_block_delta with extra field + delta2 = RawContentBlockDeltaEvent( + type="content_block_delta", + index=0, + delta=TextDelta(type="text_delta", text="!"), + private_field={"nested": {"values": [4, 5], "metadata": "chunk2"}}, # type: ignore[call-arg] + ) + snapshot = accumulate_event(event=delta2, current_snapshot=snapshot) + + # message_delta with extra field + message_delta = RawMessageDeltaEvent( + type="message_delta", + delta=Delta(stop_reason="end_turn", stop_sequence=None), + usage=MessageDeltaUsage(output_tokens=3), + private_field={"nested": {"values": [6]}}, # type: ignore[call-arg] + ) + snapshot = accumulate_event(event=message_delta, current_snapshot=snapshot) + + # This feature requires Pydantic v2 + if PYDANTIC_V1: + return + + # Verify extra fields were accumulated + assert hasattr(snapshot, "__pydantic_extra__"), "Message should have __pydantic_extra__" + extra = snapshot.__pydantic_extra__ + assert extra is not None + assert "private_field" in extra, "Extra fields should be accumulated" + + private_field = cast(dict[str, Any], extra["private_field"]) + assert "nested" in private_field + + nested = cast(dict[str, Any], private_field["nested"]) + assert "values" in nested + + # Lists should be extended across all events: [1,2] + [3] + [4,5] + [6] + assert nested["values"] == [1, 2, 3, 4, 5, 6], "Lists should be extended, not replaced" + + # Dict values should use the last value + assert nested.get("metadata") == "chunk2", "Dict values should be merged" def test_deep_merge_extra_fields_function() -> None: