Commit f942551
Parallelize add_files (#1717)
- `parquet_files_to_data_files` changed to `parquet_file_to_data_file`, which processes a single parquet file and returns a `DataFile`
- `_parquet_files_to_data_files` uses the internal ExecutorFactory

Resolves #1335
1 parent f186d58 commit f942551
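For context, the user-facing entry point that benefits is `Table.add_files`: each registered parquet file now has its footer read on a shared thread pool rather than sequentially. A minimal sketch of that call path (the catalog name, table identifier, and file paths below are placeholders, not part of this commit):

from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table, for illustration only.
catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# Register parquet files that were written outside of pyiceberg.
# After this commit, the per-file metadata reads behind add_files
# are submitted to a shared executor instead of running one by one.
tbl.add_files(
    file_paths=[
        "s3://warehouse/default/data/file-0.parquet",
        "s3://warehouse/default/data/file-1.parquet",
    ]
)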

3 files changed: +91 −28 lines


pyiceberg/io/pyarrow.py

Lines changed: 33 additions & 26 deletions
@@ -2466,36 +2466,43 @@ def _check_pyarrow_schema_compatible(
 
 def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
     for file_path in file_paths:
-        input_file = io.new_input(file_path)
-        with input_file.open() as input_stream:
-            parquet_metadata = pq.read_metadata(input_stream)
+        data_file = parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path)
+        yield data_file
 
-        if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
-            raise NotImplementedError(
-                f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
-            )
-        schema = table_metadata.schema()
-        _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())
 
-        statistics = data_file_statistics_from_parquet_metadata(
-            parquet_metadata=parquet_metadata,
-            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
-            parquet_column_mapping=parquet_path_to_id_mapping(schema),
-        )
-        data_file = DataFile(
-            content=DataFileContent.DATA,
-            file_path=file_path,
-            file_format=FileFormat.PARQUET,
-            partition=statistics.partition(table_metadata.spec(), table_metadata.schema()),
-            file_size_in_bytes=len(input_file),
-            sort_order_id=None,
-            spec_id=table_metadata.default_spec_id,
-            equality_ids=None,
-            key_metadata=None,
-            **statistics.to_serialized_dict(),
+def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile:
+    input_file = io.new_input(file_path)
+    with input_file.open() as input_stream:
+        parquet_metadata = pq.read_metadata(input_stream)
+
+    arrow_schema = parquet_metadata.schema.to_arrow_schema()
+    if visit_pyarrow(arrow_schema, _HasIds()):
+        raise NotImplementedError(
+            f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
         )
 
-        yield data_file
+    schema = table_metadata.schema()
+    _check_pyarrow_schema_compatible(schema, arrow_schema)
+
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=parquet_metadata,
+        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(schema),
+    )
+    data_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_path,
+        file_format=FileFormat.PARQUET,
+        partition=statistics.partition(table_metadata.spec(), table_metadata.schema()),
+        file_size_in_bytes=len(input_file),
+        sort_order_id=None,
+        spec_id=table_metadata.default_spec_id,
+        equality_ids=None,
+        key_metadata=None,
+        **statistics.to_serialized_dict(),
+    )
+
+    return data_file
 
 
 ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
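Since the new `parquet_file_to_data_file` handles exactly one file, each call is an independent unit of work that can be submitted to an executor. A rough sketch of calling it directly (the catalog, table, and path are placeholders; in normal use `add_files` drives this for you):

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import parquet_file_to_data_file

# Placeholder catalog/table/path, for illustration only.
tbl = load_catalog("default").load_table("default.my_table")

# One parquet file in, one DataFile (with partition and column statistics) out.
data_file = parquet_file_to_data_file(
    io=tbl.io,
    table_metadata=tbl.metadata,
    file_path="s3://warehouse/default/data/file-0.parquet",
)
print(data_file.file_path, data_file.record_count)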

pyiceberg/table/__init__.py

Lines changed: 5 additions & 2 deletions
@@ -1889,6 +1889,9 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
     Returns:
         An iterable that supplies DataFiles that describe the parquet files.
     """
-    from pyiceberg.io.pyarrow import parquet_files_to_data_files
+    from pyiceberg.io.pyarrow import parquet_file_to_data_file
 
-    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))
+    executor = ExecutorFactory.get_or_create()
+    futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths]
+
+    return [f.result() for f in futures if f.result()]
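The lazy `yield from` is replaced by an eager fan-out over pyiceberg's shared ExecutorFactory pool, collecting results in submission order. A self-contained sketch of the same submit/collect pattern using only the standard library (the `to_data_file` stand-in below is illustrative, not the real function):

from concurrent.futures import ThreadPoolExecutor

def to_data_file(file_path: str) -> str:
    # Stand-in for parquet_file_to_data_file: reading a parquet footer is
    # I/O-bound, so threads can overlap the network round trips.
    return f"DataFile({file_path})"

file_paths = [f"s3://warehouse/file-{i}.parquet" for i in range(8)]

with ThreadPoolExecutor() as executor:
    # One task per file; f.result() preserves submission order and re-raises
    # any exception from the worker thread.
    futures = [executor.submit(to_data_file, path) for path in file_paths]
    data_files = [f.result() for f in futures]

print(data_files)

Because `result()` re-raises worker exceptions, error propagation stays comparable to the old sequential loop: an unreadable or incompatible file still aborts the whole call.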

tests/integration/test_add_files.py

Lines changed: 53 additions & 0 deletions
@@ -16,10 +16,13 @@
 # under the License.
 # pylint:disable=redefined-outer-name
 
+import multiprocessing
 import os
 import re
+import threading
 from datetime import date
 from typing import Iterator
+from unittest import mock
 
 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -31,9 +34,11 @@
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
 from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
+from pyiceberg.manifest import DataFile
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
+from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform
 from pyiceberg.types import (
     BooleanType,
@@ -229,6 +234,54 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids(
     tbl.add_files(file_paths=file_paths)
 
 
+@pytest.mark.integration
+def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    from pyiceberg.io.pyarrow import parquet_file_to_data_file
+
+    real_parquet_file_to_data_file = parquet_file_to_data_file
+
+    lock = threading.Lock()
+    unique_threads_seen = set()
+    cpu_count = multiprocessing.cpu_count()
+
+    # patch parquet_file_to_data_file so we can track how many unique thread IDs
+    # it was executed from
+    with mock.patch("pyiceberg.io.pyarrow.parquet_file_to_data_file") as patch_func:
+
+        def mock_parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile:
+            lock.acquire()
+            thread_id = threading.get_ident()  # the current thread ID
+            unique_threads_seen.add(thread_id)
+            lock.release()
+            return real_parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path)
+
+        patch_func.side_effect = mock_parquet_file_to_data_file
+
+        identifier = f"default.unpartitioned_table_schema_updates_v{format_version}"
+        tbl = _create_table(session_catalog, identifier, format_version)
+
+        file_paths = [
+            f"s3://warehouse/default/add_files_parallel/v{format_version}/test-{i}.parquet" for i in range(cpu_count * 2)
+        ]
+        # write parquet files
+        for file_path in file_paths:
+            fo = tbl.io.new_output(file_path)
+            with fo.create(overwrite=True) as fos:
+                with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                    writer.write_table(ARROW_TABLE)
+
+        tbl.add_files(file_paths=file_paths)
+
+        # during creation of the thread pool, when max_workers is not
+        # specified, python will use cpu_count + 4 as the number of threads in the
+        # pool in this case
+        # https://github.com/python/cpython/blob/e06bebb87e1b33f7251196e1ddb566f528c3fc98/Lib/concurrent/futures/thread.py#L173-L181
+        # we check that we have seen at least cpu_count distinct threads. we don't
+        # specify max_workers for the pool and we can't check the exact count without
+        # accessing private attributes of ThreadPoolExecutor
+        assert len(unique_threads_seen) >= cpu_count
+
+
 @pytest.mark.integration
 def test_add_files_to_unpartitioned_table_with_schema_updates(
     spark: SparkSession, session_catalog: Catalog, format_version: int
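The test's thread-observation trick also works in isolation: wrap the unit of work so each call records `threading.get_ident()`, run the calls through a pool, and assert that more than one worker thread participated. A self-contained sketch (the names and numbers here are illustrative, not taken from the test):

import threading
import time
from concurrent.futures import ThreadPoolExecutor

seen_threads = set()
lock = threading.Lock()

def tracked_work(item: int) -> int:
    # Record which worker thread executed this call.
    with lock:
        seen_threads.add(threading.get_ident())
    time.sleep(0.01)  # keep the task busy long enough for workers to overlap
    return item * 2

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(tracked_work, range(40)))

assert results[:3] == [0, 2, 4]
# With 40 short tasks and 4 workers, several distinct thread IDs show up.
assert len(seen_threads) > 1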

0 commit comments