diff --git a/framework/fit/python/bootstrap/fit_py_service_db/service_db.py b/framework/fit/python/bootstrap/fit_py_service_db/service_db.py index f3110ecb..ce7d7500 100644 --- a/framework/fit/python/bootstrap/fit_py_service_db/service_db.py +++ b/framework/fit/python/bootstrap/fit_py_service_db/service_db.py @@ -127,7 +127,7 @@ def register_all_fit_services() -> None: fitable_aliases_infos = reduce(list.__add__, list(_plugin_fitable_dict.values())) local_fitable_aliases_infos = [] for fitable_aliases_info in fitable_aliases_infos: - if not _local_only_invoke(fitable_aliases_info.fitable.genericable_id): + if not _local_only_invoke(fitable_aliases_info.fitable.genericableId): local_fitable_aliases_infos.append(fitable_aliases_info) online_fit_services(local_fitable_aliases_infos) except FitException: diff --git a/framework/fit/python/conf/application.yml b/framework/fit/python/conf/application.yml index 1e581840..446d1fb8 100644 --- a/framework/fit/python/conf/application.yml +++ b/framework/fit/python/conf/application.yml @@ -17,4 +17,24 @@ debug-console: true terminate-main: enabled: false local_ip: "localhost" -context-path: "" \ No newline at end of file +context-path: "" +http: + server: + enabled: true + address: + use-random-port: false + port: 9666 + port-to-register: + protocol: 2 + formats: + - 1 + - 2 +registry-center: + server: + mode: 'DIRECT' # DIRECT 表示直连,直接连接内存注册中心;PROXY 表示代理模式,通过本地代理服务连接 Nacos 注册中心 + addresses: + - "localhost:8080" + protocol: 2 + formats: + - 1 + context-path: "" \ No newline at end of file diff --git a/framework/fit/python/conf/fit.yml b/framework/fit/python/conf/fit.yml index 51f86858..64155ee8 100644 --- a/framework/fit/python/conf/fit.yml +++ b/framework/fit/python/conf/fit.yml @@ -155,11 +155,17 @@ fit.public.genericables.2ac926e6e40245b78b7bdda23bcb727b: route: default: "ONLINE_FIT_SERVICE_FITABLE_ID" fit.public.genericables.modelengine.fit.registry.registry-service.query-running-fitables: - name: "QUERY_FITABLE_METAS_GEN_ID" + name: "query_fitable_metas_gen_id" tags: - "nonTraceable" route: default: "query-running-fitables" +fit.public.genericables.modelengine.fit.registry.registry-service.register-fitables: + name: 'register_fitables_gen_id' + tags: + - 'nonTraceable' + route: + default: 'register-fitables' fit.public.genericables.GET_FITABLES_OF_GENERICABLE_GEN_ID: name: "get_fitables_of_genericable" tags: @@ -499,4 +505,4 @@ fit.public.genericables.modelengine.fit.get.earliest.start.time: default: "local-worker" tags: - "localOnly" - - "nonTraceable" + - "nonTraceable" \ No newline at end of file diff --git a/framework/fit/python/fit_common_struct/core.py b/framework/fit/python/fit_common_struct/core.py index 5b193c3c..c20b1e03 100644 --- a/framework/fit/python/fit_common_struct/core.py +++ b/framework/fit/python/fit_common_struct/core.py @@ -10,9 +10,9 @@ class Genericable(object): - def __init__(self, genericable_id: str, genericable_version: str): - self.genericable_id = genericable_id - self.genericable_version = genericable_version + def __init__(self, genericableId: str, genericableVersion: str): + self.genericableId = genericableId + self.genericableVersion = genericableVersion def __eq__(self, other): if not isinstance(other, self.__class__): @@ -28,11 +28,11 @@ def __repr__(self): class Fitable(object): - def __init__(self, genericable_id: str, genericable_version: str, fitable_id: str, fitable_version: str): - self.genericable_id = genericable_id - self.genericable_version = genericable_version - self.fitable_id = fitable_id - self.fitable_version = fitable_version + def __init__(self, genericableId: str, genericableVersion: str, fitableId: str, fitableVersion: str): + self.genericableId = genericableId + self.genericableVersion = genericableVersion + self.fitableId = fitableId + self.fitableVersion = fitableVersion def __eq__(self, other): if not isinstance(other, self.__class__): diff --git a/framework/fit/python/plugin/fit_py_registry_client/entity.py b/framework/fit/python/fit_common_struct/entity.py similarity index 79% rename from framework/fit/python/plugin/fit_py_registry_client/entity.py rename to framework/fit/python/fit_common_struct/entity.py index ffffa438..fbb90efc 100644 --- a/framework/fit/python/plugin/fit_py_registry_client/entity.py +++ b/framework/fit/python/fit_common_struct/entity.py @@ -10,49 +10,25 @@ from numpy import int32 from fit_common_struct.core import Address as AddressInner +from fit_common_struct.core import Fitable -class FitableInfo(object): - - def __init__(self, genericableId: str, genericableVersion: str, fitableId: str, fitableVersion: str): - self.genericableId = genericableId - self.genericableVersion = genericableVersion - self.fitableId = fitableId - self.fitableVersion = fitableVersion - - def __eq__(self, other): - if not isinstance(other, self.__class__): - return False - return self.__dict__ == other.__dict__ - - def __hash__(self): - return hash(tuple(self.__dict__.values())) - - def __repr__(self): - return str(tuple(self.__dict__.values())) - - -class GenericableInfo: - - def __init__(self, genericableId: str, genericableVersion: str): - self.genericableId = genericableId - self.genericableVersion = genericableVersion - - def __eq__(self, other): - if not isinstance(other, self.__class__): - return False - return self.__dict__ == other.__dict__ - - def __hash__(self): - return hash(tuple(self.__dict__.values())) - - def __repr__(self): - return str(tuple(self.__dict__.values())) +def safe_hash_dict(obj_dict): + """安全地计算包含列表的字典的哈希值""" + hashable_values = [] + for value in obj_dict.values(): + if isinstance(value, list): + hashable_values.append(tuple(value)) + elif isinstance(value, dict): + hashable_values.append(tuple(sorted(value.items()))) + else: + hashable_values.append(value) + return hash(tuple(hashable_values)) class FitableMeta(object): - def __init__(self, fitable: FitableInfo, aliases: List[str], formats: List[int32]): + def __init__(self, fitable: Fitable, aliases: List[str], formats: List[int32]): self.fitable = fitable self.aliases = aliases @@ -68,7 +44,8 @@ def __eq__(self, other): return self.__dict__ == other.__dict__ def __hash__(self): - return hash(tuple(self.__dict__.values())) + # 使用安全的哈希函数处理包含列表的对象 + return safe_hash_dict(self.__dict__) def __repr__(self): return str(tuple(self.__dict__.values())) @@ -131,7 +108,7 @@ def __eq__(self, other): return self.__dict__ == other.__dict__ def __hash__(self): - return hash(tuple(self.__dict__.values())) + return safe_hash_dict(self.__dict__) def __repr__(self): return str(tuple(self.__dict__.values())) @@ -154,7 +131,7 @@ def __eq__(self, other): return self.__dict__ == other.__dict__ def __hash__(self): - return hash(tuple(self.__dict__.values())) + return safe_hash_dict(self.__dict__) def __repr__(self): return str(tuple(self.__dict__.values())) @@ -178,7 +155,7 @@ def __eq__(self, other): return self.__dict__ == other.__dict__ def __hash__(self): - return hash(tuple(self.__dict__.values())) + return safe_hash_dict(self.__dict__) def __repr__(self): return str(tuple(self.__dict__.values())) @@ -186,7 +163,7 @@ def __repr__(self): class FitableAddressInstance(object): - def __init__(self, applicationInstances: List[ApplicationInstance], fitable: FitableInfo): + def __init__(self, applicationInstances: List[ApplicationInstance], fitable: Fitable): self.applicationInstances = applicationInstances self.fitable = fitable @@ -196,7 +173,7 @@ def __eq__(self, other): return self.__dict__ == other.__dict__ def __hash__(self): - return hash(tuple(self.__dict__.values())) + return safe_hash_dict(self.__dict__) def __repr__(self): return str(tuple(self.__dict__.values())) @@ -214,7 +191,41 @@ def __eq__(self, other): return self.__dict__ == other.__dict__ def __hash__(self): - return hash(tuple(self.__dict__.values())) + return safe_hash_dict(self.__dict__) + + def __repr__(self): + return str(tuple(self.__dict__.values())) + +class HeartBeatInfo(object): + + def __init__(self, sceneType: str, aliveTime: int, initDelay: int): + self.sceneType: str = sceneType + self.aliveTime: int = aliveTime + self.initDelay: int = initDelay + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + return self.__dict__ == other.__dict__ + + def __hash__(self): + return safe_hash_dict(self.__dict__) + + def __repr__(self): + return str(tuple(self.__dict__.values())) + + +class HeartBeatAddress(object): + def __init__(self, id_: str): + self.id = id_ + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + return self.__dict__ == other.__dict__ + + def __hash__(self): + return safe_hash_dict(self.__dict__) def __repr__(self): return str(tuple(self.__dict__.values())) diff --git a/framework/fit/python/fitframework/const.py b/framework/fit/python/fitframework/const.py index 766edf53..645f4098 100644 --- a/framework/fit/python/fitframework/const.py +++ b/framework/fit/python/fitframework/const.py @@ -231,12 +231,23 @@ # registry server QUERY_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.query-fitables-addresses' +QUERY_FIT_SERVICE_FIT_ID = 'query-fitables-addresses' SUBSCRIBE_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.subscribe-fitables' +SUBSCRIBE_FIT_SERVICE_FIT_ID = 'subscribe-fitables' +UNSUBSCRIBE_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.unsubscribe-fitables' +UNSUBSCRIBE_FIT_SERVICE_FIT_ID = 'unsubscribe-fitables' REGISTER_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.register-fitables' +REGISTER_FIT_SERVICE_FIT_ID = 'register-fitables' +UNREGISTER_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.unregister-fitables' +UNREGISTER_FIT_SERVICE_FIT_ID = 'unregister-fitables' QUERY_FITABLE_METAS_GEN_ID = 'modelengine.fit.registry.registry-service.query-running-fitables' +QUERY_FITABLE_METAS_FIT_ID = 'query-running-fitables' # heartbeat server -HEART_BEAT_GEN_ID = 'modelengine.fit.heartbeat.send-heartbeat' +SEND_HEART_BEAT_GEN_ID = 'modelengine.fit.heartbeat.send-heartbeat' +SEND_HEART_BEAT_FIT_ID = 'send-heartbeat' +STOP_HEART_BEAT_GEN_ID = 'modelengine.fit.heartbeat.stop-heartbeat' +STOP_HEART_BEAT_FIT_ID = 'stop-heartbeat' # debugger DEBUGGER_START_FIT_ID = 'debugger_start_fitable_id' diff --git a/framework/fit/python/fitframework/core/broker/configure_based_brokerimpl.py b/framework/fit/python/fitframework/core/broker/configure_based_brokerimpl.py index b42ce9fd..67da2b20 100644 --- a/framework/fit/python/fitframework/core/broker/configure_based_brokerimpl.py +++ b/framework/fit/python/fitframework/core/broker/configure_based_brokerimpl.py @@ -317,7 +317,7 @@ def default_load_balancing(self, generic_id, fitable_id, fitable: Fitable): f"addresses count: {len(addresses)}") if len(addresses) == 0: fit_logger.warning(f"cannot get any address can use in this worker. " - f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]") + f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]") return None # no choice! if len(addresses) == 1: @@ -339,13 +339,13 @@ def get_fit_service_addresses(self, fitable: Fitable) -> List[Address]: addresses: List[Address] = _get_fit_service_address_with_priorities(fitable) if not addresses: fit_logger.warning(f"cannot get any endpoint after checking format and protocol. " - f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]") + f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]") return [] addresses: List[Address] = _load_balance_env_filtering(addresses) if not addresses: fit_logger.warning(f"cannot get any endpoint after filtering by environment. " - f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]") + f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]") return [] return addresses @@ -398,7 +398,7 @@ def custom_load_balancing(self, address_filter: Callable[[Address], bool], addresses: List[Address] = self.get_fit_service_addresses(fitable) if len(addresses) == 0: fit_logger.warning(f"cannot get any address can use in this worker. " - f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]") + f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]") return None try: addresses = [address for address in addresses if address_filter(address)] @@ -407,15 +407,15 @@ def custom_load_balancing(self, address_filter: Callable[[Address], bool], return None if not addresses: fit_logger.warning(f"cannot get any address after custom load balancing. " - f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]") + f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]") return None if len(addresses) > 1: fit_logger.warning(f"get more than one address after custom load balancing. " - f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]") + f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]") return addresses[0] if addresses[0].id == _worker_id(): - return service_repo.get_fitable_ref(fitable.genericable_id, fitable.fitable_id) + return service_repo.get_fitable_ref(fitable.genericableId, fitable.fitableId) return addresses[0] @@ -466,9 +466,9 @@ def lb_call_template(fitable_info: Fitable, target_addresses: List[Address]) -> pass args = fitable, addresses - lb_fitable_id = get_fit_ffp_fitable_id(fitable.genericable_id, 'load_balance') + lb_fitable_id = get_fit_ffp_fitable_id(fitable.genericableId, 'load_balance') if lb_fitable_id: - fit_invoke_info = (fitable.genericable_id, lb_fitable_id, lb_call_template) + fit_invoke_info = (fitable.genericableId, lb_fitable_id, lb_call_template) return _ffp_invoke(fit_invoke_info, False, None, None, *args) else: fit_invoke_info = (const.LOAD_BALANCING_GEN_ID, const.LOAD_BALANCING_RANDOM_FIT_ID, lb_call_template) @@ -591,7 +591,7 @@ def _get_fit_service_address_with_priorities(fitable: Fitable) -> List[Address]: def _get_fit_service_address_and_convert(fitable: Fitable) -> List[Address]: addresses: List[Address] = get_fit_service_address_list(fitable) - fit_logger.debug(f"got address, gid: {fitable.genericable_id}, count: {len(addresses)}") + fit_logger.debug(f"got address, gid: {fitable.genericableId}, count: {len(addresses)}") return addresses diff --git a/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py b/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py index 2fd79eb7..cd414704 100644 --- a/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py +++ b/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py @@ -26,7 +26,7 @@ from fitframework.api.decorators import register_event from fitframework.api.enums import FrameworkEvent as Fit_Event from fitframework.api.logging import sys_plugin_logger -from .heart_beat_utils import HeartBeatAddress, HeartBeatInfo +from fit_common_struct.entity import HeartBeatInfo, HeartBeatAddress # 用于控制心跳任务退出的队列 _HEART_BEAT_FINISH_QUEUE = multiprocessing.Queue() @@ -71,7 +71,7 @@ def get_runtime_worker_id() -> str: pass -@fit(const.HEART_BEAT_GEN_ID) +@fit(const.SEND_HEART_BEAT_GEN_ID) def heartbeat(beat_info: List[HeartBeatInfo], address: HeartBeatAddress) -> bool: """ 可能返回 false,也可能抛出异常,也可能超时 """ pass diff --git a/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_utils.py b/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_utils.py index 8da66bee..be3c2e46 100644 --- a/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_utils.py +++ b/framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_utils.py @@ -12,18 +12,6 @@ from fitframework.api.logging import sys_plugin_logger -class HeartBeatInfo: - def __init__(self, sceneType: str, aliveTime: int, initDelay: int): - self.sceneType: str = sceneType - self.aliveTime: int = aliveTime - self.initDelay: int = initDelay - - -class HeartBeatAddress: - def __init__(self, id_: str): - self.id = id_ - - def timeout_or_exception_retry(timeout: int = 3, a_exception=Exception, max_retry: int = 1): """ diff --git a/framework/fit/python/plugin/fit_py_http_client/conf/application.yml b/framework/fit/python/plugin/fit_py_http_client/conf/application.yml index 783c26c3..e6baccab 100644 --- a/framework/fit/python/plugin/fit_py_http_client/conf/application.yml +++ b/framework/fit/python/plugin/fit_py_http_client/conf/application.yml @@ -8,4 +8,4 @@ https: key_path: "plugin/fit_py_http_client/resources/global.key" key_file_encrypted: false # 私钥是否被加密,仅当 cert_enabled 为 true 时有意义 key_file_password: "" # 私钥的密码,仅当 cert_enabled 和 key_file_encrypted 为 true 时有意义 - key_file_password_encrypted: false # 私钥的密码是否被加密,仅当 cert_enabled 和 key_file_encrypted 为 true 时有意义 + key_file_password_encrypted: false # 私钥的密码是否被加密,仅当 cert_enabled 和 key_file_encrypted 为 true 时有意义 \ No newline at end of file diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/conf/application.yml b/framework/fit/python/plugin/fit_py_nacos_registry/conf/application.yml new file mode 100644 index 00000000..902e5c97 --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/conf/application.yml @@ -0,0 +1,3 @@ +nacos: + async: + timeout: 30 \ No newline at end of file diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/conf/info.yml b/framework/fit/python/plugin/fit_py_nacos_registry/conf/info.yml new file mode 100644 index 00000000..f3899853 --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/conf/info.yml @@ -0,0 +1,2 @@ +category: "system" +level: 4 \ No newline at end of file diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/heartbeat/heartbeat_service.py b/framework/fit/python/plugin/fit_py_nacos_registry/heartbeat/heartbeat_service.py new file mode 100644 index 00000000..7cd2351b --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/heartbeat/heartbeat_service.py @@ -0,0 +1,32 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +功 能:服务上线相关功能。 +""" + +from typing import List +from fitframework import fitable, const +from fit_common_struct.entity import HeartBeatInfo, HeartBeatAddress + +@fitable(const.SEND_HEART_BEAT_GEN_ID, const.SEND_HEART_BEAT_FIT_ID) +def send_heartbeat(heartbeatInfo: List[HeartBeatInfo], address: HeartBeatAddress) -> bool: + """ + 发送心跳信息。 + + @param heartbeatInfo: 表示待停止心跳信息列表 + @param address: 表示待停止心跳信息列表。 + """ + return True + +@fitable(const.STOP_HEART_BEAT_GEN_ID, const.STOP_HEART_BEAT_FIT_ID) +def stop_Heartbeat(heartbeatInfo: List[HeartBeatInfo], address: HeartBeatAddress) -> bool: + """ + 发送停止心跳信息。 + + @param heartbeatInfo: 表示待停止心跳信息列表。 + @param address: 表示待停止心跳信息列表。 + """ + return True \ No newline at end of file diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/service/__init__.py b/framework/fit/python/plugin/fit_py_nacos_registry/service/__init__.py new file mode 100644 index 00000000..6f4ff757 --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/service/__init__.py @@ -0,0 +1,26 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +Nacos registry service package. +""" + +from .nacos_registry_server import ( + register_fitables, + unregister_fitables, + query_fitable_addresses, + subscribe_fit_service, + unsubscribe_fitables, + query_fitable_metas +) + +__all__ = [ + 'register_fitables', + 'unregister_fitables', + 'query_fitable_addresses', + 'subscribe_fit_service', + 'unsubscribe_fitables', + 'query_fitable_metas' +] diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/service/async_executor.py b/framework/fit/python/plugin/fit_py_nacos_registry/service/async_executor.py new file mode 100644 index 00000000..5bee3a4d --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/service/async_executor.py @@ -0,0 +1,220 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +Async executor for Nacos operations. + +This module provides an async executor for handling Nacos operations +in a background thread with proper event loop management. +""" +import asyncio +import atexit +import threading +from concurrent.futures import Future +from fitframework import value + +from v2.nacos import NacosNamingService, RegisterInstanceParam, ListInstanceParam, \ + DeregisterInstanceParam, SubscribeServiceParam, ListServiceParam + +from fitframework.api.logging import sys_plugin_logger +from .config import build_nacos_config +@value("nacos.async.timeout",default_value=10, converter=int) +def get_nacos_async_timeout(): + pass + +class AsyncExecutor: + """Executor for handling asynchronous operations in a background thread.""" + + def __init__(self): + self._loop = None + self._thread = None + self._started = False + self._shutdown = False + self._nacos_client = None + self._init_complete = threading.Event() + + def start(self): + """Start the background event loop thread.""" + if self._started: + return + + self._thread = threading.Thread( + target=self._run_event_loop, + daemon=True, + name="NacosAsyncThread" + ) + self._thread.start() + + # Wait for initialization to complete + if not self._init_complete.wait(timeout=10): # Max wait 10 seconds + raise RuntimeError("Failed to initialize async executor within timeout") + + self._started = True + + def _run_event_loop(self): + """Run the event loop in the background thread.""" + try: + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + # Create Nacos client in this event loop + async def init_nacos_client(): + try: + config = build_nacos_config() + self._nacos_client = await NacosNamingService.create_naming_service(config) + sys_plugin_logger.info("Nacos client initialized successfully") + except Exception as e: + sys_plugin_logger.error(f"Failed to initialize Nacos client: {e}") + raise + finally: + # Mark initialization complete + self._init_complete.set() + + self._loop.run_until_complete(init_nacos_client()) + + # Run event loop until shutdown + self._loop.run_forever() + except Exception as e: + sys_plugin_logger.error(f"Error in async executor event loop: {e}") + self._init_complete.set() # Set even on failure to avoid infinite wait + finally: + try: + if self._nacos_client: + # Try to close the client if it has a close method + try: + self._nacos_client.shutdown() + sys_plugin_logger.info("Nacos client cleaned up") + except Exception as cleanup_error: + sys_plugin_logger.error(f"Error cleaning up Nacos client: {cleanup_error}") + + if self._loop and not self._loop.is_closed(): + self._loop.close() + except Exception as e: + sys_plugin_logger.error(f"Error during cleanup: {e}") + + def run_coroutine(self, coro): + """ + Run a coroutine in the background event loop and return the result. + + Args: + coro: The coroutine to run. + + Returns: + The result of the coroutine. + + Raises: + RuntimeError: If the executor is not properly initialized. + """ + if not self._started: + self.start() + + if self._loop is None or self._nacos_client is None: + raise RuntimeError("Async executor not properly initialized") + + # Create a Future to get the result + result_future = Future() + + async def wrapped_coro(): + try: + result = await coro + result_future.set_result(result) + except Exception as e: + result_future.set_exception(e) + + # Schedule the coroutine in the event loop + self._loop.call_soon_threadsafe(asyncio.create_task, wrapped_coro()) + + # Wait for result + return result_future.result(timeout=get_nacos_async_timeout()) + + def get_nacos_client(self): + """ + Get the Nacos client instance. + + Returns: + The Nacos client instance. + """ + if not self._started: + self.start() + return self._nacos_client + + def shutdown(self): + """Shutdown the async executor.""" + if self._loop and not self._loop.is_closed(): + self._loop.call_soon_threadsafe(self._loop.stop) + self._shutdown = True + + +# Global async executor +_async_executor = AsyncExecutor() + + +def run_async_safely(coro): + """ + Run an async operation safely using the dedicated executor. + + Args: + coro: The coroutine to run. + + Returns: + The result of the coroutine. + + Raises: + Exception: If the async operation fails. + """ + try: + return _async_executor.run_coroutine(coro) + except Exception as e: + sys_plugin_logger.error(f"Error running async operation: {e}") + raise + + +# Async wrapper functions +async def call_list_instances(param: ListInstanceParam): + """List instances.""" + client = _async_executor.get_nacos_client() + return await client.list_instances(param) + + +async def call_deregister_instance(param: DeregisterInstanceParam) -> bool: + """Deregister instance.""" + client = _async_executor.get_nacos_client() + return await client.deregister_instance(param) + + +async def call_subscribe(param: SubscribeServiceParam) -> None: + """Subscribe to service.""" + client = _async_executor.get_nacos_client() + await client.subscribe(param) + + +async def call_unsubscribe(param: SubscribeServiceParam) -> None: + """Unsubscribe from service.""" + client = _async_executor.get_nacos_client() + await client.unsubscribe(param) + + +async def call_list_services(param: ListServiceParam): + """List services.""" + client = _async_executor.get_nacos_client() + return await client.list_services(param) + + +async def call_register_instance(param: RegisterInstanceParam) -> None: + """Register instance.""" + client = _async_executor.get_nacos_client() + await client.register_instance(param) + + +def _cleanup_async_executor(): + """Cleanup the async executor.""" + try: + _async_executor.shutdown() + except Exception as e: + sys_plugin_logger.error(f"Error during async executor cleanup: {e}") + + +# Register cleanup function +atexit.register(_cleanup_async_executor) diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/service/config.py b/framework/fit/python/plugin/fit_py_nacos_registry/service/config.py new file mode 100644 index 00000000..2542cad7 --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/service/config.py @@ -0,0 +1,134 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +Configuration module for Nacos registry server. +""" +from v2.nacos import ClientConfigBuilder + +from fitframework import value +from fitframework.utils import tools + + +@value('registry-center.server.addresses', converter=tools.to_list) +def get_registry_server_addresses() -> list: + """Get the list of registry server addresses.""" + pass + + +@value('nacos.username', default_value=None) +def get_nacos_username() -> str: + """ + Get the Nacos username. + + Returns: + Nacos username. + """ + pass + + +@value('nacos.password', default_value=None) +def get_nacos_password() -> str: + """ + Get the Nacos password. + + Returns: + Nacos password. + """ + pass + + +@value('nacos.accessKey', default_value=None) +def get_nacos_access_key() -> str: + """ + Get the Nacos access key. + + Returns: + Nacos access key. + """ + pass + + +@value('nacos.secretKey', default_value=None) +def get_nacos_secret_key() -> str: + """ + Get the Nacos secret key. + + Returns: + Nacos secret key. + """ + pass + + +@value('nacos.namespace', default_value="") +def get_nacos_namespace() -> str: + """ + Get the Nacos namespace. + + Returns: + Nacos namespace. + """ + pass + + +@value('nacos.isEphemeral', default_value=True, converter=bool) +def get_heartbeat_is_ephemeral() -> bool: + """ + Get whether the heartbeat is ephemeral. + + Returns: + Whether the heartbeat is ephemeral. + """ + pass + + +@value('nacos.heartBeatInterval', default_value=5000, converter=int) +def get_heartbeat_interval() -> int: + """ + Get the heartbeat interval in milliseconds. + + Returns: + Heartbeat interval in milliseconds. + """ + pass + + +@value('nacos.heartBeatTimeout', default_value=15000, converter=int) +def get_heartbeat_timeout() -> int: + """ + Get the heartbeat timeout in milliseconds. + + Returns: + Heartbeat timeout in milliseconds. + """ + pass + + +@value('nacos.weight', default_value=1.0, converter=float) +def get_heartbeat_weight() -> float: + """ + Get the heartbeat weight. + + Returns: + Heartbeat weight. + """ + pass + + +def build_nacos_config(): + """ + Build the Nacos client configuration. + + Returns: + Configured Nacos client config. + """ + return (ClientConfigBuilder() + .server_address(get_registry_server_addresses()[0]) + .namespace_id(get_nacos_namespace() or 'local') + .username(get_nacos_username()) + .password(get_nacos_password()) + .access_key(get_nacos_access_key()) + .secret_key(get_nacos_secret_key()) + .build()) diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/service/constants.py b/framework/fit/python/plugin/fit_py_nacos_registry/service/constants.py new file mode 100644 index 00000000..ef504c58 --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/service/constants.py @@ -0,0 +1,29 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +Constants for Nacos registry server. +""" +import re + +# Metadata keys +CLUSTER_DOMAIN_KEY = "cluster.domain" +WORKER_KEY = "worker" +APPLICATION_KEY = "application" +FITABLE_META_KEY = "fitable-meta" + +# Patterns and separators +CLUSTER_PORT_PATTERN = re.compile(r"cluster\.(.*?)\.port") +SEPARATOR = "::" + +# Protocol code mapping +PROTOCOL_CODE_MAP = { + "rsocket": 0, + "socket": 1, + "http": 2, + "grpc": 3, + "uc": 10, + "share_memory": 11 +} diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/service/nacos_registry_server.py b/framework/fit/python/plugin/fit_py_nacos_registry/service/nacos_registry_server.py new file mode 100644 index 00000000..6314c673 --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/service/nacos_registry_server.py @@ -0,0 +1,471 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +Service for providing Nacos registry center functionality. +""" +from concurrent.futures import ThreadPoolExecutor +from typing import List, Dict +import weakref +import atexit + +from v2.nacos import RegisterInstanceParam, ListInstanceParam, \ + DeregisterInstanceParam, SubscribeServiceParam, Instance, ListServiceParam + +from fitframework import fitable, const +from fitframework.api.logging import sys_plugin_logger +from fit_common_struct.entity import Worker, FitableMeta, Application, FitableAddressInstance, \ + FitableMetaInstance, ApplicationInstance +from fit_common_struct.core import Fitable, Genericable + +from .config import get_nacos_namespace +from .utils import build_service_key, get_service_name, get_group_name_from_fitable, \ + get_group_name_from_genericable, create_instances +from .parsers import parse_fitable_meta, parse_application, parse_worker, \ + group_instances_by_application, extract_workers +from .async_executor import run_async_safely, call_list_instances, call_deregister_instance, \ + call_subscribe, call_unsubscribe, call_list_services, call_register_instance + +# Global variables +_service_subscriptions: weakref.WeakValueDictionary = weakref.WeakValueDictionary() +_executor = ThreadPoolExecutor(max_workers=10) + + +def _cleanup_executor(): + """Cleanup the thread pool executor.""" + try: + if _executor: + _executor.shutdown(wait=True) + sys_plugin_logger.info("Thread pool executor shut down successfully") + except Exception as e: + sys_plugin_logger.error(f"Error shutting down thread pool executor: {e}") + + +# Register cleanup function to ensure executor is properly closed +atexit.register(_cleanup_executor) + + +def on_service_changed(fitable_info: Fitable, worker_id: str) -> None: + """ + Handle service change events, query and notify updates to Fitables instance information. + + Args: + fitable_info: The changed Fitables information. + worker_id: The worker ID. + """ + try: + # Query current instances + instances = query_fitable_addresses([fitable_info], worker_id) + # notify_fitables(instances) + sys_plugin_logger.debug( + f"Service changed for fitable: {fitable_info}, instances: {len(instances)}" + ) + except Exception as e: + sys_plugin_logger.error(f"Service change handling failed: {e}") + + +def extract_workers(app_instances: List[Instance], application: Application) -> List[Worker]: + """ + Extract all workers corresponding to instances. + + Args: + app_instances: The list of application instances. + application: The application object. + + Returns: + List of workers. + """ + workers = [] + for instance in app_instances: + worker = parse_worker(instance) + workers.append(worker) + + return workers + + +@fitable(const.REGISTER_FIT_SERVICE_GEN_ID, const.REGISTER_FIT_SERVICE_FIT_ID) +def register_fitables(fitable_metas: List[FitableMeta], worker: Worker, application: Application) -> None: + """ + Register Fitable service implementations to the registry center. + + Args: + fitable_metas: List of Fitable metadata to register. + worker: Current FIT process information. + application: Current application information. + + Raises: + Exception: If registration fails due to registry error. + """ + try: + sys_plugin_logger.debug( + f"Registering fitables. [fitableMetas={fitable_metas}, " + f"worker={worker.id}, application={application.nameVersion}]" + ) + + for meta in fitable_metas: + fitable = meta.fitable + group_name = get_group_name_from_fitable(fitable) + service_name = get_service_name(fitable) + + instances = create_instances(worker, application, meta) + for instance in instances: + param = RegisterInstanceParam( + service_name=service_name, + group_name=group_name, + ip=instance["ip"], + port=instance["port"], + weight=instance["weight"], + ephemeral=instance["ephemeral"], + metadata=instance["metadata"] + ) + run_async_safely(call_register_instance(param)) + + sys_plugin_logger.info(f"Successfully registered fitables for worker {worker.id}") + except Exception as e: + sys_plugin_logger.error(f"Failed to register fitables due to registry error: {e}") + raise + +@fitable(const.UNREGISTER_FIT_SERVICE_GEN_ID, const.UNREGISTER_FIT_SERVICE_FIT_ID) +def unregister_fitables(fitables: List[Fitable], worker_id: str) -> None: + """ + Unregister service implementations from the registry center. + + Args: + fitables: List of Fitable implementations to unregister. + worker_id: Unique identifier of the process where service implementations reside. + """ + sys_plugin_logger.debug( + f"Unregistering fitables for worker. [fitables={fitables}, workerId={worker_id}]" + ) + + for fitable in fitables: + unregister_single_fitable(fitable, worker_id) + + +def unregister_single_fitable(fitable: Fitable, worker_id: str) -> None: + """ + Unregister a single Fitable implementation. + + Args: + fitable: The Fitable implementation to unregister. + worker_id: The worker ID. + """ + group_name = get_group_name_from_fitable(fitable) + service_name = get_service_name(fitable) + + try: + # Get all instances for the service + param = ListInstanceParam( + service_name=service_name, + group_name=group_name, + healthy_only=True + ) + instances = run_async_safely(call_list_instances(param)) + unregister_matching_instances(instances, worker_id, service_name, group_name) + except Exception as e: + sys_plugin_logger.error(f"Failed to unregister fitable due to registry error: {e}") + + +def unregister_matching_instances(instances: List[Instance], worker_id: str, service_name: str, group_name: str) -> None: + """ + Unregister matching instances. + + Args: + instances: List of instances to check. + worker_id: The worker ID to match. + service_name: The service name. + group_name: The group name. + """ + for instance in instances: + try: + worker = parse_worker(instance) + if worker and worker.id == worker_id: + param = DeregisterInstanceParam( + service_name=service_name, + group_name=group_name, + ip=instance.ip, + port=instance.port + ) + run_async_safely(call_deregister_instance(param)) + sys_plugin_logger.debug(f"Successfully deregistered instance {instance.ip}:{instance.port}") + except Exception as e: + sys_plugin_logger.error(f"Failed to deregister instance: {e}") + + +@fitable(const.QUERY_FIT_SERVICE_GEN_ID, const.QUERY_FIT_SERVICE_FIT_ID) +def query_fitable_addresses(fitables: List[Fitable], worker_id: str) -> List[FitableAddressInstance]: + """ + Query instance information for Fitable implementations (pull mode). + + Args: + fitables: List of Fitable implementation information. + worker_id: Current FIT process identifier. + + Returns: + List of obtained instance information. + """ + sys_plugin_logger.debug( + f"Querying fitables for worker. [fitables={fitables}, workerId={worker_id}]" + ) + result_map = {} + + for fitable in fitables: + try: + instances = query_instances(fitable) + if not instances: + continue + process_application_instances(result_map, fitable, instances) + except Exception as e: + sys_plugin_logger.error(f"Failed to query fitables for genericableId: {e}") + + return list(result_map.values()) + + +def query_instances(fitable: Fitable) -> List[Instance]: + """ + Query instances for a specific Fitable. + + Args: + fitable: The Fitable to query instances for. + + Returns: + List of instances. + """ + group_name = get_group_name_from_fitable(fitable) + service_name = get_service_name(fitable) + + param = ListInstanceParam( + service_name=service_name, + group_name=group_name, + healthy_only=True + ) + return run_async_safely(call_list_instances(param)) + + +def process_application_instances(result_map: Dict, fitable: Fitable, instances: List[Instance]) -> None: + """ + Process application instances and group them. + + Args: + result_map: Dictionary to store results. + fitable: The Fitable being processed. + instances: List of instances to process. + """ + app_instances_map = group_instances_by_application(instances) + + for app, app_instances in app_instances_map.items(): + meta = parse_fitable_meta(app_instances[0].metadata) + workers = extract_workers(app_instances, app) + + fai = result_map.get(fitable) + if fai is None: + fai = FitableAddressInstance(applicationInstances=[], fitable=fitable) + result_map[fitable] = fai + + app_instance = ApplicationInstance(workers=list(workers), application=app, formats=meta.formats if meta.formats else []) + fai.applicationInstances.append(app_instance) + + +@fitable(const.SUBSCRIBE_FIT_SERVICE_GEN_ID, const.SUBSCRIBE_FIT_SERVICE_FIT_ID) +def subscribe_fit_service(fitables: List[Fitable], worker_id: str, callback_fitable_id: str) -> List[FitableAddressInstance]: + """ + Subscribe to Fitable service instance information (push mode). + + Args: + fitables: List of Fitable implementation information. + worker_id: Current FIT process identifier. + callback_fitable_id: Identifier for callback Fitable implementation. + + Returns: + Queried instance information. + """ + sys_plugin_logger.debug( + f"Subscribing to fitables for worker. [fitables={fitables}, " + f"workerId={worker_id}, callbackFitableId={callback_fitable_id}]" + ) + + # Register subscriptions + for fitable in fitables: + try: + group_name = get_group_name_from_fitable(fitable) + service_name = get_service_name(fitable) + service_key = build_service_key(group_name, service_name) + + if service_key in _service_subscriptions: + sys_plugin_logger.debug( + f"Already subscribed to service. [groupName={group_name}, serviceName={service_name}]" + ) + continue + + # Create event listener + def create_event_listener(fitable_ref: Fitable, worker_id_ref: str): + def event_listener(event): + _executor.submit(on_service_changed, fitable_ref, worker_id_ref) + return event_listener + + event_listener = create_event_listener(fitable, worker_id) + _service_subscriptions[service_key] = event_listener + + # Register subscription + param = SubscribeServiceParam( + service_name=service_name, + group_name=group_name, + subscribe_callback=event_listener + ) + run_async_safely(call_subscribe(param)) + sys_plugin_logger.debug( + f"Subscribed to service. [groupName={group_name}, serviceName={service_name}]" + ) + + except Exception as e: + sys_plugin_logger.error(f"Failed to subscribe to Nacos service: {e}") + + return query_fitable_addresses(fitables, worker_id) + + +@fitable(const.UNSUBSCRIBE_FIT_SERVICE_GEN_ID, const.UNSUBSCRIBE_FIT_SERVICE_FIT_ID) +def unsubscribe_fitables(fitables: List[Fitable], worker_id: str, callback_fitable_id: str) -> None: + """ + Unsubscribe from specified Fitable service instance information. + + Args: + fitables: List of specified Fitable implementations. + worker_id: Unique identifier of the specified process. + callback_fitable_id: Unique identifier for unsubscribe callback Fitable implementation. + """ + sys_plugin_logger.debug(f"Unsubscribing from fitables for worker. [fitables={fitables}, workerId={worker_id}, callbackFitableId={callback_fitable_id}]") + + for fitable in fitables: + try: + group_name = get_group_name_from_fitable(fitable) + service_name = get_service_name(fitable) + service_key = build_service_key(group_name, service_name) + + # Use pop with default to avoid KeyError if listener was garbage collected + listener = _service_subscriptions.pop(service_key, None) + if listener is not None: + param = SubscribeServiceParam( + service_name=service_name, + group_name=group_name, + subscribe_callback=listener + ) + run_async_safely(call_unsubscribe(param)) + sys_plugin_logger.debug(f"Unsubscribed from service. [groupName={group_name}, serviceName={service_name}]") + else: + sys_plugin_logger.debug(f"Listener already cleaned up for service. [groupName={group_name}, serviceName={service_name}]") + except Exception as e: + sys_plugin_logger.error(f"Failed to unsubscribe from Nacos service: {e}") + + +@fitable(const.QUERY_FITABLE_METAS_GEN_ID, const.QUERY_FITABLE_METAS_FIT_ID) +def query_fitable_metas(genericable_infos: List[Genericable]) -> List[FitableMetaInstance]: + """ + Query Fitable metadata from the registry center. + + Args: + genericable_infos: List of Genericable information. + + Returns: + List of queried Fitable metadata. + """ + sys_plugin_logger.debug( + f"Querying fitable metas for genericables. [genericables={genericable_infos}]" + ) + meta_environments = {} + + for genericable in genericable_infos: + process_genericable_services(genericable, meta_environments) + + return build_fitable_meta_instances(meta_environments) + + +def process_genericable_services(genericable: Genericable, meta_environments: Dict) -> None: + """ + Process service list for a Genericable. + + Args: + genericable: The Genericable to process. + meta_environments: Dictionary to collect metadata environments. + """ + group_name = get_group_name_from_genericable(genericable) + + try: + # Get all services under the group + param = ListServiceParam( + namespace_id=get_nacos_namespace(), + group_name=group_name, + page_no=1, + page_size=1000 # Assume fetching enough services at once + ) + service_list = run_async_safely(call_list_services(param)) + + for service_name in service_list.services: + process_service_instances(service_name, group_name, meta_environments) + except Exception as e: + sys_plugin_logger.error(f"Failed to query fitable metas: {e}") + + +def process_service_instances(service_name: str, group_name: str, meta_environments: Dict) -> None: + """ + Process service instances. + + Args: + service_name: The service name. + group_name: The group name. + meta_environments: Dictionary to collect metadata environments. + """ + try: + # Get service instances + param = ListInstanceParam( + service_name=service_name, + group_name=group_name, + healthy_only=True + ) + instances = run_async_safely(call_list_instances(param)) + + if not instances: + return + + meta = parse_fitable_meta(instances[0].metadata) + collect_environments_from_instances(instances, meta, meta_environments) + except Exception as e: + sys_plugin_logger.error(f"Failed to select instances for service {service_name}: {e}") + + +def collect_environments_from_instances(instances: List[Instance], meta: FitableMeta, meta_environments: Dict) -> None: + """ + Collect environment information from instances. + + Args: + instances: List of instances to process. + meta: FitableMeta object. + meta_environments: Dictionary to collect environments. + """ + for instance in instances: + try: + worker = parse_worker(instance) + if worker and worker.environment: + if meta not in meta_environments: + meta_environments[meta] = set() + meta_environments[meta].add(worker.environment) + except Exception as e: + sys_plugin_logger.error(f"Failed to parse worker metadata: {e}") + + +def build_fitable_meta_instances(meta_environments: Dict) -> List[FitableMetaInstance]: + """ + Build FitableMetaInstance list. + + Args: + meta_environments: Dictionary mapping metadata to environments. + + Returns: + List of FitableMetaInstance objects. + """ + results = [] + for meta, envs in meta_environments.items(): + instance = FitableMetaInstance(meta, list(envs)) + results.append(instance) + return results diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/service/parsers.py b/framework/fit/python/plugin/fit_py_nacos_registry/service/parsers.py new file mode 100644 index 00000000..89aaf83e --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/service/parsers.py @@ -0,0 +1,137 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +Parsing functions for Nacos registry server. +""" +from typing import Dict, List, Set + +from v2.nacos import Instance + +from fit_common_struct.entity import Worker, Application, FitableMeta, Endpoint, Address +from fit_common_struct.core import Fitable +from fitframework.utils.json_serialize_utils import json_deserialize +from fitframework.api.logging import sys_plugin_logger + +from .constants import WORKER_KEY, APPLICATION_KEY, FITABLE_META_KEY + + +def parse_fitable_meta(metadata: Dict) -> FitableMeta: + """ + Parse FitableMeta from metadata. + + Args: + metadata: The metadata dictionary. + + Returns: + Parsed FitableMeta object or default if parsing fails. + """ + try: + meta_json = metadata.get(FITABLE_META_KEY) + if meta_json: + return json_deserialize(FitableMeta, meta_json) + except Exception as e: + sys_plugin_logger.error(f"Failed to parse fitable meta for instance: {e}") + + # Return default value + default_fitable = Fitable("unknown", "1.0", "unknown", "1.0") + meta = FitableMeta(default_fitable, [], []) + return meta + + +def parse_application(metadata: Dict) -> Application: + """ + Parse Application from metadata. + + Args: + metadata: The metadata dictionary. + + Returns: + Parsed Application object or default if parsing fails. + """ + try: + app_json = metadata.get(APPLICATION_KEY) + if app_json: + return json_deserialize(Application, app_json) + except Exception as e: + sys_plugin_logger.error(f"Failed to parse application metadata for instance: {e}") + + # Return default value + return Application("unknown", "unknown") + + +def parse_worker(instance_or_metadata) -> Worker: + """ + Parse Worker from instance or metadata. + + Args: + instance_or_metadata: Either an Instance object or metadata dictionary. + + Returns: + Parsed Worker object or default if parsing fails. + """ + try: + # Handle different input types + if hasattr(instance_or_metadata, 'metadata'): + metadata = instance_or_metadata.metadata + ip = getattr(instance_or_metadata, 'ip', 'unknown') + port = getattr(instance_or_metadata, 'port', 0) + else: + metadata = instance_or_metadata + ip = 'unknown' + port = 0 + + worker_json = metadata.get(WORKER_KEY) + if worker_json: + return json_deserialize(Worker, worker_json) + except Exception as e: + sys_plugin_logger.error(f"Failed to parse worker metadata for instance: {e}") + + # Fallback - create basic worker information + worker = Worker([], "unknown", "", {}) + + # If IP and port info available, create basic address + if ip != 'unknown' and port != 0: + endpoint = Endpoint(port, 1) # Default protocol + address = Address(ip, [endpoint]) + worker.addresses = [address] + + return worker + + +def group_instances_by_application(instances: List[Instance]) -> Dict[Application, List[Instance]]: + """ + Group instances by application. + + Args: + instances: List of instances to group. + + Returns: + Dictionary mapping applications to their instances. + """ + app_instances_map = {} + for instance in instances: + metadata = instance.metadata + app = parse_application(metadata) + app_instances_map.setdefault(app, []).append(instance) + return app_instances_map + + +def extract_workers(app_instances: List[Instance], application: Application) -> Set[Worker]: + """ + Extract all workers corresponding to instances. + + Args: + app_instances: The list of application instances. + application: The application object. + + Returns: + Set of workers. + """ + workers = set() + for instance in app_instances: + worker = parse_worker(instance) + workers.add(worker) + return workers diff --git a/framework/fit/python/plugin/fit_py_nacos_registry/service/utils.py b/framework/fit/python/plugin/fit_py_nacos_registry/service/utils.py new file mode 100644 index 00000000..cc8ca40b --- /dev/null +++ b/framework/fit/python/plugin/fit_py_nacos_registry/service/utils.py @@ -0,0 +1,162 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +""" +Utility functions for Nacos registry server. +""" +from typing import List, Dict + +from fit_common_struct.core import Fitable, Genericable +from fit_common_struct.entity import Worker, Application, FitableMeta, Endpoint + +from fitframework.api.logging import sys_plugin_logger +from fitframework.utils.json_serialize_utils import json_serialize + +from .constants import SEPARATOR, CLUSTER_PORT_PATTERN, PROTOCOL_CODE_MAP, \ + WORKER_KEY, APPLICATION_KEY, FITABLE_META_KEY +from .config import get_heartbeat_weight, get_heartbeat_is_ephemeral, \ + get_heartbeat_interval, get_heartbeat_timeout + + +def build_service_key(group_name: str, service_name: str) -> str: + """ + Build a unique key in the format :: for service subscriptions. + + Args: + group_name: The group name as string. + service_name: The service name as string. + + Returns: + A concatenated key like groupName::serviceName. + """ + return f"{group_name}{SEPARATOR}{service_name}" + + +def get_service_name(fitable: Fitable) -> str: + """ + Get the service name from Fitable. + + Args: + fitable: The Fitable object. + + Returns: + The service name. + """ + return f"{fitable.fitableId}{SEPARATOR}{fitable.fitableVersion}" + + +def get_group_name_from_fitable(fitable: Fitable) -> str: + """ + Get the group name from Fitable. + + Args: + fitable: The Fitable object. + + Returns: + The group name. + """ + return f"{fitable.genericableId}{SEPARATOR}{fitable.genericableVersion}" + + +def get_group_name_from_genericable(genericable: Genericable) -> str: + """ + Get the group name from Genericable. + + Args: + genericable: The Genericable object. + + Returns: + The group name. + """ + return f"{genericable.genericableId}{SEPARATOR}{genericable.genericableVersion}" + + +def create_instances(worker: Worker, application: Application, meta: FitableMeta) -> List[Dict]: + """ + Create instance information. + + Args: + worker: Worker node object. + application: Application object. + meta: FitableMeta metadata object. + + Returns: + List of instance dictionaries. + """ + sys_plugin_logger.debug( + f"Creating instance for worker. [worker={worker.id}, " + f"application={application.nameVersion}, meta={meta}]" + ) + instances = [] + + for address in worker.addresses: + for endpoint in address.endpoints: + # Prepare metadata + metadata = build_instance_metadata(worker, application, meta) + + # Build instance + instance = { + "ip": address.host, + "port": endpoint.port, + "weight": get_heartbeat_weight(), + "ephemeral": get_heartbeat_is_ephemeral(), + "metadata": metadata + } + instances.append(instance) + + return instances + + +def build_instance_metadata(worker: Worker, application: Application, meta: FitableMeta) -> Dict[str, str]: + """ + Build metadata for service instance, including worker, application and FitableMeta information. + + Args: + worker: The worker node object. + application: The application object. + meta: The FitableMeta metadata object. + + Returns: + A dict containing all serialized metadata. + """ + metadata = {} + + # Add heartbeat configuration + metadata["preserved.heart.beat.interval"] = str(get_heartbeat_interval()) + metadata["preserved.heart.beat.timeout"] = str(get_heartbeat_timeout()) + + try: + metadata[WORKER_KEY] = json_serialize(worker) + metadata[APPLICATION_KEY] = json_serialize(application) + metadata[FITABLE_META_KEY] = json_serialize(meta) + except Exception as e: + sys_plugin_logger.error(f"Failed to serialize metadata for worker: {e}") + + return metadata + + +def build_endpoints(extensions: Dict[str, str]) -> List[Endpoint]: + """ + Build endpoint list from extensions. + + Args: + extensions: Extension configuration dictionary. + + Returns: + List of endpoints. + """ + endpoints = [] + + for key, value in extensions.items(): + match = CLUSTER_PORT_PATTERN.match(key) + if match: + protocol_name = match.group(1).lower() + if protocol_name in PROTOCOL_CODE_MAP: + endpoint = Endpoint(int(value), PROTOCOL_CODE_MAP[protocol_name]) + endpoints.append(endpoint) + else: + sys_plugin_logger.error(f"Unknown protocol: {protocol_name}") + + return endpoints diff --git a/framework/fit/python/plugin/fit_py_registry_client/conf/application.yml b/framework/fit/python/plugin/fit_py_registry_client/conf/application.yml index 2151c63e..b14c85dc 100644 --- a/framework/fit/python/plugin/fit_py_registry_client/conf/application.yml +++ b/framework/fit/python/plugin/fit_py_registry_client/conf/application.yml @@ -6,13 +6,6 @@ registry-center: registry_fitable_frequency: 15 registered-fitables-expire-interval: 30 # expected expire interval used by server invalid_address_ttl: 60 - server: - addresses: - - "localhost:8080" - protocol: 2 - formats: - - 1 - context-path: "" service_ids: - "modelengine.fit.registry.registry-service.register-fitables" - "modelengine.fit.registry.registry-service.query-fitables-addresses" diff --git a/framework/fit/python/plugin/fit_py_registry_client/fitable_address_service.py b/framework/fit/python/plugin/fit_py_registry_client/fitable_address_service.py index 64d8b419..c9865be6 100644 --- a/framework/fit/python/plugin/fit_py_registry_client/fitable_address_service.py +++ b/framework/fit/python/plugin/fit_py_registry_client/fitable_address_service.py @@ -14,7 +14,7 @@ from fitframework.api.decorators import fitable, fit, scheduled_executor, value from fitframework.api.logging import sys_plugin_logger from fitframework.utils import tools -from .entity import FitableAddressInstance, FitableInfo +from fit_common_struct.entity import FitableAddressInstance from .registry_address_service import get_cache_aware_registry_address _PUSH_MODE = 'push' @@ -58,7 +58,7 @@ def get_runtime_worker_id() -> str: @fit(const.SUBSCRIBE_FIT_SERVICE_GEN_ID) -def subscribe_fit_service(fitables: List[FitableInfo], worker_id: str, callback_fitable_id: str) \ +def subscribe_fit_service(fitables: List[Fitable], worker_id: str, callback_fitable_id: str) \ -> List[FitableAddressInstance]: """ 注册中心所提供接口,用于订阅某个泛服务实现的实例信息,并且也会返回查询到的实例信息,在推模式下使用。 @@ -72,7 +72,7 @@ def subscribe_fit_service(fitables: List[FitableInfo], worker_id: str, callback_ @fit(const.QUERY_FIT_SERVICE_GEN_ID) -def query_fitable_addresses(fitables: List[FitableInfo], worker_id: str) -> List[FitableAddressInstance]: +def query_fitable_addresses(fitables: List[Fitable], worker_id: str) -> List[FitableAddressInstance]: """ 注册中心所提供接口,用于查询某个泛服务实现的实例信息,在拉模式下使用。 @@ -83,9 +83,9 @@ def query_fitable_addresses(fitables: List[FitableInfo], worker_id: str) -> List pass -def _convert_fitable_to_fitable_info(fitable_: Fitable) -> FitableInfo: - return FitableInfo(fitable_.genericable_id, fitable_.genericable_version, fitable_.fitable_id, - fitable_.fitable_version) +def _convert_fitable_to_fitable_info(fitable_: Fitable) -> Fitable: + return Fitable(fitable_.genericableId, fitable_.genericableVersion, fitable_.fitableId, + fitable_.fitableVersion) def _convert_fitable_address_instance_to_addresses(fitable_inst: FitableAddressInstance) -> \ @@ -120,7 +120,7 @@ def notify_fitable_changes(fitable_instances: List[FitableAddressInstance]) -> N _update_addresses_in_cache(fitable_, addresses) -def _get_fitable_address_instances(fitable_infos: List[FitableInfo]) -> List[FitableAddressInstance]: +def _get_fitable_address_instances(fitable_infos: List[Fitable]) -> List[FitableAddressInstance]: if _registry_client_mode() == _PULL_MODE: return query_fitable_addresses(fitable_infos, get_runtime_worker_id()) else: @@ -157,9 +157,9 @@ def get_fit_service_address_list(fitable_: Fitable) -> List[Address]: @param fitable_: 待查询的泛服务实例信息。 @return: 所查询到的该泛服务的实例列表。 """ - sys_plugin_logger.debug(f"get fit service address list: gid{fitable_.genericable_id}") - if fitable_.genericable_id in _get_registry_server_generic_ids(): - sys_plugin_logger.debug(f"get fit service address list: gid{fitable_.genericable_id}, is registry server api") + sys_plugin_logger.debug(f"get fit service address list: gid{fitable_.genericableId}") + if fitable_.genericableId in _get_registry_server_generic_ids(): + sys_plugin_logger.debug(f"get fit service address list: gid{fitable_.genericableId}, is registry server api") return get_cache_aware_registry_address() addresses = _get_addresses_from_cache(fitable_) if addresses: diff --git a/framework/fit/python/plugin/fit_py_registry_client/fitable_meta_service.py b/framework/fit/python/plugin/fit_py_registry_client/fitable_meta_service.py index f3934d17..5a61eb67 100644 --- a/framework/fit/python/plugin/fit_py_registry_client/fitable_meta_service.py +++ b/framework/fit/python/plugin/fit_py_registry_client/fitable_meta_service.py @@ -8,13 +8,13 @@ """ from typing import List -from fit_common_struct.core import Genericable from fitframework import fitable, const, fit -from .entity import FitableMetaInstance, GenericableInfo +from fit_common_struct.entity import FitableMetaInstance +from fit_common_struct.core import Genericable @fit(const.QUERY_FITABLE_METAS_GEN_ID) -def query_fitable_metas(genericable_infos: List[GenericableInfo]) -> List[FitableMetaInstance]: +def query_fitable_metas(genericable_infos: List[Genericable]) -> List[FitableMetaInstance]: """ 注册中心所提供接口,用于查询泛服务的元数据。 @@ -27,6 +27,6 @@ def query_fitable_metas(genericable_infos: List[GenericableInfo]) -> List[Fitabl @fitable(const.GET_FITABLES_OF_GENERICABLE_GEN_ID, const.GET_FITABLES_OF_GENERICABLE_FIT_ID) def get_all_fitables_from_registry(genericable: Genericable) -> List[str]: fitable_meta_instances: List[FitableMetaInstance] = query_fitable_metas( - [GenericableInfo(genericable.genericable_id, genericable.genericable_version)]) + [Genericable(genericable.genericableId, genericable.genericableVersion)]) return [instance.meta.fitable.fitableId for instance in fitable_meta_instances if - instance.meta.fitable.genericableId == genericable.genericable_id] + instance.meta.fitable.genericableId == genericable.genericableId] diff --git a/framework/fit/python/plugin/fit_py_registry_client/online_fitable_service.py b/framework/fit/python/plugin/fit_py_registry_client/online_fitable_service.py index 4a1395cc..3d2a2673 100644 --- a/framework/fit/python/plugin/fit_py_registry_client/online_fitable_service.py +++ b/framework/fit/python/plugin/fit_py_registry_client/online_fitable_service.py @@ -17,8 +17,9 @@ from fitframework.api.decorators import scheduled_executor from fitframework.api.logging import sys_plugin_logger from fitframework.core.network.enums import SerializingStructureEnum -from .entity import Worker, FitableMeta, FitableInfo, Address as AddressRegistry, Endpoint, \ +from fit_common_struct.entity import Worker, FitableMeta, Address as AddressRegistry, Endpoint, \ Application, Address +from fit_common_struct.core import Fitable @value('worker-environment.env') @@ -81,10 +82,10 @@ def register_fitables(fitable_metas: List[FitableMeta], worker: Worker, applicat def _convert_fitable_aliases_info_to_fitable_meta(fitable_aliases_info: FitableAliasesInfo) -> FitableMeta: local_formats = [each_format.value for each_format in get_registered_formats()] - fitable_info = FitableInfo(fitable_aliases_info.fitable.genericable_id, - fitable_aliases_info.fitable.genericable_version, - fitable_aliases_info.fitable.fitable_id, - fitable_aliases_info.fitable.fitable_version) + fitable_info = Fitable(fitable_aliases_info.fitable.genericableId, + fitable_aliases_info.fitable.genericableVersion, + fitable_aliases_info.fitable.fitableId, + fitable_aliases_info.fitable.fitableVersion) return FitableMeta(fitable_info, fitable_aliases_info.aliases, local_formats) @@ -100,7 +101,7 @@ def _fetch_all_addresses() -> List[AddressRegistry]: def _fetch_fitable_meta_info(fitable_meta: FitableMeta) -> tuple: - fitable_info: FitableInfo = fitable_meta.fitable + fitable_info: Fitable = fitable_meta.fitable return fitable_info.genericableId, fitable_info.fitableVersion, fitable_info.fitableId, fitable_info.fitableVersion diff --git a/framework/fit/python/plugin/fit_py_registry_client/registry_address_service.py b/framework/fit/python/plugin/fit_py_registry_client/registry_address_service.py index c9249eeb..b70078a9 100644 --- a/framework/fit/python/plugin/fit_py_registry_client/registry_address_service.py +++ b/framework/fit/python/plugin/fit_py_registry_client/registry_address_service.py @@ -9,6 +9,7 @@ from typing import List from fit_common_struct.core import Address +from fitframework import to_bool from fitframework.api.decorators import scheduled_executor, value from fitframework.api.logging import sys_plugin_logger from fitframework.utils import tools @@ -30,6 +31,10 @@ def _get_registry_pull_frequency(): def _get_registry_server_addresses() -> list: pass +@value('registry-center.server.mode',default_value='DIRECT') +def _get_registry_server_mode(): + pass + @value('registry-center.server.protocol', converter=int) def _get_registry_server_protocol(): @@ -57,9 +62,45 @@ def _build_address(addr: str) -> Address: return Address(host=ip, port=int(port), protocol=_get_registry_server_protocol(), environment=_get_worker_env(), formats=_get_registry_server_formats(), worker_id='', context_path=_get_registry_context_path()) +@value('http.server.enabled', False, converter=to_bool) +def _get_http_enabled(): + pass + +@value('http.server.address.port', converter=int) +def _get_http_server_port(): + pass + +@value('https.server.enabled', default_value=False, converter=to_bool) +def _get_https_enabled(): + pass + +@value('local_ip') +def _get_host(): + pass + +@value('https.server.address.port', converter=int) +def _get_https_server_port(): + pass + + +def _process_address_by_mode() -> list: + if _get_registry_server_mode() == 'PROXY': + if _get_http_enabled(): + port = _get_http_server_port() + elif _get_https_enabled(): + port = _get_https_server_port() + else: + port = "" + return [_get_host() + port] + elif _get_registry_server_mode() == 'DIRECT': + return _get_registry_server_addresses() + else: + raise RuntimeError(f"unsupported registry server mode: {_get_registry_server_mode()}") + + def _get_registry_addresses_from_configuration(): - return [_build_address(addr) for addr in _get_registry_server_addresses()] + return [_build_address(addr) for addr in _process_address_by_mode()] @scheduled_executor(_get_registry_pull_frequency()) diff --git a/framework/fit/python/plugin/fit_py_registry_client/test/fitable_address_service_test.py b/framework/fit/python/plugin/fit_py_registry_client/test/fitable_address_service_test.py index ae28acf6..272453be 100644 --- a/framework/fit/python/plugin/fit_py_registry_client/test/fitable_address_service_test.py +++ b/framework/fit/python/plugin/fit_py_registry_client/test/fitable_address_service_test.py @@ -21,16 +21,16 @@ class FitableAddressServiceTest(FitTestSupport): def setUpClass(cls): super(FitableAddressServiceTest, cls).setUpClass() from plugin.fit_py_registry_client import fitable_address_service - from plugin.fit_py_registry_client import entity + from fit_common_struct import entity global fitable_address_service global entity - def query_fitable_addresses_side_effect(fitables: List[entity.FitableInfo], worker_id: str): + def query_fitable_addresses_side_effect(fitables: List[entity.Fitable], worker_id: str): if fitables[0].genericableId == "gid_ut": return [cls.build_fitable_address_instance("host_ut_1", 8000)] return [] - def subscribe_fit_service_side_effect(fitables: List[entity.FitableInfo], worker_id: str, + def subscribe_fit_service_side_effect(fitables: List[entity.Fitable], worker_id: str, callback_fitable_id: str): if fitables[0].genericableId == "gid_ut": return [cls.build_fitable_address_instance("host_ut_1", 8000)] @@ -59,7 +59,7 @@ def build_fitable_address_instance(cls, host: str, port: int): worker = entity.Worker([address], "worker_ut", "env_ut", {"http.context-path": "context-path-ut"}) application = entity.Application("name_ut", "name_version_ut") application_instance = entity.ApplicationInstance([worker], application, [1]) - fitable_info = entity.FitableInfo("gid_ut", "1.0.0", "fid_ut", "1.0.0") + fitable_info = entity.Fitable("gid_ut", "1.0.0", "fid_ut", "1.0.0") fitable_address_instance = entity.FitableAddressInstance([application_instance], fitable_info) return fitable_address_instance diff --git a/framework/fit/python/plugin/fit_py_server_http/conf/application.yml b/framework/fit/python/plugin/fit_py_server_http/conf/application.yml index 12bb6f67..4dbac3fd 100644 --- a/framework/fit/python/plugin/fit_py_server_http/conf/application.yml +++ b/framework/fit/python/plugin/fit_py_server_http/conf/application.yml @@ -1,15 +1,4 @@ server-thread-count: 8 -http: - server: - enabled: true - address: - use-random-port: false - port: 9666 - port-to-register: - protocol: 2 - formats: - - 1 - - 2 https: server: enabled: false @@ -34,7 +23,6 @@ https: tls_protocol: "" ciphers: - async: task-count-limit: 1000 result-save-duration: 300