Files
mcpctl/src/mcplocal/src/providers/registrar.ts
Michal 2c98a21323 feat(mcpd+cli+mcplocal): wire visibility filter through routes, CLI, registrar (v7 Stage 2)
Stage 1 added the schema + service predicate. This stage threads the
filter through every surface that lists or fetches Llms/Agents:

- mcpd routes: viewerFromRequest helper builds a Viewer from the
  request's RBAC scope. List endpoints rely on the existing
  preSerialization hook (now two-phase: name-scope first, visibility
  second). get-by-id/get-by-name routes pass the viewer to the service
  which 404s on hidden rows.
- RBAC: AllowedScope gains `isAdmin` to distinguish a `*` cross-resource
  grant (admins skip visibility) from a plain `view:llms` grant
  (wildcard for RBAC, but visibility still applies). FastifyRequest
  augmentation updated.
- VirtualLlmService.register accepts ownerId and stamps it on freshly
  created virtual rows; defaults visibility to 'private' on first
  create, leaves existing rows untouched on sticky reconnect.
- AgentService.registerVirtualAgents mirrors the same defaults.
- mcplocal: LlmProviderFileEntry / AgentFileEntry / RegistrarPublishedX
  carry visibility through to the register payload (default 'private').
- CLI: VISIBILITY column on `mcpctl get llm` and `mcpctl get agent`,
  `--visibility` flag on `mcpctl create llm` / `create agent`. YAML
  round-trip works because visibility passes through stripInternalFields
  unchanged (ownerId is already stripped). Completions regenerated.

Tests: mcpd 908/908, mcplocal 731/731, cli 437/437.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 01:03:58 +01:00

613 lines
23 KiB
TypeScript

