diff --git a/langfuse/_client/attributes.py b/langfuse/_client/attributes.py index 5ae81000c..9699850ea 100644 --- a/langfuse/_client/attributes.py +++ b/langfuse/_client/attributes.py @@ -23,6 +23,9 @@ from langfuse.model import PromptClient from langfuse.types import MapValue, SpanLevel +from opentelemetry import baggage +import opentelemetry.context as otel_context + class LangfuseOtelSpanAttributes: # Langfuse-Trace attributes @@ -61,6 +64,16 @@ class LangfuseOtelSpanAttributes: AS_ROOT = "langfuse.internal.as_root" +def propagate_attributes( + *, + current_ctx: otel_context.Context, + dict_to_propagate: Dict[str, Any], +) -> otel_context.Context: + for key, value in dict_to_propagate.items(): + current_ctx = baggage.set_baggage(key, str(value), context=current_ctx) + return current_ctx + + def create_trace_attributes( *, name: Optional[str] = None, diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 9fa9c7489..a49ec23cd 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -20,6 +20,7 @@ TYPE_CHECKING, Any, Dict, + Iterator, List, Literal, Optional, @@ -29,7 +30,9 @@ overload, ) +from contextlib import contextmanager from opentelemetry import trace as otel_trace_api +from opentelemetry import context from opentelemetry.util._decorator import _AgnosticContextManager from langfuse.model import PromptClient @@ -42,6 +45,7 @@ create_generation_attributes, create_span_attributes, create_trace_attributes, + propagate_attributes, ) from langfuse._client.constants import ( ObservationTypeLiteral, @@ -203,6 +207,7 @@ def end(self, *, end_time: Optional[int] = None) -> "LangfuseObservationWrapper" return self + @contextmanager def update_trace( self, *, @@ -215,7 +220,7 @@ def update_trace( metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None, - ) -> "LangfuseObservationWrapper": + ) -> Iterator[None]: """Update the trace that this span belongs to. This method updates trace-level attributes of the trace that this span @@ -234,7 +239,7 @@ def update_trace( public: Whether the trace should be publicly accessible """ if not self._otel_span.is_recording(): - return self + yield media_processed_input = self._process_media_and_apply_mask( data=input, field="input", span=self._otel_span @@ -258,9 +263,19 @@ def update_trace( public=public, ) + ctx = otel_trace_api.set_span_in_context(self._otel_span) + ctx = propagate_attributes( + current_ctx=ctx, + dict_to_propagate=attributes, + ) + + token = context.attach(ctx) self._otel_span.set_attributes(attributes) - return self + try: + yield + finally: + context.detach(token) @overload def score( diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index ca8fb9b5a..ce0a5644f 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -15,9 +15,11 @@ import os from typing import Dict, List, Optional +from opentelemetry.context import Context from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.processor.baggage import BaggageSpanProcessor, ALLOW_ALL_BAGGAGE_KEYS from langfuse._client.constants import LANGFUSE_TRACER_NAME from langfuse._client.environment_variables import ( @@ -65,6 +67,10 @@ def __init__( else [] ) + # Initialize a BaggageSpanProcessor so baggage keys are attached to spans. + # Allow all baggage keys by default (same behaviour as before). + self._baggage_processor = BaggageSpanProcessor(ALLOW_ALL_BAGGAGE_KEYS) + env_flush_at = os.environ.get(LANGFUSE_FLUSH_AT, None) flush_at = flush_at or int(env_flush_at) if env_flush_at is not None else None @@ -105,6 +111,16 @@ def __init__( else None, ) + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: + # Forward to baggage processor first so baggage can be attached early. + self._baggage_processor.on_start(span, parent_context) + + # Call parent on_start if it exists (no-op for BatchSpanProcessor but safe). + try: + super().on_start(span, parent_context) + except TypeError: + super().on_start(span) + def on_end(self, span: ReadableSpan) -> None: # Only export spans that belong to the scoped project # This is important to not send spans to wrong project in multi-project setups diff --git a/poetry.lock b/poetry.lock index d88f11dbb..15f41e0b1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.2.0 and should not be changed by hand. [[package]] name = "annotated-types" @@ -1139,6 +1139,23 @@ opentelemetry-sdk = ">=1.36.0,<1.37.0" requests = ">=2.7,<3.0" typing-extensions = ">=4.5.0" +[[package]] +name = "opentelemetry-processor-baggage" +version = "0.58b0" +description = "OpenTelemetry Baggage Span Processor" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_processor_baggage-0.58b0-py3-none-any.whl", hash = "sha256:eacad51f9f3714a76daccb2a94dde0402838c97479d6848283a1f06457cb6ce4"}, + {file = "opentelemetry_processor_baggage-0.58b0.tar.gz", hash = "sha256:a99a656d4867f9b96e0da31e2f9ef0d1c6136c0122472411618936c5e6205c77"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.5,<2.0" +opentelemetry-sdk = ">=1.5,<2.0" +wrapt = ">=1.0.0,<2.0.0" + [[package]] name = "opentelemetry-proto" version = "1.36.0" @@ -2853,4 +2870,4 @@ openai = ["openai"] [metadata] lock-version = "2.1" python-versions = ">=3.9,<4.0" -content-hash = "83ae81e7b9fd90ae8000dc0ac491ff766b899b166a5fc895043d0555267e288c" +content-hash = "f41f23b39f79795827754dceb93c38e1038c613246e790d3b4e193fc265df15b" diff --git a/pyproject.toml b/pyproject.toml index ff5ebafac..2627d33cb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ requests = "^2" opentelemetry-api = "^1.33.1" opentelemetry-sdk = "^1.33.1" opentelemetry-exporter-otlp-proto-http = "^1.33.1" +opentelemetry-processor-baggage = "^0.58b0" [tool.poetry.group.dev.dependencies] pytest = ">=7.4,<9.0" diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index f29851d84..93493b383 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -606,24 +606,24 @@ def test_score_trace_nested_observation(): # Create a parent span and set trace name with langfuse.start_as_current_span(name="parent-span") as parent_span: - parent_span.update_trace(name=trace_name) - - # Create a child span - child_span = langfuse.start_span(name="span") + # Set trace name and trace metadata as attributes on parent-span _and_ span using context manager. + with parent_span.update_trace(name=trace_name, metadata={"foo": "bar"}): + # Create a child span + child_span = langfuse.start_span(name="span") - # Score the child span - child_span.score( - name="valuation", - value=0.5, - comment="This is a comment", - ) + # Score the child span + child_span.score( + name="valuation", + value=0.5, + comment="This is a comment", + ) - # Get IDs for verification - child_span_id = child_span.id - trace_id = parent_span.trace_id + # Get IDs for verification + child_span_id = child_span.id + trace_id = parent_span.trace_id - # End the child span - child_span.end() + # End the child span + child_span.end() # Ensure data is sent langfuse.flush() @@ -635,6 +635,12 @@ def test_score_trace_nested_observation(): assert trace.name == trace_name assert len(trace.scores) == 1 + observations = trace.observations + assert len(observations) == 2 # Parent span and child span + # Do not verify the attributes, since we actually strip them on the server for Langfuse SDK spans. + # for obs in observations: + # assert obs.metadata["attributes"]["langfuse.trace.metadata.foo"] == "bar" + score = trace.scores[0] assert score.name == "valuation"