Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
Expand All @@ -22,17 +23,19 @@
import java.nio.charset.StandardCharsets;

/**
* 用户信息过滤器
* 鉴权过滤器
*
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class UserContextFilter implements GlobalFilter {
public class AuthFilter implements GlobalFilter {
private static final String AUTH_HEADER = "Authorization";

private static final String TOKEN_PREFIX = "Bearer ";

private static final String USER_HEADER = "User";

private final UserService userService;

@Value("${datamate.jwt.enable:false}")
Expand All @@ -55,10 +58,22 @@ public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return sendUnauthorizedResponse(exchange);
}
String token = authHeader.substring(TOKEN_PREFIX.length());
if (!userService.validateToken(token)) {
String user = userService.validateToken(token);
if (StringUtils.isBlank(user)) {
return sendUnauthorizedResponse(exchange);
}
return chain.filter(exchange);
// 4. 创建新的请求
ServerHttpRequest mutatedRequest = request.mutate()
.headers(httpHeaders -> {
// 或者直接操作headers
httpHeaders.add(USER_HEADER, user);
})
.build();
// 5. 使用新的请求创建新的exchange
ServerWebExchange mutatedExchange = exchange.mutate()
.request(mutatedRequest)
.build();
return chain.filter(mutatedExchange);
} catch (Exception e) {
log.error("get current user info error", e);
return sendUnauthorizedResponse(exchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.datamate.gateway.domain.entity.User;
import com.datamate.gateway.domain.repository.UserRepository;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.*;
import io.jsonwebtoken.security.Keys;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -26,6 +24,8 @@
@Service
@RequiredArgsConstructor
public class UserService {
private static final String SYSTEM_USER = "system";

private final UserRepository userRepository;

@Value("${datamate.jwt.expiration-seconds:3600}")
Expand Down Expand Up @@ -70,12 +70,12 @@ private String generateToken(User user) {
.compact();
}

public boolean validateToken(String token) {
public String validateToken(String token) {
try {
Jwts.parser().setSigningKey(secret.getBytes()).parseClaimsJws(token);
return true;
Jws<Claims> claimsJws = Jwts.parserBuilder().setSigningKey(Keys.hmacShaKeyFor(secret.getBytes(StandardCharsets.UTF_8))).build().parseClaimsJws(token);
return claimsJws.getBody().getSubject();
} catch (JwtException | IllegalArgumentException ex) {
return false;
return null;
}
}

Expand All @@ -89,7 +89,7 @@ public Optional<User> register(RegisterRequest registerRequest) {
// Check if username already exists
LambdaQueryWrapper<User> usernameQuery = new LambdaQueryWrapper<>();
usernameQuery.eq(User::getUsername, registerRequest.getUsername());
if (userRepository.getOne(usernameQuery) != null) {
if (userRepository.getOne(usernameQuery) != null || SYSTEM_USER.equals(registerRequest.getUsername())) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.baomidou.mybatisplus.annotation.TableName;
import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum;
import com.datamate.common.domain.model.base.BaseEntity;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -14,9 +15,7 @@
@Getter
@Setter
@TableName(value = "t_clean_task", autoResultMap = true)
public class CleaningTask {
private String id;

public class CleaningTask extends BaseEntity<String> {
private String name;

private String description;
Expand All @@ -37,8 +36,6 @@ public class CleaningTask {

private Integer fileCount;

private LocalDateTime createdAt;

private LocalDateTime startedAt;

private LocalDateTime finishedAt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.cleaning.domain.model.entity.CleaningResult;
import com.datamate.common.infrastructure.config.IgnoreDataScopeAnnotation;
import org.apache.ibatis.annotations.Mapper;

@Mapper
@IgnoreDataScopeAnnotation
public interface CleaningResultMapper extends BaseMapper<CleaningResult> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
import com.datamate.cleaning.domain.model.entity.CleaningTemplate;
import com.datamate.common.infrastructure.config.IgnoreDataScopeAnnotation;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.util.List;

@Mapper
@IgnoreDataScopeAnnotation
public interface CleaningTemplateMapper extends BaseMapper<CleaningTemplate> {
@Select("SELECT t.id AS id, name, description, created_at, updated_at, created_by, operator_id, op_index, " +
"settings_override FROM t_clean_template t LEFT JOIN t_operator_instance o ON t.id = o.instance_id " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
import com.datamate.common.infrastructure.config.IgnoreDataScopeAnnotation;
import com.datamate.operator.domain.model.OperatorView;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
Expand All @@ -10,6 +11,7 @@


@Mapper
@IgnoreDataScopeAnnotation
public interface OperatorInstanceMapper extends BaseMapper<OperatorInstance> {
@Select("SELECT o.operator_id as id, o.operator_name as name, o.description, o.version, o.inputs, o.outputs, " +
"o.runtime, o.settings, o.created_at, o.updated_at, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public AllDatasetStatisticsResponse getAllDatasetStatistics() {
public void processDataSourceAsync(String datasetId, String dataSourceId) {
try {
log.info("Initiating data source file scanning, dataset ID: {}, collection task ID: {}", datasetId, dataSourceId);
List<String> filePaths = getFilePaths(dataSourceId);
List<String> filePaths = getFilePaths(dataSourceId, datasetRepository.getById(datasetId));
if (CollectionUtils.isEmpty(filePaths)) {
return;
}
Expand All @@ -255,8 +255,8 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) {
}
}

private List<String> getFilePaths(String dataSourceId) {
CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
private List<String> getFilePaths(String dataSourceId, Dataset dataset) {
CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId, dataset.getCreatedBy()).getData();
if (taskDetail == null) {
log.warn("Fail to get collection task detail, task ID: {}", dataSourceId);
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public PagedResponse<DatasetFile> getDatasetFilesWithDirectory(String datasetId,

return new PagedResponse<>(page, size, total, totalPages, datasetFiles);
} catch (IOException e) {
log.error("list dataset path error", e);
log.warn("list dataset path error");
return PagedResponse.of(new Page<>(page, size));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;

/**
* 数据归集服务 Feign Client
Expand All @@ -18,5 +19,5 @@ public interface CollectionTaskClient {
* @return 任务详情
*/
@GetMapping("/api/data-collection/tasks/{id}")
Response<CollectionTaskDetailResponse> getTaskDetail(@PathVariable("id") String taskId);
Response<CollectionTaskDetailResponse> getTaskDetail(@PathVariable("id") String taskId, @RequestHeader("User") String authorization);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.datamate.datamanagement.infrastructure.persistence.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.common.infrastructure.config.IgnoreDataScopeAnnotation;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
Expand All @@ -9,6 +10,7 @@
import java.util.List;

