Skip to content

Commit c71a1e4

Browse files
committed
feat:add segment storage
Signed-off-by: Chen Kai <281165273grape@gmail.com>
1 parent 90644a7 commit c71a1e4

File tree

2 files changed

+316
-1
lines changed

2 files changed

+316
-1
lines changed

src/root.zig

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ pub const deposit_helper = @import("consensus/helpers/deposit.zig");
3131
pub const voluntary_exit_helper = @import("consensus/helpers/voluntary_exit.zig");
3232
pub const justification_finalization_helper = @import("consensus/helpers/justification_finalization.zig");
3333
pub const finality = @import("consensus/helpers/finality.zig");
34-
pub const rewards_penalties = @import("consensus/helpers/rewards_penalties.zig");
34+
pub const rewards_penalties_helper = @import("consensus/helpers/rewards_penalties.zig");
35+
pub const segment_storage = @import("storage/segment_storage.zig");
3536

3637
test {
3738
@import("std").testing.refAllDeclsRecursive(@This());

src/storage/segment_storage.zig

Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
const std = @import("std");
2+
const fs = std.fs;
3+
const mmap = std.c.mmap;
4+
const munmap = std.c.munmap;
5+
const crc32 = std.hash.Crc32;
6+
const MAP = std.c.MAP;
7+
8+
pub const Record = struct {
9+
offset: u64,
10+
crc: u32,
11+
size: u32,
12+
timestamp: i64,
13+
data: []const u8,
14+
15+
pub fn init(data: []const u8) Record {
16+
return .{
17+
.offset = 0,
18+
.crc = std.hash.Crc32.hash(data),
19+
.size = @intCast(data.len),
20+
.timestamp = std.time.milliTimestamp(),
21+
.data = data,
22+
};
23+
}
24+
};
25+
26+
pub const SegmentFile = struct {
27+
file: fs.File,
28+
mmap_data: []u8,
29+
position: u64,
30+
31+
pub fn init(path: []const u8, size: u64) !SegmentFile {
32+
const file = try fs.createFileAbsolute(path, .{
33+
.read = true,
34+
.truncate = false,
35+
});
36+
try file.setEndPos(size);
37+
38+
const mmap_data = mmap(null, size, std.c.PROT.READ | std.c.PROT.WRITE, .{ .TYPE = .SHARED }, file.handle, 0);
39+
return .{
40+
.file = file,
41+
.mmap_data = @as([*]u8, @ptrCast(mmap_data))[0..size],
42+
.position = 0,
43+
};
44+
}
45+
46+
pub fn deinit(self: *SegmentFile) void {
47+
self.file.sync() catch {
48+
// ignore
49+
};
50+
_ = munmap(@alignCast(@ptrCast(self.mmap_data.ptr)), self.mmap_data.len);
51+
self.file.close();
52+
}
53+
};
54+
55+
pub const Segment = struct {
56+
base_offset: u64,
57+
data_file: SegmentFile,
58+
index_file: SegmentFile,
59+
mutex: std.Thread.Mutex,
60+
61+
pub fn init(dir: []const u8, base_offset: u64, config: Log.Config, allocator: std.mem.Allocator) !Segment {
62+
const data_path = try std.fmt.allocPrint(
63+
allocator,
64+
"{s}/{d}.data",
65+
.{ dir, base_offset },
66+
);
67+
defer allocator.free(data_path);
68+
69+
const index_path = try std.fmt.allocPrint(
70+
allocator,
71+
"{s}/{d}.index",
72+
.{ dir, base_offset },
73+
);
74+
defer allocator.free(index_path);
75+
76+
const data_file = try SegmentFile.init(data_path, config.segment_size);
77+
const index_file = try SegmentFile.init(index_path, config.index_size);
78+
79+
return .{
80+
.base_offset = base_offset,
81+
.data_file = data_file,
82+
.index_file = index_file,
83+
.mutex = std.Thread.Mutex{},
84+
};
85+
}
86+
pub fn append(self: *Segment, record: Record) !u64 {
87+
self.mutex.lock();
88+
defer self.mutex.unlock();
89+
90+
// Write record header
91+
try self.writeHeader(record);
92+
93+
// Write data
94+
const data_offset = self.data_file.position;
95+
@memcpy(
96+
self.data_file.mmap_data[data_offset..][0..record.data.len],
97+
record.data,
98+
);
99+
100+
// Update index
101+
try self.updateIndex(record.offset, data_offset);
102+
103+
self.data_file.position += record.data.len;
104+
return record.offset;
105+
}
106+
fn writeHeader(self: *Segment, record: Record) !void {
107+
var header: [16]u8 = undefined;
108+
std.mem.writeInt(u32, header[0..4], record.crc, .little);
109+
std.mem.writeInt(u32, header[4..8], record.size, .little);
110+
std.mem.writeInt(i64, header[8..16], record.timestamp, .little);
111+
112+
const pos = self.data_file.position;
113+
@memcpy(self.data_file.mmap_data[pos..][0..16], &header);
114+
self.data_file.position += 16;
115+
}
116+
117+
fn updateIndex(self: *Segment, offset: u64, position: u64) !void {
118+
const relative_offset = offset - self.base_offset;
119+
const index_position = relative_offset * @sizeOf(u64);
120+
121+
// Write position to index file
122+
std.mem.writeInt(u64, self.index_file.mmap_data[index_position..][0..8], position, .little);
123+
124+
self.index_file.position = @max(self.index_file.position, index_position + @sizeOf(u64));
125+
}
126+
127+
pub fn read(self: *Segment, offset: u64) !Record {
128+
self.mutex.lock();
129+
defer self.mutex.unlock();
130+
131+
// Read from index to get position
132+
const position = try self.readIndex(offset);
133+
134+
// Read record header
135+
const header = try self.readHeader(position);
136+
137+
// Read data
138+
const data = self.data_file.mmap_data[position + 16 ..][0..header.size];
139+
140+
// Add CRC validation in read() method
141+
if (crc32.hash(data) != header.crc) {
142+
return error.InvalidCrc;
143+
}
144+
145+
return Record{
146+
.offset = offset,
147+
.crc = header.crc,
148+
.size = header.size,
149+
.timestamp = header.timestamp,
150+
.data = data,
151+
};
152+
}
153+
154+
fn readHeader(self: *Segment, position: u64) !struct { crc: u32, size: u32, timestamp: i64 } {
155+
const header_data = self.data_file.mmap_data[position..][0..16];
156+
157+
return .{
158+
.crc = @as(u32, @bitCast(std.mem.readInt(u32, header_data[0..4], .little))),
159+
.size = @as(u32, @bitCast(std.mem.readInt(u32, header_data[4..8], .little))),
160+
.timestamp = @as(i64, @bitCast(std.mem.readInt(i64, header_data[8..16], .little))),
161+
};
162+
}
163+
164+
fn readIndex(self: *Segment, offset: u64) !u64 {
165+
const relative_offset = offset - self.base_offset;
166+
const index_position = relative_offset * @sizeOf(u64);
167+
168+
if (index_position >= self.index_file.position) {
169+
return error.OffsetNotFound;
170+
}
171+
172+
return std.mem.readInt(u64, self.index_file.mmap_data[index_position..][0..8], .little);
173+
}
174+
175+
pub fn deinit(self: *Segment) void {
176+
self.data_file.deinit();
177+
self.index_file.deinit();
178+
}
179+
};
180+
181+
pub const SegmentError = error{
182+
InvalidCrc,
183+
OffsetNotFound,
184+
InvalidConfig,
185+
BufferOverflow,
186+
};
187+
188+
pub const Log = struct {
189+
dir: []const u8,
190+
segments: std.ArrayList(Segment),
191+
config: Config,
192+
mutex: std.Thread.Mutex,
193+
194+
pub const Config = struct {
195+
segment_size: u64,
196+
index_size: u64,
197+
max_segments: u32,
198+
compress: bool,
199+
200+
pub fn validate(self: Config) SegmentError!void {
201+
if (self.segment_size == 0 or self.index_size == 0 or self.max_segments == 0) {
202+
return SegmentError.InvalidConfig;
203+
}
204+
if (self.segment_size > std.math.maxInt(u32)) {
205+
return SegmentError.InvalidConfig;
206+
}
207+
}
208+
};
209+
210+
pub fn init(dir: []const u8, config: Config, allocator: std.mem.Allocator) !Log {
211+
try config.validate();
212+
try fs.makeDirAbsolute(dir);
213+
214+
return .{
215+
.dir = dir,
216+
.segments = std.ArrayList(Segment).init(allocator),
217+
.config = config,
218+
.mutex = std.Thread.Mutex{},
219+
};
220+
}
221+
222+
pub fn append(self: *Log, data: []const u8) !u64 {
223+
if (data.len > self.config.segment_size) {
224+
return SegmentError.BufferOverflow;
225+
}
226+
227+
self.mutex.lock();
228+
defer self.mutex.unlock();
229+
230+
if (self.segments.items.len == 0 or self.shouldRotate()) {
231+
try self.rotate();
232+
}
233+
234+
var record = Record.init(data);
235+
if (self.config.compress) {
236+
record.data = try self.compress(data);
237+
}
238+
239+
const last_segment = &self.segments.items[self.segments.items.len - 1];
240+
return last_segment.append(record);
241+
}
242+
243+
pub fn read(self: *Log, offset: u64) !Record {
244+
self.mutex.lock();
245+
defer self.mutex.unlock();
246+
247+
for (self.segments.items) |*segment| {
248+
if (offset >= segment.base_offset and
249+
offset < segment.base_offset + segment.data_file.position)
250+
{
251+
return segment.read(offset);
252+
}
253+
}
254+
255+
return error.OffsetNotFound;
256+
}
257+
258+
fn rotate(self: *Log) !void {
259+
const base_offset = if (self.segments.items.len == 0)
260+
0
261+
else
262+
self.segments.items[self.segments.items.len - 1].base_offset +
263+
self.segments.items[self.segments.items.len - 1].data_file.position;
264+
265+
const segment = try Segment.init(self.dir, base_offset, self.config, self.segments.allocator);
266+
try self.segments.append(segment);
267+
268+
if (self.segments.items.len > self.config.max_segments) {
269+
var oldest = self.segments.orderedRemove(0);
270+
oldest.deinit();
271+
}
272+
}
273+
274+
fn shouldRotate(self: *Log) bool {
275+
if (self.segments.items.len == 0) {
276+
return true;
277+
}
278+
279+
const current = &self.segments.items[self.segments.items.len - 1];
280+
const size = current.data_file.position;
281+
282+
return size >= self.config.segment_size;
283+
}
284+
285+
fn compress(self: *Log, data: []const u8) ![]const u8 {
286+
if (data.len > self.config.segment_size) {
287+
return SegmentError.BufferOverflow;
288+
}
289+
var out = std.ArrayList(u8).init(self.segments.allocator);
290+
var comp = try std.compress.zlib.compressor(out.writer(), .{});
291+
_ = try comp.write(data);
292+
try comp.flush();
293+
try comp.finish();
294+
295+
return out.toOwnedSlice();
296+
}
297+
298+
fn decompress(self: *Log, data: []const u8) ![]const u8 {
299+
var decomp = std.compress.zlib.decompressor(std.io.fixedBufferStream(data).reader());
300+
defer decomp.deinit();
301+
302+
var out = std.ArrayList(u8).init(self.segments.allocator);
303+
try decomp.decompress(data, &out);
304+
305+
return out.toOwnedSlice();
306+
}
307+
308+
pub fn deinit(self: *Log) void {
309+
for (self.segments.items) |*segment| {
310+
segment.deinit();
311+
}
312+
self.segments.deinit();
313+
}
314+
};

0 commit comments

Comments
 (0)