Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions framework/fit/python/fitframework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,17 @@ def heart_beat_exit_unexpectedly() -> bool:
pass


# NOTE(review): this hunk appears to show both sides of a diff with the +/- markers and
# indentation stripped (two consecutive `def` lines, two `return` lines, two warning calls) —
# confirm which lines belong to the current version before relying on it.
def determine_should_terminate_main() -> bool:
def _safe_check(checker, desc: str) -> bool:
"""
Run ``checker`` safely: if it raises, log a warning and return False.
"""
try:
return heart_beat_exit_unexpectedly() or get_should_terminate_main()
return checker()
# Bare except: this also intercepts KeyboardInterrupt/SystemExit raised inside the try body.
except:
except_type, except_value, except_traceback = sys.exc_info()
fit_logger.warning(f"get should terminate main error, error type: {except_type}, value: {except_value}, "
f"trace back:\n{''.join(traceback.format_tb(except_traceback))}")
fit_logger.warning(f"check {desc} error, error type: {except_type}, "
f"value: {except_value}, trace back:\n"
f"{''.join(traceback.format_tb(except_traceback))}")
return False


Expand All @@ -205,7 +209,17 @@ def main():
fit_logger.info(f"fit framework is now available in version {_FIT_FRAMEWORK_VERSION}.")
if get_terminate_main_enabled():
fit_logger.info("terminate main enabled.")
while not determine_should_terminate_main():
while True:
# 明确区分退出原因并打印日志
hb_exit = _safe_check(heart_beat_exit_unexpectedly, "heart_beat_exit_unexpectedly")
should_terminate = _safe_check(get_should_terminate_main, "get_should_terminate_main")
if hb_exit:
fit_logger.warning("main process will exit due to heartbeat background job exited unexpectedly.")
break
if should_terminate:
# 详细原因已在 terminate_main 插件内部按条件分别打印,这里汇总打印一次
fit_logger.info("main process will exit due to terminate-main condition matched.")
break
time.sleep(1)
fit_logger.info("main process terminated.")
shutdown()
Expand All @@ -215,7 +229,33 @@ def main():
# On Windows/macOS run main() directly; elsewhere supervise it in a child process and restart
# it according to the restart policy.
if platform.system() in ('Windows', 'Darwin'): # Windows or macOS
main()
else: # Linux and others
from fitframework.utils.restart_policy import create_default_restart_policy

restart_policy = create_default_restart_policy()
fit_logger.info(f"Starting process manager with restart policy: {restart_policy.get_status()}")

while True:
# NOTE(review): the next three lines look like the removed side of the diff (the same
# start/join sequence is repeated inside the try below) — confirm.
main_process = Process(target=main, name='MainProcess')
main_process.start()
main_process.join()
exit_code = None
try:
main_process = Process(target=main, name='MainProcess')
main_process.start()
fit_logger.info(f"Main process started with PID: {main_process.pid}")
main_process.join()
exit_code = main_process.exitcode
except Exception as e:
fit_logger.error(f"Error during process management: {e}")
# -1 is used as the exit code when managing the child process itself failed
exit_code = -1

fit_logger.info(f"Main process exited with code: {exit_code}")
# Ask the restart policy whether the process should be restarted
if not restart_policy.should_restart(exit_code):
fit_logger.info("Restart policy indicates no restart needed, stopping")
break

# Get the restart delay (this call also advances the policy's attempt counter)
restart_delay = restart_policy.get_restart_delay()
status = restart_policy.get_status()

fit_logger.warning(f"Main process exited unexpectedly, restarting in {restart_delay:.2f} seconds... "
f"(attempt {status['current_attempt']}/{status['max_attempts']})")
time.sleep(restart_delay)
107 changes: 107 additions & 0 deletions framework/fit/python/fitframework/utils/restart_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# -- encoding: utf-8 --
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# This file is a part of the ModelEngine Project.
# Licensed under the MIT License. See License.txt in the project root for license information.
# ======================================================================================================================
"""
功 能:进程重启策略配置
"""
import time
from typing import Dict, Any
from fitframework.api.logging import fit_logger


