From ac9c4fc18bfa5ce93969fd28455fb7de942493fd Mon Sep 17 00:00:00 2001 From: Akeyiii Date: Mon, 29 Sep 2025 17:27:30 +0800 Subject: [PATCH 1/2] =?UTF-8?q?[fit]=20build=E5=91=BD=E4=BB=A4=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=87=AA=E5=AE=9A=E4=B9=89=E7=BB=93=E6=9E=84=E4=BD=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- framework/fit/python/fit_cli/utils/build.py | 30 ++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/framework/fit/python/fit_cli/utils/build.py b/framework/fit/python/fit_cli/utils/build.py index 27ad085d..a9101ba7 100644 --- a/framework/fit/python/fit_cli/utils/build.py +++ b/framework/fit/python/fit_cli/utils/build.py @@ -25,7 +25,7 @@ type_errors = [] -def parse_type(annotation): +def parse_type(annotation, custom_classes): """解析参数类型""" global type_errors @@ -36,6 +36,8 @@ def parse_type(annotation): elif isinstance(annotation, ast.Name): if annotation.id in TYPE_MAP: return TYPE_MAP[annotation.id], None, True + elif annotation.id in custom_classes: + return "object", None, True else: type_errors.append(f"不支持的类型: {annotation.id}") return "invalid", None, True @@ -49,7 +51,7 @@ def parse_type(annotation): # List[int] if container in ("list", "List"): - item_type, _, _ = parse_type(annotation.slice) + item_type, _, _ = parse_type(annotation.slice, custom_classes) if item_type == "invalid": type_errors.append(f"不支持的列表元素类型: {annotation.slice}") return "invalid", None, True @@ -61,7 +63,7 @@ def parse_type(annotation): # Optional[int] elif container == "Optional": - inner_type, inner_items, _ = parse_type(annotation.slice) + inner_type, inner_items, _ = parse_type(annotation.slice, custom_classes) if inner_type == "invalid": type_errors.append(f"不支持的Optional类型: {annotation.slice}") return "invalid", None, False @@ -76,14 +78,14 @@ def parse_type(annotation): items = [] if isinstance(annotation.slice, ast.Tuple): for elt in annotation.slice.elts: - item_type, _, _ = parse_type(elt) + item_type, _, _ = parse_type(elt, custom_classes) if item_type == "invalid": type_errors.append(f"不支持的元组元素类型: {ast.dump(elt)}") return "invalid", None, True items.append({"type":item_type}) return "array", f"{items}", True else: - item_type, _, _ = parse_type(annotation.slice) + item_type, _, _ = parse_type(annotation.slice, custom_classes) if item_type == "invalid": type_errors.append(f"不支持的元组元素类型: {ast.dump(annotation.slice)}") return "invalid", None, True @@ -91,7 +93,7 @@ def parse_type(annotation): # Set[int] elif container in ("set", "Set"): - item_type, _, _ = parse_type(annotation.slice) + item_type, _, _ = parse_type(annotation.slice, custom_classes) if item_type == "invalid": type_errors.append(f"不支持的集合元素类型: {annotation.slice}") return "invalid", None, True @@ -106,7 +108,7 @@ def parse_type(annotation): return "invalid", None, True -def parse_parameters(args): +def parse_parameters(args, custom_classes): """解析函数参数""" properties = {} order = [] @@ -115,7 +117,7 @@ def parse_parameters(args): for arg in args.args: arg_name = arg.arg order.append(arg_name) - arg_type, items, is_required = parse_type(arg.annotation) + arg_type, items, is_required = parse_type(arg.annotation, custom_classes) # 定义参数 prop_def = { "defaultValue": "", @@ -132,12 +134,12 @@ def parse_parameters(args): return properties, order, required -def parse_return(annotation): +def parse_return(annotation, custom_classes): """解析返回值类型""" if not annotation: return {"type": "string", "convertor": ""} - return_type, items, _ = parse_type(annotation) + return_type, items, _ = parse_type(annotation, custom_classes) ret = { "type": return_type, **({"items": items} if items else {}), @@ -155,8 +157,12 @@ def parse_python_file(file_path: Path): py_name = file_path.stem definitions = [] tool_groups = [] + custom_classes = set() for node in tree.body: + if isinstance(node, ast.ClassDef): + custom_classes.add(node.name) + if isinstance(node, ast.FunctionDef): func_name = node.name # 默认描述 @@ -185,8 +191,8 @@ def parse_python_file(file_path: Path): continue # 解析参数和返回值 - properties, order, required = parse_parameters(node.args) - return_schema = parse_return(node.returns) + properties, order, required = parse_parameters(node.args, custom_classes) + return_schema = parse_return(node.returns, custom_classes) # definition schema definition_schema = { From b19ea3f56338ce542f3d154ab45b3ab671683b3c Mon Sep 17 00:00:00 2001 From: Akeyiii Date: Wed, 1 Oct 2025 14:45:40 +0800 Subject: [PATCH 2/2] =?UTF-8?q?[fit]=20=E5=AE=8C=E5=96=84build=20=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- framework/fit/python/fit_cli/utils/build.py | 275 +++++++++++++++----- 1 file changed, 203 insertions(+), 72 deletions(-) diff --git a/framework/fit/python/fit_cli/utils/build.py b/framework/fit/python/fit_cli/utils/build.py index a9101ba7..7898815c 100644 --- a/framework/fit/python/fit_cli/utils/build.py +++ b/framework/fit/python/fit_cli/utils/build.py @@ -25,7 +25,23 @@ type_errors = [] -def parse_type(annotation, custom_classes): +class ClassInfo: + """存储自定义类的信息""" + def __init__(self, name:str): + self.name = name + self.fields: dict[str, ast.AST] = {} # 字段名 -> 类型注解 + self.bases: list[str] = [] # 基类名 + + def add_field(self, field_name: str, annotation: ast.AST): + """添加字段""" + self.fields[field_name] = annotation + + def add_base(self, base_name: str): + """添加基类""" + self.bases.append(base_name) + + +def parse_type(annotation, class_map): """解析参数类型""" global type_errors @@ -34,81 +50,133 @@ def parse_type(annotation, custom_classes): return "invalid", None, True elif isinstance(annotation, ast.Name): - if annotation.id in TYPE_MAP: + if annotation.id in TYPE_MAP: # 基础类型 return TYPE_MAP[annotation.id], None, True - elif annotation.id in custom_classes: - return "object", None, True - else: + elif annotation.id in class_map: # 自定义类型 + class_info = class_map[annotation.id] + properties = {} + for field_name, field_annotation in class_info.fields.items(): + field_type, field_items, _ = parse_type(field_annotation, class_map) + + if field_type == "invalid": + type_errors.append(f"类 {annotation.id} 的字段 {field_name} 类型无效") + return "invalid", None, True + + field_schema = {"type": field_type} + if field_type == "array": + field_schema["items"] = field_items if field_items else {} + elif field_type == "object" and field_items: + if "properties" in field_items: + field_schema["properties"] = field_items["properties"] + if "anyOf" in field_items: + field_schema["anyOf"] = field_items["anyOf"] + if "additionalProperties" in field_items: + field_schema["additionalProperties"] = field_items["additionalProperties"] + + properties[field_name] = field_schema + return "object", {"type":"object", "properties":properties}, True + else: # 未知类型 type_errors.append(f"不支持的类型: {annotation.id}") return "invalid", None, True - elif isinstance(annotation, ast.Constant) and annotation.value is None: + elif isinstance(annotation, ast.Constant) and annotation.value is None: # None return "null", None, False - elif isinstance(annotation, ast.Subscript): - if isinstance(annotation.value, ast.Name): - container = annotation.value.id + elif isinstance(annotation, ast.Subscript) and isinstance(annotation.value, ast.Name): # 容器类型 + container = annotation.value.id - # List[int] - if container in ("list", "List"): - item_type, _, _ = parse_type(annotation.slice, custom_classes) - if item_type == "invalid": - type_errors.append(f"不支持的列表元素类型: {annotation.slice}") + # List[int] + if container in ("list", "List"): + item_type, item_schema, _ = parse_type(annotation.slice, class_map) + if item_type == "invalid": + type_errors.append(f"不支持的列表元素类型: {annotation.slice}") + return "invalid", None, True + items = item_schema if item_schema else {"type": item_type} + return "array", items, True + + # Dict[str, int] → object + elif container in ("dict", "Dict"): + if isinstance(annotation.slice, ast.Tuple) and len(annotation.slice.elts) == 2: + key_annot, value_annot = annotation.slice.elts + key_type, _, _ = parse_type(key_annot, class_map) + if key_type != "string": + type_errors.append(f"Dict 的键类型必须是 string,实际是 {key_type}") return "invalid", None, True - return "array", {"type": item_type}, True + value_type, value_schema, _ = parse_type(value_annot, class_map) + items = value_schema if value_schema else {"type": value_type} + return "object", {"additionalProperties": items}, True - # Dict[str, int] → object - elif container in ("dict", "Dict"): - return "object", None, True + # Optional[int] + elif container == "Optional": + inner_type, inner_items, _ = parse_type(annotation.slice, class_map) - # Optional[int] - elif container == "Optional": - inner_type, inner_items, _ = parse_type(annotation.slice, custom_classes) + if inner_type == "invalid": + type_errors.append(f"不支持的Optional类型: {annotation.slice}") + return "invalid", None, False + return inner_type, inner_items, False + + # Union[str, int] + elif container == "Union": + if isinstance(annotation.slice, ast.Tuple): + schemas = [] + for elt in annotation.slice.elts: + elt_type, elt_items, _ = parse_type(elt, class_map) + if elt_type == "invalid": + type_errors.append(f"不支持的 Union 元素类型: {ast.dump(elt)}") + return "invalid", None, True + schema = {"type": elt_type} + if elt_items: + schema.update(elt_items) + schemas.append(schema) + return "object", {"anyOf": schemas}, True + else: + inner_type, inner_items, _ = parse_type(annotation.slice, class_map) if inner_type == "invalid": - type_errors.append(f"不支持的Optional类型: {annotation.slice}") - return "invalid", None, False - return inner_type, inner_items, False - - # Union[str, int] - elif container == "Union": - return "object", None, True - - # Tuple[str] - elif container in ("tuple", "Tuple"): - items = [] - if isinstance(annotation.slice, ast.Tuple): - for elt in annotation.slice.elts: - item_type, _, _ = parse_type(elt, custom_classes) - if item_type == "invalid": - type_errors.append(f"不支持的元组元素类型: {ast.dump(elt)}") - return "invalid", None, True - items.append({"type":item_type}) - return "array", f"{items}", True - else: - item_type, _, _ = parse_type(annotation.slice, custom_classes) + type_errors.append(f"不支持的 Union 类型: {ast.dump(annotation.slice)}") + return "invalid", None, True + schema = {"type": inner_type} + if inner_items: + schema.update(inner_items) + return "object", {"anyOf": [schema]}, True + + # Tuple[str] + elif container in ("tuple", "Tuple"): + if isinstance(annotation.slice, ast.Tuple): + tuple_items = [] + for elt in annotation.slice.elts: + item_type, item_schema, _ = parse_type(elt, class_map) if item_type == "invalid": - type_errors.append(f"不支持的元组元素类型: {ast.dump(annotation.slice)}") + type_errors.append(f"不支持的元组元素类型: {ast.dump(elt)}") return "invalid", None, True - return "array", {"type":item_type}, True - - # Set[int] - elif container in ("set", "Set"): - item_type, _, _ = parse_type(annotation.slice, custom_classes) + tuple_items.append(item_schema if item_schema else {"type": item_type}) + # 返回固定长度 tuple 的 items 列表 + return "array", {"items": tuple_items}, True + else: + # 单元素 Tuple + item_type, item_schema, _ = parse_type(annotation.slice, class_map) if item_type == "invalid": - type_errors.append(f"不支持的集合元素类型: {annotation.slice}") + type_errors.append(f"不支持的元组元素类型: {ast.dump(annotation.slice)}") return "invalid", None, True - return "array", {"type": item_type}, True - - - else: - type_errors.append(f"不支持的容器类型: {container}") + return "array", {"items": item_schema if item_schema else {"type": item_type}}, True + + # Set[int] + elif container in ("set", "Set"): + item_type, item_schema, _ = parse_type(annotation.slice, class_map) + if item_type == "invalid": + type_errors.append(f"不支持的集合元素类型: {annotation.slice}") return "invalid", None, True + items = item_schema if item_schema else {"type": item_type} + return "array", items, True + + else: + type_errors.append(f"不支持的容器类型: {container}") + return "invalid", None, True type_errors.append(f"无法识别的类型: {ast.dump(annotation)}") return "invalid", None, True -def parse_parameters(args, custom_classes): +def parse_parameters(args, class_map): """解析函数参数""" properties = {} order = [] @@ -117,17 +185,38 @@ def parse_parameters(args, custom_classes): for arg in args.args: arg_name = arg.arg order.append(arg_name) - arg_type, items, is_required = parse_type(arg.annotation, custom_classes) + arg_type, items, is_required = parse_type(arg.annotation, class_map) # 定义参数 prop_def = { "defaultValue": "", "description": f"参数 {arg_name}", "name": arg_name, "type": arg_type, - **({"items": items} if items else {}), - "examples": "", - "required": is_required, } + if arg_type == "array" and items: + if "items" in items: + prop_def["items"] = items["items"] + else: + arr_items = {"type": items.get("type", "object")} + if "properties" in items: + arr_items["properties"] = items["properties"] + if "anyOf" in items: + arr_items["anyOf"] = items["anyOf"] + if "additionalProperties" in items: + arr_items["additionalProperties"] = items["additionalProperties"] + prop_def["items"] = arr_items + + if arg_type == "object" and items: + if "properties" in items: + prop_def["properties"] = items["properties"] + if "anyOf" in items: + prop_def["anyOf"] = items["anyOf"] + if "additionalProperties" in items: + prop_def["additionalProperties"] = items["additionalProperties"] + + prop_def["examples"] = "" + prop_def["required"] = is_required + properties[arg_name] = prop_def if is_required: required.append(arg_name) @@ -140,12 +229,37 @@ def parse_return(annotation, custom_classes): return {"type": "string", "convertor": ""} return_type, items, _ = parse_type(annotation, custom_classes) - ret = { - "type": return_type, - **({"items": items} if items else {}), - "convertor": "" - } - return ret + if return_type == "array": + ret = {"type": "array"} + if items: + if "items" in items: + ret["items"] = items["items"] + else: + arr_items = {"type": items.get("type", "object")} + if isinstance(items, dict) and "properties" in items: + arr_items["properties"] = items["properties"] + if isinstance(items, dict) and "anyOf" in items: + arr_items["anyOf"] = items["anyOf"] + if isinstance(items, dict) and "additionalProperties" in items: + arr_items["additionalProperties"] = items["additionalProperties"] + ret["items"] = arr_items + ret["convertor"] = "" + return ret + + elif return_type == "object": + ret = {"type": "object"} + if items and isinstance(items, dict): + if "properties" in items: + ret["properties"] = items["properties"] + if "anyOf" in items: + ret["anyOf"] = items["anyOf"] + if "additionalProperties" in items: + ret["additionalProperties"] = items["additionalProperties"] + ret["convertor"] = "" + return ret + + else: + return {"type": return_type, "convertor": ""} def parse_python_file(file_path: Path): @@ -157,15 +271,26 @@ def parse_python_file(file_path: Path): py_name = file_path.stem definitions = [] tool_groups = [] - custom_classes = set() - + class_map: dict[str, ClassInfo] = {} # 类名, 类信息 + # 收集自定义类 for node in tree.body: if isinstance(node, ast.ClassDef): - custom_classes.add(node.name) - + class_info = ClassInfo(node.name) + for base in node.bases: + if isinstance(base, ast.Name): + class_info.add_base(base.id) + for subnode in node.body: + if isinstance(subnode, ast.FunctionDef) and subnode.name == "__init__": + for arg in subnode.args.args[1:]: # 跳过 self + arg_name = arg.arg + class_info.add_field(arg_name, arg.annotation) + class_map[node.name] = class_info + # 解析函数定义 + for node in tree.body: if isinstance(node, ast.FunctionDef): func_name = node.name - # 默认描述 + + # 获取描述 description = f"执行 {func_name} 方法" if node.body and isinstance(node.body[0], ast.Expr): expr_value = node.body[0].value @@ -191,8 +316,8 @@ def parse_python_file(file_path: Path): continue # 解析参数和返回值 - properties, order, required = parse_parameters(node.args, custom_classes) - return_schema = parse_return(node.returns, custom_classes) + properties, order, required = parse_parameters(node.args, class_map) + return_schema = parse_return(node.returns, class_map) # definition schema definition_schema = { @@ -219,6 +344,9 @@ def parse_python_file(file_path: Path): "name": v["name"], "type": v["type"], **({"items": v["items"]} if "items" in v else {}), + **({"properties": v["properties"]} if "properties" in v else {}), + **({"anyOf": v["anyOf"]} if "anyOf" in v else {}), + **({"additionalProperties": v["additionalProperties"]} if "additionalProperties" in v else {}), "required": False, # 工具里参数默认非必填 } for k, v in properties.items() @@ -231,6 +359,9 @@ def parse_python_file(file_path: Path): "description": f"{func_name} 函数的返回值", "type": return_schema["type"], **({"items": return_schema["items"]} if "items" in return_schema else {}), + **({"properties": return_schema["properties"]} if "properties" in return_schema else {}), + **({"anyOf": return_schema["anyOf"]} if "anyOf" in return_schema else {}), + **({"additionalProperties": return_schema["additionalProperties"]} if "additionalProperties" in return_schema else {}), "convertor": "", "examples": "", },