Lumi/plugins/lumi_ai/backend/request_jobs.js
2026-06-12 19:27:43 +02:00

156 lines
5.3 KiB
JavaScript

const crypto = require("crypto");
/** Job states after which a job never runs or reports progress again. */
const TERMINAL_STATES = new Set(["complete", "error", "cancelled"]);
const CANCELLED_MESSAGE = "Assistant request was cancelled.";
const GENERIC_FAILURE_MESSAGE = "Lumi Assistant could not complete the request.";

/**
 * @param {{ state: string }} job
 * @returns {boolean} true once the job has completed, failed, or been cancelled.
 */
function isTerminalJob(job) {
  return TERMINAL_STATES.has(job.state);
}

/**
 * In-memory registry of background assistant requests.
 *
 * Each job runs an `execute` callback on a later event-loop turn, records the
 * progress stages that callback reports, and exposes sanitized snapshots via
 * `publicJob`. Reads, soft-timeout marks and (non-admin) cancellation are
 * scoped to the owning user id. Finished jobs are evicted `ttlMs` after their
 * last update, or oldest-first when more than `maxJobs` are stored; queued and
 * running jobs are never evicted, so the cap can be exceeded while many jobs
 * are in flight.
 */
class AssistantRequestJobs {
  /**
   * @param {object} [options]
   * @param {number} [options.ttlMs=900000] - Retention for finished jobs, measured from their last update.
   * @param {number} [options.maxJobs=200] - Soft cap on stored jobs; only finished jobs are evicted to meet it.
   */
  constructor({ ttlMs = 15 * 60 * 1000, maxJobs = 200 } = {}) {
    this.ttlMs = ttlMs;
    this.maxJobs = maxJobs;
    this.jobs = new Map();
  }

  /**
   * Registers a job and starts `execute` on a later event-loop turn.
   *
   * @param {object} params
   * @param {*} params.userId - Owner of the job; stored as `String(userId)`.
   * @param {Function} params.execute - Called as `execute(report, signal)`.
   *   `report(stage, details)` records progress (see `update`), and `signal`
   *   is aborted by `cancel()`. The resolved value becomes the job result; a
   *   rejection marks the job as "error", or as "cancelled" when the thrown
   *   value has `code === "REQUEST_CANCELLED"` or `name === "AbortError"`.
   * @param {object} [params.metadata={}] - Initial details, filtered through `sanitizeDetails`.
   * @returns {object} Public snapshot of the newly queued job.
   */
  create({ userId, execute, metadata = {} }) {
    this.prune();
    const id = crypto.randomUUID();
    const controller = new AbortController();
    const now = Date.now();
    const job = {
      id,
      user_id: String(userId),
      state: "queued",
      stage: "queued",
      created_at: now,
      updated_at: now,
      // Set once when the job reaches a terminal state; freezes elapsed_ms.
      finished_at: null,
      result: null,
      error: null,
      retry_after_seconds: null,
      details: sanitizeDetails(metadata),
      controller,
      frontend_soft_timeout_at: null
    };
    this.jobs.set(id, job);
    // Deferred so the caller receives the job id before any work starts.
    setImmediate(async () => {
      try {
        const result = await execute(
          (stage, details = {}) => this.update(id, stage, details),
          controller.signal
        );
        // cancel() already finalised this job; discard the late result.
        if (job.state === "cancelled") return;
        const finishedAt = Date.now();
        Object.assign(job, {
          state: "complete",
          stage: "done",
          result,
          updated_at: finishedAt,
          finished_at: finishedAt
        });
      } catch (error) {
        if (job.state === "cancelled") return;
        // Any value can be thrown (including null/undefined). Reading its
        // properties unguarded would reject this callback, surfacing as an
        // unhandled promise rejection and leaving the job stuck as "queued".
        const code = error?.code;
        const cancelled = code === "REQUEST_CANCELLED" || error?.name === "AbortError";
        const finishedAt = Date.now();
        Object.assign(job, {
          state: cancelled ? "cancelled" : "error",
          stage: cancelled ? "cancelled" : "error",
          error: cancelled ? CANCELLED_MESSAGE : error?.message || GENERIC_FAILURE_MESSAGE,
          retry_after_seconds: error?.retry_after_seconds || null,
          details: sanitizeDetails({ ...job.details, error_code: code }),
          updated_at: finishedAt,
          finished_at: finishedAt
        });
      }
    });
    return this.publicJob(job);
  }

