Skip to content

Commit fea93b1

Browse files
authored
feat: implement create (stage) table for InMemoryCatalog (#416)
1 parent cc5e5cc commit fea93b1

24 files changed

+711
-43
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ set(ICEBERG_SOURCES
6666
sort_order.cc
6767
statistics_file.cc
6868
table.cc
69+
table_identifier.cc
6970
table_metadata.cc
7071
table_properties.cc
7172
table_requirement.cc
@@ -86,6 +87,7 @@ set(ICEBERG_SOURCES
8687
util/decimal.cc
8788
util/gzip_internal.cc
8889
util/murmurhash3_internal.cc
90+
util/property_util.cc
8991
util/snapshot_util.cc
9092
util/temporal_util.cc
9193
util/timepoint.cc

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
#include "iceberg/table_identifier.h"
2727
#include "iceberg/table_metadata.h"
2828
#include "iceberg/table_requirement.h"
29+
#include "iceberg/table_requirements.h"
2930
#include "iceberg/table_update.h"
31+
#include "iceberg/transaction.h"
32+
#include "iceberg/util/checked_cast.h"
3033
#include "iceberg/util/macros.h"
3134

3235
namespace iceberg {
@@ -318,7 +321,7 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
318321
ICEBERG_RETURN_UNEXPECTED(ns);
319322
const auto it = ns.value()->table_metadata_locations_.find(table_ident.name);
320323
if (it == ns.value()->table_metadata_locations_.end()) {
321-
return NotFound("{} does not exist", table_ident.name);
324+
return NotFound("Table does not exist: {}", table_ident);
322325
}
323326
return it->second;
324327
}
@@ -405,32 +408,68 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
405408
const std::string& location,
406409
const std::unordered_map<std::string, std::string>& properties) {
407410
std::unique_lock lock(mutex_);
408-
return NotImplemented("create table");
411+
if (root_namespace_->TableExists(identifier).value_or(false)) {
412+
return AlreadyExists("Table already exists: {}", identifier);
413+
}
414+
415+
std::string base_location =
416+
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;
417+
418+
ICEBERG_ASSIGN_OR_RAISE(auto table_metadata, TableMetadata::Make(*schema, *spec, *order,
419+
location, properties));
420+
421+
ICEBERG_ASSIGN_OR_RAISE(
422+
auto metadata_file_location,
423+
TableMetadataUtil::Write(*file_io_, nullptr, "", *table_metadata));
424+
ICEBERG_RETURN_UNEXPECTED(
425+
root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
426+
return Table::Make(identifier, std::move(table_metadata),
427+
std::move(metadata_file_location), file_io_, shared_from_this());
409428
}
410429

411430
Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
412431
const TableIdentifier& identifier,
413432
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
414433
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
415434
std::unique_lock lock(mutex_);
416-
ICEBERG_ASSIGN_OR_RAISE(auto base_metadata_location,
417-
root_namespace_->GetTableMetadataLocation(identifier));
418-
419-
ICEBERG_ASSIGN_OR_RAISE(auto base,
420-
TableMetadataUtil::Read(*file_io_, base_metadata_location));
435+
auto base_metadata_location = root_namespace_->GetTableMetadataLocation(identifier);
436+
std::unique_ptr<TableMetadata> base;
437+
std::unique_ptr<TableMetadataBuilder> builder;
438+
ICEBERG_ASSIGN_OR_RAISE(auto is_create, TableRequirements::IsCreate(requirements));
439+
if (is_create) {
440+
if (base_metadata_location.has_value()) {
441+
return AlreadyExists("Table already exists: {}", identifier);
442+
}
443+
int8_t format_version = TableMetadata::kDefaultTableFormatVersion;
444+
for (const auto& update : updates) {
445+
if (update->kind() == TableUpdate::Kind::kUpgradeFormatVersion) {
446+
format_version =
447+
internal::checked_cast<const table::UpgradeFormatVersion&>(*update)
448+
.format_version();
449+
}
450+
}
451+
builder = TableMetadataBuilder::BuildFromEmpty(format_version);
452+
} else {
453+
ICEBERG_RETURN_UNEXPECTED(base_metadata_location);
454+
ICEBERG_ASSIGN_OR_RAISE(
455+
base, TableMetadataUtil::Read(*file_io_, base_metadata_location.value()));
456+
builder = TableMetadataBuilder::BuildFrom(base.get());
457+
}
421458

422459
for (const auto& requirement : requirements) {
423460
ICEBERG_RETURN_UNEXPECTED(requirement->Validate(base.get()));
424461
}
425462

426-
auto builder = TableMetadataBuilder::BuildFrom(base.get());
427463
for (const auto& update : updates) {
428464
update->ApplyTo(*builder);
429465
}
430466
ICEBERG_ASSIGN_OR_RAISE(auto updated, builder->Build());
431467
ICEBERG_ASSIGN_OR_RAISE(
432468
auto new_metadata_location,
433-
TableMetadataUtil::Write(*file_io_, base.get(), base_metadata_location, *updated));
469+
TableMetadataUtil::Write(
470+
*file_io_, base.get(),
471+
base_metadata_location.has_value() ? base_metadata_location.value() : "",
472+
*updated));
434473
ICEBERG_RETURN_UNEXPECTED(
435474
root_namespace_->UpdateTableMetadataLocation(identifier, new_metadata_location));
436475
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated);
@@ -445,7 +484,21 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
445484
const std::string& location,
446485
const std::unordered_map<std::string, std::string>& properties) {
447486
std::unique_lock lock(mutex_);
448-
return NotImplemented("stage create table");
487+
if (root_namespace_->TableExists(identifier).value_or(false)) {
488+
return AlreadyExists("Table already exists: {}", identifier);
489+
}
490+
491+
std::string base_location =
492+
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;
493+
494+
ICEBERG_ASSIGN_OR_RAISE(
495+
auto table_metadata,
496+
TableMetadata::Make(*schema, *spec, *order, base_location, properties));
497+
ICEBERG_ASSIGN_OR_RAISE(
498+
auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_,
499+
shared_from_this()));
500+
return Transaction::Make(std::move(table), Transaction::Kind::kCreate,
501+
/* auto_commit */ false);
449502
}
450503

