From c5cdc09fade55613c0b3eadf4b7100a8c31d7e75 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 5 Jan 2026 17:53:20 +0800 Subject: [PATCH 1/4] feat: support expire snapshots --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/table.cc | 8 + src/iceberg/table.h | 4 + src/iceberg/table_metadata.cc | 94 +++++- src/iceberg/table_update.cc | 8 +- src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/expire_snapshots_test.cc | 94 ++++++ .../test/table_metadata_builder_test.cc | 147 ++++++++- src/iceberg/transaction.cc | 22 ++ src/iceberg/transaction.h | 4 + src/iceberg/type_fwd.h | 1 + src/iceberg/update/expire_snapshots.cc | 297 ++++++++++++++++++ src/iceberg/update/expire_snapshots.h | 175 +++++++++++ src/iceberg/update/pending_update.h | 1 + 15 files changed, 847 insertions(+), 11 deletions(-) create mode 100644 src/iceberg/test/expire_snapshots_test.cc create mode 100644 src/iceberg/update/expire_snapshots.cc create mode 100644 src/iceberg/update/expire_snapshots.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 2ecd652f7..fbec4e565 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -80,6 +80,7 @@ set(ICEBERG_SOURCES transform.cc transform_function.cc type.cc + update/expire_snapshots.cc update/pending_update.cc update/update_partition_spec.cc update/update_properties.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 78ebd604c..557f053e4 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -101,6 +101,7 @@ iceberg_sources = files( 'transform.cc', 'transform_function.cc', 'type.cc', + 'update/expire_snapshots.cc', 'update/pending_update.cc', 'update/update_partition_spec.cc', 'update/update_properties.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index f2e6d3202..1018f0775 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -30,6 +30,7 @@ #include "iceberg/table_properties.h" #include "iceberg/table_scan.h" #include "iceberg/transaction.h" +#include "iceberg/update/expire_snapshots.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" @@ -179,6 +180,13 @@ Result> Table::NewUpdateSchema() { return transaction->NewUpdateSchema(); } +Result> Table::NewExpireSnapshots() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewExpireSnapshots(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 311395856..1fdea2572 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -144,6 +144,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewUpdateSchema(); + /// \brief Create a new ExpireSnapshots to remove expired snapshots and commit the + /// changes. + virtual Result> NewExpireSnapshots(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 851048b30..594d0bd82 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -592,6 +592,12 @@ class TableMetadataBuilder::Impl { Result AddSchema(const Schema& schema, int32_t new_last_column_id); void SetLocation(std::string_view location); + Status SetRef(const std::string& name, std::shared_ptr ref); + Status RemoveRef(const std::string& name); + Status AddSnapshot(std::shared_ptr snapshot); + Status RemoveSnapshots(const std::vector& snapshot_ids); + Status RemovePartitionSpecs(const std::vector& spec_ids); + Result> Build(); private: @@ -1077,6 +1083,79 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId( return new_schema_id; } +Status TableMetadataBuilder::Impl::SetRef(const std::string& name, + std::shared_ptr ref) { + ICEBERG_PRECHECK(!metadata_.refs.contains(name), + "Cannot set ref: {}, which is already exist.", name); + metadata_.refs[name] = ref; + if (ref->type() == SnapshotRefType::kBranch) { + auto retention = std::get(ref->retention); + changes_.push_back(std::make_unique( + name, ref->snapshot_id, ref->type(), retention.min_snapshots_to_keep, + retention.max_snapshot_age_ms, retention.max_ref_age_ms)); + } else { + auto retention = std::get(ref->retention); + changes_.push_back(std::make_unique( + name, ref->snapshot_id, ref->type(), std::nullopt, std::nullopt, + retention.max_ref_age_ms)); + } + return {}; +} + +Status TableMetadataBuilder::Impl::RemoveRef(const std::string& name) { + ICEBERG_PRECHECK(metadata_.refs.contains(name), + "Cannot remove ref: {}, which is not exist.", name); + + metadata_.refs.erase(name); + changes_.push_back(std::make_unique(name)); + + return {}; +} + +Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr snapshot) { + // TODO(xiao.dong) this is only for test, not official complete implementation + metadata_.snapshots.emplace_back(std::move(snapshot)); + return {}; +} + +Status TableMetadataBuilder::Impl::RemoveSnapshots( + const std::vector& snapshot_ids) { + auto current_snapshot_id = metadata_.current_snapshot_id; + std::unordered_set snapshot_ids_set(snapshot_ids.begin(), snapshot_ids.end()); + ICEBERG_PRECHECK(!snapshot_ids_set.contains(current_snapshot_id), + "Cannot remove current snapshot: {}", current_snapshot_id); + + if (!snapshot_ids.empty()) { + metadata_.snapshots = + metadata_.snapshots | std::views::filter([&](const auto& snapshot) { + return !snapshot_ids_set.contains(snapshot->snapshot_id); + }) | + std::ranges::to>>(); + changes_.push_back(std::make_unique(snapshot_ids)); + } + + return {}; +} + +Status TableMetadataBuilder::Impl::RemovePartitionSpecs( + const std::vector& spec_ids) { + auto default_spec_id = metadata_.default_spec_id; + std::unordered_set spec_ids_set(spec_ids.begin(), spec_ids.end()); + ICEBERG_PRECHECK(!spec_ids_set.contains(default_spec_id), + "Cannot remove default spec: {}", default_spec_id); + + if (!spec_ids.empty()) { + metadata_.partition_specs = + metadata_.partition_specs | std::views::filter([&](const auto& spec) { + return !spec_ids_set.contains(spec->spec_id()); + }) | + std::ranges::to>>(); + changes_.push_back(std::make_unique(spec_ids)); + } + + return {}; +} + TableMetadataBuilder::TableMetadataBuilder(int8_t format_version) : impl_(std::make_unique(format_version)) {} @@ -1179,7 +1258,8 @@ TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec( TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs( const std::vector& spec_ids) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionSpecs(spec_ids)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas( @@ -1207,7 +1287,8 @@ TableMetadataBuilder& TableMetadataBuilder::AddSortOrder( TableMetadataBuilder& TableMetadataBuilder::AddSnapshot( std::shared_ptr snapshot) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(std::move(snapshot))); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t snapshot_id, @@ -1217,11 +1298,13 @@ TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t snapshot_i TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name, std::shared_ptr ref) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetRef(name, std::move(ref))); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveRef(name)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots( @@ -1231,7 +1314,8 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots( TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots( const std::vector& snapshot_ids) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveSnapshots(snapshot_ids)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() { diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 38ce0fbc9..804df12e3 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -178,7 +178,7 @@ std::unique_ptr SetDefaultPartitionSpec::Clone() const { // RemovePartitionSpecs void RemovePartitionSpecs::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.RemovePartitionSpecs(spec_ids_); } void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) const { @@ -301,7 +301,9 @@ std::unique_ptr AddSnapshot::Clone() const { // RemoveSnapshots -void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {} +void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const { + builder.RemoveSnapshots(snapshot_ids_); +} void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const { // RemoveSnapshots doesn't generate any requirements @@ -322,7 +324,7 @@ std::unique_ptr RemoveSnapshots::Clone() const { // RemoveSnapshotRef void RemoveSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.RemoveRef(ref_name_); } void RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const { diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 4e86576ed..69d11135c 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -167,6 +167,7 @@ if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(table_update_test USE_BUNDLE SOURCES + expire_snapshots_test.cc transaction_test.cc update_partition_spec_test.cc update_properties_test.cc diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc new file mode 100644 index 000000000..ea8530f13 --- /dev/null +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" + +namespace iceberg { + +class ExpireSnapshotsTest : public UpdateTestBase { + protected: +}; + +TEST_F(ExpireSnapshotsTest, Empty) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1); + EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004); + EXPECT_THAT(result.ref_to_remove.empty(), true); + EXPECT_THAT(result.schema_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_to_remove.empty(), true); +} + +TEST_F(ExpireSnapshotsTest, Keep2) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->RetainLast(2); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_THAT(result.snapshot_ids_to_remove.empty(), true); + EXPECT_THAT(result.ref_to_remove.empty(), true); + EXPECT_THAT(result.schema_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_to_remove.empty(), true); +} + +TEST_F(ExpireSnapshotsTest, ExpireById) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireSnapshotId(3051729675574597004); + update->RetainLast(2); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1); + EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004); + EXPECT_THAT(result.ref_to_remove.empty(), true); + EXPECT_THAT(result.schema_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_to_remove.empty(), true); +} + +TEST_F(ExpireSnapshotsTest, ExpireByIdNotExist) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireSnapshotId(3055729675574597003); + update->RetainLast(2); + auto result = update->Apply(); + EXPECT_THAT(result.has_value(), false); + EXPECT_THAT(result.error().message.contains("Snapshot:3055729675574597003 not exist"), + true); +} + +TEST_F(ExpireSnapshotsTest, ExpireOlderThan1) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireOlderThan(1515100955770 - 1); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_THAT(result.snapshot_ids_to_remove.empty(), true); + EXPECT_THAT(result.ref_to_remove.empty(), true); + EXPECT_THAT(result.schema_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_to_remove.empty(), true); +} + +TEST_F(ExpireSnapshotsTest, ExpireOlderThan2) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireOlderThan(1515100955770 + 1); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1); + EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004); + EXPECT_THAT(result.ref_to_remove.empty(), true); + EXPECT_THAT(result.schema_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_to_remove.empty(), true); +} + +} // namespace iceberg diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc index a0fbe3be7..85fb1dd0f 100644 --- a/src/iceberg/test/table_metadata_builder_test.cc +++ b/src/iceberg/test/table_metadata_builder_test.cc @@ -61,7 +61,8 @@ Result> CreateDisorderedSchema() { } // Helper function to create base metadata for tests -std::unique_ptr CreateBaseMetadata() { +std::unique_ptr CreateBaseMetadata( + std::shared_ptr spec = nullptr) { auto metadata = std::make_unique(); metadata->format_version = 2; metadata->table_uuid = "test-uuid-1234"; @@ -71,8 +72,13 @@ std::unique_ptr CreateBaseMetadata() { metadata->last_column_id = 3; metadata->current_schema_id = 0; metadata->schemas.push_back(CreateTestSchema()); - metadata->partition_specs.push_back(PartitionSpec::Unpartitioned()); - metadata->default_spec_id = PartitionSpec::kInitialSpecId; + if (spec == nullptr) { + metadata->partition_specs.push_back(PartitionSpec::Unpartitioned()); + metadata->default_spec_id = PartitionSpec::kInitialSpecId; + } else { + metadata->default_spec_id = spec->spec_id(); + metadata->partition_specs.push_back(std::move(spec)); + } metadata->last_partition_id = 0; metadata->current_snapshot_id = kInvalidSnapshotId; metadata->default_sort_order_id = SortOrder::kUnsortedOrderId; @@ -1127,4 +1133,139 @@ TEST(TableMetadataBuilderTest, RemoveSchemasAfterSchemaChange) { ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema: 1")); } +// Test RemoveSnapshotRef +TEST(TableMetadataBuilderTest, RemoveSnapshotRefBasic) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add multiple refs + auto ref1 = std::make_shared(); + ref1->snapshot_id = 1; + ref1->retention = SnapshotRef::Branch{}; + builder->SetRef("ref1", ref1); + auto ref2 = std::make_shared(); + ref2->snapshot_id = 2; + ref2->retention = SnapshotRef::Tag{}; + builder->SetRef("ref2", ref2); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->refs.size(), 2); + + // Remove one ref + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveRef("ref2"); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->refs.size(), 1); + EXPECT_TRUE(metadata->refs.contains("ref1")); +} + +// Test RemoveSnapshot +TEST(TableMetadataBuilderTest, RemoveSnapshotBasic) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add multiple snapshots + auto snapshot1 = std::make_shared(); + snapshot1->snapshot_id = 1; + builder->AddSnapshot(snapshot1); + auto snapshot2 = std::make_shared(); + snapshot2->snapshot_id = 2; + builder->AddSnapshot(snapshot2); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->snapshots.size(), 2); + + // Remove one snapshot + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSnapshots({snapshot2->snapshot_id}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->snapshots.size(), 1); + EXPECT_TRUE( + std::ranges::find_if(metadata->snapshots, [&](const std::shared_ptr& s) { + return s->snapshot_id == snapshot1->snapshot_id; + }) != metadata->snapshots.end()); +} + +TEST(TableMetadataBuilderTest, RemoveSnapshotNotExist) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add multiple snapshots + auto snapshot1 = std::make_shared(); + snapshot1->snapshot_id = 1; + builder->AddSnapshot(snapshot1); + auto snapshot2 = std::make_shared(); + snapshot2->snapshot_id = 2; + builder->AddSnapshot(snapshot2); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->snapshots.size(), 2); + + // Remove one snapshot + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSnapshots({3}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->snapshots.size(), 2); + + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSnapshots({1, 2}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->snapshots.size(), 0); +} + +// Test RemovePartitionSpec +TEST(TableMetadataBuilderTest, RemovePartitionSpecBasic) { + // Add multiple specs + PartitionField field1(2, 4, "field1", Transform::Identity()); + PartitionField field2(3, 5, "field2", Transform::Identity()); + ICEBERG_UNWRAP_OR_FAIL(auto spec1, PartitionSpec::Make(1, {field1})); + + auto base = CreateBaseMetadata(std::move(spec1)); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + ICEBERG_UNWRAP_OR_FAIL(auto spec2, PartitionSpec::Make(2, {field1, field2})); + builder->AddPartitionSpec(std::move(spec2)); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->partition_specs.size(), 2); + + // Remove one spec + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemovePartitionSpecs({2}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->partition_specs.size(), 1); + EXPECT_TRUE(std::ranges::find_if(metadata->partition_specs, + [&](const std::shared_ptr& s) { + return s->spec_id() == 1; + }) != metadata->partition_specs.end()); +} + +TEST(TableMetadataBuilderTest, RemovePartitionSpecNotExist) { + // Add multiple specs + PartitionField field1(2, 4, "field1", Transform::Identity()); + PartitionField field2(3, 5, "field2", Transform::Identity()); + ICEBERG_UNWRAP_OR_FAIL(auto spec1, PartitionSpec::Make(1, {field1})); + + auto base = CreateBaseMetadata(std::move(spec1)); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + ICEBERG_UNWRAP_OR_FAIL(auto spec2, PartitionSpec::Make(2, {field1, field2})); + builder->AddPartitionSpec(std::move(spec2)); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->partition_specs.size(), 2); + + // Remove one not exist spec + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemovePartitionSpecs({3}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->partition_specs.size(), 2); + + // Remove all + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemovePartitionSpecs({2, 3}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->partition_specs.size(), 1); +} + } // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 6641a1afd..8792cb6f1 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -28,6 +28,7 @@ #include "iceberg/table_requirement.h" #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" +#include "iceberg/update/expire_snapshots.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" @@ -113,6 +114,20 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->SetCurrentSchema(std::move(result.schema), result.new_last_column_id); } break; + case PendingUpdate::Kind::kExpireSnapshots: { + auto& expire_snapshots = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply()); + if (!result.snapshot_ids_to_remove.empty()) { + metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove)); + } + if (!result.partition_spec_to_remove.empty()) { + metadata_builder_->RemovePartitionSpecs( + std::move(result.partition_spec_to_remove)); + } + if (!result.schema_to_remove.empty()) { + metadata_builder_->RemoveSchemas(std::move(result.schema_to_remove)); + } + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -193,4 +208,11 @@ Result> Transaction::NewUpdateSchema() { return update_schema; } +Result> Transaction::NewExpireSnapshots() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr expire_snapshots, + ExpireSnapshots::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(expire_snapshots)); + return expire_snapshots; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index ea918a173..cd0639824 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -72,6 +72,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateSchema(); + /// \brief Create a new ExpireSnapshots to remove expired snapshots and commit the + /// changes. + Result> NewExpireSnapshots(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 2daf39e63..cc1929ca6 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -192,6 +192,7 @@ class UpdatePartitionSpec; class UpdateProperties; class UpdateSchema; class UpdateSortOrder; +class ExpireSnapshots; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc new file mode 100644 index 000000000..5d17a813f --- /dev/null +++ b/src/iceberg/update/expire_snapshots.cc @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include +#include +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result> ExpireSnapshots::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create ExpireSnapshots without a transaction"); + return std::shared_ptr(new ExpireSnapshots(std::move(transaction))); +} + +ExpireSnapshots::ExpireSnapshots( + [[maybe_unused]] std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)), + default_min_num_snapshots_( + transaction_->current().properties.Get(TableProperties::kMinSnapshotsToKeep)), + default_max_ref_age_ms_( + transaction_->current().properties.Get(TableProperties::kMaxRefAgeMs)), + current_time_ms_(std::chrono::time_point_cast( + std::chrono::system_clock::now())) { + auto max_snapshot_age_ms = + transaction_->current().properties.Get(TableProperties::kMaxSnapshotAgeMs); + default_expire_older_than_ = + current_time_ms_.time_since_epoch().count() - max_snapshot_age_ms; +} + +ExpireSnapshots::~ExpireSnapshots() = default; + +ExpireSnapshots& ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) { + const auto& current_metadata = transaction_->current(); + auto iter = std::ranges::find_if(current_metadata.snapshots, + [&](const std::shared_ptr& snapshot) { + return snapshot->snapshot_id == snapshot_id; + }); + + ICEBERG_BUILDER_CHECK(iter != current_metadata.snapshots.end(), + "Snapshot:{} not exist.", snapshot_id); + snapshot_ids_to_expire_.push_back(snapshot_id); + return *this; +} + +ExpireSnapshots& ExpireSnapshots::ExpireOlderThan(int64_t timestamp_millis) { + ICEBERG_BUILDER_CHECK(timestamp_millis > 0, "Timestamp must be positive: {}", + timestamp_millis); + default_expire_older_than_ = timestamp_millis; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::RetainLast(int num_snapshots) { + ICEBERG_BUILDER_CHECK(num_snapshots > 0, + "Number of snapshots to retain must be positive: {}", + num_snapshots); + default_min_num_snapshots_ = num_snapshots; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::DeleteWith( + std::function delete_func) { + ICEBERG_BUILDER_CHECK(delete_func != nullptr, "Delete function cannot be null"); + delete_func_ = std::move(delete_func); + return *this; +} + +ExpireSnapshots& ExpireSnapshots::CleanupLevel(enum CleanupLevel level) { + cleanup_level_ = level; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::CleanExpiredMetadata(bool clean) { + clean_expired_metadata_ = clean; + return *this; +} + +Result> ExpireSnapshots::ComputeBranchSnapshotsToRetain( + const Table& table, int64_t snapshot, int64_t expire_snapshot_older_than, + int32_t min_snapshots_to_keep) { + std::vector snapshot_ids_to_retain; + ICEBERG_ASSIGN_OR_RAISE(auto snapshots, SnapshotUtil::AncestorsOf(table, snapshot)); + for (const auto& ancestor : snapshots) { + if (snapshot_ids_to_retain.size() < min_snapshots_to_keep || + ancestor->timestamp_ms.time_since_epoch().count() > expire_snapshot_older_than) { + snapshot_ids_to_retain.emplace_back(ancestor->snapshot_id); + } else { + break; + } + } + return snapshot_ids_to_retain; +} + +Result> ExpireSnapshots::ComputeAllBranchSnapshotIds( + const Table& table, const SnapshotToRef& retained_refs) { + std::vector snapshot_ids_to_retain; + for (const auto& [key, ref] : retained_refs) { + if (ref->type() == SnapshotRefType::kBranch) { + const auto& branch = std::get(ref->retention); + int64_t expire_snapshot_older_than = + branch.max_snapshot_age_ms.has_value() + ? current_time_ms_.time_since_epoch().count() - + branch.max_snapshot_age_ms.value() + : default_expire_older_than_; + + int32_t min_snapshots_to_keep = branch.min_snapshots_to_keep.has_value() + ? branch.min_snapshots_to_keep.value() + : default_min_num_snapshots_; + + ICEBERG_ASSIGN_OR_RAISE(auto to_retain, + ComputeBranchSnapshotsToRetain(table, ref->snapshot_id, + expire_snapshot_older_than, + min_snapshots_to_keep)); + snapshot_ids_to_retain.insert(snapshot_ids_to_retain.end(), to_retain.begin(), + to_retain.end()); + } + } + return snapshot_ids_to_retain; +} + +Result> ExpireSnapshots::UnreferencedSnapshotIds( + const Table& table, const TableMetadata& current_metadata, + const SnapshotToRef& retained_refs) { + std::unordered_set referenced_snapshot_ids; + std::vector snapshot_ids_to_retain; + for (const auto& [key, ref] : retained_refs) { + if (ref->type() == SnapshotRefType::kBranch) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshots, + SnapshotUtil::AncestorsOf(table, ref->snapshot_id)); + for (const auto& snapshot : snapshots) { + referenced_snapshot_ids.insert(snapshot->snapshot_id); + } + } else { + referenced_snapshot_ids.insert(ref->snapshot_id); + } + } + + for (const auto& snapshot : current_metadata.snapshots) { + if (!referenced_snapshot_ids.contains(snapshot->snapshot_id) && + // unreferenced, not old enough to expire + snapshot->timestamp_ms.time_since_epoch().count() > default_expire_older_than_) { + snapshot_ids_to_retain.emplace_back(snapshot->snapshot_id); + } + } + return snapshot_ids_to_retain; +} + +Result ExpireSnapshots::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + const TableMetadata& current_metadata = transaction_->current(); + // attempt to clean expired metadata even if there are no snapshots to expire + // table metadata builder takes care of the case when this should actually be a no-op + if (current_metadata.snapshots.empty() && !clean_expired_metadata_) { + return {}; + } + std::unordered_set retained_snapshot_ids; + SnapshotToRef retained_refs; + std::unordered_map> retained_id_to_ref; + + for (const auto& [key, ref] : current_metadata.refs) { + std::cout << "handle ref:" << key << " snapshot:" << ref->snapshot_id << std::endl; + if (key == SnapshotRef::kMainBranch) { + std::cout << "retain main ref:" << key << " snapshot:" << ref->snapshot_id + << std::endl; + retained_refs[key] = ref; + continue; + } + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, + current_metadata.SnapshotById(ref->snapshot_id)); + auto max_ref_ags = ref->type() == SnapshotRefType::kBranch + ? std::get(ref->retention).max_ref_age_ms + : std::get(ref->retention).max_ref_age_ms; + auto max_ref_ags_ms = + max_ref_ags.has_value() ? max_ref_ags.value() : default_max_ref_age_ms_; + std::cout << "max_ref_ags_ms:" << max_ref_ags_ms << std::endl; + if (snapshot != nullptr) { + auto ref_age_ms = (current_time_ms_ - snapshot->timestamp_ms).count(); + std::cout << "ref_age_ms:" << max_ref_ags_ms << std::endl; + if (ref_age_ms <= max_ref_ags_ms) { + retained_refs[key] = ref; + } + } else { + // Invalid reference, remove it + } + } + for (const auto& [key, ref] : retained_refs) { + retained_id_to_ref[ref->snapshot_id].emplace_back(key); + retained_snapshot_ids.insert(ref->snapshot_id); + std::cout << "retained_snapshot_ids:" << ref->snapshot_id << std::endl; + } + + for (auto id : snapshot_ids_to_expire_) { + if (retained_id_to_ref.contains(id)) { + return Invalid("Cannot expire {}. Still referenced by refs.", id); + } + } + ICEBERG_ASSIGN_OR_RAISE( + auto all_branch_snapshot_ids, + ComputeAllBranchSnapshotIds(*transaction_->table(), retained_refs)); + + ICEBERG_ASSIGN_OR_RAISE( + auto unreferenced_snapshot_ids, + UnreferencedSnapshotIds(*transaction_->table(), current_metadata, retained_refs)); + + retained_snapshot_ids.insert(all_branch_snapshot_ids.begin(), + all_branch_snapshot_ids.end()); + retained_snapshot_ids.insert(unreferenced_snapshot_ids.begin(), + unreferenced_snapshot_ids.end()); + + std::vector specs_to_remove; + std::unordered_set schemas_to_remove; + if (clean_expired_metadata_) { + // TODO(xiao.dong) parallel processing + std::unordered_set reachable_specs; + std::unordered_set reachable_schemas; + reachable_specs.insert(current_metadata.default_spec_id); + reachable_schemas.insert(current_metadata.current_schema_id); + + for (const auto& snapshot_id : retained_snapshot_ids) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, current_metadata.SnapshotById(snapshot_id)); + auto file_io = transaction_->table()->io(); + auto snapshot_cache = std::make_unique(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_list, snapshot_cache->Manifests(file_io)); + for (const auto& manifest : manifest_list) { + reachable_specs.insert(manifest.partition_spec_id); + } + if (snapshot->schema_id.has_value()) { + reachable_schemas.insert(snapshot->schema_id.value()); + } + } + + for (const auto& spec : current_metadata.partition_specs) { + if (!reachable_specs.contains(spec->spec_id())) { + specs_to_remove.emplace_back(spec->spec_id()); + } + } + for (const auto& schema : current_metadata.schemas) { + if (!reachable_schemas.contains(schema->schema_id())) { + schemas_to_remove.insert(schema->schema_id()); + } + } + } + SnapshotToRef refs_to_remove; + for (const auto& [key, ref] : current_metadata.refs) { + if (!retained_refs.contains(key)) { + refs_to_remove[key] = ref; + } + } + for (const auto& snapshot : current_metadata.snapshots) { + if (!retained_snapshot_ids.contains(snapshot->snapshot_id)) { + snapshot_ids_to_expire_.emplace_back(snapshot->snapshot_id); + } + } + return ExpireSnapshotsResult{.ref_to_remove = std::move(refs_to_remove), + .snapshot_ids_to_remove = snapshot_ids_to_expire_, + .partition_spec_to_remove = std::move(specs_to_remove), + .schema_to_remove = std::move(schemas_to_remove)}; +} + +Status ExpireSnapshots::Commit() { + ICEBERG_RETURN_UNEXPECTED(PendingUpdate::Commit()); + // TODO(xiao.dong) implements of FileCleanupStrategy + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h new file mode 100644 index 000000000..e11fb1ca3 --- /dev/null +++ b/src/iceberg/update/expire_snapshots.h @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" +#include "iceberg/util/timepoint.h" + +/// \file iceberg/update/expire_snapshots.h +/// \brief API for removing old snapshots from a table. + +namespace iceberg { + +/// \brief An enum representing possible clean up levels used in snapshot expiration. +enum class CleanupLevel : uint8_t { + /// Skip all file cleanup, only remove snapshot metadata. + kNone, + /// Clean up only metadata files (manifests, manifest lists, statistics), retain data + /// files. + kMetadataOnly, + /// Clean up both metadata and data files (default). + kAll +}; + +/// \brief API for removing old snapshots from a table. +/// +/// This API accumulates snapshot deletions and commits the new list to the table. This +/// API does not allow deleting the current snapshot. +/// +/// When committing, these changes will be applied to the latest table metadata. Commit +/// conflicts will be resolved by applying the changes to the new latest metadata and +/// reattempting the commit. +/// +/// Manifest files that are no longer used by valid snapshots will be deleted. Data files +/// that were deleted by snapshots that are expired will be deleted. DeleteWith() can be +/// used to pass an alternative deletion method. +/// +/// Apply() returns a list of the snapshots that will be removed. +class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~ExpireSnapshots() override; + + using SnapshotToRef = std::unordered_map>; + + struct ExpireSnapshotsResult { + SnapshotToRef ref_to_remove; + std::vector snapshot_ids_to_remove; + std::vector partition_spec_to_remove; + std::unordered_set schema_to_remove; + }; + + /// \brief Expires a specific Snapshot identified by id. + /// + /// \param snapshot_id Long id of the snapshot to expire. + /// \return Reference to this for method chaining. + ExpireSnapshots& ExpireSnapshotId(int64_t snapshot_id); + + /// \brief Expires all snapshots older than the given timestamp. + /// + /// \param timestamp_millis A long timestamp in milliseconds. + /// \return Reference to this for method chaining. + ExpireSnapshots& ExpireOlderThan(int64_t timestamp_millis); + + /// \brief Retains the most recent ancestors of the current snapshot. + /// + /// If a snapshot would be expired because it is older than the expiration timestamp, + /// but is one of the num_snapshots most recent ancestors of the current state, it will + /// be retained. This will not cause snapshots explicitly identified by id from + /// expiring. + /// + /// This may keep more than num_snapshots ancestors if snapshots are added concurrently. + /// This may keep less than num_snapshots ancestors if the current table state does not + /// have that many. + /// + /// \param num_snapshots The number of snapshots to retain. + /// \return Reference to this for method chaining. + ExpireSnapshots& RetainLast(int num_snapshots); + + /// \brief Passes an alternative delete implementation that will be used for manifests + /// and data files. + /// + /// Manifest files that are no longer used by valid snapshots will be deleted. Data + /// files that were deleted by snapshots that are expired will be deleted. + /// + /// If this method is not called, unnecessary manifests and data files will still be + /// deleted. + /// + /// \param delete_func A function that will be called to delete manifests and data files + /// \return Reference to this for method chaining. + ExpireSnapshots& DeleteWith(std::function delete_func); + + /// \brief Configures the cleanup level for expired files. + /// + /// This method provides fine-grained control over which files are cleaned up during + /// snapshot expiration. + /// + /// Consider CleanupLevel::kMetadataOnly when data files are shared across tables or + /// when using procedures like add-files that may reference the same data files. + /// + /// Consider CleanupLevel::kNone when data and metadata files may be more efficiently + /// removed using a distributed framework through the actions API. + /// + /// \param level The cleanup level to use for expired snapshots. + /// \return Reference to this for method chaining. + ExpireSnapshots& CleanupLevel(enum CleanupLevel level); + + /// \brief Enable cleaning up unused metadata, such as partition specs, schemas, etc. + /// + /// \param clean Remove unused partition specs, schemas, or other metadata when true. + /// \return Reference to this for method chaining. + ExpireSnapshots& CleanExpiredMetadata(bool clean); + + Kind kind() const final { return Kind::kExpireSnapshots; } + + /// \brief Apply the pending changes and return the results + /// \return The results of changes + Result Apply(); + + Status Commit() override; + + private: + explicit ExpireSnapshots([[maybe_unused]] std::shared_ptr transaction); + + Result> ComputeBranchSnapshotsToRetain( + const Table& table, int64_t snapshot, int64_t expire_snapshot_older_than, + int32_t min_snapshots_to_keep); + + Result> ComputeAllBranchSnapshotIds( + const Table& table, const SnapshotToRef& retained_refs); + + Result> UnreferencedSnapshotIds( + const Table& table, const TableMetadata& current_metadata, + const SnapshotToRef& retained_refs); + + private: + // Internal state + int32_t default_min_num_snapshots_; + int64_t default_max_ref_age_ms_; + int64_t default_expire_older_than_; + TimePointMs current_time_ms_; + std::vector snapshot_ids_to_expire_; + std::function delete_func_; + enum CleanupLevel cleanup_level_ { CleanupLevel::kAll }; + bool clean_expired_metadata_{false}; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 90723987c..d4534d819 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -46,6 +46,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { kUpdateProperties, kUpdateSchema, kUpdateSortOrder, + kExpireSnapshots, }; /// \brief Return the kind of this pending update. From 5956aadd2d27d21ccbab45f1bd9f00307cbf7ecb Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 6 Jan 2026 14:28:48 +0800 Subject: [PATCH 2/4] fix build issue --- src/iceberg/test/table_metadata_builder_test.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc index 85fb1dd0f..6ea50183a 100644 --- a/src/iceberg/test/table_metadata_builder_test.cc +++ b/src/iceberg/test/table_metadata_builder_test.cc @@ -1177,7 +1177,8 @@ TEST(TableMetadataBuilderTest, RemoveSnapshotBasic) { // Remove one snapshot builder = TableMetadataBuilder::BuildFrom(metadata.get()); - builder->RemoveSnapshots({snapshot2->snapshot_id}); + std::vector to_remove{snapshot2->snapshot_id}; + builder->RemoveSnapshots(to_remove); ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); ASSERT_EQ(metadata->snapshots.size(), 1); EXPECT_TRUE( @@ -1203,12 +1204,14 @@ TEST(TableMetadataBuilderTest, RemoveSnapshotNotExist) { // Remove one snapshot builder = TableMetadataBuilder::BuildFrom(metadata.get()); - builder->RemoveSnapshots({3}); + std::vector to_remove{3}; + builder->RemoveSnapshots(to_remove); ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); ASSERT_EQ(metadata->snapshots.size(), 2); builder = TableMetadataBuilder::BuildFrom(metadata.get()); - builder->RemoveSnapshots({1, 2}); + to_remove = {1, 2}; + builder->RemoveSnapshots(to_remove); ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); ASSERT_EQ(metadata->snapshots.size(), 0); } From 42d7071cae24b2478d0553c70332558bd513f621 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 7 Jan 2026 12:04:29 +0800 Subject: [PATCH 3/4] fix comments --- src/iceberg/table_metadata.cc | 186 +++++++++++++++--- .../test/table_metadata_builder_test.cc | 11 ++ 2 files changed, 169 insertions(+), 28 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 594d0bd82..5eca4c807 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -1085,9 +1085,51 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId( Status TableMetadataBuilder::Impl::SetRef(const std::string& name, std::shared_ptr ref) { - ICEBERG_PRECHECK(!metadata_.refs.contains(name), - "Cannot set ref: {}, which is already exist.", name); + // Check if the ref already exists and is equal to the new ref + auto existing_ref_it = metadata_.refs.find(name); + if (existing_ref_it != metadata_.refs.end() && *existing_ref_it->second == *ref) { + // No change needed + return {}; + } + + // Validate that the snapshot exists + int64_t snapshot_id = ref->snapshot_id; + auto snapshot_it = + std::ranges::find_if(metadata_.snapshots, [snapshot_id](const auto& snapshot) { + return snapshot != nullptr && snapshot->snapshot_id == snapshot_id; + }); + ICEBERG_PRECHECK(snapshot_it != metadata_.snapshots.end(), + "Cannot set {} to unknown snapshot: {}", name, snapshot_id); + + // Check if this is an added snapshot (in the current set of changes) + bool is_added_snapshot = + std::ranges::any_of(changes_, [snapshot_id](const auto& change) { + return change->kind() == TableUpdate::Kind::kAddSnapshot && + internal::checked_cast(*change) + .snapshot() + ->snapshot_id == snapshot_id; + }); + + if (is_added_snapshot) { + metadata_.last_updated_ms = (*snapshot_it)->timestamp_ms; + } + + // Handle main branch specially + if (name == SnapshotRef::kMainBranch) { + metadata_.current_snapshot_id = ref->snapshot_id; + if (metadata_.last_updated_ms == kInvalidLastUpdatedMs) { + metadata_.last_updated_ms = + TimePointMs{std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch())}; + } + + metadata_.snapshot_log.emplace_back(metadata_.last_updated_ms, ref->snapshot_id); + } + + // Update the refs map metadata_.refs[name] = ref; + + // Record the change if (ref->type() == SnapshotRefType::kBranch) { auto retention = std::get(ref->retention); changes_.push_back(std::make_unique( @@ -1099,39 +1141,116 @@ Status TableMetadataBuilder::Impl::SetRef(const std::string& name, name, ref->snapshot_id, ref->type(), std::nullopt, std::nullopt, retention.max_ref_age_ms)); } + return {}; } Status TableMetadataBuilder::Impl::RemoveRef(const std::string& name) { - ICEBERG_PRECHECK(metadata_.refs.contains(name), - "Cannot remove ref: {}, which is not exist.", name); + // Handle main branch specially + if (name == SnapshotRef::kMainBranch) { + metadata_.current_snapshot_id = kInvalidSnapshotId; + } - metadata_.refs.erase(name); - changes_.push_back(std::make_unique(name)); + // Remove the ref from the map + auto it = metadata_.refs.find(name); + if (it != metadata_.refs.end()) { + metadata_.refs.erase(it); + changes_.push_back(std::make_unique(name)); + } return {}; } Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr snapshot) { - // TODO(xiao.dong) this is only for test, not official complete implementation - metadata_.snapshots.emplace_back(std::move(snapshot)); + if (snapshot == nullptr) { + // No-op + return {}; + } + + // Validate preconditions + ICEBERG_PRECHECK(!metadata_.schemas.empty(), + "Attempting to add a snapshot before a schema is added"); + ICEBERG_PRECHECK(!metadata_.partition_specs.empty(), + "Attempting to add a snapshot before a partition spec is added"); + ICEBERG_PRECHECK(!metadata_.sort_orders.empty(), + "Attempting to add a snapshot before a sort order is added"); + + // Check if snapshot already exists + int64_t snapshot_id = snapshot->snapshot_id; + auto existing_snapshot = + std::ranges::find_if(metadata_.snapshots, [snapshot_id](const auto& s) { + return s != nullptr && s->snapshot_id == snapshot_id; + }); + ICEBERG_PRECHECK(existing_snapshot == metadata_.snapshots.end(), + "Snapshot already exists for id: {}", snapshot_id); + + // Validate sequence number + ICEBERG_PRECHECK( + metadata_.format_version == 1 || + snapshot->sequence_number > metadata_.last_sequence_number || + !snapshot->parent_snapshot_id.has_value(), + "Cannot add snapshot with sequence number {} older than last sequence number {}", + snapshot->sequence_number, metadata_.last_sequence_number); + + // Update metadata + metadata_.last_updated_ms = snapshot->timestamp_ms; + metadata_.last_sequence_number = snapshot->sequence_number; + metadata_.snapshots.push_back(snapshot); + changes_.push_back(std::make_unique(snapshot)); + + // TODO(xiao.dong) Handle row lineage for format version >= 3 return {}; } Status TableMetadataBuilder::Impl::RemoveSnapshots( const std::vector& snapshot_ids) { - auto current_snapshot_id = metadata_.current_snapshot_id; + if (snapshot_ids.empty()) { + return {}; + } + std::unordered_set snapshot_ids_set(snapshot_ids.begin(), snapshot_ids.end()); - ICEBERG_PRECHECK(!snapshot_ids_set.contains(current_snapshot_id), - "Cannot remove current snapshot: {}", current_snapshot_id); - if (!snapshot_ids.empty()) { - metadata_.snapshots = - metadata_.snapshots | std::views::filter([&](const auto& snapshot) { - return !snapshot_ids_set.contains(snapshot->snapshot_id); - }) | - std::ranges::to>>(); - changes_.push_back(std::make_unique(snapshot_ids)); + // Build a map of snapshot IDs for quick lookup + std::unordered_map> snapshots_by_id; + for (const auto& snapshot : metadata_.snapshots) { + if (snapshot) { + snapshots_by_id[snapshot->snapshot_id] = snapshot; + } + } + + // Filter snapshots to retain + std::vector> retained_snapshots; + retained_snapshots.reserve(metadata_.snapshots.size()); + + for (const auto& snapshot : metadata_.snapshots) { + if (!snapshot) continue; + + int64_t snapshot_id = snapshot->snapshot_id; + if (snapshot_ids_set.contains(snapshot_id)) { + // Remove from the map + snapshots_by_id.erase(snapshot_id); + // Record the removal + changes_.push_back( + std::make_unique(std::vector{snapshot_id})); + // Note: Statistics and partition statistics removal would be handled here + // if those features were implemented + } else { + retained_snapshots.push_back(snapshot); + } + } + + metadata_.snapshots = std::move(retained_snapshots); + + // Remove any refs that are no longer valid (dangling refs) + std::vector dangling_refs; + for (const auto& [ref_name, ref] : metadata_.refs) { + if (!snapshots_by_id.contains(ref->snapshot_id)) { + dangling_refs.push_back(ref_name); + } + } + + for (const auto& ref_name : dangling_refs) { + ICEBERG_RETURN_UNEXPECTED(RemoveRef(ref_name)); } return {}; @@ -1139,20 +1258,31 @@ Status TableMetadataBuilder::Impl::RemoveSnapshots( Status TableMetadataBuilder::Impl::RemovePartitionSpecs( const std::vector& spec_ids) { - auto default_spec_id = metadata_.default_spec_id; + if (spec_ids.empty()) { + return {}; + } + std::unordered_set spec_ids_set(spec_ids.begin(), spec_ids.end()); - ICEBERG_PRECHECK(!spec_ids_set.contains(default_spec_id), - "Cannot remove default spec: {}", default_spec_id); - if (!spec_ids.empty()) { - metadata_.partition_specs = - metadata_.partition_specs | std::views::filter([&](const auto& spec) { - return !spec_ids_set.contains(spec->spec_id()); - }) | - std::ranges::to>>(); - changes_.push_back(std::make_unique(spec_ids)); + // Validate that we're not removing the default spec + ICEBERG_PRECHECK(!spec_ids_set.contains(metadata_.default_spec_id), + "Cannot remove the default partition spec"); + + // Filter partition specs to retain + metadata_.partition_specs = + metadata_.partition_specs | std::views::filter([&](const auto& spec) { + return !spec_ids_set.contains(spec->spec_id()); + }) | + std::ranges::to>>(); + + // Update the specs_by_id_ index + for (int32_t spec_id : spec_ids) { + specs_by_id_.erase(spec_id); } + // Record the change + changes_.push_back(std::make_unique(spec_ids)); + return {}; } diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc index 6ea50183a..f1f19a53b 100644 --- a/src/iceberg/test/table_metadata_builder_test.cc +++ b/src/iceberg/test/table_metadata_builder_test.cc @@ -1138,6 +1138,14 @@ TEST(TableMetadataBuilderTest, RemoveSnapshotRefBasic) { auto base = CreateBaseMetadata(); auto builder = TableMetadataBuilder::BuildFrom(base.get()); + // Add multiple snapshots + auto snapshot1 = std::make_shared(); + snapshot1->snapshot_id = 1; + builder->AddSnapshot(snapshot1); + auto snapshot2 = std::make_shared(); + snapshot2->snapshot_id = 2; + builder->AddSnapshot(snapshot2); + // Add multiple refs auto ref1 = std::make_shared(); ref1->snapshot_id = 1; @@ -1181,6 +1189,9 @@ TEST(TableMetadataBuilderTest, RemoveSnapshotBasic) { builder->RemoveSnapshots(to_remove); ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); ASSERT_EQ(metadata->snapshots.size(), 1); + for (const auto& s : metadata->snapshots) { + std::cout << s->snapshot_id << std::endl; + } EXPECT_TRUE( std::ranges::find_if(metadata->snapshots, [&](const std::shared_ptr& s) { return s->snapshot_id == snapshot1->snapshot_id; From bd391a9bc9177a86375ce80f6d78637e797a497d Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 7 Jan 2026 17:41:09 +0800 Subject: [PATCH 4/4] fix comment --- src/iceberg/test/expire_snapshots_test.cc | 20 ++++++++++---------- src/iceberg/transaction.cc | 8 ++++---- src/iceberg/update/expire_snapshots.cc | 4 ++-- src/iceberg/update/expire_snapshots.h | 4 ++-- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc index ea8530f13..8bf93c1e9 100644 --- a/src/iceberg/test/expire_snapshots_test.cc +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -34,8 +34,8 @@ TEST_F(ExpireSnapshotsTest, Empty) { EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1); EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004); EXPECT_THAT(result.ref_to_remove.empty(), true); - EXPECT_THAT(result.schema_to_remove.empty(), true); - EXPECT_THAT(result.partition_spec_to_remove.empty(), true); + EXPECT_THAT(result.schema_ids_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true); } TEST_F(ExpireSnapshotsTest, Keep2) { @@ -44,8 +44,8 @@ TEST_F(ExpireSnapshotsTest, Keep2) { ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); EXPECT_THAT(result.snapshot_ids_to_remove.empty(), true); EXPECT_THAT(result.ref_to_remove.empty(), true); - EXPECT_THAT(result.schema_to_remove.empty(), true); - EXPECT_THAT(result.partition_spec_to_remove.empty(), true); + EXPECT_THAT(result.schema_ids_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true); } TEST_F(ExpireSnapshotsTest, ExpireById) { @@ -56,8 +56,8 @@ TEST_F(ExpireSnapshotsTest, ExpireById) { EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1); EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004); EXPECT_THAT(result.ref_to_remove.empty(), true); - EXPECT_THAT(result.schema_to_remove.empty(), true); - EXPECT_THAT(result.partition_spec_to_remove.empty(), true); + EXPECT_THAT(result.schema_ids_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true); } TEST_F(ExpireSnapshotsTest, ExpireByIdNotExist) { @@ -76,8 +76,8 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan1) { ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); EXPECT_THAT(result.snapshot_ids_to_remove.empty(), true); EXPECT_THAT(result.ref_to_remove.empty(), true); - EXPECT_THAT(result.schema_to_remove.empty(), true); - EXPECT_THAT(result.partition_spec_to_remove.empty(), true); + EXPECT_THAT(result.schema_ids_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true); } TEST_F(ExpireSnapshotsTest, ExpireOlderThan2) { @@ -87,8 +87,8 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan2) { EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1); EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004); EXPECT_THAT(result.ref_to_remove.empty(), true); - EXPECT_THAT(result.schema_to_remove.empty(), true); - EXPECT_THAT(result.partition_spec_to_remove.empty(), true); + EXPECT_THAT(result.schema_ids_to_remove.empty(), true); + EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true); } } // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 8792cb6f1..33cc2d297 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -120,12 +120,12 @@ Status Transaction::Apply(PendingUpdate& update) { if (!result.snapshot_ids_to_remove.empty()) { metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove)); } - if (!result.partition_spec_to_remove.empty()) { + if (!result.partition_spec_ids_to_remove.empty()) { metadata_builder_->RemovePartitionSpecs( - std::move(result.partition_spec_to_remove)); + std::move(result.partition_spec_ids_to_remove)); } - if (!result.schema_to_remove.empty()) { - metadata_builder_->RemoveSchemas(std::move(result.schema_to_remove)); + if (!result.schema_ids_to_remove.empty()) { + metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove)); } } break; default: diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index 5d17a813f..db6101220 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -284,8 +284,8 @@ Result ExpireSnapshots::Apply() { } return ExpireSnapshotsResult{.ref_to_remove = std::move(refs_to_remove), .snapshot_ids_to_remove = snapshot_ids_to_expire_, - .partition_spec_to_remove = std::move(specs_to_remove), - .schema_to_remove = std::move(schemas_to_remove)}; + .partition_spec_ids_to_remove = std::move(specs_to_remove), + .schema_ids_to_remove = std::move(schemas_to_remove)}; } Status ExpireSnapshots::Commit() { diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index e11fb1ca3..8aa9f5b63 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -73,8 +73,8 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { struct ExpireSnapshotsResult { SnapshotToRef ref_to_remove; std::vector snapshot_ids_to_remove; - std::vector partition_spec_to_remove; - std::unordered_set schema_to_remove; + std::vector partition_spec_ids_to_remove; + std::unordered_set schema_ids_to_remove; }; /// \brief Expires a specific Snapshot identified by id.