From bc8bc111a69206ffdc3f5d0d78d344b2b748f885 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=AD=E6=BD=87?= <1576730710@qq.com> Date: Thu, 23 Oct 2025 10:41:02 +0800 Subject: [PATCH 1/5] =?UTF-8?q?[python]=20=E4=BF=AE=E5=A4=8D=E8=BF=9B?= =?UTF-8?q?=E7=A8=8B=E9=87=8D=E5=90=AF=E6=9C=BA=E5=88=B6=E4=B8=8D=E5=AE=8C?= =?UTF-8?q?=E5=96=84=E4=B8=8EScheduler=E5=81=9C=E6=AD=A2=E6=97=B6=E7=9A=84?= =?UTF-8?q?AttributeError?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- framework/fit/python/fitframework/__init__.py | 42 ++++- .../fitframework/utils/process_monitor.py | 88 +++++++++ .../fitframework/utils/restart_policy.py | 107 +++++++++++ .../python/fitframework/utils/scheduler.py | 7 +- .../conf/application.yml | 6 + .../fit_py_health_monitor/conf/plugin.yml | 7 + .../fit_py_health_monitor/health_monitor.py | 176 ++++++++++++++++++ .../heart_beat_agent.py | 9 +- 8 files changed, 436 insertions(+), 6 deletions(-) create mode 100644 framework/fit/python/fitframework/utils/process_monitor.py create mode 100644 framework/fit/python/fitframework/utils/restart_policy.py create mode 100644 framework/fit/python/plugin/fit_py_health_monitor/conf/application.yml create mode 100644 framework/fit/python/plugin/fit_py_health_monitor/conf/plugin.yml create mode 100644 framework/fit/python/plugin/fit_py_health_monitor/health_monitor.py diff --git a/framework/fit/python/fitframework/__init__.py b/framework/fit/python/fitframework/__init__.py index a8ec7680..964ad29e 100644 --- a/framework/fit/python/fitframework/__init__.py +++ b/framework/fit/python/fitframework/__init__.py @@ -215,7 +215,43 @@ def main(): if platform.system() in ('Windows', 'Darwin'): # Windows 或 macOS main() else: # Linux 及其他 + from fitframework.utils.restart_policy import create_default_restart_policy + + restart_policy = create_default_restart_policy() + fit_logger.info(f"Starting process manager with restart policy: {restart_policy.get_status()}") + while True: - main_process = Process(target=main, name='MainProcess') - main_process.start() - main_process.join() + try: + main_process = Process(target=main, name='MainProcess') + main_process.start() + fit_logger.info(f"Main process started with PID: {main_process.pid}") + main_process.join() + + # 检查进程退出码 + exit_code = main_process.exitcode + fit_logger.info(f"Main process exited with code: {exit_code}") + + # 使用重启策略判断是否应该重启 + if not restart_policy.should_restart(exit_code): + fit_logger.info("Restart policy indicates no restart needed, stopping") + break + + # 获取重启延迟 + restart_delay = restart_policy.get_restart_delay() + status = restart_policy.get_status() + + fit_logger.warning(f"Main process exited unexpectedly, restarting in {restart_delay:.2f} seconds... " + f"(attempt {status['current_attempt']}/{status['max_attempts']})") + time.sleep(restart_delay) + + except Exception as e: + fit_logger.error(f"Error during process management: {e}") + if not restart_policy.should_restart(-1): # 使用-1表示异常退出 + fit_logger.error("Restart policy indicates no restart needed due to errors, stopping") + break + + restart_delay = restart_policy.get_restart_delay() + status = restart_policy.get_status() + fit_logger.warning(f"Error occurred, restarting in {restart_delay:.2f} seconds... " + f"(attempt {status['current_attempt']}/{status['max_attempts']})") + time.sleep(restart_delay) diff --git a/framework/fit/python/fitframework/utils/process_monitor.py b/framework/fit/python/fitframework/utils/process_monitor.py new file mode 100644 index 00000000..bfb3c65a --- /dev/null +++ b/framework/fit/python/fitframework/utils/process_monitor.py @@ -0,0 +1,88 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +功 能:进程监控和健康检查工具 +""" +import time +from typing import Optional + +import psutil + +from fitframework.api.logging import fit_logger + + +class ProcessMonitor: + """进程监控器,用于监控主进程的健康状态""" + + def __init__(self, pid: int, max_memory_mb: int = 1024, max_cpu_percent: float = 80.0): + self.pid = pid + self.max_memory_mb = max_memory_mb + self.max_cpu_percent = max_cpu_percent + self.start_time = time.time() + self.last_check_time = time.time() + + def is_process_alive(self) -> bool: + """检查进程是否还活着""" + try: + return psutil.pid_exists(self.pid) + except Exception as e: + fit_logger.warning(f"Error checking process {self.pid}: {e}") + return False + + def get_process_info(self) -> Optional[dict]: + """获取进程信息""" + try: + if not self.is_process_alive(): + return None + + process = psutil.Process(self.pid) + return { + 'pid': self.pid, + 'memory_mb': process.memory_info().rss / 1024 / 1024, + 'cpu_percent': process.cpu_percent(), + 'status': process.status(), + 'create_time': process.create_time(), + 'num_threads': process.num_threads() + } + except Exception as e: + fit_logger.warning(f"Error getting process info for {self.pid}: {e}") + return None + + def is_healthy(self) -> bool: + """检查进程是否健康""" + info = self.get_process_info() + if not info: + return False + + # 检查内存使用 + if info['memory_mb'] > self.max_memory_mb: + fit_logger.warning(f"Process {self.pid} memory usage too high: {info['memory_mb']:.2f}MB > {self.max_memory_mb}MB") + return False + + # 检查CPU使用(需要两次采样) + cpu_percent = info['cpu_percent'] + if cpu_percent > self.max_cpu_percent: + fit_logger.warning(f"Process {self.pid} CPU usage too high: {cpu_percent:.2f}% > {self.max_cpu_percent}%") + return False + + return True + + def should_restart(self) -> bool: + """判断是否应该重启进程""" + if not self.is_process_alive(): + fit_logger.error(f"Process {self.pid} is not alive") + return True + + if not self.is_healthy(): + fit_logger.warning(f"Process {self.pid} is unhealthy") + return True + + return False + + +def create_process_monitor(pid: int) -> ProcessMonitor: + """创建进程监控器""" + return ProcessMonitor(pid) diff --git a/framework/fit/python/fitframework/utils/restart_policy.py b/framework/fit/python/fitframework/utils/restart_policy.py new file mode 100644 index 00000000..ccf5c465 --- /dev/null +++ b/framework/fit/python/fitframework/utils/restart_policy.py @@ -0,0 +1,107 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +功 能:进程重启策略配置 +""" +import time +from typing import Dict, Any +from fitframework.api.logging import fit_logger + + +class RestartPolicy: + """进程重启策略""" + + def __init__(self, + max_attempts: int = 10, + base_delay: float = 5.0, + max_delay: float = 300.0, + backoff_multiplier: float = 1.5, + reset_after_success: bool = True): + self.max_attempts = max_attempts + self.base_delay = base_delay + self.max_delay = max_delay + self.backoff_multiplier = backoff_multiplier + self.reset_after_success = reset_after_success + + self.current_attempt = 0 + self.current_delay = base_delay + self.last_success_time = time.time() + + def should_restart(self, exit_code: int) -> bool: + """判断是否应该重启""" + # 正常退出不重启 + if exit_code == 0: + if self.reset_after_success: + self._reset() + return False + + # 超过最大尝试次数不重启 + if self.current_attempt >= self.max_attempts: + fit_logger.error(f"Maximum restart attempts ({self.max_attempts}) reached") + return False + + return True + + def get_restart_delay(self) -> float: + """获取重启延迟时间""" + delay = min(self.current_delay, self.max_delay) + self.current_attempt += 1 + + # 指数退避 + if self.current_attempt < self.max_attempts: + self.current_delay = min(self.current_delay * self.backoff_multiplier, self.max_delay) + + return delay + + def _reset(self): + """重置策略状态""" + self.current_attempt = 0 + self.current_delay = self.base_delay + self.last_success_time = time.time() + + def get_status(self) -> Dict[str, Any]: + """获取当前状态""" + return { + 'current_attempt': self.current_attempt, + 'max_attempts': self.max_attempts, + 'current_delay': self.current_delay, + 'base_delay': self.base_delay, + 'max_delay': self.max_delay, + 'last_success_time': self.last_success_time + } + + +def create_default_restart_policy() -> RestartPolicy: + """创建默认重启策略""" + return RestartPolicy( + max_attempts=10, + base_delay=5.0, + max_delay=300.0, + backoff_multiplier=1.5, + reset_after_success=True + ) + + +def create_aggressive_restart_policy() -> RestartPolicy: + """创建激进重启策略(快速重启)""" + return RestartPolicy( + max_attempts=20, + base_delay=2.0, + max_delay=60.0, + backoff_multiplier=1.2, + reset_after_success=True + ) + + +def create_conservative_restart_policy() -> RestartPolicy: + """创建保守重启策略(慢速重启)""" + return RestartPolicy( + max_attempts=5, + base_delay=10.0, + max_delay=600.0, + backoff_multiplier=2.0, + reset_after_success=True + ) diff --git a/framework/fit/python/fitframework/utils/scheduler.py b/framework/fit/python/fitframework/utils/scheduler.py index b0c78894..04b62450 100644 --- a/framework/fit/python/fitframework/utils/scheduler.py +++ b/framework/fit/python/fitframework/utils/scheduler.py @@ -109,8 +109,11 @@ def _start(): @register_event(FrameworkEvent.FRAMEWORK_STOPPING) def _stop(): global _timer - _timer.cancel() - fit_logger.info("timer stopped") + if _timer is not None: + _timer.cancel() + fit_logger.info("timer stopped") + else: + fit_logger.warning("timer was not initialized, skip stopping") class _Timer: diff --git a/framework/fit/python/plugin/fit_py_health_monitor/conf/application.yml b/framework/fit/python/plugin/fit_py_health_monitor/conf/application.yml new file mode 100644 index 00000000..23c99c1b --- /dev/null +++ b/framework/fit/python/plugin/fit_py_health_monitor/conf/application.yml @@ -0,0 +1,6 @@ +# 健康监控插件配置 +health-monitor: + check-interval: 30 # 健康检查间隔(秒) + memory-threshold: 80.0 # 内存使用率阈值(百分比) + cpu-threshold: 90.0 # CPU使用率阈值(百分比) + disk-threshold: 90.0 # 磁盘使用率阈值(百分比) diff --git a/framework/fit/python/plugin/fit_py_health_monitor/conf/plugin.yml b/framework/fit/python/plugin/fit_py_health_monitor/conf/plugin.yml new file mode 100644 index 00000000..1413fc98 --- /dev/null +++ b/framework/fit/python/plugin/fit_py_health_monitor/conf/plugin.yml @@ -0,0 +1,7 @@ +# 健康监控插件元数据 +name: fit_py_health_monitor +version: 1.0.0 +description: "系统健康监控插件" +author: "Fit Framework Team" +level: 1 +type: plugin diff --git a/framework/fit/python/plugin/fit_py_health_monitor/health_monitor.py b/framework/fit/python/plugin/fit_py_health_monitor/health_monitor.py new file mode 100644 index 00000000..2de96550 --- /dev/null +++ b/framework/fit/python/plugin/fit_py_health_monitor/health_monitor.py @@ -0,0 +1,176 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +功 能:系统健康监控插件 +""" +import time +from typing import Dict, Any + +import psutil + +from fitframework.api.decorators import fitable, fit, value +from fitframework.api.decorators import register_event +from fitframework.api.enums import FrameworkEvent +from fitframework.api.logging import sys_plugin_logger + + +@value('health-monitor.check-interval', 30, converter=int) +def get_check_interval(): + """健康检查间隔(秒)""" + pass + + +@value('health-monitor.memory-threshold', 80.0, converter=float) +def get_memory_threshold(): + """内存使用率阈值(百分比)""" + pass + + +@value('health-monitor.cpu-threshold', 90.0, converter=float) +def get_cpu_threshold(): + """CPU使用率阈值(百分比)""" + pass + + +@value('health-monitor.disk-threshold', 90.0, converter=float) +def get_disk_threshold(): + """磁盘使用率阈值(百分比)""" + pass + + +class HealthMonitor: + """系统健康监控器""" + + def __init__(self): + self.last_check_time = time.time() + self.health_status = {} + self.alert_count = 0 + self.max_alerts = 10 + + def check_system_health(self) -> Dict[str, Any]: + """检查系统健康状态""" + try: + # 内存使用率 + memory = psutil.virtual_memory() + memory_percent = memory.percent + + # CPU使用率 + cpu_percent = psutil.cpu_percent(interval=1) + + # 磁盘使用率 + disk = psutil.disk_usage('/') + disk_percent = (disk.used / disk.total) * 100 + + # 进程信息 + current_process = psutil.Process() + process_memory = current_process.memory_info().rss / 1024 / 1024 # MB + process_cpu = current_process.cpu_percent() + + health_status = { + 'timestamp': time.time(), + 'memory': { + 'total': memory.total, + 'available': memory.available, + 'percent': memory_percent, + 'healthy': memory_percent < get_memory_threshold() + }, + 'cpu': { + 'percent': cpu_percent, + 'healthy': cpu_percent < get_cpu_threshold() + }, + 'disk': { + 'total': disk.total, + 'used': disk.used, + 'free': disk.free, + 'percent': disk_percent, + 'healthy': disk_percent < get_disk_threshold() + }, + 'process': { + 'memory_mb': process_memory, + 'cpu_percent': process_cpu, + 'pid': current_process.pid, + 'status': current_process.status() + } + } + + # 检查是否健康 + is_healthy = ( + health_status['memory']['healthy'] and + health_status['cpu']['healthy'] and + health_status['disk']['healthy'] + ) + + health_status['overall_healthy'] = is_healthy + + if not is_healthy: + self.alert_count += 1 + self._log_health_alert(health_status) + else: + self.alert_count = 0 # 重置告警计数 + + self.health_status = health_status + self.last_check_time = time.time() + + return health_status + + except Exception as e: + sys_plugin_logger.error(f"Error checking system health: {e}") + return {'error': str(e), 'overall_healthy': False} + + def _log_health_alert(self, health_status: Dict[str, Any]): + """记录健康告警""" + if self.alert_count > self.max_alerts: + return # 避免日志过多 + + alerts = [] + if not health_status['memory']['healthy']: + alerts.append(f"Memory usage: {health_status['memory']['percent']:.1f}%") + if not health_status['cpu']['healthy']: + alerts.append(f"CPU usage: {health_status['cpu']['percent']:.1f}%") + if not health_status['disk']['healthy']: + alerts.append(f"Disk usage: {health_status['disk']['percent']:.1f}%") + + if alerts: + sys_plugin_logger.warning(f"System health alert #{self.alert_count}: {', '.join(alerts)}") + + def get_health_summary(self) -> Dict[str, Any]: + """获取健康状态摘要""" + return { + 'last_check': self.last_check_time, + 'alert_count': self.alert_count, + 'status': self.health_status.get('overall_healthy', False), + 'details': self.health_status + } + + +# 全局健康监控器实例 +_health_monitor = HealthMonitor() + + +@fitable("modelengine.fit.health.check", "local-worker") +def check_health() -> Dict[str, Any]: + """执行健康检查""" + return _health_monitor.check_system_health() + + +@fit("modelengine.fit.health.status") +def get_health_status() -> Dict[str, Any]: + """获取健康状态""" + return _health_monitor.get_health_summary() + + +@register_event(FrameworkEvent.APPLICATION_STARTED) +def start_health_monitoring(): + """启动健康监控""" + sys_plugin_logger.info("Health monitoring started") + # 执行初始健康检查 + _health_monitor.check_system_health() + + +@register_event(FrameworkEvent.FRAMEWORK_STOPPING) +def stop_health_monitoring(): + """停止健康监控""" + sys_plugin_logger.info("Health monitoring stopped") diff --git a/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py b/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py index 2fd79eb7..0c1a5b8d 100644 --- a/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py +++ b/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py @@ -100,13 +100,18 @@ def _try_heart_beat_once(): _registry_fitable_addresses() sys_plugin_logger.debug(f'heart beating success.') _LAST_HEART_BEAT_SUCCESS_TIME = heart_beat_finish_time - except: + except Exception as e: _FAIL_COUNT += 1 except_type, except_value, except_traceback = sys.exc_info() sys_plugin_logger.warning(f"heart beat failed. [fail_count={_FAIL_COUNT}]") sys_plugin_logger.warning(f"heart beat error type: {except_type}, value: {except_value}, " f"trace back:\n{''.join(traceback.format_tb(except_traceback))}") + # 如果连续失败次数过多,考虑重启心跳任务 + if _FAIL_COUNT >= 5: + sys_plugin_logger.error(f"heart beat failed too many times ({_FAIL_COUNT}), considering restart") + # 这里可以添加重启心跳任务的逻辑 + def _heart_beat_task(queue: multiprocessing.Queue): while True: @@ -128,6 +133,8 @@ def _heart_beat_monitor(heart_beat_background_job): global _HEART_BEAT_EXIT_UNEXPECTEDLY _HEART_BEAT_EXIT_UNEXPECTEDLY = True sys_plugin_logger.error("heart beat job is not alive, runtime should shutdown immediately.") + # 添加延迟,给进程重启机制一些时间 + time.sleep(2) shutdown() From 0134ef8bf15c5c4c8deb5d163d500d3c3c5036a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=AD=E6=BD=87?= <1576730710@qq.com> Date: Mon, 10 Nov 2025 10:28:55 +0800 Subject: [PATCH 2/5] =?UTF-8?q?[python]=20=E4=BF=AE=E5=A4=8D=20HeartBeatWa?= =?UTF-8?q?tchThread=20=E9=87=8D=E5=A4=8D=E8=B0=83=E7=94=A8=20shutdown()?= =?UTF-8?q?=20=E6=96=B9=E6=B3=95=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../heart_beat_agent.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py b/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py index 0c1a5b8d..b757ca03 100644 --- a/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py +++ b/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py @@ -18,7 +18,7 @@ import traceback from multiprocessing import Process from queue import Empty -from threading import Thread +from threading import Thread, Lock from typing import List from fitframework import const @@ -36,6 +36,9 @@ _LAST_HEART_BEAT_SUCCESS_TIME = time.time() # 心跳进程是否意外退出 _HEART_BEAT_EXIT_UNEXPECTEDLY = False +# 是否正在关闭中(用于区分正常关闭和意外退出) +_SHUTDOWN_IN_PROGRESS = False +_SHUTDOWN_LOCK = Lock() @value('heart-beat.client.sceneType', "fit-registry") @@ -130,8 +133,15 @@ def _heart_beat_task(queue: multiprocessing.Queue): def _heart_beat_monitor(heart_beat_background_job): while heart_beat_background_job.is_alive(): time.sleep(1) - global _HEART_BEAT_EXIT_UNEXPECTEDLY - _HEART_BEAT_EXIT_UNEXPECTEDLY = True + global _HEART_BEAT_EXIT_UNEXPECTEDLY, _SHUTDOWN_IN_PROGRESS, _SHUTDOWN_LOCK + # 检查是否正在关闭中 + with _SHUTDOWN_LOCK: + if _SHUTDOWN_IN_PROGRESS: + # 如果是正常关闭,心跳任务退出是预期的,不需要再次调用shutdown + sys_plugin_logger.info("heart beat job exited during graceful shutdown, no action needed.") + return + # 如果不是正常关闭,则认为是意外退出 + _HEART_BEAT_EXIT_UNEXPECTEDLY = True sys_plugin_logger.error("heart beat job is not alive, runtime should shutdown immediately.") # 添加延迟,给进程重启机制一些时间 time.sleep(2) @@ -157,7 +167,11 @@ def online() -> None: @register_event(Fit_Event.FRAMEWORK_STOPPING) def offline(): """ Runtime关闭前应主动向心跳代理申请offline,心跳代理停止发送heartbeat并调用心跳服务端leave接口 """ + global _SHUTDOWN_IN_PROGRESS, _SHUTDOWN_LOCK sys_plugin_logger.info("heart beat agent offline") + # 设置关闭标志,表示正在正常关闭 + with _SHUTDOWN_LOCK: + _SHUTDOWN_IN_PROGRESS = True _HEART_BEAT_FINISH_QUEUE.put(None) From 58e74ad867ef8f3b05fa0374efec42093b6464d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=AD=E6=BD=87?= <1576730710@qq.com> Date: Tue, 11 Nov 2025 15:14:27 +0800 Subject: [PATCH 3/5] =?UTF-8?q?[python]=20=E6=B7=BB=E5=8A=A0=E4=B8=BB?= =?UTF-8?q?=E8=BF=9B=E7=A8=8B=E5=BC=82=E5=B8=B8=E9=80=80=E5=87=BA=E7=9A=84?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- framework/fit/python/fitframework/__init__.py | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/framework/fit/python/fitframework/__init__.py b/framework/fit/python/fitframework/__init__.py index 964ad29e..2dd9b4d3 100644 --- a/framework/fit/python/fitframework/__init__.py +++ b/framework/fit/python/fitframework/__init__.py @@ -180,16 +180,6 @@ def heart_beat_exit_unexpectedly() -> bool: pass -def determine_should_terminate_main() -> bool: - try: - return heart_beat_exit_unexpectedly() or get_should_terminate_main() - except: - except_type, except_value, except_traceback = sys.exc_info() - fit_logger.warning(f"get should terminate main error, error type: {except_type}, value: {except_value}, " - f"trace back:\n{''.join(traceback.format_tb(except_traceback))}") - return False - - @shutdown_on_error @timer def main(): @@ -205,7 +195,31 @@ def main(): fit_logger.info(f"fit framework is now available in version {_FIT_FRAMEWORK_VERSION}.") if get_terminate_main_enabled(): fit_logger.info("terminate main enabled.") - while not determine_should_terminate_main(): + while True: + # 明确区分退出原因并打印日志 + hb_exit = False + should_terminate = False + try: + hb_exit = heart_beat_exit_unexpectedly() + except: + except_type, except_value, except_traceback = sys.exc_info() + fit_logger.warning(f"check heart_beat_exit_unexpectedly error, error type: {except_type}, " + f"value: {except_value}, trace back:\n" + f"{''.join(traceback.format_tb(except_traceback))}") + try: + should_terminate = get_should_terminate_main() + except: + except_type, except_value, except_traceback = sys.exc_info() + fit_logger.warning(f"check get_should_terminate_main error, error type: {except_type}, " + f"value: {except_value}, trace back:\n" + f"{''.join(traceback.format_tb(except_traceback))}") + if hb_exit: + fit_logger.warning("main process will exit due to heartbeat background job exited unexpectedly.") + break + if should_terminate: + # 详细原因已在 terminate_main 插件内部按条件分别打印,这里汇总打印一次 + fit_logger.info("main process will exit due to terminate-main condition matched.") + break time.sleep(1) fit_logger.info("main process terminated.") shutdown() From 987b923822128cb7035a18d9b8a6a6d673278a9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=AD=E6=BD=87?= <1576730710@qq.com> Date: Tue, 18 Nov 2025 15:13:08 +0800 Subject: [PATCH 4/5] =?UTF-8?q?[python]=20=E7=A7=BB=E9=99=A4=E6=97=A0?= =?UTF-8?q?=E7=94=A8=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fitframework/utils/process_monitor.py | 88 --------- .../conf/application.yml | 6 - .../fit_py_health_monitor/conf/plugin.yml | 7 - .../fit_py_health_monitor/health_monitor.py | 176 ------------------ 4 files changed, 277 deletions(-) delete mode 100644 framework/fit/python/fitframework/utils/process_monitor.py delete mode 100644 framework/fit/python/plugin/fit_py_health_monitor/conf/application.yml delete mode 100644 framework/fit/python/plugin/fit_py_health_monitor/conf/plugin.yml delete mode 100644 framework/fit/python/plugin/fit_py_health_monitor/health_monitor.py diff --git a/framework/fit/python/fitframework/utils/process_monitor.py b/framework/fit/python/fitframework/utils/process_monitor.py deleted file mode 100644 index bfb3c65a..00000000 --- a/framework/fit/python/fitframework/utils/process_monitor.py +++ /dev/null @@ -1,88 +0,0 @@ -# -- encoding: utf-8 -- -# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. -# This file is a part of the ModelEngine Project. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ====================================================================================================================== -""" -功 能:进程监控和健康检查工具 -""" -import time -from typing import Optional - -import psutil - -from fitframework.api.logging import fit_logger - - -class ProcessMonitor: - """进程监控器,用于监控主进程的健康状态""" - - def __init__(self, pid: int, max_memory_mb: int = 1024, max_cpu_percent: float = 80.0): - self.pid = pid - self.max_memory_mb = max_memory_mb - self.max_cpu_percent = max_cpu_percent - self.start_time = time.time() - self.last_check_time = time.time() - - def is_process_alive(self) -> bool: - """检查进程是否还活着""" - try: - return psutil.pid_exists(self.pid) - except Exception as e: - fit_logger.warning(f"Error checking process {self.pid}: {e}") - return False - - def get_process_info(self) -> Optional[dict]: - """获取进程信息""" - try: - if not self.is_process_alive(): - return None - - process = psutil.Process(self.pid) - return { - 'pid': self.pid, - 'memory_mb': process.memory_info().rss / 1024 / 1024, - 'cpu_percent': process.cpu_percent(), - 'status': process.status(), - 'create_time': process.create_time(), - 'num_threads': process.num_threads() - } - except Exception as e: - fit_logger.warning(f"Error getting process info for {self.pid}: {e}") - return None - - def is_healthy(self) -> bool: - """检查进程是否健康""" - info = self.get_process_info() - if not info: - return False - - # 检查内存使用 - if info['memory_mb'] > self.max_memory_mb: - fit_logger.warning(f"Process {self.pid} memory usage too high: {info['memory_mb']:.2f}MB > {self.max_memory_mb}MB") - return False - - # 检查CPU使用(需要两次采样) - cpu_percent = info['cpu_percent'] - if cpu_percent > self.max_cpu_percent: - fit_logger.warning(f"Process {self.pid} CPU usage too high: {cpu_percent:.2f}% > {self.max_cpu_percent}%") - return False - - return True - - def should_restart(self) -> bool: - """判断是否应该重启进程""" - if not self.is_process_alive(): - fit_logger.error(f"Process {self.pid} is not alive") - return True - - if not self.is_healthy(): - fit_logger.warning(f"Process {self.pid} is unhealthy") - return True - - return False - - -def create_process_monitor(pid: int) -> ProcessMonitor: - """创建进程监控器""" - return ProcessMonitor(pid) diff --git a/framework/fit/python/plugin/fit_py_health_monitor/conf/application.yml b/framework/fit/python/plugin/fit_py_health_monitor/conf/application.yml deleted file mode 100644 index 23c99c1b..00000000 --- a/framework/fit/python/plugin/fit_py_health_monitor/conf/application.yml +++ /dev/null @@ -1,6 +0,0 @@ -# 健康监控插件配置 -health-monitor: - check-interval: 30 # 健康检查间隔(秒) - memory-threshold: 80.0 # 内存使用率阈值(百分比) - cpu-threshold: 90.0 # CPU使用率阈值(百分比) - disk-threshold: 90.0 # 磁盘使用率阈值(百分比) diff --git a/framework/fit/python/plugin/fit_py_health_monitor/conf/plugin.yml b/framework/fit/python/plugin/fit_py_health_monitor/conf/plugin.yml deleted file mode 100644 index 1413fc98..00000000 --- a/framework/fit/python/plugin/fit_py_health_monitor/conf/plugin.yml +++ /dev/null @@ -1,7 +0,0 @@ -# 健康监控插件元数据 -name: fit_py_health_monitor -version: 1.0.0 -description: "系统健康监控插件" -author: "Fit Framework Team" -level: 1 -type: plugin diff --git a/framework/fit/python/plugin/fit_py_health_monitor/health_monitor.py b/framework/fit/python/plugin/fit_py_health_monitor/health_monitor.py deleted file mode 100644 index 2de96550..00000000 --- a/framework/fit/python/plugin/fit_py_health_monitor/health_monitor.py +++ /dev/null @@ -1,176 +0,0 @@ -# -- encoding: utf-8 -- -# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. -# This file is a part of the ModelEngine Project. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ====================================================================================================================== -""" -功 能:系统健康监控插件 -""" -import time -from typing import Dict, Any - -import psutil - -from fitframework.api.decorators import fitable, fit, value -from fitframework.api.decorators import register_event -from fitframework.api.enums import FrameworkEvent -from fitframework.api.logging import sys_plugin_logger - - -@value('health-monitor.check-interval', 30, converter=int) -def get_check_interval(): - """健康检查间隔(秒)""" - pass - - -@value('health-monitor.memory-threshold', 80.0, converter=float) -def get_memory_threshold(): - """内存使用率阈值(百分比)""" - pass - - -@value('health-monitor.cpu-threshold', 90.0, converter=float) -def get_cpu_threshold(): - """CPU使用率阈值(百分比)""" - pass - - -@value('health-monitor.disk-threshold', 90.0, converter=float) -def get_disk_threshold(): - """磁盘使用率阈值(百分比)""" - pass - - -class HealthMonitor: - """系统健康监控器""" - - def __init__(self): - self.last_check_time = time.time() - self.health_status = {} - self.alert_count = 0 - self.max_alerts = 10 - - def check_system_health(self) -> Dict[str, Any]: - """检查系统健康状态""" - try: - # 内存使用率 - memory = psutil.virtual_memory() - memory_percent = memory.percent - - # CPU使用率 - cpu_percent = psutil.cpu_percent(interval=1) - - # 磁盘使用率 - disk = psutil.disk_usage('/') - disk_percent = (disk.used / disk.total) * 100 - - # 进程信息 - current_process = psutil.Process() - process_memory = current_process.memory_info().rss / 1024 / 1024 # MB - process_cpu = current_process.cpu_percent() - - health_status = { - 'timestamp': time.time(), - 'memory': { - 'total': memory.total, - 'available': memory.available, - 'percent': memory_percent, - 'healthy': memory_percent < get_memory_threshold() - }, - 'cpu': { - 'percent': cpu_percent, - 'healthy': cpu_percent < get_cpu_threshold() - }, - 'disk': { - 'total': disk.total, - 'used': disk.used, - 'free': disk.free, - 'percent': disk_percent, - 'healthy': disk_percent < get_disk_threshold() - }, - 'process': { - 'memory_mb': process_memory, - 'cpu_percent': process_cpu, - 'pid': current_process.pid, - 'status': current_process.status() - } - } - - # 检查是否健康 - is_healthy = ( - health_status['memory']['healthy'] and - health_status['cpu']['healthy'] and - health_status['disk']['healthy'] - ) - - health_status['overall_healthy'] = is_healthy - - if not is_healthy: - self.alert_count += 1 - self._log_health_alert(health_status) - else: - self.alert_count = 0 # 重置告警计数 - - self.health_status = health_status - self.last_check_time = time.time() - - return health_status - - except Exception as e: - sys_plugin_logger.error(f"Error checking system health: {e}") - return {'error': str(e), 'overall_healthy': False} - - def _log_health_alert(self, health_status: Dict[str, Any]): - """记录健康告警""" - if self.alert_count > self.max_alerts: - return # 避免日志过多 - - alerts = [] - if not health_status['memory']['healthy']: - alerts.append(f"Memory usage: {health_status['memory']['percent']:.1f}%") - if not health_status['cpu']['healthy']: - alerts.append(f"CPU usage: {health_status['cpu']['percent']:.1f}%") - if not health_status['disk']['healthy']: - alerts.append(f"Disk usage: {health_status['disk']['percent']:.1f}%") - - if alerts: - sys_plugin_logger.warning(f"System health alert #{self.alert_count}: {', '.join(alerts)}") - - def get_health_summary(self) -> Dict[str, Any]: - """获取健康状态摘要""" - return { - 'last_check': self.last_check_time, - 'alert_count': self.alert_count, - 'status': self.health_status.get('overall_healthy', False), - 'details': self.health_status - } - - -# 全局健康监控器实例 -_health_monitor = HealthMonitor() - - -@fitable("modelengine.fit.health.check", "local-worker") -def check_health() -> Dict[str, Any]: - """执行健康检查""" - return _health_monitor.check_system_health() - - -@fit("modelengine.fit.health.status") -def get_health_status() -> Dict[str, Any]: - """获取健康状态""" - return _health_monitor.get_health_summary() - - -@register_event(FrameworkEvent.APPLICATION_STARTED) -def start_health_monitoring(): - """启动健康监控""" - sys_plugin_logger.info("Health monitoring started") - # 执行初始健康检查 - _health_monitor.check_system_health() - - -@register_event(FrameworkEvent.FRAMEWORK_STOPPING) -def stop_health_monitoring(): - """停止健康监控""" - sys_plugin_logger.info("Health monitoring stopped") From 3493a9f187be65c2c1df13501aa2f725ced01ee4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=AD=E6=BD=87?= <1576730710@qq.com> Date: Tue, 18 Nov 2025 15:41:09 +0800 Subject: [PATCH 5/5] =?UTF-8?q?[python]=20=E4=BF=AE=E6=94=B9=E6=A3=80?= =?UTF-8?q?=E8=A7=86=E6=84=8F=E8=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- framework/fit/python/fitframework/__init__.py | 72 ++++++++----------- .../fitframework/utils/restart_policy.py | 2 +- 2 files changed, 32 insertions(+), 42 deletions(-) diff --git a/framework/fit/python/fitframework/__init__.py b/framework/fit/python/fitframework/__init__.py index 2dd9b4d3..f19106ff 100644 --- a/framework/fit/python/fitframework/__init__.py +++ b/framework/fit/python/fitframework/__init__.py @@ -180,6 +180,20 @@ def heart_beat_exit_unexpectedly() -> bool: pass +def _safe_check(checker, desc: str) -> bool: + """ + 安全执行checker,异常时打印日志并返回False + """ + try: + return checker() + except: + except_type, except_value, except_traceback = sys.exc_info() + fit_logger.warning(f"check {desc} error, error type: {except_type}, " + f"value: {except_value}, trace back:\n" + f"{''.join(traceback.format_tb(except_traceback))}") + return False + + @shutdown_on_error @timer def main(): @@ -197,22 +211,8 @@ def main(): fit_logger.info("terminate main enabled.") while True: # 明确区分退出原因并打印日志 - hb_exit = False - should_terminate = False - try: - hb_exit = heart_beat_exit_unexpectedly() - except: - except_type, except_value, except_traceback = sys.exc_info() - fit_logger.warning(f"check heart_beat_exit_unexpectedly error, error type: {except_type}, " - f"value: {except_value}, trace back:\n" - f"{''.join(traceback.format_tb(except_traceback))}") - try: - should_terminate = get_should_terminate_main() - except: - except_type, except_value, except_traceback = sys.exc_info() - fit_logger.warning(f"check get_should_terminate_main error, error type: {except_type}, " - f"value: {except_value}, trace back:\n" - f"{''.join(traceback.format_tb(except_traceback))}") + hb_exit = _safe_check(heart_beat_exit_unexpectedly, "heart_beat_exit_unexpectedly") + should_terminate = _safe_check(get_should_terminate_main, "get_should_terminate_main") if hb_exit: fit_logger.warning("main process will exit due to heartbeat background job exited unexpectedly.") break @@ -235,37 +235,27 @@ def main(): fit_logger.info(f"Starting process manager with restart policy: {restart_policy.get_status()}") while True: + exit_code = None try: main_process = Process(target=main, name='MainProcess') main_process.start() fit_logger.info(f"Main process started with PID: {main_process.pid}") main_process.join() - - # 检查进程退出码 exit_code = main_process.exitcode - fit_logger.info(f"Main process exited with code: {exit_code}") - - # 使用重启策略判断是否应该重启 - if not restart_policy.should_restart(exit_code): - fit_logger.info("Restart policy indicates no restart needed, stopping") - break + except Exception as e: + fit_logger.error(f"Error during process management: {e}") + exit_code = -1 - # 获取重启延迟 - restart_delay = restart_policy.get_restart_delay() - status = restart_policy.get_status() + fit_logger.info(f"Main process exited with code: {exit_code}") + # 使用重启策略判断是否应该重启 + if not restart_policy.should_restart(exit_code): + fit_logger.info("Restart policy indicates no restart needed, stopping") + break - fit_logger.warning(f"Main process exited unexpectedly, restarting in {restart_delay:.2f} seconds... " - f"(attempt {status['current_attempt']}/{status['max_attempts']})") - time.sleep(restart_delay) + # 获取重启延迟 + restart_delay = restart_policy.get_restart_delay() + status = restart_policy.get_status() - except Exception as e: - fit_logger.error(f"Error during process management: {e}") - if not restart_policy.should_restart(-1): # 使用-1表示异常退出 - fit_logger.error("Restart policy indicates no restart needed due to errors, stopping") - break - - restart_delay = restart_policy.get_restart_delay() - status = restart_policy.get_status() - fit_logger.warning(f"Error occurred, restarting in {restart_delay:.2f} seconds... " - f"(attempt {status['current_attempt']}/{status['max_attempts']})") - time.sleep(restart_delay) + fit_logger.warning(f"Main process exited unexpectedly, restarting in {restart_delay:.2f} seconds... " + f"(attempt {status['current_attempt']}/{status['max_attempts']})") + time.sleep(restart_delay) diff --git a/framework/fit/python/fitframework/utils/restart_policy.py b/framework/fit/python/fitframework/utils/restart_policy.py index ccf5c465..cbe44192 100644 --- a/framework/fit/python/fitframework/utils/restart_policy.py +++ b/framework/fit/python/fitframework/utils/restart_policy.py @@ -1,5 +1,5 @@ # -- encoding: utf-8 -- -# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. # This file is a part of the ModelEngine Project. # Licensed under the MIT License. See License.txt in the project root for license information. # ======================================================================================================================