  /**
   * Records a progress stage reported by a job's `execute` callback.
   *
   * @param {string} id - Job id.
   * @param {string} stage - New stage name; "queued" keeps the job queued, anything else marks it running.
   * @param {object} [details={}] - Extra details merged over the existing ones, then sanitized.
   * @returns {object|null} Updated snapshot, or null if the job is unknown or already finished.
   */
  update(id, stage, details = {}) {
    const job = this.jobs.get(id);
    if (!job || isTerminalJob(job)) return null;
    job.state = stage === "queued" ? "queued" : "running";
    job.stage = stage;
    job.updated_at = Date.now();
    job.details = sanitizeDetails({ ...job.details, ...details });
    return this.publicJob(job);
  }

  /**
   * @param {string} id - Job id.
   * @param {*} userId - Requesting user; must match the job owner.
   * @returns {object|null} Snapshot, or null if the job is unknown or owned by someone else.
   */
  get(id, userId) {
    this.prune();
    const job = this.jobs.get(id);
    if (!job || job.user_id !== String(userId)) return null;
    return this.publicJob(job);
  }

  /**
   * Cancels a queued or running job and aborts its signal. Finished jobs are
   * returned unchanged.
   *
   * @param {string} id - Job id.
   * @param {*} userId - Requesting user; must match the owner unless `admin` is set.
   * @param {{ admin?: boolean }} [options]
   * @returns {object|null} Snapshot, or null if the job is unknown or not accessible.
   */
  cancel(id, userId, { admin = false } = {}) {
    this.prune();
    const job = this.jobs.get(id);
    if (!job || (!admin && job.user_id !== String(userId))) return null;
    if (isTerminalJob(job)) return this.publicJob(job);
    const now = Date.now();
    job.state = "cancelled";
    job.stage = "cancelled";
    job.error = CANCELLED_MESSAGE;
    job.updated_at = now;
    job.finished_at = now;
    job.controller.abort();
    return this.publicJob(job);
  }

  /**
   * Records that the frontend gave up waiting synchronously for this job.
   * This may happen after the job finished; elapsed_ms stays frozen in that case.
   *
   * @param {string} id - Job id.
   * @param {*} userId - Requesting user; must match the job owner.
   * @returns {object|null} Snapshot, or null if the job is unknown or owned by someone else.
   */
  markSoftTimeout(id, userId) {
    const job = this.jobs.get(id);
    if (!job || job.user_id !== String(userId)) return null;
    const now = Date.now();
    job.frontend_soft_timeout_at = now;
    job.details = sanitizeDetails({ ...job.details, frontend_soft_timeout: true });
    job.updated_at = now;
    return this.publicJob(job);
  }

  /**
   * @param {number} [limit=25] - Maximum number of jobs to return.
   * @returns {object[]} Newest-first snapshots, each including the owner's `user_id`.
   */
  diagnostics(limit = 25) {
    this.prune();
    return [...this.jobs.values()]
      .sort((left, right) => right.created_at - left.created_at)
      .slice(0, limit)
      .map((job) => ({ ...this.publicJob(job), user_id: job.user_id }));
  }

