From 2823adb63041b3f1978c96c57e4886530ecc405a Mon Sep 17 00:00:00 2001 From: priyanshu-ctds Date: Thu, 18 Dec 2025 21:48:03 +0530 Subject: [PATCH 1/6] Added support for tuple datatype --- .../converters/AbstractNativeConverter.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java index ab948117..ef67cc63 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java @@ -19,6 +19,7 @@ import com.datastax.oss.cdc.NativeSchemaWrapper; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.data.TupleValue; import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; @@ -27,12 +28,14 @@ import com.datastax.oss.driver.api.core.type.ListType; import com.datastax.oss.driver.api.core.type.MapType; import com.datastax.oss.driver.api.core.type.SetType; +import com.datastax.oss.driver.api.core.type.TupleType; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.pulsar.source.Converter; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.pulsar.common.schema.SchemaType; @@ -83,6 +86,11 @@ public AbstractNativeConverter(KeyspaceMetadata ksm, TableMetadata tm, List entry) { if(collectionValue instanceof Instant) { return ((Instant)collectionValue).toEpochMilli(); } + if(collectionValue instanceof TupleValue){ + return buildTupleValue((TupleValue) collectionValue); + } return collectionValue; } + + private GenericRecord buildTupleValue(TupleValue tupleValue) { + String typeName = tupleValue.getType().asCql(false, true); + Schema tupleSchema = subSchemas.get(typeName); + if (tupleSchema == null) { + throw new IllegalStateException("Missing tuple schema for " + typeName); + } + GenericRecord record = new GenericData.Record(tupleSchema); + for (int i = 0; i < tupleValue.getType().getComponentTypes().size(); i++) { + record.put("index_" + i, marshalCollectionValue(tupleValue.getObject(i))); + } + return record; + } } From 02168b6369656172b89b41b3f6c0fefc52708c49 Mon Sep 17 00:00:00 2001 From: priyanshu-ctds Date: Fri, 19 Dec 2025 14:32:07 +0530 Subject: [PATCH 2/6] Modified test to include the tuple data type --- .../source/PulsarCassandraSourceTests.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java index f732712f..5c7a632d 100644 --- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java +++ b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java @@ -22,7 +22,10 @@ import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.data.CqlDuration; +import com.datastax.oss.driver.api.core.data.TupleValue; import com.datastax.oss.driver.api.core.data.UdtValue; +import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.oss.driver.api.core.type.TupleType; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import com.datastax.oss.driver.shaded.guava.common.collect.Lists; @@ -648,16 +651,17 @@ public void testSchema(String ksName) throws InterruptedException, IOException { // force udt values to be null by populating 1 item, using zudt.newValue() without explicitly setting // any field to non-null value will cause the udt column itself to be null in the C* table UdtValue zudtOptionalValues = zudt.newValue(dataSpecMap.get("text").cqlValue); - + TupleType tupleType = DataTypes.tupleOf(DataTypes.INT, DataTypes.TEXT); + TupleValue tupleValue = tupleType.newValue(dataSpecMap.get("int").cqlValue, dataSpecMap.get("text").cqlValue); cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table3 (" + "xtext text, xascii ascii, xboolean boolean, xblob blob, xtimestamp timestamp, xtime time, xdate date, xuuid uuid, xtimeuuid timeuuid, xtinyint tinyint, xsmallint smallint, xint int, xbigint bigint, xvarint varint, xdecimal decimal, xdouble double, xfloat float, xinet4 inet, xinet6 inet, " + - "ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list, yset set, ymap map, ylistofmap list>>, ysetofudt set>," + + "ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list, yset set, ymap map, ytuple frozen>, ylistofmap list>>, ysetofudt set>," + "primary key (xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6)) " + "WITH CLUSTERING ORDER BY (xascii ASC, xboolean DESC, xblob ASC, xtimestamp DESC, xtime DESC, xdate ASC, xuuid DESC, xtimeuuid ASC, xtinyint DESC, xsmallint ASC, xint DESC, xbigint ASC, xvarint DESC, xdecimal ASC, xdouble DESC, xfloat ASC, xinet4 ASC, xinet6 DESC) AND cdc=true"); cqlSession.execute("INSERT INTO " + ksName + ".table3 (" + "xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6, " + - "ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, yudtoptional, ylist, yset, ymap, ylistofmap, ysetofudt" + - ") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?, ?,?,?,?,?)", + "ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, yudtoptional, ylist, yset, ymap, ytuple, ylistofmap, ysetofudt" + + ") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?, ?,?,?,?,?,?)", dataSpecMap.get("text").cqlValue, dataSpecMap.get("ascii").cqlValue, dataSpecMap.get("boolean").cqlValue, @@ -705,6 +709,7 @@ public void testSchema(String ksName) throws InterruptedException, IOException { dataSpecMap.get("list").cqlValue, dataSpecMap.get("set").cqlValue, dataSpecMap.get("map").cqlValue, + tupleValue, dataSpecMap.get("listofmap").cqlValue, ImmutableSet.of(zudtValue, zudtValue) ); @@ -809,8 +814,17 @@ void assertGenericArray(String field, GenericArray ga) { } void assertField(String fieldName, Object value) { + if (fieldName.startsWith("index_")) { + int idx = Integer.parseInt(fieldName.substring("index_".length())); + if (idx == 0) { + Assert.assertEquals(dataSpecMap.get("int").avroValue, value); + } else if (idx == 1) { + Assert.assertEquals(dataSpecMap.get("text").avroValue, value); + } + return; + } String vKey = fieldName.substring(1); - if (!vKey.equals("udt") && !vKey.equals("udtoptional") && ! vKey.equals("setofudt")) { + if (!vKey.equals("udt") && !vKey.equals("udtoptional") && ! vKey.equals("setofudt") && !vKey.equals("tuple")) { Assert.assertTrue("Unknown field " + vKey, dataSpecMap.containsKey(vKey)); } if (value instanceof GenericRecord) { @@ -861,7 +875,7 @@ void assertGenericRecords(String field, GenericRecord gr) { (long) gr.getField(CqlLogicalTypes.CQL_DURATION_NANOSECONDS))); } return; - case "udt": { + case "udt", "tuple": { for (Field f : gr.getFields()) { assertField(f.getName(), gr.getField(f.getName())); } From 3bdd97a80a48492c7d738dc41bb8daecc7b57838 Mon Sep 17 00:00:00 2001 From: priyanshu-ctds Date: Fri, 19 Dec 2025 14:51:41 +0530 Subject: [PATCH 3/6] Separated case --- .../oss/pulsar/source/PulsarCassandraSourceTests.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java index 5c7a632d..12aa1f78 100644 --- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java +++ b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java @@ -875,7 +875,7 @@ void assertGenericRecords(String field, GenericRecord gr) { (long) gr.getField(CqlLogicalTypes.CQL_DURATION_NANOSECONDS))); } return; - case "udt", "tuple": { + case "udt": { for (Field f : gr.getFields()) { assertField(f.getName(), gr.getField(f.getName())); } @@ -892,6 +892,12 @@ void assertGenericRecords(String field, GenericRecord gr) { } } return; + case "tuple": { + for (Field f : gr.getFields()) { + assertField(f.getName(), gr.getField(f.getName())); + } + } + return; } Assert.assertTrue("Unexpected field="+field, false); } From 135e51e9c07b5e4ad595ab87db08a3a070ac7827 Mon Sep 17 00:00:00 2001 From: priyanshu-ctds Date: Fri, 19 Dec 2025 17:45:13 +0530 Subject: [PATCH 4/6] Added value --- .../pulsar/source/converters/AbstractNativeConverter.java | 2 +- .../oss/pulsar/source/converters/NativeAvroConverter.java | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java index ef67cc63..c36f4302 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java @@ -331,7 +331,7 @@ Object marshalCollectionValue(Map.Entry entry) { return collectionValue; } - private GenericRecord buildTupleValue(TupleValue tupleValue) { + GenericRecord buildTupleValue(TupleValue tupleValue) { String typeName = tupleValue.getType().asCql(false, true); Schema tupleSchema = subSchemas.get(typeName); if (tupleSchema == null) { diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java index 4bf20fed..704967c1 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.data.CqlDuration; import com.datastax.oss.driver.api.core.data.CqlVector; +import com.datastax.oss.driver.api.core.data.TupleValue; import com.datastax.oss.driver.api.core.data.UdtValue; import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; @@ -178,6 +179,11 @@ public byte[] toConnectData(Row row) { } } break; + case ProtocolConstants.DataType.TUPLE:{ + TupleValue tupleValue = row.getTupleValue(cm.getName()); + genericRecordBuilder.put(fieldName, buildTupleValue(tupleValue)); + } + break; default: log.debug("Ignoring unsupported column name={} type={}", cm.getName(), cm.getType().asCql(false, true)); } From d075cddb7214a8ecb87ded7f70238d2da022f01e Mon Sep 17 00:00:00 2001 From: priyanshu-ctds Date: Fri, 19 Dec 2025 20:55:14 +0530 Subject: [PATCH 5/6] Merged cases with similar block and tunred optional to true --- .../pulsar/source/converters/AbstractNativeConverter.java | 2 +- .../oss/pulsar/source/PulsarCassandraSourceTests.java | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java index c36f4302..e28a102e 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java @@ -217,7 +217,7 @@ Schema dataTypeSchema(KeyspaceMetadata ksm, DataType dataType) { } case ProtocolConstants.DataType.TUPLE: TupleType tupleType = (TupleType) dataType; - return buildTupleSchema(ksm, dataType.asCql(false, true), tupleType, false); + return buildTupleSchema(ksm, dataType.asCql(false, true), tupleType, true); default: throw new UnsupportedOperationException("Ignoring unsupported type=" + dataType.asCql(false, true)); } diff --git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java index 12aa1f78..ffe1f03d 100644 --- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java +++ b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java @@ -875,6 +875,7 @@ void assertGenericRecords(String field, GenericRecord gr) { (long) gr.getField(CqlLogicalTypes.CQL_DURATION_NANOSECONDS))); } return; + case "tuple": case "udt": { for (Field f : gr.getFields()) { assertField(f.getName(), gr.getField(f.getName())); @@ -892,12 +893,6 @@ void assertGenericRecords(String field, GenericRecord gr) { } } return; - case "tuple": { - for (Field f : gr.getFields()) { - assertField(f.getName(), gr.getField(f.getName())); - } - } - return; } Assert.assertTrue("Unexpected field="+field, false); } From 9fe522d157a11832838585af925290cf15058872 Mon Sep 17 00:00:00 2001 From: Madhavan Date: Fri, 19 Dec 2025 15:51:43 -0500 Subject: [PATCH 6/6] nitpick formatting review comments --- .../source/converters/AbstractNativeConverter.java | 10 +++++----- .../pulsar/source/converters/NativeAvroConverter.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java index e28a102e..323dd2a6 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java @@ -242,10 +242,10 @@ Schema buildUDTSchema(KeyspaceMetadata ksm, String typeName, boolean optional) { return udtSchema; } - Schema buildTupleSchema(KeyspaceMetadata ksm, String typeName, TupleType tupleType, boolean optional){ + Schema buildTupleSchema(KeyspaceMetadata ksm, String typeName, TupleType tupleType, boolean optional) { List fieldSchemas = new ArrayList<>(); - int i=0; - for(DataType componentType : tupleType.getComponentTypes()){ + int i = 0; + for(DataType componentType : tupleType.getComponentTypes()) { String fieldName = "index_" + i; Schema.Field fieldSchema = fieldSchema(ksm, fieldName, componentType, optional); if (fieldSchema != null) { @@ -306,7 +306,7 @@ Object marshalCollectionValue(Object collectionValue) { if(collectionValue instanceof Instant) { return ((Instant)collectionValue).toEpochMilli(); } - if(collectionValue instanceof TupleValue){ + if(collectionValue instanceof TupleValue) { return buildTupleValue((TupleValue) collectionValue); } return collectionValue; @@ -325,7 +325,7 @@ Object marshalCollectionValue(Map.Entry entry) { if(collectionValue instanceof Instant) { return ((Instant)collectionValue).toEpochMilli(); } - if(collectionValue instanceof TupleValue){ + if(collectionValue instanceof TupleValue) { return buildTupleValue((TupleValue) collectionValue); } return collectionValue; diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java index 704967c1..d0267fa5 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java @@ -179,7 +179,7 @@ public byte[] toConnectData(Row row) { } } break; - case ProtocolConstants.DataType.TUPLE:{ + case ProtocolConstants.DataType.TUPLE: { TupleValue tupleValue = row.getTupleValue(cm.getName()); genericRecordBuilder.put(fieldName, buildTupleValue(tupleValue)); }