Skip to content

Commit 24182ac

Browse files
committed
feat: add rolling manifest writer
1 parent ff8eea9 commit 24182ac

13 files changed

+589
-6
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ set(ICEBERG_SOURCES
4444
manifest/manifest_list.cc
4545
manifest/manifest_reader.cc
4646
manifest/manifest_writer.cc
47+
manifest/rolling_manifest_writer.cc
4748
manifest/v1_metadata.cc
4849
manifest/v2_metadata.cc
4950
manifest/v3_metadata.cc

src/iceberg/file_writer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class ICEBERG_EXPORT Writer {
103103
virtual Result<Metrics> metrics() = 0;
104104

105105
/// \brief Get the file length.
106-
/// Only valid after the file is closed.
106+
/// This can be called while the writer is still open or after the file is closed.
107107
virtual Result<int64_t> length() = 0;
108108

109109
/// \brief Returns a list of recommended split locations, if applicable, empty

src/iceberg/manifest/manifest_entry.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ enum class ManifestStatus {
4646

4747
/// \brief Get the relative manifest status type from int
4848
ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
49-
int status) noexcept {
49+
int32_t status) noexcept {
5050
switch (status) {
5151
case 0:
5252
return ManifestStatus::kExisting;
@@ -387,7 +387,7 @@ ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type) noexc
387387

388388
/// \brief Get the relative data file content type from int
389389
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
390-
int content) noexcept {
390+
int32_t content) noexcept {
391391
switch (content) {
392392
case 0:
393393
return DataFile::Content::kData;

src/iceberg/manifest/manifest_writer.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ ManifestContent ManifestWriter::content() const { return adapter_->content(); }
217217

218218
Result<Metrics> ManifestWriter::metrics() const { return writer_->metrics(); }
219219

220+
Result<int64_t> ManifestWriter::length() const { return writer_->length(); }
221+
220222
Result<ManifestFile> ManifestWriter::ToManifestFile() const {
221223
if (!closed_) [[unlikely]] {
222224
return Invalid("Cannot get ManifestFile before closing the writer.");

src/iceberg/manifest/manifest_writer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ class ICEBERG_EXPORT ManifestWriter {
108108
/// \note Only valid after the file is closed.
109109
Result<Metrics> metrics() const;
110110

111+
/// \brief Get the current length of the manifest file in bytes.
112+
/// \return The current length of the file, or an error if the operation fails.
113+
Result<int64_t> length() const;
114+
111115
/// \brief Get the ManifestFile object.
112116
/// \note Only valid after the file is closed.
113117
Result<ManifestFile> ToManifestFile() const;

src/iceberg/manifest/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ install_headers(
2121
'manifest_list.h',
2222
'manifest_reader.h',
2323
'manifest_writer.h',
24+
'rolling_manifest_writer.h',
2425
],
2526
subdir: 'iceberg/manifest',
2627
)
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest/rolling_manifest_writer.h"
21+
22+
#include "iceberg/manifest/manifest_entry.h"
23+
#include "iceberg/result.h"
24+
#include "iceberg/util/macros.h"
25+
26+
namespace iceberg {
27+
28+
RollingManifestWriter::RollingManifestWriter(
29+
ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes)
30+
: manifest_writer_factory_(std::move(manifest_writer_factory)),
31+
target_file_size_in_bytes_(target_file_size_in_bytes) {}
32+
33+
RollingManifestWriter::~RollingManifestWriter() {
34+
// Ensure we close the current writer if not already closed
35+
if (!closed_) {
36+
(void)CloseCurrentWriter();
37+
}
38+
}
39+
40+
Status RollingManifestWriter::WriteAddedEntry(
41+
std::shared_ptr<DataFile> file, std::optional<int64_t> data_sequence_number) {
42+
ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter());
43+
ICEBERG_RETURN_UNEXPECTED(
44+
writer->WriteAddedEntry(std::move(file), data_sequence_number));
45+
current_file_rows_++;
46+
return {};
47+
}
48+
49+
Status RollingManifestWriter::WriteExistingEntry(
50+
std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
51+
int64_t data_sequence_number, std::optional<int64_t> file_sequence_number) {
52+
ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter());
53+
ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(
54+
std::move(file), file_snapshot_id, data_sequence_number, file_sequence_number));
55+
current_file_rows_++;
56+
return {};
57+
}
58+
59+
Status RollingManifestWriter::WriteDeletedEntry(
60+
std::shared_ptr<DataFile> file, int64_t data_sequence_number,
61+
std::optional<int64_t> file_sequence_number) {
62+
ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter());
63+
ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(
64+
std::move(file), data_sequence_number, file_sequence_number));
65+
current_file_rows_++;
66+
return {};
67+
}
68+
69+
Status RollingManifestWriter::Close() {
70+
if (!closed_) {
71+
ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
72+
closed_ = true;
73+
}
74+
return {};
75+
}
76+
77+
Result<std::vector<ManifestFile>> RollingManifestWriter::ToManifestFiles() const {
78+
if (!closed_) {
79+
return Invalid("Cannot get ManifestFile list from unclosed writer");
80+
}
81+
return manifest_files_;
82+
}
83+
84+
Result<ManifestWriter*> RollingManifestWriter::CurrentWriter() {
85+
if (current_writer_ == nullptr) {
86+
ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_());
87+
} else if (ShouldRollToNewFile()) {
88+
ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
89+
ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_());
90+
}
91+
92+
return current_writer_.get();
93+
}
94+
95+
bool RollingManifestWriter::ShouldRollToNewFile() const {
96+
if (current_writer_ == nullptr) {
97+
return false;
98+
}
99+
// Roll when row count is a multiple of ROWS_DIVISOR and file size >= target
100+
if (current_file_rows_ % kRowsDivisor == 0) {
101+
auto length_result = current_writer_->length();
102+
if (length_result.has_value()) {
103+
return length_result.value() >= target_file_size_in_bytes_;
104+
}
105+
// If we can't get the length, don't roll
106+
}
107+
return false;
108+
}
109+
110+
Status RollingManifestWriter::CloseCurrentWriter() {
111+
if (current_writer_ != nullptr) {
112+
ICEBERG_RETURN_UNEXPECTED(current_writer_->Close());
113+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, current_writer_->ToManifestFile());
114+
manifest_files_.push_back(std::move(manifest_file));
115+
current_writer_.reset();
116+
current_file_rows_ = 0;
117+
}
118+
return {};
119+
}
120+
121+
} // namespace iceberg
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/manifest/rolling_manifest_writer.h
23+
/// Rolling manifest writer that can produce multiple manifest files.
24+
25+
#include <functional>
26+
#include <memory>
27+
#include <vector>
28+
29+
#include "iceberg/iceberg_export.h"
30+
#include "iceberg/manifest/manifest_entry.h"
31+
#include "iceberg/manifest/manifest_list.h"
32+
#include "iceberg/manifest/manifest_writer.h"
33+
#include "iceberg/result.h"
34+
35+
namespace iceberg {
36+
37+
/// \brief A rolling manifest writer that can produce multiple manifest files.
38+
///
39+
/// As opposed to ManifestWriter, a rolling writer could produce multiple manifest
40+
/// files.
41+
class ICEBERG_EXPORT RollingManifestWriter {
42+
public:
43+
/// \brief Factory function type for creating ManifestWriter instances.
44+
using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>()>;
45+
46+
/// \brief Construct a rolling manifest writer.
47+
/// \param manifest_writer_factory Factory function to create new ManifestWriter
48+
/// instances.
49+
/// \param target_file_size_in_bytes Target file size in bytes. When the current
50+
/// file reaches this size (and row count is a multiple of 250), a new file
51+
/// will be created.
52+
RollingManifestWriter(ManifestWriterFactory manifest_writer_factory,
53+
int64_t target_file_size_in_bytes);
54+
55+
~RollingManifestWriter();
56+
57+
/// \brief Add an added entry for a file.
58+
///
59+
/// \param file a data file
60+
/// \return Status::OK() if the entry was written successfully
61+
/// \note The entry's snapshot ID will be this manifest's snapshot ID. The
62+
/// entry's data sequence number will be the provided data sequence number.
63+
/// The entry's file sequence number will be assigned at commit.
64+
Status WriteAddedEntry(std::shared_ptr<DataFile> file,
65+
std::optional<int64_t> data_sequence_number = std::nullopt);
66+
67+
/// \brief Add an existing entry for a file.
68+
///
69+
/// \param file an existing data file
70+
/// \param file_snapshot_id snapshot ID when the data file was added to the table
71+
/// \param data_sequence_number a data sequence number of the file (assigned when
72+
/// the file was added)
73+
/// \param file_sequence_number a file sequence number (assigned when the file
74+
/// was added)
75+
/// \return Status::OK() if the entry was written successfully
76+
/// \note The original data and file sequence numbers, snapshot ID, which were
77+
/// assigned at commit, must be preserved when adding an existing entry.
78+
Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
79+
int64_t data_sequence_number,
80+
std::optional<int64_t> file_sequence_number = std::nullopt);
81+
82+
/// \brief Add a delete entry for a file.
83+
///
84+
/// \param file a deleted data file
85+
/// \param data_sequence_number a data sequence number of the file (assigned when
86+
/// the file was added)
87+
/// \param file_sequence_number a file sequence number (assigned when the file
88+
/// was added)
89+
/// \return Status::OK() if the entry was written successfully
90+
/// \note The entry's snapshot ID will be this manifest's snapshot ID. However,
91+
/// the original data and file sequence numbers of the file must be preserved
92+
/// when the file is marked as deleted.
93+
Status WriteDeletedEntry(std::shared_ptr<DataFile> file, int64_t data_sequence_number,
94+
std::optional<int64_t> file_sequence_number = std::nullopt);
95+
96+
/// \brief Close the rolling manifest writer.
97+
Status Close();
98+
99+
/// \brief Get the list of manifest files produced by this writer.
100+
/// \return A vector of ManifestFile objects
101+
/// \note Only valid after the writer is closed.
102+
Result<std::vector<ManifestFile>> ToManifestFiles() const;
103+
104+
private:
105+
/// \brief Get or create the current writer, rolling to a new file if needed.
106+
/// \return The current ManifestWriter, or an error if creation fails
107+
Result<ManifestWriter*> CurrentWriter();
108+
109+
/// \brief Check if we should roll to a new file.
110+
///
111+
/// This method checks if the current file has reached the target size
112+
/// or the number of rows has reached the threshold. If so, it rolls to a new file.
113+
bool ShouldRollToNewFile() const;
114+
115+
/// \brief Close the current writer and add its ManifestFile to the list.
116+
Status CloseCurrentWriter();
117+
118+
static constexpr int64_t kRowsDivisor = 250;
119+
120+
ManifestWriterFactory manifest_writer_factory_;
121+
int64_t target_file_size_in_bytes_;
122+
std::vector<ManifestFile> manifest_files_;
123+
124+
int64_t current_file_rows_{0};
125+
std::unique_ptr<ManifestWriter> current_writer_{nullptr};
126+
bool closed_{false};
127+
};
128+
129+
} // namespace iceberg

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ iceberg_sources = files(
6666
'manifest/manifest_list.cc',
6767
'manifest/manifest_reader.cc',
6868
'manifest/manifest_writer.cc',
69+
'manifest/rolling_manifest_writer.cc',
6970
'manifest/v1_metadata.cc',
7071
'manifest/v2_metadata.cc',
7172
'manifest/v3_metadata.cc',

src/iceberg/test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ if(ICEBERG_BUILD_BUNDLE)
146146
manifest_list_versions_test.cc
147147
manifest_reader_stats_test.cc
148148
manifest_reader_test.cc
149-
manifest_writer_versions_test.cc)
149+
manifest_writer_versions_test.cc
150+
rolling_manifest_writer_test.cc)
150151

151152
add_iceberg_test(parquet_test
152153
USE_BUNDLE

0 commit comments

Comments
 (0)