diff --git a/CHANGES/7315.bugfix b/CHANGES/7315.bugfix new file mode 100644 index 0000000000..2c76b9c985 --- /dev/null +++ b/CHANGES/7315.bugfix @@ -0,0 +1 @@ +Fixed bug where API process would miss heartbeats during long uploads. diff --git a/pulpcore/app/entrypoint.py b/pulpcore/app/entrypoint.py index 9f7f1d43a2..2b3ca86fc8 100644 --- a/pulpcore/app/entrypoint.py +++ b/pulpcore/app/entrypoint.py @@ -2,6 +2,8 @@ from logging import getLogger import os import sys +import threading +import time import click import django @@ -22,9 +24,40 @@ class PulpApiWorker(SyncWorker): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.heartbeat_thread = None + + def _heartbeat_loop(self): + """Run heartbeat in a loop. Exit process if heartbeat fails.""" + try: + while self.alive: + try: + self.heartbeat() + except Exception as e: + # Any unhandled exception in heartbeat should restart the worker + logger.error( + f"Heartbeat thread encountered fatal error for worker {self.name}: {e}" + ) + # Exit with error code to trigger worker restart + os._exit(Arbiter.WORKER_BOOT_ERROR) + + time.sleep(self.heartbeat_interval) + except Exception as e: + # Safety net: if even the loop itself fails, restart + logger.error(f"Heartbeat thread loop failed for worker {self.name}: {e}") + os._exit(Arbiter.WORKER_BOOT_ERROR) + def notify(self): super().notify() - self.heartbeat() + + # Check if heartbeat thread is still alive + if not self.heartbeat_thread.is_alive(): + logger.error( + f"Heartbeat thread died unexpectedly for worker {self.name}. " + "Exiting to trigger restart." + ) + os._exit(Arbiter.WORKER_BOOT_ERROR) def heartbeat(self): try: @@ -37,7 +70,8 @@ def heartbeat(self): logger.debug(self.beat_msg) except (InterfaceError, DatabaseError) as e: logger.error(f"{self.fail_beat_msg} Exception: {str(e)}") - exit(Arbiter.WORKER_BOOT_ERROR) + # This will be caught by _heartbeat_loop and trigger os._exit + raise def init_process(self): os.environ.setdefault("DJANGO_SETTINGS_MODULE", "pulpcore.app.settings") @@ -76,6 +110,15 @@ def init_process(self): logger.error(f"An API app with name {self.name} already exists in the database.") exit(Arbiter.WORKER_BOOT_ERROR) + # Store heartbeat interval for the heartbeat thread + self.heartbeat_interval = settings.API_APP_TTL // 3 + + # Start heartbeat thread before entering the main loop + self.heartbeat_thread = threading.Thread( + target=self._heartbeat_loop, daemon=True, name=f"heartbeat-{self.name}" + ) + self.heartbeat_thread.start() + super().init_process() def run(self):