Skip to content

Commit bdeba3f

Browse files
committed
PARQUET-34: Implement page-level Size filter
1 parent 2e8ba22 commit bdeba3f

File tree

4 files changed

+237
-21
lines changed

4 files changed

+237
-21
lines changed

parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,69 @@ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Contains<T> conta
381381

382382
@Override
383383
public PrimitiveIterator.OfInt visit(Size size) {
384-
return IndexIterator.all(getPageCount());
384+
if (repLevelHistogram == null || defLevelHistogram == null) {
385+
return IndexIterator.all(getPageCount());
386+
}
387+
388+
final int[] repLevelOffsets = calculateOffsetsForHistogram(repLevelHistogram, nullPages);
389+
final int[] defLevelOffsets = calculateOffsetsForHistogram(defLevelHistogram, nullPages);
390+
391+
return IndexIterator.filter(getPageCount(), pageIndex -> {
392+
final boolean isFinalPage = pageIndex + 1 == nullPages.length;
393+
final List<Long> pageRepLevelHistogram = getRepetitionLevelHistogram()
394+
.subList(
395+
repLevelOffsets[pageIndex],
396+
isFinalPage ? repLevelHistogram.length : repLevelOffsets[pageIndex + 1]);
397+
final List<Long> pageDefLevelHistogram = getDefinitionLevelHistogram()
398+
.subList(
399+
defLevelOffsets[pageIndex],
400+
isFinalPage ? defLevelHistogram.length : defLevelOffsets[pageIndex + 1]);
401+
402+
if (pageRepLevelHistogram.isEmpty() || pageDefLevelHistogram.isEmpty()) {
403+
// Page might match; cannot be filtered out
404+
return true;
405+
}
406+
407+
final int defLevelCount = pageDefLevelHistogram.size();
408+
409+
// If all values have repetition level 0, then no array has more than 1 element
410+
if (pageRepLevelHistogram.size() == 1
411+
|| (pageRepLevelHistogram.get(0) > 0
412+
&& pageRepLevelHistogram.subList(1, pageRepLevelHistogram.size()).stream()
413+
.allMatch(l -> l == 0))) {
414+
415+
if (
416+
// all lists are null or empty
417+
(pageDefLevelHistogram.subList(1, defLevelCount).stream().allMatch(l -> l == 0))) {
418+
return size.filter(
419+
(eq) -> eq <= 0, (lt) -> true, (lte) -> true, (gt) -> gt < 0, (gte) -> gte <= 0);
420+
}
421+
422+
final int maxDefinitionLevel = defLevelCount - 1;
423+
424+
// If all repetition levels are zero and all definition levels are > MAX_DEFINITION_LEVEL - 1, all
425+
// lists are of size 1
426+
if (pageDefLevelHistogram.subList(0, maxDefinitionLevel - 1).stream()
427+
.allMatch(l -> l == 0)) {
428+
return size.filter(
429+
(eq) -> eq == 1, (lt) -> lt > 1, (lte) -> lte >= 1, (gt) -> gt < 1, (gte) -> gte <= 1);
430+
}
431+
}
432+
433+
final long nonNullElementCount =
434+
pageRepLevelHistogram.stream().mapToLong(l -> l).sum() - pageDefLevelHistogram.get(0);
435+
final long numNonNullRecords = pageRepLevelHistogram.get(0) - pageDefLevelHistogram.get(0);
436+
437+
// Given the total number of elements and non-null fields, we can compute the max size of any array
438+
// field
439+
final long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
440+
return size.filter(
441+
(eq) -> eq <= maxArrayElementCount,
442+
(lt) -> true,
443+
(lte) -> true,
444+
(gt) -> gt < maxArrayElementCount,
445+
(gte) -> gte <= maxArrayElementCount);
446+
});
385447
}
386448

387449
@Override
@@ -444,6 +506,23 @@ public boolean test(int pageIndex) {
444506
}
445507
});
446508
}
509+
510+
// Calculates each page's starting offset in a concatenated histogram
511+
private static int[] calculateOffsetsForHistogram(long[] histogram, boolean[] nullPages) {
512+
final int numNullPages =
513+
(int) BooleanList.of(nullPages).stream().filter(p -> p).count();
514+
final int numNonNullPages = nullPages.length - numNullPages;
515+
final int numLevelsPerNonNullPage = (histogram.length - numNullPages) / numNonNullPages;
516+
517+
int[] offsets = new int[nullPages.length];
518+
int currOffset = 0;
519+
for (int i = 0; i < nullPages.length; ++i) {
520+
offsets[i] = currOffset;
521+
currOffset += (nullPages[i] ? 1 : numLevelsPerNonNullPage);
522+
}
523+
524+
return offsets;
525+
}
447526
}
448527

