// Lumi/plugins/lumi_ai/backend/runtime_manager.js
// 2026-06-11 06:35:43 +02:00
//
// 291 lines
// 17 KiB
// JavaScript

const fs = require("fs");
const path = require("path");
const net = require("net");
const os = require("os");
const crypto = require("crypto");
const { spawn } = require("child_process");
const { resolveData } = require("./paths");
const { getRuntimeState, saveRuntimeState } = require("./config_manager");
const { normalizeExitCode, classifyLaunchError } = require("./error_codes");
const { persistDiagnostic, getLatestDiagnostic, tail } = require("./diagnostics");
/**
 * Owns the lifecycle of the managed `llama-server` child process: locating the
 * binary, self-testing it, validating the selected model file, starting and
 * stopping the process, and sending health / inference requests to it over
 * loopback HTTP.
 */
class RuntimeManager {
  /**
   * @param {object} deps
   * @param {Function} deps.getConfig - Returns the current config object; this class reads
   *   `selected_model_id`, `threads`, `context_size` and `request_timeout_ms` from it.
   * @param {Function} deps.getModel - Maps a model id to a model record; `filename`, `size`
   *   and `sha256` are read from that record.
   * @param {*} deps.runtimeManifest - Stored on the instance; not read within this class.
   * @param {Function} [deps.onCrash] - Invoked with a message when a crash is persisted.
   * @param {Function} [deps.onDiagnostic] - Invoked with `{ kind, status, ... }` event objects.
   */
  constructor({ getConfig, getModel, runtimeManifest, onCrash, onDiagnostic }) {
    Object.assign(this, { getConfig, getModel, runtimeManifest, onCrash, onDiagnostic });
    this.child = null;
    this.port = null;
    this.startedAt = null;
    this.lastError = null;
    this.lastSelfTest = null;
  }

  /**
   * Searches the "runtime" data folder for the platform-specific server binary.
   * @returns {string|null} Path of the first match, or null when none exists.
   */
  findBinary() {
    const executableName = process.platform === "win32" ? "llama-server.exe" : "llama-server";
    return findRecursive(resolveData("runtime"), executableName);
  }

  /**
   * @returns {string|null} Path of the selected model inside the "models" data
   *   folder, or null when `getModel` returns nothing for the selected id.
   */
  modelPath() {
    const model = this.getModel(this.getConfig().selected_model_id);
    return model ? resolveData("models", model.filename) : null;
  }

  /**
   * Builds a snapshot of the manager's current state.
   * @returns {object} Status record (state, install flags, port, pid, uptime, paths, latest diagnostic).
   */
  status() {
    const binary = this.findBinary();
    const model = this.modelPath();
    return {
      state: this.child && !this.child.killed ? "running" : this.lastError ? "error" : "stopped",
      runtime_installed: Boolean(binary),
      runtime_usable: this.lastSelfTest?.success ?? null,
      model_downloaded: Boolean(model && fs.existsSync(model)),
      port: this.port,
      pid: this.child?.pid || null,
      uptime_ms: this.startedAt ? Date.now() - this.startedAt : 0,
      last_error: this.lastError,
      last_self_test: this.lastSelfTest,
      executable_path: binary,
      working_directory: binary ? path.dirname(binary) : null,
      model_path: model,
      latest_diagnostic: getLatestDiagnostic()
    };
  }

