diff --git a/langfuse/__init__.py b/langfuse/__init__.py index b2b73b54b..f96b18bc8 100644 --- a/langfuse/__init__.py +++ b/langfuse/__init__.py @@ -7,6 +7,7 @@ from ._client.constants import ObservationTypeLiteral from ._client.get_client import get_client from ._client.observe import observe +from ._client.propagation import propagate_attributes from ._client.span import ( LangfuseAgent, LangfuseChain, @@ -26,6 +27,7 @@ "Langfuse", "get_client", "observe", + "propagate_attributes", "ObservationTypeLiteral", "LangfuseSpan", "LangfuseGeneration", diff --git a/langfuse/_client/attributes.py b/langfuse/_client/attributes.py index 5ae81000c..343c70cdb 100644 --- a/langfuse/_client/attributes.py +++ b/langfuse/_client/attributes.py @@ -18,7 +18,6 @@ ObservationTypeGenerationLike, ObservationTypeSpanLike, ) - from langfuse._utils.serializer import EventSerializer from langfuse.model import PromptClient from langfuse.types import MapValue, SpanLevel @@ -60,6 +59,17 @@ class LangfuseOtelSpanAttributes: # Internal AS_ROOT = "langfuse.internal.as_root" + # Experiments + EXPERIMENT_ID = "langfuse.experiment.id" + EXPERIMENT_NAME = "langfuse.experiment.name" + EXPERIMENT_DESCRIPTION = "langfuse.experiment.description" + EXPERIMENT_METADATA = "langfuse.experiment.metadata" + EXPERIMENT_DATASET_ID = "langfuse.experiment.dataset.id" + EXPERIMENT_ITEM_ID = "langfuse.experiment.item.id" + EXPERIMENT_ITEM_EXPECTED_OUTPUT = "langfuse.experiment.item.expected_output" + EXPERIMENT_ITEM_METADATA = "langfuse.experiment.item.metadata" + EXPERIMENT_ITEM_ROOT_OBSERVATION_ID = "langfuse.experiment.item.root_observation_id" + def create_trace_attributes( *, diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 7b9dcfcc5..55a9667e3 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -27,7 +27,6 @@ import backoff import httpx -from opentelemetry import trace from opentelemetry import trace as otel_trace_api from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.id_generator import RandomIdGenerator @@ -37,8 +36,9 @@ ) from packaging.version import Version -from langfuse._client.attributes import LangfuseOtelSpanAttributes +from langfuse._client.attributes import LangfuseOtelSpanAttributes, _serialize from langfuse._client.constants import ( + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, ObservationTypeGenerationLike, ObservationTypeLiteral, ObservationTypeLiteralNoEvent, @@ -57,6 +57,10 @@ LANGFUSE_TRACING_ENABLED, LANGFUSE_TRACING_ENVIRONMENT, ) +from langfuse._client.propagation import ( + PropagatedExperimentAttributes, + _propagate_attributes, +) from langfuse._client.resource_manager import LangfuseResourceManager from langfuse._client.span import ( LangfuseAgent, @@ -70,7 +74,7 @@ LangfuseSpan, LangfuseTool, ) -from langfuse._client.utils import run_async_safely +from langfuse._client.utils import get_sha256_hash_hex, run_async_safely from langfuse._utils import _get_timestamp from langfuse._utils.parse_error import handle_fern_exception from langfuse._utils.prompt_cache import PromptCache @@ -1638,10 +1642,6 @@ def update_current_trace( ) -> None: """Update the current trace with additional information. - This method updates the Langfuse trace that the current span belongs to. It's useful for - adding trace-level metadata like user ID, session ID, or tags that apply to - the entire Langfuse trace rather than just a single observation. - Args: name: Updated name for the Langfuse trace user_id: ID of the user who initiated the Langfuse trace @@ -1653,25 +1653,8 @@ def update_current_trace( tags: List of tags to categorize the Langfuse trace public: Whether the Langfuse trace should be publicly accessible - Example: - ```python - with langfuse.start_as_current_span(name="handle-request") as span: - # Get user information - user = authenticate_user(request) - - # Update trace with user context - langfuse.update_current_trace( - user_id=user.id, - session_id=request.session_id, - tags=["production", "web-app"] - ) - - # Continue processing - response = process_request(request) - - # Update span with results - span.update(output=response) - ``` + See Also: + :func:`langfuse.propagate_attributes`: Recommended replacement """ if not self._tracing_enabled: langfuse_logger.debug( @@ -1817,7 +1800,7 @@ def _create_remote_parent_span( is_remote=False, ) - return trace.NonRecordingSpan(span_context) + return otel_trace_api.NonRecordingSpan(span_context) def _is_valid_trace_id(self, trace_id: str) -> bool: pattern = r"^[0-9a-f]{32}$" @@ -2477,7 +2460,7 @@ def run_experiment( evaluators: List[EvaluatorFunction] = [], run_evaluators: List[RunEvaluatorFunction] = [], max_concurrency: int = 50, - metadata: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, str]] = None, ) -> ExperimentResult: """Run an experiment on a dataset with automatic tracing and evaluation. @@ -2649,7 +2632,7 @@ def average_accuracy(*, item_results, **kwargs): evaluators=evaluators or [], run_evaluators=run_evaluators or [], max_concurrency=max_concurrency, - metadata=metadata or {}, + metadata=metadata, ), ), ) @@ -2665,7 +2648,7 @@ async def _run_experiment_async( evaluators: List[EvaluatorFunction], run_evaluators: List[RunEvaluatorFunction], max_concurrency: int, - metadata: Dict[str, Any], + metadata: Optional[Dict[str, Any]] = None, ) -> ExperimentResult: langfuse_logger.debug( f"Starting experiment '{name}' run '{run_name}' with {len(data)} items" @@ -2763,58 +2746,51 @@ async def _process_experiment_item( experiment_name: str, experiment_run_name: str, experiment_description: Optional[str], - experiment_metadata: Dict[str, Any], + experiment_metadata: Optional[Dict[str, Any]] = None, ) -> ExperimentItemResult: - # Execute task with tracing span_name = "experiment-item-run" with self.start_as_current_span(name=span_name) as span: try: - output = await _run_task(task, item) - input_data = ( item.get("input") if isinstance(item, dict) else getattr(item, "input", None) ) - item_metadata: Dict[str, Any] = {} + if input_data is None: + raise ValueError("Experiment Item is missing input. Skipping item.") + + expected_output = ( + item.get("expected_output") + if isinstance(item, dict) + else getattr(item, "expected_output", None) + ) - if isinstance(item, dict): - item_metadata = item.get("metadata", None) or {} + item_metadata = ( + item.get("metadata") + if isinstance(item, dict) + else getattr(item, "metadata", None) + ) - final_metadata = { + final_observation_metadata = { "experiment_name": experiment_name, "experiment_run_name": experiment_run_name, - **experiment_metadata, + **(experiment_metadata or {}), } - if ( - not isinstance(item, dict) - and hasattr(item, "dataset_id") - and hasattr(item, "id") - ): - final_metadata.update( - {"dataset_id": item.dataset_id, "dataset_item_id": item.id} - ) - - if isinstance(item_metadata, dict): - final_metadata.update(item_metadata) - - span.update( - input=input_data, - output=output, - metadata=final_metadata, - ) - - # Get trace ID for linking trace_id = span.trace_id + dataset_id = None + dataset_item_id = None dataset_run_id = None # Link to dataset run if this is a dataset item if hasattr(item, "id") and hasattr(item, "dataset_id"): try: - dataset_run_item = self.api.dataset_run_items.create( + # Use sync API to avoid event loop issues when run_async_safely + # creates multiple event loops across different threads + dataset_run_item = await asyncio.to_thread( + self.api.dataset_run_items.create, request=CreateDatasetRunItemRequest( runName=experiment_run_name, runDescription=experiment_description, @@ -2822,7 +2798,7 @@ async def _process_experiment_item( datasetItemId=item.id, # type: ignore traceId=trace_id, observationId=span.id, - ) + ), ) dataset_run_id = dataset_run_item.dataset_run_id @@ -2830,18 +2806,63 @@ async def _process_experiment_item( except Exception as e: langfuse_logger.error(f"Failed to create dataset run item: {e}") + if ( + not isinstance(item, dict) + and hasattr(item, "dataset_id") + and hasattr(item, "id") + ): + dataset_id = item.dataset_id + dataset_item_id = item.id + + final_observation_metadata.update( + {"dataset_id": dataset_id, "dataset_item_id": dataset_item_id} + ) + + if isinstance(item_metadata, dict): + final_observation_metadata.update(item_metadata) + + experiment_id = dataset_run_id or self._create_observation_id() + experiment_item_id = ( + dataset_item_id or get_sha256_hash_hex(_serialize(input_data))[:16] + ) + span._otel_span.set_attributes( + { + k: v + for k, v in { + LangfuseOtelSpanAttributes.ENVIRONMENT: LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION: experiment_description, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT: _serialize( + expected_output + ), + }.items() + if v is not None + } + ) + + with _propagate_attributes( + experiment=PropagatedExperimentAttributes( + experiment_id=experiment_id, + experiment_name=experiment_run_name, + experiment_metadata=_serialize(experiment_metadata), + experiment_dataset_id=dataset_id, + experiment_item_id=experiment_item_id, + experiment_item_metadata=_serialize(item_metadata), + experiment_item_root_observation_id=span.id, + ) + ): + output = await _run_task(task, item) + + span.update( + input=input_data, + output=output, + metadata=final_observation_metadata, + ) + # Run evaluators evaluations = [] for evaluator in evaluators: try: - expected_output = None - - if isinstance(item, dict): - expected_output = item.get("expected_output") - elif hasattr(item, "expected_output"): - expected_output = item.expected_output - eval_metadata: Optional[Dict[str, Any]] = None if isinstance(item, dict): @@ -2862,6 +2883,7 @@ async def _process_experiment_item( for evaluation in eval_results: self.create_score( trace_id=trace_id, + observation_id=span.id, name=evaluation.name, value=evaluation.value, # type: ignore comment=evaluation.comment, diff --git a/langfuse/_client/constants.py b/langfuse/_client/constants.py index b699480c0..c2d0aa7aa 100644 --- a/langfuse/_client/constants.py +++ b/langfuse/_client/constants.py @@ -3,11 +3,13 @@ This module defines constants used throughout the Langfuse OpenTelemetry integration. """ -from typing import Literal, List, get_args, Union, Any +from typing import Any, List, Literal, Union, get_args + from typing_extensions import TypeAlias LANGFUSE_TRACER_NAME = "langfuse-sdk" +LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT = "sdk-experiment" """Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat""" ObservationTypeGenerationLike: TypeAlias = Literal[ diff --git a/langfuse/_client/propagation.py b/langfuse/_client/propagation.py new file mode 100644 index 000000000..49d34a99f --- /dev/null +++ b/langfuse/_client/propagation.py @@ -0,0 +1,466 @@ +"""Attribute propagation utilities for Langfuse OpenTelemetry integration. + +This module provides the `propagate_attributes` context manager for setting trace-level +attributes (user_id, session_id, metadata) that automatically propagate to all child spans +within the context. +""" + +from typing import Any, Dict, Generator, List, Literal, Optional, TypedDict, Union, cast + +from opentelemetry import baggage +from opentelemetry import ( + baggage as otel_baggage_api, +) +from opentelemetry import ( + context as otel_context_api, +) +from opentelemetry import ( + trace as otel_trace_api, +) +from opentelemetry.util._decorator import ( + _AgnosticContextManager, + _agnosticcontextmanager, +) + +from langfuse._client.attributes import LangfuseOtelSpanAttributes +from langfuse._client.constants import LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT +from langfuse.logger import langfuse_logger + +PropagatedKeys = Literal[ + "user_id", + "session_id", + "metadata", + "version", + "tags", +] + +InternalPropagatedKeys = Literal[ + "experiment_id", + "experiment_name", + "experiment_metadata", + "experiment_dataset_id", + "experiment_item_id", + "experiment_item_metadata", + "experiment_item_root_observation_id", +] + +propagated_keys: List[Union[PropagatedKeys, InternalPropagatedKeys]] = [ + "user_id", + "session_id", + "metadata", + "version", + "tags", + "experiment_id", + "experiment_name", + "experiment_metadata", + "experiment_dataset_id", + "experiment_item_id", + "experiment_item_metadata", + "experiment_item_root_observation_id", +] + + +class PropagatedExperimentAttributes(TypedDict): + experiment_id: str + experiment_name: str + experiment_metadata: Optional[str] + experiment_dataset_id: Optional[str] + experiment_item_id: str + experiment_item_metadata: Optional[str] + experiment_item_root_observation_id: str + + +def propagate_attributes( + *, + user_id: Optional[str] = None, + session_id: Optional[str] = None, + metadata: Optional[Dict[str, str]] = None, + version: Optional[str] = None, + tags: Optional[List[str]] = None, + as_baggage: bool = False, +) -> _AgnosticContextManager[Any]: + """Propagate trace-level attributes to all spans created within this context. + + This context manager sets attributes on the currently active span AND automatically + propagates them to all new child spans created within the context. This is the + recommended way to set trace-level attributes like user_id, session_id, and metadata + dimensions that should be consistently applied across all observations in a trace. + + **IMPORTANT**: Call this as early as possible within your trace/workflow. Only the + currently active span and spans created after entering this context will have these + attributes. Pre-existing spans will NOT be retroactively updated. + + **Why this matters**: Langfuse aggregation queries (e.g., total cost by user_id, + filtering by session_id) only include observations that have the attribute set. + If you call `propagate_attributes` late in your workflow, earlier spans won't be + included in aggregations for that attribute. + + Args: + user_id: User identifier to associate with all spans in this context. + Must be US-ASCII string, ≤200 characters. Use this to track which user + generated each trace and enable e.g. per-user cost/performance analysis. + session_id: Session identifier to associate with all spans in this context. + Must be US-ASCII string, ≤200 characters. Use this to group related traces + within a user session (e.g., a conversation thread, multi-turn interaction). + metadata: Additional key-value metadata to propagate to all spans. + - Keys and values must be US-ASCII strings + - All values must be ≤200 characters + - Use for dimensions like internal correlating identifiers + - AVOID: large payloads, sensitive data, non-string values (will be dropped with warning) + version: Version identfier for parts of your application that are independently versioned, e.g. agents + tags: List of tags to categorize the group of observations + as_baggage: If True, propagates attributes using OpenTelemetry baggage for + cross-process/service propagation. **Security warning**: When enabled, + attribute values are added to HTTP headers on ALL outbound requests. + Only enable if values are safe to transmit via HTTP headers and you need + cross-service tracing. Default: False. + + Returns: + Context manager that propagates attributes to all child spans. + + Example: + Basic usage with user and session tracking: + + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Set attributes early in the trace + with langfuse.start_as_current_span(name="user_workflow") as span: + with langfuse.propagate_attributes( + user_id="user_123", + session_id="session_abc", + metadata={"experiment": "variant_a", "environment": "production"} + ): + # All spans created here will have user_id, session_id, and metadata + with langfuse.start_span(name="llm_call") as llm_span: + # This span inherits: user_id, session_id, experiment, environment + ... + + with langfuse.start_generation(name="completion") as gen: + # This span also inherits all attributes + ... + ``` + + Late propagation (anti-pattern): + + ```python + with langfuse.start_as_current_span(name="workflow") as span: + # These spans WON'T have user_id + early_span = langfuse.start_span(name="early_work") + early_span.end() + + # Set attributes in the middle + with langfuse.propagate_attributes(user_id="user_123"): + # Only spans created AFTER this point will have user_id + late_span = langfuse.start_span(name="late_work") + late_span.end() + + # Result: Aggregations by user_id will miss "early_work" span + ``` + + Cross-service propagation with baggage (advanced): + + ```python + # Service A - originating service + with langfuse.start_as_current_span(name="api_request"): + with langfuse.propagate_attributes( + user_id="user_123", + session_id="session_abc", + as_baggage=True # Propagate via HTTP headers + ): + # Make HTTP request to Service B + response = requests.get("https://service-b.example.com/api") + # user_id and session_id are now in HTTP headers + + # Service B - downstream service + # OpenTelemetry will automatically extract baggage from HTTP headers + # and propagate to spans in Service B + ``` + + Note: + - **Validation**: All attribute values (user_id, session_id, metadata values) + must be strings ≤200 characters. Invalid values will be dropped with a + warning logged. Ensure values meet constraints before calling. + - **OpenTelemetry**: This uses OpenTelemetry context propagation under the hood, + making it compatible with other OTel-instrumented libraries. + + Raises: + No exceptions are raised. Invalid values are logged as warnings and dropped. + """ + return _propagate_attributes( + user_id=user_id, + session_id=session_id, + metadata=metadata, + version=version, + tags=tags, + as_baggage=as_baggage, + ) + + +@_agnosticcontextmanager +def _propagate_attributes( + *, + user_id: Optional[str] = None, + session_id: Optional[str] = None, + metadata: Optional[Dict[str, str]] = None, + version: Optional[str] = None, + tags: Optional[List[str]] = None, + as_baggage: bool = False, + experiment: Optional[PropagatedExperimentAttributes] = None, +) -> Generator[Any, Any, Any]: + context = otel_context_api.get_current() + current_span = otel_trace_api.get_current_span() + + propagated_string_attributes: Dict[str, Optional[Union[str, List[str]]]] = { + "user_id": user_id, + "session_id": session_id, + "version": version, + "tags": tags, + } + + propagated_string_attributes = propagated_string_attributes | ( + cast(Dict[str, Union[str, List[str], None]], experiment) or {} + ) + + # Filter out None values + propagated_string_attributes = { + k: v for k, v in propagated_string_attributes.items() if v is not None + } + + for key, value in propagated_string_attributes.items(): + validated_value = _validate_propagated_value(value=value, key=key) + + if validated_value is not None: + context = _set_propagated_attribute( + key=key, + value=validated_value, + context=context, + span=current_span, + as_baggage=as_baggage, + ) + + if metadata is not None: + validated_metadata: Dict[str, str] = {} + + for key, value in metadata.items(): + if _validate_string_value(value=value, key=f"metadata.{key}"): + validated_metadata[key] = value + + if validated_metadata: + context = _set_propagated_attribute( + key="metadata", + value=validated_metadata, + context=context, + span=current_span, + as_baggage=as_baggage, + ) + + # Activate context, execute, and detach context + token = otel_context_api.attach(context=context) + + try: + yield + + finally: + otel_context_api.detach(token) + + +def _get_propagated_attributes_from_context( + context: otel_context_api.Context, +) -> Dict[str, Union[str, List[str]]]: + propagated_attributes: Dict[str, Union[str, List[str]]] = {} + + # Handle baggage + baggage_entries = baggage.get_all(context=context) + for baggage_key, baggage_value in baggage_entries.items(): + if baggage_key.startswith(LANGFUSE_BAGGAGE_PREFIX): + span_key = _get_span_key_from_baggage_key(baggage_key) + + if span_key: + propagated_attributes[span_key] = ( + baggage_value + if isinstance(baggage_value, (str, list)) + else str(baggage_value) + ) + + # Handle OTEL context + for key in propagated_keys: + context_key = _get_propagated_context_key(key) + value = otel_context_api.get_value(key=context_key, context=context) + + if value is None: + continue + + if isinstance(value, dict): + # Handle metadata + for k, v in value.items(): + span_key = f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{k}" + propagated_attributes[span_key] = v + + else: + span_key = _get_propagated_span_key(key) + + propagated_attributes[span_key] = ( + value if isinstance(value, (str, list)) else str(value) + ) + + if ( + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ROOT_OBSERVATION_ID + in propagated_attributes + ): + propagated_attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] = ( + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT + ) + + return propagated_attributes + + +def _set_propagated_attribute( + *, + key: str, + value: Union[str, List[str], Dict[str, str]], + context: otel_context_api.Context, + span: otel_trace_api.Span, + as_baggage: bool, +) -> otel_context_api.Context: + # Get key names + context_key = _get_propagated_context_key(key) + span_key = _get_propagated_span_key(key) + baggage_key = _get_propagated_baggage_key(key) + + # Merge metadata with previously set metadata keys + if isinstance(value, dict): + existing_metadata_in_context = cast( + dict, otel_context_api.get_value(context_key) or {} + ) + value = existing_metadata_in_context | value + + # Merge tags with previously set tags + if isinstance(value, list): + existing_tags_in_context = cast( + list, otel_context_api.get_value(context_key) or [] + ) + merged_tags = list(existing_tags_in_context) + merged_tags.extend(tag for tag in value if tag not in existing_tags_in_context) + + value = merged_tags + + # Set in context + context = otel_context_api.set_value( + key=context_key, + value=value, + context=context, + ) + + # Set on current span + if span is not None and span.is_recording(): + if isinstance(value, dict): + # Handle metadata + for k, v in value.items(): + span.set_attribute( + key=f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{k}", + value=v, + ) + + else: + span.set_attribute(key=span_key, value=value) + + # Set on baggage + if as_baggage: + if isinstance(value, dict): + # Handle metadata + for k, v in value.items(): + context = otel_baggage_api.set_baggage( + name=f"{baggage_key}_{k}", value=v, context=context + ) + else: + context = otel_baggage_api.set_baggage( + name=baggage_key, value=value, context=context + ) + + return context + + +def _validate_propagated_value( + *, value: Any, key: str +) -> Optional[Union[str, List[str]]]: + if isinstance(value, list): + validated_values = [ + v for v in value if _validate_string_value(key=key, value=v) + ] + + return validated_values if len(validated_values) > 0 else None + + if not isinstance(value, str): + langfuse_logger.warning( # type: ignore + f"Propagated attribute '{key}' value is not a string. Dropping value." + ) + return None + + if len(value) > 200: + langfuse_logger.warning( + f"Propagated attribute '{key}' value is over 200 characters ({len(value)} chars). Dropping value." + ) + return None + + return value + + +def _validate_string_value(*, value: str, key: str) -> bool: + if not isinstance(value, str): + langfuse_logger.warning( # type: ignore + f"Propagated attribute '{key}' value is not a string. Dropping value." + ) + return False + + if len(value) > 200: + langfuse_logger.warning( + f"Propagated attribute '{key}' value is over 200 characters ({len(value)} chars). Dropping value." + ) + return False + + return True + + +def _get_propagated_context_key(key: str) -> str: + return f"langfuse.propagated.{key}" + + +LANGFUSE_BAGGAGE_PREFIX = "langfuse_" + + +def _get_propagated_baggage_key(key: str) -> str: + return f"{LANGFUSE_BAGGAGE_PREFIX}{key}" + + +def _get_span_key_from_baggage_key(key: str) -> Optional[str]: + if not key.startswith(LANGFUSE_BAGGAGE_PREFIX): + return None + + # Remove prefix to get the actual key name + suffix = key[len(LANGFUSE_BAGGAGE_PREFIX) :] + + if suffix.startswith("metadata_"): + metadata_key = suffix[len("metadata_") :] + + return _get_propagated_span_key(metadata_key) + + return _get_propagated_span_key(suffix) + + +def _get_propagated_span_key(key: str) -> str: + return { + "session_id": LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "user_id": LangfuseOtelSpanAttributes.TRACE_USER_ID, + "version": LangfuseOtelSpanAttributes.VERSION, + "tags": LangfuseOtelSpanAttributes.TRACE_TAGS, + "experiment_id": LangfuseOtelSpanAttributes.EXPERIMENT_ID, + "experiment_name": LangfuseOtelSpanAttributes.EXPERIMENT_NAME, + "experiment_metadata": LangfuseOtelSpanAttributes.EXPERIMENT_METADATA, + "experiment_dataset_id": LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID, + "experiment_item_id": LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID, + "experiment_item_metadata": LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_METADATA, + "experiment_item_root_observation_id": LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ROOT_OBSERVATION_ID, + }.get(key) or f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{key}" diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 866022a6e..72ebb6bee 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -13,9 +13,9 @@ and scoring integration specific to Langfuse's observability platform. """ +import warnings from datetime import datetime from time import time_ns -import warnings from typing import ( TYPE_CHECKING, Any, @@ -30,8 +30,8 @@ ) from opentelemetry import trace as otel_trace_api -from opentelemetry.util._decorator import _AgnosticContextManager from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util._decorator import _AgnosticContextManager from langfuse.model import PromptClient @@ -45,10 +45,10 @@ create_trace_attributes, ) from langfuse._client.constants import ( - ObservationTypeLiteral, ObservationTypeGenerationLike, - ObservationTypeSpanLike, + ObservationTypeLiteral, ObservationTypeLiteralNoEvent, + ObservationTypeSpanLike, get_observation_types_list, ) from langfuse.logger import langfuse_logger @@ -223,10 +223,6 @@ def update_trace( ) -> "LangfuseObservationWrapper": """Update the trace that this span belongs to. - This method updates trace-level attributes of the trace that this span - belongs to. This is useful for adding or modifying trace-wide information - like user ID, session ID, or tags. - Args: name: Updated name for the trace user_id: ID of the user who initiated the trace @@ -237,6 +233,9 @@ def update_trace( metadata: Additional metadata to associate with the trace tags: List of tags to categorize the trace public: Whether the trace should be publicly accessible + + See Also: + :func:`langfuse.propagate_attributes`: Recommended replacement """ if not self._otel_span.is_recording(): return self diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index 5a2d251c0..a7d9fd2f4 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -15,9 +15,12 @@ import os from typing import Dict, List, Optional +from opentelemetry import context as context_api +from opentelemetry.context import Context from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import format_span_id from langfuse._client.constants import LANGFUSE_TRACER_NAME from langfuse._client.environment_variables import ( @@ -25,6 +28,7 @@ LANGFUSE_FLUSH_INTERVAL, LANGFUSE_OTEL_TRACES_EXPORT_PATH, ) +from langfuse._client.propagation import _get_propagated_attributes_from_context from langfuse._client.utils import span_formatter from langfuse.logger import langfuse_logger from langfuse.version import __version__ as langfuse_version @@ -114,6 +118,19 @@ def __init__( else None, ) + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: + context = parent_context or context_api.get_current() + propagated_attributes = _get_propagated_attributes_from_context(context) + + if propagated_attributes: + span.set_attributes(propagated_attributes) + + langfuse_logger.debug( + f"Propagated {len(propagated_attributes)} attributes to span '{format_span_id(span.context.span_id)}': {propagated_attributes}" + ) + + return super().on_start(span, parent_context) + def on_end(self, span: ReadableSpan) -> None: # Only export spans that belong to the scoped project # This is important to not send spans to wrong project in multi-project setups diff --git a/langfuse/_client/utils.py b/langfuse/_client/utils.py index d34857ebd..16d963d88 100644 --- a/langfuse/_client/utils.py +++ b/langfuse/_client/utils.py @@ -7,6 +7,7 @@ import asyncio import json import threading +from hashlib import sha256 from typing import Any, Coroutine from opentelemetry import trace as otel_trace_api @@ -125,3 +126,7 @@ async def my_async_function(): else: # Loop exists but not running, safe to use asyncio.run() return asyncio.run(coro) + + +def get_sha256_hash_hex(value: Any) -> str: + return sha256(value.encode("utf-8")).digest().hex() diff --git a/poetry.lock b/poetry.lock index 51f812200..378d67caf 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1057,6 +1057,39 @@ opentelemetry-sdk = ">=1.38.0,<1.39.0" requests = ">=2.7,<3.0" typing-extensions = ">=4.5.0" +[[package]] +name = "opentelemetry-instrumentation" +version = "0.59b0" +description = "Instrumentation Tools & Auto Instrumentation for OpenTelemetry Python" +optional = false +python-versions = ">=3.9" +files = [ + {file = "opentelemetry_instrumentation-0.59b0-py3-none-any.whl", hash = "sha256:44082cc8fe56b0186e87ee8f7c17c327c4c2ce93bdbe86496e600985d74368ee"}, + {file = "opentelemetry_instrumentation-0.59b0.tar.gz", hash = "sha256:6010f0faaacdaf7c4dff8aac84e226d23437b331dcda7e70367f6d73a7db1adc"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.4,<2.0" +opentelemetry-semantic-conventions = "0.59b0" +packaging = ">=18.0" +wrapt = ">=1.0.0,<2.0.0" + +[[package]] +name = "opentelemetry-instrumentation-threading" +version = "0.59b0" +description = "Thread context propagation support for OpenTelemetry" +optional = false +python-versions = ">=3.9" +files = [ + {file = "opentelemetry_instrumentation_threading-0.59b0-py3-none-any.whl", hash = "sha256:76da2fc01fe1dccebff6581080cff9e42ac7b27cc61eb563f3c4435c727e8eca"}, + {file = "opentelemetry_instrumentation_threading-0.59b0.tar.gz", hash = "sha256:ce5658730b697dcbc0e0d6d13643a69fd8aeb1b32fa8db3bade8ce114c7975f3"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.12,<2.0" +opentelemetry-instrumentation = "0.59b0" +wrapt = ">=1.0.0,<2.0.0" + [[package]] name = "opentelemetry-proto" version = "1.38.0" @@ -2755,4 +2788,4 @@ cffi = ["cffi (>=1.17,<2.0)", "cffi (>=2.0.0b)"] [metadata] lock-version = "2.0" python-versions = ">=3.10,<4.0" -content-hash = "cfda3b10ea654d3aa01a0d4292631380b85dcaa982e726b6e6f575f15d9a22da" +content-hash = "bb4ec20d58e29f5d71599357de616571eeae016818d5c246774f0c5bc01d3d0e" diff --git a/pyproject.toml b/pyproject.toml index 0725bc045..fa262952f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ langchain-openai = ">=0.0.5,<0.4" langchain = ">=1" langgraph = ">=1" autoevals = "^0.0.130" +opentelemetry-instrumentation-threading = "^0.59b0" [tool.poetry.group.docs.dependencies] pdoc = "^15.0.4" diff --git a/tests/test_experiments.py b/tests/test_experiments.py index db3d74a65..71f2e5926 100644 --- a/tests/test_experiments.py +++ b/tests/test_experiments.py @@ -399,7 +399,7 @@ def test_dataset_with_missing_fields(): ) # Should handle missing fields gracefully - assert len(result.item_results) == 3 + assert len(result.item_results) == 2 for item_result in result.item_results: assert hasattr(item_result, "trace_id") assert hasattr(item_result, "output") diff --git a/tests/test_propagate_attributes.py b/tests/test_propagate_attributes.py new file mode 100644 index 000000000..16a960c1f --- /dev/null +++ b/tests/test_propagate_attributes.py @@ -0,0 +1,2771 @@ +"""Comprehensive tests for propagate_attributes functionality. + +This module tests the propagate_attributes context manager that allows setting +trace-level attributes (user_id, session_id, metadata) that automatically propagate +to all child spans within the context. +""" + +import concurrent.futures +import time + +import pytest +from opentelemetry.instrumentation.threading import ThreadingInstrumentor + +from langfuse import propagate_attributes +from langfuse._client.attributes import LangfuseOtelSpanAttributes, _serialize +from langfuse._client.constants import LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT +from tests.test_otel import TestOTelBase + + +class TestPropagateAttributesBase(TestOTelBase): + """Base class for propagate_attributes tests with shared helper methods.""" + + @pytest.fixture + def langfuse_client(self, monkeypatch, tracer_provider, mock_processor_init): + """Create a mocked Langfuse client with explicit tracer_provider for testing.""" + from langfuse import Langfuse + + # Set environment variables + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "test-public-key") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "test-secret-key") + + # Create test client with explicit tracer_provider + client = Langfuse( + public_key="test-public-key", + secret_key="test-secret-key", + host="http://test-host", + tracing_enabled=True, + tracer_provider=tracer_provider, # Pass the test provider explicitly + ) + + yield client + + def get_span_by_name(self, memory_exporter, name: str) -> dict: + """Get single span by name (assert exactly one exists). + + Args: + memory_exporter: The in-memory span exporter fixture + name: The name of the span to retrieve + + Returns: + dict: The span data as a dictionary + + Raises: + AssertionError: If zero or more than one span with the name exists + """ + spans = self.get_spans_by_name(memory_exporter, name) + assert len(spans) == 1, f"Expected 1 span named '{name}', found {len(spans)}" + return spans[0] + + def verify_missing_attribute(self, span_data: dict, attr_key: str): + """Verify that a span does NOT have a specific attribute. + + Args: + span_data: The span data dictionary + attr_key: The attribute key to check for absence + + Raises: + AssertionError: If the attribute exists on the span + """ + attributes = span_data["attributes"] + assert ( + attr_key not in attributes + ), f"Attribute '{attr_key}' should NOT be on span '{span_data['name']}'" + + +class TestPropagateAttributesBasic(TestPropagateAttributesBase): + """Tests for basic propagate_attributes functionality.""" + + def test_user_id_propagates_to_child_spans(self, langfuse_client, memory_exporter): + """Verify user_id propagates to all child spans within context.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id="test_user_123"): + child1 = langfuse_client.start_span(name="child-span-1") + child1.end() + + child2 = langfuse_client.start_span(name="child-span-2") + child2.end() + + # Verify both children have user_id + child1_span = self.get_span_by_name(memory_exporter, "child-span-1") + self.verify_span_attribute( + child1_span, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "test_user_123", + ) + + child2_span = self.get_span_by_name(memory_exporter, "child-span-2") + self.verify_span_attribute( + child2_span, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "test_user_123", + ) + + def test_session_id_propagates_to_child_spans( + self, langfuse_client, memory_exporter + ): + """Verify session_id propagates to all child spans within context.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(session_id="session_abc"): + child1 = langfuse_client.start_span(name="child-span-1") + child1.end() + + child2 = langfuse_client.start_span(name="child-span-2") + child2.end() + + # Verify both children have session_id + child1_span = self.get_span_by_name(memory_exporter, "child-span-1") + self.verify_span_attribute( + child1_span, + LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "session_abc", + ) + + child2_span = self.get_span_by_name(memory_exporter, "child-span-2") + self.verify_span_attribute( + child2_span, + LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "session_abc", + ) + + def test_metadata_propagates_to_child_spans(self, langfuse_client, memory_exporter): + """Verify metadata propagates to all child spans within context.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + metadata={"experiment": "variant_a", "version": "1.0"} + ): + child1 = langfuse_client.start_span(name="child-span-1") + child1.end() + + child2 = langfuse_client.start_span(name="child-span-2") + child2.end() + + # Verify both children have metadata + child1_span = self.get_span_by_name(memory_exporter, "child-span-1") + self.verify_span_attribute( + child1_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment", + "variant_a", + ) + self.verify_span_attribute( + child1_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.version", + "1.0", + ) + + child2_span = self.get_span_by_name(memory_exporter, "child-span-2") + self.verify_span_attribute( + child2_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment", + "variant_a", + ) + self.verify_span_attribute( + child2_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.version", + "1.0", + ) + + def test_all_attributes_propagate_together(self, langfuse_client, memory_exporter): + """Verify user_id, session_id, and metadata all propagate together.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + user_id="user_123", + session_id="session_abc", + metadata={"experiment": "test", "env": "prod"}, + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child has all attributes + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_SESSION_ID, "session_abc" + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment", + "test", + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + "prod", + ) + + +class TestPropagateAttributesHierarchy(TestPropagateAttributesBase): + """Tests for propagation across span hierarchies.""" + + def test_propagation_to_direct_children(self, langfuse_client, memory_exporter): + """Verify attributes propagate to all direct children.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id="user_123"): + child1 = langfuse_client.start_span(name="child-1") + child1.end() + + child2 = langfuse_client.start_span(name="child-2") + child2.end() + + child3 = langfuse_client.start_span(name="child-3") + child3.end() + + # Verify all three children have user_id + for i in range(1, 4): + child_span = self.get_span_by_name(memory_exporter, f"child-{i}") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + def test_propagation_to_grandchildren(self, langfuse_client, memory_exporter): + """Verify attributes propagate through multiple levels of nesting.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id="user_123", session_id="session_abc"): + with langfuse_client.start_as_current_span(name="child-span"): + grandchild = langfuse_client.start_span(name="grandchild-span") + grandchild.end() + + # Verify all three levels have attributes + parent_span = self.get_span_by_name(memory_exporter, "parent-span") + child_span = self.get_span_by_name(memory_exporter, "child-span") + grandchild_span = self.get_span_by_name(memory_exporter, "grandchild-span") + + for span in [parent_span, child_span, grandchild_span]: + self.verify_span_attribute( + span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + self.verify_span_attribute( + span, LangfuseOtelSpanAttributes.TRACE_SESSION_ID, "session_abc" + ) + + def test_propagation_across_observation_types( + self, langfuse_client, memory_exporter + ): + """Verify attributes propagate to different observation types.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id="user_123"): + # Create span + span = langfuse_client.start_span(name="test-span") + span.end() + + # Create generation + generation = langfuse_client.start_observation( + as_type="generation", name="test-generation" + ) + generation.end() + + # Verify both observation types have user_id + span_data = self.get_span_by_name(memory_exporter, "test-span") + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + generation_data = self.get_span_by_name(memory_exporter, "test-generation") + self.verify_span_attribute( + generation_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + +class TestPropagateAttributesTiming(TestPropagateAttributesBase): + """Critical tests for early vs late propagation timing.""" + + def test_early_propagation_all_spans_covered( + self, langfuse_client, memory_exporter + ): + """Verify setting attributes early covers all child spans.""" + with langfuse_client.start_as_current_span(name="parent-span"): + # Set attributes BEFORE creating any children + with propagate_attributes(user_id="user_123"): + child1 = langfuse_client.start_span(name="child-1") + child1.end() + + child2 = langfuse_client.start_span(name="child-2") + child2.end() + + child3 = langfuse_client.start_span(name="child-3") + child3.end() + + # Verify ALL children have user_id + for i in range(1, 4): + child_span = self.get_span_by_name(memory_exporter, f"child-{i}") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + def test_late_propagation_only_future_spans_covered( + self, langfuse_client, memory_exporter + ): + """Verify late propagation only affects spans created after context entry.""" + with langfuse_client.start_as_current_span(name="parent-span"): + # Create child1 BEFORE propagate_attributes + child1 = langfuse_client.start_span(name="child-1") + child1.end() + + # NOW set attributes + with propagate_attributes(user_id="user_123"): + # Create child2 AFTER propagate_attributes + child2 = langfuse_client.start_span(name="child-2") + child2.end() + + # Verify: child1 does NOT have user_id, child2 DOES + child1_span = self.get_span_by_name(memory_exporter, "child-1") + self.verify_missing_attribute( + child1_span, LangfuseOtelSpanAttributes.TRACE_USER_ID + ) + + child2_span = self.get_span_by_name(memory_exporter, "child-2") + self.verify_span_attribute( + child2_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + def test_current_span_gets_attributes(self, langfuse_client, memory_exporter): + """Verify the currently active span gets attributes when propagate_attributes is called.""" + with langfuse_client.start_as_current_span(name="parent-span"): + # Call propagate_attributes while parent-span is active + with propagate_attributes(user_id="user_123"): + pass + + # Verify parent span itself has the attribute + parent_span = self.get_span_by_name(memory_exporter, "parent-span") + self.verify_span_attribute( + parent_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + def test_spans_outside_context_unaffected(self, langfuse_client, memory_exporter): + """Verify spans created outside context don't get attributes.""" + with langfuse_client.start_as_current_span(name="parent-span"): + # Span before context + span1 = langfuse_client.start_span(name="span-1") + span1.end() + + # Span inside context + with propagate_attributes(user_id="user_123"): + span2 = langfuse_client.start_span(name="span-2") + span2.end() + + # Span after context + span3 = langfuse_client.start_span(name="span-3") + span3.end() + + # Verify: only span2 has user_id + span1_data = self.get_span_by_name(memory_exporter, "span-1") + self.verify_missing_attribute( + span1_data, LangfuseOtelSpanAttributes.TRACE_USER_ID + ) + + span2_data = self.get_span_by_name(memory_exporter, "span-2") + self.verify_span_attribute( + span2_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + span3_data = self.get_span_by_name(memory_exporter, "span-3") + self.verify_missing_attribute( + span3_data, LangfuseOtelSpanAttributes.TRACE_USER_ID + ) + + +class TestPropagateAttributesValidation(TestPropagateAttributesBase): + """Tests for validation of propagated attribute values.""" + + def test_user_id_over_200_chars_dropped(self, langfuse_client, memory_exporter): + """Verify user_id over 200 characters is dropped with warning.""" + long_user_id = "x" * 201 + + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id=long_user_id): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child does NOT have user_id + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID + ) + + def test_session_id_over_200_chars_dropped(self, langfuse_client, memory_exporter): + """Verify session_id over 200 characters is dropped with warning.""" + long_session_id = "y" * 201 + + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(session_id=long_session_id): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child does NOT have session_id + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_SESSION_ID + ) + + def test_metadata_value_over_200_chars_dropped( + self, langfuse_client, memory_exporter + ): + """Verify metadata values over 200 characters are dropped with warning.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(metadata={"key": "z" * 201}): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child does NOT have metadata.key + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute( + child_span, f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.key" + ) + + def test_exactly_200_chars_accepted(self, langfuse_client, memory_exporter): + """Verify exactly 200 characters is accepted (boundary test).""" + user_id_200 = "x" * 200 + + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id=user_id_200): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child HAS user_id + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, user_id_200 + ) + + def test_201_chars_rejected(self, langfuse_client, memory_exporter): + """Verify 201 characters is rejected (boundary test).""" + user_id_201 = "x" * 201 + + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id=user_id_201): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child does NOT have user_id + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID + ) + + def test_non_string_user_id_dropped(self, langfuse_client, memory_exporter): + """Verify non-string user_id is dropped with warning.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id=12345): # type: ignore + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child does NOT have user_id + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID + ) + + def test_mixed_valid_invalid_metadata(self, langfuse_client, memory_exporter): + """Verify mixed valid/invalid metadata - valid entries kept, invalid dropped.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + metadata={ + "valid_key": "valid_value", + "invalid_key": "x" * 201, # Too long + "another_valid": "ok", + } + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify: valid keys present, invalid key absent + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.valid_key", + "valid_value", + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.another_valid", + "ok", + ) + self.verify_missing_attribute( + child_span, f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.invalid_key" + ) + + +class TestPropagateAttributesNesting(TestPropagateAttributesBase): + """Tests for nested propagate_attributes contexts.""" + + def test_nested_contexts_inner_overwrites(self, langfuse_client, memory_exporter): + """Verify inner context overwrites outer context values.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id="user1"): + # Create span in outer context + span1 = langfuse_client.start_span(name="span-1") + span1.end() + + # Inner context with different user_id + with propagate_attributes(user_id="user2"): + span2 = langfuse_client.start_span(name="span-2") + span2.end() + + # Verify: span1 has user1, span2 has user2 + span1_data = self.get_span_by_name(memory_exporter, "span-1") + self.verify_span_attribute( + span1_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user1" + ) + + span2_data = self.get_span_by_name(memory_exporter, "span-2") + self.verify_span_attribute( + span2_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user2" + ) + + def test_after_inner_context_outer_restored(self, langfuse_client, memory_exporter): + """Verify outer context is restored after exiting inner context.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id="user1"): + # Span in outer context + span1 = langfuse_client.start_span(name="span-1") + span1.end() + + # Inner context + with propagate_attributes(user_id="user2"): + span2 = langfuse_client.start_span(name="span-2") + span2.end() + + # Back to outer context + span3 = langfuse_client.start_span(name="span-3") + span3.end() + + # Verify: span1 and span3 have user1, span2 has user2 + span1_data = self.get_span_by_name(memory_exporter, "span-1") + self.verify_span_attribute( + span1_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user1" + ) + + span2_data = self.get_span_by_name(memory_exporter, "span-2") + self.verify_span_attribute( + span2_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user2" + ) + + span3_data = self.get_span_by_name(memory_exporter, "span-3") + self.verify_span_attribute( + span3_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user1" + ) + + def test_nested_different_attributes(self, langfuse_client, memory_exporter): + """Verify nested contexts with different attributes merge correctly.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id="user1"): + # Inner context adds session_id + with propagate_attributes(session_id="session1"): + span = langfuse_client.start_span(name="span-1") + span.end() + + # Verify: span has BOTH user_id and session_id + span_data = self.get_span_by_name(memory_exporter, "span-1") + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user1" + ) + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_SESSION_ID, "session1" + ) + + def test_nested_metadata_merges_additively(self, langfuse_client, memory_exporter): + """Verify nested contexts merge metadata keys additively.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(metadata={"env": "prod", "region": "us-east"}): + # Outer span should have outer metadata + outer_span = langfuse_client.start_span(name="outer-span") + outer_span.end() + + # Inner context adds more metadata + with propagate_attributes( + metadata={"experiment": "A", "version": "2.0"} + ): + inner_span = langfuse_client.start_span(name="inner-span") + inner_span.end() + + # Back to outer context + after_span = langfuse_client.start_span(name="after-span") + after_span.end() + + # Verify: outer span has only outer metadata + outer_span_data = self.get_span_by_name(memory_exporter, "outer-span") + self.verify_span_attribute( + outer_span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + "prod", + ) + self.verify_span_attribute( + outer_span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.region", + "us-east", + ) + self.verify_missing_attribute( + outer_span_data, f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment" + ) + + # Verify: inner span has ALL metadata (merged) + inner_span_data = self.get_span_by_name(memory_exporter, "inner-span") + self.verify_span_attribute( + inner_span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + "prod", + ) + self.verify_span_attribute( + inner_span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.region", + "us-east", + ) + self.verify_span_attribute( + inner_span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment", + "A", + ) + self.verify_span_attribute( + inner_span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.version", + "2.0", + ) + + # Verify: after span has only outer metadata (inner context exited) + after_span_data = self.get_span_by_name(memory_exporter, "after-span") + self.verify_span_attribute( + after_span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + "prod", + ) + self.verify_span_attribute( + after_span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.region", + "us-east", + ) + self.verify_missing_attribute( + after_span_data, f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment" + ) + + def test_nested_metadata_inner_overwrites_conflicting_keys( + self, langfuse_client, memory_exporter + ): + """Verify nested contexts: inner metadata overwrites outer for same keys.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + metadata={"env": "staging", "version": "1.0", "region": "us-west"} + ): + # Inner context overwrites some keys + with propagate_attributes( + metadata={"env": "production", "experiment": "B"} + ): + span = langfuse_client.start_span(name="span-1") + span.end() + + # Verify: inner values overwrite outer for conflicting keys + span_data = self.get_span_by_name(memory_exporter, "span-1") + + # Overwritten key + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + "production", # Inner value wins + ) + + # Preserved keys from outer + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.version", + "1.0", # From outer + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.region", + "us-west", # From outer + ) + + # New key from inner + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment", + "B", # From inner + ) + + def test_triple_nested_metadata_accumulates(self, langfuse_client, memory_exporter): + """Verify metadata accumulates across three levels of nesting.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(metadata={"level": "1", "a": "outer"}): + with propagate_attributes(metadata={"level": "2", "b": "middle"}): + with propagate_attributes(metadata={"level": "3", "c": "inner"}): + span = langfuse_client.start_span(name="deep-span") + span.end() + + # Verify: deepest span has all metadata with innermost level winning + span_data = self.get_span_by_name(memory_exporter, "deep-span") + + # Conflicting key: innermost wins + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.level", + "3", + ) + + # Unique keys from each level + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.a", + "outer", + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.b", + "middle", + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.c", + "inner", + ) + + def test_metadata_merge_with_empty_inner(self, langfuse_client, memory_exporter): + """Verify empty inner metadata dict doesn't clear outer metadata.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(metadata={"key1": "value1", "key2": "value2"}): + # Inner context with empty metadata + with propagate_attributes(metadata={}): + span = langfuse_client.start_span(name="span-1") + span.end() + + # Verify: outer metadata is preserved + span_data = self.get_span_by_name(memory_exporter, "span-1") + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.key1", + "value1", + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.key2", + "value2", + ) + + def test_metadata_merge_preserves_user_session( + self, langfuse_client, memory_exporter + ): + """Verify metadata merging doesn't affect user_id/session_id.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + user_id="user1", + session_id="session1", + metadata={"outer": "value"}, + ): + with propagate_attributes(metadata={"inner": "value"}): + span = langfuse_client.start_span(name="span-1") + span.end() + + # Verify: user_id and session_id are preserved, metadata merged + span_data = self.get_span_by_name(memory_exporter, "span-1") + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user1" + ) + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_SESSION_ID, "session1" + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.outer", + "value", + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.inner", + "value", + ) + + +class TestPropagateAttributesEdgeCases(TestPropagateAttributesBase): + """Tests for edge cases and unusual scenarios.""" + + def test_propagate_attributes_with_no_args(self, langfuse_client, memory_exporter): + """Verify calling propagate_attributes() with no args doesn't error.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Should not crash, spans created normally + child_span = self.get_span_by_name(memory_exporter, "child-span") + assert child_span is not None + + def test_none_values_ignored(self, langfuse_client, memory_exporter): + """Verify None values are ignored without error.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id=None, session_id=None, metadata=None): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Should not crash, no attributes set + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID + ) + self.verify_missing_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_SESSION_ID + ) + + def test_empty_metadata_dict(self, langfuse_client, memory_exporter): + """Verify empty metadata dict doesn't cause errors.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(metadata={}): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Should not crash, no metadata attributes set + child_span = self.get_span_by_name(memory_exporter, "child-span") + assert child_span is not None + + def test_all_invalid_metadata_values(self, langfuse_client, memory_exporter): + """Verify all invalid metadata values results in no metadata attributes.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + metadata={ + "key1": "x" * 201, # Too long + "key2": "y" * 201, # Too long + } + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + # No metadata attributes should be set + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute( + child_span, f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.key1" + ) + self.verify_missing_attribute( + child_span, f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.key2" + ) + + def test_propagate_with_no_active_span(self, langfuse_client, memory_exporter): + """Verify propagate_attributes works even with no active span.""" + # Call propagate_attributes without creating a parent span first + with propagate_attributes(user_id="user_123"): + # Now create a span + with langfuse_client.start_as_current_span(name="span-1"): + pass + + # Should not crash, span should have user_id + span_data = self.get_span_by_name(memory_exporter, "span-1") + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + +class TestPropagateAttributesFormat(TestPropagateAttributesBase): + """Tests for correct attribute formatting and naming.""" + + def test_user_id_uses_correct_attribute_name( + self, langfuse_client, memory_exporter + ): + """Verify user_id uses the correct OTel attribute name.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(user_id="user_123"): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + # Verify the exact attribute key is used + assert LangfuseOtelSpanAttributes.TRACE_USER_ID in child_span["attributes"] + assert ( + child_span["attributes"][LangfuseOtelSpanAttributes.TRACE_USER_ID] + == "user_123" + ) + + def test_session_id_uses_correct_attribute_name( + self, langfuse_client, memory_exporter + ): + """Verify session_id uses the correct OTel attribute name.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(session_id="session_abc"): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + # Verify the exact attribute key is used + assert LangfuseOtelSpanAttributes.TRACE_SESSION_ID in child_span["attributes"] + assert ( + child_span["attributes"][LangfuseOtelSpanAttributes.TRACE_SESSION_ID] + == "session_abc" + ) + + def test_metadata_keys_properly_prefixed(self, langfuse_client, memory_exporter): + """Verify metadata keys are properly prefixed with TRACE_METADATA.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + metadata={"experiment": "A", "version": "1.0", "env": "prod"} + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + attributes = child_span["attributes"] + + # Verify each metadata key is properly prefixed + expected_keys = [ + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment", + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.version", + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + ] + + for key in expected_keys: + assert key in attributes, f"Expected key '{key}' not found in attributes" + + def test_multiple_metadata_keys_independent(self, langfuse_client, memory_exporter): + """Verify multiple metadata keys are stored as independent attributes.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(metadata={"k1": "v1", "k2": "v2", "k3": "v3"}): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + attributes = child_span["attributes"] + + # Verify all three are separate attributes with correct values + assert attributes[f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.k1"] == "v1" + assert attributes[f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.k2"] == "v2" + assert attributes[f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.k3"] == "v3" + + +class TestPropagateAttributesThreading(TestPropagateAttributesBase): + """Tests for propagate_attributes with ThreadPoolExecutor.""" + + @pytest.fixture(autouse=True) + def instrument_threading(self): + """Auto-instrument threading for all tests in this class.""" + instrumentor = ThreadingInstrumentor() + instrumentor.instrument() + yield + instrumentor.uninstrument() + + def test_propagation_with_threadpoolexecutor( + self, langfuse_client, memory_exporter + ): + """Verify attributes propagate from main thread to worker threads.""" + + def worker_function(span_name: str): + """Worker creates a span in thread pool.""" + span = langfuse_client.start_span(name=span_name) + span.end() + return span_name + + with langfuse_client.start_as_current_span(name="main-span"): + with propagate_attributes(user_id="main_user", session_id="main_session"): + # Execute work in thread pool + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + futures = [ + executor.submit(worker_function, f"worker-span-{i}") + for i in range(3) + ] + concurrent.futures.wait(futures) + + # Verify all worker spans have propagated attributes + for i in range(3): + worker_span = self.get_span_by_name(memory_exporter, f"worker-span-{i}") + self.verify_span_attribute( + worker_span, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "main_user", + ) + self.verify_span_attribute( + worker_span, + LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "main_session", + ) + + def test_propagation_isolated_between_threads( + self, langfuse_client, memory_exporter + ): + """Verify each thread's context is isolated from others.""" + + def create_trace_with_user(user_id: str): + """Create a trace with specific user_id.""" + with langfuse_client.start_as_current_span(name=f"trace-{user_id}"): + with propagate_attributes(user_id=user_id): + span = langfuse_client.start_span(name=f"span-{user_id}") + span.end() + + # Run two traces concurrently with different user_ids + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future1 = executor.submit(create_trace_with_user, "user1") + future2 = executor.submit(create_trace_with_user, "user2") + concurrent.futures.wait([future1, future2]) + + # Verify each trace has the correct user_id (no mixing) + span1 = self.get_span_by_name(memory_exporter, "span-user1") + self.verify_span_attribute( + span1, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user1" + ) + + span2 = self.get_span_by_name(memory_exporter, "span-user2") + self.verify_span_attribute( + span2, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user2" + ) + + def test_nested_propagation_across_thread_boundary( + self, langfuse_client, memory_exporter + ): + """Verify nested spans across thread boundaries inherit attributes.""" + + def worker_creates_child(): + """Worker thread creates a child span.""" + child = langfuse_client.start_span(name="worker-child-span") + child.end() + + with langfuse_client.start_as_current_span(name="main-parent-span"): + with propagate_attributes(user_id="main_user"): + # Create span in main thread + main_child = langfuse_client.start_span(name="main-child-span") + main_child.end() + + # Create span in worker thread + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(worker_creates_child) + future.result() + + # Verify both spans (main and worker) have user_id + main_child_span = self.get_span_by_name(memory_exporter, "main-child-span") + self.verify_span_attribute( + main_child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "main_user" + ) + + worker_child_span = self.get_span_by_name(memory_exporter, "worker-child-span") + self.verify_span_attribute( + worker_child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "main_user" + ) + + def test_worker_thread_can_override_propagated_attrs( + self, langfuse_client, memory_exporter + ): + """Verify worker thread can override propagated attributes.""" + + def worker_overrides_user(): + """Worker thread sets its own user_id.""" + with propagate_attributes(user_id="worker_user"): + span = langfuse_client.start_span(name="worker-span") + span.end() + + with langfuse_client.start_as_current_span(name="main-span"): + with propagate_attributes(user_id="main_user"): + # Create span in main thread + main_span = langfuse_client.start_span(name="main-child-span") + main_span.end() + + # Worker overrides with its own user_id + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(worker_overrides_user) + future.result() + + # Verify: main span has main_user, worker span has worker_user + main_child = self.get_span_by_name(memory_exporter, "main-child-span") + self.verify_span_attribute( + main_child, LangfuseOtelSpanAttributes.TRACE_USER_ID, "main_user" + ) + + worker_span = self.get_span_by_name(memory_exporter, "worker-span") + self.verify_span_attribute( + worker_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "worker_user" + ) + + def test_multiple_workers_with_same_propagated_context( + self, langfuse_client, memory_exporter + ): + """Verify multiple workers all inherit same propagated context.""" + + def worker_function(worker_id: int): + """Worker creates a span.""" + span = langfuse_client.start_span(name=f"worker-{worker_id}") + span.end() + + with langfuse_client.start_as_current_span(name="main-span"): + with propagate_attributes(session_id="shared_session"): + # Submit 5 workers + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + futures = [executor.submit(worker_function, i) for i in range(5)] + concurrent.futures.wait(futures) + + # Verify all 5 workers have same session_id + for i in range(5): + worker_span = self.get_span_by_name(memory_exporter, f"worker-{i}") + self.verify_span_attribute( + worker_span, + LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "shared_session", + ) + + def test_concurrent_traces_with_different_attributes( + self, langfuse_client, memory_exporter + ): + """Verify concurrent traces with different attributes don't mix.""" + + def create_trace(trace_id: int): + """Create a trace with unique user_id.""" + with langfuse_client.start_as_current_span(name=f"trace-{trace_id}"): + with propagate_attributes(user_id=f"user_{trace_id}"): + span = langfuse_client.start_span(name=f"span-{trace_id}") + span.end() + + # Create 10 traces concurrently + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + futures = [executor.submit(create_trace, i) for i in range(10)] + concurrent.futures.wait(futures) + + # Verify each trace has its correct user_id (no mixing) + for i in range(10): + span = self.get_span_by_name(memory_exporter, f"span-{i}") + self.verify_span_attribute( + span, LangfuseOtelSpanAttributes.TRACE_USER_ID, f"user_{i}" + ) + + def test_exception_in_worker_preserves_context( + self, langfuse_client, memory_exporter + ): + """Verify exception in worker doesn't corrupt main thread context.""" + + def worker_raises_exception(): + """Worker creates span then raises exception.""" + span = langfuse_client.start_span(name="worker-span") + span.end() + raise ValueError("Test exception") + + with langfuse_client.start_as_current_span(name="main-span"): + with propagate_attributes(user_id="main_user"): + # Create span before worker + span1 = langfuse_client.start_span(name="span-before") + span1.end() + + # Worker raises exception (catch it) + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(worker_raises_exception) + try: + future.result() + except ValueError: + pass # Expected + + # Create span after exception + span2 = langfuse_client.start_span(name="span-after") + span2.end() + + # Verify both main thread spans still have correct user_id + span_before = self.get_span_by_name(memory_exporter, "span-before") + self.verify_span_attribute( + span_before, LangfuseOtelSpanAttributes.TRACE_USER_ID, "main_user" + ) + + span_after = self.get_span_by_name(memory_exporter, "span-after") + self.verify_span_attribute( + span_after, LangfuseOtelSpanAttributes.TRACE_USER_ID, "main_user" + ) + + +class TestPropagateAttributesCrossTracer(TestPropagateAttributesBase): + """Tests for propagate_attributes with different OpenTelemetry tracers.""" + + def test_different_tracer_spans_get_attributes( + self, langfuse_client, memory_exporter, tracer_provider + ): + """Verify spans from different tracers get propagated attributes.""" + # Get a different tracer (not the Langfuse tracer) + other_tracer = tracer_provider.get_tracer("other-library", "1.0.0") + + with langfuse_client.start_as_current_span(name="langfuse-parent"): + with propagate_attributes(user_id="user_123", session_id="session_abc"): + # Create span with Langfuse tracer + langfuse_span = langfuse_client.start_span(name="langfuse-child") + langfuse_span.end() + + # Create span with different tracer + with other_tracer.start_as_current_span(name="other-library-span"): + pass + + # Verify both spans have the propagated attributes + langfuse_span_data = self.get_span_by_name(memory_exporter, "langfuse-child") + self.verify_span_attribute( + langfuse_span_data, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "user_123", + ) + self.verify_span_attribute( + langfuse_span_data, + LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "session_abc", + ) + + other_span_data = self.get_span_by_name(memory_exporter, "other-library-span") + self.verify_span_attribute( + other_span_data, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "user_123", + ) + self.verify_span_attribute( + other_span_data, + LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "session_abc", + ) + + def test_nested_spans_from_multiple_tracers( + self, langfuse_client, memory_exporter, tracer_provider + ): + """Verify nested spans from multiple tracers all get propagated attributes.""" + tracer_a = tracer_provider.get_tracer("library-a", "1.0.0") + tracer_b = tracer_provider.get_tracer("library-b", "2.0.0") + + with langfuse_client.start_as_current_span(name="root"): + with propagate_attributes( + user_id="user_123", metadata={"experiment": "cross_tracer"} + ): + # Create nested spans from different tracers + with tracer_a.start_as_current_span(name="library-a-span"): + with tracer_b.start_as_current_span(name="library-b-span"): + langfuse_leaf = langfuse_client.start_span(name="langfuse-leaf") + langfuse_leaf.end() + + # Verify all spans have the attributes + for span_name in ["library-a-span", "library-b-span", "langfuse-leaf"]: + span_data = self.get_span_by_name(memory_exporter, span_name) + self.verify_span_attribute( + span_data, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "user_123", + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment", + "cross_tracer", + ) + + def test_other_tracer_span_before_propagate_context( + self, langfuse_client, memory_exporter, tracer_provider + ): + """Verify spans created before propagate_attributes don't get attributes.""" + other_tracer = tracer_provider.get_tracer("other-library", "1.0.0") + + with langfuse_client.start_as_current_span(name="root"): + # Create span BEFORE propagate_attributes + with other_tracer.start_as_current_span(name="span-before"): + pass + + # NOW set attributes + with propagate_attributes(user_id="user_123"): + # Create span AFTER propagate_attributes + with other_tracer.start_as_current_span(name="span-after"): + pass + + # Verify: span-before does NOT have user_id, span-after DOES + span_before = self.get_span_by_name(memory_exporter, "span-before") + self.verify_missing_attribute( + span_before, LangfuseOtelSpanAttributes.TRACE_USER_ID + ) + + span_after = self.get_span_by_name(memory_exporter, "span-after") + self.verify_span_attribute( + span_after, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + def test_mixed_tracers_with_metadata( + self, langfuse_client, memory_exporter, tracer_provider + ): + """Verify metadata propagates correctly to spans from different tracers.""" + other_tracer = tracer_provider.get_tracer("instrumented-library", "1.0.0") + + with langfuse_client.start_as_current_span(name="main"): + with propagate_attributes( + metadata={ + "env": "production", + "version": "2.0", + "feature_flag": "enabled", + } + ): + # Create spans from both tracers + langfuse_span = langfuse_client.start_span(name="langfuse-operation") + langfuse_span.end() + + with other_tracer.start_as_current_span(name="library-operation"): + pass + + # Verify both spans have all metadata + for span_name in ["langfuse-operation", "library-operation"]: + span_data = self.get_span_by_name(memory_exporter, span_name) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + "production", + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.version", + "2.0", + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.feature_flag", + "enabled", + ) + + def test_propagate_without_langfuse_parent( + self, langfuse_client, memory_exporter, tracer_provider + ): + """Verify propagate_attributes works even when parent span is from different tracer.""" + other_tracer = tracer_provider.get_tracer("other-library", "1.0.0") + + # Parent span is from different tracer + with other_tracer.start_as_current_span(name="other-parent"): + with propagate_attributes(user_id="user_123", session_id="session_xyz"): + # Create children from both tracers + with other_tracer.start_as_current_span(name="other-child"): + pass + + langfuse_child = langfuse_client.start_span(name="langfuse-child") + langfuse_child.end() + + # Verify all spans have attributes (including non-Langfuse parent) + for span_name in ["other-parent", "other-child", "langfuse-child"]: + span_data = self.get_span_by_name(memory_exporter, span_name) + self.verify_span_attribute( + span_data, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "user_123", + ) + self.verify_span_attribute( + span_data, + LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "session_xyz", + ) + + def test_attributes_persist_across_tracer_changes( + self, langfuse_client, memory_exporter, tracer_provider + ): + """Verify attributes persist as execution moves between different tracers.""" + tracer_1 = tracer_provider.get_tracer("library-1", "1.0.0") + tracer_2 = tracer_provider.get_tracer("library-2", "1.0.0") + tracer_3 = tracer_provider.get_tracer("library-3", "1.0.0") + + with langfuse_client.start_as_current_span(name="root"): + with propagate_attributes(user_id="persistent_user"): + # Bounce between different tracers + with tracer_1.start_as_current_span(name="step-1"): + pass + + with tracer_2.start_as_current_span(name="step-2"): + with tracer_3.start_as_current_span(name="step-3"): + pass + + langfuse_span = langfuse_client.start_span(name="step-4") + langfuse_span.end() + + # Verify all steps have the user_id + for step_name in ["step-1", "step-2", "step-3", "step-4"]: + span_data = self.get_span_by_name(memory_exporter, step_name) + self.verify_span_attribute( + span_data, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "persistent_user", + ) + + +class TestPropagateAttributesAsync(TestPropagateAttributesBase): + """Tests for propagate_attributes with async/await.""" + + @pytest.mark.asyncio + async def test_async_propagation_basic(self, langfuse_client, memory_exporter): + """Verify attributes propagate in async context.""" + + async def async_operation(): + """Async function that creates a span.""" + span = langfuse_client.start_span(name="async-span") + span.end() + + with langfuse_client.start_as_current_span(name="parent"): + with propagate_attributes(user_id="async_user", session_id="async_session"): + await async_operation() + + # Verify async span has attributes + async_span = self.get_span_by_name(memory_exporter, "async-span") + self.verify_span_attribute( + async_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "async_user" + ) + self.verify_span_attribute( + async_span, LangfuseOtelSpanAttributes.TRACE_SESSION_ID, "async_session" + ) + + @pytest.mark.asyncio + async def test_async_nested_operations(self, langfuse_client, memory_exporter): + """Verify attributes propagate through nested async operations.""" + + async def level_3(): + span = langfuse_client.start_span(name="level-3-span") + span.end() + + async def level_2(): + span = langfuse_client.start_span(name="level-2-span") + span.end() + await level_3() + + async def level_1(): + span = langfuse_client.start_span(name="level-1-span") + span.end() + await level_2() + + with langfuse_client.start_as_current_span(name="root"): + with propagate_attributes( + user_id="nested_user", metadata={"level": "nested"} + ): + await level_1() + + # Verify all levels have attributes + for span_name in ["level-1-span", "level-2-span", "level-3-span"]: + span_data = self.get_span_by_name(memory_exporter, span_name) + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "nested_user" + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.level", + "nested", + ) + + @pytest.mark.asyncio + async def test_async_context_manager(self, langfuse_client, memory_exporter): + """Verify propagate_attributes works as context manager in async function.""" + with langfuse_client.start_as_current_span(name="parent"): + # propagate_attributes supports both sync and async contexts via regular 'with' + with propagate_attributes(user_id="async_ctx_user"): + span = langfuse_client.start_span(name="inside-async-ctx") + span.end() + + span_data = self.get_span_by_name(memory_exporter, "inside-async-ctx") + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "async_ctx_user" + ) + + @pytest.mark.asyncio + async def test_multiple_async_tasks_concurrent( + self, langfuse_client, memory_exporter + ): + """Verify context isolation between concurrent async tasks.""" + import asyncio + + async def create_trace_with_user(user_id: str): + """Create a trace with specific user_id.""" + with langfuse_client.start_as_current_span(name=f"trace-{user_id}"): + with propagate_attributes(user_id=user_id): + await asyncio.sleep(0.01) # Simulate async work + span = langfuse_client.start_span(name=f"span-{user_id}") + span.end() + + # Run multiple traces concurrently + await asyncio.gather( + create_trace_with_user("user1"), + create_trace_with_user("user2"), + create_trace_with_user("user3"), + ) + + # Verify each trace has correct user_id (no mixing) + for user_id in ["user1", "user2", "user3"]: + span_data = self.get_span_by_name(memory_exporter, f"span-{user_id}") + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, user_id + ) + + @pytest.mark.asyncio + async def test_async_with_sync_nested(self, langfuse_client, memory_exporter): + """Verify attributes propagate from async to sync code.""" + + def sync_operation(): + """Sync function called from async context.""" + span = langfuse_client.start_span(name="sync-in-async") + span.end() + + async def async_operation(): + """Async function that calls sync code.""" + span1 = langfuse_client.start_span(name="async-span") + span1.end() + sync_operation() + + with langfuse_client.start_as_current_span(name="root"): + with propagate_attributes(user_id="mixed_user"): + await async_operation() + + # Verify both spans have attributes + async_span = self.get_span_by_name(memory_exporter, "async-span") + self.verify_span_attribute( + async_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "mixed_user" + ) + + sync_span = self.get_span_by_name(memory_exporter, "sync-in-async") + self.verify_span_attribute( + sync_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "mixed_user" + ) + + @pytest.mark.asyncio + async def test_async_exception_preserves_context( + self, langfuse_client, memory_exporter + ): + """Verify context is preserved even when async operation raises exception.""" + + async def failing_operation(): + """Async operation that raises exception.""" + span = langfuse_client.start_span(name="span-before-error") + span.end() + raise ValueError("Test error") + + with langfuse_client.start_as_current_span(name="root"): + with propagate_attributes(user_id="error_user"): + span1 = langfuse_client.start_span(name="span-before-async") + span1.end() + + try: + await failing_operation() + except ValueError: + pass # Expected + + span2 = langfuse_client.start_span(name="span-after-error") + span2.end() + + # Verify all spans have attributes + for span_name in ["span-before-async", "span-before-error", "span-after-error"]: + span_data = self.get_span_by_name(memory_exporter, span_name) + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "error_user" + ) + + @pytest.mark.asyncio + async def test_async_with_metadata(self, langfuse_client, memory_exporter): + """Verify metadata propagates correctly in async context.""" + + async def async_with_metadata(): + span = langfuse_client.start_span(name="async-metadata-span") + span.end() + + with langfuse_client.start_as_current_span(name="root"): + with propagate_attributes( + user_id="metadata_user", + metadata={"async": "true", "operation": "test"}, + ): + await async_with_metadata() + + span_data = self.get_span_by_name(memory_exporter, "async-metadata-span") + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "metadata_user" + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.async", + "true", + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.operation", + "test", + ) + + +class TestPropagateAttributesBaggage(TestPropagateAttributesBase): + """Tests for as_baggage=True parameter and OpenTelemetry baggage propagation.""" + + def test_baggage_is_set_when_as_baggage_true(self, langfuse_client): + """Verify baggage entries are created with correct keys when as_baggage=True.""" + from opentelemetry import baggage + from opentelemetry import context as otel_context + + with langfuse_client.start_as_current_span(name="parent"): + with propagate_attributes( + user_id="user_123", + session_id="session_abc", + metadata={"env": "test", "version": "2.0"}, + as_baggage=True, + ): + # Get current context and inspect baggage + current_context = otel_context.get_current() + baggage_entries = baggage.get_all(context=current_context) + + # Verify baggage entries exist with correct keys + assert "langfuse_user_id" in baggage_entries + assert baggage_entries["langfuse_user_id"] == "user_123" + + assert "langfuse_session_id" in baggage_entries + assert baggage_entries["langfuse_session_id"] == "session_abc" + + assert "langfuse_metadata_env" in baggage_entries + assert baggage_entries["langfuse_metadata_env"] == "test" + + assert "langfuse_metadata_version" in baggage_entries + assert baggage_entries["langfuse_metadata_version"] == "2.0" + + def test_spans_receive_attributes_from_baggage( + self, langfuse_client, memory_exporter + ): + """Verify child spans get attributes when parent uses as_baggage=True.""" + with langfuse_client.start_as_current_span(name="parent"): + with propagate_attributes( + user_id="baggage_user", + session_id="baggage_session", + metadata={"source": "baggage"}, + as_baggage=True, + ): + # Create child span + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child span has all attributes + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "baggage_user" + ) + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "baggage_session", + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.source", + "baggage", + ) + + def test_baggage_disabled_by_default(self, langfuse_client): + """Verify as_baggage=False (default) doesn't create baggage entries.""" + from opentelemetry import baggage + from opentelemetry import context as otel_context + + with langfuse_client.start_as_current_span(name="parent"): + with propagate_attributes( + user_id="user_123", + session_id="session_abc", + ): + # Get current context and inspect baggage + current_context = otel_context.get_current() + baggage_entries = baggage.get_all(context=current_context) + assert len(baggage_entries) == 0 + + def test_metadata_key_with_user_id_substring_doesnt_collide( + self, langfuse_client, memory_exporter + ): + """Verify metadata key containing 'user_id' substring doesn't map to TRACE_USER_ID.""" + with langfuse_client.start_as_current_span(name="parent"): + with propagate_attributes( + metadata={"user_info": "some_data", "user_id_copy": "another"}, + as_baggage=True, + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + + # Should NOT have TRACE_USER_ID attribute + self.verify_missing_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID + ) + + # Should have metadata attributes with correct keys + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.user_info", + "some_data", + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.user_id_copy", + "another", + ) + + def test_metadata_key_with_session_substring_doesnt_collide( + self, langfuse_client, memory_exporter + ): + """Verify metadata key containing 'session_id' substring doesn't map to TRACE_SESSION_ID.""" + with langfuse_client.start_as_current_span(name="parent"): + with propagate_attributes( + metadata={"session_data": "value1", "session_id_backup": "value2"}, + as_baggage=True, + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + + # Should NOT have TRACE_SESSION_ID attribute + self.verify_missing_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_SESSION_ID + ) + + # Should have metadata attributes with correct keys + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.session_data", + "value1", + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.session_id_backup", + "value2", + ) + + def test_metadata_keys_extract_correctly_from_baggage( + self, langfuse_client, memory_exporter + ): + """Verify metadata keys are correctly formatted in baggage and extracted back.""" + with langfuse_client.start_as_current_span(name="parent"): + with propagate_attributes( + metadata={ + "env": "production", + "region": "us-west", + "experiment_id": "exp_123", + }, + as_baggage=True, + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + + # All metadata should be under TRACE_METADATA prefix + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + "production", + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.region", + "us-west", + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.experiment_id", + "exp_123", + ) + + def test_baggage_and_context_both_propagate(self, langfuse_client, memory_exporter): + """Verify attributes propagate when both baggage and context mechanisms are active.""" + with langfuse_client.start_as_current_span(name="parent"): + # Enable baggage + with propagate_attributes( + user_id="user_both", + session_id="session_both", + metadata={"source": "both"}, + as_baggage=True, + ): + # Create multiple levels of nesting + with langfuse_client.start_as_current_span(name="middle"): + child = langfuse_client.start_span(name="leaf") + child.end() + + # Verify all spans have attributes + for span_name in ["parent", "middle", "leaf"]: + span_data = self.get_span_by_name(memory_exporter, span_name) + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_both" + ) + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.TRACE_SESSION_ID, "session_both" + ) + self.verify_span_attribute( + span_data, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.source", + "both", + ) + + def test_baggage_survives_context_isolation(self, langfuse_client, memory_exporter): + """Simulate cross-process scenario: baggage persists when context is detached/reattached.""" + from opentelemetry import context as otel_context + + # Step 1: Create context with baggage + with langfuse_client.start_as_current_span(name="original-process"): + with propagate_attributes( + user_id="cross_process_user", + session_id="cross_process_session", + as_baggage=True, + ): + # Capture the context with baggage + context_with_baggage = otel_context.get_current() + + # Step 2: Simulate "remote" process by creating span in saved context + # This mimics what happens when receiving an HTTP request with baggage headers + token = otel_context.attach(context_with_baggage) + try: + with langfuse_client.start_as_current_span(name="remote-process"): + child = langfuse_client.start_span(name="remote-child") + child.end() + finally: + otel_context.detach(token) + + # Verify remote spans have the propagated attributes from baggage + remote_child = self.get_span_by_name(memory_exporter, "remote-child") + self.verify_span_attribute( + remote_child, + LangfuseOtelSpanAttributes.TRACE_USER_ID, + "cross_process_user", + ) + self.verify_span_attribute( + remote_child, + LangfuseOtelSpanAttributes.TRACE_SESSION_ID, + "cross_process_session", + ) + + +class TestPropagateAttributesVersion(TestPropagateAttributesBase): + """Tests for version parameter propagation.""" + + def test_version_propagates_to_child_spans(self, langfuse_client, memory_exporter): + """Verify version propagates to all child spans within context.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(version="v1.2.3"): + child1 = langfuse_client.start_span(name="child-span-1") + child1.end() + + child2 = langfuse_client.start_span(name="child-span-2") + child2.end() + + # Verify both children have version + child1_span = self.get_span_by_name(memory_exporter, "child-span-1") + self.verify_span_attribute( + child1_span, + LangfuseOtelSpanAttributes.VERSION, + "v1.2.3", + ) + + child2_span = self.get_span_by_name(memory_exporter, "child-span-2") + self.verify_span_attribute( + child2_span, + LangfuseOtelSpanAttributes.VERSION, + "v1.2.3", + ) + + def test_version_with_user_and_session(self, langfuse_client, memory_exporter): + """Verify version works together with user_id and session_id.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + user_id="user_123", + session_id="session_abc", + version="2.0.0", + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child has all attributes + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_SESSION_ID, "session_abc" + ) + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.VERSION, "2.0.0" + ) + + def test_version_with_metadata(self, langfuse_client, memory_exporter): + """Verify version works together with metadata.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + version="1.0.0", + metadata={"env": "production", "region": "us-east"}, + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.VERSION, "1.0.0" + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + "production", + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.region", + "us-east", + ) + + def test_version_validation_over_200_chars(self, langfuse_client, memory_exporter): + """Verify version over 200 characters is dropped with warning.""" + long_version = "v" + "1.0.0" * 50 # Create a very long version string + + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(version=long_version): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child does NOT have version + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute(child_span, LangfuseOtelSpanAttributes.VERSION) + + def test_version_exactly_200_chars(self, langfuse_client, memory_exporter): + """Verify exactly 200 character version is accepted.""" + version_200 = "v" * 200 + + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(version=version_200): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child HAS version + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.VERSION, version_200 + ) + + def test_version_nested_contexts_inner_overwrites( + self, langfuse_client, memory_exporter + ): + """Verify inner context overwrites outer version.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(version="1.0.0"): + # Create span in outer context + span1 = langfuse_client.start_span(name="span-1") + span1.end() + + # Inner context with different version + with propagate_attributes(version="2.0.0"): + span2 = langfuse_client.start_span(name="span-2") + span2.end() + + # Back to outer context + span3 = langfuse_client.start_span(name="span-3") + span3.end() + + # Verify: span1 and span3 have version 1.0.0, span2 has 2.0.0 + span1_data = self.get_span_by_name(memory_exporter, "span-1") + self.verify_span_attribute( + span1_data, LangfuseOtelSpanAttributes.VERSION, "1.0.0" + ) + + span2_data = self.get_span_by_name(memory_exporter, "span-2") + self.verify_span_attribute( + span2_data, LangfuseOtelSpanAttributes.VERSION, "2.0.0" + ) + + span3_data = self.get_span_by_name(memory_exporter, "span-3") + self.verify_span_attribute( + span3_data, LangfuseOtelSpanAttributes.VERSION, "1.0.0" + ) + + def test_version_with_baggage(self, langfuse_client, memory_exporter): + """Verify version propagates through baggage.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + version="baggage_version", + user_id="user_123", + as_baggage=True, + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child has version + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.VERSION, "baggage_version" + ) + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + + def test_version_semantic_versioning_formats( + self, langfuse_client, memory_exporter + ): + """Verify various semantic versioning formats work correctly.""" + test_versions = [ + "1.0.0", + "v2.3.4", + "1.0.0-alpha", + "2.0.0-beta.1", + "3.1.4-rc.2+build.123", + "0.1.0", + ] + + with langfuse_client.start_as_current_span(name="parent-span"): + for idx, version in enumerate(test_versions): + with propagate_attributes(version=version): + span = langfuse_client.start_span(name=f"span-{idx}") + span.end() + + # Verify all versions are correctly set + for idx, expected_version in enumerate(test_versions): + span_data = self.get_span_by_name(memory_exporter, f"span-{idx}") + self.verify_span_attribute( + span_data, LangfuseOtelSpanAttributes.VERSION, expected_version + ) + + def test_version_non_string_dropped(self, langfuse_client, memory_exporter): + """Verify non-string version is dropped with warning.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(version=123): # type: ignore + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child does NOT have version + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute(child_span, LangfuseOtelSpanAttributes.VERSION) + + def test_version_propagates_to_grandchildren( + self, langfuse_client, memory_exporter + ): + """Verify version propagates through multiple levels of nesting.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(version="nested_v1"): + with langfuse_client.start_as_current_span(name="child-span"): + grandchild = langfuse_client.start_span(name="grandchild-span") + grandchild.end() + + # Verify all three levels have version + parent_span = self.get_span_by_name(memory_exporter, "parent-span") + child_span = self.get_span_by_name(memory_exporter, "child-span") + grandchild_span = self.get_span_by_name(memory_exporter, "grandchild-span") + + for span in [parent_span, child_span, grandchild_span]: + self.verify_span_attribute( + span, LangfuseOtelSpanAttributes.VERSION, "nested_v1" + ) + + @pytest.mark.asyncio + async def test_version_with_async(self, langfuse_client, memory_exporter): + """Verify version propagates in async context.""" + + async def async_operation(): + span = langfuse_client.start_span(name="async-span") + span.end() + + with langfuse_client.start_as_current_span(name="parent"): + with propagate_attributes(version="async_v1.0"): + await async_operation() + + async_span = self.get_span_by_name(memory_exporter, "async-span") + self.verify_span_attribute( + async_span, LangfuseOtelSpanAttributes.VERSION, "async_v1.0" + ) + + def test_version_attribute_key_format(self, langfuse_client, memory_exporter): + """Verify version uses correct attribute key format.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(version="key_test_v1"): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + attributes = child_span["attributes"] + + # Verify exact attribute key + assert LangfuseOtelSpanAttributes.VERSION in attributes + assert attributes[LangfuseOtelSpanAttributes.VERSION] == "key_test_v1" + + +class TestPropagateAttributesTags(TestPropagateAttributesBase): + """Tests for tags parameter propagation.""" + + def test_tags_propagate_to_child_spans(self, langfuse_client, memory_exporter): + """Verify tags propagate to all child spans within context.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(tags=["production", "api-v2", "critical"]): + child1 = langfuse_client.start_span(name="child-span-1") + child1.end() + + child2 = langfuse_client.start_span(name="child-span-2") + child2.end() + + # Verify both children have tags + child1_span = self.get_span_by_name(memory_exporter, "child-span-1") + self.verify_span_attribute( + child1_span, + LangfuseOtelSpanAttributes.TRACE_TAGS, + tuple(["production", "api-v2", "critical"]), + ) + + child2_span = self.get_span_by_name(memory_exporter, "child-span-2") + self.verify_span_attribute( + child2_span, + LangfuseOtelSpanAttributes.TRACE_TAGS, + tuple(["production", "api-v2", "critical"]), + ) + + def test_tags_with_single_tag(self, langfuse_client, memory_exporter): + """Verify single tag works correctly.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(tags=["experiment"]): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.TRACE_TAGS, + tuple(["experiment"]), + ) + + def test_empty_tags_list(self, langfuse_client, memory_exporter): + """Verify empty tags list is handled correctly.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(tags=[]): + child = langfuse_client.start_span(name="child-span") + child.end() + + # With empty list, tags should not be set + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_missing_attribute(child_span, LangfuseOtelSpanAttributes.TRACE_TAGS) + + def test_tags_with_user_and_session(self, langfuse_client, memory_exporter): + """Verify tags work together with user_id and session_id.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + user_id="user_123", + session_id="session_abc", + tags=["test", "debug"], + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child has all attributes + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_USER_ID, "user_123" + ) + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_SESSION_ID, "session_abc" + ) + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.TRACE_TAGS, + tuple(["test", "debug"]), + ) + + def test_tags_with_metadata(self, langfuse_client, memory_exporter): + """Verify tags work together with metadata.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + tags=["experiment-a", "variant-1"], + metadata={"env": "staging"}, + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.TRACE_TAGS, + tuple(["experiment-a", "variant-1"]), + ) + self.verify_span_attribute( + child_span, + f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.env", + "staging", + ) + + def test_tags_validation_with_invalid_tag(self, langfuse_client, memory_exporter): + """Verify tags with one invalid entry drops all tags.""" + long_tag = "x" * 201 # Over 200 chars + + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(tags=["valid_tag", long_tag]): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, LangfuseOtelSpanAttributes.TRACE_TAGS, tuple(["valid_tag"]) + ) + + def test_tags_nested_contexts_inner_appends(self, langfuse_client, memory_exporter): + """Verify inner context appends to outer tags.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(tags=["outer", "tag1"]): + # Create span in outer context + span1 = langfuse_client.start_span(name="span-1") + span1.end() + + # Inner context with more tags + with propagate_attributes(tags=["inner", "tag2"]): + span2 = langfuse_client.start_span(name="span-2") + span2.end() + + # Back to outer context + span3 = langfuse_client.start_span(name="span-3") + span3.end() + + # Verify: span1 and span3 have outer tags, span2 has inner tags + span1_data = self.get_span_by_name(memory_exporter, "span-1") + self.verify_span_attribute( + span1_data, LangfuseOtelSpanAttributes.TRACE_TAGS, tuple(["outer", "tag1"]) + ) + + span2_data = self.get_span_by_name(memory_exporter, "span-2") + self.verify_span_attribute( + span2_data, + LangfuseOtelSpanAttributes.TRACE_TAGS, + tuple( + [ + "outer", + "tag1", + "inner", + "tag2", + ] + ), + ) + + span3_data = self.get_span_by_name(memory_exporter, "span-3") + self.verify_span_attribute( + span3_data, LangfuseOtelSpanAttributes.TRACE_TAGS, tuple(["outer", "tag1"]) + ) + + def test_tags_with_baggage(self, langfuse_client, memory_exporter): + """Verify tags propagate through baggage.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes( + tags=["baggage_tag1", "baggage_tag2"], + as_baggage=True, + ): + child = langfuse_client.start_span(name="child-span") + child.end() + + # Verify child has tags + child_span = self.get_span_by_name(memory_exporter, "child-span") + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.TRACE_TAGS, + tuple(["baggage_tag1", "baggage_tag2"]), + ) + + def test_tags_propagate_to_grandchildren(self, langfuse_client, memory_exporter): + """Verify tags propagate through multiple levels of nesting.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(tags=["level1", "level2", "level3"]): + with langfuse_client.start_as_current_span(name="child-span"): + grandchild = langfuse_client.start_span(name="grandchild-span") + grandchild.end() + + # Verify all three levels have tags + parent_span = self.get_span_by_name(memory_exporter, "parent-span") + child_span = self.get_span_by_name(memory_exporter, "child-span") + grandchild_span = self.get_span_by_name(memory_exporter, "grandchild-span") + + for span in [parent_span, child_span, grandchild_span]: + self.verify_span_attribute( + span, + LangfuseOtelSpanAttributes.TRACE_TAGS, + tuple(["level1", "level2", "level3"]), + ) + + @pytest.mark.asyncio + async def test_tags_with_async(self, langfuse_client, memory_exporter): + """Verify tags propagate in async context.""" + + async def async_operation(): + span = langfuse_client.start_span(name="async-span") + span.end() + + with langfuse_client.start_as_current_span(name="parent"): + with propagate_attributes(tags=["async", "test"]): + await async_operation() + + async_span = self.get_span_by_name(memory_exporter, "async-span") + self.verify_span_attribute( + async_span, LangfuseOtelSpanAttributes.TRACE_TAGS, tuple(["async", "test"]) + ) + + def test_tags_attribute_key_format(self, langfuse_client, memory_exporter): + """Verify tags use correct attribute key format.""" + with langfuse_client.start_as_current_span(name="parent-span"): + with propagate_attributes(tags=["key_test"]): + child = langfuse_client.start_span(name="child-span") + child.end() + + child_span = self.get_span_by_name(memory_exporter, "child-span") + attributes = child_span["attributes"] + + # Verify exact attribute key + assert LangfuseOtelSpanAttributes.TRACE_TAGS in attributes + assert attributes[LangfuseOtelSpanAttributes.TRACE_TAGS] == tuple(["key_test"]) + + +class TestPropagateAttributesExperiment(TestPropagateAttributesBase): + """Tests for experiment attribute propagation.""" + + def test_experiment_attributes_propagate_without_dataset( + self, langfuse_client, memory_exporter + ): + """Test experiment attribute propagation with local data (no Langfuse dataset).""" + # Create local dataset with metadata + local_data = [ + { + "input": "test input 1", + "expected_output": "expected result 1", + "metadata": {"item_type": "test", "priority": "high"}, + }, + ] + + # Task function that creates child spans + def task_with_child_spans(*, item, **kwargs): + # Create child spans to verify propagation + child1 = langfuse_client.start_span(name="child-span-1") + child1.end() + + child2 = langfuse_client.start_span(name="child-span-2") + child2.end() + + return f"processed: {item.get('input') if isinstance(item, dict) else item.input}" + + # Run experiment with local data + experiment_metadata = {"version": "1.0", "model": "test-model"} + result = langfuse_client.run_experiment( + name="Test Experiment", + description="Test experiment description", + data=local_data, + task=task_with_child_spans, + metadata=experiment_metadata, + ) + + # Flush to ensure spans are exported + langfuse_client.flush() + time.sleep(0.1) + + # Get the root span + root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run") + assert len(root_spans) >= 1, "Should have at least 1 root span" + first_root = root_spans[0] + + # Root-only attributes should be on root + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION, + "Test experiment description", + ) + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT, + _serialize("expected result 1"), + ) + + # Propagated attributes should also be on root + experiment_id = first_root["attributes"][ + LangfuseOtelSpanAttributes.EXPERIMENT_ID + ] + experiment_item_id = first_root["attributes"][ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID + ] + root_observation_id = first_root["attributes"][ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ROOT_OBSERVATION_ID + ] + + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_NAME, + result.run_name, + ) + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_METADATA, + _serialize(experiment_metadata), + ) + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_METADATA, + _serialize({"item_type": "test", "priority": "high"}), + ) + + # Environment should be set to sdk-experiment + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.ENVIRONMENT, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ) + + # Dataset ID should not be set for local data + self.verify_missing_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID, + ) + + # Verify child spans have propagated attributes but NOT root-only attributes + child_spans = self.get_spans_by_name( + memory_exporter, "child-span-1" + ) + self.get_spans_by_name(memory_exporter, "child-span-2") + + assert len(child_spans) >= 2, "Should have at least 2 child spans" + + for child_span in child_spans[:2]: # Check first item's children + # Propagated attributes should be present + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_ID, + experiment_id, + ) + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_NAME, + result.run_name, + ) + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_METADATA, + _serialize(experiment_metadata), + ) + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID, + experiment_item_id, + ) + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ROOT_OBSERVATION_ID, + root_observation_id, + ) + + # Environment should be propagated to children + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.ENVIRONMENT, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ) + + # Root-only attributes should NOT be present on children + self.verify_missing_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION, + ) + self.verify_missing_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT, + ) + + # Dataset ID should not be set for local data + self.verify_missing_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID, + ) + + def test_experiment_attributes_propagate_with_dataset( + self, langfuse_client, memory_exporter, monkeypatch + ): + """Test experiment attribute propagation with Langfuse dataset.""" + import time + from datetime import datetime + + from langfuse._client.attributes import _serialize + from langfuse._client.datasets import DatasetClient, DatasetItemClient + from langfuse.model import Dataset, DatasetItem, DatasetStatus + + # Mock the async API to create dataset run items + async def mock_create_dataset_run_item(*args, **kwargs): + from langfuse.api.resources.dataset_run_items.types import DatasetRunItem + + request = kwargs.get("request") + return DatasetRunItem( + id="mock-run-item-id", + dataset_run_id="mock-dataset-run-id-123", + dataset_item_id=request.datasetItemId if request else "mock-item-id", + trace_id="mock-trace-id", + ) + + monkeypatch.setattr( + langfuse_client.async_api.dataset_run_items, + "create", + mock_create_dataset_run_item, + ) + + # Create a mock dataset with items + dataset_id = "test-dataset-id-456" + dataset_item_id = "test-dataset-item-id-789" + + mock_dataset = Dataset( + id=dataset_id, + name="Test Dataset", + description="Test dataset description", + project_id="test-project-id", + metadata={"test": "metadata"}, + created_at=datetime.now(), + updated_at=datetime.now(), + ) + + mock_dataset_item = DatasetItem( + id=dataset_item_id, + status=DatasetStatus.ACTIVE, + input="Germany", + expected_output="Berlin", + metadata={"source": "dataset", "index": 0}, + source_trace_id=None, + source_observation_id=None, + dataset_id=dataset_id, + dataset_name="Test Dataset", + created_at=datetime.now(), + updated_at=datetime.now(), + ) + + # Create dataset client with items + dataset_item_client = DatasetItemClient(mock_dataset_item, langfuse_client) + dataset = DatasetClient(mock_dataset, [dataset_item_client]) + + # Task with child spans + def task_with_children(*, item, **kwargs): + child1 = langfuse_client.start_span(name="dataset-child-1") + child1.end() + + child2 = langfuse_client.start_span(name="dataset-child-2") + child2.end() + + return f"Capital: {item.expected_output}" + + # Run experiment + experiment_metadata = {"dataset_version": "v2", "test_run": "true"} + dataset.run_experiment( + name="Dataset Test", + description="Dataset experiment description", + task=task_with_children, + metadata=experiment_metadata, + ) + + langfuse_client.flush() + time.sleep(0.1) + + # Verify root has dataset-specific attributes + root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run") + assert len(root_spans) >= 1, "Should have at least 1 root span" + first_root = root_spans[0] + + # Root-only attributes should be on root + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION, + "Dataset experiment description", + ) + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT, + _serialize("Berlin"), + ) + + # Should have dataset ID (this is the key difference from local data) + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID, + dataset_id, + ) + + # Should have the dataset item ID + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID, + dataset_item_id, + ) + + # Should have experiment metadata + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.EXPERIMENT_METADATA, + _serialize(experiment_metadata), + ) + + # Environment should be set to sdk-experiment + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.ENVIRONMENT, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ) + + # Verify child spans have dataset-specific propagated attributes + child_spans = self.get_spans_by_name( + memory_exporter, "dataset-child-1" + ) + self.get_spans_by_name(memory_exporter, "dataset-child-2") + + assert len(child_spans) >= 2, "Should have at least 2 child spans" + + for child_span in child_spans[:2]: + # Dataset ID should be propagated to children + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_DATASET_ID, + dataset_id, + ) + + # Dataset item ID should be propagated + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID, + dataset_item_id, + ) + + # Experiment metadata should be propagated + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_METADATA, + _serialize(experiment_metadata), + ) + + # Item metadata should be propagated + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_METADATA, + _serialize({"source": "dataset", "index": 0}), + ) + + # Environment should be propagated to children + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.ENVIRONMENT, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ) + + # Root-only attributes should NOT be present on children + self.verify_missing_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION, + ) + self.verify_missing_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT, + ) + + def test_experiment_attributes_propagate_to_nested_children( + self, langfuse_client, memory_exporter + ): + """Test experiment attributes propagate to deeply nested child spans.""" + local_data = [{"input": "test", "expected_output": "result"}] + + # Task with deeply nested spans + def task_with_nested_spans(*, item, **kwargs): + with langfuse_client.start_as_current_span(name="child-span"): + with langfuse_client.start_as_current_span(name="grandchild-span"): + great_grandchild = langfuse_client.start_span( + name="great-grandchild-span" + ) + great_grandchild.end() + + return "processed" + + result = langfuse_client.run_experiment( + name="Nested Test", + description="Nested test", + data=local_data, + task=task_with_nested_spans, + metadata={"depth": "test"}, + ) + + langfuse_client.flush() + time.sleep(0.1) + + root_spans = self.get_spans_by_name(memory_exporter, "experiment-item-run") + first_root = root_spans[0] + experiment_id = first_root["attributes"][ + LangfuseOtelSpanAttributes.EXPERIMENT_ID + ] + root_observation_id = first_root["attributes"][ + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ROOT_OBSERVATION_ID + ] + + # Verify root has environment set + self.verify_span_attribute( + first_root, + LangfuseOtelSpanAttributes.ENVIRONMENT, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ) + + # Verify all nested children have propagated attributes + for span_name in ["child-span", "grandchild-span", "great-grandchild-span"]: + span_data = self.get_span_by_name(memory_exporter, span_name) + + # Propagated attributes should be present + self.verify_span_attribute( + span_data, + LangfuseOtelSpanAttributes.EXPERIMENT_ID, + experiment_id, + ) + self.verify_span_attribute( + span_data, + LangfuseOtelSpanAttributes.EXPERIMENT_NAME, + result.run_name, + ) + self.verify_span_attribute( + span_data, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ROOT_OBSERVATION_ID, + root_observation_id, + ) + + # Environment should be propagated to all nested children + self.verify_span_attribute( + span_data, + LangfuseOtelSpanAttributes.ENVIRONMENT, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ) + + # Root-only attributes should NOT be present + self.verify_missing_attribute( + span_data, + LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION, + ) + self.verify_missing_attribute( + span_data, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT, + ) + + def test_experiment_metadata_merging(self, langfuse_client, memory_exporter): + """Test that experiment metadata and item metadata are both propagated correctly.""" + import time + + from langfuse._client.attributes import _serialize + + # Rich metadata + experiment_metadata = { + "experiment_type": "A/B test", + "model_version": "2.0", + "temperature": 0.7, + } + item_metadata = { + "item_category": "finance", + "difficulty": "hard", + "language": "en", + } + + local_data = [ + { + "input": "test", + "expected_output": {"status": "success"}, + "metadata": item_metadata, + } + ] + + def task_with_child(*, item, **kwargs): + child = langfuse_client.start_span(name="metadata-child") + child.end() + return "result" + + langfuse_client.run_experiment( + name="Metadata Test", + description="Metadata test", + data=local_data, + task=task_with_child, + metadata=experiment_metadata, + ) + + langfuse_client.flush() + time.sleep(0.1) + + # Verify root span has environment set + root_span = self.get_span_by_name(memory_exporter, "experiment-item-run") + self.verify_span_attribute( + root_span, + LangfuseOtelSpanAttributes.ENVIRONMENT, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + ) + + # Verify child span has both experiment and item metadata propagated + child_span = self.get_span_by_name(memory_exporter, "metadata-child") + + # Verify experiment metadata is serialized and propagated + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_METADATA, + _serialize(experiment_metadata), + ) + + # Verify item metadata is serialized and propagated + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_METADATA, + _serialize(item_metadata), + ) + + # Verify environment is propagated to child + self.verify_span_attribute( + child_span, + LangfuseOtelSpanAttributes.ENVIRONMENT, + LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT, + )