@@ -1022,7 +1022,7 @@ def append(self, df: pa.Table) -> None:
10221022 with self .update_snapshot ().fast_append () as update_snapshot :
10231023 # skip writing data files if the dataframe is empty
10241024 if df .shape [0 ] > 0 :
1025- data_files = _dataframe_to_data_files (self , df = df )
1025+ data_files = _dataframe_to_data_files (self , write_uuid = update_snapshot . commit_uuid , df = df )
10261026 for data_file in data_files :
10271027 update_snapshot .append_data_file (data_file )
10281028
@@ -1052,7 +1052,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
10521052 with self .update_snapshot ().overwrite () as update_snapshot :
10531053 # skip writing data files if the dataframe is empty
10541054 if df .shape [0 ] > 0 :
1055- data_files = _dataframe_to_data_files (self , df = df )
1055+ data_files = _dataframe_to_data_files (self , write_uuid = update_snapshot . commit_uuid , df = df )
10561056 for data_file in data_files :
10571057 update_snapshot .append_data_file (data_file )
10581058
@@ -2349,7 +2349,9 @@ def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int,
23492349 return f'{ location } /metadata/snap-{ snapshot_id } -{ attempt } -{ commit_uuid } .avro'
23502350
23512351
2352- def _dataframe_to_data_files (table : Table , df : pa .Table , file_schema : Optional [Schema ] = None ) -> Iterable [DataFile ]:
2352+ def _dataframe_to_data_files (
2353+ table : Table , df : pa .Table , write_uuid : Optional [uuid .UUID ] = None , file_schema : Optional [Schema ] = None
2354+ ) -> Iterable [DataFile ]:
23532355 """Convert a PyArrow table into a DataFile.
23542356
23552357 Returns:
@@ -2360,31 +2362,37 @@ def _dataframe_to_data_files(table: Table, df: pa.Table, file_schema: Optional[S
23602362 if len (table .spec ().fields ) > 0 :
23612363 raise ValueError ("Cannot write to partitioned tables" )
23622364
2363- write_uuid = uuid .uuid4 ()
23642365 counter = itertools .count (0 )
2366+ write_uuid = write_uuid or uuid .uuid4 ()
23652367
23662368 # This is an iter, so we don't have to materialize everything every time
23672369 # This will be more relevant when we start doing partitioned writes
23682370 yield from write_file (table , iter ([WriteTask (write_uuid , next (counter ), df )]), file_schema = file_schema )
23692371
23702372
23712373class _MergingSnapshotProducer :
2374+ commit_uuid : uuid .UUID
23722375 _operation : Operation
23732376 _table : Table
23742377 _snapshot_id : int
23752378 _parent_snapshot_id : Optional [int ]
23762379 _added_data_files : List [DataFile ]
2377- _commit_uuid : uuid .UUID
23782380 _transaction : Optional [Transaction ]
23792381
2380- def __init__ (self , operation : Operation , table : Table , transaction : Optional [Transaction ] = None ) -> None :
2382+ def __init__ (
2383+ self ,
2384+ operation : Operation ,
2385+ table : Table ,
2386+ commit_uuid : Optional [uuid .UUID ] = None ,
2387+ transaction : Optional [Transaction ] = None ,
2388+ ) -> None :
2389+ self .commit_uuid = commit_uuid or uuid .uuid4 ()
23812390 self ._operation = operation
23822391 self ._table = table
23832392 self ._snapshot_id = table .new_snapshot_id ()
23842393 # Since we only support the main branch for now
23852394 self ._parent_snapshot_id = snapshot .snapshot_id if (snapshot := self ._table .current_snapshot ()) else None
23862395 self ._added_data_files = []
2387- self ._commit_uuid = uuid .uuid4 ()
23882396 self ._transaction = transaction
23892397
23902398 def __enter__ (self ) -> _MergingSnapshotProducer :
@@ -2408,7 +2416,7 @@ def _existing_manifests(self) -> List[ManifestFile]: ...
24082416 def _manifests (self ) -> List [ManifestFile ]:
24092417 def _write_added_manifest () -> List [ManifestFile ]:
24102418 if self ._added_data_files :
2411- output_file_location = _new_manifest_path (location = self ._table .location (), num = 0 , commit_uuid = self ._commit_uuid )
2419+ output_file_location = _new_manifest_path (location = self ._table .location (), num = 0 , commit_uuid = self .commit_uuid )
24122420 with write_manifest (
24132421 format_version = self ._table .format_version ,
24142422 spec = self ._table .spec (),
@@ -2434,7 +2442,8 @@ def _write_delete_manifest() -> List[ManifestFile]:
24342442 # Check if we need to mark the files as deleted
24352443 deleted_entries = self ._deleted_entries ()
24362444 if len (deleted_entries ) > 0 :
2437- output_file_location = _new_manifest_path (location = self ._table .location (), num = 1 , commit_uuid = self ._commit_uuid )
2445+ output_file_location = _new_manifest_path (location = self ._table .location (), num = 1 , commit_uuid = self .commit_uuid )
2446+
24382447 with write_manifest (
24392448 format_version = self ._table .format_version ,
24402449 spec = self ._table .spec (),
@@ -2477,7 +2486,7 @@ def commit(self) -> Snapshot:
24772486 summary = self ._summary ()
24782487
24792488 manifest_list_file_path = _generate_manifest_list_path (
2480- location = self ._table .location (), snapshot_id = self ._snapshot_id , attempt = 0 , commit_uuid = self ._commit_uuid
2489+ location = self ._table .location (), snapshot_id = self ._snapshot_id , attempt = 0 , commit_uuid = self .commit_uuid
24812490 )
24822491 with write_manifest_list (
24832492 format_version = self ._table .metadata .format_version ,
0 commit comments