## nimpak/remote/sync_engine.nim
## Synchronization engine with bloom filters for NimPak
##
## This module implements Task 15.1d:
## - Incremental sync using event log from Task 11 integrity monitor
## - Bloom filter handshake for O(changes) synchronization
## - Delta object creation and application for bandwidth optimization
## - Mirror network support with load balancing and failover
## - Bandwidth management and compression

import std/[os, times, json, tables, sequtils, strutils, strformat, asyncdispatch,
            algorithm, hashes, sets, math, random, httpclient, options]
import ../security/[event_logger, integrity_monitor]
import ../cas
import ../types_fixed
import manager

type
  BloomFilter* = object
    bits*: seq[bool]
    size*: int
    hashFunctions*: int
    expectedElements*: int
    falsePositiveRate*: float

  SyncEventType* = enum
    SyncPackageAdded = "package_added"
    SyncPackageRemoved = "package_removed"
    SyncPackageUpdated = "package_updated"
    SyncManifestChanged = "manifest_changed"
    SyncKeyRevoked = "key_revoked"
    SyncKeyRolledOver = "key_rolled_over"

  SyncEvent* = object
    id*: string
    timestamp*: times.DateTime
    eventType*: SyncEventType
    objectHash*: string        # CAS hash of affected object
    metadata*: JsonNode
    sequenceNumber*: int64

  DeltaObject* = object
    objectHash*: string
    deltaType*: string         # "add", "remove", "modify"
    compressedData*: seq[byte]
    originalSize*: int64
    compressedSize*: int64
    dependencies*: seq[string] # Hashes of dependent objects

  SyncState* = object
    lastSyncTime*: times.DateTime
    lastSequenceNumber*: int64
    bloomFilter*: BloomFilter
    knownObjects*: HashSet[string]
    pendingDeltas*: seq[DeltaObject]

  MirrorNode* = object
    id*: string
    url*: string
    priority*: int             # Higher = preferred
    latency*: float            # Average response time in ms
    reliability*: float        # Success rate (0.0-1.0)
    bandwidth*: int64          # Available bandwidth in bytes/sec
    lastSync*: times.DateTime
    status*: MirrorStatus
    syncState*: SyncState

  MirrorStatus* = enum
    MirrorActive = "active"
    MirrorSlow = "slow"
    MirrorUnreachable = "unreachable"
    MirrorSyncing = "syncing"

  SyncEngine* = ref object
    localCasManager*: CasManager
    eventLogger*: SecurityEventLogger
    mirrors*: Table[string, MirrorNode]
    activeMirror*: string
    config*: SyncEngineConfig
    syncState*: SyncState
    bandwidthLimiter*: BandwidthLimiter

  SyncEngineConfig* = object
    maxMirrors*: int
    syncIntervalSeconds*: int
    bloomFilterSize*: int
    bloomFilterHashFunctions*: int
    maxDeltaSize*: int64       # Maximum delta object size
    compressionLevel*: int     # zstd compression level
    bandwidthLimitBps*: int64  # Bandwidth limit in bytes per second
    failoverTimeoutMs*: int    # Timeout before failover
    maxConcurrentSyncs*: int

  BandwidthLimiter* = ref object
    limitBps*: int64
    currentUsage*: int64
    windowStart*: times.DateTime
    windowSizeMs*: int

  SyncResult*[T] = object
    case success*: bool
    of true:
      value*: T
      bytesTransferred*: int64
      duration*: float
    of false:
      error*: string
      errorCode*: int

# =============================================================================
# Bloom Filter Implementation
# =============================================================================

proc newBloomFilter*(expectedElements: int, falsePositiveRate: float = 0.01): BloomFilter =
  ## Create a new bloom filter optimized for the expected number of elements
  let size = int(-1.0 * float(expectedElements) * ln(falsePositiveRate) / (ln(2.0) * ln(2.0)))
  let hashFunctions = int(float(size) / float(expectedElements) * ln(2.0))

  BloomFilter(
    bits: newSeq[bool](size),
    size: size,
    hashFunctions: max(1, hashFunctions),
    expectedElements: expectedElements,
    falsePositiveRate: falsePositiveRate
  )

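# Worked sizing example (approximate figures, derived from the formulas above,
# not from the original source): for n = 100_000 expected elements and
# p = 0.01, the optimal bit count is
#   m = -n * ln(p) / (ln 2)^2  which is about 958_505 bits (~117 KiB packed),
# and the optimal hash count is
#   k = (m / n) * ln 2  which is about 6.64, truncated to 6 by the int
# conversion above; max(1, ...) only guards the degenerate small-filter case.
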
proc hash1(data: string): uint32 =
  ## First hash function (FNV-1a variant)
  var hash: uint32 = 2166136261'u32
  for c in data:
    hash = hash xor uint32(c)
    hash = hash * 16777619'u32
  return hash

proc hash2(data: string): uint32 =
  ## Second hash function (djb2 variant)
  var hash: uint32 = 5381'u32
  for c in data:
    hash = ((hash shl 5) + hash) + uint32(c)
  return hash

proc getHashValues(data: string, numHashes: int, size: int): seq[int] =
  ## Generate multiple hash values using double hashing
  let h1 = hash1(data)
  let h2 = hash2(data)

  result = newSeq[int](numHashes)
  for i in 0..<numHashes:
    result[i] = int((h1 + uint32(i) * h2) mod uint32(size))

proc add*(filter: var BloomFilter, item: string) =
  ## Add an item to the bloom filter
  let hashes = getHashValues(item, filter.hashFunctions, filter.size)
  for hashVal in hashes:
    filter.bits[hashVal] = true

proc contains*(filter: BloomFilter, item: string): bool =
  ## Check if an item might be in the bloom filter
  let hashes = getHashValues(item, filter.hashFunctions, filter.size)
  for hashVal in hashes:
    if not filter.bits[hashVal]:
      return false
  return true

proc estimatedFalsePositiveRate*(filter: BloomFilter): float =
  ## Calculate the current estimated false positive rate
  let setBits = filter.bits.countIt(it)
  let ratio = float(setBits) / float(filter.size)
  return pow(ratio, float(filter.hashFunctions))

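when isMainModule:
  # Minimal usage sketch for the filter primitives above; the hash string is
  # an illustrative placeholder, not a real CAS digest.
  var demo = newBloomFilter(expectedElements = 1000, falsePositiveRate = 0.01)
  demo.add("sha256:deadbeef")
  doAssert demo.contains("sha256:deadbeef") # membership is definite for inserted items
  # Absent items are only *probably* rejected; false positives occur at ~1% here.
  echo "estimated FPR: ", demo.estimatedFalsePositiveRate()
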
proc serialize*(filter: BloomFilter): seq[byte] =
  ## Serialize bloom filter for network transmission
  # Add header (size, hash function count, expected elements); `result`
  # starts out as an empty seq, so no explicit declaration is needed
  var sizeBytes = cast[array[sizeof(int), byte]](filter.size)
  result.add(sizeBytes)
  var funcBytes = cast[array[sizeof(int), byte]](filter.hashFunctions)
  result.add(funcBytes)
  var elemBytes = cast[array[sizeof(int), byte]](filter.expectedElements)
  result.add(elemBytes)

  # Pack bits efficiently (8 bits per byte)
  let numBytes = (filter.size + 7) div 8
  var packedBits = newSeq[byte](numBytes)

  for i in 0..<filter.size:
    if filter.bits[i]:
      let byteIndex = i div 8
      let bitIndex = i mod 8
      packedBits[byteIndex] = packedBits[byteIndex] or (1'u8 shl bitIndex)

  result.add(packedBits)
  return result

proc deserializeBloomFilter*(data: seq[byte]): BloomFilter =
  ## Deserialize bloom filter from network data
  var offset = 0

  # Read header (size, hash function count, expected elements)
  if offset + sizeof(int) > data.len: raise newException(ValueError, "Data too short")
  let size = cast[ptr int](unsafeAddr data[offset])[]
  offset += sizeof(int)

  if offset + sizeof(int) > data.len: raise newException(ValueError, "Data too short")
  let hashFunctions = cast[ptr int](unsafeAddr data[offset])[]
  offset += sizeof(int)

  if offset + sizeof(int) > data.len: raise newException(ValueError, "Data too short")
  let expectedElements = cast[ptr int](unsafeAddr data[offset])[]
  offset += sizeof(int)

  # Unpack bits
  var bits = newSeq[bool](size)
  let numBytes = (size + 7) div 8

  for i in 0..<size:
    let byteIndex = i div 8
    let bitIndex = i mod 8
    if byteIndex < numBytes and offset + byteIndex < data.len:
      bits[i] = (data[offset + byteIndex] and (1'u8 shl bitIndex)) != 0

  BloomFilter(
    bits: bits,
    size: size,
    hashFunctions: hashFunctions,
    expectedElements: expectedElements,
    falsePositiveRate: 0.01 # Default value; the rate is not part of the wire format
  )

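# Wire format produced by `serialize` and consumed by `deserializeBloomFilter`,
# documented here from the code above. Note the header uses the host's native
# int width and endianness, so both peers must match; offsets shown assume
# sizeof(int) == 8:
#
#   bytes 0..7    size (bit count)
#   bytes 8..15   hashFunctions
#   bytes 16..23  expectedElements
#   bytes 24..    bit array, packed LSB-first, (size + 7) div 8 bytes
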
# =============================================================================
# Bandwidth Management
# =============================================================================

proc newBandwidthLimiter*(limitBps: int64): BandwidthLimiter =
  ## Create a new bandwidth limiter
  BandwidthLimiter(
    limitBps: limitBps,
    currentUsage: 0,
    windowStart: now(),
    windowSizeMs: 1000 # 1 second window
  )

proc checkBandwidth*(limiter: BandwidthLimiter, requestedBytes: int64): bool =
  ## Check if bandwidth is available for the requested transfer
  let currentTime = now()
  let windowElapsed = (currentTime - limiter.windowStart).inMilliseconds

  # Reset window if enough time has passed
  if windowElapsed >= limiter.windowSizeMs:
    limiter.currentUsage = 0
    limiter.windowStart = currentTime

  # Check if request would exceed limit
  if limiter.currentUsage + requestedBytes > limiter.limitBps:
    return false

  limiter.currentUsage += requestedBytes
  return true

proc waitForBandwidth*(limiter: BandwidthLimiter, requestedBytes: int64) {.async.} =
  ## Wait until bandwidth is available for the requested transfer.
  ## Requests larger than one window's budget are clamped to `limitBps`;
  ## otherwise they could never be admitted and this proc would spin forever.
  let request = min(requestedBytes, limiter.limitBps)
  while not limiter.checkBandwidth(request):
    await sleepAsync(100) # Wait 100ms and try again

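when isMainModule:
  # Usage sketch: a 1 MiB/s limiter admits exactly one window's worth of
  # traffic, then forces callers to wait for the next 1-second window.
  let demoLimiter = newBandwidthLimiter(1024 * 1024)
  doAssert demoLimiter.checkBandwidth(512 * 1024)  # half the window
  doAssert demoLimiter.checkBandwidth(512 * 1024)  # fills it exactly
  doAssert not demoLimiter.checkBandwidth(1)       # budget exhausted
  waitFor demoLimiter.waitForBandwidth(1024)       # returns once the window resets
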
# =============================================================================
# Sync Engine Initialization
# =============================================================================

proc newSyncEngine*(casManager: CasManager, eventLogger: SecurityEventLogger,
                    config: SyncEngineConfig): SyncEngine =
  ## Create a new synchronization engine
  let syncState = SyncState(
    lastSyncTime: default(times.DateTime),
    lastSequenceNumber: 0,
    bloomFilter: newBloomFilter(config.bloomFilterSize),
    knownObjects: initHashSet[string](),
    pendingDeltas: @[]
  )

  SyncEngine(
    localCasManager: casManager,
    eventLogger: eventLogger,
    mirrors: initTable[string, MirrorNode](),
    activeMirror: "",
    config: config,
    syncState: syncState,
    bandwidthLimiter: newBandwidthLimiter(config.bandwidthLimitBps)
  )

proc getDefaultSyncEngineConfig*(): SyncEngineConfig =
  ## Get default synchronization engine configuration
  SyncEngineConfig(
    maxMirrors: 10,
    syncIntervalSeconds: 300,            # 5 minutes
    bloomFilterSize: 100_000,            # expected number of objects (sizes the filter)
    bloomFilterHashFunctions: 7,
    maxDeltaSize: 100 * 1024 * 1024,     # 100 MB
    compressionLevel: 3,                 # balanced compression
    bandwidthLimitBps: 10 * 1024 * 1024, # 10 MB/s
    failoverTimeoutMs: 5000,             # 5 seconds
    maxConcurrentSyncs: 3
  )

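# Construction sketch (mirrors the wiring used by `nipSyncCommand` below;
# the CAS paths are illustrative, not canonical):
#
#   let cas    = newCasManager("~/.nip/cas", "/var/lib/nip/cas")
#   let engine = newSyncEngine(cas, globalSecurityLogger, getDefaultSyncEngineConfig())
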
# =============================================================================
# Mirror Management
# =============================================================================

proc addMirror*(engine: SyncEngine, id: string, url: string, priority: int = 50): SyncResult[MirrorNode] =
  ## Add a new mirror node to the sync engine
  try:
    if engine.mirrors.len >= engine.config.maxMirrors:
      return SyncResult[MirrorNode](success: false, error: "Maximum mirrors reached", errorCode: 429)

    if id in engine.mirrors:
      return SyncResult[MirrorNode](success: false, error: fmt"Mirror already exists: {id}", errorCode: 409)

    let mirror = MirrorNode(
      id: id,
      url: url,
      priority: priority,
      latency: 0.0,
      reliability: 1.0, # Start with perfect reliability
      bandwidth: 0,
      lastSync: default(times.DateTime),
      status: MirrorActive,
      syncState: SyncState(
        lastSyncTime: default(times.DateTime),
        lastSequenceNumber: 0,
        bloomFilter: newBloomFilter(engine.config.bloomFilterSize),
        knownObjects: initHashSet[string](),
        pendingDeltas: @[]
      )
    )

    engine.mirrors[id] = mirror

    # Set as active mirror if it's the first one or has higher priority
    if engine.activeMirror == "" or priority > engine.mirrors[engine.activeMirror].priority:
      engine.activeMirror = id

    logGlobalSecurityEvent(EventSystemStartup, SeverityInfo, "sync-engine",
                           fmt"Mirror added: {id} ({url}) priority={priority}")

    return SyncResult[MirrorNode](success: true, value: mirror, bytesTransferred: 0, duration: 0.0)

  except Exception as e:
    return SyncResult[MirrorNode](success: false, error: fmt"Failed to add mirror: {e.msg}", errorCode: 500)

proc removeMirror*(engine: SyncEngine, id: string): SyncResult[bool] =
  ## Remove a mirror node from the sync engine
  try:
    if id notin engine.mirrors:
      return SyncResult[bool](success: false, error: fmt"Mirror not found: {id}", errorCode: 404)

    engine.mirrors.del(id)

    # Find a new active mirror if we removed the active one
    if engine.activeMirror == id:
      engine.activeMirror = ""
      var bestPriority = -1
      for mirrorId, mirror in engine.mirrors.pairs:
        if mirror.status == MirrorActive and mirror.priority > bestPriority:
          engine.activeMirror = mirrorId
          bestPriority = mirror.priority

    logGlobalSecurityEvent(EventSystemStartup, SeverityInfo, "sync-engine",
                           fmt"Mirror removed: {id}")

    return SyncResult[bool](success: true, value: true, bytesTransferred: 0, duration: 0.0)

  except Exception as e:
    return SyncResult[bool](success: false, error: fmt"Failed to remove mirror: {e.msg}", errorCode: 500)

proc selectBestMirror*(engine: SyncEngine): Option[MirrorNode] =
  ## Select the best available mirror based on priority, latency, and reliability
  var bestMirror: Option[MirrorNode] = none(MirrorNode)
  var bestScore = -1.0

  for mirror in engine.mirrors.values:
    if mirror.status != MirrorActive:
      continue

    # Composite score: priority * reliability / (1 + latency in seconds)
    let score = float(mirror.priority) * mirror.reliability / (1.0 + mirror.latency / 1000.0)

    if score > bestScore:
      bestScore = score
      bestMirror = some(mirror)

  return bestMirror

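# Worked scoring example (illustrative numbers): a mirror with priority 100,
# reliability 0.95 and 200 ms latency scores 100 * 0.95 / 1.2, about 79.2,
# while one with priority 50, reliability 1.0 and 20 ms latency scores
# 50 * 1.0 / 1.02, about 49.0; so priority dominates unless a mirror
# degrades badly.
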
# =============================================================================
# Event Log Integration
# =============================================================================

proc extractSyncEventsFromSecurityLog*(engine: SyncEngine, since: times.DateTime): seq[SyncEvent] =
  ## Extract synchronization-relevant events from the security event log
  var syncEvents: seq[SyncEvent] = @[]

  try:
    # Read security events since the last sync
    let securityEvents = engine.eventLogger.auditSecurityLog(since, now())

    var sequenceNumber = engine.syncState.lastSequenceNumber + 1

    for secEvent in securityEvents:
      var syncEvent: Option[SyncEvent] = none(SyncEvent)

      case secEvent.eventType:
      of EventPackageVerification:
        # Package was verified - might indicate a new package
        if secEvent.metadata.hasKey("package_hash"):
          syncEvent = some(SyncEvent(
            id: secEvent.id,
            timestamp: secEvent.timestamp,
            eventType: SyncPackageUpdated,
            objectHash: secEvent.metadata["package_hash"].getStr(),
            metadata: secEvent.metadata,
            sequenceNumber: sequenceNumber
          ))

      of EventKeyRevocation:
        # Key was revoked - affects package trust
        syncEvent = some(SyncEvent(
          id: secEvent.id,
          timestamp: secEvent.timestamp,
          eventType: SyncKeyRevoked,
          objectHash: secEvent.metadata.getOrDefault("key_id").getStr(""),
          metadata: secEvent.metadata,
          sequenceNumber: sequenceNumber
        ))

      of EventKeyRollover:
        # Key was rolled over - affects package signatures
        syncEvent = some(SyncEvent(
          id: secEvent.id,
          timestamp: secEvent.timestamp,
          eventType: SyncKeyRolledOver,
          objectHash: secEvent.metadata.getOrDefault("new_key_id").getStr(""),
          metadata: secEvent.metadata,
          sequenceNumber: sequenceNumber
        ))

      else:
        # Other events might become relevant in the future
        discard

      if syncEvent.isSome():
        syncEvents.add(syncEvent.get())
        inc sequenceNumber

  except Exception as e:
    logGlobalSecurityEvent(EventSecurityIncident, SeverityError, "sync-engine",
                           fmt"Failed to extract sync events: {e.msg}")

  return syncEvents

proc updateBloomFilterFromEvents*(engine: SyncEngine, events: seq[SyncEvent]) =
  ## Update the local bloom filter based on sync events
  for event in events:
    case event.eventType:
    of SyncPackageAdded, SyncPackageUpdated:
      engine.syncState.bloomFilter.add(event.objectHash)
      engine.syncState.knownObjects.incl(event.objectHash)

    of SyncPackageRemoved:
      # Note: items cannot be removed from a bloom filter, but we can
      # remove them from the known-objects set
      engine.syncState.knownObjects.excl(event.objectHash)

    of SyncKeyRevoked, SyncKeyRolledOver:
      # These affect trust but don't directly change object presence
      discard

    of SyncManifestChanged:
      engine.syncState.bloomFilter.add(event.objectHash)
      engine.syncState.knownObjects.incl(event.objectHash)

# =============================================================================
# Bloom Filter Handshake Protocol
# =============================================================================

proc performBloomFilterHandshake*(engine: SyncEngine, mirror: MirrorNode): Future[SyncResult[seq[string]]] {.async.} =
  ## Perform bloom filter handshake to identify objects that need synchronization
  let startTime = cpuTime()

  try:
    # Serialize our bloom filter
    let localBloomData = engine.syncState.bloomFilter.serialize()

    # Wait for bandwidth availability
    await engine.bandwidthLimiter.waitForBandwidth(int64(localBloomData.len))

    # Send bloom filter to mirror
    let client = newAsyncHttpClient()
    defer: client.close()
    let handshakeUrl = mirror.url / "api/v1/sync/bloom-handshake"

    let response = await client.post(handshakeUrl, body = $localBloomData)
    let responseData = await response.body

    if response.code != Http200:
      return SyncResult[seq[string]](
        success: false,
        error: fmt"Handshake failed: HTTP {response.code}",
        errorCode: response.code.int
      )

    # Parse response: the objects the mirror has that we appear to lack
    let responseJson = parseJson(responseData)
    let missingObjects = responseJson["missing_objects"].getElems().mapIt(it.getStr())
    let remoteBloomData = responseJson["remote_bloom_filter"].getStr()

    # Deserialize remote bloom filter
    let remoteBloomFilter = deserializeBloomFilter(cast[seq[byte]](remoteBloomData))

    # Find objects we have that the remote doesn't
    var objectsToSend: seq[string] = @[]
    for objectHash in engine.syncState.knownObjects:
      if not remoteBloomFilter.contains(objectHash):
        objectsToSend.add(objectHash)

    # Log handshake results
    logGlobalSecurityEvent(EventSystemHealthCheck, SeverityInfo, "sync-engine",
                           fmt"Bloom filter handshake with {mirror.id}: {missingObjects.len} to receive, {objectsToSend.len} to send")

    let duration = cpuTime() - startTime
    return SyncResult[seq[string]](
      success: true,
      value: missingObjects,
      bytesTransferred: int64(localBloomData.len + responseData.len),
      duration: duration
    )

  except Exception as e:
    return SyncResult[seq[string]](
      success: false,
      error: fmt"Handshake error: {e.msg}",
      errorCode: 500
    )

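# The handshake endpoint's reply shape is inferred from the parsing above;
# there is no separate protocol spec in this module:
#
#   {
#     "missing_objects":     ["<cas-hash>", ...],  # objects the mirror offers us
#     "remote_bloom_filter": "<serialized filter bytes as a string>"
#   }
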
# =============================================================================
# Delta Object Creation and Application
# =============================================================================

proc compressZstd*(data: seq[byte], level: int): seq[byte] =
  ## Compress data using zstd (placeholder implementation)
  # TODO: Implement actual zstd compression when the library is available.
  # For now, return the original data behind a two-byte marker.
  result = @[0xFF'u8, 0xFE'u8] & data # Marker for "compressed"

proc decompressZstd*(data: seq[byte]): seq[byte] =
  ## Decompress zstd data (placeholder implementation)
  # TODO: Implement actual zstd decompression when the library is available.
  # For now, check for the marker and strip it.
  if data.len >= 2 and data[0] == 0xFF'u8 and data[1] == 0xFE'u8:
    return data[2..^1]
  else:
    return data

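when defined(useLibzstd):
  # Hedged sketch of what the real implementation could look like, using
  # zstd's simple one-shot C API via FFI. Assumes libzstd is installed;
  # nothing in this block comes from the original module.
  {.passL: "-lzstd".}
  proc ZSTD_compressBound(srcSize: csize_t): csize_t {.importc, cdecl.}
  proc ZSTD_isError(code: csize_t): cuint {.importc, cdecl.}
  proc ZSTD_compress(dst: pointer, dstCapacity: csize_t,
                     src: pointer, srcSize: csize_t,
                     level: cint): csize_t {.importc, cdecl.}

  proc compressZstdReal(data: seq[byte], level: int): seq[byte] =
    ## One-shot zstd compression of `data` at the given level.
    if data.len == 0: return @[]
    # Allocate the worst-case output size, then shrink to what was written.
    result = newSeq[byte](int(ZSTD_compressBound(csize_t(data.len))))
    let written = ZSTD_compress(addr result[0], csize_t(result.len),
                                unsafeAddr data[0], csize_t(data.len),
                                cint(level))
    if ZSTD_isError(written) != 0:
      raise newException(IOError, "zstd compression failed")
    result.setLen(int(written))
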
proc createDeltaObject*(engine: SyncEngine, objectHash: string): SyncResult[DeltaObject] =
  ## Create a delta object for efficient transmission
  try:
    # Retrieve object from local CAS
    let objectResult = engine.localCasManager.retrieveObject(objectHash)
    if not objectResult.isOk:
      return SyncResult[DeltaObject](
        success: false,
        error: fmt"Object not found in CAS: {objectHash}",
        errorCode: 404
      )

    let originalData = objectResult.okValue
    let originalSize = int64(originalData.len)

    # Compress the data using zstd
    let compressedData = compressZstd(originalData, engine.config.compressionLevel)
    let compressedSize = int64(compressedData.len)

    # Check if delta would be too large
    if compressedSize > engine.config.maxDeltaSize:
      return SyncResult[DeltaObject](
        success: false,
        error: fmt"Delta object too large: {compressedSize} > {engine.config.maxDeltaSize}",
        errorCode: 413
      )

    let deltaObject = DeltaObject(
      objectHash: objectHash,
      deltaType: "add", # For now, only full object transmission is supported
      compressedData: compressedData,
      originalSize: originalSize,
      compressedSize: compressedSize,
      dependencies: @[] # TODO: Implement dependency tracking
    )

    return SyncResult[DeltaObject](
      success: true,
      value: deltaObject,
      bytesTransferred: compressedSize,
      duration: 0.0
    )

  except Exception as e:
    return SyncResult[DeltaObject](
      success: false,
      error: fmt"Failed to create delta object: {e.msg}",
      errorCode: 500
    )

proc applyDeltaObject*(engine: SyncEngine, delta: DeltaObject): SyncResult[bool] =
  ## Apply a delta object to the local CAS
  try:
    # Decompress the data
    let originalData = decompressZstd(delta.compressedData)

    # Verify the hash matches
    let computedHash = engine.localCasManager.computeHash(originalData)
    if computedHash != delta.objectHash:
      return SyncResult[bool](
        success: false,
        error: fmt"Hash mismatch: expected {delta.objectHash}, got {computedHash}",
        errorCode: 400
      )

    # Store in local CAS
    let storeResult = engine.localCasManager.storeObject(originalData)
    if not storeResult.isOk:
      return SyncResult[bool](
        success: false,
        error: fmt"Failed to store object: {storeResult.errValue.msg}",
        errorCode: 500
      )

    # Update local state
    engine.syncState.knownObjects.incl(delta.objectHash)
    engine.syncState.bloomFilter.add(delta.objectHash)

    return SyncResult[bool](
      success: true,
      value: true,
      bytesTransferred: delta.compressedSize,
      duration: 0.0
    )

  except Exception as e:
    return SyncResult[bool](
      success: false,
      error: fmt"Failed to apply delta object: {e.msg}",
      errorCode: 500
    )

# =============================================================================
# Incremental Synchronization
# =============================================================================

proc performIncrementalSync*(engine: SyncEngine, mirrorId: string): Future[SyncResult[int]] {.async.} =
  ## Perform incremental synchronization with a specific mirror
  let startTime = cpuTime()
  var totalBytesTransferred: int64 = 0

  try:
    if mirrorId notin engine.mirrors:
      return SyncResult[int](
        success: false,
        error: fmt"Mirror not found: {mirrorId}",
        errorCode: 404
      )

    var mirror = engine.mirrors[mirrorId]
    mirror.status = MirrorSyncing

    # Extract events since last sync
    let lastSyncTime = if mirror.syncState.lastSyncTime == default(times.DateTime):
      now() - initDuration(hours = 24) # Default to the last 24 hours
    else:
      mirror.syncState.lastSyncTime

    let syncEvents = engine.extractSyncEventsFromSecurityLog(lastSyncTime)

    if syncEvents.len == 0:
      # No changes to sync
      mirror.status = MirrorActive
      mirror.lastSync = now()
      engine.mirrors[mirrorId] = mirror

      return SyncResult[int](
        success: true,
        value: 0,
        bytesTransferred: 0,
        duration: cpuTime() - startTime
      )

    # Update local bloom filter with recent events
    engine.updateBloomFilterFromEvents(syncEvents)

    # Perform bloom filter handshake
    let handshakeResult = await engine.performBloomFilterHandshake(mirror)
    if not handshakeResult.success:
      mirror.status = MirrorUnreachable
      engine.mirrors[mirrorId] = mirror
      return SyncResult[int](
        success: false,
        error: fmt"Handshake failed: {handshakeResult.error}",
        errorCode: handshakeResult.errorCode
      )

    totalBytesTransferred += handshakeResult.bytesTransferred
    let missingObjects = handshakeResult.value

    # Create and send delta objects for missing objects
    var syncedObjects = 0
    for objectHash in missingObjects:
      let deltaResult = engine.createDeltaObject(objectHash)
      if not deltaResult.success:
        logGlobalSecurityEvent(EventSecurityIncident, SeverityWarning, "sync-engine",
                               fmt"Failed to create delta for {objectHash}: {deltaResult.error}")
        continue

      let delta = deltaResult.value

      # Reserve bandwidth for the actual delta size; reserving maxDeltaSize
      # up front could exceed the per-window budget and stall the transfer
      await engine.bandwidthLimiter.waitForBandwidth(delta.compressedSize)

      # Send delta to mirror
      let client = newAsyncHttpClient()
      defer: client.close()
      let deltaUrl = mirror.url / "api/v1/sync/delta"

      let deltaJson = %*{
        "object_hash": delta.objectHash,
        "delta_type": delta.deltaType,
        "compressed_data": delta.compressedData,
        "original_size": delta.originalSize,
        "compressed_size": delta.compressedSize
      }

      let response = await client.post(deltaUrl, body = $deltaJson)

      if response.code == Http200:
        inc syncedObjects
        totalBytesTransferred += delta.compressedSize
      else:
        logGlobalSecurityEvent(EventSecurityIncident, SeverityWarning, "sync-engine",
                               fmt"Failed to send delta {objectHash} to {mirrorId}: HTTP {response.code}")

    # Update mirror state
    mirror.status = MirrorActive
    mirror.lastSync = now()
    mirror.syncState.lastSyncTime = now()
    mirror.syncState.lastSequenceNumber = if syncEvents.len > 0: syncEvents[^1].sequenceNumber else: mirror.syncState.lastSequenceNumber
    engine.mirrors[mirrorId] = mirror

    # Update engine state
    engine.syncState.lastSyncTime = now()
    if syncEvents.len > 0:
      engine.syncState.lastSequenceNumber = syncEvents[^1].sequenceNumber

    logGlobalSecurityEvent(EventSystemHealthCheck, SeverityInfo, "sync-engine",
                           fmt"Incremental sync with {mirrorId} completed: {syncedObjects} objects, {totalBytesTransferred} bytes")

    let duration = cpuTime() - startTime
    return SyncResult[int](
      success: true,
      value: syncedObjects,
      bytesTransferred: totalBytesTransferred,
      duration: duration
    )

  except Exception as e:
    return SyncResult[int](
      success: false,
      error: fmt"Incremental sync failed: {e.msg}",
      errorCode: 500
    )

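# The delta upload body mirrors the JSON literal above; as with the handshake,
# the server-side contract is inferred from this client code:
#
#   POST <mirror>/api/v1/sync/delta
#   { "object_hash": ..., "delta_type": "add", "compressed_data": [bytes...],
#     "original_size": N, "compressed_size": M }
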
# =============================================================================
# Mirror Network with Load Balancing and Failover
# =============================================================================

proc updateMirrorHealth*(engine: SyncEngine, mirrorId: string, latency: float, success: bool) =
  ## Update mirror health metrics for load balancing decisions
  if mirrorId notin engine.mirrors:
    return

  var mirror = engine.mirrors[mirrorId]

  # Update latency with an exponential moving average
  if mirror.latency == 0.0:
    mirror.latency = latency
  else:
    mirror.latency = 0.7 * mirror.latency + 0.3 * latency

  # Update reliability with an exponential moving average
  let successRate = if success: 1.0 else: 0.0
  mirror.reliability = 0.9 * mirror.reliability + 0.1 * successRate

  # Update status based on health
  if mirror.reliability < 0.5:
    mirror.status = MirrorUnreachable
  elif mirror.latency > 5000.0: # 5 seconds
    mirror.status = MirrorSlow
  else:
    mirror.status = MirrorActive

  engine.mirrors[mirrorId] = mirror

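# Worked example of the reliability decay: starting from 1.0, each failed sync
# multiplies reliability by 0.9 (since 0.9 * r + 0.1 * 0.0 = 0.9r), so it takes
# seven consecutive failures to fall below the 0.5 unreachable threshold
# (0.9^7 is about 0.478), while each success closes 10% of the gap back to 1.0.
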
proc performFailover*(engine: SyncEngine): Option[string] =
  ## Perform failover to the next best available mirror
  let currentMirror = engine.activeMirror

  # Find the best alternative mirror
  let bestMirror = engine.selectBestMirror()
  if bestMirror.isNone():
    return none(string)

  let newActiveMirror = bestMirror.get().id
  if newActiveMirror != currentMirror:
    engine.activeMirror = newActiveMirror

    logGlobalSecurityEvent(EventSystemHealthCheck, SeverityWarning, "sync-engine",
                           fmt"Failover from {currentMirror} to {newActiveMirror}")

    return some(newActiveMirror)

  return none(string)

proc syncWithLoadBalancing*(engine: SyncEngine): Future[SyncResult[int]] {.async.} =
  ## Perform synchronization with automatic load balancing and failover
  var totalSynced = 0
  var totalBytesTransferred: int64 = 0
  var anySyncSucceeded = false # A sync can succeed with zero objects transferred
  let startTime = cpuTime()

  try:
    # Get list of active mirrors sorted by priority and health
    var availableMirrors = engine.mirrors.values.toSeq
      .filterIt(it.status == MirrorActive)
      .sortedByIt(-it.priority)

    if availableMirrors.len == 0:
      return SyncResult[int](
        success: false,
        error: "No active mirrors available",
        errorCode: 503
      )

    # Try to sync with mirrors in order of preference
    for mirror in availableMirrors:
      let syncStartTime = cpuTime()

      try:
        let syncResult = await engine.performIncrementalSync(mirror.id)
        let syncDuration = cpuTime() - syncStartTime

        if syncResult.success:
          anySyncSucceeded = true
          totalSynced += syncResult.value
          totalBytesTransferred += syncResult.bytesTransferred

          # Update mirror health with the successful sync
          engine.updateMirrorHealth(mirror.id, syncDuration * 1000.0, true)

          # A successful sync with this mirror is enough; stop here
          break
        else:
          # Update mirror health with the failed sync
          engine.updateMirrorHealth(mirror.id, syncDuration * 1000.0, false)

          logGlobalSecurityEvent(EventSecurityIncident, SeverityWarning, "sync-engine",
                                 fmt"Sync failed with {mirror.id}: {syncResult.error}")

      except Exception as e:
        # Update mirror health after the exception
        engine.updateMirrorHealth(mirror.id, (cpuTime() - syncStartTime) * 1000.0, false)

        logGlobalSecurityEvent(EventSecurityIncident, SeverityError, "sync-engine",
                               fmt"Sync exception with {mirror.id}: {e.msg}")

    # Perform failover if every preferred mirror failed
    if not anySyncSucceeded:
      let failoverResult = engine.performFailover()
      if failoverResult.isSome():
        # Try once more with the new active mirror
        let newMirror = failoverResult.get()
        let syncResult = await engine.performIncrementalSync(newMirror)
        if syncResult.success:
          anySyncSucceeded = true
          totalSynced += syncResult.value
          totalBytesTransferred += syncResult.bytesTransferred

    let duration = cpuTime() - startTime

    if anySyncSucceeded:
      return SyncResult[int](
        success: true,
        value: totalSynced,
        bytesTransferred: totalBytesTransferred,
        duration: duration
      )
    else:
      return SyncResult[int](
        success: false,
        error: "All mirrors failed to synchronize",
        errorCode: 503
      )

  except Exception as e:
    return SyncResult[int](
      success: false,
      error: fmt"Load balanced sync failed: {e.msg}",
      errorCode: 500
    )

# =============================================================================
# Automatic Synchronization Daemon
# =============================================================================

proc startSyncDaemon*(engine: SyncEngine) {.async.} =
  ## Start the automatic synchronization daemon
  logGlobalSecurityEvent(EventSystemStartup, SeverityInfo, "sync-engine",
                         fmt"Starting sync daemon (interval: {engine.config.syncIntervalSeconds}s)")

  while true:
    try:
      let syncResult = await engine.syncWithLoadBalancing()

      if syncResult.success:
        logGlobalSecurityEvent(EventSystemHealthCheck, SeverityInfo, "sync-engine",
                               fmt"Automatic sync completed: {syncResult.value} objects, {syncResult.bytesTransferred} bytes")
      else:
        logGlobalSecurityEvent(EventSecurityIncident, SeverityError, "sync-engine",
                               fmt"Automatic sync failed: {syncResult.error}")

      # Wait for the next sync interval
      await sleepAsync(engine.config.syncIntervalSeconds * 1000)

    except Exception as e:
      logGlobalSecurityEvent(EventSecurityIncident, SeverityError, "sync-engine",
                             fmt"Sync daemon error: {e.msg}")

      # Wait longer on error to avoid log spam
      await sleepAsync(60000) # 1 minute

# =============================================================================
# CLI Integration
# =============================================================================

proc nipSyncCommand*(target: string = "all", force: bool = false): Future[SyncResult[int]] {.async.} =
  ## Implement the `nip sync` command
  try:
    # Initialize sync engine
    let casManager = newCasManager("~/.nip/cas", "/var/lib/nip/cas")
    let eventLogger = globalSecurityLogger # Use the global logger
    let config = getDefaultSyncEngineConfig()
    var engine = newSyncEngine(casManager, eventLogger, config)

    # Add default mirrors (would normally be loaded from config)
    discard engine.addMirror("official", "https://packages.nexusos.org", 100)
    discard engine.addMirror("community", "https://community.nexusos.org", 50)

    if target == "all":
      # Sync with all mirrors using load balancing
      return await engine.syncWithLoadBalancing()
    else:
      # Sync with a specific mirror
      return await engine.performIncrementalSync(target)

  except Exception as e:
    return SyncResult[int](
      success: false,
      error: fmt"Sync command failed: {e.msg}",
      errorCode: 500
    )

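# Usage sketch (hypothetical CLI wiring; the `nip` frontend itself lives
# outside this module):
#
#   nip sync            -> waitFor nipSyncCommand("all")
#   nip sync official   -> waitFor nipSyncCommand("official")
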
# =============================================================================
# Exports
# =============================================================================

# All public symbols above (the bloom filter, bandwidth limiter, mirror
# management, sync, and daemon APIs) are exported via the `*` postfix.
# Nim's `export` statement only re-exports symbols from *imported* modules,
# so an explicit export list for locally declared symbols would not compile.