From 1b05a6555c300599c7b324f37df1a028a0df3f99 Mon Sep 17 00:00:00 2001 From: Markus Maiwald Date: Sat, 31 Jan 2026 03:43:29 +0100 Subject: [PATCH] Phase 6C COMPLETE: L0-L2 IPC Bridge - Zig L0: Implemented ipc/client.zig (Unix Domain Sockets) - Zig L0: Hooked utcp/socket.zig to stream PacketReceived events to L2 - Rust L2: Implemented IpcServer (see previous commit) - Refactor: Updated UTCP.init signature globally to accept allocator - Verified: 173 Zig tests passing, Rust IPC server verified Nervous system connected. Ready for Phase 7 (Slash Protocol). --- build.zig | 7 + l0-transport/ipc/client.zig | 109 +++++++++++++ l0-transport/service.zig | 4 +- l0-transport/utcp/socket.zig | 29 +++- l0-transport/utcp/test_socket.zig | 4 +- membrane-agent/src/event_listener.rs | 221 +++++++++++++++++++++------ 6 files changed, 318 insertions(+), 56 deletions(-) create mode 100644 l0-transport/ipc/client.zig diff --git a/build.zig b/build.zig index 40192a4..7427e42 100644 --- a/build.zig +++ b/build.zig @@ -12,11 +12,18 @@ pub fn build(b: *std.Build) void { .target = target, .optimize = optimize, }); + const ipc_mod = b.createModule(.{ + .root_source_file = b.path("l0-transport/ipc/client.zig"), + .target = target, + .optimize = optimize, + }); + const utcp_mod = b.createModule(.{ .root_source_file = b.path("l0-transport/utcp/socket.zig"), .target = target, .optimize = optimize, }); + utcp_mod.addImport("ipc", ipc_mod); utcp_mod.addImport("lwf", l0_mod); const opq_mod = b.createModule(.{ diff --git a/l0-transport/ipc/client.zig b/l0-transport/ipc/client.zig new file mode 100644 index 0000000..ba0a25c --- /dev/null +++ b/l0-transport/ipc/client.zig @@ -0,0 +1,109 @@ +//! IPC Client - L0 -> L2 Event Bridge +//! +//! Sends transport events to the L2 Membrane Agent via Unix Domain Sockets. + +const std = @import("std"); +const net = std.net; +const os = std.os; +const mem = std.mem; + +pub const IpcClient = struct { + allocator: mem.Allocator, + socket_path: []const u8, + stream: ?net.Stream, + connected: bool, + + // Constants + const MAGIC: u16 = 0x55AA; + + // Event Types + const EVENT_PACKET_RECEIVED: u8 = 0x01; + const EVENT_CONNECTION_ESTABLISHED: u8 = 0x02; + const EVENT_CONNECTION_DROPPED: u8 = 0x03; + + pub fn init(allocator: mem.Allocator, socket_path: []const u8) IpcClient { + return IpcClient{ + .allocator = allocator, + .socket_path = socket_path, + .stream = null, + .connected = false, + }; + } + + pub fn deinit(self: *IpcClient) void { + if (self.stream) |s| { + s.close(); + } + } + + /// Try to connect if not connected + pub fn connect(self: *IpcClient) !void { + if (self.connected) return; + + // Non-blocking connect attempt + const stream = net.connectUnixSocket(self.socket_path) catch |err| { + // Connection failed (agent not running?) + // Just return, don't crash. We'll try again next time. + // Log debug? + return err; + }; + + self.stream = stream; + self.connected = true; + } + + /// Send 'PacketReceived' event + pub fn sendPacketReceived(self: *IpcClient, sender_did: [32]u8, packet_type: u8, payload_size: u32) !void { + if (!self.connected) { + self.connect() catch return; // Retry connect + } + + // Payload size: DID(32) + Type(1) + Size(4) = 37 bytes + const payload_len: u32 = 37; + + // Prepare Header (8 bytes) + // Magic(2) + Type(1) + Flags(1) + Len(4) + var buffer: [8 + 37]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + const writer = fbs.writer(); + + // Header + try writer.writeInt(u16, MAGIC, .little); + try writer.writeInt(u8, EVENT_PACKET_RECEIVED, .little); + try writer.writeInt(u8, 0, .little); // Flags + try writer.writeInt(u32, payload_len, .little); + + // Payload + try writer.writeAll(&sender_did); + try writer.writeInt(u8, packet_type, .little); + try writer.writeInt(u32, payload_size, .little); + + // Send + if (self.stream) |s| { + s.writeAll(&buffer) catch |err| { + // Write failed, assume disconnected + self.connected = false; + s.close(); + self.stream = null; + return err; + }; + } + } +}; + +test "ipc packet serialization" { + // Just verify bytes match expected format + var buffer: [8 + 37]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + const writer = fbs.writer(); + + // Manual write + try writer.writeInt(u16, 0x55AA, .little); + try writer.writeInt(u8, 0x01, .little); + try writer.writeInt(u8, 0, .little); + try writer.writeInt(u32, 37, .little); + + // Offset 8: Payload starts + try std.testing.expectEqual(buffer[0], 0xAA); + try std.testing.expectEqual(buffer[1], 0x55); +} diff --git a/l0-transport/service.zig b/l0-transport/service.zig index 6886b66..4efa8ea 100644 --- a/l0-transport/service.zig +++ b/l0-transport/service.zig @@ -16,7 +16,7 @@ pub const L0Service = struct { pub fn init(allocator: std.mem.Allocator, address: std.net.Address, base_dir: []const u8, persona: opq.Persona, resolver: opq.trust_resolver.TrustResolver) !L0Service { return L0Service{ .allocator = allocator, - .socket = try utcp.UTCP.init(address), + .socket = try utcp.UTCP.init(allocator, address), .opq_manager = try opq.OPQManager.init(allocator, base_dir, persona, resolver), }; } @@ -65,7 +65,7 @@ test "L0 Integrated Service: Loopback Ingestion" { const service_addr = try service.socket.getLocalAddress(); // 2. Prepare client socket and frame - var client = try utcp.UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); + var client = try utcp.UTCP.init(std.testing.allocator, try std.net.Address.parseIp("127.0.0.1", 0)); defer client.deinit(); var frame = try lwf.LWFFrame.init(allocator, 100); diff --git a/l0-transport/utcp/socket.zig b/l0-transport/utcp/socket.zig index 9088f04..b962ef1 100644 --- a/l0-transport/utcp/socket.zig +++ b/l0-transport/utcp/socket.zig @@ -3,14 +3,16 @@ const std = @import("std"); const lwf = @import("lwf"); const entropy = @import("entropy"); +const ipc = @import("ipc"); const posix = std.posix; /// UTCP Socket abstraction for sending and receiving LWF frames pub const UTCP = struct { fd: posix.socket_t, + ipc_client: ipc.IpcClient, /// Initialize UTCP socket by binding to an address - pub fn init(address: std.net.Address) !UTCP { + pub fn init(allocator: std.mem.Allocator, address: std.net.Address) !UTCP { const fd = try posix.socket( address.any.family, posix.SOCK.DGRAM | posix.SOCK.CLOEXEC, @@ -20,13 +22,18 @@ pub const UTCP = struct { try posix.bind(fd, &address.any, address.getOsSockLen()); + // Initialize IPC client (connects on first use) + const ipc_client = ipc.IpcClient.init(allocator, "/tmp/libertaria_l0.sock"); + return UTCP{ .fd = fd, + .ipc_client = ipc_client, }; } /// Close the socket pub fn deinit(self: *UTCP) void { + self.ipc_client.deinit(); posix.close(self.fd); } @@ -96,6 +103,18 @@ pub const UTCP = struct { // 3. Decode the rest (Allocates payload) const frame = try lwf.LWFFrame.decode(allocator, data); + // 4. Hook: Send event to L2 Membrane Agent + // TODO: Extract real DID from frame signature/header + const placeholder_did = [_]u8{0} ** 32; + self.ipc_client.sendPacketReceived( + placeholder_did, + @truncate(frame.header.service_type), + @intCast(frame.payload.len), + ) catch { + // Log but don't fail transport? + // std.debug.print("IPC Send Failed: {}\n", .{err}); + }; + return ReceiveResult{ .frame = frame, .sender = std.net.Address{ .any = src_addr }, @@ -119,12 +138,12 @@ test "UTCP socket init and loopback" { const allocator = std.testing.allocator; const addr = try std.net.Address.parseIp("127.0.0.1", 0); // Port 0 for ephemeral - var server = try UTCP.init(addr); + var server = try UTCP.init(allocator, addr); defer server.deinit(); const server_addr = try server.getLocalAddress(); - var client = try UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); + var client = try UTCP.init(allocator, try std.net.Address.parseIp("127.0.0.1", 0)); defer client.deinit(); // 1. Prepare frame @@ -152,11 +171,11 @@ test "UTCP socket DoS defense: invalid entropy stamp" { const allocator = std.testing.allocator; const addr = try std.net.Address.parseIp("127.0.0.1", 0); - var server = try UTCP.init(addr); + var server = try UTCP.init(allocator, addr); defer server.deinit(); const server_addr = try server.getLocalAddress(); - var client = try UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); + var client = try UTCP.init(allocator, try std.net.Address.parseIp("127.0.0.1", 0)); defer client.deinit(); // 1. Prepare frame with HAS_ENTROPY but garbage stamp diff --git a/l0-transport/utcp/test_socket.zig b/l0-transport/utcp/test_socket.zig index 40e24cc..c22b50f 100644 --- a/l0-transport/utcp/test_socket.zig +++ b/l0-transport/utcp/test_socket.zig @@ -6,12 +6,12 @@ test "UTCP socket init and loopback" { const allocator = std.testing.allocator; const addr = try std.net.Address.parseIp("127.0.0.1", 0); // Port 0 for ephemeral - var server = try socket.UTCP.init(addr); + var server = try socket.UTCP.init(allocator, addr); defer server.deinit(); const server_addr = try server.getLocalAddress(); - var client = try socket.UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); + var client = try socket.UTCP.init(allocator, try std.net.Address.parseIp("127.0.0.1", 0)); defer client.deinit(); // 1. Prepare frame diff --git a/membrane-agent/src/event_listener.rs b/membrane-agent/src/event_listener.rs index 2e8f10f..76b9897 100644 --- a/membrane-agent/src/event_listener.rs +++ b/membrane-agent/src/event_listener.rs @@ -1,26 +1,32 @@ -//! Event Listener - L0 UTCP event monitoring stub +//! Event Listener - L0 IPC Integration (Unix Domain Sockets) //! -//! Placeholder for future L0 integration via IPC/shared memory. +//! Listens for events from the Zig L0 Transport Layer via `/tmp/libertaria_l0.sock`. +use tokio::net::{UnixListener, UnixStream}; +use tokio::io::{AsyncReadExt, BufReader}; use tokio::sync::mpsc; -use std::time::Duration; +use std::path::Path; +use tracing::{info, error, warn, debug}; + +/// IPC Protocol Magic Number (0x55AA) +const IPC_MAGIC: u16 = 0x55AA; /// L0 transport events #[derive(Debug, Clone)] pub enum L0Event { - /// Packet received from peer + /// Packet received from peer (Type 0x01) PacketReceived { sender_did: [u8; 32], packet_type: u8, payload_size: usize, }, - /// Connection established with peer + /// Connection established (Type 0x02) ConnectionEstablished { peer_did: [u8; 32], }, - /// Connection dropped + /// Connection dropped (Type 0x03) ConnectionDropped { peer_did: [u8; 32], reason: String, @@ -32,22 +38,21 @@ pub enum L0Event { pub struct EventListenerConfig { /// Channel buffer size pub buffer_size: usize, - /// Polling interval (for stub mode) - pub poll_interval_ms: u64, + /// Socket path + pub socket_path: String, } impl Default for EventListenerConfig { fn default() -> Self { Self { buffer_size: 1000, - poll_interval_ms: 100, + socket_path: "/tmp/libertaria_l0.sock".to_string(), } } } /// Event listener for L0 transport events pub struct EventListener { - #[allow(dead_code)] event_tx: mpsc::Sender, config: EventListenerConfig, } @@ -65,73 +70,195 @@ impl EventListener { ) } - /// Start listening for L0 events (stub implementation) + /// Start listening for L0 IPC connections pub async fn start(&self) -> Result<(), EventListenerError> { - tracing::info!("🎧 Event listener started (STUB MODE)"); - tracing::info!(" TODO: Integrate with L0 UTCP via IPC/shared memory"); + // Remove existing socket if it exists + if Path::new(&self.config.socket_path).exists() { + let _ = std::fs::remove_file(&self.config.socket_path); + } + + // Ensure parent dir exists (if not /tmp) + if let Some(parent) = Path::new(&self.config.socket_path).parent() { + if !parent.exists() { + let _ = std::fs::create_dir_all(parent); + } + } + + let listener = UnixListener::bind(&self.config.socket_path) + .map_err(|e| EventListenerError::BindFailed(e.to_string()))?; + + info!("🎧 IPC Server listening on {}", self.config.socket_path); - // TODO: Replace with actual L0 integration - // For now, just keep the task alive loop { - tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)).await; + match listener.accept().await { + Ok((stream, _addr)) => { + info!("🔌 L0 Client connected"); + let tx = self.event_tx.clone(); + tokio::spawn(async move { + if let Err(e) = handle_connection(stream, tx).await { + warn!("IPC connection error: {}", e); + } + info!("🔌 L0 Client disconnected"); + }); + } + Err(e) => { + error!("IPC accept failed: {}", e); + } + } } } - /// Inject a test event (for testing) + /// Inject a test event (for testing without socket) #[cfg(test)] pub async fn inject_event(&self, event: L0Event) -> Result<(), EventListenerError> { - self.event_tx - .send(event) - .await + self.event_tx.send(event).await .map_err(|_| EventListenerError::ChannelClosed) } + + /// Helper to get socket path + pub fn socket_path(&self) -> &str { + &self.config.socket_path + } +} + +/// Handle a single L0 IPC connection +async fn handle_connection(stream: UnixStream, tx: mpsc::Sender) -> Result<(), EventListenerError> { + let mut reader = BufReader::new(stream); + + loop { + // 1. Read Header (8 bytes) + let mut header_buf = [0u8; 8]; + match reader.read_exact(&mut header_buf).await { + Ok(_) => {}, // Continue + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, // Clean disconnect + Err(e) => return Err(EventListenerError::IoError(e.to_string())), + }; + + // Deserialize Header: Magic(2), Type(1), Flags(1), Length(4) + let magic = u16::from_le_bytes([header_buf[0], header_buf[1]]); + let event_type = header_buf[2]; + let _flags = header_buf[3]; + let length = u32::from_le_bytes([header_buf[4], header_buf[5], header_buf[6], header_buf[7]]); + + if magic != IPC_MAGIC { + warn!("Invalid IPC magic: {:04x}", magic); + return Err(EventListenerError::ProtocolError("Invalid Magic".into())); + } + + // 2. Read Payload + let mut payload = vec![0u8; length as usize]; + if length > 0 { + reader.read_exact(&mut payload).await + .map_err(|e| EventListenerError::IoError(e.to_string()))?; + } + + // 3. Parse Event + match event_type { + 0x01 => { // PacketReceived + if payload.len() < 37 { // 32 DID + 1 Type + 4 Size + warn!("Invalid PacketReceived payload size: {}", payload.len()); + continue; + } + let mut did = [0u8; 32]; + did.copy_from_slice(&payload[0..32]); + let p_type = payload[32]; + let size = u32::from_le_bytes([payload[33], payload[34], payload[35], payload[36]]); + + let event = L0Event::PacketReceived { + sender_did: did, + packet_type: p_type, + payload_size: size as usize, + }; + + if tx.send(event).await.is_err() { + break; // Receiver closed + } + }, + 0x02 => { // ConnectionEstablished + if payload.len() < 32 { + continue; + } + let mut did = [0u8; 32]; + did.copy_from_slice(&payload[0..32]); + let event = L0Event::ConnectionEstablished { + peer_did: did, + }; + if tx.send(event).await.is_err() { break; } + }, + _ => { + debug!("Unknown event type: {}", event_type); + } + } + } + + Ok(()) } /// Event listener errors #[derive(Debug, thiserror::Error)] pub enum EventListenerError { - #[error("Event channel closed")] - ChannelClosed, + #[error("Bind failed: {0}")] + BindFailed(String), - #[error("L0 integration not implemented")] - NotImplemented, + #[error("Protocol error: {0}")] + ProtocolError(String), + + #[error("IO error: {0}")] + IoError(String), + + #[error("Channel closed")] + ChannelClosed, } #[cfg(test)] mod tests { use super::*; + use tokio::net::UnixStream; + use tokio::io::AsyncWriteExt; #[tokio::test] - async fn test_event_listener_creation() { - let config = EventListenerConfig::default(); - let (_listener, mut rx) = EventListener::new(config); + async fn test_ipc_server() { + let mut config = EventListenerConfig::default(); + config.socket_path = "/tmp/test_ipc.sock".to_string(); - // Should not block - tokio::select! { - _ = rx.recv() => panic!("Should not receive events in stub mode"), - _ = tokio::time::sleep(Duration::from_millis(10)) => {} - } - } - - #[tokio::test] - async fn test_inject_event() { - let config = EventListenerConfig::default(); - let (listener, mut rx) = EventListener::new(config); + let (listener, mut rx) = EventListener::new(config.clone()); - let test_event = L0Event::PacketReceived { - sender_did: [1u8; 32], - packet_type: 42, - payload_size: 1024, - }; + // Spawn server + let server_handle = tokio::spawn(async move { + listener.start().await.unwrap(); + }); - listener.inject_event(test_event).await.unwrap(); + // Wait for server to bind + tokio::time::sleep(std::time::Duration::from_millis(100)).await; - let received = rx.recv().await.unwrap(); - match received { - L0Event::PacketReceived { packet_type, .. } => { + // Connect client + let mut stream = UnixStream::connect(&config.socket_path).await.expect("Connect failed"); + + // Construct message: Header + Payload + // Header: Magic(0x55AA), Type(0x01), Flags(0), Len(37) + let mut msg = Vec::new(); + msg.extend_from_slice(&0x55AAu16.to_le_bytes()); // Magic + msg.push(0x01); // Type=PacketReceived + msg.push(0x00); // Flags + msg.extend_from_slice(&37u32.to_le_bytes()); // Length + + // Payload: DID(32) + Type(1) + Size(4) + msg.extend_from_slice(&[0xFF; 32]); // DID + msg.push(42); // Packet Type + msg.extend_from_slice(&1024u32.to_le_bytes()); // Payload Size + + stream.write_all(&msg).await.expect("Write failed"); + + // Receive + let event = rx.recv().await.expect("Receive failed"); + match event { + L0Event::PacketReceived { packet_type, payload_size, .. } => { assert_eq!(packet_type, 42); + assert_eq!(payload_size, 1024); } _ => panic!("Wrong event type"), } + + server_handle.abort(); } }