381 lines
13 KiB
Zig
381 lines
13 KiB
Zig
//! QVL Persistent Storage Layer
|
|
//!
|
|
//!libmdbx backend for RiskGraph with Kenya Rule compliance:
|
|
//! - Single-file embedded database
|
|
//! - Memory-mapped I/O (kernel-optimized)
|
|
//! - ACID transactions
|
|
//! - <10MB RAM footprint
|
|
|
|
const std = @import("std");
|
|
const types = @import("types.zig");
|
|
|
|
const NodeId = types.NodeId;
|
|
const RiskEdge = types.RiskEdge;
|
|
const RiskGraph = types.RiskGraph;
|
|
|
|
/// Database environment configuration
|
|
pub const DBConfig = struct {
|
|
/// Max readers (concurrent)
|
|
max_readers: u32 = 64,
|
|
/// Max databases (tables)
|
|
max_dbs: u32 = 8,
|
|
/// Map size (file size limit)
|
|
map_size: usize = 10 * 1024 * 1024, // 10MB Kenya Rule
|
|
/// Page size (4KB optimal for SSD)
|
|
page_size: u32 = 4096,
|
|
};
|
|
|
|
/// Persistent graph storage using libmdbx
|
|
pub const PersistentGraph = struct {
|
|
env: *lmdb.MDB_env,
|
|
dbi_nodes: lmdb.MDB_dbi,
|
|
dbi_edges: lmdb.MDB_dbi,
|
|
dbi_adjacency: lmdb.MDB_dbi,
|
|
dbi_metadata: lmdb.MDB_dbi,
|
|
allocator: std.mem.Allocator,
|
|
|
|
const Self = @This();
|
|
|
|
/// Open or create persistent graph database
|
|
pub fn open(path: []const u8, config: DBConfig, allocator: std.mem.Allocator) !Self {
|
|
var env: *lmdb.MDB_env = undefined;
|
|
|
|
// Initialize environment
|
|
try lmdb.mdb_env_create(&env);
|
|
errdefer lmdb.mdb_env_close(env);
|
|
|
|
// Set limits
|
|
try lmdb.mdb_env_set_maxreaders(env, config.max_readers);
|
|
try lmdb.mdb_env_set_maxdbs(env, config.max_dbs);
|
|
try lmdb.mdb_env_set_mapsize(env, config.map_size);
|
|
|
|
// Open environment
|
|
const flags = lmdb.MDB_NOSYNC | lmdb.MDB_NOMETASYNC; // Async durability for speed
|
|
try lmdb.mdb_env_open(env, path.ptr, flags, 0o644);
|
|
|
|
// Open databases (tables)
|
|
var txn: *lmdb.MDB_txn = undefined;
|
|
try lmdb.mdb_txn_begin(env, null, 0, &txn);
|
|
errdefer lmdb.mdb_txn_abort(txn);
|
|
|
|
const dbi_nodes = try lmdb.mdb_dbi_open(txn, "nodes", lmdb.MDB_CREATE | lmdb.MDB_INTEGERKEY);
|
|
const dbi_edges = try lmdb.mdb_dbi_open(txn, "edges", lmdb.MDB_CREATE);
|
|
const dbi_adjacency = try lmdb.mdb_dbi_open(txn, "adjacency", lmdb.MDB_CREATE | lmdb.MDB_DUPSORT);
|
|
const dbi_metadata = try lmdb.mdb_dbi_open(txn, "metadata", lmdb.MDB_CREATE);
|
|
|
|
try lmdb.mdb_txn_commit(txn);
|
|
|
|
return Self{
|
|
.env = env,
|
|
.dbi_nodes = dbi_nodes,
|
|
.dbi_edges = dbi_edges,
|
|
.dbi_adjacency = dbi_adjacency,
|
|
.dbi_metadata = dbi_metadata,
|
|
.allocator = allocator,
|
|
};
|
|
}
|
|
|
|
/// Close database
|
|
pub fn close(self: *Self) void {
|
|
lmdb.mdb_env_close(self.env);
|
|
}
|
|
|
|
/// Add node to persistent storage
|
|
pub fn addNode(self: *Self, node: NodeId) !void {
|
|
var txn: *lmdb.MDB_txn = undefined;
|
|
try lmdb.mdb_txn_begin(self.env, null, 0, &txn);
|
|
errdefer lmdb.mdb_txn_abort(txn);
|
|
|
|
const key = std.mem.asBytes(&node);
|
|
const val = &[_]u8{1}; // Presence marker
|
|
|
|
var mdb_key = lmdb.MDB_val{ .mv_size = key.len, .mv_data = key.ptr };
|
|
var mdb_val = lmdb.MDB_val{ .mv_size = val.len, .mv_data = val.ptr };
|
|
|
|
try lmdb.mdb_put(txn, self.dbi_nodes, &mdb_key, &mdb_val, 0);
|
|
try lmdb.mdb_txn_commit(txn);
|
|
}
|
|
|
|
/// Add edge to persistent storage
|
|
pub fn addEdge(self: *Self, edge: RiskEdge) !void {
|
|
var txn: *lmdb.MDB_txn = undefined;
|
|
try lmdb.mdb_txn_begin(self.env, null, 0, &txn);
|
|
errdefer lmdb.mdb_txn_abort(txn);
|
|
|
|
// Store edge data
|
|
const edge_key = try self.encodeEdgeKey(edge.from, edge.to);
|
|
const edge_val = try self.encodeEdgeValue(edge);
|
|
|
|
var mdb_key = lmdb.MDB_val{ .mv_size = edge_key.len, .mv_data = edge_key.ptr };
|
|
var mdb_val = lmdb.MDB_val{ .mv_size = edge_val.len, .mv_data = edge_val.ptr };
|
|
|
|
try lmdb.mdb_put(txn, self.dbi_edges, &mdb_key, &mdb_val, 0);
|
|
|
|
// Update adjacency index (from -> to)
|
|
const adj_key = std.mem.asBytes(&edge.from);
|
|
const adj_val = std.mem.asBytes(&edge.to);
|
|
|
|
var mdb_adj_key = lmdb.MDB_val{ .mv_size = adj_key.len, .mv_data = adj_key.ptr };
|
|
var mdb_adj_val = lmdb.MDB_val{ .mv_size = adj_val.len, .mv_data = adj_val.ptr };
|
|
|
|
try lmdb.mdb_put(txn, self.dbi_adjacency, &mdb_adj_key, &mdb_adj_val, 0);
|
|
|
|
// Update reverse adjacency (to -> from) for incoming queries
|
|
const rev_adj_key = std.mem.asBytes(&edge.to);
|
|
const rev_adj_val = std.mem.asBytes(&edge.from);
|
|
|
|
var mdb_rev_key = lmdb.MDB_val{ .mv_size = rev_adj_key.len, .mv_data = rev_adj_key.ptr };
|
|
var mdb_rev_val = lmdb.MDB_val{ .mv_size = rev_adj_val.len, .mv_data = rev_adj_val.ptr };
|
|
|
|
try lmdb.mdb_put(txn, self.dbi_adjacency, &mdb_rev_key, &mdb_rev_val, 0);
|
|
|
|
try lmdb.mdb_txn_commit(txn);
|
|
}
|
|
|
|
/// Get outgoing neighbors (from -> *)
|
|
pub fn getOutgoing(self: *Self, from: NodeId, allocator: std.mem.Allocator) ![]NodeId {
|
|
var txn: *lmdb.MDB_txn = undefined;
|
|
try lmdb.mdb_txn_begin(self.env, null, lmdb.MDB_RDONLY, &txn);
|
|
defer lmdb.mdb_txn_abort(txn); // Read-only, abort is fine
|
|
|
|
const key = std.mem.asBytes(&from);
|
|
var mdb_key = lmdb.MDB_val{ .mv_size = key.len, .mv_data = key.ptr };
|
|
var mdb_val: lmdb.MDB_val = undefined;
|
|
|
|
var cursor: *lmdb.MDB_cursor = undefined;
|
|
try lmdb.mdb_cursor_open(txn, self.dbi_adjacency, &cursor);
|
|
defer lmdb.mdb_cursor_close(cursor);
|
|
|
|
var result = std.ArrayList(NodeId).init(allocator);
|
|
errdefer result.deinit();
|
|
|
|
// Position cursor at key
|
|
const rc = lmdb.mdb_cursor_get(cursor, &mdb_key, &mdb_val, lmdb.MDB_SET_KEY);
|
|
if (rc == lmdb.MDB_NOTFOUND) {
|
|
return result.toOwnedSlice();
|
|
}
|
|
if (rc != 0) return error.MDBError;
|
|
|
|
// Iterate over all values for this key
|
|
while (true) {
|
|
const neighbor = std.mem.bytesToValue(NodeId, @as([*]const u8, @ptrCast(mdb_val.mv_data))[0..@sizeOf(NodeId)]);
|
|
try result.append(neighbor);
|
|
|
|
const next_rc = lmdb.mdb_cursor_get(cursor, &mdb_key, &mdb_val, lmdb.MDB_NEXT_DUP);
|
|
if (next_rc == lmdb.MDB_NOTFOUND) break;
|
|
if (next_rc != 0) return error.MDBError;
|
|
}
|
|
|
|
return result.toOwnedSlice();
|
|
}
|
|
|
|
/// Get incoming neighbors (* -> to)
|
|
pub fn getIncoming(self: *Self, to: NodeId, allocator: std.mem.Allocator) ![]NodeId {
|
|
// Same as getOutgoing but querying by "to" key
|
|
// Implementation mirrors getOutgoing
|
|
_ = to;
|
|
_ = allocator;
|
|
@panic("TODO: implement getIncoming");
|
|
}
|
|
|
|
/// Get specific edge
|
|
pub fn getEdge(self: *Self, from: NodeId, to: NodeId) !?RiskEdge {
|
|
var txn: *lmdb.MDB_txn = undefined;
|
|
try lmdb.mdb_txn_begin(self.env, null, lmdb.MDB_RDONLY, &txn);
|
|
defer lmdb.mdb_txn_abort(txn);
|
|
|
|
const key = try self.encodeEdgeKey(from, to);
|
|
var mdb_key = lmdb.MDB_val{ .mv_size = key.len, .mv_data = key.ptr };
|
|
var mdb_val: lmdb.MDB_val = undefined;
|
|
|
|
const rc = lmdb.mdb_get(txn, self.dbi_edges, &mdb_key, &mdb_val);
|
|
if (rc == lmdb.MDB_NOTFOUND) return null;
|
|
if (rc != 0) return error.MDBError;
|
|
|
|
return try self.decodeEdgeValue(mdb_val);
|
|
}
|
|
|
|
/// Load in-memory RiskGraph from persistent storage
|
|
pub fn toRiskGraph(self: *Self, allocator: std.mem.Allocator) !RiskGraph {
|
|
var graph = RiskGraph.init(allocator);
|
|
errdefer graph.deinit();
|
|
|
|
var txn: *lmdb.MDB_txn = undefined;
|
|
try lmdb.mdb_txn_begin(self.env, null, lmdb.MDB_RDONLY, &txn);
|
|
defer lmdb.mdb_txn_abort(txn);
|
|
|
|
// Iterate all edges
|
|
var cursor: *lmdb.MDB_cursor = undefined;
|
|
try lmdb.mdb_cursor_open(txn, self.dbi_edges, &cursor);
|
|
defer lmdb.mdb_cursor_close(cursor);
|
|
|
|
var mdb_key: lmdb.MDB_val = undefined;
|
|
var mdb_val: lmdb.MDB_val = undefined;
|
|
|
|
while (lmdb.mdb_cursor_get(cursor, &mdb_key, &mdb_val, lmdb.MDB_NEXT) == 0) {
|
|
const edge = try self.decodeEdgeValue(mdb_val);
|
|
try graph.addEdge(edge);
|
|
}
|
|
|
|
return graph;
|
|
}
|
|
|
|
// Internal: Encode edge key (from, to) -> bytes
|
|
fn encodeEdgeKey(self: *Self, from: NodeId, to: NodeId) ![]u8 {
|
|
_ = self;
|
|
var buf: [8]u8 = undefined;
|
|
std.mem.writeInt(u32, buf[0..4], from, .little);
|
|
std.mem.writeInt(u32, buf[4..8], to, .little);
|
|
return &buf;
|
|
}
|
|
|
|
// Internal: Encode RiskEdge -> bytes
|
|
fn encodeEdgeValue(self: *Self, edge: RiskEdge) ![]u8 {
|
|
_ = self;
|
|
// Compact binary encoding
|
|
var buf: [64]u8 = undefined;
|
|
var offset: usize = 0;
|
|
|
|
std.mem.writeInt(u32, buf[offset..][0..4], edge.from, .little);
|
|
offset += 4;
|
|
|
|
std.mem.writeInt(u32, buf[offset..][0..4], edge.to, .little);
|
|
offset += 4;
|
|
|
|
std.mem.writeInt(u64, buf[offset..][0..8], @bitCast(edge.risk), .little);
|
|
offset += 8;
|
|
|
|
std.mem.writeInt(u64, buf[offset..][0..8], edge.timestamp, .little);
|
|
offset += 8;
|
|
|
|
std.mem.writeInt(u64, buf[offset..][0..8], edge.nonce, .little);
|
|
offset += 8;
|
|
|
|
std.mem.writeInt(u8, buf[offset..][0..1], edge.level);
|
|
offset += 1;
|
|
|
|
std.mem.writeInt(u64, buf[offset..][0..8], edge.expires_at, .little);
|
|
offset += 8;
|
|
|
|
return buf[0..offset];
|
|
}
|
|
|
|
// Internal: Decode bytes -> RiskEdge
|
|
fn decodeEdgeValue(self: *Self, val: lmdb.MDB_val) !RiskEdge {
|
|
_ = self;
|
|
const data = @as([*]const u8, @ptrCast(val.mv_data))[0..val.mv_size];
|
|
|
|
var offset: usize = 0;
|
|
|
|
const from = std.mem.readInt(u32, data[offset..][0..4], .little);
|
|
offset += 4;
|
|
|
|
const to = std.mem.readInt(u32, data[offset..][0..4], .little);
|
|
offset += 4;
|
|
|
|
const risk_bits = std.mem.readInt(u64, data[offset..][0..8], .little);
|
|
const risk = @as(f64, @bitCast(risk_bits));
|
|
offset += 8;
|
|
|
|
const timestamp = std.mem.readInt(u64, data[offset..][0..8], .little);
|
|
offset += 8;
|
|
|
|
const nonce = std.mem.readInt(u64, data[offset..][0..8], .little);
|
|
offset += 8;
|
|
|
|
const level = std.mem.readInt(u8, data[offset..][0..1], .little);
|
|
offset += 1;
|
|
|
|
const expires_at = std.mem.readInt(u64, data[offset..][0..8], .little);
|
|
|
|
return RiskEdge{
|
|
.from = from,
|
|
.to = to,
|
|
.risk = risk,
|
|
.timestamp = timestamp,
|
|
.nonce = nonce,
|
|
.level = level,
|
|
.expires_at = expires_at,
|
|
};
|
|
}
|
|
};
|
|
|
|
// ============================================================================
|
|
// TESTS
|
|
// ============================================================================
|
|
|
|
test "PersistentGraph: basic operations" {
|
|
const allocator = std.testing.allocator;
|
|
|
|
// Create temporary database
|
|
const path = "/tmp/test_qvl_db";
|
|
defer std.fs.deleteFileAbsolute(path) catch {};
|
|
|
|
var graph = try PersistentGraph.open(path, .{}, allocator);
|
|
defer graph.close();
|
|
|
|
// Add nodes
|
|
try graph.addNode(0);
|
|
try graph.addNode(1);
|
|
try graph.addNode(2);
|
|
|
|
// Add edges
|
|
const ts = 1234567890;
|
|
try graph.addEdge(.{
|
|
.from = 0,
|
|
.to = 1,
|
|
.risk = -0.3,
|
|
.timestamp = ts,
|
|
.nonce = 0,
|
|
.level = 3,
|
|
.expires_at = ts + 86400,
|
|
});
|
|
|
|
try graph.addEdge(.{
|
|
.from = 1,
|
|
.to = 2,
|
|
.risk = -0.3,
|
|
.timestamp = ts,
|
|
.nonce = 1,
|
|
.level = 3,
|
|
.expires_at = ts + 86400,
|
|
});
|
|
|
|
// Query outgoing
|
|
const neighbors = try graph.getOutgoing(0, allocator);
|
|
defer allocator.free(neighbors);
|
|
|
|
try std.testing.expectEqual(neighbors.len, 1);
|
|
try std.testing.expectEqual(neighbors[0], 1);
|
|
|
|
// Retrieve edge
|
|
const edge = try graph.getEdge(0, 1);
|
|
try std.testing.expect(edge != null);
|
|
try std.testing.expectEqual(edge.?.from, 0);
|
|
try std.testing.expectEqual(edge.?.to, 1);
|
|
try std.testing.expectApproxEqAbs(edge.?.risk, -0.3, 0.001);
|
|
}
|
|
|
|
test "PersistentGraph: Kenya Rule compliance" {
|
|
const allocator = std.testing.allocator;
|
|
|
|
const path = "/tmp/test_kenya_db";
|
|
defer std.fs.deleteFileAbsolute(path) catch {};
|
|
|
|
// 10MB limit
|
|
var graph = try PersistentGraph.open(path, .{
|
|
.map_size = 10 * 1024 * 1024,
|
|
}, allocator);
|
|
defer graph.close();
|
|
|
|
// Add 1000 nodes
|
|
var i: u32 = 0;
|
|
while (i < 1000) : (i += 1) {
|
|
try graph.addNode(i);
|
|
}
|
|
|
|
// Verify database size
|
|
const stat = try std.fs.cwd().statFile(path);
|
|
try std.testing.expect(stat.size < 10 * 1024 * 1024);
|
|
}
|