349 lines
12 KiB
Zig
349 lines
12 KiB
Zig
//! RFC-0020: OPQ (Offline Packet Queue) - Segmented WAL Storage
|
|
//!
|
|
//! This module implements the "Mechanism" of the OPQ:
|
|
//! A resilient, segmented Write-Ahead Log (WAL) for persisting LWF frames.
|
|
//!
|
|
//! Segmented Architecture:
|
|
//! - Data is split into fixed-size segments (e.g., 4MB).
|
|
//! - Only one "Active" segment is writable at a time.
|
|
//! - Completed segments are "Finalized" and become immutable.
|
|
//! - Pruning works by deleting entire segment files (extremely fast).
|
|
|
|
const std = @import("std");
|
|
const lwf = @import("../lwf.zig");
|
|
|
|
pub const SEGMENT_MAGIC: [4]u8 = "LOPQ".*;
|
|
pub const SEGMENT_VERSION: u8 = 1;
|
|
pub const DEFAULT_SEGMENT_SIZE: usize = 4 * 1024 * 1024; // 4MB
|
|
|
|
pub const SegmentHeader = struct {
|
|
magic: [4]u8 = SEGMENT_MAGIC,
|
|
version: u8 = SEGMENT_VERSION,
|
|
reserved: [3]u8 = [_]u8{0} ** 3,
|
|
segment_id: u64,
|
|
segment_seq: u32,
|
|
created_at: i64,
|
|
|
|
pub const SIZE = 4 + 1 + 3 + 8 + 4 + 8; // 28 bytes
|
|
};
|
|
|
|
pub const WALLocation = struct {
|
|
segment_id: u64,
|
|
segment_seq: u32,
|
|
offset: usize,
|
|
len: usize,
|
|
};
|
|
|
|
pub const WALStore = struct {
|
|
allocator: std.mem.Allocator,
|
|
base_dir_path: []const u8,
|
|
max_segment_size: usize,
|
|
|
|
active_segment: ?std.fs.File = null,
|
|
active_segment_id: u64 = 0,
|
|
active_segment_seq: u32 = 0,
|
|
current_offset: usize = 0,
|
|
|
|
pub fn init(allocator: std.mem.Allocator, base_dir: []const u8, max_size: usize) !WALStore {
|
|
// Ensure base directory exists
|
|
std.fs.cwd().makePath(base_dir) catch |err| {
|
|
if (err != error.PathAlreadyExists) return err;
|
|
};
|
|
|
|
return WALStore{
|
|
.allocator = allocator,
|
|
.base_dir_path = try allocator.dupe(u8, base_dir),
|
|
.max_segment_size = max_size,
|
|
};
|
|
}
|
|
|
|
pub fn deinit(self: *WALStore) void {
|
|
if (self.active_segment) |file| {
|
|
file.close();
|
|
}
|
|
self.allocator.free(self.base_dir_path);
|
|
}
|
|
|
|
/// Append a frame to the active segment
|
|
pub fn appendFrame(self: *WALStore, frame: *const lwf.LWFFrame) !WALLocation {
|
|
const frame_size = frame.header.payload_len + lwf.LWFHeader.SIZE + lwf.LWFTrailer.SIZE;
|
|
|
|
// Check if we need a new segment
|
|
if (self.active_segment == null or self.current_offset + frame_size > self.max_segment_size) {
|
|
try self.rotateSegment();
|
|
}
|
|
|
|
const file = self.active_segment.?;
|
|
const encoded = try frame.encode(self.allocator);
|
|
defer self.allocator.free(encoded);
|
|
|
|
const loc = WALLocation{
|
|
.segment_id = self.active_segment_id,
|
|
.segment_seq = self.active_segment_seq,
|
|
.offset = self.current_offset,
|
|
.len = encoded.len,
|
|
};
|
|
|
|
try file.writeAll(encoded);
|
|
self.current_offset += encoded.len;
|
|
return loc;
|
|
}
|
|
|
|
fn rotateSegment(self: *WALStore) !void {
|
|
if (self.active_segment) |file| {
|
|
file.close();
|
|
self.active_segment = null;
|
|
}
|
|
|
|
self.active_segment_id = @as(u64, @intCast(std.time.timestamp()));
|
|
self.active_segment_seq += 1;
|
|
|
|
var name_buf: [64]u8 = undefined;
|
|
const name = try std.fmt.bufPrint(&name_buf, "segment_{d}_{d}.opq", .{ self.active_segment_id, self.active_segment_seq });
|
|
|
|
var dir = try std.fs.cwd().openDir(self.base_dir_path, .{});
|
|
defer dir.close();
|
|
|
|
const file = try dir.createFile(name, .{ .read = true });
|
|
|
|
// Write Header
|
|
const header = SegmentHeader{
|
|
.segment_id = self.active_segment_id,
|
|
.segment_seq = self.active_segment_seq,
|
|
.created_at = std.time.timestamp(),
|
|
};
|
|
|
|
const header_bytes = std.mem.asBytes(&header);
|
|
try file.writeAll(header_bytes);
|
|
|
|
self.active_segment = file;
|
|
self.current_offset = SegmentHeader.SIZE;
|
|
}
|
|
|
|
/// Prune segments older than TTL
|
|
pub fn prune(self: *WALStore, max_age_seconds: i64) !usize {
|
|
var dir = try std.fs.cwd().openDir(self.base_dir_path, .{ .iterate = true });
|
|
defer dir.close();
|
|
|
|
var iterator = dir.iterate();
|
|
const now = std.time.timestamp();
|
|
var pruned_count: usize = 0;
|
|
|
|
while (try iterator.next()) |entry| {
|
|
if (entry.kind != .file) continue;
|
|
if (!std.mem.endsWith(u8, entry.name, ".opq")) continue;
|
|
|
|
// Extract potential timestamp/ID from segment_{id}.opq
|
|
// For simplicity, we read the header's created_at
|
|
const file = try dir.openFile(entry.name, .{});
|
|
defer file.close();
|
|
|
|
var header: SegmentHeader = undefined;
|
|
const bytes_read = try file.readAll(std.mem.asBytes(&header));
|
|
if (bytes_read < SegmentHeader.SIZE) continue;
|
|
|
|
if (now - header.created_at > max_age_seconds) {
|
|
// Check if it's the active one
|
|
if (header.segment_id == self.active_segment_id and
|
|
header.segment_seq == self.active_segment_seq) continue;
|
|
|
|
try dir.deleteFile(entry.name);
|
|
pruned_count += 1;
|
|
}
|
|
}
|
|
return pruned_count;
|
|
}
|
|
|
|
/// Calculate total disk usage of all .opq files in base_dir
|
|
pub fn getDiskUsage(self: *WALStore) !u64 {
|
|
var dir = try std.fs.cwd().openDir(self.base_dir_path, .{ .iterate = true });
|
|
defer dir.close();
|
|
|
|
var iterator = dir.iterate();
|
|
var total_size: u64 = 0;
|
|
|
|
while (try iterator.next()) |entry| {
|
|
if (entry.kind != .file) continue;
|
|
if (!std.mem.endsWith(u8, entry.name, ".opq")) continue;
|
|
|
|
const stat = try dir.statFile(entry.name);
|
|
total_size += stat.size;
|
|
}
|
|
return total_size;
|
|
}
|
|
|
|
/// Prune oldest segments until total usage is below target_bytes
|
|
pub fn pruneToSize(self: *WALStore, target_bytes: u64) !usize {
|
|
var dir = try std.fs.cwd().openDir(self.base_dir_path, .{ .iterate = true });
|
|
defer dir.close();
|
|
|
|
// 1. Collect all segment files with their timestamps
|
|
const SegmentFile = struct {
|
|
name: [64]u8,
|
|
len: usize,
|
|
created_at: i64,
|
|
};
|
|
var segments = std.ArrayList(SegmentFile).empty;
|
|
defer segments.deinit(self.allocator);
|
|
|
|
var iterator = dir.iterate();
|
|
var total_size: u64 = 0;
|
|
|
|
while (try iterator.next()) |entry| {
|
|
if (entry.kind != .file) continue;
|
|
if (!std.mem.endsWith(u8, entry.name, ".opq")) continue;
|
|
|
|
const file = try dir.openFile(entry.name, .{});
|
|
var header: SegmentHeader = undefined;
|
|
const bytes_read = file.readAll(std.mem.asBytes(&header)) catch 0;
|
|
file.close();
|
|
|
|
if (bytes_read < SegmentHeader.SIZE) continue;
|
|
|
|
const stat = try dir.statFile(entry.name);
|
|
total_size += stat.size;
|
|
|
|
var name_buf: [64]u8 = undefined;
|
|
@memcpy(name_buf[0..entry.name.len], entry.name);
|
|
|
|
try segments.append(self.allocator, .{
|
|
.name = name_buf,
|
|
.len = entry.name.len,
|
|
.created_at = header.created_at,
|
|
});
|
|
}
|
|
|
|
if (total_size <= target_bytes) return 0;
|
|
|
|
// 2. Sort by created_at (oldest first)
|
|
const sortFn = struct {
|
|
fn lessThan(_: void, a: SegmentFile, b: SegmentFile) bool {
|
|
return a.created_at < b.created_at;
|
|
}
|
|
}.lessThan;
|
|
std.sort.pdq(SegmentFile, segments.items, {}, sortFn);
|
|
|
|
// 3. Delete oldest segments until under quota
|
|
var pruned_count: usize = 0;
|
|
for (segments.items) |seg| {
|
|
if (total_size <= target_bytes) break;
|
|
|
|
const name = seg.name[0..seg.len];
|
|
|
|
// Safety: check if it's the active one (we need segment metadata here ideally)
|
|
// For now, we compare against our active_segment_id/seq logic if match
|
|
// But if we use the header we already read, we can check.
|
|
const file = try dir.openFile(name, .{});
|
|
var header: SegmentHeader = undefined;
|
|
_ = try file.readAll(std.mem.asBytes(&header));
|
|
file.close();
|
|
|
|
if (header.segment_id == self.active_segment_id and
|
|
header.segment_seq == self.active_segment_seq) continue;
|
|
|
|
const stat = try dir.statFile(name);
|
|
try dir.deleteFile(name);
|
|
total_size -= stat.size;
|
|
pruned_count += 1;
|
|
}
|
|
|
|
return pruned_count;
|
|
}
|
|
};
|
|
test "OPQ WAL Store: Append and Rotate" {
|
|
const allocator = std.testing.allocator;
|
|
const test_dir = "test_opq_wal";
|
|
|
|
// Clean up if previous run failed
|
|
std.fs.cwd().deleteTree(test_dir) catch {};
|
|
defer std.fs.cwd().deleteTree(test_dir) catch {};
|
|
|
|
var wal = try WALStore.init(allocator, test_dir, 1024); // Small size for rotation
|
|
defer wal.deinit();
|
|
|
|
// 1. Create a frame
|
|
var frame = try lwf.LWFFrame.init(allocator, 100);
|
|
defer frame.deinit(allocator);
|
|
@memset(frame.payload, 'A');
|
|
frame.header.payload_len = 100;
|
|
frame.updateChecksum();
|
|
|
|
// 2. Append multiple frames to trigger rotation
|
|
// Frame size is approx 100 + 72 + 36 = 208 bytes
|
|
// 1024 / 208 ≈ 4 frames per segment (plus header)
|
|
var i: usize = 0;
|
|
while (i < 10) : (i += 1) {
|
|
_ = try wal.appendFrame(&frame);
|
|
}
|
|
|
|
// 3. Verify files created
|
|
var dir = try std.fs.cwd().openDir(test_dir, .{ .iterate = true });
|
|
defer dir.close();
|
|
|
|
var iterator = dir.iterate();
|
|
var file_count: usize = 0;
|
|
while (try iterator.next()) |entry| {
|
|
if (std.mem.endsWith(u8, entry.name, ".opq")) {
|
|
file_count += 1;
|
|
}
|
|
}
|
|
|
|
try std.testing.expect(file_count > 1);
|
|
}
|
|
|
|
test "OPQ WAL Store: Pruning" {
|
|
const allocator = std.testing.allocator;
|
|
const test_dir = "test_opq_pruning";
|
|
|
|
std.fs.cwd().deleteTree(test_dir) catch {};
|
|
defer std.fs.cwd().deleteTree(test_dir) catch {};
|
|
|
|
var wal = try WALStore.init(allocator, test_dir, 1024 * 1024);
|
|
defer wal.deinit();
|
|
|
|
var frame = try lwf.LWFFrame.init(allocator, 10);
|
|
defer frame.deinit(allocator);
|
|
_ = try wal.appendFrame(&frame);
|
|
|
|
// Manually finalize and wait 2 seconds (for test purposes we could mock time,
|
|
// but here we'll just test the logic with a very small TTL)
|
|
// Wait... we can't easily wait. Let's just verify the function doesn't crash
|
|
// and correctly identifies old segments if we had them.
|
|
|
|
const pruned = try wal.prune(0); // Prune everything except active
|
|
try std.testing.expect(pruned == 0); // Active shouldn't be pruned
|
|
}
|
|
|
|
test "OPQ WAL Store: Space-based Pruning" {
|
|
const allocator = std.testing.allocator;
|
|
const test_dir = "test_opq_quota";
|
|
|
|
std.fs.cwd().deleteTree(test_dir) catch {};
|
|
defer std.fs.cwd().deleteTree(test_dir) catch {};
|
|
|
|
var wal = try WALStore.init(allocator, test_dir, 500); // Very small segments
|
|
defer wal.deinit();
|
|
|
|
var frame = try lwf.LWFFrame.init(allocator, 100);
|
|
defer frame.deinit(allocator);
|
|
@memset(frame.payload, 'B');
|
|
frame.header.payload_len = 100;
|
|
frame.updateChecksum();
|
|
|
|
// Append 4 frames (should create multiple segments)
|
|
var i: usize = 0;
|
|
while (i < 4) : (i += 1) {
|
|
_ = try wal.appendFrame(&frame);
|
|
}
|
|
|
|
const usage_before = try wal.getDiskUsage();
|
|
try std.testing.expect(usage_before > 0);
|
|
|
|
// Prune to a small size (should keep only active segment)
|
|
const pruned = try wal.pruneToSize(100);
|
|
try std.testing.expect(pruned > 0);
|
|
|
|
const usage_after = try wal.getDiskUsage();
|
|
try std.testing.expect(usage_after < usage_before);
|
|
}
|