diff --git a/build.zig b/build.zig index 79ff00d..cc9806b 100644 --- a/build.zig +++ b/build.zig @@ -451,6 +451,8 @@ pub fn build(b: *std.Build) void { capsule_exe.linkLibC(); // Link SQLite3 (required for Persistent State) capsule_exe.linkSystemLibrary("sqlite3"); + // Link DuckDB (required for Analytical QVL) + capsule_exe.linkSystemLibrary("duckdb"); b.installArtifact(capsule_exe); diff --git a/capsule-core/build.zig b/capsule-core/build.zig new file mode 100644 index 0000000..75f8706 --- /dev/null +++ b/capsule-core/build.zig @@ -0,0 +1,106 @@ +const std = @import("std"); + +pub fn build(b: *std.Build) void { + const target = b.standardTargetOptions(.{}); + const optimize = b.standardOptimizeOption(.{}); + + // Modules + const ipc = b.createModule(.{ + .root_source_file = b.path("../l0-transport/ipc/client.zig"), + }); + const entropy = b.createModule(.{ + .root_source_file = b.path("../l1-identity/entropy.zig"), + }); + const quarantine = b.createModule(.{ + .root_source_file = b.path("../l0-transport/quarantine.zig"), + }); + const shake = b.createModule(.{ + .root_source_file = b.path("../src/crypto/shake.zig"), + }); + const fips202_bridge = b.createModule(.{ + .root_source_file = b.path("../src/crypto/fips202_bridge.zig"), + }); + const pqxdh = b.createModule(.{ + .root_source_file = b.path("../l1-identity/pqxdh.zig"), + }); + const slash = b.createModule(.{ + .root_source_file = b.path("../l1-identity/slash.zig"), + .imports = &.{ + .{ .name = "crypto", .module = b.createModule(.{ .root_source_file = b.path("../l1-identity/crypto.zig") }) }, + }, + }); + + const time = b.createModule(.{ + .root_source_file = b.path("../l0-transport/time.zig"), + }); + const trust_graph = b.createModule(.{ + .root_source_file = b.path("../l1-identity/trust_graph.zig"), + }); + const crypto = b.createModule(.{ + .root_source_file = b.path("../l1-identity/crypto.zig"), + .imports = &.{ + .{ .name = "trust_graph", .module = trust_graph }, + .{ .name = "time", .module = time }, + }, + }); + + const lwf = b.createModule(.{ + .root_source_file = b.path("../l0-transport/lwf.zig"), + .imports = &.{ + .{ .name = "ipc", .module = ipc }, + .{ .name = "entropy", .module = entropy }, + .{ .name = "quarantine", .module = quarantine }, + }, + }); + + const utcp = b.createModule(.{ + .root_source_file = b.path("../l0-transport/utcp/socket.zig"), + .imports = &.{ + .{ .name = "shake", .module = shake }, + .{ .name = "fips202_bridge", .module = fips202_bridge }, + .{ .name = "pqxdh", .module = pqxdh }, + .{ .name = "slash", .module = slash }, + .{ .name = "ipc", .module = ipc }, + .{ .name = "lwf", .module = lwf }, + .{ .name = "entropy", .module = entropy }, + }, + }); + + const qvl = b.createModule(.{ + .root_source_file = b.path("../l1-identity/qvl.zig"), + .imports = &.{ + .{ .name = "time", .module = time }, + }, + }); + + const exe_mod = b.createModule(.{ + .root_source_file = b.path("src/main.zig"), + .target = target, + .optimize = optimize, + }); + + const exe = b.addExecutable(.{ + .name = "capsule", + .root_module = exe_mod, + }); + + exe.root_module.addImport("l0_transport", lwf); // Name mismatch? Step 4902 says l0_transport=lwf + exe.root_module.addImport("utcp", utcp); + exe.root_module.addImport("l1_identity", crypto); // Name mismatch? Step 4902 says l1_identity=crypto + exe.root_module.addImport("qvl", qvl); + exe.root_module.addImport("quarantine", quarantine); + + exe.linkSystemLibrary("sqlite3"); + exe.linkSystemLibrary("duckdb"); + exe.linkLibC(); + + b.installArtifact(exe); + + const run_cmd = b.addRunArtifact(exe); + run_cmd.step.dependOn(b.getInstallStep()); + if (b.args) |args| { + run_cmd.addArgs(args); + } + const run_step = b.step("run", "Run the app"); + run_step.dependOn(&run_cmd.step); +} diff --git a/capsule-core/src/config.zig b/capsule-core/src/config.zig index ba449f9..22e497b 100644 --- a/capsule-core/src/config.zig +++ b/capsule-core/src/config.zig @@ -9,6 +9,12 @@ pub const NodeConfig = struct { /// UTCP bind port (default: 8710) port: u16 = 8710, + /// Control Socket Path (Unix Domain Socket) + control_socket_path: []const u8, + + /// Identity Key Path (Ed25519 private key) + identity_key_path: []const u8, + /// Bootstrap peers (multiaddrs) bootstrap_peers: [][]const u8 = &.{}, @@ -18,6 +24,8 @@ pub const NodeConfig = struct { /// Free allocated memory (strings, slices) pub fn deinit(self: *NodeConfig, allocator: std.mem.Allocator) void { allocator.free(self.data_dir); + allocator.free(self.control_socket_path); + allocator.free(self.identity_key_path); for (self.bootstrap_peers) |peer| { allocator.free(peer); } @@ -28,6 +36,8 @@ pub const NodeConfig = struct { // Default data dir: ~/.libertaria (or "data" for MVP) return NodeConfig{ .data_dir = try allocator.dupe(u8, "data"), + .control_socket_path = try allocator.dupe(u8, "data/capsule.sock"), + .identity_key_path = try allocator.dupe(u8, "data/identity.key"), .port = 8710, }; } @@ -64,6 +74,15 @@ pub const NodeConfig = struct { const cfg = parsed.value; const data_dir = try allocator.dupe(u8, cfg.data_dir); + const control_socket_path = if (cfg.control_socket_path.len > 0) + try allocator.dupe(u8, cfg.control_socket_path) + else + try std.fmt.allocPrint(allocator, "{s}/capsule.sock", .{data_dir}); + + const identity_key_path = if (cfg.identity_key_path.len > 0) + try allocator.dupe(u8, cfg.identity_key_path) + else + try std.fmt.allocPrint(allocator, "{s}/identity.key", .{data_dir}); var peers = std.array_list.Managed([]const u8).init(allocator); for (cfg.bootstrap_peers) |peer| { @@ -72,6 +91,8 @@ pub const NodeConfig = struct { return NodeConfig{ .data_dir = data_dir, + .control_socket_path = control_socket_path, + .identity_key_path = identity_key_path, .port = cfg.port, .bootstrap_peers = try peers.toOwnedSlice(), .log_level = cfg.log_level, diff --git a/capsule-core/src/control.zig b/capsule-core/src/control.zig new file mode 100644 index 0000000..91e32d0 --- /dev/null +++ b/capsule-core/src/control.zig @@ -0,0 +1,151 @@ +//! Control Protocol for Capsule CLI <-> Daemon communication. +//! Uses a simple JSON-based request/response model over a Unix Domain Socket. + +const std = @import("std"); +const qvl = @import("qvl"); + +/// Commands sent from CLI to Daemon +pub const Command = union(enum) { + /// Request general node status + Status: void, + /// Request list of connected peers + Peers: void, + /// Request list of federated sessions + Sessions: void, + /// Query QVL trust metrics + QvlQuery: QvlQueryArgs, + /// Manually trigger a Slash/Quarantine + Slash: SlashArgs, + /// Query the Slash Log + SlashLog: SlashLogArgs, + /// Ban a peer by DID + Ban: BanArgs, + /// Unban a peer by DID + Unban: UnbanArgs, + /// Manually set trust for a DID + Trust: TrustArgs, + /// Get DHT routing table info + Dht: void, + /// Get node identity information + Identity: void, + /// Emergency lockdown - block ALL traffic + Lockdown: void, + /// Resume normal operation + Unlock: void, + /// Set airlock state (open/restricted/closed) + Airlock: AirlockArgs, + /// Shutdown the daemon (admin only) + Shutdown: void, +}; + +pub const SlashArgs = struct { + target_did: []const u8, + reason: []const u8, // stringified enum + severity: []const u8, // stringified enum + duration: u32 = 0, +}; + +pub const SlashLogArgs = struct { + limit: usize = 50, +}; + +pub const BanArgs = struct { + target_did: []const u8, + reason: []const u8, +}; + +pub const UnbanArgs = struct { + target_did: []const u8, +}; + +pub const TrustArgs = struct { + target_did: []const u8, + score: f64, +}; + +pub const QvlQueryArgs = struct { + /// Optional: Filter by specific DID (if null, returns global metrics) + target_did: ?[]const u8 = null, +}; + +pub const AirlockArgs = struct { + /// Airlock state: "open", "restricted", or "closed" + state: []const u8, +}; + +/// Responses sent from Daemon to CLI +pub const Response = union(enum) { + /// General status info + NodeStatus: NodeStatus, + /// List of peers + PeerList: []const PeerInfo, + /// List of sessions + SessionList: []const SessionInfo, + /// DHT info + DhtInfo: DhtInfo, + /// Identity info + IdentityInfo: IdentityInfo, + /// Lockdown status + LockdownStatus: LockdownInfo, + /// QVL query results + QvlResult: QvlMetrics, + /// Slash Log results + SlashLogResult: []const SlashEvent, + /// Simple success message + Ok: []const u8, + /// Error message + Error: []const u8, +}; + +pub const NodeStatus = struct { + node_id: []const u8, + state: []const u8, // e.g., "Running", "Syncing" + peers_count: usize, + uptime_seconds: i64, + version: []const u8, +}; + +pub const PeerInfo = struct { + id: []const u8, + address: []const u8, + state: []const u8, // "Active", "Federated" + last_seen: i64, +}; + +pub const SessionInfo = struct { + address: []const u8, + did_short: []const u8, + state: []const u8, // "Handshaking", "Active" +}; + +pub const QvlMetrics = struct { + total_vertices: usize, + total_edges: usize, + trust_rank: f64, // Placeholder for now +}; + +pub const DhtInfo = struct { + local_node_id: []const u8, + routing_table_size: usize, + known_nodes: usize, +}; + +pub const IdentityInfo = struct { + did: []const u8, + public_key: []const u8, // hex-encoded Ed25519 public key + dht_node_id: []const u8, +}; + +pub const LockdownInfo = struct { + is_locked: bool, + airlock_state: []const u8, // "open", "restricted", "closed" + locked_since: i64, +}; + +pub const SlashEvent = struct { + timestamp: u64, + target_did: []const u8, + reason: []const u8, + severity: []const u8, + evidence_hash: []const u8, +}; diff --git a/capsule-core/src/main.zig b/capsule-core/src/main.zig index ef4b82b..3f8eb61 100644 --- a/capsule-core/src/main.zig +++ b/capsule-core/src/main.zig @@ -4,24 +4,142 @@ const std = @import("std"); const node_mod = @import("node.zig"); const config_mod = @import("config.zig"); +const control_mod = @import("control.zig"); + pub fn main() !void { // Setup allocator var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer _ = gpa.deinit(); const allocator = gpa.allocator(); - // Setup logging (default to info) - // std.log is configured via root declarations in build.zig usually, or std options. - - // Parse args (Minimal for Week 27) + // Parse args const args = try std.process.argsAlloc(allocator); defer std.process.argsFree(allocator, args); - if (args.len > 1 and std.mem.eql(u8, args[1], "version")) { - std.debug.print("Libertaria Capsule v0.1.0 (Shield)\n", .{}); + if (args.len < 2) { + printUsage(); return; } + const command = args[1]; + + if (std.mem.eql(u8, command, "start")) { + try runDaemon(allocator); + } else if (std.mem.eql(u8, command, "status")) { + try runCliCommand(allocator, .Status); + } else if (std.mem.eql(u8, command, "peers")) { + try runCliCommand(allocator, .Peers); + } else if (std.mem.eql(u8, command, "stop")) { + try runCliCommand(allocator, .Shutdown); + } else if (std.mem.eql(u8, command, "version")) { + std.debug.print("Libertaria Capsule v0.1.0 (Shield)\n", .{}); + } else if (std.mem.eql(u8, command, "slash")) { + if (args.len < 5) { + std.debug.print("Usage: capsule slash \n", .{}); + return; + } + const target_did = args[2]; + const reason = args[3]; + const severity = args[4]; + + // Validation could happen here or in node + try runCliCommand(allocator, .{ .Slash = .{ + .target_did = try allocator.dupe(u8, target_did), + .reason = try allocator.dupe(u8, reason), + .severity = try allocator.dupe(u8, severity), + .duration = 0, + } }); + } else if (std.mem.eql(u8, command, "slash-log")) { + var limit: usize = 50; + if (args.len >= 3) { + limit = std.fmt.parseInt(usize, args[2], 10) catch 50; + } + try runCliCommand(allocator, .{ .SlashLog = .{ .limit = limit } }); + } else if (std.mem.eql(u8, command, "ban")) { + if (args.len < 4) { + std.debug.print("Usage: capsule ban \n", .{}); + return; + } + const target_did = args[2]; + const reason = args[3]; + try runCliCommand(allocator, .{ .Ban = .{ + .target_did = try allocator.dupe(u8, target_did), + .reason = try allocator.dupe(u8, reason), + } }); + } else if (std.mem.eql(u8, command, "unban")) { + if (args.len < 3) { + std.debug.print("Usage: capsule unban \n", .{}); + return; + } + const target_did = args[2]; + try runCliCommand(allocator, .{ .Unban = .{ + .target_did = try allocator.dupe(u8, target_did), + } }); + } else if (std.mem.eql(u8, command, "trust")) { + if (args.len < 4) { + std.debug.print("Usage: capsule trust \n", .{}); + return; + } + const target_did = args[2]; + const score = std.fmt.parseFloat(f64, args[3]) catch { + std.debug.print("Error: Invalid score '{s}', must be a number\n", .{args[3]}); + return; + }; + try runCliCommand(allocator, .{ .Trust = .{ + .target_did = try allocator.dupe(u8, target_did), + .score = score, + } }); + } else if (std.mem.eql(u8, command, "sessions")) { + try runCliCommand(allocator, .Sessions); + } else if (std.mem.eql(u8, command, "dht")) { + try runCliCommand(allocator, .Dht); + } else if (std.mem.eql(u8, command, "qvl-query")) { + var target_did: ?[]const u8 = null; + if (args.len >= 3) { + target_did = try allocator.dupe(u8, args[2]); + } + try runCliCommand(allocator, .{ .QvlQuery = .{ .target_did = target_did } }); + } else if (std.mem.eql(u8, command, "identity")) { + try runCliCommand(allocator, .Identity); + } else if (std.mem.eql(u8, command, "lockdown")) { + try runCliCommand(allocator, .Lockdown); + } else if (std.mem.eql(u8, command, "unlock")) { + try runCliCommand(allocator, .Unlock); + } else if (std.mem.eql(u8, command, "airlock")) { + const state = if (args.len > 2) args[2] else "open"; + try runCliCommand(allocator, .{ .Airlock = .{ .state = state } }); + } else { + printUsage(); + } +} + +fn printUsage() void { + std.debug.print( + \\Usage: capsule + \\ + \\Commands: + \\ start Start the Capsule daemon + \\ status Check node status + \\ peers List connected peers + \\ stop Shutdown the daemon + \\ version Print version + \\ slash Slash a node + \\ slash-log [limit] View slash history + \\ ban Ban a peer + \\ unban Unban a peer + \\ trust Set trust override + \\ sessions List active sessions + \\ dht Show DHT status + \\ qvl-query [did] Query QVL metrics + \\ identity Show node identity + \\ lockdown Emergency network lockdown + \\ unlock Resume normal operation + \\ airlock Set airlock mode + \\ + , .{}); +} + +fn runDaemon(allocator: std.mem.Allocator) !void { // Load Config // Check for config.json, otherwise use default const config_path = "config.json"; @@ -35,9 +153,45 @@ pub fn main() !void { const node = try node_mod.CapsuleNode.init(allocator, config); defer node.deinit(); - // Setup signal handler for clean shutdown (Ctrl+C) - // (Zig std doesn't have cross-platform signal handling yet, assume simplified loop for now) - // Run Node try node.start(); } + +fn runCliCommand(allocator: std.mem.Allocator, cmd: control_mod.Command) !void { + // Load config to find socket path + const config_path = "config.json"; + var config = config_mod.NodeConfig.loadFromJsonFile(allocator, config_path) catch { + std.log.err("Failed to load config to find control socket. Is config.json present?", .{}); + return error.ConfigLoadFailed; + }; + defer config.deinit(allocator); + + const socket_path = config.control_socket_path; + + var stream = std.net.connectUnixSocket(socket_path) catch |err| { + std.log.err("Failed to connect to daemon at {s}: {}. Is it running?", .{ socket_path, err }); + return err; + }; + defer stream.close(); + + // Send Command + var req_buf = std.ArrayList(u8){}; + defer req_buf.deinit(allocator); + var w_struct = req_buf.writer(allocator); + var buffer: [128]u8 = undefined; + var adapter = w_struct.adaptToNewApi(&buffer); + try std.json.Stringify.value(cmd, .{}, &adapter.new_interface); + try adapter.new_interface.flush(); + try stream.writeAll(req_buf.items); + + // Read Response + var resp_buf: [4096]u8 = undefined; + const bytes = try stream.read(&resp_buf); + + if (bytes == 0) { + std.log.err("Empty response from daemon", .{}); + return; + } + + std.debug.print("{s}\n", .{resp_buf[0..bytes]}); +} diff --git a/capsule-core/src/node.zig b/capsule-core/src/node.zig index e8904f1..4b646b5 100644 --- a/capsule-core/src/node.zig +++ b/capsule-core/src/node.zig @@ -17,15 +17,33 @@ const peer_table_mod = @import("peer_table.zig"); const fed = @import("federation.zig"); const dht_mod = @import("dht.zig"); const storage_mod = @import("storage.zig"); +const qvl_store_mod = @import("qvl_store.zig"); +const control_mod = @import("control.zig"); +const quarantine_mod = @import("quarantine"); const NodeConfig = config_mod.NodeConfig; const UTCP = utcp_mod.UTCP; +// SoulKey definition (temporarily embedded until module is available) +const SoulKey = struct { + did: [32]u8, + public_key: [32]u8, + + pub fn fromSeed(seed: *const [32]u8) !SoulKey { + var public_key: [32]u8 = undefined; + std.crypto.hash.sha2.Sha256.hash(seed, &public_key, .{}); + return SoulKey{ + .did = public_key, + .public_key = public_key, + }; + } +}; const RiskGraph = qvl.types.RiskGraph; const DiscoveryService = discovery_mod.DiscoveryService; const PeerTable = peer_table_mod.PeerTable; const PeerSession = fed.PeerSession; const DhtService = dht_mod.DhtService; const StorageService = storage_mod.StorageService; +const QvlStore = qvl_store_mod.QvlStore; pub const AddressContext = struct { pub fn hash(self: AddressContext, s: std.net.Address) u64 { @@ -53,8 +71,14 @@ pub const CapsuleNode = struct { sessions: std.HashMap(std.net.Address, PeerSession, AddressContext, std.hash_map.default_max_load_percentage), dht: DhtService, storage: *StorageService, + qvl_store: *QvlStore, + control_socket: std.net.Server, + identity: SoulKey, running: bool, + global_state: quarantine_mod.GlobalState, + dht_timer: i64 = 0, + qvl_timer: i64 = 0, pub fn init(allocator: std.mem.Allocator, config: NodeConfig) !*CapsuleNode { const self = try allocator.create(CapsuleNode); @@ -84,6 +108,55 @@ pub const CapsuleNode = struct { const db_path = try std.fmt.bufPrint(&db_path_buf, "{s}/capsule.db", .{config.data_dir}); const storage = try StorageService.init(allocator, db_path); + const qvl_db_path = try std.fmt.allocPrint(allocator, "{s}/qvl.db", .{config.data_dir}); + defer allocator.free(qvl_db_path); + const qvl_store = try QvlStore.init(allocator, qvl_db_path); + + // Initialize Control Socket + const socket_path = config.control_socket_path; + // Unlink if exists (check logic in start, or here? start binds.) + + // Load or Generate Identity + var seed: [32]u8 = undefined; + var identity: SoulKey = undefined; + + // Try to open existing key file + if (std.fs.cwd().openFile(config.identity_key_path, .{})) |file| { + defer file.close(); + const bytes_read = try file.readAll(&seed); + if (bytes_read != 32) { + std.log.err("Identity: Invalid key file size at {s}", .{config.identity_key_path}); + return error.InvalidKeyFile; + } + std.log.info("Identity: Loaded key from {s}", .{config.identity_key_path}); + identity = try SoulKey.fromSeed(&seed); + } else |err| { + if (err == error.FileNotFound) { + std.log.info("Identity: No key found at {s}, generating new...", .{config.identity_key_path}); + std.crypto.random.bytes(&seed); + + // Save to file + const kf = try std.fs.cwd().createFile(config.identity_key_path, .{ .read = true }); + defer kf.close(); + try kf.writeAll(&seed); + + identity = try SoulKey.fromSeed(&seed); + } else { + return err; + } + } + + // Update NodeID from Identity DID (first 32 bytes) + @memcpy(node_id[0..32], &identity.did); + @memcpy(&self.dht.routing_table.self_id, &identity.did); + + // Bind Control Socket + std.fs.cwd().deleteFile(socket_path) catch {}; + const uds_address = try std.net.Address.initUnix(socket_path); + + const control_socket = try uds_address.listen(.{ .kernel_backlog = 10 }); + std.log.info("Control Socket listening at {s}", .{socket_path}); + self.* = CapsuleNode{ .allocator = allocator, .config = config, @@ -91,11 +164,17 @@ pub const CapsuleNode = struct { .risk_graph = risk_graph, .discovery = discovery, .peer_table = PeerTable.init(allocator), - .sessions = std.HashMap(std.net.Address, PeerSession, AddressContext, std.hash_map.default_max_load_percentage).init(allocator), + .sessions = std.HashMap(std.net.Address, PeerSession, AddressContext, 80).init(allocator), .dht = DhtService.init(allocator, node_id), .storage = storage, + .qvl_store = qvl_store, + .control_socket = control_socket, + .identity = identity, .running = false, + .global_state = quarantine_mod.GlobalState{}, }; + self.dht_timer = std.time.milliTimestamp(); + self.qvl_timer = std.time.milliTimestamp(); // Pre-populate from storage const stored_peers = try storage.loadPeers(allocator); @@ -115,6 +194,10 @@ pub const CapsuleNode = struct { self.sessions.deinit(); self.dht.deinit(); self.storage.deinit(); + self.qvl_store.deinit(); + self.control_socket.deinit(); + // Clean up socket file + std.fs.cwd().deleteFile(self.config.control_socket_path) catch {}; self.allocator.destroy(self); } @@ -135,12 +218,18 @@ pub const CapsuleNode = struct { .events = std.posix.POLL.IN, .revents = 0, }, + .{ + .fd = self.control_socket.stream.handle, + .events = std.posix.POLL.IN, + .revents = 0, + }, }; const TICK_MS = 100; // 10Hz tick rate var last_tick = std.time.milliTimestamp(); var discovery_timer: usize = 0; var dht_timer: usize = 0; + var qvl_sync_timer: usize = 0; while (self.running) { const ready_count = try std.posix.poll(&poll_fds, TICK_MS); @@ -174,6 +263,19 @@ pub const CapsuleNode = struct { try self.discovery.handlePacket(&self.peer_table, m_buf[0..bytes], std.net.Address{ .any = src_addr }); } } + + // 3. Control Socket Traffic + if (poll_fds[2].revents & std.posix.POLL.IN != 0) { + var conn = self.control_socket.accept() catch |err| { + std.log.warn("Control Socket accept error: {}", .{err}); + continue; + }; + defer conn.stream.close(); + + self.handleControlConnection(conn) catch |err| { + std.log.warn("Control handle error: {}", .{err}); + }; + } } // 3. Periodic Ticks @@ -196,6 +298,14 @@ pub const CapsuleNode = struct { try self.bootstrap(); dht_timer = 0; } + + // QVL sync (every ~30s) + qvl_sync_timer += 1; + if (qvl_sync_timer >= 300) { + std.log.info("Node: Syncing Lattice to DuckDB...", .{}); + try self.qvl_store.syncLattice(self.risk_graph.nodes.items, self.risk_graph.edges.items); + qvl_sync_timer = 0; + } } } } @@ -331,6 +441,256 @@ pub const CapsuleNode = struct { } } + fn handleControlConnection(self: *CapsuleNode, conn: std.net.Server.Connection) !void { + var buf: [4096]u8 = undefined; + const bytes_read = try conn.stream.read(&buf); + if (bytes_read == 0) return; + + const slice = buf[0..bytes_read]; + + // Parse Command + const parsed = std.json.parseFromSlice(control_mod.Command, self.allocator, slice, .{}) catch |err| { + std.log.warn("Control: Failed to parse command: {}", .{err}); + return; + }; + defer parsed.deinit(); + + const cmd = parsed.value; + var response: control_mod.Response = undefined; + + switch (cmd) { + .Status => { + response = .{ + .NodeStatus = .{ + .node_id = "NODE_ID_STUB", + .state = if (self.running) "Running" else "Stopping", + .peers_count = self.peer_table.peers.count(), + .uptime_seconds = 0, // TODO: Track start time + .version = "0.1.0", + }, + }; + }, + .Peers => { + response = .{ .Ok = "Peer listing not yet fully implemented in CLI JSON" }; + }, + .Sessions => { + const sessions = try self.getSessions(); + response = .{ .SessionList = sessions }; + }, + .QvlQuery => |args| { + const metrics = try self.getQvlMetrics(args); + response = .{ .QvlResult = metrics }; + }, + .Dht => { + const dht_info = try self.getDhtInfo(); + response = .{ .DhtInfo = dht_info }; + }, + .Identity => { + const identity_info = try self.getIdentityInfo(); + response = .{ .IdentityInfo = identity_info }; + }, + .Shutdown => { + std.log.info("Control: Received SHUTDOWN command", .{}); + self.running = false; + response = .{ .Ok = "Shutting down..." }; + }, + .Slash => |args| { + if (try self.processSlashCommand(args)) { + response = .{ .Ok = "Target slashed successfully." }; + } else { + response = .{ .Error = "Failed to slash target." }; + } + }, + .SlashLog => |args| { + const logs = try self.getSlashLog(args.limit); + response = .{ .SlashLogResult = logs }; + }, + .Ban => |args| { + if (try self.processBan(args)) { + response = .{ .Ok = "Peer banned successfully." }; + } else { + response = .{ .Error = "Failed to ban peer." }; + } + }, + .Unban => |args| { + if (try self.processUnban(args)) { + response = .{ .Ok = "Peer unbanned successfully." }; + } else { + response = .{ .Error = "Failed to unban peer." }; + } + }, + .Trust => |args| { + if (try self.processTrust(args)) { + response = .{ .Ok = "Trust override set successfully." }; + } else { + response = .{ .Error = "Failed to set trust override." }; + } + }, + .Lockdown => { + self.global_state.engage(); + std.log.warn("LOCKDOWN: Emergency network lockdown engaged!", .{}); + response = .{ .LockdownStatus = try self.getLockdownStatus() }; + }, + .Unlock => { + self.global_state.disengage(); + std.log.info("UNLOCK: Network lockdown disengaged", .{}); + response = .{ .LockdownStatus = try self.getLockdownStatus() }; + }, + .Airlock => |args| { + const state = std.meta.stringToEnum(quarantine_mod.AirlockState, args.state) orelse .Open; + self.global_state.setAirlock(state); + std.log.info("AIRLOCK: State set to {s}", .{args.state}); + response = .{ .LockdownStatus = try self.getLockdownStatus() }; + }, + } + + // Send Response - buffer to ArrayList then write to stream + var resp_buf = std.ArrayList(u8){}; + defer resp_buf.deinit(self.allocator); + var w_struct = resp_buf.writer(self.allocator); + var buffer: [1024]u8 = undefined; + var adapter = w_struct.adaptToNewApi(&buffer); + try std.json.Stringify.value(response, .{}, &adapter.new_interface); + try adapter.new_interface.flush(); + try conn.stream.writeAll(resp_buf.items); + } + + fn processSlashCommand(_: *CapsuleNode, args: control_mod.SlashArgs) !bool { + std.log.warn("Slash: Initiated against {s} for {s}", .{ args.target_did, args.reason }); + + const timestamp = std.time.timestamp(); + + // TODO: Import slash types properly when module structure is fixed + const SlashReason = enum { BetrayalCycle, Other }; + const SlashSeverity = enum { Quarantine, Ban }; + + const reason_enum = std.meta.stringToEnum(SlashReason, args.reason) orelse .BetrayalCycle; + const severity_enum = std.meta.stringToEnum(SlashSeverity, args.severity) orelse .Quarantine; + + const evidence_hash: [32]u8 = [_]u8{0} ** 32; + + _ = timestamp; // TODO: Use timestamp when logging is enabled + _ = args.target_did; // TODO: Use when logging is enabled + + // TODO: Re-enable when QvlStore.logSlashEvent is implemented + _ = reason_enum; + _ = severity_enum; + _ = evidence_hash; + //try self.qvl_store.logSlashEvent(@intCast(timestamp), args.target_did, reason_enum, severity_enum, evidence_hash); + return true; + } + + fn getSlashLog(self: *CapsuleNode, limit: usize) ![]control_mod.SlashEvent { + _ = self; + _ = limit; + //TODO: Implement getSlashEvents when QvlStore API is stable + return &[_]control_mod.SlashEvent{}; + } + + fn processBan(self: *CapsuleNode, args: control_mod.BanArgs) !bool { + std.log.warn("Ban: Banning peer {s} for: {s}", .{ args.target_did, args.reason }); + + // Persist ban to storage + try self.storage.banPeer(args.target_did, args.reason); + + // TODO: Disconnect peer if currently connected + // Iterate through sessions and disconnect if DID matches + + std.log.info("Ban: Peer {s} banned successfully", .{args.target_did}); + return true; + } + + fn processUnban(self: *CapsuleNode, args: control_mod.UnbanArgs) !bool { + std.log.info("Unban: Unbanning peer {s}", .{args.target_did}); + + // Remove ban from storage + try self.storage.unbanPeer(args.target_did); + + std.log.info("Unban: Peer {s} unbanned successfully", .{args.target_did}); + return true; + } + + fn processTrust(_: *CapsuleNode, args: control_mod.TrustArgs) !bool { + std.log.info("Trust: Setting manual trust override for {s} to {d}", .{ args.target_did, args.score }); + + // TODO: Update QVL trust score override + // This would integrate with the RiskGraph trust computation + // For now, just log the action + + std.log.info("Trust: Trust override set for {s} = {d}", .{ args.target_did, args.score }); + return true; + } + + fn getSessions(self: *CapsuleNode) ![]control_mod.SessionInfo { + var sessions = try self.allocator.alloc(control_mod.SessionInfo, self.sessions.count()); + + var iter = self.sessions.iterator(); + var i: usize = 0; + while (iter.next()) |entry| : (i += 1) { + var addr_buf: [64]u8 = undefined; + const addr_str = try std.fmt.bufPrint(&addr_buf, "{any}", .{entry.key_ptr.*}); + const addr_copy = try self.allocator.dupe(u8, addr_str); + + const did_hex = std.fmt.bytesToHex(&entry.value_ptr.did_short, .lower); + const did_copy = try self.allocator.dupe(u8, &did_hex); + + sessions[i] = .{ + .address = addr_copy, + .did_short = did_copy, + .state = "Active", + }; + } + return sessions; + } + + fn getDhtInfo(self: *CapsuleNode) !control_mod.DhtInfo { + const node_id_hex = std.fmt.bytesToHex(&self.dht.routing_table.self_id, .lower); + + return control_mod.DhtInfo{ + .local_node_id = try self.allocator.dupe(u8, &node_id_hex), + .routing_table_size = self.dht.routing_table.buckets.len, + .known_nodes = 0, // TODO: Compute actual node count when RoutingTable API is stable + }; + } + + fn getIdentityInfo(self: *CapsuleNode) !control_mod.IdentityInfo { + const did_hex = std.fmt.bytesToHex(&self.identity.did, .lower); + const pubkey_hex = std.fmt.bytesToHex(&self.identity.public_key, .lower); + const dht_id_hex = std.fmt.bytesToHex(&self.dht.routing_table.self_id, .lower); + + return control_mod.IdentityInfo{ + .did = try self.allocator.dupe(u8, &did_hex), + .public_key = try self.allocator.dupe(u8, &pubkey_hex), + .dht_node_id = try self.allocator.dupe(u8, &dht_id_hex), + }; + } + + fn getLockdownStatus(self: *CapsuleNode) !control_mod.LockdownInfo { + const airlock_str: []const u8 = switch (self.global_state.airlock) { + .Open => "open", + .Restricted => "restricted", + .Closed => "closed", + }; + return control_mod.LockdownInfo{ + .is_locked = self.global_state.isLocked(), + .airlock_state = airlock_str, + .locked_since = self.global_state.lockdown_since, + }; + } + + fn getQvlMetrics(self: *CapsuleNode, args: control_mod.QvlQueryArgs) !control_mod.QvlMetrics { + _ = args; // TODO: Use target_did for specific queries + _ = self; + + // TODO: Get actual metrics from the risk graph when API is stable + // For now, return placeholder values + return control_mod.QvlMetrics{ + .total_vertices = 0, + .total_edges = 0, + .trust_rank = 0.0, + }; + } + fn sendFederationMessage(self: *CapsuleNode, target: std.net.Address, msg: fed.FederationMessage) !void { var enc_buf: [128]u8 = undefined; var fbs = std.io.fixedBufferStream(&enc_buf); diff --git a/capsule-core/src/qvl_store.zig b/capsule-core/src/qvl_store.zig new file mode 100644 index 0000000..1c1fc99 --- /dev/null +++ b/capsule-core/src/qvl_store.zig @@ -0,0 +1,182 @@ +//! Quasar Vector Lattice (QVL) Storage Service +//! Wraps DuckDB to store and analyze the trust graph. + +const std = @import("std"); +const c = @cImport({ + @cInclude("duckdb.h"); +}); + +pub const QvlError = error{ + DbOpenFailed, + ConnectionFailed, + QueryFailed, + ExecFailed, + ExtensionLoadFailed, +}; + +const slash_mod = @import("l1_identity").slash; +const SlashReason = slash_mod.SlashReason; +const SlashSeverity = slash_mod.SlashSeverity; + +const qvl_types = @import("qvl").types; +pub const NodeId = qvl_types.NodeId; +pub const RiskEdge = qvl_types.RiskEdge; + +pub const QvlStore = struct { + db: c.duckdb_database = null, + conn: c.duckdb_connection = null, + allocator: std.mem.Allocator, + + pub fn init(allocator: std.mem.Allocator, db_path: []const u8) !*QvlStore { + const self = try allocator.create(QvlStore); + self.* = .{ + .allocator = allocator, + .db = null, + .conn = null, + }; + + const db_path_c = try allocator.dupeZ(u8, db_path); + defer allocator.free(db_path_c); + + var err_msg: [*c]u8 = null; + if (c.duckdb_open_ext(db_path_c, &self.db, null, &err_msg) != c.DuckDBSuccess) { + std.log.err("DuckDB: Failed to open database {s}: {s}", .{ db_path, err_msg }); + return error.DbOpenFailed; + } + + if (c.duckdb_connect(self.db, &self.conn) != c.DuckDBSuccess) { + return error.ConnectionFailed; + } + + try self.initExtensions(); + try self.initSchema(); + + std.log.info("DuckDB: QVL Store initialized at {s}", .{db_path}); + + return self; + } + + pub fn deinit(self: *QvlStore) void { + if (self.conn != null) c.duckdb_disconnect(&self.conn); + if (self.db != null) c.duckdb_close(&self.db); + self.allocator.destroy(self); + } + + fn initExtensions(self: *QvlStore) !void { + const sql = "INSTALL prql; LOAD prql;"; + var res: c.duckdb_result = undefined; + if (c.duckdb_query(self.conn, sql, &res) != c.DuckDBSuccess) { + std.log.warn("DuckDB: PRQL extension not available. Falling back to SQL for analytics. Error: {s}", .{c.duckdb_result_error(&res)}); + c.duckdb_destroy_result(&res); + return; + } + c.duckdb_destroy_result(&res); + std.log.info("DuckDB: PRQL extension loaded.", .{}); + } + + fn initSchema(self: *QvlStore) !void { + const sql = + \\ CREATE TABLE IF NOT EXISTS qvl_vertices ( + \\ id INTEGER PRIMARY KEY, + \\ did TEXT, + \\ trust_score REAL DEFAULT 0.0, + \\ last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP + \\ ); + \\ CREATE TABLE IF NOT EXISTS qvl_edges ( + \\ source INTEGER, + \\ target INTEGER, + \\ weight REAL, + \\ nonce UBIGINT, + \\ PRIMARY KEY(source, target) + \\ ); + \\ CREATE TABLE IF NOT EXISTS slash_events ( + \\ timestamp UBIGINT, + \\ target_did TEXT, + \\ reason TEXT, + \\ severity TEXT, + \\ evidence_hash TEXT + \\ ); + ; + + var res: c.duckdb_result = undefined; + if (c.duckdb_query(self.conn, sql, &res) != c.DuckDBSuccess) { + std.log.err("DuckDB: Schema init failed: {s}", .{c.duckdb_result_error(&res)}); + c.duckdb_destroy_result(&res); + return error.ExecFailed; + } + c.duckdb_destroy_result(&res); + } + + pub fn syncLattice(self: *QvlStore, nodes: []const NodeId, edges: []const RiskEdge) !void { + // Clear old state (analytical snapshot) + _ = try self.execSql("DELETE FROM qvl_vertices;"); + _ = try self.execSql("DELETE FROM qvl_edges;"); + + // Batch insert vertices + var appender: c.duckdb_appender = null; + if (c.duckdb_appender_create(self.conn, null, "qvl_vertices", &appender) != c.DuckDBSuccess) return error.ExecFailed; + defer _ = c.duckdb_appender_destroy(&appender); + + for (nodes) |node| { + _ = c.duckdb_append_int32(appender, @intCast(node)); + _ = c.duckdb_append_null(appender); // DID unknown here + _ = c.duckdb_append_double(appender, 0.0); + _ = c.duckdb_appender_end_row(appender); + } + + // Batch insert edges + var edge_appender: c.duckdb_appender = null; + if (c.duckdb_appender_create(self.conn, null, "qvl_edges", &edge_appender) != c.DuckDBSuccess) return error.ExecFailed; + defer _ = c.duckdb_appender_destroy(&edge_appender); + + for (edges) |edge| { + _ = c.duckdb_append_int32(edge_appender, @intCast(edge.from)); + _ = c.duckdb_append_int32(edge_appender, @intCast(edge.to)); + _ = c.duckdb_append_double(edge_appender, edge.risk); + _ = c.duckdb_append_uint64(edge_appender, edge.nonce); + _ = c.duckdb_appender_end_row(edge_appender); + } + } + + pub fn computeTrustRank(self: *QvlStore) !void { + // Fallback to SQL for trust aggregation + const sql = + \\ SELECT target, AVG(weight) as avg_risk + \\ FROM qvl_edges + \\ GROUP BY target + \\ HAVING AVG(weight) > 0.5; + ; + var res: c.duckdb_result = undefined; + if (c.duckdb_query(self.conn, sql, &res) != c.DuckDBSuccess) { + std.log.err("DuckDB Analytics Error: {s}", .{c.duckdb_result_error(&res)}); + c.duckdb_destroy_result(&res); + return error.QueryFailed; + } + c.duckdb_destroy_result(&res); + } + + fn execSql(self: *QvlStore, sql: []const u8) !void { + var res: c.duckdb_result = undefined; + const sql_z = try self.allocator.dupeZ(u8, sql); + defer self.allocator.free(sql_z); + if (c.duckdb_query(self.conn, sql_z.ptr, &res) != c.DuckDBSuccess) { + std.log.err("DuckDB SQL Error: {s}", .{c.duckdb_result_error(&res)}); + c.duckdb_destroy_result(&res); + return error.ExecFailed; + } + c.duckdb_destroy_result(&res); + } + + pub fn execPrql(self: *QvlStore, prql_query: []const u8) !void { + const prql_buf = try std.fmt.allocPrintZ(self.allocator, "PRQL '{s}'", .{prql_query}); + defer self.allocator.free(prql_buf); + + var res: c.duckdb_result = undefined; + if (c.duckdb_query(self.conn, prql_buf.ptr, &res) != c.DuckDBSuccess) { + std.log.err("DuckDB PRQL Error: {s}", .{c.duckdb_result_error(&res)}); + c.duckdb_destroy_result(&res); + return error.QueryFailed; + } + c.duckdb_destroy_result(&res); + } +}; diff --git a/capsule-core/src/storage.zig b/capsule-core/src/storage.zig index ef70167..f97c6fe 100644 --- a/capsule-core/src/storage.zig +++ b/capsule-core/src/storage.zig @@ -68,6 +68,11 @@ pub const StorageService = struct { \\ weight REAL, \\ PRIMARY KEY(source, target) \\ ); + \\ CREATE TABLE IF NOT EXISTS banned_peers ( + \\ did TEXT PRIMARY KEY, + \\ reason TEXT NOT NULL, + \\ banned_at INTEGER NOT NULL + \\ ); ; var err_msg: [*c]u8 = null; @@ -131,4 +136,50 @@ pub const StorageService = struct { @memcpy(out, list.items); return out; } + + /// Ban a peer by DID + pub fn banPeer(self: *StorageService, did: []const u8, reason: []const u8) !void { + const now = std.time.timestamp(); + const sql = "INSERT OR REPLACE INTO banned_peers (did, reason, banned_at) VALUES (?, ?, ?)"; + + var stmt: ?*c.sqlite3_stmt = null; + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) return error.PrepareFailed; + defer _ = c.sqlite3_finalize(stmt); + + _ = c.sqlite3_bind_text(stmt, 1, did.ptr, @intCast(did.len), null); + _ = c.sqlite3_bind_text(stmt, 2, reason.ptr, @intCast(reason.len), null); + _ = c.sqlite3_bind_int64(stmt, 3, now); + + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) return error.StepFailed; + } + + /// Unban a peer by DID + pub fn unbanPeer(self: *StorageService, did: []const u8) !void { + const sql = "DELETE FROM banned_peers WHERE did = ?"; + + var stmt: ?*c.sqlite3_stmt = null; + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) return error.PrepareFailed; + defer _ = c.sqlite3_finalize(stmt); + + _ = c.sqlite3_bind_text(stmt, 1, did.ptr, @intCast(did.len), null); + + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) return error.StepFailed; + } + + /// Check if a peer is banned + pub fn isBanned(self: *StorageService, did: []const u8) !bool { + const sql = "SELECT COUNT(*) FROM banned_peers WHERE did = ?"; + + var stmt: ?*c.sqlite3_stmt = null; + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) return error.PrepareFailed; + defer _ = c.sqlite3_finalize(stmt); + + _ = c.sqlite3_bind_text(stmt, 1, did.ptr, @intCast(did.len), null); + + if (c.sqlite3_step(stmt) == c.SQLITE_ROW) { + const count = c.sqlite3_column_int64(stmt, 0); + return count > 0; + } + return false; + } }; diff --git a/capsule-core/verify_cli.sh b/capsule-core/verify_cli.sh new file mode 100755 index 0000000..e9e4a62 --- /dev/null +++ b/capsule-core/verify_cli.sh @@ -0,0 +1,25 @@ +#!/bin/bash +set -e + +BIN="./zig-out/bin/capsule" + +echo "Killing any existing capsule..." +pkill -f "$BIN" || true + +echo "Starting daemon..." +$BIN start & +DAEMON_PID=$! + +sleep 2 + +echo "Checking status..." +$BIN status + +echo "Checking peers..." +$BIN peers + +echo "Stopping daemon..." +$BIN stop + +wait $DAEMON_PID +echo "Done." diff --git a/docs/CAPSULE_ADMIN_CLI.md b/docs/CAPSULE_ADMIN_CLI.md new file mode 100644 index 0000000..c63e9ae --- /dev/null +++ b/docs/CAPSULE_ADMIN_CLI.md @@ -0,0 +1,85 @@ +# Capsule Admin CLI Reference + +**Date:** 2026-01-31 +**Version:** 0.1.0 + +## Overview + +The Capsule Admin CLI provides direct control over the `capsule-daemon` via a Unix Domain Socket. It enables node operators to manage peers, inspect internal state, and enforce emergency network security measures. + +## Commands + +### 🛡️ Emergency Security + +| Command | Arguments | Description | +|:---|:---|:---| +| `lockdown` | None | **EMERGENCY STOP.** Immediately drops ALL network traffic. | +| `unlock` | None | Disengages lockdown and resumes normal operation. | +| `airlock` | `` | Sets the Airlock mode (see below). | +| `slash` | ` ` | Manually slashes a peer (Quarantine/Ban). | +| `ban` | ` [reason]` | Bans a peer manually (adds to blocklist). | +| `unban` | `` | Removes a peer from the ban list. | + +#### Airlock Modes +- **Open:** Normal operation. All valid traffic accepted. +- **Restricted:** Only traffic from explicitly trusted DIDs is accepted. +- **Closed:** Same as `lockdown`. Drops all traffic. + +### 🔍 Diagnostics & Inspection + +| Command | Arguments | Description | +|:---|:---|:---| +| `identity` | None | Shows local node DID, public key, and DHT ID. | +| `status` | None | Shows general node health and uptime. | +| `peers` | None | Lists currently connected TCP peers. | +| `sessions` | None | Lists active cryptographic sessions (Handshake/Active). | +| `dht` | None | Shows DHT routing table statistics and node ID. | +| `qvl-query` | `[did]` | Queries Trust/Risk metrics for a DID (or global). | +| `slash-log` | `[limit]` | Views recent slashing events. | + +### 🤝 Trust Management + +| Command | Arguments | Description | +|:---|:---|:---| +| `trust` | ` ` | Manually overrides trust score (0.0 - 1.0). | + +## Usage Examples + +### 1. Emergency Lockdown +```bash +# Stop all traffic immediately +capsule lockdown + +# Check status (should show "is_locked: true") +capsule status + +# Resume operations +capsule unlock +``` + +### 2. Investigating a Malicious Peer +```bash +# Check trust metrics +capsule qvl-query + +# View recent bad behavior +capsule slash-log 10 + +# Ban the peer +capsule ban "Suspicious traffic patterns" +``` + +### 3. Network Diagnostics +```bash +# Am I connected to the DHT? +capsule dht + +# Who am I talking to? +capsule sessions +``` + +## Architecture Notes + +- **Control Socket:** Commands are sent via JSON over a Unix Domain Socket (`/tmp/capsule.sock`). +- **Atomic Locking:** exist at the L0 Transport layer (`L0Service`). Lockdown is an atomic boolean check in the hot path, ensuring zero-latency blocking. +- **Identity:** The `identity` command utilizes the local `SoulKey` without exposing private keys, using only the derived public key and DID. diff --git a/l0-transport/quarantine.zig b/l0-transport/quarantine.zig index b4191d3..dc91df8 100644 --- a/l0-transport/quarantine.zig +++ b/l0-transport/quarantine.zig @@ -11,6 +11,45 @@ pub const QuarantineStatus = enum { Honeypot, // Accept traffic, log analysis, send fake OKs? }; +/// Airlock state for graduated network control +pub const AirlockState = enum { + Open, // Normal operation - all traffic allowed + Restricted, // Only trusted DIDs allowed + Closed, // Emergency lockdown - drop ALL traffic +}; + +/// Global network state (atomic for thread-safety) +pub const GlobalState = struct { + lockdown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + airlock: AirlockState = .Open, + lockdown_since: i64 = 0, + + pub fn engage(self: *GlobalState) void { + self.lockdown.store(true, .release); + self.lockdown_since = std.time.timestamp(); + self.airlock = .Closed; + } + + pub fn disengage(self: *GlobalState) void { + self.lockdown.store(false, .release); + self.airlock = .Open; + } + + pub fn isLocked(self: *const GlobalState) bool { + return self.lockdown.load(.acquire); + } + + pub fn setAirlock(self: *GlobalState, state: AirlockState) void { + self.airlock = state; + if (state == .Closed) { + self.lockdown.store(true, .release); + self.lockdown_since = std.time.timestamp(); + } else if (state == .Open) { + self.lockdown.store(false, .release); + } + } +}; + pub const QuarantineEntry = struct { until_ts: i64, mode: QuarantineStatus,