Compare commits

...

2 Commits

Author SHA1 Message Date
Michal
0591c62845 feat(mcpd): AgentService virtual methods + GC cascade (v3 Stage 2)
State machine for kind=virtual Agent rows. Mirrors what
VirtualLlmService did for Llms in v1, then wires both lifecycles
together so disconnect/heartbeat/GC cascade through both at once.

AgentRepository:
- create/update accept the new lifecycle fields (kind, providerSessionId,
  status, lastHeartbeatAt, inactiveSince).
- Adds findBySessionId, findByLlmId, findStaleVirtuals, findExpiredInactives.

AgentService — new virtual-agent methods:
- registerVirtualAgents(sessionId, inputs, ownerId) — sticky upsert.
  New names insert as kind=virtual/status=active. Existing virtuals
  owned by the same session reactivate; existing inactive virtuals
  from a foreign session can be adopted (sticky reconnect). Refuses
  to overwrite a public agent or a foreign session's still-active
  virtual (HTTP 409). Pinned LLM is resolved via LlmService — caller
  posts Llms first.
- heartbeatVirtualAgents(sessionId) — bumps owned agents on a session
  heartbeat; revives inactive rows.
- markVirtualAgentsInactiveBySession(sessionId) — disconnect cascade.
- deleteVirtualAgentsForLlm(llmId) — defensive cascade for the GC's
  Llm-delete step (Agent.llmId is Restrict).
- gcSweepVirtualAgents() — same shape as VirtualLlmService.gcSweep
  (90s heartbeat-stale → inactive, 4h inactive → delete).
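The register collision rules above reduce to a small decision table. A condensed sketch — illustrative names only; the real logic lives in AgentService.registerVirtualAgents and also performs the actual insert/update:

```typescript
// Hypothetical condensed sketch of the name-collision rules described above.
// Not the real service code: registerVirtualAgents also writes the row.
type Kind = 'public' | 'virtual';
type Status = 'active' | 'inactive';

interface ExistingAgent {
  kind: Kind;
  providerSessionId: string | null;
  status: Status;
}

function registerOutcome(
  existing: ExistingAgent | null,
  sessionId: string,
): 'insert' | 'reactivate' | 'adopt' | 'conflict' {
  if (existing === null) return 'insert';            // new name → kind=virtual / status=active
  if (existing.kind === 'public') return 'conflict'; // never overwrite a public agent (409)
  if (existing.providerSessionId === sessionId) return 'reactivate'; // same session → sticky upsert
  if (existing.status === 'inactive') return 'adopt'; // foreign but lapsed → sticky reconnect
  return 'conflict';                                  // foreign and still active (409)
}
```

Same-session rows reactivate regardless of status; only a foreign session's *inactive* row can be adopted.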

VirtualLlmService:
- Optional AgentService dependency. heartbeat() now also bumps owned
  agents; unbindSession() flips them inactive. gcSweep() runs the
  agent sweep FIRST (so any agent that would block an Llm delete via
  Restrict is already gone), and adds a defensive
  deleteVirtualAgentsForLlm step right before each Llm delete in case
  an agent's heartbeat lagged its Llm's just enough to escape this
  round's 4h cutoff.
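The sweep ordering can be sketched in isolation. The interfaces and `findExpiredInactiveLlms` below are stand-in names for illustration, not the real API; the point is only the order: agent sweep first, then a defensive per-Llm agent delete right before each Llm delete so a lagging agent can't trip the Restrict FK:

```typescript
// Illustrative stand-ins — not the real service interfaces.
interface AgentSweeper {
  gcSweepVirtualAgents(): Promise<void>;
  deleteVirtualAgentsForLlm(llmId: string): Promise<number>;
}
interface LlmSweeper {
  findExpiredInactiveLlms(): Promise<Array<{ id: string }>>;
  delete(llmId: string): Promise<void>;
}

async function gcSweepCascade(agents: AgentSweeper, llms: LlmSweeper): Promise<void> {
  await agents.gcSweepVirtualAgents();                // 1. agents first: stale → inactive, expired → deleted
  for (const llm of await llms.findExpiredInactiveLlms()) {
    await agents.deleteVirtualAgentsForLlm(llm.id);   // 2. defensive: clear any lagging pinned agent
    await llms.delete(llm.id);                        // 3. Restrict can no longer block the Llm delete
  }
}
```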

main.ts:
- VirtualLlmService construction moves below AgentService so it can
  receive the cascade dependency.

Tests: 13 new in virtual-agent-service.test.ts cover all the register
variants (insert, sticky reconnect, adopt-inactive-foreign, refuse
public-overwrite, refuse foreign-session-active), heartbeat-revive,
disconnect-cascade, deleteVirtualAgentsForLlm scope, GC sweep flip
+ delete + idempotence, and three VirtualLlmService cascade scenarios
(unbindSession, gcSweep deleting agent before Llm, defensive cascade
when agent's heartbeat lagged).

mcpd suite: 854/854 (was 841 + 13 new). Workspace unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:03:59 +01:00
Michal
8d59b0bf2c feat(db+mcpd): Agent lifecycle + chat.service kind=virtual branch (v3 Stage 1)
Two pieces of v3 plumbing — the schema, plus a fix for the latent v1 chat.service bug.

Schema (db):
- Agent gains kind/providerSessionId/lastHeartbeatAt/status/inactiveSince
  mirroring Llm's v1 lifecycle. Reuses LlmKind / LlmStatus enums; no
  new types. Existing rows backfill kind=public/status=active so v1
  CRUD is unaffected.
- @@index([kind, status]) for the GC sweep, @@index([providerSessionId])
  for disconnect-cascade lookups.
- 4 new prisma-level tests cover defaults, persisting virtual fields,
  the (kind, status) GC index, and providerSessionId lookups.
  Total agent-schema tests: 20/20.

chat.service (mcpd) — fixes the v1 latent bug:
- LlmView's kind is now plumbed through prepareContext as ctx.llmKind.
- Two new private helpers, runOneInference / streamInference, branch
  on ctx.llmKind: 'public' goes through the existing adapter
  registry, 'virtual' relays through VirtualLlmService.enqueueInferTask
  (mirrors the route-handler branch from v1 Stage 3).
- Streaming bridges VirtualLlmService's onChunk callback API to an
  async iterator via a small queue + wake pattern.
- ChatService gains an optional virtualLlms constructor parameter;
  main.ts wires it in. Older test wirings without it raise a clear
  "virtualLlms dispatcher not wired" error when the row is virtual,
  rather than silently falling through to the public path against an
  empty URL.
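The queue + wake bridge mentioned above can be sketched generically — a minimal standalone version with illustrative names, not the ChatService code itself:

```typescript
// Generic callback-API → async-iterator bridge (queue + wake), illustrative only.
// Items land on a queue from the callback; the generator drains them in order
// and parks on a promise whose resolver the callback wakes.
async function* toAsyncIterator<T>(
  subscribe: (onItem: (item: T) => void, onDone: () => void) => void,
): AsyncGenerator<T> {
  const queue: T[] = [];
  let finished = false;
  let wake: (() => void) | null = null;
  const notify = (): void => { const w = wake; wake = null; if (w !== null) w(); };
  subscribe(
    (item) => { queue.push(item); notify(); },
    () => { finished = true; notify(); },
  );
  while (true) {
    while (queue.length > 0) yield queue.shift()!;
    if (finished) return;
    await new Promise<void>((r) => { wake = r; });
  }
}
```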

This unblocks any Agent (public OR future v3-virtual) pinned to a
kind=virtual Llm. Before this stage, those agents 502'd against the
empty url field.

Tests: 4 new in chat-service-virtual-llm.test.ts cover the relay path
(non-streaming and streaming), the missing-dispatcher error, and rejection
surfacing. mcpd suite: 841/841 (was 833, +8 across stages 1+v3-Stage-1).
Workspace: 2054/2054 across 153 files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 16:55:02 +01:00
11 changed files with 1142 additions and 33 deletions

View File

@@ -0,0 +1,14 @@
-- Mirror Llm's virtual-provider lifecycle on Agent. Reuses the
-- existing LlmKind / LlmStatus enums so we don't double-define them.
-- Existing rows backfill with kind='public' / status='active' so
-- nothing changes for manually-created agents.
ALTER TABLE "Agent"
ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public',
ADD COLUMN "providerSessionId" TEXT,
ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3),
ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active',
ADD COLUMN "inactiveSince" TIMESTAMP(3);
CREATE INDEX "Agent_kind_status_idx" ON "Agent"("kind", "status");
CREATE INDEX "Agent_providerSessionId_idx" ON "Agent"("providerSessionId");

View File

@@ -479,6 +479,12 @@ model Agent {
proxyModelName String? // optional informational override
defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ...
extras Json @default("{}") // future LoRA / tool-allowlist
// ── Virtual-agent lifecycle (NULL/default for kind=public, mirrors Llm) ──
kind LlmKind @default(public)
providerSessionId String? // mcplocal session that owns this row when virtual
lastHeartbeatAt DateTime?
status LlmStatus @default(active)
inactiveSince DateTime?
ownerId String
version Int @default(1)
createdAt DateTime @default(now())
@@ -497,6 +503,8 @@ model Agent {
@@index([projectId])
@@index([ownerId])
@@index([defaultPersonalityId])
@@index([kind, status])
@@index([providerSessionId])
}
// ── Personalities (named overlay bundles of prompts on top of an Agent) ──

View File

@@ -317,6 +317,78 @@ describe('agent / chat-thread / chat-message schema', () => {
expect(reloaded?.defaultPersonalityId).toBeNull();
});
// ── v3: Agent.kind virtual + lifecycle fields ──
it('defaults a freshly inserted Agent to kind=public, status=active', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-default-kind');
const agent = await makeAgent({ name: 'fresh', llmId: llm.id, ownerId: user.id });
expect(agent.kind).toBe('public');
expect(agent.status).toBe('active');
expect(agent.providerSessionId).toBeNull();
expect(agent.lastHeartbeatAt).toBeNull();
expect(agent.inactiveSince).toBeNull();
});
it('persists kind=virtual + lifecycle fields together', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-pub-virtual');
const now = new Date();
const agent = await prisma.agent.create({
data: {
name: 'local-coder',
llmId: llm.id,
ownerId: user.id,
kind: 'virtual',
providerSessionId: 'sess-abc',
lastHeartbeatAt: now,
status: 'active',
},
});
expect(agent.kind).toBe('virtual');
expect(agent.providerSessionId).toBe('sess-abc');
expect(agent.lastHeartbeatAt?.getTime()).toBe(now.getTime());
});
it('finds virtual agents by (kind, status) cheaply (GC sweep query)', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-gc-agent');
await prisma.agent.create({ data: { name: 'pub-1', llmId: llm.id, ownerId: user.id } });
await prisma.agent.create({
data: { name: 'v-active', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's1' },
});
await prisma.agent.create({
data: { name: 'v-inactive', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() },
});
const stale = await prisma.agent.findMany({
where: { kind: 'virtual', status: 'inactive' },
select: { name: true },
});
expect(stale.map((a) => a.name)).toEqual(['v-inactive']);
});
it('finds agents by providerSessionId (used on mcplocal disconnect cascade)', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-sess-cascade');
await prisma.agent.create({
data: { name: 'a', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
});
await prisma.agent.create({
data: { name: 'b', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
});
await prisma.agent.create({
data: { name: 'c', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'other' },
});
const owned = await prisma.agent.findMany({
where: { providerSessionId: 'shared' },
select: { name: true },
orderBy: { name: 'asc' },
});
expect(owned.map((a) => a.name)).toEqual(['a', 'b']);
});
it('binds the same prompt to multiple personalities of an agent', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-shared-prompt');

View File

@@ -435,10 +435,8 @@ async function main(): Promise<void> {
adapters: llmAdapters,
log: { warn: (msg) => app.log.warn(msg) },
});
-// Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker
-// is started below after `app.listen` so it doesn't fire before the
-// server is accepting traffic.
-const virtualLlmService = new VirtualLlmService(llmRepo);
+// VirtualLlmService is constructed lower down (after AgentService) so
+// it can wire the agent-cascade callbacks introduced in v3 Stage 2.
// AgentService + ChatService get fully wired below once projectService and
// mcpProxyService are constructed (ChatService needs them via the
// ChatToolDispatcher bridge).
@@ -465,6 +463,10 @@ async function main(): Promise<void> {
const personalityRepo = new PersonalityRepository(prisma);
const personalityService = new PersonalityService(personalityRepo, agentRepo, promptRepo);
const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo);
// Virtual-provider state machine (kind=virtual rows for both Llms and
// Agents). v3 wires AgentService for heartbeat/disconnect/GC cascade.
// The 60-s GC ticker is started below after `app.listen`.
const virtualLlmService = new VirtualLlmService(llmRepo, agentService);
// ChatService needs the proxy + project repo via the ChatToolDispatcher
// bridge. The dispatcher's logger references `app.log`, which is not
// constructed until further down — `chatService` itself is built right
@@ -607,6 +609,7 @@ async function main(): Promise<void> {
promptRepo,
chatToolDispatcher,
personalityRepo,
virtualLlmService,
);
registerAgentChatRoutes(app, chatService);
registerLlmInferRoutes(app, {

View File

@@ -1,4 +1,4 @@
-import type { PrismaClient, Agent, Prisma } from '@prisma/client';
+import type { PrismaClient, Agent, Prisma, LlmKind, LlmStatus } from '@prisma/client';
export interface CreateAgentRepoInput {
name: string;
@@ -11,6 +11,12 @@ export interface CreateAgentRepoInput {
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
ownerId: string;
// Virtual-agent lifecycle (omit for kind=public).
kind?: LlmKind;
providerSessionId?: string | null;
status?: LlmStatus;
lastHeartbeatAt?: Date | null;
inactiveSince?: Date | null;
}
export interface UpdateAgentRepoInput {
@@ -22,6 +28,13 @@ export interface UpdateAgentRepoInput {
proxyModelName?: string | null;
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
// Virtual-agent lifecycle. AgentService is the only public writer; the
// VirtualAgentService methods (Stage 2) bypass the public CRUD path.
kind?: LlmKind;
providerSessionId?: string | null;
status?: LlmStatus;
lastHeartbeatAt?: Date | null;
inactiveSince?: Date | null;
}
export interface IAgentRepository {
@@ -32,6 +45,11 @@ export interface IAgentRepository {
create(data: CreateAgentRepoInput): Promise<Agent>;
update(id: string, data: UpdateAgentRepoInput): Promise<Agent>;
delete(id: string): Promise<void>;
// Virtual-agent lifecycle helpers.
findBySessionId(sessionId: string): Promise<Agent[]>;
findByLlmId(llmId: string): Promise<Agent[]>;
findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]>;
findExpiredInactives(deletionCutoff: Date): Promise<Agent[]>;
}
export class AgentRepository implements IAgentRepository {
@@ -69,6 +87,11 @@ export class AgentRepository implements IAgentRepository {
defaultParams: (data.defaultParams ?? {}) as Prisma.InputJsonValue,
extras: (data.extras ?? {}) as Prisma.InputJsonValue,
ownerId: data.ownerId,
...(data.kind !== undefined ? { kind: data.kind } : {}),
...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
...(data.status !== undefined ? { status: data.status } : {}),
...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
},
});
}
@@ -99,6 +122,11 @@ export class AgentRepository implements IAgentRepository {
if (data.extras !== undefined) {
updateData.extras = data.extras as Prisma.InputJsonValue;
}
if (data.kind !== undefined) updateData.kind = data.kind;
if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId;
if (data.status !== undefined) updateData.status = data.status;
if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt;
if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince;
// Bump optimistic version on every update.
updateData.version = { increment: 1 };
return this.prisma.agent.update({ where: { id }, data: updateData });
@@ -107,4 +135,40 @@ export class AgentRepository implements IAgentRepository {
async delete(id: string): Promise<void> {
await this.prisma.agent.delete({ where: { id } });
}
// ── Virtual-agent lifecycle queries ──
async findBySessionId(sessionId: string): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: { providerSessionId: sessionId },
orderBy: { name: 'asc' },
});
}
async findByLlmId(llmId: string): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: { llmId },
orderBy: { name: 'asc' },
});
}
async findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: {
kind: 'virtual',
status: 'active',
lastHeartbeatAt: { lt: heartbeatCutoff },
},
});
}
async findExpiredInactives(deletionCutoff: Date): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: {
kind: 'virtual',
status: 'inactive',
inactiveSince: { lt: deletionCutoff },
},
});
}
}

View File

@@ -33,12 +33,28 @@ export interface AgentView {
proxyModelName: string | null;
defaultParams: AgentChatParams;
extras: Record<string, unknown>;
// Virtual-agent lifecycle (defaults make public agents look like "active").
kind: 'public' | 'virtual';
status: 'active' | 'inactive' | 'hibernating';
lastHeartbeatAt: Date | null;
inactiveSince: Date | null;
ownerId: string;
version: number;
createdAt: Date;
updatedAt: Date;
}
/** Input shape mcplocal sends per virtual agent on register. */
export interface VirtualAgentInput {
name: string;
llmName: string;
description?: string;
systemPrompt?: string;
project?: string;
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
}
export class AgentService {
constructor(
private readonly repo: IAgentRepository,
@@ -179,10 +195,162 @@ export class AgentService {
proxyModelName: row.proxyModelName,
defaultParams: row.defaultParams as AgentChatParams,
extras: row.extras as Record<string, unknown>,
kind: row.kind,
status: row.status,
lastHeartbeatAt: row.lastHeartbeatAt,
inactiveSince: row.inactiveSince,
ownerId: row.ownerId,
version: row.version,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
};
}
// ── Virtual-agent lifecycle (v3) ──
/**
* Sticky upsert of virtual agents owned by a publishing mcplocal session.
* Mirrors VirtualLlmService.register's semantics:
* - New agents → insert with kind=virtual / status=active.
* - Existing virtual agents owned by the same session → update + reactivate.
* - Existing virtual agents owned by a different session, but currently
* inactive → adopt (sticky reconnect after a session lapse).
* - Existing public agents OR foreign-active virtuals → 409 Conflict.
* - Pinned LLM must already exist (publisher posts Llms first in the same
* register payload).
*/
async registerVirtualAgents(
sessionId: string,
inputs: VirtualAgentInput[],
ownerId: string,
): Promise<AgentView[]> {
const now = new Date();
const out: AgentView[] = [];
for (const a of inputs) {
const llm = await this.llms.getByName(a.llmName);
const projectId = a.project !== undefined
? (await this.projects.resolveAndGet(a.project)).id
: null;
const existing = await this.repo.findByName(a.name);
if (existing !== null) {
if (existing.kind === 'public') {
throw Object.assign(
new Error(`Cannot publish over public Agent: ${a.name}`),
{ statusCode: 409 },
);
}
if (existing.providerSessionId !== sessionId && existing.status === 'active') {
throw Object.assign(
new Error(`Virtual Agent '${a.name}' is already active under a different session`),
{ statusCode: 409 },
);
}
const updated = await this.repo.update(existing.id, {
...(a.description !== undefined ? { description: a.description } : {}),
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
llmId: llm.id,
projectId,
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
...(a.extras !== undefined ? { extras: a.extras } : {}),
kind: 'virtual',
providerSessionId: sessionId,
status: 'active',
lastHeartbeatAt: now,
inactiveSince: null,
});
out.push(await this.toView(updated));
continue;
}
const created = await this.repo.create({
name: a.name,
...(a.description !== undefined ? { description: a.description } : {}),
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
llmId: llm.id,
projectId,
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
...(a.extras !== undefined ? { extras: a.extras } : {}),
kind: 'virtual',
providerSessionId: sessionId,
status: 'active',
lastHeartbeatAt: now,
ownerId,
});
out.push(await this.toView(created));
}
return out;
}
/**
* Bumps lastHeartbeatAt on every virtual agent owned by the session.
* Revives inactive rows. Called from VirtualLlmService.heartbeat so
* one publisher heartbeat covers both Llms and Agents.
*/
async heartbeatVirtualAgents(sessionId: string): Promise<void> {
const owned = await this.repo.findBySessionId(sessionId);
if (owned.length === 0) return;
const now = new Date();
for (const row of owned) {
await this.repo.update(row.id, {
lastHeartbeatAt: now,
...(row.status === 'inactive' ? { status: 'active', inactiveSince: null } : {}),
});
}
}
/** Flip every virtual agent owned by the session to inactive immediately. */
async markVirtualAgentsInactiveBySession(sessionId: string): Promise<void> {
const owned = await this.repo.findBySessionId(sessionId);
const now = new Date();
for (const row of owned) {
if (row.status === 'active') {
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
}
}
}
/**
* Cascade-delete virtual agents pinned to a virtual Llm. Called from
* VirtualLlmService.gcSweep BEFORE deleting the inactive Llm row, since
* Agent.llmId is `onDelete: Restrict` and would otherwise block the
* Llm delete.
*/
async deleteVirtualAgentsForLlm(llmId: string): Promise<number> {
const pinned = await this.repo.findByLlmId(llmId);
let deleted = 0;
for (const row of pinned) {
if (row.kind !== 'virtual') continue;
await this.repo.delete(row.id);
deleted += 1;
}
return deleted;
}
/**
* GC sweep for virtual agents — same shape as VirtualLlmService.gcSweep:
* 1. Heartbeat-stale active virtuals → inactive (90-s cutoff).
* 2. 4-h-old inactive virtuals → delete.
* Run BEFORE the LlmService GC sweep so any agent that would have
* blocked an Llm delete via Restrict has already been cleared.
*/
async gcSweepVirtualAgents(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> {
const HEARTBEAT_TIMEOUT_MS = 90_000;
const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000;
let markedInactive = 0;
let deleted = 0;
const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
for (const row of stale) {
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
markedInactive += 1;
}
const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
const expired = await this.repo.findExpiredInactives(deletionCutoff);
for (const row of expired) {
await this.repo.delete(row.id);
deleted += 1;
}
return { markedInactive, deleted };
}
}

View File

@@ -31,6 +31,7 @@ import type {
} from '../repositories/chat.repository.js';
import type { IPromptRepository } from '../repositories/prompt.repository.js';
import type { IPersonalityRepository } from '../repositories/personality.repository.js';
import type { IVirtualLlmService } from './virtual-llm.service.js';
import type { OpenAiChatRequest, OpenAiMessage } from './llm/types.js';
import type { AgentChatParams } from '../validation/agent.schema.js';
import { NotFoundError } from './mcp-server.service.js';
@@ -132,6 +133,14 @@ export class ChatService {
private readonly promptRepo: IPromptRepository,
private readonly tools: ChatToolDispatcher,
private readonly personalities?: IPersonalityRepository,
/**
* v3: when an Agent is pinned to a `kind=virtual` Llm, inference is
* relayed via this service rather than an HTTP adapter (the virtual
* row has no upstream URL). Optional so older test wirings still
* compile; in those tests the chat path will refuse virtual Llms
* with a clear error.
*/
private readonly virtualLlms?: IVirtualLlmService,
) {}
async createThread(agentName: string, ownerId: string, title?: string): Promise<{ id: string }> {
@@ -170,14 +179,7 @@ export class ChatService {
let lastTurnIndex = ctx.startingTurnIndex;
try {
for (let i = 0; i < ctx.maxIterations; i += 1) {
-const adapter = this.adapters.get(ctx.llmType);
-const result = await adapter.infer({
-body: this.buildBody(ctx),
-modelOverride: ctx.modelOverride,
-apiKey: ctx.apiKey,
-url: ctx.url,
-extraConfig: ctx.extraConfig,
-});
+const result = await this.runOneInference(ctx);
const choice = extractChoice(result.body);
if (choice === null) {
throw new Error(`Adapter returned no choice (status ${String(result.status)})`);
@@ -240,19 +242,12 @@ export class ChatService {
const ctx = await this.prepareContext(args);
try {
for (let i = 0; i < ctx.maxIterations; i += 1) {
-const adapter = this.adapters.get(ctx.llmType);
const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = {
content: '',
toolCalls: [],
};
let finishReason: string | null = null;
-for await (const chunk of adapter.stream({
-body: { ...this.buildBody(ctx), stream: true },
-modelOverride: ctx.modelOverride,
-apiKey: ctx.apiKey,
-url: ctx.url,
-extraConfig: ctx.extraConfig,
-})) {
+for await (const chunk of this.streamInference(ctx)) {
if (chunk.done === true) break;
if (chunk.data === '[DONE]') break;
const evt = parseStreamingChunk(chunk.data);
@@ -347,12 +342,130 @@ export class ChatService {
}
}
/**
* Streaming counterpart of runOneInference. Yields raw OpenAI-style
* SSE chunks ({data: string; done?: boolean}) regardless of whether
* we're hitting a public adapter or relaying through VirtualLlmService.
* The caller's `parseStreamingChunk` already speaks OpenAI shape, so
* downstream code doesn't need to know which path produced the chunks.
*/
private async *streamInference(ctx: {
llmName: string;
llmType: string;
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
extraConfig: Record<string, unknown>;
history: OpenAiMessage[];
systemBlock: string;
toolList: ChatTool[];
mergedParams: AgentChatParams;
}): AsyncGenerator<{ data: string; done?: boolean }> {
if (ctx.llmKind !== 'virtual') {
const adapter = this.adapters.get(ctx.llmType);
yield* adapter.stream({
body: { ...this.buildBody(ctx), stream: true },
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
return;
}
if (this.virtualLlms === undefined) {
throw new Error(
'virtualLlms dispatcher not wired into ChatService — cannot stream chat with kind=virtual Llm',
);
}
// Bridge VirtualLlmService's onChunk callback API to an async
// iterator. Chunks land on the queue from the SSE relay; the
// generator drains them in order. ref.done resolves when the
// publisher emits its `[DONE]` marker.
const ref = await this.virtualLlms.enqueueInferTask(
ctx.llmName,
{ ...this.buildBody(ctx), stream: true },
true,
);
const queue: Array<{ data: string; done?: boolean }> = [];
let resolveTick: (() => void) | null = null;
const wake = (): void => {
const r = resolveTick;
resolveTick = null;
if (r !== null) r();
};
const unsubscribe = ref.onChunk((c) => { queue.push(c); wake(); });
let finished = false;
let failure: Error | null = null;
ref.done.then(() => { finished = true; wake(); }).catch((err: Error) => { failure = err; finished = true; wake(); });
try {
while (true) {
while (queue.length > 0) {
const c = queue.shift()!;
yield c;
if (c.done === true) return;
}
if (finished) {
if (failure !== null) throw failure;
return;
}
await new Promise<void>((r) => { resolveTick = r; });
}
} finally {
unsubscribe();
}
}
/**
* Run a single non-streaming inference iteration. Branches on
* ctx.llmKind: public goes through the existing adapter registry,
* virtual relays through VirtualLlmService.enqueueInferTask (mirrors
* the same branch in `routes/llm-infer.ts` from v1 Stage 3).
*
* Throws when virtualLlms isn't wired but the row is virtual — older
* test wirings hit this path.
*/
private async runOneInference(ctx: {
llmName: string;
llmType: string;
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
extraConfig: Record<string, unknown>;
history: OpenAiMessage[];
systemBlock: string;
toolList: ChatTool[];
mergedParams: AgentChatParams;
}): Promise<{ status: number; body: unknown }> {
if (ctx.llmKind === 'virtual') {
if (this.virtualLlms === undefined) {
throw new Error(
'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm',
);
}
const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false);
return ref.done;
}
const adapter = this.adapters.get(ctx.llmType);
return adapter.infer({
body: this.buildBody(ctx),
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
}
private async prepareContext(args: ChatRequestArgs): Promise<{
threadId: string;
history: OpenAiMessage[];
systemBlock: string;
llmName: string;
llmType: string;
/** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
@@ -435,6 +548,7 @@ export class ChatService {
systemBlock,
llmName: llm.name,
llmType: llm.type,
llmKind: llm.kind,
modelOverride: llm.model,
url: llm.url,
apiKey,

View File

@@ -28,6 +28,7 @@ import { randomUUID } from 'node:crypto';
import type { ILlmRepository } from '../repositories/llm.repository.js';
import type { OpenAiChatRequest } from './llm/types.js';
import { NotFoundError } from './mcp-server.service.js';
import type { AgentService } from './agent.service.js';
/** A virtual provider's announcement at registration time. */
export interface RegisterProviderInput {
@@ -119,7 +120,16 @@ export class VirtualLlmService implements IVirtualLlmService {
*/
private readonly wakeInFlight = new Map<string, Promise<void>>();
constructor(
private readonly repo: ILlmRepository,
/**
* Optional. v3 wires AgentService here so the lifecycle cascades:
* heartbeat → bump owned agents; disconnect → mark agents inactive;
* gcSweep → sweep agents first, then run the pinned-to-Llm delete cascade
* before deleting the Llm itself (Agent.llmId is Restrict).
*/
private readonly agents?: AgentService,
) {}
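The optional-dependency shape above can be modeled in isolation. The toy classes below are hypothetical, not the real services: they only show that the Llm-side service takes an optional agent-side service and silently no-ops when it isn't wired (as in older test configs that construct `VirtualLlmService` with the repo alone).

```typescript
// Toy model of the optional cascade dependency; names are illustrative.
class ToyAgents {
  readonly bumped: string[] = [];
  async heartbeatVirtualAgents(sessionId: string): Promise<void> {
    this.bumped.push(sessionId);
  }
}

class ToyVirtualLlms {
  constructor(private readonly agents?: ToyAgents) {}
  async heartbeat(sessionId: string): Promise<void> {
    // ...bump owned Llm rows here...
    // Cascade only when the agent-side service was injected.
    if (this.agents !== undefined) {
      await this.agents.heartbeatVirtualAgents(sessionId);
    }
  }
}
```

This is also why main.ts must construct AgentService before VirtualLlmService: the cascade target has to exist before it can be injected.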
async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult> {
const sessionId = input.providerSessionId ?? randomUUID();
@@ -184,7 +194,6 @@ export class VirtualLlmService implements IVirtualLlmService {
async heartbeat(providerSessionId: string): Promise<void> {
const owned = await this.repo.findBySessionId(providerSessionId);
const now = new Date();
for (const row of owned) {
// Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a
@@ -196,6 +205,11 @@ export class VirtualLlmService implements IVirtualLlmService {
: {}),
});
}
// Cascade to virtual agents owned by the same session — same heartbeat
// covers them. No-op if AgentService isn't wired (older test configs).
if (this.agents !== undefined) {
await this.agents.heartbeatVirtualAgents(providerSessionId);
}
}
bindSession(providerSessionId: string, handle: VirtualSessionHandle): void {
@@ -214,6 +228,10 @@ export class VirtualLlmService implements IVirtualLlmService {
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
}
}
// Cascade to virtual agents owned by the same session.
if (this.agents !== undefined) {
await this.agents.markVirtualAgentsInactiveBySession(providerSessionId);
}
// Reject any in-flight tasks for this session — the relay can't deliver
// a result POST anymore.
for (const t of this.tasksById.values()) {
@@ -405,6 +423,16 @@ export class VirtualLlmService implements IVirtualLlmService {
let markedInactive = 0;
let deleted = 0;
// v3: sweep virtual agents FIRST so any Llm-pinned agent that's
// about to be cascaded (because its Llm is also expiring) is gone
// before we attempt to delete the Llm. Agent.llmId is Restrict and
// would otherwise block.
if (this.agents !== undefined) {
const agentSweep = await this.agents.gcSweepVirtualAgents(now);
markedInactive += agentSweep.markedInactive;
deleted += agentSweep.deleted;
}
const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
for (const row of stale) {
@@ -415,6 +443,13 @@ export class VirtualLlmService implements IVirtualLlmService {
const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
const expired = await this.repo.findExpiredInactives(deletionCutoff);
for (const row of expired) {
// Final defensive cascade: drop any virtual agents still pinned
// to this Llm (e.g. their lastHeartbeatAt happens to lag the
// Llm's by a few seconds and they didn't make this round's
// 4-h cutoff). Without this we'd hit a Restrict FK error.
if (this.agents !== undefined) {
await this.agents.deleteVirtualAgentsForLlm(row.id);
}
await this.repo.delete(row.id);
deleted += 1;
}

View File

@@ -0,0 +1,251 @@
import { describe, it, expect, vi } from 'vitest';
import { ChatService, type ChatToolDispatcher } from '../src/services/chat.service.js';
import type { AgentService } from '../src/services/agent.service.js';
import type { LlmService } from '../src/services/llm.service.js';
import type { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
import type { IChatRepository } from '../src/repositories/chat.repository.js';
import type { IPromptRepository } from '../src/repositories/prompt.repository.js';
import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js';
import type { ChatMessage, ChatThread, Prompt } from '@prisma/client';
const NOW = new Date();
/**
* Tests targeting v3 Stage 1's chat.service kind=virtual branch.
* Mirror the existing chat-service.test.ts patterns but isolate the
* adapter-vs-relay dispatch decision.
*/
function mockChatRepo(): IChatRepository {
const msgs: ChatMessage[] = [];
const threads: ChatThread[] = [];
let idCounter = 1;
return {
createThread: vi.fn(async ({ agentId, ownerId, title }) => {
const t: ChatThread = {
id: `thread-${String(idCounter++)}`, agentId, ownerId,
title: title ?? '', lastTurnAt: NOW, createdAt: NOW, updatedAt: NOW,
};
threads.push(t);
return t;
}),
findThread: vi.fn(async (id: string) => threads.find((t) => t.id === id) ?? null),
listThreadsByAgent: vi.fn(async () => []),
listMessages: vi.fn(async () => []),
appendMessage: vi.fn(async (input) => {
const m: ChatMessage = {
id: `msg-${String(idCounter++)}`,
threadId: input.threadId,
turnIndex: input.turnIndex ?? msgs.filter((x) => x.threadId === input.threadId).length,
role: input.role,
content: input.content,
toolCalls: (input.toolCalls ?? null) as ChatMessage['toolCalls'],
toolCallId: input.toolCallId ?? null,
status: input.status ?? 'complete',
createdAt: NOW,
};
msgs.push(m);
return m;
}),
updateStatus: vi.fn(async (_id, _s) => ({ } as ChatMessage)),
markPendingAsError: vi.fn(async () => 0),
touchThread: vi.fn(async () => undefined),
nextTurnIndex: vi.fn(async () => msgs.length),
};
}
function mockAgents(): AgentService {
return {
getByName: vi.fn(async (name: string) => ({
id: `agent-${name}`, name, description: '',
systemPrompt: 'You are a helpful agent.',
llm: { id: 'llm-1', name: 'vllm-local' },
project: null,
defaultPersonality: null,
proxyModelName: null,
defaultParams: {},
extras: {},
ownerId: 'owner-1', version: 1, createdAt: NOW, updatedAt: NOW,
})),
} as unknown as AgentService;
}
function mockLlmsVirtual(): LlmService {
return {
getByName: vi.fn(async (name: string) => ({
id: 'llm-1', name, type: 'openai', model: 'fake',
url: '', tier: 'fast', description: '',
apiKeyRef: null, extraConfig: {},
kind: 'virtual',
status: 'active',
lastHeartbeatAt: NOW,
inactiveSince: null,
version: 1, createdAt: NOW, updatedAt: NOW,
})),
resolveApiKey: vi.fn(async () => ''),
} as unknown as LlmService;
}
function mockPromptRepo(): IPromptRepository {
return {
findAll: vi.fn(async () => []),
findGlobal: vi.fn(async () => []),
findByAgent: vi.fn(async () => []),
findById: vi.fn(async () => null),
findByNameAndProject: vi.fn(async () => null),
findByNameAndAgent: vi.fn(async () => null),
create: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
};
}
function mockTools(): ChatToolDispatcher {
return { listTools: vi.fn(async () => []), callTool: vi.fn(async () => ({ ok: true })) };
}
function emptyAdapterRegistry(): LlmAdapterRegistry {
return {
get: () => { throw new Error('adapter should not be used for kind=virtual'); },
} as unknown as LlmAdapterRegistry;
}
function mockVirtualLlms(opts: {
reply?: string;
rejectWith?: Error;
streamingChunks?: string[];
}): IVirtualLlmService {
const enqueueInferTask = vi.fn(async (_name: string, _body: unknown, streaming: boolean) => {
if (opts.rejectWith !== undefined) {
return {
taskId: 't-1',
done: Promise.reject(opts.rejectWith),
onChunk: () => () => undefined,
};
}
if (!streaming) {
const body = {
choices: [{ message: { content: opts.reply ?? 'hi from relay' }, finish_reason: 'stop' }],
};
return {
taskId: 't-1',
done: Promise.resolve({ status: 200, body }),
onChunk: () => () => undefined,
};
}
// Streaming path: collect subscribers, push the configured chunks
// synchronously, then resolve done.
const subs = new Set<(c: { data: string; done?: boolean }) => void>();
const chunks = opts.streamingChunks ?? ['{"choices":[{"delta":{"content":"hi from relay"}}]}'];
return {
taskId: 't-1',
done: (async (): Promise<{ status: number; body: unknown }> => {
// Wait long enough for the caller to register subscribers
// before fanning chunks. Promise.resolve() isn't enough — the
// microtask running this IIFE is queued ahead of the caller's
// await on enqueueInferTask, so subs would still be empty.
await new Promise((r) => setTimeout(r, 0));
for (const c of chunks) for (const s of subs) s({ data: c });
for (const s of subs) s({ data: '[DONE]', done: true });
return { status: 200, body: null };
})(),
onChunk: (cb): (() => void) => { subs.add(cb); return () => subs.delete(cb); },
};
});
return {
register: vi.fn(),
heartbeat: vi.fn(),
bindSession: vi.fn(),
unbindSession: vi.fn(),
enqueueInferTask: enqueueInferTask as unknown as IVirtualLlmService['enqueueInferTask'],
completeTask: vi.fn(),
pushTaskChunk: vi.fn(),
failTask: vi.fn(),
gcSweep: vi.fn(),
};
}
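The timing caveat in `mockVirtualLlms`' streaming path (why `setTimeout(0)` and not `Promise.resolve()`) can be seen in isolation. `orderingDemo` below is a hypothetical reduction: an async IIFE that only yields at microtask granularity would fan out before the caller's microtask-level subscribe step, while a macrotask yield runs after all pending microtasks.

```typescript
// Microtask vs. macrotask ordering, reduced to a two-party race.
async function orderingDemo(): Promise<string[]> {
  const order: string[] = [];
  const producer = (async () => {
    // Macrotask yield: every already-queued microtask runs first.
    await new Promise((resolve) => setTimeout(resolve, 0));
    order.push('fan-out');
  })();
  await Promise.resolve(); // caller-side microtask work (e.g. registering onChunk)
  order.push('subscribe');
  await producer;
  return order;
}
```

With `await Promise.resolve()` inside the producer instead, 'fan-out' would win the race and the subscriber set would still be empty.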
describe('ChatService — kind=virtual branch (v3 Stage 1)', () => {
it('non-streaming relays through VirtualLlmService.enqueueInferTask', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({ reply: 'hello back from local' });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
expect(result.assistant).toBe('hello back from local');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array) }),
false,
);
});
it('streaming relays through VirtualLlmService and emits the same text deltas', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({
streamingChunks: [
'{"choices":[{"delta":{"content":"hello "}}]}',
'{"choices":[{"delta":{"content":"world"}}]}',
],
});
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const deltas: string[] = [];
for await (const evt of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) {
if (evt.type === 'text') deltas.push(evt.delta);
if (evt.type === 'final') break;
}
expect(deltas.join('')).toBe('hello world');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array), stream: true }),
true,
);
});
it('non-streaming throws a clear error when virtualLlms isn\'t wired but the row is virtual', async () => {
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
// no personalities, no virtualLlms
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/virtualLlms dispatcher not wired/);
});
it('non-streaming surfaces the relay\'s rejection (e.g. publisher offline) up to the caller', async () => {
const virtual = mockVirtualLlms({ rejectWith: Object.assign(new Error('publisher offline'), { statusCode: 503 }) });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/publisher offline/);
});
});

View File

@@ -118,12 +118,16 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } |
} as unknown as AgentService;
}
function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService {
return {
getByName: vi.fn(async (name: string) => ({
id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking',
url: '', tier: 'fast', description: '',
apiKeyRef: null, extraConfig: {},
kind: opts.kind ?? 'public',
status: 'active',
lastHeartbeatAt: null,
inactiveSince: null,
version: 1, createdAt: NOW, updatedAt: NOW,
})),
resolveApiKey: vi.fn(async () => 'fake-key'),

View File

@@ -0,0 +1,376 @@
import { describe, it, expect, vi } from 'vitest';
import { AgentService, type VirtualAgentInput } from '../src/services/agent.service.js';
import { VirtualLlmService } from '../src/services/virtual-llm.service.js';
import type { IAgentRepository } from '../src/repositories/agent.repository.js';
import type { ILlmRepository } from '../src/repositories/llm.repository.js';
import type { LlmService } from '../src/services/llm.service.js';
import type { ProjectService } from '../src/services/project.service.js';
import type { Agent, Llm } from '@prisma/client';
/**
* v3 Stage 2 — virtual-agent lifecycle methods on AgentService and the
* cascade callbacks wired into VirtualLlmService.gcSweep / heartbeat /
* unbindSession. Mirrors the shape of virtual-llm-service.test.ts but
* focused on the agent-side state machine + the Llm→Agent cascade.
*/
const NOW = new Date();
function makeAgent(overrides: Partial<Agent> = {}): Agent {
return {
id: `agent-${Math.random().toString(36).slice(2, 8)}`,
name: 'fake-agent',
description: '',
systemPrompt: '',
llmId: 'llm-1',
projectId: null,
defaultPersonalityId: null,
proxyModelName: null,
defaultParams: {} as Agent['defaultParams'],
extras: {} as Agent['extras'],
kind: 'virtual',
providerSessionId: 'sess-1',
lastHeartbeatAt: NOW,
status: 'active',
inactiveSince: null,
ownerId: 'owner-1',
version: 1,
createdAt: NOW,
updatedAt: NOW,
...overrides,
};
}
function makeLlm(overrides: Partial<Llm> = {}): Llm {
return {
id: `llm-${Math.random().toString(36).slice(2, 8)}`,
name: 'vllm-local',
type: 'openai',
model: 'm',
url: '',
tier: 'fast',
description: '',
apiKeySecretId: null,
apiKeySecretKey: null,
extraConfig: {} as Llm['extraConfig'],
kind: 'virtual',
providerSessionId: 'sess-1',
lastHeartbeatAt: NOW,
status: 'active',
inactiveSince: null,
version: 1,
createdAt: NOW,
updatedAt: NOW,
...overrides,
};
}
function mockAgentRepo(initial: Agent[] = []): IAgentRepository {
const rows = new Map<string, Agent>(initial.map((r) => [r.id, r]));
let counter = rows.size;
return {
findAll: vi.fn(async () => [...rows.values()]),
findById: vi.fn(async (id: string) => rows.get(id) ?? null),
findByName: vi.fn(async (name: string) => {
for (const r of rows.values()) if (r.name === name) return r;
return null;
}),
findByProjectId: vi.fn(async () => []),
findBySessionId: vi.fn(async (sid: string) =>
[...rows.values()].filter((r) => r.providerSessionId === sid)),
findByLlmId: vi.fn(async (llmId: string) =>
[...rows.values()].filter((r) => r.llmId === llmId)),
findStaleVirtuals: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'active'
&& r.lastHeartbeatAt !== null
&& r.lastHeartbeatAt < cutoff)),
findExpiredInactives: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'inactive'
&& r.inactiveSince !== null
&& r.inactiveSince < cutoff)),
create: vi.fn(async (data) => {
counter += 1;
const row = makeAgent({
id: `agent-${String(counter)}`,
name: data.name,
description: data.description ?? '',
systemPrompt: data.systemPrompt ?? '',
llmId: data.llmId,
projectId: data.projectId ?? null,
kind: data.kind ?? 'public',
providerSessionId: data.providerSessionId ?? null,
status: data.status ?? 'active',
lastHeartbeatAt: data.lastHeartbeatAt ?? null,
inactiveSince: data.inactiveSince ?? null,
ownerId: data.ownerId,
});
rows.set(row.id, row);
return row;
}),
update: vi.fn(async (id, data) => {
const existing = rows.get(id);
if (!existing) throw new Error('not found');
const next: Agent = {
...existing,
...(data.description !== undefined ? { description: data.description } : {}),
...(data.systemPrompt !== undefined ? { systemPrompt: data.systemPrompt } : {}),
...(data.llmId !== undefined ? { llmId: data.llmId } : {}),
...(data.projectId !== undefined ? { projectId: data.projectId } : {}),
...(data.kind !== undefined ? { kind: data.kind } : {}),
...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
...(data.status !== undefined ? { status: data.status } : {}),
...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
};
rows.set(id, next);
return next;
}),
delete: vi.fn(async (id: string) => { rows.delete(id); }),
};
}
function mockLlms(): LlmService {
return {
getById: vi.fn(async (id: string) => ({ id, name: 'vllm-local', type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
getByName: vi.fn(async (name: string) => ({ id: 'llm-1', name, type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
} as unknown as LlmService;
}
function mockProjects(): ProjectService {
return {
getById: vi.fn(async (id: string) => ({ id, name: 'mcpctl-dev' })),
resolveAndGet: vi.fn(async (idOrName: string) => ({
id: idOrName === 'mcpctl-dev' ? 'proj-1' : 'proj-other',
name: idOrName,
})),
} as unknown as ProjectService;
}
describe('AgentService — virtual-agent lifecycle (v3 Stage 2)', () => {
it('registerVirtualAgents inserts new rows with kind=virtual / status=active', async () => {
const repo = mockAgentRepo();
const svc = new AgentService(repo, mockLlms(), mockProjects());
const inputs: VirtualAgentInput[] = [
{ name: 'local-coder', llmName: 'vllm-local', description: 'd', systemPrompt: 's' },
];
const out = await svc.registerVirtualAgents('sess-1', inputs, 'owner-1');
expect(out).toHaveLength(1);
expect(out[0]!.kind).toBe('virtual');
expect(out[0]!.status).toBe('active');
});
it('registerVirtualAgents reuses an existing row from the same session (sticky reconnect)', async () => {
const existing = makeAgent({ name: 'local-coder', providerSessionId: 'sess-1', status: 'inactive', inactiveSince: NOW });
const repo = mockAgentRepo([existing]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const out = await svc.registerVirtualAgents(
'sess-1',
[{ name: 'local-coder', llmName: 'vllm-local' }],
'owner-1',
);
expect(out[0]!.id).toBe(existing.id);
expect(out[0]!.status).toBe('active');
});
it('registerVirtualAgents adopts an inactive virtual from a different session', async () => {
const existing = makeAgent({
name: 'local-coder', providerSessionId: 'old-session',
status: 'inactive', inactiveSince: NOW,
});
const repo = mockAgentRepo([existing]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const out = await svc.registerVirtualAgents(
'new-session',
[{ name: 'local-coder', llmName: 'vllm-local' }],
'owner-1',
);
expect(out[0]!.id).toBe(existing.id);
expect(out[0]!.status).toBe('active');
});
it('registerVirtualAgents refuses to overwrite a public agent (409)', async () => {
const repo = mockAgentRepo([makeAgent({ name: 'reviewer', kind: 'public', providerSessionId: null })]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await expect(svc.registerVirtualAgents(
'sess-x',
[{ name: 'reviewer', llmName: 'vllm-local' }],
'owner-1',
)).rejects.toThrow(/Cannot publish over public Agent/);
});
it('registerVirtualAgents refuses if another active session owns the name', async () => {
const repo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'other', status: 'active' })]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await expect(svc.registerVirtualAgents(
'mine',
[{ name: 'local-coder', llmName: 'vllm-local' }],
'owner-1',
)).rejects.toThrow(/already active under a different session/);
});
it('heartbeatVirtualAgents bumps + revives inactive', async () => {
const past = new Date(Date.now() - 5_000);
const a = makeAgent({ name: 'a', providerSessionId: 'sess', status: 'inactive', lastHeartbeatAt: past, inactiveSince: past });
const repo = mockAgentRepo([a]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await svc.heartbeatVirtualAgents('sess');
const row = await repo.findByName('a');
expect(row?.status).toBe('active');
expect(row?.inactiveSince).toBeNull();
expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
});
it('markVirtualAgentsInactiveBySession flips owned actives to inactive', async () => {
const repo = mockAgentRepo([
makeAgent({ name: 'a', providerSessionId: 'sess' }),
makeAgent({ name: 'b', providerSessionId: 'sess' }),
makeAgent({ name: 'c', providerSessionId: 'other' }),
]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await svc.markVirtualAgentsInactiveBySession('sess');
expect((await repo.findByName('a'))?.status).toBe('inactive');
expect((await repo.findByName('b'))?.status).toBe('inactive');
expect((await repo.findByName('c'))?.status).toBe('active');
});
it('deleteVirtualAgentsForLlm deletes only virtuals pinned to that Llm', async () => {
const repo = mockAgentRepo([
makeAgent({ name: 'v-1', llmId: 'doomed', kind: 'virtual' }),
makeAgent({ name: 'v-2', llmId: 'doomed', kind: 'virtual' }),
makeAgent({ name: 'pub-1', llmId: 'doomed', kind: 'public', providerSessionId: null }),
makeAgent({ name: 'v-other', llmId: 'safe', kind: 'virtual' }),
]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const deleted = await svc.deleteVirtualAgentsForLlm('doomed');
expect(deleted).toBe(2);
expect(await repo.findByName('v-1')).toBeNull();
expect(await repo.findByName('v-2')).toBeNull();
expect(await repo.findByName('pub-1')).not.toBeNull();
expect(await repo.findByName('v-other')).not.toBeNull();
});
it('gcSweepVirtualAgents flips heartbeat-stale + deletes 4h-old inactive', async () => {
const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago, past 90s cutoff
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago, past 4h cutoff
const repo = mockAgentRepo([
makeAgent({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }),
makeAgent({ name: 'old', providerSessionId: 'b', status: 'inactive', inactiveSince: ancient }),
makeAgent({ name: 'pub', providerSessionId: null, kind: 'public' }),
]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const r = await svc.gcSweepVirtualAgents();
expect(r.markedInactive).toBe(1);
expect(r.deleted).toBe(1);
expect((await repo.findByName('stale'))?.status).toBe('inactive');
expect(await repo.findByName('old')).toBeNull();
expect(await repo.findByName('pub')).not.toBeNull();
});
});
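The register cases these tests exercise reduce to a small decision table. The sketch below is illustrative only (`registerDecision` is not the service's actual method); it captures the sticky-upsert rules: insert when the name is free, reuse a same-session row, adopt an inactive foreign row, and 409 on public rows or still-active foreign virtuals.

```typescript
// Hedged decision-table sketch of registerVirtualAgents' per-name logic.
interface ExistingRow {
  kind: 'public' | 'virtual';
  providerSessionId: string | null;
  status: 'active' | 'inactive';
}

function registerDecision(
  existing: ExistingRow | null,
  sessionId: string,
): 'insert' | 'reuse' | 'adopt' | 'conflict' {
  if (existing === null) return 'insert';                       // new name
  if (existing.kind === 'public') return 'conflict';            // 409: never overwrite a public agent
  if (existing.providerSessionId === sessionId) return 'reuse'; // sticky reconnect, same session
  if (existing.status === 'inactive') return 'adopt';           // sticky reconnect, dead foreign session
  return 'conflict';                                            // 409: another session still owns it
}
```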
describe('VirtualLlmService cascade through AgentService (v3 Stage 2)', () => {
function mockLlmRepo(initial: Llm[] = []): ILlmRepository {
const rows = new Map<string, Llm>(initial.map((r) => [r.id, r]));
let counter = rows.size;
return {
findAll: vi.fn(async () => [...rows.values()]),
findById: vi.fn(async (id: string) => rows.get(id) ?? null),
findByName: vi.fn(async (name: string) => {
for (const r of rows.values()) if (r.name === name) return r;
return null;
}),
findByTier: vi.fn(async () => []),
findBySessionId: vi.fn(async (sid: string) =>
[...rows.values()].filter((r) => r.providerSessionId === sid)),
findStaleVirtuals: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'active'
&& r.lastHeartbeatAt !== null
&& r.lastHeartbeatAt < cutoff)),
findExpiredInactives: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'inactive'
&& r.inactiveSince !== null
&& r.inactiveSince < cutoff)),
create: vi.fn(async (data) => {
counter += 1;
const row = makeLlm({ id: `llm-${String(counter)}`, name: data.name, type: data.type });
rows.set(row.id, row);
return row;
}),
update: vi.fn(async (id, data) => {
const existing = rows.get(id);
if (!existing) throw new Error('not found');
const next: Llm = { ...existing, ...data } as Llm;
rows.set(id, next);
return next;
}),
delete: vi.fn(async (id: string) => { rows.delete(id); }),
};
}
it('unbindSession cascades to mark virtual agents inactive', async () => {
const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
const agentRepo = mockAgentRepo([
makeAgent({ name: 'local-coder', providerSessionId: 'sess' }),
]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
await svc.unbindSession('sess');
expect((await agentRepo.findByName('local-coder'))?.status).toBe('inactive');
});
it('gcSweep deletes virtual agents BEFORE their pinned virtual Llm', async () => {
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
const llmRepo = mockLlmRepo([makeLlm({
id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
status: 'inactive', inactiveSince: ancient,
})]);
const agentRepo = mockAgentRepo([
makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: ancient }),
]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
const r = await svc.gcSweep();
expect(r.deleted).toBeGreaterThanOrEqual(2); // 1 agent + 1 llm
expect(await llmRepo.findByName('vllm-local')).toBeNull();
expect(await agentRepo.findByName('pinned')).toBeNull();
});
it('gcSweep defensive cascade: still drops the agent when its heartbeat lagged the Llm', async () => {
// The Llm is past the 4h cutoff. The agent is inactive but only
// 1h old — wouldn't be GC'd by gcSweepVirtualAgents on its own.
// The defensive cascade in gcSweep deletes it anyway because the
// Restrict FK would otherwise block the Llm delete.
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
const recent = new Date(Date.now() - 1 * 60 * 60 * 1000);
const llmRepo = mockLlmRepo([makeLlm({
id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
status: 'inactive', inactiveSince: ancient,
})]);
const agentRepo = mockAgentRepo([
makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: recent }),
]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
await svc.gcSweep();
expect(await llmRepo.findByName('vllm-local')).toBeNull();
expect(await agentRepo.findByName('pinned')).toBeNull();
});
it('heartbeat cascades to bump owned virtual agents', async () => {
const past = new Date(Date.now() - 10_000);
const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', lastHeartbeatAt: past })]);
const agentRepo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'sess', lastHeartbeatAt: past })]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
await svc.heartbeat('sess');
const a = await agentRepo.findByName('local-coder');
expect(a!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
});
});
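The delete-order rule the cascade tests pin down rests on how a Restrict FK behaves, which a toy in-memory model makes concrete. The class below is hypothetical (not the repositories): it only shows that deleting the parent must fail while any child still references it, so children have to go first.

```typescript
// Toy Restrict-FK model: Llm rows are parents, agents are children.
class RestrictStore {
  readonly llms = new Set<string>();
  readonly agentsByLlm = new Map<string, Set<string>>();

  deleteLlm(id: string): void {
    const pinned = this.agentsByLlm.get(id);
    if (pinned !== undefined && pinned.size > 0) {
      // Mirrors the DB-level Restrict: parent delete blocked by children.
      throw new Error(`FK Restrict: ${pinned.size} agent(s) still pinned to ${id}`);
    }
    this.llms.delete(id);
  }

  deleteAgentsForLlm(id: string): number {
    const pinned = this.agentsByLlm.get(id) ?? new Set<string>();
    const count = pinned.size;
    this.agentsByLlm.delete(id);
    return count;
  }
}
```

This is why gcSweep runs the agent sweep first and keeps a defensive `deleteVirtualAgentsForLlm` right before each Llm delete: either step alone can leave a lagging child behind.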