Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions langfuse/_client/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from opentelemetry import trace as otel_trace_api
from opentelemetry.util._decorator import _AgnosticContextManager
from opentelemetry.trace.status import Status, StatusCode

from langfuse.model import PromptClient

Expand Down Expand Up @@ -188,6 +189,8 @@ def __init__(
self._otel_span.set_attributes(
{k: v for k, v in attributes.items() if v is not None}
)
# Set OTEL span status if level is ERROR
self._set_otel_span_status_if_error(level=level, status_message=status_message)

def end(self, *, end_time: Optional[int] = None) -> "LangfuseObservationWrapper":
"""End the span, marking it as completed.
Expand Down Expand Up @@ -540,6 +543,28 @@ def _process_media_in_attribute(

return data

def _set_otel_span_status_if_error(
self, *, level: Optional[SpanLevel] = None, status_message: Optional[str] = None
) -> None:
"""Set OpenTelemetry span status to ERROR if level is ERROR.

This method sets the underlying OpenTelemetry span status to ERROR when the
Langfuse observation level is set to ERROR, ensuring consistency between
Langfuse and OpenTelemetry error states.

Args:
level: The span level to check
status_message: Optional status message to include as description
"""
if level == "ERROR" and self._otel_span.is_recording():
try:
self._otel_span.set_status(
Status(StatusCode.ERROR, description=status_message)
)
except Exception:
# Silently ignore any errors when setting OTEL status to avoid existing flow disruptions
pass

def update(
self,
*,
Expand Down Expand Up @@ -636,6 +661,8 @@ def update(
)

self._otel_span.set_attributes(attributes=attributes)
# Set OTEL span status if level is ERROR
self._set_otel_span_status_if_error(level=level, status_message=status_message)

return self

Expand Down
226 changes: 226 additions & 0 deletions tests/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,232 @@ def test_error_handling(self, langfuse_client, memory_exporter):
== "Test error message"
)

def test_error_level_in_span_creation(self, langfuse_client, memory_exporter):
"""Test that OTEL span status is set to ERROR when creating spans with level='ERROR'."""
# Create a span with level="ERROR" at creation time
span = langfuse_client.start_span(
name="create-error-span",
level="ERROR",
status_message="Initial error state"
)
span.end()

# Get the raw OTEL spans to check the status
raw_spans = [
s for s in memory_exporter.get_finished_spans()
if s.name == "create-error-span"
]
assert len(raw_spans) == 1, "Expected one span"
raw_span = raw_spans[0]

# Verify OTEL span status was set to ERROR
from opentelemetry.trace.status import StatusCode
assert raw_span.status.status_code == StatusCode.ERROR
assert raw_span.status.description == "Initial error state"

# Also verify Langfuse attributes
spans = self.get_spans_by_name(memory_exporter, "create-error-span")
span_data = spans[0]
attributes = span_data["attributes"]
assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_LEVEL] == "ERROR"
assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_STATUS_MESSAGE] == "Initial error state"

def test_error_level_in_span_update(self, langfuse_client, memory_exporter):
"""Test that OTEL span status is set to ERROR when updating spans to level='ERROR'."""
# Create a normal span
span = langfuse_client.start_span(name="update-error-span", level="INFO")

# Update it to ERROR level
span.update(level="ERROR", status_message="Updated to error state")
span.end()

# Get the raw OTEL spans to check the status
raw_spans = [
s for s in memory_exporter.get_finished_spans()
if s.name == "update-error-span"
]
assert len(raw_spans) == 1, "Expected one span"
raw_span = raw_spans[0]

# Verify OTEL span status was set to ERROR
from opentelemetry.trace.status import StatusCode
assert raw_span.status.status_code == StatusCode.ERROR
assert raw_span.status.description == "Updated to error state"

# Also verify Langfuse attributes
spans = self.get_spans_by_name(memory_exporter, "update-error-span")
span_data = spans[0]
attributes = span_data["attributes"]
assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_LEVEL] == "ERROR"
assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_STATUS_MESSAGE] == "Updated to error state"

def test_generation_error_level_in_creation(self, langfuse_client, memory_exporter):
"""Test that OTEL span status is set to ERROR when creating generations with level='ERROR'."""
# Create a generation with level="ERROR" at creation time
generation = langfuse_client.start_generation(
name="create-error-generation",
model="gpt-4",
level="ERROR",
status_message="Generation failed during creation"
)
generation.end()

# Get the raw OTEL spans to check the status
raw_spans = [
s for s in memory_exporter.get_finished_spans()
if s.name == "create-error-generation"
]
assert len(raw_spans) == 1, "Expected one span"
raw_span = raw_spans[0]

# Verify OTEL span status was set to ERROR
from opentelemetry.trace.status import StatusCode
assert raw_span.status.status_code == StatusCode.ERROR
assert raw_span.status.description == "Generation failed during creation"

# Also verify Langfuse attributes
spans = self.get_spans_by_name(memory_exporter, "create-error-generation")
span_data = spans[0]
attributes = span_data["attributes"]
assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_LEVEL] == "ERROR"
assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_STATUS_MESSAGE] == "Generation failed during creation"

def test_generation_error_level_in_update(self, langfuse_client, memory_exporter):
"""Test that OTEL span status is set to ERROR when updating generations to level='ERROR'."""
# Create a normal generation
generation = langfuse_client.start_generation(
name="update-error-generation",
model="gpt-4",
level="INFO"
)

# Update it to ERROR level
generation.update(level="ERROR", status_message="Generation failed during execution")
generation.end()

# Get the raw OTEL spans to check the status
raw_spans = [
s for s in memory_exporter.get_finished_spans()
if s.name == "update-error-generation"
]
assert len(raw_spans) == 1, "Expected one span"
raw_span = raw_spans[0]

# Verify OTEL span status was set to ERROR
from opentelemetry.trace.status import StatusCode
assert raw_span.status.status_code == StatusCode.ERROR
assert raw_span.status.description == "Generation failed during execution"

# Also verify Langfuse attributes
spans = self.get_spans_by_name(memory_exporter, "update-error-generation")
span_data = spans[0]
attributes = span_data["attributes"]
assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_LEVEL] == "ERROR"
assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_STATUS_MESSAGE] == "Generation failed during execution"

def test_non_error_levels_dont_set_otel_status(self, langfuse_client, memory_exporter):
"""Test that non-ERROR levels don't set OTEL span status to ERROR."""
# Test different non-error levels
test_levels = ["INFO", "WARNING", "DEBUG", None]

for i, level in enumerate(test_levels):
span_name = f"non-error-span-{i}"
span = langfuse_client.start_span(name=span_name, level=level)

# Update with same level to test update path too
if level is not None:
span.update(level=level, status_message="Not an error")

span.end()

# Get the raw OTEL spans to check the status
raw_spans = [
s for s in memory_exporter.get_finished_spans()
if s.name == span_name
]
assert len(raw_spans) == 1, f"Expected one span for {span_name}"
raw_span = raw_spans[0]

# Verify OTEL span status was NOT set to ERROR
from opentelemetry.trace.status import StatusCode
# Default status should be UNSET, not ERROR
assert raw_span.status.status_code != StatusCode.ERROR, f"Level {level} should not set ERROR status"

def test_multiple_error_updates(self, langfuse_client, memory_exporter):
"""Test that multiple ERROR level updates work correctly."""
# Create a span
span = langfuse_client.start_span(name="multi-error-span")

# First error update
span.update(level="ERROR", status_message="First error")

# Second error update - should overwrite the first
span.update(level="ERROR", status_message="Second error")

span.end()

# Get the raw OTEL spans to check the status
raw_spans = [
s for s in memory_exporter.get_finished_spans()
if s.name == "multi-error-span"
]
assert len(raw_spans) == 1, "Expected one span"
raw_span = raw_spans[0]

# Verify OTEL span status shows the last error message
from opentelemetry.trace.status import StatusCode
assert raw_span.status.status_code == StatusCode.ERROR
assert raw_span.status.description == "Second error"

def test_error_without_status_message(self, langfuse_client, memory_exporter):
"""Test that ERROR level works even without status_message."""
# Create a span with ERROR level but no status message
span = langfuse_client.start_span(name="error-no-message-span", level="ERROR")
span.end()

# Get the raw OTEL spans to check the status
raw_spans = [
s for s in memory_exporter.get_finished_spans()
if s.name == "error-no-message-span"
]
assert len(raw_spans) == 1, "Expected one span"
raw_span = raw_spans[0]

# Verify OTEL span status was set to ERROR even without description
from opentelemetry.trace.status import StatusCode
assert raw_span.status.status_code == StatusCode.ERROR
# Description should be None when no status_message provided
assert raw_span.status.description is None

def test_different_observation_types_error_handling(self, langfuse_client, memory_exporter):
"""Test that ERROR level setting works for different observation types."""
# Test different observation types
observation_types = ["agent", "tool", "chain", "retriever", "evaluator", "embedding", "guardrail"]

# Create a parent span for child observations
with langfuse_client.start_as_current_span(name="error-test-parent") as parent:
for obs_type in observation_types:
# Create observation with ERROR level
obs = parent.start_observation(
name=f"error-{obs_type}",
as_type=obs_type,
level="ERROR",
status_message=f"{obs_type} failed"
)
obs.end()

# Check that all observations have correct OTEL status
raw_spans = memory_exporter.get_finished_spans()

for obs_type in observation_types:
obs_spans = [s for s in raw_spans if s.name == f"error-{obs_type}"]
assert len(obs_spans) == 1, f"Expected one span for {obs_type}"

raw_span = obs_spans[0]
from opentelemetry.trace.status import StatusCode
assert raw_span.status.status_code == StatusCode.ERROR, f"{obs_type} should have ERROR status"
assert raw_span.status.description == f"{obs_type} failed", f"{obs_type} should have correct description"


class TestAdvancedSpans(TestOTelBase):
"""Tests for advanced span functionality including generations, timing, and usage metrics."""
Expand Down