Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ def append(self, df: pa.Table) -> None:
with self.update_snapshot().fast_append() as update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(self, df=df)
data_files = _dataframe_to_data_files(self, write_uuid=update_snapshot.commit_uuid, df=df)
for data_file in data_files:
update_snapshot.append_data_file(data_file)

Expand Down Expand Up @@ -1052,7 +1052,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
with self.update_snapshot().overwrite() as update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(self, df=df)
data_files = _dataframe_to_data_files(self, write_uuid=update_snapshot.commit_uuid, df=df)
for data_file in data_files:
update_snapshot.append_data_file(data_file)

Expand Down Expand Up @@ -2349,7 +2349,9 @@ def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int,
return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'


def _dataframe_to_data_files(table: Table, df: pa.Table, file_schema: Optional[Schema] = None) -> Iterable[DataFile]:
def _dataframe_to_data_files(
table: Table, df: pa.Table, write_uuid: Optional[uuid.UUID] = None, file_schema: Optional[Schema] = None
) -> Iterable[DataFile]:
"""Convert a PyArrow table into a DataFile.

Returns:
Expand All @@ -2360,31 +2362,37 @@ def _dataframe_to_data_files(table: Table, df: pa.Table, file_schema: Optional[S
if len(table.spec().fields) > 0:
raise ValueError("Cannot write to partitioned tables")

write_uuid = uuid.uuid4()
counter = itertools.count(0)
write_uuid = write_uuid or uuid.uuid4()

# This is an iter, so we don't have to materialize everything every time
# This will be more relevant when we start doing partitioned writes
yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)]), file_schema=file_schema)


class _MergingSnapshotProducer:
commit_uuid: uuid.UUID
_operation: Operation
_table: Table
_snapshot_id: int
_parent_snapshot_id: Optional[int]
_added_data_files: List[DataFile]
_commit_uuid: uuid.UUID
_transaction: Optional[Transaction]

def __init__(self, operation: Operation, table: Table, transaction: Optional[Transaction] = None) -> None:
def __init__(
self,
operation: Operation,
table: Table,
commit_uuid: Optional[uuid.UUID] = None,
transaction: Optional[Transaction] = None,
) -> None:
self.commit_uuid = commit_uuid or uuid.uuid4()
self._operation = operation
self._table = table
self._snapshot_id = table.new_snapshot_id()
# Since we only support the main branch for now
self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None
self._added_data_files = []
self._commit_uuid = uuid.uuid4()
self._transaction = transaction

def __enter__(self) -> _MergingSnapshotProducer:
Expand All @@ -2408,7 +2416,7 @@ def _existing_manifests(self) -> List[ManifestFile]: ...
def _manifests(self) -> List[ManifestFile]:
def _write_added_manifest() -> List[ManifestFile]:
if self._added_data_files:
output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid)
output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self.commit_uuid)
with write_manifest(
format_version=self._table.format_version,
spec=self._table.spec(),
Expand All @@ -2434,7 +2442,8 @@ def _write_delete_manifest() -> List[ManifestFile]:
# Check if we need to mark the files as deleted
deleted_entries = self._deleted_entries()
if len(deleted_entries) > 0:
output_file_location = _new_manifest_path(location=self._table.location(), num=1, commit_uuid=self._commit_uuid)
output_file_location = _new_manifest_path(location=self._table.location(), num=1, commit_uuid=self.commit_uuid)

with write_manifest(
format_version=self._table.format_version,
spec=self._table.spec(),
Expand Down Expand Up @@ -2477,7 +2486,7 @@ def commit(self) -> Snapshot:
summary = self._summary()

manifest_list_file_path = _generate_manifest_list_path(
location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid
location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self.commit_uuid
)
with write_manifest_list(
format_version=self._table.metadata.format_version,
Expand Down