  /**
   * Verifies the installation and runs the binary with `--help` (10 s limit),
   * writing the captured output to `runtime-selftest.log`.
   * @returns {Promise<object>} `{ success: true, ... }` on success, otherwise the
   *   failure record produced by `failDiagnostic`.
   */
  async selfTest() {
    const binary = this.findBinary();
    if (!binary) {
      return this.failDiagnostic("executable_missing", "RUNTIME_MISSING", "Runtime executable was not found.", {
        remediation_steps: ["Download or reinstall the managed runtime."]
      });
    }
    const installation = this.verifyRuntimeInstallation();
    if (!installation.success) {
      return this.failDiagnostic(installation.category, "INSTALL_VALIDATION_FAILED", installation.message, installation);
    }

    const workingDirectory = path.dirname(binary);
    const result = await runCaptured(binary, ["--help"], workingDirectory, 10000);
    fs.writeFileSync(resolveData("logs", "runtime-selftest.log"), `${result.stdout}\n${result.stderr}`.trim());

    // Built lazily so the output tails are only computed on a failing path.
    const launchContext = () => ({
      executable_path: binary,
      working_directory: workingDirectory,
      command_args: ["--help"],
      stdout_tail: tail(result.stdout),
      stderr_tail: tail(result.stderr)
    });

    if (result.error) {
      const decoded = classifyLaunchError(result.error);
      return this.failDiagnostic(decoded.category, decoded.code, result.error.message, { ...decoded, ...launchContext() });
    }
    if (result.timedOut) {
      return this.failDiagnostic("self_test_timeout", "SELF_TEST_TIMEOUT", "Runtime self-test exceeded 10 seconds.", launchContext());
    }
    // A zero exit code alone is not trusted: the output must also look like server help text.
    if (result.code !== 0 || !/llama|usage|server|options/i.test(`${result.stdout}\n${result.stderr}`)) {
      const decoded = normalizeExitCode(result.code, result.signal);
      return this.failDiagnostic(decoded.category, decoded.code, "Runtime self-test failed.", { ...decoded, ...launchContext() });
    }

    this.lastSelfTest = { success: true, timestamp: new Date().toISOString(), executable_path: binary, code: result.code };
    this.lastError = null;
    this.onDiagnostic?.({ kind: "runtime_self_test", status: "success" });
    return this.lastSelfTest;
  }

  /**
   * Static checks on the extracted runtime: binary present, folder at least
   * 1 MiB, executable bit set (non-Windows), at least one DLL (Windows).
   * @returns {object} `{ success, ... }`; failures carry `category` and `message`.
   */
  verifyRuntimeInstallation() {
    const binary = this.findBinary();
    if (!binary) return { success: false, category: "executable_missing", message: "Runtime executable was not found." };

    const runtimeDir = resolveData("runtime");
    const size = folderSize(runtimeDir);
    if (size < 1024 * 1024) {
      return { success: false, category: "incomplete_extraction", message: "Extracted runtime folder is unexpectedly small.", executable_path: binary, runtime_folder_size: size };
    }

    if (process.platform !== "win32") {
      try {
        fs.accessSync(binary, fs.constants.X_OK);
      } catch {
        return { success: false, category: "permission_denied", message: "Runtime executable bit is not set.", executable_path: binary, runtime_folder_size: size };
      }
    }
    if (process.platform === "win32") {
      const dlls = findFiles(runtimeDir, (name) => name.toLowerCase().endsWith(".dll"));
      if (!dlls.length) {
        return { success: false, category: "missing_dependency", message: "No runtime DLL files were found after extraction.", executable_path: binary, runtime_folder_size: size };
      }
      return { success: true, executable_path: binary, runtime_folder_size: size, dll_count: dlls.length };
    }
    return { success: true, executable_path: binary, runtime_folder_size: size };
  }

  /**
   * Validates the selected model file: existence, byte size, "GGUF" magic and
   * SHA-256 against the model record.
   * @returns {Promise<object>} `{ success, ... }`; failures carry `category` and `message`.
   */
  async verifyModel() {
    const model = this.getModel(this.getConfig().selected_model_id);
    const file = this.modelPath();
    if (!model || !file || !fs.existsSync(file)) {
      return { success: false, category: "model_missing", message: "Selected model file is missing." };
    }
    const stat = fs.statSync(file);
    if (stat.size !== model.size) {
      return { success: false, category: "model_size_mismatch", message: `Expected ${model.size} bytes, found ${stat.size}.` };
    }

    // Read only the first four bytes; the cheap checks run before the full-file hash.
    const header = Buffer.alloc(4);
    const descriptor = fs.openSync(file, "r");
    try {
      fs.readSync(descriptor, header, 0, 4, 0);
    } finally {
      fs.closeSync(descriptor);
    }
    if (header.toString("ascii") !== "GGUF") {
      return { success: false, category: "model_invalid", message: "Selected file does not have a GGUF header." };
    }

    const sha256 = await hashFile(file);
    if (sha256 !== model.sha256) {
      return { success: false, category: "model_hash_mismatch", message: "Selected model SHA-256 does not match the manifest.", sha256 };
    }
    return { success: true, file, size: stat.size, sha256 };
  }

