13 changes: 0 additions & 13 deletions pyiceberg/io/__init__.py
@@ -27,7 +27,6 @@
 
 import importlib
 import logging
-import os
 import warnings
 from abc import ABC, abstractmethod
 from io import SEEK_SET
@@ -37,7 +36,6 @@
     List,
     Optional,
     Protocol,
-    Tuple,
     Type,
     Union,
     runtime_checkable,
@@ -371,14 +369,3 @@ def load_file_io(properties: Properties = EMPTY_DICT, location: Optional[str] =
         raise ModuleNotFoundError(
             'Could not load a FileIO, please consider installing one: pip3 install "pyiceberg[pyarrow]", for more options refer to the docs.'
         ) from e
-
-
-def _parse_location(location: str) -> Tuple[str, str, str]:
-    """Return the path without the scheme."""
-    uri = urlparse(location)
-    if not uri.scheme:
-        return "file", uri.netloc, os.path.abspath(location)
-    elif uri.scheme in ("hdfs", "viewfs"):
-        return uri.scheme, uri.netloc, uri.path
-    else:
-        return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
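
Note: any downstream code that imported the now-removed private helper can reproduce its behavior with the standard library alone. A minimal sketch mirroring the deleted function (the top-level name parse_location is illustrative, not pyiceberg API; within pyiceberg itself, the removed call sites used PyArrowFileIO.parse_location):

import os
from typing import Tuple
from urllib.parse import urlparse


def parse_location(location: str) -> Tuple[str, str, str]:
    """Split a location string into (scheme, netloc, path)."""
    uri = urlparse(location)
    if not uri.scheme:
        # A bare path is treated as a local file and made absolute.
        return "file", uri.netloc, os.path.abspath(location)
    elif uri.scheme in ("hdfs", "viewfs"):
        # HDFS-style URIs keep the authority (netloc) separate from the path.
        return uri.scheme, uri.netloc, uri.path
    else:
        # Object stores (s3, gs, ...) fold the bucket back into the path.
        return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"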
55 changes: 12 additions & 43 deletions pyiceberg/io/pyarrow.py
@@ -69,7 +69,6 @@
     FileInfo,
     FileSystem,
     FileType,
-    FSSpecHandler,
 )
 from sortedcontainers import SortedList
 
@@ -117,7 +116,6 @@
     InputStream,
     OutputFile,
     OutputStream,
-    _parse_location,
 )
 from pyiceberg.manifest import (
     DataFile,
@@ -309,9 +307,7 @@ def open(self, seekable: bool = True) -> InputStream:
                 input_file = self._filesystem.open_input_file(self._path)
             else:
                 input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
-        except FileNotFoundError:
-            raise
-        except PermissionError:
+        except (FileNotFoundError, PermissionError):
             raise
         except OSError as e:
             if e.errno == 2 or "Path does not exist" in str(e):
@@ -916,27 +912,20 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi
         raise ValueError(f"Unsupported file format: {file_format}")
 
 
-def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
-    _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
-    return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)
-
-
-def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
+def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
     if data_file.file_format == FileFormat.PARQUET:
-        delete_fragment = _construct_fragment(
-            fs,
-            data_file,
-            file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE},
-        )
-        table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+        with io.new_input(data_file.file_path).open() as fi:
+            delete_fragment = _get_file_format(
+                data_file.file_format, dictionary_columns=("file_path",), pre_buffer=True, buffer_size=ONE_MEGABYTE
+            ).make_fragment(fi)
+            table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
         table = table.unify_dictionaries()
         return {
             file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
             for file in table.column("file_path").chunks[0].dictionary
         }
     elif data_file.file_format == FileFormat.PUFFIN:
-        _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
-        with fs.open_input_file(path) as fi:
+        with io.new_input(data_file.file_path).open() as fi:
             payload = fi.read()
 
         return PuffinFile(payload).to_vector()
@@ -1383,7 +1372,7 @@ def _get_column_projection_values(
 
 
 def _task_to_record_batches(
-    fs: FileSystem,
+    io: FileIO,
     task: FileScanTask,
     bound_row_filter: BooleanExpression,
     projected_schema: Schema,
@@ -1393,9 +1382,8 @@
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
-    _, _, path = _parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
-    with fs.open_input_file(path) as fin:
+    with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
         # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema
@@ -1479,7 +1467,7 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st
     executor = ExecutorFactory.get_or_create()
     deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
         lambda args: _read_deletes(*args),
-        [(_fs_from_file_path(io, delete_file.file_path), delete_file) for delete_file in unique_deletes],
+        [(io, delete_file) for delete_file in unique_deletes],
     )
     for delete in deletes_per_files:
         for file, arr in delete.items():
@@ -1491,25 +1479,6 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st
     return deletes_per_file
 
 
-def _fs_from_file_path(io: FileIO, file_path: str) -> FileSystem:
-    scheme, netloc, _ = _parse_location(file_path)
-    if isinstance(io, PyArrowFileIO):
-        return io.fs_by_scheme(scheme, netloc)
-    else:
-        try:
-            from pyiceberg.io.fsspec import FsspecFileIO
-
-            if isinstance(io, FsspecFileIO):
-                from pyarrow.fs import PyFileSystem
-
-                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
-            else:
-                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
-        except ModuleNotFoundError as e:
-            # When FsSpec is not installed
-            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
-
-
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
@@ -1654,7 +1623,7 @@ def _record_batches_from_scan_tasks_and_deletes(
             if self._limit is not None and total_row_count >= self._limit:
                 break
             batches = _task_to_record_batches(
-                _fs_from_file_path(self._io, task.file.file_path),
+                self._io,
                 task,
                 self._bound_row_filter,
                 self._projected_schema,
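
The thrust of the change above: reads now go through the FileIO abstraction, io.new_input(path).open(), rather than a PyArrow FileSystem resolved per file path, which is what made _parse_location, _construct_fragment, and _fs_from_file_path unnecessary. A minimal usage sketch of the new _read_deletes signature (a private function; the local path below is illustrative):

from pyiceberg.io.pyarrow import PyArrowFileIO, _read_deletes
from pyiceberg.manifest import DataFile, FileFormat

io = PyArrowFileIO()
delete_file = DataFile.from_args(
    file_path="/tmp/deletes.parquet",  # illustrative path to a positional-delete file
    file_format=FileFormat.PARQUET,
)
# Maps each referenced data-file path to the row positions deleted from it.
positions_by_path = _read_deletes(io, delete_file)

One consequence of routing through FileIO is that Arrow fragments are now built from an open input stream (make_fragment(fi)) instead of a (path, filesystem) pair, so no scheme-to-filesystem mapping is needed at the call sites.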
2 changes: 1 addition & 1 deletion tests/io/test_pyarrow.py
@@ -1539,7 +1539,7 @@ def deletes_file(tmp_path: str, example_task: FileScanTask) -> str:
 
 
 def test_read_deletes(deletes_file: str, example_task: FileScanTask) -> None:
-    deletes = _read_deletes(LocalFileSystem(), DataFile.from_args(file_path=deletes_file, file_format=FileFormat.PARQUET))
+    deletes = _read_deletes(PyArrowFileIO(), DataFile.from_args(file_path=deletes_file, file_format=FileFormat.PARQUET))
     assert set(deletes.keys()) == {example_task.file.file_path}
     assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]])
 
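
Since _read_deletes now depends only on the FileIO interface, this test could in principle be parameterized across IO implementations. A hedged sketch, assuming the fsspec extra is installed and that FsspecFileIO (from pyiceberg.io.fsspec, as referenced in the removed _fs_from_file_path helper) resolves bare local paths; deletes_file and example_task are the existing fixtures:

import pytest

from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.io.pyarrow import PyArrowFileIO, _read_deletes
from pyiceberg.manifest import DataFile, FileFormat


@pytest.mark.parametrize("io", [PyArrowFileIO(), FsspecFileIO()])
def test_read_deletes_any_io(io, deletes_file, example_task) -> None:
    # Either IO implementation should yield the same positional deletes.
    deletes = _read_deletes(io, DataFile.from_args(file_path=deletes_file, file_format=FileFormat.PARQUET))
    assert set(deletes.keys()) == {example_task.file.file_path}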