diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0cb4cb3d6..344225dcb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,7 +88,7 @@ jobs: - name: Setup node (for langfuse server) uses: actions/setup-node@v3 with: - node-version: 20 + node-version: 24 - name: Cache langfuse server dependencies uses: actions/cache@v3 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 59073b603..b62426d7b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.3.2 + rev: v0.14.4 hooks: # Run the linter and fix - id: ruff @@ -10,6 +10,7 @@ repos: # Run the formatter. - id: ruff-format types_or: [python, pyi, jupyter] + args: [--config=ci.ruff.toml] - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.18.2 diff --git a/langfuse/__init__.py b/langfuse/__init__.py index f96b18bc8..ab4ceedb1 100644 --- a/langfuse/__init__.py +++ b/langfuse/__init__.py @@ -1,5 +1,13 @@ """.. include:: ../README.md""" +from langfuse.batch_evaluation import ( + BatchEvaluationResult, + BatchEvaluationResumeToken, + CompositeEvaluatorFunction, + EvaluatorInputs, + EvaluatorStats, + MapperFunction, +) from langfuse.experiment import Evaluation from ._client import client as _client_module @@ -41,6 +49,12 @@ "LangfuseRetriever", "LangfuseGuardrail", "Evaluation", + "EvaluatorInputs", + "MapperFunction", + "CompositeEvaluatorFunction", + "EvaluatorStats", + "BatchEvaluationResumeToken", + "BatchEvaluationResult", "experiment", "api", ] diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 2b3d6671c..3a432da25 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -86,6 +86,13 @@ Prompt_Chat, Prompt_Text, ) +from langfuse.batch_evaluation import ( + BatchEvaluationResult, + BatchEvaluationResumeToken, + BatchEvaluationRunner, + CompositeEvaluatorFunction, + MapperFunction, +) from langfuse.experiment import ( Evaluation, EvaluatorFunction, @@ -2458,6 +2465,7 @@ def run_experiment( data: ExperimentData, task: TaskFunction, evaluators: List[EvaluatorFunction] = [], + composite_evaluator: Optional[CompositeEvaluatorFunction] = None, run_evaluators: List[RunEvaluatorFunction] = [], max_concurrency: int = 50, metadata: Optional[Dict[str, str]] = None, @@ -2493,6 +2501,10 @@ def run_experiment( evaluators: List of functions to evaluate each item's output individually. Each evaluator receives input, output, expected_output, and metadata. Can return single Evaluation dict or list of Evaluation dicts. + composite_evaluator: Optional function that creates composite scores from item-level evaluations. + Receives the same inputs as item-level evaluators (input, output, expected_output, metadata) + plus the list of evaluations from item-level evaluators. Useful for weighted averages, + pass/fail decisions based on multiple criteria, or custom scoring logic combining multiple metrics. run_evaluators: List of functions to evaluate the entire experiment run. Each run evaluator receives all item_results and can compute aggregate metrics. Useful for calculating averages, distributions, or cross-item comparisons. 
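To make the new `composite_evaluator` parameter concrete, here is a minimal sketch of how it could be wired into `client.run_experiment`. The experiment name, weights, toy task, and evaluator names below are illustrative only and not part of this change:

```python
from langfuse import Evaluation, Langfuse

client = Langfuse()


def correctness(*, input, output, expected_output, metadata, **kwargs):
    # Item-level evaluator: exact-match correctness (illustrative).
    return Evaluation(name="correctness", value=1.0 if output == expected_output else 0.0)


def brevity(*, input, output, expected_output, metadata, **kwargs):
    # Item-level evaluator: rewards short answers (illustrative).
    return Evaluation(name="brevity", value=1.0 if output and len(str(output)) < 200 else 0.5)


def weighted_overall(*, input, output, expected_output, metadata, evaluations, **kwargs):
    # Composite evaluator: folds the item-level evaluations into one score.
    weights = {"correctness": 0.7, "brevity": 0.3}
    total = sum(
        e.value * weights.get(e.name, 0)
        for e in evaluations
        if isinstance(e.value, (int, float))
    )
    return Evaluation(name="overall", value=total, comment="Weighted average (illustrative weights)")


result = client.run_experiment(
    name="composite-demo",  # hypothetical experiment name
    data=[{"input": "2+2", "expected_output": "4"}],
    task=lambda *, item, **kwargs: "4",  # toy task for illustration
    evaluators=[correctness, brevity],
    composite_evaluator=weighted_overall,
)
```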
@@ -2630,6 +2642,7 @@ def average_accuracy(*, item_results, **kwargs): data=data, task=task, evaluators=evaluators or [], + composite_evaluator=composite_evaluator, run_evaluators=run_evaluators or [], max_concurrency=max_concurrency, metadata=metadata, @@ -2646,6 +2659,7 @@ async def _run_experiment_async( data: ExperimentData, task: TaskFunction, evaluators: List[EvaluatorFunction], + composite_evaluator: Optional[CompositeEvaluatorFunction], run_evaluators: List[RunEvaluatorFunction], max_concurrency: int, metadata: Optional[Dict[str, Any]] = None, @@ -2661,7 +2675,14 @@ async def _run_experiment_async( async def process_item(item: ExperimentItem) -> ExperimentItemResult: async with semaphore: return await self._process_experiment_item( - item, task, evaluators, name, run_name, description, metadata + item, + task, + evaluators, + composite_evaluator, + name, + run_name, + description, + metadata, ) # Run all items concurrently @@ -2743,6 +2764,7 @@ async def _process_experiment_item( item: ExperimentItem, task: Callable, evaluators: List[Callable], + composite_evaluator: Optional[CompositeEvaluatorFunction], experiment_name: str, experiment_run_name: str, experiment_description: Optional[str], @@ -2901,6 +2923,51 @@ async def _process_experiment_item( except Exception as e: langfuse_logger.error(f"Evaluator failed: {e}") + # Run composite evaluator if provided and we have evaluations + if composite_evaluator and evaluations: + try: + composite_eval_metadata: Optional[Dict[str, Any]] = None + if isinstance(item, dict): + composite_eval_metadata = item.get("metadata") + elif hasattr(item, "metadata"): + composite_eval_metadata = item.metadata + + result = composite_evaluator( + input=input_data, + output=output, + expected_output=expected_output, + metadata=composite_eval_metadata, + evaluations=evaluations, + ) + + # Handle async composite evaluators + if asyncio.iscoroutine(result): + result = await result + + # Normalize to list + composite_evals: List[Evaluation] = [] + if isinstance(result, (dict, Evaluation)): + composite_evals = [result] # type: ignore + elif isinstance(result, list): + composite_evals = result # type: ignore + + # Store composite evaluations as scores and add to evaluations list + for composite_evaluation in composite_evals: + self.create_score( + trace_id=trace_id, + observation_id=span.id, + name=composite_evaluation.name, + value=composite_evaluation.value, # type: ignore + comment=composite_evaluation.comment, + metadata=composite_evaluation.metadata, + config_id=composite_evaluation.config_id, + data_type=composite_evaluation.data_type, # type: ignore + ) + evaluations.append(composite_evaluation) + + except Exception as e: + langfuse_logger.error(f"Composite evaluator failed: {e}") + return ExperimentItemResult( item=item, output=output, @@ -2919,6 +2986,234 @@ def _create_experiment_run_name( return f"{name} - {iso_timestamp}" + def run_batched_evaluation( + self, + *, + scope: Literal["traces", "observations"], + mapper: MapperFunction, + filter: Optional[str] = None, + fetch_batch_size: int = 50, + max_items: Optional[int] = None, + max_retries: int = 3, + evaluators: List[EvaluatorFunction], + composite_evaluator: Optional[CompositeEvaluatorFunction] = None, + max_concurrency: int = 50, + metadata: Optional[Dict[str, Any]] = None, + resume_from: Optional[BatchEvaluationResumeToken] = None, + verbose: bool = False, + ) -> BatchEvaluationResult: + """Fetch traces or observations and run evaluations on each item. 
+
+        This method provides a powerful way to evaluate existing data in Langfuse at scale.
+        It fetches items based on filters, transforms them using a mapper function, runs
+        evaluators on each item, and creates scores that are linked back to the original
+        entities. This is ideal for:
+
+        - Running evaluations on production traces after deployment
+        - Backtesting new evaluation metrics on historical data
+        - Batch scoring of observations for quality monitoring
+        - Periodic evaluation runs on recent data
+
+        The method uses a streaming/pipeline approach to process items in batches, making
+        it memory-efficient for large datasets. It includes comprehensive error handling,
+        retry logic, and resume capability for long-running evaluations.
+
+        Args:
+            scope: The type of items to evaluate. Must be one of:
+                - "traces": Evaluate complete traces with all their observations
+                - "observations": Evaluate individual observations (spans, generations, events)
+            mapper: Function that transforms API response objects into evaluator inputs.
+                Receives a trace/observation object and returns an EvaluatorInputs
+                instance with input, output, expected_output, and metadata fields.
+                Can be sync or async.
+            evaluators: List of evaluation functions to run on each item. Each evaluator
+                receives the mapped inputs and returns Evaluation object(s). Evaluator
+                failures are logged but don't stop the batch evaluation.
+            filter: Optional JSON filter string for querying items (same format as the Langfuse API). Examples:
+                - '{"tags": ["production"]}'
+                - '{"user_id": "user123", "timestamp": {"operator": ">", "value": "2024-01-01"}}'
+                Default: None (fetches all items).
+            fetch_batch_size: Number of items to fetch per API call and hold in memory.
+                Larger values may be faster but use more memory. Default: 50.
+            max_items: Maximum total number of items to process. If None, processes all
+                items matching the filter. Useful for testing or limiting evaluation runs.
+                Default: None (process all).
+            max_concurrency: Maximum number of items to evaluate concurrently. Controls
+                parallelism and resource usage. Default: 50.
+            composite_evaluator: Optional function that creates a composite score from
+                item-level evaluations. Receives the mapped inputs (input, output,
+                expected_output, metadata) plus the list of item-level evaluations and
+                returns one or more Evaluations. Useful for weighted averages or combined metrics.
+                Default: None.
+            metadata: Optional metadata dict to add to all created scores. Useful for
+                tracking evaluation runs, versions, or other context. Default: None.
+            max_retries: Maximum number of retry attempts for failed batch fetches.
+                Uses exponential backoff (1s, 2s, 4s). Default: 3.
+            verbose: If True, logs progress information to console. Useful for monitoring
+                long-running evaluations. Default: False.
+            resume_from: Optional resume token from a previous incomplete run. Allows
+                continuing evaluation after interruption or failure. Default: None.
+
+        Returns:
+            BatchEvaluationResult containing:
+            - total_items_fetched: Number of items fetched from API
+            - total_items_processed: Number of items successfully evaluated
+            - total_items_failed: Number of items that failed evaluation
+            - total_scores_created: Scores created by item-level evaluators
+            - total_composite_scores_created: Scores created by composite evaluator
+            - total_evaluations_failed: Individual evaluator failures
+            - evaluator_stats: Per-evaluator statistics (success rate, scores created)
+            - resume_token: Token for resuming if incomplete (None if completed)
+            - completed: True if all items processed
+            - duration_seconds: Total execution time
+            - failed_item_ids: IDs of items that failed
+            - error_summary: Error types and counts
+            - has_more_items: True if max_items reached but more exist
+
+        Raises:
+            ValueError: If invalid scope is provided.
+
+        Examples:
+            Basic trace evaluation:
+            ```python
+            from langfuse import Langfuse, EvaluatorInputs, Evaluation
+
+            client = Langfuse()
+
+            # Define mapper to extract fields from traces
+            def trace_mapper(*, item, **kwargs):
+                return EvaluatorInputs(
+                    input=item.input,
+                    output=item.output,
+                    expected_output=None,
+                    metadata={"trace_id": item.id}
+                )
+
+            # Define evaluator
+            def length_evaluator(*, input, output, expected_output, metadata):
+                return Evaluation(
+                    name="output_length",
+                    value=len(output) if output else 0
+                )
+
+            # Run batch evaluation
+            result = client.run_batched_evaluation(
+                scope="traces",
+                mapper=trace_mapper,
+                evaluators=[length_evaluator],
+                filter='{"tags": ["production"]}',
+                max_items=1000,
+                verbose=True
+            )
+
+            print(f"Processed {result.total_items_processed} traces")
+            print(f"Created {result.total_scores_created} scores")
+            ```
+
+            Evaluation with composite scorer:
+            ```python
+            def accuracy_evaluator(*, input, output, expected_output, metadata):
+                # ... evaluation logic
+                return Evaluation(name="accuracy", value=0.85)
+
+            def relevance_evaluator(*, input, output, expected_output, metadata):
+                # ... evaluation logic
+                return Evaluation(name="relevance", value=0.92)
+
+            def composite_evaluator(*, input, output, expected_output, metadata, evaluations):
+                # Weighted average of evaluations
+                weights = {"accuracy": 0.6, "relevance": 0.4}
+                total = sum(
+                    e.value * weights.get(e.name, 0)
+                    for e in evaluations
+                    if isinstance(e.value, (int, float))
+                )
+                return Evaluation(
+                    name="composite_score",
+                    value=total,
+                    comment=f"Weighted average of {len(evaluations)} metrics"
+                )
+
+            result = client.run_batched_evaluation(
+                scope="traces",
+                mapper=trace_mapper,
+                evaluators=[accuracy_evaluator, relevance_evaluator],
+                composite_evaluator=composite_evaluator,
+                filter='{"user_id": "important_user"}',
+                verbose=True
+            )
+            ```
+
+            Handling incomplete runs with resume:
+            ```python
+            # Initial run that may fail or timeout
+            result = client.run_batched_evaluation(
+                scope="observations",
+                mapper=obs_mapper,
+                evaluators=[my_evaluator],
+                max_items=10000,
+                verbose=True
+            )
+
+            # Check if incomplete
+            if not result.completed and result.resume_token:
+                print(f"Processed {result.resume_token.items_processed} items before interruption")
+
+                # Resume from where it left off
+                result = client.run_batched_evaluation(
+                    scope="observations",
+                    mapper=obs_mapper,
+                    evaluators=[my_evaluator],
+                    resume_from=result.resume_token,
+                    verbose=True
+                )
+
+            print(f"Total items processed: {result.total_items_processed}")
+            ```
+
+            Monitoring evaluator performance:
+            ```python
+            result = client.run_batched_evaluation(...)
+ + for stats in result.evaluator_stats: + success_rate = stats.successful_runs / stats.total_runs + print(f"{stats.name}:") + print(f" Success rate: {success_rate:.1%}") + print(f" Scores created: {stats.total_scores_created}") + + if stats.failed_runs > 0: + print(f" ⚠️ Failed {stats.failed_runs} times") + ``` + + Note: + - Evaluator failures are logged but don't stop the batch evaluation + - Individual item failures are tracked but don't stop processing + - Fetch failures are retried with exponential backoff + - All scores are automatically flushed to Langfuse at the end + - The resume mechanism uses timestamp-based filtering to avoid duplicates + """ + runner = BatchEvaluationRunner(self) + + return cast( + BatchEvaluationResult, + run_async_safely( + runner.run_async( + scope=scope, + mapper=mapper, + evaluators=evaluators, + filter=filter, + fetch_batch_size=fetch_batch_size, + max_items=max_items, + max_concurrency=max_concurrency, + composite_evaluator=composite_evaluator, + metadata=metadata, + max_retries=max_retries, + verbose=verbose, + resume_from=resume_from, + ) + ), + ) + def auth_check(self) -> bool: """Check if the provided credentials (public and secret key) are valid. diff --git a/langfuse/_client/datasets.py b/langfuse/_client/datasets.py index beb1248ba..0a9a0312c 100644 --- a/langfuse/_client/datasets.py +++ b/langfuse/_client/datasets.py @@ -4,6 +4,7 @@ from opentelemetry.util._decorator import _agnosticcontextmanager +from langfuse.batch_evaluation import CompositeEvaluatorFunction from langfuse.experiment import ( EvaluatorFunction, ExperimentResult, @@ -204,6 +205,7 @@ def run_experiment( description: Optional[str] = None, task: TaskFunction, evaluators: List[EvaluatorFunction] = [], + composite_evaluator: Optional[CompositeEvaluatorFunction] = None, run_evaluators: List[RunEvaluatorFunction] = [], max_concurrency: int = 50, metadata: Optional[Dict[str, Any]] = None, @@ -234,6 +236,10 @@ def run_experiment( .metadata attributes. Signature should be: task(*, item, **kwargs) -> Any evaluators: List of functions to evaluate each item's output individually. These will have access to the item's expected_output for comparison. + composite_evaluator: Optional function that creates composite scores from item-level evaluations. + Receives the same inputs as item-level evaluators (input, output, expected_output, metadata) + plus the list of evaluations from item-level evaluators. Useful for weighted averages, + pass/fail decisions based on multiple criteria, or custom scoring logic combining multiple metrics. run_evaluators: List of functions to evaluate the entire experiment run. Useful for computing aggregate statistics across all dataset items. max_concurrency: Maximum number of concurrent task executions (default: 50). 
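The same idea at the dataset level, as a hedged sketch: assuming a dataset fetched via `client.get_dataset`, a composite evaluator can turn item-level scores into a boolean pass/fail. The dataset name, run name, threshold, and task are placeholders:

```python
from langfuse import Evaluation, Langfuse

client = Langfuse()
dataset = client.get_dataset("qa-eval-set")  # hypothetical dataset name


def exact_match(*, input, output, expected_output, metadata, **kwargs):
    return Evaluation(name="exact_match", value=float(output == expected_output))


def passes_threshold(*, input, output, expected_output, metadata, evaluations, **kwargs):
    # Composite: pass only if every numeric item-level score clears 0.5 (placeholder threshold).
    numeric = [e.value for e in evaluations if isinstance(e.value, (int, float))]
    return Evaluation(
        name="passes",
        value=bool(numeric) and min(numeric) >= 0.5,
        data_type="BOOLEAN",
    )


result = dataset.run_experiment(
    name="composite-demo-run",  # hypothetical run name
    task=lambda *, item, **kwargs: item.input,  # toy task echoing the dataset item's input
    evaluators=[exact_match],
    composite_evaluator=passes_threshold,
)
```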
@@ -411,6 +417,7 @@ def content_diversity(*, item_results, **kwargs): data=self.items, task=task, evaluators=evaluators, + composite_evaluator=composite_evaluator, run_evaluators=run_evaluators, max_concurrency=max_concurrency, metadata=metadata, diff --git a/langfuse/_client/observe.py b/langfuse/_client/observe.py index afd969201..e8786a0e0 100644 --- a/langfuse/_client/observe.py +++ b/langfuse/_client/observe.py @@ -589,7 +589,9 @@ def __next__(self) -> Any: raise # Re-raise StopIteration except Exception as e: - self.span.update(level="ERROR", status_message=str(e) or type(e).__name__).end() + self.span.update( + level="ERROR", status_message=str(e) or type(e).__name__ + ).end() raise @@ -654,6 +656,8 @@ async def __anext__(self) -> Any: raise # Re-raise StopAsyncIteration except Exception as e: - self.span.update(level="ERROR", status_message=str(e) or type(e).__name__).end() + self.span.update( + level="ERROR", status_message=str(e) or type(e).__name__ + ).end() raise diff --git a/langfuse/batch_evaluation.py b/langfuse/batch_evaluation.py new file mode 100644 index 000000000..35e1ea938 --- /dev/null +++ b/langfuse/batch_evaluation.py @@ -0,0 +1,1558 @@ +"""Batch evaluation functionality for Langfuse. + +This module provides comprehensive batch evaluation capabilities for running evaluations +on traces and observations fetched from Langfuse. It includes type definitions, +protocols, result classes, and the implementation for large-scale evaluation workflows +with error handling, retry logic, and resume capability. +""" + +import asyncio +import json +import logging +import time +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Dict, + List, + Optional, + Protocol, + Tuple, + Union, +) + +from langfuse.api.resources.commons.types import ( + ObservationsView, + TraceWithFullDetails, +) +from langfuse.experiment import Evaluation, EvaluatorFunction + +if TYPE_CHECKING: + from langfuse._client.client import Langfuse + +logger = logging.getLogger("langfuse") + + +class EvaluatorInputs: + """Input data structure for evaluators, returned by mapper functions. + + This class provides a strongly-typed container for transforming API response + objects (traces, observations) into the standardized format expected + by evaluator functions. It ensures consistent access to input, output, expected + output, and metadata regardless of the source entity type. + + Attributes: + input: The input data that was provided to generate the output being evaluated. + For traces, this might be the initial prompt or request. For observations, + this could be the span's input. The exact meaning depends on your use case. + output: The actual output that was produced and needs to be evaluated. + For traces, this is typically the final response. For observations, + this might be the generation output or span result. + expected_output: Optional ground truth or expected result for comparison. + Used by evaluators to assess correctness. May be None if no ground truth + is available for the entity being evaluated. + metadata: Optional structured metadata providing additional context for evaluation. + Can include information about the entity, execution context, user attributes, + or any other relevant data that evaluators might use. 
+ + Examples: + Simple mapper for traces: + ```python + from langfuse import EvaluatorInputs + + def trace_mapper(trace): + return EvaluatorInputs( + input=trace.input, + output=trace.output, + expected_output=None, # No ground truth available + metadata={"user_id": trace.user_id, "tags": trace.tags} + ) + ``` + + Mapper for observations extracting specific fields: + ```python + def observation_mapper(observation): + # Extract input/output from observation's data + input_data = observation.input if hasattr(observation, 'input') else None + output_data = observation.output if hasattr(observation, 'output') else None + + return EvaluatorInputs( + input=input_data, + output=output_data, + expected_output=None, + metadata={ + "observation_type": observation.type, + "model": observation.model, + "latency_ms": observation.end_time - observation.start_time + } + ) + ``` + ``` + + Note: + All arguments must be passed as keywords when instantiating this class. + """ + + def __init__( + self, + *, + input: Any, + output: Any, + expected_output: Any = None, + metadata: Optional[Dict[str, Any]] = None, + ): + """Initialize EvaluatorInputs with the provided data. + + Args: + input: The input data for evaluation. + output: The output data to be evaluated. + expected_output: Optional ground truth for comparison. + metadata: Optional additional context for evaluation. + + Note: + All arguments must be provided as keywords. + """ + self.input = input + self.output = output + self.expected_output = expected_output + self.metadata = metadata + + +class MapperFunction(Protocol): + """Protocol defining the interface for mapper functions in batch evaluation. + + Mapper functions transform API response objects (traces or observations) + into the standardized EvaluatorInputs format that evaluators expect. This abstraction + allows you to define how to extract and structure evaluation data from different + entity types. + + Mapper functions must: + - Accept a single item parameter (trace, observation) + - Return an EvaluatorInputs instance with input, output, expected_output, metadata + - Can be either synchronous or asynchronous + - Should handle missing or malformed data gracefully + """ + + def __call__( + self, + *, + item: Union["TraceWithFullDetails", "ObservationsView"], + **kwargs: Dict[str, Any], + ) -> Union[EvaluatorInputs, Awaitable[EvaluatorInputs]]: + """Transform an API response object into evaluator inputs. + + This method defines how to extract evaluation-relevant data from the raw + API response object. The implementation should map entity-specific fields + to the standardized input/output/expected_output/metadata structure. + + Args: + item: The API response object to transform. The type depends on the scope: + - TraceWithFullDetails: When evaluating traces + - ObservationsView: When evaluating observations + + Returns: + EvaluatorInputs: A structured container with: + - input: The input data that generated the output + - output: The output to be evaluated + - expected_output: Optional ground truth for comparison + - metadata: Optional additional context + + Can return either a direct EvaluatorInputs instance or an awaitable + (for async mappers that need to fetch additional data). 
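Worth noting before the examples below: the runner invokes mappers with a keyword argument (`mapper(item=item)` in `_run_mapper` further down in this module), so a mapper that matches this protocol to the letter accepts `item` as a keyword-only parameter. A small sketch:

```python
from langfuse import EvaluatorInputs


def production_trace_mapper(*, item, **kwargs):
    # `item` is the fetched trace (or observation) object.
    return EvaluatorInputs(
        input=item.input,
        output=item.output,
        expected_output=None,
        metadata={"trace_id": item.id},
    )
```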
+ + Examples: + Basic trace mapper: + ```python + def map_trace(trace): + return EvaluatorInputs( + input=trace.input, + output=trace.output, + expected_output=None, + metadata={"trace_id": trace.id, "user": trace.user_id} + ) + ``` + + Observation mapper with conditional logic: + ```python + def map_observation(observation): + # Extract fields based on observation type + if observation.type == "GENERATION": + input_data = observation.input + output_data = observation.output + else: + # For other types, use different fields + input_data = observation.metadata.get("input") + output_data = observation.metadata.get("output") + + return EvaluatorInputs( + input=input_data, + output=output_data, + expected_output=None, + metadata={"obs_id": observation.id, "type": observation.type} + ) + ``` + + Async mapper (if additional processing needed): + ```python + async def map_trace_async(trace): + # Could do async processing here if needed + processed_output = await some_async_transformation(trace.output) + + return EvaluatorInputs( + input=trace.input, + output=processed_output, + expected_output=None, + metadata={"trace_id": trace.id} + ) + ``` + """ + ... + + +class CompositeEvaluatorFunction(Protocol): + """Protocol defining the interface for composite evaluator functions. + + Composite evaluators create aggregate scores from multiple item-level evaluations. + This is commonly used to compute weighted averages, combined metrics, or other + composite assessments based on individual evaluation results. + + Composite evaluators: + - Accept the same inputs as item-level evaluators (input, output, expected_output, metadata) + plus the list of evaluations + - Return either a single Evaluation, a list of Evaluations, or a dict + - Can be either synchronous or asynchronous + - Have access to both raw item data and evaluation results + """ + + def __call__( + self, + *, + input: Optional[Any] = None, + output: Optional[Any] = None, + expected_output: Optional[Any] = None, + metadata: Optional[Dict[str, Any]] = None, + evaluations: List[Evaluation], + **kwargs: Dict[str, Any], + ) -> Union[ + Evaluation, + List[Evaluation], + Dict[str, Any], + Awaitable[Evaluation], + Awaitable[List[Evaluation]], + Awaitable[Dict[str, Any]], + ]: + r"""Create a composite evaluation from item-level evaluation results. + + This method combines multiple evaluation scores into a single composite metric. + Common use cases include weighted averages, pass/fail decisions based on multiple + criteria, or custom scoring logic that considers multiple dimensions. + + Args: + input: The input data that was provided to the system being evaluated. + output: The output generated by the system being evaluated. + expected_output: The expected/reference output for comparison (if available). + metadata: Additional metadata about the evaluation context. + evaluations: List of evaluation results from item-level evaluators. + Each evaluation contains name, value, comment, and metadata. + + Returns: + Can return any of: + - Evaluation: A single composite evaluation result + - List[Evaluation]: Multiple composite evaluations + - Dict: A dict that will be converted to an Evaluation + - name: Identifier for the composite metric (e.g., "composite_score") + - value: The computed composite value + - comment: Optional explanation of how the score was computed + - metadata: Optional details about the composition logic + + Can return either a direct Evaluation instance or an awaitable + (for async composite evaluators). 
+ + Examples: + Simple weighted average: + ```python + def weighted_composite(*, input, output, expected_output, metadata, evaluations): + weights = { + "accuracy": 0.5, + "relevance": 0.3, + "safety": 0.2 + } + + total_score = 0.0 + total_weight = 0.0 + + for eval in evaluations: + if eval.name in weights and isinstance(eval.value, (int, float)): + total_score += eval.value * weights[eval.name] + total_weight += weights[eval.name] + + final_score = total_score / total_weight if total_weight > 0 else 0.0 + + return Evaluation( + name="composite_score", + value=final_score, + comment=f"Weighted average of {len(evaluations)} metrics" + ) + ``` + + Pass/fail composite based on thresholds: + ```python + def pass_fail_composite(*, input, output, expected_output, metadata, evaluations): + # Must pass all criteria + thresholds = { + "accuracy": 0.7, + "safety": 0.9, + "relevance": 0.6 + } + + passes = True + failing_metrics = [] + + for metric, threshold in thresholds.items(): + eval_result = next((e for e in evaluations if e.name == metric), None) + if eval_result and isinstance(eval_result.value, (int, float)): + if eval_result.value < threshold: + passes = False + failing_metrics.append(metric) + + return Evaluation( + name="passes_all_checks", + value=passes, + comment=f"Failed: {', '.join(failing_metrics)}" if failing_metrics else "All checks passed", + data_type="BOOLEAN" + ) + ``` + + Async composite with external scoring: + ```python + async def llm_composite(*, input, output, expected_output, metadata, evaluations): + # Use LLM to synthesize multiple evaluation results + eval_summary = "\n".join( + f"- {e.name}: {e.value}" for e in evaluations + ) + + prompt = f"Given these evaluation scores:\n{eval_summary}\n" + prompt += f"For the output: {output}\n" + prompt += "Provide an overall quality score from 0-1." + + response = await openai.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": prompt}] + ) + + score = float(response.choices[0].message.content.strip()) + + return Evaluation( + name="llm_composite_score", + value=score, + comment="LLM-synthesized composite score" + ) + ``` + + Context-aware composite: + ```python + def context_composite(*, input, output, expected_output, metadata, evaluations): + # Adjust weighting based on metadata + base_weights = {"accuracy": 0.5, "speed": 0.3, "cost": 0.2} + + # If metadata indicates high importance, prioritize accuracy + if metadata and metadata.get('importance') == 'high': + weights = {"accuracy": 0.7, "speed": 0.2, "cost": 0.1} + else: + weights = base_weights + + total = sum( + e.value * weights.get(e.name, 0) + for e in evaluations + if isinstance(e.value, (int, float)) + ) + + return Evaluation( + name="weighted_composite", + value=total, + comment="Context-aware weighted composite" + ) + ``` + """ + ... + + +class EvaluatorStats: + """Statistics for a single evaluator's performance during batch evaluation. + + This class tracks detailed metrics about how a specific evaluator performed + across all items in a batch evaluation run. It helps identify evaluator issues, + understand reliability, and optimize evaluation pipelines. + + Attributes: + name: The name of the evaluator function (extracted from __name__). + total_runs: Total number of times the evaluator was invoked. + successful_runs: Number of times the evaluator completed successfully. + failed_runs: Number of times the evaluator raised an exception or failed. + total_scores_created: Total number of evaluation scores created by this evaluator. 
+ Can be higher than successful_runs if the evaluator returns multiple scores. + + Examples: + Accessing evaluator stats from batch evaluation result: + ```python + result = client.run_batched_evaluation(...) + + for stats in result.evaluator_stats: + print(f"Evaluator: {stats.name}") + print(f" Success rate: {stats.successful_runs / stats.total_runs:.1%}") + print(f" Scores created: {stats.total_scores_created}") + + if stats.failed_runs > 0: + print(f" ⚠️ Failed {stats.failed_runs} times") + ``` + + Identifying problematic evaluators: + ```python + result = client.run_batched_evaluation(...) + + # Find evaluators with high failure rates + for stats in result.evaluator_stats: + failure_rate = stats.failed_runs / stats.total_runs + if failure_rate > 0.1: # More than 10% failures + print(f"⚠️ {stats.name} has {failure_rate:.1%} failure rate") + print(f" Consider debugging or removing this evaluator") + ``` + + Note: + All arguments must be passed as keywords when instantiating this class. + """ + + def __init__( + self, + *, + name: str, + total_runs: int = 0, + successful_runs: int = 0, + failed_runs: int = 0, + total_scores_created: int = 0, + ): + """Initialize EvaluatorStats with the provided metrics. + + Args: + name: The evaluator function name. + total_runs: Total number of evaluator invocations. + successful_runs: Number of successful completions. + failed_runs: Number of failures. + total_scores_created: Total scores created by this evaluator. + + Note: + All arguments must be provided as keywords. + """ + self.name = name + self.total_runs = total_runs + self.successful_runs = successful_runs + self.failed_runs = failed_runs + self.total_scores_created = total_scores_created + + +class BatchEvaluationResumeToken: + """Token for resuming a failed batch evaluation run. + + This class encapsulates all the information needed to resume a batch evaluation + that was interrupted or failed partway through. It uses timestamp-based filtering + to avoid re-processing items that were already evaluated, even if the underlying + dataset changed between runs. + + Attributes: + scope: The type of items being evaluated ("traces", "observations"). + filter: The original JSON filter string used to query items. + last_processed_timestamp: ISO 8601 timestamp of the last successfully processed item. + Used to construct a filter that only fetches items after this timestamp. + last_processed_id: The ID of the last successfully processed item, for reference. + items_processed: Count of items successfully processed before interruption. 
+ + Examples: + Resuming a failed batch evaluation: + ```python + # Initial run that fails partway through + try: + result = client.run_batched_evaluation( + scope="traces", + mapper=my_mapper, + evaluators=[evaluator1, evaluator2], + filter='{"tags": ["production"]}', + max_items=10000 + ) + except Exception as e: + print(f"Evaluation failed: {e}") + + # Save the resume token + if result.resume_token: + # Store resume token for later (e.g., in a file or database) + import json + with open("resume_token.json", "w") as f: + json.dump({ + "scope": result.resume_token.scope, + "filter": result.resume_token.filter, + "last_timestamp": result.resume_token.last_processed_timestamp, + "last_id": result.resume_token.last_processed_id, + "items_done": result.resume_token.items_processed + }, f) + + # Later, resume from where it left off + with open("resume_token.json") as f: + token_data = json.load(f) + + resume_token = BatchEvaluationResumeToken( + scope=token_data["scope"], + filter=token_data["filter"], + last_processed_timestamp=token_data["last_timestamp"], + last_processed_id=token_data["last_id"], + items_processed=token_data["items_done"] + ) + + # Resume the evaluation + result = client.run_batched_evaluation( + scope="traces", + mapper=my_mapper, + evaluators=[evaluator1, evaluator2], + resume_from=resume_token + ) + + print(f"Processed {result.total_items_processed} additional items") + ``` + + Handling partial completion: + ```python + result = client.run_batched_evaluation(...) + + if not result.completed: + print(f"Evaluation incomplete. Processed {result.resume_token.items_processed} items") + print(f"Last item: {result.resume_token.last_processed_id}") + print(f"Resume from: {result.resume_token.last_processed_timestamp}") + + # Optionally retry automatically + if result.resume_token: + print("Retrying...") + result = client.run_batched_evaluation( + scope=result.resume_token.scope, + mapper=my_mapper, + evaluators=my_evaluators, + resume_from=result.resume_token + ) + ``` + + Note: + All arguments must be passed as keywords when instantiating this class. + The timestamp-based approach means that items created after the initial run + but before the timestamp will be skipped. This is intentional to avoid + duplicates and ensure consistent evaluation. + """ + + def __init__( + self, + *, + scope: str, + filter: Optional[str], + last_processed_timestamp: str, + last_processed_id: str, + items_processed: int, + ): + """Initialize BatchEvaluationResumeToken with the provided state. + + Args: + scope: The scope type ("traces", "observations"). + filter: The original JSON filter string. + last_processed_timestamp: ISO 8601 timestamp of last processed item. + last_processed_id: ID of last processed item. + items_processed: Count of items processed before interruption. + + Note: + All arguments must be provided as keywords. + """ + self.scope = scope + self.filter = filter + self.last_processed_timestamp = last_processed_timestamp + self.last_processed_id = last_processed_id + self.items_processed = items_processed + + +class BatchEvaluationResult: + r"""Complete result structure for batch evaluation execution. + + This class encapsulates comprehensive statistics and metadata about a batch + evaluation run, including counts, evaluator-specific metrics, timing information, + error details, and resume capability. + + Attributes: + total_items_fetched: Total number of items fetched from the API. + total_items_processed: Number of items successfully evaluated. 
+ total_items_failed: Number of items that failed during evaluation. + total_scores_created: Total scores created by all item-level evaluators. + total_composite_scores_created: Scores created by the composite evaluator. + total_evaluations_failed: Number of individual evaluator failures across all items. + evaluator_stats: List of per-evaluator statistics (success/failure rates, scores created). + resume_token: Token for resuming if evaluation was interrupted (None if completed). + completed: True if all items were processed, False if stopped early or failed. + duration_seconds: Total time taken to execute the batch evaluation. + failed_item_ids: List of IDs for items that failed evaluation. + error_summary: Dictionary mapping error types to occurrence counts. + has_more_items: True if max_items limit was reached but more items exist. + item_evaluations: Dictionary mapping item IDs to their evaluation results (both regular and composite). + + Examples: + Basic result inspection: + ```python + result = client.run_batched_evaluation(...) + + print(f"Processed: {result.total_items_processed}/{result.total_items_fetched}") + print(f"Scores created: {result.total_scores_created}") + print(f"Duration: {result.duration_seconds:.2f}s") + print(f"Success rate: {result.total_items_processed / result.total_items_fetched:.1%}") + ``` + + Detailed analysis with evaluator stats: + ```python + result = client.run_batched_evaluation(...) + + print(f"\n📊 Batch Evaluation Results") + print(f"{'='*50}") + print(f"Items processed: {result.total_items_processed}") + print(f"Items failed: {result.total_items_failed}") + print(f"Scores created: {result.total_scores_created}") + + if result.total_composite_scores_created > 0: + print(f"Composite scores: {result.total_composite_scores_created}") + + print(f"\n📈 Evaluator Performance:") + for stats in result.evaluator_stats: + success_rate = stats.successful_runs / stats.total_runs if stats.total_runs > 0 else 0 + print(f"\n {stats.name}:") + print(f" Success rate: {success_rate:.1%}") + print(f" Scores created: {stats.total_scores_created}") + if stats.failed_runs > 0: + print(f" ⚠️ Failures: {stats.failed_runs}") + + if result.error_summary: + print(f"\n⚠️ Errors encountered:") + for error_type, count in result.error_summary.items(): + print(f" {error_type}: {count}") + ``` + + Handling incomplete runs: + ```python + result = client.run_batched_evaluation(...) + + if not result.completed: + print("⚠️ Evaluation incomplete!") + + if result.resume_token: + print(f"Processed {result.resume_token.items_processed} items before failure") + print(f"Use resume_from parameter to continue from:") + print(f" Timestamp: {result.resume_token.last_processed_timestamp}") + print(f" Last ID: {result.resume_token.last_processed_id}") + + if result.has_more_items: + print(f"ℹ️ More items available beyond max_items limit") + ``` + + Performance monitoring: + ```python + result = client.run_batched_evaluation(...) 
+ + items_per_second = result.total_items_processed / result.duration_seconds + avg_scores_per_item = result.total_scores_created / result.total_items_processed + + print(f"Performance metrics:") + print(f" Throughput: {items_per_second:.2f} items/second") + print(f" Avg scores/item: {avg_scores_per_item:.2f}") + print(f" Total duration: {result.duration_seconds:.2f}s") + + if result.total_evaluations_failed > 0: + failure_rate = result.total_evaluations_failed / ( + result.total_items_processed * len(result.evaluator_stats) + ) + print(f" Evaluation failure rate: {failure_rate:.1%}") + ``` + + Note: + All arguments must be passed as keywords when instantiating this class. + """ + + def __init__( + self, + *, + total_items_fetched: int, + total_items_processed: int, + total_items_failed: int, + total_scores_created: int, + total_composite_scores_created: int, + total_evaluations_failed: int, + evaluator_stats: List[EvaluatorStats], + resume_token: Optional[BatchEvaluationResumeToken], + completed: bool, + duration_seconds: float, + failed_item_ids: List[str], + error_summary: Dict[str, int], + has_more_items: bool, + item_evaluations: Dict[str, List["Evaluation"]], + ): + """Initialize BatchEvaluationResult with comprehensive statistics. + + Args: + total_items_fetched: Total items fetched from API. + total_items_processed: Items successfully evaluated. + total_items_failed: Items that failed evaluation. + total_scores_created: Scores from item-level evaluators. + total_composite_scores_created: Scores from composite evaluator. + total_evaluations_failed: Individual evaluator failures. + evaluator_stats: Per-evaluator statistics. + resume_token: Token for resuming (None if completed). + completed: Whether all items were processed. + duration_seconds: Total execution time. + failed_item_ids: IDs of failed items. + error_summary: Error types and counts. + has_more_items: Whether more items exist beyond max_items. + item_evaluations: Dictionary mapping item IDs to their evaluation results. + + Note: + All arguments must be provided as keywords. + """ + self.total_items_fetched = total_items_fetched + self.total_items_processed = total_items_processed + self.total_items_failed = total_items_failed + self.total_scores_created = total_scores_created + self.total_composite_scores_created = total_composite_scores_created + self.total_evaluations_failed = total_evaluations_failed + self.evaluator_stats = evaluator_stats + self.resume_token = resume_token + self.completed = completed + self.duration_seconds = duration_seconds + self.failed_item_ids = failed_item_ids + self.error_summary = error_summary + self.has_more_items = has_more_items + self.item_evaluations = item_evaluations + + def __str__(self) -> str: + """Return a formatted string representation of the batch evaluation results. + + Returns: + A multi-line string with a summary of the evaluation results. 
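Because `__str__` assembles a readable summary, the result object can be printed or logged directly. A tiny sketch, reusing the hypothetical mapper and evaluator names from earlier sketches:

```python
result = client.run_batched_evaluation(
    scope="traces",
    mapper=production_trace_mapper,  # hypothetical mapper from the sketch above
    evaluators=[exact_match],        # any item-level evaluators
)
print(result)  # renders the multi-line summary assembled below
```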
+ """ + lines = [] + lines.append("=" * 60) + lines.append("Batch Evaluation Results") + lines.append("=" * 60) + + # Summary statistics + lines.append(f"\nStatus: {'Completed' if self.completed else 'Incomplete'}") + lines.append(f"Duration: {self.duration_seconds:.2f}s") + lines.append(f"\nItems fetched: {self.total_items_fetched}") + lines.append(f"Items processed: {self.total_items_processed}") + + if self.total_items_failed > 0: + lines.append(f"Items failed: {self.total_items_failed}") + + # Success rate + if self.total_items_fetched > 0: + success_rate = self.total_items_processed / self.total_items_fetched * 100 + lines.append(f"Success rate: {success_rate:.1f}%") + + # Scores created + lines.append(f"\nScores created: {self.total_scores_created}") + if self.total_composite_scores_created > 0: + lines.append(f"Composite scores: {self.total_composite_scores_created}") + + total_scores = self.total_scores_created + self.total_composite_scores_created + lines.append(f"Total scores: {total_scores}") + + # Evaluator statistics + if self.evaluator_stats: + lines.append("\nEvaluator Performance:") + for stats in self.evaluator_stats: + lines.append(f" {stats.name}:") + if stats.total_runs > 0: + success_rate = ( + stats.successful_runs / stats.total_runs * 100 + if stats.total_runs > 0 + else 0 + ) + lines.append( + f" Runs: {stats.successful_runs}/{stats.total_runs} " + f"({success_rate:.1f}% success)" + ) + lines.append(f" Scores created: {stats.total_scores_created}") + if stats.failed_runs > 0: + lines.append(f" Failed runs: {stats.failed_runs}") + + # Performance metrics + if self.total_items_processed > 0 and self.duration_seconds > 0: + items_per_sec = self.total_items_processed / self.duration_seconds + lines.append("\nPerformance:") + lines.append(f" Throughput: {items_per_sec:.2f} items/second") + if self.total_scores_created > 0: + avg_scores = self.total_scores_created / self.total_items_processed + lines.append(f" Avg scores per item: {avg_scores:.2f}") + + # Errors and warnings + if self.error_summary: + lines.append("\nErrors encountered:") + for error_type, count in self.error_summary.items(): + lines.append(f" {error_type}: {count}") + + # Incomplete run information + if not self.completed: + lines.append("\nWarning: Evaluation incomplete") + if self.resume_token: + lines.append( + f" Last processed: {self.resume_token.last_processed_timestamp}" + ) + lines.append(f" Items processed: {self.resume_token.items_processed}") + lines.append(" Use resume_from parameter to continue") + + if self.has_more_items: + lines.append("\nNote: More items available beyond max_items limit") + + lines.append("=" * 60) + return "\n".join(lines) + + +class BatchEvaluationRunner: + """Handles batch evaluation execution for a Langfuse client. + + This class encapsulates all the logic for fetching items, running evaluators, + creating scores, and managing the evaluation lifecycle. It provides a clean + separation of concerns from the main Langfuse client class. + + The runner uses a streaming/pipeline approach to process items in batches, + avoiding loading the entire dataset into memory. This makes it suitable for + evaluating large numbers of items. + + Attributes: + client: The Langfuse client instance used for API calls and score creation. + _log: Logger instance for this runner. + """ + + def __init__(self, client: "Langfuse"): + """Initialize the batch evaluation runner. + + Args: + client: The Langfuse client instance. 
+ """ + self.client = client + self._log = logger + + async def run_async( + self, + *, + scope: str, + mapper: MapperFunction, + evaluators: List[EvaluatorFunction], + filter: Optional[str] = None, + fetch_batch_size: int = 50, + max_items: Optional[int] = None, + max_concurrency: int = 50, + composite_evaluator: Optional[CompositeEvaluatorFunction] = None, + metadata: Optional[Dict[str, Any]] = None, + max_retries: int = 3, + verbose: bool = False, + resume_from: Optional[BatchEvaluationResumeToken] = None, + ) -> BatchEvaluationResult: + """Run batch evaluation asynchronously. + + This is the main implementation method that orchestrates the entire batch + evaluation process: fetching items, mapping, evaluating, creating scores, + and tracking statistics. + + Args: + scope: The type of items to evaluate ("traces", "observations"). + mapper: Function to transform API response items to evaluator inputs. + evaluators: List of evaluation functions to run on each item. + filter: JSON filter string for querying items. + fetch_batch_size: Number of items to fetch per API call. + max_items: Maximum number of items to process (None = all). + max_concurrency: Maximum number of concurrent evaluations. + composite_evaluator: Optional function to create composite scores. + metadata: Metadata to add to all created scores. + max_retries: Maximum retries for failed batch fetches. + verbose: If True, log progress to console. + resume_from: Resume token from a previous failed run. + + Returns: + BatchEvaluationResult with comprehensive statistics. + """ + start_time = time.time() + + # Initialize tracking variables + total_items_fetched = 0 + total_items_processed = 0 + total_items_failed = 0 + total_scores_created = 0 + total_composite_scores_created = 0 + total_evaluations_failed = 0 + failed_item_ids: List[str] = [] + error_summary: Dict[str, int] = {} + item_evaluations: Dict[str, List[Evaluation]] = {} + + # Initialize evaluator stats + evaluator_stats_dict = { + getattr(evaluator, "__name__", "unknown_evaluator"): EvaluatorStats( + name=getattr(evaluator, "__name__", "unknown_evaluator") + ) + for evaluator in evaluators + } + + # Handle resume token by modifying filter + effective_filter = self._build_timestamp_filter(filter, resume_from) + + # Create semaphore for concurrency control + semaphore = asyncio.Semaphore(max_concurrency) + + # Pagination state + page = 1 + has_more = True + last_item_timestamp: Optional[str] = None + last_item_id: Optional[str] = None + + if verbose: + self._log.info(f"Starting batch evaluation on {scope}") + if resume_from: + self._log.info( + f"Resuming from {resume_from.last_processed_timestamp} " + f"({resume_from.items_processed} items already processed)" + ) + + # Main pagination loop + while has_more: + # Check if we've reached max_items + if max_items is not None and total_items_fetched >= max_items: + if verbose: + self._log.info(f"Reached max_items limit ({max_items})") + has_more = True # More items may exist + break + + # Fetch next batch with retry logic + try: + items = await self._fetch_batch_with_retry( + scope=scope, + filter=effective_filter, + page=page, + limit=fetch_batch_size, + max_retries=max_retries, + ) + except Exception as e: + # Failed after max_retries - create resume token and return + error_msg = f"Failed to fetch batch after {max_retries} retries" + self._log.error(f"{error_msg}: {e}") + + resume_token = BatchEvaluationResumeToken( + scope=scope, + filter=filter, # Original filter, not modified + 
last_processed_timestamp=last_item_timestamp or "", + last_processed_id=last_item_id or "", + items_processed=total_items_processed, + ) + + return self._build_result( + total_items_fetched=total_items_fetched, + total_items_processed=total_items_processed, + total_items_failed=total_items_failed, + total_scores_created=total_scores_created, + total_composite_scores_created=total_composite_scores_created, + total_evaluations_failed=total_evaluations_failed, + evaluator_stats_dict=evaluator_stats_dict, + resume_token=resume_token, + completed=False, + start_time=start_time, + failed_item_ids=failed_item_ids, + error_summary=error_summary, + has_more_items=has_more, + item_evaluations=item_evaluations, + ) + + # Check if we got any items + if not items: + has_more = False + if verbose: + self._log.info("No more items to fetch") + break + + total_items_fetched += len(items) + + if verbose: + self._log.info(f"Fetched batch {page} ({len(items)} items)") + + # Limit items if max_items would be exceeded + items_to_process = items + if max_items is not None: + remaining_capacity = max_items - total_items_processed + if len(items) > remaining_capacity: + items_to_process = items[:remaining_capacity] + if verbose: + self._log.info( + f"Limiting batch to {len(items_to_process)} items " + f"to respect max_items={max_items}" + ) + + # Process items concurrently + async def process_item( + item: Union[TraceWithFullDetails, ObservationsView], + ) -> Tuple[str, Union[Tuple[int, int, int, List[Evaluation]], Exception]]: + """Process a single item and return (item_id, result).""" + async with semaphore: + item_id = self._get_item_id(item, scope) + try: + result = await self._process_batch_evaluation_item( + item=item, + scope=scope, + mapper=mapper, + evaluators=evaluators, + composite_evaluator=composite_evaluator, + metadata=metadata, + evaluator_stats_dict=evaluator_stats_dict, + ) + return (item_id, result) + except Exception as e: + return (item_id, e) + + # Run all items in batch concurrently + tasks = [process_item(item) for item in items_to_process] + results = await asyncio.gather(*tasks) + + # Process results and update statistics + for item, (item_id, result) in zip(items_to_process, results): + if isinstance(result, Exception): + # Item processing failed + total_items_failed += 1 + failed_item_ids.append(item_id) + error_type = type(result).__name__ + error_summary[error_type] = error_summary.get(error_type, 0) + 1 + self._log.warning(f"Item {item_id} failed: {result}") + else: + # Item processed successfully + total_items_processed += 1 + scores_created, composite_created, evals_failed, evaluations = ( + result + ) + total_scores_created += scores_created + total_composite_scores_created += composite_created + total_evaluations_failed += evals_failed + + # Store evaluations for this item + item_evaluations[item_id] = evaluations + + # Update last processed tracking + last_item_timestamp = self._get_item_timestamp(item, scope) + last_item_id = item_id + + if verbose: + if max_items is not None and max_items > 0: + progress_pct = total_items_processed / max_items * 100 + self._log.info( + f"Progress: {total_items_processed}/{max_items} items " + f"({progress_pct:.1f}%), {total_scores_created} scores created" + ) + else: + self._log.info( + f"Progress: {total_items_processed} items processed, " + f"{total_scores_created} scores created" + ) + + # Check if we should continue to next page + if len(items) < fetch_batch_size: + # Last page - no more items available + has_more = False + else: + page += 
1 + + # Check max_items again before next fetch + if max_items is not None and total_items_fetched >= max_items: + has_more = True # More items exist but we're stopping + break + + # Flush all scores to Langfuse + if verbose: + self._log.info("Flushing scores to Langfuse...") + self.client.flush() + + # Build final result + duration = time.time() - start_time + + if verbose: + self._log.info( + f"Batch evaluation complete: {total_items_processed} items processed " + f"in {duration:.2f}s" + ) + + # Completed successfully if we either: + # 1. Ran out of items (has_more is False), OR + # 2. Hit max_items limit (intentionally stopped) + completed_successfully = not has_more or ( + max_items is not None and total_items_fetched >= max_items + ) + + return self._build_result( + total_items_fetched=total_items_fetched, + total_items_processed=total_items_processed, + total_items_failed=total_items_failed, + total_scores_created=total_scores_created, + total_composite_scores_created=total_composite_scores_created, + total_evaluations_failed=total_evaluations_failed, + evaluator_stats_dict=evaluator_stats_dict, + resume_token=None, # No resume needed on successful completion + completed=completed_successfully, + start_time=start_time, + failed_item_ids=failed_item_ids, + error_summary=error_summary, + has_more_items=( + has_more and max_items is not None and total_items_fetched >= max_items + ), + item_evaluations=item_evaluations, + ) + + async def _fetch_batch_with_retry( + self, + *, + scope: str, + filter: Optional[str], + page: int, + limit: int, + max_retries: int, + ) -> List[Union[TraceWithFullDetails, ObservationsView]]: + """Fetch a batch of items with retry logic. + + Args: + scope: The type of items ("traces", "observations"). + filter: JSON filter string for querying. + page: Page number (1-indexed). + limit: Number of items per page. + max_retries: Maximum number of retry attempts. + verbose: Whether to log retry attempts. + + Returns: + List of items from the API. + + Raises: + Exception: If all retry attempts fail. + """ + if scope == "traces": + response = self.client.api.trace.list( + page=page, + limit=limit, + filter=filter, + request_options={"max_retries": max_retries}, + ) # type: ignore + return list(response.data) # type: ignore + elif scope == "observations": + response = self.client.api.observations.get_many( + page=page, + limit=limit, + filter=filter, + request_options={"max_retries": max_retries}, + ) # type: ignore + return list(response.data) # type: ignore + else: + error_message = f"Invalid scope: {scope}" + raise ValueError(error_message) + + async def _process_batch_evaluation_item( + self, + item: Union[TraceWithFullDetails, ObservationsView], + scope: str, + mapper: MapperFunction, + evaluators: List[EvaluatorFunction], + composite_evaluator: Optional[CompositeEvaluatorFunction], + metadata: Optional[Dict[str, Any]], + evaluator_stats_dict: Dict[str, EvaluatorStats], + ) -> Tuple[int, int, int, List[Evaluation]]: + """Process a single item: map, evaluate, create scores. + + Args: + item: The API response object to evaluate. + scope: The type of item ("traces", "observations"). + mapper: Function to transform item to evaluator inputs. + evaluators: List of evaluator functions. + composite_evaluator: Optional composite evaluator function. + metadata: Additional metadata to add to scores. + evaluator_stats_dict: Dictionary tracking evaluator statistics. + + Returns: + Tuple of (scores_created, composite_scores_created, evaluations_failed, all_evaluations). 
+ + Raises: + Exception: If mapping fails or item processing encounters fatal error. + """ + scores_created = 0 + composite_scores_created = 0 + evaluations_failed = 0 + + # Run mapper to transform item + evaluator_inputs = await self._run_mapper(mapper, item) + + # Run all evaluators + evaluations: List[Evaluation] = [] + for evaluator in evaluators: + evaluator_name = getattr(evaluator, "__name__", "unknown_evaluator") + stats = evaluator_stats_dict[evaluator_name] + stats.total_runs += 1 + + try: + eval_results = await self._run_evaluator_internal( + evaluator, + input=evaluator_inputs.input, + output=evaluator_inputs.output, + expected_output=evaluator_inputs.expected_output, + metadata=evaluator_inputs.metadata, + ) + + stats.successful_runs += 1 + stats.total_scores_created += len(eval_results) + evaluations.extend(eval_results) + + except Exception as e: + # Evaluator failed - log warning and continue with other evaluators + stats.failed_runs += 1 + evaluations_failed += 1 + self._log.warning( + f"Evaluator {evaluator_name} failed on item " + f"{self._get_item_id(item, scope)}: {e}" + ) + + # Create scores for item-level evaluations + item_id = self._get_item_id(item, scope) + for evaluation in evaluations: + self._create_score_for_scope( + scope=scope, + item_id=item_id, + evaluation=evaluation, + additional_metadata=metadata, + ) + scores_created += 1 + + # Run composite evaluator if provided and we have evaluations + if composite_evaluator and evaluations: + try: + composite_evals = await self._run_composite_evaluator( + composite_evaluator, + input=evaluator_inputs.input, + output=evaluator_inputs.output, + expected_output=evaluator_inputs.expected_output, + metadata=evaluator_inputs.metadata, + evaluations=evaluations, + ) + + # Create scores for all composite evaluations + for composite_eval in composite_evals: + self._create_score_for_scope( + scope=scope, + item_id=item_id, + evaluation=composite_eval, + additional_metadata=metadata, + ) + composite_scores_created += 1 + + # Add composite evaluations to the list + evaluations.extend(composite_evals) + + except Exception as e: + self._log.warning(f"Composite evaluator failed on item {item_id}: {e}") + + return ( + scores_created, + composite_scores_created, + evaluations_failed, + evaluations, + ) + + async def _run_evaluator_internal( + self, + evaluator: EvaluatorFunction, + **kwargs: Any, + ) -> List[Evaluation]: + """Run an evaluator function and normalize the result. + + Unlike experiment._run_evaluator, this version raises exceptions + so we can track failures in our statistics. + + Args: + evaluator: The evaluator function to run. + **kwargs: Arguments to pass to the evaluator. + + Returns: + List of Evaluation objects. + + Raises: + Exception: If evaluator raises an exception (not caught). + """ + result = evaluator(**kwargs) + + # Handle async evaluators + if asyncio.iscoroutine(result): + result = await result + + # Normalize to list + if isinstance(result, (dict, Evaluation)): + return [result] # type: ignore + elif isinstance(result, list): + return result + else: + return [] + + async def _run_mapper( + self, + mapper: MapperFunction, + item: Union[TraceWithFullDetails, ObservationsView], + ) -> EvaluatorInputs: + """Run mapper function (handles both sync and async mappers). + + Args: + mapper: The mapper function to run. + item: The API response object to map. + + Returns: + EvaluatorInputs instance. + + Raises: + Exception: If mapper raises an exception. 
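Because coroutine results are awaited in the body just below, async mappers are supported transparently. A sketch of one that enriches the inputs with an extra lookup; `fetch_ground_truth` is a hypothetical helper, not part of this module:

```python
from typing import Optional

from langfuse import EvaluatorInputs


async def fetch_ground_truth(trace_id: str) -> Optional[str]:
    # Hypothetical async lookup, e.g. against an internal answer store.
    ...


async def async_trace_mapper(*, item, **kwargs):
    expected = await fetch_ground_truth(item.id)
    return EvaluatorInputs(
        input=item.input,
        output=item.output,
        expected_output=expected,
        metadata={"trace_id": item.id},
    )
```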
+ """ + result = mapper(item=item) + if asyncio.iscoroutine(result): + return await result # type: ignore + return result # type: ignore + + async def _run_composite_evaluator( + self, + composite_evaluator: CompositeEvaluatorFunction, + input: Optional[Any], + output: Optional[Any], + expected_output: Optional[Any], + metadata: Optional[Dict[str, Any]], + evaluations: List[Evaluation], + ) -> List[Evaluation]: + """Run composite evaluator function (handles both sync and async). + + Args: + composite_evaluator: The composite evaluator function. + input: The input data provided to the system. + output: The output generated by the system. + expected_output: The expected/reference output. + metadata: Additional metadata about the evaluation context. + evaluations: List of item-level evaluations. + + Returns: + List of Evaluation objects (normalized from single or list return). + + Raises: + Exception: If composite evaluator raises an exception. + """ + result = composite_evaluator( + input=input, + output=output, + expected_output=expected_output, + metadata=metadata, + evaluations=evaluations, + ) + if asyncio.iscoroutine(result): + result = await result + + # Normalize to list (same as regular evaluator) + if isinstance(result, (dict, Evaluation)): + return [result] # type: ignore + elif isinstance(result, list): + return result + else: + return [] + + def _create_score_for_scope( + self, + scope: str, + item_id: str, + evaluation: Evaluation, + additional_metadata: Optional[Dict[str, Any]], + ) -> None: + """Create a score linked to the appropriate entity based on scope. + + Args: + scope: The type of entity ("traces", "observations"). + item_id: The ID of the entity. + evaluation: The evaluation result to create a score from. + additional_metadata: Additional metadata to merge with evaluation metadata. + """ + # Merge metadata + score_metadata = { + **(evaluation.metadata or {}), + **(additional_metadata or {}), + } + + if scope == "traces": + self.client.create_score( + trace_id=item_id, + name=evaluation.name, + value=evaluation.value, # type: ignore + comment=evaluation.comment, + metadata=score_metadata, + data_type=evaluation.data_type, # type: ignore[arg-type] + config_id=evaluation.config_id, + ) + elif scope == "observations": + self.client.create_score( + observation_id=item_id, + name=evaluation.name, + value=evaluation.value, # type: ignore + comment=evaluation.comment, + metadata=score_metadata, + data_type=evaluation.data_type, # type: ignore[arg-type] + config_id=evaluation.config_id, + ) + + def _build_timestamp_filter( + self, + original_filter: Optional[str], + resume_from: Optional[BatchEvaluationResumeToken], + ) -> Optional[str]: + """Build filter with timestamp constraint for resume capability. + + Args: + original_filter: The original JSON filter string. + resume_from: Optional resume token with timestamp information. + + Returns: + Modified filter string with timestamp constraint, or original filter. 
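+
+        Example:
+            Illustrative only; the values mirror the constraint built in the
+            method body. A resume token for the "traces" scope with
+            last_processed_timestamp "2024-01-01T00:00:00Z" and no original
+            filter produces:
+
+            [{"type": "datetime", "column": "timestamp",
+              "operator": ">", "value": "2024-01-01T00:00:00Z"}]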
+ """ + if not resume_from: + return original_filter + + # Parse original filter (should be array) or create empty array + try: + filter_list = json.loads(original_filter) if original_filter else [] + if not isinstance(filter_list, list): + self._log.warning( + f"Filter should be a JSON array, got: {type(filter_list).__name__}" + ) + filter_list = [] + except json.JSONDecodeError: + self._log.warning( + f"Invalid JSON in original filter, ignoring: {original_filter}" + ) + filter_list = [] + + # Add timestamp constraint to filter array + timestamp_field = self._get_timestamp_field_for_scope(resume_from.scope) + timestamp_filter = { + "type": "datetime", + "column": timestamp_field, + "operator": ">", + "value": resume_from.last_processed_timestamp, + } + filter_list.append(timestamp_filter) + + return json.dumps(filter_list) + + @staticmethod + def _get_item_id( + item: Union[TraceWithFullDetails, ObservationsView], + scope: str, + ) -> str: + """Extract ID from item based on scope. + + Args: + item: The API response object. + scope: The type of item. + + Returns: + The item's ID. + """ + return item.id + + @staticmethod + def _get_item_timestamp( + item: Union[TraceWithFullDetails, ObservationsView], + scope: str, + ) -> str: + """Extract timestamp from item based on scope. + + Args: + item: The API response object. + scope: The type of item. + + Returns: + ISO 8601 timestamp string. + """ + if scope == "traces": + # Type narrowing for traces + if hasattr(item, "timestamp"): + return item.timestamp.isoformat() # type: ignore[attr-defined] + elif scope == "observations": + # Type narrowing for observations + if hasattr(item, "start_time"): + return item.start_time.isoformat() # type: ignore[attr-defined] + return "" + + @staticmethod + def _get_timestamp_field_for_scope(scope: str) -> str: + """Get the timestamp field name for filtering based on scope. + + Args: + scope: The type of items. + + Returns: + The field name to use in filters. + """ + if scope == "traces": + return "timestamp" + elif scope == "observations": + return "start_time" + return "timestamp" # Default + + def _build_result( + self, + total_items_fetched: int, + total_items_processed: int, + total_items_failed: int, + total_scores_created: int, + total_composite_scores_created: int, + total_evaluations_failed: int, + evaluator_stats_dict: Dict[str, EvaluatorStats], + resume_token: Optional[BatchEvaluationResumeToken], + completed: bool, + start_time: float, + failed_item_ids: List[str], + error_summary: Dict[str, int], + has_more_items: bool, + item_evaluations: Dict[str, List[Evaluation]], + ) -> BatchEvaluationResult: + """Build the final BatchEvaluationResult. + + Args: + total_items_fetched: Total items fetched. + total_items_processed: Items successfully processed. + total_items_failed: Items that failed. + total_scores_created: Scores from item evaluators. + total_composite_scores_created: Scores from composite evaluator. + total_evaluations_failed: Individual evaluator failures. + evaluator_stats_dict: Per-evaluator statistics. + resume_token: Resume token if incomplete. + completed: Whether evaluation completed fully. + start_time: Start time (unix timestamp). + failed_item_ids: IDs of failed items. + error_summary: Error type counts. + has_more_items: Whether more items exist. + item_evaluations: Dictionary mapping item IDs to their evaluation results. + + Returns: + BatchEvaluationResult instance. 
+ """ + duration = time.time() - start_time + + return BatchEvaluationResult( + total_items_fetched=total_items_fetched, + total_items_processed=total_items_processed, + total_items_failed=total_items_failed, + total_scores_created=total_scores_created, + total_composite_scores_created=total_composite_scores_created, + total_evaluations_failed=total_evaluations_failed, + evaluator_stats=list(evaluator_stats_dict.values()), + resume_token=resume_token, + completed=completed, + duration_seconds=duration, + failed_item_ids=failed_item_ids, + error_summary=error_summary, + has_more_items=has_more_items, + item_evaluations=item_evaluations, + ) diff --git a/tests/test_batch_evaluation.py b/tests/test_batch_evaluation.py new file mode 100644 index 000000000..46bda13d6 --- /dev/null +++ b/tests/test_batch_evaluation.py @@ -0,0 +1,1077 @@ +"""Comprehensive tests for batch evaluation functionality. + +This test suite covers the run_batched_evaluation method which allows evaluating +traces, observations, and sessions fetched from Langfuse with mappers, evaluators, +and composite evaluators. +""" + +import asyncio +import time + +import pytest + +from langfuse import get_client +from langfuse.batch_evaluation import ( + BatchEvaluationResult, + BatchEvaluationResumeToken, + EvaluatorInputs, + EvaluatorStats, +) +from langfuse.experiment import Evaluation +from tests.utils import create_uuid + +# ============================================================================ +# FIXTURES & SETUP +# ============================================================================ + + +pytestmark = pytest.mark.skip(reason="Reason for skipping this file") + + +@pytest.fixture +def langfuse_client(): + """Get a Langfuse client for testing.""" + return get_client() + + +@pytest.fixture +def sample_trace_name(): + """Generate a unique trace name for filtering.""" + return f"batch-eval-test-{create_uuid()}" + + +def simple_trace_mapper(*, item): + """Simple mapper for traces.""" + return EvaluatorInputs( + input=item.input if hasattr(item, "input") else None, + output=item.output if hasattr(item, "output") else None, + expected_output=None, + metadata={"trace_id": item.id}, + ) + + +def simple_evaluator(*, input, output, expected_output=None, metadata=None, **kwargs): + """Simple evaluator that returns a score based on output length.""" + if output is None: + return Evaluation(name="length_score", value=0.0, comment="No output") + + return Evaluation( + name="length_score", + value=float(len(str(output))) / 10.0, + comment=f"Length: {len(str(output))}", + ) + + +# ============================================================================ +# BASIC FUNCTIONALITY TESTS +# ============================================================================ + + +def test_run_batched_evaluation_on_traces_basic(langfuse_client): + """Test basic batch evaluation on traces.""" + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + max_items=5, + verbose=True, + ) + + # Validate result structure + assert isinstance(result, BatchEvaluationResult) + assert result.total_items_fetched >= 0 + assert result.total_items_processed >= 0 + assert result.total_scores_created >= 0 + assert result.completed is True + assert isinstance(result.duration_seconds, float) + assert result.duration_seconds > 0 + + # Verify evaluator stats + assert len(result.evaluator_stats) == 1 + stats = result.evaluator_stats[0] + assert isinstance(stats, EvaluatorStats) + assert 
stats.name == "simple_evaluator" + + +def test_batch_evaluation_with_filter(langfuse_client): + """Test batch evaluation with JSON filter.""" + # Create a trace with specific tag + unique_tag = f"test-filter-{create_uuid()}" + with langfuse_client.start_as_current_span( + name=f"filtered-trace-{create_uuid()}" + ) as span: + span.update_trace( + input="Filtered test", + output="Filtered output", + tags=[unique_tag], + ) + + langfuse_client.flush() + time.sleep(3) + + # Filter format: array of filter conditions + filter_json = f'[{{"type": "arrayOptions", "column": "tags", "operator": "any of", "value": ["{unique_tag}"]}}]' + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + filter=filter_json, + verbose=True, + ) + + # Should only process the filtered trace + assert result.total_items_fetched >= 1 + assert result.completed is True + + +def test_batch_evaluation_with_metadata(langfuse_client): + """Test that additional metadata is added to all scores.""" + + def metadata_checking_evaluator(*, input, output, metadata=None, **kwargs): + return Evaluation( + name="test_score", + value=1.0, + metadata={"evaluator_data": "test"}, + ) + + additional_metadata = { + "batch_run_id": "test-batch-123", + "evaluation_version": "v2.0", + } + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[metadata_checking_evaluator], + metadata=additional_metadata, + max_items=2, + ) + + assert result.total_scores_created > 0 + + # Verify scores were created with merged metadata + langfuse_client.flush() + time.sleep(3) + + # Note: In a real test, you'd verify via API that metadata was merged + # For now, just verify the operation completed + assert result.completed is True + + +def test_result_structure_fields(langfuse_client): + """Test that BatchEvaluationResult has all expected fields.""" + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + max_items=3, + ) + + # Check all result fields exist + assert hasattr(result, "total_items_fetched") + assert hasattr(result, "total_items_processed") + assert hasattr(result, "total_items_failed") + assert hasattr(result, "total_scores_created") + assert hasattr(result, "total_composite_scores_created") + assert hasattr(result, "total_evaluations_failed") + assert hasattr(result, "evaluator_stats") + assert hasattr(result, "resume_token") + assert hasattr(result, "completed") + assert hasattr(result, "duration_seconds") + assert hasattr(result, "failed_item_ids") + assert hasattr(result, "error_summary") + assert hasattr(result, "has_more_items") + assert hasattr(result, "item_evaluations") + + # Check types + assert isinstance(result.evaluator_stats, list) + assert isinstance(result.failed_item_ids, list) + assert isinstance(result.error_summary, dict) + assert isinstance(result.completed, bool) + assert isinstance(result.has_more_items, bool) + assert isinstance(result.item_evaluations, dict) + + +# ============================================================================ +# MAPPER FUNCTION TESTS +# ============================================================================ + + +def test_simple_mapper(langfuse_client): + """Test basic mapper functionality.""" + + def custom_mapper(*, item): + return EvaluatorInputs( + input=item.input if hasattr(item, "input") else "no input", + output=item.output if hasattr(item, "output") else "no output", + 
expected_output=None, + metadata={"custom_field": "test_value"}, + ) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=custom_mapper, + evaluators=[simple_evaluator], + max_items=2, + ) + + assert result.total_items_processed > 0 + + +@pytest.mark.asyncio +async def test_async_mapper(langfuse_client): + """Test that async mappers work correctly.""" + + async def async_mapper(*, item): + await asyncio.sleep(0.01) # Simulate async work + return EvaluatorInputs( + input=item.input if hasattr(item, "input") else None, + output=item.output if hasattr(item, "output") else None, + expected_output=None, + metadata={"async": True}, + ) + + # Note: run_batched_evaluation is synchronous but handles async mappers + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=async_mapper, + evaluators=[simple_evaluator], + max_items=2, + ) + + assert result.total_items_processed > 0 + + +def test_mapper_failure_handling(langfuse_client): + """Test that mapper failures cause items to be skipped.""" + + def failing_mapper(*, item): + raise ValueError("Intentional mapper failure") + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=failing_mapper, + evaluators=[simple_evaluator], + max_items=3, + ) + + # All items should fail due to mapper failures + assert result.total_items_failed > 0 + assert len(result.failed_item_ids) > 0 + assert "ValueError" in result.error_summary or "Exception" in result.error_summary + + +def test_mapper_with_missing_fields(langfuse_client): + """Test mapper handles traces with missing fields gracefully.""" + + def robust_mapper(*, item): + # Handle missing fields with defaults + input_val = getattr(item, "input", None) or "default_input" + output_val = getattr(item, "output", None) or "default_output" + + return EvaluatorInputs( + input=input_val, + output=output_val, + expected_output=None, + metadata={}, + ) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=robust_mapper, + evaluators=[simple_evaluator], + max_items=2, + ) + + assert result.total_items_processed > 0 + + +# ============================================================================ +# EVALUATOR TESTS +# ============================================================================ + + +def test_single_evaluator(langfuse_client): + """Test with a single evaluator.""" + + def quality_evaluator(*, input, output, **kwargs): + return Evaluation(name="quality", value=0.85, comment="High quality") + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[quality_evaluator], + max_items=2, + ) + + assert result.total_scores_created > 0 + assert len(result.evaluator_stats) == 1 + assert result.evaluator_stats[0].name == "quality_evaluator" + + +def test_multiple_evaluators(langfuse_client): + """Test with multiple evaluators running in parallel.""" + + def accuracy_evaluator(*, input, output, **kwargs): + return Evaluation(name="accuracy", value=0.9) + + def relevance_evaluator(*, input, output, **kwargs): + return Evaluation(name="relevance", value=0.8) + + def safety_evaluator(*, input, output, **kwargs): + return Evaluation(name="safety", value=1.0) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[accuracy_evaluator, relevance_evaluator, safety_evaluator], + max_items=2, + ) + + # Should have 3 evaluators + assert len(result.evaluator_stats) == 3 + assert result.total_scores_created >= 
result.total_items_processed * 3 + + +@pytest.mark.asyncio +async def test_async_evaluator(langfuse_client): + """Test that async evaluators work correctly.""" + + async def async_evaluator(*, input, output, **kwargs): + await asyncio.sleep(0.01) # Simulate async work + return Evaluation(name="async_score", value=0.75) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[async_evaluator], + max_items=2, + ) + + assert result.total_scores_created > 0 + + +def test_evaluator_returning_list(langfuse_client): + """Test evaluator that returns multiple Evaluations.""" + + def multi_score_evaluator(*, input, output, **kwargs): + return [ + Evaluation(name="score_1", value=0.8), + Evaluation(name="score_2", value=0.9), + Evaluation(name="score_3", value=0.7), + ] + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[multi_score_evaluator], + max_items=2, + ) + + # Should create 3 scores per item + assert result.total_scores_created >= result.total_items_processed * 3 + + +def test_evaluator_failure_statistics(langfuse_client): + """Test that evaluator failures are tracked in statistics.""" + + def working_evaluator(*, input, output, **kwargs): + return Evaluation(name="working", value=1.0) + + def failing_evaluator(*, input, output, **kwargs): + raise RuntimeError("Intentional evaluator failure") + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[working_evaluator, failing_evaluator], + max_items=3, + ) + + # Verify evaluator stats + assert len(result.evaluator_stats) == 2 + + working_stats = next( + s for s in result.evaluator_stats if s.name == "working_evaluator" + ) + assert working_stats.successful_runs > 0 + assert working_stats.failed_runs == 0 + + failing_stats = next( + s for s in result.evaluator_stats if s.name == "failing_evaluator" + ) + assert failing_stats.failed_runs > 0 + assert failing_stats.successful_runs == 0 + + # Total evaluations failed should be tracked + assert result.total_evaluations_failed > 0 + + +def test_mixed_sync_async_evaluators(langfuse_client): + """Test mixing synchronous and asynchronous evaluators.""" + + def sync_evaluator(*, input, output, **kwargs): + return Evaluation(name="sync_score", value=0.8) + + async def async_evaluator(*, input, output, **kwargs): + await asyncio.sleep(0.01) + return Evaluation(name="async_score", value=0.9) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[sync_evaluator, async_evaluator], + max_items=2, + ) + + assert len(result.evaluator_stats) == 2 + assert result.total_scores_created >= result.total_items_processed * 2 + + +# ============================================================================ +# COMPOSITE EVALUATOR TESTS +# ============================================================================ + + +def test_composite_evaluator_weighted_average(langfuse_client): + """Test composite evaluator that computes weighted average.""" + + def accuracy_evaluator(*, input, output, **kwargs): + return Evaluation(name="accuracy", value=0.8) + + def relevance_evaluator(*, input, output, **kwargs): + return Evaluation(name="relevance", value=0.9) + + def composite_evaluator(*, input, output, expected_output, metadata, evaluations): + weights = {"accuracy": 0.6, "relevance": 0.4} + total = sum( + e.value * weights.get(e.name, 0) + for e in evaluations + if isinstance(e.value, 
(int, float)) + ) + + return Evaluation( + name="composite_score", + value=total, + comment=f"Weighted average of {len(evaluations)} metrics", + ) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[accuracy_evaluator, relevance_evaluator], + composite_evaluator=composite_evaluator, + max_items=2, + ) + + # Should have both regular and composite scores + assert result.total_scores_created > 0 + assert result.total_composite_scores_created > 0 + assert result.total_scores_created > result.total_composite_scores_created + + +def test_composite_evaluator_pass_fail(langfuse_client): + """Test composite evaluator that implements pass/fail logic.""" + + def metric1_evaluator(*, input, output, **kwargs): + return Evaluation(name="metric1", value=0.9) + + def metric2_evaluator(*, input, output, **kwargs): + return Evaluation(name="metric2", value=0.7) + + def pass_fail_composite(*, input, output, expected_output, metadata, evaluations): + thresholds = {"metric1": 0.8, "metric2": 0.6} + + passes = all( + e.value >= thresholds.get(e.name, 0) + for e in evaluations + if isinstance(e.value, (int, float)) + ) + + return Evaluation( + name="passes_all_checks", + value=1.0 if passes else 0.0, + comment="All checks passed" if passes else "Some checks failed", + ) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[metric1_evaluator, metric2_evaluator], + composite_evaluator=pass_fail_composite, + max_items=2, + ) + + assert result.total_composite_scores_created > 0 + + +@pytest.mark.asyncio +async def test_async_composite_evaluator(langfuse_client): + """Test async composite evaluator.""" + + def evaluator1(*, input, output, **kwargs): + return Evaluation(name="eval1", value=0.8) + + async def async_composite(*, input, output, expected_output, metadata, evaluations): + await asyncio.sleep(0.01) # Simulate async processing + avg = sum( + e.value for e in evaluations if isinstance(e.value, (int, float)) + ) / len(evaluations) + return Evaluation(name="async_composite", value=avg) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[evaluator1], + composite_evaluator=async_composite, + max_items=2, + ) + + assert result.total_composite_scores_created > 0 + + +def test_composite_evaluator_with_no_evaluations(langfuse_client): + """Test composite evaluator when no evaluations are present.""" + + def always_failing_evaluator(*, input, output, **kwargs): + raise Exception("Always fails") + + def composite_evaluator(*, input, output, expected_output, metadata, evaluations): + # Should not be called if no evaluations succeed + return Evaluation(name="composite", value=0.0) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[always_failing_evaluator], + composite_evaluator=composite_evaluator, + max_items=2, + ) + + # Composite evaluator should not create scores if no evaluations + assert result.total_composite_scores_created == 0 + + +def test_composite_evaluator_failure_handling(langfuse_client): + """Test that composite evaluator failures are handled gracefully.""" + + def evaluator1(*, input, output, **kwargs): + return Evaluation(name="eval1", value=0.8) + + def failing_composite(*, input, output, expected_output, metadata, evaluations): + raise ValueError("Composite evaluator failed") + + result = langfuse_client.run_batched_evaluation( + scope="traces", + 
mapper=simple_trace_mapper, + evaluators=[evaluator1], + composite_evaluator=failing_composite, + max_items=2, + ) + + # Regular scores should still be created + assert result.total_scores_created > 0 + # But no composite scores + assert result.total_composite_scores_created == 0 + + +# ============================================================================ +# ERROR HANDLING TESTS +# ============================================================================ + + +def test_mapper_failure_skips_item(langfuse_client): + """Test that mapper failure causes item to be skipped.""" + + call_count = {"count": 0} + + def sometimes_failing_mapper(*, item): + call_count["count"] += 1 + if call_count["count"] % 2 == 0: + raise Exception("Mapper failed") + return simple_trace_mapper(item=item) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=sometimes_failing_mapper, + evaluators=[simple_evaluator], + max_items=4, + ) + + # Some items should fail, some should succeed + assert result.total_items_failed > 0 + assert result.total_items_processed > 0 + + +def test_evaluator_failure_continues(langfuse_client): + """Test that one evaluator failing doesn't stop others.""" + + def working_evaluator1(*, input, output, **kwargs): + return Evaluation(name="working1", value=0.8) + + def failing_evaluator(*, input, output, **kwargs): + raise Exception("Evaluator failed") + + def working_evaluator2(*, input, output, **kwargs): + return Evaluation(name="working2", value=0.9) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[working_evaluator1, failing_evaluator, working_evaluator2], + max_items=2, + ) + + # Working evaluators should still create scores + assert result.total_scores_created >= result.total_items_processed * 2 + + # Failing evaluator should be tracked + failing_stats = next( + s for s in result.evaluator_stats if s.name == "failing_evaluator" + ) + assert failing_stats.failed_runs > 0 + + +def test_all_evaluators_fail(langfuse_client): + """Test when all evaluators fail but item is still processed.""" + + def failing_evaluator1(*, input, output, **kwargs): + raise Exception("Failed 1") + + def failing_evaluator2(*, input, output, **kwargs): + raise Exception("Failed 2") + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[failing_evaluator1, failing_evaluator2], + max_items=2, + ) + + # Items should be processed even if all evaluators fail + assert result.total_items_processed > 0 + # But no scores created + assert result.total_scores_created == 0 + # All evaluations failed + assert result.total_evaluations_failed > 0 + + +# ============================================================================ +# EDGE CASES TESTS +# ============================================================================ + + +def test_empty_results_handling(langfuse_client): + """Test batch evaluation when filter returns no items.""" + nonexistent_name = f"nonexistent-trace-{create_uuid()}" + nonexistent_filter = f'[{{"type": "string", "column": "name", "operator": "=", "value": "{nonexistent_name}"}}]' + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + filter=nonexistent_filter, + ) + + assert result.total_items_fetched == 0 + assert result.total_items_processed == 0 + assert result.total_scores_created == 0 + assert result.completed is True + assert result.has_more_items is False 
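+
+
+# NOTE: illustrative sketch only, not exercised as a test. It shows how a resume
+# token from an interrupted run could be passed back via resume_from, assuming
+# the first call returned one; "_continue_batch_evaluation" is a hypothetical
+# helper name.
+def _continue_batch_evaluation(langfuse_client):
+    first = langfuse_client.run_batched_evaluation(
+        scope="traces",
+        mapper=simple_trace_mapper,
+        evaluators=[simple_evaluator],
+        max_items=5,
+    )
+
+    if first.resume_token is None:
+        # Nothing left to resume; the first run completed.
+        return first
+
+    # Continue from the last processed item of the interrupted run.
+    return langfuse_client.run_batched_evaluation(
+        scope="traces",
+        mapper=simple_trace_mapper,
+        evaluators=[simple_evaluator],
+        resume_from=first.resume_token,
+    )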
+ + +def test_max_items_zero(langfuse_client): + """Test with max_items=0 (should process no items).""" + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + max_items=0, + ) + + assert result.total_items_fetched == 0 + assert result.total_items_processed == 0 + + +def test_evaluation_value_type_conversions(langfuse_client): + """Test that different evaluation value types are handled correctly.""" + + def multi_type_evaluator(*, input, output, **kwargs): + return [ + Evaluation(name="int_score", value=5), # int + Evaluation(name="float_score", value=0.85), # float + Evaluation(name="bool_score", value=True), # bool + Evaluation(name="none_score", value=None), # None + ] + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[multi_type_evaluator], + max_items=1, + ) + + # All value types should be converted and scores created + assert result.total_scores_created >= 4 + + +# ============================================================================ +# PAGINATION TESTS +# ============================================================================ + + +def test_pagination_with_max_items(langfuse_client): + """Test that max_items limit is respected.""" + # Create more traces to ensure we have enough data + for i in range(10): + with langfuse_client.start_as_current_span( + name=f"pagination-test-{create_uuid()}" + ) as span: + span.update_trace( + input=f"Input {i}", + output=f"Output {i}", + tags=["pagination_test"], + ) + + langfuse_client.flush() + time.sleep(3) + + filter_json = '[{"type": "arrayOptions", "column": "tags", "operator": "any of", "value": ["pagination_test"]}]' + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + filter=filter_json, + max_items=5, + fetch_batch_size=2, + ) + + # Should not exceed max_items + assert result.total_items_processed <= 5 + + +def test_has_more_items_flag(langfuse_client): + """Test that has_more_items flag is set correctly when max_items is reached.""" + # Create enough traces to exceed max_items + batch_tag = f"batch-test-{create_uuid()}" + for i in range(15): + with langfuse_client.start_as_current_span(name=f"more-items-test-{i}") as span: + span.update_trace( + input=f"Input {i}", + output=f"Output {i}", + tags=[batch_tag], + ) + + langfuse_client.flush() + time.sleep(3) + + filter_json = f'[{{"type": "arrayOptions", "column": "tags", "operator": "any of", "value": ["{batch_tag}"]}}]' + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + filter=filter_json, + max_items=5, + fetch_batch_size=2, + ) + + # has_more_items should be True if we hit the limit + if result.total_items_fetched >= 5: + assert result.has_more_items is True + + +def test_fetch_batch_size_parameter(langfuse_client): + """Test that different fetch_batch_size values work correctly.""" + for batch_size in [1, 5, 10]: + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + max_items=3, + fetch_batch_size=batch_size, + ) + + # Should complete regardless of batch size + assert result.completed is True or result.total_items_processed > 0 + + +# ============================================================================ +# RESUME FUNCTIONALITY TESTS +# 
============================================================================ + + +def test_resume_token_structure(langfuse_client): + """Test that BatchEvaluationResumeToken has correct structure.""" + resume_token = BatchEvaluationResumeToken( + scope="traces", + filter='{"test": "filter"}', + last_processed_timestamp="2024-01-01T00:00:00Z", + last_processed_id="trace-123", + items_processed=10, + ) + + assert resume_token.scope == "traces" + assert resume_token.filter == '{"test": "filter"}' + assert resume_token.last_processed_timestamp == "2024-01-01T00:00:00Z" + assert resume_token.last_processed_id == "trace-123" + assert resume_token.items_processed == 10 + + +# ============================================================================ +# CONCURRENCY TESTS +# ============================================================================ + + +def test_max_concurrency_parameter(langfuse_client): + """Test that max_concurrency parameter works correctly.""" + for concurrency in [1, 5, 10]: + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + max_items=3, + max_concurrency=concurrency, + ) + + # Should complete regardless of concurrency + assert result.completed is True or result.total_items_processed > 0 + + +# ============================================================================ +# STATISTICS TESTS +# ============================================================================ + + +def test_evaluator_stats_structure(langfuse_client): + """Test that EvaluatorStats has correct structure.""" + + def test_evaluator(*, input, output, **kwargs): + return Evaluation(name="test", value=1.0) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[test_evaluator], + max_items=2, + ) + + assert len(result.evaluator_stats) == 1 + stats = result.evaluator_stats[0] + + # Check all fields exist + assert hasattr(stats, "name") + assert hasattr(stats, "total_runs") + assert hasattr(stats, "successful_runs") + assert hasattr(stats, "failed_runs") + assert hasattr(stats, "total_scores_created") + + # Check values + assert stats.name == "test_evaluator" + assert stats.total_runs == result.total_items_processed + assert stats.successful_runs == result.total_items_processed + assert stats.failed_runs == 0 + + +def test_evaluator_stats_tracking(langfuse_client): + """Test that evaluator statistics are tracked correctly.""" + + call_count = {"count": 0} + + def sometimes_failing_evaluator(*, input, output, **kwargs): + call_count["count"] += 1 + if call_count["count"] % 2 == 0: + raise Exception("Failed") + return Evaluation(name="test", value=1.0) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[sometimes_failing_evaluator], + max_items=4, + ) + + stats = result.evaluator_stats[0] + assert stats.total_runs == result.total_items_processed + assert stats.successful_runs > 0 + assert stats.failed_runs > 0 + assert stats.successful_runs + stats.failed_runs == stats.total_runs + + +def test_error_summary_aggregation(langfuse_client): + """Test that error types are aggregated correctly in error_summary.""" + + def failing_mapper(*, item): + raise ValueError("Mapper error") + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=failing_mapper, + evaluators=[simple_evaluator], + max_items=3, + ) + + # Error summary should contain the error type + assert len(result.error_summary) > 0 
+ assert any("Error" in key for key in result.error_summary.keys()) + + +def test_failed_item_ids_collected(langfuse_client): + """Test that failed item IDs are collected.""" + + def failing_mapper(*, item): + raise Exception("Failed") + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=failing_mapper, + evaluators=[simple_evaluator], + max_items=3, + ) + + assert len(result.failed_item_ids) > 0 + # Each failed ID should be a string + assert all(isinstance(item_id, str) for item_id in result.failed_item_ids) + + +# ============================================================================ +# PERFORMANCE TESTS +# ============================================================================ + + +def test_duration_tracking(langfuse_client): + """Test that duration is tracked correctly.""" + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + max_items=2, + ) + + assert result.duration_seconds > 0 + assert result.duration_seconds < 60 # Should complete quickly for small batch + + +def test_verbose_logging(langfuse_client): + """Test that verbose=True doesn't cause errors.""" + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[simple_evaluator], + max_items=2, + verbose=True, # Should log progress + ) + + assert result.completed is True + + +# ============================================================================ +# ITEM EVALUATIONS TESTS +# ============================================================================ + + +def test_item_evaluations_basic(langfuse_client): + """Test that item_evaluations dict contains correct structure.""" + + def test_evaluator(*, input, output, **kwargs): + return Evaluation(name="test_metric", value=0.5) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[test_evaluator], + max_items=3, + ) + + # Check that item_evaluations is a dict + assert isinstance(result.item_evaluations, dict) + + # Should have evaluations for each processed item + assert len(result.item_evaluations) == result.total_items_processed + + # Each entry should be a list of Evaluation objects + for item_id, evaluations in result.item_evaluations.items(): + assert isinstance(item_id, str) + assert isinstance(evaluations, list) + assert all(isinstance(e, Evaluation) for e in evaluations) + # Should have one evaluation per evaluator + assert len(evaluations) == 1 + assert evaluations[0].name == "test_metric" + + +def test_item_evaluations_multiple_evaluators(langfuse_client): + """Test item_evaluations with multiple evaluators.""" + + def accuracy_evaluator(*, input, output, **kwargs): + return Evaluation(name="accuracy", value=0.8) + + def relevance_evaluator(*, input, output, **kwargs): + return Evaluation(name="relevance", value=0.9) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[accuracy_evaluator, relevance_evaluator], + max_items=2, + ) + + # Check structure + assert len(result.item_evaluations) == result.total_items_processed + + # Each item should have evaluations from both evaluators + for item_id, evaluations in result.item_evaluations.items(): + assert len(evaluations) == 2 + eval_names = {e.name for e in evaluations} + assert eval_names == {"accuracy", "relevance"} + + +def test_item_evaluations_with_composite(langfuse_client): + """Test that item_evaluations includes composite 
evaluations.""" + + def base_evaluator(*, input, output, **kwargs): + return Evaluation(name="base_score", value=0.7) + + def composite_evaluator(*, input, output, expected_output, metadata, evaluations): + return Evaluation( + name="composite_score", + value=sum( + e.value for e in evaluations if isinstance(e.value, (int, float)) + ), + ) + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=simple_trace_mapper, + evaluators=[base_evaluator], + composite_evaluator=composite_evaluator, + max_items=2, + ) + + # Each item should have both base and composite evaluations + for item_id, evaluations in result.item_evaluations.items(): + assert len(evaluations) == 2 + eval_names = {e.name for e in evaluations} + assert eval_names == {"base_score", "composite_score"} + + # Verify composite scores were created + assert result.total_composite_scores_created > 0 + + +def test_item_evaluations_empty_on_failure(langfuse_client): + """Test that failed items don't appear in item_evaluations.""" + + def failing_mapper(*, item): + raise Exception("Mapper failed") + + result = langfuse_client.run_batched_evaluation( + scope="traces", + mapper=failing_mapper, + evaluators=[simple_evaluator], + max_items=3, + ) + + # All items failed, so item_evaluations should be empty + assert len(result.item_evaluations) == 0 + assert result.total_items_failed > 0 diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index 81a874ae4..39c8de92d 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -1959,9 +1959,9 @@ def test_start_as_current_observation_types(): expected_types = {obs_type.upper() for obs_type in observation_types} | { "SPAN" } # includes parent span - assert expected_types.issubset( - found_types - ), f"Missing types: {expected_types - found_types}" + assert expected_types.issubset(found_types), ( + f"Missing types: {expected_types - found_types}" + ) # Verify each specific observation exists for obs_type in observation_types: @@ -2005,25 +2005,25 @@ def test_that_generation_like_properties_are_actually_created(): ) as obs: # Verify the properties are accessible on the observation object if hasattr(obs, "model"): - assert ( - obs.model == test_model - ), f"{obs_type} should have model property" + assert obs.model == test_model, ( + f"{obs_type} should have model property" + ) if hasattr(obs, "completion_start_time"): - assert ( - obs.completion_start_time == test_completion_start_time - ), f"{obs_type} should have completion_start_time property" + assert obs.completion_start_time == test_completion_start_time, ( + f"{obs_type} should have completion_start_time property" + ) if hasattr(obs, "model_parameters"): - assert ( - obs.model_parameters == test_model_parameters - ), f"{obs_type} should have model_parameters property" + assert obs.model_parameters == test_model_parameters, ( + f"{obs_type} should have model_parameters property" + ) if hasattr(obs, "usage_details"): - assert ( - obs.usage_details == test_usage_details - ), f"{obs_type} should have usage_details property" + assert obs.usage_details == test_usage_details, ( + f"{obs_type} should have usage_details property" + ) if hasattr(obs, "cost_details"): - assert ( - obs.cost_details == test_cost_details - ), f"{obs_type} should have cost_details property" + assert obs.cost_details == test_cost_details, ( + f"{obs_type} should have cost_details property" + ) langfuse.flush() @@ -2037,28 +2037,28 @@ def test_that_generation_like_properties_are_actually_created(): for obs in trace.observations if 
obs.name == f"test-{obs_type}" and obs.type == obs_type.upper() ] - assert ( - len(observations) == 1 - ), f"Expected one {obs_type.upper()} observation, but found {len(observations)}" + assert len(observations) == 1, ( + f"Expected one {obs_type.upper()} observation, but found {len(observations)}" + ) obs = observations[0] assert obs.model == test_model, f"{obs_type} should have model property" - assert ( - obs.model_parameters == test_model_parameters - ), f"{obs_type} should have model_parameters property" + assert obs.model_parameters == test_model_parameters, ( + f"{obs_type} should have model_parameters property" + ) # usage_details assert hasattr(obs, "usage_details"), f"{obs_type} should have usage_details" - assert obs.usage_details == dict( - test_usage_details, total=30 - ), f"{obs_type} should persist usage_details" # API adds total + assert obs.usage_details == dict(test_usage_details, total=30), ( + f"{obs_type} should persist usage_details" + ) # API adds total - assert ( - obs.cost_details == test_cost_details - ), f"{obs_type} should persist cost_details" + assert obs.cost_details == test_cost_details, ( + f"{obs_type} should persist cost_details" + ) # completion_start_time, because of time skew not asserting time - assert ( - obs.completion_start_time is not None - ), f"{obs_type} should persist completion_start_time property" + assert obs.completion_start_time is not None, ( + f"{obs_type} should persist completion_start_time property" + ) diff --git a/tests/test_decorators.py b/tests/test_decorators.py index 0eac5c617..0c82c1a6f 100644 --- a/tests/test_decorators.py +++ b/tests/test_decorators.py @@ -1728,12 +1728,12 @@ def root_function(): # Verify results assert items == ["item_0", "item_1", "item_2"] - assert ( - span_info["generator_span_id"] != "0000000000000000" - ), "Generator context should be preserved" - assert ( - span_info["root_span_id"] != span_info["generator_span_id"] - ), "Should have different span IDs" + assert span_info["generator_span_id"] != "0000000000000000", ( + "Generator context should be preserved" + ) + assert span_info["root_span_id"] != span_info["generator_span_id"], ( + "Should have different span IDs" + ) # Verify trace structure trace_data = get_api().trace.get(mock_trace_id) @@ -1794,12 +1794,12 @@ async def root_function(): # Verify results assert items == ["async_item_0", "async_item_1", "async_item_2"] - assert ( - span_info["generator_span_id"] != "0000000000000000" - ), "Generator context should be preserved" - assert ( - span_info["root_span_id"] != span_info["generator_span_id"] - ), "Should have different span IDs" + assert span_info["generator_span_id"] != "0000000000000000", ( + "Generator context should be preserved" + ) + assert span_info["root_span_id"] != span_info["generator_span_id"], ( + "Should have different span IDs" + ) # Verify trace structure trace_data = get_api().trace.get(mock_trace_id) @@ -1860,15 +1860,15 @@ async def parent_function(): assert items == ["child_0", "child_1"] # Verify span hierarchy - assert ( - span_info["parent_span_id"] != span_info["child_span_id"] - ), "Parent and child should have different span IDs" - assert ( - span_info["parent_trace_id"] == span_info["child_trace_id"] - ), "Parent and child should share same trace ID" - assert ( - span_info["child_span_id"] != "0000000000000000" - ), "Child context should be preserved" + assert span_info["parent_span_id"] != span_info["child_span_id"], ( + "Parent and child should have different span IDs" + ) + assert span_info["parent_trace_id"] 
== span_info["child_trace_id"], ( + "Parent and child should share same trace ID" + ) + assert span_info["child_span_id"] != "0000000000000000", ( + "Child context should be preserved" + ) # Verify trace structure trace_data = get_api().trace.get(mock_trace_id) diff --git a/tests/test_deprecation.py b/tests/test_deprecation.py index bcb2626b9..edda545fd 100644 --- a/tests/test_deprecation.py +++ b/tests/test_deprecation.py @@ -109,12 +109,12 @@ def test_deprecated_function_warnings(self, langfuse_client, func_info): deprecation_warnings = [ w for w in warning_list if issubclass(w.category, DeprecationWarning) ] - assert ( - len(deprecation_warnings) > 0 - ), f"No DeprecationWarning emitted for {target}.{method_name}" + assert len(deprecation_warnings) > 0, ( + f"No DeprecationWarning emitted for {target}.{method_name}" + ) # Check that the warning message matches expected warning_messages = [str(w.message) for w in deprecation_warnings] - assert ( - expected_message in warning_messages - ), f"Expected warning message not found for {target}.{method_name}. Got: {warning_messages}" + assert expected_message in warning_messages, ( + f"Expected warning message not found for {target}.{method_name}. Got: {warning_messages}" + ) diff --git a/tests/test_experiments.py b/tests/test_experiments.py index 71f2e5926..3ba8b4afa 100644 --- a/tests/test_experiments.py +++ b/tests/test_experiments.py @@ -106,33 +106,33 @@ def test_run_experiment_on_local_dataset(sample_dataset): assert trace is not None, f"Trace {trace_id} should exist" # Validate trace name - assert ( - trace.name == "experiment-item-run" - ), f"Trace {trace_id} should have correct name" + assert trace.name == "experiment-item-run", ( + f"Trace {trace_id} should have correct name" + ) # Validate trace input - should contain the experiment item assert trace.input is not None, f"Trace {trace_id} should have input" expected_input = expected_inputs[i] # The input should contain the item data in some form - assert expected_input in str( - trace.input - ), f"Trace {trace_id} input should contain '{expected_input}'" + assert expected_input in str(trace.input), ( + f"Trace {trace_id} input should contain '{expected_input}'" + ) # Validate trace output - should be the task result assert trace.output is not None, f"Trace {trace_id} should have output" expected_output = expected_outputs[i] - assert ( - trace.output == expected_output - ), f"Trace {trace_id} output should be '{expected_output}', got '{trace.output}'" + assert trace.output == expected_output, ( + f"Trace {trace_id} output should be '{expected_output}', got '{trace.output}'" + ) # Validate trace metadata contains experiment name assert trace.metadata is not None, f"Trace {trace_id} should have metadata" - assert ( - "experiment_name" in trace.metadata - ), f"Trace {trace_id} metadata should contain experiment_name" - assert ( - trace.metadata["experiment_name"] == "Euro capitals" - ), f"Trace {trace_id} metadata should have correct experiment_name" + assert "experiment_name" in trace.metadata, ( + f"Trace {trace_id} metadata should contain experiment_name" + ) + assert trace.metadata["experiment_name"] == "Euro capitals", ( + f"Trace {trace_id} metadata should have correct experiment_name" + ) def test_run_experiment_on_langfuse_dataset(): @@ -199,9 +199,9 @@ def test_run_experiment_on_langfuse_dataset(): assert trace is not None, f"Trace {trace_id} should exist" # Validate trace name - assert ( - trace.name == "experiment-item-run" - ), f"Trace {trace_id} should have correct name" + 
assert trace.name == "experiment-item-run", ( + f"Trace {trace_id} should have correct name" + ) # Validate trace input and output match expected pairs assert trace.input is not None, f"Trace {trace_id} should have input" @@ -214,54 +214,54 @@ def test_run_experiment_on_langfuse_dataset(): matching_input = expected_input break - assert ( - matching_input is not None - ), f"Trace {trace_id} input '{trace_input_str}' should contain one of {list(expected_data.keys())}" + assert matching_input is not None, ( + f"Trace {trace_id} input '{trace_input_str}' should contain one of {list(expected_data.keys())}" + ) # Validate trace output matches the expected output for this input assert trace.output is not None, f"Trace {trace_id} should have output" expected_output = expected_data[matching_input] - assert ( - trace.output == expected_output - ), f"Trace {trace_id} output should be '{expected_output}', got '{trace.output}'" + assert trace.output == expected_output, ( + f"Trace {trace_id} output should be '{expected_output}', got '{trace.output}'" + ) # Validate trace metadata contains experiment and dataset info assert trace.metadata is not None, f"Trace {trace_id} should have metadata" - assert ( - "experiment_name" in trace.metadata - ), f"Trace {trace_id} metadata should contain experiment_name" - assert ( - trace.metadata["experiment_name"] == experiment_name - ), f"Trace {trace_id} metadata should have correct experiment_name" + assert "experiment_name" in trace.metadata, ( + f"Trace {trace_id} metadata should contain experiment_name" + ) + assert trace.metadata["experiment_name"] == experiment_name, ( + f"Trace {trace_id} metadata should have correct experiment_name" + ) # Validate dataset-specific metadata fields - assert ( - "dataset_id" in trace.metadata - ), f"Trace {trace_id} metadata should contain dataset_id" - assert ( - trace.metadata["dataset_id"] == dataset.id - ), f"Trace {trace_id} metadata should have correct dataset_id" + assert "dataset_id" in trace.metadata, ( + f"Trace {trace_id} metadata should contain dataset_id" + ) + assert trace.metadata["dataset_id"] == dataset.id, ( + f"Trace {trace_id} metadata should have correct dataset_id" + ) - assert ( - "dataset_item_id" in trace.metadata - ), f"Trace {trace_id} metadata should contain dataset_item_id" + assert "dataset_item_id" in trace.metadata, ( + f"Trace {trace_id} metadata should contain dataset_item_id" + ) # Get the dataset item ID from metadata and validate it exists dataset_item_id = trace.metadata["dataset_item_id"] - assert ( - dataset_item_id in dataset_item_map - ), f"Trace {trace_id} metadata dataset_item_id should correspond to a valid dataset item" + assert dataset_item_id in dataset_item_map, ( + f"Trace {trace_id} metadata dataset_item_id should correspond to a valid dataset item" + ) # Validate the dataset item input matches the trace input dataset_item = dataset_item_map[dataset_item_id] - assert ( - dataset_item.input == matching_input - ), f"Trace {trace_id} should correspond to dataset item with input '{matching_input}'" + assert dataset_item.input == matching_input, ( + f"Trace {trace_id} should correspond to dataset item with input '{matching_input}'" + ) assert dataset_run is not None, f"Dataset run {dataset_run_id} should exist" assert dataset_run.name == result.run_name, "Dataset run should have correct name" - assert ( - dataset_run.description == "Test on Langfuse dataset" - ), "Dataset run should have correct description" + assert dataset_run.description == "Test on Langfuse dataset", ( + 
"Dataset run should have correct description" + ) # Get dataset run items to verify trace linkage dataset_run_items = api.dataset_run_items.list( @@ -797,3 +797,59 @@ def mock_task_with_boolean_results(*, item: ExperimentItem, **kwargs): for score in trace.scores: assert score.data_type == "BOOLEAN" + + +def test_experiment_composite_evaluator_weighted_average(): + """Test composite evaluator in experiments that computes weighted average.""" + langfuse_client = get_client() + + def accuracy_evaluator(*, input, output, **kwargs): + return Evaluation(name="accuracy", value=0.8) + + def relevance_evaluator(*, input, output, **kwargs): + return Evaluation(name="relevance", value=0.9) + + def composite_evaluator(*, input, output, expected_output, metadata, evaluations): + weights = {"accuracy": 0.6, "relevance": 0.4} + total = sum( + e.value * weights.get(e.name, 0) + for e in evaluations + if isinstance(e.value, (int, float)) + ) + + return Evaluation( + name="composite_score", + value=total, + comment=f"Weighted average of {len(evaluations)} metrics", + ) + + data = [ + {"input": "Test 1", "expected_output": "Output 1"}, + {"input": "Test 2", "expected_output": "Output 2"}, + ] + + result = langfuse_client.run_experiment( + name=f"Composite Test {create_uuid()}", + data=data, + task=mock_task, + evaluators=[accuracy_evaluator, relevance_evaluator], + composite_evaluator=composite_evaluator, + ) + + # Verify results + assert len(result.item_results) == 2 + + for item_result in result.item_results: + # Should have 3 evaluations: accuracy, relevance, and composite_score + assert len(item_result.evaluations) == 3 + eval_names = [e.name for e in item_result.evaluations] + assert "accuracy" in eval_names + assert "relevance" in eval_names + assert "composite_score" in eval_names + + # Check composite score value + composite_eval = next( + e for e in item_result.evaluations if e.name == "composite_score" + ) + expected_value = 0.8 * 0.6 + 0.9 * 0.4 # 0.84 + assert abs(composite_eval.value - expected_value) < 0.001 diff --git a/tests/test_langchain.py b/tests/test_langchain.py index b4cf828b2..14c25446f 100644 --- a/tests/test_langchain.py +++ b/tests/test_langchain.py @@ -814,9 +814,9 @@ def _generate_random_dict(n: int, key_length: int = 8) -> Dict[str, Any]: overhead = duration_with_langfuse - duration_without_langfuse print(f"Langfuse overhead: {overhead}ms") - assert ( - overhead < 100 - ), f"Langfuse tracing overhead of {overhead}ms exceeds threshold" + assert overhead < 100, ( + f"Langfuse tracing overhead of {overhead}ms exceeds threshold" + ) langfuse.flush() diff --git a/tests/test_otel.py b/tests/test_otel.py index ca87691db..89c028c68 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -207,14 +207,14 @@ def verify_span_attribute( ): """Verify that a span has a specific attribute with an optional expected value.""" attributes = span_data["attributes"] - assert ( - attribute_key in attributes - ), f"Attribute {attribute_key} not found in span" + assert attribute_key in attributes, ( + f"Attribute {attribute_key} not found in span" + ) if expected_value is not None: - assert ( - attributes[attribute_key] == expected_value - ), f"Expected {attribute_key} to be {expected_value}, got {attributes[attribute_key]}" + assert attributes[attribute_key] == expected_value, ( + f"Expected {attribute_key} to be {expected_value}, got {attributes[attribute_key]}" + ) return attributes[attribute_key] @@ -226,20 +226,20 @@ def verify_json_attribute( parsed_json = json.loads(json_string) if 
         if expected_dict is not None:
-            assert (
-                parsed_json == expected_dict
-            ), f"Expected JSON {attribute_key} to be {expected_dict}, got {parsed_json}"
+            assert parsed_json == expected_dict, (
+                f"Expected JSON {attribute_key} to be {expected_dict}, got {parsed_json}"
+            )
 
         return parsed_json
 
     def assert_parent_child_relationship(self, parent_span: dict, child_span: dict):
         """Verify parent-child relationship between two spans."""
-        assert (
-            child_span["parent_span_id"] == parent_span["span_id"]
-        ), f"Child span {child_span['name']} should have parent {parent_span['name']}"
-        assert (
-            child_span["trace_id"] == parent_span["trace_id"]
-        ), f"Child span {child_span['name']} should have same trace ID as parent {parent_span['name']}"
+        assert child_span["parent_span_id"] == parent_span["span_id"], (
+            f"Child span {child_span['name']} should have parent {parent_span['name']}"
+        )
+        assert child_span["trace_id"] == parent_span["trace_id"], (
+            f"Child span {child_span['name']} should have same trace ID as parent {parent_span['name']}"
+        )
 
 
 class TestBasicSpans(TestOTelBase):
@@ -255,9 +255,9 @@ def test_basic_span_creation(self, langfuse_client, memory_exporter):
         spans = self.get_spans_by_name(memory_exporter, "test-span")
 
         # Verify we created exactly one span
-        assert (
-            len(spans) == 1
-        ), f"Expected 1 span named 'test-span', but found {len(spans)}"
+        assert len(spans) == 1, (
+            f"Expected 1 span named 'test-span', but found {len(spans)}"
+        )
         span_data = spans[0]
 
         # Verify the span attributes
@@ -617,9 +617,9 @@ def test_start_as_current_observation_types(self, langfuse_client, memory_export
         for obs_type in observation_types:
             expected_name = f"test-{obs_type}"
             matching_spans = [span for span in spans if span["name"] == expected_name]
-            assert (
-                len(matching_spans) == 1
-            ), f"Expected one span with name {expected_name}"
+            assert len(matching_spans) == 1, (
+                f"Expected one span with name {expected_name}"
+            )
             span_data = matching_spans[0]
 
             expected_otel_type = obs_type  # OTEL attributes use lowercase
@@ -627,9 +627,9 @@ def test_start_as_current_observation_types(self, langfuse_client, memory_export
                 LangfuseOtelSpanAttributes.OBSERVATION_TYPE
             )
 
-            assert (
-                actual_type == expected_otel_type
-            ), f"Expected observation type {expected_otel_type}, got {actual_type}"
+            assert actual_type == expected_otel_type, (
+                f"Expected observation type {expected_otel_type}, got {actual_type}"
+            )
 
     def test_start_observation(self, langfuse_client, memory_exporter):
         """Test creating different observation types using start_observation."""
@@ -690,81 +690,81 @@ def test_start_observation(self, langfuse_client, memory_exporter):
         for obs_type in observation_types:
             expected_name = f"factory-{obs_type}"
             matching_spans = [span for span in spans if span["name"] == expected_name]
-            assert (
-                len(matching_spans) == 1
-            ), f"Expected one span with name {expected_name}, found {len(matching_spans)}"
+            assert len(matching_spans) == 1, (
+                f"Expected one span with name {expected_name}, found {len(matching_spans)}"
+            )
             span_data = matching_spans[0]
 
             actual_type = span_data["attributes"].get(
                 LangfuseOtelSpanAttributes.OBSERVATION_TYPE
             )
 
-            assert (
-                actual_type == obs_type
-            ), f"Factory pattern failed: Expected observation type {obs_type}, got {actual_type}"
+            assert actual_type == obs_type, (
+                f"Factory pattern failed: Expected observation type {obs_type}, got {actual_type}"
+            )
 
         # Ensure returned objects are of correct types
         for obs_type, obs_instance in created_observations:
             if obs_type == "span":
                 from langfuse._client.span import LangfuseSpan
 
-                assert isinstance(
-                    obs_instance, LangfuseSpan
-                ), f"Expected LangfuseSpan, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseSpan), (
+                    f"Expected LangfuseSpan, got {type(obs_instance)}"
+                )
             elif obs_type == "generation":
                 from langfuse._client.span import LangfuseGeneration
 
-                assert isinstance(
-                    obs_instance, LangfuseGeneration
-                ), f"Expected LangfuseGeneration, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseGeneration), (
+                    f"Expected LangfuseGeneration, got {type(obs_instance)}"
+                )
             elif obs_type == "agent":
                 from langfuse._client.span import LangfuseAgent
 
-                assert isinstance(
-                    obs_instance, LangfuseAgent
-                ), f"Expected LangfuseAgent, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseAgent), (
+                    f"Expected LangfuseAgent, got {type(obs_instance)}"
+                )
            elif obs_type == "tool":
                 from langfuse._client.span import LangfuseTool
 
-                assert isinstance(
-                    obs_instance, LangfuseTool
-                ), f"Expected LangfuseTool, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseTool), (
+                    f"Expected LangfuseTool, got {type(obs_instance)}"
+                )
            elif obs_type == "chain":
                 from langfuse._client.span import LangfuseChain
 
-                assert isinstance(
-                    obs_instance, LangfuseChain
-                ), f"Expected LangfuseChain, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseChain), (
+                    f"Expected LangfuseChain, got {type(obs_instance)}"
+                )
            elif obs_type == "retriever":
                 from langfuse._client.span import LangfuseRetriever
 
-                assert isinstance(
-                    obs_instance, LangfuseRetriever
-                ), f"Expected LangfuseRetriever, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseRetriever), (
+                    f"Expected LangfuseRetriever, got {type(obs_instance)}"
+                )
            elif obs_type == "evaluator":
                 from langfuse._client.span import LangfuseEvaluator
 
-                assert isinstance(
-                    obs_instance, LangfuseEvaluator
-                ), f"Expected LangfuseEvaluator, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseEvaluator), (
+                    f"Expected LangfuseEvaluator, got {type(obs_instance)}"
+                )
            elif obs_type == "embedding":
                 from langfuse._client.span import LangfuseEmbedding
 
-                assert isinstance(
-                    obs_instance, LangfuseEmbedding
-                ), f"Expected LangfuseEmbedding, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseEmbedding), (
+                    f"Expected LangfuseEmbedding, got {type(obs_instance)}"
+                )
            elif obs_type == "guardrail":
                 from langfuse._client.span import LangfuseGuardrail
 
-                assert isinstance(
-                    obs_instance, LangfuseGuardrail
-                ), f"Expected LangfuseGuardrail, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseGuardrail), (
+                    f"Expected LangfuseGuardrail, got {type(obs_instance)}"
+                )
            elif obs_type == "event":
                 from langfuse._client.span import LangfuseEvent
 
-                assert isinstance(
-                    obs_instance, LangfuseEvent
-                ), f"Expected LangfuseEvent, got {type(obs_instance)}"
+                assert isinstance(obs_instance, LangfuseEvent), (
+                    f"Expected LangfuseEvent, got {type(obs_instance)}"
+                )
 
     def test_custom_trace_id(self, langfuse_client, memory_exporter):
         """Test setting a custom trace ID."""
@@ -785,9 +785,9 @@ def test_custom_trace_id(self, langfuse_client, memory_exporter):
         assert len(spans) == 1, "Expected one span"
         span_data = spans[0]
 
-        assert (
-            span_data["trace_id"] == custom_trace_id
-        ), "Trace ID doesn't match custom ID"
+        assert span_data["trace_id"] == custom_trace_id, (
+            "Trace ID doesn't match custom ID"
+        )
         assert span_data["attributes"][LangfuseOtelSpanAttributes.AS_ROOT] is True
 
         # Test additional spans with the same trace context
@@ -799,9 +799,9 @@ def test_custom_trace_id(self, langfuse_client, memory_exporter):
         # Verify child span uses the same trace ID
         child_spans = self.get_spans_by_name(memory_exporter, "child-span")
         assert len(child_spans) == 1, "Expected one child span"
-        assert (
-            child_spans[0]["trace_id"] == custom_trace_id
-        ), "Child span has wrong trace ID"
+        assert child_spans[0]["trace_id"] == custom_trace_id, (
+            "Child span has wrong trace ID"
+        )
 
     def test_custom_parent_span_id(self, langfuse_client, memory_exporter):
         """Test setting a custom parent span ID."""
@@ -1115,9 +1115,9 @@ def test_non_error_levels_dont_set_otel_status(
         from opentelemetry.trace.status import StatusCode
 
         # Default status should be UNSET, not ERROR
-        assert (
-            raw_span.status.status_code != StatusCode.ERROR
-        ), f"Level {level} should not set ERROR status"
+        assert raw_span.status.status_code != StatusCode.ERROR, (
+            f"Level {level} should not set ERROR status"
+        )
 
     def test_multiple_error_updates(self, langfuse_client, memory_exporter):
         """Test that multiple ERROR level updates work correctly."""
@@ -1206,12 +1206,12 @@ def test_different_observation_types_error_handling(
             raw_span = obs_spans[0]
 
             from opentelemetry.trace.status import StatusCode
-            assert (
-                raw_span.status.status_code == StatusCode.ERROR
-            ), f"{obs_type} should have ERROR status"
-            assert (
-                raw_span.status.description == f"{obs_type} failed"
-            ), f"{obs_type} should have correct description"
+            assert raw_span.status.status_code == StatusCode.ERROR, (
+                f"{obs_type} should have ERROR status"
+            )
+            assert raw_span.status.description == f"{obs_type} failed", (
+                f"{obs_type} should have correct description"
+            )
 
 
 class TestAdvancedSpans(TestOTelBase):
@@ -1387,9 +1387,9 @@ def test_sampling(self, monkeypatch, tracer_provider, mock_processor_init):
         span.end()
 
         # With a sample rate of 0, we should have no spans
-        assert (
-            len(sampled_exporter.get_finished_spans()) == 0
-        ), "Expected no spans with 0 sampling"
+        assert len(sampled_exporter.get_finished_spans()) == 0, (
+            "Expected no spans with 0 sampling"
+        )
 
         # Restore the original provider
         trace_api.set_tracer_provider(original_provider)
@@ -1445,9 +1445,9 @@ def test_disabled_tracing(self, monkeypatch, tracer_provider, mock_processor_ini
 
         # Verify no spans were created
         spans = exporter.get_finished_spans()
-        assert (
-            len(spans) == 0
-        ), f"Expected no spans when tracing is disabled, got {len(spans)}"
+        assert len(spans) == 0, (
+            f"Expected no spans when tracing is disabled, got {len(spans)}"
+        )
 
     def test_trace_id_generation(self, langfuse_client):
         """Test trace ID generation follows expected format."""
@@ -1456,12 +1456,12 @@ def test_trace_id_generation(self, langfuse_client):
         trace_id2 = langfuse_client.create_trace_id()
 
         # Verify format: 32 hex characters
-        assert (
-            len(trace_id1) == 32
-        ), f"Trace ID length should be 32, got {len(trace_id1)}"
-        assert (
-            len(trace_id2) == 32
-        ), f"Trace ID length should be 32, got {len(trace_id2)}"
+        assert len(trace_id1) == 32, (
+            f"Trace ID length should be 32, got {len(trace_id1)}"
+        )
+        assert len(trace_id2) == 32, (
+            f"Trace ID length should be 32, got {len(trace_id2)}"
+        )
 
         # jerify it's a valid hex string
         int(trace_id1, 16), "Trace ID should be a valid hex string"
@@ -2752,14 +2752,14 @@ def thread3_function():
         assert thread3_span["trace_id"] == trace_id
 
         # Verify thread2 span is at the root level (no parent within our trace)
-        assert (
-            thread2_span["attributes"][LangfuseOtelSpanAttributes.AS_ROOT] is True
-        ), "Thread 2 span should not have a parent"
+        assert thread2_span["attributes"][LangfuseOtelSpanAttributes.AS_ROOT] is True, (
+            "Thread 2 span should not have a parent"
+        )
 
         # Verify thread3 span is a child of the main span
-        assert (
-            thread3_span["parent_span_id"] == main_span_id
-        ), "Thread 3 span should be a child of main span"
+        assert thread3_span["parent_span_id"] == main_span_id, (
+            "Thread 3 span should be a child of main span"
+        )
 
     @pytest.mark.asyncio
     async def test_span_metadata_updates_in_async_context(
@@ -2918,12 +2918,12 @@ def test_metrics_and_timing(self, langfuse_client, memory_exporter):
 
         # The span timing should be within our manually recorded range
        # Note: This might fail on slow systems, so we use a relaxed comparison
-        assert (
-            span_start_seconds <= end_time
-        ), "Span start time should be before our recorded end time"
-        assert (
-            span_end_seconds >= start_time
-        ), "Span end time should be after our recorded start time"
+        assert span_start_seconds <= end_time, (
+            "Span start time should be before our recorded end time"
+        )
+        assert span_end_seconds >= start_time, (
+            "Span end time should be after our recorded start time"
+        )
 
         # Span duration should be positive and roughly match our sleep time
         span_duration_seconds = (
@@ -2933,9 +2933,9 @@ def test_metrics_and_timing(self, langfuse_client, memory_exporter):
 
         # Since we slept for 0.1 seconds, the span duration should be at least 0.05 seconds
         # but we'll be generous with the upper bound due to potential system delays
-        assert (
-            span_duration_seconds >= 0.05
-        ), f"Span duration ({span_duration_seconds}s) should be at least 0.05s"
+        assert span_duration_seconds >= 0.05, (
+            f"Span duration ({span_duration_seconds}s) should be at least 0.05s"
+        )
 
 
 # Add tests for media functionality in its own class
@@ -3129,9 +3129,9 @@ def mask_sensitive_data(data):
         # Run all test cases
         for i, test_case in enumerate(test_cases):
             result = mask_sensitive_data(test_case["input"])
-            assert (
-                result == test_case["expected"]
-            ), f"Test case {i} failed: {result} != {test_case['expected']}"
+            assert result == test_case["expected"], (
+                f"Test case {i} failed: {result} != {test_case['expected']}"
+            )
 
         # Now test using the actual LangfuseSpan implementation
         from unittest.mock import MagicMock
diff --git a/tests/test_prompt_atexit.py b/tests/test_prompt_atexit.py
index 9f8838adb..2eac27ceb 100644
--- a/tests/test_prompt_atexit.py
+++ b/tests/test_prompt_atexit.py
@@ -50,9 +50,9 @@ def wait_2_sec():
     print(process.stderr)
 
     shutdown_count = logs.count("Shutdown of prompt refresh task manager completed.")
-    assert (
-        shutdown_count == 1
-    ), f"Expected 1 shutdown messages, but found {shutdown_count}"
+    assert shutdown_count == 1, (
+        f"Expected 1 shutdown messages, but found {shutdown_count}"
+    )
 
 
 @pytest.mark.timeout(10)
@@ -114,6 +114,6 @@ async def run_multiple_mains():
     print(process.stderr)
 
     shutdown_count = logs.count("Shutdown of prompt refresh task manager completed.")
-    assert (
-        shutdown_count == 3
-    ), f"Expected 3 shutdown messages, but found {shutdown_count}"
+    assert shutdown_count == 3, (
+        f"Expected 3 shutdown messages, but found {shutdown_count}"
+    )
diff --git a/tests/test_propagate_attributes.py b/tests/test_propagate_attributes.py
index 16a960c1f..affa84dd2 100644
--- a/tests/test_propagate_attributes.py
+++ b/tests/test_propagate_attributes.py
@@ -68,9 +68,9 @@ def verify_missing_attribute(self, span_data: dict, attr_key: str):
             AssertionError: If the attribute exists on the span
         """
         attributes = span_data["attributes"]
-        assert (
-            attr_key not in attributes
-        ), f"Attribute '{attr_key}' should NOT be on span '{span_data['name']}'"
+        assert attr_key not in attributes, (
+            f"Attribute '{attr_key}' should NOT be on span '{span_data['name']}'"
+        )
 
 
 class TestPropagateAttributesBasic(TestPropagateAttributesBase):