feat(docs+smoke): LB pool live smoke + virtual-llms.md pool semantics (v4 Stage 3)
Some checks failed
CI/CD / lint (pull_request) Successful in 53s
CI/CD / test (pull_request) Successful in 1m8s
CI/CD / typecheck (pull_request) Successful in 2m53s
CI/CD / smoke (pull_request) Failing after 1m47s
CI/CD / build (pull_request) Successful in 6m20s
CI/CD / publish (pull_request) Has been skipped
Some checks failed
CI/CD / lint (pull_request) Successful in 53s
CI/CD / test (pull_request) Successful in 1m8s
CI/CD / typecheck (pull_request) Successful in 2m53s
CI/CD / smoke (pull_request) Failing after 1m47s
CI/CD / build (pull_request) Successful in 6m20s
CI/CD / publish (pull_request) Has been skipped
Smoke (tests/smoke/llm-pool.smoke.test.ts): two in-process registrars
publish virtual Llms with distinct names but a shared poolName, then:
1. /api/v1/llms/<name>/members surfaces both with the correct
effective pool key, size, activeCount, and per-member kind/status.
2. Chat through an agent pinned to one pool member dispatches across
the pool — verified by running 12 calls and asserting at least
one response from each backend (the random-shuffle selection
would have to hit only-A or only-B in 12 fair coin flips, ~1/2048).
3. Failover: stop one publisher, the surviving member still serves
chat. /members shows the stopped row as inactive immediately
(unbindSession runs synchronously on SSE close).
docs/virtual-llms.md gets a full "LB pools (v4)" section with the
two-field schema model, dispatcher selection + failover semantics,
public + virtual declaration examples, list/describe rendering, the
"pin to specific instance" escape hatch, and an API surface entry
for /members. docs/agents.md cross-link extended.
Tests: full smoke 144/144 (was 141, +3 for the new pool smoke).
Stages 1-3 ship the complete v4 — public and virtual Llms can both
join pools, agents transparently load-balance across them, yaml
round-trip preserves poolName, and the existing single-Llm world
keeps working byte-identically when poolName is null.
This commit is contained in:
282
src/mcplocal/tests/smoke/llm-pool.smoke.test.ts
Normal file
282
src/mcplocal/tests/smoke/llm-pool.smoke.test.ts
Normal file
@@ -0,0 +1,282 @@
|
||||
/**
 * v4 smoke: LB pool by shared `poolName`. Spins up two in-process
 * registrars publishing virtual Llms with distinct `name`s but a
 * shared `poolName`. Verifies:
 *
 * - both rows show in `GET /api/v1/llms/<name>/members` with the
 *   correct effective pool key + active count.
 * - chat through an agent pinned to one pool member dispatches
 *   across the pool (proven by the second member's content showing
 *   up at least once across N calls).
 * - failover: stop one publisher, chat continues to work via the
 *   surviving member.
 *
 * Lifecycle (heartbeat-stale, 4 h GC) is unit-test territory; smoke
 * just covers the path the operator-facing flow runs through.
 *
 * The suite self-skips (tests return early) when mcpd is unreachable
 * or no local credentials file exists — see beforeAll below.
 */
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import http from 'node:http';
import https from 'node:https';
import { mkdtempSync, rmSync, readFileSync, existsSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import {
  VirtualLlmRegistrar,
  type RegistrarPublishedProvider,
  type RegistrarPublishedAgent,
} from '../../src/providers/registrar.js';
import type { LlmProvider, CompletionResult } from '../../src/providers/types.js';

// Target mcpd instance; override with MCPD_URL for local runs.
const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu';
// Per-run suffix so concurrent/stale runs never collide on names.
const SUFFIX = Date.now().toString(36);
const POOL_NAME = `smoke-pool-${SUFFIX}`;
const PROVIDER_A = `smoke-pool-a-${SUFFIX}`;
const PROVIDER_B = `smoke-pool-b-${SUFFIX}`;
const AGENT_NAME = `smoke-pool-agent-${SUFFIX}`;
function makeFakeProvider(name: string, content: string): LlmProvider {
|
||||
return {
|
||||
name,
|
||||
async complete(): Promise<CompletionResult> {
|
||||
return {
|
||||
content,
|
||||
toolCalls: [],
|
||||
usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 },
|
||||
finishReason: 'stop',
|
||||
};
|
||||
},
|
||||
async listModels() { return []; },
|
||||
async isAvailable() { return true; },
|
||||
};
|
||||
}
|
||||
|
||||
function healthz(url: string, timeoutMs = 5000): Promise<boolean> {
|
||||
return new Promise((resolve) => {
|
||||
const parsed = new URL(`${url.replace(/\/$/, '')}/healthz`);
|
||||
const driver = parsed.protocol === 'https:' ? https : http;
|
||||
const req = driver.get({
|
||||
hostname: parsed.hostname,
|
||||
port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
|
||||
path: parsed.pathname,
|
||||
timeout: timeoutMs,
|
||||
}, (res) => { resolve((res.statusCode ?? 500) < 500); res.resume(); });
|
||||
req.on('error', () => resolve(false));
|
||||
req.on('timeout', () => { req.destroy(); resolve(false); });
|
||||
});
|
||||
}
|
||||
|
||||
function readToken(): string | null {
|
||||
try {
|
||||
const path = join(process.env.HOME ?? '', '.mcpctl', 'credentials');
|
||||
if (!existsSync(path)) return null;
|
||||
const parsed = JSON.parse(readFileSync(path, 'utf-8')) as { token?: string };
|
||||
return parsed.token ?? null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
interface HttpResponse { status: number; body: string }
|
||||
|
||||
function httpRequest(method: string, urlStr: string, body: unknown): Promise<HttpResponse> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const tokenRaw = readToken();
|
||||
const parsed = new URL(urlStr);
|
||||
const driver = parsed.protocol === 'https:' ? https : http;
|
||||
const headers: Record<string, string> = {
|
||||
Accept: 'application/json',
|
||||
...(body !== undefined ? { 'Content-Type': 'application/json' } : {}),
|
||||
...(tokenRaw !== null ? { Authorization: `Bearer ${tokenRaw}` } : {}),
|
||||
};
|
||||
const req = driver.request({
|
||||
hostname: parsed.hostname,
|
||||
port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
|
||||
path: parsed.pathname + parsed.search,
|
||||
method,
|
||||
headers,
|
||||
timeout: 30_000,
|
||||
}, (res) => {
|
||||
const chunks: Buffer[] = [];
|
||||
res.on('data', (c: Buffer) => chunks.push(c));
|
||||
res.on('end', () => {
|
||||
resolve({ status: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') });
|
||||
});
|
||||
});
|
||||
req.on('error', reject);
|
||||
req.on('timeout', () => { req.destroy(); reject(new Error(`httpRequest timeout: ${method} ${urlStr}`)); });
|
||||
if (body !== undefined) req.write(JSON.stringify(body));
|
||||
req.end();
|
||||
});
|
||||
}
|
||||
|
||||
// Shared mutable state across the sequential `it` blocks below: test 1
// starts both registrars, test 3 tears registrar B down, afterAll stops
// whatever is left.
let mcpdUp = false;
let registrarA: VirtualLlmRegistrar | null = null;
let registrarB: VirtualLlmRegistrar | null = null;
// Assigned in beforeAll only when mcpd is reachable; afterAll guards on
// `undefined` before removing it.
// NOTE(review): declared `string` yet compared to `undefined` in afterAll —
// presumably relies on non-strict comparison settings; `string | undefined`
// would be the strict-mode-safe declaration. TODO confirm against tsconfig.
let tempDir: string;
describe('llm-pool smoke (v4)', () => {
  // Gate the whole suite on mcpd reachability + local credentials; each
  // `it` early-returns on `!mcpdUp` so CI without access records passes.
  beforeAll(async () => {
    mcpdUp = await healthz(MCPD_URL);
    if (!mcpdUp) {
      // eslint-disable-next-line no-console
      console.warn(`\n ○ llm-pool smoke: skipped — ${MCPD_URL}/healthz unreachable.\n`);
      return;
    }
    if (readToken() === null) {
      mcpdUp = false;
      // eslint-disable-next-line no-console
      console.warn('\n ○ llm-pool smoke: skipped — no ~/.mcpctl/credentials.\n');
      return;
    }
    // Scratch dir for the registrars' session files; removed in afterAll.
    tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-llm-pool-smoke-'));
  }, 20_000);

  afterAll(async () => {
    // Stop publishers first so mcpd marks their rows inactive before deletes.
    if (registrarA !== null) registrarA.stop();
    if (registrarB !== null) registrarB.stop();
    if (tempDir !== undefined) rmSync(tempDir, { recursive: true, force: true });
    if (!mcpdUp) return;
    // Best-effort cleanup. Agent first (Restrict FK), then both Llms.
    const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined);
    if (agents.status === 200) {
      const rows = JSON.parse(agents.body) as Array<{ id: string; name: string }>;
      const row = rows.find((r) => r.name === AGENT_NAME);
      if (row !== undefined) await httpRequest('DELETE', `${MCPD_URL}/api/v1/agents/${row.id}`, undefined);
    }
    const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
    if (llms.status === 200) {
      const rows = JSON.parse(llms.body) as Array<{ id: string; name: string }>;
      for (const r of rows) {
        if (r.name === PROVIDER_A || r.name === PROVIDER_B) {
          await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${r.id}`, undefined);
        }
      }
    }
  });

  it('two publishers with shared poolName show up in /api/v1/llms/<name>/members', async () => {
    if (!mcpdUp) return;
    const token = readToken();
    if (token === null) return;

    // Publisher A — also publishes the agent so we can chat through the pool.
    const pubA: RegistrarPublishedProvider = {
      provider: makeFakeProvider(PROVIDER_A, 'reply from A'),
      type: 'openai',
      model: 'fake-pool',
      poolName: POOL_NAME,
    };
    const pubAgent: RegistrarPublishedAgent = {
      name: AGENT_NAME,
      // Agent pins to publisher A specifically — pool dispatch then widens
      // at chat time. Demonstrates the v4 transparency: pinning to one
      // member implicitly opts the agent into the whole pool.
      llmName: PROVIDER_A,
      description: 'v4 pool smoke',
      systemPrompt: 'Reply with whatever the backend returns.',
    };
    registrarA = new VirtualLlmRegistrar({
      mcpdUrl: MCPD_URL,
      token,
      publishedProviders: [pubA],
      publishedAgents: [pubAgent],
      sessionFilePath: join(tempDir, 'session-a'),
      // Silence registrar logging inside the test run.
      log: { info: () => {}, warn: () => {}, error: () => {} },
      // Long interval: the suite finishes well before the first heartbeat.
      heartbeatIntervalMs: 60_000,
    });
    await registrarA.start();

    // Publisher B — same poolName, different name. No agent.
    const pubB: RegistrarPublishedProvider = {
      provider: makeFakeProvider(PROVIDER_B, 'reply from B'),
      type: 'openai',
      model: 'fake-pool',
      poolName: POOL_NAME,
    };
    registrarB = new VirtualLlmRegistrar({
      mcpdUrl: MCPD_URL,
      token,
      publishedProviders: [pubB],
      sessionFilePath: join(tempDir, 'session-b'),
      log: { info: () => {}, warn: () => {}, error: () => {} },
      heartbeatIntervalMs: 60_000,
    });
    await registrarB.start();

    // Let both registrars settle on mcpd's side.
    await new Promise((r) => setTimeout(r, 600));

    // Hit the new /members endpoint via either pool member's name.
    const res = await httpRequest('GET', `${MCPD_URL}/api/v1/llms/${PROVIDER_A}/members`, undefined);
    expect(res.status).toBe(200);
    const body = JSON.parse(res.body) as {
      poolName: string;
      explicitPoolName: string | null;
      size: number;
      activeCount: number;
      members: Array<{ name: string; poolName: string | null; status: string }>;
    };
    expect(body.poolName).toBe(POOL_NAME);
    expect(body.explicitPoolName).toBe(POOL_NAME);
    expect(body.size).toBe(2);
    expect(body.activeCount).toBe(2);
    const names = body.members.map((m) => m.name).sort();
    expect(names).toEqual([PROVIDER_A, PROVIDER_B].sort());
    for (const m of body.members) {
      expect(m.poolName).toBe(POOL_NAME);
    }
  }, 30_000);

  it('chat through the agent dispatches across both pool members over multiple calls', async () => {
    if (!mcpdUp) return;
    // The chat dispatcher randomly shuffles candidates per call. Run
    // enough turns that hitting only one member would be statistically
    // suspicious (P(hit only A or only B) over 12 calls is ~1/2048 if the
    // shuffle is fair). We assert >= one of each.
    const seen = new Set<string>();
    for (let i = 0; i < 12; i += 1) {
      const res = await httpRequest('POST', `${MCPD_URL}/api/v1/agents/${AGENT_NAME}/chat`, {
        message: `ping ${String(i)}`,
        stream: false,
      });
      expect(res.status, res.body).toBe(200);
      const body = JSON.parse(res.body) as { assistant: string };
      seen.add(body.assistant);
      // Stop early once both backends have been observed.
      if (seen.has('reply from A') && seen.has('reply from B')) break;
    }
    expect(seen.has('reply from A'), `pool dispatch should have hit A at least once; saw: ${[...seen].join(', ')}`).toBe(true);
    expect(seen.has('reply from B'), `pool dispatch should have hit B at least once; saw: ${[...seen].join(', ')}`).toBe(true);
  }, 90_000);

  it('failover: stop one publisher, chat still succeeds via the surviving member', async () => {
    if (!mcpdUp) return;
    // Stop publisher B. mcpd's unbindSession flips B's row to inactive
    // synchronously on SSE close, so the next chat's pool resolution
    // skips it.
    if (registrarB !== null) {
      registrarB.stop();
      registrarB = null;
    }
    await new Promise((r) => setTimeout(r, 400));

    // Confirm B is inactive in /members.
    const members = await httpRequest('GET', `${MCPD_URL}/api/v1/llms/${PROVIDER_A}/members`, undefined);
    expect(members.status).toBe(200);
    const body = JSON.parse(members.body) as {
      members: Array<{ name: string; status: string }>;
    };
    const memB = body.members.find((m) => m.name === PROVIDER_B);
    expect(memB?.status).toBe('inactive');

    // Chat continues to work — only A responds now.
    for (let i = 0; i < 3; i += 1) {
      const res = await httpRequest('POST', `${MCPD_URL}/api/v1/agents/${AGENT_NAME}/chat`, {
        message: `post-failover ping ${String(i)}`,
        stream: false,
      });
      expect(res.status, res.body).toBe(200);
      const out = JSON.parse(res.body) as { assistant: string };
      expect(out.assistant).toBe('reply from A');
    }
  }, 30_000);
});
Reference in New Issue
Block a user