Skip to content

Commit b83be34

Browse files
committed
feat: InMemoryCatalog create table
1 parent 25daf33 commit b83be34

File tree

13 files changed

+264
-34
lines changed

13 files changed

+264
-34
lines changed

src/iceberg/catalog.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,15 @@ class ICEBERG_EXPORT Catalog {
105105
/// \brief Create a table
106106
///
107107
/// \param identifier a table identifier
108+
/// \param location a location for the table; leave empty if unspecified
108109
/// \param schema a schema
109110
/// \param spec a partition spec
110-
/// \param location a location for the table; leave empty if unspecified
111+
/// \param sort_order a sort order
111112
/// \param properties a string map of table properties
112113
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
113114
virtual Result<std::unique_ptr<Table>> CreateTable(
114-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
115-
const std::string& location,
115+
const TableIdentifier& identifier, const std::string& location,
116+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
116117
const std::unordered_map<std::string, std::string>& properties) = 0;
117118

118119
/// \brief Update a table
@@ -129,15 +130,16 @@ class ICEBERG_EXPORT Catalog {
129130
/// \brief Start a transaction to create a table
130131
///
131132
/// \param identifier a table identifier
133+
/// \param location a location for the table; leave empty if unspecified
132134
/// \param schema a schema
133135
/// \param spec a partition spec
134-
/// \param location a location for the table; leave empty if unspecified
136+
/// \param sort_order a sort order
135137
/// \param properties a string map of table properties
136138
/// \return a Transaction to create the table or ErrorKind::kAlreadyExists if the
137139
/// table already exists
138140
virtual Result<std::shared_ptr<Transaction>> StageCreateTable(
139-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
140-
const std::string& location,
141+
const TableIdentifier& identifier, const std::string& location,
142+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
141143
const std::unordered_map<std::string, std::string>& properties) = 0;
142144

143145
/// \brief Check whether table exists

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <algorithm>
2323
#include <iterator>
2424

25+
#include "iceberg/sort_order.h"
2526
#include "iceberg/table.h"
2627
#include "iceberg/table_identifier.h"
2728
#include "iceberg/table_metadata.h"
@@ -318,7 +319,7 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
318319
ICEBERG_RETURN_UNEXPECTED(ns);
319320
const auto it = ns.value()->table_metadata_locations_.find(table_ident.name);
320321
if (it == ns.value()->table_metadata_locations_.end()) {
321-
return NotFound("{} does not exist", table_ident.name);
322+
return NotFound("Table does not exist: {}", table_ident);
322323
}
323324
return it->second;
324325
}
@@ -400,11 +401,25 @@ Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
400401
}
401402

402403
/// \brief Create a table, write its initial metadata file and register it in the catalog.
///
/// \param identifier identifier of the table to create
/// \param location base location of the table; when empty it defaults to
///        `<warehouse_location_>/<identifier.ToString()>`
/// \param schema schema of the new table
/// \param spec partition spec of the new table
/// \param sort_order sort order of the new table
/// \param properties table properties
/// \return the created Table, AlreadyExists if the identifier is already registered, or
///         the error reported by the existence check, TableMetadata::Make,
///         TableMetadataUtil::Write or the registration of the metadata location
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
    const TableIdentifier& identifier, const std::string& location, const Schema& schema,
    const PartitionSpec& spec, const SortOrder& sort_order,
    const std::unordered_map<std::string, std::string>& properties) {
  std::unique_lock lock(mutex_);
  // Propagate a failed lookup instead of treating it as "table absent", so that no
  // metadata file is written for a table whose existence could not be determined.
  ICEBERG_ASSIGN_OR_RAISE(auto table_exists, root_namespace_->TableExists(identifier));
  if (table_exists) {
    return AlreadyExists("Table already exists: {}", identifier);
  }

  const std::string base_location =
      location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;
  ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadata::Make(base_location, schema, spec,
                                                             sort_order, properties));
  ICEBERG_ASSIGN_OR_RAISE(auto metadata_file_location,
                          TableMetadataUtil::Write(*file_io_, nullptr, "", *metadata));
  // Register only after the metadata file has been written successfully.
  ICEBERG_RETURN_UNEXPECTED(
      root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
  return std::make_unique<Table>(identifier, std::move(metadata),
                                 std::move(metadata_file_location), file_io_,
                                 std::static_pointer_cast<Catalog>(shared_from_this()));
}
409424

