From 58ab077b8afe9e875e7da1a5d253fa00b6aa21c0 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 5 Dec 2024 13:39:04 -0500 Subject: [PATCH 01/15] PARQUET-34: implement Size() filter for repeated columns --- .../filter2/predicate/ContainsRewriter.java | 6 + .../parquet/filter2/predicate/FilterApi.java | 5 + .../filter2/predicate/FilterPredicate.java | 5 + .../predicate/LogicalInverseRewriter.java | 6 + .../filter2/predicate/LogicalInverter.java | 14 ++ .../parquet/filter2/predicate/Operators.java | 76 +++++++ .../SchemaCompatibilityValidator.java | 25 ++- .../IncrementallyUpdatedFilterPredicate.java | 69 ++++++ ...allyUpdatedFilterPredicateBuilderBase.java | 18 ++ .../columnindex/ColumnIndexBuilder.java | 6 + .../columnindex/ColumnIndexFilter.java | 6 + .../predicate/TestLogicalInverter.java | 10 + .../TestSchemaCompatibilityValidator.java | 43 ++++ .../columnindex/TestColumnIndexFilter.java | 9 + ...ntallyUpdatedFilterPredicateGenerator.java | 32 +++ .../bloomfilterlevel/BloomFilterImpl.java | 5 + .../dictionarylevel/DictionaryFilter.java | 33 +++ .../statisticslevel/StatisticsFilter.java | 66 ++++++ .../dictionarylevel/DictionaryFilterTest.java | 26 +++ .../recordlevel/TestRecordLevelFilters.java | 41 +++- .../statisticslevel/TestStatisticsFilter.java | 207 +++++++++++++++++- 21 files changed, 696 insertions(+), 12 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java index 5050ee8f01..ba999fb1d0 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java @@ -33,6 +33,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; /** @@ -97,6 +98,11 @@ public > FilterPredicate visit(Contains contains) { return contains; } + @Override + public FilterPredicate visit(Size size) { + return size; + } + @Override public FilterPredicate visit(And and) { final FilterPredicate left; diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java index 3c51680667..787f0cf90c 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java @@ -40,6 +40,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; import org.apache.parquet.filter2.predicate.Operators.SingleColumnFilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.SupportsEqNotEq; import org.apache.parquet.filter2.predicate.Operators.SupportsLtGt; import org.apache.parquet.filter2.predicate.Operators.UserDefined; @@ -263,6 +264,10 @@ public static , P extends SingleColumnFilterPredicate return Contains.of(pred); } + public static Size size(Column column, Size.Operator operator, int value) { + return new Size(column, operator, value); + } + /** * Keeps records that pass the provided {@link UserDefinedPredicate} *

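For context, a minimal sketch of how the new size() predicate is meant to compose with the existing filter plumbing — assuming the phonebook-style schema exercised in TestRecordLevelFilters later in this patch; the FilterCompat wiring is the pre-existing read path, not new code from this change:

import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.size;

import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.Size;

// Keep only records whose repeated column "phoneNumbers.phone.number"
// holds between 2 and 4 elements, inclusive.
FilterPredicate sizePred = and(
    size(longColumn("phoneNumbers.phone.number"), Size.Operator.GTE, 2),
    size(longColumn("phoneNumbers.phone.number"), Size.Operator.LTE, 4));

// size() predicates flow through the same FilterCompat entry point as
// eq/lt/gt, so existing readers pick them up without API changes.
FilterCompat.Filter filter = FilterCompat.get(sizePred);

Note that the SchemaCompatibilityValidator changes below accept size() only on repeated columns whose element type is REQUIRED; applying it to a non-repeated column, or to a list with OPTIONAL elements, is rejected at validation time.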
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java index a662bb0b17..5e75a3bc6a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java @@ -31,6 +31,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; /** @@ -89,6 +90,10 @@ default > R visit(Contains contains) { throw new UnsupportedOperationException("visit Contains is not supported."); } + default R visit(Size size) { + throw new UnsupportedOperationException("visit Size is not supported."); + } + R visit(And and); R visit(Or or); diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java index d1d7f07e80..d4817460c6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java @@ -36,6 +36,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; /** @@ -104,6 +105,11 @@ public > FilterPredicate visit(Contains contains) { return contains; } + @Override + public FilterPredicate visit(Size size) { + return size; + } + @Override public FilterPredicate visit(And and) { return and(and.getLeft().accept(this), and.getRight().accept(this)); diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java index 506b8f0e56..9ffbae069d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java @@ -33,6 +33,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; /** @@ -98,6 +99,19 @@ public > FilterPredicate visit(Contains contains) { return contains.not(); } + @Override + public FilterPredicate visit(Size size) { + final long value = size.getValue(); + final Operators.Column column = size.getColumn(); + + return size.filter( + (eq) -> new Or(new Size(column, Size.Operator.LT, value), new Size(column, Size.Operator.GT, value)), + (lt) -> new Size(column, Size.Operator.GTE, value), + (ltEq) -> new Size(column, Size.Operator.GT, value), + (gt) -> new Size(column, Size.Operator.LTE, value), + (gtEq) -> new Size(column, Size.Operator.LT, value)); + } + @Override public FilterPredicate visit(And and) { return new Or(and.getLeft().accept(this), and.getRight().accept(this)); diff --git 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index 60dc80cd7b..16df708c54 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -505,6 +505,82 @@ public R filter( } } + public static final class Size implements FilterPredicate, Serializable { + public enum Operator { + EQ, + LT, + LTE, + GT, + GTE + } + + private final Column column; + private final Operator operator; + private final long value; + + Size(Column column, Operator operator, long value) { + this.column = column; + this.operator = operator; + if (value < 0) { + throw new IllegalArgumentException("Argument to size() operator cannot be negative: " + value); + } + this.value = value; + } + + @Override + public R accept(Visitor visitor) { + return visitor.visit(this); + } + + public long getValue() { + return value; + } + + public Column getColumn() { + return column; + } + + public R filter( + Function onEq, + Function onLt, + Function onLtEq, + Function onGt, + Function onGtEq) { + if (operator == Operator.EQ) { + return onEq.apply(value); + } else if (operator == Operator.LT) { + return onLt.apply(value); + } else if (operator == Operator.LTE) { + return onLtEq.apply(value); + } else if (operator == Operator.GT) { + return onGt.apply(value); + } else if (operator == Operator.GTE) { + return onGtEq.apply(value); + } else { + throw new UnsupportedOperationException("Operator " + operator + " cannot be used with size() filter"); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + return column.equals(((Size) o).column) && operator == ((Size) o).operator && value == ((Size) o).value; + } + + @Override + public int hashCode() { + return Objects.hash(column, operator, value); + } + + @Override + public String toString() { + String name = Size.class.getSimpleName().toLowerCase(Locale.ENGLISH); + return name + "(" + operator.toString().toLowerCase() + " " + value + ")"; + } + } + public static final class NotIn> extends SetColumnFilterPredicate { NotIn(Column column, Set values) { diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java index b5708a4a0c..4b94d20b13 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java @@ -38,9 +38,11 @@ import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; import org.apache.parquet.filter2.predicate.Operators.SetColumnFilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; /** * Inspects the column types found in the provided {@link FilterPredicate} and compares them @@ -135,6 +137,12 @@ public > Void visit(Contains pred) { return null; } + @Override + public Void visit(Size size) { + validateColumn(size.getColumn(), true, true); + return null; + } + 
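// A sketch of the dispatch helper defined on Operators.Size above: filter()
// routes the predicate's value to exactly one of five handlers keyed on the
// operator, which is how LogicalInverter builds the logical complement, e.g.
// invert(size(c, EQ, n)) == or(size(c, LT, n), size(c, GT, n)).
// Illustrative only, using the repeated column "lotsOfLongs" from the test schema:
Operators.Size pred = FilterApi.size(FilterApi.longColumn("lotsOfLongs"), Operators.Size.Operator.EQ, 3);
String rendered = pred.filter(
    (eq) -> "size == " + eq, // only this handler fires for Operator.EQ
    (lt) -> "size < " + lt,
    (lte) -> "size <= " + lte,
    (gt) -> "size > " + gt,
    (gte) -> "size >= " + gte); // rendered is "size == 3"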
@Override public Void visit(And and) { and.getLeft().accept(this); @@ -175,14 +183,15 @@ private > void validateColumnFilterPredicate(SetColumnFi } private > void validateColumnFilterPredicate(Contains pred) { - validateColumn(pred.getColumn(), true); + validateColumn(pred.getColumn(), true, false); } private > void validateColumn(Column column) { - validateColumn(column, false); + validateColumn(column, false, false); } - private > void validateColumn(Column column, boolean shouldBeRepeated) { + private > void validateColumn( + Column column, boolean isRepeatedColumn, boolean mustBeRequired) { ColumnPath path = column.getColumnPath(); Class alreadySeen = columnTypesEncountered.get(path); @@ -204,15 +213,21 @@ private > void validateColumn(Column column, boolean return; } - if (shouldBeRepeated && descriptor.getMaxRepetitionLevel() == 0) { + if (isRepeatedColumn && descriptor.getMaxRepetitionLevel() == 0) { throw new IllegalArgumentException( "FilterPredicate for column " + path.toDotString() + " requires a repeated " + "schema, but found max repetition level " + descriptor.getMaxRepetitionLevel()); - } else if (!shouldBeRepeated && descriptor.getMaxRepetitionLevel() > 0) { + } else if (!isRepeatedColumn && descriptor.getMaxRepetitionLevel() > 0) { throw new IllegalArgumentException("FilterPredicates do not currently support repeated columns. " + "Column " + path.toDotString() + " is repeated."); } + if (mustBeRequired && descriptor.getPrimitiveType().isRepetition(Type.Repetition.OPTIONAL)) { + throw new IllegalArgumentException("FilterPredicate for column " + path.toDotString() + + " requires schema to have repetition REQUIRED, but found " + + descriptor.getPrimitiveType().getRepetition() + "."); + } + ValidTypeMap.assertTypeValid(column, descriptor.getType()); } diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java index c2aab2b6bf..96550f627f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Objects; +import java.util.function.Function; import org.apache.parquet.io.api.Binary; /** @@ -223,6 +224,74 @@ public void reset() { } } + class CountingValueInspector extends ValueInspector { + private long observedValueCount; + private final ValueInspector delegate; + private final Function shouldUpdateDelegate; + + public CountingValueInspector(ValueInspector delegate, Function shouldUpdateDelegate) { + this.observedValueCount = 0; + this.delegate = delegate; + this.shouldUpdateDelegate = shouldUpdateDelegate; + } + + @Override + public void updateNull() { + delegate.update(observedValueCount); + if (!delegate.isKnown()) { + delegate.updateNull(); + } + setResult(delegate.getResult()); + } + + @Override + public void update(int value) { + incrementCount(); + } + + @Override + public void update(long value) { + incrementCount(); + } + + @Override + public void update(double value) { + incrementCount(); + } + + @Override + public void update(float value) { + incrementCount(); + } + + @Override + public void update(boolean value) { + incrementCount(); + } + + @Override + public void update(Binary value) { + incrementCount(); + } + + @Override + public void reset() { + 
super.reset(); + delegate.reset(); + observedValueCount = 0; + } + + private void incrementCount() { + observedValueCount++; + if (!delegate.isKnown() && shouldUpdateDelegate.apply(observedValueCount)) { + delegate.update(observedValueCount); + if (delegate.isKnown()) { + setResult(delegate.getResult()); + } + } + } + } + // base class for and / or abstract static class BinaryLogical implements IncrementallyUpdatedFilterPredicate { private final IncrementallyUpdatedFilterPredicate left; diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java index 588d06300b..d820945722 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java @@ -25,8 +25,10 @@ import java.util.List; import java.util.Map; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor; +import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.filter2.predicate.Operators.And; import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.Or; @@ -34,6 +36,8 @@ import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.io.PrimitiveColumnIO; import org.apache.parquet.schema.PrimitiveComparator; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; /** * The implementation of this abstract class is auto-generated by @@ -56,6 +60,8 @@ */ public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor { + static final Operators.LongColumn SIZE_PSUEDOCOLUMN = FilterApi.longColumn("SIZE"); + private boolean built = false; private final Map> valueInspectorsByColumn = new HashMap<>(); private final Map> comparatorsByColumn = new HashMap<>(); @@ -70,6 +76,13 @@ public IncrementallyUpdatedFilterPredicateBuilderBase(List le PrimitiveComparator comparator = descriptor.getPrimitiveType().comparator(); comparatorsByColumn.put(path, comparator); } + comparatorsByColumn.put( + SIZE_PSUEDOCOLUMN.getColumnPath(), + new PrimitiveType( + Type.Repetition.REQUIRED, + PrimitiveType.PrimitiveTypeName.INT64, + SIZE_PSUEDOCOLUMN.getColumnPath().toDotString()) + .comparator()); } public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) { @@ -80,6 +93,11 @@ public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) { } protected final void addValueInspector(ColumnPath columnPath, ValueInspector valueInspector) { + if (columnPath.equals(SIZE_PSUEDOCOLUMN.getColumnPath())) { + // do not add psuedocolumn to list of value inspectors + return; + } + List valueInspectors = valueInspectorsByColumn.get(columnPath); if (valueInspectors == null) { valueInspectors = new ArrayList<>(); diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index ffbb82197b..7468b51ac0 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -53,6 +53,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.io.api.Binary; @@ -378,6 +379,11 @@ public > PrimitiveIterator.OfInt visit(Contains conta indices -> IndexIterator.all(getPageCount())); } + @Override + public PrimitiveIterator.OfInt visit(Size size) { + return IndexIterator.all(getPageCount()); + } + @Override public , U extends UserDefinedPredicate> PrimitiveIterator.OfInt visit( UserDefined udp) { diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index 8b6ee1f95d..ad07f3df89 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -39,6 +39,7 @@ import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.metadata.ColumnPath; @@ -161,6 +162,11 @@ public > RowRanges visit(Contains contains) { return contains.filter(this, RowRanges::intersection, RowRanges::union, ranges -> allRows()); } + @Override + public RowRanges visit(Size size) { + return applyPredicate(size.getColumn(), ci -> ci.visit(size), allRows()); + } + @Override public , U extends UserDefinedPredicate> RowRanges visit(UserDefined udp) { return applyPredicate( diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java index 6a7f81a6c2..dde742bbc8 100644 --- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java +++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java @@ -29,8 +29,10 @@ import static org.apache.parquet.filter2.predicate.FilterApi.not; import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.LogicalInverter.invert; +import static org.apache.parquet.filter2.predicate.Operators.Size.Operator; import static org.junit.Assert.assertEquals; import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; @@ -87,5 +89,13 @@ public void testBaseCases() { @Test public void testComplex() { assertEquals(complexInverse, invert(complex)); + + assertEquals( + or(size(intColumn, Operator.LT, 5), size(intColumn, 
Operator.GT, 5)), + invert(size(intColumn, Operator.EQ, 5))); + assertEquals(size(intColumn, Operator.GTE, 5), invert(size(intColumn, Operator.LT, 5))); + assertEquals(size(intColumn, Operator.GT, 5), invert(size(intColumn, Operator.LTE, 5))); + assertEquals(size(intColumn, Operator.LTE, 5), invert(size(intColumn, Operator.GT, 5))); + assertEquals(size(intColumn, Operator.LT, 5), invert(size(intColumn, Operator.GTE, 5))); } } diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java index 47e9bdd5e6..8a44504adb 100644 --- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java +++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java @@ -29,6 +29,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.not; import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate; import static org.junit.Assert.assertEquals; @@ -47,6 +48,8 @@ public class TestSchemaCompatibilityValidator { private static final LongColumn longBar = longColumn("x.bar"); private static final IntColumn intBar = intColumn("x.bar"); private static final LongColumn lotsOfLongs = longColumn("lotsOfLongs"); + private static final BinaryColumn threeLevelList = binaryColumn("nestedGroup.threeLevelList.list.element"); + private static final BinaryColumn listOfOptionals = binaryColumn("listOfOptionals.list.element"); private static final String schemaString = "message Document {\n" + " required int32 a;\n" @@ -54,6 +57,18 @@ public class TestSchemaCompatibilityValidator { + " required binary c (UTF8);\n" + " required group x { required int32 bar; }\n" + " repeated int64 lotsOfLongs;\n" + + " optional group nestedGroup {\n" + + " required group threeLevelList (LIST) {\n" + + " repeated group list {\n" + + " required binary element (STRING);\n" + + " }\n" + + " }\n" + + " }\n" + + " required group listOfOptionals (LIST) {\n" + + " repeated group list {\n" + + " optional binary element (STRING);\n" + + " }\n" + + " }\n" + "}\n"; private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString); @@ -160,4 +175,32 @@ public void testNonRepeatedNotSupportedForContainsPredicates() { e.getMessage()); } } + + @Test + public void testSizePredicate() { + // Size predicate should succeed on repeated columns + try { + validate(size(lotsOfLongs, Operators.Size.Operator.LT, 10), schema); + validate(size(threeLevelList, Operators.Size.Operator.LT, 10), schema); + } catch (IllegalArgumentException e) { + fail("Valid repeated column predicates should not throw exceptions, but threw " + e); + } + + // Size predicate should fail on non-repeated columns and on non-optional list element types + try { + validate(size(intBar, Operators.Size.Operator.LT, 10), schema); + } catch (IllegalArgumentException e) { + assertEquals( + "FilterPredicate for column x.bar requires a repeated schema, but found max repetition level 0", + e.getMessage()); + } + + try { + validate(size(listOfOptionals, Operators.Size.Operator.LT, 10), schema); + } catch (IllegalArgumentException 
e) { + assertEquals( + "FilterPredicate for column listOfOptionals.list.element requires schema to have repetition REQUIRED, but found OPTIONAL.", + e.getMessage()); + } + } } diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java index 1574ce2474..f9c4e24b21 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java @@ -35,6 +35,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.LogicalInverter.invert; import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.ASCENDING; @@ -62,6 +63,7 @@ import java.util.stream.LongStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.filter2.predicate.Statistics; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.metadata.ColumnPath; @@ -970,6 +972,13 @@ public void testFiltering() { 11, 12, 13); + assertRows( + calculateRowRanges( + FilterCompat.get(size(intColumn("column6"), Operators.Size.Operator.GT, 5)), + STORE, + paths, + TOTAL_ROW_COUNT), + LongStream.range(0, 30).toArray()); } @Test diff --git a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java index 7f66ce3821..a2eb218fc2 100644 --- a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java +++ b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java @@ -71,6 +71,7 @@ public void run() throws IOException { + "import java.util.Set;\n" + "\n" + "import org.apache.parquet.hadoop.metadata.ColumnPath;\n" + + "import org.apache.parquet.filter2.predicate.FilterApi;\n" + "import org.apache.parquet.filter2.predicate.FilterPredicate;\n" + "import org.apache.parquet.filter2.predicate.Operators;\n" + "import org.apache.parquet.filter2.predicate.Operators.Contains;\n" @@ -83,6 +84,7 @@ public void run() throws IOException { + "import org.apache.parquet.filter2.predicate.Operators.LtEq;\n" + "import org.apache.parquet.filter2.predicate.Operators.NotEq;\n" + "import org.apache.parquet.filter2.predicate.Operators.NotIn;\n" + + "import org.apache.parquet.filter2.predicate.Operators.Size;\n" + "import org.apache.parquet.filter2.predicate.Operators.UserDefined;\n" + "import org.apache.parquet.filter2.predicate.UserDefinedPredicate;\n" + "import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;\n" @@ -133,6 +135,8 @@ public void run() throws IOException { addContainsEnd(); addVisitEnd(); + addSizeCase(); + addVisitBegin("Lt"); for (TypeInfo info : TYPES) { addInequalityCase(info, "<", false); @@ -338,6 +342,30 @@ private void 
addUdpBegin() throws IOException { + "\n"); } + private void addSizeCase() throws IOException { + add(" @Override\n" + " public IncrementallyUpdatedFilterPredicate visit(Size size) {\n" + + " final ValueInspector delegate = (ValueInspector) size.filter(\n" + + " (onEq) -> visit(FilterApi.eq(SIZE_PSUEDOCOLUMN, onEq)),\n" + + " (lt) -> visit(FilterApi.lt(SIZE_PSUEDOCOLUMN, lt)),\n" + + " (lte) -> visit(FilterApi.ltEq(SIZE_PSUEDOCOLUMN, lte)),\n" + + " (gt) -> visit(FilterApi.gt(SIZE_PSUEDOCOLUMN, gt)),\n" + + " (gte) -> visit(FilterApi.gtEq(SIZE_PSUEDOCOLUMN, gte)));\n" + + "\n" + + " final ValueInspector valueInspector = new IncrementallyUpdatedFilterPredicate.CountingValueInspector(\n" + + " delegate,\n" + + " size.filter(\n" + + " (eqValue) -> (count) -> count > eqValue,\n" + + " (ltValue) -> (count) -> count >= ltValue,\n" + + " (lteValue) -> (count) -> count > lteValue,\n" + + " (gtValue) -> (count) -> count > gtValue,\n" + + " (gteValue) -> (count) -> count >= gteValue)\n" + + " );\n" + + "\n" + + " addValueInspector(size.getColumn().getColumnPath(), valueInspector);\n" + + " return valueInspector;\n" + + " }\n"); + } + private void addContainsInspectorVisitor(String op) throws IOException { add(" @Override\n" + " public > ContainsPredicate visit(" + op + " pred) {\n" @@ -499,6 +527,10 @@ private void addContainsBegin() throws IOException { + " @Override\n" + " public > ContainsPredicate visit(Contains contains) {\n" + " return contains.filter(this, ContainsAndPredicate::new, ContainsOrPredicate::new, ContainsPredicate::not);\n" + + " }\n" + "\n" + + " @Override\n" + + " public ContainsPredicate visit(Size size) {\n" + + " throw new UnsupportedOperationException(\"Unsupported predicate \" + size + \" cannot be used with contains()\");\n" + " }\n"); addContainsInspectorVisitor("Eq"); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java index 39babc0ac3..4ceb6c7bb3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java @@ -124,6 +124,11 @@ public > Boolean visit(Operators.Contains contains) { return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH); } + @Override + public Boolean visit(Operators.Size size) { + return BLOCK_MIGHT_MATCH; + } + @Override public > Boolean visit(Operators.In in) { Set values = in.getValues(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java index be4455eeba..27024f66c8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java @@ -48,6 +48,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -493,6 +494,38 @@ public > Boolean visit(Contains contains) 
{ return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH); } + @Override + public Boolean visit(Size size) { + ColumnChunkMetaData meta = getColumnChunk(size.getColumn().getColumnPath()); + + if (meta == null) { + // the column isn't in this file, so fail eq/gt/gte targeting size > 0 + final boolean blockCannotMatch = + size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0); + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + + try { + // We know the block has at most `dictSize` array element values + final Set dict = expandDictionary(meta); + if (dict == null) { + return BLOCK_MIGHT_MATCH; + } + int dictSize = dict.size(); + final boolean blockCannotMatch = size.filter( + (eq) -> eq > dictSize, + (lt) -> false, + (lte) -> false, + (gt) -> gt >= dictSize, + (gte) -> gte > dictSize); + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } catch (IOException e) { + LOG.warn("Failed to process dictionary for filter evaluation.", e); + } + + return BLOCK_MIGHT_MATCH; + } + @Override public Boolean visit(And and) { return and.getLeft().accept(this) || and.getRight().accept(this); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java index 4d7918c4f1..793fb0c8be 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.Set; import org.apache.parquet.column.MinMax; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.Operators.And; @@ -40,6 +41,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -217,6 +219,70 @@ public > Boolean visit(Contains contains) { return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH); } + @Override + public Boolean visit(Size size) { + final ColumnChunkMetaData metadata = getColumnChunk(size.getColumn().getColumnPath()); + if (metadata == null) { + // the column isn't in this file, so fail eq/gt/gte targeting size > 0 + final boolean blockCannotMatch = + size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0); + return blockCannotMatch ? 
BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + + final SizeStatistics stats = metadata.getSizeStatistics(); + final List repetitionLevelHistogram = stats.getRepetitionLevelHistogram(); + final List definitionLevelHistogram = stats.getDefinitionLevelHistogram(); + + if (repetitionLevelHistogram.isEmpty() || definitionLevelHistogram.isEmpty()) { + return BLOCK_MIGHT_MATCH; + } + + // If all values have repetition level 0, then no array has more than 1 element + if (repetitionLevelHistogram.size() == 1 + || repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream() + .allMatch(l -> l == 0)) { + + // Null list fields are treated as having size 0 + if (( // all lists are nulls + definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream() + .allMatch(l -> l == 0)) + || // all lists are size 0 + (definitionLevelHistogram.get(0) == 0 + && definitionLevelHistogram.subList(2, definitionLevelHistogram.size()).stream() + .allMatch(l -> l == 0))) { + + final boolean blockCannotMatch = + size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0); + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + + long maxDefinitionLevel = definitionLevelHistogram.get(definitionLevelHistogram.size() - 1); + + // If all repetition levels are zero and all definitions level are > MAX_DEFINITION_LEVEL - 1, all lists + // are of size 1 + if (definitionLevelHistogram.stream().allMatch(l -> l > maxDefinitionLevel - 1)) { + final boolean blockCannotMatch = size.filter( + (eq) -> eq != 1, (lt) -> lt <= 1, (lte) -> lte < 1, (gt) -> gt >= 1, (gte) -> gte > 1); + + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + } + long nonNullElementCount = + repetitionLevelHistogram.stream().mapToLong(l -> l).sum() - definitionLevelHistogram.get(0); + long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0); + + // Given the total number of elements and non-null fields, we can compute the max size of any array field + long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords); + final boolean blockCannotMatch = size.filter( + (eq) -> eq > maxArrayElementCount, + (lt) -> false, + (lte) -> false, + (gt) -> gt >= maxArrayElementCount, + (gte) -> gte > maxArrayElementCount); + + return blockCannotMatch ? 
BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + @Override @SuppressWarnings("unchecked") public > Boolean visit(NotEq notEq) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java index 5b9e638d60..fa5f529612 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java @@ -39,6 +39,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; @@ -506,6 +507,31 @@ public void testGtEqDouble() throws Exception { "Should not drop: contains matching values", canDrop(gtEq(d, Double.MIN_VALUE), ccmd, dictionaries)); } + @Test + public void testSizeBinary() throws Exception { + BinaryColumn b = binaryColumn("repeated_binary_field"); + + // DictionaryFilter knows that `repeated_binary_field` column has at most 26 element values + assertTrue(canDrop(size(b, Operators.Size.Operator.GT, 26), ccmd, dictionaries)); + assertTrue(canDrop(size(b, Operators.Size.Operator.GTE, 27), ccmd, dictionaries)); + assertTrue(canDrop(size(b, Operators.Size.Operator.EQ, 27), ccmd, dictionaries)); + + assertFalse(canDrop(size(b, Operators.Size.Operator.LT, 27), ccmd, dictionaries)); + assertFalse(canDrop(size(b, Operators.Size.Operator.LTE, 26), ccmd, dictionaries)); + assertFalse(canDrop(size(b, Operators.Size.Operator.EQ, 26), ccmd, dictionaries)); + + // If column doesn't exist in meta, it should be treated as having size 0 + BinaryColumn nonExistentColumn = binaryColumn("nonexistant_col"); + + assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.GT, 0), ccmd, dictionaries)); + assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.GTE, 1), ccmd, dictionaries)); + assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.EQ, 1), ccmd, dictionaries)); + + assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.LT, 1), ccmd, dictionaries)); + assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.LTE, 0), ccmd, dictionaries)); + assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.EQ, 0), ccmd, dictionaries)); + } + @Test public void testInBinary() throws Exception { BinaryColumn b = binaryColumn("binary_field"); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java index 1a1a31e73c..2ee785b383 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -33,7 +33,9 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static 
org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import com.google.common.collect.ImmutableMap; @@ -50,6 +52,7 @@ import org.apache.parquet.example.data.Group; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; import org.apache.parquet.filter2.predicate.Operators.LongColumn; @@ -159,10 +162,9 @@ private static void assertFilter(List found, UserFilter f) { private static void assertPredicate(FilterPredicate predicate, long... expectedIds) throws IOException { List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(predicate)); - assertEquals(expectedIds.length, found.size()); - for (int i = 0; i < expectedIds.length; i++) { - assertEquals(expectedIds[i], found.get(i).getLong("id", 0)); - } + assertArrayEquals( + Arrays.stream(expectedIds).boxed().toArray(), + found.stream().map(f -> f.getLong("id", 0)).toArray(Long[]::new)); } @Test @@ -410,6 +412,37 @@ public void testArrayContainsMixedColumns() throws Exception { 30L); } + @Test + public void testArraySizeRequiredColumn() throws Exception { + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 2), 27L); + + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 4), 28L); + + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.GT, 1), 27L, 28L); + + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.GTE, 4), 28L); + + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 0), 17L, 18L, 19L); + + assertPredicate( + size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.LT, 2), + LongStream.concat(LongStream.of(17L, 18L, 19L, 20L, 30L, 31L), LongStream.range(100, 200)) + .toArray()); + + assertPredicate( + size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.LTE, 2), + LongStream.concat(LongStream.of(17L, 18L, 19L, 20L, 27L, 30L, 31L), LongStream.range(100, 200)) + .toArray()); + + assertPredicate( + not(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 1)), + 17L, + 18L, + 19L, + 27L, + 28L); + } + @Test public void testNameNotNull() throws Exception { BinaryColumn name = binaryColumn("name"); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java index 15d0a8ab13..b42d05f730 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java @@ -33,21 +33,35 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static 
org.apache.parquet.filter2.statisticslevel.StatisticsFilter.canDrop; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; import static org.apache.parquet.io.api.Binary.fromString; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.ImmutableList; +import java.io.File; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.statistics.DoubleStatistics; import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.SizeStatistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.LogicalInverseRewriter; import org.apache.parquet.filter2.predicate.Operators; @@ -56,10 +70,18 @@ import org.apache.parquet.filter2.predicate.Operators.IntColumn; import org.apache.parquet.filter2.predicate.Statistics; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Types; import org.junit.Test; @@ -68,17 +90,27 @@ public class TestStatisticsFilter { private static ColumnChunkMetaData getIntColumnMeta( org.apache.parquet.column.statistics.Statistics stats, long valueCount) { + return getIntColumnMeta(ColumnPath.get("int", "column"), stats, null, valueCount); + } + + private static ColumnChunkMetaData getIntColumnMeta( + ColumnPath columnPath, + org.apache.parquet.column.statistics.Statistics stats, + SizeStatistics sizeStatistics, + long valueCount) { return ColumnChunkMetaData.get( - ColumnPath.get("int", "column"), - PrimitiveTypeName.INT32, + columnPath, + new PrimitiveType(REQUIRED, INT32, columnPath.toDotString()), CompressionCodecName.GZIP, + null, new HashSet(Arrays.asList(Encoding.PLAIN)), stats, 0L, 0L, valueCount, 0L, - 0L); + 0L, + sizeStatistics); } private static ColumnChunkMetaData getDoubleColumnMeta( @@ -100,6 +132,7 @@ private static ColumnChunkMetaData getDoubleColumnMeta( private static final DoubleColumn doubleColumn = doubleColumn("double.column"); private static final BinaryColumn missingColumn = binaryColumn("missing"); private static final IntColumn missingColumn2 = 
intColumn("missing.int"); + private static final IntColumn nestedListColumn = intColumn("nestedGroup.listField.element"); private static final IntStatistics intStats = new IntStatistics(); private static final IntStatistics nullIntStats = new IntStatistics(); @@ -435,6 +468,174 @@ public void testOr() { assertFalse(canDrop(or(no, no), columnMetas)); } + @Test + public void testSizeFilterRequiredGroupRequiredElements() throws Exception { + final IntStatistics minMaxStats = new IntStatistics(); + + // Case 1: Lists are populated + List columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + true, + ImmutableList.of( + ImmutableList.of(1, 2, 3), + ImmutableList.of(1), + ImmutableList.of(1, 2, 3), + ImmutableList.of())), + 4)); + + // SizeStats tells us that there are 7 total array elements spread across 3 non-empty list_fields, + // so the max size any single list_field could have is 5 + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + + // Case 2: All lists are empty + columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + true, ImmutableList.of(ImmutableList.of(), ImmutableList.of(), ImmutableList.of())), + 3)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta)); + } + + @Test + public void testSizeFilterRequiredGroupOptionalElements() throws Exception { + final IntStatistics minMaxStats = new IntStatistics(); + + // Case 1: List is non-empty + List listWithNulls = new ArrayList<>(); + listWithNulls.add(1); + listWithNulls.add(null); + listWithNulls.add(null); + List columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + true, + ImmutableList.of( + listWithNulls, ImmutableList.of(1), ImmutableList.of(1, 2, 3), ImmutableList.of())), + 4)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); + + // These predicates should not be able to filter out the page + 
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + } + + @Test + public void testSizeFilterOptionalGroup() throws Exception { + final IntStatistics minMaxStats = new IntStatistics(); + + // Case 1: List is non-null + List columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + false, + ImmutableList.of(ImmutableList.of(1, 2, 3), ImmutableList.of(1), ImmutableList.of(1, 2, 3))), + 3)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + + // Case 2: List is null + columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField(true, ImmutableList.of()), + 5)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta)); + } + + private static SizeStatistics createSizeStatisticsForRepeatedField( + boolean arrayGroupRequired, List> arrayValues) throws Exception { + + final MessageType messageSchema = Types.buildMessage() + .addField(Types.requiredGroup() + .addField((arrayGroupRequired ? 
Types.requiredGroup() : Types.optionalGroup()) + .as(LogicalTypeAnnotation.listType()) + .addField(Types.repeatedGroup() + .addField( + Types.primitive(INT32, REQUIRED).named("element")) + .named("list")) + .named("listField")) + .named("nestedGroup")) + .named("MyRecord"); + + // Write data + final File tmp = File.createTempFile(TestStatisticsFilter.class.getSimpleName(), ".tmp"); + tmp.deleteOnExit(); + tmp.delete(); + final Path file = new Path(tmp.getPath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(file) + .config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, messageSchema.toString()) + .build()) { + + final SimpleGroup record = new SimpleGroup(messageSchema); + final Group nestedGroup = record.addGroup("nestedGroup"); + + for (List arrayValue : arrayValues) { + if (arrayValue != null) { + Group listField = nestedGroup.addGroup("listField"); + for (Integer value : arrayValue) { + Group list = listField.addGroup("list"); + if (value != null) { + list.append("element", value); + } + } + } + } + + writer.write(record); + } + + // Read size statistics + final ParquetMetadata footer = readFooter(new Configuration(), file, NO_FILTER); + assert (footer.getBlocks().size() == 1); + final BlockMetaData blockMetaData = footer.getBlocks().get(0); + assert (blockMetaData.getColumns().size() == 1); + return blockMetaData.getColumns().get(0).getSizeStatistics(); + } + public static class SevensAndEightsUdp extends UserDefinedPredicate { @Override From b0e352624520b6c3c10a45973b73deaaef3eb3f4 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 5 Dec 2024 15:14:52 -0500 Subject: [PATCH 02/15] PARQUET-34: Fix FilterApi signature --- .../java/org/apache/parquet/filter2/predicate/FilterApi.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java index 787f0cf90c..06d8c048b8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java @@ -264,7 +264,7 @@ public static , P extends SingleColumnFilterPredicate return Contains.of(pred); } - public static Size size(Column column, Size.Operator operator, int value) { + public static Size size(Column column, Size.Operator operator, long value) { return new Size(column, operator, value); } From 8c6a11a5c60ffe0dca650a2a52ad91fafc98efad Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 5 Dec 2024 15:24:34 -0500 Subject: [PATCH 03/15] PARQUET-34: Test multiple size() predicates on different columns --- .../IncrementallyUpdatedFilterPredicateBuilderBase.java | 2 +- .../filter2/recordlevel/TestRecordLevelFilters.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java index d820945722..b4a7b1aa59 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java @@ -60,7 +60,7 @@ */ public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor { - static final 
Operators.LongColumn SIZE_PSUEDOCOLUMN = FilterApi.longColumn("SIZE"); + static final Operators.LongColumn SIZE_PSUEDOCOLUMN = FilterApi.longColumn("$SIZE"); private boolean built = false; private final Map> valueInspectorsByColumn = new HashMap<>(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java index 2ee785b383..becee4f527 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -441,6 +441,13 @@ public void testArraySizeRequiredColumn() throws Exception { 19L, 27L, 28L); + + assertPredicate( + and( + size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 0), + size(binaryColumn("accounts.key_value.key"), Operators.Size.Operator.GT, 1)), + 17L, + 19L); } @Test From ee219e5124ec4db30fe41467c0cbdee74e7279b9 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 6 Dec 2024 12:20:50 -0500 Subject: [PATCH 04/15] PARQUET-34: Add ignore test for optional array field filter --- .../filter2/recordlevel/TestRecordLevelFilters.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java index becee4f527..2c1d1145d9 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -63,6 +63,7 @@ import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User; import org.apache.parquet.io.api.Binary; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; public class TestRecordLevelFilters { @@ -450,6 +451,12 @@ public void testArraySizeRequiredColumn() throws Exception { 19L); } + @Ignore(value = "Not yet supported") + @Test + public void testArraySizeOptionalColumn() throws Exception { + assertPredicate(size(binaryColumn("phoneNumbers.phone.kind"), Operators.Size.Operator.EQ, 4), 28L); + } + @Test public void testNameNotNull() throws Exception { BinaryColumn name = binaryColumn("name"); From d3323e2a06f6ad658b7ea6b3c0953f189f45e043 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 6 Dec 2024 14:26:58 -0500 Subject: [PATCH 05/15] PARQUET-34: Fix DictionaryFilter logic --- .../dictionarylevel/DictionaryFilter.java | 15 ++++++++------- .../dictionarylevel/DictionaryFilterTest.java | 16 ++++++++-------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java index 27024f66c8..4f045f4dab 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java @@ -506,18 +506,19 @@ public Boolean visit(Size size) { } try { - // We know the block has at most `dictSize` array element values + // We know the block has at least as many array elements as the dictionary sizes final Set dict = expandDictionary(meta); if (dict == null) { return BLOCK_MIGHT_MATCH; } - int dictSize = 
dict.size(); + int numDistinctValues = dict.size(); final boolean blockCannotMatch = size.filter( - (eq) -> eq > dictSize, - (lt) -> false, - (lte) -> false, - (gt) -> gt >= dictSize, - (gte) -> gte > dictSize); + (eq) -> eq < numDistinctValues, + (lt) -> lt <= numDistinctValues, + (lte) -> lte < numDistinctValues, + (gt) -> false, + (gte) -> false); + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; } catch (IOException e) { LOG.warn("Failed to process dictionary for filter evaluation.", e); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java index fa5f529612..7b2b990ff9 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java @@ -511,14 +511,14 @@ public void testGtEqDouble() throws Exception { public void testSizeBinary() throws Exception { BinaryColumn b = binaryColumn("repeated_binary_field"); - // DictionaryFilter knows that `repeated_binary_field` column has at most 26 element values - assertTrue(canDrop(size(b, Operators.Size.Operator.GT, 26), ccmd, dictionaries)); - assertTrue(canDrop(size(b, Operators.Size.Operator.GTE, 27), ccmd, dictionaries)); - assertTrue(canDrop(size(b, Operators.Size.Operator.EQ, 27), ccmd, dictionaries)); - - assertFalse(canDrop(size(b, Operators.Size.Operator.LT, 27), ccmd, dictionaries)); - assertFalse(canDrop(size(b, Operators.Size.Operator.LTE, 26), ccmd, dictionaries)); - assertFalse(canDrop(size(b, Operators.Size.Operator.EQ, 26), ccmd, dictionaries)); + // DictionaryFilter knows that `repeated_binary_field` column has at least 26 element values + assertFalse(canDrop(size(b, Operators.Size.Operator.GT, 26), ccmd, dictionaries)); + assertFalse(canDrop(size(b, Operators.Size.Operator.GTE, 27), ccmd, dictionaries)); + assertFalse(canDrop(size(b, Operators.Size.Operator.EQ, 27), ccmd, dictionaries)); + + assertTrue(canDrop(size(b, Operators.Size.Operator.LT, 26), ccmd, dictionaries)); + assertTrue(canDrop(size(b, Operators.Size.Operator.LTE, 25), ccmd, dictionaries)); + assertTrue(canDrop(size(b, Operators.Size.Operator.EQ, 25), ccmd, dictionaries)); // If column doesn't exist in meta, it should be treated as having size 0 BinaryColumn nonExistentColumn = binaryColumn("nonexistant_col"); From c555fa20a5410d7c3c4da1e8e397a07c3c4d7b82 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 17 Jan 2025 13:45:24 -0500 Subject: [PATCH 06/15] PARQUET-34: fix StatisticsFilter test --- .../statisticslevel/TestStatisticsFilter.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java index b42d05f730..34a2051a61 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java @@ -568,10 +568,12 @@ public void testSizeFilterOptionalGroup() throws Exception { assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); // Case 2: List is null + final List> listWithNull = new ArrayList<>(); + listWithNull.add(null); columnMeta = 
Collections.singletonList(getIntColumnMeta( nestedListColumn.getColumnPath(), minMaxStats, - createSizeStatisticsForRepeatedField(true, ImmutableList.of()), + createSizeStatisticsForRepeatedField(true, listWithNull), 5)); // These predicates should be able to filter out the page @@ -610,10 +612,10 @@ private static SizeStatistics createSizeStatisticsForRepeatedField( .config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, messageSchema.toString()) .build()) { - final SimpleGroup record = new SimpleGroup(messageSchema); - final Group nestedGroup = record.addGroup("nestedGroup"); - for (List arrayValue : arrayValues) { + final SimpleGroup record = new SimpleGroup(messageSchema); + final Group nestedGroup = record.addGroup("nestedGroup"); + if (arrayValue != null) { Group listField = nestedGroup.addGroup("listField"); for (Integer value : arrayValue) { @@ -623,9 +625,9 @@ private static SizeStatistics createSizeStatisticsForRepeatedField( } } } - } - writer.write(record); + writer.write(record); + } } // Read size statistics From 5d08d3d764a96b1b9c7a16b352f0212e51f6c033 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 17 Jan 2025 13:52:39 -0500 Subject: [PATCH 07/15] PARQUET-34: Add comment for shouldUpdateDelegate --- .../recordlevel/IncrementallyUpdatedFilterPredicate.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java index 96550f627f..03f769f661 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java @@ -227,6 +227,15 @@ public void reset() { class CountingValueInspector extends ValueInspector { private long observedValueCount; private final ValueInspector delegate; + + /** + * Triggering function to update the underlying delegate. We want to be careful not to trigger before + * all relevant column values have been considered. + * + * For example, given the predicate `size(col, LT, 3)` and a record with 4 array values, we don't want the + * underlying `lt(3)` predicate to be evaluated on the first or second elements of the array, since it would + * return a premature True value. 
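+ *
+ * Illustrative walkthrough of the `size(col, LT, 3)` case above: after two of the four elements, the
+ * running count is 2 and `lt(3)` would evaluate to true, even though the record's final count of 4
+ * fails the predicate; the delegate update is therefore deferred until shouldUpdateDelegate allows it.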
+ */
     private final Function<Long, Boolean> shouldUpdateDelegate;
 
     public CountingValueInspector(ValueInspector delegate, Function<Long, Boolean> shouldUpdateDelegate) {

From 685c91877b8f7fdbbb31f0fe9468e45e5ac704db Mon Sep 17 00:00:00 2001
From: Claire McGinty
Date: Fri, 17 Jan 2025 14:31:22 -0500
Subject: [PATCH 08/15] PARQUET-34: Fix DictionaryFilter

---
 .../dictionarylevel/DictionaryFilter.java     | 16 ++++++++--------
 .../dictionarylevel/DictionaryFilterTest.java | 16 +++++++++-------
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index 4f045f4dab..e2eea2ef5e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -508,16 +508,16 @@ public Boolean visit(Size size) {
     try {
       // We know the block has at least as many array elements as the dictionary sizes
       final Set dict = expandDictionary(meta);
-      if (dict == null) {
+
+      // If the column doesn't have a dictionary encoding, we can't infer anything about its size
+      if (dict == null || dict.isEmpty()) {
         return BLOCK_MIGHT_MATCH;
       }
-      int numDistinctValues = dict.size();
-      final boolean blockCannotMatch = size.filter(
-          (eq) -> eq < numDistinctValues,
-          (lt) -> lt <= numDistinctValues,
-          (lte) -> lte < numDistinctValues,
-          (gt) -> false,
-          (gte) -> false);
+
+      // Column has at least (nonempty) dict.size() values spread across all records;
+      // predicates that can match only empty arrays cannot match this block
+      final boolean blockCannotMatch =
+          size.filter((eq) -> eq == 0, (lt) -> lt == 1, (lte) -> lte <= 0, (gt) -> false, (gte) -> false);
 
       return blockCannotMatch ? 
BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; } catch (IOException e) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java index 7b2b990ff9..6f62883b44 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java @@ -509,16 +509,18 @@ public void testGtEqDouble() throws Exception { @Test public void testSizeBinary() throws Exception { + // repeated_binary_field dict has 26 distinct values BinaryColumn b = binaryColumn("repeated_binary_field"); - // DictionaryFilter knows that `repeated_binary_field` column has at least 26 element values - assertFalse(canDrop(size(b, Operators.Size.Operator.GT, 26), ccmd, dictionaries)); - assertFalse(canDrop(size(b, Operators.Size.Operator.GTE, 27), ccmd, dictionaries)); - assertFalse(canDrop(size(b, Operators.Size.Operator.EQ, 27), ccmd, dictionaries)); + // DictionaryFilter knows that `repeated_binary_field` column has at least 26 element values spread across + // records + assertTrue(canDrop(size(b, Operators.Size.Operator.EQ, 0), ccmd, dictionaries)); + assertTrue(canDrop(size(b, Operators.Size.Operator.LT, 1), ccmd, dictionaries)); + assertTrue(canDrop(size(b, Operators.Size.Operator.LTE, 0), ccmd, dictionaries)); - assertTrue(canDrop(size(b, Operators.Size.Operator.LT, 26), ccmd, dictionaries)); - assertTrue(canDrop(size(b, Operators.Size.Operator.LTE, 25), ccmd, dictionaries)); - assertTrue(canDrop(size(b, Operators.Size.Operator.EQ, 25), ccmd, dictionaries)); + assertFalse(canDrop(size(b, Operators.Size.Operator.EQ, 30), ccmd, dictionaries)); + assertFalse(canDrop(size(b, Operators.Size.Operator.GT, 0), ccmd, dictionaries)); + assertFalse(canDrop(size(b, Operators.Size.Operator.GTE, 1), ccmd, dictionaries)); // If column doesn't exist in meta, it should be treated as having size 0 BinaryColumn nonExistentColumn = binaryColumn("nonexistant_col"); From 495d5041d4190ab040b0fdb4a29b3aff904ad58b Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 17 Jan 2025 14:41:35 -0500 Subject: [PATCH 09/15] PARQUET-34: Test DictionaryFilter for column with skipped dict encoding --- .../dictionarylevel/DictionaryFilterTest.java | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java index 6f62883b44..9adaa420c9 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java @@ -116,6 +116,7 @@ public class DictionaryFilterTest { + "required binary fallback_binary_field; " + "required int96 int96_field; " + "repeated binary repeated_binary_field;" + + "repeated binary repeated_binary_field_high_cardinality;" // high cardinality, no dict encoding produced + "} "); private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz"; @@ -197,6 +198,10 @@ private static void writeData(SimpleGroupFactory f, ParquetWriter writer) group.append("optional_single_value_field", "sharp"); } + for (char letter : ALPHABET.toCharArray()) { + group = 
group.append("repeated_binary_field_high_cardinality", String.valueOf(letter)); + } + writer.write(group); } writer.close(); @@ -218,6 +223,7 @@ private static void prepareFile(WriterVersion version, Path file) throws IOExcep .withRowGroupSize(1024 * 1024) .withPageSize(1024) .enableDictionaryEncoding() + .withDictionaryEncoding("repeated_binary_field_high_cardinality", false) .withDictionaryPageSize(2 * 1024) .withConf(conf) .build(); @@ -510,10 +516,9 @@ public void testGtEqDouble() throws Exception { @Test public void testSizeBinary() throws Exception { // repeated_binary_field dict has 26 distinct values - BinaryColumn b = binaryColumn("repeated_binary_field"); + final BinaryColumn b = binaryColumn("repeated_binary_field"); - // DictionaryFilter knows that `repeated_binary_field` column has at least 26 element values spread across - // records + // DictionaryFilter infers that col `repeated_binary_field` has >= 26 values spread across row group assertTrue(canDrop(size(b, Operators.Size.Operator.EQ, 0), ccmd, dictionaries)); assertTrue(canDrop(size(b, Operators.Size.Operator.LT, 1), ccmd, dictionaries)); assertTrue(canDrop(size(b, Operators.Size.Operator.LTE, 0), ccmd, dictionaries)); @@ -522,8 +527,8 @@ public void testSizeBinary() throws Exception { assertFalse(canDrop(size(b, Operators.Size.Operator.GT, 0), ccmd, dictionaries)); assertFalse(canDrop(size(b, Operators.Size.Operator.GTE, 1), ccmd, dictionaries)); - // If column doesn't exist in meta, it should be treated as having size 0 - BinaryColumn nonExistentColumn = binaryColumn("nonexistant_col"); + // If column doesn't exist in meta, it has no values and can be treated as having size 0 + final BinaryColumn nonExistentColumn = binaryColumn("nonexistant_col"); assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.GT, 0), ccmd, dictionaries)); assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.GTE, 1), ccmd, dictionaries)); @@ -532,6 +537,16 @@ public void testSizeBinary() throws Exception { assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.LT, 1), ccmd, dictionaries)); assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.LTE, 0), ccmd, dictionaries)); assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.EQ, 0), ccmd, dictionaries)); + + // If column exists but doesn't have a dict, we cannot infer anything about its size + final BinaryColumn noDictColumn = binaryColumn("repeated_binary_field_high_cardinality"); + + assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.GT, 0), ccmd, dictionaries)); + assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.GTE, 0), ccmd, dictionaries)); + assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.EQ, 1), ccmd, dictionaries)); + assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.LT, 1), ccmd, dictionaries)); + assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.LTE, 0), ccmd, dictionaries)); + assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.EQ, 0), ccmd, dictionaries)); } @Test From 6a7207b09ff033e9997c888fdd3898d5e54196c5 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 17 Jan 2025 14:47:18 -0500 Subject: [PATCH 10/15] PARQUET-34: Simplify size#toString and include col name --- .../java/org/apache/parquet/filter2/predicate/Operators.java | 4 ++-- .../parquet/filter2/predicate/TestFilterApiMethods.java | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index 16df708c54..070d1e6916 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -576,8 +576,8 @@ public int hashCode() { @Override public String toString() { - String name = Size.class.getSimpleName().toLowerCase(Locale.ENGLISH); - return name + "(" + operator.toString().toLowerCase() + " " + value + ")"; + return "size(" + column.getColumnPath().toDotString() + " " + + operator.toString().toLowerCase() + " " + value + ")"; } } diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java index 9bb180087b..e5c5a77676 100644 --- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java +++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java @@ -29,6 +29,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.not; import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.Operators.NotEq; import static org.junit.Assert.assertEquals; @@ -139,6 +140,9 @@ public void testToString() { assertEquals( "or(contains(eq(a.string.column, Binary{\"foo\"})), and(contains(eq(a.string.column, Binary{\"bar\"})), not(contains(eq(a.string.column, Binary{\"baz\"})))))", pred.toString()); + + pred = size(binColumn, Operators.Size.Operator.GTE, 5); + assertEquals("size(a.string.column gte 5)", pred.toString()); } @Test From d2e8dc3b0168770637013ecbadf52f5194f5143b Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 17 Jan 2025 14:56:19 -0500 Subject: [PATCH 11/15] PARQUET-34: Use Int for Size predicate value --- .../parquet/filter2/predicate/FilterApi.java | 2 +- .../filter2/predicate/LogicalInverter.java | 2 +- .../parquet/filter2/predicate/Operators.java | 16 ++++++++-------- .../IncrementallyUpdatedFilterPredicate.java | 6 +++--- ...entallyUpdatedFilterPredicateBuilderBase.java | 4 ++-- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java index 06d8c048b8..787f0cf90c 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java @@ -264,7 +264,7 @@ public static , P extends SingleColumnFilterPredicate return Contains.of(pred); } - public static Size size(Column column, Size.Operator operator, long value) { + public static Size size(Column column, Size.Operator operator, int value) { return new Size(column, operator, value); } diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java index 9ffbae069d..6c7eed1e1d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java +++ 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
@@ -101,7 +101,7 @@ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
 
   @Override
   public FilterPredicate visit(Size size) {
-    final long value = size.getValue();
+    final int value = size.getValue();
     final Operators.Column<?> column = size.getColumn();
 
     return size.filter(
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
index 070d1e6916..5a191c5b96 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
@@ -516,9 +516,9 @@ public enum Operator {
 
     private final Column<?> column;
     private final Operator operator;
-    private final long value;
+    private final int value;
 
-    Size(Column<?> column, Operator operator, long value) {
+    Size(Column<?> column, Operator operator, int value) {
       this.column = column;
       this.operator = operator;
       if (value < 0) {
@@ -532,7 +532,7 @@ public <R> R accept(Visitor<R> visitor) {
       return visitor.visit(this);
     }
 
-    public long getValue() {
+    public int getValue() {
       return value;
     }
 
@@ -541,11 +541,11 @@ public Column<?> getColumn() {
     }
 
     public <R> R filter(
-        Function<Long, R> onEq,
-        Function<Long, R> onLt,
-        Function<Long, R> onLtEq,
-        Function<Long, R> onGt,
-        Function<Long, R> onGtEq) {
+        Function<Integer, R> onEq,
+        Function<Integer, R> onLt,
+        Function<Integer, R> onLtEq,
+        Function<Integer, R> onGt,
+        Function<Integer, R> onGtEq) {
       if (operator == Operator.EQ) {
         return onEq.apply(value);
       } else if (operator == Operator.LT) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
index 03f769f661..0302121591 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
@@ -225,7 +225,7 @@ public void reset() {
   }
 
   class CountingValueInspector extends ValueInspector {
-    private long observedValueCount;
+    private int observedValueCount;
     private final ValueInspector delegate;
 
     /**
@@ -236,9 +236,9 @@ class CountingValueInspector extends ValueInspector {
      * underlying `lt(3)` predicate to be evaluated on the first or second elements of the array, since it would
      * return a premature True value.
      */
-    private final Function<Long, Boolean> shouldUpdateDelegate;
+    private final Function<Integer, Boolean> shouldUpdateDelegate;
 
-    public CountingValueInspector(ValueInspector delegate, Function<Long, Boolean> shouldUpdateDelegate) {
+    public CountingValueInspector(ValueInspector delegate, Function<Integer, Boolean> shouldUpdateDelegate) {
       this.observedValueCount = 0;
       this.delegate = delegate;
       this.shouldUpdateDelegate = shouldUpdateDelegate;
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
index b4a7b1aa59..f1759cd09b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
@@ -60,7 +60,7 @@
  */
 public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor<IncrementallyUpdatedFilterPredicate> {
 
-  static final Operators.LongColumn SIZE_PSUEDOCOLUMN = FilterApi.longColumn("$SIZE");
+  static final Operators.IntColumn SIZE_PSUEDOCOLUMN = FilterApi.intColumn("$SIZE");
 
   private boolean built = false;
   private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn = new HashMap<>();
@@ -80,7 +80,7 @@ public IncrementallyUpdatedFilterPredicateBuilderBase(List<PrimitiveColumnIO> leaves
           SIZE_PSUEDOCOLUMN.getColumnPath(),
           new PrimitiveType(
                   Type.Repetition.REQUIRED,
-                  PrimitiveType.PrimitiveTypeName.INT64,
+                  PrimitiveType.PrimitiveTypeName.INT32,
                   SIZE_PSUEDOCOLUMN.getColumnPath().toDotString())
               .comparator());
   }

From 2e8ba2299181ccae2306bcf87b355882161de9c8 Mon Sep 17 00:00:00 2001
From: Claire McGinty
Date: Fri, 17 Jan 2025 16:10:13 -0500
Subject: [PATCH 12/15] PARQUET-34: Assert that size(lt 0) is invalid

---
 .../java/org/apache/parquet/filter2/predicate/Operators.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
index 5a191c5b96..81e0d976ed 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
@@ -521,8 +521,8 @@ public enum Operator {
     Size(Column<?> column, Operator operator, int value) {
       this.column = column;
       this.operator = operator;
-      if (value < 0) {
-        throw new IllegalArgumentException("Argument to size() operator cannot be negative: " + value);
+      if (value < 0 || (operator == Operator.LT && value == 0)) {
+        throw new IllegalArgumentException("Invalid predicate " + this + ": array size can never be negative");
       }
       this.value = value;
     }

From 9586427b780bf28e274949448dfc4ebbe39284cc Mon Sep 17 00:00:00 2001
From: Claire McGinty
Date: Mon, 20 Jan 2025 12:49:22 -0500
Subject: [PATCH 13/15] PARQUET-34: Implement page-level Size filter

---
 .../columnindex/ColumnIndexBuilder.java       | 81 +++++++++++++-
 .../columnindex/TestColumnIndexBuilder.java   | 97 +++++++++++++++++++
 .../statisticslevel/StatisticsFilter.java     | 35 +++----
 .../statisticslevel/TestStatisticsFilter.java | 44 ++++++++-
 4 files changed, 236 insertions(+), 21 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index 7468b51ac0..e23c2de138 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -381,7 +381,69 @@ public > PrimitiveIterator.OfInt visit(Contains conta @Override public PrimitiveIterator.OfInt visit(Size size) { - return IndexIterator.all(getPageCount()); + if (repLevelHistogram == null || defLevelHistogram == null) { + return IndexIterator.all(getPageCount()); + } + + final int[] repLevelOffsets = calculateOffsetsForHistogram(repLevelHistogram, nullPages); + final int[] defLevelOffsets = calculateOffsetsForHistogram(defLevelHistogram, nullPages); + + return IndexIterator.filter(getPageCount(), pageIndex -> { + final boolean isFinalPage = pageIndex + 1 == nullPages.length; + final List pageRepLevelHistogram = getRepetitionLevelHistogram() + .subList( + repLevelOffsets[pageIndex], + isFinalPage ? repLevelHistogram.length : repLevelOffsets[pageIndex + 1]); + final List pageDefLevelHistogram = getDefinitionLevelHistogram() + .subList( + defLevelOffsets[pageIndex], + isFinalPage ? defLevelHistogram.length : defLevelOffsets[pageIndex + 1]); + + if (pageRepLevelHistogram.isEmpty() || pageDefLevelHistogram.isEmpty()) { + // Page might match; cannot be filtered out + return true; + } + + final int defLevelCount = pageDefLevelHistogram.size(); + + // If all values have repetition level 0, then no array has more than 1 element + if (pageRepLevelHistogram.size() == 1 + || (pageRepLevelHistogram.get(0) > 0 + && pageRepLevelHistogram.subList(1, pageRepLevelHistogram.size()).stream() + .allMatch(l -> l == 0))) { + + if ( + // all lists are null or empty + (pageDefLevelHistogram.subList(1, defLevelCount).stream().allMatch(l -> l == 0))) { + return size.filter( + (eq) -> eq <= 0, (lt) -> true, (lte) -> true, (gt) -> gt < 0, (gte) -> gte <= 0); + } + + final int maxDefinitionLevel = defLevelCount - 1; + + // If all repetition levels are zero and all definition levels are > MAX_DEFINITION_LEVEL - 1, all + // lists are of size 1 + if (pageDefLevelHistogram.subList(0, maxDefinitionLevel - 1).stream() + .allMatch(l -> l == 0)) { + return size.filter( + (eq) -> eq == 1, (lt) -> lt > 1, (lte) -> lte >= 1, (gt) -> gt < 1, (gte) -> gte <= 1); + } + } + + final long nonNullElementCount = + pageRepLevelHistogram.stream().mapToLong(l -> l).sum() - pageDefLevelHistogram.get(0); + final long numNonNullRecords = pageRepLevelHistogram.get(0) - pageDefLevelHistogram.get(0); + + // Given the total number of elements and non-null fields, we can compute the max size of any array + // field + final long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords); + return size.filter( + (eq) -> eq <= maxArrayElementCount, + (lt) -> true, + (lte) -> true, + (gt) -> gt < maxArrayElementCount, + (gte) -> gte <= maxArrayElementCount); + }); } @Override @@ -444,6 +506,23 @@ public boolean test(int pageIndex) { } }); } + + // Calculates each page's starting offset in a concatenated histogram + private static int[] calculateOffsetsForHistogram(long[] histogram, boolean[] nullPages) { + final int numNullPages = + (int) BooleanList.of(nullPages).stream().filter(p -> p).count(); + final int numNonNullPages = nullPages.length - numNullPages; + final int numLevelsPerNonNullPage = (histogram.length - numNullPages) / numNonNullPages; + + int[] offsets = new int[nullPages.length]; + int currOffset = 0; + for (int i = 0; i < nullPages.length; ++i) { + offsets[i] = currOffset; + currOffset += 
(nullPages[i] ? 1 : numLevelsPerNonNullPage); + } + + return offsets; + } } private static final ColumnIndexBuilder NO_OP_BUILDER = new ColumnIndexBuilder() { diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java index 58a899eefc..1f2ba8550a 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java @@ -36,6 +36,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.LogicalInverter.invert; import static org.apache.parquet.schema.OriginalType.DECIMAL; @@ -56,6 +57,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.ImmutableList; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -64,9 +66,11 @@ import java.util.List; import java.util.Set; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.filter2.predicate.ContainsRewriter; import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; @@ -1627,6 +1631,99 @@ public void testNoOpBuilder() { assertNull(builder.build()); } + @Test + public void testSizeRequiredElements() { + final PrimitiveType type = Types.required(DOUBLE).named("element"); + final DoubleColumn col = doubleColumn(type.getName()); + + final List> pageValueList = new ArrayList<>(); + pageValueList.add(ImmutableList.of(1.0, 2.0, 3.0)); + pageValueList.add(ImmutableList.of(1.0, 2.0, 3.0, 4.0, 5.0)); + pageValueList.add(ImmutableList.of(-1.0)); + pageValueList.add(ImmutableList.of()); + pageValueList.add(null); + + final ColumnIndex columnIndex = createArrayColumnIndex(type, pageValueList); + + assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder()); + assertCorrectNullCounts(columnIndex, 0, 0, 0, 0, 0); + assertCorrectNullPages(columnIndex, false, false, false, true, true); + assertCorrectValues(columnIndex.getMaxValues(), 3.0, 5.0, -1.0, null, null); + assertCorrectValues(columnIndex.getMinValues(), 1.0, 1.0, -1.0, null, null); + + // we know max array size is 5; all elements of page 2 have size 1; and page 3 and 4 are null or empty + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 0), 0, 1, 3, 4); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 4), 1); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 3), 0, 1); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LT, 2), 0, 1, 2, 3, 4); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LTE, 1), 0, 1, 2, 3, 4); + 
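// pages 3 and 4 contain only null/empty lists, so they can match only size 0: GT 0 filters them out,
+ // while GTE 0 keeps every page
+ 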
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GT, 0), 0, 1, 2); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GTE, 0), 0, 1, 2, 3, 4); + } + + @Test + public void testSizeOptionalElements() { + final PrimitiveType type = Types.optional(DOUBLE).named("element"); + final DoubleColumn col = doubleColumn(type.getName()); + + final List listWithNulls = new ArrayList<>(); + listWithNulls.add(null); + listWithNulls.add(3.0); + listWithNulls.add(null); + + final List> pageValueList = new ArrayList<>(); + pageValueList.add(listWithNulls); + + final ColumnIndex columnIndex = createArrayColumnIndex(type, pageValueList); + + assertCorrectNullCounts(columnIndex, 2); + assertCorrectNullPages(columnIndex, false); + assertCorrectValues(columnIndex.getMaxValues(), 3.0); + assertCorrectValues(columnIndex.getMinValues(), 3.0); + + // We know that the array values for the page have min size 0 and max size 3 + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 0), 0); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 5)); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LT, 4), 0); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LTE, 3), 0); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GT, 0), 0); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GT, 3)); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GTE, 3), 0); + assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GTE, 4)); + } + + private static ColumnIndex createArrayColumnIndex(PrimitiveType type, List> pageValueList) { + final ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); + + for (List pageValues : pageValueList) { + final StatsBuilder sb = new StatsBuilder(); + boolean isNullOrEmpty = pageValues == null || pageValues.isEmpty(); + + final SizeStatistics.Builder sizeStatistics = + SizeStatistics.newBuilder(type, isNullOrEmpty ? 0 : 1, isNullOrEmpty ? 0 : 1); + + if (isNullOrEmpty) sizeStatistics.add(0, 0); + + if (pageValues != null) { + for (int i = 0; i < pageValues.size(); i++) { + if (i == 0) { + sizeStatistics.add(0, 1); + } else { + sizeStatistics.add(1, 1); + } + } + } + + if (pageValues == null) { + builder.add(sb.stats(type), sizeStatistics.build()); + } else { + builder.add(sb.stats(type, pageValues.toArray(new Double[0])), sizeStatistics.build()); + } + } + + return builder.build(); + } + private static List toBBList(Binary... 
values) { List buffers = new ArrayList<>(values.length); for (Binary value : values) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java index 793fb0c8be..07d04fdb8a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -219,6 +219,10 @@ public > Boolean visit(Contains contains) { return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH); } + /** + * Logically equivalent to {@link org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder}, + * but for block granularity + **/ @Override public Boolean visit(Size size) { final ColumnChunkMetaData metadata = getColumnChunk(size.getColumn().getColumnPath()); @@ -239,40 +243,37 @@ public Boolean visit(Size size) { // If all values have repetition level 0, then no array has more than 1 element if (repetitionLevelHistogram.size() == 1 - || repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream() - .allMatch(l -> l == 0)) { - - // Null list fields are treated as having size 0 - if (( // all lists are nulls - definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream() - .allMatch(l -> l == 0)) - || // all lists are size 0 - (definitionLevelHistogram.get(0) == 0 - && definitionLevelHistogram.subList(2, definitionLevelHistogram.size()).stream() - .allMatch(l -> l == 0))) { + || (repetitionLevelHistogram.get(0) > 0 + && repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream() + .allMatch(l -> l == 0))) { + + // All lists are null or empty + if ((definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream() + .allMatch(l -> l == 0))) { final boolean blockCannotMatch = size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0); return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; } - long maxDefinitionLevel = definitionLevelHistogram.get(definitionLevelHistogram.size() - 1); + final int maxDefinitionLevel = definitionLevelHistogram.size() - 1; - // If all repetition levels are zero and all definitions level are > MAX_DEFINITION_LEVEL - 1, all lists + // If all repetition levels are zero and all definition levels are > MAX_DEFINITION_LEVEL - 1, all lists // are of size 1 - if (definitionLevelHistogram.stream().allMatch(l -> l > maxDefinitionLevel - 1)) { + if (definitionLevelHistogram.subList(0, maxDefinitionLevel - 1).stream() + .allMatch(l -> l == 0)) { final boolean blockCannotMatch = size.filter( (eq) -> eq != 1, (lt) -> lt <= 1, (lte) -> lte < 1, (gt) -> gt >= 1, (gte) -> gte > 1); return blockCannotMatch ? 
BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; } } - long nonNullElementCount = + final long nonNullElementCount = repetitionLevelHistogram.stream().mapToLong(l -> l).sum() - definitionLevelHistogram.get(0); - long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0); + final long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0); // Given the total number of elements and non-null fields, we can compute the max size of any array field - long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords); + final long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords); final boolean blockCannotMatch = size.filter( (eq) -> eq > maxArrayElementCount, (lt) -> false, diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java index 34a2051a61..3c08e36e7c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java @@ -513,6 +513,23 @@ public void testSizeFilterRequiredGroupRequiredElements() throws Exception { assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta)); assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta)); + + // Case 3: all lists have size 1 + columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField(true, ImmutableList.of(ImmutableList.of(1), ImmutableList.of(1))), + 2)); + + // We know that records have max array size 1 + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 2), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 1), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 2), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 2), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 1), columnMeta)); } @Test @@ -563,18 +580,39 @@ public void testSizeFilterOptionalGroup() throws Exception { assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 3), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 2), columnMeta)); assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); - // Case 2: List is null + // Case 2: Lists are null final List> listWithNull = new ArrayList<>(); listWithNull.add(null); + listWithNull.add(null); columnMeta = Collections.singletonList(getIntColumnMeta( nestedListColumn.getColumnPath(), minMaxStats, createSizeStatisticsForRepeatedField(true, listWithNull), - 5)); + 2)); + + // These predicates should be 
able to filter out the page + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 0), columnMeta)); + + // Case 3: lists are empty + columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField(true, ImmutableList.of(ImmutableList.of(), ImmutableList.of())), + 2)); // These predicates should be able to filter out the page assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta)); @@ -582,6 +620,7 @@ public void testSizeFilterOptionalGroup() throws Exception { assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 0), columnMeta)); assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta)); assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta)); @@ -615,7 +654,6 @@ private static SizeStatistics createSizeStatisticsForRepeatedField( for (List arrayValue : arrayValues) { final SimpleGroup record = new SimpleGroup(messageSchema); final Group nestedGroup = record.addGroup("nestedGroup"); - if (arrayValue != null) { Group listField = nestedGroup.addGroup("listField"); for (Integer value : arrayValue) { From 39981e9f8eeec2f8492fbfcb35e997aa299af18d Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 20 Jan 2025 14:10:31 -0500 Subject: [PATCH 14/15] PARQUET-34: Test StatisticsFilter for nested list types --- .../statisticslevel/TestStatisticsFilter.java | 365 ++++++++++-------- 1 file changed, 205 insertions(+), 160 deletions(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java index 3c08e36e7c..0c1d0f6550 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java @@ -54,6 +54,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.Encoding; @@ -79,6 +81,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; @@ -470,176 +473,212 @@ 
public void testOr() { @Test public void testSizeFilterRequiredGroupRequiredElements() throws Exception { - final IntStatistics minMaxStats = new IntStatistics(); - - // Case 1: Lists are populated - List columnMeta = Collections.singletonList(getIntColumnMeta( - nestedListColumn.getColumnPath(), - minMaxStats, - createSizeStatisticsForRepeatedField( - true, - ImmutableList.of( - ImmutableList.of(1, 2, 3), - ImmutableList.of(1), - ImmutableList.of(1, 2, 3), - ImmutableList.of())), - 4)); - - // SizeStats tells us that there are 7 total array elements spread across 3 non-empty list_fields, - // so the max size any single list_field could have is 5 - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); - - // These predicates should not be able to filter out the page - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); - - // Case 2: All lists are empty - columnMeta = Collections.singletonList(getIntColumnMeta( - nestedListColumn.getColumnPath(), - minMaxStats, - createSizeStatisticsForRepeatedField( - true, ImmutableList.of(ImmutableList.of(), ImmutableList.of(), ImmutableList.of())), - 3)); - - // These predicates should be able to filter out the page - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); - - // These predicates should not be able to filter out the page - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta)); - - // Case 3: all lists have size 1 - columnMeta = Collections.singletonList(getIntColumnMeta( - nestedListColumn.getColumnPath(), - minMaxStats, - createSizeStatisticsForRepeatedField(true, ImmutableList.of(ImmutableList.of(1), ImmutableList.of(1))), - 2)); - - // We know that records have max array size 1 - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 2), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 1), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 2), columnMeta)); - - // These predicates should not be able to filter out the page - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 2), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 1), columnMeta)); + for (int nestingLevel = 0; nestingLevel < 3; nestingLevel++) { + final String nestingPrefix = IntStream.range(0, nestingLevel) + .mapToObj(l -> "NestedGroup" + l) + .collect(Collectors.joining(".")); + final IntColumn columnName = + intColumn(((nestingPrefix.isEmpty() ? 
"" : nestingPrefix + ".") + "nestedGroup.listField.element")); + + final IntStatistics minMaxStats = new IntStatistics(); + + // Case 1: Lists are populated + List columnMeta = Collections.singletonList(getIntColumnMeta( + columnName.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + nestingLevel, + true, + ImmutableList.of( + ImmutableList.of(1, 2, 3), + ImmutableList.of(1), + ImmutableList.of(1, 2, 3), + ImmutableList.of())), + 4)); + + // SizeStats tells us that there are 7 total array elements spread across 3 non-empty list_fields, + // so the max size any single list_field could have is 5 + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 6), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 5), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 6), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 5), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 3), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta)); + + // Case 2: All lists are empty + columnMeta = Collections.singletonList(getIntColumnMeta( + columnName.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + nestingLevel, + true, + ImmutableList.of(ImmutableList.of(), ImmutableList.of(), ImmutableList.of())), + 3)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 0), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 1), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 1), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 0), columnMeta)); + + // Case 3: all lists have size 1 + columnMeta = Collections.singletonList(getIntColumnMeta( + columnName.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + nestingLevel, true, ImmutableList.of(ImmutableList.of(1), ImmutableList.of(1))), + 2)); + + // We know that records have max array size 1 + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 2), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 1), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 2), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 2), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 1), columnMeta)); + } } @Test public void testSizeFilterRequiredGroupOptionalElements() throws Exception { - final IntStatistics minMaxStats = new IntStatistics(); - - // Case 1: List is non-empty - List listWithNulls = new ArrayList<>(); - listWithNulls.add(1); - listWithNulls.add(null); - listWithNulls.add(null); - List columnMeta = Collections.singletonList(getIntColumnMeta( - nestedListColumn.getColumnPath(), - minMaxStats, - createSizeStatisticsForRepeatedField( - true, - ImmutableList.of( - listWithNulls, ImmutableList.of(1), 
ImmutableList.of(1, 2, 3), ImmutableList.of())), - 4)); - - // These predicates should be able to filter out the page - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); - - // These predicates should not be able to filter out the page - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + for (int nestingLevel = 0; nestingLevel < 3; nestingLevel++) { + final String nestingPrefix = IntStream.range(0, nestingLevel) + .mapToObj(l -> "NestedGroup" + l) + .collect(Collectors.joining(".")); + final IntColumn columnName = + intColumn(((nestingPrefix.isEmpty() ? "" : nestingPrefix + ".") + "nestedGroup.listField.element")); + + final IntStatistics minMaxStats = new IntStatistics(); + + // Case 1: List is non-empty + List listWithNulls = new ArrayList<>(); + listWithNulls.add(1); + listWithNulls.add(null); + listWithNulls.add(null); + List columnMeta = Collections.singletonList(getIntColumnMeta( + columnName.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + nestingLevel, + true, + ImmutableList.of( + listWithNulls, ImmutableList.of(1), ImmutableList.of(1, 2, 3), ImmutableList.of())), + 4)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 6), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 5), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 6), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 5), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 3), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta)); + } } @Test public void testSizeFilterOptionalGroup() throws Exception { - final IntStatistics minMaxStats = new IntStatistics(); - - // Case 1: List is non-null - List columnMeta = Collections.singletonList(getIntColumnMeta( - nestedListColumn.getColumnPath(), - minMaxStats, - createSizeStatisticsForRepeatedField( - false, - ImmutableList.of(ImmutableList.of(1, 2, 3), ImmutableList.of(1), ImmutableList.of(1, 2, 3))), - 3)); - - // These predicates should be able to filter out the page - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta)); - assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); - - // These predicates should not be able to filter out the page - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 3), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 2), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); - assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); - - // Case 2: Lists are null - final List> listWithNull = new 
+      List<ColumnChunkMetaData> columnMeta = Collections.singletonList(getIntColumnMeta(
+          columnName.getColumnPath(),
+          minMaxStats,
+          createSizeStatisticsForRepeatedField(
+              nestingLevel,
+              true,
+              ImmutableList.of(
+                  listWithNulls, ImmutableList.of(1), ImmutableList.of(1, 2, 3), ImmutableList.of())),
+          4));
+
+      // These predicates should be able to filter out the page
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 6), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 5), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 6), columnMeta));
+
+      // These predicates should not be able to filter out the page
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 5), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 3), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta));
+    }
   }
 
   @Test
   public void testSizeFilterOptionalGroup() throws Exception {
-    final IntStatistics minMaxStats = new IntStatistics();
-
-    // Case 1: List is non-null
-    List<ColumnChunkMetaData> columnMeta = Collections.singletonList(getIntColumnMeta(
-        nestedListColumn.getColumnPath(),
-        minMaxStats,
-        createSizeStatisticsForRepeatedField(
-            false,
-            ImmutableList.of(ImmutableList.of(1, 2, 3), ImmutableList.of(1), ImmutableList.of(1, 2, 3))),
-        3));
-
-    // These predicates should be able to filter out the page
-    assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta));
-    assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta));
-    assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta));
-
-    // These predicates should not be able to filter out the page
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 3), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 2), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta));
-
-    // Case 2: Lists are null
-    final List<List<Integer>> listWithNull = new ArrayList<>();
-    listWithNull.add(null);
-    listWithNull.add(null);
-    columnMeta = Collections.singletonList(getIntColumnMeta(
-        nestedListColumn.getColumnPath(),
-        minMaxStats,
-        createSizeStatisticsForRepeatedField(true, listWithNull),
-        2));
-
-    // These predicates should be able to filter out the page
-    assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta));
-    assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta));
-    assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta));
-
-    // These predicates should not be able to filter out the page
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 0), columnMeta));
-
-    // Case 3: lists are empty
-    columnMeta = Collections.singletonList(getIntColumnMeta(
-        nestedListColumn.getColumnPath(),
-        minMaxStats,
-        createSizeStatisticsForRepeatedField(true, ImmutableList.of(ImmutableList.of(), ImmutableList.of())),
-        2));
-
-    // These predicates should be able to filter out the page
-    assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta));
-    assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta));
-    assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta));
-
-    // These predicates should not be able to filter out the page
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 0), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta));
-    assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta));
+    for (int nestingLevel = 0; nestingLevel < 3; nestingLevel++) {
+      final String nestingPrefix = IntStream.range(0, nestingLevel)
+          .mapToObj(l -> "NestedGroup" + l)
+          .collect(Collectors.joining("."));
+      final IntColumn columnName =
+          intColumn(((nestingPrefix.isEmpty() ? "" : nestingPrefix + ".") + "nestedGroup.listField.element"));
+
+      final IntStatistics minMaxStats = new IntStatistics();
+
+      // Case 1: List is non-null
+      List<ColumnChunkMetaData> columnMeta = Collections.singletonList(getIntColumnMeta(
+          columnName.getColumnPath(),
+          minMaxStats,
+          createSizeStatisticsForRepeatedField(
+              nestingLevel,
+              false,
+              ImmutableList.of(
+                  ImmutableList.of(1, 2, 3), ImmutableList.of(1), ImmutableList.of(1, 2, 3))),
+          3));
+
+      // These predicates should be able to filter out the page
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 6), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 5), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 6), columnMeta));
+
+      // These predicates should not be able to filter out the page
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.GTE, 3), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.GT, 2), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 5), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 3), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta));
+
+      // Case 2: Lists are null
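+      // With every list null, no repeated values are written at all, so the statistics can only describe
+      // the page as if it held size-0 entries (mirroring the all-empty case below).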
"" : nestingPrefix + ".") + "nestedGroup.listField.element")); + + final IntStatistics minMaxStats = new IntStatistics(); + + // Case 1: List is non-null + List columnMeta = Collections.singletonList(getIntColumnMeta( + columnName.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + nestingLevel, + false, + ImmutableList.of( + ImmutableList.of(1, 2, 3), ImmutableList.of(1), ImmutableList.of(1, 2, 3))), + 3)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 6), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 5), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 6), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(columnName, Operators.Size.Operator.GTE, 3), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.GT, 2), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 5), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 3), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta)); + + // Case 2: Lists are null + final List> listWithNull = new ArrayList<>(); + listWithNull.add(null); + listWithNull.add(null); + columnMeta = Collections.singletonList(getIntColumnMeta( + columnName.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField(nestingLevel, true, listWithNull), + 2)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 0), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 1), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 1), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 0), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.GTE, 0), columnMeta)); + + // Case 3: lists are empty + columnMeta = Collections.singletonList(getIntColumnMeta( + columnName.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + nestingLevel, true, ImmutableList.of(ImmutableList.of(), ImmutableList.of())), + 2)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 0), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 1), columnMeta)); + assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(columnName, Operators.Size.Operator.GTE, 0), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 1), columnMeta)); + assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 0), columnMeta)); + } } private static SizeStatistics createSizeStatisticsForRepeatedField( - boolean arrayGroupRequired, List> arrayValues) throws Exception { - - final MessageType messageSchema = Types.buildMessage() - .addField(Types.requiredGroup() - .addField((arrayGroupRequired ? 
+
+    final MessageType messageSchema =
+        Types.buildMessage().addField(groupSchema).named("MyRecord");
 
     // Write data
     final File tmp = File.createTempFile(TestStatisticsFilter.class.getSimpleName(), ".tmp");
@@ -653,7 +692,13 @@ private static SizeStatistics createSizeStatisticsForRepeatedField(
 
     for (List<Integer> arrayValue : arrayValues) {
      final SimpleGroup record = new SimpleGroup(messageSchema);
-      final Group nestedGroup = record.addGroup("nestedGroup");
+
+      Group group = record;
+      for (int i = nestingLevel - 1; i >= 0; i--) {
+        group = group.addGroup("NestedGroup" + i);
+      }
+
+      final Group nestedGroup = group.addGroup("nestedGroup");
       if (arrayValue != null) {
         Group listField = nestedGroup.addGroup("listField");
         for (Integer value : arrayValue) {

From 3b46d8f7534660dd97e970039c7a2ea27c114b4e Mon Sep 17 00:00:00 2001
From: Claire McGinty
Date: Tue, 4 Feb 2025 11:33:33 -0500
Subject: [PATCH 15/15] PARQUET-34: Reproduce inconsistent SizeStatistics test
 result

---
 .../TestSizeStatisticsRoundTrip.java          | 131 ++++++++++++++++++
 1 file changed, 131 insertions(+)

diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java
index 59e4aff2de..3fdcd7dd72 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java
@@ -18,26 +18,38 @@
  */
 package org.apache.parquet.statistics;
 
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.statistics.SizeStatistics;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroup;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -96,6 +108,125 @@ public void testBinaryColumnSizeStatistics() throws IOException {
     }
   }
 
+  private static final List<List<Integer>> REPEATED_RECORD_VALUES = ImmutableList.of(
+      ImmutableList.of(1, 2, 3), ImmutableList.of(1), ImmutableList.of(1, 2), ImmutableList.of());
+
+  private static final MessageType MESSAGE_SCHEMA_REPEATED_FIELD = Types.buildMessage()
+      .addField(Types.requiredGroup()
+          .as(LogicalTypeAnnotation.listType())
+          .addField(Types.repeatedGroup()
+              .addField(Types.primitive(INT32, REQUIRED).named("element"))
+              .named("list"))
+          .named("my_list"))
+      .named("MyRecord");
+
+  @Test
+  public void testSizeStatsWrittenWithExampleWriter() throws Exception {
+    final File tmp = File.createTempFile(TestSizeStatisticsRoundTrip.class.getSimpleName(), ".parquet");
+    tmp.deleteOnExit();
+    tmp.delete();
+    final Path file = new Path(tmp.getPath());
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, MESSAGE_SCHEMA_REPEATED_FIELD.toString())
+        .build()) {
+
+      for (List<Integer> arrayValue : REPEATED_RECORD_VALUES) {
+        final SimpleGroup record = new SimpleGroup(MESSAGE_SCHEMA_REPEATED_FIELD);
+
+        Group listField = record.addGroup("my_list");
+        for (Integer value : arrayValue) {
+          Group list = listField.addGroup("list");
+          if (value != null) {
+            list.append("element", value);
+          }
+        }
+        writer.write(record);
+      }
+    }
+
+    final SizeStatistics stats = getSizeStatisticsFromFile(file);
+
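+    // The four records produce 7 (repetition, definition) level pairs: 6 real elements plus one
+    // placeholder for the empty list. Def histogram [1, 6]: one pair at level 0 (the empty list), six at
+    // level 1 (the elements). Rep histogram [4, 3]: one rep-0 pair starts each record, and the remaining
+    // 3 elements continue an existing list at rep level 1.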
+    Assert.assertEquals(ImmutableList.of(1L, 6L), stats.getDefinitionLevelHistogram());
+    Assert.assertEquals(ImmutableList.of(4L, 3L), stats.getRepetitionLevelHistogram());
+  }
+
+  @Test
+  public void testSizeStatsWrittenWithRecordConsumer() throws Exception {
+    final File tmp = File.createTempFile(TestSizeStatisticsRoundTrip.class.getSimpleName(), ".parquet");
+    tmp.deleteOnExit();
+    tmp.delete();
+    final Path file = new Path(tmp.getPath());
+
+    ParquetWriter<List<Integer>> writer = new ParquetWriter<>(file, new WriteSupport<List<Integer>>() {
+      RecordConsumer rc = null;
+
+      @Override
+      public WriteSupport.WriteContext init(Configuration configuration) {
+        return init((ParquetConfiguration) null);
+      }
+
+      @Override
+      public WriteContext init(ParquetConfiguration configuration) {
+        return new WriteContext(MESSAGE_SCHEMA_REPEATED_FIELD, new HashMap<>());
+      }
+
+      @Override
+      public void prepareForWrite(RecordConsumer recordConsumer) {
+        this.rc = recordConsumer;
+      }
+
+      @Override
+      public void write(List<Integer> arrayValue) {
+        rc.startMessage();
+
+        rc.startField("my_list", 0);
+        rc.startGroup();
+        if (arrayValue != null) {
+          for (int i = 0; i < arrayValue.size(); i++) {
+            rc.startField("list", 0);
+            rc.startGroup();
+
+            rc.startField("element", 0);
+            rc.addInteger(arrayValue.get(i));
+            rc.endField("element", 0);
+
+            rc.endGroup();
+            rc.endField("list", 0);
+          }
+        }
+        rc.endGroup();
+        rc.endField("my_list", 0);
+        rc.endMessage();
+      }
+    });
+
+    for (List<Integer> recordArrayValue : REPEATED_RECORD_VALUES) {
+      writer.write(recordArrayValue);
+    }
+
+    writer.close();
+
+    final SizeStatistics stats = getSizeStatisticsFromFile(file);
+
+    // Assert that these records have the same rep- and def-level histograms as the ExampleParquetWriter test
+
+    // this assertion passes
+    Assert.assertEquals(ImmutableList.of(1L, 6L), stats.getDefinitionLevelHistogram());
+
+    // this assertion FAILS, actual repetition list is [7, 0]
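+    // [7, 0] means all 7 level pairs were recorded at repetition level 0: the 3 continuation elements were
+    // never counted at rep level 1, suggesting this write path loses repetition-level information when
+    // accumulating SizeStatistics.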
+    Assert.assertEquals(ImmutableList.of(4L, 3L), stats.getRepetitionLevelHistogram());
+  }
+
+  private static SizeStatistics getSizeStatisticsFromFile(Path file) throws IOException {
+    final ParquetMetadata footer =
+        ParquetFileReader.readFooter(new Configuration(), file, ParquetMetadataConverter.NO_FILTER);
+    assert (footer.getBlocks().size() == 1);
+    final BlockMetaData blockMetaData = footer.getBlocks().get(0);
+    assert (blockMetaData.getColumns().size() == 1);
+    return blockMetaData.getColumns().get(0).getSizeStatistics();
+  }
+
   private Path newTempPath() throws IOException {
     File file = temp.newFile();
     Preconditions.checkArgument(file.delete(), "Could not remove temp file");