@Mapper
@IgnoreDataScopeAnnotation
public interface DatasetFileMapper extends BaseMapper<DatasetFile> {
DatasetFile findById(@Param("id") String id);
List<DatasetFile> findByDatasetId(@Param("datasetId") String datasetId, RowBounds rowBounds);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.datamate.main.filter;

import com.datamate.common.infrastructure.common.Response;
import com.datamate.common.infrastructure.config.DataScopeHandle;
import com.datamate.common.infrastructure.exception.CommonErrorCode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.servlet.*;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* 用户信息过滤器
*
* @since 2026/1/19
*/
@Slf4j
@Component
public class UserContextFilter implements Filter {
private static final String USER_HEADER = "User";

@Value("${datamate.jwt.enable:false}")
private Boolean jwtEnable;

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) {
try {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
String user = httpRequest.getHeader(USER_HEADER);
ObjectMapper objectMapper = new ObjectMapper();
if (jwtEnable && StringUtils.isBlank(user)) {
httpResponse.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
httpResponse.getWriter().write(objectMapper.writeValueAsString(Response.error(CommonErrorCode.UNAUTHORIZED)));
return;
}
DataScopeHandle.setUserInfo(user);
filterChain.doFilter(servletRequest, servletResponse);
} catch (IOException | ServletException e) {
log.error("Request failed!");
throw new RuntimeException(e);
} finally {
DataScopeHandle.removeUserInfo();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.datamate.operator.domain.model;

import com.baomidou.mybatisplus.annotation.TableName;
import com.datamate.common.domain.model.base.BaseEntity;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -9,9 +10,7 @@
@Getter
@Setter
@TableName(value = "t_operator")
public class Operator {
private String id;

public class Operator extends BaseEntity<String> {
private String name;

private String description;
Expand All @@ -35,9 +34,5 @@ public class Operator {
private Integer usageCount;

private Boolean isStar;

private LocalDateTime createdAt;

private LocalDateTime updatedAt;
}

Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private QueryWrapper<OperatorView> getQueryWrapper(String keyword, List<List<Str
}

queryWrapper.groupBy("operator_id", "operator_name", "description", "version", "inputs", "outputs",
"runtime", "settings", "is_star", "file_size", "usage_count", "created_at", "updated_at")
"runtime", "settings", "is_star", "file_size", "usage_count", "created_at", "updated_at", "created_by", "updated_by")
.having(!havingSql.isEmpty(), havingSql.toString())
.orderByDesc("created_at");
return queryWrapper;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.datamate.operator.infrastructure.persistence.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.common.infrastructure.config.IgnoreDataScopeAnnotation;
import com.datamate.operator.domain.model.Category;
import org.apache.ibatis.annotations.Mapper;

@Mapper
@IgnoreDataScopeAnnotation
public interface CategoryMapper extends BaseMapper<Category> {
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.datamate.operator.infrastructure.persistence.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.common.infrastructure.config.IgnoreDataScopeAnnotation;
import com.datamate.operator.domain.model.CategoryRelation;
import org.apache.ibatis.annotations.Mapper;

@Mapper
@IgnoreDataScopeAnnotation
public interface CategoryRelationMapper extends BaseMapper<CategoryRelation> {
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package com.datamate.operator.infrastructure.persistence.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.common.infrastructure.config.IgnoreDataScopeAnnotation;
import com.datamate.operator.domain.model.Operator;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;

@Mapper
public interface OperatorMapper extends BaseMapper<Operator> {

@IgnoreDataScopeAnnotation
@Select("SELECT count(1) FROM t_operator_instance oi JOIN t_clean_template t ON oi.instance_id = t.id " +
"WHERE oi.operator_id = #{operatorId}")
int operatorInTemplate(String operatorId);

@IgnoreDataScopeAnnotation
@Select("SELECT count(1) FROM t_operator_instance oi JOIN t_clean_task t ON oi.instance_id = t.id " +
"WHERE oi.operator_id = #{operatorId} AND t.status != 'COMPLETED'")
int operatorInUnstopTask(String operatorId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.datamate.operator.infrastructure.persistence.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.common.infrastructure.config.IgnoreDataScopeAnnotation;
import com.datamate.operator.domain.model.OperatorRelease;
import org.apache.ibatis.annotations.Mapper;

@Mapper
@IgnoreDataScopeAnnotation
public interface OperatorReleaseMapper extends BaseMapper<OperatorRelease> {
}
Loading
Loading