Skip to content

Commit bfb19f6

Browse files
lishuxu and shuxu.li authored
feat: metadata access support for table (#111)
Co-authored-by: shuxu.li <shuxu.li@clickzetta.com>
1 parent de2cbf1 commit bfb19f6

File tree

12 files changed

+447
-102
lines changed

12 files changed

+447
-102
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ set(ICEBERG_SOURCES
3939
sort_field.cc
4040
sort_order.cc
4141
statistics_file.cc
42+
table.cc
4243
table_metadata.cc
4344
transform.cc
4445
transform_function.cc

src/iceberg/table.cc

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/table.h"
21+
22+
#include "iceberg/partition_spec.h"
23+
#include "iceberg/schema.h"
24+
#include "iceberg/sort_order.h"
25+
#include "iceberg/table_metadata.h"
26+
27+
namespace iceberg {
28+
29+
/// \brief Return the table UUID held by the table metadata.
const std::string& Table::uuid() const {
  const auto& meta = *metadata_;
  return meta.table_uuid;
}
30+
31+
/// \brief Return the schema for this table by delegating to TableMetadata::Schema().
///
/// Whatever Result the metadata lookup produces is forwarded unchanged.
Result<std::shared_ptr<Schema>> Table::schema() const {
  return metadata_->Schema();
}
32+
33+
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>&
34+
Table::schemas() const {
35+
if (!schemas_map_) {
36+
schemas_map_ =
37+
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<Schema>>>();
38+
for (const auto& schema : metadata_->schemas) {
39+
if (schema->schema_id()) {
40+
schemas_map_->emplace(schema->schema_id().value(), schema);
41+
}
42+
}
43+
}
44+
return schemas_map_;
45+
}
46+
47+
/// \brief Return the partition spec for this table by delegating to
/// TableMetadata::PartitionSpec().
///
/// Whatever Result the metadata lookup produces is forwarded unchanged.
Result<std::shared_ptr<PartitionSpec>> Table::spec() const { return metadata_->PartitionSpec(); }
50+
51+
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
52+
Table::specs() const {
53+
if (!partition_spec_map_) {
54+
partition_spec_map_ =
55+
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>();
56+
for (const auto& spec : metadata_->partition_specs) {
57+
partition_spec_map_->emplace(spec->spec_id(), spec);
58+
}
59+
}
60+
return partition_spec_map_;
61+
}
62+
63+
/// \brief Return the sort order for this table by delegating to
/// TableMetadata::SortOrder().
///
/// Whatever Result the metadata lookup produces is forwarded unchanged.
Result<std::shared_ptr<SortOrder>> Table::sort_order() const { return metadata_->SortOrder(); }
66+
67+
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
68+
Table::sort_orders() const {
69+
if (!sort_orders_map_) {
70+
sort_orders_map_ =
71+
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>();
72+
for (const auto& order : metadata_->sort_orders) {
73+
sort_orders_map_->emplace(order->order_id(), order);
74+
}
75+
}
76+
return sort_orders_map_;
77+
}
78+
79+
const std::unordered_map<std::string, std::string>& Table::properties() const {
80+
return metadata_->properties;
81+
}
82+
83+
/// \brief Return the table location stored in the table metadata.
const std::string& Table::location() const {
  const auto& meta = *metadata_;
  return meta.location;
}
84+
85+
/// \brief Return the table's current snapshot by delegating to
/// TableMetadata::Snapshot().
///
/// Whatever Result the metadata lookup produces is forwarded unchanged.
Result<std::shared_ptr<Snapshot>> Table::current_snapshot() const { return metadata_->Snapshot(); }
88+
89+
/// \brief Find the snapshot of this table with the given ID.
///
/// \param snapshot_id The ID of the snapshot to look up.
/// \return The first entry of the metadata's snapshot list whose snapshot_id
/// equals \p snapshot_id, or a NotFound error when no entry matches.
Result<std::shared_ptr<Snapshot>> Table::SnapshotById(int64_t snapshot_id) const {
  // The predicate only needs the ID: capture it by value and do not capture
  // `this`, which the lambda never used.
  const auto has_requested_id = [snapshot_id](const auto& snapshot) {
    return snapshot->snapshot_id == snapshot_id;
  };
  auto iter = std::ranges::find_if(metadata_->snapshots, has_requested_id);
  if (iter == metadata_->snapshots.end()) {
    return NotFound("Snapshot with ID {} is not found", snapshot_id);
  }
  return *iter;
}
99+
100+
const std::vector<std::shared_ptr<Snapshot>>& Table::snapshots() const {
101+
return metadata_->snapshots;
102+
}
103+
104+
const std::vector<SnapshotLogEntry>& Table::history() const {
105+
return metadata_->snapshot_log;
106+
}
107+
108+
/// \brief Return the FileIO this table was constructed with.
const std::shared_ptr<FileIO>& Table::io() const {
  return io_;
}
109+
110+
} // namespace iceberg

src/iceberg/table.h

Lines changed: 63 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919

2020
#pragma once
2121

22-
#include <memory>
2322
#include <string>
2423
#include <unordered_map>
2524
#include <vector>
2625

2726
#include "iceberg/iceberg_export.h"
28-
#include "iceberg/result.h"
27+
#include "iceberg/snapshot.h"
28+
#include "iceberg/table_identifier.h"
2929
#include "iceberg/type_fwd.h"
3030

3131
namespace iceberg {
@@ -35,77 +35,92 @@ class ICEBERG_EXPORT Table {
3535
public:
3636
virtual ~Table() = default;
3737

38-
/// \brief Return the full name for this table
39-
virtual const std::string& name() const = 0;
38+
/// \brief Construct a table.
39+
/// \param[in] identifier The identifier of the table.
40+
/// \param[in] metadata The metadata for the table.
41+
/// \param[in] metadata_location The location of the table metadata file.
42+
/// \param[in] io The FileIO to read and write table data and metadata files.
43+
/// \param[in] catalog The catalog that this table belongs to. If null, the table will
44+
/// be read-only.
45+
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
      std::string metadata_location, std::shared_ptr<FileIO> io,
      std::shared_ptr<Catalog> catalog)
    // Every argument is taken by value, so each one is moved into its member.
    : identifier_(std::move(identifier)),
      metadata_(std::move(metadata)),
      metadata_location_(std::move(metadata_location)),
      io_(std::move(io)),
      catalog_(std::move(catalog)) {}  // no trailing ';' after the body
53+
54+
/// \brief Return the identifier of this table
55+
const TableIdentifier& name() const {
  // Hands out the identifier member by reference; no copy is made.
  return identifier_;
}
4056

4157
/// \brief Returns the UUID of the table
42-
virtual const std::string& uuid() const = 0;
58+
const std::string& uuid() const;
4359

44-
/// \brief Refresh the current table metadata
45-
virtual Status Refresh() = 0;
46-
47-
/// \brief Return the schema for this table
48-
virtual const std::shared_ptr<Schema>& schema() const = 0;
60+
/// \brief Return the schema for this table, return NotFoundError if not found
61+
Result<std::shared_ptr<Schema>> schema() const;
4962

5063
/// \brief Return a map of schema for this table
51-
virtual const std::unordered_map<int32_t, std::shared_ptr<Schema>>& schemas() const = 0;
64+
/// \note This method is **not** thread-safe in the current implementation.
65+
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>& schemas()
66+
const;
5267

53-
/// \brief Return the partition spec for this table
54-
virtual const std::shared_ptr<PartitionSpec>& spec() const = 0;
68+
/// \brief Return the partition spec for this table, return NotFoundError if not found
69+
Result<std::shared_ptr<PartitionSpec>> spec() const;
5570

5671
/// \brief Return a map of partition specs for this table
57-
virtual const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& specs()
58-
const = 0;
72+
/// \note This method is **not** thread-safe in the current implementation.
73+
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
74+
specs() const;
5975

60-
/// \brief Return the sort order for this table
61-
virtual const std::shared_ptr<SortOrder>& sort_order() const = 0;
76+
/// \brief Return the sort order for this table, return NotFoundError if not found
77+
Result<std::shared_ptr<SortOrder>> sort_order() const;
6278

6379
/// \brief Return a map of sort order IDs to sort orders for this table
64-
virtual const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>& sort_orders()
65-
const = 0;
80+
/// \note This method is **not** thread-safe in the current implementation.
81+
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
82+
sort_orders() const;
6683

6784
/// \brief Return a map of string properties for this table
68-
virtual const std::unordered_map<std::string, std::string>& properties() const = 0;
85+
const std::unordered_map<std::string, std::string>& properties() const;
6986

7087
/// \brief Return the table's base location
71-
virtual const std::string& location() const = 0;
88+
const std::string& location() const;
7289

73-
/// \brief Return the table's current snapshot
74-
virtual const std::shared_ptr<Snapshot>& current_snapshot() const = 0;
90+
/// \brief Return the table's current snapshot, return NotFoundError if not found
91+
Result<std::shared_ptr<Snapshot>> current_snapshot() const;
7592

76-
/// \brief Get the snapshot of this table with the given id, or null if there is no
77-
/// matching snapshot
93+
/// \brief Get the snapshot of this table with the given id
7894
///
7995
/// \param snapshot_id the ID of the snapshot to get
80-
/// \return the Snapshot with the given id
81-
virtual Result<std::shared_ptr<Snapshot>> snapshot(int64_t snapshot_id) const = 0;
96+
/// \return the Snapshot with the given id, return NotFoundError if not found
97+
Result<std::shared_ptr<Snapshot>> SnapshotById(int64_t snapshot_id) const;
8298

8399
/// \brief Get the snapshots of this table
84-
virtual const std::vector<std::shared_ptr<Snapshot>>& snapshots() const = 0;
100+
const std::vector<std::shared_ptr<Snapshot>>& snapshots() const;
85101

86102
/// \brief Get the snapshot history of this table
87103
///
88104
/// \return a vector of history entries
89-
virtual const std::vector<std::shared_ptr<HistoryEntry>>& history() const = 0;
90-
91-
/// \brief Create a new table scan for this table
92-
///
93-
/// Once a table scan is created, it can be refined to project columns and filter data.
94-
virtual std::unique_ptr<TableScan> NewScan() const = 0;
95-
96-
/// \brief Create a new append API to add files to this table and commit
97-
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;
98-
99-
/// \brief Create a new transaction API to commit multiple table operations at once
100-
virtual std::unique_ptr<Transaction> NewTransaction() = 0;
101-
102-
/// TODO(wgtmac): design of FileIO is not finalized yet. We intend to use an
103-
/// IO-less design in the core library.
104-
// /// \brief Returns a FileIO to read and write table data and metadata files
105-
// virtual std::shared_ptr<FileIO> io() const = 0;
106-
107-
/// \brief Returns a LocationProvider to provide locations for new data files
108-
virtual std::unique_ptr<LocationProvider> location_provider() const = 0;
105+
const std::vector<SnapshotLogEntry>& history() const;
106+
107+
/// \brief Returns a FileIO to read and write table data and metadata files
108+
const std::shared_ptr<FileIO>& io() const;
109+
110+
private:
111+
const TableIdentifier identifier_;
112+
std::shared_ptr<TableMetadata> metadata_;
113+
const std::string metadata_location_;
114+
std::shared_ptr<FileIO> io_;
115+
std::shared_ptr<Catalog> catalog_;
116+
117+
// Cache lazy-initialized maps.
118+
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>
119+
schemas_map_;
120+
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>
121+
partition_spec_map_;
122+
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>
123+
sort_orders_map_;
109124
};
110125

111126
} // namespace iceberg

src/iceberg/table_metadata.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@ Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
7676
return *iter;
7777
}
7878