class RestartPolicy:
    """Process restart policy: a bounded number of attempts with capped exponential backoff.

    Attributes:
        max_attempts: Number of restarts allowed before ``should_restart`` refuses.
        base_delay: Delay in seconds used for the first restart.
        max_delay: Upper bound in seconds for any restart delay.
        backoff_multiplier: Factor applied to the delay after each restart.
        reset_after_success: Whether an exit code of 0 resets the counters.
        current_attempt: Number of delays handed out by ``get_restart_delay`` so far.
        current_delay: Delay the next ``get_restart_delay`` call will return.
        last_success_time: ``time.time()`` at construction or at the last reset.
    """

    def __init__(self,
                 max_attempts: int = 10,
                 base_delay: float = 5.0,
                 max_delay: float = 300.0,
                 backoff_multiplier: float = 1.5,
                 reset_after_success: bool = True):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_multiplier = backoff_multiplier
        self.reset_after_success = reset_after_success

        self.current_attempt = 0
        # Keep the reported delay consistent with what get_restart_delay() will really return.
        self.current_delay = self._bounded(base_delay)
        self.last_success_time = time.time()

    def _bounded(self, delay: float) -> float:
        """Clamp ``delay`` into ``[0, max_delay]`` so it is always a valid sleep duration."""
        return max(0.0, min(delay, self.max_delay))

    def should_restart(self, exit_code: int) -> bool:
        """Decide whether the process should be restarted.

        Args:
            exit_code: Exit code of the finished process. Any value other than 0
                (including ``None``) is treated as an abnormal exit.

        Returns:
            False for a normal exit (code 0) or once ``max_attempts`` is reached,
            True otherwise.
        """
        # A normal exit never triggers a restart.
        if exit_code == 0:
            if self.reset_after_success:
                self._reset()
            return False

        # Give up once the attempt budget is exhausted.
        if self.current_attempt >= self.max_attempts:
            fit_logger.error(f"Maximum restart attempts ({self.max_attempts}) reached")
            return False

        return True

    def get_restart_delay(self) -> float:
        """Return the delay in seconds for the next restart and advance the backoff state.

        Each call counts as one attempt. The returned value is never negative and
        never larger than ``max_delay``.
        """
        delay = self._bounded(self.current_delay)
        self.current_attempt += 1

        # Exponential backoff; no need to grow the delay after the final attempt.
        if self.current_attempt < self.max_attempts:
            self.current_delay = self._bounded(self.current_delay * self.backoff_multiplier)

        return delay

    def _reset(self):
        """Reset the attempt counter and delay, and record the current time."""
        self.current_attempt = 0
        self.current_delay = self._bounded(self.base_delay)
        self.last_success_time = time.time()

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the policy's configuration and current state."""
        return {
            'current_attempt': self.current_attempt,
            'max_attempts': self.max_attempts,
            'current_delay': self.current_delay,
            'base_delay': self.base_delay,
            'max_delay': self.max_delay,
            'last_success_time': self.last_success_time
        }


def create_default_restart_policy() -> RestartPolicy:
    """Build the default policy: 10 attempts, 5 s initial delay, x1.5 backoff, 300 s cap."""
    settings = {
        'max_attempts': 10,
        'base_delay': 5.0,
        'max_delay': 300.0,
        'backoff_multiplier': 1.5,
        'reset_after_success': True,
    }
    return RestartPolicy(**settings)


def create_aggressive_restart_policy() -> RestartPolicy:
    """Build the aggressive (fast-restart) policy: 20 attempts, 2 s initial delay, x1.2 backoff, 60 s cap."""
    settings = {
        'max_attempts': 20,
        'base_delay': 2.0,
        'max_delay': 60.0,
        'backoff_multiplier': 1.2,
        'reset_after_success': True,
    }
    return RestartPolicy(**settings)


def create_conservative_restart_policy() -> RestartPolicy:
    """Build the conservative (slow-restart) policy: 5 attempts, 10 s initial delay, x2.0 backoff, 600 s cap."""
    settings = {
        'max_attempts': 5,
        'base_delay': 10.0,
        'max_delay': 600.0,
        'backoff_multiplier': 2.0,
        'reset_after_success': True,
    }
    return RestartPolicy(**settings)
7 changes: 5 additions & 2 deletions framework/fit/python/fitframework/utils/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ def _start():
@register_event(FrameworkEvent.FRAMEWORK_STOPPING)
def _stop():
"""Cancel the module-level timer when the FRAMEWORK_STOPPING event fires."""
# NOTE(review): this hunk appears to interleave the removed lines (unconditional cancel + log)
# with the added lines (None-guarded cancel); indentation and +/- markers are lost — confirm.
global _timer
_timer.cancel()
fit_logger.info("timer stopped")
if _timer is not None:
_timer.cancel()
fit_logger.info("timer stopped")
else:
fit_logger.warning("timer was not initialized, skip stopping")


