Skip to content

Commit 4bd1fb8

Browse files
authored
Flink: DynamicSink: Report writer records/bytes send metrics (#14878)
1 parent 3048d77 commit 4bd1fb8

File tree

3 files changed

+30
-4
lines changed

3 files changed

+30
-4
lines changed

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ public void write(DynamicRecordInternal element, Context context)
145145
return taskWriterFactory.create();
146146
})
147147
.write(element.rowData());
148+
metrics.mainMetricsGroup().getNumRecordsSendCounter().inc();
148149
}
149150

150151
@Override

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,37 @@
1818
*/
1919
package org.apache.iceberg.flink.sink.dynamic;
2020

21+
import java.util.Arrays;
2122
import java.util.Map;
22-
import org.apache.flink.metrics.MetricGroup;
23+
import java.util.function.ToLongFunction;
24+
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
25+
import org.apache.iceberg.ContentFile;
26+
import org.apache.iceberg.DataFile;
27+
import org.apache.iceberg.DeleteFile;
2328
import org.apache.iceberg.flink.sink.IcebergStreamWriterMetrics;
2429
import org.apache.iceberg.io.WriteResult;
2530
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
31+
import org.apache.iceberg.util.ScanTaskUtil;
2632

2733
class DynamicWriterMetrics {
2834

2935
private final Map<String, IcebergStreamWriterMetrics> metrics;
30-
private final MetricGroup mainMetricsGroup;
36+
private final SinkWriterMetricGroup mainMetricsGroup;
3137

32-
DynamicWriterMetrics(MetricGroup mainMetricsGroup) {
38+
DynamicWriterMetrics(SinkWriterMetricGroup mainMetricsGroup) {
3339
this.mainMetricsGroup = mainMetricsGroup;
3440
this.metrics = Maps.newHashMap();
3541
}
3642

43+
SinkWriterMetricGroup mainMetricsGroup() {
44+
return this.mainMetricsGroup;
45+
}
46+
3747
public void updateFlushResult(String fullTableName, WriteResult result) {
3848
writerMetrics(fullTableName).updateFlushResult(result);
49+
50+
long bytesOutTotal = sum(result.dataFiles()) + sum(result.deleteFiles());
51+
this.mainMetricsGroup.getNumBytesSendCounter().inc(bytesOutTotal);
3952
}
4053

4154
public void flushDuration(String fullTableName, long flushDurationMs) {
@@ -46,4 +59,16 @@ IcebergStreamWriterMetrics writerMetrics(String fullTableName) {
4659
return metrics.computeIfAbsent(
4760
fullTableName, tableName -> new IcebergStreamWriterMetrics(mainMetricsGroup, tableName));
4861
}
62+
63+
private static long sum(DataFile[] files) {
64+
return sum(files, DataFile::fileSizeInBytes);
65+
}
66+
67+
private static long sum(DeleteFile[] files) {
68+
return sum(files, ScanTaskUtil::contentSizeInBytes);
69+
}
70+
71+
private static <T extends ContentFile<T>> long sum(T[] files, ToLongFunction<T> sizeExtractor) {
72+
return Arrays.stream(files).mapToLong(sizeExtractor).sum();
73+
}
4974
}

flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ void testUniqueFileSuffixOnFactoryRecreation() throws Exception {
243243
1024L,
244244
properties,
245245
100,
246-
new DynamicWriterMetrics(new UnregisteredMetricsGroup()),
246+
new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
247247
0,
248248
0);
249249
return dynamicWriter;

0 commit comments

Comments
 (0)