const fs = require("fs"); const crypto = require("crypto"); const path = require("path"); const { spawn } = require("child_process"); const AdmZip = require("adm-zip"); const { resolveData } = require("./paths"); class DownloadManager { constructor(onEvent) { this.jobs = new Map(); this.onEvent = onEvent; } status(id) { return this.jobs.get(id) || null; } start({ id, url, filename, sha256, kind, archive = false, size = 0, runtimeMetadata = null, dependencies = [], beforeInstall = null, afterInstall = null }) { if (this.jobs.get(id) && !["complete", "error"].includes(this.jobs.get(id).state)) { throw new Error("Download already running."); } const dependencyList = normalizeDependencies(dependencies); const requiredSize = Number(size || 0) + dependencyList.reduce((sum, item) => sum + Number(item.size || 0), 0); if (requiredSize && freeDiskBytes() < requiredSize * 1.2) { throw new Error("not enough disk space"); } const job = { id, state: "queued", downloaded: 0, total: requiredSize || 0, error: null, started_at: Date.now(), current_file: null, files: [ { filename, size: Number(size || 0), role: "primary" }, ...dependencyList.map((dependency) => ({ filename: dependency.filename, size: Number(dependency.size || 0), role: "dependency" })) ] }; this.jobs.set(id, job); this.download({ job, url, filename, sha256, kind, archive, size, runtimeMetadata, dependencies: dependencyList, beforeInstall, afterInstall }).catch((error) => { const classified = classifyError(error); job.state = "error"; job.error = classified.message; job.error_category = classified.category; this.onEvent?.({ kind: "download", status: "failed", download_id: id, error: job.error, category: classified.category }); }); return job; } async download({ job, url, filename, sha256, kind, archive, size = 0, runtimeMetadata, dependencies = [], beforeInstall = null, afterInstall = null }) { job.state = "downloading"; const finalDir = resolveData(kind === "model" ? "models" : "runtime"); const primaryTmp = await this.downloadOne({ job, url, filename, sha256, expectedSize: size }); if (archive) { job.state = "extracting"; const staging = resolveData("tmp", `runtime-extract-${Date.now()}`); fs.mkdirSync(staging, { recursive: true }); try { await extractArchive(primaryTmp, staging, filename); for (const dependency of dependencies) { job.state = "downloading_dependency"; const dependencyTmp = await this.downloadOne({ job, url: dependency.url, filename: dependency.filename, sha256: dependency.sha256, expectedSize: dependency.size }); job.state = "extracting_dependency"; await extractArchive(dependencyTmp, staging, dependency.filename); fs.unlinkSync(dependencyTmp); } await makeRuntimeExecutable(staging); const executable = findRuntimeExecutable(staging); if (!executable) throw new Error("runtime executable missing after extraction"); let installCallbackStarted = false; try { if (typeof beforeInstall === "function") { installCallbackStarted = true; job.state = "preparing_install"; await beforeInstall({ job, finalDir, staging, runtimeMetadata }); } job.state = "installing"; await replaceDirectoryContents(staging, finalDir); fs.writeFileSync(path.join(finalDir, "lumi-runtime.json"), `${JSON.stringify({ backend: runtimeMetadata?.backend || "cpu", backend_variant: runtimeMetadata?.backend_variant || null, version: runtimeMetadata?.version || null, target: runtimeMetadata?.target || filename, dependencies: runtimeMetadata?.dependencies || dependencies.map((dependency) => ({ filename: dependency.filename, sha256: dependency.sha256, size: dependency.size || 0 })), installed_at: new Date().toISOString() }, null, 2)}\n`); } finally { if (installCallbackStarted && typeof afterInstall === "function") { await afterInstall({ job, finalDir, staging, runtimeMetadata }); } } fs.unlinkSync(primaryTmp); job.executable = findRuntimeExecutable(finalDir); } finally { fs.rmSync(primaryTmp, { force: true }); fs.rmSync(staging, { recursive: true, force: true }); } } else { const final = path.join(finalDir, filename); if (fs.existsSync(final)) fs.unlinkSync(final); fs.renameSync(primaryTmp, final); } job.state = "complete"; job.finished_at = Date.now(); job.sha256 = sha256 ? String(sha256).toLowerCase() : null; this.onEvent?.({ kind: "download", status: "success", download_id: job.id, sha256: job.sha256, duration_ms: job.finished_at - job.started_at }); } async downloadOne({ job, url, filename, sha256, expectedSize = 0 }) { const tmp = resolveData("tmp", `${filename}.part`); let existing = fs.existsSync(tmp) ? fs.statSync(tmp).size : 0; const normalizedExpected = Number(expectedSize || 0); if (existing && normalizedExpected && existing > normalizedExpected) { fs.unlinkSync(tmp); existing = 0; } if (existing && normalizedExpected && existing === normalizedExpected) { job.current_file = filename; job.state = "verifying"; const actual = await hashFile(tmp); if (actual === String(sha256 || "").toLowerCase()) { job.downloaded += existing; return tmp; } fs.unlinkSync(tmp); existing = 0; } const headers = existing ? { Range: `bytes=${existing}-` } : {}; const response = await fetch(url, { headers }); if (response.status === 416 && existing) { job.state = "verifying"; const actual = await hashFile(tmp); if (actual === String(sha256 || "").toLowerCase()) { job.downloaded += existing; return tmp; } fs.unlinkSync(tmp); throw new Error("hash mismatch"); } if (!response.ok && response.status !== 206) { throw new Error(`source unavailable (${response.status})`); } const resumed = existing > 0 && response.status === 206; const contentLength = Number(response.headers.get("content-length") || 0); const fileTotal = contentLength + (resumed ? existing : 0); if (!job.total && fileTotal) job.total = fileTotal; job.current_file = filename; const stream = fs.createWriteStream(tmp, { flags: resumed ? "a" : "w" }); if (resumed) job.downloaded += existing; for await (const chunk of response.body) { if (!stream.write(chunk)) await new Promise((resolve) => stream.once("drain", resolve)); job.downloaded += chunk.length; } await new Promise((resolve, reject) => stream.end((error) => error ? reject(error) : resolve())); job.state = "verifying"; const actual = await hashFile(tmp); if (actual !== String(sha256 || "").toLowerCase()) { fs.unlinkSync(tmp); throw new Error("hash mismatch"); } return tmp; } } function normalizeDependencies(dependencies) { if (!Array.isArray(dependencies)) return []; return dependencies.filter(Boolean).map((dependency) => { if (!dependency.url || !dependency.filename || !dependency.sha256) { throw new Error(`Runtime dependency is incomplete: ${dependency.filename || "unknown"}`); } return { ...dependency, size: Number(dependency.size || 0) }; }); } async function replaceDirectoryContents(staging, finalDir) { fs.mkdirSync(finalDir, { recursive: true }); const backup = resolveData("tmp", `runtime-replace-backup-${Date.now()}`); fs.mkdirSync(backup, { recursive: true }); const movedOld = []; const installedNew = []; try { for (const entry of fs.readdirSync(finalDir)) { const from = path.join(finalDir, entry); const to = path.join(backup, entry); await retryFileOperation(() => fs.renameSync(from, to)); movedOld.push({ from, to }); } for (const entry of fs.readdirSync(staging)) { const from = path.join(staging, entry); const to = path.join(finalDir, entry); await retryFileOperation(() => fs.renameSync(from, to)); installedNew.push(to); } } catch (error) { for (const target of installedNew.reverse()) { try { fs.rmSync(target, { recursive: true, force: true, maxRetries: 5, retryDelay: 200 }); } catch {} } for (const entry of movedOld.reverse()) { try { if (!fs.existsSync(entry.from) && fs.existsSync(entry.to)) fs.renameSync(entry.to, entry.from); } catch {} } throw error; } finally { try { fs.rmSync(backup, { recursive: true, force: true, maxRetries: 5, retryDelay: 200 }); } catch {} } } async function retryFileOperation(operation, attempts = 8, delayMs = 250) { let lastError = null; for (let attempt = 0; attempt < attempts; attempt += 1) { try { return operation(); } catch (error) { lastError = error; if (!isRetriableFileError(error) || attempt === attempts - 1) throw error; await new Promise((resolve) => setTimeout(resolve, delayMs)); } } throw lastError; } function isRetriableFileError(error) { return ["EBUSY", "EPERM", "EACCES", "ENOTEMPTY"].includes(error?.code); } async function makeRuntimeExecutable(dir){ if(process.platform==="win32")return; for(const entry of fs.readdirSync(dir,{withFileTypes:true})){ const target=path.join(dir,entry.name); if(entry.isDirectory())await makeRuntimeExecutable(target); else if(entry.name==="llama-server")fs.chmodSync(target,0o755); } } function findRuntimeExecutable(dir){ const name=process.platform==="win32"?"llama-server.exe":"llama-server"; for(const entry of fs.readdirSync(dir,{withFileTypes:true})){ const target=path.join(dir,entry.name); if(entry.isFile()&&entry.name===name)return target; if(entry.isDirectory()){const found=findRuntimeExecutable(target);if(found)return found;} } return null; } async function hashFile(file){const hash=crypto.createHash("sha256");for await(const chunk of fs.createReadStream(file))hash.update(chunk);return hash.digest("hex");} async function extractArchive(file,dest,name){ if(name.endsWith(".zip")){ const zip=new AdmZip(file); for(const entry of zip.getEntries())validateArchivePath(entry.entryName); zip.extractAllTo(dest,true); return; } const entries=await capture("tar",["-tzf",file]); if(entries.code!==0)throw new Error(`archive corrupt (${entries.code})`); for(const entry of entries.stdout.split(/\r?\n/).filter(Boolean))validateArchivePath(entry); await new Promise((resolve,reject)=>{const child=spawn("tar",["-xzf",file,"-C",dest],{windowsHide:true,shell:false});child.on("exit",c=>c===0?resolve():reject(new Error(`archive extraction failed (${c})`)));child.on("error",reject);}); } function validateArchivePath(entry){ const normalized=path.posix.normalize(String(entry).replace(/\\/g,"/")); if(path.posix.isAbsolute(normalized)||normalized===".."||normalized.startsWith("../"))throw new Error("archive path traversal"); } function capture(command,args){return new Promise((resolve,reject)=>{const child=spawn(command,args,{windowsHide:true,shell:false});let stdout="",stderr="";child.stdout.on("data",c=>stdout+=c);child.stderr.on("data",c=>stderr+=c);child.on("error",reject);child.on("exit",code=>resolve({code,stdout,stderr}));});} function classifyError(error){ const message=`${error?.code || ""} ${error?.message || String(error)}`.trim(); if(/ENOSPC|not enough disk/i.test(message))return{category:"disk_full",message:"Not enough disk space."}; if(/EACCES|EPERM|EBUSY|ENOTEMPTY|resource busy|permission denied/i.test(message))return{category:"permission_denied",message:"Runtime files are locked or permission was denied. Stop Lumi AI runtimes and retry."}; if(/hash mismatch/i.test(message))return{category:"hash_mismatch",message:"Downloaded file failed SHA-256 verification."}; if(/archive path traversal/i.test(message))return{category:"archive_path_traversal",message:"Archive contains an unsafe path."}; if(/archive corrupt|extraction failed/i.test(message))return{category:"archive_corrupt",message}; if(/\(404\)/.test(message))return{category:"http_404",message:"Download source was not found (404)."}; if(/\(403\)/.test(message))return{category:"http_403",message:"Download source denied access (403)."}; if(/\(429\)/.test(message))return{category:"http_429",message:"Download source rate limit reached (429)."}; if(/\(5\d\d\)/.test(message))return{category:"server_error",message}; if(/timeout|abort/i.test(message))return{category:"timeout",message:"Download timed out."}; if(/fetch|network|ENOTFOUND|EAI_AGAIN/i.test(message))return{category:"network_unavailable",message}; if(/runtime executable missing/i.test(message))return{category:"install_validation_failed",message}; return{category:"download_failed",message}; } function freeDiskBytes(){try{const stat=fs.statfsSync(resolveData("tmp"));return Number(stat.bavail)*Number(stat.bsize);}catch{return Number.MAX_SAFE_INTEGER;}} module.exports={DownloadManager,hashFile,validateArchivePath,classifyError};