Skip to content

Commit 3fb4847

Browse files
committed
feat: update partition spec
1 parent b8edbdf commit 3fb4847

20 files changed

+1681
-8
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ set(ICEBERG_SOURCES
7777
transform_function.cc
7878
type.cc
7979
update/pending_update.cc
80-
update/update_sort_order.cc
80+
update/update_partition_spec.cc
8181
update/update_properties.cc
82+
update/update_sort_order.cc
8283
util/bucket_util.cc
8384
util/conversions.cc
8485
util/decimal.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ iceberg_sources = files(
9999
'transform_function.cc',
100100
'type.cc',
101101
'update/pending_update.cc',
102+
'update/update_partition_spec.cc',
102103
'update/update_properties.cc',
103104
'update/update_sort_order.cc',
104105
'util/bucket_util.cc',

src/iceberg/partition_spec.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(
9595
return std::make_unique<StructType>(std::move(partition_fields));
9696
}
9797

98+
bool PartitionSpec::SameSpec(const PartitionSpec& other) const {
99+
return fields_ == other.fields_ &&
100+
last_assigned_field_id_ == other.last_assigned_field_id_;
101+
}
102+
98103
std::string PartitionSpec::ToString() const {
99104
std::string repr = std::format("partition_spec[spec_id<{}>,\n", spec_id_);
100105
for (const auto& field : fields_) {

src/iceberg/partition_spec.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
6464
/// \brief Get the partition type binding to the input schema.
6565
Result<std::unique_ptr<StructType>> PartitionType(const Schema& schema) const;
6666

67+
/// \brief Checks whether this partition spec is equivalent to another partition spec
68+
/// while ignoring the spec id.
69+
bool SameSpec(const PartitionSpec& other) const;
70+
6771
std::string ToString() const override;
6872

6973
int32_t last_assigned_field_id() const { return last_assigned_field_id_; }

src/iceberg/table.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#include "iceberg/table.h"
2121

22+
#include <memory>
23+
2224
#include "iceberg/catalog.h"
2325
#include "iceberg/partition_spec.h"
2426
#include "iceberg/result.h"
@@ -28,6 +30,7 @@
2830
#include "iceberg/table_properties.h"
2931
#include "iceberg/table_scan.h"
3032
#include "iceberg/transaction.h"
33+
#include "iceberg/update/update_partition_spec.h"
3134
#include "iceberg/update/update_properties.h"
3235
#include "iceberg/util/macros.h"
3336

@@ -147,6 +150,13 @@ Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
147150
/*auto_commit=*/false);
148151
}
149152

153+
Result<std::shared_ptr<UpdatePartitionSpec>> Table::NewUpdatePartitionSpec() {
154+
ICEBERG_ASSIGN_OR_RAISE(
155+
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
156+
/*auto_commit=*/true));
157+
return transaction->NewUpdatePartitionSpec();
158+
}
159+
150160
Result<std::shared_ptr<UpdateProperties>> Table::NewUpdateProperties() {
151161
ICEBERG_ASSIGN_OR_RAISE(
152162
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,

src/iceberg/table.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
128128
/// \brief Create a new Transaction to commit multiple table operations at once.
129129
virtual Result<std::shared_ptr<Transaction>> NewTransaction();
130130

131+
/// \brief Create a new UpdatePartitionSpec to update the partition spec of this table
132+
/// and commit the changes.
133+
virtual Result<std::shared_ptr<UpdatePartitionSpec>> NewUpdatePartitionSpec();
134+
131135
/// \brief Create a new UpdateProperties to update table properties and commit the
132136
/// changes.
133137
virtual Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties();

src/iceberg/table_metadata.cc

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,8 @@ class TableMetadataBuilder::Impl {
428428
Result<int32_t> AddSortOrder(const SortOrder& order);
429429
Status SetProperties(const std::unordered_map<std::string, std::string>& updated);
430430
Status RemoveProperties(const std::unordered_set<std::string>& removed);
431-
431+
Status SetDefaultPartitionSpec(int32_t spec_id);
432+
Result<int32_t> AddPartitionSpec(const PartitionSpec& spec);
432433
std::unique_ptr<TableMetadata> Build();
433434

434435
private:
@@ -438,6 +439,12 @@ class TableMetadataBuilder::Impl {
438439
/// \return The ID to use for this sort order (reused if exists, new otherwise)
439440
int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order);
440441

442+
/// \brief Internal method to check for existing partition spec and reuse its ID or
443+
/// create a new one
444+
/// \param new_spec The partition spec to check
445+
/// \return The ID to use for this partition spec (reused if exists, new otherwise)
446+
int32_t ReuseOrCreateNewPartitionSpecId(const PartitionSpec& new_spec);
447+
441448
private:
442449
// Base metadata (nullptr for new tables)
443450
const TableMetadata* base_;
@@ -572,6 +579,58 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
572579
return new_order_id;
573580
}
574581

582+
Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) {
583+
if (spec_id == -1) {
584+
if (!last_added_spec_id_.has_value()) {
585+
return InvalidArgument(
586+
"Cannot set last added partition spec: no partition spec has been added");
587+
}
588+
return SetDefaultPartitionSpec(last_added_spec_id_.value());
589+
}
590+
591+
if (spec_id == metadata_.default_spec_id) {
592+
return {};
593+
}
594+
595+
metadata_.default_spec_id = spec_id;
596+
597+
changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(spec_id));
598+
return {};
599+
}
600+
601+
Result<int32_t> TableMetadataBuilder::Impl::AddPartitionSpec(const PartitionSpec& spec) {
602+
int32_t new_spec_id = ReuseOrCreateNewPartitionSpecId(spec);
603+
604+
if (specs_by_id_.find(new_spec_id) != specs_by_id_.end()) {
605+
// update last_added_spec_id if the spec was added in this set of changes (since it
606+
// is now the last)
607+
bool is_new_spec = last_added_spec_id_.has_value() &&
608+
std::ranges::find_if(changes_, [new_spec_id](const auto& change) {
609+
auto* add_spec =
610+
dynamic_cast<table::AddPartitionSpec*>(change.get());
611+
return add_spec && add_spec->spec()->spec_id() == new_spec_id;
612+
}) != changes_.cend();
613+
last_added_spec_id_ = is_new_spec ? std::make_optional(new_spec_id) : std::nullopt;
614+
return new_spec_id;
615+
}
616+
617+
// Get current schema and validate the partition spec against it
618+
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata_.Schema());
619+
ICEBERG_RETURN_UNEXPECTED(spec.Validate(*schema, /*allow_missing_fields=*/false));
620+
621+
std::shared_ptr<PartitionSpec> new_spec;
622+
ICEBERG_ASSIGN_OR_RAISE(
623+
new_spec,
624+
PartitionSpec::Make(new_spec_id, std::vector<PartitionField>(spec.fields().begin(),
625+
spec.fields().end())));
626+
metadata_.partition_specs.push_back(new_spec);
627+
specs_by_id_.emplace(new_spec_id, new_spec);
628+
629+
changes_.push_back(std::make_unique<table::AddPartitionSpec>(new_spec));
630+
last_added_spec_id_ = new_spec_id;
631+
return new_spec_id;
632+
}
633+
575634
Status TableMetadataBuilder::Impl::SetProperties(
576635
const std::unordered_map<std::string, std::string>& updated) {
577636
// If updated is empty, return early (no-op)
@@ -653,6 +712,20 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId(
653712
return new_order_id;
654713
}
655714

715+
int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
716+
const PartitionSpec& new_spec) {
717+
// determine the next spec id
718+
int32_t new_spec_id = PartitionSpec::kInitialSpecId;
719+
for (const auto& spec : metadata_.partition_specs) {
720+
if (spec->SameSpec(new_spec)) {
721+
return spec->spec_id();
722+
} else if (new_spec_id <= spec->spec_id()) {
723+
new_spec_id = spec->spec_id() + 1;
724+
}
725+
}
726+
return new_spec_id;
727+
}
728+
656729
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
657730
: impl_(std::make_unique<Impl>(format_version)) {}
658731