449528
private static final ColumnIndexBuilder NO_OP_BUILDER = new ColumnIndexBuilder() {

parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
3737
import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
3838
import static org.apache.parquet.filter2.predicate.FilterApi.or;
39+
import static org.apache.parquet.filter2.predicate.FilterApi.size;
3940
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
4041
import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
4142
import static org.apache.parquet.schema.OriginalType.DECIMAL;
@@ -56,6 +57,7 @@
5657
import static org.junit.Assert.assertTrue;
5758
import static org.junit.Assert.fail;
5859

60+
import com.google.common.collect.ImmutableList;
5961
import java.math.BigDecimal;
6062
import java.nio.ByteBuffer;
6163
import java.util.ArrayList;
@@ -64,9 +66,11 @@
6466
import java.util.List;
6567
import java.util.Set;
6668
import org.apache.parquet.bytes.BytesUtils;
69+
import org.apache.parquet.column.statistics.SizeStatistics;
6770
import org.apache.parquet.column.statistics.Statistics;
6871
import org.apache.parquet.filter2.predicate.ContainsRewriter;
6972
import org.apache.parquet.filter2.predicate.FilterPredicate;
73+
import org.apache.parquet.filter2.predicate.Operators;
7074
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
7175
import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
7276
import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
@@ -1627,6 +1631,99 @@ public void testNoOpBuilder() {
16271631
assertNull(builder.build());
16281632
}
16291633

1634+
@Test
1635+
public void testSizeRequiredElements() {
1636+
final PrimitiveType type = Types.required(DOUBLE).named("element");
1637+
final DoubleColumn col = doubleColumn(type.getName());
1638+
1639+
final List<List<Double>> pageValueList = new ArrayList<>();
1640+
pageValueList.add(ImmutableList.of(1.0, 2.0, 3.0));
1641+
pageValueList.add(ImmutableList.of(1.0, 2.0, 3.0, 4.0, 5.0));
1642+
pageValueList.add(ImmutableList.of(-1.0));
1643+
pageValueList.add(ImmutableList.of());
1644+
pageValueList.add(null);
1645+
1646+
final ColumnIndex columnIndex = createArrayColumnIndex(type, pageValueList);
1647+
1648+
assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
1649+
assertCorrectNullCounts(columnIndex, 0, 0, 0, 0, 0);
1650+
assertCorrectNullPages(columnIndex, false, false, false, true, true);
1651+
assertCorrectValues(columnIndex.getMaxValues(), 3.0, 5.0, -1.0, null, null);
1652+
assertCorrectValues(columnIndex.getMinValues(), 1.0, 1.0, -1.0, null, null);
1653+
1654+
// we know max array size is 5; all elements of page 2 have size 1; and page 3 and 4 are null or empty
1655+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 0), 0, 1, 3, 4);
1656+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 4), 1);
1657+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 3), 0, 1);
1658+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LT, 2), 0, 1, 2, 3, 4);
1659+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LTE, 1), 0, 1, 2, 3, 4);
1660+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GT, 0), 0, 1, 2);
1661+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GTE, 0), 0, 1, 2, 3, 4);
1662+
}
1663+
1664+
@Test
1665+
public void testSizeOptionalElements() {
1666+
final PrimitiveType type = Types.optional(DOUBLE).named("element");
1667+
final DoubleColumn col = doubleColumn(type.getName());
1668+
1669+
final List<Double> listWithNulls = new ArrayList<>();
1670+
listWithNulls.add(null);
1671+
listWithNulls.add(3.0);
1672+
listWithNulls.add(null);
1673+
1674+
final List<List<Double>> pageValueList = new ArrayList<>();
1675+
pageValueList.add(listWithNulls);
1676+
1677+
final ColumnIndex columnIndex = createArrayColumnIndex(type, pageValueList);
1678+
1679+
assertCorrectNullCounts(columnIndex, 2);
1680+
assertCorrectNullPages(columnIndex, false);
1681+
assertCorrectValues(columnIndex.getMaxValues(), 3.0);
1682+
assertCorrectValues(columnIndex.getMinValues(), 3.0);
1683+
1684+
// We know that the array values for the page have min size 0 and max size 3
1685+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 0), 0);
1686+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.EQ, 5));
1687+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LT, 4), 0);
1688+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.LTE, 3), 0);
1689+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GT, 0), 0);
1690+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GT, 3));
1691+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GTE, 3), 0);
1692+
assertCorrectFiltering(columnIndex, size(col, Operators.Size.Operator.GTE, 4));
1693+
}
1694+
1695+
private static ColumnIndex createArrayColumnIndex(PrimitiveType type, List<List<Double>> pageValueList) {
1696+
final ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
1697+
1698+
for (List<Double> pageValues : pageValueList) {
1699+
final StatsBuilder sb = new StatsBuilder();
1700+
boolean isNullOrEmpty = pageValues == null || pageValues.isEmpty();
1701+
1702+
final SizeStatistics.Builder sizeStatistics =
1703+
SizeStatistics.newBuilder(type, isNullOrEmpty ? 0 : 1, isNullOrEmpty ? 0 : 1);
1704+
1705+
if (isNullOrEmpty) sizeStatistics.add(0, 0);
1706+
1707+
if (pageValues != null) {
1708+
for (int i = 0; i < pageValues.size(); i++) {
1709+
if (i == 0) {
1710+
sizeStatistics.add(0, 1);
1711+
} else {
1712+
sizeStatistics.add(1, 1);
1713+
}
1714+
}
1715+
}
1716+
1717+
if (pageValues == null) {
1718+
builder.add(sb.stats(type), sizeStatistics.build());
1719+
} else {
1720+
builder.add(sb.stats(type, pageValues.toArray(new Double[0])), sizeStatistics.build());
1721+
}
1722+
}
1723+
1724+
return builder.build();
1725+
}
1726+
16301727
private static List<ByteBuffer> toBBList(Binary... values) {
16311728
List<ByteBuffer> buffers = new ArrayList<>(values.length);
16321729
for (Binary value : values) {

parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@ public <T extends Comparable<T>> Boolean visit(Contains<T> contains) {
219219
return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH);
220220
}
221221

222+
/**
223+
* Applies the same logic as the page-level {@code Size} filter in
224+
* {@link org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder}, but at block granularity.
225+
*/
222226
@Override
223227
public Boolean visit(Size size) {
224228
final ColumnChunkMetaData metadata = getColumnChunk(size.getColumn().getColumnPath());
@@ -239,40 +243,37 @@ public Boolean visit(Size size) {
239243

240244
// If all values have repetition level 0, then no array has more than 1 element
241245
if (repetitionLevelHistogram.size() == 1
242-
|| repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream()
243-
.allMatch(l -> l == 0)) {
244-
245-
// Null list fields are treated as having size 0
246-
if (( // all lists are nulls
247-
definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream()
248-
.allMatch(l -> l == 0))
249-
|| // all lists are size 0
250-
(definitionLevelHistogram.get(0) == 0
251-
&& definitionLevelHistogram.subList(2, definitionLevelHistogram.size()).stream()
252-
.allMatch(l -> l == 0))) {
246+
|| (repetitionLevelHistogram.get(0) > 0
247+
&& repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream()
248+
.allMatch(l -> l == 0))) {
249+
250+
// All lists are null or empty
251+
if ((definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream()
252+
.allMatch(l -> l == 0))) {
253253

254254
final boolean blockCannotMatch =
255255
size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0);
256256
return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
257257
}
258258

259-
long maxDefinitionLevel = definitionLevelHistogram.get(definitionLevelHistogram.size() - 1);
259+
final int maxDefinitionLevel = definitionLevelHistogram.size() - 1;
260260

261-
// If all repetition levels are zero and all definitions level are > MAX_DEFINITION_LEVEL - 1, all lists
261+
// If all repetition levels are zero and all definition levels are > MAX_DEFINITION_LEVEL - 1, all lists
262262
// are of size 1
263-
if (definitionLevelHistogram.stream().allMatch(l -> l > maxDefinitionLevel - 1)) {
263+
if (definitionLevelHistogram.subList(0, maxDefinitionLevel - 1).stream()
264+
.allMatch(l -> l == 0)) {
264265
final boolean blockCannotMatch = size.filter(
265266
(eq) -> eq != 1, (lt) -> lt <= 1, (lte) -> lte < 1, (gt) -> gt >= 1, (gte) -> gte > 1);
266267

267268
return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
268269
}
269270
}
270-
long nonNullElementCount =
271+
final long nonNullElementCount =
271272
repetitionLevelHistogram.stream().mapToLong(l -> l).sum() - definitionLevelHistogram.get(0);
272-
long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0);
273+
final long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0);
273274

274275
// Given the total number of elements and non-null fields, we can compute the max size of any array field
275-
long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
276+
final long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
276277
final boolean blockCannotMatch = size.filter(
277278
(eq) -> eq > maxArrayElementCount,
278279
(lt) -> false,

parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,23 @@ public void testSizeFilterRequiredGroupRequiredElements() throws Exception {
513513
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
514514
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta));
515515
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta));
516+
517+
// Case 3: all lists have size 1
518+
columnMeta = Collections.singletonList(getIntColumnMeta(
519+
nestedListColumn.getColumnPath(),
520+
minMaxStats,
521+
createSizeStatisticsForRepeatedField(true, ImmutableList.of(ImmutableList.of(1), ImmutableList.of(1))),
522+
2));
523+
524+
// We know that records have max array size 1
525+
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 2), columnMeta));
526+
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 1), columnMeta));
527+
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 2), columnMeta));
528+
529+
// These predicates should not be able to filter out the page
530+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 2), columnMeta));
531+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
532+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 1), columnMeta));
516533
}
517534

