## nimpak/remote/resumable_fetch.nim
## Resume-safe, chunked fetch with HTTP Range and CAS integration
##
## This module provides robust download capabilities for large binary packages
## over unreliable network connections with automatic resume and integrity verification.
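##
## Typical use (a sketch; the URL and CAS paths here are illustrative):
##
## .. code-block:: nim
##
##   let cas = newCasManager(expandTilde("~/.nip/cas"), "/var/lib/nip/cas")
##   let res = waitFor fetchWithResume("https://example.com/big.npk", "big.npk", cas)
##   if not res.success:
##     stderr.writeLine res.error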
import std/[os, httpclient, asyncdispatch, strutils, strformat, times, json, math, random, sequtils, options]
import ../cas
import ../types_fixed

type
  ChunkState* = enum
    ChunkPending = "pending"
    ChunkDownloading = "downloading"
    ChunkComplete = "complete"
    ChunkFailed = "failed"

  DownloadChunk* = object
    index*: int
    startByte*: int64
    endByte*: int64
    size*: int64
    hash*: string
    state*: ChunkState
    attempts*: int
    lastError*: string

  DownloadSession* = ref object
    url*: string
    totalSize*: int64
    chunkSize*: int64
    chunks*: seq[DownloadChunk]
    sessionId*: string
    targetPath*: string
    casManager*: CasManager
    resumeFile*: string
    startTime*: times.DateTime
    bytesDownloaded*: int64

  FetchProgress* = object
    sessionId*: string
    totalBytes*: int64
    downloadedBytes*: int64
    currentChunk*: int
    totalChunks*: int
    speed*: float  # bytes per second
    eta*: int      # seconds remaining
    status*: string

  FetchResult*[T] = object
    case success*: bool
    of true:
      value*: T
      bytesTransferred*: int64
      duration*: float
    of false:
      error*: string
      errorCode*: int

const
  DEFAULT_CHUNK_SIZE = 4 * 1024 * 1024  # 4MB chunks
  MAX_CHUNK_ATTEMPTS = 3
  RESUME_FILE_SUFFIX = ".nip-resume"

# Seed the RNG once at module load so session IDs differ across runs
randomize()

# =============================================================================
# Progress Event System
# =============================================================================
type
  ProgressEvent* = object
    eventType*: string
    sessionId*: string
    progress*: FetchProgress
    timestamp*: times.DateTime

var progressSubscribers*: seq[proc (event: ProgressEvent)] = @[]

proc subscribeProgress*(callback: proc (event: ProgressEvent)) =
  ## Subscribe to progress events for TUI/monitoring integration
  progressSubscribers.add(callback)

proc emitProgress*(event: ProgressEvent) =
  ## Emit a progress event to all subscribers
  for subscriber in progressSubscribers:
    try:
      subscriber(event)
    except CatchableError:
      # Don't let subscriber errors break the download
      discard
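
# Usage sketch: a console progress hook (the writeLine body is illustrative,
# not part of this module's API):
#
#   subscribeProgress(proc (event: ProgressEvent) =
#     stderr.writeLine(event.sessionId & ": " & event.progress.status))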
# =============================================================================
# Download Session Management
# =============================================================================
proc newDownloadSession*(url: string, targetPath: string, casManager: CasManager,
                         chunkSize: int64 = DEFAULT_CHUNK_SIZE): DownloadSession =
  ## Create a new download session
  let sessionId = fmt"dl-{epochTime().int}-{rand(9999):04d}"
  let resumeFile = targetPath & RESUME_FILE_SUFFIX
  DownloadSession(
    url: url,
    totalSize: 0,  # Will be determined from HTTP HEAD
    chunkSize: chunkSize,
    chunks: @[],
    sessionId: sessionId,
    targetPath: targetPath,
    casManager: casManager,
    resumeFile: resumeFile,
    startTime: now(),
    bytesDownloaded: 0
  )

proc saveSessionState*(session: DownloadSession) =
  ## Save session state for resume capability
  let sessionData = %*{
    "session_id": session.sessionId,
    "url": session.url,
    "total_size": session.totalSize,
    "chunk_size": session.chunkSize,
    "target_path": session.targetPath,
    # Explicit format so loadSessionState can parse it back reliably
    "start_time": session.startTime.format("yyyy-MM-dd'T'HH:mm:sszzz"),
    "bytes_downloaded": session.bytesDownloaded,
    "chunks": session.chunks.mapIt(%*{
      "index": it.index,
      "start_byte": it.startByte,
      "end_byte": it.endByte,
      "size": it.size,
      "hash": it.hash,
      "state": $it.state,
      "attempts": it.attempts,
      "last_error": it.lastError
    })
  }
  writeFile(session.resumeFile, sessionData.pretty())
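
# The resume file is plain JSON next to the target; e.g. for a two-chunk
# download (values abridged, URL illustrative):
#
#   {
#     "session_id": "dl-1700000000-0042",
#     "url": "https://example.com/pkg.npk",
#     "total_size": 6291456,
#     "chunk_size": 4194304,
#     "chunks": [
#       {"index": 0, "state": "complete", ...},
#       {"index": 1, "state": "pending", ...}
#     ]
#   }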

proc loadSessionState*(resumeFile: string, casManager: CasManager): Option[DownloadSession] =
  ## Load session state from a resume file
  try:
    if not fileExists(resumeFile):
      return none(DownloadSession)
    let sessionData = parseJson(readFile(resumeFile))
    var session = DownloadSession(
      sessionId: sessionData["session_id"].getStr(),
      url: sessionData["url"].getStr(),
      totalSize: sessionData["total_size"].getInt(),
      chunkSize: sessionData["chunk_size"].getInt(),
      targetPath: sessionData["target_path"].getStr(),
      casManager: casManager,
      resumeFile: resumeFile,
      # Must match the format used by saveSessionState
      startTime: sessionData["start_time"].getStr().parse("yyyy-MM-dd'T'HH:mm:sszzz"),
      bytesDownloaded: sessionData["bytes_downloaded"].getInt(),
      chunks: @[]
    )
    # Restore chunks
    for chunkJson in sessionData["chunks"]:
      session.chunks.add(DownloadChunk(
        index: chunkJson["index"].getInt(),
        startByte: chunkJson["start_byte"].getInt(),
        endByte: chunkJson["end_byte"].getInt(),
        size: chunkJson["size"].getInt(),
        hash: chunkJson["hash"].getStr(),
        state: parseEnum[ChunkState](chunkJson["state"].getStr()),
        attempts: chunkJson["attempts"].getInt(),
        lastError: chunkJson["last_error"].getStr()
      ))
    return some(session)
  except CatchableError:
    # A corrupt or incompatible resume file just means a fresh start
    return none(DownloadSession)
# =============================================================================
# HTTP Range Request Handling
# =============================================================================
proc getContentLength*(url: string): Future[FetchResult[int64]] {.async.} =
  ## Get the content length using an HTTP HEAD request
  try:
    let client = newAsyncHttpClient()
    defer: client.close()
    let response = await client.request(url, httpMethod = HttpHead)
    if response.code != Http200 and response.code != Http206:
      return FetchResult[int64](
        success: false,
        error: fmt"HTTP {response.code}: {response.status}",
        errorCode: response.code.int
      )
    let contentLength = $response.headers.getOrDefault("content-length")
    # parseBiggestInt so packages larger than 2 GiB are safe on every platform
    let size: int64 = if contentLength.len > 0: contentLength.parseBiggestInt() else: 0
    return FetchResult[int64](
      success: true,
      value: size,
      bytesTransferred: 0,
      duration: 0.0
    )
  except CatchableError as e:
    return FetchResult[int64](
      success: false,
      error: fmt"Failed to get content length: {e.msg}",
      errorCode: 500
    )
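
# Servers must honor Range requests for chunked resume to work. A quick probe
# (sketch, reusing the HEAD response above) would also inspect Accept-Ranges:
#
#   if $response.headers.getOrDefault("accept-ranges") == "none":
#     # fall back to a single whole-file GET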

proc downloadChunk*(session: DownloadSession, chunkIndex: int): Future[FetchResult[seq[byte]]] {.async.} =
  ## Download a specific chunk using an HTTP Range request
  if chunkIndex >= session.chunks.len:
    return FetchResult[seq[byte]](
      success: false,
      error: "Invalid chunk index",
      errorCode: 400
    )
  var chunk = session.chunks[chunkIndex]
  chunk.state = ChunkDownloading
  chunk.attempts += 1
  session.chunks[chunkIndex] = chunk
  try:
    let client = newAsyncHttpClient()
    defer: client.close()
    # Set the Range header for partial content
    let rangeHeader = fmt"bytes={chunk.startByte}-{chunk.endByte}"
    client.headers["Range"] = rangeHeader
    let response = await client.request(session.url, httpMethod = HttpGet)
    if response.code != Http206:  # Partial Content
      chunk.state = ChunkFailed
      chunk.lastError = fmt"Expected HTTP 206, got {response.code}"
      session.chunks[chunkIndex] = chunk
      return FetchResult[seq[byte]](
        success: false,
        error: chunk.lastError,
        errorCode: response.code.int
      )
    let body = await response.body
    # Copy into a byte seq instead of casting the string, which relies on
    # unspecified memory layout
    var data = newSeq[byte](body.len)
    if body.len > 0:
      copyMem(addr data[0], unsafeAddr body[0], body.len)
    # Verify the chunk size
    if data.len != chunk.size:
      chunk.state = ChunkFailed
      chunk.lastError = fmt"Size mismatch: expected {chunk.size}, got {data.len}"
      session.chunks[chunkIndex] = chunk
      return FetchResult[seq[byte]](
        success: false,
        error: chunk.lastError,
        errorCode: 400
      )
    # Calculate and verify the chunk hash
    let computedHash = session.casManager.computeHash(data)
    if chunk.hash.len > 0 and computedHash != chunk.hash:
      chunk.state = ChunkFailed
      chunk.lastError = fmt"Hash mismatch: expected {chunk.hash}, got {computedHash}"
      session.chunks[chunkIndex] = chunk
      return FetchResult[seq[byte]](
        success: false,
        error: chunk.lastError,
        errorCode: 400
      )
    # Update the chunk state
    chunk.state = ChunkComplete
    chunk.hash = computedHash
    session.chunks[chunkIndex] = chunk
    session.bytesDownloaded += data.len.int64
    return FetchResult[seq[byte]](
      success: true,
      value: data,
      bytesTransferred: data.len.int64,
      duration: 0.0
    )
  except CatchableError as e:
    chunk.state = ChunkFailed
    chunk.lastError = e.msg
    session.chunks[chunkIndex] = chunk
    return FetchResult[seq[byte]](
      success: false,
      error: fmt"Chunk download failed: {e.msg}",
      errorCode: 500
    )
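
# Wire-level example: chunk 1 of a 10 MiB file with 4 MiB chunks sends
#
#   Range: bytes=4194304-8388607
#
# and expects "206 Partial Content" with exactly 4194304 body bytes; a 200
# response means the server ignored the Range header and is rejected above.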
# =============================================================================
# Resumable Download Engine
# =============================================================================
proc initializeChunks*(session: DownloadSession): Future[FetchResult[bool]] {.async.} =
  ## Initialize download chunks based on the content length
  try:
    # Get the content length if not already known
    if session.totalSize == 0:
      let sizeResult = await getContentLength(session.url)
      if not sizeResult.success:
        return FetchResult[bool](
          success: false,
          error: sizeResult.error,
          errorCode: sizeResult.errorCode
        )
      session.totalSize = sizeResult.value
    # Calculate the number of chunks
    let numChunks = int(ceil(float(session.totalSize) / float(session.chunkSize)))
    # Initialize chunks if not already done (a resumed session keeps its own)
    if session.chunks.len == 0:
      for i in 0..<numChunks:
        let startByte = i.int64 * session.chunkSize
        let endByte = min(startByte + session.chunkSize - 1, session.totalSize - 1)
        let size = endByte - startByte + 1
        session.chunks.add(DownloadChunk(
          index: i,
          startByte: startByte,
          endByte: endByte,
          size: size,
          hash: "",  # Will be computed during download
          state: ChunkPending,
          attempts: 0,
          lastError: ""
        ))
    return FetchResult[bool](
      success: true,
      value: true,
      bytesTransferred: 0,
      duration: 0.0
    )
  except CatchableError as e:
    return FetchResult[bool](
      success: false,
      error: fmt"Failed to initialize chunks: {e.msg}",
      errorCode: 500
    )
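
# Chunk layout example: a 10 MiB (10485760-byte) file with the default
# 4 MiB chunk size yields ceil(10485760 / 4194304) = 3 chunks:
#
#   chunk 0: bytes 0-4194303        (4194304 bytes)
#   chunk 1: bytes 4194304-8388607  (4194304 bytes)
#   chunk 2: bytes 8388608-10485759 (2097152 bytes)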

proc downloadWithResume*(session: DownloadSession,
                         maxConcurrent: int = 3): Future[FetchResult[string]] {.async.} =
  ## Download a file with resume capability. Chunks are currently fetched
  ## sequentially; maxConcurrent is reserved for the future concurrent engine.
  # epochTime (wall clock) rather than cpuTime: async downloads are mostly
  # I/O wait, which cpuTime would not count
  let startTime = epochTime()
  try:
    # Initialize chunks
    let initResult = await session.initializeChunks()
    if not initResult.success:
      return FetchResult[string](
        success: false,
        error: initResult.error,
        errorCode: initResult.errorCode
      )
    # Create a temporary directory for chunks
    let tempDir = session.targetPath & ".chunks"
    createDir(tempDir)
    var completedChunks = 0
    # Download chunks sequentially (simplified from the original concurrent
    # approach). TODO: implement proper concurrent downloads with polling.
    for i in 0..<session.chunks.len:
      if session.chunks[i].state == ChunkComplete:
        inc completedChunks
        continue
      # Retry each chunk up to MAX_CHUNK_ATTEMPTS within this pass
      while session.chunks[i].state != ChunkComplete and
            session.chunks[i].attempts < MAX_CHUNK_ATTEMPTS:
        let downloadResult = await session.downloadChunk(i)
        if not downloadResult.success:
          continue
        # Save the chunk data to a temp file
        let chunkFile = tempDir / fmt"chunk-{i:04d}"
        writeFile(chunkFile, downloadResult.value)
        inc completedChunks
        # Emit a progress event
        let progress = FetchProgress(
          sessionId: session.sessionId,
          totalBytes: session.totalSize,
          downloadedBytes: session.bytesDownloaded,
          currentChunk: completedChunks,
          totalChunks: session.chunks.len,
          speed: float(session.bytesDownloaded) / max(epochTime() - startTime, 0.001),
          eta: 0,
          status: fmt"Downloaded {completedChunks}/{session.chunks.len} chunks"
        )
        emitProgress(ProgressEvent(
          eventType: "chunk_completed",
          sessionId: session.sessionId,
          progress: progress,
          timestamp: now()
        ))
        # Save session state after every chunk so a crash loses little work
        session.saveSessionState()
    # All chunks must be complete before reassembly; otherwise report the
    # first failure and keep the resume file for a later retry
    for chunk in session.chunks:
      if chunk.state != ChunkComplete:
        return FetchResult[string](
          success: false,
          error: fmt"Chunk {chunk.index} failed after {chunk.attempts} attempts: {chunk.lastError}",
          errorCode: 500
        )
    # Reassemble the file from chunks
    let outputFile = open(session.targetPath, fmWrite)
    defer: outputFile.close()
    for chunk in session.chunks:
      let chunkFile = tempDir / fmt"chunk-{chunk.index:04d}"
      if not fileExists(chunkFile):
        # Silently skipping a missing chunk file would corrupt the output
        return FetchResult[string](
          success: false,
          error: fmt"Chunk file missing during reassembly: {chunkFile}",
          errorCode: 500
        )
      outputFile.write(readFile(chunkFile))
      removeFile(chunkFile)
    # Cleanup
    removeDir(tempDir)
    removeFile(session.resumeFile)
    # Final progress event
    let duration = max(epochTime() - startTime, 0.001)
    let finalProgress = FetchProgress(
      sessionId: session.sessionId,
      totalBytes: session.totalSize,
      downloadedBytes: session.totalSize,
      currentChunk: session.chunks.len,
      totalChunks: session.chunks.len,
      speed: float(session.totalSize) / duration,
      eta: 0,
      status: "Download completed"
    )
    emitProgress(ProgressEvent(
      eventType: "download_completed",
      sessionId: session.sessionId,
      progress: finalProgress,
      timestamp: now()
    ))
    return FetchResult[string](
      success: true,
      value: session.targetPath,
      bytesTransferred: session.totalSize,
      duration: duration
    )
  except CatchableError as e:
    return FetchResult[string](
      success: false,
      error: fmt"Download failed: {e.msg}",
      errorCode: 500
    )
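
# Resume behavior: interrupting a download leaves target.npk.nip-resume and a
# target.npk.chunks/ directory behind; calling downloadWithResume again on a
# session restored via loadSessionState skips every chunk already marked
# "complete" and only fetches the rest.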
# =============================================================================
# High-Level API
# =============================================================================
proc fetchWithResume*(url: string, targetPath: string, casManager: CasManager,
                      chunkSize: int64 = DEFAULT_CHUNK_SIZE,
                      maxConcurrent: int = 3): Future[FetchResult[string]] {.async.} =
  ## High-level API for resumable downloads
  try:
    # Check for an existing resume session; fall back to a fresh one
    let resumeFile = targetPath & RESUME_FILE_SUFFIX
    var session = loadSessionState(resumeFile, casManager).get(
      newDownloadSession(url, targetPath, casManager, chunkSize)
    )
    # Perform the download with resume capability
    return await session.downloadWithResume(maxConcurrent)
  except CatchableError as e:
    return FetchResult[string](
      success: false,
      error: fmt"Fetch failed: {e.msg}",
      errorCode: 500
    )

proc fetchBinaryPackage*(packageName: string, version: string, url: string,
                         casManager: CasManager): Future[FetchResult[string]] {.async.} =
  ## Specialized function for binary package downloads
  let targetPath = getTempDir() / fmt"{packageName}-{version}.npk"
  # Use larger chunks for binary packages (8MB)
  let fetchRes = await fetchWithResume(url, targetPath, casManager, 8 * 1024 * 1024, 4)
  if fetchRes.success:
    # Store in the CAS for deduplication
    let packageData = readFile(fetchRes.value)
    let storeResult = casManager.storeObject(packageData.toOpenArrayByte(0, packageData.len - 1))
    if storeResult.isOk:
      # Remove the temporary file and return the CAS hash
      removeFile(fetchRes.value)
      return FetchResult[string](
        success: true,
        value: storeResult.okValue.hash,
        bytesTransferred: fetchRes.bytesTransferred,
        duration: fetchRes.duration
      )
    # Store failed
    return FetchResult[string](
      success: false,
      error: "Failed to store package in CAS: " & storeResult.errValue.msg,
      errorCode: 500
    )
  # Fetch failed
  return fetchRes
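
# Usage sketch (URL and CAS paths are illustrative):
#
#   let cas = newCasManager(expandTilde("~/.nip/cas"), "/var/lib/nip/cas")
#   let res = waitFor fetchBinaryPackage("ripgrep", "14.1.0",
#     "https://example.com/ripgrep-14.1.0.npk", cas)
#   if res.success: echo "stored as ", res.value  # CAS hash, not a file path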
# =============================================================================
# CLI Integration
# =============================================================================
proc nipFetchCommand*(url: string, output: string = "", resume: bool = true,
                      chunks: int = 3): Future[FetchResult[string]] {.async.} =
  ## CLI command for resumable downloads
  let targetPath = if output.len > 0: output else: extractFilename(url)
  # expandTilde: "~" is not expanded automatically by the OS APIs
  let casManager = newCasManager(expandTilde("~/.nip/cas"), "/var/lib/nip/cas")
  if resume:
    return await fetchWithResume(url, targetPath, casManager, DEFAULT_CHUNK_SIZE, chunks)
  else:
    # TODO: Implement a non-resumable download for comparison; for now the
    # resumable path is used either way
    return await fetchWithResume(url, targetPath, casManager, DEFAULT_CHUNK_SIZE, chunks)
# =============================================================================
# Export main functions
# =============================================================================
export ChunkState, DownloadChunk, DownloadSession, FetchProgress, FetchResult
export ProgressEvent, subscribeProgress, emitProgress
export newDownloadSession, saveSessionState, loadSessionState
export getContentLength, downloadChunk, initializeChunks, downloadWithResume
export fetchWithResume, fetchBinaryPackage, nipFetchCommand
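
when isMainModule:
  # Minimal smoke test (a sketch, not part of the public API): fetch the URL
  # given as the first CLI argument into the current directory, using the same
  # defaults as nipFetchCommand.
  if paramCount() >= 1:
    let res = waitFor nipFetchCommand(paramStr(1))
    if res.success:
      echo fmt"fetched {res.bytesTransferred} bytes -> {res.value} in {res.duration:.1f}s"
    else:
      stderr.writeLine fmt"fetch failed ({res.errorCode}): {res.error}"
      quit(1)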