From 58ab3570e006394757657736be2a1b09f1428a1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 5 Feb 2026 16:08:27 +0800 Subject: [PATCH 1/5] [flink] [spark] Write blob with adaptive match blob-descriptor --- .../apache/paimon/data/BlobDescriptor.java | 34 ++++++++++++++---- .../apache/paimon/utils/UriReaderFactory.java | 3 +- .../paimon/data/BlobDescriptorTest.java | 28 ++++++++++++--- .../resources/compatible/blob_descriptor_v1 | Bin 0 -> 31 bytes .../apache/paimon/flink/FlinkRowWrapper.java | 17 ++++----- .../paimon/flink/action/CompactAction.java | 2 -- .../compact/IncrementalClusterCompact.java | 2 -- .../paimon/flink/sink/FlinkSinkBuilder.java | 9 ++--- .../paimon/spark/SparkInternalRowWrapper.java | 15 ++++---- .../org/apache/paimon/spark/SparkRow.java | 21 +++++------ .../commands/DataEvolutionPaimonWriter.scala | 1 - .../spark/commands/PaimonSparkWriter.scala | 2 -- .../SparkPostponeCompactProcedure.scala | 1 - .../paimon/spark/util/SparkRowUtils.scala | 4 +-- .../write/DataEvolutionTableDataWrite.scala | 3 +- .../paimon/spark/write/PaimonDataWrite.scala | 3 +- .../spark/write/PaimonV2DataWriter.scala | 9 ++--- .../paimon/spark/sql/BlobTestBase.scala | 7 ++-- 18 files changed, 87 insertions(+), 74 deletions(-) create mode 100644 paimon-common/src/test/resources/compatible/blob_descriptor_v1 diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java index 3e60ea34867d..7a0b3bccd2a2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java @@ -45,8 +45,8 @@ public class BlobDescriptor implements Serializable { private static final long serialVersionUID = 1L; - - private static final byte CURRENT_VERSION = 1; + private static final long MAGIC = 0x424C4F4244455343L; // "BLOBDESC" + private static final byte CURRENT_VERSION = 2; private final byte version; private final String uri; @@ -113,11 +113,12 @@ public byte[] serialize() { byte[] uriBytes = uri.getBytes(UTF_8); int uriLength = uriBytes.length; - int totalSize = 1 + 4 + uriLength + 8 + 8; + int totalSize = 1 + 8 + 4 + uriLength + 8 + 8; ByteBuffer buffer = ByteBuffer.allocate(totalSize); buffer.order(ByteOrder.LITTLE_ENDIAN); buffer.put(version); + buffer.putLong(MAGIC); buffer.putInt(uriLength); buffer.put(uriBytes); @@ -130,16 +131,26 @@ public byte[] serialize() { public static BlobDescriptor deserialize(byte[] bytes) { ByteBuffer buffer = ByteBuffer.wrap(bytes); buffer.order(ByteOrder.LITTLE_ENDIAN); - byte version = buffer.get(); - if (version != CURRENT_VERSION) { + if (version > CURRENT_VERSION) { throw new UnsupportedOperationException( - "Expecting BlobDescriptor version to be " + "Expecting BlobDescriptor version to be less than or equal to " + CURRENT_VERSION + ", but found " + version + "."); } + + if (version > 1) { + if (MAGIC != buffer.getLong()) { + throw new IllegalArgumentException( + "Invalid BlobDescriptor: missing magic header. 
Expected magic: " + + MAGIC + + ", but found: " + + buffer.getLong()); + } + } + int uriLength = buffer.getInt(); byte[] uriBytes = new byte[uriLength]; buffer.get(uriBytes); @@ -149,4 +160,15 @@ public static BlobDescriptor deserialize(byte[] bytes) { long length = buffer.getLong(); return new BlobDescriptor(version, uri, offset, length); } + + public static boolean isBlobDescriptor(byte[] bytes) { + if (bytes.length < 8) { + return false; + } + byte[] copy = new byte[8]; + System.arraycopy(bytes, 1, copy, 0, 8); + ByteBuffer magicBuffer = ByteBuffer.wrap(copy); + magicBuffer.order(ByteOrder.LITTLE_ENDIAN); + return MAGIC == magicBuffer.getLong(); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java index 92e3b8e6a842..6e21aee2d236 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java @@ -25,13 +25,14 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.io.Serializable; import java.net.URI; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** A factory to create and cache {@link UriReader}. */ -public class UriReaderFactory { +public class UriReaderFactory implements Serializable { private final CatalogContext context; private final Map readers; diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java index ef860e8c55a7..53911556885e 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java @@ -18,6 +18,8 @@ package org.apache.paimon.data; +import org.apache.paimon.utils.IOUtils; + import org.junit.jupiter.api.Test; import java.lang.reflect.Constructor; @@ -38,7 +40,7 @@ public void testEquals() throws Exception { BlobDescriptor descriptor3 = new BlobDescriptor(uri2, 100L, 200L); BlobDescriptor descriptor4 = new BlobDescriptor(uri1, 150L, 200L); BlobDescriptor descriptor5 = new BlobDescriptor(uri1, 100L, 250L); - BlobDescriptor descriptor6 = createDescriptorWithVersion((byte) 2, uri1, 100L, 200L); + BlobDescriptor descriptor6 = createDescriptorWithVersion((byte) 3, uri1, 100L, 200L); assertThat(descriptor1).isEqualTo(descriptor2); assertThat(descriptor1).isNotEqualTo(descriptor3); assertThat(descriptor1).isNotEqualTo(descriptor4); @@ -64,7 +66,7 @@ public void testToString() { BlobDescriptor descriptor = new BlobDescriptor(uri, 100L, 200L); String toString = descriptor.toString(); - assertThat(toString).contains("version=1"); + assertThat(toString).contains("version=2"); assertThat(toString).contains("uri='/test/path'"); assertThat(toString).contains("offset=100"); assertThat(toString).contains("length=200"); @@ -90,10 +92,26 @@ public void testSerializeAndDeserialize() { public void testDeserializeWithUnsupportedVersion() { String uri = "/test/path"; byte[] serialized = new BlobDescriptor(uri, 1, 1).serialize(); - serialized[0] = 2; + serialized[8] = 3; assertThatThrownBy(() -> BlobDescriptor.deserialize(serialized)) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("Expecting BlobDescriptor version to be 1, but found 2."); + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Invalid BlobDescriptor: missing magic header. 
Expected magic: 4777280450765083459, but found: 8315180032221773834"); + } + + @Test + public void testBlobVersionCompatible() throws Exception { + byte[] serialized = + IOUtils.readFully( + BlobDescriptorTest.class + .getClassLoader() + .getResourceAsStream("compatible/blob_descriptor_v1"), + true); + + BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(serialized); + assertThat(blobDescriptor.uri()).isEqualTo("/test/path"); + assertThat(blobDescriptor.offset()).isEqualTo(100L); + assertThat(blobDescriptor.length()).isEqualTo(200L); } private BlobDescriptor createDescriptorWithVersion( diff --git a/paimon-common/src/test/resources/compatible/blob_descriptor_v1 b/paimon-common/src/test/resources/compatible/blob_descriptor_v1 new file mode 100644 index 0000000000000000000000000000000000000000..9e3412deac81b95558ad47e5509eae8e1b0514af GIT binary patch literal 31 ecmZSNVqjp partitioned = FlinkStreamPartitioner.partition( FlinkSinkBuilder.mapToInternalRow( sourcePair.getLeft(), realTable.rowType(), - blobAsDescriptor, table.catalogEnvironment().catalogContext()), new RowDataChannelComputer(realTable.schema()), null); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java index d1e3d11e6225..ee3b68f609a2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java @@ -157,13 +157,11 @@ protected List> buildCompactOperator( // 2.3 write and then reorganize the committable // set parallelism to null, and it'll forward parallelism when doWrite() RowAppendTableSink sink = new RowAppendTableSink(table, null, null); - boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); DataStream written = sink.doWrite( FlinkSinkBuilder.mapToInternalRow( sorted, table.rowType(), - blobAsDescriptor, table.catalogEnvironment().catalogContext()), commitUser, null); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 0a4dc174539f..87d12924533a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -207,15 +207,13 @@ public FlinkSinkBuilder clusteringIfPossible( public DataStreamSink build() { setParallelismIfAdaptiveConflict(); input = trySortInput(input); - boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); CatalogContext contextForDescriptor = BlobDescriptorUtils.getCatalogContext( table.catalogEnvironment().catalogContext(), table.coreOptions().toConfiguration()); DataStream input = - mapToInternalRow( - this.input, table.rowType(), blobAsDescriptor, contextForDescriptor); + mapToInternalRow(this.input, table.rowType(), contextForDescriptor); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { SingleOutputStreamOperator newInput = input.forward() @@ -247,14 +245,11 @@ public DataStreamSink build() { public static DataStream mapToInternalRow( DataStream input, org.apache.paimon.types.RowType rowType, - boolean blobAsDescriptor, CatalogContext catalogContext) { 
SingleOutputStreamOperator result = input.map( (MapFunction) - r -> - new FlinkRowWrapper( - r, blobAsDescriptor, catalogContext)) + r -> new FlinkRowWrapper(r, catalogContext)) .returns( org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType( rowType)); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java index 3ad3666e9760..d2e70b9ed3b4 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java @@ -61,28 +61,25 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable { private final StructType tableSchema; private final int length; - private final boolean blobAsDescriptor; @Nullable private final UriReaderFactory uriReaderFactory; @Nullable private final int[] fieldIndexMap; private transient org.apache.spark.sql.catalyst.InternalRow internalRow; public SparkInternalRowWrapper(StructType tableSchema, int length) { - this(tableSchema, length, null, false, null); + this(tableSchema, length, null, null); } public SparkInternalRowWrapper( StructType tableSchema, int length, StructType dataSchema, - boolean blobAsDescriptor, CatalogContext catalogContext) { this.tableSchema = tableSchema; this.length = length; this.fieldIndexMap = dataSchema != null ? buildFieldIndexMap(tableSchema, dataSchema) : null; - this.blobAsDescriptor = blobAsDescriptor; - this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null; + this.uriReaderFactory = new UriReaderFactory(catalogContext); } public SparkInternalRowWrapper replace(org.apache.spark.sql.catalyst.InternalRow internalRow) { @@ -243,12 +240,14 @@ public Variant getVariant(int pos) { @Override public Blob getBlob(int pos) { - if (blobAsDescriptor) { - BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(internalRow.getBinary(pos)); + byte[] bytes = internalRow.getBinary(pos); + boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes); + if (blobDes) { + BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri()); return Blob.fromDescriptor(uriReader, blobDescriptor); } else { - return new BlobData(internalRow.getBinary(pos)); + return new BlobData(bytes); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java index cc947c7ea04b..36b5624ff52f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java @@ -62,24 +62,17 @@ public class SparkRow implements InternalRow, Serializable { private final RowType type; private final Row row; private final RowKind rowKind; - private final boolean blobAsDescriptor; private final UriReaderFactory uriReaderFactory; public SparkRow(RowType type, Row row) { - this(type, row, RowKind.INSERT, false, null); + this(type, row, RowKind.INSERT, null); } - public SparkRow( - RowType type, - Row row, - RowKind rowkind, - boolean blobAsDescriptor, - CatalogContext catalogContext) { + public SparkRow(RowType type, Row row, RowKind rowkind, CatalogContext catalogContext) { this.type = type; this.row = row; this.rowKind = 
rowkind; - this.blobAsDescriptor = blobAsDescriptor; - this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null; + this.uriReaderFactory = new UriReaderFactory(catalogContext); } @Override @@ -168,12 +161,14 @@ public Variant getVariant(int i) { @Override public Blob getBlob(int i) { - if (blobAsDescriptor) { - BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getAs(i)); + byte[] bytes = row.getAs(i); + boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes); + if (blobDes) { + BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri()); return Blob.fromDescriptor(uriReader, blobDescriptor); } else { - return new BlobData(row.getAs(i)); + return new BlobData(bytes); } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index 077d92cc292b..a5d9278523ab 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -74,7 +74,6 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se table.newBatchWriteBuilder(), writeType, firstRowIdToPartitionMap, - coreOptions.blobAsDescriptor(), table.catalogEnvironment().catalogContext()) try { iter.foreach(row => write.write(row)) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index de5804cc72f2..ebdbf038fff2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -138,7 +138,6 @@ case class PaimonSparkWriter( writeRowTracking, fullCompactionDeltaCommits, batchId, - coreOptions.blobAsDescriptor(), table.catalogEnvironment().catalogContext(), postponePartitionBucketComputer ) @@ -455,7 +454,6 @@ case class PaimonSparkWriter( val toPaimonRow = SparkRowUtils.toPaimonRow( rowType, rowKindColIdx, - table.coreOptions().blobAsDescriptor(), table.catalogEnvironment().catalogContext()) bootstrapIterator.asScala diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala index 572e1cf96c22..5c25b70e6754 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala @@ -104,7 +104,6 @@ case class SparkPostponeCompactProcedure( writeRowTracking = coreOptions.dataEvolutionEnabled(), Option.apply(coreOptions.fullCompactionDeltaCommits()), None, - coreOptions.blobAsDescriptor(), table.catalogEnvironment().catalogContext(), Some(postponePartitionBucketComputer) ) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala index 86b070083fe9..eee55454546c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala @@ -30,7 +30,6 @@ object SparkRowUtils { def toPaimonRow( writeType: RowType, rowkindColIdx: Int, - blobAsDescriptor: Boolean, catalogContext: CatalogContext): Row => SparkRow = { if (rowkindColIdx != -1) { row => @@ -38,9 +37,8 @@ object SparkRowUtils { writeType, row, RowKind.fromByteValue(row.getByte(rowkindColIdx)), - blobAsDescriptor, catalogContext) - } else { row => new SparkRow(writeType, row, RowKind.INSERT, blobAsDescriptor, catalogContext) } + } else { row => new SparkRow(writeType, row, RowKind.INSERT, catalogContext) } } def getFieldIndex(schema: StructType, colName: String): Int = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala index 0bc9bc0a7684..c0e5190d5dfa 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala @@ -40,7 +40,6 @@ case class DataEvolutionTableDataWrite( writeBuilder: BatchWriteBuilder, writeType: RowType, firstRowIdToPartitionMap: mutable.HashMap[Long, (BinaryRow, Long)], - blobAsDescriptor: Boolean, catalogContext: CatalogContext) extends InnerTableV1DataWrite { @@ -51,7 +50,7 @@ case class DataEvolutionTableDataWrite( private val commitMessages = ListBuffer[CommitMessageImpl]() private val toPaimonRow = { - SparkRowUtils.toPaimonRow(writeType, -1, blobAsDescriptor, catalogContext) + SparkRowUtils.toPaimonRow(writeType, -1, catalogContext) } def write(row: Row): Unit = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala index f0861c27b5cb..02b9338b4bc9 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala @@ -37,7 +37,6 @@ case class PaimonDataWrite( writeRowTracking: Boolean = false, fullCompactionDeltaCommits: Option[Int], batchId: Option[Long], - blobAsDescriptor: Boolean, catalogContext: CatalogContext, postponePartitionBucketComputer: Option[BinaryRow => Integer]) extends abstractInnerTableDataWrite[Row] @@ -58,7 +57,7 @@ case class PaimonDataWrite( } private val toPaimonRow = { - SparkRowUtils.toPaimonRow(writeType, rowKindColIdx, blobAsDescriptor, catalogContext) + SparkRowUtils.toPaimonRow(writeType, rowKindColIdx, catalogContext) } def write(row: Row): Unit = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala index fbd166a18312..aa2dfcdf8f56 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala @@ -45,7 +45,6 @@ case class PaimonV2DataWriter( val fullCompactionDeltaCommits: Option[Int] = Option.apply(coreOptions.fullCompactionDeltaCommits()) - val blobAsDescriptor: Boolean = coreOptions.blobAsDescriptor() val write: TableWriteImpl[InternalRow] = { writeBuilder @@ -57,12 +56,8 @@ case class PaimonV2DataWriter( private val rowConverter: InternalRow => SparkInternalRowWrapper = { val numFields = writeSchema.fields.length - val reusableWrapper = new SparkInternalRowWrapper( - writeSchema, - numFields, - dataSchema, - blobAsDescriptor, - catalogContext) + val reusableWrapper = + new SparkInternalRowWrapper(writeSchema, numFields, dataSchema, catalogContext) record => reusableWrapper.replace(record) } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala index 08c874cd3bee..1c108a9abf31 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala @@ -91,13 +91,15 @@ class BlobTestBase extends PaimonSparkTestBase { val blobDescriptor = new BlobDescriptor(uri, 0, blobData.length) sql( - "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')") + "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')") sql( "INSERT INTO t VALUES (1, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')," + "(5, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')," + "(2, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')," + "(3, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')," + "(4, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')") + + sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='true')") val newDescriptorBytes = sql("SELECT picture FROM t WHERE id = 1").collect()(0).get(0).asInstanceOf[Array[Byte]] val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes) @@ -132,10 +134,11 @@ class BlobTestBase extends PaimonSparkTestBase { sql( "CREATE TABLE IF NOT EXISTS t (\n" + "id STRING,\n" + "name STRING,\n" + "file_size STRING,\n" + "crc64 STRING,\n" + "modified_time STRING,\n" + "content BINARY\n" + ") \n" + "PARTITIONED BY (ds STRING, batch STRING) \n" + - "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' = '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 'true','blob-field' = 'content','blob-as-descriptor' = 'true')") + "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' = '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 'true','blob-field' = 'content')") sql( "INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES \n('1','paimon','1024','12345678','20241017',X'" + bytesToHex( blobDescriptor.serialize()) + "')") + sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='true')") val newDescriptorBytes = sql("SELECT content FROM t WHERE id = '1'").collect()(0).get(0).asInstanceOf[Array[Byte]] val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes) From 71aeb046254eddd4ab4f1a2601059eaaaf462364 Mon Sep 17 00:00:00 2001 
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 11 Feb 2026 14:15:58 +0800 Subject: [PATCH 2/5] Fix minus --- .../src/main/java/org/apache/paimon/data/BlobDescriptor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java index 7a0b3bccd2a2..ec9189c42965 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java @@ -162,7 +162,7 @@ public static BlobDescriptor deserialize(byte[] bytes) { } public static boolean isBlobDescriptor(byte[] bytes) { - if (bytes.length < 8) { + if (bytes.length < 9) { return false; } byte[] copy = new byte[8]; From 593eacf183aca3949b618e6ac8d3e80c9d84c947 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 12 Feb 2026 09:59:33 +0800 Subject: [PATCH 3/5] Fix minus --- .../org/apache/paimon/data/BlobDescriptor.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java index ec9189c42965..c01c8875d68f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java @@ -165,10 +165,16 @@ public static boolean isBlobDescriptor(byte[] bytes) { if (bytes.length < 9) { return false; } - byte[] copy = new byte[8]; - System.arraycopy(bytes, 1, copy, 0, 8); - ByteBuffer magicBuffer = ByteBuffer.wrap(copy); - magicBuffer.order(ByteOrder.LITTLE_ENDIAN); - return MAGIC == magicBuffer.getLong(); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + int version = buffer.get(); + if (version == 1) { + return true; + } else if (version > CURRENT_VERSION) { + return false; + } else { + return MAGIC == buffer.getLong(); + } } } From 1023b0b622654594aa03121c5c46ba36403881a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 12 Feb 2026 10:10:15 +0800 Subject: [PATCH 4/5] Fix comment --- .../src/main/java/org/apache/paimon/data/BlobDescriptor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java index c01c8875d68f..b30af2e0d4c5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java @@ -168,7 +168,7 @@ public static boolean isBlobDescriptor(byte[] bytes) { ByteBuffer buffer = ByteBuffer.wrap(bytes); buffer.order(ByteOrder.LITTLE_ENDIAN); - int version = buffer.get(); + byte version = buffer.get(); if (version == 1) { return true; } else if (version > CURRENT_VERSION) { From 46882639592b5d8aa3c0da13a46d5463e1f0221e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 12 Feb 2026 10:12:51 +0800 Subject: [PATCH 5/5] Fix comment --- .../java/org/apache/paimon/data/BlobDescriptorTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java index 53911556885e..eaf9d3dea88b 100644 --- 
a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java @@ -92,11 +92,11 @@ public void testSerializeAndDeserialize() { public void testDeserializeWithUnsupportedVersion() { String uri = "/test/path"; byte[] serialized = new BlobDescriptor(uri, 1, 1).serialize(); - serialized[8] = 3; + serialized[0] = 3; assertThatThrownBy(() -> BlobDescriptor.deserialize(serialized)) - .isInstanceOf(IllegalArgumentException.class) + .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining( - "Invalid BlobDescriptor: missing magic header. Expected magic: 4777280450765083459, but found: 8315180032221773834"); + "Expecting BlobDescriptor version to be less than or equal to 2, but found 3."); } @Test
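
For readers skimming the series, here is a minimal, self-contained sketch of the v2 byte layout and the adaptive descriptor detection that these patches introduce (version byte, 8-byte little-endian magic "BLOBDESC", 4-byte URI length, UTF-8 URI bytes, 8-byte offset, 8-byte length). It is an illustration under those assumptions, not the shipped implementation; the class and helper names below are hypothetical, while the MAGIC and CURRENT_VERSION constants mirror the values in patch 1/5 and the detection logic mirrors isBlobDescriptor() after patch 3/5.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Sketch of the v2 layout written by BlobDescriptor.serialize():
// [1B version][8B magic "BLOBDESC", little-endian][4B uriLength][uri bytes][8B offset][8B length]
public class BlobDescriptorSketch {
    static final long MAGIC = 0x424C4F4244455343L; // "BLOBDESC"
    static final byte CURRENT_VERSION = 2;

    // Adaptive check in the spirit of isBlobDescriptor(): v1 payloads carry no magic,
    // so a leading version byte of 1 is assumed to be a descriptor; for v2 and later
    // the magic must follow the version byte, otherwise the bytes are raw blob data.
    static boolean looksLikeDescriptor(byte[] bytes) {
        if (bytes.length < 9) {
            return false;
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        byte version = buffer.get();
        if (version == 1) {
            return true;
        }
        return version <= CURRENT_VERSION && buffer.getLong() == MAGIC;
    }

    // Sketch of v2 serialization matching the layout above.
    static byte[] serializeV2(String uri, long offset, long length) {
        byte[] uriBytes = uri.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer =
                ByteBuffer.allocate(1 + 8 + 4 + uriBytes.length + 8 + 8)
                        .order(ByteOrder.LITTLE_ENDIAN);
        buffer.put(CURRENT_VERSION);
        buffer.putLong(MAGIC);
        buffer.putInt(uriBytes.length);
        buffer.put(uriBytes);
        buffer.putLong(offset);
        buffer.putLong(length);
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] descriptor = serializeV2("/test/path", 100L, 200L);
        byte[] rawBlob = new byte[] {5, 6, 7, 8, 9, 10, 11, 12, 13, 14};
        System.out.println(looksLikeDescriptor(descriptor)); // true: magic follows version byte
        System.out.println(looksLikeDescriptor(rawBlob));    // false: no magic, treated as raw blob bytes
    }
}

This is why the writer-side blobAsDescriptor flag could be removed in the Flink and Spark wrappers: getBlob() can decide per value whether the binary payload is a serialized descriptor or inline blob data, with the caveat (inherent to v1 compatibility) that raw blob data whose first byte happens to be 1 would still be treated as a descriptor.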