79+
/// \brief Look up the snapshot whose ID equals current_snapshot_id.
///
/// \return The first entry of `snapshots` with a matching snapshot_id, or a
/// NotFound error when no entry matches.
Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
  // `this` is captured because the predicate compares against a member.
  const auto is_current = [this](const auto& candidate) {
    return candidate->snapshot_id == current_snapshot_id;
  };
  const auto match = std::ranges::find_if(snapshots, is_current);
  if (match != snapshots.end()) {
    return *match;
  }
  return NotFound("Current snapshot with ID {} is not found", current_snapshot_id);
}
88+
7989
namespace {
8090

8191
template <typename T>

src/iceberg/table_metadata.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ struct ICEBERG_EXPORT TableMetadata {
127127
Result<std::shared_ptr<PartitionSpec>> PartitionSpec() const;
128128
/// \brief Get the current sort order, return NotFoundError if not found
129129
Result<std::shared_ptr<SortOrder>> SortOrder() const;
130+
/// \brief Get the current snapshot, return NotFoundError if not found
131+
Result<std::shared_ptr<Snapshot>> Snapshot() const;
130132

131133
friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs);
132134
};

src/iceberg/type_fwd.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ class TransformFunction;
9999
struct PartitionStatisticsFile;
100100
struct Snapshot;
101101
struct SnapshotRef;
102+
103+
struct MetadataLogEntry;
104+
struct SnapshotLogEntry;
105+
102106
struct StatisticsFile;
103107
struct TableMetadata;
104108

