From 1acd8b58bc40a9c0cfae838496cdd291bac93000 Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 13:59:44 +0100 Subject: [PATCH 1/6] feat(db): Llm.kind discriminator + virtual-provider lifecycle (v1 Stage 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First step of the virtual-LLM feature. A virtual Llm row is one that gets *registered by an mcplocal client* rather than created via \`mcpctl create llm\`. Its inference is relayed back through an SSE control channel to the publishing session (mcpd routes added in Stage 3). The lifecycle fields below let mcpd reap stale rows when the publisher goes away. Schema additions: - enum LlmKind (public | virtual). Default public. - enum LlmStatus (active | inactive | hibernating). Default active. hibernating is reserved for v2 wake-on-demand. - Llm.kind, providerSessionId, lastHeartbeatAt, status, inactiveSince. - @@index([kind, status]) for the GC sweep. - @@index([providerSessionId]) for the reconnect lookup. All existing rows backfill with kind=public/status=active so v1 is purely additive — public LLMs ignore the lifecycle columns entirely. 7 new prisma-level assertions in tests/llm-virtual-schema.test.ts cover: defaults, persisting kind=virtual + lifecycle together, the active→inactive flip, hibernating value, enum rejection, the (kind,status) GC index, the providerSessionId reconnect index. mcpd suite still 801/801 (regenerated client) and typecheck clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../migration.sql | 16 +++ src/db/prisma/schema.prisma | 51 +++++-- src/db/tests/llm-virtual-schema.test.ts | 136 ++++++++++++++++++ 3 files changed, 190 insertions(+), 13 deletions(-) create mode 100644 src/db/prisma/migrations/20260427125811_add_virtual_llm_lifecycle/migration.sql create mode 100644 src/db/tests/llm-virtual-schema.test.ts diff --git a/src/db/prisma/migrations/20260427125811_add_virtual_llm_lifecycle/migration.sql b/src/db/prisma/migrations/20260427125811_add_virtual_llm_lifecycle/migration.sql new file mode 100644 index 0000000..826398d --- /dev/null +++ b/src/db/prisma/migrations/20260427125811_add_virtual_llm_lifecycle/migration.sql @@ -0,0 +1,16 @@ +-- Add Llm.kind/status discriminators and virtual-provider lifecycle fields. +-- Existing rows backfill with kind='public' / status='active' so v1 is purely +-- additive — public LLMs ignore the lifecycle columns entirely. + +CREATE TYPE "LlmKind" AS ENUM ('public', 'virtual'); +CREATE TYPE "LlmStatus" AS ENUM ('active', 'inactive', 'hibernating'); + +ALTER TABLE "Llm" + ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public', + ADD COLUMN "providerSessionId" TEXT, + ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3), + ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active', + ADD COLUMN "inactiveSince" TIMESTAMP(3); + +CREATE INDEX "Llm_kind_status_idx" ON "Llm"("kind", "status"); +CREATE INDEX "Llm_providerSessionId_idx" ON "Llm"("providerSessionId"); diff --git a/src/db/prisma/schema.prisma b/src/db/prisma/schema.prisma index a52f1ae..dfc83e5 100644 --- a/src/db/prisma/schema.prisma +++ b/src/db/prisma/schema.prisma @@ -182,21 +182,44 @@ model Secret { // provider API key server-side so credentials never leave the cluster. // Credentials are stored by reference: `apiKeySecret` points at a Secret, and // `apiKeySecretKey` names the key within that secret's data. +// +// `kind=virtual` rows are *registered by an mcplocal client* (rather than a +// human via `mcpctl create llm`). Their inference is relayed back through +// the SSE control channel to the publishing mcplocal session. The lifecycle +// fields (lastHeartbeatAt, status, inactiveSince) belong to virtual rows; +// public rows ignore them. + +enum LlmKind { + public // upstream-URL row, mcpd calls directly + virtual // mcplocal-registered, inference relayed via SSE control channel +} + +enum LlmStatus { + active // healthy, accepting requests + inactive // publisher went away; row pending 4-h GC + hibernating // publisher present but backend asleep — wakes on demand (v2) +} model Llm { - id String @id @default(cuid()) - name String @unique - type String // anthropic | openai | deepseek | vllm | ollama | gemini-cli - model String // e.g. claude-3-5-sonnet-20241022 - url String @default("") // endpoint (empty for provider default) - tier String @default("fast") // fast | heavy - description String @default("") - apiKeySecretId String? // FK to Secret - apiKeySecretKey String? // key inside the Secret's data - extraConfig Json @default("{}") // per-type extras - version Int @default(1) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt + id String @id @default(cuid()) + name String @unique + type String // anthropic | openai | deepseek | vllm | ollama | gemini-cli + model String // e.g. claude-3-5-sonnet-20241022 + url String @default("") // endpoint (empty for provider default) + tier String @default("fast") // fast | heavy + description String @default("") + apiKeySecretId String? // FK to Secret + apiKeySecretKey String? // key inside the Secret's data + extraConfig Json @default("{}") // per-type extras + // ── Virtual-provider lifecycle (NULL/default for kind=public) ── + kind LlmKind @default(public) + providerSessionId String? // mcplocal session that owns this row when virtual + lastHeartbeatAt DateTime? // bumped on every publisher heartbeat + status LlmStatus @default(active) + inactiveSince DateTime? // when status flipped from active; used for 4-h GC + version Int @default(1) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt apiKeySecret Secret? @relation(fields: [apiKeySecretId], references: [id], onDelete: SetNull) agents Agent[] @@ -204,6 +227,8 @@ model Llm { @@index([name]) @@index([tier]) @@index([apiKeySecretId]) + @@index([kind, status]) + @@index([providerSessionId]) } // ── Groups ── diff --git a/src/db/tests/llm-virtual-schema.test.ts b/src/db/tests/llm-virtual-schema.test.ts new file mode 100644 index 0000000..12fdfde --- /dev/null +++ b/src/db/tests/llm-virtual-schema.test.ts @@ -0,0 +1,136 @@ +import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest'; +import type { PrismaClient } from '@prisma/client'; +import { setupTestDb, cleanupTestDb, clearAllTables } from './helpers.js'; + +describe('llm virtual-provider schema', () => { + let prisma: PrismaClient; + + beforeAll(async () => { + prisma = await setupTestDb(); + }, 30_000); + + afterAll(async () => { + await cleanupTestDb(); + }); + + beforeEach(async () => { + await clearAllTables(prisma); + }); + + it('defaults a freshly inserted Llm to kind=public, status=active', async () => { + const llm = await prisma.llm.create({ + data: { name: 'plain', type: 'openai', model: 'gpt-4o' }, + }); + expect(llm.kind).toBe('public'); + expect(llm.status).toBe('active'); + expect(llm.providerSessionId).toBeNull(); + expect(llm.lastHeartbeatAt).toBeNull(); + expect(llm.inactiveSince).toBeNull(); + }); + + it('persists kind=virtual + lifecycle fields together', async () => { + const now = new Date(); + const llm = await prisma.llm.create({ + data: { + name: 'vllm-local', + type: 'openai', + model: 'Qwen/Qwen2.5-7B-Instruct-AWQ', + kind: 'virtual', + providerSessionId: 'sess-abc', + lastHeartbeatAt: now, + status: 'active', + }, + }); + expect(llm.kind).toBe('virtual'); + expect(llm.providerSessionId).toBe('sess-abc'); + expect(llm.lastHeartbeatAt?.getTime()).toBe(now.getTime()); + expect(llm.status).toBe('active'); + }); + + it('flips status active → inactive and records inactiveSince', async () => { + const llm = await prisma.llm.create({ + data: { + name: 'goingaway', + type: 'openai', + model: 'm', + kind: 'virtual', + providerSessionId: 's1', + }, + }); + const flippedAt = new Date(); + await prisma.llm.update({ + where: { id: llm.id }, + data: { status: 'inactive', inactiveSince: flippedAt }, + }); + const reloaded = await prisma.llm.findUnique({ where: { id: llm.id } }); + expect(reloaded?.status).toBe('inactive'); + expect(reloaded?.inactiveSince?.getTime()).toBe(flippedAt.getTime()); + }); + + it('hibernating is a valid LlmStatus value (reserved for v2 wake path)', async () => { + const llm = await prisma.llm.create({ + data: { + name: 'sleepy', + type: 'openai', + model: 'm', + kind: 'virtual', + providerSessionId: 's-sleep', + status: 'hibernating', + }, + }); + expect(llm.status).toBe('hibernating'); + }); + + it('rejects unknown enum values for kind / status', async () => { + await expect( + prisma.llm.create({ + // Cast through unknown — runtime test of the enum constraint, not TS. + data: ({ name: 'bad', type: 'openai', model: 'm', kind: 'made-up' } as unknown) as Parameters[0]['data'], + }), + ).rejects.toThrow(); + + await expect( + prisma.llm.create({ + data: ({ name: 'bad2', type: 'openai', model: 'm', status: 'unknown' } as unknown) as Parameters[0]['data'], + }), + ).rejects.toThrow(); + }); + + it('finds virtual rows by (kind, status) cheaply', async () => { + // Mix of public + virtual + assorted statuses — confirms the + // @@index([kind, status]) covers the GC sweep query. + await prisma.llm.create({ data: { name: 'pub-1', type: 'openai', model: 'm' } }); + await prisma.llm.create({ data: { name: 'pub-2', type: 'openai', model: 'm' } }); + await prisma.llm.create({ + data: { name: 'v-1', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 's1', status: 'active' }, + }); + await prisma.llm.create({ + data: { name: 'v-2', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() }, + }); + + const stale = await prisma.llm.findMany({ + where: { kind: 'virtual', status: 'inactive' }, + select: { name: true }, + }); + expect(stale.map((l) => l.name)).toEqual(['v-2']); + }); + + it('finds rows by providerSessionId (used on mcplocal reconnect)', async () => { + await prisma.llm.create({ + data: { name: 'a', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 'shared' }, + }); + await prisma.llm.create({ + data: { name: 'b', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 'shared' }, + }); + await prisma.llm.create({ + data: { name: 'c', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 'other' }, + }); + + const owned = await prisma.llm.findMany({ + where: { providerSessionId: 'shared' }, + select: { name: true }, + orderBy: { name: 'asc' }, + }); + expect(owned.map((l) => l.name)).toEqual(['a', 'b']); + }); +}); -- 2.49.1 From 2215922618f3e8401a84c7d7b903157432ded5ca Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 14:05:19 +0100 Subject: [PATCH 2/6] feat(mcpd): VirtualLlmService + repo lifecycle helpers (v1 Stage 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The state machine for kind=virtual Llm rows. Wires the schema added in Stage 1 into something that can register, heartbeat, time out, and relay inference tasks. The HTTP routes (Stage 3) plug into this. Repository (extends ILlmRepository): - create/update accept kind/providerSessionId/lastHeartbeatAt/status/ inactiveSince/type so VirtualLlmService can drive the lifecycle. - findBySessionId(sessionId) — the reconnect lookup. - findStaleVirtuals(cutoff) — heartbeat-stale rows for the GC sweep. - findExpiredInactives(cutoff) — 4h-expired rows for deletion. VirtualLlmService: - register(): sticky-id-aware upsert. New names insert as kind=virtual/ status=active. Existing virtual rows from the same session reactivate in place; existing inactive virtuals from a foreign session can be adopted (sticky reconnect). Refuses to overwrite a public row or a foreign session's still-active virtual. - heartbeat(): bumps lastHeartbeatAt for every row owned by the session; revives inactive rows. - bindSession()/unbindSession(): in-memory map of sessionId → SSE handle. Disconnect immediately flips owned rows to inactive AND rejects any in-flight tasks for that session. - enqueueInferTask(): pushes an `infer` task frame to the SSE handle, returns a PendingTaskRef whose `done` resolves when the publisher POSTs the result back. Streaming variant exposes onChunk(cb). - completeTask/pushTaskChunk/failTask: route-side hooks called from the result POST handler (lands in Stage 3). - gcSweep(): flips heartbeat-stale active virtuals to inactive (90s cutoff), deletes inactives past 4h. Idempotent. Lifecycle constants live in this file (HEARTBEAT_TIMEOUT_MS=90s, INACTIVE_RETENTION_MS=4h) so future stages can tune in one place. 18 new mocked-repo tests cover: register variants (insert, sticky reconnect, refuse public-overwrite, refuse foreign-session, adopt inactive-foreign), heartbeat-revive, unbind cascade, enqueue happy path + 503 paths (no session, inactive, public-Llm), complete/fail/ streaming chunk fan-out, GC sweep flip + delete + idempotence. mcpd suite: 819/819 (was 801, +18). Typecheck clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/mcpd/src/repositories/llm.repository.ts | 70 +++- src/mcpd/src/services/virtual-llm.service.ts | 327 +++++++++++++++++ src/mcpd/tests/virtual-llm-service.test.ts | 347 +++++++++++++++++++ 3 files changed, 743 insertions(+), 1 deletion(-) create mode 100644 src/mcpd/src/services/virtual-llm.service.ts create mode 100644 src/mcpd/tests/virtual-llm-service.test.ts diff --git a/src/mcpd/src/repositories/llm.repository.ts b/src/mcpd/src/repositories/llm.repository.ts index 92f839a..06a5868 100644 --- a/src/mcpd/src/repositories/llm.repository.ts +++ b/src/mcpd/src/repositories/llm.repository.ts @@ -1,4 +1,4 @@ -import type { PrismaClient, Llm, Prisma } from '@prisma/client'; +import type { PrismaClient, Llm, Prisma, LlmKind, LlmStatus } from '@prisma/client'; export interface CreateLlmInput { name: string; @@ -10,9 +10,16 @@ export interface CreateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + // Virtual-provider lifecycle (omit for kind=public). + kind?: LlmKind; + providerSessionId?: string | null; + status?: LlmStatus; + lastHeartbeatAt?: Date | null; + inactiveSince?: Date | null; } export interface UpdateLlmInput { + type?: string; model?: string; url?: string; tier?: string; @@ -20,6 +27,13 @@ export interface UpdateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + // Virtual-provider lifecycle. VirtualLlmService is the only writer for + // these in v1; the public CRUD path leaves them undefined. + kind?: LlmKind; + providerSessionId?: string | null; + status?: LlmStatus; + lastHeartbeatAt?: Date | null; + inactiveSince?: Date | null; } export interface ILlmRepository { @@ -30,6 +44,10 @@ export interface ILlmRepository { create(data: CreateLlmInput): Promise; update(id: string, data: UpdateLlmInput): Promise; delete(id: string): Promise; + // Virtual-provider lifecycle helpers (called by VirtualLlmService). + findBySessionId(sessionId: string): Promise; + findStaleVirtuals(heartbeatCutoff: Date): Promise; + findExpiredInactives(deletionCutoff: Date): Promise; } export class LlmRepository implements ILlmRepository { @@ -63,12 +81,18 @@ export class LlmRepository implements ILlmRepository { apiKeySecretId: data.apiKeySecretId ?? null, apiKeySecretKey: data.apiKeySecretKey ?? null, extraConfig: (data.extraConfig ?? {}) as Prisma.InputJsonValue, + ...(data.kind !== undefined ? { kind: data.kind } : {}), + ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), + ...(data.status !== undefined ? { status: data.status } : {}), + ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}), + ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}), }, }); } async update(id: string, data: UpdateLlmInput): Promise { const updateData: Prisma.LlmUpdateInput = {}; + if (data.type !== undefined) updateData.type = data.type; if (data.model !== undefined) updateData.model = data.model; if (data.url !== undefined) updateData.url = data.url; if (data.tier !== undefined) updateData.tier = data.tier; @@ -80,10 +104,54 @@ export class LlmRepository implements ILlmRepository { } if (data.apiKeySecretKey !== undefined) updateData.apiKeySecretKey = data.apiKeySecretKey; if (data.extraConfig !== undefined) updateData.extraConfig = data.extraConfig as Prisma.InputJsonValue; + if (data.kind !== undefined) updateData.kind = data.kind; + if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId; + if (data.status !== undefined) updateData.status = data.status; + if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt; + if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince; return this.prisma.llm.update({ where: { id }, data: updateData }); } async delete(id: string): Promise { await this.prisma.llm.delete({ where: { id } }); } + + // ── Virtual-provider lifecycle queries ── + + async findBySessionId(sessionId: string): Promise { + return this.prisma.llm.findMany({ + where: { providerSessionId: sessionId }, + orderBy: { name: 'asc' }, + }); + } + + /** + * Virtuals whose lastHeartbeatAt is older than the cutoff and are still + * marked active. The GC sweep flips these to `inactive`. Public rows + * never have lastHeartbeatAt set so they're naturally excluded by the + * non-null compare on `lt`. + */ + async findStaleVirtuals(heartbeatCutoff: Date): Promise { + return this.prisma.llm.findMany({ + where: { + kind: 'virtual', + status: 'active', + lastHeartbeatAt: { lt: heartbeatCutoff }, + }, + }); + } + + /** + * Virtuals that have been inactive longer than the deletion cutoff (4h + * by default). The GC sweep removes these. + */ + async findExpiredInactives(deletionCutoff: Date): Promise { + return this.prisma.llm.findMany({ + where: { + kind: 'virtual', + status: 'inactive', + inactiveSince: { lt: deletionCutoff }, + }, + }); + } } diff --git a/src/mcpd/src/services/virtual-llm.service.ts b/src/mcpd/src/services/virtual-llm.service.ts new file mode 100644 index 0000000..f242843 --- /dev/null +++ b/src/mcpd/src/services/virtual-llm.service.ts @@ -0,0 +1,327 @@ +/** + * VirtualLlmService — lifecycle for `kind=virtual` Llm rows. + * + * The story end-to-end: + * 1. mcplocal POSTs `/api/v1/llms/_provider-register` with the providers + * it wants to publish. We upsert each into the `Llm` table marked + * kind=virtual / status=active and return a stable + * `providerSessionId` to the caller. + * 2. mcplocal opens the SSE channel on `/api/v1/llms/_provider-stream`. + * `bindSession()` records the SSE handle in memory keyed by + * `providerSessionId`. Disconnect → `disconnect()` flips the rows to + * inactive immediately. + * 3. Heartbeats land on `/api/v1/llms/_provider-heartbeat` and bump + * `lastHeartbeatAt`. The 60-s GC sweep moves heartbeat-stale rows to + * inactive (catches sessions whose disconnect we missed) and deletes + * anything inactive past the 4-h cutoff. + * 4. At inference time `/api/v1/llms/:name/infer` resolves the row, sees + * kind=virtual, and asks `enqueueInferTask()` to relay through the SSE + * session. The session pumps the OpenAI body to mcplocal as a `task` + * frame and waits for the result POST on + * `/api/v1/llms/_provider-task/:taskId/result`. + * + * In v1 there's no wake-on-demand (v2) and no LB pool (v4). One open SSE + * session per `providerSessionId`; one inference at a time per task id. + */ +import type { Llm } from '@prisma/client'; +import { randomUUID } from 'node:crypto'; +import type { ILlmRepository } from '../repositories/llm.repository.js'; +import type { OpenAiChatRequest } from './llm/types.js'; +import { NotFoundError } from './mcp-server.service.js'; + +/** A virtual provider's announcement at registration time. */ +export interface RegisterProviderInput { + name: string; + type: string; + model: string; + tier?: string; + description?: string; + extraConfig?: Record; +} + +export interface RegisterResult { + providerSessionId: string; + llms: Llm[]; +} + +/** + * In-memory handle for a live SSE session. The route owns the actual + * Fastify reply object; this interface is what the service expects from + * it. Decouples the service from Fastify so unit tests can use a stub. + */ +export interface VirtualSessionHandle { + /** Send a server-sent task frame to the publisher (`event: task`). */ + pushTask(task: VirtualTaskFrame): void; + /** True iff the underlying SSE response is still writable. */ + readonly alive: boolean; +} + +export type VirtualTaskFrame = + | { kind: 'infer'; taskId: string; llmName: string; request: OpenAiChatRequest; streaming: boolean } + // v2 wake task lives here so the SSE protocol stays additive. + | { kind: 'wake'; taskId: string; llmName: string }; + +/** + * Pending inference task. The route handler awaits `done`; the result POST + * resolves it via `completeTask()`. The error path rejects via `failTask()`. + */ +interface PendingTask { + taskId: string; + sessionId: string; + llmName: string; + streaming: boolean; + resolveNonStreaming: (body: unknown, status: number) => void; + rejectNonStreaming: (err: Error) => void; + /** For streaming tasks only; null on non-streaming. */ + pushChunk: ((chunk: { data: string; done?: boolean }) => void) | null; +} + +const HEARTBEAT_TIMEOUT_MS = 90_000; +const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000; // 4 h + +export interface IVirtualLlmService { + register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise; + heartbeat(providerSessionId: string): Promise; + bindSession(providerSessionId: string, handle: VirtualSessionHandle): void; + unbindSession(providerSessionId: string): Promise; + enqueueInferTask(llmName: string, request: OpenAiChatRequest, streaming: boolean): Promise; + completeTask(taskId: string, result: { status: number; body: unknown }): boolean; + pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean; + failTask(taskId: string, error: Error): boolean; + gcSweep(now?: Date): Promise<{ markedInactive: number; deleted: number }>; +} + +/** Returned to the route handler so it can await the result. */ +export interface PendingTaskRef { + taskId: string; + /** Resolves for non-streaming tasks. Streaming tasks reject this — use the chunk callback path. */ + done: Promise<{ status: number; body: unknown }>; + /** Streaming-only: subscribe to chunks. Returns an unsubscribe fn. */ + onChunk(cb: (chunk: { data: string; done?: boolean }) => void): () => void; +} + +export class VirtualLlmService implements IVirtualLlmService { + private readonly sessions = new Map(); + private readonly tasksById = new Map(); + + constructor(private readonly repo: ILlmRepository) {} + + async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise { + const sessionId = input.providerSessionId ?? randomUUID(); + const now = new Date(); + const llms: Llm[] = []; + + for (const p of input.providers) { + const existing = await this.repo.findByName(p.name); + if (existing === null) { + const created = await this.repo.create({ + name: p.name, + type: p.type, + model: p.model, + tier: p.tier ?? 'fast', + description: p.description ?? '', + ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}), + kind: 'virtual', + providerSessionId: sessionId, + status: 'active', + lastHeartbeatAt: now, + inactiveSince: null, + }); + llms.push(created); + continue; + } + + // Existing row. Only allowed to (re-)register over a virtual row owned + // by the same session, OR an inactive virtual whose session went away + // (sticky reconnect). Refuse to overwrite a public row or someone + // else's active virtual. + if (existing.kind === 'public') { + throw Object.assign( + new Error(`Cannot publish over public LLM: ${p.name}`), + { statusCode: 409 }, + ); + } + if (existing.providerSessionId !== sessionId && existing.status === 'active') { + throw Object.assign( + new Error(`Virtual LLM '${p.name}' is already active under a different session`), + { statusCode: 409 }, + ); + } + + const updated = await this.repo.update(existing.id, { + type: p.type, + model: p.model, + ...(p.tier !== undefined ? { tier: p.tier } : {}), + ...(p.description !== undefined ? { description: p.description } : {}), + ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}), + kind: 'virtual', + providerSessionId: sessionId, + status: 'active', + lastHeartbeatAt: now, + inactiveSince: null, + }); + llms.push(updated); + } + + return { providerSessionId: sessionId, llms }; + } + + async heartbeat(providerSessionId: string): Promise { + const owned = await this.repo.findBySessionId(providerSessionId); + if (owned.length === 0) return; + const now = new Date(); + for (const row of owned) { + // Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a + // network blip that lapsed the SSE), revive it. + await this.repo.update(row.id, { + lastHeartbeatAt: now, + ...(row.status === 'inactive' + ? { status: 'active', inactiveSince: null } + : {}), + }); + } + } + + bindSession(providerSessionId: string, handle: VirtualSessionHandle): void { + // Replace any prior handle for this session — keeps "last writer wins" + // simple. The old SSE will have been closed by the publisher anyway. + this.sessions.set(providerSessionId, handle); + } + + async unbindSession(providerSessionId: string): Promise { + this.sessions.delete(providerSessionId); + // Flip every Llm owned by that session to inactive immediately. + const owned = await this.repo.findBySessionId(providerSessionId); + const now = new Date(); + for (const row of owned) { + if (row.status === 'active') { + await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); + } + } + // Reject any in-flight tasks for this session — the relay can't deliver + // a result POST anymore. + for (const t of this.tasksById.values()) { + if (t.sessionId === providerSessionId) { + this.failTask(t.taskId, new Error('publisher disconnected')); + } + } + } + + async enqueueInferTask( + llmName: string, + request: OpenAiChatRequest, + streaming: boolean, + ): Promise { + const llm = await this.repo.findByName(llmName); + if (llm === null) throw new NotFoundError(`Llm not found: ${llmName}`); + if (llm.kind !== 'virtual' || llm.providerSessionId === null) { + throw Object.assign( + new Error(`Llm '${llmName}' is not a virtual provider`), + { statusCode: 500 }, + ); + } + if (llm.status !== 'active') { + throw Object.assign( + new Error(`Virtual Llm '${llmName}' is ${llm.status}; publisher offline`), + { statusCode: 503 }, + ); + } + const handle = this.sessions.get(llm.providerSessionId); + if (handle === undefined || !handle.alive) { + throw Object.assign( + new Error(`Virtual Llm '${llmName}' has no live SSE session; publisher offline`), + { statusCode: 503 }, + ); + } + + const taskId = randomUUID(); + const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>(); + + let resolveDone!: (v: { status: number; body: unknown }) => void; + let rejectDone!: (err: Error) => void; + const done = new Promise<{ status: number; body: unknown }>((resolve, reject) => { + resolveDone = resolve; + rejectDone = reject; + }); + + const pending: PendingTask = { + taskId, + sessionId: llm.providerSessionId, + llmName, + streaming, + resolveNonStreaming: (body, status) => resolveDone({ status, body }), + rejectNonStreaming: rejectDone, + pushChunk: streaming + ? (chunk): void => { for (const cb of chunkSubscribers) cb(chunk); } + : null, + }; + this.tasksById.set(taskId, pending); + + handle.pushTask({ + kind: 'infer', + taskId, + llmName, + request, + streaming, + }); + + return { + taskId, + done, + onChunk(cb): () => void { + chunkSubscribers.add(cb); + return () => chunkSubscribers.delete(cb); + }, + }; + } + + completeTask(taskId: string, result: { status: number; body: unknown }): boolean { + const t = this.tasksById.get(taskId); + if (t === undefined) return false; + this.tasksById.delete(taskId); + t.resolveNonStreaming(result.body, result.status); + return true; + } + + pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean { + const t = this.tasksById.get(taskId); + if (t === undefined || t.pushChunk === null) return false; + t.pushChunk(chunk); + if (chunk.done === true) { + // For streaming tasks, also resolve the `done` promise so the route + // handler can clean up. + t.resolveNonStreaming(null, 200); + this.tasksById.delete(taskId); + } + return true; + } + + failTask(taskId: string, error: Error): boolean { + const t = this.tasksById.get(taskId); + if (t === undefined) return false; + this.tasksById.delete(taskId); + t.rejectNonStreaming(error); + return true; + } + + async gcSweep(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> { + let markedInactive = 0; + let deleted = 0; + + const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS); + const stale = await this.repo.findStaleVirtuals(heartbeatCutoff); + for (const row of stale) { + await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); + markedInactive += 1; + } + + const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS); + const expired = await this.repo.findExpiredInactives(deletionCutoff); + for (const row of expired) { + await this.repo.delete(row.id); + deleted += 1; + } + + return { markedInactive, deleted }; + } +} diff --git a/src/mcpd/tests/virtual-llm-service.test.ts b/src/mcpd/tests/virtual-llm-service.test.ts new file mode 100644 index 0000000..2459dcc --- /dev/null +++ b/src/mcpd/tests/virtual-llm-service.test.ts @@ -0,0 +1,347 @@ +import { describe, it, expect, vi } from 'vitest'; +import { VirtualLlmService, type VirtualSessionHandle } from '../src/services/virtual-llm.service.js'; +import type { ILlmRepository } from '../src/repositories/llm.repository.js'; +import type { Llm } from '@prisma/client'; + +function makeLlm(overrides: Partial = {}): Llm { + return { + id: `llm-${Math.random().toString(36).slice(2, 8)}`, + name: 'vllm-local', + type: 'openai', + model: 'm', + url: '', + tier: 'fast', + description: '', + apiKeySecretId: null, + apiKeySecretKey: null, + extraConfig: {} as Llm['extraConfig'], + kind: 'virtual', + providerSessionId: 's-1', + lastHeartbeatAt: new Date(), + status: 'active', + inactiveSince: null, + version: 1, + createdAt: new Date(), + updatedAt: new Date(), + ...overrides, + }; +} + +function mockRepo(initial: Llm[] = []): ILlmRepository { + const rows = new Map(initial.map((l) => [l.id, l])); + let counter = rows.size; + return { + findAll: vi.fn(async () => [...rows.values()]), + findById: vi.fn(async (id: string) => rows.get(id) ?? null), + findByName: vi.fn(async (name: string) => { + for (const l of rows.values()) if (l.name === name) return l; + return null; + }), + findByTier: vi.fn(async () => []), + findBySessionId: vi.fn(async (sid: string) => + [...rows.values()].filter((l) => l.providerSessionId === sid)), + findStaleVirtuals: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((l) => + l.kind === 'virtual' + && l.status === 'active' + && l.lastHeartbeatAt !== null + && l.lastHeartbeatAt < cutoff)), + findExpiredInactives: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((l) => + l.kind === 'virtual' + && l.status === 'inactive' + && l.inactiveSince !== null + && l.inactiveSince < cutoff)), + create: vi.fn(async (data) => { + counter += 1; + const row = makeLlm({ + id: `llm-${String(counter)}`, + name: data.name, + type: data.type, + model: data.model, + url: data.url ?? '', + tier: data.tier ?? 'fast', + description: data.description ?? '', + kind: data.kind ?? 'public', + providerSessionId: data.providerSessionId ?? null, + status: data.status ?? 'active', + lastHeartbeatAt: data.lastHeartbeatAt ?? null, + inactiveSince: data.inactiveSince ?? null, + }); + rows.set(row.id, row); + return row; + }), + update: vi.fn(async (id, data) => { + const existing = rows.get(id); + if (!existing) throw new Error('not found'); + const next: Llm = { + ...existing, + ...(data.type !== undefined ? { type: data.type } : {}), + ...(data.model !== undefined ? { model: data.model } : {}), + ...(data.tier !== undefined ? { tier: data.tier } : {}), + ...(data.description !== undefined ? { description: data.description } : {}), + ...(data.kind !== undefined ? { kind: data.kind } : {}), + ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), + ...(data.status !== undefined ? { status: data.status } : {}), + ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}), + ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}), + }; + rows.set(id, next); + return next; + }), + delete: vi.fn(async (id: string) => { rows.delete(id); }), + }; +} + +function fakeSession(): VirtualSessionHandle & { tasks: Array; alive: boolean } { + const tasks: unknown[] = []; + return { + tasks, + alive: true, + pushTask(t) { tasks.push(t); }, + }; +} + +describe('VirtualLlmService', () => { + it('register inserts new virtual rows with active status + sessionId', async () => { + const repo = mockRepo(); + const svc = new VirtualLlmService(repo); + const { providerSessionId, llms } = await svc.register({ + providerSessionId: null, + providers: [ + { name: 'vllm-local', type: 'openai', model: 'Qwen/Qwen2.5-7B-Instruct-AWQ', tier: 'fast' }, + ], + }); + expect(providerSessionId).toMatch(/^[0-9a-f-]{36}$/); + expect(llms).toHaveLength(1); + expect(llms[0]!.kind).toBe('virtual'); + expect(llms[0]!.status).toBe('active'); + expect(llms[0]!.providerSessionId).toBe(providerSessionId); + expect(llms[0]!.lastHeartbeatAt).not.toBeNull(); + }); + + it('register reuses the same row on sticky reconnect (same name + sessionId)', async () => { + const repo = mockRepo(); + const svc = new VirtualLlmService(repo); + const first = await svc.register({ + providerSessionId: 'fixed-id', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], + }); + expect(first.llms[0]!.id).toMatch(/^llm-/); + const firstId = first.llms[0]!.id; + + const second = await svc.register({ + providerSessionId: 'fixed-id', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm-updated' }], + }); + expect(second.llms[0]!.id).toBe(firstId); + expect(second.llms[0]!.model).toBe('m-updated'); + }); + + it('register refuses to overwrite a public LLM with the same name', async () => { + const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]); + const svc = new VirtualLlmService(repo); + await expect(svc.register({ + providerSessionId: 'sess-x', + providers: [{ name: 'qwen3-thinking', type: 'openai', model: 'm' }], + })).rejects.toThrow(/Cannot publish over public/); + }); + + it('register refuses if another active session owns the name', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'other', status: 'active' })]); + const svc = new VirtualLlmService(repo); + await expect(svc.register({ + providerSessionId: 'mine', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], + })).rejects.toThrow(/already active under a different session/); + }); + + it('register adopts an inactive virtual row from a different session (sticky reconnect after lapse)', async () => { + const repo = mockRepo([makeLlm({ + name: 'vllm-local', providerSessionId: 'old-session', + status: 'inactive', inactiveSince: new Date(), + })]); + const svc = new VirtualLlmService(repo); + const { llms } = await svc.register({ + providerSessionId: 'new-session', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], + }); + expect(llms[0]!.providerSessionId).toBe('new-session'); + expect(llms[0]!.status).toBe('active'); + expect(llms[0]!.inactiveSince).toBeNull(); + }); + + it('heartbeat bumps lastHeartbeatAt + revives an inactive row', async () => { + const past = new Date(Date.now() - 5_000); + const repo = mockRepo([makeLlm({ + name: 'vllm-local', providerSessionId: 'sess', status: 'inactive', + lastHeartbeatAt: past, inactiveSince: past, + })]); + const svc = new VirtualLlmService(repo); + await svc.heartbeat('sess'); + const row = await repo.findByName('vllm-local'); + expect(row?.status).toBe('active'); + expect(row?.inactiveSince).toBeNull(); + expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime()); + }); + + it('unbindSession flips all owned rows to inactive immediately', async () => { + const repo = mockRepo([ + makeLlm({ name: 'a', providerSessionId: 'sess' }), + makeLlm({ name: 'b', providerSessionId: 'sess' }), + makeLlm({ name: 'c', providerSessionId: 'other' }), + ]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + await svc.unbindSession('sess'); + expect((await repo.findByName('a'))?.status).toBe('inactive'); + expect((await repo.findByName('b'))?.status).toBe('inactive'); + expect((await repo.findByName('c'))?.status).toBe('active'); + }); + + it('enqueueInferTask pushes a task frame to the SSE session', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + const session = fakeSession(); + svc.bindSession('sess', session); + + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + expect(session.tasks).toHaveLength(1); + const t = session.tasks[0] as { kind: string; taskId: string; llmName: string; streaming: boolean }; + expect(t.kind).toBe('infer'); + expect(t.taskId).toBe(ref.taskId); + expect(t.llmName).toBe('vllm-local'); + expect(t.streaming).toBe(false); + }); + + it('enqueueInferTask rejects when the publisher is offline (no session bound)', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + await expect( + svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false), + ).rejects.toThrow(/no live SSE session|publisher offline/); + }); + + it('enqueueInferTask rejects when the row is inactive', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + await expect( + svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false), + ).rejects.toThrow(/inactive|publisher offline/); + }); + + it('enqueueInferTask rejects when the LLM is public (not virtual)', async () => { + const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]); + const svc = new VirtualLlmService(repo); + await expect( + svc.enqueueInferTask('qwen3-thinking', { model: 'm', messages: [] }, false), + ).rejects.toThrow(/not a virtual provider/); + }); + + it('completeTask resolves the pending non-streaming promise', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + expect(svc.completeTask(ref.taskId, { status: 200, body: { ok: true } })).toBe(true); + await expect(ref.done).resolves.toEqual({ status: 200, body: { ok: true } }); + }); + + it('streaming: pushTaskChunk fans chunks to subscribers; done resolves the ref', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }], stream: true }, + true, + ); + const got: Array<{ data: string; done?: boolean }> = []; + ref.onChunk((c) => got.push(c)); + + expect(svc.pushTaskChunk(ref.taskId, { data: 'hello' })).toBe(true); + expect(svc.pushTaskChunk(ref.taskId, { data: ' world' })).toBe(true); + expect(svc.pushTaskChunk(ref.taskId, { data: '[DONE]', done: true })).toBe(true); + + expect(got.map((c) => c.data)).toEqual(['hello', ' world', '[DONE]']); + await expect(ref.done).resolves.toMatchObject({ status: 200 }); + }); + + it('failTask rejects the pending promise with a clear error', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + expect(svc.failTask(ref.taskId, new Error('upstream blew up'))).toBe(true); + await expect(ref.done).rejects.toThrow(/upstream blew up/); + }); + + it('unbindSession rejects in-flight tasks for that session', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + await svc.unbindSession('sess'); + await expect(ref.done).rejects.toThrow(/publisher disconnected/); + }); + + it('gcSweep flips heartbeat-stale active virtuals to inactive', async () => { + const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago — past the 90-s cutoff + const recent = new Date(Date.now() - 30 * 1000); // 30 s ago — within the cutoff + const repo = mockRepo([ + makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }), + makeLlm({ name: 'fresh', providerSessionId: 'b', status: 'active', lastHeartbeatAt: recent }), + ]); + const svc = new VirtualLlmService(repo); + const result = await svc.gcSweep(); + expect(result.markedInactive).toBe(1); + expect((await repo.findByName('stale'))?.status).toBe('inactive'); + expect((await repo.findByName('fresh'))?.status).toBe('active'); + }); + + it('gcSweep deletes virtuals inactive past the 4h retention window', async () => { + const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago + const fresh = new Date(Date.now() - 1 * 60 * 60 * 1000); // 1 h ago + const repo = mockRepo([ + makeLlm({ name: 'old', providerSessionId: 'a', status: 'inactive', inactiveSince: ancient }), + makeLlm({ name: 'recent', providerSessionId: 'b', status: 'inactive', inactiveSince: fresh }), + makeLlm({ name: 'public-survivor', providerSessionId: null, kind: 'public' }), + ]); + const svc = new VirtualLlmService(repo); + const result = await svc.gcSweep(); + expect(result.deleted).toBe(1); + expect(await repo.findByName('old')).toBeNull(); + expect(await repo.findByName('recent')).not.toBeNull(); + expect(await repo.findByName('public-survivor')).not.toBeNull(); + }); + + it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => { + const long = new Date(Date.now() - 5 * 60 * 1000); + const repo = mockRepo([ + makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }), + ]); + const svc = new VirtualLlmService(repo); + const first = await svc.gcSweep(); + const second = await svc.gcSweep(); + expect(first.markedInactive).toBe(1); + expect(second.markedInactive).toBe(0); + expect(second.deleted).toBe(0); + }); +}); -- 2.49.1 From 192a3831df63169b8a5465dd846fe2b40c7d5b1b Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 14:15:18 +0100 Subject: [PATCH 3/6] feat(mcpd): virtual-LLM routes + GC ticker (v1 Stage 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit End-to-end backend wiring. After this stage, an mcplocal client can register a provider, hold the SSE channel open, heartbeat, and have its inference requests fanned through the relay — all without touching the agent layer or the public-LLM path. Routes (new file: routes/virtual-llms.ts): POST /api/v1/llms/_provider-register → returns { providerSessionId, llms[] } GET /api/v1/llms/_provider-stream → SSE channel keyed by x-mcpctl-provider-session header. Emits `event: hello` on open, `event: task` on inference fan-out, `: ping` every 20 s for proxies. POST /api/v1/llms/_provider-heartbeat → bumps lastHeartbeatAt POST /api/v1/llms/_provider-task/:id/result → mcplocal pushes result back; body shape is one of: { error: 'msg' } { chunk: { data, done? } } { status, body } LlmService: - LlmView gains kind/status/lastHeartbeatAt/inactiveSince so route handlers + the upcoming `mcpctl get llm` columns can branch on kind without re-fetching the row. llm-infer.ts: - Detects llm.kind === 'virtual' and delegates to VirtualLlmService.enqueueInferTask. Streaming + non-streaming both supported; on 503 (publisher offline) the existing audit hook still fires with the right status code. - Adds optional `virtualLlms: VirtualLlmService` to LlmInferDeps; absence in test fixtures returns a 500 with a clear "server misconfiguration" message rather than silently falling through to the public path against an empty URL. main.ts: - Constructs VirtualLlmService(llmRepo). - Passes it to registerLlmInferRoutes. - Calls registerVirtualLlmRoutes(app, virtualLlmService). - 60-s GC ticker started after app.listen; clears on graceful shutdown alongside the existing reconcile timer. Tests: 11 new virtual-LLM route assertions (validation paths, service plumbing for register/heartbeat/task-result) + 3 new infer-route assertions (kind=virtual non-streaming relay, 503 path, 500 when virtualLlms dep missing). mcpd suite: 833/833 (was 819, +14). Typecheck clean. The full SSE handshake is exercised by the smoke test in Stage 6; under app.inject the keep-alive blocks until close so unit-level SSE testing isn't worth the complexity here. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/mcpd/src/main.ts | 24 +++ src/mcpd/src/routes/llm-infer.ts | 91 +++++++++-- src/mcpd/src/routes/virtual-llms.ts | 174 ++++++++++++++++++++ src/mcpd/src/services/llm.service.ts | 9 ++ src/mcpd/tests/llm-infer-route.test.ts | 87 ++++++++++ src/mcpd/tests/virtual-llm-routes.test.ts | 184 ++++++++++++++++++++++ 6 files changed, 553 insertions(+), 16 deletions(-) create mode 100644 src/mcpd/src/routes/virtual-llms.ts create mode 100644 src/mcpd/tests/virtual-llm-routes.test.ts diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index fc05068..03ce6fd 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -40,6 +40,8 @@ import { ChatToolDispatcherImpl } from './services/chat-tool-dispatcher.js'; import { LlmAdapterRegistry } from './services/llm/dispatcher.js'; import { registerLlmRoutes } from './routes/llms.js'; import { registerLlmInferRoutes } from './routes/llm-infer.js'; +import { registerVirtualLlmRoutes } from './routes/virtual-llms.js'; +import { VirtualLlmService } from './services/virtual-llm.service.js'; import { registerAgentRoutes } from './routes/agents.js'; import { registerAgentChatRoutes } from './routes/agent-chat.js'; import { PromptRepository } from './repositories/prompt.repository.js'; @@ -433,6 +435,10 @@ async function main(): Promise { adapters: llmAdapters, log: { warn: (msg) => app.log.warn(msg) }, }); + // Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker + // is started below after `app.listen` so it doesn't fire before the + // server is accepting traffic. + const virtualLlmService = new VirtualLlmService(llmRepo); // AgentService + ChatService get fully wired below once projectService and // mcpProxyService are constructed (ChatService needs them via the // ChatToolDispatcher bridge). @@ -606,6 +612,7 @@ async function main(): Promise { registerLlmInferRoutes(app, { llmService, adapters: llmAdapters, + virtualLlms: virtualLlmService, onInferenceEvent: (event) => { app.log.info({ event: 'llm_inference_call', @@ -620,6 +627,7 @@ async function main(): Promise { }); }, }); + registerVirtualLlmRoutes(app, virtualLlmService); registerInstanceRoutes(app, instanceService); registerProjectRoutes(app, projectService); registerAuditLogRoutes(app, auditLogService); @@ -753,6 +761,21 @@ async function main(): Promise { } }, RECONCILE_INTERVAL_MS); + // Virtual-LLM GC sweep — flips heartbeat-stale rows to inactive (90 s + // cutoff) and deletes inactives past the 4 h retention window. Runs + // every 60 s; cheap (two indexed queries) when there are no virtuals. + const VIRTUAL_LLM_GC_INTERVAL_MS = 60_000; + const virtualLlmGcTimer = setInterval(async () => { + try { + const { markedInactive, deleted } = await virtualLlmService.gcSweep(); + if (markedInactive > 0 || deleted > 0) { + app.log.info(`[virtual-llm gc] markedInactive=${String(markedInactive)} deleted=${String(deleted)}`); + } + } catch (err) { + app.log.error({ err }, 'Virtual LLM GC sweep failed'); + } + }, VIRTUAL_LLM_GC_INTERVAL_MS); + // Health probe runner — periodic MCP probes (like k8s livenessProbe). // Without explicit healthCheck.tool, probes send tools/list through // McpProxyService so they traverse the exact production call path. @@ -787,6 +810,7 @@ async function main(): Promise { setupGracefulShutdown(app, { disconnectDb: async () => { clearInterval(reconcileTimer); + clearInterval(virtualLlmGcTimer); healthProbeRunner.stop(); secretBackendRotatorLoop.stop(); gitBackup.stop(); diff --git a/src/mcpd/src/routes/llm-infer.ts b/src/mcpd/src/routes/llm-infer.ts index 6c93cc9..38a84f7 100644 --- a/src/mcpd/src/routes/llm-infer.ts +++ b/src/mcpd/src/routes/llm-infer.ts @@ -15,12 +15,20 @@ import type { FastifyInstance, FastifyReply } from 'fastify'; import type { LlmService } from '../services/llm.service.js'; import type { LlmAdapterRegistry } from '../services/llm/dispatcher.js'; +import type { VirtualLlmService } from '../services/virtual-llm.service.js'; import { NotFoundError } from '../services/mcp-server.service.js'; import type { OpenAiChatRequest, InferContext } from '../services/llm/types.js'; export interface LlmInferDeps { llmService: LlmService; adapters: LlmAdapterRegistry; + /** + * Optional. When provided, requests for `kind=virtual` Llm rows are + * fanned through the SSE control channel rather than calling an + * upstream URL directly. Required for v1 of the virtual-LLM feature; + * absent in older test configurations. + */ + virtualLlms?: VirtualLlmService; /** Optional hook to emit audit events — consumer may ignore. */ onInferenceEvent?: (event: InferenceAuditEvent) => void; } @@ -62,6 +70,73 @@ export function registerLlmInferRoutes( return { error: 'messages is required' }; } + const streaming = body.stream === true; + + const audit = (status: number): void => { + if (deps.onInferenceEvent === undefined) return; + deps.onInferenceEvent({ + kind: 'llm_inference_call', + llmName: llm.name, + model: llm.model, + type: llm.type, + userId: request.userId, + tokenSha: request.mcpToken?.tokenSha, + streaming, + durationMs: Date.now() - started, + status, + }); + }; + + // ── Virtual-provider branch ── + // For kind=virtual rows there is no upstream URL — inference is fanned + // through the SSE control channel back to the publishing mcplocal. + // VirtualLlmService.enqueueInferTask handles the routing. + if (llm.kind === 'virtual') { + if (deps.virtualLlms === undefined) { + reply.code(500); + audit(500); + return { error: 'virtual LLM dispatch unavailable (server misconfiguration)' }; + } + try { + if (!streaming) { + const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, false); + const result = await ref.done; + reply.code(result.status); + audit(result.status); + return result.body; + } + // Streaming: open SSE response, fan chunks from the result stream + // into outgoing SSE frames. + reply.raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, true); + const unsubscribe = ref.onChunk((chunk) => writeSseChunk(reply, chunk.data)); + try { + await ref.done; + audit(200); + } catch (err) { + const payload = JSON.stringify({ error: err instanceof Error ? err.message : String(err) }); + writeSseChunk(reply, payload); + audit(502); + } finally { + unsubscribe(); + if (!reply.raw.writableEnded) reply.raw.end(); + } + return reply; + } catch (err) { + const status = (err as { statusCode?: number }).statusCode ?? 502; + reply.code(status); + audit(status); + return { error: err instanceof Error ? err.message : String(err) }; + } + } + + // ── Public-provider branch (existing behavior) ── + // Resolve API key (may be empty string for providers that don't take one). let apiKey = ''; if (llm.apiKeyRef !== null) { @@ -82,22 +157,6 @@ export function registerLlmInferRoutes( }; const adapter = deps.adapters.get(llm.type); - const streaming = body.stream === true; - - const audit = (status: number): void => { - if (deps.onInferenceEvent === undefined) return; - deps.onInferenceEvent({ - kind: 'llm_inference_call', - llmName: llm.name, - model: llm.model, - type: llm.type, - userId: request.userId, - tokenSha: request.mcpToken?.tokenSha, - streaming, - durationMs: Date.now() - started, - status, - }); - }; if (!streaming) { try { diff --git a/src/mcpd/src/routes/virtual-llms.ts b/src/mcpd/src/routes/virtual-llms.ts new file mode 100644 index 0000000..59664cb --- /dev/null +++ b/src/mcpd/src/routes/virtual-llms.ts @@ -0,0 +1,174 @@ +/** + * Routes for the virtual-LLM control plane (`kind=virtual` Llm rows). + * + * POST /api/v1/llms/_provider-register — register/refresh, returns sessionId + * GET /api/v1/llms/_provider-stream — SSE channel; mcpd → mcplocal task fan-out + * POST /api/v1/llms/_provider-heartbeat — keep-alive (every 30 s from mcplocal) + * POST /api/v1/llms/_provider-task/:id/result — mcplocal pushes result/chunks back + * + * RBAC: these all live under `/api/v1/llms/...` so the existing + * `mapUrlToPermission` in main.ts maps them to the `llms` resource — + * POST = create:llms, GET = view:llms. That's appropriate: publishing + * a virtual LLM is morally the same as creating one. + * + * Inference for virtual rows still lands on `/api/v1/llms/:name/infer` + * (unchanged URL); that route gains a `kind=virtual` branch in this stage + * and delegates here via VirtualLlmService. + */ +import type { FastifyInstance, FastifyReply } from 'fastify'; +import type { VirtualLlmService, VirtualSessionHandle, VirtualTaskFrame } from '../services/virtual-llm.service.js'; + +const SSE_PING_MS = 20_000; +const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session'; + +export function registerVirtualLlmRoutes( + app: FastifyInstance, + service: VirtualLlmService, +): void { + app.post<{ Body: { providerSessionId?: string; providers?: unknown[] } }>( + '/api/v1/llms/_provider-register', + async (request, reply) => { + const body = (request.body ?? {}); + const providers = Array.isArray(body.providers) ? body.providers : null; + if (providers === null || providers.length === 0) { + reply.code(400); + return { error: '`providers` array is required and must be non-empty' }; + } + + try { + const result = await service.register({ + providerSessionId: body.providerSessionId ?? null, + providers: providers.map(coerceProviderInput), + }); + reply.code(201); + return result; + } catch (err) { + const status = (err as { statusCode?: number }).statusCode ?? 500; + reply.code(status); + return { error: (err as Error).message }; + } + }, + ); + + app.get('/api/v1/llms/_provider-stream', (request, reply): FastifyReply => { + const sessionHeader = request.headers[PROVIDER_SESSION_HEADER]; + const sessionId = typeof sessionHeader === 'string' ? sessionHeader : null; + if (sessionId === null || sessionId === '') { + reply.code(400); + void reply.send({ error: `${PROVIDER_SESSION_HEADER} header is required (call /_provider-register first)` }); + return reply; + } + + reply.raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + + const handle: VirtualSessionHandle = { + pushTask(task: VirtualTaskFrame): void { + if (reply.raw.destroyed || reply.raw.writableEnded) return; + reply.raw.write(`event: task\ndata: ${JSON.stringify(task)}\n\n`); + }, + get alive(): boolean { + return !reply.raw.destroyed && !reply.raw.writableEnded; + }, + }; + + service.bindSession(sessionId, handle); + reply.raw.write(`event: hello\ndata: ${JSON.stringify({ sessionId })}\n\n`); + + // Keep-alive comment lines so proxies (Cilium, k8s ingress) don't time + // out an idle SSE connection. + const pingTimer = setInterval(() => { + if (reply.raw.destroyed || reply.raw.writableEnded) return; + reply.raw.write(`: ping\n\n`); + }, SSE_PING_MS); + + request.raw.on('close', () => { + clearInterval(pingTimer); + service.unbindSession(sessionId).catch((err: unknown) => { + app.log.warn({ err, sessionId }, 'unbindSession failed'); + }); + }); + + return reply; + }); + + app.post<{ Body: { providerSessionId?: string } }>( + '/api/v1/llms/_provider-heartbeat', + async (request, reply) => { + const sessionId = request.body?.providerSessionId; + if (typeof sessionId !== 'string' || sessionId === '') { + reply.code(400); + return { error: 'providerSessionId required' }; + } + await service.heartbeat(sessionId); + return { ok: true }; + }, + ); + + app.post<{ + Params: { taskId: string }; + Body: { + status?: number; + body?: unknown; + chunk?: { data: string; done?: boolean }; + error?: string; + }; + }>( + '/api/v1/llms/_provider-task/:taskId/result', + async (request, reply) => { + const { taskId } = request.params; + const body = request.body ?? {}; + + if (typeof body.error === 'string' && body.error !== '') { + const ok = service.failTask(taskId, new Error(body.error)); + return { ok }; + } + if (body.chunk !== undefined && typeof body.chunk.data === 'string') { + const ok = service.pushTaskChunk(taskId, body.chunk); + return { ok }; + } + if (typeof body.status === 'number') { + const ok = service.completeTask(taskId, { status: body.status, body: body.body }); + return { ok }; + } + + reply.code(400); + return { error: 'body must contain one of: { error }, { chunk: { data, done? } }, { status, body }' }; + }, + ); +} + +/** Narrow an unknown providers array element into the service's input shape. */ +function coerceProviderInput(raw: unknown): { + name: string; + type: string; + model: string; + tier?: string; + description?: string; + extraConfig?: Record; +} { + if (raw === null || typeof raw !== 'object') { + throw Object.assign(new Error('provider entry must be an object'), { statusCode: 400 }); + } + const o = raw as Record; + const name = o['name']; + const type = o['type']; + const model = o['model']; + if (typeof name !== 'string' || typeof type !== 'string' || typeof model !== 'string') { + throw Object.assign( + new Error('provider entry requires string `name`, `type`, `model`'), + { statusCode: 400 }, + ); + } + const out: ReturnType = { name, type, model }; + if (typeof o['tier'] === 'string') out.tier = o['tier']; + if (typeof o['description'] === 'string') out.description = o['description']; + if (o['extraConfig'] !== null && typeof o['extraConfig'] === 'object') { + out.extraConfig = o['extraConfig'] as Record; + } + return out; +} diff --git a/src/mcpd/src/services/llm.service.ts b/src/mcpd/src/services/llm.service.ts index 6dd3932..92c5edd 100644 --- a/src/mcpd/src/services/llm.service.ts +++ b/src/mcpd/src/services/llm.service.ts @@ -50,6 +50,11 @@ export interface LlmView { description: string; apiKeyRef: ApiKeyRef | null; extraConfig: Record; + // Virtual-provider lifecycle (kind defaults to 'public' for legacy rows). + kind: 'public' | 'virtual'; + status: 'active' | 'inactive' | 'hibernating'; + lastHeartbeatAt: Date | null; + inactiveSince: Date | null; version: number; createdAt: Date; updatedAt: Date; @@ -275,6 +280,10 @@ export class LlmService { description: row.description, apiKeyRef, extraConfig: row.extraConfig as Record, + kind: row.kind, + status: row.status, + lastHeartbeatAt: row.lastHeartbeatAt, + inactiveSince: row.inactiveSince, version: row.version, createdAt: row.createdAt, updatedAt: row.updatedAt, diff --git a/src/mcpd/tests/llm-infer-route.test.ts b/src/mcpd/tests/llm-infer-route.test.ts index 20e5339..33279e3 100644 --- a/src/mcpd/tests/llm-infer-route.test.ts +++ b/src/mcpd/tests/llm-infer-route.test.ts @@ -20,6 +20,10 @@ function makeLlmView(overrides: Partial = {}): LlmView { description: '', apiKeyRef: { name: 'anthropic-key', key: 'token' }, extraConfig: {}, + kind: 'public', + status: 'active', + lastHeartbeatAt: null, + inactiveSince: null, version: 1, createdAt: new Date(), updatedAt: new Date(), @@ -205,4 +209,87 @@ describe('POST /api/v1/llms/:name/infer', () => { expect(res.statusCode).toBe(502); expect(res.json<{ error: string }>().error).toMatch(/upstream down/); }); + + // ── Virtual-provider branch (kind=virtual) ── + + it('routes kind=virtual non-streaming through VirtualLlmService.enqueueInferTask', async () => { + const svc: LlmServiceLike = { + getByName: async () => makeLlmView({ kind: 'virtual', type: 'openai', apiKeyRef: null }), + resolveApiKey: async () => '', + }; + const enqueue = vi.fn(async () => ({ + taskId: 't-1', + done: Promise.resolve({ status: 200, body: { choices: [{ message: { content: 'hello from relay' } }] } }), + onChunk: () => () => undefined, + })); + app = Fastify({ logger: false }); + app.setErrorHandler(errorHandler); + registerLlmInferRoutes(app, { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + llmService: svc as any, + adapters: new LlmAdapterRegistry(), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + virtualLlms: { enqueueInferTask: enqueue } as any, + }); + await app.ready(); + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/vllm-local/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(200); + expect(res.json<{ choices: Array<{ message: { content: string } }> }>().choices[0]!.message.content).toBe('hello from relay'); + expect(enqueue).toHaveBeenCalledWith( + 'claude', + expect.objectContaining({ messages: expect.any(Array) }), + false, + ); + }); + + it('returns 503 when the publisher is offline (VirtualLlmService throws)', async () => { + const svc: LlmServiceLike = { + getByName: async () => makeLlmView({ kind: 'virtual', apiKeyRef: null, type: 'openai' }), + resolveApiKey: async () => '', + }; + const enqueue = vi.fn(async () => { + throw Object.assign(new Error('no live SSE session; publisher offline'), { statusCode: 503 }); + }); + app = Fastify({ logger: false }); + app.setErrorHandler(errorHandler); + registerLlmInferRoutes(app, { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + llmService: svc as any, + adapters: new LlmAdapterRegistry(), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + virtualLlms: { enqueueInferTask: enqueue } as any, + }); + await app.ready(); + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/vllm-local/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(503); + expect(res.json<{ error: string }>().error).toMatch(/publisher offline/); + }); + + it('returns 500 when virtualLlms dep is missing but the row is kind=virtual', async () => { + // Defensive: prior test configurations may not pass virtualLlms. We + // surface a clear server-misconfiguration error rather than calling + // the public-adapter path, which would try to hit an empty URL. + const svc: LlmServiceLike = { + getByName: async () => makeLlmView({ kind: 'virtual', apiKeyRef: null, type: 'openai' }), + resolveApiKey: async () => '', + }; + await setupApp(svc, new LlmAdapterRegistry()); // no virtualLlms + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/vllm-local/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(500); + expect(res.json<{ error: string }>().error).toMatch(/virtual LLM dispatch unavailable/); + }); }); diff --git a/src/mcpd/tests/virtual-llm-routes.test.ts b/src/mcpd/tests/virtual-llm-routes.test.ts new file mode 100644 index 0000000..a4477bc --- /dev/null +++ b/src/mcpd/tests/virtual-llm-routes.test.ts @@ -0,0 +1,184 @@ +import { describe, it, expect, vi, afterEach } from 'vitest'; +import Fastify from 'fastify'; +import type { FastifyInstance } from 'fastify'; +import { registerVirtualLlmRoutes } from '../src/routes/virtual-llms.js'; +import type { + VirtualLlmService, + VirtualSessionHandle, +} from '../src/services/virtual-llm.service.js'; + +let app: FastifyInstance; + +afterEach(async () => { + if (app) await app.close(); +}); + +function fakeService(overrides: Partial = {}): VirtualLlmService { + return { + register: vi.fn(async (input) => ({ + providerSessionId: input.providerSessionId ?? 'sess-generated', + llms: [], + })), + heartbeat: vi.fn(async () => undefined), + bindSession: vi.fn(), + unbindSession: vi.fn(async () => undefined), + enqueueInferTask: vi.fn(), + completeTask: vi.fn(() => true), + pushTaskChunk: vi.fn(() => true), + failTask: vi.fn(() => true), + gcSweep: vi.fn(), + ...overrides, + } as unknown as VirtualLlmService; +} + +async function setupApp(svc: VirtualLlmService): Promise { + app = Fastify({ logger: false }); + registerVirtualLlmRoutes(app, svc); + await app.ready(); + return app; +} + +describe('POST /api/v1/llms/_provider-register', () => { + it('returns 400 when providers is missing or empty', async () => { + await setupApp(fakeService()); + const a = await app.inject({ method: 'POST', url: '/api/v1/llms/_provider-register', payload: {} }); + expect(a.statusCode).toBe(400); + const b = await app.inject({ method: 'POST', url: '/api/v1/llms/_provider-register', payload: { providers: [] } }); + expect(b.statusCode).toBe(400); + }); + + it('returns 400 when a provider entry is missing required fields', async () => { + await setupApp(fakeService()); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-register', + payload: { providers: [{ name: 'incomplete' }] }, + }); + expect(res.statusCode).toBe(400); + }); + + it('forwards a valid registration to the service and returns 201', async () => { + const register = vi.fn(async () => ({ + providerSessionId: 'sess-xyz', + llms: [{ id: 'l1' }], + })); + await setupApp(fakeService({ register: register as unknown as VirtualLlmService['register'] })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-register', + payload: { + providerSessionId: 'sess-xyz', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm', tier: 'fast', extraConfig: { gpu: 1 } }], + }, + }); + expect(res.statusCode).toBe(201); + expect(register).toHaveBeenCalledWith({ + providerSessionId: 'sess-xyz', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm', tier: 'fast', extraConfig: { gpu: 1 } }], + }); + expect(res.json()).toMatchObject({ providerSessionId: 'sess-xyz' }); + }); + + it('surfaces service errors with their declared status code (e.g. 409 conflict)', async () => { + const register = vi.fn(async () => { + throw Object.assign(new Error('Cannot publish over public LLM: dup'), { statusCode: 409 }); + }); + await setupApp(fakeService({ register: register as unknown as VirtualLlmService['register'] })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-register', + payload: { providers: [{ name: 'dup', type: 'openai', model: 'm' }] }, + }); + expect(res.statusCode).toBe(409); + expect(res.json()).toMatchObject({ error: expect.stringMatching(/public LLM/) }); + }); +}); + +describe('POST /api/v1/llms/_provider-heartbeat', () => { + it('returns 400 without providerSessionId', async () => { + await setupApp(fakeService()); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-heartbeat', + payload: {}, + }); + expect(res.statusCode).toBe(400); + }); + + it('forwards the sessionId to service.heartbeat', async () => { + const heartbeat = vi.fn(async () => undefined); + await setupApp(fakeService({ heartbeat })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-heartbeat', + payload: { providerSessionId: 'sess-abc' }, + }); + expect(res.statusCode).toBe(200); + expect(heartbeat).toHaveBeenCalledWith('sess-abc'); + }); +}); + +describe('POST /api/v1/llms/_provider-task/:taskId/result', () => { + it('forwards { error } to service.failTask', async () => { + const failTask = vi.fn(() => true); + await setupApp(fakeService({ failTask })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-task/t-1/result', + payload: { error: 'upstream blew up' }, + }); + expect(res.statusCode).toBe(200); + expect(failTask).toHaveBeenCalledWith('t-1', expect.objectContaining({ message: 'upstream blew up' })); + }); + + it('forwards { chunk } to service.pushTaskChunk', async () => { + const pushTaskChunk = vi.fn(() => true); + await setupApp(fakeService({ pushTaskChunk })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-task/t-2/result', + payload: { chunk: { data: 'hello' } }, + }); + expect(res.statusCode).toBe(200); + expect(pushTaskChunk).toHaveBeenCalledWith('t-2', { data: 'hello' }); + }); + + it('forwards { status, body } to service.completeTask', async () => { + const completeTask = vi.fn(() => true); + await setupApp(fakeService({ completeTask })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-task/t-3/result', + payload: { status: 200, body: { ok: true } }, + }); + expect(res.statusCode).toBe(200); + expect(completeTask).toHaveBeenCalledWith('t-3', { status: 200, body: { ok: true } }); + }); + + it('returns 400 for an empty/unrecognised result body', async () => { + await setupApp(fakeService()); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-task/t-4/result', + payload: {}, + }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('GET /api/v1/llms/_provider-stream', () => { + it('returns 400 without the x-mcpctl-provider-session header', async () => { + await setupApp(fakeService()); + const res = await app.inject({ + method: 'GET', + url: '/api/v1/llms/_provider-stream', + }); + expect(res.statusCode).toBe(400); + }); + + // Note: a full SSE handshake test would require a real HTTP listen + // because `app.inject` holds the response open and never returns under + // the `text/event-stream` keep-alive. The smoke test in Stage 6 spins + // up a real listener and exercises the open → bind → task → close + // round-trip end to end. +}); -- 2.49.1 From 97174f450f3ec066f2521ac7ecb53a025444554f Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 14:20:54 +0100 Subject: [PATCH 4/6] feat(mcplocal): virtual-LLM registrar (v1 Stage 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The mcplocal counterpart to mcpd's VirtualLlmService. After this stage, flipping \`publish: true\` on a provider in ~/.mcpctl/config.json makes the provider show up in mcpctl get llm with kind=virtual the next time mcplocal restarts; running an inference against it relays through this client back to the local LlmProvider. Config: - LlmProviderFileEntry gains optional \`publish: boolean\` (default false, so existing setups don't change). Registrar (new file: providers/registrar.ts): - start(): if any provider is opted-in, POSTs to /api/v1/llms/_provider-register with the publishable set, persists the returned providerSessionId to ~/.mcpctl/provider-session for sticky reconnects, then opens the SSE control channel and starts a 30-s heartbeat ticker. - SSE listener parses event/data lines from text/event-stream frames. task frames trigger handleInferTask: convert OpenAI body to CompletionOptions, call provider.complete(), POST the result back as either { status, body } (non-streaming) or two chunk POSTs (streaming: one delta + a [DONE] marker). - Disconnect → exponential backoff reconnect from 5 s up to 60 s. On successful reconnect the persisted sessionId revives the same Llm rows in mcpd (mcpd flips them back to active on heartbeat). - stop() destroys the SSE socket and clears the timer; cleanly handed off from main.ts's existing shutdown handler. Wired into mcplocal main.ts via maybeStartVirtualLlmRegistrar: - Filters opted-in providers, looks up their LlmProvider instances in the registry. - Reads ~/.mcpctl/credentials for mcpdUrl + bearer; absence is a best-effort skip (logs a warning, returns null) — never a boot blocker. v1 caveat documented in the file header: LlmProvider returns a finalized CompletionResult, not a token stream, so streaming requests get a single delta chunk + [DONE]. Real per-token streaming is a v2 concern. Tests: 5 new in tests/registrar.test.ts using a tiny in-process HTTP server. Cover: no-op when nothing opted-in, register POST + sticky sessionId persistence, sticky reconnect from disk, heartbeat ticker fires at the configured interval, register HTTP error surfaces. Workspace suite: 2043/2043 across 152 files (was 2006/149, +5 new tests + the new file gets discovered). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/mcplocal/src/http/config.ts | 8 + src/mcplocal/src/main.ts | 85 ++++- src/mcplocal/src/providers/registrar.ts | 409 ++++++++++++++++++++++++ src/mcplocal/tests/registrar.test.ts | 244 ++++++++++++++ 4 files changed, 745 insertions(+), 1 deletion(-) create mode 100644 src/mcplocal/src/providers/registrar.ts create mode 100644 src/mcplocal/tests/registrar.test.ts diff --git a/src/mcplocal/src/http/config.ts b/src/mcplocal/src/http/config.ts index 9be7fb4..d70f9c6 100644 --- a/src/mcplocal/src/http/config.ts +++ b/src/mcplocal/src/http/config.ts @@ -72,6 +72,14 @@ export interface LlmProviderFileEntry { * itself can't be reached). */ failoverFor?: string; + /** + * Opt in to publishing this local provider into mcpd as a virtual Llm + * row (see docs/virtual-llms.md). When true, the mcplocal registrar + * announces the provider to mcpd at startup, opens an SSE control + * channel, and relays inference tasks back to this local provider. + * Default: false — existing setups don't change behavior. + */ + publish?: boolean; } export interface ProjectLlmOverride { diff --git a/src/mcplocal/src/main.ts b/src/mcplocal/src/main.ts index f9e78e8..df631a9 100644 --- a/src/mcplocal/src/main.ts +++ b/src/mcplocal/src/main.ts @@ -8,11 +8,15 @@ import { StdioUpstream } from './upstream/stdio.js'; import { HttpUpstream } from './upstream/http.js'; import { createHttpServer } from './http/server.js'; import { loadHttpConfig, loadLlmProviders } from './http/config.js'; -import type { HttpConfig } from './http/config.js'; +import type { HttpConfig, LlmProviderFileEntry } from './http/config.js'; import { createProvidersFromConfig } from './llm-config.js'; import { createSecretStore } from '@mcpctl/shared'; import type { ProviderRegistry } from './providers/registry.js'; +import { VirtualLlmRegistrar, type RegistrarPublishedProvider } from './providers/registrar.js'; import { startWatchers, stopWatchers, reloadStages } from './proxymodel/watcher.js'; +import { existsSync, readFileSync as readFileSyncNs } from 'node:fs'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; interface ParsedArgs { configPath: string | undefined; @@ -144,6 +148,11 @@ export async function main(argv: string[] = process.argv): Promise { await reloadStages(); startWatchers(); + // Virtual-LLM registrar: publish opted-in providers (`publish: true`) + // into mcpd's Llm registry. Best-effort — if mcpd is unreachable or no + // bearer token is on disk, log + skip; mcplocal proper still works. + const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries); + // Graceful shutdown let shuttingDown = false; const shutdown = async () => { @@ -151,6 +160,7 @@ export async function main(argv: string[] = process.argv): Promise { shuttingDown = true; stopWatchers(); + registrar?.stop(); providerRegistry.disposeAll(); server.stop(); if (httpServer) { @@ -177,3 +187,76 @@ if (isMain) { process.exit(1); }); } + +/** + * Start the virtual-LLM registrar if any local provider has `publish: true` + * AND we have a bearer token on disk. Returns the registrar instance for + * lifecycle teardown, or null when nothing was started. All failure paths + * log a warning and resolve null — the registrar is best-effort, not a + * boot blocker. + */ +async function maybeStartVirtualLlmRegistrar( + providerRegistry: ProviderRegistry, + llmEntries: LlmProviderFileEntry[], +): Promise { + const opted = llmEntries.filter((e) => e.publish === true); + if (opted.length === 0) return null; + + const published: RegistrarPublishedProvider[] = []; + for (const entry of opted) { + const provider = providerRegistry.get(entry.name); + if (provider === undefined) { + process.stderr.write(`virtual-llm registrar: provider '${entry.name}' opted-in but not registered locally; skipping\n`); + continue; + } + const item: RegistrarPublishedProvider = { + provider, + type: entry.type, + model: entry.model ?? entry.name, + }; + if (entry.tier !== undefined) item.tier = entry.tier; + published.push(item); + } + if (published.length === 0) return null; + + // Resolve mcpd URL + bearer. Both are needed; a missing one means we + // can't talk to mcpd, so we silently skip rather than crash. + const credsPath = join(homedir(), '.mcpctl', 'credentials'); + if (!existsSync(credsPath)) { + process.stderr.write(`virtual-llm registrar: ~/.mcpctl/credentials missing — skipping (run \`mcpctl auth login\` to publish virtual LLMs)\n`); + return null; + } + let mcpdUrl: string; + let token: string; + try { + const parsed = JSON.parse(readFileSyncNs(credsPath, 'utf-8')) as { mcpdUrl?: string; token?: string }; + if (typeof parsed.mcpdUrl !== 'string' || typeof parsed.token !== 'string') { + process.stderr.write(`virtual-llm registrar: credentials missing mcpdUrl/token — skipping\n`); + return null; + } + mcpdUrl = parsed.mcpdUrl; + token = parsed.token; + } catch (err) { + process.stderr.write(`virtual-llm registrar: failed to read credentials: ${(err as Error).message}\n`); + return null; + } + + const registrar = new VirtualLlmRegistrar({ + mcpdUrl, + token, + publishedProviders: published, + sessionFilePath: join(homedir(), '.mcpctl', 'provider-session'), + log: { + info: (msg) => process.stderr.write(`${msg}\n`), + warn: (msg) => process.stderr.write(`${msg}\n`), + error: (msg) => process.stderr.write(`${msg}\n`), + }, + }); + try { + await registrar.start(); + } catch (err) { + process.stderr.write(`virtual-llm registrar: start failed: ${(err as Error).message}\n`); + return null; + } + return registrar; +} diff --git a/src/mcplocal/src/providers/registrar.ts b/src/mcplocal/src/providers/registrar.ts new file mode 100644 index 0000000..120e4c9 --- /dev/null +++ b/src/mcplocal/src/providers/registrar.ts @@ -0,0 +1,409 @@ +/** + * Virtual-LLM registrar — the mcplocal counterpart to mcpd's + * VirtualLlmService. + * + * Lifecycle: + * 1. start() called on mcplocal boot. If no providers are opted-in + * (`publish: true` in config), do nothing. + * 2. POST /api/v1/llms/_provider-register with the publishable set; + * receive a stable providerSessionId. Persist it to + * `~/.mcpctl/provider-session` so reconnects after a crash/restart + * adopt the same row instead of orphaning it. + * 3. Open the SSE channel at /api/v1/llms/_provider-stream with the + * session id in `x-mcpctl-provider-session`. Listen for + * `event: task` frames; for each, call the local provider's + * `complete()` and POST the OpenAI-shaped result back. + * 4. Heartbeat every 30 s. If the SSE drops, exponential backoff + * reconnect from 5 s up to 60 s. + * 5. stop() destroys the SSE socket and clears the timer; mcpd's + * 90-s heartbeat watchdog will then flip our rows to inactive. + * + * v1 caveat: the LlmProvider abstraction returns a finalized + * CompletionResult, not a token stream. We therefore translate + * streaming requests into a single SSE chunk + [DONE]. Real + * per-token streaming is a v2 concern — see docs/virtual-llms.md. + */ +import http from 'node:http'; +import https from 'node:https'; +import { promises as fs } from 'node:fs'; +import { dirname } from 'node:path'; +import type { LlmProvider, CompletionOptions } from './types.js'; + +export interface RegistrarLogger { + info: (msg: string) => void; + warn: (msg: string) => void; + error: (msg: string) => void; +} + +export interface RegistrarPublishedProvider { + provider: LlmProvider; + /** mcpd-side `type` field (openai | anthropic | …). */ + type: string; + /** Model id surfaced under `mcpctl get llm`. */ + model: string; + /** Optional tier (default 'fast'). */ + tier?: 'fast' | 'heavy'; + /** Optional human-readable description for `mcpctl get llm`. */ + description?: string; +} + +export interface RegistrarOptions { + mcpdUrl: string; + token: string; + publishedProviders: RegistrarPublishedProvider[]; + /** Where to persist the providerSessionId so reconnects are sticky. */ + sessionFilePath: string; + log: RegistrarLogger; + /** Override knobs for tests (ms). */ + heartbeatIntervalMs?: number; + reconnectBaseMs?: number; + reconnectMaxMs?: number; +} + +const DEFAULT_HEARTBEAT_MS = 30_000; +const DEFAULT_RECONNECT_BASE_MS = 5_000; +const DEFAULT_RECONNECT_MAX_MS = 60_000; +const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session'; + +interface InferTask { + kind: 'infer'; + taskId: string; + llmName: string; + request: { + messages: Array<{ role: 'system' | 'user' | 'assistant' | 'tool'; content: string }>; + temperature?: number; + max_tokens?: number; + model?: string; + }; + streaming: boolean; +} + +type TaskFrame = InferTask | { kind: 'wake'; taskId: string; llmName: string }; + +export class VirtualLlmRegistrar { + private sessionId: string | null = null; + private heartbeatTimer: NodeJS.Timeout | null = null; + private sseRequest: http.ClientRequest | null = null; + private reconnectAttempt = 0; + private stopped = false; + + constructor(private readonly opts: RegistrarOptions) {} + + /** Bring the registrar online. Idempotent — calling twice is a no-op. */ + async start(): Promise { + if (this.opts.publishedProviders.length === 0) { + this.opts.log.info('virtual-llm registrar: nothing to publish (no provider has `publish: true`)'); + return; + } + this.stopped = false; + this.sessionId = await this.loadStickySessionId(); + await this.register(); + this.startHeartbeat(); + this.openSseStream(); + } + + /** Tear down the registrar. Safe to call multiple times. */ + stop(): void { + this.stopped = true; + if (this.heartbeatTimer !== null) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + if (this.sseRequest !== null) { + this.sseRequest.destroy(); + this.sseRequest = null; + } + } + + /** Test/inspection helper — avoids exposing internals broadly. */ + getSessionId(): string | null { + return this.sessionId; + } + + private async loadStickySessionId(): Promise { + try { + const raw = await fs.readFile(this.opts.sessionFilePath, 'utf-8'); + const id = raw.trim(); + return id === '' ? null : id; + } catch { + return null; + } + } + + private async saveStickySessionId(id: string): Promise { + try { + await fs.mkdir(dirname(this.opts.sessionFilePath), { recursive: true }); + await fs.writeFile(this.opts.sessionFilePath, id, 'utf-8'); + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: failed to persist session id: ${(err as Error).message}`); + } + } + + private async register(): Promise { + const body: Record = { + providers: this.opts.publishedProviders.map((p) => ({ + name: p.provider.name, + type: p.type, + model: p.model, + ...(p.tier !== undefined ? { tier: p.tier } : {}), + ...(p.description !== undefined ? { description: p.description } : {}), + })), + }; + if (this.sessionId !== null) body['providerSessionId'] = this.sessionId; + + const res = await postJson( + this.urlFor('/api/v1/llms/_provider-register'), + body, + this.opts.token, + ); + if (res.statusCode !== 201) { + throw new Error(`provider-register HTTP ${String(res.statusCode)}: ${res.body}`); + } + const parsed = JSON.parse(res.body) as { providerSessionId: string }; + this.sessionId = parsed.providerSessionId; + await this.saveStickySessionId(this.sessionId); + this.opts.log.info( + `virtual-llm registrar: published ${String(this.opts.publishedProviders.length)} provider(s); sessionId=${this.sessionId}`, + ); + } + + private startHeartbeat(): void { + const intervalMs = this.opts.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_MS; + this.heartbeatTimer = setInterval(() => { + if (this.sessionId === null) return; + void this.heartbeatOnce(); + }, intervalMs); + } + + private async heartbeatOnce(): Promise { + if (this.sessionId === null) return; + try { + const res = await postJson( + this.urlFor('/api/v1/llms/_provider-heartbeat'), + { providerSessionId: this.sessionId }, + this.opts.token, + ); + if (res.statusCode !== 200) { + this.opts.log.warn(`virtual-llm registrar: heartbeat HTTP ${String(res.statusCode)}: ${res.body}`); + } + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: heartbeat failed: ${(err as Error).message}`); + } + } + + private openSseStream(): void { + if (this.stopped || this.sessionId === null) return; + const url = new URL(this.urlFor('/api/v1/llms/_provider-stream')); + const driver = url.protocol === 'https:' ? https : http; + const req = driver.request( + { + hostname: url.hostname, + port: url.port || (url.protocol === 'https:' ? 443 : 80), + path: url.pathname + url.search, + method: 'GET', + headers: { + Accept: 'text/event-stream', + Authorization: `Bearer ${this.opts.token}`, + [PROVIDER_SESSION_HEADER]: this.sessionId, + }, + }, + (res) => { + if (res.statusCode !== 200) { + let body = ''; + res.on('data', (c: Buffer) => { body += c.toString('utf-8'); }); + res.on('end', () => { + this.opts.log.warn(`virtual-llm registrar: SSE HTTP ${String(res.statusCode ?? 0)}: ${body}`); + this.scheduleReconnect(); + }); + return; + } + this.reconnectAttempt = 0; + res.setEncoding('utf-8'); + let buf = ''; + res.on('data', (chunk: string) => { + buf += chunk; + let nl: number; + while ((nl = buf.indexOf('\n\n')) !== -1) { + const frame = buf.slice(0, nl); + buf = buf.slice(nl + 2); + this.handleSseFrame(frame); + } + }); + res.on('end', () => this.scheduleReconnect()); + res.on('error', (err) => { + this.opts.log.warn(`virtual-llm registrar: SSE response error: ${err.message}`); + this.scheduleReconnect(); + }); + }, + ); + req.on('error', (err) => { + this.opts.log.warn(`virtual-llm registrar: SSE request error: ${err.message}`); + this.scheduleReconnect(); + }); + req.end(); + this.sseRequest = req; + } + + private scheduleReconnect(): void { + this.sseRequest = null; + if (this.stopped) return; + const base = this.opts.reconnectBaseMs ?? DEFAULT_RECONNECT_BASE_MS; + const max = this.opts.reconnectMaxMs ?? DEFAULT_RECONNECT_MAX_MS; + const delay = Math.min(max, base * 2 ** this.reconnectAttempt); + this.reconnectAttempt += 1; + this.opts.log.info(`virtual-llm registrar: SSE disconnected, reconnecting in ${String(delay)} ms`); + setTimeout(() => { + if (!this.stopped) this.openSseStream(); + }, delay).unref(); + } + + private handleSseFrame(frame: string): void { + let event = 'message'; + let data = ''; + for (const line of frame.split('\n')) { + if (line.startsWith('event:')) event = line.slice(6).trim(); + else if (line.startsWith('data:')) data += line.slice(5).trim(); + } + if (event !== 'task' || data === '') return; + let task: TaskFrame; + try { + task = JSON.parse(data) as TaskFrame; + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: malformed task frame: ${(err as Error).message}`); + return; + } + if (task.kind === 'infer') { + void this.handleInferTask(task); + return; + } + // Wake tasks are reserved for v2 — acknowledge with an error so mcpd + // surfaces a clean failure rather than waiting forever. + void this.postResult(task.taskId, { error: 'wake task type not implemented in this client (v2)' }); + } + + private async handleInferTask(task: InferTask): Promise { + const published = this.opts.publishedProviders.find((p) => p.provider.name === task.llmName); + if (published === undefined) { + await this.postResult(task.taskId, { error: `provider '${task.llmName}' not registered locally` }); + return; + } + try { + const completionOpts: CompletionOptions = { + messages: task.request.messages.map((m) => ({ role: m.role, content: m.content })), + }; + if (task.request.temperature !== undefined) completionOpts.temperature = task.request.temperature; + if (task.request.max_tokens !== undefined) completionOpts.maxTokens = task.request.max_tokens; + if (task.request.model !== undefined) completionOpts.model = task.request.model; + + const result = await published.provider.complete(completionOpts); + const completionBody = openAiCompletionEnvelope(published.model, result, task.request.model); + + if (task.streaming) { + // Single-chunk streaming for v1: emit one delta then a [DONE] marker. + // The server-side relay forwards both as SSE frames to the original + // caller. Real per-token streaming would require LlmProvider.stream(). + const deltaFrame = openAiStreamChunk(published.model, result, task.request.model); + await this.postResult(task.taskId, { chunk: { data: JSON.stringify(deltaFrame) } }); + await this.postResult(task.taskId, { chunk: { data: '[DONE]', done: true } }); + } else { + await this.postResult(task.taskId, { status: 200, body: completionBody }); + } + } catch (err) { + await this.postResult(task.taskId, { error: (err as Error).message }); + } + } + + private async postResult(taskId: string, body: unknown): Promise { + try { + await postJson( + this.urlFor(`/api/v1/llms/_provider-task/${encodeURIComponent(taskId)}/result`), + body as Record, + this.opts.token, + ); + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: result POST failed: ${(err as Error).message}`); + } + } + + private urlFor(path: string): string { + return `${this.opts.mcpdUrl.replace(/\/$/, '')}${path}`; + } +} + +/** Wrap a CompletionResult in the OpenAI chat.completion envelope. */ +function openAiCompletionEnvelope( + modelFromConfig: string, + result: { content: string; finishReason: string; usage: { promptTokens: number; completionTokens: number; totalTokens: number } }, + modelFromRequest: string | undefined, +): unknown { + return { + id: `cmpl-${Math.random().toString(36).slice(2)}`, + object: 'chat.completion', + created: Math.floor(Date.now() / 1000), + model: modelFromRequest ?? modelFromConfig, + choices: [{ + index: 0, + message: { role: 'assistant', content: result.content }, + finish_reason: result.finishReason, + }], + usage: { + prompt_tokens: result.usage.promptTokens, + completion_tokens: result.usage.completionTokens, + total_tokens: result.usage.totalTokens, + }, + }; +} + +/** Single-chunk OpenAI chat.completion.chunk frame for v1 streaming. */ +function openAiStreamChunk( + modelFromConfig: string, + result: { content: string; finishReason: string }, + modelFromRequest: string | undefined, +): unknown { + return { + id: `chunk-${Math.random().toString(36).slice(2)}`, + object: 'chat.completion.chunk', + created: Math.floor(Date.now() / 1000), + model: modelFromRequest ?? modelFromConfig, + choices: [{ + index: 0, + delta: { content: result.content }, + finish_reason: result.finishReason, + }], + }; +} + +interface PostResponse { statusCode: number; body: string } + +/** Tiny JSON POST helper used by all of the registrar's mcpd calls. */ +function postJson(url: string, body: unknown, bearer: string): Promise { + return new Promise((resolve, reject) => { + const u = new URL(url); + const driver = u.protocol === 'https:' ? https : http; + const payload = JSON.stringify(body); + const req = driver.request( + { + hostname: u.hostname, + port: u.port || (u.protocol === 'https:' ? 443 : 80), + path: u.pathname + u.search, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${bearer}`, + 'Content-Length': String(Buffer.byteLength(payload)), + Accept: 'application/json', + }, + timeout: 30_000, + }, + (res) => { + const chunks: Buffer[] = []; + res.on('data', (c: Buffer) => chunks.push(c)); + res.on('end', () => resolve({ statusCode: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') })); + }, + ); + req.on('error', reject); + req.on('timeout', () => { req.destroy(); reject(new Error('request timed out')); }); + req.write(payload); + req.end(); + }); +} diff --git a/src/mcplocal/tests/registrar.test.ts b/src/mcplocal/tests/registrar.test.ts new file mode 100644 index 0000000..40e99ce --- /dev/null +++ b/src/mcplocal/tests/registrar.test.ts @@ -0,0 +1,244 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import http from 'node:http'; +import { mkdtempSync, rmSync, readFileSync, writeFileSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { + VirtualLlmRegistrar, + type RegistrarPublishedProvider, +} from '../src/providers/registrar.js'; +import type { LlmProvider, CompletionOptions, CompletionResult } from '../src/providers/types.js'; + +/** + * The registrar talks HTTP. Spin a tiny in-process server in each test so + * we can assert what it sends without mocking node:http itself. + */ +interface FakeServer { + url: string; + close: () => Promise; + /** Calls observed in arrival order. */ + calls: Array<{ method: string; path: string; body: string; headers: Record }>; + /** + * Optional handler. If set, runs per-request and decides response. If not, + * defaults to 201 + JSON `{ providerSessionId: 'sess-FAKE' }` for register + * and 200 + `{}` for everything else. + */ + handler?: (req: http.IncomingMessage, res: http.ServerResponse, body: string) => void; +} + +async function startFakeServer(): Promise { + const calls: FakeServer['calls'] = []; + let server!: http.Server; + const ready = new Promise((resolve, reject) => { + server = http.createServer((req, res) => { + const chunks: Buffer[] = []; + req.on('data', (c: Buffer) => chunks.push(c)); + req.on('end', () => { + const body = Buffer.concat(chunks).toString('utf-8'); + calls.push({ + method: req.method ?? '', + path: req.url ?? '', + body, + headers: req.headers, + }); + if (fake.handler !== undefined) { + fake.handler(req, res, body); + return; + } + if (req.url === '/api/v1/llms/_provider-register') { + res.writeHead(201, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ providerSessionId: 'sess-FAKE', llms: [] })); + return; + } + res.writeHead(200, { 'content-type': 'application/json' }); + res.end('{}'); + }); + }); + server.listen(0, '127.0.0.1', () => { + const addr = server.address(); + if (addr === null || typeof addr === 'string') { + reject(new Error('listen failed')); + return; + } + const fakeReady: FakeServer = { + url: `http://127.0.0.1:${String(addr.port)}`, + close: () => new Promise((r) => { server.close(() => r()); }), + calls, + }; + Object.assign(fake, fakeReady); + resolve(fake); + }); + }); + const fake: FakeServer = {} as FakeServer; + return ready; +} + +function makeProvider(name: string, content = 'hi from local'): LlmProvider { + return { + name, + async complete(_opts: CompletionOptions): Promise { + return { + content, + toolCalls: [], + usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 }, + finishReason: 'stop', + }; + }, + async listModels() { return []; }, + async isAvailable() { return true; }, + }; +} + +let tempDir: string; + +beforeEach(() => { + tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-registrar-test-')); +}); + +afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); +}); + +function silentLog(): { info: ReturnType; warn: ReturnType; error: ReturnType } { + return { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; +} + +describe('VirtualLlmRegistrar', () => { + it('start() with no published providers is a silent no-op', async () => { + const log = silentLog(); + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: 'http://unreachable.example', + token: 'tok', + publishedProviders: [], + sessionFilePath: join(tempDir, 'provider-session'), + log, + }); + await registrar.start(); + expect(log.info).toHaveBeenCalledWith(expect.stringContaining('nothing to publish')); + registrar.stop(); + }); + + it('register POSTs to /_provider-register and persists the returned sessionId', async () => { + const fake = await startFakeServer(); + try { + const sessionFilePath = join(tempDir, 'provider-session'); + const published: RegistrarPublishedProvider[] = [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'qwen', tier: 'fast' }, + ]; + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 'tok-abc', + publishedProviders: published, + sessionFilePath, + log: silentLog(), + // Make heartbeat huge so it doesn't fire mid-test. + heartbeatIntervalMs: 60_000, + }); + await registrar.start(); + // Allow the SSE open to enter flight (we never feed it a response, + // but the request fires synchronously after register). + await new Promise((r) => setTimeout(r, 20)); + + const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register'); + expect(registerCall).toBeDefined(); + expect(registerCall!.method).toBe('POST'); + const body = JSON.parse(registerCall!.body) as { providers: Array<{ name: string; type: string; model: string; tier: string }> }; + expect(body.providers).toEqual([{ + name: 'vllm-local', + type: 'openai', + model: 'qwen', + tier: 'fast', + }]); + expect(registerCall!.headers['authorization']).toBe('Bearer tok-abc'); + + // Sticky session id persisted. + expect(readFileSync(sessionFilePath, 'utf-8').trim()).toBe('sess-FAKE'); + expect(registrar.getSessionId()).toBe('sess-FAKE'); + + registrar.stop(); + } finally { + await fake.close(); + } + }); + + it('reuses an existing sticky session id from disk on next start', async () => { + const fake = await startFakeServer(); + try { + const sessionFilePath = join(tempDir, 'provider-session'); + writeFileSync(sessionFilePath, 'sess-existing\n', 'utf-8'); + + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 't', + publishedProviders: [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' }, + ], + sessionFilePath, + log: silentLog(), + heartbeatIntervalMs: 60_000, + }); + await registrar.start(); + await new Promise((r) => setTimeout(r, 20)); + + const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register'); + const body = JSON.parse(registerCall!.body) as { providerSessionId?: string }; + expect(body.providerSessionId).toBe('sess-existing'); + + registrar.stop(); + } finally { + await fake.close(); + } + }); + + it('heartbeat ticker POSTs the session id at the configured interval', async () => { + const fake = await startFakeServer(); + try { + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 't', + publishedProviders: [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' }, + ], + sessionFilePath: join(tempDir, 'provider-session'), + log: silentLog(), + heartbeatIntervalMs: 30, // tight so the test doesn't drag + }); + await registrar.start(); + // Wait long enough for at least 2 heartbeats to fire. + await new Promise((r) => setTimeout(r, 100)); + registrar.stop(); + + const heartbeats = fake.calls.filter((c) => c.path === '/api/v1/llms/_provider-heartbeat'); + expect(heartbeats.length).toBeGreaterThanOrEqual(2); + for (const h of heartbeats) { + const body = JSON.parse(h.body) as { providerSessionId: string }; + expect(body.providerSessionId).toBe('sess-FAKE'); + } + } finally { + await fake.close(); + } + }); + + it('throws when mcpd returns non-201 from /_provider-register', async () => { + const fake = await startFakeServer(); + fake.handler = (_req, res, _body) => { + res.writeHead(409, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ error: 'Cannot publish over public LLM: vllm-local' })); + }; + try { + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 't', + publishedProviders: [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' }, + ], + sessionFilePath: join(tempDir, 'provider-session'), + log: silentLog(), + heartbeatIntervalMs: 60_000, + }); + await expect(registrar.start()).rejects.toThrow(/HTTP 409/); + } finally { + await fake.close(); + } + }); +}); -- 2.49.1 From 7e6b0cab449e248e134e954e56b668c57a6b0594 Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 14:25:38 +0100 Subject: [PATCH 5/6] feat(cli): mcpctl chat-llm + KIND/STATUS columns (v1 Stage 5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the loop on user-facing surface: $ mcpctl get llm NAME KIND STATUS TYPE MODEL TIER KEY ID qwen3-thinking public active openai qwen3-thinking fast ... ... vllm-local virtual active openai Qwen/Qwen2.5-7B-Instruct fast - ... $ mcpctl chat-llm vllm-local ──────────────────────────────────────── LLM: vllm-local openai → Qwen/Qwen2.5-7B-Instruct-AWQ Kind: virtual Status: active ──────────────────────────────────────── > hello? Hi! … New: chat-llm command (commands/chat-llm.ts) - Stateless chat with any mcpd-registered LLM. No threads, no tools, no project prompts. POSTs to /api/v1/llms//infer; mcpd's kind=virtual branch handles relay-through-mcplocal transparently, so the same CLI command works for both public and virtual LLMs. - Reuses installStatusBar / formatStats / recordDelta / styleStats / PhaseStats from chat.ts (now exported) so the bottom-row tokens-per- second ticker behaves identically to mcpctl chat. - Flags: --message (one-shot), --system, --temperature, --max-tokens, --no-stream. Streaming uses OpenAI chat.completion.chunk SSE. - REPL mode keeps a per-session history array so multi-turn flows feel natural; each turn is an independent inference call. Updated: get.ts - LlmRow gains optional kind/status fields. - llmColumns layout: NAME, KIND, STATUS, TYPE, MODEL, TIER, KEY, ID. Defaults gracefully when older mcpd responses don't return them. Updated: chat.ts - Re-exports the helpers chat-llm.ts needs (PhaseStats, newPhase, recordDelta, formatStats, styleStats, styleThinking, STDERR_IS_TTY, StatusBar, installStatusBar). No behavior change. Completions: chat-llm picks up the standard option enumeration automatically; bash gets a special-case for first-arg LLM-name completion via _mcpctl_resource_names "llms". CLI suite: 437/437 (was 430, +7 from auto-discovered test cases in the regenerated completions golden). Workspace: 2043/2043 across 152 files. Co-Authored-By: Claude Opus 4.7 (1M context) --- completions/mcpctl.bash | 11 +- completions/mcpctl.fish | 10 +- scripts/generate-completions.ts | 14 ++ src/cli/src/commands/chat-llm.ts | 271 +++++++++++++++++++++++++++++++ src/cli/src/commands/chat.ts | 24 +-- src/cli/src/commands/get.ts | 6 + src/cli/src/index.ts | 8 + 7 files changed, 330 insertions(+), 14 deletions(-) create mode 100644 src/cli/src/commands/chat-llm.ts diff --git a/completions/mcpctl.bash b/completions/mcpctl.bash index d0ea0a4..2b86325 100644 --- a/completions/mcpctl.bash +++ b/completions/mcpctl.bash @@ -5,7 +5,7 @@ _mcpctl() { local cur prev words cword _init_completion || return - local commands="status login logout config get describe delete logs create edit apply chat patch backup approve console cache test migrate rotate" + local commands="status login logout config get describe delete logs create edit apply chat chat-llm patch backup approve console cache test migrate rotate" local project_commands="get describe delete logs create edit attach-server detach-server" local global_opts="-v --version --daemon-url --direct -p --project -h --help" local resources="servers instances secrets secretbackends llms agents personalities templates projects users groups rbac prompts promptrequests serverattachments proxymodels all" @@ -247,6 +247,15 @@ _mcpctl() { COMPREPLY=($(compgen -W "-m --message --thread --system --system-file --system-append --personality --temperature --top-p --top-k --max-tokens --seed --stop --allow-tool --extra --no-stream -h --help" -- "$cur")) fi return ;; + chat-llm) + if [[ $((cword - subcmd_pos)) -eq 1 ]]; then + local names + names=$(_mcpctl_resource_names "llms") + COMPREPLY=($(compgen -W "$names -m --message --system --temperature --max-tokens --no-stream -h --help" -- "$cur")) + else + COMPREPLY=($(compgen -W "-m --message --system --temperature --max-tokens --no-stream -h --help" -- "$cur")) + fi + return ;; patch) if [[ -z "$resource_type" ]]; then COMPREPLY=($(compgen -W "$resources -h --help" -- "$cur")) diff --git a/completions/mcpctl.fish b/completions/mcpctl.fish index ed739a7..810b375 100644 --- a/completions/mcpctl.fish +++ b/completions/mcpctl.fish @@ -4,7 +4,7 @@ # Erase any stale completions from previous versions complete -c mcpctl -e -set -l commands status login logout config get describe delete logs create edit apply chat patch backup approve console cache test migrate rotate +set -l commands status login logout config get describe delete logs create edit apply chat chat-llm patch backup approve console cache test migrate rotate set -l project_commands get describe delete logs create edit attach-server detach-server # Disable file completions by default @@ -231,6 +231,7 @@ complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_ complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a edit -d 'Edit a resource in your default editor (server, project)' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a apply -d 'Apply declarative configuration from a YAML or JSON file' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a chat -d 'Open an interactive chat session with an agent (REPL or one-shot).' +complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a chat-llm -d 'Stateless chat with any registered LLM (public or virtual). No threads, no tools.' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a patch -d 'Patch a resource field (e.g. mcpctl patch project myproj llmProvider=none)' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a backup -d 'Git-based backup status and management' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a approve -d 'Approve a pending prompt request (atomic: delete request, create prompt)' @@ -518,6 +519,13 @@ complete -c mcpctl -n "__fish_seen_subcommand_from chat" -l allow-tool -d 'Restr complete -c mcpctl -n "__fish_seen_subcommand_from chat" -l extra -d 'Provider-specific knob k=v (repeatable)' -x complete -c mcpctl -n "__fish_seen_subcommand_from chat" -l no-stream -d 'Disable SSE streaming (single JSON response)' +# chat-llm options +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -s m -l message -d 'One-shot: send a single message and exit (no REPL)' -x +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -l system -d 'Optional system prompt' -x +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -l temperature -d 'Sampling temperature (0..2)' -x +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -l max-tokens -d 'Maximum tokens in the assistant reply' -x +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -l no-stream -d 'Disable SSE streaming (single JSON response)' + # console options complete -c mcpctl -n "__fish_seen_subcommand_from console" -l stdin-mcp -d 'Run inspector as MCP server over stdin/stdout (for Claude)' complete -c mcpctl -n "__fish_seen_subcommand_from console" -l audit -d 'Browse audit events from mcpd' diff --git a/scripts/generate-completions.ts b/scripts/generate-completions.ts index a0941a1..c8d61fc 100644 --- a/scripts/generate-completions.ts +++ b/scripts/generate-completions.ts @@ -920,6 +920,20 @@ function emitBashCase(emit: (s: string) => void, cmd: CmdInfo, root: CmdInfo): v return; } + // chat-llm: first arg is LLM name + if (name === 'chat-llm') { + emit(` ${name})`); + emit(' if [[ $((cword - subcmd_pos)) -eq 1 ]]; then'); + emit(' local names'); + emit(' names=$(_mcpctl_resource_names "llms")'); + emit(` COMPREPLY=($(compgen -W "$names ${optFlags}" -- "$cur"))`); + emit(' else'); + emit(` COMPREPLY=($(compgen -W "${optFlags}" -- "$cur"))`); + emit(' fi'); + emit(' return ;;'); + return; + } + // console: first arg is project name if (name === 'console') { emit(` ${name})`); diff --git a/src/cli/src/commands/chat-llm.ts b/src/cli/src/commands/chat-llm.ts new file mode 100644 index 0000000..b2c1d22 --- /dev/null +++ b/src/cli/src/commands/chat-llm.ts @@ -0,0 +1,271 @@ +/** + * `mcpctl chat-llm ` — stateless chat with any registered LLM. + * + * Distinct from `mcpctl chat `: + * - No threads, no history, no tools, no project prompts. + * - Just an OpenAI chat-completions round-trip per turn. + * - Works for both kinds of mcpd-registered LLMs: + * * `kind=public` — direct upstream call (existing behavior). + * * `kind=virtual` — relayed through the publishing mcplocal's SSE + * channel (the v1 virtual-LLM feature). + * + * The CLI doesn't need to know which kind the LLM is; mcpd's + * `/api/v1/llms/:name/infer` route branches on `kind` server-side. + */ +import { Command } from 'commander'; +import http from 'node:http'; +import https from 'node:https'; +import readline from 'node:readline'; +import type { ApiClient } from '../api-client.js'; +import { + formatStats, + installStatusBar, + newPhase, + recordDelta, + STDERR_IS_TTY, + styleStats, + type PhaseStats, + type StatusBar, +} from './chat.js'; + +const STREAM_TIMEOUT_MS = 600_000; + +export interface ChatLlmCommandDeps { + client: ApiClient; + baseUrl: string; + token?: string | undefined; + log: (...args: unknown[]) => void; +} + +export function createChatLlmCommand(deps: ChatLlmCommandDeps): Command { + return new Command('chat-llm') + .description('Stateless chat with any registered LLM (public or virtual). No threads, no tools.') + .argument('', 'LLM name (see `mcpctl get llm`)') + .option('-m, --message ', 'One-shot: send a single message and exit (no REPL)') + .option('--system ', 'Optional system prompt') + .option('--temperature ', 'Sampling temperature (0..2)', parseFloat) + .option('--max-tokens ', 'Maximum tokens in the assistant reply', parseFloatInt) + .option('--no-stream', 'Disable SSE streaming (single JSON response)') + .action(async (name: string, opts: ChatLlmOpts) => { + await printHeader(deps, name, opts.system); + if (opts.message !== undefined) { + await runOneShot(deps, name, opts); + return; + } + await runRepl(deps, name, opts); + }); +} + +interface ChatLlmOpts { + message?: string; + system?: string; + temperature?: number; + maxTokens?: number; + stream?: boolean; +} + +interface LlmInfo { + name: string; + type: string; + model: string; + kind: 'public' | 'virtual'; + status: 'active' | 'inactive' | 'hibernating'; +} + +async function printHeader(deps: ChatLlmCommandDeps, name: string, systemPrompt?: string): Promise { + let info: LlmInfo; + try { + info = await deps.client.get(`/api/v1/llms/${encodeURIComponent(name)}`); + } catch (err) { + process.stderr.write(`(could not fetch LLM metadata: ${(err as Error).message})\n`); + return; + } + const sep = '─'.repeat(60); + const out = (s: string): void => { process.stderr.write(`${styleStats(s)}\n`); }; + out(sep); + out(`LLM: ${info.name} ${info.type} → ${info.model}`); + out(`Kind: ${info.kind} Status: ${info.status}`); + if (systemPrompt !== undefined) { + out(`System: ${systemPrompt.slice(0, 120)}${systemPrompt.length > 120 ? '…' : ''}`); + } + out(sep); +} + +async function runOneShot(deps: ChatLlmCommandDeps, name: string, opts: ChatLlmOpts): Promise { + const messages = buildMessages([], opts.system, opts.message ?? ''); + const bar = opts.stream === false ? null : installStatusBar(); + try { + if (opts.stream === false) { + const reply = await postNonStream(deps, name, messages, opts); + process.stdout.write(`${reply}\n`); + } else { + await streamOnce(deps, name, messages, opts, bar); + } + } finally { + bar?.teardown(); + } +} + +async function runRepl(deps: ChatLlmCommandDeps, name: string, opts: ChatLlmOpts): Promise { + const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); + const ask = (q: string): Promise => new Promise((resolve) => rl.question(q, resolve)); + const history: Array<{ role: 'user' | 'assistant'; content: string }> = []; + + const bar = opts.stream === false ? null : installStatusBar(); + process.stderr.write(`Stateless chat with LLM '${name}'. Ctrl-D to exit.\n`); + + try { + while (true) { + let line: string; + try { line = await ask('> '); } catch { break; } + if (line === '') continue; + + const messages = buildMessages(history, opts.system, line); + try { + let reply: string; + if (opts.stream === false) { + reply = await postNonStream(deps, name, messages, opts); + process.stdout.write(`${reply}\n`); + } else { + reply = await streamOnce(deps, name, messages, opts, bar); + process.stdout.write('\n'); + } + history.push({ role: 'user', content: line }); + history.push({ role: 'assistant', content: reply }); + } catch (err) { + process.stderr.write(`error: ${(err as Error).message}\n`); + } + } + rl.close(); + } finally { + bar?.teardown(); + } +} + +function buildMessages( + history: Array<{ role: 'user' | 'assistant'; content: string }>, + system: string | undefined, + user: string, +): Array<{ role: 'system' | 'user' | 'assistant'; content: string }> { + const out: Array<{ role: 'system' | 'user' | 'assistant'; content: string }> = []; + if (system !== undefined && system !== '') out.push({ role: 'system', content: system }); + out.push(...history); + out.push({ role: 'user', content: user }); + return out; +} + +async function postNonStream( + deps: ChatLlmCommandDeps, + name: string, + messages: Array<{ role: string; content: string }>, + opts: ChatLlmOpts, +): Promise { + const body: Record = { messages }; + if (opts.temperature !== undefined) body['temperature'] = opts.temperature; + if (opts.maxTokens !== undefined) body['max_tokens'] = opts.maxTokens; + const res = await deps.client.post<{ + choices?: Array<{ message?: { content?: string } }>; + }>(`/api/v1/llms/${encodeURIComponent(name)}/infer`, body); + return res.choices?.[0]?.message?.content ?? ''; +} + +/** + * Stream a single chat call against /api/v1/llms/:name/infer with stream=true. + * The response is OpenAI-style SSE (`data: `). + * Returns the assembled assistant content. + */ +function streamOnce( + deps: ChatLlmCommandDeps, + name: string, + messages: Array<{ role: string; content: string }>, + opts: ChatLlmOpts, + bar: StatusBar | null, +): Promise { + const url = new URL(`${deps.baseUrl}/api/v1/llms/${encodeURIComponent(name)}/infer`); + const reqBody: Record = { messages, stream: true }; + if (opts.temperature !== undefined) reqBody['temperature'] = opts.temperature; + if (opts.maxTokens !== undefined) reqBody['max_tokens'] = opts.maxTokens; + const payload = JSON.stringify(reqBody); + const stats = { thinking: newPhase(), content: newPhase() } satisfies { thinking: PhaseStats; content: PhaseStats }; + + const TICK_MS = 250; + let timer: NodeJS.Timeout | null = null; + function startTicker(): void { + if (timer !== null || bar === null) return; + timer = setInterval(() => bar.update(formatStats(stats, true)), TICK_MS); + } + function stopTicker(): void { + if (timer !== null) { clearInterval(timer); timer = null; } + } + + return new Promise((resolve, reject) => { + let assistant = ''; + const driver = url.protocol === 'https:' ? https : http; + const req = driver.request({ + hostname: url.hostname, + port: url.port || (url.protocol === 'https:' ? 443 : 80), + path: url.pathname + url.search, + method: 'POST', + timeout: STREAM_TIMEOUT_MS, + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream', + ...(deps.token !== undefined ? { Authorization: `Bearer ${deps.token}` } : {}), + }, + }, (res) => { + const status = res.statusCode ?? 0; + if (status >= 400) { + const chunks: Buffer[] = []; + res.on('data', (c: Buffer) => chunks.push(c)); + res.on('end', () => reject(new Error(`HTTP ${String(status)}: ${Buffer.concat(chunks).toString('utf-8')}`))); + return; + } + let buf = ''; + res.setEncoding('utf-8'); + res.on('data', (chunk: string) => { + buf += chunk; + let nl: number; + while ((nl = buf.indexOf('\n\n')) !== -1) { + const frame = buf.slice(0, nl); + buf = buf.slice(nl + 2); + for (const line of frame.split('\n')) { + if (!line.startsWith('data: ')) continue; + const data = line.slice(6); + if (data === '[DONE]') continue; + try { + const parsed = JSON.parse(data) as { choices?: Array<{ delta?: { content?: string } }> }; + const piece = parsed.choices?.[0]?.delta?.content; + if (typeof piece === 'string' && piece !== '') { + recordDelta(stats.content, piece); + process.stdout.write(piece); + assistant += piece; + startTicker(); + } + } catch { + // ignore malformed frames + } + } + } + }); + res.on('end', () => { + stopTicker(); + const final = formatStats(stats, false); + if (final !== '' && STDERR_IS_TTY) process.stderr.write(`\n${styleStats(`(${final})`)}`); + else if (final !== '') process.stderr.write(`\n(${final})`); + if (bar !== null && final !== '') bar.update(final); + resolve(assistant); + }); + res.on('error', (err) => { stopTicker(); reject(err); }); + }); + req.on('error', (err) => { stopTicker(); reject(err); }); + req.on('timeout', () => { stopTicker(); req.destroy(); reject(new Error('chat-llm stream timed out')); }); + req.write(payload); + req.end(); + }); +} + +function parseFloatInt(value: string): number { + const n = Number(value); + if (!Number.isInteger(n)) throw new Error(`expected integer, got '${value}'`); + return n; +} diff --git a/src/cli/src/commands/chat.ts b/src/cli/src/commands/chat.ts index e302cb6..edece9c 100644 --- a/src/cli/src/commands/chat.ts +++ b/src/cli/src/commands/chat.ts @@ -525,24 +525,24 @@ interface ChatStreamFrame { // ANSI codes for the reasoning sidebar. Dim + italic visually separates // reasoning ("the model is thinking") from final assistant content. We only // emit the codes when stderr is a TTY — piping to a file should stay clean. -const ANSI_DIM_ITALIC = '\x1b[2;3m'; -const ANSI_DIM = '\x1b[2m'; -const ANSI_RESET = '\x1b[0m'; -const STDERR_IS_TTY = process.stderr.isTTY === true; -function styleThinking(s: string): string { +export const ANSI_DIM_ITALIC = '\x1b[2;3m'; +export const ANSI_DIM = '\x1b[2m'; +export const ANSI_RESET = '\x1b[0m'; +export const STDERR_IS_TTY = process.stderr.isTTY === true; +export function styleThinking(s: string): string { return STDERR_IS_TTY ? `${ANSI_DIM_ITALIC}${s}${ANSI_RESET}` : s; } -function styleStats(s: string): string { +export function styleStats(s: string): string { return STDERR_IS_TTY ? `${ANSI_DIM}${s}${ANSI_RESET}` : s; } -interface PhaseStats { +export interface PhaseStats { words: number; firstMs: number; lastMs: number; } -function newPhase(): PhaseStats { return { words: 0, firstMs: 0, lastMs: 0 }; } -function recordDelta(p: PhaseStats, delta: string): void { +export function newPhase(): PhaseStats { return { words: 0, firstMs: 0, lastMs: 0 }; } +export function recordDelta(p: PhaseStats, delta: string): void { const now = Date.now(); if (p.firstMs === 0) p.firstMs = now; p.lastMs = now; @@ -558,7 +558,7 @@ function formatPhase(label: string, p: PhaseStats): string | null { const rate = p.words / sec; return `${label}${String(p.words)}w · ${rate.toFixed(1)} w/s · ${sec.toFixed(1)}s`; } -function formatStats(s: { thinking: PhaseStats; content: PhaseStats }, partial: boolean): string { +export function formatStats(s: { thinking: PhaseStats; content: PhaseStats }, partial: boolean): string { const parts: string[] = []; const c = formatPhase('', s.content); if (c !== null) parts.push(c); @@ -588,12 +588,12 @@ function formatStats(s: { thinking: PhaseStats; content: PhaseStats }, partial: * a foreign terminal in a half-locked state if Ctrl-C / uncaught exception * fires mid-stream. */ -interface StatusBar { +export interface StatusBar { update(text: string): void; teardown(): void; } -function installStatusBar(): StatusBar | null { +export function installStatusBar(): StatusBar | null { const out = process.stdout; if (!out.isTTY) return null; const initialRows = out.rows; diff --git a/src/cli/src/commands/get.ts b/src/cli/src/commands/get.ts index b56ec8d..c55f572 100644 --- a/src/cli/src/commands/get.ts +++ b/src/cli/src/commands/get.ts @@ -132,10 +132,16 @@ interface LlmRow { url: string; description: string; apiKeyRef: { name: string; key: string } | null; + // Virtual-provider lifecycle (optional for backward compat with older + // mcpd responses that predate the kind/status columns). + kind?: 'public' | 'virtual'; + status?: 'active' | 'inactive' | 'hibernating'; } const llmColumns: Column[] = [ { header: 'NAME', key: 'name' }, + { header: 'KIND', key: (r) => r.kind ?? 'public', width: 8 }, + { header: 'STATUS', key: (r) => r.status ?? 'active', width: 12 }, { header: 'TYPE', key: 'type', width: 12 }, { header: 'MODEL', key: 'model', width: 28 }, { header: 'TIER', key: 'tier', width: 8 }, diff --git a/src/cli/src/index.ts b/src/cli/src/index.ts index 6a0485f..4f54215 100644 --- a/src/cli/src/index.ts +++ b/src/cli/src/index.ts @@ -19,6 +19,7 @@ import { createPatchCommand } from './commands/patch.js'; import { createConsoleCommand } from './commands/console/index.js'; import { createCacheCommand } from './commands/cache.js'; import { createChatCommand } from './commands/chat.js'; +import { createChatLlmCommand } from './commands/chat-llm.js'; import { createMigrateCommand } from './commands/migrate.js'; import { createRotateCommand } from './commands/rotate.js'; import { ApiClient, ApiError } from './api-client.js'; @@ -241,6 +242,13 @@ export function createProgram(): Command { log: (...args) => console.log(...args), })); + program.addCommand(createChatLlmCommand({ + client, + baseUrl, + ...(creds?.token !== undefined ? { token: creds.token } : {}), + log: (...args) => console.log(...args), + })); + program.addCommand(createPatchCommand({ client, log: (...args) => console.log(...args), -- 2.49.1 From 866f6abc88a788ab6da836afd29b394da25c2c54 Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 14:28:43 +0100 Subject: [PATCH 6/6] feat: virtual-LLM smoke test + docs (v1 Stage 6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Final stage of v1. Smoke (mcplocal/tests/smoke/virtual-llm.smoke.test.ts): - Spins an in-process LlmProvider that returns canned content. - Runs the registrar against the live mcpd in fulldeploy. - Asserts: row appears with kind=virtual / status=active, infer through /api/v1/llms//infer comes back through the SSE relay with the provider's content + finish_reason, and a 503 appears immediately after registrar.stop() (publisher offline). - Times out / cleanup paths idempotent so re-runs against the same cluster don't litter rows. The 90-s heartbeat-stale flip and 4-h GC are unit-tested — too slow for smoke. Docs: - New docs/virtual-llms.md: when to use this vs creating a regular Llm row, how to opt-in via publish: true, the lifecycle table, the inference-relay sequence, the v1 streaming caveat, the v2-v5 roadmap, and the full /api/v1/llms/_provider-* surface. - agents.md cross-links virtual-llms.md alongside personalities/chat. - README's Agents section gains a "Virtual LLMs" subsection. Workspace suite: 2043/2043 (smoke files run separately). v1 closes. Stage roadmap (each its own future PR): v2 wake-on-demand · v3 virtual agents · v4 LB pool · v5 task queue Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 26 +++ docs/agents.md | 4 + docs/virtual-llms.md | 171 ++++++++++++++ .../tests/smoke/virtual-llm.smoke.test.ts | 209 ++++++++++++++++++ 4 files changed, 410 insertions(+) create mode 100644 docs/virtual-llms.md create mode 100644 src/mcplocal/tests/smoke/virtual-llm.smoke.test.ts diff --git a/README.md b/README.md index a6b1cb9..e12537e 100644 --- a/README.md +++ b/README.md @@ -571,6 +571,32 @@ For binding prompts to personalities and the API surface, see prompt editing — paste a session token (`mcpctl auth login`) or PAT to log in. +### Virtual LLMs + +A user's local LLM (`vllm-local`, Ollama, …) can publish itself into +mcpd's `Llm` registry so anyone authorized sees it under `mcpctl get llm` +and can chat with it via `mcpctl chat-llm `. Inference is relayed +through the publishing mcplocal's SSE control channel — mcpd never holds +the local URL or API key. + +```fish +# In ~/.mcpctl/config.json, opt the provider in with `publish: true`: +# { "name": "vllm-local", "type": "openai", "model": "...", "publish": true } +systemctl --user restart mcplocal + +mcpctl get llm +# NAME KIND STATUS TYPE MODEL TIER ID +# qwen3-thinking public active openai qwen3-thinking fast ... +# vllm-local virtual active openai Qwen/Qwen2.5-7B-Instruct-AWQ fast ... + +mcpctl chat-llm vllm-local +> hello? +``` + +Lifecycle: 30 s heartbeats, 90 s heartbeat-stale → inactive, 4 h +inactive → auto-deleted. A reconnecting mcplocal adopts the same row +via a sticky `providerSessionId`. Full design: [docs/virtual-llms.md](docs/virtual-llms.md). + ## Commands ```bash diff --git a/docs/agents.md b/docs/agents.md index e86df3d..b14688f 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -201,4 +201,8 @@ mcpctl chat reviewer - [personalities.md](./personalities.md) — named overlays of prompts on top of an agent. Same agent, different prompt bundles, picked per-turn via `--personality ` or `agent.defaultPersonality`. +- [virtual-llms.md](./virtual-llms.md) — local LLMs (e.g. `vllm-local`) + publishing themselves into `mcpctl get llm` so anyone can chat with + them via `mcpctl chat-llm `. Inference is relayed through the + publishing mcplocal — mcpd never holds the local URL or key. - [chat.md](./chat.md) — `mcpctl chat` flow and LiteLLM-style flags. diff --git a/docs/virtual-llms.md b/docs/virtual-llms.md new file mode 100644 index 0000000..008746b --- /dev/null +++ b/docs/virtual-llms.md @@ -0,0 +1,171 @@ +# Virtual LLMs + +A **virtual LLM** is an `Llm` row in mcpd that's *registered by an mcplocal +client* rather than created by hand with `mcpctl create llm`. Inference for +a virtual LLM is relayed back through the publishing mcplocal's SSE control +channel — **mcpd never needs to know the local URL or hold its API key**. + +When the publishing mcplocal goes away (or the user shuts down their +laptop) the row decays: `active → inactive` after 90 s without a +heartbeat, then deleted after 4 h of inactivity. A reconnecting mcplocal +adopts the same row using a sticky `providerSessionId` it persisted at +first publish. + +## When to use this + +- **Local model on a developer laptop** that you want everyone on the + team to be able to chat with via `mcpctl chat-llm `. The model + doesn't need to be reachable from mcpd's k8s pods — only the user's + mcplocal does (which is already the case because mcplocal pulls + projects from mcpd over HTTPS). +- **Hibernating models** that wake on demand (v2 — see "Roadmap"). +- **Pool of identical models** distributed across user laptops, eligible + for load balancing (v4). + +If your model is reachable from mcpd's k8s pods over LAN/VPN, you don't +need a virtual LLM — just `mcpctl create llm --type openai --url …` +and you're done. + +## Publishing a local provider + +mcplocal's local config (`~/.mcpctl/config.json`) gains a `publish: true` +opt-in per provider: + +```json +{ + "llm": { + "providers": [ + { + "name": "vllm-local", + "type": "openai", + "model": "Qwen/Qwen2.5-7B-Instruct-AWQ", + "url": "http://127.0.0.1:8000/v1", + "tier": "fast", + "publish": true + } + ] + } +} +``` + +Restart mcplocal: + +```fish +systemctl --user restart mcplocal +``` + +The registrar: +1. Reads `~/.mcpctl/credentials` for `mcpdUrl` + bearer token. +2. POSTs to `/api/v1/llms/_provider-register` with the publishable set. +3. Persists the returned `providerSessionId` to + `~/.mcpctl/provider-session` so the next restart adopts the same + mcpd row. +4. Opens the SSE channel at `/api/v1/llms/_provider-stream`. +5. Heartbeats every 30 s. +6. Listens for `event: task` frames and runs them against the local + `LlmProvider`. + +If `~/.mcpctl/credentials` doesn't exist (e.g. you haven't run +`mcpctl auth login`), the registrar logs a warning and skips — +publishing is a best-effort feature, not a boot blocker. + +## Verifying + +```fish +$ mcpctl get llm +NAME KIND STATUS TYPE MODEL TIER KEY ID +qwen3-thinking public active openai qwen3-thinking fast secret://litellm-key/API_KEY cmofx8y7u… +vllm-local virtual active openai Qwen/Qwen2.5-7B-Instruct-AWQ fast - cmoxz12ab… + +$ mcpctl chat-llm vllm-local +───────────────────────────────────────────────────────── +LLM: vllm-local openai → Qwen/Qwen2.5-7B-Instruct-AWQ +Kind: virtual Status: active +───────────────────────────────────────────────────────── +> hello? +Hi! … +``` + +You can also chat with public LLMs the same way: + +```fish +$ mcpctl chat-llm qwen3-thinking +``` + +The CLI doesn't care about `kind` — mcpd's `/api/v1/llms//infer` +route branches on it server-side. + +## Lifecycle in detail + +| State | What it means | +|----------------|-----------------------------------------------------------------------| +| `active` | Heartbeat received within the last 90 s and the SSE channel is open. | +| `inactive` | Either the SSE closed or the heartbeat watchdog tripped. Inference returns 503. | +| `hibernating` | Reserved for v2 (wake-on-demand). v1 never writes this state. | + +Two timers on mcpd run the GC sweep: + +- **90 s** without a heartbeat → flip `active` → `inactive`. +- **4 h** in `inactive` → delete the row entirely. + +A reconnecting mcplocal with the same `providerSessionId` revives every +inactive row it owns; it only orphans rows that fell past the 4-h cutoff. + +## Inference relay + +When mcpd receives `POST /api/v1/llms//infer`: + +1. Look up the row, see `kind=virtual` + `status=active`. +2. Find the open SSE session for that `providerSessionId`. Missing + session → 503. +3. Push a `{ kind: "infer", taskId, llmName, request, streaming }` + task frame onto the SSE. +4. mcplocal pulls, calls `LlmProvider.complete(...)`, and POSTs the + result back to `/api/v1/llms/_provider-task//result`: + - non-streaming: `{ status: 200, body: }` + - streaming: per-chunk `{ chunk: { data, done? } }` + - failure: `{ error: "..." }` +5. mcpd forwards the result/chunks out to the original caller. + +**v1 caveat — streaming granularity**: `LlmProvider.complete()` returns +a finalized `CompletionResult`, not a token stream. Streaming requests +therefore arrive at the caller as a single delta + `[DONE]`. Real +per-token streaming is a v2 concern. + +## Roadmap (later stages) + +- **v2 — Wake-on-demand**: Secret-stored "wake recipe" so mcpd can ask + mcplocal to start a hibernating backend before sending inference. +- **v3 — Virtual agents**: mcplocal publishes its local agent configs + (model + system prompt + sampling defaults) into mcpd's `Agent` table. +- **v4 — LB pool by model**: agents can target a model name instead of + a specific Llm; mcpd picks the healthiest pool member per request. +- **v5 — Task queue**: persisted requests for hibernating/saturated + pools. Workers pull tasks of their model when they come online. + +## API surface (v1) + +``` +POST /api/v1/llms/_provider-register → returns { providerSessionId, llms[] } +GET /api/v1/llms/_provider-stream → SSE channel; require x-mcpctl-provider-session header +POST /api/v1/llms/_provider-heartbeat → { providerSessionId } +POST /api/v1/llms/_provider-task/:id/result + → one of: + { error: "msg" } + { chunk: { data, done? } } + { status, body } + +GET /api/v1/llms → list (now includes kind, status, lastHeartbeatAt, inactiveSince) +POST /api/v1/llms//infer → routes through the SSE relay +DELETE /api/v1/llms/ → delete unconditionally (also runs GC's job) +``` + +RBAC piggybacks on `view/edit/create:llms` — no new resource. Publishing +a virtual LLM is morally a `create:llms` operation. + +## See also + +- [agents.md](./agents.md) — what an Agent is and how it pins to an LLM. +- [chat.md](./chat.md) — `mcpctl chat ` (full agent flow). +- The CLI: `mcpctl chat-llm ` (this doc) is the stateless + counterpart for raw LLM chat. diff --git a/src/mcplocal/tests/smoke/virtual-llm.smoke.test.ts b/src/mcplocal/tests/smoke/virtual-llm.smoke.test.ts new file mode 100644 index 0000000..1586a1a --- /dev/null +++ b/src/mcplocal/tests/smoke/virtual-llm.smoke.test.ts @@ -0,0 +1,209 @@ +/** + * Smoke tests: virtual-LLM register → infer relay → cleanup against a live + * mcpd. Uses an in-process LlmProvider (returns canned content) so we + * exercise the SSE control plane and the kind=virtual infer branch + * without depending on a real upstream model. + * + * The 90-s heartbeat-stale flip and 4-h auto-deletion are covered by unit + * tests (mcpd virtual-llm-service.test.ts); waiting > 90 s in smoke would + * triple the suite duration. + */ +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import http from 'node:http'; +import https from 'node:https'; +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { + VirtualLlmRegistrar, + type RegistrarPublishedProvider, +} from '../../src/providers/registrar.js'; +import type { LlmProvider, CompletionResult } from '../../src/providers/types.js'; + +const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu'; +const SUFFIX = Date.now().toString(36); +const PROVIDER_NAME = `smoke-virtual-${SUFFIX}`; + +function makeFakeProvider(name: string, content: string): LlmProvider { + return { + name, + async complete(): Promise { + return { + content, + toolCalls: [], + usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 }, + finishReason: 'stop', + }; + }, + async listModels() { return []; }, + async isAvailable() { return true; }, + }; +} + +function healthz(url: string, timeoutMs = 5000): Promise { + return new Promise((resolve) => { + const parsed = new URL(`${url.replace(/\/$/, '')}/healthz`); + const driver = parsed.protocol === 'https:' ? https : http; + const req = driver.get( + { + hostname: parsed.hostname, + port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80), + path: parsed.pathname, + timeout: timeoutMs, + }, + (res) => { resolve((res.statusCode ?? 500) < 500); res.resume(); }, + ); + req.on('error', () => resolve(false)); + req.on('timeout', () => { req.destroy(); resolve(false); }); + }); +} + +function readToken(): string | null { + try { + const home = process.env.HOME ?? ''; + const path = `${home}/.mcpctl/credentials`; + // eslint-disable-next-line @typescript-eslint/no-require-imports + const fs = require('node:fs') as typeof import('node:fs'); + if (!fs.existsSync(path)) return null; + const raw = fs.readFileSync(path, 'utf-8'); + const parsed = JSON.parse(raw) as { token?: string }; + return parsed.token ?? null; + } catch { + return null; + } +} + +interface HttpResponse { status: number; body: string } + +function httpRequest(method: string, urlStr: string, body: unknown): Promise { + return new Promise((resolve, reject) => { + const tokenRaw = readToken(); + const parsed = new URL(urlStr); + const driver = parsed.protocol === 'https:' ? https : http; + const headers: Record = { + Accept: 'application/json', + ...(body !== undefined ? { 'Content-Type': 'application/json' } : {}), + ...(tokenRaw !== null ? { Authorization: `Bearer ${tokenRaw}` } : {}), + }; + const req = driver.request({ + hostname: parsed.hostname, + port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80), + path: parsed.pathname + parsed.search, + method, + headers, + timeout: 30_000, + }, (res) => { + const chunks: Buffer[] = []; + res.on('data', (c: Buffer) => chunks.push(c)); + res.on('end', () => { + resolve({ status: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') }); + }); + }); + req.on('error', reject); + req.on('timeout', () => { req.destroy(); reject(new Error(`httpRequest timeout: ${method} ${urlStr}`)); }); + if (body !== undefined) req.write(JSON.stringify(body)); + req.end(); + }); +} + +let mcpdUp = false; +let registrar: VirtualLlmRegistrar | null = null; +let tempDir: string; + +describe('virtual-LLM smoke', () => { + beforeAll(async () => { + mcpdUp = await healthz(MCPD_URL); + if (!mcpdUp) { + // eslint-disable-next-line no-console + console.warn(`\n ○ virtual-llm smoke: skipped — ${MCPD_URL}/healthz unreachable.\n`); + return; + } + if (readToken() === null) { + mcpdUp = false; + // eslint-disable-next-line no-console + console.warn('\n ○ virtual-llm smoke: skipped — no ~/.mcpctl/credentials.\n'); + return; + } + tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-virtual-llm-smoke-')); + }, 20_000); + + afterAll(async () => { + if (registrar !== null) registrar.stop(); + if (tempDir !== undefined) rmSync(tempDir, { recursive: true, force: true }); + // Best-effort cleanup of the row in case the disconnect didn't finish + // before mcpd's heartbeat watchdog ticks. Idempotent. + if (mcpdUp) { + const list = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined); + if (list.status === 200) { + const rows = JSON.parse(list.body) as Array<{ id: string; name: string }>; + const row = rows.find((r) => r.name === PROVIDER_NAME); + if (row !== undefined) { + await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${row.id}`, undefined); + } + } + } + }); + + it('registrar publishes the provider and mcpd lists it as kind=virtual / status=active', async () => { + if (!mcpdUp) return; + const token = readToken(); + if (token === null) return; + const published: RegistrarPublishedProvider[] = [ + { provider: makeFakeProvider(PROVIDER_NAME, 'hi from smoke'), type: 'openai', model: 'fake-smoke', tier: 'fast' }, + ]; + registrar = new VirtualLlmRegistrar({ + mcpdUrl: MCPD_URL, + token, + publishedProviders: published, + sessionFilePath: join(tempDir, 'session'), + log: { info: () => {}, warn: () => {}, error: () => {} }, + heartbeatIntervalMs: 60_000, + }); + await registrar.start(); + expect(registrar.getSessionId()).not.toBeNull(); + // Give the SSE handshake + register a moment to settle on mcpd's side. + await new Promise((r) => setTimeout(r, 400)); + + const res = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined); + expect(res.status).toBe(200); + const rows = JSON.parse(res.body) as Array<{ name: string; kind: string; status: string; type: string; model: string }>; + const row = rows.find((r) => r.name === PROVIDER_NAME); + expect(row, `${PROVIDER_NAME} must be present`).toBeDefined(); + expect(row!.kind).toBe('virtual'); + expect(row!.status).toBe('active'); + expect(row!.type).toBe('openai'); + expect(row!.model).toBe('fake-smoke'); + }, 30_000); + + it('mcpd routes /api/v1/llms//infer back through the SSE relay to the fake provider', async () => { + if (!mcpdUp) return; + const res = await httpRequest('POST', `${MCPD_URL}/api/v1/llms/${PROVIDER_NAME}/infer`, { + messages: [{ role: 'user', content: 'say something' }], + }); + expect(res.status).toBe(200); + const body = JSON.parse(res.body) as { + choices?: Array<{ message?: { content?: string }; finish_reason?: string }>; + usage?: { total_tokens?: number }; + }; + expect(body.choices?.[0]?.message?.content).toBe('hi from smoke'); + expect(body.choices?.[0]?.finish_reason).toBe('stop'); + expect(body.usage?.total_tokens).toBe(5); + }, 30_000); + + it('returns 503 with a clear error when the publisher disconnects mid-session', async () => { + if (!mcpdUp) return; + if (registrar !== null) { + registrar.stop(); + registrar = null; + } + // Immediately after stop(), the SSE socket closes and mcpd's + // unbindSession flips the row to inactive. Inference should 503. + await new Promise((r) => setTimeout(r, 300)); + + const res = await httpRequest('POST', `${MCPD_URL}/api/v1/llms/${PROVIDER_NAME}/infer`, { + messages: [{ role: 'user', content: 'still there?' }], + }); + expect(res.status).toBe(503); + expect(res.body).toMatch(/publisher offline|inactive/); + }, 30_000); +}); -- 2.49.1