From 054f3e7edfe6f4754d597a8b665fd342acb80627 Mon Sep 17 00:00:00 2001 From: helmiazizm Date: Wed, 12 Mar 2025 12:26:11 +0700 Subject: [PATCH 1/4] Added fsspecio method for OSS, standardized key configurations for OSS --- mkdocs/docs/configuration.md | 13 ++++++------- pyiceberg/io/__init__.py | 6 +++++- pyiceberg/io/fsspec.py | 21 +++++++++++++++++++++ pyiceberg/io/pyarrow.py | 32 +++++++++----------------------- 4 files changed, 41 insertions(+), 31 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 5758628f48..6459d0b647 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -94,7 +94,7 @@ Iceberg works with the concept of a FileIO which is a pluggable module for readi - **file**: `PyArrowFileIO` - **hdfs**: `PyArrowFileIO` - **abfs**, **abfss**: `FsspecFileIO` -- **oss**: `PyArrowFileIO` +- **oss**: `PyArrowFileIO`, `FsspecFileIO` You can also set the FileIO explicitly: @@ -181,15 +181,14 @@ For the FileIO there are several configuration options available: -PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) class to connect to OSS bucket as the service is [compatible with S3 SDK](https://www.alibabacloud.com/help/en/oss/developer-reference/use-amazon-s3-sdks-to-access-oss) as long as the endpoint is addressed with virtual hosted style. +PyIceberg uses `PyArrowFileIO`'s `S3FileSystem` and `FsspecFileIO`'s `S3FileSystem` class to connect to OSS bucket as the service is [compatible with S3 SDK](https://www.alibabacloud.com/help/en/oss/developer-reference/use-amazon-s3-sdks-to-access-oss). With `oss` protocol, both method has virtual hosted style set to default. | Key | Example | Description | | -------------------- | ------------------- | ------------------------------------------------ | -| s3.endpoint | | Configure an endpoint of the OSS service for the FileIO to access. Be sure to use S3 compatible endpoint as given in the example. | -| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. | -| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. | -| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | -| s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This must be set to True as OSS can only be accessed with virtual hosted style address. | +| oss.endpoint | | Configure an endpoint of the OSS service for the FileIO to access. Be sure to use S3 compatible endpoint as given in the example. | +| oss.access-key-id | admin | Configure the static access key id used to access the FileIO. | +| oss.access-key-secret | password | Configure the static secret access key used to access the FileIO. | +| oss.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index f4606b6f30..221db9f06e 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -69,6 +69,10 @@ S3_ROLE_ARN = "s3.role-arn" S3_ROLE_SESSION_NAME = "s3.role-session-name" S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing" +OSS_ENDPOINT = "oss.endpoint" +OSS_ACCESS_KEY_ID = "oss.access-key-id" +OSS_ACCESS_KEY_SECRET = "oss.access-key-secret" +OSS_SESSION_TOKEN = "oss.session-token" HDFS_HOST = "hdfs.host" HDFS_PORT = "hdfs.port" HDFS_USER = "hdfs.user" @@ -298,7 +302,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: "s3": [ARROW_FILE_IO, FSSPEC_FILE_IO], "s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO], "s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO], - "oss": [ARROW_FILE_IO], + "oss": [ARROW_FILE_IO, FSSPEC_FILE_IO], "gs": [ARROW_FILE_IO], "file": [ARROW_FILE_IO, FSSPEC_FILE_IO], "hdfs": [ARROW_FILE_IO], diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 4cbae52304..70c5e08c5f 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -59,6 +59,10 @@ GCS_SESSION_KWARGS, GCS_TOKEN, GCS_VERSION_AWARE, + OSS_ACCESS_KEY_ID, + OSS_ACCESS_KEY_SECRET, + OSS_ENDPOINT, + OSS_SESSION_TOKEN, S3_ACCESS_KEY_ID, S3_CONNECT_TIMEOUT, S3_ENDPOINT, @@ -124,6 +128,22 @@ def _file(_: Properties) -> LocalFileSystem: return LocalFileSystem(auto_mkdir=True) +def _oss(properties: Properties) -> AbstractFileSystem: + from s3fs import S3FileSystem + + client_kwargs = { + "endpoint_url": properties.get(OSS_ENDPOINT), + "aws_access_key_id": properties.get(OSS_ACCESS_KEY_ID), + "aws_secret_access_key": properties.get(OSS_ACCESS_KEY_SECRET), + "aws_session_token": properties.get(OSS_SESSION_TOKEN), + } + config_kwargs = {"s3": {"addressing_style": "virtual"}, "signature_version": "v4"} + + fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs) + + return fs + + def _s3(properties: Properties) -> AbstractFileSystem: from s3fs import S3FileSystem @@ -217,6 +237,7 @@ def _adls(properties: Properties) -> AbstractFileSystem: "abfss": _adls, "gs": _gs, "gcs": _gs, + "oss": _oss, } diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7c8aaaab1b..10d2e51236 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -99,6 +99,10 @@ HDFS_KERB_TICKET, HDFS_PORT, HDFS_USER, + OSS_ACCESS_KEY_ID, + OSS_ACCESS_KEY_SECRET, + OSS_ENDPOINT, + OSS_SESSION_TOKEN, PYARROW_USE_LARGE_TYPES_ON_READ, S3_ACCESS_KEY_ID, S3_CONNECT_TIMEOUT, @@ -398,31 +402,13 @@ def _initialize_oss_fs(self) -> FileSystem: from pyarrow.fs import S3FileSystem client_kwargs: Dict[str, Any] = { - "endpoint_override": self.properties.get(S3_ENDPOINT), - "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), - "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), - "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), - "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION), + "endpoint_override": self.properties.get(OSS_ENDPOINT), + "access_key": self.properties.get(OSS_ACCESS_KEY_ID), + "secret_key": self.properties.get(OSS_ACCESS_KEY_SECRET), + "session_token": self.properties.get(OSS_SESSION_TOKEN), + "force_virtual_addressing": True, } - if proxy_uri := self.properties.get(S3_PROXY_URI): - client_kwargs["proxy_options"] = proxy_uri - - if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT): - client_kwargs["connect_timeout"] = float(connect_timeout) - - if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT): - client_kwargs["request_timeout"] = float(request_timeout) - - if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN): - client_kwargs["role_arn"] = role_arn - - if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME): - client_kwargs["session_name"] = session_name - - if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING): - client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False) - return S3FileSystem(**client_kwargs) def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: From 0d5d8f7bf58367c7d641710b2dc3f720a2d4c615 Mon Sep 17 00:00:00 2001 From: helmiazizm Date: Thu, 13 Mar 2025 09:15:54 +0700 Subject: [PATCH 2/4] Added deprecation messages --- pyiceberg/io/fsspec.py | 4 +--- pyiceberg/io/pyarrow.py | 37 +++++++++++++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 70c5e08c5f..1e8f11e0cb 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -139,9 +139,7 @@ def _oss(properties: Properties) -> AbstractFileSystem: } config_kwargs = {"s3": {"addressing_style": "virtual"}, "signature_version": "v4"} - fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs) - - return fs + return S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs) def _s3(properties: Properties) -> AbstractFileSystem: diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 10d2e51236..e85d1c686b 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -175,6 +175,7 @@ from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import millis_to_datetime +from pyiceberg.utils.deprecated import deprecation_message from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -402,13 +403,41 @@ def _initialize_oss_fs(self) -> FileSystem: from pyarrow.fs import S3FileSystem client_kwargs: Dict[str, Any] = { - "endpoint_override": self.properties.get(OSS_ENDPOINT), - "access_key": self.properties.get(OSS_ACCESS_KEY_ID), - "secret_key": self.properties.get(OSS_ACCESS_KEY_SECRET), - "session_token": self.properties.get(OSS_SESSION_TOKEN), + "endpoint_override": get_first_property_value(self.properties, OSS_ENDPOINT, S3_ENDPOINT), + "access_key": get_first_property_value(self.properties, OSS_ACCESS_KEY_ID, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), + "secret_key": get_first_property_value(self.properties, OSS_ACCESS_KEY_SECRET, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), + "session_token": get_first_property_value(self.properties, OSS_SESSION_TOKEN, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), "force_virtual_addressing": True, } + if self.properties.get(S3_ENDPOINT): + deprecation_message( + deprecated_in="0.9.1", + removed_in="1.0", + help_message=f"The property {S3_ENDPOINT} is deprecated, please use {OSS_ENDPOINT} instead." + ) + + if access_key := get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID): + deprecation_message( + deprecated_in="0.9.1", + removed_in="1.0", + help_message=f"The property {access_key} is deprecated, please use {OSS_ACCESS_KEY_ID} instead." + ) + + if secret_key := get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY): + deprecation_message( + deprecated_in="0.9.1", + removed_in="1.0", + help_message=f"The property {secret_key} is deprecated, please use {OSS_ACCESS_KEY_SECRET} instead." + ) + + if session_token := get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN): + deprecation_message( + deprecated_in="0.9.1", + removed_in="1.0", + help_message=f"The property {session_token} is deprecated, please use {OSS_SESSION_TOKEN} instead." + ) + return S3FileSystem(**client_kwargs) def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: From dc94e23efbfc79aedebc8b83974a43a5e81dd06f Mon Sep 17 00:00:00 2001 From: helmiazizm Date: Thu, 13 Mar 2025 09:17:39 +0700 Subject: [PATCH 3/4] Linter change --- pyiceberg/io/pyarrow.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e85d1c686b..d2e050b39d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -405,7 +405,9 @@ def _initialize_oss_fs(self) -> FileSystem: client_kwargs: Dict[str, Any] = { "endpoint_override": get_first_property_value(self.properties, OSS_ENDPOINT, S3_ENDPOINT), "access_key": get_first_property_value(self.properties, OSS_ACCESS_KEY_ID, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), - "secret_key": get_first_property_value(self.properties, OSS_ACCESS_KEY_SECRET, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), + "secret_key": get_first_property_value( + self.properties, OSS_ACCESS_KEY_SECRET, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY + ), "session_token": get_first_property_value(self.properties, OSS_SESSION_TOKEN, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), "force_virtual_addressing": True, } @@ -414,28 +416,28 @@ def _initialize_oss_fs(self) -> FileSystem: deprecation_message( deprecated_in="0.9.1", removed_in="1.0", - help_message=f"The property {S3_ENDPOINT} is deprecated, please use {OSS_ENDPOINT} instead." + help_message=f"The property {S3_ENDPOINT} is deprecated, please use {OSS_ENDPOINT} instead.", ) if access_key := get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID): deprecation_message( deprecated_in="0.9.1", removed_in="1.0", - help_message=f"The property {access_key} is deprecated, please use {OSS_ACCESS_KEY_ID} instead." + help_message=f"The property {access_key} is deprecated, please use {OSS_ACCESS_KEY_ID} instead.", ) if secret_key := get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY): deprecation_message( deprecated_in="0.9.1", removed_in="1.0", - help_message=f"The property {secret_key} is deprecated, please use {OSS_ACCESS_KEY_SECRET} instead." + help_message=f"The property {secret_key} is deprecated, please use {OSS_ACCESS_KEY_SECRET} instead.", ) if session_token := get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN): deprecation_message( deprecated_in="0.9.1", removed_in="1.0", - help_message=f"The property {session_token} is deprecated, please use {OSS_SESSION_TOKEN} instead." + help_message=f"The property {session_token} is deprecated, please use {OSS_SESSION_TOKEN} instead.", ) return S3FileSystem(**client_kwargs) From c208a2d790249111e99bba840b15c11e40775e94 Mon Sep 17 00:00:00 2001 From: helmiazizm Date: Fri, 14 Mar 2025 11:40:05 +0700 Subject: [PATCH 4/4] Added typing on client config kwargs --- pyiceberg/io/fsspec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 1e8f11e0cb..a4a95cf474 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -131,13 +131,13 @@ def _file(_: Properties) -> LocalFileSystem: def _oss(properties: Properties) -> AbstractFileSystem: from s3fs import S3FileSystem - client_kwargs = { + client_kwargs: Dict[str, Any] = { "endpoint_url": properties.get(OSS_ENDPOINT), "aws_access_key_id": properties.get(OSS_ACCESS_KEY_ID), "aws_secret_access_key": properties.get(OSS_ACCESS_KEY_SECRET), "aws_session_token": properties.get(OSS_SESSION_TOKEN), } - config_kwargs = {"s3": {"addressing_style": "virtual"}, "signature_version": "v4"} + config_kwargs: Dict[str, Any] = {"s3": {"addressing_style": "virtual"}, "signature_version": "v4"} return S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs)