
Commit 2f56e2f

Merge branch 'main' of github.com:apache/iceberg-python into fd-use-iceberg-rust
2 parents 059e067 + 7a56ddb

6 files changed, +72 -69 lines

pyiceberg/io/pyarrow.py

Lines changed: 24 additions & 24 deletions

@@ -175,6 +175,7 @@
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
+from pyiceberg.utils.deprecated import deprecation_message
 from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
@@ -1385,7 +1386,6 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: bool = True,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = _parse_location(task.file.file_path)
@@ -1415,13 +1415,7 @@ def _task_to_record_batches(

     fragment_scanner = ds.Scanner.from_fragment(
         fragment=fragment,
-        # With PyArrow 16.0.0 there is an issue with casting record-batches:
-        # https://github.com/apache/arrow/issues/41884
-        # https://github.com/apache/arrow/issues/43183
-        # Would be good to remove this later on
-        schema=_pyarrow_schema_ensure_large_types(physical_schema)
-        if use_large_types
-        else (_pyarrow_schema_ensure_small_types(physical_schema)),
+        schema=physical_schema,
         # This will push down the query to Arrow.
         # But in case there are positional deletes, we have to apply them first
         filter=pyarrow_filter if not positional_deletes else None,
@@ -1456,7 +1450,6 @@ def _task_to_record_batches(
                 file_project_schema,
                 current_batch,
                 downcast_ns_timestamp_to_us=True,
-                use_large_types=use_large_types,
             )

             # Inject projected column values if available
@@ -1542,14 +1535,6 @@ def __init__(
         self._case_sensitive = case_sensitive
         self._limit = limit

-    @property
-    def _use_large_types(self) -> bool:
-        """Whether to represent data as large arrow types.
-
-        Defaults to True.
-        """
-        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
-
     @property
     def _projected_field_ids(self) -> Set[int]:
         """Set of field IDs that should be projected from the data files."""
@@ -1611,11 +1596,21 @@ def _table_from_scan_task(task: FileScanTask) -> pa.Table:

         tables = [f.result() for f in completed_futures if f.result()]

+        arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)
+
         if len(tables) < 1:
-            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+            return pa.Table.from_batches([], schema=arrow_schema)

         result = pa.concat_tables(tables, promote_options="permissive")

+        if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
+            deprecation_message(
+                deprecated_in="0.10.0",
+                removed_in="0.11.0",
+                help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
+            )
+            result = result.cast(arrow_schema)
+
         if self._limit is not None:
             return result.slice(0, self._limit)

@@ -1658,7 +1653,6 @@ def _record_batches_from_scan_tasks_and_deletes(
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
-                self._use_large_types,
                 self._table_metadata.spec(),
             )
             for batch in batches:
@@ -1677,13 +1671,12 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
-    use_large_types: bool = True,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types),
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
         ArrowAccessor(file_schema),
     )
     return pa.RecordBatch.from_struct_array(struct_array)
@@ -1693,20 +1686,27 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
     _file_schema: Schema
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
-    _use_large_types: bool
+    _use_large_types: Optional[bool]

     def __init__(
         self,
         file_schema: Schema,
         downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
-        use_large_types: bool = True,
+        use_large_types: Optional[bool] = None,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
         self._use_large_types = use_large_types

+        if use_large_types is not None:
+            deprecation_message(
+                deprecated_in="0.10.0",
+                removed_in="0.11.0",
+                help_message="Argument `use_large_types` will be removed from ArrowProjectionVisitor",
+            )
+
     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)

@@ -1715,7 +1715,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
             target_schema = schema_to_pyarrow(
                 promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
             )
-            if not self._use_large_types:
+            if self._use_large_types is False:
                 target_schema = _pyarrow_schema_ensure_small_types(target_schema)
             return values.cast(target_schema)
         elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
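Taken together, these changes make the scan read each Parquet fragment with its physical Arrow types and only cast the concatenated result when the deprecated property is explicitly enabled. Below is a minimal sketch of how a reader could still opt into the old large-type behaviour, assuming (as in current releases) that the key behind PYARROW_USE_LARGE_TYPES_ON_READ is "pyarrow.use-large-types-on-read" and that catalog properties flow through to the scan's FileIO; the catalog and table names are hypothetical:

from pyiceberg.catalog import load_catalog

# Deprecated opt-in: re-enables large Arrow types for the final table and
# triggers the deprecation warning added in this commit.
catalog = load_catalog("default", **{"pyarrow.use-large-types-on-read": "true"})
table = catalog.load_table("default.my_table")  # hypothetical identifier
arrow_table = table.scan().to_arrow()  # without the property, file-level types are kept as-is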

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion

@@ -1785,7 +1785,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
         return pa.RecordBatchReader.from_batches(
             target_schema,
             batches,
-        )
+        ).cast(target_schema)

     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         """Read a Pandas DataFrame eagerly from this Iceberg table.

tests/integration/test_add_files.py

Lines changed: 8 additions & 13 deletions

@@ -33,7 +33,7 @@
 from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
-from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException
 from pyiceberg.manifest import DataFile
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
@@ -588,11 +588,6 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca
             pa.field("foo", pa.string(), nullable=False),
         ]
     )
-    arrow_schema_large = pa.schema(
-        [
-            pa.field("foo", pa.large_string(), nullable=False),
-        ]
-    )

     tbl = _create_table(session_catalog, identifier, format_version, schema=iceberg_schema)

@@ -614,27 +609,27 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca
     tbl.add_files([file_path])

     table_schema = tbl.scan().to_arrow().schema
-    assert table_schema == arrow_schema_large
+    assert table_schema == arrow_schema

     file_path_large = f"s3://warehouse/default/unpartitioned_with_large_types/v{format_version}/test-1.parquet"
     _write_parquet(
         tbl.io,
         file_path_large,
-        arrow_schema_large,
+        arrow_schema,
         pa.Table.from_pylist(
             [
                 {
                     "foo": "normal",
                 }
             ],
-            schema=arrow_schema_large,
+            schema=arrow_schema,
         ),
     )

     tbl.add_files([file_path_large])

     table_schema = tbl.scan().to_arrow().schema
-    assert table_schema == arrow_schema_large
+    assert table_schema == arrow_schema


 @pytest.mark.integration
@@ -748,8 +743,8 @@ def test_add_files_with_valid_upcast(
         pa.schema(
             (
                 pa.field("long", pa.int64(), nullable=True),
-                pa.field("list", pa.large_list(pa.int64()), nullable=False),
-                pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
+                pa.field("list", pa.list_(pa.int64()), nullable=False),
+                pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
                 pa.field("double", pa.float64(), nullable=True),
                 pa.field("uuid", pa.binary(length=16), nullable=True),  # can UUID is read as fixed length binary of length 16
             )
@@ -799,7 +794,7 @@ def test_add_files_subset_of_schema(spark: SparkSession, session_catalog: Catalo
                 "qux": date(2024, 3, 7),
             }
         ],
-        schema=_pyarrow_schema_ensure_large_types(ARROW_SCHEMA),
+        schema=ARROW_SCHEMA,
     )

     lhs = spark.table(f"{identifier}").toPandas()

tests/integration/test_reads.py

Lines changed: 12 additions & 12 deletions

@@ -831,7 +831,16 @@ def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:

 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
-def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
+def test_table_scan_keep_types(catalog: Catalog) -> None:
+    expected_schema = pa.schema(
+        [
+            pa.field("string", pa.string()),
+            pa.field("string-to-binary", pa.large_binary()),
+            pa.field("binary", pa.binary()),
+            pa.field("list", pa.list_(pa.large_string())),
+        ]
+    )
+
     identifier = "default.test_table_scan_default_to_large_types"
     arrow_table = pa.Table.from_arrays(
         [
@@ -840,7 +849,7 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
             pa.array([b"a", b"b", b"c"]),
             pa.array([["a", "b"], ["c", "d"], ["e", "f"]]),
         ],
-        names=["string", "string-to-binary", "binary", "list"],
+        schema=expected_schema,
     )

     try:
@@ -859,15 +868,6 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
         update_schema.update_column("string-to-binary", BinaryType())

     result_table = tbl.scan().to_arrow()
-
-    expected_schema = pa.schema(
-        [
-            pa.field("string", pa.large_string()),
-            pa.field("string-to-binary", pa.large_binary()),
-            pa.field("binary", pa.large_binary()),
-            pa.field("list", pa.large_list(pa.large_string())),
-        ]
-    )
     assert result_table.schema.equals(expected_schema)

@@ -906,7 +906,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
     expected_schema = pa.schema(
         [
             pa.field("string", pa.string()),
-            pa.field("string-to-binary", pa.binary()),
+            pa.field("string-to-binary", pa.large_binary()),
             pa.field("binary", pa.binary()),
             pa.field("list", pa.list_(pa.string())),
         ]

tests/integration/test_writes/test_writes.py

Lines changed: 18 additions & 10 deletions

@@ -25,6 +25,7 @@
 from urllib.parse import urlparse

 import pandas as pd
+import pandas.testing
 import pyarrow as pa
 import pyarrow.compute as pc
 import pyarrow.parquet as pq
@@ -401,7 +402,14 @@ def test_python_writes_dictionary_encoded_column_with_spark_reads(
     tbl.append(arrow_table)
     spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
     pyiceberg_df = tbl.scan().to_pandas()
-    assert spark_df.equals(pyiceberg_df)
+
+    # We're just interested in the content, PyIceberg actually makes a nice Categorical out of it:
+    # E       AssertionError: Attributes of DataFrame.iloc[:, 1] (column name="name") are different
+    # E
+    # E       Attribute "dtype" are different
+    # E       [left]:  object
+    # E       [right]: CategoricalDtype(categories=['AB', 'CD', 'EF'], ordered=False, categories_dtype=object)
+    pandas.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False, check_categorical=False)


 @pytest.mark.integration
@@ -422,7 +430,7 @@ def test_python_writes_with_small_and_large_types_spark_reads(
     }
     pa_schema = pa.schema(
         [
-            pa.field("foo", pa.large_string()),
+            pa.field("foo", pa.string()),
             pa.field("id", pa.int32()),
             pa.field("name", pa.string()),
             pa.field(
@@ -432,7 +440,7 @@ def test_python_writes_with_small_and_large_types_spark_reads(
                         pa.field("street", pa.string()),
                         pa.field("city", pa.string()),
                         pa.field("zip", pa.int32()),
-                        pa.field("bar", pa.large_string()),
+                        pa.field("bar", pa.string()),
                     ]
                 ),
             ),
@@ -448,17 +456,17 @@ def test_python_writes_with_small_and_large_types_spark_reads(
     arrow_table_on_read = tbl.scan().to_arrow()
     assert arrow_table_on_read.schema == pa.schema(
         [
-            pa.field("foo", pa.large_string()),
+            pa.field("foo", pa.string()),
             pa.field("id", pa.int32()),
-            pa.field("name", pa.large_string()),
+            pa.field("name", pa.string()),
             pa.field(
                 "address",
                 pa.struct(
                     [
-                        pa.field("street", pa.large_string()),
-                        pa.field("city", pa.large_string()),
+                        pa.field("street", pa.string()),
+                        pa.field("city", pa.string()),
                         pa.field("zip", pa.int32()),
-                        pa.field("bar", pa.large_string()),
+                        pa.field("bar", pa.string()),
                     ]
                 ),
             ),
@@ -1164,8 +1172,8 @@ def test_table_write_schema_with_valid_upcast(
         pa.schema(
             (
                 pa.field("long", pa.int64(), nullable=True),
-                pa.field("list", pa.large_list(pa.int64()), nullable=False),
-                pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
+                pa.field("list", pa.list_(pa.int64()), nullable=False),
+                pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
                 pa.field("double", pa.float64(), nullable=True),  # can support upcasting float to double
                 pa.field("uuid", pa.binary(length=16), nullable=True),  # can UUID is read as fixed length binary of length 16
             )

tests/io/test_pyarrow.py

Lines changed: 9 additions & 9 deletions

@@ -1065,10 +1065,10 @@ def test_read_map(schema_map: Schema, file_map: str) -> None:

     assert (
         repr(result_table.schema)
-        == """properties: map<large_string, large_string>
-  child 0, entries: struct<key: large_string not null, value: large_string not null> not null
-      child 0, key: large_string not null
-      child 1, value: large_string not null"""
+        == """properties: map<string, string>
+  child 0, entries: struct<key: string not null, value: string not null> not null
+      child 0, key: string not null
+      child 1, value: string not null"""
     )


@@ -1181,7 +1181,7 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa
     with transaction.update_snapshot().overwrite() as update:
         update.append_data_file(unpartitioned_file)

-    schema = pa.schema([("other_field", pa.large_string()), ("partition_id", pa.int64())])
+    schema = pa.schema([("other_field", pa.string()), ("partition_id", pa.int64())])
     assert table.scan().to_arrow() == pa.table(
         {
             "other_field": ["foo", "bar", "baz"],
@@ -1245,7 +1245,7 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC
     assert (
         str(table.scan().to_arrow())
         == """pyarrow.Table
-field_1: large_string
+field_1: string
 field_2: int64
 field_3: int64
 ----
@@ -1470,9 +1470,9 @@ def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_s
     assert actual.as_py() == expected
     assert (
         repr(result_table.schema)
-        == """locations: map<large_string, struct<latitude: double not null, longitude: double not null, altitude: double>>
-  child 0, entries: struct<key: large_string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null
-      child 0, key: large_string not null
+        == """locations: map<string, struct<latitude: double not null, longitude: double not null, altitude: double>>
+  child 0, entries: struct<key: string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null
+      child 0, key: string not null
       child 1, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null
         child 0, latitude: double not null
         child 1, longitude: double not null
