diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts
index 9814b74..cb65a87 100644
--- a/src/mcpd/src/main.ts
+++ b/src/mcpd/src/main.ts
@@ -435,10 +435,8 @@ async function main(): Promise {
     adapters: llmAdapters,
     log: { warn: (msg) => app.log.warn(msg) },
   });
-  // Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker
-  // is started below after `app.listen` so it doesn't fire before the
-  // server is accepting traffic.
-  const virtualLlmService = new VirtualLlmService(llmRepo);
+  // VirtualLlmService is constructed lower down (after AgentService) so
+  // it can wire the agent-cascade callbacks introduced in v3 Stage 2.
   // AgentService + ChatService get fully wired below once projectService and
   // mcpProxyService are constructed (ChatService needs them via the
   // ChatToolDispatcher bridge).
@@ -465,6 +463,10 @@ async function main(): Promise {
   const personalityRepo = new PersonalityRepository(prisma);
   const personalityService = new PersonalityService(personalityRepo, agentRepo, promptRepo);
   const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo);
+  // Virtual-provider state machine (kind=virtual Llm and Agent rows). v3 wires
+  // AgentService in for the heartbeat/disconnect/GC cascade. The 60-s GC ticker
+  // is started below, after `app.listen`, so it can't fire before the server is up.
+  const virtualLlmService = new VirtualLlmService(llmRepo, agentService);
   // ChatService needs the proxy + project repo via the ChatToolDispatcher
   // bridge.
The dispatcher's logger references `app.log`, which is not
   // constructed until further down — `chatService` itself is built right
diff --git a/src/mcpd/src/repositories/agent.repository.ts b/src/mcpd/src/repositories/agent.repository.ts
index fb5c597..9f83541 100644
--- a/src/mcpd/src/repositories/agent.repository.ts
+++ b/src/mcpd/src/repositories/agent.repository.ts
@@ -1,4 +1,4 @@
-import type { PrismaClient, Agent, Prisma } from '@prisma/client';
+import type { PrismaClient, Agent, Prisma, LlmKind, LlmStatus } from '@prisma/client';
 
 export interface CreateAgentRepoInput {
   name: string;
@@ -11,6 +11,12 @@ export interface CreateAgentRepoInput {
   defaultParams?: Record<string, unknown>;
   extras?: Record<string, unknown>;
   ownerId: string;
+  // Virtual-agent lifecycle fields (leave unset for kind=public rows).
+  kind?: LlmKind;
+  providerSessionId?: string | null;
+  status?: LlmStatus;
+  lastHeartbeatAt?: Date | null;
+  inactiveSince?: Date | null;
 }
 
 export interface UpdateAgentRepoInput {
@@ -22,6 +28,13 @@ export interface UpdateAgentRepoInput {
   proxyModelName?: string | null;
   defaultParams?: Record<string, unknown>;
   extras?: Record<string, unknown>;
+  // Virtual-agent lifecycle. Set by AgentService's virtual-agent methods
+  // (registerVirtualAgents, heartbeat, GC), which bypass the public CRUD path.
+  kind?: LlmKind;
+  providerSessionId?: string | null;
+  status?: LlmStatus;
+  lastHeartbeatAt?: Date | null;
+  inactiveSince?: Date | null;
 }
 
 export interface IAgentRepository {
@@ -32,6 +45,11 @@ export interface IAgentRepository {
   create(data: CreateAgentRepoInput): Promise<Agent>;
   update(id: string, data: UpdateAgentRepoInput): Promise<Agent>;
   delete(id: string): Promise<void>;
+  // Virtual-agent lifecycle lookups (session, pinned Llm, GC cutoffs).
+  findBySessionId(sessionId: string): Promise<Agent[]>;
+  findByLlmId(llmId: string): Promise<Agent[]>;
+  findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]>;
+  findExpiredInactives(deletionCutoff: Date): Promise<Agent[]>;
 }
 
 export class AgentRepository implements IAgentRepository {
@@ -69,6 +87,11 @@ export class AgentRepository implements IAgentRepository {
         defaultParams: (data.defaultParams ?? {}) as Prisma.InputJsonValue,
         extras: (data.extras ?? {}) as Prisma.InputJsonValue,
         ownerId: data.ownerId,
+        ...(data.kind !== undefined ? { kind: data.kind } : {}),
+        ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
+        ...(data.status !== undefined ? { status: data.status } : {}),
+        ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
+        ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
       },
     });
   }
