From 137711fdf6fb88cc6eeb64b16e3dbb1a2396cfc1 Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 23:22:15 +0100 Subject: [PATCH] feat(docs+smoke): LB pool live smoke + virtual-llms.md pool semantics (v4 Stage 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Smoke (tests/smoke/llm-pool.smoke.test.ts): two in-process registrars publish virtual Llms with distinct names but a shared poolName, then: 1. /api/v1/llms//members surfaces both with the correct effective pool key, size, activeCount, and per-member kind/status. 2. Chat through an agent pinned to one pool member dispatches across the pool — verified by running 12 calls and asserting at least one response from each backend (the random-shuffle selection would have to hit only-A or only-B in 12 fair coin flips, ~1/2048). 3. Failover: stop one publisher, the surviving member still serves chat. /members shows the stopped row as inactive immediately (unbindSession runs synchronously on SSE close). docs/virtual-llms.md gets a full "LB pools (v4)" section with the two-field schema model, dispatcher selection + failover semantics, public + virtual declaration examples, list/describe rendering, the "pin to specific instance" escape hatch, and an API surface entry for /members. docs/agents.md cross-link extended. Tests: full smoke 144/144 (was 141, +3 for the new pool smoke). Stages 1-3 ship the complete v4 — public and virtual Llms can both join pools, agents transparently load-balance across them, yaml round-trip preserves poolName, and the existing single-Llm world keeps working byte-identically when poolName is null. --- docs/agents.md | 5 +- docs/virtual-llms.md | 150 +++++++++- .../tests/smoke/llm-pool.smoke.test.ts | 282 ++++++++++++++++++ 3 files changed, 432 insertions(+), 5 deletions(-) create mode 100644 src/mcplocal/tests/smoke/llm-pool.smoke.test.ts diff --git a/docs/agents.md b/docs/agents.md index 462dda2..b8c1bdb 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -208,5 +208,8 @@ mcpctl chat reviewer extends the same publishing model to **virtual agents** declared in mcplocal config — they show up in `mcpctl get agent` with `KIND=virtual / STATUS=active` and become chat-able via - `mcpctl chat ` like any other agent. + `mcpctl chat ` like any other agent. **v4** adds **pools**: + Llms sharing a `poolName` stack into one load-balanced pool that + the chat dispatcher transparently widens to at request time, with + random selection + sequential failover on transport errors. - [chat.md](./chat.md) — `mcpctl chat` flow and LiteLLM-style flags. diff --git a/docs/virtual-llms.md b/docs/virtual-llms.md index 25dc089..6257d83 100644 --- a/docs/virtual-llms.md +++ b/docs/virtual-llms.md @@ -278,10 +278,148 @@ when the agent's pinned Llm is virtual. same agent name collide on the second register with HTTP 409. Per-publisher namespacing is a v4+ concern — same constraint as virtual Llms in v1. +## LB pools (v4) + +Two or more `Llm` rows that share a `poolName` stack into one +load-balanced pool. Agents pin to a single Llm by id; the chat +dispatcher transparently widens to "all healthy Llms with the same +effective pool key" at request time and picks one. There is no new +`LlmPool` resource — `poolName` is just an optional column on `Llm`, +so RBAC, listing, yaml round-trip, and apply all work the same way +they did pre-v4. + +### Pool semantics + +| Field | Behavior | +|---|---| +| `Llm.name` | Globally unique (unchanged). The apply key. | +| `Llm.poolName` | Optional. 
When set, declares membership. When NULL, falls back to `name` ("solo Llm, pool of 1"). | + +Effective pool key = `poolName ?? name`. The dispatcher's lookup is: + +```sql +SELECT * FROM Llm +WHERE poolName = $1 OR (poolName IS NULL AND name = $1) +``` + +So a solo Llm whose `name` happens to equal an explicit `poolName` +joins that pool — by design, an existing single-row Llm can be +promoted to "pool seed" without a rename or migration. + +### Selection + failover + +- **Selection**: random shuffle of all members whose `status` is + `active` (or `hibernating` — VirtualLlmService handles wake on + dispatch). `inactive` members are skipped. +- **Failover** (non-streaming): if dispatch throws on the first + candidate (transport failure, virtual publisher disconnect), the + dispatcher iterates the rest of the shuffled list until one + succeeds or the list is exhausted. Auth/4xx responses are NOT + retried — siblings with the same key/model would fail identically. +- **Failover** (streaming): only covers "couldn't establish stream" + failures (transport error before any chunk yielded). Once any + output has been streamed, we're committed to that backend. + +### Declaring a pool + +#### Public Llms + +```bash +mcpctl create llm prod-qwen-1 --type openai --model qwen3-thinking \ + --url https://prod-1.example.com --pool-name qwen-pool \ + --api-key-ref qwen-key/API_KEY + +mcpctl create llm prod-qwen-2 --type openai --model qwen3-thinking \ + --url https://prod-2.example.com --pool-name qwen-pool \ + --api-key-ref qwen-key/API_KEY +``` + +Or via apply (yaml round-trip preserves `poolName`): + +```yaml +--- +kind: llm +name: prod-qwen-1 +type: openai +model: qwen3-thinking +url: https://prod-1.example.com +poolName: qwen-pool +apiKeyRef: { name: qwen-key, key: API_KEY } +--- +kind: llm +name: prod-qwen-2 +type: openai +model: qwen3-thinking +url: https://prod-2.example.com +poolName: qwen-pool +apiKeyRef: { name: qwen-key, key: API_KEY } +``` + +#### Virtual Llms (mcplocal-published) + +```jsonc +// ~/.mcpctl/config.json +{ + "llm": { + "providers": [ + { + "name": "vllm-alice-qwen3", // unique per publisher + "type": "vllm-managed", + "model": "Qwen/Qwen2.5-7B-Instruct-AWQ", + "venvPath": "~/vllm_env", + "publish": true, + "poolName": "user-vllm-qwen3-thinking" // shared pool key + } + ] + } +} +``` + +Each user's mcplocal picks a unique `name` (e.g. include the hostname +to guarantee no collisions) but shares the `poolName`. Agents pinned +to any single member — or to `qwen3-thinking` (the public LiteLLM +endpoint, also given `poolName: user-vllm-qwen3-thinking` if mixing +public + virtual is desired) — see one logical pool that auto-grows +as more workers come online. + +### Listing + describe + +The `mcpctl get llm` table has a `POOL` column right after `NAME`. +Solo rows render as `-`; pool members show their explicit pool key: + +``` +NAME POOL KIND STATUS TYPE MODEL ID +qwen3-thinking - public active openai qwen3-thinking cmo... +prod-qwen-1 qwen-pool public active openai qwen3-thinking cmo... +prod-qwen-2 qwen-pool public active openai qwen3-thinking cmo... 
+``` + +`mcpctl describe llm ` adds a `Pool:` block at the top when the +row is in an explicit pool OR when its implicit pool has size > 1: + +``` +Pool: + Pool name: qwen-pool + Members: 2 (2 active) + - prod-qwen-1 [public/active] ← this row + - prod-qwen-2 [public/active] +``` + +`GET /api/v1/llms//members` is the API surface — returns full +`LlmView`s for every member plus aggregate `size` / `activeCount` so +operator tooling doesn't need a second roundtrip. + +### Pinning to a specific instance + +To pin an agent to one specific instance (e.g. for debugging, +RBAC-scoped routing, or "this agent must hit this model with this +key"), give that instance a unique name and leave its `poolName` +unset. The agent's pool is then size 1 and dispatch is deterministic. +Pool membership is opt-in via `poolName` — the default behavior is +single-Llm. + ## Roadmap (later stages) -- **v4 — LB pool by model**: agents can target a model name instead of - a specific Llm; mcpd picks the healthiest pool member per request. - **v5 — Task queue**: persisted requests for hibernating/saturated pools. Workers pull tasks of their model when they come online. @@ -301,8 +439,12 @@ POST /api/v1/llms/_provider-task/:id/result { chunk: { data, done? } } { status, body } -GET /api/v1/llms → list (includes kind, status, lastHeartbeatAt, inactiveSince) -POST /api/v1/llms//infer → routes through the SSE relay +GET /api/v1/llms → list (includes kind, status, lastHeartbeatAt, inactiveSince, poolName) +GET /api/v1/llms/ → single Llm row (also accepts a CUID id) +GET /api/v1/llms//members → v4: pool members for the effective pool key: + { poolName, explicitPoolName, size, activeCount, members[] } +POST /api/v1/llms//infer → routes through the SSE relay (v4: dispatcher + also expands by poolName when set) DELETE /api/v1/llms/ → delete unconditionally (also runs GC's job) GET /api/v1/agents → list (v3: includes kind, status, lastHeartbeatAt, inactiveSince) ``` diff --git a/src/mcplocal/tests/smoke/llm-pool.smoke.test.ts b/src/mcplocal/tests/smoke/llm-pool.smoke.test.ts new file mode 100644 index 0000000..61abdfc --- /dev/null +++ b/src/mcplocal/tests/smoke/llm-pool.smoke.test.ts @@ -0,0 +1,282 @@ +/** + * v4 smoke: LB pool by shared `poolName`. Spins up two in-process + * registrars publishing virtual Llms with distinct `name`s but a + * shared `poolName`. Verifies: + * + * - both rows show in `GET /api/v1/llms//members` with the + * correct effective pool key + active count. + * - chat through an agent pinned to one pool member dispatches + * across the pool (proven by the second member's content showing + * up at least once across N calls). + * - failover: stop one publisher, chat continues to work via the + * surviving member. + * + * Lifecycle (heartbeat-stale, 4 h GC) is unit-test territory; smoke + * just covers the path the operator-facing flow runs through. + */ +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import http from 'node:http'; +import https from 'node:https'; +import { mkdtempSync, rmSync, readFileSync, existsSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { + VirtualLlmRegistrar, + type RegistrarPublishedProvider, + type RegistrarPublishedAgent, +} from '../../src/providers/registrar.js'; +import type { LlmProvider, CompletionResult } from '../../src/providers/types.js'; + +const MCPD_URL = process.env.MCPD_URL ?? 
'https://mcpctl.ad.itaz.eu';
+const SUFFIX = Date.now().toString(36);
+const POOL_NAME = `smoke-pool-${SUFFIX}`;
+const PROVIDER_A = `smoke-pool-a-${SUFFIX}`;
+const PROVIDER_B = `smoke-pool-b-${SUFFIX}`;
+const AGENT_NAME = `smoke-pool-agent-${SUFFIX}`;
+
+function makeFakeProvider(name: string, content: string): LlmProvider {
+  return {
+    name,
+    async complete(): Promise<CompletionResult> {
+      return {
+        content,
+        toolCalls: [],
+        usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 },
+        finishReason: 'stop',
+      };
+    },
+    async listModels() { return []; },
+    async isAvailable() { return true; },
+  };
+}
+
+function healthz(url: string, timeoutMs = 5000): Promise<boolean> {
+  return new Promise((resolve) => {
+    const parsed = new URL(`${url.replace(/\/$/, '')}/healthz`);
+    const driver = parsed.protocol === 'https:' ? https : http;
+    const req = driver.get({
+      hostname: parsed.hostname,
+      port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
+      path: parsed.pathname,
+      timeout: timeoutMs,
+    }, (res) => { resolve((res.statusCode ?? 500) < 500); res.resume(); });
+    req.on('error', () => resolve(false));
+    req.on('timeout', () => { req.destroy(); resolve(false); });
+  });
+}
+
+function readToken(): string | null {
+  try {
+    const path = join(process.env.HOME ?? '', '.mcpctl', 'credentials');
+    if (!existsSync(path)) return null;
+    const parsed = JSON.parse(readFileSync(path, 'utf-8')) as { token?: string };
+    return parsed.token ?? null;
+  } catch {
+    return null;
+  }
+}
+
+interface HttpResponse { status: number; body: string }
+
+function httpRequest(method: string, urlStr: string, body: unknown): Promise<HttpResponse> {
+  return new Promise((resolve, reject) => {
+    const tokenRaw = readToken();
+    const parsed = new URL(urlStr);
+    const driver = parsed.protocol === 'https:' ? https : http;
+    const headers: Record<string, string> = {
+      Accept: 'application/json',
+      ...(body !== undefined ? { 'Content-Type': 'application/json' } : {}),
+      ...(tokenRaw !== null ? { Authorization: `Bearer ${tokenRaw}` } : {}),
+    };
+    const req = driver.request({
+      hostname: parsed.hostname,
+      port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
+      path: parsed.pathname + parsed.search,
+      method,
+      headers,
+      timeout: 30_000,
+    }, (res) => {
+      const chunks: Buffer[] = [];
+      res.on('data', (c: Buffer) => chunks.push(c));
+      res.on('end', () => {
+        resolve({ status: res.statusCode ?? 
0, body: Buffer.concat(chunks).toString('utf-8') }); + }); + }); + req.on('error', reject); + req.on('timeout', () => { req.destroy(); reject(new Error(`httpRequest timeout: ${method} ${urlStr}`)); }); + if (body !== undefined) req.write(JSON.stringify(body)); + req.end(); + }); +} + +let mcpdUp = false; +let registrarA: VirtualLlmRegistrar | null = null; +let registrarB: VirtualLlmRegistrar | null = null; +let tempDir: string; + +describe('llm-pool smoke (v4)', () => { + beforeAll(async () => { + mcpdUp = await healthz(MCPD_URL); + if (!mcpdUp) { + // eslint-disable-next-line no-console + console.warn(`\n ○ llm-pool smoke: skipped — ${MCPD_URL}/healthz unreachable.\n`); + return; + } + if (readToken() === null) { + mcpdUp = false; + // eslint-disable-next-line no-console + console.warn('\n ○ llm-pool smoke: skipped — no ~/.mcpctl/credentials.\n'); + return; + } + tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-llm-pool-smoke-')); + }, 20_000); + + afterAll(async () => { + if (registrarA !== null) registrarA.stop(); + if (registrarB !== null) registrarB.stop(); + if (tempDir !== undefined) rmSync(tempDir, { recursive: true, force: true }); + if (!mcpdUp) return; + // Best-effort cleanup. Agent first (Restrict FK), then both Llms. + const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined); + if (agents.status === 200) { + const rows = JSON.parse(agents.body) as Array<{ id: string; name: string }>; + const row = rows.find((r) => r.name === AGENT_NAME); + if (row !== undefined) await httpRequest('DELETE', `${MCPD_URL}/api/v1/agents/${row.id}`, undefined); + } + const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined); + if (llms.status === 200) { + const rows = JSON.parse(llms.body) as Array<{ id: string; name: string }>; + for (const r of rows) { + if (r.name === PROVIDER_A || r.name === PROVIDER_B) { + await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${r.id}`, undefined); + } + } + } + }); + + it('two publishers with shared poolName show up in /api/v1/llms//members', async () => { + if (!mcpdUp) return; + const token = readToken(); + if (token === null) return; + + // Publisher A — also publishes the agent so we can chat through the pool. + const pubA: RegistrarPublishedProvider = { + provider: makeFakeProvider(PROVIDER_A, 'reply from A'), + type: 'openai', + model: 'fake-pool', + poolName: POOL_NAME, + }; + const pubAgent: RegistrarPublishedAgent = { + name: AGENT_NAME, + // Agent pins to publisher A specifically — pool dispatch then widens + // at chat time. Demonstrates the v4 transparency: pinning to one + // member implicitly opts the agent into the whole pool. + llmName: PROVIDER_A, + description: 'v4 pool smoke', + systemPrompt: 'Reply with whatever the backend returns.', + }; + registrarA = new VirtualLlmRegistrar({ + mcpdUrl: MCPD_URL, + token, + publishedProviders: [pubA], + publishedAgents: [pubAgent], + sessionFilePath: join(tempDir, 'session-a'), + log: { info: () => {}, warn: () => {}, error: () => {} }, + heartbeatIntervalMs: 60_000, + }); + await registrarA.start(); + + // Publisher B — same poolName, different name. No agent. 
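+    // B gets its own registrar and session file so the failover test
+    // below can stop it independently of A, standing in for a real
+    // publisher disconnect rather than a shared-session teardown.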
+ const pubB: RegistrarPublishedProvider = { + provider: makeFakeProvider(PROVIDER_B, 'reply from B'), + type: 'openai', + model: 'fake-pool', + poolName: POOL_NAME, + }; + registrarB = new VirtualLlmRegistrar({ + mcpdUrl: MCPD_URL, + token, + publishedProviders: [pubB], + sessionFilePath: join(tempDir, 'session-b'), + log: { info: () => {}, warn: () => {}, error: () => {} }, + heartbeatIntervalMs: 60_000, + }); + await registrarB.start(); + + // Let both registers settle on mcpd's side. + await new Promise((r) => setTimeout(r, 600)); + + // Hit the new /members endpoint via either pool member's name. + const res = await httpRequest('GET', `${MCPD_URL}/api/v1/llms/${PROVIDER_A}/members`, undefined); + expect(res.status).toBe(200); + const body = JSON.parse(res.body) as { + poolName: string; + explicitPoolName: string | null; + size: number; + activeCount: number; + members: Array<{ name: string; poolName: string | null; status: string }>; + }; + expect(body.poolName).toBe(POOL_NAME); + expect(body.explicitPoolName).toBe(POOL_NAME); + expect(body.size).toBe(2); + expect(body.activeCount).toBe(2); + const names = body.members.map((m) => m.name).sort(); + expect(names).toEqual([PROVIDER_A, PROVIDER_B].sort()); + for (const m of body.members) { + expect(m.poolName).toBe(POOL_NAME); + } + }, 30_000); + + it('chat through the agent dispatches across both pool members over multiple calls', async () => { + if (!mcpdUp) return; + // The chat dispatcher randomly shuffles candidates per call. Run + // enough turns that hitting only one member would be statistically + // suspicious (P(hit only A or only B) over 12 calls is ~1/2048 if the + // shuffle is fair). We assert >= one of each. + const seen = new Set(); + for (let i = 0; i < 12; i += 1) { + const res = await httpRequest('POST', `${MCPD_URL}/api/v1/agents/${AGENT_NAME}/chat`, { + message: `ping ${String(i)}`, + stream: false, + }); + expect(res.status, res.body).toBe(200); + const body = JSON.parse(res.body) as { assistant: string }; + seen.add(body.assistant); + if (seen.has('reply from A') && seen.has('reply from B')) break; + } + expect(seen.has('reply from A'), `pool dispatch should have hit A at least once; saw: ${[...seen].join(', ')}`).toBe(true); + expect(seen.has('reply from B'), `pool dispatch should have hit B at least once; saw: ${[...seen].join(', ')}`).toBe(true); + }, 90_000); + + it('failover: stop one publisher, chat still succeeds via the surviving member', async () => { + if (!mcpdUp) return; + // Stop publisher B. mcpd's unbindSession flips B's row to inactive + // synchronously on SSE close, so the next chat's pool resolution + // skips it. + if (registrarB !== null) { + registrarB.stop(); + registrarB = null; + } + await new Promise((r) => setTimeout(r, 400)); + + // Confirm B is inactive in /members. + const members = await httpRequest('GET', `${MCPD_URL}/api/v1/llms/${PROVIDER_A}/members`, undefined); + expect(members.status).toBe(200); + const body = JSON.parse(members.body) as { + members: Array<{ name: string; status: string }>; + }; + const memB = body.members.find((m) => m.name === PROVIDER_B); + expect(memB?.status).toBe('inactive'); + + // Chat continues to work — only A responds now. 
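+    // With B marked inactive the pool resolves to a single active member,
+    // so every dispatch lands on A deterministically; three calls are
+    // plenty here, unlike the statistical 12-call check above.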
+ for (let i = 0; i < 3; i += 1) { + const res = await httpRequest('POST', `${MCPD_URL}/api/v1/agents/${AGENT_NAME}/chat`, { + message: `post-failover ping ${String(i)}`, + stream: false, + }); + expect(res.status, res.body).toBe(200); + const out = JSON.parse(res.body) as { assistant: string }; + expect(out.assistant).toBe('reply from A'); + } + }, 30_000); +});