From ab92d40018d34484cf85ceadce61235764593f38 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 7 Oct 2025 17:30:26 +0200 Subject: [PATCH 1/2] feat(correlation-context): update implementation --- langfuse/_client/client.py | 20 +++++++++++--------- langfuse/_client/span.py | 6 +++--- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index fa4196b34..90012a843 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -16,6 +16,7 @@ Any, Callable, Dict, + Generator, List, Literal, Optional, @@ -23,16 +24,19 @@ Union, cast, overload, - Generator, ) import backoff import httpx from opentelemetry import ( baggage as otel_baggage_api, - trace as otel_trace_api, +) +from opentelemetry import ( context as otel_context_api, ) +from opentelemetry import ( + trace as otel_trace_api, +) from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.id_generator import RandomIdGenerator from opentelemetry.util._decorator import ( @@ -43,14 +47,14 @@ from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.constants import ( + LANGFUSE_CTX_METADATA, + LANGFUSE_CTX_SESSION_ID, + LANGFUSE_CTX_USER_ID, ObservationTypeGenerationLike, ObservationTypeLiteral, ObservationTypeLiteralNoEvent, ObservationTypeSpanLike, get_observation_types_list, - LANGFUSE_CTX_USER_ID, - LANGFUSE_CTX_SESSION_ID, - LANGFUSE_CTX_METADATA, ) from langfuse._client.datasets import DatasetClient, DatasetItemClient from langfuse._client.environment_variables import ( @@ -219,10 +223,8 @@ def __init__( additional_headers: Optional[Dict[str, str]] = None, tracer_provider: Optional[TracerProvider] = None, ): - self._host = ( - host - if host is not None - else os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") + self._host = host or cast( + str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") ) self._environment = environment or cast( str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT) diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index e02707dc2..6904c28c3 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -13,9 +13,9 @@ and scoring integration specific to Langfuse's observability platform. """ +import warnings from datetime import datetime from time import time_ns -import warnings from typing import ( TYPE_CHECKING, Any, @@ -44,10 +44,10 @@ create_trace_attributes, ) from langfuse._client.constants import ( - ObservationTypeLiteral, ObservationTypeGenerationLike, - ObservationTypeSpanLike, + ObservationTypeLiteral, ObservationTypeLiteralNoEvent, + ObservationTypeSpanLike, get_observation_types_list, ) from langfuse.logger import langfuse_logger From cf3ec39bf8396bdd04472400a58143cb21123f83 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 7 Oct 2025 19:22:16 +0200 Subject: [PATCH 2/2] push --- langfuse/_client/attributes.py | 1 - langfuse/_client/client.py | 100 ++++++++++--------------- langfuse/_client/constants.py | 8 +- langfuse/_client/span.py | 2 +- langfuse/_client/span_processor.py | 114 ++++++++++------------------- langfuse/_client/utils.py | 15 ++++ tests/test_core_sdk.py | 24 +++--- 7 files changed, 109 insertions(+), 155 deletions(-) diff --git a/langfuse/_client/attributes.py b/langfuse/_client/attributes.py index 5ae81000c..75c5645ea 100644 --- a/langfuse/_client/attributes.py +++ b/langfuse/_client/attributes.py @@ -18,7 +18,6 @@ ObservationTypeGenerationLike, ObservationTypeSpanLike, ) - from langfuse._utils.serializer import EventSerializer from langfuse.model import PromptClient from langfuse.types import MapValue, SpanLevel diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 90012a843..a98cbda2c 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -47,9 +47,7 @@ from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.constants import ( - LANGFUSE_CTX_METADATA, - LANGFUSE_CTX_SESSION_ID, - LANGFUSE_CTX_USER_ID, + LANGFUSE_CORRELATION_CONTEXT_KEY, ObservationTypeGenerationLike, ObservationTypeLiteral, ObservationTypeLiteralNoEvent, @@ -80,7 +78,10 @@ LangfuseSpan, LangfuseTool, ) -from langfuse._client.utils import run_async_safely +from langfuse._client.utils import ( + get_attribute_key_from_correlation_context, + run_async_safely, +) from langfuse._utils import _get_timestamp from langfuse._utils.parse_error import handle_fern_exception from langfuse._utils.prompt_cache import PromptCache @@ -363,19 +364,18 @@ def start_span( ) @_agnosticcontextmanager - def with_attributes( + def correlation_context( self, - session_id: Optional[str] = None, - user_id: Optional[str] = None, - metadata: Optional[dict[str, str]] = None, + correlation_context: Dict[str, str], + *, as_baggage: bool = False, ) -> Generator[None, None, None]: - """Creates a context manager that propagates the given attributes to all spans created within the context. + """Create a context manager that propagates the given correlation_context to all spans within the context manager's scope. Args: - session_id (str): Session identifier. - user_id (str): User identifier. - metadata (dict): Additional metadata to associate with all spans in the context. Values must be strings and are truncated to 200 characters. + correlation_context (Dict[str, str]): Dictionary containing key-value pairs to be propagated + to all spans within the context manager's scope. Common keys include user_id, session_id, + and custom metadata. All values must be strings below 200 characters. as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage for cross-service propagation. If False, stores only in local context for current-service propagation. Defaults to False. @@ -388,16 +388,21 @@ def with_attributes( outbound requests made within this context. Only use this for non-sensitive identifiers that are safe to transmit across service boundaries. - Example: + Examples: ```python - # Local context only (default) - with langfuse.with_attributes(session_id="session_123"): + # Local context only (default) - pass context as dictionary + with langfuse.correlation_context({"session_id": "session_123"}): with langfuse.start_as_current_span(name="process-request") as span: # This span and all its children will have session_id="session_123" child_span = langfuse.start_span(name="child-operation") + # Multiple values in context dictionary + with langfuse.correlation_context({"user_id": "user_456", "experiment": "A"}): + # All spans will have both user_id and experiment attributes + span = langfuse.start_span(name="experiment-operation") + # Cross-service propagation (use with caution) - with langfuse.with_attributes(session_id="session_123", as_baggage=True): + with langfuse.correlation_context({"session_id": "session_123"}, as_baggage=True): # session_id will be propagated to external service calls response = requests.get("https://api.example.com/data") ``` @@ -405,62 +410,33 @@ def with_attributes( current_context = otel_context_api.get_current() current_span = otel_trace_api.get_current_span() - # Process session_id - if session_id is not None: - current_context = otel_context_api.set_value( - LANGFUSE_CTX_SESSION_ID, session_id, current_context - ) - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("session.id", session_id) - if as_baggage: - current_context = otel_baggage_api.set_baggage( - "session.id", session_id, current_context - ) + current_context = otel_context_api.set_value( + LANGFUSE_CORRELATION_CONTEXT_KEY, correlation_context, current_context + ) - # Process user_id - if user_id is not None: - current_context = otel_context_api.set_value( - LANGFUSE_CTX_USER_ID, user_id, current_context - ) - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("user.id", user_id) - if as_baggage: - current_context = otel_baggage_api.set_baggage( - "user.id", user_id, current_context + for key, value in correlation_context.items(): + if len(value) > 200: + langfuse_logger.warning( + f"Correlation context key '{key}' is over 200 characters ({len(value)} chars). Dropping value." ) + continue - # Process metadata - if metadata is not None: - # Truncate values with size > 200 to 200 characters and emit warning including the ky - for k, v in metadata.items(): - if not isinstance(v, str): - # Ignore unreachable mypy warning as this runtime guard should make sense either way - warnings.warn( # type: ignore[unreachable] - f"Metadata values must be strings, got {type(v)} for key '{k}'" - ) - del metadata[k] - if len(v) > 200: - warnings.warn( - f"Metadata value for key '{k}' exceeds 200 characters and will be truncated." - ) - metadata[k] = v[:200] + attribute_key = get_attribute_key_from_correlation_context(key) - current_context = otel_context_api.set_value( - LANGFUSE_CTX_METADATA, metadata, current_context - ) if current_span is not None and current_span.is_recording(): - for k, v in metadata.items(): - current_span.set_attribute(f"langfuse.metadata.{k}", v) + current_span.set_attribute(attribute_key, value) + if as_baggage: - for k, v in metadata.items(): - current_context = otel_baggage_api.set_baggage( - f"langfuse.metadata.{k}", str(v), current_context - ) + current_context = otel_baggage_api.set_baggage( + key, value, current_context + ) # Activate context, execute, and detach context token = otel_context_api.attach(current_context) + try: yield + finally: otel_context_api.detach(token) @@ -1782,7 +1758,7 @@ def update_current_trace( ``` """ warnings.warn( - "update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ", + "update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ", DeprecationWarning, stacklevel=2, ) diff --git a/langfuse/_client/constants.py b/langfuse/_client/constants.py index 2a1297de6..b385f0a8f 100644 --- a/langfuse/_client/constants.py +++ b/langfuse/_client/constants.py @@ -3,15 +3,13 @@ This module defines constants used throughout the Langfuse OpenTelemetry integration. """ -from typing import Literal, List, get_args, Union, Any +from typing import Any, List, Literal, Union, get_args + from typing_extensions import TypeAlias LANGFUSE_TRACER_NAME = "langfuse-sdk" -# Context key constants for Langfuse context propagation -LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" -LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" -LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" +LANGFUSE_CORRELATION_CONTEXT_KEY = "langfuse.ctx.correlation" """Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat""" diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 6904c28c3..fb9a2849d 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -234,7 +234,7 @@ def update_trace( public: Whether the trace should be publicly accessible """ warnings.warn( - "update_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ", + "update_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ", DeprecationWarning, stacklevel=2, ) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index 2433c0f21..c62c5ad83 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -12,28 +12,32 @@ """ import base64 -import json import os from typing import Dict, List, Optional -from opentelemetry import baggage, context as context_api +from opentelemetry import baggage +from opentelemetry import context as context_api from opentelemetry.context import Context from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import format_span_id +from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.constants import ( + LANGFUSE_CORRELATION_CONTEXT_KEY, LANGFUSE_TRACER_NAME, - LANGFUSE_CTX_USER_ID, - LANGFUSE_CTX_SESSION_ID, - LANGFUSE_CTX_METADATA, ) from langfuse._client.environment_variables import ( LANGFUSE_FLUSH_AT, LANGFUSE_FLUSH_INTERVAL, LANGFUSE_OTEL_TRACES_EXPORT_PATH, ) -from langfuse._client.utils import span_formatter +from langfuse._client.utils import ( + correlation_context_to_attribute_map, + get_attribute_key_from_correlation_context, + span_formatter, +) from langfuse.logger import langfuse_logger from langfuse.version import __version__ as langfuse_version @@ -123,86 +127,46 @@ def __init__( ) def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: - """Handle span start event and propagate context and baggage to span attributes. - - This method is called when a span starts and applies context propagation: - 1. Propagates all baggage keys as span attributes - 2. Propagates langfuse.ctx.* context variables as span attributes - 3. Distributes langfuse.ctx.metadata keys as individual langfuse.metadata.* attributes - - Args: - span: The span that is starting - parent_context: The context when the span was created (optional) - """ - # Get the current context (use parent_context if available, otherwise current) + # Propagate correlation context to span current_context = parent_context or context_api.get_current() - - # Dictionary to collect span attributes that were propagated propagated_attributes = {} - # 1. Propagate all baggage keys as span attributes + # Propagate correlation context in baggage baggage_entries = baggage.get_all(context=current_context) + for key, value in baggage_entries.items(): - # Only propagate user.id, session.id and langfuse.metadata.* as those are set by us on the baggage - if key.startswith("langfuse.metadata.") or key in [ - "user.id", - "session.id", - ]: + if ( + key.startswith(LangfuseOtelSpanAttributes.TRACE_METADATA) + or key in correlation_context_to_attribute_map.values() + ): propagated_attributes[key] = value - langfuse_logger.debug( - f"Propagated baggage key '{key}' = '{value}' to span '{span.name}'" - ) - - # 2. Propagate langfuse.ctx.* context variables - langfuse_ctx_keys = [LANGFUSE_CTX_USER_ID, LANGFUSE_CTX_SESSION_ID] - for ctx_key in langfuse_ctx_keys: - try: - value = context_api.get_value(ctx_key, context=current_context) - if value is not None: - # Convert context key to span attribute name (remove langfuse.ctx. prefix) - attr_key = ctx_key.replace("langfuse.ctx.", "") - propagated_attributes[attr_key] = value - langfuse_logger.debug( - f"Propagated context key '{ctx_key}' = '{value}' to span '{span.name}'" - ) - except Exception as e: - langfuse_logger.debug(f"Could not read context key '{ctx_key}': {e}") - - # 3. Handle langfuse.ctx.metadata - distribute keys as individual attributes - try: - # Get metadata dict from context - metadata_dict = context_api.get_value( - LANGFUSE_CTX_METADATA, context=current_context + + # Propagate correlation context in OTEL context + correlation_context = ( + context_api.get_value(LANGFUSE_CORRELATION_CONTEXT_KEY, current_context) + or {} + ) + + if not isinstance(correlation_context, dict): + langfuse_logger.error( + f"Correlation context is not of type dict. Got type '{type(correlation_context)}'." ) - if metadata_dict is not None and isinstance(metadata_dict, dict): - # Set each metadata key as a separate span attribute with langfuse.metadata. prefix - for key, value in metadata_dict.items(): - attr_key = f"langfuse.metadata.{key}" - - # Convert value to appropriate type for span attribute (naive or json stringify) - attr_value = ( - value - if isinstance(value, (str, int, float, bool)) - else json.dumps(value) - ) - - propagated_attributes[attr_key] = attr_value - langfuse_logger.debug( - f"Propagated metadata key '{key}' = '{attr_value}' to span '{span.name}'" - ) - except Exception as e: - langfuse_logger.debug(f"Could not read metadata from context: {e}") - - # Log summary of propagated attributes + + return super().on_start(span, parent_context) + + for key, value in correlation_context.items(): + attribute_key = get_attribute_key_from_correlation_context(key) + propagated_attributes[attribute_key] = value + + # Write attributes on span if propagated_attributes: + for key, value in propagated_attributes.items(): + span.set_attribute(key, str(value)) + langfuse_logger.debug( - f"Propagated {len(propagated_attributes)} attributes to span '{span.name}': {list(propagated_attributes.keys())}" + f"Propagated {len(propagated_attributes)} attributes to span '{format_span_id(span.context.span_id)}': {propagated_attributes}" ) - # Set all propagated attributes on the span - for key, value in propagated_attributes.items(): - span.set_attribute(key, value) # type: ignore[arg-type] - return super().on_start(span, parent_context) def on_end(self, span: ReadableSpan) -> None: diff --git a/langfuse/_client/utils.py b/langfuse/_client/utils.py index d34857ebd..340daddb6 100644 --- a/langfuse/_client/utils.py +++ b/langfuse/_client/utils.py @@ -13,6 +13,8 @@ from opentelemetry.sdk import util from opentelemetry.sdk.trace import ReadableSpan +from langfuse._client.attributes import LangfuseOtelSpanAttributes + def span_formatter(span: ReadableSpan) -> str: parent_id = ( @@ -125,3 +127,16 @@ async def my_async_function(): else: # Loop exists but not running, safe to use asyncio.run() return asyncio.run(coro) + + +correlation_context_to_attribute_map = { + "session_id": LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "user_id": LangfuseOtelSpanAttributes.TRACE_USER_ID, +} + + +def get_attribute_key_from_correlation_context(correlation_context_key: str) -> str: + return ( + correlation_context_to_attribute_map.get(correlation_context_key) + or f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{correlation_context_key}" + ) diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index d5af127c6..829d9d971 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -2071,7 +2071,7 @@ def test_context_manager_user_propagation(): user_id = "test_user_123" with langfuse.start_as_current_span(name="parent-span") as parent_span: - with langfuse.with_attributes(user_id=user_id): + with langfuse.correlation_context({"user_id": user_id}): trace_id = parent_span.trace_id # Create child spans that should inherit user_id @@ -2107,7 +2107,7 @@ def test_context_manager_session_propagation(): session_id = "test_session_456" with langfuse.start_as_current_span(name="parent-span") as parent_span: - with langfuse.with_attributes(session_id=session_id): + with langfuse.correlation_context({"session_id": session_id}): trace_id = parent_span.trace_id # Create child spans that should inherit session_id @@ -2142,8 +2142,8 @@ def test_context_manager_metadata_propagation(): langfuse = Langfuse() with langfuse.start_as_current_span(name="parent-span") as parent_span: - with langfuse.with_attributes( - metadata={ + with langfuse.correlation_context( + { "experiment": "A/B", "version": "1.2.3", "feature_flag": "enabled", @@ -2189,10 +2189,10 @@ def test_context_manager_nested_contexts(): langfuse = Langfuse() with langfuse.start_as_current_span(name="outer-span") as outer_span: - with langfuse.with_attributes(user_id="user_1", session_id="session_1"): - with langfuse.with_attributes( - metadata={"env": "prod", "region": "us-east"} - ): + with langfuse.correlation_context( + {"user_id": "user_1", "session_id": "session_1"} + ): + with langfuse.correlation_context({"env": "prod", "region": "us-east"}): outer_trace_id = outer_span.trace_id # Create span in outer context @@ -2234,9 +2234,11 @@ def test_context_manager_baggage_propagation(): # Test with baggage enabled (careful with sensitive data) with langfuse.start_as_current_span(name="service-span") as span: - with langfuse.with_attributes(session_id="public_session_789", as_baggage=True): - with langfuse.with_attributes( - as_baggage=True, metadata={"service": "api", "version": "v1.0"} + with langfuse.correlation_context( + {"session_id": "public_session_789"}, as_baggage=True + ): + with langfuse.correlation_context( + {"service": "api", "version": "v1.0"}, as_baggage=True ): trace_id = span.trace_id