276 lines
9.0 KiB
Zig
276 lines
9.0 KiB
Zig
//! RFC-0120 Extension: Aleph-Style Gossip
|
|
//!
|
|
//! Probabilistic flooding for trust signal propagation.
|
|
//! Handles intermittent connectivity (Kenya Rule) via:
|
|
//! - Erasure-tolerant message references
|
|
//! - Coverage tracking for partition detection
|
|
//! - Entropy-stamped messages for replay protection
|
|
//!
|
|
//! Design: Each gossip message references k random prior messages,
|
|
//! creating a DAG structure resilient to packet loss.
|
|
|
|
const std = @import("std");
|
|
const types = @import("types.zig");
|
|
|
|
const NodeId = types.NodeId;
|
|
const RiskGraph = types.RiskGraph;
|
|
|
|
/// Gossip message with DAG references.
|
|
pub const GossipMessage = struct {
|
|
/// Unique message ID (hash of content + entropy)
|
|
id: u64,
|
|
/// Sender node index
|
|
sender: NodeId,
|
|
/// References to prior messages (DAG structure)
|
|
refs: []const u64,
|
|
/// Payload type
|
|
msg_type: MessageType,
|
|
/// Entropy stamp for temporal ordering (RFC-0100)
|
|
entropy_stamp: u64,
|
|
/// Message payload
|
|
payload: []const u8,
|
|
|
|
pub const MessageType = enum(u8) {
|
|
trust_vouch = 0, // New trust edge
|
|
trust_revoke = 1, // Edge removal
|
|
reputation_update = 2, // Score change
|
|
heartbeat = 3, // Liveness check
|
|
};
|
|
|
|
/// Compute message ID from content.
|
|
pub fn computeId(sender: NodeId, entropy_stamp: u64, payload: []const u8) u64 {
|
|
var hasher = std.hash.Wyhash.init(0);
|
|
hasher.update(std.mem.asBytes(&sender));
|
|
hasher.update(std.mem.asBytes(&entropy_stamp));
|
|
hasher.update(payload);
|
|
return hasher.final();
|
|
}
|
|
};
|
|
|
|
/// Gossip state tracker for a node.
|
|
pub const GossipState = struct {
|
|
allocator: std.mem.Allocator,
|
|
/// Recent message IDs (for reference sampling)
|
|
recent_messages: std.ArrayListUnmanaged(u64),
|
|
/// Seen message IDs (for deduplication)
|
|
seen_messages: std.AutoHashMapUnmanaged(u64, void),
|
|
/// Coverage tracking: which nodes have we heard from recently
|
|
heard_from: std.AutoHashMapUnmanaged(NodeId, u64), // node -> last_entropy_stamp
|
|
/// Configuration
|
|
config: Config,
|
|
|
|
pub const Config = struct {
|
|
/// Number of prior messages to reference
|
|
ref_k: usize = 3,
|
|
/// Maximum recent messages to track
|
|
max_recent: usize = 100,
|
|
/// Probability of forwarding (1.0 - drop_prob)
|
|
forward_prob: f64 = 0.7,
|
|
/// Coverage window (entropy stamp delta)
|
|
coverage_window: u64 = 60_000_000_000, // 60 seconds in nanoseconds
|
|
};
|
|
|
|
pub fn init(allocator: std.mem.Allocator, config: Config) GossipState {
|
|
return .{
|
|
.allocator = allocator,
|
|
.recent_messages = .{},
|
|
.seen_messages = .{},
|
|
.heard_from = .{},
|
|
.config = config,
|
|
};
|
|
}
|
|
|
|
pub fn deinit(self: *GossipState) void {
|
|
self.recent_messages.deinit(self.allocator);
|
|
self.seen_messages.deinit(self.allocator);
|
|
self.heard_from.deinit(self.allocator);
|
|
}
|
|
|
|
/// Check if message is new (not seen before).
|
|
pub fn isNewMessage(self: *GossipState, msg_id: u64) !bool {
|
|
if (self.seen_messages.get(msg_id)) |_| {
|
|
return false;
|
|
}
|
|
try self.seen_messages.put(self.allocator, msg_id, {});
|
|
return true;
|
|
}
|
|
|
|
/// Record a message as seen.
|
|
pub fn recordMessage(self: *GossipState, msg: *const GossipMessage) !void {
|
|
// Add to seen set
|
|
try self.seen_messages.put(self.allocator, msg.id, {});
|
|
|
|
// Add to recent messages (for future refs)
|
|
if (self.recent_messages.items.len >= self.config.max_recent) {
|
|
_ = self.recent_messages.orderedRemove(0);
|
|
}
|
|
try self.recent_messages.append(self.allocator, msg.id);
|
|
|
|
// Update heard_from
|
|
try self.heard_from.put(self.allocator, msg.sender, msg.entropy_stamp);
|
|
}
|
|
|
|
/// Sample k random references from recent messages.
|
|
pub fn sampleRefs(self: *GossipState, rand: std.Random, allocator: std.mem.Allocator) ![]u64 {
|
|
const k = @min(self.config.ref_k, self.recent_messages.items.len);
|
|
if (k == 0) return &[_]u64{};
|
|
|
|
var refs = try allocator.alloc(u64, k);
|
|
var selected = std.AutoHashMapUnmanaged(usize, void){};
|
|
defer selected.deinit(allocator);
|
|
|
|
var i: usize = 0;
|
|
while (i < k) {
|
|
const idx = rand.intRangeLessThan(usize, 0, self.recent_messages.items.len);
|
|
if (selected.get(idx)) |_| continue;
|
|
try selected.put(allocator, idx, {});
|
|
refs[i] = self.recent_messages.items[idx];
|
|
i += 1;
|
|
}
|
|
|
|
return refs;
|
|
}
|
|
|
|
/// Compute coverage ratio: fraction of nodes heard from recently.
|
|
pub fn computeCoverage(self: *const GossipState, total_nodes: usize, current_entropy: u64) f64 {
|
|
if (total_nodes == 0) return 1.0;
|
|
|
|
var active_count: usize = 0;
|
|
var it = self.heard_from.iterator();
|
|
while (it.next()) |entry| {
|
|
const last_stamp = entry.value_ptr.*;
|
|
if (current_entropy - last_stamp <= self.config.coverage_window) {
|
|
active_count += 1;
|
|
}
|
|
}
|
|
|
|
return @as(f64, @floatFromInt(active_count)) / @as(f64, @floatFromInt(total_nodes));
|
|
}
|
|
};
|
|
|
|
/// Gossip result after flooding.
|
|
pub const FloodResult = struct {
|
|
/// Number of neighbors that received the message
|
|
sent_count: usize,
|
|
/// Total neighbors attempted
|
|
total_neighbors: usize,
|
|
/// Coverage after flood
|
|
coverage: f64,
|
|
};
|
|
|
|
/// Probabilistic flood of a gossip message to neighbors.
|
|
pub fn floodMessage(
|
|
graph: *const RiskGraph,
|
|
sender: NodeId,
|
|
message: *const GossipMessage,
|
|
state: *GossipState,
|
|
rand: std.Random,
|
|
// In real impl, this would be a transport callback
|
|
// send_fn: *const fn(NodeId, []const u8) void,
|
|
) FloodResult {
|
|
var sent_count: usize = 0;
|
|
const neighbors = graph.neighbors(sender);
|
|
|
|
for (neighbors) |edge_idx| {
|
|
// In real impl: extract neighbor ID and send
|
|
_ = edge_idx; // Will be used when UTCP transport is integrated
|
|
|
|
// Probabilistic drop (simulates lossy network)
|
|
if (rand.float(f64) <= state.config.forward_prob) {
|
|
// In real impl: send_fn(neighbor, serialize(message));
|
|
// TODO: Integrate with UTCP transport layer
|
|
sent_count += 1;
|
|
}
|
|
}
|
|
|
|
const coverage = state.computeCoverage(graph.nodeCount(), message.entropy_stamp);
|
|
|
|
return FloodResult{
|
|
.sent_count = sent_count,
|
|
.total_neighbors = neighbors.len,
|
|
.coverage = coverage,
|
|
};
|
|
}
|
|
|
|
/// Create a new gossip message.
|
|
pub fn createMessage(
|
|
sender: NodeId,
|
|
msg_type: GossipMessage.MessageType,
|
|
payload: []const u8,
|
|
entropy_stamp: u64,
|
|
state: *GossipState,
|
|
rand: std.Random,
|
|
allocator: std.mem.Allocator,
|
|
) !GossipMessage {
|
|
const refs = try state.sampleRefs(rand, allocator);
|
|
const id = GossipMessage.computeId(sender, entropy_stamp, payload);
|
|
|
|
return GossipMessage{
|
|
.id = id,
|
|
.sender = sender,
|
|
.refs = refs,
|
|
.msg_type = msg_type,
|
|
.entropy_stamp = entropy_stamp,
|
|
.payload = payload,
|
|
};
|
|
}
|
|
|
|
// ============================================================================
|
|
// TESTS
|
|
// ============================================================================
|
|
|
|
test "GossipState: message deduplication" {
|
|
const allocator = std.testing.allocator;
|
|
var state = GossipState.init(allocator, .{});
|
|
defer state.deinit();
|
|
|
|
const msg_id: u64 = 12345;
|
|
|
|
// First time: new
|
|
try std.testing.expect(try state.isNewMessage(msg_id));
|
|
// Second time: duplicate
|
|
try std.testing.expect(!(try state.isNewMessage(msg_id)));
|
|
}
|
|
|
|
test "GossipState: coverage tracking" {
|
|
const allocator = std.testing.allocator;
|
|
var state = GossipState.init(allocator, .{ .coverage_window = 1000 });
|
|
defer state.deinit();
|
|
|
|
const now: u64 = 5000;
|
|
|
|
// Record messages from 2 nodes
|
|
try state.heard_from.put(allocator, 0, now - 500); // Recent
|
|
try state.heard_from.put(allocator, 1, now - 2000); // Stale
|
|
|
|
const coverage = state.computeCoverage(3, now);
|
|
// 1 out of 3 nodes heard from recently
|
|
try std.testing.expectApproxEqAbs(coverage, 0.333, 0.01);
|
|
}
|
|
|
|
test "GossipState: reference sampling" {
|
|
const allocator = std.testing.allocator;
|
|
var state = GossipState.init(allocator, .{ .ref_k = 2 });
|
|
defer state.deinit();
|
|
|
|
// Add some recent messages
|
|
try state.recent_messages.append(allocator, 100);
|
|
try state.recent_messages.append(allocator, 200);
|
|
try state.recent_messages.append(allocator, 300);
|
|
|
|
var prng = std.Random.DefaultPrng.init(42);
|
|
const refs = try state.sampleRefs(prng.random(), allocator);
|
|
defer allocator.free(refs);
|
|
|
|
try std.testing.expectEqual(refs.len, 2);
|
|
}
|
|
|
|
test "GossipMessage: ID computation" {
|
|
const id1 = GossipMessage.computeId(0, 1000, "hello");
|
|
const id2 = GossipMessage.computeId(0, 1000, "hello");
|
|
const id3 = GossipMessage.computeId(0, 1001, "hello");
|
|
|
|
try std.testing.expectEqual(id1, id2); // Same input, same ID
|
|
try std.testing.expect(id1 != id3); // Different entropy, different ID
|
|
}
|