diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 1ba9caa7917f..497d711a49e0 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -63,6 +63,31 @@ def parse_location(location: str):
         else:
             return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
+    @staticmethod
+    def _create_s3_retry_config(
+            max_attempts: int = 10,
+            request_timeout: int = 60,
+            connect_timeout: int = 60
+    ) -> Dict[str, Any]:
+        """
+        AwsStandardS3RetryStrategy and timeout parameters are only available
+        in PyArrow >= 8.0.0.
+        """
+        if parse(pyarrow.__version__) >= parse("8.0.0"):
+            config = {
+                'request_timeout': request_timeout,
+                'connect_timeout': connect_timeout
+            }
+            try:
+                from pyarrow.fs import AwsStandardS3RetryStrategy
+                retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts)
+                config['retry_strategy'] = retry_strategy
+            except ImportError:
+                pass
+            return config
+        else:
+            return {}
+
     def _extract_oss_bucket(self, location) -> str:
         uri = urlparse(location)
         if uri.scheme and uri.scheme != "oss":
@@ -104,6 +129,9 @@ def _initialize_oss_fs(self, path) -> FileSystem:
             client_kwargs['endpoint_override'] = (oss_bucket + "." +
                 self.properties.get(OssOptions.OSS_ENDPOINT))
 
+        retry_config = self._create_s3_retry_config()
+        client_kwargs.update(retry_config)
+
         return S3FileSystem(**client_kwargs)
 
     def _initialize_s3_fs(self) -> FileSystem:
@@ -118,6 +146,9 @@ def _initialize_s3_fs(self) -> FileSystem:
             "force_virtual_addressing": True,
         }
 
+        retry_config = self._create_s3_retry_config()
+        client_kwargs.update(retry_config)
+
         return S3FileSystem(**client_kwargs)
 
     def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: