
Commit e92b1f3

feat(evals): add run_batched_evaluation
1 parent aa7819f commit e92b1f3

File tree

4 files changed (+2731 -0 lines changed)

langfuse/__init__.py

Lines changed: 14 additions & 0 deletions
@@ -1,5 +1,13 @@
 """.. include:: ../README.md"""
 
+from langfuse.batch_evaluation import (
+    BatchEvaluationResult,
+    BatchEvaluationResumeToken,
+    CompositeEvaluatorFunction,
+    EvaluatorInputs,
+    EvaluatorStats,
+    MapperFunction,
+)
 from langfuse.experiment import Evaluation
 
 from ._client import client as _client_module
@@ -41,6 +49,12 @@
     "LangfuseRetriever",
     "LangfuseGuardrail",
     "Evaluation",
+    "EvaluatorInputs",
+    "MapperFunction",
+    "CompositeEvaluatorFunction",
+    "EvaluatorStats",
+    "BatchEvaluationResumeToken",
+    "BatchEvaluationResult",
     "experiment",
     "api",
 ]
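
The hunks above re-export the batch-evaluation types from the package root, so callers no longer need to reach into `langfuse.batch_evaluation` directly. A minimal sketch of what this enables (names taken verbatim from the hunk above):

```python
# After this commit, the new public names resolve from the package root:
from langfuse import (
    BatchEvaluationResult,
    BatchEvaluationResumeToken,
    CompositeEvaluatorFunction,
    EvaluatorInputs,
    EvaluatorStats,
    MapperFunction,
)
```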

langfuse/_client/client.py

Lines changed: 236 additions & 0 deletions
@@ -86,6 +86,12 @@
     Prompt_Chat,
     Prompt_Text,
 )
+from langfuse.batch_evaluation import (
+    BatchEvaluationResult,
+    BatchEvaluationResumeToken,
+    CompositeEvaluatorFunction,
+    MapperFunction,
+)
 from langfuse.experiment import (
     Evaluation,
     EvaluatorFunction,
@@ -2919,6 +2925,236 @@ def _create_experiment_run_name(
 
         return f"{name} - {iso_timestamp}"
 
+    def run_batched_evaluation(
+        self,
+        *,
+        scope: Literal["traces", "observations", "sessions"],
+        mapper: MapperFunction,
+        evaluators: List[EvaluatorFunction],
+        filter: Optional[str] = None,
+        fetch_batch_size: int = 50,
+        max_items: Optional[int] = None,
+        max_concurrency: int = 50,
+        composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+        max_retries: int = 3,
+        verbose: bool = False,
+        resume_from: Optional[BatchEvaluationResumeToken] = None,
+    ) -> BatchEvaluationResult:
+        """Fetch traces, observations, or sessions and run evaluations on each item.
+
+        This method provides a powerful way to evaluate existing data in Langfuse at scale.
+        It fetches items based on filters, transforms them using a mapper function, runs
+        evaluators on each item, and creates scores that are linked back to the original
+        entities. This is ideal for:
+
+        - Running evaluations on production traces after deployment
+        - Backtesting new evaluation metrics on historical data
+        - Batch scoring of observations for quality monitoring
+        - Periodic evaluation runs on recent data
+
+        The method uses a streaming/pipeline approach to process items in batches, making
+        it memory-efficient for large datasets. It includes comprehensive error handling,
+        retry logic, and resume capability for long-running evaluations.
+
+        Args:
+            scope: The type of items to evaluate. Must be one of:
+                - "traces": Evaluate complete traces with all their observations
+                - "observations": Evaluate individual observations (spans, generations, events)
+                - "sessions": Evaluate entire sessions with multiple traces
+            mapper: Function that transforms API response objects into evaluator inputs.
+                Receives a trace/observation/session object and returns an EvaluatorInputs
+                instance with input, output, expected_output, and metadata fields.
+                Can be sync or async.
+            evaluators: List of evaluation functions to run on each item. Each evaluator
+                receives the mapped inputs and returns Evaluation object(s). Evaluator
+                failures are logged but don't stop the batch evaluation.
+            filter: Optional JSON filter string for querying items (same format as the
+                Langfuse API). Examples:
+                - '{"tags": ["production"]}'
+                - '{"user_id": "user123", "timestamp": {"operator": ">", "value": "2024-01-01"}}'
+                Default: None (fetches all items).
+            fetch_batch_size: Number of items to fetch per API call and hold in memory.
+                Larger values may be faster but use more memory. Default: 50.
+            max_items: Maximum total number of items to process. If None, processes all
+                items matching the filter. Useful for testing or limiting evaluation runs.
+                Default: None (process all).
+            max_concurrency: Maximum number of items to evaluate concurrently. Controls
+                parallelism and resource usage. Default: 50.
+            composite_evaluator: Optional function that creates a composite score from
+                item-level evaluations. Receives the original item and its evaluations,
+                returns a single Evaluation. Useful for weighted averages or combined metrics.
+                Default: None.
+            metadata: Optional metadata dict to add to all created scores. Useful for
+                tracking evaluation runs, versions, or other context. Default: None.
+            max_retries: Maximum number of retry attempts for failed batch fetches.
+                Uses exponential backoff (1s, 2s, 4s). Default: 3.
+            verbose: If True, logs progress information to console. Useful for monitoring
+                long-running evaluations. Default: False.
+            resume_from: Optional resume token from a previous incomplete run. Allows
+                continuing evaluation after interruption or failure. Default: None.
+
+        Returns:
+            BatchEvaluationResult containing:
+            - total_items_fetched: Number of items fetched from the API
+            - total_items_processed: Number of items successfully evaluated
+            - total_items_failed: Number of items that failed evaluation
+            - total_scores_created: Scores created by item-level evaluators
+            - total_composite_scores_created: Scores created by the composite evaluator
+            - total_evaluations_failed: Individual evaluator failures
+            - evaluator_stats: Per-evaluator statistics (success rate, scores created)
+            - resume_token: Token for resuming if incomplete (None if completed)
+            - completed: True if all items were processed
+            - duration_seconds: Total execution time
+            - failed_item_ids: IDs of items that failed
+            - error_summary: Error types and counts
+            - has_more_items: True if max_items was reached but more items exist
+
+        Raises:
+            ValueError: If an invalid scope is provided.
+
+        Examples:
+            Basic trace evaluation:
+            ```python
+            from langfuse import Langfuse, EvaluatorInputs, Evaluation
+
+            client = Langfuse()
+
+            # Define mapper to extract fields from traces
+            def trace_mapper(trace):
+                return EvaluatorInputs(
+                    input=trace.input,
+                    output=trace.output,
+                    expected_output=None,
+                    metadata={"trace_id": trace.id}
+                )
+
+            # Define evaluator
+            def length_evaluator(*, input, output, expected_output, metadata):
+                return Evaluation(
+                    name="output_length",
+                    value=len(output) if output else 0
+                )
+
+            # Run batch evaluation
+            result = client.run_batched_evaluation(
+                scope="traces",
+                mapper=trace_mapper,
+                evaluators=[length_evaluator],
+                filter='{"tags": ["production"]}',
+                max_items=1000,
+                verbose=True
+            )
+
+            print(f"Processed {result.total_items_processed} traces")
+            print(f"Created {result.total_scores_created} scores")
+            ```
+
+            Evaluation with composite scorer:
+            ```python
+            def accuracy_evaluator(*, input, output, expected_output, metadata):
+                # ... evaluation logic
+                return Evaluation(name="accuracy", value=0.85)
+
+            def relevance_evaluator(*, input, output, expected_output, metadata):
+                # ... evaluation logic
+                return Evaluation(name="relevance", value=0.92)
+
+            def composite_evaluator(*, item, evaluations):
+                # Weighted average of evaluations
+                weights = {"accuracy": 0.6, "relevance": 0.4}
+                total = sum(
+                    e.value * weights.get(e.name, 0)
+                    for e in evaluations
+                    if isinstance(e.value, (int, float))
+                )
+                return Evaluation(
+                    name="composite_score",
+                    value=total,
+                    comment=f"Weighted average of {len(evaluations)} metrics"
+                )
+
+            result = client.run_batched_evaluation(
+                scope="traces",
+                mapper=trace_mapper,
+                evaluators=[accuracy_evaluator, relevance_evaluator],
+                composite_evaluator=composite_evaluator,
+                filter='{"user_id": "important_user"}',
+                verbose=True
+            )
+            ```
+
+            Handling incomplete runs with resume:
+            ```python
+            # Initial run that may fail or time out
+            result = client.run_batched_evaluation(
+                scope="observations",
+                mapper=obs_mapper,
+                evaluators=[my_evaluator],
+                max_items=10000,
+                verbose=True
+            )
+
+            # Check if incomplete
+            if not result.completed and result.resume_token:
+                print(f"Processed {result.resume_token.items_processed} items before interruption")
+
+                # Resume from where it left off
+                result = client.run_batched_evaluation(
+                    scope="observations",
+                    mapper=obs_mapper,
+                    evaluators=[my_evaluator],
+                    resume_from=result.resume_token,
+                    verbose=True
+                )
+
+            print(f"Total items processed: {result.total_items_processed}")
+            ```
+
+            Monitoring evaluator performance:
+            ```python
+            result = client.run_batched_evaluation(...)
+
+            for stats in result.evaluator_stats:
+                success_rate = stats.successful_runs / stats.total_runs
+                print(f"{stats.name}:")
+                print(f"  Success rate: {success_rate:.1%}")
+                print(f"  Scores created: {stats.total_scores_created}")
+
+                if stats.failed_runs > 0:
+                    print(f"  ⚠️ Failed {stats.failed_runs} times")
+            ```
+
+        Note:
+            - Evaluator failures are logged but don't stop the batch evaluation
+            - Individual item failures are tracked but don't stop processing
+            - Fetch failures are retried with exponential backoff
+            - All scores are automatically flushed to Langfuse at the end
+            - The resume mechanism uses timestamp-based filtering to avoid duplicates
+        """
+        from langfuse.batch_evaluation import BatchEvaluationRunner
+
+        runner = BatchEvaluationRunner(self)
+        return cast(
+            BatchEvaluationResult,
+            run_async_safely(
+                runner.run_async(
+                    scope=scope,
+                    mapper=mapper,
+                    evaluators=evaluators,
+                    filter=filter,
+                    fetch_batch_size=fetch_batch_size,
+                    max_items=max_items,
+                    max_concurrency=max_concurrency,
+                    composite_evaluator=composite_evaluator,
+                    metadata=metadata,
+                    max_retries=max_retries,
+                    verbose=verbose,
+                    resume_from=resume_from,
+                )
+            ),
+        )
+
 
     def auth_check(self) -> bool:
         """Check if the provided credentials (public and secret key) are valid.
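
One gap worth noting: the "Handling incomplete runs with resume" example in the docstring uses an `obs_mapper` that is never defined. A plausible sketch, assuming observation objects expose `input`, `output`, and `id` attributes analogous to the trace object in the first example (those attribute names are an assumption, not confirmed by this diff):

```python
from langfuse import EvaluatorInputs

def obs_mapper(observation):
    # Assumption: observations carry input/output/id fields like traces do.
    return EvaluatorInputs(
        input=observation.input,
        output=observation.output,
        expected_output=None,
        metadata={"observation_id": observation.id},
    )
```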

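The docstring also notes the mapper "can be sync or async". A sketch of an async mapper for cases where mapping requires I/O; `lookup_expected_output` is a hypothetical helper, not part of this commit:

```python
from langfuse import EvaluatorInputs

async def enriched_trace_mapper(trace):
    # Hypothetical async lookup of a reference answer before evaluation.
    expected = await lookup_expected_output(trace.id)  # hypothetical helper
    return EvaluatorInputs(
        input=trace.input,
        output=trace.output,
        expected_output=expected,
        metadata={"trace_id": trace.id},
    )
```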
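Finally, the result's failure fields support a post-run report. A sketch that assumes `error_summary` is a dict-like mapping of error type to count (per the "Error types and counts" description) and reuses `trace_mapper` and `length_evaluator` from the docstring's first example:

```python
result = client.run_batched_evaluation(
    scope="traces",
    mapper=trace_mapper,
    evaluators=[length_evaluator],
    max_items=500,
)

if result.total_items_failed:
    print(f"{result.total_items_failed} items failed: {result.failed_item_ids}")
    for error_type, count in result.error_summary.items():  # dict-like is assumed
        print(f"  {error_type}: {count}")

if result.has_more_items:
    print("max_items reached before the filter was exhausted")
```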