diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 9fa9c7489..c078d995d 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -31,6 +31,7 @@ from opentelemetry import trace as otel_trace_api from opentelemetry.util._decorator import _AgnosticContextManager +from opentelemetry.trace.status import Status, StatusCode from langfuse.model import PromptClient @@ -188,6 +189,8 @@ def __init__( self._otel_span.set_attributes( {k: v for k, v in attributes.items() if v is not None} ) + # Set OTEL span status if level is ERROR + self._set_otel_span_status_if_error(level=level, status_message=status_message) def end(self, *, end_time: Optional[int] = None) -> "LangfuseObservationWrapper": """End the span, marking it as completed. @@ -540,6 +543,28 @@ def _process_media_in_attribute( return data + def _set_otel_span_status_if_error( + self, *, level: Optional[SpanLevel] = None, status_message: Optional[str] = None + ) -> None: + """Set OpenTelemetry span status to ERROR if level is ERROR. + + This method sets the underlying OpenTelemetry span status to ERROR when the + Langfuse observation level is set to ERROR, ensuring consistency between + Langfuse and OpenTelemetry error states. + + Args: + level: The span level to check + status_message: Optional status message to include as description + """ + if level == "ERROR" and self._otel_span.is_recording(): + try: + self._otel_span.set_status( + Status(StatusCode.ERROR, description=status_message) + ) + except Exception: + # Silently ignore any errors when setting OTEL status to avoid existing flow disruptions + pass + def update( self, *, @@ -636,6 +661,8 @@ def update( ) self._otel_span.set_attributes(attributes=attributes) + # Set OTEL span status if level is ERROR + self._set_otel_span_status_if_error(level=level, status_message=status_message) return self diff --git a/tests/test_otel.py b/tests/test_otel.py index fd29ce671..623e866b5 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -944,6 +944,232 @@ def test_error_handling(self, langfuse_client, memory_exporter): == "Test error message" ) + def test_error_level_in_span_creation(self, langfuse_client, memory_exporter): + """Test that OTEL span status is set to ERROR when creating spans with level='ERROR'.""" + # Create a span with level="ERROR" at creation time + span = langfuse_client.start_span( + name="create-error-span", + level="ERROR", + status_message="Initial error state" + ) + span.end() + + # Get the raw OTEL spans to check the status + raw_spans = [ + s for s in memory_exporter.get_finished_spans() + if s.name == "create-error-span" + ] + assert len(raw_spans) == 1, "Expected one span" + raw_span = raw_spans[0] + + # Verify OTEL span status was set to ERROR + from opentelemetry.trace.status import StatusCode + assert raw_span.status.status_code == StatusCode.ERROR + assert raw_span.status.description == "Initial error state" + + # Also verify Langfuse attributes + spans = self.get_spans_by_name(memory_exporter, "create-error-span") + span_data = spans[0] + attributes = span_data["attributes"] + assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_LEVEL] == "ERROR" + assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_STATUS_MESSAGE] == "Initial error state" + + def test_error_level_in_span_update(self, langfuse_client, memory_exporter): + """Test that OTEL span status is set to ERROR when updating spans to level='ERROR'.""" + # Create a normal span + span = langfuse_client.start_span(name="update-error-span", level="INFO") + + # Update it to ERROR level + span.update(level="ERROR", status_message="Updated to error state") + span.end() + + # Get the raw OTEL spans to check the status + raw_spans = [ + s for s in memory_exporter.get_finished_spans() + if s.name == "update-error-span" + ] + assert len(raw_spans) == 1, "Expected one span" + raw_span = raw_spans[0] + + # Verify OTEL span status was set to ERROR + from opentelemetry.trace.status import StatusCode + assert raw_span.status.status_code == StatusCode.ERROR + assert raw_span.status.description == "Updated to error state" + + # Also verify Langfuse attributes + spans = self.get_spans_by_name(memory_exporter, "update-error-span") + span_data = spans[0] + attributes = span_data["attributes"] + assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_LEVEL] == "ERROR" + assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_STATUS_MESSAGE] == "Updated to error state" + + def test_generation_error_level_in_creation(self, langfuse_client, memory_exporter): + """Test that OTEL span status is set to ERROR when creating generations with level='ERROR'.""" + # Create a generation with level="ERROR" at creation time + generation = langfuse_client.start_generation( + name="create-error-generation", + model="gpt-4", + level="ERROR", + status_message="Generation failed during creation" + ) + generation.end() + + # Get the raw OTEL spans to check the status + raw_spans = [ + s for s in memory_exporter.get_finished_spans() + if s.name == "create-error-generation" + ] + assert len(raw_spans) == 1, "Expected one span" + raw_span = raw_spans[0] + + # Verify OTEL span status was set to ERROR + from opentelemetry.trace.status import StatusCode + assert raw_span.status.status_code == StatusCode.ERROR + assert raw_span.status.description == "Generation failed during creation" + + # Also verify Langfuse attributes + spans = self.get_spans_by_name(memory_exporter, "create-error-generation") + span_data = spans[0] + attributes = span_data["attributes"] + assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_LEVEL] == "ERROR" + assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_STATUS_MESSAGE] == "Generation failed during creation" + + def test_generation_error_level_in_update(self, langfuse_client, memory_exporter): + """Test that OTEL span status is set to ERROR when updating generations to level='ERROR'.""" + # Create a normal generation + generation = langfuse_client.start_generation( + name="update-error-generation", + model="gpt-4", + level="INFO" + ) + + # Update it to ERROR level + generation.update(level="ERROR", status_message="Generation failed during execution") + generation.end() + + # Get the raw OTEL spans to check the status + raw_spans = [ + s for s in memory_exporter.get_finished_spans() + if s.name == "update-error-generation" + ] + assert len(raw_spans) == 1, "Expected one span" + raw_span = raw_spans[0] + + # Verify OTEL span status was set to ERROR + from opentelemetry.trace.status import StatusCode + assert raw_span.status.status_code == StatusCode.ERROR + assert raw_span.status.description == "Generation failed during execution" + + # Also verify Langfuse attributes + spans = self.get_spans_by_name(memory_exporter, "update-error-generation") + span_data = spans[0] + attributes = span_data["attributes"] + assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_LEVEL] == "ERROR" + assert attributes[LangfuseOtelSpanAttributes.OBSERVATION_STATUS_MESSAGE] == "Generation failed during execution" + + def test_non_error_levels_dont_set_otel_status(self, langfuse_client, memory_exporter): + """Test that non-ERROR levels don't set OTEL span status to ERROR.""" + # Test different non-error levels + test_levels = ["INFO", "WARNING", "DEBUG", None] + + for i, level in enumerate(test_levels): + span_name = f"non-error-span-{i}" + span = langfuse_client.start_span(name=span_name, level=level) + + # Update with same level to test update path too + if level is not None: + span.update(level=level, status_message="Not an error") + + span.end() + + # Get the raw OTEL spans to check the status + raw_spans = [ + s for s in memory_exporter.get_finished_spans() + if s.name == span_name + ] + assert len(raw_spans) == 1, f"Expected one span for {span_name}" + raw_span = raw_spans[0] + + # Verify OTEL span status was NOT set to ERROR + from opentelemetry.trace.status import StatusCode + # Default status should be UNSET, not ERROR + assert raw_span.status.status_code != StatusCode.ERROR, f"Level {level} should not set ERROR status" + + def test_multiple_error_updates(self, langfuse_client, memory_exporter): + """Test that multiple ERROR level updates work correctly.""" + # Create a span + span = langfuse_client.start_span(name="multi-error-span") + + # First error update + span.update(level="ERROR", status_message="First error") + + # Second error update - should overwrite the first + span.update(level="ERROR", status_message="Second error") + + span.end() + + # Get the raw OTEL spans to check the status + raw_spans = [ + s for s in memory_exporter.get_finished_spans() + if s.name == "multi-error-span" + ] + assert len(raw_spans) == 1, "Expected one span" + raw_span = raw_spans[0] + + # Verify OTEL span status shows the last error message + from opentelemetry.trace.status import StatusCode + assert raw_span.status.status_code == StatusCode.ERROR + assert raw_span.status.description == "Second error" + + def test_error_without_status_message(self, langfuse_client, memory_exporter): + """Test that ERROR level works even without status_message.""" + # Create a span with ERROR level but no status message + span = langfuse_client.start_span(name="error-no-message-span", level="ERROR") + span.end() + + # Get the raw OTEL spans to check the status + raw_spans = [ + s for s in memory_exporter.get_finished_spans() + if s.name == "error-no-message-span" + ] + assert len(raw_spans) == 1, "Expected one span" + raw_span = raw_spans[0] + + # Verify OTEL span status was set to ERROR even without description + from opentelemetry.trace.status import StatusCode + assert raw_span.status.status_code == StatusCode.ERROR + # Description should be None when no status_message provided + assert raw_span.status.description is None + + def test_different_observation_types_error_handling(self, langfuse_client, memory_exporter): + """Test that ERROR level setting works for different observation types.""" + # Test different observation types + observation_types = ["agent", "tool", "chain", "retriever", "evaluator", "embedding", "guardrail"] + + # Create a parent span for child observations + with langfuse_client.start_as_current_span(name="error-test-parent") as parent: + for obs_type in observation_types: + # Create observation with ERROR level + obs = parent.start_observation( + name=f"error-{obs_type}", + as_type=obs_type, + level="ERROR", + status_message=f"{obs_type} failed" + ) + obs.end() + + # Check that all observations have correct OTEL status + raw_spans = memory_exporter.get_finished_spans() + + for obs_type in observation_types: + obs_spans = [s for s in raw_spans if s.name == f"error-{obs_type}"] + assert len(obs_spans) == 1, f"Expected one span for {obs_type}" + + raw_span = obs_spans[0] + from opentelemetry.trace.status import StatusCode + assert raw_span.status.status_code == StatusCode.ERROR, f"{obs_type} should have ERROR status" + assert raw_span.status.description == f"{obs_type} failed", f"{obs_type} should have correct description" + class TestAdvancedSpans(TestOTelBase): """Tests for advanced span functionality including generations, timing, and usage metrics."""