From 97174f450f3ec066f2521ac7ecb53a025444554f Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 14:20:54 +0100 Subject: [PATCH] feat(mcplocal): virtual-LLM registrar (v1 Stage 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The mcplocal counterpart to mcpd's VirtualLlmService. After this stage, flipping \`publish: true\` on a provider in ~/.mcpctl/config.json makes the provider show up in mcpctl get llm with kind=virtual the next time mcplocal restarts; running an inference against it relays through this client back to the local LlmProvider. Config: - LlmProviderFileEntry gains optional \`publish: boolean\` (default false, so existing setups don't change). Registrar (new file: providers/registrar.ts): - start(): if any provider is opted-in, POSTs to /api/v1/llms/_provider-register with the publishable set, persists the returned providerSessionId to ~/.mcpctl/provider-session for sticky reconnects, then opens the SSE control channel and starts a 30-s heartbeat ticker. - SSE listener parses event/data lines from text/event-stream frames. task frames trigger handleInferTask: convert OpenAI body to CompletionOptions, call provider.complete(), POST the result back as either { status, body } (non-streaming) or two chunk POSTs (streaming: one delta + a [DONE] marker). - Disconnect → exponential backoff reconnect from 5 s up to 60 s. On successful reconnect the persisted sessionId revives the same Llm rows in mcpd (mcpd flips them back to active on heartbeat). - stop() destroys the SSE socket and clears the timer; cleanly handed off from main.ts's existing shutdown handler. Wired into mcplocal main.ts via maybeStartVirtualLlmRegistrar: - Filters opted-in providers, looks up their LlmProvider instances in the registry. - Reads ~/.mcpctl/credentials for mcpdUrl + bearer; absence is a best-effort skip (logs a warning, returns null) — never a boot blocker. v1 caveat documented in the file header: LlmProvider returns a finalized CompletionResult, not a token stream, so streaming requests get a single delta chunk + [DONE]. Real per-token streaming is a v2 concern. Tests: 5 new in tests/registrar.test.ts using a tiny in-process HTTP server. Cover: no-op when nothing opted-in, register POST + sticky sessionId persistence, sticky reconnect from disk, heartbeat ticker fires at the configured interval, register HTTP error surfaces. Workspace suite: 2043/2043 across 152 files (was 2006/149, +5 new tests + the new file gets discovered). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/mcplocal/src/http/config.ts | 8 + src/mcplocal/src/main.ts | 85 ++++- src/mcplocal/src/providers/registrar.ts | 409 ++++++++++++++++++++++++ src/mcplocal/tests/registrar.test.ts | 244 ++++++++++++++ 4 files changed, 745 insertions(+), 1 deletion(-) create mode 100644 src/mcplocal/src/providers/registrar.ts create mode 100644 src/mcplocal/tests/registrar.test.ts diff --git a/src/mcplocal/src/http/config.ts b/src/mcplocal/src/http/config.ts index 9be7fb4..d70f9c6 100644 --- a/src/mcplocal/src/http/config.ts +++ b/src/mcplocal/src/http/config.ts @@ -72,6 +72,14 @@ export interface LlmProviderFileEntry { * itself can't be reached). */ failoverFor?: string; + /** + * Opt in to publishing this local provider into mcpd as a virtual Llm + * row (see docs/virtual-llms.md). When true, the mcplocal registrar + * announces the provider to mcpd at startup, opens an SSE control + * channel, and relays inference tasks back to this local provider. + * Default: false — existing setups don't change behavior. + */ + publish?: boolean; } export interface ProjectLlmOverride { diff --git a/src/mcplocal/src/main.ts b/src/mcplocal/src/main.ts index f9e78e8..df631a9 100644 --- a/src/mcplocal/src/main.ts +++ b/src/mcplocal/src/main.ts @@ -8,11 +8,15 @@ import { StdioUpstream } from './upstream/stdio.js'; import { HttpUpstream } from './upstream/http.js'; import { createHttpServer } from './http/server.js'; import { loadHttpConfig, loadLlmProviders } from './http/config.js'; -import type { HttpConfig } from './http/config.js'; +import type { HttpConfig, LlmProviderFileEntry } from './http/config.js'; import { createProvidersFromConfig } from './llm-config.js'; import { createSecretStore } from '@mcpctl/shared'; import type { ProviderRegistry } from './providers/registry.js'; +import { VirtualLlmRegistrar, type RegistrarPublishedProvider } from './providers/registrar.js'; import { startWatchers, stopWatchers, reloadStages } from './proxymodel/watcher.js'; +import { existsSync, readFileSync as readFileSyncNs } from 'node:fs'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; interface ParsedArgs { configPath: string | undefined; @@ -144,6 +148,11 @@ export async function main(argv: string[] = process.argv): Promise { await reloadStages(); startWatchers(); + // Virtual-LLM registrar: publish opted-in providers (`publish: true`) + // into mcpd's Llm registry. Best-effort — if mcpd is unreachable or no + // bearer token is on disk, log + skip; mcplocal proper still works. + const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries); + // Graceful shutdown let shuttingDown = false; const shutdown = async () => { @@ -151,6 +160,7 @@ export async function main(argv: string[] = process.argv): Promise { shuttingDown = true; stopWatchers(); + registrar?.stop(); providerRegistry.disposeAll(); server.stop(); if (httpServer) { @@ -177,3 +187,76 @@ if (isMain) { process.exit(1); }); } + +/** + * Start the virtual-LLM registrar if any local provider has `publish: true` + * AND we have a bearer token on disk. Returns the registrar instance for + * lifecycle teardown, or null when nothing was started. All failure paths + * log a warning and resolve null — the registrar is best-effort, not a + * boot blocker. + */ +async function maybeStartVirtualLlmRegistrar( + providerRegistry: ProviderRegistry, + llmEntries: LlmProviderFileEntry[], +): Promise { + const opted = llmEntries.filter((e) => e.publish === true); + if (opted.length === 0) return null; + + const published: RegistrarPublishedProvider[] = []; + for (const entry of opted) { + const provider = providerRegistry.get(entry.name); + if (provider === undefined) { + process.stderr.write(`virtual-llm registrar: provider '${entry.name}' opted-in but not registered locally; skipping\n`); + continue; + } + const item: RegistrarPublishedProvider = { + provider, + type: entry.type, + model: entry.model ?? entry.name, + }; + if (entry.tier !== undefined) item.tier = entry.tier; + published.push(item); + } + if (published.length === 0) return null; + + // Resolve mcpd URL + bearer. Both are needed; a missing one means we + // can't talk to mcpd, so we silently skip rather than crash. + const credsPath = join(homedir(), '.mcpctl', 'credentials'); + if (!existsSync(credsPath)) { + process.stderr.write(`virtual-llm registrar: ~/.mcpctl/credentials missing — skipping (run \`mcpctl auth login\` to publish virtual LLMs)\n`); + return null; + } + let mcpdUrl: string; + let token: string; + try { + const parsed = JSON.parse(readFileSyncNs(credsPath, 'utf-8')) as { mcpdUrl?: string; token?: string }; + if (typeof parsed.mcpdUrl !== 'string' || typeof parsed.token !== 'string') { + process.stderr.write(`virtual-llm registrar: credentials missing mcpdUrl/token — skipping\n`); + return null; + } + mcpdUrl = parsed.mcpdUrl; + token = parsed.token; + } catch (err) { + process.stderr.write(`virtual-llm registrar: failed to read credentials: ${(err as Error).message}\n`); + return null; + } + + const registrar = new VirtualLlmRegistrar({ + mcpdUrl, + token, + publishedProviders: published, + sessionFilePath: join(homedir(), '.mcpctl', 'provider-session'), + log: { + info: (msg) => process.stderr.write(`${msg}\n`), + warn: (msg) => process.stderr.write(`${msg}\n`), + error: (msg) => process.stderr.write(`${msg}\n`), + }, + }); + try { + await registrar.start(); + } catch (err) { + process.stderr.write(`virtual-llm registrar: start failed: ${(err as Error).message}\n`); + return null; + } + return registrar; +} diff --git a/src/mcplocal/src/providers/registrar.ts b/src/mcplocal/src/providers/registrar.ts new file mode 100644 index 0000000..120e4c9 --- /dev/null +++ b/src/mcplocal/src/providers/registrar.ts @@ -0,0 +1,409 @@ +/** + * Virtual-LLM registrar — the mcplocal counterpart to mcpd's + * VirtualLlmService. + * + * Lifecycle: + * 1. start() called on mcplocal boot. If no providers are opted-in + * (`publish: true` in config), do nothing. + * 2. POST /api/v1/llms/_provider-register with the publishable set; + * receive a stable providerSessionId. Persist it to + * `~/.mcpctl/provider-session` so reconnects after a crash/restart + * adopt the same row instead of orphaning it. + * 3. Open the SSE channel at /api/v1/llms/_provider-stream with the + * session id in `x-mcpctl-provider-session`. Listen for + * `event: task` frames; for each, call the local provider's + * `complete()` and POST the OpenAI-shaped result back. + * 4. Heartbeat every 30 s. If the SSE drops, exponential backoff + * reconnect from 5 s up to 60 s. + * 5. stop() destroys the SSE socket and clears the timer; mcpd's + * 90-s heartbeat watchdog will then flip our rows to inactive. + * + * v1 caveat: the LlmProvider abstraction returns a finalized + * CompletionResult, not a token stream. We therefore translate + * streaming requests into a single SSE chunk + [DONE]. Real + * per-token streaming is a v2 concern — see docs/virtual-llms.md. + */ +import http from 'node:http'; +import https from 'node:https'; +import { promises as fs } from 'node:fs'; +import { dirname } from 'node:path'; +import type { LlmProvider, CompletionOptions } from './types.js'; + +export interface RegistrarLogger { + info: (msg: string) => void; + warn: (msg: string) => void; + error: (msg: string) => void; +} + +export interface RegistrarPublishedProvider { + provider: LlmProvider; + /** mcpd-side `type` field (openai | anthropic | …). */ + type: string; + /** Model id surfaced under `mcpctl get llm`. */ + model: string; + /** Optional tier (default 'fast'). */ + tier?: 'fast' | 'heavy'; + /** Optional human-readable description for `mcpctl get llm`. */ + description?: string; +} + +export interface RegistrarOptions { + mcpdUrl: string; + token: string; + publishedProviders: RegistrarPublishedProvider[]; + /** Where to persist the providerSessionId so reconnects are sticky. */ + sessionFilePath: string; + log: RegistrarLogger; + /** Override knobs for tests (ms). */ + heartbeatIntervalMs?: number; + reconnectBaseMs?: number; + reconnectMaxMs?: number; +} + +const DEFAULT_HEARTBEAT_MS = 30_000; +const DEFAULT_RECONNECT_BASE_MS = 5_000; +const DEFAULT_RECONNECT_MAX_MS = 60_000; +const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session'; + +interface InferTask { + kind: 'infer'; + taskId: string; + llmName: string; + request: { + messages: Array<{ role: 'system' | 'user' | 'assistant' | 'tool'; content: string }>; + temperature?: number; + max_tokens?: number; + model?: string; + }; + streaming: boolean; +} + +type TaskFrame = InferTask | { kind: 'wake'; taskId: string; llmName: string }; + +export class VirtualLlmRegistrar { + private sessionId: string | null = null; + private heartbeatTimer: NodeJS.Timeout | null = null; + private sseRequest: http.ClientRequest | null = null; + private reconnectAttempt = 0; + private stopped = false; + + constructor(private readonly opts: RegistrarOptions) {} + + /** Bring the registrar online. Idempotent — calling twice is a no-op. */ + async start(): Promise { + if (this.opts.publishedProviders.length === 0) { + this.opts.log.info('virtual-llm registrar: nothing to publish (no provider has `publish: true`)'); + return; + } + this.stopped = false; + this.sessionId = await this.loadStickySessionId(); + await this.register(); + this.startHeartbeat(); + this.openSseStream(); + } + + /** Tear down the registrar. Safe to call multiple times. */ + stop(): void { + this.stopped = true; + if (this.heartbeatTimer !== null) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + if (this.sseRequest !== null) { + this.sseRequest.destroy(); + this.sseRequest = null; + } + } + + /** Test/inspection helper — avoids exposing internals broadly. */ + getSessionId(): string | null { + return this.sessionId; + } + + private async loadStickySessionId(): Promise { + try { + const raw = await fs.readFile(this.opts.sessionFilePath, 'utf-8'); + const id = raw.trim(); + return id === '' ? null : id; + } catch { + return null; + } + } + + private async saveStickySessionId(id: string): Promise { + try { + await fs.mkdir(dirname(this.opts.sessionFilePath), { recursive: true }); + await fs.writeFile(this.opts.sessionFilePath, id, 'utf-8'); + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: failed to persist session id: ${(err as Error).message}`); + } + } + + private async register(): Promise { + const body: Record = { + providers: this.opts.publishedProviders.map((p) => ({ + name: p.provider.name, + type: p.type, + model: p.model, + ...(p.tier !== undefined ? { tier: p.tier } : {}), + ...(p.description !== undefined ? { description: p.description } : {}), + })), + }; + if (this.sessionId !== null) body['providerSessionId'] = this.sessionId; + + const res = await postJson( + this.urlFor('/api/v1/llms/_provider-register'), + body, + this.opts.token, + ); + if (res.statusCode !== 201) { + throw new Error(`provider-register HTTP ${String(res.statusCode)}: ${res.body}`); + } + const parsed = JSON.parse(res.body) as { providerSessionId: string }; + this.sessionId = parsed.providerSessionId; + await this.saveStickySessionId(this.sessionId); + this.opts.log.info( + `virtual-llm registrar: published ${String(this.opts.publishedProviders.length)} provider(s); sessionId=${this.sessionId}`, + ); + } + + private startHeartbeat(): void { + const intervalMs = this.opts.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_MS; + this.heartbeatTimer = setInterval(() => { + if (this.sessionId === null) return; + void this.heartbeatOnce(); + }, intervalMs); + } + + private async heartbeatOnce(): Promise { + if (this.sessionId === null) return; + try { + const res = await postJson( + this.urlFor('/api/v1/llms/_provider-heartbeat'), + { providerSessionId: this.sessionId }, + this.opts.token, + ); + if (res.statusCode !== 200) { + this.opts.log.warn(`virtual-llm registrar: heartbeat HTTP ${String(res.statusCode)}: ${res.body}`); + } + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: heartbeat failed: ${(err as Error).message}`); + } + } + + private openSseStream(): void { + if (this.stopped || this.sessionId === null) return; + const url = new URL(this.urlFor('/api/v1/llms/_provider-stream')); + const driver = url.protocol === 'https:' ? https : http; + const req = driver.request( + { + hostname: url.hostname, + port: url.port || (url.protocol === 'https:' ? 443 : 80), + path: url.pathname + url.search, + method: 'GET', + headers: { + Accept: 'text/event-stream', + Authorization: `Bearer ${this.opts.token}`, + [PROVIDER_SESSION_HEADER]: this.sessionId, + }, + }, + (res) => { + if (res.statusCode !== 200) { + let body = ''; + res.on('data', (c: Buffer) => { body += c.toString('utf-8'); }); + res.on('end', () => { + this.opts.log.warn(`virtual-llm registrar: SSE HTTP ${String(res.statusCode ?? 0)}: ${body}`); + this.scheduleReconnect(); + }); + return; + } + this.reconnectAttempt = 0; + res.setEncoding('utf-8'); + let buf = ''; + res.on('data', (chunk: string) => { + buf += chunk; + let nl: number; + while ((nl = buf.indexOf('\n\n')) !== -1) { + const frame = buf.slice(0, nl); + buf = buf.slice(nl + 2); + this.handleSseFrame(frame); + } + }); + res.on('end', () => this.scheduleReconnect()); + res.on('error', (err) => { + this.opts.log.warn(`virtual-llm registrar: SSE response error: ${err.message}`); + this.scheduleReconnect(); + }); + }, + ); + req.on('error', (err) => { + this.opts.log.warn(`virtual-llm registrar: SSE request error: ${err.message}`); + this.scheduleReconnect(); + }); + req.end(); + this.sseRequest = req; + } + + private scheduleReconnect(): void { + this.sseRequest = null; + if (this.stopped) return; + const base = this.opts.reconnectBaseMs ?? DEFAULT_RECONNECT_BASE_MS; + const max = this.opts.reconnectMaxMs ?? DEFAULT_RECONNECT_MAX_MS; + const delay = Math.min(max, base * 2 ** this.reconnectAttempt); + this.reconnectAttempt += 1; + this.opts.log.info(`virtual-llm registrar: SSE disconnected, reconnecting in ${String(delay)} ms`); + setTimeout(() => { + if (!this.stopped) this.openSseStream(); + }, delay).unref(); + } + + private handleSseFrame(frame: string): void { + let event = 'message'; + let data = ''; + for (const line of frame.split('\n')) { + if (line.startsWith('event:')) event = line.slice(6).trim(); + else if (line.startsWith('data:')) data += line.slice(5).trim(); + } + if (event !== 'task' || data === '') return; + let task: TaskFrame; + try { + task = JSON.parse(data) as TaskFrame; + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: malformed task frame: ${(err as Error).message}`); + return; + } + if (task.kind === 'infer') { + void this.handleInferTask(task); + return; + } + // Wake tasks are reserved for v2 — acknowledge with an error so mcpd + // surfaces a clean failure rather than waiting forever. + void this.postResult(task.taskId, { error: 'wake task type not implemented in this client (v2)' }); + } + + private async handleInferTask(task: InferTask): Promise { + const published = this.opts.publishedProviders.find((p) => p.provider.name === task.llmName); + if (published === undefined) { + await this.postResult(task.taskId, { error: `provider '${task.llmName}' not registered locally` }); + return; + } + try { + const completionOpts: CompletionOptions = { + messages: task.request.messages.map((m) => ({ role: m.role, content: m.content })), + }; + if (task.request.temperature !== undefined) completionOpts.temperature = task.request.temperature; + if (task.request.max_tokens !== undefined) completionOpts.maxTokens = task.request.max_tokens; + if (task.request.model !== undefined) completionOpts.model = task.request.model; + + const result = await published.provider.complete(completionOpts); + const completionBody = openAiCompletionEnvelope(published.model, result, task.request.model); + + if (task.streaming) { + // Single-chunk streaming for v1: emit one delta then a [DONE] marker. + // The server-side relay forwards both as SSE frames to the original + // caller. Real per-token streaming would require LlmProvider.stream(). + const deltaFrame = openAiStreamChunk(published.model, result, task.request.model); + await this.postResult(task.taskId, { chunk: { data: JSON.stringify(deltaFrame) } }); + await this.postResult(task.taskId, { chunk: { data: '[DONE]', done: true } }); + } else { + await this.postResult(task.taskId, { status: 200, body: completionBody }); + } + } catch (err) { + await this.postResult(task.taskId, { error: (err as Error).message }); + } + } + + private async postResult(taskId: string, body: unknown): Promise { + try { + await postJson( + this.urlFor(`/api/v1/llms/_provider-task/${encodeURIComponent(taskId)}/result`), + body as Record, + this.opts.token, + ); + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: result POST failed: ${(err as Error).message}`); + } + } + + private urlFor(path: string): string { + return `${this.opts.mcpdUrl.replace(/\/$/, '')}${path}`; + } +} + +/** Wrap a CompletionResult in the OpenAI chat.completion envelope. */ +function openAiCompletionEnvelope( + modelFromConfig: string, + result: { content: string; finishReason: string; usage: { promptTokens: number; completionTokens: number; totalTokens: number } }, + modelFromRequest: string | undefined, +): unknown { + return { + id: `cmpl-${Math.random().toString(36).slice(2)}`, + object: 'chat.completion', + created: Math.floor(Date.now() / 1000), + model: modelFromRequest ?? modelFromConfig, + choices: [{ + index: 0, + message: { role: 'assistant', content: result.content }, + finish_reason: result.finishReason, + }], + usage: { + prompt_tokens: result.usage.promptTokens, + completion_tokens: result.usage.completionTokens, + total_tokens: result.usage.totalTokens, + }, + }; +} + +/** Single-chunk OpenAI chat.completion.chunk frame for v1 streaming. */ +function openAiStreamChunk( + modelFromConfig: string, + result: { content: string; finishReason: string }, + modelFromRequest: string | undefined, +): unknown { + return { + id: `chunk-${Math.random().toString(36).slice(2)}`, + object: 'chat.completion.chunk', + created: Math.floor(Date.now() / 1000), + model: modelFromRequest ?? modelFromConfig, + choices: [{ + index: 0, + delta: { content: result.content }, + finish_reason: result.finishReason, + }], + }; +} + +interface PostResponse { statusCode: number; body: string } + +/** Tiny JSON POST helper used by all of the registrar's mcpd calls. */ +function postJson(url: string, body: unknown, bearer: string): Promise { + return new Promise((resolve, reject) => { + const u = new URL(url); + const driver = u.protocol === 'https:' ? https : http; + const payload = JSON.stringify(body); + const req = driver.request( + { + hostname: u.hostname, + port: u.port || (u.protocol === 'https:' ? 443 : 80), + path: u.pathname + u.search, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${bearer}`, + 'Content-Length': String(Buffer.byteLength(payload)), + Accept: 'application/json', + }, + timeout: 30_000, + }, + (res) => { + const chunks: Buffer[] = []; + res.on('data', (c: Buffer) => chunks.push(c)); + res.on('end', () => resolve({ statusCode: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') })); + }, + ); + req.on('error', reject); + req.on('timeout', () => { req.destroy(); reject(new Error('request timed out')); }); + req.write(payload); + req.end(); + }); +} diff --git a/src/mcplocal/tests/registrar.test.ts b/src/mcplocal/tests/registrar.test.ts new file mode 100644 index 0000000..40e99ce --- /dev/null +++ b/src/mcplocal/tests/registrar.test.ts @@ -0,0 +1,244 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import http from 'node:http'; +import { mkdtempSync, rmSync, readFileSync, writeFileSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { + VirtualLlmRegistrar, + type RegistrarPublishedProvider, +} from '../src/providers/registrar.js'; +import type { LlmProvider, CompletionOptions, CompletionResult } from '../src/providers/types.js'; + +/** + * The registrar talks HTTP. Spin a tiny in-process server in each test so + * we can assert what it sends without mocking node:http itself. + */ +interface FakeServer { + url: string; + close: () => Promise; + /** Calls observed in arrival order. */ + calls: Array<{ method: string; path: string; body: string; headers: Record }>; + /** + * Optional handler. If set, runs per-request and decides response. If not, + * defaults to 201 + JSON `{ providerSessionId: 'sess-FAKE' }` for register + * and 200 + `{}` for everything else. + */ + handler?: (req: http.IncomingMessage, res: http.ServerResponse, body: string) => void; +} + +async function startFakeServer(): Promise { + const calls: FakeServer['calls'] = []; + let server!: http.Server; + const ready = new Promise((resolve, reject) => { + server = http.createServer((req, res) => { + const chunks: Buffer[] = []; + req.on('data', (c: Buffer) => chunks.push(c)); + req.on('end', () => { + const body = Buffer.concat(chunks).toString('utf-8'); + calls.push({ + method: req.method ?? '', + path: req.url ?? '', + body, + headers: req.headers, + }); + if (fake.handler !== undefined) { + fake.handler(req, res, body); + return; + } + if (req.url === '/api/v1/llms/_provider-register') { + res.writeHead(201, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ providerSessionId: 'sess-FAKE', llms: [] })); + return; + } + res.writeHead(200, { 'content-type': 'application/json' }); + res.end('{}'); + }); + }); + server.listen(0, '127.0.0.1', () => { + const addr = server.address(); + if (addr === null || typeof addr === 'string') { + reject(new Error('listen failed')); + return; + } + const fakeReady: FakeServer = { + url: `http://127.0.0.1:${String(addr.port)}`, + close: () => new Promise((r) => { server.close(() => r()); }), + calls, + }; + Object.assign(fake, fakeReady); + resolve(fake); + }); + }); + const fake: FakeServer = {} as FakeServer; + return ready; +} + +function makeProvider(name: string, content = 'hi from local'): LlmProvider { + return { + name, + async complete(_opts: CompletionOptions): Promise { + return { + content, + toolCalls: [], + usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 }, + finishReason: 'stop', + }; + }, + async listModels() { return []; }, + async isAvailable() { return true; }, + }; +} + +let tempDir: string; + +beforeEach(() => { + tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-registrar-test-')); +}); + +afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); +}); + +function silentLog(): { info: ReturnType; warn: ReturnType; error: ReturnType } { + return { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; +} + +describe('VirtualLlmRegistrar', () => { + it('start() with no published providers is a silent no-op', async () => { + const log = silentLog(); + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: 'http://unreachable.example', + token: 'tok', + publishedProviders: [], + sessionFilePath: join(tempDir, 'provider-session'), + log, + }); + await registrar.start(); + expect(log.info).toHaveBeenCalledWith(expect.stringContaining('nothing to publish')); + registrar.stop(); + }); + + it('register POSTs to /_provider-register and persists the returned sessionId', async () => { + const fake = await startFakeServer(); + try { + const sessionFilePath = join(tempDir, 'provider-session'); + const published: RegistrarPublishedProvider[] = [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'qwen', tier: 'fast' }, + ]; + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 'tok-abc', + publishedProviders: published, + sessionFilePath, + log: silentLog(), + // Make heartbeat huge so it doesn't fire mid-test. + heartbeatIntervalMs: 60_000, + }); + await registrar.start(); + // Allow the SSE open to enter flight (we never feed it a response, + // but the request fires synchronously after register). + await new Promise((r) => setTimeout(r, 20)); + + const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register'); + expect(registerCall).toBeDefined(); + expect(registerCall!.method).toBe('POST'); + const body = JSON.parse(registerCall!.body) as { providers: Array<{ name: string; type: string; model: string; tier: string }> }; + expect(body.providers).toEqual([{ + name: 'vllm-local', + type: 'openai', + model: 'qwen', + tier: 'fast', + }]); + expect(registerCall!.headers['authorization']).toBe('Bearer tok-abc'); + + // Sticky session id persisted. + expect(readFileSync(sessionFilePath, 'utf-8').trim()).toBe('sess-FAKE'); + expect(registrar.getSessionId()).toBe('sess-FAKE'); + + registrar.stop(); + } finally { + await fake.close(); + } + }); + + it('reuses an existing sticky session id from disk on next start', async () => { + const fake = await startFakeServer(); + try { + const sessionFilePath = join(tempDir, 'provider-session'); + writeFileSync(sessionFilePath, 'sess-existing\n', 'utf-8'); + + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 't', + publishedProviders: [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' }, + ], + sessionFilePath, + log: silentLog(), + heartbeatIntervalMs: 60_000, + }); + await registrar.start(); + await new Promise((r) => setTimeout(r, 20)); + + const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register'); + const body = JSON.parse(registerCall!.body) as { providerSessionId?: string }; + expect(body.providerSessionId).toBe('sess-existing'); + + registrar.stop(); + } finally { + await fake.close(); + } + }); + + it('heartbeat ticker POSTs the session id at the configured interval', async () => { + const fake = await startFakeServer(); + try { + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 't', + publishedProviders: [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' }, + ], + sessionFilePath: join(tempDir, 'provider-session'), + log: silentLog(), + heartbeatIntervalMs: 30, // tight so the test doesn't drag + }); + await registrar.start(); + // Wait long enough for at least 2 heartbeats to fire. + await new Promise((r) => setTimeout(r, 100)); + registrar.stop(); + + const heartbeats = fake.calls.filter((c) => c.path === '/api/v1/llms/_provider-heartbeat'); + expect(heartbeats.length).toBeGreaterThanOrEqual(2); + for (const h of heartbeats) { + const body = JSON.parse(h.body) as { providerSessionId: string }; + expect(body.providerSessionId).toBe('sess-FAKE'); + } + } finally { + await fake.close(); + } + }); + + it('throws when mcpd returns non-201 from /_provider-register', async () => { + const fake = await startFakeServer(); + fake.handler = (_req, res, _body) => { + res.writeHead(409, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ error: 'Cannot publish over public LLM: vllm-local' })); + }; + try { + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 't', + publishedProviders: [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' }, + ], + sessionFilePath: join(tempDir, 'provider-session'), + log: silentLog(), + heartbeatIntervalMs: 60_000, + }); + await expect(registrar.start()).rejects.toThrow(/HTTP 409/); + } finally { + await fake.close(); + } + }); +});