Merged
59 changes: 33 additions & 26 deletions pyiceberg/io/pyarrow.py
@@ -2466,36 +2466,43 @@ def _check_pyarrow_schema_compatible(

 def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
     for file_path in file_paths:
-        input_file = io.new_input(file_path)
-        with input_file.open() as input_stream:
-            parquet_metadata = pq.read_metadata(input_stream)
+        data_file = parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path)
+        yield data_file

-        if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
-            raise NotImplementedError(
-                f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
-            )
-        schema = table_metadata.schema()
-        _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())

-        statistics = data_file_statistics_from_parquet_metadata(
-            parquet_metadata=parquet_metadata,
-            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
-            parquet_column_mapping=parquet_path_to_id_mapping(schema),
-        )
-        data_file = DataFile(
-            content=DataFileContent.DATA,
-            file_path=file_path,
-            file_format=FileFormat.PARQUET,
-            partition=statistics.partition(table_metadata.spec(), table_metadata.schema()),
-            file_size_in_bytes=len(input_file),
-            sort_order_id=None,
-            spec_id=table_metadata.default_spec_id,
-            equality_ids=None,
-            key_metadata=None,
-            **statistics.to_serialized_dict(),
+def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile:
Contributor

We're changing a public API here. We either have to go through the deprecation cycle, or add a new method, parquet_file_to_data_file, that is then used by parquet_files_to_data_files.

Contributor Author

I re-added parquet_files_to_data_files (plural); internally it uses parquet_file_to_data_file (singular).

Another alternative is to make parquet_file_to_data_file private. What do you think?

Contributor

If we want to make parquet_file_to_data_file private, then we have to go through the deprecation cycle as well. I think we can leave it in for now.

+    input_file = io.new_input(file_path)
+    with input_file.open() as input_stream:
+        parquet_metadata = pq.read_metadata(input_stream)

+    arrow_schema = parquet_metadata.schema.to_arrow_schema()
+    if visit_pyarrow(arrow_schema, _HasIds()):
+        raise NotImplementedError(
+            f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
+        )

-        yield data_file
+    schema = table_metadata.schema()
+    _check_pyarrow_schema_compatible(schema, arrow_schema)

+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=parquet_metadata,
+        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(schema),
+    )
+    data_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_path,
+        file_format=FileFormat.PARQUET,
+        partition=statistics.partition(table_metadata.spec(), table_metadata.schema()),
+        file_size_in_bytes=len(input_file),
+        sort_order_id=None,
+        spec_id=table_metadata.default_spec_id,
+        equality_ids=None,
+        key_metadata=None,
+        **statistics.to_serialized_dict(),
+    )

+    return data_file


 ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
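Side note on the thread above: if the deprecation-cycle route were ever taken instead of simply keeping the plural wrapper, a minimal sketch could look like the following. This is an assumption-laden illustration using plain warnings, not pyiceberg's own deprecation helper and not what this PR does.

import warnings
from typing import Iterator

# Hypothetical sketch only: keep the old public entry point as a thin wrapper that
# warns and delegates to the new per-file function. FileIO, TableMetadata and
# DataFile are the types already imported in pyiceberg/io/pyarrow.py.
def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
    warnings.warn(
        "parquet_files_to_data_files is deprecated; use parquet_file_to_data_file instead",
        DeprecationWarning,
        stacklevel=2,
    )
    for file_path in file_paths:
        yield parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path)

Callers would keep working during the deprecation window while being nudged toward the new single-file API.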
7 changes: 5 additions & 2 deletions pyiceberg/table/__init__.py
@@ -1891,6 +1891,9 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
     Returns:
         An iterable that supplies DataFiles that describe the parquet files.
     """
-    from pyiceberg.io.pyarrow import parquet_files_to_data_files
+    from pyiceberg.io.pyarrow import parquet_file_to_data_file

-    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))
+    executor = ExecutorFactory.get_or_create()
+    futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths]

+    return [f.result() for f in futures if f.result()]
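For readers skimming the diff: the new _parquet_files_to_data_files is the standard submit-then-collect thread-pool fan-out. A rough standalone sketch of the same pattern with plain concurrent.futures follows; the helper name is made up, and pyiceberg actually routes this through its shared ExecutorFactory rather than a throwaway pool.

from concurrent.futures import ThreadPoolExecutor

# Illustrative sketch, not the pyiceberg implementation: fan out one
# footer-read task per parquet file, then gather the DataFile results in order.
def collect_data_files(io, table_metadata, file_paths):
    with ThreadPoolExecutor() as executor:  # ExecutorFactory.get_or_create() plays this role in pyiceberg
        futures = [
            executor.submit(parquet_file_to_data_file, io, table_metadata, file_path)
            for file_path in file_paths
        ]
        return [future.result() for future in futures]

Reading Parquet footers is I/O-bound, so a thread pool helps despite the GIL; if the shared pool ever needs to be bounded, pyiceberg's executor size should be controllable via the PYICEBERG_MAX_WORKERS environment variable (worth double-checking against the current docs).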
53 changes: 53 additions & 0 deletions tests/integration/test_add_files.py
@@ -16,10 +16,13 @@
 # under the License.
 # pylint:disable=redefined-outer-name

+import multiprocessing
 import os
 import re
+import threading
 from datetime import date
 from typing import Iterator
+from unittest import mock

 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -31,9 +34,11 @@
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
 from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
+from pyiceberg.manifest import DataFile
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
+from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform
 from pyiceberg.types import (
     BooleanType,
@@ -229,6 +234,54 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids(
         tbl.add_files(file_paths=file_paths)


+@pytest.mark.integration
Contributor

I'm not sure this test is any different from the existing tests (but maybe I'm mistaken). It verifies that the files are correctly added to the table, by checking that the manifest counts match the expected values, but it does not directly assert that the file additions are processed concurrently. We already have tests that add multiple files.

Contributor Author

Added a better test that asserts that files are processed in different threads. Let me know if this is sufficient.

Contributor Author

Tests in CI/CD were failing; it should be fixed now.

Contributor

Really cool way to check that files are processed in different threads!

+def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    from pyiceberg.io.pyarrow import parquet_file_to_data_file
+
+    real_parquet_file_to_data_file = parquet_file_to_data_file
+
+    lock = threading.Lock()
+    unique_threads_seen = set()
+    cpu_count = multiprocessing.cpu_count()
+
+    # patch parquet_file_to_data_file so we can track how many unique thread IDs
+    # it was executed from
+    with mock.patch("pyiceberg.io.pyarrow.parquet_file_to_data_file") as patch_func:
+
+        def mock_parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile:
+            lock.acquire()
+            thread_id = threading.get_ident()  # the current thread ID
+            unique_threads_seen.add(thread_id)
+            lock.release()
+            return real_parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path)
+
+        patch_func.side_effect = mock_parquet_file_to_data_file
+
+        identifier = f"default.unpartitioned_table_schema_updates_v{format_version}"
+        tbl = _create_table(session_catalog, identifier, format_version)
+
+        file_paths = [
+            f"s3://warehouse/default/add_files_parallel/v{format_version}/test-{i}.parquet" for i in range(cpu_count * 2)
+        ]
+        # write parquet files
+        for file_path in file_paths:
+            fo = tbl.io.new_output(file_path)
+            with fo.create(overwrite=True) as fos:
+                with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                    writer.write_table(ARROW_TABLE)
+
+        tbl.add_files(file_paths=file_paths)
+
+    # during creation of the thread pool executor, when max_workers is not
+    # specified, Python uses cpu_count + 4 threads in the pool in this case
+    # https://github.com/python/cpython/blob/e06bebb87e1b33f7251196e1ddb566f528c3fc98/Lib/concurrent/futures/thread.py#L173-L181
+    # we check that we have seen at least cpu_count distinct threads; we don't
+    # specify max_workers for the pool and we can't check its exact size without
+    # accessing private attributes of ThreadPoolExecutor
+    assert len(unique_threads_seen) >= cpu_count


 @pytest.mark.integration
 def test_add_files_to_unpartitioned_table_with_schema_updates(
     spark: SparkSession, session_catalog: Catalog, format_version: int