diff --git a/completions/mcpctl.bash b/completions/mcpctl.bash index 2b86325..21a1b1e 100644 --- a/completions/mcpctl.bash +++ b/completions/mcpctl.bash @@ -185,7 +185,7 @@ _mcpctl() { COMPREPLY=($(compgen -W "--data --force -h --help" -- "$cur")) ;; llm) - COMPREPLY=($(compgen -W "--type --model --url --tier --description --api-key-ref --extra --force --skip-auth-check -h --help" -- "$cur")) + COMPREPLY=($(compgen -W "--type --model --url --tier --description --api-key-ref --extra --pool-name --force --skip-auth-check -h --help" -- "$cur")) ;; agent) COMPREPLY=($(compgen -W "--llm --project --description --system-prompt --system-prompt-file --proxy-model --default-temperature --default-top-p --default-top-k --default-max-tokens --default-seed --default-stop --default-extra --default-params-file --force -h --help" -- "$cur")) diff --git a/completions/mcpctl.fish b/completions/mcpctl.fish index 810b375..afe320d 100644 --- a/completions/mcpctl.fish +++ b/completions/mcpctl.fish @@ -333,6 +333,7 @@ complete -c mcpctl -n "__mcpctl_subcmd_active create llm" -l tier -d 'Tier: fast complete -c mcpctl -n "__mcpctl_subcmd_active create llm" -l description -d 'Description' -x complete -c mcpctl -n "__mcpctl_subcmd_active create llm" -l api-key-ref -d 'API key reference in SECRET/KEY form (e.g. 
anthropic-key/token)' -x complete -c mcpctl -n "__mcpctl_subcmd_active create llm" -l extra -d 'Extra config key=value (repeat)' -x +complete -c mcpctl -n "__mcpctl_subcmd_active create llm" -l pool-name -d 'Stack with other Llms sharing this pool name; agents pinned to any member dispatch across the pool' -x complete -c mcpctl -n "__mcpctl_subcmd_active create llm" -l force -d 'Update if already exists' complete -c mcpctl -n "__mcpctl_subcmd_active create llm" -l skip-auth-check -d 'Skip the upstream auth probe (for offline registration before infra exists)' diff --git a/docs/agents.md b/docs/agents.md index 462dda2..b8c1bdb 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -208,5 +208,8 @@ mcpctl chat reviewer extends the same publishing model to **virtual agents** declared in mcplocal config — they show up in `mcpctl get agent` with `KIND=virtual / STATUS=active` and become chat-able via - `mcpctl chat ` like any other agent. + `mcpctl chat ` like any other agent. **v4** adds **pools**: + Llms sharing a `poolName` stack into one load-balanced pool that + the chat dispatcher transparently widens to at request time, with + random selection + sequential failover on transport errors. - [chat.md](./chat.md) — `mcpctl chat` flow and LiteLLM-style flags. diff --git a/docs/virtual-llms.md b/docs/virtual-llms.md index 25dc089..6257d83 100644 --- a/docs/virtual-llms.md +++ b/docs/virtual-llms.md @@ -278,10 +278,148 @@ when the agent's pinned Llm is virtual. same agent name collide on the second register with HTTP 409. Per-publisher namespacing is a v4+ concern — same constraint as virtual Llms in v1. +## LB pools (v4) + +Two or more `Llm` rows that share a `poolName` stack into one +load-balanced pool. Agents pin to a single Llm by id; the chat +dispatcher transparently widens to "all healthy Llms with the same +effective pool key" at request time and picks one. 
There is no new +`LlmPool` resource — `poolName` is just an optional column on `Llm`, +so RBAC, listing, yaml round-trip, and apply all work the same way +they did pre-v4. + +### Pool semantics + +| Field | Behavior | +|---|---| +| `Llm.name` | Globally unique (unchanged). The apply key. | +| `Llm.poolName` | Optional. When set, declares membership. When NULL, falls back to `name` ("solo Llm, pool of 1"). | + +Effective pool key = `poolName ?? name`. The dispatcher's lookup is: + +```sql +SELECT * FROM Llm +WHERE poolName = $1 OR (poolName IS NULL AND name = $1) +``` + +So a solo Llm whose `name` happens to equal an explicit `poolName` +joins that pool — by design, an existing single-row Llm can be +promoted to "pool seed" without a rename or migration. + +### Selection + failover + +- **Selection**: random shuffle of all members whose `status` is + `active` (or `hibernating` — VirtualLlmService handles wake on + dispatch). `inactive` members are skipped. +- **Failover** (non-streaming): if dispatch throws on the first + candidate (transport failure, virtual publisher disconnect), the + dispatcher iterates the rest of the shuffled list until one + succeeds or the list is exhausted. Auth/4xx responses are NOT + retried — siblings with the same key/model would fail identically. +- **Failover** (streaming): only covers "couldn't establish stream" + failures (transport error before any chunk yielded). Once any + output has been streamed, we're committed to that backend. 
+ +### Declaring a pool + +#### Public Llms + +```bash +mcpctl create llm prod-qwen-1 --type openai --model qwen3-thinking \ + --url https://prod-1.example.com --pool-name qwen-pool \ + --api-key-ref qwen-key/API_KEY + +mcpctl create llm prod-qwen-2 --type openai --model qwen3-thinking \ + --url https://prod-2.example.com --pool-name qwen-pool \ + --api-key-ref qwen-key/API_KEY +``` + +Or via apply (yaml round-trip preserves `poolName`): + +```yaml +--- +kind: llm +name: prod-qwen-1 +type: openai +model: qwen3-thinking +url: https://prod-1.example.com +poolName: qwen-pool +apiKeyRef: { name: qwen-key, key: API_KEY } +--- +kind: llm +name: prod-qwen-2 +type: openai +model: qwen3-thinking +url: https://prod-2.example.com +poolName: qwen-pool +apiKeyRef: { name: qwen-key, key: API_KEY } +``` + +#### Virtual Llms (mcplocal-published) + +```jsonc +// ~/.mcpctl/config.json +{ + "llm": { + "providers": [ + { + "name": "vllm-alice-qwen3", // unique per publisher + "type": "vllm-managed", + "model": "Qwen/Qwen2.5-7B-Instruct-AWQ", + "venvPath": "~/vllm_env", + "publish": true, + "poolName": "user-vllm-qwen3-thinking" // shared pool key + } + ] + } +} +``` + +Each user's mcplocal picks a unique `name` (e.g. include the hostname +to guarantee no collisions) but shares the `poolName`. Agents pinned +to any single member — or to `qwen3-thinking` (the public LiteLLM +endpoint, also given `poolName: user-vllm-qwen3-thinking` if mixing +public + virtual is desired) — see one logical pool that auto-grows +as more workers come online. + +### Listing + describe + +The `mcpctl get llm` table has a `POOL` column right after `NAME`. +Solo rows render as `-`; pool members show their explicit pool key: + +``` +NAME POOL KIND STATUS TYPE MODEL ID +qwen3-thinking - public active openai qwen3-thinking cmo... +prod-qwen-1 qwen-pool public active openai qwen3-thinking cmo... +prod-qwen-2 qwen-pool public active openai qwen3-thinking cmo... 
+``` + +`mcpctl describe llm ` adds a `Pool:` block at the top when the +row is in an explicit pool OR when its implicit pool has size > 1: + +``` +Pool: + Pool name: qwen-pool + Members: 2 (2 active) + - prod-qwen-1 [public/active] ← this row + - prod-qwen-2 [public/active] +``` + +`GET /api/v1/llms//members` is the API surface — returns full +`LlmView`s for every member plus aggregate `size` / `activeCount` so +operator tooling doesn't need a second roundtrip. + +### Pinning to a specific instance + +To pin an agent to one specific instance (e.g. for debugging, +RBAC-scoped routing, or "this agent must hit this model with this +key"), give that instance a unique name and leave its `poolName` +unset. The agent's pool is then size 1 and dispatch is deterministic. +Pool membership is opt-in via `poolName` — the default behavior is +single-Llm. + ## Roadmap (later stages) -- **v4 — LB pool by model**: agents can target a model name instead of - a specific Llm; mcpd picks the healthiest pool member per request. - **v5 — Task queue**: persisted requests for hibernating/saturated pools. Workers pull tasks of their model when they come online. @@ -301,8 +439,12 @@ POST /api/v1/llms/_provider-task/:id/result { chunk: { data, done? 
} } { status, body } -GET /api/v1/llms → list (includes kind, status, lastHeartbeatAt, inactiveSince) -POST /api/v1/llms//infer → routes through the SSE relay +GET /api/v1/llms → list (includes kind, status, lastHeartbeatAt, inactiveSince, poolName) +GET /api/v1/llms/ → single Llm row (also accepts a CUID id) +GET /api/v1/llms//members → v4: pool members for the effective pool key: + { poolName, explicitPoolName, size, activeCount, members[] } +POST /api/v1/llms//infer → routes through the SSE relay (v4: dispatcher + also expands by poolName when set) DELETE /api/v1/llms/ → delete unconditionally (also runs GC's job) GET /api/v1/agents → list (v3: includes kind, status, lastHeartbeatAt, inactiveSince) ``` diff --git a/src/cli/src/commands/apply.ts b/src/cli/src/commands/apply.ts index 4eac13a..ecb7b63 100644 --- a/src/cli/src/commands/apply.ts +++ b/src/cli/src/commands/apply.ts @@ -61,6 +61,10 @@ const LlmSpecSchema = z.object({ key: z.string().min(1), }).nullable().optional(), extraConfig: z.record(z.unknown()).default({}), + // v4: optional pool key. Same validation as on the mcpd side + // (CreateLlmSchema). Null means "solo Llm, effective pool key falls + // back to the row's own name". + poolName: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/).nullable().optional(), }); const AgentChatParamsAppliedSchema = z.object({ diff --git a/src/cli/src/commands/create.ts b/src/cli/src/commands/create.ts index a94f677..d0f4d69 100644 --- a/src/cli/src/commands/create.ts +++ b/src/cli/src/commands/create.ts @@ -263,6 +263,7 @@ export function createCreateCommand(deps: CreateCommandDeps): Command { .option('--description ', 'Description') .option('--api-key-ref ', 'API key reference in SECRET/KEY form (e.g. 
anthropic-key/token)') .option('--extra ', 'Extra config key=value (repeat)', collect, []) + .option('--pool-name ', 'Stack with other Llms sharing this pool name; agents pinned to any member dispatch across the pool') .option('--force', 'Update if already exists') .option('--skip-auth-check', 'Skip the upstream auth probe (for offline registration before infra exists)') .action(async (name: string, opts) => { @@ -274,6 +275,7 @@ export function createCreateCommand(deps: CreateCommandDeps): Command { }; if (opts.url) body.url = opts.url; if (opts.description !== undefined) body.description = opts.description; + if (opts.poolName !== undefined) body.poolName = opts.poolName; if (opts.apiKeyRef) { const slashIdx = (opts.apiKeyRef as string).indexOf('/'); if (slashIdx < 1) throw new Error(`Invalid --api-key-ref '${opts.apiKeyRef as string}'. Expected SECRET_NAME/KEY_NAME`); diff --git a/src/cli/src/commands/describe.ts b/src/cli/src/commands/describe.ts index 998d12d..bda4ffb 100644 --- a/src/cli/src/commands/describe.ts +++ b/src/cli/src/commands/describe.ts @@ -243,7 +243,15 @@ function formatSecretDetail(secret: Record, showValues: boolean return lines.join('\n'); } -function formatLlmDetail(llm: Record): string { +interface PoolMembersInfo { + poolName: string; + explicitPoolName: string | null; + size: number; + activeCount: number; + members: Array<{ id?: string; name: string; status?: string; kind?: string; url?: string }>; +} + +function formatLlmDetail(llm: Record, pool?: PoolMembersInfo): string { const lines: string[] = []; lines.push(`=== LLM: ${llm.name} ===`); lines.push(`${pad('Name:')}${llm.name}`); @@ -253,6 +261,29 @@ function formatLlmDetail(llm: Record): string { if (llm.url) lines.push(`${pad('URL:')}${llm.url}`); if (llm.description) lines.push(`${pad('Description:')}${llm.description}`); + // v4 Pool block: only render when there's actually pool context to show. 
+ // For solo Llms (poolName null AND pool size 1), suppress the section so + // describe stays compact for the common case. For explicit-pool members + // OR rows whose name is implicitly seeding a pool (size > 1), render up + // top so it's the first thing the operator sees — pool routing is a + // significant behavioral fact. + const poolNameVal = llm.poolName as string | null | undefined; + const isExplicitPool = poolNameVal !== null && poolNameVal !== undefined && poolNameVal !== ''; + const isImplicitPool = pool !== undefined && pool.size > 1; + if (isExplicitPool || isImplicitPool) { + lines.push(''); + lines.push('Pool:'); + const effective = pool?.poolName ?? (poolNameVal ?? llm.name as string); + lines.push(` ${pad('Pool name:', 14)}${effective}${isExplicitPool ? '' : ' (implicit, falls back to name)'}`); + if (pool !== undefined) { + lines.push(` ${pad('Members:', 14)}${String(pool.size)} (${String(pool.activeCount)} active)`); + for (const m of pool.members) { + const youSuffix = m.name === llm.name ? ' ← this row' : ''; + lines.push(` - ${m.name} [${m.kind ?? '?'}/${m.status ?? '?'}]${youSuffix}`); + } + } + } + const ref = llm.apiKeyRef as { name: string; key: string } | null | undefined; lines.push(''); lines.push('API Key:'); @@ -982,9 +1013,22 @@ export function createDescribeCommand(deps: DescribeCommandDeps): Command { case 'secretbackends': deps.log(formatSecretBackendDetail(item)); break; - case 'llms': - deps.log(formatLlmDetail(item)); + case 'llms': { + // v4: also fetch pool membership so the describe Pool block + // can show siblings + active counts. Best-effort — older + // mcpd versions without the /members route 404 here, in + // which case we render the row alone. + let poolInfo: PoolMembersInfo | undefined; + try { + poolInfo = await deps.client.get( + `/api/v1/llms/${encodeURIComponent(item.name as string)}/members`, + ); + } catch { + // Old mcpd without /members, or RBAC denial — fall back silently. 
+ } + deps.log(formatLlmDetail(item, poolInfo)); break; + } case 'projects': { const [projectPrompts, llms] = await Promise.all([ deps.client diff --git a/src/cli/src/commands/get.ts b/src/cli/src/commands/get.ts index f3e07ac..f931e30 100644 --- a/src/cli/src/commands/get.ts +++ b/src/cli/src/commands/get.ts @@ -136,10 +136,16 @@ interface LlmRow { // mcpd responses that predate the kind/status columns). kind?: 'public' | 'virtual'; status?: 'active' | 'inactive' | 'hibernating'; + // v4: explicit pool key. NULL = solo Llm (effective pool = its own name). + poolName?: string | null; } +// v4: POOL column placed right after NAME so an operator can't miss +// which Llms stack into the same dispatcher pool. Solo rows show "-" +// to make the "no pool / pool of 1" case unambiguous. const llmColumns: Column[] = [ { header: 'NAME', key: 'name' }, + { header: 'POOL', key: (r) => (r.poolName !== null && r.poolName !== undefined && r.poolName !== '') ? r.poolName : '-', width: 18 }, { header: 'KIND', key: (r) => r.kind ?? 'public', width: 8 }, { header: 'STATUS', key: (r) => r.status ?? 'active', width: 12 }, { header: 'TYPE', key: 'type', width: 12 }, diff --git a/src/db/prisma/migrations/20260427205303_add_llm_pool_name/migration.sql b/src/db/prisma/migrations/20260427205303_add_llm_pool_name/migration.sql new file mode 100644 index 0000000..eaa1de1 --- /dev/null +++ b/src/db/prisma/migrations/20260427205303_add_llm_pool_name/migration.sql @@ -0,0 +1,10 @@ +-- v4: optional pool key. When NULL, the effective pool key is the row's +-- own `name` (pool of 1, identical to pre-v4 behavior). Multiple Llms +-- sharing a non-null `poolName` stack into one load-balanced pool that +-- the chat dispatcher expands at request time. +ALTER TABLE "Llm" ADD COLUMN "poolName" TEXT; + +-- Index covers both the dispatcher's `WHERE poolName = $1` lookup and +-- the v4 admin endpoint `GET /api/v1/llms//members` (which expands +-- by effective pool key). 
+CREATE INDEX "Llm_poolName_idx" ON "Llm"("poolName"); diff --git a/src/db/prisma/schema.prisma b/src/db/prisma/schema.prisma index 8b64ad8..403f18d 100644 --- a/src/db/prisma/schema.prisma +++ b/src/db/prisma/schema.prisma @@ -211,6 +211,14 @@ model Llm { apiKeySecretId String? // FK to Secret apiKeySecretKey String? // key inside the Secret's data extraConfig Json @default("{}") // per-type extras + // ── LB pool by name (v4) ── + // When set, this Llm is part of a load-balanced pool. Multiple Llms + // sharing the same `poolName` stack into a pool that the chat dispatcher + // expands at request time and picks an active member from. When NULL, + // the effective pool key is the row's own `name` (i.e. "pool of 1", + // identical to pre-v4 behavior). Agents pin to a single Llm by id; the + // dispatcher transparently widens to the pool when this field is set. + poolName String? // ── Virtual-provider lifecycle (NULL/default for kind=public) ── kind LlmKind @default(public) providerSessionId String? // mcplocal session that owns this row when virtual @@ -229,6 +237,7 @@ model Llm { @@index([apiKeySecretId]) @@index([kind, status]) @@index([providerSessionId]) + @@index([poolName]) } // ── Groups ── diff --git a/src/db/tests/llm-pool-schema.test.ts b/src/db/tests/llm-pool-schema.test.ts new file mode 100644 index 0000000..6253f8b --- /dev/null +++ b/src/db/tests/llm-pool-schema.test.ts @@ -0,0 +1,129 @@ +/** + * v4 schema-level tests for `Llm.poolName` and the dispatcher's + * `findByPoolName` query semantics. Lives in the db package because it + * exercises the actual Prisma column + index — the mcpd-side unit tests + * already cover the dispatcher's behavior with a mocked LlmService. 
+ */ +import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest'; +import type { PrismaClient } from '@prisma/client'; +import { setupTestDb, cleanupTestDb, clearAllTables } from './helpers.js'; + +/** Re-implementation of the LlmRepository query for direct schema verification. */ +function findByPoolName(prisma: PrismaClient, poolName: string) { + return prisma.llm.findMany({ + where: { + OR: [ + { poolName }, + { AND: [{ poolName: null }, { name: poolName }] }, + ], + }, + orderBy: { name: 'asc' }, + }); +} + +describe('Llm.poolName (v4)', () => { + let prisma: PrismaClient; + + beforeAll(async () => { + prisma = await setupTestDb(); + }, 30_000); + + afterAll(async () => { + await cleanupTestDb(); + }); + + beforeEach(async () => { + await clearAllTables(prisma); + }); + + it('defaults poolName to NULL for freshly inserted rows', async () => { + const llm = await prisma.llm.create({ + data: { name: 'plain', type: 'openai', model: 'gpt-4o' }, + }); + expect(llm.poolName).toBeNull(); + }); + + it('allows multiple rows to share a poolName (the v4 stacking behavior)', async () => { + await prisma.llm.create({ + data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + await prisma.llm.create({ + data: { name: 'qwen-prod-2', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + await prisma.llm.create({ + data: { name: 'qwen-prod-3', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + + const members = await findByPoolName(prisma, 'qwen-pool'); + expect(members.map((m) => m.name).sort()).toEqual(['qwen-prod-1', 'qwen-prod-2', 'qwen-prod-3']); + }); + + it('keeps `name` globally unique even when multiple rows share a poolName', async () => { + await prisma.llm.create({ + data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + await expect( + prisma.llm.create({ + data: { name: 'qwen-prod-1', type: 'openai', model: 
'qwen3-thinking', poolName: 'qwen-pool' }, + }), + ).rejects.toThrow(); + }); + + it('falls back to `name` as the effective pool key for solo rows (poolName=NULL)', async () => { + // Solo row addressable via its own name as a "pool of 1". + await prisma.llm.create({ + data: { name: 'gpt-4o', type: 'openai', model: 'gpt-4o' }, + }); + const members = await findByPoolName(prisma, 'gpt-4o'); + expect(members.map((m) => m.name)).toEqual(['gpt-4o']); + }); + + it('a solo row with name=X joins the same pool as explicit poolName=X members', async () => { + // Edge case: an existing solo Llm named "qwen-pool" pre-dates pool + // adoption, then a publisher registers with poolName=qwen-pool. Both + // should appear in the dispatcher's candidate list — the effective + // pool key (poolName ?? name) is "qwen-pool" for each. + await prisma.llm.create({ + data: { name: 'qwen-pool', type: 'openai', model: 'qwen3-thinking' }, + }); + await prisma.llm.create({ + data: { name: 'qwen-prod-2', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + + const members = await findByPoolName(prisma, 'qwen-pool'); + expect(members.map((m) => m.name).sort()).toEqual(['qwen-pool', 'qwen-prod-2']); + }); + + it('does not match a solo row by `name` when its poolName is set to something else', async () => { + // Solo with name=foo but poolName=bar should NOT match findByPoolName('foo') + // — the explicit poolName takes precedence over the name fallback. 
+ await prisma.llm.create({ + data: { name: 'foo', type: 'openai', model: 'm', poolName: 'bar' }, + }); + const members = await findByPoolName(prisma, 'foo'); + expect(members.map((m) => m.name)).toEqual([]); + + const inBar = await findByPoolName(prisma, 'bar'); + expect(inBar.map((m) => m.name)).toEqual(['foo']); + }); + + it('updates poolName via update() and round-trips correctly', async () => { + const llm = await prisma.llm.create({ + data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking' }, + }); + expect(llm.poolName).toBeNull(); + + const updated = await prisma.llm.update({ + where: { id: llm.id }, + data: { poolName: 'qwen-pool' }, + }); + expect(updated.poolName).toBe('qwen-pool'); + + // Revert to solo (NULL). + const reverted = await prisma.llm.update({ + where: { id: llm.id }, + data: { poolName: null }, + }); + expect(reverted.poolName).toBeNull(); + }); +}); diff --git a/src/mcpd/src/repositories/llm.repository.ts b/src/mcpd/src/repositories/llm.repository.ts index 06a5868..8942e19 100644 --- a/src/mcpd/src/repositories/llm.repository.ts +++ b/src/mcpd/src/repositories/llm.repository.ts @@ -10,6 +10,8 @@ export interface CreateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + // v4: optional pool key. NULL = "pool of 1" (effective key falls back to `name`). + poolName?: string | null; // Virtual-provider lifecycle (omit for kind=public). kind?: LlmKind; providerSessionId?: string | null; @@ -27,6 +29,7 @@ export interface UpdateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + poolName?: string | null; // Virtual-provider lifecycle. VirtualLlmService is the only writer for // these in v1; the public CRUD path leaves them undefined. 
kind?: LlmKind; @@ -41,6 +44,13 @@ export interface ILlmRepository { findById(id: string): Promise; findByName(name: string): Promise; findByTier(tier: string): Promise; + /** + * v4: members of an effective pool, ordered by name. The pool is + * defined as: `WHERE poolName = $1 OR (poolName IS NULL AND name = $1)`. + * For a non-pooled Llm this returns exactly the row whose own name is + * the key. For a pool of N, returns all N members. + */ + findByPoolName(poolName: string): Promise; create(data: CreateLlmInput): Promise; update(id: string, data: UpdateLlmInput): Promise; delete(id: string): Promise; @@ -69,6 +79,22 @@ export class LlmRepository implements ILlmRepository { return this.prisma.llm.findMany({ where: { tier }, orderBy: { name: 'asc' } }); } + async findByPoolName(poolName: string): Promise { + return this.prisma.llm.findMany({ + where: { + OR: [ + { poolName }, + // Solo rows fall back to their own `name` as the effective pool + // key. Querying for "qwen-pool" must also pick up a solo row + // whose name is "qwen-pool" (poolName=null) — otherwise existing + // single-Llm targets would silently disappear from the pool view. + { AND: [{ poolName: null }, { name: poolName }] }, + ], + }, + orderBy: { name: 'asc' }, + }); + } + async create(data: CreateLlmInput): Promise { return this.prisma.llm.create({ data: { @@ -81,6 +107,7 @@ export class LlmRepository implements ILlmRepository { apiKeySecretId: data.apiKeySecretId ?? null, apiKeySecretKey: data.apiKeySecretKey ?? null, extraConfig: (data.extraConfig ?? {}) as Prisma.InputJsonValue, + ...(data.poolName !== undefined ? { poolName: data.poolName } : {}), ...(data.kind !== undefined ? { kind: data.kind } : {}), ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), ...(data.status !== undefined ? 
{ status: data.status } : {}), @@ -104,6 +131,7 @@ export class LlmRepository implements ILlmRepository { } if (data.apiKeySecretKey !== undefined) updateData.apiKeySecretKey = data.apiKeySecretKey; if (data.extraConfig !== undefined) updateData.extraConfig = data.extraConfig as Prisma.InputJsonValue; + if (data.poolName !== undefined) updateData.poolName = data.poolName; if (data.kind !== undefined) updateData.kind = data.kind; if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId; if (data.status !== undefined) updateData.status = data.status; diff --git a/src/mcpd/src/routes/llms.ts b/src/mcpd/src/routes/llms.ts index 7d34571..b153a9a 100644 --- a/src/mcpd/src/routes/llms.ts +++ b/src/mcpd/src/routes/llms.ts @@ -1,6 +1,6 @@ import type { FastifyInstance } from 'fastify'; -import type { LlmService } from '../services/llm.service.js'; -import { LlmAuthVerificationError } from '../services/llm.service.js'; +import type { LlmService, LlmView } from '../services/llm.service.js'; +import { LlmAuthVerificationError, effectivePoolName } from '../services/llm.service.js'; import { NotFoundError, ConflictError } from '../services/mcp-server.service.js'; export function registerLlmRoutes( @@ -86,6 +86,44 @@ export function registerLlmRoutes( throw err; } }); + + // v4: list all members of the effective pool that the named Llm belongs to. + // The path uses an explicit `/members` suffix so it can't collide with the + // single-Llm `/api/v1/llms/:id` route — an Llm whose name happens to be + // "members" would otherwise be unaddressable. + // Returns [] members[] AND a small header object so callers don't need a + // second roundtrip to compute pool stats; agents.ts and the CLI both want + // size + activeCount. 
+ app.get<{ Params: { name: string } }>('/api/v1/llms/:name/members', async (request, reply) => { + try { + const anchor = await getByIdOrName(service, request.params.name); + const members = await service.listPoolMembers(effectivePoolName(anchor)); + return { + poolName: effectivePoolName(anchor), + explicitPoolName: anchor.poolName, + size: members.length, + activeCount: members.filter((m) => m.status === 'active').length, + members, + }; + } catch (err) { + if (err instanceof NotFoundError) { + reply.code(404); + return { error: err.message }; + } + throw err; + } + }); +} + +/** v4: convenience type for the new `/members` endpoint response. */ +export interface PoolMembersResponse { + /** Effective pool key (poolName ?? name on the anchor row). */ + poolName: string; + /** Anchor row's literal poolName field — null when it falls back to its own name. */ + explicitPoolName: string | null; + size: number; + activeCount: number; + members: LlmView[]; } const CUID_RE = /^c[a-z0-9]{24}/i; diff --git a/src/mcpd/src/routes/virtual-llms.ts b/src/mcpd/src/routes/virtual-llms.ts index d92bc8f..fbde2ba 100644 --- a/src/mcpd/src/routes/virtual-llms.ts +++ b/src/mcpd/src/routes/virtual-llms.ts @@ -201,6 +201,7 @@ function coerceProviderInput(raw: unknown): { description?: string; extraConfig?: Record; initialStatus?: 'active' | 'hibernating'; + poolName?: string; } { if (raw === null || typeof raw !== 'object') { throw Object.assign(new Error('provider entry must be an object'), { statusCode: 400 }); @@ -227,5 +228,11 @@ function coerceProviderInput(raw: unknown): { if (o['initialStatus'] === 'active' || o['initialStatus'] === 'hibernating') { out.initialStatus = o['initialStatus']; } + // v4: optional pool key. Validation matches CreateLlmSchema.poolName so + // a publisher can't slip an uppercase or whitespace name past the + // virtual-publish path that the public CRUD path would reject. 
+ if (typeof o['poolName'] === 'string' && /^[a-z0-9-]+$/.test(o['poolName']) && o['poolName'].length >= 1 && o['poolName'].length <= 100) { + out.poolName = o['poolName']; + } return out; } diff --git a/src/mcpd/src/services/chat.service.ts b/src/mcpd/src/services/chat.service.ts index ff6aba8..80fe37c 100644 --- a/src/mcpd/src/services/chat.service.ts +++ b/src/mcpd/src/services/chat.service.ts @@ -24,6 +24,7 @@ import type { ChatMessage } from '@prisma/client'; import type { AgentService } from './agent.service.js'; import type { LlmService } from './llm.service.js'; +import { effectivePoolName } from './llm.service.js'; import type { LlmAdapterRegistry } from './llm/dispatcher.js'; import type { IChatRepository, @@ -101,6 +102,23 @@ export interface ChatStreamChunk { message?: string; } +/** + * v4: a single resolvable pool member. The chat dispatcher iterates a + * non-empty list of these on transport-level failure (network, virtual + * publisher disconnect, etc.) until one succeeds or the list is exhausted. + * For pool size 1 (no `poolName` set on the pinned Llm) the list has one + * entry and behavior is identical to pre-v4. + */ +export interface PoolCandidate { + llmName: string; + llmType: string; + llmKind: 'public' | 'virtual'; + modelOverride: string; + url: string; + apiKey: string; + extraConfig: Record; +} + export interface ChatRequestArgs { agentName: string; threadId?: string; @@ -384,27 +402,54 @@ export class ChatService { * The caller's `parseStreamingChunk` already speaks OpenAI shape, so * downstream code doesn't need to know which path produced the chunks. */ + /** + * Stream inference with v4 pool failover. We commit to a candidate the + * moment the first chunk arrives — partial output cannot be re-streamed + * from a different backend without confusing the caller. Failover + * therefore covers only "couldn't establish stream" errors (transport + * failure pre-first-chunk). After the first yield, exceptions propagate. 
+ */ private async *streamInference(ctx: { - llmName: string; - llmType: string; - llmKind: 'public' | 'virtual'; - modelOverride: string; - url: string; - apiKey: string; - extraConfig: Record; + poolCandidates: PoolCandidate[]; history: OpenAiMessage[]; systemBlock: string; toolList: ChatTool[]; mergedParams: AgentChatParams; }): AsyncGenerator<{ data: string; done?: boolean }> { - if (ctx.llmKind !== 'virtual') { - const adapter = this.adapters.get(ctx.llmType); + let lastErr: Error | null = null; + for (const c of ctx.poolCandidates) { + try { + yield* this.streamOneCandidate(c, ctx); + return; + } catch (err) { + lastErr = err as Error; + // Try the next pool member only if no chunk has been yielded yet. + // streamOneCandidate throws *before* its first yield on dispatch + // failure; once it begins yielding, any error inside it propagates + // out of this `for` loop because the caller has already started + // consuming the stream. + } + } + throw lastErr ?? new Error('chat stream dispatch exhausted: no pool candidates'); + } + + private async *streamOneCandidate( + candidate: PoolCandidate, + ctx: { + history: OpenAiMessage[]; + systemBlock: string; + toolList: ChatTool[]; + mergedParams: AgentChatParams; + }, + ): AsyncGenerator<{ data: string; done?: boolean }> { + if (candidate.llmKind !== 'virtual') { + const adapter = this.adapters.get(candidate.llmType); yield* adapter.stream({ - body: { ...this.buildBody(ctx), stream: true }, - modelOverride: ctx.modelOverride, - apiKey: ctx.apiKey, - url: ctx.url, - extraConfig: ctx.extraConfig, + body: { ...this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), stream: true }, + modelOverride: candidate.modelOverride, + apiKey: candidate.apiKey, + url: candidate.url, + extraConfig: candidate.extraConfig, }); return; } @@ -418,8 +463,8 @@ export class ChatService { // generator drains them in order. ref.done resolves when the // publisher emits its `[DONE]` marker. 
const ref = await this.virtualLlms.enqueueInferTask( - ctx.llmName, - { ...this.buildBody(ctx), stream: true }, + candidate.llmName, + { ...this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), stream: true }, true, ); const queue: Array<{ data: string; done?: boolean }> = []; @@ -453,43 +498,62 @@ export class ChatService { } /** - * Run a single non-streaming inference iteration. Branches on - * ctx.llmKind: public goes through the existing adapter registry, - * virtual relays through VirtualLlmService.enqueueInferTask (mirrors - * the same branch in `routes/llm-infer.ts` from v1 Stage 3). - * - * Throws when virtualLlms isn't wired but the row is virtual — older - * test wirings hit this path. + * Run a single non-streaming inference iteration with v4 pool failover. + * We try each pool candidate in order; on dispatch error (transport + * failure, virtual publisher disconnect) we move to the next member. + * Auth/4xx errors that the adapter surfaces as a status-code in the + * result are NOT retried — those would fail identically on a sibling + * with the same key/model, and the cost of one wasted retry per call + * is real. */ private async runOneInference(ctx: { - llmName: string; - llmType: string; - llmKind: 'public' | 'virtual'; - modelOverride: string; - url: string; - apiKey: string; - extraConfig: Record; + poolCandidates: PoolCandidate[]; history: OpenAiMessage[]; systemBlock: string; toolList: ChatTool[]; mergedParams: AgentChatParams; }): Promise<{ status: number; body: unknown }> { - if (ctx.llmKind === 'virtual') { + let lastErr: Error | null = null; + for (const c of ctx.poolCandidates) { + try { + return await this.dispatchOneCandidate(c, ctx); + } catch (err) { + lastErr = err as Error; + // Try the next pool member. + } + } + throw lastErr ?? 
new Error('chat dispatch exhausted: no pool candidates'); + } + + private async dispatchOneCandidate( + candidate: PoolCandidate, + ctx: { + history: OpenAiMessage[]; + systemBlock: string; + toolList: ChatTool[]; + mergedParams: AgentChatParams; + }, + ): Promise<{ status: number; body: unknown }> { + if (candidate.llmKind === 'virtual') { if (this.virtualLlms === undefined) { throw new Error( 'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm', ); } - const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false); + const ref = await this.virtualLlms.enqueueInferTask( + candidate.llmName, + this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), + false, + ); return ref.done; } - const adapter = this.adapters.get(ctx.llmType); + const adapter = this.adapters.get(candidate.llmType); return adapter.infer({ - body: this.buildBody(ctx), - modelOverride: ctx.modelOverride, - apiKey: ctx.apiKey, - url: ctx.url, - extraConfig: ctx.extraConfig, + body: this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), + modelOverride: candidate.modelOverride, + apiKey: candidate.apiKey, + url: candidate.url, + extraConfig: candidate.extraConfig, }); } @@ -497,14 +561,14 @@ export class ChatService { threadId: string; history: OpenAiMessage[]; systemBlock: string; - llmName: string; - llmType: string; - /** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */ - llmKind: 'public' | 'virtual'; - modelOverride: string; - url: string; - apiKey: string; - extraConfig: Record; + /** + * v4: ordered pool members for this turn. [0] is the candidate to try + * first (random shuffle of viable members so load spreads). On + * transport-level failure the dispatcher iterates the rest. Always + * non-empty — at minimum the agent's pinned Llm (or a sibling pool + * member when the pinned row is itself inactive but others are up). 
+ */ + poolCandidates: PoolCandidate[]; mergedParams: AgentChatParams; toolList: ChatTool[]; projectId: string | null; @@ -512,8 +576,18 @@ export class ChatService { maxIterations: number; }> { const agent = await this.agents.getByName(args.agentName); - const llm = await this.llms.getByName(agent.llm.name); - const apiKey = await this.llms.resolveApiKey(agent.llm.name).catch(() => ''); + const pinned = await this.llms.getByName(agent.llm.name); + const poolCandidates = await this.resolvePoolCandidates(pinned); + if (poolCandidates.length === 0) { + // Pool exists but every member is inactive (publishers offline, + // public Llm explicitly disabled, etc.). Surface a clear error + // before running through the rest of preparation; otherwise the + // chat loop would hit the dispatcher's "exhausted" branch and the + // caller would see a less helpful message. + throw new NotFoundError( + `No active Llm in pool '${effectivePoolName(pinned)}' (pinned: ${pinned.name})`, + ); + } const threadId = await this.resolveThreadId(args, agent.id); const projectId = agent.project?.id ?? null; @@ -581,13 +655,7 @@ export class ChatService { threadId, history, systemBlock, - llmName: llm.name, - llmType: llm.type, - llmKind: llm.kind, - modelOverride: llm.model, - url: llm.url, - apiKey, - extraConfig: llm.extraConfig, + poolCandidates, mergedParams, toolList: filteredTools, projectId, @@ -596,6 +664,40 @@ export class ChatService { }; } + /** + * v4: resolve the load-balanced pool for an agent's pinned Llm. The + * pool is "all Llms whose effective key (poolName ?? name) matches the + * pinned row's effective key", filtered to viable backends: + * - public: always included (no per-row health probe in v4) + * - virtual: included when status is 'active' or 'hibernating' + * (hibernating members get woken by VirtualLlmService when + * dispatched). 'inactive' = publisher offline, exclude. + * + * Order is randomised so load spreads across members. 
The pinned row + * doesn't get priority — if it happens to be down, a sibling takes + * the call without the caller noticing. + */ + private async resolvePoolCandidates(pinned: { name: string; poolName: string | null }): Promise { + const poolKey = effectivePoolName(pinned); + const rows = await this.llms.findByPoolName(poolKey); + const viable = rows.filter((r) => r.status !== 'inactive'); + // Fisher-Yates is overkill for typical pool sizes (1-5); a + // sort-by-random-key is adequate and side-effect-free. + const shuffled = [...viable].sort(() => Math.random() - 0.5); + return Promise.all(shuffled.map(async (r) => { + const apiKey = await this.llms.resolveApiKey(r.name).catch(() => ''); + return { + llmName: r.name, + llmType: r.type, + llmKind: r.kind as 'public' | 'virtual', + modelOverride: r.model, + url: r.url, + apiKey, + extraConfig: r.extraConfig as Record, + }; + })); + } + /** * Resolves a personality (request override → agent default) and returns * its bound prompt contents in `PersonalityPrompt.priority` desc order. diff --git a/src/mcpd/src/services/llm.service.ts b/src/mcpd/src/services/llm.service.ts index 92c5edd..af37176 100644 --- a/src/mcpd/src/services/llm.service.ts +++ b/src/mcpd/src/services/llm.service.ts @@ -50,6 +50,13 @@ export interface LlmView { description: string; apiKeyRef: ApiKeyRef | null; extraConfig: Record; + /** + * v4: explicit pool key. NULL means "pool of 1" — the row's effective + * pool key falls back to its own `name`. Multiple rows sharing a non-null + * `poolName` stack into one load-balanced pool that the chat dispatcher + * expands at request time. + */ + poolName: string | null; // Virtual-provider lifecycle (kind defaults to 'public' for legacy rows). kind: 'public' | 'virtual'; status: 'active' | 'inactive' | 'hibernating'; @@ -60,6 +67,16 @@ export interface LlmView { updatedAt: Date; } +/** + * v4: compute the effective pool key for an Llm. 
When `poolName` is + * explicitly set we use it; otherwise we fall back to the row's own + * `name` so a non-pooled Llm dispatches as a "pool of 1" without any + * branching at the call site. + */ +export function effectivePoolName(row: { name: string; poolName: string | null }): string { + return row.poolName !== null && row.poolName !== '' ? row.poolName : row.name; +} + export class LlmService { constructor( private readonly repo: ILlmRepository, @@ -84,6 +101,31 @@ export class LlmService { return this.toView(row); } + /** + * v4: members of an effective pool. The pool is defined as: + * `WHERE poolName = $1 OR (poolName IS NULL AND name = $1)` + * — solo Llms (poolName=null) are addressable by their own name as a + * pool of 1; explicit pool members share a `poolName`. Returns raw + * `Llm` rows because the chat dispatcher needs per-row api key + * resolution to build its candidate list — `toView` would force a + * round-trip through SecretService for every row, which is wasted + * work for a pool of N where most members are about to be skipped. + */ + async findByPoolName(poolName: string): Promise { + return this.repo.findByPoolName(poolName); + } + + /** + * v4: API/CLI-facing version of `findByPoolName` that returns full + * `LlmView`s (apiKeyRef resolved via SecretService). Used by the + * `GET /api/v1/llms/:name/members` route and `mcpctl describe llm` — + * both of which want the full row shape for display. + */ + async listPoolMembers(poolName: string): Promise { + const rows = await this.repo.findByPoolName(poolName); + return Promise.all(rows.map((r) => this.toView(r))); + } + async create(input: unknown, opts: { skipAuthCheck?: boolean } = {}): Promise { const data = CreateLlmSchema.parse(input); const existing = await this.repo.findByName(data.name); @@ -117,6 +159,7 @@ export class LlmService { apiKeySecretId: apiKeyFields.id, apiKeySecretKey: apiKeyFields.key, extraConfig: data.extraConfig, + ...(data.poolName !== undefined ? 
{ poolName: data.poolName } : {}), }); return this.toView(row); } @@ -131,6 +174,8 @@ export class LlmService { if (data.tier !== undefined) updateFields.tier = data.tier; if (data.description !== undefined) updateFields.description = data.description; if (data.extraConfig !== undefined) updateFields.extraConfig = data.extraConfig; + // poolName: null → explicit unlink (revert to "pool of 1"); string → set; undefined → leave alone. + if (data.poolName !== undefined) updateFields.poolName = data.poolName; // apiKeyRef: null → explicit unlink; object → replace; undefined → leave alone. if (data.apiKeyRef !== undefined) { @@ -280,6 +325,7 @@ export class LlmService { description: row.description, apiKeyRef, extraConfig: row.extraConfig as Record, + poolName: row.poolName, kind: row.kind, status: row.status, lastHeartbeatAt: row.lastHeartbeatAt, diff --git a/src/mcpd/src/services/virtual-llm.service.ts b/src/mcpd/src/services/virtual-llm.service.ts index fe70e9f..e0befd6 100644 --- a/src/mcpd/src/services/virtual-llm.service.ts +++ b/src/mcpd/src/services/virtual-llm.service.ts @@ -47,6 +47,15 @@ export interface RegisterProviderInput { * publish time. */ initialStatus?: 'active' | 'hibernating'; + /** + * v4: optional pool key. When set, this virtual Llm row stacks with + * any other Llms (public OR virtual from any session) sharing the + * same value. The chat dispatcher then load-balances across all + * healthy members. Cluster-wide name uniqueness still applies — the + * publisher picks a unique `name` (e.g. `vllm--qwen3`) and + * shares the `poolName` with siblings. + */ + poolName?: string; } export interface RegisterResult { @@ -147,6 +156,7 @@ export class VirtualLlmService implements IVirtualLlmService { tier: p.tier ?? 'fast', description: p.description ?? '', ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}), + ...(p.poolName !== undefined ? 
{ poolName: p.poolName } : {}), kind: 'virtual', providerSessionId: sessionId, status: initialStatus, @@ -180,6 +190,7 @@ export class VirtualLlmService implements IVirtualLlmService { ...(p.tier !== undefined ? { tier: p.tier } : {}), ...(p.description !== undefined ? { description: p.description } : {}), ...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}), + ...(p.poolName !== undefined ? { poolName: p.poolName } : {}), kind: 'virtual', providerSessionId: sessionId, status: initialStatus, diff --git a/src/mcpd/src/validation/llm.schema.ts b/src/mcpd/src/validation/llm.schema.ts index 3982a5f..d8e0d24 100644 --- a/src/mcpd/src/validation/llm.schema.ts +++ b/src/mcpd/src/validation/llm.schema.ts @@ -14,6 +14,15 @@ export const ApiKeyRefSchema = z.object({ key: z.string().min(1), }); +/** + * v4: pool key. Same character set as `name` (lowercase + digits + hyphens) + * because at the CLI/yaml level the two share a namespace — agents and + * operators read both interchangeably and a stray uppercase or underscore + * here would be a confusing footgun. Empty string is rejected; use `null` + * (or omit the field) to declare "pool of 1, fall back to name". 
+ */ +const PoolNameSchema = z.string().min(1).max(100).regex(/^[a-z0-9-]+$/, 'poolName must be lowercase alphanumeric with hyphens'); + export const CreateLlmSchema = z.object({ name: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/, 'Name must be lowercase alphanumeric with hyphens'), type: z.enum(LLM_TYPES), @@ -23,6 +32,7 @@ export const CreateLlmSchema = z.object({ description: z.string().max(500).default(''), apiKeyRef: ApiKeyRefSchema.optional(), extraConfig: z.record(z.unknown()).default({}), + poolName: PoolNameSchema.nullable().optional(), }); export const UpdateLlmSchema = z.object({ @@ -32,6 +42,7 @@ export const UpdateLlmSchema = z.object({ description: z.string().max(500).optional(), apiKeyRef: ApiKeyRefSchema.nullable().optional(), extraConfig: z.record(z.unknown()).optional(), + poolName: PoolNameSchema.nullable().optional(), }); export type CreateLlmInput = z.infer; diff --git a/src/mcpd/tests/chat-service-virtual-llm.test.ts b/src/mcpd/tests/chat-service-virtual-llm.test.ts index 44f44d2..a284928 100644 --- a/src/mcpd/tests/chat-service-virtual-llm.test.ts +++ b/src/mcpd/tests/chat-service-virtual-llm.test.ts @@ -71,17 +71,25 @@ function mockAgents(): AgentService { } function mockLlmsVirtual(): LlmService { + const baseRow = (name: string): Record => ({ + id: 'llm-1', name, type: 'openai', model: 'fake', + url: '', tier: 'fast', description: '', + apiKeySecretId: null, apiKeySecretKey: null, + extraConfig: {}, + poolName: null, + kind: 'virtual', + providerSessionId: null, + status: 'active', + lastHeartbeatAt: NOW, + inactiveSince: null, + version: 1, createdAt: NOW, updatedAt: NOW, + }); return { getByName: vi.fn(async (name: string) => ({ - id: 'llm-1', name, type: 'openai', model: 'fake', - url: '', tier: 'fast', description: '', - apiKeyRef: null, extraConfig: {}, - kind: 'virtual', - status: 'active', - lastHeartbeatAt: NOW, - inactiveSince: null, - version: 1, createdAt: NOW, updatedAt: NOW, + ...baseRow(name), + apiKeyRef: null, })), + 
findByPoolName: vi.fn(async (poolName: string) => [baseRow(poolName)]), resolveApiKey: vi.fn(async () => ''), } as unknown as LlmService; } diff --git a/src/mcpd/tests/chat-service.test.ts b/src/mcpd/tests/chat-service.test.ts index 11bdb07..2b14031 100644 --- a/src/mcpd/tests/chat-service.test.ts +++ b/src/mcpd/tests/chat-service.test.ts @@ -119,17 +119,30 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } | } function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService { + // v4: prepareContext now resolves a pool by effective key. For unit + // tests that pass a single agent.llm we return that one row twice — + // once for getByName (LlmView shape) and once for findByPoolName (raw + // Llm shape with the same name) so the dispatcher's poolCandidates + // ends up with exactly one member, matching pre-v4 behavior. + const baseRow = (name: string): Record => ({ + id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking', + url: '', tier: 'fast', description: '', + apiKeySecretId: null, apiKeySecretKey: null, + extraConfig: {}, + poolName: null, + kind: opts.kind ?? 'public', + providerSessionId: null, + status: 'active', + lastHeartbeatAt: null, + inactiveSince: null, + version: 1, createdAt: NOW, updatedAt: NOW, + }); return { getByName: vi.fn(async (name: string) => ({ - id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking', - url: '', tier: 'fast', description: '', - apiKeyRef: null, extraConfig: {}, - kind: opts.kind ?? 
'public', - status: 'active', - lastHeartbeatAt: null, - inactiveSince: null, - version: 1, createdAt: NOW, updatedAt: NOW, + ...baseRow(name), + apiKeyRef: null, })), + findByPoolName: vi.fn(async (poolName: string) => [baseRow(poolName)]), resolveApiKey: vi.fn(async () => 'fake-key'), } as unknown as LlmService; } @@ -604,6 +617,176 @@ describe('ChatService', () => { .toEqual(['thinking via litellm...']); }); + // ── v4: LB pool by shared name ── + + // Helper: build a multi-member mock LlmService where all members share an + // effective pool key. `nameToFail` lets a test mark specific names as + // throwing on dispatch, exercising failover. + function mockLlmsPool(opts: { + pinnedName: string; + poolName: string | null; + members: Array<{ name: string; status?: 'active' | 'inactive'; kind?: 'public' | 'virtual' }>; + }): LlmService { + const baseRow = (m: { name: string; status?: 'active' | 'inactive'; kind?: 'public' | 'virtual' }): Record => ({ + id: `id-${m.name}`, + name: m.name, + type: 'openai', + model: 'qwen3-thinking', + url: '', + tier: 'fast', + description: '', + apiKeySecretId: null, + apiKeySecretKey: null, + extraConfig: {}, + poolName: opts.poolName, + kind: m.kind ?? 'public', + providerSessionId: null, + status: m.status ?? 'active', + lastHeartbeatAt: null, + inactiveSince: null, + version: 1, + createdAt: NOW, + updatedAt: NOW, + }); + return { + getByName: vi.fn(async (name: string) => { + const m = opts.members.find((x) => x.name === name) ?? opts.members[0]!; + return { ...baseRow(m), apiKeyRef: null }; + }), + findByPoolName: vi.fn(async () => opts.members.map(baseRow)), + resolveApiKey: vi.fn(async () => 'fake-key'), + } as unknown as LlmService; + } + + it('chat dispatches to a pool member and persists the reply (pool size N, primary returns)', async () => { + // Three healthy members; the (random) primary wins on first try. 
+ // Adapter returns the same canned reply regardless of which member + // got picked because in this test we don't differentiate by name — + // we just assert that dispatch goes through and the agent gets a + // reply. Per-member assertion is covered by the failover test below. + const chatRepo = mockChatRepo(); + const adapter = scriptedAdapter([chatCompletion('hello from pool')]); + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [ + { name: 'qwen-prod-1' }, + { name: 'qwen-prod-2' }, + { name: 'qwen-prod-3' }, + ], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('hello from pool'); + expect(adapter.infer).toHaveBeenCalledTimes(1); + }); + + it('chat fails over to the next pool member when the first throws on dispatch', async () => { + // 3 members; first 2 throw, 3rd succeeds. Verify exactly 3 dispatches + // and the final reply propagates. + const chatRepo = mockChatRepo(); + let call = 0; + const adapter: LlmAdapter = { + kind: 'flaky', + infer: vi.fn(async () => { + call += 1; + if (call < 3) throw new Error(`transport-error-${String(call)}`); + return chatCompletion('survived to the third try').body !== undefined + ? 
{ status: 200, body: chatCompletion('survived to the third try').body } + : (() => { throw new Error('unreachable'); })(); + }), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [ + { name: 'qwen-prod-1' }, + { name: 'qwen-prod-2' }, + { name: 'qwen-prod-3' }, + ], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('survived to the third try'); + expect(adapter.infer).toHaveBeenCalledTimes(3); + }); + + it('chat throws when every pool member throws (exhausted)', async () => { + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'all-broken', + infer: vi.fn(async () => { throw new Error('transport-down'); }), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [{ name: 'qwen-prod-1' }, { name: 'qwen-prod-2' }], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) + .rejects.toThrow(/transport-down/); + expect(adapter.infer).toHaveBeenCalledTimes(2); + }); + + it('chat refuses with 404 when every pool member is inactive', async () => { + const chatRepo = mockChatRepo(); + const adapter = scriptedAdapter([chatCompletion('should never run')]); + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [ + { name: 'qwen-prod-1', status: 'inactive' }, + { name: 'qwen-prod-2', status: 'inactive' }, + ], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + await 
expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) + .rejects.toThrow(/No active Llm in pool/); + expect(adapter.infer).not.toHaveBeenCalled(); + }); + + it('chat picks a healthy sibling when the pinned Llm is itself inactive', async () => { + // The agent pins to qwen-prod-1 which is inactive. qwen-prod-2 is + // active. Pool dispatch must skip the dead pinned row and use the + // sibling instead — that's the whole point of v4. + const chatRepo = mockChatRepo(); + const adapter = scriptedAdapter([chatCompletion('via sibling')]); + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [ + { name: 'qwen-prod-1', status: 'inactive' }, + { name: 'qwen-prod-2', status: 'active' }, + ], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('via sibling'); + expect(adapter.infer).toHaveBeenCalledTimes(1); + }); + // Regression: per-agent maxIterations override + clamp. // Found by /gstack-review on 2026-04-25. 
// Without the clamp, a hostile agent definition with `extras.maxIterations:1000000` diff --git a/src/mcpd/tests/llm-routes.test.ts b/src/mcpd/tests/llm-routes.test.ts index 0a7ef6c..9ca116e 100644 --- a/src/mcpd/tests/llm-routes.test.ts +++ b/src/mcpd/tests/llm-routes.test.ts @@ -21,6 +21,12 @@ function makeLlm(overrides: Partial = {}): Llm { apiKeySecretId: null, apiKeySecretKey: null, extraConfig: {}, + poolName: null, + kind: 'public', + providerSessionId: null, + lastHeartbeatAt: null, + status: 'active', + inactiveSince: null, version: 1, createdAt: new Date(), updatedAt: new Date(), @@ -38,6 +44,17 @@ function mockRepo(initial: Llm[] = []): ILlmRepository { return null; }), findByTier: vi.fn(async () => []), + findByPoolName: vi.fn(async (poolName: string) => { + const out: Llm[] = []; + for (const r of rows.values()) { + if (r.poolName === poolName) out.push(r); + else if (r.poolName === null && r.name === poolName) out.push(r); + } + return out; + }), + findBySessionId: vi.fn(async () => []), + findStaleVirtuals: vi.fn(async () => []), + findExpiredInactives: vi.fn(async () => []), create: vi.fn(async (data) => { const row = makeLlm({ id: 'new-id', name: data.name, type: data.type, model: data.model }); rows.set(row.id, row); @@ -191,4 +208,50 @@ describe('Llm Routes', () => { const res = await app.inject({ method: 'DELETE', url: '/api/v1/llms/missing' }); expect(res.statusCode).toBe(404); }); + + // ── v4: GET /api/v1/llms/:name/members ── + + it('GET /api/v1/llms/:name/members returns all members of an explicit pool', async () => { + await createApp(mockRepo([ + makeLlm({ id: 'l1', name: 'qwen-prod-1', poolName: 'qwen-pool', model: 'qwen3' }), + makeLlm({ id: 'l2', name: 'qwen-prod-2', poolName: 'qwen-pool', model: 'qwen3' }), + makeLlm({ id: 'l3', name: 'qwen-prod-3', poolName: 'qwen-pool', model: 'qwen3', status: 'inactive' }), + makeLlm({ id: 'other', name: 'gpt-4o', poolName: null, model: 'gpt-4o' }), + ])); + // Hit via any pool member's name — the 
route resolves the anchor's + // effective pool key and lists all matching rows. + const res = await app.inject({ method: 'GET', url: '/api/v1/llms/qwen-prod-1/members' }); + expect(res.statusCode).toBe(200); + const body = res.json<{ + poolName: string; + explicitPoolName: string | null; + size: number; + activeCount: number; + members: Array<{ name: string }>; + }>(); + expect(body.poolName).toBe('qwen-pool'); + expect(body.explicitPoolName).toBe('qwen-pool'); + expect(body.size).toBe(3); + expect(body.activeCount).toBe(2); + expect(body.members.map((m) => m.name).sort()).toEqual(['qwen-prod-1', 'qwen-prod-2', 'qwen-prod-3']); + }); + + it('GET /api/v1/llms/:name/members for a solo Llm returns a pool of 1', async () => { + await createApp(mockRepo([ + makeLlm({ id: 'solo', name: 'gpt-4o', poolName: null, model: 'gpt-4o' }), + ])); + const res = await app.inject({ method: 'GET', url: '/api/v1/llms/gpt-4o/members' }); + expect(res.statusCode).toBe(200); + const body = res.json<{ poolName: string; explicitPoolName: string | null; size: number; activeCount: number }>(); + expect(body.poolName).toBe('gpt-4o'); + expect(body.explicitPoolName).toBeNull(); + expect(body.size).toBe(1); + expect(body.activeCount).toBe(1); + }); + + it('GET /api/v1/llms/:name/members returns 404 when the anchor name does not exist', async () => { + await createApp(mockRepo()); + const res = await app.inject({ method: 'GET', url: '/api/v1/llms/nope/members' }); + expect(res.statusCode).toBe(404); + }); }); diff --git a/src/mcplocal/src/http/config.ts b/src/mcplocal/src/http/config.ts index 9e9cd5e..d06b9b6 100644 --- a/src/mcplocal/src/http/config.ts +++ b/src/mcplocal/src/http/config.ts @@ -93,6 +93,18 @@ export interface LlmProviderFileEntry { * - `command`: spawn a shell command (e.g. `systemctl --user start vllm`) */ wake?: WakeRecipe; + /** + * v4: opt this provider into a load-balanced pool. 
When set, the + * published Llm row carries `poolName` and stacks with any other Llms + * (public OR virtual) sharing the same value. Agents pinned to any + * pool member dispatch across all healthy members at chat time. + * + * Convention for distributed-compute setups: each user's mcplocal + * picks a unique `name` (e.g. `vllm--qwen3`) but a shared + * `poolName` (e.g. `user-vllm-qwen3-thinking`). Result: agents see one + * logical pool that auto-grows as more workers come online. + */ + poolName?: string; } export type WakeRecipe = diff --git a/src/mcplocal/src/main.ts b/src/mcplocal/src/main.ts index 9300a20..c4d901a 100644 --- a/src/mcplocal/src/main.ts +++ b/src/mcplocal/src/main.ts @@ -218,6 +218,7 @@ async function maybeStartVirtualLlmRegistrar( }; if (entry.tier !== undefined) item.tier = entry.tier; if (entry.wake !== undefined) item.wake = entry.wake; + if (entry.poolName !== undefined) item.poolName = entry.poolName; published.push(item); } // v3: forward locally-declared agents alongside the providers. We diff --git a/src/mcplocal/src/providers/registrar.ts b/src/mcplocal/src/providers/registrar.ts index cd7e46d..aedc23e 100644 --- a/src/mcplocal/src/providers/registrar.ts +++ b/src/mcplocal/src/providers/registrar.ts @@ -54,6 +54,12 @@ export interface RegistrarPublishedProvider { * the registrar runs this recipe and waits for the backend to come up. */ wake?: WakeRecipe; + /** + * v4: optional pool key. When set, the published Llm row carries + * `poolName` and stacks with any other Llms sharing the same value. + * Agents pinned to any pool member dispatch across all healthy members. + */ + poolName?: string; } /** @@ -185,6 +191,7 @@ export class VirtualLlmRegistrar { model: p.model, ...(p.tier !== undefined ? { tier: p.tier } : {}), ...(p.description !== undefined ? { description: p.description } : {}), + ...(p.poolName !== undefined ? 
{ poolName: p.poolName } : {}), initialStatus, }; })); diff --git a/src/mcplocal/tests/smoke/llm-pool.smoke.test.ts b/src/mcplocal/tests/smoke/llm-pool.smoke.test.ts new file mode 100644 index 0000000..61abdfc --- /dev/null +++ b/src/mcplocal/tests/smoke/llm-pool.smoke.test.ts @@ -0,0 +1,282 @@ +/** + * v4 smoke: LB pool by shared `poolName`. Spins up two in-process + * registrars publishing virtual Llms with distinct `name`s but a + * shared `poolName`. Verifies: + * + * - both rows show in `GET /api/v1/llms//members` with the + * correct effective pool key + active count. + * - chat through an agent pinned to one pool member dispatches + * across the pool (proven by the second member's content showing + * up at least once across N calls). + * - failover: stop one publisher, chat continues to work via the + * surviving member. + * + * Lifecycle (heartbeat-stale, 4 h GC) is unit-test territory; smoke + * just covers the path the operator-facing flow runs through. + */ +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import http from 'node:http'; +import https from 'node:https'; +import { mkdtempSync, rmSync, readFileSync, existsSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { + VirtualLlmRegistrar, + type RegistrarPublishedProvider, + type RegistrarPublishedAgent, +} from '../../src/providers/registrar.js'; +import type { LlmProvider, CompletionResult } from '../../src/providers/types.js'; + +const MCPD_URL = process.env.MCPD_URL ?? 
'https://mcpctl.ad.itaz.eu';
+const SUFFIX = Date.now().toString(36);
+const POOL_NAME = `smoke-pool-${SUFFIX}`;
+const PROVIDER_A = `smoke-pool-a-${SUFFIX}`;
+const PROVIDER_B = `smoke-pool-b-${SUFFIX}`;
+const AGENT_NAME = `smoke-pool-agent-${SUFFIX}`;
+
+/**
+ * Build an in-memory LlmProvider that always answers with `content`.
+ * Used so each pool member has a distinguishable, deterministic reply.
+ */
+function makeFakeProvider(name: string, content: string): LlmProvider {
+  return {
+    name,
+    // Return type is contextually typed by LlmProvider.
+    async complete() {
+      return {
+        content,
+        toolCalls: [],
+        usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 },
+        finishReason: 'stop',
+      };
+    },
+    async listModels() { return []; },
+    async isAvailable() { return true; },
+  };
+}
+
+/**
+ * Probe `${url}/healthz`. Resolves true when the endpoint answers with a
+ * non-5xx status within `timeoutMs`; never rejects — all failures
+ * (transport error, timeout) resolve false so callers can skip cleanly.
+ */
+function healthz(url: string, timeoutMs = 5000): Promise<boolean> {
+  return new Promise((resolve) => {
+    const parsed = new URL(`${url.replace(/\/$/, '')}/healthz`);
+    const driver = parsed.protocol === 'https:' ? https : http;
+    const req = driver.get({
+      hostname: parsed.hostname,
+      port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
+      path: parsed.pathname,
+      timeout: timeoutMs,
+    }, (res) => { resolve((res.statusCode ?? 500) < 500); res.resume(); });
+    req.on('error', () => resolve(false));
+    req.on('timeout', () => { req.destroy(); resolve(false); });
+  });
+}
+
+/** Read the bearer token from ~/.mcpctl/credentials, or null if missing/unreadable. */
+function readToken(): string | null {
+  try {
+    const path = join(process.env.HOME ?? '', '.mcpctl', 'credentials');
+    if (!existsSync(path)) return null;
+    const parsed = JSON.parse(readFileSync(path, 'utf-8')) as { token?: string };
+    return parsed.token ?? null;
+  } catch {
+    return null;
+  }
+}
+
+interface HttpResponse { status: number; body: string }
+
+/**
+ * Minimal JSON-over-HTTP client for the mcpd API. Attaches the local
+ * bearer token when available, JSON-encodes `body` when defined, and
+ * rejects on transport error or 30s timeout.
+ */
+function httpRequest(method: string, urlStr: string, body: unknown): Promise<HttpResponse> {
+  return new Promise((resolve, reject) => {
+    const tokenRaw = readToken();
+    const parsed = new URL(urlStr);
+    const driver = parsed.protocol === 'https:' ? https : http;
+    const headers: Record<string, string> = {
+      Accept: 'application/json',
+      ...(body !== undefined ? { 'Content-Type': 'application/json' } : {}),
+      ...(tokenRaw !== null ? { Authorization: `Bearer ${tokenRaw}` } : {}),
+    };
+    const req = driver.request({
+      hostname: parsed.hostname,
+      port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
+      path: parsed.pathname + parsed.search,
+      method,
+      headers,
+      timeout: 30_000,
+    }, (res) => {
+      const chunks: Buffer[] = [];
+      res.on('data', (c: Buffer) => chunks.push(c));
+      res.on('end', () => {
+        resolve({ status: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') });
+      });
+    });
+    req.on('error', reject);
+    req.on('timeout', () => { req.destroy(); reject(new Error(`httpRequest timeout: ${method} ${urlStr}`)); });
+    if (body !== undefined) req.write(JSON.stringify(body));
+    req.end();
+  });
+}
+
+// Suite-level state: skip flag, the two live registrars (stopped in
+// afterAll), and the temp dir holding their session files.
+let mcpdUp = false;
+let registrarA: VirtualLlmRegistrar | null = null;
+let registrarB: VirtualLlmRegistrar | null = null;
+let tempDir: string;
+
+describe('llm-pool smoke (v4)', () => {
+  beforeAll(async () => {
+    mcpdUp = await healthz(MCPD_URL);
+    if (!mcpdUp) {
+      // eslint-disable-next-line no-console
+      console.warn(`\n ○ llm-pool smoke: skipped — ${MCPD_URL}/healthz unreachable.\n`);
+      return;
+    }
+    if (readToken() === null) {
+      mcpdUp = false;
+      // eslint-disable-next-line no-console
+      console.warn('\n ○ llm-pool smoke: skipped — no ~/.mcpctl/credentials.\n');
+      return;
+    }
+    tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-llm-pool-smoke-'));
+  }, 20_000);
+
+  afterAll(async () => {
+    if (registrarA !== null) registrarA.stop();
+    if (registrarB !== null) registrarB.stop();
+    if (tempDir !== undefined) rmSync(tempDir, { recursive: true, force: true });
+    if (!mcpdUp) return;
+    // Best-effort cleanup. Agent first (Restrict FK), then both Llms.
+    const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined);
+    if (agents.status === 200) {
+      const rows = JSON.parse(agents.body) as Array<{ id: string; name: string }>;
+      const row = rows.find((r) => r.name === AGENT_NAME);
+      if (row !== undefined) await httpRequest('DELETE', `${MCPD_URL}/api/v1/agents/${row.id}`, undefined);
+    }
+    const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
+    if (llms.status === 200) {
+      const rows = JSON.parse(llms.body) as Array<{ id: string; name: string }>;
+      for (const r of rows) {
+        if (r.name === PROVIDER_A || r.name === PROVIDER_B) {
+          await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${r.id}`, undefined);
+        }
+      }
+    }
+  });
+
+  it('two publishers with shared poolName show up in /api/v1/llms/<name>/members', async () => {
+    if (!mcpdUp) return;
+    const token = readToken();
+    if (token === null) return;
+
+    // Publisher A — also publishes the agent so we can chat through the pool.
+    const pubA: RegistrarPublishedProvider = {
+      provider: makeFakeProvider(PROVIDER_A, 'reply from A'),
+      type: 'openai',
+      model: 'fake-pool',
+      poolName: POOL_NAME,
+    };
+    const pubAgent: RegistrarPublishedAgent = {
+      name: AGENT_NAME,
+      // Agent pins to publisher A specifically — pool dispatch then widens
+      // at chat time. Demonstrates the v4 transparency: pinning to one
+      // member implicitly opts the agent into the whole pool.
+      llmName: PROVIDER_A,
+      description: 'v4 pool smoke',
+      systemPrompt: 'Reply with whatever the backend returns.',
+    };
+    registrarA = new VirtualLlmRegistrar({
+      mcpdUrl: MCPD_URL,
+      token,
+      publishedProviders: [pubA],
+      publishedAgents: [pubAgent],
+      sessionFilePath: join(tempDir, 'session-a'),
+      log: { info: () => {}, warn: () => {}, error: () => {} },
+      heartbeatIntervalMs: 60_000,
+    });
+    await registrarA.start();
+
+    // Publisher B — same poolName, different name. No agent.
+    const pubB: RegistrarPublishedProvider = {
+      provider: makeFakeProvider(PROVIDER_B, 'reply from B'),
+      type: 'openai',
+      model: 'fake-pool',
+      poolName: POOL_NAME,
+    };
+    registrarB = new VirtualLlmRegistrar({
+      mcpdUrl: MCPD_URL,
+      token,
+      publishedProviders: [pubB],
+      sessionFilePath: join(tempDir, 'session-b'),
+      log: { info: () => {}, warn: () => {}, error: () => {} },
+      heartbeatIntervalMs: 60_000,
+    });
+    await registrarB.start();
+
+    // Let both registers settle on mcpd's side.
+    await new Promise((r) => setTimeout(r, 600));
+
+    // Hit the new /members endpoint via either pool member's name.
+    const res = await httpRequest('GET', `${MCPD_URL}/api/v1/llms/${PROVIDER_A}/members`, undefined);
+    expect(res.status).toBe(200);
+    const body = JSON.parse(res.body) as {
+      poolName: string;
+      explicitPoolName: string | null;
+      size: number;
+      activeCount: number;
+      members: Array<{ name: string; poolName: string | null; status: string }>;
+    };
+    expect(body.poolName).toBe(POOL_NAME);
+    expect(body.explicitPoolName).toBe(POOL_NAME);
+    expect(body.size).toBe(2);
+    expect(body.activeCount).toBe(2);
+    const names = body.members.map((m) => m.name).sort();
+    expect(names).toEqual([PROVIDER_A, PROVIDER_B].sort());
+    for (const m of body.members) {
+      expect(m.poolName).toBe(POOL_NAME);
+    }
+  }, 30_000);
+
+  it('chat through the agent dispatches across both pool members over multiple calls', async () => {
+    if (!mcpdUp) return;
+    // The chat dispatcher randomly shuffles candidates per call. Run
+    // enough turns that hitting only one member would be statistically
+    // suspicious (P(hit only A or only B) over 12 calls is ~1/2048 if the
+    // shuffle is fair). We assert >= one of each.
+    const seen = new Set<string>();
+    for (let i = 0; i < 12; i += 1) {
+      const res = await httpRequest('POST', `${MCPD_URL}/api/v1/agents/${AGENT_NAME}/chat`, {
+        message: `ping ${String(i)}`,
+        stream: false,
+      });
+      expect(res.status, res.body).toBe(200);
+      const body = JSON.parse(res.body) as { assistant: string };
+      seen.add(body.assistant);
+      if (seen.has('reply from A') && seen.has('reply from B')) break;
+    }
+    expect(seen.has('reply from A'), `pool dispatch should have hit A at least once; saw: ${[...seen].join(', ')}`).toBe(true);
+    expect(seen.has('reply from B'), `pool dispatch should have hit B at least once; saw: ${[...seen].join(', ')}`).toBe(true);
+  }, 90_000);
+
+  it('failover: stop one publisher, chat still succeeds via the surviving member', async () => {
+    if (!mcpdUp) return;
+    // Stop publisher B. mcpd's unbindSession flips B's row to inactive
+    // synchronously on SSE close, so the next chat's pool resolution
+    // skips it.
+    if (registrarB !== null) {
+      registrarB.stop();
+      registrarB = null;
+    }
+    await new Promise((r) => setTimeout(r, 400));
+
+    // Confirm B is inactive in /members.
+    const members = await httpRequest('GET', `${MCPD_URL}/api/v1/llms/${PROVIDER_A}/members`, undefined);
+    expect(members.status).toBe(200);
+    const body = JSON.parse(members.body) as {
+      members: Array<{ name: string; status: string }>;
+    };
+    const memB = body.members.find((m) => m.name === PROVIDER_B);
+    expect(memB?.status).toBe('inactive');
+
+    // Chat continues to work — only A responds now.
+    for (let i = 0; i < 3; i += 1) {
+      const res = await httpRequest('POST', `${MCPD_URL}/api/v1/agents/${AGENT_NAME}/chat`, {
+        message: `post-failover ping ${String(i)}`,
+        stream: false,
+      });
+      expect(res.status, res.body).toBe(200);
+      const out = JSON.parse(res.body) as { assistant: string };
+      expect(out.assistant).toBe('reply from A');
+    }
+  }, 30_000);
+});