Second half of v2. mcpd now dispatches a `wake` task on the SSE
control channel when an inference request hits a row whose
status=hibernating, waits for the publisher to confirm readiness,
then proceeds with the infer task. Concurrent infers for the same
hibernating Llm share a single wake task — `wakeInFlight` map
dedupes by Llm name.
State machine in enqueueInferTask:
active → push infer task immediately (existing path).
inactive → 503, publisher offline (existing path).
hibernating → ensureAwake() → push infer task (new in v2).
ensureAwake/runWake (private):
- Allocates a fresh taskId on the existing PendingTask plumbing.
- Pushes `{ kind: "wake", taskId, llmName }` on the SSE handle.
- Awaits the publisher's result POST. On 2xx, flips the row to
active + bumps lastHeartbeatAt, so all queued + future infers
hit the active path. On non-2xx or service.failTask, the row
stays hibernating (next request retries).
Tests: 4 new in virtual-llm-service.test.ts cover happy path
(wake → infer in order), concurrent dedup (3 parallel infers, 1
wake task), wake failure surfaces to all queued infers and leaves
the row hibernating, inactive ≠ hibernating (still rejects with 503,
no wake attempt). 22/22 service tests, 2050/2050 workspace.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
425 lines
15 KiB
TypeScript
425 lines
15 KiB
TypeScript
/**
|
|
* VirtualLlmService — lifecycle for `kind=virtual` Llm rows.
|
|
*
|
|
* The story end-to-end:
|
|
* 1. mcplocal POSTs `/api/v1/llms/_provider-register` with the providers
|
|
* it wants to publish. We upsert each into the `Llm` table marked
|
|
* kind=virtual / status=active and return a stable
|
|
* `providerSessionId` to the caller.
|
|
* 2. mcplocal opens the SSE channel on `/api/v1/llms/_provider-stream`.
|
|
* `bindSession()` records the SSE handle in memory keyed by
|
|
* `providerSessionId`. Disconnect → `unbindSession()` flips the rows to
|
|
* inactive immediately.
|
|
* 3. Heartbeats land on `/api/v1/llms/_provider-heartbeat` and bump
|
|
* `lastHeartbeatAt`. The 60-s GC sweep moves heartbeat-stale rows to
|
|
* inactive (catches sessions whose disconnect we missed) and deletes
|
|
* anything inactive past the 4-h cutoff.
|
|
* 4. At inference time `/api/v1/llms/:name/infer` resolves the row, sees
|
|
* kind=virtual, and asks `enqueueInferTask()` to relay through the SSE
|
|
* session. The session pumps the OpenAI body to mcplocal as a `task`
|
|
* frame and waits for the result POST on
|
|
* `/api/v1/llms/_provider-task/:taskId/result`.
|
|
*
|
|
* v2 adds wake-on-demand for `hibernating` rows; there's still no LB pool (v4). One open SSE
|
|
* session per `providerSessionId`; one inference at a time per task id.
|
|
*/
|
|
import type { Llm } from '@prisma/client';
|
|
import { randomUUID } from 'node:crypto';
|
|
import type { ILlmRepository } from '../repositories/llm.repository.js';
|
|
import type { OpenAiChatRequest } from './llm/types.js';
|
|
import { NotFoundError } from './mcp-server.service.js';
|
|
|
|
/**
 * One provider that a publisher (mcplocal) announces when it registers.
 * Each entry becomes, or refreshes, a `kind=virtual` Llm row.
 */
export interface RegisterProviderInput {
  /** Unique Llm name; also the lookup key for re-registration. */
  name: string;
  /** Provider type string, stored verbatim on the row. */
  type: string;
  /** Model identifier, stored verbatim on the row. */
  model: string;
  /** Tier label. New rows default to 'fast'; omitted on update = unchanged. */
  tier?: string;
  /** Free-form description. New rows default to ''; omitted on update = unchanged. */
  description?: string;
  /** Opaque provider-specific settings; only written when present. */
  extraConfig?: Record<string, unknown>;
  /**
   * Starting status for the row. A publisher whose backend is asleep sends
   * 'hibernating', and mcpd then dispatches a `wake` task ahead of the
   * first inference. When omitted the row is registered as 'active'
   * (the pre-v2 behavior). v2 publishers (mcplocal with a configured wake
   * recipe) send 'hibernating' when `LlmProvider.isAvailable()` reports
   * false at publish time.
   */
  initialStatus?: 'active' | 'hibernating';
}
|
|
|
|
/** Outcome of a registration call. */
export interface RegisterResult {
  /** Session id the rows are now bound to; used for the SSE stream and heartbeats. */
  providerSessionId: string;
  /** The created or updated rows, one per registered provider, in request order. */
  llms: Llm[];
}
|
|
|
|
/**
 * Service-side view of one live SSE connection. The route keeps the real
 * Fastify reply and gives the service only this narrow interface. That
 * keeps the service independent of Fastify and lets unit tests pass a stub.
 */
