 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: prefix means that the key is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
+# ORC stores field IDs as string metadata under this key
+ORC_FIELD_ID_KEY = b"iceberg.id"
 PYARROW_FIELD_DOC_KEY = b"doc"
 LIST_ELEMENT_NAME = "element"
 MAP_KEY_NAME = "key"
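
A minimal sketch of how these keys surface on Arrow fields, assuming only that pyarrow is installed:

import pyarrow as pa

# Field IDs travel as byte-string metadata attached to each Arrow field.
parquet_field = pa.field("id", pa.int32(), metadata={b"PARQUET:field_id": b"1"})
orc_field = pa.field("id", pa.int32(), metadata={b"iceberg.id": b"1"})

print(parquet_field.metadata[b"PARQUET:field_id"])  # b'1'
print(orc_field.metadata[b"iceberg.id"])            # b'1'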
@@ -388,14 +390,28 @@ def __init__(self, properties: Properties = EMPTY_DICT):

     @staticmethod
     def parse_location(location: str) -> Tuple[str, str, str]:
-        """Return the path without the scheme."""
+        """Return (scheme, netloc, path) for the given location.
+
+        Falls back to the DEFAULT_SCHEME and DEFAULT_NETLOC environment
+        variables when the scheme or netloc is missing.
+        """
         uri = urlparse(location)
-        if not uri.scheme:
-            return "file", uri.netloc, os.path.abspath(location)
-        elif uri.scheme in ("hdfs", "viewfs"):
-            return uri.scheme, uri.netloc, uri.path
+
+        # Load defaults from the environment
+        default_scheme = os.getenv("DEFAULT_SCHEME", "file")
+        default_netloc = os.getenv("DEFAULT_NETLOC", "")
+
+        # Fall back to the defaults when the URI omits a scheme or netloc
+        scheme = uri.scheme or default_scheme
+        netloc = uri.netloc or default_netloc
+
+        if scheme in ("hdfs", "viewfs"):
+            return scheme, netloc, uri.path
         else:
-            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
+            # For non-HDFS URIs, prepend the netloc to the path if present
+            path = uri.path if uri.scheme else os.path.abspath(location)
+            if netloc and not path.startswith(netloc):
+                path = f"{netloc}{path}"
+            return scheme, netloc, path

     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
         """Initialize a FileSystem for the given scheme."""
@@ -575,7 +591,7 @@ def _initialize_gcs_fs(self) -> FileSystem:
     def _initialize_local_fs(self) -> FileSystem:
         return PyArrowLocalFileSystem()

-    def new_input(self, location: str) -> PyArrowFile:
+    def new_input(self, location: str, fs: Optional[FileSystem] = None) -> PyArrowFile:
         """Get a PyArrowFile instance to read bytes from the file at the given location.

         Args:
@@ -585,8 +601,11 @@ def new_input(self, location: str) -> PyArrowFile:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
         scheme, netloc, path = self.parse_location(location)
+        logger.debug(f"Scheme: {scheme}, Netloc: {netloc}, Path: {path}")
+        if not fs:
+            fs = self.fs_by_scheme(scheme, netloc)
         return PyArrowFile(
-            fs=self.fs_by_scheme(scheme, netloc),
+            fs=fs,
             location=location,
             path=path,
             buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
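
A hypothetical caller-side sketch of the new fs parameter: resolve the filesystem once and reuse it across many inputs instead of re-resolving per file (the paths here are made up):

io = PyArrowFileIO()

scheme, netloc, _ = io.parse_location("s3://bucket/warehouse/t1/data/f1.parquet")
shared_fs = io.fs_by_scheme(scheme, netloc)  # resolve the FileSystem once

for path in (
    "s3://bucket/warehouse/t1/data/f1.parquet",
    "s3://bucket/warehouse/t1/data/f2.parquet",
):
    input_file = io.new_input(path, fs=shared_fs)  # skips fs_by_scheme per file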
@@ -1022,7 +1041,11 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start
 def pyarrow_to_schema(
     schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False
 ) -> Schema:
-    has_ids = visit_pyarrow(schema, _HasIds())
+    logger.debug(f"schema {schema}")
+    has_ids = visit_pyarrow(schema, _HasIds())
+    logger.debug(f"has_ids is {has_ids}, name_mapping is {name_mapping}")
     if has_ids:
         return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
     elif name_mapping is not None:
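
To illustrate the branch being logged: a schema whose fields carry IDs takes the _ConvertToIceberg path, while one without IDs needs a name_mapping (a sketch, assuming pyarrow only):

import pyarrow as pa

with_ids = pa.schema([
    pa.field("id", pa.int32(), metadata={b"PARQUET:field_id": b"1"}),
    pa.field("name", pa.string(), metadata={b"PARQUET:field_id": b"2"}),
])
# has_ids is True: pyarrow_to_schema(with_ids) converts the schema directly.

without_ids = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
# has_ids is False: a name_mapping must be supplied for the schema to resolve.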
@@ -1179,11 +1202,22 @@ def primitive(self, primitive: pa.DataType) -> T:


 def _get_field_id(field: pa.Field) -> Optional[int]:
-    return (
-        int(field_id_str.decode())
-        if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
-        else None
-    )
+    """Return the Iceberg field ID from Parquet or ORC metadata, if present."""
+    if not field.metadata:
+        return None
+
+    # Try the Parquet field ID first, then fall back to the ORC key
+    for key in (PYARROW_PARQUET_FIELD_ID_KEY, ORC_FIELD_ID_KEY):
+        if field_id_bytes := field.metadata.get(key):
+            return int(field_id_bytes.decode())
+
+    return None


 class _HasIds(PyArrowSchemaVisitor[bool]):
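
A quick sketch of the fallback behavior, using hypothetical fields (pyarrow only):

import pyarrow as pa

parquet_field = pa.field("id", pa.int32(), metadata={b"PARQUET:field_id": b"1"})
orc_field = pa.field("id", pa.int32(), metadata={b"iceberg.id": b"2"})
bare_field = pa.field("id", pa.int32())

assert _get_field_id(parquet_field) == 1  # Parquet key wins
assert _get_field_id(orc_field) == 2      # ORC key as fallback
assert _get_field_id(bare_field) is None  # no metadata at all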
@@ -1434,6 +1468,7 @@ def _task_to_record_batches(
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
+    logger.debug(f"file format is {task.file.file_format}")
     if task.file.file_format == FileFormat.PARQUET:
         arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     elif task.file.file_format == FileFormat.ORC:
@@ -1443,6 +1478,7 @@ def _task_to_record_batches(
     with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
+        logger.debug(f"filepath {task.file.file_path}, fragment {fragment}, physical_schema {physical_schema}")
         # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema
         # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on