libertaria-stack/core/l0-transport/opq/manager.zig

164 lines
5.7 KiB
Zig

//! RFC-0020: OPQ (Offline Packet Queue) - Manager
//!
//! Orchestrates the flow of frames into the store, enforcing quotas and TTLs.
const std = @import("std");
const store = @import("./store.zig");
const quota = @import("./quota.zig");
const manifest = @import("./manifest.zig");
const sequencer = @import("./sequencer.zig");
const trust_resolver = @import("./trust_resolver.zig");
const lwf = @import("lwf");
pub const OPQManager = struct {
allocator: std.mem.Allocator,
policy: quota.Policy,
store: store.WALStore,
index: std.ArrayListUnmanaged(manifest.PacketSummary),
trust_resolver: trust_resolver.TrustResolver,
pub fn init(allocator: std.mem.Allocator, base_dir: []const u8, persona: quota.Persona, resolver: trust_resolver.TrustResolver) !OPQManager {
const policy = quota.Policy.init(persona);
const wal = try store.WALStore.init(allocator, base_dir, policy.segment_size);
return OPQManager{
.allocator = allocator,
.policy = policy,
.store = wal,
.index = .{},
.trust_resolver = resolver,
};
}
pub fn deinit(self: *OPQManager) void {
self.store.deinit();
self.index.deinit(self.allocator);
}
/// Ingest a frame into the queue
pub fn ingestFrame(self: *OPQManager, frame: *const lwf.LWFFrame) !void {
// 1. Resolve Trust Category
const category = self.trust_resolver.resolve(frame.header.source_hint);
// 2. Resource Triage (Mechanism: Drop low-trust if busy)
// In a real implementation, we'd check current_total_size vs policy.
// For now, we allow the ingestion and rely on maintenance to prune.
// 3. Append to WAL
const loc = try self.store.appendFrame(frame);
// 2. Update In-Memory Index (Summary)
// Note: In real scenarios, queue_id should be deterministic or from header.
// For now, we use a random ID or part of checksum.
var q_id: [16]u8 = undefined;
std.crypto.random.bytes(&q_id);
try self.index.append(self.allocator, .{
.queue_id = q_id,
.sender_hint = frame.header.source_hint,
.size = @intCast(loc.len),
.priority = if (frame.header.flags & lwf.LWFFlags.PRIORITY != 0) .high else .normal,
.created_at = std.time.timestamp(),
.timestamp = frame.header.timestamp,
.sequence = frame.header.sequence,
.expires_at = std.time.timestamp() + self.policy.max_retention_seconds,
.entropy_cost = frame.header.entropy_difficulty,
.category = category,
});
// 5. Periodic maintenance
try self.maintenance();
}
pub fn generateManifest(self: *OPQManager, recipient: [24]u8) !manifest.QueueManifest {
var qm = manifest.QueueManifest.init(self.allocator, recipient);
errdefer qm.deinit();
for (self.index.items) |item| {
// In a real relay, we would filter by recipient!
// For now, we just add everything to the manifest.
try qm.items.append(self.allocator, item);
qm.total_count += 1;
qm.total_size += item.size;
}
sequencer.sortDeterministically(qm.items.items);
try qm.calculateMerkleRoot();
return qm;
}
pub fn maintenance(self: *OPQManager) !void {
// 1. Prune by TTL
_ = try self.store.prune(self.policy.max_retention_seconds);
// 2. Prune by Size Quota
_ = try self.store.pruneToSize(self.policy.max_storage_bytes);
}
};
test "OPQ Manager: Policy Enforcement" {
const allocator = std.testing.allocator;
const test_dir = "test_opq_manager";
std.fs.cwd().deleteTree(test_dir) catch {};
defer std.fs.cwd().deleteTree(test_dir) catch {};
// 1. Client Policy: 5MB limit, 1hr TTL
var manager = try OPQManager.init(allocator, test_dir, .client, trust_resolver.TrustResolver.noop());
defer manager.deinit();
try std.testing.expectEqual(manager.policy.max_storage_bytes, 5 * 1024 * 1024);
// 2. Ingest Sample Frame
var frame = try lwf.LWFFrame.init(allocator, 10);
defer frame.deinit(allocator);
try manager.ingestFrame(&frame);
// 3. Generate Manifest
const recipient = [_]u8{0} ** 24;
var mf = try manager.generateManifest(recipient);
defer mf.deinit();
try std.testing.expectEqual(mf.total_count, 1);
try std.testing.expect(mf.total_size > 0);
try std.testing.expect(!std.mem.eql(u8, &mf.merkle_root, &[_]u8{0} ** 32));
}
test "OPQ Manager: Deterministic Manifest Ordering" {
const allocator = std.testing.allocator;
const test_dir = "test_opq_ordering";
std.fs.cwd().deleteTree(test_dir) catch {};
defer std.fs.cwd().deleteTree(test_dir) catch {};
var manager = try OPQManager.init(allocator, test_dir, .relay, trust_resolver.TrustResolver.noop());
defer manager.deinit();
// 1. Ingest frames out of order
// Frame A: Time 200, Seq 2
var f1 = try lwf.LWFFrame.init(allocator, 10);
defer f1.deinit(allocator);
f1.header.timestamp = 200;
f1.header.sequence = 2;
f1.updateChecksum();
try manager.ingestFrame(&f1);
// Frame B: Time 100, Seq 1 (Should come first)
var f2 = try lwf.LWFFrame.init(allocator, 10);
defer f2.deinit(allocator);
f2.header.timestamp = 100;
f2.header.sequence = 1;
f2.updateChecksum();
try manager.ingestFrame(&f2);
// 2. Generate Manifest
const recipient = [_]u8{0} ** 24;
var mf = try manager.generateManifest(recipient);
defer mf.deinit();
// 3. Verify Order: item[0] should be timestamp 100
try std.testing.expectEqual(mf.items.items[0].timestamp, 100);
try std.testing.expectEqual(mf.items.items[1].timestamp, 200);
}