From 2215922618f3e8401a84c7d7b903157432ded5ca Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 14:05:19 +0100 Subject: [PATCH] feat(mcpd): VirtualLlmService + repo lifecycle helpers (v1 Stage 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The state machine for kind=virtual Llm rows. Wires the schema added in Stage 1 into something that can register, heartbeat, time out, and relay inference tasks. The HTTP routes (Stage 3) plug into this. Repository (extends ILlmRepository): - create/update accept kind/providerSessionId/lastHeartbeatAt/status/ inactiveSince/type so VirtualLlmService can drive the lifecycle. - findBySessionId(sessionId) — the reconnect lookup. - findStaleVirtuals(cutoff) — heartbeat-stale rows for the GC sweep. - findExpiredInactives(cutoff) — 4h-expired rows for deletion. VirtualLlmService: - register(): sticky-id-aware upsert. New names insert as kind=virtual/ status=active. Existing virtual rows from the same session reactivate in place; existing inactive virtuals from a foreign session can be adopted (sticky reconnect). Refuses to overwrite a public row or a foreign session's still-active virtual. - heartbeat(): bumps lastHeartbeatAt for every row owned by the session; revives inactive rows. - bindSession()/unbindSession(): in-memory map of sessionId → SSE handle. Disconnect immediately flips owned rows to inactive AND rejects any in-flight tasks for that session. - enqueueInferTask(): pushes an `infer` task frame to the SSE handle, returns a PendingTaskRef whose `done` resolves when the publisher POSTs the result back. Streaming variant exposes onChunk(cb). - completeTask/pushTaskChunk/failTask: route-side hooks called from the result POST handler (lands in Stage 3). - gcSweep(): flips heartbeat-stale active virtuals to inactive (90s cutoff), deletes inactives past 4h. Idempotent. 
Lifecycle constants live in this file (HEARTBEAT_TIMEOUT_MS=90s, INACTIVE_RETENTION_MS=4h) so future stages can tune in one place. 18 new mocked-repo tests cover: register variants (insert, sticky reconnect, refuse public-overwrite, refuse foreign-session, adopt inactive-foreign), heartbeat-revive, unbind cascade, enqueue happy path + 503 paths (no session, inactive) + 500 path (public-Llm), complete/fail/streaming chunk fan-out, GC sweep flip + delete + idempotence. mcpd suite: 819/819 (was 801, +18). Typecheck clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/mcpd/src/repositories/llm.repository.ts | 70 +++- src/mcpd/src/services/virtual-llm.service.ts | 327 +++++++++++++++++ src/mcpd/tests/virtual-llm-service.test.ts | 347 +++++++++++++++++++ 3 files changed, 743 insertions(+), 1 deletion(-) create mode 100644 src/mcpd/src/services/virtual-llm.service.ts create mode 100644 src/mcpd/tests/virtual-llm-service.test.ts diff --git a/src/mcpd/src/repositories/llm.repository.ts b/src/mcpd/src/repositories/llm.repository.ts index 92f839a..06a5868 100644 --- a/src/mcpd/src/repositories/llm.repository.ts +++ b/src/mcpd/src/repositories/llm.repository.ts @@ -1,4 +1,4 @@ -import type { PrismaClient, Llm, Prisma } from '@prisma/client'; +import type { PrismaClient, Llm, Prisma, LlmKind, LlmStatus } from '@prisma/client'; export interface CreateLlmInput { name: string; @@ -10,9 +10,16 @@ export interface CreateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + // Virtual-provider lifecycle (omit for kind=public). + kind?: LlmKind; + providerSessionId?: string | null; + status?: LlmStatus; + lastHeartbeatAt?: Date | null; + inactiveSince?: Date | null; } export interface UpdateLlmInput { + type?: string; model?: string; url?: string; tier?: string; @@ -20,6 +27,13 @@ export interface UpdateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + // Virtual-provider lifecycle.
VirtualLlmService is the only writer for + // these in v1; the public CRUD path leaves them undefined. + kind?: LlmKind; + providerSessionId?: string | null; + status?: LlmStatus; + lastHeartbeatAt?: Date | null; + inactiveSince?: Date | null; } export interface ILlmRepository { @@ -30,6 +44,10 @@ export interface ILlmRepository { create(data: CreateLlmInput): Promise; update(id: string, data: UpdateLlmInput): Promise; delete(id: string): Promise; + // Virtual-provider lifecycle helpers (called by VirtualLlmService). + findBySessionId(sessionId: string): Promise; + findStaleVirtuals(heartbeatCutoff: Date): Promise; + findExpiredInactives(deletionCutoff: Date): Promise; } export class LlmRepository implements ILlmRepository { @@ -63,12 +81,18 @@ export class LlmRepository implements ILlmRepository { apiKeySecretId: data.apiKeySecretId ?? null, apiKeySecretKey: data.apiKeySecretKey ?? null, extraConfig: (data.extraConfig ?? {}) as Prisma.InputJsonValue, + ...(data.kind !== undefined ? { kind: data.kind } : {}), + ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), + ...(data.status !== undefined ? { status: data.status } : {}), + ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}), + ...(data.inactiveSince !== undefined ? 
{ inactiveSince: data.inactiveSince } : {}), }, }); } async update(id: string, data: UpdateLlmInput): Promise { const updateData: Prisma.LlmUpdateInput = {}; + if (data.type !== undefined) updateData.type = data.type; if (data.model !== undefined) updateData.model = data.model; if (data.url !== undefined) updateData.url = data.url; if (data.tier !== undefined) updateData.tier = data.tier; @@ -80,10 +104,54 @@ export class LlmRepository implements ILlmRepository { } if (data.apiKeySecretKey !== undefined) updateData.apiKeySecretKey = data.apiKeySecretKey; if (data.extraConfig !== undefined) updateData.extraConfig = data.extraConfig as Prisma.InputJsonValue; + if (data.kind !== undefined) updateData.kind = data.kind; + if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId; + if (data.status !== undefined) updateData.status = data.status; + if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt; + if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince; return this.prisma.llm.update({ where: { id }, data: updateData }); } async delete(id: string): Promise { await this.prisma.llm.delete({ where: { id } }); } + + // ── Virtual-provider lifecycle queries ── + + async findBySessionId(sessionId: string): Promise { + return this.prisma.llm.findMany({ + where: { providerSessionId: sessionId }, + orderBy: { name: 'asc' }, + }); + } + + /** + * Virtuals whose lastHeartbeatAt is older than the cutoff and are still + * marked active. The GC sweep flips these to `inactive`. Public rows + * never have lastHeartbeatAt set so they're naturally excluded by the + * non-null compare on `lt`. 
+ */ + async findStaleVirtuals(heartbeatCutoff: Date): Promise { + return this.prisma.llm.findMany({ + where: { + kind: 'virtual', + status: 'active', + lastHeartbeatAt: { lt: heartbeatCutoff }, + }, + }); + } + + /** + * Virtuals that have been inactive longer than the deletion cutoff (4h + * by default). The GC sweep removes these. + */ + async findExpiredInactives(deletionCutoff: Date): Promise { + return this.prisma.llm.findMany({ + where: { + kind: 'virtual', + status: 'inactive', + inactiveSince: { lt: deletionCutoff }, + }, + }); + } } diff --git a/src/mcpd/src/services/virtual-llm.service.ts b/src/mcpd/src/services/virtual-llm.service.ts new file mode 100644 index 0000000..f242843 --- /dev/null +++ b/src/mcpd/src/services/virtual-llm.service.ts @@ -0,0 +1,327 @@ +/** + * VirtualLlmService — lifecycle for `kind=virtual` Llm rows. + * + * The story end-to-end: + * 1. mcplocal POSTs `/api/v1/llms/_provider-register` with the providers + * it wants to publish. We upsert each into the `Llm` table marked + * kind=virtual / status=active and return a stable + * `providerSessionId` to the caller. + * 2. mcplocal opens the SSE channel on `/api/v1/llms/_provider-stream`. + * `bindSession()` records the SSE handle in memory keyed by + * `providerSessionId`. Disconnect → `disconnect()` flips the rows to + * inactive immediately. + * 3. Heartbeats land on `/api/v1/llms/_provider-heartbeat` and bump + * `lastHeartbeatAt`. The 60-s GC sweep moves heartbeat-stale rows to + * inactive (catches sessions whose disconnect we missed) and deletes + * anything inactive past the 4-h cutoff. + * 4. At inference time `/api/v1/llms/:name/infer` resolves the row, sees + * kind=virtual, and asks `enqueueInferTask()` to relay through the SSE + * session. The session pumps the OpenAI body to mcplocal as a `task` + * frame and waits for the result POST on + * `/api/v1/llms/_provider-task/:taskId/result`. + * + * In v1 there's no wake-on-demand (v2) and no LB pool (v4). 
One open SSE + * session per `providerSessionId`; one inference at a time per task id. + */ +import type { Llm } from '@prisma/client'; +import { randomUUID } from 'node:crypto'; +import type { ILlmRepository } from '../repositories/llm.repository.js'; +import type { OpenAiChatRequest } from './llm/types.js'; +import { NotFoundError } from './mcp-server.service.js'; + +/** A virtual provider's announcement at registration time. */ +export interface RegisterProviderInput { + name: string; + type: string; + model: string; + tier?: string; + description?: string; + extraConfig?: Record; +} + +export interface RegisterResult { + providerSessionId: string; + llms: Llm[]; +} + +/** + * In-memory handle for a live SSE session. The route owns the actual + * Fastify reply object; this interface is what the service expects from + * it. Decouples the service from Fastify so unit tests can use a stub. + */ +export interface VirtualSessionHandle { + /** Send a server-sent task frame to the publisher (`event: task`). */ + pushTask(task: VirtualTaskFrame): void; + /** True iff the underlying SSE response is still writable. */ + readonly alive: boolean; +} + +export type VirtualTaskFrame = + | { kind: 'infer'; taskId: string; llmName: string; request: OpenAiChatRequest; streaming: boolean } + // v2 wake task lives here so the SSE protocol stays additive. + | { kind: 'wake'; taskId: string; llmName: string }; + +/** + * Pending inference task. The route handler awaits `done`; the result POST + * resolves it via `completeTask()`. The error path rejects via `failTask()`. + */ +interface PendingTask { + taskId: string; + sessionId: string; + llmName: string; + streaming: boolean; + resolveNonStreaming: (body: unknown, status: number) => void; + rejectNonStreaming: (err: Error) => void; + /** For streaming tasks only; null on non-streaming. 
 */ + pushChunk: ((chunk: { data: string; done?: boolean }) => void) | null; +} + +const HEARTBEAT_TIMEOUT_MS = 90_000; +const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000; // 4 h + +export interface IVirtualLlmService { + register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise; + heartbeat(providerSessionId: string): Promise; + bindSession(providerSessionId: string, handle: VirtualSessionHandle): void; + unbindSession(providerSessionId: string): Promise; + enqueueInferTask(llmName: string, request: OpenAiChatRequest, streaming: boolean): Promise; + completeTask(taskId: string, result: { status: number; body: unknown }): boolean; + pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean; + failTask(taskId: string, error: Error): boolean; + gcSweep(now?: Date): Promise<{ markedInactive: number; deleted: number }>; +} + +/** Returned to the route handler so it can await the result. */ +export interface PendingTaskRef { + taskId: string; + /** Resolves with the relayed result for non-streaming tasks; for streaming tasks it resolves with { status: 200, body: null } once the final chunk (done=true) arrives — use onChunk for the data. Rejects only via failTask() or publisher disconnect. */ + done: Promise<{ status: number; body: unknown }>; + /** Streaming-only: subscribe to chunks. Returns an unsubscribe fn. */ + onChunk(cb: (chunk: { data: string; done?: boolean }) => void): () => void; +} + +export class VirtualLlmService implements IVirtualLlmService { + private readonly sessions = new Map(); + private readonly tasksById = new Map(); + + constructor(private readonly repo: ILlmRepository) {} + + async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise { + const sessionId = input.providerSessionId ?? randomUUID(); + const now = new Date(); + const llms: Llm[] = []; + + for (const p of input.providers) { + const existing = await this.repo.findByName(p.name); + if (existing === null) { + const created = await this.repo.create({ + name: p.name, + type: p.type, + model: p.model, + tier: p.tier ??
