Arrow: Infer the types when reading #1669
Changes from 3 commits
pyiceberg/io/pyarrow.py

@@ -1348,7 +1348,6 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: bool = True,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = _parse_location(task.file.file_path)
@@ -1382,9 +1381,7 @@ def _task_to_record_batches(
     # https://github.com/apache/arrow/issues/41884
     # https://github.com/apache/arrow/issues/43183
     # Would be good to remove this later on
-    schema=_pyarrow_schema_ensure_large_types(physical_schema)
-    if use_large_types
-    else (_pyarrow_schema_ensure_small_types(physical_schema)),
+    schema=physical_schema,
     # This will push down the query to Arrow.
     # But in case there are positional deletes, we have to apply them first
     filter=pyarrow_filter if not positional_deletes else None,
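For readers unfamiliar with the Arrow dataset API, here is a minimal standalone sketch (file path and column name are invented) of what passing `schema=physical_schema` buys: the scanner keeps the types exactly as the Parquet file declares them instead of widening everything to the `large_*` variants up front.

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Write a small Parquet file so the fragment has a physical schema to inspect.
pq.write_table(pa.table({"name": ["a", "b"]}), "/tmp/example.parquet")

dataset = ds.dataset("/tmp/example.parquet", format="parquet")
fragment = next(dataset.get_fragments())

# Scan with the types as stored in the file, rather than a coerced schema.
scanner = ds.Scanner.from_fragment(fragment=fragment, schema=fragment.physical_schema)
print(scanner.to_table().schema)  # name: string (not large_string)
```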
@@ -1419,7 +1416,6 @@ def _task_to_record_batches(
     file_project_schema,
     current_batch,
     downcast_ns_timestamp_to_us=True,
-    use_large_types=use_large_types,
 )

 # Inject projected column values if available
@@ -1504,14 +1500,6 @@ def __init__(
     self._case_sensitive = case_sensitive
     self._limit = limit

-    @property
-    def _use_large_types(self) -> bool:
-        """Whether to represent data as large arrow types.
-
-        Defaults to True.
-        """
-        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
-
     @property
     def _projected_field_ids(self) -> Set[int]:
         """Set of field IDs that should be projected from the data files."""
@@ -1573,11 +1561,16 @@ def _table_from_scan_task(task: FileScanTask) -> pa.Table:
     tables = [f.result() for f in completed_futures if f.result()]

+    arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)
+
     if len(tables) < 1:
-        return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+        return pa.Table.from_batches([], schema=arrow_schema)

     result = pa.concat_tables(tables, promote_options="permissive")

+    if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
+        result = result.cast(arrow_schema)
+
     if self._limit is not None:
         return result.slice(0, self._limit)

Review comment from the PR author (Contributor) on the PYARROW_USE_LARGE_TYPES_ON_READ check (conversation marked as resolved by Fokko):

I left this in, but I would be leaning toward deprecating this, since I don't think we want to trouble the user. I think it should be an implementation detail based on how large the buffers are.
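As a point of reference, here is a PyArrow-only sketch of the new tail of the eager read path (PyArrow 14+ for `promote_options`): the per-task tables keep whatever types were inferred from the files, and only when the read property asks for it is the result cast to the large-type schema. The flag and column name below are stand-ins, not the pyiceberg API.

```python
import pyarrow as pa

tables = [
    pa.table({"name": pa.array(["a"], type=pa.string())}),
    pa.table({"name": pa.array(["b"], type=pa.string())}),
]
large_schema = pa.schema([pa.field("name", pa.large_string())])

result = pa.concat_tables(tables, promote_options="permissive")

use_large_types_on_read = True  # stand-in for PYARROW_USE_LARGE_TYPES_ON_READ
if use_large_types_on_read:
    # Opt back in to the old behaviour of large Arrow types.
    result = result.cast(large_schema)

print(result.schema)  # name: large_string here; name: string when the flag is off
```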
@@ -1602,10 +1595,14 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
         ValueError: When a field type in the file cannot be projected to the schema type
     """
     deletes_per_file = _read_all_delete_files(self._io, tasks)
+    # Always use large types, since we cannot infer it in a streaming fashion,
+    # without fetching all the schemas first, which defeats the purpose of streaming
     return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

 def _record_batches_from_scan_tasks_and_deletes(
-    self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
+    self,
+    tasks: Iterable[FileScanTask],
+    deletes_per_file: Dict[str, List[ChunkedArray]],
 ) -> Iterator[pa.RecordBatch]:
     total_row_count = 0
     for task in tasks:
@@ -1620,7 +1617,6 @@ def _record_batches_from_scan_tasks_and_deletes(
     deletes_per_file.get(task.file.file_path),
     self._case_sensitive,
     self._table_metadata.name_mapping(),
-    self._use_large_types,
     self._table_metadata.spec(),
 )
 for batch in batches:
@@ -1639,13 +1635,12 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
-    use_large_types: bool = True,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types),
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
         ArrowAccessor(file_schema),
     )
     return pa.RecordBatch.from_struct_array(struct_array)
@@ -1655,19 +1650,17 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]):
     _file_schema: Schema
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
-    _use_large_types: bool
+    _use_large_types: Optional[bool]

     def __init__(
         self,
         file_schema: Schema,
         downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
-        use_large_types: bool = True,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
-        self._use_large_types = use_large_types

     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)
@@ -1677,8 +1670,6 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         target_schema = schema_to_pyarrow(
             promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
         )
-        if not self._use_large_types:
-            target_schema = _pyarrow_schema_ensure_small_types(target_schema)
         return values.cast(target_schema)
     elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
         if field.field_type == TimestampType():

Second changed file:
@@ -1750,7 +1750,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
     return pa.RecordBatchReader.from_batches(
         target_schema,
         batches,
-    )
+    ).cast(target_schema)

 def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
     """Read a Pandas DataFrame eagerly from this Iceberg table.

Review comment from the PR author (Contributor) on the added .cast(target_schema): This will still return …
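For context on the `.cast(target_schema)` call above: recent PyArrow versions let you cast a `RecordBatchReader`, which re-declares the stream's schema and converts each batch lazily as it is consumed. A minimal sketch with an invented column name:

```python
import pyarrow as pa

def batches():
    # Batches arrive with the types inferred from the data files.
    yield pa.record_batch([pa.array(["a", "b"], type=pa.string())], names=["name"])

small_schema = pa.schema([pa.field("name", pa.string())])
target_schema = pa.schema([pa.field("name", pa.large_string())])

reader = pa.RecordBatchReader.from_batches(small_schema, batches()).cast(target_schema)
print(reader.schema)             # name: large_string
print(reader.read_all().schema)  # each batch is cast while streaming
```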
Third changed file (integration tests):

@@ -806,7 +806,7 @@ def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:

 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
-def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
+def test_table_scan_keep_types(catalog: Catalog) -> None:
     identifier = "default.test_table_scan_default_to_large_types"
     arrow_table = pa.Table.from_arrays(
         [
@@ -837,10 +837,10 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None:

     expected_schema = pa.schema(
         [
-            pa.field("string", pa.large_string()),
+            pa.field("string", pa.string()),
             pa.field("string-to-binary", pa.large_binary()),
-            pa.field("binary", pa.large_binary()),
-            pa.field("list", pa.large_list(pa.large_string())),
+            pa.field("binary", pa.binary()),
+            pa.field("list", pa.list_(pa.string())),

Review comments on the expected schema:

Suggested change:
-            pa.field("list", pa.list_(pa.string())),
+            pa.field("list", pa.list_(pa.large_string())),

Reply: Good one!

Review comment: @Fokko is this right? type promotion for string->binary results in a large_binary type

iceberg-python/pyiceberg/io/pyarrow.py, lines 687 to 688 in 7a56ddb:
     def visit_binary(self, _: BinaryType) -> pa.DataType:
         return pa.large_binary()

i found these 3 places where _ConvertToArrowSchema converts to large type by default
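To make the question concrete: the projection visitor casts the file's column to the Arrow type derived from the promoted Iceberg type, and since the conversion shown above maps BinaryType to `pa.large_binary()` by default, the promoted "string-to-binary" column comes out large even though untouched columns keep their file types. A tiny illustrative cast (values are made up):

```python
import pyarrow as pa

# The file stores the column as a plain (small) string ...
file_values = pa.array(["a", "b"], type=pa.string())

# ... but the projected Iceberg type is binary, and _ConvertToArrowSchema maps
# BinaryType to large_binary by default, so the cast target is large_binary.
promoted = file_values.cast(pa.large_binary())
print(promoted.type)  # large_binary
```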
Review comment: should we remove this comment? seems related to the schema type inference
1b9b884