Skip to content

Commit 6cfc329

Browse files
committed
Moved the deduplicate logic found here: #2130 (comment) to a separate pr as suggested.
1 parent 93a79b9 commit 6cfc329

File tree

2 files changed

+0
-232
lines changed

2 files changed

+0
-232
lines changed

pyiceberg/table/maintenance.py

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -314,69 +314,6 @@ def process_manifest(manifest: ManifestFile) -> list[DataFile]:
314314

315315
return datafiles
316316

317-
def deduplicate_data_files(self) -> List[DataFile]:
318-
"""
319-
Remove duplicate data files from an Iceberg table.
320-
321-
Returns:
322-
List of removed DataFile objects.
323-
"""
324-
import os
325-
from collections import defaultdict
326-
327-
removed: List[DataFile] = []
328-
329-
# Get the current snapshot
330-
current_snapshot = self.tbl.current_snapshot()
331-
if not current_snapshot:
332-
return removed
333-
334-
# Collect all manifest entries from the current snapshot
335-
all_entries = []
336-
for manifest in current_snapshot.manifests(io=self.tbl.io):
337-
entries = list(manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=True))
338-
all_entries.extend(entries)
339-
340-
# Group entries by file name
341-
file_groups = defaultdict(list)
342-
for entry in all_entries:
343-
file_name = os.path.basename(entry.data_file.file_path)
344-
file_groups[file_name].append(entry)
345-
346-
# Find duplicate entries to remove
347-
has_duplicates = False
348-
files_to_remove = []
349-
files_to_keep = []
350-
351-
for _file_name, entries in file_groups.items():
352-
if len(entries) > 1:
353-
# Keep the first entry, remove the rest
354-
files_to_keep.append(entries[0].data_file)
355-
for duplicate_entry in entries[1:]:
356-
files_to_remove.append(duplicate_entry.data_file)
357-
removed.append(duplicate_entry.data_file)
358-
has_duplicates = True
359-
else:
360-
# No duplicates, keep the entry
361-
files_to_keep.append(entries[0].data_file)
362-
363-
# Only create a new snapshot if we actually have duplicates to remove
364-
if has_duplicates:
365-
with self.tbl.transaction() as txn:
366-
with txn.update_snapshot().overwrite() as overwrite_snapshot:
367-
# First, explicitly delete all the duplicate files
368-
for file_to_remove in files_to_remove:
369-
overwrite_snapshot.delete_data_file(file_to_remove)
370-
371-
# Then add back only the files that should be kept
372-
for file_to_keep in files_to_keep:
373-
overwrite_snapshot.append_data_file(file_to_keep)
374-
375-
# Refresh the table to reflect the changes
376-
self.tbl = self.tbl.refresh()
377-
378-
return removed
379-
380317
def _get_expiration_properties(self) -> tuple[Optional[int], Optional[int], Optional[int]]:
381318
"""Get the default expiration properties from table properties.
382319

tests/table/test_dedup_data_file_filepaths.py

Lines changed: 0 additions & 169 deletions
This file was deleted.

0 commit comments

Comments
 (0)