1 change: 0 additions & 1 deletion contributing/samples/gepa/experiment.py
@@ -43,7 +43,6 @@
from tau_bench.types import EnvRunResult
from tau_bench.types import RunConfig
import tau_bench_agent as tau_bench_agent_lib

import utils


1 change: 0 additions & 1 deletion contributing/samples/gepa/run_experiment.py
@@ -25,7 +25,6 @@
from absl import flags
import experiment
from google.genai import types

import utils

_OUTPUT_DIR = flags.DEFINE_string(
125 changes: 125 additions & 0 deletions demo_parallel_performance.py
@@ -0,0 +1,125 @@
"""
Performance demonstration for parallel LLM-as-judge evaluation.

This script demonstrates the performance improvement from parallelizing
LLM evaluation calls using asyncio.gather().
"""

from __future__ import annotations

import asyncio
from collections import defaultdict
import time
from typing import Optional
Review comment (Contributor), severity: medium:
The Optional type is imported from typing but is not used within this file. It's a good practice to remove unused imports to maintain code cleanliness.
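A minimal fix along these lines (a sketch, assuming Optional is not referenced anywhere else in demo_parallel_performance.py) would simply drop the unused name, so the import block reads:

from __future__ import annotations

import asyncio
from collections import defaultdict
import time

from google.genai import types as genai_types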


from google.genai import types as genai_types


# Simulated LLM call with artificial delay
async def mock_llm_call(delay: float = 0.5):
"""Simulates an LLM API call with specified delay."""
await asyncio.sleep(delay)
return genai_types.Content(
parts=[genai_types.Part(text="Mock LLM response")],
role="model",
)


async def serial_evaluation(
num_invocations: int, num_samples: int, delay: float
):
"""Simulates the OLD serial evaluation approach."""
results = []
for i in range(num_invocations):
invocation_samples = []
for j in range(num_samples):
response = await mock_llm_call(delay)
invocation_samples.append(response)
results.append(invocation_samples)
return results


async def parallel_evaluation(
num_invocations: int, num_samples: int, delay: float
):
"""Simulates the NEW parallel evaluation approach."""
tasks = []
invocation_indices = []

# Create all N×M tasks
for i in range(num_invocations):
for j in range(num_samples):
tasks.append(mock_llm_call(delay))
invocation_indices.append(i)

# Execute in parallel
all_results = await asyncio.gather(*tasks)

# Group by invocation
results_by_invocation = defaultdict(list)
for idx, result in zip(invocation_indices, all_results):
results_by_invocation[idx].append(result)

return [
results_by_invocation[i] for i in sorted(results_by_invocation.keys())
]


async def main():
"""Run performance comparison."""
num_invocations = 5
num_samples = 2
delay = 0.5 # 500ms per call

print("=" * 60)
print("LLM-as-Judge Parallel Evaluation Performance Test")
print("=" * 60)
print(f"Configuration:")
print(f" - Invocations: {num_invocations}")
print(f" - Samples per invocation: {num_samples}")
print(f" - Total LLM calls: {num_invocations * num_samples}")
print(f" - Simulated delay per call: {delay}s")
print()

# Test serial approach
print("Testing SERIAL approach (old)...")
start_time = time.perf_counter()
serial_results = await serial_evaluation(num_invocations, num_samples, delay)
serial_time = time.perf_counter() - start_time
print(f"✓ Completed in {serial_time:.2f}s")
print()

# Test parallel approach
print("Testing PARALLEL approach (new)...")
start_time = time.perf_counter()
parallel_results = await parallel_evaluation(
num_invocations, num_samples, delay
)
parallel_time = time.perf_counter() - start_time
print(f"✓ Completed in {parallel_time:.2f}s")
print()

# Calculate speedup
speedup = serial_time / parallel_time
time_saved = serial_time - parallel_time

print("=" * 60)
print("RESULTS")
print("=" * 60)
print(f"Serial time: {serial_time:.2f}s")
print(f"Parallel time: {parallel_time:.2f}s")
print(f"Speedup: {speedup:.2f}x faster")
print(
f"Time saved: {time_saved:.2f}s ({time_saved/serial_time*100:.1f}%)"
)
print("=" * 60)

# Verify results are the same
assert len(serial_results) == len(parallel_results)
for i in range(len(serial_results)):
assert len(serial_results[i]) == len(parallel_results[i])
print("✓ Results verified: both approaches produce same output structure")


if __name__ == "__main__":
asyncio.run(main())
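Usage note: running python demo_parallel_performance.py with the defaults above issues 5 * 2 = 10 mock calls of 0.5 s each, so the serial pass should take roughly 10 * 0.5 = 5 s while the parallel pass overlaps all ten calls and finishes in roughly 0.5 s, i.e. close to a 10x speedup. These figures are estimates derived from the configured delays, not measured output; actual timings will vary slightly with event-loop overhead.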
99 changes: 71 additions & 28 deletions src/google/adk/evaluation/llm_as_judge.py
@@ -15,6 +15,8 @@
from __future__ import annotations

from abc import abstractmethod
import asyncio
from collections import defaultdict
from typing import Optional

from google.genai import types as genai_types
@@ -33,7 +35,7 @@
from .eval_case import Invocation
from .eval_metrics import BaseCriterion
from .eval_metrics import EvalMetric
from .eval_metrics import RubricScore
from .eval_rubrics import RubricScore
from .evaluator import EvaluationResult
from .evaluator import Evaluator
from .evaluator import PerInvocationResult
@@ -114,6 +116,44 @@ def aggregate_invocation_results(
) -> EvaluationResult:
"""Aggregates the per invocation results to get the overall score."""

async def _evaluate_single_sample(
self,
llm_request: LlmRequest,
actual: Invocation,
expected: Optional[Invocation],
) -> PerInvocationResult:
"""Evaluates a single sample for an invocation.

Args:
llm_request: The LLM request to execute.
actual: The actual invocation to evaluate.
expected: The expected invocation (optional).

Returns:
A PerInvocationResult containing the evaluation score and status.
"""
async with Aclosing(
self._judge_model.generate_content_async(llm_request)
) as agen:
async for llm_response in agen:
# Non-streaming call, so there is only one response content.
auto_rater_score = self.convert_auto_rater_response_to_score(
llm_response
)
return PerInvocationResult(
actual_invocation=actual,
expected_invocation=expected,
score=auto_rater_score.score,
eval_status=get_eval_status(
auto_rater_score.score, self._eval_metric.threshold
),
rubric_scores=auto_rater_score.rubric_scores,
)
# If we reach here, the LLM didn't return any response
raise RuntimeError(
"LLM evaluation failed: no response received from judge model"
)

@override
async def evaluate_invocations(
self,
@@ -133,8 +173,13 @@ async def evaluate_invocations(
else expected_invocations
)

per_invocation_results = []
for actual, expected in zip(actual_invocations, expected_invocations):
# Build all LLM evaluation tasks for parallel execution
tasks = []
invocation_indices = [] # Track which invocation each task belongs to

for invocation_idx, (actual, expected) in enumerate(
zip(actual_invocations, expected_invocations)
):
auto_rater_prompt = self.format_auto_rater_prompt(actual, expected)
llm_request = LlmRequest(
model=self._judge_model_options.judge_model,
@@ -149,32 +194,30 @@
)
add_default_retry_options_if_not_present(llm_request)
num_samples = self._judge_model_options.num_samples
invocation_result_samples = []

# Create tasks for all samples of this invocation
for _ in range(num_samples):
async with Aclosing(
self._judge_model.generate_content_async(llm_request)
) as agen:
async for llm_response in agen:
# Non-streaming call, so there is only one response content.
auto_rater_score = self.convert_auto_rater_response_to_score(
llm_response
)
invocation_result_samples.append(
PerInvocationResult(
actual_invocation=actual,
expected_invocation=expected,
score=auto_rater_score.score,
eval_status=get_eval_status(
auto_rater_score.score, self._eval_metric.threshold
),
rubric_scores=auto_rater_score.rubric_scores,
)
)
if not invocation_result_samples:
continue
per_invocation_results.append(
self.aggregate_per_invocation_samples(invocation_result_samples)
)
tasks.append(
self._evaluate_single_sample(llm_request, actual, expected)
)
invocation_indices.append(invocation_idx)

# Execute all tasks in parallel
all_results = await asyncio.gather(*tasks)

# Group results by invocation
results_by_invocation = defaultdict(list)
for invocation_idx, result in zip(invocation_indices, all_results):
results_by_invocation[invocation_idx].append(result)

# Aggregate samples for each invocation
per_invocation_results = []
for invocation_idx in sorted(results_by_invocation.keys()):
invocation_result_samples = results_by_invocation[invocation_idx]
if invocation_result_samples:
per_invocation_results.append(
self.aggregate_per_invocation_samples(invocation_result_samples)
)

if per_invocation_results:
return self.aggregate_invocation_results(per_invocation_results)