export interface VirtualSessionHandle {
  /** `true` while the underlying SSE response can still be written to. */
  readonly alive: boolean;
  /** Write one server-sent `event: task` frame to the publisher. */
  pushTask(task: VirtualTaskFrame): void;
}
|
|
|
|
/**
 * Frames mcpd pushes to a publisher over the SSE control channel,
 * discriminated by `kind`. New kinds are added as extra union members so
 * the SSE protocol stays additive.
 */
export type VirtualTaskFrame =
  // Relay an OpenAI-style chat request to the named Llm.
  | {
      kind: 'infer';
      taskId: string;
      llmName: string;
      request: OpenAiChatRequest;
      streaming: boolean;
    }
  // v2: ask the publisher to wake the named Llm's backend.
  | {
      kind: 'wake';
      taskId: string;
      llmName: string;
    };
|
|
|
|
/**
 * Bookkeeping for a task (infer or wake) that has been pushed to a
 * publisher and is waiting for its result POST. `completeTask()` settles it
 * through `resolveNonStreaming`; `failTask()` settles it through
 * `rejectNonStreaming`.
 */
interface PendingTask {
  /** Id the publisher echoes back in its result/chunk POSTs. */
  taskId: string;
  /** `providerSessionId` whose SSE channel carried the task. */
  sessionId: string;
  /** Name of the Llm the task targets. */
  llmName: string;
  /** Whether results arrive as chunks (via `pushChunk`) rather than one body. */
  streaming: boolean;
  /** Settle successfully with the publisher's body and HTTP status. */
  resolveNonStreaming: (body: unknown, status: number) => void;
  /** Settle with an error, e.g. publisher disconnect or explicit failure. */
  rejectNonStreaming: (err: Error) => void;
  /** Fan a chunk out to subscribers; `null` for non-streaming tasks. */
  pushChunk: ((chunk: { data: string; done?: boolean }) => void) | null;
}
|
|
|
|
// A virtual row whose last heartbeat is older than this is marked inactive by gcSweep().
const HEARTBEAT_TIMEOUT_MS = 90 * 1000; // 90 s
// Inactive rows older than this are deleted by gcSweep().
const INACTIVE_RETENTION_MS = 4 * 3_600_000; // 4 h
|
|
|
|
/**
 * Contract for the virtual-LLM lifecycle service: registration, liveness,
 * SSE session binding, and relaying inference tasks to publishers.
 */
export interface IVirtualLlmService {
  /** Upsert the publisher's providers as `kind=virtual` rows bound to one session id (generated when absent). */
  register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult>;

  /** Refresh `lastHeartbeatAt` for every row owned by the session. */
  heartbeat(providerSessionId: string): Promise<void>;

  /** Attach the live SSE handle for a session, replacing any previous one. */
  bindSession(providerSessionId: string, handle: VirtualSessionHandle): void;

  /** Drop the session's handle, flip its active rows to inactive, and fail its in-flight tasks. */
  unbindSession(providerSessionId: string): Promise<void>;

  /** Relay an inference to the owning publisher, waking a hibernating backend first. */
  enqueueInferTask(llmName: string, request: OpenAiChatRequest, streaming: boolean): Promise<PendingTaskRef>;

  /** Deliver a task's final result; `false` when the task id is unknown. */
  completeTask(taskId: string, result: { status: number; body: unknown }): boolean;

  /** Deliver one streaming chunk; `false` when the task is unknown or not streaming. */
  pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean;

  /** Reject a pending task; `false` when the task id is unknown. */
  failTask(taskId: string, error: Error): boolean;

