Skip to content

Commit 054f3e7

Browse files
committed
Added fsspecio method for OSS, standardized key configurations for OSS
1 parent 738ef11 commit 054f3e7

File tree

4 files changed

+41
-31
lines changed

4 files changed

+41
-31
lines changed

mkdocs/docs/configuration.md

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ Iceberg works with the concept of a FileIO which is a pluggable module for readi
9494
- **file**: `PyArrowFileIO`
9595
- **hdfs**: `PyArrowFileIO`
9696
- **abfs**, **abfss**: `FsspecFileIO`
97-
- **oss**: `PyArrowFileIO`
97+
- **oss**: `PyArrowFileIO`, `FsspecFileIO`
9898

9999
You can also set the FileIO explicitly:
100100

@@ -181,15 +181,14 @@ For the FileIO there are several configuration options available:
181181

182182
<!-- markdown-link-check-disable -->
183183

184-
PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) class to connect to OSS bucket as the service is [compatible with S3 SDK](https://www.alibabacloud.com/help/en/oss/developer-reference/use-amazon-s3-sdks-to-access-oss) as long as the endpoint is addressed with virtual hosted style.
184+
PyIceberg uses `PyArrowFileIO`'s `S3FileSystem` and `FsspecFileIO`'s `S3FileSystem` class to connect to OSS bucket as the service is [compatible with S3 SDK](https://www.alibabacloud.com/help/en/oss/developer-reference/use-amazon-s3-sdks-to-access-oss). With `oss` protocol, both method has virtual hosted style set to default.
185185

186186
| Key | Example | Description |
187187
| -------------------- | ------------------- | ------------------------------------------------ |
188-
| s3.endpoint | <https://s3.oss-your-bucket-region.aliyuncs.com/> | Configure an endpoint of the OSS service for the FileIO to access. Be sure to use S3 compatible endpoint as given in the example. |
189-
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
190-
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
191-
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
192-
| s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This must be set to True as OSS can only be accessed with virtual hosted style address. |
188+
| oss.endpoint | <https://s3.oss-your-bucket-region.aliyuncs.com/> | Configure an endpoint of the OSS service for the FileIO to access. Be sure to use S3 compatible endpoint as given in the example. |
189+
| oss.access-key-id | admin | Configure the static access key id used to access the FileIO. |
190+
| oss.access-key-secret | password | Configure the static secret access key used to access the FileIO. |
191+
| oss.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
193192

194193
<!-- markdown-link-check-enable-->
195194

pyiceberg/io/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@
6969
S3_ROLE_ARN = "s3.role-arn"
7070
S3_ROLE_SESSION_NAME = "s3.role-session-name"
7171
S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
72+
OSS_ENDPOINT = "oss.endpoint"
73+
OSS_ACCESS_KEY_ID = "oss.access-key-id"
74+
OSS_ACCESS_KEY_SECRET = "oss.access-key-secret"
75+
OSS_SESSION_TOKEN = "oss.session-token"
7276
HDFS_HOST = "hdfs.host"
7377
HDFS_PORT = "hdfs.port"
7478
HDFS_USER = "hdfs.user"
@@ -298,7 +302,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
298302
"s3": [ARROW_FILE_IO, FSSPEC_FILE_IO],
299303
"s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
300304
"s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
301-
"oss": [ARROW_FILE_IO],
305+
"oss": [ARROW_FILE_IO, FSSPEC_FILE_IO],
302306
"gs": [ARROW_FILE_IO],
303307
"file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
304308
"hdfs": [ARROW_FILE_IO],

pyiceberg/io/fsspec.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@
5959
GCS_SESSION_KWARGS,
6060
GCS_TOKEN,
6161
GCS_VERSION_AWARE,
62+
OSS_ACCESS_KEY_ID,
63+
OSS_ACCESS_KEY_SECRET,
64+
OSS_ENDPOINT,
65+
OSS_SESSION_TOKEN,
6266
S3_ACCESS_KEY_ID,
6367
S3_CONNECT_TIMEOUT,
6468
S3_ENDPOINT,
@@ -124,6 +128,22 @@ def _file(_: Properties) -> LocalFileSystem:
124128
return LocalFileSystem(auto_mkdir=True)
125129

126130

131+
def _oss(properties: Properties) -> AbstractFileSystem:
132+
from s3fs import S3FileSystem
133+
134+
client_kwargs = {
135+
"endpoint_url": properties.get(OSS_ENDPOINT),
136+
"aws_access_key_id": properties.get(OSS_ACCESS_KEY_ID),
137+
"aws_secret_access_key": properties.get(OSS_ACCESS_KEY_SECRET),
138+
"aws_session_token": properties.get(OSS_SESSION_TOKEN),
139+
}
140+
config_kwargs = {"s3": {"addressing_style": "virtual"}, "signature_version": "v4"}
141+
142+
fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs)
143+
144+
return fs
145+
146+
127147
def _s3(properties: Properties) -> AbstractFileSystem:
128148
from s3fs import S3FileSystem
129149

@@ -217,6 +237,7 @@ def _adls(properties: Properties) -> AbstractFileSystem:
217237
"abfss": _adls,
218238
"gs": _gs,
219239
"gcs": _gs,
240+
"oss": _oss,
220241
}
221242

222243

pyiceberg/io/pyarrow.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@
9999
HDFS_KERB_TICKET,
100100
HDFS_PORT,
101101
HDFS_USER,
102+
OSS_ACCESS_KEY_ID,
103+
OSS_ACCESS_KEY_SECRET,
104+
OSS_ENDPOINT,
105+
OSS_SESSION_TOKEN,
102106
PYARROW_USE_LARGE_TYPES_ON_READ,
103107
S3_ACCESS_KEY_ID,
104108
S3_CONNECT_TIMEOUT,
@@ -398,31 +402,13 @@ def _initialize_oss_fs(self) -> FileSystem:
398402
from pyarrow.fs import S3FileSystem
399403

400404
client_kwargs: Dict[str, Any] = {
401-
"endpoint_override": self.properties.get(S3_ENDPOINT),
402-
"access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
403-
"secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
404-
"session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
405-
"region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
405+
"endpoint_override": self.properties.get(OSS_ENDPOINT),
406+
"access_key": self.properties.get(OSS_ACCESS_KEY_ID),
407+
"secret_key": self.properties.get(OSS_ACCESS_KEY_SECRET),
408+
"session_token": self.properties.get(OSS_SESSION_TOKEN),
409+
"force_virtual_addressing": True,
406410
}
407411

408-
if proxy_uri := self.properties.get(S3_PROXY_URI):
409-
client_kwargs["proxy_options"] = proxy_uri
410-
411-
if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
412-
client_kwargs["connect_timeout"] = float(connect_timeout)
413-
414-
if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
415-
client_kwargs["request_timeout"] = float(request_timeout)
416-
417-
if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
418-
client_kwargs["role_arn"] = role_arn
419-
420-
if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
421-
client_kwargs["session_name"] = session_name
422-
423-
if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
424-
client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
425-
426412
return S3FileSystem(**client_kwargs)
427413

428414
def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:

0 commit comments

Comments
 (0)