# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
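"""Test that an overwrite removes only the selected DataFile.

The table is built by adding the same Parquet file twice, so two DataFile
entries point at one physical path; after overwrite-deleting one entry,
no duplicate file paths should remain in the table metadata.
"""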
import pytest
import pyarrow as pa
import pyarrow.parquet as pq

from pyiceberg.io.pyarrow import parquet_file_to_data_file
from pyiceberg.table import Table
from tests.catalog.test_base import InMemoryCatalog


@pytest.fixture
def iceberg_catalog(tmp_path):
    catalog = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix())
    catalog.create_namespace("default")
    return catalog


def test_overwrite_removes_only_selected_datafile(iceberg_catalog, tmp_path):
    # Build a table whose metadata references the same Parquet file twice,
    # then overwrite-delete one of the two DataFile entries
    identifier = "default.test_overwrite_removes_only_selected_datafile"
    try:
        iceberg_catalog.drop_table(identifier)
    except Exception:
        pass

    # Create the Arrow schema and two identical single-row tables
    arrow_schema = pa.schema([
        pa.field("id", pa.int32(), nullable=False),
        pa.field("value", pa.string(), nullable=True),
    ])
    df_a = pa.Table.from_pylist([{"id": 1, "value": "A"}], schema=arrow_schema)
    df_b = pa.Table.from_pylist([{"id": 1, "value": "A"}], schema=arrow_schema)

    # Write both Arrow tables to the same Parquet path, so every DataFile
    # added below points at one physical file
    parquet_path_a = str(tmp_path / "file_a.parquet")
    parquet_path_b = str(tmp_path / "file_a.parquet")
    pq.write_table(df_a, parquet_path_a)
    pq.write_table(df_b, parquet_path_b)

    table: Table = iceberg_catalog.create_table(identifier, arrow_schema)

    # Add the same file twice via add_files; skipping the duplicate check
    # leaves the table with two DataFile entries for one path
    tx = table.transaction()
    tx.add_files([parquet_path_a], check_duplicate_files=False)
    tx.add_files([parquet_path_b], check_duplicate_files=False)
    tx.commit_transaction()

    # Build the DataFile entry for the shared path
    data_file_b = parquet_file_to_data_file(table.io, table.metadata, parquet_path_b)

    # Overwrite: remove only that DataFile
    tx = table.transaction()
    tx.update_snapshot().overwrite().delete_data_file(data_file_b).commit()
    tx.commit_transaction()

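    # Sanity check: both transactions committed, so the table should expose
    # a current snapshot (assumes InMemoryCatalog applies commits in place)
    assert table.current_snapshot() is not None
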
    # Assert: after the overwrite, each file path appears at most once in
    # the table's data-file metadata
    file_paths = table.inspect.data_files().column("file_path").to_pylist()
    assert len(file_paths) == len(set(file_paths)), "Duplicate file paths found in the table"
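
    # One could additionally inspect the surviving rows (a sketch, assuming
    # the standard table-scan API; the expected count depends on how
    # delete_data_file matches entries that share a path):
    # remaining = table.scan().to_arrow()
    # assert remaining.num_rows == 1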