diff --git a/framework/fit/python/fitframework/__init__.py b/framework/fit/python/fitframework/__init__.py index 7146b326..f58ac5c6 100644 --- a/framework/fit/python/fitframework/__init__.py +++ b/framework/fit/python/fitframework/__init__.py @@ -180,13 +180,17 @@ def heart_beat_exit_unexpectedly() -> bool: pass -def determine_should_terminate_main() -> bool: +def _safe_check(checker, desc: str) -> bool: + """ + 安全执行checker,异常时打印日志并返回False + """ try: - return heart_beat_exit_unexpectedly() or get_should_terminate_main() + return checker() except: except_type, except_value, except_traceback = sys.exc_info() - fit_logger.warning(f"get should terminate main error, error type: {except_type}, value: {except_value}, " - f"trace back:\n{''.join(traceback.format_tb(except_traceback))}") + fit_logger.warning(f"check {desc} error, error type: {except_type}, " + f"value: {except_value}, trace back:\n" + f"{''.join(traceback.format_tb(except_traceback))}") return False @@ -205,7 +209,17 @@ def main(): fit_logger.info(f"fit framework is now available in version {_FIT_FRAMEWORK_VERSION}.") if get_terminate_main_enabled(): fit_logger.info("terminate main enabled.") - while not determine_should_terminate_main(): + while True: + # 明确区分退出原因并打印日志 + hb_exit = _safe_check(heart_beat_exit_unexpectedly, "heart_beat_exit_unexpectedly") + should_terminate = _safe_check(get_should_terminate_main, "get_should_terminate_main") + if hb_exit: + fit_logger.warning("main process will exit due to heartbeat background job exited unexpectedly.") + break + if should_terminate: + # 详细原因已在 terminate_main 插件内部按条件分别打印,这里汇总打印一次 + fit_logger.info("main process will exit due to terminate-main condition matched.") + break time.sleep(1) fit_logger.info("main process terminated.") shutdown() @@ -215,7 +229,33 @@ def main(): if platform.system() in ('Windows', 'Darwin'): # Windows 或 macOS main() else: # Linux 及其他 + from fitframework.utils.restart_policy import create_default_restart_policy + + restart_policy = 
create_default_restart_policy() + fit_logger.info(f"Starting process manager with restart policy: {restart_policy.get_status()}") + while True: - main_process = Process(target=main, name='MainProcess') - main_process.start() - main_process.join() + exit_code = None + try: + main_process = Process(target=main, name='MainProcess') + main_process.start() + fit_logger.info(f"Main process started with PID: {main_process.pid}") + main_process.join() + exit_code = main_process.exitcode + except Exception as e: + fit_logger.error(f"Error during process management: {e}") + exit_code = -1 + + fit_logger.info(f"Main process exited with code: {exit_code}") + # 使用重启策略判断是否应该重启 + if not restart_policy.should_restart(exit_code): + fit_logger.info("Restart policy indicates no restart needed, stopping") + break + + # 获取重启延迟 + restart_delay = restart_policy.get_restart_delay() + status = restart_policy.get_status() + + fit_logger.warning(f"Main process exited unexpectedly, restarting in {restart_delay:.2f} seconds... " + f"(attempt {status['current_attempt']}/{status['max_attempts']})") + time.sleep(restart_delay) diff --git a/framework/fit/python/fitframework/utils/restart_policy.py b/framework/fit/python/fitframework/utils/restart_policy.py new file mode 100644 index 00000000..cbe44192 --- /dev/null +++ b/framework/fit/python/fitframework/utils/restart_policy.py @@ -0,0 +1,107 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. 
class RestartPolicy:
    """Exponential-backoff policy deciding whether an exited process is restarted.

    Expected call sequence after every exit of the supervised process::

        if policy.should_restart(exit_code):
            time.sleep(policy.get_restart_delay())
            # ... relaunch the process ...

    The attempt counter and the delay are reset when the process exits with code 0, or when
    it fails only after having stayed up for at least ``stable_run_seconds``. Without the
    second rule the counter could never recover: exit code 0 also ends supervision, so a
    long-lived service that crashes occasionally would use up ``max_attempts`` over its
    lifetime and then never be restarted again.

    Args:
        max_attempts: Number of restarts allowed before ``should_restart`` gives up.
        base_delay: Delay in seconds used for the first restart.
        max_delay: Upper bound in seconds for any single delay.
        backoff_multiplier: Factor applied to the delay after each restart.
        reset_after_success: Whether a clean exit or a stable run resets the counters.
        stable_run_seconds: Minimum uptime in seconds after which a failed run still counts
            as "was healthy" and resets the counters (only when ``reset_after_success``).
    """

    def __init__(self,
                 max_attempts: int = 10,
                 base_delay: float = 5.0,
                 max_delay: float = 300.0,
                 backoff_multiplier: float = 1.5,
                 reset_after_success: bool = True,
                 stable_run_seconds: float = 600.0):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_multiplier = backoff_multiplier
        self.reset_after_success = reset_after_success
        self.stable_run_seconds = stable_run_seconds

        self.current_attempt = 0
        self.current_delay = base_delay
        self.last_success_time = time.time()
        # Monotonic timestamp at which the supervised process is assumed to have (re)started.
        # The first run is assumed to start when the policy is created.
        self._run_started_at = time.monotonic()

    def should_restart(self, exit_code: int) -> bool:
        """Return whether the process that just exited with ``exit_code`` should be restarted.

        A clean exit (code 0) is never restarted. Any other value, including ``None``, is
        treated as a failure and restarted until ``max_attempts`` restarts have been used.
        """
        # A clean exit is never restarted.
        if exit_code == 0:
            if self.reset_after_success:
                self._reset()
            return False

        # A failure after a long healthy run starts a fresh backoff cycle instead of
        # consuming the remaining attempts of an old one.
        if self.reset_after_success and self._ran_stably():
            self._reset()

        # Give up once the attempt budget is exhausted.
        if self.current_attempt >= self.max_attempts:
            fit_logger.error(f"Maximum restart attempts ({self.max_attempts}) reached")
            return False

        return True

    def get_restart_delay(self) -> float:
        """Consume one restart attempt and return the delay in seconds to wait before it.

        The returned value is never negative, so it is always safe to pass to ``time.sleep``.
        """
        # Clamp at zero: time.sleep() raises ValueError for a negative argument.
        delay = max(0.0, min(self.current_delay, self.max_delay))
        self.current_attempt += 1

        # Exponential backoff for the next attempt, capped at max_delay.
        if self.current_attempt < self.max_attempts:
            self.current_delay = min(self.current_delay * self.backoff_multiplier, self.max_delay)

        # The caller is expected to sleep for `delay` and relaunch right afterwards, so that
        # is when the next run is taken to begin.
        self._run_started_at = time.monotonic() + delay
        return delay

    def _ran_stably(self) -> bool:
        """Return whether the run that just ended lasted at least ``stable_run_seconds``."""
        return time.monotonic() - self._run_started_at >= self.stable_run_seconds

    def _reset(self):
        """Restore the attempt counter and the delay to their initial values."""
        self.current_attempt = 0
        self.current_delay = self.base_delay
        self.last_success_time = time.time()

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the configuration and the current backoff state."""
        return {
            'current_attempt': self.current_attempt,
            'max_attempts': self.max_attempts,
            'current_delay': self.current_delay,
            'base_delay': self.base_delay,
            'max_delay': self.max_delay,
            'last_success_time': self.last_success_time
        }
@register_event(FrameworkEvent.FRAMEWORK_STOPPING)
def _stop():
    """Cancel the module-level ``_timer`` when the FRAMEWORK_STOPPING event fires.

    When ``_timer`` is ``None`` there is nothing to cancel and only a warning is logged.
    """
    if _timer is None:
        fit_logger.warning("timer was not initialized, skip stopping")
        return

    _timer.cancel()
    fit_logger.info("timer stopped")
def _heart_beat_monitor(heart_beat_background_job):
    """Wait for the heartbeat background job to stop, then shut down unless already stopping.

    Args:
        heart_beat_background_job: Object exposing ``is_alive()``; presumably the process
            running the heartbeat task (confirm against the caller).
    """
    global _HEART_BEAT_EXIT_UNEXPECTEDLY

    # Poll once per second until the job is no longer alive.
    while True:
        if not heart_beat_background_job.is_alive():
            break
        time.sleep(1)

    # Decide under the lock whether this exit belongs to a graceful shutdown.
    with _SHUTDOWN_LOCK:
        if not _SHUTDOWN_IN_PROGRESS:
            # Not a graceful shutdown, so the exit is recorded as unexpected.
            _HEART_BEAT_EXIT_UNEXPECTEDLY = True
        else:
            # The job is expected to exit during a graceful shutdown; shutdown() is not
            # called a second time.
            sys_plugin_logger.info("heart beat job exited during graceful shutdown, no action needed.")
            return

    sys_plugin_logger.error("heart beat job is not alive, runtime should shutdown immediately.")
    # Delay to give the process restart mechanism some time before shutting down.
    time.sleep(2)
    shutdown()