diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 539fa48266..8503a846d6 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -109,23 +109,24 @@ For the FileIO there are several configuration options available:
-| Key | Example | Description |
-|-----------------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| s3.endpoint | | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
-| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
-| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
-| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
-| s3.role-session-name | session | An optional identifier for the assumed role session. |
-| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
-| s3.signer | bearer | Configure the signature version of the FileIO. |
-| s3.signer.uri | | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. |
-| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. (default : v1/aws/s3/sign). |
-| s3.region | us-west-2 | Configure the default region used to initialize an `S3FileSystem`. `PyArrowFileIO` attempts to automatically tries to resolve the region if this isn't set (only supported for AWS S3 Buckets). |
-| s3.resolve-region | False | Only supported for `PyArrowFileIO`, when enabled, it will always try to resolve the location of the bucket (only supported for AWS S3 Buckets). |
-| s3.proxy-uri | | Configure the proxy server to be used by the FileIO. |
-| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
-| s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
+| Key | Example | Description |
+|-----------------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| s3.endpoint | | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
+| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
+| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
+| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
+| s3.role-session-name | session | An optional identifier for the assumed role session. |
+| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
+| s3.signer | bearer | Configure the signature version of the FileIO. |
+| s3.signer.uri | | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. |
+| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. (default : v1/aws/s3/sign). |
+| s3.region | us-west-2 | Configure the default region used to initialize an `S3FileSystem`. `PyArrowFileIO` attempts to automatically tries to resolve the region if this isn't set (only supported for AWS S3 Buckets). |
+| s3.resolve-region | False | Only supported for `PyArrowFileIO`, when enabled, it will always try to resolve the location of the bucket (only supported for AWS S3 Buckets). |
+| s3.proxy-uri | | Configure the proxy server to be used by the FileIO. |
+| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
+| s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
+| s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. |
diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py
index 2a73aba7f2..0b03a7dd4f 100644
--- a/pyiceberg/io/__init__.py
+++ b/pyiceberg/io/__init__.py
@@ -68,6 +68,7 @@
S3_ROLE_ARN = "s3.role-arn"
S3_ROLE_SESSION_NAME = "s3.role-session-name"
S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
+S3_RETRY_STRATEGY_IMPL = "s3.retry-strategy-impl"
HDFS_HOST = "hdfs.host"
HDFS_PORT = "hdfs.port"
HDFS_USER = "hdfs.user"
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index cc81e37119..3e49885e58 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -27,6 +27,7 @@
import fnmatch
import functools
+import importlib
import itertools
import logging
import operator
@@ -63,6 +64,7 @@
import pyarrow.lib
import pyarrow.parquet as pq
from pyarrow import ChunkedArray
+from pyarrow._s3fs import S3RetryStrategy
from pyarrow.fs import (
FileInfo,
FileSystem,
@@ -111,6 +113,7 @@
S3_REGION,
S3_REQUEST_TIMEOUT,
S3_RESOLVE_REGION,
+ S3_RETRY_STRATEGY_IMPL,
S3_ROLE_ARN,
S3_ROLE_SESSION_NAME,
S3_SECRET_ACCESS_KEY,
@@ -214,6 +217,20 @@ def _cached_resolve_s3_region(bucket: str) -> Optional[str]:
return None
+def _import_retry_strategy(impl: str) -> Optional[S3RetryStrategy]:
+ try:
+ path_parts = impl.split(".")
+ if len(path_parts) < 2:
+ raise ValueError(f"retry-strategy-impl should be full path (module.CustomS3RetryStrategy), got: {impl}")
+ module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
+ module = importlib.import_module(module_name)
+ class_ = getattr(module, class_name)
+ return class_()
+ except (ModuleNotFoundError, AttributeError):
+ warnings.warn(f"Could not initialize S3 retry strategy: {impl}")
+ return None
+
+
class UnsupportedPyArrowTypeException(Exception):
"""Cannot convert PyArrow type to corresponding Iceberg type."""
@@ -476,6 +493,11 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None:
client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)
+ if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and (
+ retry_instance := _import_retry_strategy(retry_strategy_impl)
+ ):
+ client_kwargs["retry_strategy"] = retry_instance
+
return S3FileSystem(**client_kwargs)
def _initialize_azure_fs(self) -> FileSystem:
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 850b1292a8..92494455af 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -19,6 +19,7 @@
import os
import tempfile
import uuid
+import warnings
from datetime import date
from typing import Any, List, Optional
from unittest.mock import MagicMock, patch
@@ -29,7 +30,7 @@
import pyarrow.parquet as pq
import pytest
from packaging import version
-from pyarrow.fs import FileType, LocalFileSystem, S3FileSystem
+from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem
from pyiceberg.exceptions import ResolveError
from pyiceberg.expressions import (
@@ -57,7 +58,7 @@
Or,
)
from pyiceberg.expressions.literals import literal
-from pyiceberg.io import InputStream, OutputStream, load_file_io
+from pyiceberg.io import S3_RETRY_STRATEGY_IMPL, InputStream, OutputStream, load_file_io
from pyiceberg.io.pyarrow import (
ICEBERG_SCHEMA,
ArrowScan,
@@ -2512,3 +2513,21 @@ def test_pyarrow_io_multi_fs() -> None:
# Same PyArrowFileIO instance resolves local file input to LocalFileSystem
assert isinstance(pyarrow_file_io.new_input("file:///path/to/file")._filesystem, LocalFileSystem)
+
+
+class SomeRetryStrategy(AwsDefaultS3RetryStrategy):
+ def __init__(self) -> None:
+ super().__init__()
+ warnings.warn("Initialized SomeRetryStrategy 👍")
+
+
+def test_retry_strategy() -> None:
+ io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "tests.io.test_pyarrow.SomeRetryStrategy"})
+ with pytest.warns(UserWarning, match="Initialized SomeRetryStrategy.*"):
+ io.new_input("s3://bucket/path/to/file")
+
+
+def test_retry_strategy_not_found() -> None:
+ io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"})
+ with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"):
+ io.new_input("s3://bucket/path/to/file")