From d13d4ba1b1c296f3cdc9be9d4a2a760e5b5fdd9d Mon Sep 17 00:00:00 2001 From: "garrett.weaver" Date: Tue, 12 Aug 2025 09:41:19 -0700 Subject: [PATCH 1/5] add support to configure anonymous in s3 client --- pyiceberg/io/__init__.py | 1 + pyiceberg/io/fsspec.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index f89de18f12..91ca6ee443 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -52,6 +52,7 @@ AWS_SESSION_TOKEN = "client.session-token" AWS_ROLE_ARN = "client.role-arn" AWS_ROLE_SESSION_NAME = "client.role-session-name" +S3_ANONYMOUS = "s3.anonymous" S3_ENDPOINT = "s3.endpoint" S3_ACCESS_KEY_ID = "s3.access-key-id" S3_SECRET_ACCESS_KEY = "s3.secret-access-key" diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index d075765ed1..8401283554 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -65,6 +65,7 @@ HF_ENDPOINT, HF_TOKEN, S3_ACCESS_KEY_ID, + S3_ANONYMOUS, S3_CONNECT_TIMEOUT, S3_ENDPOINT, S3_PROXY_URI, @@ -163,6 +164,9 @@ def _s3(properties: Properties) -> AbstractFileSystem: if request_timeout := properties.get(S3_REQUEST_TIMEOUT): config_kwargs["read_timeout"] = float(request_timeout) + + if s3_anonymous := properties.get(S3_ANONYMOUS): + config_kwargs["anon"] = bool(s3_anonymous) fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs) From 98ec0c658cf81a36b7798529904e39efd559f702 Mon Sep 17 00:00:00 2001 From: gmweaver Date: Tue, 12 Aug 2025 10:16:34 -0700 Subject: [PATCH 2/5] add unit test --- tests/io/test_fsspec.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py index 854172727d..9117b8cc8c 100644 --- a/tests/io/test_fsspec.py +++ b/tests/io/test_fsspec.py @@ -265,6 +265,36 @@ def test_fsspec_s3_session_properties() -> None: config_kwargs={}, ) +def test_fsspec_s3_session_properties_anon_config() -> None: + session_properties: Properties = { + 
"s3.anonymous": "true", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + "s3.region": "us-east-1", + "s3.session-token": "s3.session-token", + **UNIFIED_AWS_SESSION_PROPERTIES, + } + + with mock.patch("s3fs.S3FileSystem") as mock_s3fs: + s3_fileio = FsspecFileIO(properties=session_properties) + filename = str(uuid.uuid4()) + + s3_fileio.new_input(location=f"s3://warehouse/{filename}") + + mock_s3fs.assert_called_with( + client_kwargs={ + "endpoint_url": "http://localhost:9000", + "aws_access_key_id": "admin", + "aws_secret_access_key": "password", + "region_name": "us-east-1", + "aws_session_token": "s3.session-token", + }, + config_kwargs={ + "anon": True, + }, + ) + + def test_fsspec_unified_session_properties() -> None: session_properties: Properties = { From 28cd0915339eb75e5ee9ebb31a7f1d5e091e339e Mon Sep 17 00:00:00 2001 From: gmweaver Date: Tue, 12 Aug 2025 16:54:47 -0700 Subject: [PATCH 3/5] add anonymous support to pyarrow/oss, update docs --- mkdocs/docs/configuration.md | 3 +++ pyiceberg/io/fsspec.py | 6 +++--- pyiceberg/io/pyarrow.py | 17 +++++++++++++---- tests/io/test_fsspec.py | 3 ++- tests/io/test_pyarrow.py | 29 +++++++++++++++++++++++++++++ 5 files changed, 50 insertions(+), 8 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 40cfc0b8c9..5e02ffae0e 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -127,6 +127,7 @@ For the FileIO there are several configuration options available: | s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. | | s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. 
| | s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. | +| s3.anonymous | True | Configure whether to use anonymous connection. If False (default), uses key/secret if configured or boto's credential resolver. | @@ -197,6 +198,8 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya | s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. | | s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | | s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This is set to `True` by default as OSS can only be accessed with virtual hosted style address. | +| s3.anonymous | True | Configure whether to use anonymous connection. If False (default), uses key/secret if configured or standard AWS configuration methods. 
| + diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 8401283554..d7ff3b0391 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -84,7 +84,7 @@ OutputStream, ) from pyiceberg.typedef import Properties -from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool +from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool, strtobool logger = logging.getLogger(__name__) @@ -164,9 +164,9 @@ def _s3(properties: Properties) -> AbstractFileSystem: if request_timeout := properties.get(S3_REQUEST_TIMEOUT): config_kwargs["read_timeout"] = float(request_timeout) - + if s3_anonymous := properties.get(S3_ANONYMOUS): - config_kwargs["anon"] = bool(s3_anonymous) + config_kwargs["anon"] = strtobool(s3_anonymous) fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index c756487c32..db65bb3084 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -109,6 +109,7 @@ HDFS_USER, PYARROW_USE_LARGE_TYPES_ON_READ, S3_ACCESS_KEY_ID, + S3_ANONYMOUS, S3_CONNECT_TIMEOUT, S3_ENDPOINT, S3_FORCE_VIRTUAL_ADDRESSING, @@ -185,7 +186,7 @@ from pyiceberg.utils.datetime import millis_to_datetime from pyiceberg.utils.decimal import unscaled_to_decimal from pyiceberg.utils.deprecated import deprecation_message -from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int +from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int, strtobool from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -450,6 +451,9 @@ def _initialize_oss_fs(self) -> FileSystem: if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME): client_kwargs["session_name"] = session_name + 
if s3_anonymous := self.properties.get(S3_ANONYMOUS): + client_kwargs["anonymous"] = strtobool(s3_anonymous) + return S3FileSystem(**client_kwargs) def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: @@ -501,6 +505,9 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: ): client_kwargs["retry_strategy"] = retry_instance + if s3_anonymous := self.properties.get(S3_ANONYMOUS): + client_kwargs["anonymous"] = strtobool(s3_anonymous) + return S3FileSystem(**client_kwargs) def _initialize_azure_fs(self) -> FileSystem: @@ -2793,9 +2800,11 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T functools.reduce( operator.and_, [ - pc.field(partition_field_name) == unique_partition[partition_field_name] - if unique_partition[partition_field_name] is not None - else pc.field(partition_field_name).is_null() + ( + pc.field(partition_field_name) == unique_partition[partition_field_name] + if unique_partition[partition_field_name] is not None + else pc.field(partition_field_name).is_null() + ) for field, partition_field_name in zip(spec.fields, partition_fields) ], ) diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py index 9117b8cc8c..11b168dc16 100644 --- a/tests/io/test_fsspec.py +++ b/tests/io/test_fsspec.py @@ -265,7 +265,8 @@ def test_fsspec_s3_session_properties() -> None: config_kwargs={}, ) -def test_fsspec_s3_session_properties_anon_config() -> None: + +def test_fsspec_s3_session_properties_with_anonymous() -> None: session_properties: Properties = { "s3.anonymous": "true", "s3.endpoint": "http://localhost:9000", diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index f5c3082edc..420ff6f2c9 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -390,6 +390,35 @@ def test_pyarrow_s3_session_properties() -> None: ) +def test_pyarrow_s3_session_properties_with_anonymous() -> None: + session_properties: Properties = { + "s3.anonymous": "true", + "s3.endpoint": 
"http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + "s3.region": "us-east-1", + "s3.session-token": "s3.session-token", + **UNIFIED_AWS_SESSION_PROPERTIES, + } + + with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver: + s3_fileio = PyArrowFileIO(properties=session_properties) + filename = str(uuid.uuid4()) + + # Mock `resolve_s3_region` to prevent from the location used resolving to a different s3 region + mock_s3_region_resolver.side_effect = OSError("S3 bucket is not found") + s3_fileio.new_input(location=f"s3://warehouse/{filename}") + + mock_s3fs.assert_called_with( + anonymous=True, + endpoint_override="http://localhost:9000", + access_key="admin", + secret_key="password", + region="us-east-1", + session_token="s3.session-token", + ) + + def test_pyarrow_unified_session_properties() -> None: session_properties: Properties = { "s3.endpoint": "http://localhost:9000", From de3b49876474474e53c401176d4047e714be712b Mon Sep 17 00:00:00 2001 From: gmweaver Date: Tue, 12 Aug 2025 17:06:35 -0700 Subject: [PATCH 4/5] rm formatting --- pyiceberg/io/pyarrow.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index db65bb3084..6bbc438af8 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2800,11 +2800,9 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T functools.reduce( operator.and_, [ - ( - pc.field(partition_field_name) == unique_partition[partition_field_name] - if unique_partition[partition_field_name] is not None - else pc.field(partition_field_name).is_null() - ) + pc.field(partition_field_name) == unique_partition[partition_field_name] + if unique_partition[partition_field_name] is not None + else pc.field(partition_field_name).is_null() for field, partition_field_name in zip(spec.fields, partition_fields) ], ) From 
028d5a5d647bcffe2e20a31938022b2c731f2887 Mon Sep 17 00:00:00 2001 From: gmweaver Date: Wed, 13 Aug 2025 12:38:10 -0700 Subject: [PATCH 5/5] lint --- mkdocs/docs/configuration.md | 1 - pyiceberg/io/fsspec.py | 3 ++- pyiceberg/io/pyarrow.py | 11 +++++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 5e02ffae0e..59e89dfc18 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -200,7 +200,6 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya | s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This is set to `True` by default as OSS can only be accessed with virtual hosted style address. | | s3.anonymous | True | Configure whether to use anonymous connection. If False (default), uses key/secret if configured or standard AWS configuration methods. | - ### Hugging Face diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index d7ff3b0391..c1f95b719f 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -84,7 +84,8 @@ OutputStream, ) from pyiceberg.typedef import Properties -from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool, strtobool +from pyiceberg.types import strtobool +from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool logger = logging.getLogger(__name__) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 6bbc438af8..7076d7da76 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -180,13 +180,14 @@ TimeType, UnknownType, UUIDType, + strtobool, ) from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import millis_to_datetime from pyiceberg.utils.decimal import unscaled_to_decimal from pyiceberg.utils.deprecated import deprecation_message -from 
pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int, strtobool +from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -2800,9 +2801,11 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T functools.reduce( operator.and_, [ - pc.field(partition_field_name) == unique_partition[partition_field_name] - if unique_partition[partition_field_name] is not None - else pc.field(partition_field_name).is_null() + ( + pc.field(partition_field_name) == unique_partition[partition_field_name] + if unique_partition[partition_field_name] is not None + else pc.field(partition_field_name).is_null() + ) for field, partition_field_name in zip(spec.fields, partition_fields) ], )