Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@

import backoff
import httpx
from opentelemetry import trace
from opentelemetry import trace as otel_trace_api
from opentelemetry import (
trace as otel_trace_api,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
from opentelemetry.util._decorator import (
Expand Down Expand Up @@ -110,9 +111,10 @@
TextPromptClient,
)
from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext
from langfuse._client.context_propagation import LangfuseContextPropagationMixin


class Langfuse:
class Langfuse(LangfuseContextPropagationMixin):
"""Main client for Langfuse tracing and platform features.

This class provides an interface for creating and managing traces, spans,
Expand Down Expand Up @@ -189,6 +191,7 @@ class Langfuse:
_resources: Optional[LangfuseResourceManager] = None
_mask: Optional[MaskFunction] = None
_otel_tracer: otel_trace_api.Tracer
_host: str

def __init__(
self,
Expand All @@ -211,8 +214,10 @@ def __init__(
additional_headers: Optional[Dict[str, str]] = None,
tracer_provider: Optional[TracerProvider] = None,
):
self._host = host or cast(
str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
self._host = (
host
if host is not None
else os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original version is more readableand to my understanding they do the same. If this is true, should we revert this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, they are both equivalent

self._environment = environment or cast(
str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
Expand Down Expand Up @@ -1667,6 +1672,11 @@ def update_current_trace(
span.update(output=response)
```
"""
warnings.warn(
"update_current_trace is deprecated and will be removed in a future version. ",
DeprecationWarning,
stacklevel=2,
)
if not self._tracing_enabled:
langfuse_logger.debug(
"Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode."
Expand Down Expand Up @@ -1811,7 +1821,7 @@ def _create_remote_parent_span(
is_remote=False,
)

return trace.NonRecordingSpan(span_context)
return otel_trace_api.NonRecordingSpan(span_context)

def _is_valid_trace_id(self, trace_id: str) -> bool:
pattern = r"^[0-9a-f]{32}$"
Expand Down
244 changes: 244 additions & 0 deletions langfuse/_client/context_propagation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
"""Context propagation utilities for Langfuse tracing.
This module provides a mixin class that enables automatic propagation of trace
attributes (session_id, user_id, metadata) from parent contexts to child spans
using OpenTelemetry's context and baggage mechanisms.
The mixin is shared between the main Langfuse client and span classes to provide
consistent context propagation behavior across the SDK.
"""

import json
from typing import Any, Generator

from opentelemetry import (
baggage as otel_baggage_api,
context as otel_context_api,
trace as otel_trace_api,
)
from opentelemetry.util._decorator import _agnosticcontextmanager

# Context key constants for Langfuse context propagation
LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id"
LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id"
LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata"


class LangfuseContextPropagationMixin:
"""Mixin providing context managers for automatic trace attribute propagation.
This mixin adds three context managers (session, user, metadata) that enable
automatic propagation of trace attributes to all child spans created within
their scope. The propagation works through OpenTelemetry's context mechanism
for local (same-service) propagation, with optional baggage for cross-service
propagation.
Classes that inherit this mixin gain the ability to create contexts where
certain attributes are automatically applied to all spans without manual
specification.
"""

@_agnosticcontextmanager
def session(
self, id: str, *, as_baggage: bool = False
Copy link
Contributor

@hassiebp hassiebp Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rename as_baggage to something more immediately descriptive on what the impact is. Something like propagate_via_http makes it both clear that this is propagation related and comes with security considerations (users might be unfamiliar with OTEL in general and baggage in particular). We could also consider dropping this entirely and only allow enabling it via env var only as users probably won't dynamically decide in runtime whether to use baggage or not

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For propagation, we also need to consider / test the multiprocessing case in Python, where users use Celery workers that are separate Python processes to handle background tasks. Trace attributes must be propagated there, too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hassiebp Test langchain

) -> Generator[None, None, None]:
"""Create a session context manager that propagates session_id to all child spans.
Args:
id (str): The session identifier to propagate to child spans.
as_baggage (bool, optional): If True, stores the session_id in OpenTelemetry baggage
for cross-service propagation. If False, stores only in local context for
current-service propagation. Defaults to False.
Returns:
Context manager that sets session_id on all spans created within its scope.
Warning:
When as_baggage=True, the session_id will be included in HTTP headers of any
outbound requests made within this context. Only use this for non-sensitive
identifiers that are safe to transmit across service boundaries.
Example:
```python
# Local context only (default)
with langfuse.session(id="session_123"):
with langfuse.start_as_current_span(name="process-request") as span:
# This span and all its children will have session_id="session_123"
child_span = langfuse.start_span(name="child-operation")
# Cross-service propagation (use with caution)
with langfuse.session(id="session_123", as_baggage=True):
# session_id will be propagated to external service calls
response = requests.get("https://api.example.com/data")
```
"""
# Set context variable
new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id)
token = otel_context_api.attach(new_context)

# Set attribute on currently active span if exists
current_span = otel_trace_api.get_current_span()
if current_span is not None and current_span.is_recording():
current_span.set_attribute("session.id", id)

# Set baggage if requested
baggage_token = None
if as_baggage:
new_baggage = otel_baggage_api.set_baggage("session.id", id)
baggage_token = otel_context_api.attach(new_baggage)

try:
yield
finally:
# Always detach context token
otel_context_api.detach(token)

# Detach baggage token if it was set
if baggage_token is not None:
otel_context_api.detach(baggage_token)

@_agnosticcontextmanager
def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, None]:
"""Create a user context manager that propagates user_id to all child spans.
Args:
id (str): The user identifier to propagate to child spans.
as_baggage (bool, optional): If True, stores the user_id in OpenTelemetry baggage
for cross-service propagation. If False, stores only in local context for
current-service propagation. Defaults to False.
Returns:
Context manager that sets user_id on all spans created within its scope.
Warning:
When as_baggage=True, the user_id will be included in HTTP headers of any
outbound requests made within this context. This may leak sensitive user
information to external services. Use with extreme caution.
Example:
```python
# Local context only (default, recommended for user IDs)
with langfuse.user(id="user_456"):
with langfuse.start_as_current_span(name="user-action") as span:
# This span and all its children will have user_id="user_456"
pass
# Cross-service propagation (NOT recommended for sensitive user IDs)
with langfuse.user(id="public_user_456", as_baggage=True):
# user_id will be propagated to external service calls
response = requests.get("https://api.example.com/data")
```
"""
# Set context variable
new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id)
token = otel_context_api.attach(new_context)

