diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py
index 99c0382ac..100e3a053 100644
--- a/langfuse/_client/client.py
+++ b/langfuse/_client/client.py
@@ -102,6 +102,7 @@ class Langfuse:
         media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
         sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
         mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
+        blocked_instrumentation_scopes (Optional[List[str]]): List of instrumentation scope names to block from being exported to Langfuse. Spans from these scopes will be filtered out before being sent to the API. Useful for filtering out spans from specific libraries or frameworks. For exported spans, you can see the instrumentation scope name in the span metadata in Langfuse (`metadata.scope.name`).
 
     Example:
         ```python
@@ -159,6 +160,7 @@ def __init__(
         media_upload_thread_count: Optional[int] = None,
         sample_rate: Optional[float] = None,
         mask: Optional[MaskFunction] = None,
+        blocked_instrumentation_scopes: Optional[List[str]] = None,
     ):
         self._host = host or os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
         self._environment = environment or os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
@@ -220,6 +222,7 @@ def __init__(
             sample_rate=sample_rate,
             mask=mask,
             tracing_enabled=self._tracing_enabled,
+            blocked_instrumentation_scopes=blocked_instrumentation_scopes,
         )
 
         self._mask = self._resources.mask
diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py
index b0acd40d8..daa462177 100644
--- a/langfuse/_client/resource_manager.py
+++ b/langfuse/_client/resource_manager.py
@@ -18,7 +18,7 @@
 import os
 import threading
 from queue import Full, Queue
-from typing import Any, Dict, Optional, cast
+from typing import Any, Dict, List, Optional, cast
 
 import httpx
 from opentelemetry import trace as otel_trace_api
@@ -93,6 +93,7 @@ def __new__(
         sample_rate: Optional[float] = None,
         mask: Optional[MaskFunction] = None,
         tracing_enabled: Optional[bool] = None,
+        blocked_instrumentation_scopes: Optional[List[str]] = None,
     ) -> "LangfuseResourceManager":
         if public_key in cls._instances:
             return cls._instances[public_key]
@@ -117,6 +118,7 @@ def __new__(
                 tracing_enabled=tracing_enabled
                 if tracing_enabled is not None
                 else True,
+                blocked_instrumentation_scopes=blocked_instrumentation_scopes,
             )
 
             cls._instances[public_key] = instance
@@ -139,6 +141,7 @@ def _initialize_instance(
         sample_rate: Optional[float] = None,
         mask: Optional[MaskFunction] = None,
         tracing_enabled: bool = True,
+        blocked_instrumentation_scopes: Optional[List[str]] = None,
     ):
         self.public_key = public_key
         self.secret_key = secret_key
@@ -159,6 +162,7 @@ def _initialize_instance(
             timeout=timeout,
             flush_at=flush_at,
             flush_interval=flush_interval,
+            blocked_instrumentation_scopes=blocked_instrumentation_scopes,
         )
 
         tracer_provider.add_span_processor(langfuse_processor)
diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py
index 12c7d6cb8..8dcb78fea 100644
--- a/langfuse/_client/span_processor.py
+++ b/langfuse/_client/span_processor.py
@@ -13,7 +13,7 @@
 import base64
 import os
-from typing import Optional
+from typing import List, Optional
 
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
 from opentelemetry.sdk.trace import ReadableSpan
@@ -34,10 +34,11 @@ class LangfuseSpanProcessor(BatchSpanProcessor):
     This processor extends OpenTelemetry's BatchSpanProcessor with Langfuse-specific functionality:
 
     1. Project-scoped span filtering to prevent cross-project data leakage
-    2. Configurable batch processing parameters for optimal performance
-    3. HTTP-based span export to the Langfuse OTLP endpoint
-    4. Debug logging for span processing operations
-    5. Authentication with Langfuse API using Basic Auth
+    2. Instrumentation scope filtering to block spans from specific libraries/frameworks
+    3. Configurable batch processing parameters for optimal performance
+    4. HTTP-based span export to the Langfuse OTLP endpoint
+    5. Debug logging for span processing operations
+    6. Authentication with Langfuse API using Basic Auth
 
     The processor is designed to efficiently handle large volumes of spans with minimal
     overhead, while ensuring spans are only sent to the correct project.
@@ -54,8 +55,14 @@ def __init__(
         timeout: Optional[int] = None,
         flush_at: Optional[int] = None,
         flush_interval: Optional[float] = None,
+        blocked_instrumentation_scopes: Optional[List[str]] = None,
     ):
         self.public_key = public_key
+        self.blocked_instrumentation_scopes = (
+            blocked_instrumentation_scopes
+            if blocked_instrumentation_scopes is not None
+            else []
+        )
         flush_at = flush_at or int(os.environ.get(LANGFUSE_FLUSH_AT, 15))
         flush_interval = flush_interval or float(
             os.environ.get(LANGFUSE_FLUSH_INTERVAL, 0.5)
@@ -93,6 +100,10 @@ def on_end(self, span: ReadableSpan) -> None:
             )
             return
 
+        # Do not export spans from blocked instrumentation scopes
+        if self._is_blocked_instrumentation_scope(span):
+            return
+
         langfuse_logger.debug(
             f"Trace: Processing span name='{span._name}' | Full details:\n{span_formatter(span)}"
         )
@@ -106,6 +117,12 @@ def _is_langfuse_span(span: ReadableSpan) -> bool:
             and span.instrumentation_scope.name == LANGFUSE_TRACER_NAME
         )
 
+    def _is_blocked_instrumentation_scope(self, span: ReadableSpan) -> bool:
+        return (
+            span.instrumentation_scope is not None
+            and span.instrumentation_scope.name in self.blocked_instrumentation_scopes
+        )
+
     def _is_langfuse_project_span(self, span: ReadableSpan) -> bool:
         if not LangfuseSpanProcessor._is_langfuse_span(span):
             return False
diff --git a/tests/test_otel.py b/tests/test_otel.py
index 5611f1c3f..acabc4896 100644
--- a/tests/test_otel.py
+++ b/tests/test_otel.py
@@ -45,7 +45,6 @@ def clear(self):
         self._finished_spans.clear()
 
 
-@pytest.mark.otel
 class TestOTelBase:
     """Base class for OTEL tests with common fixtures and helper methods."""
 
@@ -84,6 +83,10 @@ def mock_init(self, **kwargs):
             from opentelemetry.sdk.trace.export import BatchSpanProcessor
 
             self.public_key = kwargs.get("public_key", "test-key")
+            blocked_scopes = kwargs.get("blocked_instrumentation_scopes")
+            self.blocked_instrumentation_scopes = (
+                blocked_scopes if blocked_scopes is not None else []
+            )
             BatchSpanProcessor.__init__(
                 self,
                 span_exporter=memory_exporter,
@@ -239,7 +242,6 @@ def assert_parent_child_relationship(self, parent_span: dict, child_span: dict):
         ), f"Child span {child_span['name']} should have same trace ID as parent {parent_span['name']}"
 
 
-@pytest.mark.otel
 class TestBasicSpans(TestOTelBase):
     """Tests for basic span operations and attributes."""
 
@@ -695,7 +697,6 @@ def test_error_handling(self, langfuse_client, memory_exporter):
         )
 
 
-@pytest.mark.otel
 class TestAdvancedSpans(TestOTelBase):
     """Tests for advanced span functionality including generations, timing, and usage metrics."""
 
@@ -953,7 +954,6 @@ def test_trace_id_generation(self, langfuse_client):
         assert trace_id1 != trace_id2, "Generated trace IDs should be unique"
 
 
-@pytest.mark.otel
 class TestMetadataHandling(TestOTelBase):
     """Tests for metadata serialization, updates, and integrity."""
 
@@ -1771,7 +1771,298 @@ def test_sdk_client_isolation(self, multi_project_setup):
         assert proj1_tracer != proj2_tracer
 
 
-@pytest.mark.otel
+class TestInstrumentationScopeFiltering(TestOTelBase):
+    """Tests for filtering spans by instrumentation scope names."""
+
+    @pytest.fixture(scope="function")
+    def instrumentation_filtering_setup(self, monkeypatch):
+        """Create setup for testing instrumentation scope filtering with actual span export."""
+        from opentelemetry import trace as trace_api_reset
+        from opentelemetry.sdk.trace import TracerProvider
+
+        original_provider = trace_api_reset.get_tracer_provider()
+
+        # In-memory exporter that captures every span surviving the scope filter
+        blocked_exporter = InMemorySpanExporter()
+
+        import uuid
+
+        unique_suffix = str(uuid.uuid4())[:8]
+        test_key = f"test_key_{unique_suffix}"
+
+        # Clear singleton instances to avoid cross-test contamination
+        monkeypatch.setattr(LangfuseResourceManager, "_instances", {})
+
+        # Mock the LangfuseSpanProcessor to use our test exporter
+        def mock_processor_init(self, **kwargs):
+            from opentelemetry.sdk.trace.export import BatchSpanProcessor
+
+            self.public_key = kwargs.get("public_key", "test-key")
+            blocked_scopes = kwargs.get("blocked_instrumentation_scopes")
+            self.blocked_instrumentation_scopes = (
+                blocked_scopes if blocked_scopes is not None else []
+            )
+
+            # For testing, use the appropriate exporter based on setup
+            exporter = kwargs.get("_test_exporter", blocked_exporter)
+
+            BatchSpanProcessor.__init__(
+                self,
+                span_exporter=exporter,
+                max_export_batch_size=512,
+                schedule_delay_millis=5000,
+            )
+
+        monkeypatch.setattr(
+            "langfuse._client.span_processor.LangfuseSpanProcessor.__init__",
+            mock_processor_init,
+        )
+
+        # Create test tracer provider that will be used for all spans
+        test_tracer_provider = TracerProvider()
+
+        # We'll add the LangfuseSpanProcessor to this provider after it's created
+        # in the mock_initialize function below
+
+        # Mock resource manager initialization to use our test setup
+        original_initialize = LangfuseResourceManager._initialize_instance
+
+        def mock_initialize(self, **kwargs):
+            # Call original_initialize to set up all the necessary attributes
+            original_initialize(self, **kwargs)
+
+            # Now create our custom LangfuseSpanProcessor with the actual blocked_instrumentation_scopes
+            from langfuse._client.span_processor import LangfuseSpanProcessor
+
+            processor = LangfuseSpanProcessor(
+                public_key=self.public_key,
+                secret_key=self.secret_key,
+                host=self.host,
+                blocked_instrumentation_scopes=kwargs.get(
+                    "blocked_instrumentation_scopes"
+                ),
+            )
+            # Replace its exporter with our test exporter
+            processor._span_exporter = blocked_exporter
+
+            # Add the processor to our test tracer provider
+            test_tracer_provider.add_span_processor(processor)
+
+            # Override the tracer to use our test tracer provider
+            self._otel_tracer = test_tracer_provider.get_tracer(
+                "langfuse-sdk", "test", attributes={"public_key": self.public_key}
+            )
+
+        monkeypatch.setattr(
+            LangfuseResourceManager, "_initialize_instance", mock_initialize
+        )
+
+        setup = {
+            "blocked_exporter": blocked_exporter,
+            "test_key": test_key,
+            "test_tracer_provider": test_tracer_provider,
+            "original_provider": original_provider,
+            "original_initialize": original_initialize,
+        }
+
+        yield setup
+
+        # Clean up
+        trace_api_reset.set_tracer_provider(original_provider)
+        monkeypatch.setattr(
+            LangfuseResourceManager, "_initialize_instance", original_initialize
+        )
+        blocked_exporter.shutdown()
+
+    def test_blocked_instrumentation_scopes_export_filtering(
+        self, instrumentation_filtering_setup
+    ):
+        """Test that spans from blocked instrumentation scopes are not exported."""
+        # Create Langfuse client with blocked scopes
+        Langfuse(
+            public_key=instrumentation_filtering_setup["test_key"],
+            secret_key="test-secret-key",
+            host="http://localhost:3000",
+            blocked_instrumentation_scopes=["openai", "anthropic"],
+        )
+
+        # Get the tracer provider and create different instrumentation scope tracers
+        tracer_provider = instrumentation_filtering_setup["test_tracer_provider"]
+
+        # Create langfuse tracer with proper attributes for project validation
+        langfuse_tracer = tracer_provider.get_tracer(
+            "langfuse-sdk",
+            attributes={"public_key": instrumentation_filtering_setup["test_key"]},
+        )
+        openai_tracer = tracer_provider.get_tracer("openai")
+        anthropic_tracer = tracer_provider.get_tracer("anthropic")
+        allowed_tracer = tracer_provider.get_tracer("allowed-library")
+
+        # Create spans from each tracer
+        langfuse_span = langfuse_tracer.start_span("langfuse-span")
+        langfuse_span.end()
+
+        openai_span = openai_tracer.start_span("openai-span")
+        openai_span.end()
+
+        anthropic_span = anthropic_tracer.start_span("anthropic-span")
+        anthropic_span.end()
+
+        allowed_span = allowed_tracer.start_span("allowed-span")
+        allowed_span.end()
+
+        # Force flush to ensure all spans are processed
+        tracer_provider.force_flush()
+
+        # Check which spans were actually exported
+        exported_spans = instrumentation_filtering_setup[
+            "blocked_exporter"
+        ].get_finished_spans()
+        exported_span_names = [span.name for span in exported_spans]
+        exported_scope_names = [
+            span.instrumentation_scope.name
+            for span in exported_spans
+            if span.instrumentation_scope
+        ]
+
+        # Langfuse spans should be exported (not blocked)
+        assert "langfuse-span" in exported_span_names
+        assert "langfuse-sdk" in exported_scope_names
+
+        # Blocked scopes should NOT be exported
+        assert "openai-span" not in exported_span_names
+        assert "anthropic-span" not in exported_span_names
+        assert "openai" not in exported_scope_names
+        assert "anthropic" not in exported_scope_names
+
+        # Allowed scopes should be exported
+        assert "allowed-span" in exported_span_names
+        assert "allowed-library" in exported_scope_names
+
+    def test_no_blocked_scopes_allows_all_exports(
+        self, instrumentation_filtering_setup
+    ):
+        """Test that when no scopes are blocked, all spans are exported."""
+        # Create Langfuse client with NO blocked scopes
+        Langfuse(
+            public_key=instrumentation_filtering_setup["test_key"],
+            secret_key="test-secret-key",
+            host="http://localhost:3000",
+            blocked_instrumentation_scopes=[],
+        )
+
+        # Get the tracer provider and create different instrumentation scope tracers
+        tracer_provider = instrumentation_filtering_setup["test_tracer_provider"]
+
+        langfuse_tracer = tracer_provider.get_tracer(
+            "langfuse-sdk",
+            attributes={"public_key": instrumentation_filtering_setup["test_key"]},
+        )
+        openai_tracer = tracer_provider.get_tracer("openai")
+        anthropic_tracer = tracer_provider.get_tracer("anthropic")
+
+        # Create spans from each tracer
+        langfuse_span = langfuse_tracer.start_span("langfuse-span")
+        langfuse_span.end()
+
+        openai_span = openai_tracer.start_span("openai-span")
+        openai_span.end()
+
+        anthropic_span = anthropic_tracer.start_span("anthropic-span")
+        anthropic_span.end()
+
+        # Force flush
+        tracer_provider.force_flush()
+
+        # Check that ALL spans were exported
+        exported_spans = instrumentation_filtering_setup[
+            "blocked_exporter"
+        ].get_finished_spans()
+        exported_span_names = [span.name for span in exported_spans]
+
+        assert "langfuse-span" in exported_span_names
+        assert "openai-span" in exported_span_names
+        assert "anthropic-span" in exported_span_names
+
+    def test_none_blocked_scopes_allows_all_exports(
+        self, instrumentation_filtering_setup
+    ):
+        """Test that when blocked_scopes is None (default), all spans are exported."""
+        # Create Langfuse client with None blocked scopes (default behavior)
+        Langfuse(
+            public_key=instrumentation_filtering_setup["test_key"],
+            secret_key="test-secret-key",
+            host="http://localhost:3000",
+            blocked_instrumentation_scopes=None,
+        )
+
+        # Get the tracer provider and create different instrumentation scope tracers
+        tracer_provider = instrumentation_filtering_setup["test_tracer_provider"]
+
+        langfuse_tracer = tracer_provider.get_tracer(
+            "langfuse-sdk",
+            attributes={"public_key": instrumentation_filtering_setup["test_key"]},
+        )
+        openai_tracer = tracer_provider.get_tracer("openai")
+
+        # Create spans from each tracer
+        langfuse_span = langfuse_tracer.start_span("langfuse-span")
+        langfuse_span.end()
+
+        openai_span = openai_tracer.start_span("openai-span")
+        openai_span.end()
+
+        # Force flush
+        tracer_provider.force_flush()
+
+        # Check that ALL spans were exported
+        exported_spans = instrumentation_filtering_setup[
+            "blocked_exporter"
+        ].get_finished_spans()
+        exported_span_names = [span.name for span in exported_spans]
+
+        assert "langfuse-span" in exported_span_names
+        assert "openai-span" in exported_span_names
+
+    def test_blocking_langfuse_sdk_scope_export(self, instrumentation_filtering_setup):
+        """Test that even Langfuse's own spans are blocked if explicitly specified."""
+        # Create Langfuse client that blocks its own instrumentation scope
+        Langfuse(
+            public_key=instrumentation_filtering_setup["test_key"],
+            secret_key="test-secret-key",
+            host="http://localhost:3000",
+            blocked_instrumentation_scopes=["langfuse-sdk"],
+        )
+
+        # Get the tracer provider and create tracers
+        tracer_provider = instrumentation_filtering_setup["test_tracer_provider"]
+
+        langfuse_tracer = tracer_provider.get_tracer(
+            "langfuse-sdk",
+            attributes={"public_key": instrumentation_filtering_setup["test_key"]},
+        )
+        other_tracer = tracer_provider.get_tracer("other-library")
+
+        # Create spans
+        langfuse_span = langfuse_tracer.start_span("langfuse-span")
+        langfuse_span.end()
+
+        other_span = other_tracer.start_span("other-span")
+        other_span.end()
+
+        # Force flush
+        tracer_provider.force_flush()
+
+        # Check exports - Langfuse spans should be blocked, others allowed
+        exported_spans = instrumentation_filtering_setup[
+            "blocked_exporter"
+        ].get_finished_spans()
+        exported_span_names = [span.name for span in exported_spans]
+
+        assert "langfuse-span" not in exported_span_names
+        assert "other-span" in exported_span_names
+
+
 class TestConcurrencyAndAsync(TestOTelBase):
     """Tests for asynchronous and concurrent span operations."""
 
@@ -2134,7 +2425,6 @@ def test_metrics_and_timing(self, langfuse_client, memory_exporter):
 
 
 # Add tests for media functionality in its own class
-@pytest.mark.otel
 class TestMediaHandling(TestOTelBase):
     """Tests for media object handling, upload, and references."""
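For orientation, below is a minimal usage sketch of the new `blocked_instrumentation_scopes` parameter. It is not part of the change set: the scope name `some.internal.library` and the credentials are placeholders, and the `start_as_current_span`/`flush` client helpers (plus the assumption that the third-party tracer is registered on the tracer provider the SDK configures) come from the existing v3 client surface rather than from this diff.

```python
# Illustrative sketch only; placeholder names are marked as such.
from langfuse import Langfuse
from opentelemetry import trace

langfuse = Langfuse(
    public_key="pk-lf-...",  # placeholder credentials
    secret_key="sk-lf-...",
    blocked_instrumentation_scopes=["some.internal.library"],  # placeholder scope name
)

# Spans created through the Langfuse SDK use the "langfuse-sdk" scope and are
# exported as usual.
with langfuse.start_as_current_span(name="my-operation"):
    # Spans emitted under a blocked scope are dropped by
    # LangfuseSpanProcessor.on_end and never reach the Langfuse OTLP endpoint.
    noisy_tracer = trace.get_tracer("some.internal.library")
    with noisy_tracer.start_as_current_span("noisy-span"):
        pass

langfuse.flush()
```

Because the check happens in `on_end`, before the span is handed to the batch exporter, blocked spans are discarded in-process; matching is an exact membership test against the configured scope names, which are the values shown as `metadata.scope.name` in Langfuse.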