From c03f04372acdd7e22f496b016623265f156ee02b Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sat, 10 Jan 2026 23:26:49 +0800 Subject: [PATCH 1/2] feat: add UpdateSnapshotReference --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/transaction.cc | 27 ++ src/iceberg/transaction.h | 4 + src/iceberg/type_fwd.h | 1 + src/iceberg/update/meson.build | 1 + src/iceberg/update/pending_update.h | 1 + .../update/update_snapshot_reference.cc | 230 ++++++++++++++++++ .../update/update_snapshot_reference.h | 152 ++++++++++++ src/iceberg/util/snapshot_util.cc | 24 +- src/iceberg/util/snapshot_util_internal.h | 20 ++ 11 files changed, 459 insertions(+), 3 deletions(-) create mode 100644 src/iceberg/update/update_snapshot_reference.cc create mode 100644 src/iceberg/update/update_snapshot_reference.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 2ecd652f7..7d5fce53c 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -85,6 +85,7 @@ set(ICEBERG_SOURCES update/update_properties.cc update/update_schema.cc update/update_sort_order.cc + update/update_snapshot_reference.cc util/bucket_util.cc util/content_file_util.cc util/conversions.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 78ebd604c..5ba55ec3d 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -105,6 +105,7 @@ iceberg_sources = files( 'update/update_partition_spec.cc', 'update/update_properties.cc', 'update/update_schema.cc', + 'update/update_snapshot_reference.cc', 'update/update_sort_order.cc', 'util/bucket_util.cc', 'util/content_file_util.cc', diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 6641a1afd..c2d981a48 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -32,6 +32,7 @@ #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" +#include "iceberg/update/update_snapshot_reference.h" #include "iceberg/update/update_sort_order.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" @@ -113,6 +114,24 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->SetCurrentSchema(std::move(result.schema), result.new_last_column_id); } break; + case PendingUpdate::Kind::kUpdateSnapshotReference: { + auto& update_ref = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto updated_refs, update_ref.Apply()); + const auto& current_refs = current().refs; + // Identify references which have been removed + for (const auto& [name, ref] : current_refs) { + if (updated_refs.find(name) == updated_refs.end()) { + metadata_builder_->RemoveRef(name); + } + } + // Identify references which have been created or updated. + for (const auto& [name, ref] : updated_refs) { + auto current_it = current_refs.find(name); + if (current_it == current_refs.end() || *current_it->second != *ref) { + metadata_builder_->SetRef(name, ref); + } + } + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -193,4 +212,12 @@ Result> Transaction::NewUpdateSchema() { return update_schema; } +Result> +Transaction::NewUpdateSnapshotReference() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_ref, + UpdateSnapshotReference::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_ref)); + return update_ref; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index ea918a173..654853b80 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -72,6 +72,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateSchema(); + /// \brief Create a new UpdateSnapshotReference to update snapshot references (branches + /// and tags) and commit the changes. + Result> NewUpdateSnapshotReference(); + private: Transaction(std::shared_ptr table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 2daf39e63..e74117e2f 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -192,6 +192,7 @@ class UpdatePartitionSpec; class UpdateProperties; class UpdateSchema; class UpdateSortOrder; +class UpdateSnapshotReference; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index e4c786f40..701be3ac0 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -20,6 +20,7 @@ install_headers( 'pending_update.h', 'update_partition_spec.h', 'update_schema.h', + 'update_snapshot_reference.h', 'update_sort_order.h', 'update_properties.h', ], diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 90723987c..a79ef9105 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -45,6 +45,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { kUpdatePartitionSpec, kUpdateProperties, kUpdateSchema, + kUpdateSnapshotReference, kUpdateSortOrder, }; diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc new file mode 100644 index 000000000..787e1cf7f --- /dev/null +++ b/src/iceberg/update/update_snapshot_reference.cc @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_snapshot_reference.h" + +#include +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result> UpdateSnapshotReference::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create UpdateSnapshotReference without a transaction"); + return std::shared_ptr( + new UpdateSnapshotReference(std::move(transaction))); +} + +UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) { + // Initialize updated_refs_ with current refs from base metadata + const auto& base_refs = transaction_->current().refs; + for (const auto& [name, ref] : base_refs) { + updated_refs_[name] = ref; + } +} + +UpdateSnapshotReference::~UpdateSnapshotReference() = default; + +UpdateSnapshotReference& UpdateSnapshotReference::CreateBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + // TODO(anyone): use MakeBranch after #408 got merged + auto branch = std::make_shared( + SnapshotRef{.snapshot_id = snapshot_id, .retention = SnapshotRef::Branch{}}); + auto [_, inserted] = updated_refs_.emplace(name, std::move(branch)); + ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' already exists", name); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::CreateTag(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty"); + auto tag = std::make_shared( + SnapshotRef{.snapshot_id = snapshot_id, .retention = SnapshotRef::Tag{}}); + auto [_, inserted] = updated_refs_.emplace(name, std::move(tag)); + ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' already exists", name); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::RemoveBranch(const std::string& name) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot remove main branch"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + updated_refs_.erase(it); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::RemoveTag(const std::string& name) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Tag does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kTag, + "Ref '{}' is a branch not a tag", name); + updated_refs_.erase(it); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::RenameBranch( + const std::string& name, const std::string& new_name) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch to rename cannot be empty"); + ICEBERG_BUILDER_CHECK(!new_name.empty(), "New branch name cannot be empty"); + ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot rename main branch"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + auto [_, inserted] = updated_refs_.emplace(new_name, it->second); + ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' already exists", new_name); + updated_refs_.erase(it); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + it->second->snapshot_id = snapshot_id; + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& from, + const std::string& to) { + return ReplaceBranchInternal(from, to, false); +} + +UpdateSnapshotReference& UpdateSnapshotReference::FastForward(const std::string& from, + const std::string& to) { + return ReplaceBranchInternal(from, to, true); +} + +UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal( + const std::string& from, const std::string& to, bool fast_forward) { + ICEBERG_BUILDER_CHECK(!from.empty(), "Branch to update cannot be empty"); + ICEBERG_BUILDER_CHECK(!to.empty(), "Destination ref cannot be empty"); + auto to_it = updated_refs_.find(to); + ICEBERG_BUILDER_CHECK(to_it != updated_refs_.end(), "Ref does not exist: {}", to); + + auto from_it = updated_refs_.find(from); + if (from_it == updated_refs_.end()) { + // Create branch if it doesn't exist + return CreateBranch(from, to_it->second->snapshot_id); + } + + ICEBERG_BUILDER_CHECK(from_it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", from); + + // Nothing to replace if snapshot IDs are the same + if (to_it->second->snapshot_id == from_it->second->snapshot_id) { + return *this; + } + + if (fast_forward) { + // Validate that current branch snapshot (from) is an ancestor of target snapshot (to) + // This ensures we can fast-forward from 'from' to 'to' + // TODO(anyone): use base() after #408 + const auto& base_metadata = transaction_->current(); + auto lookup = [&base_metadata](int64_t id) -> Result> { + return base_metadata.SnapshotById(id); + }; + ICEBERG_BUILDER_ASSIGN_OR_RETURN( + bool target_is_ancestor, + SnapshotUtil::IsAncestorOf(to_it->second->snapshot_id, + from_it->second->snapshot_id, lookup)); + ICEBERG_BUILDER_CHECK(target_is_ancestor, + "Cannot fast-forward: {} is not an ancestor of {}", from, to); + } + + from_it->second->snapshot_id = to_it->second->snapshot_id; + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Tag does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kTag, + "Ref '{}' is a branch not a tag", name); + it->second->snapshot_id = snapshot_id; + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep( + const std::string& name, int32_t min_snapshots_to_keep) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + // TODO(anyone): use interface added in #408 + std::get(it->second->retention).min_snapshots_to_keep = + min_snapshots_to_keep; + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs( + const std::string& name, int64_t max_snapshot_age_ms) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + std::get(it->second->retention).max_snapshot_age_ms = + max_snapshot_age_ms; + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::string& name, + int64_t max_ref_age_ms) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Ref does not exist: {}", name); + if (it->second->type() == SnapshotRefType::kBranch) { + std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms; + } else { + std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms; + } + return *this; +} + +Result>> +UpdateSnapshotReference::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + return updated_refs_; +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h new file mode 100644 index 000000000..174ff1fd9 --- /dev/null +++ b/src/iceberg/update/update_snapshot_reference.h @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/update_snapshot_reference.h +/// \brief Updates snapshot references (branches and tags). + +namespace iceberg { + +/// \brief Updates snapshot references. +/// +/// TODO(anyone): Add SetSnapshotOperation operations such as setCurrentSnapshot, +/// rollBackTime, rollbackTo to this class so that we can support those operations for +/// refs. +class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~UpdateSnapshotReference() override; + + /// \brief Create a branch reference. + /// + /// \param name The branch name + /// \param snapshot_id The snapshot ID for the branch + /// \return Reference to this for method chaining + UpdateSnapshotReference& CreateBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Create a tag reference. + /// + /// \param name The tag name + /// \param snapshot_id The snapshot ID for the tag + /// \return Reference to this for method chaining + UpdateSnapshotReference& CreateTag(const std::string& name, int64_t snapshot_id); + + /// \brief Remove a branch reference. + /// + /// \param name The branch name to remove + /// \return Reference to this for method chaining + UpdateSnapshotReference& RemoveBranch(const std::string& name); + + /// \brief Remove a tag reference. + /// + /// \param name The tag name to remove + /// \return Reference to this for method chaining + UpdateSnapshotReference& RemoveTag(const std::string& name); + + /// \brief Rename a branch reference. + /// + /// \param name The current branch name + /// \param new_name The new branch name + /// \return Reference to this for method chaining + UpdateSnapshotReference& RenameBranch(const std::string& name, + const std::string& new_name); + + /// \brief Replace a branch reference with a new snapshot ID. + /// + /// \param name The branch name + /// \param snapshot_id The new snapshot ID + /// \return Reference to this for method chaining + UpdateSnapshotReference& ReplaceBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Replace a branch reference with another reference's snapshot ID. + /// + /// \param from The branch name to update + /// \param to The reference name to copy the snapshot ID from + /// \return Reference to this for method chaining + UpdateSnapshotReference& ReplaceBranch(const std::string& from, const std::string& to); + + /// \brief Fast-forward a branch to another reference's snapshot ID. + /// + /// This is similar to ReplaceBranch but validates that the target snapshot is an + /// ancestor of the current branch snapshot. + /// + /// \param from The branch name to update + /// \param to The reference name to copy the snapshot ID from + /// \return Reference to this for method chaining + UpdateSnapshotReference& FastForward(const std::string& from, const std::string& to); + + /// \brief Replace a tag reference with a new snapshot ID. + /// + /// \param name The tag name + /// \param snapshot_id The new snapshot ID + /// \return Reference to this for method chaining + UpdateSnapshotReference& ReplaceTag(const std::string& name, int64_t snapshot_id); + + /// \brief Set the minimum number of snapshots to keep for a branch. + /// + /// \param name The branch name + /// \param min_snapshots_to_keep The minimum number of snapshots to keep + /// \return Reference to this for method chaining + UpdateSnapshotReference& SetMinSnapshotsToKeep(const std::string& name, + int32_t min_snapshots_to_keep); + + /// \brief Set the maximum snapshot age in milliseconds for a branch. + /// + /// \param name The branch name + /// \param max_snapshot_age_ms The maximum snapshot age in milliseconds + /// \return Reference to this for method chaining + UpdateSnapshotReference& SetMaxSnapshotAgeMs(const std::string& name, + int64_t max_snapshot_age_ms); + + /// \brief Set the maximum reference age in milliseconds. + /// + /// \param name The reference name + /// \param max_ref_age_ms The maximum reference age in milliseconds + /// \return Reference to this for method chaining + UpdateSnapshotReference& SetMaxRefAgeMs(const std::string& name, + int64_t max_ref_age_ms); + + Kind kind() const final { return Kind::kUpdateSnapshotReference; } + + /// \brief Apply the pending changes and return the updated references. + Result>> Apply(); + + private: + explicit UpdateSnapshotReference(std::shared_ptr transaction); + + UpdateSnapshotReference& ReplaceBranchInternal(const std::string& from, + const std::string& to, + bool fast_forward); + + std::unordered_map> updated_refs_; +}; + +} // namespace iceberg diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index 2ec36478a..8c71563a3 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -47,9 +47,19 @@ Result>> SnapshotUtil::AncestorsOf( Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id, int64_t ancestor_snapshot_id) { - ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); - return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) { - return snapshot != nullptr && snapshot->snapshot_id == ancestor_snapshot_id; + return table.SnapshotById(snapshot_id) + .and_then([snapshot_id, ancestor_snapshot_id, &table](const auto& snapshot) { + return IsAncestorOf(snapshot_id, ancestor_snapshot_id, + [&table](int64_t id) { return table.SnapshotById(id); }); + }); +} + +Result SnapshotUtil::IsAncestorOf( + int64_t snapshot_id, int64_t ancestor_snapshot_id, + const std::function>(int64_t)>& lookup) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(snapshot_id, lookup)); + return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& ancestor) { + return ancestor != nullptr && ancestor->snapshot_id == ancestor_snapshot_id; }); } @@ -180,6 +190,14 @@ Result>> SnapshotUtil::AncestorsOf( return AncestorsOf(snapshot, [&table](int64_t id) { return table.SnapshotById(id); }); } +Result>> SnapshotUtil::AncestorsOf( + int64_t snapshot_id, + const std::function>(int64_t)>& lookup) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, lookup(snapshot_id)); + ICEBERG_CHECK(snapshot != nullptr, "Snapshot is null for id {}", snapshot_id); + return AncestorsOf(snapshot, lookup); +} + Result>> SnapshotUtil::AncestorsOf( const std::shared_ptr& snapshot, const std::function>(int64_t)>& lookup) { diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index 2b11168ed..a1c06729a 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -52,6 +52,17 @@ class ICEBERG_EXPORT SnapshotUtil { static Result IsAncestorOf(const Table& table, int64_t snapshot_id, int64_t ancestor_snapshot_id); + /// \brief Returns whether ancestor_snapshot_id is an ancestor of snapshot_id using the + /// given lookup function. + /// + /// \param snapshot_id The snapshot ID to check + /// \param ancestor_snapshot_id The ancestor snapshot ID to check for + /// \param lookup Function to lookup snapshots by ID + /// \return true if ancestor_snapshot_id is an ancestor of snapshot_id + static Result IsAncestorOf( + int64_t snapshot_id, int64_t ancestor_snapshot_id, + const std::function>(int64_t)>& lookup); + /// \brief Returns whether ancestor_snapshot_id is an ancestor of the table's current /// state. /// @@ -264,6 +275,15 @@ class ICEBERG_EXPORT SnapshotUtil { static Result>> AncestorsOf( const Table& table, const std::shared_ptr& snapshot); + /// \brief Helper function to traverse ancestors of a snapshot using a lookup function. + /// + /// \param snapshot_id The snapshot ID to start from + /// \param lookup Function to lookup snapshots by ID + /// \return A vector of ancestor snapshots + static Result>> AncestorsOf( + int64_t snapshot_id, + const std::function>(int64_t)>& lookup); + /// \brief Helper function to traverse ancestors of a snapshot using a lookup function. /// /// \param snapshot The snapshot to start from From c7b6980dc17820352628c3f46caa85212c6b7e3a Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sat, 10 Jan 2026 23:54:46 +0800 Subject: [PATCH 2/2] feat: add SnapshotManager --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/table.cc | 5 + src/iceberg/table.h | 3 + src/iceberg/type_fwd.h | 1 + src/iceberg/update/snapshot_manager.cc | 290 +++++++++++++++++++++++++ src/iceberg/update/snapshot_manager.h | 232 ++++++++++++++++++++ 6 files changed, 532 insertions(+) create mode 100644 src/iceberg/update/snapshot_manager.cc create mode 100644 src/iceberg/update/snapshot_manager.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 7d5fce53c..f8cd6416d 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -66,6 +66,7 @@ set(ICEBERG_SOURCES schema_util.cc snapshot.cc sort_field.cc + update/snapshot_manager.cc sort_order.cc statistics_file.cc table.cc diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index f2e6d3202..f4fa8d2e3 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -30,6 +30,7 @@ #include "iceberg/table_properties.h" #include "iceberg/table_scan.h" #include "iceberg/transaction.h" +#include "iceberg/update/snapshot_manager.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" @@ -179,6 +180,10 @@ Result> Table::NewUpdateSchema() { return transaction->NewUpdateSchema(); } +Result> Table::NewSnapshotManager() { + return SnapshotManager::Make(name().ToString(), shared_from_this()); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 311395856..165d6a4bb 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -144,6 +144,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// changes. virtual Result> NewUpdateSchema(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. + virtual Result> NewSnapshotManager(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index e74117e2f..3bde6f3bf 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -182,6 +182,7 @@ class TableProperties; /// \brief Table update. class TableMetadataBuilder; class TableUpdate; +class SnapshotManager; class TableRequirement; class TableUpdateContext; class Transaction; diff --git a/src/iceberg/update/snapshot_manager.cc b/src/iceberg/update/snapshot_manager.cc new file mode 100644 index 000000000..bc79d1594 --- /dev/null +++ b/src/iceberg/update/snapshot_manager.cc @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/snapshot_manager.h" + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/update/update_snapshot_reference.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> SnapshotManager::Make( + const std::string& table_name, std::shared_ptr
table) { + if (table == nullptr) { + return InvalidArgument("Table cannot be null"); + } + if (table->metadata() == nullptr) { + return InvalidArgument("Cannot manage snapshots: table {} does not exist", + table_name); + } + // Create a transaction first + ICEBERG_ASSIGN_OR_RAISE(auto transaction, + Transaction::Make(table, Transaction::Kind::kUpdate, + /*auto_commit=*/false)); + // Create SnapshotManager with the transaction (not external) + auto manager = std::shared_ptr( + new SnapshotManager(std::move(transaction), false)); + return manager; +} + +Result> SnapshotManager::Make( + std::shared_ptr transaction) { + if (transaction == nullptr) { + return InvalidArgument("Invalid input transaction: null"); + } + return std::shared_ptr( + new SnapshotManager(std::move(transaction), true)); +} + +SnapshotManager::SnapshotManager(std::shared_ptr transaction, + bool is_external) + : PendingUpdate(transaction), is_external_transaction_(is_external) {} + +SnapshotManager::~SnapshotManager() = default; + +SnapshotManager& SnapshotManager::Cherrypick(int64_t snapshot_id) { + if (auto status = CommitIfRefUpdatesExist(); !status.has_value()) { + return AddError(status.error()); + } + // TODO(anyone): Implement cherrypick operation + // This should create a new snapshot by applying changes from the given snapshot + // For now, throw NotImplemented + ICEBERG_BUILDER_CHECK(false, "Cherrypick operation not yet implemented"); + return *this; +} + +SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) { + if (auto status = CommitIfRefUpdatesExist(); !status.has_value()) { + return AddError(status.error()); + } + // Verify snapshot exists + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot, + transaction_->current().SnapshotById(snapshot_id)); + // Set the main branch to point to this snapshot + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, + transaction_->NewUpdateSnapshotReference()); + update_ref->ReplaceBranch(std::string(SnapshotRef::kMainBranch), snapshot_id); + if (auto status = update_ref->Commit(); !status.has_value()) { + return AddError(status.error()); + } + return *this; +} + +SnapshotManager& SnapshotManager::RollbackToTime(TimePointMs timestamp_ms) { + if (auto status = CommitIfRefUpdatesExist(); !status.has_value()) { + return AddError(status.error()); + } + // Find the last snapshot before the given timestamp + const auto& snapshots = transaction_->current().snapshots; + std::shared_ptr target_snapshot = nullptr; + for (const auto& snapshot : snapshots) { + if (snapshot != nullptr && snapshot->timestamp_ms < timestamp_ms) { + if (target_snapshot == nullptr || + snapshot->timestamp_ms > target_snapshot->timestamp_ms) { + target_snapshot = snapshot; + } + } + } + ICEBERG_BUILDER_CHECK(target_snapshot != nullptr, + "Table has no old snapshot before timestamp {}", timestamp_ms); + return SetCurrentSnapshot(target_snapshot->snapshot_id); +} + +SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) { + if (auto status = CommitIfRefUpdatesExist(); !status.has_value()) { + return AddError(status.error()); + } + // Verify snapshot exists + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot, + transaction_->current().SnapshotById(snapshot_id)); + // Get current snapshot + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, + transaction_->current().Snapshot()); + // Verify that the target snapshot is an ancestor of the current snapshot + // TODO(anyone): Use SnapshotUtil::IsAncestorOf once we have access to Table + // For now, we'll do a simple check by traversing parent_snapshot_id + int64_t current_id = current_snapshot->snapshot_id; + bool found = false; + while (current_id != -1) { + if (current_id == snapshot_id) { + found = true; + break; + } + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snap, + transaction_->current().SnapshotById(current_id)); + if (current_snap->parent_snapshot_id.has_value()) { + current_id = current_snap->parent_snapshot_id.value(); + } else { + break; + } + } + ICEBERG_BUILDER_CHECK(found, + "Cannot rollback to {}: it is not an ancestor of the current " + "snapshot", + snapshot_id); + return SetCurrentSnapshot(snapshot_id); +} + +SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, + transaction_->current().Snapshot()); + if (current_snapshot != nullptr) { + return CreateBranch(name, current_snapshot->snapshot_id); + } + // Check if branch already exists + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + const auto& current_refs = transaction_->current().refs; + ICEBERG_BUILDER_CHECK(current_refs.find(name) == current_refs.end(), + "Ref {} already exists", name); + // Create an empty snapshot for the branch + // TODO(anyone): Implement creating empty snapshot + // For now, throw NotImplemented + ICEBERG_BUILDER_CHECK(false, "Creating branch with empty snapshot not yet implemented"); + return *this; +} + +SnapshotManager& SnapshotManager::CreateBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->CreateBranch(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::CreateTag(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->CreateTag(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::RemoveBranch(const std::string& name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->RemoveBranch(name); + return *this; +} + +SnapshotManager& SnapshotManager::RemoveTag(const std::string& name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->RemoveTag(name); + return *this; +} + +SnapshotManager& SnapshotManager::ReplaceTag(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->ReplaceTag(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->ReplaceBranch(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& from, + const std::string& to) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->ReplaceBranch(from, to); + return *this; +} + +SnapshotManager& SnapshotManager::FastForwardBranch(const std::string& from, + const std::string& to) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->FastForward(from, to); + return *this; +} + +SnapshotManager& SnapshotManager::RenameBranch(const std::string& name, + const std::string& new_name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->RenameBranch(name, new_name); + return *this; +} + +SnapshotManager& SnapshotManager::SetMinSnapshotsToKeep(const std::string& branch_name, + int32_t min_snapshots_to_keep) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->SetMinSnapshotsToKeep(branch_name, min_snapshots_to_keep); + return *this; +} + +SnapshotManager& SnapshotManager::SetMaxSnapshotAgeMs(const std::string& branch_name, + int64_t max_snapshot_age_ms) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->SetMaxSnapshotAgeMs(branch_name, max_snapshot_age_ms); + return *this; +} + +SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name, + int64_t max_ref_age_ms) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->SetMaxRefAgeMs(name, max_ref_age_ms); + return *this; +} + +Result> SnapshotManager::Apply() { + ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist()); + return transaction_->table()->current_snapshot(); +} + +Status SnapshotManager::Commit() { + ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist()); + if (!is_external_transaction_) { + ICEBERG_ASSIGN_OR_RAISE(auto updated_table, transaction_->Commit()); + // Create a new transaction with the updated table + ICEBERG_ASSIGN_OR_RAISE(transaction_, + Transaction::Make(updated_table, Transaction::Kind::kUpdate, + /*auto_commit=*/false)); + // Note: The base class transaction_ member is protected, so we can't update it + // directly However, since we always use transaction_ through the base class member, + // we need to ensure consistency. For now, we'll rely on the fact that all methods use + // transaction_ from the base class. + } + return {}; +} + +Result> +SnapshotManager::UpdateSnapshotReferencesOperation() { + if (update_snapshot_references_operation_ == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(update_snapshot_references_operation_, + transaction_->NewUpdateSnapshotReference()); + } + return update_snapshot_references_operation_; +} + +Status SnapshotManager::CommitIfRefUpdatesExist() { + if (update_snapshot_references_operation_ != nullptr) { + ICEBERG_RETURN_UNEXPECTED(update_snapshot_references_operation_->Commit()); + update_snapshot_references_operation_ = nullptr; + } + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_manager.h b/src/iceberg/update/snapshot_manager.h new file mode 100644 index 000000000..540db6fa9 --- /dev/null +++ b/src/iceberg/update/snapshot_manager.h @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +/// \brief API for managing snapshots and snapshot references. +/// +/// Allows rolling table data back to a state at an older snapshot, cherry-picking +/// snapshots, and managing branches and tags. +class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { + public: + /// \brief Create a SnapshotManager for a table. + /// + /// \param table_name The name of the table + /// \param table The table to manage snapshots for + /// \return A new SnapshotManager instance, or an error if the table doesn't exist + static Result> Make(const std::string& table_name, + std::shared_ptr
table); + + /// \brief Create a SnapshotManager from an existing transaction. + /// + /// \param transaction The transaction to use + /// \return A new SnapshotManager instance + static Result> Make( + std::shared_ptr transaction); + + ~SnapshotManager() override; + + Kind kind() const final { + // SnapshotManager doesn't map to a specific PendingUpdate::Kind + // It manages multiple types of operations + return Kind::kUpdateSnapshotReference; + } + + /// \brief Apply supported changes in given snapshot and create a new snapshot which + /// will be set as the current snapshot on commit. + /// + /// \param snapshot_id A snapshot ID whose changes to apply + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException If the table has no snapshot with the given id + SnapshotManager& Cherrypick(int64_t snapshot_id); + + /// \brief Roll this table's data back to a specific Snapshot identified by id. + /// + /// \param snapshot_id Long id of the snapshot to roll back table data to + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException If the table has no snapshot with the given id + SnapshotManager& SetCurrentSnapshot(int64_t snapshot_id); + + /// \brief Roll this table's data back to the last Snapshot before the given timestamp. + /// + /// \param timestamp_ms A timestamp in milliseconds + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException If the table has no old snapshot before the given + /// timestamp + SnapshotManager& RollbackToTime(TimePointMs timestamp_ms); + + /// \brief Rollback table's state to a specific Snapshot identified by id. + /// + /// \param snapshot_id Long id of snapshot id to roll back table to. Must be an ancestor + /// of the current snapshot + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException If the table has no snapshot with the given id + /// \throws ValidationException If given snapshot id is not an ancestor of the current + /// state + SnapshotManager& RollbackTo(int64_t snapshot_id); + + /// \brief Create a new branch. The branch will point to current snapshot if the current + /// snapshot is not NULL. Otherwise, the branch will point to a newly created empty + /// snapshot. + /// + /// \param name Branch name + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if a branch with the given name already exists + SnapshotManager& CreateBranch(const std::string& name); + + /// \brief Create a new branch pointing to the given snapshot id. + /// + /// \param name Branch name + /// \param snapshot_id ID of the snapshot which will be the head of the branch + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if a branch with the given name already exists + SnapshotManager& CreateBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Create a new tag pointing to the given snapshot id. + /// + /// \param name Tag name + /// \param snapshot_id Snapshot ID for the head of the new tag + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if a tag with the given name already exists + SnapshotManager& CreateTag(const std::string& name, int64_t snapshot_id); + + /// \brief Remove a branch by name. + /// + /// \param name Branch name + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if the branch does not exist + SnapshotManager& RemoveBranch(const std::string& name); + + /// \brief Remove the tag with the given name. + /// + /// \param name Tag name + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if the tag does not exist + SnapshotManager& RemoveTag(const std::string& name); + + /// \brief Replaces the tag with the given name to point to the specified snapshot. + /// + /// \param name Tag to replace + /// \param snapshot_id New snapshot id for the given tag + /// \return Reference to this for method chaining + SnapshotManager& ReplaceTag(const std::string& name, int64_t snapshot_id); + + /// \brief Replaces the branch with the given name to point to the specified snapshot. + /// + /// \param name Branch to replace + /// \param snapshot_id New snapshot id for the given branch + /// \return Reference to this for method chaining + SnapshotManager& ReplaceBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Replaces the from branch to point to the to snapshot. The to will remain + /// unchanged, and from branch will retain its retention properties. If the from branch + /// does not exist, it will be created with default retention properties. + /// + /// \param from Branch to replace + /// \param to The branch from should be replaced with + /// \return Reference to this for method chaining + SnapshotManager& ReplaceBranch(const std::string& from, const std::string& to); + + /// \brief Performs a fast-forward of from up to the to snapshot if from is an ancestor + /// of to. The to will remain unchanged, and from will retain its retention properties. + /// If the from branch does not exist, it will be created with default retention + /// properties. + /// + /// \param from Branch to fast-forward + /// \param to Ref for the from branch to be fast forwarded to + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if from is not an ancestor of to + SnapshotManager& FastForwardBranch(const std::string& from, const std::string& to); + + /// \brief Rename a branch. + /// + /// \param name Name of branch to rename + /// \param new_name The desired new name of the branch + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if the branch to rename does not exist or if there + /// is already a branch with the same name as the desired new name + SnapshotManager& RenameBranch(const std::string& name, const std::string& new_name); + + /// \brief Updates the minimum number of snapshots to keep for a branch. + /// + /// \param branch_name Branch name + /// \param min_snapshots_to_keep Minimum number of snapshots to retain on the branch + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if the branch does not exist + SnapshotManager& SetMinSnapshotsToKeep(const std::string& branch_name, + int32_t min_snapshots_to_keep); + + /// \brief Updates the max snapshot age for a branch. + /// + /// \param branch_name Branch name + /// \param max_snapshot_age_ms Maximum snapshot age in milliseconds to retain on branch + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if the branch does not exist + SnapshotManager& SetMaxSnapshotAgeMs(const std::string& branch_name, + int64_t max_snapshot_age_ms); + + /// \brief Updates the retention policy for a reference. + /// + /// \param name Reference name + /// \param max_ref_age_ms Retention age in milliseconds of the tag reference itself + /// \return Reference to this for method chaining + /// \throws IllegalArgumentException if the reference does not exist + SnapshotManager& SetMaxRefAgeMs(const std::string& name, int64_t max_ref_age_ms); + + /// \brief Apply the pending changes and return the current snapshot. + /// + /// \return The current snapshot after applying changes, or an error + Result> Apply(); + + /// \brief Commit all pending changes. + /// + /// \return Status indicating success or failure + Status Commit() override; + + private: + /// \brief Constructor for creating a SnapshotManager with a transaction. + /// + /// \param transaction The transaction to use + /// \param is_external Whether this is an external transaction (true) or created + /// internally (false) + SnapshotManager(std::shared_ptr transaction, bool is_external); + + /// \brief Get or create the UpdateSnapshotReference operation. + Result> UpdateSnapshotReferencesOperation(); + + /// \brief Commit any pending reference updates if they exist. + Status CommitIfRefUpdatesExist(); + + bool is_external_transaction_; + std::shared_ptr update_snapshot_references_operation_; +}; + +} // namespace iceberg