518535
@Test
@@ -563,25 +580,47 @@ public void testSizeFilterOptionalGroup() throws Exception {
563580
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta));
564581

565582
// These predicates should not be able to filter out the page
583+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 3), columnMeta));
584+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 2), columnMeta));
566585
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta));
567586
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta));
568587
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta));
569588

570-
// Case 2: List is null
589+
// Case 2: Lists are null
571590
final List<List<Integer>> listWithNull = new ArrayList<>();
572591
listWithNull.add(null);
592+
listWithNull.add(null);
573593
columnMeta = Collections.singletonList(getIntColumnMeta(
574594
nestedListColumn.getColumnPath(),
575595
minMaxStats,
576596
createSizeStatisticsForRepeatedField(true, listWithNull),
577-
5));
597+
2));
598+
599+
// These predicates should be able to filter out the page
600+
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta));
601+
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta));
602+
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta));
603+
604+
// These predicates should not be able to filter out the page
605+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
606+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta));
607+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta));
608+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 0), columnMeta));
609+
610+
// Case 3: lists are empty
611+
columnMeta = Collections.singletonList(getIntColumnMeta(
612+
nestedListColumn.getColumnPath(),
613+
minMaxStats,
614+
createSizeStatisticsForRepeatedField(true, ImmutableList.of(ImmutableList.of(), ImmutableList.of())),
615+
2));
578616

579617
// These predicates should be able to filter out the page
580618
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta));
581619
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta));
582620
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta));
583621

584622
// These predicates should not be able to filter out the page
623+
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 0), columnMeta));
585624
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
586625
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta));
587626
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta));
@@ -615,7 +654,6 @@ private static SizeStatistics createSizeStatisticsForRepeatedField(
615654
for (List<Integer> arrayValue : arrayValues) {
616655
final SimpleGroup record = new SimpleGroup(messageSchema);
617656
final Group nestedGroup = record.addGroup("nestedGroup");
618-
619657
if (arrayValue != null) {
620658
Group listField = nestedGroup.addGroup("listField");
621659
for (Integer value : arrayValue) {
@@ -635,6 +673,7 @@ private static SizeStatistics createSizeStatisticsForRepeatedField(
635673
assert (footer.getBlocks().size() == 1);
636674
final BlockMetaData blockMetaData = footer.getBlocks().get(0);
637675
assert (blockMetaData.getColumns().size() == 1);
676+
System.out.println("Stats= " + blockMetaData.getColumns().get(0).getSizeStatistics());
638677
return blockMetaData.getColumns().get(0).getSizeStatistics();
639678
}
640679

0 commit comments

Comments
 (0)