410425
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
@@ -440,11 +455,20 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
440455
}
441456

442457
/// \brief Start a transaction that creates a table.
///
/// The arguments are validated (the table must not exist yet and the initial metadata
/// must be constructible), but building the transaction itself is not implemented.
///
/// \param identifier identifier of the table to create
/// \param location base location of the table; when empty it defaults to
///        `<warehouse_location_>/<identifier.ToString()>`
/// \param schema schema of the new table
/// \param spec partition spec of the new table
/// \param sort_order sort order of the new table
/// \param properties table properties
/// \return AlreadyExists if the identifier is already registered, the error reported by
///         the existence check or TableMetadata::Make, and NotImplemented otherwise
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
    const TableIdentifier& identifier, const std::string& location, const Schema& schema,
    const PartitionSpec& spec, const SortOrder& sort_order,
    const std::unordered_map<std::string, std::string>& properties) {
  std::unique_lock lock(mutex_);
  // Propagate a failed lookup instead of treating it as "table absent".
  ICEBERG_ASSIGN_OR_RAISE(auto table_exists, root_namespace_->TableExists(identifier));
  if (table_exists) {
    return AlreadyExists("Table already exists: {}", identifier);
  }

  const std::string base_location =
      location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;
  ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadata::Make(base_location, schema, spec,
                                                             sort_order, properties));
  // TODO(zhuo.wang) Init transaction with metadata
  // Report the missing feature as an error: a successful Result holding a null
  // Transaction would only defer the failure to the first dereference by the caller.
  return NotImplemented("stage create table");
}
449473

450474
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
@@ -495,7 +519,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
495519

496520
std::unique_lock lock(mutex_);
497521
if (!root_namespace_->NamespaceExists(identifier.ns)) {
498-
return NoSuchNamespace("table namespace does not exist.");
522+
return NoSuchNamespace("Table namespace does not exist: {}", identifier.ns);
499523
}
500524
if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
501525
return UnknownError("The registry failed.");

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ class ICEBERG_EXPORT InMemoryCatalog
7171
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
7272

7373
Result<std::unique_ptr<Table>> CreateTable(
74-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
75-
const std::string& location,
74+
const TableIdentifier& identifier, const std::string& location,
75+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
7676
const std::unordered_map<std::string, std::string>& properties) override;
7777

7878
Result<std::unique_ptr<Table>> UpdateTable(
@@ -81,8 +81,8 @@ class ICEBERG_EXPORT InMemoryCatalog
8181
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
8282

8383
Result<std::shared_ptr<Transaction>> StageCreateTable(
84-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
85-
const std::string& location,
84+
const TableIdentifier& identifier, const std::string& location,
85+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
8686
const std::unordered_map<std::string, std::string>& properties) override;
8787

8888
Result<bool> TableExists(const TableIdentifier& identifier) const override;

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,9 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
242242

243243
/// \brief Create a table through the REST catalog.
///
/// Not supported yet: every argument is ignored and NotImplemented is returned.
Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
    const TableIdentifier& /*identifier*/, const std::string& /*location*/,
    const Schema& /*schema*/, const PartitionSpec& /*spec*/,
    const SortOrder& /*sort_order*/,
    const std::unordered_map<std::string, std::string>& /*properties*/) {
  return NotImplemented("Not implemented");
}
@@ -257,8 +258,9 @@ Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
257258

258259
/// \brief Start a create-table transaction through the REST catalog.
///
/// Not supported yet: every argument is ignored and NotImplemented is returned.
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
    const TableIdentifier& /*identifier*/, const std::string& /*location*/,
    const Schema& /*schema*/, const PartitionSpec& /*spec*/,
    const SortOrder& /*sort_order*/,
    const std::unordered_map<std::string, std::string>& /*properties*/) {
  return NotImplemented("Not implemented");
}

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#pragma once
2121

