Commit c531e0f

allow passing pyarrow fs props
1 parent 4cac691 commit c531e0f

3 files changed: +316 additions, -67 deletions

mkdocs/docs/configuration.md

Lines changed: 27 additions & 0 deletions
@@ -105,6 +105,33 @@ You can also set the FileIO explicitly:
 
 For the FileIO there are several configuration options available:
 
+### PyArrow FileSystem Extra Properties
+
+When using `PyArrowFileIO`, any property with a filesystem-specific prefix that is not explicitly handled by PyIceberg will be passed to the underlying PyArrow filesystem implementation.
+
+To use these properties, follow the format:
+
+```txt
+{fs_scheme}.{parameter_name}
+```
+
+- `{fs_scheme}` is the filesystem scheme (e.g., `s3`, `hdfs`, `gcs`).
+- `{parameter_name}` must match the parameter name expected by the PyArrow filesystem.
+- Property values must use the type expected by the underlying filesystem (e.g., string, integer, boolean).
+
+Below are examples of supported prefixes and how the properties are passed through:
+
+<!-- markdown-link-check-disable -->
+
+| Property Prefix | FileSystem                                                                                           | Example                     | Description                                         |
+|-----------------|------------------------------------------------------------------------------------------------------|-----------------------------|-----------------------------------------------------|
+| `s3.`           | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html)           | `s3.load_frequency=900`     | Passed as `load_frequency=900` to S3FileSystem      |
+| `hdfs.`         | [HadoopFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html)   | `hdfs.replication=3`        | Passed as `replication=3` to HadoopFileSystem       |
+| `gcs.`          | [GcsFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.GcsFileSystem.html)         | `gcs.project_id=test`       | Passed as `project_id='test'` to GcsFileSystem      |
+| `adls.`         | [AzureFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.AzureFileSystem.html)     | `adls.blob_cache_size=1024` | Passed as `blob_cache_size=1024` to AzureFileSystem |
+| `oss.`          | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html)           | `oss.connect_timeout=30.0`  | Passed as `connect_timeout=30.0` to S3FileSystem    |
+| `file.`         | [LocalFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.LocalFileSystem.html)     | `file.use_mmap=true`        | Passed as `use_mmap=True` to LocalFileSystem        |
+
 ### S3
 
 <!-- markdown-link-check-disable -->

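The documented behaviour is exercised when loading a catalog: recognized keys such as `s3.endpoint` are mapped to their PyArrow parameter names, while an unrecognized key such as `s3.load_frequency` is forwarded with its prefix stripped. A minimal sketch, assuming a locally running REST catalog and S3-compatible endpoint (catalog name, URIs, and credentials are placeholders):

```python
from pyiceberg.catalog import load_catalog

# Minimal sketch: catalog name, URIs, and credentials below are placeholders.
catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",           # placeholder REST catalog
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": "http://localhost:9000",   # handled by PyIceberg -> endpoint_override
        "s3.access-key-id": "admin",              # handled by PyIceberg -> access_key
        "s3.secret-access-key": "password",       # handled by PyIceberg -> secret_key
        "s3.load_frequency": 900,                 # not handled -> passed as load_frequency=900
    },
)
```
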
pyiceberg/io/pyarrow.py

Lines changed: 215 additions & 67 deletions
@@ -423,29 +423,67 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
     def _initialize_oss_fs(self) -> FileSystem:
         from pyarrow.fs import S3FileSystem
 
-        client_kwargs: Dict[str, Any] = {
-            "endpoint_override": self.properties.get(S3_ENDPOINT),
-            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-            "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
-            "force_virtual_addressing": property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True),
+        # Mapping from PyIceberg properties to S3FileSystem parameter names
+        property_mapping = {
+            S3_ENDPOINT: "endpoint_override",
+            S3_ACCESS_KEY_ID: "access_key",
+            AWS_ACCESS_KEY_ID: "access_key",
+            S3_SECRET_ACCESS_KEY: "secret_key",
+            AWS_SECRET_ACCESS_KEY: "secret_key",
+            S3_SESSION_TOKEN: "session_token",
+            AWS_SESSION_TOKEN: "session_token",
+            S3_REGION: "region",
+            AWS_REGION: "region",
+            S3_PROXY_URI: "proxy_options",
+            S3_CONNECT_TIMEOUT: "connect_timeout",
+            S3_REQUEST_TIMEOUT: "request_timeout",
+            S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing",
         }
 
-        if proxy_uri := self.properties.get(S3_PROXY_URI):
-            client_kwargs["proxy_options"] = proxy_uri
+        # Properties that need special handling
+        special_properties = {
+            S3_CONNECT_TIMEOUT,
+            S3_REQUEST_TIMEOUT,
+            S3_FORCE_VIRTUAL_ADDRESSING,
+            S3_ROLE_SESSION_NAME,
+            S3_RESOLVE_REGION,
+            AWS_ROLE_SESSION_NAME,
+        }
 
-        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
-            client_kwargs["connect_timeout"] = float(connect_timeout)
+        client_kwargs: Dict[str, Any] = {}
 
-        if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
-            client_kwargs["request_timeout"] = float(request_timeout)
+        for prop_name, prop_value in self.properties.items():
+            if prop_value is None:
+                continue
+
+            # Skip properties that need special handling
+            if prop_name in special_properties:
+                continue
 
-        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
-            client_kwargs["role_arn"] = role_arn
+            # Map known property names to S3FileSystem parameter names
+            if prop_name in property_mapping:
+                param_name = property_mapping[prop_name]
+                client_kwargs[param_name] = prop_value
 
-        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
-            client_kwargs["session_name"] = session_name
+            # Pass through any other s3.* properties to S3FileSystem
+            elif prop_name.startswith("s3."):
+                param_name = prop_name.split(".", 1)[1]
+                client_kwargs[param_name] = prop_value
+
+        # Handle properties that need first value resolution
+        if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties:
+            client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID)
+
+        if S3_SECRET_ACCESS_KEY in self.properties or AWS_SECRET_ACCESS_KEY in self.properties:
+            client_kwargs["secret_key"] = get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY)
+
+        if S3_SESSION_TOKEN in self.properties or AWS_SESSION_TOKEN in self.properties:
+            client_kwargs["session_token"] = get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN)
+
+        if S3_REGION in self.properties or AWS_REGION in self.properties:
+            client_kwargs["region"] = get_first_property_value(self.properties, S3_REGION, AWS_REGION)
+
+        client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True)
 
         return S3FileSystem(**client_kwargs)

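The loop above is the heart of the change: keys listed in `property_mapping` are renamed to their `S3FileSystem` parameter names, keys in `special_properties` are deferred, and any other `s3.`-prefixed key is forwarded with its prefix stripped. A standalone sketch of that pattern, using a hypothetical helper name rather than the method above:

```python
from typing import Any, Dict


def passthrough_kwargs(properties: Dict[str, Any], prefix: str, mapping: Dict[str, str]) -> Dict[str, Any]:
    """Hypothetical helper illustrating the rename-or-pass-through pattern."""
    kwargs: Dict[str, Any] = {}
    for name, value in properties.items():
        if value is None:
            continue
        if name in mapping:
            kwargs[mapping[name]] = value          # known key: renamed to the PyArrow parameter
        elif name.startswith(prefix):
            kwargs[name.split(".", 1)[1]] = value  # unknown prefixed key: prefix stripped, passed as-is
    return kwargs


# "s3.endpoint" is a known key; "s3.load_frequency" is passed through untouched.
print(passthrough_kwargs(
    {"s3.endpoint": "http://localhost:9000", "s3.load_frequency": 900},
    prefix="s3.",
    mapping={"s3.endpoint": "endpoint_override"},
))
# -> {'endpoint_override': 'http://localhost:9000', 'load_frequency': 900}
```
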
@@ -467,32 +505,79 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
         else:
             bucket_region = provided_region
 
-        client_kwargs: Dict[str, Any] = {
-            "endpoint_override": self.properties.get(S3_ENDPOINT),
-            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-            "region": bucket_region,
+        # Mapping from PyIceberg properties to S3FileSystem parameter names
+        property_mapping = {
+            S3_ENDPOINT: "endpoint_override",
+            S3_ACCESS_KEY_ID: "access_key",
+            AWS_ACCESS_KEY_ID: "access_key",
+            S3_SECRET_ACCESS_KEY: "secret_key",
+            AWS_SECRET_ACCESS_KEY: "secret_key",
+            S3_SESSION_TOKEN: "session_token",
+            AWS_SESSION_TOKEN: "session_token",
+            S3_PROXY_URI: "proxy_options",
+            S3_CONNECT_TIMEOUT: "connect_timeout",
+            S3_REQUEST_TIMEOUT: "request_timeout",
+            S3_ROLE_ARN: "role_arn",
+            AWS_ROLE_ARN: "role_arn",
+            S3_ROLE_SESSION_NAME: "session_name",
+            AWS_ROLE_SESSION_NAME: "session_name",
+            S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing",
+            S3_RETRY_STRATEGY_IMPL: "retry_strategy",
+        }
+
+        # Properties that need special handling and should not be passed directly
+        special_properties = {
+            S3_RESOLVE_REGION,
+            S3_REGION,
+            AWS_REGION,
+            S3_RETRY_STRATEGY_IMPL,
+            S3_CONNECT_TIMEOUT,
+            S3_REQUEST_TIMEOUT,
         }
 
-        if proxy_uri := self.properties.get(S3_PROXY_URI):
-            client_kwargs["proxy_options"] = proxy_uri
+        client_kwargs: Dict[str, Any] = {}
+
+        client_kwargs["region"] = bucket_region
+        for prop_name, prop_value in self.properties.items():
+            if prop_value is None:
+                continue
+
+            # Skip properties that need special handling
+            if prop_name in special_properties:
+                continue
+
+            if prop_name in property_mapping:
+                param_name = property_mapping[prop_name]
+                client_kwargs[param_name] = prop_value
+
+            # Pass through any other s3.* properties that might be used by S3FileSystem
+            elif prop_name.startswith("s3."):
+                param_name = prop_name.split(".", 1)[1]
+                client_kwargs[param_name] = prop_value
+
+        # Handle properties that need first value resolution
+        if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties:
+            client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID)
+
+        if S3_SECRET_ACCESS_KEY in self.properties or AWS_SECRET_ACCESS_KEY in self.properties:
+            client_kwargs["secret_key"] = get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY)
+
+        if S3_SESSION_TOKEN in self.properties or AWS_SESSION_TOKEN in self.properties:
+            client_kwargs["session_token"] = get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN)
+
+        if S3_ROLE_ARN in self.properties or AWS_ROLE_ARN in self.properties:
+            client_kwargs["role_arn"] = get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN)
+
+        if S3_ROLE_SESSION_NAME in self.properties or AWS_ROLE_SESSION_NAME in self.properties:
+            client_kwargs["session_name"] = get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME)
 
         if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
             client_kwargs["connect_timeout"] = float(connect_timeout)
 
         if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
             client_kwargs["request_timeout"] = float(request_timeout)
 
-        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
-            client_kwargs["role_arn"] = role_arn
-
-        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
-            client_kwargs["session_name"] = session_name
-
-        if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None:
-            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)
-
+        # Handle retry strategy special case
         if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and (
             retry_instance := _import_retry_strategy(retry_strategy_impl)
         ):
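
`get_first_property_value` is what lets a scheme-specific key win over its generic alias when both are set. A rough sketch of its behaviour as used here (the actual implementation lives in PyIceberg's property utilities and may differ in detail; the key names below are illustrative):

```python
from typing import Any, Dict, Optional


def first_property_value(properties: Dict[str, Any], *property_names: str) -> Optional[Any]:
    """Sketch: return the value of the first listed key that is present, else None."""
    for name in property_names:
        if name in properties:
            return properties[name]
    return None


# When both a scheme-specific and a generic credential are configured,
# the scheme-specific key is listed first and therefore wins.
props = {"s3.access-key-id": "s3-specific", "client.access-key-id": "generic"}
assert first_property_value(props, "s3.access-key-id", "client.access-key-id") == "s3-specific"
```
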
@@ -512,59 +597,111 @@ def _initialize_azure_fs(self) -> FileSystem:
 
         from pyarrow.fs import AzureFileSystem
 
-        client_kwargs: Dict[str, str] = {}
-
-        if account_name := self.properties.get(ADLS_ACCOUNT_NAME):
-            client_kwargs["account_name"] = account_name
-
-        if account_key := self.properties.get(ADLS_ACCOUNT_KEY):
-            client_kwargs["account_key"] = account_key
-
-        if blob_storage_authority := self.properties.get(ADLS_BLOB_STORAGE_AUTHORITY):
-            client_kwargs["blob_storage_authority"] = blob_storage_authority
+        # Mapping from PyIceberg properties to AzureFileSystem parameter names
+        property_mapping = {
+            ADLS_ACCOUNT_NAME: "account_name",
+            ADLS_ACCOUNT_KEY: "account_key",
+            ADLS_BLOB_STORAGE_AUTHORITY: "blob_storage_authority",
+            ADLS_DFS_STORAGE_AUTHORITY: "dfs_storage_authority",
+            ADLS_BLOB_STORAGE_SCHEME: "blob_storage_scheme",
+            ADLS_DFS_STORAGE_SCHEME: "dfs_storage_scheme",
+            ADLS_SAS_TOKEN: "sas_token",
+        }
 
-        if dfs_storage_authority := self.properties.get(ADLS_DFS_STORAGE_AUTHORITY):
-            client_kwargs["dfs_storage_authority"] = dfs_storage_authority
+        client_kwargs: Dict[str, Any] = {}
 
-        if blob_storage_scheme := self.properties.get(ADLS_BLOB_STORAGE_SCHEME):
-            client_kwargs["blob_storage_scheme"] = blob_storage_scheme
+        for prop_name, prop_value in self.properties.items():
+            if prop_value is None:
+                continue
 
-        if dfs_storage_scheme := self.properties.get(ADLS_DFS_STORAGE_SCHEME):
-            client_kwargs["dfs_storage_scheme"] = dfs_storage_scheme
+            # Map known property names to AzureFileSystem parameter names
+            if prop_name in property_mapping:
+                param_name = property_mapping[prop_name]
+                client_kwargs[param_name] = prop_value
 
-        if sas_token := self.properties.get(ADLS_SAS_TOKEN):
-            client_kwargs["sas_token"] = sas_token
+            # Pass through any other adls.* properties that might be used by AzureFileSystem
+            elif prop_name.startswith("adls."):
+                param_name = prop_name.split(".", 1)[1]
+                client_kwargs[param_name] = prop_value
 
         return AzureFileSystem(**client_kwargs)
 
     def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
         from pyarrow.fs import HadoopFileSystem
 
-        hdfs_kwargs: Dict[str, Any] = {}
         if netloc:
             return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
-        if host := self.properties.get(HDFS_HOST):
-            hdfs_kwargs["host"] = host
-        if port := self.properties.get(HDFS_PORT):
-            # port should be an integer type
-            hdfs_kwargs["port"] = int(port)
-        if user := self.properties.get(HDFS_USER):
-            hdfs_kwargs["user"] = user
-        if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
-            hdfs_kwargs["kerb_ticket"] = kerb_ticket
+
+        # Mapping from PyIceberg properties to HadoopFileSystem parameter names
+        property_mapping = {
+            HDFS_HOST: "host",
+            HDFS_PORT: "port",
+            HDFS_USER: "user",
+            HDFS_KERB_TICKET: "kerb_ticket",
+        }
+
+        hdfs_kwargs: Dict[str, Any] = {}
+
+        for prop_name, prop_value in self.properties.items():
+            if prop_value is None:
+                continue
+
+            # Map known property names to HadoopFileSystem parameter names
+            if prop_name in property_mapping:
+                param_name = property_mapping[prop_name]
+
+                if param_name == "port":
+                    hdfs_kwargs[param_name] = int(prop_value)
+                else:
+                    hdfs_kwargs[param_name] = prop_value
+
+            # Pass through any other hdfs.* properties that might be used by HadoopFileSystem
+            elif prop_name.startswith("hdfs."):
+                param_name = prop_name.split(".", 1)[1]
+                hdfs_kwargs[param_name] = prop_value
 
         return HadoopFileSystem(**hdfs_kwargs)
 
     def _initialize_gcs_fs(self) -> FileSystem:
         from pyarrow.fs import GcsFileSystem
 
+        # Mapping from PyIceberg properties to GcsFileSystem parameter names
+        property_mapping = {
+            GCS_TOKEN: "access_token",
+            GCS_TOKEN_EXPIRES_AT_MS: "credential_token_expiration",
+            GCS_DEFAULT_LOCATION: "default_bucket_location",
+            GCS_SERVICE_HOST: "endpoint_override",
+        }
+
+        # Properties that need special handling
+        special_properties = {
+            GCS_TOKEN_EXPIRES_AT_MS,
+            GCS_SERVICE_HOST,
+        }
+
         gcs_kwargs: Dict[str, Any] = {}
-        if access_token := self.properties.get(GCS_TOKEN):
-            gcs_kwargs["access_token"] = access_token
+
+        for prop_name, prop_value in self.properties.items():
+            if prop_value is None:
+                continue
+
+            # Skip properties that need special handling
+            if prop_name in special_properties:
+                continue
+
+            # Map known property names to GcsFileSystem parameter names
+            if prop_name in property_mapping:
+                param_name = property_mapping[prop_name]
+                gcs_kwargs[param_name] = prop_value
+
+            # Pass through any other gcs.* properties that might be used by GcsFileSystem
+            elif prop_name.startswith("gcs."):
+                param_name = prop_name.split(".", 1)[1]
+                gcs_kwargs[param_name] = prop_value
+
         if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
             gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
-        if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
-            gcs_kwargs["default_bucket_location"] = bucket_location
+
         if endpoint := self.properties.get(GCS_SERVICE_HOST):
             url_parts = urlparse(endpoint)
             gcs_kwargs["scheme"] = url_parts.scheme
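
The two GCS keys held back in `special_properties` are excluded from the generic loop because they are not forwarded verbatim: the token expiration is converted from epoch milliseconds to a datetime, and the service host is split into a scheme plus endpoint. A standalone sketch of that conversion (the helper name, the property key strings, and the `endpoint_override` assignment beyond the shown hunk are assumptions):

```python
from datetime import datetime, timezone
from typing import Any, Dict
from urllib.parse import urlparse


def gcs_special_kwargs(properties: Dict[str, str]) -> Dict[str, Any]:
    """Sketch of the GCS special cases handled outside the pass-through loop."""
    kwargs: Dict[str, Any] = {}
    if expiration := properties.get("gcs.oauth2.token-expires-at"):  # key name is an assumption
        # millis_to_datetime in PyIceberg does roughly this epoch-millis conversion
        kwargs["credential_token_expiration"] = datetime.fromtimestamp(int(expiration) / 1000, tz=timezone.utc)
    if endpoint := properties.get("gcs.service.host"):  # key name is an assumption
        url_parts = urlparse(endpoint)
        kwargs["scheme"] = url_parts.scheme
        kwargs["endpoint_override"] = url_parts.netloc  # assumed continuation of the truncated hunk
    return kwargs


print(gcs_special_kwargs({"gcs.service.host": "http://localhost:4443"}))
# -> {'scheme': 'http', 'endpoint_override': 'localhost:4443'}
```
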
@@ -573,7 +710,18 @@ def _initialize_gcs_fs(self) -> FileSystem:
         return GcsFileSystem(**gcs_kwargs)
 
     def _initialize_local_fs(self) -> FileSystem:
-        return PyArrowLocalFileSystem()
+        local_kwargs: Dict[str, Any] = {}
+
+        for prop_name, prop_value in self.properties.items():
+            if prop_value is None:
+                continue
+
+            # Pass through any other file.* properties that might be used by PyArrowLocalFileSystem
+            elif prop_name.startswith("file."):
+                param_name = prop_name.split(".", 1)[1]
+                local_kwargs[param_name] = prop_value
+
+        return PyArrowLocalFileSystem(**local_kwargs)
 
     def new_input(self, location: str) -> PyArrowFile:
         """Get a PyArrowFile instance to read bytes from the file at the given location.

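End to end, the effect is that any unrecognized `s3.*`, `oss.*`, `hdfs.*`, `gcs.*`, `adls.*`, or `file.*` property becomes a keyword argument to the corresponding PyArrow filesystem constructor. A hedged usage sketch for the local-filesystem case (`use_mmap` is the example option from the docs table above; the path is a placeholder, and values are forwarded without type coercion, so a real boolean is passed here):

```python
from pyiceberg.io.pyarrow import PyArrowFileIO

# Sketch: the unrecognized "file." property is forwarded to the local
# PyArrow filesystem as use_mmap=True when a file:// location is opened.
file_io = PyArrowFileIO(properties={"file.use_mmap": True})
input_file = file_io.new_input("file:///tmp/example.parquet")  # placeholder path
```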