From 8d58e59bb9b6525727d44714e20a6ff1233fd0d3 Mon Sep 17 00:00:00 2001 From: doomedraven Date: Mon, 26 Jan 2026 08:55:28 +0100 Subject: [PATCH 1/4] Refactor thread management for distributed tasks (#2866) --- utils/dist.py | 79 ++++++++++++++++++++++++--------------------------- 1 file changed, 37 insertions(+), 42 deletions(-) diff --git a/utils/dist.py b/utils/dist.py index 869000865f3..b2812eb1c63 100644 --- a/utils/dist.py +++ b/utils/dist.py @@ -553,63 +553,58 @@ def run(self): self.stop_dist = threading.Event() self.threads = [] + # Define the set of threads that should be running + thread_targets = [] + if dist_conf.GCP.enabled and HAVE_GCP: - # autodiscovery is generic name so in case if we have AWS or Azure it should implement the logic inside - thread = threading.Thread(target=cloud.autodiscovery, name="autodiscovery", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((cloud.autodiscovery, "autodiscovery", ())) - for _ in range(int(dist_conf.distributed.dist_threads)): + # Data fetchers + for i in range(int(dist_conf.distributed.dist_threads)): if dist_lock.acquire(blocking=False): if NFS_FETCH: - thread = threading.Thread(target=self.fetch_latest_reports_nfs, name="fetch_latest_reports_nfs", args=()) + thread_targets.append((self.fetch_latest_reports_nfs, f"fetch_latest_reports_nfs_{i}", ())) elif RESTAPI_FETCH: - thread = threading.Thread(target=self.fetch_latest_reports, name="fetch_latest_reports", args=()) - if RESTAPI_FETCH or NFS_FETCH: - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.fetch_latest_reports, f"fetch_latest_reports_{i}", ())) if fetch_lock.acquire(blocking=False): - thread = threading.Thread(target=self.fetcher, name="fetcher", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.fetcher, "fetcher", ())) - # Delete the task and all its associated files. 
- # (It will still remain in the nodes" database, though.) if dist_conf.distributed.remove_task_on_worker or delete_enabled: - thread = threading.Thread(target=self.remove_from_worker, name="remove_from_worker", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.remove_from_worker, "remove_from_worker", ())) if dist_conf.distributed.failed_cleaner or failed_clean_enabled: - thread = threading.Thread(target=self.failed_cleaner, name="failed_to_clean", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.failed_cleaner, "failed_to_clean", ())) - thread = threading.Thread(target=self.free_space_mon, name="free_space_mon", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.free_space_mon, "free_space_mon", ())) if reporting_conf.callback.enabled: - thread = threading.Thread(target=self.notification_loop, name="notification_loop", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.notification_loop, "notification_loop", ())) - # thread monitoring - for thr in self.threads: - try: - thr.join(timeout=0.0) - log.info("Thread: %s - Alive: %s", thr.name, str(thr.is_alive())) - except Exception as e: - log.exception(e) - time.sleep(60) + # Supervisor Loop + active_threads = {} # name -> thread_obj + + log.info("Retriever supervisor started. Monitoring %d threads.", len(thread_targets)) + + while not self.stop_dist.is_set(): + for target_func, name, args in thread_targets: + thread = active_threads.get(name) + + if thread is None or not thread.is_alive(): + if thread is not None: + log.critical("Thread %s died! 
Respawning...", name) + else: + log.info("Starting thread %s", name) + + new_thread = threading.Thread(target=target_func, name=name, args=args) + new_thread.daemon = True + new_thread.start() + active_threads[name] = new_thread + + # Periodic health check + time.sleep(600) + + log.info("Retriever supervisor stopping.") def free_space_mon(self): """ From 7a957884fde7106462dfa412e3170a93dff943d9 Mon Sep 17 00:00:00 2001 From: doomedraven Date: Mon, 26 Jan 2026 09:16:12 +0100 Subject: [PATCH 2/4] Update dist.py The dist_lock and fetch_lock were essentially "permission slips" to start threads. Their purpose was to limit concurrency: 1. `dist_lock`: Was a Semaphore initialized with dist_threads (e.g., 4). It allowed exactly 4 report-downloading threads to start. 2. `fetch_lock`: Was a Semaphore initialized with 1. It allowed exactly 1 task-fetching thread to start. Why they caused problems (and why we removed them) In the original code, they acted as a trap: 1. Acquisition: When the script started, the main thread grabbed these "permission slips" (acquired the locks) to start the worker threads. 2. The Crash: If a worker thread crashed (threw an exception and died), it did not give the permission slip back (it didn't release the lock). 3. The Deadlock: If you tried to add restart logic that checked these locks, it would see "0 permissions available" and refuse to start a replacement thread. The script would think "I already have 4 threads running" when in reality it had 0 running and 4 lost locks. The New Solution (Supervisor) We replaced these locks with a Supervisor Loop. Instead of relying on a counter (lock) that can get out of sync, the new code checks the actual reality: * "Do I have a thread named 'fetcher' running?" * If No: Start one immediately. This is much more robust because it self-corrects based on the actual system state, not a variable that might be wrong. 
You no longer need the locks because the Supervisor logic explicitly and strictly enforces "1 fetcher, N downloaders". --- utils/dist.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/utils/dist.py b/utils/dist.py index b2812eb1c63..07ce84e8526 100644 --- a/utils/dist.py +++ b/utils/dist.py @@ -103,8 +103,6 @@ status_count = {} lock_retriever = threading.Lock() -dist_lock = threading.BoundedSemaphore(int(dist_conf.distributed.dist_threads)) -fetch_lock = threading.BoundedSemaphore(1) delete_enabled = False failed_clean_enabled = False @@ -561,14 +559,12 @@ def run(self): # Data fetchers for i in range(int(dist_conf.distributed.dist_threads)): - if dist_lock.acquire(blocking=False): - if NFS_FETCH: - thread_targets.append((self.fetch_latest_reports_nfs, f"fetch_latest_reports_nfs_{i}", ())) - elif RESTAPI_FETCH: - thread_targets.append((self.fetch_latest_reports, f"fetch_latest_reports_{i}", ())) - - if fetch_lock.acquire(blocking=False): - thread_targets.append((self.fetcher, "fetcher", ())) + if NFS_FETCH: + thread_targets.append((self.fetch_latest_reports_nfs, f"fetch_latest_reports_nfs_{i}", ())) + elif RESTAPI_FETCH: + thread_targets.append((self.fetch_latest_reports, f"fetch_latest_reports_{i}", ())) + + thread_targets.append((self.fetcher, "fetcher", ())) if dist_conf.distributed.remove_task_on_worker or delete_enabled: thread_targets.append((self.remove_from_worker, "remove_from_worker", ())) From 99f1dc8d82c56cdfa018d9ef56b74cc6aa7c6f23 Mon Sep 17 00:00:00 2001 From: doomedraven Date: Mon, 26 Jan 2026 09:16:41 +0100 Subject: [PATCH 3/4] Remove unused threads list from dist.py Remove unused threads list initialization. 
--- utils/dist.py | 1 - 1 file changed, 1 deletion(-) diff --git a/utils/dist.py b/utils/dist.py index 07ce84e8526..0e2aaf33445 100644 --- a/utils/dist.py +++ b/utils/dist.py @@ -549,7 +549,6 @@ def run(self): self.current_queue = {} self.current_two_queue = {} self.stop_dist = threading.Event() - self.threads = [] # Define the set of threads that should be running thread_targets = [] From 388ea13006311346fb0bb0861d7fc4908ce174a8 Mon Sep 17 00:00:00 2001 From: doomedraven Date: Mon, 26 Jan 2026 09:17:53 +0100 Subject: [PATCH 4/4] Remove debug print statements from dist.py Removed print statements for ids and to_upload in dist.py. --- utils/dist.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/utils/dist.py b/utils/dist.py index 0e2aaf33445..5b9bb0b6fae 100644 --- a/utils/dist.py +++ b/utils/dist.py @@ -1209,7 +1209,6 @@ def remove_from_worker(self): node = nodes[node_id] if node and details[node_id]: ids = ",".join(list(set(details[node_id]))) - print(ids) _delete_many(node_id, ids, nodes, db) db.commit() @@ -1451,8 +1450,7 @@ def submit_tasks(self, node_name, pend_tasks_num, options_like=False, force_push """ # 4. Apply the limit and execute the query. to_upload = db.scalars(stmt.limit(pend_tasks_num)).all() - print(to_upload, node.name, pend_tasks_num) - + if not to_upload: db.commit() log.info("nothing to upload? How? o_O")