Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,9 @@ acceptedBreaks:
old: "class org.apache.iceberg.encryption.EncryptingFileIO"
new: "class org.apache.iceberg.encryption.EncryptingFileIO"
justification: "New method for Manifest List reading"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::metricsReporter(org.apache.iceberg.metrics.MetricsReporter)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

justification: "New method for reporting metrics from ExpireSnapshots"
org.apache.iceberg:iceberg-core:
- code: "java.class.noLongerInheritsFromClass"
old: "class org.apache.iceberg.rest.auth.OAuth2Manager"
Expand Down Expand Up @@ -1456,6 +1459,9 @@ acceptedBreaks:
old: "method void org.apache.iceberg.PartitionStats::deletedEntry(org.apache.iceberg.Snapshot)"
new: "method void org.apache.iceberg.PartitionStats::deletedEntry(org.apache.iceberg.Snapshot)"
justification: "Changing deprecated code"
- code: "java.method.removed"
old: "method org.apache.iceberg.io.CloseableIterable<java.lang.String> org.apache.iceberg.ManifestFiles::readPaths(org.apache.iceberg.ManifestFile, org.apache.iceberg.io.FileIO)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this breaks an existing API, which we must avoid

justification: "Replaced with a more generic readColumns method"
org.apache.iceberg:iceberg-data:
- code: "java.class.removed"
old: "class org.apache.iceberg.data.PartitionStatsHandler"
Expand Down
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.iceberg.metrics.MetricsReporter;

/**
* API for removing old {@link Snapshot snapshots} from a table.
Expand Down Expand Up @@ -161,4 +162,10 @@ default ExpireSnapshots cleanExpiredMetadata(boolean clean) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement cleanExpiredMetadata");
}

/** Report metrics about the ExpireSnapshots operation to the provided reporter */
default ExpireSnapshots metricsReporter(MetricsReporter reporter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for SnapshotProducer we defined a protected method so that this doesn't have to be exposed in general APIs:

protected ThisT reportWith(MetricsReporter newReporter) {
this.reporter = newReporter;
return self();
}

We might want to do the same in RemoveSnapshots

throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement metricsReporter");
}
}
141 changes: 138 additions & 3 deletions core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.iceberg;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
Expand All @@ -42,6 +46,9 @@ public void accept(String file) {
};

private static final Logger LOG = LoggerFactory.getLogger(FileCleanupStrategy.class);
protected static final String MANIFEST = "manifest";
protected static final String MANIFEST_LIST = "manifest list";
protected static final String STATISTICS_FILES = "statistics files";
Comment on lines +49 to +51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be an enum?


protected final FileIO fileIO;
protected final ExecutorService planExecutorService;
Expand Down Expand Up @@ -72,7 +79,7 @@ protected FileCleanupStrategy(
* @param afterExpiration table metadata after snapshot expiration
* @param cleanupLevel controls which types of files are eligible for deletion
*/
public abstract void cleanFiles(
public abstract DeleteSummary cleanFiles(
TableMetadata beforeExpiration,
TableMetadata afterExpiration,
ExpireSnapshots.CleanupLevel cleanupLevel);
Expand All @@ -99,8 +106,9 @@ protected CloseableIterable<ManifestFile> readManifests(Snapshot snapshot) {
}
}

protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
protected void deleteFiles(Set<String> pathsToDelete, String fileType, DeleteSummary summary) {
if (deleteFunc == null && fileIO instanceof SupportsBulkOperations) {
int failures = 0;
try {
((SupportsBulkOperations) fileIO).deleteFiles(pathsToDelete);
} catch (BulkDeletionFailureException e) {
Expand All @@ -110,9 +118,11 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
pathsToDelete.size(),
fileType,
e);
failures = e.numberFailedObjects();
} catch (RuntimeException e) {
LOG.warn("Bulk deletion failed", e);
}
summary.deletedFiles(fileType, pathsToDelete.size() - failures);
} else {
Consumer<String> deleteFuncToUse = deleteFunc == null ? defaultDeleteFunc : deleteFunc;

Expand All @@ -124,7 +134,92 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
.suppressFailureWhenFinished()
.onFailure(
(file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
.run(deleteFuncToUse::accept);
.run(
file -> {
deleteFuncToUse.accept(file);
summary.deletedFile(fileType);
});
}
}

static class DeleteSummary {
private final AtomicLong dataFilesCount = new AtomicLong(0L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we already have all of this in the SnapshotSummary when the snapshot is committed?

private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L);
private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
private final AtomicLong manifestsCount = new AtomicLong(0L);
private final AtomicLong manifestListsCount = new AtomicLong(0L);
private final AtomicLong statisticsFilesCount = new AtomicLong(0L);

public void deletedFiles(String type, int numFiles) {
if (FileContent.DATA.name().equalsIgnoreCase(type)) {
dataFilesCount.addAndGet(numFiles);

} else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
positionDeleteFilesCount.addAndGet(numFiles);

} else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
equalityDeleteFilesCount.addAndGet(numFiles);

} else if (MANIFEST.equalsIgnoreCase(type)) {
manifestsCount.addAndGet(numFiles);

} else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
manifestListsCount.addAndGet(numFiles);

} else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
statisticsFilesCount.addAndGet(numFiles);

} else {
throw new ValidationException("Illegal file type: %s", type);
}
}