  /**
   * Self-tests the runtime, validates the model, spawns the server on a free
   * loopback port and waits up to 45 s for `/health` to answer.
   * @param {object} [options]
   * @param {boolean} [options.resume=false] - Only changes the recorded stop reason
   *   ("resuming"/"resumed" instead of "starting"/"started").
   * @returns {Promise<object>} `status()` once healthy, or immediately if already running.
   * @throws {Error} When the self-test, the model validation or the health wait fails.
   */
  async start({ resume = false } = {}) {
    if (this.child && !this.child.killed) return this.status();
    this.onDiagnostic?.({ kind: "runtime_start", status: "attempt" });

    const selfTest = await this.selfTest();
    if (!selfTest.success) {
      this.onDiagnostic?.({ kind: "runtime_start", status: "failed", category: selfTest.category });
      throw new Error(selfTest.message || "Runtime self-test failed.");
    }

    const modelValidation = await this.verifyModel();
    if (!modelValidation.success) {
      const diagnostic = this.failDiagnostic(modelValidation.category, "MODEL_VALIDATION_FAILED", modelValidation.message, { model_path: this.modelPath() });
      saveRuntimeState({ ...getRuntimeState(), desired_state: "stopped", last_known_state: "error", last_stop_reason: modelValidation.category, last_manual_stop: false, last_crashed: false, last_diagnostic_category: modelValidation.category });
      throw new Error(diagnostic.message);
    }

    const binary = this.findBinary();
    const model = this.modelPath();
    this.port = await freePort();
    const cfg = this.getConfig();
    const threads = Number(cfg.threads) > 0 ? Number(cfg.threads) : os.cpus().length;
    const args = ["--host", "127.0.0.1", "--port", String(this.port), "-m", model, "-c", String(cfg.context_size || 4096), "-t", String(threads)];
    const logPath = resolveData("logs", `runtime-${Date.now()}.log`);

    const log = fs.openSync(logPath, "a");
    let child;
    try {
      child = spawn(binary, args, { cwd: path.dirname(binary), stdio: ["ignore", log, log], windowsHide: true, shell: false });
    } finally {
      // Release the parent's descriptor even if spawn() throws, so it never leaks.
      fs.closeSync(log);
    }
    this.child = child;
    this.startedAt = Date.now();
    this.lastError = null;

    child.once("error", (error) => {
      // Flag the child so the "exit" handler does not report the same failure twice.
      child.__spawnFailed = true;
      const decoded = classifyLaunchError(error);
      this.failDiagnostic(decoded.category, decoded.code, error.message, { ...decoded, executable_path: binary, working_directory: path.dirname(binary), command_args: args, model_path: model });
      if (this.child === child) this.child = null;
      this.persistCrash(decoded.category, error.message, decoded.signed_exit_code);
    });
    child.once("exit", (code, signal) => {
      // Exits requested by stop() or already reported by the "error" handler are not crashes.
      const expected = child.__manualStop || child.__spawnFailed;
      if (this.child === child) this.child = null;
      if (!expected) {
        const decoded = normalizeExitCode(code, signal);
        const diagnostic = this.failDiagnostic(decoded.category, decoded.code, "Runtime exited before or after health readiness.", { ...decoded, executable_path: binary, working_directory: path.dirname(binary), command_args: args, model_path: model });
        this.persistCrash(decoded.category, diagnostic.message, decoded.signed_exit_code);
      }
    });

    saveRuntimeState({ ...getRuntimeState(), desired_state: "running", last_known_state: "starting", last_crashed: false, last_manual_stop: false, last_stop_reason: resume ? "resuming" : "starting", selected_model_id: cfg.selected_model_id });
    try {
      await waitHealth(this, 45000);
      saveRuntimeState({ ...getRuntimeState(), desired_state: "running", last_known_state: "running", last_crashed: false, last_manual_stop: false, last_stop_reason: resume ? "resumed" : "started", selected_model_id: cfg.selected_model_id });
      this.onDiagnostic?.({ kind: "runtime_start", status: "success", model_load_ms: Date.now() - this.startedAt });
      return this.status();
    } catch (error) {
      if (this.child) await this.stop({ manual: false, reason: "health_timeout" });
      const existing = getLatestDiagnostic();
      // When the process died on its own, the "exit" handler already stored a diagnostic
      // with the real exit code; keep that one instead of overwriting it with a health failure.
      const preserveProcessExit = error.category === "process_exited_before_health" && existing?.raw_exit_code != null;
      if (!preserveProcessExit) {
        this.failDiagnostic(error.category || "health_timeout", "RUNTIME_HEALTH_FAILED", error.message, { executable_path: binary, working_directory: path.dirname(binary), command_args: args, model_path: model });
        saveRuntimeState({ ...getRuntimeState(), desired_state: "stopped", last_known_state: "error", last_stop_reason: error.category || "health_timeout", last_manual_stop: false, last_crashed: false, last_diagnostic_category: error.category || "health_timeout" });
      } else {
        error.message = `${existing.code}: ${existing.message}`;
      }
      this.onDiagnostic?.({ kind: "runtime_start", status: "failed", category: error.category || "health_timeout" });
      throw error;
    }
  }

