From a09758251c90f64a224608ffff6a76dd02bec2bd Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Thu, 19 Jun 2025 15:44:29 +0800 Subject: [PATCH 1/8] fix and add test --- openevolve/evaluator.py | 75 +++++- openevolve/utils/async_utils.py | 54 +++++ tests/test_evaluator_timeout.py | 398 ++++++++++++++++++++++++++++++++ 3 files changed, 517 insertions(+), 10 deletions(-) create mode 100644 tests/test_evaluator_timeout.py diff --git a/openevolve/evaluator.py b/openevolve/evaluator.py index 1af482bb2..6841b63d1 100644 --- a/openevolve/evaluator.py +++ b/openevolve/evaluator.py @@ -242,10 +242,9 @@ def get_pending_artifacts(self, program_id: str) -> Optional[Dict[str, Union[str """ return self._pending_artifacts.pop(program_id, None) - @run_in_executor - def _direct_evaluate(self, program_path: str) -> Dict[str, float]: + async def _direct_evaluate(self, program_path: str) -> Dict[str, float]: """ - Directly evaluate a program using the evaluation function + Directly evaluate a program using the evaluation function with timeout Args: program_path: Path to the program file @@ -254,8 +253,13 @@ def _direct_evaluate(self, program_path: str) -> Dict[str, float]: Dictionary of metric name to score """ try: + # Create a coroutine that runs the evaluation function in an executor + async def run_evaluation(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self.evaluate_function, program_path) + # Run the evaluation with timeout - result = self.evaluate_function(program_path) + result = await asyncio.wait_for(run_evaluation(), timeout=self.config.timeout) # Validate result if not isinstance(result, dict): @@ -264,6 +268,9 @@ def _direct_evaluate(self, program_path: str) -> Dict[str, float]: return result + except asyncio.TimeoutError: + logger.warning(f"Evaluation timed out after {self.config.timeout}s") + return {"error": 0.0, "timeout": True} except Exception as e: logger.error(f"Error in direct evaluation: {str(e)}") return {"error": 0.0} @@ -299,10 +306,24 @@ async def _cascade_evaluate( if not hasattr(module, "evaluate_stage1"): return await self._direct_evaluate(program_path) - # Run first stage + # Run first stage with timeout try: - stage1_result = await run_in_executor(module.evaluate_stage1)(program_path) + + async def run_stage1(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage1, program_path) + + stage1_result = await asyncio.wait_for(run_stage1(), timeout=self.config.timeout) stage1_eval_result = self._process_evaluation_result(stage1_result) + except asyncio.TimeoutError: + logger.warning(f"Stage 1 evaluation timed out after {self.config.timeout}s") + return EvaluationResult( + metrics={"stage1_passed": 0.0, "error": 0.0, "timeout": True}, + artifacts={ + "failure_stage": "stage1", + "timeout": True, + }, + ) except Exception as e: logger.error(f"Error in stage 1 evaluation: {str(e)}") # Capture stage 1 failure as artifacts @@ -325,10 +346,27 @@ async def _cascade_evaluate( if not hasattr(module, "evaluate_stage2"): return stage1_eval_result - # Run second stage + # Run second stage with timeout try: - stage2_result = await run_in_executor(module.evaluate_stage2)(program_path) + + async def run_stage2(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage2, program_path) + + stage2_result = await asyncio.wait_for(run_stage2(), timeout=self.config.timeout) stage2_eval_result = self._process_evaluation_result(stage2_result) + except asyncio.TimeoutError: + 
logger.warning(f"Stage 2 evaluation timed out after {self.config.timeout}s") + # Capture stage 2 failure, but keep stage 1 results + stage1_eval_result.artifacts.update( + { + "stage2_timeout": True, + "failure_stage": "stage2", + } + ) + stage1_eval_result.metrics["stage2_passed"] = 0.0 + stage1_eval_result.metrics["timeout"] = True + return stage1_eval_result except Exception as e: logger.error(f"Error in stage 2 evaluation: {str(e)}") # Capture stage 2 failure, but keep stage 1 results @@ -370,10 +408,27 @@ async def _cascade_evaluate( if not hasattr(module, "evaluate_stage3"): return merged_result - # Run third stage + # Run third stage with timeout try: - stage3_result = await run_in_executor(module.evaluate_stage3)(program_path) + + async def run_stage3(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage3, program_path) + + stage3_result = await asyncio.wait_for(run_stage3(), timeout=self.config.timeout) stage3_eval_result = self._process_evaluation_result(stage3_result) + except asyncio.TimeoutError: + logger.warning(f"Stage 3 evaluation timed out after {self.config.timeout}s") + # Capture stage 3 failure, but keep previous results + merged_result.artifacts.update( + { + "stage3_timeout": True, + "failure_stage": "stage3", + } + ) + merged_result.metrics["stage3_passed"] = 0.0 + merged_result.metrics["timeout"] = True + return merged_result except Exception as e: logger.error(f"Error in stage 3 evaluation: {str(e)}") # Capture stage 3 failure, but keep previous results diff --git a/openevolve/utils/async_utils.py b/openevolve/utils/async_utils.py index 872f0e5ee..aad758d28 100644 --- a/openevolve/utils/async_utils.py +++ b/openevolve/utils/async_utils.py @@ -32,6 +32,60 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: return wrapper +async def run_with_timeout( + coro: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any +) -> Any: + """ + Run a coroutine with a timeout, returning a default value on timeout + + Args: + coro: Coroutine function to run + timeout: Timeout in seconds + *args: Arguments to pass to the coroutine + timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) + **kwargs: Keyword arguments to pass to the coroutine + + Returns: + Result of the coroutine or timeout_error_value on timeout + """ + if timeout_error_value is None: + timeout_error_value = {"error": 0.0, "timeout": True} + + try: + return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout) + except asyncio.TimeoutError: + logger.warning(f"Operation timed out after {timeout}s") + return timeout_error_value + + +async def run_sync_with_timeout( + func: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any +) -> Any: + """ + Run a synchronous function in an executor with a timeout + + Args: + func: Synchronous function to run + timeout: Timeout in seconds + *args: Arguments to pass to the function + timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) + **kwargs: Keyword arguments to pass to the function + + Returns: + Result of the function or timeout_error_value on timeout + """ + if timeout_error_value is None: + timeout_error_value = {"error": 0.0, "timeout": True} + + try: + loop = asyncio.get_event_loop() + task = loop.run_in_executor(None, functools.partial(func, *args, **kwargs)) + return await asyncio.wait_for(task, timeout=timeout) + except asyncio.TimeoutError: + logger.warning(f"Sync operation 
timed out after {timeout}s") + return timeout_error_value + + async def gather_with_concurrency( n: int, *tasks: asyncio.Future, return_exceptions: bool = False ) -> List[Any]: diff --git a/tests/test_evaluator_timeout.py b/tests/test_evaluator_timeout.py new file mode 100644 index 000000000..f9872c3ae --- /dev/null +++ b/tests/test_evaluator_timeout.py @@ -0,0 +1,398 @@ +""" +Tests for evaluation timeout functionality in openevolve.evaluator +""" + +import asyncio +import os +import tempfile +import time +import unittest +from unittest.mock import patch, MagicMock + +from openevolve.config import EvaluatorConfig +from openevolve.evaluator import Evaluator + + +class TestEvaluatorTimeout(unittest.TestCase): + """Tests for evaluator timeout functionality""" + + def setUp(self): + """Set up test evaluator with timeout configuration""" + # Create a test evaluation file + self.test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) + + # Write test evaluation functions + self.test_eval_file.write( + """ +import time + +def evaluate(program_path): + # Read the program to determine behavior + with open(program_path, 'r') as f: + code = f.read() + + if 'SLEEP_LONG' in code: + # Sleep for a long time to trigger timeout + time.sleep(30) + return {"score": 1.0} + elif 'SLEEP_SHORT' in code: + # Sleep for a short time that should not timeout + time.sleep(1) + return {"score": 0.8} + else: + # Fast evaluation + return {"score": 0.5} + +def evaluate_stage1(program_path): + with open(program_path, 'r') as f: + code = f.read() + + if 'STAGE1_TIMEOUT' in code: + time.sleep(30) + return {"stage1_score": 1.0} + else: + return {"stage1_score": 0.7} + +def evaluate_stage2(program_path): + with open(program_path, 'r') as f: + code = f.read() + + if 'STAGE2_TIMEOUT' in code: + time.sleep(30) + return {"stage2_score": 1.0} + else: + return {"stage2_score": 0.8} + +def evaluate_stage3(program_path): + with open(program_path, 'r') as f: + code = f.read() + + if 'STAGE3_TIMEOUT' in code: + time.sleep(30) + return {"stage3_score": 1.0} + else: + return {"stage3_score": 0.9} +""" + ) + self.test_eval_file.close() + + # Create config with short timeout for testing + self.config = EvaluatorConfig() + self.config.timeout = 5 # 5 second timeout for testing + self.config.max_retries = 1 # Minimal retries for faster testing + self.config.cascade_evaluation = False + self.config.cascade_thresholds = [0.5, 0.7, 0.9] + + # Create evaluator + self.evaluator = Evaluator( + config=self.config, + evaluation_file=self.test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + def tearDown(self): + """Clean up test files""" + if os.path.exists(self.test_eval_file.name): + os.unlink(self.test_eval_file.name) + + def test_fast_evaluation_completes(self): + """Test that fast evaluations complete successfully""" + + async def run_test(): + program_code = "def test(): return 'fast'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_fast") + + elapsed_time = time.time() - start_time + + # Should complete quickly + self.assertLess(elapsed_time, 3.0) + # Should return successful result + self.assertIn("score", result) + self.assertEqual(result["score"], 0.5) + # Should not have timeout or error flags + self.assertNotIn("timeout", result) + self.assertNotIn("error", result) + + asyncio.run(run_test()) + + def test_short_evaluation_completes(self): + """Test that evaluations shorter than timeout complete successfully""" + + async def run_test(): + 
program_code = "# SLEEP_SHORT\ndef test(): return 'short'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_short") + + elapsed_time = time.time() - start_time + + # Should complete within timeout + self.assertLess(elapsed_time, self.config.timeout) + # Should return successful result + self.assertIn("score", result) + self.assertEqual(result["score"], 0.8) + # Should not have timeout or error flags + self.assertNotIn("timeout", result) + self.assertNotIn("error", result) + + asyncio.run(run_test()) + + def test_long_evaluation_times_out(self): + """Test that long evaluations time out properly""" + + async def run_test(): + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_long") + + elapsed_time = time.time() - start_time + + # Should complete around the timeout period (allowing some margin) + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should return timeout result + self.assertIn("error", result) + self.assertEqual(result["error"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_cascade_evaluation_timeout_stage1(self): + """Test timeout in cascade evaluation stage 1""" + + async def run_test(): + # Enable cascade evaluation + self.evaluator.config.cascade_evaluation = True + + program_code = "# STAGE1_TIMEOUT\ndef test(): return 'stage1_timeout'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage1") + + elapsed_time = time.time() - start_time + + # Should timeout around the configured timeout + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should return stage1 timeout result + self.assertIn("stage1_passed", result) + self.assertEqual(result["stage1_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_cascade_evaluation_timeout_stage2(self): + """Test timeout in cascade evaluation stage 2""" + + async def run_test(): + # Enable cascade evaluation + self.evaluator.config.cascade_evaluation = True + + program_code = "# STAGE2_TIMEOUT\ndef test(): return 'stage2_timeout'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage2") + + elapsed_time = time.time() - start_time + + # Should timeout on stage 2, but stage 1 should complete first + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should have stage1 result but stage2 timeout + self.assertIn("stage1_score", result) + self.assertEqual(result["stage1_score"], 0.7) + self.assertIn("stage2_passed", result) + self.assertEqual(result["stage2_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_cascade_evaluation_timeout_stage3(self): + """Test timeout in cascade evaluation stage 3""" + + async def run_test(): + # Enable cascade evaluation + self.evaluator.config.cascade_evaluation = True + + program_code = "# STAGE3_TIMEOUT\ndef test(): return 'stage3_timeout'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage3") + + elapsed_time = time.time() - start_time + + # Should timeout 
on stage 3, but stages 1 and 2 should complete first + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should have stage1 and stage2 results but stage3 timeout + self.assertIn("stage1_score", result) + self.assertEqual(result["stage1_score"], 0.7) + self.assertIn("stage2_score", result) + self.assertEqual(result["stage2_score"], 0.8) + self.assertIn("stage3_passed", result) + self.assertEqual(result["stage3_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_timeout_config_respected(self): + """Test that the timeout configuration value is actually used""" + + async def run_test(): + # Create evaluator with different timeout + config_10s = EvaluatorConfig() + config_10s.timeout = 10 # 10 second timeout + config_10s.max_retries = 1 + + evaluator_10s = Evaluator( + config=config_10s, + evaluation_file=self.test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await evaluator_10s.evaluate_program(program_code, "test_config") + + elapsed_time = time.time() - start_time + + # Should timeout around 10 seconds, not 5 + self.assertGreater(elapsed_time, 9) + self.assertLess(elapsed_time, 13) + + # Should return timeout result + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_multiple_retries_with_timeout(self): + """Test that retries work correctly with timeout""" + + async def run_test(): + # Set more retries + self.evaluator.config.max_retries = 2 + + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_retries") + + elapsed_time = time.time() - start_time + + # Should timeout on each retry (3 total attempts) + # Each attempt should take ~5 seconds, so total should be ~15 seconds + # But since timeouts happen in parallel, it should be closer to 5 seconds per attempt + expected_time = self.config.timeout * (self.evaluator.config.max_retries + 1) + self.assertGreater(elapsed_time, expected_time - 2) + self.assertLess(elapsed_time, expected_time + 5) + + # Should return timeout result after all retries fail + self.assertIn("error", result) + self.assertEqual(result["error"], 0.0) + + asyncio.run(run_test()) + + def test_artifacts_on_timeout(self): + """Test that timeout artifacts are properly captured""" + + async def run_test(): + # Enable artifacts + with patch.dict(os.environ, {"ENABLE_ARTIFACTS": "true"}): + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + + result = await self.evaluator.evaluate_program(program_code, "test_artifacts") + + # Should have timeout result + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + # Should have captured artifacts + artifacts = self.evaluator.get_pending_artifacts("test_artifacts") + self.assertIsNotNone(artifacts) + self.assertIn("failure_stage", artifacts) + + asyncio.run(run_test()) + + +class TestTimeoutIntegration(unittest.TestCase): + """Integration tests for timeout functionality""" + + def test_real_world_scenario(self): + """Test a scenario similar to the reported bug""" + + async def run_test(): + # Create a test evaluation file that simulates a long-running evaluation + test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) + + test_eval_file.write( + """ +import 
time + +def evaluate(program_path): + # Simulate a very long evaluation (like the 11-hour case) + time.sleep(20) # 20 seconds to test timeout + return {"accReturn": 0.1, "CalmarRatio": 0.9, "combined_score": 0.82} +""" + ) + test_eval_file.close() + + try: + # Configure like user's config but with shorter timeout for testing + config = EvaluatorConfig() + config.timeout = 5 # 5 seconds instead of 600 + config.max_retries = 1 + config.cascade_evaluation = False + config.parallel_evaluations = 1 + + evaluator = Evaluator( + config=config, + evaluation_file=test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + program_code = """ +# Financial optimization algorithm +def search_algorithm(): + # This would normally run for hours + return {"report_type_factor_map": {}} +""" + + start_time = time.time() + result = await evaluator.evaluate_program(program_code, "financial_test") + elapsed_time = time.time() - start_time + + # Should timeout in ~5 seconds, not 20+ seconds + self.assertLess(elapsed_time, 8) + self.assertGreater(elapsed_time, 4) + + # Should return timeout error + self.assertIn("error", result) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + finally: + if os.path.exists(test_eval_file.name): + os.unlink(test_eval_file.name) + + asyncio.run(run_test()) + + +if __name__ == "__main__": + # Run with verbose output to see test progress + unittest.main(verbosity=2) From b483af9c874ad7ed738a4ccac0346616e0258d90 Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Thu, 19 Jun 2025 16:04:17 +0800 Subject: [PATCH 2/8] fxies --- openevolve/evaluator.py | 65 +++- openevolve/utils/async_utils.py | 44 ++- tests/test_evaluator_timeout.py | 594 +++++++++++++++++--------------- 3 files changed, 399 insertions(+), 304 deletions(-) diff --git a/openevolve/evaluator.py b/openevolve/evaluator.py index 6841b63d1..44a4171b9 100644 --- a/openevolve/evaluator.py +++ b/openevolve/evaluator.py @@ -117,12 +117,21 @@ async def evaluate_program( # Retry logic for evaluation last_exception = None for attempt in range(self.config.max_retries + 1): - # Create a temporary file for the program - with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_file: - temp_file.write(program_code.encode("utf-8")) - temp_file_path = temp_file.name - + # Create a temporary file for the program - FIXED: proper file handling + temp_file_path = None try: + # Create temp file and write content with proper flushing + temp_fd, temp_file_path = tempfile.mkstemp(suffix=".py", text=True) + with os.fdopen(temp_fd, 'w') as temp_file: + temp_file.write(program_code) + temp_file.flush() # Ensure content is written to disk + os.fsync(temp_file.fileno()) # Force sync to disk + + # Verify file was written correctly (debug) + with open(temp_file_path, 'r') as verify_file: + written_content = verify_file.read() + logger.debug(f"Temp file content (first 100 chars): {written_content[:100]}") + # Run evaluation if self.config.cascade_evaluation: # Run cascade evaluation @@ -186,13 +195,26 @@ async def evaluate_program( ) traceback.print_exc() - # Capture failure artifacts if enabled + # Capture failure artifacts if enabled - FIXED: better artifact capture if artifacts_enabled and program_id: - self._pending_artifacts[program_id] = { + failure_artifacts = { "stderr": str(e), "traceback": traceback.format_exc(), "failure_stage": "evaluation", + "attempt": attempt + 1, + "timeout_config": self.config.timeout, } + + # Check if this was a timeout error + if isinstance(e, asyncio.TimeoutError) or 
"timeout" in str(e).lower(): + failure_artifacts["timeout"] = True + failure_artifacts["failure_stage"] = "timeout" + + # Store or update artifacts + if program_id in self._pending_artifacts: + self._pending_artifacts[program_id].update(failure_artifacts) + else: + self._pending_artifacts[program_id] = failure_artifacts # If this is not the last attempt, wait a bit before retrying if attempt < self.config.max_retries: @@ -200,14 +222,22 @@ async def evaluate_program( finally: # Clean up temporary file - if os.path.exists(temp_file_path): - os.unlink(temp_file_path) + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except OSError: + pass # Ignore cleanup errors - # All retries failed + # All retries failed - FIXED: better error return with timeout info logger.error( f"All evaluation attempts failed for program{program_id_str}. Last error: {str(last_exception)}" ) - return {"error": 0.0} + + # Check if the last exception was a timeout + if isinstance(last_exception, asyncio.TimeoutError): + return {"error": 0.0, "timeout": True} + else: + return {"error": 0.0} def _process_evaluation_result(self, result: Any) -> EvaluationResult: """ @@ -252,13 +282,19 @@ async def _direct_evaluate(self, program_path: str) -> Dict[str, float]: Returns: Dictionary of metric name to score """ + logger.debug(f"Starting direct evaluation with timeout={self.config.timeout}s") + try: # Create a coroutine that runs the evaluation function in an executor async def run_evaluation(): loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, self.evaluate_function, program_path) + logger.debug(f"Running evaluation function on {program_path}") + result = await loop.run_in_executor(None, self.evaluate_function, program_path) + logger.debug(f"Evaluation function returned: {result}") + return result # Run the evaluation with timeout + logger.debug(f"Waiting for evaluation with {self.config.timeout}s timeout") result = await asyncio.wait_for(run_evaluation(), timeout=self.config.timeout) # Validate result @@ -266,6 +302,7 @@ async def run_evaluation(): logger.warning(f"Evaluation returned non-dictionary result: {result}") return {"error": 0.0} + logger.debug(f"Evaluation completed successfully: {result}") return result except asyncio.TimeoutError: @@ -273,6 +310,7 @@ async def run_evaluation(): return {"error": 0.0, "timeout": True} except Exception as e: logger.error(f"Error in direct evaluation: {str(e)}") + traceback.print_exc() return {"error": 0.0} async def _cascade_evaluate( @@ -308,7 +346,6 @@ async def _cascade_evaluate( # Run first stage with timeout try: - async def run_stage1(): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, module.evaluate_stage1, program_path) @@ -348,7 +385,6 @@ async def run_stage1(): # Run second stage with timeout try: - async def run_stage2(): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, module.evaluate_stage2, program_path) @@ -410,7 +446,6 @@ async def run_stage2(): # Run third stage with timeout try: - async def run_stage3(): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, module.evaluate_stage3, program_path) diff --git a/openevolve/utils/async_utils.py b/openevolve/utils/async_utils.py index aad758d28..c3a9de35b 100644 --- a/openevolve/utils/async_utils.py +++ b/openevolve/utils/async_utils.py @@ -33,24 +33,28 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: async def run_with_timeout( - coro: Callable, timeout: float, *args: Any, 
timeout_error_value: Any = None, **kwargs: Any + coro: Callable, + timeout: float, + *args: Any, + timeout_error_value: Any = None, + **kwargs: Any ) -> Any: """ Run a coroutine with a timeout, returning a default value on timeout - + Args: coro: Coroutine function to run timeout: Timeout in seconds *args: Arguments to pass to the coroutine timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) **kwargs: Keyword arguments to pass to the coroutine - + Returns: Result of the coroutine or timeout_error_value on timeout """ if timeout_error_value is None: timeout_error_value = {"error": 0.0, "timeout": True} - + try: return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout) except asyncio.TimeoutError: @@ -59,24 +63,28 @@ async def run_with_timeout( async def run_sync_with_timeout( - func: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any + func: Callable, + timeout: float, + *args: Any, + timeout_error_value: Any = None, + **kwargs: Any ) -> Any: """ Run a synchronous function in an executor with a timeout - + Args: func: Synchronous function to run timeout: Timeout in seconds *args: Arguments to pass to the function timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) **kwargs: Keyword arguments to pass to the function - + Returns: Result of the function or timeout_error_value on timeout """ if timeout_error_value is None: timeout_error_value = {"error": 0.0, "timeout": True} - + try: loop = asyncio.get_event_loop() task = loop.run_in_executor(None, functools.partial(func, *args, **kwargs)) @@ -170,9 +178,21 @@ class TaskPool: """ def __init__(self, max_concurrency: int = 10): - self.semaphore = asyncio.Semaphore(max_concurrency) + self.max_concurrency = max_concurrency + self._semaphore = None # Lazy initialization self.tasks: List[asyncio.Task] = [] + @property + def semaphore(self): + """Lazy-initialized semaphore that creates itself when first accessed""" + if self._semaphore is None: + try: + self._semaphore = asyncio.Semaphore(self.max_concurrency) + except RuntimeError: + # No event loop running, will be created later when needed + pass + return self._semaphore + async def run(self, coro: Callable, *args: Any, **kwargs: Any) -> Any: """ Run a coroutine in the pool @@ -185,7 +205,11 @@ async def run(self, coro: Callable, *args: Any, **kwargs: Any) -> Any: Returns: Result of the coroutine """ - async with self.semaphore: + # Ensure semaphore is created in the current event loop + if self._semaphore is None: + self._semaphore = asyncio.Semaphore(self.max_concurrency) + + async with self._semaphore: return await coro(*args, **kwargs) def create_task(self, coro: Callable, *args: Any, **kwargs: Any) -> asyncio.Task: diff --git a/tests/test_evaluator_timeout.py b/tests/test_evaluator_timeout.py index f9872c3ae..bf3c87558 100644 --- a/tests/test_evaluator_timeout.py +++ b/tests/test_evaluator_timeout.py @@ -7,73 +7,93 @@ import tempfile import time import unittest +import logging from unittest.mock import patch, MagicMock from openevolve.config import EvaluatorConfig from openevolve.evaluator import Evaluator +# Enable debug logging for tests +logging.basicConfig(level=logging.DEBUG) -class TestEvaluatorTimeout(unittest.TestCase): - """Tests for evaluator timeout functionality""" - def setUp(self): +class TestEvaluatorTimeout(unittest.IsolatedAsyncioTestCase): + """Tests for evaluator timeout functionality using proper async testing""" + + async def asyncSetUp(self): """Set 
up test evaluator with timeout configuration""" # Create a test evaluation file - self.test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) - - # Write test evaluation functions - self.test_eval_file.write( - """ + self.test_eval_file = tempfile.NamedTemporaryFile( + mode='w', suffix='.py', delete=False + ) + + # Write test evaluation functions with more explicit debugging + self.test_eval_file.write(""" import time +import sys def evaluate(program_path): + print(f"[DEBUG] Evaluation function called with: {program_path}", file=sys.stderr) + # Read the program to determine behavior - with open(program_path, 'r') as f: - code = f.read() + try: + with open(program_path, 'r') as f: + code = f.read() + print(f"[DEBUG] Read program code: {repr(code[:100])}", file=sys.stderr) + except Exception as e: + print(f"[DEBUG] Error reading program: {e}", file=sys.stderr) + return {"error": 1.0} if 'SLEEP_LONG' in code: - # Sleep for a long time to trigger timeout + print("[DEBUG] Found SLEEP_LONG marker, sleeping for 30 seconds...", file=sys.stderr) time.sleep(30) + print("[DEBUG] Sleep completed (this should not appear for timeout tests)", file=sys.stderr) return {"score": 1.0} elif 'SLEEP_SHORT' in code: - # Sleep for a short time that should not timeout + print("[DEBUG] Found SLEEP_SHORT marker, sleeping for 1 second...", file=sys.stderr) time.sleep(1) + print("[DEBUG] Short sleep completed", file=sys.stderr) return {"score": 0.8} else: - # Fast evaluation + print("[DEBUG] No sleep markers found, returning fast result", file=sys.stderr) return {"score": 0.5} def evaluate_stage1(program_path): + print(f"[DEBUG] Stage1 evaluation called with: {program_path}", file=sys.stderr) with open(program_path, 'r') as f: code = f.read() if 'STAGE1_TIMEOUT' in code: + print("[DEBUG] Stage1 timeout test, sleeping...", file=sys.stderr) time.sleep(30) return {"stage1_score": 1.0} else: return {"stage1_score": 0.7} def evaluate_stage2(program_path): + print(f"[DEBUG] Stage2 evaluation called with: {program_path}", file=sys.stderr) with open(program_path, 'r') as f: code = f.read() if 'STAGE2_TIMEOUT' in code: + print("[DEBUG] Stage2 timeout test, sleeping...", file=sys.stderr) time.sleep(30) return {"stage2_score": 1.0} else: return {"stage2_score": 0.8} def evaluate_stage3(program_path): + print(f"[DEBUG] Stage3 evaluation called with: {program_path}", file=sys.stderr) with open(program_path, 'r') as f: code = f.read() if 'STAGE3_TIMEOUT' in code: + print("[DEBUG] Stage3 timeout test, sleeping...", file=sys.stderr) time.sleep(30) return {"stage3_score": 1.0} else: return {"stage3_score": 0.9} -""" - ) +""") self.test_eval_file.close() # Create config with short timeout for testing @@ -91,306 +111,322 @@ def evaluate_stage3(program_path): prompt_sampler=None, ) - def tearDown(self): + async def asyncTearDown(self): """Clean up test files""" if os.path.exists(self.test_eval_file.name): os.unlink(self.test_eval_file.name) - def test_fast_evaluation_completes(self): + async def test_fast_evaluation_completes(self): """Test that fast evaluations complete successfully""" - - async def run_test(): - program_code = "def test(): return 'fast'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_fast") - - elapsed_time = time.time() - start_time - - # Should complete quickly - self.assertLess(elapsed_time, 3.0) - # Should return successful result - self.assertIn("score", result) - self.assertEqual(result["score"], 0.5) - # Should not have timeout or 
error flags - self.assertNotIn("timeout", result) - self.assertNotIn("error", result) - - asyncio.run(run_test()) - - def test_short_evaluation_completes(self): + program_code = "def test(): return 'fast'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_fast") + + elapsed_time = time.time() - start_time + + print(f"[TEST] Fast evaluation took {elapsed_time:.3f}s, result: {result}") + + # Should complete quickly + self.assertLess(elapsed_time, 3.0) + # Should return successful result + self.assertIn("score", result) + self.assertEqual(result["score"], 0.5) + # Should not have timeout or error flags + self.assertNotIn("timeout", result) + self.assertNotIn("error", result) + + async def test_short_evaluation_completes(self): """Test that evaluations shorter than timeout complete successfully""" - - async def run_test(): - program_code = "# SLEEP_SHORT\ndef test(): return 'short'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_short") - - elapsed_time = time.time() - start_time - - # Should complete within timeout - self.assertLess(elapsed_time, self.config.timeout) - # Should return successful result - self.assertIn("score", result) - self.assertEqual(result["score"], 0.8) - # Should not have timeout or error flags - self.assertNotIn("timeout", result) - self.assertNotIn("error", result) - - asyncio.run(run_test()) - - def test_long_evaluation_times_out(self): + program_code = "# SLEEP_SHORT\ndef test(): return 'short'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_short") + + elapsed_time = time.time() - start_time + + print(f"[TEST] Short evaluation took {elapsed_time:.3f}s, result: {result}") + + # Should complete within timeout but take at least 1 second + self.assertGreater(elapsed_time, 0.8) # At least the sleep time + self.assertLess(elapsed_time, self.config.timeout) + # Should return successful result + self.assertIn("score", result) + self.assertEqual(result["score"], 0.8) + # Should not have timeout or error flags + self.assertNotIn("timeout", result) + self.assertNotIn("error", result) + + async def test_long_evaluation_times_out(self): """Test that long evaluations time out properly""" - - async def run_test(): - program_code = "# SLEEP_LONG\ndef test(): return 'long'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_long") - - elapsed_time = time.time() - start_time - - # Should complete around the timeout period (allowing some margin) - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - - # Should return timeout result - self.assertIn("error", result) - self.assertEqual(result["error"], 0.0) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - asyncio.run(run_test()) - - def test_cascade_evaluation_timeout_stage1(self): + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_long") + + elapsed_time = time.time() - start_time + + print(f"[TEST] Long evaluation took {elapsed_time:.3f}s, result: {result}") + + # Should complete around the timeout period (allowing some margin) + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should return timeout result + self.assertIn("error", result) + self.assertEqual(result["error"], 
0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + async def test_cascade_evaluation_timeout_stage1(self): """Test timeout in cascade evaluation stage 1""" - - async def run_test(): - # Enable cascade evaluation - self.evaluator.config.cascade_evaluation = True - - program_code = "# STAGE1_TIMEOUT\ndef test(): return 'stage1_timeout'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage1") - - elapsed_time = time.time() - start_time - - # Should timeout around the configured timeout - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - - # Should return stage1 timeout result - self.assertIn("stage1_passed", result) - self.assertEqual(result["stage1_passed"], 0.0) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - asyncio.run(run_test()) - - def test_cascade_evaluation_timeout_stage2(self): + # Enable cascade evaluation + self.evaluator.config.cascade_evaluation = True + + program_code = "# STAGE1_TIMEOUT\ndef test(): return 'stage1_timeout'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage1") + + elapsed_time = time.time() - start_time + + print(f"[TEST] Cascade stage1 took {elapsed_time:.3f}s, result: {result}") + + # Should timeout around the configured timeout + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should return stage1 timeout result + self.assertIn("stage1_passed", result) + self.assertEqual(result["stage1_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + async def test_cascade_evaluation_timeout_stage2(self): """Test timeout in cascade evaluation stage 2""" - - async def run_test(): - # Enable cascade evaluation - self.evaluator.config.cascade_evaluation = True - - program_code = "# STAGE2_TIMEOUT\ndef test(): return 'stage2_timeout'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage2") - - elapsed_time = time.time() - start_time - - # Should timeout on stage 2, but stage 1 should complete first - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - - # Should have stage1 result but stage2 timeout - self.assertIn("stage1_score", result) - self.assertEqual(result["stage1_score"], 0.7) - self.assertIn("stage2_passed", result) - self.assertEqual(result["stage2_passed"], 0.0) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - asyncio.run(run_test()) - - def test_cascade_evaluation_timeout_stage3(self): + # Enable cascade evaluation + self.evaluator.config.cascade_evaluation = True + + program_code = "# STAGE2_TIMEOUT\ndef test(): return 'stage2_timeout'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage2") + + elapsed_time = time.time() - start_time + + print(f"[TEST] Cascade stage2 took {elapsed_time:.3f}s, result: {result}") + + # Should timeout on stage 2, but stage 1 should complete first + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should have stage1 result but stage2 timeout + self.assertIn("stage1_score", result) + self.assertEqual(result["stage1_score"], 0.7) + self.assertIn("stage2_passed", result) + 
self.assertEqual(result["stage2_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + async def test_cascade_evaluation_timeout_stage3(self): """Test timeout in cascade evaluation stage 3""" - - async def run_test(): - # Enable cascade evaluation - self.evaluator.config.cascade_evaluation = True - - program_code = "# STAGE3_TIMEOUT\ndef test(): return 'stage3_timeout'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage3") - - elapsed_time = time.time() - start_time - - # Should timeout on stage 3, but stages 1 and 2 should complete first - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - - # Should have stage1 and stage2 results but stage3 timeout - self.assertIn("stage1_score", result) - self.assertEqual(result["stage1_score"], 0.7) - self.assertIn("stage2_score", result) - self.assertEqual(result["stage2_score"], 0.8) - self.assertIn("stage3_passed", result) - self.assertEqual(result["stage3_passed"], 0.0) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - asyncio.run(run_test()) - - def test_timeout_config_respected(self): + # Enable cascade evaluation + self.evaluator.config.cascade_evaluation = True + + program_code = "# STAGE3_TIMEOUT\ndef test(): return 'stage3_timeout'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage3") + + elapsed_time = time.time() - start_time + + print(f"[TEST] Cascade stage3 took {elapsed_time:.3f}s, result: {result}") + + # Should timeout on stage 3, but stages 1 and 2 should complete first + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should have stage1 and stage2 results but stage3 timeout + self.assertIn("stage1_score", result) + self.assertEqual(result["stage1_score"], 0.7) + self.assertIn("stage2_score", result) + self.assertEqual(result["stage2_score"], 0.8) + self.assertIn("stage3_passed", result) + self.assertEqual(result["stage3_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + async def test_timeout_config_respected(self): """Test that the timeout configuration value is actually used""" - - async def run_test(): - # Create evaluator with different timeout - config_10s = EvaluatorConfig() - config_10s.timeout = 10 # 10 second timeout - config_10s.max_retries = 1 - - evaluator_10s = Evaluator( - config=config_10s, - evaluation_file=self.test_eval_file.name, - llm_ensemble=None, - prompt_sampler=None, - ) - + # Create evaluator with different timeout + config_10s = EvaluatorConfig() + config_10s.timeout = 10 # 10 second timeout + config_10s.max_retries = 0 # No retries for cleaner test + + evaluator_10s = Evaluator( + config=config_10s, + evaluation_file=self.test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await evaluator_10s.evaluate_program(program_code, "test_config") + + elapsed_time = time.time() - start_time + + print(f"[TEST] Config test took {elapsed_time:.3f}s, result: {result}") + + # Should timeout around 10 seconds, not 5 + self.assertGreater(elapsed_time, 9) + self.assertLess(elapsed_time, 13) + + # Should return timeout result + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + async def test_multiple_retries_with_timeout(self): 
+ """Test that retries work correctly with timeout""" + # Set more retries + self.evaluator.config.max_retries = 2 # 3 total attempts + + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_retries") + + elapsed_time = time.time() - start_time + + print(f"[TEST] Retry test took {elapsed_time:.3f}s, result: {result}") + + # Should timeout on each retry (3 total attempts) + # Each attempt should take ~5 seconds, so total should be ~15 seconds + expected_time = self.config.timeout * (self.evaluator.config.max_retries + 1) + self.assertGreater(elapsed_time, expected_time - 3) # Allow some margin + self.assertLess(elapsed_time, expected_time + 6) + + # Should return timeout result after all retries fail + self.assertIn("error", result) + self.assertEqual(result["error"], 0.0) + # Should also have timeout flag since the last exception was TimeoutError + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + async def test_artifacts_on_timeout(self): + """Test that timeout artifacts are properly captured""" + # Enable artifacts + with patch.dict(os.environ, {'ENABLE_ARTIFACTS': 'true'}): program_code = "# SLEEP_LONG\ndef test(): return 'long'" - start_time = time.time() - - result = await evaluator_10s.evaluate_program(program_code, "test_config") - - elapsed_time = time.time() - start_time - - # Should timeout around 10 seconds, not 5 - self.assertGreater(elapsed_time, 9) - self.assertLess(elapsed_time, 13) - - # Should return timeout result + + result = await self.evaluator.evaluate_program(program_code, "test_artifacts") + + print(f"[TEST] Artifacts test result: {result}") + + # Should have timeout result self.assertIn("timeout", result) self.assertTrue(result["timeout"]) - - asyncio.run(run_test()) - - def test_multiple_retries_with_timeout(self): - """Test that retries work correctly with timeout""" - - async def run_test(): - # Set more retries - self.evaluator.config.max_retries = 2 - - program_code = "# SLEEP_LONG\ndef test(): return 'long'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_retries") - - elapsed_time = time.time() - start_time - - # Should timeout on each retry (3 total attempts) - # Each attempt should take ~5 seconds, so total should be ~15 seconds - # But since timeouts happen in parallel, it should be closer to 5 seconds per attempt - expected_time = self.config.timeout * (self.evaluator.config.max_retries + 1) - self.assertGreater(elapsed_time, expected_time - 2) - self.assertLess(elapsed_time, expected_time + 5) - - # Should return timeout result after all retries fail - self.assertIn("error", result) - self.assertEqual(result["error"], 0.0) - - asyncio.run(run_test()) - - def test_artifacts_on_timeout(self): - """Test that timeout artifacts are properly captured""" - - async def run_test(): - # Enable artifacts - with patch.dict(os.environ, {"ENABLE_ARTIFACTS": "true"}): - program_code = "# SLEEP_LONG\ndef test(): return 'long'" - - result = await self.evaluator.evaluate_program(program_code, "test_artifacts") - - # Should have timeout result - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - # Should have captured artifacts - artifacts = self.evaluator.get_pending_artifacts("test_artifacts") - self.assertIsNotNone(artifacts) - self.assertIn("failure_stage", artifacts) - - asyncio.run(run_test()) + + # Should have captured artifacts + artifacts = 
self.evaluator.get_pending_artifacts("test_artifacts") + print(f"[TEST] Captured artifacts: {artifacts}") + + self.assertIsNotNone(artifacts, "Artifacts should not be None") + self.assertIn("failure_stage", artifacts) + # Should have timeout-related information + self.assertTrue( + artifacts.get("timeout") is True or + "timeout" in artifacts.get("failure_stage", "").lower(), + f"Artifacts should indicate timeout: {artifacts}" + ) -class TestTimeoutIntegration(unittest.TestCase): +class TestTimeoutIntegration(unittest.IsolatedAsyncioTestCase): """Integration tests for timeout functionality""" - def test_real_world_scenario(self): + async def test_real_world_scenario(self): """Test a scenario similar to the reported bug""" - - async def run_test(): - # Create a test evaluation file that simulates a long-running evaluation - test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) - - test_eval_file.write( - """ + # Create a test evaluation file that simulates a long-running evaluation + test_eval_file = tempfile.NamedTemporaryFile( + mode='w', suffix='.py', delete=False + ) + + test_eval_file.write(""" import time +import sys def evaluate(program_path): + print("[DEBUG] Real-world scenario test evaluation starting", file=sys.stderr) # Simulate a very long evaluation (like the 11-hour case) time.sleep(20) # 20 seconds to test timeout + print("[DEBUG] This should not print in timeout test", file=sys.stderr) return {"accReturn": 0.1, "CalmarRatio": 0.9, "combined_score": 0.82} -""" +""") + test_eval_file.close() + + try: + # Configure like user's config but with shorter timeout for testing + config = EvaluatorConfig() + config.timeout = 5 # 5 seconds instead of 600 + config.max_retries = 0 # No retries for cleaner test + config.cascade_evaluation = False + config.parallel_evaluations = 1 + + evaluator = Evaluator( + config=config, + evaluation_file=test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, ) - test_eval_file.close() - - try: - # Configure like user's config but with shorter timeout for testing - config = EvaluatorConfig() - config.timeout = 5 # 5 seconds instead of 600 - config.max_retries = 1 - config.cascade_evaluation = False - config.parallel_evaluations = 1 - - evaluator = Evaluator( - config=config, - evaluation_file=test_eval_file.name, - llm_ensemble=None, - prompt_sampler=None, - ) - - program_code = """ + + program_code = """ # Financial optimization algorithm def search_algorithm(): # This would normally run for hours return {"report_type_factor_map": {}} """ - start_time = time.time() - result = await evaluator.evaluate_program(program_code, "financial_test") - elapsed_time = time.time() - start_time + start_time = time.time() + result = await evaluator.evaluate_program(program_code, "financial_test") + elapsed_time = time.time() - start_time - # Should timeout in ~5 seconds, not 20+ seconds - self.assertLess(elapsed_time, 8) - self.assertGreater(elapsed_time, 4) + print(f"[TEST] Integration test took {elapsed_time:.3f}s, result: {result}") - # Should return timeout error - self.assertIn("error", result) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) + # Should timeout in ~5 seconds, not 20+ seconds + self.assertLess(elapsed_time, 8) + self.assertGreater(elapsed_time, 4) + + # Should return timeout error + self.assertIn("error", result) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + finally: + if os.path.exists(test_eval_file.name): + os.unlink(test_eval_file.name) - finally: - if 
os.path.exists(test_eval_file.name): - os.unlink(test_eval_file.name) - asyncio.run(run_test()) +class TestBasicFunctionality(unittest.TestCase): + """Basic non-async tests that should work without event loop""" + + def test_config_loading(self): + """Test that evaluator config loads correctly""" + config = EvaluatorConfig() + config.timeout = 600 + config.max_retries = 3 + + self.assertEqual(config.timeout, 600) + self.assertEqual(config.max_retries, 3) + + def test_taskpool_creation(self): + """Test that TaskPool can be created without active event loop""" + from openevolve.utils.async_utils import TaskPool + + # This should not raise an error anymore + pool = TaskPool(max_concurrency=4) + self.assertEqual(pool.max_concurrency, 4) + self.assertIsNone(pool._semaphore) # Should be None until first use if __name__ == "__main__": From d38aab977a1eb179f3cd50cf4d5cfd5da659b8b9 Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Thu, 19 Jun 2025 16:06:49 +0800 Subject: [PATCH 3/8] Revert "fxies" This reverts commit b483af9c874ad7ed738a4ccac0346616e0258d90. --- openevolve/evaluator.py | 65 +--- openevolve/utils/async_utils.py | 44 +-- tests/test_evaluator_timeout.py | 594 +++++++++++++++----------------- 3 files changed, 304 insertions(+), 399 deletions(-) diff --git a/openevolve/evaluator.py b/openevolve/evaluator.py index 44a4171b9..6841b63d1 100644 --- a/openevolve/evaluator.py +++ b/openevolve/evaluator.py @@ -117,21 +117,12 @@ async def evaluate_program( # Retry logic for evaluation last_exception = None for attempt in range(self.config.max_retries + 1): - # Create a temporary file for the program - FIXED: proper file handling - temp_file_path = None - try: - # Create temp file and write content with proper flushing - temp_fd, temp_file_path = tempfile.mkstemp(suffix=".py", text=True) - with os.fdopen(temp_fd, 'w') as temp_file: - temp_file.write(program_code) - temp_file.flush() # Ensure content is written to disk - os.fsync(temp_file.fileno()) # Force sync to disk - - # Verify file was written correctly (debug) - with open(temp_file_path, 'r') as verify_file: - written_content = verify_file.read() - logger.debug(f"Temp file content (first 100 chars): {written_content[:100]}") + # Create a temporary file for the program + with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_file: + temp_file.write(program_code.encode("utf-8")) + temp_file_path = temp_file.name + try: # Run evaluation if self.config.cascade_evaluation: # Run cascade evaluation @@ -195,26 +186,13 @@ async def evaluate_program( ) traceback.print_exc() - # Capture failure artifacts if enabled - FIXED: better artifact capture + # Capture failure artifacts if enabled if artifacts_enabled and program_id: - failure_artifacts = { + self._pending_artifacts[program_id] = { "stderr": str(e), "traceback": traceback.format_exc(), "failure_stage": "evaluation", - "attempt": attempt + 1, - "timeout_config": self.config.timeout, } - - # Check if this was a timeout error - if isinstance(e, asyncio.TimeoutError) or "timeout" in str(e).lower(): - failure_artifacts["timeout"] = True - failure_artifacts["failure_stage"] = "timeout" - - # Store or update artifacts - if program_id in self._pending_artifacts: - self._pending_artifacts[program_id].update(failure_artifacts) - else: - self._pending_artifacts[program_id] = failure_artifacts # If this is not the last attempt, wait a bit before retrying if attempt < self.config.max_retries: @@ -222,22 +200,14 @@ async def evaluate_program( finally: # Clean up temporary file - if 
temp_file_path and os.path.exists(temp_file_path): - try: - os.unlink(temp_file_path) - except OSError: - pass # Ignore cleanup errors + if os.path.exists(temp_file_path): + os.unlink(temp_file_path) - # All retries failed - FIXED: better error return with timeout info + # All retries failed logger.error( f"All evaluation attempts failed for program{program_id_str}. Last error: {str(last_exception)}" ) - - # Check if the last exception was a timeout - if isinstance(last_exception, asyncio.TimeoutError): - return {"error": 0.0, "timeout": True} - else: - return {"error": 0.0} + return {"error": 0.0} def _process_evaluation_result(self, result: Any) -> EvaluationResult: """ @@ -282,19 +252,13 @@ async def _direct_evaluate(self, program_path: str) -> Dict[str, float]: Returns: Dictionary of metric name to score """ - logger.debug(f"Starting direct evaluation with timeout={self.config.timeout}s") - try: # Create a coroutine that runs the evaluation function in an executor async def run_evaluation(): loop = asyncio.get_event_loop() - logger.debug(f"Running evaluation function on {program_path}") - result = await loop.run_in_executor(None, self.evaluate_function, program_path) - logger.debug(f"Evaluation function returned: {result}") - return result + return await loop.run_in_executor(None, self.evaluate_function, program_path) # Run the evaluation with timeout - logger.debug(f"Waiting for evaluation with {self.config.timeout}s timeout") result = await asyncio.wait_for(run_evaluation(), timeout=self.config.timeout) # Validate result @@ -302,7 +266,6 @@ async def run_evaluation(): logger.warning(f"Evaluation returned non-dictionary result: {result}") return {"error": 0.0} - logger.debug(f"Evaluation completed successfully: {result}") return result except asyncio.TimeoutError: @@ -310,7 +273,6 @@ async def run_evaluation(): return {"error": 0.0, "timeout": True} except Exception as e: logger.error(f"Error in direct evaluation: {str(e)}") - traceback.print_exc() return {"error": 0.0} async def _cascade_evaluate( @@ -346,6 +308,7 @@ async def _cascade_evaluate( # Run first stage with timeout try: + async def run_stage1(): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, module.evaluate_stage1, program_path) @@ -385,6 +348,7 @@ async def run_stage1(): # Run second stage with timeout try: + async def run_stage2(): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, module.evaluate_stage2, program_path) @@ -446,6 +410,7 @@ async def run_stage2(): # Run third stage with timeout try: + async def run_stage3(): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, module.evaluate_stage3, program_path) diff --git a/openevolve/utils/async_utils.py b/openevolve/utils/async_utils.py index c3a9de35b..aad758d28 100644 --- a/openevolve/utils/async_utils.py +++ b/openevolve/utils/async_utils.py @@ -33,28 +33,24 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: async def run_with_timeout( - coro: Callable, - timeout: float, - *args: Any, - timeout_error_value: Any = None, - **kwargs: Any + coro: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any ) -> Any: """ Run a coroutine with a timeout, returning a default value on timeout - + Args: coro: Coroutine function to run timeout: Timeout in seconds *args: Arguments to pass to the coroutine timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) **kwargs: Keyword arguments to pass to the coroutine - + Returns: Result of the coroutine 
or timeout_error_value on timeout """ if timeout_error_value is None: timeout_error_value = {"error": 0.0, "timeout": True} - + try: return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout) except asyncio.TimeoutError: @@ -63,28 +59,24 @@ async def run_with_timeout( async def run_sync_with_timeout( - func: Callable, - timeout: float, - *args: Any, - timeout_error_value: Any = None, - **kwargs: Any + func: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any ) -> Any: """ Run a synchronous function in an executor with a timeout - + Args: func: Synchronous function to run timeout: Timeout in seconds *args: Arguments to pass to the function timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) **kwargs: Keyword arguments to pass to the function - + Returns: Result of the function or timeout_error_value on timeout """ if timeout_error_value is None: timeout_error_value = {"error": 0.0, "timeout": True} - + try: loop = asyncio.get_event_loop() task = loop.run_in_executor(None, functools.partial(func, *args, **kwargs)) @@ -178,21 +170,9 @@ class TaskPool: """ def __init__(self, max_concurrency: int = 10): - self.max_concurrency = max_concurrency - self._semaphore = None # Lazy initialization + self.semaphore = asyncio.Semaphore(max_concurrency) self.tasks: List[asyncio.Task] = [] - @property - def semaphore(self): - """Lazy-initialized semaphore that creates itself when first accessed""" - if self._semaphore is None: - try: - self._semaphore = asyncio.Semaphore(self.max_concurrency) - except RuntimeError: - # No event loop running, will be created later when needed - pass - return self._semaphore - async def run(self, coro: Callable, *args: Any, **kwargs: Any) -> Any: """ Run a coroutine in the pool @@ -205,11 +185,7 @@ async def run(self, coro: Callable, *args: Any, **kwargs: Any) -> Any: Returns: Result of the coroutine """ - # Ensure semaphore is created in the current event loop - if self._semaphore is None: - self._semaphore = asyncio.Semaphore(self.max_concurrency) - - async with self._semaphore: + async with self.semaphore: return await coro(*args, **kwargs) def create_task(self, coro: Callable, *args: Any, **kwargs: Any) -> asyncio.Task: diff --git a/tests/test_evaluator_timeout.py b/tests/test_evaluator_timeout.py index bf3c87558..f9872c3ae 100644 --- a/tests/test_evaluator_timeout.py +++ b/tests/test_evaluator_timeout.py @@ -7,93 +7,73 @@ import tempfile import time import unittest -import logging from unittest.mock import patch, MagicMock from openevolve.config import EvaluatorConfig from openevolve.evaluator import Evaluator -# Enable debug logging for tests -logging.basicConfig(level=logging.DEBUG) +class TestEvaluatorTimeout(unittest.TestCase): + """Tests for evaluator timeout functionality""" -class TestEvaluatorTimeout(unittest.IsolatedAsyncioTestCase): - """Tests for evaluator timeout functionality using proper async testing""" - - async def asyncSetUp(self): + def setUp(self): """Set up test evaluator with timeout configuration""" # Create a test evaluation file - self.test_eval_file = tempfile.NamedTemporaryFile( - mode='w', suffix='.py', delete=False - ) - - # Write test evaluation functions with more explicit debugging - self.test_eval_file.write(""" + self.test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) + + # Write test evaluation functions + self.test_eval_file.write( + """ import time -import sys def evaluate(program_path): - print(f"[DEBUG] Evaluation 
function called with: {program_path}", file=sys.stderr) - # Read the program to determine behavior - try: - with open(program_path, 'r') as f: - code = f.read() - print(f"[DEBUG] Read program code: {repr(code[:100])}", file=sys.stderr) - except Exception as e: - print(f"[DEBUG] Error reading program: {e}", file=sys.stderr) - return {"error": 1.0} + with open(program_path, 'r') as f: + code = f.read() if 'SLEEP_LONG' in code: - print("[DEBUG] Found SLEEP_LONG marker, sleeping for 30 seconds...", file=sys.stderr) + # Sleep for a long time to trigger timeout time.sleep(30) - print("[DEBUG] Sleep completed (this should not appear for timeout tests)", file=sys.stderr) return {"score": 1.0} elif 'SLEEP_SHORT' in code: - print("[DEBUG] Found SLEEP_SHORT marker, sleeping for 1 second...", file=sys.stderr) + # Sleep for a short time that should not timeout time.sleep(1) - print("[DEBUG] Short sleep completed", file=sys.stderr) return {"score": 0.8} else: - print("[DEBUG] No sleep markers found, returning fast result", file=sys.stderr) + # Fast evaluation return {"score": 0.5} def evaluate_stage1(program_path): - print(f"[DEBUG] Stage1 evaluation called with: {program_path}", file=sys.stderr) with open(program_path, 'r') as f: code = f.read() if 'STAGE1_TIMEOUT' in code: - print("[DEBUG] Stage1 timeout test, sleeping...", file=sys.stderr) time.sleep(30) return {"stage1_score": 1.0} else: return {"stage1_score": 0.7} def evaluate_stage2(program_path): - print(f"[DEBUG] Stage2 evaluation called with: {program_path}", file=sys.stderr) with open(program_path, 'r') as f: code = f.read() if 'STAGE2_TIMEOUT' in code: - print("[DEBUG] Stage2 timeout test, sleeping...", file=sys.stderr) time.sleep(30) return {"stage2_score": 1.0} else: return {"stage2_score": 0.8} def evaluate_stage3(program_path): - print(f"[DEBUG] Stage3 evaluation called with: {program_path}", file=sys.stderr) with open(program_path, 'r') as f: code = f.read() if 'STAGE3_TIMEOUT' in code: - print("[DEBUG] Stage3 timeout test, sleeping...", file=sys.stderr) time.sleep(30) return {"stage3_score": 1.0} else: return {"stage3_score": 0.9} -""") +""" + ) self.test_eval_file.close() # Create config with short timeout for testing @@ -111,322 +91,306 @@ def evaluate_stage3(program_path): prompt_sampler=None, ) - async def asyncTearDown(self): + def tearDown(self): """Clean up test files""" if os.path.exists(self.test_eval_file.name): os.unlink(self.test_eval_file.name) - async def test_fast_evaluation_completes(self): + def test_fast_evaluation_completes(self): """Test that fast evaluations complete successfully""" - program_code = "def test(): return 'fast'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_fast") - - elapsed_time = time.time() - start_time - - print(f"[TEST] Fast evaluation took {elapsed_time:.3f}s, result: {result}") - - # Should complete quickly - self.assertLess(elapsed_time, 3.0) - # Should return successful result - self.assertIn("score", result) - self.assertEqual(result["score"], 0.5) - # Should not have timeout or error flags - self.assertNotIn("timeout", result) - self.assertNotIn("error", result) - - async def test_short_evaluation_completes(self): + + async def run_test(): + program_code = "def test(): return 'fast'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_fast") + + elapsed_time = time.time() - start_time + + # Should complete quickly + self.assertLess(elapsed_time, 3.0) + # Should return successful result + 
self.assertIn("score", result) + self.assertEqual(result["score"], 0.5) + # Should not have timeout or error flags + self.assertNotIn("timeout", result) + self.assertNotIn("error", result) + + asyncio.run(run_test()) + + def test_short_evaluation_completes(self): """Test that evaluations shorter than timeout complete successfully""" - program_code = "# SLEEP_SHORT\ndef test(): return 'short'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_short") - - elapsed_time = time.time() - start_time - - print(f"[TEST] Short evaluation took {elapsed_time:.3f}s, result: {result}") - - # Should complete within timeout but take at least 1 second - self.assertGreater(elapsed_time, 0.8) # At least the sleep time - self.assertLess(elapsed_time, self.config.timeout) - # Should return successful result - self.assertIn("score", result) - self.assertEqual(result["score"], 0.8) - # Should not have timeout or error flags - self.assertNotIn("timeout", result) - self.assertNotIn("error", result) - - async def test_long_evaluation_times_out(self): + + async def run_test(): + program_code = "# SLEEP_SHORT\ndef test(): return 'short'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_short") + + elapsed_time = time.time() - start_time + + # Should complete within timeout + self.assertLess(elapsed_time, self.config.timeout) + # Should return successful result + self.assertIn("score", result) + self.assertEqual(result["score"], 0.8) + # Should not have timeout or error flags + self.assertNotIn("timeout", result) + self.assertNotIn("error", result) + + asyncio.run(run_test()) + + def test_long_evaluation_times_out(self): """Test that long evaluations time out properly""" - program_code = "# SLEEP_LONG\ndef test(): return 'long'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_long") - - elapsed_time = time.time() - start_time - - print(f"[TEST] Long evaluation took {elapsed_time:.3f}s, result: {result}") - - # Should complete around the timeout period (allowing some margin) - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - - # Should return timeout result - self.assertIn("error", result) - self.assertEqual(result["error"], 0.0) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - async def test_cascade_evaluation_timeout_stage1(self): + + async def run_test(): + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_long") + + elapsed_time = time.time() - start_time + + # Should complete around the timeout period (allowing some margin) + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should return timeout result + self.assertIn("error", result) + self.assertEqual(result["error"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_cascade_evaluation_timeout_stage1(self): """Test timeout in cascade evaluation stage 1""" - # Enable cascade evaluation - self.evaluator.config.cascade_evaluation = True - - program_code = "# STAGE1_TIMEOUT\ndef test(): return 'stage1_timeout'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage1") - - elapsed_time = time.time() - start_time - - print(f"[TEST] 
Cascade stage1 took {elapsed_time:.3f}s, result: {result}") - - # Should timeout around the configured timeout - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - - # Should return stage1 timeout result - self.assertIn("stage1_passed", result) - self.assertEqual(result["stage1_passed"], 0.0) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - async def test_cascade_evaluation_timeout_stage2(self): + + async def run_test(): + # Enable cascade evaluation + self.evaluator.config.cascade_evaluation = True + + program_code = "# STAGE1_TIMEOUT\ndef test(): return 'stage1_timeout'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage1") + + elapsed_time = time.time() - start_time + + # Should timeout around the configured timeout + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should return stage1 timeout result + self.assertIn("stage1_passed", result) + self.assertEqual(result["stage1_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_cascade_evaluation_timeout_stage2(self): """Test timeout in cascade evaluation stage 2""" - # Enable cascade evaluation - self.evaluator.config.cascade_evaluation = True - - program_code = "# STAGE2_TIMEOUT\ndef test(): return 'stage2_timeout'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage2") - - elapsed_time = time.time() - start_time - - print(f"[TEST] Cascade stage2 took {elapsed_time:.3f}s, result: {result}") - - # Should timeout on stage 2, but stage 1 should complete first - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - - # Should have stage1 result but stage2 timeout - self.assertIn("stage1_score", result) - self.assertEqual(result["stage1_score"], 0.7) - self.assertIn("stage2_passed", result) - self.assertEqual(result["stage2_passed"], 0.0) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - async def test_cascade_evaluation_timeout_stage3(self): + + async def run_test(): + # Enable cascade evaluation + self.evaluator.config.cascade_evaluation = True + + program_code = "# STAGE2_TIMEOUT\ndef test(): return 'stage2_timeout'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage2") + + elapsed_time = time.time() - start_time + + # Should timeout on stage 2, but stage 1 should complete first + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should have stage1 result but stage2 timeout + self.assertIn("stage1_score", result) + self.assertEqual(result["stage1_score"], 0.7) + self.assertIn("stage2_passed", result) + self.assertEqual(result["stage2_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_cascade_evaluation_timeout_stage3(self): """Test timeout in cascade evaluation stage 3""" - # Enable cascade evaluation - self.evaluator.config.cascade_evaluation = True - - program_code = "# STAGE3_TIMEOUT\ndef test(): return 'stage3_timeout'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage3") - - elapsed_time = time.time() - start_time - - 
print(f"[TEST] Cascade stage3 took {elapsed_time:.3f}s, result: {result}") - - # Should timeout on stage 3, but stages 1 and 2 should complete first - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - - # Should have stage1 and stage2 results but stage3 timeout - self.assertIn("stage1_score", result) - self.assertEqual(result["stage1_score"], 0.7) - self.assertIn("stage2_score", result) - self.assertEqual(result["stage2_score"], 0.8) - self.assertIn("stage3_passed", result) - self.assertEqual(result["stage3_passed"], 0.0) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - async def test_timeout_config_respected(self): + + async def run_test(): + # Enable cascade evaluation + self.evaluator.config.cascade_evaluation = True + + program_code = "# STAGE3_TIMEOUT\ndef test(): return 'stage3_timeout'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage3") + + elapsed_time = time.time() - start_time + + # Should timeout on stage 3, but stages 1 and 2 should complete first + self.assertGreater(elapsed_time, self.config.timeout - 1) + self.assertLess(elapsed_time, self.config.timeout + 3) + + # Should have stage1 and stage2 results but stage3 timeout + self.assertIn("stage1_score", result) + self.assertEqual(result["stage1_score"], 0.7) + self.assertIn("stage2_score", result) + self.assertEqual(result["stage2_score"], 0.8) + self.assertIn("stage3_passed", result) + self.assertEqual(result["stage3_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_timeout_config_respected(self): """Test that the timeout configuration value is actually used""" - # Create evaluator with different timeout - config_10s = EvaluatorConfig() - config_10s.timeout = 10 # 10 second timeout - config_10s.max_retries = 0 # No retries for cleaner test - - evaluator_10s = Evaluator( - config=config_10s, - evaluation_file=self.test_eval_file.name, - llm_ensemble=None, - prompt_sampler=None, - ) - - program_code = "# SLEEP_LONG\ndef test(): return 'long'" - start_time = time.time() - - result = await evaluator_10s.evaluate_program(program_code, "test_config") - - elapsed_time = time.time() - start_time - - print(f"[TEST] Config test took {elapsed_time:.3f}s, result: {result}") - - # Should timeout around 10 seconds, not 5 - self.assertGreater(elapsed_time, 9) - self.assertLess(elapsed_time, 13) - - # Should return timeout result - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - async def test_multiple_retries_with_timeout(self): - """Test that retries work correctly with timeout""" - # Set more retries - self.evaluator.config.max_retries = 2 # 3 total attempts - - program_code = "# SLEEP_LONG\ndef test(): return 'long'" - start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_retries") - - elapsed_time = time.time() - start_time - - print(f"[TEST] Retry test took {elapsed_time:.3f}s, result: {result}") - - # Should timeout on each retry (3 total attempts) - # Each attempt should take ~5 seconds, so total should be ~15 seconds - expected_time = self.config.timeout * (self.evaluator.config.max_retries + 1) - self.assertGreater(elapsed_time, expected_time - 3) # Allow some margin - self.assertLess(elapsed_time, expected_time + 6) - - # Should return timeout result after all retries fail - self.assertIn("error", result) - 
self.assertEqual(result["error"], 0.0) - # Should also have timeout flag since the last exception was TimeoutError - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - async def test_artifacts_on_timeout(self): - """Test that timeout artifacts are properly captured""" - # Enable artifacts - with patch.dict(os.environ, {'ENABLE_ARTIFACTS': 'true'}): + + async def run_test(): + # Create evaluator with different timeout + config_10s = EvaluatorConfig() + config_10s.timeout = 10 # 10 second timeout + config_10s.max_retries = 1 + + evaluator_10s = Evaluator( + config=config_10s, + evaluation_file=self.test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + program_code = "# SLEEP_LONG\ndef test(): return 'long'" - - result = await self.evaluator.evaluate_program(program_code, "test_artifacts") - - print(f"[TEST] Artifacts test result: {result}") - - # Should have timeout result + start_time = time.time() + + result = await evaluator_10s.evaluate_program(program_code, "test_config") + + elapsed_time = time.time() - start_time + + # Should timeout around 10 seconds, not 5 + self.assertGreater(elapsed_time, 9) + self.assertLess(elapsed_time, 13) + + # Should return timeout result self.assertIn("timeout", result) self.assertTrue(result["timeout"]) - - # Should have captured artifacts - artifacts = self.evaluator.get_pending_artifacts("test_artifacts") - print(f"[TEST] Captured artifacts: {artifacts}") - - self.assertIsNotNone(artifacts, "Artifacts should not be None") - self.assertIn("failure_stage", artifacts) - # Should have timeout-related information - self.assertTrue( - artifacts.get("timeout") is True or - "timeout" in artifacts.get("failure_stage", "").lower(), - f"Artifacts should indicate timeout: {artifacts}" - ) + asyncio.run(run_test()) -class TestTimeoutIntegration(unittest.IsolatedAsyncioTestCase): + def test_multiple_retries_with_timeout(self): + """Test that retries work correctly with timeout""" + + async def run_test(): + # Set more retries + self.evaluator.config.max_retries = 2 + + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await self.evaluator.evaluate_program(program_code, "test_retries") + + elapsed_time = time.time() - start_time + + # Should timeout on each retry (3 total attempts) + # Each attempt should take ~5 seconds, so total should be ~15 seconds + # But since timeouts happen in parallel, it should be closer to 5 seconds per attempt + expected_time = self.config.timeout * (self.evaluator.config.max_retries + 1) + self.assertGreater(elapsed_time, expected_time - 2) + self.assertLess(elapsed_time, expected_time + 5) + + # Should return timeout result after all retries fail + self.assertIn("error", result) + self.assertEqual(result["error"], 0.0) + + asyncio.run(run_test()) + + def test_artifacts_on_timeout(self): + """Test that timeout artifacts are properly captured""" + + async def run_test(): + # Enable artifacts + with patch.dict(os.environ, {"ENABLE_ARTIFACTS": "true"}): + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + + result = await self.evaluator.evaluate_program(program_code, "test_artifacts") + + # Should have timeout result + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + # Should have captured artifacts + artifacts = self.evaluator.get_pending_artifacts("test_artifacts") + self.assertIsNotNone(artifacts) + self.assertIn("failure_stage", artifacts) + + asyncio.run(run_test()) + + +class TestTimeoutIntegration(unittest.TestCase): 
"""Integration tests for timeout functionality""" - async def test_real_world_scenario(self): + def test_real_world_scenario(self): """Test a scenario similar to the reported bug""" - # Create a test evaluation file that simulates a long-running evaluation - test_eval_file = tempfile.NamedTemporaryFile( - mode='w', suffix='.py', delete=False - ) - - test_eval_file.write(""" + + async def run_test(): + # Create a test evaluation file that simulates a long-running evaluation + test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) + + test_eval_file.write( + """ import time -import sys def evaluate(program_path): - print("[DEBUG] Real-world scenario test evaluation starting", file=sys.stderr) # Simulate a very long evaluation (like the 11-hour case) time.sleep(20) # 20 seconds to test timeout - print("[DEBUG] This should not print in timeout test", file=sys.stderr) return {"accReturn": 0.1, "CalmarRatio": 0.9, "combined_score": 0.82} -""") - test_eval_file.close() - - try: - # Configure like user's config but with shorter timeout for testing - config = EvaluatorConfig() - config.timeout = 5 # 5 seconds instead of 600 - config.max_retries = 0 # No retries for cleaner test - config.cascade_evaluation = False - config.parallel_evaluations = 1 - - evaluator = Evaluator( - config=config, - evaluation_file=test_eval_file.name, - llm_ensemble=None, - prompt_sampler=None, +""" ) - - program_code = """ + test_eval_file.close() + + try: + # Configure like user's config but with shorter timeout for testing + config = EvaluatorConfig() + config.timeout = 5 # 5 seconds instead of 600 + config.max_retries = 1 + config.cascade_evaluation = False + config.parallel_evaluations = 1 + + evaluator = Evaluator( + config=config, + evaluation_file=test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + program_code = """ # Financial optimization algorithm def search_algorithm(): # This would normally run for hours return {"report_type_factor_map": {}} """ - start_time = time.time() - result = await evaluator.evaluate_program(program_code, "financial_test") - elapsed_time = time.time() - start_time + start_time = time.time() + result = await evaluator.evaluate_program(program_code, "financial_test") + elapsed_time = time.time() - start_time - print(f"[TEST] Integration test took {elapsed_time:.3f}s, result: {result}") + # Should timeout in ~5 seconds, not 20+ seconds + self.assertLess(elapsed_time, 8) + self.assertGreater(elapsed_time, 4) - # Should timeout in ~5 seconds, not 20+ seconds - self.assertLess(elapsed_time, 8) - self.assertGreater(elapsed_time, 4) - - # Should return timeout error - self.assertIn("error", result) - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - finally: - if os.path.exists(test_eval_file.name): - os.unlink(test_eval_file.name) + # Should return timeout error + self.assertIn("error", result) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + finally: + if os.path.exists(test_eval_file.name): + os.unlink(test_eval_file.name) -class TestBasicFunctionality(unittest.TestCase): - """Basic non-async tests that should work without event loop""" - - def test_config_loading(self): - """Test that evaluator config loads correctly""" - config = EvaluatorConfig() - config.timeout = 600 - config.max_retries = 3 - - self.assertEqual(config.timeout, 600) - self.assertEqual(config.max_retries, 3) - - def test_taskpool_creation(self): - """Test that TaskPool can be created without active event loop""" - from 
openevolve.utils.async_utils import TaskPool - - # This should not raise an error anymore - pool = TaskPool(max_concurrency=4) - self.assertEqual(pool.max_concurrency, 4) - self.assertIsNone(pool._semaphore) # Should be None until first use + asyncio.run(run_test()) if __name__ == "__main__": From 7355f2fc23834b4fcf0f8bd7d4fadb281cc26f0d Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Thu, 19 Jun 2025 16:23:16 +0800 Subject: [PATCH 4/8] d --- openevolve/utils/async_utils.py | 34 +++-- tests/test_evaluator_timeout.py | 224 +++++++++++++++----------------- 2 files changed, 131 insertions(+), 127 deletions(-) diff --git a/openevolve/utils/async_utils.py b/openevolve/utils/async_utils.py index aad758d28..245889536 100644 --- a/openevolve/utils/async_utils.py +++ b/openevolve/utils/async_utils.py @@ -33,24 +33,28 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: async def run_with_timeout( - coro: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any + coro: Callable, + timeout: float, + *args: Any, + timeout_error_value: Any = None, + **kwargs: Any ) -> Any: """ Run a coroutine with a timeout, returning a default value on timeout - + Args: coro: Coroutine function to run timeout: Timeout in seconds *args: Arguments to pass to the coroutine timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) **kwargs: Keyword arguments to pass to the coroutine - + Returns: Result of the coroutine or timeout_error_value on timeout """ if timeout_error_value is None: timeout_error_value = {"error": 0.0, "timeout": True} - + try: return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout) except asyncio.TimeoutError: @@ -59,24 +63,28 @@ async def run_with_timeout( async def run_sync_with_timeout( - func: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any + func: Callable, + timeout: float, + *args: Any, + timeout_error_value: Any = None, + **kwargs: Any ) -> Any: """ Run a synchronous function in an executor with a timeout - + Args: func: Synchronous function to run timeout: Timeout in seconds *args: Arguments to pass to the function timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) **kwargs: Keyword arguments to pass to the function - + Returns: Result of the function or timeout_error_value on timeout """ if timeout_error_value is None: timeout_error_value = {"error": 0.0, "timeout": True} - + try: loop = asyncio.get_event_loop() task = loop.run_in_executor(None, functools.partial(func, *args, **kwargs)) @@ -170,9 +178,17 @@ class TaskPool: """ def __init__(self, max_concurrency: int = 10): - self.semaphore = asyncio.Semaphore(max_concurrency) + self.max_concurrency = max_concurrency + self._semaphore: Optional[asyncio.Semaphore] = None self.tasks: List[asyncio.Task] = [] + @property + def semaphore(self) -> asyncio.Semaphore: + """Lazy-initialize the semaphore when first needed""" + if self._semaphore is None: + self._semaphore = asyncio.Semaphore(self.max_concurrency) + return self._semaphore + async def run(self, coro: Callable, *args: Any, **kwargs: Any) -> Any: """ Run a coroutine in the pool diff --git a/tests/test_evaluator_timeout.py b/tests/test_evaluator_timeout.py index f9872c3ae..437569dd5 100644 --- a/tests/test_evaluator_timeout.py +++ b/tests/test_evaluator_timeout.py @@ -17,13 +17,14 @@ class TestEvaluatorTimeout(unittest.TestCase): """Tests for evaluator timeout functionality""" def setUp(self): - """Set up test evaluator with 
timeout configuration""" + """Set up test evaluation file""" # Create a test evaluation file - self.test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) - + self.test_eval_file = tempfile.NamedTemporaryFile( + mode='w', suffix='.py', delete=False + ) + # Write test evaluation functions - self.test_eval_file.write( - """ + self.test_eval_file.write(""" import time def evaluate(program_path): @@ -72,41 +73,40 @@ def evaluate_stage3(program_path): return {"stage3_score": 1.0} else: return {"stage3_score": 0.9} -""" - ) +""") self.test_eval_file.close() - # Create config with short timeout for testing - self.config = EvaluatorConfig() - self.config.timeout = 5 # 5 second timeout for testing - self.config.max_retries = 1 # Minimal retries for faster testing - self.config.cascade_evaluation = False - self.config.cascade_thresholds = [0.5, 0.7, 0.9] + def tearDown(self): + """Clean up test files""" + if os.path.exists(self.test_eval_file.name): + os.unlink(self.test_eval_file.name) + + def _create_evaluator(self, timeout=5, cascade_evaluation=False): + """Helper to create evaluator with given settings""" + config = EvaluatorConfig() + config.timeout = timeout + config.max_retries = 1 # Minimal retries for faster testing + config.cascade_evaluation = cascade_evaluation + config.cascade_thresholds = [0.5, 0.7, 0.9] - # Create evaluator - self.evaluator = Evaluator( - config=self.config, + return Evaluator( + config=config, evaluation_file=self.test_eval_file.name, llm_ensemble=None, prompt_sampler=None, ) - def tearDown(self): - """Clean up test files""" - if os.path.exists(self.test_eval_file.name): - os.unlink(self.test_eval_file.name) - def test_fast_evaluation_completes(self): """Test that fast evaluations complete successfully""" - async def run_test(): + evaluator = self._create_evaluator(timeout=5) program_code = "def test(): return 'fast'" start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_fast") - + + result = await evaluator.evaluate_program(program_code, "test_fast") + elapsed_time = time.time() - start_time - + # Should complete quickly self.assertLess(elapsed_time, 3.0) # Should return successful result @@ -120,17 +120,17 @@ async def run_test(): def test_short_evaluation_completes(self): """Test that evaluations shorter than timeout complete successfully""" - async def run_test(): + evaluator = self._create_evaluator(timeout=5) program_code = "# SLEEP_SHORT\ndef test(): return 'short'" start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_short") - + + result = await evaluator.evaluate_program(program_code, "test_short") + elapsed_time = time.time() - start_time - + # Should complete within timeout - self.assertLess(elapsed_time, self.config.timeout) + self.assertLess(elapsed_time, 5) # Should return successful result self.assertIn("score", result) self.assertEqual(result["score"], 0.8) @@ -142,19 +142,19 @@ async def run_test(): def test_long_evaluation_times_out(self): """Test that long evaluations time out properly""" - async def run_test(): + evaluator = self._create_evaluator(timeout=5) program_code = "# SLEEP_LONG\ndef test(): return 'long'" start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_long") - + + result = await evaluator.evaluate_program(program_code, "test_long") + elapsed_time = time.time() - start_time - + # Should complete around the timeout period (allowing some margin) - self.assertGreater(elapsed_time, 
self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - + self.assertGreater(elapsed_time, 4) + self.assertLess(elapsed_time, 8) + # Should return timeout result self.assertIn("error", result) self.assertEqual(result["error"], 0.0) @@ -165,22 +165,19 @@ async def run_test(): def test_cascade_evaluation_timeout_stage1(self): """Test timeout in cascade evaluation stage 1""" - async def run_test(): - # Enable cascade evaluation - self.evaluator.config.cascade_evaluation = True - + evaluator = self._create_evaluator(timeout=5, cascade_evaluation=True) program_code = "# STAGE1_TIMEOUT\ndef test(): return 'stage1_timeout'" start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage1") - + + result = await evaluator.evaluate_program(program_code, "test_cascade_stage1") + elapsed_time = time.time() - start_time - + # Should timeout around the configured timeout - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - + self.assertGreater(elapsed_time, 4) + self.assertLess(elapsed_time, 8) + # Should return stage1 timeout result self.assertIn("stage1_passed", result) self.assertEqual(result["stage1_passed"], 0.0) @@ -191,22 +188,19 @@ async def run_test(): def test_cascade_evaluation_timeout_stage2(self): """Test timeout in cascade evaluation stage 2""" - async def run_test(): - # Enable cascade evaluation - self.evaluator.config.cascade_evaluation = True - + evaluator = self._create_evaluator(timeout=5, cascade_evaluation=True) program_code = "# STAGE2_TIMEOUT\ndef test(): return 'stage2_timeout'" start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage2") - + + result = await evaluator.evaluate_program(program_code, "test_cascade_stage2") + elapsed_time = time.time() - start_time - + # Should timeout on stage 2, but stage 1 should complete first - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - + self.assertGreater(elapsed_time, 4) + self.assertLess(elapsed_time, 8) + # Should have stage1 result but stage2 timeout self.assertIn("stage1_score", result) self.assertEqual(result["stage1_score"], 0.7) @@ -219,22 +213,19 @@ async def run_test(): def test_cascade_evaluation_timeout_stage3(self): """Test timeout in cascade evaluation stage 3""" - async def run_test(): - # Enable cascade evaluation - self.evaluator.config.cascade_evaluation = True - + evaluator = self._create_evaluator(timeout=5, cascade_evaluation=True) program_code = "# STAGE3_TIMEOUT\ndef test(): return 'stage3_timeout'" start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_cascade_stage3") - + + result = await evaluator.evaluate_program(program_code, "test_cascade_stage3") + elapsed_time = time.time() - start_time - + # Should timeout on stage 3, but stages 1 and 2 should complete first - self.assertGreater(elapsed_time, self.config.timeout - 1) - self.assertLess(elapsed_time, self.config.timeout + 3) - + self.assertGreater(elapsed_time, 4) + self.assertLess(elapsed_time, 8) + # Should have stage1 and stage2 results but stage3 timeout self.assertIn("stage1_score", result) self.assertEqual(result["stage1_score"], 0.7) @@ -249,31 +240,21 @@ async def run_test(): def test_timeout_config_respected(self): """Test that the timeout configuration value is actually used""" - async def run_test(): # Create evaluator with different timeout - 
config_10s = EvaluatorConfig() - config_10s.timeout = 10 # 10 second timeout - config_10s.max_retries = 1 - - evaluator_10s = Evaluator( - config=config_10s, - evaluation_file=self.test_eval_file.name, - llm_ensemble=None, - prompt_sampler=None, - ) - + evaluator = self._create_evaluator(timeout=10) + program_code = "# SLEEP_LONG\ndef test(): return 'long'" start_time = time.time() - - result = await evaluator_10s.evaluate_program(program_code, "test_config") - + + result = await evaluator.evaluate_program(program_code, "test_config") + elapsed_time = time.time() - start_time - + # Should timeout around 10 seconds, not 5 self.assertGreater(elapsed_time, 9) self.assertLess(elapsed_time, 13) - + # Should return timeout result self.assertIn("timeout", result) self.assertTrue(result["timeout"]) @@ -282,25 +263,33 @@ async def run_test(): def test_multiple_retries_with_timeout(self): """Test that retries work correctly with timeout""" - async def run_test(): - # Set more retries - self.evaluator.config.max_retries = 2 - + # Create evaluator with more retries + config = EvaluatorConfig() + config.timeout = 5 + config.max_retries = 2 # 3 total attempts + config.cascade_evaluation = False + + evaluator = Evaluator( + config=config, + evaluation_file=self.test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + program_code = "# SLEEP_LONG\ndef test(): return 'long'" start_time = time.time() - - result = await self.evaluator.evaluate_program(program_code, "test_retries") - + + result = await evaluator.evaluate_program(program_code, "test_retries") + elapsed_time = time.time() - start_time - + # Should timeout on each retry (3 total attempts) - # Each attempt should take ~5 seconds, so total should be ~15 seconds - # But since timeouts happen in parallel, it should be closer to 5 seconds per attempt - expected_time = self.config.timeout * (self.evaluator.config.max_retries + 1) - self.assertGreater(elapsed_time, expected_time - 2) + # Each attempt should take ~5 seconds + expected_time = 5 * (config.max_retries + 1) + self.assertGreater(elapsed_time, expected_time - 3) self.assertLess(elapsed_time, expected_time + 5) - + # Should return timeout result after all retries fail self.assertIn("error", result) self.assertEqual(result["error"], 0.0) @@ -309,20 +298,20 @@ async def run_test(): def test_artifacts_on_timeout(self): """Test that timeout artifacts are properly captured""" - async def run_test(): # Enable artifacts - with patch.dict(os.environ, {"ENABLE_ARTIFACTS": "true"}): + with patch.dict(os.environ, {'ENABLE_ARTIFACTS': 'true'}): + evaluator = self._create_evaluator(timeout=5) program_code = "# SLEEP_LONG\ndef test(): return 'long'" - - result = await self.evaluator.evaluate_program(program_code, "test_artifacts") - + + result = await evaluator.evaluate_program(program_code, "test_artifacts") + # Should have timeout result self.assertIn("timeout", result) self.assertTrue(result["timeout"]) - + # Should have captured artifacts - artifacts = self.evaluator.get_pending_artifacts("test_artifacts") + artifacts = evaluator.get_pending_artifacts("test_artifacts") self.assertIsNotNone(artifacts) self.assertIn("failure_stage", artifacts) @@ -334,21 +323,20 @@ class TestTimeoutIntegration(unittest.TestCase): def test_real_world_scenario(self): """Test a scenario similar to the reported bug""" - async def run_test(): # Create a test evaluation file that simulates a long-running evaluation - test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) - - 
test_eval_file.write( - """ + test_eval_file = tempfile.NamedTemporaryFile( + mode='w', suffix='.py', delete=False + ) + + test_eval_file.write(""" import time def evaluate(program_path): # Simulate a very long evaluation (like the 11-hour case) time.sleep(20) # 20 seconds to test timeout return {"accReturn": 0.1, "CalmarRatio": 0.9, "combined_score": 0.82} -""" - ) +""") test_eval_file.close() try: @@ -380,7 +368,7 @@ def search_algorithm(): # Should timeout in ~5 seconds, not 20+ seconds self.assertLess(elapsed_time, 8) self.assertGreater(elapsed_time, 4) - + # Should return timeout error self.assertIn("error", result) self.assertIn("timeout", result) From 08c5b801ae29d4da57f98766a2570763c1cc382d Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Thu, 19 Jun 2025 16:56:08 +0800 Subject: [PATCH 5/8] f --- examples/mlx_metal_kernel_opt/README.md | 28 +--- openevolve/evaluator.py | 184 ++++++++------------- openevolve/utils/async_utils.py | 24 +-- tests/test_evaluator_timeout.py | 205 ++++++++++++++++-------- 4 files changed, 216 insertions(+), 225 deletions(-) diff --git a/examples/mlx_metal_kernel_opt/README.md b/examples/mlx_metal_kernel_opt/README.md index 54d300c15..bb83fdea0 100644 --- a/examples/mlx_metal_kernel_opt/README.md +++ b/examples/mlx_metal_kernel_opt/README.md @@ -1,4 +1,4 @@ -# 🎯 Qwen3-0.6B Custom Metal Kernel Optimization with OpenEvolve +# 🎯Custom Metal Kernel Optimization with OpenEvolve **Evolving custom GPU kernels for Grouped Query Attention using MLX Metal kernels for Qwen3-0.6B on Apple Silicon** @@ -416,29 +416,3 @@ python run_benchmarks.py --mode compare --- **🎯 This example demonstrates OpenEvolve's capability to discover genuine algorithmic improvements through evolutionary optimization, achieving measurable performance gains on real hardware with production-ready implementations.** - -## πŸ”§ **Recent Improvements** - -### **βœ… Correct Terminology** -- **Before**: Incorrect references to "chunked GQA processing" -- **After**: Accurate descriptions of custom Metal kernel optimization -- **Benefits**: Technical accuracy and clear understanding of actual discoveries - -### **βœ… Comprehensive Testing** -- **Before**: Basic performance measurement -- **After**: 17-scenario comprehensive benchmark suite with statistical validation -- **Benefits**: Robust performance analysis and reproducible results - -### **βœ… Production Integration** -- **Before**: Standalone optimization experiments -- **After**: Full MLX-LM integration with seamless switching -- **Benefits**: Real-world usability and easy adoption - -### **βœ… Detailed Documentation** -- **Before**: High-level optimization descriptions -- **After**: Complete technical details with actual kernel code snippets -- **Benefits**: Understanding, reproducibility, and further research - ---- - -**πŸš€ Ready for custom Metal kernel evolution with comprehensive benchmarking and detailed analysis!** diff --git a/openevolve/evaluator.py b/openevolve/evaluator.py index 6841b63d1..7d89110bb 100644 --- a/openevolve/evaluator.py +++ b/openevolve/evaluator.py @@ -134,6 +134,20 @@ async def evaluate_program( # Process the result based on type eval_result = self._process_evaluation_result(result) + # Check if this was a timeout and capture artifacts if enabled + if artifacts_enabled and program_id and eval_result.metrics.get("timeout") is True: + if program_id not in self._pending_artifacts: + self._pending_artifacts[program_id] = {} + + self._pending_artifacts[program_id].update( + { + "timeout": True, + 
"timeout_duration": self.config.timeout, + "failure_stage": "evaluation", + "error_type": "timeout", + } + ) + # Add LLM feedback if configured llm_eval_result = None if self.config.use_llm_feedback and self.llm_ensemble: @@ -153,7 +167,8 @@ async def evaluate_program( ) and program_id ): - self._pending_artifacts[program_id] = {} + if program_id not in self._pending_artifacts: + self._pending_artifacts[program_id] = {} # Merge eval_result artifacts with llm artifacts if they exist if eval_result.has_artifacts(): @@ -179,6 +194,21 @@ async def evaluate_program( # Return just metrics for backward compatibility return eval_result.metrics + except asyncio.TimeoutError: + # Handle timeout specially - don't retry, just return timeout result + logger.warning(f"Evaluation timed out after {self.config.timeout}s") + + # Capture timeout artifacts if enabled + if artifacts_enabled and program_id: + self._pending_artifacts[program_id] = { + "timeout": True, + "timeout_duration": self.config.timeout, + "failure_stage": "evaluation", + "error_type": "timeout", + } + + return {"error": 0.0, "timeout": True} + except Exception as e: last_exception = e logger.warning( @@ -192,6 +222,7 @@ async def evaluate_program( "stderr": str(e), "traceback": traceback.format_exc(), "failure_stage": "evaluation", + "attempt": attempt + 1, } # If this is not the last attempt, wait a bit before retrying @@ -251,30 +282,27 @@ async def _direct_evaluate(self, program_path: str) -> Dict[str, float]: Returns: Dictionary of metric name to score - """ - try: - # Create a coroutine that runs the evaluation function in an executor - async def run_evaluation(): - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, self.evaluate_function, program_path) - # Run the evaluation with timeout - result = await asyncio.wait_for(run_evaluation(), timeout=self.config.timeout) + Raises: + asyncio.TimeoutError: If evaluation exceeds timeout + Exception: If evaluation function raises an exception + """ - # Validate result - if not isinstance(result, dict): - logger.warning(f"Evaluation returned non-dictionary result: {result}") - return {"error": 0.0} + # Create a coroutine that runs the evaluation function in an executor + async def run_evaluation(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self.evaluate_function, program_path) - return result + # Run the evaluation with timeout - let exceptions bubble up for retry handling + result = await asyncio.wait_for(run_evaluation(), timeout=self.config.timeout) - except asyncio.TimeoutError: - logger.warning(f"Evaluation timed out after {self.config.timeout}s") - return {"error": 0.0, "timeout": True} - except Exception as e: - logger.error(f"Error in direct evaluation: {str(e)}") + # Validate result + if not isinstance(result, dict): + logger.warning(f"Evaluation returned non-dictionary result: {result}") return {"error": 0.0} + return result + async def _cascade_evaluate( self, program_path: str ) -> Union[Dict[str, float], EvaluationResult]: @@ -286,6 +314,10 @@ async def _cascade_evaluate( Returns: Dictionary of metrics or EvaluationResult with metrics and artifacts + + Raises: + asyncio.TimeoutError: If any stage exceeds timeout + Exception: If any evaluation stage raises an exception """ # Import the evaluation module to get cascade functions if they exist try: @@ -307,34 +339,12 @@ async def _cascade_evaluate( return await self._direct_evaluate(program_path) # Run first stage with timeout - try: + async def run_stage1(): + loop = 
asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage1, program_path) - async def run_stage1(): - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, module.evaluate_stage1, program_path) - - stage1_result = await asyncio.wait_for(run_stage1(), timeout=self.config.timeout) - stage1_eval_result = self._process_evaluation_result(stage1_result) - except asyncio.TimeoutError: - logger.warning(f"Stage 1 evaluation timed out after {self.config.timeout}s") - return EvaluationResult( - metrics={"stage1_passed": 0.0, "error": 0.0, "timeout": True}, - artifacts={ - "failure_stage": "stage1", - "timeout": True, - }, - ) - except Exception as e: - logger.error(f"Error in stage 1 evaluation: {str(e)}") - # Capture stage 1 failure as artifacts - return EvaluationResult( - metrics={"stage1_passed": 0.0, "error": 0.0}, - artifacts={ - "stderr": str(e), - "traceback": traceback.format_exc(), - "failure_stage": "stage1", - }, - ) + stage1_result = await asyncio.wait_for(run_stage1(), timeout=self.config.timeout) + stage1_eval_result = self._process_evaluation_result(stage1_result) # Check threshold if not self._passes_threshold( @@ -347,38 +357,12 @@ async def run_stage1(): return stage1_eval_result # Run second stage with timeout - try: - - async def run_stage2(): - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, module.evaluate_stage2, program_path) + async def run_stage2(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage2, program_path) - stage2_result = await asyncio.wait_for(run_stage2(), timeout=self.config.timeout) - stage2_eval_result = self._process_evaluation_result(stage2_result) - except asyncio.TimeoutError: - logger.warning(f"Stage 2 evaluation timed out after {self.config.timeout}s") - # Capture stage 2 failure, but keep stage 1 results - stage1_eval_result.artifacts.update( - { - "stage2_timeout": True, - "failure_stage": "stage2", - } - ) - stage1_eval_result.metrics["stage2_passed"] = 0.0 - stage1_eval_result.metrics["timeout"] = True - return stage1_eval_result - except Exception as e: - logger.error(f"Error in stage 2 evaluation: {str(e)}") - # Capture stage 2 failure, but keep stage 1 results - stage1_eval_result.artifacts.update( - { - "stage2_stderr": str(e), - "stage2_traceback": traceback.format_exc(), - "failure_stage": "stage2", - } - ) - stage1_eval_result.metrics["stage2_passed"] = 0.0 - return stage1_eval_result + stage2_result = await asyncio.wait_for(run_stage2(), timeout=self.config.timeout) + stage2_eval_result = self._process_evaluation_result(stage2_result) # Merge results from stage 1 and 2 merged_metrics = {} @@ -409,38 +393,12 @@ async def run_stage2(): return merged_result # Run third stage with timeout - try: + async def run_stage3(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage3, program_path) - async def run_stage3(): - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, module.evaluate_stage3, program_path) - - stage3_result = await asyncio.wait_for(run_stage3(), timeout=self.config.timeout) - stage3_eval_result = self._process_evaluation_result(stage3_result) - except asyncio.TimeoutError: - logger.warning(f"Stage 3 evaluation timed out after {self.config.timeout}s") - # Capture stage 3 failure, but keep previous results - merged_result.artifacts.update( - { - "stage3_timeout": True, - "failure_stage": "stage3", - } - ) - 
merged_result.metrics["stage3_passed"] = 0.0 - merged_result.metrics["timeout"] = True - return merged_result - except Exception as e: - logger.error(f"Error in stage 3 evaluation: {str(e)}") - # Capture stage 3 failure, but keep previous results - merged_result.artifacts.update( - { - "stage3_stderr": str(e), - "stage3_traceback": traceback.format_exc(), - "failure_stage": "stage3", - } - ) - merged_result.metrics["stage3_passed"] = 0.0 - return merged_result + stage3_result = await asyncio.wait_for(run_stage3(), timeout=self.config.timeout) + stage3_eval_result = self._process_evaluation_result(stage3_result) # Merge stage 3 results for name, value in stage3_eval_result.metrics.items(): @@ -453,14 +411,8 @@ async def run_stage3(): except Exception as e: logger.error(f"Error in cascade evaluation: {str(e)}") - return EvaluationResult( - metrics={"error": 0.0}, - artifacts={ - "stderr": str(e), - "traceback": traceback.format_exc(), - "failure_stage": "cascade_setup", - }, - ) + # Re-raise the exception to allow retry handling at higher level + raise async def _llm_evaluate(self, program_code: str, program_id: str = "") -> Dict[str, float]: """ diff --git a/openevolve/utils/async_utils.py b/openevolve/utils/async_utils.py index 245889536..ded1fed65 100644 --- a/openevolve/utils/async_utils.py +++ b/openevolve/utils/async_utils.py @@ -33,28 +33,24 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: async def run_with_timeout( - coro: Callable, - timeout: float, - *args: Any, - timeout_error_value: Any = None, - **kwargs: Any + coro: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any ) -> Any: """ Run a coroutine with a timeout, returning a default value on timeout - + Args: coro: Coroutine function to run timeout: Timeout in seconds *args: Arguments to pass to the coroutine timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) **kwargs: Keyword arguments to pass to the coroutine - + Returns: Result of the coroutine or timeout_error_value on timeout """ if timeout_error_value is None: timeout_error_value = {"error": 0.0, "timeout": True} - + try: return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout) except asyncio.TimeoutError: @@ -63,28 +59,24 @@ async def run_with_timeout( async def run_sync_with_timeout( - func: Callable, - timeout: float, - *args: Any, - timeout_error_value: Any = None, - **kwargs: Any + func: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any ) -> Any: """ Run a synchronous function in an executor with a timeout - + Args: func: Synchronous function to run timeout: Timeout in seconds *args: Arguments to pass to the function timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) **kwargs: Keyword arguments to pass to the function - + Returns: Result of the function or timeout_error_value on timeout """ if timeout_error_value is None: timeout_error_value = {"error": 0.0, "timeout": True} - + try: loop = asyncio.get_event_loop() task = loop.run_in_executor(None, functools.partial(func, *args, **kwargs)) diff --git a/tests/test_evaluator_timeout.py b/tests/test_evaluator_timeout.py index 437569dd5..87f050de9 100644 --- a/tests/test_evaluator_timeout.py +++ b/tests/test_evaluator_timeout.py @@ -19,12 +19,11 @@ class TestEvaluatorTimeout(unittest.TestCase): def setUp(self): """Set up test evaluation file""" # Create a test evaluation file - self.test_eval_file = tempfile.NamedTemporaryFile( - mode='w', suffix='.py', 
delete=False - ) - + self.test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) + # Write test evaluation functions - self.test_eval_file.write(""" + self.test_eval_file.write( + """ import time def evaluate(program_path): @@ -40,6 +39,9 @@ def evaluate(program_path): # Sleep for a short time that should not timeout time.sleep(1) return {"score": 0.8} + elif 'RAISE_ERROR' in code: + # Raise an error to trigger retries + raise RuntimeError("Evaluation failed") else: # Fast evaluation return {"score": 0.5} @@ -73,7 +75,8 @@ def evaluate_stage3(program_path): return {"stage3_score": 1.0} else: return {"stage3_score": 0.9} -""") +""" + ) self.test_eval_file.close() def tearDown(self): @@ -98,15 +101,16 @@ def _create_evaluator(self, timeout=5, cascade_evaluation=False): def test_fast_evaluation_completes(self): """Test that fast evaluations complete successfully""" + async def run_test(): evaluator = self._create_evaluator(timeout=5) program_code = "def test(): return 'fast'" start_time = time.time() - + result = await evaluator.evaluate_program(program_code, "test_fast") - + elapsed_time = time.time() - start_time - + # Should complete quickly self.assertLess(elapsed_time, 3.0) # Should return successful result @@ -120,15 +124,16 @@ async def run_test(): def test_short_evaluation_completes(self): """Test that evaluations shorter than timeout complete successfully""" + async def run_test(): evaluator = self._create_evaluator(timeout=5) program_code = "# SLEEP_SHORT\ndef test(): return 'short'" start_time = time.time() - + result = await evaluator.evaluate_program(program_code, "test_short") - + elapsed_time = time.time() - start_time - + # Should complete within timeout self.assertLess(elapsed_time, 5) # Should return successful result @@ -142,19 +147,20 @@ async def run_test(): def test_long_evaluation_times_out(self): """Test that long evaluations time out properly""" + async def run_test(): evaluator = self._create_evaluator(timeout=5) program_code = "# SLEEP_LONG\ndef test(): return 'long'" start_time = time.time() - + result = await evaluator.evaluate_program(program_code, "test_long") - + elapsed_time = time.time() - start_time - + # Should complete around the timeout period (allowing some margin) self.assertGreater(elapsed_time, 4) self.assertLess(elapsed_time, 8) - + # Should return timeout result self.assertIn("error", result) self.assertEqual(result["error"], 0.0) @@ -165,19 +171,20 @@ async def run_test(): def test_cascade_evaluation_timeout_stage1(self): """Test timeout in cascade evaluation stage 1""" + async def run_test(): evaluator = self._create_evaluator(timeout=5, cascade_evaluation=True) program_code = "# STAGE1_TIMEOUT\ndef test(): return 'stage1_timeout'" start_time = time.time() - + result = await evaluator.evaluate_program(program_code, "test_cascade_stage1") - + elapsed_time = time.time() - start_time - + # Should timeout around the configured timeout self.assertGreater(elapsed_time, 4) self.assertLess(elapsed_time, 8) - + # Should return stage1 timeout result self.assertIn("stage1_passed", result) self.assertEqual(result["stage1_passed"], 0.0) @@ -188,19 +195,20 @@ async def run_test(): def test_cascade_evaluation_timeout_stage2(self): """Test timeout in cascade evaluation stage 2""" + async def run_test(): evaluator = self._create_evaluator(timeout=5, cascade_evaluation=True) program_code = "# STAGE2_TIMEOUT\ndef test(): return 'stage2_timeout'" start_time = time.time() - + result = await evaluator.evaluate_program(program_code, 
"test_cascade_stage2") - + elapsed_time = time.time() - start_time - + # Should timeout on stage 2, but stage 1 should complete first self.assertGreater(elapsed_time, 4) self.assertLess(elapsed_time, 8) - + # Should have stage1 result but stage2 timeout self.assertIn("stage1_score", result) self.assertEqual(result["stage1_score"], 0.7) @@ -213,19 +221,20 @@ async def run_test(): def test_cascade_evaluation_timeout_stage3(self): """Test timeout in cascade evaluation stage 3""" + async def run_test(): evaluator = self._create_evaluator(timeout=5, cascade_evaluation=True) program_code = "# STAGE3_TIMEOUT\ndef test(): return 'stage3_timeout'" start_time = time.time() - + result = await evaluator.evaluate_program(program_code, "test_cascade_stage3") - + elapsed_time = time.time() - start_time - + # Should timeout on stage 3, but stages 1 and 2 should complete first self.assertGreater(elapsed_time, 4) self.assertLess(elapsed_time, 8) - + # Should have stage1 and stage2 results but stage3 timeout self.assertIn("stage1_score", result) self.assertEqual(result["stage1_score"], 0.7) @@ -240,80 +249,143 @@ async def run_test(): def test_timeout_config_respected(self): """Test that the timeout configuration value is actually used""" + async def run_test(): # Create evaluator with different timeout evaluator = self._create_evaluator(timeout=10) - + program_code = "# SLEEP_LONG\ndef test(): return 'long'" start_time = time.time() - + result = await evaluator.evaluate_program(program_code, "test_config") - + elapsed_time = time.time() - start_time - + # Should timeout around 10 seconds, not 5 self.assertGreater(elapsed_time, 9) self.assertLess(elapsed_time, 13) - + # Should return timeout result self.assertIn("timeout", result) self.assertTrue(result["timeout"]) asyncio.run(run_test()) - def test_multiple_retries_with_timeout(self): - """Test that retries work correctly with timeout""" + def test_multiple_retries_with_errors(self): + """Test that retries work correctly with actual errors (not timeouts)""" + async def run_test(): # Create evaluator with more retries config = EvaluatorConfig() - config.timeout = 5 + config.timeout = 10 # Long timeout to avoid timeout during this test config.max_retries = 2 # 3 total attempts config.cascade_evaluation = False - + evaluator = Evaluator( config=config, evaluation_file=self.test_eval_file.name, llm_ensemble=None, prompt_sampler=None, ) - - program_code = "# SLEEP_LONG\ndef test(): return 'long'" + + # Use RAISE_ERROR to trigger actual exceptions that will be retried + program_code = "# RAISE_ERROR\ndef test(): return 'error'" start_time = time.time() - + result = await evaluator.evaluate_program(program_code, "test_retries") - + elapsed_time = time.time() - start_time - - # Should timeout on each retry (3 total attempts) - # Each attempt should take ~5 seconds - expected_time = 5 * (config.max_retries + 1) - self.assertGreater(elapsed_time, expected_time - 3) - self.assertLess(elapsed_time, expected_time + 5) - - # Should return timeout result after all retries fail + + # Should have retried 3 times (max_retries=2 means 3 total attempts) + # Each attempt should fail quickly, plus 1 second sleep between retries + # So total time should be around 2-3 seconds (quick failures + 2 sleep periods) + self.assertGreater(elapsed_time, 1.8) # At least 2 sleep periods + self.assertLess(elapsed_time, 5) # But not too long + + # Should return error result after all retries fail self.assertIn("error", result) self.assertEqual(result["error"], 0.0) asyncio.run(run_test()) + 
def test_timeout_does_not_trigger_retries(self): + """Test that timeouts do not trigger retries (correct behavior)""" + + async def run_test(): + # Create evaluator with retries enabled + config = EvaluatorConfig() + config.timeout = 3 # Short timeout + config.max_retries = 2 # Would allow 3 attempts if retries were triggered + config.cascade_evaluation = False + + evaluator = Evaluator( + config=config, + evaluation_file=self.test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + # Use SLEEP_LONG to trigger timeout + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_timeout_no_retry") + + elapsed_time = time.time() - start_time + + # Should timeout only once (~3 seconds), not retry multiple times + # If retries were happening, this would take ~9 seconds + self.assertGreater(elapsed_time, 2.5) # At least the timeout period + self.assertLess(elapsed_time, 5) # But not multiple timeout periods + + # Should return timeout result + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + def test_artifacts_on_timeout(self): """Test that timeout artifacts are properly captured""" + async def run_test(): # Enable artifacts - with patch.dict(os.environ, {'ENABLE_ARTIFACTS': 'true'}): + with patch.dict(os.environ, {"ENABLE_ARTIFACTS": "true"}): evaluator = self._create_evaluator(timeout=5) program_code = "# SLEEP_LONG\ndef test(): return 'long'" - + + # Execute evaluation result = await evaluator.evaluate_program(program_code, "test_artifacts") - - # Should have timeout result - self.assertIn("timeout", result) - self.assertTrue(result["timeout"]) - - # Should have captured artifacts + + # Verify timeout occurred + self.assertIn("timeout", result, "Result should contain timeout flag") + self.assertTrue(result["timeout"], "Timeout flag should be True") + + # Verify artifacts were captured artifacts = evaluator.get_pending_artifacts("test_artifacts") - self.assertIsNotNone(artifacts) - self.assertIn("failure_stage", artifacts) + self.assertIsNotNone(artifacts, "Artifacts should not be None") + + # Verify required artifact fields + self.assertIn("failure_stage", artifacts, "Artifacts should contain failure_stage") + self.assertEqual( + artifacts["failure_stage"], "evaluation", "failure_stage should be 'evaluation'" + ) + + self.assertIn("timeout", artifacts, "Artifacts should contain timeout flag") + self.assertTrue(artifacts["timeout"], "Artifact timeout flag should be True") + + self.assertIn("error_type", artifacts, "Artifacts should contain error_type") + self.assertEqual( + artifacts["error_type"], "timeout", "error_type should be 'timeout'" + ) + + self.assertIn( + "timeout_duration", artifacts, "Artifacts should contain timeout_duration" + ) + self.assertEqual( + artifacts["timeout_duration"], 5, "timeout_duration should match config" + ) + + print(f"βœ… Artifacts captured correctly: {list(artifacts.keys())}") asyncio.run(run_test()) @@ -323,20 +395,21 @@ class TestTimeoutIntegration(unittest.TestCase): def test_real_world_scenario(self): """Test a scenario similar to the reported bug""" + async def run_test(): # Create a test evaluation file that simulates a long-running evaluation - test_eval_file = tempfile.NamedTemporaryFile( - mode='w', suffix='.py', delete=False - ) - - test_eval_file.write(""" + test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) + + test_eval_file.write( + """ import time def 
evaluate(program_path): # Simulate a very long evaluation (like the 11-hour case) time.sleep(20) # 20 seconds to test timeout return {"accReturn": 0.1, "CalmarRatio": 0.9, "combined_score": 0.82} -""") +""" + ) test_eval_file.close() try: @@ -368,7 +441,7 @@ def search_algorithm(): # Should timeout in ~5 seconds, not 20+ seconds self.assertLess(elapsed_time, 8) self.assertGreater(elapsed_time, 4) - + # Should return timeout error self.assertIn("error", result) self.assertIn("timeout", result) From 16e683dc3a90d267e2c03102cf4ea86d864c70ca Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Thu, 19 Jun 2025 16:57:10 +0800 Subject: [PATCH 6/8] bump versions for new release --- pyproject.toml | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7f09ea376..98f524d8f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "openevolve" -version = "0.0.1" +version = "0.0.2" description = "Open-source implementation of AlphaEvolve" readme = "README.md" requires-python = ">=3.9" diff --git a/setup.py b/setup.py index 82f5b14f3..02c0f1253 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="openevolve", - version="0.0.1", + version="0.0.2", packages=find_packages(), include_package_data=True, ) From cb0a98439a40f5fecdd3fbd36ca302c8537ec8d8 Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Thu, 19 Jun 2025 16:57:33 +0800 Subject: [PATCH 7/8] Update test_evaluator_timeout.py --- tests/test_evaluator_timeout.py | 88 ++++++++++++++++----------------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/tests/test_evaluator_timeout.py b/tests/test_evaluator_timeout.py index 87f050de9..d9053e4a0 100644 --- a/tests/test_evaluator_timeout.py +++ b/tests/test_evaluator_timeout.py @@ -21,7 +21,7 @@ def setUp(self): # Create a test evaluation file self.test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) - # Write test evaluation functions + # Write test evaluation functions with shorter sleep times for faster tests self.test_eval_file.write( """ import time @@ -32,12 +32,12 @@ def evaluate(program_path): code = f.read() if 'SLEEP_LONG' in code: - # Sleep for a long time to trigger timeout - time.sleep(30) + # Sleep for a long time to trigger timeout (reduced for faster tests) + time.sleep(8) return {"score": 1.0} elif 'SLEEP_SHORT' in code: # Sleep for a short time that should not timeout - time.sleep(1) + time.sleep(0.5) return {"score": 0.8} elif 'RAISE_ERROR' in code: # Raise an error to trigger retries @@ -51,7 +51,7 @@ def evaluate_stage1(program_path): code = f.read() if 'STAGE1_TIMEOUT' in code: - time.sleep(30) + time.sleep(8) return {"stage1_score": 1.0} else: return {"stage1_score": 0.7} @@ -61,7 +61,7 @@ def evaluate_stage2(program_path): code = f.read() if 'STAGE2_TIMEOUT' in code: - time.sleep(30) + time.sleep(8) return {"stage2_score": 1.0} else: return {"stage2_score": 0.8} @@ -71,7 +71,7 @@ def evaluate_stage3(program_path): code = f.read() if 'STAGE3_TIMEOUT' in code: - time.sleep(30) + time.sleep(8) return {"stage3_score": 1.0} else: return {"stage3_score": 0.9} @@ -84,8 +84,8 @@ def tearDown(self): if os.path.exists(self.test_eval_file.name): os.unlink(self.test_eval_file.name) - def _create_evaluator(self, timeout=5, cascade_evaluation=False): - """Helper to create evaluator with given settings""" + def _create_evaluator(self, timeout=3, cascade_evaluation=False): + """Helper to create evaluator 
with given settings (shorter timeout for faster tests)""" config = EvaluatorConfig() config.timeout = timeout config.max_retries = 1 # Minimal retries for faster testing @@ -103,7 +103,7 @@ def test_fast_evaluation_completes(self): """Test that fast evaluations complete successfully""" async def run_test(): - evaluator = self._create_evaluator(timeout=5) + evaluator = self._create_evaluator(timeout=3) program_code = "def test(): return 'fast'" start_time = time.time() @@ -112,7 +112,7 @@ async def run_test(): elapsed_time = time.time() - start_time # Should complete quickly - self.assertLess(elapsed_time, 3.0) + self.assertLess(elapsed_time, 2.0) # Should return successful result self.assertIn("score", result) self.assertEqual(result["score"], 0.5) @@ -126,7 +126,7 @@ def test_short_evaluation_completes(self): """Test that evaluations shorter than timeout complete successfully""" async def run_test(): - evaluator = self._create_evaluator(timeout=5) + evaluator = self._create_evaluator(timeout=3) program_code = "# SLEEP_SHORT\ndef test(): return 'short'" start_time = time.time() @@ -135,7 +135,7 @@ async def run_test(): elapsed_time = time.time() - start_time # Should complete within timeout - self.assertLess(elapsed_time, 5) + self.assertLess(elapsed_time, 3) # Should return successful result self.assertIn("score", result) self.assertEqual(result["score"], 0.8) @@ -149,7 +149,7 @@ def test_long_evaluation_times_out(self): """Test that long evaluations time out properly""" async def run_test(): - evaluator = self._create_evaluator(timeout=5) + evaluator = self._create_evaluator(timeout=3) program_code = "# SLEEP_LONG\ndef test(): return 'long'" start_time = time.time() @@ -158,8 +158,8 @@ async def run_test(): elapsed_time = time.time() - start_time # Should complete around the timeout period (allowing some margin) - self.assertGreater(elapsed_time, 4) - self.assertLess(elapsed_time, 8) + self.assertGreater(elapsed_time, 2.5) + self.assertLess(elapsed_time, 5) # Should return timeout result self.assertIn("error", result) @@ -173,7 +173,7 @@ def test_cascade_evaluation_timeout_stage1(self): """Test timeout in cascade evaluation stage 1""" async def run_test(): - evaluator = self._create_evaluator(timeout=5, cascade_evaluation=True) + evaluator = self._create_evaluator(timeout=3, cascade_evaluation=True) program_code = "# STAGE1_TIMEOUT\ndef test(): return 'stage1_timeout'" start_time = time.time() @@ -182,8 +182,8 @@ async def run_test(): elapsed_time = time.time() - start_time # Should timeout around the configured timeout - self.assertGreater(elapsed_time, 4) - self.assertLess(elapsed_time, 8) + self.assertGreater(elapsed_time, 2.5) + self.assertLess(elapsed_time, 5) # Should return stage1 timeout result self.assertIn("stage1_passed", result) @@ -197,7 +197,7 @@ def test_cascade_evaluation_timeout_stage2(self): """Test timeout in cascade evaluation stage 2""" async def run_test(): - evaluator = self._create_evaluator(timeout=5, cascade_evaluation=True) + evaluator = self._create_evaluator(timeout=3, cascade_evaluation=True) program_code = "# STAGE2_TIMEOUT\ndef test(): return 'stage2_timeout'" start_time = time.time() @@ -206,8 +206,8 @@ async def run_test(): elapsed_time = time.time() - start_time # Should timeout on stage 2, but stage 1 should complete first - self.assertGreater(elapsed_time, 4) - self.assertLess(elapsed_time, 8) + self.assertGreater(elapsed_time, 2.5) + self.assertLess(elapsed_time, 5) # Should have stage1 result but stage2 timeout self.assertIn("stage1_score", result) 
@@ -223,7 +223,7 @@ def test_cascade_evaluation_timeout_stage3(self): """Test timeout in cascade evaluation stage 3""" async def run_test(): - evaluator = self._create_evaluator(timeout=5, cascade_evaluation=True) + evaluator = self._create_evaluator(timeout=3, cascade_evaluation=True) program_code = "# STAGE3_TIMEOUT\ndef test(): return 'stage3_timeout'" start_time = time.time() @@ -232,8 +232,8 @@ async def run_test(): elapsed_time = time.time() - start_time # Should timeout on stage 3, but stages 1 and 2 should complete first - self.assertGreater(elapsed_time, 4) - self.assertLess(elapsed_time, 8) + self.assertGreater(elapsed_time, 2.5) + self.assertLess(elapsed_time, 5) # Should have stage1 and stage2 results but stage3 timeout self.assertIn("stage1_score", result) @@ -252,7 +252,7 @@ def test_timeout_config_respected(self): async def run_test(): # Create evaluator with different timeout - evaluator = self._create_evaluator(timeout=10) + evaluator = self._create_evaluator(timeout=5) program_code = "# SLEEP_LONG\ndef test(): return 'long'" start_time = time.time() @@ -261,9 +261,9 @@ async def run_test(): elapsed_time = time.time() - start_time - # Should timeout around 10 seconds, not 5 - self.assertGreater(elapsed_time, 9) - self.assertLess(elapsed_time, 13) + # Should timeout around 5 seconds, not 3 + self.assertGreater(elapsed_time, 4.5) + self.assertLess(elapsed_time, 7) # Should return timeout result self.assertIn("timeout", result) @@ -277,7 +277,7 @@ def test_multiple_retries_with_errors(self): async def run_test(): # Create evaluator with more retries config = EvaluatorConfig() - config.timeout = 10 # Long timeout to avoid timeout during this test + config.timeout = 8 # Long timeout to avoid timeout during this test config.max_retries = 2 # 3 total attempts config.cascade_evaluation = False @@ -300,7 +300,7 @@ async def run_test(): # Each attempt should fail quickly, plus 1 second sleep between retries # So total time should be around 2-3 seconds (quick failures + 2 sleep periods) self.assertGreater(elapsed_time, 1.8) # At least 2 sleep periods - self.assertLess(elapsed_time, 5) # But not too long + self.assertLess(elapsed_time, 4) # But not too long # Should return error result after all retries fail self.assertIn("error", result) @@ -314,7 +314,7 @@ def test_timeout_does_not_trigger_retries(self): async def run_test(): # Create evaluator with retries enabled config = EvaluatorConfig() - config.timeout = 3 # Short timeout + config.timeout = 2 # Short timeout config.max_retries = 2 # Would allow 3 attempts if retries were triggered config.cascade_evaluation = False @@ -333,10 +333,10 @@ async def run_test(): elapsed_time = time.time() - start_time - # Should timeout only once (~3 seconds), not retry multiple times - # If retries were happening, this would take ~9 seconds - self.assertGreater(elapsed_time, 2.5) # At least the timeout period - self.assertLess(elapsed_time, 5) # But not multiple timeout periods + # Should timeout only once (~2 seconds), not retry multiple times + # If retries were happening, this would take ~6 seconds + self.assertGreater(elapsed_time, 1.8) # At least the timeout period + self.assertLess(elapsed_time, 3.5) # But not multiple timeout periods # Should return timeout result self.assertIn("timeout", result) @@ -350,7 +350,7 @@ def test_artifacts_on_timeout(self): async def run_test(): # Enable artifacts with patch.dict(os.environ, {"ENABLE_ARTIFACTS": "true"}): - evaluator = self._create_evaluator(timeout=5) + evaluator = 
self._create_evaluator(timeout=3) program_code = "# SLEEP_LONG\ndef test(): return 'long'" # Execute evaluation @@ -382,7 +382,7 @@ async def run_test(): "timeout_duration", artifacts, "Artifacts should contain timeout_duration" ) self.assertEqual( - artifacts["timeout_duration"], 5, "timeout_duration should match config" + artifacts["timeout_duration"], 3, "timeout_duration should match config" ) print(f"βœ… Artifacts captured correctly: {list(artifacts.keys())}") @@ -405,8 +405,8 @@ async def run_test(): import time def evaluate(program_path): - # Simulate a very long evaluation (like the 11-hour case) - time.sleep(20) # 20 seconds to test timeout + # Simulate a very long evaluation (like the 11-hour case) + time.sleep(6) # 6 seconds to test timeout (reduced for faster tests) return {"accReturn": 0.1, "CalmarRatio": 0.9, "combined_score": 0.82} """ ) @@ -415,7 +415,7 @@ def evaluate(program_path): try: # Configure like user's config but with shorter timeout for testing config = EvaluatorConfig() - config.timeout = 5 # 5 seconds instead of 600 + config.timeout = 3 # 3 seconds instead of 600 config.max_retries = 1 config.cascade_evaluation = False config.parallel_evaluations = 1 @@ -438,9 +438,9 @@ def search_algorithm(): result = await evaluator.evaluate_program(program_code, "financial_test") elapsed_time = time.time() - start_time - # Should timeout in ~5 seconds, not 20+ seconds - self.assertLess(elapsed_time, 8) - self.assertGreater(elapsed_time, 4) + # Should timeout in ~3 seconds, not 6+ seconds + self.assertLess(elapsed_time, 5) + self.assertGreater(elapsed_time, 2.5) # Should return timeout error self.assertIn("error", result) From 0fac26a05fd71fa1290002748eafdce18c3c599d Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Thu, 19 Jun 2025 17:06:28 +0800 Subject: [PATCH 8/8] Update evaluator.py --- openevolve/evaluator.py | 119 +++++++++++++++++++++++++++++++++------- 1 file changed, 98 insertions(+), 21 deletions(-) diff --git a/openevolve/evaluator.py b/openevolve/evaluator.py index 7d89110bb..dfe966f50 100644 --- a/openevolve/evaluator.py +++ b/openevolve/evaluator.py @@ -314,10 +314,6 @@ async def _cascade_evaluate( Returns: Dictionary of metrics or EvaluationResult with metrics and artifacts - - Raises: - asyncio.TimeoutError: If any stage exceeds timeout - Exception: If any evaluation stage raises an exception """ # Import the evaluation module to get cascade functions if they exist try: @@ -339,12 +335,34 @@ async def _cascade_evaluate( return await self._direct_evaluate(program_path) # Run first stage with timeout - async def run_stage1(): - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, module.evaluate_stage1, program_path) + try: - stage1_result = await asyncio.wait_for(run_stage1(), timeout=self.config.timeout) - stage1_eval_result = self._process_evaluation_result(stage1_result) + async def run_stage1(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage1, program_path) + + stage1_result = await asyncio.wait_for(run_stage1(), timeout=self.config.timeout) + stage1_eval_result = self._process_evaluation_result(stage1_result) + except asyncio.TimeoutError: + logger.warning(f"Stage 1 evaluation timed out after {self.config.timeout}s") + return EvaluationResult( + metrics={"stage1_passed": 0.0, "error": 0.0, "timeout": True}, + artifacts={ + "failure_stage": "stage1", + "timeout": True, + }, + ) + except Exception as e: + logger.error(f"Error in stage 1 evaluation: {str(e)}") + # Capture stage 1 
failure as artifacts + return EvaluationResult( + metrics={"stage1_passed": 0.0, "error": 0.0}, + artifacts={ + "stderr": str(e), + "traceback": traceback.format_exc(), + "failure_stage": "stage1", + }, + ) # Check threshold if not self._passes_threshold( @@ -357,12 +375,38 @@ async def run_stage1(): return stage1_eval_result # Run second stage with timeout - async def run_stage2(): - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, module.evaluate_stage2, program_path) + try: - stage2_result = await asyncio.wait_for(run_stage2(), timeout=self.config.timeout) - stage2_eval_result = self._process_evaluation_result(stage2_result) + async def run_stage2(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage2, program_path) + + stage2_result = await asyncio.wait_for(run_stage2(), timeout=self.config.timeout) + stage2_eval_result = self._process_evaluation_result(stage2_result) + except asyncio.TimeoutError: + logger.warning(f"Stage 2 evaluation timed out after {self.config.timeout}s") + # Capture stage 2 failure, but keep stage 1 results + stage1_eval_result.artifacts.update( + { + "stage2_timeout": True, + "failure_stage": "stage2", + } + ) + stage1_eval_result.metrics["stage2_passed"] = 0.0 + stage1_eval_result.metrics["timeout"] = True + return stage1_eval_result + except Exception as e: + logger.error(f"Error in stage 2 evaluation: {str(e)}") + # Capture stage 2 failure, but keep stage 1 results + stage1_eval_result.artifacts.update( + { + "stage2_stderr": str(e), + "stage2_traceback": traceback.format_exc(), + "failure_stage": "stage2", + } + ) + stage1_eval_result.metrics["stage2_passed"] = 0.0 + return stage1_eval_result # Merge results from stage 1 and 2 merged_metrics = {} @@ -393,12 +437,38 @@ async def run_stage2(): return merged_result # Run third stage with timeout - async def run_stage3(): - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, module.evaluate_stage3, program_path) + try: - stage3_result = await asyncio.wait_for(run_stage3(), timeout=self.config.timeout) - stage3_eval_result = self._process_evaluation_result(stage3_result) + async def run_stage3(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage3, program_path) + + stage3_result = await asyncio.wait_for(run_stage3(), timeout=self.config.timeout) + stage3_eval_result = self._process_evaluation_result(stage3_result) + except asyncio.TimeoutError: + logger.warning(f"Stage 3 evaluation timed out after {self.config.timeout}s") + # Capture stage 3 failure, but keep previous results + merged_result.artifacts.update( + { + "stage3_timeout": True, + "failure_stage": "stage3", + } + ) + merged_result.metrics["stage3_passed"] = 0.0 + merged_result.metrics["timeout"] = True + return merged_result + except Exception as e: + logger.error(f"Error in stage 3 evaluation: {str(e)}") + # Capture stage 3 failure, but keep previous results + merged_result.artifacts.update( + { + "stage3_stderr": str(e), + "stage3_traceback": traceback.format_exc(), + "failure_stage": "stage3", + } + ) + merged_result.metrics["stage3_passed"] = 0.0 + return merged_result # Merge stage 3 results for name, value in stage3_eval_result.metrics.items(): @@ -411,8 +481,15 @@ async def run_stage3(): except Exception as e: logger.error(f"Error in cascade evaluation: {str(e)}") - # Re-raise the exception to allow retry handling at higher level - raise + # Return proper cascade failure result instead of re-raising + return 
EvaluationResult( + metrics={"stage1_passed": 0.0, "error": 0.0}, + artifacts={ + "stderr": str(e), + "traceback": traceback.format_exc(), + "failure_stage": "cascade_setup", + }, + ) async def _llm_evaluate(self, program_code: str, program_id: str = "") -> Dict[str, float]: """