diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 5758628f48..6459d0b647 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -94,7 +94,7 @@ Iceberg works with the concept of a FileIO which is a pluggable module for readi - **file**: `PyArrowFileIO` - **hdfs**: `PyArrowFileIO` - **abfs**, **abfss**: `FsspecFileIO` -- **oss**: `PyArrowFileIO` +- **oss**: `PyArrowFileIO`, `FsspecFileIO` You can also set the FileIO explicitly: @@ -181,15 +181,14 @@ For the FileIO there are several configuration options available: -PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) class to connect to OSS bucket as the service is [compatible with S3 SDK](https://www.alibabacloud.com/help/en/oss/developer-reference/use-amazon-s3-sdks-to-access-oss) as long as the endpoint is addressed with virtual hosted style. +PyIceberg uses the `S3FileSystem` classes of `PyArrowFileIO` and `FsspecFileIO` to connect to an OSS bucket, as the service is [compatible with S3 SDK](https://www.alibabacloud.com/help/en/oss/developer-reference/use-amazon-s3-sdks-to-access-oss). With the `oss` protocol, both implementations use virtual hosted style addressing by default. | Key | Example | Description | | -------------------- | ------------------- | ------------------------------------------------ | -| s3.endpoint | | Configure an endpoint of the OSS service for the FileIO to access. Be sure to use S3 compatible endpoint as given in the example. | -| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. | -| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. | -| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | -| s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This must be set to True as OSS can only be accessed with virtual hosted style address. 
| +| oss.endpoint | | Configure an endpoint of the OSS service for the FileIO to access. Be sure to use S3 compatible endpoint as given in the example. | +| oss.access-key-id | admin | Configure the static access key id used to access the FileIO. | +| oss.access-key-secret | password | Configure the static secret access key used to access the FileIO. | +| oss.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index f4606b6f30..221db9f06e 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -69,6 +69,10 @@ S3_ROLE_ARN = "s3.role-arn" S3_ROLE_SESSION_NAME = "s3.role-session-name" S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing" +OSS_ENDPOINT = "oss.endpoint" +OSS_ACCESS_KEY_ID = "oss.access-key-id" +OSS_ACCESS_KEY_SECRET = "oss.access-key-secret" +OSS_SESSION_TOKEN = "oss.session-token" HDFS_HOST = "hdfs.host" HDFS_PORT = "hdfs.port" HDFS_USER = "hdfs.user" @@ -298,7 +302,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: "s3": [ARROW_FILE_IO, FSSPEC_FILE_IO], "s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO], "s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO], - "oss": [ARROW_FILE_IO], + "oss": [ARROW_FILE_IO, FSSPEC_FILE_IO], "gs": [ARROW_FILE_IO], "file": [ARROW_FILE_IO, FSSPEC_FILE_IO], "hdfs": [ARROW_FILE_IO], diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 4cbae52304..a4a95cf474 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -59,6 +59,10 @@ GCS_SESSION_KWARGS, GCS_TOKEN, GCS_VERSION_AWARE, + OSS_ACCESS_KEY_ID, + OSS_ACCESS_KEY_SECRET, + OSS_ENDPOINT, + OSS_SESSION_TOKEN, S3_ACCESS_KEY_ID, S3_CONNECT_TIMEOUT, S3_ENDPOINT, @@ -124,6 +128,20 @@ def _file(_: Properties) -> LocalFileSystem: return LocalFileSystem(auto_mkdir=True) +def _oss(properties: Properties) -> AbstractFileSystem: + from s3fs import S3FileSystem + + client_kwargs: Dict[str, Any] = { + "endpoint_url": 
properties.get(OSS_ENDPOINT), + "aws_access_key_id": properties.get(OSS_ACCESS_KEY_ID), + "aws_secret_access_key": properties.get(OSS_ACCESS_KEY_SECRET), + "aws_session_token": properties.get(OSS_SESSION_TOKEN), + } + config_kwargs: Dict[str, Any] = {"s3": {"addressing_style": "virtual"}, "signature_version": "v4"} + + return S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs) + + + def _s3(properties: Properties) -> AbstractFileSystem: from s3fs import S3FileSystem @@ -217,6 +235,7 @@ def _adls(properties: Properties) -> AbstractFileSystem: "abfss": _adls, "gs": _gs, "gcs": _gs, + "oss": _oss, } diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index d9f84a42ba..6d68e8c128 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -99,6 +99,10 @@ HDFS_KERB_TICKET, HDFS_PORT, HDFS_USER, + OSS_ACCESS_KEY_ID, + OSS_ACCESS_KEY_SECRET, + OSS_ENDPOINT, + OSS_SESSION_TOKEN, PYARROW_USE_LARGE_TYPES_ON_READ, S3_ACCESS_KEY_ID, S3_CONNECT_TIMEOUT, @@ -172,6 +176,7 @@ from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import millis_to_datetime +from pyiceberg.utils.deprecated import deprecation_message from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -399,30 +404,42 @@ def _initialize_oss_fs(self) -> FileSystem: from pyarrow.fs import S3FileSystem client_kwargs: Dict[str, Any] = { - "endpoint_override": self.properties.get(S3_ENDPOINT), - "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), - "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), - "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), - "region": 
get_first_property_value(self.properties, S3_REGION, AWS_REGION), + "endpoint_override": get_first_property_value(self.properties, OSS_ENDPOINT, S3_ENDPOINT), + "access_key": get_first_property_value(self.properties, OSS_ACCESS_KEY_ID, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), + "secret_key": get_first_property_value( + self.properties, OSS_ACCESS_KEY_SECRET, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY + ), + "session_token": get_first_property_value(self.properties, OSS_SESSION_TOKEN, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), + "force_virtual_addressing": True, } - if proxy_uri := self.properties.get(S3_PROXY_URI): - client_kwargs["proxy_options"] = proxy_uri - - if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT): - client_kwargs["connect_timeout"] = float(connect_timeout) - - if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT): - client_kwargs["request_timeout"] = float(request_timeout) + if self.properties.get(S3_ENDPOINT): + deprecation_message( + deprecated_in="0.9.1", + removed_in="1.0", + help_message=f"The property {S3_ENDPOINT} is deprecated, please use {OSS_ENDPOINT} instead.", + ) - if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN): - client_kwargs["role_arn"] = role_arn + if get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID): + deprecation_message( + deprecated_in="0.9.1", + removed_in="1.0", + help_message=f"The property {S3_ACCESS_KEY_ID} or {AWS_ACCESS_KEY_ID} is deprecated, please use {OSS_ACCESS_KEY_ID} instead.", + ) - if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME): - client_kwargs["session_name"] = session_name + if get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY): + deprecation_message( + deprecated_in="0.9.1", + removed_in="1.0", + help_message=f"The property {S3_SECRET_ACCESS_KEY} or {AWS_SECRET_ACCESS_KEY} is deprecated, please use {OSS_ACCESS_KEY_SECRET} instead.", + ) - if 
force_virtual_addressing := 
self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING): - client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False) + if get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN): + deprecation_message( + deprecated_in="0.9.1", + removed_in="1.0", + help_message=f"The property {S3_SESSION_TOKEN} or {AWS_SESSION_TOKEN} is deprecated, please use {OSS_SESSION_TOKEN} instead.", + ) return S3FileSystem(**client_kwargs)