2222
#include <memory>
23-
#include <set>
2423
#include <string>
2524

2625
#include "iceberg/catalog.h"
@@ -72,8 +71,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
7271
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
7372

7473
Result<std::unique_ptr<Table>> CreateTable(
75-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
76-
const std::string& location,
74+
const TableIdentifier& identifier, const std::string& location,
75+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
7776
const std::unordered_map<std::string, std::string>& properties) override;
7877

7978
Result<std::unique_ptr<Table>> UpdateTable(
@@ -82,8 +81,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
8281
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
8382

8483
Result<std::shared_ptr<Transaction>> StageCreateTable(
85-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
86-
const std::string& location,
84+
const TableIdentifier& identifier, const std::string& location,
85+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
8786
const std::unordered_map<std::string, std::string>& properties) override;
8887

8988
Result<bool> TableExists(const TableIdentifier& identifier) const override;

src/iceberg/table_identifier.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
/// \file iceberg/table_identifier.h
2323
/// A TableIdentifier is a unique identifier for a table
2424

25+
#include <format>
26+
#include <sstream>
2527
#include <string>
2628
#include <vector>
2729

@@ -35,6 +37,15 @@ struct ICEBERG_EXPORT Namespace {
3537
std::vector<std::string> levels;
3638

3739
bool operator==(const Namespace& other) const { return levels == other.levels; }
40+
41+
std::string ToString() const {
42+
std::ostringstream oss;
43+
for (size_t i = 0; i < levels.size(); ++i) {
44+
if (i) oss << '.';
45+
oss << levels[i];
46+
}
47+
return oss.str();
48+
}
3849
};
3950

4051
/// \brief Identifies a table in iceberg catalog.
@@ -53,6 +64,27 @@ struct ICEBERG_EXPORT TableIdentifier {
5364
}
5465
return {};
5566
}
67+
68+
/// \brief Render the identifier as `<namespace>.<name>`.
///
/// An identifier whose namespace has no levels is rendered as the bare table name, so
/// the result never starts with a stray '.'.
std::string ToString() const {
  return ns.levels.empty() ? name : ns.ToString() + '.' + name;
}
5669
};
5770

5871
} // namespace iceberg
72+
73+
namespace std {
74+
75+
/// \brief std::format support for iceberg::Namespace.
///
/// Formats the text produced by Namespace::ToString(). `parse` is inherited from
/// std::formatter<std::string>, so the standard string format specification (fill,
/// alignment, width, precision) is accepted, e.g. "{:>20}"; a plain "{}" prints the
/// text unchanged.
template <>
struct formatter<iceberg::Namespace> : std::formatter<std::string> {
  template <class FormatContext>
  auto format(const iceberg::Namespace& ns, FormatContext& ctx) const {
    return std::formatter<std::string>::format(ns.ToString(), ctx);
  }
};
82+
83+
/// \brief std::format support for iceberg::TableIdentifier.
///
/// Formats the text produced by TableIdentifier::ToString(). `parse` is inherited from
/// std::formatter<std::string>, so the standard string format specification (fill,
/// alignment, width, precision) is accepted, e.g. "{:<32}"; a plain "{}" prints the
/// text unchanged.
template <>
struct formatter<iceberg::TableIdentifier> : std::formatter<std::string> {
  template <class FormatContext>
  auto format(const iceberg::TableIdentifier& id, FormatContext& ctx) const {
    return std::formatter<std::string>::format(id.ToString(), ctx);
  }
};
90+
} // namespace std

