feat(mcpd): VirtualLlmService + repo lifecycle helpers (v1 Stage 2)

The state machine for kind=virtual Llm rows. Wires the schema added
in Stage 1 into something that can register, heartbeat, time out,
and relay inference tasks. The HTTP routes (Stage 3) plug into this.

Repository (extends ILlmRepository):
- create/update accept kind/providerSessionId/lastHeartbeatAt/status/
  inactiveSince/type so VirtualLlmService can drive the lifecycle.
- findBySessionId(sessionId) — the reconnect lookup.
- findStaleVirtuals(cutoff) — heartbeat-stale rows for the GC sweep.
- findExpiredInactives(cutoff) — 4h-expired rows for deletion.

VirtualLlmService:
- register(): sticky-id-aware upsert. New names insert as kind=virtual/
  status=active. Existing virtual rows from the same session reactivate
  in place; existing inactive virtuals from a foreign session can be
  adopted (sticky reconnect). Refuses to overwrite a public row or a
  foreign session's still-active virtual.
- heartbeat(): bumps lastHeartbeatAt for every row owned by the
  session; revives inactive rows.
- bindSession()/unbindSession(): in-memory map of sessionId → SSE
  handle. Disconnect immediately flips owned rows to inactive AND
  rejects any in-flight tasks for that session.
- enqueueInferTask(): pushes an `infer` task frame to the SSE handle,
  returns a PendingTaskRef whose `done` resolves when the publisher
  POSTs the result back. Streaming variant exposes onChunk(cb).
- completeTask/pushTaskChunk/failTask: route-side hooks called from
  the result POST handler (lands in Stage 3).
- gcSweep(): flips heartbeat-stale active virtuals to inactive (90s
  cutoff), deletes inactives past 4h. Idempotent.

Lifecycle constants live in this file (HEARTBEAT_TIMEOUT_MS=90s,
INACTIVE_RETENTION_MS=4h) so future stages can tune in one place.

18 new mocked-repo tests cover: register variants (insert, sticky
reconnect, refuse public-overwrite, refuse foreign-session, adopt
inactive-foreign), heartbeat-revive, unbind cascade, enqueue happy
path + 503 paths (no session, inactive, public-Llm), complete/fail/
streaming chunk fan-out, GC sweep flip + delete + idempotence.

mcpd suite: 819/819 (was 801, +18). Typecheck clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Michal
2026-04-27 14:05:19 +01:00
parent 1acd8b58bc
commit 2215922618
3 changed files with 743 additions and 1 deletions

View File

@@ -0,0 +1,327 @@
/**
* VirtualLlmService — lifecycle for `kind=virtual` Llm rows.
*
* The story end-to-end:
* 1. mcplocal POSTs `/api/v1/llms/_provider-register` with the providers
* it wants to publish. We upsert each into the `Llm` table marked
* kind=virtual / status=active and return a stable
* `providerSessionId` to the caller.
* 2. mcplocal opens the SSE channel on `/api/v1/llms/_provider-stream`.
* `bindSession()` records the SSE handle in memory keyed by
* `providerSessionId`. Disconnect → `disconnect()` flips the rows to
* inactive immediately.
* 3. Heartbeats land on `/api/v1/llms/_provider-heartbeat` and bump
* `lastHeartbeatAt`. The 60-s GC sweep moves heartbeat-stale rows to
* inactive (catches sessions whose disconnect we missed) and deletes
* anything inactive past the 4-h cutoff.
* 4. At inference time `/api/v1/llms/:name/infer` resolves the row, sees
* kind=virtual, and asks `enqueueInferTask()` to relay through the SSE
* session. The session pumps the OpenAI body to mcplocal as a `task`
* frame and waits for the result POST on
* `/api/v1/llms/_provider-task/:taskId/result`.
*
* In v1 there's no wake-on-demand (v2) and no LB pool (v4). One open SSE
* session per `providerSessionId`; one inference at a time per task id.
*/
import type { Llm } from '@prisma/client';
import { randomUUID } from 'node:crypto';
import type { ILlmRepository } from '../repositories/llm.repository.js';
import type { OpenAiChatRequest } from './llm/types.js';
import { NotFoundError } from './mcp-server.service.js';
/** A virtual provider's announcement at registration time. */
export interface RegisterProviderInput {
  /** Llm row name; upsert rules in register() decide collisions. */
  name: string;
  /** Provider type string, stored verbatim on the Llm row. */
  type: string;
  /** Model identifier, stored verbatim on the Llm row. */
  model: string;
  /** Optional tier; register() defaults new rows to 'fast'. */
  tier?: string;
  /** Optional description; register() defaults new rows to ''. */
  description?: string;
  /** Optional provider-specific config blob persisted on the row. */
  extraConfig?: Record<string, unknown>;
}
/** What register() hands back to the publisher. */
export interface RegisterResult {
  /** Sticky session id — the caller echoes it on heartbeat/stream/re-register. */
  providerSessionId: string;
  /** The created/updated Llm rows, in input order. */
  llms: Llm[];
}
/**
 * In-memory handle for a live SSE session. The route owns the actual
 * Fastify reply object; this interface is what the service expects from
 * it. Decouples the service from Fastify so unit tests can use a stub.
 */