  /** Mark heartbeat-stale rows inactive and delete rows inactive past retention. */
  gcSweep(now?: Date): Promise<{ markedInactive: number; deleted: number }>;
}
|
|
|
|
/** Returned to the route handler so it can await the result. */
export interface PendingTaskRef {
  /** Id of the relayed task; the publisher POSTs its result/chunks against it. */
  taskId: string;
  /**
   * Settles when the task finishes.
   * - Non-streaming tasks resolve with the publisher's `{ status, body }`.
   * - Streaming tasks resolve with `{ status: 200, body: null }` once the
   *   final (`done: true`) chunk arrives; the payload itself is delivered
   *   through `onChunk`.
   * - Rejects when the task is failed, e.g. when the publisher disconnects.
   */
  done: Promise<{ status: number; body: unknown }>;
  /** Streaming-only: subscribe to chunks. Returns an unsubscribe fn. */
  onChunk(cb: (chunk: { data: string; done?: boolean }) => void): () => void;
}
|
|
|
|
/**
 * Lifecycle and relay for `kind=virtual` Llm rows. It keeps the SSE handles
 * and pending tasks in memory, and mirrors publisher liveness into the
 * `Llm` table through the repository.
 */
export class VirtualLlmService implements IVirtualLlmService {
  /**
   * Default bound on how long a `wake` task may stay unanswered. Without a
   * bound, a publisher that stays connected but never POSTs a result would
   * wedge every inference for that Llm, because the shared `wakeInFlight`
   * entry would never clear. Override via the constructor's `options`.
   */
  private static readonly DEFAULT_WAKE_TIMEOUT_MS = 5 * 60 * 1000;

  /** Live SSE handles keyed by `providerSessionId` (last `bindSession()` wins). */
  private readonly sessions = new Map<string, VirtualSessionHandle>();

  /** Infer and wake tasks still waiting for their result POST, keyed by task id. */
  private readonly tasksById = new Map<string, PendingTask>();

  /**
   * Dedupe concurrent wake requests for the same Llm. The first request
   * starts the wake; later requests for the same name await the same
   * promise. Cleared as soon as the wake settles (success or failure).
   */
  private readonly wakeInFlight = new Map<string, Promise<void>>();

  /** Effective wake timeout in milliseconds. */
  private readonly wakeTimeoutMs: number;

  /**
   * @param repo - Persistence for `Llm` rows.
   * @param options - Optional tuning. `wakeTimeoutMs` is how long a wake task
   *   may stay pending before it is failed (default: 5 minutes).
   */
  constructor(
    private readonly repo: ILlmRepository,
    options: { wakeTimeoutMs?: number } = {},
  ) {
    this.wakeTimeoutMs = options.wakeTimeoutMs ?? VirtualLlmService.DEFAULT_WAKE_TIMEOUT_MS;
  }

  /**
   * Upsert the publisher's providers as `kind=virtual` rows bound to one
   * session.
   *
   * Every ownership check runs before any row is written. A 409 on one
   * provider therefore leaves the table untouched, instead of leaving it
   * half-registered under a session id the caller never received.
   *
   * @param input.providerSessionId - Session id to reuse (sticky reconnect).
   *   A fresh UUID is generated when it is null/undefined.
   * @param input.providers - Providers to publish.
   * @returns The session id and the created/updated rows, in request order.
   * @throws Error with `statusCode: 409` when a name belongs to a public Llm
   *   or to a virtual Llm that is active under a different session.
   */
  async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult> {
    const sessionId = input.providerSessionId ?? randomUUID();
    const now = new Date();

    // Pass 1: read and validate only.
    const existingRows: (Llm | null)[] = [];
    for (const p of input.providers) {
      const existing = await this.repo.findByName(p.name);
      if (existing !== null) this.assertCanPublishOver(existing, p.name, sessionId);
      existingRows.push(existing);
    }

    // Pass 2: write. `written` handles a name repeated within one request:
    // the later occurrence updates the row the earlier one wrote, instead of
    // trying to create it a second time.
    const written = new Map<string, Llm>();
    const llms: Llm[] = [];
    for (const [i, p] of input.providers.entries()) {
      const initialStatus = p.initialStatus ?? 'active';
      const existing = written.get(p.name) ?? existingRows[i] ?? null;

      let row: Llm;
      if (existing === null) {
        row = await this.repo.create({
          name: p.name,
          type: p.type,
          model: p.model,
          tier: p.tier ?? 'fast',
          description: p.description ?? '',
          ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
          kind: 'virtual',
          providerSessionId: sessionId,
          status: initialStatus,
          lastHeartbeatAt: now,
          inactiveSince: null,
        });
      } else {
        // Omitted optional fields keep their stored values on update.
        row = await this.repo.update(existing.id, {
          type: p.type,
          model: p.model,
          ...(p.tier !== undefined ? { tier: p.tier } : {}),
          ...(p.description !== undefined ? { description: p.description } : {}),
          ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
          kind: 'virtual',
          providerSessionId: sessionId,
          status: initialStatus,
          lastHeartbeatAt: now,
          inactiveSince: null,
        });
      }
      written.set(p.name, row);
      llms.push(row);
    }

    return { providerSessionId: sessionId, llms };
  }