src/iceberg/table_metadata.cc

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,19 @@
3636
#include "iceberg/exception.h"
3737
#include "iceberg/file_io.h"
3838
#include "iceberg/json_internal.h"
39+
#include "iceberg/metrics_config.h"
3940
#include "iceberg/partition_spec.h"
4041
#include "iceberg/result.h"
4142
#include "iceberg/schema.h"
4243
#include "iceberg/snapshot.h"
4344
#include "iceberg/sort_order.h"
4445
#include "iceberg/table_properties.h"
4546
#include "iceberg/table_update.h"
47+
#include "iceberg/util/error_collector.h"
4648
#include "iceberg/util/gzip_internal.h"
4749
#include "iceberg/util/location_util.h"
4850
#include "iceberg/util/macros.h"
51+
#include "iceberg/util/type_util.h"
4952
#include "iceberg/util/uuid.h"
5053
namespace iceberg {
5154
namespace {
@@ -64,6 +67,47 @@ std::string ToString(const MetadataLogEntry& entry) {
6467
entry.metadata_file);
6568
}
6669

70+
/// \brief Build the initial metadata of a new table.
///
/// The schema is re-created with freshly assigned column IDs (counting up from 1) and
/// Schema::kInitialSchemaId. The partition spec is re-created against that schema with
/// PartitionSpec::kInitialSpecId. An unsorted sort order keeps its own order ID; any
/// other sort order is re-created with SortOrder::kInitialSortOrderId.
///
/// \param location base location of the table
/// \param schema schema of the table
/// \param spec partition spec of the table
/// \param sort_order sort order of the table
/// \param properties table properties; must not contain a reserved property
/// \param format_version format version handed to TableMetadataBuilder::BuildFromEmpty
/// \return the built metadata, InvalidArgument if `properties` contains a reserved
///         property, or the error reported by PartitionSpec::Make, SortOrder::Make,
///         MetricsConfig::VerifyReferencedColumns or the builder
Result<std::unique_ptr<TableMetadata>> TableMetadata::Make(
    const std::string& location, const iceberg::Schema& schema,
    const iceberg::PartitionSpec& spec, const iceberg::SortOrder& sort_order,
    const std::unordered_map<std::string, std::string>& properties, int format_version) {
  for (const auto& [key, _] : properties) {
    if (TableProperties::reserved_properties().contains(key)) {
      return InvalidArgument(
          "Table properties should not contain reserved properties, but got {}", key);
    }
  }

  // Highest column ID handed out so far; advanced by `next_id` during ID assignment.
  int32_t last_column_id = 0;
  auto next_id = [&last_column_id]() -> int32_t { return ++last_column_id; };

  auto fresh_schema =
      IdAssigner::AssignFreshIds(Schema::kInitialSchemaId, schema, next_id);

  // NOTE(review): partition and sort fields are copied unchanged, so their source column
  // IDs still refer to the IDs of the input schema -- confirm they match the freshly
  // assigned ones.
  ICEBERG_ASSIGN_OR_RAISE(
      auto fresh_spec,
      PartitionSpec::Make(
          *fresh_schema, PartitionSpec::kInitialSpecId,
          std::vector<PartitionField>(spec.fields().begin(), spec.fields().end()),
          false));
  ICEBERG_ASSIGN_OR_RAISE(
      auto fresh_order,
      SortOrder::Make(sort_order.is_unsorted() ? sort_order.order_id()
                                               : SortOrder::kInitialSortOrderId,
                      std::vector<SortField>(sort_order.fields().begin(),
                                             sort_order.fields().end())));
  ICEBERG_RETURN_UNEXPECTED(
      MetricsConfig::VerifyReferencedColumns(properties, *fresh_schema));

  return TableMetadataBuilder::BuildFromEmpty(format_version)
      ->SetLocation(location)
      // Pass the counter itself: calling next_id() here would advance it once more and
      // record an ID that was never assigned to any column.
      .SetCurrentSchema(fresh_schema, last_column_id)
      .AddPartitionSpec(std::move(fresh_spec))
      .AddSortOrder(std::move(fresh_order))
      .SetProperties(properties)
      .Build();
}
110+
67111
Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
68112
return SchemaById(current_schema_id);
69113
}
@@ -417,6 +461,33 @@ struct TableMetadataBuilder::Impl {
417461

418462
metadata.last_updated_ms = kInvalidLastUpdatedMs;
419463
}
464+
465+
// TODO(zhuo.wang) Do validation
466+
Status AddSchema(const std::shared_ptr<Schema>& schema, int new_last_column_id) {
467+
if (new_last_column_id < metadata.last_column_id) {
468+
return InvalidArgument("Invalid last column ID: {} < {} (previous last column ID)",
469+
new_last_column_id, metadata.last_column_id);
470+
}
471+
metadata.schemas.push_back(schema);
472+
schemas_by_id.emplace(schema->schema_id().value(), schema);
473+
metadata.current_schema_id = schema->schema_id().value();
474+
475+
return {};
476+
}
477+
478+
// TODO(zhuo.wang) Do validation
479+
Status AddPartitionSpec(const std::shared_ptr<PartitionSpec>& spec) {
480+
metadata.partition_specs.push_back(spec);
481+
specs_by_id.emplace(spec->spec_id(), spec);
482+
return {};
483+
}
484+
485+
// TODO(zhuo.wang) Do validation
486+
Status AddSortOrder(const std::shared_ptr<SortOrder>& order) {
487+
metadata.sort_orders.push_back(order);
488+
sort_orders_by_id.emplace(order->order_id(), order);
489+
return {};
490+
}
420491
};
421492

