From 8312b0f311ace05d726fd7f5b50f80953aaadc79 Mon Sep 17 00:00:00 2001 From: MashAliK <42744726+MashAliK@users.noreply.github.com> Date: Mon, 9 Jun 2025 01:29:14 -0400 Subject: [PATCH 01/11] init --- openevolve/config.py | 4 +- openevolve/controller.py | 261 +++++++++++++-------------------- openevolve/database.py | 144 +++++++++--------- openevolve/iteration.py | 144 ++++++++++++++++++ openevolve/utils/__init__.py | 1 - openevolve/utils/code_utils.py | 1 - pyproject.toml | 4 +- 7 files changed, 317 insertions(+), 242 deletions(-) create mode 100644 openevolve/iteration.py diff --git a/openevolve/config.py b/openevolve/config.py index 1b0dc6e80..71af39476 100644 --- a/openevolve/config.py +++ b/openevolve/config.py @@ -151,6 +151,7 @@ class DatabaseConfig: # Evolutionary parameters population_size: int = 1000 + allowed_population_overflow: int = 50 archive_size: int = 100 num_islands: int = 5 @@ -217,6 +218,7 @@ class Config: log_level: str = "INFO" log_dir: Optional[str] = None random_seed: Optional[int] = 42 + language: str = None # Component configurations llm: LLMConfig = field(default_factory=LLMConfig) @@ -361,4 +363,4 @@ def load_config(config_path: Optional[Union[str, Path]] = None) -> Config: # Make the system message available to the individual models, in case it is not provided from the prompt sampler config.llm.update_model_params({"system_message": config.prompt.system_message}) - return config + return config \ No newline at end of file diff --git a/openevolve/controller.py b/openevolve/controller.py index 5205561ca..20cfde6c3 100644 --- a/openevolve/controller.py +++ b/openevolve/controller.py @@ -5,25 +5,23 @@ import asyncio import logging import os +import shutil import re import time import uuid from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union import traceback +import concurrent.futures from openevolve.config import Config, load_config from openevolve.database import Program, ProgramDatabase from openevolve.evaluator import Evaluator from openevolve.llm.ensemble import LLMEnsemble from openevolve.prompt.sampler import PromptSampler +from openevolve.iteration import run_iteration_sync, Result from openevolve.utils.code_utils import ( - apply_diff, extract_code_language, - extract_diffs, - format_diff_summary, - parse_evolve_blocks, - parse_full_rewrite, ) from openevolve.utils.format_utils import ( format_metrics_safe, @@ -129,7 +127,8 @@ def __init__( # Load initial program self.initial_program_path = initial_program_path self.initial_program_code = self._load_initial_program() - self.language = extract_code_language(self.initial_program_code) + if not self.config.language: + self.config.language = extract_code_language(self.initial_program_code) # Extract file extension from initial program self.file_extension = os.path.splitext(initial_program_path)[1] @@ -162,8 +161,9 @@ def __init__( self.evaluator_prompt_sampler, database=self.database, ) + self.evaluation_file = evaluation_file - logger.info(f"Initialized OpenEvolve with {initial_program_path} " f"and {evaluation_file}") + logger.info(f"Initialized OpenEvolve with {initial_program_path}") def _setup_logging(self) -> None: """Set up logging""" @@ -236,7 +236,7 @@ async def run( initial_program = Program( id=initial_program_id, code=self.initial_program_code, - language=self.language, + language=self.config.language, metrics=initial_metrics, iteration_found=start_iteration, ) @@ -263,171 +263,112 @@ async def run( logger.info(f"Using island-based evolution with 
{self.config.database.num_islands} islands") self.database.log_island_status() - for i in range(start_iteration, total_iterations): - iteration_start = time.time() - - # Manage island evolution - switch islands periodically - if i > start_iteration and current_island_counter >= programs_per_island: - self.database.next_island() - current_island_counter = 0 - logger.debug(f"Switched to island {self.database.current_island}") - - current_island_counter += 1 - - # Sample parent and inspirations from current island - parent, inspirations = self.database.sample() - - # Get artifacts for the parent program if available - parent_artifacts = self.database.get_artifacts(parent.id) - - # Get actual top programs for prompt context (separate from inspirations) - # This ensures the LLM sees only high-performing programs as examples - actual_top_programs = self.database.get_top_programs(5) - - # Build prompt - prompt = self.prompt_sampler.build_prompt( - current_program=parent.code, - parent_program=parent.code, # We don't have the parent's code, use the same - program_metrics=parent.metrics, - previous_programs=[p.to_dict() for p in self.database.get_top_programs(3)], - top_programs=[p.to_dict() for p in actual_top_programs], # Use actual top programs - inspirations=[p.to_dict() for p in inspirations], # Pass inspirations separately - language=self.language, - evolution_round=i, - diff_based_evolution=self.config.diff_based_evolution, - program_artifacts=parent_artifacts if parent_artifacts else None, - ) - - # Generate code modification - try: - llm_response = await self.llm_ensemble.generate_with_context( - system_message=prompt["system"], - messages=[{"role": "user", "content": prompt["user"]}], + # create temp file to save database snapshots to for process workers to load from + temp_db_path = "tmp/" + str(uuid.uuid4()) + self.database.save(temp_db_path, start_iteration) + + with concurrent.futures.ProcessPoolExecutor( + max_workers=self.config.evaluator.parallel_evaluations + ) as executor: + futures = [] + for i in range(start_iteration, total_iterations): + futures.append( + executor.submit( + run_iteration_sync, i, self.config, self.evaluation_file, temp_db_path + ) ) - # Parse the response - if self.config.diff_based_evolution: - diff_blocks = extract_diffs(llm_response) - - if not diff_blocks: - logger.warning(f"Iteration {i+1}: No valid diffs found in response") + iteration = start_iteration + 1 + for future in concurrent.futures.as_completed(futures): + logger.info(f"Completed iteration {iteration}") + try: + result: Result = future.result() + # if result is nonType + if not isinstance(result, Result): + logger.warning(f"No valid diffs or program length exceeded limit") continue + # Manage island evolution - switch islands periodically + if ( + iteration - 1 > start_iteration + and current_island_counter >= programs_per_island + ): + self.database.next_island() + current_island_counter = 0 + logger.debug(f"Switched to island {self.database.current_island}") + + current_island_counter += 1 + + # Add to database (will be added to current island) + self.database.add(result.child_program, iteration=iteration) + + # Log prompts + self.database.log_prompt( + template_key=( + "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user" + ), + program_id=result.child_program.id, + prompt=result.prompt, + responses=[result.llm_response], + ) - # Apply the diffs - child_code = apply_diff(parent.code, llm_response) - changes_summary = format_diff_summary(diff_blocks) - else: - # Parse 
full rewrite - new_code = parse_full_rewrite(llm_response, self.language) - - if not new_code: - logger.warning(f"Iteration {i+1}: No valid code found in response") - continue - - child_code = new_code - changes_summary = "Full rewrite" - - # Check code length - if len(child_code) > self.config.max_code_length: - logger.warning( - f"Iteration {i+1}: Generated code exceeds maximum length " - f"({len(child_code)} > {self.config.max_code_length})" + # Store artifacts if they exist (after program is added to database) + if result.artifacts: + self.database.store_artifacts(result.child_program.id, result.artifacts) + + # Log prompts + self.database.log_prompt( + template_key=( + "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user" + ), + program_id=result.child_program.id, + prompt=result.prompt, + responses=[result.llm_response], ) - continue - # Evaluate the child program - child_id = str(uuid.uuid4()) - child_metrics = await self.evaluator.evaluate_program(child_code, child_id) - - # Handle artifacts if they exist - artifacts = self.evaluator.get_pending_artifacts(child_id) - - # Create a child program - child_program = Program( - id=child_id, - code=child_code, - language=self.language, - parent_id=parent.id, - generation=parent.generation + 1, - metrics=child_metrics, - metadata={ - "changes": changes_summary, - "parent_metrics": parent.metrics, - }, - ) + # Increment generation for current island + self.database.increment_island_generation() - # Add to database (will be added to current island) - self.database.add(child_program, iteration=i + 1) - - # Log prompts - self.database.log_prompt( - template_key=( - "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user" - ), - program_id=child_id, - prompt=prompt, - responses=[llm_response], - ) + # Check if migration should occur + if self.database.should_migrate(): + logger.info(f"Performing migration at iteration {iteration}") + self.database.migrate_programs() + self.database.log_island_status() - # Store artifacts if they exist - if artifacts: - self.database.store_artifacts(child_id, artifacts) - - # Log prompts - self.database.log_prompt( - template_key=( - "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user" - ), - program_id=child_id, - prompt=prompt, - responses=[llm_response], - ) + # Log progress + self._log_iteration( + iteration, result.parent, result.child_program, result.iteration_time + ) - # Increment generation for current island - self.database.increment_island_generation() - - # Check if migration should occur - if self.database.should_migrate(): - logger.info(f"Performing migration at iteration {i+1}") - self.database.migrate_programs() - self.database.log_island_status() - - # Log progress - iteration_time = time.time() - iteration_start - self._log_iteration(i, parent, child_program, iteration_time) - - # Specifically check if this is the new best program - if self.database.best_program_id == child_program.id: - logger.info(f"🌟 New best solution found at iteration {i+1}: {child_program.id}") - logger.info(f"Metrics: {format_metrics_safe(child_program.metrics)}") - - # Save checkpoint - if (i + 1) % self.config.checkpoint_interval == 0: - self._save_checkpoint(i + 1) - # Also log island status at checkpoints - logger.info(f"Island status at checkpoint {i+1}:") - self.database.log_island_status() - - # Check if target score reached - if target_score is not None: - # Only consider numeric metrics for target score calculation - numeric_metrics = [ - v - for 
v in child_metrics.values() - if isinstance(v, (int, float)) and not isinstance(v, bool) - ] - if numeric_metrics: - avg_score = sum(numeric_metrics) / len(numeric_metrics) + # Specifically check if this is the new best program + if self.database.best_program_id == result.child_program.id: + logger.info( + f"🌟 New best solution found at iteration {iteration}: {result.child_program.id}" + ) + logger.info(f"Metrics: {format_metrics_safe(result.child_program.metrics)}") + + # Save checkpoint + if (iteration) % self.config.checkpoint_interval == 0: + self._save_checkpoint(iteration) + # Also log island status at checkpoints + logger.info(f"Island status at checkpoint {iteration}:") + self.database.log_island_status() + + # Check if target score reached + if target_score is not None: + avg_score = sum(result["child_metrics"].values()) / max( + 1, len(result.child_metrics) + ) if avg_score >= target_score: logger.info( - f"Target score {target_score} reached after {i+1} iterations" + f"Target score {target_score} reached after {iteration} iterations" ) break + self.database.save(temp_db_path, iteration) - except Exception as e: - logger.exception(f"Error in iteration {i+1}: {str(e)}") - continue - + except Exception as e: + logger.error(f"Error in iteration {i+1}: {str(e)}") + continue + shutil.rmtree(temp_db_path) # Get the best program using our tracking mechanism best_program = None if self.database.best_program_id: @@ -607,4 +548,4 @@ def _save_best_program(self, program: Optional[Program] = None) -> None: indent=2, ) - logger.info(f"Saved best program to {code_path} with program info to {info_path}") + logger.info(f"Saved best program to {code_path} with program info to {info_path}") \ No newline at end of file diff --git a/openevolve/database.py b/openevolve/database.py index 6e2cd8492..62914a239 100644 --- a/openevolve/database.py +++ b/openevolve/database.py @@ -6,10 +6,13 @@ import json import logging import os +import shutil import random import time from dataclasses import asdict, dataclass, field, fields from pathlib import Path +from Levenshtein import ratio +from filelock import FileLock, Timeout from typing import Any, Dict, List, Optional, Set, Tuple, Union import numpy as np @@ -164,9 +167,6 @@ def add( self.programs[program.id] = program - # Enforce population size limit - self._enforce_population_limit() - # Calculate feature coordinates for MAP-Elites feature_coords = self._calculate_feature_coords(program) @@ -209,6 +209,10 @@ def add( self._save_program(program) logger.debug(f"Added program {program.id} to island {island_idx}") + + # Enforce population size limit + self._enforce_population_limit() + return program.id def get(self, program_id: str) -> Optional[Program]: @@ -350,19 +354,28 @@ def save(self, path: Optional[str] = None, iteration: int = 0) -> None: logger.warning("No database path specified, skipping save") return - # Create directory if it doesn't exist - os.makedirs(save_path, exist_ok=True) - - # Save each program - for program in self.programs.values(): - prompts = None - if ( - self.config.log_prompts - and self.prompts_by_program - and program.id in self.prompts_by_program - ): - prompts = self.prompts_by_program[program.id] - self._save_program(program, save_path, prompts=prompts) + lock_name = os.path.basename(save_path) + ".lock" + lock_path = os.path.join("tmp/locks", lock_name) + try: + with FileLock(lock_path, timeout=10): + # Create directory and remove old path if it exists + if os.path.exists(save_path): + shutil.rmtree(save_path) + 
os.makedirs(save_path) + + # Save each program + for program in self.programs.values(): + prompts = None + if ( + self.config.log_prompts + and self.prompts_by_program + and program.id in self.prompts_by_program + ): + prompts = self.prompts_by_program[program.id] + self._save_program(program, save_path, prompts=prompts) + + except Timeout: + logger.exception("Could not acquire the lock within 10 seconds") # Save metadata metadata = { @@ -411,19 +424,25 @@ def load(self, path: str) -> None: logger.info(f"Loaded database metadata with last_iteration={self.last_iteration}") # Load programs + lock_name = os.path.basename(path) + ".lock" + lock_path = os.path.join("tmp/locks", lock_name) programs_dir = os.path.join(path, "programs") - if os.path.exists(programs_dir): - for program_file in os.listdir(programs_dir): - if program_file.endswith(".json"): - program_path = os.path.join(programs_dir, program_file) - try: - with open(program_path, "r") as f: - program_data = json.load(f) - - program = Program.from_dict(program_data) - self.programs[program.id] = program - except Exception as e: - logger.warning(f"Error loading program {program_file}: {str(e)}") + try: + with FileLock(lock_path, timeout=10): + if os.path.exists(programs_dir): + for program_file in os.listdir(programs_dir): + if program_file.endswith(".json"): + program_path = os.path.join(programs_dir, program_file) + try: + with open(program_path, "r") as f: + program_data = json.load(f) + + program = Program.from_dict(program_data) + self.programs[program.id] = program + except Exception as e: + logger.warning(f"Error loading program {program_file}: {str(e)}") + except Timeout: + logger.exception("Could not acquire the lock within 10 seconds") # Reconstruct island assignments from metadata self._reconstruct_islands(saved_islands) @@ -570,7 +589,7 @@ def _calculate_feature_coords(self, program: Program) -> List[int]: if dim == "complexity": # Use code length as complexity measure complexity = len(program.code) - bin_idx = min(int(complexity / 1000 * self.feature_bins), self.feature_bins - 1) + bin_idx = min(int(complexity / 1000), self.feature_bins - 1) coords.append(bin_idx) elif dim == "diversity": # Use average edit distance to other programs @@ -580,13 +599,10 @@ def _calculate_feature_coords(self, program: Program) -> List[int]: sample_programs = random.sample( list(self.programs.values()), min(5, len(self.programs)) ) - avg_distance = sum( - calculate_edit_distance(program.code, other.code) - for other in sample_programs + avg_distance_ratio = sum( + 1 - calculate_edit_distance(program.code, other.code) for other in sample_programs ) / len(sample_programs) - bin_idx = min( - int(avg_distance / 1000 * self.feature_bins), self.feature_bins - 1 - ) + bin_idx = min(int(avg_distance_ratio * 20), self.feature_bins - 1) coords.append(bin_idx) elif dim == "score": # Use average of numeric metrics @@ -594,7 +610,7 @@ def _calculate_feature_coords(self, program: Program) -> List[int]: bin_idx = 0 else: avg_score = safe_numeric_average(program.metrics) - bin_idx = min(int(avg_score * self.feature_bins), self.feature_bins - 1) + bin_idx = max(0, min(int(avg_score * self.feature_bins), self.feature_bins - 1)) coords.append(bin_idx) elif dim in program.metrics: # Use specific metric @@ -604,7 +620,10 @@ def _calculate_feature_coords(self, program: Program) -> List[int]: else: # Default to middle bin if feature not found coords.append(self.feature_bins // 2) - + logging.info( + "MAP-Elites coords: %s", + str({self.config.feature_dimensions[i]: 
coords[i] for i in range(len(coords))}), + ) return coords def _feature_coords_to_key(self, coords: List[int]) -> str: @@ -940,7 +959,10 @@ def _enforce_population_limit(self) -> None: """ Enforce the population size limit by removing worst programs if needed """ - if len(self.programs) <= self.config.population_size: + if ( + len(self.programs) + <= self.config.population_size + self.config.allowed_population_overflow + ): return # Calculate how many programs to remove @@ -1125,7 +1147,7 @@ def _calculate_island_diversity(self, programs: List[Program]) -> float: if len(programs) < 2: return 0.0 - total_diversity = 0 + total_diversity_ratio = 0 comparisons = 0 # Use deterministic sampling instead of random.sample() to ensure consistent results @@ -1142,46 +1164,12 @@ def _calculate_island_diversity(self, programs: List[Program]) -> float: for i, prog1 in enumerate(sample_programs): for prog2 in sample_programs[i + 1 :]: - if comparisons >= max_comparisons: - break - - # Use fast approximation instead of expensive edit distance - diversity = self._fast_code_diversity(prog1.code, prog2.code) - total_diversity += diversity + total_diversity_ratio += 1 - ratio( + prog1.code, prog2.code + ) # ratio measures similarity comparisons += 1 - if comparisons >= max_comparisons: - break - - return total_diversity / max(1, comparisons) - - def _fast_code_diversity(self, code1: str, code2: str) -> float: - """ - Fast approximation of code diversity using simple metrics - - Returns diversity score (higher = more diverse) - """ - if code1 == code2: - return 0.0 - - # Length difference (scaled to reasonable range) - len1, len2 = len(code1), len(code2) - length_diff = abs(len1 - len2) - - # Line count difference - lines1 = code1.count("\n") - lines2 = code2.count("\n") - line_diff = abs(lines1 - lines2) - - # Simple character set difference - chars1 = set(code1) - chars2 = set(code2) - char_diff = len(chars1.symmetric_difference(chars2)) - - # Combine metrics (scaled to match original edit distance range) - diversity = length_diff * 0.1 + line_diff * 10 + char_diff * 0.5 - - return diversity + return total_diversity_ratio / max(1, comparisons) def log_island_status(self) -> None: """Log current status of all islands""" @@ -1388,4 +1376,4 @@ def log_prompt( if program_id not in self.prompts_by_program: self.prompts_by_program[program_id] = {} - self.prompts_by_program[program_id][template_key] = prompt + self.prompts_by_program[program_id][template_key] = prompt \ No newline at end of file diff --git a/openevolve/iteration.py b/openevolve/iteration.py new file mode 100644 index 000000000..b447055bd --- /dev/null +++ b/openevolve/iteration.py @@ -0,0 +1,144 @@ +import asyncio +import os +import uuid +import logging +import time +from dataclasses import dataclass + +from openevolve.database import Program, ProgramDatabase +from openevolve.config import Config +from openevolve.evaluator import Evaluator +from openevolve.llm.ensemble import LLMEnsemble +from openevolve.prompt.sampler import PromptSampler +from openevolve.utils.code_utils import ( + apply_diff, + extract_diffs, + format_diff_summary, + parse_full_rewrite, +) + + +@dataclass +class Result: + """Resulting program and metrics from an iteration of OpenEvolve""" + + child_program: str = None + parent: str = None + child_metrics: str = None + iteration_time: float = None + + +def run_iteration_sync(iteration: int, config: Config, evaluation_file: str, database_path: str): + # setup logger showing PID for current process + for handler in 
logging.root.handlers[:]: + logging.root.removeHandler(handler) + logging.basicConfig( + level=getattr(logging, config.log_level), + format="%(asctime)s - %(levelname)s - PID %(process)d - %(message)s", + ) + logger = logging.getLogger(__name__) + logger.info("Starting iteration in PID %s", os.getpid()) + + llm_ensemble = LLMEnsemble(config.llm.models) + llm_evaluator_ensemble = LLMEnsemble(config.llm.evaluator_models) + + prompt_sampler = PromptSampler(config.prompt) + evaluator_prompt_sampler = PromptSampler(config.prompt) + evaluator_prompt_sampler.set_templates("evaluator_system_message") + + # Pass random seed to database if specified + if config.random_seed is not None: + config.database.random_seed = config.random_seed + + # load most recent database snapshot + config.database.db_path = database_path + database = ProgramDatabase(config.database) + + evaluator = Evaluator( + config.evaluator, + evaluation_file, + llm_evaluator_ensemble, + evaluator_prompt_sampler, + ) + + # Sample parent and inspirations from current island + parent, inspirations = database.sample() + + # Build prompt + prompt = prompt_sampler.build_prompt( + current_program=parent.code, + parent_program=parent.code, # We don't have the parent's code, use the same + program_metrics=parent.metrics, + previous_programs=[p.to_dict() for p in database.get_top_programs(3)], + top_programs=[p.to_dict() for p in inspirations], + language=config.language, + evolution_round=iteration, + allow_full_rewrite=config.allow_full_rewrites, + ) + + async def _run(): + result = Result(parent=parent) + iteration_start = time.time() + + # Generate code modification + try: + llm_response = await llm_ensemble.generate_with_context( + system_message=prompt["system"], + messages=[{"role": "user", "content": prompt["user"]}], + ) + + # Parse the response + if config.diff_based_evolution: + diff_blocks = extract_diffs(llm_response) + + if not diff_blocks: + logger.warning(f"Iteration {iteration+1}: No valid diffs found in response") + return + + # Apply the diffs + child_code = apply_diff(parent.code, llm_response) + changes_summary = format_diff_summary(diff_blocks) + else: + # Parse full rewrite + new_code = parse_full_rewrite(llm_response, config.language) + + if not new_code: + logger.warning(f"Iteration {iteration+1}: No valid code found in response") + return + + child_code = new_code + changes_summary = "Full rewrite" + + # Check code length + if len(child_code) > config.max_code_length: + logger.warning( + f"Iteration {iteration+1}: Generated code exceeds maximum length " + f"({len(child_code)} > {config.max_code_length})" + ) + return + + # Evaluate the child program + child_id = str(uuid.uuid4()) + result.child_metrics = await evaluator.evaluate_program(child_code, child_id) + + # Create a child program + result.child_program = Program( + id=child_id, + code=child_code, + language=config.language, + parent_id=parent.id, + generation=parent.generation + 1, + metrics=result.child_metrics, + metadata={ + "changes": changes_summary, + "parent_metrics": parent.metrics, + }, + ) + + except Exception as e: + logger.exception("Error in PID %s:", os.getpid()) + + result.iteration_time = time.time() - iteration_start + return result + + return asyncio.run(_run()) diff --git a/openevolve/utils/__init__.py b/openevolve/utils/__init__.py index 89a4a1b62..f9c300170 100644 --- a/openevolve/utils/__init__.py +++ b/openevolve/utils/__init__.py @@ -32,7 +32,6 @@ "retry_async", "run_in_executor", "apply_diff", - "calculate_edit_distance", 
"extract_code_language", "extract_diffs", "format_diff_summary", diff --git a/openevolve/utils/code_utils.py b/openevolve/utils/code_utils.py index 60fb63001..398587b32 100644 --- a/openevolve/utils/code_utils.py +++ b/openevolve/utils/code_utils.py @@ -179,7 +179,6 @@ def calculate_edit_distance(code1: str, code2: str) -> int: return dp[m][n] - def extract_code_language(code: str) -> str: """ Try to determine the language of a code snippet diff --git a/pyproject.toml b/pyproject.toml index 94f30a35c..ee9fa7c23 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,8 @@ dependencies = [ "numpy>=1.22.0", "tqdm>=4.64.0", "flask", + "levenshtein>=0.27.1", + "filelock>=3.18.0", ] [project.optional-dependencies] @@ -45,4 +47,4 @@ disallow_untyped_defs = true disallow_incomplete_defs = true [project.scripts] -openevolve-run = "openevolve.cli:main" +openevolve-run = "openevolve.cli:main" \ No newline at end of file From 37afa09370eb87c0917250c03d7a876be8da420a Mon Sep 17 00:00:00 2001 From: SuhailB Date: Wed, 2 Jul 2025 14:51:25 -0700 Subject: [PATCH 02/11] Merge latest main into parallel-iterations fork --- openevolve/database.py | 195 ++++++++++++++++++++++++++++++++++++++++ openevolve/iteration.py | 26 +++++- 2 files changed, 218 insertions(+), 3 deletions(-) diff --git a/openevolve/database.py b/openevolve/database.py index 62914a239..042b87186 100644 --- a/openevolve/database.py +++ b/openevolve/database.py @@ -1346,6 +1346,201 @@ def _load_artifact_dir(self, artifact_dir: str) -> Dict[str, Union[str, bytes]]: return artifacts + def log_prompt( + self, + program_id: str, + template_key: str, + prompt: Dict[str, str], + responses: Optional[List[str]] = None, + ) -> None: + """ + Log a prompt for a program. + Only logs if self.config.log_prompts is True. + + Args: + program_id: ID of the program to log the prompt for + template_key: Key for the prompt template + prompt: Prompts in the format {template_key: { 'system': str, 'user': str }}. + responses: Optional list of responses to the prompt, if available. 
+ """ + + if not self.config.log_prompts: + return + + if responses is None: + responses = [] + prompt["responses"] = responses + + if self.prompts_by_program is None: + self.prompts_by_program = {} + + if program_id not in self.prompts_by_program: + self.prompts_by_program[program_id] = {} + self.prompts_by_program[program_id][template_key] = prompt + + # Artifact storage and retrieval methods + + def store_artifacts(self, program_id: str, artifacts: Dict[str, Union[str, bytes]]) -> None: + """ + Store artifacts for a program + + Args: + program_id: ID of the program + artifacts: Dictionary of artifact name to content + """ + if not artifacts: + return + + program = self.get(program_id) + if not program: + logger.warning(f"Cannot store artifacts: program {program_id} not found") + return + + # Check if artifacts are enabled + artifacts_enabled = os.environ.get("ENABLE_ARTIFACTS", "true").lower() == "true" + if not artifacts_enabled: + logger.debug("Artifacts disabled, skipping storage") + return + + # Split artifacts by size + small_artifacts = {} + large_artifacts = {} + size_threshold = getattr(self.config, "artifact_size_threshold", 32 * 1024) # 32KB default + + for key, value in artifacts.items(): + size = self._get_artifact_size(value) + if size <= size_threshold: + small_artifacts[key] = value + else: + large_artifacts[key] = value + + # Store small artifacts as JSON + if small_artifacts: + program.artifacts_json = json.dumps(small_artifacts, default=self._artifact_serializer) + logger.debug(f"Stored {len(small_artifacts)} small artifacts for program {program_id}") + + # Store large artifacts to disk + if large_artifacts: + artifact_dir = self._create_artifact_dir(program_id) + program.artifact_dir = artifact_dir + for key, value in large_artifacts.items(): + self._write_artifact_file(artifact_dir, key, value) + logger.debug(f"Stored {len(large_artifacts)} large artifacts for program {program_id}") + + def get_artifacts(self, program_id: str) -> Dict[str, Union[str, bytes]]: + """ + Retrieve all artifacts for a program + + Args: + program_id: ID of the program + + Returns: + Dictionary of artifact name to content + """ + program = self.get(program_id) + if not program: + return {} + + artifacts = {} + + # Load small artifacts from JSON + if program.artifacts_json: + try: + small_artifacts = json.loads(program.artifacts_json) + artifacts.update(small_artifacts) + except json.JSONDecodeError as e: + logger.warning(f"Failed to decode artifacts JSON for program {program_id}: {e}") + + # Load large artifacts from disk + if program.artifact_dir and os.path.exists(program.artifact_dir): + disk_artifacts = self._load_artifact_dir(program.artifact_dir) + artifacts.update(disk_artifacts) + + return artifacts + + def _get_artifact_size(self, value: Union[str, bytes]) -> int: + """Get size of an artifact value in bytes""" + if isinstance(value, str): + return len(value.encode("utf-8")) + elif isinstance(value, bytes): + return len(value) + else: + return len(str(value).encode("utf-8")) + + def _artifact_serializer(self, obj): + """JSON serializer for artifacts that handles bytes""" + if isinstance(obj, bytes): + return {"__bytes__": base64.b64encode(obj).decode("utf-8")} + raise TypeError(f"Object of type {type(obj)} is not JSON serializable") + + def _artifact_deserializer(self, dct): + """JSON deserializer for artifacts that handles bytes""" + if "__bytes__" in dct: + return base64.b64decode(dct["__bytes__"]) + return dct + + def _create_artifact_dir(self, program_id: str) -> str: + """Create 
artifact directory for a program""" + base_path = getattr(self.config, "artifacts_base_path", None) + if not base_path: + base_path = ( + os.path.join(self.config.db_path or ".", "artifacts") + if self.config.db_path + else "./artifacts" + ) + + artifact_dir = os.path.join(base_path, program_id) + os.makedirs(artifact_dir, exist_ok=True) + return artifact_dir + + def _write_artifact_file(self, artifact_dir: str, key: str, value: Union[str, bytes]) -> None: + """Write an artifact to a file""" + # Sanitize filename + safe_key = "".join(c for c in key if c.isalnum() or c in "._-") + if not safe_key: + safe_key = "artifact" + + file_path = os.path.join(artifact_dir, safe_key) + + try: + if isinstance(value, str): + with open(file_path, "w", encoding="utf-8") as f: + f.write(value) + elif isinstance(value, bytes): + with open(file_path, "wb") as f: + f.write(value) + else: + # Convert to string and write + with open(file_path, "w", encoding="utf-8") as f: + f.write(str(value)) + except Exception as e: + logger.warning(f"Failed to write artifact {key} to {file_path}: {e}") + + def _load_artifact_dir(self, artifact_dir: str) -> Dict[str, Union[str, bytes]]: + """Load artifacts from a directory""" + artifacts = {} + + try: + for filename in os.listdir(artifact_dir): + file_path = os.path.join(artifact_dir, filename) + if os.path.isfile(file_path): + try: + # Try to read as text first + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + artifacts[filename] = content + except UnicodeDecodeError: + # If text fails, read as binary + with open(file_path, "rb") as f: + content = f.read() + artifacts[filename] = content + except Exception as e: + logger.warning(f"Failed to read artifact file {file_path}: {e}") + except Exception as e: + logger.warning(f"Failed to list artifact directory {artifact_dir}: {e}") + + return artifacts + def log_prompt( self, program_id: str, diff --git a/openevolve/iteration.py b/openevolve/iteration.py index b447055bd..244f4355f 100644 --- a/openevolve/iteration.py +++ b/openevolve/iteration.py @@ -26,6 +26,10 @@ class Result: parent: str = None child_metrics: str = None iteration_time: float = None + prompt: str = None + llm_response: str = None + artifacts: dict = None + def run_iteration_sync(iteration: int, config: Config, evaluation_file: str, database_path: str): @@ -59,21 +63,32 @@ def run_iteration_sync(iteration: int, config: Config, evaluation_file: str, dat evaluation_file, llm_evaluator_ensemble, evaluator_prompt_sampler, + database=database, ) # Sample parent and inspirations from current island parent, inspirations = database.sample() + # Get artifacts for the parent program if available + parent_artifacts = database.get_artifacts(parent.id) + + # Get actual top programs for prompt context (separate from inspirations) + # This ensures the LLM sees only high-performing programs as examples + actual_top_programs = database.get_top_programs(5) + + # Build prompt prompt = prompt_sampler.build_prompt( current_program=parent.code, parent_program=parent.code, # We don't have the parent's code, use the same program_metrics=parent.metrics, previous_programs=[p.to_dict() for p in database.get_top_programs(3)], - top_programs=[p.to_dict() for p in inspirations], + top_programs=[p.to_dict() for p in actual_top_programs], + inspirations=[p.to_dict() for p in inspirations], language=config.language, evolution_round=iteration, - allow_full_rewrite=config.allow_full_rewrites, + diff_based_evolution=config.diff_based_evolution, + 
program_artifacts=parent_artifacts if parent_artifacts else None, ) async def _run(): @@ -120,7 +135,8 @@ async def _run(): # Evaluate the child program child_id = str(uuid.uuid4()) result.child_metrics = await evaluator.evaluate_program(child_code, child_id) - + # Handle artifacts if they exist + artifacts = evaluator.get_pending_artifacts(child_id) # Create a child program result.child_program = Program( id=child_id, @@ -134,6 +150,10 @@ async def _run(): "parent_metrics": parent.metrics, }, ) + result.prompt = prompt + result.llm_response = llm_response + # Store artifacts in the result so they can be saved later + result.artifacts = artifacts except Exception as e: logger.exception("Error in PID %s:", os.getpid()) From 16e7dc97312ecadc02356036ba7db4602c08f6ca Mon Sep 17 00:00:00 2001 From: SuhailB Date: Wed, 2 Jul 2025 17:49:39 -0700 Subject: [PATCH 03/11] Fixed large artifact store/load issue --- openevolve/database.py | 202 +---------------------------------------- 1 file changed, 4 insertions(+), 198 deletions(-) diff --git a/openevolve/database.py b/openevolve/database.py index 042b87186..a3269802c 100644 --- a/openevolve/database.py +++ b/openevolve/database.py @@ -359,9 +359,10 @@ def save(self, path: Optional[str] = None, iteration: int = 0) -> None: try: with FileLock(lock_path, timeout=10): # Create directory and remove old path if it exists - if os.path.exists(save_path): - shutil.rmtree(save_path) - os.makedirs(save_path) + # if os.path.exists(save_path): + # shutil.rmtree(save_path) + # create directory if it doesn't exist + os.makedirs(save_path, exist_ok=True) # Save each program for program in self.programs.values(): @@ -1377,198 +1378,3 @@ def log_prompt( if program_id not in self.prompts_by_program: self.prompts_by_program[program_id] = {} self.prompts_by_program[program_id][template_key] = prompt - - # Artifact storage and retrieval methods - - def store_artifacts(self, program_id: str, artifacts: Dict[str, Union[str, bytes]]) -> None: - """ - Store artifacts for a program - - Args: - program_id: ID of the program - artifacts: Dictionary of artifact name to content - """ - if not artifacts: - return - - program = self.get(program_id) - if not program: - logger.warning(f"Cannot store artifacts: program {program_id} not found") - return - - # Check if artifacts are enabled - artifacts_enabled = os.environ.get("ENABLE_ARTIFACTS", "true").lower() == "true" - if not artifacts_enabled: - logger.debug("Artifacts disabled, skipping storage") - return - - # Split artifacts by size - small_artifacts = {} - large_artifacts = {} - size_threshold = getattr(self.config, "artifact_size_threshold", 32 * 1024) # 32KB default - - for key, value in artifacts.items(): - size = self._get_artifact_size(value) - if size <= size_threshold: - small_artifacts[key] = value - else: - large_artifacts[key] = value - - # Store small artifacts as JSON - if small_artifacts: - program.artifacts_json = json.dumps(small_artifacts, default=self._artifact_serializer) - logger.debug(f"Stored {len(small_artifacts)} small artifacts for program {program_id}") - - # Store large artifacts to disk - if large_artifacts: - artifact_dir = self._create_artifact_dir(program_id) - program.artifact_dir = artifact_dir - for key, value in large_artifacts.items(): - self._write_artifact_file(artifact_dir, key, value) - logger.debug(f"Stored {len(large_artifacts)} large artifacts for program {program_id}") - - def get_artifacts(self, program_id: str) -> Dict[str, Union[str, bytes]]: - """ - Retrieve all artifacts for a 
program - - Args: - program_id: ID of the program - - Returns: - Dictionary of artifact name to content - """ - program = self.get(program_id) - if not program: - return {} - - artifacts = {} - - # Load small artifacts from JSON - if program.artifacts_json: - try: - small_artifacts = json.loads(program.artifacts_json) - artifacts.update(small_artifacts) - except json.JSONDecodeError as e: - logger.warning(f"Failed to decode artifacts JSON for program {program_id}: {e}") - - # Load large artifacts from disk - if program.artifact_dir and os.path.exists(program.artifact_dir): - disk_artifacts = self._load_artifact_dir(program.artifact_dir) - artifacts.update(disk_artifacts) - - return artifacts - - def _get_artifact_size(self, value: Union[str, bytes]) -> int: - """Get size of an artifact value in bytes""" - if isinstance(value, str): - return len(value.encode("utf-8")) - elif isinstance(value, bytes): - return len(value) - else: - return len(str(value).encode("utf-8")) - - def _artifact_serializer(self, obj): - """JSON serializer for artifacts that handles bytes""" - if isinstance(obj, bytes): - return {"__bytes__": base64.b64encode(obj).decode("utf-8")} - raise TypeError(f"Object of type {type(obj)} is not JSON serializable") - - def _artifact_deserializer(self, dct): - """JSON deserializer for artifacts that handles bytes""" - if "__bytes__" in dct: - return base64.b64decode(dct["__bytes__"]) - return dct - - def _create_artifact_dir(self, program_id: str) -> str: - """Create artifact directory for a program""" - base_path = getattr(self.config, "artifacts_base_path", None) - if not base_path: - base_path = ( - os.path.join(self.config.db_path or ".", "artifacts") - if self.config.db_path - else "./artifacts" - ) - - artifact_dir = os.path.join(base_path, program_id) - os.makedirs(artifact_dir, exist_ok=True) - return artifact_dir - - def _write_artifact_file(self, artifact_dir: str, key: str, value: Union[str, bytes]) -> None: - """Write an artifact to a file""" - # Sanitize filename - safe_key = "".join(c for c in key if c.isalnum() or c in "._-") - if not safe_key: - safe_key = "artifact" - - file_path = os.path.join(artifact_dir, safe_key) - - try: - if isinstance(value, str): - with open(file_path, "w", encoding="utf-8") as f: - f.write(value) - elif isinstance(value, bytes): - with open(file_path, "wb") as f: - f.write(value) - else: - # Convert to string and write - with open(file_path, "w", encoding="utf-8") as f: - f.write(str(value)) - except Exception as e: - logger.warning(f"Failed to write artifact {key} to {file_path}: {e}") - - def _load_artifact_dir(self, artifact_dir: str) -> Dict[str, Union[str, bytes]]: - """Load artifacts from a directory""" - artifacts = {} - - try: - for filename in os.listdir(artifact_dir): - file_path = os.path.join(artifact_dir, filename) - if os.path.isfile(file_path): - try: - # Try to read as text first - with open(file_path, "r", encoding="utf-8") as f: - content = f.read() - artifacts[filename] = content - except UnicodeDecodeError: - # If text fails, read as binary - with open(file_path, "rb") as f: - content = f.read() - artifacts[filename] = content - except Exception as e: - logger.warning(f"Failed to read artifact file {file_path}: {e}") - except Exception as e: - logger.warning(f"Failed to list artifact directory {artifact_dir}: {e}") - - return artifacts - - def log_prompt( - self, - program_id: str, - template_key: str, - prompt: Dict[str, str], - responses: Optional[List[str]] = None, - ) -> None: - """ - Log a prompt for a program. 
- Only logs if self.config.log_prompts is True. - - Args: - program_id: ID of the program to log the prompt for - template_key: Key for the prompt template - prompt: Prompts in the format {template_key: { 'system': str, 'user': str }}. - responses: Optional list of responses to the prompt, if available. - """ - - if not self.config.log_prompts: - return - - if responses is None: - responses = [] - prompt["responses"] = responses - - if self.prompts_by_program is None: - self.prompts_by_program = {} - - if program_id not in self.prompts_by_program: - self.prompts_by_program[program_id] = {} - self.prompts_by_program[program_id][template_key] = prompt \ No newline at end of file From d18123a57541bfa3fc9def051210a3480b2709cb Mon Sep 17 00:00:00 2001 From: SuhailB Date: Thu, 3 Jul 2025 14:36:55 -0700 Subject: [PATCH 04/11] Added iteration increment --- openevolve/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openevolve/controller.py b/openevolve/controller.py index 20cfde6c3..6dd58473b 100644 --- a/openevolve/controller.py +++ b/openevolve/controller.py @@ -364,7 +364,7 @@ async def run( ) break self.database.save(temp_db_path, iteration) - + iteration += 1 except Exception as e: logger.error(f"Error in iteration {i+1}: {str(e)}") continue From 321fb328fa018900c9b7efdbe5520f96795600d9 Mon Sep 17 00:00:00 2001 From: SuhailB Date: Thu, 3 Jul 2025 15:12:45 -0700 Subject: [PATCH 05/11] fixed error in iteration log message --- openevolve/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openevolve/controller.py b/openevolve/controller.py index 6dd58473b..dd9cc2bea 100644 --- a/openevolve/controller.py +++ b/openevolve/controller.py @@ -366,7 +366,7 @@ async def run( self.database.save(temp_db_path, iteration) iteration += 1 except Exception as e: - logger.error(f"Error in iteration {i+1}: {str(e)}") + logger.error(f"Error in iteration {iteration}: {str(e)}") continue shutil.rmtree(temp_db_path) # Get the best program using our tracking mechanism From c5a2efec4fe7b4424d9412ef9adc7b6a25418418 Mon Sep 17 00:00:00 2001 From: SuhailB Date: Fri, 4 Jul 2025 10:43:58 -0700 Subject: [PATCH 06/11] Reverted database and config mods --- openevolve/config.py | 1 - openevolve/database.py | 68 +++++++++++++++++++++++++++++------------- 2 files changed, 48 insertions(+), 21 deletions(-) diff --git a/openevolve/config.py b/openevolve/config.py index 71af39476..5c241a33e 100644 --- a/openevolve/config.py +++ b/openevolve/config.py @@ -151,7 +151,6 @@ class DatabaseConfig: # Evolutionary parameters population_size: int = 1000 - allowed_population_overflow: int = 50 archive_size: int = 100 num_islands: int = 5 diff --git a/openevolve/database.py b/openevolve/database.py index a3269802c..310076415 100644 --- a/openevolve/database.py +++ b/openevolve/database.py @@ -6,12 +6,9 @@ import json import logging import os -import shutil import random import time from dataclasses import asdict, dataclass, field, fields -from pathlib import Path -from Levenshtein import ratio from filelock import FileLock, Timeout from typing import Any, Dict, List, Optional, Set, Tuple, Union @@ -358,9 +355,6 @@ def save(self, path: Optional[str] = None, iteration: int = 0) -> None: lock_path = os.path.join("tmp/locks", lock_name) try: with FileLock(lock_path, timeout=10): - # Create directory and remove old path if it exists - # if os.path.exists(save_path): - # shutil.rmtree(save_path) # create directory if it doesn't exist os.makedirs(save_path, exist_ok=True) @@ -590,7 +584,7 
@@ def _calculate_feature_coords(self, program: Program) -> List[int]: if dim == "complexity": # Use code length as complexity measure complexity = len(program.code) - bin_idx = min(int(complexity / 1000), self.feature_bins - 1) + bin_idx = min(int(complexity / 1000 * self.feature_bins), self.feature_bins - 1) coords.append(bin_idx) elif dim == "diversity": # Use average edit distance to other programs @@ -600,10 +594,13 @@ def _calculate_feature_coords(self, program: Program) -> List[int]: sample_programs = random.sample( list(self.programs.values()), min(5, len(self.programs)) ) - avg_distance_ratio = sum( - 1 - calculate_edit_distance(program.code, other.code) for other in sample_programs + avg_distance = sum( + calculate_edit_distance(program.code, other.code) + for other in sample_programs ) / len(sample_programs) - bin_idx = min(int(avg_distance_ratio * 20), self.feature_bins - 1) + bin_idx = min( + int(avg_distance / 1000 * self.feature_bins), self.feature_bins - 1 + ) coords.append(bin_idx) elif dim == "score": # Use average of numeric metrics @@ -611,7 +608,7 @@ def _calculate_feature_coords(self, program: Program) -> List[int]: bin_idx = 0 else: avg_score = safe_numeric_average(program.metrics) - bin_idx = max(0, min(int(avg_score * self.feature_bins), self.feature_bins - 1)) + bin_idx = min(int(avg_score * self.feature_bins), self.feature_bins - 1) coords.append(bin_idx) elif dim in program.metrics: # Use specific metric @@ -960,10 +957,7 @@ def _enforce_population_limit(self) -> None: """ Enforce the population size limit by removing worst programs if needed """ - if ( - len(self.programs) - <= self.config.population_size + self.config.allowed_population_overflow - ): + if len(self.programs) <= self.config.population_size: return # Calculate how many programs to remove @@ -1148,7 +1142,7 @@ def _calculate_island_diversity(self, programs: List[Program]) -> float: if len(programs) < 2: return 0.0 - total_diversity_ratio = 0 + total_diversity = 0 comparisons = 0 # Use deterministic sampling instead of random.sample() to ensure consistent results @@ -1165,12 +1159,46 @@ def _calculate_island_diversity(self, programs: List[Program]) -> float: for i, prog1 in enumerate(sample_programs): for prog2 in sample_programs[i + 1 :]: - total_diversity_ratio += 1 - ratio( - prog1.code, prog2.code - ) # ratio measures similarity + if comparisons >= max_comparisons: + break + + # Use fast approximation instead of expensive edit distance + diversity = self._fast_code_diversity(prog1.code, prog2.code) + total_diversity += diversity comparisons += 1 - return total_diversity_ratio / max(1, comparisons) + if comparisons >= max_comparisons: + break + + return total_diversity / max(1, comparisons) + + def _fast_code_diversity(self, code1: str, code2: str) -> float: + """ + Fast approximation of code diversity using simple metrics + + Returns diversity score (higher = more diverse) + """ + if code1 == code2: + return 0.0 + + # Length difference (scaled to reasonable range) + len1, len2 = len(code1), len(code2) + length_diff = abs(len1 - len2) + + # Line count difference + lines1 = code1.count("\n") + lines2 = code2.count("\n") + line_diff = abs(lines1 - lines2) + + # Simple character set difference + chars1 = set(code1) + chars2 = set(code2) + char_diff = len(chars1.symmetric_difference(chars2)) + + # Combine metrics (scaled to match original edit distance range) + diversity = length_diff * 0.1 + line_diff * 10 + char_diff * 0.5 + + return diversity def log_island_status(self) -> None: """Log current 
status of all islands""" From 38a656866a29fd2412534d2c6b58dc20a0783932 Mon Sep 17 00:00:00 2001 From: Suhail Basalama <40245947+SuhailB@users.noreply.github.com> Date: Sat, 5 Jul 2025 01:29:41 -0700 Subject: [PATCH 07/11] Updated dependencies in pyproject.toml --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ee9fa7c23..1f4000375 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,6 @@ dependencies = [ "numpy>=1.22.0", "tqdm>=4.64.0", "flask", - "levenshtein>=0.27.1", "filelock>=3.18.0", ] @@ -47,4 +46,4 @@ disallow_untyped_defs = true disallow_incomplete_defs = true [project.scripts] -openevolve-run = "openevolve.cli:main" \ No newline at end of file +openevolve-run = "openevolve.cli:main" From 11da33cdc19d5ede3f571b2c3630f6c785a06bf3 Mon Sep 17 00:00:00 2001 From: SuhailB Date: Sat, 5 Jul 2025 13:45:53 -0700 Subject: [PATCH 08/11] Updated utils init and added a new line in code_utils --- openevolve/utils/__init__.py | 1 + openevolve/utils/code_utils.py | 1 + 2 files changed, 2 insertions(+) diff --git a/openevolve/utils/__init__.py b/openevolve/utils/__init__.py index f9c300170..89a4a1b62 100644 --- a/openevolve/utils/__init__.py +++ b/openevolve/utils/__init__.py @@ -32,6 +32,7 @@ "retry_async", "run_in_executor", "apply_diff", + "calculate_edit_distance", "extract_code_language", "extract_diffs", "format_diff_summary", diff --git a/openevolve/utils/code_utils.py b/openevolve/utils/code_utils.py index 398587b32..60fb63001 100644 --- a/openevolve/utils/code_utils.py +++ b/openevolve/utils/code_utils.py @@ -179,6 +179,7 @@ def calculate_edit_distance(code1: str, code2: str) -> int: return dp[m][n] + def extract_code_language(code: str) -> str: """ Try to determine the language of a code snippet From ab4d044cc7567f693512bf212ef7615d148023b8 Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Mon, 7 Jul 2025 21:32:53 +0800 Subject: [PATCH 09/11] Update config.py --- openevolve/config.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/openevolve/config.py b/openevolve/config.py index 5c241a33e..c89766d1c 100644 --- a/openevolve/config.py +++ b/openevolve/config.py @@ -56,7 +56,10 @@ class LLMConfig(LLMModelConfig): retry_delay: int = 5 # n-model configuration for evolution LLM ensemble - models: List[LLMModelConfig] = field(default_factory=lambda: [LLMModelConfig()]) + models: List[LLMModelConfig] = field(default_factory=lambda: [ + LLMModelConfig(name="gpt-4o-mini", weight=0.8), + LLMModelConfig(name="gpt-4o", weight=0.2) + ]) # n-model configuration for evaluator LLM ensemble evaluator_models: List[LLMModelConfig] = field(default_factory=lambda: []) @@ -195,7 +198,7 @@ class EvaluatorConfig: cascade_thresholds: List[float] = field(default_factory=lambda: [0.5, 0.75, 0.9]) # Parallel evaluation - parallel_evaluations: int = 4 + parallel_evaluations: int = 1 distributed: bool = False # LLM-based feedback From 470a271e662146a4416bcb107f0db9ed4ec432a6 Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Mon, 7 Jul 2025 22:30:10 +0800 Subject: [PATCH 10/11] Refactor to threaded parallel evolution and remove file locks Replaces process-based parallelism with a new thread-based parallel controller using shared memory for improved performance and reliability. Removes filelock usage and related code from the database, as thread-based parallelism does not require file-based locking. 
Updates the main controller to use the new parallel system, adds checkpoint resume support, and adapts iteration logic for thread safety. Cleans up dependencies by removing filelock from requirements. --- openevolve/cli.py | 1 + openevolve/controller.py | 222 ++++++++------------- openevolve/database.py | 63 +++--- openevolve/iteration.py | 225 ++++++++++----------- openevolve/threaded_parallel.py | 336 ++++++++++++++++++++++++++++++++ pyproject.toml | 1 - 6 files changed, 546 insertions(+), 302 deletions(-) create mode 100644 openevolve/threaded_parallel.py diff --git a/openevolve/cli.py b/openevolve/cli.py index 98b0008f9..dd5d707dd 100644 --- a/openevolve/cli.py +++ b/openevolve/cli.py @@ -126,6 +126,7 @@ async def main_async() -> int: best_program = await openevolve.run( iterations=args.iterations, target_score=args.target_score, + checkpoint_path=args.checkpoint, ) # Get the checkpoint path diff --git a/openevolve/controller.py b/openevolve/controller.py index dd9cc2bea..bf4ea683d 100644 --- a/openevolve/controller.py +++ b/openevolve/controller.py @@ -5,21 +5,18 @@ import asyncio import logging import os -import shutil -import re +import signal import time import uuid from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union -import traceback -import concurrent.futures +from typing import Any, Dict, List, Optional, Union from openevolve.config import Config, load_config from openevolve.database import Program, ProgramDatabase from openevolve.evaluator import Evaluator from openevolve.llm.ensemble import LLMEnsemble from openevolve.prompt.sampler import PromptSampler -from openevolve.iteration import run_iteration_sync, Result +from openevolve.threaded_parallel import ImprovedParallelController from openevolve.utils.code_utils import ( extract_code_language, ) @@ -164,6 +161,9 @@ def __init__( self.evaluation_file = evaluation_file logger.info(f"Initialized OpenEvolve with {initial_program_path}") + + # Initialize improved parallel processing components + self.parallel_controller = None def _setup_logging(self) -> None: """Set up logging""" @@ -198,24 +198,31 @@ async def run( self, iterations: Optional[int] = None, target_score: Optional[float] = None, - ) -> Program: + checkpoint_path: Optional[str] = None, + ) -> Optional[Program]: """ - Run the evolution process + Run the evolution process with improved parallel processing Args: iterations: Maximum number of iterations (uses config if None) target_score: Target score to reach (continues until reached if specified) + checkpoint_path: Path to resume from checkpoint Returns: Best program found """ max_iterations = iterations or self.config.max_iterations - - # Define start_iteration before creating the initial program - start_iteration = self.database.last_iteration + + # Determine starting iteration + start_iteration = 0 + if checkpoint_path and os.path.exists(checkpoint_path): + self._load_checkpoint(checkpoint_path) + start_iteration = self.database.last_iteration + 1 + logger.info(f"Resuming from checkpoint at iteration {start_iteration}") + else: + start_iteration = self.database.last_iteration # Only add initial program if starting fresh (not resuming from checkpoint) - # Check if we're resuming AND no program matches initial code to avoid pollution should_add_initial = ( start_iteration == 0 and len(self.database.programs) == 0 @@ -244,144 +251,49 @@ async def run( self.database.add(initial_program) else: logger.info( - f"Skipping initial program addition (resuming from iteration {start_iteration} 
with {len(self.database.programs)} existing programs)" + f"Skipping initial program addition (resuming from iteration {start_iteration} " + f"with {len(self.database.programs)} existing programs)" ) - # Main evolution loop - total_iterations = start_iteration + max_iterations - - logger.info( - f"Starting evolution from iteration {start_iteration} for {max_iterations} iterations (total: {total_iterations})" - ) - - # Island-based evolution variables - programs_per_island = max( - 1, max_iterations // (self.config.database.num_islands * 10) - ) # Dynamic allocation - current_island_counter = 0 - - logger.info(f"Using island-based evolution with {self.config.database.num_islands} islands") - self.database.log_island_status() - - # create temp file to save database snapshots to for process workers to load from - temp_db_path = "tmp/" + str(uuid.uuid4()) - self.database.save(temp_db_path, start_iteration) - - with concurrent.futures.ProcessPoolExecutor( - max_workers=self.config.evaluator.parallel_evaluations - ) as executor: - futures = [] - for i in range(start_iteration, total_iterations): - futures.append( - executor.submit( - run_iteration_sync, i, self.config, self.evaluation_file, temp_db_path - ) - ) - - iteration = start_iteration + 1 - for future in concurrent.futures.as_completed(futures): - logger.info(f"Completed iteration {iteration}") - try: - result: Result = future.result() - # if result is nonType - if not isinstance(result, Result): - logger.warning(f"No valid diffs or program length exceeded limit") - continue - # Manage island evolution - switch islands periodically - if ( - iteration - 1 > start_iteration - and current_island_counter >= programs_per_island - ): - self.database.next_island() - current_island_counter = 0 - logger.debug(f"Switched to island {self.database.current_island}") - - current_island_counter += 1 - - # Add to database (will be added to current island) - self.database.add(result.child_program, iteration=iteration) - - # Log prompts - self.database.log_prompt( - template_key=( - "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user" - ), - program_id=result.child_program.id, - prompt=result.prompt, - responses=[result.llm_response], - ) - - # Store artifacts if they exist (after program is added to database) - if result.artifacts: - self.database.store_artifacts(result.child_program.id, result.artifacts) - - # Log prompts - self.database.log_prompt( - template_key=( - "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user" - ), - program_id=result.child_program.id, - prompt=result.prompt, - responses=[result.llm_response], - ) - - # Increment generation for current island - self.database.increment_island_generation() - - # Check if migration should occur - if self.database.should_migrate(): - logger.info(f"Performing migration at iteration {iteration}") - self.database.migrate_programs() - self.database.log_island_status() - - # Log progress - self._log_iteration( - iteration, result.parent, result.child_program, result.iteration_time - ) + # Initialize improved parallel processing + try: + self.parallel_controller = ImprovedParallelController( + self.config, self.evaluation_file, self.database + ) + + # Set up signal handlers for graceful shutdown + def signal_handler(signum, frame): + logger.info(f"Received signal {signum}, initiating graceful shutdown...") + self.parallel_controller.request_shutdown() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + 
self.parallel_controller.start() + + # Run evolution with improved parallel processing and checkpoint callback + await self._run_evolution_with_checkpoints( + start_iteration, max_iterations, target_score + ) + + finally: + # Clean up parallel processing resources + if self.parallel_controller: + self.parallel_controller.stop() + self.parallel_controller = None - # Specifically check if this is the new best program - if self.database.best_program_id == result.child_program.id: - logger.info( - f"🌟 New best solution found at iteration {iteration}: {result.child_program.id}" - ) - logger.info(f"Metrics: {format_metrics_safe(result.child_program.metrics)}") - - # Save checkpoint - if (iteration) % self.config.checkpoint_interval == 0: - self._save_checkpoint(iteration) - # Also log island status at checkpoints - logger.info(f"Island status at checkpoint {iteration}:") - self.database.log_island_status() - - # Check if target score reached - if target_score is not None: - avg_score = sum(result["child_metrics"].values()) / max( - 1, len(result.child_metrics) - ) - if avg_score >= target_score: - logger.info( - f"Target score {target_score} reached after {iteration} iterations" - ) - break - self.database.save(temp_db_path, iteration) - iteration += 1 - except Exception as e: - logger.error(f"Error in iteration {iteration}: {str(e)}") - continue - shutil.rmtree(temp_db_path) - # Get the best program using our tracking mechanism + # Get the best program best_program = None if self.database.best_program_id: best_program = self.database.get(self.database.best_program_id) logger.info(f"Using tracked best program: {self.database.best_program_id}") - # Fallback to calculating best program if tracked program not found if best_program is None: best_program = self.database.get_best_program() logger.info("Using calculated best program (tracked program not found)") # Check if there's a better program by combined_score that wasn't tracked - if "combined_score" in best_program.metrics: + if best_program and "combined_score" in best_program.metrics: best_by_combined = self.database.get_best_program(metric="combined_score") if ( best_by_combined @@ -397,7 +309,8 @@ async def run( f"Found program with better combined_score: {best_by_combined.id}" ) logger.warning( - f"Score difference: {best_program.metrics['combined_score']:.4f} vs {best_by_combined.metrics['combined_score']:.4f}" + f"Score difference: {best_program.metrics['combined_score']:.4f} vs " + f"{best_by_combined.metrics['combined_score']:.4f}" ) best_program = best_by_combined @@ -406,14 +319,10 @@ async def run( f"Evolution complete. 
Best program has metrics: " f"{format_metrics_safe(best_program.metrics)}" ) - - # Save the best program (using our tracked best program) self._save_best_program(best_program) - return best_program else: logger.warning("No valid programs found during evolution") - # Return None if no programs found instead of undefined initial_program return None def _log_iteration( @@ -499,6 +408,35 @@ def _save_checkpoint(self, iteration: int) -> None: logger.info(f"Saved checkpoint at iteration {iteration} to {checkpoint_path}") + def _load_checkpoint(self, checkpoint_path: str) -> None: + """Load state from a checkpoint directory""" + if not os.path.exists(checkpoint_path): + raise FileNotFoundError(f"Checkpoint directory {checkpoint_path} not found") + + logger.info(f"Loading checkpoint from {checkpoint_path}") + self.database.load(checkpoint_path) + logger.info( + f"Checkpoint loaded successfully (iteration {self.database.last_iteration})" + ) + + async def _run_evolution_with_checkpoints( + self, start_iteration: int, max_iterations: int, target_score: Optional[float] + ) -> None: + """Run evolution with checkpoint saving support""" + logger.info(f"Using island-based evolution with {self.config.database.num_islands} islands") + self.database.log_island_status() + + # Run the evolution process with checkpoint callback + await self.parallel_controller.run_evolution( + start_iteration, max_iterations, target_score, + checkpoint_callback=self._save_checkpoint + ) + + # Save final checkpoint if needed + final_iteration = start_iteration + max_iterations - 1 + if final_iteration > 0 and final_iteration % self.config.checkpoint_interval == 0: + self._save_checkpoint(final_iteration) + def _save_best_program(self, program: Optional[Program] = None) -> None: """ Save the best program diff --git a/openevolve/database.py b/openevolve/database.py index 310076415..f33f994a1 100644 --- a/openevolve/database.py +++ b/openevolve/database.py @@ -9,7 +9,7 @@ import random import time from dataclasses import asdict, dataclass, field, fields -from filelock import FileLock, Timeout +# FileLock removed - no longer needed with threaded parallel processing from typing import Any, Dict, List, Optional, Set, Tuple, Union import numpy as np @@ -351,26 +351,19 @@ def save(self, path: Optional[str] = None, iteration: int = 0) -> None: logger.warning("No database path specified, skipping save") return - lock_name = os.path.basename(save_path) + ".lock" - lock_path = os.path.join("tmp/locks", lock_name) - try: - with FileLock(lock_path, timeout=10): - # create directory if it doesn't exist - os.makedirs(save_path, exist_ok=True) + # create directory if it doesn't exist + os.makedirs(save_path, exist_ok=True) - # Save each program - for program in self.programs.values(): - prompts = None - if ( - self.config.log_prompts - and self.prompts_by_program - and program.id in self.prompts_by_program - ): - prompts = self.prompts_by_program[program.id] - self._save_program(program, save_path, prompts=prompts) - - except Timeout: - logger.exception("Could not acquire the lock within 10 seconds") + # Save each program + for program in self.programs.values(): + prompts = None + if ( + self.config.log_prompts + and self.prompts_by_program + and program.id in self.prompts_by_program + ): + prompts = self.prompts_by_program[program.id] + self._save_program(program, save_path, prompts=prompts) # Save metadata metadata = { @@ -419,25 +412,19 @@ def load(self, path: str) -> None: logger.info(f"Loaded database metadata with 
last_iteration={self.last_iteration}") # Load programs - lock_name = os.path.basename(path) + ".lock" - lock_path = os.path.join("tmp/locks", lock_name) programs_dir = os.path.join(path, "programs") - try: - with FileLock(lock_path, timeout=10): - if os.path.exists(programs_dir): - for program_file in os.listdir(programs_dir): - if program_file.endswith(".json"): - program_path = os.path.join(programs_dir, program_file) - try: - with open(program_path, "r") as f: - program_data = json.load(f) - - program = Program.from_dict(program_data) - self.programs[program.id] = program - except Exception as e: - logger.warning(f"Error loading program {program_file}: {str(e)}") - except Timeout: - logger.exception("Could not acquire the lock within 10 seconds") + if os.path.exists(programs_dir): + for program_file in os.listdir(programs_dir): + if program_file.endswith(".json"): + program_path = os.path.join(programs_dir, program_file) + try: + with open(program_path, "r") as f: + program_data = json.load(f) + + program = Program.from_dict(program_data) + self.programs[program.id] = program + except Exception as e: + logger.warning(f"Error loading program {program_file}: {str(e)}") # Reconstruct island assignments from metadata self._reconstruct_islands(saved_islands) diff --git a/openevolve/iteration.py b/openevolve/iteration.py index 244f4355f..af3d3850d 100644 --- a/openevolve/iteration.py +++ b/openevolve/iteration.py @@ -32,133 +32,116 @@ class Result: -def run_iteration_sync(iteration: int, config: Config, evaluation_file: str, database_path: str): - # setup logger showing PID for current process - for handler in logging.root.handlers[:]: - logging.root.removeHandler(handler) - logging.basicConfig( - level=getattr(logging, config.log_level), - format="%(asctime)s - %(levelname)s - PID %(process)d - %(message)s", - ) + + +async def run_iteration_with_shared_db( + iteration: int, + config: Config, + database: ProgramDatabase, + evaluator: Evaluator, + llm_ensemble: LLMEnsemble, + prompt_sampler: PromptSampler +): + """ + Run a single iteration using shared memory database + + This is optimized for use with persistent worker processes. 
+ """ logger = logging.getLogger(__name__) - logger.info("Starting iteration in PID %s", os.getpid()) - - llm_ensemble = LLMEnsemble(config.llm.models) - llm_evaluator_ensemble = LLMEnsemble(config.llm.evaluator_models) - - prompt_sampler = PromptSampler(config.prompt) - evaluator_prompt_sampler = PromptSampler(config.prompt) - evaluator_prompt_sampler.set_templates("evaluator_system_message") - - # Pass random seed to database if specified - if config.random_seed is not None: - config.database.random_seed = config.random_seed - - # load most recent database snapshot - config.database.db_path = database_path - database = ProgramDatabase(config.database) - - evaluator = Evaluator( - config.evaluator, - evaluation_file, - llm_evaluator_ensemble, - evaluator_prompt_sampler, - database=database, - ) - - # Sample parent and inspirations from current island - parent, inspirations = database.sample() - - # Get artifacts for the parent program if available - parent_artifacts = database.get_artifacts(parent.id) - - # Get actual top programs for prompt context (separate from inspirations) - # This ensures the LLM sees only high-performing programs as examples - actual_top_programs = database.get_top_programs(5) - - - # Build prompt - prompt = prompt_sampler.build_prompt( - current_program=parent.code, - parent_program=parent.code, # We don't have the parent's code, use the same - program_metrics=parent.metrics, - previous_programs=[p.to_dict() for p in database.get_top_programs(3)], - top_programs=[p.to_dict() for p in actual_top_programs], - inspirations=[p.to_dict() for p in inspirations], - language=config.language, - evolution_round=iteration, - diff_based_evolution=config.diff_based_evolution, - program_artifacts=parent_artifacts if parent_artifacts else None, - ) - - async def _run(): + + try: + # Sample parent and inspirations from database + parent, inspirations = database.sample() + + # Get artifacts for the parent program if available + parent_artifacts = database.get_artifacts(parent.id) + + # Get actual top programs for prompt context (separate from inspirations) + actual_top_programs = database.get_top_programs(5) + + # Build prompt + prompt = prompt_sampler.build_prompt( + current_program=parent.code, + parent_program=parent.code, + program_metrics=parent.metrics, + previous_programs=[p.to_dict() for p in database.get_top_programs(3)], + top_programs=[p.to_dict() for p in actual_top_programs], + inspirations=[p.to_dict() for p in inspirations], + language=config.language, + evolution_round=iteration, + diff_based_evolution=config.diff_based_evolution, + program_artifacts=parent_artifacts if parent_artifacts else None, + ) + result = Result(parent=parent) iteration_start = time.time() # Generate code modification - try: - llm_response = await llm_ensemble.generate_with_context( - system_message=prompt["system"], - messages=[{"role": "user", "content": prompt["user"]}], + llm_response = await llm_ensemble.generate_with_context( + system_message=prompt["system"], + messages=[{"role": "user", "content": prompt["user"]}], + ) + + # Parse the response + if config.diff_based_evolution: + diff_blocks = extract_diffs(llm_response) + + if not diff_blocks: + logger.warning(f"Iteration {iteration+1}: No valid diffs found in response") + return None + + # Apply the diffs + child_code = apply_diff(parent.code, llm_response) + changes_summary = format_diff_summary(diff_blocks) + else: + # Parse full rewrite + new_code = parse_full_rewrite(llm_response, config.language) + + if not new_code: + 
logger.warning(f"Iteration {iteration+1}: No valid code found in response") + return None + + child_code = new_code + changes_summary = "Full rewrite" + + # Check code length + if len(child_code) > config.max_code_length: + logger.warning( + f"Iteration {iteration+1}: Generated code exceeds maximum length " + f"({len(child_code)} > {config.max_code_length})" ) - - # Parse the response - if config.diff_based_evolution: - diff_blocks = extract_diffs(llm_response) - - if not diff_blocks: - logger.warning(f"Iteration {iteration+1}: No valid diffs found in response") - return - - # Apply the diffs - child_code = apply_diff(parent.code, llm_response) - changes_summary = format_diff_summary(diff_blocks) - else: - # Parse full rewrite - new_code = parse_full_rewrite(llm_response, config.language) - - if not new_code: - logger.warning(f"Iteration {iteration+1}: No valid code found in response") - return - - child_code = new_code - changes_summary = "Full rewrite" - - # Check code length - if len(child_code) > config.max_code_length: - logger.warning( - f"Iteration {iteration+1}: Generated code exceeds maximum length " - f"({len(child_code)} > {config.max_code_length})" - ) - return - - # Evaluate the child program - child_id = str(uuid.uuid4()) - result.child_metrics = await evaluator.evaluate_program(child_code, child_id) - # Handle artifacts if they exist - artifacts = evaluator.get_pending_artifacts(child_id) - # Create a child program - result.child_program = Program( - id=child_id, - code=child_code, - language=config.language, - parent_id=parent.id, - generation=parent.generation + 1, - metrics=result.child_metrics, - metadata={ - "changes": changes_summary, - "parent_metrics": parent.metrics, - }, - ) - result.prompt = prompt - result.llm_response = llm_response - # Store artifacts in the result so they can be saved later - result.artifacts = artifacts - - except Exception as e: - logger.exception("Error in PID %s:", os.getpid()) - + return None + + # Evaluate the child program + child_id = str(uuid.uuid4()) + result.child_metrics = await evaluator.evaluate_program(child_code, child_id) + + # Handle artifacts if they exist + artifacts = evaluator.get_pending_artifacts(child_id) + + # Create a child program + result.child_program = Program( + id=child_id, + code=child_code, + language=config.language, + parent_id=parent.id, + generation=parent.generation + 1, + metrics=result.child_metrics, + iteration_found=iteration, + metadata={ + "changes": changes_summary, + "parent_metrics": parent.metrics, + }, + ) + + result.prompt = prompt + result.llm_response = llm_response + result.artifacts = artifacts result.iteration_time = time.time() - iteration_start + result.iteration = iteration + return result - return asyncio.run(_run()) + except Exception as e: + logger.exception(f"Error in iteration {iteration}: {e}") + return None diff --git a/openevolve/threaded_parallel.py b/openevolve/threaded_parallel.py new file mode 100644 index 000000000..e772d6920 --- /dev/null +++ b/openevolve/threaded_parallel.py @@ -0,0 +1,336 @@ +""" +Improved parallel processing using threads with shared memory +""" + +import asyncio +import logging +import signal +import threading +import time +from concurrent.futures import ThreadPoolExecutor, Future +from typing import Any, Dict, List, Optional + +from openevolve.config import Config +from openevolve.database import ProgramDatabase +from openevolve.evaluator import Evaluator +from openevolve.llm.ensemble import LLMEnsemble +from openevolve.prompt.sampler import 
PromptSampler +from openevolve.iteration import run_iteration_with_shared_db + +logger = logging.getLogger(__name__) + + +class ThreadedEvaluationPool: + """ + Thread-based parallel evaluation pool for improved performance + + Uses threads instead of processes to avoid pickling issues while + still providing parallelism for I/O-bound LLM calls. + """ + + def __init__(self, config: Config, evaluation_file: str, database: ProgramDatabase): + self.config = config + self.evaluation_file = evaluation_file + self.database = database + + self.num_workers = config.evaluator.parallel_evaluations + self.executor = None + + # Pre-initialize components for each thread + self.thread_local = threading.local() + + logger.info(f"Initializing threaded evaluation pool with {self.num_workers} workers") + + def start(self) -> None: + """Start the thread pool""" + self.executor = ThreadPoolExecutor( + max_workers=self.num_workers, + thread_name_prefix="EvalWorker" + ) + logger.info(f"Started thread pool with {self.num_workers} threads") + + def stop(self) -> None: + """Stop the thread pool""" + if self.executor: + self.executor.shutdown(wait=True) + self.executor = None + logger.info("Stopped thread pool") + + def submit_evaluation(self, iteration: int) -> Future: + """ + Submit an evaluation task to the thread pool + + Args: + iteration: Iteration number to evaluate + + Returns: + Future that will contain the result + """ + if not self.executor: + raise RuntimeError("Thread pool not started") + + return self.executor.submit(self._run_evaluation, iteration) + + def _run_evaluation(self, iteration: int): + """Run evaluation in a worker thread""" + # Get or create thread-local components + if not hasattr(self.thread_local, 'initialized'): + self._initialize_thread_components() + + try: + # Run the iteration + result = asyncio.run(run_iteration_with_shared_db( + iteration, + self.config, + self.database, # Shared database (thread-safe reads) + self.thread_local.evaluator, + self.thread_local.llm_ensemble, + self.thread_local.prompt_sampler + )) + + return result + + except Exception as e: + logger.error(f"Error in thread evaluation {iteration}: {e}") + return None + + def _initialize_thread_components(self) -> None: + """Initialize components for this thread""" + thread_id = threading.get_ident() + logger.debug(f"Initializing components for thread {thread_id}") + + try: + # Initialize LLM components + self.thread_local.llm_ensemble = LLMEnsemble(self.config.llm.models) + self.thread_local.llm_evaluator_ensemble = LLMEnsemble(self.config.llm.evaluator_models) + + # Initialize prompt samplers + self.thread_local.prompt_sampler = PromptSampler(self.config.prompt) + self.thread_local.evaluator_prompt_sampler = PromptSampler(self.config.prompt) + self.thread_local.evaluator_prompt_sampler.set_templates("evaluator_system_message") + + # Initialize evaluator + self.thread_local.evaluator = Evaluator( + self.config.evaluator, + self.evaluation_file, + self.thread_local.llm_evaluator_ensemble, + self.thread_local.evaluator_prompt_sampler, + database=self.database, + ) + + self.thread_local.initialized = True + logger.debug(f"Initialized components for thread {thread_id}") + + except Exception as e: + logger.error(f"Failed to initialize thread components: {e}") + raise + + +class ImprovedParallelController: + """ + Controller for improved parallel processing using shared memory and threads + """ + + def __init__(self, config: Config, evaluation_file: str, database: ProgramDatabase): + self.config = config + 
self.evaluation_file = evaluation_file + self.database = database + + self.thread_pool = None + self.database_lock = threading.RLock() # For database writes + self.shutdown_flag = threading.Event() # For graceful shutdown + + def start(self) -> None: + """Start the improved parallel system""" + self.thread_pool = ThreadedEvaluationPool( + self.config, self.evaluation_file, self.database + ) + self.thread_pool.start() + + logger.info("Started improved parallel controller") + + def stop(self) -> None: + """Stop the improved parallel system""" + self.shutdown_flag.set() # Signal shutdown + + if self.thread_pool: + self.thread_pool.stop() + self.thread_pool = None + + logger.info("Stopped improved parallel controller") + + def request_shutdown(self) -> None: + """Request graceful shutdown (for signal handlers)""" + logger.info("Graceful shutdown requested...") + self.shutdown_flag.set() + + async def run_evolution( + self, start_iteration: int, max_iterations: int, target_score: Optional[float] = None, + checkpoint_callback=None + ): + """ + Run evolution with improved parallel processing + + Args: + start_iteration: Starting iteration number + max_iterations: Maximum number of iterations + target_score: Target score to achieve + + Returns: + Best program found + """ + total_iterations = start_iteration + max_iterations + + logger.info( + f"Starting improved parallel evolution from iteration {start_iteration} " + f"for {max_iterations} iterations (total: {total_iterations})" + ) + + # Submit initial batch of evaluations + pending_futures = {} + batch_size = min(self.config.evaluator.parallel_evaluations * 2, max_iterations) + + for i in range(start_iteration, min(start_iteration + batch_size, total_iterations)): + future = self.thread_pool.submit_evaluation(i) + pending_futures[i] = future + + next_iteration_to_submit = start_iteration + batch_size + completed_iterations = 0 + + # Island management + programs_per_island = max(1, max_iterations // (self.config.database.num_islands * 10)) + current_island_counter = 0 + + # Process results as they complete + while pending_futures and completed_iterations < max_iterations and not self.shutdown_flag.is_set(): + # Find completed futures + completed_iteration = None + for iteration, future in list(pending_futures.items()): + if future.done(): + completed_iteration = iteration + break + + if completed_iteration is None: + # No results ready, wait a bit + await asyncio.sleep(0.01) + continue + + # Process completed result + future = pending_futures.pop(completed_iteration) + + try: + result = future.result() + + if result and hasattr(result, 'child_program') and result.child_program: + # Thread-safe database update + with self.database_lock: + self.database.add(result.child_program, iteration=completed_iteration) + + # Store artifacts if they exist + if result.artifacts: + self.database.store_artifacts(result.child_program.id, result.artifacts) + + # Log prompts + if hasattr(result, 'prompt') and result.prompt: + self.database.log_prompt( + template_key=( + "full_rewrite_user" if not self.config.diff_based_evolution + else "diff_user" + ), + program_id=result.child_program.id, + prompt=result.prompt, + responses=[result.llm_response] if hasattr(result, 'llm_response') else [], + ) + + # Manage island evolution + if completed_iteration > start_iteration and current_island_counter >= programs_per_island: + self.database.next_island() + current_island_counter = 0 + logger.debug(f"Switched to island {self.database.current_island}") + + 
current_island_counter += 1 + + # Increment generation for current island + self.database.increment_island_generation() + + # Check migration + if self.database.should_migrate(): + logger.info(f"Performing migration at iteration {completed_iteration}") + self.database.migrate_programs() + self.database.log_island_status() + + # Log progress (outside lock) + logger.info( + f"Iteration {completed_iteration}: " + f"Program {result.child_program.id} " + f"(parent: {result.parent.id if result.parent else 'None'}) " + f"completed in {result.iteration_time:.2f}s" + ) + + if result.child_program.metrics: + metrics_str = ", ".join([ + f"{k}={v:.4f}" if isinstance(v, (int, float)) else f"{k}={v}" + for k, v in result.child_program.metrics.items() + ]) + logger.info(f"Metrics: {metrics_str}") + + # Check for new best program + if self.database.best_program_id == result.child_program.id: + logger.info( + f"🌟 New best solution found at iteration {completed_iteration}: " + f"{result.child_program.id}" + ) + + # Save checkpoints at intervals + if completed_iteration % self.config.checkpoint_interval == 0: + logger.info(f"Checkpoint interval reached at iteration {completed_iteration}") + self.database.log_island_status() + if checkpoint_callback: + checkpoint_callback(completed_iteration) + + # Check target score + if target_score is not None and result.child_program.metrics: + numeric_metrics = [ + v for v in result.child_program.metrics.values() + if isinstance(v, (int, float)) + ] + if numeric_metrics: + avg_score = sum(numeric_metrics) / len(numeric_metrics) + if avg_score >= target_score: + logger.info( + f"Target score {target_score} reached after {completed_iteration} iterations" + ) + break + else: + logger.warning(f"No valid result from iteration {completed_iteration}") + + except Exception as e: + logger.error(f"Error processing result from iteration {completed_iteration}: {e}") + + completed_iterations += 1 + + # Submit next iteration if available + if next_iteration_to_submit < total_iterations: + future = self.thread_pool.submit_evaluation(next_iteration_to_submit) + pending_futures[next_iteration_to_submit] = future + next_iteration_to_submit += 1 + + # Handle shutdown or completion + if self.shutdown_flag.is_set(): + logger.info("Shutdown requested, canceling remaining evaluations...") + # Cancel remaining futures + for iteration, future in pending_futures.items(): + future.cancel() + logger.debug(f"Canceled iteration {iteration}") + else: + # Wait for any remaining futures if not shutting down + for iteration, future in pending_futures.items(): + try: + future.result(timeout=10.0) + except Exception as e: + logger.warning(f"Error waiting for iteration {iteration}: {e}") + + if self.shutdown_flag.is_set(): + logger.info("Evolution interrupted by shutdown") + else: + logger.info("Evolution completed") \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 1f4000375..94f30a35c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,6 @@ dependencies = [ "numpy>=1.22.0", "tqdm>=4.64.0", "flask", - "filelock>=3.18.0", ] [project.optional-dependencies] From 7dcfda89012e823f0cf9f32d6e73daf1c65d9efe Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Mon, 7 Jul 2025 22:32:01 +0800 Subject: [PATCH 11/11] Bump version to 0.0.12 Update project version from 0.0.11 to 0.0.12 in both pyproject.toml and setup.py for a new release. 
--- pyproject.toml | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 94f30a35c..b5de7e6b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "openevolve" -version = "0.0.11" +version = "0.0.12" description = "Open-source implementation of AlphaEvolve" readme = "README.md" requires-python = ">=3.9" diff --git a/setup.py b/setup.py index e7ea0d5bb..2d13b91f2 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="openevolve", - version="0.0.11", + version="0.0.12", packages=find_packages(), include_package_data=True, )
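
Usage note (illustrative, not part of the applied diffs): a minimal sketch of driving the checkpoint-resume path added in this series from Python rather than the CLI. The `run(..., iterations=..., checkpoint_path=...)` keywords and the `Optional[Program]` return value match the controller changes above; the `OpenEvolve` class name, its constructor keywords, and the checkpoint directory path are assumptions made for the example only.

    import asyncio
    from openevolve.controller import OpenEvolve  # class name assumed from log messages

    async def main():
        # Constructor keywords assumed from the attributes set in the controller's
        # __init__ (initial_program_path, evaluation_file); adjust paths as needed.
        controller = OpenEvolve(
            initial_program_path="initial_program.py",
            evaluation_file="evaluator.py",
        )

        # Resume from a checkpoint directory written every config.checkpoint_interval
        # iterations; run() returns the best Program found, or None if nothing valid
        # was produced.
        best = await controller.run(
            iterations=100,
            checkpoint_path="openevolve_output/checkpoints/checkpoint_50",  # illustrative path
        )
        if best is not None:
            print(best.id, best.metrics)

    asyncio.run(main())

With the signal handlers installed in run(), pressing Ctrl+C (SIGINT) or sending SIGTERM during this call requests a graceful shutdown: pending evaluations are cancelled and the thread pool is stopped before the method returns.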