  /**
   * Records a failure: updates `lastError` (and `lastSelfTest` for self-test
   * failures), persists a diagnostic and notifies `onDiagnostic`.
   * @param {string} category
   * @param {string} code
   * @param {string} message
   * @param {object} [extra] - Additional fields merged into the persisted diagnostic.
   * @returns {object} `{ success: false, ...persistedDiagnostic }`.
   */
  failDiagnostic(category, code, message, extra = {}) {
    this.lastError = message;
    this.lastSelfTest = category.startsWith("self_test") || code === "RUNTIME_MISSING" || extra.command_args?.[0] === "--help"
      ? { success: false, category, code, message }
      : this.lastSelfTest;
    const diagnostic = persistDiagnostic({ category, code, message, ...extra });
    if (extra.command_args?.[0] === "--help" || code === "RUNTIME_MISSING" || category === "self_test_timeout") {
      saveRuntimeState({ ...getRuntimeState(), desired_state: "stopped", last_known_state: "error", last_stop_reason: "self_test_failed", last_manual_stop: false, last_crashed: false, last_diagnostic_category: category, last_exit_code: extra.signed_exit_code ?? null });
      this.onDiagnostic?.({ kind: "runtime_self_test", status: "failed", category, code });
    }
    this.onDiagnostic?.({ kind: "runtime_diagnostic", status: "failed", category, code });
    return { success: false, ...diagnostic };
  }

  /**
   * Saves a "crashed" runtime state and notifies `onCrash`.
   * @param {string} category
   * @param {string} message
   * @param {number|null|undefined} exitCode
   */
  persistCrash(category, message, exitCode) {
    saveRuntimeState({ ...getRuntimeState(), desired_state: "stopped", last_known_state: "crashed", last_crashed: true, last_stop_reason: "runtime_crash", last_manual_stop: false, last_exit_code: exitCode ?? null, last_diagnostic_category: category });
    this.onCrash?.(message);
  }

  /**
   * Terminates the child process (if any), waiting up to 10 s before escalating
   * to SIGKILL, then saves the stopped state.
   * @param {object} [options]
   * @param {boolean} [options.manual=true]
   * @param {string} [options.reason="manual_stop"] - A non-manual "bot_shutdown" of a
   *   running process keeps `desired_state` at "running".
   * @returns {Promise<object>} `status()` after the stop.
   */
  async stop({ manual = true, reason = "manual_stop" } = {}) {
    const wasRunning = Boolean(this.child && !this.child.killed);
    if (this.child) {
      const child = this.child;
      // Lets the "exit" handler tell a requested stop apart from a crash.
      child.__manualStop = true;
      child.kill();
      await waitExit(child, 10000);
      // `child.killed` only says a signal was delivered, not that the process ended,
      // so liveness is judged from exitCode/signalCode, which stay null until it exits.
      const stillAlive = child.exitCode == null && child.signalCode == null;
      if (stillAlive) child.kill("SIGKILL");
    }
    this.child = null;
    this.startedAt = null;
    const resumeAfterShutdown = !manual && reason === "bot_shutdown" && wasRunning;
    saveRuntimeState({ ...getRuntimeState(), desired_state: resumeAfterShutdown ? "running" : "stopped", last_known_state: "stopped", last_stop_reason: reason, last_manual_stop: manual, last_crashed: false });
    return this.status();
  }

  /**
   * Stops the runtime with reason "restart" and starts it again.
   * @returns {Promise<object>} The result of `start()`.
   */
  async restart() {
    await this.stop({ manual: false, reason: "restart" });
    return this.start();
  }

  /**
   * Probes the server's `/health` endpoint (2 s limit).
   * @returns {Promise<object>} `status()` plus `healthy` and, when probed, `health_status`.
   */
  async health() {
    const status = this.status();
    if (status.state !== "running") return { ...status, healthy: false };
    try {
      const response = await fetch(`http://127.0.0.1:${this.port}/health`, { signal: AbortSignal.timeout(2000) });
      if (!response.ok) return { ...status, healthy: false, health_status: "http_error", health_http_status: response.status };
      try {
        const body = await response.json();
        return { ...status, healthy: true, health_status: "ready", health_response: body };
      } catch {
        return { ...status, healthy: false, health_status: "invalid_json" };
      }
    } catch (error) {
      return { ...status, healthy: false, health_status: error.name === "TimeoutError" ? "connection_timeout" : "connection_refused" };
    }
  }

