From de0e10320c1a10c567674c8ab9da52310fdec086 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 3 Nov 2025 10:56:55 -0500 Subject: [PATCH 1/6] feat(typehints): add Python 3.12 TypeAliasType support Handle Python 3.12's new type alias statements by unwrapping TypeAliasType to its underlying value in type conversion and pickling. This ensures compatibility with Beam's type checking and serialization for Python 3.12+. --- .../internal/cloudpickle/cloudpickle.py | 19 +++++++++++ .../apache_beam/transforms/ptransform_test.py | 32 +++++++++++++++++++ .../typehints/native_type_compatibility.py | 14 ++++++++ 3 files changed, 65 insertions(+) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index ab066b954b66..23fae0666de9 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -169,6 +169,17 @@ class CloudPickleConfig: DEFAULT_CONFIG = CloudPickleConfig() + +# Minimal helper to return the provided object during unpickling. +def _return_obj(obj): + return obj + + +# Optional import for Python 3.12 TypeAliasType +try: # pragma: no cover - dependent on Python version + from typing import TypeAliasType # type: ignore[attr-defined] +except Exception: + TypeAliasType = None # type: ignore[assignment] builtin_code_type = None if PYPY: # builtin-code objects only exist in pypy @@ -1535,6 +1546,14 @@ def reducer_override(self, obj): return _class_reduce(obj, self.config) elif isinstance(obj, typing.TypeVar): # Add this check return _typevar_reduce(obj, self.config) + elif TypeAliasType is not None and isinstance(obj, TypeAliasType): + # Unwrap typing.TypeAliasType to its underlying value to make pickling + # robust for locally-defined `type` aliases (Python 3.12+). + underlying = getattr(obj, '__value__', None) + if underlying is not None: + return (_return_obj, (underlying, )) + # Fallback to default behavior if no underlying value. + return NotImplemented elif isinstance(obj, types.CodeType): return _code_reduce(obj, self.config) elif isinstance(obj, types.FunctionType): diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index ea736dceddb1..600f098a5e38 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -25,6 +25,7 @@ import pickle import random import re +import sys import typing import unittest from functools import reduce @@ -2910,6 +2911,37 @@ def test_threshold(self): use_subprocess=self.use_subprocess)) +class PTransformTypeAliasTest(unittest.TestCase): + @unittest.skipIf(sys.version_info < (3, 12), "Python 3.12 required") + def test_type_alias_statement_supported_in_with_output_types(self): + ns = {} + exec("type InputType = tuple[int, ...]", ns) + InputType = ns["InputType"] + + def print_element(element: InputType) -> InputType: + return element + + with beam.Pipeline() as p: + _ = ( + p + | beam.Create([(1, 2)]) + | beam.Map(lambda x: x) + | beam.Map(print_element)) + + @unittest.skipIf(sys.version_info < (3, 12), "Python 3.12 required") + def test_type_alias_supported_in_ptransform_with_output_types(self): + ns = {} + exec("type OutputType = tuple[int, int]", ns) + OutputType = ns["OutputType"] + + with beam.Pipeline() as p: + _ = ( + p + | beam.Create([(1, 2)]) + | beam.Map(lambda x: x) + | beam.Map(lambda x: x).with_output_types(OutputType)) + + class TestPTransformFn(TypeHintTestCase): def test_type_checking_fail(self): @beam.ptransform_fn diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index b6bf6d37fe02..a958858f0f8e 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -35,6 +35,12 @@ except ImportError: from typing_extensions import is_typeddict +# Python 3.12 adds TypeAliasType for `type` statements; keep optional import. +try: + from typing import TypeAliasType # type: ignore[attr-defined] +except Exception: # pragma: no cover - pre-3.12 + TypeAliasType = None # type: ignore[assignment] + T = TypeVar('T') _LOGGER = logging.getLogger(__name__) @@ -332,6 +338,14 @@ def convert_to_beam_type(typ): sys.version_info.minor >= 10) and (isinstance(typ, types.UnionType)): typ = typing.Union[typ] + # Unwrap Python 3.12 `type` aliases (TypeAliasType) to their underlying value. + # This ensures Beam sees the actual aliased type (e.g., tuple[int, ...]). + if sys.version_info >= (3, 12) and TypeAliasType is not None: + if isinstance(typ, TypeAliasType): # pylint: disable=isinstance-second-argument-not-valid-type + underlying = getattr(typ, '__value__', None) + if underlying is not None: + typ = underlying + if getattr(typ, '__module__', None) == 'typing': typ = convert_typing_to_builtin(typ) From aa7879d12cefb6d9db2a660e5404ae1cc57ef6e2 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 3 Nov 2025 11:40:56 -0500 Subject: [PATCH 2/6] ingore exec --- sdks/python/apache_beam/transforms/ptransform_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 600f098a5e38..e70fd3db0b88 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -2915,7 +2915,7 @@ class PTransformTypeAliasTest(unittest.TestCase): @unittest.skipIf(sys.version_info < (3, 12), "Python 3.12 required") def test_type_alias_statement_supported_in_with_output_types(self): ns = {} - exec("type InputType = tuple[int, ...]", ns) + exec("type InputType = tuple[int, ...]", ns) # pylint: disable=exec-used InputType = ns["InputType"] def print_element(element: InputType) -> InputType: @@ -2931,7 +2931,7 @@ def print_element(element: InputType) -> InputType: @unittest.skipIf(sys.version_info < (3, 12), "Python 3.12 required") def test_type_alias_supported_in_ptransform_with_output_types(self): ns = {} - exec("type OutputType = tuple[int, int]", ns) + exec("type OutputType = tuple[int, int]", ns) # pylint: disable=exec-used OutputType = ns["OutputType"] with beam.Pipeline() as p: From 9295597e368af53b9d8fa17d2197d4ac67038e3a Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 3 Nov 2025 19:09:14 -0500 Subject: [PATCH 3/6] use dispatcher --- .../internal/cloudpickle/cloudpickle.py | 20 ---------------- .../internal/cloudpickle_pickler.py | 24 +++++++++++++++++++ 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index 23fae0666de9..8ee770d61691 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -168,18 +168,6 @@ class CloudPickleConfig: DEFAULT_CONFIG = CloudPickleConfig() - - -# Minimal helper to return the provided object during unpickling. -def _return_obj(obj): - return obj - - -# Optional import for Python 3.12 TypeAliasType -try: # pragma: no cover - dependent on Python version - from typing import TypeAliasType # type: ignore[attr-defined] -except Exception: - TypeAliasType = None # type: ignore[assignment] builtin_code_type = None if PYPY: # builtin-code objects only exist in pypy @@ -1546,14 +1534,6 @@ def reducer_override(self, obj): return _class_reduce(obj, self.config) elif isinstance(obj, typing.TypeVar): # Add this check return _typevar_reduce(obj, self.config) - elif TypeAliasType is not None and isinstance(obj, TypeAliasType): - # Unwrap typing.TypeAliasType to its underlying value to make pickling - # robust for locally-defined `type` aliases (Python 3.12+). - underlying = getattr(obj, '__value__', None) - if underlying is not None: - return (_return_obj, (underlying, )) - # Fallback to default behavior if no underlying value. - return NotImplemented elif isinstance(obj, types.CodeType): return _code_reduce(obj, self.config) elif isinstance(obj, types.FunctionType): diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index eebba178e7c3..1734f3c69f5f 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -95,6 +95,27 @@ def _get_proto_enum_descriptor_class(): _LOGGER = logging.getLogger(__name__) +# Helper to return an object directly during unpickling. +def _return_obj(obj): + return obj + + +# Optional import for Python 3.12 TypeAliasType +try: # pragma: no cover - dependent on Python version + from typing import TypeAliasType # type: ignore[attr-defined] +except Exception: + TypeAliasType = None # type: ignore[assignment] + + +def _typealias_reduce(obj): + # Unwrap typing.TypeAliasType to its underlying value for robust pickling. + underlying = getattr(obj, '__value__', None) + if underlying is None: + # Fallback: return the object itself; lets default behavior handle it. + return _return_obj, (obj, ) + return _return_obj, (underlying, ) + + def _reconstruct_enum_descriptor(full_name): for _, module in list(sys.modules.items()): if not hasattr(module, 'DESCRIPTOR'): @@ -171,6 +192,9 @@ def _dumps( pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags except NameError: pass + # Register Python 3.12 `type` alias reducer to unwrap to underlying value. + if TypeAliasType is not None: + pickler.dispatch_table[TypeAliasType] = _typealias_reduce try: pickler.dispatch_table[RLOCK_TYPE] = _pickle_rlock except NameError: From 244d7adf24e57834bef3e6f411d576050ec7ac62 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 3 Nov 2025 20:33:05 -0500 Subject: [PATCH 4/6] lint --- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 1734f3c69f5f..53cd7aace868 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -102,9 +102,9 @@ def _return_obj(obj): # Optional import for Python 3.12 TypeAliasType try: # pragma: no cover - dependent on Python version - from typing import TypeAliasType # type: ignore[attr-defined] + from typing import TypeAliasType as _TypeAliasType # type: ignore[attr-defined] except Exception: - TypeAliasType = None # type: ignore[assignment] + _TypeAliasType = None def _typealias_reduce(obj): @@ -193,8 +193,8 @@ def _dumps( except NameError: pass # Register Python 3.12 `type` alias reducer to unwrap to underlying value. - if TypeAliasType is not None: - pickler.dispatch_table[TypeAliasType] = _typealias_reduce + if _TypeAliasType is not None: + pickler.dispatch_table[_TypeAliasType] = _typealias_reduce try: pickler.dispatch_table[RLOCK_TYPE] = _pickle_rlock except NameError: From 0aa1a964e5020e2fe2803844443fed4d5f8e2d67 Mon Sep 17 00:00:00 2001 From: liferoad Date: Wed, 5 Nov 2025 13:01:45 -0500 Subject: [PATCH 5/6] added one unit test --- .../typehints/native_type_compatibility.py | 2 ++ .../native_type_compatibility_test.py | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index a958858f0f8e..2360df142167 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -36,6 +36,8 @@ from typing_extensions import is_typeddict # Python 3.12 adds TypeAliasType for `type` statements; keep optional import. +# pylint: disable=ungrouped-imports +# isort: off try: from typing import TypeAliasType # type: ignore[attr-defined] except Exception: # pragma: no cover - pre-3.12 diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py index f6a13d7795a0..780619bd9352 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py @@ -491,6 +491,24 @@ def test_convert_typing_to_builtin(self): builtin_type = convert_typing_to_builtin(typing_type) self.assertEqual(builtin_type, expected_builtin_type, description) + def test_type_alias_type_unwrapped(self): + # Only applicable on Python 3.12+, where typing.TypeAliasType exists + # and the `type` statement is available. + TypeAliasType = getattr(typing, 'TypeAliasType', None) + if TypeAliasType is None: + self.skipTest('TypeAliasType not available') + + ns = {} + try: + exec('type AliasTuple = tuple[int, ...]', {}, ns) # pylint: disable=exec-used + except SyntaxError: + self.skipTest('type statement not supported') + + AliasTuple = ns['AliasTuple'] + self.assertTrue(isinstance(AliasTuple, TypeAliasType)) + self.assertEqual( + typehints.Tuple[int, ...], convert_to_beam_type(AliasTuple)) + if __name__ == '__main__': unittest.main() From e1d909ca9f19087d8e083f8a8810c9ce5df1eaa1 Mon Sep 17 00:00:00 2001 From: liferoad Date: Wed, 5 Nov 2025 13:23:18 -0500 Subject: [PATCH 6/6] lint --- .../apache_beam/typehints/native_type_compatibility_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py index 780619bd9352..0e933b0d4925 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py @@ -505,7 +505,7 @@ def test_type_alias_type_unwrapped(self): self.skipTest('type statement not supported') AliasTuple = ns['AliasTuple'] - self.assertTrue(isinstance(AliasTuple, TypeAliasType)) + self.assertTrue(isinstance(AliasTuple, TypeAliasType)) # pylint: disable=isinstance-second-argument-not-valid-type self.assertEqual( typehints.Tuple[int, ...], convert_to_beam_type(AliasTuple))