Skip to content
106 changes: 68 additions & 38 deletions qtapputils/managers/taskmanagers.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ class TaskManagerBase(QObject):
"""
sig_run_tasks_started = Signal()
sig_run_tasks_finished = Signal()
sig_run_tasks = Signal()

def __init__(self, verbose: bool = False):
super().__init__()
Expand Down Expand Up @@ -104,6 +103,16 @@ def __init__(self, verbose: bool = False):
def is_running(self):
return len(self._running_tasks + self._pending_tasks) > 0

def wait(self):
"""
Waits for completion of all running and pending tasks, and for the
worker thread to fully exit its event loop.

Note: This does not block the main GUI event loop, allowing the UI to
remain responsive while waiting.
"""
qtwait(lambda: not self.is_running and not self._thread.isRunning())

def run_tasks(
self, callback: Callable = None, returned_values: tuple = None):
"""
Expand Down Expand Up @@ -133,15 +142,12 @@ def worker(self) -> WorkerBase:
def set_worker(self, worker: WorkerBase):
""""Install the provided worker on this manager"""
self._thread = QThread()

self._worker = worker

self._worker.moveToThread(self._thread)
self._worker.sig_task_completed.connect(
self._handle_task_completed, Qt.QueuedConnection)
self.sig_run_tasks.connect(
self._worker.run_tasks, Qt.QueuedConnection)
self._thread.started.connect(self._worker.run_tasks)

self._thread.start()
self._worker.sig_task_completed.connect(self._handle_task_completed)

# ---- Private API
def _handle_task_completed(
Expand All @@ -152,25 +158,38 @@ def _handle_task_completed(
This is the ONLY slot that should be called after a task is
completed by the worker.
"""
# Run the callback associated with the specified task UUID if any.
if self._task_callbacks[task_uuid4] is not None:
# Execute the callback associated with this task (if one exists).
callback = self._task_callbacks.get(task_uuid4)
if callback is not None:
try:
self._task_callbacks[task_uuid4](*returned_values)
callback(*returned_values)
except TypeError:
# This means there is 'returned_values' is None.
self._task_callbacks[task_uuid4]()
callback()

# Clean up completed task.
# Remove references to the completed task from internal structures.
self._cleanup_task(task_uuid4)

# Execute pending tasks if worker is idle.
if len(self._running_tasks) == 0:
if len(self._pending_tasks) > 0:
self._run_pending_tasks()
else:
if self.verbose:
print('All pending tasks were executed.')
self.sig_run_tasks_finished.emit()
# If there are still running tasks, do not proceed further.
if len(self._running_tasks) > 0:
return

# We quit the thread here to ensure all resources are cleaned up
# and to prevent issues with lingering events or stale object
# references. This makes the worker lifecycle simpler and more robust,
# especially in PyQt/PySide, and avoids subtle bugs that can arise
# from reusing threads across multiple batches.
self._thread.quit()

# If there are pending tasks, begin processing them.
if len(self._pending_tasks) > 0:
self._run_pending_tasks()
else:
# No pending tasks remain; notify listeners that
# all tasks are finished.
if self.verbose:
print('All pending tasks were executed.')
self.sig_run_tasks_finished.emit()

def _cleanup_task(self, task_uuid4: uuid.UUID):
"""Cleanup task associated with the specified UUID."""
Expand Down Expand Up @@ -198,24 +217,35 @@ def _run_tasks(self):

def _run_pending_tasks(self):
"""Execute all pending tasks."""
if len(self._running_tasks) == 0 and len(self._pending_tasks) > 0:
if self.verbose:
print('Executing {} pending tasks...'.format(
len(self._pending_tasks)))

self._running_tasks = self._pending_tasks.copy()
self._pending_tasks = []
for task_uuid4 in self._running_tasks:
task, args, kargs = self._task_data[task_uuid4]
self._worker.add_task(task_uuid4, task, *args, **kargs)

self.sig_run_tasks.emit()

def close(self):
if hasattr(self, "_thread"):
qtwait(lambda: not self.is_running)
self._thread.quit()
self._thread.wait()
# If the worker is currently processing tasks, defer execution of
# pending tasks.
if len(self._running_tasks) > 0:
return

# If there are no pending tasks, nothing to do.
if len(self._pending_tasks) == 0:
return

if self.verbose:
print(f'Executing {len(self._pending_tasks)} pending tasks...')

# Ensure the thread is not running before starting new tasks.
# This prevents starting a thread that is already active, which can
# cause errors.
if self._thread.isRunning():
qtwait(lambda: not self._thread.isRunning())

# Move all pending tasks to the running tasks queue.
self._running_tasks = self._pending_tasks.copy()
self._pending_tasks = []

# Add each running task to the worker's queue.
for task_uuid4 in self._running_tasks:
task, args, kargs = self._task_data[task_uuid4]
self._worker.add_task(task_uuid4, task, *args, **kargs)

# Start the thread so the worker can process the tasks.
self._thread.start()


class LIFOTaskManager(TaskManagerBase):
Expand Down
6 changes: 4 additions & 2 deletions qtapputils/managers/tests/test_taskmanagers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def task_manager(worker, qtbot):
task_manager.set_worker(worker)
yield task_manager

task_manager.close()
task_manager.wait()

assert not task_manager.is_running
assert not task_manager._thread.isRunning()

Expand All @@ -63,7 +64,8 @@ def lifo_task_manager(worker, qtbot):
task_manager.set_worker(worker)
yield task_manager

task_manager.close()
task_manager.wait()

assert not task_manager.is_running
assert not task_manager._thread.isRunning()

Expand Down
Loading