diff --git a/utils/dist.py b/utils/dist.py index 869000865f3..5b9bb0b6fae 100644 --- a/utils/dist.py +++ b/utils/dist.py @@ -103,8 +103,6 @@ status_count = {} lock_retriever = threading.Lock() -dist_lock = threading.BoundedSemaphore(int(dist_conf.distributed.dist_threads)) -fetch_lock = threading.BoundedSemaphore(1) delete_enabled = False failed_clean_enabled = False @@ -551,65 +549,57 @@ def run(self): self.current_queue = {} self.current_two_queue = {} self.stop_dist = threading.Event() - self.threads = [] + + # Define the set of threads that should be running + thread_targets = [] if dist_conf.GCP.enabled and HAVE_GCP: - # autodiscovery is generic name so in case if we have AWS or Azure it should implement the logic inside - thread = threading.Thread(target=cloud.autodiscovery, name="autodiscovery", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) - - for _ in range(int(dist_conf.distributed.dist_threads)): - if dist_lock.acquire(blocking=False): - if NFS_FETCH: - thread = threading.Thread(target=self.fetch_latest_reports_nfs, name="fetch_latest_reports_nfs", args=()) - elif RESTAPI_FETCH: - thread = threading.Thread(target=self.fetch_latest_reports, name="fetch_latest_reports", args=()) - if RESTAPI_FETCH or NFS_FETCH: - thread.daemon = True - thread.start() - self.threads.append(thread) - - if fetch_lock.acquire(blocking=False): - thread = threading.Thread(target=self.fetcher, name="fetcher", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) - - # Delete the task and all its associated files. - # (It will still remain in the nodes" database, though.) + thread_targets.append((cloud.autodiscovery, "autodiscovery", ())) + + # Data fetchers + for i in range(int(dist_conf.distributed.dist_threads)): + if NFS_FETCH: + thread_targets.append((self.fetch_latest_reports_nfs, f"fetch_latest_reports_nfs_{i}", ())) + elif RESTAPI_FETCH: + thread_targets.append((self.fetch_latest_reports, f"fetch_latest_reports_{i}", ())) + + thread_targets.append((self.fetcher, "fetcher", ())) + if dist_conf.distributed.remove_task_on_worker or delete_enabled: - thread = threading.Thread(target=self.remove_from_worker, name="remove_from_worker", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.remove_from_worker, "remove_from_worker", ())) if dist_conf.distributed.failed_cleaner or failed_clean_enabled: - thread = threading.Thread(target=self.failed_cleaner, name="failed_to_clean", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.failed_cleaner, "failed_to_clean", ())) - thread = threading.Thread(target=self.free_space_mon, name="free_space_mon", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.free_space_mon, "free_space_mon", ())) if reporting_conf.callback.enabled: - thread = threading.Thread(target=self.notification_loop, name="notification_loop", args=()) - thread.daemon = True - thread.start() - self.threads.append(thread) + thread_targets.append((self.notification_loop, "notification_loop", ())) - # thread monitoring - for thr in self.threads: - try: - thr.join(timeout=0.0) - log.info("Thread: %s - Alive: %s", thr.name, str(thr.is_alive())) - except Exception as e: - log.exception(e) - time.sleep(60) + # Supervisor Loop + active_threads = {} # name -> thread_obj + + log.info("Retriever supervisor started. Monitoring %d threads.", len(thread_targets)) + + while not self.stop_dist.is_set(): + for target_func, name, args in thread_targets: + thread = active_threads.get(name) + + if thread is None or not thread.is_alive(): + if thread is not None: + log.critical("Thread %s died! Respawning...", name) + else: + log.info("Starting thread %s", name) + + new_thread = threading.Thread(target=target_func, name=name, args=args) + new_thread.daemon = True + new_thread.start() + active_threads[name] = new_thread + + # Periodic health check + time.sleep(600) + + log.info("Retriever supervisor stopping.") def free_space_mon(self): """ @@ -1219,7 +1209,6 @@ def remove_from_worker(self): node = nodes[node_id] if node and details[node_id]: ids = ",".join(list(set(details[node_id]))) - print(ids) _delete_many(node_id, ids, nodes, db) db.commit() @@ -1461,8 +1450,7 @@ def submit_tasks(self, node_name, pend_tasks_num, options_like=False, force_push """ # 4. Apply the limit and execute the query. to_upload = db.scalars(stmt.limit(pend_tasks_num)).all() - print(to_upload, node.name, pend_tasks_num) - + if not to_upload: db.commit() log.info("nothing to upload? How? o_O")