-
Notifications
You must be signed in to change notification settings - Fork 3k
Core, API: Report metrics about deleted files in ExpireSnapshots #14921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1369,6 +1369,9 @@ acceptedBreaks: | |
| old: "class org.apache.iceberg.encryption.EncryptingFileIO" | ||
| new: "class org.apache.iceberg.encryption.EncryptingFileIO" | ||
| justification: "New method for Manifest List reading" | ||
| - code: "java.method.addedToInterface" | ||
| new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::metricsReporter(org.apache.iceberg.metrics.MetricsReporter)" | ||
| justification: "New method for reporting metrics from ExpireSnapshots" | ||
| org.apache.iceberg:iceberg-core: | ||
| - code: "java.class.noLongerInheritsFromClass" | ||
| old: "class org.apache.iceberg.rest.auth.OAuth2Manager" | ||
|
|
@@ -1456,6 +1459,9 @@ acceptedBreaks: | |
| old: "method void org.apache.iceberg.PartitionStats::deletedEntry(org.apache.iceberg.Snapshot)" | ||
| new: "method void org.apache.iceberg.PartitionStats::deletedEntry(org.apache.iceberg.Snapshot)" | ||
| justification: "Changing deprecated code" | ||
| - code: "java.method.removed" | ||
| old: "method org.apache.iceberg.io.CloseableIterable<java.lang.String> org.apache.iceberg.ManifestFiles::readPaths(org.apache.iceberg.ManifestFile, org.apache.iceberg.io.FileIO)" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This breaks an existing API, which we must avoid. |
||
| justification: "Replaced with a more generic readColumns method" | ||
| org.apache.iceberg:iceberg-data: | ||
| - code: "java.class.removed" | ||
| old: "class org.apache.iceberg.data.PartitionStatsHandler" | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |||||||||
| import java.util.List; | ||||||||||
| import java.util.concurrent.ExecutorService; | ||||||||||
| import java.util.function.Consumer; | ||||||||||
| import org.apache.iceberg.metrics.MetricsReporter; | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * API for removing old {@link Snapshot snapshots} from a table. | ||||||||||
|
|
@@ -161,4 +162,10 @@ default ExpireSnapshots cleanExpiredMetadata(boolean clean) { | |||||||||
| throw new UnsupportedOperationException( | ||||||||||
| this.getClass().getName() + " doesn't implement cleanExpiredMetadata"); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /** Report metrics about the ExpireSnapshots operation to the provided reporter */ | ||||||||||
| default ExpireSnapshots metricsReporter(MetricsReporter reporter) { | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for iceberg/core/src/main/java/org/apache/iceberg/SnapshotProducer.java Lines 191 to 194 in 026ec35
We might want to do the same in RemoveSnapshots
|
||||||||||
| throw new UnsupportedOperationException( | ||||||||||
| this.getClass().getName() + " doesn't implement metricsReporter"); | ||||||||||
| } | ||||||||||
| } | ||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,14 +18,18 @@ | |
| */ | ||
| package org.apache.iceberg; | ||
|
|
||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.function.Consumer; | ||
| import org.apache.iceberg.exceptions.NotFoundException; | ||
| import org.apache.iceberg.exceptions.ValidationException; | ||
| import org.apache.iceberg.io.BulkDeletionFailureException; | ||
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.io.FileIO; | ||
| import org.apache.iceberg.io.SupportsBulkOperations; | ||
| import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
| import org.apache.iceberg.util.Tasks; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -42,6 +46,9 @@ public void accept(String file) { | |
| }; | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(FileCleanupStrategy.class); | ||
| protected static final String MANIFEST = "manifest"; | ||
| protected static final String MANIFEST_LIST = "manifest list"; | ||
| protected static final String STATISTICS_FILES = "statistics files"; | ||
|
Comment on lines
+49
to
+51
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can this be an enum? |
||
|
|
||
| protected final FileIO fileIO; | ||
| protected final ExecutorService planExecutorService; | ||
|
|
@@ -72,7 +79,7 @@ protected FileCleanupStrategy( | |
| * @param afterExpiration table metadata after snapshot expiration | ||
| * @param cleanupLevel controls which types of files are eligible for deletion | ||
| */ | ||
| public abstract void cleanFiles( | ||
| public abstract DeleteSummary cleanFiles( | ||
| TableMetadata beforeExpiration, | ||
| TableMetadata afterExpiration, | ||
| ExpireSnapshots.CleanupLevel cleanupLevel); | ||
|
|
@@ -99,8 +106,9 @@ protected CloseableIterable<ManifestFile> readManifests(Snapshot snapshot) { | |
| } | ||
| } | ||
|
|
||
| protected void deleteFiles(Set<String> pathsToDelete, String fileType) { | ||
| protected void deleteFiles(Set<String> pathsToDelete, String fileType, DeleteSummary summary) { | ||
| if (deleteFunc == null && fileIO instanceof SupportsBulkOperations) { | ||
| int failures = 0; | ||
| try { | ||
| ((SupportsBulkOperations) fileIO).deleteFiles(pathsToDelete); | ||
| } catch (BulkDeletionFailureException e) { | ||
|
|
@@ -110,9 +118,11 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) { | |
| pathsToDelete.size(), | ||
| fileType, | ||
| e); | ||
| failures = e.numberFailedObjects(); | ||
| } catch (RuntimeException e) { | ||
| LOG.warn("Bulk deletion failed", e); | ||
| } | ||
| summary.deletedFiles(fileType, pathsToDelete.size() - failures); | ||
| } else { | ||
| Consumer<String> deleteFuncToUse = deleteFunc == null ? defaultDeleteFunc : deleteFunc; | ||
|
|
||
|
|
@@ -124,7 +134,92 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) { | |
| .suppressFailureWhenFinished() | ||
| .onFailure( | ||
| (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown)) | ||
| .run(deleteFuncToUse::accept); | ||
| .run( | ||
| file -> { | ||
| deleteFuncToUse.accept(file); | ||
| summary.deletedFile(fileType); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| static class DeleteSummary { | ||
| private final AtomicLong dataFilesCount = new AtomicLong(0L); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't we already have all of this in the |
||
| private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L); | ||
| private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L); | ||
| private final AtomicLong manifestsCount = new AtomicLong(0L); | ||
| private final AtomicLong manifestListsCount = new AtomicLong(0L); | ||
| private final AtomicLong statisticsFilesCount = new AtomicLong(0L); | ||
|
|
||
| public void deletedFiles(String type, int numFiles) { | ||
| if (FileContent.DATA.name().equalsIgnoreCase(type)) { | ||
| dataFilesCount.addAndGet(numFiles); | ||
|
|
||
| } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) { | ||
| positionDeleteFilesCount.addAndGet(numFiles); | ||
|
|
||
| } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) { | ||
| equalityDeleteFilesCount.addAndGet(numFiles); | ||
|
|
||
| } else if (MANIFEST.equalsIgnoreCase(type)) { | ||
| manifestsCount.addAndGet(numFiles); | ||
|
|
||
| } else if (MANIFEST_LIST.equalsIgnoreCase(type)) { | ||
| manifestListsCount.addAndGet(numFiles); | ||
|
|
||
| } else if (STATISTICS_FILES.equalsIgnoreCase(type)) { | ||
| statisticsFilesCount.addAndGet(numFiles); | ||
|
|
||
| } else { | ||
| throw new ValidationException("Illegal file type: %s", type); | ||
| } | ||
| } | ||
|
|
||
| public void deletedFile(String type) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can this call [inline code lost in extraction; presumably `deletedFiles(type, 1)`]? |
||
| if (FileContent.DATA.name().equalsIgnoreCase(type)) { | ||
| dataFilesCount.incrementAndGet(); | ||
|
|
||
| } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) { | ||
| positionDeleteFilesCount.incrementAndGet(); | ||
|
|
||
| } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) { | ||
| equalityDeleteFilesCount.incrementAndGet(); | ||
|
|
||
| } else if (MANIFEST.equalsIgnoreCase(type)) { | ||
| manifestsCount.incrementAndGet(); | ||
|
|
||
| } else if (MANIFEST_LIST.equalsIgnoreCase(type)) { | ||
| manifestListsCount.incrementAndGet(); | ||
|
|
||
| } else if (STATISTICS_FILES.equalsIgnoreCase(type)) { | ||
| statisticsFilesCount.incrementAndGet(); | ||
|
|
||
| } else { | ||
| throw new ValidationException("Illegal file type: %s", type); | ||
| } | ||
| } | ||
|
|
||
| public long dataFilesCount() { | ||
| return dataFilesCount.get(); | ||
| } | ||
|
|
||
| public long positionDeleteFilesCount() { | ||
| return positionDeleteFilesCount.get(); | ||
| } | ||
|
|
||
| public long equalityDeleteFilesCount() { | ||
| return equalityDeleteFilesCount.get(); | ||
| } | ||
|
|
||
| public long manifestsCount() { | ||
| return manifestsCount.get(); | ||
| } | ||
|
|
||
| public long manifestListsCount() { | ||
| return manifestListsCount.get(); | ||
| } | ||
|
|
||
| public long statisticsFilesCount() { | ||
| return statisticsFilesCount.get(); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -141,6 +236,46 @@ protected Set<String> expiredStatisticsFilesLocations( | |
| return Sets.difference(statsFileLocationsBeforeExpiration, statsFileLocationsAfterExpiration); | ||
| } | ||
|
|
||
| protected static class FileInfo { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a 3rd |
||
| private final FileContent content; | ||
| private final String path; | ||
|
|
||
| public FileInfo(FileContent content, String path) { | ||
| this.content = content; | ||
| this.path = path; | ||
|
Comment on lines
+244
to
+245
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. While this project's code style is generally null-friendly, we don't need to adhere to this in new code. |
||
| } | ||
|
|
||
| public FileContent getContent() { | ||
| return content; | ||
| } | ||
|
|
||
| public String getPath() { | ||
| return path; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object other) { | ||
| if (this == other) { | ||
| return true; | ||
| } else if (other == null || getClass() != other.getClass()) { | ||
| return false; | ||
| } | ||
|
|
||
| FileInfo fileInfo = (FileInfo) other; | ||
| return Objects.equals(content, fileInfo.content) && Objects.equals(path, fileInfo.path); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(content, path); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return MoreObjects.toStringHelper(this).add("content", content).add("path", path).toString(); | ||
| } | ||
| } | ||
|
|
||
| private Set<String> statsFileLocations(TableMetadata tableMetadata) { | ||
| Set<String> statsFileLocations = Sets.newHashSet(); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.function.Consumer; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.exceptions.RuntimeIOException; | ||
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.io.FileIO; | ||
|
|
@@ -48,18 +49,19 @@ class IncrementalFileCleanup extends FileCleanupStrategy { | |
|
|
||
| @Override | ||
| @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) | ||
| public void cleanFiles( | ||
| public DeleteSummary cleanFiles( | ||
| TableMetadata beforeExpiration, | ||
| TableMetadata afterExpiration, | ||
| ExpireSnapshots.CleanupLevel cleanupLevel) { | ||
| DeleteSummary summary = new DeleteSummary(); | ||
| // clean up required underlying files based on the expired snapshots | ||
| // 1. Get a list of the snapshots that were removed | ||
| // 2. Delete any data files that were deleted by those snapshots and are not in the table | ||
| // 3. Delete any manifests that are no longer used by current snapshots | ||
| // 4. Delete the manifest lists | ||
| if (ExpireSnapshots.CleanupLevel.NONE == cleanupLevel) { | ||
| LOG.info("Nothing to clean."); | ||
| return; | ||
| return summary; | ||
| } | ||
|
|
||
| Set<Long> validIds = Sets.newHashSet(); | ||
|
|
@@ -79,12 +81,12 @@ public void cleanFiles( | |
|
|
||
| if (expiredIds.isEmpty()) { | ||
| // if no snapshots were expired, skip cleanup | ||
| return; | ||
| return summary; | ||
| } | ||
|
|
||
| Snapshot latest = beforeExpiration.currentSnapshot(); | ||
| if (latest == null) { | ||
| return; | ||
| return summary; | ||
| } | ||
|
|
||
| List<Snapshot> snapshots = afterExpiration.snapshots(); | ||
|
|
@@ -259,32 +261,44 @@ public void cleanFiles( | |
| }); | ||
|
|
||
| if (ExpireSnapshots.CleanupLevel.ALL == cleanupLevel) { | ||
| Set<String> filesToDelete = | ||
| Set<FileInfo> filesToDelete = | ||
| findFilesToDelete( | ||
| manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById()); | ||
| LOG.debug("Deleting {} data files", filesToDelete.size()); | ||
| deleteFiles(filesToDelete, "data"); | ||
| Map<FileContent, Set<String>> groupedFilesToDelete = | ||
| filesToDelete.stream() | ||
| .collect( | ||
| Collectors.groupingBy( | ||
| FileInfo::getContent, | ||
| Collectors.mapping(FileInfo::getPath, Collectors.toSet()))); | ||
|
|
||
| for (Map.Entry<FileContent, Set<String>> entry : groupedFilesToDelete.entrySet()) { | ||
|
Comment on lines
+267
to
+274
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why group now? It seems orthogonal to collecting the summary. |
||
| Set<String> filesToDeleteGroup = entry.getValue(); | ||
| String fileType = entry.getKey().name(); | ||
| LOG.debug("Deleting {} {} files", filesToDeleteGroup.size(), fileType); | ||
| deleteFiles(filesToDeleteGroup, fileType, summary); | ||
| } | ||
| } | ||
|
|
||
| LOG.debug("Deleting {} manifest files", manifestsToDelete.size()); | ||
| deleteFiles(manifestsToDelete, "manifest"); | ||
| deleteFiles(manifestsToDelete, MANIFEST, summary); | ||
| LOG.debug("Deleting {} manifest-list files", manifestListsToDelete.size()); | ||
| deleteFiles(manifestListsToDelete, "manifest list"); | ||
| deleteFiles(manifestListsToDelete, MANIFEST_LIST, summary); | ||
|
|
||
| if (hasAnyStatisticsFiles(beforeExpiration)) { | ||
| Set<String> expiredStatisticsFilesLocations = | ||
| expiredStatisticsFilesLocations(beforeExpiration, afterExpiration); | ||
| LOG.debug("Deleting {} statistics files", expiredStatisticsFilesLocations.size()); | ||
| deleteFiles(expiredStatisticsFilesLocations, "statistics files"); | ||
| deleteFiles(expiredStatisticsFilesLocations, STATISTICS_FILES, summary); | ||
| } | ||
| return summary; | ||
| } | ||
|
|
||
| private Set<String> findFilesToDelete( | ||
| private Set<FileInfo> findFilesToDelete( | ||
| Set<ManifestFile> manifestsToScan, | ||
| Set<ManifestFile> manifestsToRevert, | ||
| Set<Long> validIds, | ||
| Map<Integer, PartitionSpec> specsById) { | ||
| Set<String> filesToDelete = ConcurrentHashMap.newKeySet(); | ||
| Set<FileInfo> filesToDelete = ConcurrentHashMap.newKeySet(); | ||
| Tasks.foreach(manifestsToScan) | ||
| .retry(3) | ||
| .suppressFailureWhenFinished() | ||
|
|
@@ -295,14 +309,16 @@ private Set<String> findFilesToDelete( | |
| .run( | ||
| manifest -> { | ||
| // the manifest has deletes, scan it to find files to delete | ||
| try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) { | ||
| for (ManifestEntry<?> entry : reader.entries()) { | ||
| try (ManifestReader<? extends ContentFile<?>> reader = | ||
| ManifestFiles.open(manifest, fileIO, specsById)) { | ||
| for (ManifestEntry<? extends ContentFile<?>> entry : reader.entries()) { | ||
| // if the snapshot ID of the DELETE entry is no longer valid, the data can be | ||
| // deleted | ||
| if (entry.status() == ManifestEntry.Status.DELETED | ||
| && !validIds.contains(entry.snapshotId())) { | ||
| // use toString to ensure the path will not change (Utf8 is reused) | ||
| filesToDelete.add(entry.file().location()); | ||
| ContentFile<?> file = entry.file(); | ||
| filesToDelete.add(new FileInfo(file.content(), file.location())); | ||
| } | ||
| } | ||
| } catch (IOException e) { | ||
|
|
@@ -320,12 +336,14 @@ private Set<String> findFilesToDelete( | |
| .run( | ||
| manifest -> { | ||
| // the manifest has deletes, scan it to find files to delete | ||
| try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) { | ||
| for (ManifestEntry<?> entry : reader.entries()) { | ||
| try (ManifestReader<? extends ContentFile<?>> reader = | ||
| ManifestFiles.open(manifest, fileIO, specsById)) { | ||
| for (ManifestEntry<? extends ContentFile<?>> entry : reader.entries()) { | ||
| // delete any ADDED file from manifests that were reverted | ||
| if (entry.status() == ManifestEntry.Status.ADDED) { | ||
| // use toString to ensure the path will not change (Utf8 is reused) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Pre-existing. This is obsolete. Would be nice to remove. (The same applies to the identical comment above.) |
||
| filesToDelete.add(entry.file().location()); | ||
| ContentFile<?> file = entry.file(); | ||
| filesToDelete.add(new FileInfo(file.content(), file.location())); | ||
| } | ||
| } | ||
| } catch (IOException e) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See https://github.com/apache/iceberg/pull/14921/files#r2668035317 on how to avoid breaking this API