export interface VirtualSessionHandle {
  /** Send a server-sent task frame to the publisher (`event: task`). */
  pushTask(task: VirtualTaskFrame): void;
  /** True iff the underlying SSE response is still writable. */
  readonly alive: boolean;
}
/** Discriminated union of frames pushed to the publisher over SSE. */
export type VirtualTaskFrame =
  // Relay one inference request; the publisher answers via the result POST.
  | { kind: 'infer'; taskId: string; llmName: string; request: OpenAiChatRequest; streaming: boolean }
  // v2 wake task lives here so the SSE protocol stays additive.
  | { kind: 'wake'; taskId: string; llmName: string };
/**
 * Pending inference task. The route handler awaits `done`; the result POST
 * resolves it via `completeTask()`. The error path rejects via `failTask()`.
 * Streaming tasks are additionally resolved (body=null, status=200) by
 * `pushTaskChunk()` when the final chunk arrives.
 */
interface PendingTask {
  /** Server-minted UUID identifying this relay round-trip. */
  taskId: string;
  /** Owning providerSessionId — lets unbindSession() reject in-flight tasks. */
  sessionId: string;
  llmName: string;
  streaming: boolean;
  /** Settles the `done` promise with { status, body }. */
  resolveNonStreaming: (body: unknown, status: number) => void;
  /** Rejects the `done` promise (failTask / publisher disconnect). */
  rejectNonStreaming: (err: Error) => void;
  /** For streaming tasks only; null on non-streaming. Fans the chunk out to onChunk subscribers. */
  pushChunk: ((chunk: { data: string; done?: boolean }) => void) | null;
}
/** Active virtuals whose lastHeartbeatAt is older than this are flipped to inactive by gcSweep(). */
const HEARTBEAT_TIMEOUT_MS = 90_000;
/** Inactive rows whose inactiveSince is older than this are deleted by gcSweep(). */
const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000; // 4 h
/** Contract the HTTP routes (Stage 3) program against. */
export interface IVirtualLlmService {
  /** Sticky-id-aware upsert of the announced providers; 409 on ownership conflict. */
  register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult>;
  /** Bump lastHeartbeatAt on every row owned by the session; revives inactive rows. */
  heartbeat(providerSessionId: string): Promise<void>;
  /** Record the live SSE handle for a session (last writer wins). */
  bindSession(providerSessionId: string, handle: VirtualSessionHandle): void;
  /** Drop the handle, flip owned active rows to inactive, reject in-flight tasks. */
  unbindSession(providerSessionId: string): Promise<void>;
  /** Relay an inference request over the owning SSE session; 503 when the publisher is offline. */
  enqueueInferTask(llmName: string, request: OpenAiChatRequest, streaming: boolean): Promise<PendingTaskRef>;
  /** Result-POST hook: settle a task's `done` promise. Returns false for unknown task ids. */
  completeTask(taskId: string, result: { status: number; body: unknown }): boolean;
  /** Result-POST hook: fan a streaming chunk out; final chunk also settles `done`. */
  pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean;
  /** Result-POST hook: reject a task's `done` promise. Returns false for unknown task ids. */
  failTask(taskId: string, error: Error): boolean;
  /** Flip heartbeat-stale actives to inactive; delete inactives past retention. Idempotent. */
  gcSweep(now?: Date): Promise<{ markedInactive: number; deleted: number }>;
}
/** Returned to the route handler so it can await the result. */
export interface PendingTaskRef {
  taskId: string;
  /**
   * Resolves with the result for non-streaming tasks. For streaming tasks
   * it resolves with { status: 200, body: null } once the final chunk
   * arrives (pushTaskChunk) — the payload itself travels via onChunk().
   * Rejects in either mode when failTask() fires.
   */
  done: Promise<{ status: number; body: unknown }>;
  /** Streaming-only: subscribe to chunks. Returns an unsubscribe fn. */
  onChunk(cb: (chunk: { data: string; done?: boolean }) => void): () => void;
}
/**
 * Lifecycle driver for `kind=virtual` Llm rows plus the in-memory relay
 * between the infer route and the publisher's SSE session.
 *
 * Persistence goes through the injected repository; SSE handles and
 * in-flight tasks live only in memory, so a process restart drops them
 * and the GC sweep later reaps the orphaned rows.
 */
