Skip to content

Commit d04986f

Browse files
clairemcginty authored and Fokko committed
GH-2992: Gate LocalTimestamp references in AvroSchemaConverter (#2993)
* PARQUET-2992: Gate LocalTimestamp references in AvroSchemaConverter
* GH-2992: Test logical type conversion for different Avro versions
1 parent 7204a11 commit d04986f

File tree

2 files changed

+81
-6
lines changed

2 files changed

+81
-6
lines changed

parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static java.util.Optional.of;
2323
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
2424
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
25+
import static org.apache.parquet.avro.AvroRecordConverter.getRuntimeAvroVersion;
2526
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
2627
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
2728
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
@@ -488,15 +489,20 @@ private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) {
488489
return timeType(true, MICROS);
489490
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
490491
return timestampType(true, MILLIS);
491-
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
492-
return timestampType(false, MILLIS);
493492
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
494493
return timestampType(true, MICROS);
495-
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
496-
return timestampType(false, MICROS);
497494
} else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) {
498495
return uuidType();
499496
}
497+
498+
if (avroVersionSupportsLocalTimestampTypes()) {
499+
if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
500+
return timestampType(false, MILLIS);
501+
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
502+
return timestampType(false, MICROS);
503+
}
504+
}
505+
500506
return null;
501507
}
502508

@@ -538,7 +544,7 @@ public Optional<LogicalType> visit(
538544
LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit();
539545
boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC();
540546

541-
if (isAdjustedToUTC) {
547+
if (isAdjustedToUTC || !avroVersionSupportsLocalTimestampTypes()) {
542548
switch (unit) {
543549
case MILLIS:
544550
return of(LogicalTypes.timestampMillis());
@@ -605,4 +611,14 @@ private static String namespace(String name, Map<String, Integer> names) {
605611
Integer nameCount = names.merge(name, 1, (oldValue, value) -> oldValue + 1);
606612
return nameCount > 1 ? name + nameCount : null;
607613
}
614+
615+
/* Avro 1.7.x, 1.8.x and 1.9.x do not support conversions to the LocalTimestamp{Micros, Millis} classes: returns false for those runtime versions and true otherwise, including when the runtime version is unknown (null). */
616+
private static boolean avroVersionSupportsLocalTimestampTypes() {
617+
final String avroVersion = getRuntimeAvroVersion();
618+
619+
return avroVersion == null
620+
|| !(avroVersion.startsWith("1.7.")
621+
|| avroVersion.startsWith("1.8.")
622+
|| avroVersion.startsWith("1.9."));
623+
}
608624
}

parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@
4444
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
4545
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
4646
import static org.junit.Assert.assertEquals;
47+
import static org.mockito.Mockito.CALLS_REAL_METHODS;
4748

49+
import com.google.common.collect.ImmutableSet;
4850
import com.google.common.collect.Lists;
4951
import com.google.common.io.Resources;
5052
import java.util.Arrays;
@@ -53,19 +55,34 @@
5355
import org.apache.avro.LogicalTypes;
5456
import org.apache.avro.Schema;
5557
import org.apache.hadoop.conf.Configuration;
58+
import org.apache.parquet.schema.LogicalTypeAnnotation;
5659
import org.apache.parquet.schema.MessageType;
5760
import org.apache.parquet.schema.MessageTypeParser;
5861
import org.apache.parquet.schema.PrimitiveType;
5962
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
63+
import org.apache.parquet.schema.Type;
6064
import org.apache.parquet.schema.Types;
6165
import org.junit.Assert;
66+
import org.junit.Before;
6267
import org.junit.BeforeClass;
6368
import org.junit.Test;
64-
69+
import org.junit.runner.RunWith;
70+
import org.mockito.Mockito;
71+
import org.powermock.api.mockito.PowerMockito;
72+
import org.powermock.core.classloader.annotations.PrepareForTest;
73+
import org.powermock.modules.junit4.PowerMockRunner;
74+
75+
@RunWith(PowerMockRunner.class)
76+
@PrepareForTest(AvroRecordConverter.class)
6577
public class TestAvroSchemaConverter {
6678

6779
private static final Configuration NEW_BEHAVIOR = new Configuration(false);
6880

81+
// Statically mocks AvroRecordConverter before each test so individual tests can stub getRuntimeAvroVersion(); CALLS_REAL_METHODS is passed as the default answer so unstubbed static methods are intended to keep their real behavior.
@Before
82+
public void setupMockito() {
83+
PowerMockito.mockStatic(AvroRecordConverter.class, CALLS_REAL_METHODS);
84+
}
85+
6986
@BeforeClass
7087
public static void setupConf() {
7188
NEW_BEHAVIOR.setBoolean("parquet.avro.add-list-element-records", false);
@@ -665,6 +682,27 @@ public void testTimestampMillisType() throws Exception {
665682
testRoundTripConversion(
666683
expected, "message myrecord {\n" + " required int64 timestamp (TIMESTAMP(MILLIS,true));\n" + "}\n");
667684

685+
// Test that conversions for timestamp types only use APIs that are available in the user's Avro version
686+
for (String avroVersion : ImmutableSet.of("1.7.0", "1.8.0", "1.9.0", "1.10.0", "1.11.0")) {
687+
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn(avroVersion);
688+
final Schema converted = new AvroSchemaConverter()
689+
.convert(Types.buildMessage()
690+
.addField(Types.primitive(INT64, Type.Repetition.REQUIRED)
691+
.as(LogicalTypeAnnotation.timestampType(
692+
false, LogicalTypeAnnotation.TimeUnit.MILLIS))
693+
.length(1)
694+
.named("timestamp_type"))
695+
.named("TestAvro"));
696+
697+
assertEquals(
698+
avroVersion.matches("1\\.[789]\\.\\d+") ? "timestamp-millis" : "local-timestamp-millis",
699+
converted
700+
.getField("timestamp_type")
701+
.schema()
702+
.getLogicalType()
703+
.getName());
704+
}
705+
668706
for (PrimitiveTypeName primitive :
669707
new PrimitiveTypeName[] {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
670708
final PrimitiveType type;
@@ -729,6 +767,27 @@ public void testTimestampMicrosType() throws Exception {
729767
IllegalArgumentException.class,
730768
() -> new AvroSchemaConverter().convert(message(type)));
731769
}
770+
771+
// Test that conversions for timestamp types only use APIs that are available in the user's Avro version
772+
for (String avroVersion : ImmutableSet.of("1.7.0", "1.8.0", "1.9.0", "1.10.0", "1.11.0")) {
773+
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn(avroVersion);
774+
final Schema converted = new AvroSchemaConverter()
775+
.convert(Types.buildMessage()
776+
.addField(Types.primitive(INT64, Type.Repetition.REQUIRED)
777+
.as(LogicalTypeAnnotation.timestampType(
778+
false, LogicalTypeAnnotation.TimeUnit.MICROS))
779+
.length(1)
780+
.named("timestamp_type"))
781+
.named("TestAvro"));
782+
783+
assertEquals(
784+
avroVersion.matches("1\\.[789]\\.\\d+") ? "timestamp-micros" : "local-timestamp-micros",
785+
converted
786+
.getField("timestamp_type")
787+
.schema()
788+
.getLogicalType()
789+
.getName());
790+
}
732791
}
733792

734793
@Test

0 commit comments

Comments
 (0)