88__TASK_FORCE_RUN__ = 1
99
1010_LOGGER = logging .getLogger (__name__ )
11- _ASYNC_SLEEP_SECONDS = 0.3
1211
1312def _safe_run (func ):
1413 """
@@ -242,7 +241,7 @@ async def _execution_wrapper(self):
242241 self ._running = False
243242 return
244243 self ._running = True
245- msg = None
244+
246245 while self ._running :
247246 try :
248247 msg = self ._messages .get_nowait ()
@@ -278,16 +277,18 @@ async def _cleanup(self):
278277 _LOGGER .error ("An error occurred when executing the task's OnStop hook. " )
279278
280279 self ._running = False
280+ self ._completion_event .set ()
281281
282282 def start (self ):
283283 """Start the async task."""
284284 if self ._running :
285285 _LOGGER .warning ("Task is already running. Ignoring .start() call" )
286286 return
287287 # Start execution
288+ self ._completion_event = asyncio .Event ()
288289 self ._task = asyncio .get_running_loop ().create_task (self ._execution_wrapper ())
289290
290- async def stop (self , event = None ):
291+ async def stop (self , wait_for_completion = False ):
291292 """
292293 Send a signal to the thread in order to stop it. If the task is not running do nothing.
293294
@@ -301,8 +302,9 @@ async def stop(self, event=None):
301302
302303 # Queue is of infinite size, should not raise an exception
303304 self ._messages .put_nowait (__TASK_STOP__ )
304- while not self ._task .done ():
305- await asyncio .sleep (_ASYNC_SLEEP_SECONDS )
305+
306+ if wait_for_completion :
307+ await self ._completion_event .wait ()
306308
307309 def force_execution (self ):
308310 """Force an execution of the task without waiting for the period to end."""
0 commit comments