@@ -197,6 +197,8 @@
 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
+# ORC stores IDs as string metadata
+ORC_FIELD_ID_KEY = b"iceberg.id"
 PYARROW_FIELD_DOC_KEY = b"doc"
 LIST_ELEMENT_NAME = "element"
 MAP_KEY_NAME = "key"
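
Note: `ORC_FIELD_ID_KEY` matches the per-column metadata key that ORC exposes through pyarrow. A minimal sketch of what such a field looks like (the column name and ID are illustrative, not from this commit):

    import pyarrow as pa

    # ORC surfaces the Iceberg field ID as string metadata under b"iceberg.id"
    field = pa.field("ts", pa.timestamp("us"), metadata={b"iceberg.id": b"1"})
    assert field.metadata[b"iceberg.id"] == b"1"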
@@ -389,14 +391,28 @@ def __init__(self, properties: Properties = EMPTY_DICT):
 
     @staticmethod
     def parse_location(location: str) -> Tuple[str, str, str]:
-        """Return the path without the scheme."""
+        """Return (scheme, netloc, path) for the given location.
+        Uses the environment variables DEFAULT_SCHEME and DEFAULT_NETLOC
+        if scheme/netloc are missing.
+        """
         uri = urlparse(location)
-        if not uri.scheme:
-            return "file", uri.netloc, os.path.abspath(location)
-        elif uri.scheme in ("hdfs", "viewfs"):
-            return uri.scheme, uri.netloc, uri.path
+
+        # Load the defaults from the environment
+        default_scheme = os.getenv("DEFAULT_SCHEME", "file")
+        default_netloc = os.getenv("DEFAULT_NETLOC", "")
+
+        # Fall back to the defaults where the URI omits a scheme or netloc
+        scheme = uri.scheme or default_scheme
+        netloc = uri.netloc or default_netloc
+
+        if scheme in ("hdfs", "viewfs"):
+            return scheme, netloc, uri.path
         else:
-            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
+            # For non-HDFS URIs, include the netloc in the path if present
+            path = uri.path if uri.scheme else os.path.abspath(location)
+            if netloc and not path.startswith(netloc):
+                path = f"{netloc}{path}"
+            return scheme, netloc, path
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
         """Initialize FileSystem for different scheme."""
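
Note: a sketch of the new fallback behavior (locations and environment values are illustrative):

    import os
    from pyiceberg.io.pyarrow import PyArrowFileIO

    # Fully qualified locations resolve as before:
    PyArrowFileIO.parse_location("s3://bucket/data/file.parquet")
    # -> ("s3", "bucket", "bucket/data/file.parquet")

    # A bare path can now be steered to HDFS through the environment:
    os.environ["DEFAULT_SCHEME"] = "hdfs"
    os.environ["DEFAULT_NETLOC"] = "namenode:8020"
    PyArrowFileIO.parse_location("/data/file.parquet")
    # -> ("hdfs", "namenode:8020", "/data/file.parquet")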
@@ -576,7 +592,7 @@ def _initialize_gcs_fs(self) -> FileSystem:
     def _initialize_local_fs(self) -> FileSystem:
         return PyArrowLocalFileSystem()
 
-    def new_input(self, location: str) -> PyArrowFile:
+    def new_input(self, location: str, fs: Optional[FileSystem] = None) -> PyArrowFile:
         """Get a PyArrowFile instance to read bytes from the file at the given location.
 
         Args:
@@ -586,8 +602,11 @@ def new_input(self, location: str) -> PyArrowFile:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
         scheme, netloc, path = self.parse_location(location)
+        logger.warning(f"Scheme: {scheme}, Netloc: {netloc}, Path: {path}")
+        if not fs:
+            fs = self.fs_by_scheme(scheme, netloc)
         return PyArrowFile(
-            fs=self.fs_by_scheme(scheme, netloc),
+            fs=fs,
             location=location,
             path=path,
             buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
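
Note: the optional `fs` argument lets a caller that already holds a pyarrow filesystem bypass `fs_by_scheme` resolution. A sketch (paths illustrative):

    from pyarrow.fs import LocalFileSystem
    from pyiceberg.io.pyarrow import PyArrowFileIO

    file_io = PyArrowFileIO()
    local_fs = LocalFileSystem()
    # Reuse one filesystem instance across many inputs
    input_file = file_io.new_input("/tmp/warehouse/data/file.parquet", fs=local_fs)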
@@ -1023,7 +1042,11 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start
 def pyarrow_to_schema(
     schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False
 ) -> Schema:
-    has_ids = visit_pyarrow(schema, _HasIds())
+    logger.warning(f"schema {schema}")
+    hids = _HasIds()
+    logger.warning("hasIds")
+    has_ids = visit_pyarrow(schema, hids)
+    logger.warning(f"has_ids is {has_ids}, name_mapping is {name_mapping}")
     if has_ids:
         return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
     elif name_mapping is not None:
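
Note: the branch taken here depends on whether the pyarrow schema carries field IDs in its metadata; otherwise a name mapping is required. A sketch of the direct-conversion path (the schema is illustrative):

    import pyarrow as pa
    from pyiceberg.io.pyarrow import pyarrow_to_schema

    arrow_schema = pa.schema([
        pa.field("id", pa.int64(), nullable=False, metadata={b"PARQUET:field_id": b"1"}),
        pa.field("name", pa.string(), metadata={b"PARQUET:field_id": b"2"}),
    ])
    iceberg_schema = pyarrow_to_schema(arrow_schema)  # has_ids is True, no mapping needed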
@@ -1180,11 +1203,22 @@ def primitive(self, primitive: pa.DataType) -> T:
 
 
 def _get_field_id(field: pa.Field) -> Optional[int]:
-    return (
-        int(field_id_str.decode())
-        if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
-        else None
-    )
+    """Return the Iceberg field ID from Parquet or ORC metadata, if available."""
+    if not field.metadata:
+        return None
+
+    # Try the Parquet field ID first
+    field_id_bytes = field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)
+    if field_id_bytes:
+        return int(field_id_bytes.decode())
+
+    # Fall back to the ORC field ID
+    field_id_bytes = field.metadata.get(ORC_FIELD_ID_KEY)
+    if field_id_bytes:
+        return int(field_id_bytes.decode())
+
+    return None
+
 
 
 class _HasIds(PyArrowSchemaVisitor[bool]):
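
Note: a sketch of the lookup order `_get_field_id` now follows (fields illustrative):

    import pyarrow as pa

    parquet_field = pa.field("a", pa.int32(), metadata={b"PARQUET:field_id": b"7"})
    orc_field = pa.field("a", pa.int32(), metadata={b"iceberg.id": b"7"})
    # _get_field_id(parquet_field) -> 7  (Parquet key, checked first)
    # _get_field_id(orc_field)     -> 7  (ORC key, new fallback)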
@@ -1453,6 +1487,7 @@ def _task_to_record_batches(
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
+    logger.warning(f"file format is {task.file.file_format}")
     if task.file.file_format == FileFormat.PARQUET:
         arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     elif task.file.file_format == FileFormat.ORC:
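
Note: the body of the ORC branch falls outside this hunk; presumably it constructs the dataset format without Parquet-specific read options, along the lines of:

    import pyarrow.dataset as ds

    # ORC has no pre_buffer/buffer_size knobs in pyarrow's dataset API
    arrow_format = ds.OrcFileFormat()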
@@ -1462,6 +1497,7 @@ def _task_to_record_batches(
     with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
+        logger.warning(f"formats: filepath {task.file.file_path}, fragment {fragment}, physical_schema {physical_schema}")
         # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema
         # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
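
Note: `physical_schema` is where the field-ID metadata from the first hunk surfaces at read time. A sketch of inspecting it on a local ORC file (path illustrative):

    import pyarrow as pa
    import pyarrow.dataset as ds

    arrow_format = ds.OrcFileFormat()
    with pa.OSFile("/tmp/warehouse/data/file.orc", "rb") as fin:
        fragment = arrow_format.make_fragment(fin)
        print(fragment.physical_schema)  # fields may carry b"iceberg.id" metadata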