From 64f368295352f7ca6eb49fcd9a312892f7f68a4b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 14 Nov 2025 14:23:04 -0500 Subject: [PATCH 1/2] Improve error message for decoding input stream --- sdks/python/apache_beam/runners/worker/bundle_processor.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 4094fd1d8058..5c2d75f246f5 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -234,9 +234,11 @@ def process_encoded(self, encoded_windowed_values: bytes) -> None: decoded_value = self.windowed_coder_impl.decode_from_stream( input_stream, True) except Exception as exn: + coder = str(self.windowed_coder) + step = self.name_context.step_name raise ValueError( - "Error decoding input stream with coder " + - str(self.windowed_coder)) from exn + f"Error decoding input stream with coder ${coder} in step ${step}" + ) from exn self.output(decoded_value) def monitoring_infos( From ff85067ed682032c3a22dd7524b34e2ef00d493e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 14 Nov 2025 14:23:54 -0500 Subject: [PATCH 2/2] Fix string interpolation in error message --- sdks/python/apache_beam/runners/worker/bundle_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 5c2d75f246f5..faa756d7c5c5 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -237,7 +237,7 @@ def process_encoded(self, encoded_windowed_values: bytes) -> None: coder = str(self.windowed_coder) step = self.name_context.step_name raise ValueError( - f"Error decoding input stream with coder ${coder} in step ${step}" + f"Error decoding input stream with coder {coder} in step {step}" ) from exn self.output(decoded_value)