Skip to content

Commit c16b422

Browse files
committed
feat(rest): implement create table
1 parent da91626 commit c16b422

File tree

12 files changed

+314
-39
lines changed

12 files changed

+314
-39
lines changed

src/iceberg/catalog.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,13 @@ class ICEBERG_EXPORT Catalog {
107107
/// \param identifier a table identifier
108108
/// \param schema a schema
109109
/// \param spec a partition spec
110+
/// \param order a sort order
110111
/// \param location a location for the table; leave empty if unspecified
111112
/// \param properties a string map of table properties
112113
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
113114
virtual Result<std::shared_ptr<Table>> CreateTable(
114-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
115+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
116+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
115117
const std::string& location,
116118
const std::unordered_map<std::string, std::string>& properties) = 0;
117119

@@ -131,12 +133,14 @@ class ICEBERG_EXPORT Catalog {
131133
/// \param identifier a table identifier
132134
/// \param schema a schema
133135
/// \param spec a partition spec
136+
/// \param order a sort order
134137
/// \param location a location for the table; leave empty if unspecified
135138
/// \param properties a string map of table properties
136139
/// \return a Transaction to create the table or ErrorKind::kAlreadyExists if the
137140
/// table already exists
138141
virtual Result<std::shared_ptr<Transaction>> StageCreateTable(
139-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
142+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
143+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
140144
const std::string& location,
141145
const std::unordered_map<std::string, std::string>& properties) = 0;
142146

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,8 @@ Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
400400
}
401401

402402
Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
403-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
403+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
404+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
404405
const std::string& location,
405406
const std::unordered_map<std::string, std::string>& properties) {
406407
std::unique_lock lock(mutex_);
@@ -439,7 +440,8 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
439440
}
440441

