From ce8a0f958650501b1677492e231df66639805eea Mon Sep 17 00:00:00 2001 From: Oliver <1021782472@qq.com> Date: Sun, 24 Aug 2025 15:54:31 +0800 Subject: [PATCH] feat Apache Linkis MCP --- linkis-mcp-server/auth.py | 11 + linkis-mcp-server/config.json | 13 + linkis-mcp-server/config.py | 59 ++++ linkis-mcp-server/datasource.py | 19 + linkis-mcp-server/knowledge.py | 326 ++++++++++++++++++ linkis-mcp-server/linkis_client.py | 130 +++++++ linkis-mcp-server/mcp_server.py | 220 ++++++++++++ linkis-mcp-server/metadata.py | 30 ++ linkis-mcp-server/readme.md | 74 ++++ linkis-mcp-server/requirements.txt | 3 + linkis-mcp-server/tasks.py | 53 +++ linkis-mcp-server/test/__init__.py | 0 linkis-mcp-server/test/test_auth_api.py | 13 + linkis-mcp-server/test/test_datasource_api.py | 30 ++ linkis-mcp-server/test/test_knowledge_api.py | 20 ++ linkis-mcp-server/test/test_linkis_client.py | 24 ++ linkis-mcp-server/test/test_mcp_tools.py | 14 + linkis-mcp-server/test/test_metadata_api.py | 51 +++ linkis-mcp-server/test/test_tasks_api.py | 69 ++++ 19 files changed, 1159 insertions(+) create mode 100644 linkis-mcp-server/auth.py create mode 100644 linkis-mcp-server/config.json create mode 100644 linkis-mcp-server/config.py create mode 100644 linkis-mcp-server/datasource.py create mode 100644 linkis-mcp-server/knowledge.py create mode 100644 linkis-mcp-server/linkis_client.py create mode 100644 linkis-mcp-server/mcp_server.py create mode 100644 linkis-mcp-server/metadata.py create mode 100644 linkis-mcp-server/readme.md create mode 100644 linkis-mcp-server/requirements.txt create mode 100644 linkis-mcp-server/tasks.py create mode 100644 linkis-mcp-server/test/__init__.py create mode 100644 linkis-mcp-server/test/test_auth_api.py create mode 100644 linkis-mcp-server/test/test_datasource_api.py create mode 100644 linkis-mcp-server/test/test_knowledge_api.py create mode 100644 linkis-mcp-server/test/test_linkis_client.py create mode 100644 linkis-mcp-server/test/test_mcp_tools.py create mode 
# ===== linkis-mcp-server/auth.py =====
from typing import Dict, Any
from linkis_client import LinkisClient
from config import API_PATHS


class AuthAPI:
    """Authentication wrapper over the Linkis gateway login endpoint."""

    def __init__(self, client: LinkisClient):
        self.client = client

    def login(self, username: str, password: str) -> Dict[str, Any]:
        """POST the credentials to /user/login and return the raw response.

        Raises LinkisError (via the client) on a non-2xx response.
        """
        payload = {"userName": username, "password": password}
        return self.client.post(API_PATHS["login"], payload)


# ===== linkis-mcp-server/config.json =====
{
    "current_env": "dev",
    "dev": {
        "LINKIS_BASE_URL": "http://localhost:9001",
        "LINKIS_TOKEN": "your_dev_token",
        "LINKIS_ENV": "dev"
    },
    "prod": {
        "LINKIS_BASE_URL": "http://localhost:9001",
        "LINKIS_TOKEN": "your_prod_token",
        "LINKIS_ENV": "prod"
    }
}

# ===== linkis-mcp-server/config.py =====
# BUG FIX: the original file contained the "import json / import os"
# block twice, verbatim; the duplicate has been removed.
import json
import os

CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.json")

if not os.path.exists(CONFIG_PATH):
    raise FileNotFoundError(f"未找到配置文件: {CONFIG_PATH}")

with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    all_cfg = json.load(f)

# Select the active environment block ("dev" by default).
env_mode = all_cfg.get("current_env", "dev")
if env_mode not in all_cfg:
    raise ValueError(f"未知配置环境: {env_mode},可选值: {list(all_cfg.keys())}")

cfg = all_cfg[env_mode]

LINKIS_BASE_URL = cfg.get("LINKIS_BASE_URL")
LINKIS_TOKEN = cfg.get("LINKIS_TOKEN")
LINKIS_ENV = cfg.get("LINKIS_ENV", env_mode)

# Fail fast at import time when the active environment is incomplete.
if not LINKIS_BASE_URL:
    raise ValueError(f"[{env_mode}] LINKIS_BASE_URL 未配置")
if not LINKIS_TOKEN:
    raise ValueError(f"[{env_mode}] LINKIS_TOKEN 未配置")

# Central registry of Linkis REST API paths. "{id}" segments are filled
# in per call (str.format or LinkisClient's path_params).
API_PATHS = {
    # Auth
    "login": "/api/rest_j/v1/user/login",

    # Metadata
    "get_columns": "/api/rest_j/v1/metadataQuery/getColumns",
    "get_databases": "/api/rest_j/v1/metadataQuery/getDatabases",
    "get_tables": "/api/rest_j/v1/metadataQuery/getTables",
    "get_partitions": "/api/rest_j/v1/metadataQuery/getPartitions",
    "get_table_props": "/api/rest_j/v1/metadataQuery/getTableProps",
    "get_partition_props": "/api/rest_j/v1/metadataQuery/getPartitionProps",

    # DataSource
    "displaysql": "/api/rest_j/v1/datasource/displaysql",
    "get_table_fields_info": "/api/rest_j/v1/datasource/getTableFieldsInfo",
    "get_table_statistic_info": "/api/rest_j/v1/datasource/getTableStatisticInfo",

    # Tasks
    "execute": "/api/rest_j/v1/entrance/execute",
    "submit": "/api/rest_j/v1/entrance/submit",
    "kill": "/api/rest_j/v1/entrance/{id}/kill",
    "kill_jobs": "/api/rest_j/v1/entrance/{id}/killJobs",
    "pause": "/api/rest_j/v1/entrance/{id}/pause",
    "progress": "/api/rest_j/v1/entrance/{id}/progress",
    "progress_with_resource": "/api/rest_j/v1/entrance/{id}/progressWithResource",
    "status": "/api/rest_j/v1/entrance/{id}/status",
    "runningtask": "/api/rest_j/v1/entrance/api/metrics/runningtask",
    "taskinfo": "/api/rest_j/v1/entrance/api/metrics/taskinfo",
}

# ===== linkis-mcp-server/datasource.py =====
from typing import Dict, Any
from linkis_client import LinkisClient
from config import API_PATHS


