From 1e7020123f638ceee1752a9fca24d90b59336bc9 Mon Sep 17 00:00:00 2001 From: steffen911 Date: Mon, 29 Sep 2025 19:57:30 +0200 Subject: [PATCH 01/15] feat: propagate trace attributes onto all child spans on update --- langfuse/_client/client.py | 203 ++++++++++++++++++++- langfuse/_client/span.py | 204 ++++++++++++++++++++- langfuse/_client/span_processor.py | 114 +++++++++++- tests/test_core_sdk.py | 275 +++++++++++++++++++++++++---- 4 files changed, 755 insertions(+), 41 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index ebc65e988..bdc507971 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -4,6 +4,7 @@ """ import asyncio +import json import logging import os import re @@ -27,8 +28,11 @@ import backoff import httpx -from opentelemetry import trace -from opentelemetry import trace as otel_trace_api +from opentelemetry import ( + trace as otel_trace_api, + baggage as otel_baggage_api, + context as otel_context_api, +) from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.id_generator import RandomIdGenerator from opentelemetry.util._decorator import ( @@ -111,6 +115,11 @@ ) from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext +# Context key constants for Langfuse context propagation +LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" +LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" +LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" + class Langfuse: """Main client for Langfuse tracing and platform features. @@ -1667,6 +1676,11 @@ def update_current_trace( span.update(output=response) ``` """ + warnings.warn( + "update_current_trace is deprecated and will be removed in a future version. ", + DeprecationWarning, + stacklevel=2, + ) if not self._tracing_enabled: langfuse_logger.debug( "Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode." @@ -1811,7 +1825,7 @@ def _create_remote_parent_span( is_remote=False, ) - return trace.NonRecordingSpan(span_context) + return otel_trace_api.NonRecordingSpan(span_context) def _is_valid_trace_id(self, trace_id: str) -> bool: pattern = r"^[0-9a-f]{32}$" @@ -3450,3 +3464,186 @@ def clear_prompt_cache(self) -> None: """ if self._resources is not None: self._resources.prompt_cache.clear() + + @_agnosticcontextmanager + def session(self, id: str, *, as_baggage: bool = False) -> _AgnosticContextManager: + """Create a session context manager that propagates session_id to all child spans. + + Args: + id (str): The session identifier to propagate to child spans. + as_baggage (bool, optional): If True, stores the session_id in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + + Returns: + Context manager that sets session_id on all spans created within its scope. + + Warning: + When as_baggage=True, the session_id will be included in HTTP headers of any + outbound requests made within this context. Only use this for non-sensitive + identifiers that are safe to transmit across service boundaries. + + Example: + ```python + # Local context only (default) + with langfuse.session(id="session_123"): + with langfuse.start_as_current_span(name="process-request") as span: + # This span and all its children will have session_id="session_123" + child_span = langfuse.start_span(name="child-operation") + + # Cross-service propagation (use with caution) + with langfuse.session(id="session_123", as_baggage=True): + # session_id will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + # Set context variable + new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id) + token = otel_context_api.attach(new_context) + + # Set baggage if requested + baggage_token = None + if as_baggage: + new_baggage = otel_baggage_api.set_baggage("session.id", id) + baggage_token = otel_context_api.attach(new_baggage) + + try: + yield + finally: + # Always detach context token + otel_context_api.detach(token) + + # Detach baggage token if it was set + if baggage_token is not None: + otel_context_api.detach(baggage_token) + + @_agnosticcontextmanager + def user(self, id: str, *, as_baggage: bool = False) -> _AgnosticContextManager: + """Create a user context manager that propagates user_id to all child spans. + + Args: + id (str): The user identifier to propagate to child spans. + as_baggage (bool, optional): If True, stores the user_id in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + + Returns: + Context manager that sets user_id on all spans created within its scope. + + Warning: + When as_baggage=True, the user_id will be included in HTTP headers of any + outbound requests made within this context. This may leak sensitive user + information to external services. Use with extreme caution. + + Example: + ```python + # Local context only (default, recommended for user IDs) + with langfuse.user(id="user_456"): + with langfuse.start_as_current_span(name="user-action") as span: + # This span and all its children will have user_id="user_456" + pass + + # Cross-service propagation (NOT recommended for sensitive user IDs) + with langfuse.user(id="public_user_456", as_baggage=True): + # user_id will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + # Set context variable + new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id) + token = otel_context_api.attach(new_context) + + # Set baggage if requested + baggage_token = None + if as_baggage: + new_baggage = otel_baggage_api.set_baggage("user.id", id) + baggage_token = otel_context_api.attach(new_baggage) + + try: + yield + finally: + # Always detach context token + otel_context_api.detach(token) + + # Detach baggage token if it was set + if baggage_token is not None: + otel_context_api.detach(baggage_token) + + @_agnosticcontextmanager + def metadata( + self, *, as_baggage: bool = False, **kwargs + ) -> _AgnosticContextManager: + """Create a metadata context manager that propagates metadata to all child spans. + + Args: + as_baggage (bool, optional): If True, stores the metadata in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + **kwargs: Metadata key-value pairs. Values should not exceed 200 characters. + + Returns: + Context manager that sets metadata on all spans created within its scope. + + Warning: + When as_baggage=True, all metadata key-value pairs will be included in HTTP + headers of any outbound requests made within this context. Ensure no sensitive + information is included in the metadata when using cross-service propagation. + + Example: + ```python + # Local context only (default) + with langfuse.metadata(experiment="A/B", version="1.2.3"): + with langfuse.start_as_current_span(name="experiment-run") as span: + # This span and all its children will have the metadata + pass + + # Cross-service propagation (use with caution) + with langfuse.metadata(as_baggage=True, experiment="A/B", service="api"): + # metadata will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + if not kwargs: + # No metadata to set, just yield + yield + return + + # Convert metadata dict to JSON string for context storage + metadata_json = json.dumps(kwargs) + + # Set context variable + new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, metadata_json) + token = otel_context_api.attach(new_context) + + # Set baggage if requested + baggage_tokens = [] + if as_baggage: + current_baggage = otel_baggage_api.get_all() + new_baggage = current_baggage + + # Add each metadata key-value pair to baggage + for key, value in kwargs.items(): + # Convert value to string and truncate if needed for baggage + str_value = str(value) + if len(str_value) > 200: + str_value = str_value[:200] + + baggage_key = f"metadata.{key}" + new_baggage = otel_baggage_api.set_baggage( + baggage_key, str_value, new_baggage + ) + + # Attach the new baggage context + if new_baggage != current_baggage: + baggage_token = otel_context_api.attach(new_baggage) + baggage_tokens.append(baggage_token) + + try: + yield + finally: + # Always detach context token + otel_context_api.detach(token) + + # Detach all baggage tokens if they were set + for baggage_token in baggage_tokens: + otel_context_api.detach(baggage_token) diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 9fa9c7489..0ec379e67 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -15,6 +15,7 @@ from datetime import datetime from time import time_ns +import json import warnings from typing import ( TYPE_CHECKING, @@ -29,8 +30,15 @@ overload, ) -from opentelemetry import trace as otel_trace_api -from opentelemetry.util._decorator import _AgnosticContextManager +from opentelemetry import ( + baggage as otel_baggage_api, + context as otel_context_api, + trace as otel_trace_api, +) +from opentelemetry.util._decorator import ( + _AgnosticContextManager, + _agnosticcontextmanager, +) from langfuse.model import PromptClient @@ -58,6 +66,11 @@ # Populated after class definitions _OBSERVATION_CLASS_MAP: Dict[str, Type["LangfuseObservationWrapper"]] = {} +# Context key constants for Langfuse context propagation +LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" +LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" +LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" + class LangfuseObservationWrapper: """Abstract base class for all Langfuse span types. @@ -233,6 +246,11 @@ def update_trace( tags: List of tags to categorize the trace public: Whether the trace should be publicly accessible """ + warnings.warn( + "update_trace is deprecated and will be removed in a future version. ", + DeprecationWarning, + stacklevel=2, + ) if not self._otel_span.is_recording(): return self @@ -1113,6 +1131,188 @@ def start_as_current_observation( # type: ignore[misc] prompt=prompt, ) + @_agnosticcontextmanager + def session( + self, id: str, *, as_baggage: bool = False + ) -> "_AgnosticContextManager": + """Create a session context manager that propagates session_id to all child spans. + + Args: + id (str): The session identifier to propagate to child spans. + as_baggage (bool, optional): If True, stores the session_id in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + + Returns: + Context manager that sets session_id on all spans created within its scope. + + Warning: + When as_baggage=True, the session_id will be included in HTTP headers of any + outbound requests made within this context. Only use this for non-sensitive + identifiers that are safe to transmit across service boundaries. + + Example: + ```python + # Local context only (default) + with span.session(id="session_123"): + child_span = span.start_span(name="child-operation") + # child_span will have session_id="session_123" + + # Cross-service propagation (use with caution) + with span.session(id="session_123", as_baggage=True): + # session_id will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + # Set context variable + new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id) + token = otel_context_api.attach(new_context) + + # Set baggage if requested + baggage_token = None + if as_baggage: + new_baggage = otel_baggage_api.set_baggage("session.id", id) + baggage_token = otel_context_api.attach(new_baggage) + + try: + yield + finally: + # Always detach context token + otel_context_api.detach(token) + + # Detach baggage token if it was set + if baggage_token is not None: + otel_context_api.detach(baggage_token) + + @_agnosticcontextmanager + def user(self, id: str, *, as_baggage: bool = False) -> "_AgnosticContextManager": + """Create a user context manager that propagates user_id to all child spans. + + Args: + id (str): The user identifier to propagate to child spans. + as_baggage (bool, optional): If True, stores the user_id in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + + Returns: + Context manager that sets user_id on all spans created within its scope. + + Warning: + When as_baggage=True, the user_id will be included in HTTP headers of any + outbound requests made within this context. This may leak sensitive user + information to external services. Use with extreme caution. + + Example: + ```python + # Local context only (default, recommended for user IDs) + with span.user(id="user_456"): + child_span = span.start_span(name="child-operation") + # child_span will have user_id="user_456" + + # Cross-service propagation (NOT recommended for sensitive user IDs) + with span.user(id="public_user_456", as_baggage=True): + # user_id will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + # Set context variable + new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id) + token = otel_context_api.attach(new_context) + + # Set baggage if requested + baggage_token = None + if as_baggage: + new_baggage = otel_baggage_api.set_baggage("user.id", id) + baggage_token = otel_context_api.attach(new_baggage) + + try: + yield + finally: + # Always detach context token + otel_context_api.detach(token) + + # Detach baggage token if it was set + if baggage_token is not None: + otel_context_api.detach(baggage_token) + + @_agnosticcontextmanager + def metadata( + self, *, as_baggage: bool = False, **kwargs + ) -> "_AgnosticContextManager": + """Create a metadata context manager that propagates metadata to all child spans. + + Args: + as_baggage (bool, optional): If True, stores the metadata in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + **kwargs: Metadata key-value pairs. Values should not exceed 200 characters. + + Returns: + Context manager that sets metadata on all spans created within its scope. + + Warning: + When as_baggage=True, all metadata key-value pairs will be included in HTTP + headers of any outbound requests made within this context. Ensure no sensitive + information is included in the metadata when using cross-service propagation. + + Example: + ```python + # Local context only (default) + with span.metadata(experiment="A/B", version="1.2.3"): + child_span = span.start_span(name="child-operation") + # child_span will have the metadata + + # Cross-service propagation (use with caution) + with span.metadata(as_baggage=True, experiment="A/B", service="api"): + # metadata will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + if not kwargs: + # No metadata to set, just yield + yield + return + + # Convert metadata dict to JSON string for context storage + metadata_json = json.dumps(kwargs) + + # Set context variable + new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, metadata_json) + token = otel_context_api.attach(new_context) + + # Set baggage if requested + baggage_tokens = [] + if as_baggage: + current_baggage = otel_baggage_api.get_all() + new_baggage = current_baggage + + # Add each metadata key-value pair to baggage + for key, value in kwargs.items(): + # Convert value to string and truncate if needed for baggage + str_value = str(value) + if len(str_value) > 200: + str_value = str_value[:200] + + baggage_key = f"metadata.{key}" + new_baggage = otel_baggage_api.set_baggage( + baggage_key, str_value, new_baggage + ) + + # Attach the new baggage context + if new_baggage != current_baggage: + baggage_token = otel_context_api.attach(new_baggage) + baggage_tokens.append(baggage_token) + + try: + yield + finally: + # Always detach context token + otel_context_api.detach(token) + + # Detach all baggage tokens if they were set + for baggage_token in baggage_tokens: + otel_context_api.detach(baggage_token) + class LangfuseSpan(LangfuseObservationWrapper): """Standard span implementation for general operations in Langfuse. diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index baa72360c..fdd7264eb 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -12,11 +12,14 @@ """ import base64 +import json import os from typing import Dict, List, Optional +from opentelemetry import baggage, context as context_api +from opentelemetry.context import Context from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import BatchSpanProcessor from langfuse._client.constants import LANGFUSE_TRACER_NAME @@ -29,6 +32,11 @@ from langfuse.logger import langfuse_logger from langfuse.version import __version__ as langfuse_version +# Context key constants for Langfuse context propagation +LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" +LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" +LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" + class LangfuseSpanProcessor(BatchSpanProcessor): """OpenTelemetry span processor that exports spans to the Langfuse API. @@ -114,6 +122,110 @@ def __init__( else None, ) + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: + """Handle span start event and propagate context and baggage to span attributes. + + This method is called when a span starts and applies context propagation: + 1. Propagates all baggage keys as span attributes + 2. Propagates langfuse.ctx.* context variables as span attributes + 3. Merges langfuse.ctx.metadata.* keys into a single metadata JSON attribute + + Args: + span: The span that is starting + parent_context: The context when the span was created (optional) + """ + if self._is_langfuse_span(span) and not self._is_langfuse_project_span(span): + langfuse_logger.debug( + f"Security: Span rejected - belongs to project '{span.instrumentation_scope.attributes.get('public_key') if span.instrumentation_scope and span.instrumentation_scope.attributes else None}' but processor is for '{self.public_key}'. " + f"This prevents cross-project data leakage in multi-project environments." + ) + return super().on_start(span, parent_context) + + # Get the current context (use parent_context if available, otherwise current) + current_context = parent_context or context_api.get_current() + + # Dictionary to collect span attributes that were propagated + propagated_attributes = {} + + # 1. Propagate all baggage keys as span attributes + baggage_entries = baggage.get_all(context=current_context) + for key, value in baggage_entries.items(): + # Check if this baggage entry is already present as a span attribute + if not hasattr(span.attributes, key) or span.attributes.get(key) != value: + propagated_attributes[key] = value + langfuse_logger.debug( + f"Propagated baggage key '{key}' = '{value}' to span '{span.name}'" + ) + + # 2. Propagate langfuse.ctx.* context variables + langfuse_ctx_keys = [LANGFUSE_CTX_USER_ID, LANGFUSE_CTX_SESSION_ID] + for ctx_key in langfuse_ctx_keys: + try: + value = context_api.get_value(ctx_key, context=current_context) + if value is not None: + # Convert context key to span attribute name (remove langfuse.ctx. prefix) + attr_key = ctx_key.replace("langfuse.ctx.", "") + + # Only propagate if not already set on span + if ( + not hasattr(span.attributes, attr_key) + or span.attributes.get(attr_key) != value + ): + propagated_attributes[attr_key] = value + langfuse_logger.debug( + f"Propagated context key '{ctx_key}' = '{value}' to span '{span.name}'" + ) + except Exception as e: + langfuse_logger.debug(f"Could not read context key '{ctx_key}': {e}") + + # 3. Handle langfuse.ctx.metadata.* keys - merge into single metadata JSON + try: + # Get metadata as a single JSON object from context + metadata_value = context_api.get_value( + LANGFUSE_CTX_METADATA, context=current_context + ) + if metadata_value is not None: + if isinstance(metadata_value, str): + # If it's already a JSON string, validate it + try: + json.loads(metadata_value) # Validate JSON + metadata_json = metadata_value + except json.JSONDecodeError: + # If invalid JSON, wrap in quotes + metadata_json = json.dumps({"value": metadata_value}) + elif isinstance(metadata_value, dict): + # Convert dict to JSON string + metadata_json = json.dumps(metadata_value) + else: + # Convert other types to a wrapped JSON object + metadata_json = json.dumps({"value": str(metadata_value)}) + + # Only propagate if not already set or different + existing_metadata = ( + span.attributes.get("metadata") + if hasattr(span, "attributes") + else None + ) + if existing_metadata != metadata_json: + propagated_attributes["metadata"] = metadata_json + langfuse_logger.debug( + f"Propagated metadata to span '{span.name}': {metadata_json}" + ) + except Exception as e: + langfuse_logger.debug(f"Could not read metadata from context: {e}") + + # Log summary of propagated attributes + if propagated_attributes: + langfuse_logger.debug( + f"Propagated {len(propagated_attributes)} attributes to span '{span.name}': {list(propagated_attributes.keys())}" + ) + + # Set all propagated attributes on the span + for key, value in propagated_attributes.items(): + span.set_attribute(key, value) + + return super().on_start(span, parent_context) + def on_end(self, span: ReadableSpan) -> None: # Only export spans that belong to the scoped project # This is important to not send spans to wrong project in multi-project setups diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index 26d11746c..fa6c7275b 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -338,7 +338,7 @@ def test_create_update_current_trace(): user_id="test", metadata={"key": "value"}, public=True, - input="test_input" + input="test_input", ) # Get trace ID for later reference trace_id = span.trace_id @@ -347,7 +347,9 @@ def test_create_update_current_trace(): sleep(1) # Update trace properties using update_current_trace - langfuse.update_current_trace(metadata={"key2": "value2"}, public=False, version="1.0") + langfuse.update_current_trace( + metadata={"key2": "value2"}, public=False, version="1.0" + ) # Ensure data is sent to the API langfuse.flush() @@ -1957,9 +1959,9 @@ def test_start_as_current_observation_types(): expected_types = {obs_type.upper() for obs_type in observation_types} | { "SPAN" } # includes parent span - assert expected_types.issubset(found_types), ( - f"Missing types: {expected_types - found_types}" - ) + assert expected_types.issubset( + found_types + ), f"Missing types: {expected_types - found_types}" # Verify each specific observation exists for obs_type in observation_types: @@ -2003,25 +2005,25 @@ def test_that_generation_like_properties_are_actually_created(): ) as obs: # Verify the properties are accessible on the observation object if hasattr(obs, "model"): - assert obs.model == test_model, ( - f"{obs_type} should have model property" - ) + assert ( + obs.model == test_model + ), f"{obs_type} should have model property" if hasattr(obs, "completion_start_time"): - assert obs.completion_start_time == test_completion_start_time, ( - f"{obs_type} should have completion_start_time property" - ) + assert ( + obs.completion_start_time == test_completion_start_time + ), f"{obs_type} should have completion_start_time property" if hasattr(obs, "model_parameters"): - assert obs.model_parameters == test_model_parameters, ( - f"{obs_type} should have model_parameters property" - ) + assert ( + obs.model_parameters == test_model_parameters + ), f"{obs_type} should have model_parameters property" if hasattr(obs, "usage_details"): - assert obs.usage_details == test_usage_details, ( - f"{obs_type} should have usage_details property" - ) + assert ( + obs.usage_details == test_usage_details + ), f"{obs_type} should have usage_details property" if hasattr(obs, "cost_details"): - assert obs.cost_details == test_cost_details, ( - f"{obs_type} should have cost_details property" - ) + assert ( + obs.cost_details == test_cost_details + ), f"{obs_type} should have cost_details property" langfuse.flush() @@ -2035,28 +2037,231 @@ def test_that_generation_like_properties_are_actually_created(): for obs in trace.observations if obs.name == f"test-{obs_type}" and obs.type == obs_type.upper() ] - assert len(observations) == 1, ( - f"Expected one {obs_type.upper()} observation, but found {len(observations)}" - ) + assert ( + len(observations) == 1 + ), f"Expected one {obs_type.upper()} observation, but found {len(observations)}" obs = observations[0] assert obs.model == test_model, f"{obs_type} should have model property" - assert obs.model_parameters == test_model_parameters, ( - f"{obs_type} should have model_parameters property" - ) + assert ( + obs.model_parameters == test_model_parameters + ), f"{obs_type} should have model_parameters property" # usage_details assert hasattr(obs, "usage_details"), f"{obs_type} should have usage_details" - assert obs.usage_details == dict(test_usage_details, total=30), ( - f"{obs_type} should persist usage_details" - ) # API adds total + assert obs.usage_details == dict( + test_usage_details, total=30 + ), f"{obs_type} should persist usage_details" # API adds total - assert obs.cost_details == test_cost_details, ( - f"{obs_type} should persist cost_details" - ) + assert ( + obs.cost_details == test_cost_details + ), f"{obs_type} should persist cost_details" # completion_start_time, because of time skew not asserting time - assert obs.completion_start_time is not None, ( - f"{obs_type} should persist completion_start_time property" - ) + assert ( + obs.completion_start_time is not None + ), f"{obs_type} should persist completion_start_time property" + + +def test_context_manager_user_propagation(): + """Test that user context manager propagates user_id to child spans.""" + langfuse = Langfuse() + + user_id = "test_user_123" + + with langfuse.start_as_current_span(name="parent-span") as parent_span: + with langfuse.user(id=user_id): + trace_id = parent_span.trace_id + + # Create child spans that should inherit user_id + child_span = langfuse.start_span(name="child-span") + child_span.end() + + # Create generation that should inherit user_id + generation = parent_span.start_generation(name="child-generation") + generation.end() + + langfuse.flush() + sleep(2) + + # Verify trace has user_id (child spans inherit via context propagation) + trace = get_api().trace.get(trace_id) + assert trace.user_id == user_id + + # Verify child observations were created + child_observations = [ + obs + for obs in trace.observations + if obs.name in ["child-span", "child-generation"] + ] + assert len(child_observations) == 2 + + +def test_context_manager_session_propagation(): + """Test that session context manager propagates session_id to child spans.""" + langfuse = Langfuse() + + session_id = "test_session_456" + + with langfuse.start_as_current_span(name="parent-span") as parent_span: + with langfuse.session(id=session_id): + trace_id = parent_span.trace_id + + # Create child spans that should inherit session_id + child_span = langfuse.start_span(name="child-span") + child_span.end() + + # Create nested context to test multiple levels + with langfuse.start_as_current_span(name="nested-span") as nested_span: + grandchild_span = langfuse.start_span(name="grandchild-span") + grandchild_span.end() + + langfuse.flush() + sleep(2) + + # Verify trace has session_id + trace = get_api().trace.get(trace_id) + assert trace.session_id == session_id + + # Verify nested spans were created + nested_observations = [obs for obs in trace.observations if "span" in obs.name] + assert len(nested_observations) >= 2 + + +def test_context_manager_metadata_propagation(): + """Test that metadata context manager propagates metadata to child spans.""" + langfuse = Langfuse() + + with langfuse.start_as_current_span(name="parent-span") as parent_span: + with langfuse.metadata( + experiment="A/B", version="1.2.3", feature_flag="enabled" + ): + trace_id = parent_span.trace_id + + # Create child spans that should inherit metadata + child_span = langfuse.start_span(name="child-span") + child_span.end() + + # Create generation that should inherit metadata + generation = parent_span.start_generation(name="child-generation") + generation.end() + + langfuse.flush() + sleep(2) + + # Verify trace has metadata + trace = get_api().trace.get(trace_id) + assert trace.metadata["experiment"] == "A/B" + assert trace.metadata["version"] == "1.2.3" + assert trace.metadata["feature_flag"] == "enabled" + + +def test_context_manager_nested_contexts(): + """Test nested context managers with overrides and merging.""" + langfuse = Langfuse() + + with langfuse.start_as_current_span(name="outer-span") as outer_span: + with langfuse.user(id="user_1"): + with langfuse.session(id="session_1"): + with langfuse.metadata(env="prod", region="us-east"): + outer_trace_id = outer_span.trace_id + + # Create span in outer context + outer_child = langfuse.start_span(name="outer-child") + outer_child.end() + + # Override user in nested context + with langfuse.user(id="user_2"): + with langfuse.metadata( + env="staging" + ): # Override env, keep region + nested_span = langfuse.start_span(name="nested-span") + nested_span.end() + + langfuse.flush() + sleep(2) + + # Verify trace was created with nested spans + trace = get_api().trace.get(outer_trace_id) + + # Verify trace-level properties from the outer context + assert trace.user_id == "user_2" # Last set user_id should win + assert trace.session_id == "session_1" # Session should be preserved + assert trace.metadata["env"] == "staging" # Last set env should win + assert ( + trace.metadata["region"] == "us-east" + ) # Region should be preserved from outer context + + # Verify child observations were created + child_observations = [ + obs for obs in trace.observations if "child" in obs.name or "nested" in obs.name + ] + assert len(child_observations) >= 2 + + # Verify specific child spans exist + outer_child_obs = [obs for obs in trace.observations if obs.name == "outer-child"] + nested_span_obs = [obs for obs in trace.observations if obs.name == "nested-span"] + + assert len(outer_child_obs) == 1, "outer-child span should exist" + assert len(nested_span_obs) == 1, "nested-span should exist" + + +def test_context_manager_baggage_propagation(): + """Test context managers with as_baggage=True for cross-service propagation.""" + langfuse = Langfuse() + + # Test with baggage enabled (careful with sensitive data) + with langfuse.start_as_current_span(name="service-span") as span: + with langfuse.session(id="public_session_789", as_baggage=True): + with langfuse.metadata(as_baggage=True, service="api", version="1.0"): + trace_id = span.trace_id + + # Create child spans that inherit baggage context + child_span = langfuse.start_span(name="external-call-span") + child_span.end() + + langfuse.flush() + sleep(2) + + # Verify trace properties were set + trace = get_api().trace.get(trace_id) + assert trace.session_id == "public_session_789" + assert trace.metadata["service"] == "api" + assert trace.metadata["version"] == "1.0" + + +def test_span_context_managers(): + """Test context managers called on span instances.""" + langfuse = Langfuse() + + with langfuse.start_as_current_span(name="parent-span") as parent_span: + trace_id = parent_span.trace_id + + # Use context managers on the span instance + with parent_span.user(id="span_user_123"): + child1 = parent_span.start_span(name="user-context-child") + child1.end() + + with parent_span.session(id="span_session_456"): + child2 = parent_span.start_span(name="session-context-child") + child2.end() + + with parent_span.metadata(component="auth", action="validate"): + child3 = parent_span.start_span(name="metadata-context-child") + child3.end() + + langfuse.flush() + sleep(2) + + # Verify trace was created with child observations + trace = get_api().trace.get(trace_id) + child_names = [ + obs.name for obs in trace.observations if obs.name.endswith("-child") + ] + expected_children = [ + "user-context-child", + "session-context-child", + "metadata-context-child", + ] + assert all(child in child_names for child in expected_children) From f15d538bcd6bd6dd8a2e0dad5cfe48288f0cd42e Mon Sep 17 00:00:00 2001 From: steffen911 Date: Tue, 30 Sep 2025 09:05:37 +0200 Subject: [PATCH 02/15] chore: lint --- tests/test_core_sdk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index fa6c7275b..092eaf5d5 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -2113,7 +2113,7 @@ def test_context_manager_session_propagation(): child_span.end() # Create nested context to test multiple levels - with langfuse.start_as_current_span(name="nested-span") as nested_span: + with langfuse.start_as_current_span(name="nested-span"): grandchild_span = langfuse.start_span(name="grandchild-span") grandchild_span.end() From 702d2774d87a30d75f37ed6150cadbc6a1ed7491 Mon Sep 17 00:00:00 2001 From: steffen911 Date: Tue, 30 Sep 2025 09:24:28 +0200 Subject: [PATCH 03/15] chore: typing --- langfuse/_client/client.py | 24 +++++++++++++++--------- langfuse/_client/span.py | 15 ++++++++------- langfuse/_client/span_processor.py | 14 ++++++++------ 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index bdc507971..7f8d5ef6e 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -17,6 +17,7 @@ Any, Callable, Dict, + Generator, List, Literal, Optional, @@ -198,6 +199,7 @@ class Langfuse: _resources: Optional[LangfuseResourceManager] = None _mask: Optional[MaskFunction] = None _otel_tracer: otel_trace_api.Tracer + _host: str def __init__( self, @@ -220,8 +222,10 @@ def __init__( additional_headers: Optional[Dict[str, str]] = None, tracer_provider: Optional[TracerProvider] = None, ): - self._host = host or cast( - str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") + self._host = ( + host + if host is not None + else os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") ) self._environment = environment or cast( str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT) @@ -3466,7 +3470,9 @@ def clear_prompt_cache(self) -> None: self._resources.prompt_cache.clear() @_agnosticcontextmanager - def session(self, id: str, *, as_baggage: bool = False) -> _AgnosticContextManager: + def session( + self, id: str, *, as_baggage: bool = False + ) -> Generator[None, None, None]: """Create a session context manager that propagates session_id to all child spans. Args: @@ -3518,7 +3524,7 @@ def session(self, id: str, *, as_baggage: bool = False) -> _AgnosticContextManag otel_context_api.detach(baggage_token) @_agnosticcontextmanager - def user(self, id: str, *, as_baggage: bool = False) -> _AgnosticContextManager: + def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, None]: """Create a user context manager that propagates user_id to all child spans. Args: @@ -3571,8 +3577,8 @@ def user(self, id: str, *, as_baggage: bool = False) -> _AgnosticContextManager: @_agnosticcontextmanager def metadata( - self, *, as_baggage: bool = False, **kwargs - ) -> _AgnosticContextManager: + self, *, as_baggage: bool = False, **kwargs: Any + ) -> Generator[None, None, None]: """Create a metadata context manager that propagates metadata to all child spans. Args: @@ -3618,8 +3624,8 @@ def metadata( # Set baggage if requested baggage_tokens = [] if as_baggage: - current_baggage = otel_baggage_api.get_all() - new_baggage = current_baggage + # Start with None context and chain baggage settings + new_baggage = None # Add each metadata key-value pair to baggage for key, value in kwargs.items(): @@ -3634,7 +3640,7 @@ def metadata( ) # Attach the new baggage context - if new_baggage != current_baggage: + if new_baggage is not None: baggage_token = otel_context_api.attach(new_baggage) baggage_tokens.append(baggage_token) diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 0ec379e67..90471f27d 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -21,6 +21,7 @@ TYPE_CHECKING, Any, Dict, + Generator, List, Literal, Optional, @@ -1134,7 +1135,7 @@ def start_as_current_observation( # type: ignore[misc] @_agnosticcontextmanager def session( self, id: str, *, as_baggage: bool = False - ) -> "_AgnosticContextManager": + ) -> Generator[None, None, None]: """Create a session context manager that propagates session_id to all child spans. Args: @@ -1185,7 +1186,7 @@ def session( otel_context_api.detach(baggage_token) @_agnosticcontextmanager - def user(self, id: str, *, as_baggage: bool = False) -> "_AgnosticContextManager": + def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, None]: """Create a user context manager that propagates user_id to all child spans. Args: @@ -1237,8 +1238,8 @@ def user(self, id: str, *, as_baggage: bool = False) -> "_AgnosticContextManager @_agnosticcontextmanager def metadata( - self, *, as_baggage: bool = False, **kwargs - ) -> "_AgnosticContextManager": + self, *, as_baggage: bool = False, **kwargs: Any + ) -> Generator[None, None, None]: """Create a metadata context manager that propagates metadata to all child spans. Args: @@ -1283,8 +1284,8 @@ def metadata( # Set baggage if requested baggage_tokens = [] if as_baggage: - current_baggage = otel_baggage_api.get_all() - new_baggage = current_baggage + # Start with None context and chain baggage settings + new_baggage = None # Add each metadata key-value pair to baggage for key, value in kwargs.items(): @@ -1299,7 +1300,7 @@ def metadata( ) # Attach the new baggage context - if new_baggage != current_baggage: + if new_baggage is not None: baggage_token = otel_context_api.attach(new_baggage) baggage_tokens.append(baggage_token) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index fdd7264eb..d9d67943d 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -151,7 +151,9 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None baggage_entries = baggage.get_all(context=current_context) for key, value in baggage_entries.items(): # Check if this baggage entry is already present as a span attribute - if not hasattr(span.attributes, key) or span.attributes.get(key) != value: + if not hasattr(span.attributes, key) or ( + span.attributes is not None and span.attributes.get(key) != value + ): propagated_attributes[key] = value langfuse_logger.debug( f"Propagated baggage key '{key}' = '{value}' to span '{span.name}'" @@ -167,9 +169,9 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None attr_key = ctx_key.replace("langfuse.ctx.", "") # Only propagate if not already set on span - if ( - not hasattr(span.attributes, attr_key) - or span.attributes.get(attr_key) != value + if not hasattr(span.attributes, attr_key) or ( + span.attributes is not None + and span.attributes.get(attr_key) != value ): propagated_attributes[attr_key] = value langfuse_logger.debug( @@ -203,7 +205,7 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None # Only propagate if not already set or different existing_metadata = ( span.attributes.get("metadata") - if hasattr(span, "attributes") + if hasattr(span, "attributes") and span.attributes is not None else None ) if existing_metadata != metadata_json: @@ -222,7 +224,7 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None # Set all propagated attributes on the span for key, value in propagated_attributes.items(): - span.set_attribute(key, value) + span.set_attribute(key, value) # type: ignore[arg-type] return super().on_start(span, parent_context) From 10d9c85842e8c3961dcaa0bbdc28b0663f972a9b Mon Sep 17 00:00:00 2001 From: steffen911 Date: Tue, 30 Sep 2025 10:06:31 +0200 Subject: [PATCH 04/15] chore: distribute metadata --- langfuse/_client/client.py | 9 ++--- langfuse/_client/span.py | 9 ++--- langfuse/_client/span_processor.py | 56 ++++++++++++++---------------- 3 files changed, 32 insertions(+), 42 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 7f8d5ef6e..5de724d10 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -4,7 +4,6 @@ """ import asyncio -import json import logging import os import re @@ -3614,11 +3613,9 @@ def metadata( yield return - # Convert metadata dict to JSON string for context storage - metadata_json = json.dumps(kwargs) - - # Set context variable - new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, metadata_json) + # Store metadata as a dict in context (not JSON string) + # This allows span_processor to distribute keys as individual attributes + new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs) token = otel_context_api.attach(new_context) # Set baggage if requested diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 90471f27d..18079c8c3 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -15,7 +15,6 @@ from datetime import datetime from time import time_ns -import json import warnings from typing import ( TYPE_CHECKING, @@ -1274,11 +1273,9 @@ def metadata( yield return - # Convert metadata dict to JSON string for context storage - metadata_json = json.dumps(kwargs) - - # Set context variable - new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, metadata_json) + # Store metadata as a dict in context (not JSON string) + # This allows span_processor to distribute keys as individual attributes + new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs) token = otel_context_api.attach(new_context) # Set baggage if requested diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index d9d67943d..76db4f8b7 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -128,7 +128,7 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None This method is called when a span starts and applies context propagation: 1. Propagates all baggage keys as span attributes 2. Propagates langfuse.ctx.* context variables as span attributes - 3. Merges langfuse.ctx.metadata.* keys into a single metadata JSON attribute + 3. Distributes langfuse.ctx.metadata keys as individual langfuse.metadata.* attributes Args: span: The span that is starting @@ -180,39 +180,35 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None except Exception as e: langfuse_logger.debug(f"Could not read context key '{ctx_key}': {e}") - # 3. Handle langfuse.ctx.metadata.* keys - merge into single metadata JSON + # 3. Handle langfuse.ctx.metadata - distribute keys as individual attributes try: - # Get metadata as a single JSON object from context - metadata_value = context_api.get_value( + # Get metadata dict from context + metadata_dict = context_api.get_value( LANGFUSE_CTX_METADATA, context=current_context ) - if metadata_value is not None: - if isinstance(metadata_value, str): - # If it's already a JSON string, validate it - try: - json.loads(metadata_value) # Validate JSON - metadata_json = metadata_value - except json.JSONDecodeError: - # If invalid JSON, wrap in quotes - metadata_json = json.dumps({"value": metadata_value}) - elif isinstance(metadata_value, dict): - # Convert dict to JSON string - metadata_json = json.dumps(metadata_value) - else: - # Convert other types to a wrapped JSON object - metadata_json = json.dumps({"value": str(metadata_value)}) - - # Only propagate if not already set or different - existing_metadata = ( - span.attributes.get("metadata") - if hasattr(span, "attributes") and span.attributes is not None - else None - ) - if existing_metadata != metadata_json: - propagated_attributes["metadata"] = metadata_json - langfuse_logger.debug( - f"Propagated metadata to span '{span.name}': {metadata_json}" + if metadata_dict is not None and isinstance(metadata_dict, dict): + # Set each metadata key as a separate span attribute with langfuse.metadata. prefix + for key, value in metadata_dict.items(): + attr_key = f"langfuse.metadata.{key}" + + # Convert value to appropriate type for span attribute + if isinstance(value, (str, int, float, bool)): + attr_value = value + else: + # For complex types, convert to JSON string + attr_value = json.dumps(value) + + # Only propagate if not already set or different + existing_value = ( + span.attributes.get(attr_key) + if hasattr(span, "attributes") and span.attributes is not None + else None ) + if existing_value != attr_value: + propagated_attributes[attr_key] = attr_value + langfuse_logger.debug( + f"Propagated metadata key '{key}' = '{attr_value}' to span '{span.name}'" + ) except Exception as e: langfuse_logger.debug(f"Could not read metadata from context: {e}") From db4de18cc241aaa0acf843b98e7863ae8196a0d3 Mon Sep 17 00:00:00 2001 From: steffen911 Date: Tue, 30 Sep 2025 10:21:30 +0200 Subject: [PATCH 05/15] chore: set properties on current span --- langfuse/_client/client.py | 25 +++++++++++++++++++++++++ langfuse/_client/span.py | 25 +++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 5de724d10..bfaa18f61 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3506,6 +3506,11 @@ def session( new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id) token = otel_context_api.attach(new_context) + # Set attribute on currently active span if exists + current_span = otel_trace_api.get_current_span() + if current_span is not None and current_span.is_recording(): + current_span.set_attribute("session.id", id) + # Set baggage if requested baggage_token = None if as_baggage: @@ -3558,6 +3563,11 @@ def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, No new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id) token = otel_context_api.attach(new_context) + # Set attribute on currently active span if exists + current_span = otel_trace_api.get_current_span() + if current_span is not None and current_span.is_recording(): + current_span.set_attribute("user.id", id) + # Set baggage if requested baggage_token = None if as_baggage: @@ -3618,6 +3628,21 @@ def metadata( new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs) token = otel_context_api.attach(new_context) + # Set attributes on currently active span if exists + current_span = otel_trace_api.get_current_span() + if current_span is not None and current_span.is_recording(): + import json as json_module + + for key, value in kwargs.items(): + attr_key = f"langfuse.metadata.{key}" + # Convert value to appropriate type for span attribute + if isinstance(value, (str, int, float, bool)): + attr_value = value + else: + # For complex types, convert to JSON string + attr_value = json_module.dumps(value) + current_span.set_attribute(attr_key, attr_value) + # Set baggage if requested baggage_tokens = [] if as_baggage: diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 18079c8c3..ce771f9d0 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -1168,6 +1168,11 @@ def session( new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id) token = otel_context_api.attach(new_context) + # Set attribute on currently active span if exists + current_span = otel_trace_api.get_current_span() + if current_span is not None and current_span.is_recording(): + current_span.set_attribute("session.id", id) + # Set baggage if requested baggage_token = None if as_baggage: @@ -1219,6 +1224,11 @@ def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, No new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id) token = otel_context_api.attach(new_context) + # Set attribute on currently active span if exists + current_span = otel_trace_api.get_current_span() + if current_span is not None and current_span.is_recording(): + current_span.set_attribute("user.id", id) + # Set baggage if requested baggage_token = None if as_baggage: @@ -1278,6 +1288,21 @@ def metadata( new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs) token = otel_context_api.attach(new_context) + # Set attributes on currently active span if exists + current_span = otel_trace_api.get_current_span() + if current_span is not None and current_span.is_recording(): + import json as json_module + + for key, value in kwargs.items(): + attr_key = f"langfuse.metadata.{key}" + # Convert value to appropriate type for span attribute + if isinstance(value, (str, int, float, bool)): + attr_value = value + else: + # For complex types, convert to JSON string + attr_value = json_module.dumps(value) + current_span.set_attribute(attr_key, attr_value) + # Set baggage if requested baggage_tokens = [] if as_baggage: From 05657c60ae16c41750b3accb6dd589c3a1dbed69 Mon Sep 17 00:00:00 2001 From: Steffen Schmitz Date: Tue, 30 Sep 2025 08:39:08 +0000 Subject: [PATCH 06/15] chore: update test case behaviour --- tests/test_core_sdk.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index 092eaf5d5..f643306f2 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -2214,7 +2214,7 @@ def test_context_manager_baggage_propagation(): # Test with baggage enabled (careful with sensitive data) with langfuse.start_as_current_span(name="service-span") as span: with langfuse.session(id="public_session_789", as_baggage=True): - with langfuse.metadata(as_baggage=True, service="api", version="1.0"): + with langfuse.metadata(as_baggage=True, service="api", version="v1.0"): trace_id = span.trace_id # Create child spans that inherit baggage context @@ -2228,7 +2228,7 @@ def test_context_manager_baggage_propagation(): trace = get_api().trace.get(trace_id) assert trace.session_id == "public_session_789" assert trace.metadata["service"] == "api" - assert trace.metadata["version"] == "1.0" + assert trace.metadata["version"] == "v1.0" def test_span_context_managers(): From fc3b3c5d749f9f78bd7ed9303cbd13e66e3c9da5 Mon Sep 17 00:00:00 2001 From: Steffen Schmitz Date: Tue, 30 Sep 2025 08:54:13 +0000 Subject: [PATCH 07/15] chore: test observations --- tests/test_core_sdk.py | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index f643306f2..169a6717b 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -2089,11 +2089,12 @@ def test_context_manager_user_propagation(): trace = get_api().trace.get(trace_id) assert trace.user_id == user_id - # Verify child observations were created + # Verify child observations were created and have user_id child_observations = [ obs for obs in trace.observations if obs.name in ["child-span", "child-generation"] + and obs.metadata["attributes"]["user.id"] == user_id ] assert len(child_observations) == 2 @@ -2125,7 +2126,11 @@ def test_context_manager_session_propagation(): assert trace.session_id == session_id # Verify nested spans were created - nested_observations = [obs for obs in trace.observations if "span" in obs.name] + nested_observations = [ + obs + for obs in trace.observations + if "span" in obs.name and obs.metadata["attributes"]["session.id"] == session_id + ] assert len(nested_observations) >= 2 @@ -2156,6 +2161,21 @@ def test_context_manager_metadata_propagation(): assert trace.metadata["version"] == "1.2.3" assert trace.metadata["feature_flag"] == "enabled" + # Verify all observations have the metadata distributed as individual keys + for obs in trace.observations: + if obs.name in ["child-span", "child-generation", "parent-span"]: + # Check that metadata was set on the observation + assert hasattr(obs, "metadata"), f"Observation {obs.name} missing metadata" + assert ( + obs.metadata["experiment"] == "A/B" + ), f"Observation {obs.name} missing experiment metadata" + assert ( + obs.metadata["version"] == "1.2.3" + ), f"Observation {obs.name} missing version metadata" + assert ( + obs.metadata["feature_flag"] == "enabled" + ), f"Observation {obs.name} missing feature_flag metadata" + def test_context_manager_nested_contexts(): """Test nested context managers with overrides and merging.""" @@ -2199,7 +2219,7 @@ def test_context_manager_nested_contexts(): ] assert len(child_observations) >= 2 - # Verify specific child spans exist + # Verify specific child spans exist and have correct metadata outer_child_obs = [obs for obs in trace.observations if obs.name == "outer-child"] nested_span_obs = [obs for obs in trace.observations if obs.name == "nested-span"] @@ -2230,6 +2250,18 @@ def test_context_manager_baggage_propagation(): assert trace.metadata["service"] == "api" assert trace.metadata["version"] == "v1.0" + # Verify all observations have the metadata and session_id + for obs in trace.observations: + if obs.name in ["external-call-span", "service-span"]: + # Check that metadata was set on the observation + assert hasattr(obs, "metadata"), f"Observation {obs.name} missing metadata" + assert ( + obs.metadata["service"] == "api" + ), f"Observation {obs.name} missing service metadata" + assert ( + obs.metadata["version"] == "v1.0" + ), f"Observation {obs.name} missing version metadata" + def test_span_context_managers(): """Test context managers called on span instances.""" From 338421867e4800ecbdce221cce99aa0523ba77b2 Mon Sep 17 00:00:00 2001 From: Steffen Schmitz Date: Tue, 30 Sep 2025 09:17:51 +0000 Subject: [PATCH 08/15] cleanup tests --- tests/test_core_sdk.py | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index 169a6717b..aa15457fb 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -2094,7 +2094,8 @@ def test_context_manager_user_propagation(): obs for obs in trace.observations if obs.name in ["child-span", "child-generation"] - and obs.metadata["attributes"]["user.id"] == user_id + # Skip user.id validation as we currently drop it from the visible attributes server-side. + # and obs.metadata["attributes"]["user.id"] == user_id ] assert len(child_observations) == 2 @@ -2129,7 +2130,9 @@ def test_context_manager_session_propagation(): nested_observations = [ obs for obs in trace.observations - if "span" in obs.name and obs.metadata["attributes"]["session.id"] == session_id + if "span" in obs.name + # Skip session.id validation as we currently drop it from the visible attributes server-side. + # and obs.metadata["attributes"]["session.id"] == session_id ] assert len(nested_observations) >= 2 @@ -2191,13 +2194,8 @@ def test_context_manager_nested_contexts(): outer_child = langfuse.start_span(name="outer-child") outer_child.end() - # Override user in nested context - with langfuse.user(id="user_2"): - with langfuse.metadata( - env="staging" - ): # Override env, keep region - nested_span = langfuse.start_span(name="nested-span") - nested_span.end() + nested_span = langfuse.start_span(name="nested-span") + nested_span.end() langfuse.flush() sleep(2) @@ -2205,13 +2203,11 @@ def test_context_manager_nested_contexts(): # Verify trace was created with nested spans trace = get_api().trace.get(outer_trace_id) - # Verify trace-level properties from the outer context - assert trace.user_id == "user_2" # Last set user_id should win - assert trace.session_id == "session_1" # Session should be preserved - assert trace.metadata["env"] == "staging" # Last set env should win - assert ( - trace.metadata["region"] == "us-east" - ) # Region should be preserved from outer context + # Verify trace-level properties from the context + assert trace.user_id == "user_1" + assert trace.session_id == "session_1" + assert trace.metadata["env"] == "prod" + assert trace.metadata["region"] == "us-east" # Verify child observations were created child_observations = [ From 87a9cfe7b4a0cc1eadeabd356359102fc091d682 Mon Sep 17 00:00:00 2001 From: steffen911 Date: Tue, 30 Sep 2025 15:36:50 +0200 Subject: [PATCH 09/15] chore: pr comments --- langfuse/_client/client.py | 10 +++------- langfuse/_client/span.py | 10 +++------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index bfaa18f61..f62bed20f 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -6,6 +6,7 @@ import asyncio import logging import os +import json import re import urllib.parse import warnings @@ -3631,8 +3632,6 @@ def metadata( # Set attributes on currently active span if exists current_span = otel_trace_api.get_current_span() if current_span is not None and current_span.is_recording(): - import json as json_module - for key, value in kwargs.items(): attr_key = f"langfuse.metadata.{key}" # Convert value to appropriate type for span attribute @@ -3640,7 +3639,7 @@ def metadata( attr_value = value else: # For complex types, convert to JSON string - attr_value = json_module.dumps(value) + attr_value = json.dumps(value) current_span.set_attribute(attr_key, attr_value) # Set baggage if requested @@ -3653,10 +3652,7 @@ def metadata( for key, value in kwargs.items(): # Convert value to string and truncate if needed for baggage str_value = str(value) - if len(str_value) > 200: - str_value = str_value[:200] - - baggage_key = f"metadata.{key}" + baggage_key = f"langfuse.metadata.{key}" new_baggage = otel_baggage_api.set_baggage( baggage_key, str_value, new_baggage ) diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index ce771f9d0..66beb1842 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -16,6 +16,7 @@ from datetime import datetime from time import time_ns import warnings +import json from typing import ( TYPE_CHECKING, Any, @@ -1291,8 +1292,6 @@ def metadata( # Set attributes on currently active span if exists current_span = otel_trace_api.get_current_span() if current_span is not None and current_span.is_recording(): - import json as json_module - for key, value in kwargs.items(): attr_key = f"langfuse.metadata.{key}" # Convert value to appropriate type for span attribute @@ -1300,7 +1299,7 @@ def metadata( attr_value = value else: # For complex types, convert to JSON string - attr_value = json_module.dumps(value) + attr_value = json.dumps(value) current_span.set_attribute(attr_key, attr_value) # Set baggage if requested @@ -1313,10 +1312,7 @@ def metadata( for key, value in kwargs.items(): # Convert value to string and truncate if needed for baggage str_value = str(value) - if len(str_value) > 200: - str_value = str_value[:200] - - baggage_key = f"metadata.{key}" + baggage_key = f"langfuse.metadata.{key}" new_baggage = otel_baggage_api.set_baggage( baggage_key, str_value, new_baggage ) From cd234adcc2e418381b3ac5a9fddd0f6c411bbbb8 Mon Sep 17 00:00:00 2001 From: steffen911 Date: Tue, 30 Sep 2025 15:55:18 +0200 Subject: [PATCH 10/15] chore: reduce duplication --- langfuse/_client/client.py | 215 +-------------------- langfuse/_client/context_propagation.py | 244 ++++++++++++++++++++++++ langfuse/_client/span.py | 213 +-------------------- langfuse/_client/span_processor.py | 10 +- 4 files changed, 253 insertions(+), 429 deletions(-) create mode 100644 langfuse/_client/context_propagation.py diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index f62bed20f..b7f768d90 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -6,7 +6,6 @@ import asyncio import logging import os -import json import re import urllib.parse import warnings @@ -17,7 +16,6 @@ Any, Callable, Dict, - Generator, List, Literal, Optional, @@ -31,8 +29,6 @@ import httpx from opentelemetry import ( trace as otel_trace_api, - baggage as otel_baggage_api, - context as otel_context_api, ) from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.id_generator import RandomIdGenerator @@ -115,14 +111,10 @@ TextPromptClient, ) from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext +from langfuse._client.context_propagation import LangfuseContextPropagationMixin -# Context key constants for Langfuse context propagation -LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" -LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" -LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" - -class Langfuse: +class Langfuse(LangfuseContextPropagationMixin): """Main client for Langfuse tracing and platform features. This class provides an interface for creating and managing traces, spans, @@ -3468,206 +3460,3 @@ def clear_prompt_cache(self) -> None: """ if self._resources is not None: self._resources.prompt_cache.clear() - - @_agnosticcontextmanager - def session( - self, id: str, *, as_baggage: bool = False - ) -> Generator[None, None, None]: - """Create a session context manager that propagates session_id to all child spans. - - Args: - id (str): The session identifier to propagate to child spans. - as_baggage (bool, optional): If True, stores the session_id in OpenTelemetry baggage - for cross-service propagation. If False, stores only in local context for - current-service propagation. Defaults to False. - - Returns: - Context manager that sets session_id on all spans created within its scope. - - Warning: - When as_baggage=True, the session_id will be included in HTTP headers of any - outbound requests made within this context. Only use this for non-sensitive - identifiers that are safe to transmit across service boundaries. - - Example: - ```python - # Local context only (default) - with langfuse.session(id="session_123"): - with langfuse.start_as_current_span(name="process-request") as span: - # This span and all its children will have session_id="session_123" - child_span = langfuse.start_span(name="child-operation") - - # Cross-service propagation (use with caution) - with langfuse.session(id="session_123", as_baggage=True): - # session_id will be propagated to external service calls - response = requests.get("https://api.example.com/data") - ``` - """ - # Set context variable - new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id) - token = otel_context_api.attach(new_context) - - # Set attribute on currently active span if exists - current_span = otel_trace_api.get_current_span() - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("session.id", id) - - # Set baggage if requested - baggage_token = None - if as_baggage: - new_baggage = otel_baggage_api.set_baggage("session.id", id) - baggage_token = otel_context_api.attach(new_baggage) - - try: - yield - finally: - # Always detach context token - otel_context_api.detach(token) - - # Detach baggage token if it was set - if baggage_token is not None: - otel_context_api.detach(baggage_token) - - @_agnosticcontextmanager - def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, None]: - """Create a user context manager that propagates user_id to all child spans. - - Args: - id (str): The user identifier to propagate to child spans. - as_baggage (bool, optional): If True, stores the user_id in OpenTelemetry baggage - for cross-service propagation. If False, stores only in local context for - current-service propagation. Defaults to False. - - Returns: - Context manager that sets user_id on all spans created within its scope. - - Warning: - When as_baggage=True, the user_id will be included in HTTP headers of any - outbound requests made within this context. This may leak sensitive user - information to external services. Use with extreme caution. - - Example: - ```python - # Local context only (default, recommended for user IDs) - with langfuse.user(id="user_456"): - with langfuse.start_as_current_span(name="user-action") as span: - # This span and all its children will have user_id="user_456" - pass - - # Cross-service propagation (NOT recommended for sensitive user IDs) - with langfuse.user(id="public_user_456", as_baggage=True): - # user_id will be propagated to external service calls - response = requests.get("https://api.example.com/data") - ``` - """ - # Set context variable - new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id) - token = otel_context_api.attach(new_context) - - # Set attribute on currently active span if exists - current_span = otel_trace_api.get_current_span() - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("user.id", id) - - # Set baggage if requested - baggage_token = None - if as_baggage: - new_baggage = otel_baggage_api.set_baggage("user.id", id) - baggage_token = otel_context_api.attach(new_baggage) - - try: - yield - finally: - # Always detach context token - otel_context_api.detach(token) - - # Detach baggage token if it was set - if baggage_token is not None: - otel_context_api.detach(baggage_token) - - @_agnosticcontextmanager - def metadata( - self, *, as_baggage: bool = False, **kwargs: Any - ) -> Generator[None, None, None]: - """Create a metadata context manager that propagates metadata to all child spans. - - Args: - as_baggage (bool, optional): If True, stores the metadata in OpenTelemetry baggage - for cross-service propagation. If False, stores only in local context for - current-service propagation. Defaults to False. - **kwargs: Metadata key-value pairs. Values should not exceed 200 characters. - - Returns: - Context manager that sets metadata on all spans created within its scope. - - Warning: - When as_baggage=True, all metadata key-value pairs will be included in HTTP - headers of any outbound requests made within this context. Ensure no sensitive - information is included in the metadata when using cross-service propagation. - - Example: - ```python - # Local context only (default) - with langfuse.metadata(experiment="A/B", version="1.2.3"): - with langfuse.start_as_current_span(name="experiment-run") as span: - # This span and all its children will have the metadata - pass - - # Cross-service propagation (use with caution) - with langfuse.metadata(as_baggage=True, experiment="A/B", service="api"): - # metadata will be propagated to external service calls - response = requests.get("https://api.example.com/data") - ``` - """ - if not kwargs: - # No metadata to set, just yield - yield - return - - # Store metadata as a dict in context (not JSON string) - # This allows span_processor to distribute keys as individual attributes - new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs) - token = otel_context_api.attach(new_context) - - # Set attributes on currently active span if exists - current_span = otel_trace_api.get_current_span() - if current_span is not None and current_span.is_recording(): - for key, value in kwargs.items(): - attr_key = f"langfuse.metadata.{key}" - # Convert value to appropriate type for span attribute - if isinstance(value, (str, int, float, bool)): - attr_value = value - else: - # For complex types, convert to JSON string - attr_value = json.dumps(value) - current_span.set_attribute(attr_key, attr_value) - - # Set baggage if requested - baggage_tokens = [] - if as_baggage: - # Start with None context and chain baggage settings - new_baggage = None - - # Add each metadata key-value pair to baggage - for key, value in kwargs.items(): - # Convert value to string and truncate if needed for baggage - str_value = str(value) - baggage_key = f"langfuse.metadata.{key}" - new_baggage = otel_baggage_api.set_baggage( - baggage_key, str_value, new_baggage - ) - - # Attach the new baggage context - if new_baggage is not None: - baggage_token = otel_context_api.attach(new_baggage) - baggage_tokens.append(baggage_token) - - try: - yield - finally: - # Always detach context token - otel_context_api.detach(token) - - # Detach all baggage tokens if they were set - for baggage_token in baggage_tokens: - otel_context_api.detach(baggage_token) diff --git a/langfuse/_client/context_propagation.py b/langfuse/_client/context_propagation.py new file mode 100644 index 000000000..095f11ae3 --- /dev/null +++ b/langfuse/_client/context_propagation.py @@ -0,0 +1,244 @@ +"""Context propagation utilities for Langfuse tracing. + +This module provides a mixin class that enables automatic propagation of trace +attributes (session_id, user_id, metadata) from parent contexts to child spans +using OpenTelemetry's context and baggage mechanisms. + +The mixin is shared between the main Langfuse client and span classes to provide +consistent context propagation behavior across the SDK. +""" + +import json +from typing import Any, Generator + +from opentelemetry import ( + baggage as otel_baggage_api, + context as otel_context_api, + trace as otel_trace_api, +) +from opentelemetry.util._decorator import _agnosticcontextmanager + +# Context key constants for Langfuse context propagation +LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" +LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" +LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" + + +class LangfuseContextPropagationMixin: + """Mixin providing context managers for automatic trace attribute propagation. + + This mixin adds three context managers (session, user, metadata) that enable + automatic propagation of trace attributes to all child spans created within + their scope. The propagation works through OpenTelemetry's context mechanism + for local (same-service) propagation, with optional baggage for cross-service + propagation. + + Classes that inherit this mixin gain the ability to create contexts where + certain attributes are automatically applied to all spans without manual + specification. + """ + + @_agnosticcontextmanager + def session( + self, id: str, *, as_baggage: bool = False + ) -> Generator[None, None, None]: + """Create a session context manager that propagates session_id to all child spans. + + Args: + id (str): The session identifier to propagate to child spans. + as_baggage (bool, optional): If True, stores the session_id in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + + Returns: + Context manager that sets session_id on all spans created within its scope. + + Warning: + When as_baggage=True, the session_id will be included in HTTP headers of any + outbound requests made within this context. Only use this for non-sensitive + identifiers that are safe to transmit across service boundaries. + + Example: + ```python + # Local context only (default) + with langfuse.session(id="session_123"): + with langfuse.start_as_current_span(name="process-request") as span: + # This span and all its children will have session_id="session_123" + child_span = langfuse.start_span(name="child-operation") + + # Cross-service propagation (use with caution) + with langfuse.session(id="session_123", as_baggage=True): + # session_id will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + # Set context variable + new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id) + token = otel_context_api.attach(new_context) + + # Set attribute on currently active span if exists + current_span = otel_trace_api.get_current_span() + if current_span is not None and current_span.is_recording(): + current_span.set_attribute("session.id", id) + + # Set baggage if requested + baggage_token = None + if as_baggage: + new_baggage = otel_baggage_api.set_baggage("session.id", id) + baggage_token = otel_context_api.attach(new_baggage) + + try: + yield + finally: + # Always detach context token + otel_context_api.detach(token) + + # Detach baggage token if it was set + if baggage_token is not None: + otel_context_api.detach(baggage_token) + + @_agnosticcontextmanager + def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, None]: + """Create a user context manager that propagates user_id to all child spans. + + Args: + id (str): The user identifier to propagate to child spans. + as_baggage (bool, optional): If True, stores the user_id in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + + Returns: + Context manager that sets user_id on all spans created within its scope. + + Warning: + When as_baggage=True, the user_id will be included in HTTP headers of any + outbound requests made within this context. This may leak sensitive user + information to external services. Use with extreme caution. + + Example: + ```python + # Local context only (default, recommended for user IDs) + with langfuse.user(id="user_456"): + with langfuse.start_as_current_span(name="user-action") as span: + # This span and all its children will have user_id="user_456" + pass + + # Cross-service propagation (NOT recommended for sensitive user IDs) + with langfuse.user(id="public_user_456", as_baggage=True): + # user_id will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + # Set context variable + new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id) + token = otel_context_api.attach(new_context) + + # Set attribute on currently active span if exists + current_span = otel_trace_api.get_current_span() + if current_span is not None and current_span.is_recording(): + current_span.set_attribute("user.id", id) + + # Set baggage if requested + baggage_token = None + if as_baggage: + new_baggage = otel_baggage_api.set_baggage("user.id", id) + baggage_token = otel_context_api.attach(new_baggage) + + try: + yield + finally: + # Always detach context token + otel_context_api.detach(token) + + # Detach baggage token if it was set + if baggage_token is not None: + otel_context_api.detach(baggage_token) + + @_agnosticcontextmanager + def metadata( + self, *, as_baggage: bool = False, **kwargs: Any + ) -> Generator[None, None, None]: + """Create a metadata context manager that propagates metadata to all child spans. + + Args: + as_baggage (bool, optional): If True, stores the metadata in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + **kwargs: Metadata key-value pairs. Values should not exceed 200 characters. + + Returns: + Context manager that sets metadata on all spans created within its scope. + + Warning: + When as_baggage=True, all metadata key-value pairs will be included in HTTP + headers of any outbound requests made within this context. Ensure no sensitive + information is included in the metadata when using cross-service propagation. + + Example: + ```python + # Local context only (default) + with langfuse.metadata(experiment="A/B", version="1.2.3"): + with langfuse.start_as_current_span(name="experiment-run") as span: + # This span and all its children will have the metadata + pass + + # Cross-service propagation (use with caution) + with langfuse.metadata(as_baggage=True, experiment="A/B", service="api"): + # metadata will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + if not kwargs: + # No metadata to set, just yield + yield + return + + # Store metadata as a dict in context (not JSON string) + # This allows span_processor to distribute keys as individual attributes + new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs) + token = otel_context_api.attach(new_context) + + # Set attributes on currently active span if exists + current_span = otel_trace_api.get_current_span() + if current_span is not None and current_span.is_recording(): + for key, value in kwargs.items(): + attr_key = f"langfuse.metadata.{key}" + # Convert value to appropriate type for span attribute + if isinstance(value, (str, int, float, bool)): + attr_value = value + else: + # For complex types, convert to JSON string + attr_value = json.dumps(value) + current_span.set_attribute(attr_key, attr_value) + + # Set baggage if requested + baggage_token = None + if as_baggage: + # Start with None context and chain baggage settings + new_baggage = None + + # Add each metadata key-value pair to baggage + for key, value in kwargs.items(): + # Convert value to string and truncate if needed for baggage + str_value = str(value) + if len(str_value) > 200: + str_value = str_value[:200] + + baggage_key = f"metadata.{key}" + new_baggage = otel_baggage_api.set_baggage( + baggage_key, str_value, new_baggage + ) + + # Attach the new baggage context + if new_baggage is not None: + baggage_token = otel_context_api.attach(new_baggage) + + try: + yield + finally: + # Always detach context token + otel_context_api.detach(token) + + # Detach all baggage tokens if they were set + if baggage_token is not None: + otel_context_api.detach(baggage_token) diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 66beb1842..1c7b7473a 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -16,12 +16,10 @@ from datetime import datetime from time import time_ns import warnings -import json from typing import ( TYPE_CHECKING, Any, Dict, - Generator, List, Literal, Optional, @@ -32,13 +30,10 @@ ) from opentelemetry import ( - baggage as otel_baggage_api, - context as otel_context_api, trace as otel_trace_api, ) from opentelemetry.util._decorator import ( _AgnosticContextManager, - _agnosticcontextmanager, ) from langfuse.model import PromptClient @@ -61,19 +56,15 @@ ) from langfuse.logger import langfuse_logger from langfuse.types import MapValue, ScoreDataType, SpanLevel +from langfuse._client.context_propagation import LangfuseContextPropagationMixin # Factory mapping for observation classes # Note: "event" is handled separately due to special instantiation logic # Populated after class definitions _OBSERVATION_CLASS_MAP: Dict[str, Type["LangfuseObservationWrapper"]] = {} -# Context key constants for Langfuse context propagation -LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" -LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" -LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" - -class LangfuseObservationWrapper: +class LangfuseObservationWrapper(LangfuseContextPropagationMixin): """Abstract base class for all Langfuse span types. This class provides common functionality for all Langfuse span types, including @@ -1132,206 +1123,6 @@ def start_as_current_observation( # type: ignore[misc] prompt=prompt, ) - @_agnosticcontextmanager - def session( - self, id: str, *, as_baggage: bool = False - ) -> Generator[None, None, None]: - """Create a session context manager that propagates session_id to all child spans. - - Args: - id (str): The session identifier to propagate to child spans. - as_baggage (bool, optional): If True, stores the session_id in OpenTelemetry baggage - for cross-service propagation. If False, stores only in local context for - current-service propagation. Defaults to False. - - Returns: - Context manager that sets session_id on all spans created within its scope. - - Warning: - When as_baggage=True, the session_id will be included in HTTP headers of any - outbound requests made within this context. Only use this for non-sensitive - identifiers that are safe to transmit across service boundaries. - - Example: - ```python - # Local context only (default) - with span.session(id="session_123"): - child_span = span.start_span(name="child-operation") - # child_span will have session_id="session_123" - - # Cross-service propagation (use with caution) - with span.session(id="session_123", as_baggage=True): - # session_id will be propagated to external service calls - response = requests.get("https://api.example.com/data") - ``` - """ - # Set context variable - new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id) - token = otel_context_api.attach(new_context) - - # Set attribute on currently active span if exists - current_span = otel_trace_api.get_current_span() - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("session.id", id) - - # Set baggage if requested - baggage_token = None - if as_baggage: - new_baggage = otel_baggage_api.set_baggage("session.id", id) - baggage_token = otel_context_api.attach(new_baggage) - - try: - yield - finally: - # Always detach context token - otel_context_api.detach(token) - - # Detach baggage token if it was set - if baggage_token is not None: - otel_context_api.detach(baggage_token) - - @_agnosticcontextmanager - def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, None]: - """Create a user context manager that propagates user_id to all child spans. - - Args: - id (str): The user identifier to propagate to child spans. - as_baggage (bool, optional): If True, stores the user_id in OpenTelemetry baggage - for cross-service propagation. If False, stores only in local context for - current-service propagation. Defaults to False. - - Returns: - Context manager that sets user_id on all spans created within its scope. - - Warning: - When as_baggage=True, the user_id will be included in HTTP headers of any - outbound requests made within this context. This may leak sensitive user - information to external services. Use with extreme caution. - - Example: - ```python - # Local context only (default, recommended for user IDs) - with span.user(id="user_456"): - child_span = span.start_span(name="child-operation") - # child_span will have user_id="user_456" - - # Cross-service propagation (NOT recommended for sensitive user IDs) - with span.user(id="public_user_456", as_baggage=True): - # user_id will be propagated to external service calls - response = requests.get("https://api.example.com/data") - ``` - """ - # Set context variable - new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id) - token = otel_context_api.attach(new_context) - - # Set attribute on currently active span if exists - current_span = otel_trace_api.get_current_span() - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("user.id", id) - - # Set baggage if requested - baggage_token = None - if as_baggage: - new_baggage = otel_baggage_api.set_baggage("user.id", id) - baggage_token = otel_context_api.attach(new_baggage) - - try: - yield - finally: - # Always detach context token - otel_context_api.detach(token) - - # Detach baggage token if it was set - if baggage_token is not None: - otel_context_api.detach(baggage_token) - - @_agnosticcontextmanager - def metadata( - self, *, as_baggage: bool = False, **kwargs: Any - ) -> Generator[None, None, None]: - """Create a metadata context manager that propagates metadata to all child spans. - - Args: - as_baggage (bool, optional): If True, stores the metadata in OpenTelemetry baggage - for cross-service propagation. If False, stores only in local context for - current-service propagation. Defaults to False. - **kwargs: Metadata key-value pairs. Values should not exceed 200 characters. - - Returns: - Context manager that sets metadata on all spans created within its scope. - - Warning: - When as_baggage=True, all metadata key-value pairs will be included in HTTP - headers of any outbound requests made within this context. Ensure no sensitive - information is included in the metadata when using cross-service propagation. - - Example: - ```python - # Local context only (default) - with span.metadata(experiment="A/B", version="1.2.3"): - child_span = span.start_span(name="child-operation") - # child_span will have the metadata - - # Cross-service propagation (use with caution) - with span.metadata(as_baggage=True, experiment="A/B", service="api"): - # metadata will be propagated to external service calls - response = requests.get("https://api.example.com/data") - ``` - """ - if not kwargs: - # No metadata to set, just yield - yield - return - - # Store metadata as a dict in context (not JSON string) - # This allows span_processor to distribute keys as individual attributes - new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs) - token = otel_context_api.attach(new_context) - - # Set attributes on currently active span if exists - current_span = otel_trace_api.get_current_span() - if current_span is not None and current_span.is_recording(): - for key, value in kwargs.items(): - attr_key = f"langfuse.metadata.{key}" - # Convert value to appropriate type for span attribute - if isinstance(value, (str, int, float, bool)): - attr_value = value - else: - # For complex types, convert to JSON string - attr_value = json.dumps(value) - current_span.set_attribute(attr_key, attr_value) - - # Set baggage if requested - baggage_tokens = [] - if as_baggage: - # Start with None context and chain baggage settings - new_baggage = None - - # Add each metadata key-value pair to baggage - for key, value in kwargs.items(): - # Convert value to string and truncate if needed for baggage - str_value = str(value) - baggage_key = f"langfuse.metadata.{key}" - new_baggage = otel_baggage_api.set_baggage( - baggage_key, str_value, new_baggage - ) - - # Attach the new baggage context - if new_baggage is not None: - baggage_token = otel_context_api.attach(new_baggage) - baggage_tokens.append(baggage_token) - - try: - yield - finally: - # Always detach context token - otel_context_api.detach(token) - - # Detach all baggage tokens if they were set - for baggage_token in baggage_tokens: - otel_context_api.detach(baggage_token) - class LangfuseSpan(LangfuseObservationWrapper): """Standard span implementation for general operations in Langfuse. diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index 76db4f8b7..fc595a086 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -31,11 +31,11 @@ from langfuse._client.utils import span_formatter from langfuse.logger import langfuse_logger from langfuse.version import __version__ as langfuse_version - -# Context key constants for Langfuse context propagation -LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" -LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" -LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" +from langfuse._client.context_propagation import ( + LANGFUSE_CTX_USER_ID, + LANGFUSE_CTX_SESSION_ID, + LANGFUSE_CTX_METADATA, +) class LangfuseSpanProcessor(BatchSpanProcessor): From 287da48c77eda1c57623e7a58eaf5c2b67cc664e Mon Sep 17 00:00:00 2001 From: steffen911 Date: Wed, 1 Oct 2025 15:30:47 +0200 Subject: [PATCH 11/15] chore: skip the langfuse project span check --- langfuse/_client/span_processor.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index fc595a086..206e418ba 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -134,13 +134,6 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None span: The span that is starting parent_context: The context when the span was created (optional) """ - if self._is_langfuse_span(span) and not self._is_langfuse_project_span(span): - langfuse_logger.debug( - f"Security: Span rejected - belongs to project '{span.instrumentation_scope.attributes.get('public_key') if span.instrumentation_scope and span.instrumentation_scope.attributes else None}' but processor is for '{self.public_key}'. " - f"This prevents cross-project data leakage in multi-project environments." - ) - return super().on_start(span, parent_context) - # Get the current context (use parent_context if available, otherwise current) current_context = parent_context or context_api.get_current() From a303b701202ffe4431ce4f1471d6adea1f54c066 Mon Sep 17 00:00:00 2001 From: steffen911 Date: Thu, 2 Oct 2025 09:13:51 +0200 Subject: [PATCH 12/15] chore: feedback --- langfuse/_client/client.py | 116 ++++++++++- langfuse/_client/context_propagation.py | 244 ------------------------ langfuse/_client/span.py | 5 +- langfuse/_client/span_processor.py | 2 +- tests/test_core_sdk.py | 36 ---- 5 files changed, 116 insertions(+), 287 deletions(-) delete mode 100644 langfuse/_client/context_propagation.py diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index b7f768d90..6b832f9cf 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -23,12 +23,15 @@ Union, cast, overload, + Generator, ) import backoff import httpx from opentelemetry import ( + baggage as otel_baggage_api, trace as otel_trace_api, + context as otel_context_api, ) from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.id_generator import RandomIdGenerator @@ -111,10 +114,15 @@ TextPromptClient, ) from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext -from langfuse._client.context_propagation import LangfuseContextPropagationMixin -class Langfuse(LangfuseContextPropagationMixin): +# Context key constants for Langfuse context propagation +LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" +LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" +LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" + + +class Langfuse: """Main client for Langfuse tracing and platform features. This class provides an interface for creating and managing traces, spans, @@ -355,6 +363,108 @@ def start_span( status_message=status_message, ) + @_agnosticcontextmanager + def with_attributes( + self, + session_id: Optional[str] = None, + user_id: Optional[str] = None, + metadata: Optional[dict[str, str]] = None, + as_baggage: bool = False, + ) -> Generator[None, None, None]: + """Creates a context manager that propagates the given attributes to all spans created within the context. + + Args: + session_id (str): Session identifier. + user_id (str): User identifier. + metadata (dict): Additional metadata to associate with all spans in the context. Values must be strings and are truncated to 200 characters. + as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage + for cross-service propagation. If False, stores only in local context for + current-service propagation. Defaults to False. + + Returns: + Context manager that sets values on all spans created within its scope. + + Warning: + When as_baggage=True, the values will be included in HTTP headers of any + outbound requests made within this context. Only use this for non-sensitive + identifiers that are safe to transmit across service boundaries. + + Example: + ```python + # Local context only (default) + with langfuse.with_attributes(session_id="session_123"): + with langfuse.start_as_current_span(name="process-request") as span: + # This span and all its children will have session_id="session_123" + child_span = langfuse.start_span(name="child-operation") + + # Cross-service propagation (use with caution) + with langfuse.with_attributes(session_id="session_123", as_baggage=True): + # session_id will be propagated to external service calls + response = requests.get("https://api.example.com/data") + ``` + """ + current_context = otel_context_api.get_current() + current_span = otel_trace_api.get_current_span() + + # Process session_id + if session_id is not None: + current_context = otel_context_api.set_value( + LANGFUSE_CTX_SESSION_ID, session_id, current_context + ) + if current_span is not None and current_span.is_recording(): + current_span.set_attribute("session.id", session_id) + if as_baggage: + current_context = otel_baggage_api.set_baggage( + "session.id", session_id, current_context + ) + + # Process user_id + if user_id is not None: + current_context = otel_context_api.set_value( + LANGFUSE_CTX_USER_ID, user_id, current_context + ) + if current_span is not None and current_span.is_recording(): + current_span.set_attribute("user.id", user_id) + if as_baggage: + current_context = otel_baggage_api.set_baggage( + "user.id", user_id, current_context + ) + + # Process metadata + if metadata is not None: + # Truncate values with size > 200 to 200 characters and emit warning including the ky + for k, v in metadata.items(): + if not isinstance(v, str): + # Ignore unreachable mypy warning as this runtime guard should make sense either way + warnings.warn( # type: ignore[unreachable] + f"Metadata values must be strings, got {type(v)} for key '{k}'" + ) + del metadata[k] + if len(v) > 200: + warnings.warn( + f"Metadata value for key '{k}' exceeds 200 characters and will be truncated." + ) + metadata[k] = v[:200] + + current_context = otel_context_api.set_value( + LANGFUSE_CTX_METADATA, metadata, current_context + ) + if current_span is not None and current_span.is_recording(): + for k, v in metadata.items(): + current_span.set_attribute(f"langfuse.metadata.{k}", v) + if as_baggage: + for k, v in metadata.items(): + current_context = otel_baggage_api.set_baggage( + f"langfuse.metadata.{k}", str(v), current_context + ) + + # Activate context, execute, and detach context + token = otel_context_api.attach(current_context) + try: + yield + finally: + otel_context_api.detach(token) + def start_as_current_span( self, *, @@ -1673,7 +1783,7 @@ def update_current_trace( ``` """ warnings.warn( - "update_current_trace is deprecated and will be removed in a future version. ", + "update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ", DeprecationWarning, stacklevel=2, ) diff --git a/langfuse/_client/context_propagation.py b/langfuse/_client/context_propagation.py deleted file mode 100644 index 095f11ae3..000000000 --- a/langfuse/_client/context_propagation.py +++ /dev/null @@ -1,244 +0,0 @@ -"""Context propagation utilities for Langfuse tracing. - -This module provides a mixin class that enables automatic propagation of trace -attributes (session_id, user_id, metadata) from parent contexts to child spans -using OpenTelemetry's context and baggage mechanisms. - -The mixin is shared between the main Langfuse client and span classes to provide -consistent context propagation behavior across the SDK. -""" - -import json -from typing import Any, Generator - -from opentelemetry import ( - baggage as otel_baggage_api, - context as otel_context_api, - trace as otel_trace_api, -) -from opentelemetry.util._decorator import _agnosticcontextmanager - -# Context key constants for Langfuse context propagation -LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" -LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" -LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" - - -class LangfuseContextPropagationMixin: - """Mixin providing context managers for automatic trace attribute propagation. - - This mixin adds three context managers (session, user, metadata) that enable - automatic propagation of trace attributes to all child spans created within - their scope. The propagation works through OpenTelemetry's context mechanism - for local (same-service) propagation, with optional baggage for cross-service - propagation. - - Classes that inherit this mixin gain the ability to create contexts where - certain attributes are automatically applied to all spans without manual - specification. - """ - - @_agnosticcontextmanager - def session( - self, id: str, *, as_baggage: bool = False - ) -> Generator[None, None, None]: - """Create a session context manager that propagates session_id to all child spans. - - Args: - id (str): The session identifier to propagate to child spans. - as_baggage (bool, optional): If True, stores the session_id in OpenTelemetry baggage - for cross-service propagation. If False, stores only in local context for - current-service propagation. Defaults to False. - - Returns: - Context manager that sets session_id on all spans created within its scope. - - Warning: - When as_baggage=True, the session_id will be included in HTTP headers of any - outbound requests made within this context. Only use this for non-sensitive - identifiers that are safe to transmit across service boundaries. - - Example: - ```python - # Local context only (default) - with langfuse.session(id="session_123"): - with langfuse.start_as_current_span(name="process-request") as span: - # This span and all its children will have session_id="session_123" - child_span = langfuse.start_span(name="child-operation") - - # Cross-service propagation (use with caution) - with langfuse.session(id="session_123", as_baggage=True): - # session_id will be propagated to external service calls - response = requests.get("https://api.example.com/data") - ``` - """ - # Set context variable - new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id) - token = otel_context_api.attach(new_context) - - # Set attribute on currently active span if exists - current_span = otel_trace_api.get_current_span() - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("session.id", id) - - # Set baggage if requested - baggage_token = None - if as_baggage: - new_baggage = otel_baggage_api.set_baggage("session.id", id) - baggage_token = otel_context_api.attach(new_baggage) - - try: - yield - finally: - # Always detach context token - otel_context_api.detach(token) - - # Detach baggage token if it was set - if baggage_token is not None: - otel_context_api.detach(baggage_token) - - @_agnosticcontextmanager - def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, None]: - """Create a user context manager that propagates user_id to all child spans. - - Args: - id (str): The user identifier to propagate to child spans. - as_baggage (bool, optional): If True, stores the user_id in OpenTelemetry baggage - for cross-service propagation. If False, stores only in local context for - current-service propagation. Defaults to False. - - Returns: - Context manager that sets user_id on all spans created within its scope. - - Warning: - When as_baggage=True, the user_id will be included in HTTP headers of any - outbound requests made within this context. This may leak sensitive user - information to external services. Use with extreme caution. - - Example: - ```python - # Local context only (default, recommended for user IDs) - with langfuse.user(id="user_456"): - with langfuse.start_as_current_span(name="user-action") as span: - # This span and all its children will have user_id="user_456" - pass - - # Cross-service propagation (NOT recommended for sensitive user IDs) - with langfuse.user(id="public_user_456", as_baggage=True): - # user_id will be propagated to external service calls - response = requests.get("https://api.example.com/data") - ``` - """ - # Set context variable - new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id) - token = otel_context_api.attach(new_context) - - # Set attribute on currently active span if exists - current_span = otel_trace_api.get_current_span() - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("user.id", id) - - # Set baggage if requested - baggage_token = None - if as_baggage: - new_baggage = otel_baggage_api.set_baggage("user.id", id) - baggage_token = otel_context_api.attach(new_baggage) - - try: - yield - finally: - # Always detach context token - otel_context_api.detach(token) - - # Detach baggage token if it was set - if baggage_token is not None: - otel_context_api.detach(baggage_token) - - @_agnosticcontextmanager - def metadata( - self, *, as_baggage: bool = False, **kwargs: Any - ) -> Generator[None, None, None]: - """Create a metadata context manager that propagates metadata to all child spans. - - Args: - as_baggage (bool, optional): If True, stores the metadata in OpenTelemetry baggage - for cross-service propagation. If False, stores only in local context for - current-service propagation. Defaults to False. - **kwargs: Metadata key-value pairs. Values should not exceed 200 characters. - - Returns: - Context manager that sets metadata on all spans created within its scope. - - Warning: - When as_baggage=True, all metadata key-value pairs will be included in HTTP - headers of any outbound requests made within this context. Ensure no sensitive - information is included in the metadata when using cross-service propagation. - - Example: - ```python - # Local context only (default) - with langfuse.metadata(experiment="A/B", version="1.2.3"): - with langfuse.start_as_current_span(name="experiment-run") as span: - # This span and all its children will have the metadata - pass - - # Cross-service propagation (use with caution) - with langfuse.metadata(as_baggage=True, experiment="A/B", service="api"): - # metadata will be propagated to external service calls - response = requests.get("https://api.example.com/data") - ``` - """ - if not kwargs: - # No metadata to set, just yield - yield - return - - # Store metadata as a dict in context (not JSON string) - # This allows span_processor to distribute keys as individual attributes - new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs) - token = otel_context_api.attach(new_context) - - # Set attributes on currently active span if exists - current_span = otel_trace_api.get_current_span() - if current_span is not None and current_span.is_recording(): - for key, value in kwargs.items(): - attr_key = f"langfuse.metadata.{key}" - # Convert value to appropriate type for span attribute - if isinstance(value, (str, int, float, bool)): - attr_value = value - else: - # For complex types, convert to JSON string - attr_value = json.dumps(value) - current_span.set_attribute(attr_key, attr_value) - - # Set baggage if requested - baggage_token = None - if as_baggage: - # Start with None context and chain baggage settings - new_baggage = None - - # Add each metadata key-value pair to baggage - for key, value in kwargs.items(): - # Convert value to string and truncate if needed for baggage - str_value = str(value) - if len(str_value) > 200: - str_value = str_value[:200] - - baggage_key = f"metadata.{key}" - new_baggage = otel_baggage_api.set_baggage( - baggage_key, str_value, new_baggage - ) - - # Attach the new baggage context - if new_baggage is not None: - baggage_token = otel_context_api.attach(new_baggage) - - try: - yield - finally: - # Always detach context token - otel_context_api.detach(token) - - # Detach all baggage tokens if they were set - if baggage_token is not None: - otel_context_api.detach(baggage_token) diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 1c7b7473a..e495e701b 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -56,7 +56,6 @@ ) from langfuse.logger import langfuse_logger from langfuse.types import MapValue, ScoreDataType, SpanLevel -from langfuse._client.context_propagation import LangfuseContextPropagationMixin # Factory mapping for observation classes # Note: "event" is handled separately due to special instantiation logic @@ -64,7 +63,7 @@ _OBSERVATION_CLASS_MAP: Dict[str, Type["LangfuseObservationWrapper"]] = {} -class LangfuseObservationWrapper(LangfuseContextPropagationMixin): +class LangfuseObservationWrapper: """Abstract base class for all Langfuse span types. This class provides common functionality for all Langfuse span types, including @@ -239,7 +238,7 @@ def update_trace( public: Whether the trace should be publicly accessible """ warnings.warn( - "update_trace is deprecated and will be removed in a future version. ", + "update_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ", DeprecationWarning, stacklevel=2, ) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index 206e418ba..c64d7eb13 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -31,7 +31,7 @@ from langfuse._client.utils import span_formatter from langfuse.logger import langfuse_logger from langfuse.version import __version__ as langfuse_version -from langfuse._client.context_propagation import ( +from langfuse._client.client import ( LANGFUSE_CTX_USER_ID, LANGFUSE_CTX_SESSION_ID, LANGFUSE_CTX_METADATA, diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index aa15457fb..317ea83eb 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -2257,39 +2257,3 @@ def test_context_manager_baggage_propagation(): assert ( obs.metadata["version"] == "v1.0" ), f"Observation {obs.name} missing version metadata" - - -def test_span_context_managers(): - """Test context managers called on span instances.""" - langfuse = Langfuse() - - with langfuse.start_as_current_span(name="parent-span") as parent_span: - trace_id = parent_span.trace_id - - # Use context managers on the span instance - with parent_span.user(id="span_user_123"): - child1 = parent_span.start_span(name="user-context-child") - child1.end() - - with parent_span.session(id="span_session_456"): - child2 = parent_span.start_span(name="session-context-child") - child2.end() - - with parent_span.metadata(component="auth", action="validate"): - child3 = parent_span.start_span(name="metadata-context-child") - child3.end() - - langfuse.flush() - sleep(2) - - # Verify trace was created with child observations - trace = get_api().trace.get(trace_id) - child_names = [ - obs.name for obs in trace.observations if obs.name.endswith("-child") - ] - expected_children = [ - "user-context-child", - "session-context-child", - "metadata-context-child", - ] - assert all(child in child_names for child in expected_children) From 8b09a9b1cc6e8b35f014993d14fec95331fc1f01 Mon Sep 17 00:00:00 2001 From: steffen911 Date: Thu, 2 Oct 2025 09:23:07 +0200 Subject: [PATCH 13/15] chore: cleanup --- langfuse/_client/span.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index e495e701b..e02707dc2 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -29,12 +29,8 @@ overload, ) -from opentelemetry import ( - trace as otel_trace_api, -) -from opentelemetry.util._decorator import ( - _AgnosticContextManager, -) +from opentelemetry import trace as otel_trace_api +from opentelemetry.util._decorator import _AgnosticContextManager from langfuse.model import PromptClient From d210b493f04dc1529ab252dbde83439dbff7e3cc Mon Sep 17 00:00:00 2001 From: steffen911 Date: Thu, 2 Oct 2025 09:48:52 +0200 Subject: [PATCH 14/15] chore: update test cases and cleanup --- langfuse/_client/client.py | 9 ++--- langfuse/_client/constants.py | 5 +++ langfuse/_client/span_processor.py | 62 ++++++++++++------------------ tests/test_core_sdk.py | 37 ++++++++++-------- 4 files changed, 55 insertions(+), 58 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 6b832f9cf..fa4196b34 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -48,6 +48,9 @@ ObservationTypeLiteralNoEvent, ObservationTypeSpanLike, get_observation_types_list, + LANGFUSE_CTX_USER_ID, + LANGFUSE_CTX_SESSION_ID, + LANGFUSE_CTX_METADATA, ) from langfuse._client.datasets import DatasetClient, DatasetItemClient from langfuse._client.environment_variables import ( @@ -116,12 +119,6 @@ from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext -# Context key constants for Langfuse context propagation -LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" -LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" -LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" - - class Langfuse: """Main client for Langfuse tracing and platform features. diff --git a/langfuse/_client/constants.py b/langfuse/_client/constants.py index b699480c0..2a1297de6 100644 --- a/langfuse/_client/constants.py +++ b/langfuse/_client/constants.py @@ -8,6 +8,11 @@ LANGFUSE_TRACER_NAME = "langfuse-sdk" +# Context key constants for Langfuse context propagation +LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" +LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" +LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" + """Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat""" ObservationTypeGenerationLike: TypeAlias = Literal[ diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index c64d7eb13..2433c0f21 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -22,7 +22,12 @@ from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import BatchSpanProcessor -from langfuse._client.constants import LANGFUSE_TRACER_NAME +from langfuse._client.constants import ( + LANGFUSE_TRACER_NAME, + LANGFUSE_CTX_USER_ID, + LANGFUSE_CTX_SESSION_ID, + LANGFUSE_CTX_METADATA, +) from langfuse._client.environment_variables import ( LANGFUSE_FLUSH_AT, LANGFUSE_FLUSH_INTERVAL, @@ -31,11 +36,6 @@ from langfuse._client.utils import span_formatter from langfuse.logger import langfuse_logger from langfuse.version import __version__ as langfuse_version -from langfuse._client.client import ( - LANGFUSE_CTX_USER_ID, - LANGFUSE_CTX_SESSION_ID, - LANGFUSE_CTX_METADATA, -) class LangfuseSpanProcessor(BatchSpanProcessor): @@ -143,10 +143,11 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None # 1. Propagate all baggage keys as span attributes baggage_entries = baggage.get_all(context=current_context) for key, value in baggage_entries.items(): - # Check if this baggage entry is already present as a span attribute - if not hasattr(span.attributes, key) or ( - span.attributes is not None and span.attributes.get(key) != value - ): + # Only propagate user.id, session.id and langfuse.metadata.* as those are set by us on the baggage + if key.startswith("langfuse.metadata.") or key in [ + "user.id", + "session.id", + ]: propagated_attributes[key] = value langfuse_logger.debug( f"Propagated baggage key '{key}' = '{value}' to span '{span.name}'" @@ -160,16 +161,10 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None if value is not None: # Convert context key to span attribute name (remove langfuse.ctx. prefix) attr_key = ctx_key.replace("langfuse.ctx.", "") - - # Only propagate if not already set on span - if not hasattr(span.attributes, attr_key) or ( - span.attributes is not None - and span.attributes.get(attr_key) != value - ): - propagated_attributes[attr_key] = value - langfuse_logger.debug( - f"Propagated context key '{ctx_key}' = '{value}' to span '{span.name}'" - ) + propagated_attributes[attr_key] = value + langfuse_logger.debug( + f"Propagated context key '{ctx_key}' = '{value}' to span '{span.name}'" + ) except Exception as e: langfuse_logger.debug(f"Could not read context key '{ctx_key}': {e}") @@ -184,24 +179,17 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None for key, value in metadata_dict.items(): attr_key = f"langfuse.metadata.{key}" - # Convert value to appropriate type for span attribute - if isinstance(value, (str, int, float, bool)): - attr_value = value - else: - # For complex types, convert to JSON string - attr_value = json.dumps(value) - - # Only propagate if not already set or different - existing_value = ( - span.attributes.get(attr_key) - if hasattr(span, "attributes") and span.attributes is not None - else None + # Convert value to appropriate type for span attribute (naive or json stringify) + attr_value = ( + value + if isinstance(value, (str, int, float, bool)) + else json.dumps(value) + ) + + propagated_attributes[attr_key] = attr_value + langfuse_logger.debug( + f"Propagated metadata key '{key}' = '{attr_value}' to span '{span.name}'" ) - if existing_value != attr_value: - propagated_attributes[attr_key] = attr_value - langfuse_logger.debug( - f"Propagated metadata key '{key}' = '{attr_value}' to span '{span.name}'" - ) except Exception as e: langfuse_logger.debug(f"Could not read metadata from context: {e}") diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index 317ea83eb..d5af127c6 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -2071,7 +2071,7 @@ def test_context_manager_user_propagation(): user_id = "test_user_123" with langfuse.start_as_current_span(name="parent-span") as parent_span: - with langfuse.user(id=user_id): + with langfuse.with_attributes(user_id=user_id): trace_id = parent_span.trace_id # Create child spans that should inherit user_id @@ -2107,7 +2107,7 @@ def test_context_manager_session_propagation(): session_id = "test_session_456" with langfuse.start_as_current_span(name="parent-span") as parent_span: - with langfuse.session(id=session_id): + with langfuse.with_attributes(session_id=session_id): trace_id = parent_span.trace_id # Create child spans that should inherit session_id @@ -2142,8 +2142,12 @@ def test_context_manager_metadata_propagation(): langfuse = Langfuse() with langfuse.start_as_current_span(name="parent-span") as parent_span: - with langfuse.metadata( - experiment="A/B", version="1.2.3", feature_flag="enabled" + with langfuse.with_attributes( + metadata={ + "experiment": "A/B", + "version": "1.2.3", + "feature_flag": "enabled", + } ): trace_id = parent_span.trace_id @@ -2185,17 +2189,18 @@ def test_context_manager_nested_contexts(): langfuse = Langfuse() with langfuse.start_as_current_span(name="outer-span") as outer_span: - with langfuse.user(id="user_1"): - with langfuse.session(id="session_1"): - with langfuse.metadata(env="prod", region="us-east"): - outer_trace_id = outer_span.trace_id + with langfuse.with_attributes(user_id="user_1", session_id="session_1"): + with langfuse.with_attributes( + metadata={"env": "prod", "region": "us-east"} + ): + outer_trace_id = outer_span.trace_id - # Create span in outer context - outer_child = langfuse.start_span(name="outer-child") - outer_child.end() + # Create span in outer context + outer_child = langfuse.start_span(name="outer-child") + outer_child.end() - nested_span = langfuse.start_span(name="nested-span") - nested_span.end() + nested_span = langfuse.start_span(name="nested-span") + nested_span.end() langfuse.flush() sleep(2) @@ -2229,8 +2234,10 @@ def test_context_manager_baggage_propagation(): # Test with baggage enabled (careful with sensitive data) with langfuse.start_as_current_span(name="service-span") as span: - with langfuse.session(id="public_session_789", as_baggage=True): - with langfuse.metadata(as_baggage=True, service="api", version="v1.0"): + with langfuse.with_attributes(session_id="public_session_789", as_baggage=True): + with langfuse.with_attributes( + as_baggage=True, metadata={"service": "api", "version": "v1.0"} + ): trace_id = span.trace_id # Create child spans that inherit baggage context From df6b82d197d8d8e9a6ff975aa69ed43e2cd3b556 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Wed, 8 Oct 2025 11:12:44 +0200 Subject: [PATCH 15/15] feat: propagate trace attributes onto all child spans on update (#1396) --- langfuse/_client/attributes.py | 1 - langfuse/_client/client.py | 114 ++++++++++++----------------- langfuse/_client/constants.py | 8 +- langfuse/_client/span.py | 8 +- langfuse/_client/span_processor.py | 114 ++++++++++------------------- langfuse/_client/utils.py | 15 ++++ tests/test_core_sdk.py | 24 +++--- 7 files changed, 120 insertions(+), 164 deletions(-) diff --git a/langfuse/_client/attributes.py b/langfuse/_client/attributes.py index 5ae81000c..75c5645ea 100644 --- a/langfuse/_client/attributes.py +++ b/langfuse/_client/attributes.py @@ -18,7 +18,6 @@ ObservationTypeGenerationLike, ObservationTypeSpanLike, ) - from langfuse._utils.serializer import EventSerializer from langfuse.model import PromptClient from langfuse.types import MapValue, SpanLevel diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index fa4196b34..a98cbda2c 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -16,6 +16,7 @@ Any, Callable, Dict, + Generator, List, Literal, Optional, @@ -23,16 +24,19 @@ Union, cast, overload, - Generator, ) import backoff import httpx from opentelemetry import ( baggage as otel_baggage_api, - trace as otel_trace_api, +) +from opentelemetry import ( context as otel_context_api, ) +from opentelemetry import ( + trace as otel_trace_api, +) from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.id_generator import RandomIdGenerator from opentelemetry.util._decorator import ( @@ -43,14 +47,12 @@ from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.constants import ( + LANGFUSE_CORRELATION_CONTEXT_KEY, ObservationTypeGenerationLike, ObservationTypeLiteral, ObservationTypeLiteralNoEvent, ObservationTypeSpanLike, get_observation_types_list, - LANGFUSE_CTX_USER_ID, - LANGFUSE_CTX_SESSION_ID, - LANGFUSE_CTX_METADATA, ) from langfuse._client.datasets import DatasetClient, DatasetItemClient from langfuse._client.environment_variables import ( @@ -76,7 +78,10 @@ LangfuseSpan, LangfuseTool, ) -from langfuse._client.utils import run_async_safely +from langfuse._client.utils import ( + get_attribute_key_from_correlation_context, + run_async_safely, +) from langfuse._utils import _get_timestamp from langfuse._utils.parse_error import handle_fern_exception from langfuse._utils.prompt_cache import PromptCache @@ -219,10 +224,8 @@ def __init__( additional_headers: Optional[Dict[str, str]] = None, tracer_provider: Optional[TracerProvider] = None, ): - self._host = ( - host - if host is not None - else os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") + self._host = host or cast( + str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") ) self._environment = environment or cast( str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT) @@ -361,19 +364,18 @@ def start_span( ) @_agnosticcontextmanager - def with_attributes( + def correlation_context( self, - session_id: Optional[str] = None, - user_id: Optional[str] = None, - metadata: Optional[dict[str, str]] = None, + correlation_context: Dict[str, str], + *, as_baggage: bool = False, ) -> Generator[None, None, None]: - """Creates a context manager that propagates the given attributes to all spans created within the context. + """Create a context manager that propagates the given correlation_context to all spans within the context manager's scope. Args: - session_id (str): Session identifier. - user_id (str): User identifier. - metadata (dict): Additional metadata to associate with all spans in the context. Values must be strings and are truncated to 200 characters. + correlation_context (Dict[str, str]): Dictionary containing key-value pairs to be propagated + to all spans within the context manager's scope. Common keys include user_id, session_id, + and custom metadata. All values must be strings below 200 characters. as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage for cross-service propagation. If False, stores only in local context for current-service propagation. Defaults to False. @@ -386,16 +388,21 @@ def with_attributes( outbound requests made within this context. Only use this for non-sensitive identifiers that are safe to transmit across service boundaries. - Example: + Examples: ```python - # Local context only (default) - with langfuse.with_attributes(session_id="session_123"): + # Local context only (default) - pass context as dictionary + with langfuse.correlation_context({"session_id": "session_123"}): with langfuse.start_as_current_span(name="process-request") as span: # This span and all its children will have session_id="session_123" child_span = langfuse.start_span(name="child-operation") + # Multiple values in context dictionary + with langfuse.correlation_context({"user_id": "user_456", "experiment": "A"}): + # All spans will have both user_id and experiment attributes + span = langfuse.start_span(name="experiment-operation") + # Cross-service propagation (use with caution) - with langfuse.with_attributes(session_id="session_123", as_baggage=True): + with langfuse.correlation_context({"session_id": "session_123"}, as_baggage=True): # session_id will be propagated to external service calls response = requests.get("https://api.example.com/data") ``` @@ -403,62 +410,33 @@ def with_attributes( current_context = otel_context_api.get_current() current_span = otel_trace_api.get_current_span() - # Process session_id - if session_id is not None: - current_context = otel_context_api.set_value( - LANGFUSE_CTX_SESSION_ID, session_id, current_context - ) - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("session.id", session_id) - if as_baggage: - current_context = otel_baggage_api.set_baggage( - "session.id", session_id, current_context - ) + current_context = otel_context_api.set_value( + LANGFUSE_CORRELATION_CONTEXT_KEY, correlation_context, current_context + ) - # Process user_id - if user_id is not None: - current_context = otel_context_api.set_value( - LANGFUSE_CTX_USER_ID, user_id, current_context - ) - if current_span is not None and current_span.is_recording(): - current_span.set_attribute("user.id", user_id) - if as_baggage: - current_context = otel_baggage_api.set_baggage( - "user.id", user_id, current_context + for key, value in correlation_context.items(): + if len(value) > 200: + langfuse_logger.warning( + f"Correlation context key '{key}' is over 200 characters ({len(value)} chars). Dropping value." ) + continue - # Process metadata - if metadata is not None: - # Truncate values with size > 200 to 200 characters and emit warning including the ky - for k, v in metadata.items(): - if not isinstance(v, str): - # Ignore unreachable mypy warning as this runtime guard should make sense either way - warnings.warn( # type: ignore[unreachable] - f"Metadata values must be strings, got {type(v)} for key '{k}'" - ) - del metadata[k] - if len(v) > 200: - warnings.warn( - f"Metadata value for key '{k}' exceeds 200 characters and will be truncated." - ) - metadata[k] = v[:200] + attribute_key = get_attribute_key_from_correlation_context(key) - current_context = otel_context_api.set_value( - LANGFUSE_CTX_METADATA, metadata, current_context - ) if current_span is not None and current_span.is_recording(): - for k, v in metadata.items(): - current_span.set_attribute(f"langfuse.metadata.{k}", v) + current_span.set_attribute(attribute_key, value) + if as_baggage: - for k, v in metadata.items(): - current_context = otel_baggage_api.set_baggage( - f"langfuse.metadata.{k}", str(v), current_context - ) + current_context = otel_baggage_api.set_baggage( + key, value, current_context + ) # Activate context, execute, and detach context token = otel_context_api.attach(current_context) + try: yield + finally: otel_context_api.detach(token) @@ -1780,7 +1758,7 @@ def update_current_trace( ``` """ warnings.warn( - "update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ", + "update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ", DeprecationWarning, stacklevel=2, ) diff --git a/langfuse/_client/constants.py b/langfuse/_client/constants.py index 2a1297de6..b385f0a8f 100644 --- a/langfuse/_client/constants.py +++ b/langfuse/_client/constants.py @@ -3,15 +3,13 @@ This module defines constants used throughout the Langfuse OpenTelemetry integration. """ -from typing import Literal, List, get_args, Union, Any +from typing import Any, List, Literal, Union, get_args + from typing_extensions import TypeAlias LANGFUSE_TRACER_NAME = "langfuse-sdk" -# Context key constants for Langfuse context propagation -LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" -LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" -LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" +LANGFUSE_CORRELATION_CONTEXT_KEY = "langfuse.ctx.correlation" """Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat""" diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index e02707dc2..fb9a2849d 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -13,9 +13,9 @@ and scoring integration specific to Langfuse's observability platform. """ +import warnings from datetime import datetime from time import time_ns -import warnings from typing import ( TYPE_CHECKING, Any, @@ -44,10 +44,10 @@ create_trace_attributes, ) from langfuse._client.constants import ( - ObservationTypeLiteral, ObservationTypeGenerationLike, - ObservationTypeSpanLike, + ObservationTypeLiteral, ObservationTypeLiteralNoEvent, + ObservationTypeSpanLike, get_observation_types_list, ) from langfuse.logger import langfuse_logger @@ -234,7 +234,7 @@ def update_trace( public: Whether the trace should be publicly accessible """ warnings.warn( - "update_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ", + "update_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ", DeprecationWarning, stacklevel=2, ) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index 2433c0f21..c62c5ad83 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -12,28 +12,32 @@ """ import base64 -import json import os from typing import Dict, List, Optional -from opentelemetry import baggage, context as context_api +from opentelemetry import baggage +from opentelemetry import context as context_api from opentelemetry.context import Context from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import format_span_id +from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.constants import ( + LANGFUSE_CORRELATION_CONTEXT_KEY, LANGFUSE_TRACER_NAME, - LANGFUSE_CTX_USER_ID, - LANGFUSE_CTX_SESSION_ID, - LANGFUSE_CTX_METADATA, ) from langfuse._client.environment_variables import ( LANGFUSE_FLUSH_AT, LANGFUSE_FLUSH_INTERVAL, LANGFUSE_OTEL_TRACES_EXPORT_PATH, ) -from langfuse._client.utils import span_formatter +from langfuse._client.utils import ( + correlation_context_to_attribute_map, + get_attribute_key_from_correlation_context, + span_formatter, +) from langfuse.logger import langfuse_logger from langfuse.version import __version__ as langfuse_version @@ -123,86 +127,46 @@ def __init__( ) def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: - """Handle span start event and propagate context and baggage to span attributes. - - This method is called when a span starts and applies context propagation: - 1. Propagates all baggage keys as span attributes - 2. Propagates langfuse.ctx.* context variables as span attributes - 3. Distributes langfuse.ctx.metadata keys as individual langfuse.metadata.* attributes - - Args: - span: The span that is starting - parent_context: The context when the span was created (optional) - """ - # Get the current context (use parent_context if available, otherwise current) + # Propagate correlation context to span current_context = parent_context or context_api.get_current() - - # Dictionary to collect span attributes that were propagated propagated_attributes = {} - # 1. Propagate all baggage keys as span attributes + # Propagate correlation context in baggage baggage_entries = baggage.get_all(context=current_context) + for key, value in baggage_entries.items(): - # Only propagate user.id, session.id and langfuse.metadata.* as those are set by us on the baggage - if key.startswith("langfuse.metadata.") or key in [ - "user.id", - "session.id", - ]: + if ( + key.startswith(LangfuseOtelSpanAttributes.TRACE_METADATA) + or key in correlation_context_to_attribute_map.values() + ): propagated_attributes[key] = value - langfuse_logger.debug( - f"Propagated baggage key '{key}' = '{value}' to span '{span.name}'" - ) - - # 2. Propagate langfuse.ctx.* context variables - langfuse_ctx_keys = [LANGFUSE_CTX_USER_ID, LANGFUSE_CTX_SESSION_ID] - for ctx_key in langfuse_ctx_keys: - try: - value = context_api.get_value(ctx_key, context=current_context) - if value is not None: - # Convert context key to span attribute name (remove langfuse.ctx. prefix) - attr_key = ctx_key.replace("langfuse.ctx.", "") - propagated_attributes[attr_key] = value - langfuse_logger.debug( - f"Propagated context key '{ctx_key}' = '{value}' to span '{span.name}'" - ) - except Exception as e: - langfuse_logger.debug(f"Could not read context key '{ctx_key}': {e}") - - # 3. Handle langfuse.ctx.metadata - distribute keys as individual attributes - try: - # Get metadata dict from context - metadata_dict = context_api.get_value( - LANGFUSE_CTX_METADATA, context=current_context + + # Propagate correlation context in OTEL context + correlation_context = ( + context_api.get_value(LANGFUSE_CORRELATION_CONTEXT_KEY, current_context) + or {} + ) + + if not isinstance(correlation_context, dict): + langfuse_logger.error( + f"Correlation context is not of type dict. Got type '{type(correlation_context)}'." ) - if metadata_dict is not None and isinstance(metadata_dict, dict): - # Set each metadata key as a separate span attribute with langfuse.metadata. prefix - for key, value in metadata_dict.items(): - attr_key = f"langfuse.metadata.{key}" - - # Convert value to appropriate type for span attribute (naive or json stringify) - attr_value = ( - value - if isinstance(value, (str, int, float, bool)) - else json.dumps(value) - ) - - propagated_attributes[attr_key] = attr_value - langfuse_logger.debug( - f"Propagated metadata key '{key}' = '{attr_value}' to span '{span.name}'" - ) - except Exception as e: - langfuse_logger.debug(f"Could not read metadata from context: {e}") - - # Log summary of propagated attributes + + return super().on_start(span, parent_context) + + for key, value in correlation_context.items(): + attribute_key = get_attribute_key_from_correlation_context(key) + propagated_attributes[attribute_key] = value + + # Write attributes on span if propagated_attributes: + for key, value in propagated_attributes.items(): + span.set_attribute(key, str(value)) + langfuse_logger.debug( - f"Propagated {len(propagated_attributes)} attributes to span '{span.name}': {list(propagated_attributes.keys())}" + f"Propagated {len(propagated_attributes)} attributes to span '{format_span_id(span.context.span_id)}': {propagated_attributes}" ) - # Set all propagated attributes on the span - for key, value in propagated_attributes.items(): - span.set_attribute(key, value) # type: ignore[arg-type] - return super().on_start(span, parent_context) def on_end(self, span: ReadableSpan) -> None: diff --git a/langfuse/_client/utils.py b/langfuse/_client/utils.py index d34857ebd..340daddb6 100644 --- a/langfuse/_client/utils.py +++ b/langfuse/_client/utils.py @@ -13,6 +13,8 @@ from opentelemetry.sdk import util from opentelemetry.sdk.trace import ReadableSpan +from langfuse._client.attributes import LangfuseOtelSpanAttributes + def span_formatter(span: ReadableSpan) -> str: parent_id = ( @@ -125,3 +127,16 @@ async def my_async_function(): else: # Loop exists but not running, safe to use asyncio.run() return asyncio.run(coro) + + +correlation_context_to_attribute_map = { + "session_id": LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "user_id": LangfuseOtelSpanAttributes.TRACE_USER_ID, +} + + +def get_attribute_key_from_correlation_context(correlation_context_key: str) -> str: + return ( + correlation_context_to_attribute_map.get(correlation_context_key) + or f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{correlation_context_key}" + ) diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index d5af127c6..829d9d971 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -2071,7 +2071,7 @@ def test_context_manager_user_propagation(): user_id = "test_user_123" with langfuse.start_as_current_span(name="parent-span") as parent_span: - with langfuse.with_attributes(user_id=user_id): + with langfuse.correlation_context({"user_id": user_id}): trace_id = parent_span.trace_id # Create child spans that should inherit user_id @@ -2107,7 +2107,7 @@ def test_context_manager_session_propagation(): session_id = "test_session_456" with langfuse.start_as_current_span(name="parent-span") as parent_span: - with langfuse.with_attributes(session_id=session_id): + with langfuse.correlation_context({"session_id": session_id}): trace_id = parent_span.trace_id # Create child spans that should inherit session_id @@ -2142,8 +2142,8 @@ def test_context_manager_metadata_propagation(): langfuse = Langfuse() with langfuse.start_as_current_span(name="parent-span") as parent_span: - with langfuse.with_attributes( - metadata={ + with langfuse.correlation_context( + { "experiment": "A/B", "version": "1.2.3", "feature_flag": "enabled", @@ -2189,10 +2189,10 @@ def test_context_manager_nested_contexts(): langfuse = Langfuse() with langfuse.start_as_current_span(name="outer-span") as outer_span: - with langfuse.with_attributes(user_id="user_1", session_id="session_1"): - with langfuse.with_attributes( - metadata={"env": "prod", "region": "us-east"} - ): + with langfuse.correlation_context( + {"user_id": "user_1", "session_id": "session_1"} + ): + with langfuse.correlation_context({"env": "prod", "region": "us-east"}): outer_trace_id = outer_span.trace_id # Create span in outer context @@ -2234,9 +2234,11 @@ def test_context_manager_baggage_propagation(): # Test with baggage enabled (careful with sensitive data) with langfuse.start_as_current_span(name="service-span") as span: - with langfuse.with_attributes(session_id="public_session_789", as_baggage=True): - with langfuse.with_attributes( - as_baggage=True, metadata={"service": "api", "version": "v1.0"} + with langfuse.correlation_context( + {"session_id": "public_session_789"}, as_baggage=True + ): + with langfuse.correlation_context( + {"service": "api", "version": "v1.0"}, as_baggage=True ): trace_id = span.trace_id