Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
import com.datamate.cleaning.interfaces.dto.*;
import com.datamate.common.domain.enums.EdgeType;
import com.datamate.common.domain.enums.NodeType;
import com.datamate.common.domain.model.LineageEdge;
import com.datamate.common.domain.model.LineageNode;
import com.datamate.common.domain.service.LineageService;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.common.interfaces.PagedResponse;
Expand Down Expand Up @@ -73,6 +78,8 @@ public class CleaningTaskService {

private final CleanTaskValidator cleanTaskValidator;

private final LineageService lineageService;

private final String DATASET_PATH = "/dataset";

private final String FLOW_PATH = "/flow";
Expand Down Expand Up @@ -134,6 +141,8 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
task.setBeforeSize(srcDataset.getSizeBytes());
task.setFileCount(srcDataset.getFileCount().intValue());
cleaningTaskRepo.insertTask(task);
// 记录血缘关系
addCleaningToGraph(srcDataset, task, destDataset);

operatorInstanceRepo.insertInstance(taskId, request.getInstance());
operatorRepo.incrementUsageCount(request.getInstance().stream()
Expand All @@ -146,6 +155,30 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
return task;
}

/**
 * Records the lineage of a cleaning task: source dataset --(cleaning task)--> destination dataset.
 *
 * @param srcDataset  dataset the cleaning task reads from
 * @param task        cleaning task that becomes the edge between the two dataset nodes
 * @param destDataset dataset the cleaning task writes to
 */
private void addCleaningToGraph(Dataset srcDataset, CleaningTaskDto task, Dataset destDataset) {
    LineageNode sourceNode = datasetLineageNode(srcDataset);
    LineageNode targetNode = datasetLineageNode(destDataset);

    // The edge carries the identity and description of the cleaning task itself.
    LineageEdge cleaningEdge = new LineageEdge();
    cleaningEdge.setProcessId(task.getId());
    cleaningEdge.setName(task.getName());
    cleaningEdge.setDescription(task.getDescription());
    cleaningEdge.setEdgeType(EdgeType.DATA_CLEANING);
    cleaningEdge.setFromNodeId(sourceNode.getId());
    cleaningEdge.setToNodeId(targetNode.getId());

    lineageService.generateGraph(sourceNode, cleaningEdge, targetNode);
}

/**
 * Builds a lineage node of type {@link NodeType#DATASET} that copies the id, name and
 * description of the given dataset.
 */
private LineageNode datasetLineageNode(Dataset dataset) {
    LineageNode node = new LineageNode();
    node.setId(dataset.getId());
    node.setName(dataset.getName());
    node.setDescription(dataset.getDescription());
    node.setNodeType(NodeType.DATASET);
    return node;
}

public CleaningTaskDto getTask(String taskId) {
CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId);
setProcess(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.common.domain.enums.EdgeType;
import com.datamate.common.domain.enums.NodeType;
import com.datamate.common.domain.model.LineageEdge;
import com.datamate.common.domain.model.LineageNode;
import com.datamate.common.domain.service.LineageService;
import com.datamate.common.domain.utils.ChunksSaver;
import com.datamate.common.setting.application.SysParamApplicationService;
import com.datamate.datamanagement.interfaces.dto.*;
Expand All @@ -17,7 +22,6 @@
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
import com.datamate.datamanagement.interfaces.dto.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
Expand Down Expand Up @@ -54,6 +58,7 @@ public class DatasetApplicationService {
private final CollectionTaskClient collectionTaskClient;
private final DatasetFileApplicationService datasetFileApplicationService;
private final SysParamApplicationService sysParamService;
private final LineageService lineageService;

@Value("${datamate.data-management.base-path:/dataset}")
private String datasetBasePath;
Expand All @@ -72,6 +77,8 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) {
dataset.setTags(processTagNames(createDatasetRequest.getTags()));
}
datasetRepository.save(dataset);
// 记录血缘关系
addDatasetToGraph(dataset, null);

//todo 需要解耦这块逻辑
if (StringUtils.hasText(createDatasetRequest.getDataSource())) {
Expand All @@ -81,6 +88,43 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) {
return dataset;
}

/**
 * Records a dataset in the lineage graph, optionally linked to the collection task that feeds it.
 *
 * <p>When {@code collection} is {@code null}, only the dataset node is passed on and the
 * source node and edge handed to {@code lineageService.generateGraph} are both {@code null}.
 *
 * @param dataset    dataset to register as a {@link NodeType#DATASET} node
 * @param collection collection task that produced the dataset's data, or {@code null} if none
 */
private void addDatasetToGraph(Dataset dataset, CollectionTaskDetailResponse collection) {
    LineageNode datasetNode = new LineageNode();
    datasetNode.setId(dataset.getId());
    datasetNode.setNodeType(NodeType.DATASET);
    datasetNode.setName(dataset.getName());
    datasetNode.setDescription(dataset.getDescription());

    LineageNode collectionNode = null;
    LineageEdge collectionEdge = null;
    if (Objects.nonNull(collection)) {
        collectionNode = new LineageNode();
        collectionNode.setId(collection.getId());
        collectionNode.setName(collection.getName());
        collectionNode.setDescription(collection.getDescription());
        collectionNode.setNodeType(NodeType.DATASOURCE);

        collectionEdge = new LineageEdge();
        collectionEdge.setProcessId(collection.getId());
        collectionEdge.setName(collection.getName());
        collectionEdge.setEdgeType(EdgeType.DATA_COLLECTION);
        // The edge represents the collection process, so it is described by the collection
        // task (like its id and name above), not by the dataset it points to.
        collectionEdge.setDescription(collection.getDescription());
        collectionEdge.setFromNodeId(collectionNode.getId());
        collectionEdge.setToNodeId(datasetNode.getId());
    }
    lineageService.generateGraph(collectionNode, collectionEdge, datasetNode);
}

/**
 * Returns the lineage graph (nodes and edges) that the given dataset belongs to.
 *
 * @param datasetId id of the dataset whose lineage is requested
 * @return the nodes and edges sharing the dataset node's graph id; an empty
 *         {@link DatasetLineage} when the dataset does not exist or has no lineage node
 */
public DatasetLineage getDatasetLineage(String datasetId) {
    Dataset dataset = datasetRepository.getById(datasetId);
    if (Objects.isNull(dataset)) {
        return new DatasetLineage();
    }
    LineageNode datasetNode = lineageService.getNodeById(datasetId);
    // NOTE(review): getNodeById is assumed to return null when no node is recorded
    // (e.g. datasets created before lineage tracking) - confirm against LineageService.
    // Guarding here avoids a NullPointerException on getGraphId() in that case.
    if (Objects.isNull(datasetNode)) {
        return new DatasetLineage();
    }
    String graphId = datasetNode.getGraphId();
    return new DatasetLineage(lineageService.getNodesByGraphId(graphId), lineageService.getEdgesByGraphId(graphId));
}

/**
 * Looks up the system parameter stored under {@code DATASET_PVC_NAME}.
 *
 * @return whatever {@code sysParamService.getParamByKey} yields for that key
 */
public String getDatasetPvcName() {
    String pvcName = sysParamService.getParamByKey(DATASET_PVC_NAME);
    return pvcName;
}
Expand All @@ -100,11 +144,11 @@ public Dataset updateDataset(String datasetId, UpdateDatasetRequest updateDatase
if (Objects.nonNull(updateDatasetRequest.getStatus())) {
dataset.setStatus(updateDatasetRequest.getStatus());
}
datasetRepository.updateById(dataset);
if (StringUtils.hasText(updateDatasetRequest.getDataSource())) {
// 数据源id不为空,使用异步线程进行文件扫盘落库
processDataSourceAsync(dataset.getId(), updateDatasetRequest.getDataSource());
}
datasetRepository.updateById(dataset);
return dataset;
}

Expand Down Expand Up @@ -261,6 +305,8 @@ private List<String> getFilePaths(String dataSourceId, Dataset dataset) {
log.warn("Fail to get collection task detail, task ID: {}", dataSourceId);
return Collections.emptyList();
}
// 记录血缘关系
addDatasetToGraph(dataset, taskDetail);
Path targetPath = Paths.get(taskDetail.getTargetPath());
if (!Files.exists(targetPath) || !Files.isDirectory(targetPath)) {
log.warn("Target path not exists or is not a directory: {}", taskDetail.getTargetPath());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.datamate.datamanagement.interfaces.dto;

import com.datamate.common.domain.model.LineageEdge;
import com.datamate.common.domain.model.LineageNode;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.List;

/**
* Dataset lineage: the nodes and edges of the lineage graph returned for a dataset.
*
* @since 2026/1/23
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class DatasetLineage {
/**
* Lineage nodes of the graph. Left unset (null) by the no-args constructor.
*/
private List<LineageNode> lineageNodes;
/**
* Lineage edges of the graph. Left unset (null) by the no-args constructor.
*/
private List<LineageEdge> lineageEdges;
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public DatasetResponse updateDataset(@PathVariable("datasetId") String datasetId
return DatasetConverter.INSTANCE.convertToResponse(dataset);
}

/**
 * Returns the lineage of a dataset by delegating to the application service.
 *
 * @param datasetId dataset id taken from the request path
 * @return lineage nodes and edges as produced by the application service
 */
@GetMapping("/{datasetId}/lineage")
public DatasetLineage getDatasetLineage(@PathVariable("datasetId") String datasetId) {
    DatasetLineage lineage = datasetApplicationService.getDatasetLineage(datasetId);
    return lineage;
}

/**
* 根据ID删除数据集
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.datamate.common.domain.enums;

/**
* Lineage edge type: DATA_COLLECTION/DATA_CLEANING/DATA_LABELING/DATA_SYNTHESIS/DATA_RATIO.
* Each constant names the kind of processing flow an edge stands for.
*
* @since 2026/1/23
*/
public enum EdgeType {
// Data collection task.
DATA_COLLECTION,
// Data cleaning.
DATA_CLEANING,
// Data labeling.
DATA_LABELING,
// Data synthesis.
DATA_SYNTHESIS,
// Data ratio (mixing) task.
DATA_RATIO
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.datamate.common.domain.enums;

/**
* Lineage node type: DATASOURCE/DATASET/KNOWLEDGE_BASE/MODEL, etc.
* Each constant names the kind of entity a node stands for.
*
* @since 2026/1/23
*/
public enum NodeType {
// Source that data is collected from.
DATASOURCE,
// Dataset.
DATASET,
// Knowledge base.
KNOWLEDGE_BASE,
// Model.
MODEL
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.datamate.common.domain.model;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.datamate.common.domain.enums.EdgeType;
import lombok.Getter;
import lombok.Setter;

/**
* Data lineage: edge table entity (mapped to table t_lineage_edge).
* An edge represents a processing flow (collection task, data cleaning, data labeling,
* data synthesis, data ratio, etc.) that connects two lineage nodes.
*
* @since 2026/1/23
*/

@Getter
@Setter
@TableName("t_lineage_edge")
public class LineageEdge {
/**
* Edge ID.
*/
@TableId(type = IdType.ASSIGN_ID)
private String id;
/**
* ID of the graph this edge belongs to.
*/
private String graphId;

/**
* ID of the processing flow (e.g. the task) this edge represents.
*/
private String processId;

/**
* Edge type: DATA_COLLECTION/DATA_CLEANING/DATA_LABELING/DATA_SYNTHESIS/DATA_RATIO, etc.
*/
private EdgeType edgeType;

/**
* Edge name.
*/
private String name;

/**
* Edge description.
*/
private String description;

/**
* Extended edge information (JSON).
*/
private String edgeMetadata;

/**
* ID of the source node.
*/
private String fromNodeId;

/**
* ID of the target node.
*/
private String toNodeId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.datamate.common.domain.model;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.datamate.common.domain.enums.NodeType;
import lombok.Getter;
import lombok.Setter;

/**
* Data lineage: node table entity (mapped to table t_lineage_node).
* A node represents an entity object (collection source, dataset, knowledge base, model, etc.).
*
* @since 2026/1/23
*/
@Getter
@Setter
@TableName("t_lineage_node")
public class LineageNode {
/**
* Node ID.
*/
@TableId(type = IdType.ASSIGN_ID)
private String id;

/**
* ID of the graph this node belongs to.
*/
private String graphId;

/**
* Node type: DATASOURCE/DATASET/KNOWLEDGE_BASE/MODEL, etc.
*/
private NodeType nodeType;

/**
* Node name.
*/
private String name;

/**
* Node description.
*/
private String description;

/**
* Extended node information (JSON).
*/
private String nodeMetadata;
}
Loading
Loading