diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 539fa48266..8503a846d6 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -109,23 +109,24 @@ For the FileIO there are several configuration options available: -| Key | Example | Description | -|-----------------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| s3.endpoint | | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. | -| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. | -| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. | -| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | -| s3.role-session-name | session | An optional identifier for the assumed role session. | -| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. | -| s3.signer | bearer | Configure the signature version of the FileIO. | -| s3.signer.uri | | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. | -| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. (default : v1/aws/s3/sign). | -| s3.region | us-west-2 | Configure the default region used to initialize an `S3FileSystem`. 
`PyArrowFileIO` attempts to automatically tries to resolve the region if this isn't set (only supported for AWS S3 Buckets). | -| s3.resolve-region | False | Only supported for `PyArrowFileIO`, when enabled, it will always try to resolve the location of the bucket (only supported for AWS S3 Buckets). | -| s3.proxy-uri | | Configure the proxy server to be used by the FileIO. | -| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. | -| s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. | +| Key | Example | Description | +|-----------------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| s3.endpoint | | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. | +| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. | +| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. | +| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | +| s3.role-session-name | session | An optional identifier for the assumed role session. | +| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. | +| s3.signer | bearer | Configure the signature version of the FileIO. | +| s3.signer.uri | | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. 
| +| s3.signer.endpoint          | v1/main/s3-sign            | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default : v1/aws/s3/sign).                                                                | +| s3.region                   | us-west-2                  | Configure the default region used to initialize an `S3FileSystem`. `PyArrowFileIO` attempts to automatically resolve the region if this isn't set (only supported for AWS S3 Buckets).                                                                        | +| s3.resolve-region           | False                      | Only supported for `PyArrowFileIO`, when enabled, it will always try to resolve the location of the bucket (only supported for AWS S3 Buckets).                                                                                                               | +| s3.proxy-uri                |                            | Configure the proxy server to be used by the FileIO.                                                                                                                                                                                                         | +| s3.connect-timeout          | 60.0                       | Configure socket connection timeout, in seconds.                                                                                                                                                                                                             | +| s3.request-timeout          | 60.0                       | Configure socket read timeouts on Windows and macOS, in seconds.                                                                                                                                                                                             | +| s3.force-virtual-addressing | False                      | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. | +| s3.retry-strategy-impl      | None                       | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. 
| diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index 2a73aba7f2..0b03a7dd4f 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -68,6 +68,7 @@ S3_ROLE_ARN = "s3.role-arn" S3_ROLE_SESSION_NAME = "s3.role-session-name" S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing" +S3_RETRY_STRATEGY_IMPL = "s3.retry-strategy-impl" HDFS_HOST = "hdfs.host" HDFS_PORT = "hdfs.port" HDFS_USER = "hdfs.user" diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index cc81e37119..3e49885e58 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -27,6 +27,7 @@ import fnmatch import functools +import importlib import itertools import logging import operator @@ -63,6 +64,7 @@ import pyarrow.lib import pyarrow.parquet as pq from pyarrow import ChunkedArray +from pyarrow._s3fs import S3RetryStrategy from pyarrow.fs import ( FileInfo, FileSystem, @@ -111,6 +113,7 @@ S3_REGION, S3_REQUEST_TIMEOUT, S3_RESOLVE_REGION, + S3_RETRY_STRATEGY_IMPL, S3_ROLE_ARN, S3_ROLE_SESSION_NAME, S3_SECRET_ACCESS_KEY, @@ -214,6 +217,20 @@ def _cached_resolve_s3_region(bucket: str) -> Optional[str]: return None +def _import_retry_strategy(impl: str) -> Optional[S3RetryStrategy]: + try: + path_parts = impl.split(".") + if len(path_parts) < 2: + raise ValueError(f"retry-strategy-impl should be full path (module.CustomS3RetryStrategy), got: {impl}") + module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] + module = importlib.import_module(module_name) + class_ = getattr(module, class_name) + return class_() + except (ModuleNotFoundError, AttributeError): + warnings.warn(f"Could not initialize S3 retry strategy: {impl}") + return None + + class UnsupportedPyArrowTypeException(Exception): """Cannot convert PyArrow type to corresponding Iceberg type.""" @@ -476,6 +493,11 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None: 
client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False) + if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and ( + retry_instance := _import_retry_strategy(retry_strategy_impl) + ): + client_kwargs["retry_strategy"] = retry_instance + return S3FileSystem(**client_kwargs) def _initialize_azure_fs(self) -> FileSystem: diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 850b1292a8..92494455af 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -19,6 +19,7 @@ import os import tempfile import uuid +import warnings from datetime import date from typing import Any, List, Optional from unittest.mock import MagicMock, patch @@ -29,7 +30,7 @@ import pyarrow.parquet as pq import pytest from packaging import version -from pyarrow.fs import FileType, LocalFileSystem, S3FileSystem +from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem from pyiceberg.exceptions import ResolveError from pyiceberg.expressions import ( @@ -57,7 +58,7 @@ Or, ) from pyiceberg.expressions.literals import literal -from pyiceberg.io import InputStream, OutputStream, load_file_io +from pyiceberg.io import S3_RETRY_STRATEGY_IMPL, InputStream, OutputStream, load_file_io from pyiceberg.io.pyarrow import ( ICEBERG_SCHEMA, ArrowScan, @@ -2512,3 +2513,21 @@ def test_pyarrow_io_multi_fs() -> None: # Same PyArrowFileIO instance resolves local file input to LocalFileSystem assert isinstance(pyarrow_file_io.new_input("file:///path/to/file")._filesystem, LocalFileSystem) + + +class SomeRetryStrategy(AwsDefaultS3RetryStrategy): + def __init__(self) -> None: + super().__init__() + warnings.warn("Initialized SomeRetryStrategy 👍") + + +def test_retry_strategy() -> None: + io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "tests.io.test_pyarrow.SomeRetryStrategy"}) + with pytest.warns(UserWarning, match="Initialized SomeRetryStrategy.*"): + 
io.new_input("s3://bucket/path/to/file") + + +def test_retry_strategy_not_found() -> None: + io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"}) + with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"): + io.new_input("s3://bucket/path/to/file")