diff --git a/README.md b/README.md index a6b1cb9..e12537e 100644 --- a/README.md +++ b/README.md @@ -571,6 +571,32 @@ For binding prompts to personalities and the API surface, see prompt editing — paste a session token (`mcpctl auth login`) or PAT to log in. +### Virtual LLMs + +A user's local LLM (`vllm-local`, Ollama, …) can publish itself into +mcpd's `Llm` registry so anyone authorized sees it under `mcpctl get llm` +and can chat with it via `mcpctl chat-llm `. Inference is relayed +through the publishing mcplocal's SSE control channel — mcpd never holds +the local URL or API key. + +```fish +# In ~/.mcpctl/config.json, opt the provider in with `publish: true`: +# { "name": "vllm-local", "type": "openai", "model": "...", "publish": true } +systemctl --user restart mcplocal + +mcpctl get llm +# NAME KIND STATUS TYPE MODEL TIER ID +# qwen3-thinking public active openai qwen3-thinking fast ... +# vllm-local virtual active openai Qwen/Qwen2.5-7B-Instruct-AWQ fast ... + +mcpctl chat-llm vllm-local +> hello? +``` + +Lifecycle: 30 s heartbeats, 90 s heartbeat-stale → inactive, 4 h +inactive → auto-deleted. A reconnecting mcplocal adopts the same row +via a sticky `providerSessionId`. Full design: [docs/virtual-llms.md](docs/virtual-llms.md). + ## Commands ```bash diff --git a/completions/mcpctl.bash b/completions/mcpctl.bash index d0ea0a4..2b86325 100644 --- a/completions/mcpctl.bash +++ b/completions/mcpctl.bash @@ -5,7 +5,7 @@ _mcpctl() { local cur prev words cword _init_completion || return - local commands="status login logout config get describe delete logs create edit apply chat patch backup approve console cache test migrate rotate" + local commands="status login logout config get describe delete logs create edit apply chat chat-llm patch backup approve console cache test migrate rotate" local project_commands="get describe delete logs create edit attach-server detach-server" local global_opts="-v --version --daemon-url --direct -p --project -h --help" local resources="servers instances secrets secretbackends llms agents personalities templates projects users groups rbac prompts promptrequests serverattachments proxymodels all" @@ -247,6 +247,15 @@ _mcpctl() { COMPREPLY=($(compgen -W "-m --message --thread --system --system-file --system-append --personality --temperature --top-p --top-k --max-tokens --seed --stop --allow-tool --extra --no-stream -h --help" -- "$cur")) fi return ;; + chat-llm) + if [[ $((cword - subcmd_pos)) -eq 1 ]]; then + local names + names=$(_mcpctl_resource_names "llms") + COMPREPLY=($(compgen -W "$names -m --message --system --temperature --max-tokens --no-stream -h --help" -- "$cur")) + else + COMPREPLY=($(compgen -W "-m --message --system --temperature --max-tokens --no-stream -h --help" -- "$cur")) + fi + return ;; patch) if [[ -z "$resource_type" ]]; then COMPREPLY=($(compgen -W "$resources -h --help" -- "$cur")) diff --git a/completions/mcpctl.fish b/completions/mcpctl.fish index ed739a7..810b375 100644 --- a/completions/mcpctl.fish +++ b/completions/mcpctl.fish @@ -4,7 +4,7 @@ # Erase any stale completions from previous versions complete -c mcpctl -e -set -l commands status login logout config get describe delete logs create edit apply chat patch backup approve console cache test migrate rotate +set -l commands status login logout config get describe delete logs create edit apply chat chat-llm patch backup approve console cache test migrate rotate set -l project_commands get describe delete logs create edit attach-server detach-server # Disable file 
completions by default @@ -231,6 +231,7 @@ complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_ complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a edit -d 'Edit a resource in your default editor (server, project)' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a apply -d 'Apply declarative configuration from a YAML or JSON file' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a chat -d 'Open an interactive chat session with an agent (REPL or one-shot).' +complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a chat-llm -d 'Stateless chat with any registered LLM (public or virtual). No threads, no tools.' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a patch -d 'Patch a resource field (e.g. mcpctl patch project myproj llmProvider=none)' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a backup -d 'Git-based backup status and management' complete -c mcpctl -n "not __mcpctl_has_project; and not __fish_seen_subcommand_from $commands" -a approve -d 'Approve a pending prompt request (atomic: delete request, create prompt)' @@ -518,6 +519,13 @@ complete -c mcpctl -n "__fish_seen_subcommand_from chat" -l allow-tool -d 'Restr complete -c mcpctl -n "__fish_seen_subcommand_from chat" -l extra -d 'Provider-specific knob k=v (repeatable)' -x complete -c mcpctl -n "__fish_seen_subcommand_from chat" -l no-stream -d 'Disable SSE streaming (single JSON response)' +# chat-llm options +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -s m -l message -d 'One-shot: send a single message and exit (no REPL)' -x +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -l system -d 'Optional system prompt' -x +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -l temperature -d 'Sampling temperature (0..2)' -x +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -l max-tokens -d 'Maximum tokens in the assistant reply' -x +complete -c mcpctl -n "__fish_seen_subcommand_from chat-llm" -l no-stream -d 'Disable SSE streaming (single JSON response)' + # console options complete -c mcpctl -n "__fish_seen_subcommand_from console" -l stdin-mcp -d 'Run inspector as MCP server over stdin/stdout (for Claude)' complete -c mcpctl -n "__fish_seen_subcommand_from console" -l audit -d 'Browse audit events from mcpd' diff --git a/docs/agents.md b/docs/agents.md index e86df3d..b14688f 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -201,4 +201,8 @@ mcpctl chat reviewer - [personalities.md](./personalities.md) — named overlays of prompts on top of an agent. Same agent, different prompt bundles, picked per-turn via `--personality ` or `agent.defaultPersonality`. +- [virtual-llms.md](./virtual-llms.md) — local LLMs (e.g. `vllm-local`) + publishing themselves into `mcpctl get llm` so anyone can chat with + them via `mcpctl chat-llm `. Inference is relayed through the + publishing mcplocal — mcpd never holds the local URL or key. - [chat.md](./chat.md) — `mcpctl chat` flow and LiteLLM-style flags. 
diff --git a/docs/virtual-llms.md b/docs/virtual-llms.md new file mode 100644 index 0000000..008746b --- /dev/null +++ b/docs/virtual-llms.md @@ -0,0 +1,171 @@ +# Virtual LLMs + +A **virtual LLM** is an `Llm` row in mcpd that's *registered by an mcplocal +client* rather than created by hand with `mcpctl create llm`. Inference for +a virtual LLM is relayed back through the publishing mcplocal's SSE control +channel — **mcpd never needs to know the local URL or hold its API key**. + +When the publishing mcplocal goes away (or the user shuts down their +laptop) the row decays: `active → inactive` after 90 s without a +heartbeat, then deleted after 4 h of inactivity. A reconnecting mcplocal +adopts the same row using a sticky `providerSessionId` it persisted at +first publish. + +## When to use this + +- **Local model on a developer laptop** that you want everyone on the + team to be able to chat with via `mcpctl chat-llm <name>`. The model + doesn't need to be reachable from mcpd's k8s pods — only the user's + mcplocal does (which is already the case because mcplocal pulls + projects from mcpd over HTTPS). +- **Hibernating models** that wake on demand (v2 — see "Roadmap"). +- **Pool of identical models** distributed across user laptops, eligible + for load balancing (v4). + +If your model is reachable from mcpd's k8s pods over LAN/VPN, you don't +need a virtual LLM — just `mcpctl create llm <name> --type openai --url …` +and you're done. + +## Publishing a local provider + +mcplocal's local config (`~/.mcpctl/config.json`) gains a `publish: true` +opt-in per provider: + +```json +{ + "llm": { + "providers": [ + { + "name": "vllm-local", + "type": "openai", + "model": "Qwen/Qwen2.5-7B-Instruct-AWQ", + "url": "http://127.0.0.1:8000/v1", + "tier": "fast", + "publish": true + } + ] + } +} +``` + +Restart mcplocal: + +```fish +systemctl --user restart mcplocal +``` + +The registrar: +1. Reads `~/.mcpctl/credentials` for `mcpdUrl` + bearer token. +2. POSTs to `/api/v1/llms/_provider-register` with the publishable set. +3. Persists the returned `providerSessionId` to + `~/.mcpctl/provider-session` so the next restart adopts the same + mcpd row. +4. Opens the SSE channel at `/api/v1/llms/_provider-stream`. +5. Heartbeats every 30 s. +6. Listens for `event: task` frames and runs them against the local + `LlmProvider`. + +If `~/.mcpctl/credentials` doesn't exist (e.g. you haven't run +`mcpctl auth login`), the registrar logs a warning and skips — +publishing is a best-effort feature, not a boot blocker. + +## Verifying + +```fish +$ mcpctl get llm +NAME KIND STATUS TYPE MODEL TIER KEY ID +qwen3-thinking public active openai qwen3-thinking fast secret://litellm-key/API_KEY cmofx8y7u… +vllm-local virtual active openai Qwen/Qwen2.5-7B-Instruct-AWQ fast - cmoxz12ab… + +$ mcpctl chat-llm vllm-local +───────────────────────────────────────────────────────── +LLM: vllm-local openai → Qwen/Qwen2.5-7B-Instruct-AWQ +Kind: virtual Status: active +───────────────────────────────────────────────────────── +> hello? +Hi! … +``` + +You can also chat with public LLMs the same way: + +```fish +$ mcpctl chat-llm qwen3-thinking +``` + +The CLI doesn't care about `kind` — mcpd's `/api/v1/llms/<name>/infer` +route branches on it server-side. + +## Lifecycle in detail + +| State | What it means | |----------------|-----------------------------------------------------------------------| | `active` | Heartbeat received within the last 90 s and the SSE channel is open. | | `inactive` | Either the SSE closed or the heartbeat watchdog tripped.
Inference returns 503. | + | `hibernating` | Reserved for v2 (wake-on-demand). v1 never writes this state. | + +Two timers on mcpd run the GC sweep: + +- **90 s** without a heartbeat → flip `active` → `inactive`. +- **4 h** in `inactive` → delete the row entirely. + +A reconnecting mcplocal with the same `providerSessionId` revives every +inactive row it owns; it only orphans rows that fell past the 4-h cutoff. + +## Inference relay + +When mcpd receives `POST /api/v1/llms/<name>/infer`: + +1. Look up the row, see `kind=virtual` + `status=active`. +2. Find the open SSE session for that `providerSessionId`. Missing + session → 503. +3. Push a `{ kind: "infer", taskId, llmName, request, streaming }` + task frame onto the SSE. +4. mcplocal pulls, calls `LlmProvider.complete(...)`, and POSTs the + result back to `/api/v1/llms/_provider-task/<taskId>/result`: + - non-streaming: `{ status: 200, body: <response body> }` + - streaming: per-chunk `{ chunk: { data, done? } }` + - failure: `{ error: "..." }` +5. mcpd forwards the result/chunks out to the original caller. + +**v1 caveat — streaming granularity**: `LlmProvider.complete()` returns +a finalized `CompletionResult`, not a token stream. Streaming requests +therefore arrive at the caller as a single delta + `[DONE]`. Real +per-token streaming is a v2 concern. + +## Roadmap (later stages) + +- **v2 — Wake-on-demand**: Secret-stored "wake recipe" so mcpd can ask + mcplocal to start a hibernating backend before sending inference. +- **v3 — Virtual agents**: mcplocal publishes its local agent configs + (model + system prompt + sampling defaults) into mcpd's `Agent` table. +- **v4 — LB pool by model**: agents can target a model name instead of + a specific Llm; mcpd picks the healthiest pool member per request. +- **v5 — Task queue**: persisted requests for hibernating/saturated + pools. Workers pull tasks for their model when they come online. + +## API surface (v1) + +``` +POST /api/v1/llms/_provider-register → returns { providerSessionId, llms[] } +GET /api/v1/llms/_provider-stream → SSE channel; requires the x-mcpctl-provider-session header +POST /api/v1/llms/_provider-heartbeat → { providerSessionId } +POST /api/v1/llms/_provider-task/:id/result + → one of: + { error: "msg" } + { chunk: { data, done? } } + { status, body } + +GET /api/v1/llms → list (now includes kind, status, lastHeartbeatAt, inactiveSince) +POST /api/v1/llms/<name>/infer → routes through the SSE relay +DELETE /api/v1/llms/<name> → delete unconditionally (also does the GC's job) +``` + +RBAC piggybacks on `view/edit/create:llms` — no new resource. Publishing +a virtual LLM is morally a `create:llms` operation. + +## See also + +- [agents.md](./agents.md) — what an Agent is and how it pins to an LLM. +- [chat.md](./chat.md) — `mcpctl chat <agent>` (full agent flow). +- The CLI: `mcpctl chat-llm <name>` (this doc) is the stateless + counterpart for raw LLM chat. 
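For readers implementing or debugging a publisher, here is a minimal sketch of registrar steps 1–5 above, assuming Node 18+ `fetch`. The endpoint paths, heartbeat interval, and session header are taken from this document; the credential shape and helper structure are assumptions, not mcplocal's actual code.

```ts
// Hypothetical publisher-side registrar sketch — not shipped mcplocal code.
import { readFile, writeFile } from 'node:fs/promises';

interface Credentials { mcpdUrl: string; token: string; }

async function registerAndHeartbeat(creds: Credentials, providers: unknown[]): Promise<string> {
  const sessionFile = `${process.env.HOME}/.mcpctl/provider-session`;
  const prior = await readFile(sessionFile, 'utf-8').catch(() => null);

  // Steps 1–3: register (re-using the sticky session id if one was persisted) and save it back.
  const res = await fetch(`${creds.mcpdUrl}/api/v1/llms/_provider-register`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${creds.token}` },
    body: JSON.stringify({ providerSessionId: prior?.trim() ?? undefined, providers }),
  });
  if (!res.ok) throw new Error(`register failed: HTTP ${res.status}`);
  const { providerSessionId } = (await res.json()) as { providerSessionId: string };
  await writeFile(sessionFile, providerSessionId, 'utf-8');

  // Step 5: heartbeat every 30 s so the rows stay `active` (mcpd's watchdog trips at 90 s).
  setInterval(() => {
    void fetch(`${creds.mcpdUrl}/api/v1/llms/_provider-heartbeat`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${creds.token}` },
      body: JSON.stringify({ providerSessionId }),
    });
  }, 30_000);

  // Steps 4 and 6 (GET /_provider-stream with the x-mcpctl-provider-session
  // header, then handling `event: task` frames) are omitted here.
  return providerSessionId;
}
```

A real publisher would also reopen the SSE stream and re-register whenever the connection drops, so the sticky `providerSessionId` keeps reviving the same rows.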
diff --git a/scripts/generate-completions.ts b/scripts/generate-completions.ts index a0941a1..c8d61fc 100644 --- a/scripts/generate-completions.ts +++ b/scripts/generate-completions.ts @@ -920,6 +920,20 @@ function emitBashCase(emit: (s: string) => void, cmd: CmdInfo, root: CmdInfo): v return; } + // chat-llm: first arg is LLM name + if (name === 'chat-llm') { + emit(` ${name})`); + emit(' if [[ $((cword - subcmd_pos)) -eq 1 ]]; then'); + emit(' local names'); + emit(' names=$(_mcpctl_resource_names "llms")'); + emit(` COMPREPLY=($(compgen -W "$names ${optFlags}" -- "$cur"))`); + emit(' else'); + emit(` COMPREPLY=($(compgen -W "${optFlags}" -- "$cur"))`); + emit(' fi'); + emit(' return ;;'); + return; + } + // console: first arg is project name if (name === 'console') { emit(` ${name})`); diff --git a/src/cli/src/commands/chat-llm.ts b/src/cli/src/commands/chat-llm.ts new file mode 100644 index 0000000..b2c1d22 --- /dev/null +++ b/src/cli/src/commands/chat-llm.ts @@ -0,0 +1,271 @@ +/** + * `mcpctl chat-llm ` — stateless chat with any registered LLM. + * + * Distinct from `mcpctl chat `: + * - No threads, no history, no tools, no project prompts. + * - Just an OpenAI chat-completions round-trip per turn. + * - Works for both kinds of mcpd-registered LLMs: + * * `kind=public` — direct upstream call (existing behavior). + * * `kind=virtual` — relayed through the publishing mcplocal's SSE + * channel (the v1 virtual-LLM feature). + * + * The CLI doesn't need to know which kind the LLM is; mcpd's + * `/api/v1/llms/:name/infer` route branches on `kind` server-side. + */ +import { Command } from 'commander'; +import http from 'node:http'; +import https from 'node:https'; +import readline from 'node:readline'; +import type { ApiClient } from '../api-client.js'; +import { + formatStats, + installStatusBar, + newPhase, + recordDelta, + STDERR_IS_TTY, + styleStats, + type PhaseStats, + type StatusBar, +} from './chat.js'; + +const STREAM_TIMEOUT_MS = 600_000; + +export interface ChatLlmCommandDeps { + client: ApiClient; + baseUrl: string; + token?: string | undefined; + log: (...args: unknown[]) => void; +} + +export function createChatLlmCommand(deps: ChatLlmCommandDeps): Command { + return new Command('chat-llm') + .description('Stateless chat with any registered LLM (public or virtual). 
No threads, no tools.') + .argument('<name>', 'LLM name (see `mcpctl get llm`)') + .option('-m, --message <text>', 'One-shot: send a single message and exit (no REPL)') + .option('--system <text>', 'Optional system prompt') + .option('--temperature <num>', 'Sampling temperature (0..2)', parseFloat) + .option('--max-tokens <n>', 'Maximum tokens in the assistant reply', parseFloatInt) + .option('--no-stream', 'Disable SSE streaming (single JSON response)') + .action(async (name: string, opts: ChatLlmOpts) => { + await printHeader(deps, name, opts.system); + if (opts.message !== undefined) { + await runOneShot(deps, name, opts); + return; + } + await runRepl(deps, name, opts); + }); +} + +interface ChatLlmOpts { + message?: string; + system?: string; + temperature?: number; + maxTokens?: number; + stream?: boolean; +} + +interface LlmInfo { + name: string; + type: string; + model: string; + kind: 'public' | 'virtual'; + status: 'active' | 'inactive' | 'hibernating'; +} + +async function printHeader(deps: ChatLlmCommandDeps, name: string, systemPrompt?: string): Promise<void> { + let info: LlmInfo; + try { + info = await deps.client.get<LlmInfo>(`/api/v1/llms/${encodeURIComponent(name)}`); + } catch (err) { + process.stderr.write(`(could not fetch LLM metadata: ${(err as Error).message})\n`); + return; + } + const sep = '─'.repeat(60); + const out = (s: string): void => { process.stderr.write(`${styleStats(s)}\n`); }; + out(sep); + out(`LLM: ${info.name} ${info.type} → ${info.model}`); + out(`Kind: ${info.kind} Status: ${info.status}`); + if (systemPrompt !== undefined) { + out(`System: ${systemPrompt.slice(0, 120)}${systemPrompt.length > 120 ? '…' : ''}`); + } + out(sep); +} + +async function runOneShot(deps: ChatLlmCommandDeps, name: string, opts: ChatLlmOpts): Promise<void> { + const messages = buildMessages([], opts.system, opts.message ?? ''); + const bar = opts.stream === false ? null : installStatusBar(); + try { + if (opts.stream === false) { + const reply = await postNonStream(deps, name, messages, opts); + process.stdout.write(`${reply}\n`); + } else { + await streamOnce(deps, name, messages, opts, bar); + } + } finally { + bar?.teardown(); + } +} + +async function runRepl(deps: ChatLlmCommandDeps, name: string, opts: ChatLlmOpts): Promise<void> { + const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); + const ask = (q: string): Promise<string> => new Promise<string>((resolve) => rl.question(q, resolve)); + const history: Array<{ role: 'user' | 'assistant'; content: string }> = []; + + const bar = opts.stream === false ? null : installStatusBar(); + process.stderr.write(`Stateless chat with LLM '${name}'.
Ctrl-D to exit.\n`); + + try { + while (true) { + let line: string; + try { line = await ask('> '); } catch { break; } + if (line === '') continue; + + const messages = buildMessages(history, opts.system, line); + try { + let reply: string; + if (opts.stream === false) { + reply = await postNonStream(deps, name, messages, opts); + process.stdout.write(`${reply}\n`); + } else { + reply = await streamOnce(deps, name, messages, opts, bar); + process.stdout.write('\n'); + } + history.push({ role: 'user', content: line }); + history.push({ role: 'assistant', content: reply }); + } catch (err) { + process.stderr.write(`error: ${(err as Error).message}\n`); + } + } + rl.close(); + } finally { + bar?.teardown(); + } +} + +function buildMessages( + history: Array<{ role: 'user' | 'assistant'; content: string }>, + system: string | undefined, + user: string, +): Array<{ role: 'system' | 'user' | 'assistant'; content: string }> { + const out: Array<{ role: 'system' | 'user' | 'assistant'; content: string }> = []; + if (system !== undefined && system !== '') out.push({ role: 'system', content: system }); + out.push(...history); + out.push({ role: 'user', content: user }); + return out; +} + +async function postNonStream( + deps: ChatLlmCommandDeps, + name: string, + messages: Array<{ role: string; content: string }>, + opts: ChatLlmOpts, +): Promise { + const body: Record = { messages }; + if (opts.temperature !== undefined) body['temperature'] = opts.temperature; + if (opts.maxTokens !== undefined) body['max_tokens'] = opts.maxTokens; + const res = await deps.client.post<{ + choices?: Array<{ message?: { content?: string } }>; + }>(`/api/v1/llms/${encodeURIComponent(name)}/infer`, body); + return res.choices?.[0]?.message?.content ?? ''; +} + +/** + * Stream a single chat call against /api/v1/llms/:name/infer with stream=true. + * The response is OpenAI-style SSE (`data: `). + * Returns the assembled assistant content. + */ +function streamOnce( + deps: ChatLlmCommandDeps, + name: string, + messages: Array<{ role: string; content: string }>, + opts: ChatLlmOpts, + bar: StatusBar | null, +): Promise { + const url = new URL(`${deps.baseUrl}/api/v1/llms/${encodeURIComponent(name)}/infer`); + const reqBody: Record = { messages, stream: true }; + if (opts.temperature !== undefined) reqBody['temperature'] = opts.temperature; + if (opts.maxTokens !== undefined) reqBody['max_tokens'] = opts.maxTokens; + const payload = JSON.stringify(reqBody); + const stats = { thinking: newPhase(), content: newPhase() } satisfies { thinking: PhaseStats; content: PhaseStats }; + + const TICK_MS = 250; + let timer: NodeJS.Timeout | null = null; + function startTicker(): void { + if (timer !== null || bar === null) return; + timer = setInterval(() => bar.update(formatStats(stats, true)), TICK_MS); + } + function stopTicker(): void { + if (timer !== null) { clearInterval(timer); timer = null; } + } + + return new Promise((resolve, reject) => { + let assistant = ''; + const driver = url.protocol === 'https:' ? https : http; + const req = driver.request({ + hostname: url.hostname, + port: url.port || (url.protocol === 'https:' ? 443 : 80), + path: url.pathname + url.search, + method: 'POST', + timeout: STREAM_TIMEOUT_MS, + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream', + ...(deps.token !== undefined ? { Authorization: `Bearer ${deps.token}` } : {}), + }, + }, (res) => { + const status = res.statusCode ?? 
0; + if (status >= 400) { + const chunks: Buffer[] = []; + res.on('data', (c: Buffer) => chunks.push(c)); + res.on('end', () => reject(new Error(`HTTP ${String(status)}: ${Buffer.concat(chunks).toString('utf-8')}`))); + return; + } + let buf = ''; + res.setEncoding('utf-8'); + res.on('data', (chunk: string) => { + buf += chunk; + let nl: number; + while ((nl = buf.indexOf('\n\n')) !== -1) { + const frame = buf.slice(0, nl); + buf = buf.slice(nl + 2); + for (const line of frame.split('\n')) { + if (!line.startsWith('data: ')) continue; + const data = line.slice(6); + if (data === '[DONE]') continue; + try { + const parsed = JSON.parse(data) as { choices?: Array<{ delta?: { content?: string } }> }; + const piece = parsed.choices?.[0]?.delta?.content; + if (typeof piece === 'string' && piece !== '') { + recordDelta(stats.content, piece); + process.stdout.write(piece); + assistant += piece; + startTicker(); + } + } catch { + // ignore malformed frames + } + } + } + }); + res.on('end', () => { + stopTicker(); + const final = formatStats(stats, false); + if (final !== '' && STDERR_IS_TTY) process.stderr.write(`\n${styleStats(`(${final})`)}`); + else if (final !== '') process.stderr.write(`\n(${final})`); + if (bar !== null && final !== '') bar.update(final); + resolve(assistant); + }); + res.on('error', (err) => { stopTicker(); reject(err); }); + }); + req.on('error', (err) => { stopTicker(); reject(err); }); + req.on('timeout', () => { stopTicker(); req.destroy(); reject(new Error('chat-llm stream timed out')); }); + req.write(payload); + req.end(); + }); +} + +function parseFloatInt(value: string): number { + const n = Number(value); + if (!Number.isInteger(n)) throw new Error(`expected integer, got '${value}'`); + return n; +} diff --git a/src/cli/src/commands/chat.ts b/src/cli/src/commands/chat.ts index e302cb6..edece9c 100644 --- a/src/cli/src/commands/chat.ts +++ b/src/cli/src/commands/chat.ts @@ -525,24 +525,24 @@ interface ChatStreamFrame { // ANSI codes for the reasoning sidebar. Dim + italic visually separates // reasoning ("the model is thinking") from final assistant content. We only // emit the codes when stderr is a TTY — piping to a file should stay clean. -const ANSI_DIM_ITALIC = '\x1b[2;3m'; -const ANSI_DIM = '\x1b[2m'; -const ANSI_RESET = '\x1b[0m'; -const STDERR_IS_TTY = process.stderr.isTTY === true; -function styleThinking(s: string): string { +export const ANSI_DIM_ITALIC = '\x1b[2;3m'; +export const ANSI_DIM = '\x1b[2m'; +export const ANSI_RESET = '\x1b[0m'; +export const STDERR_IS_TTY = process.stderr.isTTY === true; +export function styleThinking(s: string): string { return STDERR_IS_TTY ? `${ANSI_DIM_ITALIC}${s}${ANSI_RESET}` : s; } -function styleStats(s: string): string { +export function styleStats(s: string): string { return STDERR_IS_TTY ? 
`${ANSI_DIM}${s}${ANSI_RESET}` : s; } -interface PhaseStats { +export interface PhaseStats { words: number; firstMs: number; lastMs: number; } -function newPhase(): PhaseStats { return { words: 0, firstMs: 0, lastMs: 0 }; } -function recordDelta(p: PhaseStats, delta: string): void { +export function newPhase(): PhaseStats { return { words: 0, firstMs: 0, lastMs: 0 }; } +export function recordDelta(p: PhaseStats, delta: string): void { const now = Date.now(); if (p.firstMs === 0) p.firstMs = now; p.lastMs = now; @@ -558,7 +558,7 @@ function formatPhase(label: string, p: PhaseStats): string | null { const rate = p.words / sec; return `${label}${String(p.words)}w · ${rate.toFixed(1)} w/s · ${sec.toFixed(1)}s`; } -function formatStats(s: { thinking: PhaseStats; content: PhaseStats }, partial: boolean): string { +export function formatStats(s: { thinking: PhaseStats; content: PhaseStats }, partial: boolean): string { const parts: string[] = []; const c = formatPhase('', s.content); if (c !== null) parts.push(c); @@ -588,12 +588,12 @@ function formatStats(s: { thinking: PhaseStats; content: PhaseStats }, partial: * a foreign terminal in a half-locked state if Ctrl-C / uncaught exception * fires mid-stream. */ -interface StatusBar { +export interface StatusBar { update(text: string): void; teardown(): void; } -function installStatusBar(): StatusBar | null { +export function installStatusBar(): StatusBar | null { const out = process.stdout; if (!out.isTTY) return null; const initialRows = out.rows; diff --git a/src/cli/src/commands/get.ts b/src/cli/src/commands/get.ts index b56ec8d..c55f572 100644 --- a/src/cli/src/commands/get.ts +++ b/src/cli/src/commands/get.ts @@ -132,10 +132,16 @@ interface LlmRow { url: string; description: string; apiKeyRef: { name: string; key: string } | null; + // Virtual-provider lifecycle (optional for backward compat with older + // mcpd responses that predate the kind/status columns). + kind?: 'public' | 'virtual'; + status?: 'active' | 'inactive' | 'hibernating'; } const llmColumns: Column[] = [ { header: 'NAME', key: 'name' }, + { header: 'KIND', key: (r) => r.kind ?? 'public', width: 8 }, + { header: 'STATUS', key: (r) => r.status ?? 'active', width: 12 }, { header: 'TYPE', key: 'type', width: 12 }, { header: 'MODEL', key: 'model', width: 28 }, { header: 'TIER', key: 'tier', width: 8 }, diff --git a/src/cli/src/index.ts b/src/cli/src/index.ts index 6a0485f..4f54215 100644 --- a/src/cli/src/index.ts +++ b/src/cli/src/index.ts @@ -19,6 +19,7 @@ import { createPatchCommand } from './commands/patch.js'; import { createConsoleCommand } from './commands/console/index.js'; import { createCacheCommand } from './commands/cache.js'; import { createChatCommand } from './commands/chat.js'; +import { createChatLlmCommand } from './commands/chat-llm.js'; import { createMigrateCommand } from './commands/migrate.js'; import { createRotateCommand } from './commands/rotate.js'; import { ApiClient, ApiError } from './api-client.js'; @@ -241,6 +242,13 @@ export function createProgram(): Command { log: (...args) => console.log(...args), })); + program.addCommand(createChatLlmCommand({ + client, + baseUrl, + ...(creds?.token !== undefined ? 
{ token: creds.token } : {}), + log: (...args) => console.log(...args), + })); + program.addCommand(createPatchCommand({ client, log: (...args) => console.log(...args), diff --git a/src/db/prisma/migrations/20260427125811_add_virtual_llm_lifecycle/migration.sql b/src/db/prisma/migrations/20260427125811_add_virtual_llm_lifecycle/migration.sql new file mode 100644 index 0000000..826398d --- /dev/null +++ b/src/db/prisma/migrations/20260427125811_add_virtual_llm_lifecycle/migration.sql @@ -0,0 +1,16 @@ +-- Add Llm.kind/status discriminators and virtual-provider lifecycle fields. +-- Existing rows backfill with kind='public' / status='active' so v1 is purely +-- additive — public LLMs ignore the lifecycle columns entirely. + +CREATE TYPE "LlmKind" AS ENUM ('public', 'virtual'); +CREATE TYPE "LlmStatus" AS ENUM ('active', 'inactive', 'hibernating'); + +ALTER TABLE "Llm" + ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public', + ADD COLUMN "providerSessionId" TEXT, + ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3), + ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active', + ADD COLUMN "inactiveSince" TIMESTAMP(3); + +CREATE INDEX "Llm_kind_status_idx" ON "Llm"("kind", "status"); +CREATE INDEX "Llm_providerSessionId_idx" ON "Llm"("providerSessionId"); diff --git a/src/db/prisma/schema.prisma b/src/db/prisma/schema.prisma index a52f1ae..dfc83e5 100644 --- a/src/db/prisma/schema.prisma +++ b/src/db/prisma/schema.prisma @@ -182,21 +182,44 @@ model Secret { // provider API key server-side so credentials never leave the cluster. // Credentials are stored by reference: `apiKeySecret` points at a Secret, and // `apiKeySecretKey` names the key within that secret's data. +// +// `kind=virtual` rows are *registered by an mcplocal client* (rather than a +// human via `mcpctl create llm`). Their inference is relayed back through +// the SSE control channel to the publishing mcplocal session. The lifecycle +// fields (lastHeartbeatAt, status, inactiveSince) belong to virtual rows; +// public rows ignore them. + +enum LlmKind { + public // upstream-URL row, mcpd calls directly + virtual // mcplocal-registered, inference relayed via SSE control channel +} + +enum LlmStatus { + active // healthy, accepting requests + inactive // publisher went away; row pending 4-h GC + hibernating // publisher present but backend asleep — wakes on demand (v2) +} model Llm { - id String @id @default(cuid()) - name String @unique - type String // anthropic | openai | deepseek | vllm | ollama | gemini-cli - model String // e.g. claude-3-5-sonnet-20241022 - url String @default("") // endpoint (empty for provider default) - tier String @default("fast") // fast | heavy - description String @default("") - apiKeySecretId String? // FK to Secret - apiKeySecretKey String? // key inside the Secret's data - extraConfig Json @default("{}") // per-type extras - version Int @default(1) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt + id String @id @default(cuid()) + name String @unique + type String // anthropic | openai | deepseek | vllm | ollama | gemini-cli + model String // e.g. claude-3-5-sonnet-20241022 + url String @default("") // endpoint (empty for provider default) + tier String @default("fast") // fast | heavy + description String @default("") + apiKeySecretId String? // FK to Secret + apiKeySecretKey String? 
// key inside the Secret's data + extraConfig Json @default("{}") // per-type extras + // ── Virtual-provider lifecycle (NULL/default for kind=public) ── + kind LlmKind @default(public) + providerSessionId String? // mcplocal session that owns this row when virtual + lastHeartbeatAt DateTime? // bumped on every publisher heartbeat + status LlmStatus @default(active) + inactiveSince DateTime? // when status flipped from active; used for 4-h GC + version Int @default(1) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt apiKeySecret Secret? @relation(fields: [apiKeySecretId], references: [id], onDelete: SetNull) agents Agent[] @@ -204,6 +227,8 @@ model Llm { @@index([name]) @@index([tier]) @@index([apiKeySecretId]) + @@index([kind, status]) + @@index([providerSessionId]) } // ── Groups ── diff --git a/src/db/tests/llm-virtual-schema.test.ts b/src/db/tests/llm-virtual-schema.test.ts new file mode 100644 index 0000000..12fdfde --- /dev/null +++ b/src/db/tests/llm-virtual-schema.test.ts @@ -0,0 +1,136 @@ +import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest'; +import type { PrismaClient } from '@prisma/client'; +import { setupTestDb, cleanupTestDb, clearAllTables } from './helpers.js'; + +describe('llm virtual-provider schema', () => { + let prisma: PrismaClient; + + beforeAll(async () => { + prisma = await setupTestDb(); + }, 30_000); + + afterAll(async () => { + await cleanupTestDb(); + }); + + beforeEach(async () => { + await clearAllTables(prisma); + }); + + it('defaults a freshly inserted Llm to kind=public, status=active', async () => { + const llm = await prisma.llm.create({ + data: { name: 'plain', type: 'openai', model: 'gpt-4o' }, + }); + expect(llm.kind).toBe('public'); + expect(llm.status).toBe('active'); + expect(llm.providerSessionId).toBeNull(); + expect(llm.lastHeartbeatAt).toBeNull(); + expect(llm.inactiveSince).toBeNull(); + }); + + it('persists kind=virtual + lifecycle fields together', async () => { + const now = new Date(); + const llm = await prisma.llm.create({ + data: { + name: 'vllm-local', + type: 'openai', + model: 'Qwen/Qwen2.5-7B-Instruct-AWQ', + kind: 'virtual', + providerSessionId: 'sess-abc', + lastHeartbeatAt: now, + status: 'active', + }, + }); + expect(llm.kind).toBe('virtual'); + expect(llm.providerSessionId).toBe('sess-abc'); + expect(llm.lastHeartbeatAt?.getTime()).toBe(now.getTime()); + expect(llm.status).toBe('active'); + }); + + it('flips status active → inactive and records inactiveSince', async () => { + const llm = await prisma.llm.create({ + data: { + name: 'goingaway', + type: 'openai', + model: 'm', + kind: 'virtual', + providerSessionId: 's1', + }, + }); + const flippedAt = new Date(); + await prisma.llm.update({ + where: { id: llm.id }, + data: { status: 'inactive', inactiveSince: flippedAt }, + }); + const reloaded = await prisma.llm.findUnique({ where: { id: llm.id } }); + expect(reloaded?.status).toBe('inactive'); + expect(reloaded?.inactiveSince?.getTime()).toBe(flippedAt.getTime()); + }); + + it('hibernating is a valid LlmStatus value (reserved for v2 wake path)', async () => { + const llm = await prisma.llm.create({ + data: { + name: 'sleepy', + type: 'openai', + model: 'm', + kind: 'virtual', + providerSessionId: 's-sleep', + status: 'hibernating', + }, + }); + expect(llm.status).toBe('hibernating'); + }); + + it('rejects unknown enum values for kind / status', async () => { + await expect( + prisma.llm.create({ + // Cast through unknown — runtime test of the enum constraint, not TS. 
+ data: ({ name: 'bad', type: 'openai', model: 'm', kind: 'made-up' } as unknown) as Parameters[0]['data'], + }), + ).rejects.toThrow(); + + await expect( + prisma.llm.create({ + data: ({ name: 'bad2', type: 'openai', model: 'm', status: 'unknown' } as unknown) as Parameters[0]['data'], + }), + ).rejects.toThrow(); + }); + + it('finds virtual rows by (kind, status) cheaply', async () => { + // Mix of public + virtual + assorted statuses — confirms the + // @@index([kind, status]) covers the GC sweep query. + await prisma.llm.create({ data: { name: 'pub-1', type: 'openai', model: 'm' } }); + await prisma.llm.create({ data: { name: 'pub-2', type: 'openai', model: 'm' } }); + await prisma.llm.create({ + data: { name: 'v-1', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 's1', status: 'active' }, + }); + await prisma.llm.create({ + data: { name: 'v-2', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() }, + }); + + const stale = await prisma.llm.findMany({ + where: { kind: 'virtual', status: 'inactive' }, + select: { name: true }, + }); + expect(stale.map((l) => l.name)).toEqual(['v-2']); + }); + + it('finds rows by providerSessionId (used on mcplocal reconnect)', async () => { + await prisma.llm.create({ + data: { name: 'a', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 'shared' }, + }); + await prisma.llm.create({ + data: { name: 'b', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 'shared' }, + }); + await prisma.llm.create({ + data: { name: 'c', type: 'openai', model: 'm', kind: 'virtual', providerSessionId: 'other' }, + }); + + const owned = await prisma.llm.findMany({ + where: { providerSessionId: 'shared' }, + select: { name: true }, + orderBy: { name: 'asc' }, + }); + expect(owned.map((l) => l.name)).toEqual(['a', 'b']); + }); +}); diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index fc05068..03ce6fd 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -40,6 +40,8 @@ import { ChatToolDispatcherImpl } from './services/chat-tool-dispatcher.js'; import { LlmAdapterRegistry } from './services/llm/dispatcher.js'; import { registerLlmRoutes } from './routes/llms.js'; import { registerLlmInferRoutes } from './routes/llm-infer.js'; +import { registerVirtualLlmRoutes } from './routes/virtual-llms.js'; +import { VirtualLlmService } from './services/virtual-llm.service.js'; import { registerAgentRoutes } from './routes/agents.js'; import { registerAgentChatRoutes } from './routes/agent-chat.js'; import { PromptRepository } from './repositories/prompt.repository.js'; @@ -433,6 +435,10 @@ async function main(): Promise { adapters: llmAdapters, log: { warn: (msg) => app.log.warn(msg) }, }); + // Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker + // is started below after `app.listen` so it doesn't fire before the + // server is accepting traffic. + const virtualLlmService = new VirtualLlmService(llmRepo); // AgentService + ChatService get fully wired below once projectService and // mcpProxyService are constructed (ChatService needs them via the // ChatToolDispatcher bridge). 
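The GC timer registered further down in main.ts delegates to `virtualLlmService.gcSweep()`, whose body is not shown in this excerpt (the service file below is cut off before it). As a rough sketch of what that sweep does — an assumption built on the repository helpers this PR adds (`findStaleVirtuals`, `findExpiredInactives`) and the documented 90 s / 4 h cutoffs, not the shipped implementation:

```ts
// Sketch only — assumed shape of the sweep the 60 s timer calls.
import type { ILlmRepository } from '../repositories/llm.repository.js';

const HEARTBEAT_TIMEOUT_MS = 90_000;              // active → inactive after 90 s of silence
const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000; // inactive → deleted after 4 h

async function gcSweepSketch(
  repo: ILlmRepository,
  now: Date = new Date(),
): Promise<{ markedInactive: number; deleted: number }> {
  // 1. Active virtuals whose last heartbeat is older than 90 s flip to inactive.
  const stale = await repo.findStaleVirtuals(new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS));
  for (const row of stale) {
    await repo.update(row.id, { status: 'inactive', inactiveSince: now });
  }
  // 2. Virtuals that have sat inactive for longer than 4 h are deleted outright.
  const expired = await repo.findExpiredInactives(new Date(now.getTime() - INACTIVE_RETENTION_MS));
  for (const row of expired) {
    await repo.delete(row.id);
  }
  return { markedInactive: stale.length, deleted: expired.length };
}
```

Both queries are covered by the new `@@index([kind, status])`, which is why the sweep stays cheap when no virtual rows exist.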
@@ -606,6 +612,7 @@ async function main(): Promise { registerLlmInferRoutes(app, { llmService, adapters: llmAdapters, + virtualLlms: virtualLlmService, onInferenceEvent: (event) => { app.log.info({ event: 'llm_inference_call', @@ -620,6 +627,7 @@ async function main(): Promise { }); }, }); + registerVirtualLlmRoutes(app, virtualLlmService); registerInstanceRoutes(app, instanceService); registerProjectRoutes(app, projectService); registerAuditLogRoutes(app, auditLogService); @@ -753,6 +761,21 @@ async function main(): Promise { } }, RECONCILE_INTERVAL_MS); + // Virtual-LLM GC sweep — flips heartbeat-stale rows to inactive (90 s + // cutoff) and deletes inactives past the 4 h retention window. Runs + // every 60 s; cheap (two indexed queries) when there are no virtuals. + const VIRTUAL_LLM_GC_INTERVAL_MS = 60_000; + const virtualLlmGcTimer = setInterval(async () => { + try { + const { markedInactive, deleted } = await virtualLlmService.gcSweep(); + if (markedInactive > 0 || deleted > 0) { + app.log.info(`[virtual-llm gc] markedInactive=${String(markedInactive)} deleted=${String(deleted)}`); + } + } catch (err) { + app.log.error({ err }, 'Virtual LLM GC sweep failed'); + } + }, VIRTUAL_LLM_GC_INTERVAL_MS); + // Health probe runner — periodic MCP probes (like k8s livenessProbe). // Without explicit healthCheck.tool, probes send tools/list through // McpProxyService so they traverse the exact production call path. @@ -787,6 +810,7 @@ async function main(): Promise { setupGracefulShutdown(app, { disconnectDb: async () => { clearInterval(reconcileTimer); + clearInterval(virtualLlmGcTimer); healthProbeRunner.stop(); secretBackendRotatorLoop.stop(); gitBackup.stop(); diff --git a/src/mcpd/src/repositories/llm.repository.ts b/src/mcpd/src/repositories/llm.repository.ts index 92f839a..06a5868 100644 --- a/src/mcpd/src/repositories/llm.repository.ts +++ b/src/mcpd/src/repositories/llm.repository.ts @@ -1,4 +1,4 @@ -import type { PrismaClient, Llm, Prisma } from '@prisma/client'; +import type { PrismaClient, Llm, Prisma, LlmKind, LlmStatus } from '@prisma/client'; export interface CreateLlmInput { name: string; @@ -10,9 +10,16 @@ export interface CreateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + // Virtual-provider lifecycle (omit for kind=public). + kind?: LlmKind; + providerSessionId?: string | null; + status?: LlmStatus; + lastHeartbeatAt?: Date | null; + inactiveSince?: Date | null; } export interface UpdateLlmInput { + type?: string; model?: string; url?: string; tier?: string; @@ -20,6 +27,13 @@ export interface UpdateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + // Virtual-provider lifecycle. VirtualLlmService is the only writer for + // these in v1; the public CRUD path leaves them undefined. + kind?: LlmKind; + providerSessionId?: string | null; + status?: LlmStatus; + lastHeartbeatAt?: Date | null; + inactiveSince?: Date | null; } export interface ILlmRepository { @@ -30,6 +44,10 @@ export interface ILlmRepository { create(data: CreateLlmInput): Promise; update(id: string, data: UpdateLlmInput): Promise; delete(id: string): Promise; + // Virtual-provider lifecycle helpers (called by VirtualLlmService). 
+ findBySessionId(sessionId: string): Promise<Llm[]>; + findStaleVirtuals(heartbeatCutoff: Date): Promise<Llm[]>; + findExpiredInactives(deletionCutoff: Date): Promise<Llm[]>; } export class LlmRepository implements ILlmRepository { @@ -63,12 +81,18 @@ export class LlmRepository implements ILlmRepository { apiKeySecretId: data.apiKeySecretId ?? null, apiKeySecretKey: data.apiKeySecretKey ?? null, extraConfig: (data.extraConfig ?? {}) as Prisma.InputJsonValue, + ...(data.kind !== undefined ? { kind: data.kind } : {}), + ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), + ...(data.status !== undefined ? { status: data.status } : {}), + ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}), + ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}), }, }); } async update(id: string, data: UpdateLlmInput): Promise<Llm> { const updateData: Prisma.LlmUpdateInput = {}; + if (data.type !== undefined) updateData.type = data.type; if (data.model !== undefined) updateData.model = data.model; if (data.url !== undefined) updateData.url = data.url; if (data.tier !== undefined) updateData.tier = data.tier; @@ -80,10 +104,54 @@ } if (data.apiKeySecretKey !== undefined) updateData.apiKeySecretKey = data.apiKeySecretKey; if (data.extraConfig !== undefined) updateData.extraConfig = data.extraConfig as Prisma.InputJsonValue; + if (data.kind !== undefined) updateData.kind = data.kind; + if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId; + if (data.status !== undefined) updateData.status = data.status; + if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt; + if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince; return this.prisma.llm.update({ where: { id }, data: updateData }); } async delete(id: string): Promise<void> { await this.prisma.llm.delete({ where: { id } }); } + + // ── Virtual-provider lifecycle queries ── + + async findBySessionId(sessionId: string): Promise<Llm[]> { + return this.prisma.llm.findMany({ + where: { providerSessionId: sessionId }, + orderBy: { name: 'asc' }, + }); + } + + /** + * Virtuals whose lastHeartbeatAt is older than the cutoff and are still + * marked active. The GC sweep flips these to `inactive`. Public rows + * never have lastHeartbeatAt set so they're naturally excluded by the + * non-null compare on `lt`. + */ + async findStaleVirtuals(heartbeatCutoff: Date): Promise<Llm[]> { + return this.prisma.llm.findMany({ + where: { + kind: 'virtual', + status: 'active', + lastHeartbeatAt: { lt: heartbeatCutoff }, + }, + }); + } + + /** + * Virtuals that have been inactive longer than the deletion cutoff (4h + by default). The GC sweep removes these. 
+ */ + async findExpiredInactives(deletionCutoff: Date): Promise { + return this.prisma.llm.findMany({ + where: { + kind: 'virtual', + status: 'inactive', + inactiveSince: { lt: deletionCutoff }, + }, + }); + } } diff --git a/src/mcpd/src/routes/llm-infer.ts b/src/mcpd/src/routes/llm-infer.ts index 6c93cc9..38a84f7 100644 --- a/src/mcpd/src/routes/llm-infer.ts +++ b/src/mcpd/src/routes/llm-infer.ts @@ -15,12 +15,20 @@ import type { FastifyInstance, FastifyReply } from 'fastify'; import type { LlmService } from '../services/llm.service.js'; import type { LlmAdapterRegistry } from '../services/llm/dispatcher.js'; +import type { VirtualLlmService } from '../services/virtual-llm.service.js'; import { NotFoundError } from '../services/mcp-server.service.js'; import type { OpenAiChatRequest, InferContext } from '../services/llm/types.js'; export interface LlmInferDeps { llmService: LlmService; adapters: LlmAdapterRegistry; + /** + * Optional. When provided, requests for `kind=virtual` Llm rows are + * fanned through the SSE control channel rather than calling an + * upstream URL directly. Required for v1 of the virtual-LLM feature; + * absent in older test configurations. + */ + virtualLlms?: VirtualLlmService; /** Optional hook to emit audit events — consumer may ignore. */ onInferenceEvent?: (event: InferenceAuditEvent) => void; } @@ -62,6 +70,73 @@ export function registerLlmInferRoutes( return { error: 'messages is required' }; } + const streaming = body.stream === true; + + const audit = (status: number): void => { + if (deps.onInferenceEvent === undefined) return; + deps.onInferenceEvent({ + kind: 'llm_inference_call', + llmName: llm.name, + model: llm.model, + type: llm.type, + userId: request.userId, + tokenSha: request.mcpToken?.tokenSha, + streaming, + durationMs: Date.now() - started, + status, + }); + }; + + // ── Virtual-provider branch ── + // For kind=virtual rows there is no upstream URL — inference is fanned + // through the SSE control channel back to the publishing mcplocal. + // VirtualLlmService.enqueueInferTask handles the routing. + if (llm.kind === 'virtual') { + if (deps.virtualLlms === undefined) { + reply.code(500); + audit(500); + return { error: 'virtual LLM dispatch unavailable (server misconfiguration)' }; + } + try { + if (!streaming) { + const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, false); + const result = await ref.done; + reply.code(result.status); + audit(result.status); + return result.body; + } + // Streaming: open SSE response, fan chunks from the result stream + // into outgoing SSE frames. + reply.raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, true); + const unsubscribe = ref.onChunk((chunk) => writeSseChunk(reply, chunk.data)); + try { + await ref.done; + audit(200); + } catch (err) { + const payload = JSON.stringify({ error: err instanceof Error ? err.message : String(err) }); + writeSseChunk(reply, payload); + audit(502); + } finally { + unsubscribe(); + if (!reply.raw.writableEnded) reply.raw.end(); + } + return reply; + } catch (err) { + const status = (err as { statusCode?: number }).statusCode ?? 502; + reply.code(status); + audit(status); + return { error: err instanceof Error ? err.message : String(err) }; + } + } + + // ── Public-provider branch (existing behavior) ── + // Resolve API key (may be empty string for providers that don't take one). 
let apiKey = ''; if (llm.apiKeyRef !== null) { @@ -82,22 +157,6 @@ export function registerLlmInferRoutes( }; const adapter = deps.adapters.get(llm.type); - const streaming = body.stream === true; - - const audit = (status: number): void => { - if (deps.onInferenceEvent === undefined) return; - deps.onInferenceEvent({ - kind: 'llm_inference_call', - llmName: llm.name, - model: llm.model, - type: llm.type, - userId: request.userId, - tokenSha: request.mcpToken?.tokenSha, - streaming, - durationMs: Date.now() - started, - status, - }); - }; if (!streaming) { try { diff --git a/src/mcpd/src/routes/virtual-llms.ts b/src/mcpd/src/routes/virtual-llms.ts new file mode 100644 index 0000000..59664cb --- /dev/null +++ b/src/mcpd/src/routes/virtual-llms.ts @@ -0,0 +1,174 @@ +/** + * Routes for the virtual-LLM control plane (`kind=virtual` Llm rows). + * + * POST /api/v1/llms/_provider-register — register/refresh, returns sessionId + * GET /api/v1/llms/_provider-stream — SSE channel; mcpd → mcplocal task fan-out + * POST /api/v1/llms/_provider-heartbeat — keep-alive (every 30 s from mcplocal) + * POST /api/v1/llms/_provider-task/:id/result — mcplocal pushes result/chunks back + * + * RBAC: these all live under `/api/v1/llms/...` so the existing + * `mapUrlToPermission` in main.ts maps them to the `llms` resource — + * POST = create:llms, GET = view:llms. That's appropriate: publishing + * a virtual LLM is morally the same as creating one. + * + * Inference for virtual rows still lands on `/api/v1/llms/:name/infer` + * (unchanged URL); that route gains a `kind=virtual` branch in this stage + * and delegates here via VirtualLlmService. + */ +import type { FastifyInstance, FastifyReply } from 'fastify'; +import type { VirtualLlmService, VirtualSessionHandle, VirtualTaskFrame } from '../services/virtual-llm.service.js'; + +const SSE_PING_MS = 20_000; +const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session'; + +export function registerVirtualLlmRoutes( + app: FastifyInstance, + service: VirtualLlmService, +): void { + app.post<{ Body: { providerSessionId?: string; providers?: unknown[] } }>( + '/api/v1/llms/_provider-register', + async (request, reply) => { + const body = (request.body ?? {}); + const providers = Array.isArray(body.providers) ? body.providers : null; + if (providers === null || providers.length === 0) { + reply.code(400); + return { error: '`providers` array is required and must be non-empty' }; + } + + try { + const result = await service.register({ + providerSessionId: body.providerSessionId ?? null, + providers: providers.map(coerceProviderInput), + }); + reply.code(201); + return result; + } catch (err) { + const status = (err as { statusCode?: number }).statusCode ?? 500; + reply.code(status); + return { error: (err as Error).message }; + } + }, + ); + + app.get('/api/v1/llms/_provider-stream', (request, reply): FastifyReply => { + const sessionHeader = request.headers[PROVIDER_SESSION_HEADER]; + const sessionId = typeof sessionHeader === 'string' ? 
sessionHeader : null; + if (sessionId === null || sessionId === '') { + reply.code(400); + void reply.send({ error: `${PROVIDER_SESSION_HEADER} header is required (call /_provider-register first)` }); + return reply; + } + + reply.raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + + const handle: VirtualSessionHandle = { + pushTask(task: VirtualTaskFrame): void { + if (reply.raw.destroyed || reply.raw.writableEnded) return; + reply.raw.write(`event: task\ndata: ${JSON.stringify(task)}\n\n`); + }, + get alive(): boolean { + return !reply.raw.destroyed && !reply.raw.writableEnded; + }, + }; + + service.bindSession(sessionId, handle); + reply.raw.write(`event: hello\ndata: ${JSON.stringify({ sessionId })}\n\n`); + + // Keep-alive comment lines so proxies (Cilium, k8s ingress) don't time + // out an idle SSE connection. + const pingTimer = setInterval(() => { + if (reply.raw.destroyed || reply.raw.writableEnded) return; + reply.raw.write(`: ping\n\n`); + }, SSE_PING_MS); + + request.raw.on('close', () => { + clearInterval(pingTimer); + service.unbindSession(sessionId).catch((err: unknown) => { + app.log.warn({ err, sessionId }, 'unbindSession failed'); + }); + }); + + return reply; + }); + + app.post<{ Body: { providerSessionId?: string } }>( + '/api/v1/llms/_provider-heartbeat', + async (request, reply) => { + const sessionId = request.body?.providerSessionId; + if (typeof sessionId !== 'string' || sessionId === '') { + reply.code(400); + return { error: 'providerSessionId required' }; + } + await service.heartbeat(sessionId); + return { ok: true }; + }, + ); + + app.post<{ + Params: { taskId: string }; + Body: { + status?: number; + body?: unknown; + chunk?: { data: string; done?: boolean }; + error?: string; + }; + }>( + '/api/v1/llms/_provider-task/:taskId/result', + async (request, reply) => { + const { taskId } = request.params; + const body = request.body ?? {}; + + if (typeof body.error === 'string' && body.error !== '') { + const ok = service.failTask(taskId, new Error(body.error)); + return { ok }; + } + if (body.chunk !== undefined && typeof body.chunk.data === 'string') { + const ok = service.pushTaskChunk(taskId, body.chunk); + return { ok }; + } + if (typeof body.status === 'number') { + const ok = service.completeTask(taskId, { status: body.status, body: body.body }); + return { ok }; + } + + reply.code(400); + return { error: 'body must contain one of: { error }, { chunk: { data, done? } }, { status, body }' }; + }, + ); +} + +/** Narrow an unknown providers array element into the service's input shape. 
*/ +function coerceProviderInput(raw: unknown): { + name: string; + type: string; + model: string; + tier?: string; + description?: string; + extraConfig?: Record; +} { + if (raw === null || typeof raw !== 'object') { + throw Object.assign(new Error('provider entry must be an object'), { statusCode: 400 }); + } + const o = raw as Record; + const name = o['name']; + const type = o['type']; + const model = o['model']; + if (typeof name !== 'string' || typeof type !== 'string' || typeof model !== 'string') { + throw Object.assign( + new Error('provider entry requires string `name`, `type`, `model`'), + { statusCode: 400 }, + ); + } + const out: ReturnType = { name, type, model }; + if (typeof o['tier'] === 'string') out.tier = o['tier']; + if (typeof o['description'] === 'string') out.description = o['description']; + if (o['extraConfig'] !== null && typeof o['extraConfig'] === 'object') { + out.extraConfig = o['extraConfig'] as Record; + } + return out; +} diff --git a/src/mcpd/src/services/llm.service.ts b/src/mcpd/src/services/llm.service.ts index 6dd3932..92c5edd 100644 --- a/src/mcpd/src/services/llm.service.ts +++ b/src/mcpd/src/services/llm.service.ts @@ -50,6 +50,11 @@ export interface LlmView { description: string; apiKeyRef: ApiKeyRef | null; extraConfig: Record; + // Virtual-provider lifecycle (kind defaults to 'public' for legacy rows). + kind: 'public' | 'virtual'; + status: 'active' | 'inactive' | 'hibernating'; + lastHeartbeatAt: Date | null; + inactiveSince: Date | null; version: number; createdAt: Date; updatedAt: Date; @@ -275,6 +280,10 @@ export class LlmService { description: row.description, apiKeyRef, extraConfig: row.extraConfig as Record, + kind: row.kind, + status: row.status, + lastHeartbeatAt: row.lastHeartbeatAt, + inactiveSince: row.inactiveSince, version: row.version, createdAt: row.createdAt, updatedAt: row.updatedAt, diff --git a/src/mcpd/src/services/virtual-llm.service.ts b/src/mcpd/src/services/virtual-llm.service.ts new file mode 100644 index 0000000..f242843 --- /dev/null +++ b/src/mcpd/src/services/virtual-llm.service.ts @@ -0,0 +1,327 @@ +/** + * VirtualLlmService — lifecycle for `kind=virtual` Llm rows. + * + * The story end-to-end: + * 1. mcplocal POSTs `/api/v1/llms/_provider-register` with the providers + * it wants to publish. We upsert each into the `Llm` table marked + * kind=virtual / status=active and return a stable + * `providerSessionId` to the caller. + * 2. mcplocal opens the SSE channel on `/api/v1/llms/_provider-stream`. + * `bindSession()` records the SSE handle in memory keyed by + * `providerSessionId`. Disconnect → `disconnect()` flips the rows to + * inactive immediately. + * 3. Heartbeats land on `/api/v1/llms/_provider-heartbeat` and bump + * `lastHeartbeatAt`. The 60-s GC sweep moves heartbeat-stale rows to + * inactive (catches sessions whose disconnect we missed) and deletes + * anything inactive past the 4-h cutoff. + * 4. At inference time `/api/v1/llms/:name/infer` resolves the row, sees + * kind=virtual, and asks `enqueueInferTask()` to relay through the SSE + * session. The session pumps the OpenAI body to mcplocal as a `task` + * frame and waits for the result POST on + * `/api/v1/llms/_provider-task/:taskId/result`. + * + * In v1 there's no wake-on-demand (v2) and no LB pool (v4). One open SSE + * session per `providerSessionId`; one inference at a time per task id. 
+ */ +import type { Llm } from '@prisma/client'; +import { randomUUID } from 'node:crypto'; +import type { ILlmRepository } from '../repositories/llm.repository.js'; +import type { OpenAiChatRequest } from './llm/types.js'; +import { NotFoundError } from './mcp-server.service.js'; + +/** A virtual provider's announcement at registration time. */ +export interface RegisterProviderInput { + name: string; + type: string; + model: string; + tier?: string; + description?: string; + extraConfig?: Record; +} + +export interface RegisterResult { + providerSessionId: string; + llms: Llm[]; +} + +/** + * In-memory handle for a live SSE session. The route owns the actual + * Fastify reply object; this interface is what the service expects from + * it. Decouples the service from Fastify so unit tests can use a stub. + */ +export interface VirtualSessionHandle { + /** Send a server-sent task frame to the publisher (`event: task`). */ + pushTask(task: VirtualTaskFrame): void; + /** True iff the underlying SSE response is still writable. */ + readonly alive: boolean; +} + +export type VirtualTaskFrame = + | { kind: 'infer'; taskId: string; llmName: string; request: OpenAiChatRequest; streaming: boolean } + // v2 wake task lives here so the SSE protocol stays additive. + | { kind: 'wake'; taskId: string; llmName: string }; + +/** + * Pending inference task. The route handler awaits `done`; the result POST + * resolves it via `completeTask()`. The error path rejects via `failTask()`. + */ +interface PendingTask { + taskId: string; + sessionId: string; + llmName: string; + streaming: boolean; + resolveNonStreaming: (body: unknown, status: number) => void; + rejectNonStreaming: (err: Error) => void; + /** For streaming tasks only; null on non-streaming. */ + pushChunk: ((chunk: { data: string; done?: boolean }) => void) | null; +} + +const HEARTBEAT_TIMEOUT_MS = 90_000; +const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000; // 4 h + +export interface IVirtualLlmService { + register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise; + heartbeat(providerSessionId: string): Promise; + bindSession(providerSessionId: string, handle: VirtualSessionHandle): void; + unbindSession(providerSessionId: string): Promise; + enqueueInferTask(llmName: string, request: OpenAiChatRequest, streaming: boolean): Promise; + completeTask(taskId: string, result: { status: number; body: unknown }): boolean; + pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean; + failTask(taskId: string, error: Error): boolean; + gcSweep(now?: Date): Promise<{ markedInactive: number; deleted: number }>; +} + +/** Returned to the route handler so it can await the result. */ +export interface PendingTaskRef { + taskId: string; + /** Resolves for non-streaming tasks. Streaming tasks reject this — use the chunk callback path. */ + done: Promise<{ status: number; body: unknown }>; + /** Streaming-only: subscribe to chunks. Returns an unsubscribe fn. */ + onChunk(cb: (chunk: { data: string; done?: boolean }) => void): () => void; +} + +export class VirtualLlmService implements IVirtualLlmService { + private readonly sessions = new Map(); + private readonly tasksById = new Map(); + + constructor(private readonly repo: ILlmRepository) {} + + async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise { + const sessionId = input.providerSessionId ?? 
randomUUID(); + const now = new Date(); + const llms: Llm[] = []; + + for (const p of input.providers) { + const existing = await this.repo.findByName(p.name); + if (existing === null) { + const created = await this.repo.create({ + name: p.name, + type: p.type, + model: p.model, + tier: p.tier ?? 'fast', + description: p.description ?? '', + ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}), + kind: 'virtual', + providerSessionId: sessionId, + status: 'active', + lastHeartbeatAt: now, + inactiveSince: null, + }); + llms.push(created); + continue; + } + + // Existing row. Only allowed to (re-)register over a virtual row owned + // by the same session, OR an inactive virtual whose session went away + // (sticky reconnect). Refuse to overwrite a public row or someone + // else's active virtual. + if (existing.kind === 'public') { + throw Object.assign( + new Error(`Cannot publish over public LLM: ${p.name}`), + { statusCode: 409 }, + ); + } + if (existing.providerSessionId !== sessionId && existing.status === 'active') { + throw Object.assign( + new Error(`Virtual LLM '${p.name}' is already active under a different session`), + { statusCode: 409 }, + ); + } + + const updated = await this.repo.update(existing.id, { + type: p.type, + model: p.model, + ...(p.tier !== undefined ? { tier: p.tier } : {}), + ...(p.description !== undefined ? { description: p.description } : {}), + ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}), + kind: 'virtual', + providerSessionId: sessionId, + status: 'active', + lastHeartbeatAt: now, + inactiveSince: null, + }); + llms.push(updated); + } + + return { providerSessionId: sessionId, llms }; + } + + async heartbeat(providerSessionId: string): Promise { + const owned = await this.repo.findBySessionId(providerSessionId); + if (owned.length === 0) return; + const now = new Date(); + for (const row of owned) { + // Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a + // network blip that lapsed the SSE), revive it. + await this.repo.update(row.id, { + lastHeartbeatAt: now, + ...(row.status === 'inactive' + ? { status: 'active', inactiveSince: null } + : {}), + }); + } + } + + bindSession(providerSessionId: string, handle: VirtualSessionHandle): void { + // Replace any prior handle for this session — keeps "last writer wins" + // simple. The old SSE will have been closed by the publisher anyway. + this.sessions.set(providerSessionId, handle); + } + + async unbindSession(providerSessionId: string): Promise { + this.sessions.delete(providerSessionId); + // Flip every Llm owned by that session to inactive immediately. + const owned = await this.repo.findBySessionId(providerSessionId); + const now = new Date(); + for (const row of owned) { + if (row.status === 'active') { + await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); + } + } + // Reject any in-flight tasks for this session — the relay can't deliver + // a result POST anymore. 
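+    // (failTask() below rejects each caller's pending `done` promise, so the
+    // /infer route surfaces "publisher disconnected" instead of hanging.)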
+ for (const t of this.tasksById.values()) { + if (t.sessionId === providerSessionId) { + this.failTask(t.taskId, new Error('publisher disconnected')); + } + } + } + + async enqueueInferTask( + llmName: string, + request: OpenAiChatRequest, + streaming: boolean, + ): Promise { + const llm = await this.repo.findByName(llmName); + if (llm === null) throw new NotFoundError(`Llm not found: ${llmName}`); + if (llm.kind !== 'virtual' || llm.providerSessionId === null) { + throw Object.assign( + new Error(`Llm '${llmName}' is not a virtual provider`), + { statusCode: 500 }, + ); + } + if (llm.status !== 'active') { + throw Object.assign( + new Error(`Virtual Llm '${llmName}' is ${llm.status}; publisher offline`), + { statusCode: 503 }, + ); + } + const handle = this.sessions.get(llm.providerSessionId); + if (handle === undefined || !handle.alive) { + throw Object.assign( + new Error(`Virtual Llm '${llmName}' has no live SSE session; publisher offline`), + { statusCode: 503 }, + ); + } + + const taskId = randomUUID(); + const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>(); + + let resolveDone!: (v: { status: number; body: unknown }) => void; + let rejectDone!: (err: Error) => void; + const done = new Promise<{ status: number; body: unknown }>((resolve, reject) => { + resolveDone = resolve; + rejectDone = reject; + }); + + const pending: PendingTask = { + taskId, + sessionId: llm.providerSessionId, + llmName, + streaming, + resolveNonStreaming: (body, status) => resolveDone({ status, body }), + rejectNonStreaming: rejectDone, + pushChunk: streaming + ? (chunk): void => { for (const cb of chunkSubscribers) cb(chunk); } + : null, + }; + this.tasksById.set(taskId, pending); + + handle.pushTask({ + kind: 'infer', + taskId, + llmName, + request, + streaming, + }); + + return { + taskId, + done, + onChunk(cb): () => void { + chunkSubscribers.add(cb); + return () => chunkSubscribers.delete(cb); + }, + }; + } + + completeTask(taskId: string, result: { status: number; body: unknown }): boolean { + const t = this.tasksById.get(taskId); + if (t === undefined) return false; + this.tasksById.delete(taskId); + t.resolveNonStreaming(result.body, result.status); + return true; + } + + pushTaskChunk(taskId: string, chunk: { data: string; done?: boolean }): boolean { + const t = this.tasksById.get(taskId); + if (t === undefined || t.pushChunk === null) return false; + t.pushChunk(chunk); + if (chunk.done === true) { + // For streaming tasks, also resolve the `done` promise so the route + // handler can clean up. 
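+      // (A null body is deliberate: the payload already reached the caller
+      // through the chunk callbacks above; only the 200 status matters.)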
+ t.resolveNonStreaming(null, 200); + this.tasksById.delete(taskId); + } + return true; + } + + failTask(taskId: string, error: Error): boolean { + const t = this.tasksById.get(taskId); + if (t === undefined) return false; + this.tasksById.delete(taskId); + t.rejectNonStreaming(error); + return true; + } + + async gcSweep(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> { + let markedInactive = 0; + let deleted = 0; + + const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS); + const stale = await this.repo.findStaleVirtuals(heartbeatCutoff); + for (const row of stale) { + await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); + markedInactive += 1; + } + + const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS); + const expired = await this.repo.findExpiredInactives(deletionCutoff); + for (const row of expired) { + await this.repo.delete(row.id); + deleted += 1; + } + + return { markedInactive, deleted }; + } +} diff --git a/src/mcpd/tests/llm-infer-route.test.ts b/src/mcpd/tests/llm-infer-route.test.ts index 20e5339..33279e3 100644 --- a/src/mcpd/tests/llm-infer-route.test.ts +++ b/src/mcpd/tests/llm-infer-route.test.ts @@ -20,6 +20,10 @@ function makeLlmView(overrides: Partial = {}): LlmView { description: '', apiKeyRef: { name: 'anthropic-key', key: 'token' }, extraConfig: {}, + kind: 'public', + status: 'active', + lastHeartbeatAt: null, + inactiveSince: null, version: 1, createdAt: new Date(), updatedAt: new Date(), @@ -205,4 +209,87 @@ describe('POST /api/v1/llms/:name/infer', () => { expect(res.statusCode).toBe(502); expect(res.json<{ error: string }>().error).toMatch(/upstream down/); }); + + // ── Virtual-provider branch (kind=virtual) ── + + it('routes kind=virtual non-streaming through VirtualLlmService.enqueueInferTask', async () => { + const svc: LlmServiceLike = { + getByName: async () => makeLlmView({ kind: 'virtual', type: 'openai', apiKeyRef: null }), + resolveApiKey: async () => '', + }; + const enqueue = vi.fn(async () => ({ + taskId: 't-1', + done: Promise.resolve({ status: 200, body: { choices: [{ message: { content: 'hello from relay' } }] } }), + onChunk: () => () => undefined, + })); + app = Fastify({ logger: false }); + app.setErrorHandler(errorHandler); + registerLlmInferRoutes(app, { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + llmService: svc as any, + adapters: new LlmAdapterRegistry(), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + virtualLlms: { enqueueInferTask: enqueue } as any, + }); + await app.ready(); + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/vllm-local/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(200); + expect(res.json<{ choices: Array<{ message: { content: string } }> }>().choices[0]!.message.content).toBe('hello from relay'); + expect(enqueue).toHaveBeenCalledWith( + 'claude', + expect.objectContaining({ messages: expect.any(Array) }), + false, + ); + }); + + it('returns 503 when the publisher is offline (VirtualLlmService throws)', async () => { + const svc: LlmServiceLike = { + getByName: async () => makeLlmView({ kind: 'virtual', apiKeyRef: null, type: 'openai' }), + resolveApiKey: async () => '', + }; + const enqueue = vi.fn(async () => { + throw Object.assign(new Error('no live SSE session; publisher offline'), { statusCode: 503 }); + }); + app = Fastify({ logger: false }); + app.setErrorHandler(errorHandler); + 
registerLlmInferRoutes(app, { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + llmService: svc as any, + adapters: new LlmAdapterRegistry(), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + virtualLlms: { enqueueInferTask: enqueue } as any, + }); + await app.ready(); + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/vllm-local/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(503); + expect(res.json<{ error: string }>().error).toMatch(/publisher offline/); + }); + + it('returns 500 when virtualLlms dep is missing but the row is kind=virtual', async () => { + // Defensive: prior test configurations may not pass virtualLlms. We + // surface a clear server-misconfiguration error rather than calling + // the public-adapter path, which would try to hit an empty URL. + const svc: LlmServiceLike = { + getByName: async () => makeLlmView({ kind: 'virtual', apiKeyRef: null, type: 'openai' }), + resolveApiKey: async () => '', + }; + await setupApp(svc, new LlmAdapterRegistry()); // no virtualLlms + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/vllm-local/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(500); + expect(res.json<{ error: string }>().error).toMatch(/virtual LLM dispatch unavailable/); + }); }); diff --git a/src/mcpd/tests/virtual-llm-routes.test.ts b/src/mcpd/tests/virtual-llm-routes.test.ts new file mode 100644 index 0000000..a4477bc --- /dev/null +++ b/src/mcpd/tests/virtual-llm-routes.test.ts @@ -0,0 +1,184 @@ +import { describe, it, expect, vi, afterEach } from 'vitest'; +import Fastify from 'fastify'; +import type { FastifyInstance } from 'fastify'; +import { registerVirtualLlmRoutes } from '../src/routes/virtual-llms.js'; +import type { + VirtualLlmService, + VirtualSessionHandle, +} from '../src/services/virtual-llm.service.js'; + +let app: FastifyInstance; + +afterEach(async () => { + if (app) await app.close(); +}); + +function fakeService(overrides: Partial = {}): VirtualLlmService { + return { + register: vi.fn(async (input) => ({ + providerSessionId: input.providerSessionId ?? 
'sess-generated', + llms: [], + })), + heartbeat: vi.fn(async () => undefined), + bindSession: vi.fn(), + unbindSession: vi.fn(async () => undefined), + enqueueInferTask: vi.fn(), + completeTask: vi.fn(() => true), + pushTaskChunk: vi.fn(() => true), + failTask: vi.fn(() => true), + gcSweep: vi.fn(), + ...overrides, + } as unknown as VirtualLlmService; +} + +async function setupApp(svc: VirtualLlmService): Promise { + app = Fastify({ logger: false }); + registerVirtualLlmRoutes(app, svc); + await app.ready(); + return app; +} + +describe('POST /api/v1/llms/_provider-register', () => { + it('returns 400 when providers is missing or empty', async () => { + await setupApp(fakeService()); + const a = await app.inject({ method: 'POST', url: '/api/v1/llms/_provider-register', payload: {} }); + expect(a.statusCode).toBe(400); + const b = await app.inject({ method: 'POST', url: '/api/v1/llms/_provider-register', payload: { providers: [] } }); + expect(b.statusCode).toBe(400); + }); + + it('returns 400 when a provider entry is missing required fields', async () => { + await setupApp(fakeService()); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-register', + payload: { providers: [{ name: 'incomplete' }] }, + }); + expect(res.statusCode).toBe(400); + }); + + it('forwards a valid registration to the service and returns 201', async () => { + const register = vi.fn(async () => ({ + providerSessionId: 'sess-xyz', + llms: [{ id: 'l1' }], + })); + await setupApp(fakeService({ register: register as unknown as VirtualLlmService['register'] })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-register', + payload: { + providerSessionId: 'sess-xyz', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm', tier: 'fast', extraConfig: { gpu: 1 } }], + }, + }); + expect(res.statusCode).toBe(201); + expect(register).toHaveBeenCalledWith({ + providerSessionId: 'sess-xyz', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm', tier: 'fast', extraConfig: { gpu: 1 } }], + }); + expect(res.json()).toMatchObject({ providerSessionId: 'sess-xyz' }); + }); + + it('surfaces service errors with their declared status code (e.g. 
409 conflict)', async () => { + const register = vi.fn(async () => { + throw Object.assign(new Error('Cannot publish over public LLM: dup'), { statusCode: 409 }); + }); + await setupApp(fakeService({ register: register as unknown as VirtualLlmService['register'] })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-register', + payload: { providers: [{ name: 'dup', type: 'openai', model: 'm' }] }, + }); + expect(res.statusCode).toBe(409); + expect(res.json()).toMatchObject({ error: expect.stringMatching(/public LLM/) }); + }); +}); + +describe('POST /api/v1/llms/_provider-heartbeat', () => { + it('returns 400 without providerSessionId', async () => { + await setupApp(fakeService()); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-heartbeat', + payload: {}, + }); + expect(res.statusCode).toBe(400); + }); + + it('forwards the sessionId to service.heartbeat', async () => { + const heartbeat = vi.fn(async () => undefined); + await setupApp(fakeService({ heartbeat })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-heartbeat', + payload: { providerSessionId: 'sess-abc' }, + }); + expect(res.statusCode).toBe(200); + expect(heartbeat).toHaveBeenCalledWith('sess-abc'); + }); +}); + +describe('POST /api/v1/llms/_provider-task/:taskId/result', () => { + it('forwards { error } to service.failTask', async () => { + const failTask = vi.fn(() => true); + await setupApp(fakeService({ failTask })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-task/t-1/result', + payload: { error: 'upstream blew up' }, + }); + expect(res.statusCode).toBe(200); + expect(failTask).toHaveBeenCalledWith('t-1', expect.objectContaining({ message: 'upstream blew up' })); + }); + + it('forwards { chunk } to service.pushTaskChunk', async () => { + const pushTaskChunk = vi.fn(() => true); + await setupApp(fakeService({ pushTaskChunk })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-task/t-2/result', + payload: { chunk: { data: 'hello' } }, + }); + expect(res.statusCode).toBe(200); + expect(pushTaskChunk).toHaveBeenCalledWith('t-2', { data: 'hello' }); + }); + + it('forwards { status, body } to service.completeTask', async () => { + const completeTask = vi.fn(() => true); + await setupApp(fakeService({ completeTask })); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-task/t-3/result', + payload: { status: 200, body: { ok: true } }, + }); + expect(res.statusCode).toBe(200); + expect(completeTask).toHaveBeenCalledWith('t-3', { status: 200, body: { ok: true } }); + }); + + it('returns 400 for an empty/unrecognised result body', async () => { + await setupApp(fakeService()); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/_provider-task/t-4/result', + payload: {}, + }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('GET /api/v1/llms/_provider-stream', () => { + it('returns 400 without the x-mcpctl-provider-session header', async () => { + await setupApp(fakeService()); + const res = await app.inject({ + method: 'GET', + url: '/api/v1/llms/_provider-stream', + }); + expect(res.statusCode).toBe(400); + }); + + // Note: a full SSE handshake test would require a real HTTP listen + // because `app.inject` holds the response open and never returns under + // the `text/event-stream` keep-alive. 
The smoke test in Stage 6 spins + // up a real listener and exercises the open → bind → task → close + // round-trip end to end. +}); diff --git a/src/mcpd/tests/virtual-llm-service.test.ts b/src/mcpd/tests/virtual-llm-service.test.ts new file mode 100644 index 0000000..2459dcc --- /dev/null +++ b/src/mcpd/tests/virtual-llm-service.test.ts @@ -0,0 +1,347 @@ +import { describe, it, expect, vi } from 'vitest'; +import { VirtualLlmService, type VirtualSessionHandle } from '../src/services/virtual-llm.service.js'; +import type { ILlmRepository } from '../src/repositories/llm.repository.js'; +import type { Llm } from '@prisma/client'; + +function makeLlm(overrides: Partial = {}): Llm { + return { + id: `llm-${Math.random().toString(36).slice(2, 8)}`, + name: 'vllm-local', + type: 'openai', + model: 'm', + url: '', + tier: 'fast', + description: '', + apiKeySecretId: null, + apiKeySecretKey: null, + extraConfig: {} as Llm['extraConfig'], + kind: 'virtual', + providerSessionId: 's-1', + lastHeartbeatAt: new Date(), + status: 'active', + inactiveSince: null, + version: 1, + createdAt: new Date(), + updatedAt: new Date(), + ...overrides, + }; +} + +function mockRepo(initial: Llm[] = []): ILlmRepository { + const rows = new Map(initial.map((l) => [l.id, l])); + let counter = rows.size; + return { + findAll: vi.fn(async () => [...rows.values()]), + findById: vi.fn(async (id: string) => rows.get(id) ?? null), + findByName: vi.fn(async (name: string) => { + for (const l of rows.values()) if (l.name === name) return l; + return null; + }), + findByTier: vi.fn(async () => []), + findBySessionId: vi.fn(async (sid: string) => + [...rows.values()].filter((l) => l.providerSessionId === sid)), + findStaleVirtuals: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((l) => + l.kind === 'virtual' + && l.status === 'active' + && l.lastHeartbeatAt !== null + && l.lastHeartbeatAt < cutoff)), + findExpiredInactives: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((l) => + l.kind === 'virtual' + && l.status === 'inactive' + && l.inactiveSince !== null + && l.inactiveSince < cutoff)), + create: vi.fn(async (data) => { + counter += 1; + const row = makeLlm({ + id: `llm-${String(counter)}`, + name: data.name, + type: data.type, + model: data.model, + url: data.url ?? '', + tier: data.tier ?? 'fast', + description: data.description ?? '', + kind: data.kind ?? 'public', + providerSessionId: data.providerSessionId ?? null, + status: data.status ?? 'active', + lastHeartbeatAt: data.lastHeartbeatAt ?? null, + inactiveSince: data.inactiveSince ?? null, + }); + rows.set(row.id, row); + return row; + }), + update: vi.fn(async (id, data) => { + const existing = rows.get(id); + if (!existing) throw new Error('not found'); + const next: Llm = { + ...existing, + ...(data.type !== undefined ? { type: data.type } : {}), + ...(data.model !== undefined ? { model: data.model } : {}), + ...(data.tier !== undefined ? { tier: data.tier } : {}), + ...(data.description !== undefined ? { description: data.description } : {}), + ...(data.kind !== undefined ? { kind: data.kind } : {}), + ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), + ...(data.status !== undefined ? { status: data.status } : {}), + ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}), + ...(data.inactiveSince !== undefined ? 
{ inactiveSince: data.inactiveSince } : {}), + }; + rows.set(id, next); + return next; + }), + delete: vi.fn(async (id: string) => { rows.delete(id); }), + }; +} + +function fakeSession(): VirtualSessionHandle & { tasks: Array; alive: boolean } { + const tasks: unknown[] = []; + return { + tasks, + alive: true, + pushTask(t) { tasks.push(t); }, + }; +} + +describe('VirtualLlmService', () => { + it('register inserts new virtual rows with active status + sessionId', async () => { + const repo = mockRepo(); + const svc = new VirtualLlmService(repo); + const { providerSessionId, llms } = await svc.register({ + providerSessionId: null, + providers: [ + { name: 'vllm-local', type: 'openai', model: 'Qwen/Qwen2.5-7B-Instruct-AWQ', tier: 'fast' }, + ], + }); + expect(providerSessionId).toMatch(/^[0-9a-f-]{36}$/); + expect(llms).toHaveLength(1); + expect(llms[0]!.kind).toBe('virtual'); + expect(llms[0]!.status).toBe('active'); + expect(llms[0]!.providerSessionId).toBe(providerSessionId); + expect(llms[0]!.lastHeartbeatAt).not.toBeNull(); + }); + + it('register reuses the same row on sticky reconnect (same name + sessionId)', async () => { + const repo = mockRepo(); + const svc = new VirtualLlmService(repo); + const first = await svc.register({ + providerSessionId: 'fixed-id', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], + }); + expect(first.llms[0]!.id).toMatch(/^llm-/); + const firstId = first.llms[0]!.id; + + const second = await svc.register({ + providerSessionId: 'fixed-id', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm-updated' }], + }); + expect(second.llms[0]!.id).toBe(firstId); + expect(second.llms[0]!.model).toBe('m-updated'); + }); + + it('register refuses to overwrite a public LLM with the same name', async () => { + const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]); + const svc = new VirtualLlmService(repo); + await expect(svc.register({ + providerSessionId: 'sess-x', + providers: [{ name: 'qwen3-thinking', type: 'openai', model: 'm' }], + })).rejects.toThrow(/Cannot publish over public/); + }); + + it('register refuses if another active session owns the name', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'other', status: 'active' })]); + const svc = new VirtualLlmService(repo); + await expect(svc.register({ + providerSessionId: 'mine', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], + })).rejects.toThrow(/already active under a different session/); + }); + + it('register adopts an inactive virtual row from a different session (sticky reconnect after lapse)', async () => { + const repo = mockRepo([makeLlm({ + name: 'vllm-local', providerSessionId: 'old-session', + status: 'inactive', inactiveSince: new Date(), + })]); + const svc = new VirtualLlmService(repo); + const { llms } = await svc.register({ + providerSessionId: 'new-session', + providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }], + }); + expect(llms[0]!.providerSessionId).toBe('new-session'); + expect(llms[0]!.status).toBe('active'); + expect(llms[0]!.inactiveSince).toBeNull(); + }); + + it('heartbeat bumps lastHeartbeatAt + revives an inactive row', async () => { + const past = new Date(Date.now() - 5_000); + const repo = mockRepo([makeLlm({ + name: 'vllm-local', providerSessionId: 'sess', status: 'inactive', + lastHeartbeatAt: past, inactiveSince: past, + })]); + const svc = new VirtualLlmService(repo); + await svc.heartbeat('sess'); + const row = await 
repo.findByName('vllm-local'); + expect(row?.status).toBe('active'); + expect(row?.inactiveSince).toBeNull(); + expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime()); + }); + + it('unbindSession flips all owned rows to inactive immediately', async () => { + const repo = mockRepo([ + makeLlm({ name: 'a', providerSessionId: 'sess' }), + makeLlm({ name: 'b', providerSessionId: 'sess' }), + makeLlm({ name: 'c', providerSessionId: 'other' }), + ]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + await svc.unbindSession('sess'); + expect((await repo.findByName('a'))?.status).toBe('inactive'); + expect((await repo.findByName('b'))?.status).toBe('inactive'); + expect((await repo.findByName('c'))?.status).toBe('active'); + }); + + it('enqueueInferTask pushes a task frame to the SSE session', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + const session = fakeSession(); + svc.bindSession('sess', session); + + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + expect(session.tasks).toHaveLength(1); + const t = session.tasks[0] as { kind: string; taskId: string; llmName: string; streaming: boolean }; + expect(t.kind).toBe('infer'); + expect(t.taskId).toBe(ref.taskId); + expect(t.llmName).toBe('vllm-local'); + expect(t.streaming).toBe(false); + }); + + it('enqueueInferTask rejects when the publisher is offline (no session bound)', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + await expect( + svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false), + ).rejects.toThrow(/no live SSE session|publisher offline/); + }); + + it('enqueueInferTask rejects when the row is inactive', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + await expect( + svc.enqueueInferTask('vllm-local', { model: 'm', messages: [] }, false), + ).rejects.toThrow(/inactive|publisher offline/); + }); + + it('enqueueInferTask rejects when the LLM is public (not virtual)', async () => { + const repo = mockRepo([makeLlm({ name: 'qwen3-thinking', kind: 'public', providerSessionId: null })]); + const svc = new VirtualLlmService(repo); + await expect( + svc.enqueueInferTask('qwen3-thinking', { model: 'm', messages: [] }, false), + ).rejects.toThrow(/not a virtual provider/); + }); + + it('completeTask resolves the pending non-streaming promise', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + expect(svc.completeTask(ref.taskId, { status: 200, body: { ok: true } })).toBe(true); + await expect(ref.done).resolves.toEqual({ status: 200, body: { ok: true } }); + }); + + it('streaming: pushTaskChunk fans chunks to subscribers; done resolves the ref', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = 
await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }], stream: true }, + true, + ); + const got: Array<{ data: string; done?: boolean }> = []; + ref.onChunk((c) => got.push(c)); + + expect(svc.pushTaskChunk(ref.taskId, { data: 'hello' })).toBe(true); + expect(svc.pushTaskChunk(ref.taskId, { data: ' world' })).toBe(true); + expect(svc.pushTaskChunk(ref.taskId, { data: '[DONE]', done: true })).toBe(true); + + expect(got.map((c) => c.data)).toEqual(['hello', ' world', '[DONE]']); + await expect(ref.done).resolves.toMatchObject({ status: 200 }); + }); + + it('failTask rejects the pending promise with a clear error', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + expect(svc.failTask(ref.taskId, new Error('upstream blew up'))).toBe(true); + await expect(ref.done).rejects.toThrow(/upstream blew up/); + }); + + it('unbindSession rejects in-flight tasks for that session', async () => { + const repo = mockRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const svc = new VirtualLlmService(repo); + svc.bindSession('sess', fakeSession()); + const ref = await svc.enqueueInferTask( + 'vllm-local', + { model: 'm', messages: [{ role: 'user', content: 'hi' }] }, + false, + ); + await svc.unbindSession('sess'); + await expect(ref.done).rejects.toThrow(/publisher disconnected/); + }); + + it('gcSweep flips heartbeat-stale active virtuals to inactive', async () => { + const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago — past the 90-s cutoff + const recent = new Date(Date.now() - 30 * 1000); // 30 s ago — within the cutoff + const repo = mockRepo([ + makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }), + makeLlm({ name: 'fresh', providerSessionId: 'b', status: 'active', lastHeartbeatAt: recent }), + ]); + const svc = new VirtualLlmService(repo); + const result = await svc.gcSweep(); + expect(result.markedInactive).toBe(1); + expect((await repo.findByName('stale'))?.status).toBe('inactive'); + expect((await repo.findByName('fresh'))?.status).toBe('active'); + }); + + it('gcSweep deletes virtuals inactive past the 4h retention window', async () => { + const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago + const fresh = new Date(Date.now() - 1 * 60 * 60 * 1000); // 1 h ago + const repo = mockRepo([ + makeLlm({ name: 'old', providerSessionId: 'a', status: 'inactive', inactiveSince: ancient }), + makeLlm({ name: 'recent', providerSessionId: 'b', status: 'inactive', inactiveSince: fresh }), + makeLlm({ name: 'public-survivor', providerSessionId: null, kind: 'public' }), + ]); + const svc = new VirtualLlmService(repo); + const result = await svc.gcSweep(); + expect(result.deleted).toBe(1); + expect(await repo.findByName('old')).toBeNull(); + expect(await repo.findByName('recent')).not.toBeNull(); + expect(await repo.findByName('public-survivor')).not.toBeNull(); + }); + + it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => { + const long = new Date(Date.now() - 5 * 60 * 1000); + const repo = mockRepo([ + makeLlm({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }), + ]); + const svc = new VirtualLlmService(repo); + const first = await 
svc.gcSweep(); + const second = await svc.gcSweep(); + expect(first.markedInactive).toBe(1); + expect(second.markedInactive).toBe(0); + expect(second.deleted).toBe(0); + }); +}); diff --git a/src/mcplocal/src/http/config.ts b/src/mcplocal/src/http/config.ts index 9be7fb4..d70f9c6 100644 --- a/src/mcplocal/src/http/config.ts +++ b/src/mcplocal/src/http/config.ts @@ -72,6 +72,14 @@ export interface LlmProviderFileEntry { * itself can't be reached). */ failoverFor?: string; + /** + * Opt in to publishing this local provider into mcpd as a virtual Llm + * row (see docs/virtual-llms.md). When true, the mcplocal registrar + * announces the provider to mcpd at startup, opens an SSE control + * channel, and relays inference tasks back to this local provider. + * Default: false — existing setups don't change behavior. + */ + publish?: boolean; } export interface ProjectLlmOverride { diff --git a/src/mcplocal/src/main.ts b/src/mcplocal/src/main.ts index f9e78e8..df631a9 100644 --- a/src/mcplocal/src/main.ts +++ b/src/mcplocal/src/main.ts @@ -8,11 +8,15 @@ import { StdioUpstream } from './upstream/stdio.js'; import { HttpUpstream } from './upstream/http.js'; import { createHttpServer } from './http/server.js'; import { loadHttpConfig, loadLlmProviders } from './http/config.js'; -import type { HttpConfig } from './http/config.js'; +import type { HttpConfig, LlmProviderFileEntry } from './http/config.js'; import { createProvidersFromConfig } from './llm-config.js'; import { createSecretStore } from '@mcpctl/shared'; import type { ProviderRegistry } from './providers/registry.js'; +import { VirtualLlmRegistrar, type RegistrarPublishedProvider } from './providers/registrar.js'; import { startWatchers, stopWatchers, reloadStages } from './proxymodel/watcher.js'; +import { existsSync, readFileSync as readFileSyncNs } from 'node:fs'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; interface ParsedArgs { configPath: string | undefined; @@ -144,6 +148,11 @@ export async function main(argv: string[] = process.argv): Promise { await reloadStages(); startWatchers(); + // Virtual-LLM registrar: publish opted-in providers (`publish: true`) + // into mcpd's Llm registry. Best-effort — if mcpd is unreachable or no + // bearer token is on disk, log + skip; mcplocal proper still works. + const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries); + // Graceful shutdown let shuttingDown = false; const shutdown = async () => { @@ -151,6 +160,7 @@ export async function main(argv: string[] = process.argv): Promise { shuttingDown = true; stopWatchers(); + registrar?.stop(); providerRegistry.disposeAll(); server.stop(); if (httpServer) { @@ -177,3 +187,76 @@ if (isMain) { process.exit(1); }); } + +/** + * Start the virtual-LLM registrar if any local provider has `publish: true` + * AND we have a bearer token on disk. Returns the registrar instance for + * lifecycle teardown, or null when nothing was started. All failure paths + * log a warning and resolve null — the registrar is best-effort, not a + * boot blocker. 
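+ *
+ * Assumed on-disk shape of `~/.mcpctl/credentials` (only the two fields
+ * read below matter here; values are placeholders):
+ *   { "mcpdUrl": "https://mcpd.example:8443", "token": "<bearer>" }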
+ */ +async function maybeStartVirtualLlmRegistrar( + providerRegistry: ProviderRegistry, + llmEntries: LlmProviderFileEntry[], +): Promise { + const opted = llmEntries.filter((e) => e.publish === true); + if (opted.length === 0) return null; + + const published: RegistrarPublishedProvider[] = []; + for (const entry of opted) { + const provider = providerRegistry.get(entry.name); + if (provider === undefined) { + process.stderr.write(`virtual-llm registrar: provider '${entry.name}' opted-in but not registered locally; skipping\n`); + continue; + } + const item: RegistrarPublishedProvider = { + provider, + type: entry.type, + model: entry.model ?? entry.name, + }; + if (entry.tier !== undefined) item.tier = entry.tier; + published.push(item); + } + if (published.length === 0) return null; + + // Resolve mcpd URL + bearer. Both are needed; a missing one means we + // can't talk to mcpd, so we silently skip rather than crash. + const credsPath = join(homedir(), '.mcpctl', 'credentials'); + if (!existsSync(credsPath)) { + process.stderr.write(`virtual-llm registrar: ~/.mcpctl/credentials missing — skipping (run \`mcpctl auth login\` to publish virtual LLMs)\n`); + return null; + } + let mcpdUrl: string; + let token: string; + try { + const parsed = JSON.parse(readFileSyncNs(credsPath, 'utf-8')) as { mcpdUrl?: string; token?: string }; + if (typeof parsed.mcpdUrl !== 'string' || typeof parsed.token !== 'string') { + process.stderr.write(`virtual-llm registrar: credentials missing mcpdUrl/token — skipping\n`); + return null; + } + mcpdUrl = parsed.mcpdUrl; + token = parsed.token; + } catch (err) { + process.stderr.write(`virtual-llm registrar: failed to read credentials: ${(err as Error).message}\n`); + return null; + } + + const registrar = new VirtualLlmRegistrar({ + mcpdUrl, + token, + publishedProviders: published, + sessionFilePath: join(homedir(), '.mcpctl', 'provider-session'), + log: { + info: (msg) => process.stderr.write(`${msg}\n`), + warn: (msg) => process.stderr.write(`${msg}\n`), + error: (msg) => process.stderr.write(`${msg}\n`), + }, + }); + try { + await registrar.start(); + } catch (err) { + process.stderr.write(`virtual-llm registrar: start failed: ${(err as Error).message}\n`); + return null; + } + return registrar; +} diff --git a/src/mcplocal/src/providers/registrar.ts b/src/mcplocal/src/providers/registrar.ts new file mode 100644 index 0000000..120e4c9 --- /dev/null +++ b/src/mcplocal/src/providers/registrar.ts @@ -0,0 +1,409 @@ +/** + * Virtual-LLM registrar — the mcplocal counterpart to mcpd's + * VirtualLlmService. + * + * Lifecycle: + * 1. start() called on mcplocal boot. If no providers are opted-in + * (`publish: true` in config), do nothing. + * 2. POST /api/v1/llms/_provider-register with the publishable set; + * receive a stable providerSessionId. Persist it to + * `~/.mcpctl/provider-session` so reconnects after a crash/restart + * adopt the same row instead of orphaning it. + * 3. Open the SSE channel at /api/v1/llms/_provider-stream with the + * session id in `x-mcpctl-provider-session`. Listen for + * `event: task` frames; for each, call the local provider's + * `complete()` and POST the OpenAI-shaped result back. + * 4. Heartbeat every 30 s. If the SSE drops, exponential backoff + * reconnect from 5 s up to 60 s. + * 5. stop() destroys the SSE socket and clears the timer; mcpd's + * 90-s heartbeat watchdog will then flip our rows to inactive. + * + * v1 caveat: the LlmProvider abstraction returns a finalized + * CompletionResult, not a token stream. 
We therefore translate + * streaming requests into a single SSE chunk + [DONE]. Real + * per-token streaming is a v2 concern — see docs/virtual-llms.md. + */ +import http from 'node:http'; +import https from 'node:https'; +import { promises as fs } from 'node:fs'; +import { dirname } from 'node:path'; +import type { LlmProvider, CompletionOptions } from './types.js'; + +export interface RegistrarLogger { + info: (msg: string) => void; + warn: (msg: string) => void; + error: (msg: string) => void; +} + +export interface RegistrarPublishedProvider { + provider: LlmProvider; + /** mcpd-side `type` field (openai | anthropic | …). */ + type: string; + /** Model id surfaced under `mcpctl get llm`. */ + model: string; + /** Optional tier (default 'fast'). */ + tier?: 'fast' | 'heavy'; + /** Optional human-readable description for `mcpctl get llm`. */ + description?: string; +} + +export interface RegistrarOptions { + mcpdUrl: string; + token: string; + publishedProviders: RegistrarPublishedProvider[]; + /** Where to persist the providerSessionId so reconnects are sticky. */ + sessionFilePath: string; + log: RegistrarLogger; + /** Override knobs for tests (ms). */ + heartbeatIntervalMs?: number; + reconnectBaseMs?: number; + reconnectMaxMs?: number; +} + +const DEFAULT_HEARTBEAT_MS = 30_000; +const DEFAULT_RECONNECT_BASE_MS = 5_000; +const DEFAULT_RECONNECT_MAX_MS = 60_000; +const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session'; + +interface InferTask { + kind: 'infer'; + taskId: string; + llmName: string; + request: { + messages: Array<{ role: 'system' | 'user' | 'assistant' | 'tool'; content: string }>; + temperature?: number; + max_tokens?: number; + model?: string; + }; + streaming: boolean; +} + +type TaskFrame = InferTask | { kind: 'wake'; taskId: string; llmName: string }; + +export class VirtualLlmRegistrar { + private sessionId: string | null = null; + private heartbeatTimer: NodeJS.Timeout | null = null; + private sseRequest: http.ClientRequest | null = null; + private reconnectAttempt = 0; + private stopped = false; + + constructor(private readonly opts: RegistrarOptions) {} + + /** Bring the registrar online. Idempotent — calling twice is a no-op. */ + async start(): Promise { + if (this.opts.publishedProviders.length === 0) { + this.opts.log.info('virtual-llm registrar: nothing to publish (no provider has `publish: true`)'); + return; + } + this.stopped = false; + this.sessionId = await this.loadStickySessionId(); + await this.register(); + this.startHeartbeat(); + this.openSseStream(); + } + + /** Tear down the registrar. Safe to call multiple times. */ + stop(): void { + this.stopped = true; + if (this.heartbeatTimer !== null) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + if (this.sseRequest !== null) { + this.sseRequest.destroy(); + this.sseRequest = null; + } + } + + /** Test/inspection helper — avoids exposing internals broadly. */ + getSessionId(): string | null { + return this.sessionId; + } + + private async loadStickySessionId(): Promise { + try { + const raw = await fs.readFile(this.opts.sessionFilePath, 'utf-8'); + const id = raw.trim(); + return id === '' ? 
null : id; + } catch { + return null; + } + } + + private async saveStickySessionId(id: string): Promise { + try { + await fs.mkdir(dirname(this.opts.sessionFilePath), { recursive: true }); + await fs.writeFile(this.opts.sessionFilePath, id, 'utf-8'); + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: failed to persist session id: ${(err as Error).message}`); + } + } + + private async register(): Promise { + const body: Record = { + providers: this.opts.publishedProviders.map((p) => ({ + name: p.provider.name, + type: p.type, + model: p.model, + ...(p.tier !== undefined ? { tier: p.tier } : {}), + ...(p.description !== undefined ? { description: p.description } : {}), + })), + }; + if (this.sessionId !== null) body['providerSessionId'] = this.sessionId; + + const res = await postJson( + this.urlFor('/api/v1/llms/_provider-register'), + body, + this.opts.token, + ); + if (res.statusCode !== 201) { + throw new Error(`provider-register HTTP ${String(res.statusCode)}: ${res.body}`); + } + const parsed = JSON.parse(res.body) as { providerSessionId: string }; + this.sessionId = parsed.providerSessionId; + await this.saveStickySessionId(this.sessionId); + this.opts.log.info( + `virtual-llm registrar: published ${String(this.opts.publishedProviders.length)} provider(s); sessionId=${this.sessionId}`, + ); + } + + private startHeartbeat(): void { + const intervalMs = this.opts.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_MS; + this.heartbeatTimer = setInterval(() => { + if (this.sessionId === null) return; + void this.heartbeatOnce(); + }, intervalMs); + } + + private async heartbeatOnce(): Promise { + if (this.sessionId === null) return; + try { + const res = await postJson( + this.urlFor('/api/v1/llms/_provider-heartbeat'), + { providerSessionId: this.sessionId }, + this.opts.token, + ); + if (res.statusCode !== 200) { + this.opts.log.warn(`virtual-llm registrar: heartbeat HTTP ${String(res.statusCode)}: ${res.body}`); + } + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: heartbeat failed: ${(err as Error).message}`); + } + } + + private openSseStream(): void { + if (this.stopped || this.sessionId === null) return; + const url = new URL(this.urlFor('/api/v1/llms/_provider-stream')); + const driver = url.protocol === 'https:' ? https : http; + const req = driver.request( + { + hostname: url.hostname, + port: url.port || (url.protocol === 'https:' ? 443 : 80), + path: url.pathname + url.search, + method: 'GET', + headers: { + Accept: 'text/event-stream', + Authorization: `Bearer ${this.opts.token}`, + [PROVIDER_SESSION_HEADER]: this.sessionId, + }, + }, + (res) => { + if (res.statusCode !== 200) { + let body = ''; + res.on('data', (c: Buffer) => { body += c.toString('utf-8'); }); + res.on('end', () => { + this.opts.log.warn(`virtual-llm registrar: SSE HTTP ${String(res.statusCode ?? 
0)}: ${body}`); + this.scheduleReconnect(); + }); + return; + } + this.reconnectAttempt = 0; + res.setEncoding('utf-8'); + let buf = ''; + res.on('data', (chunk: string) => { + buf += chunk; + let nl: number; + while ((nl = buf.indexOf('\n\n')) !== -1) { + const frame = buf.slice(0, nl); + buf = buf.slice(nl + 2); + this.handleSseFrame(frame); + } + }); + res.on('end', () => this.scheduleReconnect()); + res.on('error', (err) => { + this.opts.log.warn(`virtual-llm registrar: SSE response error: ${err.message}`); + this.scheduleReconnect(); + }); + }, + ); + req.on('error', (err) => { + this.opts.log.warn(`virtual-llm registrar: SSE request error: ${err.message}`); + this.scheduleReconnect(); + }); + req.end(); + this.sseRequest = req; + } + + private scheduleReconnect(): void { + this.sseRequest = null; + if (this.stopped) return; + const base = this.opts.reconnectBaseMs ?? DEFAULT_RECONNECT_BASE_MS; + const max = this.opts.reconnectMaxMs ?? DEFAULT_RECONNECT_MAX_MS; + const delay = Math.min(max, base * 2 ** this.reconnectAttempt); + this.reconnectAttempt += 1; + this.opts.log.info(`virtual-llm registrar: SSE disconnected, reconnecting in ${String(delay)} ms`); + setTimeout(() => { + if (!this.stopped) this.openSseStream(); + }, delay).unref(); + } + + private handleSseFrame(frame: string): void { + let event = 'message'; + let data = ''; + for (const line of frame.split('\n')) { + if (line.startsWith('event:')) event = line.slice(6).trim(); + else if (line.startsWith('data:')) data += line.slice(5).trim(); + } + if (event !== 'task' || data === '') return; + let task: TaskFrame; + try { + task = JSON.parse(data) as TaskFrame; + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: malformed task frame: ${(err as Error).message}`); + return; + } + if (task.kind === 'infer') { + void this.handleInferTask(task); + return; + } + // Wake tasks are reserved for v2 — acknowledge with an error so mcpd + // surfaces a clean failure rather than waiting forever. + void this.postResult(task.taskId, { error: 'wake task type not implemented in this client (v2)' }); + } + + private async handleInferTask(task: InferTask): Promise { + const published = this.opts.publishedProviders.find((p) => p.provider.name === task.llmName); + if (published === undefined) { + await this.postResult(task.taskId, { error: `provider '${task.llmName}' not registered locally` }); + return; + } + try { + const completionOpts: CompletionOptions = { + messages: task.request.messages.map((m) => ({ role: m.role, content: m.content })), + }; + if (task.request.temperature !== undefined) completionOpts.temperature = task.request.temperature; + if (task.request.max_tokens !== undefined) completionOpts.maxTokens = task.request.max_tokens; + if (task.request.model !== undefined) completionOpts.model = task.request.model; + + const result = await published.provider.complete(completionOpts); + const completionBody = openAiCompletionEnvelope(published.model, result, task.request.model); + + if (task.streaming) { + // Single-chunk streaming for v1: emit one delta then a [DONE] marker. + // The server-side relay forwards both as SSE frames to the original + // caller. Real per-token streaming would require LlmProvider.stream(). 
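+        // Roughly what the original caller receives (shape from
+        // openAiStreamChunk below; the SSE framing and content are illustrative):
+        //   data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"…"},"finish_reason":"stop"}]}
+        //   data: [DONE]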
+ const deltaFrame = openAiStreamChunk(published.model, result, task.request.model); + await this.postResult(task.taskId, { chunk: { data: JSON.stringify(deltaFrame) } }); + await this.postResult(task.taskId, { chunk: { data: '[DONE]', done: true } }); + } else { + await this.postResult(task.taskId, { status: 200, body: completionBody }); + } + } catch (err) { + await this.postResult(task.taskId, { error: (err as Error).message }); + } + } + + private async postResult(taskId: string, body: unknown): Promise { + try { + await postJson( + this.urlFor(`/api/v1/llms/_provider-task/${encodeURIComponent(taskId)}/result`), + body as Record, + this.opts.token, + ); + } catch (err) { + this.opts.log.warn(`virtual-llm registrar: result POST failed: ${(err as Error).message}`); + } + } + + private urlFor(path: string): string { + return `${this.opts.mcpdUrl.replace(/\/$/, '')}${path}`; + } +} + +/** Wrap a CompletionResult in the OpenAI chat.completion envelope. */ +function openAiCompletionEnvelope( + modelFromConfig: string, + result: { content: string; finishReason: string; usage: { promptTokens: number; completionTokens: number; totalTokens: number } }, + modelFromRequest: string | undefined, +): unknown { + return { + id: `cmpl-${Math.random().toString(36).slice(2)}`, + object: 'chat.completion', + created: Math.floor(Date.now() / 1000), + model: modelFromRequest ?? modelFromConfig, + choices: [{ + index: 0, + message: { role: 'assistant', content: result.content }, + finish_reason: result.finishReason, + }], + usage: { + prompt_tokens: result.usage.promptTokens, + completion_tokens: result.usage.completionTokens, + total_tokens: result.usage.totalTokens, + }, + }; +} + +/** Single-chunk OpenAI chat.completion.chunk frame for v1 streaming. */ +function openAiStreamChunk( + modelFromConfig: string, + result: { content: string; finishReason: string }, + modelFromRequest: string | undefined, +): unknown { + return { + id: `chunk-${Math.random().toString(36).slice(2)}`, + object: 'chat.completion.chunk', + created: Math.floor(Date.now() / 1000), + model: modelFromRequest ?? modelFromConfig, + choices: [{ + index: 0, + delta: { content: result.content }, + finish_reason: result.finishReason, + }], + }; +} + +interface PostResponse { statusCode: number; body: string } + +/** Tiny JSON POST helper used by all of the registrar's mcpd calls. */ +function postJson(url: string, body: unknown, bearer: string): Promise { + return new Promise((resolve, reject) => { + const u = new URL(url); + const driver = u.protocol === 'https:' ? https : http; + const payload = JSON.stringify(body); + const req = driver.request( + { + hostname: u.hostname, + port: u.port || (u.protocol === 'https:' ? 443 : 80), + path: u.pathname + u.search, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${bearer}`, + 'Content-Length': String(Buffer.byteLength(payload)), + Accept: 'application/json', + }, + timeout: 30_000, + }, + (res) => { + const chunks: Buffer[] = []; + res.on('data', (c: Buffer) => chunks.push(c)); + res.on('end', () => resolve({ statusCode: res.statusCode ?? 
0, body: Buffer.concat(chunks).toString('utf-8') })); + }, + ); + req.on('error', reject); + req.on('timeout', () => { req.destroy(); reject(new Error('request timed out')); }); + req.write(payload); + req.end(); + }); +} diff --git a/src/mcplocal/tests/registrar.test.ts b/src/mcplocal/tests/registrar.test.ts new file mode 100644 index 0000000..40e99ce --- /dev/null +++ b/src/mcplocal/tests/registrar.test.ts @@ -0,0 +1,244 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import http from 'node:http'; +import { mkdtempSync, rmSync, readFileSync, writeFileSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { + VirtualLlmRegistrar, + type RegistrarPublishedProvider, +} from '../src/providers/registrar.js'; +import type { LlmProvider, CompletionOptions, CompletionResult } from '../src/providers/types.js'; + +/** + * The registrar talks HTTP. Spin a tiny in-process server in each test so + * we can assert what it sends without mocking node:http itself. + */ +interface FakeServer { + url: string; + close: () => Promise; + /** Calls observed in arrival order. */ + calls: Array<{ method: string; path: string; body: string; headers: Record }>; + /** + * Optional handler. If set, runs per-request and decides response. If not, + * defaults to 201 + JSON `{ providerSessionId: 'sess-FAKE' }` for register + * and 200 + `{}` for everything else. + */ + handler?: (req: http.IncomingMessage, res: http.ServerResponse, body: string) => void; +} + +async function startFakeServer(): Promise { + const calls: FakeServer['calls'] = []; + let server!: http.Server; + const ready = new Promise((resolve, reject) => { + server = http.createServer((req, res) => { + const chunks: Buffer[] = []; + req.on('data', (c: Buffer) => chunks.push(c)); + req.on('end', () => { + const body = Buffer.concat(chunks).toString('utf-8'); + calls.push({ + method: req.method ?? '', + path: req.url ?? 
'', + body, + headers: req.headers, + }); + if (fake.handler !== undefined) { + fake.handler(req, res, body); + return; + } + if (req.url === '/api/v1/llms/_provider-register') { + res.writeHead(201, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ providerSessionId: 'sess-FAKE', llms: [] })); + return; + } + res.writeHead(200, { 'content-type': 'application/json' }); + res.end('{}'); + }); + }); + server.listen(0, '127.0.0.1', () => { + const addr = server.address(); + if (addr === null || typeof addr === 'string') { + reject(new Error('listen failed')); + return; + } + const fakeReady: FakeServer = { + url: `http://127.0.0.1:${String(addr.port)}`, + close: () => new Promise((r) => { server.close(() => r()); }), + calls, + }; + Object.assign(fake, fakeReady); + resolve(fake); + }); + }); + const fake: FakeServer = {} as FakeServer; + return ready; +} + +function makeProvider(name: string, content = 'hi from local'): LlmProvider { + return { + name, + async complete(_opts: CompletionOptions): Promise { + return { + content, + toolCalls: [], + usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 }, + finishReason: 'stop', + }; + }, + async listModels() { return []; }, + async isAvailable() { return true; }, + }; +} + +let tempDir: string; + +beforeEach(() => { + tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-registrar-test-')); +}); + +afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); +}); + +function silentLog(): { info: ReturnType; warn: ReturnType; error: ReturnType } { + return { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; +} + +describe('VirtualLlmRegistrar', () => { + it('start() with no published providers is a silent no-op', async () => { + const log = silentLog(); + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: 'http://unreachable.example', + token: 'tok', + publishedProviders: [], + sessionFilePath: join(tempDir, 'provider-session'), + log, + }); + await registrar.start(); + expect(log.info).toHaveBeenCalledWith(expect.stringContaining('nothing to publish')); + registrar.stop(); + }); + + it('register POSTs to /_provider-register and persists the returned sessionId', async () => { + const fake = await startFakeServer(); + try { + const sessionFilePath = join(tempDir, 'provider-session'); + const published: RegistrarPublishedProvider[] = [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'qwen', tier: 'fast' }, + ]; + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 'tok-abc', + publishedProviders: published, + sessionFilePath, + log: silentLog(), + // Make heartbeat huge so it doesn't fire mid-test. + heartbeatIntervalMs: 60_000, + }); + await registrar.start(); + // Allow the SSE open to enter flight (we never feed it a response, + // but the request fires synchronously after register). + await new Promise((r) => setTimeout(r, 20)); + + const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register'); + expect(registerCall).toBeDefined(); + expect(registerCall!.method).toBe('POST'); + const body = JSON.parse(registerCall!.body) as { providers: Array<{ name: string; type: string; model: string; tier: string }> }; + expect(body.providers).toEqual([{ + name: 'vllm-local', + type: 'openai', + model: 'qwen', + tier: 'fast', + }]); + expect(registerCall!.headers['authorization']).toBe('Bearer tok-abc'); + + // Sticky session id persisted. 
+ expect(readFileSync(sessionFilePath, 'utf-8').trim()).toBe('sess-FAKE'); + expect(registrar.getSessionId()).toBe('sess-FAKE'); + + registrar.stop(); + } finally { + await fake.close(); + } + }); + + it('reuses an existing sticky session id from disk on next start', async () => { + const fake = await startFakeServer(); + try { + const sessionFilePath = join(tempDir, 'provider-session'); + writeFileSync(sessionFilePath, 'sess-existing\n', 'utf-8'); + + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 't', + publishedProviders: [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' }, + ], + sessionFilePath, + log: silentLog(), + heartbeatIntervalMs: 60_000, + }); + await registrar.start(); + await new Promise((r) => setTimeout(r, 20)); + + const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register'); + const body = JSON.parse(registerCall!.body) as { providerSessionId?: string }; + expect(body.providerSessionId).toBe('sess-existing'); + + registrar.stop(); + } finally { + await fake.close(); + } + }); + + it('heartbeat ticker POSTs the session id at the configured interval', async () => { + const fake = await startFakeServer(); + try { + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 't', + publishedProviders: [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' }, + ], + sessionFilePath: join(tempDir, 'provider-session'), + log: silentLog(), + heartbeatIntervalMs: 30, // tight so the test doesn't drag + }); + await registrar.start(); + // Wait long enough for at least 2 heartbeats to fire. + await new Promise((r) => setTimeout(r, 100)); + registrar.stop(); + + const heartbeats = fake.calls.filter((c) => c.path === '/api/v1/llms/_provider-heartbeat'); + expect(heartbeats.length).toBeGreaterThanOrEqual(2); + for (const h of heartbeats) { + const body = JSON.parse(h.body) as { providerSessionId: string }; + expect(body.providerSessionId).toBe('sess-FAKE'); + } + } finally { + await fake.close(); + } + }); + + it('throws when mcpd returns non-201 from /_provider-register', async () => { + const fake = await startFakeServer(); + fake.handler = (_req, res, _body) => { + res.writeHead(409, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ error: 'Cannot publish over public LLM: vllm-local' })); + }; + try { + const registrar = new VirtualLlmRegistrar({ + mcpdUrl: fake.url, + token: 't', + publishedProviders: [ + { provider: makeProvider('vllm-local'), type: 'openai', model: 'm' }, + ], + sessionFilePath: join(tempDir, 'provider-session'), + log: silentLog(), + heartbeatIntervalMs: 60_000, + }); + await expect(registrar.start()).rejects.toThrow(/HTTP 409/); + } finally { + await fake.close(); + } + }); +}); diff --git a/src/mcplocal/tests/smoke/virtual-llm.smoke.test.ts b/src/mcplocal/tests/smoke/virtual-llm.smoke.test.ts new file mode 100644 index 0000000..1586a1a --- /dev/null +++ b/src/mcplocal/tests/smoke/virtual-llm.smoke.test.ts @@ -0,0 +1,209 @@ +/** + * Smoke tests: virtual-LLM register → infer relay → cleanup against a live + * mcpd. Uses an in-process LlmProvider (returns canned content) so we + * exercise the SSE control plane and the kind=virtual infer branch + * without depending on a real upstream model. + * + * The 90-s heartbeat-stale flip and 4-h auto-deletion are covered by unit + * tests (mcpd virtual-llm-service.test.ts); waiting > 90 s in smoke would + * triple the suite duration. 
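+ *
+ * Point the run at another daemon with MCPD_URL=<url>; the suite skips
+ * itself when <MCPD_URL>/healthz is unreachable or ~/.mcpctl/credentials
+ * is missing.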
+ */ +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import http from 'node:http'; +import https from 'node:https'; +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { + VirtualLlmRegistrar, + type RegistrarPublishedProvider, +} from '../../src/providers/registrar.js'; +import type { LlmProvider, CompletionResult } from '../../src/providers/types.js'; + +const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu'; +const SUFFIX = Date.now().toString(36); +const PROVIDER_NAME = `smoke-virtual-${SUFFIX}`; + +function makeFakeProvider(name: string, content: string): LlmProvider { + return { + name, + async complete(): Promise { + return { + content, + toolCalls: [], + usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 }, + finishReason: 'stop', + }; + }, + async listModels() { return []; }, + async isAvailable() { return true; }, + }; +} + +function healthz(url: string, timeoutMs = 5000): Promise { + return new Promise((resolve) => { + const parsed = new URL(`${url.replace(/\/$/, '')}/healthz`); + const driver = parsed.protocol === 'https:' ? https : http; + const req = driver.get( + { + hostname: parsed.hostname, + port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80), + path: parsed.pathname, + timeout: timeoutMs, + }, + (res) => { resolve((res.statusCode ?? 500) < 500); res.resume(); }, + ); + req.on('error', () => resolve(false)); + req.on('timeout', () => { req.destroy(); resolve(false); }); + }); +} + +function readToken(): string | null { + try { + const home = process.env.HOME ?? ''; + const path = `${home}/.mcpctl/credentials`; + // eslint-disable-next-line @typescript-eslint/no-require-imports + const fs = require('node:fs') as typeof import('node:fs'); + if (!fs.existsSync(path)) return null; + const raw = fs.readFileSync(path, 'utf-8'); + const parsed = JSON.parse(raw) as { token?: string }; + return parsed.token ?? null; + } catch { + return null; + } +} + +interface HttpResponse { status: number; body: string } + +function httpRequest(method: string, urlStr: string, body: unknown): Promise { + return new Promise((resolve, reject) => { + const tokenRaw = readToken(); + const parsed = new URL(urlStr); + const driver = parsed.protocol === 'https:' ? https : http; + const headers: Record = { + Accept: 'application/json', + ...(body !== undefined ? { 'Content-Type': 'application/json' } : {}), + ...(tokenRaw !== null ? { Authorization: `Bearer ${tokenRaw}` } : {}), + }; + const req = driver.request({ + hostname: parsed.hostname, + port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80), + path: parsed.pathname + parsed.search, + method, + headers, + timeout: 30_000, + }, (res) => { + const chunks: Buffer[] = []; + res.on('data', (c: Buffer) => chunks.push(c)); + res.on('end', () => { + resolve({ status: res.statusCode ?? 
0, body: Buffer.concat(chunks).toString('utf-8') }); + }); + }); + req.on('error', reject); + req.on('timeout', () => { req.destroy(); reject(new Error(`httpRequest timeout: ${method} ${urlStr}`)); }); + if (body !== undefined) req.write(JSON.stringify(body)); + req.end(); + }); +} + +let mcpdUp = false; +let registrar: VirtualLlmRegistrar | null = null; +let tempDir: string; + +describe('virtual-LLM smoke', () => { + beforeAll(async () => { + mcpdUp = await healthz(MCPD_URL); + if (!mcpdUp) { + // eslint-disable-next-line no-console + console.warn(`\n ○ virtual-llm smoke: skipped — ${MCPD_URL}/healthz unreachable.\n`); + return; + } + if (readToken() === null) { + mcpdUp = false; + // eslint-disable-next-line no-console + console.warn('\n ○ virtual-llm smoke: skipped — no ~/.mcpctl/credentials.\n'); + return; + } + tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-virtual-llm-smoke-')); + }, 20_000); + + afterAll(async () => { + if (registrar !== null) registrar.stop(); + if (tempDir !== undefined) rmSync(tempDir, { recursive: true, force: true }); + // Best-effort cleanup of the row in case the disconnect didn't finish + // before mcpd's heartbeat watchdog ticks. Idempotent. + if (mcpdUp) { + const list = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined); + if (list.status === 200) { + const rows = JSON.parse(list.body) as Array<{ id: string; name: string }>; + const row = rows.find((r) => r.name === PROVIDER_NAME); + if (row !== undefined) { + await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${row.id}`, undefined); + } + } + } + }); + + it('registrar publishes the provider and mcpd lists it as kind=virtual / status=active', async () => { + if (!mcpdUp) return; + const token = readToken(); + if (token === null) return; + const published: RegistrarPublishedProvider[] = [ + { provider: makeFakeProvider(PROVIDER_NAME, 'hi from smoke'), type: 'openai', model: 'fake-smoke', tier: 'fast' }, + ]; + registrar = new VirtualLlmRegistrar({ + mcpdUrl: MCPD_URL, + token, + publishedProviders: published, + sessionFilePath: join(tempDir, 'session'), + log: { info: () => {}, warn: () => {}, error: () => {} }, + heartbeatIntervalMs: 60_000, + }); + await registrar.start(); + expect(registrar.getSessionId()).not.toBeNull(); + // Give the SSE handshake + register a moment to settle on mcpd's side. 
+ await new Promise((r) => setTimeout(r, 400)); + + const res = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined); + expect(res.status).toBe(200); + const rows = JSON.parse(res.body) as Array<{ name: string; kind: string; status: string; type: string; model: string }>; + const row = rows.find((r) => r.name === PROVIDER_NAME); + expect(row, `${PROVIDER_NAME} must be present`).toBeDefined(); + expect(row!.kind).toBe('virtual'); + expect(row!.status).toBe('active'); + expect(row!.type).toBe('openai'); + expect(row!.model).toBe('fake-smoke'); + }, 30_000); + + it('mcpd routes /api/v1/llms//infer back through the SSE relay to the fake provider', async () => { + if (!mcpdUp) return; + const res = await httpRequest('POST', `${MCPD_URL}/api/v1/llms/${PROVIDER_NAME}/infer`, { + messages: [{ role: 'user', content: 'say something' }], + }); + expect(res.status).toBe(200); + const body = JSON.parse(res.body) as { + choices?: Array<{ message?: { content?: string }; finish_reason?: string }>; + usage?: { total_tokens?: number }; + }; + expect(body.choices?.[0]?.message?.content).toBe('hi from smoke'); + expect(body.choices?.[0]?.finish_reason).toBe('stop'); + expect(body.usage?.total_tokens).toBe(5); + }, 30_000); + + it('returns 503 with a clear error when the publisher disconnects mid-session', async () => { + if (!mcpdUp) return; + if (registrar !== null) { + registrar.stop(); + registrar = null; + } + // Immediately after stop(), the SSE socket closes and mcpd's + // unbindSession flips the row to inactive. Inference should 503. + await new Promise((r) => setTimeout(r, 300)); + + const res = await httpRequest('POST', `${MCPD_URL}/api/v1/llms/${PROVIDER_NAME}/infer`, { + messages: [{ role: 'user', content: 'still there?' }], + }); + expect(res.status).toBe(503); + expect(res.body).toMatch(/publisher offline|inactive/); + }, 30_000); +});