diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/ArrowToAvroUtils.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/ArrowToAvroUtils.java index 6f0cb5cffc..87b594af9e 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/ArrowToAvroUtils.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/ArrowToAvroUtils.java @@ -332,7 +332,17 @@ private static T buildBaseTypeSchema( case List: case FixedSizeList: - return buildArraySchema(builder.array(), field, namespace); + // Arrow uses "$data$" as the field name for list items, that is not a valid Avro name + Field itemField = field.getChildren().get(0); + if (ListVector.DATA_VECTOR_NAME.equals(itemField.getName())) { + Field safeItemField = + new Field("item", itemField.getFieldType(), itemField.getChildren()); + Field safeListField = + new Field(field.getName(), field.getFieldType(), List.of(safeItemField)); + return buildArraySchema(builder.array(), safeListField, namespace); + } else { + return buildArraySchema(builder.array(), field, namespace); + } case Map: return buildMapSchema(builder.map(), field, namespace); diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrow.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrow.java index 2392c36f94..2a28ad393b 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrow.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrow.java @@ -59,4 +59,23 @@ public static AvroToArrowVectorIterator avroToArrowIterator( return AvroToArrowVectorIterator.create(decoder, schema, config); } + + /** + * Convert an Avro schema to its Arrow equivalent. + * + *

The resulting set of Arrow fields matches what would be set in the VSR after calling + * avroToArrow() or avroToArrowIterator(), respecting the configuration in the config parameter. + * + * @param schema The Avro schema to convert + * @param config Configuration options for conversion + * @return The equivalent Arrow schema + */ + public static org.apache.arrow.vector.types.pojo.Schema avroToAvroSchema( + Schema schema, AvroToArrowConfig config) { + + Preconditions.checkNotNull(schema, "Avro schema object cannot be null"); + Preconditions.checkNotNull(config, "config cannot be null"); + + return AvroToArrowUtils.createArrowSchema(schema, config); + } } diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowConfig.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowConfig.java index bd70c2b8ba..5596138586 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowConfig.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowConfig.java @@ -41,6 +41,12 @@ public class AvroToArrowConfig { /** The field names which to skip when reading decoder values. */ private final Set skipFieldNames; + /** + * Use legacy-mode to keep compatibility with old behavior (pre-2025), enabled by default. This + * affects how the AvroToArrow code interprets the Avro schema. + */ + private final boolean legacyMode; + /** * Instantiate an instance. * @@ -64,6 +70,37 @@ public class AvroToArrowConfig { this.targetBatchSize = targetBatchSize; this.provider = provider; this.skipFieldNames = skipFieldNames; + + // Default values for optional parameters + legacyMode = true; // Keep compatibility with old behavior by default + } + + /** + * Instantiate an instance. + * + * @param allocator The memory allocator to construct the Arrow vectors with. + * @param targetBatchSize The maximum rowCount to read each time when partially convert data. 
+ * @param provider The dictionary provider used for enum type, adapter will update this provider. + * @param skipFieldNames Field names which to skip. + * @param legacyMode Keep compatibility with old behavior (pre-2025) + */ + AvroToArrowConfig( + BufferAllocator allocator, + int targetBatchSize, + DictionaryProvider.MapDictionaryProvider provider, + Set skipFieldNames, + boolean legacyMode) { + + Preconditions.checkArgument( + targetBatchSize == AvroToArrowVectorIterator.NO_LIMIT_BATCH_SIZE || targetBatchSize > 0, + "invalid targetBatchSize: %s", + targetBatchSize); + + this.allocator = allocator; + this.targetBatchSize = targetBatchSize; + this.provider = provider; + this.skipFieldNames = skipFieldNames; + this.legacyMode = legacyMode; } public BufferAllocator getAllocator() { @@ -81,4 +118,8 @@ public DictionaryProvider.MapDictionaryProvider getProvider() { public Set getSkipFieldNames() { return skipFieldNames; } + + public boolean isLegacyMode() { + return legacyMode; + } } diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java index ed7642aabd..aedef7732e 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java @@ -41,6 +41,7 @@ import org.apache.arrow.adapter.avro.consumers.AvroLongConsumer; import org.apache.arrow.adapter.avro.consumers.AvroMapConsumer; import org.apache.arrow.adapter.avro.consumers.AvroNullConsumer; +import org.apache.arrow.adapter.avro.consumers.AvroNullableConsumer; import org.apache.arrow.adapter.avro.consumers.AvroStringConsumer; import org.apache.arrow.adapter.avro.consumers.AvroStructConsumer; import org.apache.arrow.adapter.avro.consumers.AvroUnionsConsumer; @@ -49,17 +50,23 @@ import org.apache.arrow.adapter.avro.consumers.SkipConsumer; import org.apache.arrow.adapter.avro.consumers.SkipFunction; 
import org.apache.arrow.adapter.avro.consumers.logical.AvroDateConsumer; +import org.apache.arrow.adapter.avro.consumers.logical.AvroDecimal256Consumer; import org.apache.arrow.adapter.avro.consumers.logical.AvroDecimalConsumer; import org.apache.arrow.adapter.avro.consumers.logical.AvroTimeMicroConsumer; import org.apache.arrow.adapter.avro.consumers.logical.AvroTimeMillisConsumer; import org.apache.arrow.adapter.avro.consumers.logical.AvroTimestampMicrosConsumer; +import org.apache.arrow.adapter.avro.consumers.logical.AvroTimestampMicrosTzConsumer; import org.apache.arrow.adapter.avro.consumers.logical.AvroTimestampMillisConsumer; +import org.apache.arrow.adapter.avro.consumers.logical.AvroTimestampMillisTzConsumer; +import org.apache.arrow.adapter.avro.consumers.logical.AvroTimestampNanosConsumer; +import org.apache.arrow.adapter.avro.consumers.logical.AvroTimestampNanosTzConsumer; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.BaseIntVector; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.Decimal256Vector; import org.apache.arrow.vector.DecimalVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.FixedSizeBinaryVector; @@ -69,8 +76,12 @@ import org.apache.arrow.vector.NullVector; import org.apache.arrow.vector.TimeMicroVector; import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ 
-169,42 +180,69 @@ private static Consumer createConsumer( switch (type) { case UNION: - consumer = createUnionConsumer(schema, name, config, consumerVector); + boolean nullableUnion = + schema.getTypes().stream().anyMatch(t -> t.getType() == Schema.Type.NULL); + if (schema.getTypes().size() == 2 && nullableUnion && !config.isLegacyMode()) { + // For a simple nullable (null | type), interpret the union as a single nullable field. + // Not available in legacy mode, which uses the literal interpretation instead + int nullIndex = schema.getTypes().get(0).getType() == Schema.Type.NULL ? 0 : 1; + int childIndex = nullIndex == 0 ? 1 : 0; + Schema childSchema = schema.getTypes().get(childIndex); + Consumer childConsumer = + createConsumer(childSchema, name, true, config, consumerVector); + consumer = new AvroNullableConsumer<>(childConsumer, nullIndex); + } else { + // Literal interpretation of a union, which may or may not include a null element. + consumer = createUnionConsumer(schema, name, nullableUnion, config, consumerVector); + } break; case ARRAY: - consumer = createArrayConsumer(schema, name, config, consumerVector); + consumer = createArrayConsumer(schema, name, nullable, config, consumerVector); break; case MAP: - consumer = createMapConsumer(schema, name, config, consumerVector); + consumer = createMapConsumer(schema, name, nullable, config, consumerVector); break; case RECORD: - consumer = createStructConsumer(schema, name, config, consumerVector); + consumer = createStructConsumer(schema, name, nullable, config, consumerVector); break; case ENUM: - consumer = createEnumConsumer(schema, name, config, consumerVector); + consumer = createEnumConsumer(schema, name, nullable, config, consumerVector); break; case STRING: arrowType = new ArrowType.Utf8(); - fieldType = new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + fieldType = + new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = 
createVector(consumerVector, fieldType, name, allocator); consumer = new AvroStringConsumer((VarCharVector) vector); break; case FIXED: - Map extProps = createExternalProps(schema); + Map extProps = createExternalProps(schema, config); if (logicalType instanceof LogicalTypes.Decimal) { - arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); + arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType, schema); fieldType = new FieldType( - nullable, arrowType, /* dictionary= */ null, getMetaData(schema, extProps)); + nullable, + arrowType, + /* dictionary= */ null, + getMetaData(schema, extProps, config)); vector = createVector(consumerVector, fieldType, name, allocator); - consumer = - new AvroDecimalConsumer.FixedDecimalConsumer( - (DecimalVector) vector, schema.getFixedSize()); + if (schema.getFixedSize() <= 16) { + consumer = + new AvroDecimalConsumer.FixedDecimalConsumer( + (DecimalVector) vector, schema.getFixedSize()); + } else { + consumer = + new AvroDecimal256Consumer.FixedDecimal256Consumer( + (Decimal256Vector) vector, schema.getFixedSize()); + } } else { arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); fieldType = new FieldType( - nullable, arrowType, /* dictionary= */ null, getMetaData(schema, extProps)); + nullable, + arrowType, + /* dictionary= */ null, + getMetaData(schema, extProps, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize()); } @@ -213,26 +251,30 @@ private static Consumer createConsumer( if (logicalType instanceof LogicalTypes.Date) { arrowType = new ArrowType.Date(DateUnit.DAY); fieldType = - new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroDateConsumer((DateDayVector) 
vector); } else if (logicalType instanceof LogicalTypes.TimeMillis) { arrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32); fieldType = - new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroTimeMillisConsumer((TimeMilliVector) vector); } else { arrowType = new ArrowType.Int(32, /* isSigned= */ true); fieldType = - new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroIntConsumer((IntVector) vector); } break; case BOOLEAN: arrowType = new ArrowType.Bool(); - fieldType = new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + fieldType = + new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroBooleanConsumer((BitVector) vector); break; @@ -240,60 +282,109 @@ private static Consumer createConsumer( if (logicalType instanceof LogicalTypes.TimeMicros) { arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64); fieldType = - new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroTimeMicroConsumer((TimeMicroVector) vector); - } else if (logicalType instanceof LogicalTypes.TimestampMillis) { + } else if (logicalType instanceof LogicalTypes.TimestampMillis && !config.isLegacyMode()) { + // In legacy mode the timestamp-xxx types are treated as local, there is no zone aware + // type + arrowType = new 
ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"); + fieldType = + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroTimestampMillisTzConsumer((TimeStampMilliTZVector) vector); + } else if (logicalType instanceof LogicalTypes.TimestampMicros && !config.isLegacyMode()) { + arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); + fieldType = + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroTimestampMicrosTzConsumer((TimeStampMicroTZVector) vector); + } else if (logicalType instanceof LogicalTypes.TimestampNanos && !config.isLegacyMode()) { + arrowType = new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"); + fieldType = + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroTimestampNanosTzConsumer((TimeStampNanoTZVector) vector); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis + || (logicalType instanceof LogicalTypes.TimestampMillis && config.isLegacyMode())) { arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); fieldType = - new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroTimestampMillisConsumer((TimeStampMilliVector) vector); - } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros + || (logicalType instanceof LogicalTypes.TimestampMicros && config.isLegacyMode())) { + // In legacy mode the timestamp-xxx types are treated as local arrowType = 
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); fieldType = - new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroTimestampMicrosConsumer((TimeStampMicroVector) vector); + } else if (logicalType instanceof LogicalTypes.LocalTimestampNanos + || (logicalType instanceof LogicalTypes.TimestampNanos && config.isLegacyMode())) { + arrowType = new ArrowType.Timestamp(TimeUnit.NANOSECOND, null); + fieldType = + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroTimestampNanosConsumer((TimeStampNanoVector) vector); } else { arrowType = new ArrowType.Int(64, /* isSigned= */ true); fieldType = - new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroLongConsumer((BigIntVector) vector); } break; case FLOAT: arrowType = new ArrowType.FloatingPoint(SINGLE); - fieldType = new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + fieldType = + new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroFloatConsumer((Float4Vector) vector); break; case DOUBLE: arrowType = new ArrowType.FloatingPoint(DOUBLE); - fieldType = new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + fieldType = + new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new 
AvroDoubleConsumer((Float8Vector) vector); break; case BYTES: if (logicalType instanceof LogicalTypes.Decimal) { - arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); + LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType; + arrowType = createDecimalArrowType(decimalType, schema); fieldType = - new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); - consumer = new AvroDecimalConsumer.BytesDecimalConsumer((DecimalVector) vector); + if (decimalType.getPrecision() <= 38) { + consumer = new AvroDecimalConsumer.BytesDecimalConsumer((DecimalVector) vector); + } else { + consumer = + new AvroDecimal256Consumer.BytesDecimal256Consumer((Decimal256Vector) vector); + } } else { arrowType = new ArrowType.Binary(); fieldType = - new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); + new FieldType( + nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); vector = createVector(consumerVector, fieldType, name, allocator); consumer = new AvroBytesConsumer((VarBinaryVector) vector); } break; case NULL: arrowType = new ArrowType.Null(); - fieldType = new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, /* schemaCallBack= */ null); + fieldType = + new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config)); + vector = new NullVector(name, fieldType); // Respect nullability defined in fieldType consumer = new AvroNullConsumer((NullVector) vector); break; default: @@ -304,19 +395,31 @@ private static Consumer createConsumer( return consumer; } - private static ArrowType createDecimalArrowType(LogicalTypes.Decimal logicalType) { + private static ArrowType createDecimalArrowType(LogicalTypes.Decimal 
logicalType, Schema schema) { final int scale = logicalType.getScale(); final int precision = logicalType.getPrecision(); Preconditions.checkArgument( - precision > 0 && precision <= 38, "Precision must be in range of 1 to 38"); - Preconditions.checkArgument(scale >= 0 && scale <= 38, "Scale must be in range of 0 to 38."); + precision > 0 && precision <= 76, "Precision must be in range of 1 to 76"); + Preconditions.checkArgument(scale >= 0 && scale <= 76, "Scale must be in range of 0 to 76."); Preconditions.checkArgument( scale <= precision, "Invalid decimal scale: %s (greater than precision: %s)", scale, precision); - return new ArrowType.Decimal(precision, scale, 128); + if (schema.getType() == Schema.Type.FIXED) { + if (schema.getFixedSize() <= 16) { + return new ArrowType.Decimal(precision, scale, 128); + } else { + return new ArrowType.Decimal(precision, scale, 256); + } + } else { + if (precision <= 38) { + return new ArrowType.Decimal(precision, scale, 128); + } else { + return new ArrowType.Decimal(precision, scale, 256); + } + } } private static Consumer createSkipConsumer(Schema schema) { @@ -406,6 +509,30 @@ private static Consumer createSkipConsumer(Schema schema) { return new SkipConsumer(skipFunction); } + static org.apache.arrow.vector.types.pojo.Schema createArrowSchema( + Schema schema, AvroToArrowConfig config) { + + // Create an Arrow schema matching the structure of vectors built by createCompositeConsumer() + + Set skipFieldNames = config.getSkipFieldNames(); + List arrowFields = new ArrayList<>(schema.getFields().size()); + + Schema.Type type = schema.getType(); + if (type == Schema.Type.RECORD) { + for (Schema.Field field : schema.getFields()) { + if (!skipFieldNames.contains(field.name())) { + Field arrowField = avroSchemaToField(field.schema(), field.name(), config); + arrowFields.add(arrowField); + } + } + } else { + Field arrowField = avroSchemaToField(schema, schema.getName(), config); + arrowFields.add(arrowField); + } + + return new 
org.apache.arrow.vector.types.pojo.Schema(arrowFields); + } + static CompositeAvroConsumer createCompositeConsumer(Schema schema, AvroToArrowConfig config) { List consumers = new ArrayList<>(); @@ -442,11 +569,20 @@ private static String getDefaultFieldName(ArrowType type) { } private static Field avroSchemaToField(Schema schema, String name, AvroToArrowConfig config) { - return avroSchemaToField(schema, name, config, null); + return avroSchemaToField(schema, name, false, config, null); } private static Field avroSchemaToField( Schema schema, String name, AvroToArrowConfig config, Map externalProps) { + return avroSchemaToField(schema, name, false, config, externalProps); + } + + private static Field avroSchemaToField( + Schema schema, + String name, + boolean nullable, + AvroToArrowConfig config, + Map externalProps) { final Schema.Type type = schema.getType(); final LogicalType logicalType = schema.getLogicalType(); @@ -455,33 +591,53 @@ private static Field avroSchemaToField( switch (type) { case UNION: - for (int i = 0; i < schema.getTypes().size(); i++) { - Schema childSchema = schema.getTypes().get(i); - // Union child vector should use default name - children.add(avroSchemaToField(childSchema, null, config)); + boolean nullableUnion = + schema.getTypes().stream().anyMatch(t -> t.getType() == Schema.Type.NULL); + if (nullableUnion && schema.getTypes().size() == 2 && !config.isLegacyMode()) { + // For a simple nullable (null | type), interpret the union as a single nullable field. + // Not available in legacy mode, which uses the literal interpretation instead + Schema childSchema = + schema.getTypes().get(0).getType() == Schema.Type.NULL + ? schema.getTypes().get(1) + : schema.getTypes().get(0); + return avroSchemaToField(childSchema, name, true, config, externalProps); + } else { + // Literal interpretation of a union, which may or may not include a null element. 
+ for (int i = 0; i < schema.getTypes().size(); i++) { + Schema childSchema = schema.getTypes().get(i); + // Union child vector should use default name + children.add(avroSchemaToField(childSchema, null, nullableUnion, config, null)); + } + fieldType = + createFieldType( + new ArrowType.Union(UnionMode.Sparse, null), schema, externalProps, config); } - fieldType = - createFieldType(new ArrowType.Union(UnionMode.Sparse, null), schema, externalProps); break; case ARRAY: Schema elementSchema = schema.getElementType(); - children.add(avroSchemaToField(elementSchema, elementSchema.getName(), config)); - fieldType = createFieldType(new ArrowType.List(), schema, externalProps); + children.add(avroSchemaToField(elementSchema, ListVector.DATA_VECTOR_NAME, config)); + fieldType = createFieldType(nullable, new ArrowType.List(), schema, externalProps, config); break; case MAP: // MapVector internal struct field and key field should be non-nullable FieldType keyFieldType = new FieldType(/* nullable= */ false, new ArrowType.Utf8(), /* dictionary= */ null); - Field keyField = new Field("key", keyFieldType, /* children= */ null); - Field valueField = avroSchemaToField(schema.getValueType(), "value", config); + Field keyField = new Field(MapVector.KEY_NAME, keyFieldType, /* children= */ null); + Field valueField = avroSchemaToField(schema.getValueType(), MapVector.VALUE_NAME, config); FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), /* dictionary= */ null); Field structField = - new Field("internal", structFieldType, Arrays.asList(keyField, valueField)); + new Field( + MapVector.DATA_VECTOR_NAME, structFieldType, Arrays.asList(keyField, valueField)); children.add(structField); fieldType = - createFieldType(new ArrowType.Map(/* keysSorted= */ false), schema, externalProps); + createFieldType( + nullable, + new ArrowType.Map(/* keysSorted= */ false), + schema, + externalProps, + config); break; case RECORD: final Set skipFieldNames = 
config.getSkipFieldNames(); @@ -496,13 +652,14 @@ private static Field avroSchemaToField( if (doc != null) { extProps.put("doc", doc); } - if (aliases != null) { + if (aliases != null && (!aliases.isEmpty() || config.isLegacyMode())) { extProps.put("aliases", convertAliases(aliases)); } children.add(avroSchemaToField(childSchema, fullChildName, config, extProps)); } } - fieldType = createFieldType(new ArrowType.Struct(), schema, externalProps); + fieldType = + createFieldType(nullable, new ArrowType.Struct(), schema, externalProps, config); break; case ENUM: DictionaryProvider.MapDictionaryProvider provider = config.getProvider(); @@ -512,23 +669,25 @@ private static Field avroSchemaToField( fieldType = createFieldType( + nullable, indexType, schema, externalProps, - new DictionaryEncoding(current, /* ordered= */ false, /* indexType= */ indexType)); + new DictionaryEncoding(current, /* ordered= */ false, /* indexType= */ indexType), + config); break; case STRING: - fieldType = createFieldType(new ArrowType.Utf8(), schema, externalProps); + fieldType = createFieldType(nullable, new ArrowType.Utf8(), schema, externalProps, config); break; case FIXED: final ArrowType fixedArrowType; if (logicalType instanceof LogicalTypes.Decimal) { - fixedArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); + fixedArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType, schema); } else { fixedArrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); } - fieldType = createFieldType(fixedArrowType, schema, externalProps); + fieldType = createFieldType(nullable, fixedArrowType, schema, externalProps, config); break; case INT: final ArrowType intArrowType; @@ -539,41 +698,62 @@ private static Field avroSchemaToField( } else { intArrowType = new ArrowType.Int(32, /* isSigned= */ true); } - fieldType = createFieldType(intArrowType, schema, externalProps); + fieldType = createFieldType(nullable, intArrowType, schema, externalProps, config); break; 
case BOOLEAN: - fieldType = createFieldType(new ArrowType.Bool(), schema, externalProps); + fieldType = createFieldType(nullable, new ArrowType.Bool(), schema, externalProps, config); break; case LONG: final ArrowType longArrowType; if (logicalType instanceof LogicalTypes.TimeMicros) { longArrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64); } else if (logicalType instanceof LogicalTypes.TimestampMillis) { - longArrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); + // In legacy mode the timestamp-xxx types are treated as local + String tz = config.isLegacyMode() ? null : "UTC"; + longArrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, tz); } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + String tz = config.isLegacyMode() ? null : "UTC"; + longArrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, tz); + } else if (logicalType instanceof LogicalTypes.TimestampNanos) { + String tz = config.isLegacyMode() ? null : "UTC"; + longArrowType = new ArrowType.Timestamp(TimeUnit.NANOSECOND, tz); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis + && !config.isLegacyMode()) { + // In legacy mode the local-timestamp-xxx types are not recognized (result is just type = + // long) + longArrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros + && !config.isLegacyMode()) { longArrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + } else if (logicalType instanceof LogicalTypes.LocalTimestampNanos + && !config.isLegacyMode()) { + longArrowType = new ArrowType.Timestamp(TimeUnit.NANOSECOND, null); } else { longArrowType = new ArrowType.Int(64, /* isSigned= */ true); } - fieldType = createFieldType(longArrowType, schema, externalProps); + fieldType = createFieldType(nullable, longArrowType, schema, externalProps, config); break; case FLOAT: - fieldType = createFieldType(new ArrowType.FloatingPoint(SINGLE), schema, externalProps); 
+ fieldType = + createFieldType( + nullable, new ArrowType.FloatingPoint(SINGLE), schema, externalProps, config); break; case DOUBLE: - fieldType = createFieldType(new ArrowType.FloatingPoint(DOUBLE), schema, externalProps); + fieldType = + createFieldType( + nullable, new ArrowType.FloatingPoint(DOUBLE), schema, externalProps, config); break; case BYTES: final ArrowType bytesArrowType; if (logicalType instanceof LogicalTypes.Decimal) { - bytesArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); + bytesArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType, schema); } else { bytesArrowType = new ArrowType.Binary(); } - fieldType = createFieldType(bytesArrowType, schema, externalProps); + fieldType = createFieldType(nullable, bytesArrowType, schema, externalProps, config); break; case NULL: - fieldType = createFieldType(ArrowType.Null.INSTANCE, schema, externalProps); + fieldType = createFieldType(ArrowType.Null.INSTANCE, schema, externalProps, config); break; default: // no-op, shouldn't get here @@ -583,15 +763,24 @@ private static Field avroSchemaToField( if (name == null) { name = getDefaultFieldName(fieldType.getType()); } + if (name.contains(".") && !config.isLegacyMode()) { + // Do not include namespace as part of the field name + name = name.substring(name.lastIndexOf(".") + 1); + } return new Field(name, fieldType, children.size() == 0 ? 
null : children); } private static Consumer createArrayConsumer( - Schema schema, String name, AvroToArrowConfig config, FieldVector consumerVector) { + Schema schema, + String name, + boolean nullable, + AvroToArrowConfig config, + FieldVector consumerVector) { ListVector listVector; if (consumerVector == null) { - final Field field = avroSchemaToField(schema, name, config); + final Field field = + avroSchemaToField(schema, name, nullable, config, /* externalProps= */ null); listVector = (ListVector) field.createVector(config.getAllocator()); } else { listVector = (ListVector) consumerVector; @@ -607,13 +796,18 @@ private static Consumer createArrayConsumer( } private static Consumer createStructConsumer( - Schema schema, String name, AvroToArrowConfig config, FieldVector consumerVector) { + Schema schema, + String name, + boolean nullable, + AvroToArrowConfig config, + FieldVector consumerVector) { final Set skipFieldNames = config.getSkipFieldNames(); StructVector structVector; if (consumerVector == null) { - final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema)); + final Field field = + avroSchemaToField(schema, name, nullable, config, createExternalProps(schema, config)); structVector = (StructVector) field.createVector(config.getAllocator()); } else { structVector = (StructVector) consumerVector; @@ -644,11 +838,16 @@ private static Consumer createStructConsumer( } private static Consumer createEnumConsumer( - Schema schema, String name, AvroToArrowConfig config, FieldVector consumerVector) { + Schema schema, + String name, + boolean nullable, + AvroToArrowConfig config, + FieldVector consumerVector) { BaseIntVector indexVector; if (consumerVector == null) { - final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema)); + final Field field = + avroSchemaToField(schema, name, nullable, config, createExternalProps(schema, config)); indexVector = (BaseIntVector) 
field.createVector(config.getAllocator()); } else { indexVector = (BaseIntVector) consumerVector; @@ -668,11 +867,16 @@ private static Consumer createEnumConsumer( } private static Consumer createMapConsumer( - Schema schema, String name, AvroToArrowConfig config, FieldVector consumerVector) { + Schema schema, + String name, + boolean nullable, + AvroToArrowConfig config, + FieldVector consumerVector) { MapVector mapVector; if (consumerVector == null) { - final Field field = avroSchemaToField(schema, name, config); + final Field field = + avroSchemaToField(schema, name, nullable, config, /* externalProps= */ null); mapVector = (MapVector) field.createVector(config.getAllocator()); } else { mapVector = (MapVector) consumerVector; @@ -698,12 +902,13 @@ private static Consumer createMapConsumer( } private static Consumer createUnionConsumer( - Schema schema, String name, AvroToArrowConfig config, FieldVector consumerVector) { + Schema schema, + String name, + boolean nullableUnion, + AvroToArrowConfig config, + FieldVector consumerVector) { final int size = schema.getTypes().size(); - final boolean nullable = - schema.getTypes().stream().anyMatch(t -> t.getType() == Schema.Type.NULL); - UnionVector unionVector; if (consumerVector == null) { final Field field = avroSchemaToField(schema, name, config); @@ -720,7 +925,8 @@ private static Consumer createUnionConsumer( for (int i = 0; i < size; i++) { FieldVector child = childVectors.get(i); Schema subSchema = schema.getTypes().get(i); - Consumer delegate = createConsumer(subSchema, subSchema.getName(), nullable, config, child); + Consumer delegate = + createConsumer(subSchema, subSchema.getName(), nullableUnion, config, child); delegates[i] = delegate; types[i] = child.getMinorType(); } @@ -785,14 +991,24 @@ static VectorSchemaRoot avroToArrowVectors( return root; } - private static Map getMetaData(Schema schema) { + // Do not include props that are part of the Avro format itself as field metadata + // These are already 
represented in the field / type structure and are not custom attributes + private static final List AVRO_FORMAT_METADATA = + Arrays.asList("logicalType", "precision", "scale"); + + private static Map getMetaData(Schema schema, AvroToArrowConfig config) { Map metadata = new HashMap<>(); - schema.getObjectProps().forEach((k, v) -> metadata.put(k, v.toString())); + for (Map.Entry prop : schema.getObjectProps().entrySet()) { + if (!AVRO_FORMAT_METADATA.contains(prop.getKey()) || config.isLegacyMode()) { + metadata.put(prop.getKey(), prop.getValue().toString()); + } + } return metadata; } - private static Map getMetaData(Schema schema, Map externalProps) { - Map metadata = getMetaData(schema); + private static Map getMetaData( + Schema schema, Map externalProps, AvroToArrowConfig config) { + Map metadata = getMetaData(schema, config); if (externalProps != null) { metadata.putAll(externalProps); } @@ -800,32 +1016,58 @@ private static Map getMetaData(Schema schema, Map createExternalProps(Schema schema) { + private static Map createExternalProps(Schema schema, AvroToArrowConfig config) { final Map extProps = new HashMap<>(); String doc = schema.getDoc(); Set aliases = schema.getAliases(); if (doc != null) { extProps.put("doc", doc); } - if (aliases != null) { + if (aliases != null && (!aliases.isEmpty() || config.isLegacyMode())) { extProps.put("aliases", convertAliases(aliases)); } return extProps; } private static FieldType createFieldType( - ArrowType arrowType, Schema schema, Map externalProps) { - return createFieldType(arrowType, schema, externalProps, /* dictionary= */ null); + ArrowType arrowType, + Schema schema, + Map externalProps, + AvroToArrowConfig config) { + return createFieldType(arrowType, schema, externalProps, /* dictionary= */ null, config); + } + + private static FieldType createFieldType( + boolean nullable, + ArrowType arrowType, + Schema schema, + Map externalProps, + AvroToArrowConfig config) { + return createFieldType( + nullable, arrowType, 
schema, externalProps, /* dictionary= */ null, config); } private static FieldType createFieldType( ArrowType arrowType, Schema schema, Map externalProps, - DictionaryEncoding dictionary) { + DictionaryEncoding dictionary, + AvroToArrowConfig config) { + + return createFieldType( + /* nullable= */ false, arrowType, schema, externalProps, dictionary, config); + } + + private static FieldType createFieldType( + boolean nullable, + ArrowType arrowType, + Schema schema, + Map externalProps, + DictionaryEncoding dictionary, + AvroToArrowConfig config) { return new FieldType( - /* nullable= */ false, arrowType, dictionary, getMetaData(schema, externalProps)); + nullable, arrowType, dictionary, getMetaData(schema, externalProps, config)); } private static String convertAliases(Set aliases) { diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/AvroNullableConsumer.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/AvroNullableConsumer.java new file mode 100644 index 0000000000..b67819cb9d --- /dev/null +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/AvroNullableConsumer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.adapter.avro.consumers; + +import java.io.IOException; +import org.apache.arrow.vector.FieldVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer wrapper which consumes nullable type values from avro decoder. Write the data to the + * underlying {@link FieldVector}. + * + * @param The vector within consumer or its delegate. + */ +public class AvroNullableConsumer extends BaseAvroConsumer { + + private final Consumer delegate; + private final int nullIndex; + + /** Instantiate a AvroNullableConsumer. */ + @SuppressWarnings("unchecked") + public AvroNullableConsumer(Consumer delegate, int nullIndex) { + super((T) delegate.getVector()); + this.delegate = delegate; + this.nullIndex = nullIndex; + } + + @Override + public void consume(Decoder decoder) throws IOException { + int typeIndex = decoder.readInt(); + if (typeIndex == nullIndex) { + decoder.readNull(); + delegate.addNull(); + } else { + delegate.consume(decoder); + } + currentIndex++; + } + + @Override + public void addNull() { + // Can be called by containers of nullable types + delegate.addNull(); + currentIndex++; + } + + @Override + public void setPosition(int index) { + if (index < 0 || index > vector.getValueCount()) { + throw new IllegalArgumentException("Index out of bounds"); + } + delegate.setPosition(index); + super.setPosition(index); + } + + @Override + public boolean resetValueVector(T vector) { + boolean delegateOk = delegate.resetValueVector(vector); + boolean thisOk = super.resetValueVector(vector); + return thisOk && delegateOk; + } + + @Override + public void close() throws Exception { + super.close(); + delegate.close(); + } +} diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroDecimal256Consumer.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroDecimal256Consumer.java new file mode 100644 index 0000000000..12652833a1 --- /dev/null +++ 
b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroDecimal256Consumer.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adapter.avro.consumers.logical; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.Decimal256Vector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume 256-bit decimal type values from avro decoder. Write the data to {@link + * Decimal256Vector}. + */ +public abstract class AvroDecimal256Consumer extends BaseAvroConsumer { + + protected AvroDecimal256Consumer(Decimal256Vector vector) { + super(vector); + } + + /** Consumer for decimal logical type with 256 bit width and original bytes type. */ + public static class BytesDecimal256Consumer extends AvroDecimal256Consumer { + + private ByteBuffer cacheBuffer; + + /** Instantiate a BytesDecimal256Consumer. 
*/ + public BytesDecimal256Consumer(Decimal256Vector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + cacheBuffer = decoder.readBytes(cacheBuffer); + byte[] bytes = new byte[cacheBuffer.limit()]; + Preconditions.checkArgument(bytes.length <= 32, "Decimal bytes length should <= 32."); + cacheBuffer.get(bytes); + vector.setBigEndian(currentIndex++, bytes); + } + } + + /** Consumer for decimal logical type with 256 bit width and original fixed type. */ + public static class FixedDecimal256Consumer extends AvroDecimal256Consumer { + + private final byte[] reuseBytes; + + /** Instantiate a FixedDecimal256Consumer. */ + public FixedDecimal256Consumer(Decimal256Vector vector, int size) { + super(vector); + Preconditions.checkArgument(size <= 32, "Decimal bytes length should <= 32."); + reuseBytes = new byte[size]; + } + + @Override + public void consume(Decoder decoder) throws IOException { + decoder.readFixed(reuseBytes); + vector.setBigEndian(currentIndex++, reuseBytes); + } + } +} diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMicrosConsumer.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMicrosConsumer.java index 88acf7b329..5af40ed17d 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMicrosConsumer.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMicrosConsumer.java @@ -22,7 +22,7 @@ import org.apache.avro.io.Decoder; /** - * Consumer which consume date timestamp-micro values from avro decoder. Write the data to {@link + * Consumer which consumes local-timestamp-micros values from avro decoder. Write the data to {@link * TimeStampMicroVector}. 
*/ public class AvroTimestampMicrosConsumer extends BaseAvroConsumer { diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMicrosTzConsumer.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMicrosTzConsumer.java new file mode 100644 index 0000000000..a5dede4988 --- /dev/null +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMicrosTzConsumer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adapter.avro.consumers.logical; + +import java.io.IOException; +import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consumes timestamp-micros values from avro decoder. Write the data to {@link + * TimeStampMicroTZVector}. + */ +public class AvroTimestampMicrosTzConsumer extends BaseAvroConsumer { + + /** Instantiate a AvroTimestampMicrosTzConsumer. 
*/ + public AvroTimestampMicrosTzConsumer(TimeStampMicroTZVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readLong()); + } +} diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMillisConsumer.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMillisConsumer.java index ec50d79023..bc451bd1dc 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMillisConsumer.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMillisConsumer.java @@ -22,7 +22,7 @@ import org.apache.avro.io.Decoder; /** - * Consumer which consume date timestamp-millis values from avro decoder. Write the data to {@link + * Consumer which consume local-timestamp-millis values from avro decoder. Write the data to {@link * TimeStampMilliVector}. */ public class AvroTimestampMillisConsumer extends BaseAvroConsumer { diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMillisTzConsumer.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMillisTzConsumer.java new file mode 100644 index 0000000000..255fe501fb --- /dev/null +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampMillisTzConsumer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adapter.avro.consumers.logical; + +import java.io.IOException; +import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume timestamp-millis values from avro decoder. Write the data to {@link + * TimeStampMilliTZVector}. + */ +public class AvroTimestampMillisTzConsumer extends BaseAvroConsumer { + + /** Instantiate a AvroTimestampMillisTzConsumer. */ + public AvroTimestampMillisTzConsumer(TimeStampMilliTZVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readLong()); + } +} diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampNanosConsumer.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampNanosConsumer.java new file mode 100644 index 0000000000..b5044d221f --- /dev/null +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampNanosConsumer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adapter.avro.consumers.logical; + +import java.io.IOException; +import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume local-timestamp-nanos values from avro decoder. Write the data to {@link + * TimeStampNanoVector}. + */ +public class AvroTimestampNanosConsumer extends BaseAvroConsumer { + + /** Instantiate a AvroTimestampNanosConsumer. */ + public AvroTimestampNanosConsumer(TimeStampNanoVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readLong()); + } +} diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampNanosTzConsumer.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampNanosTzConsumer.java new file mode 100644 index 0000000000..3f42b7ccbb --- /dev/null +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/logical/AvroTimestampNanosTzConsumer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adapter.avro.consumers.logical; + +import java.io.IOException; +import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consumes timestamp-nanos values from avro decoder. Write the data to {@link + * TimeStampNanoTZVector}. + */ +public class AvroTimestampNanosTzConsumer extends BaseAvroConsumer { + + /** Instantiate an AvroTimestampNanosTzConsumer. */ + public AvroTimestampNanosTzConsumer(TimeStampNanoTZVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readLong()); + } +} diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/AvroNullableProducer.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/AvroNullableProducer.java index f4215dbf84..5f8b314f49 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/AvroNullableProducer.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/AvroNullableProducer.java @@ -21,8 +21,8 @@ import org.apache.avro.io.Encoder; /** - * Producer wrapper which producers nullable types to an avro encoder. Write the data to the - * underlying {@link FieldVector}. + * Producer wrapper which produces nullable types to an avro encoder. Read data from the underlying + * {@link FieldVector}. * * @param The vector within producer or its delegate, used for partially produce purpose. 
*/ diff --git a/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/ArrowToAvroSchemaTest.java b/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/ArrowToAvroSchemaTest.java index a05bbc1653..d3e12e763a 100644 --- a/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/ArrowToAvroSchemaTest.java +++ b/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/ArrowToAvroSchemaTest.java @@ -319,10 +319,10 @@ public void testConvertDecimalTypes() { FieldType.notNullable(new ArrowType.Decimal(20, 10, 128)), null), new Field( - "nullableDecimal256", FieldType.nullable(new ArrowType.Decimal(20, 4, 256)), null), + "nullableDecimal256", FieldType.nullable(new ArrowType.Decimal(55, 15, 256)), null), new Field( "nonNullableDecimal2561", - FieldType.notNullable(new ArrowType.Decimal(20, 4, 256)), + FieldType.notNullable(new ArrowType.Decimal(55, 25, 256)), null), new Field( "nonNullableDecimal2562", @@ -330,7 +330,7 @@ public void testConvertDecimalTypes() { null), new Field( "nonNullableDecimal2563", - FieldType.notNullable(new ArrowType.Decimal(30, 15, 256)), + FieldType.notNullable(new ArrowType.Decimal(60, 50, 256)), null)); Schema schema = ArrowToAvroUtils.createAvroSchema(fields, "TestRecord"); @@ -383,9 +383,9 @@ public void testConvertDecimalTypes() { schema.getField("nullableDecimal256").schema().getTypes().get(0); assertEquals(Schema.Type.FIXED, nullableDecimal256Schema.getType()); assertEquals(32, nullableDecimal256Schema.getFixedSize()); - assertEquals(LogicalTypes.decimal(20, 4), nullableDecimal256Schema.getLogicalType()); - assertEquals(20, nullableDecimal256Schema.getObjectProp("precision")); - assertEquals(4, nullableDecimal256Schema.getObjectProp("scale")); + assertEquals(LogicalTypes.decimal(55, 15), nullableDecimal256Schema.getLogicalType()); + assertEquals(55, nullableDecimal256Schema.getObjectProp("precision")); + assertEquals(15, nullableDecimal256Schema.getObjectProp("scale")); assertEquals( Schema.Type.NULL, 
schema.getField("nullableDecimal256").schema().getTypes().get(1).getType()); @@ -394,9 +394,9 @@ public void testConvertDecimalTypes() { Schema nonNullableDecimal2561Schema = schema.getField("nonNullableDecimal2561").schema(); assertEquals(Schema.Type.FIXED, nonNullableDecimal2561Schema.getType()); assertEquals(32, nonNullableDecimal2561Schema.getFixedSize()); - assertEquals(LogicalTypes.decimal(20, 4), nonNullableDecimal2561Schema.getLogicalType()); - assertEquals(20, nonNullableDecimal2561Schema.getObjectProp("precision")); - assertEquals(4, nonNullableDecimal2561Schema.getObjectProp("scale")); + assertEquals(LogicalTypes.decimal(55, 25), nonNullableDecimal2561Schema.getLogicalType()); + assertEquals(55, nonNullableDecimal2561Schema.getObjectProp("precision")); + assertEquals(25, nonNullableDecimal2561Schema.getObjectProp("scale")); // Assertions for nonNullableDecimal2562 Schema nonNullableDecimal2562Schema = schema.getField("nonNullableDecimal2562").schema(); @@ -410,9 +410,9 @@ public void testConvertDecimalTypes() { Schema nonNullableDecimal2563Schema = schema.getField("nonNullableDecimal2563").schema(); assertEquals(Schema.Type.FIXED, nonNullableDecimal2563Schema.getType()); assertEquals(32, nonNullableDecimal2563Schema.getFixedSize()); - assertEquals(LogicalTypes.decimal(30, 15), nonNullableDecimal2563Schema.getLogicalType()); - assertEquals(30, nonNullableDecimal2563Schema.getObjectProp("precision")); - assertEquals(15, nonNullableDecimal2563Schema.getObjectProp("scale")); + assertEquals(LogicalTypes.decimal(60, 50), nonNullableDecimal2563Schema.getLogicalType()); + assertEquals(60, nonNullableDecimal2563Schema.getObjectProp("precision")); + assertEquals(50, nonNullableDecimal2563Schema.getObjectProp("scale")); } @Test diff --git a/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/AvroLogicalTypesTest.java b/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/AvroLogicalTypesTest.java index 173cc855b1..801456d79b 100644 --- 
a/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/AvroLogicalTypesTest.java +++ b/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/AvroLogicalTypesTest.java @@ -173,7 +173,7 @@ public void testInvalidDecimalPrecision() throws Exception { IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> writeAndRead(schema, data)); - assertTrue(e.getMessage().contains("Precision must be in range of 1 to 38")); + assertTrue(e.getMessage().contains("Precision must be in range of 1 to 76")); } @Test diff --git a/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/RoundTripDataTest.java b/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/RoundTripDataTest.java new file mode 100644 index 0000000000..85e6a960b0 --- /dev/null +++ b/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/RoundTripDataTest.java @@ -0,0 +1,1606 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.adapter.avro; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.Decimal256Vector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.NullVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.complex.writer.BaseWriter; +import 
org.apache.arrow.vector.complex.writer.FieldWriter; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class RoundTripDataTest { + + @TempDir public static File TMP; + + private static AvroToArrowConfig basicConfig(BufferAllocator allocator) { + return new AvroToArrowConfig(allocator, 1000, null, Collections.emptySet(), false); + } + + private static VectorSchemaRoot readDataFile( + Schema schema, File dataFile, BufferAllocator allocator) throws Exception { + + try (FileInputStream fis = new FileInputStream(dataFile)) { + BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(fis, null); + return AvroToArrow.avroToArrow(schema, decoder, basicConfig(allocator)); + } + } + + private static void roundTripTest( + VectorSchemaRoot root, BufferAllocator allocator, File dataFile, int rowCount) + throws Exception { + + // Write an AVRO block using the producer classes + try (FileOutputStream fos = new FileOutputStream(dataFile)) { + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null); + CompositeAvroProducer producer = + ArrowToAvroUtils.createCompositeProducer(root.getFieldVectors()); + for (int row = 0; row < rowCount; row++) { + producer.produce(encoder); + } + encoder.flush(); + } + + // Generate AVRO schema + Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields()); + + // Read back in and compare + try (VectorSchemaRoot roundTrip = readDataFile(schema, dataFile, 
allocator)) { + + assertEquals(root.getSchema(), roundTrip.getSchema()); + assertEquals(rowCount, roundTrip.getRowCount()); + + // Read and check values + for (int row = 0; row < rowCount; row++) { + assertEquals(root.getVector(0).getObject(row), roundTrip.getVector(0).getObject(row)); + } + } + } + + private static void roundTripByteArrayTest( + VectorSchemaRoot root, BufferAllocator allocator, File dataFile, int rowCount) + throws Exception { + + // Write an AVRO block using the producer classes + try (FileOutputStream fos = new FileOutputStream(dataFile)) { + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null); + CompositeAvroProducer producer = + ArrowToAvroUtils.createCompositeProducer(root.getFieldVectors()); + for (int row = 0; row < rowCount; row++) { + producer.produce(encoder); + } + encoder.flush(); + } + + // Generate AVRO schema + Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields()); + + // Read back in and compare + try (VectorSchemaRoot roundTrip = readDataFile(schema, dataFile, allocator)) { + + assertEquals(root.getSchema(), roundTrip.getSchema()); + assertEquals(rowCount, roundTrip.getRowCount()); + + // Read and check values + for (int row = 0; row < rowCount; row++) { + byte[] rootBytes = (byte[]) root.getVector(0).getObject(row); + byte[] roundTripBytes = (byte[]) roundTrip.getVector(0).getObject(row); + assertArrayEquals(rootBytes, roundTripBytes); + } + } + } + + // Data round trip for primitive types, nullable and non-nullable + + @Test + public void testRoundTripNullColumn() throws Exception { + + // The current read implementation expects EOF, which never happens for a single null vector + // Include a boolean vector with this test for now, so that EOF exception will be triggered + + // Field definition + FieldType nullField = new FieldType(false, new ArrowType.Null(), null); + FieldType booleanField = new FieldType(false, new ArrowType.Bool(), null); + + // Create empty vector + 
BufferAllocator allocator = new RootAllocator(); + NullVector nullVector = new NullVector(new Field("nullColumn", nullField, null)); + BitVector booleanVector = new BitVector(new Field("boolean", booleanField, null), allocator); + + int rowCount = 10; + + // Set up VSR + List vectors = Arrays.asList(nullVector, booleanVector); + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set all values to null + for (int row = 0; row < rowCount; row++) { + nullVector.setNull(row); + booleanVector.set(row, 0); + } + + File dataFile = new File(TMP, "testRoundTripNullColumn.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripBooleans() throws Exception { + + // Field definition + FieldType booleanField = new FieldType(false, new ArrowType.Bool(), null); + + // Create empty vector + BufferAllocator allocator = new RootAllocator(); + BitVector booleanVector = new BitVector(new Field("boolean", booleanField, null), allocator); + + // Set up VSR + List vectors = Arrays.asList(booleanVector); + int rowCount = 10; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + for (int row = 0; row < rowCount; row++) { + booleanVector.set(row, row % 2 == 0 ? 
1 : 0); + } + + File dataFile = new File(TMP, "testRoundTripBooleans.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableBooleans() throws Exception { + + // Field definition + FieldType booleanField = new FieldType(true, new ArrowType.Bool(), null); + + // Create empty vector + BufferAllocator allocator = new RootAllocator(); + BitVector booleanVector = new BitVector(new Field("boolean", booleanField, null), allocator); + + int rowCount = 3; + + // Set up VSR + List vectors = Arrays.asList(booleanVector); + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Null value + booleanVector.setNull(0); + + // False value + booleanVector.set(1, 0); + + // True value + booleanVector.set(2, 1); + + File dataFile = new File(TMP, "testRoundTripNullableBooleans.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripIntegers() throws Exception { + + // Field definitions + FieldType int32Field = new FieldType(false, new ArrowType.Int(32, true), null); + FieldType int64Field = new FieldType(false, new ArrowType.Int(64, true), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + IntVector int32Vector = new IntVector(new Field("int32", int32Field, null), allocator); + BigIntVector int64Vector = new BigIntVector(new Field("int64", int64Field, null), allocator); + + // Set up VSR + List vectors = Arrays.asList(int32Vector, int64Vector); + + int rowCount = 12; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + for (int row = 0; row < 10; row++) { + int32Vector.set(row, 513 * row * (row % 2 == 0 ? 1 : -1)); + int64Vector.set(row, 3791L * row * (row % 2 == 0 ? 
1 : -1)); + } + + // Min values + int32Vector.set(10, Integer.MIN_VALUE); + int64Vector.set(10, Long.MIN_VALUE); + + // Max values + int32Vector.set(11, Integer.MAX_VALUE); + int64Vector.set(11, Long.MAX_VALUE); + + File dataFile = new File(TMP, "testRoundTripIntegers.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableIntegers() throws Exception { + + // Field definitions + FieldType int32Field = new FieldType(true, new ArrowType.Int(32, true), null); + FieldType int64Field = new FieldType(true, new ArrowType.Int(64, true), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + IntVector int32Vector = new IntVector(new Field("int32", int32Field, null), allocator); + BigIntVector int64Vector = new BigIntVector(new Field("int64", int64Field, null), allocator); + + int rowCount = 3; + + // Set up VSR + List vectors = Arrays.asList(int32Vector, int64Vector); + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Null values + int32Vector.setNull(0); + int64Vector.setNull(0); + + // Zero values + int32Vector.set(1, 0); + int64Vector.set(1, 0); + + // Non-zero values + int32Vector.set(2, Integer.MAX_VALUE); + int64Vector.set(2, Long.MAX_VALUE); + + File dataFile = new File(TMP, "testRoundTripNullableIntegers.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripFloatingPoints() throws Exception { + + // Field definitions + FieldType float32Field = + new FieldType(false, new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), null); + FieldType float64Field = + new FieldType(false, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + Float4Vector float32Vector = + new Float4Vector(new Field("float32", float32Field, null), allocator); + Float8Vector 
float64Vector = + new Float8Vector(new Field("float64", float64Field, null), allocator); + + // Set up VSR + List vectors = Arrays.asList(float32Vector, float64Vector); + int rowCount = 15; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + for (int row = 0; row < 10; row++) { + float32Vector.set(row, 37.6f * row * (row % 2 == 0 ? 1 : -1)); + float64Vector.set(row, 37.6d * row * (row % 2 == 0 ? 1 : -1)); + } + + float32Vector.set(10, Float.MIN_VALUE); + float64Vector.set(10, Double.MIN_VALUE); + + float32Vector.set(11, Float.MAX_VALUE); + float64Vector.set(11, Double.MAX_VALUE); + + float32Vector.set(12, Float.NaN); + float64Vector.set(12, Double.NaN); + + float32Vector.set(13, Float.POSITIVE_INFINITY); + float64Vector.set(13, Double.POSITIVE_INFINITY); + + float32Vector.set(14, Float.NEGATIVE_INFINITY); + float64Vector.set(14, Double.NEGATIVE_INFINITY); + + File dataFile = new File(TMP, "testRoundTripFloatingPoints.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableFloatingPoints() throws Exception { + + // Field definitions + FieldType float32Field = + new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), null); + FieldType float64Field = + new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + Float4Vector float32Vector = + new Float4Vector(new Field("float32", float32Field, null), allocator); + Float8Vector float64Vector = + new Float8Vector(new Field("float64", float64Field, null), allocator); + + int rowCount = 3; + + // Set up VSR + List vectors = Arrays.asList(float32Vector, float64Vector); + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Null values + float32Vector.setNull(0); + 
float64Vector.setNull(0); + + // Zero values + float32Vector.set(1, 0.0f); + float64Vector.set(1, 0.0); + + // Non-zero values + float32Vector.set(2, 1.0f); + float64Vector.set(2, 1.0); + + File dataFile = new File(TMP, "testRoundTripNullableFloatingPoints.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripStrings() throws Exception { + + // Field definition + FieldType stringField = new FieldType(false, new ArrowType.Utf8(), null); + + // Create empty vector + BufferAllocator allocator = new RootAllocator(); + VarCharVector stringVector = + new VarCharVector(new Field("string", stringField, null), allocator); + + // Set up VSR + List vectors = Arrays.asList(stringVector); + int rowCount = 5; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + stringVector.setSafe(0, "Hello world!".getBytes()); + stringVector.setSafe(1, "<%**\r\n\t\\abc\0$$>".getBytes()); + stringVector.setSafe(2, "你好世界".getBytes()); + stringVector.setSafe(3, "مرحبا بالعالم".getBytes()); + stringVector.setSafe(4, "(P ∧ P ⇒ Q) ⇒ Q".getBytes()); + + File dataFile = new File(TMP, "testRoundTripStrings.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableStrings() throws Exception { + + // Field definition + FieldType stringField = new FieldType(true, new ArrowType.Utf8(), null); + + // Create empty vector + BufferAllocator allocator = new RootAllocator(); + VarCharVector stringVector = + new VarCharVector(new Field("string", stringField, null), allocator); + + int rowCount = 3; + + // Set up VSR + List vectors = Arrays.asList(stringVector); + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + stringVector.setNull(0); + stringVector.setSafe(1, "".getBytes()); + stringVector.setSafe(2, "not 
empty".getBytes()); + + File dataFile = new File(TMP, "testRoundTripNullableStrings.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripBinary() throws Exception { + + // Field definition + FieldType binaryField = new FieldType(false, new ArrowType.Binary(), null); + FieldType fixedField = new FieldType(false, new ArrowType.FixedSizeBinary(5), null); + + // Create empty vector + BufferAllocator allocator = new RootAllocator(); + VarBinaryVector binaryVector = + new VarBinaryVector(new Field("binary", binaryField, null), allocator); + FixedSizeBinaryVector fixedVector = + new FixedSizeBinaryVector(new Field("fixed", fixedField, null), allocator); + + // Set up VSR + List vectors = Arrays.asList(binaryVector, fixedVector); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + binaryVector.setSafe(0, new byte[] {1, 2, 3}); + binaryVector.setSafe(1, new byte[] {4, 5, 6, 7}); + binaryVector.setSafe(2, new byte[] {8, 9}); + + fixedVector.setSafe(0, new byte[] {1, 2, 3, 4, 5}); + fixedVector.setSafe(1, new byte[] {4, 5, 6, 7, 8, 9}); + fixedVector.setSafe(2, new byte[] {8, 9, 10, 11, 12}); + + File dataFile = new File(TMP, "testRoundTripBinary.avro"); + + roundTripByteArrayTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableBinary() throws Exception { + + // Field definition + FieldType binaryField = new FieldType(true, new ArrowType.Binary(), null); + FieldType fixedField = new FieldType(true, new ArrowType.FixedSizeBinary(5), null); + + // Create empty vector + BufferAllocator allocator = new RootAllocator(); + VarBinaryVector binaryVector = + new VarBinaryVector(new Field("binary", binaryField, null), allocator); + FixedSizeBinaryVector fixedVector = + new FixedSizeBinaryVector(new Field("fixed", fixedField, null), allocator); + + int rowCount = 3; + + // Set up 
VSR + List vectors = Arrays.asList(binaryVector, fixedVector); + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + binaryVector.setNull(0); + binaryVector.setSafe(1, new byte[] {}); + binaryVector.setSafe(2, new byte[] {10, 11, 12}); + + fixedVector.setNull(0); + fixedVector.setSafe(1, new byte[] {0, 0, 0, 0, 0}); + fixedVector.setSafe(2, new byte[] {10, 11, 12, 13, 14}); + + File dataFile = new File(TMP, "testRoundTripNullableBinary.avro"); + + roundTripByteArrayTest(root, allocator, dataFile, rowCount); + } + } + + // Data round trip for logical types, nullable and non-nullable + + @Test + public void testRoundTripDecimals() throws Exception { + + // Field definitions + FieldType decimal128Field1 = new FieldType(false, new ArrowType.Decimal(38, 10, 128), null); + FieldType decimal128Field2 = new FieldType(false, new ArrowType.Decimal(38, 5, 128), null); + FieldType decimal256Field1 = new FieldType(false, new ArrowType.Decimal(76, 20, 256), null); + FieldType decimal256Field2 = new FieldType(false, new ArrowType.Decimal(76, 10, 256), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + DecimalVector decimal128Vector1 = + new DecimalVector(new Field("decimal128_1", decimal128Field1, null), allocator); + DecimalVector decimal128Vector2 = + new DecimalVector(new Field("decimal128_2", decimal128Field2, null), allocator); + Decimal256Vector decimal256Vector1 = + new Decimal256Vector(new Field("decimal256_1", decimal256Field1, null), allocator); + Decimal256Vector decimal256Vector2 = + new Decimal256Vector(new Field("decimal256_2", decimal256Field2, null), allocator); + + // Set up VSR + List vectors = + Arrays.asList(decimal128Vector1, decimal128Vector2, decimal256Vector1, decimal256Vector2); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test 
data + decimal128Vector1.setSafe( + 0, new BigDecimal("12345.67890").setScale(10, RoundingMode.UNNECESSARY)); + decimal128Vector1.setSafe( + 1, new BigDecimal("-98765.43210").setScale(10, RoundingMode.UNNECESSARY)); + decimal128Vector1.setSafe( + 2, new BigDecimal("54321.09876").setScale(10, RoundingMode.UNNECESSARY)); + + decimal128Vector2.setSafe( + 0, new BigDecimal("12345.67890").setScale(5, RoundingMode.UNNECESSARY)); + decimal128Vector2.setSafe( + 1, new BigDecimal("-98765.43210").setScale(5, RoundingMode.UNNECESSARY)); + decimal128Vector2.setSafe( + 2, new BigDecimal("54321.09876").setScale(5, RoundingMode.UNNECESSARY)); + + decimal256Vector1.setSafe( + 0, + new BigDecimal("12345678901234567890.12345678901234567890") + .setScale(20, RoundingMode.UNNECESSARY)); + decimal256Vector1.setSafe( + 1, + new BigDecimal("-98765432109876543210.98765432109876543210") + .setScale(20, RoundingMode.UNNECESSARY)); + decimal256Vector1.setSafe( + 2, + new BigDecimal("54321098765432109876.54321098765432109876") + .setScale(20, RoundingMode.UNNECESSARY)); + + decimal256Vector2.setSafe( + 0, + new BigDecimal("12345678901234567890.1234567890").setScale(10, RoundingMode.UNNECESSARY)); + decimal256Vector2.setSafe( + 1, + new BigDecimal("-98765432109876543210.9876543210") + .setScale(10, RoundingMode.UNNECESSARY)); + decimal256Vector2.setSafe( + 2, + new BigDecimal("54321098765432109876.5432109876").setScale(10, RoundingMode.UNNECESSARY)); + + File dataFile = new File(TMP, "testRoundTripDecimals.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableDecimals() throws Exception { + + // Field definitions + FieldType decimal128Field1 = new FieldType(true, new ArrowType.Decimal(38, 10, 128), null); + FieldType decimal128Field2 = new FieldType(true, new ArrowType.Decimal(38, 5, 128), null); + FieldType decimal256Field1 = new FieldType(true, new ArrowType.Decimal(76, 20, 256), null); + FieldType decimal256Field2 = new 
FieldType(true, new ArrowType.Decimal(76, 10, 256), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + DecimalVector decimal128Vector1 = + new DecimalVector(new Field("decimal128_1", decimal128Field1, null), allocator); + DecimalVector decimal128Vector2 = + new DecimalVector(new Field("decimal128_2", decimal128Field2, null), allocator); + Decimal256Vector decimal256Vector1 = + new Decimal256Vector(new Field("decimal256_1", decimal256Field1, null), allocator); + Decimal256Vector decimal256Vector2 = + new Decimal256Vector(new Field("decimal256_2", decimal256Field2, null), allocator); + + int rowCount = 3; + + // Set up VSR + List vectors = + Arrays.asList(decimal128Vector1, decimal128Vector2, decimal256Vector1, decimal256Vector2); + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + decimal128Vector1.setNull(0); + decimal128Vector1.setSafe(1, BigDecimal.ZERO.setScale(10, RoundingMode.UNNECESSARY)); + decimal128Vector1.setSafe( + 2, new BigDecimal("12345.67890").setScale(10, RoundingMode.UNNECESSARY)); + + decimal128Vector2.setNull(0); + decimal128Vector2.setSafe(1, BigDecimal.ZERO.setScale(5, RoundingMode.UNNECESSARY)); + decimal128Vector2.setSafe( + 2, new BigDecimal("98765.43210").setScale(5, RoundingMode.UNNECESSARY)); + + decimal256Vector1.setNull(0); + decimal256Vector1.setSafe(1, BigDecimal.ZERO.setScale(20, RoundingMode.UNNECESSARY)); + decimal256Vector1.setSafe( + 2, + new BigDecimal("12345678901234567890.12345678901234567890") + .setScale(20, RoundingMode.UNNECESSARY)); + + decimal256Vector2.setNull(0); + decimal256Vector2.setSafe(1, BigDecimal.ZERO.setScale(10, RoundingMode.UNNECESSARY)); + decimal256Vector2.setSafe( + 2, + new BigDecimal("98765432109876543210.9876543210").setScale(10, RoundingMode.UNNECESSARY)); + + File dataFile = new File(TMP, "testRoundTripNullableDecimals.avro"); + + roundTripTest(root, allocator, dataFile, 
rowCount); + } + } + + @Test + public void testRoundTripDates() throws Exception { + + // Field definitions + FieldType dateDayField = new FieldType(false, new ArrowType.Date(DateUnit.DAY), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + DateDayVector dateDayVector = + new DateDayVector(new Field("dateDay", dateDayField, null), allocator); + + // Set up VSR + List vectors = Arrays.asList(dateDayVector); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + dateDayVector.setSafe(0, (int) LocalDate.now().toEpochDay()); + dateDayVector.setSafe(1, (int) LocalDate.now().toEpochDay() + 1); + dateDayVector.setSafe(2, (int) LocalDate.now().toEpochDay() + 2); + + File dataFile = new File(TMP, "testRoundTripDates.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableDates() throws Exception { + + // Field definitions + FieldType dateDayField = new FieldType(true, new ArrowType.Date(DateUnit.DAY), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + DateDayVector dateDayVector = + new DateDayVector(new Field("dateDay", dateDayField, null), allocator); + + int rowCount = 3; + + // Set up VSR + List vectors = Arrays.asList(dateDayVector); + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + dateDayVector.setNull(0); + dateDayVector.setSafe(1, 0); + dateDayVector.setSafe(2, (int) LocalDate.now().toEpochDay()); + + File dataFile = new File(TMP, "testRoundTripNullableDates.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripTimes() throws Exception { + + // Field definitions + FieldType timeMillisField = + new FieldType(false, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null); + FieldType 
timeMicrosField = + new FieldType(false, new ArrowType.Time(TimeUnit.MICROSECOND, 64), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + TimeMilliVector timeMillisVector = + new TimeMilliVector(new Field("timeMillis", timeMillisField, null), allocator); + TimeMicroVector timeMicrosVector = + new TimeMicroVector(new Field("timeMicros", timeMicrosField, null), allocator); + + // Set up VSR + List vectors = Arrays.asList(timeMillisVector, timeMicrosVector); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + timeMillisVector.setSafe( + 0, (int) (ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000000)); + timeMillisVector.setSafe( + 1, (int) (ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000000) - 1000); + timeMillisVector.setSafe( + 2, (int) (ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000000) - 2000); + + timeMicrosVector.setSafe(0, ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000); + timeMicrosVector.setSafe(1, ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000 - 1000000); + timeMicrosVector.setSafe(2, ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000 - 2000000); + + File dataFile = new File(TMP, "testRoundTripTimes.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableTimes() throws Exception { + + // Field definitions + FieldType timeMillisField = + new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null); + FieldType timeMicrosField = + new FieldType(true, new ArrowType.Time(TimeUnit.MICROSECOND, 64), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + TimeMilliVector timeMillisVector = + new TimeMilliVector(new Field("timeMillis", timeMillisField, null), allocator); + TimeMicroVector timeMicrosVector = + new TimeMicroVector(new Field("timeMicros", timeMicrosField, null), 
allocator); + + int rowCount = 3; + + // Set up VSR + List vectors = Arrays.asList(timeMillisVector, timeMicrosVector); + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + timeMillisVector.setNull(0); + timeMillisVector.setSafe(1, 0); + timeMillisVector.setSafe( + 2, (int) (ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000000)); + + timeMicrosVector.setNull(0); + timeMicrosVector.setSafe(1, 0); + timeMicrosVector.setSafe(2, ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000); + + File dataFile = new File(TMP, "testRoundTripNullableTimes.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripZoneAwareTimestamps() throws Exception { + + // Field definitions + FieldType timestampMillisField = + new FieldType(false, new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), null); + FieldType timestampMicrosField = + new FieldType(false, new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), null); + FieldType timestampNanosField = + new FieldType(false, new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + TimeStampMilliTZVector timestampMillisVector = + new TimeStampMilliTZVector( + new Field("timestampMillis", timestampMillisField, null), allocator); + TimeStampMicroTZVector timestampMicrosVector = + new TimeStampMicroTZVector( + new Field("timestampMicros", timestampMicrosField, null), allocator); + TimeStampNanoTZVector timestampNanosVector = + new TimeStampNanoTZVector( + new Field("timestampNanos", timestampNanosField, null), allocator); + + // Set up VSR + List vectors = + Arrays.asList(timestampMillisVector, timestampMicrosVector, timestampNanosVector); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + 
timestampMillisVector.setSafe(0, (int) Instant.now().toEpochMilli()); + timestampMillisVector.setSafe(1, (int) Instant.now().toEpochMilli() - 1000); + timestampMillisVector.setSafe(2, (int) Instant.now().toEpochMilli() - 2000); + + timestampMicrosVector.setSafe(0, Instant.now().toEpochMilli() * 1000); + timestampMicrosVector.setSafe(1, (Instant.now().toEpochMilli() - 1000) * 1000); + timestampMicrosVector.setSafe(2, (Instant.now().toEpochMilli() - 2000) * 1000); + + timestampNanosVector.setSafe(0, Instant.now().toEpochMilli() * 1000000); + timestampNanosVector.setSafe(1, (Instant.now().toEpochMilli() - 1000) * 1000000); + timestampNanosVector.setSafe(2, (Instant.now().toEpochMilli() - 2000) * 1000000); + + File dataFile = new File(TMP, "testRoundTripZoneAwareTimestamps.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableZoneAwareTimestamps() throws Exception { + + // Field definitions + FieldType timestampMillisField = + new FieldType(true, new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), null); + FieldType timestampMicrosField = + new FieldType(true, new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), null); + FieldType timestampNanosField = + new FieldType(true, new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + TimeStampMilliTZVector timestampMillisVector = + new TimeStampMilliTZVector( + new Field("timestampMillis", timestampMillisField, null), allocator); + TimeStampMicroTZVector timestampMicrosVector = + new TimeStampMicroTZVector( + new Field("timestampMicros", timestampMicrosField, null), allocator); + TimeStampNanoTZVector timestampNanosVector = + new TimeStampNanoTZVector( + new Field("timestampNanos", timestampNanosField, null), allocator); + + int rowCount = 3; + + // Set up VSR + List vectors = + Arrays.asList(timestampMillisVector, timestampMicrosVector, timestampNanosVector); + + try 
(VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + timestampMillisVector.setNull(0); + timestampMillisVector.setSafe(1, 0); + timestampMillisVector.setSafe(2, (int) Instant.now().toEpochMilli()); + + timestampMicrosVector.setNull(0); + timestampMicrosVector.setSafe(1, 0); + timestampMicrosVector.setSafe(2, Instant.now().toEpochMilli() * 1000); + + timestampNanosVector.setNull(0); + timestampNanosVector.setSafe(1, 0); + timestampNanosVector.setSafe(2, Instant.now().toEpochMilli() * 1000000); + + File dataFile = new File(TMP, "testRoundTripNullableZoneAwareTimestamps.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripLocalTimestamps() throws Exception { + + // Field definitions + FieldType timestampMillisField = + new FieldType(false, new ArrowType.Timestamp(TimeUnit.MILLISECOND, null), null); + FieldType timestampMicrosField = + new FieldType(false, new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), null); + FieldType timestampNanosField = + new FieldType(false, new ArrowType.Timestamp(TimeUnit.NANOSECOND, null), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + TimeStampMilliVector timestampMillisVector = + new TimeStampMilliVector( + new Field("timestampMillis", timestampMillisField, null), allocator); + TimeStampMicroVector timestampMicrosVector = + new TimeStampMicroVector( + new Field("timestampMicros", timestampMicrosField, null), allocator); + TimeStampNanoVector timestampNanosVector = + new TimeStampNanoVector(new Field("timestampNanos", timestampNanosField, null), allocator); + + // Set up VSR + List vectors = + Arrays.asList(timestampMillisVector, timestampMicrosVector, timestampNanosVector); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + 
timestampMillisVector.setSafe(0, (int) Instant.now().toEpochMilli()); + timestampMillisVector.setSafe(1, (int) Instant.now().toEpochMilli() - 1000); + timestampMillisVector.setSafe(2, (int) Instant.now().toEpochMilli() - 2000); + + timestampMicrosVector.setSafe(0, Instant.now().toEpochMilli() * 1000); + timestampMicrosVector.setSafe(1, (Instant.now().toEpochMilli() - 1000) * 1000); + timestampMicrosVector.setSafe(2, (Instant.now().toEpochMilli() - 2000) * 1000); + + timestampNanosVector.setSafe(0, Instant.now().toEpochMilli() * 1000000); + timestampNanosVector.setSafe(1, (Instant.now().toEpochMilli() - 1000) * 1000000); + timestampNanosVector.setSafe(2, (Instant.now().toEpochMilli() - 2000) * 1000000); + + File dataFile = new File(TMP, "testRoundTripLocalTimestamps.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableLocalTimestamps() throws Exception { + + // Field definitions + FieldType timestampMillisField = + new FieldType(true, new ArrowType.Timestamp(TimeUnit.MILLISECOND, null), null); + FieldType timestampMicrosField = + new FieldType(true, new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), null); + FieldType timestampNanosField = + new FieldType(true, new ArrowType.Timestamp(TimeUnit.NANOSECOND, null), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + TimeStampMilliVector timestampMillisVector = + new TimeStampMilliVector( + new Field("timestampMillis", timestampMillisField, null), allocator); + TimeStampMicroVector timestampMicrosVector = + new TimeStampMicroVector( + new Field("timestampMicros", timestampMicrosField, null), allocator); + TimeStampNanoVector timestampNanosVector = + new TimeStampNanoVector(new Field("timestampNanos", timestampNanosField, null), allocator); + + int rowCount = 3; + + // Set up VSR + List vectors = + Arrays.asList(timestampMillisVector, timestampMicrosVector, timestampNanosVector); + + try (VectorSchemaRoot root = new 
VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + timestampMillisVector.setNull(0); + timestampMillisVector.setSafe(1, 0); + timestampMillisVector.setSafe(2, (int) Instant.now().toEpochMilli()); + + timestampMicrosVector.setNull(0); + timestampMicrosVector.setSafe(1, 0); + timestampMicrosVector.setSafe(2, Instant.now().toEpochMilli() * 1000); + + timestampNanosVector.setNull(0); + timestampNanosVector.setSafe(1, 0); + timestampNanosVector.setSafe(2, Instant.now().toEpochMilli() * 1000000); + + File dataFile = new File(TMP, "testRoundTripNullableLocalTimestamps.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + // Data round trip for containers of primitive and logical types, nullable and non-nullable + + @Test + public void testRoundTripLists() throws Exception { + + // Field definitions + FieldType intListField = new FieldType(false, new ArrowType.List(), null); + FieldType stringListField = new FieldType(false, new ArrowType.List(), null); + FieldType dateListField = new FieldType(false, new ArrowType.List(), null); + + Field intField = new Field("item", FieldType.notNullable(new ArrowType.Int(32, true)), null); + Field stringField = new Field("item", FieldType.notNullable(new ArrowType.Utf8()), null); + Field dateField = + new Field("item", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + ListVector intListVector = new ListVector("intList", allocator, intListField, null); + ListVector stringListVector = new ListVector("stringList", allocator, stringListField, null); + ListVector dateListVector = new ListVector("dateList", allocator, dateListField, null); + + intListVector.initializeChildrenFromFields(Arrays.asList(intField)); + stringListVector.initializeChildrenFromFields(Arrays.asList(stringField)); + dateListVector.initializeChildrenFromFields(Arrays.asList(dateField)); + + // Set up 
VSR + List vectors = Arrays.asList(intListVector, stringListVector, dateListVector); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + FieldWriter intListWriter = intListVector.getWriter(); + FieldWriter stringListWriter = stringListVector.getWriter(); + FieldWriter dateListWriter = dateListVector.getWriter(); + + // Set test data for intList + for (int i = 0; i < rowCount; i++) { + intListWriter.startList(); + for (int j = 0; j < 5 - i; j++) { + intListWriter.writeInt(j); + } + intListWriter.endList(); + } + + // Set test data for stringList + for (int i = 0; i < rowCount; i++) { + stringListWriter.startList(); + for (int j = 0; j < 5 - i; j++) { + stringListWriter.writeVarChar("string" + j); + } + stringListWriter.endList(); + } + + // Set test data for dateList + for (int i = 0; i < rowCount; i++) { + dateListWriter.startList(); + for (int j = 0; j < 5 - i; j++) { + dateListWriter.writeDateDay((int) LocalDate.now().plusDays(j).toEpochDay()); + } + dateListWriter.endList(); + } + + // Update count for the vectors + intListVector.setValueCount(rowCount); + stringListVector.setValueCount(rowCount); + dateListVector.setValueCount(rowCount); + + File dataFile = new File(TMP, "testRoundTripLists.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableLists() throws Exception { + + // Field definitions + FieldType nullListType = new FieldType(true, new ArrowType.List(), null); + FieldType nonNullListType = new FieldType(false, new ArrowType.List(), null); + + Field nullFieldType = new Field("item", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field nonNullFieldType = + new Field("item", FieldType.notNullable(new ArrowType.Int(32, true)), null); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + ListVector nullEntriesVector = + new ListVector("nullEntriesVector", allocator, 
nonNullListType, null); + ListVector nullListVector = new ListVector("nullListVector", allocator, nullListType, null); + ListVector nullBothVector = new ListVector("nullBothVector", allocator, nullListType, null); + + nullEntriesVector.initializeChildrenFromFields(Arrays.asList(nullFieldType)); + nullListVector.initializeChildrenFromFields(Arrays.asList(nonNullFieldType)); + nullBothVector.initializeChildrenFromFields(Arrays.asList(nullFieldType)); + + // Set up VSR + List vectors = Arrays.asList(nullEntriesVector, nullListVector, nullBothVector); + int rowCount = 4; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data for nullEntriesVector + FieldWriter nullEntriesWriter = nullEntriesVector.getWriter(); + nullEntriesWriter.startList(); + nullEntriesWriter.integer().writeNull(); + nullEntriesWriter.integer().writeNull(); + nullEntriesWriter.endList(); + nullEntriesWriter.startList(); + nullEntriesWriter.integer().writeInt(0); + nullEntriesWriter.integer().writeInt(0); + nullEntriesWriter.endList(); + nullEntriesWriter.startList(); + nullEntriesWriter.integer().writeInt(123); + nullEntriesWriter.integer().writeInt(456); + nullEntriesWriter.endList(); + nullEntriesWriter.startList(); + nullEntriesWriter.integer().writeInt(789); + nullEntriesWriter.integer().writeInt(789); + nullEntriesWriter.endList(); + + // Set test data for nullListVector + FieldWriter nullListWriter = nullListVector.getWriter(); + nullListWriter.writeNull(); + nullListWriter.setPosition(1); // writeNull() does not inc. 
idx() on list vector + nullListWriter.startList(); + nullListWriter.integer().writeInt(0); + nullListWriter.integer().writeInt(0); + nullListWriter.endList(); + nullEntriesWriter.startList(); + nullEntriesWriter.integer().writeInt(123); + nullEntriesWriter.integer().writeInt(456); + nullEntriesWriter.endList(); + nullEntriesWriter.startList(); + nullEntriesWriter.integer().writeInt(789); + nullEntriesWriter.integer().writeInt(789); + nullEntriesWriter.endList(); + + // Set test data for nullBothVector + FieldWriter nullBothWriter = nullBothVector.getWriter(); + nullBothWriter.writeNull(); + nullBothWriter.setPosition(1); + nullBothWriter.startList(); + nullBothWriter.integer().writeNull(); + nullBothWriter.integer().writeNull(); + nullBothWriter.endList(); + nullListWriter.startList(); + nullListWriter.integer().writeInt(0); + nullListWriter.integer().writeInt(0); + nullListWriter.endList(); + nullEntriesWriter.startList(); + nullEntriesWriter.integer().writeInt(123); + nullEntriesWriter.integer().writeInt(456); + nullEntriesWriter.endList(); + + // Update count for the vectors + nullListVector.setValueCount(4); + nullEntriesVector.setValueCount(4); + nullBothVector.setValueCount(4); + + File dataFile = new File(TMP, "testRoundTripNullableLists.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripMap() throws Exception { + + // Field definitions + FieldType intMapField = new FieldType(false, new ArrowType.Map(false), null); + FieldType stringMapField = new FieldType(false, new ArrowType.Map(false), null); + FieldType dateMapField = new FieldType(false, new ArrowType.Map(false), null); + + Field keyField = new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null); + Field intField = new Field("value", FieldType.notNullable(new ArrowType.Int(32, true)), null); + Field stringField = new Field("value", FieldType.notNullable(new ArrowType.Utf8()), null); + Field dateField = + new Field("value", 
FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null); + + Field intEntryField = + new Field( + "entries", + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList(keyField, intField)); + Field stringEntryField = + new Field( + "entries", + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList(keyField, stringField)); + Field dateEntryField = + new Field( + "entries", + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList(keyField, dateField)); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + MapVector intMapVector = new MapVector("intMap", allocator, intMapField, null); + MapVector stringMapVector = new MapVector("stringMap", allocator, stringMapField, null); + MapVector dateMapVector = new MapVector("dateMap", allocator, dateMapField, null); + + intMapVector.initializeChildrenFromFields(Arrays.asList(intEntryField)); + stringMapVector.initializeChildrenFromFields(Arrays.asList(stringEntryField)); + dateMapVector.initializeChildrenFromFields(Arrays.asList(dateEntryField)); + + // Set up VSR + List vectors = Arrays.asList(intMapVector, stringMapVector, dateMapVector); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Total number of entries that will be writen to each vector + int entryCount = 5 + 4 + 3; + + // Set test data for intList + BaseWriter.MapWriter writer = intMapVector.getWriter(); + for (int i = 0; i < rowCount; i++) { + writer.startMap(); + for (int j = 0; j < 5 - i; j++) { + writer.startEntry(); + writer.key().varChar().writeVarChar("key" + j); + writer.value().integer().writeInt(j); + writer.endEntry(); + } + writer.endMap(); + } + + // Update count for data vector (map writer does not do this) + intMapVector.getDataVector().setValueCount(entryCount); + + // Set test data for stringList + BaseWriter.MapWriter stringWriter = stringMapVector.getWriter(); + for (int i = 0; i < rowCount; 
i++) { + stringWriter.startMap(); + for (int j = 0; j < 5 - i; j++) { + stringWriter.startEntry(); + stringWriter.key().varChar().writeVarChar("key" + j); + stringWriter.value().varChar().writeVarChar("string" + j); + stringWriter.endEntry(); + } + stringWriter.endMap(); + } + + // Update count for the vectors + intMapVector.setValueCount(rowCount); + stringMapVector.setValueCount(rowCount); + dateMapVector.setValueCount(rowCount); + + // Update count for data vector (map writer does not do this) + stringMapVector.getDataVector().setValueCount(entryCount); + + // Set test data for dateList + BaseWriter.MapWriter dateWriter = dateMapVector.getWriter(); + for (int i = 0; i < rowCount; i++) { + dateWriter.startMap(); + for (int j = 0; j < 5 - i; j++) { + dateWriter.startEntry(); + dateWriter.key().varChar().writeVarChar("key" + j); + dateWriter.value().dateDay().writeDateDay((int) LocalDate.now().plusDays(j).toEpochDay()); + dateWriter.endEntry(); + } + dateWriter.endMap(); + } + + // Update count for data vector (map writer does not do this) + dateMapVector.getDataVector().setValueCount(entryCount); + + File dataFile = new File(TMP, "testRoundTripMap.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableMap() throws Exception { + + // Field definitions + FieldType nullMapType = new FieldType(true, new ArrowType.Map(false), null); + FieldType nonNullMapType = new FieldType(false, new ArrowType.Map(false), null); + + Field keyField = new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null); + Field nullFieldType = new Field("value", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field nonNullFieldType = + new Field("value", FieldType.notNullable(new ArrowType.Int(32, true)), null); + Field nullEntryField = + new Field( + "entries", + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList(keyField, nullFieldType)); + Field nonNullEntryField = + new Field( + "entries", + 
FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList(keyField, nonNullFieldType)); + + // Create empty vectors + BufferAllocator allocator = new RootAllocator(); + MapVector nullEntriesVector = + new MapVector("nullEntriesVector", allocator, nonNullMapType, null); + MapVector nullMapVector = new MapVector("nullMapVector", allocator, nullMapType, null); + MapVector nullBothVector = new MapVector("nullBothVector", allocator, nullMapType, null); + + nullEntriesVector.initializeChildrenFromFields(Arrays.asList(nullEntryField)); + nullMapVector.initializeChildrenFromFields(Arrays.asList(nonNullEntryField)); + nullBothVector.initializeChildrenFromFields(Arrays.asList(nullEntryField)); + + // Set up VSR + List vectors = Arrays.asList(nullEntriesVector, nullMapVector, nullBothVector); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data for intList + BaseWriter.MapWriter writer = nullEntriesVector.getWriter(); + writer.startMap(); + writer.startEntry(); + writer.key().varChar().writeVarChar("key0"); + writer.value().integer().writeNull(); + writer.endEntry(); + writer.startEntry(); + writer.key().varChar().writeVarChar("key1"); + writer.value().integer().writeNull(); + writer.endEntry(); + writer.endMap(); + writer.startMap(); + writer.startEntry(); + writer.key().varChar().writeVarChar("key2"); + writer.value().integer().writeInt(0); + writer.endEntry(); + writer.startEntry(); + writer.key().varChar().writeVarChar("key3"); + writer.value().integer().writeInt(0); + writer.endEntry(); + writer.endMap(); + writer.startMap(); + writer.startEntry(); + writer.key().varChar().writeVarChar("key4"); + writer.value().integer().writeInt(123); + writer.endEntry(); + writer.startEntry(); + writer.key().varChar().writeVarChar("key5"); + writer.value().integer().writeInt(456); + writer.endEntry(); + writer.endMap(); + + // Set test data for stringList + 
BaseWriter.MapWriter nullMapWriter = nullMapVector.getWriter();
      nullMapWriter.writeNull();
      nullMapWriter.setPosition(1); // writeNull() does not inc. idx() on map (list) vector
      nullMapWriter.startMap();
      nullMapWriter.startEntry();
      nullMapWriter.key().varChar().writeVarChar("key2");
      nullMapWriter.value().integer().writeInt(0);
      nullMapWriter.endEntry();
      // Fixed copy-paste bug: these entries were written through "writer" (the
      // nullEntriesVector writer), including a stray writer.startMap() that opened an
      // unintended extra map on nullEntriesVector. All rows of nullMapVector must go
      // through nullMapWriter.
      nullMapWriter.startEntry();
      nullMapWriter.key().varChar().writeVarChar("key3");
      nullMapWriter.value().integer().writeInt(0);
      nullMapWriter.endEntry();
      nullMapWriter.endMap();
      nullMapWriter.startMap();
      nullMapWriter.startEntry();
      nullMapWriter.key().varChar().writeVarChar("key4");
      nullMapWriter.value().integer().writeInt(123);
      nullMapWriter.endEntry();
      nullMapWriter.startEntry();
      nullMapWriter.key().varChar().writeVarChar("key5");
      nullMapWriter.value().integer().writeInt(456);
      nullMapWriter.endEntry();
      nullMapWriter.endMap();

      // Set test data for nullBothVector
      BaseWriter.MapWriter nullBothWriter = nullBothVector.getWriter();
      nullBothWriter.writeNull();
      nullBothWriter.setPosition(1);
      nullBothWriter.startMap();
      nullBothWriter.startEntry();
      nullBothWriter.key().varChar().writeVarChar("key2");
      nullBothWriter.value().integer().writeNull();
      nullBothWriter.endEntry();
      nullBothWriter.startEntry();
      nullBothWriter.key().varChar().writeVarChar("key3");
      nullBothWriter.value().integer().writeNull();
      nullBothWriter.endEntry();
      nullBothWriter.endMap();
      nullBothWriter.startMap();
      // Fixed copy-paste bug: row 2 entries were written through "writer" instead of
      // nullBothWriter, so nullBothVector row 2 would have been empty while extra
      // entries were appended to nullEntriesVector.
      nullBothWriter.startEntry();
      nullBothWriter.key().varChar().writeVarChar("key4");
      nullBothWriter.value().integer().writeInt(123);
      nullBothWriter.endEntry();
      nullBothWriter.startEntry();
      nullBothWriter.key().varChar().writeVarChar("key5");
      nullBothWriter.value().integer().writeInt(456);
      nullBothWriter.endEntry();
      nullBothWriter.endMap();

      // Update count for the vectors
      nullEntriesVector.setValueCount(3);
      nullMapVector.setValueCount(3);
      nullBothVector.setValueCount(3);

      File dataFile = new File(TMP, "testRoundTripNullableMap.avro");

      roundTripTest(root, allocator, dataFile,
rowCount); + } + } + + @Test + public void testRoundTripStruct() throws Exception { + + // Field definitions + FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), null); + Field intField = + new Field("intField", FieldType.notNullable(new ArrowType.Int(32, true)), null); + Field stringField = new Field("stringField", FieldType.notNullable(new ArrowType.Utf8()), null); + Field dateField = + new Field("dateField", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null); + + // Create empty vector + BufferAllocator allocator = new RootAllocator(); + StructVector structVector = new StructVector("struct", allocator, structFieldType, null); + structVector.initializeChildrenFromFields(Arrays.asList(intField, stringField, dateField)); + + // Set up VSR + List vectors = Arrays.asList(structVector); + int rowCount = 3; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data + BaseWriter.StructWriter structWriter = structVector.getWriter(); + + for (int i = 0; i < rowCount; i++) { + structWriter.start(); + structWriter.integer("intField").writeInt(i); + structWriter.varChar("stringField").writeVarChar("string" + i); + structWriter.dateDay("dateField").writeDateDay((int) LocalDate.now().toEpochDay() + i); + structWriter.end(); + } + + File dataFile = new File(TMP, "testRoundTripStruct.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } + + @Test + public void testRoundTripNullableStructs() throws Exception { + + // Field definitions + FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), null); + FieldType nullableStructFieldType = new FieldType(true, new ArrowType.Struct(), null); + Field intField = + new Field("intField", FieldType.notNullable(new ArrowType.Int(32, true)), null); + Field nullableIntField = + new Field("nullableIntField", FieldType.nullable(new ArrowType.Int(32, true)), null); + + // Create empty vectors + 
BufferAllocator allocator = new RootAllocator(); + StructVector structVector = new StructVector("struct", allocator, structFieldType, null); + StructVector nullableStructVector = + new StructVector("nullableStruct", allocator, nullableStructFieldType, null); + structVector.initializeChildrenFromFields(Arrays.asList(intField, nullableIntField)); + nullableStructVector.initializeChildrenFromFields(Arrays.asList(intField, nullableIntField)); + + // Set up VSR + List vectors = Arrays.asList(structVector, nullableStructVector); + int rowCount = 4; + + try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) { + + root.setRowCount(rowCount); + root.allocateNew(); + + // Set test data for structVector + BaseWriter.StructWriter structWriter = structVector.getWriter(); + for (int i = 0; i < rowCount; i++) { + structWriter.setPosition(i); + structWriter.start(); + structWriter.integer("intField").writeInt(i); + if (i % 2 == 0) { + structWriter.integer("nullableIntField").writeInt(i * 10); + } else { + structWriter.integer("nullableIntField").writeNull(); + } + structWriter.end(); + } + + // Set test data for nullableStructVector + BaseWriter.StructWriter nullableStructWriter = nullableStructVector.getWriter(); + for (int i = 0; i < rowCount; i++) { + nullableStructWriter.setPosition(i); + if (i >= 2) { + nullableStructWriter.start(); + nullableStructWriter.integer("intField").writeInt(i); + if (i % 2 == 0) { + nullableStructWriter.integer("nullableIntField").writeInt(i * 10); + } else { + nullableStructWriter.integer("nullableIntField").writeNull(); + } + nullableStructWriter.end(); + } else { + nullableStructWriter.writeNull(); + } + } + + // Update count for the vector + structVector.setValueCount(rowCount); + nullableStructVector.setValueCount(rowCount); + + File dataFile = new File(TMP, "testRoundTripNullableStructs.avro"); + + roundTripTest(root, allocator, dataFile, rowCount); + } + } +} diff --git 
a/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/RoundTripSchemaTest.java b/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/RoundTripSchemaTest.java new file mode 100644 index 0000000000..864e2c8b59 --- /dev/null +++ b/adapter/avro/src/test/java/org/apache/arrow/adapter/avro/RoundTripSchemaTest.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.adapter.avro; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.avro.Schema; +import org.junit.jupiter.api.Test; + +public class RoundTripSchemaTest { + + private void doRoundTripTest(List fields) { + + AvroToArrowConfig config = new AvroToArrowConfig(null, 1, null, Collections.emptySet(), false); + + Schema avroSchema = ArrowToAvroUtils.createAvroSchema(fields, "TestRecord"); + org.apache.arrow.vector.types.pojo.Schema arrowSchema = + AvroToArrowUtils.createArrowSchema(avroSchema, config); + + // Compare string representations - equality not defined for logical types + assertEquals(fields, arrowSchema.getFields()); + } + + // Schema round trip for primitive types, nullable and non-nullable + + @Test + public void testRoundTripNullType() { + + List fields = + Arrays.asList(new Field("nullType", FieldType.notNullable(new ArrowType.Null()), null)); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripBooleanType() { + + List fields = + Arrays.asList( + new Field("nullableBool", FieldType.nullable(new ArrowType.Bool()), null), + new Field("nonNullableBool", FieldType.notNullable(new ArrowType.Bool()), null)); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripIntegerTypes() { + + AvroToArrowConfig config = new AvroToArrowConfig(null, 1, null, Collections.emptySet(), false); + + // Only round trip types with direct equivalent in Avro + + List fields = + Arrays.asList( + new Field("nullableInt32", FieldType.nullable(new ArrowType.Int(32, true)), null), + new 
Field("nonNullableInt32", FieldType.notNullable(new ArrowType.Int(32, true)), null), + new Field("nullableInt64", FieldType.nullable(new ArrowType.Int(64, true)), null), + new Field( + "nonNullableInt64", FieldType.notNullable(new ArrowType.Int(64, true)), null)); + + Schema avroSchema = ArrowToAvroUtils.createAvroSchema(fields, "TestRecord"); + org.apache.arrow.vector.types.pojo.Schema arrowSchema = + AvroToArrowUtils.createArrowSchema(avroSchema, config); + + // Exact match on fields after round trip + assertEquals(fields, arrowSchema.getFields()); + } + + @Test + public void testRoundTripFloatingPointTypes() { + + // Only round trip types with direct equivalent in Avro + + List fields = + Arrays.asList( + new Field( + "nullableFloat32", + FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), + null), + new Field( + "nonNullableFloat32", + FieldType.notNullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), + null), + new Field( + "nullableFloat64", + FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null), + new Field( + "nonNullableFloat64", + FieldType.notNullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null)); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripStringTypes() { + + List fields = + Arrays.asList( + new Field("nullableUtf8", FieldType.nullable(new ArrowType.Utf8()), null), + new Field("nonNullableUtf8", FieldType.notNullable(new ArrowType.Utf8()), null)); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripBinaryTypes() { + + List fields = + Arrays.asList( + new Field("nullableBinary", FieldType.nullable(new ArrowType.Binary()), null), + new Field("nonNullableBinary", FieldType.notNullable(new ArrowType.Binary()), null)); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripFixedSizeBinaryTypes() { + + List fields = + Arrays.asList( + new Field( + "nullableFixedSizeBinary", + FieldType.nullable(new 
ArrowType.FixedSizeBinary(10)), + null), + new Field( + "nonNullableFixedSizeBinary", + FieldType.notNullable(new ArrowType.FixedSizeBinary(10)), + null)); + + doRoundTripTest(fields); + } + + // Schema round trip for logical types, nullable and non-nullable + + @Test + public void testRoundTripDecimalTypes() { + + List fields = + Arrays.asList( + new Field( + "nullableDecimal128", FieldType.nullable(new ArrowType.Decimal(10, 2, 128)), null), + new Field( + "nonNullableDecimal1281", + FieldType.notNullable(new ArrowType.Decimal(10, 2, 128)), + null), + new Field( + "nonNullableDecimal1282", + FieldType.notNullable(new ArrowType.Decimal(15, 5, 128)), + null), + new Field( + "nonNullableDecimal1283", + FieldType.notNullable(new ArrowType.Decimal(20, 10, 128)), + null), + new Field( + "nullableDecimal256", FieldType.nullable(new ArrowType.Decimal(55, 15, 256)), null), + new Field( + "nonNullableDecimal2561", + FieldType.notNullable(new ArrowType.Decimal(55, 25, 256)), + null), + new Field( + "nonNullableDecimal2562", + FieldType.notNullable(new ArrowType.Decimal(25, 8, 256)), + null), + new Field( + "nonNullableDecimal2563", + FieldType.notNullable(new ArrowType.Decimal(60, 50, 256)), + null)); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripDateTypes() { + + List fields = + Arrays.asList( + new Field( + "nullableDateDay", FieldType.nullable(new ArrowType.Date(DateUnit.DAY)), null), + new Field( + "nonNullableDateDay", + FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), + null)); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripTimeTypes() { + + List fields = + Arrays.asList( + new Field( + "nullableTimeMillis", + FieldType.nullable(new ArrowType.Time(TimeUnit.MILLISECOND, 32)), + null), + new Field( + "nonNullableTimeMillis", + FieldType.notNullable(new ArrowType.Time(TimeUnit.MILLISECOND, 32)), + null), + new Field( + "nullableTimeMicros", + FieldType.nullable(new ArrowType.Time(TimeUnit.MICROSECOND, 64)), + null), + 
new Field( + "nonNullableTimeMicros", + FieldType.notNullable(new ArrowType.Time(TimeUnit.MICROSECOND, 64)), + null)); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripZoneAwareTimestampTypes() { + + List fields = + Arrays.asList( + new Field( + "nullableTimestampMillisTz", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), + null), + new Field( + "nonNullableTimestampMillisTz", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), + null), + new Field( + "nullableTimestampMicrosTz", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), + null), + new Field( + "nonNullableTimestampMicrosTz", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), + null), + new Field( + "nullableTimestampNanosTz", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")), + null), + new Field( + "nonNullableTimestampNanosTz", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")), + null)); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripLocalTimestampTypes() { + + List fields = + Arrays.asList( + new Field( + "nullableTimestampMillis", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), + null), + new Field( + "nonNullableTimestampMillis", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), + null), + new Field( + "nullableTimestampMicros", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)), + null), + new Field( + "nonNullableTimestampMicros", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)), + null), + new Field( + "nullableTimestampNanos", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.NANOSECOND, null)), + null), + new Field( + "nonNullableTimestampNanos", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.NANOSECOND, null)), + null)); + + doRoundTripTest(fields); + } + + // Schema round trip 
for complex types, where the contents are primitive and logical types + + @Test + public void testRoundTripListType() { + + List fields = + Arrays.asList( + new Field( + "nullableIntList", + FieldType.nullable(new ArrowType.List()), + Arrays.asList( + new Field("$data$", FieldType.nullable(new ArrowType.Int(32, true)), null))), + new Field( + "nullableDoubleList", + FieldType.nullable(new ArrowType.List()), + Arrays.asList( + new Field( + "$data$", + FieldType.notNullable( + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null))), + new Field( + "nonNullableDecimalList", + FieldType.notNullable(new ArrowType.List()), + Arrays.asList( + new Field( + "$data$", FieldType.nullable(new ArrowType.Decimal(10, 2, 128)), null))), + new Field( + "nonNullableTimestampList", + FieldType.notNullable(new ArrowType.List()), + Arrays.asList( + new Field( + "$data$", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), + null)))); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripMapType() { + + List fields = + Arrays.asList( + new Field( + "nullableMapWithNullableInt", + FieldType.nullable(new ArrowType.Map(false)), + Arrays.asList( + new Field( + "entries", + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList( + new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null), + new Field( + "value", FieldType.nullable(new ArrowType.Int(32, true)), null))))), + new Field( + "nullableMapWithNonNullableDouble", + FieldType.nullable(new ArrowType.Map(false)), + Arrays.asList( + new Field( + "entries", + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList( + new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null), + new Field( + "value", + FieldType.notNullable( + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null))))), + new Field( + "nonNullableMapWithNullableDecimal", + FieldType.notNullable(new ArrowType.Map(false)), + Arrays.asList( + new Field( + "entries", + 
FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList( + new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null), + new Field( + "value", + FieldType.nullable(new ArrowType.Decimal(10, 2, 128)), + null))))), + new Field( + "nonNullableMapWithNonNullableTimestamp", + FieldType.notNullable(new ArrowType.Map(false)), + Arrays.asList( + new Field( + "entries", + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList( + new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null), + new Field( + "value", + FieldType.notNullable( + new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), + null)))))); + + doRoundTripTest(fields); + } + + @Test + public void testRoundTripStructType() { + + List fields = + Arrays.asList( + new Field( + "nullableRecord", + FieldType.nullable(new ArrowType.Struct()), + Arrays.asList( + new Field("field1", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field( + "field2", + FieldType.notNullable( + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null), + new Field( + "field3", FieldType.nullable(new ArrowType.Decimal(10, 2, 128)), null), + new Field( + "field4", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), + null))), + new Field( + "nonNullableRecord", + FieldType.notNullable(new ArrowType.Struct()), + Arrays.asList( + new Field("field1", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field( + "field2", + FieldType.notNullable( + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null), + new Field( + "field3", FieldType.nullable(new ArrowType.Decimal(10, 2, 128)), null), + new Field( + "field4", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), + null)))); + + doRoundTripTest(fields); + } +} diff --git a/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc b/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc index 18d7d63fc7..c1867811c7 100644 --- 
a/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc +++ b/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc @@ -20,6 +20,6 @@ "name": "test", "type": "bytes", "logicalType" : "decimal", - "precision": 39, + "precision": 77, "scale": 2 } diff --git a/adapter/avro/src/test/resources/schema/logical/test_local_timestamp_micros.avsc b/adapter/avro/src/test/resources/schema/logical/test_local_timestamp_micros.avsc new file mode 100644 index 0000000000..db456e8a84 --- /dev/null +++ b/adapter/avro/src/test/resources/schema/logical/test_local_timestamp_micros.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "long", + "logicalType" : "local-timestamp-micros" +} diff --git a/adapter/avro/src/test/resources/schema/logical/test_local_timestamp_millis.avsc b/adapter/avro/src/test/resources/schema/logical/test_local_timestamp_millis.avsc new file mode 100644 index 0000000000..6a3cf9bccb --- /dev/null +++ b/adapter/avro/src/test/resources/schema/logical/test_local_timestamp_millis.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "long", + "logicalType" : "local-timestamp-millis" +} diff --git a/adapter/avro/src/test/resources/schema/logical/test_local_timestamp_nanos.avsc b/adapter/avro/src/test/resources/schema/logical/test_local_timestamp_nanos.avsc new file mode 100644 index 0000000000..96ca8bbfa4 --- /dev/null +++ b/adapter/avro/src/test/resources/schema/logical/test_local_timestamp_nanos.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "long", + "logicalType" : "local-timestamp-nanos" +} diff --git a/adapter/avro/src/test/resources/schema/logical/test_timestamp_nanos.avsc b/adapter/avro/src/test/resources/schema/logical/test_timestamp_nanos.avsc new file mode 100644 index 0000000000..9e05eab408 --- /dev/null +++ b/adapter/avro/src/test/resources/schema/logical/test_timestamp_nanos.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "long", + "logicalType" : "timestamp-nanos" +}