|
6 | 6 | """ |
7 | 7 |
|
8 | 8 | import asyncio |
| 9 | +import concurrent.futures |
9 | 10 | import os |
10 | 11 | import time |
11 | 12 | from typing import Any, cast |
@@ -188,53 +189,44 @@ def execute_assessment_tasks_parallel( |
188 | 189 |
|
189 | 190 | start_time = time.time() |
190 | 191 |
|
191 | | - # Run async executor |
192 | | - # Use asyncio.run() for clean event loop management |
| 192 | + # Define the async coroutine to run |
| 193 | + async def _run() -> tuple[list[AssessmentResult], dict[str, Any]]: |
| 194 | + return await execute_tasks_async( |
| 195 | + tasks=tasks, |
| 196 | + extraction_results=extraction_results, |
| 197 | + page_images=page_images, |
| 198 | + sorted_page_ids=sorted_page_ids, |
| 199 | + model_id=model_id, |
| 200 | + system_prompt=system_prompt, |
| 201 | + temperature=temperature, |
| 202 | + max_tokens=max_tokens, |
| 203 | + document_schema=document_schema, |
| 204 | + max_concurrent=max_concurrent, |
| 205 | + max_retries=max_retries, |
| 206 | + connect_timeout=connect_timeout, |
| 207 | + read_timeout=read_timeout, |
| 208 | + ) |
| 209 | + |
| 210 | + # Check if there's already a running event loop |
| 211 | + # This is more robust than catching exceptions with string matching |
193 | 212 | try: |
194 | | - results, metering = asyncio.run( |
195 | | - execute_tasks_async( |
196 | | - tasks=tasks, |
197 | | - extraction_results=extraction_results, |
198 | | - page_images=page_images, |
199 | | - sorted_page_ids=sorted_page_ids, |
200 | | - model_id=model_id, |
201 | | - system_prompt=system_prompt, |
202 | | - temperature=temperature, |
203 | | - max_tokens=max_tokens, |
204 | | - document_schema=document_schema, |
205 | | - max_concurrent=max_concurrent, |
206 | | - max_retries=max_retries, |
207 | | - connect_timeout=connect_timeout, |
208 | | - read_timeout=read_timeout, |
209 | | - ) |
| 213 | + loop = asyncio.get_running_loop() |
| 214 | + except RuntimeError: |
| 215 | + loop = None |
| 216 | + |
| 217 | + if loop is not None and loop.is_running(): |
| 218 | + # We're inside an async context (e.g., Jupyter, nested async call) |
| 219 | + # Execute in a separate thread to avoid "cannot be called from a running event loop" |
| 220 | + logger.warning( |
| 221 | + "Event loop already running, executing in separate thread", |
| 222 | + extra={"loop": str(loop)}, |
210 | 223 | ) |
211 | | - except RuntimeError as e: |
212 | | - # Handle case where event loop already exists (shouldn't happen in Lambda) |
213 | | - if "There is no current event loop" in str(e) or "asyncio.run()" in str(e): |
214 | | - logger.warning( |
215 | | - "Event loop already exists, using get_event_loop", |
216 | | - extra={"error": str(e)}, |
217 | | - ) |
218 | | - loop = asyncio.get_event_loop() |
219 | | - results, metering = loop.run_until_complete( |
220 | | - execute_tasks_async( |
221 | | - tasks=tasks, |
222 | | - extraction_results=extraction_results, |
223 | | - page_images=page_images, |
224 | | - sorted_page_ids=sorted_page_ids, |
225 | | - model_id=model_id, |
226 | | - system_prompt=system_prompt, |
227 | | - temperature=temperature, |
228 | | - max_tokens=max_tokens, |
229 | | - document_schema=document_schema, |
230 | | - max_concurrent=max_concurrent, |
231 | | - max_retries=max_retries, |
232 | | - connect_timeout=connect_timeout, |
233 | | - read_timeout=read_timeout, |
234 | | - ) |
235 | | - ) |
236 | | - else: |
237 | | - raise |
| 224 | + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: |
| 225 | + future = executor.submit(asyncio.run, _run()) |
| 226 | + results, metering = future.result() |
| 227 | + else: |
| 228 | + # No running loop - safe to use asyncio.run() |
| 229 | + results, metering = asyncio.run(_run()) |
238 | 230 |
|
239 | 231 | duration = time.time() - start_time |
240 | 232 |
|
|
0 commit comments