Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion langfuse/_client/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
ObservationTypeGenerationLike,
ObservationTypeSpanLike,
)

from langfuse._utils.serializer import EventSerializer
from langfuse.model import PromptClient
from langfuse.types import MapValue, SpanLevel
Expand Down
114 changes: 46 additions & 68 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,27 @@
Any,
Callable,
Dict,
Generator,
List,
Literal,
Optional,
Type,
Union,
cast,
overload,
Generator,
)

import backoff
import httpx
from opentelemetry import (
baggage as otel_baggage_api,
trace as otel_trace_api,
)
from opentelemetry import (
context as otel_context_api,
)
from opentelemetry import (
trace as otel_trace_api,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
from opentelemetry.util._decorator import (
Expand All @@ -43,14 +47,12 @@

from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.constants import (
LANGFUSE_CORRELATION_CONTEXT_KEY,
ObservationTypeGenerationLike,
ObservationTypeLiteral,
ObservationTypeLiteralNoEvent,
ObservationTypeSpanLike,
get_observation_types_list,
LANGFUSE_CTX_USER_ID,
LANGFUSE_CTX_SESSION_ID,
LANGFUSE_CTX_METADATA,
)
from langfuse._client.datasets import DatasetClient, DatasetItemClient
from langfuse._client.environment_variables import (
Expand All @@ -76,7 +78,10 @@
LangfuseSpan,
LangfuseTool,
)
from langfuse._client.utils import run_async_safely
from langfuse._client.utils import (
get_attribute_key_from_correlation_context,
run_async_safely,
)
from langfuse._utils import _get_timestamp
from langfuse._utils.parse_error import handle_fern_exception
from langfuse._utils.prompt_cache import PromptCache
Expand Down Expand Up @@ -219,10 +224,8 @@ def __init__(
additional_headers: Optional[Dict[str, str]] = None,
tracer_provider: Optional[TracerProvider] = None,
):
self._host = (
host
if host is not None
else os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
self._host = host or cast(
str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
)
self._environment = environment or cast(
str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
Expand Down Expand Up @@ -361,19 +364,18 @@ def start_span(
)

@_agnosticcontextmanager
def with_attributes(
def correlation_context(
self,
session_id: Optional[str] = None,
user_id: Optional[str] = None,
metadata: Optional[dict[str, str]] = None,
correlation_context: Dict[str, str],
*,
as_baggage: bool = False,
) -> Generator[None, None, None]:
"""Creates a context manager that propagates the given attributes to all spans created within the context.
"""Create a context manager that propagates the given correlation_context to all spans within the context manager's scope.

Args:
session_id (str): Session identifier.
user_id (str): User identifier.
metadata (dict): Additional metadata to associate with all spans in the context. Values must be strings and are truncated to 200 characters.
correlation_context (Dict[str, str]): Dictionary containing key-value pairs to be propagated
to all spans within the context manager's scope. Common keys include user_id, session_id,
and custom metadata. All values must be strings below 200 characters.
as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage
for cross-service propagation. If False, stores only in local context for
current-service propagation. Defaults to False.
Expand All @@ -386,79 +388,55 @@ def with_attributes(
outbound requests made within this context. Only use this for non-sensitive
identifiers that are safe to transmit across service boundaries.

Example:
Examples:
```python
# Local context only (default)
with langfuse.with_attributes(session_id="session_123"):
# Local context only (default) - pass context as dictionary
with langfuse.correlation_context({"session_id": "session_123"}):
with langfuse.start_as_current_span(name="process-request") as span:
# This span and all its children will have session_id="session_123"
child_span = langfuse.start_span(name="child-operation")

# Multiple values in context dictionary
with langfuse.correlation_context({"user_id": "user_456", "experiment": "A"}):
# All spans will have both user_id and experiment attributes
span = langfuse.start_span(name="experiment-operation")

# Cross-service propagation (use with caution)
with langfuse.with_attributes(session_id="session_123", as_baggage=True):
with langfuse.correlation_context({"session_id": "session_123"}, as_baggage=True):
# session_id will be propagated to external service calls
response = requests.get("https://api.example.com/data")
```
"""
current_context = otel_context_api.get_current()
current_span = otel_trace_api.get_current_span()

# Process session_id
if session_id is not None:
current_context = otel_context_api.set_value(
LANGFUSE_CTX_SESSION_ID, session_id, current_context
)
if current_span is not None and current_span.is_recording():
current_span.set_attribute("session.id", session_id)
if as_baggage:
current_context = otel_baggage_api.set_baggage(
"session.id", session_id, current_context
)
current_context = otel_context_api.set_value(
LANGFUSE_CORRELATION_CONTEXT_KEY, correlation_context, current_context
)

# Process user_id
if user_id is not None:
current_context = otel_context_api.set_value(
LANGFUSE_CTX_USER_ID, user_id, current_context
)
if current_span is not None and current_span.is_recording():
current_span.set_attribute("user.id", user_id)
if as_baggage:
current_context = otel_baggage_api.set_baggage(
"user.id", user_id, current_context
for key, value in correlation_context.items():
if len(value) > 200:
langfuse_logger.warning(
f"Correlation context key '{key}' is over 200 characters ({len(value)} chars). Dropping value."
)
continue

# Process metadata
if metadata is not None:
# Truncate values with size > 200 to 200 characters and emit warning including the ky
for k, v in metadata.items():
if not isinstance(v, str):
# Ignore unreachable mypy warning as this runtime guard should make sense either way
warnings.warn( # type: ignore[unreachable]
f"Metadata values must be strings, got {type(v)} for key '{k}'"
)
del metadata[k]
if len(v) > 200:
warnings.warn(
f"Metadata value for key '{k}' exceeds 200 characters and will be truncated."
)
metadata[k] = v[:200]
attribute_key = get_attribute_key_from_correlation_context(key)

current_context = otel_context_api.set_value(
LANGFUSE_CTX_METADATA, metadata, current_context
)
if current_span is not None and current_span.is_recording():
for k, v in metadata.items():
current_span.set_attribute(f"langfuse.metadata.{k}", v)
current_span.set_attribute(attribute_key, value)

if as_baggage:
for k, v in metadata.items():
current_context = otel_baggage_api.set_baggage(
f"langfuse.metadata.{k}", str(v), current_context
)
current_context = otel_baggage_api.set_baggage(
key, value, current_context
)

# Activate context, execute, and detach context
token = otel_context_api.attach(current_context)

try:
yield

finally:
otel_context_api.detach(token)

Expand Down Expand Up @@ -1780,7 +1758,7 @@ def update_current_trace(
```
"""
warnings.warn(
"update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ",
"update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ",
DeprecationWarning,
stacklevel=2,
)
Expand Down
8 changes: 3 additions & 5 deletions langfuse/_client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
This module defines constants used throughout the Langfuse OpenTelemetry integration.
"""

from typing import Literal, List, get_args, Union, Any
from typing import Any, List, Literal, Union, get_args

from typing_extensions import TypeAlias

LANGFUSE_TRACER_NAME = "langfuse-sdk"

# Context key constants for Langfuse context propagation
LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id"
LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id"
LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata"
LANGFUSE_CORRELATION_CONTEXT_KEY = "langfuse.ctx.correlation"


"""Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat"""
Expand Down
8 changes: 4 additions & 4 deletions langfuse/_client/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
and scoring integration specific to Langfuse's observability platform.
"""

import warnings
from datetime import datetime
from time import time_ns
import warnings
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -44,10 +44,10 @@
create_trace_attributes,
)
from langfuse._client.constants import (
ObservationTypeLiteral,
ObservationTypeGenerationLike,
ObservationTypeSpanLike,
ObservationTypeLiteral,
ObservationTypeLiteralNoEvent,
ObservationTypeSpanLike,
get_observation_types_list,
)
from langfuse.logger import langfuse_logger
Expand Down Expand Up @@ -234,7 +234,7 @@ def update_trace(
public: Whether the trace should be publicly accessible
"""
warnings.warn(
"update_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ",
"update_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ",
DeprecationWarning,
stacklevel=2,
)
Expand Down
Loading
Loading