  /**
   * Check whether `sessionId` may (re-)register over an existing row.
   * Allowed: a virtual row owned by the same session, or a non-active
   * virtual row whose session went away (sticky reconnect).
   *
   * @throws Error with `statusCode: 409` otherwise.
   */
  private assertCanPublishOver(existing: Llm, name: string, sessionId: string): void {
    if (existing.kind === 'public') {
      throw Object.assign(
        new Error(`Cannot publish over public LLM: ${name}`),
        { statusCode: 409 },
      );
    }
    // NOTE(review): only `active` rows of another session are protected; a
    // `hibernating` row owned by a different (possibly live) session can be
    // taken over. Confirm that is intended for v2.
    if (existing.providerSessionId !== sessionId && existing.status === 'active') {
      throw Object.assign(
        new Error(`Virtual LLM '${name}' is already active under a different session`),
        { statusCode: 409 },
      );
    }
  }

  /**
   * Bump `lastHeartbeatAt` on every row owned by the session. Rows that had
   * lapsed to `inactive` (e.g. a network blip dropped the SSE) are revived
   * to `active`. Other statuses, including `hibernating`, are left alone.
   * Unknown session ids are a no-op.
   */
  async heartbeat(providerSessionId: string): Promise<void> {
    const owned = await this.repo.findBySessionId(providerSessionId);
    if (owned.length === 0) return;
    const now = new Date();
    for (const row of owned) {
      await this.repo.update(row.id, {
        lastHeartbeatAt: now,
        ...(row.status === 'inactive'
          ? { status: 'active', inactiveSince: null }
          : {}),
      });
    }
  }

  /**
   * Record the live SSE handle for a session. Any prior handle is replaced
   * ("last writer wins"); the old SSE will have been closed by the
   * publisher anyway.
   */
  bindSession(providerSessionId: string, handle: VirtualSessionHandle): void {
    this.sessions.set(providerSessionId, handle);
  }

  /**
   * Tear down a session after its SSE closes. This drops the handle, rejects
   * every in-flight task (infer or wake) routed through it, and flips its
   * `active` rows to `inactive`. Hibernating rows keep their status.
   */
  async unbindSession(providerSessionId: string): Promise<void> {
    this.sessions.delete(providerSessionId);

    // Fail in-flight tasks before touching the DB. The publisher can no
    // longer POST a result, and this purely in-memory step must still run
    // even if a repository call below throws. Iterate a snapshot because
    // failTask() removes entries from the map.
    for (const t of [...this.tasksById.values()]) {
      if (t.sessionId === providerSessionId) {
        this.failTask(t.taskId, new Error('publisher disconnected'));
      }
    }

    const owned = await this.repo.findBySessionId(providerSessionId);
    const now = new Date();
    for (const row of owned) {
      if (row.status === 'active') {
        await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
      }
    }
  }

