Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class Langfuse:
media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
blocked_instrumentation_scopes (Optional[List[str]]): List of instrumentation scope names to block from being exported to Langfuse. Spans from these scopes will be filtered out before being sent to the API. Useful for filtering out spans from specific libraries or frameworks. For exported spans, you can see the instrumentation scope name in the span metadata in Langfuse (`metadata.scope.name`)

Example:
```python
Expand Down Expand Up @@ -159,6 +160,7 @@ def __init__(
media_upload_thread_count: Optional[int] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
):
self._host = host or os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
self._environment = environment or os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
Expand Down Expand Up @@ -220,6 +222,7 @@ def __init__(
sample_rate=sample_rate,
mask=mask,
tracing_enabled=self._tracing_enabled,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
)
self._mask = self._resources.mask

Expand Down
6 changes: 5 additions & 1 deletion langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import os
import threading
from queue import Full, Queue
from typing import Any, Dict, Optional, cast
from typing import Any, Dict, List, Optional, cast

import httpx
from opentelemetry import trace as otel_trace_api
Expand Down Expand Up @@ -93,6 +93,7 @@ def __new__(
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
tracing_enabled: Optional[bool] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
) -> "LangfuseResourceManager":
if public_key in cls._instances:
return cls._instances[public_key]
Expand All @@ -117,6 +118,7 @@ def __new__(
tracing_enabled=tracing_enabled
if tracing_enabled is not None
else True,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
)

cls._instances[public_key] = instance
Expand All @@ -139,6 +141,7 @@ def _initialize_instance(
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
tracing_enabled: bool = True,
blocked_instrumentation_scopes: Optional[List[str]] = None,
):
self.public_key = public_key
self.secret_key = secret_key
Expand All @@ -159,6 +162,7 @@ def _initialize_instance(
timeout=timeout,
flush_at=flush_at,
flush_interval=flush_interval,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
)
tracer_provider.add_span_processor(langfuse_processor)

Expand Down
27 changes: 22 additions & 5 deletions langfuse/_client/span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import base64
import os
from typing import Optional
from typing import List, Optional

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import ReadableSpan
Expand All @@ -34,10 +34,11 @@ class LangfuseSpanProcessor(BatchSpanProcessor):

This processor extends OpenTelemetry's BatchSpanProcessor with Langfuse-specific functionality:
1. Project-scoped span filtering to prevent cross-project data leakage
2. Configurable batch processing parameters for optimal performance
3. HTTP-based span export to the Langfuse OTLP endpoint
4. Debug logging for span processing operations
5. Authentication with Langfuse API using Basic Auth
2. Instrumentation scope filtering to block spans from specific libraries/frameworks
3. Configurable batch processing parameters for optimal performance
4. HTTP-based span export to the Langfuse OTLP endpoint
5. Debug logging for span processing operations
6. Authentication with Langfuse API using Basic Auth

The processor is designed to efficiently handle large volumes of spans with
minimal overhead, while ensuring spans are only sent to the correct project.
Expand All @@ -54,8 +55,14 @@ def __init__(
timeout: Optional[int] = None,
flush_at: Optional[int] = None,
flush_interval: Optional[float] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
):
self.public_key = public_key
self.blocked_instrumentation_scopes = (
blocked_instrumentation_scopes
if blocked_instrumentation_scopes is not None
else []
)
flush_at = flush_at or int(os.environ.get(LANGFUSE_FLUSH_AT, 15))
flush_interval = flush_interval or float(
os.environ.get(LANGFUSE_FLUSH_INTERVAL, 0.5)
Expand Down Expand Up @@ -93,6 +100,10 @@ def on_end(self, span: ReadableSpan) -> None:
)
return

# Do not export spans from blocked instrumentation scopes
if self._is_blocked_instrumentation_scope(span):
return

langfuse_logger.debug(
f"Trace: Processing span name='{span._name}' | Full details:\n{span_formatter(span)}"
)
Expand All @@ -106,6 +117,12 @@ def _is_langfuse_span(span: ReadableSpan) -> bool:
and span.instrumentation_scope.name == LANGFUSE_TRACER_NAME
)

def _is_blocked_instrumentation_scope(self, span: ReadableSpan) -> bool:
return (
span.instrumentation_scope is not None
and span.instrumentation_scope.name in self.blocked_instrumentation_scopes
)

def _is_langfuse_project_span(self, span: ReadableSpan) -> bool:
if not LangfuseSpanProcessor._is_langfuse_span(span):
return False
Expand Down
Loading