diff --git a/examples/mlx_metal_kernel_opt/README.md b/examples/mlx_metal_kernel_opt/README.md index 54d300c15..bb83fdea0 100644 --- a/examples/mlx_metal_kernel_opt/README.md +++ b/examples/mlx_metal_kernel_opt/README.md @@ -1,4 +1,4 @@ -# 🎯 Qwen3-0.6B Custom Metal Kernel Optimization with OpenEvolve +# 🎯Custom Metal Kernel Optimization with OpenEvolve **Evolving custom GPU kernels for Grouped Query Attention using MLX Metal kernels for Qwen3-0.6B on Apple Silicon** @@ -416,29 +416,3 @@ python run_benchmarks.py --mode compare --- **🎯 This example demonstrates OpenEvolve's capability to discover genuine algorithmic improvements through evolutionary optimization, achieving measurable performance gains on real hardware with production-ready implementations.** - -## πŸ”§ **Recent Improvements** - -### **βœ… Correct Terminology** -- **Before**: Incorrect references to "chunked GQA processing" -- **After**: Accurate descriptions of custom Metal kernel optimization -- **Benefits**: Technical accuracy and clear understanding of actual discoveries - -### **βœ… Comprehensive Testing** -- **Before**: Basic performance measurement -- **After**: 17-scenario comprehensive benchmark suite with statistical validation -- **Benefits**: Robust performance analysis and reproducible results - -### **βœ… Production Integration** -- **Before**: Standalone optimization experiments -- **After**: Full MLX-LM integration with seamless switching -- **Benefits**: Real-world usability and easy adoption - -### **βœ… Detailed Documentation** -- **Before**: High-level optimization descriptions -- **After**: Complete technical details with actual kernel code snippets -- **Benefits**: Understanding, reproducibility, and further research - ---- - -**πŸš€ Ready for custom Metal kernel evolution with comprehensive benchmarking and detailed analysis!** diff --git a/openevolve/evaluator.py b/openevolve/evaluator.py index 1af482bb2..dfe966f50 100644 --- a/openevolve/evaluator.py +++ b/openevolve/evaluator.py @@ -134,6 +134,20 @@ async def evaluate_program( # Process the result based on type eval_result = self._process_evaluation_result(result) + # Check if this was a timeout and capture artifacts if enabled + if artifacts_enabled and program_id and eval_result.metrics.get("timeout") is True: + if program_id not in self._pending_artifacts: + self._pending_artifacts[program_id] = {} + + self._pending_artifacts[program_id].update( + { + "timeout": True, + "timeout_duration": self.config.timeout, + "failure_stage": "evaluation", + "error_type": "timeout", + } + ) + # Add LLM feedback if configured llm_eval_result = None if self.config.use_llm_feedback and self.llm_ensemble: @@ -153,7 +167,8 @@ async def evaluate_program( ) and program_id ): - self._pending_artifacts[program_id] = {} + if program_id not in self._pending_artifacts: + self._pending_artifacts[program_id] = {} # Merge eval_result artifacts with llm artifacts if they exist if eval_result.has_artifacts(): @@ -179,6 +194,21 @@ async def evaluate_program( # Return just metrics for backward compatibility return eval_result.metrics + except asyncio.TimeoutError: + # Handle timeout specially - don't retry, just return timeout result + logger.warning(f"Evaluation timed out after {self.config.timeout}s") + + # Capture timeout artifacts if enabled + if artifacts_enabled and program_id: + self._pending_artifacts[program_id] = { + "timeout": True, + "timeout_duration": self.config.timeout, + "failure_stage": "evaluation", + "error_type": "timeout", + } + + return {"error": 0.0, 
"timeout": True} + except Exception as e: last_exception = e logger.warning( @@ -192,6 +222,7 @@ async def evaluate_program( "stderr": str(e), "traceback": traceback.format_exc(), "failure_stage": "evaluation", + "attempt": attempt + 1, } # If this is not the last attempt, wait a bit before retrying @@ -242,32 +273,36 @@ def get_pending_artifacts(self, program_id: str) -> Optional[Dict[str, Union[str """ return self._pending_artifacts.pop(program_id, None) - @run_in_executor - def _direct_evaluate(self, program_path: str) -> Dict[str, float]: + async def _direct_evaluate(self, program_path: str) -> Dict[str, float]: """ - Directly evaluate a program using the evaluation function + Directly evaluate a program using the evaluation function with timeout Args: program_path: Path to the program file Returns: Dictionary of metric name to score + + Raises: + asyncio.TimeoutError: If evaluation exceeds timeout + Exception: If evaluation function raises an exception """ - try: - # Run the evaluation with timeout - result = self.evaluate_function(program_path) - # Validate result - if not isinstance(result, dict): - logger.warning(f"Evaluation returned non-dictionary result: {result}") - return {"error": 0.0} + # Create a coroutine that runs the evaluation function in an executor + async def run_evaluation(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self.evaluate_function, program_path) - return result + # Run the evaluation with timeout - let exceptions bubble up for retry handling + result = await asyncio.wait_for(run_evaluation(), timeout=self.config.timeout) - except Exception as e: - logger.error(f"Error in direct evaluation: {str(e)}") + # Validate result + if not isinstance(result, dict): + logger.warning(f"Evaluation returned non-dictionary result: {result}") return {"error": 0.0} + return result + async def _cascade_evaluate( self, program_path: str ) -> Union[Dict[str, float], EvaluationResult]: @@ -299,10 +334,24 @@ async def _cascade_evaluate( if not hasattr(module, "evaluate_stage1"): return await self._direct_evaluate(program_path) - # Run first stage + # Run first stage with timeout try: - stage1_result = await run_in_executor(module.evaluate_stage1)(program_path) + + async def run_stage1(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage1, program_path) + + stage1_result = await asyncio.wait_for(run_stage1(), timeout=self.config.timeout) stage1_eval_result = self._process_evaluation_result(stage1_result) + except asyncio.TimeoutError: + logger.warning(f"Stage 1 evaluation timed out after {self.config.timeout}s") + return EvaluationResult( + metrics={"stage1_passed": 0.0, "error": 0.0, "timeout": True}, + artifacts={ + "failure_stage": "stage1", + "timeout": True, + }, + ) except Exception as e: logger.error(f"Error in stage 1 evaluation: {str(e)}") # Capture stage 1 failure as artifacts @@ -325,10 +374,27 @@ async def _cascade_evaluate( if not hasattr(module, "evaluate_stage2"): return stage1_eval_result - # Run second stage + # Run second stage with timeout try: - stage2_result = await run_in_executor(module.evaluate_stage2)(program_path) + + async def run_stage2(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage2, program_path) + + stage2_result = await asyncio.wait_for(run_stage2(), timeout=self.config.timeout) stage2_eval_result = self._process_evaluation_result(stage2_result) + except asyncio.TimeoutError: + logger.warning(f"Stage 2 evaluation 
timed out after {self.config.timeout}s") + # Capture stage 2 failure, but keep stage 1 results + stage1_eval_result.artifacts.update( + { + "stage2_timeout": True, + "failure_stage": "stage2", + } + ) + stage1_eval_result.metrics["stage2_passed"] = 0.0 + stage1_eval_result.metrics["timeout"] = True + return stage1_eval_result except Exception as e: logger.error(f"Error in stage 2 evaluation: {str(e)}") # Capture stage 2 failure, but keep stage 1 results @@ -370,10 +436,27 @@ async def _cascade_evaluate( if not hasattr(module, "evaluate_stage3"): return merged_result - # Run third stage + # Run third stage with timeout try: - stage3_result = await run_in_executor(module.evaluate_stage3)(program_path) + + async def run_stage3(): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, module.evaluate_stage3, program_path) + + stage3_result = await asyncio.wait_for(run_stage3(), timeout=self.config.timeout) stage3_eval_result = self._process_evaluation_result(stage3_result) + except asyncio.TimeoutError: + logger.warning(f"Stage 3 evaluation timed out after {self.config.timeout}s") + # Capture stage 3 failure, but keep previous results + merged_result.artifacts.update( + { + "stage3_timeout": True, + "failure_stage": "stage3", + } + ) + merged_result.metrics["stage3_passed"] = 0.0 + merged_result.metrics["timeout"] = True + return merged_result except Exception as e: logger.error(f"Error in stage 3 evaluation: {str(e)}") # Capture stage 3 failure, but keep previous results @@ -398,8 +481,9 @@ async def _cascade_evaluate( except Exception as e: logger.error(f"Error in cascade evaluation: {str(e)}") + # Return proper cascade failure result instead of re-raising return EvaluationResult( - metrics={"error": 0.0}, + metrics={"stage1_passed": 0.0, "error": 0.0}, artifacts={ "stderr": str(e), "traceback": traceback.format_exc(), diff --git a/openevolve/utils/async_utils.py b/openevolve/utils/async_utils.py index 872f0e5ee..ded1fed65 100644 --- a/openevolve/utils/async_utils.py +++ b/openevolve/utils/async_utils.py @@ -32,6 +32,60 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: return wrapper +async def run_with_timeout( + coro: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any +) -> Any: + """ + Run a coroutine with a timeout, returning a default value on timeout + + Args: + coro: Coroutine function to run + timeout: Timeout in seconds + *args: Arguments to pass to the coroutine + timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) + **kwargs: Keyword arguments to pass to the coroutine + + Returns: + Result of the coroutine or timeout_error_value on timeout + """ + if timeout_error_value is None: + timeout_error_value = {"error": 0.0, "timeout": True} + + try: + return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout) + except asyncio.TimeoutError: + logger.warning(f"Operation timed out after {timeout}s") + return timeout_error_value + + +async def run_sync_with_timeout( + func: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any +) -> Any: + """ + Run a synchronous function in an executor with a timeout + + Args: + func: Synchronous function to run + timeout: Timeout in seconds + *args: Arguments to pass to the function + timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True}) + **kwargs: Keyword arguments to pass to the function + + Returns: + Result of the function or timeout_error_value on timeout + """ + if 
timeout_error_value is None: + timeout_error_value = {"error": 0.0, "timeout": True} + + try: + loop = asyncio.get_event_loop() + task = loop.run_in_executor(None, functools.partial(func, *args, **kwargs)) + return await asyncio.wait_for(task, timeout=timeout) + except asyncio.TimeoutError: + logger.warning(f"Sync operation timed out after {timeout}s") + return timeout_error_value + + async def gather_with_concurrency( n: int, *tasks: asyncio.Future, return_exceptions: bool = False ) -> List[Any]: @@ -116,9 +170,17 @@ class TaskPool: """ def __init__(self, max_concurrency: int = 10): - self.semaphore = asyncio.Semaphore(max_concurrency) + self.max_concurrency = max_concurrency + self._semaphore: Optional[asyncio.Semaphore] = None self.tasks: List[asyncio.Task] = [] + @property + def semaphore(self) -> asyncio.Semaphore: + """Lazy-initialize the semaphore when first needed""" + if self._semaphore is None: + self._semaphore = asyncio.Semaphore(self.max_concurrency) + return self._semaphore + async def run(self, coro: Callable, *args: Any, **kwargs: Any) -> Any: """ Run a coroutine in the pool diff --git a/pyproject.toml b/pyproject.toml index 7f09ea376..98f524d8f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "openevolve" -version = "0.0.1" +version = "0.0.2" description = "Open-source implementation of AlphaEvolve" readme = "README.md" requires-python = ">=3.9" diff --git a/setup.py b/setup.py index 82f5b14f3..02c0f1253 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="openevolve", - version="0.0.1", + version="0.0.2", packages=find_packages(), include_package_data=True, ) diff --git a/tests/test_evaluator_timeout.py b/tests/test_evaluator_timeout.py new file mode 100644 index 000000000..d9053e4a0 --- /dev/null +++ b/tests/test_evaluator_timeout.py @@ -0,0 +1,459 @@ +""" +Tests for evaluation timeout functionality in openevolve.evaluator +""" + +import asyncio +import os +import tempfile +import time +import unittest +from unittest.mock import patch, MagicMock + +from openevolve.config import EvaluatorConfig +from openevolve.evaluator import Evaluator + + +class TestEvaluatorTimeout(unittest.TestCase): + """Tests for evaluator timeout functionality""" + + def setUp(self): + """Set up test evaluation file""" + # Create a test evaluation file + self.test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) + + # Write test evaluation functions with shorter sleep times for faster tests + self.test_eval_file.write( + """ +import time + +def evaluate(program_path): + # Read the program to determine behavior + with open(program_path, 'r') as f: + code = f.read() + + if 'SLEEP_LONG' in code: + # Sleep for a long time to trigger timeout (reduced for faster tests) + time.sleep(8) + return {"score": 1.0} + elif 'SLEEP_SHORT' in code: + # Sleep for a short time that should not timeout + time.sleep(0.5) + return {"score": 0.8} + elif 'RAISE_ERROR' in code: + # Raise an error to trigger retries + raise RuntimeError("Evaluation failed") + else: + # Fast evaluation + return {"score": 0.5} + +def evaluate_stage1(program_path): + with open(program_path, 'r') as f: + code = f.read() + + if 'STAGE1_TIMEOUT' in code: + time.sleep(8) + return {"stage1_score": 1.0} + else: + return {"stage1_score": 0.7} + +def evaluate_stage2(program_path): + with open(program_path, 'r') as f: + code = f.read() + + if 'STAGE2_TIMEOUT' in code: + time.sleep(8) + return {"stage2_score": 1.0} + else: + return 
{"stage2_score": 0.8} + +def evaluate_stage3(program_path): + with open(program_path, 'r') as f: + code = f.read() + + if 'STAGE3_TIMEOUT' in code: + time.sleep(8) + return {"stage3_score": 1.0} + else: + return {"stage3_score": 0.9} +""" + ) + self.test_eval_file.close() + + def tearDown(self): + """Clean up test files""" + if os.path.exists(self.test_eval_file.name): + os.unlink(self.test_eval_file.name) + + def _create_evaluator(self, timeout=3, cascade_evaluation=False): + """Helper to create evaluator with given settings (shorter timeout for faster tests)""" + config = EvaluatorConfig() + config.timeout = timeout + config.max_retries = 1 # Minimal retries for faster testing + config.cascade_evaluation = cascade_evaluation + config.cascade_thresholds = [0.5, 0.7, 0.9] + + return Evaluator( + config=config, + evaluation_file=self.test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + def test_fast_evaluation_completes(self): + """Test that fast evaluations complete successfully""" + + async def run_test(): + evaluator = self._create_evaluator(timeout=3) + program_code = "def test(): return 'fast'" + start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_fast") + + elapsed_time = time.time() - start_time + + # Should complete quickly + self.assertLess(elapsed_time, 2.0) + # Should return successful result + self.assertIn("score", result) + self.assertEqual(result["score"], 0.5) + # Should not have timeout or error flags + self.assertNotIn("timeout", result) + self.assertNotIn("error", result) + + asyncio.run(run_test()) + + def test_short_evaluation_completes(self): + """Test that evaluations shorter than timeout complete successfully""" + + async def run_test(): + evaluator = self._create_evaluator(timeout=3) + program_code = "# SLEEP_SHORT\ndef test(): return 'short'" + start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_short") + + elapsed_time = time.time() - start_time + + # Should complete within timeout + self.assertLess(elapsed_time, 3) + # Should return successful result + self.assertIn("score", result) + self.assertEqual(result["score"], 0.8) + # Should not have timeout or error flags + self.assertNotIn("timeout", result) + self.assertNotIn("error", result) + + asyncio.run(run_test()) + + def test_long_evaluation_times_out(self): + """Test that long evaluations time out properly""" + + async def run_test(): + evaluator = self._create_evaluator(timeout=3) + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_long") + + elapsed_time = time.time() - start_time + + # Should complete around the timeout period (allowing some margin) + self.assertGreater(elapsed_time, 2.5) + self.assertLess(elapsed_time, 5) + + # Should return timeout result + self.assertIn("error", result) + self.assertEqual(result["error"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_cascade_evaluation_timeout_stage1(self): + """Test timeout in cascade evaluation stage 1""" + + async def run_test(): + evaluator = self._create_evaluator(timeout=3, cascade_evaluation=True) + program_code = "# STAGE1_TIMEOUT\ndef test(): return 'stage1_timeout'" + start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_cascade_stage1") + + elapsed_time = time.time() - start_time + + # Should timeout around the configured timeout + 
self.assertGreater(elapsed_time, 2.5) + self.assertLess(elapsed_time, 5) + + # Should return stage1 timeout result + self.assertIn("stage1_passed", result) + self.assertEqual(result["stage1_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_cascade_evaluation_timeout_stage2(self): + """Test timeout in cascade evaluation stage 2""" + + async def run_test(): + evaluator = self._create_evaluator(timeout=3, cascade_evaluation=True) + program_code = "# STAGE2_TIMEOUT\ndef test(): return 'stage2_timeout'" + start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_cascade_stage2") + + elapsed_time = time.time() - start_time + + # Should timeout on stage 2, but stage 1 should complete first + self.assertGreater(elapsed_time, 2.5) + self.assertLess(elapsed_time, 5) + + # Should have stage1 result but stage2 timeout + self.assertIn("stage1_score", result) + self.assertEqual(result["stage1_score"], 0.7) + self.assertIn("stage2_passed", result) + self.assertEqual(result["stage2_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_cascade_evaluation_timeout_stage3(self): + """Test timeout in cascade evaluation stage 3""" + + async def run_test(): + evaluator = self._create_evaluator(timeout=3, cascade_evaluation=True) + program_code = "# STAGE3_TIMEOUT\ndef test(): return 'stage3_timeout'" + start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_cascade_stage3") + + elapsed_time = time.time() - start_time + + # Should timeout on stage 3, but stages 1 and 2 should complete first + self.assertGreater(elapsed_time, 2.5) + self.assertLess(elapsed_time, 5) + + # Should have stage1 and stage2 results but stage3 timeout + self.assertIn("stage1_score", result) + self.assertEqual(result["stage1_score"], 0.7) + self.assertIn("stage2_score", result) + self.assertEqual(result["stage2_score"], 0.8) + self.assertIn("stage3_passed", result) + self.assertEqual(result["stage3_passed"], 0.0) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_timeout_config_respected(self): + """Test that the timeout configuration value is actually used""" + + async def run_test(): + # Create evaluator with different timeout + evaluator = self._create_evaluator(timeout=5) + + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_config") + + elapsed_time = time.time() - start_time + + # Should timeout around 5 seconds, not 3 + self.assertGreater(elapsed_time, 4.5) + self.assertLess(elapsed_time, 7) + + # Should return timeout result + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_multiple_retries_with_errors(self): + """Test that retries work correctly with actual errors (not timeouts)""" + + async def run_test(): + # Create evaluator with more retries + config = EvaluatorConfig() + config.timeout = 8 # Long timeout to avoid timeout during this test + config.max_retries = 2 # 3 total attempts + config.cascade_evaluation = False + + evaluator = Evaluator( + config=config, + evaluation_file=self.test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + # Use RAISE_ERROR to trigger actual exceptions that will be retried + program_code = "# RAISE_ERROR\ndef test(): return 'error'" + 
start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_retries") + + elapsed_time = time.time() - start_time + + # Should have retried 3 times (max_retries=2 means 3 total attempts) + # Each attempt should fail quickly, plus 1 second sleep between retries + # So total time should be around 2-3 seconds (quick failures + 2 sleep periods) + self.assertGreater(elapsed_time, 1.8) # At least 2 sleep periods + self.assertLess(elapsed_time, 4) # But not too long + + # Should return error result after all retries fail + self.assertIn("error", result) + self.assertEqual(result["error"], 0.0) + + asyncio.run(run_test()) + + def test_timeout_does_not_trigger_retries(self): + """Test that timeouts do not trigger retries (correct behavior)""" + + async def run_test(): + # Create evaluator with retries enabled + config = EvaluatorConfig() + config.timeout = 2 # Short timeout + config.max_retries = 2 # Would allow 3 attempts if retries were triggered + config.cascade_evaluation = False + + evaluator = Evaluator( + config=config, + evaluation_file=self.test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + # Use SLEEP_LONG to trigger timeout + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + start_time = time.time() + + result = await evaluator.evaluate_program(program_code, "test_timeout_no_retry") + + elapsed_time = time.time() - start_time + + # Should timeout only once (~2 seconds), not retry multiple times + # If retries were happening, this would take ~6 seconds + self.assertGreater(elapsed_time, 1.8) # At least the timeout period + self.assertLess(elapsed_time, 3.5) # But not multiple timeout periods + + # Should return timeout result + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + asyncio.run(run_test()) + + def test_artifacts_on_timeout(self): + """Test that timeout artifacts are properly captured""" + + async def run_test(): + # Enable artifacts + with patch.dict(os.environ, {"ENABLE_ARTIFACTS": "true"}): + evaluator = self._create_evaluator(timeout=3) + program_code = "# SLEEP_LONG\ndef test(): return 'long'" + + # Execute evaluation + result = await evaluator.evaluate_program(program_code, "test_artifacts") + + # Verify timeout occurred + self.assertIn("timeout", result, "Result should contain timeout flag") + self.assertTrue(result["timeout"], "Timeout flag should be True") + + # Verify artifacts were captured + artifacts = evaluator.get_pending_artifacts("test_artifacts") + self.assertIsNotNone(artifacts, "Artifacts should not be None") + + # Verify required artifact fields + self.assertIn("failure_stage", artifacts, "Artifacts should contain failure_stage") + self.assertEqual( + artifacts["failure_stage"], "evaluation", "failure_stage should be 'evaluation'" + ) + + self.assertIn("timeout", artifacts, "Artifacts should contain timeout flag") + self.assertTrue(artifacts["timeout"], "Artifact timeout flag should be True") + + self.assertIn("error_type", artifacts, "Artifacts should contain error_type") + self.assertEqual( + artifacts["error_type"], "timeout", "error_type should be 'timeout'" + ) + + self.assertIn( + "timeout_duration", artifacts, "Artifacts should contain timeout_duration" + ) + self.assertEqual( + artifacts["timeout_duration"], 3, "timeout_duration should match config" + ) + + print(f"βœ… Artifacts captured correctly: {list(artifacts.keys())}") + + asyncio.run(run_test()) + + +class TestTimeoutIntegration(unittest.TestCase): + """Integration tests for timeout functionality""" + + def 
test_real_world_scenario(self): + """Test a scenario similar to the reported bug""" + + async def run_test(): + # Create a test evaluation file that simulates a long-running evaluation + test_eval_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) + + test_eval_file.write( + """ +import time + +def evaluate(program_path): + # Simulate a very long evaluation (like the 11-hour case) + time.sleep(6) # 6 seconds to test timeout (reduced for faster tests) + return {"accReturn": 0.1, "CalmarRatio": 0.9, "combined_score": 0.82} +""" + ) + test_eval_file.close() + + try: + # Configure like user's config but with shorter timeout for testing + config = EvaluatorConfig() + config.timeout = 3 # 3 seconds instead of 600 + config.max_retries = 1 + config.cascade_evaluation = False + config.parallel_evaluations = 1 + + evaluator = Evaluator( + config=config, + evaluation_file=test_eval_file.name, + llm_ensemble=None, + prompt_sampler=None, + ) + + program_code = """ +# Financial optimization algorithm +def search_algorithm(): + # This would normally run for hours + return {"report_type_factor_map": {}} +""" + + start_time = time.time() + result = await evaluator.evaluate_program(program_code, "financial_test") + elapsed_time = time.time() - start_time + + # Should timeout in ~3 seconds, not 6+ seconds + self.assertLess(elapsed_time, 5) + self.assertGreater(elapsed_time, 2.5) + + # Should return timeout error + self.assertIn("error", result) + self.assertIn("timeout", result) + self.assertTrue(result["timeout"]) + + finally: + if os.path.exists(test_eval_file.name): + os.unlink(test_eval_file.name) + + asyncio.run(run_test()) + + +if __name__ == "__main__": + # Run with verbose output to see test progress + unittest.main(verbosity=2)
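
Illustrative usage sketch (outside the patch above): the snippet below shows one way the new `run_sync_with_timeout` helper from `openevolve/utils/async_utils.py` could be called from async code. The `slow_eval` function, the `"program.py"` path, and the 5-second timeout are hypothetical placeholders; per the helper's docstring, a timeout returns the default `{"error": 0.0, "timeout": True}` value rather than raising.

```python
import asyncio
import time

from openevolve.utils.async_utils import run_sync_with_timeout


def slow_eval(program_path: str) -> dict:
    # Hypothetical blocking evaluation that exceeds the timeout
    time.sleep(10)
    return {"score": 1.0}


async def main() -> None:
    # Yields {"error": 0.0, "timeout": True} after ~5s instead of blocking the loop.
    # Note: the worker thread keeps running until the blocking call finishes;
    # the timeout only unblocks the awaiting coroutine.
    result = await run_sync_with_timeout(slow_eval, 5.0, "program.py")
    print(result)


asyncio.run(main())
```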