diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 21cd3f70554e..873131561526 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1452,6 +1452,24 @@
String |
The Variant shredding schema for writing. |
+
+ vector-field |
+ (none) |
+ String |
+ Specify the fields stored in separate vector-store files, as a comma-separated list of field names. |
+
+
+ vector.file.format |
+ (none) |
+ String |
+ Specify the file format for vector-store files. If not set, the table's file format is used. |
+
+
+ vector.target-file-size |
+ (none) |
+ MemorySize |
+ Target size of a vector-store file. If not set, defaults to 10 times 'target-file-size'. |
+
visibility-callback.check-interval |
10 s |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ee3f8dd7e215..922eb1deba65 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2230,6 +2230,29 @@ public InlineElement getDescription() {
.withDescription(
"The interval for checking visibility when visibility-callback enabled.");
+ public static final ConfigOption<String> VECTOR_STORE_FORMAT =
+ key("vector.file.format")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Specify the vector store file format.");
+
+ public static final ConfigOption<String> VECTOR_STORE_FIELDS =
+ key("vector-field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Specify the vector store fields.");
+
+ public static final ConfigOption<MemorySize> VECTOR_STORE_TARGET_FILE_SIZE =
+ key("vector.target-file-size")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Target size of a vector-store file."
+ + " Default is 10 * TARGET_FILE_SIZE.")
+ .build());
+
private final Options options;
public CoreOptions(Map options) {
@@ -3469,6 +3492,26 @@ public Duration visibilityCallbackCheckInterval() {
return options.get(VISIBILITY_CALLBACK_CHECK_INTERVAL);
}
+ public String vectorStoreFileFormatString() {
+ return normalizeFileFormat(options.get(VECTOR_STORE_FORMAT));
+ }
+
+ public List<String> vectorStoreFieldNames() {
+ String vectorStoreFields = options.get(CoreOptions.VECTOR_STORE_FIELDS);
+ if (vectorStoreFields == null || vectorStoreFields.trim().isEmpty()) {
+ return new ArrayList<>();
+ } else {
+ return Arrays.asList(vectorStoreFields.split(","));
+ }
+ }
+
+ public long vectorStoreTargetFileSize() {
+ // Vector columns are large, so the default target size for vector-store files is 10x the normal target file size.
+ return options.getOptional(VECTOR_STORE_TARGET_FILE_SIZE)
+ .map(MemorySize::getBytes)
+ .orElse(10 * targetFileSize(false));
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index 90f52099b4f9..c150269d04f6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -107,6 +107,17 @@ public static FileFormat fileFormat(CoreOptions options) {
return FileFormat.fromIdentifier(options.fileFormatString(), options.toConfiguration());
}
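+ /**
+ * Returns the file format for vector-store files: null when no vector-store fields are
+ * configured, and the table's default format when no dedicated vector-store format is set.
+ */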
+ public static FileFormat vectorStoreFileFormat(CoreOptions options) {
+ if (options.vectorStoreFieldNames().isEmpty()) {
+ return null;
+ }
+ String vectorStoreFileFormat = options.vectorStoreFileFormatString();
+ if (vectorStoreFileFormat == null) {
+ return fileFormat(options);
+ }
+ return FileFormat.fromIdentifier(vectorStoreFileFormat, options.toConfiguration());
+ }
+
public static FileFormat manifestFormat(CoreOptions options) {
return FileFormat.fromIdentifier(options.manifestFormatString(), options.toConfiguration());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index cd7acb4a3390..92e8e60c2e14 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -52,6 +52,7 @@
import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
import org.apache.paimon.utils.StatsCollectorFactories;
+import org.apache.paimon.utils.VectorStoreUtils;
import javax.annotation.Nullable;
@@ -73,8 +74,11 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
private final FileIO fileIO;
private final long schemaId;
private final FileFormat fileFormat;
+ private final FileFormat vectorStoreFileFormat;
+ private final List<String> vectorStoreFieldNames;
private final long targetFileSize;
private final long blobTargetFileSize;
+ private final long vectorStoreTargetFileSize;
private final RowType writeSchema;
@Nullable private final List writeCols;
private final DataFilePathFactory pathFactory;
@@ -105,8 +109,11 @@ public AppendOnlyWriter(
@Nullable IOManager ioManager,
long schemaId,
FileFormat fileFormat,
+ FileFormat vectorStoreFileFormat,
+ List<String> vectorStoreFieldNames,
long targetFileSize,
long blobTargetFileSize,
+ long vectorStoreTargetFileSize,
RowType writeSchema,
@Nullable List writeCols,
long maxSequenceNumber,
@@ -129,8 +136,11 @@ public AppendOnlyWriter(
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
+ this.vectorStoreFileFormat = vectorStoreFileFormat;
+ this.vectorStoreFieldNames = vectorStoreFieldNames;
this.targetFileSize = targetFileSize;
this.blobTargetFileSize = blobTargetFileSize;
+ this.vectorStoreTargetFileSize = vectorStoreTargetFileSize;
this.writeSchema = writeSchema;
this.writeCols = writeCols;
this.pathFactory = pathFactory;
@@ -304,13 +314,25 @@ public void toBufferedWriter() throws Exception {
}
private RollingFileWriter createRollingRowWriter() {
- if (writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB))) {
- return new RollingBlobFileWriter(
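+ // Choose the writer: blob columns, or vector-store columns stored in a different format
+ // alongside normal columns, require the multi-file DataEvolutionRollingFileWriter;
+ // otherwise a single RowDataRollingFileWriter is enough.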
+ boolean hasNormal =
+ writeSchema.getFields().stream()
+ .anyMatch(
+ f ->
+ !f.type().is(BLOB)
+ && !vectorStoreFieldNames.contains(f.name()));
+ boolean hasBlob = writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB));
+ boolean hasSeparatedVectorStore =
+ VectorStoreUtils.isDifferentFormat(vectorStoreFileFormat, fileFormat);
+ if (hasBlob || (hasNormal && hasSeparatedVectorStore)) {
+ return new DataEvolutionRollingFileWriter(
fileIO,
schemaId,
fileFormat,
+ vectorStoreFileFormat,
+ vectorStoreFieldNames,
targetFileSize,
blobTargetFileSize,
+ vectorStoreTargetFileSize,
writeSchema,
pathFactory,
seqNumCounterProvider,
@@ -326,13 +348,20 @@ private RollingFileWriter createRollingRowWriter() {
statsDenseStore,
blobConsumer);
}
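+ // Only one kind of column remains. If they are all vector-store columns, write them
+ // directly as a vector-store file using the vector-store format, target size and path.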
+ FileFormat realFileFormat = hasNormal ? fileFormat : vectorStoreFileFormat;
+ long realTargetFileSize = hasNormal ? targetFileSize : vectorStoreTargetFileSize;
+ DataFilePathFactory realPathFactory =
+ hasNormal
+ ? pathFactory
+ : pathFactory.vectorStorePathFactory(
+ vectorStoreFileFormat.getFormatIdentifier());
return new RowDataRollingFileWriter(
fileIO,
schemaId,
- fileFormat,
- targetFileSize,
+ realFileFormat,
+ realTargetFileSize,
writeSchema,
- pathFactory,
+ realPathFactory,
seqNumCounterProvider,
fileCompression,
statsCollectorFactories.statsCollectors(writeSchema.getFieldNames()),
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java
similarity index 62%
rename from paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
rename to paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java
index 80faa01d9238..4494794150da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java
@@ -28,14 +28,18 @@
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.FileWriterAbortExecutor;
import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
import org.apache.paimon.io.RowDataFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StatsCollectorFactories;
+import org.apache.paimon.utils.VectorStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,9 +74,10 @@
*
*
*/
-public class RollingBlobFileWriter implements RollingFileWriter {
+public class DataEvolutionRollingFileWriter
+ implements RollingFileWriter<InternalRow, DataFileMeta> {
- private static final Logger LOG = LoggerFactory.getLogger(RollingBlobFileWriter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DataEvolutionRollingFileWriter.class);
/** Constant for checking rolling condition periodically. */
private static final long CHECK_ROLLING_RECORD_CNT = 1000L;
@@ -82,6 +87,10 @@ public class RollingBlobFileWriter implements RollingFileWriter, DataFileMeta>>
writerFactory;
private final Supplier blobWriterFactory;
+ private final Supplier<
+ ProjectedFileWriter<
+ RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>>
+ vectorStoreWriterFactory;
private final long targetFileSize;
// State management
@@ -91,15 +100,21 @@ public class RollingBlobFileWriter implements RollingFileWriter, DataFileMeta>
currentWriter;
private MultipleBlobFileWriter blobWriter;
+ private ProjectedFileWriter<
+ RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
+ vectorStoreWriter;
private long recordCount = 0;
private boolean closed = false;
- public RollingBlobFileWriter(
+ public DataEvolutionRollingFileWriter(
FileIO fileIO,
long schemaId,
FileFormat fileFormat,
+ FileFormat vectorStoreFileFormat,
+ List<String> vectorStoreFieldNames,
long targetFileSize,
long blobTargetFileSize,
+ long vectorStoreTargetFileSize,
RowType writeSchema,
DataFilePathFactory pathFactory,
Supplier seqNumCounterSupplier,
@@ -115,13 +130,25 @@ public RollingBlobFileWriter(
this.results = new ArrayList<>();
this.closedWriters = new ArrayList<>();
+ // Split into normal and vector-store parts
+ RowType normalRowType = BlobType.splitBlob(writeSchema).getLeft();
+ RowType vectorStoreRowType;
+ if (VectorStoreUtils.isDifferentFormat(vectorStoreFileFormat, fileFormat)) {
+ Pair<RowType, RowType> typeWithVectorStore =
+ VectorStoreUtils.splitVectorStore(normalRowType, vectorStoreFieldNames);
+ normalRowType = typeWithVectorStore.getLeft();
+ vectorStoreRowType = typeWithVectorStore.getRight();
+ } else {
+ vectorStoreRowType = new RowType(Collections.emptyList());
+ }
+
// Initialize writer factory for normal data
this.writerFactory =
createNormalWriterFactory(
fileIO,
schemaId,
fileFormat,
- BlobType.splitBlob(writeSchema).getLeft(),
+ normalRowType,
writeSchema,
pathFactory,
seqNumCounterSupplier,
@@ -133,19 +160,45 @@ public RollingBlobFileWriter(
statsDenseStore);
// Initialize blob writer
- this.blobWriterFactory =
- () ->
- new MultipleBlobFileWriter(
- fileIO,
- schemaId,
- writeSchema,
- pathFactory,
- seqNumCounterSupplier,
- fileSource,
- asyncFileWrite,
- statsDenseStore,
- blobTargetFileSize,
- blobConsumer);
+ if (!BlobType.splitBlob(writeSchema).getRight().getFields().isEmpty()) {
+ this.blobWriterFactory =
+ () ->
+ new MultipleBlobFileWriter(
+ fileIO,
+ schemaId,
+ writeSchema,
+ pathFactory,
+ seqNumCounterSupplier,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ blobTargetFileSize,
+ blobConsumer);
+ } else {
+ this.blobWriterFactory = null;
+ }
+
+ // Initialize vector-store writer
+ if (!vectorStoreRowType.getFields().isEmpty()) {
+ this.vectorStoreWriterFactory =
+ () ->
+ createVectorStoreWriter(
+ fileIO,
+ vectorStoreFileFormat,
+ schemaId,
+ vectorStoreRowType,
+ writeSchema,
+ pathFactory,
+ seqNumCounterSupplier,
+ fileCompression,
+ statsCollectorFactories.statsCollectors(vectorStoreFieldNames),
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ vectorStoreTargetFileSize);
+ } else {
+ this.vectorStoreWriterFactory = null;
+ }
}
/** Creates a factory for normal data writers. */
@@ -192,6 +245,53 @@ public RollingBlobFileWriter(
};
}
+ /** Creates a rolling file writer for the vector-store columns, projected from the full write schema. */
+ private static ProjectedFileWriter<
+ RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
+ createVectorStoreWriter(
+ FileIO fileIO,
+ FileFormat vectorStoreFileFormat,
+ long schemaId,
+ RowType vectorStoreRowType,
+ RowType writeSchema,
+ DataFilePathFactory pathFactory,
+ Supplier<LongCounter> seqNumCounterSupplier,
+ String fileCompression,
+ SimpleColStatsCollector.Factory[] statsCollectors,
+ FileSource fileSource,
+ boolean asyncFileWrite,
+ boolean statsDenseStore,
+ long targetFileSize) {
+
+ List<String> vectorStoreFieldNames = vectorStoreRowType.getFieldNames();
+
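+ // Project the full write schema down to the vector-store columns; ProjectedFileWriter
+ // applies this projection to every incoming row before passing it to the rolling writer.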
+ int[] vectorStoreProjection = writeSchema.projectIndexes(vectorStoreFieldNames);
+ DataFilePathFactory vectorStorePathFactory =
+ pathFactory.vectorStorePathFactory(vectorStoreFileFormat.getFormatIdentifier());
+ return new ProjectedFileWriter<>(
+ new RollingFileWriterImpl<>(
+ () ->
+ new RowDataFileWriter(
+ fileIO,
+ RollingFileWriter.createFileWriterContext(
+ vectorStoreFileFormat,
+ vectorStoreRowType,
+ statsCollectors,
+ fileCompression),
+ vectorStorePathFactory.newPath(),
+ vectorStoreRowType,
+ schemaId,
+ seqNumCounterSupplier,
+ new FileIndexOptions(),
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ pathFactory.isExternalPath(),
+ vectorStoreFieldNames),
+ targetFileSize),
+ vectorStoreProjection);
+ }
+
/**
* Writes a single row to both normal and blob writers. Automatically handles file rolling when
* target size is reached.
@@ -205,11 +305,19 @@ public void write(InternalRow row) throws IOException {
if (currentWriter == null) {
currentWriter = writerFactory.get();
}
- if (blobWriter == null) {
+ if ((blobWriter == null) && (blobWriterFactory != null)) {
blobWriter = blobWriterFactory.get();
}
+ if ((vectorStoreWriter == null) && (vectorStoreWriterFactory != null)) {
+ vectorStoreWriter = vectorStoreWriterFactory.get();
+ }
currentWriter.write(row);
- blobWriter.write(row);
+ if (blobWriter != null) {
+ blobWriter.write(row);
+ }
+ if (vectorStoreWriter != null) {
+ vectorStoreWriter.write(row);
+ }
recordCount++;
if (rollingFile()) {
@@ -269,6 +377,10 @@ public void abort() {
blobWriter.abort();
blobWriter = null;
}
+ if (vectorStoreWriter != null) {
+ vectorStoreWriter.abort();
+ vectorStoreWriter = null;
+ }
}
/** Checks if the current file should be rolled based on size and record count. */
@@ -295,12 +407,16 @@ private void closeCurrentWriter() throws IOException {
// Close blob writer and process blob metadata
List blobMetas = closeBlobWriter();
+ // Close vector-store writer and process vector-store metadata
+ List<DataFileMeta> vectorStoreMetas = closeVectorStoreWriter();
+
// Validate consistency between main and blob files
- validateFileConsistency(mainDataFileMeta, blobMetas);
+ validateFileConsistency(mainDataFileMeta, blobMetas, vectorStoreMetas);
// Add results to the results list
results.add(mainDataFileMeta);
results.addAll(blobMetas);
+ results.addAll(vectorStoreMetas);
// Reset current writer
currentWriter = null;
@@ -324,9 +440,22 @@ private List closeBlobWriter() throws IOException {
return results;
}
+ /** Closes the vector-store writer and returns the collected vector-store file metadata. */
+ private List<DataFileMeta> closeVectorStoreWriter() throws IOException {
+ if (vectorStoreWriter == null) {
+ return Collections.emptyList();
+ }
+ vectorStoreWriter.close();
+ List<DataFileMeta> results = vectorStoreWriter.result();
+ vectorStoreWriter = null;
+ return results;
+ }
+
/** Validates that the row counts match between main and blob files. */
private void validateFileConsistency(
- DataFileMeta mainDataFileMeta, List blobTaggedMetas) {
+ DataFileMeta mainDataFileMeta,
+ List<DataFileMeta> blobTaggedMetas,
+ List<DataFileMeta> vectorStoreMetas) {
long mainRowCount = mainDataFileMeta.rowCount();
Map blobRowCounts = new HashMap<>();
@@ -334,6 +463,9 @@ private void validateFileConsistency(
long count = file.rowCount();
blobRowCounts.compute(file.writeCols().get(0), (k, v) -> v == null ? count : v + count);
}
+ long vectorStoreRowCount =
+ vectorStoreMetas.stream().mapToLong(DataFileMeta::rowCount).sum();
+
for (String blobFieldName : blobRowCounts.keySet()) {
long blobRowCount = blobRowCounts.get(blobFieldName);
if (mainRowCount != blobRowCount) {
@@ -344,6 +476,13 @@ private void validateFileConsistency(
mainDataFileMeta, mainRowCount, blobFieldName, blobRowCount));
}
}
+ if (!vectorStoreMetas.isEmpty() && (mainRowCount != vectorStoreRowCount)) {
+ throw new IllegalStateException(
+ String.format(
+ "This is a bug: The row count of main file and vector-store files does not match. "
+ + "Main file: %s (row count: %d), vector-store files: %s (total row count: %d)",
+ mainDataFileMeta, mainRowCount, vectorStoreMetas, vectorStoreRowCount));
+ }
}
/**
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index c69932ed157f..0b107b338f9a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -47,6 +47,7 @@
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile;
/** Compact coordinator to compact data evolution table. */
public class DataEvolutionCompactCoordinator {
@@ -174,14 +175,19 @@ List compactPlan(List input) {
for (List group : ranges) {
List dataFiles = new ArrayList<>();
List blobFiles = new ArrayList<>();
+ List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
TreeMap treeMap = new TreeMap<>();
Map> dataFileToBlobFiles = new HashMap<>();
+ Map<DataFileMeta, List<DataFileMeta>> dataFileToVectorStoreFiles =
+ new HashMap<>();
for (DataFileMeta f : group) {
- if (!isBlobFile(f.fileName())) {
+ if (isBlobFile(f.fileName())) {
+ blobFiles.add(f);
+ } else if (isVectorStoreFile(f.fileName())) {
+ vectorStoreFiles.add(f);
+ } else {
treeMap.put(f.nonNullFirstRowId(), f);
dataFiles.add(f);
- } else {
- blobFiles.add(f);
}
}
@@ -203,6 +209,25 @@ List compactPlan(List input) {
}
}
}
+ if (false) { // TODO: enable once vector-store file compaction is supported
+ // associate vector-store files to data files
+ for (DataFileMeta vectorStoreFile : vectorStoreFiles) {
+ Long key = treeMap.floorKey(vectorStoreFile.nonNullFirstRowId());
+ if (key != null) {
+ DataFileMeta dataFile = treeMap.get(key);
+ if (vectorStoreFile.nonNullFirstRowId()
+ >= dataFile.nonNullFirstRowId()
+ && vectorStoreFile.nonNullFirstRowId()
+ <= dataFile.nonNullFirstRowId()
+ + dataFile.rowCount()
+ - 1) {
+ dataFileToVectorStoreFiles
+ .computeIfAbsent(dataFile, k -> new ArrayList<>())
+ .add(vectorStoreFile);
+ }
+ }
+ }
+ }
RangeHelper rangeHelper2 =
new RangeHelper<>(DataFileMeta::nonNullRowIdRange);
@@ -222,10 +247,19 @@ List compactPlan(List input) {
.sum();
if (currentGroupWeight > targetFileSize) {
// compact current file group to merge field files
- tasks.addAll(triggerTask(fileGroup, partition, dataFileToBlobFiles));
+ tasks.addAll(
+ triggerTask(
+ fileGroup,
+ partition,
+ dataFileToBlobFiles,
+ dataFileToVectorStoreFiles));
// compact wait compact files
tasks.addAll(
- triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
+ triggerTask(
+ waitCompactFiles,
+ partition,
+ dataFileToBlobFiles,
+ dataFileToVectorStoreFiles));
waitCompactFiles = new ArrayList<>();
weightSum = 0;
} else {
@@ -234,13 +268,21 @@ List compactPlan(List input) {
if (weightSum > targetFileSize) {
tasks.addAll(
triggerTask(
- waitCompactFiles, partition, dataFileToBlobFiles));
+ waitCompactFiles,
+ partition,
+ dataFileToBlobFiles,
+ dataFileToVectorStoreFiles));
waitCompactFiles = new ArrayList<>();
weightSum = 0L;
}
}
}
- tasks.addAll(triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
+ tasks.addAll(
+ triggerTask(
+ waitCompactFiles,
+ partition,
+ dataFileToBlobFiles,
+ dataFileToVectorStoreFiles));
}
}
return tasks;
@@ -249,7 +291,8 @@ List compactPlan(List input) {
private List triggerTask(
List dataFiles,
BinaryRow partition,
- Map> dataFileToBlobFiles) {
+ Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles,
+ Map<DataFileMeta, List<DataFileMeta>> dataFileToVectorStoreFiles) {
List tasks = new ArrayList<>();
if (dataFiles.size() >= compactMinFileNum) {
tasks.add(new DataEvolutionCompactTask(partition, dataFiles, false));
@@ -265,6 +308,18 @@ private List triggerTask(
tasks.add(new DataEvolutionCompactTask(partition, blobFiles, true));
}
}
+
+ if (false) { // TODO: enable once vector-store file compaction is supported
+ List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
+ for (DataFileMeta dataFile : dataFiles) {
+ vectorStoreFiles.addAll(
+ dataFileToVectorStoreFiles.getOrDefault(
+ dataFile, Collections.emptyList()));
+ }
+ if (vectorStoreFiles.size() >= compactMinFileNum) {
+ tasks.add(new DataEvolutionCompactTask(partition, vectorStoreFiles, false));
+ }
+ }
return tasks;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index abbc03f17baf..da8d43c05f7e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -23,6 +23,7 @@
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
@@ -36,6 +37,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.VectorStoreUtils;
import java.util.Collections;
import java.util.List;
@@ -68,6 +70,16 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E
// TODO: support blob file compaction
throw new UnsupportedOperationException("Blob task is not supported");
}
+ if (VectorStoreUtils.isVectorStoreFile(compactBefore.get(0).fileName())) {
+ // TODO: support vector-store file compaction
+ throw new UnsupportedOperationException("Vector-store task is not supported");
+ }
+ List<String> separatedVectorStoreFields =
+ VectorStoreUtils.isDifferentFormat(
+ FileFormat.vectorStoreFileFormat(table.coreOptions()),
+ FileFormat.fileFormat(table.coreOptions()))
+ ? table.coreOptions().vectorStoreFieldNames()
+ : Collections.emptyList();
table = table.copy(DYNAMIC_WRITE_OPTIONS);
long firstRowId = compactBefore.get(0).nonNullFirstRowId();
@@ -76,6 +88,7 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E
new RowType(
table.rowType().getFields().stream()
.filter(f -> f.type().getTypeRoot() != DataTypeRoot.BLOB)
+ .filter(f -> !separatedVectorStoreFields.contains(f.name()))
.collect(Collectors.toList()));
FileStorePathFactory pathFactory = table.store().pathFactory();
AppendOnlyFileStore store = (AppendOnlyFileStore) table.store();
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index b63a1c0b7a79..6318d70b7026 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -59,14 +59,36 @@ public DataFilePathFactory(
boolean fileSuffixIncludeCompression,
String fileCompression,
@Nullable ExternalPathProvider externalPathProvider) {
+ this(
+ parent,
+ UUID.randomUUID().toString(),
+ new AtomicInteger(0),
+ formatIdentifier,
+ dataFilePrefix,
+ changelogFilePrefix,
+ fileSuffixIncludeCompression,
+ compressFileExtension(fileCompression),
+ externalPathProvider);
+ }
+
+ private DataFilePathFactory(
+ Path parent,
+ String uuid,
+ AtomicInteger pathCount,
+ String formatIdentifier,
+ String dataFilePrefix,
+ String changelogFilePrefix,
+ boolean fileSuffixIncludeCompression,
+ @Nullable String compressExtension,
+ @Nullable ExternalPathProvider externalPathProvider) {
this.parent = parent;
- this.uuid = UUID.randomUUID().toString();
- this.pathCount = new AtomicInteger(0);
+ this.uuid = uuid;
+ this.pathCount = pathCount;
this.formatIdentifier = formatIdentifier;
this.dataFilePrefix = dataFilePrefix;
this.changelogFilePrefix = changelogFilePrefix;
this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
- this.compressExtension = compressFileExtension(fileCompression);
+ this.compressExtension = compressExtension;
this.externalPathProvider = externalPathProvider;
}
@@ -99,6 +121,10 @@ public Path newPath(String prefix) {
}
private String newFileName(String prefix) {
+ return newFileName(prefix, makeExtension(compressExtension, formatIdentifier));
+ }
+
+ protected String makeExtension(String compressExtension, String formatIdentifier) {
String extension;
if (compressExtension != null && isTextFormat(formatIdentifier)) {
extension = "." + formatIdentifier + "." + compressExtension;
@@ -107,7 +133,7 @@ private String newFileName(String prefix) {
} else {
extension = "." + formatIdentifier;
}
- return newFileName(prefix, extension);
+ return extension;
}
public Path newPathFromExtension(String extension) {
@@ -121,7 +147,7 @@ public Path newPathFromName(String fileName) {
return new Path(parent, fileName);
}
- private String newFileName(String prefix, String extension) {
+ protected String newFileName(String prefix, String extension) {
return prefix + uuid + "-" + pathCount.getAndIncrement() + extension;
}
@@ -211,4 +237,28 @@ private static String compressFileExtension(String compression) {
}
return compression;
}
+
+ public DataFilePathFactory vectorStorePathFactory(String formatIdentifier) {
+ return new VectorStoreWrapper(this, formatIdentifier);
+ }
+
+ private static class VectorStoreWrapper extends DataFilePathFactory {
+ private VectorStoreWrapper(DataFilePathFactory base, String formatIdentifier) {
+ super(
+ base.parent,
+ base.uuid,
+ base.pathCount,
+ formatIdentifier,
+ base.dataFilePrefix,
+ base.changelogFilePrefix,
+ base.fileSuffixIncludeCompression,
+ base.compressExtension,
+ base.externalPathProvider);
+ }
+
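+ // Vector-store files share the parent factory's UUID and counter but get a
+ // ".vector-store" infix in their extension, which is how VectorStoreUtils#isVectorStoreFile
+ // recognizes them.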
+ @Override
+ protected String makeExtension(String compressExtension, String formatIdentifier) {
+ return ".vector-store" + super.makeExtension(compressExtension, formatIdentifier);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index bc39a4ae8e76..aabaa11a3e50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -132,8 +132,11 @@ protected RecordWriter createWriter(
ioManager,
schemaId,
fileFormat,
+ FileFormat.vectorStoreFileFormat(options),
+ options.vectorStoreFieldNames(),
options.targetFileSize(false),
options.blobTargetFileSize(),
+ options.vectorStoreTargetFileSize(),
writeType,
writeCols,
restoredMaxSeqNumber,
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index eb6a8360f2c1..b16290be535a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -53,6 +53,7 @@
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile;
/** {@link FileStoreScan} for data-evolution enabled table. */
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
@@ -156,10 +157,11 @@ static EvolutionStats evolutionStats(
TableSchema schema,
Function scanTableSchema,
List metas) {
- // exclude blob files, useless for predicate eval
+ // exclude blob and vector-store files, useless for predicate eval
metas =
metas.stream()
.filter(entry -> !isBlobFile(entry.file().fileName()))
+ .filter(entry -> !isVectorStoreFile(entry.file().fileName()))
.collect(Collectors.toList());
ToLongFunction maxSeqFunc = e -> e.file().maxSequenceNumber();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index 153fc4e6ce66..3a36340d6191 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -62,6 +62,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
@@ -72,6 +74,8 @@
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile;
/**
* A union {@link SplitRead} to read multiple inner files to merge columns, note that this class
@@ -216,13 +220,10 @@ private DataEvolutionFileReader createUnionReader(
needMergeFiles,
file -> {
checkArgument(
- isBlobFile(file.fileName()),
- "Only blob file need to call this method.");
- return schemaFetcher
- .apply(file.schemaId())
- .logicalRowType()
- .getField(file.writeCols().get(0))
- .id();
+ isBlobFile(file.fileName())
+ || isVectorStoreFile(file.fileName()),
+ "Only blob/vector-store files need to call this method.");
+ return schemaFetcher.apply(file.schemaId()).logicalRowType();
});
long rowCount = fieldsFiles.get(0).rowCount();
@@ -409,25 +410,39 @@ private FileRecordReader createFileReader(
@VisibleForTesting
public static List splitFieldBunches(
- List needMergeFiles, Function blobFileToFieldId) {
- return splitFieldBunches(needMergeFiles, blobFileToFieldId, false);
+ List<DataFileMeta> needMergeFiles, Function<DataFileMeta, RowType> fileToRowType) {
+ return splitFieldBunches(needMergeFiles, fileToRowType, false);
}
@VisibleForTesting
public static List splitFieldBunches(
List needMergeFiles,
- Function blobFileToFieldId,
+ Function<DataFileMeta, RowType> fileToRowType,
boolean rowIdPushDown) {
List fieldsFiles = new ArrayList<>();
- Map blobBunchMap = new HashMap<>();
+ Map<Integer, SplitBunch> blobBunchMap = new HashMap<>();
+ Map<VectorStoreBunchKey, SplitBunch> vectorStoreBunchMap = new TreeMap<>();
long rowCount = -1;
for (DataFileMeta file : needMergeFiles) {
if (isBlobFile(file.fileName())) {
- int fieldId = blobFileToFieldId.apply(file);
+ RowType rowType = fileToRowType.apply(file);
+ int fieldId = rowType.getField(file.writeCols().get(0)).id();
final long expectedRowCount = rowCount;
blobBunchMap
.computeIfAbsent(
- fieldId, key -> new BlobBunch(expectedRowCount, rowIdPushDown))
+ fieldId, key -> new SplitBunch(expectedRowCount, rowIdPushDown))
+ .add(file);
+ } else if (isVectorStoreFile(file.fileName())) {
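+ // Vector-store files are bunched by schema id, format and written columns, since a
+ // vector-store file can carry several columns (a blob file carries exactly one).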
+ RowType rowType = fileToRowType.apply(file);
+ String fileFormat = DataFilePathFactory.formatIdentifier(file.fileName());
+ VectorStoreBunchKey vectorStoreKey =
+ new VectorStoreBunchKey(
+ file.schemaId(), fileFormat, file.writeCols(), rowType);
+ final long expectedRowCount = rowCount;
+ vectorStoreBunchMap
+ .computeIfAbsent(
+ vectorStoreKey,
+ key -> new SplitBunch(expectedRowCount, rowIdPushDown))
.add(file);
} else {
// Normal file, just add it to the current merge split
@@ -436,6 +451,7 @@ public static List splitFieldBunches(
}
}
fieldsFiles.addAll(blobBunchMap.values());
+ fieldsFiles.addAll(vectorStoreBunchMap.values());
return fieldsFiles;
}
@@ -467,7 +483,7 @@ public List files() {
}
@VisibleForTesting
- static class BlobBunch implements FieldBunch {
+ static class SplitBunch implements FieldBunch {
final List files;
final long expectedRowCount;
@@ -478,7 +494,7 @@ static class BlobBunch implements FieldBunch {
long latestMaxSequenceNumber = -1;
long rowCount;
- BlobBunch(long expectedRowCount, boolean rowIdPushDown) {
+ SplitBunch(long expectedRowCount, boolean rowIdPushDown) {
this.files = new ArrayList<>();
this.rowCount = 0;
this.expectedRowCount = expectedRowCount;
@@ -486,14 +502,15 @@ static class BlobBunch implements FieldBunch {
}
void add(DataFileMeta file) {
- if (!isBlobFile(file.fileName())) {
- throw new IllegalArgumentException("Only blob file can be added to a blob bunch.");
+ if (!isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName())) {
+ throw new IllegalArgumentException(
+ "Only blob/vector-store file can be added to this bunch.");
}
if (file.nonNullFirstRowId() == latestFistRowId) {
if (file.maxSequenceNumber() >= latestMaxSequenceNumber) {
throw new IllegalArgumentException(
- "Blob file with same first row id should have decreasing sequence number.");
+ "Blob/vector-store file with same first row id should have decreasing sequence number.");
}
return;
}
@@ -512,11 +529,11 @@ void add(DataFileMeta file) {
if (firstRowId < expectedNextFirstRowId) {
checkArgument(
file.maxSequenceNumber() < latestMaxSequenceNumber,
- "Blob file with overlapping row id should have decreasing sequence number.");
+ "Blob/vector-store file with overlapping row id should have decreasing sequence number.");
return;
} else if (firstRowId > expectedNextFirstRowId) {
throw new IllegalArgumentException(
- "Blob file first row id should be continuous, expect "
+ "Blob/vector-store file first row id should be continuous, expect "
+ expectedNextFirstRowId
+ " but got "
+ firstRowId);
@@ -525,17 +542,17 @@ void add(DataFileMeta file) {
if (!files.isEmpty()) {
checkArgument(
file.schemaId() == files.get(0).schemaId(),
- "All files in a blob bunch should have the same schema id.");
+ "All files in this bunch should have the same schema id.");
checkArgument(
file.writeCols().equals(files.get(0).writeCols()),
- "All files in a blob bunch should have the same write columns.");
+ "All files in this bunch should have the same write columns.");
}
}
files.add(file);
rowCount += file.rowCount();
checkArgument(
rowCount <= expectedRowCount,
- "Blob files row count exceed the expect " + expectedRowCount);
+ "Blob/vector-store files row count exceed the expect " + expectedRowCount);
this.latestMaxSequenceNumber = file.maxSequenceNumber();
this.latestFistRowId = file.nonNullFirstRowId();
this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
@@ -558,14 +575,17 @@ public static List> mergeRangesAndSort(List fil
RangeHelper rangeHelper = new RangeHelper<>(DataFileMeta::nonNullRowIdRange);
List> result = rangeHelper.mergeOverlappingRanges(files);
- // in group, sort by blob file and max_seq
+ // in group, sort by blob/vector-store file and max_seq
for (List group : result) {
- // split to data files and blob files
+ // split to data files, blob files, vector-store files
List dataFiles = new ArrayList<>();
List blobFiles = new ArrayList<>();
+ List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
for (DataFileMeta f : group) {
if (isBlobFile(f.fileName())) {
blobFiles.add(f);
+ } else if (isVectorStoreFile(f.fileName())) {
+ vectorStoreFiles.add(f);
} else {
dataFiles.add(f);
}
@@ -583,12 +603,111 @@ public static List> mergeRangesAndSort(List fil
comparingLong(DataFileMeta::nonNullFirstRowId)
.thenComparing(reverseOrder(comparingLong(maxSeqF))));
- // concat data files and blob files
+ // vector-store files sort by first row id then by reversed max sequence number
+ vectorStoreFiles.sort(
+ comparingLong(DataFileMeta::nonNullFirstRowId)
+ .thenComparing(reverseOrder(comparingLong(maxSeqF))));
+
+ // concat data files, blob files, vector-store files
group.clear();
group.addAll(dataFiles);
group.addAll(blobFiles);
+ group.addAll(vectorStoreFiles);
}
return result;
}
+
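+ /**
+ * Key used to bunch vector-store files that were written together: same schema id, same
+ * format and the same (normalized) set of written columns.
+ */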
+ static final class VectorStoreBunchKey implements Comparable<VectorStoreBunchKey> {
+ public final long schemaId;
+ public final String formatIdentifier;
+ public final List<String> writeCols;
+
+ public VectorStoreBunchKey(
+ long schemaId,
+ String formatIdentifier,
+ List<String> writeCols,
+ RowType preferredColOrder) {
+ this.schemaId = schemaId;
+ this.formatIdentifier = checkNotNull(formatIdentifier, "formatIdentifier");
+ this.writeCols = normalizeWriteCols(writeCols, preferredColOrder);
+ }
+
+ @Override
+ public int compareTo(VectorStoreBunchKey o) {
+ int c = Long.compare(this.schemaId, o.schemaId);
+ if (c != 0) {
+ return c;
+ }
+
+ c = this.formatIdentifier.compareTo(o.formatIdentifier);
+ if (c != 0) {
+ return c;
+ }
+
+ int n = Math.min(this.writeCols.size(), o.writeCols.size());
+ for (int i = 0; i < n; i++) {
+ c = this.writeCols.get(i).compareTo(o.writeCols.get(i));
+ if (c != 0) {
+ return c;
+ }
+ }
+ return Integer.compare(this.writeCols.size(), o.writeCols.size());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof VectorStoreBunchKey)) {
+ return false;
+ }
+ VectorStoreBunchKey that = (VectorStoreBunchKey) o;
+ return schemaId == that.schemaId
+ && formatIdentifier.equals(that.formatIdentifier)
+ && writeCols.equals(that.writeCols);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaId, formatIdentifier, writeCols);
+ }
+
+ @Override
+ public String toString() {
+ return "VectorStoreBunchKey{schemaId="
+ + schemaId
+ + ", format="
+ + formatIdentifier
+ + ", writeCols="
+ + writeCols
+ + "}";
+ }
+
+ private static List<String> normalizeWriteCols(List<String> writeCols, RowType rowType) {
+ if (writeCols == null || writeCols.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Map<String, Integer> colPosMap = new HashMap<>();
+ List<String> namesInRowType = rowType.getFieldNames();
+ for (int i = 0; i < namesInRowType.size(); i++) {
+ colPosMap.putIfAbsent(namesInRowType.get(i), i);
+ }
+
+ ArrayList<String> sorted = new ArrayList<>(writeCols);
+ sorted.sort(
+ (a, b) -> {
+ int ia = colPosMap.getOrDefault(a, Integer.MAX_VALUE);
+ int ib = colPosMap.getOrDefault(b, Integer.MAX_VALUE);
+ if (ia != ib) {
+ return Integer.compare(ia, ib);
+ }
+ return a.compareTo(b);
+ });
+
+ return sorted;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
index d2f3dc8851ef..fe66f00fd1e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
@@ -30,6 +30,7 @@
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile;
/** Utils for row tracking commit. */
public class RowTrackingCommitUtils {
@@ -68,6 +69,7 @@ private static long assignRowTrackingMeta(
long start = firstRowIdStart;
long blobStartDefault = firstRowIdStart;
Map blobStarts = new HashMap<>();
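+ // Vector-store files follow the data files they were split from, so they re-use row ids
+ // that were already handed out to data files and track their own start offset.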
+ long vectorStoreStart = firstRowIdStart;
for (ManifestEntry entry : deltaFiles) {
Optional fileSource = entry.file().fileSource();
checkArgument(
@@ -91,6 +93,15 @@ private static long assignRowTrackingMeta(
}
rowIdAssigned.add(entry.assignFirstRowId(blobStart));
blobStarts.put(blobFieldName, blobStart + rowCount);
+ } else if (isVectorStoreFile(entry.file().fileName())) {
+ if (vectorStoreStart >= start) {
+ throw new IllegalStateException(
+ String.format(
+ "This is a bug, vectorStoreStart %d should be less than start %d when assigning a vector-store entry file.",
+ vectorStoreStart, start));
+ }
+ rowIdAssigned.add(entry.assignFirstRowId(vectorStoreStart));
+ vectorStoreStart += rowCount;
} else {
rowIdAssigned.add(entry.assignFirstRowId(start));
blobStartDefault = start;
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index f4cb16a018ce..6f05068d8302 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -43,10 +43,12 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.VectorStoreUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -156,7 +158,17 @@ public static void validateTableSchema(TableSchema schema) {
FileFormat fileFormat =
FileFormat.fromIdentifier(options.formatType(), new Options(schema.options()));
- fileFormat.validateDataFields(BlobType.splitBlob(new RowType(schema.fields())).getLeft());
+ if (VectorStoreUtils.isDifferentFormat(
+ FileFormat.vectorStoreFileFormat(options), fileFormat)) {
+ fileFormat.validateDataFields(
+ VectorStoreUtils.splitVectorStore(
+ BlobType.splitBlob(new RowType(schema.fields())).getLeft(),
+ options.vectorStoreFieldNames())
+ .getLeft());
+ } else {
+ fileFormat.validateDataFields(
+ BlobType.splitBlob(new RowType(schema.fields())).getLeft());
+ }
// Check column names in schema
schema.fieldNames()
@@ -617,6 +629,36 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
!schema.partitionKeys().contains(blobNames.get(0)),
"The BLOB type column can not be part of partition keys.");
}
+
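+ // Vector-store columns are only allowed on data-evolution tables, must be regular
+ // (non-BLOB, non-partition) columns, and must leave at least one normal column behind.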
+ FileFormat vectorStoreFileFormat = FileFormat.vectorStoreFileFormat(options);
+ if (VectorStoreUtils.isDifferentFormat(
+ vectorStoreFileFormat, FileFormat.fileFormat(options))) {
+ List<String> vectorStoreNames = options.vectorStoreFieldNames();
+ List<String> nonBlobNames =
+ BlobType.splitBlob(schema.logicalRowType()).getLeft().getFieldNames();
+ checkArgument(
+ blobNames.stream().noneMatch(vectorStoreNames::contains),
+ "The vector-store columns can not be blob type.");
+ checkArgument(
+ new HashSet<>(nonBlobNames).containsAll(vectorStoreNames),
+ "Some of the columns specified as vector-store are unknown.");
+ checkArgument(
+ schema.partitionKeys().stream().noneMatch(vectorStoreNames::contains),
+ "The vector-store columns can not be part of partition keys.");
+ checkArgument(
+ nonBlobNames.size() > vectorStoreNames.size(),
+ "Table with vector-store must have other normal columns.");
+ checkArgument(
+ options.dataEvolutionEnabled(),
+ "Data evolution config must enabled for table with vector-store file format.");
+
+ RowType vectorStoreRowType =
+ VectorStoreUtils.splitVectorStore(
+ BlobType.splitBlob(schema.logicalRowType()).getLeft(),
+ vectorStoreNames)
+ .getRight();
+ vectorStoreFileFormat.validateDataFields(vectorStoreRowType);
+ }
}
private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/VectorStoreUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/VectorStoreUtils.java
new file mode 100644
index 000000000000..7e8c474a776e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/VectorStoreUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Utils for vector-store table. */
+public class VectorStoreUtils {
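+ /**
+ * Returns true if a dedicated vector-store format is configured and it differs from the
+ * normal data file format.
+ */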
+ public static boolean isDifferentFormat(FileFormat vectorStoreFormat, FileFormat normalFormat) {
+ return (vectorStoreFormat != null)
+ && !vectorStoreFormat
+ .getFormatIdentifier()
+ .equals(normalFormat.getFormatIdentifier());
+ }
+
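+ /** Whether the file name denotes a vector-store file (written with a ".vector-store" infix). */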
+ public static boolean isVectorStoreFile(String fileName) {
+ return fileName.contains(".vector-store.");
+ }
+
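+ /**
+ * Splits a row type into a pair of (normal fields, vector-store fields) according to the
+ * given vector-store field names.
+ */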
+ public static Pair<RowType, RowType> splitVectorStore(
+ RowType rowType, List<String> vectorStoreFieldNames) {
+ List<DataField> allFields = rowType.getFields();
+ List<DataField> normalFields = new ArrayList<>();
+ List<DataField> vectorStoreFields = new ArrayList<>();
+
+ for (DataField field : allFields) {
+ if (vectorStoreFieldNames.contains(field.name())) {
+ vectorStoreFields.add(field);
+ } else {
+ normalFields.add(field);
+ }
+ }
+
+ return Pair.of(new RowType(normalFields), new RowType(vectorStoreFields));
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index b3470cfb4522..4af226487695 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -73,6 +73,7 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@@ -575,6 +576,34 @@ public void testNonSpillable() throws Exception {
writer.close();
}
+ @Test
+ public void testVectorStoreSameFormatUsesRowDataWriter() throws Exception {
+ FileFormat format = FileFormat.fromIdentifier(AVRO, new Options());
+ AppendOnlyWriter writer =
+ createVectorStoreWriter(1024 * 1024L, format, Collections.singletonList("name"));
+ writer.write(row(1, "AAA", PART));
+ CommitIncrement increment = writer.prepareCommit(true);
+ writer.close();
+
+ assertThat(increment.newFilesIncrement().newFiles()).hasSize(1);
+ DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
+ assertThat(meta.fileName()).doesNotContain(".vector-store");
+ }
+
+ @Test
+ public void testAllVectorStoreColumnsUseVectorStorePath() throws Exception {
+ FileFormat format = FileFormat.fromIdentifier(AVRO, new Options());
+ AppendOnlyWriter writer =
+ createVectorStoreWriter(1024 * 1024L, format, Arrays.asList("id", "name", "dt"));
+ writer.write(row(1, "AAA", PART));
+ CommitIncrement increment = writer.prepareCommit(true);
+ writer.close();
+
+ assertThat(increment.newFilesIncrement().newFiles()).hasSize(1);
+ DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
+ assertThat(meta.fileName()).contains(".vector-store");
+ }
+
private SimpleColStats initStats(Integer min, Integer max, long nullCount) {
return new SimpleColStats(min, max, nullCount);
}
@@ -665,6 +694,58 @@ private Pair> createWriter(
boolean hasIoManager,
List scannedFiles,
CountDownLatch latch) {
+ Map<String, String> options = new HashMap<>();
+ options.put("metadata.stats-mode", "truncate(16)");
+ return createWriterBase(
+ targetFileSize,
+ null,
+ Collections.emptyList(),
+ forceCompact,
+ useWriteBuffer,
+ spillable,
+ hasIoManager,
+ scannedFiles,
+ compactBefore -> {
+ latch.await();
+ return compactBefore.isEmpty()
+ ? Collections.emptyList()
+ : Collections.singletonList(generateCompactAfter(compactBefore));
+ },
+ options);
+ }
+
+ private AppendOnlyWriter createVectorStoreWriter(
+ long targetFileSize,
+ FileFormat vectorStoreFileFormat,
+ List<String> vectorStoreFieldNames) {
+ Map<String, String> options = new HashMap<>();
+ options.put("metadata.stats-mode", "truncate(16)");
+ options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ return createWriterBase(
+ targetFileSize,
+ vectorStoreFileFormat,
+ vectorStoreFieldNames,
+ false,
+ true,
+ false,
+ true,
+ Collections.emptyList(),
+ compactBefore -> Collections.emptyList(),
+ options)
+ .getKey();
+ }
+
+ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriterBase(
+ long targetFileSize,
+ FileFormat vectorStoreFileFormat,
+ List<String> vectorStoreFieldNames,
+ boolean forceCompact,
+ boolean useWriteBuffer,
+ boolean spillable,
+ boolean hasIoManager,
+ List<DataFileMeta> scannedFiles,
+ BucketedAppendCompactManager.CompactRewriter rewriter,
+ Map<String, String> optionsMap) {
FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options());
LinkedList toCompact = new LinkedList<>(scannedFiles);
BucketedAppendCompactManager compactManager =
@@ -677,22 +758,18 @@ private Pair> createWriter(
targetFileSize,
targetFileSize / 10 * 7,
false,
- compactBefore -> {
- latch.await();
- return compactBefore.isEmpty()
- ? Collections.emptyList()
- : Collections.singletonList(
- generateCompactAfter(compactBefore));
- },
+ rewriter,
null);
- CoreOptions options =
- new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
+ CoreOptions options = new CoreOptions(optionsMap);
AppendOnlyWriter writer =
new AppendOnlyWriter(
LocalFileIO.create(),
hasIoManager ? IOManager.create(tempDir.toString()) : null,
SCHEMA_ID,
fileFormat,
+ vectorStoreFileFormat,
+ vectorStoreFieldNames,
+ targetFileSize,
targetFileSize,
targetFileSize,
AppendOnlyWriterTest.SCHEMA,
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 3045f33b1664..6b7d94679863 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -38,7 +38,9 @@
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.UriReader;
@@ -52,6 +54,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -105,7 +108,8 @@ public void testBasic() throws Exception {
.collect(Collectors.toList());
List fieldGroups =
- DataEvolutionSplitRead.splitFieldBunches(filesMetas, key -> 0);
+ DataEvolutionSplitRead.splitFieldBunches(
+ filesMetas, key -> makeBlobRowType(key.writeCols(), f -> 0));
assertThat(fieldGroups.size()).isEqualTo(2);
assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
@@ -155,7 +159,8 @@ public void testMultiBatch() throws Exception {
assertThat(batches.size()).isEqualTo(2);
for (List batch : batches) {
List fieldGroups =
- DataEvolutionSplitRead.splitFieldBunches(batch, file -> 0);
+ DataEvolutionSplitRead.splitFieldBunches(
+ batch, file -> makeBlobRowType(file.writeCols(), f -> 0));
assertThat(fieldGroups.size()).isEqualTo(2);
assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
assertThat(fieldGroups.get(1).files().size()).isEqualTo(10);
@@ -249,6 +254,20 @@ protected InternalRow dataDefault(int time, int size) {
RANDOM.nextInt(), BinaryString.fromBytes(randomBytes()), new BlobData(blobBytes));
}
+ private static RowType makeBlobRowType(
+ List<String> fieldNames, Function<String, Integer> fieldIdFunc) {
+ List<DataField> fields = new ArrayList<>();
+ if (fieldNames == null) {
+ fieldNames = Collections.emptyList();
+ }
+ for (String fieldName : fieldNames) {
+ int fieldId = fieldIdFunc.apply(fieldName);
+ DataField blobField = new DataField(fieldId, fieldName, DataTypes.BLOB());
+ fields.add(blobField);
+ }
+ return new RowType(fields);
+ }
+
@Override
protected byte[] randomBytes() {
byte[] binary = new byte[2 * 1024 * 124];
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterTest.java
similarity index 95%
rename from paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
rename to paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterTest.java
index 0afe95eef066..113bc3f86ed9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterTest.java
@@ -43,13 +43,14 @@
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link RollingBlobFileWriter}. */
-public class RollingBlobFileWriterTest {
+/** Tests for {@link DataEvolutionRollingFileWriter}. */
+public class DataEvolutionRollingFileWriterTest {
private static final RowType SCHEMA =
RowType.builder()
@@ -64,7 +65,7 @@ public class RollingBlobFileWriterTest {
@TempDir java.nio.file.Path tempDir;
- private RollingBlobFileWriter writer;
+ private DataEvolutionRollingFileWriter writer;
private DataFilePathFactory pathFactory;
private LongCounter seqNumCounter;
private byte[] testBlobData;
@@ -91,10 +92,13 @@ public void setUp() throws IOException {
// Initialize the writer
writer =
- new RollingBlobFileWriter(
+ new DataEvolutionRollingFileWriter(
fileIO,
SCHEMA_ID,
FileFormat.fromIdentifier("parquet", new Options()),
+ null,
+ Collections.emptyList(),
+ TARGET_FILE_SIZE,
TARGET_FILE_SIZE,
TARGET_FILE_SIZE,
SCHEMA,
@@ -182,13 +186,16 @@ public void testBlobTargetFileSize() throws IOException {
long blobTargetFileSize = 500 * 1024 * 1024L; // 2 MB for blob files
// Create a new writer with different blob target file size
- RollingBlobFileWriter blobSizeTestWriter =
- new RollingBlobFileWriter(
+ DataEvolutionRollingFileWriter blobSizeTestWriter =
+ new DataEvolutionRollingFileWriter(
LocalFileIO.create(),
SCHEMA_ID,
FileFormat.fromIdentifier("parquet", new Options()),
+ null,
+ Collections.emptyList(),
128 * 1024 * 1024,
blobTargetFileSize, // Different blob target size
+ 128 * 1024 * 1024,
SCHEMA,
new DataFilePathFactory(
new Path(tempDir + "/blob-size-test"),
@@ -267,13 +274,16 @@ public void testSchemaValidation() throws IOException {
void testBlobFileNameFormatWithSharedUuid() throws IOException {
long blobTargetFileSize = 2 * 1024 * 1024L; // 2 MB for blob files
- RollingBlobFileWriter fileNameTestWriter =
- new RollingBlobFileWriter(
+ DataEvolutionRollingFileWriter fileNameTestWriter =
+ new DataEvolutionRollingFileWriter(
LocalFileIO.create(),
SCHEMA_ID,
FileFormat.fromIdentifier("parquet", new Options()),
+ null,
+ Collections.emptyList(),
128 * 1024 * 1024,
blobTargetFileSize,
+ 128 * 1024 * 1024,
SCHEMA,
pathFactory, // Use the same pathFactory to ensure shared UUID
() -> new LongCounter(),
@@ -346,13 +356,16 @@ void testBlobFileNameFormatWithSharedUuid() throws IOException {
void testBlobFileNameFormatWithSharedUuidNonDescriptorMode() throws IOException {
long blobTargetFileSize = 2 * 1024 * 1024L; // 2 MB for blob files
- RollingBlobFileWriter fileNameTestWriter =
- new RollingBlobFileWriter(
+ DataEvolutionRollingFileWriter fileNameTestWriter =
+ new DataEvolutionRollingFileWriter(
LocalFileIO.create(),
SCHEMA_ID,
FileFormat.fromIdentifier("parquet", new Options()),
+ null,
+ Collections.emptyList(),
128 * 1024 * 1024,
blobTargetFileSize,
+ 128 * 1024 * 1024,
SCHEMA,
pathFactory, // Use the same pathFactory to ensure shared UUID
() -> new LongCounter(),
@@ -565,10 +578,13 @@ void testBlobStatsSchemaWithCustomColumnName() throws IOException {
// Reinitialize writer with custom schema
writer =
- new RollingBlobFileWriter(
+ new DataEvolutionRollingFileWriter(
LocalFileIO.create(),
SCHEMA_ID,
FileFormat.fromIdentifier("parquet", new Options()),
+ null,
+ Collections.emptyList(),
+ TARGET_FILE_SIZE,
TARGET_FILE_SIZE,
TARGET_FILE_SIZE,
customSchema, // Use custom schema
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java
new file mode 100644
index 000000000000..04c7119b08f1
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.StatsCollectorFactories;
+import org.apache.paimon.utils.VectorStoreUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DataEvolutionRollingFileWriter} with vector-store. */
+public class DataEvolutionRollingFileWriterWithVectorStoreTest {
+
+ private static final int VECTOR_DIM = 10;
+ private static final RowType SCHEMA =
+ RowType.builder()
+ .field("f0", DataTypes.INT())
+ .field("f1", DataTypes.STRING())
+ .field("f2", DataTypes.BLOB())
+ .field("f3", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT()))
+ .field("f4", DataTypes.INT())
+ .build();
+
+ private static final long TARGET_FILE_SIZE = 2 * 1024 * 1024L; // 2 MB
+ private static final long VECTOR_STORE_TARGET_FILE_SIZE = 4 * 1024 * 1024L; // 4 MB
+ private static final long SCHEMA_ID = 1L;
+ private static final String COMPRESSION = "none";
+ private static final Random RANDOM = new Random(System.currentTimeMillis());
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private DataEvolutionRollingFileWriter writer;
+ private DataFilePathFactory pathFactory;
+ private LongCounter seqNumCounter;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ // Setup file system and path factory
+ LocalFileIO fileIO = LocalFileIO.create();
+ pathFactory =
+ new DataFilePathFactory(
+ new Path(tempDir + "/bucket-0"),
+ "parquet",
+ "data-", // dataFilePrefix should include the hyphen to match expected
+ // format: data-{uuid}-{count}
+ "changelog",
+ false,
+ null,
+ null);
+ seqNumCounter = new LongCounter();
+
+ // Initialize the writer
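+        // The writer takes the main file format (parquet) and a separate vector-store format
+        // (json) for the fields "f3" and "f4", followed by the main, blob, and vector-store
+        // target file sizes (in that order).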
+ writer =
+ new DataEvolutionRollingFileWriter(
+ fileIO,
+ SCHEMA_ID,
+ FileFormat.fromIdentifier("parquet", new Options()),
+ FileFormat.fromIdentifier("json", new Options()),
+ Arrays.asList("f3", "f4"),
+ TARGET_FILE_SIZE,
+ TARGET_FILE_SIZE,
+ VECTOR_STORE_TARGET_FILE_SIZE,
+ SCHEMA,
+ pathFactory,
+ () -> seqNumCounter,
+ COMPRESSION,
+ new StatsCollectorFactories(new CoreOptions(new Options())),
+ new FileIndexOptions(),
+ FileSource.APPEND,
+ false,
+ false,
+ null);
+ }
+
+ @Test
+ public void testBasicWriting() throws IOException {
+ // Write a single row
+ writer.write(makeRows(1, 10).get(0));
+ assertThat(writer.recordCount()).isEqualTo(1);
+ }
+
+ @Test
+ public void testMultipleWrites() throws Exception {
+ // Write multiple rows
+ int rowNum = RANDOM.nextInt(64) + 1;
+ writer.write(makeRows(rowNum, 10).iterator());
+ writer.close();
+        List<DataFileMeta> metasResult = writer.result();
+
+        // Blob data is small, so there are exactly 3 files: normal, blob, and vector-store.
+        assertThat(metasResult.size()).isEqualTo(3);
+ assertThat(metasResult.get(0).fileFormat()).isEqualTo("parquet");
+ assertThat(metasResult.get(1).fileFormat()).isEqualTo("blob");
+ assertThat(metasResult.get(2).fileFormat()).isEqualTo("json");
+ assertThat(writer.recordCount()).isEqualTo(rowNum);
+
+ assertThat(metasResult.get(0).rowCount()).isEqualTo(metasResult.get(1).rowCount());
+ assertThat(metasResult.get(0).rowCount()).isEqualTo(metasResult.get(2).rowCount());
+ }
+
+ @Test
+ public void testVectorStoreTargetFileSize() throws Exception {
+        // 100k rows produce 1 normal file, 1 blob file, and 3 vector-store files
+ int rowNum = 100 * 1000;
+ writer.write(makeRows(rowNum, 1).iterator());
+ writer.close();
+        List<DataFileMeta> results = writer.result();
+
+ // Verify that we have multiple files due to rolling
+ assertThat(results.size()).isGreaterThan(1);
+
+ // Check that vector-store files meet the target size requirement
+        List<DataFileMeta> vectorStoreFiles =
+ results.stream()
+ .filter(file -> VectorStoreUtils.isVectorStoreFile(file.fileName()))
+ .collect(java.util.stream.Collectors.toList());
+
+ assertThat(vectorStoreFiles.size()).isEqualTo(3);
+
+ // Verify that vector-store files are close to the target size (within reasonable tolerance)
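+        // The last rolled file only holds the remaining rows, so it is excluded from the check.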
+ for (DataFileMeta file : vectorStoreFiles.subList(0, vectorStoreFiles.size() - 1)) {
+ long fileSize = file.fileSize();
+ assertThat(fileSize)
+ .as("Vector-store file size should be close to target size")
+ .isGreaterThanOrEqualTo(VECTOR_STORE_TARGET_FILE_SIZE)
+ .isLessThanOrEqualTo(VECTOR_STORE_TARGET_FILE_SIZE + 256 * 1024);
+ }
+
+ // Verify total record count
+ assertThat(writer.recordCount()).isEqualTo(rowNum);
+ }
+
+ @Test
+ void testVectorStoreFileNameFormatWithSharedUuid() throws Exception {
+        // 100k rows produce 1 normal file, 1 blob file, and 3 vector-store files
+ int rowNum = 100 * 1000;
+ writer.write(makeRows(rowNum, 1).iterator());
+ writer.close();
+        List<DataFileMeta> results = writer.result();
+
+ // Get uuid from vector-store files. The pattern is data-{uuid}-{count}.vector-store.json
+ DataFileMeta oneVectorStoreFile =
+ results.stream()
+ .filter(file -> VectorStoreUtils.isVectorStoreFile(file.fileName()))
+ .findAny()
+ .get();
+ String uuidAndCnt = oneVectorStoreFile.fileName().split(".vector-store.")[0];
+ String prefix = uuidAndCnt.substring(0, uuidAndCnt.lastIndexOf('-') + 1); // data-{uuid}-
+
+ // Verify all files use the same UUID and have sequential counters
+ for (int i = 0; i < results.size(); ++i) {
+ String fileName = results.get(i).fileName();
+ assertThat(fileName).as("All files should use the same UUID").startsWith(prefix);
+ int counter = Integer.parseInt(fileName.substring(prefix.length()).split("\\.")[0]);
+ assertThat(counter).as("File counter should be sequential").isEqualTo(i);
+ }
+ }
+
+ @Test
+ void testVectorStoreStatsMainPart() throws Exception {
+ // Write multiple rows
+ int rowNum = RANDOM.nextInt(64) + 1;
+ writer.write(makeRows(rowNum, 10).iterator());
+ writer.close();
+        List<DataFileMeta> metasResult = writer.result();
+
+ // Check row count
+ for (DataFileMeta file : metasResult) {
+ assertThat(file.rowCount()).isEqualTo(rowNum);
+            assertThat(file.deleteRowCount().get()).isEqualTo(0); // There are no deleted rows
+ }
+
+ // Check statistics
+ for (DataFileMeta file : metasResult) {
+ if (BlobFileFormat.isBlobFile(file.fileName())) {
+ assertThat(file.writeCols()).isEqualTo(Collections.singletonList("f2"));
+ } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) {
+ assertThat(file.writeCols()).isEqualTo(Arrays.asList("f3", "f4"));
+ // Json does not implement createStatsExtractor so we skip it here.
+ // assertThat(file.valueStats().minValues().getInt(1)).isGreaterThan(0);
+ } else {
+ assertThat(file.writeCols()).isEqualTo(Arrays.asList("f0", "f1"));
+ assertThat(file.valueStats().minValues().getInt(0)).isEqualTo(0);
+ assertThat(file.valueStats().maxValues().getInt(0)).isEqualTo(rowNum - 1);
+ }
+ }
+
+ // Verify total record count
+ assertThat(writer.recordCount()).isEqualTo(rowNum);
+ }
+
+ @Test
+ void testVectorStoreStatsVectorStorePart() throws Exception {
+        // This time we set parquet as the vector-store file format.
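+        // Unlike json, parquet provides a stats extractor, so the min/max stats of the
+        // vector-store part can be verified below.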
+ RowType schema =
+ RowType.builder()
+ .field("f0", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT()))
+ .field("f1", DataTypes.INT())
+ .field("f2", DataTypes.BLOB())
+ .field("f3", DataTypes.INT())
+ .field("f4", DataTypes.STRING())
+ .build();
+ writer =
+ new DataEvolutionRollingFileWriter(
+ LocalFileIO.create(),
+ SCHEMA_ID,
+ FileFormat.fromIdentifier("json", new Options()),
+ FileFormat.fromIdentifier("parquet", new Options()),
+ Arrays.asList("f3", "f4"),
+ TARGET_FILE_SIZE,
+ TARGET_FILE_SIZE,
+ VECTOR_STORE_TARGET_FILE_SIZE,
+ schema,
+ pathFactory,
+ () -> seqNumCounter,
+ COMPRESSION,
+ new StatsCollectorFactories(new CoreOptions(new Options())),
+ new FileIndexOptions(),
+ FileSource.APPEND,
+ false,
+ false,
+ null);
+
+ // Write multiple rows
+ int rowNum = RANDOM.nextInt(64) + 1;
+        List<InternalRow> rows = makeRows(rowNum, 10);
+ for (InternalRow row : rows) {
+ writer.write(
+ GenericRow.of(
+ row.getVector(3),
+ row.getInt(4),
+ row.getBlob(2),
+ row.getInt(0),
+ row.getString(1)));
+ }
+ writer.close();
+        List<DataFileMeta> metasResult = writer.result();
+
+ // Check row count
+ for (DataFileMeta file : metasResult) {
+ assertThat(file.rowCount()).isEqualTo(rowNum);
+            assertThat(file.deleteRowCount().get()).isEqualTo(0); // There are no deleted rows
+ }
+
+ // Check statistics
+ for (DataFileMeta file : metasResult) {
+ if (BlobFileFormat.isBlobFile(file.fileName())) {
+ assertThat(file.writeCols()).isEqualTo(Collections.singletonList("f2"));
+ } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) {
+ assertThat(file.writeCols()).isEqualTo(Arrays.asList("f3", "f4"));
+ assertThat(file.valueStats().minValues().getInt(0)).isEqualTo(0);
+ assertThat(file.valueStats().maxValues().getInt(0)).isEqualTo(rowNum - 1);
+ } else {
+ assertThat(file.writeCols()).isEqualTo(Arrays.asList("f0", "f1"));
+ // Json does not implement createStatsExtractor so we skip it here.
+ // assertThat(file.valueStats().minValues().getInt(1)).isGreaterThan(0);
+ }
+ }
+
+ // Verify total record count
+ assertThat(writer.recordCount()).isEqualTo(rowNum);
+ }
+
+ @Test
+ public void testVectorStoreNoBlob() throws Exception {
+ RowType schema =
+ RowType.builder()
+ .field("f0", DataTypes.INT())
+ .field("f1", DataTypes.STRING())
+ .field("f2", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT()))
+ .field("f3", DataTypes.INT())
+ .build();
+ writer =
+ new DataEvolutionRollingFileWriter(
+ LocalFileIO.create(),
+ SCHEMA_ID,
+ FileFormat.fromIdentifier("parquet", new Options()),
+ FileFormat.fromIdentifier("json", new Options()),
+ Arrays.asList("f2", "f3"),
+ TARGET_FILE_SIZE,
+ TARGET_FILE_SIZE,
+ VECTOR_STORE_TARGET_FILE_SIZE,
+ schema,
+ pathFactory,
+ () -> seqNumCounter,
+ COMPRESSION,
+ new StatsCollectorFactories(new CoreOptions(new Options())),
+ new FileIndexOptions(),
+ FileSource.APPEND,
+ false,
+ false,
+ null);
+
+        // 100k rows produce 1 normal file and 3 vector-store files
+ int rowNum = 100 * 1000;
+        List<InternalRow> rows = makeRows(rowNum, 1);
+ for (InternalRow row : rows) {
+ writer.write(
+ GenericRow.of(
+ row.getInt(0), row.getString(1), row.getVector(3), row.getInt(4)));
+ }
+ writer.close();
+        List<DataFileMeta> results = writer.result();
+
+ // Check normal, blob, and vector-store files
+        List<DataFileMeta> normalFiles = new ArrayList<>();
+        List<DataFileMeta> blobFiles = new ArrayList<>();
+        List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
+ for (DataFileMeta file : results) {
+ if (BlobFileFormat.isBlobFile(file.fileName())) {
+ blobFiles.add(file);
+ } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) {
+ vectorStoreFiles.add(file);
+ } else {
+ normalFiles.add(file);
+ }
+ }
+ assertThat(normalFiles.size()).isEqualTo(1);
+ assertThat(blobFiles.size()).isEqualTo(0);
+ assertThat(vectorStoreFiles.size()).isEqualTo(3);
+
+ // Verify total record count
+ assertThat(writer.recordCount()).isEqualTo(rowNum);
+ }
+
+ @Test
+ public void testVectorStoreTheSameFormat() throws Exception {
+        // The vector-store file format is the same as the main format.
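+        // Expectation: when both formats match, the vector-store fields are written into the
+        // main data files and no separate vector-store files are produced.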
+ writer =
+ new DataEvolutionRollingFileWriter(
+ LocalFileIO.create(),
+ SCHEMA_ID,
+ FileFormat.fromIdentifier("json", new Options()),
+ FileFormat.fromIdentifier("json", new Options()),
+ Arrays.asList("f3", "f4"),
+ TARGET_FILE_SIZE,
+ TARGET_FILE_SIZE,
+ VECTOR_STORE_TARGET_FILE_SIZE,
+ SCHEMA,
+ pathFactory,
+ () -> seqNumCounter,
+ COMPRESSION,
+ new StatsCollectorFactories(new CoreOptions(new Options())),
+ new FileIndexOptions(),
+ FileSource.APPEND,
+ false,
+ false,
+ null);
+
+        // This time we write large blob values so that the blob writer rolls over
+ int rowNum = 10;
+ writer.write(makeRows(rowNum, 512 * 1024).iterator());
+ writer.close();
+        List<DataFileMeta> results = writer.result();
+
+ // Check normal, blob, and vector-store files
+        List<DataFileMeta> normalFiles = new ArrayList<>();
+        List<DataFileMeta> blobFiles = new ArrayList<>();
+        List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
+ for (DataFileMeta file : results) {
+ if (BlobFileFormat.isBlobFile(file.fileName())) {
+ blobFiles.add(file);
+ } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) {
+ vectorStoreFiles.add(file);
+ } else {
+ assertThat(file.writeCols()).isEqualTo(Arrays.asList("f0", "f1", "f3", "f4"));
+ normalFiles.add(file);
+ }
+ }
+ assertThat(normalFiles.size()).isEqualTo(1);
+ assertThat(blobFiles.size()).isEqualTo(3);
+ assertThat(vectorStoreFiles.size()).isEqualTo(0);
+
+ // Verify total record count
+ assertThat(writer.recordCount()).isEqualTo(rowNum);
+ }
+
+    private List<InternalRow> makeRows(int rowNum, int blobDataSize) {
+        List<InternalRow> rows = new ArrayList<>(rowNum);
+ byte[] blobData = new byte[blobDataSize];
+ RANDOM.nextBytes(blobData);
+ for (int i = 0; i < rowNum; ++i) {
+ byte[] string = new byte[1];
+ RANDOM.nextBytes(string);
+ byte[] buf = new byte[VECTOR_DIM];
+ RANDOM.nextBytes(buf);
+ float[] vector = new float[VECTOR_DIM];
+ for (int j = 0; j < VECTOR_DIM; ++j) {
+ vector[j] = buf[j];
+ }
+ int label = RANDOM.nextInt(32) + 1;
+ rows.add(
+ GenericRow.of(
+ i,
+ BinaryString.fromBytes(string),
+ new BlobData(blobData),
+ BinaryVector.fromPrimitiveArray(vector),
+ label));
+ }
+ return rows;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
index 97f621a72024..4b6d1c579a9c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
@@ -62,9 +62,7 @@ public void testBasic() throws Exception {
.collect(Collectors.toList());
RowType rowType = table.schema().logicalRowType();
-        List<FieldBunch> fieldGroups =
-                splitFieldBunches(
-                        filesMetas, file -> rowType.getField(file.writeCols().get(0)).id());
+        List<FieldBunch> fieldGroups = splitFieldBunches(filesMetas, file -> rowType);
assertThat(fieldGroups.size()).isEqualTo(3);
assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java
new file mode 100644
index 000000000000..8bc7bad2d8c4
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.DataEvolutionSplitRead;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for table with vector-store and data evolution. */
+public class VectorStoreTableTest extends TableTestBase {
+
+ private static final int VECTOR_DIM = 10;
+
+ private AtomicInteger uniqueIdGen = new AtomicInteger(0);
+
+    private Map<Integer, InternalRow> rowsWritten = new HashMap<>();
+
+ @Test
+ public void testBasic() throws Exception {
+ int rowNum = RANDOM.nextInt(64) + 1;
+
+ createTableDefault();
+
+ commitDefault(writeDataDefault(rowNum, 1));
+
+ AtomicInteger counter = new AtomicInteger(0);
+
+        List<DataFileMeta> filesMetas =
+ getTableDefault().store().newScan().plan().files().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+
+        List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
+ DataEvolutionSplitRead.splitFieldBunches(
+ filesMetas, key -> makeBlobRowType(key.writeCols(), f -> 0));
+
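+        // A single commit yields three field bunches: the normal columns (f0, f1), the blob
+        // column (f2), and the vector-store columns (f3, f4).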
+ assertThat(fieldGroups.size()).isEqualTo(3);
+ assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(1).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(2).files().size()).isEqualTo(1);
+
+ readDefault(
+ row -> {
+ counter.getAndIncrement();
+ InternalRow expected = rowsWritten.get(row.getInt(0));
+ assertThat(row.getString(1)).isEqualTo(expected.getString(1));
+ assertThat(row.getBlob(2)).isEqualTo(expected.getBlob(2));
+ assertThat(row.getVector(3).toFloatArray())
+ .isEqualTo(expected.getVector(3).toFloatArray());
+ assertThat(row.getInt(4)).isEqualTo(expected.getInt(4));
+ });
+
+ assertThat(counter.get()).isEqualTo(rowNum);
+ }
+
+ @Test
+ public void testMultiBatch() throws Exception {
+ int rowNum = (RANDOM.nextInt(64) + 1) * 2;
+
+ createTableDefault();
+
+ commitDefault(writeDataDefault(rowNum / 2, 2));
+
+ AtomicInteger counter = new AtomicInteger(0);
+
+        List<DataFileMeta> filesMetas =
+ getTableDefault().store().newScan().plan().files().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+
+        List<List<DataFileMeta>> batches = DataEvolutionSplitRead.mergeRangesAndSort(filesMetas);
+        assertThat(batches.size()).isEqualTo(2);
+        for (List<DataFileMeta> batch : batches) {
+            List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
+ DataEvolutionSplitRead.splitFieldBunches(
+ batch, file -> makeBlobRowType(file.writeCols(), f -> 0));
+ assertThat(fieldGroups.size()).isEqualTo(3);
+ assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(1).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(2).files().size()).isEqualTo(1);
+ }
+
+ readDefault(
+ row -> {
+ counter.getAndIncrement();
+ InternalRow expected = rowsWritten.get(row.getInt(0));
+ assertThat(row.getString(1)).isEqualTo(expected.getString(1));
+ assertThat(row.getBlob(2)).isEqualTo(expected.getBlob(2));
+ assertThat(row.getVector(3).toFloatArray())
+ .isEqualTo(expected.getVector(3).toFloatArray());
+ assertThat(row.getInt(4)).isEqualTo(expected.getInt(4));
+ });
+ assertThat(counter.get()).isEqualTo(rowNum);
+ }
+
+ @Test
+ public void testRolling() throws Exception {
+        // 100k rows per commit produce 1 normal file, 1 blob file, and 3 vector-store files
+ int rowNum = 100 * 1000 * 3;
+
+ createTableDefault();
+
+ commitDefault(writeDataDefault(rowNum / 3, 3));
+
+ AtomicInteger counter = new AtomicInteger(0);
+
+        List<DataFileMeta> filesMetas =
+ getTableDefault().store().newScan().plan().files().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+
+        List<List<DataFileMeta>> batches = DataEvolutionSplitRead.mergeRangesAndSort(filesMetas);
+        assertThat(batches.size()).isEqualTo(3);
+        for (List<DataFileMeta> batch : batches) {
+            List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
+ DataEvolutionSplitRead.splitFieldBunches(
+ batch, file -> makeBlobRowType(file.writeCols(), f -> 0));
+ assertThat(fieldGroups.size()).isEqualTo(3);
+ assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(1).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(2).files().size()).isEqualTo(3);
+ }
+
+ readDefault(
+ row -> {
+ counter.getAndIncrement();
+ InternalRow expected = rowsWritten.get(row.getInt(0));
+ assertThat(row.getString(1)).isEqualTo(expected.getString(1));
+ assertThat(row.getBlob(2)).isEqualTo(expected.getBlob(2));
+ assertThat(row.getVector(3).toFloatArray())
+ .isEqualTo(expected.getVector(3).toFloatArray());
+ assertThat(row.getInt(4)).isEqualTo(expected.getInt(4));
+ });
+
+ assertThat(counter.get()).isEqualTo(rowNum);
+ }
+
+ @Test
+ public void testWithoutBlob() throws Exception {
+        // 100k rows per commit produce 1 normal file and 3 vector-store files (no blob column)
+ int rowNum = 100 * 1000 * 3;
+
+ catalog.createTable(identifier(), schemaWithoutBlob(), true);
+
+ commitDefault(writeDataWithoutBlob(rowNum / 3, 3));
+
+ AtomicInteger counter = new AtomicInteger(0);
+
+        List<DataFileMeta> filesMetas =
+ getTableDefault().store().newScan().plan().files().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+
+        List<List<DataFileMeta>> batches = DataEvolutionSplitRead.mergeRangesAndSort(filesMetas);
+        assertThat(batches.size()).isEqualTo(3);
+        for (List<DataFileMeta> batch : batches) {
+            List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
+ DataEvolutionSplitRead.splitFieldBunches(
+ batch, file -> makeBlobRowType(file.writeCols(), f -> 0));
+ assertThat(fieldGroups.size()).isEqualTo(2);
+ assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
+ assertThat(fieldGroups.get(1).files().size()).isEqualTo(3);
+ }
+
+ readDefault(
+ row -> {
+ counter.getAndIncrement();
+ InternalRow expected = rowsWritten.get(row.getInt(0));
+ assertThat(row.getString(1)).isEqualTo(expected.getString(1));
+ assertThat(row.getVector(2).toFloatArray())
+ .isEqualTo(expected.getVector(3).toFloatArray());
+ assertThat(row.getInt(3)).isEqualTo(expected.getInt(4));
+ });
+
+ assertThat(counter.get()).isEqualTo(rowNum);
+ }
+
+ @Override
+ protected Schema schemaDefault() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.column("f3", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT()));
+ schemaBuilder.column("f4", DataTypes.INT());
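+        // Vector-store requires data evolution to be enabled; f3 and f4 go to separate json
+        // vector-store files rolled at roughly 4 MB.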
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "2 MB");
+ schemaBuilder.option(CoreOptions.VECTOR_STORE_TARGET_FILE_SIZE.key(), "4 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.VECTOR_STORE_FIELDS.key(), "f3,f4");
+ schemaBuilder.option(CoreOptions.VECTOR_STORE_FORMAT.key(), "json");
+ schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none");
+ return schemaBuilder.build();
+ }
+
+ private Schema schemaWithoutBlob() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT()));
+ schemaBuilder.column("f3", DataTypes.INT());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "2 MB");
+ schemaBuilder.option(CoreOptions.VECTOR_STORE_TARGET_FILE_SIZE.key(), "4 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.VECTOR_STORE_FIELDS.key(), "f2,f3");
+ schemaBuilder.option(CoreOptions.VECTOR_STORE_FORMAT.key(), "json");
+ schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none");
+ return schemaBuilder.build();
+ }
+
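+    /**
+     * Writes rows produced by {@link #dataDefault}, dropping the blob column to match
+     * {@link #schemaWithoutBlob}.
+     */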
+    protected List<CommitMessage> writeDataWithoutBlob(int size, int times) throws Exception {
+ Table table = getTableDefault();
+        List<CommitMessage> messages = new ArrayList<>();
+ for (int time = 0; time < times; time++) {
+ StreamWriteBuilder builder = table.newStreamWriteBuilder();
+ builder.withCommitUser(commitUser);
+ try (StreamTableWrite streamTableWrite = builder.newWrite()) {
+ for (int j = 0; j < size; j++) {
+ InternalRow row = dataDefault(time, j);
+ streamTableWrite.write(
+ GenericRow.of(
+ row.getInt(0),
+ row.getString(1),
+ row.getVector(3),
+ row.getInt(4)));
+ }
+ messages.addAll(streamTableWrite.prepareCommit(false, Long.MAX_VALUE));
+ }
+ }
+ return messages;
+ }
+
+ @Override
+ protected InternalRow dataDefault(int time, int size) {
+ byte[] stringBytes = new byte[1];
+ RANDOM.nextBytes(stringBytes);
+ byte[] blobBytes = new byte[1];
+ RANDOM.nextBytes(blobBytes);
+ byte[] vectorBytes = new byte[VECTOR_DIM];
+ RANDOM.nextBytes(vectorBytes);
+ float[] vector = new float[VECTOR_DIM];
+ for (int i = 0; i < VECTOR_DIM; i++) {
+ vector[i] = vectorBytes[i];
+ }
+ int id = uniqueIdGen.getAndIncrement();
+ InternalRow row =
+ GenericRow.of(
+ id,
+ BinaryString.fromBytes(stringBytes),
+ new BlobData(blobBytes),
+ BinaryVector.fromPrimitiveArray(vector),
+ RANDOM.nextInt(32) + 1);
+ rowsWritten.put(id, row);
+ return row;
+ }
+
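+    /**
+     * Builds a RowType over the given write columns, using BLOB as a placeholder field type;
+     * these tests only rely on the field names and ids to group files into bunches.
+     */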
+ private static RowType makeBlobRowType(
+            List<String> fieldNames, Function<String, Integer> fieldIdFunc) {
+        List<DataField> fields = new ArrayList<>();
+ if (fieldNames == null) {
+ fieldNames = Collections.emptyList();
+ }
+ for (String fieldName : fieldNames) {
+ int fieldId = fieldIdFunc.apply(fieldName);
+ DataField blobField = new DataField(fieldId, fieldName, DataTypes.BLOB());
+ fields.add(blobField);
+ }
+ return new RowType(fields);
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 1edf91e1d22d..ed314e80292b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -271,6 +271,9 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
IOManager.create(tempDir.toString()),
0,
fileFormat,
+ null,
+ Collections.emptyList(),
+ 10,
10,
10,
schema,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index 452813d9d93c..19d53f45309b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -21,9 +21,12 @@
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.operation.DataEvolutionSplitRead.BlobBunch;
import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
+import org.apache.paimon.operation.DataEvolutionSplitRead.SplitBunch;
import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -32,20 +35,21 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Function;
import static org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link BlobBunch}. */
+/** Tests for {@link SplitBunch}. */
public class DataEvolutionReadTest {
- private BlobBunch blobBunch;
+ private SplitBunch blobBunch;
@BeforeEach
public void setUp() {
- blobBunch = new BlobBunch(Long.MAX_VALUE, false);
+ blobBunch = new SplitBunch(Long.MAX_VALUE, false);
}
@Test
@@ -84,7 +88,7 @@ public void testAddNonBlobFileThrowsException() {
assertThatThrownBy(() -> blobBunch.add(normalFile))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Only blob file can be added to a blob bunch.");
+ .hasMessage("Only blob/vector-store file can be added to this bunch.");
}
@Test
@@ -97,7 +101,7 @@ public void testAddBlobFileWithSameFirstRowId() {
assertThatThrownBy(() -> blobBunch.add(blobEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob file with same first row id should have decreasing sequence number.");
+ "Blob/vector-store file with same first row id should have decreasing sequence number.");
}
@Test
@@ -136,7 +140,7 @@ public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() {
assertThatThrownBy(() -> blobBunch.add(blobEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob file with overlapping row id should have decreasing sequence number.");
+ "Blob/vector-store file with overlapping row id should have decreasing sequence number.");
}
@Test
@@ -148,7 +152,8 @@ public void testAddBlobFileWithNonContinuousRowId() {
// Adding file with non-continuous row id should throw exception
assertThatThrownBy(() -> blobBunch.add(blobEntry2))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Blob file first row id should be continuous, expect 100 but got 200");
+ .hasMessage(
+ "Blob/vector-store file first row id should be continuous, expect 100 but got 200");
}
@Test
@@ -161,7 +166,7 @@ public void testAddBlobFileWithDifferentWriteCols() {
// Adding file with different write columns should throw exception
assertThatThrownBy(() -> blobBunch.add(blobEntry2))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("All files in a blob bunch should have the same write columns.");
+ .hasMessage("All files in this bunch should have the same write columns.");
}
@Test
@@ -214,10 +219,11 @@ public void testComplexBlobBunchScenario2() {
assertThat(batch.get(8).fileName()).contains("blob4"); // skip
assertThat(batch.get(9).fileName()).contains("blob8"); // pick
-        List<FieldBunch> fieldBunches = splitFieldBunches(batch, file -> 0);
+        List<FieldBunch> fieldBunches =
+                splitFieldBunches(batch, file -> makeBlobRowType(file.writeCols(), f -> 0));
assertThat(fieldBunches.size()).isEqualTo(2);
- BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1);
+ SplitBunch blobBunch = (SplitBunch) fieldBunches.get(1);
assertThat(blobBunch.files).hasSize(4);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
@@ -265,17 +271,18 @@ public void testComplexBlobBunchScenario3() {
         List<DataFileMeta> batch = batches.get(0);
         List<FieldBunch> fieldBunches =
- splitFieldBunches(batch, file -> file.writeCols().get(0).hashCode());
+ splitFieldBunches(
+ batch, file -> makeBlobRowType(file.writeCols(), String::hashCode));
assertThat(fieldBunches.size()).isEqualTo(3);
- BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1);
+ SplitBunch blobBunch = (SplitBunch) fieldBunches.get(1);
assertThat(blobBunch.files).hasSize(4);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
- blobBunch = (BlobBunch) fieldBunches.get(2);
+ blobBunch = (SplitBunch) fieldBunches.get(2);
assertThat(blobBunch.files).hasSize(4);
assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
@@ -322,22 +329,22 @@ private DataFileMeta createBlobFileWithCols(
@Test
public void testRowIdPushDown() {
- BlobBunch blobBunch = new BlobBunch(Long.MAX_VALUE, true);
+ SplitBunch blobBunch = new SplitBunch(Long.MAX_VALUE, true);
DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
blobBunch.add(blobEntry1);
- BlobBunch finalBlobBunch = blobBunch;
+ SplitBunch finalBlobBunch = blobBunch;
DataFileMeta finalBlobEntry = blobEntry2;
assertThatCode(() -> finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException();
- blobBunch = new BlobBunch(Long.MAX_VALUE, true);
+ blobBunch = new SplitBunch(Long.MAX_VALUE, true);
blobEntry1 = createBlobFile("blob1", 0, 100, 1);
blobEntry2 = createBlobFile("blob2", 50, 200, 2);
blobBunch.add(blobEntry1);
blobBunch.add(blobEntry2);
assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2);
- BlobBunch finalBlobBunch2 = blobBunch;
+ SplitBunch finalBlobBunch2 = blobBunch;
DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2);
assertThatCode(() -> finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException();
}
@@ -371,4 +378,18 @@ private DataFileMeta createNormalFile(
firstRowId,
null);
}
+
+ private static RowType makeBlobRowType(
+            List<String> fieldNames, Function<String, Integer> fieldIdFunc) {
+        List<DataField> fields = new ArrayList<>();
+ if (fieldNames == null) {
+ fieldNames = Collections.emptyList();
+ }
+ for (String fieldName : fieldNames) {
+ int fieldId = fieldIdFunc.apply(fieldName);
+ DataField blobField = new DataField(fieldId, fieldName, DataTypes.BLOB());
+ fields.add(blobField);
+ }
+ return new RowType(fields);
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java
index 2f0b1b2f1b4d..305c5c2b78f9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java
@@ -115,6 +115,24 @@ public void testSplitWithMultipleBlobFilesPerGroup() {
assertEquals(Arrays.asList(file4, file5, file6), result.get(1));
}
+ @Test
+ public void testSplitWithMultipleVectorStoreFilesPerGroup() {
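+        // Files with overlapping row-id ranges fall into the same batch; within a batch,
+        // files with a higher max sequence number come first (file7 before file1).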
+ DataFileMeta file1 = createFile("file1.parquet", 1L, 10, 1);
+ DataFileMeta file2 = createFile("file2.vector-store.json", 1L, 1, 1);
+ DataFileMeta file3 = createFile("file3.vector-store.json", 2L, 9, 1);
+ DataFileMeta file4 = createFile("file4.parquet", 20L, 10, 2);
+ DataFileMeta file5 = createFile("file5.vector-store.json", 20L, 5, 2);
+ DataFileMeta file6 = createFile("file6.vector-store.json", 25L, 5, 2);
+ DataFileMeta file7 = createFile("file7.parquet", 1L, 10, 3);
+
+        List<DataFileMeta> files = Arrays.asList(file1, file2, file3, file4, file5, file6, file7);
+        List<List<DataFileMeta>> result = DataEvolutionSplitRead.mergeRangesAndSort(files);
+
+ assertEquals(2, result.size());
+ assertEquals(Arrays.asList(file7, file1, file2, file3), result.get(0));
+ assertEquals(Arrays.asList(file4, file5, file6), result.get(1));
+ }
+
private static DataFileMeta createFile(
String name, long firstRowId, long rowCount, long maxSequence) {
return DataFileMeta.create(
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 9b789e246be2..4ced9061c546 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -35,7 +35,10 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.VECTOR_STORE_FIELDS;
+import static org.apache.paimon.CoreOptions.VECTOR_STORE_FORMAT;
import static org.apache.paimon.schema.SchemaValidation.validateTableSchema;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatNoException;
@@ -200,4 +203,142 @@ public void testChainTableAllowsNonDeduplicateMergeEngine() {
assertThatNoException().isThrownBy(() -> validateTableSchema(schema));
}
+
+ @Test
+ public void testVectorStoreUnknownColumn() {
+        Map<String, String> options = new HashMap<>();
+ options.put(BUCKET.key(), String.valueOf(-1));
+ options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ options.put(DATA_EVOLUTION_ENABLED.key(), "true");
+ options.put(CoreOptions.FILE_FORMAT.key(), "avro");
+ options.put(VECTOR_STORE_FORMAT.key(), "json");
+ options.put(VECTOR_STORE_FIELDS.key(), "f99");
+
+        List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.STRING()));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ emptyList(),
+ emptyList(),
+ options,
+ "")))
+ .hasMessage("Some of the columns specified as vector-store are unknown.");
+ }
+
+ @Test
+ public void testVectorStoreContainsBlobColumn() {
+        Map<String, String> options = new HashMap<>();
+ options.put(BUCKET.key(), String.valueOf(-1));
+ options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ options.put(DATA_EVOLUTION_ENABLED.key(), "true");
+ options.put(CoreOptions.FILE_FORMAT.key(), "avro");
+ options.put(VECTOR_STORE_FORMAT.key(), "json");
+ options.put(VECTOR_STORE_FIELDS.key(), "blob");
+
+        List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "blob", DataTypes.BLOB()));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ emptyList(),
+ emptyList(),
+ options,
+ "")))
+ .hasMessage("The vector-store columns can not be blob type.");
+ }
+
+ @Test
+ public void testVectorStoreContainsPartitionColumn() {
+        Map<String, String> options = new HashMap<>();
+ options.put(BUCKET.key(), String.valueOf(-1));
+ options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ options.put(DATA_EVOLUTION_ENABLED.key(), "true");
+ options.put(CoreOptions.FILE_FORMAT.key(), "avro");
+ options.put(VECTOR_STORE_FORMAT.key(), "json");
+ options.put(VECTOR_STORE_FIELDS.key(), "f1");
+
+        List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.STRING()));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ singletonList("f1"),
+ emptyList(),
+ options,
+ "")))
+ .hasMessage("The vector-store columns can not be part of partition keys.");
+ }
+
+ @Test
+ public void testVectorStoreRequireNormalColumns() {
+        Map<String, String> options = new HashMap<>();
+ options.put(BUCKET.key(), String.valueOf(-1));
+ options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ options.put(DATA_EVOLUTION_ENABLED.key(), "true");
+ options.put(CoreOptions.FILE_FORMAT.key(), "avro");
+ options.put(VECTOR_STORE_FORMAT.key(), "json");
+ options.put(VECTOR_STORE_FIELDS.key(), "f0,f1");
+
+        List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.STRING()));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ emptyList(),
+ emptyList(),
+ options,
+ "")))
+ .hasMessage("Table with vector-store must have other normal columns.");
+ }
+
+ @Test
+ public void testVectorStoreRequiresDataEvolutionEnabled() {
+        Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.FILE_FORMAT.key(), "avro");
+ options.put(VECTOR_STORE_FORMAT.key(), "json");
+ options.put(VECTOR_STORE_FIELDS.key(), "f1");
+
+        List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.STRING()));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ emptyList(),
+ emptyList(),
+ options,
+ "")))
+ .hasMessage(
+ "Data evolution config must enabled for table with vector-store file format.");
+ }
}
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkVectorStoreE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkVectorStoreE2eTest.java
new file mode 100644
index 000000000000..be89fbfca155
--- /dev/null
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkVectorStoreE2eTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/** E2E test for vector-store with data evolution. */
+public class FlinkVectorStoreE2eTest extends E2eTestBase {
+
+ @Test
+ public void testVectorStoreTable() throws Exception {
+ Random rnd = new Random(System.currentTimeMillis());
+ int vectorDim = rnd.nextInt(10) + 1;
+ final int itemNum = rnd.nextInt(3) + 1;
+
+ String catalogDdl =
+ String.format(
+ "CREATE CATALOG ts_catalog WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s'\n"
+ + ");",
+ TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store");
+
+ String useCatalogCmd = "USE CATALOG ts_catalog;";
+
+ String createTableDdl =
+ String.format(
+ "CREATE TABLE IF NOT EXISTS ts_table (\n"
+ + " id BIGINT,\n"
+ + " embed ARRAY\n"
+                                + "    embed ARRAY<FLOAT>\n"
+ + " 'file.format' = 'parquet',\n"
+ + " 'file.compression' = 'none',\n"
+ + " 'row-tracking.enabled' = 'true',\n"
+ + " 'data-evolution.enabled' = 'true',\n"
+ + " 'vector.file.format' = 'json',\n"
+ + " 'vector-field' = 'embed',\n"
+ + " 'field.embed.vector-dim' = '%d'\n"
+ + ");",
+ vectorDim);
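+        // 'vector-field' = 'embed' stores the embed column in separate json vector-store files;
+        // vector-store requires data evolution, so row-tracking and data-evolution are enabled.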
+
+ float[][] vectors = new float[itemNum][vectorDim];
+ byte[] vectorDataBuf = new byte[vectorDim];
+ for (int i = 0; i < itemNum; ++i) {
+ vectors[i] = new float[vectorDim];
+ rnd.nextBytes(vectorDataBuf);
+ for (int j = 0; j < vectorDim; ++j) {
+ vectors[i][j] = vectorDataBuf[j];
+ }
+ }
+
+        List<String> values = new ArrayList<>();
+ String[] expected = new String[itemNum];
+ for (int id = 0; id < itemNum; ++id) {
+ values.add(String.format("(%d, %s)", id, arrayLiteral(vectors[id])));
+ expected[id] = String.format("%d, %s", id, Arrays.toString(vectors[id]));
+ }
+
+ runBatchSql(
+ "INSERT INTO ts_table VALUES " + String.join(", ", values) + ";",
+ catalogDdl,
+ useCatalogCmd,
+ createTableDdl);
+
+ runBatchSql(
+ "INSERT INTO result1 SELECT * FROM ts_table;",
+ catalogDdl,
+ useCatalogCmd,
+ createTableDdl,
+                createResultSink("result1", "id BIGINT, embed ARRAY<FLOAT>"));
+ checkResult(expected);
+ clearCurrentResults();
+
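+        // The $files system table should report two data files for the single insert:
+        // one main parquet file and one vector-store json file.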
+ runBatchSql(
+ "INSERT INTO result2 SELECT "
+ + "COUNT(*) AS total, "
+ + "SUM(CASE WHEN file_path LIKE '%.vector-store%.json' THEN 1 ELSE 0 END) "
+ + "AS vector_files "
+ + "FROM \\`ts_table\\$files\\`;",
+ catalogDdl,
+ useCatalogCmd,
+ createTableDdl,
+ createResultSink("result2", "total BIGINT, vector_files BIGINT"));
+ checkResult("2, 1");
+ }
+
+ private String arrayLiteral(float[] vector) {
+ return "ARRAY" + Arrays.toString(vector);
+ }
+}