441442
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
442-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
443+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
444+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
443445
const std::string& location,
444446
const std::unordered_map<std::string, std::string>& properties) {
445447
std::unique_lock lock(mutex_);

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ class ICEBERG_EXPORT InMemoryCatalog
7171
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
7272

7373
Result<std::shared_ptr<Table>> CreateTable(
74-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
74+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
75+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
7576
const std::string& location,
7677
const std::unordered_map<std::string, std::string>& properties) override;
7778

@@ -81,7 +82,8 @@ class ICEBERG_EXPORT InMemoryCatalog
8182
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
8283

8384
Result<std::shared_ptr<Transaction>> StageCreateTable(
84-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
85+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
86+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
8587
const std::string& location,
8688
const std::unordered_map<std::string, std::string>& properties) override;
8789

src/iceberg/catalog/rest/json_internal.cc

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "iceberg/catalog/rest/json_internal.h"
2121

22+
#include <memory>
2223
#include <string>
2324
#include <utility>
2425
#include <vector>
@@ -27,6 +28,8 @@
2728

2829
#include "iceberg/catalog/rest/types.h"
2930
#include "iceberg/json_internal.h"
31+
#include "iceberg/partition_spec.h"
32+
#include "iceberg/sort_order.h"
3033
#include "iceberg/table_identifier.h"
3134
#include "iceberg/util/json_util_internal.h"
3235
#include "iceberg/util/macros.h"
@@ -336,6 +339,57 @@ Result<ListTablesResponse> ListTablesResponseFromJson(const nlohmann::json& json
336339
return response;
337340
}
338341

342+
nlohmann::json ToJson(const CreateTableRequest& request) {
343+
nlohmann::json json;
344+
json[kName] = request.name;
345+
SetOptionalStringField(json, kLocation, request.location);
346+
if (request.schema) {
347+
json[kSchema] = ToJson(*request.schema);
348+
}
349+
if (request.partition_spec) {
350+
json[kPartitionSpec] = ToJson(*request.partition_spec);
351+
}
352+
if (request.write_order) {
353+
json[kWriteOrder] = ToJson(*request.write_order);
354+
}
355+
if (request.stage_create) {
356+
json[kStageCreate] = request.stage_create;
357+
}
358+
SetContainerField(json, kProperties, request.properties);
359+
return json;
360+
}
361+
362+
Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json) {
363+
CreateTableRequest request;
364+
ICEBERG_ASSIGN_OR_RAISE(request.name, GetJsonValue<std::string>(json, kName));
365+
ICEBERG_ASSIGN_OR_RAISE(request.location,
366+
GetJsonValueOrDefault<std::string>(json, kLocation));
367+
ICEBERG_ASSIGN_OR_RAISE(auto schema_json, GetJsonValue<nlohmann::json>(json, kSchema));
368+
ICEBERG_ASSIGN_OR_RAISE(request.schema, SchemaFromJson(schema_json));
369+
370+
if (json.contains(kPartitionSpec)) {
371+
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
372+
GetJsonValue<nlohmann::json>(json, kPartitionSpec));
373+
ICEBERG_ASSIGN_OR_RAISE(request.partition_spec,
374+
PartitionSpecFromJson(request.schema, partition_spec,
375+
PartitionSpec::kInitialSpecId));
376+
}
377+
if (json.contains(kWriteOrder)) {
378+
ICEBERG_ASSIGN_OR_RAISE(auto sort_order_json,
379+
GetJsonValue<nlohmann::json>(json, kWriteOrder));
380+
ICEBERG_ASSIGN_OR_RAISE(request.write_order,
381+
SortOrderFromJson(sort_order_json, request.schema));
382+
}
383+
384+
ICEBERG_ASSIGN_OR_RAISE(request.stage_create,
385+
GetJsonValueOrDefault<bool>(json, kStageCreate, false));
386+
ICEBERG_ASSIGN_OR_RAISE(
387+
request.properties,
388+
GetJsonValueOrDefault<decltype(request.properties)>(json, kProperties));
389+
ICEBERG_RETURN_UNEXPECTED(request.Validate());
390+
return request;
391+
}
392+
339393
#define ICEBERG_DEFINE_FROM_JSON(Model) \
340394
template <> \
341395
Result<Model> FromJson<Model>(const nlohmann::json& json) { \
@@ -354,5 +408,6 @@ ICEBERG_DEFINE_FROM_JSON(ListTablesResponse)
354408
ICEBERG_DEFINE_FROM_JSON(LoadTableResult)
355409
ICEBERG_DEFINE_FROM_JSON(RegisterTableRequest)
356410
ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
411+
ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
357412

358413
} // namespace iceberg::rest

src/iceberg/catalog/rest/json_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ ICEBERG_DECLARE_JSON_SERDE(ListTablesResponse)
5555
ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
5656
ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
5757
ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
58+
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
5859

5960
#undef ICEBERG_DECLARE_JSON_SERDE
6061

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "iceberg/partition_spec.h"
4141
#include "iceberg/result.h"
4242
#include "iceberg/schema.h"
43+
#include "iceberg/sort_order.h"
4344
#include "iceberg/table.h"
4445
#include "iceberg/util/macros.h"
4546

@@ -77,8 +78,8 @@ Result<CatalogConfig> FetchServerConfig(const ResourcePaths& paths,
7778

7879
RestCatalog::~RestCatalog() = default;
7980

80-
Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
81-
const RestCatalogProperties& config) {
81+
Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
82+
const RestCatalogProperties& config, std::shared_ptr<FileIO> file_io) {
8283
ICEBERG_ASSIGN_OR_RAISE(auto uri, config.Uri());
8384
ICEBERG_ASSIGN_OR_RAISE(
8485
auto paths, ResourcePaths::Make(std::string(TrimTrailingSlash(uri)),
@@ -103,14 +104,17 @@ Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
103104
ICEBERG_ASSIGN_OR_RAISE(auto final_uri, final_config->Uri());
104105
ICEBERG_RETURN_UNEXPECTED(paths->SetBaseUri(std::string(TrimTrailingSlash(final_uri))));
105106

106-
return std::unique_ptr<RestCatalog>(
107-
new RestCatalog(std::move(final_config), std::move(paths), std::move(endpoints)));
107+
return std::shared_ptr<RestCatalog>(
108+
new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths),
109+
std::move(endpoints)));
108110
}
109111

