feat(mcpd): wake-before-infer for hibernating virtual LLMs (v2 Stage 2)
Second half of v2. mcpd now dispatches a \`wake\` task on the SSE
control channel when an inference request hits a row whose
status=hibernating, waits for the publisher to confirm readiness,
then proceeds with the infer task. Concurrent infers for the same
hibernating Llm share a single wake task — \`wakeInFlight\` map
dedupes by Llm name.
State machine in enqueueInferTask:
active → push infer task immediately (existing path).
inactive → 503, publisher offline (existing path).
hibernating → ensureAwake() → push infer task (new in v2).
ensureAwake/runWake (private):
- Allocates a fresh taskId on the existing PendingTask plumbing.
- Pushes \`{ kind: "wake", taskId, llmName }\` on the SSE handle.
- Awaits the publisher's result POST. On 2xx, flips the row to
active + bumps lastHeartbeatAt, so all queued + future infers
hit the active path. On non-2xx or service.failTask, the row
stays hibernating (next request retries).
Tests: 4 new in virtual-llm-service.test.ts cover happy path
(wake → infer in order), concurrent dedup (3 parallel infers, 1
wake task), wake failure surfaces to all queued infers and leaves
the row hibernating, inactive ≠ hibernating (still rejects with 503,
no wake attempt). 22/22 service tests, 2050/2050 workspace.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -112,6 +112,12 @@ export interface PendingTaskRef {
|
||||
export class VirtualLlmService implements IVirtualLlmService {
|
||||
private readonly sessions = new Map<string, VirtualSessionHandle>();
|
||||
private readonly tasksById = new Map<string, PendingTask>();
|
||||
/**
|
||||
* Dedupe concurrent wake requests for the same Llm. The first request
|
||||
* starts the wake; later requests for the same name await the same
|
||||
* promise. Cleared as soon as the wake settles (success or failure).
|
||||
*/
|
||||
private readonly wakeInFlight = new Map<string, Promise<void>>();
|
||||
|
||||
constructor(private readonly repo: ILlmRepository) {}
|
||||
|
||||
@@ -230,9 +236,9 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
{ statusCode: 500 },
|
||||
);
|
||||
}
|
||||
if (llm.status !== 'active') {
|
||||
if (llm.status === 'inactive') {
|
||||
throw Object.assign(
|
||||
new Error(`Virtual Llm '${llmName}' is ${llm.status}; publisher offline`),
|
||||
new Error(`Virtual Llm '${llmName}' is inactive; publisher offline`),
|
||||
{ statusCode: 503 },
|
||||
);
|
||||
}
|
||||
@@ -244,6 +250,16 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
);
|
||||
}
|
||||
|
||||
// ── Wake-on-demand (v2) ──
|
||||
// Status=hibernating means the publisher told us at register time
|
||||
// (or via a later status update) that the backend is asleep. Fire a
|
||||
// wake task and wait for the publisher to confirm readiness before
|
||||
// dispatching the actual inference. Concurrent infers for the same
|
||||
// Llm share a single wake promise.
|
||||
if (llm.status === 'hibernating') {
|
||||
await this.ensureAwake(llm.id, llm.name, llm.providerSessionId, handle);
|
||||
}
|
||||
|
||||
const taskId = randomUUID();
|
||||
const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>();
|
||||
|
||||
@@ -285,6 +301,77 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Drive the publisher to wake the backend. Concurrent callers for the
|
||||
* same Llm name share the in-flight promise — we only ever ask the
|
||||
* publisher once. Throws on timeout or recipe failure; on success the
|
||||
* row is flipped to active and subsequent infer calls proceed.
|
||||
*/
|
||||
private async ensureAwake(
|
||||
llmId: string,
|
||||
llmName: string,
|
||||
sessionId: string,
|
||||
handle: VirtualSessionHandle,
|
||||
): Promise<void> {
|
||||
const existing = this.wakeInFlight.get(llmName);
|
||||
if (existing !== undefined) {
|
||||
await existing;
|
||||
return;
|
||||
}
|
||||
const promise = this.runWake(llmId, llmName, sessionId, handle);
|
||||
this.wakeInFlight.set(llmName, promise);
|
||||
try {
|
||||
await promise;
|
||||
} finally {
|
||||
this.wakeInFlight.delete(llmName);
|
||||
}
|
||||
}
|
||||
|
||||
private async runWake(
|
||||
llmId: string,
|
||||
llmName: string,
|
||||
sessionId: string,
|
||||
handle: VirtualSessionHandle,
|
||||
): Promise<void> {
|
||||
const taskId = randomUUID();
|
||||
let resolveDone!: () => void;
|
||||
let rejectDone!: (err: Error) => void;
|
||||
const done = new Promise<void>((resolve, reject) => {
|
||||
resolveDone = resolve;
|
||||
rejectDone = reject;
|
||||
});
|
||||
|
||||
const pending: PendingTask = {
|
||||
taskId,
|
||||
sessionId,
|
||||
llmName,
|
||||
streaming: false,
|
||||
// Wake tasks return { ok: true } on success or never resolve at
|
||||
// all if the publisher dies; the rejectNonStreaming path covers
|
||||
// the disconnect-mid-wake case via unbindSession.
|
||||
resolveNonStreaming: (_body, status) => {
|
||||
if (status >= 200 && status < 300) resolveDone();
|
||||
else rejectDone(new Error(`wake task returned status ${String(status)}`));
|
||||
},
|
||||
rejectNonStreaming: rejectDone,
|
||||
pushChunk: null,
|
||||
};
|
||||
this.tasksById.set(taskId, pending);
|
||||
|
||||
handle.pushTask({ kind: 'wake', taskId, llmName });
|
||||
|
||||
await done;
|
||||
|
||||
// Flip the row to active so subsequent infer calls go through the
|
||||
// normal active path. The publisher's own heartbeat will keep the
|
||||
// row alive from this point.
|
||||
await this.repo.update(llmId, {
|
||||
status: 'active',
|
||||
lastHeartbeatAt: new Date(),
|
||||
inactiveSince: null,
|
||||
});
|
||||
}
|
||||
|
||||
completeTask(taskId: string, result: { status: number; body: unknown }): boolean {
|
||||
const t = this.tasksById.get(taskId);
|
||||
if (t === undefined) return false;
|
||||
|
||||
@@ -332,6 +332,108 @@ describe('VirtualLlmService', () => {
|
||||
expect(await repo.findByName('public-survivor')).not.toBeNull();
|
||||
});
|
||||
|
||||
// ── v2: wake-before-infer ──
|
||||
|
||||
it('hibernating: dispatches a wake task first and waits for it to complete before infer', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const session = fakeSession();
|
||||
svc.bindSession('sess', session);
|
||||
|
||||
// Kick off enqueueInferTask. It blocks on the wake task.
|
||||
const inferPromise = svc.enqueueInferTask(
|
||||
'sleeping',
|
||||
{ model: 'm', messages: [{ role: 'user', content: 'hi' }] },
|
||||
false,
|
||||
);
|
||||
|
||||
// Wait a tick so the wake task gets pushed.
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
expect(session.tasks).toHaveLength(1);
|
||||
const wakeTask = session.tasks[0] as { kind: string; taskId: string; llmName: string };
|
||||
expect(wakeTask.kind).toBe('wake');
|
||||
expect(wakeTask.llmName).toBe('sleeping');
|
||||
|
||||
// Resolve the wake task — service flips the row to active, then
|
||||
// pushes the infer task on the same session.
|
||||
expect(svc.completeTask(wakeTask.taskId, { status: 200, body: { ok: true } })).toBe(true);
|
||||
const ref = await inferPromise;
|
||||
expect(session.tasks).toHaveLength(2);
|
||||
const inferTask = session.tasks[1] as { kind: string; taskId: string };
|
||||
expect(inferTask.kind).toBe('infer');
|
||||
expect(inferTask.taskId).toBe(ref.taskId);
|
||||
|
||||
// The row should be active now — concurrent callers won't trigger another wake.
|
||||
const row = await repo.findByName('sleeping');
|
||||
expect(row?.status).toBe('active');
|
||||
});
|
||||
|
||||
it('hibernating: concurrent infer requests share a single wake task', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const session = fakeSession();
|
||||
svc.bindSession('sess', session);
|
||||
|
||||
// Fire 3 concurrent infer requests against the same hibernating LLM.
|
||||
const reqs = [
|
||||
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
|
||||
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
|
||||
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
|
||||
];
|
||||
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
// Exactly one wake task pushed, regardless of concurrent infers.
|
||||
const wakeTasks = (session.tasks as Array<{ kind: string }>).filter((t) => t.kind === 'wake');
|
||||
expect(wakeTasks).toHaveLength(1);
|
||||
|
||||
const wakeTaskId = (session.tasks[0] as { taskId: string }).taskId;
|
||||
expect(svc.completeTask(wakeTaskId, { status: 200, body: { ok: true } })).toBe(true);
|
||||
|
||||
const refs = await Promise.all(reqs);
|
||||
// After wake, all 3 infer tasks pushed — total session tasks = 1 wake + 3 infer.
|
||||
const inferTasks = (session.tasks as Array<{ kind: string }>).filter((t) => t.kind === 'infer');
|
||||
expect(inferTasks).toHaveLength(3);
|
||||
expect(refs.map((r) => r.taskId).sort()).toEqual(refs.map((r) => r.taskId).sort());
|
||||
});
|
||||
|
||||
it('hibernating: rejects when the wake task fails', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'broken', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
svc.bindSession('sess', fakeSession());
|
||||
|
||||
const inferPromise = svc.enqueueInferTask(
|
||||
'broken',
|
||||
{ model: 'm', messages: [] },
|
||||
false,
|
||||
);
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
|
||||
// Get the wake task id from the in-flight tasks map (its only entry).
|
||||
// We test the failure path via failTask which is part of the public
|
||||
// surface used by the result-POST route handler.
|
||||
const taskIds: string[] = [];
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
for (const id of (svc as any).tasksById.keys()) taskIds.push(id);
|
||||
expect(taskIds).toHaveLength(1);
|
||||
expect(svc.failTask(taskIds[0]!, new Error('wake recipe failed'))).toBe(true);
|
||||
|
||||
await expect(inferPromise).rejects.toThrow(/wake recipe failed/);
|
||||
|
||||
// Row stayed hibernating — the next request will get another wake try.
|
||||
const row = await repo.findByName('broken');
|
||||
expect(row?.status).toBe('hibernating');
|
||||
});
|
||||
|
||||
it('inactive: still rejects with 503 (publisher offline) — wake path only fires for hibernating', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'gone', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
svc.bindSession('sess', fakeSession());
|
||||
|
||||
await expect(
|
||||
svc.enqueueInferTask('gone', { model: 'm', messages: [] }, false),
|
||||
).rejects.toThrow(/inactive|publisher offline/);
|
||||
});
|
||||
|
||||
it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => {
|
||||
const long = new Date(Date.now() - 5 * 60 * 1000);
|
||||
const repo = mockRepo([
|
||||
|
||||
Reference in New Issue
Block a user