From 59f872a18a8bf0aaf508b82dcd5739755b062d47 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 29 Dec 2025 02:22:14 +0800 Subject: [PATCH 1/4] [python] optimize io retry --- paimon-python/pypaimon/common/file_io.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 1ba9caa7917f..925523dd357c 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -63,6 +63,17 @@ def parse_location(location: str): else: return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" + @staticmethod + def _create_s3_retry_config(max_attempts: int = 10, request_timeout: int = 60, connect_timeout: int = 60) -> Dict[str, Any]: + from pyarrow.fs import AwsStandardS3RetryStrategy + + retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts) + return { + 'retry_strategy': retry_strategy, + 'request_timeout': request_timeout, + 'connect_timeout': connect_timeout + } + def _extract_oss_bucket(self, location) -> str: uri = urlparse(location) if uri.scheme and uri.scheme != "oss": @@ -104,6 +115,9 @@ def _initialize_oss_fs(self, path) -> FileSystem: client_kwargs['endpoint_override'] = (oss_bucket + "." + self.properties.get(OssOptions.OSS_ENDPOINT)) + retry_config = self._create_s3_retry_config() + client_kwargs.update(retry_config) + return S3FileSystem(**client_kwargs) def _initialize_s3_fs(self) -> FileSystem: @@ -118,6 +132,9 @@ def _initialize_s3_fs(self) -> FileSystem: "force_virtual_addressing": True, } + retry_config = self._create_s3_retry_config() + client_kwargs.update(retry_config) + return S3FileSystem(**client_kwargs) def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: From 7bc391b2dc2098e1bb7d9d613b7e9307415af6e4 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 29 Dec 2025 21:47:41 +0800 Subject: [PATCH 2/4] fix code format --- paimon-python/pypaimon/common/file_io.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 925523dd357c..3d7dfa333df6 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -64,7 +64,11 @@ def parse_location(location: str): return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" @staticmethod - def _create_s3_retry_config(max_attempts: int = 10, request_timeout: int = 60, connect_timeout: int = 60) -> Dict[str, Any]: + def _create_s3_retry_config( + max_attempts: int = 10, + request_timeout: int = 60, + connect_timeout: int = 60 + ) -> Dict[str, Any]: from pyarrow.fs import AwsStandardS3RetryStrategy retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts) From 0dc9f3d6fe87e66b8e0f24c0c7d840b30c85d1ad Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 29 Dec 2025 22:13:52 +0800 Subject: [PATCH 3/4] make file_io retry compitable with python 3.6 --- paimon-python/pypaimon/common/file_io.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 3d7dfa333df6..b8aa02c0709b 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -69,15 +69,24 @@ def _create_s3_retry_config( request_timeout: int = 60, connect_timeout: int = 60 ) -> Dict[str, Any]: - from pyarrow.fs import AwsStandardS3RetryStrategy - - retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts) - return { - 'retry_strategy': retry_strategy, + """ + AwsStandardS3RetryStrategy is only available in PyArrow >= 8.0.0. + """ + config = { 'request_timeout': request_timeout, 'connect_timeout': connect_timeout } + if parse(pyarrow.__version__) >= parse("8.0.0"): + try: + from pyarrow.fs import AwsStandardS3RetryStrategy + retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts) + config['retry_strategy'] = retry_strategy + except ImportError: + pass + + return config + def _extract_oss_bucket(self, location) -> str: uri = urlparse(location) if uri.scheme and uri.scheme != "oss": From 2de94bacd5f51d45b60459c491c6bea7e08fb556 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 29 Dec 2025 23:40:45 +0800 Subject: [PATCH 4/4] do not provide request_timeout and connect_timeout when pyarrow version < 8.0.0 --- paimon-python/pypaimon/common/file_io.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index b8aa02c0709b..497d711a49e0 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -70,22 +70,23 @@ def _create_s3_retry_config( connect_timeout: int = 60 ) -> Dict[str, Any]: """ - AwsStandardS3RetryStrategy is only available in PyArrow >= 8.0.0. + AwsStandardS3RetryStrategy and timeout parameters are only available + in PyArrow >= 8.0.0. """ - config = { - 'request_timeout': request_timeout, - 'connect_timeout': connect_timeout - } - if parse(pyarrow.__version__) >= parse("8.0.0"): + config = { + 'request_timeout': request_timeout, + 'connect_timeout': connect_timeout + } try: from pyarrow.fs import AwsStandardS3RetryStrategy retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts) config['retry_strategy'] = retry_strategy except ImportError: pass - - return config + return config + else: + return {} def _extract_oss_bucket(self, location) -> str: uri = urlparse(location)