feat(mcplocal): virtual-LLM registrar (v1 Stage 4)

The mcplocal counterpart to mcpd's VirtualLlmService. After this stage,
flipping `publish: true` on a provider in ~/.mcpctl/config.json makes
the provider show up in `mcpctl get llm` with kind=virtual the next time
mcplocal restarts; running an inference against it relays through this
client back to the local LlmProvider.

Config:
- LlmProviderFileEntry gains optional `publish: boolean` (default false,
  so existing setups don't change).
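
  For orientation, the config-side change looks roughly like the sketch
  below; `publish` is the field this commit adds, the other fields are
  illustrative stand-ins for whatever LlmProviderFileEntry already carries.

    // Sketch only: `publish` is new; the remaining fields are assumptions.
    interface LlmProviderFileEntry {
      name: string;
      type: string;          // e.g. 'openai'
      model?: string;
      tier?: string;
      /** Opt in to publishing this provider to mcpd as a virtual LLM. Defaults to false. */
      publish?: boolean;
    }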

Registrar (new file: providers/registrar.ts):
- start(): if any provider is opted in, POSTs to
  /api/v1/llms/_provider-register with the publishable set, persists
  the returned providerSessionId to ~/.mcpctl/provider-session for
  sticky reconnects, then opens the SSE control channel and starts a
  30-second heartbeat ticker.
- SSE listener parses event/data lines from text/event-stream frames
  (see the sketch after this list). `task` frames trigger handleInferTask:
  convert the OpenAI body to CompletionOptions, call provider.complete(),
  and POST the result back as either { status, body } (non-streaming) or
  two chunk POSTs (streaming: one delta + a [DONE] marker).
- Disconnect → exponential backoff reconnect from 5 s up to 60 s. On
  successful reconnect the persisted sessionId revives the same Llm
  rows in mcpd (mcpd flips them back to active on heartbeat).
- stop() destroys the SSE socket and clears the heartbeat timer; it is
  invoked from main.ts's existing shutdown handler so teardown stays clean.
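
A minimal sketch of the SSE frame handling, assuming a buffer-based parser;
handleInferTask is the real handler named above, every other name here is
illustrative rather than the actual code in registrar.ts.

    // text/event-stream frames are separated by a blank line; each frame
    // carries `event:` and `data:` lines. Only the `task` event matters here.
    declare function handleInferTask(task: unknown): Promise<void>; // real handler in registrar.ts

    let buffer = '';
    function onSseChunk(chunk: Buffer): void {
      buffer += chunk.toString('utf-8');
      let sep: number;
      while ((sep = buffer.indexOf('\n\n')) !== -1) {
        const frame = buffer.slice(0, sep);
        buffer = buffer.slice(sep + 2);
        let event = 'message';
        const data: string[] = [];
        for (const line of frame.split('\n')) {
          if (line.startsWith('event:')) event = line.slice(6).trim();
          else if (line.startsWith('data:')) data.push(line.slice(5).trim());
        }
        if (event === 'task' && data.length > 0) {
          void handleInferTask(JSON.parse(data.join('\n')));
        }
      }
    }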

Wired into mcplocal main.ts via maybeStartVirtualLlmRegistrar:
- Filters opted-in providers, looks up their LlmProvider instances in
  the registry.
- Reads ~/.mcpctl/credentials for mcpdUrl + bearer token; if they are
  missing it skips best-effort (logs a warning, returns null) rather than
  blocking boot.
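
In outline it behaves roughly like the sketch below; the helper names
(loadConfig, readCredentials, providerRegistry) and import paths are
assumptions standing in for mcplocal's existing plumbing, not real exports.

    import { homedir } from 'node:os';
    import { join } from 'node:path';
    import { VirtualLlmRegistrar } from './providers/registrar.js';
    import type { LlmProvider } from './providers/types.js';

    // Stand-ins for existing mcplocal plumbing (assumptions, not real exports):
    declare function loadConfig(): { llmProviders: Array<{ name: string; type: string; model?: string; tier?: string; publish?: boolean }> };
    declare function readCredentials(): { mcpdUrl: string; token: string } | null;
    declare const providerRegistry: Map<string, LlmProvider>;
    declare const log: { info: (m: string) => void; warn: (m: string) => void; error: (m: string) => void };

    async function maybeStartVirtualLlmRegistrar(): Promise<VirtualLlmRegistrar | null> {
      const optedIn = loadConfig().llmProviders.filter((p) => p.publish === true);
      if (optedIn.length === 0) return null;   // nothing to publish
      const creds = readCredentials();          // ~/.mcpctl/credentials
      if (creds === null) {
        log.warn('publish requested but no mcpd credentials found; skipping virtual-LLM registrar');
        return null;                            // best-effort skip, never a boot blocker
      }
      const registrar = new VirtualLlmRegistrar({
        mcpdUrl: creds.mcpdUrl,
        token: creds.token,
        publishedProviders: optedIn
          .map((p) => ({ provider: providerRegistry.get(p.name), type: p.type, model: p.model, tier: p.tier }))
          .filter((p): p is { provider: LlmProvider; type: string; model?: string; tier?: string } => p.provider !== undefined),
        sessionFilePath: join(homedir(), '.mcpctl', 'provider-session'),
        log,
      });
      await registrar.start();
      return registrar;
    }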

v1 caveat documented in the file header: LlmProvider returns a
finalized CompletionResult, not a token stream, so streaming requests
get a single delta chunk + [DONE]. Real per-token streaming is a v2
concern.
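
Concretely, the v1 streaming path degenerates to something like the sketch
below; the chunk payload shape and the postChunk helper are assumptions, only
the single-delta-plus-[DONE] behaviour comes from this commit.

    // v1: the provider has already finished, so the "stream" is one delta + [DONE].
    type CompletionResult = { content: string };   // narrowed for the sketch

    async function relayStreamingResult(
      postChunk: (data: string) => Promise<void>,  // POSTs one chunk back to mcpd
      result: CompletionResult,
    ): Promise<void> {
      await postChunk(JSON.stringify({
        choices: [{ delta: { role: 'assistant', content: result.content }, finish_reason: 'stop' }],
      }));
      await postChunk('[DONE]');                   // OpenAI-style stream terminator
    }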

Tests: 5 new in tests/registrar.test.ts using a tiny in-process HTTP
server. Cover: no-op when nothing opted-in, register POST + sticky
sessionId persistence, sticky reconnect from disk, heartbeat ticker
fires at the configured interval, register HTTP error surfaces.

Workspace suite: 2043/2043 across 152 files (was 2006/149; the 5 new
tests and the new test file get picked up).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Michal
2026-04-27 14:20:54 +01:00
parent 192a3831df
commit 97174f450f
4 changed files with 745 additions and 1 deletions

tests/registrar.test.ts (new file)

@@ -0,0 +1,244 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import http from 'node:http';
import { mkdtempSync, rmSync, readFileSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import {
  VirtualLlmRegistrar,
  type RegistrarPublishedProvider,
} from '../src/providers/registrar.js';
import type { LlmProvider, CompletionOptions, CompletionResult } from '../src/providers/types.js';

/**
 * The registrar talks HTTP. Spin a tiny in-process server in each test so
 * we can assert what it sends without mocking node:http itself.
 */
interface FakeServer {
  url: string;
  close: () => Promise<void>;
  /** Calls observed in arrival order. */
  calls: Array<{
    method: string;
    path: string;
    body: string;
    headers: Record<string, string | string[] | undefined>;
  }>;
  /**
   * Optional handler. If set, runs per-request and decides response. If not,
   * defaults to 201 + JSON `{ providerSessionId: 'sess-FAKE' }` for register
   * and 200 + `{}` for everything else.
   */
  handler?: (req: http.IncomingMessage, res: http.ServerResponse, body: string) => void;
}
async function startFakeServer(): Promise<FakeServer> {
  const calls: FakeServer['calls'] = [];
  // Filled in once the server is listening; request handlers only touch it
  // after that point, so the empty-object cast is safe here.
  const fake: FakeServer = {} as FakeServer;
  let server!: http.Server;
  const ready = new Promise<FakeServer>((resolve, reject) => {
    server = http.createServer((req, res) => {
      const chunks: Buffer[] = [];
      req.on('data', (c: Buffer) => chunks.push(c));
      req.on('end', () => {
        const body = Buffer.concat(chunks).toString('utf-8');
        calls.push({
          method: req.method ?? '',
          path: req.url ?? '',
          body,
          headers: req.headers,
        });
        if (fake.handler !== undefined) {
          fake.handler(req, res, body);
          return;
        }
        if (req.url === '/api/v1/llms/_provider-register') {
          res.writeHead(201, { 'content-type': 'application/json' });
          res.end(JSON.stringify({ providerSessionId: 'sess-FAKE', llms: [] }));
          return;
        }
        res.writeHead(200, { 'content-type': 'application/json' });
        res.end('{}');
      });
    });
    server.listen(0, '127.0.0.1', () => {
      const addr = server.address();
      if (addr === null || typeof addr === 'string') {
        reject(new Error('listen failed'));
        return;
      }
      const fakeReady: FakeServer = {
        url: `http://127.0.0.1:${String(addr.port)}`,
        close: () => new Promise<void>((r) => { server.close(() => r()); }),
        calls,
      };
      Object.assign(fake, fakeReady);
      resolve(fake);
    });
  });
  return ready;
}
function makeProvider(name: string, content = 'hi from local'): LlmProvider {
  return {
    name,
    async complete(_opts: CompletionOptions): Promise<CompletionResult> {
      return {
        content,
        toolCalls: [],
        usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 },
        finishReason: 'stop',
      };
    },
    async listModels() { return []; },
    async isAvailable() { return true; },
  };
}

let tempDir: string;

beforeEach(() => {
  tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-registrar-test-'));
});

afterEach(() => {
  rmSync(tempDir, { recursive: true, force: true });
});

function silentLog(): { info: ReturnType<typeof vi.fn>; warn: ReturnType<typeof vi.fn>; error: ReturnType<typeof vi.fn> } {
  return { info: vi.fn(), warn: vi.fn(), error: vi.fn() };
}
describe('VirtualLlmRegistrar', () => {
  it('start() with no published providers is a silent no-op', async () => {
    const log = silentLog();
    const registrar = new VirtualLlmRegistrar({
      mcpdUrl: 'http://unreachable.example',
      token: 'tok',
      publishedProviders: [],
      sessionFilePath: join(tempDir, 'provider-session'),
      log,
    });
    await registrar.start();
    expect(log.info).toHaveBeenCalledWith(expect.stringContaining('nothing to publish'));
    registrar.stop();
  });

  it('register POSTs to /_provider-register and persists the returned sessionId', async () => {
    const fake = await startFakeServer();
    try {
      const sessionFilePath = join(tempDir, 'provider-session');
      const published: RegistrarPublishedProvider[] = [
        { provider: makeProvider('vllm-local'), type: 'openai', model: 'qwen', tier: 'fast' },
      ];
      const registrar = new VirtualLlmRegistrar({
        mcpdUrl: fake.url,
        token: 'tok-abc',
        publishedProviders: published,
        sessionFilePath,
        log: silentLog(),
        // Make heartbeat huge so it doesn't fire mid-test.
        heartbeatIntervalMs: 60_000,
      });
      await registrar.start();
      // Allow the SSE open to enter flight (we never feed it a response,
      // but the request fires synchronously after register).
      await new Promise((r) => setTimeout(r, 20));
      const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
      expect(registerCall).toBeDefined();
      expect(registerCall!.method).toBe('POST');
      const body = JSON.parse(registerCall!.body) as {
        providers: Array<{ name: string; type: string; model: string; tier: string }>;
      };
      expect(body.providers).toEqual([{
        name: 'vllm-local',
        type: 'openai',
        model: 'qwen',
        tier: 'fast',
      }]);
      expect(registerCall!.headers['authorization']).toBe('Bearer tok-abc');
      // Sticky session id persisted.
      expect(readFileSync(sessionFilePath, 'utf-8').trim()).toBe('sess-FAKE');
      expect(registrar.getSessionId()).toBe('sess-FAKE');
      registrar.stop();
    } finally {
      await fake.close();
    }
  });

  it('reuses an existing sticky session id from disk on next start', async () => {
    const fake = await startFakeServer();
    try {
      const sessionFilePath = join(tempDir, 'provider-session');
      writeFileSync(sessionFilePath, 'sess-existing\n', 'utf-8');
      const registrar = new VirtualLlmRegistrar({
        mcpdUrl: fake.url,
        token: 't',
        publishedProviders: [
          { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' },
        ],
        sessionFilePath,
        log: silentLog(),
        heartbeatIntervalMs: 60_000,
      });
      await registrar.start();
      await new Promise((r) => setTimeout(r, 20));
      const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
      const body = JSON.parse(registerCall!.body) as { providerSessionId?: string };
      expect(body.providerSessionId).toBe('sess-existing');
      registrar.stop();
    } finally {
      await fake.close();
    }
  });

  it('heartbeat ticker POSTs the session id at the configured interval', async () => {
    const fake = await startFakeServer();
    try {
      const registrar = new VirtualLlmRegistrar({
        mcpdUrl: fake.url,
        token: 't',
        publishedProviders: [
          { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' },
        ],
        sessionFilePath: join(tempDir, 'provider-session'),
        log: silentLog(),
        heartbeatIntervalMs: 30, // tight so the test doesn't drag
      });
      await registrar.start();
      // Wait long enough for at least 2 heartbeats to fire.
      await new Promise((r) => setTimeout(r, 100));
      registrar.stop();
      const heartbeats = fake.calls.filter((c) => c.path === '/api/v1/llms/_provider-heartbeat');
      expect(heartbeats.length).toBeGreaterThanOrEqual(2);
      for (const h of heartbeats) {
        const body = JSON.parse(h.body) as { providerSessionId: string };
        expect(body.providerSessionId).toBe('sess-FAKE');
      }
    } finally {
      await fake.close();
    }
  });

  it('throws when mcpd returns non-201 from /_provider-register', async () => {
    const fake = await startFakeServer();
    fake.handler = (_req, res, _body) => {
      res.writeHead(409, { 'content-type': 'application/json' });
      res.end(JSON.stringify({ error: 'Cannot publish over public LLM: vllm-local' }));
    };
    try {
      const registrar = new VirtualLlmRegistrar({
        mcpdUrl: fake.url,
        token: 't',
        publishedProviders: [
          { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' },
        ],
        sessionFilePath: join(tempDir, 'provider-session'),
        log: silentLog(),
        heartbeatIntervalMs: 60_000,
      });
      await expect(registrar.start()).rejects.toThrow(/HTTP 409/);
    } finally {
      await fake.close();
    }
  });
});