Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions langfuse/_client/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
from langfuse.model import PromptClient
from langfuse.types import MapValue, SpanLevel

from opentelemetry import baggage
import opentelemetry.context as otel_context


class LangfuseOtelSpanAttributes:
# Langfuse-Trace attributes
Expand Down Expand Up @@ -61,6 +64,16 @@ class LangfuseOtelSpanAttributes:
AS_ROOT = "langfuse.internal.as_root"


def propagate_attributes(
*,
current_ctx: otel_context.Context,
dict_to_propagate: Dict[str, Any],
) -> otel_context.Context:
for key, value in dict_to_propagate.items():
current_ctx = baggage.set_baggage(key, str(value), context=current_ctx)
return current_ctx
Comment on lines +72 to +74
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context is immutable, i.e. every set_baggage operation produces a new, updated context with the relevant properties. We update whatever context we get and return the new one.



def create_trace_attributes(
*,
name: Optional[str] = None,
Expand Down
21 changes: 18 additions & 3 deletions langfuse/_client/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
TYPE_CHECKING,
Any,
Dict,
Iterator,
List,
Literal,
Optional,
Expand All @@ -29,7 +30,9 @@
overload,
)

from contextlib import contextmanager
from opentelemetry import trace as otel_trace_api
from opentelemetry import context
from opentelemetry.util._decorator import _AgnosticContextManager

from langfuse.model import PromptClient
Expand All @@ -42,6 +45,7 @@
create_generation_attributes,
create_span_attributes,
create_trace_attributes,
propagate_attributes,
)
from langfuse._client.constants import (
ObservationTypeLiteral,
Expand Down Expand Up @@ -203,6 +207,7 @@ def end(self, *, end_time: Optional[int] = None) -> "LangfuseObservationWrapper"

return self

@contextmanager
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're turning this into a context manager as we need to detach the context at the end of the execution to avoid memory leaks and issues in async/multi-thread processing.

Every call like

    span.update_trace(name="foo")
    [...]
    # ongoing execution

must become

    with span.update_trace(name="foo"):
        [...]
        # ongoing execution

to ensure that all child spans will be started with the attached context.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, this is a breaking change!

def update_trace(
self,
*,
Expand All @@ -215,7 +220,7 @@ def update_trace(
metadata: Optional[Any] = None,
tags: Optional[List[str]] = None,
public: Optional[bool] = None,
) -> "LangfuseObservationWrapper":
) -> Iterator[None]:
"""Update the trace that this span belongs to.

This method updates trace-level attributes of the trace that this span
Expand All @@ -234,7 +239,7 @@ def update_trace(
public: Whether the trace should be publicly accessible
"""
if not self._otel_span.is_recording():
return self
yield

media_processed_input = self._process_media_and_apply_mask(
data=input, field="input", span=self._otel_span
Expand All @@ -258,9 +263,19 @@ def update_trace(
public=public,
)

ctx = otel_trace_api.set_span_in_context(self._otel_span)
ctx = propagate_attributes(
current_ctx=ctx,
dict_to_propagate=attributes,
)

token = context.attach(ctx)
self._otel_span.set_attributes(attributes)

return self
try:
yield
finally:
context.detach(token)

@overload
def score(
Expand Down
18 changes: 17 additions & 1 deletion langfuse/_client/span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import os
from typing import Dict, List, Optional

from opentelemetry.context import Context
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.processor.baggage import BaggageSpanProcessor, ALLOW_ALL_BAGGAGE_KEYS

from langfuse._client.constants import LANGFUSE_TRACER_NAME
from langfuse._client.environment_variables import (
Expand Down Expand Up @@ -65,6 +67,10 @@ def __init__(
else []
)

# Initialize a BaggageSpanProcessor so baggage keys are attached to spans.
# Allow all baggage keys by default (same behaviour as before).
self._baggage_processor = BaggageSpanProcessor(ALLOW_ALL_BAGGAGE_KEYS)

env_flush_at = os.environ.get(LANGFUSE_FLUSH_AT, None)
flush_at = flush_at or int(env_flush_at) if env_flush_at is not None else None

Expand Down Expand Up @@ -105,6 +111,16 @@ def __init__(
else None,
)

def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're injecting the baggage span processor in the on_start here to make use of it and call its own implementation: https://github.com/open-telemetry/opentelemetry-python-contrib/blob/main/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/processor.py.

The other overwrites are taken as-is from the SpanProcessor, i.e. there is no need to overwrite the on_end, etc.

# Forward to baggage processor first so baggage can be attached early.
self._baggage_processor.on_start(span, parent_context)

# Call parent on_start if it exists (no-op for BatchSpanProcessor but safe).
try:
super().on_start(span, parent_context)
except TypeError:
super().on_start(span)

def on_end(self, span: ReadableSpan) -> None:
# Only export spans that belong to the scoped project
# This is important to not send spans to wrong project in multi-project setups
Expand Down
21 changes: 19 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ requests = "^2"
opentelemetry-api = "^1.33.1"
opentelemetry-sdk = "^1.33.1"
opentelemetry-exporter-otlp-proto-http = "^1.33.1"
opentelemetry-processor-baggage = "^0.58b0"

[tool.poetry.group.dev.dependencies]
pytest = ">=7.4,<9.0"
Expand Down
36 changes: 21 additions & 15 deletions tests/test_core_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,24 +606,24 @@ def test_score_trace_nested_observation():

# Create a parent span and set trace name
with langfuse.start_as_current_span(name="parent-span") as parent_span:
parent_span.update_trace(name=trace_name)

# Create a child span
child_span = langfuse.start_span(name="span")
# Set trace name and trace metadata as attributes on parent-span _and_ span using context manager.
with parent_span.update_trace(name=trace_name, metadata={"foo": "bar"}):
# Create a child span
child_span = langfuse.start_span(name="span")

# Score the child span
child_span.score(
name="valuation",
value=0.5,
comment="This is a comment",
)
# Score the child span
child_span.score(
name="valuation",
value=0.5,
comment="This is a comment",
)

# Get IDs for verification
child_span_id = child_span.id
trace_id = parent_span.trace_id
# Get IDs for verification
child_span_id = child_span.id
trace_id = parent_span.trace_id

# End the child span
child_span.end()
# End the child span
child_span.end()

# Ensure data is sent
langfuse.flush()
Expand All @@ -635,6 +635,12 @@ def test_score_trace_nested_observation():
assert trace.name == trace_name
assert len(trace.scores) == 1

observations = trace.observations
assert len(observations) == 2 # Parent span and child span
# Do not verify the attributes, since we actually strip them on the server for Langfuse SDK spans.
# for obs in observations:
# assert obs.metadata["attributes"]["langfuse.trace.metadata.foo"] == "bar"

score = trace.scores[0]

assert score.name == "valuation"
Expand Down
Loading