diff --git a/src/db/prisma/migrations/20260427205303_add_llm_pool_name/migration.sql b/src/db/prisma/migrations/20260427205303_add_llm_pool_name/migration.sql new file mode 100644 index 0000000..eaa1de1 --- /dev/null +++ b/src/db/prisma/migrations/20260427205303_add_llm_pool_name/migration.sql @@ -0,0 +1,10 @@ +-- v4: optional pool key. When NULL, the effective pool key is the row's +-- own `name` (pool of 1, identical to pre-v4 behavior). Multiple Llms +-- sharing a non-null `poolName` stack into one load-balanced pool that +-- the chat dispatcher expands at request time. +ALTER TABLE "Llm" ADD COLUMN "poolName" TEXT; + +-- Index covers both the dispatcher's `WHERE poolName = $1` lookup and +-- the v4 admin endpoint `GET /api/v1/llms//members` (which expands +-- by effective pool key). +CREATE INDEX "Llm_poolName_idx" ON "Llm"("poolName"); diff --git a/src/db/prisma/schema.prisma b/src/db/prisma/schema.prisma index 8b64ad8..403f18d 100644 --- a/src/db/prisma/schema.prisma +++ b/src/db/prisma/schema.prisma @@ -211,6 +211,14 @@ model Llm { apiKeySecretId String? // FK to Secret apiKeySecretKey String? // key inside the Secret's data extraConfig Json @default("{}") // per-type extras + // ── LB pool by name (v4) ── + // When set, this Llm is part of a load-balanced pool. Multiple Llms + // sharing the same `poolName` stack into a pool that the chat dispatcher + // expands at request time and picks an active member from. When NULL, + // the effective pool key is the row's own `name` (i.e. "pool of 1", + // identical to pre-v4 behavior). Agents pin to a single Llm by id; the + // dispatcher transparently widens to the pool when this field is set. + poolName String? // ── Virtual-provider lifecycle (NULL/default for kind=public) ── kind LlmKind @default(public) providerSessionId String? // mcplocal session that owns this row when virtual @@ -229,6 +237,7 @@ model Llm { @@index([apiKeySecretId]) @@index([kind, status]) @@index([providerSessionId]) + @@index([poolName]) } // ── Groups ── diff --git a/src/db/tests/llm-pool-schema.test.ts b/src/db/tests/llm-pool-schema.test.ts new file mode 100644 index 0000000..6253f8b --- /dev/null +++ b/src/db/tests/llm-pool-schema.test.ts @@ -0,0 +1,129 @@ +/** + * v4 schema-level tests for `Llm.poolName` and the dispatcher's + * `findByPoolName` query semantics. Lives in the db package because it + * exercises the actual Prisma column + index — the mcpd-side unit tests + * already cover the dispatcher's behavior with a mocked LlmService. + */ +import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest'; +import type { PrismaClient } from '@prisma/client'; +import { setupTestDb, cleanupTestDb, clearAllTables } from './helpers.js'; + +/** Re-implementation of the LlmRepository query for direct schema verification. */ +function findByPoolName(prisma: PrismaClient, poolName: string) { + return prisma.llm.findMany({ + where: { + OR: [ + { poolName }, + { AND: [{ poolName: null }, { name: poolName }] }, + ], + }, + orderBy: { name: 'asc' }, + }); +} + +describe('Llm.poolName (v4)', () => { + let prisma: PrismaClient; + + beforeAll(async () => { + prisma = await setupTestDb(); + }, 30_000); + + afterAll(async () => { + await cleanupTestDb(); + }); + + beforeEach(async () => { + await clearAllTables(prisma); + }); + + it('defaults poolName to NULL for freshly inserted rows', async () => { + const llm = await prisma.llm.create({ + data: { name: 'plain', type: 'openai', model: 'gpt-4o' }, + }); + expect(llm.poolName).toBeNull(); + }); + + it('allows multiple rows to share a poolName (the v4 stacking behavior)', async () => { + await prisma.llm.create({ + data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + await prisma.llm.create({ + data: { name: 'qwen-prod-2', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + await prisma.llm.create({ + data: { name: 'qwen-prod-3', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + + const members = await findByPoolName(prisma, 'qwen-pool'); + expect(members.map((m) => m.name).sort()).toEqual(['qwen-prod-1', 'qwen-prod-2', 'qwen-prod-3']); + }); + + it('keeps `name` globally unique even when multiple rows share a poolName', async () => { + await prisma.llm.create({ + data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + await expect( + prisma.llm.create({ + data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }), + ).rejects.toThrow(); + }); + + it('falls back to `name` as the effective pool key for solo rows (poolName=NULL)', async () => { + // Solo row addressable via its own name as a "pool of 1". + await prisma.llm.create({ + data: { name: 'gpt-4o', type: 'openai', model: 'gpt-4o' }, + }); + const members = await findByPoolName(prisma, 'gpt-4o'); + expect(members.map((m) => m.name)).toEqual(['gpt-4o']); + }); + + it('a solo row with name=X joins the same pool as explicit poolName=X members', async () => { + // Edge case: an existing solo Llm named "qwen-pool" pre-dates pool + // adoption, then a publisher registers with poolName=qwen-pool. Both + // should appear in the dispatcher's candidate list — the effective + // pool key (poolName ?? name) is "qwen-pool" for each. + await prisma.llm.create({ + data: { name: 'qwen-pool', type: 'openai', model: 'qwen3-thinking' }, + }); + await prisma.llm.create({ + data: { name: 'qwen-prod-2', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' }, + }); + + const members = await findByPoolName(prisma, 'qwen-pool'); + expect(members.map((m) => m.name).sort()).toEqual(['qwen-pool', 'qwen-prod-2']); + }); + + it('does not match a solo row by `name` when its poolName is set to something else', async () => { + // Solo with name=foo but poolName=bar should NOT match findByPoolName('foo') + // — the explicit poolName takes precedence over the name fallback. + await prisma.llm.create({ + data: { name: 'foo', type: 'openai', model: 'm', poolName: 'bar' }, + }); + const members = await findByPoolName(prisma, 'foo'); + expect(members.map((m) => m.name)).toEqual([]); + + const inBar = await findByPoolName(prisma, 'bar'); + expect(inBar.map((m) => m.name)).toEqual(['foo']); + }); + + it('updates poolName via update() and round-trips correctly', async () => { + const llm = await prisma.llm.create({ + data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking' }, + }); + expect(llm.poolName).toBeNull(); + + const updated = await prisma.llm.update({ + where: { id: llm.id }, + data: { poolName: 'qwen-pool' }, + }); + expect(updated.poolName).toBe('qwen-pool'); + + // Revert to solo (NULL). + const reverted = await prisma.llm.update({ + where: { id: llm.id }, + data: { poolName: null }, + }); + expect(reverted.poolName).toBeNull(); + }); +}); diff --git a/src/mcpd/src/repositories/llm.repository.ts b/src/mcpd/src/repositories/llm.repository.ts index 06a5868..8942e19 100644 --- a/src/mcpd/src/repositories/llm.repository.ts +++ b/src/mcpd/src/repositories/llm.repository.ts @@ -10,6 +10,8 @@ export interface CreateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + // v4: optional pool key. NULL = "pool of 1" (effective key falls back to `name`). + poolName?: string | null; // Virtual-provider lifecycle (omit for kind=public). kind?: LlmKind; providerSessionId?: string | null; @@ -27,6 +29,7 @@ export interface UpdateLlmInput { apiKeySecretId?: string | null; apiKeySecretKey?: string | null; extraConfig?: Record; + poolName?: string | null; // Virtual-provider lifecycle. VirtualLlmService is the only writer for // these in v1; the public CRUD path leaves them undefined. kind?: LlmKind; @@ -41,6 +44,13 @@ export interface ILlmRepository { findById(id: string): Promise; findByName(name: string): Promise; findByTier(tier: string): Promise; + /** + * v4: members of an effective pool, ordered by name. The pool is + * defined as: `WHERE poolName = $1 OR (poolName IS NULL AND name = $1)`. + * For a non-pooled Llm this returns exactly the row whose own name is + * the key. For a pool of N, returns all N members. + */ + findByPoolName(poolName: string): Promise; create(data: CreateLlmInput): Promise; update(id: string, data: UpdateLlmInput): Promise; delete(id: string): Promise; @@ -69,6 +79,22 @@ export class LlmRepository implements ILlmRepository { return this.prisma.llm.findMany({ where: { tier }, orderBy: { name: 'asc' } }); } + async findByPoolName(poolName: string): Promise { + return this.prisma.llm.findMany({ + where: { + OR: [ + { poolName }, + // Solo rows fall back to their own `name` as the effective pool + // key. Querying for "qwen-pool" must also pick up a solo row + // whose name is "qwen-pool" (poolName=null) — otherwise existing + // single-Llm targets would silently disappear from the pool view. + { AND: [{ poolName: null }, { name: poolName }] }, + ], + }, + orderBy: { name: 'asc' }, + }); + } + async create(data: CreateLlmInput): Promise { return this.prisma.llm.create({ data: { @@ -81,6 +107,7 @@ export class LlmRepository implements ILlmRepository { apiKeySecretId: data.apiKeySecretId ?? null, apiKeySecretKey: data.apiKeySecretKey ?? null, extraConfig: (data.extraConfig ?? {}) as Prisma.InputJsonValue, + ...(data.poolName !== undefined ? { poolName: data.poolName } : {}), ...(data.kind !== undefined ? { kind: data.kind } : {}), ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), ...(data.status !== undefined ? { status: data.status } : {}), @@ -104,6 +131,7 @@ export class LlmRepository implements ILlmRepository { } if (data.apiKeySecretKey !== undefined) updateData.apiKeySecretKey = data.apiKeySecretKey; if (data.extraConfig !== undefined) updateData.extraConfig = data.extraConfig as Prisma.InputJsonValue; + if (data.poolName !== undefined) updateData.poolName = data.poolName; if (data.kind !== undefined) updateData.kind = data.kind; if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId; if (data.status !== undefined) updateData.status = data.status; diff --git a/src/mcpd/src/services/chat.service.ts b/src/mcpd/src/services/chat.service.ts index ff6aba8..80fe37c 100644 --- a/src/mcpd/src/services/chat.service.ts +++ b/src/mcpd/src/services/chat.service.ts @@ -24,6 +24,7 @@ import type { ChatMessage } from '@prisma/client'; import type { AgentService } from './agent.service.js'; import type { LlmService } from './llm.service.js'; +import { effectivePoolName } from './llm.service.js'; import type { LlmAdapterRegistry } from './llm/dispatcher.js'; import type { IChatRepository, @@ -101,6 +102,23 @@ export interface ChatStreamChunk { message?: string; } +/** + * v4: a single resolvable pool member. The chat dispatcher iterates a + * non-empty list of these on transport-level failure (network, virtual + * publisher disconnect, etc.) until one succeeds or the list is exhausted. + * For pool size 1 (no `poolName` set on the pinned Llm) the list has one + * entry and behavior is identical to pre-v4. + */ +export interface PoolCandidate { + llmName: string; + llmType: string; + llmKind: 'public' | 'virtual'; + modelOverride: string; + url: string; + apiKey: string; + extraConfig: Record; +} + export interface ChatRequestArgs { agentName: string; threadId?: string; @@ -384,27 +402,54 @@ export class ChatService { * The caller's `parseStreamingChunk` already speaks OpenAI shape, so * downstream code doesn't need to know which path produced the chunks. */ + /** + * Stream inference with v4 pool failover. We commit to a candidate the + * moment the first chunk arrives — partial output cannot be re-streamed + * from a different backend without confusing the caller. Failover + * therefore covers only "couldn't establish stream" errors (transport + * failure pre-first-chunk). After the first yield, exceptions propagate. + */ private async *streamInference(ctx: { - llmName: string; - llmType: string; - llmKind: 'public' | 'virtual'; - modelOverride: string; - url: string; - apiKey: string; - extraConfig: Record; + poolCandidates: PoolCandidate[]; history: OpenAiMessage[]; systemBlock: string; toolList: ChatTool[]; mergedParams: AgentChatParams; }): AsyncGenerator<{ data: string; done?: boolean }> { - if (ctx.llmKind !== 'virtual') { - const adapter = this.adapters.get(ctx.llmType); + let lastErr: Error | null = null; + for (const c of ctx.poolCandidates) { + try { + yield* this.streamOneCandidate(c, ctx); + return; + } catch (err) { + lastErr = err as Error; + // Try the next pool member only if no chunk has been yielded yet. + // streamOneCandidate throws *before* its first yield on dispatch + // failure; once it begins yielding, any error inside it propagates + // out of this `for` loop because the caller has already started + // consuming the stream. + } + } + throw lastErr ?? new Error('chat stream dispatch exhausted: no pool candidates'); + } + + private async *streamOneCandidate( + candidate: PoolCandidate, + ctx: { + history: OpenAiMessage[]; + systemBlock: string; + toolList: ChatTool[]; + mergedParams: AgentChatParams; + }, + ): AsyncGenerator<{ data: string; done?: boolean }> { + if (candidate.llmKind !== 'virtual') { + const adapter = this.adapters.get(candidate.llmType); yield* adapter.stream({ - body: { ...this.buildBody(ctx), stream: true }, - modelOverride: ctx.modelOverride, - apiKey: ctx.apiKey, - url: ctx.url, - extraConfig: ctx.extraConfig, + body: { ...this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), stream: true }, + modelOverride: candidate.modelOverride, + apiKey: candidate.apiKey, + url: candidate.url, + extraConfig: candidate.extraConfig, }); return; } @@ -418,8 +463,8 @@ export class ChatService { // generator drains them in order. ref.done resolves when the // publisher emits its `[DONE]` marker. const ref = await this.virtualLlms.enqueueInferTask( - ctx.llmName, - { ...this.buildBody(ctx), stream: true }, + candidate.llmName, + { ...this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), stream: true }, true, ); const queue: Array<{ data: string; done?: boolean }> = []; @@ -453,43 +498,62 @@ export class ChatService { } /** - * Run a single non-streaming inference iteration. Branches on - * ctx.llmKind: public goes through the existing adapter registry, - * virtual relays through VirtualLlmService.enqueueInferTask (mirrors - * the same branch in `routes/llm-infer.ts` from v1 Stage 3). - * - * Throws when virtualLlms isn't wired but the row is virtual — older - * test wirings hit this path. + * Run a single non-streaming inference iteration with v4 pool failover. + * We try each pool candidate in order; on dispatch error (transport + * failure, virtual publisher disconnect) we move to the next member. + * Auth/4xx errors that the adapter surfaces as a status-code in the + * result are NOT retried — those would fail identically on a sibling + * with the same key/model, and the cost of one wasted retry per call + * is real. */ private async runOneInference(ctx: { - llmName: string; - llmType: string; - llmKind: 'public' | 'virtual'; - modelOverride: string; - url: string; - apiKey: string; - extraConfig: Record; + poolCandidates: PoolCandidate[]; history: OpenAiMessage[]; systemBlock: string; toolList: ChatTool[]; mergedParams: AgentChatParams; }): Promise<{ status: number; body: unknown }> { - if (ctx.llmKind === 'virtual') { + let lastErr: Error | null = null; + for (const c of ctx.poolCandidates) { + try { + return await this.dispatchOneCandidate(c, ctx); + } catch (err) { + lastErr = err as Error; + // Try the next pool member. + } + } + throw lastErr ?? new Error('chat dispatch exhausted: no pool candidates'); + } + + private async dispatchOneCandidate( + candidate: PoolCandidate, + ctx: { + history: OpenAiMessage[]; + systemBlock: string; + toolList: ChatTool[]; + mergedParams: AgentChatParams; + }, + ): Promise<{ status: number; body: unknown }> { + if (candidate.llmKind === 'virtual') { if (this.virtualLlms === undefined) { throw new Error( 'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm', ); } - const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false); + const ref = await this.virtualLlms.enqueueInferTask( + candidate.llmName, + this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), + false, + ); return ref.done; } - const adapter = this.adapters.get(ctx.llmType); + const adapter = this.adapters.get(candidate.llmType); return adapter.infer({ - body: this.buildBody(ctx), - modelOverride: ctx.modelOverride, - apiKey: ctx.apiKey, - url: ctx.url, - extraConfig: ctx.extraConfig, + body: this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), + modelOverride: candidate.modelOverride, + apiKey: candidate.apiKey, + url: candidate.url, + extraConfig: candidate.extraConfig, }); } @@ -497,14 +561,14 @@ export class ChatService { threadId: string; history: OpenAiMessage[]; systemBlock: string; - llmName: string; - llmType: string; - /** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */ - llmKind: 'public' | 'virtual'; - modelOverride: string; - url: string; - apiKey: string; - extraConfig: Record; + /** + * v4: ordered pool members for this turn. [0] is the candidate to try + * first (random shuffle of viable members so load spreads). On + * transport-level failure the dispatcher iterates the rest. Always + * non-empty — at minimum the agent's pinned Llm (or a sibling pool + * member when the pinned row is itself inactive but others are up). + */ + poolCandidates: PoolCandidate[]; mergedParams: AgentChatParams; toolList: ChatTool[]; projectId: string | null; @@ -512,8 +576,18 @@ export class ChatService { maxIterations: number; }> { const agent = await this.agents.getByName(args.agentName); - const llm = await this.llms.getByName(agent.llm.name); - const apiKey = await this.llms.resolveApiKey(agent.llm.name).catch(() => ''); + const pinned = await this.llms.getByName(agent.llm.name); + const poolCandidates = await this.resolvePoolCandidates(pinned); + if (poolCandidates.length === 0) { + // Pool exists but every member is inactive (publishers offline, + // public Llm explicitly disabled, etc.). Surface a clear error + // before running through the rest of preparation; otherwise the + // chat loop would hit the dispatcher's "exhausted" branch and the + // caller would see a less helpful message. + throw new NotFoundError( + `No active Llm in pool '${effectivePoolName(pinned)}' (pinned: ${pinned.name})`, + ); + } const threadId = await this.resolveThreadId(args, agent.id); const projectId = agent.project?.id ?? null; @@ -581,13 +655,7 @@ export class ChatService { threadId, history, systemBlock, - llmName: llm.name, - llmType: llm.type, - llmKind: llm.kind, - modelOverride: llm.model, - url: llm.url, - apiKey, - extraConfig: llm.extraConfig, + poolCandidates, mergedParams, toolList: filteredTools, projectId, @@ -596,6 +664,40 @@ export class ChatService { }; } + /** + * v4: resolve the load-balanced pool for an agent's pinned Llm. The + * pool is "all Llms whose effective key (poolName ?? name) matches the + * pinned row's effective key", filtered to viable backends: + * - public: always included (no per-row health probe in v4) + * - virtual: included when status is 'active' or 'hibernating' + * (hibernating members get woken by VirtualLlmService when + * dispatched). 'inactive' = publisher offline, exclude. + * + * Order is randomised so load spreads across members. The pinned row + * doesn't get priority — if it happens to be down, a sibling takes + * the call without the caller noticing. + */ + private async resolvePoolCandidates(pinned: { name: string; poolName: string | null }): Promise { + const poolKey = effectivePoolName(pinned); + const rows = await this.llms.findByPoolName(poolKey); + const viable = rows.filter((r) => r.status !== 'inactive'); + // Fisher-Yates is overkill for typical pool sizes (1-5); a + // sort-by-random-key is adequate and side-effect-free. + const shuffled = [...viable].sort(() => Math.random() - 0.5); + return Promise.all(shuffled.map(async (r) => { + const apiKey = await this.llms.resolveApiKey(r.name).catch(() => ''); + return { + llmName: r.name, + llmType: r.type, + llmKind: r.kind as 'public' | 'virtual', + modelOverride: r.model, + url: r.url, + apiKey, + extraConfig: r.extraConfig as Record, + }; + })); + } + /** * Resolves a personality (request override → agent default) and returns * its bound prompt contents in `PersonalityPrompt.priority` desc order. diff --git a/src/mcpd/src/services/llm.service.ts b/src/mcpd/src/services/llm.service.ts index 92c5edd..c3c7ff9 100644 --- a/src/mcpd/src/services/llm.service.ts +++ b/src/mcpd/src/services/llm.service.ts @@ -50,6 +50,13 @@ export interface LlmView { description: string; apiKeyRef: ApiKeyRef | null; extraConfig: Record; + /** + * v4: explicit pool key. NULL means "pool of 1" — the row's effective + * pool key falls back to its own `name`. Multiple rows sharing a non-null + * `poolName` stack into one load-balanced pool that the chat dispatcher + * expands at request time. + */ + poolName: string | null; // Virtual-provider lifecycle (kind defaults to 'public' for legacy rows). kind: 'public' | 'virtual'; status: 'active' | 'inactive' | 'hibernating'; @@ -60,6 +67,16 @@ export interface LlmView { updatedAt: Date; } +/** + * v4: compute the effective pool key for an Llm. When `poolName` is + * explicitly set we use it; otherwise we fall back to the row's own + * `name` so a non-pooled Llm dispatches as a "pool of 1" without any + * branching at the call site. + */ +export function effectivePoolName(row: { name: string; poolName: string | null }): string { + return row.poolName !== null && row.poolName !== '' ? row.poolName : row.name; +} + export class LlmService { constructor( private readonly repo: ILlmRepository, @@ -84,6 +101,20 @@ export class LlmService { return this.toView(row); } + /** + * v4: members of an effective pool. The pool is defined as: + * `WHERE poolName = $1 OR (poolName IS NULL AND name = $1)` + * — solo Llms (poolName=null) are addressable by their own name as a + * pool of 1; explicit pool members share a `poolName`. Returns raw + * `Llm` rows because the chat dispatcher needs per-row api key + * resolution to build its candidate list — `toView` would force a + * round-trip through SecretService for every row, which is wasted + * work for a pool of N where most members are about to be skipped. + */ + async findByPoolName(poolName: string): Promise { + return this.repo.findByPoolName(poolName); + } + async create(input: unknown, opts: { skipAuthCheck?: boolean } = {}): Promise { const data = CreateLlmSchema.parse(input); const existing = await this.repo.findByName(data.name); @@ -117,6 +148,7 @@ export class LlmService { apiKeySecretId: apiKeyFields.id, apiKeySecretKey: apiKeyFields.key, extraConfig: data.extraConfig, + ...(data.poolName !== undefined ? { poolName: data.poolName } : {}), }); return this.toView(row); } @@ -131,6 +163,8 @@ export class LlmService { if (data.tier !== undefined) updateFields.tier = data.tier; if (data.description !== undefined) updateFields.description = data.description; if (data.extraConfig !== undefined) updateFields.extraConfig = data.extraConfig; + // poolName: null → explicit unlink (revert to "pool of 1"); string → set; undefined → leave alone. + if (data.poolName !== undefined) updateFields.poolName = data.poolName; // apiKeyRef: null → explicit unlink; object → replace; undefined → leave alone. if (data.apiKeyRef !== undefined) { @@ -280,6 +314,7 @@ export class LlmService { description: row.description, apiKeyRef, extraConfig: row.extraConfig as Record, + poolName: row.poolName, kind: row.kind, status: row.status, lastHeartbeatAt: row.lastHeartbeatAt, diff --git a/src/mcpd/src/validation/llm.schema.ts b/src/mcpd/src/validation/llm.schema.ts index 3982a5f..d8e0d24 100644 --- a/src/mcpd/src/validation/llm.schema.ts +++ b/src/mcpd/src/validation/llm.schema.ts @@ -14,6 +14,15 @@ export const ApiKeyRefSchema = z.object({ key: z.string().min(1), }); +/** + * v4: pool key. Same character set as `name` (lowercase + digits + hyphens) + * because at the CLI/yaml level the two share a namespace — agents and + * operators read both interchangeably and a stray uppercase or underscore + * here would be a confusing footgun. Empty string is rejected; use `null` + * (or omit the field) to declare "pool of 1, fall back to name". + */ +const PoolNameSchema = z.string().min(1).max(100).regex(/^[a-z0-9-]+$/, 'poolName must be lowercase alphanumeric with hyphens'); + export const CreateLlmSchema = z.object({ name: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/, 'Name must be lowercase alphanumeric with hyphens'), type: z.enum(LLM_TYPES), @@ -23,6 +32,7 @@ export const CreateLlmSchema = z.object({ description: z.string().max(500).default(''), apiKeyRef: ApiKeyRefSchema.optional(), extraConfig: z.record(z.unknown()).default({}), + poolName: PoolNameSchema.nullable().optional(), }); export const UpdateLlmSchema = z.object({ @@ -32,6 +42,7 @@ export const UpdateLlmSchema = z.object({ description: z.string().max(500).optional(), apiKeyRef: ApiKeyRefSchema.nullable().optional(), extraConfig: z.record(z.unknown()).optional(), + poolName: PoolNameSchema.nullable().optional(), }); export type CreateLlmInput = z.infer; diff --git a/src/mcpd/tests/chat-service-virtual-llm.test.ts b/src/mcpd/tests/chat-service-virtual-llm.test.ts index 44f44d2..a284928 100644 --- a/src/mcpd/tests/chat-service-virtual-llm.test.ts +++ b/src/mcpd/tests/chat-service-virtual-llm.test.ts @@ -71,17 +71,25 @@ function mockAgents(): AgentService { } function mockLlmsVirtual(): LlmService { + const baseRow = (name: string): Record => ({ + id: 'llm-1', name, type: 'openai', model: 'fake', + url: '', tier: 'fast', description: '', + apiKeySecretId: null, apiKeySecretKey: null, + extraConfig: {}, + poolName: null, + kind: 'virtual', + providerSessionId: null, + status: 'active', + lastHeartbeatAt: NOW, + inactiveSince: null, + version: 1, createdAt: NOW, updatedAt: NOW, + }); return { getByName: vi.fn(async (name: string) => ({ - id: 'llm-1', name, type: 'openai', model: 'fake', - url: '', tier: 'fast', description: '', - apiKeyRef: null, extraConfig: {}, - kind: 'virtual', - status: 'active', - lastHeartbeatAt: NOW, - inactiveSince: null, - version: 1, createdAt: NOW, updatedAt: NOW, + ...baseRow(name), + apiKeyRef: null, })), + findByPoolName: vi.fn(async (poolName: string) => [baseRow(poolName)]), resolveApiKey: vi.fn(async () => ''), } as unknown as LlmService; } diff --git a/src/mcpd/tests/chat-service.test.ts b/src/mcpd/tests/chat-service.test.ts index 11bdb07..2b14031 100644 --- a/src/mcpd/tests/chat-service.test.ts +++ b/src/mcpd/tests/chat-service.test.ts @@ -119,17 +119,30 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } | } function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService { + // v4: prepareContext now resolves a pool by effective key. For unit + // tests that pass a single agent.llm we return that one row twice — + // once for getByName (LlmView shape) and once for findByPoolName (raw + // Llm shape with the same name) so the dispatcher's poolCandidates + // ends up with exactly one member, matching pre-v4 behavior. + const baseRow = (name: string): Record => ({ + id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking', + url: '', tier: 'fast', description: '', + apiKeySecretId: null, apiKeySecretKey: null, + extraConfig: {}, + poolName: null, + kind: opts.kind ?? 'public', + providerSessionId: null, + status: 'active', + lastHeartbeatAt: null, + inactiveSince: null, + version: 1, createdAt: NOW, updatedAt: NOW, + }); return { getByName: vi.fn(async (name: string) => ({ - id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking', - url: '', tier: 'fast', description: '', - apiKeyRef: null, extraConfig: {}, - kind: opts.kind ?? 'public', - status: 'active', - lastHeartbeatAt: null, - inactiveSince: null, - version: 1, createdAt: NOW, updatedAt: NOW, + ...baseRow(name), + apiKeyRef: null, })), + findByPoolName: vi.fn(async (poolName: string) => [baseRow(poolName)]), resolveApiKey: vi.fn(async () => 'fake-key'), } as unknown as LlmService; } @@ -604,6 +617,176 @@ describe('ChatService', () => { .toEqual(['thinking via litellm...']); }); + // ── v4: LB pool by shared name ── + + // Helper: build a multi-member mock LlmService where all members share an + // effective pool key. `nameToFail` lets a test mark specific names as + // throwing on dispatch, exercising failover. + function mockLlmsPool(opts: { + pinnedName: string; + poolName: string | null; + members: Array<{ name: string; status?: 'active' | 'inactive'; kind?: 'public' | 'virtual' }>; + }): LlmService { + const baseRow = (m: { name: string; status?: 'active' | 'inactive'; kind?: 'public' | 'virtual' }): Record => ({ + id: `id-${m.name}`, + name: m.name, + type: 'openai', + model: 'qwen3-thinking', + url: '', + tier: 'fast', + description: '', + apiKeySecretId: null, + apiKeySecretKey: null, + extraConfig: {}, + poolName: opts.poolName, + kind: m.kind ?? 'public', + providerSessionId: null, + status: m.status ?? 'active', + lastHeartbeatAt: null, + inactiveSince: null, + version: 1, + createdAt: NOW, + updatedAt: NOW, + }); + return { + getByName: vi.fn(async (name: string) => { + const m = opts.members.find((x) => x.name === name) ?? opts.members[0]!; + return { ...baseRow(m), apiKeyRef: null }; + }), + findByPoolName: vi.fn(async () => opts.members.map(baseRow)), + resolveApiKey: vi.fn(async () => 'fake-key'), + } as unknown as LlmService; + } + + it('chat dispatches to a pool member and persists the reply (pool size N, primary returns)', async () => { + // Three healthy members; the (random) primary wins on first try. + // Adapter returns the same canned reply regardless of which member + // got picked because in this test we don't differentiate by name — + // we just assert that dispatch goes through and the agent gets a + // reply. Per-member assertion is covered by the failover test below. + const chatRepo = mockChatRepo(); + const adapter = scriptedAdapter([chatCompletion('hello from pool')]); + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [ + { name: 'qwen-prod-1' }, + { name: 'qwen-prod-2' }, + { name: 'qwen-prod-3' }, + ], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('hello from pool'); + expect(adapter.infer).toHaveBeenCalledTimes(1); + }); + + it('chat fails over to the next pool member when the first throws on dispatch', async () => { + // 3 members; first 2 throw, 3rd succeeds. Verify exactly 3 dispatches + // and the final reply propagates. + const chatRepo = mockChatRepo(); + let call = 0; + const adapter: LlmAdapter = { + kind: 'flaky', + infer: vi.fn(async () => { + call += 1; + if (call < 3) throw new Error(`transport-error-${String(call)}`); + return chatCompletion('survived to the third try').body !== undefined + ? { status: 200, body: chatCompletion('survived to the third try').body } + : (() => { throw new Error('unreachable'); })(); + }), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [ + { name: 'qwen-prod-1' }, + { name: 'qwen-prod-2' }, + { name: 'qwen-prod-3' }, + ], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('survived to the third try'); + expect(adapter.infer).toHaveBeenCalledTimes(3); + }); + + it('chat throws when every pool member throws (exhausted)', async () => { + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'all-broken', + infer: vi.fn(async () => { throw new Error('transport-down'); }), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [{ name: 'qwen-prod-1' }, { name: 'qwen-prod-2' }], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) + .rejects.toThrow(/transport-down/); + expect(adapter.infer).toHaveBeenCalledTimes(2); + }); + + it('chat refuses with 404 when every pool member is inactive', async () => { + const chatRepo = mockChatRepo(); + const adapter = scriptedAdapter([chatCompletion('should never run')]); + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [ + { name: 'qwen-prod-1', status: 'inactive' }, + { name: 'qwen-prod-2', status: 'inactive' }, + ], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) + .rejects.toThrow(/No active Llm in pool/); + expect(adapter.infer).not.toHaveBeenCalled(); + }); + + it('chat picks a healthy sibling when the pinned Llm is itself inactive', async () => { + // The agent pins to qwen-prod-1 which is inactive. qwen-prod-2 is + // active. Pool dispatch must skip the dead pinned row and use the + // sibling instead — that's the whole point of v4. + const chatRepo = mockChatRepo(); + const adapter = scriptedAdapter([chatCompletion('via sibling')]); + const svc = new ChatService( + mockAgents(), + mockLlmsPool({ + pinnedName: 'qwen-prod-1', + poolName: 'qwen-pool', + members: [ + { name: 'qwen-prod-1', status: 'inactive' }, + { name: 'qwen-prod-2', status: 'active' }, + ], + }), + adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('via sibling'); + expect(adapter.infer).toHaveBeenCalledTimes(1); + }); + // Regression: per-agent maxIterations override + clamp. // Found by /gstack-review on 2026-04-25. // Without the clamp, a hostile agent definition with `extras.maxIterations:1000000`