Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion langfuse/_client/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
ObservationTypeGenerationLike,
ObservationTypeSpanLike,
)

from langfuse._utils.serializer import EventSerializer
from langfuse.model import PromptClient
from langfuse.types import MapValue, SpanLevel
Expand Down
103 changes: 99 additions & 4 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Any,
Callable,
Dict,
Generator,
List,
Literal,
Optional,
Expand All @@ -27,8 +28,15 @@

import backoff
import httpx
from opentelemetry import trace
from opentelemetry import trace as otel_trace_api
from opentelemetry import (
baggage as otel_baggage_api,
)
from opentelemetry import (
context as otel_context_api,
)
from opentelemetry import (
trace as otel_trace_api,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
from opentelemetry.util._decorator import (
Expand All @@ -39,6 +47,7 @@

from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.constants import (
LANGFUSE_CORRELATION_CONTEXT_KEY,
ObservationTypeGenerationLike,
ObservationTypeLiteral,
ObservationTypeLiteralNoEvent,
Expand Down Expand Up @@ -69,7 +78,10 @@
LangfuseSpan,
LangfuseTool,
)
from langfuse._client.utils import run_async_safely
from langfuse._client.utils import (
get_attribute_key_from_correlation_context,
run_async_safely,
)
from langfuse._utils import _get_timestamp
from langfuse._utils.parse_error import handle_fern_exception
from langfuse._utils.prompt_cache import PromptCache
Expand Down Expand Up @@ -189,6 +201,7 @@ class Langfuse:
_resources: Optional[LangfuseResourceManager] = None
_mask: Optional[MaskFunction] = None
_otel_tracer: otel_trace_api.Tracer
_host: str

def __init__(
self,
Expand Down Expand Up @@ -350,6 +363,83 @@ def start_span(
status_message=status_message,
)

@_agnosticcontextmanager
def correlation_context(
self,
correlation_context: Dict[str, str],
*,
as_baggage: bool = False,
) -> Generator[None, None, None]:
"""Create a context manager that propagates the given correlation_context to all spans within the context manager's scope.

Args:
correlation_context (Dict[str, str]): Dictionary containing key-value pairs to be propagated
to all spans within the context manager's scope. Common keys include user_id, session_id,
and custom metadata. All values must be strings below 200 characters.
as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage
for cross-service propagation. If False, stores only in local context for
current-service propagation. Defaults to False.
Copy link
Member

@maxdeichmann maxdeichmann Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a sentence here that this will be added to the headers of all outgoing HTTP requests. I would add the warning from below up here as not everyone is reading everything.


Returns:
Context manager that sets values on all spans created within its scope.

Warning:
When as_baggage=True, the values will be included in HTTP headers of any
outbound requests made within this context. Only use this for non-sensitive
identifiers that are safe to transmit across service boundaries.

Examples:
```python
# Local context only (default) - pass context as dictionary
with langfuse.correlation_context({"session_id": "session_123"}):
with langfuse.start_as_current_span(name="process-request") as span:
# This span and all its children will have session_id="session_123"
child_span = langfuse.start_span(name="child-operation")

# Multiple values in context dictionary
with langfuse.correlation_context({"user_id": "user_456", "experiment": "A"}):
# All spans will have both user_id and experiment attributes
span = langfuse.start_span(name="experiment-operation")

# Cross-service propagation (use with caution)
with langfuse.correlation_context({"session_id": "session_123"}, as_baggage=True):
# session_id will be propagated to external service calls
response = requests.get("https://api.example.com/data")
```
"""
current_context = otel_context_api.get_current()
current_span = otel_trace_api.get_current_span()

current_context = otel_context_api.set_value(
LANGFUSE_CORRELATION_CONTEXT_KEY, correlation_context, current_context
)

for key, value in correlation_context.items():
if len(value) > 200:
langfuse_logger.warning(
f"Correlation context key '{key}' is over 200 characters ({len(value)} chars). Dropping value."
)
continue

attribute_key = get_attribute_key_from_correlation_context(key)

if current_span is not None and current_span.is_recording():
current_span.set_attribute(attribute_key, value)

if as_baggage:
current_context = otel_baggage_api.set_baggage(
key, value, current_context
)

# Activate context, execute, and detach context
token = otel_context_api.attach(current_context)

try:
yield

finally:
otel_context_api.detach(token)

def start_as_current_span(
self,
*,
Expand Down Expand Up @@ -1667,6 +1757,11 @@ def update_current_trace(
span.update(output=response)
```
"""
warnings.warn(
"update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ",
DeprecationWarning,
stacklevel=2,
)
if not self._tracing_enabled:
langfuse_logger.debug(
"Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode."
Expand Down Expand Up @@ -1811,7 +1906,7 @@ def _create_remote_parent_span(
is_remote=False,
)

return trace.NonRecordingSpan(span_context)
return otel_trace_api.NonRecordingSpan(span_context)

def _is_valid_trace_id(self, trace_id: str) -> bool:
pattern = r"^[0-9a-f]{32}$"
Expand Down
5 changes: 4 additions & 1 deletion langfuse/_client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
This module defines constants used throughout the Langfuse OpenTelemetry integration.
"""

from typing import Literal, List, get_args, Union, Any
from typing import Any, List, Literal, Union, get_args

from typing_extensions import TypeAlias

LANGFUSE_TRACER_NAME = "langfuse-sdk"

LANGFUSE_CORRELATION_CONTEXT_KEY = "langfuse.ctx.correlation"


"""Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat"""
ObservationTypeGenerationLike: TypeAlias = Literal[
Expand Down
11 changes: 8 additions & 3 deletions langfuse/_client/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
and scoring integration specific to Langfuse's observability platform.
"""

import warnings
from datetime import datetime
from time import time_ns
import warnings
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -44,10 +44,10 @@
create_trace_attributes,
)
from langfuse._client.constants import (
ObservationTypeLiteral,
ObservationTypeGenerationLike,
ObservationTypeSpanLike,
ObservationTypeLiteral,
ObservationTypeLiteralNoEvent,
ObservationTypeSpanLike,
get_observation_types_list,
)
from langfuse.logger import langfuse_logger
Expand Down Expand Up @@ -233,6 +233,11 @@ def update_trace(
tags: List of tags to categorize the trace
public: Whether the trace should be publicly accessible
"""
warnings.warn(
"update_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ",
DeprecationWarning,
stacklevel=2,
)
if not self._otel_span.is_recording():
return self

Expand Down
61 changes: 58 additions & 3 deletions langfuse/_client/span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,29 @@
import os
from typing import Dict, List, Optional

from opentelemetry import baggage
from opentelemetry import context as context_api
from opentelemetry.context import Context
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import format_span_id

from langfuse._client.constants import LANGFUSE_TRACER_NAME
from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.constants import (
LANGFUSE_CORRELATION_CONTEXT_KEY,
LANGFUSE_TRACER_NAME,
)
from langfuse._client.environment_variables import (
LANGFUSE_FLUSH_AT,
LANGFUSE_FLUSH_INTERVAL,
LANGFUSE_OTEL_TRACES_EXPORT_PATH,
)
from langfuse._client.utils import span_formatter
from langfuse._client.utils import (
correlation_context_to_attribute_map,
get_attribute_key_from_correlation_context,
span_formatter,
)
from langfuse.logger import langfuse_logger
from langfuse.version import __version__ as langfuse_version

Expand Down Expand Up @@ -114,6 +126,49 @@ def __init__(
else None,
)

def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
# Propagate correlation context to span
current_context = parent_context or context_api.get_current()
propagated_attributes = {}

# Propagate correlation context in baggage
baggage_entries = baggage.get_all(context=current_context)

for key, value in baggage_entries.items():
if (
key.startswith(LangfuseOtelSpanAttributes.TRACE_METADATA)
or key in correlation_context_to_attribute_map.values()
):
propagated_attributes[key] = value

# Propagate correlation context in OTEL context
correlation_context = (
context_api.get_value(LANGFUSE_CORRELATION_CONTEXT_KEY, current_context)
or {}
)

if not isinstance(correlation_context, dict):
langfuse_logger.error(
f"Correlation context is not of type dict. Got type '{type(correlation_context)}'."
)

return super().on_start(span, parent_context)

for key, value in correlation_context.items():
attribute_key = get_attribute_key_from_correlation_context(key)
propagated_attributes[attribute_key] = value

# Write attributes on span
if propagated_attributes:
for key, value in propagated_attributes.items():
span.set_attribute(key, str(value))

langfuse_logger.debug(
f"Propagated {len(propagated_attributes)} attributes to span '{format_span_id(span.context.span_id)}': {propagated_attributes}"
)

return super().on_start(span, parent_context)

def on_end(self, span: ReadableSpan) -> None:
# Only export spans that belong to the scoped project
# This is important to not send spans to wrong project in multi-project setups
Expand Down
15 changes: 15 additions & 0 deletions langfuse/_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from opentelemetry.sdk import util
from opentelemetry.sdk.trace import ReadableSpan

from langfuse._client.attributes import LangfuseOtelSpanAttributes


def span_formatter(span: ReadableSpan) -> str:
parent_id = (
Expand Down Expand Up @@ -125,3 +127,16 @@ async def my_async_function():
else:
# Loop exists but not running, safe to use asyncio.run()
return asyncio.run(coro)


correlation_context_to_attribute_map = {
"session_id": LangfuseOtelSpanAttributes.TRACE_SESSION_ID,
"user_id": LangfuseOtelSpanAttributes.TRACE_USER_ID,
}


def get_attribute_key_from_correlation_context(correlation_context_key: str) -> str:
return (
correlation_context_to_attribute_map.get(correlation_context_key)
or f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{correlation_context_key}"
)
Loading
Loading