class _Timer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import traceback
from multiprocessing import Process
from queue import Empty
from threading import Thread
from threading import Thread, Lock
from typing import List

from fitframework import const
Expand All @@ -36,6 +36,10 @@
_LAST_HEART_BEAT_SUCCESS_TIME = time.time()
# Whether the heartbeat process exited unexpectedly
_HEART_BEAT_EXIT_UNEXPECTEDLY = False

# Whether a shutdown is in progress (used to tell a graceful shutdown from an unexpected exit)
_SHUTDOWN_IN_PROGRESS = False
_SHUTDOWN_LOCK = Lock()
# Time of the last service registration; used to avoid frequent re-registration overwriting hot-loaded services
_LAST_REGISTRY_TIME = 0

Expand Down Expand Up @@ -118,13 +122,18 @@ def _try_heart_beat_once():

sys_plugin_logger.debug(f'heart beating success.')
_LAST_HEART_BEAT_SUCCESS_TIME = heart_beat_finish_time
except:
except Exception as e:
_FAIL_COUNT += 1
except_type, except_value, except_traceback = sys.exc_info()
sys_plugin_logger.warning(f"heart beat failed. [fail_count={_FAIL_COUNT}]")
sys_plugin_logger.warning(f"heart beat error type: {except_type}, value: {except_value}, "
f"trace back:\n{''.join(traceback.format_tb(except_traceback))}")

# 如果连续失败次数过多,考虑重启心跳任务
if _FAIL_COUNT >= 5:
sys_plugin_logger.error(f"heart beat failed too many times ({_FAIL_COUNT}), considering restart")
# 这里可以添加重启心跳任务的逻辑


def _heart_beat_task(queue: multiprocessing.Queue):
while True:
Expand All @@ -143,9 +152,18 @@ def _heart_beat_task(queue: multiprocessing.Queue):
def _heart_beat_monitor(heart_beat_background_job):
# Poll once per second until the heartbeat background job is no longer alive, then decide
# whether that exit happened during a graceful shutdown or was unexpected.
# NOTE(review): this hunk appears to interleave removed and added diff lines (two `global`
# statements, two assignments of the flag), and indentation is lost, so the extent of the
# `with _SHUTDOWN_LOCK:` body cannot be determined from here — confirm.
while heart_beat_background_job.is_alive():
time.sleep(1)
global _HEART_BEAT_EXIT_UNEXPECTEDLY
_HEART_BEAT_EXIT_UNEXPECTEDLY = True
global _HEART_BEAT_EXIT_UNEXPECTEDLY, _SHUTDOWN_IN_PROGRESS, _SHUTDOWN_LOCK
# Check whether a shutdown is in progress
with _SHUTDOWN_LOCK:
if _SHUTDOWN_IN_PROGRESS:
# During a graceful shutdown the heartbeat job exiting is expected; no need to call shutdown again
sys_plugin_logger.info("heart beat job exited during graceful shutdown, no action needed.")
return
# Not a graceful shutdown, so treat it as an unexpected exit
_HEART_BEAT_EXIT_UNEXPECTEDLY = True
sys_plugin_logger.error("heart beat job is not alive, runtime should shutdown immediately.")
# Add a delay to give the process restart mechanism some time
time.sleep(2)
shutdown()


Expand All @@ -168,7 +186,11 @@ def online() -> None:
@register_event(Fit_Event.FRAMEWORK_STOPPING)
def offline():
""" Before the runtime shuts down it should proactively request offline from the heartbeat agent; the agent stops sending heartbeats and calls the heartbeat server's leave interface. """
global _SHUTDOWN_IN_PROGRESS, _SHUTDOWN_LOCK
sys_plugin_logger.info("heart beat agent offline")
# Set the shutdown flag to mark that a graceful shutdown is in progress
with _SHUTDOWN_LOCK:
_SHUTDOWN_IN_PROGRESS = True
# NOTE(review): None is presumably the stop sentinel consumed by the heartbeat task — confirm.
_HEART_BEAT_FINISH_QUEUE.put(None)


Expand Down