422493
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
@@ -529,7 +600,7 @@ TableMetadataBuilder& TableMetadataBuilder::UpgradeFormatVersion(
529600

530601
/// \brief Add `schema` to the metadata being built and make it the current schema.
///
/// The added (`+`) line forwards both arguments to Impl::AddSchema and hands the
/// resulting Status to BUILDER_RETURN; the removed (`-`) line threw "not implemented".
/// NOTE(review): BUILDER_RETURN is defined outside this view -- presumably it records a
/// failed Status on the builder and returns *this; confirm.
TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(
531602
std::shared_ptr<Schema> schema, int32_t new_last_column_id) {
532-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
603+
BUILDER_RETURN(impl_->AddSchema(schema, new_last_column_id));
533604
}
534605

535606
TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(int32_t schema_id) {
@@ -551,7 +622,7 @@ TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(int32_t spec
551622

552623
/// \brief Add `spec` to the partition specs of the metadata being built.
///
/// The added (`+`) line forwards `spec` to Impl::AddPartitionSpec and hands the
/// resulting Status to BUILDER_RETURN; the removed (`-`) line threw "not implemented".
/// NOTE(review): BUILDER_RETURN is defined outside this view -- presumably it records a
/// failed Status on the builder and returns *this; confirm.
TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec(
553624
std::shared_ptr<PartitionSpec> spec) {
554-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
625+
BUILDER_RETURN(impl_->AddPartitionSpec(spec));
555626
}
556627

557628
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(
@@ -746,7 +817,8 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveProperties(
746817
}
747818

748819
/// \brief Set the table location of the metadata being built.
///
/// The added (`+`) lines store `location` in `impl_->metadata.location` as given and
/// return the builder so calls can be chained; the removed (`-`) line threw
/// "not implemented".
TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view location) {
749-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
820+
impl_->metadata.location = location;
821+
return *this;
750822
}
751823

752824
TableMetadataBuilder& TableMetadataBuilder::AddEncryptionKey(

src/iceberg/table_metadata.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ struct ICEBERG_EXPORT TableMetadata {
124124
/// A `long` higher than all assigned row IDs
125125
int64_t next_row_id;
126126

127+
static Result<std::unique_ptr<TableMetadata>> Make(
128+
const std::string& location, const iceberg::Schema& schema,
129+
const iceberg::PartitionSpec& spec, const iceberg::SortOrder& sort_order,
130+
const std::unordered_map<std::string, std::string>& properties,
131+
int format_version = kDefaultTableFormatVersion);
132+
127133
/// \brief Get the current schema, return NotFoundError if not found
128134
Result<std::shared_ptr<iceberg::Schema>> Schema() const;
129135
/// \brief Get the current schema by ID, return NotFoundError if not found

0 commit comments

Comments
 (0)