Skip to content

Commit bbdb227

Browse files
authored
feat: add rolling manifest writer (#443)
1 parent fea93b1 commit bbdb227

13 files changed

+593
-13
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ set(ICEBERG_SOURCES
4444
manifest/manifest_list.cc
4545
manifest/manifest_reader.cc
4646
manifest/manifest_writer.cc
47+
manifest/rolling_manifest_writer.cc
4748
manifest/v1_metadata.cc
4849
manifest/v2_metadata.cc
4950
manifest/v3_metadata.cc

src/iceberg/file_writer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class ICEBERG_EXPORT Writer {
103103
virtual Result<Metrics> metrics() = 0;
104104

105105
/// \brief Get the file length.
106-
/// Only valid after the file is closed.
106+
/// This can be called while the writer is still open or after the file is closed.
107107
virtual Result<int64_t> length() = 0;
108108

109109
/// \brief Returns a list of recommended split locations, if applicable, empty

src/iceberg/manifest/manifest_entry.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ enum class ManifestStatus {
4646

4747
/// \brief Get the relative manifest status type from int
4848
ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
49-
int status) noexcept {
49+
int32_t status) noexcept {
5050
switch (status) {
5151
case 0:
5252
return ManifestStatus::kExisting;
@@ -387,7 +387,7 @@ ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type) noexc
387387

388388
/// \brief Get the relative data file content type from int
389389
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
390-
int content) noexcept {
390+
int32_t content) noexcept {
391391
switch (content) {
392392
case 0:
393393
return DataFile::Content::kData;

src/iceberg/manifest/manifest_writer.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ ManifestContent ManifestWriter::content() const { return adapter_->content(); }
217217

218218
Result<Metrics> ManifestWriter::metrics() const { return writer_->metrics(); }
219219

220+
Result<int64_t> ManifestWriter::length() const { return writer_->length(); }
221+
220222
Result<ManifestFile> ManifestWriter::ToManifestFile() const {
221223
if (!closed_) [[unlikely]] {
222224
return Invalid("Cannot get ManifestFile before closing the writer.");

src/iceberg/manifest/manifest_writer.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class ICEBERG_EXPORT ManifestWriter {
4141

4242
/// \brief Write the entry that all its fields are populated correctly.
4343
/// \param entry Manifest entry to write.
44-
/// \return Status::OK() if entry was written successfully
44+
/// \return Status indicating success or failure
4545
/// \note All other write entry variants delegate to this method after populating
4646
/// the necessary fields.
4747
Status WriteEntry(const ManifestEntry& entry);
@@ -50,7 +50,7 @@ class ICEBERG_EXPORT ManifestWriter {
5050
///
5151
/// \param file an added data file
5252
/// \param data_sequence_number a data sequence number for the file
53-
/// \return Status::OK() if the entry was written successfully
53+
/// \return Status indicating success or failure
5454
/// \note The entry's snapshot ID will be this manifest's snapshot ID. The entry's data
5555
/// sequence number will be the provided data sequence number. The entry's file sequence
5656
/// number will be assigned at commit.
@@ -67,7 +67,7 @@ class ICEBERG_EXPORT ManifestWriter {
6767
/// file was added)
6868
/// \param file_sequence_number a file sequence number (assigned when the file was
6969
/// added)
70-
/// \return Status::OK() if the entry was written successfully
70+
/// \return Status indicating success or failure
7171
/// \note The original data and file sequence numbers, snapshot ID, which were assigned
7272
/// at commit, must be preserved when adding an existing entry.
7373
Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
@@ -83,7 +83,7 @@ class ICEBERG_EXPORT ManifestWriter {
8383
/// file was added)
8484
/// \param file_sequence_number a file sequence number (assigned when the file was
8585
/// added)
86-
/// \return Status::OK() if the entry was written successfully
86+
/// \return Status indicating success or failure
8787
/// \note The entry's snapshot ID will be this manifest's snapshot ID. However, the
8888
/// original data and file sequence numbers of the file must be preserved when the file
8989
/// is marked as deleted.
@@ -95,7 +95,7 @@ class ICEBERG_EXPORT ManifestWriter {
9595

9696
/// \brief Write manifest entries to file.
9797
/// \param entries Already populated manifest entries to write.
98-
/// \return Status::OK() if all entries were written successfully
98+
/// \return Status indicating success or failure
9999
Status AddAll(const std::vector<ManifestEntry>& entries);
100100

101101
/// \brief Close writer and flush to storage.
@@ -108,6 +108,10 @@ class ICEBERG_EXPORT ManifestWriter {
108108
/// \note Only valid after the file is closed.
109109
Result<Metrics> metrics() const;
110110

111+
/// \brief Get the current length of the manifest file in bytes.
112+
/// \return The current length of the file, or an error if the operation fails.
113+
Result<int64_t> length() const;
114+
111115
/// \brief Get the ManifestFile object.
112116
/// \note Only valid after the file is closed.
113117
Result<ManifestFile> ToManifestFile() const;
@@ -187,12 +191,12 @@ class ICEBERG_EXPORT ManifestListWriter {
187191

188192
/// \brief Write manifest file to manifest list file.
189193
/// \param file Manifest file to write.
190-
/// \return Status::OK() if file was written successfully
194+
/// \return Status indicating success or failure
191195
Status Add(const ManifestFile& file);
192196

193197
/// \brief Write manifest file list to manifest list file.
194198
/// \param files Manifest file list to write.
195-
/// \return Status::OK() if all files were written successfully
199+
/// \return Status indicating success or failure
196200
Status AddAll(const std::vector<ManifestFile>& files);
197201

198202
/// \brief Close writer and flush to storage.

src/iceberg/manifest/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ install_headers(
2121
'manifest_list.h',
2222
'manifest_reader.h',
2323
'manifest_writer.h',
24+
'rolling_manifest_writer.h',
2425
],
2526
subdir: 'iceberg/manifest',
2627
)
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest/rolling_manifest_writer.h"
21+
22+
#include "iceberg/manifest/manifest_entry.h"
23+
#include "iceberg/result.h"
24+
#include "iceberg/util/macros.h"
25+
26+
namespace iceberg {
27+
28+
RollingManifestWriter::RollingManifestWriter(
29+
ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes)
30+
: manifest_writer_factory_(std::move(manifest_writer_factory)),
31+
target_file_size_in_bytes_(target_file_size_in_bytes) {}
32+
33+
RollingManifestWriter::~RollingManifestWriter() {
34+
// Ensure we close the current writer if not already closed
35+
std::ignore = Close();
36+
}
37+
38+
Status RollingManifestWriter::WriteAddedEntry(
39+
std::shared_ptr<DataFile> file, std::optional<int64_t> data_sequence_number) {
40+
ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter());
41+
ICEBERG_RETURN_UNEXPECTED(
42+
writer->WriteAddedEntry(std::move(file), data_sequence_number));
43+
current_file_rows_++;
44+
return {};
45+
}
46+
47+
Status RollingManifestWriter::WriteExistingEntry(
48+
std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
49+
int64_t data_sequence_number, std::optional<int64_t> file_sequence_number) {
50+
ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter());
51+
ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(
52+
std::move(file), file_snapshot_id, data_sequence_number, file_sequence_number));
53+
current_file_rows_++;
54+
return {};
55+
}
56+
57+
Status RollingManifestWriter::WriteDeletedEntry(
58+
std::shared_ptr<DataFile> file, int64_t data_sequence_number,
59+
std::optional<int64_t> file_sequence_number) {
60+
ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter());
61+
ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(
62+
std::move(file), data_sequence_number, file_sequence_number));
63+
current_file_rows_++;
64+
return {};
65+
}
66+
67+
Status RollingManifestWriter::Close() {
68+
if (!closed_) {
69+
ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
70+
closed_ = true;
71+
}
72+
return {};
73+
}
74+
75+
Result<std::vector<ManifestFile>> RollingManifestWriter::ToManifestFiles() const {
76+
if (!closed_) {
77+
return Invalid("Cannot get ManifestFile list from unclosed writer");
78+
}
79+
return manifest_files_;
80+
}
81+
82+
Result<ManifestWriter*> RollingManifestWriter::CurrentWriter() {
83+
if (current_writer_ == nullptr) {
84+
ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_());
85+
} else if (ShouldRollToNewFile()) {
86+
ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
87+
ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_());
88+
}
89+
90+
return current_writer_.get();
91+
}
92+
93+
bool RollingManifestWriter::ShouldRollToNewFile() const {
94+
if (current_writer_ == nullptr) {
95+
return false;
96+
}
97+
// Roll when row count is a multiple of the divisor and file size >= target
98+
if (current_file_rows_ % kRowsDivisor == 0) {
99+
auto length_result = current_writer_->length();
100+
if (length_result.has_value()) {
101+
return length_result.value() >= target_file_size_in_bytes_;
102+
}
103+
// TODO(anyone): If we can't get the length, don't roll for now, revisit this later.
104+
}
105+
return false;
106+
}
107+
108+
Status RollingManifestWriter::CloseCurrentWriter() {
109+
if (current_writer_ != nullptr) {
110+
ICEBERG_RETURN_UNEXPECTED(current_writer_->Close());
111+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, current_writer_->ToManifestFile());
112+
manifest_files_.push_back(std::move(manifest_file));
113+
current_writer_.reset();
114+
current_file_rows_ = 0;
115+
}
116+
return {};
117+
}
118+
119+
} // namespace iceberg
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/manifest/rolling_manifest_writer.h
23+
/// Rolling manifest writer that can produce multiple manifest files.
24+
25+
#include <functional>
26+
#include <memory>
27+
#include <vector>
28+
29+
#include "iceberg/iceberg_export.h"
30+
#include "iceberg/manifest/manifest_entry.h"
31+
#include "iceberg/manifest/manifest_list.h"
32+
#include "iceberg/manifest/manifest_writer.h"
33+
#include "iceberg/result.h"
34+
35+
namespace iceberg {
36+
37+
/// \brief A rolling manifest writer that can produce multiple manifest files.
38+
class ICEBERG_EXPORT RollingManifestWriter {
39+
public:
40+
/// \brief Factory function type for creating ManifestWriter instances.
41+
using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>()>;
42+
43+
/// \brief Construct a rolling manifest writer.
44+
/// \param manifest_writer_factory Factory function to create new ManifestWriter
45+
/// instances.
46+
/// \param target_file_size_in_bytes Target file size in bytes. When the current
47+
/// file reaches this size (and row count is a multiple of 250), a new file
48+
/// will be created.
49+
RollingManifestWriter(ManifestWriterFactory manifest_writer_factory,
50+
int64_t target_file_size_in_bytes);
51+
52+
~RollingManifestWriter();
53+
54+
/// \brief Add an added entry for a file.
55+
///
56+
/// \param file a data file
57+
/// \return Status indicating success or failure
58+
/// \note The entry's snapshot ID will be this manifest's snapshot ID. The
59+
/// entry's data sequence number will be the provided data sequence number.
60+
/// The entry's file sequence number will be assigned at commit.
61+
Status WriteAddedEntry(std::shared_ptr<DataFile> file,
62+
std::optional<int64_t> data_sequence_number = std::nullopt);
63+
64+
/// \brief Add an existing entry for a file.
65+
///
66+
/// \param file an existing data file
67+
/// \param file_snapshot_id snapshot ID when the data file was added to the table
68+
/// \param data_sequence_number a data sequence number of the file (assigned when
69+
/// the file was added)
70+
/// \param file_sequence_number a file sequence number (assigned when the file
71+
/// was added)
72+
/// \return Status indicating success or failure
73+
/// \note The original data and file sequence numbers, snapshot ID, which were
74+
/// assigned at commit, must be preserved when adding an existing entry.
75+
Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
76+
int64_t data_sequence_number,
77+
std::optional<int64_t> file_sequence_number = std::nullopt);
78+
79+
/// \brief Add a delete entry for a file.
80+
///
81+
/// \param file a deleted data file
82+
/// \param data_sequence_number a data sequence number of the file (assigned when
83+
/// the file was added)
84+
/// \param file_sequence_number a file sequence number (assigned when the file
85+
/// was added)
86+
/// \return Status indicating success or failure
87+
/// \note The entry's snapshot ID will be this manifest's snapshot ID. However,
88+
/// the original data and file sequence numbers of the file must be preserved
89+
/// when the file is marked as deleted.
90+
Status WriteDeletedEntry(std::shared_ptr<DataFile> file, int64_t data_sequence_number,
91+
std::optional<int64_t> file_sequence_number = std::nullopt);
92+
93+
/// \brief Close the rolling manifest writer.
94+
Status Close();
95+
96+
/// \brief Get the list of manifest files produced by this writer.
97+
/// \return A vector of ManifestFile objects
98+
/// \note Only valid after the writer is closed.
99+
Result<std::vector<ManifestFile>> ToManifestFiles() const;
100+
101+
private:
102+
/// \brief Get or create the current writer, rolling to a new file if needed.
103+
/// \return The current ManifestWriter, or an error if creation fails
104+
Result<ManifestWriter*> CurrentWriter();
105+
106+
/// \brief Check if we should roll to a new file.
107+
///
108+
/// This method checks if the current file has reached the target size
109+
/// or the number of rows has reached the threshold. If so, it rolls to a new file.
110+
bool ShouldRollToNewFile() const;
111+
112+
/// \brief Close the current writer and add its ManifestFile to the list.
113+
Status CloseCurrentWriter();
114+
115+
/// \brief The number of rows after which to consider rolling to a new file.
116+
/// \note This aligned with Iceberg's Java impl.
117+
static constexpr int64_t kRowsDivisor = 250;
118+
119+
ManifestWriterFactory manifest_writer_factory_;
120+
int64_t target_file_size_in_bytes_;
121+
std::vector<ManifestFile> manifest_files_;
122+
123+
int64_t current_file_rows_{0};
124+
std::unique_ptr<ManifestWriter> current_writer_{nullptr};
125+
bool closed_{false};
126+
};
127+
128+
} // namespace iceberg

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ iceberg_sources = files(
6666
'manifest/manifest_list.cc',
6767
'manifest/manifest_reader.cc',
6868
'manifest/manifest_writer.cc',
69+
'manifest/rolling_manifest_writer.cc',
6970
'manifest/v1_metadata.cc',
7071
'manifest/v2_metadata.cc',
7172
'manifest/v3_metadata.cc',

src/iceberg/test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ if(ICEBERG_BUILD_BUNDLE)
146146
manifest_list_versions_test.cc
147147
manifest_reader_stats_test.cc
148148
manifest_reader_test.cc
149-
manifest_writer_versions_test.cc)
149+
manifest_writer_versions_test.cc
150+
rolling_manifest_writer_test.cc)
150151

151152
add_iceberg_test(parquet_test
152153
USE_BUNDLE

0 commit comments

Comments
 (0)