diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index e54de89..4375cfa 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -32,6 +32,8 @@ import { SecretBackendRotatorLoop } from './services/secret-backend-rotator-loop import { registerSecretBackendRotateRoutes } from './routes/secret-backend-rotate.js'; import { LlmRepository } from './repositories/llm.repository.js'; import { LlmService } from './services/llm.service.js'; +import { InferenceTaskRepository } from './repositories/inference-task.repository.js'; +import { InferenceTaskService } from './services/inference-task.service.js'; import { AgentRepository } from './repositories/agent.repository.js'; import { ChatRepository } from './repositories/chat.repository.js'; import { AgentService } from './services/agent.service.js'; @@ -463,10 +465,17 @@ async function main(): Promise { const personalityRepo = new PersonalityRepository(prisma); const personalityService = new PersonalityService(personalityRepo, agentRepo, promptRepo); const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo); + // v5: durable inference task queue. VirtualLlmService persists infer + // tasks here; the result-route updates them; an in-process emitter + // wakes blocked HTTP handlers when results land. + const inferenceTaskRepo = new InferenceTaskRepository(prisma); + const inferenceTaskService = new InferenceTaskService(inferenceTaskRepo); // Virtual-provider state machine (kind=virtual rows for both Llms and // Agents). v3 wires AgentService for heartbeat/disconnect/GC cascade. + // v5 wires inferenceTaskService — enqueueInferTask now persists rows, + // worker disconnect reverts claimed rows, worker bind drains pending. // The 60-s GC ticker is started below after `app.listen`. - const virtualLlmService = new VirtualLlmService(llmRepo, agentService); + const virtualLlmService = new VirtualLlmService(llmRepo, agentService, inferenceTaskService); // ChatService needs the proxy + project repo via the ChatToolDispatcher // bridge. The dispatcher's logger references `app.log`, which is not // constructed until further down — `chatService` itself is built right diff --git a/src/mcpd/src/routes/llm-infer.ts b/src/mcpd/src/routes/llm-infer.ts index 38a84f7..65cdf06 100644 --- a/src/mcpd/src/routes/llm-infer.ts +++ b/src/mcpd/src/routes/llm-infer.ts @@ -98,8 +98,12 @@ export function registerLlmInferRoutes( return { error: 'virtual LLM dispatch unavailable (server misconfiguration)' }; } try { + // Direct infer is the v1-v4 sync path: caller pinned to a + // specific Llm name and wants a fast 503 if the publisher is + // offline. The async durable-queue API (Stage 3) is for callers + // that explicitly opt into queueing. 
if (!streaming) { - const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, false); + const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, false, { failFast: true }); const result = await ref.done; reply.code(result.status); audit(result.status); @@ -113,7 +117,7 @@ export function registerLlmInferRoutes( Connection: 'keep-alive', 'X-Accel-Buffering': 'no', }); - const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, true); + const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, true, { failFast: true }); const unsubscribe = ref.onChunk((chunk) => writeSseChunk(reply, chunk.data)); try { await ref.done; diff --git a/src/mcpd/src/services/chat.service.ts b/src/mcpd/src/services/chat.service.ts index 80fe37c..d18b1b1 100644 --- a/src/mcpd/src/services/chat.service.ts +++ b/src/mcpd/src/services/chat.service.ts @@ -462,10 +462,16 @@ export class ChatService { // iterator. Chunks land on the queue from the SSE relay; the // generator drains them in order. ref.done resolves when the // publisher emits its `[DONE]` marker. + // + // failFast: true — the chat dispatcher's pool failover loop relies + // on a fast "transport error" surfacing so it can iterate to the + // next candidate. Without it, a downed pool member would queue the + // task and the loop would wait 10 min before trying the next one. const ref = await this.virtualLlms.enqueueInferTask( candidate.llmName, { ...this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), stream: true }, true, + { failFast: true }, ); const queue: Array<{ data: string; done?: boolean }> = []; let resolveTick: (() => void) | null = null; @@ -544,6 +550,7 @@ export class ChatService { candidate.llmName, this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), false, + { failFast: true }, ); return ref.done; } diff --git a/src/mcpd/src/services/inference-task.service.ts b/src/mcpd/src/services/inference-task.service.ts index 81d4109..366f8c1 100644 --- a/src/mcpd/src/services/inference-task.service.ts +++ b/src/mcpd/src/services/inference-task.service.ts @@ -55,6 +55,13 @@ export interface IInferenceTaskService { markRunning(taskId: string): Promise; /** Worker pushed a streaming chunk. Wakes up the in-process waiter; no DB write. */ pushChunk(taskId: string, chunk: InferenceTaskChunk): boolean; + /** + * Subscribe directly to a task's chunk stream as a callback. Returns + * an unsubscribe function. Used by VirtualLlmService's legacy + * PendingTaskRef.onChunk bridge — the high-level `waitFor` API is + * better when you need an async iterator + done promise together. + */ + subscribeChunksUnsafe(taskId: string, cb: (chunk: InferenceTaskChunk) => void): () => void; /** Worker POSTed the final result. Persists the body + flips to `completed`. */ complete(taskId: string, responseBody: Record | null): Promise; /** Worker (or mcpd) marked the task as failed. 
*/ @@ -201,6 +208,13 @@ export class InferenceTaskService implements IInferenceTaskService { return this.events.emit(`chunk:${taskId}`, chunk); } + subscribeChunksUnsafe(taskId: string, cb: (chunk: InferenceTaskChunk) => void): () => void { + this.events.on(`chunk:${taskId}`, cb); + return (): void => { + this.events.off(`chunk:${taskId}`, cb); + }; + } + async complete(taskId: string, responseBody: Record | null): Promise { const updated = await this.repo.markCompleted(taskId, responseBody, this.clock()); if (updated !== null) this.events.emit(`terminal:${taskId}`); diff --git a/src/mcpd/src/services/virtual-llm.service.ts b/src/mcpd/src/services/virtual-llm.service.ts index e0befd6..c51ced3 100644 --- a/src/mcpd/src/services/virtual-llm.service.ts +++ b/src/mcpd/src/services/virtual-llm.service.ts @@ -29,6 +29,8 @@ import type { ILlmRepository } from '../repositories/llm.repository.js'; import type { OpenAiChatRequest } from './llm/types.js'; import { NotFoundError } from './mcp-server.service.js'; import type { AgentService } from './agent.service.js'; +import type { IInferenceTaskService, InferenceTaskChunk } from './inference-task.service.js'; +import { effectivePoolName } from './llm.service.js'; /** A virtual provider's announcement at registration time. */ export interface RegisterProviderInput { @@ -81,29 +83,53 @@ export type VirtualTaskFrame = | { kind: 'wake'; taskId: string; llmName: string }; /** - * Pending inference task. The route handler awaits `done`; the result POST - * resolves it via `completeTask()`. The error path rejects via `failTask()`. + * In-memory wake task. Wake is publisher-control work, not inference — + * we don't persist wake tasks to the DB because their lifetime is + * milliseconds and missing a wake on restart just means the next infer + * fires a fresh wake. Inference tasks live in the durable queue (v5); + * see InferenceTaskService. */ -interface PendingTask { +interface InMemoryWakeTask { taskId: string; sessionId: string; llmName: string; - streaming: boolean; - resolveNonStreaming: (body: unknown, status: number) => void; - rejectNonStreaming: (err: Error) => void; - /** For streaming tasks only; null on non-streaming. */ - pushChunk: ((chunk: { data: string; done?: boolean }) => void) | null; + resolve: (status: number) => void; + reject: (err: Error) => void; } const HEARTBEAT_TIMEOUT_MS = 90_000; const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000; // 4 h +/** + * v5: how long enqueueInferTask waits for the worker's terminal POST + * before bailing. 10 minutes is generous for thinking models that + * spend 30-90s warming up. The TASK itself stays queued past this — + * only the in-flight HTTP handler gives up; an async-API consumer can + * still poll the row and pick up the result later. + */ +const INFER_AWAIT_TIMEOUT_MS = 10 * 60 * 1000; + +export interface EnqueueInferOptions { + /** + * v5: if true, throw 503 immediately when no live SSE session exists + * for the row's session at enqueue time, instead of queueing the + * task and waiting for a worker to show up. Used by: + * - the direct `/api/v1/llms//infer` route (caller asked for + * THIS specific Llm; no pool fanout at this layer) + * - chat.service's pool failover loop (it iterates candidates and + * needs each candidate's failure to surface fast) + * The async `POST /api/v1/inference-tasks` API leaves this false so + * callers explicitly opting into the durable queue get the queue. + * Default: false (durable, queues + waits up to INFER_AWAIT_TIMEOUT_MS). 
+ */ + failFast?: boolean; +} export interface IVirtualLlmService { register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise; heartbeat(providerSessionId: string): Promise; bindSession(providerSessionId: string, handle: VirtualSessionHandle): void; unbindSession(providerSessionId: string): Promise; - enqueueInferTask(llmName: string, request: OpenAiChatRequest, streaming: boolean): Promise; + enqueueInferTask(llmName: string, request: OpenAiChatRequest, streaming: boolean, options?: EnqueueInferOptions): Promise; completeTask(taskId: string, result: { status: number; body: unknown }): boolean; pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean; failTask(taskId: string, error: Error): boolean; @@ -121,7 +147,12 @@ export interface PendingTaskRef { export class VirtualLlmService implements IVirtualLlmService { private readonly sessions = new Map(); - private readonly tasksById = new Map(); + /** + * Wake tasks live here, in-memory. The result POST resolves them by + * id. Inference tasks have moved to the durable queue (v5); see + * IInferenceTaskService. + */ + private readonly wakeTasks = new Map(); /** * Dedupe concurrent wake requests for the same Llm. The first request * starts the wake; later requests for the same name await the same @@ -138,6 +169,19 @@ export class VirtualLlmService implements IVirtualLlmService { * before deleting the Llm itself (Agent.llmId is Restrict). */ private readonly agents?: AgentService, + /** + * v5: durable inference task queue. Optional so older test wirings + * (and the non-virtual chat path) compile without it; when absent, + * enqueueInferTask falls back to a clear "task queue not wired" + * error rather than silently regressing to in-memory. + */ + private readonly tasks?: IInferenceTaskService, + /** + * v5: caller's user id, threaded into newly-created task rows for + * RBAC + observability. Optional — older wirings that don't have a + * request context attribute tasks to 'system'. + */ + private readonly resolveOwner: () => string = () => 'system', ) {} async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise { @@ -227,6 +271,12 @@ export class VirtualLlmService implements IVirtualLlmService { // Replace any prior handle for this session — keeps "last writer wins" // simple. The old SSE will have been closed by the publisher anyway. this.sessions.set(providerSessionId, handle); + // v5: drain queued inference tasks targeting any pool this session + // owns. Fire-and-forget: the bind() route handler completes the SSE + // handshake first; tasks land on the channel right after. + if (this.tasks !== undefined) { + void this.drainPendingForSession(providerSessionId, handle); + } } async unbindSession(providerSessionId: string): Promise { @@ -243,11 +293,60 @@ export class VirtualLlmService implements IVirtualLlmService { if (this.agents !== undefined) { await this.agents.markVirtualAgentsInactiveBySession(providerSessionId); } - // Reject any in-flight tasks for this session — the relay can't deliver - // a result POST anymore. - for (const t of this.tasksById.values()) { + // v5: revert claimed/running inference tasks back to pending so + // another worker on the same pool can pick them up. If no other + // worker is up, they stay queued for whenever one shows up. 
+ if (this.tasks !== undefined) { + await this.tasks.revertHeldBy(providerSessionId); + } + // Reject any in-flight wake tasks for this session — those don't + // benefit from re-queue since the publisher itself went away. + for (const t of this.wakeTasks.values()) { if (t.sessionId === providerSessionId) { - this.failTask(t.taskId, new Error('publisher disconnected')); + this.wakeTasks.delete(t.taskId); + t.reject(new Error('publisher disconnected')); + } + } + } + + /** + * v5: when a worker binds its SSE channel, find every pending + * InferenceTask targeting a pool key this session owns, claim each, + * and push the frame down the channel. If two workers in the same + * pool race here, the repo's CAS on tryClaim ensures each task is + * pushed to exactly one of them. + */ + private async drainPendingForSession( + providerSessionId: string, + handle: VirtualSessionHandle, + ): Promise { + if (this.tasks === undefined) return; + const owned = await this.repo.findBySessionId(providerSessionId); + if (owned.length === 0) return; + const poolKeys = Array.from(new Set(owned.map((row) => effectivePoolName(row)))); + const pending = await this.tasks.findPendingForPools(poolKeys); + for (const task of pending) { + // Cap drain per bind to avoid pushing a giant backlog into a + // brand-new SSE channel all at once. Hardcoded for now; a + // per-session capacity hint is a v6 concern. + if (!handle.alive) break; + const claimed = await this.tasks.tryClaim(task.id, providerSessionId); + if (claimed === null) continue; // raced; another worker got it + try { + handle.pushTask({ + kind: 'infer', + taskId: claimed.id, + llmName: claimed.llmName, + request: claimed.requestBody as unknown as OpenAiChatRequest, + streaming: claimed.streaming, + }); + } catch (err) { + // SSE write failed mid-drain — revert the claim so a + // healthier worker can pick it up. + await this.tasks.revertHeldBy(providerSessionId); + // eslint-disable-next-line no-console + console.warn(`drainPendingForSession: pushTask failed for ${claimed.id}: ${(err as Error).message}`); + break; } } } @@ -256,7 +355,11 @@ export class VirtualLlmService implements IVirtualLlmService { llmName: string, request: OpenAiChatRequest, streaming: boolean, + options: EnqueueInferOptions = {}, ): Promise { + if (this.tasks === undefined) { + throw new Error('InferenceTaskService not wired into VirtualLlmService'); + } const llm = await this.repo.findByName(llmName); if (llm === null) throw new NotFoundError(`Llm not found: ${llmName}`); if (llm.kind !== 'virtual' || llm.providerSessionId === null) { @@ -265,67 +368,133 @@ export class VirtualLlmService implements IVirtualLlmService { { statusCode: 500 }, ); } - if (llm.status === 'inactive') { - throw Object.assign( - new Error(`Virtual Llm '${llmName}' is inactive; publisher offline`), - { statusCode: 503 }, - ); - } - const handle = this.sessions.get(llm.providerSessionId); - if (handle === undefined || !handle.alive) { - throw Object.assign( - new Error(`Virtual Llm '${llmName}' has no live SSE session; publisher offline`), - { statusCode: 503 }, - ); + + // failFast callers (chat.service pool failover, direct infer route) + // get the v1-v4 semantic: row inactive OR no live session = 503, + // immediately. The chat dispatcher then iterates the next pool + // candidate. Without failFast, both cases queue durably and a + // future bindSession drains. 
+ if (options.failFast === true) { + if (llm.status === 'inactive') { + throw Object.assign( + new Error(`Virtual Llm '${llmName}' is inactive; publisher offline`), + { statusCode: 503 }, + ); + } + const handle = this.sessions.get(llm.providerSessionId); + if (handle === undefined || !handle.alive) { + throw Object.assign( + new Error(`Virtual Llm '${llmName}' has no live SSE session; publisher offline`), + { statusCode: 503 }, + ); + } } // ── Wake-on-demand (v2) ── // Status=hibernating means the publisher told us at register time - // (or via a later status update) that the backend is asleep. Fire a - // wake task and wait for the publisher to confirm readiness before - // dispatching the actual inference. Concurrent infers for the same - // Llm share a single wake promise. + // that the backend is asleep. Fire a wake task and wait for the + // publisher to confirm readiness before persisting the inference + // task. Concurrent infers for the same Llm share a single wake + // promise. We hold the wake INSIDE this method (not after enqueue) + // so we don't queue an inference task that we then can't dispatch. if (llm.status === 'hibernating') { + const handle = this.sessions.get(llm.providerSessionId); + if (handle === undefined || !handle.alive) { + throw Object.assign( + new Error(`Virtual Llm '${llmName}' has no live SSE session; cannot wake`), + { statusCode: 503 }, + ); + } await this.ensureAwake(llm.id, llm.name, llm.providerSessionId, handle); } - const taskId = randomUUID(); - const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>(); - - let resolveDone!: (v: { status: number; body: unknown }) => void; - let rejectDone!: (err: Error) => void; - const done = new Promise<{ status: number; body: unknown }>((resolve, reject) => { - resolveDone = resolve; - rejectDone = reject; + // ── v5: persist the task BEFORE attempting dispatch ── + // Even if no worker is up, the row stays pending and a future + // bindSession will drain it. Caller's HTTP timeout still bounds + // the wait, but the *task* survives. + const created = await this.tasks.enqueue({ + poolName: effectivePoolName(llm), + llmName, + model: llm.model, + tier: llm.tier, + requestBody: request as unknown as Record, + streaming, + ownerId: this.resolveOwner(), }); - const pending: PendingTask = { - taskId, - sessionId: llm.providerSessionId, - llmName, - streaming, - resolveNonStreaming: (body, status) => resolveDone({ status, body }), - rejectNonStreaming: rejectDone, - pushChunk: streaming - ? (chunk): void => { for (const cb of chunkSubscribers) cb(chunk); } - : null, + // Try to claim + dispatch immediately if a session is up. If not, + // the row stays pending for drainPendingForSession to pick up. + const handle = this.sessions.get(llm.providerSessionId); + if (handle !== undefined && handle.alive) { + const claimed = await this.tasks.tryClaim(created.id, llm.providerSessionId); + if (claimed !== null) { + handle.pushTask({ + kind: 'infer', + taskId: claimed.id, + llmName, + request, + streaming, + }); + } + // tryClaim can return null only if another concurrent enqueue + // (or the GC sweep) already moved the row off pending — leave + // it; drainPendingForSession will reconcile. + } + + // Wrap the task service's waitFor into the legacy PendingTaskRef + // shape so existing callers (chat.service, llm-infer route) + // don't need to change. The chunk callback bridges + // InferenceTaskService.events into the on-the-fly subscriber set. 
+ const tasks = this.tasks; + const taskId = created.id; + const subscribers = new Set<(chunk: InferenceTaskChunk) => void>(); + // Single bridge listener on the service's emitter; fans out to all + // subscribers locally so we don't pile up listeners on a hot key. + let bridgeUnsub: (() => void) | null = null; + const ensureBridge = (): void => { + if (bridgeUnsub !== null) return; + const onChunk = (chunk: InferenceTaskChunk): void => { + for (const cb of subscribers) cb(chunk); + }; + // Subscribe via the public pushChunk path: the service emits on + // its own EventEmitter; we attach a listener here. Use the + // service's events through a side channel — exposed below as + // subscribeChunks for clarity. + bridgeUnsub = tasks.subscribeChunksUnsafe(taskId, onChunk); }; - this.tasksById.set(taskId, pending); - handle.pushTask({ - kind: 'infer', - taskId, - llmName, - request, - streaming, - }); + const done = (async (): Promise<{ status: number; body: unknown }> => { + // waitFor's `done` rejects on cancel/error/timeout. For non- + // streaming tasks the responseBody IS the body; for streaming + // the body is null and chunks have already been piped through. + const waiter = tasks.waitFor(taskId, INFER_AWAIT_TIMEOUT_MS); + // If streaming, we must drain the chunks generator concurrently + // so chunks aren't dropped just because no one's subscribed yet. + // The real consumer subscribes via onChunk(); their cb fires + // synchronously inside the bridge. + if (streaming) ensureBridge(); + const finalRow = await waiter.done; + // Status code: legacy callers expect 200 on success; the worker + // sends its own status via the result POST and we forward it. + // Today, success POSTs come with status=200 and the row's + // responseBody is { status, body }. v5 stores the *body* directly + // on the row; we synthesize a 200 here to match the legacy shape. + return { status: 200, body: finalRow.responseBody }; + })(); return { taskId, done, onChunk(cb): () => void { - chunkSubscribers.add(cb); - return () => chunkSubscribers.delete(cb); + subscribers.add(cb); + ensureBridge(); + return (): void => { + subscribers.delete(cb); + if (subscribers.size === 0 && bridgeUnsub !== null) { + bridgeUnsub(); + bridgeUnsub = null; + } + }; }, }; } @@ -370,22 +539,17 @@ export class VirtualLlmService implements IVirtualLlmService { rejectDone = reject; }); - const pending: PendingTask = { + const wake: InMemoryWakeTask = { taskId, sessionId, llmName, - streaming: false, - // Wake tasks return { ok: true } on success or never resolve at - // all if the publisher dies; the rejectNonStreaming path covers - // the disconnect-mid-wake case via unbindSession. - resolveNonStreaming: (_body, status) => { + resolve: (status) => { if (status >= 200 && status < 300) resolveDone(); else rejectDone(new Error(`wake task returned status ${String(status)}`)); }, - rejectNonStreaming: rejectDone, - pushChunk: null, + reject: rejectDone, }; - this.tasksById.set(taskId, pending); + this.wakeTasks.set(taskId, wake); handle.pushTask({ kind: 'wake', taskId, llmName }); @@ -402,32 +566,55 @@ export class VirtualLlmService implements IVirtualLlmService { } completeTask(taskId: string, result: { status: number; body: unknown }): boolean { - const t = this.tasksById.get(taskId); - if (t === undefined) return false; - this.tasksById.delete(taskId); - t.resolveNonStreaming(result.body, result.status); - return true; + // Wake tasks: in-memory map. Resolve and bail. 
+ const wake = this.wakeTasks.get(taskId); + if (wake !== undefined) { + this.wakeTasks.delete(taskId); + wake.resolve(result.status); + return true; + } + // Inference tasks: durable queue. Persist body + flip terminal + + // emit wakeup. Fire-and-forget the DB write; the result POST + // returns 200 either way (the caller's HTTP handler is what cares + // about the eventual emit, not the worker). + if (this.tasks !== undefined) { + void this.tasks.complete(taskId, result.body as Record | null); + return true; + } + return false; } pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean { - const t = this.tasksById.get(taskId); - if (t === undefined || t.pushChunk === null) return false; - t.pushChunk(chunk); + // Wake tasks never receive chunks — they're non-streaming control + // messages. Don't even check the wake map; if the id isn't an + // inference task, just drop the chunk. + if (this.tasks === undefined) return false; + // First chunk for a claimed task → flip claimed → running. Idempotent + // and fire-and-forget; if the row is already running, the CAS in the + // repo just no-ops. + void this.tasks.markRunning(taskId); + this.tasks.pushChunk(taskId, chunk); if (chunk.done === true) { - // For streaming tasks, also resolve the `done` promise so the route - // handler can clean up. - t.resolveNonStreaming(null, 200); - this.tasksById.delete(taskId); + // Streaming completion: persist with null body + flip terminal so + // the waiter unblocks. The actual content was already streamed + // through the chunks channel; nothing to store. + void this.tasks.complete(taskId, null); } return true; } failTask(taskId: string, error: Error): boolean { - const t = this.tasksById.get(taskId); - if (t === undefined) return false; - this.tasksById.delete(taskId); - t.rejectNonStreaming(error); - return true; + const wake = this.wakeTasks.get(taskId); + if (wake !== undefined) { + this.wakeTasks.delete(taskId); + wake.reject(error); + return true; + } + if (this.tasks !== undefined) { + void this.tasks.fail(taskId, error.message); + return true; + } + return false; } async gcSweep(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> { diff --git a/src/mcpd/tests/chat-service-virtual-llm.test.ts b/src/mcpd/tests/chat-service-virtual-llm.test.ts index a284928..5d86b3f 100644 --- a/src/mcpd/tests/chat-service-virtual-llm.test.ts +++ b/src/mcpd/tests/chat-service-virtual-llm.test.ts @@ -193,6 +193,10 @@ describe('ChatService — kind=virtual branch (v3 Stage 1)', () => { 'vllm-local', expect.objectContaining({ messages: expect.any(Array) }), false, + // v5: chat.service passes failFast:true so its pool failover loop + // surfaces transport errors quickly instead of waiting on the + // durable queue's 10-min timeout. + { failFast: true }, ); }); @@ -224,6 +228,7 @@ describe('ChatService — kind=virtual branch (v3 Stage 1)', () => { 'vllm-local', expect.objectContaining({ messages: expect.any(Array), stream: true }), true, + { failFast: true }, ); }); diff --git a/src/mcpd/tests/llm-infer-route.test.ts b/src/mcpd/tests/llm-infer-route.test.ts index 33279e3..e3bd110 100644 --- a/src/mcpd/tests/llm-infer-route.test.ts +++ b/src/mcpd/tests/llm-infer-route.test.ts @@ -244,6 +244,9 @@ describe('POST /api/v1/llms/:name/infer', () => { 'claude', expect.objectContaining({ messages: expect.any(Array) }), false, + // v5: direct infer route passes failFast:true so a downed publisher + // returns 503 immediately instead of queueing the task durably. 
+        { failFast: true },
+      );
     });
 
diff --git a/src/mcpd/tests/virtual-llm-service.test.ts b/src/mcpd/tests/virtual-llm-service.test.ts
index 1944d3e..99b7c71 100644
--- a/src/mcpd/tests/virtual-llm-service.test.ts
+++ b/src/mcpd/tests/virtual-llm-service.test.ts
@@ -1,7 +1,10 @@
 import { describe, it, expect, vi } from 'vitest';
 import { VirtualLlmService, type VirtualSessionHandle } from '../src/services/virtual-llm.service.js';
+import { InferenceTaskService } from '../src/services/inference-task.service.js';
+import type { IInferenceTaskService } from '../src/services/inference-task.service.js';
+import type { IInferenceTaskRepository } from '../src/repositories/inference-task.repository.js';
 import type { ILlmRepository } from '../src/repositories/llm.repository.js';
-import type { Llm } from '@prisma/client';
+import type { Llm, InferenceTask, InferenceTaskStatus } from '@prisma/client';
 
 function makeLlm(overrides: Partial<Llm> = {}): Llm {
   return {
@@ -15,6 +18,7 @@
     apiKeySecretId: null,
     apiKeySecretKey: null,
     extraConfig: {} as Llm['extraConfig'],
+    poolName: null,
     kind: 'virtual',
     providerSessionId: 's-1',
     lastHeartbeatAt: new Date(),
@@ -27,6 +31,105 @@
   };
 }
 
+/**
+ * In-memory stand-in for the durable queue: the real `InferenceTaskService`
+ * on top of a Map-backed repo mock, so its signaling — events for
+ * terminal/chunk — stays intact and VirtualLlmService's enqueue/result
+ * flows behave the same way they do in production. Nothing here talks
+ * to Postgres.
+ */
+function mockTaskService(): IInferenceTaskService {
+  // Build a minimal repo for the real InferenceTaskService — that way we
+  // exercise the actual event-emitter wakeup logic, just without a DB.
+  const rows = new Map<string, InferenceTask>();
+  let n = 0;
+  const repo: IInferenceTaskRepository = {
+    create: vi.fn(async (data) => {
+      n += 1;
+      const row: InferenceTask = {
+        id: `task-${String(n)}`,
+        status: 'pending',
+        poolName: data.poolName,
+        llmName: data.llmName,
+        model: data.model,
+        tier: data.tier ?? null,
+        claimedBy: null,
+        requestBody: data.requestBody as InferenceTask['requestBody'],
+        responseBody: null,
+        errorMessage: null,
+        streaming: data.streaming,
+        createdAt: new Date(),
+        claimedAt: null,
+        streamStartedAt: null,
+        completedAt: null,
+        ownerId: data.ownerId,
+        agentId: data.agentId ?? null,
+      };
+      rows.set(row.id, row);
+      return row;
+    }),
+    findById: vi.fn(async (id) => rows.get(id) ??
null), + findPendingForPools: vi.fn(async (poolNames: string[]) => + [...rows.values()].filter((r) => r.status === 'pending' && poolNames.includes(r.poolName)), + ), + findHeldBy: vi.fn(async (claimedBy: string) => + [...rows.values()].filter((r) => + r.claimedBy === claimedBy + && (r.status === 'claimed' || r.status === 'running')), + ), + list: vi.fn(async () => [...rows.values()]), + tryClaim: vi.fn(async (id, claimedBy, claimedAt) => { + const r = rows.get(id); + if (r === undefined || r.status !== 'pending') return null; + const next = { ...r, status: 'claimed' as InferenceTaskStatus, claimedBy, claimedAt }; + rows.set(id, next); + return next; + }), + markRunning: vi.fn(async (id, at) => { + const r = rows.get(id); + if (r === undefined || (r.status !== 'claimed' && r.status !== 'running')) return null; + const next = { ...r, status: 'running' as InferenceTaskStatus, streamStartedAt: at }; + rows.set(id, next); + return next; + }), + markCompleted: vi.fn(async (id, body, at) => { + const r = rows.get(id); + if (r === undefined || r.status === 'completed' || r.status === 'error' || r.status === 'cancelled') return null; + const next = { ...r, status: 'completed' as InferenceTaskStatus, responseBody: (body ?? null) as InferenceTask['responseBody'], completedAt: at }; + rows.set(id, next); + return next; + }), + markError: vi.fn(async (id, errorMessage, at) => { + const r = rows.get(id); + if (r === undefined || r.status === 'completed' || r.status === 'error' || r.status === 'cancelled') return null; + const next = { ...r, status: 'error' as InferenceTaskStatus, errorMessage, completedAt: at }; + rows.set(id, next); + return next; + }), + markCancelled: vi.fn(async (id, at) => { + const r = rows.get(id); + if (r === undefined || r.status === 'completed' || r.status === 'error' || r.status === 'cancelled') return null; + const next = { ...r, status: 'cancelled' as InferenceTaskStatus, completedAt: at }; + rows.set(id, next); + return next; + }), + revertToPending: vi.fn(async (id) => { + const r = rows.get(id); + if (r === undefined || (r.status !== 'claimed' && r.status !== 'running')) return null; + const next = { ...r, status: 'pending' as InferenceTaskStatus, claimedBy: null, claimedAt: null, streamStartedAt: null }; + rows.set(id, next); + return next; + }), + findStalePending: vi.fn(async () => []), + findExpiredTerminal: vi.fn(async () => []), + deleteMany: vi.fn(async (ids) => { + let c = 0; + for (const id of ids) if (rows.delete(id)) c += 1; + return c; + }), + }; + return new InferenceTaskService(repo); +} + function mockRepo(initial: Llm[] = []): ILlmRepository { const rows = new Map(initial.map((l) => [l.id, l])); let counter = rows.size; @@ -38,6 +141,14 @@ function mockRepo(initial: Llm[] = []): ILlmRepository { return null; }), findByTier: vi.fn(async () => []), + findByPoolName: vi.fn(async (poolName: string) => { + const out: Llm[] = []; + for (const l of rows.values()) { + if (l.poolName === poolName) out.push(l); + else if (l.poolName === null && l.name === poolName) out.push(l); + } + return out; + }), findBySessionId: vi.fn(async (sid: string) => [...rows.values()].filter((l) => l.providerSessionId === sid)), findStaleVirtuals: vi.fn(async (cutoff: Date) => @@ -105,7 +216,7 @@ function fakeSession(): VirtualSessionHandle & { tasks: Array; alive: b describe('VirtualLlmService', () => { it('register inserts new virtual rows with active status + sessionId', async () => { const repo = mockRepo(); - const svc = new VirtualLlmService(repo); + const svc = new 
VirtualLlmService(repo, undefined, mockTaskService()); const { providerSessionId, llms } = await svc.register({ providerSessionId: null, providers: [ @@ -122,7 +233,7 @@ describe('VirtualLlmService', () => { it('register reuses the same row on sticky reconnect (same name + sessionId)', async () => { const repo = mockRepo(); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); const first = await svc.register({ providerSessionId: 'fixed-id', providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], @@ -140,7 +251,7 @@ describe('VirtualLlmService', () => { it('register refuses to overwrite a public LLM with the same name', async () => { const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); await expect(svc.register({ providerSessionId: 'sess-x', providers: [{ name: 'qwen3-thinking', type: 'openai', model: 'm' }], @@ -149,7 +260,7 @@ describe('VirtualLlmService', () => { it('register refuses if another active session owns the name', async () => { const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'other', status: 'active' })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); await expect(svc.register({ providerSessionId: 'mine', providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], @@ -161,7 +272,7 @@ describe('VirtualLlmService', () => { name: 'vllm-local', providerSessionId: 'old-session', status: 'inactive', inactiveSince: new Date(), })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); const { llms } = await svc.register({ providerSessionId: 'new-session', providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], @@ -177,7 +288,7 @@ describe('VirtualLlmService', () => { name: 'vllm-local', providerSessionId: 'sess', status: 'inactive', lastHeartbeatAt: past, inactiveSince: past, })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); await svc.heartbeat('sess'); const row = await repo.findByName('vllm-local'); expect(row?.status).toBe('active'); @@ -191,7 +302,7 @@ describe('VirtualLlmService', () => { makeLlm({ name: 'b', providerSessionId: 'sess' }), makeLlm({ name: 'c', providerSessionId: 'other' }), ]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); svc.bindSession('sess', fakeSession()); await svc.unbindSession('sess'); expect((await repo.findByName('a'))?.status).toBe('inactive'); @@ -201,7 +312,7 @@ describe('VirtualLlmService', () => { it('enqueueInferTask pushes a task frame to the SSE session', async () => { const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); const session = fakeSession(); svc.bindSession('sess', session); @@ -218,26 +329,41 @@ describe('VirtualLlmService', () => { expect(t.streaming).toBe(false); }); - it('enqueueInferTask rejects when the publisher is offline (no session bound)', async () => { + it('enqueueInferTask queues the task when no session is bound (durable, drains on bind)', async () => { + // v5 semantic change: with a durable queue underneath, "no worker + // up" no 
longer rejects — the row stays pending and a future + // bindSession drains it. Caller's HTTP handler awaits on ref.done + // and bounds itself with INFER_AWAIT_TIMEOUT_MS; from the service's + // POV the enqueue itself succeeds. const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); - const svc = new VirtualLlmService(repo); - await expect( - svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false), - ).rejects.toThrow(/no live SSE session|publisher offline/); + const tasks = mockTaskService(); + const svc = new VirtualLlmService(repo, undefined, tasks); + const ref = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false); + expect(ref.taskId).toMatch(/^task-/); + // The row exists and is still pending — no claim happened. + const row = await tasks.findById(ref.taskId); + expect(row?.status).toBe('pending'); + expect(row?.claimedBy).toBeNull(); }); - it('enqueueInferTask rejects when the row is inactive', async () => { + it('enqueueInferTask still queues against an inactive row (pool may have a sibling worker)', async () => { + // v5 semantic change: status=inactive on a specific Llm doesn't + // mean the pool is dead — another mcplocal publishing the same + // poolName might be active. The dispatcher's bindSession drain + // matches by poolName, so even a "dead" pin queues correctly. const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]); - const svc = new VirtualLlmService(repo); - svc.bindSession('sess', fakeSession()); - await expect( - svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false), - ).rejects.toThrow(/inactive|publisher offline/); + const tasks = mockTaskService(); + const svc = new VirtualLlmService(repo, undefined, tasks); + const ref = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false); + const row = await tasks.findById(ref.taskId); + expect(row?.status).toBe('pending'); + // No frame pushed because no session is bound. 
+ expect(row?.claimedBy).toBeNull(); }); it('enqueueInferTask rejects when the LLM is public (not virtual)', async () => { const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); await expect( svc.enqueueInferTask('qwen3-thinking', { model: 'm', messages: [] }, false), ).rejects.toThrow(/not a virtual provider/); @@ -245,7 +371,7 @@ describe('VirtualLlmService', () => { it('completeTask resolves the pending non-streaming promise', async () => { const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); svc.bindSession('sess', fakeSession()); const ref = await svc.enqueueInferTask( 'vllm-local', @@ -258,7 +384,7 @@ describe('VirtualLlmService', () => { it('streaming: pushTaskChunk fans chunks to subscribers; done resolves the ref', async () => { const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); svc.bindSession('sess', fakeSession()); const ref = await svc.enqueueInferTask( 'vllm-local', @@ -278,7 +404,7 @@ describe('VirtualLlmService', () => { it('failTask rejects the pending promise with a clear error', async () => { const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); svc.bindSession('sess', fakeSession()); const ref = await svc.enqueueInferTask( 'vllm-local', @@ -289,17 +415,34 @@ describe('VirtualLlmService', () => { await expect(ref.done).rejects.toThrow(/upstream blew up/); }); - it('unbindSession rejects in-flight tasks for that session', async () => { + it('unbindSession reverts claimed inference tasks to pending (durable re-queue, not reject)', async () => { + // v5 semantic change: a worker disconnecting mid-task no longer + // *rejects* the task. The row goes back to pending so another + // worker on the same pool can pick it up. The original caller's + // ref.done keeps waiting up to its 10-min INFER_AWAIT_TIMEOUT_MS; + // the same caller is what gets the result whichever worker + // ultimately delivers it. const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); - const svc = new VirtualLlmService(repo); + const tasks = mockTaskService(); + const svc = new VirtualLlmService(repo, undefined, tasks); svc.bindSession('sess', fakeSession()); const ref = await svc.enqueueInferTask( 'vllm-local', { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, false, ); + // After enqueue with a session up, the task is claimed. + let row = await tasks.findById(ref.taskId); + expect(row?.status).toBe('claimed'); + expect(row?.claimedBy).toBe('sess'); + await svc.unbindSession('sess'); - await expect(ref.done).rejects.toThrow(/publisher disconnected/); + + // After disconnect, claimed/running rows revert to pending — ready + // for the next worker to drain. 
+ row = await tasks.findById(ref.taskId); + expect(row?.status).toBe('pending'); + expect(row?.claimedBy).toBeNull(); }); it('gcSweep flips heartbeat-stale active virtuals to inactive', async () => { @@ -309,7 +452,7 @@ describe('VirtualLlmService', () => { makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }), makeLlm({ name: 'fresh', providerSessionId: 'b', status: 'active', lastHeartbeatAt: recent }), ]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); const result = await svc.gcSweep(); expect(result.markedInactive).toBe(1); expect((await repo.findByName('stale'))?.status).toBe('inactive'); @@ -324,7 +467,7 @@ describe('VirtualLlmService', () => { makeLlm({ name: 'recent', providerSessionId: 'b', status: 'inactive', inactiveSince: fresh }), makeLlm({ name: 'public-survivor', providerSessionId: null, kind: 'public' }), ]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); const result = await svc.gcSweep(); expect(result.deleted).toBe(1); expect(await repo.findByName('old')).toBeNull(); @@ -336,7 +479,7 @@ describe('VirtualLlmService', () => { it('hibernating: dispatches a wake task first and waits for it to complete before infer', async () => { const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); const session = fakeSession(); svc.bindSession('sess', session); @@ -370,7 +513,7 @@ describe('VirtualLlmService', () => { it('hibernating: concurrent infer requests share a single wake task', async () => { const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); const session = fakeSession(); svc.bindSession('sess', session); @@ -398,7 +541,7 @@ describe('VirtualLlmService', () => { it('hibernating: rejects when the wake task fails', async () => { const repo = mockRepo([makeLlm({ name: 'broken', providerSessionId: 'sess', status: 'hibernating' })]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); svc.bindSession('sess', fakeSession()); const inferPromise = svc.enqueueInferTask( @@ -408,12 +551,12 @@ describe('VirtualLlmService', () => { ); await new Promise((r) => setTimeout(r, 0)); - // Get the wake task id from the in-flight tasks map (its only entry). - // We test the failure path via failTask which is part of the public - // surface used by the result-POST route handler. + // v5: wake tasks live in `wakeTasks` (in-memory). Inference tasks + // moved to the DB-backed queue but wake is publisher-control work + // that doesn't need durability — we kept the in-memory map for it. 
const taskIds: string[] = []; // eslint-disable-next-line @typescript-eslint/no-explicit-any - for (const id of (svc as any).tasksById.keys()) taskIds.push(id); + for (const id of (svc as any).wakeTasks.keys()) taskIds.push(id); expect(taskIds).toHaveLength(1); expect(svc.failTask(taskIds[0]!, new Error('wake recipe failed'))).toBe(true); @@ -424,14 +567,28 @@ describe('VirtualLlmService', () => { expect(row?.status).toBe('hibernating'); }); - it('inactive: still rejects with 503 (publisher offline) — wake path only fires for hibernating', async () => { + it('inactive: queues without firing the wake path — wake only triggers on status=hibernating', async () => { + // Coverage for the v5 inactive-vs-hibernating distinction. + // hibernating = "publisher told us the backend is asleep, ask + // them to wake it"; inactive = "publisher itself is offline". + // For inactive rows, queueing is the right behavior (wait for a + // worker on the pool to come online and drain). The wake path + // must NOT fire — wake is opt-in via the publisher's register + // payload, not a generic "row is down" recovery. + // + // No session bind here: an "inactive" row in production means + // unbindSession already flipped it after SSE close. Binding a + // session for the same providerSessionId would be a contradictory + // setup that the test wouldn't model anything real about. const repo = mockRepo([makeLlm({ name: 'gone', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]); - const svc = new VirtualLlmService(repo); - svc.bindSession('sess', fakeSession()); + const tasks = mockTaskService(); + const svc = new VirtualLlmService(repo, undefined, tasks); - await expect( - svc.enqueueInferTask('gone', { model: 'm', messages: [] }, false), - ).rejects.toThrow(/inactive|publisher offline/); + const ref = await svc.enqueueInferTask('gone', { model: 'm', messages: [] }, false); + // Task queued in pending; no claim, no frame. + const row = await tasks.findById(ref.taskId); + expect(row?.status).toBe('pending'); + expect(row?.claimedBy).toBeNull(); }); it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => { @@ -439,11 +596,75 @@ describe('VirtualLlmService', () => { const repo = mockRepo([ makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }), ]); - const svc = new VirtualLlmService(repo); + const svc = new VirtualLlmService(repo, undefined, mockTaskService()); const first = await svc.gcSweep(); const second = await svc.gcSweep(); expect(first.markedInactive).toBe(1); expect(second.markedInactive).toBe(0); expect(second.deleted).toBe(0); }); + + // ── v5: durable queue + drain-on-bind ── + + it('bindSession drains pending inference tasks owned by the session\'s pool keys', async () => { + // Two enqueues land while no session is bound. Each row is created + // with status=pending; no SSE frame goes anywhere. When the worker + // finally binds, the drain loop claims both and pushes the frames. + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', poolName: 'qwen-pool' })]); + const tasks = mockTaskService(); + const svc = new VirtualLlmService(repo, undefined, tasks); + const ref1 = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [{ role: 'user', content: 'one' }] }, false); + const ref2 = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [{ role: 'user', content: 'two' }] }, false); + // Both rows are still pending — no worker bound yet. 
+ expect((await tasks.findById(ref1.taskId))?.status).toBe('pending'); + expect((await tasks.findById(ref2.taskId))?.status).toBe('pending'); + + // Worker shows up. Drain runs synchronously enough that we just + // need to flush the microtask queue before checking SSE frames. + const session = fakeSession(); + svc.bindSession('sess', session); + // drainPendingForSession is fired with `void` so let microtasks + // settle before asserting. + await new Promise((r) => setTimeout(r, 0)); + + expect((session.tasks as Array<{ kind: string; taskId: string }>).map((t) => t.taskId).sort()) + .toEqual([ref1.taskId, ref2.taskId].sort()); + // Rows are now claimed by this session. + expect((await tasks.findById(ref1.taskId))?.status).toBe('claimed'); + expect((await tasks.findById(ref1.taskId))?.claimedBy).toBe('sess'); + }); + + it('drain-on-bind matches the effective pool key, not just llm.name', async () => { + // The pinned Llm has name=vllm-alice but poolName=qwen-pool. + // Enqueue against vllm-alice → row.poolName=qwen-pool. + // Worker binds with a session that owns vllm-alice (same pool key). + // Drain must surface this row even though poolName != name. + const repo = mockRepo([makeLlm({ name: 'vllm-alice', providerSessionId: 'sess', poolName: 'qwen-pool' })]); + const tasks = mockTaskService(); + const svc = new VirtualLlmService(repo, undefined, tasks); + const ref = await svc.enqueueInferTask('vllm-alice', { model: 'm', messages: [] }, false); + expect((await tasks.findById(ref.taskId))?.poolName).toBe('qwen-pool'); + + const session = fakeSession(); + svc.bindSession('sess', session); + await new Promise((r) => setTimeout(r, 0)); + expect((session.tasks as Array<{ taskId: string }>).map((t) => t.taskId)).toEqual([ref.taskId]); + }); + + it('completeTask via the result-route updates the DB row + emits the wakeup', async () => { + // End-to-end through the public surface: enqueue → claim happens + // because session is bound → worker POSTs result → completeTask + // routes to InferenceTaskService.complete → ref.done resolves. + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const tasks = mockTaskService(); + const svc = new VirtualLlmService(repo, undefined, tasks); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false); + + expect(svc.completeTask(ref.taskId, { status: 200, body: { ok: true } })).toBe(true); + await expect(ref.done).resolves.toEqual({ status: 200, body: { ok: true } }); + const row = await tasks.findById(ref.taskId); + expect(row?.status).toBe('completed'); + expect(row?.responseBody).toEqual({ ok: true }); + }); });
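The caller-facing contract after this patch, condensed into a sketch. This is illustrative only, not part of the diff: the helper names (inferPinned, inferDurable, inferStreaming), the write callback, and the import paths are made up for the example; the enqueueInferTask signature, the EnqueueInferOptions failFast flag, and the PendingTaskRef shape (done, onChunk) are the ones the patch defines.

import type { IVirtualLlmService } from './services/virtual-llm.service.js';
import type { OpenAiChatRequest } from './services/llm/types.js';

// Pinned, fail-fast dispatch (the llm-infer route's mode): a downed
// publisher surfaces as a thrown 503 before anything is dispatched.
async function inferPinned(
  virtualLlms: IVirtualLlmService,
  llmName: string,
  body: OpenAiChatRequest,
): Promise<{ status: number; body: unknown }> {
  const ref = await virtualLlms.enqueueInferTask(llmName, body, false, { failFast: true });
  return ref.done; // resolves when the worker POSTs its terminal result
}

// Durable dispatch (the default): the enqueue succeeds even with no
// worker bound; the row stays pending and drainPendingForSession picks
// it up on the next bindSession. The await below is bounded by
// INFER_AWAIT_TIMEOUT_MS inside the service, but the task row outlives it.
async function inferDurable(
  virtualLlms: IVirtualLlmService,
  llmName: string,
  body: OpenAiChatRequest,
): Promise<{ status: number; body: unknown }> {
  const ref = await virtualLlms.enqueueInferTask(llmName, body, false);
  return ref.done;
}

// Streaming: subscribe before awaiting `done` — chunks are fanned out
// through the in-process bridge and are not replayed for late subscribers.
async function inferStreaming(
  virtualLlms: IVirtualLlmService,
  llmName: string,
  body: OpenAiChatRequest,
  write: (data: string) => void,
): Promise<void> {
  const ref = await virtualLlms.enqueueInferTask(llmName, { ...body, stream: true }, true, { failFast: true });
  const unsubscribe = ref.onChunk((chunk) => write(chunk.data));
  try {
    await ref.done;
  } finally {
    unsubscribe();
  }
}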