@@ -46,7 +46,7 @@ class AutoscaledPool:
4646 _AUTOSCALE_INTERVAL = timedelta (seconds = 10 )
4747 """Interval at which the autoscaled pool adjusts the desired concurrency based on the latest system status."""
4848
49- _LOGGING_INTERVAL = timedelta (seconds = 0.5 )
49+ _LOGGING_INTERVAL = timedelta (minutes = 1 )
5050 """Interval at which the autoscaled pool logs its current state."""
5151
5252 _DESIRED_CONCURRENCY_RATIO = 0.9
@@ -61,9 +61,6 @@ class AutoscaledPool:
6161 _TASK_TIMEOUT : timedelta | None = None
6262 """Timeout within which the `run_task_function` must complete."""
6363
64- _OVERLOADED_BACKOFF_TIME : timedelta = timedelta (seconds = 1 )
65- """When overloaded, Autoscaled pool waits this long before rechecking system status."""
66-
6764 def __init__ (
6865 self ,
6966 * ,
@@ -126,33 +123,19 @@ async def run(self) -> None:
126123 )
127124
128125 try :
129- logger .info (f'Await result' )
130126 await run .result
131- logger .info (f'Finished naturally, { run .worker_tasks = } , { run .result .result ()= } ' )
132127 except AbortError :
133- logger .info ('AbortError' )
134128 orchestrator .cancel ()
135129 for task in run .worker_tasks :
136130 if not task .done ():
137131 task .cancel ()
138- except Exception as exc :
139- logger .error ('Something sinister happened' , exc_info = exc )
140- raise
141- except BaseException as exc :
142- logger .error ('BaseException happened' , exc_info = exc )
143- raise
144-
145132 finally :
146- logger .error ('finally' )
147133 with suppress (asyncio .CancelledError ):
148- logger .error ('self._autoscale_task.stop()' )
149134 await self ._autoscale_task .stop ()
150135 with suppress (asyncio .CancelledError ):
151- logger .error ('await self._log_system_status_task.stop()' )
152136 await self ._log_system_status_task .stop ()
153137
154138 if not orchestrator .done ():
155- logger .error ('not orchestrator.done()' )
156139 orchestrator .cancel ()
157140 elif not orchestrator .cancelled () and orchestrator .exception () is not None :
158141 logger .error ('Exception in worker task orchestrator' , exc_info = orchestrator .exception ())
@@ -233,59 +216,48 @@ async def _worker_task_orchestrator(self, run: _AutoscaledPoolRun) -> None:
233216 Exits when `is_finished_function` returns True.
234217 """
235218 finished = False
236- logger . info ( '_worker_task_orchestrator' )
219+
237220 try :
238221 while not (finished := await self ._is_finished_function ()) and not run .result .done ():
239222 run .worker_tasks_updated .clear ()
240223
241224 current_status = self ._system_status .get_current_system_info ()
242225 if not current_status .is_system_idle :
243- logger .info (f'Not scheduling new tasks - system is overloaded: { current_status } ' )
244- await asyncio .sleep (self ._OVERLOADED_BACKOFF_TIME .total_seconds ())
245- logger .info ('Release the overloaded backoff' )
226+ logger .debug ('Not scheduling new tasks - system is overloaded' )
246227 elif self ._is_paused :
247- logger .info ('Not scheduling new tasks - the autoscaled pool is paused' )
228+ logger .debug ('Not scheduling new tasks - the autoscaled pool is paused' )
248229 elif self .current_concurrency >= self .desired_concurrency :
249- logger .info ('Not scheduling new tasks - already running at desired concurrency' )
230+ logger .debug ('Not scheduling new tasks - already running at desired concurrency' )
250231 elif not await self ._is_task_ready_function ():
251232 logger .debug ('Not scheduling new task - no task is ready' )
252233 else :
253- logger .info ('Scheduling a new task' )
234+ logger .debug ('Scheduling a new task' )
254235 worker_task = asyncio .create_task (self ._worker_task (), name = 'autoscaled pool worker task' )
255236 worker_task .add_done_callback (lambda task : self ._reap_worker_task (task , run ))
256237 run .worker_tasks .append (worker_task )
257238
258239 if math .isfinite (self ._max_tasks_per_minute ):
259- logger .info ('Deadlock sleep????' )
260240 await asyncio .sleep (60 / self ._max_tasks_per_minute )
261241
262242 continue
263243
264244 with suppress (asyncio .TimeoutError ):
265245 await asyncio .wait_for (run .worker_tasks_updated .wait (), timeout = 0.5 )
266-
267- logger .info ("Just finishing" )
268-
269- except BaseException as e :
270- logger .error ('What is hiding here?' , exc_info = e )
271- raise
272-
273246 finally :
274- logger .info (f'Finally pool. { finished = } , { (run .result .done ())= } , { (run .result .result () if run .result .done () else None )= } ' )
275247 if finished :
276- logger .info ('`is_finished_function` reports that we are finished' )
248+ logger .debug ('`is_finished_function` reports that we are finished' )
277249 elif run .result .done () and run .result .exception () is not None :
278- logger .info ('Unhandled exception in `run_task_function`' )
250+ logger .debug ('Unhandled exception in `run_task_function`' )
279251
280252 if run .worker_tasks :
281- logger .info ('Terminating - waiting for tasks to complete' )
253+ logger .debug ('Terminating - waiting for tasks to complete' )
282254 await asyncio .wait (run .worker_tasks , return_when = asyncio .ALL_COMPLETED )
283- logger .info ('Worker tasks finished' )
255+ logger .debug ('Worker tasks finished' )
284256 else :
285- logger .info ('Terminating - no running tasks to wait for' )
257+ logger .debug ('Terminating - no running tasks to wait for' )
286258
287259 if not run .result .done ():
288- run .result .set_result ("Hello" )
260+ run .result .set_result (object () )
289261
290262 def _reap_worker_task (self , task : asyncio .Task , run : _AutoscaledPoolRun ) -> None :
291263 """Handle cleanup and tracking of a completed worker task.
0 commit comments