parallelize add_files
#1717
Changes from all commits: d5f3fdc, b5bfda5, efddf11, 72e1f4b, be04e9c, ce57944, 03208ca
tests/integration/test_add_files.py

```
@@ -16,10 +16,13 @@
# under the License.
# pylint:disable=redefined-outer-name

import multiprocessing
import os
import re
import threading
from datetime import date
from typing import Iterator
from unittest import mock

import pyarrow as pa
import pyarrow.parquet as pq
```
```
@@ -31,9 +34,11 @@
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.io import FileIO
from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
from pyiceberg.manifest import DataFile
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform
from pyiceberg.types import (
    BooleanType,
```
```
@@ -229,6 +234,54 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids(
        tbl.add_files(file_paths=file_paths)


@pytest.mark.integration
```
Contributor: I'm not sure this test is any different from the existing test (but maybe I'm mistaken). It verifies that the files are correctly added to the table, by checking that the manifest counts match the expected values, but it does not directly assert that the file additions are processed concurrently.

Contributor (Author): Added a better test that asserts that the files are processed on different threads.

Contributor (Author): The tests in CI/CD were failing; that should be fixed now.

Contributor: Really cool way to check that files are processed on different threads!
```python
def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
    from pyiceberg.io.pyarrow import parquet_file_to_data_file

    real_parquet_file_to_data_file = parquet_file_to_data_file

    lock = threading.Lock()
    unique_threads_seen = set()
    cpu_count = multiprocessing.cpu_count()

    # patch parquet_file_to_data_file so we can track how many unique thread IDs
    # it was executed from
    with mock.patch("pyiceberg.io.pyarrow.parquet_file_to_data_file") as patch_func:

        def mock_parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile:
            lock.acquire()
            thread_id = threading.get_ident()  # the current thread ID
            unique_threads_seen.add(thread_id)
            lock.release()
            return real_parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path)

        patch_func.side_effect = mock_parquet_file_to_data_file

        identifier = f"default.unpartitioned_table_schema_updates_v{format_version}"
        tbl = _create_table(session_catalog, identifier, format_version)

        file_paths = [
            f"s3://warehouse/default/add_files_parallel/v{format_version}/test-{i}.parquet" for i in range(cpu_count * 2)
        ]
        # write parquet files
        for file_path in file_paths:
            fo = tbl.io.new_output(file_path)
            with fo.create(overwrite=True) as fos:
                with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
                    writer.write_table(ARROW_TABLE)

        tbl.add_files(file_paths=file_paths)

        # during creation of the thread pool, when max_workers is not specified,
        # Python uses cpu_count + 4 as the number of threads in the pool in this case:
        # https://github.com/python/cpython/blob/e06bebb87e1b33f7251196e1ddb566f528c3fc98/Lib/concurrent/futures/thread.py#L173-L181
        # we check that we have seen at least cpu_count unique threads. we don't
        # specify the workers for the thread pool, and we can't check the exact pool
        # size without accessing private attributes of ThreadPoolExecutor
        assert len(unique_threads_seen) >= cpu_count


@pytest.mark.integration
def test_add_files_to_unpartitioned_table_with_schema_updates(
    spark: SparkSession, session_catalog: Catalog, format_version: int
```
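For context on the lower bound asserted above: when a `ThreadPoolExecutor` is created without `max_workers`, CPython (3.8 through 3.12; newer versions switch to the process CPU count) sizes the pool as `min(32, os.cpu_count() + 4)`, and the only way to read the chosen size back is the private `_max_workers` attribute. A minimal sketch of that default, using only the standard library and not part of this PR:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# CPython's default when max_workers is omitted: min(32, os.cpu_count() + 4).
# _max_workers is a private attribute; it is read here purely for illustration,
# which is why the test above asserts only a lower bound instead.
with ThreadPoolExecutor() as executor:
    assert executor._max_workers == min(32, (os.cpu_count() or 1) + 4)
```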
Contributor: We're changing a public API here. We either have to go through the deprecation cycle, or add a new method `parquet_file_to_data_file` that's used by `parquet_file_to_data_files`.

Contributor (Author): I re-added `parquet_files_to_data_files` (plural); internally it uses `parquet_file_to_data_file` (singular). Another alternative is to make `parquet_file_to_data_file` private. What do you think?

Contributor: If we want to make `parquet_file_to_data_file` private, then we have to go through the deprecation cycle as well. I think we can leave it in for now.
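To make the resolution concrete, here is a rough sketch (not necessarily the exact code merged in this PR) of how the plural helper can keep its public signature while fanning each path out to the new singular function on a thread pool. The use of pyiceberg's `ExecutorFactory` is an assumption based on how other parallel code paths in the library obtain their executor:

```python
from typing import Iterable, Iterator

from pyiceberg.io import FileIO
from pyiceberg.io.pyarrow import parquet_file_to_data_file
from pyiceberg.manifest import DataFile
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.utils.concurrent import ExecutorFactory  # assumption: shared executor helper


def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterable[str]) -> Iterator[DataFile]:
    """Sketch: keep the plural public helper, delegating each file to parquet_file_to_data_file."""
    executor = ExecutorFactory.get_or_create()
    futures = [
        executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths
    ]
    # results are collected in input order; result() re-raises any exception from the worker thread
    return iter([future.result() for future in futures])
```

Keeping the plural function as the public entry point means callers see no signature change, while the singular function stays available for per-file work such as the parallel `add_files` path exercised by the test above.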