  /**
   * Relay an inference request to the publisher that owns `llmName`.
   *
   * State machine:
   * - `active`: push the infer task immediately.
   * - `inactive`: reject with 503 (publisher offline).
   * - `hibernating`: wake the backend first (deduped per Llm), then push.
   *
   * @returns A handle whose `done` settles with the task's result.
   * @throws NotFoundError when no such Llm exists.
   * @throws Error with `statusCode: 500` for non-virtual rows.
   * @throws Error with `statusCode: 503` when the publisher is offline.
   *   Wake failures, disconnects, and timeouts propagate from the wake step.
   */
  async enqueueInferTask(
    llmName: string,
    request: OpenAiChatRequest,
    streaming: boolean,
  ): Promise<PendingTaskRef> {
    const llm = await this.repo.findByName(llmName);
    if (llm === null) throw new NotFoundError(`Llm not found: ${llmName}`);
    if (llm.kind !== 'virtual' || llm.providerSessionId === null) {
      throw Object.assign(
        new Error(`Llm '${llmName}' is not a virtual provider`),
        { statusCode: 500 },
      );
    }
    if (llm.status === 'inactive') {
      throw Object.assign(
        new Error(`Virtual Llm '${llmName}' is inactive; publisher offline`),
        { statusCode: 503 },
      );
    }
    const sessionId = llm.providerSessionId;
    let handle = this.requireLiveHandle(llmName, sessionId);

    // ── Wake-on-demand (v2) ──
    // Status=hibernating means the publisher told us (at register time or
    // via a later status update) that the backend is asleep. Wake it and
    // wait for confirmation before dispatching. Concurrent infers for the
    // same Llm share one wake.
    if (llm.status === 'hibernating') {
      await this.ensureAwake(llm.id, llm.name, sessionId, handle);
      // The session may have been re-bound (or dropped) while we waited, so
      // dispatch on whichever handle is live now.
      handle = this.requireLiveHandle(llmName, sessionId);
    }

    const taskId = randomUUID();
    const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>();

    let resolveDone!: (v: { status: number; body: unknown }) => void;
    let rejectDone!: (err: Error) => void;
    const done = new Promise<{ status: number; body: unknown }>((resolve, reject) => {
      resolveDone = resolve;
      rejectDone = reject;
    });
    // Streaming callers consume chunks and may never attach to `done`. Mark
    // it handled so a later failTask() (e.g. publisher disconnect) cannot
    // surface as an unhandled rejection. Callers awaiting `done` still see
    // the rejection.
    void done.catch(() => undefined);

    const pending: PendingTask = {
      taskId,
      sessionId,
      llmName,
      streaming,
      resolveNonStreaming: (body, status) => resolveDone({ status, body }),
      rejectNonStreaming: rejectDone,
      pushChunk: streaming
        ? (chunk): void => { for (const cb of chunkSubscribers) cb(chunk); }
        : null,
    };
    this.tasksById.set(taskId, pending);

    try {
      handle.pushTask({
        kind: 'infer',
        taskId,
        llmName,
        request,
        streaming,
      });
    } catch (err) {
      // The frame never went out, so no result will ever arrive; don't leak
      // the pending entry.
      this.tasksById.delete(taskId);
      throw err;
    }

    return {
      taskId,
      done,
      onChunk(cb): () => void {
        chunkSubscribers.add(cb);
        return () => chunkSubscribers.delete(cb);
      },
    };
  }

  /**
   * Return the live SSE handle for `sessionId`.
   *
   * @throws Error with `statusCode: 503` when no handle is bound or it is dead.
   */
  private requireLiveHandle(llmName: string, sessionId: string): VirtualSessionHandle {
    const handle = this.sessions.get(sessionId);
    if (handle === undefined || !handle.alive) {
      throw Object.assign(
        new Error(`Virtual Llm '${llmName}' has no live SSE session; publisher offline`),
        { statusCode: 503 },
      );
    }
    return handle;
  }

  /**
   * Drive the publisher to wake the backend. Concurrent callers for the
   * same Llm name share the in-flight promise, so the publisher is asked
   * only once. Throws if the wake returns non-2xx, is failed (e.g.
   * publisher disconnect), or times out. On success the row is flipped to
   * active.
   */
  private async ensureAwake(
    llmId: string,
    llmName: string,
    sessionId: string,
    handle: VirtualSessionHandle,
  ): Promise<void> {
    const existing = this.wakeInFlight.get(llmName);
    if (existing !== undefined) {
      await existing;
      return;
    }
    const promise = this.runWake(llmId, llmName, sessionId, handle);
    this.wakeInFlight.set(llmName, promise);
    try {
      await promise;
    } finally {
      this.wakeInFlight.delete(llmName);
    }
  }

