49 lines
2.2 KiB
JavaScript
49 lines
2.2 KiB
JavaScript
/**
 * In-memory FIFO job queue that caps how many jobs execute at once, bounds
 * the number of waiting jobs, and applies a per-user sliding-window rate limit.
 *
 * Configuration is re-read through `getConfig()` on every `run()` and every
 * `drain()`. Fields read from the config object: `concurrency`,
 * `max_queue_length`, `per_user_requests_per_minute`, `admin_bypass_rate_limit`.
 */
class RequestQueue {
  /**
   * @param {() => object} getConfig - Returns the current configuration object.
   */
  constructor(getConfig) {
    this.getConfig = getConfig;
    this.active = 0; // number of jobs currently executing
    this.pending = []; // jobs waiting for a free slot, oldest first
    this.rate = new Map(); // `${role}:${userId}` -> timestamps (ms) of accepted requests
    this.lastRateSweep = 0; // Date.now() value at the last stale-key sweep of `rate`
  }

  /** Number of jobs waiting in the queue (jobs already executing are not counted). */
  get length() {
    return this.pending.length;
  }

  /**
   * Queue `fn` and settle with its outcome once a concurrency slot ran it.
   *
   * @param {*} userId - Caller identity; combined with `role` to form the rate-limit key.
   * @param {string} role - Caller role; `"admin"` may bypass the rate limit (see `checkRate`).
   * @param {(waitedMs: number) => *} fn - Job to execute. It receives the number of
   *   milliseconds the job spent queued; its (possibly async) result settles the
   *   returned promise.
   * @param {{ signal?: AbortSignal }} [options] - Optional abort signal. It is only
   *   observed while the job is still waiting; the listener is detached when the
   *   job starts.
   * @returns {Promise<*>} Resolves/rejects with the outcome of `fn`. Rejects with
   *   `code: "RATE_LIMIT"` when the rate limit is hit, `code: "QUEUE_FULL"` when
   *   the queue is at capacity, or an `AbortError` (`code: "REQUEST_CANCELLED"`)
   *   when cancelled before starting.
   */
  async run(userId, role, fn, { signal } = {}) {
    const cfg = this.getConfig();
    this.checkRate(userId, role, cfg);

    if (this.pending.length >= cfg.max_queue_length) {
      throw Object.assign(new Error("AI is busy right now. Try again in a moment."), {
        code: "QUEUE_FULL",
      });
    }
    if (signal?.aborted) {
      throw cancelledError();
    }

    const queuedAt = Date.now();
    return new Promise((resolve, reject) => {
      const job = { fn, resolve, reject, queuedAt, signal, onAbort: null };
      this.pending.push(job);

      if (signal) {
        // Cancelling a waiting job removes it from the queue and rejects its promise.
        job.onAbort = () => {
          const index = this.pending.indexOf(job);
          if (index >= 0) {
            this.pending.splice(index, 1);
          }
          reject(cancelledError());
        };
        signal.addEventListener("abort", job.onAbort, { once: true });
        // Defensive re-check in case the signal reports aborted after the guard above.
        if (signal.aborted) {
          job.onAbort();
          return;
        }
      }

      this.drain();
    });
  }

  /**
   * Enforce the per-user limit over a sliding 60-second window and record the
   * request when it is allowed.
   *
   * @param {*} userId - Caller identity.
   * @param {string} role - Caller role; `"admin"` skips the check entirely when
   *   `cfg.admin_bypass_rate_limit` is truthy.
   * @param {object} cfg - Configuration; reads `admin_bypass_rate_limit` and
   *   `per_user_requests_per_minute`.
   * @throws {Error} With `code: "RATE_LIMIT"` and a positive integer
   *   `retry_after_seconds` when the caller already used up the window.
   */
  checkRate(userId, role, cfg) {
    if (role === "admin" && cfg.admin_bypass_rate_limit) {
      return;
    }

    const windowMs = 60000;
    const now = Date.now();

    // Drop keys whose newest timestamp left the window so the map does not keep
    // an entry for every caller ever seen. Runs at most once per window.
    if (!(now - this.lastRateSweep < windowMs)) {
      this.lastRateSweep = now;
      for (const [staleKey, stamps] of this.rate) {
        const newest = stamps[stamps.length - 1];
        if (newest === undefined || now - newest >= windowMs) {
          this.rate.delete(staleKey);
        }
      }
    }

    const key = `${role}:${userId}`;
    const recent = (this.rate.get(key) || []).filter((t) => now - t < windowMs);

    if (recent.length >= cfg.per_user_requests_per_minute) {
      // With a limit of 0 (or less) the window is empty, so there is no oldest
      // timestamp to count down from; report a full window instead of NaN.
      const oldest = recent[0];
      const waitMs = oldest === undefined ? windowMs : windowMs - (now - oldest);
      const retryAfter = Math.max(1, Math.ceil(waitMs / 1000));
      throw Object.assign(
        new Error(`AI rate limit reached. Try again in ${retryAfter}s.`),
        { code: "RATE_LIMIT", retry_after_seconds: retryAfter }
      );
    }

    recent.push(now);
    this.rate.set(key, recent);
  }

  /**
   * Start waiting jobs, oldest first, until the concurrency limit is reached or
   * the queue is empty. The limit is `concurrency` from the current config,
   * coerced to a number and never less than 1.
   */
  drain() {
    const limit = Math.max(1, Number(this.getConfig().concurrency) || 1);

    while (this.active < limit && this.pending.length) {
      const job = this.pending.shift();

      if (job.signal?.aborted) {
        job.reject(cancelledError());
        continue;
      }
      // From here on the queue no longer reacts to the job's abort signal.
      if (job.onAbort) {
        job.signal.removeEventListener("abort", job.onAbort);
      }

      this.active++;
      // Invoke fn inside a promise callback so a synchronous throw becomes a rejection.
      Promise.resolve()
        .then(() => job.fn(Date.now() - job.queuedAt))
        .then(job.resolve, job.reject)
        .finally(() => {
          this.active--;
          this.drain();
        });
    }
  }
}
|
|
/**
 * Build the error used to reject a request that was cancelled.
 *
 * @returns {Error} A fresh Error whose `name` is "AbortError" and whose `code`
 *   is "REQUEST_CANCELLED".
 */
function cancelledError() {
  const failure = new Error("Assistant request was cancelled.");
  failure.name = "AbortError";
  failure.code = "REQUEST_CANCELLED";
  return failure;
}
|
|
// CommonJS export: the module exposes only the RequestQueue class.
module.exports = { RequestQueue };
|