33 changes: 17 additions & 16 deletions mkdocs/docs/configuration.md
@@ -109,23 +109,24 @@ For the FileIO there are several configuration options available:

<!-- markdown-link-check-disable -->

| Key | Example | Description |
|-----------------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| s3.endpoint                 | <https://10.0.19.25/>      | Configure an alternative endpoint of the S3 service for the FileIO to access. This can be used to point S3FileIO at any S3-compatible object storage service with a different endpoint, or to access a private S3 endpoint in a virtual private cloud. |
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
| s3.role-session-name | session | An optional identifier for the assumed role session. |
| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
| s3.signer | bearer | Configure the signature version of the FileIO. |
| s3.signer.uri | <http://my.signer:8080/s3> | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. |
| s3.signer.endpoint          | v1/main/s3-sign            | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default: v1/aws/s3/sign) |
| s3.region                   | us-west-2                  | Configure the default region used to initialize an `S3FileSystem`. `PyArrowFileIO` attempts to automatically resolve the region if this isn't set (only supported for AWS S3 Buckets). |
| s3.resolve-region           | False                      | Only supported for `PyArrowFileIO`. When enabled, it will always try to resolve the region of the bucket (only supported for AWS S3 Buckets). |
| s3.proxy-uri | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
| s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
| s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. |

<!-- markdown-link-check-enable-->
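
For example, a custom retry strategy can be supplied by pointing `s3.retry-strategy-impl` at a class that extends PyArrow's `S3RetryStrategy`. The sketch below is a minimal, hypothetical illustration modeled on the test added in this PR; the module path `my_project.retries`, the class name, and `max_attempts=10` are placeholders, not part of PyIceberg:

```python
# A minimal sketch. The module path "my_project.retries", the class name, and
# max_attempts=10 are hypothetical; only the property key comes from this PR.
from pyarrow.fs import AwsDefaultS3RetryStrategy

from pyiceberg.io import S3_RETRY_STRATEGY_IMPL  # "s3.retry-strategy-impl"
from pyiceberg.io.pyarrow import PyArrowFileIO


class MoreAttemptsRetryStrategy(AwsDefaultS3RetryStrategy):
    """Retry strategy that raises the maximum number of attempts.

    Needs a no-argument constructor, because PyIceberg instantiates
    the configured class without parameters.
    """

    def __init__(self) -> None:
        super().__init__(max_attempts=10)


# Assuming the class above is importable as my_project.retries.MoreAttemptsRetryStrategy:
io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "my_project.retries.MoreAttemptsRetryStrategy"})
```

If the configured class cannot be imported, `_import_retry_strategy` (added below in `pyiceberg/io/pyarrow.py`) emits a `UserWarning` and the `S3FileSystem` is created without a custom retry strategy.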

1 change: 1 addition & 0 deletions pyiceberg/io/__init__.py
@@ -68,6 +68,7 @@
S3_ROLE_ARN = "s3.role-arn"
S3_ROLE_SESSION_NAME = "s3.role-session-name"
S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
S3_RETRY_STRATEGY_IMPL = "s3.retry-strategy-impl"
HDFS_HOST = "hdfs.host"
HDFS_PORT = "hdfs.port"
HDFS_USER = "hdfs.user"
22 changes: 22 additions & 0 deletions pyiceberg/io/pyarrow.py
@@ -27,6 +27,7 @@

import fnmatch
import functools
import importlib
import itertools
import logging
import operator
@@ -63,6 +64,7 @@
import pyarrow.lib
import pyarrow.parquet as pq
from pyarrow import ChunkedArray
from pyarrow._s3fs import S3RetryStrategy
from pyarrow.fs import (
FileInfo,
FileSystem,
@@ -111,6 +113,7 @@
S3_REGION,
S3_REQUEST_TIMEOUT,
S3_RESOLVE_REGION,
S3_RETRY_STRATEGY_IMPL,
S3_ROLE_ARN,
S3_ROLE_SESSION_NAME,
S3_SECRET_ACCESS_KEY,
@@ -214,6 +217,20 @@ def _cached_resolve_s3_region(bucket: str) -> Optional[str]:
return None


def _import_retry_strategy(impl: str) -> Optional[S3RetryStrategy]:
try:
path_parts = impl.split(".")
if len(path_parts) < 2:
raise ValueError(f"retry-strategy-impl should be full path (module.CustomS3RetryStrategy), got: {impl}")
module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
module = importlib.import_module(module_name)
class_ = getattr(module, class_name)
return class_()
except (ModuleNotFoundError, AttributeError):
warnings.warn(f"Could not initialize S3 retry strategy: {impl}")
return None


class UnsupportedPyArrowTypeException(Exception):
"""Cannot convert PyArrow type to corresponding Iceberg type."""

@@ -476,6 +493,11 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None:
client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)

if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and (
retry_instance := _import_retry_strategy(retry_strategy_impl)
):
client_kwargs["retry_strategy"] = retry_instance

return S3FileSystem(**client_kwargs)

def _initialize_azure_fs(self) -> FileSystem:
23 changes: 21 additions & 2 deletions tests/io/test_pyarrow.py
@@ -19,6 +19,7 @@
import os
import tempfile
import uuid
import warnings
from datetime import date
from typing import Any, List, Optional
from unittest.mock import MagicMock, patch
@@ -29,7 +30,7 @@
import pyarrow.parquet as pq
import pytest
from packaging import version
from pyarrow.fs import FileType, LocalFileSystem, S3FileSystem
from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem

from pyiceberg.exceptions import ResolveError
from pyiceberg.expressions import (
@@ -57,7 +58,7 @@
Or,
)
from pyiceberg.expressions.literals import literal
from pyiceberg.io import InputStream, OutputStream, load_file_io
from pyiceberg.io import S3_RETRY_STRATEGY_IMPL, InputStream, OutputStream, load_file_io
from pyiceberg.io.pyarrow import (
ICEBERG_SCHEMA,
ArrowScan,
@@ -2512,3 +2513,21 @@ def test_pyarrow_io_multi_fs() -> None:

# Same PyArrowFileIO instance resolves local file input to LocalFileSystem
assert isinstance(pyarrow_file_io.new_input("file:///path/to/file")._filesystem, LocalFileSystem)


class SomeRetryStrategy(AwsDefaultS3RetryStrategy):
def __init__(self) -> None:
super().__init__()
warnings.warn("Initialized SomeRetryStrategy 👍")


def test_retry_strategy() -> None:
io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "tests.io.test_pyarrow.SomeRetryStrategy"})
with pytest.warns(UserWarning, match="Initialized SomeRetryStrategy.*"):
io.new_input("s3://bucket/path/to/file")


def test_retry_strategy_not_found() -> None:
io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"})
with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"):
io.new_input("s3://bucket/path/to/file")