Merged
2 changes: 2 additions & 0 deletions langfuse/__init__.py
@@ -7,6 +7,7 @@
from ._client.constants import ObservationTypeLiteral
from ._client.get_client import get_client
from ._client.observe import observe
from ._client.propagation import propagate_attributes
from ._client.span import (
LangfuseAgent,
LangfuseChain,
@@ -26,6 +27,7 @@
"Langfuse",
"get_client",
"observe",
"propagate_attributes",
"ObservationTypeLiteral",
"LangfuseSpan",
"LangfuseGeneration",
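This PR exports `propagate_attributes` at the package root. A minimal usage sketch follows; it assumes `propagate_attributes` acts as a context manager that applies trace-level attributes (such as `user_id` and `session_id`) to spans created inside it, which the `update_current_trace` docstring change implies but this diff does not show directly — the keyword arguments are illustrative, not confirmed.

```python
from langfuse import get_client, propagate_attributes

langfuse = get_client()

with langfuse.start_as_current_span(name="handle-request") as span:
    # Assumed API: a context manager whose attributes propagate to spans
    # created within it; the exact keyword arguments are not shown in this PR.
    with propagate_attributes(user_id="user-123", session_id="session-abc"):
        span.update(output="ok")
```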
12 changes: 11 additions & 1 deletion langfuse/_client/attributes.py
@@ -18,7 +18,6 @@
ObservationTypeGenerationLike,
ObservationTypeSpanLike,
)

from langfuse._utils.serializer import EventSerializer
from langfuse.model import PromptClient
from langfuse.types import MapValue, SpanLevel
@@ -60,6 +59,17 @@ class LangfuseOtelSpanAttributes:
# Internal
AS_ROOT = "langfuse.internal.as_root"

# Experiments
EXPERIMENT_ID = "langfuse.experiment.id"
EXPERIMENT_NAME = "langfuse.experiment.name"
EXPERIMENT_DESCRIPTION = "langfuse.experiment.description"
EXPERIMENT_METADATA = "langfuse.experiment.metadata"
EXPERIMENT_DATASET_ID = "langfuse.experiment.dataset.id"
EXPERIMENT_ITEM_ID = "langfuse.experiment.item.id"
EXPERIMENT_ITEM_EXPECTED_OUTPUT = "langfuse.experiment.item.expected_output"
EXPERIMENT_ITEM_METADATA = "langfuse.experiment.item.metadata"
EXPERIMENT_ITEM_ROOT_OBSERVATION_ID = "langfuse.experiment.item.root_observation_id"


def create_trace_attributes(
*,
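The new `EXPERIMENT_*` keys extend `LangfuseOtelSpanAttributes` so experiment context can ride on plain OTel span attributes. A short sketch of setting a few of these keys on a span — the tracer name and attribute values are placeholders, not part of the SDK:

```python
from opentelemetry import trace

from langfuse._client.attributes import LangfuseOtelSpanAttributes

tracer = trace.get_tracer("example-tracer")  # placeholder tracer name

with tracer.start_as_current_span("experiment-item-run") as span:
    # Attach experiment identifiers using the new attribute keys.
    span.set_attributes(
        {
            LangfuseOtelSpanAttributes.EXPERIMENT_ID: "exp-123",
            LangfuseOtelSpanAttributes.EXPERIMENT_NAME: "prompt-v2",
            LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_ID: "item-456",
        }
    )
```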
156 changes: 89 additions & 67 deletions langfuse/_client/client.py
@@ -27,7 +27,6 @@

import backoff
import httpx
from opentelemetry import trace
from opentelemetry import trace as otel_trace_api
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
@@ -37,8 +36,9 @@
)
from packaging.version import Version

from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.attributes import LangfuseOtelSpanAttributes, _serialize
from langfuse._client.constants import (
LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT,
ObservationTypeGenerationLike,
ObservationTypeLiteral,
ObservationTypeLiteralNoEvent,
@@ -57,6 +57,10 @@
LANGFUSE_TRACING_ENABLED,
LANGFUSE_TRACING_ENVIRONMENT,
)
from langfuse._client.propagation import (
PropagatedExperimentAttributes,
_propagate_attributes,
)
from langfuse._client.resource_manager import LangfuseResourceManager
from langfuse._client.span import (
LangfuseAgent,
@@ -70,7 +74,7 @@
LangfuseSpan,
LangfuseTool,
)
from langfuse._client.utils import run_async_safely
from langfuse._client.utils import get_sha256_hash_hex, run_async_safely
from langfuse._utils import _get_timestamp
from langfuse._utils.parse_error import handle_fern_exception
from langfuse._utils.prompt_cache import PromptCache
@@ -1638,10 +1642,6 @@ def update_current_trace(
) -> None:
"""Update the current trace with additional information.

This method updates the Langfuse trace that the current span belongs to. It's useful for
adding trace-level metadata like user ID, session ID, or tags that apply to
the entire Langfuse trace rather than just a single observation.

Args:
name: Updated name for the Langfuse trace
user_id: ID of the user who initiated the Langfuse trace
@@ -1653,25 +1653,8 @@
tags: List of tags to categorize the Langfuse trace
public: Whether the Langfuse trace should be publicly accessible

Example:
```python
with langfuse.start_as_current_span(name="handle-request") as span:
# Get user information
user = authenticate_user(request)

# Update trace with user context
langfuse.update_current_trace(
user_id=user.id,
session_id=request.session_id,
tags=["production", "web-app"]
)

# Continue processing
response = process_request(request)

# Update span with results
span.update(output=response)
```
See Also:
:func:`langfuse.propagate_attributes`: Recommended replacement
"""
if not self._tracing_enabled:
langfuse_logger.debug(
@@ -1817,7 +1800,7 @@ def _create_remote_parent_span(
is_remote=False,
)

return trace.NonRecordingSpan(span_context)
return otel_trace_api.NonRecordingSpan(span_context)

def _is_valid_trace_id(self, trace_id: str) -> bool:
pattern = r"^[0-9a-f]{32}$"
@@ -2477,7 +2460,7 @@ def run_experiment(
evaluators: List[EvaluatorFunction] = [],
run_evaluators: List[RunEvaluatorFunction] = [],
max_concurrency: int = 50,
metadata: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, str]] = None,
) -> ExperimentResult:
"""Run an experiment on a dataset with automatic tracing and evaluation.

@@ -2649,7 +2632,7 @@ def average_accuracy(*, item_results, **kwargs):
evaluators=evaluators or [],
run_evaluators=run_evaluators or [],
max_concurrency=max_concurrency,
metadata=metadata or {},
metadata=metadata,
),
),
)
@@ -2665,7 +2648,7 @@ async def _run_experiment_async(
evaluators: List[EvaluatorFunction],
run_evaluators: List[RunEvaluatorFunction],
max_concurrency: int,
metadata: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None,
) -> ExperimentResult:
langfuse_logger.debug(
f"Starting experiment '{name}' run '{run_name}' with {len(data)} items"
@@ -2763,85 +2746,123 @@ async def _process_experiment_item(
experiment_name: str,
experiment_run_name: str,
experiment_description: Optional[str],
experiment_metadata: Dict[str, Any],
experiment_metadata: Optional[Dict[str, Any]] = None,
) -> ExperimentItemResult:
# Execute task with tracing
span_name = "experiment-item-run"

with self.start_as_current_span(name=span_name) as span:
try:
output = await _run_task(task, item)

input_data = (
item.get("input")
if isinstance(item, dict)
else getattr(item, "input", None)
)

item_metadata: Dict[str, Any] = {}
if input_data is None:
raise ValueError("Experiment Item is missing input. Skipping item.")

expected_output = (
item.get("expected_output")
if isinstance(item, dict)
else getattr(item, "expected_output", None)
)

if isinstance(item, dict):
item_metadata = item.get("metadata", None) or {}
item_metadata = (
item.get("metadata")
if isinstance(item, dict)
else getattr(item, "metadata", None)
)

final_metadata = {
final_observation_metadata = {
"experiment_name": experiment_name,
"experiment_run_name": experiment_run_name,
**experiment_metadata,
**(experiment_metadata or {}),
}

if (
not isinstance(item, dict)
and hasattr(item, "dataset_id")
and hasattr(item, "id")
):
final_metadata.update(
{"dataset_id": item.dataset_id, "dataset_item_id": item.id}
)

if isinstance(item_metadata, dict):
final_metadata.update(item_metadata)

span.update(
input=input_data,
output=output,
metadata=final_metadata,
)

# Get trace ID for linking
trace_id = span.trace_id
dataset_id = None
dataset_item_id = None
dataset_run_id = None

# Link to dataset run if this is a dataset item
if hasattr(item, "id") and hasattr(item, "dataset_id"):
try:
dataset_run_item = self.api.dataset_run_items.create(
# Use sync API to avoid event loop issues when run_async_safely
# creates multiple event loops across different threads
dataset_run_item = await asyncio.to_thread(
self.api.dataset_run_items.create,
request=CreateDatasetRunItemRequest(
runName=experiment_run_name,
runDescription=experiment_description,
metadata=experiment_metadata,
datasetItemId=item.id, # type: ignore
traceId=trace_id,
observationId=span.id,
)
),
)

dataset_run_id = dataset_run_item.dataset_run_id

except Exception as e:
langfuse_logger.error(f"Failed to create dataset run item: {e}")

if (
not isinstance(item, dict)
and hasattr(item, "dataset_id")
and hasattr(item, "id")
):
dataset_id = item.dataset_id
dataset_item_id = item.id

final_observation_metadata.update(
{"dataset_id": dataset_id, "dataset_item_id": dataset_item_id}
)

if isinstance(item_metadata, dict):
final_observation_metadata.update(item_metadata)

experiment_id = dataset_run_id or self._create_observation_id()
experiment_item_id = (
dataset_item_id or get_sha256_hash_hex(_serialize(input_data))[:16]
)
span._otel_span.set_attributes(
{
k: v
for k, v in {
LangfuseOtelSpanAttributes.ENVIRONMENT: LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT,
LangfuseOtelSpanAttributes.EXPERIMENT_DESCRIPTION: experiment_description,
LangfuseOtelSpanAttributes.EXPERIMENT_ITEM_EXPECTED_OUTPUT: _serialize(
expected_output
),
}.items()
if v is not None
}
)

with _propagate_attributes(
experiment=PropagatedExperimentAttributes(
experiment_id=experiment_id,
experiment_name=experiment_run_name,
experiment_metadata=_serialize(experiment_metadata),
experiment_dataset_id=dataset_id,
experiment_item_id=experiment_item_id,
experiment_item_metadata=_serialize(item_metadata),
experiment_item_root_observation_id=span.id,
)
):
output = await _run_task(task, item)

span.update(
input=input_data,
output=output,
metadata=final_observation_metadata,
)

# Run evaluators
evaluations = []

for evaluator in evaluators:
try:
expected_output = None

if isinstance(item, dict):
expected_output = item.get("expected_output")
elif hasattr(item, "expected_output"):
expected_output = item.expected_output

eval_metadata: Optional[Dict[str, Any]] = None

if isinstance(item, dict):
@@ -2862,6 +2883,7 @@ async def _process_experiment_item(
for evaluation in eval_results:
self.create_score(
trace_id=trace_id,
observation_id=span.id,
name=evaluation.name,
value=evaluation.value, # type: ignore
comment=evaluation.comment,
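The reworked `_process_experiment_item` derives stable identifiers before running the task: dataset-backed items reuse their dataset run and item IDs, while ad-hoc items fall back to a generated observation ID and a 16-character SHA-256 prefix of the serialized input. Below is a standalone sketch of that fallback, using `hashlib` and `json` as stand-ins for the SDK's `get_sha256_hash_hex` and `_serialize` helpers, whose implementations are not shown in this diff:

```python
import hashlib
import json
from typing import Any, Optional


def _sha256_hash_hex(value: str) -> str:
    # Stand-in for langfuse._client.utils.get_sha256_hash_hex.
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def derive_experiment_item_id(input_data: Any, dataset_item_id: Optional[str]) -> str:
    """Illustrative mirror of the item-ID fallback in _process_experiment_item."""
    if dataset_item_id is not None:
        # Dataset-backed items keep their dataset item ID.
        return dataset_item_id
    # Ad-hoc items get a deterministic ID from a hash of their serialized input.
    serialized = json.dumps(input_data, sort_keys=True, default=str)
    return _sha256_hash_hex(serialized)[:16]


print(derive_experiment_item_id({"question": "What is Langfuse?"}, None))
```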
4 changes: 3 additions & 1 deletion langfuse/_client/constants.py
@@ -3,11 +3,13 @@
This module defines constants used throughout the Langfuse OpenTelemetry integration.
"""

from typing import Literal, List, get_args, Union, Any
from typing import Any, List, Literal, Union, get_args

from typing_extensions import TypeAlias

LANGFUSE_TRACER_NAME = "langfuse-sdk"

LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT = "sdk-experiment"

"""Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat"""
ObservationTypeGenerationLike: TypeAlias = Literal[
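The new `LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT` constant is what `client.py` writes into the span's environment attribute for experiment item runs, separating experiment traffic from regular tracing. A tiny illustration of that pairing (assembling the attribute dict only, no span involved):

```python
from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.constants import LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT

# Same pairing client.py uses to tag experiment item spans with the
# dedicated "sdk-experiment" environment.
experiment_environment_attributes = {
    LangfuseOtelSpanAttributes.ENVIRONMENT: LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT,
}
print(experiment_environment_attributes)
```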