11 changes: 9 additions & 2 deletions src/fi/opt/base/base_optimizer.py
@@ -30,9 +30,16 @@ def optimize(
data_mapper: The user-provided data mapper.
dataset: The dataset to use for evaluation.
metric: The metric function to use for evaluation.
**kwargs: Additional, optimizer-specific arguments.
**kwargs: Additional, optimizer-specific arguments. Common optional
arguments include:
- early_stopping (EarlyStoppingConfig): Configuration for early
stopping criteria. Supports patience-based stopping, score
thresholds, minimum improvement deltas, and cost budgets.
When configured, optimization may terminate before reaching
the maximum number of iterations.

Returns:
An OptimizationResult object with the best generator and results.
An OptimizationResult object with the best generator, iteration
history, final score, and early stopping metadata (if applicable).
"""
pass
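
Since the docstring above only names the early-stopping criteria in prose, here is a minimal caller-side sketch of the intended usage. The EarlyStoppingConfig field names are assumptions based on the docstring (the class itself lives in src/fi/opt/utils/early_stopping.py, which is not part of this diff), and optimizer/data_mapper/dataset/metric stand for whatever objects the caller already has:

    from fi.opt.utils.early_stopping import EarlyStoppingConfig

    # Field names below are illustrative; the real ones are defined in
    # src/fi/opt/utils/early_stopping.py, outside this diff.
    stopping = EarlyStoppingConfig(
        patience=3,             # assumed: rounds without improvement before stopping
        min_delta=0.01,         # assumed: smallest score gain that counts as improvement
        score_threshold=0.95,   # assumed: stop as soon as this score is reached
    )

    result = optimizer.optimize(
        data_mapper=data_mapper,
        dataset=dataset,
        metric=metric,
        early_stopping=stopping,   # the only argument new in this PR
    )
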
33 changes: 32 additions & 1 deletion src/fi/opt/optimizers/bayesian_search.py
@@ -10,6 +10,7 @@
from ..datamappers import BasicDataMapper
from ..generators.litellm import LiteLLMGenerator
from ..base.evaluator import Evaluator
from ..utils.early_stopping import EarlyStoppingConfig, EarlyStoppingChecker


TEACHER_SYSTEM_PROMPT = (
@@ -154,10 +155,17 @@ def optimize(
data_mapper: BasicDataMapper,
dataset: List[Dict[str, Any]],
initial_prompts: List[str],
early_stopping: Optional[EarlyStoppingConfig] = None,
**kwargs: Any,
) -> OptimizationResult:
logging.info("--- Starting Bayesian Search Optimization ---")

# Initialize early stopping checker
checker = None
if early_stopping and early_stopping.is_enabled():
checker = EarlyStoppingChecker(early_stopping)
logging.info(f"Early stopping enabled: {early_stopping}")

if not initial_prompts:
raise ValueError("Initial prompts list cannot be empty.")

@@ -228,6 +236,16 @@ def objective(trial: optuna.Trial) -> float:
logging.info(
f"Trial {trial.number}: Score={avg_score:.4f}, Num Examples={len(selected_indices)}"
)

# Check early stopping
if checker:
eval_size = len(self._select_eval_subset(dataset))
if checker.should_stop(avg_score, eval_size):
logging.info(
f"Early stopping triggered: {checker.get_state()['stop_reason']}"
)
trial.study.stop()

return avg_score

study = optuna.create_study(
@@ -238,17 +256,30 @@ def objective(trial: optuna.Trial) -> float:
study_name=self.study_name,
load_if_exists=bool(self.storage and self.study_name),
)
study.optimize(objective, n_trials=self.n_trials)

try:
study.optimize(objective, n_trials=self.n_trials)
except Exception as e:
logging.info(f"Optimization stopped: {e}")

best_prompt = study.best_trial.user_attrs.get("prompt", initial_prompt)
best_generator = LiteLLMGenerator(self.inference_model_name, best_prompt)

# Build result with early stopping metadata
return OptimizationResult(
best_generator=best_generator,
history=history,
final_score=float(study.best_value)
if study.best_value is not None
else 0.0,
early_stopped=checker.get_state()["stopped"] if checker else False,
stop_reason=checker.get_state()["stop_reason"] if checker else None,
total_iterations=len(history),
total_evaluations=(
checker.get_state()["total_evaluations"]
if checker
else sum(len(h.individual_results) for h in history)
),
)

def _score_prompt(
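
The diff only shows the consumer side of the checker. Pieced together from the call sites above, the assumed surface is roughly the sketch below; the internal criteria are illustrative, and the real implementation in src/fi/opt/utils/early_stopping.py may differ:

    from dataclasses import dataclass
    from typing import Optional


    @dataclass
    class EarlyStoppingConfig:
        # Field names are assumptions; only is_enabled() is implied by the diff.
        patience: Optional[int] = None            # rounds without improvement before stopping
        score_threshold: Optional[float] = None   # stop once a score reaches this value
        min_delta: float = 0.0                    # gains smaller than this do not reset patience
        max_evaluations: Optional[int] = None     # simple evaluation/cost budget

        def is_enabled(self) -> bool:
            return any(
                v is not None
                for v in (self.patience, self.score_threshold, self.max_evaluations)
            )


    class EarlyStoppingChecker:
        # should_stop(score, num_evaluations) and get_state() with the keys
        # "stopped", "stop_reason", "total_evaluations" are the parts the diff relies on.
        def __init__(self, config: EarlyStoppingConfig) -> None:
            self.config = config
            self.best_score = float("-inf")
            self.rounds_without_improvement = 0
            self.state = {"stopped": False, "stop_reason": None, "total_evaluations": 0}

        def should_stop(self, score: float, num_evaluations: int) -> bool:
            self.state["total_evaluations"] += num_evaluations
            if score > self.best_score + self.config.min_delta:
                self.best_score = score
                self.rounds_without_improvement = 0
            else:
                self.rounds_without_improvement += 1

            if self.config.score_threshold is not None and score >= self.config.score_threshold:
                self.state.update(stopped=True, stop_reason="score threshold reached")
            elif (self.config.patience is not None
                  and self.rounds_without_improvement >= self.config.patience):
                self.state.update(stopped=True, stop_reason="patience exhausted")
            elif (self.config.max_evaluations is not None
                  and self.state["total_evaluations"] >= self.config.max_evaluations):
                self.state.update(stopped=True, stop_reason="evaluation budget exhausted")
            return self.state["stopped"]

        def get_state(self) -> dict:
            return self.state
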
123 changes: 94 additions & 29 deletions src/fi/opt/optimizers/gepa.py
@@ -16,6 +16,7 @@
from ..base.evaluator import Evaluator
from ..generators.litellm import LiteLLMGenerator
from ..types import OptimizationResult, IterationHistory
from ..utils.early_stopping import EarlyStoppingConfig, EarlyStoppingChecker

logger = logging.getLogger(__name__)

@@ -32,11 +33,13 @@ def __init__(
evaluator: Evaluator,
data_mapper: BasicDataMapper,
history_list: List[IterationHistory],
early_stopping_checker: Optional[EarlyStoppingChecker] = None,
):
self.generator_model = generator_model
self.evaluator = evaluator
self.data_mapper = data_mapper
self.history_list = history_list
self.early_stopping_checker = early_stopping_checker
logger.info(f"Initialized with generator_model: {generator_model}")

def evaluate(
@@ -100,6 +103,17 @@ def evaluate(
)
)

# Check early stopping
if self.early_stopping_checker:
if self.early_stopping_checker.should_stop(avg_score, len(batch)):
logger.info(
f"Early stopping triggered: "
f"{self.early_stopping_checker.get_state()['stop_reason']}"
)
raise StopIteration(
self.early_stopping_checker.get_state()["stop_reason"]
)

trajectories = []
if capture_traces:
logger.info(f"Capturing traces.")
@@ -189,13 +203,20 @@ def optimize(
dataset: List[Dict[str, Any]],
initial_prompts: List[str],
max_metric_calls: Optional[int] = 150,
early_stopping: Optional[EarlyStoppingConfig] = None,
) -> OptimizationResult:
opt_start_time = time.time()
logger.info("--- Starting GEPA Prompt Optimization ---")
logger.info(f"Dataset size: {len(dataset)}")
logger.info(f"Initial prompts: {initial_prompts}")
logger.info(f"Max metric calls: {max_metric_calls}")

# Initialize early stopping checker
checker = None
if early_stopping and early_stopping.is_enabled():
checker = EarlyStoppingChecker(early_stopping)
logger.info(f"Early stopping enabled: {early_stopping}")

if not initial_prompts:
raise ValueError("Initial prompts list cannot be empty for GEPAOptimizer.")
history: List[IterationHistory] = []
@@ -206,6 +227,7 @@
evaluator=evaluator,
data_mapper=data_mapper,
history_list=history,
early_stopping_checker=checker,
)

# 2. Prepare the inputs for gepa.optimize
@@ -215,38 +237,81 @@
# 3. Call the external GEPA library's optimize function
logger.info("Calling gepa.optimize...")
gepa_start_time = time.time()
gepa_result = gepa.optimize(
seed_candidate=seed_candidate,
trainset=dataset,
valset=dataset,
adapter=adapter,
reflection_lm=self.reflection_model,
max_metric_calls=max_metric_calls,
display_progress_bar=True,
)
gepa_end_time = time.time()
logger.info(
f"gepa.optimize finished in {gepa_end_time - gepa_start_time:.2f}s."
)
logger.info(
f"GEPA result best score: {gepa_result.val_aggregate_scores[gepa_result.best_idx]}"
)
logger.info(f"GEPA best candidate: {gepa_result.best_candidate}")

logger.info(f"Captured {len(history)} iterations in history.")
# 4. Translate GEPA's result back into our framework's standard format
logger.info("Translating GEPA result to OptimizationResult...")
try:
gepa_result = gepa.optimize(
seed_candidate=seed_candidate,
trainset=dataset,
valset=dataset,
adapter=adapter,
reflection_lm=self.reflection_model,
max_metric_calls=max_metric_calls,
display_progress_bar=True,
)
gepa_end_time = time.time()
logger.info(
f"gepa.optimize finished in {gepa_end_time - gepa_start_time:.2f}s."
)
logger.info(
f"GEPA result best score: {gepa_result.val_aggregate_scores[gepa_result.best_idx]}"
)
logger.info(f"GEPA best candidate: {gepa_result.best_candidate}")

final_best_generator = LiteLLMGenerator(
model=self.generator_model,
prompt_template=gepa_result.best_candidate.get("prompt", ""),
)
logger.info(f"Captured {len(history)} iterations in history.")
# 4. Translate GEPA's result back into our framework's standard format
logger.info("Translating GEPA result to OptimizationResult...")

result = OptimizationResult(
best_generator=final_best_generator,
history=history,
final_score=gepa_result.val_aggregate_scores[gepa_result.best_idx],
)
final_best_generator = LiteLLMGenerator(
model=self.generator_model,
prompt_template=gepa_result.best_candidate.get("prompt", ""),
)

# Build result with early stopping metadata
result = OptimizationResult(
best_generator=final_best_generator,
history=history,
final_score=gepa_result.val_aggregate_scores[gepa_result.best_idx],
early_stopped=False,
stop_reason=None,
total_iterations=len(history),
total_evaluations=(
checker.get_state()["total_evaluations"]
if checker
else sum(len(h.individual_results) for h in history)
),
)

except StopIteration as e:
gepa_end_time = time.time()
logger.info(
f"GEPA stopped early after {gepa_end_time - gepa_start_time:.2f}s: {e}"
)

# Use best from history
if not history:
raise RuntimeError(
"Early stopping triggered before any evaluations completed"
)

best_history = max(history, key=lambda h: h.average_score)
final_best_generator = LiteLLMGenerator(
model=self.generator_model,
prompt_template=best_history.prompt,
)

result = OptimizationResult(
best_generator=final_best_generator,
history=history,
final_score=best_history.average_score,
early_stopped=True,
stop_reason=str(e),
total_iterations=len(history),
total_evaluations=(
checker.get_state()["total_evaluations"]
if checker
else sum(len(h.individual_results) for h in history)
),
)

opt_end_time = time.time()
logger.info(
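
One inference about the control flow in gepa.py (the diff does not spell it out): because the iteration loop lives inside the external gepa library, the adapter cannot simply break; the patch raises StopIteration from evaluate() and catches it around gepa.optimize(), then falls back to the best entry in the captured history. The essence of that flow, with stand-in names and the checker sketch from above:

    # drive() plays the role of gepa.optimize; scores and patience are fake values.
    checker = EarlyStoppingChecker(EarlyStoppingConfig(patience=2))
    seen_scores: list = []

    def drive(evaluate, rounds=20):
        for _ in range(rounds):
            evaluate(score=0.5, batch_size=4)   # the library keeps calling the adapter...

    def evaluate(score, batch_size):
        seen_scores.append(score)
        if checker.should_stop(score, batch_size):
            # ...until the adapter unwinds the whole library call with StopIteration.
            raise StopIteration(checker.get_state()["stop_reason"])

    try:
        drive(evaluate)
    except StopIteration as stop:
        print(f"stopped early ({stop}); best score seen so far: {max(seen_scores)}")
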
27 changes: 27 additions & 0 deletions src/fi/opt/optimizers/metaprompt.py
@@ -10,6 +10,7 @@
from ..base.evaluator import Evaluator
from ..generators.litellm import LiteLLMGenerator
from ..types import IterationHistory, OptimizationResult
from ..utils.early_stopping import EarlyStoppingConfig, EarlyStoppingChecker
import logging

logger = logging.getLogger(__name__)
@@ -89,9 +90,16 @@ def optimize(
task_description: str = "I want to improve my prompt.",
num_rounds: Optional[int] = 5,
eval_subset_size: Optional[int] = 40,
early_stopping: Optional[EarlyStoppingConfig] = None,
) -> OptimizationResult:
logger.info("--- Starting Meta-Prompt Optimization ---")

# Initialize early stopping checker
checker = None
if early_stopping and early_stopping.is_enabled():
checker = EarlyStoppingChecker(early_stopping)
logger.info(f"Early stopping enabled: {early_stopping}")

if not initial_prompts:
raise ValueError("Initial prompts list cannot be empty.")

@@ -125,6 +133,15 @@ def optimize(
best_prompt = current_prompt
logger.info(f"New best score found: {best_score:.4f}")

# Check early stopping
if checker:
num_evals = len(eval_subset)
if checker.should_stop(current_score, num_evals):
logger.info(
f"Early stopping triggered: {checker.get_state()['stop_reason']}"
)
break

# 2. Use the teacher model to generate a new, improved prompt
annotated_results_str = self._format_results(iteration_history, eval_subset)

@@ -157,10 +174,20 @@
)

final_best_generator = LiteLLMGenerator(self.teacher.model_name, best_prompt)

# Build result with early stopping metadata
return OptimizationResult(
best_generator=final_best_generator,
history=history,
final_score=best_score,
early_stopped=checker.get_state()["stopped"] if checker else False,
stop_reason=checker.get_state()["stop_reason"] if checker else None,
total_iterations=len(history),
total_evaluations=(
checker.get_state()["total_evaluations"]
if checker
else sum(len(h.individual_results) for h in history)
),
)

def _score_prompt(
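
Finally, all three optimizers now populate the same early-stopping metadata on OptimizationResult, so callers can inspect it uniformly. A small consumption sketch, where result is whatever any of the optimize() calls above returned:

    # result: an OptimizationResult returned by any optimizer touched in this PR.
    if result.early_stopped:
        print(f"Stopped early after {result.total_iterations} iteration(s): {result.stop_reason}")
    else:
        print(f"Ran to completion over {result.total_iterations} iteration(s).")

    print(f"Best score: {result.final_score:.4f} "
          f"({result.total_evaluations} individual evaluations)")
    best_generator = result.best_generator   # ready to use for inference
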