Skip to content

Commit f0ad48d

Browse files
authored
[RAPTOR-14353] Add nim watchdog (#1632)
* add watchdog * 5 attempts * watchdog_additional * fixed version * add env USE_NIM_WATCHDOG * add env USE_NIM_WATCHDOG * fixed lint * fixed version * rever docker changes * add changes * removed changelogs * max_attempts = 3 * replaced with os.kill(pid, signal.SIGTERM) * renamed * fixed comments * add NIM_WATCHDOG_REQUEST_TIMEOUT and NIM_WATCHDOG_MAX_ATTEMPTS * add logs * fix lint
1 parent 2f28c4c commit f0ad48d

File tree

1 file changed

+110
-0
lines changed

1 file changed

+110
-0
lines changed

custom_model_runner/datarobot_drum/drum/root_predictors/prediction_server.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77
import logging
88
import os
99
import sys
10+
import time
1011
from pathlib import Path
12+
from threading import Thread
13+
import subprocess
14+
import signal
1115

1216
import requests
1317
from flask import Response, jsonify, request
@@ -26,6 +30,7 @@
2630
ModelInfoKeys,
2731
RunLanguage,
2832
TargetType,
33+
URL_PREFIX_ENV_VAR_NAME,
2934
)
3035
from datarobot_drum.drum.exceptions import DrumCommonException
3136
from datarobot_drum.drum.model_metadata import read_model_metadata_yaml
@@ -81,6 +86,7 @@ def __init__(self, params: dict):
8186
"run_predictor_total", "finish", StatsOperation.SUB, "start"
8287
)
8388
self._predictor = self._setup_predictor()
89+
self._server_watchdog = None
8490

8591
def _setup_predictor(self):
8692
if self._run_language == RunLanguage.PYTHON:
@@ -322,6 +328,18 @@ def _run_flask_app(self, app):
322328
processes = self._params.get("processes")
323329
logger.info("Number of webserver processes: %s", processes)
324330
try:
331+
if RuntimeParameters.has("USE_NIM_WATCHDOG") and str(
332+
RuntimeParameters.get("USE_NIM_WATCHDOG")
333+
).lower() in ["true", "1", "yes"]:
334+
# Start the watchdog thread before running the app
335+
self._server_watchdog = Thread(
336+
target=self.watchdog,
337+
args=(port,),
338+
daemon=True,
339+
name="NIM Sidecar Watchdog",
340+
)
341+
self._server_watchdog.start()
342+
325343
# Configure the server with timeout settings
326344
app.run(
327345
host=host,
@@ -337,6 +355,98 @@ def _run_flask_app(self, app):
337355
except OSError as e:
338356
raise DrumCommonException("{}: host: {}; port: {}".format(e, host, port))
339357

358+
def _kill_all_processes(self):
359+
"""
360+
Forcefully terminates all running processes related to the server.
361+
Attempts a clean termination first, then uses system commands to kill remaining processes.
362+
Logs errors encountered during termination.
363+
"""
364+
365+
logger.error("All health check attempts failed. Forcefully killing all processes.")
366+
367+
# First try clean termination
368+
try:
369+
self._terminate()
370+
except Exception as e:
371+
logger.error(f"Error during clean termination: {str(e)}")
372+
373+
# Use more direct system commands to kill processes
374+
try:
375+
# Kill packedge jobs first (more aggressive approach)
376+
logger.info("Killing Python package jobs")
377+
# Run `busybox ps` and capture output
378+
result = subprocess.run(["busybox", "ps"], capture_output=True, text=True)
379+
# Parse lines, skip the header
380+
lines = result.stdout.strip().split("\n")[1:]
381+
# Extract the PID (first column)
382+
pids = [int(line.split()[0]) for line in lines]
383+
for pid in pids:
384+
print("Killing pid:", pid)
385+
os.kill(pid, signal.SIGTERM)
386+
except Exception as kill_error:
387+
logger.error(f"Error during process killing: {str(kill_error)}")
388+
389+
def watchdog(self, port):
390+
"""
391+
Watchdog thread that periodically checks if the server is alive by making
392+
GET requests to the /info/ endpoint. Makes 3 attempts with quadratic backoff
393+
before terminating the Flask app.
394+
"""
395+
396+
logger.info("Starting watchdog to monitor server health...")
397+
398+
import os
399+
400+
url_host = os.environ.get("TEST_URL_HOST", "localhost")
401+
url_prefix = os.environ.get(URL_PREFIX_ENV_VAR_NAME, "")
402+
health_url = f"http://{url_host}:{port}{url_prefix}/info/"
403+
404+
request_timeout = 120
405+
if RuntimeParameters.has("NIM_WATCHDOG_REQUEST_TIMEOUT"):
406+
try:
407+
request_timeout = int(RuntimeParameters.get("NIM_WATCHDOG_REQUEST_TIMEOUT"))
408+
except ValueError:
409+
logger.warning(
410+
"Invalid value for NIM_WATCHDOG_REQUEST_TIMEOUT, using default of 120 seconds"
411+
)
412+
logger.info("Nim watchdog health check request timeout is %s", request_timeout)
413+
check_interval = 10 # seconds
414+
max_attempts = 3
415+
if RuntimeParameters.has("NIM_WATCHDOG_MAX_ATTEMPTS"):
416+
try:
417+
max_attempts = int(RuntimeParameters.get("NIM_WATCHDOG_MAX_ATTEMPTS"))
418+
except ValueError:
419+
logger.warning("Invalid value for NIM_WATCHDOG_MAX_ATTEMPTS, using default of 3")
420+
logger.info("Nim watchdog max attempts: %s", max_attempts)
421+
attempt = 0
422+
base_sleep_time = 4
423+
424+
while True:
425+
try:
426+
# Check if server is responding to health checks
427+
logger.debug(f"Server health check")
428+
response = requests.get(health_url, timeout=request_timeout)
429+
logger.debug(f"Server health check status: {response.status_code}")
430+
# Connection succeeded, reset attempts and wait for next check
431+
attempt = 0
432+
time.sleep(check_interval) # Regular check interval
433+
continue
434+
435+
except Exception as e:
436+
attempt += 1
437+
logger.warning(f"health_url {health_url}")
438+
logger.warning(
439+
f"Server health check failed (attempt {attempt}/{max_attempts}): {str(e)}"
440+
)
441+
442+
if attempt >= max_attempts:
443+
self._kill_all_processes()
444+
445+
# Quadratic backoff
446+
sleep_time = base_sleep_time * (attempt**2)
447+
logger.info(f"Retrying in {sleep_time} seconds...")
448+
time.sleep(sleep_time)
449+
340450
def terminate(self):
341451
terminate_op = getattr(self._predictor, "terminate", None)
342452
if callable(terminate_op):

0 commit comments

Comments
 (0)