110112
RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
113+
std::shared_ptr<FileIO> file_io,
111114
std::unique_ptr<ResourcePaths> paths,
112115
std::unordered_set<Endpoint> endpoints)
113116
: config_(std::move(config)),
117+
file_io_(std::move(file_io)),
114118
client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
115119
paths_(std::move(paths)),
116120
name_(config_->Get(RestCatalogProperties::kName)),
@@ -241,11 +245,33 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
241245
}
242246

243247
Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
244-
[[maybe_unused]] const TableIdentifier& identifier,
245-
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec,
246-
[[maybe_unused]] const std::string& location,
247-
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
248-
return NotImplemented("Not implemented");
248+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
249+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
250+
const std::string& location,
251+
const std::unordered_map<std::string, std::string>& properties) {
252+
ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::CreateTable()));
253+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
254+
255+
CreateTableRequest request{
256+
.name = identifier.name,
257+
.location = location,
258+
.schema = schema,
259+
.partition_spec = spec,
260+
.write_order = order,
261+
.stage_create = false,
262+
.properties = properties,
263+
};
264+
265+
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
266+
ICEBERG_ASSIGN_OR_RAISE(
267+
const auto response,
268+
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
269+
270+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
271+
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
272+
return Table::Make(identifier, load_result.metadata,
273+
std::move(load_result.metadata_location), file_io_,
274+
shared_from_this());
249275
}
250276

251277
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -257,7 +283,9 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
257283

258284
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
259285
[[maybe_unused]] const TableIdentifier& identifier,
260-
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec,
286+
[[maybe_unused]] const std::shared_ptr<Schema>& schema,
287+
[[maybe_unused]] const std::shared_ptr<PartitionSpec>& spec,
288+
[[maybe_unused]] const std::shared_ptr<SortOrder>& order,
261289
[[maybe_unused]] const std::string& location,
262290
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
263291
return NotImplemented("Not implemented");

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
#pragma once
2121

2222
#include <memory>
23-
#include <set>
2423
#include <string>
24+
#include <unordered_set>
2525