'fast', + description: p.description ?? '', + ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}), + kind: 'virtual', + providerSessionId: sessionId, + status: 'active', + lastHeartbeatAt: now, + inactiveSince: null, + }); + llms.push(created); + continue; + } + + // Existing row. Only allowed to (re-)register over a virtual row owned + // by the same session, OR an inactive virtual whose session went away + // (sticky reconnect). Refuse to overwrite a public row or someone + // else's active virtual. + if (existing.kind === 'public') { + throw Object.assign( + new Error(`Cannot publish over public LLM: ${p.name}`), + { statusCode: 409 }, + ); + } + if (existing.providerSessionId !== sessionId && existing.status === 'active') { + throw Object.assign( + new Error(`Virtual LLM '${p.name}' is already active under a different session`), + { statusCode: 409 }, + ); + } + + const updated = await this.repo.update(existing.id, { + type: p.type, + model: p.model, + ...(p.tier !== undefined ? { tier: p.tier } : {}), + ...(p.description !== undefined ? { description: p.description } : {}), + ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}), + kind: 'virtual', + providerSessionId: sessionId, + status: 'active', + lastHeartbeatAt: now, + inactiveSince: null, + }); + llms.push(updated); + } + + return { providerSessionId: sessionId, llms }; + } + + async heartbeat(providerSessionId: string): Promise { + const owned = await this.repo.findBySessionId(providerSessionId); + if (owned.length === 0) return; + const now = new Date(); + for (const row of owned) { + // Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a + // network blip that lapsed the SSE), revive it. + await this.repo.update(row.id, { + lastHeartbeatAt: now, + ...(row.status === 'inactive' + ? 
{ status: 'active', inactiveSince: null } + : {}), + }); + } + } + + bindSession(providerSessionId: string, handle: VirtualSessionHandle): void { + // Replace any prior handle for this session — keeps "last writer wins" + // simple. The old SSE will have been closed by the publisher anyway. + this.sessions.set(providerSessionId, handle); + } + + async unbindSession(providerSessionId: string): Promise { + this.sessions.delete(providerSessionId); + // Flip every Llm owned by that session to inactive immediately. + const owned = await this.repo.findBySessionId(providerSessionId); + const now = new Date(); + for (const row of owned) { + if (row.status === 'active') { + await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); + } + } + // Reject any in-flight tasks for this session — the relay can't deliver + // a result POST anymore. + for (const t of this.tasksById.values()) { + if (t.sessionId === providerSessionId) { + this.failTask(t.taskId, new Error('publisher disconnected')); + } + } + } + + async enqueueInferTask( + llmName: string, + request: OpenAiChatRequest, + streaming: boolean, + ): Promise { + const llm = await this.repo.findByName(llmName); + if (llm === null) throw new NotFoundError(`Llm not found: ${llmName}`); + if (llm.kind !== 'virtual' || llm.providerSessionId === null) { + throw Object.assign( + new Error(`Llm '${llmName}' is not a virtual provider`), + { statusCode: 500 }, + ); + } + if (llm.status !== 'active') { + throw Object.assign( + new Error(`Virtual Llm '${llmName}' is ${llm.status}; publisher offline`), + { statusCode: 503 }, + ); + } + const handle = this.sessions.get(llm.providerSessionId); + if (handle === undefined || !handle.alive) { + throw Object.assign( + new Error(`Virtual Llm '${llmName}' has no live SSE session; publisher offline`), + { statusCode: 503 }, + ); + } + + const taskId = randomUUID(); + const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>(); + + let resolveDone!: 
(v: { status: number; body: unknown }) => void; + let rejectDone!: (err: Error) => void; + const done = new Promise<{ status: number; body: unknown }>((resolve, reject) => { + resolveDone = resolve; + rejectDone = reject; + }); + + const pending: PendingTask = { + taskId, + sessionId: llm.providerSessionId, + llmName, + streaming, + resolveNonStreaming: (body, status) => resolveDone({ status, body }), + rejectNonStreaming: rejectDone, + pushChunk: streaming + ? (chunk): void => { for (const cb of chunkSubscribers) cb(chunk); } + : null, + }; + this.tasksById.set(taskId, pending); + + handle.pushTask({ + kind: 'infer', + taskId, + llmName, + request, + streaming, + }); + + return { + taskId, + done, + onChunk(cb): () => void { + chunkSubscribers.add(cb); + return () => chunkSubscribers.delete(cb); + }, + }; + } + + completeTask(taskId: string, result: { status: number; body: unknown }): boolean { + const t = this.tasksById.get(taskId); + if (t === undefined) return false; + this.tasksById.delete(taskId); + t.resolveNonStreaming(result.body, result.status); + return true; + } + + pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean { + const t = this.tasksById.get(taskId); + if (t === undefined || t.pushChunk === null) return false; + t.pushChunk(chunk); + if (chunk.done === true) { + // For streaming tasks, also resolve the `done` promise so the route + // handler can clean up. 
+ t.resolveNonStreaming(null, 200); + this.tasksById.delete(taskId); + } + return true; + } + + failTask(taskId: string, error: Error): boolean { + const t = this.tasksById.get(taskId); + if (t === undefined) return false; + this.tasksById.delete(taskId); + t.rejectNonStreaming(error); + return true; + } + + async gcSweep(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> { + let markedInactive = 0; + let deleted = 0; + + const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS); + const stale = await this.repo.findStaleVirtuals(heartbeatCutoff); + for (const row of stale) { + await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); + markedInactive += 1; + } + + const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS); + const expired = await this.repo.findExpiredInactives(deletionCutoff); + for (const row of expired) { + await this.repo.delete(row.id); + deleted += 1; + } + + return { markedInactive, deleted }; + } +} diff --git a/src/mcpd/tests/virtual-llm-service.test.ts b/src/mcpd/tests/virtual-llm-service.test.ts new file mode 100644 index 0000000..2459dcc --- /dev/null +++ b/src/mcpd/tests/virtual-llm-service.test.ts @@ -0,0 +1,347 @@ +import { describe, it, expect, vi } from 'vitest'; +import { VirtualLlmService, type VirtualSessionHandle } from '../src/services/virtual-llm.service.js'; +import type { ILlmRepository } from '../src/repositories/llm.repository.js'; +import type { Llm } from '@prisma/client'; + +function makeLlm(overrides: Partial = {}): Llm { + return { + id: `llm-${Math.random().toString(36).slice(2, 8)}`, + name: 'vllm-local', + type: 'openai', + model: 'm', + url: '', + tier: 'fast', + description: '', + apiKeySecretId: null, + apiKeySecretKey: null, + extraConfig: {} as Llm['extraConfig'], + kind: 'virtual', + providerSessionId: 's-1', + lastHeartbeatAt: new Date(), + status: 'active', + inactiveSince: null, + version: 1, + createdAt: new Date(), + updatedAt: new 
Date(), + ...overrides, + }; +} + +function mockRepo(initial: Llm[] = []): ILlmRepository { + const rows = new Map(initial.map((l) => [l.id, l])); + let counter = rows.size; + return { + findAll: vi.fn(async () => [...rows.values()]), + findById: vi.fn(async (id: string) => rows.get(id) ?? null), + findByName: vi.fn(async (name: string) => { + for (const l of rows.values()) if (l.name === name) return l; + return null; + }), + findByTier: vi.fn(async () => []), + findBySessionId: vi.fn(async (sid: string) => + [...rows.values()].filter((l) => l.providerSessionId === sid)), + findStaleVirtuals: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((l) => + l.kind === 'virtual' + && l.status === 'active' + && l.lastHeartbeatAt !== null + && l.lastHeartbeatAt < cutoff)), + findExpiredInactives: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((l) => + l.kind === 'virtual' + && l.status === 'inactive' + && l.inactiveSince !== null + && l.inactiveSince < cutoff)), + create: vi.fn(async (data) => { + counter += 1; + const row = makeLlm({ + id: `llm-${String(counter)}`, + name: data.name, + type: data.type, + model: data.model, + url: data.url ?? '', + tier: data.tier ?? 'fast', + description: data.description ?? '', + kind: data.kind ?? 'public', + providerSessionId: data.providerSessionId ?? null, + status: data.status ?? 'active', + lastHeartbeatAt: data.lastHeartbeatAt ?? null, + inactiveSince: data.inactiveSince ?? null, + }); + rows.set(row.id, row); + return row; + }), + update: vi.fn(async (id, data) => { + const existing = rows.get(id); + if (!existing) throw new Error('not found'); + const next: Llm = { + ...existing, + ...(data.type !== undefined ? { type: data.type } : {}), + ...(data.model !== undefined ? { model: data.model } : {}), + ...(data.tier !== undefined ? { tier: data.tier } : {}), + ...(data.description !== undefined ? { description: data.description } : {}), + ...(data.kind !== undefined ? 
{ kind: data.kind } : {}), + ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), + ...(data.status !== undefined ? { status: data.status } : {}), + ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}), + ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}), + }; + rows.set(id, next); + return next; + }), + delete: vi.fn(async (id: string) => { rows.delete(id); }), + }; +} + +function fakeSession(): VirtualSessionHandle & { tasks: Array; alive: boolean } { + const tasks: unknown[] = []; + return { + tasks, + alive: true, + pushTask(t) { tasks.push(t); }, + }; +} + +describe('VirtualLlmService', () => { + it('register inserts new virtual rows with active status + sessionId', async () => { + const repo = mockRepo(); + const svc = new VirtualLlmService(repo); + const { providerSessionId, llms } = await svc.register({ + providerSessionId: null, + providers: [ + { name: 'vllm-local', type: 'openai', model: 'Qwen/Qwen2.5-7B-Instruct-AWQ', tier: 'fast' }, + ], + }); + expect(providerSessionId).toMatch(/^[0-9a-f-]{36}$/); + expect(llms).toHaveLength(1); + expect(llms[0]!.kind).toBe('virtual'); + expect(llms[0]!.status).toBe('active'); + expect(llms[0]!.providerSessionId).toBe(providerSessionId); + expect(llms[0]!.lastHeartbeatAt).not.toBeNull(); + }); + + it('register reuses the same row on sticky reconnect (same name + sessionId)', async () => { + const repo = mockRepo(); + const svc = new VirtualLlmService(repo); + const first = await svc.register({ + providerSessionId: 'fixed-id', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], + }); + expect(first.llms[0]!.id).toMatch(/^llm-/); + const firstId = first.llms[0]!.id; + + const second = await svc.register({ + providerSessionId: 'fixed-id', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm-updated' }], + }); + expect(second.llms[0]!.id).toBe(firstId); + 
expect(second.llms[0]!.model).toBe('m-updated'); + }); + + it('register refuses to overwrite a public LLM with the same name', async () => { + const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]); + const svc = new VirtualLlmService(repo); + await expect(svc.register({ + providerSessionId: 'sess-x', + providers: [{ name: 'qwen3-thinking', type: 'openai', model: 'm' }], + })).rejects.toThrow(/Cannot publish over public/); + }); + + it('register refuses if another active session owns the name', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'other', status: 'active' })]); + const svc = new VirtualLlmService(repo); + await expect(svc.register({ + providerSessionId: 'mine', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], + })).rejects.toThrow(/already active under a different session/); + }); + + it('register adopts an inactive virtual row from a different session (sticky reconnect after lapse)', async () => { + const repo = mockRepo([makeLlm({ + name: 'vllm-local', providerSessionId: 'old-session', + status: 'inactive', inactiveSince: new Date(), + })]); + const svc = new VirtualLlmService(repo); + const { llms } = await svc.register({ + providerSessionId: 'new-session', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], + }); + expect(llms[0]!.providerSessionId).toBe('new-session'); + expect(llms[0]!.status).toBe('active'); + expect(llms[0]!.inactiveSince).toBeNull(); + }); + + it('heartbeat bumps lastHeartbeatAt + revives an inactive row', async () => { + const past = new Date(Date.now() - 5_000); + const repo = mockRepo([makeLlm({ + name: 'vllm-local', providerSessionId: 'sess', status: 'inactive', + lastHeartbeatAt: past, inactiveSince: past, + })]); + const svc = new VirtualLlmService(repo); + await svc.heartbeat('sess'); + const row = await repo.findByName('vllm-local'); + expect(row?.status).toBe('active'); + 
expect(row?.inactiveSince).toBeNull(); + expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime()); + }); + + it('unbindSession flips all owned rows to inactive immediately', async () => { + const repo = mockRepo([ + makeLlm({ name: 'a', providerSessionId: 'sess' }), + makeLlm({ name: 'b', providerSessionId: 'sess' }), + makeLlm({ name: 'c', providerSessionId: 'other' }), + ]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + await svc.unbindSession('sess'); + expect((await repo.findByName('a'))?.status).toBe('inactive'); + expect((await repo.findByName('b'))?.status).toBe('inactive'); + expect((await repo.findByName('c'))?.status).toBe('active'); + }); + + it('enqueueInferTask pushes a task frame to the SSE session', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + const session = fakeSession(); + svc.bindSession('sess', session); + + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + expect(session.tasks).toHaveLength(1); + const t = session.tasks[0] as { kind: string; taskId: string; llmName: string; streaming: boolean }; + expect(t.kind).toBe('infer'); + expect(t.taskId).toBe(ref.taskId); + expect(t.llmName).toBe('vllm-local'); + expect(t.streaming).toBe(false); + }); + + it('enqueueInferTask rejects when the publisher is offline (no session bound)', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + await expect( + svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false), + ).rejects.toThrow(/no live SSE session|publisher offline/); + }); + + it('enqueueInferTask rejects when the row is inactive', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', status: 'inactive', 
inactiveSince: new Date() })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + await expect( + svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false), + ).rejects.toThrow(/inactive|publisher offline/); + }); + + it('enqueueInferTask rejects when the LLM is public (not virtual)', async () => { + const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]); + const svc = new VirtualLlmService(repo); + await expect( + svc.enqueueInferTask('qwen3-thinking', { model: 'm', messages: [] }, false), + ).rejects.toThrow(/not a virtual provider/); + }); + + it('completeTask resolves the pending non-streaming promise', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + expect(svc.completeTask(ref.taskId, { status: 200, body: { ok: true } })).toBe(true); + await expect(ref.done).resolves.toEqual({ status: 200, body: { ok: true } }); + }); + + it('streaming: pushTaskChunk fans chunks to subscribers; done resolves the ref', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }], stream: true }, + true, + ); + const got: Array<{ data: string; done?: boolean }> = []; + ref.onChunk((c) => got.push(c)); + + expect(svc.pushTaskChunk(ref.taskId, { data: 'hello' })).toBe(true); + expect(svc.pushTaskChunk(ref.taskId, { data: ' world' })).toBe(true); + expect(svc.pushTaskChunk(ref.taskId, { data: '[DONE]', done: true })).toBe(true); + + expect(got.map((c) => 
c.data)).toEqual(['hello', ' world', '[DONE]']); + await expect(ref.done).resolves.toMatchObject({ status: 200 }); + }); + + it('failTask rejects the pending promise with a clear error', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + expect(svc.failTask(ref.taskId, new Error('upstream blew up'))).toBe(true); + await expect(ref.done).rejects.toThrow(/upstream blew up/); + }); + + it('unbindSession rejects in-flight tasks for that session', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + await svc.unbindSession('sess'); + await expect(ref.done).rejects.toThrow(/publisher disconnected/); + }); + + it('gcSweep flips heartbeat-stale active virtuals to inactive', async () => { + const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago — past the 90-s cutoff + const recent = new Date(Date.now() - 30 * 1000); // 30 s ago — within the cutoff + const repo = mockRepo([ + makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }), + makeLlm({ name: 'fresh', providerSessionId: 'b', status: 'active', lastHeartbeatAt: recent }), + ]); + const svc = new VirtualLlmService(repo); + const result = await svc.gcSweep(); + expect(result.markedInactive).toBe(1); + expect((await repo.findByName('stale'))?.status).toBe('inactive'); + expect((await repo.findByName('fresh'))?.status).toBe('active'); + }); + + it('gcSweep deletes virtuals inactive past the 4h retention window', async () => { + const ancient = new 
Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago + const fresh = new Date(Date.now() - 1 * 60 * 60 * 1000); // 1 h ago + const repo = mockRepo([ + makeLlm({ name: 'old', providerSessionId: 'a', status: 'inactive', inactiveSince: ancient }), + makeLlm({ name: 'recent', providerSessionId: 'b', status: 'inactive', inactiveSince: fresh }), + makeLlm({ name: 'public-survivor', providerSessionId: null, kind: 'public' }), + ]); + const svc = new VirtualLlmService(repo); + const result = await svc.gcSweep(); + expect(result.deleted).toBe(1); + expect(await repo.findByName('old')).toBeNull(); + expect(await repo.findByName('recent')).not.toBeNull(); + expect(await repo.findByName('public-survivor')).not.toBeNull(); + }); + + it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => { + const long = new Date(Date.now() - 5 * 60 * 1000); + const repo = mockRepo([ + makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }), + ]); + const svc = new VirtualLlmService(repo); + const first = await svc.gcSweep(); + const second = await svc.gcSweep(); + expect(first.markedInactive).toBe(1); + expect(second.markedInactive).toBe(0); + expect(second.deleted).toBe(0); + }); +});