/**
* Virtual-LLM registrar — the mcplocal counterpart to mcpd's
* VirtualLlmService.
*
* Lifecycle:
* 1. start() called on mcplocal boot. If no providers are opted-in
* (`publish: true` in config), do nothing.
* 2. POST /api/v1/llms/_provider-register with the publishable set;
* receive a stable providerSessionId. Persist it to
* `~/.mcpctl/provider-session` so reconnects after a crash/restart
* adopt the same row instead of orphaning it.
* 3. Open the SSE channel at /api/v1/llms/_provider-stream with the
* session id in `x-mcpctl-provider-session`. Listen for
* `event: task` frames; for each, call the local provider's
* `complete()` and POST the OpenAI-shaped result back.
* 4. Heartbeat every 30 s. If the SSE drops, exponential backoff
* reconnect from 5 s up to 60 s.
* 5. stop() destroys the SSE socket and clears the timer; mcpd's
* 90-s heartbeat watchdog will then flip our rows to inactive.
*
* v1 caveat: the LlmProvider abstraction returns a finalized
* CompletionResult, not a token stream. We therefore translate
* streaming requests into a single SSE chunk + [DONE]. Real
* per-token streaming is a v2 concern — see docs/virtual-llms.md.
*/
import http from 'node:http';
import https from 'node:https';
import { promises as fs } from 'node:fs';
import { dirname } from 'node:path';
import { spawn } from 'node:child_process';
import type { LlmProvider, CompletionOptions } from './types.js';
import type { WakeRecipe } from '../http/config.js';
export interface RegistrarLogger {
info: (msg: string) => void;
warn: (msg: string) => void;
error: (msg: string) => void;
}
export interface RegistrarPublishedProvider {
provider: LlmProvider;
/** mcpd-side `type` field (openai | anthropic | …). */
type: string;
/** Model id surfaced under `mcpctl get llm`. */
model: string;
/** Optional tier (default 'fast'). */
tier?: 'fast' | 'heavy';
/** Optional human-readable description for `mcpctl get llm`. */
description?: string;
/**
* Optional wake recipe for backends that hibernate. When provided AND
* `provider.isAvailable()` returns false at registrar start, the row is
* published with status=hibernating; on the first incoming `wake` task
* the registrar runs this recipe and waits for the backend to come up.
*/
wake?: WakeRecipe;
/**
* v4: optional pool key. When set, the published Llm row carries
* `poolName` and stacks with any other Llms sharing the same value.
* Agents pinned to any pool member dispatch across all healthy members.
*/
poolName?: string;
/**
* v6: optional override for the wire-side name. When set, the row
* mcpd creates uses this name instead of `provider.name`. Used by
* the per-publisher namespacing path: each user's mcplocal can take
* a shared local config (`provider.name = "vllm-local-qwen3"`) and
* publish under a hostname-suffixed wire name
* (`vllm-local-qwen3-alice`) so two publishers don't collide on
* mcpd's cluster-wide name uniqueness. Inbound infer/wake tasks
* carry the wire name, so the registrar matches by
* `publishName ?? provider.name` everywhere.
*/
publishName?: string;
/**
* v7: per-user RBAC scoping. mcplocal-published virtuals default to
* 'private' (visible only to the publishing user) — workstations
* shouldn't broadcast their models org-wide unless explicitly
* shared. The publisher can override per provider with
* `"visibility": "public"` in their mcplocal config.
*/
visibility?: 'public' | 'private';
}
/**
* Local agent declaration to publish alongside the providers (v3). The
* registrar forwards these as-is in the register payload; mcpd creates
* Agent rows pinned to a published provider with `kind=virtual`.
*/
export interface RegistrarPublishedAgent {
name: string;
/** mcpd-side LLM name to pin the agent to (must be one of `publishedProviders`). */
llmName: string;
description?: string;
systemPrompt?: string;
project?: string;
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
/** v7: per-user RBAC scoping, defaults to 'private' on register. */
visibility?: 'public' | 'private';
}
export interface RegistrarOptions {
mcpdUrl: string;
token: string;
publishedProviders: RegistrarPublishedProvider[];
/** Optional v3 — local agents to publish alongside the providers. */
publishedAgents?: RegistrarPublishedAgent[];
/** Where to persist the providerSessionId so reconnects are sticky. */
sessionFilePath: string;
log: RegistrarLogger;
/** Override knobs for tests (ms). */
heartbeatIntervalMs?: number;
reconnectBaseMs?: number;
reconnectMaxMs?: number;
}
const DEFAULT_HEARTBEAT_MS = 30_000;
const DEFAULT_RECONNECT_BASE_MS = 5_000;
const DEFAULT_RECONNECT_MAX_MS = 60_000;
const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session';
interface InferTask {
kind: 'infer';
taskId: string;
llmName: string;
request: {
messages: Array<{ role: 'system' | 'user' | 'assistant' | 'tool'; content: string }>;
temperature?: number;
max_tokens?: number;
model?: string;
};
streaming: boolean;
}
type TaskFrame = InferTask | { kind: 'wake'; taskId: string; llmName: string };
export class VirtualLlmRegistrar {
private sessionId: string | null = null;
private heartbeatTimer: NodeJS.Timeout | null = null;
private sseRequest: http.ClientRequest | null = null;
private reconnectAttempt = 0;
private stopped = false;
constructor(private readonly opts: RegistrarOptions) {}
/** Bring the registrar online. Idempotent — calling twice is a no-op. */
async start(): Promise<void> {
if (this.opts.publishedProviders.length === 0) {
this.opts.log.info('virtual-llm registrar: nothing to publish (no provider has `publish: true`)');
return;
}
this.stopped = false;
this.sessionId = await this.loadStickySessionId();
await this.register();
this.startHeartbeat();
this.openSseStream();
}
/** Tear down the registrar. Safe to call multiple times. */
stop(): void {
this.stopped = true;
if (this.heartbeatTimer !== null) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
if (this.sseRequest !== null) {
this.sseRequest.destroy();
this.sseRequest = null;
}
}
/** Test/inspection helper — avoids exposing internals broadly. */
getSessionId(): string | null {
return this.sessionId;
}
private async loadStickySessionId(): Promise<string | null> {
try {
const raw = await fs.readFile(this.opts.sessionFilePath, 'utf-8');
const id = raw.trim();
return id === '' ? null : id;
} catch {
return null;
}
}
private async saveStickySessionId(id: string): Promise<void> {
try {
await fs.mkdir(dirname(this.opts.sessionFilePath), { recursive: true });
await fs.writeFile(this.opts.sessionFilePath, id, 'utf-8');
} catch (err) {
this.opts.log.warn(`virtual-llm registrar: failed to persist session id: ${(err as Error).message}`);
}
}
private async register(): Promise<void> {
// Decide initial status per provider. A provider with a wake recipe
// that's NOT currently available comes up as hibernating; otherwise
// active (today's behavior). isAvailable() is forgiving — any
// unexpected throw is treated as "not available" so a transient
// network blip during boot doesn't crash the registrar.
const providers = await Promise.all(this.opts.publishedProviders.map(async (p) => {
let initialStatus: 'active' | 'hibernating' = 'active';
if (p.wake !== undefined) {
let alive = false;
try { alive = await p.provider.isAvailable(); } catch { alive = false; }
if (!alive) initialStatus = 'hibernating';
}
return {
// v6: when `publishName` is set, that's the cluster-wide unique
// name the row goes under. Defaults to the provider's local
// name (today's behavior — no mangling).
name: p.publishName ?? p.provider.name,
type: p.type,
model: p.model,
...(p.tier !== undefined ? { tier: p.tier } : {}),
...(p.description !== undefined ? { description: p.description } : {}),
...(p.poolName !== undefined ? { poolName: p.poolName } : {}),
// v7: virtuals default to private. Operators who want their
// workstation model org-visible set "visibility": "public" per
// provider in mcplocal config.
visibility: p.visibility ?? 'private',
initialStatus,
};
}));
const body: Record<string, unknown> = { providers };
if (this.sessionId !== null) body['providerSessionId'] = this.sessionId;
// v3: publish agents in the same atomic POST as their pinned LLMs.
// Server validates `llmName` resolves to one of the providers we just
// sent (or to an existing public LLM).
if (this.opts.publishedAgents !== undefined && this.opts.publishedAgents.length > 0) {
body['agents'] = this.opts.publishedAgents.map((a) => ({
name: a.name,
llmName: a.llmName,
...(a.description !== undefined ? { description: a.description } : {}),
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
...(a.project !== undefined ? { project: a.project } : {}),
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
...(a.extras !== undefined ? { extras: a.extras } : {}),
// v7: forward visibility to mcpd. Defaults to 'private' for
// virtual agents on the server side when omitted.
visibility: a.visibility ?? 'private',
}));
}
const res = await postJson(
this.urlFor('/api/v1/llms/_provider-register'),
body,
this.opts.token,
);
if (res.statusCode !== 201) {
throw new Error(`provider-register HTTP ${String(res.statusCode)}: ${res.body}`);
}
const parsed = JSON.parse(res.body) as { providerSessionId: string };
this.sessionId = parsed.providerSessionId;
await this.saveStickySessionId(this.sessionId);
this.opts.log.info(
`virtual-llm registrar: published ${String(this.opts.publishedProviders.length)} provider(s); sessionId=${this.sessionId}`,
);
}
private startHeartbeat(): void {
const intervalMs = this.opts.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_MS;
this.heartbeatTimer = setInterval(() => {
if (this.sessionId === null) return;
void this.heartbeatOnce();
}, intervalMs);
}
private async heartbeatOnce(): Promise<void> {
if (this.sessionId === null) return;
try {
const res = await postJson(
this.urlFor('/api/v1/llms/_provider-heartbeat'),
{ providerSessionId: this.sessionId },
this.opts.token,
);
if (res.statusCode !== 200) {
this.opts.log.warn(`virtual-llm registrar: heartbeat HTTP ${String(res.statusCode)}: ${res.body}`);
}
} catch (err) {
this.opts.log.warn(`virtual-llm registrar: heartbeat failed: ${(err as Error).message}`);
}
}
private openSseStream(): void {
if (this.stopped || this.sessionId === null) return;
const url = new URL(this.urlFor('/api/v1/llms/_provider-stream'));
const driver = url.protocol === 'https:' ? https : http;
const req = driver.request(
{
hostname: url.hostname,
port: url.port || (url.protocol === 'https:' ? 443 : 80),
path: url.pathname + url.search,
method: 'GET',
headers: {
Accept: 'text/event-stream',
Authorization: `Bearer ${this.opts.token}`,
[PROVIDER_SESSION_HEADER]: this.sessionId,
},
},
(res) => {
if (res.statusCode !== 200) {
let body = '';
res.on('data', (c: Buffer) => { body += c.toString('utf-8'); });
res.on('end', () => {
this.opts.log.warn(`virtual-llm registrar: SSE HTTP ${String(res.statusCode ?? 0)}: ${body}`);
this.scheduleReconnect();
});
return;
}
this.reconnectAttempt = 0;
res.setEncoding('utf-8');
let buf = '';
res.on('data', (chunk: string) => {
buf += chunk;
let nl: number;
while ((nl = buf.indexOf('\n\n')) !== -1) {
const frame = buf.slice(0, nl);
buf = buf.slice(nl + 2);
this.handleSseFrame(frame);
}
});
res.on('end', () => this.scheduleReconnect());
res.on('error', (err) => {
this.opts.log.warn(`virtual-llm registrar: SSE response error: ${err.message}`);
this.scheduleReconnect();
});
},
);
req.on('error', (err) => {
this.opts.log.warn(`virtual-llm registrar: SSE request error: ${err.message}`);
this.scheduleReconnect();
});
req.end();
this.sseRequest = req;
}
private scheduleReconnect(): void {
this.sseRequest = null;
if (this.stopped) return;
const base = this.opts.reconnectBaseMs ?? DEFAULT_RECONNECT_BASE_MS;
const max = this.opts.reconnectMaxMs ?? DEFAULT_RECONNECT_MAX_MS;
const delay = Math.min(max, base * 2 ** this.reconnectAttempt);
this.reconnectAttempt += 1;
this.opts.log.info(`virtual-llm registrar: SSE disconnected, reconnecting in ${String(delay)} ms`);
setTimeout(() => {
if (!this.stopped) this.openSseStream();
}, delay).unref();
}
private handleSseFrame(frame: string): void {
let event = 'message';
let data = '';
for (const line of frame.split('\n')) {
if (line.startsWith('event:')) event = line.slice(6).trim();
else if (line.startsWith('data:')) data += line.slice(5).trim();
}
if (event !== 'task' || data === '') return;
let task: TaskFrame;
try {
task = JSON.parse(data) as TaskFrame;
} catch (err) {
this.opts.log.warn(`virtual-llm registrar: malformed task frame: ${(err as Error).message}`);
return;
}
if (task.kind === 'infer') {
void this.handleInferTask(task);
return;
}
if (task.kind === 'wake') {
void this.handleWakeTask(task);
return;
}
}
/**
* Run the configured wake recipe and poll the provider until it comes
* up. Sends a `{ status: 200, body: { ok: true } }` result on success;
* `{ error }` on timeout or recipe failure. While waiting, also bumps
* the heartbeat so mcpd's GC sweep doesn't decide we're stale mid-wake.
*/
private async handleWakeTask(task: { kind: 'wake'; taskId: string; llmName: string }): Promise<void> {
// v6: match against the publish name (wire-side) when set, fall
// back to the local provider name. Inbound task frames carry the
// wire name mcpd knows the row by.
const published = this.opts.publishedProviders.find((p) => (p.publishName ?? p.provider.name) === task.llmName);
if (published === undefined) {
await this.postResult(task.taskId, { error: `provider '${task.llmName}' not registered locally` });
return;
}
if (published.wake === undefined) {
await this.postResult(task.taskId, { error: `provider '${task.llmName}' has no wake recipe configured` });
return;
}
try {
await runWakeRecipe(published.wake);
// Poll isAvailable() until it comes up (or timeout). Heartbeat
// every poll tick so mcpd doesn't time us out while we're waiting
// on a slow boot.
const maxWaitMs = (published.wake.maxWaitSeconds ?? 60) * 1000;
const started = Date.now();
while (Date.now() - started < maxWaitMs) {
let alive = false;
try { alive = await published.provider.isAvailable(); } catch { alive = false; }
if (alive) {
await this.heartbeatOnce();
await this.postResult(task.taskId, { status: 200, body: { ok: true, ms: Date.now() - started } });
return;
}
await this.heartbeatOnce();
await new Promise((r) => setTimeout(r, 1500));
}
await this.postResult(task.taskId, { error: `provider '${task.llmName}' did not come up within ${String(maxWaitMs)}ms` });
} catch (err) {
await this.postResult(task.taskId, { error: `wake recipe failed: ${(err as Error).message}` });
}
}
private async handleInferTask(task: InferTask): Promise<void> {
// v6: match against the publish name (wire-side) when set, fall
// back to the local provider name. Inbound task frames carry the
// wire name mcpd knows the row by.
const published = this.opts.publishedProviders.find((p) => (p.publishName ?? p.provider.name) === task.llmName);
if (published === undefined) {
await this.postResult(task.taskId, { error: `provider '${task.llmName}' not registered locally` });
return;
}
try {
const completionOpts: CompletionOptions = {
messages: task.request.messages.map((m) => ({ role: m.role, content: m.content })),
};
if (task.request.temperature !== undefined) completionOpts.temperature = task.request.temperature;
if (task.request.max_tokens !== undefined) completionOpts.maxTokens = task.request.max_tokens;
if (task.request.model !== undefined) completionOpts.model = task.request.model;
const result = await published.provider.complete(completionOpts);
const completionBody = openAiCompletionEnvelope(published.model, result, task.request.model);
if (task.streaming) {
// Single-chunk streaming for v1: emit one delta then a [DONE] marker.
// The server-side relay forwards both as SSE frames to the original
// caller. Real per-token streaming would require LlmProvider.stream().
const deltaFrame = openAiStreamChunk(published.model, result, task.request.model);
await this.postResult(task.taskId, { chunk: { data: JSON.stringify(deltaFrame) } });
await this.postResult(task.taskId, { chunk: { data: '[DONE]', done: true } });
} else {
await this.postResult(task.taskId, { status: 200, body: completionBody });
}
} catch (err) {
await this.postResult(task.taskId, { error: (err as Error).message });
}
}
private async postResult(taskId: string, body: unknown): Promise<void> {
try {
await postJson(
this.urlFor(`/api/v1/llms/_provider-task/${encodeURIComponent(taskId)}/result`),
body as Record<string, unknown>,
this.opts.token,
);
} catch (err) {
this.opts.log.warn(`virtual-llm registrar: result POST failed: ${(err as Error).message}`);
}
}
private urlFor(path: string): string {
return `${this.opts.mcpdUrl.replace(/\/$/, '')}${path}`;
}
}
/** Wrap a CompletionResult in the OpenAI chat.completion envelope. */
function openAiCompletionEnvelope(
modelFromConfig: string,
result: { content: string; finishReason: string; usage: { promptTokens: number; completionTokens: number; totalTokens: number } },
modelFromRequest: string | undefined,
): unknown {
return {
id: `cmpl-${Math.random().toString(36).slice(2)}`,
object: 'chat.completion',
created: Math.floor(Date.now() / 1000),
model: modelFromRequest ?? modelFromConfig,
choices: [{
index: 0,
message: { role: 'assistant', content: result.content },
finish_reason: result.finishReason,
}],
usage: {
prompt_tokens: result.usage.promptTokens,
completion_tokens: result.usage.completionTokens,
total_tokens: result.usage.totalTokens,
},
};
}
/** Single-chunk OpenAI chat.completion.chunk frame for v1 streaming. */
function openAiStreamChunk(
modelFromConfig: string,
result: { content: string; finishReason: string },
modelFromRequest: string | undefined,
): unknown {
return {
id: `chunk-${Math.random().toString(36).slice(2)}`,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: modelFromRequest ?? modelFromConfig,
choices: [{
index: 0,
delta: { content: result.content },
finish_reason: result.finishReason,
}],
};
}
/**
* Execute a wake recipe. Returns when the recipe completes; throws if it
* fails. Doesn't itself poll for provider readiness — that's the caller's
* job (handleWakeTask polls isAvailable() with its own timeout).
*
* `http`: fires the configured request and considers any 2xx a success.
* The remote service is expected to be a "wake controller" that returns
* quickly; if the underlying boot takes minutes, the controller should
* return 202 and the readiness poll catches up.
*
* `command`: spawns the binary with args, waits for exit. Non-zero exit
* is treated as failure. stdout/stderr are discarded — the recipe's job
* is to *trigger* a wake, not to produce output.
*/
async function runWakeRecipe(recipe: WakeRecipe): Promise<void> {
if (recipe.type === 'http') {
const u = new URL(recipe.url);
const driver = u.protocol === 'https:' ? https : http;
const method = recipe.method ?? 'POST';
const headers: Record<string, string> = { ...(recipe.headers ?? {}) };
const body = recipe.body;
if (body !== undefined) {
headers['Content-Length'] = String(Buffer.byteLength(body));
}
await new Promise<void>((resolve, reject) => {
const req = driver.request({
hostname: u.hostname,
port: u.port || (u.protocol === 'https:' ? 443 : 80),
path: u.pathname + u.search,
method,
headers,
timeout: 30_000,
}, (res) => {
const status = res.statusCode ?? 0;
// Drain so the socket can be reused/freed.
res.resume();
if (status >= 200 && status < 300) resolve();
else reject(new Error(`wake HTTP returned ${String(status)}`));
});
req.on('error', reject);
req.on('timeout', () => { req.destroy(); reject(new Error('wake HTTP timed out')); });
if (body !== undefined) req.write(body);
req.end();
});
return;
}
if (recipe.type === 'command') {
await new Promise<void>((resolve, reject) => {
const child = spawn(recipe.command, recipe.args ?? [], {
stdio: 'ignore',
});
child.on('error', reject);
child.on('exit', (code) => {
if (code === 0) resolve();
else reject(new Error(`wake command exited with code ${String(code)}`));
});
});
return;
}
throw new Error(`unknown wake recipe type`);
}
interface PostResponse { statusCode: number; body: string }
/** Tiny JSON POST helper used by all of the registrar's mcpd calls. */
function postJson(url: string, body: unknown, bearer: string): Promise<PostResponse> {
return new Promise((resolve, reject) => {
const u = new URL(url);
const driver = u.protocol === 'https:' ? https : http;
const payload = JSON.stringify(body);
const req = driver.request(
{
hostname: u.hostname,
port: u.port || (u.protocol === 'https:' ? 443 : 80),
path: u.pathname + u.search,
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${bearer}`,
'Content-Length': String(Buffer.byteLength(payload)),
Accept: 'application/json',
},
timeout: 30_000,
},
(res) => {
const chunks: Buffer[] = [];
res.on('data', (c: Buffer) => chunks.push(c));
res.on('end', () => resolve({ statusCode: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') }));
},
);
req.on('error', reject);
req.on('timeout', () => { req.destroy(); reject(new Error('request timed out')); });
req.write(payload);
req.end();
});
}