feat(mcpd+db): Llm.poolName + chat dispatcher pool failover (v4 Stage 1)
Adds LB-pool-by-shared-name without introducing a new resource. The existing `Llm.name` stays globally unique; a new optional `poolName` column declares membership in a pool. Multiple Llms sharing a non-null `poolName` stack into one load-balanced pool that the chat dispatcher expands at request time. Effective pool key = `poolName ?? name`. Solo rows (poolName=null) are addressable as a "pool of 1" via their own name, so existing single-Llm agents and YAMLs keep working unchanged. A solo row whose name happens to match an explicit poolName joins the same pool — by design — so an operator can transparently promote an existing Llm to pool seed. Dispatcher (chat.service): prepareContext now resolves a randomly- shuffled list of viable pool candidates (status != inactive) once per turn. runOneInference and streamInference iterate the list on transport-level failure (network, virtual publisher disconnect) until one succeeds or the list is exhausted. Streaming failover only covers "failed before first chunk" — once we've yielded text, we're committed to that backend. Auth/4xx errors surfaced as result.status are NOT retried; siblings with the same key/model would fail identically. When the agent's pinned Llm is itself inactive but a sibling pool member is up, dispatch transparently uses the sibling — that's the whole point. When every member is inactive, prepareContext throws a clear "No active Llm in pool '<key>' (pinned: <name>)" error rather than letting the dispatcher's "exhausted" branch surface it. Tests: - 5 new chat-service tests for pool dispatch / failover / pinned-down / all-inactive (chat-service.test.ts). - 7 new db schema tests for the column, the unique-name invariant, the fallback-to-name semantics, and the solo-name-joins-explicit-pool edge case (llm-pool-schema.test.ts). - mcpd 865/865 (was 860; +5), db pool-schema 7/7, no regressions. Stage 2 (next): HTTP route /api/v1/llms/<name>/members + aggregate pool stats on the existing single-Llm route, CLI POOL column + describe block + --pool-name flag, yaml round-trip.
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
-- v4: optional pool key. When NULL, the effective pool key is the row's
|
||||
-- own `name` (pool of 1, identical to pre-v4 behavior). Multiple Llms
|
||||
-- sharing a non-null `poolName` stack into one load-balanced pool that
|
||||
-- the chat dispatcher expands at request time.
|
||||
ALTER TABLE "Llm" ADD COLUMN "poolName" TEXT;
|
||||
|
||||
-- Index covers both the dispatcher's `WHERE poolName = $1` lookup and
|
||||
-- the v4 admin endpoint `GET /api/v1/llms/<name>/members` (which expands
|
||||
-- by effective pool key).
|
||||
CREATE INDEX "Llm_poolName_idx" ON "Llm"("poolName");
|
||||
@@ -211,6 +211,14 @@ model Llm {
|
||||
apiKeySecretId String? // FK to Secret
|
||||
apiKeySecretKey String? // key inside the Secret's data
|
||||
extraConfig Json @default("{}") // per-type extras
|
||||
// ── LB pool by name (v4) ──
|
||||
// When set, this Llm is part of a load-balanced pool. Multiple Llms
|
||||
// sharing the same `poolName` stack into a pool that the chat dispatcher
|
||||
// expands at request time and picks an active member from. When NULL,
|
||||
// the effective pool key is the row's own `name` (i.e. "pool of 1",
|
||||
// identical to pre-v4 behavior). Agents pin to a single Llm by id; the
|
||||
// dispatcher transparently widens to the pool when this field is set.
|
||||
poolName String?
|
||||
// ── Virtual-provider lifecycle (NULL/default for kind=public) ──
|
||||
kind LlmKind @default(public)
|
||||
providerSessionId String? // mcplocal session that owns this row when virtual
|
||||
@@ -229,6 +237,7 @@ model Llm {
|
||||
@@index([apiKeySecretId])
|
||||
@@index([kind, status])
|
||||
@@index([providerSessionId])
|
||||
@@index([poolName])
|
||||
}
|
||||
|
||||
// ── Groups ──
|
||||
|
||||
129
src/db/tests/llm-pool-schema.test.ts
Normal file
129
src/db/tests/llm-pool-schema.test.ts
Normal file
@@ -0,0 +1,129 @@
|
||||
/**
|
||||
* v4 schema-level tests for `Llm.poolName` and the dispatcher's
|
||||
* `findByPoolName` query semantics. Lives in the db package because it
|
||||
* exercises the actual Prisma column + index — the mcpd-side unit tests
|
||||
* already cover the dispatcher's behavior with a mocked LlmService.
|
||||
*/
|
||||
import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest';
|
||||
import type { PrismaClient } from '@prisma/client';
|
||||
import { setupTestDb, cleanupTestDb, clearAllTables } from './helpers.js';
|
||||
|
||||
/** Re-implementation of the LlmRepository query for direct schema verification. */
|
||||
function findByPoolName(prisma: PrismaClient, poolName: string) {
|
||||
return prisma.llm.findMany({
|
||||
where: {
|
||||
OR: [
|
||||
{ poolName },
|
||||
{ AND: [{ poolName: null }, { name: poolName }] },
|
||||
],
|
||||
},
|
||||
orderBy: { name: 'asc' },
|
||||
});
|
||||
}
|
||||
|
||||
describe('Llm.poolName (v4)', () => {
|
||||
let prisma: PrismaClient;
|
||||
|
||||
beforeAll(async () => {
|
||||
prisma = await setupTestDb();
|
||||
}, 30_000);
|
||||
|
||||
afterAll(async () => {
|
||||
await cleanupTestDb();
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
await clearAllTables(prisma);
|
||||
});
|
||||
|
||||
it('defaults poolName to NULL for freshly inserted rows', async () => {
|
||||
const llm = await prisma.llm.create({
|
||||
data: { name: 'plain', type: 'openai', model: 'gpt-4o' },
|
||||
});
|
||||
expect(llm.poolName).toBeNull();
|
||||
});
|
||||
|
||||
it('allows multiple rows to share a poolName (the v4 stacking behavior)', async () => {
|
||||
await prisma.llm.create({
|
||||
data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' },
|
||||
});
|
||||
await prisma.llm.create({
|
||||
data: { name: 'qwen-prod-2', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' },
|
||||
});
|
||||
await prisma.llm.create({
|
||||
data: { name: 'qwen-prod-3', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' },
|
||||
});
|
||||
|
||||
const members = await findByPoolName(prisma, 'qwen-pool');
|
||||
expect(members.map((m) => m.name).sort()).toEqual(['qwen-prod-1', 'qwen-prod-2', 'qwen-prod-3']);
|
||||
});
|
||||
|
||||
it('keeps `name` globally unique even when multiple rows share a poolName', async () => {
|
||||
await prisma.llm.create({
|
||||
data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' },
|
||||
});
|
||||
await expect(
|
||||
prisma.llm.create({
|
||||
data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' },
|
||||
}),
|
||||
).rejects.toThrow();
|
||||
});
|
||||
|
||||
it('falls back to `name` as the effective pool key for solo rows (poolName=NULL)', async () => {
|
||||
// Solo row addressable via its own name as a "pool of 1".
|
||||
await prisma.llm.create({
|
||||
data: { name: 'gpt-4o', type: 'openai', model: 'gpt-4o' },
|
||||
});
|
||||
const members = await findByPoolName(prisma, 'gpt-4o');
|
||||
expect(members.map((m) => m.name)).toEqual(['gpt-4o']);
|
||||
});
|
||||
|
||||
it('a solo row with name=X joins the same pool as explicit poolName=X members', async () => {
|
||||
// Edge case: an existing solo Llm named "qwen-pool" pre-dates pool
|
||||
// adoption, then a publisher registers with poolName=qwen-pool. Both
|
||||
// should appear in the dispatcher's candidate list — the effective
|
||||
// pool key (poolName ?? name) is "qwen-pool" for each.
|
||||
await prisma.llm.create({
|
||||
data: { name: 'qwen-pool', type: 'openai', model: 'qwen3-thinking' },
|
||||
});
|
||||
await prisma.llm.create({
|
||||
data: { name: 'qwen-prod-2', type: 'openai', model: 'qwen3-thinking', poolName: 'qwen-pool' },
|
||||
});
|
||||
|
||||
const members = await findByPoolName(prisma, 'qwen-pool');
|
||||
expect(members.map((m) => m.name).sort()).toEqual(['qwen-pool', 'qwen-prod-2']);
|
||||
});
|
||||
|
||||
it('does not match a solo row by `name` when its poolName is set to something else', async () => {
|
||||
// Solo with name=foo but poolName=bar should NOT match findByPoolName('foo')
|
||||
// — the explicit poolName takes precedence over the name fallback.
|
||||
await prisma.llm.create({
|
||||
data: { name: 'foo', type: 'openai', model: 'm', poolName: 'bar' },
|
||||
});
|
||||
const members = await findByPoolName(prisma, 'foo');
|
||||
expect(members.map((m) => m.name)).toEqual([]);
|
||||
|
||||
const inBar = await findByPoolName(prisma, 'bar');
|
||||
expect(inBar.map((m) => m.name)).toEqual(['foo']);
|
||||
});
|
||||
|
||||
it('updates poolName via update() and round-trips correctly', async () => {
|
||||
const llm = await prisma.llm.create({
|
||||
data: { name: 'qwen-prod-1', type: 'openai', model: 'qwen3-thinking' },
|
||||
});
|
||||
expect(llm.poolName).toBeNull();
|
||||
|
||||
const updated = await prisma.llm.update({
|
||||
where: { id: llm.id },
|
||||
data: { poolName: 'qwen-pool' },
|
||||
});
|
||||
expect(updated.poolName).toBe('qwen-pool');
|
||||
|
||||
// Revert to solo (NULL).
|
||||
const reverted = await prisma.llm.update({
|
||||
where: { id: llm.id },
|
||||
data: { poolName: null },
|
||||
});
|
||||
expect(reverted.poolName).toBeNull();
|
||||
});
|
||||
});
|
||||
@@ -10,6 +10,8 @@ export interface CreateLlmInput {
|
||||
apiKeySecretId?: string | null;
|
||||
apiKeySecretKey?: string | null;
|
||||
extraConfig?: Record<string, unknown>;
|
||||
// v4: optional pool key. NULL = "pool of 1" (effective key falls back to `name`).
|
||||
poolName?: string | null;
|
||||
// Virtual-provider lifecycle (omit for kind=public).
|
||||
kind?: LlmKind;
|
||||
providerSessionId?: string | null;
|
||||
@@ -27,6 +29,7 @@ export interface UpdateLlmInput {
|
||||
apiKeySecretId?: string | null;
|
||||
apiKeySecretKey?: string | null;
|
||||
extraConfig?: Record<string, unknown>;
|
||||
poolName?: string | null;
|
||||
// Virtual-provider lifecycle. VirtualLlmService is the only writer for
|
||||
// these in v1; the public CRUD path leaves them undefined.
|
||||
kind?: LlmKind;
|
||||
@@ -41,6 +44,13 @@ export interface ILlmRepository {
|
||||
findById(id: string): Promise<Llm | null>;
|
||||
findByName(name: string): Promise<Llm | null>;
|
||||
findByTier(tier: string): Promise<Llm[]>;
|
||||
/**
|
||||
* v4: members of an effective pool, ordered by name. The pool is
|
||||
* defined as: `WHERE poolName = $1 OR (poolName IS NULL AND name = $1)`.
|
||||
* For a non-pooled Llm this returns exactly the row whose own name is
|
||||
* the key. For a pool of N, returns all N members.
|
||||
*/
|
||||
findByPoolName(poolName: string): Promise<Llm[]>;
|
||||
create(data: CreateLlmInput): Promise<Llm>;
|
||||
update(id: string, data: UpdateLlmInput): Promise<Llm>;
|
||||
delete(id: string): Promise<void>;
|
||||
@@ -69,6 +79,22 @@ export class LlmRepository implements ILlmRepository {
|
||||
return this.prisma.llm.findMany({ where: { tier }, orderBy: { name: 'asc' } });
|
||||
}
|
||||
|
||||
async findByPoolName(poolName: string): Promise<Llm[]> {
|
||||
return this.prisma.llm.findMany({
|
||||
where: {
|
||||
OR: [
|
||||
{ poolName },
|
||||
// Solo rows fall back to their own `name` as the effective pool
|
||||
// key. Querying for "qwen-pool" must also pick up a solo row
|
||||
// whose name is "qwen-pool" (poolName=null) — otherwise existing
|
||||
// single-Llm targets would silently disappear from the pool view.
|
||||
{ AND: [{ poolName: null }, { name: poolName }] },
|
||||
],
|
||||
},
|
||||
orderBy: { name: 'asc' },
|
||||
});
|
||||
}
|
||||
|
||||
async create(data: CreateLlmInput): Promise<Llm> {
|
||||
return this.prisma.llm.create({
|
||||
data: {
|
||||
@@ -81,6 +107,7 @@ export class LlmRepository implements ILlmRepository {
|
||||
apiKeySecretId: data.apiKeySecretId ?? null,
|
||||
apiKeySecretKey: data.apiKeySecretKey ?? null,
|
||||
extraConfig: (data.extraConfig ?? {}) as Prisma.InputJsonValue,
|
||||
...(data.poolName !== undefined ? { poolName: data.poolName } : {}),
|
||||
...(data.kind !== undefined ? { kind: data.kind } : {}),
|
||||
...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
|
||||
...(data.status !== undefined ? { status: data.status } : {}),
|
||||
@@ -104,6 +131,7 @@ export class LlmRepository implements ILlmRepository {
|
||||
}
|
||||
if (data.apiKeySecretKey !== undefined) updateData.apiKeySecretKey = data.apiKeySecretKey;
|
||||
if (data.extraConfig !== undefined) updateData.extraConfig = data.extraConfig as Prisma.InputJsonValue;
|
||||
if (data.poolName !== undefined) updateData.poolName = data.poolName;
|
||||
if (data.kind !== undefined) updateData.kind = data.kind;
|
||||
if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId;
|
||||
if (data.status !== undefined) updateData.status = data.status;
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
import type { ChatMessage } from '@prisma/client';
|
||||
import type { AgentService } from './agent.service.js';
|
||||
import type { LlmService } from './llm.service.js';
|
||||
import { effectivePoolName } from './llm.service.js';
|
||||
import type { LlmAdapterRegistry } from './llm/dispatcher.js';
|
||||
import type {
|
||||
IChatRepository,
|
||||
@@ -101,6 +102,23 @@ export interface ChatStreamChunk {
|
||||
message?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* v4: a single resolvable pool member. The chat dispatcher iterates a
|
||||
* non-empty list of these on transport-level failure (network, virtual
|
||||
* publisher disconnect, etc.) until one succeeds or the list is exhausted.
|
||||
* For pool size 1 (no `poolName` set on the pinned Llm) the list has one
|
||||
* entry and behavior is identical to pre-v4.
|
||||
*/
|
||||
export interface PoolCandidate {
|
||||
llmName: string;
|
||||
llmType: string;
|
||||
llmKind: 'public' | 'virtual';
|
||||
modelOverride: string;
|
||||
url: string;
|
||||
apiKey: string;
|
||||
extraConfig: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface ChatRequestArgs {
|
||||
agentName: string;
|
||||
threadId?: string;
|
||||
@@ -384,27 +402,54 @@ export class ChatService {
|
||||
* The caller's `parseStreamingChunk` already speaks OpenAI shape, so
|
||||
* downstream code doesn't need to know which path produced the chunks.
|
||||
*/
|
||||
/**
|
||||
* Stream inference with v4 pool failover. We commit to a candidate the
|
||||
* moment the first chunk arrives — partial output cannot be re-streamed
|
||||
* from a different backend without confusing the caller. Failover
|
||||
* therefore covers only "couldn't establish stream" errors (transport
|
||||
* failure pre-first-chunk). After the first yield, exceptions propagate.
|
||||
*/
|
||||
private async *streamInference(ctx: {
|
||||
llmName: string;
|
||||
llmType: string;
|
||||
llmKind: 'public' | 'virtual';
|
||||
modelOverride: string;
|
||||
url: string;
|
||||
apiKey: string;
|
||||
extraConfig: Record<string, unknown>;
|
||||
poolCandidates: PoolCandidate[];
|
||||
history: OpenAiMessage[];
|
||||
systemBlock: string;
|
||||
toolList: ChatTool[];
|
||||
mergedParams: AgentChatParams;
|
||||
}): AsyncGenerator<{ data: string; done?: boolean }> {
|
||||
if (ctx.llmKind !== 'virtual') {
|
||||
const adapter = this.adapters.get(ctx.llmType);
|
||||
let lastErr: Error | null = null;
|
||||
for (const c of ctx.poolCandidates) {
|
||||
try {
|
||||
yield* this.streamOneCandidate(c, ctx);
|
||||
return;
|
||||
} catch (err) {
|
||||
lastErr = err as Error;
|
||||
// Try the next pool member only if no chunk has been yielded yet.
|
||||
// streamOneCandidate throws *before* its first yield on dispatch
|
||||
// failure; once it begins yielding, any error inside it propagates
|
||||
// out of this `for` loop because the caller has already started
|
||||
// consuming the stream.
|
||||
}
|
||||
}
|
||||
throw lastErr ?? new Error('chat stream dispatch exhausted: no pool candidates');
|
||||
}
|
||||
|
||||
private async *streamOneCandidate(
|
||||
candidate: PoolCandidate,
|
||||
ctx: {
|
||||
history: OpenAiMessage[];
|
||||
systemBlock: string;
|
||||
toolList: ChatTool[];
|
||||
mergedParams: AgentChatParams;
|
||||
},
|
||||
): AsyncGenerator<{ data: string; done?: boolean }> {
|
||||
if (candidate.llmKind !== 'virtual') {
|
||||
const adapter = this.adapters.get(candidate.llmType);
|
||||
yield* adapter.stream({
|
||||
body: { ...this.buildBody(ctx), stream: true },
|
||||
modelOverride: ctx.modelOverride,
|
||||
apiKey: ctx.apiKey,
|
||||
url: ctx.url,
|
||||
extraConfig: ctx.extraConfig,
|
||||
body: { ...this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), stream: true },
|
||||
modelOverride: candidate.modelOverride,
|
||||
apiKey: candidate.apiKey,
|
||||
url: candidate.url,
|
||||
extraConfig: candidate.extraConfig,
|
||||
});
|
||||
return;
|
||||
}
|
||||
@@ -418,8 +463,8 @@ export class ChatService {
|
||||
// generator drains them in order. ref.done resolves when the
|
||||
// publisher emits its `[DONE]` marker.
|
||||
const ref = await this.virtualLlms.enqueueInferTask(
|
||||
ctx.llmName,
|
||||
{ ...this.buildBody(ctx), stream: true },
|
||||
candidate.llmName,
|
||||
{ ...this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }), stream: true },
|
||||
true,
|
||||
);
|
||||
const queue: Array<{ data: string; done?: boolean }> = [];
|
||||
@@ -453,43 +498,62 @@ export class ChatService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a single non-streaming inference iteration. Branches on
|
||||
* ctx.llmKind: public goes through the existing adapter registry,
|
||||
* virtual relays through VirtualLlmService.enqueueInferTask (mirrors
|
||||
* the same branch in `routes/llm-infer.ts` from v1 Stage 3).
|
||||
*
|
||||
* Throws when virtualLlms isn't wired but the row is virtual — older
|
||||
* test wirings hit this path.
|
||||
* Run a single non-streaming inference iteration with v4 pool failover.
|
||||
* We try each pool candidate in order; on dispatch error (transport
|
||||
* failure, virtual publisher disconnect) we move to the next member.
|
||||
* Auth/4xx errors that the adapter surfaces as a status-code in the
|
||||
* result are NOT retried — those would fail identically on a sibling
|
||||
* with the same key/model, and the cost of one wasted retry per call
|
||||
* is real.
|
||||
*/
|
||||
private async runOneInference(ctx: {
|
||||
llmName: string;
|
||||
llmType: string;
|
||||
llmKind: 'public' | 'virtual';
|
||||
modelOverride: string;
|
||||
url: string;
|
||||
apiKey: string;
|
||||
extraConfig: Record<string, unknown>;
|
||||
poolCandidates: PoolCandidate[];
|
||||
history: OpenAiMessage[];
|
||||
systemBlock: string;
|
||||
toolList: ChatTool[];
|
||||
mergedParams: AgentChatParams;
|
||||
}): Promise<{ status: number; body: unknown }> {
|
||||
if (ctx.llmKind === 'virtual') {
|
||||
let lastErr: Error | null = null;
|
||||
for (const c of ctx.poolCandidates) {
|
||||
try {
|
||||
return await this.dispatchOneCandidate(c, ctx);
|
||||
} catch (err) {
|
||||
lastErr = err as Error;
|
||||
// Try the next pool member.
|
||||
}
|
||||
}
|
||||
throw lastErr ?? new Error('chat dispatch exhausted: no pool candidates');
|
||||
}
|
||||
|
||||
private async dispatchOneCandidate(
|
||||
candidate: PoolCandidate,
|
||||
ctx: {
|
||||
history: OpenAiMessage[];
|
||||
systemBlock: string;
|
||||
toolList: ChatTool[];
|
||||
mergedParams: AgentChatParams;
|
||||
},
|
||||
): Promise<{ status: number; body: unknown }> {
|
||||
if (candidate.llmKind === 'virtual') {
|
||||
if (this.virtualLlms === undefined) {
|
||||
throw new Error(
|
||||
'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm',
|
||||
);
|
||||
}
|
||||
const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false);
|
||||
const ref = await this.virtualLlms.enqueueInferTask(
|
||||
candidate.llmName,
|
||||
this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }),
|
||||
false,
|
||||
);
|
||||
return ref.done;
|
||||
}
|
||||
const adapter = this.adapters.get(ctx.llmType);
|
||||
const adapter = this.adapters.get(candidate.llmType);
|
||||
return adapter.infer({
|
||||
body: this.buildBody(ctx),
|
||||
modelOverride: ctx.modelOverride,
|
||||
apiKey: ctx.apiKey,
|
||||
url: ctx.url,
|
||||
extraConfig: ctx.extraConfig,
|
||||
body: this.buildBody({ ...ctx, modelOverride: candidate.modelOverride }),
|
||||
modelOverride: candidate.modelOverride,
|
||||
apiKey: candidate.apiKey,
|
||||
url: candidate.url,
|
||||
extraConfig: candidate.extraConfig,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -497,14 +561,14 @@ export class ChatService {
|
||||
threadId: string;
|
||||
history: OpenAiMessage[];
|
||||
systemBlock: string;
|
||||
llmName: string;
|
||||
llmType: string;
|
||||
/** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */
|
||||
llmKind: 'public' | 'virtual';
|
||||
modelOverride: string;
|
||||
url: string;
|
||||
apiKey: string;
|
||||
extraConfig: Record<string, unknown>;
|
||||
/**
|
||||
* v4: ordered pool members for this turn. [0] is the candidate to try
|
||||
* first (random shuffle of viable members so load spreads). On
|
||||
* transport-level failure the dispatcher iterates the rest. Always
|
||||
* non-empty — at minimum the agent's pinned Llm (or a sibling pool
|
||||
* member when the pinned row is itself inactive but others are up).
|
||||
*/
|
||||
poolCandidates: PoolCandidate[];
|
||||
mergedParams: AgentChatParams;
|
||||
toolList: ChatTool[];
|
||||
projectId: string | null;
|
||||
@@ -512,8 +576,18 @@ export class ChatService {
|
||||
maxIterations: number;
|
||||
}> {
|
||||
const agent = await this.agents.getByName(args.agentName);
|
||||
const llm = await this.llms.getByName(agent.llm.name);
|
||||
const apiKey = await this.llms.resolveApiKey(agent.llm.name).catch(() => '');
|
||||
const pinned = await this.llms.getByName(agent.llm.name);
|
||||
const poolCandidates = await this.resolvePoolCandidates(pinned);
|
||||
if (poolCandidates.length === 0) {
|
||||
// Pool exists but every member is inactive (publishers offline,
|
||||
// public Llm explicitly disabled, etc.). Surface a clear error
|
||||
// before running through the rest of preparation; otherwise the
|
||||
// chat loop would hit the dispatcher's "exhausted" branch and the
|
||||
// caller would see a less helpful message.
|
||||
throw new NotFoundError(
|
||||
`No active Llm in pool '${effectivePoolName(pinned)}' (pinned: ${pinned.name})`,
|
||||
);
|
||||
}
|
||||
|
||||
const threadId = await this.resolveThreadId(args, agent.id);
|
||||
const projectId = agent.project?.id ?? null;
|
||||
@@ -581,13 +655,7 @@ export class ChatService {
|
||||
threadId,
|
||||
history,
|
||||
systemBlock,
|
||||
llmName: llm.name,
|
||||
llmType: llm.type,
|
||||
llmKind: llm.kind,
|
||||
modelOverride: llm.model,
|
||||
url: llm.url,
|
||||
apiKey,
|
||||
extraConfig: llm.extraConfig,
|
||||
poolCandidates,
|
||||
mergedParams,
|
||||
toolList: filteredTools,
|
||||
projectId,
|
||||
@@ -596,6 +664,40 @@ export class ChatService {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* v4: resolve the load-balanced pool for an agent's pinned Llm. The
|
||||
* pool is "all Llms whose effective key (poolName ?? name) matches the
|
||||
* pinned row's effective key", filtered to viable backends:
|
||||
* - public: always included (no per-row health probe in v4)
|
||||
* - virtual: included when status is 'active' or 'hibernating'
|
||||
* (hibernating members get woken by VirtualLlmService when
|
||||
* dispatched). 'inactive' = publisher offline, exclude.
|
||||
*
|
||||
* Order is randomised so load spreads across members. The pinned row
|
||||
* doesn't get priority — if it happens to be down, a sibling takes
|
||||
* the call without the caller noticing.
|
||||
*/
|
||||
private async resolvePoolCandidates(pinned: { name: string; poolName: string | null }): Promise<PoolCandidate[]> {
|
||||
const poolKey = effectivePoolName(pinned);
|
||||
const rows = await this.llms.findByPoolName(poolKey);
|
||||
const viable = rows.filter((r) => r.status !== 'inactive');
|
||||
// Fisher-Yates is overkill for typical pool sizes (1-5); a
|
||||
// sort-by-random-key is adequate and side-effect-free.
|
||||
const shuffled = [...viable].sort(() => Math.random() - 0.5);
|
||||
return Promise.all(shuffled.map(async (r) => {
|
||||
const apiKey = await this.llms.resolveApiKey(r.name).catch(() => '');
|
||||
return {
|
||||
llmName: r.name,
|
||||
llmType: r.type,
|
||||
llmKind: r.kind as 'public' | 'virtual',
|
||||
modelOverride: r.model,
|
||||
url: r.url,
|
||||
apiKey,
|
||||
extraConfig: r.extraConfig as Record<string, unknown>,
|
||||
};
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves a personality (request override → agent default) and returns
|
||||
* its bound prompt contents in `PersonalityPrompt.priority` desc order.
|
||||
|
||||
@@ -50,6 +50,13 @@ export interface LlmView {
|
||||
description: string;
|
||||
apiKeyRef: ApiKeyRef | null;
|
||||
extraConfig: Record<string, unknown>;
|
||||
/**
|
||||
* v4: explicit pool key. NULL means "pool of 1" — the row's effective
|
||||
* pool key falls back to its own `name`. Multiple rows sharing a non-null
|
||||
* `poolName` stack into one load-balanced pool that the chat dispatcher
|
||||
* expands at request time.
|
||||
*/
|
||||
poolName: string | null;
|
||||
// Virtual-provider lifecycle (kind defaults to 'public' for legacy rows).
|
||||
kind: 'public' | 'virtual';
|
||||
status: 'active' | 'inactive' | 'hibernating';
|
||||
@@ -60,6 +67,16 @@ export interface LlmView {
|
||||
updatedAt: Date;
|
||||
}
|
||||
|
||||
/**
|
||||
* v4: compute the effective pool key for an Llm. When `poolName` is
|
||||
* explicitly set we use it; otherwise we fall back to the row's own
|
||||
* `name` so a non-pooled Llm dispatches as a "pool of 1" without any
|
||||
* branching at the call site.
|
||||
*/
|
||||
export function effectivePoolName(row: { name: string; poolName: string | null }): string {
|
||||
return row.poolName !== null && row.poolName !== '' ? row.poolName : row.name;
|
||||
}
|
||||
|
||||
export class LlmService {
|
||||
constructor(
|
||||
private readonly repo: ILlmRepository,
|
||||
@@ -84,6 +101,20 @@ export class LlmService {
|
||||
return this.toView(row);
|
||||
}
|
||||
|
||||
/**
|
||||
* v4: members of an effective pool. The pool is defined as:
|
||||
* `WHERE poolName = $1 OR (poolName IS NULL AND name = $1)`
|
||||
* — solo Llms (poolName=null) are addressable by their own name as a
|
||||
* pool of 1; explicit pool members share a `poolName`. Returns raw
|
||||
* `Llm` rows because the chat dispatcher needs per-row api key
|
||||
* resolution to build its candidate list — `toView` would force a
|
||||
* round-trip through SecretService for every row, which is wasted
|
||||
* work for a pool of N where most members are about to be skipped.
|
||||
*/
|
||||
async findByPoolName(poolName: string): Promise<Llm[]> {
|
||||
return this.repo.findByPoolName(poolName);
|
||||
}
|
||||
|
||||
async create(input: unknown, opts: { skipAuthCheck?: boolean } = {}): Promise<LlmView> {
|
||||
const data = CreateLlmSchema.parse(input);
|
||||
const existing = await this.repo.findByName(data.name);
|
||||
@@ -117,6 +148,7 @@ export class LlmService {
|
||||
apiKeySecretId: apiKeyFields.id,
|
||||
apiKeySecretKey: apiKeyFields.key,
|
||||
extraConfig: data.extraConfig,
|
||||
...(data.poolName !== undefined ? { poolName: data.poolName } : {}),
|
||||
});
|
||||
return this.toView(row);
|
||||
}
|
||||
@@ -131,6 +163,8 @@ export class LlmService {
|
||||
if (data.tier !== undefined) updateFields.tier = data.tier;
|
||||
if (data.description !== undefined) updateFields.description = data.description;
|
||||
if (data.extraConfig !== undefined) updateFields.extraConfig = data.extraConfig;
|
||||
// poolName: null → explicit unlink (revert to "pool of 1"); string → set; undefined → leave alone.
|
||||
if (data.poolName !== undefined) updateFields.poolName = data.poolName;
|
||||
|
||||
// apiKeyRef: null → explicit unlink; object → replace; undefined → leave alone.
|
||||
if (data.apiKeyRef !== undefined) {
|
||||
@@ -280,6 +314,7 @@ export class LlmService {
|
||||
description: row.description,
|
||||
apiKeyRef,
|
||||
extraConfig: row.extraConfig as Record<string, unknown>,
|
||||
poolName: row.poolName,
|
||||
kind: row.kind,
|
||||
status: row.status,
|
||||
lastHeartbeatAt: row.lastHeartbeatAt,
|
||||
|
||||
@@ -14,6 +14,15 @@ export const ApiKeyRefSchema = z.object({
|
||||
key: z.string().min(1),
|
||||
});
|
||||
|
||||
/**
|
||||
* v4: pool key. Same character set as `name` (lowercase + digits + hyphens)
|
||||
* because at the CLI/yaml level the two share a namespace — agents and
|
||||
* operators read both interchangeably and a stray uppercase or underscore
|
||||
* here would be a confusing footgun. Empty string is rejected; use `null`
|
||||
* (or omit the field) to declare "pool of 1, fall back to name".
|
||||
*/
|
||||
const PoolNameSchema = z.string().min(1).max(100).regex(/^[a-z0-9-]+$/, 'poolName must be lowercase alphanumeric with hyphens');
|
||||
|
||||
export const CreateLlmSchema = z.object({
|
||||
name: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/, 'Name must be lowercase alphanumeric with hyphens'),
|
||||
type: z.enum(LLM_TYPES),
|
||||
@@ -23,6 +32,7 @@ export const CreateLlmSchema = z.object({
|
||||
description: z.string().max(500).default(''),
|
||||
apiKeyRef: ApiKeyRefSchema.optional(),
|
||||
extraConfig: z.record(z.unknown()).default({}),
|
||||
poolName: PoolNameSchema.nullable().optional(),
|
||||
});
|
||||
|
||||
export const UpdateLlmSchema = z.object({
|
||||
@@ -32,6 +42,7 @@ export const UpdateLlmSchema = z.object({
|
||||
description: z.string().max(500).optional(),
|
||||
apiKeyRef: ApiKeyRefSchema.nullable().optional(),
|
||||
extraConfig: z.record(z.unknown()).optional(),
|
||||
poolName: PoolNameSchema.nullable().optional(),
|
||||
});
|
||||
|
||||
export type CreateLlmInput = z.infer<typeof CreateLlmSchema>;
|
||||
|
||||
@@ -71,17 +71,25 @@ function mockAgents(): AgentService {
|
||||
}
|
||||
|
||||
function mockLlmsVirtual(): LlmService {
|
||||
return {
|
||||
getByName: vi.fn(async (name: string) => ({
|
||||
const baseRow = (name: string): Record<string, unknown> => ({
|
||||
id: 'llm-1', name, type: 'openai', model: 'fake',
|
||||
url: '', tier: 'fast', description: '',
|
||||
apiKeyRef: null, extraConfig: {},
|
||||
apiKeySecretId: null, apiKeySecretKey: null,
|
||||
extraConfig: {},
|
||||
poolName: null,
|
||||
kind: 'virtual',
|
||||
providerSessionId: null,
|
||||
status: 'active',
|
||||
lastHeartbeatAt: NOW,
|
||||
inactiveSince: null,
|
||||
version: 1, createdAt: NOW, updatedAt: NOW,
|
||||
});
|
||||
return {
|
||||
getByName: vi.fn(async (name: string) => ({
|
||||
...baseRow(name),
|
||||
apiKeyRef: null,
|
||||
})),
|
||||
findByPoolName: vi.fn(async (poolName: string) => [baseRow(poolName)]),
|
||||
resolveApiKey: vi.fn(async () => ''),
|
||||
} as unknown as LlmService;
|
||||
}
|
||||
|
||||
@@ -119,17 +119,30 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } |
|
||||
}
|
||||
|
||||
function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService {
|
||||
return {
|
||||
getByName: vi.fn(async (name: string) => ({
|
||||
// v4: prepareContext now resolves a pool by effective key. For unit
|
||||
// tests that pass a single agent.llm we return that one row twice —
|
||||
// once for getByName (LlmView shape) and once for findByPoolName (raw
|
||||
// Llm shape with the same name) so the dispatcher's poolCandidates
|
||||
// ends up with exactly one member, matching pre-v4 behavior.
|
||||
const baseRow = (name: string): Record<string, unknown> => ({
|
||||
id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking',
|
||||
url: '', tier: 'fast', description: '',
|
||||
apiKeyRef: null, extraConfig: {},
|
||||
apiKeySecretId: null, apiKeySecretKey: null,
|
||||
extraConfig: {},
|
||||
poolName: null,
|
||||
kind: opts.kind ?? 'public',
|
||||
providerSessionId: null,
|
||||
status: 'active',
|
||||
lastHeartbeatAt: null,
|
||||
inactiveSince: null,
|
||||
version: 1, createdAt: NOW, updatedAt: NOW,
|
||||
});
|
||||
return {
|
||||
getByName: vi.fn(async (name: string) => ({
|
||||
...baseRow(name),
|
||||
apiKeyRef: null,
|
||||
})),
|
||||
findByPoolName: vi.fn(async (poolName: string) => [baseRow(poolName)]),
|
||||
resolveApiKey: vi.fn(async () => 'fake-key'),
|
||||
} as unknown as LlmService;
|
||||
}
|
||||
@@ -604,6 +617,176 @@ describe('ChatService', () => {
|
||||
.toEqual(['thinking via litellm...']);
|
||||
});
|
||||
|
||||
// ── v4: LB pool by shared name ──
|
||||
|
||||
// Helper: build a multi-member mock LlmService where all members share an
|
||||
// effective pool key. `nameToFail` lets a test mark specific names as
|
||||
// throwing on dispatch, exercising failover.
|
||||
function mockLlmsPool(opts: {
|
||||
pinnedName: string;
|
||||
poolName: string | null;
|
||||
members: Array<{ name: string; status?: 'active' | 'inactive'; kind?: 'public' | 'virtual' }>;
|
||||
}): LlmService {
|
||||
const baseRow = (m: { name: string; status?: 'active' | 'inactive'; kind?: 'public' | 'virtual' }): Record<string, unknown> => ({
|
||||
id: `id-${m.name}`,
|
||||
name: m.name,
|
||||
type: 'openai',
|
||||
model: 'qwen3-thinking',
|
||||
url: '',
|
||||
tier: 'fast',
|
||||
description: '',
|
||||
apiKeySecretId: null,
|
||||
apiKeySecretKey: null,
|
||||
extraConfig: {},
|
||||
poolName: opts.poolName,
|
||||
kind: m.kind ?? 'public',
|
||||
providerSessionId: null,
|
||||
status: m.status ?? 'active',
|
||||
lastHeartbeatAt: null,
|
||||
inactiveSince: null,
|
||||
version: 1,
|
||||
createdAt: NOW,
|
||||
updatedAt: NOW,
|
||||
});
|
||||
return {
|
||||
getByName: vi.fn(async (name: string) => {
|
||||
const m = opts.members.find((x) => x.name === name) ?? opts.members[0]!;
|
||||
return { ...baseRow(m), apiKeyRef: null };
|
||||
}),
|
||||
findByPoolName: vi.fn(async () => opts.members.map(baseRow)),
|
||||
resolveApiKey: vi.fn(async () => 'fake-key'),
|
||||
} as unknown as LlmService;
|
||||
}
|
||||
|
||||
it('chat dispatches to a pool member and persists the reply (pool size N, primary returns)', async () => {
|
||||
// Three healthy members; the (random) primary wins on first try.
|
||||
// Adapter returns the same canned reply regardless of which member
|
||||
// got picked because in this test we don't differentiate by name —
|
||||
// we just assert that dispatch goes through and the agent gets a
|
||||
// reply. Per-member assertion is covered by the failover test below.
|
||||
const chatRepo = mockChatRepo();
|
||||
const adapter = scriptedAdapter([chatCompletion('hello from pool')]);
|
||||
const svc = new ChatService(
|
||||
mockAgents(),
|
||||
mockLlmsPool({
|
||||
pinnedName: 'qwen-prod-1',
|
||||
poolName: 'qwen-pool',
|
||||
members: [
|
||||
{ name: 'qwen-prod-1' },
|
||||
{ name: 'qwen-prod-2' },
|
||||
{ name: 'qwen-prod-3' },
|
||||
],
|
||||
}),
|
||||
adapterRegistry(adapter),
|
||||
chatRepo, mockPromptRepo(), mockTools(),
|
||||
);
|
||||
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
|
||||
expect(result.assistant).toBe('hello from pool');
|
||||
expect(adapter.infer).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('chat fails over to the next pool member when the first throws on dispatch', async () => {
|
||||
// 3 members; first 2 throw, 3rd succeeds. Verify exactly 3 dispatches
|
||||
// and the final reply propagates.
|
||||
const chatRepo = mockChatRepo();
|
||||
let call = 0;
|
||||
const adapter: LlmAdapter = {
|
||||
kind: 'flaky',
|
||||
infer: vi.fn(async () => {
|
||||
call += 1;
|
||||
if (call < 3) throw new Error(`transport-error-${String(call)}`);
|
||||
return chatCompletion('survived to the third try').body !== undefined
|
||||
? { status: 200, body: chatCompletion('survived to the third try').body }
|
||||
: (() => { throw new Error('unreachable'); })();
|
||||
}),
|
||||
stream: async function*() { yield { data: '[DONE]', done: true }; },
|
||||
};
|
||||
const svc = new ChatService(
|
||||
mockAgents(),
|
||||
mockLlmsPool({
|
||||
pinnedName: 'qwen-prod-1',
|
||||
poolName: 'qwen-pool',
|
||||
members: [
|
||||
{ name: 'qwen-prod-1' },
|
||||
{ name: 'qwen-prod-2' },
|
||||
{ name: 'qwen-prod-3' },
|
||||
],
|
||||
}),
|
||||
adapterRegistry(adapter),
|
||||
chatRepo, mockPromptRepo(), mockTools(),
|
||||
);
|
||||
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
|
||||
expect(result.assistant).toBe('survived to the third try');
|
||||
expect(adapter.infer).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it('chat throws when every pool member throws (exhausted)', async () => {
|
||||
const chatRepo = mockChatRepo();
|
||||
const adapter: LlmAdapter = {
|
||||
kind: 'all-broken',
|
||||
infer: vi.fn(async () => { throw new Error('transport-down'); }),
|
||||
stream: async function*() { yield { data: '[DONE]', done: true }; },
|
||||
};
|
||||
const svc = new ChatService(
|
||||
mockAgents(),
|
||||
mockLlmsPool({
|
||||
pinnedName: 'qwen-prod-1',
|
||||
poolName: 'qwen-pool',
|
||||
members: [{ name: 'qwen-prod-1' }, { name: 'qwen-prod-2' }],
|
||||
}),
|
||||
adapterRegistry(adapter),
|
||||
chatRepo, mockPromptRepo(), mockTools(),
|
||||
);
|
||||
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
|
||||
.rejects.toThrow(/transport-down/);
|
||||
expect(adapter.infer).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('chat refuses with 404 when every pool member is inactive', async () => {
|
||||
const chatRepo = mockChatRepo();
|
||||
const adapter = scriptedAdapter([chatCompletion('should never run')]);
|
||||
const svc = new ChatService(
|
||||
mockAgents(),
|
||||
mockLlmsPool({
|
||||
pinnedName: 'qwen-prod-1',
|
||||
poolName: 'qwen-pool',
|
||||
members: [
|
||||
{ name: 'qwen-prod-1', status: 'inactive' },
|
||||
{ name: 'qwen-prod-2', status: 'inactive' },
|
||||
],
|
||||
}),
|
||||
adapterRegistry(adapter),
|
||||
chatRepo, mockPromptRepo(), mockTools(),
|
||||
);
|
||||
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
|
||||
.rejects.toThrow(/No active Llm in pool/);
|
||||
expect(adapter.infer).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('chat picks a healthy sibling when the pinned Llm is itself inactive', async () => {
|
||||
// The agent pins to qwen-prod-1 which is inactive. qwen-prod-2 is
|
||||
// active. Pool dispatch must skip the dead pinned row and use the
|
||||
// sibling instead — that's the whole point of v4.
|
||||
const chatRepo = mockChatRepo();
|
||||
const adapter = scriptedAdapter([chatCompletion('via sibling')]);
|
||||
const svc = new ChatService(
|
||||
mockAgents(),
|
||||
mockLlmsPool({
|
||||
pinnedName: 'qwen-prod-1',
|
||||
poolName: 'qwen-pool',
|
||||
members: [
|
||||
{ name: 'qwen-prod-1', status: 'inactive' },
|
||||
{ name: 'qwen-prod-2', status: 'active' },
|
||||
],
|
||||
}),
|
||||
adapterRegistry(adapter),
|
||||
chatRepo, mockPromptRepo(), mockTools(),
|
||||
);
|
||||
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
|
||||
expect(result.assistant).toBe('via sibling');
|
||||
expect(adapter.infer).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
// Regression: per-agent maxIterations override + clamp.
|
||||
// Found by /gstack-review on 2026-04-25.
|
||||
// Without the clamp, a hostile agent definition with `extras.maxIterations:1000000`
|
||||
|
||||
Reference in New Issue
Block a user