46 | 46 | from pyiceberg.exceptions import CommitFailedException, NoSuchTableError |
47 | 47 | from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not |
48 | 48 | from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _dataframe_to_data_files |
| 49 | +from pyiceberg.manifest import FileFormat |
49 | 50 | from pyiceberg.partitioning import PartitionField, PartitionSpec |
50 | 51 | from pyiceberg.schema import Schema |
51 | 52 | from pyiceberg.table import TableProperties |
@@ -709,6 +710,81 @@ def test_write_parquet_unsupported_properties( |
709 | 710 | tbl.append(arrow_table_with_null) |
710 | 711 |
711 | 712 |
| 713 | +@pytest.mark.integration |
| 714 | +@pytest.mark.parametrize("format_version", [1, 2]) |
| 715 | +def test_spark_writes_orc_pyiceberg_reads(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| 716 | + """Test that ORC files written by Spark can be read by PyIceberg.""" |
| 717 | + identifier = f"default.spark_writes_orc_pyiceberg_reads_v{format_version}" |
| 718 | + |
| 719 | + # Create test data |
| 720 | + test_data = [ |
| 721 | + (1, "Alice", 25, True), |
| 722 | + (2, "Bob", 30, False), |
| 723 | + (3, "Charlie", 35, True), |
| 724 | + (4, "David", 28, True), |
| 725 | + (5, "Eve", 32, False), |
| 726 | + ] |
| 727 | + |
| 728 | + # Create Spark DataFrame |
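 | 729 | +    # Spark infers LongType for the integers, StringType for the names, and BooleanType for the
 | 730 | +    # flag, which is what the pandas dtype assertions further down rely on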
| 729 | + spark_df = spark.createDataFrame(test_data, ["id", "name", "age", "is_active"]) |
| 730 | + |
 | 731 | +    # Create the table with ORC as the default write format and the requested format version.
 | 732 | +    # Both properties are set at creation time so that every data file, including the ones
 | 733 | +    # produced by createOrReplace, is written as ORC. Setting format-version up front also
 | 734 | +    # avoids altering it on an existing table, which Iceberg rejects when it would be a
 | 735 | +    # downgrade (for example, from v2 back to v1).
 | 736 | +    (
 | 737 | +        spark_df.writeTo(identifier)
 | 738 | +        .using("iceberg")
 | 739 | +        .tableProperty("write.format.default", "orc")
 | 740 | +        .tableProperty("format-version", str(format_version))
 | 741 | +        .createOrReplace()
 | 742 | +    )
 | 743 | +
 | 744 | +    # Append the same rows again with Spark so the table contains ORC data files
 | 745 | +    # from two separate writes
| 746 | + spark_df.writeTo(identifier).using("iceberg").append() |
| 747 | + |
| 748 | + # Read with PyIceberg - this is the main focus of our validation |
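 | 749 | +    # A plain table.scan() plans all of the table's current data files; to_pandas() materializes
 | 750 | +    # them through PyArrow, which is where PyIceberg's ORC read path is exercised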
| 749 | + tbl = session_catalog.load_table(identifier) |
| 750 | + pyiceberg_df = tbl.scan().to_pandas() |
| 751 | + |
| 752 | + # Verify PyIceberg results have the expected number of rows |
| 753 | + assert len(pyiceberg_df) == 10 # 5 rows from create + 5 rows from append |
| 754 | + |
| 755 | + # Verify PyIceberg column names |
| 756 | + assert list(pyiceberg_df.columns) == ["id", "name", "age", "is_active"] |
| 757 | + |
| 758 | + # Verify PyIceberg data integrity - check the actual data values |
| 759 | + expected_data = [ |
| 760 | + (1, "Alice", 25, True), |
| 761 | + (2, "Bob", 30, False), |
| 762 | + (3, "Charlie", 35, True), |
| 763 | + (4, "David", 28, True), |
| 764 | + (5, "Eve", 32, False), |
| 765 | + ] |
| 766 | + |
 | 767 | +    # Verify the expected rows appear twice (create + append); sort so the check is order-independent
 | 768 | +    pyiceberg_data = list(zip(pyiceberg_df["id"], pyiceberg_df["name"], pyiceberg_df["age"], pyiceberg_df["is_active"]))
 | 769 | +    assert sorted(pyiceberg_data) == sorted(expected_data + expected_data)
| 770 | + |
| 771 | + # Verify PyIceberg data types are correct |
| 772 | + assert pyiceberg_df["id"].dtype == "int64" |
| 773 | + assert pyiceberg_df["name"].dtype == "object" # string |
| 774 | + assert pyiceberg_df["age"].dtype == "int64" |
| 775 | + assert pyiceberg_df["is_active"].dtype == "bool" |
| 776 | + |
 | 777 | +    # Cross-validate with Spark, sorting both results so the comparison is independent of row order
 | 778 | +    spark_result = spark.sql(f"SELECT * FROM {identifier} ORDER BY id").toPandas()
 | 779 | +    pandas.testing.assert_frame_equal(spark_result, pyiceberg_df.sort_values("id").reset_index(drop=True), check_dtype=False)
| 780 | + |
| 781 | + # Verify the files are actually ORC format |
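 | 782 | +    # plan_files() yields FileScanTask objects; each task's .file is the DataFile entry whose
 | 783 | +    # file_format records the physical format the writer used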
| 782 | + files = list(tbl.scan().plan_files()) |
| 783 | + assert len(files) > 0 |
| 784 | + for file_task in files: |
| 785 | + assert file_task.file.file_format == FileFormat.ORC |
| 786 | + |
| 787 | + |
712 | 788 | @pytest.mark.integration |
713 | 789 | def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: |
714 | 790 | identifier = "default.arrow_data_files" |