class DataSourceAPI:
    """Wrappers over the Linkis datasource REST endpoints."""

    def __init__(self, client: LinkisClient):
        self.client = client

    def displaysql(self, datasource_id: int, sql: str) -> Dict[str, Any]:
        """POST the SQL to /datasource/displaysql to generate DDL."""
        payload = {"id": datasource_id, "sql": sql}
        return self.client.post(API_PATHS["displaysql"], payload)

    def get_table_fields_info(self, datasource_id: int, table: str) -> Dict[str, Any]:
        """Fetch field/column metadata for *table* on the datasource."""
        params = {"id": datasource_id, "table": table}
        return self.client.get(API_PATHS["get_table_fields_info"], params)

    def get_table_statistic_info(self, datasource_id: int, table: str) -> Dict[str, Any]:
        """Fetch table statistics (payload shape depends on the deployment)."""
        params = {"id": datasource_id, "table": table}
        return self.client.get(API_PATHS["get_table_statistic_info"], params)
# ===== linkis-mcp-server/knowledge.py =====
import uuid
from typing import Dict, List, Optional


class KnowledgeEntry:
    """A single Q&A knowledge-base record.

    Attributes:
        id: Stable identifier; auto-generated by KnowledgeQA.add_entry
            when the caller does not supply one.
        question: Canonical question text.
        answer: Answer text returned to callers.
        tags: Topic tags used by search_by_tag()/topics().
        aliases: Alternative phrasings that should also match.
    """

    def __init__(self, id: str, question: str, answer: str,
                 tags: Optional[List[str]] = None, aliases: Optional[List[str]] = None):
        self.id = id
        self.question = question
        self.answer = answer
        self.tags = tags or []
        self.aliases = aliases or []

    def matches(self, query: str) -> bool:
        """Return True when *query* case-insensitively hits this entry.

        A hit is a substring of the question, a substring of any alias,
        or an exact (case-insensitive) match of the entry id.
        """
        q = query.lower()
        return (q in self.question.lower()
                or any(q in alias.lower() for alias in self.aliases)
                or q == self.id.lower())

    def to_dict(self) -> Dict:
        """Serialize to a plain dict — the wire format returned by ask()."""
        return {
            "id": self.id,
            "question": self.question,
            "answer": self.answer,
            "tags": self.tags,
            "aliases": self.aliases,
        }


