Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/content/primary-key-table/merge-engine/aggregation.md
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,11 @@ public static class BitmapContainsUDF extends ScalarFunction {

{{< /tabs >}}

### nested_partial_update
The nested_partial_update function collects multiple rows into one array<row> (so-called 'nested table'). It supports
ARRAY<ROW> data types. You need to use `fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary keys of the
nested table. The values in each row are written by partial updating some columns.

### collect
The collect function collects elements into an Array. You can set `fields.<field-name>.distinct=true` to deduplicate elements.
It only supports ARRAY type.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.RowType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.codegen.CodeGenUtils.newProjection;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* Used to partial update a field which representing a nested table. The data type of nested table
* field is {@code ARRAY<ROW>}.
*/
public class FieldNestedPartialUpdateAgg extends FieldAggregator {

private static final long serialVersionUID = 1L;

private final int nestedFields;
private final Projection keyProjection;
private final FieldGetter[] fieldGetters;

public FieldNestedPartialUpdateAgg(String name, ArrayType dataType, List<String> nestedKey) {
super(name, dataType);
RowType nestedType = (RowType) dataType.getElementType();
this.nestedFields = nestedType.getFieldCount();
checkArgument(!nestedKey.isEmpty());
this.keyProjection = newProjection(nestedType, nestedKey);
this.fieldGetters = new FieldGetter[nestedFields];
for (int i = 0; i < nestedFields; i++) {
fieldGetters[i] = InternalRow.createFieldGetter(nestedType.getTypeAt(i), i);
}
}

@Override
public Object agg(Object accumulator, Object inputField) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a null check for the key?

if (accumulator == null || inputField == null) {
return accumulator == null ? inputField : accumulator;
}

InternalArray acc = (InternalArray) accumulator;
InternalArray input = (InternalArray) inputField;

List<InternalRow> rows = new ArrayList<>(acc.size() + input.size());
addNonNullRows(acc, rows);
addNonNullRows(input, rows);

if (keyProjection != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when keyProjection is null ?

Map<BinaryRow, GenericRow> map = new HashMap<>();
for (InternalRow row : rows) {
BinaryRow key = keyProjection.apply(row).copy();
GenericRow toUpdate = map.computeIfAbsent(key, k -> new GenericRow(nestedFields));
partialUpdate(toUpdate, row);
}

rows = new ArrayList<>(map.values());
}

return new GenericArray(rows.toArray());
}

private void addNonNullRows(InternalArray array, List<InternalRow> rows) {
for (int i = 0; i < array.size(); i++) {
if (array.isNullAt(i)) {
continue;
}
rows.add(array.getRow(i, nestedFields));
}
}

private void partialUpdate(GenericRow toUpdate, InternalRow input) {
for (int i = 0; i < fieldGetters.length; i++) {
FieldGetter fieldGetter = fieldGetters[i];
Object field = fieldGetter.getFieldOrNull(input);
if (field != null) {
toUpdate.setField(i, field);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact.aggregate.factory;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.mergetree.compact.aggregate.FieldNestedPartialUpdateAgg;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

import java.util.List;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Factory for #{@link FieldNestedPartialUpdateAgg}. */
public class FieldNestedPartialUpdateAggFactory implements FieldAggregatorFactory {

public static final String NAME = "nested_partial_update";

@Override
public FieldNestedPartialUpdateAgg create(
DataType fieldType, CoreOptions options, String field) {
return createFieldNestedPartialUpdateAgg(
fieldType, options.fieldNestedUpdateAggNestedKey(field));
}

@Override
public String identifier() {
return NAME;
}

private FieldNestedPartialUpdateAgg createFieldNestedPartialUpdateAgg(
DataType fieldType, List<String> nestedKey) {
checkArgument(!nestedKey.isEmpty());
String typeErrorMsg =
"Data type for nested table column must be 'Array<Row>' but was '%s'.";
checkArgument(fieldType instanceof ArrayType, typeErrorMsg, fieldType);
ArrayType arrayType = (ArrayType) fieldType;
checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType);
return new FieldNestedPartialUpdateAgg(identifier(), arrayType, nestedKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory;
Expand Down Expand Up @@ -626,7 +627,7 @@ private GenericArray singletonArray(InternalRow row) {
return new GenericArray(new InternalRow[] {row});
}

private InternalRow row(int k0, int k1, String v) {
private InternalRow row(Integer k0, Integer k1, String v) {
return GenericRow.of(k0, k1, BinaryString.fromString(v));
}

Expand Down Expand Up @@ -1154,6 +1155,48 @@ public void testCustomAgg() throws IOException {
assertThat(agg).isEqualTo("test");
}

@Test
public void testFieldNestedPartialUpdateAgg() {
DataType elementRowType =
DataTypes.ROW(
DataTypes.FIELD(0, "k", DataTypes.INT()),
DataTypes.FIELD(1, "v1", DataTypes.INT()),
DataTypes.FIELD(2, "v2", DataTypes.STRING()));
FieldNestedPartialUpdateAgg agg =
new FieldNestedPartialUpdateAgg(
FieldNestedPartialUpdateAggFactory.NAME,
DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.FIELD(0, "k", DataTypes.INT()),
DataTypes.FIELD(1, "v1", DataTypes.INT()),
DataTypes.FIELD(2, "v2", DataTypes.STRING()))),
Collections.singletonList("k"));

InternalArray accumulator;
InternalArray.ElementGetter elementGetter =
InternalArray.createElementGetter(elementRowType);

InternalRow current = row(0, 0, null);
accumulator = (InternalArray) agg.agg(null, singletonArray(current));
assertThat(unnest(accumulator, elementGetter))
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(current));

current = row(0, null, "A");
accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current));
assertThat(unnest(accumulator, elementGetter))
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 0, "A")));

current = row(0, 1, "B");
accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current));
assertThat(unnest(accumulator, elementGetter))
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B")));

current = row(1, 2, "C");
accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current));
assertThat(unnest(accumulator, elementGetter))
.containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, "B"), row(1, 2, "C")));
}

private Map<Object, Object> toMap(Object... kvs) {
Map<Object, Object> result = new HashMap<>();
for (int i = 0; i < kvs.length; i += 2) {
Expand Down