export class VirtualLlmService implements IVirtualLlmService {
  /** Live SSE handles keyed by providerSessionId. Last bind wins. */
  private readonly sessions = new Map<string, VirtualSessionHandle>();
  /** In-flight inference tasks keyed by taskId. */
  private readonly tasksById = new Map<string, PendingTask>();

  constructor(private readonly repo: ILlmRepository) {}

  /**
   * Sticky-id-aware upsert of the announced providers.
   *
   * - Unknown names insert as kind=virtual / status=active.
   * - A virtual row owned by the same session re-registers in place.
   * - An inactive virtual from a foreign session is adopted (sticky
   *   reconnect after the old session died).
   * - A public row, or a foreign session's still-active virtual, is
   *   refused with statusCode 409.
   *
   * @returns the (possibly newly minted) providerSessionId and the
   *   upserted rows, in input order.
   * @throws Error with statusCode 409 on an ownership conflict.
   */
  async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult> {
    const sessionId = input.providerSessionId ?? randomUUID();
    const now = new Date();
    const llms: Llm[] = [];
    for (const p of input.providers) {
      const existing = await this.repo.findByName(p.name);
      if (existing === null) {
        const created = await this.repo.create({
          name: p.name,
          type: p.type,
          model: p.model,
          tier: p.tier ?? 'fast',
          description: p.description ?? '',
          ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
          kind: 'virtual',
          providerSessionId: sessionId,
          status: 'active',
          lastHeartbeatAt: now,
          inactiveSince: null,
        });
        llms.push(created);
        continue;
      }
      // Existing row. Only allowed to (re-)register over a virtual row owned
      // by the same session, OR an inactive virtual whose session went away
      // (sticky reconnect). Refuse to overwrite a public row or someone
      // else's active virtual.
      if (existing.kind === 'public') {
        throw Object.assign(
          new Error(`Cannot publish over public LLM: ${p.name}`),
          { statusCode: 409 },
        );
      }
      if (existing.providerSessionId !== sessionId && existing.status === 'active') {
        throw Object.assign(
          new Error(`Virtual LLM '${p.name}' is already active under a different session`),
          { statusCode: 409 },
        );
      }
      const updated = await this.repo.update(existing.id, {
        type: p.type,
        model: p.model,
        ...(p.tier !== undefined ? { tier: p.tier } : {}),
        ...(p.description !== undefined ? { description: p.description } : {}),
        ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
        kind: 'virtual',
        providerSessionId: sessionId,
        status: 'active',
        lastHeartbeatAt: now,
        inactiveSince: null,
      });
      llms.push(updated);
    }
    return { providerSessionId: sessionId, llms };
  }

  /**
   * Bump lastHeartbeatAt on every row owned by the session. Rows that had
   * lapsed to inactive (e.g. a network blip dropped the SSE) are revived.
   * No-op for an unknown session id.
   */
  async heartbeat(providerSessionId: string): Promise<void> {
    const owned = await this.repo.findBySessionId(providerSessionId);
    if (owned.length === 0) return;
    const now = new Date();
    for (const row of owned) {
      await this.repo.update(row.id, {
        lastHeartbeatAt: now,
        ...(row.status === 'inactive'
          ? { status: 'active', inactiveSince: null }
          : {}),
      });
    }
  }

  /**
   * Record the live SSE handle for a session. Replaces any prior handle —
   * "last writer wins" keeps reconnects simple; the old SSE will have been
   * closed by the publisher anyway.
   */
  bindSession(providerSessionId: string, handle: VirtualSessionHandle): void {
    this.sessions.set(providerSessionId, handle);
  }

  /**
   * Tear the session down: drop the SSE handle, flip every owned active
   * row to inactive, and reject any in-flight tasks — the relay can't
   * deliver a result POST anymore.
   */
  async unbindSession(providerSessionId: string): Promise<void> {
    this.sessions.delete(providerSessionId);
    const owned = await this.repo.findBySessionId(providerSessionId);
    const now = new Date();
    for (const row of owned) {
      if (row.status === 'active') {
        await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
      }
    }
    // Snapshot first: failTask() deletes from tasksById while we scan it.
    const inflight = [...this.tasksById.values()].filter(
      (t) => t.sessionId === providerSessionId,
    );
    for (const t of inflight) {
      this.failTask(t.taskId, new Error('publisher disconnected'));
    }
  }