class KnowledgeQA:
    """In-memory Q&A knowledge base for Linkis APIs, config and errors.

    BUG FIX: mcp_server.py (qa_add/qa_topics/qa_reload) and
    test_knowledge_api.py call ``add_entry(entry, save=...)`` (using the
    return value and passing entries without an "id"), ``topics()`` and
    ``reload()`` — none of which existed here. They are added below in a
    backward-compatible way.

    Entries added at runtime live only in memory and are discarded by
    reload().
    """

    def __init__(self):
        self._entries: List[KnowledgeEntry] = []
        self._load_entries()

    def _load_entries(self):
        """(Re)build the entry list from the built-in raw data."""
        raw_data = [
            {
                "id": "api-entrance-submit-sql",
                "question": "如何提交一条 SQL 任务",
                "answer": "POST /api/rest_j/v1/entrance/submit\n必填: executionContent.code=SQL文本, executionContent.runType=sql, labels.engineType(如 hive-1.2.1), labels.userCreator(如 IDE_user)",
                "tags": ["api", "task", "sql"],
                "aliases": ["提交SQL任务", "SQL任务提交接口", "SQL 提交"]
            },
            {
                "id": "api-entrance-submit-script",
                "question": "如何提交脚本任务并指定语言",
                "answer": "POST /api/rest_j/v1/entrance/submit\nexecutionContent.code=脚本, executionContent.runType=python/shell/spark, labels.engineType 与引擎匹配",
                "tags": ["api", "task", "script"],
                "aliases": ["提交脚本", "提交 Python 任务", "提交 Shell 任务"]
            },
            {
                "id": "api-entrance-status",
                "question": "如何查询任务状态",
                "answer": "GET /api/rest_j/v1/entrance/{execID}/status\n路径参数: execID=任务ID",
                "tags": ["api", "task", "status"],
                "aliases": ["任务状态查询", "查询状态"]
            },
            {
                "id": "api-entrance-progress",
                "question": "如何查询任务进度",
                "answer": "GET /api/rest_j/v1/entrance/{execID}/progress\n返回阶段进度与总进度",
                "tags": ["api", "task", "progress"],
                "aliases": ["任务进度", "进度查询"]
            },
            {
                "id": "api-entrance-log",
                "question": "如何获取任务日志",
                "answer": "GET /api/rest_j/v1/entrance/{execID}/log?fromLine=1&size=200\n参数: fromLine 起始行, size 行数",
                "tags": ["api", "task", "log"],
                "aliases": ["查看日志", "拉取日志"]
            },
            {
                "id": "api-entrance-result-list",
                "question": "如何列出任务的结果集",
                "answer": "GET /api/rest_j/v1/entrance/{execID}/resultset\n返回结果集索引与基本信息",
                "tags": ["api", "task", "result"],
                "aliases": ["结果集列表", "列出结果集"]
            },
            {
                "id": "api-entrance-result",
                "question": "如何获取任务结果集",
                "answer": "GET /api/rest_j/v1/entrance/{execID}/resultset/{index}?size=500\n参数: index 结果集序号, size 返回行数",
                "tags": ["api", "task", "result"],
                "aliases": ["下载结果集", "查看结果集"]
            },
            {
                "id": "api-entrance-kill",
                "question": "如何杀死任务",
                "answer": "POST /api/rest_j/v1/entrance/{execID}/kill",
                "tags": ["api", "task", "kill"],
                "aliases": ["终止任务", "取消任务", "停止任务"]
            },
            {
                "id": "api-metadata-databases",
                "question": "如何查询数据源下的数据库列表",
                "answer": "GET /api/rest_j/v1/metadataQuery/getDatabases?dataSourceName=xxx&system=hive",
                "tags": ["api", "metadata", "database"],
                "aliases": ["获取数据库列表", "列出数据库"]
            },
            {
                "id": "api-metadata-tables",
                "question": "如何查询数据库下的表列表",
                "answer": "GET /api/rest_j/v1/metadataQuery/getTables?dataSourceName=xxx&database=xxx&system=hive",
                "tags": ["api", "metadata", "table"],
                "aliases": ["获取表列表", "列出表"]
            },
            {
                "id": "api-metadata-columns",
                "question": "如何查询表的字段信息",
                "answer": "GET /api/rest_j/v1/metadataQuery/getColumns?dataSourceName=xxx&database=xxx&table=xxx&system=hive",
                "tags": ["api", "metadata", "column"],
                "aliases": ["获取表结构", "查看字段信息"]
            },
            {
                "id": "api-metadata-partitions",
                "question": "如何查询表的分区信息",
                "answer": "GET /api/rest_j/v1/metadataQuery/getPartitions?dataSourceName=xxx&database=xxx&table=xxx&system=hive",
                "tags": ["api", "metadata", "partition"],
                "aliases": ["查看分区", "分区信息查询"]
            },
            {
                "id": "api-metadata-tableinfo",
                "question": "如何获取表的详细信息",
                "answer": "GET /api/rest_j/v1/metadataQuery/getTableInfo?dataSourceName=xxx&database=xxx&table=xxx&system=hive",
                "tags": ["api", "metadata", "table"],
                "aliases": ["表信息", "表详情"]
            },
            {
                "id": "api-datasource-list",
                "question": "如何获取已注册数据源列表",
                "answer": "GET /api/rest_j/v1/datasource/getAll",
                "tags": ["api", "datasource", "list"],
                "aliases": ["列出数据源", "获取数据源列表"]
            },
            {
                "id": "api-datasource-detail",
                "question": "如何根据 ID 获取数据源详情",
                "answer": "GET /api/rest_j/v1/datasource/get?id=123",
                "tags": ["api", "datasource", "detail"],
                "aliases": ["数据源详情", "查询数据源"]
            },
            {
                "id": "api-datasource-search",
                "question": "如何按名称搜索数据源",
                "answer": "GET /api/rest_j/v1/datasource/search?name=xxx",
                "tags": ["api", "datasource", "search"],
                "aliases": ["搜索数据源", "按名称查数据源"]
            },
            {
                "id": "api-datasource-create",
                "question": "如何创建数据源",
                "answer": "POST /api/rest_j/v1/datasource/create\nBody: name, type(如 hive/mysql), connectParams(主机/端口/库/用户名等), labels(可选)",
                "tags": ["api", "datasource", "create"],
                "aliases": ["新建数据源", "注册数据源"]
            },
            {
                "id": "api-datasource-update",
                "question": "如何更新数据源配置",
                "answer": "POST /api/rest_j/v1/datasource/update\nBody: id, 需更新的字段,如 connectParams 或 labels",
                "tags": ["api", "datasource", "update"],
                "aliases": ["修改数据源", "编辑数据源"]
            },
            {
                "id": "api-datasource-delete",
                "question": "如何删除数据源",
                "answer": "POST /api/rest_j/v1/datasource/delete\nBody: id 或 ids 列表",
                "tags": ["api", "datasource", "delete"],
                "aliases": ["移除数据源", "删除数据源"]
            },
            {
                "id": "api-datasource-test",
                "question": "如何测试数据源连通性",
                "answer": "POST /api/rest_j/v1/datasource/testConnect\nBody: type, connectParams(与创建一致)",
                "tags": ["api", "datasource", "test"],
                "aliases": ["测试连接", "连通性测试"]
            },
            {
                "id": "api-datasource-displaysql",
                "question": "如何生成建表 DDL",
                "answer": "POST /api/rest_j/v1/datasource/displaysql\nBody 可包含 table/schema 字段,用于生成建库建表 DDL",
                "tags": ["api", "datasource", "ddl"],
                "aliases": ["生成DDL", "建表语句"]
            },
            {
                "id": "api-jobhistory-list",
                "question": "如何分页查询任务历史",
                "answer": "GET /api/rest_j/v1/jobhistory/list?user=xxx&pageNow=1&pageSize=20",
                "tags": ["api", "task", "history"],
                "aliases": ["任务历史查询", "任务列表"]
            },
            {
                "id": "api-jobhistory-detail",
                "question": "如何获取历史任务详情",
                "answer": "GET /api/rest_j/v1/jobhistory/{jobID}",
                "tags": ["api", "task", "history"],
                "aliases": ["任务详情", "历史任务详情"]
            },
            {
                "id": "cfg-engine-type",
                "question": "labels.engineType 如何配置",
                "answer": "labels.engineType 需与部署的引擎标识一致,如 hive-1.2.1, spark-2.4.7。示例: {\"labels\":{\"engineType\":\"hive-1.2.1\",\"userCreator\":\"IDE_user\"}}",
                "tags": ["config", "engine"],
                "aliases": ["engineType 配置", "引擎类型标签"]
            },
            {
                "id": "cfg-user-creator",
                "question": "labels.userCreator 有什么作用",
                "answer": "用于标识请求来源或创建者类型,例如 IDE_user、scheduler_xxx,便于路由和审计",
                "tags": ["config", "labels"],
                "aliases": ["userCreator 配置", "用户创建者标签"]
            },
            {
                "id": "cfg-timeout",
                "question": "任务超时时间如何设置",
                "answer": "可通过 runtime.max.askExecutorTimes 或 engineConn.timeout 控制任务超时(毫秒),具体取决于你的引擎与网关实现",
                "tags": ["config", "timeout"],
                "aliases": ["设置超时", "执行超时配置"]
            },
            {
                "id": "cfg-queue",
                "question": "如何指定资源队列",
                "answer": "通常通过 labels.queue 或执行参数中设置队列名(如 YARN 队列),也可在引擎侧配置默认队列",
                "tags": ["config", "resource"],
                "aliases": ["设置队列", "资源队列配置"]
            },
            {
                "id": "cfg-script-path",
                "question": "如何记录脚本来源路径",
                "answer": "可在 source.scriptPath 中传入源脚本路径(如 /tmp/demo.sql),便于审计与追踪",
                "tags": ["config", "source"],
                "aliases": ["scriptPath", "脚本路径"]
            },
            {
                "id": "cfg-vars",
                "question": "如何在任务中传递变量",
                "answer": "可通过 params.variable 传递键值对变量,由引擎侧在执行前进行替换/注入",
                "tags": ["config", "params"],
                "aliases": ["任务变量", "参数变量"]
            },
            {
                "id": "err-11001",
                "question": "Linkis 报错 11001 是什么原因",
                "answer": "11001 通常表示认证失败或未登录。请检查登录状态、凭据有效性以及网关鉴权配置",
                "tags": ["error", "auth"],
                "aliases": ["错误码 11001", "未登录 11001"]
            },
            {
                "id": "err-12001",
                "question": "参数错误 12001 如何处理",
                "answer": "检查必填参数是否缺失,类型是否正确(如 envId 应为字符串),并参考接口文档修正",
                "tags": ["error", "params"],
                "aliases": ["错误码 12001", "参数错误 12001"]
            },
            {
                "id": "err-13002",
                "question": "执行引擎不可用 13002 怎么办",
                "answer": "该错误表示目标执行引擎未启动或标签不匹配,请检查引擎服务状态、资源队列与 labels.engineType",
                "tags": ["error", "engine"],
                "aliases": ["错误码 13002", "引擎不可用"]
            },
            {
                "id": "err-14001",
                "question": "权限不足 14001 如何排查",
                "answer": "确认调用用户是否具备操作权限(数据源、数据库、表级权限),检查网关与后端鉴权策略",
                "tags": ["error", "permission"],
                "aliases": ["错误码 14001", "无权限 14001"]
            },
            {
                "id": "err-15001",
                "question": "任务执行超时 15001 如何处理",
                "answer": "适当提升超时阈值(如 engineConn.timeout),优化 SQL/脚本,或检查资源队列拥塞情况",
                "tags": ["error", "timeout"],
                "aliases": ["错误码 15001", "执行超时 15001"]
            },
            {
                "id": "err-16001",
                "question": "资源不足 16001 如何处理",
                "answer": "检查队列/集群资源是否充足,调大资源配额或调整并发度,必要时更换空闲队列",
                "tags": ["error", "resource"],
                "aliases": ["错误码 16001", "资源不足 16001"]
            },
            {
                "id": "howto-no-resultset",
                "question": "提交成功但没有结果集怎么办",
                "answer": "确认语句是否产生结果(如 DDL/DML 通常无结果集),检查 resultset 索引列表与权限限制",
                "tags": ["howto", "result"],
                "aliases": ["无结果集", "没有结果"]
            },
            {
                "id": "howto-paginate-result",
                "question": "如何分页获取大结果集",
                "answer": "通过 size 控制每次拉取行数,并循环拉取;若支持 offset/nextToken,可按游标分页",
                "tags": ["howto", "result"],
                "aliases": ["结果集分页", "分页获取结果"]
            },
            {
                "id": "howto-retry",
                "question": "任务失败如何重试",
                "answer": "可重新提交同一脚本/SQL;若系统支持重试 API,优先使用;同时排查失败原因(日志、资源、权限)",
                "tags": ["howto", "task"],
                "aliases": ["失败重试", "任务重试"]
            },
            {
                "id": "howto-choose-engine",
                "question": "如何选择合适的引擎",
                "answer": "按任务类型与生态匹配引擎(如 SQL 选 hive/spark-sql,批处理选 spark,脚本选 python/shell),并配置对应 engineType",
                "tags": ["howto", "engine"],
                "aliases": ["选择引擎", "引擎如何选"]
            },
        ]

        self._entries = [KnowledgeEntry(**item) for item in raw_data]

    def ask(self, query: str, top_k: int = 1) -> List[Dict]:
        """Return up to *top_k* matching entries, in knowledge-base order."""
        return [e.to_dict() for e in self._entries if e.matches(query)][:top_k]

    def search_by_tag(self, tag: str) -> List[Dict]:
        """Return all entries carrying *tag* (case-insensitive)."""
        tag_lower = tag.lower()
        return [e.to_dict() for e in self._entries
                if tag_lower in (t.lower() for t in e.tags)]

    def topics(self) -> List[str]:
        """Return the sorted set of all tags currently in the knowledge base."""
        return sorted({t for e in self._entries for t in e.tags})

    def add_entry(self, entry: Dict, save: bool = False) -> KnowledgeEntry:
        """Add an entry described by *entry* and return the created object.

        A missing/empty "id" is auto-generated (mcp_server.qa_add supplies
        none). *save* is accepted for API compatibility with callers;
        persistence is not implemented — entries are in-memory only and
        are dropped by reload().
        """
        data = dict(entry)
        if not data.get("id"):
            data["id"] = f"user-{uuid.uuid4().hex[:12]}"
        obj = KnowledgeEntry(**data)
        self._entries.append(obj)
        return obj

    def reload(self):
        """Reset to the built-in entries, discarding runtime additions."""
        self._load_entries()
