Lumi/plugins/lumi_ai/backend/downloader.js
2026-06-25 14:10:04 +02:00

374 lines
14 KiB
JavaScript

const fs = require("fs");
const crypto = require("crypto");
const path = require("path");
const { spawn } = require("child_process");
const AdmZip = require("adm-zip");
const { resolveData } = require("./paths");
class DownloadManager {
constructor(onEvent) {
this.jobs = new Map();
this.onEvent = onEvent;
}
status(id) {
return this.jobs.get(id) || null;
}
start({
id,
url,
filename,
sha256,
kind,
archive = false,
size = 0,
runtimeMetadata = null,
dependencies = [],
beforeInstall = null,
afterInstall = null
}) {
if (this.jobs.get(id) && !["complete", "error"].includes(this.jobs.get(id).state)) {
throw new Error("Download already running.");
}
const dependencyList = normalizeDependencies(dependencies);
const requiredSize = Number(size || 0) + dependencyList.reduce((sum, item) => sum + Number(item.size || 0), 0);
if (requiredSize && freeDiskBytes() < requiredSize * 1.2) {
throw new Error("not enough disk space");
}
const job = {
id,
state: "queued",
downloaded: 0,
total: requiredSize || 0,
error: null,
started_at: Date.now(),
current_file: null,
files: [
{ filename, size: Number(size || 0), role: "primary" },
...dependencyList.map((dependency) => ({
filename: dependency.filename,
size: Number(dependency.size || 0),
role: "dependency"
}))
]
};
this.jobs.set(id, job);
this.download({
job,
url,
filename,
sha256,
kind,
archive,
size,
runtimeMetadata,
dependencies: dependencyList,
beforeInstall,
afterInstall
}).catch((error) => {
const classified = classifyError(error);
job.state = "error";
job.error = classified.message;
job.error_category = classified.category;
this.onEvent?.({
kind: "download",
status: "failed",
download_id: id,
error: job.error,
category: classified.category
});
});
return job;
}
async download({
job,
url,
filename,
sha256,
kind,
archive,
size = 0,
runtimeMetadata,
dependencies = [],
beforeInstall = null,
afterInstall = null
}) {
job.state = "downloading";
const finalDir = resolveData(kind === "model" ? "models" : "runtime");
const primaryTmp = await this.downloadOne({ job, url, filename, sha256, expectedSize: size });
if (archive) {
job.state = "extracting";
const staging = resolveData("tmp", `runtime-extract-${Date.now()}`);
fs.mkdirSync(staging, { recursive: true });
try {
await extractArchive(primaryTmp, staging, filename);
for (const dependency of dependencies) {
job.state = "downloading_dependency";
const dependencyTmp = await this.downloadOne({
job,
url: dependency.url,
filename: dependency.filename,
sha256: dependency.sha256,
expectedSize: dependency.size
});
job.state = "extracting_dependency";
await extractArchive(dependencyTmp, staging, dependency.filename);
fs.unlinkSync(dependencyTmp);
}
await makeRuntimeExecutable(staging);
const executable = findRuntimeExecutable(staging);
if (!executable) throw new Error("runtime executable missing after extraction");
let installCallbackStarted = false;
try {
if (typeof beforeInstall === "function") {
installCallbackStarted = true;
job.state = "preparing_install";
await beforeInstall({ job, finalDir, staging, runtimeMetadata });
}
job.state = "installing";
await replaceDirectoryContents(staging, finalDir);
fs.writeFileSync(path.join(finalDir, "lumi-runtime.json"), `${JSON.stringify({
backend: runtimeMetadata?.backend || "cpu",
backend_variant: runtimeMetadata?.backend_variant || null,
version: runtimeMetadata?.version || null,
target: runtimeMetadata?.target || filename,
dependencies: runtimeMetadata?.dependencies || dependencies.map((dependency) => ({
filename: dependency.filename,
sha256: dependency.sha256,
size: dependency.size || 0
})),
installed_at: new Date().toISOString()
}, null, 2)}\n`);
} finally {
if (installCallbackStarted && typeof afterInstall === "function") {
await afterInstall({ job, finalDir, staging, runtimeMetadata });
}
}
fs.unlinkSync(primaryTmp);
job.executable = findRuntimeExecutable(finalDir);
} finally {
fs.rmSync(primaryTmp, { force: true });
fs.rmSync(staging, { recursive: true, force: true });
}
} else {
const final = path.join(finalDir, filename);
if (fs.existsSync(final)) fs.unlinkSync(final);
fs.renameSync(primaryTmp, final);
}
job.state = "complete";
job.finished_at = Date.now();
job.sha256 = sha256 ? String(sha256).toLowerCase() : null;
this.onEvent?.({
kind: "download",
status: "success",
download_id: job.id,
sha256: job.sha256,
duration_ms: job.finished_at - job.started_at
});
}
async downloadOne({ job, url, filename, sha256, expectedSize = 0 }) {
const tmp = resolveData("tmp", `${filename}.part`);
let existing = fs.existsSync(tmp) ? fs.statSync(tmp).size : 0;
const normalizedExpected = Number(expectedSize || 0);
if (existing && normalizedExpected && existing > normalizedExpected) {
fs.unlinkSync(tmp);
existing = 0;
}
if (existing && normalizedExpected && existing === normalizedExpected) {
job.current_file = filename;
job.state = "verifying";
const actual = await hashFile(tmp);
if (actual === String(sha256 || "").toLowerCase()) {
job.downloaded += existing;
return tmp;
}
fs.unlinkSync(tmp);
existing = 0;
}
const headers = existing ? { Range: `bytes=${existing}-` } : {};
const response = await fetch(url, { headers });
if (response.status === 416 && existing) {
job.state = "verifying";
const actual = await hashFile(tmp);
if (actual === String(sha256 || "").toLowerCase()) {
job.downloaded += existing;
return tmp;
}
fs.unlinkSync(tmp);
throw new Error("hash mismatch");
}
if (!response.ok && response.status !== 206) {
throw new Error(`source unavailable (${response.status})`);
}
const resumed = existing > 0 && response.status === 206;
const contentLength = Number(response.headers.get("content-length") || 0);
const fileTotal = contentLength + (resumed ? existing : 0);
if (!job.total && fileTotal) job.total = fileTotal;
job.current_file = filename;
const stream = fs.createWriteStream(tmp, { flags: resumed ? "a" : "w" });
if (resumed) job.downloaded += existing;
for await (const chunk of response.body) {
if (!stream.write(chunk)) await new Promise((resolve) => stream.once("drain", resolve));
job.downloaded += chunk.length;
}
await new Promise((resolve, reject) => stream.end((error) => error ? reject(error) : resolve()));
job.state = "verifying";
const actual = await hashFile(tmp);
if (actual !== String(sha256 || "").toLowerCase()) {
fs.unlinkSync(tmp);
throw new Error("hash mismatch");
}
return tmp;
}
}
function normalizeDependencies(dependencies) {
if (!Array.isArray(dependencies)) return [];
return dependencies.filter(Boolean).map((dependency) => {
if (!dependency.url || !dependency.filename || !dependency.sha256) {
throw new Error(`Runtime dependency is incomplete: ${dependency.filename || "unknown"}`);
}
return {
...dependency,
size: Number(dependency.size || 0)
};
});
}
async function replaceDirectoryContents(staging, finalDir) {
fs.mkdirSync(finalDir, { recursive: true });
const backup = resolveData("tmp", `runtime-replace-backup-${Date.now()}`);
fs.mkdirSync(backup, { recursive: true });
const movedOld = [];
const installedNew = [];
try {
for (const entry of fs.readdirSync(finalDir)) {
const from = path.join(finalDir, entry);
const to = path.join(backup, entry);
await retryFileOperation(() => fs.renameSync(from, to));
movedOld.push({ from, to });
}
for (const entry of fs.readdirSync(staging)) {
const from = path.join(staging, entry);
const to = path.join(finalDir, entry);
await retryFileOperation(() => fs.renameSync(from, to));
installedNew.push(to);
}
} catch (error) {
for (const target of installedNew.reverse()) {
try { fs.rmSync(target, { recursive: true, force: true, maxRetries: 5, retryDelay: 200 }); } catch {}
}
for (const entry of movedOld.reverse()) {
try {
if (!fs.existsSync(entry.from) && fs.existsSync(entry.to)) fs.renameSync(entry.to, entry.from);
} catch {}
}
throw error;
} finally {
try { fs.rmSync(backup, { recursive: true, force: true, maxRetries: 5, retryDelay: 200 }); } catch {}
}
}
async function retryFileOperation(operation, attempts = 8, delayMs = 250) {
let lastError = null;
for (let attempt = 0; attempt < attempts; attempt += 1) {
try {
return operation();
} catch (error) {
lastError = error;
if (!isRetriableFileError(error) || attempt === attempts - 1) throw error;
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}
throw lastError;
}
function isRetriableFileError(error) {
return ["EBUSY", "EPERM", "EACCES", "ENOTEMPTY"].includes(error?.code);
}
async function makeRuntimeExecutable(dir){
if(process.platform==="win32")return;
for(const entry of fs.readdirSync(dir,{withFileTypes:true})){
const target=path.join(dir,entry.name);
if(entry.isDirectory())await makeRuntimeExecutable(target);
else if(entry.name==="llama-server")fs.chmodSync(target,0o755);
}
}
function findRuntimeExecutable(dir){
const name=process.platform==="win32"?"llama-server.exe":"llama-server";
for(const entry of fs.readdirSync(dir,{withFileTypes:true})){
const target=path.join(dir,entry.name);
if(entry.isFile()&&entry.name===name)return target;
if(entry.isDirectory()){const found=findRuntimeExecutable(target);if(found)return found;}
}
return null;
}
async function hashFile(file){const hash=crypto.createHash("sha256");for await(const chunk of fs.createReadStream(file))hash.update(chunk);return hash.digest("hex");}
async function extractArchive(file,dest,name){
if(name.endsWith(".zip")){
const zip=new AdmZip(file);
for(const entry of zip.getEntries())validateArchivePath(entry.entryName);
zip.extractAllTo(dest,true);
return;
}
const entries=await capture("tar",["-tzf",file]);
if(entries.code!==0)throw new Error(`archive corrupt (${entries.code})`);
for(const entry of entries.stdout.split(/\r?\n/).filter(Boolean))validateArchivePath(entry);
await new Promise((resolve,reject)=>{const child=spawn("tar",["-xzf",file,"-C",dest],{windowsHide:true,shell:false});child.on("exit",c=>c===0?resolve():reject(new Error(`archive extraction failed (${c})`)));child.on("error",reject);});
}
function validateArchivePath(entry){
const normalized=path.posix.normalize(String(entry).replace(/\\/g,"/"));
if(path.posix.isAbsolute(normalized)||normalized===".."||normalized.startsWith("../"))throw new Error("archive path traversal");
}
function capture(command,args){return new Promise((resolve,reject)=>{const child=spawn(command,args,{windowsHide:true,shell:false});let stdout="",stderr="";child.stdout.on("data",c=>stdout+=c);child.stderr.on("data",c=>stderr+=c);child.on("error",reject);child.on("exit",code=>resolve({code,stdout,stderr}));});}
function classifyError(error){
const message=`${error?.code || ""} ${error?.message || String(error)}`.trim();
if(/ENOSPC|not enough disk/i.test(message))return{category:"disk_full",message:"Not enough disk space."};
if(/EACCES|EPERM|EBUSY|ENOTEMPTY|resource busy|permission denied/i.test(message))return{category:"permission_denied",message:"Runtime files are locked or permission was denied. Stop Lumi AI runtimes and retry."};
if(/hash mismatch/i.test(message))return{category:"hash_mismatch",message:"Downloaded file failed SHA-256 verification."};
if(/archive path traversal/i.test(message))return{category:"archive_path_traversal",message:"Archive contains an unsafe path."};
if(/archive corrupt|extraction failed/i.test(message))return{category:"archive_corrupt",message};
if(/\(404\)/.test(message))return{category:"http_404",message:"Download source was not found (404)."};
if(/\(403\)/.test(message))return{category:"http_403",message:"Download source denied access (403)."};
if(/\(429\)/.test(message))return{category:"http_429",message:"Download source rate limit reached (429)."};
if(/\(5\d\d\)/.test(message))return{category:"server_error",message};
if(/timeout|abort/i.test(message))return{category:"timeout",message:"Download timed out."};
if(/fetch|network|ENOTFOUND|EAI_AGAIN/i.test(message))return{category:"network_unavailable",message};
if(/runtime executable missing/i.test(message))return{category:"install_validation_failed",message};
return{category:"download_failed",message};
}
function freeDiskBytes(){try{const stat=fs.statfsSync(resolveData("tmp"));return Number(stat.bavail)*Number(stat.bsize);}catch{return Number.MAX_SAFE_INTEGER;}}
module.exports={DownloadManager,hashFile,validateArchivePath,classifyError};