feat: Add sync streaming support for Anthropic instrumentation#4155
feat: Add sync streaming support for Anthropic instrumentation#4155vasantteja wants to merge 36 commits intoopen-telemetry:mainfrom
Conversation
- Add support for Messages.create(stream=True) with StreamWrapper - Add support for Messages.stream() with MessageStreamManagerWrapper - Add MessageWrapper for non-streaming response telemetry - Rename MessageCreateParams to MessageRequestParams - Add comprehensive tests for sync streaming functionality
- Add type: ignore[arg-type] for Union type narrowing in messages_create - Add type: ignore[return-value] for wrapper return types - Add type: ignore[return-value] for __exit__ returning None
99a2596 to
504d0df
Compare
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py
Outdated
Show resolved
Hide resolved
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py
Outdated
Show resolved
Hide resolved
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/utils.py
Outdated
Show resolved
Hide resolved
|
tagging @anirudha who was interested to review the PR :) |
|
Thanks. Taking a look today |
…r handling - Introduce constants for provider name and cache token attributes. - Normalize stop reasons and aggregate cache token fields in MessageWrapper and StreamWrapper. - Enhance tests to validate input token aggregation and stop reason normalization. - Update cassettes for new request and response structures in streaming scenarios.
…d consistency - Simplify constant definitions and normalize function calls in utils.py. - Enhance test cases by removing unnecessary line breaks and improving formatting. - Ensure consistent usage of type hints and comments in test functions.
- Update the pylint directive to disable too-many-arguments warning for better clarity. - Maintain consistency in function signature and improve code readability.
anirudha
left a comment
There was a problem hiding this comment.
Tests all pass locally. Nice work overall — the wrapper separation is clean. One bug to fix (double finalize), rest are suggestions.
Note: conftest.py isn't in this diff so I can't leave a line comment, but scrub_response_headers is a no-op and all new cassettes leak anthropic-organization-id: 455ea6be-bd92-4199-83ec-0c6b39c5c169. Worth scrubbing that or adding it to filter_headers.
Also, the PR description says Fixes #3949 but async streaming isn't covered. Totally fine to scope this to sync only, but Fixes will auto-close the issue on merge. Maybe Partially addresses #3949 instead?
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/utils.py
Outdated
Show resolved
Hide resolved
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/utils.py
Outdated
Show resolved
Hide resolved
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/utils.py
Outdated
Show resolved
Hide resolved
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py
Show resolved
Hide resolved
…tion - Update test cases to validate streaming behavior with various parameters, including token usage and stop reasons. - Introduce new cassettes for different scenarios, ensuring comprehensive coverage of streaming interactions. - Refactor existing tests for clarity and consistency in structure and assertions.
…ocals in test_stream_wrapper_finalize_idempotent function
…eja/opentelemetry-python-contrib into anthropic-sync-streaming
…e. Introduced MessageWrapper and StreamWrapper classes for telemetry handling. Updated tests to reflect changes in instrumentation behavior.
…ty functions, and update wrapper classes for better clarity and maintainability. Removed unused code and improved type safety in utility functions. Updated tests to reflect changes in the instrumentation behavior.
…imports and streamline finish reason normalization for improved clarity and maintainability.
| def _skip_if_cassette_missing_and_no_real_key(request): | ||
| cassette_path = ( | ||
| Path(__file__).parent / "cassettes" / f"{request.node.name}.yaml" | ||
| ) | ||
| api_key = os.getenv("ANTHROPIC_API_KEY") | ||
| if not cassette_path.exists() and api_key == "test_anthropic_api_key": | ||
| pytest.skip( | ||
| f"Cassette {cassette_path.name} is missing. " | ||
| "Set a real ANTHROPIC_API_KEY to record it." | ||
| ) |
There was a problem hiding this comment.
Why would the cassette be missing?
There was a problem hiding this comment.
The _skip_if_cassette_missing_and_no_real_key guard is called only by the tool_use and thinking tests — whose cassettes may not exist if the SDK version is too old to record them. It prevents a confusing failure (dummy key hitting the real API) by cleanly skipping the test with a message to set a real key. Lmk if you want to remove this.
There was a problem hiding this comment.
Why are there several new requests in here? It doesn't look like test_sync_messages_create_stop_reason was changed to make more request
In case you haven't seen it, you can pass VCR record mode with --vcr-record=<mode>
There was a problem hiding this comment.
Honeslty thanks for this. I was using this pytest instrumentation-genai/opentelemetry-instrumentation-anthropic/tests --vcr-record=all and it was appending my requests. I was rerunning this everytime I make a change just to ensure everything is good. This bulked up the cassettes. I deleted the cassettes and recreated them.
…st and response structures, enhance error handling scenarios, and ensure consistency in message formats across various test cases. Removed outdated data and improved clarity in test interactions.
MikeGoldsmith
left a comment
There was a problem hiding this comment.
Looking good, thanks @vasantteja.
I've left some suggestions and things we need to figure out if we want to remove or update.
...ntelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py
Outdated
Show resolved
Hide resolved
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py
Outdated
Show resolved
Hide resolved
instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/conftest.py
Show resolved
Hide resolved
...ntelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py
Show resolved
Hide resolved
...ntelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py
Outdated
Show resolved
Hide resolved
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py
Outdated
Show resolved
Hide resolved
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py
Outdated
Show resolved
Hide resolved
...ntelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py
Outdated
Show resolved
Hide resolved
…apper to include content capture logic, improve type safety with explicit casting, and streamline test cases for better clarity. Added new test for streaming response attributes and refined existing tests to ensure consistency in message handling.
…Ds, timestamps, and token usage across various test cases. Refine content capture logic and ensure consistency in message formats, including adjustments to event data and headers for improved clarity and accuracy.
| def __enter__(self) -> "StreamWrapper": | ||
| return self | ||
|
|
||
| def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: |
There was a problem hiding this comment.
Potential bug: __next__ can call fail_llm(...) on stream errors, but _finalized is not set on that path, and __exit__ always calls close() which then calls _finalize_invocation() -> stop_llm(...). That can finalize the same invocation twice (fail then stop) and potentially produce incorrect success telemetry after an error. Also, because __exit__ ignores exc_type, exceptions raised by user code inside with stream: are currently treated as successful completion.
There was a problem hiding this comment.
Agreed!! This abstraction is little error prone. I rewrote it to mimic openai responses stream wrapper incorporating your other suggestion on that PR.
...ntelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py
Outdated
Show resolved
Hide resolved
...ntelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py
Outdated
Show resolved
Hide resolved
...ntelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py
Outdated
Show resolved
Hide resolved
|
|
||
| if result.model: | ||
| invocation.response_model_name = result.model | ||
| is_streaming = kwargs.get("stream", False) |
There was a problem hiding this comment.
Is it possible to make this type safe by moving it into the extract_params() function's return value?
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py
Outdated
Show resolved
Hide resolved
...opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py
Outdated
Show resolved
Hide resolved
...instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/messages_extractors.py
Outdated
Show resolved
Hide resolved
…oved type safety. Replace 'Any' with 'object' in several function signatures and class attributes. Introduce logging for error handling in MessagesStreamWrapper to enhance instrumentation reliability.
…eja/opentelemetry-python-contrib into anthropic-sync-streaming
… clarity and safety. Update function signatures to use specific types instead of 'object', including changes to parameters in extract_params, get_input_messages, and get_system_instruction. Refactor messages_create to ensure correct type handling for streaming and non-streaming responses. Additionally, streamline message handling in MessagesStreamWrapper for better performance and reliability.
Description
This PR adds sync streaming support for the Anthropic instrumentation. It enables telemetry capture for:
Messages.create(stream=True)- Streaming responses via the create method with stream parameterMessages.stream()- The dedicated streaming method that returns aMessageStreamManagerKey changes:
StreamWrapperclass to wrapStream[RawMessageStreamEvent]and extract telemetry from streaming chunksMessageStreamManagerWrapperto wrapMessageStreamManagercontext managerMessageWrapperfor non-streaming response telemetry extractionMessageCreateParamstoMessageRequestParamsto reflect broader API coveragemessages_createto use manual lifecycle management (start_llm/stop_llm) instead of context manager to support both streaming and non-streamingFixes #3949 partially.
Type of change
Please delete options that are not relevant.
How Has This Been Tested?
Added comprehensive tests for sync streaming functionality:
test_sync_messages_create_streaming- Tests streaming with context managertest_sync_messages_create_streaming_iteration- Tests direct iteration without context managertest_sync_messages_create_streaming_connection_error- Tests error handling for streamingtest_sync_messages_stream_basic- TestsMessages.stream()methodtest_sync_messages_stream_with_params- Tests stream with additional parameters (temperature, top_p, top_k)test_sync_messages_stream_token_usage- Tests token usage capture in streamingtest_sync_messages_stream_connection_error- Tests error handling for stream methodAll tests use VCR cassettes for reproducible HTTP interaction replay.
Does This PR Require a Core Repo Change?
Checklist:
See contributing.md for styleguide, changelog guidelines, and more.