diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/entity/FieldTruncationResult.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/entity/FieldTruncationResult.java
new file mode 100644
index 0000000000..6760d95d9d
--- /dev/null
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/entity/FieldTruncationResult.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.storage.entity;
+
+import java.util.List;
+
+/** Result of field truncation detection and processing */
+public class FieldTruncationResult {
+  private boolean hasOversizedFields;
+  private List<OversizedFieldInfo> oversizedFields;
+  private Integer maxOversizedFieldCount;
+  private List<String[]> data;
+
+  public FieldTruncationResult() {}
+
+  public FieldTruncationResult(
+      boolean hasOversizedFields,
+      List<OversizedFieldInfo> oversizedFields,
+      Integer maxOversizedFieldCount,
+      List<String[]> data) {
+    this.hasOversizedFields = hasOversizedFields;
+    this.oversizedFields = oversizedFields;
+    this.maxOversizedFieldCount = maxOversizedFieldCount;
+    this.data = data;
+  }
+
+  public boolean isHasOversizedFields() {
+    return hasOversizedFields;
+  }
+
+  public void setHasOversizedFields(boolean hasOversizedFields) {
+    this.hasOversizedFields = hasOversizedFields;
+  }
+
+  public List<OversizedFieldInfo> getOversizedFields() {
+    return oversizedFields;
+  }
+
+  public void setOversizedFields(List<OversizedFieldInfo> oversizedFields) {
+    this.oversizedFields = oversizedFields;
+  }
+
+  public Integer getMaxOversizedFieldCount() {
+    return maxOversizedFieldCount;
+  }
+
+  public void setMaxOversizedFieldCount(Integer maxOversizedFieldCount) {
+    this.maxOversizedFieldCount = maxOversizedFieldCount;
+  }
+
+  public List<String[]> getData() {
+    return data;
+  }
+
+  public void setData(List<String[]> data) {
+    this.data = data;
+  }
+}
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/entity/OversizedFieldInfo.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/entity/OversizedFieldInfo.java
new file mode 100644
index 0000000000..f1fd1b985b
--- /dev/null
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/entity/OversizedFieldInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.storage.entity;
+
+/** Represents information about an oversized field in a result set */
+public class OversizedFieldInfo {
+  private String fieldName;
+  private Integer rowIndex;
+  private Integer actualLength;
+  private Integer maxLength;
+
+  public OversizedFieldInfo() {}
+
+  public OversizedFieldInfo(
+      String fieldName, Integer rowIndex, Integer actualLength, Integer maxLength) {
+    this.fieldName = fieldName;
+    this.rowIndex = rowIndex;
+    this.actualLength = actualLength;
+    this.maxLength = maxLength;
+  }
+
+  public String getFieldName() {
+    return fieldName;
+  }
+
+  public void setFieldName(String fieldName) {
+    this.fieldName = fieldName;
+  }
+
+  public Integer getRowIndex() {
+    return rowIndex;
+  }
+
+  public void setRowIndex(Integer rowIndex) {
+    this.rowIndex = rowIndex;
+  }
+
+  public Integer getActualLength() {
+    return actualLength;
+  }
+
+  public void setActualLength(Integer actualLength) {
+    this.actualLength = actualLength;
+  }
+
+  public Integer getMaxLength() {
+    return maxLength;
+  }
+
+  public void setMaxLength(Integer maxLength) {
+    this.maxLength = maxLength;
+  }
+}
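Reviewer note: the two entities above are plain POJOs that the detection code below wires together. A minimal sketch of their intended relationship, with hypothetical values (not part of the patch):

    import java.util.Collections;
    import java.util.List;
    import org.apache.linkis.storage.entity.FieldTruncationResult;
    import org.apache.linkis.storage.entity.OversizedFieldInfo;

    public class TruncationEntitiesDemo {
      public static void main(String[] args) {
        // One oversized cell: column "log_text", row 3, 120000 chars against a 32767 limit
        OversizedFieldInfo info = new OversizedFieldInfo("log_text", 3, 120000, 32767);
        List<OversizedFieldInfo> oversized = Collections.singletonList(info);
        // The processed rows would normally come from ResultUtils.detectAndHandle(...)
        List<String[]> rows = Collections.singletonList(new String[] {"1", "truncated..."});
        FieldTruncationResult result = new FieldTruncationResult(true, oversized, 20, rows);
        System.out.println(result.isHasOversizedFields() + ", " + result.getOversizedFields().size());
      }
    }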
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/ResultUtils.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/ResultUtils.java
new file mode 100644
index 0000000000..3b8a051143
--- /dev/null
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/ResultUtils.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.storage.utils;
+
+import org.apache.linkis.common.io.FsWriter;
+import org.apache.linkis.storage.conf.LinkisStorageConf;
+import org.apache.linkis.storage.entity.FieldTruncationResult;
+import org.apache.linkis.storage.entity.OversizedFieldInfo;
+import org.apache.linkis.storage.excel.StorageExcelWriter;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+import org.apache.linkis.storage.resultset.table.TableRecord;
+import org.apache.linkis.storage.source.FileSource;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.math3.util.Pair;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResultUtils {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(ResultUtils.class);
+
+  /**
+   * Remove the content of the specified fields.
+   *
+   * @param metadata the metadata array that carries the field information
+   * @param contentList the two-dimensional string data to process
+   * @param fieldsToRemove the set of field names to remove
+   * @return the processed rows; invalid input returns the input list rather than null
+   */
+  @SuppressWarnings("unchecked")
+  public static List<String[]> removeFieldsFromContent(
+      Object metadata, List<String[]> contentList, Set<String> fieldsToRemove) {
+    // 1. Validate parameters
+    if (metadata == null
+        || fieldsToRemove == null
+        || fieldsToRemove.isEmpty()
+        || contentList == null
+        || !(metadata instanceof Map[])) {
+      return contentList;
+    }
+
+    // 2. Safe type conversion
+    Map<String, Object>[] fieldMetadata = (Map<String, Object>[]) metadata;
+
+    // 3. Collect the column indices to remove (deduplicated, sorted descending)
+    List<Integer> columnsToRemove =
+        IntStream.range(0, fieldMetadata.length)
+            .filter(
+                i -> {
+                  Map<String, Object> meta = fieldMetadata[i];
+                  Object columnName = meta.get("columnName");
+                  return columnName != null
+                      && fieldsToRemove.contains(columnName.toString().toLowerCase());
+                })
+            .distinct()
+            .boxed()
+            .sorted((a, b) -> Integer.compare(b, a))
+            .collect(Collectors.toList());
+
+    // If no columns need removal, return a copy directly
+    if (columnsToRemove.isEmpty()) {
+      return new ArrayList<>(contentList);
+    }
+    // 4. Process each row (remove the specified columns)
+    return contentList.stream()
+        .map(
+            row -> {
+              if (row == null || row.length == 0) {
+                return row;
+              }
+              // Use a mutable list so elements can be removed
+              List<String> rowList = new ArrayList<>(Arrays.asList(row));
+              // Remove columns from back to front to avoid index shifting
+              for (int columnIndex : columnsToRemove) {
+                if (columnIndex < rowList.size()) {
+                  rowList.remove(columnIndex);
+                  LOGGER.info("MaskedField Remove Data columnIndex:" + columnIndex);
+                }
+              }
+              return rowList.toArray(new String[0]);
+            })
+        .collect(Collectors.toList());
+  }
+
+  @SuppressWarnings("unchecked")
+  public static Map<String, Object>[] filterMaskedFieldsFromMetadata(
+      Object metadata, Set<String> maskedFields) {
+    // 1. Validate parameters
+    if (metadata == null || maskedFields == null || !(metadata instanceof Map[])) {
+      return new Map[0];
+    }
+
+    // 2. Type conversion (validated above, safe to cast)
+    Map<String, Object>[] originalMaps = (Map<String, Object>[]) metadata;
+
+    // 3. Filtering logic (extracted as a predicate for readability)
+    Predicate<Map<String, Object>> isNotMaskedField =
+        map -> !maskedFields.contains(map.get("columnName").toString().toLowerCase());
+
+    // 4. Stream processing + result conversion
+    return Arrays.stream(originalMaps)
+        .filter(isNotMaskedField)
+        .toArray(Map[]::new); // equivalent to toArray(new Map[0])
+  }
+
+  /**
+   * Convert Map array to TableMetaData
+   *
+   * @param metadataArray Array of Map containing column information
+   * @return TableMetaData object
+   */
+  public static TableMetaData convertMapArrayToTableMetaData(Map<String, Object>[] metadataArray) {
+    if (metadataArray == null || metadataArray.length == 0) {
+      return new TableMetaData(new org.apache.linkis.storage.domain.Column[0]);
+    }
+
+    org.apache.linkis.storage.domain.Column[] columns =
+        new org.apache.linkis.storage.domain.Column[metadataArray.length];
+
+    for (int i = 0; i < metadataArray.length; i++) {
+      Map<String, Object> columnMap = metadataArray[i];
+      String columnName =
+          columnMap.get("columnName") != null ? columnMap.get("columnName").toString() : "";
+      String dataType =
+          columnMap.get("dataType") != null ? columnMap.get("dataType").toString() : "string";
+      String comment = columnMap.get("comment") != null ? columnMap.get("comment").toString() : "";
+
+      // Create Column object
+      org.apache.linkis.storage.domain.DataType dtype =
+          org.apache.linkis.storage.domain.DataType$.MODULE$.toDataType(dataType);
+      columns[i] = new org.apache.linkis.storage.domain.Column(columnName, dtype, comment);
+    }
+
+    return new TableMetaData(columns);
+  }
+
+  public static void dealMaskedField(
+      String maskedFieldNames, FsWriter fsWriter, FileSource fileSource) throws IOException {
+
+    LOGGER.info("Applying field masking for fields: {}", maskedFieldNames);
+
+    // Parse masked field names
+    Set<String> maskedFieldsSet =
+        Arrays.stream(maskedFieldNames.split(","))
+            .map(String::trim)
+            .map(String::toLowerCase)
+            .filter(StringUtils::isNotBlank)
+            .collect(Collectors.toSet());
+
+    // Collect data from file source
+    Pair<Object, ArrayList<String[]>>[] collectedData = fileSource.collect();
+
+    // Process each result set
+    for (int i = 0; i < collectedData.length; i++) {
+      Pair<Object, ArrayList<String[]>> collectedDataSet = collectedData[i];
+      Object metadata = collectedDataSet.getFirst();
+      ArrayList<String[]> content = collectedDataSet.getSecond();
+
+      // Filter metadata and content
+      Map<String, Object>[] filteredMetadata =
+          filterMaskedFieldsFromMetadata(metadata, maskedFieldsSet);
+      List<String[]> filteredContent = removeFieldsFromContent(metadata, content, maskedFieldsSet);
+
+      // Convert Map[] to TableMetaData
+      TableMetaData tableMetaData = convertMapArrayToTableMetaData(filteredMetadata);
+
+      // Write filtered data
+      fsWriter.addMetaData(tableMetaData);
+      for (String[] row : filteredContent) {
+        fsWriter.addRecord(new TableRecord(row));
+      }
+      LOGGER.info(
+          "Field masking applied for result set {}. Original columns: {}, Filtered columns: {}",
+          i,
+          ((Map<String, Object>[]) metadata).length,
+          filteredMetadata.length);
+    }
+  }
+
+  /**
+   * Detect and handle oversized fields in result set
+   *
+   * @param metadata Column metadata (a Map[] with a "columnName" entry per column)
+   * @param fileContent Data rows list (each row is a String[])
+   * @param maxLength Max field length threshold in characters
+   * @param truncate Whether to truncate (false means detection only)
+   * @return FieldTruncationResult containing detection results and processed data
+   */
+  @SuppressWarnings("unchecked")
+  public static FieldTruncationResult detectAndHandle(
+      Object metadata, List<String[]> fileContent, Integer maxLength, boolean truncate) {
+    if (metadata == null || !(metadata instanceof Map[])) {
+      return new FieldTruncationResult();
+    }
+
+    // Type conversion (validated above, safe to cast)
+    Map<String, Object>[] originalMaps = (Map<String, Object>[]) metadata;
+
+    // Extract column names
+    List<String> columnNames = new ArrayList<>();
+    for (Map<String, Object> meta : originalMaps) {
+      Object columnName = meta.get("columnName");
+      columnNames.add(columnName != null ? columnName.toString() : "");
+    }
+
+    // Convert the String[] rows into ArrayLists
+    List<ArrayList<String>> dataList = new ArrayList<>();
+    for (String[] row : fileContent) {
+      ArrayList<String> rowList = new ArrayList<>(Arrays.asList(row));
+      dataList.add(rowList);
+    }
+
+    int maxCount = LinkisStorageConf.OVERSIZED_FIELD_MAX_COUNT();
+
+    // Detect oversized fields
+    List<OversizedFieldInfo> oversizedFields =
+        detectOversizedFields(columnNames, dataList, maxLength, maxCount);
+
+    boolean hasOversizedFields = !oversizedFields.isEmpty();
+
+    // Truncate if requested
+    List<ArrayList<String>> processedData = dataList;
+    LOGGER.info(
+        "Starting field truncation detection, truncate is {}, hasOversizedFields is {}",
+        truncate,
+        hasOversizedFields);
+    if (truncate && hasOversizedFields) {
+      processedData = truncateFields(columnNames, dataList, maxLength);
+    }
+    List<String[]> convertedList =
+        processedData.stream()
+            .map(row -> row != null ? row.toArray(new String[0]) : null)
+            .collect(Collectors.toList());
+    return new FieldTruncationResult(hasOversizedFields, oversizedFields, maxCount, convertedList);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void detectAndHandle(
+      StorageExcelWriter fsWriter, FileSource fileSource, Integer maxLength) throws IOException {
+    // Collect data from file source
+    Pair<Object, ArrayList<String[]>>[] collectedData = fileSource.collect();
+
+    // Process each result set
+    for (int i = 0; i < collectedData.length; i++) {
+      Pair<Object, ArrayList<String[]>> collectedDataSet = collectedData[i];
+      Object metadata = collectedDataSet.getFirst();
+      ArrayList<String[]> content = collectedDataSet.getSecond();
+
+      FieldTruncationResult fieldTruncationResult =
+          detectAndHandle(metadata, content, maxLength, true);
+
+      List<String[]> data = fieldTruncationResult.getData();
+
+      // Convert Map[] to TableMetaData and add truncation markers for oversized fields
+      TableMetaData tableMetaData =
+          convertMapArrayToTableMetaData((Map<String, Object>[]) metadata);
+      // Create a list of oversized field names for quick lookup
+      List<String> oversizedFieldNames =
+          fieldTruncationResult.getOversizedFields().stream()
+              .map(OversizedFieldInfo::getFieldName)
+              .distinct()
+              .collect(Collectors.toList());
+      // If there are oversized fields, add markers to column names in the metadata
+      if (fieldTruncationResult.isHasOversizedFields()
+          && fieldTruncationResult.getOversizedFields() != null) {
+        // Update column names to indicate truncation with max length
+        org.apache.linkis.storage.domain.Column[] columns = tableMetaData.columns();
+        for (int j = 0; j < columns.length; j++) {
+          if (oversizedFieldNames.contains(columns[j].columnName())) {
+            // Build the truncation marker for this field
+            String truncatedInfo =
+                maxLength != null
+                    ? MessageFormat.format(LinkisStorageConf.FIELD_TRUNCATION_NOTE(), maxLength)
+                    : LinkisStorageConf.FIELD_NOT_TRUNCATION_NOTE();
+            // Create a new column with truncation info suffix to indicate truncation
+            columns[j] =
+                new org.apache.linkis.storage.domain.Column(
+                    columns[j].columnName() + truncatedInfo,
+                    columns[j].dataType(),
+                    columns[j].comment());
+          }
+        }
+
+        // Create new TableMetaData with updated column names
+        tableMetaData = new TableMetaData(columns);
+      }
+      // Write filtered data
+      if (oversizedFieldNames.isEmpty()) {
+        fsWriter.addMetaData(tableMetaData);
+      } else {
+        StringJoiner joiner = new StringJoiner(",");
+        oversizedFieldNames.forEach(joiner::add);
+        String note =
+            MessageFormat.format(
+                LinkisStorageConf.FIELD_OPEN_FILE_TRUNCATION_NOTE(),
+                LinkisStorageConf.FIELD_EXPORT_DOWNLOAD_LENGTH(),
+                joiner);
+        fsWriter.addMetaDataWithNote(tableMetaData, note);
+      }
+      for (String[] row : data) {
+        fsWriter.addRecord(new TableRecord(row));
+      }
+    }
+  }
+
+  /**
+   * Detect oversized fields
+   *
+   * @param metadata Column names
+   * @param dataList Data rows
+   * @param maxLength Max length threshold
+   * @param maxCount Max number of oversized fields to collect
+   * @return List of oversized field info
+   */
+  private static List<OversizedFieldInfo> detectOversizedFields(
+      List<String> metadata, List<ArrayList<String>> dataList, int maxLength, int maxCount) {
+
+    List<OversizedFieldInfo> oversizedFields = new ArrayList<>();
+
+    if (metadata == null || dataList == null || dataList.isEmpty()) {
+      return oversizedFields;
+    }
+
+    // Track field names already flagged as oversized to avoid re-checking them
+    Set<String> detectedOversizedFields = new HashSet<>();
+
+    // Iterate through data rows
+    for (int rowIndex = 0; rowIndex < dataList.size(); rowIndex++) {
+
+      ArrayList<String> row = dataList.get(rowIndex);
+      if (row == null) {
+        continue;
+      }
+
+      // Check each field in the row
+      for (int colIndex = 0; colIndex < row.size() && colIndex < metadata.size(); colIndex++) {
+
+        String fieldName = metadata.get(colIndex);
+
+        // Skip fields already flagged as oversized, for efficiency
+        if (detectedOversizedFields.contains(fieldName)) {
+          continue;
+        }
+
+        String fieldValue = row.get(colIndex);
+        int fieldLength = getFieldLength(fieldValue);
+
+        if (fieldLength > maxLength) {
+          oversizedFields.add(new OversizedFieldInfo(fieldName, rowIndex, fieldLength, maxLength));
+          // Record the field name so it is not checked again
+          detectedOversizedFields.add(fieldName);
+          LOGGER.info(
+              "Detected oversized field: field={}, row={}, actualLength={}, maxLength={}",
+              fieldName,
+              rowIndex,
+              fieldLength,
+              maxLength);
+          if (oversizedFields.size() >= maxCount) {
+            // Stop once the configured maximum number of oversized fields has been collected
+            return oversizedFields;
+          }
+        }
+      }
+    }
+
+    return oversizedFields;
+  }
+
+  /**
+   * Truncate oversized fields
+   *
+   * @param metadata Column names
+   * @param dataList Data rows
+   * @param maxLength Max length
+   * @return Truncated data list
+   */
+  private static List<ArrayList<String>> truncateFields(
+      List<String> metadata, List<ArrayList<String>> dataList, int maxLength) {
+
+    if (dataList == null || dataList.isEmpty()) {
+      return dataList;
+    }
+
+    List<ArrayList<String>> truncatedData = new ArrayList<>();
+
+    for (ArrayList<String> row : dataList) {
+      if (row == null) {
+        truncatedData.add(null);
+        continue;
+      }
+
+      ArrayList<String> truncatedRow = new ArrayList<>();
+      for (String fieldValue : row) {
+        String truncatedValue = truncateFieldValue(fieldValue, maxLength);
+        truncatedRow.add(truncatedValue);
+      }
+      truncatedData.add(truncatedRow);
+    }
+
+    return truncatedData;
+  }
+
+  /**
+   * Get field value character length
+   *
+   * @param value Field value
+   * @return Character length
+   */
+  private static int getFieldLength(Object value) {
+    if (value == null) {
+      return 0;
+    }
+    return value.toString().length();
+  }
+
+  /**
+   * Truncate single field value
+   *
+   * @param value Field value
+   * @param maxLength Max length
+   * @return Truncated value
+   */
+  private static String truncateFieldValue(Object value, int maxLength) {
+    if (value == null) {
+      return null;
+    }
+    String str = value.toString();
+    if (str.length() <= maxLength) {
+      return str;
+    }
+    return str.substring(0, maxLength);
+  }
+
+  /**
+   * Apply both field masking and truncation
+   *
+   * @param maskedFieldNames Comma-separated list of field names to mask
+   * @param fsWriter The FsWriter to write results to
+   * @param fileSource The FileSource to read data from
+   * @param maxLength Max field length threshold in characters
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static void applyFieldMaskingAndTruncation(
+      String maskedFieldNames,
+      StorageExcelWriter fsWriter,
+      FileSource fileSource,
+      Integer maxLength)
+      throws IOException {
+
+    LOGGER.info("Applying both field masking and truncation");
+    // First collect data from file source
+    Pair<Object, ArrayList<String[]>>[] collectedData = fileSource.collect();
+
+    // Process each result set
+    for (int i = 0; i < collectedData.length; i++) {
+      Pair<Object, ArrayList<String[]>> collectedDataSet = collectedData[i];
+      Object metadata = collectedDataSet.getFirst();
+      ArrayList<String[]> content = collectedDataSet.getSecond();
+
+      // Apply field masking
+      Set<String> maskedFieldsSet =
+          Arrays.stream(maskedFieldNames.split(","))
+              .map(String::trim)
+              .map(String::toLowerCase)
+              .filter(StringUtils::isNotBlank)
+              .collect(Collectors.toSet());
+
+      Map<String, Object>[] filteredMetadata =
+          filterMaskedFieldsFromMetadata(metadata, maskedFieldsSet);
+      List<String[]> filteredContent = removeFieldsFromContent(metadata, content, maskedFieldsSet);
+
+      // Then apply field truncation
+      FieldTruncationResult fieldTruncationResult =
+          detectAndHandle(filteredMetadata, filteredContent, maxLength, true);
+      List<String[]> finalData = fieldTruncationResult.getData();
+
+      // Write data
+      TableMetaData tableMetaData = convertMapArrayToTableMetaData(filteredMetadata);
+      // Create a list of oversized field names for quick lookup
+      List<String> oversizedFieldNames =
+          fieldTruncationResult.getOversizedFields().stream()
+              .map(OversizedFieldInfo::getFieldName)
+              .distinct()
+              .collect(Collectors.toList());
+      // If there are oversized fields, add markers to column names in the metadata
+      if (fieldTruncationResult.isHasOversizedFields()
+          && fieldTruncationResult.getOversizedFields() != null) {
+        // Update column names to indicate truncation with max length
+        org.apache.linkis.storage.domain.Column[] columns = tableMetaData.columns();
+        for (int j = 0; j < columns.length; j++) {
+          if (oversizedFieldNames.contains(columns[j].columnName())) {
+            // Build the truncation marker for this field
+            String truncatedInfo =
+                maxLength != null
+                    ? MessageFormat.format(LinkisStorageConf.FIELD_TRUNCATION_NOTE(), maxLength)
+                    : LinkisStorageConf.FIELD_NOT_TRUNCATION_NOTE();
+            // Create a new column with truncation info suffix to indicate truncation
+            columns[j] =
+                new org.apache.linkis.storage.domain.Column(
+                    columns[j].columnName() + truncatedInfo,
+                    columns[j].dataType(),
+                    columns[j].comment());
+          }
+        }
+
+        // Create new TableMetaData with updated column names
+        tableMetaData = new TableMetaData(columns);
+      }
+      if (oversizedFieldNames.isEmpty()) {
+        fsWriter.addMetaData(tableMetaData);
+      } else {
+        StringJoiner joiner = new StringJoiner(",");
+        oversizedFieldNames.forEach(joiner::add);
+        String note =
+            MessageFormat.format(
+                LinkisStorageConf.FIELD_OPEN_FILE_TRUNCATION_NOTE(),
+                LinkisStorageConf.FIELD_EXPORT_DOWNLOAD_LENGTH(),
+                joiner);
+        fsWriter.addMetaDataWithNote(tableMetaData, note);
+      }
+      for (String[] row : finalData) {
+        fsWriter.addRecord(new TableRecord(row));
+      }
+      LOGGER.info(
+          "Field masking and truncation applied for result set {}. Original columns: {}, Filtered columns: {}, Truncated fields: {}",
+          i,
+          ((Map<String, Object>[]) metadata).length,
+          filteredMetadata.length,
+          fieldTruncationResult.getOversizedFields().size());
+    }
+  }
+}
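Reviewer note: a small, self-contained sketch of the detection/truncation entry point above, assuming the Map[] metadata shape openFile produces (a "columnName" key per column); the column names, row values, and the maxLength of 10 are hypothetical:

    import java.util.*;
    import org.apache.linkis.storage.entity.FieldTruncationResult;
    import org.apache.linkis.storage.utils.ResultUtils;

    public class DetectAndHandleDemo {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) {
        Map<String, Object>[] metadata = new Map[2];
        metadata[0] = new HashMap<>(Collections.singletonMap("columnName", "id"));
        metadata[1] = new HashMap<>(Collections.singletonMap("columnName", "payload"));
        // Row 0 carries a 40-char "payload" value; a threshold of 10 flags it
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"1", String.join("", Collections.nCopies(40, "x"))});
        FieldTruncationResult r = ResultUtils.detectAndHandle(metadata, rows, 10, true);
        // Expected: hasOversizedFields == true, payload cut to 10 chars
        System.out.println(r.isHasOversizedFields() + " -> " + r.getData().get(0)[1]);
      }
    }

With truncate set to false the same call only reports the OversizedFieldInfo entries and returns the data unchanged, which is how the openFile endpoint later asks the user for confirmation first.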
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
index 50c60fecd2..a28ede48df 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
@@ -83,4 +83,33 @@ object LinkisStorageConf {
   val LINKIS_READ_ROW_BYTE_MAX_LEN =
     ByteTimeUtils.byteStringAsBytes(LINKIS__READ_RESULT_ROW_MAX_LEN_STR)
 
+  val FIELD_TRUNCATION_ENABLED =
+    CommonVars("linkis.resultset.field.truncation.enabled", false).getValue
+
+  val FIELD_MASKED_ENABLED =
+    CommonVars("linkis.resultset.field.masked.enabled", false).getValue
+
+  val FIELD_VIEW_MAX_LENGTH = CommonVars("linkis.resultset.field.view.max.length", 10000).getValue
+
+  val FIELD_EXPORT_DOWNLOAD_LENGTH =
+    CommonVars("linkis.resultset.field.download.max.length", 32767).getValue
+
+  val FIELD_EXPORT_MAX_LENGTH =
+    CommonVars("linkis.resultset.field.export.max.length", 32767).getValue
+
+  val OVERSIZED_FIELD_MAX_COUNT =
+    CommonVars("linkis.resultset.field.oversized.max.count", 20).getValue
+
+  val FIELD_TRUNCATION_NOTE =
+    CommonVars("linkis.resultset.field.truncation.excel.note", "(truncated to {0} chars)").getValue
+
+  val FIELD_NOT_TRUNCATION_NOTE =
+    CommonVars("linkis.resultset.field.truncation.default.note", "(truncated)").getValue
+
+  val FIELD_OPEN_FILE_TRUNCATION_NOTE =
+    CommonVars(
+      "linkis.resultset.field.truncation.note",
+      "结果集存在字段值超过{0}字符,无法全量下载,以下字段截取前{0}字符展示:{1}"
+    ).getValue
+
 }
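Reviewer note: both switches are off by default, so the patch is behavior-preserving until configured. Enabling the feature end-to-end in linkis.properties would look like this (the numeric values shown are the defaults defined above, spelled out explicitly):

    # enable oversized-field detection/truncation and field masking
    linkis.resultset.field.truncation.enabled=true
    linkis.resultset.field.masked.enabled=true
    # preview (openFile) and export thresholds, in characters
    linkis.resultset.field.view.max.length=10000
    linkis.resultset.field.download.max.length=32767
    linkis.resultset.field.export.max.length=32767
    # cap on collected OversizedFieldInfo entries per result set
    linkis.resultset.field.oversized.max.count=20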
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageExcelWriter.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageExcelWriter.scala
index 9ea83130df..8baeb792e2 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageExcelWriter.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageExcelWriter.scala
@@ -24,6 +24,7 @@ import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
 
 import org.apache.commons.io.IOUtils
 import org.apache.poi.ss.usermodel._
+import org.apache.poi.ss.util.CellRangeAddress
 import org.apache.poi.xssf.streaming.{SXSSFCell, SXSSFSheet, SXSSFWorkbook}
 
 import java.io._
@@ -68,6 +69,15 @@ class StorageExcelWriter(
     headerCellStyle
   }
 
+  def getWarningStyle: CellStyle = {
+    val warningFont = workBook.createFont
+    warningFont.setBold(true)
+    warningFont.setColor(IndexedColors.RED.getIndex)
+    val warningCellStyle = workBook.createCellStyle
+    warningCellStyle.setFont(warningFont)
+    warningCellStyle
+  }
+
   def getWorkBook: Workbook = {
     // 自适应列宽
     sheet.trackAllColumnsForAutoSizing()
@@ -133,6 +143,38 @@
     rowPoint += 1
   }
 
+  @scala.throws[IOException]
+  def addMetaDataWithNote(metaData: MetaData, note: String): Unit = {
+    init
+    // Create the notice row (row 0)
+    val noticeRow = sheet.createRow(0)
+    val noticeCell = noticeRow.createCell(0)
+    noticeCell.setCellValue(note)
+    noticeCell.setCellStyle(getWarningStyle)
+
+    // Merge the cells of the notice row (from column 0 through the last column)
+    val columns = metaData.asInstanceOf[TableMetaData].columns
+    val columnCount = columns.length
+    if (columnCount > 1) {
+      val mergeRange = new CellRangeAddress(0, 0, 0, columnCount - 1)
+      sheet.addMergedRegion(mergeRange)
+    }
+
+    // Create the header row (row 1)
+    val tableHead = sheet.createRow(1)
+    val columnType = new ArrayBuffer[DataType]()
+    columnCounter = 0
+    for (elem <- columns) {
+      val headCell = tableHead.createCell(columnCounter)
+      headCell.setCellValue(elem.columnName)
+      headCell.setCellStyle(getDefaultHeadStyle)
+      columnType += elem.dataType
+      columnCounter += 1
+    }
+    types = columnType.toArray
+    rowPoint += 2 // the notice row shifts everything down, so advance the row pointer by 2 (notice row + header row)
+  }
+
   @scala.throws[IOException]
   override def addRecord(record: Record): Unit = {
     // TODO: 是否需要替换null值
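Reviewer note: addMetaDataWithNote writes the warning into merged cell A1, the header into row 1, and data from row 2 onward. A sketch of what a caller passes in, calling the Scala members from Java; an already-initialized writer is assumed and the column set is hypothetical:

    import java.text.MessageFormat;
    import org.apache.linkis.storage.conf.LinkisStorageConf;
    import org.apache.linkis.storage.domain.Column;
    import org.apache.linkis.storage.domain.DataType$;
    import org.apache.linkis.storage.resultset.table.TableMetaData;

    public class NoteLayoutDemo {
      public static void main(String[] args) {
        Column payload =
            new Column(
                "payload(truncated to 32767 chars)", DataType$.MODULE$.toDataType("string"), "");
        TableMetaData meta = new TableMetaData(new Column[] {payload});
        // The note that ends up in merged cell A1, rendered from the configured template
        String note =
            MessageFormat.format(
                LinkisStorageConf.FIELD_OPEN_FILE_TRUNCATION_NOTE(),
                LinkisStorageConf.FIELD_EXPORT_DOWNLOAD_LENGTH(),
                "payload");
        System.out.println(note + " / columns=" + meta.columns().length);
        // An initialized StorageExcelWriter would then receive:
        //   writer.addMetaDataWithNote(meta, note); // note row + header row, rowPoint advances by 2
      }
    }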
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultDeserializer.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultDeserializer.scala
index 86d09e9532..323d816c9b 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultDeserializer.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultDeserializer.scala
@@ -127,14 +127,16 @@ class TableResultDeserializer extends ResultDeserializer[TableMetaData, TableRec
         val len = colArray(i).toInt
         val res = Dolphin.getString(bytes, index, len)
         if (res.length > LinkisStorageConf.LINKIS_RESULT_COL_LENGTH && enableLimit) {
-          throw new ColLengthExceedException(
-            LinkisStorageErrorCodeSummary.RESULT_COL_LENGTH.getErrorCode,
-            MessageFormat.format(
-              LinkisStorageErrorCodeSummary.RESULT_COL_LENGTH.getErrorDesc,
-              res.length.asInstanceOf[Object],
-              LinkisStorageConf.LINKIS_RESULT_COL_LENGTH.asInstanceOf[Object]
+          if (!LinkisStorageConf.FIELD_TRUNCATION_ENABLED) {
+            throw new ColLengthExceedException(
+              LinkisStorageErrorCodeSummary.RESULT_COL_LENGTH.getErrorCode,
+              MessageFormat.format(
+                LinkisStorageErrorCodeSummary.RESULT_COL_LENGTH.getErrorDesc,
+                res.length.asInstanceOf[Object],
+                LinkisStorageConf.LINKIS_RESULT_COL_LENGTH.asInstanceOf[Object]
+              )
             )
-          )
+          }
         }
         index += len
         // 如果enableLimit为true,则采取的是列分页
diff --git a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/constant/PipeLineConstant.scala b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/constant/PipeLineConstant.scala
index 77c2a3a8a1..477addcfff 100644
--- a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/constant/PipeLineConstant.scala
+++ b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/constant/PipeLineConstant.scala
@@ -30,4 +30,7 @@ object PipeLineConstant {
     "wds.linkis.engine.pipeline.field.quote.retoch.enable"
 
   val BLANK = "BLANK"
+
+  val PIPELINE_MASKED_CONF = "pipeline.masked.field.names"
+
 }
nullValue = "" - val outputStream: OutputStream = - destFs.write(destFsPath, PIPELINE_OUTPUT_ISOVERWRITE_SWITCH.getValue(options)) - OutputStreamCache.osCache.put(engineExecutionContext.getJobId.get, outputStream) - val cSVFsWriter = CSVFsWriter.getCSVFSWriter( - PIPELINE_OUTPUT_CHARSET_STR.getValue(options), - PIPELINE_FIELD_SPLIT_STR.getValue(options), - PIPELINE_FIELD_QUOTE_RETOUCH_ENABLE.getValue(options), - outputStream - ) - fileSource.addParams("nullValue", nullValue).write(cSVFsWriter) - IOUtils.closeQuietly(cSVFsWriter) - IOUtils.closeQuietly(fileSource) - IOUtils.closeQuietly(sourceFs) - IOUtils.closeQuietly(destFs) + super.execute(sourcePath, destPath, engineExecutionContext) } diff --git a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala index 42c0e27cd2..3d6d924c4e 100644 --- a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala +++ b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala @@ -18,33 +18,41 @@ package org.apache.linkis.manager.engineplugin.pipeline.executor import org.apache.linkis.common.io.FsPath -import org.apache.linkis.common.utils.ResultSetUtils +import org.apache.linkis.common.utils.{Logging, ResultSetUtils} import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext import org.apache.linkis.manager.engineplugin.pipeline.conf.PipelineEngineConfiguration import org.apache.linkis.manager.engineplugin.pipeline.conf.PipelineEngineConfiguration.PIPELINE_OUTPUT_ISOVERWRITE_SWITCH +import org.apache.linkis.manager.engineplugin.pipeline.constant.PipeLineConstant import org.apache.linkis.manager.engineplugin.pipeline.constant.PipeLineConstant._ import org.apache.linkis.manager.engineplugin.pipeline.errorcode.PopelineErrorCodeSummary._ import org.apache.linkis.manager.engineplugin.pipeline.exception.PipeLineErrorException import org.apache.linkis.scheduler.executer.ExecuteResponse import org.apache.linkis.storage.FSFactory -import org.apache.linkis.storage.excel.{ExcelFsWriter, StorageMultiExcelWriter} +import org.apache.linkis.storage.conf.LinkisStorageConf +import org.apache.linkis.storage.conf.LinkisStorageConf.FIELD_TRUNCATION_ENABLED +import org.apache.linkis.storage.excel.{ExcelFsWriter, StorageExcelWriter, StorageMultiExcelWriter} import org.apache.linkis.storage.fs.FileSystem import org.apache.linkis.storage.source.FileSource +import org.apache.linkis.storage.utils.ResultUtils import org.apache.commons.io.IOUtils +import org.apache.commons.lang3.StringUtils import java.io.OutputStream import java.util -import scala.collection.JavaConverters.mapAsScalaMapConverter +import scala.collection.JavaConverters._ -class ExcelExecutor extends PipeLineExecutor { +class ExcelExecutor extends PipeLineExecutor with Logging { override def execute( sourcePath: String, destPath: String, engineExecutorContext: EngineExecutionContext ): ExecuteResponse = { + // Extract masked field names from options + val maskedFieldNames = options.getOrDefault(PipeLineConstant.PIPELINE_MASKED_CONF, "") + var fileSource: FileSource = null var excelFsWriter: ExcelFsWriter = null val sourceFsPath = new FsPath(sourcePath) @@ -53,56 +61,93 @@ class ExcelExecutor extends PipeLineExecutor { sourceFs.init(null) val destFs = 
diff --git a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala
index 42c0e27cd2..3d6d924c4e 100644
--- a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala
+++ b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala
@@ -18,33 +18,41 @@ package org.apache.linkis.manager.engineplugin.pipeline.executor
 
 import org.apache.linkis.common.io.FsPath
-import org.apache.linkis.common.utils.ResultSetUtils
+import org.apache.linkis.common.utils.{Logging, ResultSetUtils}
 import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
 import org.apache.linkis.manager.engineplugin.pipeline.conf.PipelineEngineConfiguration
 import org.apache.linkis.manager.engineplugin.pipeline.conf.PipelineEngineConfiguration.PIPELINE_OUTPUT_ISOVERWRITE_SWITCH
+import org.apache.linkis.manager.engineplugin.pipeline.constant.PipeLineConstant
 import org.apache.linkis.manager.engineplugin.pipeline.constant.PipeLineConstant._
 import org.apache.linkis.manager.engineplugin.pipeline.errorcode.PopelineErrorCodeSummary._
 import org.apache.linkis.manager.engineplugin.pipeline.exception.PipeLineErrorException
 import org.apache.linkis.scheduler.executer.ExecuteResponse
 import org.apache.linkis.storage.FSFactory
-import org.apache.linkis.storage.excel.{ExcelFsWriter, StorageMultiExcelWriter}
+import org.apache.linkis.storage.conf.LinkisStorageConf
+import org.apache.linkis.storage.conf.LinkisStorageConf.FIELD_TRUNCATION_ENABLED
+import org.apache.linkis.storage.excel.{ExcelFsWriter, StorageExcelWriter, StorageMultiExcelWriter}
 import org.apache.linkis.storage.fs.FileSystem
 import org.apache.linkis.storage.source.FileSource
+import org.apache.linkis.storage.utils.ResultUtils
 
 import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
 
 import java.io.OutputStream
 import java.util
 
-import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.JavaConverters._
 
-class ExcelExecutor extends PipeLineExecutor {
+class ExcelExecutor extends PipeLineExecutor with Logging {
 
   override def execute(
       sourcePath: String,
       destPath: String,
       engineExecutorContext: EngineExecutionContext
   ): ExecuteResponse = {
+    // Extract masked field names from options
+    val maskedFieldNames = options.getOrDefault(PipeLineConstant.PIPELINE_MASKED_CONF, "")
+
     var fileSource: FileSource = null
     var excelFsWriter: ExcelFsWriter = null
     val sourceFsPath = new FsPath(sourcePath)
@@ -53,56 +61,93 @@ class ExcelExecutor extends PipeLineExecutor {
     sourceFs.init(null)
     val destFs = FSFactory.getFs(destFsPath)
     destFs.init(null)
-    val outputStream: OutputStream =
-      destFs.write(destFsPath, PIPELINE_OUTPUT_ISOVERWRITE_SWITCH.getValue(options))
-    val paramsMap = new util.HashMap[String, String]()
-    engineExecutorContext.getProperties.asScala
-      .filter(_._2 != null)
-      .map(kv => (kv._1, kv._2.toString))
-      .foreach(kv => paramsMap.put(kv._1, kv._2))
-    val excelAutoFormat = PipelineEngineConfiguration.EXPORT_EXCEL_AUTO_FORMAT.getValue(paramsMap)
-    if (sourcePath.contains(".")) {
-      // sourcePaht 是文件形式
-      // TODO: fs 加目录判断
-      if (!FileSource.isResultSet(sourcePath)) {
-        throw new PipeLineErrorException(
-          NOT_A_RESULT_SET_FILE.getErrorCode,
-          NOT_A_RESULT_SET_FILE.getErrorDesc
-        )
-      }
-      fileSource = FileSource.create(sourceFsPath, sourceFs)
-      excelFsWriter = ExcelFsWriter.getExcelFsWriter(
-        DEFAULTC_HARSET,
-        DEFAULT_SHEETNAME,
-        DEFAULT_DATEFORMATE,
-        outputStream,
-        excelAutoFormat
-      )
-    } else {
-      // 目录形式
-      excelFsWriter = new StorageMultiExcelWriter(outputStream, excelAutoFormat)
-      val fsPathListWithError = sourceFs.asInstanceOf[FileSystem].listPathWithError(sourceFsPath)
-      if (fsPathListWithError == null) {
-        throw new PipeLineErrorException(EMPTY_DIR.getErrorCode, EMPTY_DIR.getErrorDesc)
-      }
-      val fsPathList = fsPathListWithError.getFsPaths
-      ResultSetUtils.sortByNameNum(fsPathList)
-      fileSource = FileSource.create(fsPathList.toArray(Array[FsPath]()), sourceFs)
-    }
-    if (!FileSource.isTableResultSet(fileSource)) {
-      throw new PipeLineErrorException(
-        ONLY_RESULT_CONVERTED_TO_EXCEL.getErrorCode,
-        ONLY_RESULT_CONVERTED_TO_EXCEL.getErrorDesc
-      )
-    }
-    var nullValue = options.getOrDefault(PIPELINE_OUTPUT_SHUFFLE_NULL_TYPE, "NULL")
-    if (BLANK.equalsIgnoreCase(nullValue)) nullValue = ""
-    OutputStreamCache.osCache.put(engineExecutorContext.getJobId.get, outputStream)
-    fileSource.addParams("nullValue", nullValue).write(excelFsWriter)
-    IOUtils.closeQuietly(excelFsWriter)
-    IOUtils.closeQuietly(fileSource)
-    IOUtils.closeQuietly(sourceFs)
-    IOUtils.closeQuietly(destFs)
+    try {
+      val outputStream: OutputStream =
+        destFs.write(destFsPath, PIPELINE_OUTPUT_ISOVERWRITE_SWITCH.getValue(options))
+      val paramsMap = new util.HashMap[String, String]()
+      engineExecutorContext.getProperties.asScala
+        .filter(_._2 != null)
+        .map(kv => (kv._1, kv._2.toString))
+        .foreach(kv => paramsMap.put(kv._1, kv._2))
+      val excelAutoFormat =
+        PipelineEngineConfiguration.EXPORT_EXCEL_AUTO_FORMAT.getValue(paramsMap)
+      if (sourcePath.contains(".")) {
+        // sourcePaht 是文件形式
+        // TODO: fs 加目录判断
+        if (!FileSource.isResultSet(sourcePath)) {
+          throw new PipeLineErrorException(
+            NOT_A_RESULT_SET_FILE.getErrorCode,
+            NOT_A_RESULT_SET_FILE.getErrorDesc
+          )
+        }
+        fileSource = FileSource.create(sourceFsPath, sourceFs)
+        excelFsWriter = ExcelFsWriter.getExcelFsWriter(
+          DEFAULTC_HARSET,
+          DEFAULT_SHEETNAME,
+          DEFAULT_DATEFORMATE,
+          outputStream,
+          excelAutoFormat
+        )
+      } else {
+        // 目录形式
+        excelFsWriter = new StorageMultiExcelWriter(outputStream, excelAutoFormat)
+        val fsPathListWithError =
+          sourceFs.asInstanceOf[FileSystem].listPathWithError(sourceFsPath)
+        if (fsPathListWithError == null) {
+          throw new PipeLineErrorException(EMPTY_DIR.getErrorCode, EMPTY_DIR.getErrorDesc)
+        }
+        val fsPathList = fsPathListWithError.getFsPaths
+        ResultSetUtils.sortByNameNum(fsPathList)
+        fileSource = FileSource.create(fsPathList.toArray(Array[FsPath]()), sourceFs)
+      }
+      if (!FileSource.isTableResultSet(fileSource)) {
+        throw new PipeLineErrorException(
+          ONLY_RESULT_CONVERTED_TO_EXCEL.getErrorCode,
+          ONLY_RESULT_CONVERTED_TO_EXCEL.getErrorDesc
+        )
+      }
+
+      var nullValue = options.getOrDefault(PIPELINE_OUTPUT_SHUFFLE_NULL_TYPE, "NULL")
+      if (BLANK.equalsIgnoreCase(nullValue)) nullValue = ""
+      OutputStreamCache.osCache.put(engineExecutorContext.getJobId.get, outputStream)
+
+      try {
+        // Apply field masking if maskedFieldNames is provided
+        fileSource.addParams("nullValue", nullValue)
+        // If both field masking and truncation are requested, apply masking first, then truncation
+        if (StringUtils.isNotBlank(maskedFieldNames) && FIELD_TRUNCATION_ENABLED) {
+          // Apply field masking and truncation together
+          ResultUtils.applyFieldMaskingAndTruncation(
+            maskedFieldNames,
+            excelFsWriter.asInstanceOf[StorageExcelWriter],
+            fileSource,
+            LinkisStorageConf.FIELD_EXPORT_MAX_LENGTH
+          )
+        } else if (StringUtils.isNotBlank(maskedFieldNames)) {
+          // Field masking only
+          ResultUtils.dealMaskedField(maskedFieldNames, excelFsWriter, fileSource)
+        } else if (FIELD_TRUNCATION_ENABLED) {
+          // Field truncation only
+          ResultUtils.detectAndHandle(
+            excelFsWriter.asInstanceOf[StorageExcelWriter],
+            fileSource,
+            LinkisStorageConf.FIELD_EXPORT_MAX_LENGTH
+          )
+        } else {
+          // Original stream write logic
+          logger.info("No field masking, using stream write for Excel export")
+          fileSource.write(excelFsWriter)
+        }
+      } finally {
+        IOUtils.closeQuietly(excelFsWriter)
+        IOUtils.closeQuietly(fileSource)
+      }
+    } finally {
+      IOUtils.closeQuietly(sourceFs)
+      IOUtils.closeQuietly(destFs)
+    }
+
     super.execute(sourcePath, destPath, engineExecutorContext)
   }
diff --git a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
index da1bd0e6e1..f22e2ce6ba 100644
--- a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipelineEngineConnExecutor.scala
@@ -30,12 +30,14 @@ import org.apache.linkis.manager.common.entity.resource.{
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
 import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
+import org.apache.linkis.manager.engineplugin.pipeline.constant.PipeLineConstant
 import org.apache.linkis.manager.engineplugin.pipeline.errorcode.PopelineErrorCodeSummary._
 import org.apache.linkis.manager.engineplugin.pipeline.exception.PipeLineErrorException
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.rpc.Sender
 import org.apache.linkis.scheduler.executer.ExecuteResponse
+import org.apache.linkis.storage.conf.LinkisStorageConf
 
 import java.util
 
@@ -66,11 +68,25 @@ class PipelineEngineConnExecutor(val id: Int) extends ComputationExecutor with L
       newOptions.put(keyAndValue._1, keyAndValue._2.toString)
     }
     newOptions.asScala.foreach({ case (k, v) => logger.info(s"key is $k, value is $v") })
-    val regex = "(?i)\\s*from\\s+(\\S+)\\s+to\\s+(\\S+)\\s?".r
+
+    // Regex patterns for Pipeline syntax
+    val regexWithMask = "(?i)\\s*from\\s+(\\S+)\\s+to\\s+(\\S+)\\s+without\\s+\"([^\"]+)\"\\s*".r
+    val regexNormal = "(?i)\\s*from\\s+(\\S+)\\s+to\\s+(\\S+)\\s*".r
+
     try {
       thread = Thread.currentThread()
       code match {
-        case regex(sourcePath, destPath) =>
+        case regexWithMask(sourcePath, destPath, maskedFields) =>
+          logger.info(s"Pipeline execution with field masking: $maskedFields")
+          val enhancedOptions = new util.HashMap[String, String](newOptions)
+          if (LinkisStorageConf.FIELD_MASKED_ENABLED) {
+            enhancedOptions.put(PipeLineConstant.PIPELINE_MASKED_CONF, maskedFields)
+          }
+          PipelineExecutorSelector
+            .select(sourcePath, destPath, enhancedOptions.asInstanceOf[util.Map[String, String]])
+            .execute(sourcePath, destPath, engineExecutorContext)
+        case regexNormal(sourcePath, destPath) =>
+          logger.info("Pipeline execution without field masking")
           PipelineExecutorSelector
             .select(sourcePath, destPath, newOptions.asInstanceOf[util.Map[String, String]])
             .execute(sourcePath, destPath, engineExecutorContext)
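Reviewer note: the two patterns above accept pipeline statements of the following shape (paths and field names are hypothetical); the `without "..."` clause only takes effect when linkis.resultset.field.masked.enabled is true, otherwise it is parsed and silently dropped:

    from hdfs:///tmp/linkis/result.dolphin to file:///tmp/export/result.csv
    from hdfs:///tmp/linkis/result.dolphin to file:///tmp/export/result.xlsx without "phone,id_card"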
diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
index 0cf8d8334f..90a60de51b 100644
--- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
+++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
@@ -40,12 +40,14 @@
 import org.apache.linkis.storage.conf.LinkisStorageConf;
 import org.apache.linkis.storage.csv.CSVFsWriter;
 import org.apache.linkis.storage.domain.FsPathListWithError;
+import org.apache.linkis.storage.entity.FieldTruncationResult;
 import org.apache.linkis.storage.excel.*;
 import org.apache.linkis.storage.exception.ColLengthExceedException;
 import org.apache.linkis.storage.fs.FileSystem;
 import org.apache.linkis.storage.script.*;
 import org.apache.linkis.storage.source.FileSource;
 import org.apache.linkis.storage.source.FileSource$;
+import org.apache.linkis.storage.utils.ResultUtils;
 import org.apache.linkis.storage.utils.StorageUtils;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -71,9 +73,7 @@
 import java.text.MessageFormat;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
@@ -635,7 +635,8 @@ public Message openFile(
       @RequestParam(value = "columnPage", required = false, defaultValue = "1") Integer columnPage,
       @RequestParam(value = "columnPageSize", required = false, defaultValue = "500")
           Integer columnPageSize,
-      @RequestParam(value = "maskedFieldNames", required = false) String maskedFieldNames)
+      @RequestParam(value = "maskedFieldNames", required = false) String maskedFieldNames,
+      @RequestParam(value = "truncateColumn", required = false) String truncateColumn)
       throws IOException, WorkSpaceException {
 
     Message message = Message.ok();
@@ -663,6 +664,23 @@ public Message openFile(
     int[] columnIndices = null;
     FileSource fileSource = null;
+    String zh_msg =
+        MessageFormat.format(
+            "结果集存在字段值字符数超过{0},如需查看全部数据请导出文件或使用字符串截取函数(substring、substr)截取相关字符即可前端展示数据内容",
+            LinkisStorageConf.LINKIS_RESULT_COL_LENGTH());
+    String en_msg =
+        MessageFormat.format(
+            "There is a field value exceeding {0} characters or a column count exceeding {1} in the result set. If you want to view it, please use the result set export function.",
+            LinkisStorageConf.LINKIS_RESULT_COL_LENGTH(),
+            LinkisStorageConf.LINKIS_RESULT_COLUMN_SIZE());
+    String truncateColumn_msg =
+        MessageFormat.format(
+            "结果集存在字段值字符数超过{0},如需查看全部数据请导出文件或确认截取展示数据内容",
+            LinkisStorageConf.LINKIS_RESULT_COL_LENGTH());
+    String truncateColumn_en_msg =
+        MessageFormat.format(
+            "The result set contains field values exceeding {0} characters. To view the full data, please export the file or confirm the displayed content is truncated",
+            LinkisStorageConf.LINKIS_RESULT_COL_LENGTH());
     try {
       fileSource = FileSource$.MODULE$.create(fsPath, fileSystem);
       if (nullValue != null && BLANK.equalsIgnoreCase(nullValue)) {
@@ -737,16 +755,65 @@
       }
       // 增加字段屏蔽
       Object resultmap = newMap == null ? metaMap : newMap;
-      if (FileSource$.MODULE$.isResultSet(fsPath.getPath())
-          && StringUtils.isNotBlank(maskedFieldNames)) {
-        // 如果结果集并且屏蔽字段不为空,则执行屏蔽逻辑,反之则保持原逻辑
-        Set<String> maskedFields =
-            new HashSet<>(Arrays.asList(maskedFieldNames.toLowerCase().split(",")));
-        Map<String, Object>[] metadata = filterMaskedFieldsFromMetadata(resultmap, maskedFields);
-        List<String[]> fileContent =
-            removeFieldsFromContent(resultmap, result.getSecond(), maskedFields);
-        message.data("metadata", metadata).data("fileContent", fileContent);
+      if (FileSource$.MODULE$.isResultSet(fsPath.getPath()) && (resultmap instanceof Map[])) {
+        // Type conversion (validated above, safe to cast)
+        Map<String, Object>[] filteredMetadata = (Map<String, Object>[]) resultmap;
+        List<String[]> filteredContent = result.getSecond();
+        // Apply masked-field filtering first
+        if (LinkisStorageConf.FIELD_MASKED_ENABLED()
+            && StringUtils.isNotBlank(maskedFieldNames)) {
+          Set<String> maskedFields =
+              new HashSet<>(Arrays.asList(maskedFieldNames.toLowerCase().split(",")));
+          filteredMetadata = ResultUtils.filterMaskedFieldsFromMetadata(resultmap, maskedFields);
+          filteredContent =
+              ResultUtils.removeFieldsFromContent(resultmap, filteredContent, maskedFields);
+        }
+        // Then handle oversized fields
+        if (LinkisStorageConf.FIELD_TRUNCATION_ENABLED()) {
+          FieldTruncationResult fieldTruncationResult =
+              ResultUtils.detectAndHandle(
+                  filteredMetadata,
+                  filteredContent,
+                  LinkisStorageConf.FIELD_VIEW_MAX_LENGTH(),
+                  false);
+          if (fieldTruncationResult.isHasOversizedFields()) {
+            // Oversized fields detected
+            if (null == truncateColumn) {
+              message.data("oversizedFields", fieldTruncationResult.getOversizedFields());
+              message.data("zh_msg", truncateColumn_msg);
+              message.data("en_msg", truncateColumn_en_msg);
+              return message;
+            }
+            boolean truncateColumnSwitch = Boolean.parseBoolean(truncateColumn);
+            if (truncateColumnSwitch) {
+              // The user opted in to truncation
+              FieldTruncationResult truncationResult =
+                  ResultUtils.detectAndHandle(
+                      filteredMetadata,
+                      filteredContent,
+                      LinkisStorageConf.FIELD_VIEW_MAX_LENGTH(),
+                      truncateColumnSwitch);
+              filteredContent = truncationResult.getData();
+
+            } else {
+              // The user declined truncation; return a hint instead of data
+              message.data("type", fileSource.getFileSplits()[0].type());
+              message.data("display_prohibited", true);
+              message.data("zh_msg", zh_msg);
+              message.data("en_msg", en_msg);
+              return message;
+            }
+          }
+        }
+        if (StringUtils.isNotBlank(maskedFieldNames)
+            || LinkisStorageConf.FIELD_TRUNCATION_ENABLED()) {
+          message.data("metadata", filteredMetadata).data("fileContent", filteredContent);
+        } else {
+          // Neither field masking nor field truncation applies
+          message.data("metadata", resultmap).data("fileContent", result.getSecond());
+        }
       } else {
+        // Neither field masking nor field truncation applies
         message.data("metadata", resultmap).data("fileContent", result.getSecond());
       }
       message.data("type", fileSource.getFileSplits()[0].type());
@@ -756,17 +823,8 @@
LOGGER.info("Failed to open file {}", path, e); message.data("type", fileSource.getFileSplits()[0].type()); message.data("display_prohibited", true); - message.data( - "zh_msg", - MessageFormat.format( - "结果集存在字段值字符数超过{0},如需查看全部数据请导出文件或使用字符串截取函数(substring、substr)截取相关字符即可前端展示数据内容", - LinkisStorageConf.LINKIS_RESULT_COL_LENGTH())); - message.data( - "en_msg", - MessageFormat.format( - "There is a field value exceed {0} characters or col size exceed {1} in the result set. If you want to view it, please use the result set export function.", - LinkisStorageConf.LINKIS_RESULT_COL_LENGTH(), - LinkisStorageConf.LINKIS_RESULT_COLUMN_SIZE())); + message.data("zh_msg", zh_msg); + message.data("en_msg", en_msg); return message; } } finally { @@ -778,87 +836,6 @@ public Message openFile( IOUtils.closeQuietly(fileSource); } } - /** - * 删除指定字段的内容 - * - * @param metadata 元数据数组,包含字段信息 - * @param contentList 需要处理的二维字符串数组 - * @param fieldsToRemove 需要删除的字段集合 - * @return 处理后的字符串数组,若输入无效返回空集合而非null - */ - @SuppressWarnings("unchecked") - private List removeFieldsFromContent( - Object metadata, List contentList, Set fieldsToRemove) { - // 1. 参数校验 - if (metadata == null - || fieldsToRemove == null - || fieldsToRemove.isEmpty() - || contentList == null - || !(metadata instanceof Map[])) { - return contentList; - } - - // 2. 安全类型转换 - Map[] fieldMetadata = (Map[]) metadata; - - // 3. 收集需要删除的列索引(去重并排序) - List columnsToRemove = - IntStream.range(0, fieldMetadata.length) - .filter( - i -> { - Map meta = fieldMetadata[i]; - Object columnName = meta.get("columnName"); - return columnName != null - && fieldsToRemove.contains(columnName.toString().toLowerCase()); - }) - .distinct() - .boxed() - .sorted((a, b) -> Integer.compare(b, a)) - .collect(Collectors.toList()); - - // 如果没有需要删除的列,直接返回副本 - if (columnsToRemove.isEmpty()) { - return new ArrayList<>(contentList); - } - // 4. 对每行数据进行处理(删除指定列) - return contentList.stream() - .map( - row -> { - if (row == null || row.length == 0) { - return row; - } - // 创建可变列表以便删除元素 - List rowList = new ArrayList<>(Arrays.asList(row)); - // 从后向前删除列,避免索引变化问题 - for (int columnIndex : columnsToRemove) { - if (columnIndex < rowList.size()) { - rowList.remove(columnIndex); - } - } - return rowList.toArray(new String[0]); - }) - .collect(Collectors.toList()); - } - - @SuppressWarnings("unchecked") - private Map[] filterMaskedFieldsFromMetadata(Object metadata, Set maskedFields) { - // 1. 参数校验 - if (metadata == null || maskedFields == null || !(metadata instanceof Map[])) { - return new Map[0]; - } - - // 2. 类型转换(已通过校验,可安全强转) - Map[] originalMaps = (Map[]) metadata; - - // 3. 过滤逻辑(提取谓词增强可读性) - Predicate> isNotMaskedField = - map -> !maskedFields.contains(map.get("columnName").toString().toLowerCase()); - - // 4. 流处理 + 结果转换 - return Arrays.stream(originalMaps) - .filter(isNotMaskedField) - .toArray(Map[]::new); // 等价于 toArray(new Map[0]) - } /** * 组装获取列索引 @@ -970,7 +947,12 @@ public Message saveScript(HttpServletRequest req, @RequestBody Map