From 0141d586361d159dd775bbc7c78165a1e98f6666 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 4 Nov 2025 15:16:06 -0500 Subject: [PATCH 1/2] Fix a distribution metric problem when count is zero. --- .../org/apache/beam/model/pipeline/v1/metrics.proto | 10 +++++++++- .../beam/runners/prism/internal/jobservices/metrics.go | 6 ++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index d5951c23c10e..a43b54da24ef 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -457,7 +457,7 @@ message MonitoringInfo { SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }]; SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }]; SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }]; - // Label which if has a "true" value indicates that the metric is intended + // Label which if has a "true" value indicates that the metric is intended // to be aggregated per-worker. PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }]; } @@ -517,6 +517,10 @@ message MonitoringInfoTypeUrns { // - sum: beam:coder:varint:v1 // - min: beam:coder:varint:v1 // - max: beam:coder:varint:v1 + // + // Note that when count is zero, the SDK may or may not send sum, min, and + // max in the response. If those fields are included in the payload, runners + // should ignore them. DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metrics:distribution_int64:v1"]; @@ -531,6 +535,10 @@ message MonitoringInfoTypeUrns { // - sum: beam:coder:double:v1 // - min: beam:coder:double:v1 // - max: beam:coder:double:v1 + // + // Note that when count is zero, the SDK may or may not send sum, min, and + // max in the response. If those fields are included in the payload, runners + // should ignore them. DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metrics:distribution_double:v1"]; diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go index bbbdfd1eba4f..12d935815461 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go @@ -326,6 +326,12 @@ func (m *distributionInt64) accumulate(pyld []byte) error { if dist.Count, err = coder.DecodeVarInt(buf); err != nil { return err } + if dist.Count == 0 { + // When there is no elements reported, the payload may contain the values + // for count, sum, min and max, or it may contain only one 0x00 byte for + // count. No matter what, we will skip aggregation in this case. + return nil + } if dist.Sum, err = coder.DecodeVarInt(buf); err != nil { return err } From 4e6f2c4e9433a7820e3e3213e70e47647ad5efa5 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 4 Nov 2025 16:00:36 -0500 Subject: [PATCH 2/2] Revise the notes. --- .../org/apache/beam/model/pipeline/v1/metrics.proto | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index a43b54da24ef..fcce35394b91 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -518,9 +518,9 @@ message MonitoringInfoTypeUrns { // - min: beam:coder:varint:v1 // - max: beam:coder:varint:v1 // - // Note that when count is zero, the SDK may or may not send sum, min, and - // max in the response. If those fields are included in the payload, runners - // should ignore them. + // Note that when count is zero, the SDK may not send sum, min, and max in + // the response. If those fields are included in the payload, runners should + // omit them. DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metrics:distribution_int64:v1"]; @@ -536,9 +536,9 @@ message MonitoringInfoTypeUrns { // - min: beam:coder:double:v1 // - max: beam:coder:double:v1 // - // Note that when count is zero, the SDK may or may not send sum, min, and - // max in the response. If those fields are included in the payload, runners - // should ignore them. + // Note that when count is zero, the SDK may not send sum, min, and max in + // the response. If those fields are included in the payload, runners should + // omit them. DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metrics:distribution_double:v1"];