  /**
   * Relay an inference request to the publisher over its SSE session.
   *
   * Resolves the Llm row, validates it is an active virtual with a live
   * SSE handle, registers a PendingTask, then pushes the `infer` frame.
   *
   * @throws NotFoundError when no Llm row has that name.
   * @throws Error with statusCode 500 when the row isn't a virtual provider.
   * @throws Error with statusCode 503 when the publisher is offline: row
   *   inactive, no live SSE handle, or the task frame failed to send.
   */
  async enqueueInferTask(
    llmName: string,
    request: OpenAiChatRequest,
    streaming: boolean,
  ): Promise<PendingTaskRef> {
    const llm = await this.repo.findByName(llmName);
    if (llm === null) throw new NotFoundError(`Llm not found: ${llmName}`);
    if (llm.kind !== 'virtual' || llm.providerSessionId === null) {
      throw Object.assign(
        new Error(`Llm '${llmName}' is not a virtual provider`),
        { statusCode: 500 },
      );
    }
    if (llm.status !== 'active') {
      throw Object.assign(
        new Error(`Virtual Llm '${llmName}' is ${llm.status}; publisher offline`),
        { statusCode: 503 },
      );
    }
    const handle = this.sessions.get(llm.providerSessionId);
    if (handle === undefined || !handle.alive) {
      throw Object.assign(
        new Error(`Virtual Llm '${llmName}' has no live SSE session; publisher offline`),
        { statusCode: 503 },
      );
    }
    const taskId = randomUUID();
    const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>();
    let resolveDone!: (v: { status: number; body: unknown }) => void;
    let rejectDone!: (err: Error) => void;
    const done = new Promise<{ status: number; body: unknown }>((resolve, reject) => {
      resolveDone = resolve;
      rejectDone = reject;
    });
    const pending: PendingTask = {
      taskId,
      sessionId: llm.providerSessionId,
      llmName,
      streaming,
      resolveNonStreaming: (body, status) => resolveDone({ status, body }),
      rejectNonStreaming: rejectDone,
      pushChunk: streaming
        ? (chunk): void => { for (const cb of chunkSubscribers) cb(chunk); }
        : null,
    };
    this.tasksById.set(taskId, pending);
    try {
      handle.pushTask({
        kind: 'infer',
        taskId,
        llmName,
        request,
        streaming,
      });
    } catch (err: unknown) {
      // BUG FIX: a failed push (socket died between the `alive` check and
      // the write) used to strand the task in tasksById forever — a leak,
      // and a `done` promise that never settles. Clean up and surface 503.
      this.tasksById.delete(taskId);
      throw Object.assign(
        new Error(`Virtual Llm '${llmName}' SSE push failed; publisher offline`),
        { statusCode: 503, cause: err },
      );
    }
    return {
      taskId,
      done,
      onChunk(cb): () => void {
        chunkSubscribers.add(cb);
        return () => chunkSubscribers.delete(cb);
      },
    };
  }

  /**
   * Result-POST hook: settle the task's `done` promise with the
   * publisher's status/body. Returns false for unknown (already settled,
   * timed out, or bogus) task ids so the route can answer 404.
   */
  completeTask(taskId: string, result: { status: number; body: unknown }): boolean {
    const t = this.tasksById.get(taskId);
    if (t === undefined) return false;
    this.tasksById.delete(taskId);
    t.resolveNonStreaming(result.body, result.status);
    return true;
  }

  /**
   * Result-POST hook for streaming tasks: fan the chunk out to onChunk
   * subscribers. The final chunk (`done: true`) also resolves the task's
   * `done` promise (body=null, status=200) so the route can clean up.
   * Returns false for unknown or non-streaming task ids.
   */
  pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean {
    const t = this.tasksById.get(taskId);
    if (t === undefined || t.pushChunk === null) return false;
    t.pushChunk(chunk);
    if (chunk.done === true) {
      // For streaming tasks, also resolve the `done` promise so the route
      // handler can clean up.
      t.resolveNonStreaming(null, 200);
      this.tasksById.delete(taskId);
    }
    return true;
  }

  /**
   * Result-POST / disconnect hook: reject the task's `done` promise.
   * Returns false for unknown task ids.
   */
  failTask(taskId: string, error: Error): boolean {
    const t = this.tasksById.get(taskId);
    if (t === undefined) return false;
    this.tasksById.delete(taskId);
    t.rejectNonStreaming(error);
    return true;
  }

  /**
   * Periodic GC: flip heartbeat-stale active virtuals to inactive
   * (HEARTBEAT_TIMEOUT_MS cutoff — catches sessions whose disconnect we
   * missed), then delete rows inactive past INACTIVE_RETENTION_MS.
   * Idempotent; `now` is injectable for tests.
   */
  async gcSweep(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> {
    let markedInactive = 0;
    let deleted = 0;
    const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
    const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
    for (const row of stale) {
      await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
      markedInactive += 1;
    }
    const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
    const expired = await this.repo.findExpiredInactives(deletionCutoff);
    for (const row of expired) {
      await this.repo.delete(row.id);
      deleted += 1;
    }
    return { markedInactive, deleted };
  }
}