@@ -2261,3 +2261,173 @@ def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
     )
 
     _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_stage_only_delete(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = f"default.test_stage_only_delete_files_v{format_version}"
+    iceberg_spec = PartitionSpec(
+        *[PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="integer_partition")]
+    )
+    tbl = _create_table(
+        session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null], iceberg_spec
+    )
+
+    current_snapshot = tbl.metadata.current_snapshot_id
+    assert current_snapshot is not None
+
+    original_count = len(tbl.scan().to_arrow())
+    assert original_count == 3
+
+    files_to_delete = []
+    for file_task in tbl.scan().plan_files():
+        files_to_delete.append(file_task.file)
+    assert len(files_to_delete) > 0
+
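+    # stage_only=True stages the delete as a new snapshot without advancing the main branch ref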
+    with tbl.transaction() as txn:
+        with txn.update_snapshot(stage_only=True).delete() as delete:
+            delete.delete_by_predicate(EqualTo("int", 9))
+
+    # A new staged delete snapshot is added
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 2
+
+    rows = spark.sql(
+        f"""
+        SELECT operation, summary
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
+    ).collect()
+    operations = [row.operation for row in rows]
+    assert operations == ["append", "delete"]
+
+    # The main branch ref has not changed and the staged delete is not yet applied
+    assert current_snapshot == tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == original_count
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_stage_only_fast_append(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = f"default.test_stage_only_fast_append_files_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
+
+    current_snapshot = tbl.metadata.current_snapshot_id
+    assert current_snapshot is not None
+
+    original_count = len(tbl.scan().to_arrow())
+    assert original_count == 3
+
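+    # Stage a fast-append snapshot: the new data files are committed to a snapshot, but main is not updated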
+    with tbl.transaction() as txn:
+        with txn.update_snapshot(stage_only=True).fast_append() as fast_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                fast_append.append_data_file(data_file=data_file)
+
+    # Main ref has not changed and data is not yet appended
+    assert current_snapshot == tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == original_count
+
+    # There should be a new staged snapshot
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 2
+
+    rows = spark.sql(
+        f"""
+        SELECT operation, summary
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
+    ).collect()
+    operations = [row.operation for row in rows]
+    assert operations == ["append", "append"]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_stage_only_merge_append(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = f"default.test_stage_only_merge_append_files_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
+
+    current_snapshot = tbl.metadata.current_snapshot_id
+    assert current_snapshot is not None
+
+    original_count = len(tbl.scan().to_arrow())
+    assert original_count == 3
+
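+    # Stage a merge-append snapshot: like fast_append, but manifests may be merged on commit; main still does not advance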
+    with tbl.transaction() as txn:
+        with txn.update_snapshot(stage_only=True).merge_append() as merge_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                merge_append.append_data_file(data_file=data_file)
+
+    # Main ref has not changed and data is not yet appended
+    assert current_snapshot == tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == original_count
+
+    # There should be a new staged snapshot
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 2
+
+    rows = spark.sql(
+        f"""
+        SELECT operation, summary
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
+    ).collect()
+    operations = [row.operation for row in rows]
+    assert operations == ["append", "append"]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_stage_only_overwrite_files(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = f"default.test_stage_only_overwrite_files_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
+
+    current_snapshot = tbl.metadata.current_snapshot_id
+    assert current_snapshot is not None
+
+    original_count = len(tbl.scan().to_arrow())
+    assert original_count == 3
+
+    files_to_delete = []
+    for file_task in tbl.scan().plan_files():
+        files_to_delete.append(file_task.file)
+    assert len(files_to_delete) > 0
+
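+    # Stage an overwrite snapshot that re-appends the data and deletes one existing file; the main ref stays put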
+    with tbl.transaction() as txn:
+        with txn.update_snapshot(stage_only=True).overwrite() as overwrite:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                overwrite.append_data_file(data_file=data_file)
+            overwrite.delete_data_file(files_to_delete[0])
+
+    assert current_snapshot == tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == original_count
+
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 2
+
+    rows = spark.sql(
+        f"""
+        SELECT operation, summary
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
+    ).collect()
+    operations = [row.operation for row in rows]
+    assert operations == ["append", "overwrite"]