  /**
   * Sends a chat-completion request to the running server.
   * @param {Array<object>} messages - Forwarded as the request's `messages` field.
   * @param {number} [maxTokens=300]
   * @returns {Promise<object>} Parsed JSON response body.
   * @throws {Error} "Runtime is offline." when no process is being managed, or
   *   "Inference failed (<status>)" on a non-2xx response.
   */
  async infer(messages, maxTokens = 300) {
    // `port` keeps its last value after a stop or crash, so the child handle must be checked too.
    if (!this.port || !this.child) throw new Error("Runtime is offline.");
    const response = await fetch(`http://127.0.0.1:${this.port}/v1/chat/completions`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ model: "local", messages, max_tokens: maxTokens, temperature: 0.2 }),
      signal: AbortSignal.timeout(this.getConfig().request_timeout_ms || 120000)
    });
    if (!response.ok) throw new Error(`Inference failed (${response.status})`);
    return response.json();
  }
}
/**
 * Depth-first search of `dir` for a regular file called `name`.
 * Entries are visited in the order `readdirSync` returns them.
 * @param {string} dir - Directory to search; may not exist.
 * @param {string} name - Exact file name to look for.
 * @returns {string|null} Path of the first match, or null.
 */
function findRecursive(dir, name) {
  if (!fs.existsSync(dir)) return null;
  const entries = fs.readdirSync(dir, { withFileTypes: true });
  for (const entry of entries) {
    const candidate = path.join(dir, entry.name);
    if (entry.isFile()) {
      if (entry.name === name) return candidate;
      continue;
    }
    if (!entry.isDirectory()) continue;
    const nested = findRecursive(candidate, name);
    if (nested) return nested;
  }
  return null;
}
/**
 * Asks the OS for an unused loopback TCP port by briefly listening on port 0.
 * @returns {Promise<number>} The port number, resolved after the probe server has closed.
 */
function freePort() {
  return new Promise((resolve, reject) => {
    const probe = net.createServer();
    const onListening = () => {
      const { port } = probe.address();
      probe.close(() => resolve(port));
    };
    probe.listen(0, "127.0.0.1", onListening);
    probe.on("error", reject);
  });
}
/**
 * Runs an executable without a shell, capturing the tail of its stdout/stderr.
 * The promise never rejects for launch failures; they are reported in the result.
 * @param {string} executable - Program to spawn.
 * @param {string[]} args - Arguments passed to the program.
 * @param {string} cwd - Working directory for the child.
 * @param {number} timeoutMs - After this long the child is sent the default kill signal.
 * @returns {Promise<object>} `{ stdout, stderr, timedOut }` plus either `{ error }`
 *   (launch failure) or `{ code, signal }` (process ended).
 */
function runCaptured(executable, args, cwd, timeoutMs) {
  return new Promise((resolve) => {
    const child = spawn(executable, args, { cwd, windowsHide: true, shell: false });
    let stdout = "";
    let stderr = "";
    let settled = false;
    let timedOut = false;
    let timer = null;
    const finish = (result) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      resolve({ stdout, stderr, timedOut, ...result });
    };
    // Decode through the stream so a multi-byte character split across chunks stays intact.
    child.stdout.setEncoding("utf8");
    child.stderr.setEncoding("utf8");
    child.stdout.on("data", (chunk) => { stdout = tail(stdout + chunk, 12000); });
    child.stderr.on("data", (chunk) => { stderr = tail(stderr + chunk, 12000); });
    child.once("error", (error) => finish({ error }));
    // "exit" can fire while the stdio pipes still hold unread data; "close" fires only
    // after they are closed, so the captured output is complete.
    child.once("close", (code, signal) => finish({ code, signal }));
    // After a timeout the output no longer matters, so do not wait for the pipes to close.
    child.once("exit", (code, signal) => { if (timedOut) finish({ code, signal }); });
    timer = setTimeout(() => { timedOut = true; child.kill(); }, timeoutMs);
  });
}
/**
 * Polls the runtime's `/health` endpoint every 500 ms until it returns an OK
 * response with a JSON body, the process goes away, or `timeout` elapses.
 * Each probe is aborted after 2 s.
 * @param {{ child: *, port: number }} manager - Object whose `child` becomes falsy when the process exits.
 * @param {number} timeout - Overall limit in milliseconds.
 * @returns {Promise<void>} Resolves once the endpoint is ready.
 * @throws {Error} With `category` "process_exited_before_health" when `manager.child`
 *   is gone, otherwise with the last probe failure category ("connection_refused" is
 *   reported as "model_load_timeout").
 */