public void deletedFile(String type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this call deletedFiles(type, 1) to avoid duplicate logic?

if (FileContent.DATA.name().equalsIgnoreCase(type)) {
dataFilesCount.incrementAndGet();

} else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
positionDeleteFilesCount.incrementAndGet();

} else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
equalityDeleteFilesCount.incrementAndGet();

} else if (MANIFEST.equalsIgnoreCase(type)) {
manifestsCount.incrementAndGet();

} else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
manifestListsCount.incrementAndGet();

} else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
statisticsFilesCount.incrementAndGet();

} else {
throw new ValidationException("Illegal file type: %s", type);
}
}

public long dataFilesCount() {
return dataFilesCount.get();
}

public long positionDeleteFilesCount() {
return positionDeleteFilesCount.get();
}

public long equalityDeleteFilesCount() {
return equalityDeleteFilesCount.get();
}

public long manifestsCount() {
return manifestsCount.get();
}

public long manifestListsCount() {
return manifestListsCount.get();
}

public long statisticsFilesCount() {
return statisticsFilesCount.get();
}
}

Expand All @@ -141,6 +236,46 @@ protected Set<String> expiredStatisticsFilesLocations(
return Sets.difference(statsFileLocationsBeforeExpiration, statsFileLocationsAfterExpiration);
}

protected static class FileInfo {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a 3rd FileInfo class. Can this one have a more distinct name?

private final FileContent content;
private final String path;

public FileInfo(FileContent content, String path) {
this.content = content;
this.path = path;
Comment on lines +244 to +245
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this project code style is generally null friendly, we don't need to adhere to this in new code.
I would use Preconditions.checkNotNull here (and then maybe put @Nonnull on the getters)

}

public FileContent getContent() {
return content;
}

public String getPath() {
return path;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
} else if (other == null || getClass() != other.getClass()) {
return false;
}

FileInfo fileInfo = (FileInfo) other;
return Objects.equals(content, fileInfo.content) && Objects.equals(path, fileInfo.path);
}

@Override
public int hashCode() {
return Objects.hash(content, path);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("content", content).add("path", path).toString();
}
}