@@ -99,6 +122,11 @@ export class AgentRepository implements IAgentRepository {
     if (data.extras !== undefined) {
       updateData.extras = data.extras as Prisma.InputJsonValue;
     }
+    if (data.kind !== undefined) updateData.kind = data.kind;
+    if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId;
+    if (data.status !== undefined) updateData.status = data.status;
+    if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt;
+    if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince;
     // Bump optimistic version on every update.
     updateData.version = { increment: 1 };
     return this.prisma.agent.update({ where: { id }, data: updateData });
@@ -107,4 +135,40 @@ export class AgentRepository implements IAgentRepository {
   async delete(id: string): Promise<void> {
     await this.prisma.agent.delete({ where: { id } });
   }
+
+  // ── Virtual-agent lifecycle queries ──
+
+  async findBySessionId(sessionId: string): Promise<Agent[]> {
+    return this.prisma.agent.findMany({
+      where: { providerSessionId: sessionId },
+      orderBy: { name: 'asc' },
+    });
+  }
+
+  async findByLlmId(llmId: string): Promise<Agent[]> {
+    return this.prisma.agent.findMany({
+      where: { llmId },
+      orderBy: { name: 'asc' },
+    });
+  }
+
+  async findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]> {
+    return this.prisma.agent.findMany({
+      where: {
+        kind: 'virtual',
+        status: 'active',
+        lastHeartbeatAt: { lt: heartbeatCutoff },
+      },
+    });
+  }
+
+  async findExpiredInactives(deletionCutoff: Date): Promise<Agent[]> {
+    return this.prisma.agent.findMany({
+      where: {
+        kind: 'virtual',
+        status: 'inactive',
+        inactiveSince: { lt: deletionCutoff },
+      },
+    });
+  }
 }
diff --git a/src/mcpd/src/services/agent.service.ts b/src/mcpd/src/services/agent.service.ts
index 0be9e5f..caadcf2 100644
--- a/src/mcpd/src/services/agent.service.ts
+++ b/src/mcpd/src/services/agent.service.ts
@@ -33,12 +33,28 @@ export interface AgentView {
   proxyModelName: string | null;
   defaultParams: AgentChatParams;
   extras: Record<string, unknown>;
+  // Virtual-agent lifecycle (defaults make public agents look like "active").
+  kind: 'public' | 'virtual';
+  status: 'active' | 'inactive' | 'hibernating';
+  lastHeartbeatAt: Date | null;
+  inactiveSince: Date | null;
   ownerId: string;
   version: number;
   createdAt: Date;
   updatedAt: Date;
 }
 