# Set attribute on currently active span if exists
current_span = otel_trace_api.get_current_span()
if current_span is not None and current_span.is_recording():
current_span.set_attribute("user.id", id)

# Set baggage if requested
baggage_token = None
if as_baggage:
new_baggage = otel_baggage_api.set_baggage("user.id", id)
baggage_token = otel_context_api.attach(new_baggage)

try:
yield
finally:
# Always detach context token
otel_context_api.detach(token)

# Detach baggage token if it was set
if baggage_token is not None:
otel_context_api.detach(baggage_token)

@_agnosticcontextmanager
def metadata(
self, *, as_baggage: bool = False, **kwargs: Any
) -> Generator[None, None, None]:
"""Create a metadata context manager that propagates metadata to all child spans.
Args:
as_baggage (bool, optional): If True, stores the metadata in OpenTelemetry baggage
for cross-service propagation. If False, stores only in local context for
current-service propagation. Defaults to False.
**kwargs: Metadata key-value pairs. Values should not exceed 200 characters.
Returns:
Context manager that sets metadata on all spans created within its scope.
Warning:
When as_baggage=True, all metadata key-value pairs will be included in HTTP
headers of any outbound requests made within this context. Ensure no sensitive
information is included in the metadata when using cross-service propagation.
Example:
```python
# Local context only (default)
with langfuse.metadata(experiment="A/B", version="1.2.3"):
with langfuse.start_as_current_span(name="experiment-run") as span:
# This span and all its children will have the metadata
pass
# Cross-service propagation (use with caution)
with langfuse.metadata(as_baggage=True, experiment="A/B", service="api"):
# metadata will be propagated to external service calls
response = requests.get("https://api.example.com/data")
```
"""
if not kwargs:
# No metadata to set, just yield
yield
return

# Store metadata as a dict in context (not JSON string)
# This allows span_processor to distribute keys as individual attributes
new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs)
token = otel_context_api.attach(new_context)

# Set attributes on currently active span if exists
current_span = otel_trace_api.get_current_span()
if current_span is not None and current_span.is_recording():
for key, value in kwargs.items():
attr_key = f"langfuse.metadata.{key}"
# Convert value to appropriate type for span attribute
if isinstance(value, (str, int, float, bool)):
attr_value = value
else:
# For complex types, convert to JSON string
attr_value = json.dumps(value)
current_span.set_attribute(attr_key, attr_value)

# Set baggage if requested
baggage_token = None
if as_baggage:
# Start with None context and chain baggage settings
new_baggage = None

# Add each metadata key-value pair to baggage
for key, value in kwargs.items():
# Convert value to string and truncate if needed for baggage
str_value = str(value)
if len(str_value) > 200:
str_value = str_value[:200]

baggage_key = f"metadata.{key}"
new_baggage = otel_baggage_api.set_baggage(
baggage_key, str_value, new_baggage
)

# Attach the new baggage context
if new_baggage is not None:
baggage_token = otel_context_api.attach(new_baggage)

try:
yield
finally:
# Always detach context token
otel_context_api.detach(token)

# Detach all baggage tokens if they were set
if baggage_token is not None:
otel_context_api.detach(baggage_token)
16 changes: 13 additions & 3 deletions langfuse/_client/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
overload,
)

from opentelemetry import trace as otel_trace_api
from opentelemetry.util._decorator import _AgnosticContextManager
from opentelemetry import (
trace as otel_trace_api,
)
from opentelemetry.util._decorator import (
_AgnosticContextManager,
)

from langfuse.model import PromptClient

Expand All @@ -52,14 +56,15 @@
)
from langfuse.logger import langfuse_logger
from langfuse.types import MapValue, ScoreDataType, SpanLevel
from langfuse._client.context_propagation import LangfuseContextPropagationMixin

# Factory mapping for observation classes
# Note: "event" is handled separately due to special instantiation logic
# Populated after class definitions
_OBSERVATION_CLASS_MAP: Dict[str, Type["LangfuseObservationWrapper"]] = {}


class LangfuseObservationWrapper:
class LangfuseObservationWrapper(LangfuseContextPropagationMixin):
"""Abstract base class for all Langfuse span types.

This class provides common functionality for all Langfuse span types, including
Expand Down Expand Up @@ -233,6 +238,11 @@ def update_trace(
tags: List of tags to categorize the trace
public: Whether the trace should be publicly accessible
"""
warnings.warn(
"update_trace is deprecated and will be removed in a future version. ",
DeprecationWarning,
stacklevel=2,
)
if not self._otel_span.is_recording():
return self

Expand Down
Loading