# ===== linkis-mcp-server/linkis_client.py =====
from __future__ import annotations

import json
import logging
from typing import Any, Dict, Optional, Union
from urllib.parse import quote

import requests

from config import LINKIS_BASE_URL, LINKIS_TOKEN, LINKIS_ENV

logger = logging.getLogger(__name__)


class LinkisError(Exception):
    """Raised for any failed Linkis HTTP call (non-2xx) or login failure."""

    def __init__(self, message: str, status: Optional[int] = None, payload: Any = None):
        super().__init__(message)
        self.status = status       # HTTP status code, when available
        self.payload = payload     # decoded response body (JSON or text)

    def __str__(self) -> str:
        base = super().__str__()
        if self.status is not None:
            base += f" (HTTP {self.status})"
        if self.payload is not None:
            base += f" | payload={self.payload}"
        return base


def _join_url(base_url: str, path: str) -> str:
    """Join base and path with exactly one slash between them."""
    return f"{base_url.rstrip('/')}/{path.lstrip('/')}"


def _clean_params(d: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Drop None-valued query parameters."""
    return {k: v for k, v in (d or {}).items() if v is not None}


def _interpolate_path(path_template: str, path_params: Optional[Dict[str, Any]]) -> str:
    """Fill "{name}" segments of *path_template*, URL-escaping each value.

    FIX: uses stdlib urllib.parse.quote instead of the requests.utils.quote
    internal alias (same behavior, no reliance on requests internals).
    """
    if not path_params:
        return path_template
    out = path_template
    for key, value in path_params.items():
        out = out.replace(f"{{{key}}}", quote(str(value), safe=""))
    return out


def _safe_resp_payload(resp: requests.Response) -> Any:
    """Best-effort decode of a response body: JSON if possible, else text."""
    try:
        return resp.json()
    except Exception:
        return resp.text


class LinkisClient:
    """Thin HTTP client for the Linkis gateway (token and/or login auth)."""

    def __init__(
        self,
        base_url: Optional[str] = None,
        token: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        timeout: Optional[Union[int, float]] = None,
        verify_ssl: Optional[bool] = True,
        extra_headers: Optional[Dict[str, str]] = None,
    ):
        """Create a client; base_url/token fall back to config values.

        Raises ValueError when no base URL is available at all.
        """
        self.base_url = (base_url or LINKIS_BASE_URL).strip()
        if not self.base_url:
            raise ValueError("LINKIS_BASE_URL 未配置")

        self.token = (token or LINKIS_TOKEN).strip() if (token or LINKIS_TOKEN) else None
        self.username = username
        self.password = password
        self.timeout = float(timeout if timeout is not None else 30)
        self.verify_ssl = verify_ssl

        self._session = requests.Session()
        self._session.headers.update({"Accept": "*/*"})
        if self.token:
            self._session.headers.update({"Authorization": f"Bearer {self.token}"})
        if extra_headers:
            self._session.headers.update(extra_headers)

        # Best-effort auto login; failure is logged, not fatal, so the
        # client can still be used with token-only auth.
        if self.username and self.password:
            try:
                self.login(self.username, self.password)
            except Exception:
                # FIX: use logging instead of print for library code.
                logger.warning("[LinkisClient] Auto login failed", exc_info=True)

    def login(self, user_name: str, password: str) -> Dict[str, Any]:
        """POST credentials to /user/login; raises LinkisError on failure."""
        url = _join_url(self.base_url, "/api/rest_j/v1/user/login")
        payload = {"userName": user_name, "password": password}

        resp = self._session.post(url, json=payload, timeout=self.timeout, verify=self.verify_ssl)
        if not resp.ok:
            raise LinkisError("Login failed", status=resp.status_code, payload=_safe_resp_payload(resp))
        return _safe_resp_payload(resp)

    def get(
        self,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        path_params: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """GET *path* (with optional "{name}" path params and query params)."""
        url = _join_url(self.base_url, _interpolate_path(path, path_params))
        resp = self._session.get(url, params=_clean_params(params),
                                 timeout=self.timeout, verify=self.verify_ssl)
        return self._handle_response(resp)

    def post(
        self,
        path: str,
        json_body: Optional[Dict[str, Any]] = None,
        path_params: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """POST *json_body* (may be None) to *path* as application/json."""
        url = _join_url(self.base_url, _interpolate_path(path, path_params))
        headers = {"Content-Type": "application/json"}
        resp = self._session.post(
            url,
            data=None if json_body is None else json.dumps(json_body),
            headers=headers,
            timeout=self.timeout,
            verify=self.verify_ssl,
        )
        return self._handle_response(resp)

    def _handle_response(self, resp: requests.Response) -> Any:
        """Raise LinkisError on non-2xx; otherwise return JSON (or raw text)."""
        if not resp.ok:
            raise LinkisError("Request failed", status=resp.status_code,
                              payload=_safe_resp_payload(resp))
        try:
            return resp.json()
        except ValueError:
            return resp.text


# ===== linkis-mcp-server/mcp_server.py (module setup) =====
from mcp.server.fastmcp import FastMCP

from config import LINKIS_TOKEN, LINKIS_ENV
from linkis_client import LinkisClient
from auth import AuthAPI
from metadata import MetadataAPI
from datasource import DataSourceAPI
from tasks import TaskAPI
from knowledge import KnowledgeQA

mcp = FastMCP("Linkis MCP Server")

# Initialize the Linkis client and the per-domain API modules.
# BUG FIX: the original called LinkisClient(LINKIS_TOKEN), which passed the
# token as the *base_url* positional argument — every request would have
# targeted the token string instead of the gateway. Pass it by keyword;
# base_url falls back to config.LINKIS_BASE_URL inside the client.
client = LinkisClient(token=LINKIS_TOKEN)

auth_api = AuthAPI(client)
metadata_api = MetadataAPI(client)
datasource_api = DataSourceAPI(client)
task_api = TaskAPI(client)
qa = KnowledgeQA()


def _ok(data):
    """Uniform success envelope for all MCP tools."""
    return {"ok": True, "data": data}


def _err(e: Exception):
    """Uniform error envelope for all MCP tools."""
    return {"ok": False, "error": str(e)}


# ========== Auth ==========
@mcp.tool()
def login(username: str, password: str):
    """User login: obtain/verify credentials against the Linkis gateway."""
    try:
        return _ok(auth_api.login(username, password))
    except Exception as e:
        return _err(e)


# ========== Metadata ==========
@mcp.tool()
def get_databases():
    """List databases visible through the Linkis metadataQuery API."""
    try:
        return _ok(metadata_api.get_databases())
    except Exception as e:
        return _err(e)

@mcp.tool()
def get_tables(database: str):
    """List tables in *database*."""
    try:
        return _ok(metadata_api.get_tables(database))
    except Exception as e:
        return _err(e)

@mcp.tool()
def get_columns(database: str, table: str):
    """List columns of *database*.*table*."""
    try:
        return _ok(metadata_api.get_columns(database, table))
    except Exception as e:
        return _err(e)

@mcp.tool()
def get_partitions(database: str, table: str):
    """List partitions of *database*.*table*."""
    try:
        return _ok(metadata_api.get_partitions(database, table))
    except Exception as e:
        return _err(e)

@mcp.tool()
def get_table_props(database: str, table: str):
    """Fetch table properties of *database*.*table*."""
    try:
        return _ok(metadata_api.get_table_props(database, table))
    except Exception as e:
        return _err(e)

@mcp.tool()
def get_partition_props(database: str, table: str, partition: str):
    """Fetch properties of one partition of *database*.*table*."""
    try:
        return _ok(metadata_api.get_partition_props(database, table, partition))
    except Exception as e:
        return _err(e)


# ========== DataSource ==========
@mcp.tool()
def displaysql(datasource_id: int, sql: str):
    """Generate DDL for *sql* on datasource *datasource_id*."""
    try:
        return _ok(datasource_api.displaysql(datasource_id, sql))
    except Exception as e:
        return _err(e)

@mcp.tool()
def get_table_fields_info(datasource_id: int, table: str):
    """Fetch field metadata for *table* on datasource *datasource_id*."""
    try:
        return _ok(datasource_api.get_table_fields_info(datasource_id, table))
    except Exception as e:
        return _err(e)

@mcp.tool()
def get_table_statistic_info(datasource_id: int, table: str):
    """Fetch table statistics for *table* on datasource *datasource_id*."""
    try:
        return _ok(datasource_api.get_table_statistic_info(datasource_id, table))
    except Exception as e:
        return _err(e)


# ========== Tasks ==========
@mcp.tool()
def execute(code: str, execute_user: str, engine_type: str = "spark"):
    """Run *code* as *execute_user* via the entrance execute endpoint."""
    try:
        return _ok(task_api.execute(code, execute_user, engine_type))
    except Exception as e:
        return _err(e)

@mcp.tool()
def submit(code: str, execute_user: str, engine_type: str = "spark"):
    """Submit *code* as *execute_user* via the entrance submit endpoint."""
    try:
        return _ok(task_api.submit(code, execute_user, engine_type))
    except Exception as e:
        return _err(e)

@mcp.tool()
def status(exec_id: str):
    """Query the status of task *exec_id*."""
    try:
        return _ok(task_api.status(exec_id))
    except Exception as e:
        return _err(e)

@mcp.tool()
def progress(exec_id: str):
    """Query the progress of task *exec_id*."""
    try:
        return _ok(task_api.progress(exec_id))
    except Exception as e:
        return _err(e)

@mcp.tool()
def progress_with_resource(exec_id: str):
    """Query progress plus resource usage of task *exec_id*."""
    try:
        return _ok(task_api.progress_with_resource(exec_id))
    except Exception as e:
        return _err(e)

@mcp.tool()
def pause(exec_id: str):
    """Pause task *exec_id*."""
    try:
        return _ok(task_api.pause(exec_id))
    except Exception as e:
        return _err(e)

@mcp.tool()
def kill(exec_id: str):
    """Kill task *exec_id*."""
    try:
        return _ok(task_api.kill(exec_id))
    except Exception as e:
        return _err(e)

@mcp.tool()
def kill_jobs(exec_id: str):
    """Kill the jobs belonging to task *exec_id*."""
    try:
        return _ok(task_api.kill_jobs(exec_id))
    except Exception as e:
        return _err(e)

@mcp.tool()
def runningtask():
    """Fetch the entrance running-task metrics."""
    try:
        return _ok(task_api.runningtask())
    except Exception as e:
        return _err(e)

@mcp.tool()
def taskinfo():
    """Fetch the entrance task-info metrics."""
    try:
        return _ok(task_api.taskinfo())
    except Exception as e:
        return _err(e)


# ========== Knowledge QA ==========
@mcp.tool()
def qa_ask(question: str, top_k: int = 3):
    """Answer *question* from the built-in Linkis knowledge base."""
    try:
        return _ok(qa.ask(question, top_k=top_k))
    except Exception as e:
        return _err(e)

@mcp.tool()
def qa_add(question: str, answer: str, tags: str = "", aliases: str = ""):
    """Add a knowledge entry; *tags*/*aliases* are comma-separated strings.

    NOTE(review): relies on KnowledgeQA.add_entry accepting save= and
    returning the created entry — knowledge.py as submitted has neither;
    confirm the knowledge module exposes this extended API.
    """
    try:
        entry = {
            "question": question,
            "answer": answer,
            "tags": [t.strip() for t in tags.split(",") if t.strip()],
            "aliases": [a.strip() for a in aliases.split(",") if a.strip()],
        }
        created = qa.add_entry(entry, save=True)
        return _ok({"entry": created.__dict__})
    except Exception as e:
        return _err(e)

@mcp.tool()
def qa_topics():
    """List all tags present in the knowledge base.

    NOTE(review): requires KnowledgeQA.topics(), absent from knowledge.py
    as submitted.
    """
    try:
        return _ok({"topics": qa.topics()})
    except Exception as e:
        return _err(e)

@mcp.tool()
def qa_reload():
    """Reload the knowledge base from its built-in entries.

    NOTE(review): requires KnowledgeQA.reload(), absent from knowledge.py
    as submitted.
    """
    try:
        qa.reload()
        return _ok({"reloaded": True})
    except Exception as e:
        return _err(e)


if __name__ == "__main__":
    print(f"[MCP] Starting Linkis MCP Server | ENV={LINKIS_ENV}")
    mcp.run()
# ===== linkis-mcp-server/metadata.py =====
from typing import Any, Dict, Optional

from linkis_client import LinkisClient
from config import API_PATHS


class MetadataAPI:
    """Wrappers over the Linkis metadataQuery REST endpoints.

    GENERALIZATION: the project's own knowledge base documents these
    endpoints as taking dataSourceName and system query parameters
    (e.g. getTables?dataSourceName=xxx&database=xxx&system=hive), which
    the original wrappers could not send. All methods now accept optional
    data_source_name/system arguments; they default to None and are
    dropped by the client (_clean_params), so existing callers are
    unaffected. Whether the gateway requires them depends on the
    deployment — confirm against your Linkis version.
    """

    def __init__(self, client: LinkisClient):
        self.client = client

    @staticmethod
    def _params(**kwargs: Any) -> Dict[str, Any]:
        # Keep only parameters that were actually supplied.
        return {k: v for k, v in kwargs.items() if v is not None}

    def get_columns(self, database: str, table: str,
                    data_source_name: Optional[str] = None,
                    system: Optional[str] = None) -> Dict[str, Any]:
        """List columns of *database*.*table*."""
        params = self._params(database=database, table=table,
                              dataSourceName=data_source_name, system=system)
        return self.client.get(API_PATHS["get_columns"], params)

    def get_databases(self, data_source_name: Optional[str] = None,
                      system: Optional[str] = None) -> Dict[str, Any]:
        """List databases."""
        params = self._params(dataSourceName=data_source_name, system=system)
        return self.client.get(API_PATHS["get_databases"], params or None)

    def get_tables(self, database: str,
                   data_source_name: Optional[str] = None,
                   system: Optional[str] = None) -> Dict[str, Any]:
        """List tables in *database*."""
        params = self._params(database=database,
                              dataSourceName=data_source_name, system=system)
        return self.client.get(API_PATHS["get_tables"], params)

    def get_partitions(self, database: str, table: str,
                       data_source_name: Optional[str] = None,
                       system: Optional[str] = None) -> Dict[str, Any]:
        """List partitions of *database*.*table*."""
        params = self._params(database=database, table=table,
                              dataSourceName=data_source_name, system=system)
        return self.client.get(API_PATHS["get_partitions"], params)

    def get_table_props(self, database: str, table: str,
                        data_source_name: Optional[str] = None,
                        system: Optional[str] = None) -> Dict[str, Any]:
        """Fetch table properties of *database*.*table*."""
        params = self._params(database=database, table=table,
                              dataSourceName=data_source_name, system=system)
        return self.client.get(API_PATHS["get_table_props"], params)

    def get_partition_props(self, database: str, table: str, partition: str,
                            data_source_name: Optional[str] = None,
                            system: Optional[str] = None) -> Dict[str, Any]:
        """Fetch properties of one partition of *database*.*table*."""
        params = self._params(database=database, table=table, partition=partition,
                              dataSourceName=data_source_name, system=system)
        return self.client.get(API_PATHS["get_partition_props"], params)
- **对 AI 友好**:天然适配 AI Agent 等智能系统

---

## 🛠 功能特点
- **认证管理**:封装登录/登出等认证流程
- **元数据 API**:查询库、表、字段、分区信息
- **数据源 API**:管理、创建、修改、删除数据源
- **任务管理 API**:任务提交、状态查询、进度监控、杀任务等
- **标准化 MCP 工具**:便于跨应用调用
- **统一错误处理**:所有接口返回格式统一,易于调试与接入

---

## 🏗 架构概览
```
linkis-mcp-server/
├── mcp_server.py # MCP Server 主入口,注册所有工具
├── linkis_client.py # 底层 HTTP 客户端
├── auth.py # 认证/登录模块
├── metadata.py # 元数据 API
├── knowledge.py # 知识问答
├── datasource.py # 数据源集成 API
├── tasks.py # 任务和计算治理 API
├── config.py # 配置管理(环境变量、常量)
├── requirements.txt # Python 依赖
├── readme.md # 项目文档
└── test/ # 单元测试
    ├── test_auth_api.py
    ├── test_linkis_client.py
    ├── test_metadata_api.py
    ├── test_datasource_api.py
    ├── test_tasks_api.py
    ├── test_knowledge_api.py
    └── test_mcp_tools.py

```
---

## 📦 安装与运行

```bash
# 1. 克隆代码仓库
git clone https://your-repo-url.git
cd linkis-mcp-server

# 2. 创建并激活虚拟环境
python -m venv venv
source venv/bin/activate  # macOS / Linux
venv\Scripts\activate     # Windows

# 3. 安装依赖
pip install --upgrade pip
pip install -r requirements.txt

# 4. 配置环境
# 在 config.json 中设置 Linkis 地址、端口、Token 等信息
nano config.json

# 5. 
# ===== linkis-mcp-server/tasks.py =====
from typing import Dict, Any
from linkis_client import LinkisClient
from config import API_PATHS


class TaskAPI:
    """Task submission and lifecycle wrappers over the Linkis entrance API."""

    def __init__(self, client: LinkisClient):
        self.client = client

    def execute(self, code: str, execute_user: str, engine_type: str = "spark") -> Dict[str, Any]:
        """Submit *code* via the legacy /entrance/execute endpoint.

        Uses the flat {executeUser, executionCode, engineType} payload the
        original sent; /execute is the older-style entry point.
        """
        payload = {
            "executeUser": execute_user,
            "executionCode": code,
            "engineType": engine_type,
        }
        return self.client.post(API_PATHS["execute"], payload)

    def submit(self, code: str, execute_user: str, engine_type: str = "spark",
               run_type: str = "sql") -> Dict[str, Any]:
        """Submit *code* via /entrance/submit.

        BUG FIX: the original sent the legacy flat payload to /submit, but
        this project's own knowledge base (api-entrance-submit-sql)
        documents /submit as requiring executionContent.code,
        executionContent.runType and labels.engineType/userCreator. The
        new optional *run_type* parameter defaults to "sql", keeping the
        call signature backward-compatible.
        """
        payload = {
            "executionContent": {"code": code, "runType": run_type},
            # NOTE(review): the userCreator format varies by deployment
            # ("user-Creator", e.g. hadoop-IDE, in Linkis docs) — confirm
            # against your gateway.
            "labels": {
                "engineType": engine_type,
                "userCreator": f"{execute_user}-IDE",
            },
        }
        return self.client.post(API_PATHS["submit"], payload)

    def kill(self, exec_id: str) -> Dict[str, Any]:
        """Kill a task.

        BUG FIX: this is a POST — the project's knowledge base documents
        POST /api/rest_j/v1/entrance/{execID}/kill — but the original
        issued a GET. Path params are passed through the client so the
        id is URL-escaped.
        """
        return self.client.post(API_PATHS["kill"], path_params={"id": exec_id})

    def kill_jobs(self, exec_id: str) -> Dict[str, Any]:
        """Kill the jobs of a task (POST, matching the kill endpoint)."""
        return self.client.post(API_PATHS["kill_jobs"], path_params={"id": exec_id})

    def pause(self, exec_id: str) -> Dict[str, Any]:
        # NOTE(review): changed GET -> POST to match the other entrance
        # state-changing operations; confirm against your Linkis version.
        return self.client.post(API_PATHS["pause"], path_params={"id": exec_id})

    def progress(self, exec_id: str) -> Dict[str, Any]:
        """Query task progress."""
        return self.client.get(API_PATHS["progress"], path_params={"id": exec_id})

    def progress_with_resource(self, exec_id: str) -> Dict[str, Any]:
        """Query task progress including resource usage."""
        return self.client.get(API_PATHS["progress_with_resource"],
                               path_params={"id": exec_id})

    def status(self, exec_id: str) -> Dict[str, Any]:
        """Query task status."""
        return self.client.get(API_PATHS["status"], path_params={"id": exec_id})

    def runningtask(self) -> Dict[str, Any]:
        """Fetch entrance running-task metrics."""
        return self.client.get(API_PATHS["runningtask"])

    def taskinfo(self) -> Dict[str, Any]:
        """Fetch entrance task-info metrics."""
        return self.client.get(API_PATHS["taskinfo"])
# --- linkis-mcp-server/test/test_datasource_api.py (tail) ---

def test_get_table_statistic_info(datasource_api):
    """Smoke test for get_table_statistic_info.

    The try only wraps the API call: an unreachable backend skips the test,
    but a wrong result type still FAILS (previously the assert sat inside
    the try, so AssertionError was swallowed into a skip).
    """
    try:
        res = datasource_api.get_table_statistic_info(1, "some_table")
    except Exception as e:
        pytest.skip(f"get_table_statistic_info skipped: {e}")
    assert isinstance(res, dict)


# --- linkis-mcp-server/test/test_knowledge_api.py ---
import pytest
from knowledge import KnowledgeQA


@pytest.fixture(scope="module")
def qa():
    # One shared in-memory QA instance for the whole module.
    return KnowledgeQA()


def test_qa_add_and_ask(qa):
    """An entry added with save=False must be retrievable via ask()."""
    qa.add_entry(
        {
            "question": "测试问题",
            "answer": "测试答案",
            "tags": ["test"],
            "aliases": ["测试别名"],
        },
        save=False,
    )
    results = qa.ask("测试问题", top_k=1)
    assert results and results[0]["answer"] == "测试答案"


def test_qa_topics(qa):
    """topics() always returns a list."""
    topics = qa.topics()
    assert isinstance(topics, list)


# --- linkis-mcp-server/test/test_linkis_client.py ---
from config import LINKIS_BASE_URL, LINKIS_TOKEN
from linkis_client import LinkisClient, LinkisError


@pytest.fixture(scope="module")
def client():
    return LinkisClient(base_url=LINKIS_BASE_URL, token=LINKIS_TOKEN)


def test_client_init(client):
    """Constructor stores base_url/token and creates a private session."""
    assert client.base_url == LINKIS_BASE_URL
    assert client.token == LINKIS_TOKEN
    assert hasattr(client, "_session")


def test_client_get_fail(client):
    """GET on a non-existent path must raise the client's own error type."""
    with pytest.raises(LinkisError):
        client.get("/api/not_exist_path")


def test_client_post_fail(client):
    """POST on a non-existent path must raise the client's own error type."""
    with pytest.raises(LinkisError):
        client.post("/api/not_exist_path", json_body={})
# --- linkis-mcp-server/test/test_mcp_tools.py ---
import pytest
import mcp_server


def test_mcp_tools_exist():
    """Every expected tool must be registered on the MCP server."""
    # NOTE(review): on recent `mcp` FastMCP versions list_tools() is a
    # coroutine -- if this returns a coroutine object, run it via
    # anyio.run/asyncio.run instead; confirm against the pinned mcp~=1.12.
    tools = mcp_server.mcp.list_tools()
    expected_tools = [
        "login", "get_databases", "get_tables", "get_columns", "get_partitions",
        "get_table_props", "get_partition_props", "displaysql",
        "get_table_fields_info", "get_table_statistic_info", "execute", "submit",
        "status", "progress", "progress_with_resource", "pause", "kill", "kill_jobs",
        "runningtask", "taskinfo", "qa_ask", "qa_add", "qa_topics", "qa_reload"
    ]
    # Set lookup instead of a linear scan per tool, and report exactly
    # which tools are missing when the assertion fails.
    registered = {t.name for t in tools}
    missing = [tool for tool in expected_tools if tool not in registered]
    assert not missing, f"missing MCP tools: {missing}"


# --- linkis-mcp-server/test/test_metadata_api.py (head) ---
from config import LINKIS_BASE_URL, LINKIS_TOKEN
from linkis_client import LinkisClient
from metadata import MetadataAPI


@pytest.fixture(scope="module")
def metadata_api():
    client = LinkisClient(base_url=LINKIS_BASE_URL, token=LINKIS_TOKEN)
    return MetadataAPI(client)


# In each test below the try only wraps the API call: an unreachable
# backend skips the test, but a wrong result type still FAILS (previously
# the assert sat inside the try, so AssertionError became a skip).

def test_get_databases(metadata_api):
    try:
        dbs = metadata_api.get_databases()
    except Exception as e:
        pytest.skip(f"get_databases skipped: {e}")
    assert isinstance(dbs, list)


def test_get_tables(metadata_api):
    try:
        tables = metadata_api.get_tables("default")
    except Exception as e:
        pytest.skip(f"get_tables skipped: {e}")
    assert isinstance(tables, list)


def test_get_columns(metadata_api):
    try:
        cols = metadata_api.get_columns("default", "some_table")
    except Exception as e:
        pytest.skip(f"get_columns skipped: {e}")
    assert isinstance(cols, list)


def test_get_partitions(metadata_api):
    try:
        parts = metadata_api.get_partitions("default", "some_partitioned_table")
    except Exception as e:
        pytest.skip(f"get_partitions skipped: {e}")
    assert isinstance(parts, list)
# --- linkis-mcp-server/test/test_metadata_api.py (tail) ---

def test_get_table_props(metadata_api):
    """Table-properties lookup; skip only on backend errors, fail on bad type."""
    try:
        props = metadata_api.get_table_props("default", "some_table")
    except Exception as e:
        pytest.skip(f"get_table_props skipped: {e}")
    assert isinstance(props, dict)


def test_get_partition_props(metadata_api):
    """Partition-properties lookup; skip only on backend errors, fail on bad type."""
    try:
        props = metadata_api.get_partition_props("default", "some_partitioned_table", "part_col=1")
    except Exception as e:
        pytest.skip(f"get_partition_props skipped: {e}")
    assert isinstance(props, dict)


# --- linkis-mcp-server/test/test_tasks_api.py ---
import pytest
from config import LINKIS_BASE_URL, LINKIS_TOKEN
from linkis_client import LinkisClient
from tasks import TaskAPI


def _exec_id_of(response):
    # Linkis responses are inconsistent about the exec-id key casing;
    # previously this `get("execID") or get("execId")` dance was copied
    # into four tests -- do it once here.
    return response.get("execID") or response.get("execId")


@pytest.fixture(scope="module")
def task_api():
    client = LinkisClient(base_url=LINKIS_BASE_URL, token=LINKIS_TOKEN)
    return TaskAPI(client)


# In each test below the try only wraps the network calls: an unreachable
# backend skips the test, but a wrong result type still FAILS (previously
# the assert sat inside the try, so AssertionError became a skip).

def test_execute(task_api):
    try:
        res = task_api.execute("SELECT 1", "test_user")
    except Exception as e:
        pytest.skip(f"execute skipped: {e}")
    assert isinstance(res, dict)


def test_submit_status_kill(task_api):
    try:
        sub = task_api.submit("SELECT 1", "test_user")
    except Exception as e:
        pytest.skip(f"submit/status/kill skipped: {e}")
    assert isinstance(sub, dict)
    exec_id = _exec_id_of(sub)
    if not exec_id:
        return
    try:
        st = task_api.status(exec_id)
        task_api.kill(exec_id)
    except Exception as e:
        pytest.skip(f"submit/status/kill skipped: {e}")
    assert isinstance(st, dict)


def test_progress_methods(task_api):
    try:
        sub = task_api.submit("SELECT 1", "test_user")
    except Exception as e:
        pytest.skip(f"progress skipped: {e}")
    exec_id = _exec_id_of(sub)
    if not exec_id:
        return
    try:
        prog = task_api.progress(exec_id)
        prog_res = task_api.progress_with_resource(exec_id)
    except Exception as e:
        pytest.skip(f"progress skipped: {e}")
    assert isinstance(prog, dict)
    assert isinstance(prog_res, dict)


def test_pause(task_api):
    try:
        sub = task_api.submit("SELECT 1", "test_user")
    except Exception as e:
        pytest.skip(f"pause skipped: {e}")
    exec_id = _exec_id_of(sub)
    if not exec_id:
        return
    try:
        res = task_api.pause(exec_id)
    except Exception as e:
        pytest.skip(f"pause skipped: {e}")
    assert isinstance(res, dict)


def test_kill_jobs(task_api):
    try:
        sub = task_api.submit("SELECT 1", "test_user")
    except Exception as e:
        pytest.skip(f"kill_jobs skipped: {e}")
    exec_id = _exec_id_of(sub)
    if not exec_id:
        return
    try:
        res = task_api.kill_jobs(exec_id)
    except Exception as e:
        pytest.skip(f"kill_jobs skipped: {e}")
    assert isinstance(res, dict)


def test_runningtask_taskinfo(task_api):
    try:
        run_info = task_api.runningtask()
        ti = task_api.taskinfo()
    except Exception as e:
        pytest.skip(f"runningtask/taskinfo skipped: {e}")
    assert isinstance(run_info, dict)
    assert isinstance(ti, dict)