Skip to content

Commit 2864ab7

Browse files
rcjverhoef and claude committed
feat: support storage-credentials in REST catalog LoadTableResult
The Iceberg REST spec's LoadTableResult includes a storage-credentials field for vended credentials (prefix-scoped temporary STS tokens). PyIceberg was only reading the config field and silently dropping storage-credentials, so vended credentials never reached the FileIO.

Per the spec: "Clients must first check whether the respective credentials exist in the storage-credentials field before checking the config for credentials."

This adds:
- storage_credentials field to TableResponse
- Longest-prefix credential resolution (mirroring Java's S3FileIO)
- Merging resolved credentials into FileIO with highest precedence

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7d4a8ef commit 2864ab7

File tree

2 files changed

+118
-2
lines changed

2 files changed

+118
-2
lines changed

pyiceberg/catalog/rest/__init__.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
PlanSubmitted,
4141
PlanTableScanRequest,
4242
ScanTasks,
43+
StorageCredential,
4344
)
4445
from pyiceberg.exceptions import (
4546
AuthorizationExpiredError,
@@ -256,6 +257,7 @@ class TableResponse(IcebergBaseModel):
256257
metadata_location: str | None = Field(alias="metadata-location", default=None)
257258
metadata: TableMetadata
258259
config: Properties = Field(default_factory=dict)
260+
storage_credentials: list[StorageCredential] = Field(alias="storage-credentials", default_factory=list)
259261

260262

261263
class CreateTableRequest(IcebergBaseModel):
@@ -391,6 +393,26 @@ def _create_session(self) -> Session:
391393

392394
return session
393395

396+
@staticmethod
def _resolve_storage_credentials(storage_credentials: list[StorageCredential], location: str | None) -> Properties:
    """Pick the storage credential whose prefix is the longest match for ``location``.

    Mirrors the Java implementation in S3FileIO.clientForStoragePath() which iterates
    over storage credential prefixes and selects the one with the longest match.

    See: https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java

    Args:
        storage_credentials: Candidate credentials, each carrying a ``prefix`` and a ``config``.
        location: The location the credentials are needed for; may be ``None``.

    Returns:
        The ``config`` of the best-matching credential, or an empty dict when there
        are no credentials, no location, or no prefix matches.
    """
    if not (storage_credentials and location):
        return {}

    matching = [candidate for candidate in storage_credentials if location.startswith(candidate.prefix)]
    if not matching:
        return {}

    # max() returns the first maximal element, so among equally long prefixes
    # the earliest credential in the list wins.
    return max(matching, key=lambda candidate: len(candidate.prefix)).config
415+
394416
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
395417
merged_properties = {**self.properties, **properties}
396418
if self._auth_manager:
@@ -729,24 +751,34 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin
729751
session.mount(self.uri, SigV4Adapter(**self.properties))
730752

731753
def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
    """Build a ``Table`` from a REST catalog table response.

    The FileIO properties are layered so that later sources win: table metadata
    properties, then the response ``config``, then the resolved storage credentials.

    Args:
        identifier_tuple: The identifier of the table.
        table_response: The parsed response returned by the REST catalog.

    Returns:
        A ``Table`` bound to this catalog, with a FileIO configured from the response.
    """
    metadata_location = table_response.metadata_location
    # Per Iceberg spec: storage-credentials take precedence over config
    credential_config = self._resolve_storage_credentials(
        table_response.storage_credentials,
        # metadata-location is optional in the response; fall back to the table's
        # base location so vended credentials are not silently dropped when it is absent.
        metadata_location or table_response.metadata.location,
    )
    return Table(
        identifier=identifier_tuple,
        metadata_location=metadata_location,  # type: ignore
        metadata=table_response.metadata,
        io=self._load_file_io(
            {**table_response.metadata.properties, **table_response.config, **credential_config},
            metadata_location,
        ),
        catalog=self,
        config=table_response.config,
    )
742769

743770
def _response_to_staged_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> StagedTable:
    """Build a ``StagedTable`` from a REST catalog table response.

    The FileIO properties are layered so that later sources win: table metadata
    properties, then the response ``config``, then the resolved storage credentials.

    Args:
        identifier_tuple: The identifier of the staged table.
        table_response: The parsed response returned by the REST catalog.

    Returns:
        A ``StagedTable`` bound to this catalog, with a FileIO configured from the response.
    """
    metadata_location = table_response.metadata_location
    # Per Iceberg spec: storage-credentials take precedence over config
    credential_config = self._resolve_storage_credentials(
        table_response.storage_credentials,
        # A staged table may not have a metadata file yet, leaving metadata-location
        # unset; fall back to the table's base location so vended credentials are
        # still matched instead of being silently dropped.
        metadata_location or table_response.metadata.location,
    )
    return StagedTable(
        identifier=identifier_tuple,
        metadata_location=metadata_location,  # type: ignore
        metadata=table_response.metadata,
        io=self._load_file_io(
            {**table_response.metadata.properties, **table_response.config, **credential_config},
            metadata_location,
        ),
        catalog=self,
    )

tests/catalog/test_rest.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2351,3 +2351,87 @@ def test_table_uuid_check_on_refresh(rest_mock: Mocker, example_table_metadata_v
23512351
assert "Table UUID does not match" in str(exc_info.value)
23522352
assert f"current={original_uuid}" in str(exc_info.value)
23532353
assert f"refreshed={different_uuid}" in str(exc_info.value)
2354+
2355+
2356+
def test_resolve_storage_credentials_longest_prefix_wins() -> None:
    """When several prefixes match, the config of the longest one is returned."""
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    location = "s3://warehouse/database/table/metadata/00001.json"
    broad = StorageCredential(prefix="s3://warehouse/", config={"s3.access-key-id": "short-prefix-key"})
    narrow = StorageCredential(prefix="s3://warehouse/database/table", config={"s3.access-key-id": "long-prefix-key"})

    resolved = RestCatalog._resolve_storage_credentials([broad, narrow], location)

    assert resolved == {"s3.access-key-id": "long-prefix-key"}
2365+
2366+
2367+
def test_resolve_storage_credentials_no_match() -> None:
    """A credential whose prefix does not match the location yields an empty config."""
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    location = "s3://warehouse/database/table/metadata/00001.json"
    unrelated = StorageCredential(prefix="s3://other-bucket/", config={"s3.access-key-id": "no-match"})

    resolved = RestCatalog._resolve_storage_credentials([unrelated], location)

    assert resolved == {}
2375+
2376+
2377+
def test_resolve_storage_credentials_empty() -> None:
    """An empty credential list resolves to an empty config, with or without a location."""
    for location in ("s3://warehouse/foo", None):
        assert RestCatalog._resolve_storage_credentials([], location) == {}
2380+
2381+
2382+
def test_load_table_with_storage_credentials(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
) -> None:
    """Vended storage credentials end up in the table's FileIO properties."""
    metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
    config_credentials = {
        "s3.access-key-id": "from-config",
        "s3.secret-access-key": "from-config-secret",
    }
    vended_credentials = {
        "s3.access-key-id": "vended-key",
        "s3.secret-access-key": "vended-secret",
        "s3.session-token": "vended-token",
    }
    response_body = {
        "metadata-location": metadata_location,
        "metadata": example_table_metadata_with_snapshot_v1,
        "config": config_credentials,
        "storage-credentials": [
            {
                "prefix": "s3://warehouse/database/table",
                "config": vended_credentials,
            }
        ],
    }
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json=response_body,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    table = catalog.load_table(("fokko", "table"))

    # Storage credentials should override config values
    io_properties = table.io.properties
    for key, expected_value in vended_credentials.items():
        assert io_properties[key] == expected_value
2416+
2417+
2418+
def test_load_table_without_storage_credentials(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
) -> None:
    """Loading a table from the plain REST fixture still produces the expected ``Table``.

    NOTE(review): the fixture presumably carries no ``storage-credentials`` entry; confirm
    against its definition.
    """
    response_body = example_table_metadata_with_snapshot_v1_rest_json
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json=response_body,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    identifier = ("fokko", "table")
    actual = catalog.load_table(identifier)

    expected_metadata = TableMetadataV1(**response_body["metadata"])
    expected = Table(
        identifier=("fokko", "table"),
        metadata_location=response_body["metadata-location"],
        metadata=expected_metadata,
        io=load_file_io(),
        catalog=catalog,
    )

    assert actual.metadata.model_dump() == expected.metadata.model_dump()
    assert actual == expected

0 commit comments

Comments
 (0)