Skip to content

Conversation

@abhishar432
Copy link
Contributor

@abhishar432 abhishar432 commented Oct 9, 2025

Important

Ensure ingestion and media queues are always flushed in resource_manager.py, even with ProxyTracerProvider.

  • Behavior:
    • Modify flush() and shutdown() in resource_manager.py to ensure ingestion and media queues are always flushed.
    • Change condition to check for ProxyTracerProvider instances, ensuring flush operations are not skipped.
  • Logging:
    • Add debug log in flush() to confirm successful flushing of OTEL tracer provider, score ingestion queue, and media upload queue.

This description was created by Ellipsis for 2479a83. You can customize this summary. It will automatically update as commits are pushed.

Disclaimer: Experimental PR review

Greptile Overview

Updated On: 2025-10-09 20:05:52 UTC

Summary

This PR fixes a critical data loss bug in the `LangfuseResourceManager`'s flush and shutdown operations. The issue occurred when the OpenTelemetry tracer provider was a `ProxyTracerProvider` (OpenTelemetry's default provider when no explicit configuration is set), which is common in many deployment scenarios.

Previously, both the flush() and shutdown() methods would return early when encountering a ProxyTracerProvider, skipping the essential queue flushing operations for score ingestion and media upload queues. This meant that pending scores and media uploads could be lost during application shutdown or manual flush operations.

The fix inverts the conditional logic to ensure that the Langfuse-specific queues (score ingestion and media upload) are always flushed, while only making the OpenTelemetry tracer provider flush conditional on having a real (non-proxy) tracer provider. This change maintains thread safety and resource cleanup guarantees that are critical for the LangfuseResourceManager's design as a singleton managing background threads and queues.

The change integrates well with the existing codebase architecture, where the LangfuseResourceManager serves as the central coordinator for background processing tasks. The fix preserves the existing OpenTelemetry integration while ensuring Langfuse's internal queue management remains robust regardless of OpenTelemetry configuration state.

Important Files Changed

Changed Files
Filename Score Overview
langfuse/_client/resource_manager.py 5/5 Fixed critical data loss bug by ensuring ingestion and media queues are always flushed regardless of OpenTelemetry tracer provider type

Confidence score: 5/5

  • This PR is safe to merge with minimal risk as it fixes a clear data loss bug with a straightforward solution
  • Score reflects the critical nature of the fix and the simple, well-targeted changes that address the root cause without introducing complexity
  • No files require special attention as the change is isolated and the logic is clear

Sequence Diagram

sequenceDiagram
    participant User
    participant LRM as "LangfuseResourceManager"
    participant TP as "TracerProvider"
    participant SIQ as "Score Ingestion Queue"
    participant MUQ as "Media Upload Queue"
    participant SIC as "ScoreIngestionConsumer"
    participant MUC as "MediaUploadConsumer"

    User->>LRM: "flush()"
    LRM->>TP: "force_flush()"
    TP-->>LRM: "flush complete"
    LRM->>LRM: "log 'Successfully flushed OTEL tracer provider'"
    
    LRM->>SIQ: "join()"
    Note over SIQ: "Wait for all score ingestion tasks to complete"
    SIQ-->>LRM: "all tasks complete"
    LRM->>LRM: "log 'Successfully flushed score ingestion queue'"
    
    LRM->>MUQ: "join()"
    Note over MUQ: "Wait for all media upload tasks to complete"
    MUQ-->>LRM: "all tasks complete"
    LRM->>LRM: "log 'Successfully flushed media upload queue'"
    
    LRM-->>User: "flush complete"
Loading

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 file reviewed, no comments

Edit Code Review Agent Settings | Greptile

@hassiebp hassiebp merged commit 746c951 into langfuse:main Oct 10, 2025
5 of 8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants