From b4660a8da644037f0b5720f9394620e99e5c23b6 Mon Sep 17 00:00:00 2001 From: praneetnadella Date: Thu, 6 Nov 2025 00:14:16 +0000 Subject: [PATCH 1/4] changes to uses path_normalization with file_interceptor in CONFIG --- .../internal/cloudpickle_pickler.py | 7 ++-- .../internal/cloudpickle_pickler_test.py | 33 +++++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index eebba178e7c3..2920c4c2d8db 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -37,13 +37,14 @@ from apache_beam.internal import code_object_pickler from apache_beam.internal.cloudpickle import cloudpickle +from apache_beam.internal.code_object_pickler import get_normalized_path DEFAULT_CONFIG = cloudpickle.CloudPickleConfig( - skip_reset_dynamic_type_state=True) + skip_reset_dynamic_type_state=True, filepath_interceptor=get_normalized_path) NO_DYNAMIC_CLASS_TRACKING_CONFIG = cloudpickle.CloudPickleConfig( - id_generator=None, skip_reset_dynamic_type_state=True) + id_generator=None, skip_reset_dynamic_type_state=True, filepath_interceptor=get_normalized_path) STABLE_CODE_IDENTIFIER_CONFIG = cloudpickle.CloudPickleConfig( - skip_reset_dynamic_type_state=True, + skip_reset_dynamic_type_state=True, filepath_interceptor=get_normalized_path, get_code_object_params=cloudpickle.GetCodeObjectParams( get_code_object_identifier=code_object_pickler. get_code_object_identifier, diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index b63ebd6c7109..621bbc877220 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -19,11 +19,14 @@ # pytype: skip-file +import os import threading import types import unittest from apache_beam.coders import proto2_coder_test_messages_pb2 +from apache_beam.internal import cloudpickle_pickler as beam_cloudpickle +from apache_beam.internal import code_object_pickler from apache_beam.internal import module_test from apache_beam.internal.cloudpickle_pickler import dumps from apache_beam.internal.cloudpickle_pickler import loads @@ -220,6 +223,36 @@ def test_best_effort_determinism_not_implemented(self): 'Ignoring unsupported option: enable_best_effort_determinism', '\n'.join(l.output)) + @unittest.mock.patch.object( + code_object_pickler, + 'get_normalized_path', + wraps=code_object_pickler.get_normalized_path) + def test_default_config_interceptor(self, mock_get_normalized_path): + """Tests config.filepath_interceptor is called for CodeType pickling.""" + + def sample_func(): + return "Beam" + + code_obj = sample_func.__code__ + original_filename = os.path.abspath(code_obj.co_filename) + + try: + pickled_code = beam_cloudpickle.dumps(code_obj) + unpickled_code = beam_cloudpickle.loads(pickled_code) + + mock_get_normalized_path.assert_called() + + unpickled_filename = os.path.abspath(unpickled_code.co_filename) + self.assertEqual(unpickled_filename, original_filename) + + except AttributeError as e: + if 'get_code_object_params' in str(e): + self.fail( + "Vendored cloudpickle BUG: AttributeError 'get_code_object_params' " + f"raised during CodeType pickling. Error: {e}") + else: + raise + if __name__ == '__main__': unittest.main() From 9fdcba85332895868136bb6ab6c5b1b38285bb27 Mon Sep 17 00:00:00 2001 From: praneetnadella Date: Thu, 6 Nov 2025 18:38:46 +0000 Subject: [PATCH 2/4] pylint, pyformat and originally pushed wrong test cases (fixed) --- .../internal/cloudpickle_pickler.py | 8 +++---- .../internal/cloudpickle_pickler_test.py | 21 ++++++++++++------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 2920c4c2d8db..41d79a80ab16 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -40,11 +40,11 @@ from apache_beam.internal.code_object_pickler import get_normalized_path DEFAULT_CONFIG = cloudpickle.CloudPickleConfig( - skip_reset_dynamic_type_state=True, filepath_interceptor=get_normalized_path) -NO_DYNAMIC_CLASS_TRACKING_CONFIG = cloudpickle.CloudPickleConfig( - id_generator=None, skip_reset_dynamic_type_state=True, filepath_interceptor=get_normalized_path) + skip_reset_dynamic_type_state=True, + filepath_interceptor=get_normalized_path) STABLE_CODE_IDENTIFIER_CONFIG = cloudpickle.CloudPickleConfig( - skip_reset_dynamic_type_state=True, filepath_interceptor=get_normalized_path, + skip_reset_dynamic_type_state=True, + filepath_interceptor=get_normalized_path, get_code_object_params=cloudpickle.GetCodeObjectParams( get_code_object_identifier=code_object_pickler. get_code_object_identifier, diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index 621bbc877220..2cfeb0ef334d 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -31,6 +31,7 @@ from apache_beam.internal.cloudpickle_pickler import dumps from apache_beam.internal.cloudpickle_pickler import loads from apache_beam.utils import shared +from unittest import mock GLOBAL_DICT_REF = module_test.GLOBAL_DICT @@ -223,12 +224,14 @@ def test_best_effort_determinism_not_implemented(self): 'Ignoring unsupported option: enable_best_effort_determinism', '\n'.join(l.output)) - @unittest.mock.patch.object( - code_object_pickler, - 'get_normalized_path', - wraps=code_object_pickler.get_normalized_path) - def test_default_config_interceptor(self, mock_get_normalized_path): + @mock.patch.object( + beam_cloudpickle.DEFAULT_CONFIG, 'filepath_interceptor', autospec=True + ) + def test_default_config_interceptor(self, mock_filepath_interceptor): """Tests config.filepath_interceptor is called for CodeType pickling.""" + mock_filepath_interceptor.side_effect = ( + code_object_pickler.get_normalized_path + ) def sample_func(): return "Beam" @@ -240,7 +243,7 @@ def sample_func(): pickled_code = beam_cloudpickle.dumps(code_obj) unpickled_code = beam_cloudpickle.loads(pickled_code) - mock_get_normalized_path.assert_called() + mock_filepath_interceptor.assert_called() unpickled_filename = os.path.abspath(unpickled_code.co_filename) self.assertEqual(unpickled_filename, original_filename) @@ -248,8 +251,10 @@ def sample_func(): except AttributeError as e: if 'get_code_object_params' in str(e): self.fail( - "Vendored cloudpickle BUG: AttributeError 'get_code_object_params' " - f"raised during CodeType pickling. Error: {e}") + "Vendored cloudpickle BUG: AttributeError " + f"'get_code_object_params' raised during CodeType pickling. " + f"Error: {e}" + ) else: raise From 9192f18830cb1b7f2d958d82c2b988532bea7094 Mon Sep 17 00:00:00 2001 From: praneetnadella Date: Thu, 6 Nov 2025 19:21:23 +0000 Subject: [PATCH 3/4] pyformatter / pylint fixes --- .../internal/cloudpickle_pickler_test.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index 2cfeb0ef334d..35046d5a2d5c 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -23,6 +23,7 @@ import threading import types import unittest +from unittest import mock from apache_beam.coders import proto2_coder_test_messages_pb2 from apache_beam.internal import cloudpickle_pickler as beam_cloudpickle @@ -31,7 +32,6 @@ from apache_beam.internal.cloudpickle_pickler import dumps from apache_beam.internal.cloudpickle_pickler import loads from apache_beam.utils import shared -from unittest import mock GLOBAL_DICT_REF = module_test.GLOBAL_DICT @@ -225,13 +225,11 @@ def test_best_effort_determinism_not_implemented(self): '\n'.join(l.output)) @mock.patch.object( - beam_cloudpickle.DEFAULT_CONFIG, 'filepath_interceptor', autospec=True - ) + beam_cloudpickle.DEFAULT_CONFIG, 'filepath_interceptor', autospec=True) def test_default_config_interceptor(self, mock_filepath_interceptor): """Tests config.filepath_interceptor is called for CodeType pickling.""" mock_filepath_interceptor.side_effect = ( - code_object_pickler.get_normalized_path - ) + code_object_pickler.get_normalized_path) def sample_func(): return "Beam" @@ -251,10 +249,9 @@ def sample_func(): except AttributeError as e: if 'get_code_object_params' in str(e): self.fail( - "Vendored cloudpickle BUG: AttributeError " - f"'get_code_object_params' raised during CodeType pickling. " - f"Error: {e}" - ) + "Vendored cloudpickle BUG: AttributeError " + f"'get_code_object_params' raised during CodeType pickling. " + f"Error: {e}") else: raise From a2a1690cf4aa32c806941d8d2b5f6cad2b3efa8e Mon Sep 17 00:00:00 2001 From: praneetnadella Date: Thu, 6 Nov 2025 22:41:30 +0000 Subject: [PATCH 4/4] clean up unused part of cloudpickle_pickler_test --- .../internal/cloudpickle_pickler_test.py | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index 35046d5a2d5c..4a51c56c24be 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -236,24 +236,13 @@ def sample_func(): code_obj = sample_func.__code__ original_filename = os.path.abspath(code_obj.co_filename) + pickled_code = beam_cloudpickle.dumps(code_obj) + unpickled_code = beam_cloudpickle.loads(pickled_code) - try: - pickled_code = beam_cloudpickle.dumps(code_obj) - unpickled_code = beam_cloudpickle.loads(pickled_code) + mock_filepath_interceptor.assert_called() - mock_filepath_interceptor.assert_called() - - unpickled_filename = os.path.abspath(unpickled_code.co_filename) - self.assertEqual(unpickled_filename, original_filename) - - except AttributeError as e: - if 'get_code_object_params' in str(e): - self.fail( - "Vendored cloudpickle BUG: AttributeError " - f"'get_code_object_params' raised during CodeType pickling. " - f"Error: {e}") - else: - raise + unpickled_filename = os.path.abspath(unpickled_code.co_filename) + self.assertEqual(unpickled_filename, original_filename) if __name__ == '__main__':