Skip to content

Commit 657f22f

Browse files
committed
feat: add FileWriter base interface for data file writers
Add an iceberg/data subdirectory with a FileWriter base interface that defines the common operations for writing Iceberg data files, including data files, equality delete files, and position delete files.
- Add FileWriter interface with Write, Length, Close, and Metadata methods
- Add WriteResult struct to hold metadata for produced files
- Add comprehensive unit tests with a MockFileWriter implementation
- Update the build system to include the new data subdirectory

Related to #441, task 1.
1 parent 61a7de5 commit 657f22f

File tree

6 files changed

+358
-0
lines changed

6 files changed

+358
-0
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
2020
set(ICEBERG_SOURCES
2121
arrow_c_data_guard_internal.cc
2222
catalog/memory/in_memory_catalog.cc
23+
data/writer.cc
2324
expression/aggregate.cc
2425
expression/binder.cc
2526
expression/evaluator.cc
@@ -142,6 +143,7 @@ add_iceberg_lib(iceberg
142143
iceberg_install_all_headers(iceberg)
143144

144145
add_subdirectory(catalog)
146+
add_subdirectory(data)
145147
add_subdirectory(expression)
146148
add_subdirectory(manifest)
147149
add_subdirectory(row)

src/iceberg/data/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# NOTE(review): presumably registers the headers in this directory for
# installation under iceberg/data; the helper is defined elsewhere in the
# build, so confirm against its definition.
iceberg_install_all_headers(iceberg/data)

src/iceberg/data/writer.cc

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/data/writer.h"
21+
22+
namespace iceberg {
23+
24+
// FileWriter is a pure virtual interface class.
25+
// Implementations will be provided in subsequent tasks.
26+
27+
} // namespace iceberg

src/iceberg/data/writer.h

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/data/writer.h
23+
/// Base interface for Iceberg data file writers.
24+
25+
#include <cstdint>
26+
#include <memory>
27+
#include <vector>
28+
29+
#include "iceberg/arrow_c_data.h"
30+
#include "iceberg/iceberg_export.h"
31+
#include "iceberg/manifest/manifest_entry.h"
32+
#include "iceberg/result.h"
33+
34+
namespace iceberg {
35+
36+
/// \brief Base interface for data file writers.
37+
///
38+
/// This interface defines the common operations for writing Iceberg data files,
39+
/// including data files, equality delete files, and position delete files.
40+
///
41+
/// Typical usage:
42+
/// 1. Create a writer instance (via concrete implementation)
43+
/// 2. Call Write() one or more times to write data
44+
/// 3. Call Close() to finalize the file
45+
/// 4. Call Metadata() to get file metadata (only valid after Close())
46+
///
47+
/// \note This interface is not thread-safe. Concurrent calls to Write()
48+
/// from multiple threads on the same instance are not supported.
49+
///
50+
/// \note This interface uses PascalCase method naming (Write, Length, Close, Metadata)
51+
/// to distinguish it from the lower-level iceberg/file_writer.h::Writer interface which
52+
/// uses lowercase naming. FileWriter is the Iceberg-specific data file writer abstraction,
53+
/// while Writer is the file format-level abstraction.
54+
class ICEBERG_EXPORT FileWriter {
55+
public:
56+
virtual ~FileWriter() = default;
57+
58+
/// \brief Write a batch of records.
59+
///
60+
/// \param data Arrow array containing the records to write.
61+
/// \return Status indicating success or failure.
62+
virtual Status Write(ArrowArray* data) = 0;
63+
64+
/// \brief Get the current number of bytes written.
65+
///
66+
/// \return Result containing the number of bytes written or an error.
67+
virtual Result<int64_t> Length() const = 0;
68+
69+
/// \brief Close the writer and finalize the file.
70+
///
71+
/// \return Status indicating success or failure.
72+
virtual Status Close() = 0;
73+
74+
/// \brief File metadata for all files produced by the writer.
75+
struct ICEBERG_EXPORT WriteResult {
76+
/// Usually a writer produces a single data or delete file.
77+
/// Position delete writer may produce multiple file-scoped delete files.
78+
/// In the future, multiple files can be produced if file rolling is supported.
79+
std::vector<std::shared_ptr<DataFile>> data_files;
80+
};
81+
82+
/// \brief Get file metadata for all files produced by this writer.
83+
///
84+
/// This method should be called after Close() to retrieve the metadata
85+
/// for all files written by this writer.
86+
///
87+
/// \return Result containing the write result or an error.
88+
virtual Result<WriteResult> Metadata() = 0;
89+
};
90+
91+
} // namespace iceberg

src/iceberg/test/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ add_iceberg_test(util_test
113113

114114
add_iceberg_test(roaring_test SOURCES roaring_test.cc)
115115

116+
add_iceberg_test(data_writer_test SOURCES data_writer_test.cc)
117+
116118
if(ICEBERG_BUILD_BUNDLE)
117119
add_iceberg_test(avro_test
118120
USE_BUNDLE
src/iceberg/test/data_writer_test.cc

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/data/writer.h"
21+
22+
#include <memory>
23+
#include <vector>
24+
25+
#include <gmock/gmock.h>
26+
#include <gtest/gtest.h>
27+
28+
#include "iceberg/arrow_c_data.h"
29+
#include "iceberg/manifest/manifest_entry.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/test/matchers.h"
32+
33+
namespace iceberg {
34+
35+
// Mock implementation of FileWriter for testing
36+
class MockFileWriter : public FileWriter {
37+
public:
38+
MockFileWriter() : bytes_written_(0), is_closed_(false), write_count_(0) {}
39+
40+
Status Write(ArrowArray* data) override {
41+
if (is_closed_) {
42+
return Invalid("Writer is closed");
43+
}
44+
if (data == nullptr) {
45+
return Invalid("Null data provided");
46+
}
47+
write_count_++;
48+
// Simulate writing some bytes
49+
bytes_written_ += 1024;
50+
return {};
51+
}
52+
53+
Result<int64_t> Length() const override { return bytes_written_; }
54+
55+
Status Close() override {
56+
if (is_closed_) {
57+
return Invalid("Writer already closed");
58+
}
59+
is_closed_ = true;
60+
return {};
61+
}
62+
63+
Result<WriteResult> Metadata() override {
64+
if (!is_closed_) {
65+
return Invalid("Writer must be closed before getting metadata");
66+
}
67+
68+
WriteResult result;
69+
auto data_file = std::make_shared<DataFile>();
70+
data_file->file_path = "/test/data/file.parquet";
71+
data_file->file_format = FileFormatType::kParquet;
72+
data_file->record_count = write_count_ * 100;
73+
data_file->file_size_in_bytes = bytes_written_;
74+
result.data_files.push_back(data_file);
75+
76+
return result;
77+
}
78+
79+
bool is_closed() const { return is_closed_; }
80+
int32_t write_count() const { return write_count_; }
81+
82+
private:
83+
int64_t bytes_written_;
84+
bool is_closed_;
85+
int32_t write_count_;
86+
};
87+
88+
// A single successful Write() is counted and shows up in Length().
TEST(FileWriterTest, BasicWriteOperation) {
  MockFileWriter mock;

  // Zero-initialized placeholder; a real caller would pass populated data.
  ArrowArray batch{};

  ASSERT_THAT(mock.Write(&batch), IsOk());
  ASSERT_EQ(mock.write_count(), 1);

  auto length = mock.Length();
  ASSERT_THAT(length, IsOk());
  ASSERT_EQ(*length, 1024);
}
101+
102+
// Repeated writes accumulate in both the write counter and the byte length.
TEST(FileWriterTest, MultipleWrites) {
  MockFileWriter mock;
  ArrowArray batch{};

  // Five consecutive writes of the same placeholder batch.
  for (int attempt = 0; attempt < 5; ++attempt) {
    ASSERT_THAT(mock.Write(&batch), IsOk());
  }

  ASSERT_EQ(mock.write_count(), 5);

  auto length = mock.Length();
  ASSERT_THAT(length, IsOk());
  ASSERT_EQ(*length, 5120);  // 5 writes at 1024 bytes each
}
117+
118+
// Passing a null array to Write() is rejected with a descriptive error.
TEST(FileWriterTest, WriteNullData) {
  MockFileWriter mock;

  ASSERT_THAT(mock.Write(nullptr), HasErrorMessage("Null data provided"));
}
124+
125+
// Close() flips the writer from open to closed.
TEST(FileWriterTest, CloseWriter) {
  MockFileWriter mock;
  ArrowArray batch{};

  // Still open after a write.
  ASSERT_THAT(mock.Write(&batch), IsOk());
  ASSERT_FALSE(mock.is_closed());

  // Closed once Close() succeeds.
  ASSERT_THAT(mock.Close(), IsOk());
  ASSERT_TRUE(mock.is_closed());
}
135+
136+
// Closing an already-closed writer is reported as an error.
TEST(FileWriterTest, DoubleClose) {
  MockFileWriter mock;

  ASSERT_THAT(mock.Close(), IsOk());
  ASSERT_THAT(mock.Close(), HasErrorMessage("Writer already closed"));
}
143+
144+
// Writes are refused once the writer has been closed.
TEST(FileWriterTest, WriteAfterClose) {
  MockFileWriter mock;
  ArrowArray batch{};

  ASSERT_THAT(mock.Close(), IsOk());

  ASSERT_THAT(mock.Write(&batch), HasErrorMessage("Writer is closed"));
}
153+
154+
// Metadata() is unavailable while the writer is still open.
TEST(FileWriterTest, MetadataBeforeClose) {
  MockFileWriter mock;
  ArrowArray batch{};

  ASSERT_THAT(mock.Write(&batch), IsOk());

  auto metadata = mock.Metadata();
  ASSERT_THAT(metadata,
              HasErrorMessage("Writer must be closed before getting metadata"));
}
163+
164+
// After Close(), Metadata() describes the single file the mock "wrote".
TEST(FileWriterTest, MetadataAfterClose) {
  MockFileWriter mock;
  ArrowArray batch{};

  // Three writes, then finalize.
  for (int attempt = 0; attempt < 3; ++attempt) {
    ASSERT_THAT(mock.Write(&batch), IsOk());
  }
  ASSERT_THAT(mock.Close(), IsOk());

  auto metadata = mock.Metadata();
  ASSERT_THAT(metadata, IsOk());

  const auto& files = metadata->data_files;
  ASSERT_EQ(files.size(), 1);

  const auto& file = files[0];
  ASSERT_EQ(file->file_path, "/test/data/file.parquet");
  ASSERT_EQ(file->file_format, FileFormatType::kParquet);
  ASSERT_EQ(file->record_count, 300);         // 3 writes * 100 records
  ASSERT_EQ(file->file_size_in_bytes, 3072);  // 3 writes * 1024 bytes
}
189+
190+
// WriteResult can carry more than one data file and preserves their order.
TEST(FileWriterTest, WriteResultStructure) {
  // Builds a DataFile with only the fields this test inspects.
  auto make_file = [](const char* path, int records) {
    auto file = std::make_shared<DataFile>();
    file->file_path = path;
    file->record_count = records;
    return file;
  };

  FileWriter::WriteResult result;
  result.data_files.push_back(make_file("/test/data/file1.parquet", 100));
  result.data_files.push_back(make_file("/test/data/file2.parquet", 200));

  ASSERT_EQ(result.data_files.size(), 2);
  ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet");
  ASSERT_EQ(result.data_files[0]->record_count, 100);
  ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet");
  ASSERT_EQ(result.data_files[1]->record_count, 200);
}
211+
212+
// A default-constructed WriteResult holds no data files.
TEST(FileWriterTest, EmptyWriteResult) {
  FileWriter::WriteResult empty_result;

  const auto& files = empty_result.data_files;
  ASSERT_EQ(files.size(), 0);
  ASSERT_TRUE(files.empty());
}
217+
218+
} // namespace iceberg

0 commit comments

Comments
 (0)