diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java
index 5050ee8f01..ba999fb1d0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java
@@ -33,6 +33,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 
 /**
@@ -97,6 +98,11 @@ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
     return contains;
   }
 
+  @Override
+  public FilterPredicate visit(Size size) {
+    return size;
+  }
+
   @Override
   public FilterPredicate visit(And and) {
     final FilterPredicate left;
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
index 3c51680667..787f0cf90c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
@@ -40,6 +40,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.predicate.Operators.SingleColumnFilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.SupportsEqNotEq;
 import org.apache.parquet.filter2.predicate.Operators.SupportsLtGt;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
@@ -263,6 +264,10 @@ public static <T extends Comparable<T>, P extends SingleColumnFilterPredicate<T>> Contains<T> contains(P pred) {
     return Contains.of(pred);
   }
 
+  public static Size size(Column<?> column, Size.Operator operator, int value) {
+    return new Size(column, operator, value);
+  }
+
   /**
    * Keeps records that pass the provided {@link UserDefinedPredicate}
    *
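The new size() predicate composes with the existing FilterApi operators. A minimal sketch of the intended usage, assuming this patch is applied (the column name and sizes below are hypothetical):

    import static org.apache.parquet.filter2.predicate.FilterApi.*;

    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.filter2.predicate.Operators.LongColumn;
    import org.apache.parquet.filter2.predicate.Operators.Size;

    public class SizePredicateSketch {
      public static void main(String[] args) {
        // hypothetical repeated column
        LongColumn ids = longColumn("ids.list.element");

        // keep records whose ids array has more than 2 elements, but not exactly 5
        FilterPredicate pred = and(
            size(ids, Size.Operator.GT, 2),
            not(size(ids, Size.Operator.EQ, 5)));

        // wrap for use with a reader, as with any other FilterPredicate
        FilterCompat.Filter filter = FilterCompat.get(pred);
        System.out.println(pred);
      }
    }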
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
index a662bb0b17..5e75a3bc6a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
@@ -31,6 +31,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 
 /**
@@ -89,6 +90,10 @@ default <T extends Comparable<T>> R visit(Contains<T> contains) {
       throw new UnsupportedOperationException("visit Contains is not supported.");
     }
 
+    default R visit(Size size) {
+      throw new UnsupportedOperationException("visit Size is not supported.");
+    }
+
     R visit(And and);
 
     R visit(Or or);
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
index d1d7f07e80..d4817460c6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
@@ -36,6 +36,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 
 /**
@@ -104,6 +105,11 @@ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
     return contains;
   }
 
+  @Override
+  public FilterPredicate visit(Size size) {
+    return size;
+  }
+
   @Override
   public FilterPredicate visit(And and) {
     return and(and.getLeft().accept(this), and.getRight().accept(this));
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
index 506b8f0e56..6c7eed1e1d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
@@ -33,6 +33,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 
 /**
@@ -98,6 +99,19 @@ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
     return contains.not();
   }
 
+  @Override
+  public FilterPredicate visit(Size size) {
+    final int value = size.getValue();
+    final Operators.Column<?> column = size.getColumn();
+
+    return size.filter(
+        (eq) -> new Or(new Size(column, Size.Operator.LT, value), new Size(column, Size.Operator.GT, value)),
+        (lt) -> new Size(column, Size.Operator.GTE, value),
+        (ltEq) -> new Size(column, Size.Operator.GT, value),
+        (gt) -> new Size(column, Size.Operator.LTE, value),
+        (gtEq) -> new Size(column, Size.Operator.LT, value));
+  }
+
   @Override
   public FilterPredicate visit(And and) {
     return new Or(and.getLeft().accept(this), and.getRight().accept(this));
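The inverter above gives size() the usual complement identities. A small sketch (hypothetical column) of what LogicalInverter produces for a negated size predicate:

    import static org.apache.parquet.filter2.predicate.FilterApi.*;

    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.filter2.predicate.LogicalInverter;
    import org.apache.parquet.filter2.predicate.Operators.IntColumn;
    import org.apache.parquet.filter2.predicate.Operators.Size;

    public class SizeInversionSketch {
      public static void main(String[] args) {
        IntColumn c = intColumn("tags.list.element"); // hypothetical column

        // not(size == 5)  =>  size < 5 OR size > 5
        FilterPredicate inv = LogicalInverter.invert(size(c, Size.Operator.EQ, 5));
        System.out.println(inv);

        // not(size <= 5)  =>  size > 5, and so on for the other operators
        System.out.println(LogicalInverter.invert(size(c, Size.Operator.LTE, 5)));
      }
    }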
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
index 60dc80cd7b..81e0d976ed 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
@@ -505,6 +505,82 @@ public <R> R filter(
     }
   }
 
+  public static final class Size implements FilterPredicate, Serializable {
+    public enum Operator {
+      EQ,
+      LT,
+      LTE,
+      GT,
+      GTE
+    }
+
+    private final Column<?> column;
+    private final Operator operator;
+    private final int value;
+
+    Size(Column<?> column, Operator operator, int value) {
+      this.column = column;
+      this.operator = operator;
+      // assign before validating so the exception message reports the real value
+      this.value = value;
+      if (value < 0 || (operator == Operator.LT && value == 0)) {
+        throw new IllegalArgumentException("Invalid predicate " + this + ": array size can never be negative");
+      }
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+
+    public int getValue() {
+      return value;
+    }
+
+    public Column<?> getColumn() {
+      return column;
+    }
+
+    public <R> R filter(
+        Function<Integer, R> onEq,
+        Function<Integer, R> onLt,
+        Function<Integer, R> onLtEq,
+        Function<Integer, R> onGt,
+        Function<Integer, R> onGtEq) {
+      if (operator == Operator.EQ) {
+        return onEq.apply(value);
+      } else if (operator == Operator.LT) {
+        return onLt.apply(value);
+      } else if (operator == Operator.LTE) {
+        return onLtEq.apply(value);
+      } else if (operator == Operator.GT) {
+        return onGt.apply(value);
+      } else if (operator == Operator.GTE) {
+        return onGtEq.apply(value);
+      } else {
+        throw new UnsupportedOperationException("Operator " + operator + " cannot be used with size() filter");
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      return column.equals(((Size) o).column) && operator == ((Size) o).operator && value == ((Size) o).value;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(column, operator, value);
+    }
+
+    @Override
+    public String toString() {
+      return "size(" + column.getColumnPath().toDotString() + " "
+          + operator.toString().toLowerCase() + " " + value + ")";
+    }
+  }
+
   public static final class NotIn<T extends Comparable<T>> extends SetColumnFilterPredicate<T> {
 
     NotIn(Column<T> column, Set<T> values) {
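Size.filter(...) is the dispatch hook the rest of this patch builds on: a visitor supplies one function per operator, and the predicate applies the matching one to its size bound. A toy illustration (names hypothetical; assumes this patch):

    import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.size;

    import org.apache.parquet.filter2.predicate.Operators.Size;

    public class SizeFilterDispatchSketch {
      public static void main(String[] args) {
        Size pred = size(intColumn("ids.list.element"), Size.Operator.GTE, 3);

        // exactly one of these lambdas runs, chosen by the predicate's operator
        String description = pred.filter(
            eq -> "exactly " + eq,
            lt -> "fewer than " + lt,
            lte -> "at most " + lte,
            gt -> "more than " + gt,
            gte -> "at least " + gte);

        System.out.println(description); // "at least 3"
      }
    }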
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
index b5708a4a0c..4b94d20b13 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
@@ -38,9 +38,11 @@ import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.predicate.Operators.SetColumnFilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
 
 /**
  * Inspects the column types found in the provided {@link FilterPredicate} and compares them
@@ -135,6 +137,12 @@ public <T extends Comparable<T>> Void visit(Contains<T> pred) {
     return null;
   }
 
+  @Override
+  public Void visit(Size size) {
+    validateColumn(size.getColumn(), true, true);
+    return null;
+  }
+
   @Override
   public Void visit(And and) {
     and.getLeft().accept(this);
@@ -175,14 +183,15 @@ private <T extends Comparable<T>> void validateColumnFilterPredicate(SetColumnFilterPredicate<T> pred) {
   }
 
   private <T extends Comparable<T>> void validateColumnFilterPredicate(Contains<T> pred) {
-    validateColumn(pred.getColumn(), true);
+    validateColumn(pred.getColumn(), true, false);
   }
 
   private <T extends Comparable<T>> void validateColumn(Column<T> column) {
-    validateColumn(column, false);
+    validateColumn(column, false, false);
   }
 
-  private <T extends Comparable<T>> void validateColumn(Column<T> column, boolean shouldBeRepeated) {
+  private <T extends Comparable<T>> void validateColumn(
+      Column<T> column, boolean isRepeatedColumn, boolean mustBeRequired) {
     ColumnPath path = column.getColumnPath();
 
     Class<?> alreadySeen = columnTypesEncountered.get(path);
@@ -204,15 +213,21 @@ private <T extends Comparable<T>> void validateColumn(Column<T> column, boolean shouldBeRepeated) {
       return;
     }
 
-    if (shouldBeRepeated && descriptor.getMaxRepetitionLevel() == 0) {
+    if (isRepeatedColumn && descriptor.getMaxRepetitionLevel() == 0) {
       throw new IllegalArgumentException(
           "FilterPredicate for column " + path.toDotString() + " requires a repeated "
               + "schema, but found max repetition level " + descriptor.getMaxRepetitionLevel());
-    } else if (!shouldBeRepeated && descriptor.getMaxRepetitionLevel() > 0) {
+    } else if (!isRepeatedColumn && descriptor.getMaxRepetitionLevel() > 0) {
       throw new IllegalArgumentException("FilterPredicates do not currently support repeated columns. "
           + "Column " + path.toDotString() + " is repeated.");
     }
 
+    if (mustBeRequired && descriptor.getPrimitiveType().isRepetition(Type.Repetition.OPTIONAL)) {
+      throw new IllegalArgumentException("FilterPredicate for column " + path.toDotString()
+          + " requires schema to have repetition REQUIRED, but found "
+          + descriptor.getPrimitiveType().getRepetition() + ".");
+    }
+
     ValidTypeMap.assertTypeValid(column, descriptor.getType());
   }
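Concretely, size() only validates against leaves that are repeated (max repetition level > 0) and whose repetition is REQUIRED. A sketch of both outcomes against a hypothetical schema (the second validate call throws):

    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.size;

    import org.apache.parquet.filter2.predicate.Operators.Size;
    import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class SizeValidationSketch {
      public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType("message Doc {\n"
            + "  required group tags (LIST) {\n"
            + "    repeated group list {\n"
            + "      required binary element (STRING);\n"
            + "    }\n"
            + "  }\n"
            + "  required int32 id;\n"
            + "}");

        // OK: the element leaf is repeated and REQUIRED
        SchemaCompatibilityValidator.validate(
            size(binaryColumn("tags.list.element"), Size.Operator.GT, 0), schema);

        // throws IllegalArgumentException: id is not repeated
        SchemaCompatibilityValidator.validate(size(intColumn("id"), Size.Operator.GT, 0), schema);
      }
    }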
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
index c2aab2b6bf..0302121591 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
@@ -20,6 +20,7 @@
 
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.function.Function;
 import org.apache.parquet.io.api.Binary;
 
 /**
@@ -223,6 +224,83 @@ public void reset() {
     }
   }
 
+  class CountingValueInspector extends ValueInspector {
+    private int observedValueCount;
+    private final ValueInspector delegate;
+
+    /**
+     * Triggering function used to decide when to update the underlying delegate. We want to be careful not to
+     * trigger before all relevant column values have been considered.
+     *
+     * For example, given the predicate `size(col, LT, 3)` and a record with 4 array values, we don't want the
+     * underlying `lt(3)` predicate to be evaluated on the first or second element of the array, since it would
+     * return a premature true value.
+     */
+    private final Function<Integer, Boolean> shouldUpdateDelegate;
+
+    public CountingValueInspector(ValueInspector delegate, Function<Integer, Boolean> shouldUpdateDelegate) {
+      this.observedValueCount = 0;
+      this.delegate = delegate;
+      this.shouldUpdateDelegate = shouldUpdateDelegate;
+    }
+
+    @Override
+    public void updateNull() {
+      delegate.update(observedValueCount);
+      if (!delegate.isKnown()) {
+        delegate.updateNull();
+      }
+      setResult(delegate.getResult());
+    }
+
+    @Override
+    public void update(int value) {
+      incrementCount();
+    }
+
+    @Override
+    public void update(long value) {
+      incrementCount();
+    }
+
+    @Override
+    public void update(double value) {
+      incrementCount();
+    }
+
+    @Override
+    public void update(float value) {
+      incrementCount();
+    }
+
+    @Override
+    public void update(boolean value) {
+      incrementCount();
+    }
+
+    @Override
+    public void update(Binary value) {
+      incrementCount();
+    }
+
+    @Override
+    public void reset() {
+      super.reset();
+      delegate.reset();
+      observedValueCount = 0;
+    }
+
+    private void incrementCount() {
+      observedValueCount++;
+      if (!delegate.isKnown() && shouldUpdateDelegate.apply(observedValueCount)) {
+        delegate.update(observedValueCount);
+        if (delegate.isKnown()) {
+          setResult(delegate.getResult());
+        }
+      }
+    }
+  }
+
   // base class for and / or
   abstract static class BinaryLogical implements IncrementallyUpdatedFilterPredicate {
 
     private final IncrementallyUpdatedFilterPredicate left;
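A standalone simulation (plain Java, no Parquet types) of the trigger thresholds the generator wires up later in this patch: the delegate comparison must not fire until the running count can no longer change the outcome, and the final count decides if the record ends first.

    import java.util.function.IntPredicate;

    public class CountingTriggerDemo {
      // Mirrors CountingValueInspector's flow: count values one by one and consult
      // the delegate only once shouldTrigger says the outcome is settled.
      static boolean evaluate(int elementCount, IntPredicate shouldTrigger, IntPredicate delegate) {
        for (int seen = 1; seen <= elementCount; seen++) {
          if (shouldTrigger.test(seen)) {
            return delegate.test(seen); // early, like delegate.update(observedValueCount)
          }
        }
        // record ended first: the final count decides, like updateNull()
        return delegate.test(elementCount);
      }

      public static void main(String[] args) {
        // size(col, LT, 3): trigger once count >= 3; the delegate checks count < 3
        System.out.println(evaluate(2, c -> c >= 3, c -> c < 3)); // true: 2-element array is < 3
        System.out.println(evaluate(4, c -> c >= 3, c -> c < 3)); // false: settled at the 3rd element
      }
    }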
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
index 588d06300b..f1759cd09b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
@@ -25,8 +25,10 @@ import java.util.List;
 import java.util.Map;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators;
 import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.Or;
@@ -34,6 +36,8 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.io.PrimitiveColumnIO;
 import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 
 /**
  * The implementation of this abstract class is auto-generated by
@@ -56,6 +60,8 @@
  */
 public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor<IncrementallyUpdatedFilterPredicate> {
 
+  static final Operators.IntColumn SIZE_PSEUDOCOLUMN = FilterApi.intColumn("$SIZE");
+
   private boolean built = false;
   private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn = new HashMap<>();
   private final Map<ColumnPath, PrimitiveComparator<?>> comparatorsByColumn = new HashMap<>();
@@ -70,6 +76,13 @@ public IncrementallyUpdatedFilterPredicateBuilderBase(List<PrimitiveColumnIO> leaves) {
       PrimitiveComparator<?> comparator = descriptor.getPrimitiveType().comparator();
       comparatorsByColumn.put(path, comparator);
     }
+    comparatorsByColumn.put(
+        SIZE_PSEUDOCOLUMN.getColumnPath(),
+        new PrimitiveType(
+                Type.Repetition.REQUIRED,
+                PrimitiveType.PrimitiveTypeName.INT32,
+                SIZE_PSEUDOCOLUMN.getColumnPath().toDotString())
+            .comparator());
   }
 
   public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) {
@@ -80,6 +93,11 @@ public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) {
   }
 
   protected final void addValueInspector(ColumnPath columnPath, ValueInspector valueInspector) {
+    if (columnPath.equals(SIZE_PSEUDOCOLUMN.getColumnPath())) {
+      // do not add the pseudo-column to the list of value inspectors
+      return;
+    }
+
     List<ValueInspector> valueInspectors = valueInspectorsByColumn.get(columnPath);
     if (valueInspectors == null) {
       valueInspectors = new ArrayList<>();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index ffbb82197b..e23c2de138 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -53,6 +53,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.io.api.Binary;
@@ -378,6 +379,73 @@ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Contains<T> contains) {
         indices -> IndexIterator.all(getPageCount()));
   }
 
+  @Override
+  public PrimitiveIterator.OfInt visit(Size size) {
+    if (repLevelHistogram == null || defLevelHistogram == null) {
+      return IndexIterator.all(getPageCount());
+    }
+
+    final int[] repLevelOffsets = calculateOffsetsForHistogram(repLevelHistogram, nullPages);
+    final int[] defLevelOffsets = calculateOffsetsForHistogram(defLevelHistogram, nullPages);
+
+    return IndexIterator.filter(getPageCount(), pageIndex -> {
+      final boolean isFinalPage = pageIndex + 1 == nullPages.length;
+      final List<Long> pageRepLevelHistogram = getRepetitionLevelHistogram()
+          .subList(
+              repLevelOffsets[pageIndex],
+              isFinalPage ? repLevelHistogram.length : repLevelOffsets[pageIndex + 1]);
+      final List<Long> pageDefLevelHistogram = getDefinitionLevelHistogram()
+          .subList(
+              defLevelOffsets[pageIndex],
+              isFinalPage ? defLevelHistogram.length : defLevelOffsets[pageIndex + 1]);
+
+      if (pageRepLevelHistogram.isEmpty() || pageDefLevelHistogram.isEmpty()) {
+        // Page might match; cannot be filtered out
+        return true;
+      }
+
+      final int defLevelCount = pageDefLevelHistogram.size();
+
+      // If all values have repetition level 0, then no array has more than 1 element
+      if (pageRepLevelHistogram.size() == 1
+          || (pageRepLevelHistogram.get(0) > 0
+              && pageRepLevelHistogram.subList(1, pageRepLevelHistogram.size()).stream()
+                  .allMatch(l -> l == 0))) {
+
+        // all lists are null or empty
+        if (pageDefLevelHistogram.subList(1, defLevelCount).stream().allMatch(l -> l == 0)) {
+          return size.filter(
+              (eq) -> eq <= 0, (lt) -> true, (lte) -> true, (gt) -> gt < 0, (gte) -> gte <= 0);
+        }
+
+        final int maxDefinitionLevel = defLevelCount - 1;
+
+        // If all repetition levels are zero and all definition levels are > MAX_DEFINITION_LEVEL - 1, all
+        // lists are of size 1
+        if (pageDefLevelHistogram.subList(0, maxDefinitionLevel - 1).stream()
+            .allMatch(l -> l == 0)) {
+          return size.filter(
+              (eq) -> eq == 1, (lt) -> lt > 1, (lte) -> lte >= 1, (gt) -> gt < 1, (gte) -> gte <= 1);
+        }
+      }
+
+      final long nonNullElementCount =
+          pageRepLevelHistogram.stream().mapToLong(l -> l).sum() - pageDefLevelHistogram.get(0);
+      final long numNonNullRecords = pageRepLevelHistogram.get(0) - pageDefLevelHistogram.get(0);
+
+      // Given the total number of elements and non-null fields, we can compute the max size of any array
+      // field
+      final long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
+      return size.filter(
+          (eq) -> eq <= maxArrayElementCount,
+          (lt) -> true,
+          (lte) -> true,
+          (gt) -> gt < maxArrayElementCount,
+          (gte) -> gte <= maxArrayElementCount);
+    });
+  }
+
   @Override
   public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> PrimitiveIterator.OfInt visit(
       UserDefined<T, U> udp) {
@@ -438,6 +506,23 @@ public boolean test(int pageIndex) {
         }
       });
     }
+
+    // Calculates each page's starting offset in a concatenated histogram
+    private static int[] calculateOffsetsForHistogram(long[] histogram, boolean[] nullPages) {
+      final int numNullPages =
+          (int) BooleanList.of(nullPages).stream().filter(p -> p).count();
+      final int numNonNullPages = nullPages.length - numNullPages;
+      final int numLevelsPerNonNullPage = (histogram.length - numNullPages) / numNonNullPages;
+
+      int[] offsets = new int[nullPages.length];
+      int currOffset = 0;
+      for (int i = 0; i < nullPages.length; ++i) {
+        offsets[i] = currOffset;
+        currOffset += (nullPages[i] ? 1 : numLevelsPerNonNullPage);
+      }
+
+      return offsets;
+    }
   }
 
   private static final ColumnIndexBuilder NO_OP_BUILDER = new ColumnIndexBuilder() {
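A worked example of the page-level bound computed above, with hypothetical histogram values (the same arithmetic reappears at block granularity in StatisticsFilter later in this patch):

    public class SizeHistogramBoundSketch {
      public static void main(String[] args) {
        // Hypothetical page: rep-level histogram [4, 3] means 4 values start a new
        // record (rep level 0) and 3 continue an existing list. Def-level histogram
        // [1, 0, 6] means 1 value belongs to a null record, 0 to empty lists, and
        // 6 are fully defined elements.
        long[] repLevels = {4, 3};
        long[] defLevels = {1, 0, 6};

        long totalValues = repLevels[0] + repLevels[1];        // 7
        long nonNullElementCount = totalValues - defLevels[0]; // 7 - 1 = 6
        long numNonNullRecords = repLevels[0] - defLevels[0];  // 4 - 1 = 3

        // Each of the other non-null records holds at least one element, so a
        // single record can hold at most 6 - (3 - 1) = 4 of the 6 elements:
        long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
        System.out.println(maxArrayElementCount); // 4

        // Hence size(col, GT, 4) can skip this page, while size(col, GT, 3) cannot.
      }
    }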
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
index 8b6ee1f95d..ad07f3df89 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -39,6 +39,7 @@ import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -161,6 +162,11 @@ public <T extends Comparable<T>> RowRanges visit(Contains<T> contains) {
     return contains.filter(this, RowRanges::intersection, RowRanges::union, ranges -> allRows());
   }
 
+  @Override
+  public RowRanges visit(Size size) {
+    return applyPredicate(size.getColumn(), ci -> ci.visit(size), allRows());
+  }
+
   @Override
   public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(UserDefined<T, U> udp) {
     return applyPredicate(
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
index 9bb180087b..e5c5a77676 100644
--- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
@@ -29,6 +29,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.not;
 import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.size;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static org.apache.parquet.filter2.predicate.Operators.NotEq;
 import static org.junit.Assert.assertEquals;
@@ -139,6 +140,9 @@ public void testToString() {
     assertEquals(
         "or(contains(eq(a.string.column, Binary{\"foo\"})), and(contains(eq(a.string.column, Binary{\"bar\"})), not(contains(eq(a.string.column, Binary{\"baz\"})))))",
         pred.toString());
+
+    pred = size(binColumn, Operators.Size.Operator.GTE, 5);
+    assertEquals("size(a.string.column gte 5)", pred.toString());
   }
 
   @Test
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java
index 6a7f81a6c2..dde742bbc8 100644
--- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java
@@ -29,8 +29,10 @@ import static org.apache.parquet.filter2.predicate.FilterApi.not;
 import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.size;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
+import static org.apache.parquet.filter2.predicate.Operators.Size.Operator;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
@@ -87,5 +89,13 @@ public void testBaseCases() {
   @Test
   public void testComplex() {
     assertEquals(complexInverse, invert(complex));
+
+    assertEquals(
+        or(size(intColumn, Operator.LT, 5), size(intColumn, Operator.GT, 5)),
+        invert(size(intColumn, Operator.EQ, 5)));
+    assertEquals(size(intColumn, Operator.GTE, 5), invert(size(intColumn, Operator.LT, 5)));
+    assertEquals(size(intColumn, Operator.GT, 5), invert(size(intColumn, Operator.LTE, 5)));
+    assertEquals(size(intColumn, Operator.LTE, 5), invert(size(intColumn, Operator.GT, 5)));
+    assertEquals(size(intColumn, Operator.LT, 5), invert(size(intColumn, Operator.GTE, 5)));
   }
 }
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
index 47e9bdd5e6..8a44504adb 100644
--- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
@@ -29,6 +29,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.not;
 import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.size;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate;
 import static org.junit.Assert.assertEquals;
@@ -47,6 +48,8 @@ public class TestSchemaCompatibilityValidator {
   private static final LongColumn longBar = longColumn("x.bar");
   private static final IntColumn intBar = intColumn("x.bar");
   private static final LongColumn lotsOfLongs = longColumn("lotsOfLongs");
+  private static final BinaryColumn threeLevelList = binaryColumn("nestedGroup.threeLevelList.list.element");
+  private static final BinaryColumn listOfOptionals = binaryColumn("listOfOptionals.list.element");
 
   private static final String schemaString = "message Document {\n"
       + "  required int32 a;\n"
@@ -54,6 +57,18 @@ public class TestSchemaCompatibilityValidator {
       + "  required binary c (UTF8);\n"
       + "  required group x { required int32 bar; }\n"
       + "  repeated int64 lotsOfLongs;\n"
+      + "  optional group nestedGroup {\n"
+      + "    required group threeLevelList (LIST) {\n"
+      + "      repeated group list {\n"
+      + "        required binary element (STRING);\n"
+      + "      }\n"
+      + "    }\n"
+      + "  }\n"
+      + "  required group listOfOptionals (LIST) {\n"
+      + "    repeated group list {\n"
+      + "      optional binary element (STRING);\n"
+      + "    }\n"
+      + "  }\n"
       + "}\n";
 
   private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString);
@@ -160,4 +175,34 @@ public void testNonRepeatedNotSupportedForContainsPredicates() {
           e.getMessage());
     }
   }
+
+  @Test
+  public void testSizePredicate() {
+    // Size predicates should succeed on repeated columns
+    try {
+      validate(size(lotsOfLongs, Operators.Size.Operator.LT, 10), schema);
+      validate(size(threeLevelList, Operators.Size.Operator.LT, 10), schema);
+    } catch (IllegalArgumentException e) {
+      fail("Valid repeated column predicates should not throw exceptions, but threw " + e);
+    }
+
+    // Size predicates should fail on non-repeated columns and on optional list element types
+    try {
+      validate(size(intBar, Operators.Size.Operator.LT, 10), schema);
+      fail("Expected an IllegalArgumentException for a non-repeated column");
+    } catch (IllegalArgumentException e) {
+      assertEquals(
+          "FilterPredicate for column x.bar requires a repeated schema, but found max repetition level 0",
+          e.getMessage());
+    }
+
+    try {
+      validate(size(listOfOptionals, Operators.Size.Operator.LT, 10), schema);
+      fail("Expected an IllegalArgumentException for an optional list element type");
+    } catch (IllegalArgumentException e) {
+      assertEquals(
+          "FilterPredicate for column listOfOptionals.list.element requires schema to have repetition REQUIRED, but found OPTIONAL.",
+          e.getMessage());
+    }
+  }
 }
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
index 58a899eefc..1f2ba8550a 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -36,6 +36,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.size;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
 import static org.apache.parquet.schema.OriginalType.DECIMAL;
@@ -56,6 +57,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.ImmutableList;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -64,9 +66,11 @@ import java.util.List;
 import java.util.Set;
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.statistics.SizeStatistics;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.filter2.predicate.ContainsRewriter;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
 import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
 import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
 import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
@@ -1627,6 +1631,99 @@ public void testNoOpBuilder() {
     assertNull(builder.build());
   }
 
+  @Test
+  public void testSizeRequiredElements() {
+    final PrimitiveType type = Types.required(DOUBLE).named("element");
+    final DoubleColumn col = doubleColumn(type.getName());
+
+    final List<List<Double>> pageValueList = new ArrayList<>();
+    pageValueList.add(ImmutableList.of(1.0, 2.0, 3.0));
+    pageValueList.add(ImmutableList.of(1.0, 2.0, 3.0, 4.0, 5.0));
+    pageValueList.add(ImmutableList.of(-1.0));
+    pageValueList.add(ImmutableList.of());
+    pageValueList.add(null);
+
+    final ColumnIndex columnIndex = createArrayColumnIndex(type, pageValueList);
+
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0, 0, 0, 0, 0);
+    assertCorrectNullPages(columnIndex, false, false, false, true, true);
+    assertCorrectValues(columnIndex.getMaxValues(), 3.0, 5.0, -1.0, null, null);
+    assertCorrectValues(columnIndex.getMinValues(), 1.0, 1.0, -1.0, null, null);
+
+    // we know the max array size is 5; all elements of page 2 have size 1; and pages 3 and 4 are null or empty
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 0), 0, 1, 3, 4);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 4), 1);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 3), 0, 1);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LT, 2), 0, 1, 2, 3, 4);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LTE, 1), 0, 1, 2, 3, 4);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GT, 0), 0, 1, 2);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GTE, 0), 0, 1, 2, 3, 4);
+  }
+
+  @Test
+  public void testSizeOptionalElements() {
+    final PrimitiveType type = Types.optional(DOUBLE).named("element");
+    final DoubleColumn col = doubleColumn(type.getName());
+
+    final List<Double> listWithNulls = new ArrayList<>();
+    listWithNulls.add(null);
+    listWithNulls.add(3.0);
+    listWithNulls.add(null);
+
+    final List<List<Double>> pageValueList = new ArrayList<>();
+    pageValueList.add(listWithNulls);
+
+    final ColumnIndex columnIndex = createArrayColumnIndex(type, pageValueList);
+
+    assertCorrectNullCounts(columnIndex, 2);
+    assertCorrectNullPages(columnIndex, false);
+    assertCorrectValues(columnIndex.getMaxValues(), 3.0);
+    assertCorrectValues(columnIndex.getMinValues(), 3.0);
+
+    // We know that the array values for the page have min size 0 and max size 3
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 0), 0);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 5));
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LT, 4), 0);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LTE, 3), 0);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GT, 0), 0);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GT, 3));
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GTE, 3), 0);
+    assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GTE, 4));
+  }
+
+  private static ColumnIndex createArrayColumnIndex(PrimitiveType type, List<List<Double>> pageValueList) {
+    final ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
+
+    for (List<Double> pageValues : pageValueList) {
+      final StatsBuilder sb = new StatsBuilder();
+      boolean isNullOrEmpty = pageValues == null || pageValues.isEmpty();
+
+      final SizeStatistics.Builder sizeStatistics =
+          SizeStatistics.newBuilder(type, isNullOrEmpty ? 0 : 1, isNullOrEmpty ? 0 : 1);
+
+      if (isNullOrEmpty) sizeStatistics.add(0, 0);
+
+      if (pageValues != null) {
+        for (int i = 0; i < pageValues.size(); i++) {
+          if (i == 0) {
+            sizeStatistics.add(0, 1);
+          } else {
+            sizeStatistics.add(1, 1);
+          }
+        }
+      }
+
+      if (pageValues == null) {
+        builder.add(sb.stats(type), sizeStatistics.build());
+      } else {
+        builder.add(sb.stats(type, pageValues.toArray(new Double[0])), sizeStatistics.build());
+      }
+    }
+
+    return builder.build();
+  }
+
   private static List<ByteBuffer> toBBList(Binary... values) {
     List<ByteBuffer> buffers = new ArrayList<>(values.length);
     for (Binary value : values) {
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
index 1574ce2474..f9c4e24b21 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
@@ -35,6 +35,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.size;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
 import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.ASCENDING;
@@ -62,6 +63,7 @@ import java.util.stream.LongStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.Operators;
 import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -970,6 +972,13 @@ public void testFiltering() {
         11,
         12,
         13);
+    assertRows(
+        calculateRowRanges(
+            FilterCompat.get(size(intColumn("column6"), Operators.Size.Operator.GT, 5)),
+            STORE,
+            paths,
+            TOTAL_ROW_COUNT),
+        LongStream.range(0, 30).toArray());
   }
 
   @Test
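The column-index tests above synthesize statistics by hand; the key encoding detail is that SizeStatistics receives one (repetition level, definition level) pair per value, and list boundaries are recovered from repetition level 0. A sketch of how a 3-element list and an empty list are fed in, assuming the builder API used in those tests:

    import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;

    import org.apache.parquet.column.statistics.SizeStatistics;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    public class SizeStatisticsSketch {
      public static void main(String[] args) {
        PrimitiveType type = Types.required(DOUBLE).named("element");

        // max repetition level 1, max definition level 1, as in the tests above
        SizeStatistics.Builder stats = SizeStatistics.newBuilder(type, 1, 1);

        // a 3-element list: the first element opens the record (rep level 0),
        // the rest continue it (rep level 1); all elements are defined (def level 1)
        stats.add(0, 1);
        stats.add(1, 1);
        stats.add(1, 1);

        // an empty (or null) list: one entry at rep level 0 with nothing defined
        stats.add(0, 0);

        System.out.println(stats.build().getRepetitionLevelHistogram()); // [2, 2]
      }
    }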
diff --git a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
index 7f66ce3821..a2eb218fc2 100644
--- a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
+++ b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
@@ -71,6 +71,7 @@ public void run() throws IOException {
         + "import java.util.Set;\n"
         + "\n"
         + "import org.apache.parquet.hadoop.metadata.ColumnPath;\n"
+        + "import org.apache.parquet.filter2.predicate.FilterApi;\n"
         + "import org.apache.parquet.filter2.predicate.FilterPredicate;\n"
         + "import org.apache.parquet.filter2.predicate.Operators;\n"
         + "import org.apache.parquet.filter2.predicate.Operators.Contains;\n"
@@ -83,6 +84,7 @@ public void run() throws IOException {
         + "import org.apache.parquet.filter2.predicate.Operators.LtEq;\n"
         + "import org.apache.parquet.filter2.predicate.Operators.NotEq;\n"
         + "import org.apache.parquet.filter2.predicate.Operators.NotIn;\n"
+        + "import org.apache.parquet.filter2.predicate.Operators.Size;\n"
         + "import org.apache.parquet.filter2.predicate.Operators.UserDefined;\n"
         + "import org.apache.parquet.filter2.predicate.UserDefinedPredicate;\n"
         + "import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;\n"
@@ -133,6 +135,8 @@ public void run() throws IOException {
     addContainsEnd();
     addVisitEnd();
 
+    addSizeCase();
+
     addVisitBegin("Lt");
     for (TypeInfo info : TYPES) {
       addInequalityCase(info, "<", false);
@@ -338,6 +342,30 @@ private void addUdpBegin() throws IOException {
         + "\n");
   }
 
+  private void addSizeCase() throws IOException {
+    add("  @Override\n"
+        + "  public IncrementallyUpdatedFilterPredicate visit(Size size) {\n"
+        + "    final ValueInspector delegate = (ValueInspector) size.filter(\n"
+        + "        (onEq) -> visit(FilterApi.eq(SIZE_PSEUDOCOLUMN, onEq)),\n"
+        + "        (lt) -> visit(FilterApi.lt(SIZE_PSEUDOCOLUMN, lt)),\n"
+        + "        (lte) -> visit(FilterApi.ltEq(SIZE_PSEUDOCOLUMN, lte)),\n"
+        + "        (gt) -> visit(FilterApi.gt(SIZE_PSEUDOCOLUMN, gt)),\n"
+        + "        (gte) -> visit(FilterApi.gtEq(SIZE_PSEUDOCOLUMN, gte)));\n"
+        + "\n"
+        + "    final ValueInspector valueInspector = new IncrementallyUpdatedFilterPredicate.CountingValueInspector(\n"
+        + "        delegate,\n"
+        + "        size.filter(\n"
+        + "            (eqValue) -> (count) -> count > eqValue,\n"
+        + "            (ltValue) -> (count) -> count >= ltValue,\n"
+        + "            (lteValue) -> (count) -> count > lteValue,\n"
+        + "            (gtValue) -> (count) -> count > gtValue,\n"
+        + "            (gteValue) -> (count) -> count >= gteValue)\n"
+        + "    );\n"
+        + "\n"
+        + "    addValueInspector(size.getColumn().getColumnPath(), valueInspector);\n"
+        + "    return valueInspector;\n"
+        + "  }\n");
+  }
+
   private void addContainsInspectorVisitor(String op) throws IOException {
     add("  @Override\n"
         + "  public <T extends Comparable<T>> ContainsPredicate visit(" + op + "<T> pred) {\n"
@@ -499,6 +527,10 @@ private void addContainsBegin() throws IOException {
         + "  @Override\n"
        + "  public <T extends Comparable<T>> ContainsPredicate visit(Contains<T> contains) {\n"
         + "    return contains.filter(this, ContainsAndPredicate::new, ContainsOrPredicate::new, ContainsPredicate::not);\n"
+        + "  }\n"
+        + "\n"
+        + "  @Override\n"
+        + "  public ContainsPredicate visit(Size size) {\n"
+        + "    throw new UnsupportedOperationException(\"Unsupported predicate \" + size + \" cannot be used with contains()\");\n"
         + "  }\n");
 
     addContainsInspectorVisitor("Eq");
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
index 39babc0ac3..4ceb6c7bb3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
@@ -124,6 +124,11 @@ public <T extends Comparable<T>> Boolean visit(Operators.Contains<T> contains) {
     return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH);
   }
 
+  @Override
+  public Boolean visit(Operators.Size size) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
   @Override
   public <T extends Comparable<T>> Boolean visit(Operators.In<T> in) {
     Set<T> values = in.getValues();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index be4455eeba..e2eea2ef5e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -48,6 +48,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -493,6 +494,39 @@ public <T extends Comparable<T>> Boolean visit(Contains<T> contains) {
     return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH);
   }
 
+  @Override
+  public Boolean visit(Size size) {
+    ColumnChunkMetaData meta = getColumnChunk(size.getColumn().getColumnPath());
+
+    if (meta == null) {
+      // the column isn't in this file, so fail eq/gt/gte predicates targeting size > 0
+      final boolean blockCannotMatch =
+          size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0);
+      return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
+    }
+
+    try {
+      // We know the block has at least as many array elements as the dictionary size
+      final Set<?> dict = expandDictionary(meta);
+
+      // If the column doesn't have a dictionary encoding, we can't infer anything about its size
+      if (dict == null || dict.isEmpty()) {
+        return BLOCK_MIGHT_MATCH;
+      }
+
+      // Column has at least (nonempty) dict.size() values spread across all records;
+      // predicates that match empty arrays cannot match
+      final boolean blockCannotMatch =
+          size.filter((eq) -> eq == 0, (lt) -> lt == 1, (lte) -> lte <= 1, (gt) -> false, (gte) -> false);
+
+      return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
   @Override
   public Boolean visit(And and) {
     return and.getLeft().accept(this) || and.getRight().accept(this);
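A self-contained illustration (plain Java, hypothetical numbers) of the "column missing" branch above: with no column chunk, every record effectively has size 0, so only predicates satisfiable by size == 0 can match and the rest let the block be dropped.

    public class SizeDictionaryBoundDemo {
      // Mirrors the missing-column branch: true means the block CANNOT match.
      static boolean blockCannotMatchWhenMissing(String op, int v) {
        switch (op) {
          case "EQ":  return v > 0;  // size == v only matches when v == 0
          case "GT":  return v >= 0; // size > v is impossible when size is 0
          case "GTE": return v > 0;
          default:    return false;  // LT / LTE can still match size 0
        }
      }

      public static void main(String[] args) {
        System.out.println(blockCannotMatchWhenMissing("EQ", 1));  // true: drop the block
        System.out.println(blockCannotMatchWhenMissing("LTE", 0)); // false: keep the block
      }
    }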
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
index 4d7918c4f1..07d04fdb8a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import java.util.Set;
 import org.apache.parquet.column.MinMax;
+import org.apache.parquet.column.statistics.SizeStatistics;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.filter2.predicate.Operators.And;
@@ -40,6 +41,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.Size;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -217,6 +219,71 @@ public <T extends Comparable<T>> Boolean visit(Contains<T> contains) {
     return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH);
   }
 
+  /**
+   * Logically equivalent to the page-level checks in
+   * {@link org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder}, but at block granularity
+   **/
+  @Override
+  public Boolean visit(Size size) {
+    final ColumnChunkMetaData metadata = getColumnChunk(size.getColumn().getColumnPath());
+    if (metadata == null) {
+      // the column isn't in this file, so fail eq/gt/gte predicates targeting size > 0
+      final boolean blockCannotMatch =
+          size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0);
+      return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
+    }
+
+    final SizeStatistics stats = metadata.getSizeStatistics();
+    final List<Long> repetitionLevelHistogram = stats.getRepetitionLevelHistogram();
+    final List<Long> definitionLevelHistogram = stats.getDefinitionLevelHistogram();
+
+    if (repetitionLevelHistogram.isEmpty() || definitionLevelHistogram.isEmpty()) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    // If all values have repetition level 0, then no array has more than 1 element
+    if (repetitionLevelHistogram.size() == 1
+        || (repetitionLevelHistogram.get(0) > 0
+            && repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream()
+                .allMatch(l -> l == 0))) {
+
+      // All lists are null or empty
+      if ((definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream()
+          .allMatch(l -> l == 0))) {
+
+        final boolean blockCannotMatch =
+            size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0);
+        return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
+      }
+
+      final int maxDefinitionLevel = definitionLevelHistogram.size() - 1;
+
+      // If all repetition levels are zero and all definition levels are > MAX_DEFINITION_LEVEL - 1, all lists
+      // are of size 1
+      if (definitionLevelHistogram.subList(0, maxDefinitionLevel - 1).stream()
+          .allMatch(l -> l == 0)) {
+        final boolean blockCannotMatch = size.filter(
+            (eq) -> eq != 1, (lt) -> lt <= 1, (lte) -> lte < 1, (gt) -> gt >= 1, (gte) -> gte > 1);
+
+        return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
+      }
+    }
+    final long nonNullElementCount =
+        repetitionLevelHistogram.stream().mapToLong(l -> l).sum() - definitionLevelHistogram.get(0);
+    final long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0);
+
+    // Given the total number of elements and non-null fields, we can compute the max size of any array field
+    final long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
+    final boolean blockCannotMatch = size.filter(
+        (eq) -> eq > maxArrayElementCount,
+        (lt) -> false,
+        (lte) -> false,
+        (gt) -> gt >= maxArrayElementCount,
+        (gte) -> gte > maxArrayElementCount);
+
+    return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index 5b9e638d60..9adaa420c9 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -39,6 +39,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.size;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
@@ -115,6 +116,7 @@ public class DictionaryFilterTest {
       + "required binary fallback_binary_field; "
       + "required int96 int96_field; "
       + "repeated binary repeated_binary_field;"
+      + "repeated binary repeated_binary_field_high_cardinality;" // high cardinality, no dict encoding produced
       + "} ");
 
   private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
@@ -196,6 +198,10 @@ private static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer)
         group.append("optional_single_value_field", "sharp");
       }
 
+      for (char letter : ALPHABET.toCharArray()) {
+        group = group.append("repeated_binary_field_high_cardinality", String.valueOf(letter));
+      }
+
       writer.write(group);
     }
     writer.close();
@@ -217,6 +223,7 @@ private static void prepareFile(WriterVersion version, Path file) throws IOException {
         .withRowGroupSize(1024 * 1024)
         .withPageSize(1024)
         .enableDictionaryEncoding()
+        .withDictionaryEncoding("repeated_binary_field_high_cardinality", false)
         .withDictionaryPageSize(2 * 1024)
         .withConf(conf)
         .build();
@@ -506,6 +513,42 @@ public void testGtEqDouble() throws Exception {
         "Should not drop: contains matching values", canDrop(gtEq(d, Double.MIN_VALUE), ccmd, dictionaries));
   }
 
+  @Test
+  public void testSizeBinary() throws Exception {
+    // repeated_binary_field dict has 26 distinct values
+    final BinaryColumn b = binaryColumn("repeated_binary_field");
+
+    // DictionaryFilter infers that col `repeated_binary_field` has >= 26 values spread across the row group
+    assertTrue(canDrop(size(b, Operators.Size.Operator.EQ, 0), ccmd, dictionaries));
+    assertTrue(canDrop(size(b, Operators.Size.Operator.LT, 1), ccmd, dictionaries));
+    assertTrue(canDrop(size(b, Operators.Size.Operator.LTE, 0), ccmd, dictionaries));
+
+    assertFalse(canDrop(size(b, Operators.Size.Operator.EQ, 30), ccmd, dictionaries));
+    assertFalse(canDrop(size(b, Operators.Size.Operator.GT, 0), ccmd, dictionaries));
+    assertFalse(canDrop(size(b, Operators.Size.Operator.GTE, 1), ccmd, dictionaries));
+
+    // If the column doesn't exist in the metadata, it has no values and can be treated as having size 0
+    final BinaryColumn nonExistentColumn = binaryColumn("nonexistent_col");
+
+    assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.GT, 0), ccmd, dictionaries));
+    assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.GTE, 1), ccmd, dictionaries));
+    assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.EQ, 1), ccmd, dictionaries));
+
+    assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.LT, 1), ccmd, dictionaries));
+    assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.LTE, 0), ccmd, dictionaries));
+    assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.EQ, 0), ccmd, dictionaries));
+
+    // If the column exists but doesn't have a dict, we cannot infer anything about its size
+    final BinaryColumn noDictColumn = binaryColumn("repeated_binary_field_high_cardinality");
+
+    assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.GT, 0), ccmd, dictionaries));
+    assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.GTE, 0), ccmd, dictionaries));
+    assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.EQ, 1), ccmd, dictionaries));
+    assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.LT, 1), ccmd, dictionaries));
+    assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.LTE, 0), ccmd, dictionaries));
+    assertFalse(canDrop(size(noDictColumn, Operators.Size.Operator.EQ, 0), ccmd, dictionaries));
+  }
+
   @Test
   public void testInBinary() throws Exception {
     BinaryColumn b = binaryColumn("binary_field");
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
index 1a1a31e73c..2c1d1145d9 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
@@ -33,7 +33,9 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.size;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.ImmutableMap;
@@ -50,6 +52,7 @@ import org.apache.parquet.example.data.Group;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
 import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
 import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
 import org.apache.parquet.filter2.predicate.Operators.LongColumn;
@@ -60,6 +63,7 @@ import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
 import org.apache.parquet.io.api.Binary;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestRecordLevelFilters {
@@ -159,10 +163,9 @@ private static void assertFilter(List<Group> found, UserFilter f) {
 
   private static void assertPredicate(FilterPredicate predicate, long... expectedIds) throws IOException {
     List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(predicate));
-    assertEquals(expectedIds.length, found.size());
-    for (int i = 0; i < expectedIds.length; i++) {
-      assertEquals(expectedIds[i], found.get(i).getLong("id", 0));
-    }
+    assertArrayEquals(
+        Arrays.stream(expectedIds).boxed().toArray(),
+        found.stream().map(f -> f.getLong("id", 0)).toArray(Long[]::new));
   }
 
   @Test
@@ -410,6 +413,50 @@ public void testArrayContainsMixedColumns() throws Exception {
         30L);
   }
 
+  @Test
+  public void testArraySizeRequiredColumn() throws Exception {
+    assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 2), 27L);
+
+    assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 4), 28L);
+
+    assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.GT, 1), 27L, 28L);
+
+    assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.GTE, 4), 28L);
+
+    assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 0), 17L, 18L, 19L);
+
+    assertPredicate(
+        size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.LT, 2),
+        LongStream.concat(LongStream.of(17L, 18L, 19L, 20L, 30L, 31L), LongStream.range(100, 200))
+            .toArray());
+
+    assertPredicate(
+        size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.LTE, 2),
+        LongStream.concat(LongStream.of(17L, 18L, 19L, 20L, 27L, 30L, 31L), LongStream.range(100, 200))
+            .toArray());
+
+    assertPredicate(
+        not(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 1)),
+        17L,
+        18L,
+        19L,
+        27L,
+        28L);
+
+    assertPredicate(
+        and(
+            size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 0),
+            size(binaryColumn("accounts.key_value.key"), Operators.Size.Operator.GT, 1)),
+        17L,
+        19L);
+  }
+
+  @Ignore(value = "Not yet supported")
+  @Test
+  public void testArraySizeOptionalColumn() throws Exception {
+    assertPredicate(size(binaryColumn("phoneNumbers.phone.kind"), Operators.Size.Operator.EQ, 4), 28L);
+  }
+
   @Test
   public void testNameNotNull() throws Exception {
     BinaryColumn name = binaryColumn("name");
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
index 15d0a8ab13..0c1d0f6550 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
@@ -33,21 +33,37 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.size;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static org.apache.parquet.filter2.statisticslevel.StatisticsFilter.canDrop;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.io.api.Binary.fromString;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.statistics.DoubleStatistics;
 import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.column.statistics.SizeStatistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
 import org.apache.parquet.filter2.predicate.Operators;
@@ -56,10 +72,19 @@ import org.apache.parquet.filter2.predicate.Operators.IntColumn;
 import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Types;
 import org.junit.Test;
@@ -68,17 +93,27 @@ public class TestStatisticsFilter {
 
   private static ColumnChunkMetaData getIntColumnMeta(
       org.apache.parquet.column.statistics.Statistics<?> stats, long valueCount) {
+    return getIntColumnMeta(ColumnPath.get("int", "column"), stats, null, valueCount);
+  }
+
+  private static ColumnChunkMetaData getIntColumnMeta(
+      ColumnPath columnPath,
+      org.apache.parquet.column.statistics.Statistics<?> stats,
+      SizeStatistics sizeStatistics,
+      long valueCount) {
     return ColumnChunkMetaData.get(
-        ColumnPath.get("int", "column"),
-        PrimitiveTypeName.INT32,
+        columnPath,
+        new PrimitiveType(REQUIRED, INT32, columnPath.toDotString()),
         CompressionCodecName.GZIP,
+        null,
         new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
         stats,
         0L,
         0L,
         valueCount,
         0L,
-        0L);
+        0L,
+        sizeStatistics);
   }
 
   private static ColumnChunkMetaData getDoubleColumnMeta(
@@ -100,6 +135,7 @@ private static ColumnChunkMetaData getDoubleColumnMeta(
   private static final DoubleColumn doubleColumn = doubleColumn("double.column");
   private static final BinaryColumn missingColumn = binaryColumn("missing");
   private static final IntColumn missingColumn2 = intColumn("missing.int");
+  private static final IntColumn nestedListColumn = intColumn("nestedGroup.listField.element");
 
   private static final IntStatistics intStats = new IntStatistics();
   private static final IntStatistics nullIntStats = new IntStatistics();
@@ -435,6 +471,256 @@ public void testOr() {
     assertFalse(canDrop(or(no, no), columnMetas));
   }
 
+
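Background for the tests that follow: SizeStatistics carries per-column repetition- and definition-level histograms, and the statistics-level filter derives list-size bounds from them. A simplified sketch of the arithmetic for a single-level list column; the helper name is mine, and the real StatisticsFilter must additionally account for nesting and null handling:

    // Given a SizeStatistics instance for a one-level list column
    static long totalElementSlots(org.apache.parquet.column.statistics.SizeStatistics sizeStats) {
      java.util.List<Long> repHist = sizeStats.getRepetitionLevelHistogram();
      long records = repHist.get(0);       // level-0 slots: one per record, empty/null lists included
      long continuations = repHist.get(1); // elements that continue an already-started list
      return records + continuations;
    }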
+  @Test
+  public void testSizeFilterRequiredGroupRequiredElements() throws Exception {
+    for (int nestingLevel = 0; nestingLevel < 3; nestingLevel++) {
+      final String nestingPrefix = IntStream.range(0, nestingLevel)
+          .mapToObj(l -> "NestedGroup" + l)
+          .collect(Collectors.joining("."));
+      final IntColumn columnName =
+          intColumn(((nestingPrefix.isEmpty() ? "" : nestingPrefix + ".") + "nestedGroup.listField.element"));
+
+      final IntStatistics minMaxStats = new IntStatistics();
+
+      // Case 1: Lists are populated
+      List<ColumnChunkMetaData> columnMeta = Collections.singletonList(getIntColumnMeta(
+          columnName.getColumnPath(),
+          minMaxStats,
+          createSizeStatisticsForRepeatedField(
+              nestingLevel,
+              true,
+              ImmutableList.of(
+                  ImmutableList.of(1, 2, 3),
+                  ImmutableList.of(1),
+                  ImmutableList.of(1, 2, 3),
+                  ImmutableList.of())),
+          4));
+
+      // SizeStatistics tells us that there are 7 total array elements spread across 3 non-empty listFields.
+      // Each non-empty listField holds at least one element, so no single listField can have more than
+      // 7 - 2 = 5 elements, and these predicates can filter out the page
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 6), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 5), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 6), columnMeta));
+
+      // These predicates should not be able to filter out the page
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 5), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 3), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta));
+
+      // Case 2: All lists are empty
+      columnMeta = Collections.singletonList(getIntColumnMeta(
+          columnName.getColumnPath(),
+          minMaxStats,
+          createSizeStatisticsForRepeatedField(
+              nestingLevel,
+              true,
+              ImmutableList.of(ImmutableList.of(), ImmutableList.of(), ImmutableList.of())),
+          3));
+
+      // These predicates should be able to filter out the page
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 0), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 1), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta));
+
+      // These predicates should not be able to filter out the page
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 1), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 1), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 0), columnMeta));
+
+      // Case 3: All lists have size 1
+      columnMeta = Collections.singletonList(getIntColumnMeta(
+          columnName.getColumnPath(),
+          minMaxStats,
+          createSizeStatisticsForRepeatedField(
+              nestingLevel, true, ImmutableList.of(ImmutableList.of(1), ImmutableList.of(1))),
+          2));
+
+      // We know that every record has max array size 1
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 2), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 1), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 2), columnMeta));
+
+      // These predicates should not be able to filter out the page
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 2), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 1), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 1), columnMeta));
+    }
+  }
+
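The Case 1 bound in numbers, as a hypothetical helper (the name is mine, not part of this change):

    // Every other non-empty list holds at least one element, so one list can take the rest.
    static int maxPossibleListSize(int totalElements, int nonEmptyLists) {
      return totalElements - (nonEmptyLists - 1);
    }
    // Case 1 above: sizes {3, 1, 3, 0} give maxPossibleListSize(7, 3) == 5,
    // which is why GT 5 / GTE 6 / EQ 6 can be dropped while EQ 5 cannot.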
+  @Test
+  public void testSizeFilterRequiredGroupOptionalElements() throws Exception {
+    for (int nestingLevel = 0; nestingLevel < 3; nestingLevel++) {
+      final String nestingPrefix = IntStream.range(0, nestingLevel)
+          .mapToObj(l -> "NestedGroup" + l)
+          .collect(Collectors.joining("."));
+      final IntColumn columnName =
+          intColumn(((nestingPrefix.isEmpty() ? "" : nestingPrefix + ".") + "nestedGroup.listField.element"));
+
+      final IntStatistics minMaxStats = new IntStatistics();
+
+      // Case 1: Lists are populated; one of them contains null elements
+      List<Integer> listWithNulls = new ArrayList<>();
+      listWithNulls.add(1);
+      listWithNulls.add(null);
+      listWithNulls.add(null);
+      List<ColumnChunkMetaData> columnMeta = Collections.singletonList(getIntColumnMeta(
+          columnName.getColumnPath(),
+          minMaxStats,
+          createSizeStatisticsForRepeatedField(
+              nestingLevel,
+              true,
+              ImmutableList.of(
+                  listWithNulls, ImmutableList.of(1), ImmutableList.of(1, 2, 3), ImmutableList.of())),
+          4));
+
+      // These predicates should be able to filter out the page
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 6), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 5), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 6), columnMeta));
+
+      // These predicates should not be able to filter out the page
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 5), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 3), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta));
+    }
+  }
+
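One semantic point worth noting from the test above: a null element still occupies a repetition slot, so (1, null, null) counts as size 3 for the Size predicate, exactly like (1, 2, 3); only the definition-level histogram distinguishes the two. An illustrative predicate that would match such a record:

    FilterPredicate matchesListOfThree =
        size(intColumn("nestedGroup.listField.element"), Operators.Size.Operator.EQ, 3);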
+  @Test
+  public void testSizeFilterOptionalGroup() throws Exception {
+    for (int nestingLevel = 0; nestingLevel < 3; nestingLevel++) {
+      final String nestingPrefix = IntStream.range(0, nestingLevel)
+          .mapToObj(l -> "NestedGroup" + l)
+          .collect(Collectors.joining("."));
+      final IntColumn columnName =
+          intColumn(((nestingPrefix.isEmpty() ? "" : nestingPrefix + ".") + "nestedGroup.listField.element"));
+
+      final IntStatistics minMaxStats = new IntStatistics();
+
+      // Case 1: Lists are non-null
+      List<ColumnChunkMetaData> columnMeta = Collections.singletonList(getIntColumnMeta(
+          columnName.getColumnPath(),
+          minMaxStats,
+          createSizeStatisticsForRepeatedField(
+              nestingLevel,
+              false,
+              ImmutableList.of(
+                  ImmutableList.of(1, 2, 3), ImmutableList.of(1), ImmutableList.of(1, 2, 3))),
+          3));
+
+      // These predicates should be able to filter out the page
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 6), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 5), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 6), columnMeta));
+
+      // These predicates should not be able to filter out the page
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.GTE, 3), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.GT, 2), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 5), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 3), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta));
+
+      // Case 2: Lists are null (the listField group must be optional to represent this)
+      final List<List<Integer>> listWithNull = new ArrayList<>();
+      listWithNull.add(null);
+      listWithNull.add(null);
+      columnMeta = Collections.singletonList(getIntColumnMeta(
+          columnName.getColumnPath(),
+          minMaxStats,
+          createSizeStatisticsForRepeatedField(nestingLevel, false, listWithNull),
+          2));
+
+      // These predicates should be able to filter out the page
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 0), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 1), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta));
+
+      // These predicates should not be able to filter out the page
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 1), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 1), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 0), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.GTE, 0), columnMeta));
+
+      // Case 3: Lists are empty
+      columnMeta = Collections.singletonList(getIntColumnMeta(
+          columnName.getColumnPath(),
+          minMaxStats,
+          createSizeStatisticsForRepeatedField(
+              nestingLevel, true, ImmutableList.of(ImmutableList.of(), ImmutableList.of())),
+          2));
+
+      // These predicates should be able to filter out the page
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GT, 0), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.GTE, 1), columnMeta));
+      assertTrue(canDrop(size(columnName, Operators.Size.Operator.EQ, 5), columnMeta));
+
+      // These predicates should not be able to filter out the page
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.GTE, 0), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LTE, 1), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.LT, 1), columnMeta));
+      assertFalse(canDrop(size(columnName, Operators.Size.Operator.EQ, 0), columnMeta));
+    }
+  }
+
+  private static SizeStatistics createSizeStatisticsForRepeatedField(
+      int nestingLevel, boolean arrayGroupRequired, List<List<Integer>> arrayValues) throws Exception {
+
+    GroupType groupSchema = Types.requiredGroup()
+        .addField((arrayGroupRequired ? Types.requiredGroup() : Types.optionalGroup())
+            .as(LogicalTypeAnnotation.listType())
+            .addField(Types.repeatedGroup()
+                .addField(Types.primitive(INT32, REQUIRED).named("element"))
+                .named("list"))
+            .named("listField"))
+        .named("nestedGroup");
+
+    for (int i = 0; i < nestingLevel; i++) {
+      groupSchema = Types.requiredGroup().addField(groupSchema).named("NestedGroup" + i);
+    }
+
+    final MessageType messageSchema =
+        Types.buildMessage().addField(groupSchema).named("MyRecord");
+
+    // Write data
+    final File tmp = File.createTempFile(TestStatisticsFilter.class.getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    final Path file = new Path(tmp.getPath());
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, messageSchema.toString())
+        .build()) {
+
+      for (List<Integer> arrayValue : arrayValues) {
+        final SimpleGroup record = new SimpleGroup(messageSchema);
+
+        Group group = record;
+        for (int i = nestingLevel - 1; i >= 0; i--) {
+          group = group.addGroup("NestedGroup" + i);
+        }
+
+        final Group nestedGroup = group.addGroup("nestedGroup");
+        if (arrayValue != null) {
+          Group listField = nestedGroup.addGroup("listField");
+          for (Integer value : arrayValue) {
+            Group list = listField.addGroup("list");
+            if (value != null) {
+              list.append("element", value);
+            }
+          }
+        }
+
+        writer.write(record);
+      }
+    }
+
+    // Read size statistics back from the footer
+    final ParquetMetadata footer = readFooter(new Configuration(), file, NO_FILTER);
+    assert (footer.getBlocks().size() == 1);
+    final BlockMetaData blockMetaData = footer.getBlocks().get(0);
+    assert (blockMetaData.getColumns().size() == 1);
+    return blockMetaData.getColumns().get(0).getSizeStatistics();
+  }
+
   public static class SevensAndEightsUdp extends UserDefinedPredicate<Integer> {
 
     @Override
diff --git
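For readability, the schema the helper builds at nestingLevel = 0 with arrayGroupRequired = true, in Parquet message-type notation (an approximate rendering of messageSchema.toString()):

    message MyRecord {
      required group nestedGroup {
        required group listField (LIST) {
          repeated group list {
            required int32 element;
          }
        }
      }
    }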
a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java
index 59e4aff2de..3fdcd7dd72 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java
@@ -18,26 +18,38 @@
  */
 package org.apache.parquet.statistics;
 
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.statistics.SizeStatistics;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroup;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -96,6 +108,125 @@ public void testBinaryColumnSizeStatistics() throws IOException {
     }
   }
 
+  private static final List<List<Integer>> REPEATED_RECORD_VALUES = ImmutableList.of(
+      ImmutableList.of(1, 2, 3), ImmutableList.of(1), ImmutableList.of(1, 2), ImmutableList.of());
+
+  private static final MessageType MESSAGE_SCHEMA_REPEATED_FIELD = Types.buildMessage()
+      .addField(Types.requiredGroup()
+          .as(LogicalTypeAnnotation.listType())
+          .addField(Types.repeatedGroup()
+              .addField(Types.primitive(INT32, REQUIRED).named("element"))
+              .named("list"))
+          .named("my_list"))
+      .named("MyRecord");
+
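To make the expected histograms below checkable at a glance: with list sizes {3, 1, 2, 0} there are 7 leaf slots in total. Each of the 4 records contributes one repetition-level-0 slot (the empty list included, as a null slot), and the remaining elements continue an already-started list at level 1. On the definition side, the empty list is the single level-0 slot and the 6 real elements sit at level 1:

    // repetition levels: one level-0 slot per record, (3-1) + (1-1) + (2-1) + 0 = 3 continuations
    List<Long> expectedRepetitionHistogram = ImmutableList.of(4L, 3L);
    // definition levels: 1 empty list at level 0, 6 concrete elements at level 1
    List<Long> expectedDefinitionHistogram = ImmutableList.of(1L, 6L);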
+  @Test
+  public void testSizeStatsWrittenWithExampleWriter() throws Exception {
+    final File tmp = File.createTempFile(TestSizeStatisticsRoundTrip.class.getSimpleName(), ".parquet");
+    tmp.deleteOnExit();
+    tmp.delete();
+    final Path file = new Path(tmp.getPath());
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, MESSAGE_SCHEMA_REPEATED_FIELD.toString())
+        .build()) {
+
+      for (List<Integer> arrayValue : REPEATED_RECORD_VALUES) {
+        final SimpleGroup record = new SimpleGroup(MESSAGE_SCHEMA_REPEATED_FIELD);
+
+        Group listField = record.addGroup("my_list");
+        for (Integer value : arrayValue) {
+          Group list = listField.addGroup("list");
+          if (value != null) {
+            list.append("element", value);
+          }
+        }
+
+        writer.write(record);
+      }
+    }
+
+    final SizeStatistics stats = getSizeStatisticsFromFile(file);
+
+    Assert.assertEquals(ImmutableList.of(1L, 6L), stats.getDefinitionLevelHistogram());
+    Assert.assertEquals(ImmutableList.of(4L, 3L), stats.getRepetitionLevelHistogram());
+  }
+
+  @Test
+  public void testSizeStatsWrittenWithRecordConsumer() throws Exception {
+    final File tmp = File.createTempFile(TestSizeStatisticsRoundTrip.class.getSimpleName(), ".parquet");
+    tmp.deleteOnExit();
+    tmp.delete();
+    final Path file = new Path(tmp.getPath());
+
+    ParquetWriter<List<Integer>> writer = new ParquetWriter<>(file, new WriteSupport<List<Integer>>() {
+      RecordConsumer rc = null;
+
+      @Override
+      public WriteSupport.WriteContext init(Configuration configuration) {
+        return init((ParquetConfiguration) null);
+      }
+
+      @Override
+      public WriteContext init(ParquetConfiguration configuration) {
+        return new WriteContext(MESSAGE_SCHEMA_REPEATED_FIELD, new HashMap<>());
+      }
+
+      @Override
+      public void prepareForWrite(RecordConsumer recordConsumer) {
+        this.rc = recordConsumer;
+      }
+
+      @Override
+      public void write(List<Integer> arrayValue) {
+        rc.startMessage();
+
+        rc.startField("my_list", 0);
+        rc.startGroup();
+        if (arrayValue != null) {
+          for (int i = 0; i < arrayValue.size(); i++) {
+            rc.startField("list", 0);
+            rc.startGroup();
+
+            rc.startField("element", 0);
+            rc.addInteger(arrayValue.get(i));
+            rc.endField("element", 0);
+
+            rc.endGroup();
+            rc.endField("list", 0);
+          }
+        }
+        rc.endGroup();
+        rc.endField("my_list", 0);
+        rc.endMessage();
+      }
+    });
+
+    for (List<Integer> recordArrayValue : REPEATED_RECORD_VALUES) {
+      writer.write(recordArrayValue);
+    }
+
+    writer.close();
+
+    final SizeStatistics stats = getSizeStatisticsFromFile(file);
+
+    // Assert that these records produce the same rep- and def-level histograms as the ExampleParquetWriter test
+
+    // This assertion passes
+    Assert.assertEquals(ImmutableList.of(1L, 6L), stats.getDefinitionLevelHistogram());
+
+    // This assertion currently FAILS: the actual repetition-level histogram is [7, 0], i.e. every value is
+    // recorded at repetition level 0 (possibly related to the "list" field being opened and closed once per
+    // element here, whereas GroupWriter opens the repeated field once around all repetitions)
+    Assert.assertEquals(ImmutableList.of(4L, 3L), stats.getRepetitionLevelHistogram());
+  }
+
+  private static SizeStatistics getSizeStatisticsFromFile(Path file) throws IOException {
+    final ParquetMetadata footer =
+        ParquetFileReader.readFooter(new Configuration(), file, ParquetMetadataConverter.NO_FILTER);
+    assert (footer.getBlocks().size() == 1);
+    final BlockMetaData blockMetaData = footer.getBlocks().get(0);
+    assert (blockMetaData.getColumns().size() == 1);
+    return blockMetaData.getColumns().get(0).getSizeStatistics();
+  }
+
   private Path newTempPath() throws IOException {
     File file = temp.newFile();
     Preconditions.checkArgument(file.delete(), "Could not remove temp file");
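A possible lead for the failing histogram above: the anonymous WriteSupport opens and closes the "list" field once per element, whereas GroupWriter (used by ExampleParquetWriter) opens the repeated field once and emits all repetitions inside it. A sketch of that conventional sequence, assuming the same rc and arrayValue, offered as a hypothesis to test rather than a verified fix:

    rc.startMessage();
    rc.startField("my_list", 0);
    rc.startGroup();
    if (arrayValue != null && !arrayValue.isEmpty()) {
      rc.startField("list", 0); // opened once for all repetitions
      for (Integer v : arrayValue) {
        rc.startGroup();
        rc.startField("element", 0);
        rc.addInteger(v);
        rc.endField("element", 0);
        rc.endGroup();
      }
      rc.endField("list", 0); // closed once, after the last repetition
    }
    rc.endGroup();
    rc.endField("my_list", 0);
    rc.endMessage();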