@@ -723,16 +796,19 @@ TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr<Schema> sc
723796

724797
TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(
725798
std::shared_ptr<PartitionSpec> spec) {
726-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
799+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec));
800+
return SetDefaultPartitionSpec(spec_id);
727801
}
728802

729803
TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(int32_t spec_id) {
730-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
804+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetDefaultPartitionSpec(spec_id));
805+
return *this;
731806
}
732807

733808
TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec(
734809
std::shared_ptr<PartitionSpec> spec) {
735-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
810+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec));
811+
return *this;
736812
}
737813

738814
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(

src/iceberg/table_update.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
7272
// AddPartitionSpec
7373

7474
void AddPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
75-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
75+
builder.AddPartitionSpec(spec_);
7676
}
7777

7878
void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
@@ -82,7 +82,7 @@ void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
8282
// SetDefaultPartitionSpec
8383

8484
void SetDefaultPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
85-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
85+
builder.SetDefaultPartitionSpec(spec_id_);
8686
}
8787

8888
void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ if(ICEBERG_BUILD_BUNDLE)
154154
USE_BUNDLE
155155
SOURCES
156156
transaction_test.cc
157+
update_partition_spec_test.cc
157158
update_properties_test.cc
158159
update_sort_order_test.cc)
159160

0 commit comments

Comments
 (0)