Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.datastax.oss.cdc.NativeSchemaWrapper;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
Expand All @@ -27,12 +28,14 @@
import com.datastax.oss.driver.api.core.type.ListType;
import com.datastax.oss.driver.api.core.type.MapType;
import com.datastax.oss.driver.api.core.type.SetType;
import com.datastax.oss.driver.api.core.type.TupleType;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import com.datastax.oss.pulsar.source.Converter;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.common.schema.SchemaType;

Expand Down Expand Up @@ -83,6 +86,11 @@ public AbstractNativeConverter(KeyspaceMetadata ksm, TableMetadata tm, List<Colu
log.info("Add vector schema {}={}", field.name(), vectorSchema);
}
break;
case ProtocolConstants.DataType.TUPLE:
Schema tupleSchema = dataTypeSchema(ksm, cm.getType());
subSchemas.put(field.name(), tupleSchema);
log.info("Add tuple schema {}={}", field.name(), tupleSchema);
break;
}
}
}
Expand Down Expand Up @@ -132,6 +140,8 @@ boolean isSupportedCqlType(DataType dataType) {
return true;
case ProtocolConstants.DataType.CUSTOM:
return dataType instanceof CqlVectorType;
case ProtocolConstants.DataType.TUPLE:
return true;
}
return false;
}
Expand Down Expand Up @@ -205,6 +215,9 @@ Schema dataTypeSchema(KeyspaceMetadata ksm, DataType dataType) {
CqlVectorType vectorType = (CqlVectorType) dataType;
return org.apache.avro.Schema.createArray(dataTypeSchema(ksm, vectorType.getSubtype()));
}
case ProtocolConstants.DataType.TUPLE:
TupleType tupleType = (TupleType) dataType;
return buildTupleSchema(ksm, dataType.asCql(false, true), tupleType, false);
default:
throw new UnsupportedOperationException("Ignoring unsupported type=" + dataType.asCql(false, true));
}
Expand All @@ -229,6 +242,26 @@ Schema buildUDTSchema(KeyspaceMetadata ksm, String typeName, boolean optional) {
return udtSchema;
}

/**
 * Builds an Avro record schema for a CQL tuple type. Tuple components have no names in CQL,
 * so Avro fields are named positionally as {@code index_<n>}.
 * Caches the resulting schema (and each component schema) in {@code subSchemas}.
 */
Schema buildTupleSchema(KeyspaceMetadata ksm, String typeName, TupleType tupleType, boolean optional) {
    List<Schema.Field> avroFields = new ArrayList<>();
    List<DataType> componentTypes = tupleType.getComponentTypes();
    for (int position = 0; position < componentTypes.size(); position++) {
        DataType componentType = componentTypes.get(position);
        String componentName = "index_" + position;
        Schema.Field avroField = fieldSchema(ksm, componentName, componentType, optional);
        if (avroField != null) {
            avroFields.add(avroField);
            // Cache the component schema under "<tuple cql>.index_<n>" for later lookup.
            subSchemas.put(typeName + "." + componentName, dataTypeSchema(ksm, componentType));
        }
    }
    // Derive the record name from the CQL type's hash so identical tuple shapes share a name.
    String recordName = "Tuple_" + Integer.toHexString(tupleType.asCql(false, true).hashCode());
    Schema tupleSchema = Schema.createRecord(
            recordName, "CQL type " + typeName, ksm.getName().asInternal(), false, avroFields);
    subSchemas.put(typeName, tupleSchema);
    return tupleSchema;
}