  /**
   * Push one `wake` task and wait for its result.
   *
   * A 2xx result flips the row to active and bumps `lastHeartbeatAt`. A
   * non-2xx result, a failTask() (e.g. publisher disconnect via
   * unbindSession), or exceeding `wakeTimeoutMs` rejects and leaves the row
   * hibernating, so the next request retries.
   */
  private async runWake(
    llmId: string,
    llmName: string,
    sessionId: string,
    handle: VirtualSessionHandle,
  ): Promise<void> {
    const taskId = randomUUID();
    let resolveDone!: () => void;
    let rejectDone!: (err: Error) => void;
    const done = new Promise<void>((resolve, reject) => {
      resolveDone = resolve;
      rejectDone = reject;
    });

    const pending: PendingTask = {
      taskId,
      sessionId,
      llmName,
      streaming: false,
      resolveNonStreaming: (_body, status) => {
        if (status >= 200 && status < 300) resolveDone();
        else rejectDone(new Error(`wake task returned status ${String(status)}`));
      },
      rejectNonStreaming: rejectDone,
      pushChunk: null,
    };
    this.tasksById.set(taskId, pending);

    // Bound the wait. failTask() also removes the pending entry, so a late
    // result POST finds nothing (completeTask returns false).
    const timer = setTimeout(() => {
      this.failTask(
        taskId,
        Object.assign(
          new Error(`wake task for '${llmName}' timed out after ${String(this.wakeTimeoutMs)} ms`),
          { statusCode: 504 },
        ),
      );
    }, this.wakeTimeoutMs);

    try {
      handle.pushTask({ kind: 'wake', taskId, llmName });
      await done;
    } finally {
      clearTimeout(timer);
      // No-op once completeTask/failTask removed it. Covers pushTask throwing.
      this.tasksById.delete(taskId);
    }

    // Flip the row to active so subsequent infer calls take the normal
    // active path. The publisher's own heartbeat keeps it alive from here.
    await this.repo.update(llmId, {
      status: 'active',
      lastHeartbeatAt: new Date(),
      inactiveSince: null,
    });
  }

  /**
   * Deliver a task's result POST. Resolves the task's `done` with
   * `{ status, body }`.
   *
   * @returns `false` if the task id is unknown (already settled or never existed).
   */
  completeTask(taskId: string, result: { status: number; body: unknown }): boolean {
    const t = this.tasksById.get(taskId);
    if (t === undefined) return false;
    this.tasksById.delete(taskId);
    t.resolveNonStreaming(result.body, result.status);
    return true;
  }

  /**
   * Deliver one streaming chunk to the task's subscribers. A chunk with
   * `done: true` also resolves `done` with `{ status: 200, body: null }`
   * and retires the task.
   *
   * @returns `false` if the task is unknown or is not a streaming task.
   */
  pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean {
    const t = this.tasksById.get(taskId);
    if (t === undefined || t.pushChunk === null) return false;
    t.pushChunk(chunk);
    if (chunk.done === true) {
      // Resolve `done` too, so the route handler can clean up.
      t.resolveNonStreaming(null, 200);
      this.tasksById.delete(taskId);
    }
    return true;
  }

  /**
   * Reject a pending task with `error` and retire it.
   *
   * @returns `false` if the task id is unknown.
   */
  failTask(taskId: string, error: Error): boolean {
    const t = this.tasksById.get(taskId);
    if (t === undefined) return false;
    this.tasksById.delete(taskId);
    t.rejectNonStreaming(error);
    return true;
  }

  /**
   * Periodic cleanup. Rows whose heartbeat is older than
   * HEARTBEAT_TIMEOUT_MS are marked inactive (catching missed disconnects).
   * Rows inactive for longer than INACTIVE_RETENTION_MS are deleted.
   *
   * @param now - Reference time (injectable for tests).
   * @returns How many rows were marked inactive and how many were deleted.
   */
  async gcSweep(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> {
    let markedInactive = 0;
    let deleted = 0;

    const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
    const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
    for (const row of stale) {
      await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
      markedInactive += 1;
    }

    const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
    const expired = await this.repo.findExpiredInactives(deletionCutoff);
    for (const row of expired) {
      await this.repo.delete(row.id);
      deleted += 1;
    }

    return { markedInactive, deleted };
  }
}
|