diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md b/docs/content/primary-key-table/merge-engine/aggregation.md
index ed845eebfbc5..fd8414ca1d43 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -376,6 +376,11 @@ public static class BitmapContainsUDF extends ScalarFunction {
 
 {{< /tabs >}}
 
+### nested_partial_update
+  The nested_partial_update function collects multiple rows into one array (so-called 'nested table'). It supports
+  `ARRAY<ROW>` data types. You need to use `fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary keys of the
+  nested table. Rows that share the same nested key are merged by partial update: a non-null incoming column value overwrites the existing one, a null value keeps it.
+
 ### collect
   The collect function collects elements into an Array. You can set `fields.<field-name>.distinct=true` to deduplicate elements.
   It only supports ARRAY type.
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedPartialUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedPartialUpdateAgg.java
new file mode 100644
index 000000000000..d8e058e83972
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedPartialUpdateAgg.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate;
+
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.codegen.CodeGenUtils.newProjection;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Aggregator that partially updates a field representing a nested table. The data type of the
+ * nested table field is {@code ARRAY<ROW>}.
+ *
+ * <p>Rows sharing the same nested key are merged into one row: a non-null column value of a later
+ * row overwrites the value merged so far, while a null column value leaves it untouched.
+ */
+public class FieldNestedPartialUpdateAgg extends FieldAggregator {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int nestedFields;
+    private final Projection keyProjection;
+    private final FieldGetter[] fieldGetters;
+
+    /**
+     * Creates the aggregator.
+     *
+     * @param name name handed to {@link FieldAggregator}
+     * @param dataType type of the aggregated field; its element type must be a {@link RowType}
+     * @param nestedKey names of the nested row fields that form the nested key; must not be empty
+     */
+    public FieldNestedPartialUpdateAgg(String name, ArrayType dataType, List<String> nestedKey) {
+        super(name, dataType);
+        checkArgument(!nestedKey.isEmpty(), "Nested key must not be empty.");
+        RowType nestedType = (RowType) dataType.getElementType();
+        this.nestedFields = nestedType.getFieldCount();
+        this.keyProjection = newProjection(nestedType, nestedKey);
+        this.fieldGetters = new FieldGetter[nestedFields];
+        for (int i = 0; i < nestedFields; i++) {
+            fieldGetters[i] = InternalRow.createFieldGetter(nestedType.getTypeAt(i), i);
+        }
+    }
+
+    /**
+     * Merges the rows of {@code inputField} into the rows of {@code accumulator}.
+     *
+     * @param accumulator array of rows collected so far, may be null
+     * @param inputField array of incoming rows, may be null
+     * @return the other argument unchanged if one of them is null; otherwise a new array holding
+     *     one merged row per distinct nested key, ordered by first occurrence of the key
+     */
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
+
+        InternalArray acc = (InternalArray) accumulator;
+        InternalArray input = (InternalArray) inputField;
+
+        List<InternalRow> rows = new ArrayList<>(acc.size() + input.size());
+        addNonNullRows(acc, rows);
+        addNonNullRows(input, rows);
+
+        // LinkedHashMap keeps the merged rows in first-seen key order, so the result is stable.
+        Map<BinaryRow, GenericRow> merged = new LinkedHashMap<>();
+        for (InternalRow row : rows) {
+            // Copy the key; presumably the projection reuses its output row between calls.
+            BinaryRow key = keyProjection.apply(row).copy();
+            GenericRow toUpdate = merged.computeIfAbsent(key, k -> new GenericRow(nestedFields));
+            partialUpdate(toUpdate, row);
+        }
+
+        return new GenericArray(merged.values().toArray());
+    }
+
+    /** Appends every non-null element of {@code array} to {@code rows}, preserving order. */
+    private void addNonNullRows(InternalArray array, List<InternalRow> rows) {
+        for (int i = 0; i < array.size(); i++) {
+            if (!array.isNullAt(i)) {
+                rows.add(array.getRow(i, nestedFields));
+            }
+        }
+    }
+
+    /** Copies every non-null field of {@code input} into {@code toUpdate}; nulls are skipped. */
+    private void partialUpdate(GenericRow toUpdate, InternalRow input) {
+        for (int i = 0; i < fieldGetters.length; i++) {
+            Object field = fieldGetters[i].getFieldOrNull(input);
+            if (field != null) {
+                toUpdate.setField(i, field);
+            }
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedPartialUpdateAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedPartialUpdateAggFactory.java
new file mode 100644
index 000000000000..80b899a8b2bc
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedPartialUpdateAggFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate.factory;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.mergetree.compact.aggregate.FieldNestedPartialUpdateAgg;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Factory for {@link FieldNestedPartialUpdateAgg}. */
+public class FieldNestedPartialUpdateAggFactory implements FieldAggregatorFactory {
+
+    /** Identifier of the aggregate function created by this factory. */
+    public static final String NAME = "nested_partial_update";
+
+    /**
+     * Creates the aggregator for {@code field}, reading its nested key from {@code options}.
+     *
+     * @param fieldType type of the aggregated field; must be an array of rows
+     * @param options options from which the nested key of the field is read
+     * @param field name of the aggregated field
+     * @return the aggregator for the field
+     */
+    @Override
+    public FieldNestedPartialUpdateAgg create(
+            DataType fieldType, CoreOptions options, String field) {
+        return createFieldNestedPartialUpdateAgg(
+                fieldType, options.fieldNestedUpdateAggNestedKey(field));
+    }
+
+    @Override
+    public String identifier() {
+        return NAME;
+    }
+
+    /** Validates the nested key and the field type, then builds the aggregator. */
+    private FieldNestedPartialUpdateAgg createFieldNestedPartialUpdateAgg(
+            DataType fieldType, List<String> nestedKey) {
+        checkArgument(
+                !nestedKey.isEmpty(),
+                "Aggregate function '%s' requires a non-empty nested key.",
+                NAME);
+        String typeErrorMsg =
+                "Data type for nested table column must be 'Array<Row>' but was '%s'.";
+        checkArgument(fieldType instanceof ArrayType, typeErrorMsg, fieldType);
+        ArrayType arrayType = (ArrayType) fieldType;
+        checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType);
+        return new FieldNestedPartialUpdateAgg(identifier(), arrayType, nestedKey);
+    }
+}
diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 7f221e517248..3bfb3480de1b 100644
--- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -30,6 +30,7 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory
+org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 0b838c360929..8541954be827 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -40,6 +40,7 @@
 import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory;
 import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory;
 import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory;
 import org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory;
 import org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory;
 import org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory;
@@ -626,7 +627,7 @@ private GenericArray singletonArray(InternalRow row) {
         return new GenericArray(new InternalRow[] {row});
     }
 
-    private InternalRow row(int k0, int k1, String v) {
+    private InternalRow row(Integer k0, Integer k1, String v) {
         return GenericRow.of(k0, k1, BinaryString.fromString(v));
     }
 
@@ -1154,6 +1155,46 @@ public void testCustomAgg() throws IOException {
         assertThat(agg).isEqualTo("test");
     }
 
+    @Test
+    public void testFieldNestedPartialUpdateAgg() {
+        DataType elementRowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "k", DataTypes.INT()),
+                        DataTypes.FIELD(1, "v1", DataTypes.INT()),
+                        DataTypes.FIELD(2, "v2", DataTypes.STRING()));
+        FieldNestedPartialUpdateAgg agg =
+                new FieldNestedPartialUpdateAgg(
+                        FieldNestedPartialUpdateAggFactory.NAME,
+                        DataTypes.ARRAY(elementRowType),
+                        Collections.singletonList("k"));
+        InternalArray.ElementGetter elementGetter =
+                InternalArray.createElementGetter(elementRowType);
+
+        // No accumulator yet: the first input array is returned as is.
+        InternalRow current = row(0, 0, null);
+        InternalArray accumulator = (InternalArray) agg.agg(null, singletonArray(current));
+        assertThat(unnest(accumulator, elementGetter))
+                .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current));
+
+        // Same key: the null v1 keeps the old value, the non-null v2 fills the gap.
+        current = row(0, null, "A");
+        accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current));
+        assertThat(unnest(accumulator, elementGetter))
+                .containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 0, "A")));
+
+        // Same key: non-null values overwrite the previous ones.
+        current = row(0, 1, "B");
+        accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current));
+        assertThat(unnest(accumulator, elementGetter))
+                .containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B")));
+
+        // New key: the row is appended after the existing one (first-seen key order).
+        current = row(1, 2, "C");
+        accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current));
+        assertThat(unnest(accumulator, elementGetter))
+                .containsExactlyElementsOf(Arrays.asList(row(0, 1, "B"), row(1, 2, "C")));
+    }
+
     private Map toMap(Object... kvs) {
         Map result = new HashMap<>();
         for (int i = 0; i < kvs.length; i += 2) {