From 59b3089f9d212521ee3d5e5991b1dd172ebd744b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=AD=E6=BD=87?= <1576730710@qq.com> Date: Fri, 10 Oct 2025 11:26:45 +0800 Subject: [PATCH] =?UTF-8?q?[code]=20=E7=A7=BB=E9=99=A4=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fit_py_code_node_tools/conf/__init__.py | 0 .../conf/application.yml | 31 --- .../fit_py_code_node_tools/conf/info.yml | 2 - .../fit_py_code_node_tools/python_repl.py | 139 ------------ .../python_repl_impl.py | 168 --------------- .../fit_py_code_node_tools/safe_global.py | 103 --------- .../test_python_repl_impl.py | 201 ------------------ framework/fel/python/requirements.txt | 4 - 8 files changed, 648 deletions(-) delete mode 100644 framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/__init__.py delete mode 100644 framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/application.yml delete mode 100644 framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/info.yml delete mode 100644 framework/fel/python/plugins/builtins/fit_py_code_node_tools/python_repl.py delete mode 100644 framework/fel/python/plugins/builtins/fit_py_code_node_tools/python_repl_impl.py delete mode 100644 framework/fel/python/plugins/builtins/fit_py_code_node_tools/safe_global.py delete mode 100644 framework/fel/python/plugins/builtins/fit_py_code_node_tools/test_python_repl_impl.py diff --git a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/__init__.py b/framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/application.yml b/framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/application.yml deleted file mode 100644 index 95928426..00000000 --- a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/application.yml +++ /dev/null @@ -1,31 +0,0 @@ -user: - function: - entrypoint: 'main' - -code: - import: - whitelist: - - 'json' - - 'typing' - - 'pandas' - - 'numpy' - - 're' - - 'requests' - - 'httpx' - - 'datetime' - - 'time' - - 'base64' - - 'hashlib' - blacklist: - - 'os' - - 'sys' - - 'cmd' - - 'subprocess' - - 'multiprocessing' - - 'timeit' - - 'platform' - - 'asyncio' - timeout: 10 - max_pool: 4 - mem_limit: 189792256 # 181*1024*1024 - verbose: False diff --git a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/info.yml b/framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/info.yml deleted file mode 100644 index eb1e2d37..00000000 --- a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/conf/info.yml +++ /dev/null @@ -1,2 +0,0 @@ -category: "system" -level: 4 diff --git a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/python_repl.py b/framework/fel/python/plugins/builtins/fit_py_code_node_tools/python_repl.py deleted file mode 100644 index dd4faf6a..00000000 --- a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/python_repl.py +++ /dev/null @@ -1,139 +0,0 @@ -# -- encoding: utf-8 -- -# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. -# This file is a part of the ModelEngine Project. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ====================================================================================================================== -import multiprocessing -import os -from typing import Dict -import threading - -from fitframework.api.decorators import fitable, value as FitConfigValue -from fitframework.api.logging import fit_logger -from fitframework.core.exception.fit_exception import FitException, InternalErrorCode -from fitframework.utils.tools import to_list - -from .python_repl_impl import execute_node_impl, GLOBAL_CONFIG - - -@FitConfigValue(key='user.function.entrypoint', default_value='main') -def _read_entrypoint_from_config(): - pass - - -@FitConfigValue(key='code.import.whitelist', default_value=['asyncio', 'json', 'numpy', 'typing'], converter=to_list) -def _read_import_whitelist_from_config(): - pass - - -@FitConfigValue(key='code.import.blacklist', - default_value=['os', 'sys', 'cmd', 'subprocess', 'multiprocessing', 'timeit', 'platform'], - converter=to_list) -def _read_import_blacklist_from_config(): - pass - - -@FitConfigValue(key='code.timeout', default_value=10, converter=int) -def _timeout(): - pass - - -@FitConfigValue(key='code.max_pool', default_value=4, converter=int) -def _max_pool(): - pass - - -@FitConfigValue(key='code.mem_limit', default_value=181*1024*1024, converter=int) -def _mem_limit(): - pass - - -@FitConfigValue(key='code.verbose', default_value=False, converter=bool) -def _verbose(): - pass - - -def _init_config(): - GLOBAL_CONFIG["entrypoint"] = _read_entrypoint_from_config() - GLOBAL_CONFIG["whitelist"] = _read_import_whitelist_from_config() - GLOBAL_CONFIG["blacklist"] = _read_import_blacklist_from_config() - GLOBAL_CONFIG["timeout"] = _timeout() - GLOBAL_CONFIG["max_pool"] = _max_pool() - GLOBAL_CONFIG["mem_limit"] = _mem_limit() - GLOBAL_CONFIG["verbose"] = _verbose() - - -class Singleton(type): - _lock = threading.Lock() - - def __init__(cls, *args, **kwargs): - cls._instance = None - super().__init__(*args, **kwargs) - - def __call__(cls, *args, **kwargs): - if cls._instance: - return cls._instance - - with cls._lock: - if not cls._instance: - cls._instance = super().__call__(*args, **kwargs) - - return cls._instance - - -class CodeExecutor(metaclass=Singleton): - def __init__(self): - _init_config() - self.pools = [] - for _ in range(GLOBAL_CONFIG["max_pool"]): - lock = threading.Lock() - pool = multiprocessing.Pool(processes=1) - self.pools.append((lock, pool)) - self.index = 0 - self.index_lock = threading.Lock() - self.config = GLOBAL_CONFIG - - def get_and_increment(self) -> int: - with self.index_lock: - i = self.index - self.index = i + 1 if i < self.config["max_pool"] - 1 else 0 - return i - - -def _print_process_usage(): - import psutil - # Get the current process ID - pid = os.getpid() - - # Create a Process object for the current process - process = psutil.Process(pid) - - # Get the CPU and memory usage of the current process - cpu_usage = process.cpu_percent(interval=1.0) # This returns the CPU usage as a percentage - memory_info = process.memory_info() # Returns memory usage as a named tuple (rss, vms) - - # rss (Resident Set Size) - the non-swapped physical memory the process has used - # vms (Virtual Memory Size) - the total memory the process can access - memory_usage = memory_info.rss / (1024 * 1024) # Convert to MB - virtual_memory = memory_info.vms / (1024 * 1024) # Convert to MB - - # Print CPU and memory usage - fit_logger.info(f"CPU Usage: {cpu_usage}%, Memory Usage (RSS): {memory_usage:.2f} MB, " - f"Virtual Memory Usage (VMS): {virtual_memory:.2f} MB") - - current_process = psutil.Process() - children = current_process.children(recursive=True) - for child in children: - fit_logger.info('Child pid is {}'.format(child.pid)) - - -@fitable("CodeNode.tool", "Python_REPL") -def execute_code(args: Dict[str, object], code: str) -> object: - # 由于插件初始化时使用守护进程,无法拉起进程池中的进程,选择在初次调用时初始化进程池 - executor = CodeExecutor() - if GLOBAL_CONFIG["verbose"]: - _print_process_usage() - res = execute_node_impl(executor.pools, executor.get_and_increment(), args, code, GLOBAL_CONFIG) - if res.isOk: - return res.value - raise FitException(res.error_code, res.msg) \ No newline at end of file diff --git a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/python_repl_impl.py b/framework/fel/python/plugins/builtins/fit_py_code_node_tools/python_repl_impl.py deleted file mode 100644 index 7402264f..00000000 --- a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/python_repl_impl.py +++ /dev/null @@ -1,168 +0,0 @@ -# -- encoding: utf-8 -- -# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. -# This file is a part of the ModelEngine Project. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ====================================================================================================================== -import asyncio -import importlib -import inspect -import json -import multiprocessing -import platform -import re -from typing import Any, Dict, List, Tuple -from pydantic import BaseModel - -if platform.system() == 'Windows': - from enum import IntEnum - - class InternalErrorCode(IntEnum): - EXCEPTION_FROM_USER_CODE_OCCURRED = 0x7F000105 - TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED = 0x7F000106 # java 不存在 -else: - from fitframework.core.exception.fit_exception import InternalErrorCode -try: - import resource -except ImportError: - resource = None -try: - from .safe_global import safe_builtins -except ImportError as e: - from safe_global import safe_builtins - -_PYTHON_REPL_HEADER = ''' -import json -from typing import Any - -Output = Any - - -''' - -GLOBAL_CONFIG = \ - { - "header": _PYTHON_REPL_HEADER, - "header_len": len(_PYTHON_REPL_HEADER.split('\n')), - "entrypoint": 'main', - "whitelist": ['json', 'typing'], - "blacklist": ['os', 'sys', 'cmd', 'subprocess', 'multiprocessing', 'timeit', 'platform'], - "timeout": 10, - "max_pool": 4, - "mem_limit": 181 * 1024 * 1024, - "verbose": False - } - - -class Result(BaseModel): - isOk: bool - value: Any = None - error_code: int - msg: str = None - - @staticmethod - def ok(data: Any) -> 'Result': - return Result(isOk=True, value=data, error_code=0) - - @staticmethod - def err(err_code: int, err_msg: str) -> 'Result': - return Result(isOk=False, error_code=err_code, msg=err_msg) - - -# 创建一个安全的执行环境 -def _create_restricted_exec_env(config: Dict[str, object]): - def safer_import(name, my_globals=None, my_locals=None, fromlist=(), level=0): - if name not in config['whitelist'] or name in config['blacklist']: - raise NameError(f'model {name} is not valid') - return importlib.import_module(name) - - safe_globals = { - '__builtins__': { - **safe_builtins, - '__import__': safer_import, - 'Args': Dict - } - } - return safe_globals - - -# 获取内存使用(单位:kB) -def _get_current_memory_usage(): - with open('/proc/self/status') as f: - mem_usage = f.read().split('VmPeak:')[1].split('\n')[0].strip() - return int(mem_usage.split()[0].strip()) - - -# 执行受限代码 -def _execute_code_with_restricted_python(args: Dict[str, object], code: str, config: Dict[str, object]): - if resource: - resource.setrlimit(resource.RLIMIT_AS, (GLOBAL_CONFIG["mem_limit"], GLOBAL_CONFIG["mem_limit"])) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - full_python_code = (f"{config['header']}" - f'{code}\n\n') - - safer_globals = _create_restricted_exec_env(config) - exec(full_python_code, safer_globals) - entrypoint = config['entrypoint'] - if (entrypoint not in safer_globals or - not inspect.isfunction(safer_globals.get(entrypoint))): - raise NameError("main function not defined") - entrypoint = safer_globals.get(entrypoint) - if inspect.iscoroutinefunction(entrypoint): - ret = loop.run_until_complete(asyncio.wait_for(entrypoint(args), config['timeout'])) - return Result.ok(json.dumps(ret)) - else: - return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value, - "Unable to execute non-asynchronous function") - except asyncio.TimeoutError: - return Result.err(InternalErrorCode.TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED.value, - "[TimeoutError] Execution timed out") - except Exception as err: - return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value, _get_except_msg(err, config)) - finally: - loop.close() - - -def _get_except_msg(error: Any, config: Dict[str, object]) -> str: - if isinstance(error, SyntaxError): - error_msg = f"{error.msg} at line {error.lineno - config['header_len']}, column {error.offset}: {error.text}" - elif isinstance(error, KeyError): - error_msg = f"key {str(error)} do not exist" - else: - error_msg = str(error) - return f"[{error.__class__.__name__}] {error_msg}" - - -def _get_free_process_pool(pools: List[Tuple[multiprocessing.Lock, multiprocessing.Pool]], index): - lock = pools[index][0] - if lock.acquire(): - return lock - raise multiprocessing.TimeoutError() - - -def execute_node_impl(pools: List[Tuple[multiprocessing.Lock, multiprocessing.Pool]], index: int, - args: Dict[str, object], code: str, config: Dict[str, object]): - match = _validate_escape(code) - if match is not None: - return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value, - f'{match.group()} is not allowed in code node') - lock = _get_free_process_pool(pools, index) - pool = pools[index][1] - try: - result = pool.apply_async(_execute_code_with_restricted_python, args=[args, code, config]) - return result.get(config['timeout']) - except multiprocessing.TimeoutError: - index = pools.index((lock, pool)) - pool.terminate() - pools[index] = (lock, multiprocessing.Pool(processes=1)) - return Result.err(InternalErrorCode.TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED.value, - "[TimeoutError] Execution timed out") - finally: - lock.release() - - -def _validate_escape(code: str) -> bool: - # 校验代码中是否存在获取栈帧的字段,禁用可能用于沙箱逃逸的端 - pattern = r'.gi_frame|.tb_frame|__[a-zA-Z]+__' - return re.search(pattern, code) \ No newline at end of file diff --git a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/safe_global.py b/framework/fel/python/plugins/builtins/fit_py_code_node_tools/safe_global.py deleted file mode 100644 index 95181a3d..00000000 --- a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/safe_global.py +++ /dev/null @@ -1,103 +0,0 @@ -# -- encoding: utf-8 -- -# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. -# This file is a part of the ModelEngine Project. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ====================================================================================================================== -import builtins - - -safe_builtins = {} - -_safe_names = [ - '__build_class__', - 'None', - 'False', - 'True', - 'abs', - 'bool', - 'bytes', - 'callable', - 'chr', - 'complex', - 'dict', - 'divmod', - 'float', - 'hash', - 'hex', - 'id', - 'int', - 'isinstance', - 'issubclass', - 'len', - 'list', - 'oct', - 'ord', - 'pow', - 'range', - 'repr', - 'round', - 'set', - 'slice', - 'sorted', - 'str', - 'tuple', - 'zip' -] - -_safe_exceptions = [ - 'ArithmeticError', - 'AssertionError', - 'AttributeError', - 'BaseException', - 'BufferError', - 'BytesWarning', - 'DeprecationWarning', - 'EOFError', - 'EnvironmentError', - 'Exception', - 'FloatingPointError', - 'FutureWarning', - 'GeneratorExit', - 'IOError', - 'ImportError', - 'ImportWarning', - 'IndentationError', - 'IndexError', - 'KeyError', - 'KeyboardInterrupt', - 'LookupError', - 'MemoryError', - 'NameError', - 'NotImplementedError', - 'OSError', - 'OverflowError', - 'PendingDeprecationWarning', - 'ReferenceError', - 'RuntimeError', - 'RuntimeWarning', - 'StopIteration', - 'SyntaxError', - 'SyntaxWarning', - 'SystemError', - 'SystemExit', - 'TabError', - 'TypeError', - 'UnboundLocalError', - 'UnicodeDecodeError', - 'UnicodeEncodeError', - 'UnicodeError', - 'UnicodeTranslateError', - 'UnicodeWarning', - 'UserWarning', - 'ValueError', - 'Warning', - 'ZeroDivisionError', -] - -for safe_name in _safe_names: - safe_builtins[safe_name] = getattr(builtins, safe_name) - -for safe_exception in _safe_exceptions: - safe_builtins[safe_exception] = getattr(builtins, safe_exception) - -safe_globals = {'__builtins__': safe_builtins} diff --git a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/test_python_repl_impl.py b/framework/fel/python/plugins/builtins/fit_py_code_node_tools/test_python_repl_impl.py deleted file mode 100644 index d9bcfc12..00000000 --- a/framework/fel/python/plugins/builtins/fit_py_code_node_tools/test_python_repl_impl.py +++ /dev/null @@ -1,201 +0,0 @@ -# -- encoding: utf-8 -- -# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. -# This file is a part of the ModelEngine Project. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ====================================================================================================================== -import json -import multiprocessing -import threading -import unittest - -try: - from python_repl_impl import execute_node_impl, GLOBAL_CONFIG -except ImportError as e: - from .python_repl_impl import execute_node_impl, GLOBAL_CONFIG - -WITH_ARGS_CODE = """ -async def main(args): - return args['n'] -""" - -WITHOUT_ARGS_CODE = """ -async def main(): - return args['n'] -""" - -INVALID_MODULE_CODE = """ -import time -def main(args): - return 1 -""" - -NO_ENTRYPOINT_CODE = """ -async def mine(args): - return 1 -""" - -SYNTAX_ERROR_CODE = """ -async def main(args): - return 1////2 -""" - -ASYNC_CODE = """ -import asyncio -async def main(args): - await asyncio.sleep(10) - return 1 -""" - -INF_LOOP_CODE = """ -async def main(args): - while True: - continue - return 1 -""" - -NON_ASYNC_CODE = """ -def main(args): - return 1 -""" - -WARN_CODE = ''' -async def exception_frame(): - try: - import os - except Exception as e: - return e.__traceback__.tb_frame.f_back.f_back.f_globals['_IMPORT_WHITELIST'] -b=exception_frame() -b.add('os') -''' - -WARN_CODE1 = ''' -async def exception_frame(): - ret = ().__class__.__bases__[0].__subclasses__()[133].__globals__["mkdir"]("tmp_file") -''' - -WARN_CODE2 = ''' -async def exception_frame(): - try: - import os - except Exception as e: - return e.tb_frame.f_back.f_back.f_globals['_IMPORT_WHITELIST'] -b=exception_frame() -b.add('os') -''' - -ERROR_CODE = """ -async def main(args): - return 1/0 -""" - -UNSERIALIZABLE_CODE = """ -async def main(args): - return {"n" : 1}.keys() -""" - -MEMORY_ERROR_CODE = """ -async def main(args): - k = [i for i in range(10**7)] - return k -""" - - -class CodeExecutor: - def __init__(self): - self.pools = [] - for _ in range(GLOBAL_CONFIG["max_pool"]): - lock = threading.Lock() - pool = multiprocessing.Pool(processes=1) - self.pools.append((lock, pool)) - self.index = 0 - self.index_lock = threading.Lock() - self.config = GLOBAL_CONFIG - - def get_and_increment(self) -> int: - with self.index_lock: - i = self.index - self.index = i + 1 if i < self.config["max_pool"] - 1 else 0 - return i - - -executor = CodeExecutor() -user_args = dict() - - -class TestStringMethods(unittest.TestCase): - def test_with_args(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), {"n": 1}, WITH_ARGS_CODE, executor.config) - self.assertTrue(res.isOk) - self.assertEqual(json.loads(res.value), 1) - - def test_without_args(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), {"n": 1}, WITHOUT_ARGS_CODE, - executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, "[TypeError] main() takes 0 positional arguments but 1 was given") - - def test_key_not_exist(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, WITH_ARGS_CODE, - executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, "[KeyError] key 'n' do not exist") - - def test_invalid_module(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, INVALID_MODULE_CODE, - executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, '[NameError] model time is not valid') - - def test_no_entrypoint(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, NO_ENTRYPOINT_CODE, - executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, '[NameError] main function not defined') - - def test_syntax_error(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, SYNTAX_ERROR_CODE, - executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, '[SyntaxError] invalid syntax at line 2, column 15: return 1////2') - - def test_async_timeout(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, ASYNC_CODE, executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, '[TimeoutError] Execution timed out') - - def test_non_async_code(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, NON_ASYNC_CODE, - executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, "Unable to execute non-asynchronous function") - - def test_warn_code(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, WARN_CODE, executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, '__traceback__ is not allowed in code node') - - def test_warn_code_double_under_score(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, WARN_CODE1, executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, '__class__ is not allowed in code node') - - def test_warn_code_frame_escape(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, WARN_CODE2, executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, '.tb_frame is not allowed in code node') - - def test_code_error(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, ERROR_CODE, executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, '[ZeroDivisionError] division by zero') - - def test_unserializable_code(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, UNSERIALIZABLE_CODE, - executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, "[TypeError] Object of type dict_keys is not JSON serializable") - - def test_inf_loop(self): - res = execute_node_impl(executor.pools, executor.get_and_increment(), {"n": 1}, INF_LOOP_CODE, executor.config) - self.assertFalse(res.isOk) - self.assertEqual(res.msg, '[TimeoutError] Execution timed out') diff --git a/framework/fel/python/requirements.txt b/framework/fel/python/requirements.txt index 3a5ef0cc..7caa4dc4 100644 --- a/framework/fel/python/requirements.txt +++ b/framework/fel/python/requirements.txt @@ -1,7 +1,3 @@ -pydantic==2.7.4 -psutil==6.1.1 -httpx==0.28.1 -pandas==2.1.3 llama-index==0.12.46 langchain-core==0.3.68 langchain_community==0.3.27