+/** Input shape mcplocal sends per virtual agent on register.
*/
+export interface VirtualAgentInput {
+  name: string;
+  llmName: string;
+  description?: string;
+  systemPrompt?: string;
+  project?: string;
+  defaultParams?: Record<string, unknown>;
+  extras?: Record<string, unknown>;
+}
+
 export class AgentService {
   constructor(
     private readonly repo: IAgentRepository,
@@ -179,10 +195,183 @@ export class AgentService {
       proxyModelName: row.proxyModelName,
       defaultParams: row.defaultParams as AgentChatParams,
       extras: row.extras as Record<string, unknown>,
+      kind: row.kind,
+      status: row.status,
+      lastHeartbeatAt: row.lastHeartbeatAt,
+      inactiveSince: row.inactiveSince,
       ownerId: row.ownerId,
       version: row.version,
       createdAt: row.createdAt,
       updatedAt: row.updatedAt,
     };
   }
+
+  // ── Virtual-agent lifecycle (v3) ──
+
+  /**
+   * Sticky upsert of virtual agents owned by a publishing mcplocal session.
+   * Mirrors VirtualLlmService.register's semantics:
+   *   - New agents → insert with kind=virtual / status=active.
+   *   - Existing virtual agents owned by the same session → update + reactivate.
+   *   - Existing virtual agents owned by a different session, but currently
+   *     inactive → adopt (sticky reconnect after a session lapse).
+   *   - Existing public agents OR foreign-active virtuals → 409 Conflict.
+   *   - Pinned LLM must already exist (publisher posts Llms first in the same
+   *     register payload).
+   * Every input is resolved and conflict-checked before the first write, so a
+   * 409 (or a failed Llm/project lookup) rejects the batch before anything is written.
+   */
+  async registerVirtualAgents(
+    sessionId: string,
+    inputs: VirtualAgentInput[],
+    ownerId: string,
+  ): Promise<AgentView[]> {
+    const now = new Date();
+    // Throws 409 unless `row` is absent or may be taken over by this session.
+    const assertTakeable = (
+      row: { kind: string; providerSessionId: string | null; status: string } | null,
+      name: string,
+    ): void => {
+      if (row === null) return;
+      if (row.kind === 'public') {
+        throw Object.assign(
+          new Error(`Cannot publish over public Agent: ${name}`),
+          { statusCode: 409 },
+        );
+      }
+      if (row.providerSessionId !== sessionId && row.status === 'active') {
+        throw Object.assign(
+          new Error(`Virtual Agent '${name}' is already active under a different session`),
+          { statusCode: 409 },
+        );
+      }
+    };
+
+    // Pass 1 — resolve every reference and reject conflicts before the first
+    // write, so a failure on a later input can't leave the batch half-applied.
+    const plans: Array<{ input: VirtualAgentInput; llmId: string; projectId: string | null }> = [];
+    for (const a of inputs) {
+      const llm = await this.llms.getByName(a.llmName);
+      const projectId = a.project !== undefined
+        ? (await this.projects.resolveAndGet(a.project)).id
+        : null;
+      assertTakeable(await this.repo.findByName(a.name), a.name);
+      plans.push({ input: a, llmId: llm.id, projectId });
+    }
+
+    // Pass 2 — apply. Each row is re-read (and re-checked) so a name repeated
+    // within `inputs` updates the row created earlier in this same batch.
+    const out: AgentView[] = [];
+    for (const { input: a, llmId, projectId } of plans) {
+      const existing = await this.repo.findByName(a.name);
+      assertTakeable(existing, a.name);
+      if (existing !== null) {
+        const updated = await this.repo.update(existing.id, {
+          ...(a.description !== undefined ? { description: a.description } : {}),
+          ...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
+          llmId,
+          projectId,
+          ...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
+          ...(a.extras !== undefined ? { extras: a.extras } : {}),
+          kind: 'virtual',
+          providerSessionId: sessionId,
+          status: 'active',
+          lastHeartbeatAt: now,
+          inactiveSince: null,
+        });
+        out.push(await this.toView(updated));
+        continue;
+      }
+      const created = await this.repo.create({
+        name: a.name,
+        ...(a.description !== undefined ? { description: a.description } : {}),
+        ...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
+        llmId,
+        projectId,
+        ...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
+        ...(a.extras !== undefined ? { extras: a.extras } : {}),
+        kind: 'virtual',
+        providerSessionId: sessionId,
+        status: 'active',
+        lastHeartbeatAt: now,
+        ownerId,
+      });
+      out.push(await this.toView(created));
+    }
+    return out;
+  }
+
+  /**
+   * Bumps lastHeartbeatAt on every virtual agent owned by the session.
+   * Revives inactive rows. Called from VirtualLlmService.heartbeat so
+   * one publisher heartbeat covers both Llms and Agents.
+   */
+  async heartbeatVirtualAgents(sessionId: string): Promise<void> {
+    const owned = await this.repo.findBySessionId(sessionId);
+    if (owned.length === 0) return;
+    const now = new Date();
+    for (const row of owned) {
+      await this.repo.update(row.id, {
+        lastHeartbeatAt: now,
+        ...(row.status === 'inactive' ? { status: 'active', inactiveSince: null } : {}),
+      });
+    }
+  }
+
+  /** Flip every active agent owned by the session to inactive, stamping inactiveSince with the current time. */
+  async markVirtualAgentsInactiveBySession(sessionId: string): Promise<void> {
+    const owned = await this.repo.findBySessionId(sessionId);
+    const now = new Date();
+    for (const row of owned) {
+      if (row.status === 'active') {
+        await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
+      }
+    }
+  }
+
+  /**
+   * Cascade-delete the virtual agents pinned to an Llm; public agents are
+   * skipped. VirtualLlmService.gcSweep calls this BEFORE deleting an expired
+   * Llm row because Agent.llmId is `onDelete: Restrict` and would otherwise
+   * block the delete. Returns the number of agents deleted.
+   */
+  async deleteVirtualAgentsForLlm(llmId: string): Promise<number> {
+    const pinned = await this.repo.findByLlmId(llmId);
+    let deleted = 0;
+    for (const row of pinned) {
+      if (row.kind !== 'virtual') continue;
+      await this.repo.delete(row.id);
+      deleted += 1;
+    }
+    return deleted;
+  }
+
+  /**
+   * GC sweep for virtual agents — same shape as VirtualLlmService.gcSweep:
+   *   1. Heartbeat-stale active virtuals → inactive (90-s cutoff).
+   *   2. 4-h-old inactive virtuals → delete.
+   * Runs at the start of VirtualLlmService.gcSweep so any agent that would
+   * have blocked an Llm delete via Restrict has already been cleared.
+   */
+  async gcSweepVirtualAgents(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> {
+    const HEARTBEAT_TIMEOUT_MS = 90_000;
+    const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000;
+    const nowMs = now.getTime();
+    const tally = { markedInactive: 0, deleted: 0 };
+
+    // Phase 1: active rows whose heartbeat predates the cutoff go inactive.
+    const silent = await this.repo.findStaleVirtuals(new Date(nowMs - HEARTBEAT_TIMEOUT_MS));
+    for (const { id } of silent) {
+      await this.repo.update(id, { status: 'inactive', inactiveSince: now });
+      tally.markedInactive += 1;
+    }
+
+    // Phase 2: rows inactive since before the retention cutoff are removed.
+    const lapsed = await this.repo.findExpiredInactives(new Date(nowMs - INACTIVE_RETENTION_MS));
+    for (const { id } of lapsed) {
+      await this.repo.delete(id);
+      tally.deleted += 1;
+    }
+    return tally;
+  }
 }
diff --git a/src/mcpd/src/services/virtual-llm.service.ts b/src/mcpd/src/services/virtual-llm.service.ts
index af12bfa..fe70e9f 100644
--- a/src/mcpd/src/services/virtual-llm.service.ts
+++ b/src/mcpd/src/services/virtual-llm.service.ts
@@ -28,6 +28,7 @@ import { randomUUID } from 'node:crypto';
 import type { ILlmRepository } from '../repositories/llm.repository.js';
 import type { OpenAiChatRequest } from './llm/types.js';
 import { NotFoundError } from './mcp-server.service.js';
+import type { AgentService } from './agent.service.js';
 
 /** A virtual provider's announcement at registration time. */
 export interface RegisterProviderInput {
@@ -119,7 +120,16 @@ export class VirtualLlmService implements IVirtualLlmService {
    */
   private readonly wakeInFlight = new Map>();
 
-  constructor(private readonly repo: ILlmRepository) {}
+  constructor(
+    private readonly repo: ILlmRepository,
+    /**
+     * Optional. v3 wires AgentService here so the lifecycle cascades:
+     * heartbeat → bump owned agents; disconnect → mark agents inactive;
+     * gcSweep → sweep agents first, then delete pinned-to-Llm cascade
+     * before deleting the Llm itself (Agent.llmId is Restrict).
+     */
+    private readonly agents?: AgentService,
+  ) {}
 
   async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise {
     const sessionId = input.providerSessionId ?? randomUUID();
@@ -184,7 +194,6 @@ export class VirtualLlmService implements IVirtualLlmService {
 
   async heartbeat(providerSessionId: string): Promise<void> {
     const owned = await this.repo.findBySessionId(providerSessionId);
-    if (owned.length === 0) return;
     const now = new Date();
     for (const row of owned) {
       // Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a
@@ -196,6 +205,11 @@ export class VirtualLlmService implements IVirtualLlmService {
           : {}),
       });
     }
+    // Cascade the heartbeat to the session's virtual agents, even when it owns
+    // no Llm rows. No-op if AgentService isn't wired (older test configs).
+    if (this.agents !== undefined) {
+      await this.agents.heartbeatVirtualAgents(providerSessionId);
+    }
   }
 
   bindSession(providerSessionId: string, handle: VirtualSessionHandle): void {
@@ -214,6 +228,10 @@ export class VirtualLlmService implements IVirtualLlmService {
         await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
       }
     }
+    // Cascade: the session's active virtual agents go inactive with its Llms.
+    if (this.agents !== undefined) {
+      await this.agents.markVirtualAgentsInactiveBySession(providerSessionId);
+    }
     // Reject any in-flight tasks for this session — the relay can't deliver
     // a result POST anymore.
     for (const t of this.tasksById.values()) {
@@ -405,6 +423,16 @@ export class VirtualLlmService implements IVirtualLlmService {
     let markedInactive = 0;
     let deleted = 0;
 
+    // v3: sweep virtual agents FIRST so any Llm-pinned agent that's
+    // about to be cascaded (because its Llm is also expiring) is gone
+    // before we attempt to delete the Llm. Agent.llmId is Restrict and
+    // would otherwise block.
+    if (this.agents !== undefined) {
+      const agentSweep = await this.agents.gcSweepVirtualAgents(now);
+      markedInactive += agentSweep.markedInactive;
+      deleted += agentSweep.deleted;
+    }
+
     const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
     const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
     for (const row of stale) {
@@ -415,6 +443,13 @@ export class VirtualLlmService implements IVirtualLlmService {
     const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
     const expired = await this.repo.findExpiredInactives(deletionCutoff);
     for (const row of expired) {
+      // Final defensive cascade: drop any virtual agents still pinned to
+      // this Llm (e.g. they went inactive a little after it and missed this
+      // round's 4-h cutoff) — otherwise the delete hits the Restrict FK.
+      // These deletions are counted just like the agent sweep's above.
+      if (this.agents !== undefined) {
+        deleted += await this.agents.deleteVirtualAgentsForLlm(row.id);
+      }
       await this.repo.delete(row.id);
       deleted += 1;
     }
diff --git a/src/mcpd/tests/virtual-agent-service.test.ts b/src/mcpd/tests/virtual-agent-service.test.ts
new file mode 100644
index 0000000..d5c8989
--- /dev/null
+++ b/src/mcpd/tests/virtual-agent-service.test.ts
@@ -0,0 +1,376 @@
+import { describe, it, expect, vi } from 'vitest';
+import { AgentService, type VirtualAgentInput } from '../src/services/agent.service.js';
+import { VirtualLlmService } from '../src/services/virtual-llm.service.js';
+import type { IAgentRepository } from '../src/repositories/agent.repository.js';
+import type { ILlmRepository } from '../src/repositories/llm.repository.js';
+import type { LlmService } from '../src/services/llm.service.js';
+import type { ProjectService } from '../src/services/project.service.js';
+import type { Agent, Llm } from '@prisma/client';
+
+/**
+ * v3 Stage 2 — virtual-agent lifecycle methods on AgentService and the
+ * cascade callbacks wired into VirtualLlmService.gcSweep / heartbeat /
+ * unbindSession.
Mirrors the shape of virtual-llm-service.test.ts but
+ * focused on the agent-side state machine + the Llm→Agent cascade.
+ */
+
+const NOW = new Date();
+
+function makeAgent(overrides: Partial<Agent> = {}): Agent {
+  return {
+    id: `agent-${Math.random().toString(36).slice(2, 8)}`,
+    name: 'fake-agent',
+    description: '',
+    systemPrompt: '',
+    llmId: 'llm-1',
+    projectId: null,
+    defaultPersonalityId: null,
+    proxyModelName: null,
+    defaultParams: {} as Agent['defaultParams'],
+    extras: {} as Agent['extras'],
+    kind: 'virtual',
+    providerSessionId: 'sess-1',
+    lastHeartbeatAt: NOW,
+    status: 'active',
+    inactiveSince: null,
+    ownerId: 'owner-1',
+    version: 1,
+    createdAt: NOW,
+    updatedAt: NOW,
+    ...overrides,
+  };
+}
+
+function makeLlm(overrides: Partial<Llm> = {}): Llm {
+  return {
+    id: `llm-${Math.random().toString(36).slice(2, 8)}`,
+    name: 'vllm-local',
+    type: 'openai',
+    model: 'm',
+    url: '',
+    tier: 'fast',
+    description: '',
+    apiKeySecretId: null,
+    apiKeySecretKey: null,
+    extraConfig: {} as Llm['extraConfig'],
+    kind: 'virtual',
+    providerSessionId: 'sess-1',
+    lastHeartbeatAt: NOW,
+    status: 'active',
+    inactiveSince: null,
+    version: 1,
+    createdAt: NOW,
+    updatedAt: NOW,
+    ...overrides,
+  };
+}
+
+function mockAgentRepo(initial: Agent[] = []): IAgentRepository {
+  const rows = new Map(initial.map((r) => [r.id, r]));
+  let counter = rows.size;
+  return {
+    findAll: vi.fn(async () => [...rows.values()]),
+    findById: vi.fn(async (id: string) => rows.get(id) ?? null),
+    findByName: vi.fn(async (name: string) => {
+      for (const r of rows.values()) if (r.name === name) return r;
+      return null;
+    }),
+    findByProjectId: vi.fn(async () => []),
+    findBySessionId: vi.fn(async (sid: string) =>
+      [...rows.values()].filter((r) => r.providerSessionId === sid)),
+    findByLlmId: vi.fn(async (llmId: string) =>
+      [...rows.values()].filter((r) => r.llmId === llmId)),
+    findStaleVirtuals: vi.fn(async (cutoff: Date) =>
+      [...rows.values()].filter((r) =>
+        r.kind === 'virtual'
+        && r.status === 'active'
+        && r.lastHeartbeatAt !== null
+        && r.lastHeartbeatAt < cutoff)),
+    findExpiredInactives: vi.fn(async (cutoff: Date) =>
+      [...rows.values()].filter((r) =>
+        r.kind === 'virtual'
+        && r.status === 'inactive'
+        && r.inactiveSince !== null
+        && r.inactiveSince < cutoff)),
+    create: vi.fn(async (data) => {
+      counter += 1;
+      const row = makeAgent({
+        id: `agent-${String(counter)}`,
+        name: data.name,
+        description: data.description ?? '',
+        systemPrompt: data.systemPrompt ?? '',
+        llmId: data.llmId,
+        projectId: data.projectId ?? null,
+        kind: data.kind ?? 'public',
+        providerSessionId: data.providerSessionId ?? null,
+        status: data.status ?? 'active',
+        lastHeartbeatAt: data.lastHeartbeatAt ?? null,
+        inactiveSince: data.inactiveSince ?? null,
+        ownerId: data.ownerId,
+      });
+      rows.set(row.id, row);
+      return row;
+    }),
+    update: vi.fn(async (id, data) => {
+      const existing = rows.get(id);
+      if (!existing) throw new Error('not found');
+      const next: Agent = {
+        ...existing,
+        ...(data.description !== undefined ? { description: data.description } : {}),
+        ...(data.systemPrompt !== undefined ? { systemPrompt: data.systemPrompt } : {}),
+        ...(data.llmId !== undefined ? { llmId: data.llmId } : {}),
+        ...(data.projectId !== undefined ? { projectId: data.projectId } : {}),
+        ...(data.kind !== undefined ? { kind: data.kind } : {}),
+        ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
+        ...(data.status !== undefined ? { status: data.status } : {}),
+        ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
+        ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
+      };
+      rows.set(id, next);
+      return next;
+    }),
+    delete: vi.fn(async (id: string) => { rows.delete(id); }),
+  };
+}
+
+function mockLlms(): LlmService {
+  return {
+    getById: vi.fn(async (id: string) => ({ id, name: 'vllm-local', type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
+    getByName: vi.fn(async (name: string) => ({ id: 'llm-1', name, type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
+  } as unknown as LlmService;
+}
+
+function mockProjects(): ProjectService {
+  return {
+    getById: vi.fn(async (id: string) => ({ id, name: 'mcpctl-dev' })),
+    resolveAndGet: vi.fn(async (idOrName: string) => ({
+      id: idOrName === 'mcpctl-dev' ? 'proj-1' : 'proj-other',
+      name: idOrName,
+    })),
+  } as unknown as ProjectService;
+}
+
+describe('AgentService — virtual-agent lifecycle (v3 Stage 2)', () => {
+  it('registerVirtualAgents inserts new rows with kind=virtual / status=active', async () => {
+    const repo = mockAgentRepo();
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    const inputs: VirtualAgentInput[] = [
+      { name: 'local-coder', llmName: 'vllm-local', description: 'd', systemPrompt: 's' },
+    ];
+    const out = await svc.registerVirtualAgents('sess-1', inputs, 'owner-1');
+    expect(out).toHaveLength(1);
+    expect(out[0]!.kind).toBe('virtual');
+    expect(out[0]!.status).toBe('active');
+  });
+
+  it('registerVirtualAgents reuses an existing row from the same session (sticky reconnect)', async () => {
+    const existing = makeAgent({ name: 'local-coder', providerSessionId: 'sess-1', status: 'inactive', inactiveSince: NOW });
+    const repo = mockAgentRepo([existing]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+
const out = await svc.registerVirtualAgents(
+      'sess-1',
+      [{ name: 'local-coder', llmName: 'vllm-local' }],
+      'owner-1',
+    );
+    expect(out[0]!.id).toBe(existing.id);
+    expect(out[0]!.status).toBe('active');
+  });
+
+  it('registerVirtualAgents adopts an inactive virtual from a different session', async () => {
+    const existing = makeAgent({
+      name: 'local-coder', providerSessionId: 'old-session',
+      status: 'inactive', inactiveSince: NOW,
+    });
+    const repo = mockAgentRepo([existing]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    const out = await svc.registerVirtualAgents(
+      'new-session',
+      [{ name: 'local-coder', llmName: 'vllm-local' }],
+      'owner-1',
+    );
+    expect(out[0]!.id).toBe(existing.id);
+    expect(out[0]!.status).toBe('active');
+  });
+
+  it('registerVirtualAgents refuses to overwrite a public agent (409)', async () => {
+    const repo = mockAgentRepo([makeAgent({ name: 'reviewer', kind: 'public', providerSessionId: null })]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    const attempt = svc.registerVirtualAgents(
+      'sess-x', [{ name: 'reviewer', llmName: 'vllm-local' }], 'owner-1',
+    );
+    await expect(attempt).rejects.toThrow(/Cannot publish over public Agent/);
+    await expect(attempt).rejects.toHaveProperty('statusCode', 409);
+  });
+
+  it('registerVirtualAgents refuses if another active session owns the name', async () => {
+    const repo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'other', status: 'active' })]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    const attempt = svc.registerVirtualAgents(
+      'mine', [{ name: 'local-coder', llmName: 'vllm-local' }], 'owner-1',
+    );
+    await expect(attempt).rejects.toThrow(/already active under a different session/);
+    await expect(attempt).rejects.toHaveProperty('statusCode', 409);
+  });
+
+  it('heartbeatVirtualAgents bumps + revives inactive', async () => {
+    const past = new Date(Date.now() - 5_000);
+    const a = makeAgent({ name: 'a', providerSessionId: 'sess', status: 'inactive', lastHeartbeatAt: past, inactiveSince: past });
+    const repo = mockAgentRepo([a]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    await svc.heartbeatVirtualAgents('sess');
+    const row = await repo.findByName('a');
+    expect(row?.status).toBe('active');
+    expect(row?.inactiveSince).toBeNull();
+    expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
+  });
+
+  it('markVirtualAgentsInactiveBySession flips owned actives to inactive', async () => {
+    const repo = mockAgentRepo([
+      makeAgent({ name: 'a', providerSessionId: 'sess' }),
+      makeAgent({ name: 'b', providerSessionId: 'sess' }),
+      makeAgent({ name: 'c', providerSessionId: 'other' }),
+    ]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    await svc.markVirtualAgentsInactiveBySession('sess');
+    expect((await repo.findByName('a'))?.status).toBe('inactive');
+    expect((await repo.findByName('b'))?.status).toBe('inactive');
+    expect((await repo.findByName('c'))?.status).toBe('active');
+  });
+
+  it('deleteVirtualAgentsForLlm deletes only virtuals pinned to that Llm', async () => {
+    const repo = mockAgentRepo([
+      makeAgent({ name: 'v-1', llmId: 'doomed', kind: 'virtual' }),
+      makeAgent({ name: 'v-2', llmId: 'doomed', kind: 'virtual' }),
+      makeAgent({ name: 'pub-1', llmId: 'doomed', kind: 'public', providerSessionId: null }),
+      makeAgent({ name: 'v-other', llmId: 'safe', kind: 'virtual' }),
+    ]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    const deleted = await svc.deleteVirtualAgentsForLlm('doomed');
+    expect(deleted).toBe(2);
+    expect(await repo.findByName('v-1')).toBeNull();
+    expect(await repo.findByName('v-2')).toBeNull();
+    expect(await repo.findByName('pub-1')).not.toBeNull();
+    expect(await repo.findByName('v-other')).not.toBeNull();
+  });
+
+  it('gcSweepVirtualAgents flips heartbeat-stale + deletes 4h-old inactive', async () => {
+    const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago, past 90s cutoff
+    const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago, past 4h cutoff
+    const repo = mockAgentRepo([
+      makeAgent({ name:
'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }),
+      makeAgent({ name: 'old', providerSessionId: 'b', status: 'inactive', inactiveSince: ancient }),
+      makeAgent({ name: 'pub', providerSessionId: null, kind: 'public' }),
+    ]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    const r = await svc.gcSweepVirtualAgents();
+    expect(r.markedInactive).toBe(1);
+    expect(r.deleted).toBe(1);
+    expect((await repo.findByName('stale'))?.status).toBe('inactive');
+    expect(await repo.findByName('old')).toBeNull();
+    expect(await repo.findByName('pub')).not.toBeNull();
+  });
+});
+
+describe('VirtualLlmService cascade through AgentService (v3 Stage 2)', () => {
+  function mockLlmRepo(initial: Llm[] = []): ILlmRepository {
+    const rows = new Map(initial.map((r) => [r.id, r]));
+    let counter = rows.size;
+    return {
+      findAll: vi.fn(async () => [...rows.values()]),
+      findById: vi.fn(async (id: string) => rows.get(id) ?? null),
+      findByName: vi.fn(async (name: string) => {
+        for (const r of rows.values()) if (r.name === name) return r;
+        return null;
+      }),
+      findByTier: vi.fn(async () => []),
+      findBySessionId: vi.fn(async (sid: string) =>
+        [...rows.values()].filter((r) => r.providerSessionId === sid)),
+      findStaleVirtuals: vi.fn(async (cutoff: Date) =>
+        [...rows.values()].filter((r) =>
+          r.kind === 'virtual'
+          && r.status === 'active'
+          && r.lastHeartbeatAt !== null
+          && r.lastHeartbeatAt < cutoff)),
+      findExpiredInactives: vi.fn(async (cutoff: Date) =>
+        [...rows.values()].filter((r) =>
+          r.kind === 'virtual'
+          && r.status === 'inactive'
+          && r.inactiveSince !== null
+          && r.inactiveSince < cutoff)),
+      create: vi.fn(async (data) => {
+        counter += 1;
+        const row = makeLlm({ id: `llm-${String(counter)}`, name: data.name, type: data.type });
+        rows.set(row.id, row);
+        return row;
+      }),
+      update: vi.fn(async (id, data) => {
+        const existing = rows.get(id);
+        if (!existing) throw new Error('not found');
+        const next: Llm = { ...existing, ...data } as Llm;
+        rows.set(id, next);
+        return next;
+      }),
+      delete: vi.fn(async (id: string) => { rows.delete(id); }),
+    };
+  }
+
+  it('unbindSession cascades to mark virtual agents inactive', async () => {
+    const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
+    const agentRepo = mockAgentRepo([
+      makeAgent({ name: 'local-coder', providerSessionId: 'sess' }),
+    ]);
+    const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
+    const svc = new VirtualLlmService(llmRepo, agents);
+    await svc.unbindSession('sess');
+    expect((await agentRepo.findByName('local-coder'))?.status).toBe('inactive');
+  });
+
+  it('gcSweep deletes virtual agents BEFORE their pinned virtual Llm', async () => {
+    const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
+    const llmRepo = mockLlmRepo([makeLlm({
+      id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
+      status: 'inactive', inactiveSince: ancient,
+    })]);
+    const agentRepo = mockAgentRepo([
+      makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: ancient }),
+    ]);
+    const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
+    const svc = new VirtualLlmService(llmRepo, agents);
+    const r = await svc.gcSweep();
+    expect(r.deleted).toBe(2); // exactly 1 agent + 1 llm — a double count must fail
+    expect(await llmRepo.findByName('vllm-local')).toBeNull();
+    expect(await agentRepo.findByName('pinned')).toBeNull();
+  });
+
+  it('gcSweep defensive cascade: still drops the agent when its heartbeat lagged the Llm', async () => {
+    // The Llm is past the 4h cutoff. The agent is inactive but only
+    // 1h old — wouldn't be GC'd by gcSweepVirtualAgents on its own.
+    // The defensive cascade in gcSweep deletes it anyway because the
+    // Restrict FK would otherwise block the Llm delete.
+    const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
+    const recent = new Date(Date.now() - 1 * 60 * 60 * 1000);
+    const llmRepo = mockLlmRepo([makeLlm({
+      id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
+      status: 'inactive', inactiveSince: ancient,
+    })]);
+    const agentRepo = mockAgentRepo([
+      makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: recent }),
+    ]);
+    const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
+    const svc = new VirtualLlmService(llmRepo, agents);
+    await svc.gcSweep();
+    expect(await llmRepo.findByName('vllm-local')).toBeNull();
+    expect(await agentRepo.findByName('pinned')).toBeNull();
+  });
+
+  it('heartbeat cascades to bump owned virtual agents', async () => {
+    const past = new Date(Date.now() - 10_000);
+    const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', lastHeartbeatAt: past })]);
+    const agentRepo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'sess', lastHeartbeatAt: past })]);
+    const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
+    const svc = new VirtualLlmService(llmRepo, agents);
+    await svc.heartbeat('sess');
+    const a = await agentRepo.findByName('local-coder');
+    expect(a!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
+  });
+});