private Set<String> statsFileLocations(TableMetadata tableMetadata) {
Set<String> statsFileLocations = Sets.newHashSet();

Expand Down
54 changes: 36 additions & 18 deletions core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
Expand All @@ -48,18 +49,19 @@ class IncrementalFileCleanup extends FileCleanupStrategy {

@Override
@SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"})
public void cleanFiles(
public DeleteSummary cleanFiles(
TableMetadata beforeExpiration,
TableMetadata afterExpiration,
ExpireSnapshots.CleanupLevel cleanupLevel) {
DeleteSummary summary = new DeleteSummary();
// clean up required underlying files based on the expired snapshots
// 1. Get a list of the snapshots that were removed
// 2. Delete any data files that were deleted by those snapshots and are not in the table
// 3. Delete any manifests that are no longer used by current snapshots
// 4. Delete the manifest lists
if (ExpireSnapshots.CleanupLevel.NONE == cleanupLevel) {
LOG.info("Nothing to clean.");
return;
return summary;
}

Set<Long> validIds = Sets.newHashSet();
Expand All @@ -79,12 +81,12 @@ public void cleanFiles(

if (expiredIds.isEmpty()) {
// if no snapshots were expired, skip cleanup
return;
return summary;
}

Snapshot latest = beforeExpiration.currentSnapshot();
if (latest == null) {
return;
return summary;
}

List<Snapshot> snapshots = afterExpiration.snapshots();
Expand Down Expand Up @@ -259,32 +261,44 @@ public void cleanFiles(
});

if (ExpireSnapshots.CleanupLevel.ALL == cleanupLevel) {
Set<String> filesToDelete =
Set<FileInfo> filesToDelete =
findFilesToDelete(
manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());
LOG.debug("Deleting {} data files", filesToDelete.size());
deleteFiles(filesToDelete, "data");
Map<FileContent, Set<String>> groupedFilesToDelete =
filesToDelete.stream()
.collect(
Collectors.groupingBy(
FileInfo::getContent,
Collectors.mapping(FileInfo::getPath, Collectors.toSet())));

for (Map.Entry<FileContent, Set<String>> entry : groupedFilesToDelete.entrySet()) {
Comment on lines +267 to +274
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why group now? It seems orthogonal to collecting summary

Set<String> filesToDeleteGroup = entry.getValue();
String fileType = entry.getKey().name();
LOG.debug("Deleting {} {} files", filesToDeleteGroup.size(), fileType);
deleteFiles(filesToDeleteGroup, fileType, summary);
}
}

LOG.debug("Deleting {} manifest files", manifestsToDelete.size());
deleteFiles(manifestsToDelete, "manifest");
deleteFiles(manifestsToDelete, MANIFEST, summary);
LOG.debug("Deleting {} manifest-list files", manifestListsToDelete.size());
deleteFiles(manifestListsToDelete, "manifest list");
deleteFiles(manifestListsToDelete, MANIFEST_LIST, summary);

if (hasAnyStatisticsFiles(beforeExpiration)) {
Set<String> expiredStatisticsFilesLocations =
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
LOG.debug("Deleting {} statistics files", expiredStatisticsFilesLocations.size());
deleteFiles(expiredStatisticsFilesLocations, "statistics files");
deleteFiles(expiredStatisticsFilesLocations, STATISTICS_FILES, summary);
}
return summary;
}

private Set<String> findFilesToDelete(
private Set<FileInfo> findFilesToDelete(
Set<ManifestFile> manifestsToScan,
Set<ManifestFile> manifestsToRevert,
Set<Long> validIds,
Map<Integer, PartitionSpec> specsById) {
Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
Set<FileInfo> filesToDelete = ConcurrentHashMap.newKeySet();
Tasks.foreach(manifestsToScan)
.retry(3)
.suppressFailureWhenFinished()
Expand All @@ -295,14 +309,16 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
try (ManifestReader<? extends ContentFile<?>> reader =
ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<? extends ContentFile<?>> entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be
// deleted
if (entry.status() == ManifestEntry.Status.DELETED
&& !validIds.contains(entry.snapshotId())) {
// use toString to ensure the path will not change (Utf8 is reused)
filesToDelete.add(entry.file().location());
ContentFile<?> file = entry.file();
filesToDelete.add(new FileInfo(file.content(), file.location()));
}
}
} catch (IOException e) {
Expand All @@ -320,12 +336,14 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
try (ManifestReader<? extends ContentFile<?>> reader =
ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<? extends ContentFile<?>> entry : reader.entries()) {
// delete any ADDED file from manifests that were reverted
if (entry.status() == ManifestEntry.Status.ADDED) {
// use toString to ensure the path will not change (Utf8 is reused)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-existing. This is obsolete. Would be nice to remove. (same for same comment above)

filesToDelete.add(entry.file().location());
ContentFile<?> file = entry.file();
filesToDelete.add(new FileInfo(file.content(), file.location()));
}
}
} catch (IOException e) {
Expand Down
Loading