From fc0028f4fe39709b568f5e84f7560256366b75b5 Mon Sep 17 00:00:00 2001 From: Feiyang Li Date: Sun, 4 Jan 2026 15:40:34 +0800 Subject: [PATCH 1/2] feat(rest):implement stage create table --- src/iceberg/catalog/rest/rest_catalog.cc | 47 ++++++++++++++--------- src/iceberg/catalog/rest/rest_catalog.h | 6 +++ src/iceberg/catalog/rest/type_fwd.h | 1 + src/iceberg/test/rest_catalog_test.cc | 48 ++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 17 deletions(-) diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index a799d69a3..f00784db6 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -44,6 +44,7 @@ #include "iceberg/table.h" #include "iceberg/table_requirement.h" #include "iceberg/table_update.h" +#include "iceberg/transaction.h" #include "iceberg/util/macros.h" namespace iceberg::rest { @@ -274,11 +275,11 @@ Result> RestCatalog::ListTables(const Namespace& ns return result; } -Result> RestCatalog::CreateTable( +Result RestCatalog::CreateTableInternal( const TableIdentifier& identifier, const std::shared_ptr& schema, const std::shared_ptr& spec, const std::shared_ptr& order, const std::string& location, - const std::unordered_map& properties) { + const std::unordered_map& properties, bool stage_create) { ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns)); @@ -288,7 +289,7 @@ Result> RestCatalog::CreateTable( .schema = schema, .partition_spec = spec, .write_order = order, - .stage_create = false, + .stage_create = stage_create, .properties = properties, }; @@ -298,10 +299,19 @@ Result> RestCatalog::CreateTable( client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance())); ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); - ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); - return Table::Make(identifier, load_result.metadata, - std::move(load_result.metadata_location), file_io_, - shared_from_this()); + return LoadTableResultFromJson(json); +} + +Result> RestCatalog::CreateTable( + const TableIdentifier& identifier, const std::shared_ptr& schema, + const std::shared_ptr& spec, const std::shared_ptr& order, + const std::string& location, + const std::unordered_map& properties) { + ICEBERG_ASSIGN_OR_RAISE( + auto result, + CreateTableInternal(identifier, schema, spec, order, location, properties, false)); + return Table::Make(identifier, result.metadata, std::move(result.metadata_location), + file_io_, shared_from_this()); } Result> RestCatalog::UpdateTable( @@ -335,13 +345,19 @@ Result> RestCatalog::UpdateTable( } Result> RestCatalog::StageCreateTable( - [[maybe_unused]] const TableIdentifier& identifier, - [[maybe_unused]] const std::shared_ptr& schema, - [[maybe_unused]] const std::shared_ptr& spec, - [[maybe_unused]] const std::shared_ptr& order, - [[maybe_unused]] const std::string& location, - [[maybe_unused]] const std::unordered_map& properties) { - return NotImplemented("Not implemented"); + const TableIdentifier& identifier, const std::shared_ptr& schema, + const std::shared_ptr& spec, const std::shared_ptr& order, + const std::string& location, + const std::unordered_map& properties) { + ICEBERG_ASSIGN_OR_RAISE( + auto result, + CreateTableInternal(identifier, schema, spec, order, location, properties, true)); + ICEBERG_ASSIGN_OR_RAISE( + auto staged_table, + StagedTable::Make(identifier, result.metadata, std::move(result.metadata_location), + file_io_, shared_from_this())); + return Transaction::Make(staged_table, Transaction::Kind::kCreate, + /*auto_commit=*/false); } Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) { @@ -393,9 +409,6 @@ Result RestCatalog::LoadTableInternal( } Result> RestCatalog::LoadTable(const TableIdentifier& identifier) { - ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable()); - ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); - ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier)); ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body)); ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 41928cf7b..721df29d8 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -110,6 +110,12 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, Result LoadTableInternal(const TableIdentifier& identifier) const; + Result CreateTableInternal( + const TableIdentifier& identifier, const std::shared_ptr& schema, + const std::shared_ptr& spec, const std::shared_ptr& order, + const std::string& location, + const std::unordered_map& properties, bool stage_create); + std::unique_ptr config_; std::shared_ptr file_io_; std::unique_ptr client_; diff --git a/src/iceberg/catalog/rest/type_fwd.h b/src/iceberg/catalog/rest/type_fwd.h index e7fddb91a..9a57c11b8 100644 --- a/src/iceberg/catalog/rest/type_fwd.h +++ b/src/iceberg/catalog/rest/type_fwd.h @@ -25,6 +25,7 @@ namespace iceberg::rest { struct ErrorResponse; +struct LoadTableResult; class Endpoint; class ErrorHandler; diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc index 7f04de0ae..f17b93918 100644 --- a/src/iceberg/test/rest_catalog_test.cc +++ b/src/iceberg/test/rest_catalog_test.cc @@ -52,6 +52,7 @@ #include "iceberg/test/std_io.h" #include "iceberg/test/test_resource.h" #include "iceberg/test/util/docker_compose_util.h" +#include "iceberg/transaction.h" namespace iceberg::rest { @@ -639,4 +640,51 @@ TEST_F(RestCatalogIntegrationTest, RegisterTable) { EXPECT_NE(table->name(), registered_table->name()); } +TEST_F(RestCatalogIntegrationTest, StageCreateTable) { + auto catalog_result = CreateCatalog(); + ASSERT_THAT(catalog_result, IsOk()); + auto& catalog = catalog_result.value(); + + // Create namespace + Namespace ns{.levels = {"test_stage_create"}}; + auto status = catalog->CreateNamespace(ns, {}); + EXPECT_THAT(status, IsOk()); + + // Stage create table + auto schema = CreateDefaultSchema(); + auto partition_spec = PartitionSpec::Unpartitioned(); + auto sort_order = SortOrder::Unsorted(); + + TableIdentifier table_id{.ns = ns, .name = "staged_table"}; + std::unordered_map table_properties{{"key1", "value1"}}; + auto txn_result = catalog->StageCreateTable(table_id, schema, partition_spec, + sort_order, "", table_properties); + ASSERT_THAT(txn_result, IsOk()); + auto& txn = txn_result.value(); + + // Verify the staged table in transaction + EXPECT_NE(txn->table(), nullptr); + EXPECT_EQ(txn->table()->name(), table_id); + + // Table should NOT exist in catalog yet (staged but not committed) + auto exists_result = catalog->TableExists(table_id); + ASSERT_THAT(exists_result, IsOk()); + EXPECT_FALSE(exists_result.value()); + + // Commit the transaction + auto commit_result = txn->Commit(); + ASSERT_THAT(commit_result, IsOk()); + auto& committed_table = commit_result.value(); + + // Verify table now exists + exists_result = catalog->TableExists(table_id); + ASSERT_THAT(exists_result, IsOk()); + EXPECT_TRUE(exists_result.value()); + + // Verify table properties + EXPECT_EQ(committed_table->name(), table_id); + auto& props = committed_table->metadata()->properties.configs(); + EXPECT_EQ(props.at("key1"), "value1"); +} + } // namespace iceberg::rest From 27abc3f769b1e36bf6412b77860207cbd67a2722 Mon Sep 17 00:00:00 2001 From: Feiyang Li Date: Tue, 6 Jan 2026 17:47:25 +0800 Subject: [PATCH 2/2] 1 --- src/iceberg/catalog/rest/rest_catalog.cc | 26 ++++++++++++------------ 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index f00784db6..5ab6e591f 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -307,11 +307,11 @@ Result> RestCatalog::CreateTable( const std::shared_ptr& spec, const std::shared_ptr& order, const std::string& location, const std::unordered_map& properties) { - ICEBERG_ASSIGN_OR_RAISE( - auto result, - CreateTableInternal(identifier, schema, spec, order, location, properties, false)); - return Table::Make(identifier, result.metadata, std::move(result.metadata_location), - file_io_, shared_from_this()); + ICEBERG_ASSIGN_OR_RAISE(auto result, + CreateTableInternal(identifier, schema, spec, order, location, + properties, /*stage_create=*/false)); + return Table::Make(identifier, std::move(result.metadata), + std::move(result.metadata_location), file_io_, shared_from_this()); } Result> RestCatalog::UpdateTable( @@ -349,14 +349,14 @@ Result> RestCatalog::StageCreateTable( const std::shared_ptr& spec, const std::shared_ptr& order, const std::string& location, const std::unordered_map& properties) { - ICEBERG_ASSIGN_OR_RAISE( - auto result, - CreateTableInternal(identifier, schema, spec, order, location, properties, true)); - ICEBERG_ASSIGN_OR_RAISE( - auto staged_table, - StagedTable::Make(identifier, result.metadata, std::move(result.metadata_location), - file_io_, shared_from_this())); - return Transaction::Make(staged_table, Transaction::Kind::kCreate, + ICEBERG_ASSIGN_OR_RAISE(auto result, + CreateTableInternal(identifier, schema, spec, order, location, + properties, /*stage_create=*/true)); + ICEBERG_ASSIGN_OR_RAISE(auto staged_table, + StagedTable::Make(identifier, std::move(result.metadata), + std::move(result.metadata_location), file_io_, + shared_from_this())); + return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate, /*auto_commit=*/false); }