diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 21cd3f70554e..873131561526 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1452,6 +1452,24 @@ String The Variant shredding schema for writing. + +
+        <tr>
+            <td><h5>vector-field</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specify the vector store fields.</td>
+        </tr>
+        <tr>
+            <td><h5>vector.file.format</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specify the vector store file format.</td>
+        </tr>
+        <tr>
+            <td><h5>vector.target-file-size</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Target size of a vector-store file. Default is 10 * TARGET_FILE_SIZE.</td>
+        </tr>
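A minimal sketch (not part of the diff) of how these options could be combined on a table, assuming the key names documented above plus the CoreOptions.DATA_EVOLUTION_ENABLED key; the column name, the "json" format, and the size value are illustrative only. Per FileFormat.vectorStoreFileFormat and the schema validation changes below, separate vector-store files are only produced when the vector-store format differs from file.format and data evolution is enabled.

import java.util.HashMap;
import java.util.Map;

public class VectorStoreOptionsExample {

    // Table options enabling a separated vector store (values are example assumptions).
    public static Map<String, String> exampleOptions() {
        Map<String, String> options = new HashMap<>();
        // Comma-separated vector columns to be written into separate vector-store files.
        options.put("vector-field", "embedding");
        // Must differ from 'file.format'; otherwise vectors stay in the normal data files.
        options.put("vector.file.format", "json");
        // Optional; when unset the target size defaults to 10 * target-file-size.
        options.put("vector.target-file-size", "1 gb");
        // Required by the vector-store schema validation added in this diff
        // (key string assumed from CoreOptions.DATA_EVOLUTION_ENABLED).
        options.put("data-evolution.enabled", "true");
        return options;
    }
}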
visibility-callback.check-interval
10 s diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index ee3f8dd7e215..922eb1deba65 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2230,6 +2230,29 @@ public InlineElement getDescription() { .withDescription( "The interval for checking visibility when visibility-callback enabled."); + public static final ConfigOption VECTOR_STORE_FORMAT = + key("vector.file.format") + .stringType() + .noDefaultValue() + .withDescription("Specify the vector store file format."); + + public static final ConfigOption VECTOR_STORE_FIELDS = + key("vector-field") + .stringType() + .noDefaultValue() + .withDescription("Specify the vector store fields."); + + public static final ConfigOption VECTOR_STORE_TARGET_FILE_SIZE = + key("vector.target-file-size") + .memoryType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "Target size of a vector-store file." + + " Default is 10 * TARGET_FILE_SIZE.") + .build()); + private final Options options; public CoreOptions(Map options) { @@ -3469,6 +3492,26 @@ public Duration visibilityCallbackCheckInterval() { return options.get(VISIBILITY_CALLBACK_CHECK_INTERVAL); } + public String vectorStoreFileFormatString() { + return normalizeFileFormat(options.get(VECTOR_STORE_FORMAT)); + } + + public List vectorStoreFieldNames() { + String vectorStoreFields = options.get(CoreOptions.VECTOR_STORE_FIELDS); + if (vectorStoreFields == null || vectorStoreFields.trim().isEmpty()) { + return new ArrayList<>(); + } else { + return Arrays.asList(vectorStoreFields.split(",")); + } + } + + public long vectorStoreTargetFileSize() { + // Since vectors are large, it would be better to set a larger target size for vectors. + return options.getOptional(VECTOR_STORE_TARGET_FILE_SIZE) + .map(MemorySize::getBytes) + .orElse(10 * targetFileSize(false)); + } + /** Specifies the merge engine for table with primary key. 
*/ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index 90f52099b4f9..c150269d04f6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -107,6 +107,17 @@ public static FileFormat fileFormat(CoreOptions options) { return FileFormat.fromIdentifier(options.fileFormatString(), options.toConfiguration()); } + public static FileFormat vectorStoreFileFormat(CoreOptions options) { + if (options.vectorStoreFieldNames().isEmpty()) { + return null; + } + String vectorStoreFileFormat = options.vectorStoreFileFormatString(); + if (vectorStoreFileFormat == null) { + return fileFormat(options); + } + return FileFormat.fromIdentifier(vectorStoreFileFormat, options.toConfiguration()); + } + public static FileFormat manifestFormat(CoreOptions options) { return FileFormat.fromIdentifier(options.manifestFormatString(), options.toConfiguration()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index cd7acb4a3390..92e8e60c2e14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -52,6 +52,7 @@ import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter; import org.apache.paimon.utils.SinkWriter.DirectSinkWriter; import org.apache.paimon.utils.StatsCollectorFactories; +import org.apache.paimon.utils.VectorStoreUtils; import javax.annotation.Nullable; @@ -73,8 +74,11 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { private final FileIO fileIO; private final long schemaId; private final FileFormat fileFormat; + private final FileFormat vectorStoreFileFormat; + private final List vectorStoreFieldNames; private final long targetFileSize; private final long blobTargetFileSize; + private final long vectorStoreTargetFileSize; private final RowType writeSchema; @Nullable private final List writeCols; private final DataFilePathFactory pathFactory; @@ -105,8 +109,11 @@ public AppendOnlyWriter( @Nullable IOManager ioManager, long schemaId, FileFormat fileFormat, + FileFormat vectorStoreFileFormat, + List vectorStoreFieldNames, long targetFileSize, long blobTargetFileSize, + long vectorStoreTargetFileSize, RowType writeSchema, @Nullable List writeCols, long maxSequenceNumber, @@ -129,8 +136,11 @@ public AppendOnlyWriter( this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; + this.vectorStoreFileFormat = vectorStoreFileFormat; + this.vectorStoreFieldNames = vectorStoreFieldNames; this.targetFileSize = targetFileSize; this.blobTargetFileSize = blobTargetFileSize; + this.vectorStoreTargetFileSize = vectorStoreTargetFileSize; this.writeSchema = writeSchema; this.writeCols = writeCols; this.pathFactory = pathFactory; @@ -304,13 +314,25 @@ public void toBufferedWriter() throws Exception { } private RollingFileWriter createRollingRowWriter() { - if (writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB))) { - return new RollingBlobFileWriter( + boolean hasNormal = + writeSchema.getFields().stream() + .anyMatch( + f -> + !f.type().is(BLOB) + && !vectorStoreFieldNames.contains(f.name())); + boolean hasBlob = 
writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB)); + boolean hasSeparatedVectorStore = + VectorStoreUtils.isDifferentFormat(vectorStoreFileFormat, fileFormat); + if (hasBlob || (hasNormal && hasSeparatedVectorStore)) { + return new DataEvolutionRollingFileWriter( fileIO, schemaId, fileFormat, + vectorStoreFileFormat, + vectorStoreFieldNames, targetFileSize, blobTargetFileSize, + vectorStoreTargetFileSize, writeSchema, pathFactory, seqNumCounterProvider, @@ -326,13 +348,20 @@ private RollingFileWriter createRollingRowWriter() { statsDenseStore, blobConsumer); } + FileFormat realFileFormat = hasNormal ? fileFormat : vectorStoreFileFormat; + long realTargetFileSize = hasNormal ? targetFileSize : vectorStoreTargetFileSize; + DataFilePathFactory realPathFactory = + hasNormal + ? pathFactory + : pathFactory.vectorStorePathFactory( + vectorStoreFileFormat.getFormatIdentifier()); return new RowDataRollingFileWriter( fileIO, schemaId, - fileFormat, - targetFileSize, + realFileFormat, + realTargetFileSize, writeSchema, - pathFactory, + realPathFactory, seqNumCounterProvider, fileCompression, statsCollectorFactories.statsCollectors(writeSchema.getFieldNames()), diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java similarity index 62% rename from paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java rename to paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java index 80faa01d9238..4494794150da 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java @@ -28,14 +28,18 @@ import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.FileWriterAbortExecutor; import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.io.RollingFileWriterImpl; import org.apache.paimon.io.RowDataFileWriter; import org.apache.paimon.io.SingleFileWriter; import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StatsCollectorFactories; +import org.apache.paimon.utils.VectorStoreUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,9 +74,10 @@ * * */ -public class RollingBlobFileWriter implements RollingFileWriter { +public class DataEvolutionRollingFileWriter + implements RollingFileWriter { - private static final Logger LOG = LoggerFactory.getLogger(RollingBlobFileWriter.class); + private static final Logger LOG = LoggerFactory.getLogger(DataEvolutionRollingFileWriter.class); /** Constant for checking rolling condition periodically. 
*/ private static final long CHECK_ROLLING_RECORD_CNT = 1000L; @@ -82,6 +87,10 @@ public class RollingBlobFileWriter implements RollingFileWriter, DataFileMeta>> writerFactory; private final Supplier blobWriterFactory; + private final Supplier< + ProjectedFileWriter< + RollingFileWriterImpl, List>> + vectorStoreWriterFactory; private final long targetFileSize; // State management @@ -91,15 +100,21 @@ public class RollingBlobFileWriter implements RollingFileWriter, DataFileMeta> currentWriter; private MultipleBlobFileWriter blobWriter; + private ProjectedFileWriter< + RollingFileWriterImpl, List> + vectorStoreWriter; private long recordCount = 0; private boolean closed = false; - public RollingBlobFileWriter( + public DataEvolutionRollingFileWriter( FileIO fileIO, long schemaId, FileFormat fileFormat, + FileFormat vectorStoreFileFormat, + List vectorStoreFieldNames, long targetFileSize, long blobTargetFileSize, + long vectorStoreTargetFileSize, RowType writeSchema, DataFilePathFactory pathFactory, Supplier seqNumCounterSupplier, @@ -115,13 +130,25 @@ public RollingBlobFileWriter( this.results = new ArrayList<>(); this.closedWriters = new ArrayList<>(); + // Split into normal and vector-store parts + RowType normalRowType = BlobType.splitBlob(writeSchema).getLeft(); + RowType vectorStoreRowType; + if (VectorStoreUtils.isDifferentFormat(vectorStoreFileFormat, fileFormat)) { + Pair typeWithVectorStore = + VectorStoreUtils.splitVectorStore(normalRowType, vectorStoreFieldNames); + normalRowType = typeWithVectorStore.getLeft(); + vectorStoreRowType = typeWithVectorStore.getRight(); + } else { + vectorStoreRowType = new RowType(Collections.emptyList()); + } + // Initialize writer factory for normal data this.writerFactory = createNormalWriterFactory( fileIO, schemaId, fileFormat, - BlobType.splitBlob(writeSchema).getLeft(), + normalRowType, writeSchema, pathFactory, seqNumCounterSupplier, @@ -133,19 +160,45 @@ public RollingBlobFileWriter( statsDenseStore); // Initialize blob writer - this.blobWriterFactory = - () -> - new MultipleBlobFileWriter( - fileIO, - schemaId, - writeSchema, - pathFactory, - seqNumCounterSupplier, - fileSource, - asyncFileWrite, - statsDenseStore, - blobTargetFileSize, - blobConsumer); + if (!BlobType.splitBlob(writeSchema).getRight().getFields().isEmpty()) { + this.blobWriterFactory = + () -> + new MultipleBlobFileWriter( + fileIO, + schemaId, + writeSchema, + pathFactory, + seqNumCounterSupplier, + fileSource, + asyncFileWrite, + statsDenseStore, + blobTargetFileSize, + blobConsumer); + } else { + this.blobWriterFactory = null; + } + + // Initialize vector-store writer + if (!vectorStoreRowType.getFields().isEmpty()) { + this.vectorStoreWriterFactory = + () -> + createVectorStoreWriter( + fileIO, + vectorStoreFileFormat, + schemaId, + vectorStoreRowType, + writeSchema, + pathFactory, + seqNumCounterSupplier, + fileCompression, + statsCollectorFactories.statsCollectors(vectorStoreFieldNames), + fileSource, + asyncFileWrite, + statsDenseStore, + vectorStoreTargetFileSize); + } else { + this.vectorStoreWriterFactory = null; + } } /** Creates a factory for normal data writers. */ @@ -192,6 +245,53 @@ public RollingBlobFileWriter( }; } + /** Creates a vector-store writer for handling vector-store data. 
*/ + private static ProjectedFileWriter< + RollingFileWriterImpl, List> + createVectorStoreWriter( + FileIO fileIO, + FileFormat vectorStoreFileFormat, + long schemaId, + RowType vectorStoreRowType, + RowType writeSchema, + DataFilePathFactory pathFactory, + Supplier seqNumCounterSupplier, + String fileCompression, + SimpleColStatsCollector.Factory[] statsCollectors, + FileSource fileSource, + boolean asyncFileWrite, + boolean statsDenseStore, + long targetFileSize) { + + List vectorStoreFieldNames = vectorStoreRowType.getFieldNames(); + + int[] vectorStoreProjection = writeSchema.projectIndexes(vectorStoreFieldNames); + DataFilePathFactory vectorStorePathFactory = + pathFactory.vectorStorePathFactory(vectorStoreFileFormat.getFormatIdentifier()); + return new ProjectedFileWriter<>( + new RollingFileWriterImpl<>( + () -> + new RowDataFileWriter( + fileIO, + RollingFileWriter.createFileWriterContext( + vectorStoreFileFormat, + vectorStoreRowType, + statsCollectors, + fileCompression), + vectorStorePathFactory.newPath(), + vectorStoreRowType, + schemaId, + seqNumCounterSupplier, + new FileIndexOptions(), + fileSource, + asyncFileWrite, + statsDenseStore, + pathFactory.isExternalPath(), + vectorStoreFieldNames), + targetFileSize), + vectorStoreProjection); + } + /** * Writes a single row to both normal and blob writers. Automatically handles file rolling when * target size is reached. @@ -205,11 +305,19 @@ public void write(InternalRow row) throws IOException { if (currentWriter == null) { currentWriter = writerFactory.get(); } - if (blobWriter == null) { + if ((blobWriter == null) && (blobWriterFactory != null)) { blobWriter = blobWriterFactory.get(); } + if ((vectorStoreWriter == null) && (vectorStoreWriterFactory != null)) { + vectorStoreWriter = vectorStoreWriterFactory.get(); + } currentWriter.write(row); - blobWriter.write(row); + if (blobWriter != null) { + blobWriter.write(row); + } + if (vectorStoreWriter != null) { + vectorStoreWriter.write(row); + } recordCount++; if (rollingFile()) { @@ -269,6 +377,10 @@ public void abort() { blobWriter.abort(); blobWriter = null; } + if (vectorStoreWriter != null) { + vectorStoreWriter.abort(); + vectorStoreWriter = null; + } } /** Checks if the current file should be rolled based on size and record count. */ @@ -295,12 +407,16 @@ private void closeCurrentWriter() throws IOException { // Close blob writer and process blob metadata List blobMetas = closeBlobWriter(); + // Close vector-store writer and process vector-store metadata + List vectorStoreMetas = closeVectorStoreWriter(); + // Validate consistency between main and blob files - validateFileConsistency(mainDataFileMeta, blobMetas); + validateFileConsistency(mainDataFileMeta, blobMetas, vectorStoreMetas); // Add results to the results list results.add(mainDataFileMeta); results.addAll(blobMetas); + results.addAll(vectorStoreMetas); // Reset current writer currentWriter = null; @@ -324,9 +440,22 @@ private List closeBlobWriter() throws IOException { return results; } + /** Closes the vector-store writer and processes blob metadata with appropriate tags. */ + private List closeVectorStoreWriter() throws IOException { + if (vectorStoreWriter == null) { + return Collections.emptyList(); + } + vectorStoreWriter.close(); + List results = vectorStoreWriter.result(); + vectorStoreWriter = null; + return results; + } + /** Validates that the row counts match between main and blob files. 
*/ private void validateFileConsistency( - DataFileMeta mainDataFileMeta, List blobTaggedMetas) { + DataFileMeta mainDataFileMeta, + List blobTaggedMetas, + List vectorStoreMetas) { long mainRowCount = mainDataFileMeta.rowCount(); Map blobRowCounts = new HashMap<>(); @@ -334,6 +463,9 @@ private void validateFileConsistency( long count = file.rowCount(); blobRowCounts.compute(file.writeCols().get(0), (k, v) -> v == null ? count : v + count); } + long vectorStoreRowCount = + vectorStoreMetas.stream().mapToLong(DataFileMeta::rowCount).sum(); + for (String blobFieldName : blobRowCounts.keySet()) { long blobRowCount = blobRowCounts.get(blobFieldName); if (mainRowCount != blobRowCount) { @@ -344,6 +476,13 @@ private void validateFileConsistency( mainDataFileMeta, mainRowCount, blobFieldName, blobRowCount)); } } + if (!vectorStoreMetas.isEmpty() && (mainRowCount != vectorStoreRowCount)) { + throw new IllegalStateException( + String.format( + "This is a bug: The row count of main file and vector-store files does not match. " + + "Main file: %s (row count: %d), vector-store files: %s (total row count: %d)", + mainDataFileMeta, mainRowCount, vectorStoreMetas, vectorStoreRowCount)); + } } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java index c69932ed157f..0b107b338f9a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java @@ -47,6 +47,7 @@ import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile; /** Compact coordinator to compact data evolution table. 
*/ public class DataEvolutionCompactCoordinator { @@ -174,14 +175,19 @@ List compactPlan(List input) { for (List group : ranges) { List dataFiles = new ArrayList<>(); List blobFiles = new ArrayList<>(); + List vectorStoreFiles = new ArrayList<>(); TreeMap treeMap = new TreeMap<>(); Map> dataFileToBlobFiles = new HashMap<>(); + Map> dataFileToVectorStoreFiles = + new HashMap<>(); for (DataFileMeta f : group) { - if (!isBlobFile(f.fileName())) { + if (isBlobFile(f.fileName())) { + blobFiles.add(f); + } else if (isVectorStoreFile(f.fileName())) { + vectorStoreFiles.add(f); + } else { treeMap.put(f.nonNullFirstRowId(), f); dataFiles.add(f); - } else { - blobFiles.add(f); } } @@ -203,6 +209,25 @@ List compactPlan(List input) { } } } + if (false) { + // associate vector-store files to data files + for (DataFileMeta vectorStoreFile : vectorStoreFiles) { + Long key = treeMap.floorKey(vectorStoreFile.nonNullFirstRowId()); + if (key != null) { + DataFileMeta dataFile = treeMap.get(key); + if (vectorStoreFile.nonNullFirstRowId() + >= dataFile.nonNullFirstRowId() + && vectorStoreFile.nonNullFirstRowId() + <= dataFile.nonNullFirstRowId() + + dataFile.rowCount() + - 1) { + dataFileToVectorStoreFiles + .computeIfAbsent(dataFile, k -> new ArrayList<>()) + .add(vectorStoreFile); + } + } + } + } RangeHelper rangeHelper2 = new RangeHelper<>(DataFileMeta::nonNullRowIdRange); @@ -222,10 +247,19 @@ List compactPlan(List input) { .sum(); if (currentGroupWeight > targetFileSize) { // compact current file group to merge field files - tasks.addAll(triggerTask(fileGroup, partition, dataFileToBlobFiles)); + tasks.addAll( + triggerTask( + fileGroup, + partition, + dataFileToBlobFiles, + dataFileToVectorStoreFiles)); // compact wait compact files tasks.addAll( - triggerTask(waitCompactFiles, partition, dataFileToBlobFiles)); + triggerTask( + waitCompactFiles, + partition, + dataFileToBlobFiles, + dataFileToVectorStoreFiles)); waitCompactFiles = new ArrayList<>(); weightSum = 0; } else { @@ -234,13 +268,21 @@ List compactPlan(List input) { if (weightSum > targetFileSize) { tasks.addAll( triggerTask( - waitCompactFiles, partition, dataFileToBlobFiles)); + waitCompactFiles, + partition, + dataFileToBlobFiles, + dataFileToVectorStoreFiles)); waitCompactFiles = new ArrayList<>(); weightSum = 0L; } } } - tasks.addAll(triggerTask(waitCompactFiles, partition, dataFileToBlobFiles)); + tasks.addAll( + triggerTask( + waitCompactFiles, + partition, + dataFileToBlobFiles, + dataFileToVectorStoreFiles)); } } return tasks; @@ -249,7 +291,8 @@ List compactPlan(List input) { private List triggerTask( List dataFiles, BinaryRow partition, - Map> dataFileToBlobFiles) { + Map> dataFileToBlobFiles, + Map> dataFileToVectorStoreFiles) { List tasks = new ArrayList<>(); if (dataFiles.size() >= compactMinFileNum) { tasks.add(new DataEvolutionCompactTask(partition, dataFiles, false)); @@ -265,6 +308,18 @@ private List triggerTask( tasks.add(new DataEvolutionCompactTask(partition, blobFiles, true)); } } + + if (false) { + List vectorStoreFiles = new ArrayList<>(); + for (DataFileMeta dataFile : dataFiles) { + vectorStoreFiles.addAll( + dataFileToVectorStoreFiles.getOrDefault( + dataFile, Collections.emptyList())); + } + if (vectorStoreFiles.size() >= compactMinFileNum) { + tasks.add(new DataEvolutionCompactTask(partition, vectorStoreFiles, false)); + } + } return tasks; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java index abbc03f17baf..da8d43c05f7e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java @@ -23,6 +23,7 @@ import org.apache.paimon.append.AppendCompactTask; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormat; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -36,6 +37,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.RecordWriter; +import org.apache.paimon.utils.VectorStoreUtils; import java.util.Collections; import java.util.List; @@ -68,6 +70,16 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E // TODO: support blob file compaction throw new UnsupportedOperationException("Blob task is not supported"); } + if (VectorStoreUtils.isVectorStoreFile(compactBefore.get(0).fileName())) { + // TODO: support vector-store file compaction + throw new UnsupportedOperationException("Vector-store task is not supported"); + } + List separatedVectorStoreFields = + VectorStoreUtils.isDifferentFormat( + FileFormat.vectorStoreFileFormat(table.coreOptions()), + FileFormat.fileFormat(table.coreOptions())) + ? table.coreOptions().vectorStoreFieldNames() + : Collections.emptyList(); table = table.copy(DYNAMIC_WRITE_OPTIONS); long firstRowId = compactBefore.get(0).nonNullFirstRowId(); @@ -76,6 +88,7 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E new RowType( table.rowType().getFields().stream() .filter(f -> f.type().getTypeRoot() != DataTypeRoot.BLOB) + .filter(f -> !separatedVectorStoreFields.contains(f.name())) .collect(Collectors.toList())); FileStorePathFactory pathFactory = table.store().pathFactory(); AppendOnlyFileStore store = (AppendOnlyFileStore) table.store(); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index b63a1c0b7a79..6318d70b7026 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -59,14 +59,36 @@ public DataFilePathFactory( boolean fileSuffixIncludeCompression, String fileCompression, @Nullable ExternalPathProvider externalPathProvider) { + this( + parent, + UUID.randomUUID().toString(), + new AtomicInteger(0), + formatIdentifier, + dataFilePrefix, + changelogFilePrefix, + fileSuffixIncludeCompression, + compressFileExtension(fileCompression), + externalPathProvider); + } + + private DataFilePathFactory( + Path parent, + String uuid, + AtomicInteger pathCount, + String formatIdentifier, + String dataFilePrefix, + String changelogFilePrefix, + boolean fileSuffixIncludeCompression, + @Nullable String compressExtension, + @Nullable ExternalPathProvider externalPathProvider) { this.parent = parent; - this.uuid = UUID.randomUUID().toString(); - this.pathCount = new AtomicInteger(0); + this.uuid = uuid; + this.pathCount = pathCount; this.formatIdentifier = formatIdentifier; this.dataFilePrefix = dataFilePrefix; this.changelogFilePrefix = changelogFilePrefix; this.fileSuffixIncludeCompression = fileSuffixIncludeCompression; - 
this.compressExtension = compressFileExtension(fileCompression); + this.compressExtension = compressExtension; this.externalPathProvider = externalPathProvider; } @@ -99,6 +121,10 @@ public Path newPath(String prefix) { } private String newFileName(String prefix) { + return newFileName(prefix, makeExtension(compressExtension, formatIdentifier)); + } + + protected String makeExtension(String compressExtension, String formatIdentifier) { String extension; if (compressExtension != null && isTextFormat(formatIdentifier)) { extension = "." + formatIdentifier + "." + compressExtension; @@ -107,7 +133,7 @@ private String newFileName(String prefix) { } else { extension = "." + formatIdentifier; } - return newFileName(prefix, extension); + return extension; } public Path newPathFromExtension(String extension) { @@ -121,7 +147,7 @@ public Path newPathFromName(String fileName) { return new Path(parent, fileName); } - private String newFileName(String prefix, String extension) { + protected String newFileName(String prefix, String extension) { return prefix + uuid + "-" + pathCount.getAndIncrement() + extension; } @@ -211,4 +237,28 @@ private static String compressFileExtension(String compression) { } return compression; } + + public DataFilePathFactory vectorStorePathFactory(String formatIdentifier) { + return new VectorStoreWrapper(this, formatIdentifier); + } + + private static class VectorStoreWrapper extends DataFilePathFactory { + private VectorStoreWrapper(DataFilePathFactory base, String formatIdentifier) { + super( + base.parent, + base.uuid, + base.pathCount, + formatIdentifier, + base.dataFilePrefix, + base.changelogFilePrefix, + base.fileSuffixIncludeCompression, + base.compressExtension, + base.externalPathProvider); + } + + @Override + protected String makeExtension(String compressExtension, String formatIdentifier) { + return ".vector-store" + super.makeExtension(compressExtension, formatIdentifier); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index bc39a4ae8e76..aabaa11a3e50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -132,8 +132,11 @@ protected RecordWriter createWriter( ioManager, schemaId, fileFormat, + FileFormat.vectorStoreFileFormat(options), + options.vectorStoreFieldNames(), options.targetFileSize(false), options.blobTargetFileSize(), + options.vectorStoreTargetFileSize(), writeType, writeCols, restoredMaxSeqNumber, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java index eb6a8360f2c1..b16290be535a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java @@ -53,6 +53,7 @@ import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.utils.Preconditions.checkNotNull; +import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile; /** {@link FileStoreScan} for data-evolution enabled table. 
*/ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan { @@ -156,10 +157,11 @@ static EvolutionStats evolutionStats( TableSchema schema, Function scanTableSchema, List metas) { - // exclude blob files, useless for predicate eval + // exclude blob and vector-store files, useless for predicate eval metas = metas.stream() .filter(entry -> !isBlobFile(entry.file().fileName())) + .filter(entry -> !isVectorStoreFile(entry.file().fileName())) .collect(Collectors.toList()); ToLongFunction maxSeqFunc = e -> e.file().maxSequenceNumber(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index 153fc4e6ce66..3a36340d6191 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -62,6 +62,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; import java.util.function.Function; import java.util.function.ToLongFunction; import java.util.stream.Collectors; @@ -72,6 +74,8 @@ import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; +import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile; /** * A union {@link SplitRead} to read multiple inner files to merge columns, note that this class @@ -216,13 +220,10 @@ private DataEvolutionFileReader createUnionReader( needMergeFiles, file -> { checkArgument( - isBlobFile(file.fileName()), - "Only blob file need to call this method."); - return schemaFetcher - .apply(file.schemaId()) - .logicalRowType() - .getField(file.writeCols().get(0)) - .id(); + isBlobFile(file.fileName()) + || isVectorStoreFile(file.fileName()), + "Only blob/vector-store files need to call this method."); + return schemaFetcher.apply(file.schemaId()).logicalRowType(); }); long rowCount = fieldsFiles.get(0).rowCount(); @@ -409,25 +410,39 @@ private FileRecordReader createFileReader( @VisibleForTesting public static List splitFieldBunches( - List needMergeFiles, Function blobFileToFieldId) { - return splitFieldBunches(needMergeFiles, blobFileToFieldId, false); + List needMergeFiles, Function fileToRowType) { + return splitFieldBunches(needMergeFiles, fileToRowType, false); } @VisibleForTesting public static List splitFieldBunches( List needMergeFiles, - Function blobFileToFieldId, + Function fileToRowType, boolean rowIdPushDown) { List fieldsFiles = new ArrayList<>(); - Map blobBunchMap = new HashMap<>(); + Map blobBunchMap = new HashMap<>(); + Map vectorStoreBunchMap = new TreeMap<>(); long rowCount = -1; for (DataFileMeta file : needMergeFiles) { if (isBlobFile(file.fileName())) { - int fieldId = blobFileToFieldId.apply(file); + RowType rowType = fileToRowType.apply(file); + int fieldId = rowType.getField(file.writeCols().get(0)).id(); final long expectedRowCount = rowCount; blobBunchMap .computeIfAbsent( - fieldId, key -> new BlobBunch(expectedRowCount, rowIdPushDown)) + fieldId, key -> new SplitBunch(expectedRowCount, rowIdPushDown)) + .add(file); + } else if (isVectorStoreFile(file.fileName())) { + RowType rowType = fileToRowType.apply(file); + String fileFormat = 
DataFilePathFactory.formatIdentifier(file.fileName()); + VectorStoreBunchKey vectorStoreKey = + new VectorStoreBunchKey( + file.schemaId(), fileFormat, file.writeCols(), rowType); + final long expectedRowCount = rowCount; + vectorStoreBunchMap + .computeIfAbsent( + vectorStoreKey, + key -> new SplitBunch(expectedRowCount, rowIdPushDown)) .add(file); } else { // Normal file, just add it to the current merge split @@ -436,6 +451,7 @@ public static List splitFieldBunches( } } fieldsFiles.addAll(blobBunchMap.values()); + fieldsFiles.addAll(vectorStoreBunchMap.values()); return fieldsFiles; } @@ -467,7 +483,7 @@ public List files() { } @VisibleForTesting - static class BlobBunch implements FieldBunch { + static class SplitBunch implements FieldBunch { final List files; final long expectedRowCount; @@ -478,7 +494,7 @@ static class BlobBunch implements FieldBunch { long latestMaxSequenceNumber = -1; long rowCount; - BlobBunch(long expectedRowCount, boolean rowIdPushDown) { + SplitBunch(long expectedRowCount, boolean rowIdPushDown) { this.files = new ArrayList<>(); this.rowCount = 0; this.expectedRowCount = expectedRowCount; @@ -486,14 +502,15 @@ static class BlobBunch implements FieldBunch { } void add(DataFileMeta file) { - if (!isBlobFile(file.fileName())) { - throw new IllegalArgumentException("Only blob file can be added to a blob bunch."); + if (!isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName())) { + throw new IllegalArgumentException( + "Only blob/vector-store file can be added to this bunch."); } if (file.nonNullFirstRowId() == latestFistRowId) { if (file.maxSequenceNumber() >= latestMaxSequenceNumber) { throw new IllegalArgumentException( - "Blob file with same first row id should have decreasing sequence number."); + "Blob/vector-store file with same first row id should have decreasing sequence number."); } return; } @@ -512,11 +529,11 @@ void add(DataFileMeta file) { if (firstRowId < expectedNextFirstRowId) { checkArgument( file.maxSequenceNumber() < latestMaxSequenceNumber, - "Blob file with overlapping row id should have decreasing sequence number."); + "Blob/vector-store file with overlapping row id should have decreasing sequence number."); return; } else if (firstRowId > expectedNextFirstRowId) { throw new IllegalArgumentException( - "Blob file first row id should be continuous, expect " + "Blob/vector-store file first row id should be continuous, expect " + expectedNextFirstRowId + " but got " + firstRowId); @@ -525,17 +542,17 @@ void add(DataFileMeta file) { if (!files.isEmpty()) { checkArgument( file.schemaId() == files.get(0).schemaId(), - "All files in a blob bunch should have the same schema id."); + "All files in this bunch should have the same schema id."); checkArgument( file.writeCols().equals(files.get(0).writeCols()), - "All files in a blob bunch should have the same write columns."); + "All files in this bunch should have the same write columns."); } } files.add(file); rowCount += file.rowCount(); checkArgument( rowCount <= expectedRowCount, - "Blob files row count exceed the expect " + expectedRowCount); + "Blob/vector-store files row count exceed the expect " + expectedRowCount); this.latestMaxSequenceNumber = file.maxSequenceNumber(); this.latestFistRowId = file.nonNullFirstRowId(); this.expectedNextFirstRowId = latestFistRowId + file.rowCount(); @@ -558,14 +575,17 @@ public static List> mergeRangesAndSort(List fil RangeHelper rangeHelper = new RangeHelper<>(DataFileMeta::nonNullRowIdRange); List> result = rangeHelper.mergeOverlappingRanges(files); 
- // in group, sort by blob file and max_seq + // in group, sort by blob/vector-store file and max_seq for (List group : result) { - // split to data files and blob files + // split to data files, blob files, vector-store files List dataFiles = new ArrayList<>(); List blobFiles = new ArrayList<>(); + List vectorStoreFiles = new ArrayList<>(); for (DataFileMeta f : group) { if (isBlobFile(f.fileName())) { blobFiles.add(f); + } else if (isVectorStoreFile(f.fileName())) { + vectorStoreFiles.add(f); } else { dataFiles.add(f); } @@ -583,12 +603,111 @@ public static List> mergeRangesAndSort(List fil comparingLong(DataFileMeta::nonNullFirstRowId) .thenComparing(reverseOrder(comparingLong(maxSeqF)))); - // concat data files and blob files + // vector-store files sort by first row id then by reversed max sequence number + vectorStoreFiles.sort( + comparingLong(DataFileMeta::nonNullFirstRowId) + .thenComparing(reverseOrder(comparingLong(maxSeqF)))); + + // concat data files, blob files, vector-store files group.clear(); group.addAll(dataFiles); group.addAll(blobFiles); + group.addAll(vectorStoreFiles); } return result; } + + static final class VectorStoreBunchKey implements Comparable { + public final long schemaId; + public final String formatIdentifier; + public final List writeCols; + + public VectorStoreBunchKey( + long schemaId, + String formatIdentifier, + List writeCols, + RowType preferredColOrder) { + this.schemaId = schemaId; + this.formatIdentifier = checkNotNull(formatIdentifier, "formatIdentifier"); + this.writeCols = normalizeWriteCols(writeCols, preferredColOrder); + } + + @Override + public int compareTo(VectorStoreBunchKey o) { + int c = Long.compare(this.schemaId, o.schemaId); + if (c != 0) { + return c; + } + + c = this.formatIdentifier.compareTo(o.formatIdentifier); + if (c != 0) { + return c; + } + + int n = Math.min(this.writeCols.size(), o.writeCols.size()); + for (int i = 0; i < n; i++) { + c = this.writeCols.get(i).compareTo(o.writeCols.get(i)); + if (c != 0) { + return c; + } + } + return Integer.compare(this.writeCols.size(), o.writeCols.size()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof VectorStoreBunchKey)) { + return false; + } + VectorStoreBunchKey that = (VectorStoreBunchKey) o; + return schemaId == that.schemaId + && formatIdentifier.equals(that.formatIdentifier) + && writeCols.equals(that.writeCols); + } + + @Override + public int hashCode() { + return Objects.hash(schemaId, formatIdentifier, writeCols); + } + + @Override + public String toString() { + return "VectorStoreBunchKey{schemaId=" + + schemaId + + ", format=" + + formatIdentifier + + ", writeCols=" + + writeCols + + "}"; + } + + private static List normalizeWriteCols(List writeCols, RowType rowType) { + if (writeCols == null || writeCols.isEmpty()) { + return Collections.emptyList(); + } + + Map colPosMap = new HashMap<>(); + List namesInRowType = rowType.getFieldNames(); + for (int i = 0; i < namesInRowType.size(); i++) { + colPosMap.putIfAbsent(namesInRowType.get(i), i); + } + + ArrayList sorted = new ArrayList<>(writeCols); + sorted.sort( + (a, b) -> { + int ia = colPosMap.getOrDefault(a, Integer.MAX_VALUE); + int ib = colPosMap.getOrDefault(b, Integer.MAX_VALUE); + if (ia != ib) { + return Integer.compare(ia, ib); + } + return a.compareTo(b); + }); + + return sorted; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java index d2f3dc8851ef..fe66f00fd1e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java @@ -30,6 +30,7 @@ import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile; /** Utils for row tracking commit. */ public class RowTrackingCommitUtils { @@ -68,6 +69,7 @@ private static long assignRowTrackingMeta( long start = firstRowIdStart; long blobStartDefault = firstRowIdStart; Map blobStarts = new HashMap<>(); + long vectorStoreStart = firstRowIdStart; for (ManifestEntry entry : deltaFiles) { Optional fileSource = entry.file().fileSource(); checkArgument( @@ -91,6 +93,15 @@ private static long assignRowTrackingMeta( } rowIdAssigned.add(entry.assignFirstRowId(blobStart)); blobStarts.put(blobFieldName, blobStart + rowCount); + } else if (isVectorStoreFile(entry.file().fileName())) { + if (vectorStoreStart >= start) { + throw new IllegalStateException( + String.format( + "This is a bug, vectorStoreStart %d should be less than start %d when assigning a vector-store entry file.", + vectorStoreStart, start)); + } + rowIdAssigned.add(entry.assignFirstRowId(vectorStoreStart)); + vectorStoreStart += rowCount; } else { rowIdAssigned.add(entry.assignFirstRowId(start)); blobStartDefault = start; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index f4cb16a018ce..6f05068d8302 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -43,10 +43,12 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; +import org.apache.paimon.utils.VectorStoreUtils; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -156,7 +158,17 @@ public static void validateTableSchema(TableSchema schema) { FileFormat fileFormat = FileFormat.fromIdentifier(options.formatType(), new Options(schema.options())); - fileFormat.validateDataFields(BlobType.splitBlob(new RowType(schema.fields())).getLeft()); + if (VectorStoreUtils.isDifferentFormat( + FileFormat.vectorStoreFileFormat(options), fileFormat)) { + fileFormat.validateDataFields( + VectorStoreUtils.splitVectorStore( + BlobType.splitBlob(new RowType(schema.fields())).getLeft(), + options.vectorStoreFieldNames()) + .getLeft()); + } else { + fileFormat.validateDataFields( + BlobType.splitBlob(new RowType(schema.fields())).getLeft()); + } // Check column names in schema schema.fieldNames() @@ -617,6 +629,36 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) !schema.partitionKeys().contains(blobNames.get(0)), "The BLOB type column can not be part of partition keys."); } + + FileFormat vectorStoreFileFormat = FileFormat.vectorStoreFileFormat(options); + if (VectorStoreUtils.isDifferentFormat( + vectorStoreFileFormat, FileFormat.fileFormat(options))) { + List vectorStoreNames = options.vectorStoreFieldNames(); + List nonBlobNames = + 
BlobType.splitBlob(schema.logicalRowType()).getLeft().getFieldNames(); + checkArgument( + blobNames.stream().noneMatch(vectorStoreNames::contains), + "The vector-store columns can not be blob type."); + checkArgument( + new HashSet<>(nonBlobNames).containsAll(vectorStoreNames), + "Some of the columns specified as vector-store are unknown."); + checkArgument( + schema.partitionKeys().stream().noneMatch(vectorStoreNames::contains), + "The vector-store columns can not be part of partition keys."); + checkArgument( + nonBlobNames.size() > vectorStoreNames.size(), + "Table with vector-store must have other normal columns."); + checkArgument( + options.dataEvolutionEnabled(), + "Data evolution config must enabled for table with vector-store file format."); + + RowType vectorStoreRowType = + VectorStoreUtils.splitVectorStore( + BlobType.splitBlob(schema.logicalRowType()).getLeft(), + vectorStoreNames) + .getRight(); + vectorStoreFileFormat.validateDataFields(vectorStoreRowType); + } } private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/VectorStoreUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/VectorStoreUtils.java new file mode 100644 index 000000000000..7e8c474a776e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/VectorStoreUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.List; + +/** Utils for vector-store table. 
*/ +public class VectorStoreUtils { + public static boolean isDifferentFormat(FileFormat vectorStoreFormat, FileFormat normalFormat) { + return (vectorStoreFormat != null) + && !vectorStoreFormat + .getFormatIdentifier() + .equals(normalFormat.getFormatIdentifier()); + } + + public static boolean isVectorStoreFile(String fileName) { + return fileName.contains(".vector-store."); + } + + public static Pair splitVectorStore( + RowType rowType, List vectorStoreFieldNames) { + List allFields = rowType.getFields(); + List normalFields = new ArrayList<>(); + List vectorStoreFields = new ArrayList<>(); + + for (DataField field : allFields) { + if (vectorStoreFieldNames.contains(field.name())) { + vectorStoreFields.add(field); + } else { + normalFields.add(field); + } + } + + return Pair.of(new RowType(normalFields), new RowType(vectorStoreFields)); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index b3470cfb4522..4af226487695 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -73,6 +73,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -575,6 +576,34 @@ public void testNonSpillable() throws Exception { writer.close(); } + @Test + public void testVectorStoreSameFormatUsesRowDataWriter() throws Exception { + FileFormat format = FileFormat.fromIdentifier(AVRO, new Options()); + AppendOnlyWriter writer = + createVectorStoreWriter(1024 * 1024L, format, Collections.singletonList("name")); + writer.write(row(1, "AAA", PART)); + CommitIncrement increment = writer.prepareCommit(true); + writer.close(); + + assertThat(increment.newFilesIncrement().newFiles()).hasSize(1); + DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0); + assertThat(meta.fileName()).doesNotContain(".vector-store"); + } + + @Test + public void testAllVectorStoreColumnsUseVectorStorePath() throws Exception { + FileFormat format = FileFormat.fromIdentifier(AVRO, new Options()); + AppendOnlyWriter writer = + createVectorStoreWriter(1024 * 1024L, format, Arrays.asList("id", "name", "dt")); + writer.write(row(1, "AAA", PART)); + CommitIncrement increment = writer.prepareCommit(true); + writer.close(); + + assertThat(increment.newFilesIncrement().newFiles()).hasSize(1); + DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0); + assertThat(meta.fileName()).contains(".vector-store"); + } + private SimpleColStats initStats(Integer min, Integer max, long nullCount) { return new SimpleColStats(min, max, nullCount); } @@ -665,6 +694,58 @@ private Pair> createWriter( boolean hasIoManager, List scannedFiles, CountDownLatch latch) { + Map options = new HashMap<>(); + options.put("metadata.stats-mode", "truncate(16)"); + return createWriterBase( + targetFileSize, + null, + Collections.emptyList(), + forceCompact, + useWriteBuffer, + spillable, + hasIoManager, + scannedFiles, + compactBefore -> { + latch.await(); + return compactBefore.isEmpty() + ? 
Collections.emptyList() + : Collections.singletonList(generateCompactAfter(compactBefore)); + }, + options); + } + + private AppendOnlyWriter createVectorStoreWriter( + long targetFileSize, + FileFormat vectorStoreFileFormat, + List vectorStoreFieldNames) { + Map options = new HashMap<>(); + options.put("metadata.stats-mode", "truncate(16)"); + options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + return createWriterBase( + targetFileSize, + vectorStoreFileFormat, + vectorStoreFieldNames, + false, + true, + false, + true, + Collections.emptyList(), + compactBefore -> Collections.emptyList(), + options) + .getKey(); + } + + private Pair> createWriterBase( + long targetFileSize, + FileFormat vectorStoreFileFormat, + List vectorStoreFieldNames, + boolean forceCompact, + boolean useWriteBuffer, + boolean spillable, + boolean hasIoManager, + List scannedFiles, + BucketedAppendCompactManager.CompactRewriter rewriter, + Map optionsMap) { FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options()); LinkedList toCompact = new LinkedList<>(scannedFiles); BucketedAppendCompactManager compactManager = @@ -677,22 +758,18 @@ private Pair> createWriter( targetFileSize, targetFileSize / 10 * 7, false, - compactBefore -> { - latch.await(); - return compactBefore.isEmpty() - ? Collections.emptyList() - : Collections.singletonList( - generateCompactAfter(compactBefore)); - }, + rewriter, null); - CoreOptions options = - new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)")); + CoreOptions options = new CoreOptions(optionsMap); AppendOnlyWriter writer = new AppendOnlyWriter( LocalFileIO.create(), hasIoManager ? IOManager.create(tempDir.toString()) : null, SCHEMA_ID, fileFormat, + vectorStoreFileFormat, + vectorStoreFieldNames, + targetFileSize, targetFileSize, targetFileSize, AppendOnlyWriterTest.SCHEMA, diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index 3045f33b1664..6b7d94679863 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -38,7 +38,9 @@ import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Range; import org.apache.paimon.utils.UriReader; @@ -52,6 +54,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -105,7 +108,8 @@ public void testBasic() throws Exception { .collect(Collectors.toList()); List fieldGroups = - DataEvolutionSplitRead.splitFieldBunches(filesMetas, key -> 0); + DataEvolutionSplitRead.splitFieldBunches( + filesMetas, key -> makeBlobRowType(key.writeCols(), f -> 0)); assertThat(fieldGroups.size()).isEqualTo(2); assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); @@ -155,7 +159,8 @@ public void testMultiBatch() throws Exception { assertThat(batches.size()).isEqualTo(2); for (List batch : batches) { List fieldGroups = - DataEvolutionSplitRead.splitFieldBunches(batch, file -> 0); + DataEvolutionSplitRead.splitFieldBunches( + batch, file -> makeBlobRowType(file.writeCols(), f 
-> 0)); assertThat(fieldGroups.size()).isEqualTo(2); assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); assertThat(fieldGroups.get(1).files().size()).isEqualTo(10); @@ -249,6 +254,20 @@ protected InternalRow dataDefault(int time, int size) { RANDOM.nextInt(), BinaryString.fromBytes(randomBytes()), new BlobData(blobBytes)); } + private static RowType makeBlobRowType( + List fieldNames, Function fieldIdFunc) { + List fields = new ArrayList<>(); + if (fieldNames == null) { + fieldNames = Collections.emptyList(); + } + for (String fieldName : fieldNames) { + int fieldId = fieldIdFunc.apply(fieldName); + DataField blobField = new DataField(fieldId, fieldName, DataTypes.BLOB()); + fields.add(blobField); + } + return new RowType(fields); + } + @Override protected byte[] randomBytes() { byte[] binary = new byte[2 * 1024 * 124]; diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterTest.java similarity index 95% rename from paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java rename to paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterTest.java index 0afe95eef066..113bc3f86ed9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterTest.java @@ -43,13 +43,14 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Random; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link RollingBlobFileWriter}. */ -public class RollingBlobFileWriterTest { +/** Tests for {@link DataEvolutionRollingFileWriter}. 
*/ +public class DataEvolutionRollingFileWriterTest { private static final RowType SCHEMA = RowType.builder() @@ -64,7 +65,7 @@ public class RollingBlobFileWriterTest { @TempDir java.nio.file.Path tempDir; - private RollingBlobFileWriter writer; + private DataEvolutionRollingFileWriter writer; private DataFilePathFactory pathFactory; private LongCounter seqNumCounter; private byte[] testBlobData; @@ -91,10 +92,13 @@ public void setUp() throws IOException { // Initialize the writer writer = - new RollingBlobFileWriter( + new DataEvolutionRollingFileWriter( fileIO, SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, + Collections.emptyList(), + TARGET_FILE_SIZE, TARGET_FILE_SIZE, TARGET_FILE_SIZE, SCHEMA, @@ -182,13 +186,16 @@ public void testBlobTargetFileSize() throws IOException { long blobTargetFileSize = 500 * 1024 * 1024L; // 2 MB for blob files // Create a new writer with different blob target file size - RollingBlobFileWriter blobSizeTestWriter = - new RollingBlobFileWriter( + DataEvolutionRollingFileWriter blobSizeTestWriter = + new DataEvolutionRollingFileWriter( LocalFileIO.create(), SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, + Collections.emptyList(), 128 * 1024 * 1024, blobTargetFileSize, // Different blob target size + 128 * 1024 * 1024, SCHEMA, new DataFilePathFactory( new Path(tempDir + "/blob-size-test"), @@ -267,13 +274,16 @@ public void testSchemaValidation() throws IOException { void testBlobFileNameFormatWithSharedUuid() throws IOException { long blobTargetFileSize = 2 * 1024 * 1024L; // 2 MB for blob files - RollingBlobFileWriter fileNameTestWriter = - new RollingBlobFileWriter( + DataEvolutionRollingFileWriter fileNameTestWriter = + new DataEvolutionRollingFileWriter( LocalFileIO.create(), SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, + Collections.emptyList(), 128 * 1024 * 1024, blobTargetFileSize, + 128 * 1024 * 1024, SCHEMA, pathFactory, // Use the same pathFactory to ensure shared UUID () -> new LongCounter(), @@ -346,13 +356,16 @@ void testBlobFileNameFormatWithSharedUuid() throws IOException { void testBlobFileNameFormatWithSharedUuidNonDescriptorMode() throws IOException { long blobTargetFileSize = 2 * 1024 * 1024L; // 2 MB for blob files - RollingBlobFileWriter fileNameTestWriter = - new RollingBlobFileWriter( + DataEvolutionRollingFileWriter fileNameTestWriter = + new DataEvolutionRollingFileWriter( LocalFileIO.create(), SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, + Collections.emptyList(), 128 * 1024 * 1024, blobTargetFileSize, + 128 * 1024 * 1024, SCHEMA, pathFactory, // Use the same pathFactory to ensure shared UUID () -> new LongCounter(), @@ -565,10 +578,13 @@ void testBlobStatsSchemaWithCustomColumnName() throws IOException { // Reinitialize writer with custom schema writer = - new RollingBlobFileWriter( + new DataEvolutionRollingFileWriter( LocalFileIO.create(), SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, + Collections.emptyList(), + TARGET_FILE_SIZE, TARGET_FILE_SIZE, TARGET_FILE_SIZE, customSchema, // Use custom schema diff --git a/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java new file mode 100644 index 000000000000..04c7119b08f1 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java @@ 
-0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; +import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.blob.BlobFileFormat; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.StatsCollectorFactories; +import org.apache.paimon.utils.VectorStoreUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DataEvolutionRollingFileWriter} with vector-store. 
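+ * Rows are split into a normal part (f0, f1), a blob part (f2), and a vector-store part (f3, f4); these tests cover rolling by the vector-store target size, shared-UUID file naming, and per-part statistics.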
*/ +public class DataEvolutionRollingFileWriterWithVectorStoreTest { + + private static final int VECTOR_DIM = 10; + private static final RowType SCHEMA = + RowType.builder() + .field("f0", DataTypes.INT()) + .field("f1", DataTypes.STRING()) + .field("f2", DataTypes.BLOB()) + .field("f3", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT())) + .field("f4", DataTypes.INT()) + .build(); + + private static final long TARGET_FILE_SIZE = 2 * 1024 * 1024L; // 2 MB + private static final long VECTOR_STORE_TARGET_FILE_SIZE = 4 * 1024 * 1024L; // 4 MB + private static final long SCHEMA_ID = 1L; + private static final String COMPRESSION = "none"; + private static final Random RANDOM = new Random(System.currentTimeMillis()); + + @TempDir java.nio.file.Path tempDir; + + private DataEvolutionRollingFileWriter writer; + private DataFilePathFactory pathFactory; + private LongCounter seqNumCounter; + + @BeforeEach + public void setUp() throws IOException { + // Setup file system and path factory + LocalFileIO fileIO = LocalFileIO.create(); + pathFactory = + new DataFilePathFactory( + new Path(tempDir + "/bucket-0"), + "parquet", + "data-", // dataFilePrefix should include the hyphen to match expected + // format: data-{uuid}-{count} + "changelog", + false, + null, + null); + seqNumCounter = new LongCounter(); + + // Initialize the writer + writer = + new DataEvolutionRollingFileWriter( + fileIO, + SCHEMA_ID, + FileFormat.fromIdentifier("parquet", new Options()), + FileFormat.fromIdentifier("json", new Options()), + Arrays.asList("f3", "f4"), + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + VECTOR_STORE_TARGET_FILE_SIZE, + SCHEMA, + pathFactory, + () -> seqNumCounter, + COMPRESSION, + new StatsCollectorFactories(new CoreOptions(new Options())), + new FileIndexOptions(), + FileSource.APPEND, + false, + false, + null); + } + + @Test + public void testBasicWriting() throws IOException { + // Write a single row + writer.write(makeRows(1, 10).get(0)); + assertThat(writer.recordCount()).isEqualTo(1); + } + + @Test + public void testMultipleWrites() throws Exception { + // Write multiple rows + int rowNum = RANDOM.nextInt(64) + 1; + writer.write(makeRows(rowNum, 10).iterator()); + writer.close(); + List metasResult = writer.result(); + + assertThat(metasResult.size()).isEqualTo(3); // blob is small, normal/blob/vector 3 files + assertThat(metasResult.get(0).fileFormat()).isEqualTo("parquet"); + assertThat(metasResult.get(1).fileFormat()).isEqualTo("blob"); + assertThat(metasResult.get(2).fileFormat()).isEqualTo("json"); + assertThat(writer.recordCount()).isEqualTo(rowNum); + + assertThat(metasResult.get(0).rowCount()).isEqualTo(metasResult.get(1).rowCount()); + assertThat(metasResult.get(0).rowCount()).isEqualTo(metasResult.get(2).rowCount()); + } + + @Test + public void testVectorStoreTargetFileSize() throws Exception { + // 100k vector-store data would create 1 normal, 1 blob, and 3 vector-store files + int rowNum = 100 * 1000; + writer.write(makeRows(rowNum, 1).iterator()); + writer.close(); + List results = writer.result(); + + // Verify that we have multiple files due to rolling + assertThat(results.size()).isGreaterThan(1); + + // Check that vector-store files meet the target size requirement + List vectorStoreFiles = + results.stream() + .filter(file -> VectorStoreUtils.isVectorStoreFile(file.fileName())) + .collect(java.util.stream.Collectors.toList()); + + assertThat(vectorStoreFiles.size()).isEqualTo(3); + + // Verify that vector-store files are close to the target size (within reasonable tolerance) + for 
(DataFileMeta file : vectorStoreFiles.subList(0, vectorStoreFiles.size() - 1)) { + long fileSize = file.fileSize(); + assertThat(fileSize) + .as("Vector-store file size should be close to target size") + .isGreaterThanOrEqualTo(VECTOR_STORE_TARGET_FILE_SIZE) + .isLessThanOrEqualTo(VECTOR_STORE_TARGET_FILE_SIZE + 256 * 1024); + } + + // Verify total record count + assertThat(writer.recordCount()).isEqualTo(rowNum); + } + + @Test + void testVectorStoreFileNameFormatWithSharedUuid() throws Exception { + // 100k vector-store data would create 1 normal, 1 blob, and 3 vector-store files + int rowNum = 100 * 1000; + writer.write(makeRows(rowNum, 1).iterator()); + writer.close(); + List results = writer.result(); + + // Get uuid from vector-store files. The pattern is data-{uuid}-{count}.vector-store.json + DataFileMeta oneVectorStoreFile = + results.stream() + .filter(file -> VectorStoreUtils.isVectorStoreFile(file.fileName())) + .findAny() + .get(); + String uuidAndCnt = oneVectorStoreFile.fileName().split(".vector-store.")[0]; + String prefix = uuidAndCnt.substring(0, uuidAndCnt.lastIndexOf('-') + 1); // data-{uuid}- + + // Verify all files use the same UUID and have sequential counters + for (int i = 0; i < results.size(); ++i) { + String fileName = results.get(i).fileName(); + assertThat(fileName).as("All files should use the same UUID").startsWith(prefix); + int counter = Integer.parseInt(fileName.substring(prefix.length()).split("\\.")[0]); + assertThat(counter).as("File counter should be sequential").isEqualTo(i); + } + } + + @Test + void testVectorStoreStatsMainPart() throws Exception { + // Write multiple rows + int rowNum = RANDOM.nextInt(64) + 1; + writer.write(makeRows(rowNum, 10).iterator()); + writer.close(); + List metasResult = writer.result(); + + // Check row count + for (DataFileMeta file : metasResult) { + assertThat(file.rowCount()).isEqualTo(rowNum); + assertThat(file.deleteRowCount().get()).isEqualTo(0); // There are no deleted rows + } + + // Check statistics + for (DataFileMeta file : metasResult) { + if (BlobFileFormat.isBlobFile(file.fileName())) { + assertThat(file.writeCols()).isEqualTo(Collections.singletonList("f2")); + } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) { + assertThat(file.writeCols()).isEqualTo(Arrays.asList("f3", "f4")); + // JSON does not implement createStatsExtractor, so we skip the stats check here. + // assertThat(file.valueStats().minValues().getInt(1)).isGreaterThan(0); + } else { + assertThat(file.writeCols()).isEqualTo(Arrays.asList("f0", "f1")); + assertThat(file.valueStats().minValues().getInt(0)).isEqualTo(0); + assertThat(file.valueStats().maxValues().getInt(0)).isEqualTo(rowNum - 1); + } + } + + // Verify total record count + assertThat(writer.recordCount()).isEqualTo(rowNum); + } + + @Test + void testVectorStoreStatsVectorStorePart() throws Exception { + // This time we set parquet as the vector-store file format.
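+ // Parquet supports stats extraction, so the vector-store part's min/max values can be checked below, while the JSON main part's stats are skipped.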
+ RowType schema = + RowType.builder() + .field("f0", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT())) + .field("f1", DataTypes.INT()) + .field("f2", DataTypes.BLOB()) + .field("f3", DataTypes.INT()) + .field("f4", DataTypes.STRING()) + .build(); + writer = + new DataEvolutionRollingFileWriter( + LocalFileIO.create(), + SCHEMA_ID, + FileFormat.fromIdentifier("json", new Options()), + FileFormat.fromIdentifier("parquet", new Options()), + Arrays.asList("f3", "f4"), + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + VECTOR_STORE_TARGET_FILE_SIZE, + schema, + pathFactory, + () -> seqNumCounter, + COMPRESSION, + new StatsCollectorFactories(new CoreOptions(new Options())), + new FileIndexOptions(), + FileSource.APPEND, + false, + false, + null); + + // Write multiple rows + int rowNum = RANDOM.nextInt(64) + 1; + List rows = makeRows(rowNum, 10); + for (InternalRow row : rows) { + writer.write( + GenericRow.of( + row.getVector(3), + row.getInt(4), + row.getBlob(2), + row.getInt(0), + row.getString(1))); + } + writer.close(); + List metasResult = writer.result(); + + // Check row count + for (DataFileMeta file : metasResult) { + assertThat(file.rowCount()).isEqualTo(rowNum); + assertThat(file.deleteRowCount().get()).isEqualTo(0); // There are no deleted rows + } + + // Check statistics + for (DataFileMeta file : metasResult) { + if (BlobFileFormat.isBlobFile(file.fileName())) { + assertThat(file.writeCols()).isEqualTo(Collections.singletonList("f2")); + } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) { + assertThat(file.writeCols()).isEqualTo(Arrays.asList("f3", "f4")); + assertThat(file.valueStats().minValues().getInt(0)).isEqualTo(0); + assertThat(file.valueStats().maxValues().getInt(0)).isEqualTo(rowNum - 1); + } else { + assertThat(file.writeCols()).isEqualTo(Arrays.asList("f0", "f1")); + // JSON does not implement createStatsExtractor, so we skip the stats check here.
+ // assertThat(file.valueStats().minValues().getInt(1)).isGreaterThan(0); + } + } + + // Verify total record count + assertThat(writer.recordCount()).isEqualTo(rowNum); + } + + @Test + public void testVectorStoreNoBlob() throws Exception { + RowType schema = + RowType.builder() + .field("f0", DataTypes.INT()) + .field("f1", DataTypes.STRING()) + .field("f2", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT())) + .field("f3", DataTypes.INT()) + .build(); + writer = + new DataEvolutionRollingFileWriter( + LocalFileIO.create(), + SCHEMA_ID, + FileFormat.fromIdentifier("parquet", new Options()), + FileFormat.fromIdentifier("json", new Options()), + Arrays.asList("f2", "f3"), + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + VECTOR_STORE_TARGET_FILE_SIZE, + schema, + pathFactory, + () -> seqNumCounter, + COMPRESSION, + new StatsCollectorFactories(new CoreOptions(new Options())), + new FileIndexOptions(), + FileSource.APPEND, + false, + false, + null); + + // 100k vector-store data would create 1 normal and 3 vector-store files + int rowNum = 100 * 1000; + List rows = makeRows(rowNum, 1); + for (InternalRow row : rows) { + writer.write( + GenericRow.of( + row.getInt(0), row.getString(1), row.getVector(3), row.getInt(4))); + } + writer.close(); + List results = writer.result(); + + // Check normal, blob, and vector-store files + List normalFiles = new ArrayList<>(); + List blobFiles = new ArrayList<>(); + List vectorStoreFiles = new ArrayList<>(); + for (DataFileMeta file : results) { + if (BlobFileFormat.isBlobFile(file.fileName())) { + blobFiles.add(file); + } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) { + vectorStoreFiles.add(file); + } else { + normalFiles.add(file); + } + } + assertThat(normalFiles.size()).isEqualTo(1); + assertThat(blobFiles.size()).isEqualTo(0); + assertThat(vectorStoreFiles.size()).isEqualTo(3); + + // Verify total record count + assertThat(writer.recordCount()).isEqualTo(rowNum); + } + + @Test + public void testVectorStoreTheSameFormat() throws Exception { + // vector-store file format is the same as main part + writer = + new DataEvolutionRollingFileWriter( + LocalFileIO.create(), + SCHEMA_ID, + FileFormat.fromIdentifier("json", new Options()), + FileFormat.fromIdentifier("json", new Options()), + Arrays.asList("f3", "f4"), + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + VECTOR_STORE_TARGET_FILE_SIZE, + SCHEMA, + pathFactory, + () -> seqNumCounter, + COMPRESSION, + new StatsCollectorFactories(new CoreOptions(new Options())), + new FileIndexOptions(), + FileSource.APPEND, + false, + false, + null); + + // This time we use large blob files + int rowNum = 10; + writer.write(makeRows(rowNum, 512 * 1024).iterator()); + writer.close(); + List results = writer.result(); + + // Check normal, blob, and vector-store files + List normalFiles = new ArrayList<>(); + List blobFiles = new ArrayList<>(); + List vectorStoreFiles = new ArrayList<>(); + for (DataFileMeta file : results) { + if (BlobFileFormat.isBlobFile(file.fileName())) { + blobFiles.add(file); + } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) { + vectorStoreFiles.add(file); + } else { + assertThat(file.writeCols()).isEqualTo(Arrays.asList("f0", "f1", "f3", "f4")); + normalFiles.add(file); + } + } + assertThat(normalFiles.size()).isEqualTo(1); + assertThat(blobFiles.size()).isEqualTo(3); + assertThat(vectorStoreFiles.size()).isEqualTo(0); + + // Verify total record count + assertThat(writer.recordCount()).isEqualTo(rowNum); + } + + private List makeRows(int rowNum, int blobDataSize) { + List rows = new 
ArrayList<>(rowNum); + byte[] blobData = new byte[blobDataSize]; + RANDOM.nextBytes(blobData); + for (int i = 0; i < rowNum; ++i) { + byte[] string = new byte[1]; + RANDOM.nextBytes(string); + byte[] buf = new byte[VECTOR_DIM]; + RANDOM.nextBytes(buf); + float[] vector = new float[VECTOR_DIM]; + for (int j = 0; j < VECTOR_DIM; ++j) { + vector[j] = buf[j]; + } + int label = RANDOM.nextInt(32) + 1; + rows.add( + GenericRow.of( + i, + BinaryString.fromBytes(string), + new BlobData(blobData), + BinaryVector.fromPrimitiveArray(vector), + label)); + } + return rows; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java index 97f621a72024..4b6d1c579a9c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java @@ -62,9 +62,7 @@ public void testBasic() throws Exception { .collect(Collectors.toList()); RowType rowType = table.schema().logicalRowType(); - List fieldGroups = - splitFieldBunches( - filesMetas, file -> rowType.getField(file.writeCols().get(0)).id()); + List fieldGroups = splitFieldBunches(filesMetas, file -> rowType); assertThat(fieldGroups.size()).isEqualTo(3); assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java new file mode 100644 index 000000000000..8bc7bad2d8c4 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.append; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; +import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.DataEvolutionSplitRead; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Tests for table with vector-store and data evolution. */ +public class VectorStoreTableTest extends TableTestBase { + + private static final int VECTOR_DIM = 10; + + private AtomicInteger uniqueIdGen = new AtomicInteger(0); + + private Map rowsWritten = new HashMap<>(); + + @Test + public void testBasic() throws Exception { + int rowNum = RANDOM.nextInt(64) + 1; + + createTableDefault(); + + commitDefault(writeDataDefault(rowNum, 1)); + + AtomicInteger counter = new AtomicInteger(0); + + List filesMetas = + getTableDefault().store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + + List fieldGroups = + DataEvolutionSplitRead.splitFieldBunches( + filesMetas, key -> makeBlobRowType(key.writeCols(), f -> 0)); + + assertThat(fieldGroups.size()).isEqualTo(3); + assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(1).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(2).files().size()).isEqualTo(1); + + readDefault( + row -> { + counter.getAndIncrement(); + InternalRow expected = rowsWritten.get(row.getInt(0)); + assertThat(row.getString(1)).isEqualTo(expected.getString(1)); + assertThat(row.getBlob(2)).isEqualTo(expected.getBlob(2)); + assertThat(row.getVector(3).toFloatArray()) + .isEqualTo(expected.getVector(3).toFloatArray()); + assertThat(row.getInt(4)).isEqualTo(expected.getInt(4)); + }); + + assertThat(counter.get()).isEqualTo(rowNum); + } + + @Test + public void testMultiBatch() throws Exception { + int rowNum = (RANDOM.nextInt(64) + 1) * 2; + + createTableDefault(); + + commitDefault(writeDataDefault(rowNum / 2, 2)); + + AtomicInteger counter = new AtomicInteger(0); + + List filesMetas = + getTableDefault().store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + + List> batches = DataEvolutionSplitRead.mergeRangesAndSort(filesMetas); + assertThat(batches.size()).isEqualTo(2); + for (List batch : batches) { + List fieldGroups = + DataEvolutionSplitRead.splitFieldBunches( + batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); + assertThat(fieldGroups.size()).isEqualTo(3); + assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(1).files().size()).isEqualTo(1); + 
assertThat(fieldGroups.get(2).files().size()).isEqualTo(1); + } + + readDefault( + row -> { + counter.getAndIncrement(); + InternalRow expected = rowsWritten.get(row.getInt(0)); + assertThat(row.getString(1)).isEqualTo(expected.getString(1)); + assertThat(row.getBlob(2)).isEqualTo(expected.getBlob(2)); + assertThat(row.getVector(3).toFloatArray()) + .isEqualTo(expected.getVector(3).toFloatArray()); + assertThat(row.getInt(4)).isEqualTo(expected.getInt(4)); + }); + assertThat(counter.get()).isEqualTo(rowNum); + } + + @Test + public void testRolling() throws Exception { + // Each 100k-row batch would create 1 normal, 1 blob, and 3 vector-store files + int rowNum = 100 * 1000 * 3; + + createTableDefault(); + + commitDefault(writeDataDefault(rowNum / 3, 3)); + + AtomicInteger counter = new AtomicInteger(0); + + List filesMetas = + getTableDefault().store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + + List> batches = DataEvolutionSplitRead.mergeRangesAndSort(filesMetas); + assertThat(batches.size()).isEqualTo(3); + for (List batch : batches) { + List fieldGroups = + DataEvolutionSplitRead.splitFieldBunches( + batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); + assertThat(fieldGroups.size()).isEqualTo(3); + assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(1).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(2).files().size()).isEqualTo(3); + } + + readDefault( + row -> { + counter.getAndIncrement(); + InternalRow expected = rowsWritten.get(row.getInt(0)); + assertThat(row.getString(1)).isEqualTo(expected.getString(1)); + assertThat(row.getBlob(2)).isEqualTo(expected.getBlob(2)); + assertThat(row.getVector(3).toFloatArray()) + .isEqualTo(expected.getVector(3).toFloatArray()); + assertThat(row.getInt(4)).isEqualTo(expected.getInt(4)); + }); + + assertThat(counter.get()).isEqualTo(rowNum); + } + + @Test + public void testWithoutBlob() throws Exception { + // Each 100k-row batch would create 1 normal and 3 vector-store files (no blob column) + int rowNum = 100 * 1000 * 3; + + catalog.createTable(identifier(), schemaWithoutBlob(), true); + + commitDefault(writeDataWithoutBlob(rowNum / 3, 3)); + + AtomicInteger counter = new AtomicInteger(0); + + List filesMetas = + getTableDefault().store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + + List> batches = DataEvolutionSplitRead.mergeRangesAndSort(filesMetas); + assertThat(batches.size()).isEqualTo(3); + for (List batch : batches) { + List fieldGroups = + DataEvolutionSplitRead.splitFieldBunches( + batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); + assertThat(fieldGroups.size()).isEqualTo(2); + assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(1).files().size()).isEqualTo(3); + } + + readDefault( + row -> { + counter.getAndIncrement(); + InternalRow expected = rowsWritten.get(row.getInt(0)); + assertThat(row.getString(1)).isEqualTo(expected.getString(1)); + assertThat(row.getVector(2).toFloatArray()) + .isEqualTo(expected.getVector(3).toFloatArray()); + assertThat(row.getInt(3)).isEqualTo(expected.getInt(4)); + }); + + assertThat(counter.get()).isEqualTo(rowNum); + } + + @Override + protected Schema schemaDefault() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.BLOB()); + schemaBuilder.column("f3",
DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT())); + schemaBuilder.column("f4", DataTypes.INT()); + schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "2 MB"); + schemaBuilder.option(CoreOptions.VECTOR_STORE_TARGET_FILE_SIZE.key(), "4 MB"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.VECTOR_STORE_FIELDS.key(), "f3,f4"); + schemaBuilder.option(CoreOptions.VECTOR_STORE_FORMAT.key(), "json"); + schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none"); + return schemaBuilder.build(); + } + + private Schema schemaWithoutBlob() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT())); + schemaBuilder.column("f3", DataTypes.INT()); + schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "2 MB"); + schemaBuilder.option(CoreOptions.VECTOR_STORE_TARGET_FILE_SIZE.key(), "4 MB"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.VECTOR_STORE_FIELDS.key(), "f2,f3"); + schemaBuilder.option(CoreOptions.VECTOR_STORE_FORMAT.key(), "json"); + schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none"); + return schemaBuilder.build(); + } + + protected List writeDataWithoutBlob(int size, int times) throws Exception { + Table table = getTableDefault(); + List messages = new ArrayList<>(); + for (int time = 0; time < times; time++) { + StreamWriteBuilder builder = table.newStreamWriteBuilder(); + builder.withCommitUser(commitUser); + try (StreamTableWrite streamTableWrite = builder.newWrite()) { + for (int j = 0; j < size; j++) { + InternalRow row = dataDefault(time, j); + streamTableWrite.write( + GenericRow.of( + row.getInt(0), + row.getString(1), + row.getVector(3), + row.getInt(4))); + } + messages.addAll(streamTableWrite.prepareCommit(false, Long.MAX_VALUE)); + } + } + return messages; + } + + @Override + protected InternalRow dataDefault(int time, int size) { + byte[] stringBytes = new byte[1]; + RANDOM.nextBytes(stringBytes); + byte[] blobBytes = new byte[1]; + RANDOM.nextBytes(blobBytes); + byte[] vectorBytes = new byte[VECTOR_DIM]; + RANDOM.nextBytes(vectorBytes); + float[] vector = new float[VECTOR_DIM]; + for (int i = 0; i < VECTOR_DIM; i++) { + vector[i] = vectorBytes[i]; + } + int id = uniqueIdGen.getAndIncrement(); + InternalRow row = + GenericRow.of( + id, + BinaryString.fromBytes(stringBytes), + new BlobData(blobBytes), + BinaryVector.fromPrimitiveArray(vector), + RANDOM.nextInt(32) + 1); + rowsWritten.put(id, row); + return row; + } + + private static RowType makeBlobRowType( + List fieldNames, Function fieldIdFunc) { + List fields = new ArrayList<>(); + if (fieldNames == null) { + fieldNames = Collections.emptyList(); + } + for (String fieldName : fieldNames) { + int fieldId = fieldIdFunc.apply(fieldName); + DataField blobField = new DataField(fieldId, fieldName, DataTypes.BLOB()); + fields.add(blobField); + } + return new RowType(fields); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 1edf91e1d22d..ed314e80292b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -271,6 +271,9 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception IOManager.create(tempDir.toString()), 0, fileFormat, + null, + Collections.emptyList(), + 10, 10, 10, schema, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java index 452813d9d93c..19d53f45309b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java @@ -21,9 +21,12 @@ import org.apache.paimon.data.Timestamp; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileSource; -import org.apache.paimon.operation.DataEvolutionSplitRead.BlobBunch; import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch; +import org.apache.paimon.operation.DataEvolutionSplitRead.SplitBunch; import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,20 +35,21 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Function; import static org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Tests for {@link BlobBunch}. */ +/** Tests for {@link SplitBunch}. */ public class DataEvolutionReadTest { - private BlobBunch blobBunch; + private SplitBunch blobBunch; @BeforeEach public void setUp() { - blobBunch = new BlobBunch(Long.MAX_VALUE, false); + blobBunch = new SplitBunch(Long.MAX_VALUE, false); } @Test @@ -84,7 +88,7 @@ public void testAddNonBlobFileThrowsException() { assertThatThrownBy(() -> blobBunch.add(normalFile)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Only blob file can be added to a blob bunch."); + .hasMessage("Only blob/vector-store file can be added to this bunch."); } @Test @@ -97,7 +101,7 @@ public void testAddBlobFileWithSameFirstRowId() { assertThatThrownBy(() -> blobBunch.add(blobEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob file with same first row id should have decreasing sequence number."); + "Blob/vector-store file with same first row id should have decreasing sequence number."); } @Test @@ -136,7 +140,7 @@ public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() { assertThatThrownBy(() -> blobBunch.add(blobEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob file with overlapping row id should have decreasing sequence number."); + "Blob/vector-store file with overlapping row id should have decreasing sequence number."); } @Test @@ -148,7 +152,8 @@ public void testAddBlobFileWithNonContinuousRowId() { // Adding file with non-continuous row id should throw exception assertThatThrownBy(() -> blobBunch.add(blobEntry2)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Blob file first row id should be continuous, expect 100 but got 200"); + .hasMessage( + "Blob/vector-store file first row id should be continuous, expect 100 but got 200"); } @Test @@ -161,7 +166,7 @@ public void 
testAddBlobFileWithDifferentWriteCols() { // Adding file with different write columns should throw exception assertThatThrownBy(() -> blobBunch.add(blobEntry2)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("All files in a blob bunch should have the same write columns."); + .hasMessage("All files in this bunch should have the same write columns."); } @Test @@ -214,10 +219,11 @@ public void testComplexBlobBunchScenario2() { assertThat(batch.get(8).fileName()).contains("blob4"); // skip assertThat(batch.get(9).fileName()).contains("blob8"); // pick - List fieldBunches = splitFieldBunches(batch, file -> 0); + List fieldBunches = + splitFieldBunches(batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); assertThat(fieldBunches.size()).isEqualTo(2); - BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1); + SplitBunch blobBunch = (SplitBunch) fieldBunches.get(1); assertThat(blobBunch.files).hasSize(4); assertThat(blobBunch.files.get(0).fileName()).contains("blob5"); assertThat(blobBunch.files.get(1).fileName()).contains("blob9"); @@ -265,17 +271,18 @@ public void testComplexBlobBunchScenario3() { List batch = batches.get(0); List fieldBunches = - splitFieldBunches(batch, file -> file.writeCols().get(0).hashCode()); + splitFieldBunches( + batch, file -> makeBlobRowType(file.writeCols(), String::hashCode)); assertThat(fieldBunches.size()).isEqualTo(3); - BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1); + SplitBunch blobBunch = (SplitBunch) fieldBunches.get(1); assertThat(blobBunch.files).hasSize(4); assertThat(blobBunch.files.get(0).fileName()).contains("blob5"); assertThat(blobBunch.files.get(1).fileName()).contains("blob9"); assertThat(blobBunch.files.get(2).fileName()).contains("blob7"); assertThat(blobBunch.files.get(3).fileName()).contains("blob8"); - blobBunch = (BlobBunch) fieldBunches.get(2); + blobBunch = (SplitBunch) fieldBunches.get(2); assertThat(blobBunch.files).hasSize(4); assertThat(blobBunch.files.get(0).fileName()).contains("blob15"); assertThat(blobBunch.files.get(1).fileName()).contains("blob19"); @@ -322,22 +329,22 @@ private DataFileMeta createBlobFileWithCols( @Test public void testRowIdPushDown() { - BlobBunch blobBunch = new BlobBunch(Long.MAX_VALUE, true); + SplitBunch blobBunch = new SplitBunch(Long.MAX_VALUE, true); DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1); blobBunch.add(blobEntry1); - BlobBunch finalBlobBunch = blobBunch; + SplitBunch finalBlobBunch = blobBunch; DataFileMeta finalBlobEntry = blobEntry2; assertThatCode(() -> finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException(); - blobBunch = new BlobBunch(Long.MAX_VALUE, true); + blobBunch = new SplitBunch(Long.MAX_VALUE, true); blobEntry1 = createBlobFile("blob1", 0, 100, 1); blobEntry2 = createBlobFile("blob2", 50, 200, 2); blobBunch.add(blobEntry1); blobBunch.add(blobEntry2); assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2); - BlobBunch finalBlobBunch2 = blobBunch; + SplitBunch finalBlobBunch2 = blobBunch; DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2); assertThatCode(() -> finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException(); } @@ -371,4 +378,18 @@ private DataFileMeta createNormalFile( firstRowId, null); } + + private static RowType makeBlobRowType( + List fieldNames, Function fieldIdFunc) { + List fields = new ArrayList<>(); + if (fieldNames == null) { + fieldNames = Collections.emptyList(); + } + for (String fieldName : fieldNames) { + int fieldId = 
fieldIdFunc.apply(fieldName); + DataField blobField = new DataField(fieldId, fieldName, DataTypes.BLOB()); + fields.add(blobField); + } + return new RowType(fields); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java index 2f0b1b2f1b4d..305c5c2b78f9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java @@ -115,6 +115,24 @@ public void testSplitWithMultipleBlobFilesPerGroup() { assertEquals(Arrays.asList(file4, file5, file6), result.get(1)); } + @Test + public void testSplitWithMultipleVectorStoreFilesPerGroup() { + DataFileMeta file1 = createFile("file1.parquet", 1L, 10, 1); + DataFileMeta file2 = createFile("file2.vector-store.json", 1L, 1, 1); + DataFileMeta file3 = createFile("file3.vector-store.json", 2L, 9, 1); + DataFileMeta file4 = createFile("file4.parquet", 20L, 10, 2); + DataFileMeta file5 = createFile("file5.vector-store.json", 20L, 5, 2); + DataFileMeta file6 = createFile("file6.vector-store.json", 25L, 5, 2); + DataFileMeta file7 = createFile("file7.parquet", 1L, 10, 3); + + List files = Arrays.asList(file1, file2, file3, file4, file5, file6, file7); + List> result = DataEvolutionSplitRead.mergeRangesAndSort(files); + + assertEquals(2, result.size()); + assertEquals(Arrays.asList(file7, file1, file2, file3), result.get(0)); + assertEquals(Arrays.asList(file4, file5, file6), result.get(1)); + } + private static DataFileMeta createFile( String name, long firstRowId, long rowCount, long maxSequence) { return DataFileMeta.create( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 9b789e246be2..4ced9061c546 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -35,7 +35,10 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.paimon.CoreOptions.BUCKET; +import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED; import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; +import static org.apache.paimon.CoreOptions.VECTOR_STORE_FIELDS; +import static org.apache.paimon.CoreOptions.VECTOR_STORE_FORMAT; import static org.apache.paimon.schema.SchemaValidation.validateTableSchema; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatNoException; @@ -200,4 +203,142 @@ public void testChainTableAllowsNonDeduplicateMergeEngine() { assertThatNoException().isThrownBy(() -> validateTableSchema(schema)); } + + @Test + public void testVectorStoreUnknownColumn() { + Map options = new HashMap<>(); + options.put(BUCKET.key(), String.valueOf(-1)); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(DATA_EVOLUTION_ENABLED.key(), "true"); + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + options.put(VECTOR_STORE_FORMAT.key(), "json"); + options.put(VECTOR_STORE_FIELDS.key(), "f99"); + + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.STRING())); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + emptyList(), 
+ emptyList(), + options, + ""))) + .hasMessage("Some of the columns specified as vector-store are unknown."); + } + + @Test + public void testVectorStoreContainsBlobColumn() { + Map options = new HashMap<>(); + options.put(BUCKET.key(), String.valueOf(-1)); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(DATA_EVOLUTION_ENABLED.key(), "true"); + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + options.put(VECTOR_STORE_FORMAT.key(), "json"); + options.put(VECTOR_STORE_FIELDS.key(), "blob"); + + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "blob", DataTypes.BLOB())); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + emptyList(), + emptyList(), + options, + ""))) + .hasMessage("The vector-store columns can not be blob type."); + } + + @Test + public void testVectorStoreContainsPartitionColumn() { + Map options = new HashMap<>(); + options.put(BUCKET.key(), String.valueOf(-1)); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(DATA_EVOLUTION_ENABLED.key(), "true"); + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + options.put(VECTOR_STORE_FORMAT.key(), "json"); + options.put(VECTOR_STORE_FIELDS.key(), "f1"); + + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.STRING())); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + singletonList("f1"), + emptyList(), + options, + ""))) + .hasMessage("The vector-store columns can not be part of partition keys."); + } + + @Test + public void testVectorStoreRequireNormalColumns() { + Map options = new HashMap<>(); + options.put(BUCKET.key(), String.valueOf(-1)); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(DATA_EVOLUTION_ENABLED.key(), "true"); + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + options.put(VECTOR_STORE_FORMAT.key(), "json"); + options.put(VECTOR_STORE_FIELDS.key(), "f0,f1"); + + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.STRING())); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + emptyList(), + emptyList(), + options, + ""))) + .hasMessage("Table with vector-store must have other normal columns."); + } + + @Test + public void testVectorStoreRequiresDataEvolutionEnabled() { + Map options = new HashMap<>(); + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + options.put(VECTOR_STORE_FORMAT.key(), "json"); + options.put(VECTOR_STORE_FIELDS.key(), "f1"); + + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.STRING())); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + emptyList(), + emptyList(), + options, + ""))) + .hasMessage( + "Data evolution config must enabled for table with vector-store file format."); + } } diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkVectorStoreE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkVectorStoreE2eTest.java new file mode 100644 index 000000000000..be89fbfca155 --- /dev/null +++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkVectorStoreE2eTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tests; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +/** E2E test for vector-store with data evolution. */ +public class FlinkVectorStoreE2eTest extends E2eTestBase { + + @Test + public void testVectorStoreTable() throws Exception { + Random rnd = new Random(System.currentTimeMillis()); + int vectorDim = rnd.nextInt(10) + 1; + final int itemNum = rnd.nextInt(3) + 1; + + String catalogDdl = + String.format( + "CREATE CATALOG ts_catalog WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '%s'\n" + + ");", + TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store"); + + String useCatalogCmd = "USE CATALOG ts_catalog;"; + + String createTableDdl = + String.format( + "CREATE TABLE IF NOT EXISTS ts_table (\n" + + " id BIGINT,\n" + + " embed ARRAY\n" + + ") WITH (\n" + + " 'file.format' = 'parquet',\n" + + " 'file.compression' = 'none',\n" + + " 'row-tracking.enabled' = 'true',\n" + + " 'data-evolution.enabled' = 'true',\n" + + " 'vector.file.format' = 'json',\n" + + " 'vector-field' = 'embed',\n" + + " 'field.embed.vector-dim' = '%d'\n" + + ");", + vectorDim); + + float[][] vectors = new float[itemNum][vectorDim]; + byte[] vectorDataBuf = new byte[vectorDim]; + for (int i = 0; i < itemNum; ++i) { + vectors[i] = new float[vectorDim]; + rnd.nextBytes(vectorDataBuf); + for (int j = 0; j < vectorDim; ++j) { + vectors[i][j] = vectorDataBuf[j]; + } + } + + List values = new ArrayList<>(); + String[] expected = new String[itemNum]; + for (int id = 0; id < itemNum; ++id) { + values.add(String.format("(%d, %s)", id, arrayLiteral(vectors[id]))); + expected[id] = String.format("%d, %s", id, Arrays.toString(vectors[id])); + } + + runBatchSql( + "INSERT INTO ts_table VALUES " + String.join(", ", values) + ";", + catalogDdl, + useCatalogCmd, + createTableDdl); + + runBatchSql( + "INSERT INTO result1 SELECT * FROM ts_table;", + catalogDdl, + useCatalogCmd, + createTableDdl, + createResultSink("result1", "id BIGINT, embed ARRAY")); + checkResult(expected); + clearCurrentResults(); + + runBatchSql( + "INSERT INTO result2 SELECT " + + "COUNT(*) AS total, " + + "SUM(CASE WHEN file_path LIKE '%.vector-store%.json' THEN 1 ELSE 0 END) " + + "AS vector_files " + + "FROM \\`ts_table\\$files\\`;", + catalogDdl, + useCatalogCmd, + createTableDdl, + createResultSink("result2", "total BIGINT, vector_files BIGINT")); + checkResult("2, 1"); + } + + private String arrayLiteral(float[] vector) { + return "ARRAY" + Arrays.toString(vector); + } +}