diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java index ab948117..323dd2a6 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java @@ -19,6 +19,7 @@ import com.datastax.oss.cdc.NativeSchemaWrapper; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.data.TupleValue; import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; @@ -27,12 +28,14 @@ import com.datastax.oss.driver.api.core.type.ListType; import com.datastax.oss.driver.api.core.type.MapType; import com.datastax.oss.driver.api.core.type.SetType; +import com.datastax.oss.driver.api.core.type.TupleType; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.pulsar.source.Converter; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.pulsar.common.schema.SchemaType; @@ -83,6 +86,11 @@ public AbstractNativeConverter(KeyspaceMetadata ksm, TableMetadata tm, List entry) { if(collectionValue instanceof Instant) { return ((Instant)collectionValue).toEpochMilli(); } + if(collectionValue instanceof TupleValue) { + return buildTupleValue((TupleValue) collectionValue); + } return collectionValue; } + + GenericRecord buildTupleValue(TupleValue tupleValue) { + String typeName = tupleValue.getType().asCql(false, true); + Schema tupleSchema = subSchemas.get(typeName); + if (tupleSchema == null) { + throw new IllegalStateException("Missing tuple schema for " + typeName); + } + GenericRecord record = new GenericData.Record(tupleSchema); + for (int i = 0; i < tupleValue.getType().getComponentTypes().size(); i++) { + record.put("index_" + i, marshalCollectionValue(tupleValue.getObject(i))); + } + return record; + } } diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java index 4bf20fed..d0267fa5 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.data.CqlDuration; import com.datastax.oss.driver.api.core.data.CqlVector; +import com.datastax.oss.driver.api.core.data.TupleValue; import com.datastax.oss.driver.api.core.data.UdtValue; import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; @@ -178,6 +179,11 @@ public byte[] toConnectData(Row row) { } } break; + case ProtocolConstants.DataType.TUPLE: { + TupleValue tupleValue = row.getTupleValue(cm.getName()); + genericRecordBuilder.put(fieldName, buildTupleValue(tupleValue)); + } + break; default: log.debug("Ignoring unsupported column name={} type={}", cm.getName(), cm.getType().asCql(false, true)); } diff --git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java index f732712f..ffe1f03d 100644 --- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java +++ b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java @@ -22,7 +22,10 @@ import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.data.CqlDuration; +import com.datastax.oss.driver.api.core.data.TupleValue; import com.datastax.oss.driver.api.core.data.UdtValue; +import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.oss.driver.api.core.type.TupleType; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import com.datastax.oss.driver.shaded.guava.common.collect.Lists; @@ -648,16 +651,17 @@ public void testSchema(String ksName) throws InterruptedException, IOException { // force udt values to be null by populating 1 item, using zudt.newValue() without explicitly setting // any field to non-null value will cause the udt column itself to be null in the C* table UdtValue zudtOptionalValues = zudt.newValue(dataSpecMap.get("text").cqlValue); - + TupleType tupleType = DataTypes.tupleOf(DataTypes.INT, DataTypes.TEXT); + TupleValue tupleValue = tupleType.newValue(dataSpecMap.get("int").cqlValue, dataSpecMap.get("text").cqlValue); cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table3 (" + "xtext text, xascii ascii, xboolean boolean, xblob blob, xtimestamp timestamp, xtime time, xdate date, xuuid uuid, xtimeuuid timeuuid, xtinyint tinyint, xsmallint smallint, xint int, xbigint bigint, xvarint varint, xdecimal decimal, xdouble double, xfloat float, xinet4 inet, xinet6 inet, " + - "ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list, yset set, ymap map, ylistofmap list>>, ysetofudt set>," + + "ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list, yset set, ymap map, ytuple frozen>, ylistofmap list>>, ysetofudt set>," + "primary key (xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6)) " + "WITH CLUSTERING ORDER BY (xascii ASC, xboolean DESC, xblob ASC, xtimestamp DESC, xtime DESC, xdate ASC, xuuid DESC, xtimeuuid ASC, xtinyint DESC, xsmallint ASC, xint DESC, xbigint ASC, xvarint DESC, xdecimal ASC, xdouble DESC, xfloat ASC, xinet4 ASC, xinet6 DESC) AND cdc=true"); cqlSession.execute("INSERT INTO " + ksName + ".table3 (" + "xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6, " + - "ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, yudtoptional, ylist, yset, ymap, ylistofmap, ysetofudt" + - ") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?, ?,?,?,?,?)", + "ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, yudtoptional, ylist, yset, ymap, ytuple, ylistofmap, ysetofudt" + + ") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?, ?,?,?,?,?,?)", dataSpecMap.get("text").cqlValue, dataSpecMap.get("ascii").cqlValue, dataSpecMap.get("boolean").cqlValue, @@ -705,6 +709,7 @@ public void testSchema(String ksName) throws InterruptedException, IOException { dataSpecMap.get("list").cqlValue, dataSpecMap.get("set").cqlValue, dataSpecMap.get("map").cqlValue, + tupleValue, dataSpecMap.get("listofmap").cqlValue, ImmutableSet.of(zudtValue, zudtValue) ); @@ -809,8 +814,17 @@ void assertGenericArray(String field, GenericArray ga) { } void assertField(String fieldName, Object value) { + if (fieldName.startsWith("index_")) { + int idx = Integer.parseInt(fieldName.substring("index_".length())); + if (idx == 0) { + Assert.assertEquals(dataSpecMap.get("int").avroValue, value); + } else if (idx == 1) { + Assert.assertEquals(dataSpecMap.get("text").avroValue, value); + } + return; + } String vKey = fieldName.substring(1); - if (!vKey.equals("udt") && !vKey.equals("udtoptional") && ! vKey.equals("setofudt")) { + if (!vKey.equals("udt") && !vKey.equals("udtoptional") && ! vKey.equals("setofudt") && !vKey.equals("tuple")) { Assert.assertTrue("Unknown field " + vKey, dataSpecMap.containsKey(vKey)); } if (value instanceof GenericRecord) { @@ -861,6 +875,7 @@ void assertGenericRecords(String field, GenericRecord gr) { (long) gr.getField(CqlLogicalTypes.CQL_DURATION_NANOSECONDS))); } return; + case "tuple": case "udt": { for (Field f : gr.getFields()) { assertField(f.getName(), gr.getField(f.getName()));