Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ set(ICEBERG_SOURCES
schema_util.cc
snapshot.cc
sort_field.cc
update/snapshot_manager.cc
sort_order.cc
statistics_file.cc
table.cc
Expand All @@ -85,6 +86,7 @@ set(ICEBERG_SOURCES
update/update_properties.cc
update/update_schema.cc
update/update_sort_order.cc
update/update_snapshot_reference.cc
util/bucket_util.cc
util/content_file_util.cc
util/conversions.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ iceberg_sources = files(
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_schema.cc',
'update/update_snapshot_reference.cc',
'update/update_sort_order.cc',
'util/bucket_util.cc',
'util/content_file_util.cc',
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "iceberg/table_properties.h"
#include "iceberg/table_scan.h"
#include "iceberg/transaction.h"
#include "iceberg/update/snapshot_manager.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
Expand Down Expand Up @@ -179,6 +180,10 @@ Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
return transaction->NewUpdateSchema();
}

Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
return SnapshotManager::Make(name().ToString(), shared_from_this());
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();

/// \brief Create a new SnapshotManager to manage snapshots and snapshot references.
virtual Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();

protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
27 changes: 27 additions & 0 deletions src/iceberg/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_snapshot_reference.h"
#include "iceberg/update/update_sort_order.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
Expand Down Expand Up @@ -113,6 +114,24 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->SetCurrentSchema(std::move(result.schema),
result.new_last_column_id);
} break;
case PendingUpdate::Kind::kUpdateSnapshotReference: {
auto& update_ref = internal::checked_cast<UpdateSnapshotReference&>(update);
ICEBERG_ASSIGN_OR_RAISE(auto updated_refs, update_ref.Apply());
const auto& current_refs = current().refs;
// Identify references which have been removed
for (const auto& [name, ref] : current_refs) {
if (updated_refs.find(name) == updated_refs.end()) {
metadata_builder_->RemoveRef(name);
}
}
// Identify references which have been created or updated.
for (const auto& [name, ref] : updated_refs) {
auto current_it = current_refs.find(name);
if (current_it == current_refs.end() || *current_it->second != *ref) {
metadata_builder_->SetRef(name, ref);
}
}
} break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
Expand Down Expand Up @@ -193,4 +212,12 @@ Result<std::shared_ptr<UpdateSchema>> Transaction::NewUpdateSchema() {
return update_schema;
}

Result<std::shared_ptr<UpdateSnapshotReference>>
Transaction::NewUpdateSnapshotReference() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSnapshotReference> update_ref,
UpdateSnapshotReference::Make(shared_from_this()));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_ref));
return update_ref;
}

} // namespace iceberg
4 changes: 4 additions & 0 deletions src/iceberg/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();

/// \brief Create a new UpdateSnapshotReference to update snapshot references (branches
/// and tags) and commit the changes.
Result<std::shared_ptr<UpdateSnapshotReference>> NewUpdateSnapshotReference();

private:
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
std::unique_ptr<TableMetadataBuilder> metadata_builder);
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class TableProperties;
/// \brief Table update.
class TableMetadataBuilder;
class TableUpdate;
class SnapshotManager;
class TableRequirement;
class TableUpdateContext;
class Transaction;
Expand All @@ -192,6 +193,7 @@ class UpdatePartitionSpec;
class UpdateProperties;
class UpdateSchema;
class UpdateSortOrder;
class UpdateSnapshotReference;

/// ----------------------------------------------------------------------------
/// TODO: Forward declarations below are not added yet.
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/update/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ install_headers(
'pending_update.h',
'update_partition_spec.h',
'update_schema.h',
'update_snapshot_reference.h',
'update_sort_order.h',
'update_properties.h',
],
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/update/pending_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
kUpdatePartitionSpec,
kUpdateProperties,
kUpdateSchema,
kUpdateSnapshotReference,
kUpdateSortOrder,
};

Expand Down
Loading
Loading