451504
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
@@ -495,7 +548,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
495548

496549
std::unique_lock lock(mutex_);
497550
if (!root_namespace_->NamespaceExists(identifier.ns)) {
498-
return NoSuchNamespace("table namespace does not exist.");
551+
return NoSuchNamespace("Table namespace does not exist: {}", identifier.ns);
499552
}
500553
if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
501554
return UnknownError("The registry failed.");

src/iceberg/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ iceberg_sources = files(
8888
'sort_order.cc',
8989
'statistics_file.cc',
9090
'table.cc',
91+
'table_identifier.cc',
9192
'table_metadata.cc',
9293
'table_properties.cc',
9394
'table_requirement.cc',
@@ -108,6 +109,7 @@ iceberg_sources = files(
108109
'util/decimal.cc',
109110
'util/gzip_internal.cc',
110111
'util/murmurhash3_internal.cc',
112+
'util/property_util.cc',
111113
'util/snapshot_util.cc',
112114
'util/temporal_util.cc',
113115
'util/timepoint.cc',

src/iceberg/schema.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ Result<std::vector<std::string>> Schema::IdentifierFieldNames() const {
235235
for (auto id : identifier_field_ids_) {
236236
ICEBERG_ASSIGN_OR_RAISE(auto name, FindColumnNameById(id));
237237
if (!name.has_value()) {
238-
return InvalidSchema("Cannot find the field of the specified field id: {}", id);
238+
return InvalidSchema("Cannot find identifier field id: {}", id);
239239
}
240240
names.emplace_back(name.value());
241241
}

src/iceberg/table_identifier.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/table_identifier.h"
21+
22+
#include "iceberg/util/formatter_internal.h"
23+
24+
namespace iceberg {
25+
26+
std::string Namespace::ToString() const { return FormatRange(levels, ".", "", ""); }
27+
28+
std::string TableIdentifier::ToString() const {
29+
if (!ns.levels.empty()) {
30+
return std::format("{}.{}", ns.ToString(), name);
31+
} else {
32+
return name;
33+
}
34+
}
35+
36+
} // namespace iceberg

src/iceberg/table_identifier.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
#include "iceberg/iceberg_export.h"
2929
#include "iceberg/result.h"
30+
#include "iceberg/util/formatter.h" // IWYU pragma: keep
3031

3132
namespace iceberg {
3233

@@ -35,8 +36,12 @@ struct ICEBERG_EXPORT Namespace {
3536
std::vector<std::string> levels;
3637

3738
bool operator==(const Namespace& other) const { return levels == other.levels; }
39+
40+
std::string ToString() const;
3841
};
3942

43+
ICEBERG_EXPORT inline std::string ToString(const Namespace& ns) { return ns.ToString(); }
44+
4045
/// \brief Identifies a table in iceberg catalog.
4146
struct ICEBERG_EXPORT TableIdentifier {
4247
Namespace ns;
@@ -53,6 +58,12 @@ struct ICEBERG_EXPORT TableIdentifier {
5358
}
5459
return {};
5560
}
61+
62+
std::string ToString() const;
5663
};
5764

65+
ICEBERG_EXPORT inline std::string ToString(const TableIdentifier& ident) {
66+
return ident.ToString();
67+
}
68+
5869
} // namespace iceberg

0 commit comments

Comments
 (0)