Skip to content

Commit 62d1c92

Browse files
s-gavrenkovsvc-harness-git2mjnitz02GenAI Git Botdependabot[bot]
authored
[RAPTOR-14353] Run flask server using CLI (gunicorn with gevent support) (#1633)
* working * fixed * removed rr * Reconcile dependencies, updated IDs, tags * working * working * Reconcile dependencies, updated IDs, tags * [BUZZOK-27100] Fix Drum Inline Runner and streamline DRUM options generation to help prevent logic diverging (#1615) * Fix Drum Inline Runner and streamline DRUM options generation * Cleanup * Refactor setup to new file * Fix mocks * Add unit tests * Change tracer * [-] (Auto) Bump env_info versions (#1622) Co-authored-by: GenAI Git Bot <genai-systems-devs@datarobot.com> * Bump keras in /public_dropin_environments/python3_keras (#1624) Bumps [keras](https://github.com/keras-team/keras) from 3.10.0 to 3.11.0. - [Release notes](https://github.com/keras-team/keras/releases) - [Commits](keras-team/keras@v3.10.0...v3.11.0) --- updated-dependencies: - dependency-name: keras dependency-version: 3.11.0 dependency-type: indirect ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Configure OTel metrics by default in drum. (#1620) * [RAPTOR-14453] Regen requirements.txt to fix CVE-2025-8747 (#1623) * [RAPTOR-14453] Regen requirements.txt to fix CVE-2025-8747 This regenerates the requirements.txt file from requirements.in, to pull in the latest keras 3.11.2, upgrading from 3.10.0. Upgrading past 3.11.0 fixes CVE-2025-8747. * Reconcile dependencies, updated IDs, tags --------- Co-authored-by: svc-harness-git2 <svc-harness-git2@datarobot.com> * [BUZZOK-27241] Update DRUM version to 1.16.22 to add support for `def chat` kwargs (#1621) * Update DRUM version * Update CHANGELOG.md * [BUZZOK-27241] [BUZZOK-27421] Bump requirements in GenAI Agents environment (#1627) * Bump requirements in GenAI Agents environment * Update * Reconcile dependencies, updated IDs, tags --------- Co-authored-by: svc-harness-git2 <svc-harness-git2@datarobot.com> * [CFX-3334] Update to latest drgithelper and properly set permissions for credential cache daemon (#1630) * [CFX-3334] Update to latest drgithelper and properly set permissions for credential cache daemon * Reconcile dependencies, updated IDs, tags --------- Co-authored-by: svc-harness-git2 <svc-harness-git2@datarobot.com> * Add OTEL logging configuration, refactor traces and metrics. (#1626) * Bump DRUM version. (#1631) * [RAPTOR-13851] pytorch: rebuild requirements to pull in updates (#1637) * [RAPTOR-13851] pytorch: rebuild requirements to pull in updates This rebuilds requirements.txt to pull in updates for pytorch to resolve CVE-2025-3730. * Reconcile dependencies, updated IDs, tags --------- Co-authored-by: svc-harness-git2 <svc-harness-git2@datarobot.com> * Avoid infinite recursion in logs. (#1638) * Update version for new release. (#1639) Co-authored-by: “Nickolai <“nickolai@datarobot.com”> * [RAPTOR-14353] add client and NIM timeouts (#1640) * addd timeouts * fiz black * fix tests * fixed black * add timeout * fixed * replaced with RuntimeParameters * removed unused import * replaced with static methods * [RAPTOR-14353] Add nim watchdog (#1632) * add watchdog * 5 attempts * watchdog_additional * fixed version * add env USE_NIM_WATCHDOG * add env USE_NIM_WATCHDOG * fixed lint * fixed version * rever docker changes * add changes * removed changelogs * max_attempts = 3 * replaced with os.kill(pid, signal.SIGTERM) * renamed * fixed comments * add NIM_WATCHDOG_REQUEST_TIMEOUT and NIM_WATCHDOG_MAX_ATTEMPTS * add logs * fix lint * updated version (#1643) * Reconcile dependencies, updated IDs, tags * working * Reconcile dependencies, updated IDs, tags * refactoring * revert nim sidecar * revrt env_info.json * delint * revert * added comments * added is_client_request_timeout_enabled * added comments * added comments * fix signal termination --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: svc-harness-git2 <svc-harness-git2@datarobot.com> Co-authored-by: Matt Nitzken <mjnitz02@gmail.com> Co-authored-by: svc-harness-git2 <130688563+svc-harness-git2@users.noreply.github.com> Co-authored-by: GenAI Git Bot <genai-systems-devs@datarobot.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Nickolai Novik <92932793+nickolai-dr@users.noreply.github.com> Co-authored-by: Aaron Ball <aaron.ball@datarobot.com> Co-authored-by: Chris Russell-Walker <christopher.russell-walker@datarobot.com> Co-authored-by: “Nickolai <“nickolai@datarobot.com”>
1 parent 862c4ca commit 62d1c92

File tree

6 files changed

+162
-51
lines changed

6 files changed

+162
-51
lines changed

custom_model_runner/datarobot_drum/drum/drum.py

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,12 @@
7777

7878

7979
class CMRunner:
80-
def __init__(self, runtime):
80+
def __init__(self, runtime, flask_app=None, worker_ctx=None):
8181
self.runtime = runtime
82+
self.flask_app = (
83+
flask_app # This is the Flask app object, used when running the application via CLI
84+
)
85+
self.worker_ctx = worker_ctx # This is the Gunicorn worker context object (WorkerCtx)
8286
self.options = runtime.options
8387
self.options.model_config = read_model_metadata_yaml(self.options.code_dir)
8488
self.options.default_parameter_values = (
@@ -497,8 +501,18 @@ def run(self):
497501
with self._setup_output_if_not_exists():
498502
self._run_predictions(stats_collector)
499503
finally:
500-
if stats_collector:
501-
stats_collector.disable()
504+
if self.worker_ctx:
505+
# Perform cleanup specific to the Gunicorn worker being terminated.
506+
# Gunicorn spawns multiple worker processes to handle requests. Each worker has its own context,
507+
# and this ensures that only the resources associated with the current worker are released.
508+
# defer_cleanup simply saves methods to be executed during worker restart or shutdown.
509+
# More details in https://github.com/datarobot/datarobot-custom-templates/pull/419
510+
self.worker_ctx.defer_cleanup(
511+
lambda: stats_collector.disable(), desc="stats_collector.disable()"
512+
)
513+
else:
514+
if stats_collector:
515+
stats_collector.disable()
502516
if stats_collector:
503517
stats_collector.print_reports()
504518
elif self.run_mode == RunMode.SERVER:
@@ -826,7 +840,7 @@ def _run_predictions(self, stats_collector: Optional[StatsCollector] = None):
826840
if stats_collector:
827841
stats_collector.mark("start")
828842
predictor = (
829-
PredictionServer(params)
843+
PredictionServer(params, self.flask_app)
830844
if self.run_mode == RunMode.SERVER
831845
else GenericPredictorComponent(params)
832846
)
@@ -836,16 +850,39 @@ def _run_predictions(self, stats_collector: Optional[StatsCollector] = None):
836850
if stats_collector:
837851
stats_collector.mark("run")
838852
finally:
839-
if predictor is not None:
840-
predictor.terminate()
841-
if stats_collector:
842-
stats_collector.mark("end")
853+
if self.worker_ctx:
854+
# Perform cleanup specific to the Gunicorn worker being terminated.
855+
# Gunicorn spawns multiple worker processes to handle requests. Each worker has its own context,
856+
# and this ensures that only the resources associated with the current worker are released.
857+
# defer_cleanup simply saves methods to be executed during worker restart or shutdown.
858+
# More details in https://github.com/datarobot/datarobot-custom-templates/pull/419
859+
if predictor is not None:
860+
self.worker_ctx.defer_cleanup(
861+
lambda: predictor.terminate(), desc="predictor.terminate()"
862+
)
863+
if stats_collector:
864+
self.worker_ctx.defer_cleanup(
865+
lambda: stats_collector.mark("end"), desc="stats_collector.mark('end')"
866+
)
867+
self.worker_ctx.defer_cleanup(
868+
lambda: self.logger.info(
869+
"<<< Finish {} in the {} mode".format(
870+
ArgumentsOptions.MAIN_COMMAND, self.run_mode.value
871+
)
872+
),
873+
desc="logger.info(...)",
874+
)
843875

844-
self.logger.info(
845-
"<<< Finish {} in the {} mode".format(
846-
ArgumentsOptions.MAIN_COMMAND, self.run_mode.value
847-
)
848-
)
876+
else:
877+
if predictor is not None:
878+
predictor.terminate()
879+
if stats_collector:
880+
stats_collector.mark("end")
881+
self.logger.info(
882+
"<<< Finish {} in the {} mode".format(
883+
ArgumentsOptions.MAIN_COMMAND, self.run_mode.value
884+
)
885+
)
849886

850887
@contextlib.contextmanager
851888
def _setup_output_if_not_exists(self):

custom_model_runner/datarobot_drum/drum/main.py

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,50 @@
5454
)
5555

5656

57-
def main():
58-
with DrumRuntime() as runtime:
57+
def main(flask_app=None, worker_ctx=None):
58+
"""
59+
The main entry point for the custom model runner.
60+
61+
This function initializes the runtime environment, sets up logging, handles
62+
signal interruptions, and starts the CMRunner for executing user-defined models.
63+
64+
Args:
65+
flask_app: Optional[Flask] Flask application instance, used when running using command line.
66+
worker_ctx: Optional gunicorn worker context (WorkerCtx), used for managing cleanup tasks in a
67+
multi-worker setup (e.g., Gunicorn).
68+
69+
Returns:
70+
None
71+
"""
72+
with DrumRuntime(flask_app) as runtime:
5973
config_logging()
6074

75+
if worker_ctx:
76+
# Perform cleanup specific to the Gunicorn worker being terminated.
77+
# Gunicorn spawns multiple worker processes to handle requests. Each worker has its own context,
78+
# and this ensures that only the resources associated with the current worker are released.
79+
# defer_cleanup simply saves methods to be executed during worker restart or shutdown.
80+
# More details in https://github.com/datarobot/datarobot-custom-templates/pull/419
81+
if runtime.options and RunMode(runtime.options.subparser_name) == RunMode.SERVER:
82+
if runtime.cm_runner:
83+
worker_ctx.defer_cleanup(
84+
lambda: runtime.cm_runner.terminate(), desc="runtime.cm_runner.terminate()"
85+
)
86+
if runtime.trace_provider is not None:
87+
worker_ctx.defer_cleanup(
88+
lambda: runtime.trace_provider.shutdown(),
89+
desc="runtime.trace_provider.shutdown()",
90+
)
91+
if runtime.metric_provider is not None:
92+
worker_ctx.defer_cleanup(
93+
lambda: runtime.metric_provider.shutdown(),
94+
desc="runtime.metric_provider.shutdown()",
95+
)
96+
if runtime.log_provider is not None:
97+
worker_ctx.defer_cleanup(
98+
lambda: runtime.log_provider.shutdown(), desc="runtime.log_provider.shutdown()"
99+
)
100+
61101
def signal_handler(sig, frame):
62102
# The signal is assigned so the stacktrace is not presented when Ctrl-C is pressed.
63103
# The cleanup itself is done only if we are NOT running in performance test mode which
@@ -89,13 +129,14 @@ def signal_handler(sig, frame):
89129
runtime.metric_provider = metric_provider
90130
runtime.log_provider = log_provider
91131

92-
signal.signal(signal.SIGINT, signal_handler)
93-
signal.signal(signal.SIGTERM, signal_handler)
132+
if worker_ctx is None:
133+
signal.signal(signal.SIGINT, signal_handler)
134+
signal.signal(signal.SIGTERM, signal_handler)
94135

95136
from datarobot_drum.drum.drum import CMRunner
96137

97138
try:
98-
runtime.cm_runner = CMRunner(runtime)
139+
runtime.cm_runner = CMRunner(runtime, flask_app, worker_ctx)
99140
runtime.cm_runner.run()
100141
except DrumSchemaValidationException:
101142
sys.exit(ExitCodes.SCHEMA_VALIDATION_ERROR.value)

custom_model_runner/datarobot_drum/drum/root_predictors/prediction_server.py

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,11 @@ class TimeoutWSGIRequestHandler(WSGIRequestHandler):
6868

6969

7070
class PredictionServer(PredictMixin):
71-
def __init__(self, params: dict):
71+
def __init__(self, params: dict, flask_app=None):
7272
self._params = params
73+
self.flask_app = (
74+
flask_app # This is the Flask app object, used when running the application via CLI
75+
)
7376
self._show_perf = self._params.get("show_perf")
7477
self._resource_monitor = ResourceMonitor(monitor_current_process=True)
7578
self._run_language = RunLanguage(params.get("run_language"))
@@ -310,7 +313,7 @@ def handle_exception(e):
310313
cli = sys.modules["flask.cli"]
311314
cli.show_server_banner = lambda *x: None
312315

313-
app = get_flask_app(model_api)
316+
app = get_flask_app(model_api, self.flask_app)
314317
self.load_flask_extensions(app)
315318
self._run_flask_app(app)
316319

@@ -319,6 +322,19 @@ def handle_exception(e):
319322

320323
return []
321324

325+
def is_client_request_timeout_enabled(self):
326+
if (
327+
RuntimeParameters.has("DRUM_CLIENT_REQUEST_TIMEOUT")
328+
and int(RuntimeParameters.get("DRUM_CLIENT_REQUEST_TIMEOUT")) > 0
329+
):
330+
logger.info(
331+
"Client request timeout is enabled, timeout: %s",
332+
str(int(TimeoutWSGIRequestHandler.timeout)),
333+
)
334+
return True
335+
else:
336+
return False
337+
322338
def _run_flask_app(self, app):
323339
host = self._params.get("host", None)
324340
port = self._params.get("port", None)
@@ -328,30 +344,34 @@ def _run_flask_app(self, app):
328344
processes = self._params.get("processes")
329345
logger.info("Number of webserver processes: %s", processes)
330346
try:
331-
if RuntimeParameters.has("USE_NIM_WATCHDOG") and str(
332-
RuntimeParameters.get("USE_NIM_WATCHDOG")
333-
).lower() in ["true", "1", "yes"]:
334-
# Start the watchdog thread before running the app
335-
self._server_watchdog = Thread(
336-
target=self.watchdog,
337-
args=(port,),
338-
daemon=True,
339-
name="NIM Sidecar Watchdog",
347+
if self.flask_app:
348+
# when running application via the command line (e.g., gunicorn worker)
349+
pass
350+
else:
351+
if RuntimeParameters.has("USE_NIM_WATCHDOG") and str(
352+
RuntimeParameters.get("USE_NIM_WATCHDOG")
353+
).lower() in ["true", "1", "yes"]:
354+
# Start the watchdog thread before running the app
355+
self._server_watchdog = Thread(
356+
target=self.watchdog,
357+
args=(port,),
358+
daemon=True,
359+
name="NIM Sidecar Watchdog",
360+
)
361+
self._server_watchdog.start()
362+
363+
# Configure the server with timeout settings
364+
app.run(
365+
host=host,
366+
port=port,
367+
threaded=False,
368+
processes=processes,
369+
**(
370+
{"request_handler": TimeoutWSGIRequestHandler}
371+
if self.is_client_request_timeout_enabled()
372+
else {}
373+
),
340374
)
341-
self._server_watchdog.start()
342-
343-
# Configure the server with timeout settings
344-
app.run(
345-
host=host,
346-
port=port,
347-
threaded=False,
348-
processes=processes,
349-
**(
350-
{"request_handler": TimeoutWSGIRequestHandler}
351-
if RuntimeParameters.has("DRUM_CLIENT_REQUEST_TIMEOUT")
352-
else {}
353-
),
354-
)
355375
except OSError as e:
356376
raise DrumCommonException("{}: host: {}; port: {}".format(e, host, port))
357377

custom_model_runner/datarobot_drum/drum/runtime.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
Released under the terms of DataRobot Tool and Utility Agreement.
66
"""
77
import logging
8+
from typing import Optional
89

910
from datarobot_drum.drum.server import (
1011
empty_api_blueprint,
@@ -13,6 +14,7 @@
1314
)
1415
from datarobot_drum.drum.common import verbose_stdout, get_drum_logger
1516
from datarobot_drum.drum.enum import LOGGER_NAME_PREFIX, RunMode
17+
from flask import Flask
1618

1719
from datarobot_drum.drum.exceptions import DrumCommonException
1820

@@ -23,14 +25,17 @@
2325

2426

2527
class DrumRuntime:
26-
def __init__(self):
28+
def __init__(self, flask_app: Optional[Flask] = None):
2729
self.initialization_succeeded = False
2830
self.options = None
2931
self.cm_runner = None
3032
# OTEL services
3133
self.trace_provider = None
3234
self.metric_provider = None
3335
self.log_provider = None
36+
self.flask_app = (
37+
flask_app # This is the Flask app object, used when running the application via CLI
38+
)
3439

3540
def __enter__(self):
3641
return self
@@ -83,12 +88,12 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
8388
port = int(host_port_list[1]) if len(host_port_list) == 2 else None
8489

8590
with verbose_stdout(self.options.verbose):
86-
run_error_server(host, port, exc_value)
91+
run_error_server(host, port, exc_value, self.flask_app)
8792

8893
return False # propagate exception further
8994

9095

91-
def run_error_server(host, port, exc_value):
96+
def run_error_server(host, port, exc_value, flask_app: Optional[Flask] = None):
9297
model_api = empty_api_blueprint()
9398

9499
@model_api.route("/", methods=["GET"])
@@ -109,5 +114,9 @@ def predict():
109114
def transform():
110115
return {"message": "ERROR: {}".format(exc_value)}, HTTP_513_DRUM_PIPELINE_ERROR
111116

112-
app = get_flask_app(model_api)
113-
app.run(host, port)
117+
app = get_flask_app(model_api, flask_app)
118+
if flask_app:
119+
# when running application via the command line (e.g., gunicorn worker)
120+
pass
121+
else:
122+
app.run(host, port)

custom_model_runner/datarobot_drum/drum/server.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
Released under the terms of DataRobot Tool and Utility Agreement.
66
"""
77
import datetime
8+
from typing import Optional
9+
810
import flask
911
import os
1012
import uuid
@@ -29,8 +31,9 @@
2931
logger = get_drum_logger(LOGGER_NAME_PREFIX)
3032

3133

32-
def get_flask_app(api_blueprint):
33-
app = create_flask_app()
34+
def get_flask_app(api_blueprint, app: Optional[Flask] = None):
35+
if app is None:
36+
app = create_flask_app()
3437
url_prefix = os.environ.get(URL_PREFIX_ENV_VAR_NAME, "")
3538
app.register_blueprint(api_blueprint, url_prefix=url_prefix)
3639
return app

tests/unit/datarobot_drum/drum/test_prediction_server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,8 @@ def chat_hook(completion_request, model):
235235

236236

237237
@pytest.mark.parametrize(
238-
"processes_param, expected_processes, request_timeout", [(None, 1, None), (10, 10, 600)]
238+
"processes_param, expected_processes, request_timeout",
239+
[(None, 1, None), (None, 1, 0), (10, 10, 600)],
239240
)
240241
def test_run_flask_app(processes_param, expected_processes, request_timeout):
241242
if request_timeout:

0 commit comments

Comments
 (0)