Skip to content

Conversation

@hassiebp
Copy link
Contributor

@hassiebp hassiebp commented Sep 26, 2025

Important

Enhances context propagation in generators using new wrapper classes in observe.py, ensuring trace continuity with comprehensive testing.

  • Behavior:
    • Introduces ContextPreservedSyncGeneratorWrapper and ContextPreservedAsyncGeneratorWrapper in observe.py to maintain context during generator iteration.
    • Uses contextvars.copy_context() to capture and preserve context for each generator iteration.
    • Adds fallback for Python < 3.10 in observe.py for async generators.
  • Testing:
    • Adds tests in test_decorators.py for sync and async generator context preservation, exception handling, and trace hierarchy.
    • Tests cover scenarios like empty generators, exceptions, and multi-project setups.
  • Misc:
    • Refactors generator handling in observe.py to use new wrapper classes.

This description was created by Ellipsis for b50dae4. You can customize this summary. It will automatically update as commits are pushed.


Disclaimer: Experimental PR review

Greptile Overview

Updated On: 2025-09-26 15:22:13 UTC

Summary

This PR implements generator context propagation for the @observe decorator to fix an issue where OpenTelemetry context was lost during generator iteration, causing traces to break when generators were consumed outside of their original scope.

Key Changes

  • Generator Wrapper Classes: Replaced simple try/finally generator wrapping with custom wrapper classes (ContextPreservedSyncGeneratorWrapper and ContextPreservedAsyncGeneratorWrapper) that preserve the execution context for each iteration
  • Context Preservation: Uses contextvars.copy_context() to capture the active span context when the generator is created, then runs each iteration within that preserved context
  • Python Version Compatibility: Includes fallback logic for Python < 3.10 where asyncio.create_task doesn't support the context parameter
  • Comprehensive Testing: Added extensive test coverage for context preservation scenarios including multi-project setups, concurrency, and exception handling

Technical Implementation

The fix addresses the core issue where generators returned by decorated functions would lose their OpenTelemetry context when consumed later (e.g., by streaming responses in FastAPI). The new wrapper classes ensure that:

  1. Context is captured while the originating span is still active
  2. Each generator iteration runs within the preserved context
  3. Final output aggregation and span completion occur correctly
  4. Exception handling maintains proper trace state

This change maintains backward compatibility while ensuring proper trace continuity for streaming/generator use cases.

Confidence Score: 4/5

  • This PR is generally safe to merge with a well-tested solution to a legitimate context propagation issue
  • Score reflects solid implementation with comprehensive test coverage, but has a minor fallback logic concern for Python < 3.10 async generators and some style improvements needed
  • Pay close attention to langfuse/_client/observe.py for the Python version compatibility fallback logic

Important Files Changed

File Analysis

Filename        Score        Overview
langfuse/_client/observe.py 4/5 Major rewrite of generator wrapper methods to preserve context using contextvars. Replaces simple try/finally approach with wrapper classes that run each iteration in preserved context. Includes fallback for Python < 3.10.
tests/test_decorators.py 5/5 Extensive test coverage for context preservation in generators including sync/async generators, multi-project scenarios, concurrency, and exception handling. Tests verify that contexts are properly preserved across generator iterations.

Sequence Diagram

sequenceDiagram
    participant User
    participant Observer as @observe Decorator
    participant Context as contextvars.Context
    participant Wrapper as GeneratorWrapper
    participant Original as Original Generator
    participant Span as LangfuseSpan

    User->>Observer: Call decorated function
    Observer->>Context: copy_context()
    Context-->>Observer: preserved_context
    Observer->>Original: Execute function
    Original-->>Observer: Return generator
    Observer->>Wrapper: Create wrapper class
    Observer->>Wrapper: Pass (generator, context, span)
    Observer-->>User: Return wrapper

    Note over User, Wrapper: Later consumption by user code

    User->>Wrapper: __iter__() or __aiter__()
    Wrapper-->>User: Return self

    loop For each item
        User->>Wrapper: __next__() or __anext__()
        Wrapper->>Context: context.run(next, generator)
        Context->>Original: Execute in preserved context
        Original-->>Context: Yield item
        Context-->>Wrapper: Return item
        Wrapper->>Wrapper: items.append(item)
        Wrapper-->>User: Return item
    end

    User->>Wrapper: __next__() (no more items)
    Wrapper->>Original: StopIteration/StopAsyncIteration
    Wrapper->>Span: update(output=joined_items)
    Wrapper->>Span: end()
    Wrapper-->>User: Re-raise StopIteration
Loading

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

@hassiebp hassiebp force-pushed the add-preserve-context-helper branch from 370f69c to 5841a59 Compare September 28, 2025 09:57
hassiebp and others added 5 commits September 28, 2025 12:35
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
@hassiebp hassiebp merged commit d02d940 into main Sep 28, 2025
11 checks passed
@hassiebp hassiebp deleted the add-preserve-context-helper branch September 28, 2025 14:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug(python-sdk): Using @observe with FastAPI StreamingResponse breaks everything up into separate traces.

2 participants