2626
#include "iceberg/catalog.h"
2727
#include "iceberg/catalog/rest/endpoint.h"
@@ -35,7 +35,8 @@
3535
namespace iceberg::rest {
3636

3737
/// \brief Rest catalog implementation.
38-
class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
38+
class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
39+
public std::enable_shared_from_this<RestCatalog> {
3940
public:
4041
~RestCatalog() override;
4142

@@ -47,8 +48,10 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
4748
/// \brief Create a RestCatalog instance
4849
///
4950
/// \param config the configuration for the RestCatalog
50-
/// \return a unique_ptr to RestCatalog instance
51-
static Result<std::unique_ptr<RestCatalog>> Make(const RestCatalogProperties& config);
51+
/// \param file_io the FileIO instance to use for table operations
52+
/// \return a shared_ptr to RestCatalog instance
53+
static Result<std::shared_ptr<RestCatalog>> Make(const RestCatalogProperties& config,
54+
std::shared_ptr<FileIO> file_io);
5255

5356
std::string_view name() const override;
5457

@@ -72,7 +75,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
7275
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
7376

7477
Result<std::shared_ptr<Table>> CreateTable(
75-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
78+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
79+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
7680
const std::string& location,
7781
const std::unordered_map<std::string, std::string>& properties) override;
7882

@@ -82,7 +86,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
8286
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
8387

8488
Result<std::shared_ptr<Transaction>> StageCreateTable(
85-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
89+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
90+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
8691
const std::string& location,
8792
const std::unordered_map<std::string, std::string>& properties) override;
8893

@@ -100,10 +105,11 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
100105

101106
private:
102107
RestCatalog(std::unique_ptr<RestCatalogProperties> config,
103-
std::unique_ptr<ResourcePaths> paths,
108+
std::shared_ptr<FileIO> file_io, std::unique_ptr<ResourcePaths> paths,
104109
std::unordered_set<Endpoint> endpoints);
105110

106111
std::unique_ptr<RestCatalogProperties> config_;
112+
std::shared_ptr<FileIO> file_io_;
107113
std::unique_ptr<HttpClient> client_;
108114
std::unique_ptr<ResourcePaths> paths_;
109115
std::string name_;

src/iceberg/catalog/rest/types.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "iceberg/catalog/rest/endpoint.h"
2828
#include "iceberg/catalog/rest/iceberg_rest_export.h"
2929
#include "iceberg/result.h"
30+
#include "iceberg/schema.h"
3031
#include "iceberg/table_identifier.h"
3132
#include "iceberg/type_fwd.h"
3233
#include "iceberg/util/macros.h"
@@ -138,6 +139,28 @@ struct ICEBERG_REST_EXPORT RenameTableRequest {
138139
bool operator==(const RenameTableRequest&) const = default;
139140
};
140141

142+
/// \brief Request to create a table.
143+
struct ICEBERG_REST_EXPORT CreateTableRequest {
144+
std::string name; // required
145+
std::string location;
146+
std::shared_ptr<Schema> schema; // required
147+
std::shared_ptr<PartitionSpec> partition_spec;
148+
std::shared_ptr<SortOrder> write_order;
149+
bool stage_create = false;
150+
std::unordered_map<std::string, std::string> properties;
151+
152+
/// \brief Validates the CreateTableRequest.
153+
Status Validate() const {
154+
if (name.empty()) {
155+
return Invalid("Missing table name");
156+
}
157+
if (!schema) {
158+
return Invalid("Missing schema");
159+
}
160+
return {};
161+
}
162+
};
163+
141164
/// \brief An opaque token that allows clients to make use of pagination for list APIs.
142165
using PageToken = std::string;
143166

src/iceberg/json_internal.cc

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,18 +220,6 @@ Result<std::unique_ptr<SortOrder>> SortOrderFromJson(
220220
return SortOrder::Make(*current_schema, order_id, std::move(sort_fields));
221221
}
222222

223-
Result<std::unique_ptr<SortOrder>> SortOrderFromJson(const nlohmann::json& json) {
224-
ICEBERG_ASSIGN_OR_RAISE(auto order_id, GetJsonValue<int32_t>(json, kOrderId));
225-
ICEBERG_ASSIGN_OR_RAISE(auto fields, GetJsonValue<nlohmann::json>(json, kFields));
226-
227-
std::vector<SortField> sort_fields;
228-
for (const auto& field_json : fields) {
229-
ICEBERG_ASSIGN_OR_RAISE(auto sort_field, SortFieldFromJson(field_json));
230-
sort_fields.push_back(std::move(*sort_field));
231-
}
232-
return SortOrder::Make(order_id, std::move(sort_fields));
233-
}
234-
235223
nlohmann::json ToJson(const SchemaField& field) {
236224
nlohmann::json json;
237225
json[kId] = field.field_id();

src/iceberg/test/mock_catalog.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ class MockCatalog : public Catalog {
5656
(const, override));
5757

5858
MOCK_METHOD((Result<std::shared_ptr<Table>>), CreateTable,
59-
(const TableIdentifier&, const Schema&, const PartitionSpec&,
59+
(const TableIdentifier&, const std::shared_ptr<Schema>&,
60+
const std::shared_ptr<PartitionSpec>&, const std::shared_ptr<SortOrder>&,
6061
const std::string&, (const std::unordered_map<std::string, std::string>&)),
6162
(override));
6263

@@ -67,7 +68,8 @@ class MockCatalog : public Catalog {
6768
(override));
6869

6970
MOCK_METHOD((Result<std::shared_ptr<Transaction>>), StageCreateTable,
70-
(const TableIdentifier&, const Schema&, const PartitionSpec&,
71+
(const TableIdentifier&, const std::shared_ptr<Schema>&,
72+
const std::shared_ptr<PartitionSpec>&, const std::shared_ptr<SortOrder>&,
7173
const std::string&, (const std::unordered_map<std::string, std::string>&)),
7274
(override));
7375

0 commit comments

Comments
 (0)