@@ -45,8 +45,8 @@
public class BlobDescriptor implements Serializable {

private static final long serialVersionUID = 1L;

private static final byte CURRENT_VERSION = 1;
private static final long MAGIC = 0x424C4F4244455343L; // "BLOBDESC"
private static final byte CURRENT_VERSION = 2;

private final byte version;
private final String uri;
@@ -113,11 +113,12 @@ public byte[] serialize() {
byte[] uriBytes = uri.getBytes(UTF_8);
int uriLength = uriBytes.length;

int totalSize = 1 + 4 + uriLength + 8 + 8;
int totalSize = 1 + 8 + 4 + uriLength + 8 + 8;
ByteBuffer buffer = ByteBuffer.allocate(totalSize);
buffer.order(ByteOrder.LITTLE_ENDIAN);

buffer.put(version);
buffer.putLong(MAGIC);
buffer.putInt(uriLength);
buffer.put(uriBytes);

@@ -130,16 +131,26 @@ public byte[] serialize() {
public static BlobDescriptor deserialize(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.order(ByteOrder.LITTLE_ENDIAN);

byte version = buffer.get();
if (version != CURRENT_VERSION) {
if (version > CURRENT_VERSION) {
throw new UnsupportedOperationException(
"Expecting BlobDescriptor version to be "
"Expecting BlobDescriptor version to be less than or equal to "
+ CURRENT_VERSION
+ ", but found "
+ version
+ ".");
}

// Versions 2 and above are prefixed with a magic header right after the version byte.
if (version > 1) {
long magic = buffer.getLong();
if (MAGIC != magic) {
throw new IllegalArgumentException(
"Invalid BlobDescriptor: missing magic header. Expected magic: "
+ MAGIC
+ ", but found: "
+ magic);
}
}

int uriLength = buffer.getInt();
byte[] uriBytes = new byte[uriLength];
buffer.get(uriBytes);
@@ -149,4 +160,21 @@ public static BlobDescriptor deserialize(byte[] bytes) {
long length = buffer.getLong();
return new BlobDescriptor(version, uri, offset, length);
}

public static boolean isBlobDescriptor(byte[] bytes) {
if (bytes.length < 9) {
return false;
}
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.order(ByteOrder.LITTLE_ENDIAN);

byte version = buffer.get();
if (version == 1) {
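// Version 1 bytes predate the magic header, so only the version byte can be checked here.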
return true;
} else if (version > CURRENT_VERSION) {
return false;
} else {
return MAGIC == buffer.getLong();
}
}
}
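For reference, the v2 layout written by serialize() above is: version (1 byte) | magic (8 bytes) | uriLength (4 bytes) | uri bytes (UTF-8) | offset (8 bytes) | length (8 bytes), all little-endian; v1 is the same minus the magic. The sketch below round-trips a descriptor through that layout using only the API visible in this diff (the three-argument constructor, serialize, deserialize and the uri/offset/length accessors); the org.apache.paimon.data package and the standalone class are assumptions made for illustration.

import org.apache.paimon.data.BlobDescriptor;

public class BlobDescriptorRoundTrip {
    public static void main(String[] args) {
        // Serialize with the current (v2) layout: version | magic | uriLength | uri | offset | length.
        BlobDescriptor descriptor = new BlobDescriptor("/test/path", 100L, 200L);
        byte[] bytes = descriptor.serialize();

        // deserialize accepts any version <= CURRENT_VERSION and verifies the magic for v2 and above.
        BlobDescriptor copy = BlobDescriptor.deserialize(bytes);
        System.out.println(copy.uri() + " " + copy.offset() + " " + copy.length());
    }
}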
@@ -25,13 +25,14 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** A factory to create and cache {@link UriReader}. */
public class UriReaderFactory {
public class UriReaderFactory implements Serializable {

private final CatalogContext context;
private final Map<UriKey, UriReader> readers;
@@ -18,6 +18,8 @@

package org.apache.paimon.data;

import org.apache.paimon.utils.IOUtils;

import org.junit.jupiter.api.Test;

import java.lang.reflect.Constructor;
@@ -38,7 +40,7 @@ public void testEquals() throws Exception {
BlobDescriptor descriptor3 = new BlobDescriptor(uri2, 100L, 200L);
BlobDescriptor descriptor4 = new BlobDescriptor(uri1, 150L, 200L);
BlobDescriptor descriptor5 = new BlobDescriptor(uri1, 100L, 250L);
BlobDescriptor descriptor6 = createDescriptorWithVersion((byte) 2, uri1, 100L, 200L);
BlobDescriptor descriptor6 = createDescriptorWithVersion((byte) 3, uri1, 100L, 200L);
assertThat(descriptor1).isEqualTo(descriptor2);
assertThat(descriptor1).isNotEqualTo(descriptor3);
assertThat(descriptor1).isNotEqualTo(descriptor4);
@@ -64,7 +66,7 @@ public void testToString() {
BlobDescriptor descriptor = new BlobDescriptor(uri, 100L, 200L);

String toString = descriptor.toString();
assertThat(toString).contains("version=1");
assertThat(toString).contains("version=2");
assertThat(toString).contains("uri='/test/path'");
assertThat(toString).contains("offset=100");
assertThat(toString).contains("length=200");
@@ -90,10 +92,26 @@ public void testSerializeAndDeserialize() {
public void testDeserializeWithUnsupportedVersion() {
String uri = "/test/path";
byte[] serialized = new BlobDescriptor(uri, 1, 1).serialize();
serialized[0] = 2;
serialized[0] = 3;
assertThatThrownBy(() -> BlobDescriptor.deserialize(serialized))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Expecting BlobDescriptor version to be 1, but found 2.");
.hasMessageContaining(
"Expecting BlobDescriptor version to be less than or equal to 2, but found 3.");
}

@Test
public void testBlobVersionCompatible() throws Exception {
byte[] serialized =
IOUtils.readFully(
BlobDescriptorTest.class
.getClassLoader()
.getResourceAsStream("compatible/blob_descriptor_v1"),
true);

BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(serialized);
assertThat(blobDescriptor.uri()).isEqualTo("/test/path");
assertThat(blobDescriptor.offset()).isEqualTo(100L);
assertThat(blobDescriptor.length()).isEqualTo(200L);
}

private BlobDescriptor createDescriptorWithVersion(
Binary file not shown.
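The binary entry above is presumably the compatible/blob_descriptor_v1 resource that testBlobVersionCompatible reads. Bytes with those contents could be produced with the pre-change v1 layout, which carries no magic header; the generator below is only an illustrative sketch (the class name and output path are placeholders, not the code that produced the committed file).

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class WriteBlobDescriptorV1Fixture {
    public static void main(String[] args) throws IOException {
        byte[] uriBytes = "/test/path".getBytes(StandardCharsets.UTF_8);
        // v1 layout: version (1) | uriLength (4) | uri bytes | offset (8) | length (8), little-endian.
        ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + uriBytes.length + 8 + 8);
        buffer.order(ByteOrder.LITTLE_ENDIAN);
        buffer.put((byte) 1);           // version 1, no magic header
        buffer.putInt(uriBytes.length);
        buffer.put(uriBytes);
        buffer.putLong(100L);           // offset asserted by the test
        buffer.putLong(200L);           // length asserted by the test
        try (FileOutputStream out = new FileOutputStream("blob_descriptor_v1")) {
            out.write(buffer.array());
        }
    }
}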
@@ -47,19 +47,14 @@
public class FlinkRowWrapper implements InternalRow {

private final org.apache.flink.table.data.RowData row;
private final boolean blobAsDescriptor;
private final UriReaderFactory uriReaderFactory;

public FlinkRowWrapper(org.apache.flink.table.data.RowData row) {
this(row, false, null);
this(row, null);
}

public FlinkRowWrapper(
org.apache.flink.table.data.RowData row,
boolean blobAsDescriptor,
CatalogContext catalogContext) {
public FlinkRowWrapper(org.apache.flink.table.data.RowData row, CatalogContext catalogContext) {
this.row = row;
this.blobAsDescriptor = blobAsDescriptor;
this.uriReaderFactory = new UriReaderFactory(catalogContext);
}

@@ -147,12 +142,14 @@ public Variant getVariant(int pos) {

@Override
public Blob getBlob(int pos) {
if (blobAsDescriptor) {
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getBinary(pos));
byte[] bytes = row.getBinary(pos);
if (BlobDescriptor.isBlobDescriptor(bytes)) {
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
return Blob.fromDescriptor(uriReader, blobDescriptor);
} else {
return new BlobData(row.getBinary(pos));
return new BlobData(bytes);
}
}
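Like the Spark wrappers further down, getBlob now classifies each binary payload itself instead of relying on the blobAsDescriptor flag that this change stops threading through the builders. A minimal sketch of that classification, assuming only the BlobDescriptor API shown earlier in this diff (package taken from the test class):

import org.apache.paimon.data.BlobDescriptor;

public class BlobPayloadClassification {
    public static void main(String[] args) {
        byte[] descriptorBytes = new BlobDescriptor("/test/path", 0L, 42L).serialize();
        byte[] inlineBytes = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

        // v2 descriptors are recognized by their magic header; a short inline payload is not.
        System.out.println(BlobDescriptor.isBlobDescriptor(descriptorBytes)); // true
        System.out.println(BlobDescriptor.isBlobDescriptor(inlineBytes));     // false
    }
}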

@@ -346,13 +346,11 @@ protected boolean buildForPostponeBucketCompaction(
partitionSpec,
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));

boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
DataStream<InternalRow> partitioned =
FlinkStreamPartitioner.partition(
FlinkSinkBuilder.mapToInternalRow(
sourcePair.getLeft(),
realTable.rowType(),
blobAsDescriptor,
table.catalogEnvironment().catalogContext()),
new RowDataChannelComputer(realTable.schema()),
null);
@@ -157,13 +157,11 @@ protected List<DataStream<Committable>> buildCompactOperator(
// 2.3 write and then reorganize the committable
// set parallelism to null, and it'll forward parallelism when doWrite()
RowAppendTableSink sink = new RowAppendTableSink(table, null, null);
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
DataStream<Committable> written =
sink.doWrite(
FlinkSinkBuilder.mapToInternalRow(
sorted,
table.rowType(),
blobAsDescriptor,
table.catalogEnvironment().catalogContext()),
commitUser,
null);
@@ -207,15 +207,13 @@ public FlinkSinkBuilder clusteringIfPossible(
public DataStreamSink<?> build() {
setParallelismIfAdaptiveConflict();
input = trySortInput(input);
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
CatalogContext contextForDescriptor =
BlobDescriptorUtils.getCatalogContext(
table.catalogEnvironment().catalogContext(),
table.coreOptions().toConfiguration());

DataStream<InternalRow> input =
mapToInternalRow(
this.input, table.rowType(), blobAsDescriptor, contextForDescriptor);
mapToInternalRow(this.input, table.rowType(), contextForDescriptor);
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
SingleOutputStreamOperator<InternalRow> newInput =
input.forward()
@@ -247,14 +245,11 @@ public DataStreamSink<?> build() {
public static DataStream<InternalRow> mapToInternalRow(
DataStream<RowData> input,
org.apache.paimon.types.RowType rowType,
boolean blobAsDescriptor,
CatalogContext catalogContext) {
SingleOutputStreamOperator<InternalRow> result =
input.map(
(MapFunction<RowData, InternalRow>)
r ->
new FlinkRowWrapper(
r, blobAsDescriptor, catalogContext))
r -> new FlinkRowWrapper(r, catalogContext))
.returns(
org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(
rowType));
@@ -61,28 +61,25 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {

private final StructType tableSchema;
private final int length;
private final boolean blobAsDescriptor;
@Nullable private final UriReaderFactory uriReaderFactory;
@Nullable private final int[] fieldIndexMap;

private transient org.apache.spark.sql.catalyst.InternalRow internalRow;

public SparkInternalRowWrapper(StructType tableSchema, int length) {
this(tableSchema, length, null, false, null);
this(tableSchema, length, null, null);
}

public SparkInternalRowWrapper(
StructType tableSchema,
int length,
StructType dataSchema,
boolean blobAsDescriptor,
CatalogContext catalogContext) {
this.tableSchema = tableSchema;
this.length = length;
this.fieldIndexMap =
dataSchema != null ? buildFieldIndexMap(tableSchema, dataSchema) : null;
this.blobAsDescriptor = blobAsDescriptor;
this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
this.uriReaderFactory = new UriReaderFactory(catalogContext);
}

public SparkInternalRowWrapper replace(org.apache.spark.sql.catalyst.InternalRow internalRow) {
@@ -243,12 +240,14 @@ public Variant getVariant(int pos) {

@Override
public Blob getBlob(int pos) {
if (blobAsDescriptor) {
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(internalRow.getBinary(pos));
byte[] bytes = internalRow.getBinary(pos);
if (BlobDescriptor.isBlobDescriptor(bytes)) {
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
return Blob.fromDescriptor(uriReader, blobDescriptor);
} else {
return new BlobData(internalRow.getBinary(pos));
return new BlobData(bytes);
}
}

@@ -62,24 +62,17 @@ public class SparkRow implements InternalRow, Serializable {
private final RowType type;
private final Row row;
private final RowKind rowKind;
private final boolean blobAsDescriptor;
private final UriReaderFactory uriReaderFactory;

public SparkRow(RowType type, Row row) {
this(type, row, RowKind.INSERT, false, null);
this(type, row, RowKind.INSERT, null);
}

public SparkRow(
RowType type,
Row row,
RowKind rowkind,
boolean blobAsDescriptor,
CatalogContext catalogContext) {
public SparkRow(RowType type, Row row, RowKind rowkind, CatalogContext catalogContext) {
this.type = type;
this.row = row;
this.rowKind = rowkind;
this.blobAsDescriptor = blobAsDescriptor;
this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
this.uriReaderFactory = new UriReaderFactory(catalogContext);
}

@Override
@@ -168,12 +161,14 @@ public Variant getVariant(int i) {

@Override
public Blob getBlob(int i) {
if (blobAsDescriptor) {
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getAs(i));
byte[] bytes = row.getAs(i);
if (BlobDescriptor.isBlobDescriptor(bytes)) {
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
return Blob.fromDescriptor(uriReader, blobDescriptor);
} else {
return new BlobData(row.getAs(i));
return new BlobData(bytes);
}
}

@@ -74,7 +74,6 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se
table.newBatchWriteBuilder(),
writeType,
firstRowIdToPartitionMap,
coreOptions.blobAsDescriptor(),
table.catalogEnvironment().catalogContext())
try {
iter.foreach(row => write.write(row))
@@ -138,7 +138,6 @@ case class PaimonSparkWriter(
writeRowTracking,
fullCompactionDeltaCommits,
batchId,
coreOptions.blobAsDescriptor(),
table.catalogEnvironment().catalogContext(),
postponePartitionBucketComputer
)
@@ -455,7 +454,6 @@ case class PaimonSparkWriter(
val toPaimonRow = SparkRowUtils.toPaimonRow(
rowType,
rowKindColIdx,
table.coreOptions().blobAsDescriptor(),
table.catalogEnvironment().catalogContext())

bootstrapIterator.asScala
@@ -104,7 +104,6 @@ case class SparkPostponeCompactProcedure(
writeRowTracking = coreOptions.dataEvolutionEnabled(),
Option.apply(coreOptions.fullCompactionDeltaCommits()),
None,
coreOptions.blobAsDescriptor(),
table.catalogEnvironment().catalogContext(),
Some(postponePartitionBucketComputer)
)
@@ -30,17 +30,15 @@ object SparkRowUtils {
def toPaimonRow(
writeType: RowType,
rowkindColIdx: Int,
blobAsDescriptor: Boolean,
catalogContext: CatalogContext): Row => SparkRow = {
if (rowkindColIdx != -1) {
row =>
new SparkRow(
writeType,
row,
RowKind.fromByteValue(row.getByte(rowkindColIdx)),
blobAsDescriptor,
catalogContext)
} else { row => new SparkRow(writeType, row, RowKind.INSERT, blobAsDescriptor, catalogContext) }
} else { row => new SparkRow(writeType, row, RowKind.INSERT, catalogContext) }
}

def getFieldIndex(schema: StructType, colName: String): Int = {