utils/dist.py — 98 changes: 43 additions & 55 deletions
@@ -103,8 +103,6 @@
 status_count = {}
 
 lock_retriever = threading.Lock()
-dist_lock = threading.BoundedSemaphore(int(dist_conf.distributed.dist_threads))
-fetch_lock = threading.BoundedSemaphore(1)
 
 delete_enabled = False
 failed_clean_enabled = False
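
Note on the deletion above: these two module-level semaphores only gated thread startup in the old `run()` path (next hunk). Because `BoundedSemaphore(n)` grants exactly n non-blocking acquires and the old loop attempted exactly n of them, the guard could never fail, so the refactor can drop it without changing behavior. A minimal sketch of that property, assuming no other acquirers:

```python
import threading

n = 4  # stands in for dist_conf.distributed.dist_threads
dist_lock = threading.BoundedSemaphore(n)

# The first n non-blocking acquires succeed; any further attempt fails.
results = [dist_lock.acquire(blocking=False) for _ in range(n + 1)]
print(results)  # [True, True, True, True, False]
```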
@@ -551,65 +549,57 @@
         self.current_queue = {}
         self.current_two_queue = {}
         self.stop_dist = threading.Event()
-        self.threads = []
+
+        # Define the set of threads that should be running
+        thread_targets = []
 
         if dist_conf.GCP.enabled and HAVE_GCP:
             # autodiscovery is generic name so in case if we have AWS or Azure it should implement the logic inside
-            thread = threading.Thread(target=cloud.autodiscovery, name="autodiscovery", args=())
-            thread.daemon = True
-            thread.start()
-            self.threads.append(thread)
-
-        for _ in range(int(dist_conf.distributed.dist_threads)):
-            if dist_lock.acquire(blocking=False):
-                if NFS_FETCH:
-                    thread = threading.Thread(target=self.fetch_latest_reports_nfs, name="fetch_latest_reports_nfs", args=())
-                elif RESTAPI_FETCH:
-                    thread = threading.Thread(target=self.fetch_latest_reports, name="fetch_latest_reports", args=())
-                if RESTAPI_FETCH or NFS_FETCH:
-                    thread.daemon = True
-                    thread.start()
-                    self.threads.append(thread)
-
-        if fetch_lock.acquire(blocking=False):
-            thread = threading.Thread(target=self.fetcher, name="fetcher", args=())
-            thread.daemon = True
-            thread.start()
-            self.threads.append(thread)
-
-        # Delete the task and all its associated files.
-        # (It will still remain in the nodes" database, though.)
+            thread_targets.append((cloud.autodiscovery, "autodiscovery", ()))
+
+        # Data fetchers
+        for i in range(int(dist_conf.distributed.dist_threads)):
+            if NFS_FETCH:
+                thread_targets.append((self.fetch_latest_reports_nfs, f"fetch_latest_reports_nfs_{i}", ()))
+            elif RESTAPI_FETCH:
+                thread_targets.append((self.fetch_latest_reports, f"fetch_latest_reports_{i}", ()))
+
+        thread_targets.append((self.fetcher, "fetcher", ()))
 
         if dist_conf.distributed.remove_task_on_worker or delete_enabled:
-            thread = threading.Thread(target=self.remove_from_worker, name="remove_from_worker", args=())
-            thread.daemon = True
-            thread.start()
-            self.threads.append(thread)
+            thread_targets.append((self.remove_from_worker, "remove_from_worker", ()))
 
         if dist_conf.distributed.failed_cleaner or failed_clean_enabled:
-            thread = threading.Thread(target=self.failed_cleaner, name="failed_to_clean", args=())
-            thread.daemon = True
-            thread.start()
-            self.threads.append(thread)
+            thread_targets.append((self.failed_cleaner, "failed_to_clean", ()))
 
-        thread = threading.Thread(target=self.free_space_mon, name="free_space_mon", args=())
-        thread.daemon = True
-        thread.start()
-        self.threads.append(thread)
+        thread_targets.append((self.free_space_mon, "free_space_mon", ()))
 
         if reporting_conf.callback.enabled:
-            thread = threading.Thread(target=self.notification_loop, name="notification_loop", args=())
-            thread.daemon = True
-            thread.start()
-            self.threads.append(thread)
+            thread_targets.append((self.notification_loop, "notification_loop", ()))
 
-        # thread monitoring
-        for thr in self.threads:
-            try:
-                thr.join(timeout=0.0)
-                log.info("Thread: %s - Alive: %s", thr.name, str(thr.is_alive()))
-            except Exception as e:
-                log.exception(e)
-            time.sleep(60)
+        # Supervisor Loop
+        active_threads = {}  # name -> thread_obj
+
+        log.info("Retriever supervisor started. Monitoring %d threads.", len(thread_targets))
+
+        while not self.stop_dist.is_set():
+            for target_func, name, args in thread_targets:
+                thread = active_threads.get(name)
+
+                if thread is None or not thread.is_alive():
+                    if thread is not None:
+                        log.critical("Thread %s died! Respawning...", name)
+                    else:
+                        log.info("Starting thread %s", name)
+
+                    new_thread = threading.Thread(target=target_func, name=name, args=args)
+                    new_thread.daemon = True
+                    new_thread.start()
+                    active_threads[name] = new_thread
+
+            # Periodic health check
+            time.sleep(600)
+
+        log.info("Retriever supervisor stopping.")
 
     def free_space_mon(self):
         """
@@ -1219,7 +1209,6 @@
             node = nodes[node_id]
             if node and details[node_id]:
                 ids = ",".join(list(set(details[node_id])))
-                print(ids)
                 _delete_many(node_id, ids, nodes, db)
 
         db.commit()
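
The only change in this hunk is removing a stray debug `print`. If that visibility is still useful, routing it through the module's existing `log` object at debug level would match the file's conventions; a suggested one-liner, not part of this PR:

```python
log.debug("Deleting tasks %s from node %s", ids, node_id)
```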
@@ -1461,8 +1450,7 @@
         """
         # 4. Apply the limit and execute the query.
         to_upload = db.scalars(stmt.limit(pend_tasks_num)).all()
-        print(to_upload, node.name, pend_tasks_num)
 
Check failure on line 1453 in utils/dist.py
GitHub Actions / test (3.10) — Ruff (W293): utils/dist.py:1453:1: Blank line contains whitespace
         if not to_upload:
             db.commit()
             log.info("nothing to upload? How? o_O")
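
For readers unfamiliar with the 2.0-style SQLAlchemy call kept above: `db.scalars(stmt.limit(n)).all()` executes a `select()` with a row cap and returns ORM objects directly rather than row tuples. A self-contained sketch, where the `Task` model is a hypothetical stand-in for the project's own:

```python
from sqlalchemy import create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Task(Base):  # hypothetical stand-in for the project's task model
    __tablename__ = "tasks"
    id: Mapped[int] = mapped_column(primary_key=True)
    status: Mapped[str] = mapped_column(default="pending")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as db:
    db.add_all(Task() for _ in range(10))
    db.commit()

    pend_tasks_num = 5
    stmt = select(Task).where(Task.status == "pending")
    # scalars() unwraps single-entity rows into Task instances; limit() caps the batch
    to_upload = db.scalars(stmt.limit(pend_tasks_num)).all()
    print(len(to_upload))  # 5
```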