  /**
   * Builds the client-facing view of a job (omits the owner and abort controller).
   * `result` is only exposed for completed jobs, and `error` only for failed or cancelled ones.
   *
   * @param {object} job - Internal job record.
   * @returns {object} Snapshot.
   */
  publicJob(job) {
    const finished = isTerminalJob(job);
    // Finished jobs report a frozen duration: later metadata writes (e.g. a
    // soft-timeout marker) bump updated_at but must not stretch elapsed_ms.
    const endedAt = finished ? (job.finished_at ?? job.updated_at) : Date.now();
    return {
      id: job.id,
      state: job.state,
      stage: job.stage,
      created_at: job.created_at,
      updated_at: job.updated_at,
      elapsed_ms: Math.max(0, endedAt - job.created_at),
      still_running: job.state === "queued" || job.state === "running",
      frontend_soft_timeout_at: job.frontend_soft_timeout_at,
      details: job.details || {},
      result: job.state === "complete" ? job.result : null,
      error: job.state === "error" || job.state === "cancelled" ? job.error : null,
      retry_after_seconds: job.retry_after_seconds
    };
  }

  /**
   * Evicts finished jobs older than `ttlMs` (by last update), then, if still
   * above `maxJobs`, the least recently updated finished jobs.
   *
   * @param {number} [now=Date.now()] - Reference time in epoch milliseconds.
   */
  prune(now = Date.now()) {
    for (const [id, job] of this.jobs) {
      if (isTerminalJob(job) && now - job.updated_at > this.ttlMs) this.jobs.delete(id);
    }
    const excess = this.jobs.size - this.maxJobs;
    if (excess <= 0) return;
    const oldest = [...this.jobs.values()]
      .filter(isTerminalJob)
      .sort((left, right) => left.updated_at - right.updated_at)
      .slice(0, excess);
    for (const job of oldest) this.jobs.delete(job.id);
  }
}
/** Detail keys copied as finite numbers. */
const NUMERIC_DETAIL_KEYS = Object.freeze([
  "queue_position", "gate_ms", "queue_ms", "prompt_eval_ms", "generation_ms", "total_ms",
  "prompt_tokens", "generated_tokens", "prompt_tps", "generation_tps", "gpu_layers",
  "context_size", "batch_size", "ubatch_size", "threads", "max_output_tokens",
  "max_output_tokens_used"
]);
/** Detail keys copied as strings, capped at MAX_TEXT_DETAIL_LENGTH UTF-16 code units. */
const TEXT_DETAIL_KEYS = Object.freeze(["route", "route_class", "reason_code", "backend", "error_code"]);
const MAX_TEXT_DETAIL_LENGTH = 120;
// No /g flag, so .test() is stateless and safe to reuse.
const DANGLING_HIGH_SURROGATE = /[\uD800-\uDBFF]$/;

/**
 * Builds a whitelisted, type-normalized copy of job details for clients.
 *
 * - Numeric keys are kept only when they hold a real finite number (or a
 *   numeric string). Null, undefined and blank strings are dropped rather than
 *   coerced, because `Number(null)` and `Number("")` are 0 and would fabricate
 *   a measurement.
 * - Text keys are stringified and truncated to 120 UTF-16 code units, without
 *   leaving half of a surrogate pair at the cut.
 * - `frontend_soft_timeout` is coerced to a boolean when present.
 * All other keys are discarded.
 *
 * @param {object|null|undefined} details - Raw details; nullish is treated as empty.
 * @returns {object} A new sanitized details object.
 */
function sanitizeDetails(details) {
  const source = details ?? {};
  const output = {};
  for (const key of NUMERIC_DETAIL_KEYS) {
    const raw = source[key];
    if (raw == null || (typeof raw === "string" && raw.trim() === "")) continue;
    const value = Number(raw);
    if (Number.isFinite(value)) output[key] = value;
  }
  for (const key of TEXT_DETAIL_KEYS) {
    if (source[key] == null) continue;
    let text = String(source[key]);
    if (text.length > MAX_TEXT_DETAIL_LENGTH) {
      text = text.slice(0, MAX_TEXT_DETAIL_LENGTH);
      if (DANGLING_HIGH_SURROGATE.test(text)) text = text.slice(0, -1);
    }
    output[key] = text;
  }
  if (source.frontend_soft_timeout != null) output.frontend_soft_timeout = Boolean(source.frontend_soft_timeout);
  return output;
}
// The job registry class is the only public surface of this module.
const requestJobsApi = { AssistantRequestJobs };
module.exports = requestJobsApi;