diff --git a/qtapputils/managers/taskmanagers.py b/qtapputils/managers/taskmanagers.py index 82b2d4f..661821e 100644 --- a/qtapputils/managers/taskmanagers.py +++ b/qtapputils/managers/taskmanagers.py @@ -75,7 +75,6 @@ class TaskManagerBase(QObject): """ sig_run_tasks_started = Signal() sig_run_tasks_finished = Signal() - sig_run_tasks = Signal() def __init__(self, verbose: bool = False): super().__init__() @@ -104,6 +103,16 @@ def __init__(self, verbose: bool = False): def is_running(self): return len(self._running_tasks + self._pending_tasks) > 0 + def wait(self): + """ + Waits for completion of all running and pending tasks, and for the + worker thread to fully exit its event loop. + + Note: This does not block the main GUI event loop, allowing the UI to + remain responsive while waiting. + """ + qtwait(lambda: not self.is_running and not self._thread.isRunning()) + def run_tasks( self, callback: Callable = None, returned_values: tuple = None): """ @@ -133,15 +142,12 @@ def worker(self) -> WorkerBase: def set_worker(self, worker: WorkerBase): """"Install the provided worker on this manager""" self._thread = QThread() - self._worker = worker + self._worker.moveToThread(self._thread) - self._worker.sig_task_completed.connect( - self._handle_task_completed, Qt.QueuedConnection) - self.sig_run_tasks.connect( - self._worker.run_tasks, Qt.QueuedConnection) + self._thread.started.connect(self._worker.run_tasks) - self._thread.start() + self._worker.sig_task_completed.connect(self._handle_task_completed) # ---- Private API def _handle_task_completed( @@ -152,25 +158,38 @@ def _handle_task_completed( This is the ONLY slot that should be called after a task is completed by the worker. """ - # Run the callback associated with the specified task UUID if any. - if self._task_callbacks[task_uuid4] is not None: + # Execute the callback associated with this task (if one exists). + callback = self._task_callbacks.get(task_uuid4) + if callback is not None: try: - self._task_callbacks[task_uuid4](*returned_values) + callback(*returned_values) except TypeError: # This means there is 'returned_values' is None. - self._task_callbacks[task_uuid4]() + callback() - # Clean up completed task. + # Remove references to the completed task from internal structures. self._cleanup_task(task_uuid4) - # Execute pending tasks if worker is idle. - if len(self._running_tasks) == 0: - if len(self._pending_tasks) > 0: - self._run_pending_tasks() - else: - if self.verbose: - print('All pending tasks were executed.') - self.sig_run_tasks_finished.emit() + # If there are still running tasks, do not proceed further. + if len(self._running_tasks) > 0: + return + + # We quit the thread here to ensure all resources are cleaned up + # and to prevent issues with lingering events or stale object + # references. This makes the worker lifecycle simpler and more robust, + # especially in PyQt/PySide, and avoids subtle bugs that can arise + # from reusing threads across multiple batches. + self._thread.quit() + + # If there are pending tasks, begin processing them. + if len(self._pending_tasks) > 0: + self._run_pending_tasks() + else: + # No pending tasks remain; notify listeners that + # all tasks are finished. + if self.verbose: + print('All pending tasks were executed.') + self.sig_run_tasks_finished.emit() def _cleanup_task(self, task_uuid4: uuid.UUID): """Cleanup task associated with the specified UUID.""" @@ -198,24 +217,35 @@ def _run_tasks(self): def _run_pending_tasks(self): """Execute all pending tasks.""" - if len(self._running_tasks) == 0 and len(self._pending_tasks) > 0: - if self.verbose: - print('Executing {} pending tasks...'.format( - len(self._pending_tasks))) - - self._running_tasks = self._pending_tasks.copy() - self._pending_tasks = [] - for task_uuid4 in self._running_tasks: - task, args, kargs = self._task_data[task_uuid4] - self._worker.add_task(task_uuid4, task, *args, **kargs) - - self.sig_run_tasks.emit() - - def close(self): - if hasattr(self, "_thread"): - qtwait(lambda: not self.is_running) - self._thread.quit() - self._thread.wait() + # If the worker is currently processing tasks, defer execution of + # pending tasks. + if len(self._running_tasks) > 0: + return + + # If there are no pending tasks, nothing to do. + if len(self._pending_tasks) == 0: + return + + if self.verbose: + print(f'Executing {len(self._pending_tasks)} pending tasks...') + + # Ensure the thread is not running before starting new tasks. + # This prevents starting a thread that is already active, which can + # cause errors. + if self._thread.isRunning(): + qtwait(lambda: not self._thread.isRunning()) + + # Move all pending tasks to the running tasks queue. + self._running_tasks = self._pending_tasks.copy() + self._pending_tasks = [] + + # Add each running task to the worker's queue. + for task_uuid4 in self._running_tasks: + task, args, kargs = self._task_data[task_uuid4] + self._worker.add_task(task_uuid4, task, *args, **kargs) + + # Start the thread so the worker can process the tasks. + self._thread.start() class LIFOTaskManager(TaskManagerBase): diff --git a/qtapputils/managers/tests/test_taskmanagers.py b/qtapputils/managers/tests/test_taskmanagers.py index 85936b9..3d9ee7b 100644 --- a/qtapputils/managers/tests/test_taskmanagers.py +++ b/qtapputils/managers/tests/test_taskmanagers.py @@ -52,7 +52,8 @@ def task_manager(worker, qtbot): task_manager.set_worker(worker) yield task_manager - task_manager.close() + task_manager.wait() + assert not task_manager.is_running assert not task_manager._thread.isRunning() @@ -63,7 +64,8 @@ def lifo_task_manager(worker, qtbot): task_manager.set_worker(worker) yield task_manager - task_manager.close() + task_manager.wait() + assert not task_manager.is_running assert not task_manager._thread.isRunning()