String stringify(DataType dataType, Object value) {
switch (dataType.getProtocolCode()) {
case ProtocolConstants.DataType.ASCII:
Expand Down Expand Up @@ -273,6 +306,9 @@ Object marshalCollectionValue(Object collectionValue) {
if(collectionValue instanceof Instant) {
return ((Instant)collectionValue).toEpochMilli();
}
if(collectionValue instanceof TupleValue){
return buildTupleValue((TupleValue) collectionValue);
}
return collectionValue;
}

Expand All @@ -289,6 +325,22 @@ Object marshalCollectionValue(Map.Entry<? super Object, ? super Object> entry) {
if(collectionValue instanceof Instant) {
return ((Instant)collectionValue).toEpochMilli();
}
if(collectionValue instanceof TupleValue){
return buildTupleValue((TupleValue) collectionValue);
}
return collectionValue;
}

/**
 * Converts a driver {@link TupleValue} into an Avro {@link GenericRecord} using the tuple
 * schema previously cached in {@code subSchemas} (keyed by the tuple's CQL type string).
 * Field names follow the positional {@code index_<n>} convention used when the schema was built.
 *
 * @throws IllegalStateException if no schema was cached for this tuple type
 */
GenericRecord buildTupleValue(TupleValue tupleValue) {
    TupleType tupleType = tupleValue.getType();
    String typeName = tupleType.asCql(false, true);
    Schema tupleSchema = subSchemas.get(typeName);
    if (tupleSchema == null) {
        throw new IllegalStateException("Missing tuple schema for " + typeName);
    }
    GenericData.Record record = new GenericData.Record(tupleSchema);
    int componentCount = tupleType.getComponentTypes().size();
    for (int position = 0; position < componentCount; position++) {
        // Components may themselves be collections/tuples; marshal recursively.
        record.put("index_" + position, marshalCollectionValue(tupleValue.getObject(position)));
    }
    return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.datastax.oss.driver.api.core.data.CqlVector;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.data.UdtValue;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
Expand Down Expand Up @@ -178,6 +179,11 @@ public byte[] toConnectData(Row row) {
}
}
break;
case ProtocolConstants.DataType.TUPLE:{
TupleValue tupleValue = row.getTupleValue(cm.getName());
genericRecordBuilder.put(fieldName, buildTupleValue(tupleValue));
}
break;
default:
log.debug("Ignoring unsupported column name={} type={}", cm.getName(), cm.getType().asCql(false, true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.data.UdtValue;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.TupleType;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
Expand Down Expand Up @@ -648,16 +651,17 @@ public void testSchema(String ksName) throws InterruptedException, IOException {
// force udt values to be null by populating 1 item, using zudt.newValue() without explicitly setting
// any field to non-null value will cause the udt column itself to be null in the C* table
UdtValue zudtOptionalValues = zudt.newValue(dataSpecMap.get("text").cqlValue);

TupleType tupleType = DataTypes.tupleOf(DataTypes.INT, DataTypes.TEXT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a more complex test case — a CQL column of type map<text, frozen<tuple<text, text, bigint, double, text>>> — in addition to the simple tuple type here?

TupleValue tupleValue = tupleType.newValue(dataSpecMap.get("int").cqlValue, dataSpecMap.get("text").cqlValue);
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table3 (" +
"xtext text, xascii ascii, xboolean boolean, xblob blob, xtimestamp timestamp, xtime time, xdate date, xuuid uuid, xtimeuuid timeuuid, xtinyint tinyint, xsmallint smallint, xint int, xbigint bigint, xvarint varint, xdecimal decimal, xdouble double, xfloat float, xinet4 inet, xinet6 inet, " +
"ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list<text>, yset set<int>, ymap map<text, double>, ylistofmap list<frozen<map<text,double>>>, ysetofudt set<frozen<zudt>>," +
"ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list<text>, yset set<int>, ymap map<text, double>, ytuple frozen<tuple<int, text>>, ylistofmap list<frozen<map<text,double>>>, ysetofudt set<frozen<zudt>>," +
"primary key (xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6)) " +
"WITH CLUSTERING ORDER BY (xascii ASC, xboolean DESC, xblob ASC, xtimestamp DESC, xtime DESC, xdate ASC, xuuid DESC, xtimeuuid ASC, xtinyint DESC, xsmallint ASC, xint DESC, xbigint ASC, xvarint DESC, xdecimal ASC, xdouble DESC, xfloat ASC, xinet4 ASC, xinet6 DESC) AND cdc=true");
cqlSession.execute("INSERT INTO " + ksName + ".table3 (" +
"xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6, " +
"ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, yudtoptional, ylist, yset, ymap, ylistofmap, ysetofudt" +
") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?, ?,?,?,?,?)",
"ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, yudtoptional, ylist, yset, ymap, ytuple, ylistofmap, ysetofudt" +
") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?, ?,?,?,?,?,?)",
dataSpecMap.get("text").cqlValue,
dataSpecMap.get("ascii").cqlValue,
dataSpecMap.get("boolean").cqlValue,
Expand Down Expand Up @@ -705,6 +709,7 @@ public void testSchema(String ksName) throws InterruptedException, IOException {
dataSpecMap.get("list").cqlValue,
dataSpecMap.get("set").cqlValue,
dataSpecMap.get("map").cqlValue,
tupleValue,
dataSpecMap.get("listofmap").cqlValue,
ImmutableSet.of(zudtValue, zudtValue)
);
Expand Down Expand Up @@ -809,8 +814,17 @@ void assertGenericArray(String field, GenericArray ga) {
}

void assertField(String fieldName, Object value) {
if (fieldName.startsWith("index_")) {
int idx = Integer.parseInt(fieldName.substring("index_".length()));
if (idx == 0) {
Assert.assertEquals(dataSpecMap.get("int").avroValue, value);
} else if (idx == 1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this fully, so please bear with my limited knowledge here. Are we stopping just at the 2nd field? What if the tuple has more than 2 fields?

Assert.assertEquals(dataSpecMap.get("text").avroValue, value);
}
return;
}
String vKey = fieldName.substring(1);
if (!vKey.equals("udt") && !vKey.equals("udtoptional") && ! vKey.equals("setofudt")) {
if (!vKey.equals("udt") && !vKey.equals("udtoptional") && ! vKey.equals("setofudt") && !vKey.equals("tuple")) {
Assert.assertTrue("Unknown field " + vKey, dataSpecMap.containsKey(vKey));
}
if (value instanceof GenericRecord) {
Expand Down Expand Up @@ -878,6 +892,12 @@ void assertGenericRecords(String field, GenericRecord gr) {
}
}
return;
case "tuple": {
for (Field f : gr.getFields()) {
assertField(f.getName(), gr.getField(f.getName()));
}
}
return;
}
Assert.assertTrue("Unexpected field="+field, false);
}
Expand Down
Loading