@@ -113,7 +117,6 @@ enum class TransformType;
113117
/// TODO: Forward declarations below are not added yet.
114118
/// ----------------------------------------------------------------------------
115119

116-
class HistoryEntry;
117120
class StructLike;
118121

119122
class MetadataUpdate;

test/CMakeLists.txt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ target_sources(catalog_test PRIVATE in_memory_catalog_test.cc)
4949
target_link_libraries(catalog_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
5050
add_test(NAME catalog_test COMMAND catalog_test)
5151

52+
add_executable(table_test)
53+
target_include_directories(table_test PRIVATE "${CMAKE_BINARY_DIR}")
54+
target_sources(table_test PRIVATE test_common.cc json_internal_test.cc table_test.cc
55+
schema_json_test.cc)
56+
target_link_libraries(table_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
57+
add_test(NAME table_test COMMAND table_test)
58+
5259
add_executable(expression_test)
5360
target_sources(expression_test PRIVATE expression_test.cc literal_test.cc)
5461
target_link_libraries(expression_test PRIVATE iceberg_static GTest::gtest_main
@@ -57,8 +64,8 @@ add_test(NAME expression_test COMMAND expression_test)
5764

5865
add_executable(json_serde_test)
5966
target_include_directories(json_serde_test PRIVATE "${CMAKE_BINARY_DIR}")
60-
target_sources(json_serde_test PRIVATE json_internal_test.cc metadata_serde_test.cc
61-
schema_json_test.cc)
67+
target_sources(json_serde_test PRIVATE test_common.cc json_internal_test.cc
68+
metadata_serde_test.cc schema_json_test.cc)
6269
target_link_libraries(json_serde_test PRIVATE iceberg_static GTest::gtest_main
6370
GTest::gmock)
6471
add_test(NAME json_serde_test COMMAND json_serde_test)

test/metadata_io_test.cc

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -50,27 +50,25 @@ class MetadataIOTest : public TempFileTestBase {
5050
/*optional=*/false);
5151
auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);
5252

53-
TableMetadata metadata{
54-
.format_version = 1,
55-
.table_uuid = "1234567890",
56-
.location = "s3://bucket/path",
57-
.last_sequence_number = 0,
58-
.schemas = {schema},
59-
.current_schema_id = 1,
60-
.default_spec_id = 0,
61-
.last_partition_id = 0,
62-
.properties = {{"key", "value"}},
63-
.current_snapshot_id = 3051729675574597004,
64-
.snapshots = {std::make_shared<Snapshot>(Snapshot{
65-
.snapshot_id = 3051729675574597004,
66-
.sequence_number = 0,
67-
.timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
68-
.manifest_list = "s3://a/b/1.avro",
69-
.summary = {{"operation", "append"}},
70-
})},
71-
.default_sort_order_id = 0,
72-
.next_row_id = 0};
73-
return metadata;
53+
return TableMetadata{.format_version = 1,
54+
.table_uuid = "1234567890",
55+
.location = "s3://bucket/path",
56+
.last_sequence_number = 0,
57+
.schemas = {schema},
58+
.current_schema_id = 1,
59+
.default_spec_id = 0,
60+
.last_partition_id = 0,
61+
.properties = {{"key", "value"}},
62+
.current_snapshot_id = 3051729675574597004,
63+
.snapshots = {std::make_shared<Snapshot>(Snapshot{
64+
.snapshot_id = 3051729675574597004,
65+
.sequence_number = 0,
66+
.timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
67+
.manifest_list = "s3://a/b/1.avro",
68+
.summary = {{"operation", "append"}},
69+
})},
70+
.default_sort_order_id = 0,
71+
.next_row_id = 0};
7472
}
7573

7674
std::shared_ptr<iceberg::FileIO> io_;

0 commit comments

Comments
 (0)