async function waitHealth(manager, timeout) {
  const deadline = Date.now() + timeout;
  let lastCategory = "connection_refused";
  while (Date.now() < deadline) {
    if (!manager.child) {
      throw Object.assign(new Error("Runtime process exited before health became ready."), { category: "process_exited_before_health" });
    }
    try {
      const response = await fetch(`http://127.0.0.1:${manager.port}/health`, { signal: AbortSignal.timeout(2000) });
      if (!response.ok) {
        lastCategory = "http_error";
      } else {
        try {
          await response.json();
          return;
        } catch {
          lastCategory = "invalid_json";
        }
      }
    } catch (error) {
      lastCategory = error.name === "TimeoutError" ? "connection_timeout" : "connection_refused";
    }
    await new Promise((resolve) => setTimeout(resolve, 500));
  }
  // Report the limit that was actually applied rather than a fixed "45 seconds".
  const seconds = timeout / 1000;
  throw Object.assign(
    new Error(`Runtime process remained alive but health did not become ready within ${seconds} seconds (${lastCategory}).`),
    { category: lastCategory === "connection_refused" ? "model_load_timeout" : lastCategory }
  );
}
/**
 * Waits for a child process to exit, giving up after `timeout` milliseconds.
 * @param {object} child - Child process (reads `exitCode` / `signalCode`, listens for "exit").
 * @param {number} timeout - Maximum wait in milliseconds.
 * @returns {Promise<void>} Resolves on exit or on timeout, whichever comes first; never rejects.
 */
function waitExit(child, timeout) {
  return new Promise((resolve) => {
    // A process ended by a signal has a null exitCode and a set signalCode, so both are checked.
    if (child.exitCode != null || child.signalCode != null) {
      resolve();
      return;
    }
    let timer = null;
    const onExit = () => {
      clearTimeout(timer);
      resolve();
    };
    timer = setTimeout(() => {
      // Do not leave a stale listener behind when the wait is abandoned.
      child.removeListener("exit", onExit);
      resolve();
    }, timeout);
    child.once("exit", onExit);
  });
}
/**
 * Streams a file through SHA-256.
 * @param {string} file - Path of the file to hash.
 * @returns {Promise<string>} Lowercase hex digest; rejects if the file cannot be read.
 */
async function hashFile(file) {
  const digest = crypto.createHash("sha256");
  const stream = fs.createReadStream(file);
  await new Promise((resolve, reject) => {
    stream.on("data", (chunk) => { digest.update(chunk); });
    stream.once("error", reject);
    stream.once("end", resolve);
  });
  return digest.digest("hex");
}
/**
 * Sums the sizes of all regular files beneath `dir`, recursing into subdirectories.
 * Entries that are neither files nor directories contribute nothing.
 * @param {string} dir - Directory to measure; may not exist.
 * @returns {number} Total size in bytes (0 when `dir` is missing).
 */
function folderSize(dir) {
  if (!fs.existsSync(dir)) return 0;
  let bytes = 0;
  for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
    const target = path.join(dir, entry.name);
    if (entry.isDirectory()) {
      bytes += folderSize(target);
    } else if (entry.isFile()) {
      bytes += fs.statSync(target).size;
    }
  }
  return bytes;
}
/**
 * Collects every regular file beneath `dir` whose name satisfies `predicate`.
 * @param {string} dir - Directory to walk; may not exist.
 * @param {(name: string) => boolean} predicate - Receives the bare file name.
 * @returns {string[]} Matching paths in traversal order (empty when `dir` is missing).
 */
function findFiles(dir, predicate) {
  if (!fs.existsSync(dir)) return [];
  const matches = [];
  for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
    const target = path.join(dir, entry.name);
    if (entry.isDirectory()) {
      for (const nested of findFiles(target, predicate)) matches.push(nested);
    } else if (entry.isFile() && predicate(entry.name)) {
      matches.push(target);
    }
  }
  return matches;
}
// Exports the manager class and the captured-process helper; the other functions in this file stay private.
module.exports = { RuntimeManager, runCaptured };