From 140cee947d14b6ecbf0339b6f815ff6b392fb2a4 Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Wed, 13 Aug 2025 18:35:22 +0000 Subject: [PATCH 01/14] Improve _adls readability --- pyiceberg/io/fsspec.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index d075765ed1..26f048d12e 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -194,11 +194,15 @@ def _gs(properties: Properties) -> AbstractFileSystem: def _adls(properties: Properties) -> AbstractFileSystem: from adlfs import AzureBlobFileSystem - for key, sas_token in { - key.replace(f"{ADLS_SAS_TOKEN}.", ""): value for key, value in properties.items() if key.startswith(ADLS_SAS_TOKEN) - }.items(): + sas_tokens = {} + for key, value in properties.items(): + if key.startswith(ADLS_SAS_TOKEN): + clean_key = key.replace(f"{ADLS_SAS_TOKEN}.", "") + sas_tokens[clean_key] = value + + for account, sas_token in sas_tokens.items(): if ADLS_ACCOUNT_NAME not in properties: - properties[ADLS_ACCOUNT_NAME] = key.split(".")[0] + properties[ADLS_ACCOUNT_NAME] = account.split(".")[0] if ADLS_SAS_TOKEN not in properties: properties[ADLS_SAS_TOKEN] = sas_token From 572fd380bfbc0f800a1fbc62795425e56df42f7f Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Wed, 13 Aug 2025 21:04:04 +0000 Subject: [PATCH 02/14] Starting to work on caching fix --- pyiceberg/io/fsspec.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 26f048d12e..95fcc35462 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -344,7 +344,7 @@ class FsspecFileIO(FileIO): def __init__(self, properties: Properties): self._scheme_to_fs = {} self._scheme_to_fs.update(SCHEME_TO_FS) - self.get_fs: Callable[[str], AbstractFileSystem] = lru_cache(self._get_fs) + self.get_fs: Callable[[str, str], AbstractFileSystem] = lru_cache(self._get_fs) super().__init__(properties=properties) def new_input(self, location: str) -> FsspecInputFile: @@ -357,7 +357,7 @@ def new_input(self, location: str) -> FsspecInputFile: FsspecInputFile: An FsspecInputFile instance for the given location. """ uri = urlparse(location) - fs = self.get_fs(uri.scheme) + fs = fs = self.get_fs(uri.scheme, uri.netloc) return FsspecInputFile(location=location, fs=fs) def new_output(self, location: str) -> FsspecOutputFile: @@ -370,7 +370,7 @@ def new_output(self, location: str) -> FsspecOutputFile: FsspecOutputFile: An FsspecOutputFile instance for the given location. """ uri = urlparse(location) - fs = self.get_fs(uri.scheme) + fs = fs = self.get_fs(uri.scheme, uri.netloc) return FsspecOutputFile(location=location, fs=fs) def delete(self, location: Union[str, InputFile, OutputFile]) -> None: @@ -387,14 +387,15 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: str_location = location uri = urlparse(str_location) - fs = self.get_fs(uri.scheme) + fs = fs = self.get_fs(uri.scheme, uri.netloc) fs.rm(str_location) - def _get_fs(self, scheme: str) -> AbstractFileSystem: + def _get_fs(self, scheme: str, netloc: str) -> AbstractFileSystem: """Get a filesystem for a specific scheme.""" if scheme not in self._scheme_to_fs: raise ValueError(f"No registered filesystem for scheme: {scheme}") - return self._scheme_to_fs[scheme](self.properties) + properties = {**self.properties, "netloc": netloc} + return self._scheme_to_fs[scheme](properties) def __getstate__(self) -> Dict[str, Any]: """Create a dictionary of the FsSpecFileIO fields used when pickling.""" From fe1c5c0dd1b577cdb908f2c2e703f7bdb6fb3987 Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Wed, 13 Aug 2025 22:03:30 +0000 Subject: [PATCH 03/14] basic netloc parsing --- pyiceberg/io/fsspec.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 95fcc35462..a153e2c49b 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -194,6 +194,13 @@ def _gs(properties: Properties) -> AbstractFileSystem: def _adls(properties: Properties) -> AbstractFileSystem: from adlfs import AzureBlobFileSystem + if ADLS_ACCOUNT_NAME not in properties: + netloc = properties.get("netloc") + if netloc: + # https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax + properties[ADLS_ACCOUNT_NAME] = netloc.split("@")[1].split(".")[0] + + # Fixes https://github.com/apache/iceberg-python/issues/1146 sas_tokens = {} for key, value in properties.items(): if key.startswith(ADLS_SAS_TOKEN): From f0d6fdc3c012c53923a85358b00853586ad420b1 Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Wed, 13 Aug 2025 22:25:15 +0000 Subject: [PATCH 04/14] cleanup --- pyiceberg/io/fsspec.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index a153e2c49b..6e1d16b9bc 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -194,24 +194,16 @@ def _gs(properties: Properties) -> AbstractFileSystem: def _adls(properties: Properties) -> AbstractFileSystem: from adlfs import AzureBlobFileSystem - if ADLS_ACCOUNT_NAME not in properties: - netloc = properties.get("netloc") - if netloc: - # https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax - properties[ADLS_ACCOUNT_NAME] = netloc.split("@")[1].split(".")[0] + # https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax + if not properties.get(ADLS_ACCOUNT_NAME) and (netloc := properties.get("netloc")): + properties[ADLS_ACCOUNT_NAME] = netloc.split("@")[1].split(".")[0] # Fixes https://github.com/apache/iceberg-python/issues/1146 - sas_tokens = {} - for key, value in properties.items(): - if key.startswith(ADLS_SAS_TOKEN): - clean_key = key.replace(f"{ADLS_SAS_TOKEN}.", "") - sas_tokens[clean_key] = value - - for account, sas_token in sas_tokens.items(): - if ADLS_ACCOUNT_NAME not in properties: - properties[ADLS_ACCOUNT_NAME] = account.split(".")[0] - if ADLS_SAS_TOKEN not in properties: - properties[ADLS_SAS_TOKEN] = sas_token + if not properties.get(ADLS_SAS_TOKEN): + for key, value in properties.items(): + if key.startswith(ADLS_SAS_TOKEN): + properties[ADLS_SAS_TOKEN] = value + break return AzureBlobFileSystem( connection_string=properties.get(ADLS_CONNECTION_STRING), From d76ef8d98830ecc5e9de83573158a2398e53bfd5 Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Wed, 13 Aug 2025 22:41:23 +0000 Subject: [PATCH 05/14] Made netloc optional --- pyiceberg/io/fsspec.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 6e1d16b9bc..dcf9edc637 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -27,6 +27,7 @@ Any, Callable, Dict, + Optional, Union, ) from urllib.parse import urlparse @@ -343,7 +344,7 @@ class FsspecFileIO(FileIO): def __init__(self, properties: Properties): self._scheme_to_fs = {} self._scheme_to_fs.update(SCHEME_TO_FS) - self.get_fs: Callable[[str, str], AbstractFileSystem] = lru_cache(self._get_fs) + self.get_fs: Callable[[str, Optional[str]], AbstractFileSystem] = lru_cache(self._get_fs) super().__init__(properties=properties) def new_input(self, location: str) -> FsspecInputFile: @@ -389,11 +390,12 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: fs = fs = self.get_fs(uri.scheme, uri.netloc) fs.rm(str_location) - def _get_fs(self, scheme: str, netloc: str) -> AbstractFileSystem: - """Get a filesystem for a specific scheme.""" + def _get_fs(self, scheme: str, netloc: Optional[str] = None) -> AbstractFileSystem: if scheme not in self._scheme_to_fs: raise ValueError(f"No registered filesystem for scheme: {scheme}") - properties = {**self.properties, "netloc": netloc} + properties = self.properties.copy() + if netloc: + properties["netloc"] = netloc return self._scheme_to_fs[scheme](properties) def __getstate__(self) -> Dict[str, Any]: From e930bc7840124c5a23b84e4bfa664749eed3fb0c Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Wed, 13 Aug 2025 22:48:06 +0000 Subject: [PATCH 06/14] typo fix --- pyiceberg/io/fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index dcf9edc637..94bdbd6ab5 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -387,7 +387,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: str_location = location uri = urlparse(str_location) - fs = fs = self.get_fs(uri.scheme, uri.netloc) + fs = self.get_fs(uri.scheme, uri.netloc) fs.rm(str_location) def _get_fs(self, scheme: str, netloc: Optional[str] = None) -> AbstractFileSystem: From 647a53356000e9fa8a7296a0f3843bd153836a1e Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Wed, 13 Aug 2025 22:55:57 +0000 Subject: [PATCH 07/14] typo fix --- pyiceberg/io/fsspec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 94bdbd6ab5..d70da1bffb 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -357,7 +357,7 @@ def new_input(self, location: str) -> FsspecInputFile: FsspecInputFile: An FsspecInputFile instance for the given location. """ uri = urlparse(location) - fs = fs = self.get_fs(uri.scheme, uri.netloc) + fs = self.get_fs(uri.scheme, uri.netloc) return FsspecInputFile(location=location, fs=fs) def new_output(self, location: str) -> FsspecOutputFile: @@ -370,7 +370,7 @@ def new_output(self, location: str) -> FsspecOutputFile: FsspecOutputFile: An FsspecOutputFile instance for the given location. """ uri = urlparse(location) - fs = fs = self.get_fs(uri.scheme, uri.netloc) + fs = self.get_fs(uri.scheme, uri.netloc) return FsspecOutputFile(location=location, fs=fs) def delete(self, location: Union[str, InputFile, OutputFile]) -> None: From 309b75d6315d21e1660bc3a0cde2375dbbc5451f Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Thu, 14 Aug 2025 15:28:31 +0000 Subject: [PATCH 08/14] Cleaned up credential vending --- pyiceberg/io/fsspec.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index d70da1bffb..69c2b4d361 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -196,15 +196,14 @@ def _adls(properties: Properties) -> AbstractFileSystem: from adlfs import AzureBlobFileSystem # https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax + account_uri = None if not properties.get(ADLS_ACCOUNT_NAME) and (netloc := properties.get("netloc")): - properties[ADLS_ACCOUNT_NAME] = netloc.split("@")[1].split(".")[0] + account_uri = netloc.split("@")[-1] + properties[ADLS_ACCOUNT_NAME] = account_uri.split(".")[0] # Fixes https://github.com/apache/iceberg-python/issues/1146 - if not properties.get(ADLS_SAS_TOKEN): - for key, value in properties.items(): - if key.startswith(ADLS_SAS_TOKEN): - properties[ADLS_SAS_TOKEN] = value - break + if not properties.get(ADLS_SAS_TOKEN) and account_uri: + properties[ADLS_SAS_TOKEN] = properties.get(f"{ADLS_SAS_TOKEN}.{account_uri}") return AzureBlobFileSystem( connection_string=properties.get(ADLS_CONNECTION_STRING), From c3700da3696de8bcd0b50aae87b526d9585eed70 Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Thu, 14 Aug 2025 15:36:29 +0000 Subject: [PATCH 09/14] comment fix --- pyiceberg/io/fsspec.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 69c2b4d361..c51ee64880 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -390,6 +390,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: fs.rm(str_location) def _get_fs(self, scheme: str, netloc: Optional[str] = None) -> AbstractFileSystem: + """Get a filesystem for a specific scheme and netloc.""" if scheme not in self._scheme_to_fs: raise ValueError(f"No registered filesystem for scheme: {scheme}") properties = self.properties.copy() From bb031618e724af6364d40b84dcefc95e009feeb6 Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Thu, 14 Aug 2025 19:05:51 +0000 Subject: [PATCH 10/14] More clean up --- pyiceberg/io/fsspec.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index c51ee64880..10cec2ddbe 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -196,14 +196,12 @@ def _adls(properties: Properties) -> AbstractFileSystem: from adlfs import AzureBlobFileSystem # https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax - account_uri = None if not properties.get(ADLS_ACCOUNT_NAME) and (netloc := properties.get("netloc")): - account_uri = netloc.split("@")[-1] - properties[ADLS_ACCOUNT_NAME] = account_uri.split(".")[0] + properties[ADLS_ACCOUNT_NAME] = netloc.split("@")[-1].split(".")[0] # Fixes https://github.com/apache/iceberg-python/issues/1146 - if not properties.get(ADLS_SAS_TOKEN) and account_uri: - properties[ADLS_SAS_TOKEN] = properties.get(f"{ADLS_SAS_TOKEN}.{account_uri}") + if properties.get(ADLS_ACCOUNT_NAME) and not properties.get(ADLS_SAS_TOKEN): + properties[ADLS_SAS_TOKEN] = properties.get(f"{ADLS_SAS_TOKEN}.{ADLS_ACCOUNT_NAME}.dfs.core.windows.net") return AzureBlobFileSystem( connection_string=properties.get(ADLS_CONNECTION_STRING), From dabd1b1e039f82b434820fac0d65b934efc44571 Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Thu, 14 Aug 2025 19:06:58 +0000 Subject: [PATCH 11/14] Added comment --- pyiceberg/io/fsspec.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 10cec2ddbe..9f73eb696d 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -193,6 +193,7 @@ def _gs(properties: Properties) -> AbstractFileSystem: def _adls(properties: Properties) -> AbstractFileSystem: + # https://fsspec.github.io/adlfs/api/ from adlfs import AzureBlobFileSystem # https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax From 41755400db258e63989d66901bb498e6485f1a1f Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Thu, 14 Aug 2025 19:13:28 +0000 Subject: [PATCH 12/14] fix --- pyiceberg/io/fsspec.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 9f73eb696d..946298a413 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -197,12 +197,14 @@ def _adls(properties: Properties) -> AbstractFileSystem: from adlfs import AzureBlobFileSystem # https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax + account_uri = None if not properties.get(ADLS_ACCOUNT_NAME) and (netloc := properties.get("netloc")): - properties[ADLS_ACCOUNT_NAME] = netloc.split("@")[-1].split(".")[0] + account_uri = netloc.split("@")[-1] + properties[ADLS_ACCOUNT_NAME] = account_uri.split(".")[0] # Fixes https://github.com/apache/iceberg-python/issues/1146 - if properties.get(ADLS_ACCOUNT_NAME) and not properties.get(ADLS_SAS_TOKEN): - properties[ADLS_SAS_TOKEN] = properties.get(f"{ADLS_SAS_TOKEN}.{ADLS_ACCOUNT_NAME}.dfs.core.windows.net") + if not properties.get(ADLS_SAS_TOKEN) and account_uri: + properties[ADLS_SAS_TOKEN] = properties.get(f"{ADLS_SAS_TOKEN}.{account_uri}") return AzureBlobFileSystem( connection_string=properties.get(ADLS_CONNECTION_STRING), From 9c03f6aef6615d6f8f192bdfaff90c24d9fe75bd Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Thu, 14 Aug 2025 19:27:55 +0000 Subject: [PATCH 13/14] fix --- pyiceberg/io/fsspec.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 946298a413..948c7f500b 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -197,9 +197,12 @@ def _adls(properties: Properties) -> AbstractFileSystem: from adlfs import AzureBlobFileSystem # https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax - account_uri = None - if not properties.get(ADLS_ACCOUNT_NAME) and (netloc := properties.get("netloc")): + if netloc := properties.get("netloc"): account_uri = netloc.split("@")[-1] + else: + account_uri = None + + if not properties.get(ADLS_ACCOUNT_NAME) and account_uri: properties[ADLS_ACCOUNT_NAME] = account_uri.split(".")[0] # Fixes https://github.com/apache/iceberg-python/issues/1146 From 8944dd2234bf875accf4f404344969bcf9e619d1 Mon Sep 17 00:00:00 2001 From: Thomas Birchall Date: Thu, 14 Aug 2025 22:43:13 +0000 Subject: [PATCH 14/14] fix --- pyiceberg/io/fsspec.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 948c7f500b..3e3288241b 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -193,7 +193,6 @@ def _gs(properties: Properties) -> AbstractFileSystem: def _adls(properties: Properties) -> AbstractFileSystem: - # https://fsspec.github.io/adlfs/api/ from adlfs import AzureBlobFileSystem # https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax