From 2276954ba335467299e619b89106f898f6eb560d Mon Sep 17 00:00:00 2001 From: Markus Maiwald Date: Sat, 31 Jan 2026 00:51:20 +0100 Subject: [PATCH] Phase 4: Established L0 Transport Pipeline with UTCP and Segmented WAL OPQ --- build.zig | 46 +++++ docs/PHASE_4_IMPLEMENTATION.md | 59 ++++++ docs/PROJECT_STATUS.md | 65 +++--- l0-transport/README.md | 34 ++- l0-transport/opq.zig | 13 ++ l0-transport/opq/README.md | 21 ++ l0-transport/opq/manager.zig | 65 ++++++ l0-transport/opq/quota.zig | 36 ++++ l0-transport/opq/store.zig | 333 ++++++++++++++++++++++++++++++ l0-transport/service.zig | 86 ++++++++ l0-transport/utcp.zig | 3 + l0-transport/utcp/README.md | 14 ++ l0-transport/utcp/socket.zig | 180 ++++++++++++++++ l0-transport/utcp/test_socket.zig | 36 ++++ l0_transport.zig | 9 + 15 files changed, 946 insertions(+), 54 deletions(-) create mode 100644 docs/PHASE_4_IMPLEMENTATION.md create mode 100644 l0-transport/opq.zig create mode 100644 l0-transport/opq/README.md create mode 100644 l0-transport/opq/manager.zig create mode 100644 l0-transport/opq/quota.zig create mode 100644 l0-transport/opq/store.zig create mode 100644 l0-transport/service.zig create mode 100644 l0-transport/utcp.zig create mode 100644 l0-transport/utcp/README.md create mode 100644 l0-transport/utcp/socket.zig create mode 100644 l0-transport/utcp/test_socket.zig diff --git a/build.zig b/build.zig index 1823116..6c97cc5 100644 --- a/build.zig +++ b/build.zig @@ -12,6 +12,28 @@ pub fn build(b: *std.Build) void { .target = target, .optimize = optimize, }); + const utcp_mod = b.createModule(.{ + .root_source_file = b.path("l0-transport/utcp/socket.zig"), + .target = target, + .optimize = optimize, + }); + utcp_mod.addImport("lwf", l0_mod); + + const opq_mod = b.createModule(.{ + .root_source_file = b.path("l0-transport/opq.zig"), + .target = target, + .optimize = optimize, + }); + opq_mod.addImport("lwf", l0_mod); + + const l0_service_mod = b.createModule(.{ + .root_source_file = b.path("l0-transport/service.zig"), + .target = target, + .optimize = optimize, + }); + l0_service_mod.addImport("lwf", l0_mod); + l0_service_mod.addImport("utcp", utcp_mod); + l0_service_mod.addImport("opq", opq_mod); // ======================================================================== // Crypto: SHA3/SHAKE & FIPS 202 @@ -80,6 +102,9 @@ pub fn build(b: *std.Build) void { .optimize = optimize, }); + // UTCP needs entropy for fast validation + utcp_mod.addImport("entropy", l1_entropy_mod); + const l1_prekey_mod = b.createModule(.{ .root_source_file = b.path("l1-identity/prekey.zig"), .target = target, @@ -116,6 +141,24 @@ pub fn build(b: *std.Build) void { }); const run_l0_tests = b.addRunArtifact(l0_tests); + // UTCP tests + const utcp_tests = b.addTest(.{ + .root_module = utcp_mod, + }); + const run_utcp_tests = b.addRunArtifact(utcp_tests); + + // OPQ tests + const opq_tests = b.addTest(.{ + .root_module = opq_mod, + }); + const run_opq_tests = b.addRunArtifact(opq_tests); + + // L0 Service tests + const l0_service_tests = b.addTest(.{ + .root_module = l0_service_mod, + }); + const run_l0_service_tests = b.addRunArtifact(l0_service_tests); + // L1 SoulKey tests (Phase 2B) const l1_soulkey_tests = b.addTest(.{ .root_module = l1_soulkey_mod, @@ -241,6 +284,9 @@ pub fn build(b: *std.Build) void { test_step.dependOn(&run_l1_did_tests.step); test_step.dependOn(&run_l1_vector_tests.step); test_step.dependOn(&run_l1_pqxdh_tests.step); + test_step.dependOn(&run_utcp_tests.step); + test_step.dependOn(&run_opq_tests.step); + test_step.dependOn(&run_l0_service_tests.step); // ======================================================================== // Examples diff --git a/docs/PHASE_4_IMPLEMENTATION.md b/docs/PHASE_4_IMPLEMENTATION.md new file mode 100644 index 0000000..828a7d0 --- /dev/null +++ b/docs/PHASE_4_IMPLEMENTATION.md @@ -0,0 +1,59 @@ +# Phase 4: L0 Transport & Queueing (UTCP + OPQ) + +**Status:** ⏳ IN PREPARATION +**Target:** L0 Transport Layer (`l0-transport/`) +**RFCs:** RFC-0004 (UTCP), RFC-0005 (OPQ) + +## Overview + +Phase 4 moves the project from static "Wire Frames" (LWF) to an active **Transport Layer**. It introduces the ability to send/receive packets over the network and manage offline persistence for asynchronous communication. + +## Scope + +### 1. UTCP: Unreliable Transport Protocol (UDP) +- **Component:** `l0-transport/utcp.zig` +- **Function:** Fast-path UDP wrapper for LWF frames. +- **Key Features:** + - Non-blocking UDP socket abstraction. + - Zero-copy frame ingestion (points directly into receive buffer). + - Rapid entropy validation (L1 check) before full frame parsing. + - Path MTU discovery (basic) for LWF FrameClass selection. + +### 2. OPQ: Offline Packet Queue +- **Component:** `l0-transport/opq.zig` +- **Function:** High-resilience store-and-forward mechanism. +- **Key Features:** + - **Node Personas:** + - *Client:* Outbox only (Retention: <1hr, Buffer: <5MB). + - *Relay:* Store-and-Forward (Retention: 72-96hr, Buffer: Quota-driven). + - **Segmented WAL Storage:** Persistent storage using 4MB segments for corruption isolation and atomic rotation. + - **Queue Manifests:** Merkle-committed summaries of currently stored frames for selective fetch. + - **Quota Management:** Hard disk-space limits and priority-based eviction (Least Trusted First/Expired First). + - **Automatic Pruning:** TTL-driven segment removal. + +### 3. Frame Pipeline Integration +- **Component:** `l0_transport.zig` (Index) +- **Function:** Orchestrating the flow: `UDP -> Ingestion -> OPQ -> Application`. + +## Architecture + +``` +[ PEER ] <--- UDP ---> [ UTCP Socket ] + | + [ Frame Validator ] (Signature/Entropy/Timestamp) + | + [ OPQ (Persistent) ] <--- [ Storage ] + | + [ L1 State Machine ] +``` + +## Readiness Checklist +- [x] Phase 3 PQXDH Handshake complete. +- [x] LWF Framing stable and tested. +- [ ] UDP Socket abstraction prototyped. +- [ ] Persistent storage engine selected (Simple WAL or Direct Filesystem). + +## Success Metrics +- **Performance:** <5ms from UDP packet arrival to OPQ persistence. +- **Resilience:** Lossless storage during 72-hour offline periods. +- **Security:** Zero frame processing for invalid entropy stamps (DoS protection). diff --git a/docs/PROJECT_STATUS.md b/docs/PROJECT_STATUS.md index e83747d..7d4b027 100644 --- a/docs/PROJECT_STATUS.md +++ b/docs/PROJECT_STATUS.md @@ -1,8 +1,8 @@ # Libertaria L0-L1 SDK Implementation - PROJECT STATUS -**Date:** 2026-01-30 (Updated after Phase 2D completion) -**Overall Status:** ✅ **50% COMPLETE** (Phases 1, 2A, 2B, 2C, 2D done) -**Critical Path:** Phase 2D ✅ → Phase 3 → Phase 4 → 5 → 6 +**Date:** 2026-01-31 (Updated after Phase 3 completion) +**Overall Status:** ✅ **60% COMPLETE** (Phases 1, 2A, 2B, 2C, 2D, 3 done) +**Critical Path:** Phase 3 ✅ → Phase 4 (READY) → 5 → 6 --- @@ -10,7 +10,7 @@ The Libertaria L0-L1 SDK in Zig is **reaching maturity with 50% scope complete**. Core identity primitives (SoulKey, Entropy Stamps, Prekey Bundles, DID Resolution) are complete, tested, and production-ready. The binary footprint remains 26-35 KB, maintaining 93-94% **under Kenya Rule targets**, validating the architecture for budget devices. -**Next immediate step:** Phase 3 (PQXDH Post-Quantum Handshake) ready to start. This is the critical path for establishing post-quantum key agreement before Phase 4 (L0 Transport). +**Next immediate step:** Phase 4 (L0 Transport & OPQ). Phase 3 (PQXDH) is complete with real ML-KEM-768 integration and deterministic key generation. --- @@ -78,37 +78,36 @@ The Libertaria L0-L1 SDK in Zig is **reaching maturity with 50% scope complete** ## Pending Work (Ordered by Dependency) ### Phase 3: PQXDH Post-Quantum Handshake -- ⏳ **CRITICAL:** Static library compilation of Zig crypto exports - - Will compile fips202_bridge.zig to libcrypto.a - - Link into Kyber C code (resolves Phase 2A issue) - - This unblocks all Phase 3+ work -- ⏳ ML-KEM-768 keypair generation (currently placeholder) -- ⏳ PQXDH protocol implementation (Alice initiates, Bob responds) -- ⏳ Hybrid key agreement: 4× X25519 + 1× Kyber-768 KEM -- ⏳ KDF: HKDF-SHA256 combining 5 shared secrets -- ⏳ Full test suite (Alice ↔ Bob handshake roundtrip) +- ✅ Static library compilation of Zig crypto exports +- ✅ ML-KEM-768 keypair generation (integrated via liboqs) +- ✅ PQXDH protocol implementation (Alice initiates, Bob responds) +- ✅ Hybrid key agreement: 4× X25519 + 1× ML-KEM-768 KEM +- ✅ KDF: HKDF-SHA256 combining 5 shared secrets +- ✅ Full test suite (Alice ↔ Bob handshake roundtrip) - **Dependency:** Requires Phase 2D (done ✅) + static library linking fix - **Blocks:** Phase 4 UTCP - **Estimated:** 2-3 weeks -- **Ready to start immediately** +- **Status:** COMPLETE, verified with full handshake tests 2026-01-31 ### Phase 4: L0 Transport Layer -- ⏳ UTCP (Unreliable Transport) implementation - - UDP socket abstraction - - Frame ingestion pipeline - - Entropy validation (fast-path) - - Signature verification +- ✅ UTCP (Unreliable Transport) implementation + - ✅ UDP socket abstraction + - ✅ Frame ingestion pipeline + - ✅ Entropy validation (fast-path) + - ✅ Checksum verification - ⏳ OPQ (Offline Packet Queue) implementation - - 72-hour store-and-forward retention - - Queue manifest generation - - Automatic pruning of expired packets + - ✅ Segmented WAL Storage (High-resilience) + - ✅ 72-96 hour store-and-forward retention (Policy defined) + - ⏳ Queue manifest generation + - ✅ Automatic pruning of expired packets - ⏳ Frame validation pipeline - - Deterministic ordering - - Replay attack detection - - Trust distance integration -- **Dependency:** Requires Phase 3 + - ⏳ Deterministic ordering + - ⏳ Replay attack detection + - ⏳ Trust distance integration +- **Dependency:** Requires Phase 3 (DONE ✅) - **Blocks:** Phase 5 FFI boundary - **Estimated:** 3 weeks +- **Next Task Block** ### Phase 5: FFI & Rust Integration Boundary - ⏳ C ABI exports for all L1 operations @@ -210,13 +209,13 @@ Phase 6 (BLOCKED) ← Polish & audit prep (waits for Phase 5) | Phase | Duration | Start | End | Status | |-------|----------|-------|-----|--------| -| **Phase 1** | 2 weeks | Week 1 | Week 2 | ✅ DONE (1/30) | -| **Phase 2A** | 1 week | Week 2 | Week 3 | ✅ DONE (1/30) | -| **Phase 2B** | 1 week | Week 3 | Week 4 | ✅ DONE (1/30) | -| **Phase 2C** | 1 week | Week 4 | Week 5 | ✅ DONE (1/30) | -| **Phase 2D** | 1 week | Week 5 | Week 6 | ⏳ START NEXT | -| **Phase 3** | 3 weeks | Week 6 | Week 9 | ⏳ WAITING | -| **Phase 4** | 3 weeks | Week 9 | Week 12 | ⏳ BLOCKED | +| **Phase 1** | 2 weeks | Week 1 | Week 2 | ✅ DONE | +| **Phase 2A** | 1 week | Week 2 | Week 3 | ✅ DONE | +| **Phase 2B** | 1 week | Week 3 | Week 4 | ✅ DONE | +| **Phase 2C** | 1 week | Week 4 | Week 5 | ✅ DONE | +| **Phase 2D** | 1 week | Week 5 | Week 6 | ✅ DONE | +| **Phase 3** | 3 weeks | Week 6 | Week 9 | ✅ DONE | +| **Phase 4** | 3 weeks | Week 9 | Week 12 | ⚡ IN PROGRESS | | **Phase 5** | 2 weeks | Week 12 | Week 14 | ⏳ BLOCKED | | **Phase 6** | 1 week | Week 14 | Week 15 | ⏳ BLOCKED | diff --git a/l0-transport/README.md b/l0-transport/README.md index 23453c8..99fd81c 100644 --- a/l0-transport/README.md +++ b/l0-transport/README.md @@ -11,33 +11,25 @@ The L0 Transport layer provides low-level wire protocol implementations for the Libertaria network. It handles packet framing, serialization, and transport-layer timestamps. ## Components - + ### LWF (Libertaria Wire Frame) - `lwf.zig` **RFC:** RFC-0000 -**Size:** 72-byte header + payload + 36-byte trailer - -Wire protocol implementation with: -- Fixed 72-byte header (24-byte DID hints, u64 nanosecond timestamp) -- Variable payload (1092-8892 bytes depending on frame class) -- 36-byte trailer (Ed25519 signature + CRC32 checksum) -- Frame classes (Constrained, Standard, Ethernet, Bulk, Jumbo) - -**Key Types:** -- `LWFHeader` - 72-byte fixed header -- `LWFTrailer` - 36-byte signature + checksum -- `LWFFrame` - Complete frame wrapper -- `FrameClass` - Size negotiation enum +Wire protocol implementation for fixed-size headers and variable payloads. Supports CRC32-C and Ed25519. ### Time - `time.zig` -**RFC:** RFC-0105 (L0 component) -**Precision:** u64 nanoseconds (584-year range) +**RFC:** RFC-0105 +Nanosecond precision transport-layer time primitives. -Transport-layer time primitives: -- `u64` nanosecond timestamps for drift detection -- Monotonic clock access -- Replay protection timestamps +### UTCP (Unreliable Transport Protocol) - `utcp/socket.zig` +**RFC:** RFC-0010 +Fast-path UDP wrapper for LWF frames. Features rapid entropy validation (DoS defense) before deep parsing. -**Note:** L1 uses full `SovereignTimestamp` (u128 attoseconds) for causal ordering. +### OPQ (Offline Packet Queue) - `opq/` +**RFC:** RFC-0020 +High-resilience store-and-forward mechanism using a **Segmented WAL** (Write-Ahead Log) for 72-96 hour packet retention. + +### L0 Service - `service.zig` +The integrated engine that orchestrates `Network -> UTCP -> OPQ -> Ingestion`. Handles automated maintenance and persona-based policies. --- diff --git a/l0-transport/opq.zig b/l0-transport/opq.zig new file mode 100644 index 0000000..b1ee6a5 --- /dev/null +++ b/l0-transport/opq.zig @@ -0,0 +1,13 @@ +//! Sovereign Index for OPQ +pub const store = @import("opq/store.zig"); +pub const quota = @import("opq/quota.zig"); +pub const manager = @import("opq/manager.zig"); + +pub const OPQManager = manager.OPQManager; +pub const Policy = quota.Policy; +pub const Persona = quota.Persona; +pub const WALStore = store.WALStore; + +test { + @import("std").testing.refAllDecls(@This()); +} diff --git a/l0-transport/opq/README.md b/l0-transport/opq/README.md new file mode 100644 index 0000000..dd254cd --- /dev/null +++ b/l0-transport/opq/README.md @@ -0,0 +1,21 @@ +# OPQ: Offline Packet Queue + +**Layer:** L0 (Transport) +**RFC:** RFC-0005 + +## Purpose +OPQ allows Libertaria to function in disconnected environments by providing: +- Persistent disk-backed storage for frames. +- TTL-based pruning. +- Quota-enforced storage limits (Policy vs Mechanism). +- Queue manifest generation for peer synchronization. + +## Node Personas & Policy +The OPQ's behavior is dictated by the node's role: +- **Client:** Outbox only. (Retention: <1hr, Buffer: <5MB). +- **Relay:** Store-and-Forward. (Retention: 72-96hr, Buffer: Quota-driven). + +## Components +- `store.zig`: Segmented WAL (Write-Ahead Log) for atomic persistence. +- `quota.zig`: Hard-quota enforcement and eviction logic. +- `manager.zig`: (Pending) Queue orchestration and manifest sync. diff --git a/l0-transport/opq/manager.zig b/l0-transport/opq/manager.zig new file mode 100644 index 0000000..54d153e --- /dev/null +++ b/l0-transport/opq/manager.zig @@ -0,0 +1,65 @@ +//! RFC-0020: OPQ (Offline Packet Queue) - Manager +//! +//! Orchestrates the flow of frames into the store, enforcing quotas and TTLs. + +const std = @import("std"); +const store = @import("store.zig"); +const quota = @import("quota.zig"); +const lwf = @import("lwf"); + +pub const OPQManager = struct { + allocator: std.mem.Allocator, + policy: quota.Policy, + store: store.WALStore, + + pub fn init(allocator: std.mem.Allocator, base_dir: []const u8, persona: quota.Persona) !OPQManager { + const policy = quota.Policy.init(persona); + const wal = try store.WALStore.init(allocator, base_dir, policy.segment_size); + + return OPQManager{ + .allocator = allocator, + .policy = policy, + .store = wal, + }; + } + + pub fn deinit(self: *OPQManager) void { + self.store.deinit(); + } + + /// Ingest a frame into the queue + pub fn ingestFrame(self: *OPQManager, frame: *const lwf.LWFFrame) !void { + // 1. Append to WAL + try self.store.appendFrame(frame); + + // 2. Periodic maintenance (could be on a timer, but here we do it after ingest) + try self.maintenance(); + } + + pub fn maintenance(self: *OPQManager) !void { + // 1. Prune by TTL + _ = try self.store.prune(self.policy.max_retention_seconds); + + // 2. Prune by Size Quota + _ = try self.store.pruneToSize(self.policy.max_storage_bytes); + } +}; + +test "OPQ Manager: Policy Enforcement" { + const allocator = std.testing.allocator; + const test_dir = "test_opq_manager"; + + std.fs.cwd().deleteTree(test_dir) catch {}; + defer std.fs.cwd().deleteTree(test_dir) catch {}; + + // 1. Client Policy: 5MB limit, 1hr TTL + var manager = try OPQManager.init(allocator, test_dir, .client); + defer manager.deinit(); + + try std.testing.expectEqual(manager.policy.max_storage_bytes, 5 * 1024 * 1024); + + // 2. Ingest Sample Frame + var frame = try lwf.LWFFrame.init(allocator, 10); + defer frame.deinit(allocator); + try manager.ingestFrame(&frame); +} diff --git a/l0-transport/opq/quota.zig b/l0-transport/opq/quota.zig new file mode 100644 index 0000000..e795332 --- /dev/null +++ b/l0-transport/opq/quota.zig @@ -0,0 +1,36 @@ +//! RFC-0020: OPQ (Offline Packet Queue) - Quota & Policy +//! +//! This module defines the "Policy" layer of the OPQ: +//! Node personas, retention periods, and storage limits. + +const std = @import("std"); + +pub const Persona = enum { + client, + relay, + gateway, +}; + +pub const Policy = struct { + persona: Persona, + max_retention_seconds: i64, + max_storage_bytes: u64, + segment_size: usize, + + pub fn init(persona: Persona) Policy { + return switch (persona) { + .client => Policy{ + .persona = .client, + .max_retention_seconds = 3600, // 1 hour + .max_storage_bytes = 5 * 1024 * 1024, // 5MB + .segment_size = 1024 * 1024, // 1MB segments + }, + .relay, .gateway => Policy{ + .persona = persona, + .max_retention_seconds = 96 * 3600, // 96 hours + .max_storage_bytes = 10 * 1024 * 1024 * 1024, // 10GB default + .segment_size = 4 * 1024 * 1024, // 4MB segments + }, + }; + } +}; diff --git a/l0-transport/opq/store.zig b/l0-transport/opq/store.zig new file mode 100644 index 0000000..f43944c --- /dev/null +++ b/l0-transport/opq/store.zig @@ -0,0 +1,333 @@ +//! RFC-0020: OPQ (Offline Packet Queue) - Segmented WAL Storage +//! +//! This module implements the "Mechanism" of the OPQ: +//! A resilient, segmented Write-Ahead Log (WAL) for persisting LWF frames. +//! +//! Segmented Architecture: +//! - Data is split into fixed-size segments (e.g., 4MB). +//! - Only one "Active" segment is writable at a time. +//! - Completed segments are "Finalized" and become immutable. +//! - Pruning works by deleting entire segment files (extremely fast). + +const std = @import("std"); +const lwf = @import("lwf"); + +pub const SEGMENT_MAGIC: [4]u8 = "LOPQ".*; +pub const SEGMENT_VERSION: u8 = 1; +pub const DEFAULT_SEGMENT_SIZE: usize = 4 * 1024 * 1024; // 4MB + +pub const SegmentHeader = struct { + magic: [4]u8 = SEGMENT_MAGIC, + version: u8 = SEGMENT_VERSION, + reserved: [3]u8 = [_]u8{0} ** 3, + segment_id: u64, + segment_seq: u32, + created_at: i64, + + pub const SIZE = 4 + 1 + 3 + 8 + 4 + 8; // 28 bytes +}; + +pub const WALStore = struct { + allocator: std.mem.Allocator, + base_dir_path: []const u8, + max_segment_size: usize, + + active_segment: ?std.fs.File = null, + active_segment_id: u64 = 0, + active_segment_seq: u32 = 0, + current_offset: usize = 0, + + pub fn init(allocator: std.mem.Allocator, base_dir: []const u8, max_size: usize) !WALStore { + // Ensure base directory exists + std.fs.cwd().makePath(base_dir) catch |err| { + if (err != error.PathAlreadyExists) return err; + }; + + return WALStore{ + .allocator = allocator, + .base_dir_path = try allocator.dupe(u8, base_dir), + .max_segment_size = max_size, + }; + } + + pub fn deinit(self: *WALStore) void { + if (self.active_segment) |file| { + file.close(); + } + self.allocator.free(self.base_dir_path); + } + + /// Append a frame to the active segment + pub fn appendFrame(self: *WALStore, frame: *const lwf.LWFFrame) !void { + const frame_size = frame.header.payload_len + lwf.LWFHeader.SIZE + lwf.LWFTrailer.SIZE; + + // Check if we need a new segment + if (self.active_segment == null or self.current_offset + frame_size > self.max_segment_size) { + try self.rotateSegment(); + } + + const file = self.active_segment.?; + const encoded = try frame.encode(self.allocator); + defer self.allocator.free(encoded); + + try file.writeAll(encoded); + self.current_offset += encoded.len; + } + + fn rotateSegment(self: *WALStore) !void { + if (self.active_segment) |file| { + file.close(); + self.active_segment = null; + } + + self.active_segment_id = @as(u64, @intCast(std.time.timestamp())); + self.active_segment_seq += 1; + + var name_buf: [64]u8 = undefined; + const name = try std.fmt.bufPrint(&name_buf, "segment_{d}_{d}.opq", .{ self.active_segment_id, self.active_segment_seq }); + + var dir = try std.fs.cwd().openDir(self.base_dir_path, .{}); + defer dir.close(); + + const file = try dir.createFile(name, .{ .read = true }); + + // Write Header + const header = SegmentHeader{ + .segment_id = self.active_segment_id, + .segment_seq = self.active_segment_seq, + .created_at = std.time.timestamp(), + }; + + const header_bytes = std.mem.asBytes(&header); + try file.writeAll(header_bytes); + + self.active_segment = file; + self.current_offset = SegmentHeader.SIZE; + } + + /// Prune segments older than TTL + pub fn prune(self: *WALStore, max_age_seconds: i64) !usize { + var dir = try std.fs.cwd().openDir(self.base_dir_path, .{ .iterate = true }); + defer dir.close(); + + var iterator = dir.iterate(); + const now = std.time.timestamp(); + var pruned_count: usize = 0; + + while (try iterator.next()) |entry| { + if (entry.kind != .file) continue; + if (!std.mem.endsWith(u8, entry.name, ".opq")) continue; + + // Extract potential timestamp/ID from segment_{id}.opq + // For simplicity, we read the header's created_at + const file = try dir.openFile(entry.name, .{}); + defer file.close(); + + var header: SegmentHeader = undefined; + const bytes_read = try file.readAll(std.mem.asBytes(&header)); + if (bytes_read < SegmentHeader.SIZE) continue; + + if (now - header.created_at > max_age_seconds) { + // Check if it's the active one + if (header.segment_id == self.active_segment_id and + header.segment_seq == self.active_segment_seq) continue; + + try dir.deleteFile(entry.name); + pruned_count += 1; + } + } + return pruned_count; + } + + /// Calculate total disk usage of all .opq files in base_dir + pub fn getDiskUsage(self: *WALStore) !u64 { + var dir = try std.fs.cwd().openDir(self.base_dir_path, .{ .iterate = true }); + defer dir.close(); + + var iterator = dir.iterate(); + var total_size: u64 = 0; + + while (try iterator.next()) |entry| { + if (entry.kind != .file) continue; + if (!std.mem.endsWith(u8, entry.name, ".opq")) continue; + + const stat = try dir.statFile(entry.name); + total_size += stat.size; + } + return total_size; + } + + /// Prune oldest segments until total usage is below target_bytes + pub fn pruneToSize(self: *WALStore, target_bytes: u64) !usize { + var dir = try std.fs.cwd().openDir(self.base_dir_path, .{ .iterate = true }); + defer dir.close(); + + // 1. Collect all segment files with their timestamps + const SegmentFile = struct { + name: [64]u8, + len: usize, + created_at: i64, + }; + var segments = std.ArrayList(SegmentFile).empty; + defer segments.deinit(self.allocator); + + var iterator = dir.iterate(); + var total_size: u64 = 0; + + while (try iterator.next()) |entry| { + if (entry.kind != .file) continue; + if (!std.mem.endsWith(u8, entry.name, ".opq")) continue; + + const file = try dir.openFile(entry.name, .{}); + var header: SegmentHeader = undefined; + const bytes_read = file.readAll(std.mem.asBytes(&header)) catch 0; + file.close(); + + if (bytes_read < SegmentHeader.SIZE) continue; + + const stat = try dir.statFile(entry.name); + total_size += stat.size; + + var name_buf: [64]u8 = undefined; + @memcpy(name_buf[0..entry.name.len], entry.name); + + try segments.append(self.allocator, .{ + .name = name_buf, + .len = entry.name.len, + .created_at = header.created_at, + }); + } + + if (total_size <= target_bytes) return 0; + + // 2. Sort by created_at (oldest first) + const sortFn = struct { + fn lessThan(_: void, a: SegmentFile, b: SegmentFile) bool { + return a.created_at < b.created_at; + } + }.lessThan; + std.sort.pdq(SegmentFile, segments.items, {}, sortFn); + + // 3. Delete oldest segments until under quota + var pruned_count: usize = 0; + for (segments.items) |seg| { + if (total_size <= target_bytes) break; + + const name = seg.name[0..seg.len]; + + // Safety: check if it's the active one (we need segment metadata here ideally) + // For now, we compare against our active_segment_id/seq logic if match + // But if we use the header we already read, we can check. + const file = try dir.openFile(name, .{}); + var header: SegmentHeader = undefined; + _ = try file.readAll(std.mem.asBytes(&header)); + file.close(); + + if (header.segment_id == self.active_segment_id and + header.segment_seq == self.active_segment_seq) continue; + + const stat = try dir.statFile(name); + try dir.deleteFile(name); + total_size -= stat.size; + pruned_count += 1; + } + + return pruned_count; + } +}; +test "OPQ WAL Store: Append and Rotate" { + const allocator = std.testing.allocator; + const test_dir = "test_opq_wal"; + + // Clean up if previous run failed + std.fs.cwd().deleteTree(test_dir) catch {}; + defer std.fs.cwd().deleteTree(test_dir) catch {}; + + var wal = try WALStore.init(allocator, test_dir, 1024); // Small size for rotation + defer wal.deinit(); + + // 1. Create a frame + var frame = try lwf.LWFFrame.init(allocator, 100); + defer frame.deinit(allocator); + @memset(frame.payload, 'A'); + frame.header.payload_len = 100; + frame.updateChecksum(); + + // 2. Append multiple frames to trigger rotation + // Frame size is approx 100 + 72 + 36 = 208 bytes + // 1024 / 208 ≈ 4 frames per segment (plus header) + var i: usize = 0; + while (i < 10) : (i += 1) { + try wal.appendFrame(&frame); + } + + // 3. Verify files created + var dir = try std.fs.cwd().openDir(test_dir, .{ .iterate = true }); + defer dir.close(); + + var iterator = dir.iterate(); + var file_count: usize = 0; + while (try iterator.next()) |entry| { + if (std.mem.endsWith(u8, entry.name, ".opq")) { + file_count += 1; + } + } + + try std.testing.expect(file_count > 1); +} + +test "OPQ WAL Store: Pruning" { + const allocator = std.testing.allocator; + const test_dir = "test_opq_pruning"; + + std.fs.cwd().deleteTree(test_dir) catch {}; + defer std.fs.cwd().deleteTree(test_dir) catch {}; + + var wal = try WALStore.init(allocator, test_dir, 1024 * 1024); + defer wal.deinit(); + + var frame = try lwf.LWFFrame.init(allocator, 10); + defer frame.deinit(allocator); + try wal.appendFrame(&frame); + + // Manually finalize and wait 2 seconds (for test purposes we could mock time, + // but here we'll just test the logic with a very small TTL) + // Wait... we can't easily wait. Let's just verify the function doesn't crash + // and correctly identifies old segments if we had them. + + const pruned = try wal.prune(0); // Prune everything except active + try std.testing.expect(pruned == 0); // Active shouldn't be pruned +} + +test "OPQ WAL Store: Space-based Pruning" { + const allocator = std.testing.allocator; + const test_dir = "test_opq_quota"; + + std.fs.cwd().deleteTree(test_dir) catch {}; + defer std.fs.cwd().deleteTree(test_dir) catch {}; + + var wal = try WALStore.init(allocator, test_dir, 500); // Very small segments + defer wal.deinit(); + + var frame = try lwf.LWFFrame.init(allocator, 100); + defer frame.deinit(allocator); + @memset(frame.payload, 'B'); + frame.header.payload_len = 100; + frame.updateChecksum(); + + // Append 4 frames (should create multiple segments) + var i: usize = 0; + while (i < 4) : (i += 1) { + try wal.appendFrame(&frame); + } + + const usage_before = try wal.getDiskUsage(); + try std.testing.expect(usage_before > 0); + + // Prune to a small size (should keep only active segment) + const pruned = try wal.pruneToSize(100); + try std.testing.expect(pruned > 0); + + const usage_after = try wal.getDiskUsage(); + try std.testing.expect(usage_after < usage_before); +} diff --git a/l0-transport/service.zig b/l0-transport/service.zig new file mode 100644 index 0000000..8f5ae35 --- /dev/null +++ b/l0-transport/service.zig @@ -0,0 +1,86 @@ +//! RFC-0010 & RFC-0020: L0 Integrated Service +//! +//! Orchestrates the flow: [Network] -> [UTCP] -> [OPQ] -> [Application] + +const std = @import("std"); +const utcp = @import("utcp"); +const opq = @import("opq"); +const lwf = @import("lwf"); + +pub const L0Service = struct { + allocator: std.mem.Allocator, + socket: utcp.UTCP, + opq_manager: opq.OPQManager, + + /// Initialize the L0 service with a bound socket and storage + pub fn init(allocator: std.mem.Allocator, address: std.net.Address, base_dir: []const u8, persona: opq.Persona) !L0Service { + return L0Service{ + .allocator = allocator, + .socket = try utcp.UTCP.init(address), + .opq_manager = try opq.OPQManager.init(allocator, base_dir, persona), + }; + } + + pub fn deinit(self: *L0Service) void { + self.socket.deinit(); + self.opq_manager.deinit(); + } + + /// Process a single frame from the network + /// Returns true if a frame was successfully ingested + pub fn step(self: *L0Service) !bool { + var buffer: [9000]u8 = undefined; // Jumbo MTU support + + const result = self.socket.receiveFrame(self.allocator, &buffer) catch |err| { + if (err == error.WouldBlock) return false; + return err; + }; + + var frame = result.frame; + defer frame.deinit(self.allocator); + + // 1. Verification (Deep) + if (!frame.verifyChecksum()) return error.ChecksumMismatch; + + // 2. Persistence (The Queue) + try self.opq_manager.ingestFrame(&frame); + + return true; + } +}; + +test "L0 Integrated Service: Loopback Ingestion" { + const allocator = std.testing.allocator; + const test_dir = "test_l0_service"; + + std.fs.cwd().deleteTree(test_dir) catch {}; + defer std.fs.cwd().deleteTree(test_dir) catch {}; + + const addr = try std.net.Address.parseIp("127.0.0.1", 0); + + // 1. Start Service (Relay persona) + var service = try L0Service.init(allocator, addr, test_dir, .relay); + defer service.deinit(); + + const service_addr = try service.socket.getLocalAddress(); + + // 2. Prepare client socket and frame + var client = try utcp.UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); + defer client.deinit(); + + var frame = try lwf.LWFFrame.init(allocator, 100); + defer frame.deinit(allocator); + @memset(frame.payload, 'X'); + frame.header.payload_len = 100; + frame.updateChecksum(); + + // 3. Send and Step + try client.sendFrame(service_addr, &frame, allocator); + + const success = try service.step(); + try std.testing.expect(success); + + // 4. Verify storage contains the frame (via DiskUsage) + const usage = try service.opq_manager.store.getDiskUsage(); + try std.testing.expect(usage > 0); +} diff --git a/l0-transport/utcp.zig b/l0-transport/utcp.zig new file mode 100644 index 0000000..842b237 --- /dev/null +++ b/l0-transport/utcp.zig @@ -0,0 +1,3 @@ +//! Sovereign Index for UTCP +pub const socket = @import("utcp/socket.zig"); +pub const UTCP = socket.UTCP; diff --git a/l0-transport/utcp/README.md b/l0-transport/utcp/README.md new file mode 100644 index 0000000..1c64b87 --- /dev/null +++ b/l0-transport/utcp/README.md @@ -0,0 +1,14 @@ +# UTCP: Unreliable Transport Protocol + +**Layer:** L0 (Transport) +**RFC:** RFC-0004 + +## Purpose +UTCP provides the UDP-based transmission layer for Libertaria. It focuses on: +- High-throughput ingestion of LWF frames. +- Low-latency entropy validation. +- Connectionless UDP socket management. + +## Components +- `socket.zig`: Bound UDP socket abstraction. +- `protocol.zig`: (Pending) MTU discovery and class selection. diff --git a/l0-transport/utcp/socket.zig b/l0-transport/utcp/socket.zig new file mode 100644 index 0000000..9088f04 --- /dev/null +++ b/l0-transport/utcp/socket.zig @@ -0,0 +1,180 @@ +//! RFC-0004: UTCP (Unreliable Transport Protocol) over UDP + +const std = @import("std"); +const lwf = @import("lwf"); +const entropy = @import("entropy"); +const posix = std.posix; + +/// UTCP Socket abstraction for sending and receiving LWF frames +pub const UTCP = struct { + fd: posix.socket_t, + + /// Initialize UTCP socket by binding to an address + pub fn init(address: std.net.Address) !UTCP { + const fd = try posix.socket( + address.any.family, + posix.SOCK.DGRAM | posix.SOCK.CLOEXEC, + posix.IPPROTO.UDP, + ); + errdefer posix.close(fd); + + try posix.bind(fd, &address.any, address.getOsSockLen()); + + return UTCP{ + .fd = fd, + }; + } + + /// Close the socket + pub fn deinit(self: *UTCP) void { + posix.close(self.fd); + } + + /// Encode and send an LWF frame to a target address + pub fn sendFrame(self: *UTCP, target: std.net.Address, frame: *const lwf.LWFFrame, allocator: std.mem.Allocator) !void { + const encoded = try frame.encode(allocator); + defer allocator.free(encoded); + + const sent = try posix.sendto( + self.fd, + encoded, + 0, + &target.any, + target.getOsSockLen(), + ); + + if (sent != encoded.len) { + return error.PartialWrite; + } + } + + /// Receive a frame from the network + /// Performs non-allocating header validation before processing payload + pub fn receiveFrame(self: *UTCP, allocator: std.mem.Allocator, buffer: []u8) !ReceiveResult { + var src_addr: posix.sockaddr = undefined; + var src_len: posix.socklen_t = @sizeOf(posix.sockaddr); + + const bytes_received = try posix.recvfrom( + self.fd, + buffer, + 0, + &src_addr, + &src_len, + ); + + const data = buffer[0..bytes_received]; + + // 1. Fast Header Validation (No Allocation) + if (data.len < lwf.LWFHeader.SIZE) { + return error.FrameUnderflow; + } + + var header_bytes: [lwf.LWFHeader.SIZE]u8 = undefined; + @memcpy(&header_bytes, data[0..lwf.LWFHeader.SIZE]); + const header = lwf.LWFHeader.fromBytes(&header_bytes); + + if (!header.isValid()) { + return error.InvalidMagic; + } + + // 2. Entropy Fast-Path (DoS Defense) + if (header.flags & lwf.LWFFlags.HAS_ENTROPY != 0) { + if (data.len < lwf.LWFHeader.SIZE + 58) { + return error.StampMissing; + } + const stamp_bytes = data[lwf.LWFHeader.SIZE..][0..58]; + const stamp = entropy.EntropyStamp.fromBytes(@ptrCast(stamp_bytes)); + + // Perform light validation (no Argon2 recompute yet, just hash bits) + // This is enough to drop obvious garbage without any allocation. + stamp.verify(&[_]u8{0} ** 32, header.entropy_difficulty, header.service_type, entropy.DEFAULT_MAX_AGE_SECONDS) catch |err| { + // Log and drop + return err; + }; + } + + // 3. Decode the rest (Allocates payload) + const frame = try lwf.LWFFrame.decode(allocator, data); + + return ReceiveResult{ + .frame = frame, + .sender = std.net.Address{ .any = src_addr }, + }; + } + + /// Get local address of the socket + pub fn getLocalAddress(self: *UTCP) !std.net.Address { + var addr: posix.sockaddr = undefined; + var len: posix.socklen_t = @sizeOf(posix.sockaddr); + try posix.getsockname(self.fd, &addr, &len); + return std.net.Address{ .any = addr }; + } +}; + +pub const ReceiveResult = struct { + frame: lwf.LWFFrame, + sender: std.net.Address, +}; +test "UTCP socket init and loopback" { + const allocator = std.testing.allocator; + const addr = try std.net.Address.parseIp("127.0.0.1", 0); // Port 0 for ephemeral + + var server = try UTCP.init(addr); + defer server.deinit(); + + const server_addr = try server.getLocalAddress(); + + var client = try UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); + defer client.deinit(); + + // 1. Prepare frame + var frame = try lwf.LWFFrame.init(allocator, 32); + defer frame.deinit(allocator); + @memcpy(frame.payload, "UTCP-Protocol-Test-Payload-01234"); + frame.header.payload_len = 32; + frame.updateChecksum(); + + // 2. Send + try client.sendFrame(server_addr, &frame, allocator); + + // 3. Receive + var receive_buf: [1500]u8 = undefined; + const result = try server.receiveFrame(allocator, &receive_buf); + var received_frame = result.frame; + defer received_frame.deinit(allocator); + + // 4. Verify + try std.testing.expectEqualSlices(u8, frame.payload, received_frame.payload); + try std.testing.expect(received_frame.verifyChecksum()); +} + +test "UTCP socket DoS defense: invalid entropy stamp" { + const allocator = std.testing.allocator; + const addr = try std.net.Address.parseIp("127.0.0.1", 0); + + var server = try UTCP.init(addr); + defer server.deinit(); + const server_addr = try server.getLocalAddress(); + + var client = try UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); + defer client.deinit(); + + // 1. Prepare frame with HAS_ENTROPY but garbage stamp + var frame = try lwf.LWFFrame.init(allocator, 100); + defer frame.deinit(allocator); + frame.header.flags |= lwf.LWFFlags.HAS_ENTROPY; + frame.header.entropy_difficulty = 20; // High difficulty + @memset(frame.payload[0..58], 0); + // Set valid timestamp (fresh) + const now = @as(u64, @intCast(std.time.timestamp())); + std.mem.writeInt(u64, frame.payload[35..43], now, .big); + + // 2. Send + try client.sendFrame(server_addr, &frame, allocator); + + // 3. Receive - should fail with InsufficientDifficulty + var receive_buf: [1500]u8 = undefined; + const result = server.receiveFrame(allocator, &receive_buf); + + try std.testing.expectError(error.InsufficientDifficulty, result); +} diff --git a/l0-transport/utcp/test_socket.zig b/l0-transport/utcp/test_socket.zig new file mode 100644 index 0000000..40e24cc --- /dev/null +++ b/l0-transport/utcp/test_socket.zig @@ -0,0 +1,36 @@ +const std = @import("std"); +const socket = @import("socket.zig"); +const lwf = @import("../lwf.zig"); + +test "UTCP socket init and loopback" { + const allocator = std.testing.allocator; + const addr = try std.net.Address.parseIp("127.0.0.1", 0); // Port 0 for ephemeral + + var server = try socket.UTCP.init(addr); + defer server.deinit(); + + const server_addr = try server.getLocalAddress(); + + var client = try socket.UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); + defer client.deinit(); + + // 1. Prepare frame + var frame = try lwf.LWFFrame.init(allocator, 32); + defer frame.deinit(allocator); + @memcpy(frame.payload, "UTCP-Protocol-Test-Payload-1234"); + frame.header.payload_len = 32; + frame.updateChecksum(); + + // 2. Send + try client.sendFrame(server_addr, &frame, allocator); + + // 3. Receive + var receive_buf: [1500]u8 = undefined; + const result = try server.receiveFrame(allocator, &receive_buf); + var received_frame = result.frame; + defer received_frame.deinit(allocator); + + // 4. Verify + try std.testing.expectEqualSlices(u8, frame.payload, received_frame.payload); + try std.testing.expect(received_frame.verifyChecksum()); +} diff --git a/l0_transport.zig b/l0_transport.zig index dc4d24b..b31a04f 100644 --- a/l0_transport.zig +++ b/l0_transport.zig @@ -6,6 +6,15 @@ pub const lwf = @import("l0-transport/lwf.zig"); // Re-export Time primitives pub const time = @import("l0-transport/time.zig"); +// Re-export UTCP (UDP Transport) +pub const utcp = @import("l0-transport/utcp.zig"); + +// Re-export OPQ (Offline Packet Queue) +pub const opq = @import("l0-transport/opq.zig"); + +// Re-export Integrated Service (UTCP + OPQ) +pub const service = @import("l0-transport/service.zig"); + test { std.testing.refAllDecls(@This()); }