import pyarrow
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.orc as orc
import pytest
from packaging import version
from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem
def test_retry_strategy_not_found() -> None:
    io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"})
    with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"):
        io.new_input("s3://bucket/path/to/file")


def test_write_and_read_orc(tmp_path) -> None:
    # Create a simple Arrow table
    data = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
    orc_path = tmp_path / 'test.orc'
    orc.write_table(data, str(orc_path))
    # Read it back
    orc_file = orc.ORCFile(str(orc_path))
    table_read = orc_file.read()
    assert table_read.equals(data)


def test_orc_file_format_integration(tmp_path) -> None:
    # This test mimics a minimal integration with PyIceberg's FileFormat enum and pyarrow.orc
    from pyiceberg.manifest import FileFormat
    import pyarrow.dataset as ds

    data = pa.table({'a': [10, 20], 'b': ['foo', 'bar']})
    orc_path = tmp_path / 'iceberg.orc'
    orc.write_table(data, str(orc_path))
    # Use PyArrow dataset API to read as ORC
    dataset = ds.dataset(str(orc_path), format=ds.OrcFileFormat())
    table_read = dataset.to_table()
    assert table_read.equals(data)


def test_iceberg_write_and_read_orc(tmp_path) -> None:
    """
    Integration test: write and read ORC via the Iceberg API.
    To run just this test:
        pytest tests/io/test_pyarrow.py -k test_iceberg_write_and_read_orc
    """
    import pyarrow as pa
    from pyiceberg.schema import Schema, NestedField
    from pyiceberg.types import IntegerType, StringType
    from pyiceberg.manifest import FileFormat, DataFileContent
    from pyiceberg.table.metadata import TableMetadataV2
    from pyiceberg.partitioning import PartitionSpec
    from pyiceberg.io.pyarrow import write_file, PyArrowFileIO, ArrowScan
    from pyiceberg.table import WriteTask, FileScanTask
    from pyiceberg.expressions import AlwaysTrue
    import uuid

    # Define schema and data (Iceberg IntegerType is 32-bit, so the Arrow column is built as int32)
    schema = Schema(
        NestedField(1, "id", IntegerType(), required=True),
        NestedField(2, "name", StringType(), required=False),
    )
    data = pa.table({"id": pa.array([1, 2, 3], type=pa.int32()), "name": ["a", "b", "c"]})

    # Create table metadata
    table_metadata = TableMetadataV2(
        location=str(tmp_path),
        last_column_id=2,
        format_version=2,
        schemas=[schema],
        partition_specs=[PartitionSpec()],
        properties={
            "write.format.default": "orc",
        },
    )
    io = PyArrowFileIO()

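    # The "write.format.default": "orc" table property set above is what this test
    # relies on to have write_file produce ORC data files (asserted below).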
    # Write ORC file using Iceberg API
    write_uuid = uuid.uuid4()
    tasks = [
        WriteTask(
            write_uuid=write_uuid,
            task_id=0,
            record_batches=data.to_batches(),
            schema=schema,
        )
    ]
    data_files = list(write_file(io, table_metadata, iter(tasks)))
    assert len(data_files) == 1
    data_file = data_files[0]
    assert data_file.file_format == FileFormat.ORC
    assert data_file.content == DataFileContent.DATA

    # Read back using ArrowScan
    scan = ArrowScan(
        table_metadata=table_metadata,
        io=io,
        projected_schema=schema,
        row_filter=AlwaysTrue(),
        case_sensitive=True,
    )
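    # Wrap the written DataFile in a FileScanTask so ArrowScan can materialize it as an Arrow table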
    scan_task = FileScanTask(data_file=data_file)
    table_read = scan.to_table([scan_task])

    # Compare data ignoring schema metadata (like not null constraints)
    assert table_read.num_rows == data.num_rows
    assert table_read.num_columns == data.num_columns
    assert table_read.column_names == data.column_names

    # Compare actual column data values
    for col_name in data.column_names:
        original_values = data.column(col_name).to_pylist()
        read_values = table_read.column(col_name).to_pylist()
        assert original_values == read_values, f"Column {col_name} values don't match"