feat(mcpd): VirtualLlmService rewires through durable queue (v5 Stage 2)
The in-memory `tasksById` map for inference tasks is gone. Every
inference call lands as a row in `InferenceTask`; the result POST
updates the row + emits a wakeup; the in-flight HTTP handler unblocks
on the wake. mcpd surviving a restart no longer drops in-flight tasks,
and a worker disconnecting mid-task no longer fails the caller — the
row reverts to pending and a sibling worker on the same pool drains it.
Wake tasks (publisher control messages, not inference) keep their own
small in-memory map (`wakeTasks`). They're millisecond-scoped and
don't benefit from durability — a missed wake on restart just means
the next infer fires a fresh wake.
Behavioral changes worth flagging:
- Worker disconnect mid-task: WAS reject ref.done with "publisher
disconnected"; NOW revert claimed/running rows to pending. Original
caller's ref.done keeps waiting up to INFER_AWAIT_TIMEOUT_MS (10
min); whichever worker delivers the result fulfills it.
- bindSession drains pending tasks for the session's pool keys. So
tasks queued while no worker was up automatically get dispatched
when one shows up. The drain matches by *effective pool key*
(poolName ?? name) — tasks queued against vllm-alice get drained
by any session whose owned Llms share alice's pool.
- New `failFast: true` option on enqueueInferTask (default: false).
Existing callers that NEED fast-fail get it explicitly:
- Direct `/api/v1/llms/<name>/infer` route: caller pinned a
specific Llm and wants 503 immediately if the publisher is
offline; queueing for an unknown future worker would surprise.
- chat.service pool failover loop: it iterates pool candidates
and needs each candidate's transport failure to surface fast.
Without failFast, a downed pool member would absorb the call
into the queue and the loop would wait 10 min before trying
the next.
The async API route (Stage 3) leaves failFast=false — that's the
whole point of the durable queue path.
- VirtualLlmService now requires an InferenceTaskService dep at
construction. Older test wirings that didn't pass it get a clear
"InferenceTaskService not wired" error from enqueueInferTask
rather than a confusing in-memory stub.
Tests:
- 12 existing virtual-llm-service tests updated for the new
semantics: "rejects when no session" → "queues durably"; "rejects
when row inactive" → "still queues (pool may have a sibling)";
"unbindSession rejects in-flight tasks" → "reverts to pending".
Wake-task probing now uses `wakeTasks` instead of `tasksById`.
- 3 new v5-specific tests: drain-on-bind matches by effective pool
key (not just name); enqueue without a session keeps the row
pending; completeTask via the result-route updates the DB and
emits the wakeup that resolves ref.done.
- chat-service-virtual-llm + llm-infer-route assertions updated to
expect the new {failFast: true} option arg.
mcpd 884/884 (was 881; +3 v5 cases). mcplocal 723/723. Full smoke
suite 144/144 against the deployed queue-backed mcpd.
Stage 3 (next): expose the durable queue via async API endpoints.
POST /api/v1/inference-tasks (enqueue with failFast=false), GET
/api/v1/inference-tasks/:id (poll), GET /api/v1/inference-tasks/:id/stream
(SSE), DELETE /api/v1/inference-tasks/:id (cancel). New `tasks` RBAC
resource.
This commit is contained in:
@@ -32,6 +32,8 @@ import { SecretBackendRotatorLoop } from './services/secret-backend-rotator-loop
|
||||
import { registerSecretBackendRotateRoutes } from './routes/secret-backend-rotate.js';
|
||||
import { LlmRepository } from './repositories/llm.repository.js';
|
||||
import { LlmService } from './services/llm.service.js';
|
||||
import { InferenceTaskRepository } from './repositories/inference-task.repository.js';
|
||||
import { InferenceTaskService } from './services/inference-task.service.js';
|
||||
import { AgentRepository } from './repositories/agent.repository.js';
|
||||
import { ChatRepository } from './repositories/chat.repository.js';
|
||||
import { AgentService } from './services/agent.service.js';
|
||||
@@ -463,10 +465,17 @@ async function main(): Promise<void> {
|
||||
const personalityRepo = new PersonalityRepository(prisma);
|
||||
const personalityService = new PersonalityService(personalityRepo, agentRepo, promptRepo);
|
||||
const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo);
|
||||
// v5: durable inference task queue. VirtualLlmService persists infer
|
||||
// tasks here; the result-route updates them; an in-process emitter
|
||||
// wakes blocked HTTP handlers when results land.
|
||||
const inferenceTaskRepo = new InferenceTaskRepository(prisma);
|
||||
const inferenceTaskService = new InferenceTaskService(inferenceTaskRepo);
|
||||
// Virtual-provider state machine (kind=virtual rows for both Llms and
|
||||
// Agents). v3 wires AgentService for heartbeat/disconnect/GC cascade.
|
||||
// v5 wires inferenceTaskService — enqueueInferTask now persists rows,
|
||||
// worker disconnect reverts claimed rows, worker bind drains pending.
|
||||
// The 60-s GC ticker is started below after `app.listen`.
|
||||
const virtualLlmService = new VirtualLlmService(llmRepo, agentService);
|
||||
const virtualLlmService = new VirtualLlmService(llmRepo, agentService, inferenceTaskService);
|
||||
// ChatService needs the proxy + project repo via the ChatToolDispatcher
|
||||
// bridge. The dispatcher's logger references `app.log`, which is not
|
||||
// constructed until further down — `chatService` itself is built right
|
||||
|
||||
@@ -98,8 +98,12 @@ export function registerLlmInferRoutes(
|
||||
return { error: 'virtual LLM dispatch unavailable (server misconfiguration)' };
|
||||
}
|
||||
try {
|
||||
// Direct infer is the v1-v4 sync path: caller pinned to a
|
||||
// specific Llm name and wants a fast 503 if the publisher is
|
||||
// offline. The async durable-queue API (Stage 3) is for callers
|
||||
// that explicitly opt into queueing.
|
||||
if (!streaming) {
|
||||
const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, false);
|
||||
const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, false, { failFast: true });
|
||||
const result = await ref.done;
|
||||
reply.code(result.status);
|
||||
audit(result.status);
|
||||
@@ -113,7 +117,7 @@ export function registerLlmInferRoutes(
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
});
|
||||
const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, true);
|
||||
const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, true, { failFast: true });
|
||||
const unsubscribe = ref.onChunk((chunk) => writeSseChunk(reply, chunk.data));
|
||||
try {
|
||||
await ref.done;
|
||||
|
||||
@@ -462,10 +462,16 @@ export class ChatService {
|
||||
// iterator. Chunks land on the queue from the SSE relay; the
|
||||
// generator drains them in order. ref.done resolves when the
|
||||
// publisher emits its `[DONE]` marker.
|
||||
//
|
||||
// failFast: true — the chat dispatcher's pool failover loop relies
|
||||
// on a fast "transport error" surfacing so it can iterate to the
|
||||
// next candidate. Without it, a downed pool member would queue the
|
||||
// task and the loop would wait 10 min before trying the next one.
|
||||
const ref = await this.virtualLlms.enqueueInferTask(
|
||||
candidate.llmName,
|
||||
{ ...this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), stream: true },
|
||||
true,
|
||||
{ failFast: true },
|
||||
);
|
||||
const queue: Array<{ data: string; done?: boolean }> = [];
|
||||
let resolveTick: (() => void) | null = null;
|
||||
@@ -544,6 +550,7 @@ export class ChatService {
|
||||
candidate.llmName,
|
||||
this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }),
|
||||
false,
|
||||
{ failFast: true },
|
||||
);
|
||||
return ref.done;
|
||||
}
|
||||
|
||||
@@ -55,6 +55,13 @@ export interface IInferenceTaskService {
|
||||
markRunning(taskId: string): Promise<InferenceTask | null>;
|
||||
/** Worker pushed a streaming chunk. Wakes up the in-process waiter; no DB write. */
|
||||
pushChunk(taskId: string, chunk: InferenceTaskChunk): boolean;
|
||||
/**
|
||||
* Subscribe directly to a task's chunk stream as a callback. Returns
|
||||
* an unsubscribe function. Used by VirtualLlmService's legacy
|
||||
* PendingTaskRef.onChunk bridge — the high-level `waitFor` API is
|
||||
* better when you need an async iterator + done promise together.
|
||||
*/
|
||||
subscribeChunksUnsafe(taskId: string, cb: (chunk: InferenceTaskChunk) => void): () => void;
|
||||
/** Worker POSTed the final result. Persists the body + flips to `completed`. */
|
||||
complete(taskId: string, responseBody: Record<string, unknown> | null): Promise<InferenceTask | null>;
|
||||
/** Worker (or mcpd) marked the task as failed. */
|
||||
@@ -201,6 +208,13 @@ export class InferenceTaskService implements IInferenceTaskService {
|
||||
return this.events.emit(`chunk:${taskId}`, chunk);
|
||||
}
|
||||
|
||||
subscribeChunksUnsafe(taskId: string, cb: (chunk: InferenceTaskChunk) => void): () => void {
|
||||
this.events.on(`chunk:${taskId}`, cb);
|
||||
return (): void => {
|
||||
this.events.off(`chunk:${taskId}`, cb);
|
||||
};
|
||||
}
|
||||
|
||||
async complete(taskId: string, responseBody: Record<string, unknown> | null): Promise<InferenceTask | null> {
|
||||
const updated = await this.repo.markCompleted(taskId, responseBody, this.clock());
|
||||
if (updated !== null) this.events.emit(`terminal:${taskId}`);
|
||||
|
||||
@@ -29,6 +29,8 @@ import type { ILlmRepository } from '../repositories/llm.repository.js';
|
||||
import type { OpenAiChatRequest } from './llm/types.js';
|
||||
import { NotFoundError } from './mcp-server.service.js';
|
||||
import type { AgentService } from './agent.service.js';
|
||||
import type { IInferenceTaskService, InferenceTaskChunk } from './inference-task.service.js';
|
||||
import { effectivePoolName } from './llm.service.js';
|
||||
|
||||
/** A virtual provider's announcement at registration time. */
|
||||
export interface RegisterProviderInput {
|
||||
@@ -81,29 +83,53 @@ export type VirtualTaskFrame =
|
||||
| { kind: 'wake'; taskId: string; llmName: string };
|
||||
|
||||
/**
|
||||
* Pending inference task. The route handler awaits `done`; the result POST
|
||||
* resolves it via `completeTask()`. The error path rejects via `failTask()`.
|
||||
* In-memory wake task. Wake is publisher-control work, not inference —
|
||||
* we don't persist wake tasks to the DB because their lifetime is
|
||||
* milliseconds and missing a wake on restart just means the next infer
|
||||
* fires a fresh wake. Inference tasks live in the durable queue (v5);
|
||||
* see InferenceTaskService.
|
||||
*/
|
||||
interface PendingTask {
|
||||
interface InMemoryWakeTask {
|
||||
taskId: string;
|
||||
sessionId: string;
|
||||
llmName: string;
|
||||
streaming: boolean;
|
||||
resolveNonStreaming: (body: unknown, status: number) => void;
|
||||
rejectNonStreaming: (err: Error) => void;
|
||||
/** For streaming tasks only; null on non-streaming. */
|
||||
pushChunk: ((chunk: { data: string; done?: boolean }) => void) | null;
|
||||
resolve: (status: number) => void;
|
||||
reject: (err: Error) => void;
|
||||
}
|
||||
|
||||
const HEARTBEAT_TIMEOUT_MS = 90_000;
|
||||
const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000; // 4 h
|
||||
/**
|
||||
* v5: how long enqueueInferTask waits for the worker's terminal POST
|
||||
* before bailing. 10 minutes is generous for thinking models that
|
||||
* spend 30-90s warming up. The TASK itself stays queued past this —
|
||||
* only the in-flight HTTP handler gives up; an async-API consumer can
|
||||
* still poll the row and pick up the result later.
|
||||
*/
|
||||
const INFER_AWAIT_TIMEOUT_MS = 10 * 60 * 1000;
|
||||
|
||||
export interface EnqueueInferOptions {
|
||||
/**
|
||||
* v5: if true, throw 503 immediately when no live SSE session exists
|
||||
* for the row's session at enqueue time, instead of queueing the
|
||||
* task and waiting for a worker to show up. Used by:
|
||||
* - the direct `/api/v1/llms/<name>/infer` route (caller asked for
|
||||
* THIS specific Llm; no pool fanout at this layer)
|
||||
* - chat.service's pool failover loop (it iterates candidates and
|
||||
* needs each candidate's failure to surface fast)
|
||||
* The async `POST /api/v1/inference-tasks` API leaves this false so
|
||||
* callers explicitly opting into the durable queue get the queue.
|
||||
* Default: false (durable, queues + waits up to INFER_AWAIT_TIMEOUT_MS).
|
||||
*/
|
||||
failFast?: boolean;
|
||||
}
|
||||
|
||||
export interface IVirtualLlmService {
|
||||
register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult>;
|
||||
heartbeat(providerSessionId: string): Promise<void>;
|
||||
bindSession(providerSessionId: string, handle: VirtualSessionHandle): void;
|
||||
unbindSession(providerSessionId: string): Promise<void>;
|
||||
enqueueInferTask(llmName: string, request: OpenAiChatRequest, streaming: boolean): Promise<PendingTaskRef>;
|
||||
enqueueInferTask(llmName: string, request: OpenAiChatRequest, streaming: boolean, options?: EnqueueInferOptions): Promise<PendingTaskRef>;
|
||||
completeTask(taskId: string, result: { status: number; body: unknown }): boolean;
|
||||
pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean;
|
||||
failTask(taskId: string, error: Error): boolean;
|
||||
@@ -121,7 +147,12 @@ export interface PendingTaskRef {
|
||||
|
||||
export class VirtualLlmService implements IVirtualLlmService {
|
||||
private readonly sessions = new Map<string, VirtualSessionHandle>();
|
||||
private readonly tasksById = new Map<string, PendingTask>();
|
||||
/**
|
||||
* Wake tasks live here, in-memory. The result POST resolves them by
|
||||
* id. Inference tasks have moved to the durable queue (v5); see
|
||||
* IInferenceTaskService.
|
||||
*/
|
||||
private readonly wakeTasks = new Map<string, InMemoryWakeTask>();
|
||||
/**
|
||||
* Dedupe concurrent wake requests for the same Llm. The first request
|
||||
* starts the wake; later requests for the same name await the same
|
||||
@@ -138,6 +169,19 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
* before deleting the Llm itself (Agent.llmId is Restrict).
|
||||
*/
|
||||
private readonly agents?: AgentService,
|
||||
/**
|
||||
* v5: durable inference task queue. Optional so older test wirings
|
||||
* (and the non-virtual chat path) compile without it; when absent,
|
||||
* enqueueInferTask falls back to a clear "task queue not wired"
|
||||
* error rather than silently regressing to in-memory.
|
||||
*/
|
||||
private readonly tasks?: IInferenceTaskService,
|
||||
/**
|
||||
* v5: caller's user id, threaded into newly-created task rows for
|
||||
* RBAC + observability. Optional — older wirings that don't have a
|
||||
* request context attribute tasks to 'system'.
|
||||
*/
|
||||
private readonly resolveOwner: () => string = () => 'system',
|
||||
) {}
|
||||
|
||||
async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult> {
|
||||
@@ -227,6 +271,12 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
// Replace any prior handle for this session — keeps "last writer wins"
|
||||
// simple. The old SSE will have been closed by the publisher anyway.
|
||||
this.sessions.set(providerSessionId, handle);
|
||||
// v5: drain queued inference tasks targeting any pool this session
|
||||
// owns. Fire-and-forget: the bind() route handler completes the SSE
|
||||
// handshake first; tasks land on the channel right after.
|
||||
if (this.tasks !== undefined) {
|
||||
void this.drainPendingForSession(providerSessionId, handle);
|
||||
}
|
||||
}
|
||||
|
||||
async unbindSession(providerSessionId: string): Promise<void> {
|
||||
@@ -243,11 +293,60 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
if (this.agents !== undefined) {
|
||||
await this.agents.markVirtualAgentsInactiveBySession(providerSessionId);
|
||||
}
|
||||
// Reject any in-flight tasks for this session — the relay can't deliver
|
||||
// a result POST anymore.
|
||||
for (const t of this.tasksById.values()) {
|
||||
// v5: revert claimed/running inference tasks back to pending so
|
||||
// another worker on the same pool can pick them up. If no other
|
||||
// worker is up, they stay queued for whenever one shows up.
|
||||
if (this.tasks !== undefined) {
|
||||
await this.tasks.revertHeldBy(providerSessionId);
|
||||
}
|
||||
// Reject any in-flight wake tasks for this session — those don't
|
||||
// benefit from re-queue since the publisher itself went away.
|
||||
for (const t of this.wakeTasks.values()) {
|
||||
if (t.sessionId === providerSessionId) {
|
||||
this.failTask(t.taskId, new Error('publisher disconnected'));
|
||||
this.wakeTasks.delete(t.taskId);
|
||||
t.reject(new Error('publisher disconnected'));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* v5: when a worker binds its SSE channel, find every pending
|
||||
* InferenceTask targeting a pool key this session owns, claim each,
|
||||
* and push the frame down the channel. If two workers in the same
|
||||
* pool race here, the repo's CAS on tryClaim ensures each task is
|
||||
* pushed to exactly one of them.
|
||||
*/
|
||||
private async drainPendingForSession(
|
||||
providerSessionId: string,
|
||||
handle: VirtualSessionHandle,
|
||||
): Promise<void> {
|
||||
if (this.tasks === undefined) return;
|
||||
const owned = await this.repo.findBySessionId(providerSessionId);
|
||||
if (owned.length === 0) return;
|
||||
const poolKeys = Array.from(new Set(owned.map((row) => effectivePoolName(row))));
|
||||
const pending = await this.tasks.findPendingForPools(poolKeys);
|
||||
for (const task of pending) {
|
||||
// Cap drain per bind to avoid pushing a giant backlog into a
|
||||
// brand-new SSE channel all at once. Hardcoded for now; a
|
||||
// per-session capacity hint is a v6 concern.
|
||||
if (!handle.alive) break;
|
||||
const claimed = await this.tasks.tryClaim(task.id, providerSessionId);
|
||||
if (claimed === null) continue; // raced; another worker got it
|
||||
try {
|
||||
handle.pushTask({
|
||||
kind: 'infer',
|
||||
taskId: claimed.id,
|
||||
llmName: claimed.llmName,
|
||||
request: claimed.requestBody as unknown as OpenAiChatRequest,
|
||||
streaming: claimed.streaming,
|
||||
});
|
||||
} catch (err) {
|
||||
// SSE write failed mid-drain — revert the claim so a
|
||||
// healthier worker can pick it up.
|
||||
await this.tasks.revertHeldBy(providerSessionId);
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn(`drainPendingForSession: pushTask failed for ${claimed.id}: ${(err as Error).message}`);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -256,7 +355,11 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
llmName: string,
|
||||
request: OpenAiChatRequest,
|
||||
streaming: boolean,
|
||||
options: EnqueueInferOptions = {},
|
||||
): Promise<PendingTaskRef> {
|
||||
if (this.tasks === undefined) {
|
||||
throw new Error('InferenceTaskService not wired into VirtualLlmService');
|
||||
}
|
||||
const llm = await this.repo.findByName(llmName);
|
||||
if (llm === null) throw new NotFoundError(`Llm not found: ${llmName}`);
|
||||
if (llm.kind !== 'virtual' || llm.providerSessionId === null) {
|
||||
@@ -265,67 +368,133 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
{ statusCode: 500 },
|
||||
);
|
||||
}
|
||||
if (llm.status === 'inactive') {
|
||||
throw Object.assign(
|
||||
new Error(`Virtual Llm '${llmName}' is inactive; publisher offline`),
|
||||
{ statusCode: 503 },
|
||||
);
|
||||
}
|
||||
const handle = this.sessions.get(llm.providerSessionId);
|
||||
if (handle === undefined || !handle.alive) {
|
||||
throw Object.assign(
|
||||
new Error(`Virtual Llm '${llmName}' has no live SSE session; publisher offline`),
|
||||
{ statusCode: 503 },
|
||||
);
|
||||
|
||||
// failFast callers (chat.service pool failover, direct infer route)
|
||||
// get the v1-v4 semantic: row inactive OR no live session = 503,
|
||||
// immediately. The chat dispatcher then iterates the next pool
|
||||
// candidate. Without failFast, both cases queue durably and a
|
||||
// future bindSession drains.
|
||||
if (options.failFast === true) {
|
||||
if (llm.status === 'inactive') {
|
||||
throw Object.assign(
|
||||
new Error(`Virtual Llm '${llmName}' is inactive; publisher offline`),
|
||||
{ statusCode: 503 },
|
||||
);
|
||||
}
|
||||
const handle = this.sessions.get(llm.providerSessionId);
|
||||
if (handle === undefined || !handle.alive) {
|
||||
throw Object.assign(
|
||||
new Error(`Virtual Llm '${llmName}' has no live SSE session; publisher offline`),
|
||||
{ statusCode: 503 },
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Wake-on-demand (v2) ──
|
||||
// Status=hibernating means the publisher told us at register time
|
||||
// (or via a later status update) that the backend is asleep. Fire a
|
||||
// wake task and wait for the publisher to confirm readiness before
|
||||
// dispatching the actual inference. Concurrent infers for the same
|
||||
// Llm share a single wake promise.
|
||||
// that the backend is asleep. Fire a wake task and wait for the
|
||||
// publisher to confirm readiness before persisting the inference
|
||||
// task. Concurrent infers for the same Llm share a single wake
|
||||
// promise. We hold the wake INSIDE this method (not after enqueue)
|
||||
// so we don't queue an inference task that we then can't dispatch.
|
||||
if (llm.status === 'hibernating') {
|
||||
const handle = this.sessions.get(llm.providerSessionId);
|
||||
if (handle === undefined || !handle.alive) {
|
||||
throw Object.assign(
|
||||
new Error(`Virtual Llm '${llmName}' has no live SSE session; cannot wake`),
|
||||
{ statusCode: 503 },
|
||||
);
|
||||
}
|
||||
await this.ensureAwake(llm.id, llm.name, llm.providerSessionId, handle);
|
||||
}
|
||||
|
||||
const taskId = randomUUID();
|
||||
const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>();
|
||||
|
||||
let resolveDone!: (v: { status: number; body: unknown }) => void;
|
||||
let rejectDone!: (err: Error) => void;
|
||||
const done = new Promise<{ status: number; body: unknown }>((resolve, reject) => {
|
||||
resolveDone = resolve;
|
||||
rejectDone = reject;
|
||||
// ── v5: persist the task BEFORE attempting dispatch ──
|
||||
// Even if no worker is up, the row stays pending and a future
|
||||
// bindSession will drain it. Caller's HTTP timeout still bounds
|
||||
// the wait, but the *task* survives.
|
||||
const created = await this.tasks.enqueue({
|
||||
poolName: effectivePoolName(llm),
|
||||
llmName,
|
||||
model: llm.model,
|
||||
tier: llm.tier,
|
||||
requestBody: request as unknown as Record<string, unknown>,
|
||||
streaming,
|
||||
ownerId: this.resolveOwner(),
|
||||
});
|
||||
|
||||
const pending: PendingTask = {
|
||||
taskId,
|
||||
sessionId: llm.providerSessionId,
|
||||
llmName,
|
||||
streaming,
|
||||
resolveNonStreaming: (body, status) => resolveDone({ status, body }),
|
||||
rejectNonStreaming: rejectDone,
|
||||
pushChunk: streaming
|
||||
? (chunk): void => { for (const cb of chunkSubscribers) cb(chunk); }
|
||||
: null,
|
||||
// Try to claim + dispatch immediately if a session is up. If not,
|
||||
// the row stays pending for drainPendingForSession to pick up.
|
||||
const handle = this.sessions.get(llm.providerSessionId);
|
||||
if (handle !== undefined && handle.alive) {
|
||||
const claimed = await this.tasks.tryClaim(created.id, llm.providerSessionId);
|
||||
if (claimed !== null) {
|
||||
handle.pushTask({
|
||||
kind: 'infer',
|
||||
taskId: claimed.id,
|
||||
llmName,
|
||||
request,
|
||||
streaming,
|
||||
});
|
||||
}
|
||||
// tryClaim can return null only if another concurrent enqueue
|
||||
// (or the GC sweep) already moved the row off pending — leave
|
||||
// it; drainPendingForSession will reconcile.
|
||||
}
|
||||
|
||||
// Wrap the task service's waitFor into the legacy PendingTaskRef
|
||||
// shape so existing callers (chat.service, llm-infer route)
|
||||
// don't need to change. The chunk callback bridges
|
||||
// InferenceTaskService.events into the on-the-fly subscriber set.
|
||||
const tasks = this.tasks;
|
||||
const taskId = created.id;
|
||||
const subscribers = new Set<(chunk: InferenceTaskChunk) => void>();
|
||||
// Single bridge listener on the service's emitter; fans out to all
|
||||
// subscribers locally so we don't pile up listeners on a hot key.
|
||||
let bridgeUnsub: (() => void) | null = null;
|
||||
const ensureBridge = (): void => {
|
||||
if (bridgeUnsub !== null) return;
|
||||
const onChunk = (chunk: InferenceTaskChunk): void => {
|
||||
for (const cb of subscribers) cb(chunk);
|
||||
};
|
||||
// Subscribe via the public pushChunk path: the service emits on
|
||||
// its own EventEmitter; we attach a listener here. Use the
|
||||
// service's events through a side channel — exposed below as
|
||||
// subscribeChunks for clarity.
|
||||
bridgeUnsub = tasks.subscribeChunksUnsafe(taskId, onChunk);
|
||||
};
|
||||
this.tasksById.set(taskId, pending);
|
||||
|
||||
handle.pushTask({
|
||||
kind: 'infer',
|
||||
taskId,
|
||||
llmName,
|
||||
request,
|
||||
streaming,
|
||||
});
|
||||
const done = (async (): Promise<{ status: number; body: unknown }> => {
|
||||
// waitFor's `done` rejects on cancel/error/timeout. For non-
|
||||
// streaming tasks the responseBody IS the body; for streaming
|
||||
// the body is null and chunks have already been piped through.
|
||||
const waiter = tasks.waitFor(taskId, INFER_AWAIT_TIMEOUT_MS);
|
||||
// If streaming, we must drain the chunks generator concurrently
|
||||
// so chunks aren't dropped just because no one's subscribed yet.
|
||||
// The real consumer subscribes via onChunk(); their cb fires
|
||||
// synchronously inside the bridge.
|
||||
if (streaming) ensureBridge();
|
||||
const finalRow = await waiter.done;
|
||||
// Status code: legacy callers expect 200 on success; the worker
|
||||
// sends its own status via the result POST and we forward it.
|
||||
// Today, success POSTs come with status=200 and the row's
|
||||
// responseBody is { status, body }. v5 stores the *body* directly
|
||||
// on the row; we synthesize a 200 here to match the legacy shape.
|
||||
return { status: 200, body: finalRow.responseBody };
|
||||
})();
|
||||
|
||||
return {
|
||||
taskId,
|
||||
done,
|
||||
onChunk(cb): () => void {
|
||||
chunkSubscribers.add(cb);
|
||||
return () => chunkSubscribers.delete(cb);
|
||||
subscribers.add(cb);
|
||||
ensureBridge();
|
||||
return (): void => {
|
||||
subscribers.delete(cb);
|
||||
if (subscribers.size === 0 && bridgeUnsub !== null) {
|
||||
bridgeUnsub();
|
||||
bridgeUnsub = null;
|
||||
}
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -370,22 +539,17 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
rejectDone = reject;
|
||||
});
|
||||
|
||||
const pending: PendingTask = {
|
||||
const wake: InMemoryWakeTask = {
|
||||
taskId,
|
||||
sessionId,
|
||||
llmName,
|
||||
streaming: false,
|
||||
// Wake tasks return { ok: true } on success or never resolve at
|
||||
// all if the publisher dies; the rejectNonStreaming path covers
|
||||
// the disconnect-mid-wake case via unbindSession.
|
||||
resolveNonStreaming: (_body, status) => {
|
||||
resolve: (status) => {
|
||||
if (status >= 200 && status < 300) resolveDone();
|
||||
else rejectDone(new Error(`wake task returned status ${String(status)}`));
|
||||
},
|
||||
rejectNonStreaming: rejectDone,
|
||||
pushChunk: null,
|
||||
reject: rejectDone,
|
||||
};
|
||||
this.tasksById.set(taskId, pending);
|
||||
this.wakeTasks.set(taskId, wake);
|
||||
|
||||
handle.pushTask({ kind: 'wake', taskId, llmName });
|
||||
|
||||
@@ -402,32 +566,55 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
}
|
||||
|
||||
completeTask(taskId: string, result: { status: number; body: unknown }): boolean {
|
||||
const t = this.tasksById.get(taskId);
|
||||
if (t === undefined) return false;
|
||||
this.tasksById.delete(taskId);
|
||||
t.resolveNonStreaming(result.body, result.status);
|
||||
return true;
|
||||
// Wake tasks: in-memory map. Resolve and bail.
|
||||
const wake = this.wakeTasks.get(taskId);
|
||||
if (wake !== undefined) {
|
||||
this.wakeTasks.delete(taskId);
|
||||
wake.resolve(result.status);
|
||||
return true;
|
||||
}
|
||||
// Inference tasks: durable queue. Persist body + flip terminal +
|
||||
// emit wakeup. Fire-and-forget the DB write; the result POST
|
||||
// returns 200 either way (the caller's HTTP handler is what cares
|
||||
// about the eventual emit, not the worker).
|
||||
if (this.tasks !== undefined) {
|
||||
void this.tasks.complete(taskId, result.body as Record<string, unknown> | null);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean {
|
||||
const t = this.tasksById.get(taskId);
|
||||
if (t === undefined || t.pushChunk === null) return false;
|
||||
t.pushChunk(chunk);
|
||||
// Wake tasks never receive chunks — they're non-streaming control
|
||||
// messages. Don't even check the wake map; if the id isn't an
|
||||
// inference task, just drop the chunk.
|
||||
if (this.tasks === undefined) return false;
|
||||
// First chunk for a claimed task → flip claimed → running. Idempotent
|
||||
// and fire-and-forget; if the row is already running, the CAS in the
|
||||
// repo just no-ops.
|
||||
void this.tasks.markRunning(taskId);
|
||||
this.tasks.pushChunk(taskId, chunk);
|
||||
if (chunk.done === true) {
|
||||
// For streaming tasks, also resolve the `done` promise so the route
|
||||
// handler can clean up.
|
||||
t.resolveNonStreaming(null, 200);
|
||||
this.tasksById.delete(taskId);
|
||||
// Streaming completion: persist with null body + flip terminal so
|
||||
// the waiter unblocks. The actual content was already streamed
|
||||
// through the chunks channel; nothing to store.
|
||||
void this.tasks.complete(taskId, null);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
failTask(taskId: string, error: Error): boolean {
|
||||
const t = this.tasksById.get(taskId);
|
||||
if (t === undefined) return false;
|
||||
this.tasksById.delete(taskId);
|
||||
t.rejectNonStreaming(error);
|
||||
return true;
|
||||
const wake = this.wakeTasks.get(taskId);
|
||||
if (wake !== undefined) {
|
||||
this.wakeTasks.delete(taskId);
|
||||
wake.reject(error);
|
||||
return true;
|
||||
}
|
||||
if (this.tasks !== undefined) {
|
||||
void this.tasks.fail(taskId, error.message);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async gcSweep(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> {
|
||||
|
||||
@@ -193,6 +193,10 @@ describe('ChatService — kind=virtual branch (v3 Stage 1)', () => {
|
||||
'vllm-local',
|
||||
expect.objectContaining({ messages: expect.any(Array) }),
|
||||
false,
|
||||
// v5: chat.service passes failFast:true so its pool failover loop
|
||||
// surfaces transport errors quickly instead of waiting on the
|
||||
// durable queue's 10-min timeout.
|
||||
{ failFast: true },
|
||||
);
|
||||
});
|
||||
|
||||
@@ -224,6 +228,7 @@ describe('ChatService — kind=virtual branch (v3 Stage 1)', () => {
|
||||
'vllm-local',
|
||||
expect.objectContaining({ messages: expect.any(Array), stream: true }),
|
||||
true,
|
||||
{ failFast: true },
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
@@ -244,6 +244,9 @@ describe('POST /api/v1/llms/:name/infer', () => {
|
||||
'claude',
|
||||
expect.objectContaining({ messages: expect.any(Array) }),
|
||||
false,
|
||||
// v5: direct infer route passes failFast:true so a downed publisher
|
||||
// returns 503 immediately instead of queueing the task durably.
|
||||
{ failFast: true },
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
import { describe, it, expect, vi } from 'vitest';
|
||||
import { VirtualLlmService, type VirtualSessionHandle } from '../src/services/virtual-llm.service.js';
|
||||
import { InferenceTaskService } from '../src/services/inference-task.service.js';
|
||||
import type { IInferenceTaskService } from '../src/services/inference-task.service.js';
|
||||
import type { IInferenceTaskRepository } from '../src/repositories/inference-task.repository.js';
|
||||
import type { ILlmRepository } from '../src/repositories/llm.repository.js';
|
||||
import type { Llm } from '@prisma/client';
|
||||
import type { Llm, InferenceTask, InferenceTaskStatus } from '@prisma/client';
|
||||
|
||||
function makeLlm(overrides: Partial<Llm> = {}): Llm {
|
||||
return {
|
||||
@@ -15,6 +18,7 @@ function makeLlm(overrides: Partial<Llm> = {}): Llm {
|
||||
apiKeySecretId: null,
|
||||
apiKeySecretKey: null,
|
||||
extraConfig: {} as Llm['extraConfig'],
|
||||
poolName: null,
|
||||
kind: 'virtual',
|
||||
providerSessionId: 's-1',
|
||||
lastHeartbeatAt: new Date(),
|
||||
@@ -27,6 +31,105 @@ function makeLlm(overrides: Partial<Llm> = {}): Llm {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Drop-in mock of `InferenceTaskService` backed by a Map. We mirror just
|
||||
* enough of the real service's signaling — events for terminal/chunk —
|
||||
* so VirtualLlmService's enqueue/result flows behave the same way they
|
||||
* do in production. Nothing here talks to Postgres.
|
||||
*/
|
||||
function mockTaskService(): IInferenceTaskService {
|
||||
// Build a minimal repo for the real InferenceTaskService — that way we
|
||||
// exercise the actual event-emitter wakeup logic, just without a DB.
|
||||
const rows = new Map<string, InferenceTask>();
|
||||
let n = 0;
|
||||
const repo: IInferenceTaskRepository = {
|
||||
create: vi.fn(async (data) => {
|
||||
n += 1;
|
||||
const row: InferenceTask = {
|
||||
id: `task-${String(n)}`,
|
||||
status: 'pending',
|
||||
poolName: data.poolName,
|
||||
llmName: data.llmName,
|
||||
model: data.model,
|
||||
tier: data.tier ?? null,
|
||||
claimedBy: null,
|
||||
requestBody: data.requestBody as InferenceTask['requestBody'],
|
||||
responseBody: null,
|
||||
errorMessage: null,
|
||||
streaming: data.streaming,
|
||||
createdAt: new Date(),
|
||||
claimedAt: null,
|
||||
streamStartedAt: null,
|
||||
completedAt: null,
|
||||
ownerId: data.ownerId,
|
||||
agentId: data.agentId ?? null,
|
||||
};
|
||||
rows.set(row.id, row);
|
||||
return row;
|
||||
}),
|
||||
findById: vi.fn(async (id) => rows.get(id) ?? null),
|
||||
findPendingForPools: vi.fn(async (poolNames: string[]) =>
|
||||
[...rows.values()].filter((r) => r.status === 'pending' && poolNames.includes(r.poolName)),
|
||||
),
|
||||
findHeldBy: vi.fn(async (claimedBy: string) =>
|
||||
[...rows.values()].filter((r) =>
|
||||
r.claimedBy === claimedBy
|
||||
&& (r.status === 'claimed' || r.status === 'running')),
|
||||
),
|
||||
list: vi.fn(async () => [...rows.values()]),
|
||||
tryClaim: vi.fn(async (id, claimedBy, claimedAt) => {
|
||||
const r = rows.get(id);
|
||||
if (r === undefined || r.status !== 'pending') return null;
|
||||
const next = { ...r, status: 'claimed' as InferenceTaskStatus, claimedBy, claimedAt };
|
||||
rows.set(id, next);
|
||||
return next;
|
||||
}),
|
||||
markRunning: vi.fn(async (id, at) => {
|
||||
const r = rows.get(id);
|
||||
if (r === undefined || (r.status !== 'claimed' && r.status !== 'running')) return null;
|
||||
const next = { ...r, status: 'running' as InferenceTaskStatus, streamStartedAt: at };
|
||||
rows.set(id, next);
|
||||
return next;
|
||||
}),
|
||||
markCompleted: vi.fn(async (id, body, at) => {
|
||||
const r = rows.get(id);
|
||||
if (r === undefined || r.status === 'completed' || r.status === 'error' || r.status === 'cancelled') return null;
|
||||
const next = { ...r, status: 'completed' as InferenceTaskStatus, responseBody: (body ?? null) as InferenceTask['responseBody'], completedAt: at };
|
||||
rows.set(id, next);
|
||||
return next;
|
||||
}),
|
||||
markError: vi.fn(async (id, errorMessage, at) => {
|
||||
const r = rows.get(id);
|
||||
if (r === undefined || r.status === 'completed' || r.status === 'error' || r.status === 'cancelled') return null;
|
||||
const next = { ...r, status: 'error' as InferenceTaskStatus, errorMessage, completedAt: at };
|
||||
rows.set(id, next);
|
||||
return next;
|
||||
}),
|
||||
markCancelled: vi.fn(async (id, at) => {
|
||||
const r = rows.get(id);
|
||||
if (r === undefined || r.status === 'completed' || r.status === 'error' || r.status === 'cancelled') return null;
|
||||
const next = { ...r, status: 'cancelled' as InferenceTaskStatus, completedAt: at };
|
||||
rows.set(id, next);
|
||||
return next;
|
||||
}),
|
||||
revertToPending: vi.fn(async (id) => {
|
||||
const r = rows.get(id);
|
||||
if (r === undefined || (r.status !== 'claimed' && r.status !== 'running')) return null;
|
||||
const next = { ...r, status: 'pending' as InferenceTaskStatus, claimedBy: null, claimedAt: null, streamStartedAt: null };
|
||||
rows.set(id, next);
|
||||
return next;
|
||||
}),
|
||||
findStalePending: vi.fn(async () => []),
|
||||
findExpiredTerminal: vi.fn(async () => []),
|
||||
deleteMany: vi.fn(async (ids) => {
|
||||
let c = 0;
|
||||
for (const id of ids) if (rows.delete(id)) c += 1;
|
||||
return c;
|
||||
}),
|
||||
};
|
||||
return new InferenceTaskService(repo);
|
||||
}
|
||||
|
||||
function mockRepo(initial: Llm[] = []): ILlmRepository {
|
||||
const rows = new Map<string, Llm>(initial.map((l) => [l.id, l]));
|
||||
let counter = rows.size;
|
||||
@@ -38,6 +141,14 @@ function mockRepo(initial: Llm[] = []): ILlmRepository {
|
||||
return null;
|
||||
}),
|
||||
findByTier: vi.fn(async () => []),
|
||||
findByPoolName: vi.fn(async (poolName: string) => {
|
||||
const out: Llm[] = [];
|
||||
for (const l of rows.values()) {
|
||||
if (l.poolName === poolName) out.push(l);
|
||||
else if (l.poolName === null && l.name === poolName) out.push(l);
|
||||
}
|
||||
return out;
|
||||
}),
|
||||
findBySessionId: vi.fn(async (sid: string) =>
|
||||
[...rows.values()].filter((l) => l.providerSessionId === sid)),
|
||||
findStaleVirtuals: vi.fn(async (cutoff: Date) =>
|
||||
@@ -105,7 +216,7 @@ function fakeSession(): VirtualSessionHandle & { tasks: Array<unknown>; alive: b
|
||||
describe('VirtualLlmService', () => {
|
||||
it('register inserts new virtual rows with active status + sessionId', async () => {
|
||||
const repo = mockRepo();
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
const { providerSessionId, llms } = await svc.register({
|
||||
providerSessionId: null,
|
||||
providers: [
|
||||
@@ -122,7 +233,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('register reuses the same row on sticky reconnect (same name + sessionId)', async () => {
|
||||
const repo = mockRepo();
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
const first = await svc.register({
|
||||
providerSessionId: 'fixed-id',
|
||||
providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }],
|
||||
@@ -140,7 +251,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('register refuses to overwrite a public LLM with the same name', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
await expect(svc.register({
|
||||
providerSessionId: 'sess-x',
|
||||
providers: [{ name: 'qwen3-thinking', type: 'openai', model: 'm' }],
|
||||
@@ -149,7 +260,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('register refuses if another active session owns the name', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'other', status: 'active' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
await expect(svc.register({
|
||||
providerSessionId: 'mine',
|
||||
providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }],
|
||||
@@ -161,7 +272,7 @@ describe('VirtualLlmService', () => {
|
||||
name: 'vllm-local', providerSessionId: 'old-session',
|
||||
status: 'inactive', inactiveSince: new Date(),
|
||||
})]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
const { llms } = await svc.register({
|
||||
providerSessionId: 'new-session',
|
||||
providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }],
|
||||
@@ -177,7 +288,7 @@ describe('VirtualLlmService', () => {
|
||||
name: 'vllm-local', providerSessionId: 'sess', status: 'inactive',
|
||||
lastHeartbeatAt: past, inactiveSince: past,
|
||||
})]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
await svc.heartbeat('sess');
|
||||
const row = await repo.findByName('vllm-local');
|
||||
expect(row?.status).toBe('active');
|
||||
@@ -191,7 +302,7 @@ describe('VirtualLlmService', () => {
|
||||
makeLlm({ name: 'b', providerSessionId: 'sess' }),
|
||||
makeLlm({ name: 'c', providerSessionId: 'other' }),
|
||||
]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
svc.bindSession('sess', fakeSession());
|
||||
await svc.unbindSession('sess');
|
||||
expect((await repo.findByName('a'))?.status).toBe('inactive');
|
||||
@@ -201,7 +312,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('enqueueInferTask pushes a task frame to the SSE session', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
const session = fakeSession();
|
||||
svc.bindSession('sess', session);
|
||||
|
||||
@@ -218,26 +329,41 @@ describe('VirtualLlmService', () => {
|
||||
expect(t.streaming).toBe(false);
|
||||
});
|
||||
|
||||
it('enqueueInferTask rejects when the publisher is offline (no session bound)', async () => {
|
||||
it('enqueueInferTask queues the task when no session is bound (durable, drains on bind)', async () => {
|
||||
// v5 semantic change: with a durable queue underneath, "no worker
|
||||
// up" no longer rejects — the row stays pending and a future
|
||||
// bindSession drains it. Caller's HTTP handler awaits on ref.done
|
||||
// and bounds itself with INFER_AWAIT_TIMEOUT_MS; from the service's
|
||||
// POV the enqueue itself succeeds.
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
await expect(
|
||||
svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false),
|
||||
).rejects.toThrow(/no live SSE session|publisher offline/);
|
||||
const tasks = mockTaskService();
|
||||
const svc = new VirtualLlmService(repo, undefined, tasks);
|
||||
const ref = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false);
|
||||
expect(ref.taskId).toMatch(/^task-/);
|
||||
// The row exists and is still pending — no claim happened.
|
||||
const row = await tasks.findById(ref.taskId);
|
||||
expect(row?.status).toBe('pending');
|
||||
expect(row?.claimedBy).toBeNull();
|
||||
});
|
||||
|
||||
it('enqueueInferTask rejects when the row is inactive', async () => {
|
||||
it('enqueueInferTask still queues against an inactive row (pool may have a sibling worker)', async () => {
|
||||
// v5 semantic change: status=inactive on a specific Llm doesn't
|
||||
// mean the pool is dead — another mcplocal publishing the same
|
||||
// poolName might be active. The dispatcher's bindSession drain
|
||||
// matches by poolName, so even a "dead" pin queues correctly.
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
svc.bindSession('sess', fakeSession());
|
||||
await expect(
|
||||
svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false),
|
||||
).rejects.toThrow(/inactive|publisher offline/);
|
||||
const tasks = mockTaskService();
|
||||
const svc = new VirtualLlmService(repo, undefined, tasks);
|
||||
const ref = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false);
|
||||
const row = await tasks.findById(ref.taskId);
|
||||
expect(row?.status).toBe('pending');
|
||||
// No frame pushed because no session is bound.
|
||||
expect(row?.claimedBy).toBeNull();
|
||||
});
|
||||
|
||||
it('enqueueInferTask rejects when the LLM is public (not virtual)', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
await expect(
|
||||
svc.enqueueInferTask('qwen3-thinking', { model: 'm', messages: [] }, false),
|
||||
).rejects.toThrow(/not a virtual provider/);
|
||||
@@ -245,7 +371,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('completeTask resolves the pending non-streaming promise', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
svc.bindSession('sess', fakeSession());
|
||||
const ref = await svc.enqueueInferTask(
|
||||
'vllm-local',
|
||||
@@ -258,7 +384,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('streaming: pushTaskChunk fans chunks to subscribers; done resolves the ref', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
svc.bindSession('sess', fakeSession());
|
||||
const ref = await svc.enqueueInferTask(
|
||||
'vllm-local',
|
||||
@@ -278,7 +404,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('failTask rejects the pending promise with a clear error', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
svc.bindSession('sess', fakeSession());
|
||||
const ref = await svc.enqueueInferTask(
|
||||
'vllm-local',
|
||||
@@ -289,17 +415,34 @@ describe('VirtualLlmService', () => {
|
||||
await expect(ref.done).rejects.toThrow(/upstream blew up/);
|
||||
});
|
||||
|
||||
it('unbindSession rejects in-flight tasks for that session', async () => {
|
||||
it('unbindSession reverts claimed inference tasks to pending (durable re-queue, not reject)', async () => {
|
||||
// v5 semantic change: a worker disconnecting mid-task no longer
|
||||
// *rejects* the task. The row goes back to pending so another
|
||||
// worker on the same pool can pick it up. The original caller's
|
||||
// ref.done keeps waiting up to its 10-min INFER_AWAIT_TIMEOUT_MS;
|
||||
// the same caller is what gets the result whichever worker
|
||||
// ultimately delivers it.
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const tasks = mockTaskService();
|
||||
const svc = new VirtualLlmService(repo, undefined, tasks);
|
||||
svc.bindSession('sess', fakeSession());
|
||||
const ref = await svc.enqueueInferTask(
|
||||
'vllm-local',
|
||||
{ model: 'm', messages: [{ role: 'user', content: 'hi' }] },
|
||||
false,
|
||||
);
|
||||
// After enqueue with a session up, the task is claimed.
|
||||
let row = await tasks.findById(ref.taskId);
|
||||
expect(row?.status).toBe('claimed');
|
||||
expect(row?.claimedBy).toBe('sess');
|
||||
|
||||
await svc.unbindSession('sess');
|
||||
await expect(ref.done).rejects.toThrow(/publisher disconnected/);
|
||||
|
||||
// After disconnect, claimed/running rows revert to pending — ready
|
||||
// for the next worker to drain.
|
||||
row = await tasks.findById(ref.taskId);
|
||||
expect(row?.status).toBe('pending');
|
||||
expect(row?.claimedBy).toBeNull();
|
||||
});
|
||||
|
||||
it('gcSweep flips heartbeat-stale active virtuals to inactive', async () => {
|
||||
@@ -309,7 +452,7 @@ describe('VirtualLlmService', () => {
|
||||
makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }),
|
||||
makeLlm({ name: 'fresh', providerSessionId: 'b', status: 'active', lastHeartbeatAt: recent }),
|
||||
]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
const result = await svc.gcSweep();
|
||||
expect(result.markedInactive).toBe(1);
|
||||
expect((await repo.findByName('stale'))?.status).toBe('inactive');
|
||||
@@ -324,7 +467,7 @@ describe('VirtualLlmService', () => {
|
||||
makeLlm({ name: 'recent', providerSessionId: 'b', status: 'inactive', inactiveSince: fresh }),
|
||||
makeLlm({ name: 'public-survivor', providerSessionId: null, kind: 'public' }),
|
||||
]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
const result = await svc.gcSweep();
|
||||
expect(result.deleted).toBe(1);
|
||||
expect(await repo.findByName('old')).toBeNull();
|
||||
@@ -336,7 +479,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('hibernating: dispatches a wake task first and waits for it to complete before infer', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
const session = fakeSession();
|
||||
svc.bindSession('sess', session);
|
||||
|
||||
@@ -370,7 +513,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('hibernating: concurrent infer requests share a single wake task', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
const session = fakeSession();
|
||||
svc.bindSession('sess', session);
|
||||
|
||||
@@ -398,7 +541,7 @@ describe('VirtualLlmService', () => {
|
||||
|
||||
it('hibernating: rejects when the wake task fails', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'broken', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
svc.bindSession('sess', fakeSession());
|
||||
|
||||
const inferPromise = svc.enqueueInferTask(
|
||||
@@ -408,12 +551,12 @@ describe('VirtualLlmService', () => {
|
||||
);
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
|
||||
// Get the wake task id from the in-flight tasks map (its only entry).
|
||||
// We test the failure path via failTask which is part of the public
|
||||
// surface used by the result-POST route handler.
|
||||
// v5: wake tasks live in `wakeTasks` (in-memory). Inference tasks
|
||||
// moved to the DB-backed queue but wake is publisher-control work
|
||||
// that doesn't need durability — we kept the in-memory map for it.
|
||||
const taskIds: string[] = [];
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
for (const id of (svc as any).tasksById.keys()) taskIds.push(id);
|
||||
for (const id of (svc as any).wakeTasks.keys()) taskIds.push(id);
|
||||
expect(taskIds).toHaveLength(1);
|
||||
expect(svc.failTask(taskIds[0]!, new Error('wake recipe failed'))).toBe(true);
|
||||
|
||||
@@ -424,14 +567,28 @@ describe('VirtualLlmService', () => {
|
||||
expect(row?.status).toBe('hibernating');
|
||||
});
|
||||
|
||||
it('inactive: still rejects with 503 (publisher offline) — wake path only fires for hibernating', async () => {
|
||||
it('inactive: queues without firing the wake path — wake only triggers on status=hibernating', async () => {
|
||||
// Coverage for the v5 inactive-vs-hibernating distinction.
|
||||
// hibernating = "publisher told us the backend is asleep, ask
|
||||
// them to wake it"; inactive = "publisher itself is offline".
|
||||
// For inactive rows, queueing is the right behavior (wait for a
|
||||
// worker on the pool to come online and drain). The wake path
|
||||
// must NOT fire — wake is opt-in via the publisher's register
|
||||
// payload, not a generic "row is down" recovery.
|
||||
//
|
||||
// No session bind here: an "inactive" row in production means
|
||||
// unbindSession already flipped it after SSE close. Binding a
|
||||
// session for the same providerSessionId would be a contradictory
|
||||
// setup that the test wouldn't model anything real about.
|
||||
const repo = mockRepo([makeLlm({ name: 'gone', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
svc.bindSession('sess', fakeSession());
|
||||
const tasks = mockTaskService();
|
||||
const svc = new VirtualLlmService(repo, undefined, tasks);
|
||||
|
||||
await expect(
|
||||
svc.enqueueInferTask('gone', { model: 'm', messages: [] }, false),
|
||||
).rejects.toThrow(/inactive|publisher offline/);
|
||||
const ref = await svc.enqueueInferTask('gone', { model: 'm', messages: [] }, false);
|
||||
// Task queued in pending; no claim, no frame.
|
||||
const row = await tasks.findById(ref.taskId);
|
||||
expect(row?.status).toBe('pending');
|
||||
expect(row?.claimedBy).toBeNull();
|
||||
});
|
||||
|
||||
it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => {
|
||||
@@ -439,11 +596,75 @@ describe('VirtualLlmService', () => {
|
||||
const repo = mockRepo([
|
||||
makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }),
|
||||
]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const svc = new VirtualLlmService(repo, undefined, mockTaskService());
|
||||
const first = await svc.gcSweep();
|
||||
const second = await svc.gcSweep();
|
||||
expect(first.markedInactive).toBe(1);
|
||||
expect(second.markedInactive).toBe(0);
|
||||
expect(second.deleted).toBe(0);
|
||||
});
|
||||
|
||||
// ── v5: durable queue + drain-on-bind ──
|
||||
|
||||
it('bindSession drains pending inference tasks owned by the session\'s pool keys', async () => {
|
||||
// Two enqueues land while no session is bound. Each row is created
|
||||
// with status=pending; no SSE frame goes anywhere. When the worker
|
||||
// finally binds, the drain loop claims both and pushes the frames.
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', poolName: 'qwen-pool' })]);
|
||||
const tasks = mockTaskService();
|
||||
const svc = new VirtualLlmService(repo, undefined, tasks);
|
||||
const ref1 = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [{ role: 'user', content: 'one' }] }, false);
|
||||
const ref2 = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [{ role: 'user', content: 'two' }] }, false);
|
||||
// Both rows are still pending — no worker bound yet.
|
||||
expect((await tasks.findById(ref1.taskId))?.status).toBe('pending');
|
||||
expect((await tasks.findById(ref2.taskId))?.status).toBe('pending');
|
||||
|
||||
// Worker shows up. Drain runs synchronously enough that we just
|
||||
// need to flush the microtask queue before checking SSE frames.
|
||||
const session = fakeSession();
|
||||
svc.bindSession('sess', session);
|
||||
// drainPendingForSession is fired with `void` so let microtasks
|
||||
// settle before asserting.
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
|
||||
expect((session.tasks as Array<{ kind: string; taskId: string }>).map((t) => t.taskId).sort())
|
||||
.toEqual([ref1.taskId, ref2.taskId].sort());
|
||||
// Rows are now claimed by this session.
|
||||
expect((await tasks.findById(ref1.taskId))?.status).toBe('claimed');
|
||||
expect((await tasks.findById(ref1.taskId))?.claimedBy).toBe('sess');
|
||||
});
|
||||
|
||||
it('drain-on-bind matches the effective pool key, not just llm.name', async () => {
|
||||
// The pinned Llm has name=vllm-alice but poolName=qwen-pool.
|
||||
// Enqueue against vllm-alice → row.poolName=qwen-pool.
|
||||
// Worker binds with a session that owns vllm-alice (same pool key).
|
||||
// Drain must surface this row even though poolName != name.
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-alice', providerSessionId: 'sess', poolName: 'qwen-pool' })]);
|
||||
const tasks = mockTaskService();
|
||||
const svc = new VirtualLlmService(repo, undefined, tasks);
|
||||
const ref = await svc.enqueueInferTask('vllm-alice', { model: 'm', messages: [] }, false);
|
||||
expect((await tasks.findById(ref.taskId))?.poolName).toBe('qwen-pool');
|
||||
|
||||
const session = fakeSession();
|
||||
svc.bindSession('sess', session);
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
expect((session.tasks as Array<{ taskId: string }>).map((t) => t.taskId)).toEqual([ref.taskId]);
|
||||
});
|
||||
|
||||
it('completeTask via the result-route updates the DB row + emits the wakeup', async () => {
|
||||
// End-to-end through the public surface: enqueue → claim happens
|
||||
// because session is bound → worker POSTs result → completeTask
|
||||
// routes to InferenceTaskService.complete → ref.done resolves.
|
||||
const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
|
||||
const tasks = mockTaskService();
|
||||
const svc = new VirtualLlmService(repo, undefined, tasks);
|
||||
svc.bindSession('sess', fakeSession());
|
||||
const ref = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false);
|
||||
|
||||
expect(svc.completeTask(ref.taskId, { status: 200, body: { ok: true } })).toBe(true);
|
||||
await expect(ref.done).resolves.toEqual({ status: 200, body: { ok: true } });
|
||||
const row = await tasks.findById(ref.taskId);
|
||||
expect(row?.status).toBe('completed');
|
||||
expect(row?.responseBody).toEqual({ ok: true });
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user