From a0a708f79cb29a97270c0421384c2bab4da82dfe Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 15 Jan 2026 21:06:39 +0800 Subject: [PATCH 01/14] feat: integrate Graph RAG client and enhance knowledge base with RAG type support --- .../DataManagementServiceConfiguration.java | 2 -- .../datamate/main/DataMateApplication.java | 2 ++ .../indexer/domain/model/KnowledgeBase.java | 6 +++++ .../infrastructure/client/GraphRagClient.java | 23 +++++++++++++++++++ .../infrastructure/event/RagEtlService.java | 9 ++++++++ .../dto/KnowledgeBaseCreateReq.java | 2 ++ .../rag/indexer/interfaces/dto/RagType.java | 19 +++++++++++++++ scripts/db/rag-management-init.sql | 2 ++ 8 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/client/GraphRagClient.java create mode 100644 backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/interfaces/dto/RagType.java diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/DataManagementServiceConfiguration.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/DataManagementServiceConfiguration.java index 8e2b586cb..f502beb6d 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/DataManagementServiceConfiguration.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/DataManagementServiceConfiguration.java @@ -1,6 +1,5 @@ package com.datamate.datamanagement; -import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; @@ -10,7 +9,6 @@ * 数据管理服务配置类 - 多源接入、元数据、血缘治理 */ @Configuration -@EnableFeignClients(basePackages = "com.datamate.datamanagement.infrastructure.client") @EnableAsync @ComponentScan(basePackages = { "com.datamate.datamanagement", diff --git a/backend/services/main-application/src/main/java/com/datamate/main/DataMateApplication.java b/backend/services/main-application/src/main/java/com/datamate/main/DataMateApplication.java index 01a463109..0a0d5a655 100644 --- a/backend/services/main-application/src/main/java/com/datamate/main/DataMateApplication.java +++ b/backend/services/main-application/src/main/java/com/datamate/main/DataMateApplication.java @@ -4,6 +4,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCaching; +import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; @@ -23,6 +24,7 @@ @EnableAsync @EnableScheduling @EnableCaching +@EnableFeignClients(basePackages = "com.datamate.*") public class DataMateApplication { public static void main(String[] args) { SpringApplication.run(DataMateApplication.class, args); diff --git a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/domain/model/KnowledgeBase.java b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/domain/model/KnowledgeBase.java index 4a571b3c3..b41922c1e 100644 --- a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/domain/model/KnowledgeBase.java +++ b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/domain/model/KnowledgeBase.java @@ -2,6 +2,7 @@ import com.baomidou.mybatisplus.annotation.TableName; import com.datamate.common.domain.model.base.BaseEntity; +import com.datamate.rag.indexer.interfaces.dto.RagType; import lombok.Getter; import lombok.Setter; @@ -25,6 +26,11 @@ public class KnowledgeBase extends BaseEntity { */ private String description; + /** + * RAG 类型 + */ + private RagType type; + /** * 嵌入模型 */ diff --git a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/client/GraphRagClient.java b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/client/GraphRagClient.java new file mode 100644 index 000000000..02bd53faa --- /dev/null +++ b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/client/GraphRagClient.java @@ -0,0 +1,23 @@ +package com.datamate.rag.indexer.infrastructure.client; + +import com.datamate.common.infrastructure.common.Response; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; + +/** + * 知识图谱RAG客户端 + * + * @author dallas + * @since 2026-01-15 + */ +@FeignClient(name = "rag-service", url = "${collection.service.url:http://datamate-backend-python:18000}") +public interface GraphRagClient { + /** + * 启动知识图谱RAG任务 + * @param knowledgeBaseId 知识库ID + * @return 任务详情 + */ + @PostMapping("/api/rag/{id}") + Response startGraphRagTask(@PathVariable("id") String knowledgeBaseId); +} diff --git a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/event/RagEtlService.java b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/event/RagEtlService.java index 355e4b2ce..4dbfac59b 100644 --- a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/event/RagEtlService.java +++ b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/event/RagEtlService.java @@ -8,8 +8,10 @@ import com.datamate.rag.indexer.domain.model.FileStatus; import com.datamate.rag.indexer.domain.model.RagFile; import com.datamate.rag.indexer.domain.repository.RagFileRepository; +import com.datamate.rag.indexer.infrastructure.client.GraphRagClient; import com.datamate.rag.indexer.infrastructure.milvus.MilvusService; import com.datamate.rag.indexer.interfaces.dto.AddFilesReq; +import com.datamate.rag.indexer.interfaces.dto.RagType; import com.google.common.collect.Lists; import dev.langchain4j.data.document.Document; import dev.langchain4j.data.document.DocumentParser; @@ -58,6 +60,8 @@ public class RagEtlService { private final ModelConfigRepository modelConfigRepository; + private final GraphRagClient graphRagClient; + private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); @Async @@ -65,6 +69,11 @@ public class RagEtlService { public void processAfterCommit(DataInsertedEvent event) { // 执行 RAG 处理流水线 List ragFiles = ragFileRepository.findNotSuccessByKnowledgeBaseId(event.knowledgeBase().getId()); + if (RagType.GRAPH.equals(event.knowledgeBase().getType())){ + log.info("Knowledge base {} is of type GRAPH. Skipping RAG ETL processing.", event.knowledgeBase().getName()); + graphRagClient.startGraphRagTask(event.knowledgeBase().getId()); + return; + } ragFiles.forEach(ragFile -> { try { diff --git a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/interfaces/dto/KnowledgeBaseCreateReq.java b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/interfaces/dto/KnowledgeBaseCreateReq.java index 1f07553a3..fd62dc625 100644 --- a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/interfaces/dto/KnowledgeBaseCreateReq.java +++ b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/interfaces/dto/KnowledgeBaseCreateReq.java @@ -28,6 +28,8 @@ public class KnowledgeBaseCreateReq { @Size(max = 512, message = "知识库描述长度必须在 0 到 512 之间") private String description; + private RagType type = RagType.DOCUMENT; + /** * 嵌入模型 */ diff --git a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/interfaces/dto/RagType.java b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/interfaces/dto/RagType.java new file mode 100644 index 000000000..a00737358 --- /dev/null +++ b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/interfaces/dto/RagType.java @@ -0,0 +1,19 @@ +package com.datamate.rag.indexer.interfaces.dto; + +/** + * RAG 类型 + * + * @author dallas + * @since 2026-01-15 + */ +public enum RagType { + /** + * 文档 + */ + DOCUMENT, + + /** + * 知识图谱 + */ + GRAPH, +} diff --git a/scripts/db/rag-management-init.sql b/scripts/db/rag-management-init.sql index 9486d2e29..1877284ef 100644 --- a/scripts/db/rag-management-init.sql +++ b/scripts/db/rag-management-init.sql @@ -6,6 +6,7 @@ CREATE TABLE IF NOT EXISTS t_rag_knowledge_base ( id VARCHAR(36) PRIMARY KEY, name VARCHAR(255) NOT NULL, + type VARCHAR(50) NOT NULL, description VARCHAR(512), embedding_model VARCHAR(255) NOT NULL, chat_model VARCHAR(255), @@ -19,6 +20,7 @@ CREATE TABLE IF NOT EXISTS t_rag_knowledge_base COMMENT ON TABLE t_rag_knowledge_base IS '知识库表'; COMMENT ON COLUMN t_rag_knowledge_base.id IS 'UUID'; COMMENT ON COLUMN t_rag_knowledge_base.name IS '知识库名称'; +COMMENT ON COLUMN t_rag_knowledge_base.type IS '知识库类型'; COMMENT ON COLUMN t_rag_knowledge_base.description IS '知识库描述'; COMMENT ON COLUMN t_rag_knowledge_base.embedding_model IS '嵌入模型'; COMMENT ON COLUMN t_rag_knowledge_base.chat_model IS '聊天模型'; From 39e50cec9d411fb0c2c2c3dace100a1547ab1b11 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 15 Jan 2026 22:35:44 +0800 Subject: [PATCH 02/14] feat: implement RAG management module with knowledge base and file models, and add processing endpoint --- .../app/db/models/knowledge_gen.py | 47 +++++++++++++ .../datamate-python/app/module/__init__.py | 2 + .../app/module/rag/__init__.py | 0 .../app/module/rag/interface/__init__.py | 0 .../app/module/rag/interface/rag_interface.py | 24 +++++++ .../app/module/rag/schema/rag_schema.py | 5 ++ .../app/module/rag/service/__init__.py | 0 .../app/module/rag/service/graph_rag.py | 60 +++++++++++++++++ .../app/module/rag/service/rag_service.py | 67 +++++++++++++++++++ 9 files changed, 205 insertions(+) create mode 100644 runtime/datamate-python/app/db/models/knowledge_gen.py create mode 100644 runtime/datamate-python/app/module/rag/__init__.py create mode 100644 runtime/datamate-python/app/module/rag/interface/__init__.py create mode 100644 runtime/datamate-python/app/module/rag/interface/rag_interface.py create mode 100644 runtime/datamate-python/app/module/rag/schema/rag_schema.py create mode 100644 runtime/datamate-python/app/module/rag/service/__init__.py create mode 100644 runtime/datamate-python/app/module/rag/service/graph_rag.py create mode 100644 runtime/datamate-python/app/module/rag/service/rag_service.py diff --git a/runtime/datamate-python/app/db/models/knowledge_gen.py b/runtime/datamate-python/app/db/models/knowledge_gen.py new file mode 100644 index 000000000..423ae2246 --- /dev/null +++ b/runtime/datamate-python/app/db/models/knowledge_gen.py @@ -0,0 +1,47 @@ +""" +Tables of RAG Management Module +""" +import uuid +from sqlalchemy import Column, String, TIMESTAMP, Text, Integer, JSON +from sqlalchemy.sql import func +from app.db.session import Base + + +class RagKnowledgeBase(Base): + """知识库模型""" + __tablename__ = "t_rag_knowledge_base" + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="UUID") + name = Column(String(255), nullable=False, comment="知识库名称") + type = Column(String(50), nullable=False, comment="知识库类型") + description = Column(String(512), nullable=True, comment="知识库描述") + embedding_model = Column(String(255), nullable=False, comment="嵌入模型") + chat_model = Column(String(255), nullable=True, comment="聊天模型") + created_at = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="创建时间") + updated_at = Column(TIMESTAMP, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), + comment="更新时间") + created_by = Column(String(255), nullable=True, comment="创建者") + updated_by = Column(String(255), nullable=True, comment="更新者") + + def __repr__(self): + return f"" + + +class RagFile(Base): + """知识库文件模型""" + __tablename__ = "t_rag_file" + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="UUID") + knowledge_base_id = Column(String(36), nullable=False, comment="知识库ID") + file_name = Column(String(255), nullable=False, comment="文件名") + file_id = Column(String(255), nullable=False, comment="文件ID") + chunk_count = Column(Integer, nullable=True, comment="切片数") + file_metadata = Column("metadata", JSON, nullable=True, comment="元数据") + status = Column(String(50), nullable=True, comment="文件状态") + err_msg = Column(Text, nullable=True, comment="错误信息") + created_at = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="创建时间") + updated_at = Column(TIMESTAMP, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), + comment="更新时间") + created_by = Column(String(255), nullable=True, comment="创建者") + updated_by = Column(String(255), nullable=True, comment="更新者") + diff --git a/runtime/datamate-python/app/module/__init__.py b/runtime/datamate-python/app/module/__init__.py index 74ae7c2d3..7d3c482b6 100644 --- a/runtime/datamate-python/app/module/__init__.py +++ b/runtime/datamate-python/app/module/__init__.py @@ -6,6 +6,7 @@ from .generation.interface import router as generation_router from .evaluation.interface import router as evaluation_router from .collection.interface import router as collection_route +from .rag.interface.rag_interface import router as rag_router router = APIRouter( prefix="/api" @@ -17,5 +18,6 @@ router.include_router(generation_router) router.include_router(evaluation_router) router.include_router(collection_route) +router.include_router(rag_router) __all__ = ["router"] diff --git a/runtime/datamate-python/app/module/rag/__init__.py b/runtime/datamate-python/app/module/rag/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/runtime/datamate-python/app/module/rag/interface/__init__.py b/runtime/datamate-python/app/module/rag/interface/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/runtime/datamate-python/app/module/rag/interface/rag_interface.py b/runtime/datamate-python/app/module/rag/interface/rag_interface.py new file mode 100644 index 000000000..4f299275c --- /dev/null +++ b/runtime/datamate-python/app/module/rag/interface/rag_interface.py @@ -0,0 +1,24 @@ +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db.session import get_db +from app.module.rag.service.rag_service import RAGService +from app.module.shared.schema import StandardResponse + +router = APIRouter(prefix="/rag", tags=["rag"]) + +@router.post("/process/{knowledge_base_id}") +async def process_knowledge_base(knowledge_base_id: str, db: AsyncSession = Depends(get_db)): + """ + Process all unprocessed files in a knowledge base. + """ + try: + await RAGService(db).init_graph_rag(knowledge_base_id) + return StandardResponse( + code=200, + message="Processing started for knowledge base.", + data=None + ) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + diff --git a/runtime/datamate-python/app/module/rag/schema/rag_schema.py b/runtime/datamate-python/app/module/rag/schema/rag_schema.py new file mode 100644 index 000000000..e74eca309 --- /dev/null +++ b/runtime/datamate-python/app/module/rag/schema/rag_schema.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + +class ProcessRequest(BaseModel): + knowledge_base_id: str + diff --git a/runtime/datamate-python/app/module/rag/service/__init__.py b/runtime/datamate-python/app/module/rag/service/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/runtime/datamate-python/app/module/rag/service/graph_rag.py b/runtime/datamate-python/app/module/rag/service/graph_rag.py new file mode 100644 index 000000000..683d0af70 --- /dev/null +++ b/runtime/datamate-python/app/module/rag/service/graph_rag.py @@ -0,0 +1,60 @@ +import asyncio +import os +from typing import Awaitable, Callable, Optional + +import numpy as np +from lightrag import LightRAG, QueryParam +from lightrag.llm.openai import openai_embed, openai_complete_if_cache +from lightrag.utils import setup_logger, EmbeddingFunc + +setup_logger("lightrag", level="DEBUG") +DEFAULT_WORKING_DIR = "/rag_storage" + + +async def build_llm_model_func(model_name: str, base_url: str, api_key: str) -> Callable[..., Awaitable[str]]: + async def _llm_model( + prompt, system_prompt=None, history_messages=None, **kwargs + ) -> str: + history_messages = history_messages or [] + return await openai_complete_if_cache( + model_name, + prompt, + system_prompt=system_prompt, + history_messages=history_messages, + api_key=api_key, + base_url=base_url, + **kwargs, + ) + + return _llm_model + + +async def build_embedding_func( + model_name: str, base_url: str, api_key: str, embedding_dim: int +) -> EmbeddingFunc: + async def _embedding_func(texts: list[str]) -> np.ndarray: + return await openai_embed.func( + texts, + model=model_name, + api_key=api_key, + base_url=base_url, + embedding_dim=embedding_dim, + ) + + return EmbeddingFunc(embedding_dim=embedding_dim, func=_embedding_func) + + +async def initialize_rag( + llm_callable: Callable[..., Awaitable[str]], + embedding_callable: EmbeddingFunc, + working_dir: Optional[str] = None, +): + target_dir = working_dir or DEFAULT_WORKING_DIR + os.makedirs(target_dir, exist_ok=True) + rag = LightRAG( + working_dir=target_dir, + llm_model_func=llm_callable, + embedding_func=embedding_callable, + ) + await rag.initialize_storages() + return rag diff --git a/runtime/datamate-python/app/module/rag/service/rag_service.py b/runtime/datamate-python/app/module/rag/service/rag_service.py new file mode 100644 index 000000000..3b864757f --- /dev/null +++ b/runtime/datamate-python/app/module/rag/service/rag_service.py @@ -0,0 +1,67 @@ +import os +from typing import Optional + +from fastapi import Depends +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db.models.knowledge_gen import RagKnowledgeBase +from app.db.models.model_config import ModelConfig +from app.db.session import AsyncSessionLocal +from .graph_rag import ( + DEFAULT_WORKING_DIR, + build_embedding_func, + build_llm_model_func, + initialize_rag, +) + + +class RAGService: + def __init__( + self, + db: AsyncSession = Depends(AsyncSessionLocal), + ): + self.db = db + self.rag = None + + + async def get_unprocessed_files(self, knowledge_base_id: str) -> list[str]: + pass + + async def init_graph_rag(self, knowledge_base_id: str): + kb = await self._get_knowledge_base(knowledge_base_id) + embedding_model = await self._get_model_config(kb.embedding_model) + chat_model = await self._get_model_config(kb.chat_model) + + llm_callable = await build_llm_model_func( + chat_model.model_name, chat_model.base_url, chat_model.api_key + ) + embedding_callable = await build_embedding_func( + embedding_model.model_name, + embedding_model.base_url, + embedding_model.api_key, + embedding_dim=embedding_model.embedding_dim if hasattr(embedding_model, "embedding_dim") else 1024, + ) + + kb_working_dir = os.path.join(DEFAULT_WORKING_DIR, kb.name) + self.rag = await initialize_rag(llm_callable, embedding_callable, kb_working_dir) + return {"status": "initialized", "knowledge_base_id": knowledge_base_id} + + async def _get_knowledge_base(self, knowledge_base_id: str): + result = await self.db.execute( + select(RagKnowledgeBase).where(RagKnowledgeBase.id == knowledge_base_id) + ) + knowledge_base = result.scalars().first() + if not knowledge_base: + raise ValueError(f"Knowledge base with ID {knowledge_base_id} not found.") + return knowledge_base + + async def _get_model_config(self, model_id: Optional[str]): + if not model_id: + raise ValueError("Model ID is required for initializing RAG.") + result = await self.db.execute(select(ModelConfig).where(ModelConfig.id == model_id)) + model = result.scalars().first() + if not model: + raise ValueError(f"Model config with ID {model_id} not found.") + return model + From e6659fe4b550ee244f9f7c538bc7745458130e01 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 15 Jan 2026 22:53:04 +0800 Subject: [PATCH 03/14] feat: enhance RAG service with background processing for unprocessed files and improve file handling logic --- .../app/module/rag/service/graph_rag.py | 4 +- .../app/module/rag/service/rag_service.py | 65 ++++++++++++++++--- 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/runtime/datamate-python/app/module/rag/service/graph_rag.py b/runtime/datamate-python/app/module/rag/service/graph_rag.py index 683d0af70..49ee8fb31 100644 --- a/runtime/datamate-python/app/module/rag/service/graph_rag.py +++ b/runtime/datamate-python/app/module/rag/service/graph_rag.py @@ -8,7 +8,7 @@ from lightrag.utils import setup_logger, EmbeddingFunc setup_logger("lightrag", level="DEBUG") -DEFAULT_WORKING_DIR = "/rag_storage" +DEFAULT_WORKING_DIR = os.path.join(os.getcwd(), "rag_storage") async def build_llm_model_func(model_name: str, base_url: str, api_key: str) -> Callable[..., Awaitable[str]]: @@ -33,7 +33,7 @@ async def build_embedding_func( model_name: str, base_url: str, api_key: str, embedding_dim: int ) -> EmbeddingFunc: async def _embedding_func(texts: list[str]) -> np.ndarray: - return await openai_embed.func( + return await openai_embed( texts, model=model_name, api_key=api_key, diff --git a/runtime/datamate-python/app/module/rag/service/rag_service.py b/runtime/datamate-python/app/module/rag/service/rag_service.py index 3b864757f..dba535d54 100644 --- a/runtime/datamate-python/app/module/rag/service/rag_service.py +++ b/runtime/datamate-python/app/module/rag/service/rag_service.py @@ -1,13 +1,16 @@ import os -from typing import Optional +from typing import Optional, Sequence -from fastapi import Depends +from fastapi import BackgroundTasks, Depends from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from app.db.models.knowledge_gen import RagKnowledgeBase +from app.core.logging import get_logger +from app.db.models.dataset_management import DatasetFiles +from app.db.models.knowledge_gen import RagFile, RagKnowledgeBase from app.db.models.model_config import ModelConfig -from app.db.session import AsyncSessionLocal +from app.db.session import get_db +from app.module.shared.common.document_loaders import load_documents from .graph_rag import ( DEFAULT_WORKING_DIR, build_embedding_func, @@ -15,18 +18,27 @@ initialize_rag, ) +logger = get_logger(__name__) + class RAGService: def __init__( self, - db: AsyncSession = Depends(AsyncSessionLocal), + db: AsyncSession = Depends(get_db), + background_tasks: BackgroundTasks | None = None, ): self.db = db + self.background_tasks = background_tasks self.rag = None - - async def get_unprocessed_files(self, knowledge_base_id: str) -> list[str]: - pass + async def get_unprocessed_files(self, knowledge_base_id: str) -> Sequence[RagFile]: + result = await self.db.execute( + select(RagFile).where( + RagFile.knowledge_base_id == knowledge_base_id, + RagFile.status != "PROCESSED", + ) + ) + return result.scalars().all() async def init_graph_rag(self, knowledge_base_id: str): kb = await self._get_knowledge_base(knowledge_base_id) @@ -45,8 +57,45 @@ async def init_graph_rag(self, knowledge_base_id: str): kb_working_dir = os.path.join(DEFAULT_WORKING_DIR, kb.name) self.rag = await initialize_rag(llm_callable, embedding_callable, kb_working_dir) + + if self.background_tasks is not None: + self.background_tasks.add_task(self._process_pending_files, knowledge_base_id) + else: + await self._process_pending_files(knowledge_base_id) + return {"status": "initialized", "knowledge_base_id": knowledge_base_id} + async def _process_pending_files(self, knowledge_base_id: str): + rag_files = await self.get_unprocessed_files(knowledge_base_id) + if not rag_files: + logger.info(f"No pending files to process for knowledge base {knowledge_base_id}") + return + + for rag_file in rag_files: + await self._process_single_file(rag_file) + + async def _process_single_file(self, rag_file: RagFile): + dataset_file = await self._get_dataset_file(rag_file.file_id) + documents = load_documents(dataset_file.file_path) + for doc in documents: + await self.rag.ainsert(text=doc.page_content) + await self._mark_file_processed(rag_file) + + async def _get_dataset_file(self, file_id: str) -> DatasetFiles: + result = await self.db.execute( + select(DatasetFiles).where(DatasetFiles.id == file_id) + ) + dataset_file = result.scalars().first() + if not dataset_file: + raise ValueError(f"Dataset file with ID {file_id} not found.") + return dataset_file + + async def _mark_file_processed(self, rag_file: RagFile): + rag_file.status = "PROCESSED" + self.db.add(rag_file) + await self.db.commit() + await self.db.refresh(rag_file) + async def _get_knowledge_base(self, knowledge_base_id: str): result = await self.db.execute( select(RagKnowledgeBase).where(RagKnowledgeBase.id == knowledge_base_id) From f7bd707b14f0fe5e11c809ee776db18c9bc67523 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Fri, 16 Jan 2026 00:15:16 +0800 Subject: [PATCH 04/14] feat: update GraphRagClient API endpoint and enhance error handling in rag_service --- .../infrastructure/client/GraphRagClient.java | 2 +- .../components/AddDataDialog.tsx | 1 + .../app/module/rag/service/graph_rag.py | 17 +++++++++------ .../app/module/rag/service/rag_service.py | 21 ++++++++++++++----- 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/client/GraphRagClient.java b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/client/GraphRagClient.java index 02bd53faa..924f4e7d3 100644 --- a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/client/GraphRagClient.java +++ b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/infrastructure/client/GraphRagClient.java @@ -18,6 +18,6 @@ public interface GraphRagClient { * @param knowledgeBaseId 知识库ID * @return 任务详情 */ - @PostMapping("/api/rag/{id}") + @PostMapping("/api/rag/process/{id}") Response startGraphRagTask(@PathVariable("id") String knowledgeBaseId); } diff --git a/frontend/src/pages/KnowledgeBase/components/AddDataDialog.tsx b/frontend/src/pages/KnowledgeBase/components/AddDataDialog.tsx index 9fa964773..86826a802 100644 --- a/frontend/src/pages/KnowledgeBase/components/AddDataDialog.tsx +++ b/frontend/src/pages/KnowledgeBase/components/AddDataDialog.tsx @@ -15,6 +15,7 @@ import { addKnowledgeBaseFilesUsingPost } from "../knowledge-base.api"; import DatasetFileTransfer from "@/components/business/DatasetFileTransfer"; import { DescriptionsItemType } from "antd/es/descriptions"; import { DatasetFileCols } from "../knowledge-base.const"; +import { DatasetType } from "@/pages/DataManagement/dataset.model"; const sliceOptions = [ { label: "默认分块", value: "DEFAULT_CHUNK" }, diff --git a/runtime/datamate-python/app/module/rag/service/graph_rag.py b/runtime/datamate-python/app/module/rag/service/graph_rag.py index 49ee8fb31..f2edefa01 100644 --- a/runtime/datamate-python/app/module/rag/service/graph_rag.py +++ b/runtime/datamate-python/app/module/rag/service/graph_rag.py @@ -1,11 +1,12 @@ -import asyncio import os from typing import Awaitable, Callable, Optional import numpy as np -from lightrag import LightRAG, QueryParam +from lightrag import LightRAG +from lightrag.constants import DEFAULT_ENTITY_TYPES +from lightrag.kg.shared_storage import initialize_pipeline_status from lightrag.llm.openai import openai_embed, openai_complete_if_cache -from lightrag.utils import setup_logger, EmbeddingFunc +from lightrag.utils import setup_logger, EmbeddingFunc, get_env_value setup_logger("lightrag", level="DEBUG") DEFAULT_WORKING_DIR = os.path.join(os.getcwd(), "rag_storage") @@ -33,15 +34,14 @@ async def build_embedding_func( model_name: str, base_url: str, api_key: str, embedding_dim: int ) -> EmbeddingFunc: async def _embedding_func(texts: list[str]) -> np.ndarray: - return await openai_embed( + return await openai_embed.func( texts, model=model_name, api_key=api_key, base_url=base_url, - embedding_dim=embedding_dim, ) - return EmbeddingFunc(embedding_dim=embedding_dim, func=_embedding_func) + return EmbeddingFunc(embedding_dim=embedding_dim, func=_embedding_func, max_token_size=8192) async def initialize_rag( @@ -55,6 +55,11 @@ async def initialize_rag( working_dir=target_dir, llm_model_func=llm_callable, embedding_func=embedding_callable, + addon_params={ + "language": "Chinese", + "entity_types": get_env_value("ENTITY_TYPES", DEFAULT_ENTITY_TYPES, list), + } ) await rag.initialize_storages() + await initialize_pipeline_status() return rag diff --git a/runtime/datamate-python/app/module/rag/service/rag_service.py b/runtime/datamate-python/app/module/rag/service/rag_service.py index dba535d54..37938ecfd 100644 --- a/runtime/datamate-python/app/module/rag/service/rag_service.py +++ b/runtime/datamate-python/app/module/rag/service/rag_service.py @@ -75,10 +75,16 @@ async def _process_pending_files(self, knowledge_base_id: str): await self._process_single_file(rag_file) async def _process_single_file(self, rag_file: RagFile): - dataset_file = await self._get_dataset_file(rag_file.file_id) - documents = load_documents(dataset_file.file_path) - for doc in documents: - await self.rag.ainsert(text=doc.page_content) + try: + dataset_file = await self._get_dataset_file(rag_file.file_id) + documents = load_documents(dataset_file.file_path) + for doc in documents: + logger.info(f"Processing document {doc.page_content}") + await self.rag.ainsert(input=doc.page_content, file_paths=[dataset_file.file_path]) + except Exception: # noqa: BLE001 + logger.exception("Failed to process rag file %s", rag_file.id) + await self._mark_file_failed(rag_file) + return await self._mark_file_processed(rag_file) async def _get_dataset_file(self, file_id: str) -> DatasetFiles: @@ -96,6 +102,12 @@ async def _mark_file_processed(self, rag_file: RagFile): await self.db.commit() await self.db.refresh(rag_file) + async def _mark_file_failed(self, rag_file: RagFile): + rag_file.status = "PROCESS_FAILED" + self.db.add(rag_file) + await self.db.commit() + await self.db.refresh(rag_file) + async def _get_knowledge_base(self, knowledge_base_id: str): result = await self.db.execute( select(RagKnowledgeBase).where(RagKnowledgeBase.id == knowledge_base_id) @@ -113,4 +125,3 @@ async def _get_model_config(self, model_id: Optional[str]): if not model: raise ValueError(f"Model config with ID {model_id} not found.") return model - From 64ffcc63d2321a406168ed8ee317ee97ac5f51da Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Fri, 16 Jan 2026 00:47:20 +0800 Subject: [PATCH 05/14] feat: enhance knowledge base management with new types and custom entity support --- frontend/src/components/CardView.tsx | 63 +++++++++++++++-- frontend/src/mock/knowledgeBase.tsx | 31 ++++++++- .../components/CreateKnowledgeBase.tsx | 67 +++++++++++++++++-- .../KnowledgeBase/knowledge-base.const.tsx | 39 +++++++---- .../KnowledgeBase/knowledge-base.model.ts | 5 +- 5 files changed, 178 insertions(+), 27 deletions(-) diff --git a/frontend/src/components/CardView.tsx b/frontend/src/components/CardView.tsx index ace44d243..316df21a0 100644 --- a/frontend/src/components/CardView.tsx +++ b/frontend/src/components/CardView.tsx @@ -6,6 +6,12 @@ import { formatDateTime } from "@/utils/unit"; import ActionDropdown from "./ActionDropdown"; import { Database } from "lucide-react"; +interface BadgeItem { + label: string; + color?: string; + background?: string; +} + interface BaseCardDataType { id: string | number; name: string; @@ -18,7 +24,7 @@ interface BaseCardDataType { color?: string; } | null; description: string; - tags?: string[]; + tags?: Array; statistics?: { label: string; value: string | number }[]; updatedAt?: string; } @@ -47,7 +53,7 @@ interface CardViewProps { } // 标签渲染组件 -const TagsRenderer = ({ tags }: { tags?: any[] }) => { +const TagsRenderer = ({ tags }: { tags?: Array }) => { const [visibleTags, setVisibleTags] = useState([]); const [hiddenTags, setHiddenTags] = useState([]); const containerRef = useRef(null); @@ -75,7 +81,11 @@ const TagsRenderer = ({ tags }: { tags?: any[] }) => { const tagElement = document.createElement("span"); tagElement.className = "ant-tag ant-tag-default"; tagElement.style.margin = "2px"; - tagElement.textContent = typeof tag === "string" ? tag : tag.name; + if (typeof tag === "string") { + tagElement.textContent = tag; + } else { + tagElement.textContent = tag.label; + } tempDiv.appendChild(tagElement); tagElements.push(tagElement); @@ -126,7 +136,17 @@ const TagsRenderer = ({ tags }: { tags?: any[] }) => {
{hiddenTags.map((tag, index) => ( - {typeof tag === "string" ? tag : tag.name} + + {typeof tag === "string" ? tag : tag.label} + ))}
@@ -135,7 +155,17 @@ const TagsRenderer = ({ tags }: { tags?: any[] }) => { return (
{visibleTags.map((tag, index) => ( - {typeof tag === "string" ? tag : tag.name} + + {typeof tag === "string" ? tag : tag.label} + ))} {hiddenTags.length > 0 && ( (props: CardViewProps) { > {item?.name} + {(item?.tags?.length ?? 0) > 0 && + item.tags[0] && + typeof item.tags[0] !== "string" && ( + + {item.tags[0].label} + + )} {item?.status && (
@@ -242,7 +285,15 @@ function CardView(props: CardViewProps) {
{/* Tags */} - + + typeof tag === "string" || index !== 0 + ) + : [] + } + /> {/* Description */}

diff --git a/frontend/src/mock/knowledgeBase.tsx b/frontend/src/mock/knowledgeBase.tsx index b2b9fb59f..d249ca62b 100644 --- a/frontend/src/mock/knowledgeBase.tsx +++ b/frontend/src/mock/knowledgeBase.tsx @@ -129,7 +129,7 @@ export const mockKnowledgeBases: KnowledgeBase[] = [ name: "产品技术文档库", description: "包含所有产品相关的技术文档和API说明,支持多种格式文档的智能解析和向量化处理", - type: "unstructured", + type: "DOCUMENT", status: "ready", fileCount: 45, chunkCount: 1250, @@ -216,7 +216,7 @@ export const mockKnowledgeBases: KnowledgeBase[] = [ id: 2, name: "FAQ结构化知识库", description: "客服常见问题的结构化问答对,支持快速检索和智能匹配", - type: "structured", + type: "DOCUMENT", status: "vectorizing", fileCount: 12, chunkCount: 890, @@ -251,4 +251,31 @@ export const mockKnowledgeBases: KnowledgeBase[] = [ ], vectorizationHistory: [], }, + { + id: 3, + name: "企业知识图谱", + description: "包含企业人员、项目、合作伙伴等关系的知识图谱", + type: "GRAPH", + status: "ready", + fileCount: 8, + chunkCount: 420, + vectorCount: 0, + size: "32 MB", + progress: 100, + createdAt: "2024-01-10", + lastUpdated: "2024-01-25", + vectorDatabase: "neo4j", + customEntities: ["人员", "组织", "项目"], + config: { + embeddingModel: "text-embedding-3-large", + chunkSize: 256, + overlap: 20, + sliceMethod: "paragraph", + enableQA: true, + vectorDimension: 1536, + sliceOperators: ["semantic-split"], + }, + files: [], + vectorizationHistory: [], + }, ]; diff --git a/frontend/src/pages/KnowledgeBase/components/CreateKnowledgeBase.tsx b/frontend/src/pages/KnowledgeBase/components/CreateKnowledgeBase.tsx index c0a16f6a6..b1fe1abd8 100644 --- a/frontend/src/pages/KnowledgeBase/components/CreateKnowledgeBase.tsx +++ b/frontend/src/pages/KnowledgeBase/components/CreateKnowledgeBase.tsx @@ -1,5 +1,7 @@ -import { Button, Form, Input, message, Modal, Select } from "antd"; -import { PlusOutlined } from "@ant-design/icons"; +import { Button, Form, Input, message, Modal, Select, Tooltip } from "antd"; +import RadioCard from "@/components/RadioCard"; +import { InfoCircleOutlined, PlusOutlined } from "@ant-design/icons"; +import { Share2, BookOpen } from "lucide-react"; import { useEffect, useState } from "react"; import { useDispatch } from "react-redux"; import { queryModelListUsingGet } from "@/pages/SettingsPage/settings.apis"; @@ -8,7 +10,7 @@ import { createKnowledgeBaseUsingPost, updateKnowledgeBaseByIdUsingPut, } from "../knowledge-base.api"; -import { KnowledgeBaseItem } from "../knowledge-base.model"; +import { KnowledgeBaseItem, KBType } from "../knowledge-base.model"; import { showSettings } from "@/store/slices/settingsSlice"; export default function CreateKnowledgeBase({ @@ -60,18 +62,29 @@ export default function CreateKnowledgeBase({ description: data.description, embeddingModel: data.embeddingModel, chatModel: data.chatModel, + type: data.type ?? KBType.DOCUMENT, + customEntities: data.customEntities ?? [], }); + } else { + form.setFieldsValue({ type: KBType.DOCUMENT, customEntities: [] }); } }, [isEdit, data, form]); + const typeValue = Form.useWatch("type", form); + const isGraphKB = typeValue === KBType.GRAPH; + const handleCreateKnowledgeBase = async () => { try { const values = await form.validateFields(); + const payload = { + ...values, + customEntities: values.type === KBType.GRAPH ? values.customEntities : undefined, + }; if (isEdit && data) { - await updateKnowledgeBaseByIdUsingPut(data.id!, values); + await updateKnowledgeBaseByIdUsingPut(data.id!, payload); message.success("知识库更新成功"); } else { - await createKnowledgeBaseUsingPost(values); + await createKnowledgeBaseUsingPost(payload); message.success("知识库创建成功"); } setOpen(false); @@ -110,6 +123,50 @@ export default function CreateKnowledgeBase({ onOk={handleCreateKnowledgeBase} >

+ + form.setFieldValue("type", val)} + options={[ + { + value: KBType.DOCUMENT, + label: "向量知识库", + description: "适用于文档类知识,支持召回+生成", + icon: BookOpen, + }, + { + value: KBType.GRAPH, + label: "知识图谱", + description: "自定义实体/关系,强化结构化推理", + icon: Share2, + }, + ]} + /> + + {isGraphKB && ( + + 识别实体(可多选) + + + +
+ } + name="customEntities" + rules={[{ type: "array" }]} + > +