diff --git a/docs/agents.md b/docs/agents.md
index b14688f..462dda2 100644
--- a/docs/agents.md
+++ b/docs/agents.md
@@ -204,5 +204,9 @@ mcpctl chat reviewer
 - [virtual-llms.md](./virtual-llms.md) — local LLMs (e.g. `vllm-local`)
   publishing themselves into `mcpctl get llm` so anyone can chat with them
   via `mcpctl chat-llm <name>`. Inference is relayed through the
-  publishing mcplocal — mcpd never holds the local URL or key.
+  publishing mcplocal — mcpd never holds the local URL or key. **v3**
+  extends the same publishing model to **virtual agents** declared in
+  mcplocal config — they show up in `mcpctl get agent` with
+  `KIND=virtual / STATUS=active` and become chat-able via
+  `mcpctl chat <name>` like any other agent.
 - [chat.md](./chat.md) — `mcpctl chat` flow and LiteLLM-style flags.
diff --git a/docs/virtual-llms.md b/docs/virtual-llms.md
index 0a5629c..25dc089 100644
--- a/docs/virtual-llms.md
+++ b/docs/virtual-llms.md
@@ -199,10 +199,87 @@ provider doesn't come up within `maxWaitSeconds`), every queued infer is
 rejected with a clear error and the row stays `hibernating` — the next
 request gets a fresh wake attempt.
 
+## Virtual agents (v3)
+
+Virtual agents extend the same publishing model to **agents** — named
+LLM personas with their own system prompt and sampling defaults. mcplocal
+declares them in its config alongside its providers, and the existing
+`_provider-register` endpoint atomically publishes both Llms and Agents
+in one round-trip. They show up under `mcpctl get agent` next to
+manually-created public agents and become chat-able via
+`mcpctl chat <name>` — no special command.
+
+### Declaring a virtual agent in mcplocal config
+
+```jsonc
+// ~/.mcpctl/config.json
+{
+  "llm": {
+    "providers": [
+      { "name": "vllm-local", "type": "vllm", "model": "Qwen/Qwen2.5-7B-Instruct-AWQ", "publish": true }
+    ]
+  },
+  "agents": [
+    {
+      "name": "local-coder",
+      "llm": "vllm-local",
+      "description": "Local coding assistant on the workstation GPU",
+      "systemPrompt": "You are a senior engineer. Be terse.",
+      "defaultParams": { "temperature": 0.2 }
+    }
+  ]
+}
+```
+
+`llm` references a published provider's name from the same config. Agents
+pinned to a name that isn't being published are still forwarded to mcpd —
+the server validates `llmName` and responds 404 with a clear message if
+it's genuinely missing, so you can also point an agent at a *public* Llm.
+
+### Lifecycle
+
+Same shape as virtual Llms — 30 s heartbeat from mcplocal, 90 s
+heartbeat-stale → status flips to `inactive`, 4 h inactive → row deleted
+by mcpd's GC sweep. Heartbeats cover both Llms and Agents owned by the
+session.
+
+The GC orders agent deletes **before** their pinned virtual Llm so the
+`Agent.llmId onDelete: Restrict` FK doesn't block the sweep.
+
+### Listing
+
+```sh
+$ mcpctl get agents
+NAME         KIND     STATUS  LLM             PROJECT             DESCRIPTION
+local-coder  virtual  active  vllm-local      -                   Local coding assistant on…
+reviewer     public   active  qwen3-thinking  mcpctl-development  I review what you're shipping…
+```
+
+The `KIND` and `STATUS` columns are the v3 additions. Round-tripping
+through `mcpctl get agent X -o yaml | mcpctl apply -f -` strips those
+runtime fields cleanly so a virtual agent can be re-declared as a public
+one (or vice versa) without manual editing.
+
+### Chatting
+
+```sh
+$ mcpctl chat local-coder
+> hello?
+… streams through mcpd → SSE → mcplocal's vllm-local provider …
+```
+
+Same command as for public agents.
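What made `local-coder` chat-able in the first place is a single `_provider-register` round-trip, so it is worth seeing on the wire. A publisher-side sketch in TypeScript: the host is a stand-in, provider fields beyond `name` are assumptions, and note that the config key `llm` travels as `llmName` (field names per this diff's `coerceProviderInput`/`coerceAgentInput`):

```ts
// Hypothetical client sketch; mcplocal's real register call may differ.
const res = await fetch('https://mcpd.example.internal/api/v1/llms/_provider-register', {
  method: 'POST',
  headers: { 'content-type': 'application/json' },
  body: JSON.stringify({
    providers: [
      { name: 'vllm-local', type: 'vllm', model: 'Qwen/Qwen2.5-7B-Instruct-AWQ' },
    ],
    agents: [
      {
        name: 'local-coder',
        llmName: 'vllm-local', // must resolve: same payload, or an existing public Llm
        description: 'Local coding assistant on the workstation GPU',
        systemPrompt: 'You are a senior engineer. Be terse.',
        defaultParams: { temperature: 0.2 },
      },
    ],
  }),
});
// 201 → { providerSessionId, llms[], agents[] }. The providerSessionId then
// goes into the x-mcpctl-provider-session header on the SSE stream and into
// the 30-s heartbeat POSTs, which keep both Llm and Agent rows active.
const { providerSessionId } = await res.json() as { providerSessionId: string };
```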
+Chatting works because chat.service has a `kind=virtual` branch that hands
+off to `VirtualLlmService.enqueueInferTask` when the agent's pinned Llm is
+virtual.
+
+### Cluster-wide name uniqueness
+
+`Agent.name` is unique cluster-wide. Two mcplocals trying to publish the
+same agent name collide: the second register fails with HTTP 409.
+Per-publisher namespacing is a v4+ concern — same constraint as virtual
+Llms in v1.
+
 ## Roadmap (later stages)
 
-- **v3 — Virtual agents**: mcplocal publishes its local agent configs
-  (model + system prompt + sampling defaults) into mcpd's `Agent` table.
 - **v4 — LB pool by model**: agents can target a model name instead of a
   specific Llm; mcpd picks the healthiest pool member per request.
 - **v5 — Task queue**: persisted requests for hibernating/saturated
@@ -211,18 +288,23 @@ the next request gets a fresh wake attempt.
 
 ## API surface (v1)
 
 ```
-POST /api/v1/llms/_provider-register   → returns { providerSessionId, llms[] }
+POST /api/v1/llms/_provider-register   → returns { providerSessionId, llms[], agents[] }
+                                         v3: body accepts an optional `agents[]` array
+                                         alongside `providers[]`. Atomic publish; older
+                                         clients (providers-only) keep working.
 GET  /api/v1/llms/_provider-stream     → SSE channel; requires the x-mcpctl-provider-session header
-POST /api/v1/llms/_provider-heartbeat  → { providerSessionId }
+POST /api/v1/llms/_provider-heartbeat  → { providerSessionId } — bumps both Llms and Agents
+                                         owned by the session
 POST /api/v1/llms/_provider-task/:id/result → one of: { error: "msg" }
                                                        { chunk: { data, done? } }
                                                        { status, body }
-GET    /api/v1/llms              → list (now includes kind, status, lastHeartbeatAt, inactiveSince)
+GET    /api/v1/llms              → list (includes kind, status, lastHeartbeatAt, inactiveSince)
 POST   /api/v1/llms/<name>/infer → routes through the SSE relay
 DELETE /api/v1/llms/<name>       → delete unconditionally (also runs GC's job)
+GET    /api/v1/agents            → list (v3: includes kind, status, lastHeartbeatAt, inactiveSince)
 ```
 
 RBAC piggybacks on `view/edit/create:llms` — no new resource. Publishing
diff --git a/src/cli/src/commands/chat.ts b/src/cli/src/commands/chat.ts
index edece9c..f0772f1 100644
--- a/src/cli/src/commands/chat.ts
+++ b/src/cli/src/commands/chat.ts
@@ -151,7 +151,10 @@ async function runOneShot(
   const sec = Math.max(0.05, (Date.now() - startMs) / 1000);
   const words = (res.assistant.match(/\S+/g) ?? []).length;
   process.stdout.write(`${res.assistant}\n`);
-  process.stderr.write(styleStats(`(${String(words)}w · ${(words / sec).toFixed(1)} w/s · ${sec.toFixed(1)}s)`) + ` thread:${res.threadId}\n`);
+  // `thread: ` — single space after the colon, matching the streaming
+  // path (line 160 below) so any tooling/regex that watches one form picks
+  // up the other too.
+  process.stderr.write(styleStats(`(${String(words)}w · ${(words / sec).toFixed(1)} w/s · ${sec.toFixed(1)}s)`) + ` thread: ${res.threadId}\n`);
   return;
 }
 const bar = installStatusBar();
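With both paths agreeing on `thread: `, log scrapers need only one pattern. A tolerant matcher (hypothetical helper, not part of the CLI) that still accepts the old spacing from pre-fix builds:

```ts
// Accepts both `thread:th_42` (old one-shot form) and `thread: th_42`.
const THREAD_RE = /thread:\s*(\S+)/;

function extractThreadId(statsLine: string): string | null {
  return THREAD_RE.exec(statsLine)?.[1] ?? null;
}

// extractThreadId('(12w · 3.1 w/s · 3.9s) thread: th_42') → 'th_42'
```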
diff --git a/src/cli/src/commands/get.ts b/src/cli/src/commands/get.ts
index ea78f33..f3e07ac 100644
--- a/src/cli/src/commands/get.ts
+++ b/src/cli/src/commands/get.ts
@@ -155,10 +155,17 @@ interface AgentRow {
   description: string;
   llm: { id: string; name: string };
   project: { id: string; name: string } | null;
+  // v3: lifecycle fields. Public agents have kind=public/status=active and
+  // these never change — virtuals get them set/updated by mcpd's
+  // AgentService as the publishing mcplocal heartbeats and disconnects.
+  kind?: 'public' | 'virtual';
+  status?: 'active' | 'inactive';
 }
 
 const agentColumns: Column<AgentRow>[] = [
   { header: 'NAME', key: 'name' },
+  { header: 'KIND', key: (r) => r.kind ?? 'public', width: 8 },
+  { header: 'STATUS', key: (r) => r.status ?? 'active', width: 10 },
   { header: 'LLM', key: (r) => r.llm.name, width: 24 },
   { header: 'PROJECT', key: (r) => r.project?.name ?? '-', width: 20 },
   { header: 'DESCRIPTION', key: (r) => truncate(r.description, 50) || '-', width: 50 },
@@ -408,8 +415,8 @@ function toApplyDocs(resource: string, items: unknown[]): Array<{ kind: string }
   const kind = RESOURCE_KIND[resource] ?? resource;
   return items.map((item) => {
     const cleaned = stripInternalFields(item as Record<string, unknown>);
-    // Llm-specific: the new virtual-provider lifecycle fields collide with
-    // the apply-doc `kind` envelope (the schema uses `kind: public|virtual`)
+    // Llm-specific: the virtual-provider lifecycle fields collide with the
+    // apply-doc `kind` envelope (the schema uses `kind: public|virtual`)
     // and aren't apply-able anyway — they're derived runtime state managed
     // by VirtualLlmService. Drop them so YAML round-trips stay clean.
     if (resource === 'llms') {
@@ -419,6 +426,17 @@ function toApplyDocs(resource: string, items: unknown[]): Array<{ kind: string }
       delete cleaned['kind'];
       delete cleaned['status'];
       delete cleaned['lastHeartbeatAt'];
       delete cleaned['inactiveSince'];
       delete cleaned['providerSessionId'];
     }
+    // Agent-specific: same shape as Llm — Agent gained kind/status/etc. in
+    // v3 Stage 1 (virtual agent lifecycle) and the schema-`kind` field
+    // shadows the apply-envelope `kind: agent`. Strip the same set so
+    // `get agent X -o yaml | apply -f -` round-trips without diff.
+    if (resource === 'agents') {
+      delete cleaned['kind'];
+      delete cleaned['status'];
+      delete cleaned['lastHeartbeatAt'];
+      delete cleaned['inactiveSince'];
+      delete cleaned['providerSessionId'];
+    }
     return { kind, ...cleaned };
   });
 }
diff --git a/src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql b/src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql
new file mode 100644
index 0000000..5cec191
--- /dev/null
+++ b/src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql
@@ -0,0 +1,14 @@
+-- Mirror Llm's virtual-provider lifecycle on Agent. Reuses the
+-- existing LlmKind / LlmStatus enums so we don't double-define them.
+-- Existing rows backfill with kind='public' / status='active' so
+-- nothing changes for manually-created agents.
+
+ALTER TABLE "Agent"
+  ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public',
+  ADD COLUMN "providerSessionId" TEXT,
+  ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3),
+  ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active',
+  ADD COLUMN "inactiveSince" TIMESTAMP(3);
+
+CREATE INDEX "Agent_kind_status_idx" ON "Agent"("kind", "status");
+CREATE INDEX "Agent_providerSessionId_idx" ON "Agent"("providerSessionId");
diff --git a/src/db/prisma/schema.prisma b/src/db/prisma/schema.prisma
index dfc83e5..8b64ad8 100644
--- a/src/db/prisma/schema.prisma
+++ b/src/db/prisma/schema.prisma
@@ -469,20 +469,26 @@ model BackupPending {
 // Per-call LiteLLM-style overrides stack on top of `defaultParams`.
 model Agent {
-  id           String @id @default(cuid())
-  name         String @unique
-  description  String @default("") // shown in MCP tools/list
-  systemPrompt String @default("") @db.Text // agent persona
+  id           String @id @default(cuid())
+  name         String @unique
+  description  String @default("") // shown in MCP tools/list
+  systemPrompt String @default("") @db.Text // agent persona
   llmId        String
   projectId    String?
defaultPersonalityId String? // applied at chat time when no --personality flag proxyModelName String? // optional informational override - defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ... - extras Json @default("{}") // future LoRA / tool-allowlist + defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ... + extras Json @default("{}") // future LoRA / tool-allowlist + // ── Virtual-agent lifecycle (NULL/default for kind=public, mirrors Llm) ── + kind LlmKind @default(public) + providerSessionId String? // mcplocal session that owns this row when virtual + lastHeartbeatAt DateTime? + status LlmStatus @default(active) + inactiveSince DateTime? ownerId String - version Int @default(1) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt + version Int @default(1) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt llm Llm @relation(fields: [llmId], references: [id], onDelete: Restrict) project Project? @relation(fields: [projectId], references: [id], onDelete: SetNull) @@ -497,6 +503,8 @@ model Agent { @@index([projectId]) @@index([ownerId]) @@index([defaultPersonalityId]) + @@index([kind, status]) + @@index([providerSessionId]) } // ── Personalities (named overlay bundles of prompts on top of an Agent) ── diff --git a/src/db/tests/agent-schema.test.ts b/src/db/tests/agent-schema.test.ts index 4fa3b0c..4c56dfb 100644 --- a/src/db/tests/agent-schema.test.ts +++ b/src/db/tests/agent-schema.test.ts @@ -317,6 +317,78 @@ describe('agent / chat-thread / chat-message schema', () => { expect(reloaded?.defaultPersonalityId).toBeNull(); }); + // ── v3: Agent.kind virtual + lifecycle fields ── + + it('defaults a freshly inserted Agent to kind=public, status=active', async () => { + const user = await makeUser(); + const llm = await makeLlm('llm-default-kind'); + const agent = await makeAgent({ name: 'fresh', llmId: llm.id, ownerId: user.id }); + expect(agent.kind).toBe('public'); + expect(agent.status).toBe('active'); + expect(agent.providerSessionId).toBeNull(); + expect(agent.lastHeartbeatAt).toBeNull(); + expect(agent.inactiveSince).toBeNull(); + }); + + it('persists kind=virtual + lifecycle fields together', async () => { + const user = await makeUser(); + const llm = await makeLlm('llm-pub-virtual'); + const now = new Date(); + const agent = await prisma.agent.create({ + data: { + name: 'local-coder', + llmId: llm.id, + ownerId: user.id, + kind: 'virtual', + providerSessionId: 'sess-abc', + lastHeartbeatAt: now, + status: 'active', + }, + }); + expect(agent.kind).toBe('virtual'); + expect(agent.providerSessionId).toBe('sess-abc'); + expect(agent.lastHeartbeatAt?.getTime()).toBe(now.getTime()); + }); + + it('finds virtual agents by (kind, status) cheaply (GC sweep query)', async () => { + const user = await makeUser(); + const llm = await makeLlm('llm-gc-agent'); + await prisma.agent.create({ data: { name: 'pub-1', llmId: llm.id, ownerId: user.id } }); + await prisma.agent.create({ + data: { name: 'v-active', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's1' }, + }); + await prisma.agent.create({ + data: { name: 'v-inactive', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() }, + }); + + const stale = await prisma.agent.findMany({ + where: { kind: 'virtual', status: 'inactive' }, + select: { name: true }, + }); + expect(stale.map((a) => a.name)).toEqual(['v-inactive']); + }); + + 
it('finds agents by providerSessionId (used on mcplocal disconnect cascade)', async () => {
+    const user = await makeUser();
+    const llm = await makeLlm('llm-sess-cascade');
+    await prisma.agent.create({
+      data: { name: 'a', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
+    });
+    await prisma.agent.create({
+      data: { name: 'b', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
+    });
+    await prisma.agent.create({
+      data: { name: 'c', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'other' },
+    });
+
+    const owned = await prisma.agent.findMany({
+      where: { providerSessionId: 'shared' },
+      select: { name: true },
+      orderBy: { name: 'asc' },
+    });
+    expect(owned.map((a) => a.name)).toEqual(['a', 'b']);
+  });
+
   it('binds the same prompt to multiple personalities of an agent', async () => {
     const user = await makeUser();
     const llm = await makeLlm('llm-shared-prompt');
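The main.ts hunks below are purely about construction order. Distilled into a sketch (names as wired in main.ts; the `setInterval` line is an assumption standing in for however mcpd actually starts its 60-s ticker):

```ts
// AgentService must exist before VirtualLlmService so the v3 cascade
// (heartbeat / disconnect / GC) can be wired into the constructor.
const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo);
const virtualLlmService = new VirtualLlmService(llmRepo, agentService);
// ...chatService built with virtualLlmService, routes registered...
await app.listen({ port });
// Only now start the GC ticker, so it can't fire before traffic is accepted.
setInterval(() => { void virtualLlmService.gcSweep(); }, 60_000);
```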
diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts
index 03ce6fd..e54de89 100644
--- a/src/mcpd/src/main.ts
+++ b/src/mcpd/src/main.ts
@@ -435,10 +435,8 @@ async function main(): Promise<void> {
     adapters: llmAdapters,
     log: { warn: (msg) => app.log.warn(msg) },
   });
-  // Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker
-  // is started below after `app.listen` so it doesn't fire before the
-  // server is accepting traffic.
-  const virtualLlmService = new VirtualLlmService(llmRepo);
+  // VirtualLlmService is constructed lower down (after AgentService) so
+  // it can wire the agent-cascade callbacks introduced in v3 Stage 2.
   // AgentService + ChatService get fully wired below once projectService and
   // mcpProxyService are constructed (ChatService needs them via the
   // ChatToolDispatcher bridge. The dispatcher's logger references
   // `app.log`, which is not constructed until further down — `chatService`
   // itself is built right
@@ -465,6 +463,10 @@
   const personalityRepo = new PersonalityRepository(prisma);
   const personalityService = new PersonalityService(personalityRepo, agentRepo, promptRepo);
   const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo);
+  // Virtual-provider state machine (kind=virtual rows for both Llms and
+  // Agents). v3 wires AgentService for heartbeat/disconnect/GC cascade.
+  // The 60-s GC ticker is started below after `app.listen`.
+  const virtualLlmService = new VirtualLlmService(llmRepo, agentService);
@@ -607,6 +609,7 @@
     promptRepo,
     chatToolDispatcher,
     personalityRepo,
+    virtualLlmService,
   );
   registerAgentChatRoutes(app, chatService);
   registerLlmInferRoutes(app, {
@@ -627,7 +630,7 @@
     });
   },
   });
-  registerVirtualLlmRoutes(app, virtualLlmService);
+  registerVirtualLlmRoutes(app, virtualLlmService, agentService);
   registerInstanceRoutes(app, instanceService);
   registerProjectRoutes(app, projectService);
   registerAuditLogRoutes(app, auditLogService);
diff --git a/src/mcpd/src/repositories/agent.repository.ts b/src/mcpd/src/repositories/agent.repository.ts
index fb5c597..9f83541 100644
--- a/src/mcpd/src/repositories/agent.repository.ts
+++ b/src/mcpd/src/repositories/agent.repository.ts
@@ -1,4 +1,4 @@
-import type { PrismaClient, Agent, Prisma } from '@prisma/client';
+import type { PrismaClient, Agent, Prisma, LlmKind, LlmStatus } from '@prisma/client';
 
 export interface CreateAgentRepoInput {
   name: string;
@@ -11,6 +11,12 @@ export interface CreateAgentRepoInput {
   defaultParams?: Record<string, unknown>;
   extras?: Record<string, unknown>;
   ownerId: string;
+  // Virtual-agent lifecycle (omit for kind=public).
+  kind?: LlmKind;
+  providerSessionId?: string | null;
+  status?: LlmStatus;
+  lastHeartbeatAt?: Date | null;
+  inactiveSince?: Date | null;
 }
 
 export interface UpdateAgentRepoInput {
@@ -22,6 +28,13 @@ export interface UpdateAgentRepoInput {
   proxyModelName?: string | null;
   defaultParams?: Record<string, unknown>;
   extras?: Record<string, unknown>;
+  // Virtual-agent lifecycle. AgentService is the only public writer; the
+  // VirtualAgentService methods (Stage 2) bypass the public CRUD path.
+  kind?: LlmKind;
+  providerSessionId?: string | null;
+  status?: LlmStatus;
+  lastHeartbeatAt?: Date | null;
+  inactiveSince?: Date | null;
 }
 
 export interface IAgentRepository {
@@ -32,6 +45,11 @@ export interface IAgentRepository {
   create(data: CreateAgentRepoInput): Promise<Agent>;
   update(id: string, data: UpdateAgentRepoInput): Promise<Agent>;
   delete(id: string): Promise<void>;
+  // Virtual-agent lifecycle helpers.
+  findBySessionId(sessionId: string): Promise<Agent[]>;
+  findByLlmId(llmId: string): Promise<Agent[]>;
+  findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]>;
+  findExpiredInactives(deletionCutoff: Date): Promise<Agent[]>;
 }
 
 export class AgentRepository implements IAgentRepository {
@@ -69,6 +87,11 @@ export class AgentRepository implements IAgentRepository {
         defaultParams: (data.defaultParams ?? {}) as Prisma.InputJsonValue,
         extras: (data.extras ?? {}) as Prisma.InputJsonValue,
         ownerId: data.ownerId,
+        ...(data.kind !== undefined ? { kind: data.kind } : {}),
+        ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
+        ...(data.status !== undefined ? { status: data.status } : {}),
+        ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
+        ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
      },
     });
   }
@@ -99,6 +122,11 @@ export class AgentRepository implements IAgentRepository {
     if (data.extras !== undefined) {
       updateData.extras = data.extras as Prisma.InputJsonValue;
     }
+    if (data.kind !== undefined) updateData.kind = data.kind;
+    if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId;
+    if (data.status !== undefined) updateData.status = data.status;
+    if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt;
+    if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince;
     // Bump optimistic version on every update.
     updateData.version = { increment: 1 };
     return this.prisma.agent.update({ where: { id }, data: updateData });
   }
@@ -107,4 +135,40 @@ export class AgentRepository implements IAgentRepository {
   async delete(id: string): Promise<void> {
     await this.prisma.agent.delete({ where: { id } });
   }
+
+  // ── Virtual-agent lifecycle queries ──
+
+  async findBySessionId(sessionId: string): Promise<Agent[]> {
+    return this.prisma.agent.findMany({
+      where: { providerSessionId: sessionId },
+      orderBy: { name: 'asc' },
+    });
+  }
+
+  async findByLlmId(llmId: string): Promise<Agent[]> {
+    return this.prisma.agent.findMany({
+      where: { llmId },
+      orderBy: { name: 'asc' },
+    });
+  }
+
+  async findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]> {
+    return this.prisma.agent.findMany({
+      where: {
+        kind: 'virtual',
+        status: 'active',
+        lastHeartbeatAt: { lt: heartbeatCutoff },
+      },
+    });
+  }
+
+  async findExpiredInactives(deletionCutoff: Date): Promise<Agent[]> {
+    return this.prisma.agent.findMany({
+      where: {
+        kind: 'virtual',
+        status: 'inactive',
+        inactiveSince: { lt: deletionCutoff },
+      },
+    });
+  }
 }
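Before the route changes, a sketch of how these repository helpers are driven. This is the disconnect path in isolation; the real callers are `VirtualLlmService.unbindSession` and `AgentService.markVirtualAgentsInactiveBySession`, both later in this diff:

```ts
// Publisher's SSE stream drops → every virtual agent it owned goes inactive.
async function onPublisherDisconnect(repo: IAgentRepository, sessionId: string): Promise<void> {
  const now = new Date();
  for (const row of await repo.findBySessionId(sessionId)) {
    if (row.kind === 'virtual' && row.status === 'active') {
      await repo.update(row.id, { status: 'inactive', inactiveSince: now });
    }
  }
}
```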
diff --git a/src/mcpd/src/routes/virtual-llms.ts b/src/mcpd/src/routes/virtual-llms.ts
index 78a0e67..d92bc8f 100644
--- a/src/mcpd/src/routes/virtual-llms.ts
+++ b/src/mcpd/src/routes/virtual-llms.ts
@@ -17,6 +17,7 @@
  */
 import type { FastifyInstance, FastifyReply } from 'fastify';
 import type { VirtualLlmService, VirtualSessionHandle, VirtualTaskFrame } from '../services/virtual-llm.service.js';
+import type { AgentService, VirtualAgentInput } from '../services/agent.service.js';
 
 const SSE_PING_MS = 20_000;
 const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session';
@@ -24,8 +25,15 @@
 export function registerVirtualLlmRoutes(
   app: FastifyInstance,
   service: VirtualLlmService,
+  /**
+   * Optional. v3 wires AgentService here so the register endpoint can
+   * also accept an `agents` array alongside `providers` and atomic-publish
+   * both. Absent (older test wirings): the route still works for Llm-only
+   * publishers, agents in the payload are ignored with a warning.
+   */
+  agentService?: AgentService,
 ): void {
-  app.post<{ Body: { providerSessionId?: string; providers?: unknown[] } }>(
+  app.post<{ Body: { providerSessionId?: string; providers?: unknown[]; agents?: unknown[] } }>(
     '/api/v1/llms/_provider-register',
     async (request, reply) => {
       const body = (request.body ?? {});
@@ -34,14 +42,29 @@
         reply.code(400);
         return { error: '`providers` array is required and must be non-empty' };
       }
+      const agentsInput = Array.isArray(body.agents) ? body.agents : null;
       try {
         const result = await service.register({
           providerSessionId: body.providerSessionId ?? null,
           providers: providers.map(coerceProviderInput),
         });
+        // v3: atomically publish virtual agents tied to the same session.
+        // If the caller didn't include an agents array, skip silently.
+        let agents: unknown[] = [];
+        if (agentsInput !== null && agentsInput.length > 0) {
+          if (agentService === undefined) {
+            app.log.warn('virtual-llm register received `agents` but AgentService is not wired');
+          } else {
+            agents = await agentService.registerVirtualAgents(
+              result.providerSessionId,
+              agentsInput.map(coerceAgentInput),
+              request.userId ?? 'system',
+            );
+          }
+        }
         reply.code(201);
-        return result;
+        return { ...result, agents };
       } catch (err) {
         const status = (err as { statusCode?: number }).statusCode ?? 500;
         reply.code(status);
@@ -142,6 +165,33 @@
   );
 }
 
+/** Narrow an unknown agents array element into the service's input shape (v3). */
+function coerceAgentInput(raw: unknown): VirtualAgentInput {
+  if (raw === null || typeof raw !== 'object') {
+    throw Object.assign(new Error('agent entry must be an object'), { statusCode: 400 });
+  }
+  const o = raw as Record<string, unknown>;
+  const name = o['name'];
+  const llmName = o['llmName'];
+  if (typeof name !== 'string' || typeof llmName !== 'string') {
+    throw Object.assign(
+      new Error('agent entry requires string `name` and `llmName`'),
+      { statusCode: 400 },
+    );
+  }
+  const out: VirtualAgentInput = { name, llmName };
+  if (typeof o['description'] === 'string') out.description = o['description'];
+  if (typeof o['systemPrompt'] === 'string') out.systemPrompt = o['systemPrompt'];
+  if (typeof o['project'] === 'string') out.project = o['project'];
+  if (o['defaultParams'] !== null && typeof o['defaultParams'] === 'object') {
+    out.defaultParams = o['defaultParams'] as Record<string, unknown>;
+  }
+  if (o['extras'] !== null && typeof o['extras'] === 'object') {
+    out.extras = o['extras'] as Record<string, unknown>;
+  }
+  return out;
+}
+
 /** Narrow an unknown providers array element into the service's input shape. */
 function coerceProviderInput(raw: unknown): {
   name: string;
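Spelled out, `coerceAgentInput` accepts and rejects like this (a sketch; the thrown errors carry `statusCode: 400`, which the route's catch block maps straight onto the HTTP reply):

```ts
coerceAgentInput({ name: 'local-coder', llmName: 'vllm-local' });
// → { name: 'local-coder', llmName: 'vllm-local' }, optional fields simply absent

coerceAgentInput({ name: 'local-coder', llmName: 'vllm-local', defaultParams: { temperature: 0.2 } });
// → defaultParams kept; a null or non-object defaultParams is dropped, not rejected

coerceAgentInput({ name: 'local-coder' });
// throws 400: "agent entry requires string `name` and `llmName`"

coerceAgentInput('local-coder');
// throws 400: "agent entry must be an object"
```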
diff --git a/src/mcpd/src/services/agent.service.ts b/src/mcpd/src/services/agent.service.ts
index 0be9e5f..caadcf2 100644
--- a/src/mcpd/src/services/agent.service.ts
+++ b/src/mcpd/src/services/agent.service.ts
@@ -33,12 +33,28 @@ export interface AgentView {
   proxyModelName: string | null;
   defaultParams: AgentChatParams;
   extras: Record<string, unknown>;
+  // Virtual-agent lifecycle (defaults make public agents look like "active").
+  kind: 'public' | 'virtual';
+  status: 'active' | 'inactive' | 'hibernating';
+  lastHeartbeatAt: Date | null;
+  inactiveSince: Date | null;
   ownerId: string;
   version: number;
   createdAt: Date;
   updatedAt: Date;
 }
 
+/** Input shape mcplocal sends per virtual agent on register. */
+export interface VirtualAgentInput {
+  name: string;
+  llmName: string;
+  description?: string;
+  systemPrompt?: string;
+  project?: string;
+  defaultParams?: Record<string, unknown>;
+  extras?: Record<string, unknown>;
+}
+
 export class AgentService {
   constructor(
     private readonly repo: IAgentRepository,
@@ -179,10 +195,162 @@ export class AgentService {
       proxyModelName: row.proxyModelName,
       defaultParams: row.defaultParams as AgentChatParams,
       extras: row.extras as Record<string, unknown>,
+      kind: row.kind,
+      status: row.status,
+      lastHeartbeatAt: row.lastHeartbeatAt,
+      inactiveSince: row.inactiveSince,
       ownerId: row.ownerId,
       version: row.version,
       createdAt: row.createdAt,
       updatedAt: row.updatedAt,
     };
   }
+
+  // ── Virtual-agent lifecycle (v3) ──
+
+  /**
+   * Sticky upsert of virtual agents owned by a publishing mcplocal session.
+   * Mirrors VirtualLlmService.register's semantics:
+   *   - New agents → insert with kind=virtual / status=active.
+   *   - Existing virtual agents owned by the same session → update + reactivate.
+   *   - Existing virtual agents owned by a different session, but currently
+   *     inactive → adopt (sticky reconnect after a session lapse).
+   *   - Existing public agents OR foreign-active virtuals → 409 Conflict.
+   *   - Pinned LLM must already exist (publisher posts Llms first in the same
+   *     register payload).
+   */
+  async registerVirtualAgents(
+    sessionId: string,
+    inputs: VirtualAgentInput[],
+    ownerId: string,
+  ): Promise<AgentView[]> {
+    const now = new Date();
+    const out: AgentView[] = [];
+    for (const a of inputs) {
+      const llm = await this.llms.getByName(a.llmName);
+      const projectId = a.project !== undefined
+        ? (await this.projects.resolveAndGet(a.project)).id
+        : null;
+      const existing = await this.repo.findByName(a.name);
+      if (existing !== null) {
+        if (existing.kind === 'public') {
+          throw Object.assign(
+            new Error(`Cannot publish over public Agent: ${a.name}`),
+            { statusCode: 409 },
+          );
+        }
+        if (existing.providerSessionId !== sessionId && existing.status === 'active') {
+          throw Object.assign(
+            new Error(`Virtual Agent '${a.name}' is already active under a different session`),
+            { statusCode: 409 },
+          );
+        }
+        const updated = await this.repo.update(existing.id, {
+          ...(a.description !== undefined ? { description: a.description } : {}),
+          ...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
+          llmId: llm.id,
+          projectId,
+          ...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
+          ...(a.extras !== undefined ? { extras: a.extras } : {}),
+          kind: 'virtual',
+          providerSessionId: sessionId,
+          status: 'active',
+          lastHeartbeatAt: now,
+          inactiveSince: null,
+        });
+        out.push(await this.toView(updated));
+        continue;
+      }
+      const created = await this.repo.create({
+        name: a.name,
+        ...(a.description !== undefined ? { description: a.description } : {}),
+        ...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
+        llmId: llm.id,
+        projectId,
+        ...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
+        ...(a.extras !== undefined ? { extras: a.extras } : {}),
+        kind: 'virtual',
+        providerSessionId: sessionId,
+        status: 'active',
+        lastHeartbeatAt: now,
+        ownerId,
+      });
+      out.push(await this.toView(created));
+    }
+    return out;
+  }
+
+  /**
+   * Bumps lastHeartbeatAt on every virtual agent owned by the session.
+   * Revives inactive rows. Called from VirtualLlmService.heartbeat so
+   * one publisher heartbeat covers both Llms and Agents.
+   */
+  async heartbeatVirtualAgents(sessionId: string): Promise<void> {
+    const owned = await this.repo.findBySessionId(sessionId);
+    if (owned.length === 0) return;
+    const now = new Date();
+    for (const row of owned) {
+      await this.repo.update(row.id, {
+        lastHeartbeatAt: now,
+        ...(row.status === 'inactive' ? { status: 'active', inactiveSince: null } : {}),
+      });
+    }
+  }
+
+  /** Flip every virtual agent owned by the session to inactive immediately. */
+  async markVirtualAgentsInactiveBySession(sessionId: string): Promise<void> {
+    const owned = await this.repo.findBySessionId(sessionId);
+    const now = new Date();
+    for (const row of owned) {
+      if (row.status === 'active') {
+        await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
+      }
+    }
+  }
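The register branches above, as a test-shaped walkthrough (a sketch: `svc` is an AgentService wired as in main.ts, `vllm-local` is assumed to resolve, and the assertion style follows vitest):

```ts
await svc.registerVirtualAgents('sess-A', [{ name: 'local-coder', llmName: 'vllm-local' }], 'u1');
// no row yet → inserted as kind=virtual / status=active, owned by sess-A

await svc.registerVirtualAgents('sess-A', [{ name: 'local-coder', llmName: 'vllm-local' }], 'u1');
// same session → update + reactivate (sticky re-register)

// ...sess-A lapses, the GC flips the row inactive, a new session comes up...
await svc.registerVirtualAgents('sess-B', [{ name: 'local-coder', llmName: 'vllm-local' }], 'u1');
// inactive foreign row → adopted by sess-B

await expect(
  svc.registerVirtualAgents('sess-C', [{ name: 'local-coder', llmName: 'vllm-local' }], 'u1'),
).rejects.toMatchObject({ statusCode: 409 }); // still active under sess-B
```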
+ */ + async deleteVirtualAgentsForLlm(llmId: string): Promise { + const pinned = await this.repo.findByLlmId(llmId); + let deleted = 0; + for (const row of pinned) { + if (row.kind !== 'virtual') continue; + await this.repo.delete(row.id); + deleted += 1; + } + return deleted; + } + + /** + * GC sweep for virtual agents — same shape as VirtualLlmService.gcSweep: + * 1. Heartbeat-stale active virtuals → inactive (90-s cutoff). + * 2. 4-h-old inactive virtuals → delete. + * Run BEFORE the LlmService GC sweep so any agent that would have + * blocked an Llm delete via Restrict has already been cleared. + */ + async gcSweepVirtualAgents(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> { + const HEARTBEAT_TIMEOUT_MS = 90_000; + const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000; + let markedInactive = 0; + let deleted = 0; + + const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS); + const stale = await this.repo.findStaleVirtuals(heartbeatCutoff); + for (const row of stale) { + await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); + markedInactive += 1; + } + + const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS); + const expired = await this.repo.findExpiredInactives(deletionCutoff); + for (const row of expired) { + await this.repo.delete(row.id); + deleted += 1; + } + return { markedInactive, deleted }; + } } diff --git a/src/mcpd/src/services/chat.service.ts b/src/mcpd/src/services/chat.service.ts index fc375b7..ff6aba8 100644 --- a/src/mcpd/src/services/chat.service.ts +++ b/src/mcpd/src/services/chat.service.ts @@ -31,6 +31,7 @@ import type { } from '../repositories/chat.repository.js'; import type { IPromptRepository } from '../repositories/prompt.repository.js'; import type { IPersonalityRepository } from '../repositories/personality.repository.js'; +import type { IVirtualLlmService } from './virtual-llm.service.js'; import type { OpenAiChatRequest, OpenAiMessage } from './llm/types.js'; import type { AgentChatParams } from '../validation/agent.schema.js'; import { NotFoundError } from './mcp-server.service.js'; @@ -132,6 +133,14 @@ export class ChatService { private readonly promptRepo: IPromptRepository, private readonly tools: ChatToolDispatcher, private readonly personalities?: IPersonalityRepository, + /** + * v3: when an Agent is pinned to a `kind=virtual` Llm, inference is + * relayed via this service rather than an HTTP adapter (the virtual + * row has no upstream URL). Optional so older test wirings still + * compile; in those tests the chat path will refuse virtual Llms + * with a clear error. 
+ */ + private readonly virtualLlms?: IVirtualLlmService, ) {} async createThread(agentName: string, ownerId: string, title?: string): Promise<{ id: string }> { @@ -170,19 +179,16 @@ export class ChatService { let lastTurnIndex = ctx.startingTurnIndex; try { for (let i = 0; i < ctx.maxIterations; i += 1) { - const adapter = this.adapters.get(ctx.llmType); - const result = await adapter.infer({ - body: this.buildBody(ctx), - modelOverride: ctx.modelOverride, - apiKey: ctx.apiKey, - url: ctx.url, - extraConfig: ctx.extraConfig, - }); + const result = await this.runOneInference(ctx); const choice = extractChoice(result.body); if (choice === null) { throw new Error(`Adapter returned no choice (status ${String(result.status)})`); } if (choice.tool_calls !== undefined && choice.tool_calls.length > 0) { + // Tool turns: keep `content` literal — even if empty — because the + // OpenAI tool-use protocol expects the assistant message to carry + // its tool_calls separately from any free-form text. Surfacing + // reasoning here would confuse downstream tool dispatchers. const assistantTurn = await this.chatRepo.appendMessage({ threadId: ctx.threadId, role: 'assistant', @@ -217,13 +223,17 @@ export class ChatService { await this.chatRepo.updateStatus(assistantTurn.id, 'complete'); continue; } - // Terminal text turn. + // Terminal text turn. Use pickAssistantText so thinking models that + // produced only reasoning_content still yield a usable answer (with + // a truncation marker when finish_reason indicates max_tokens + // cut-off). Empty body remains empty and bubbles up unchanged. + const assistantText = pickAssistantText(choice); const finalMsg = await this.chatRepo.appendMessage({ threadId: ctx.threadId, role: 'assistant', - content: choice.content ?? '', + content: assistantText, }); - assistantFinal = choice.content ?? ''; + assistantFinal = assistantText; lastTurnIndex = finalMsg.turnIndex; await this.chatRepo.touchThread(ctx.threadId); return { threadId: ctx.threadId, assistant: assistantFinal, turnIndex: lastTurnIndex }; @@ -240,19 +250,20 @@ export class ChatService { const ctx = await this.prepareContext(args); try { for (let i = 0; i < ctx.maxIterations; i += 1) { - const adapter = this.adapters.get(ctx.llmType); - const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = { + // `reasoning` is accumulated alongside `content` so we can fall back + // to it when the model produces no `content` (thinking models with a + // tight max_tokens, or providers that don't separate the two). + const accumulated: { + content: string; + reasoning: string; + toolCalls: Array<{ id: string; name: string; argumentsJson: string }>; + } = { content: '', + reasoning: '', toolCalls: [], }; let finishReason: string | null = null; - for await (const chunk of adapter.stream({ - body: { ...this.buildBody(ctx), stream: true }, - modelOverride: ctx.modelOverride, - apiKey: ctx.apiKey, - url: ctx.url, - extraConfig: ctx.extraConfig, - })) { + for await (const chunk of this.streamInference(ctx)) { if (chunk.done === true) break; if (chunk.data === '[DONE]') break; const evt = parseStreamingChunk(chunk.data); @@ -262,9 +273,11 @@ export class ChatService { yield { type: 'text', delta: evt.contentDelta }; } if (evt.reasoningDelta !== undefined) { - // Reasoning is not persisted to the thread (it's the model's - // scratchpad, not part of the conversation) — only streamed so - // the REPL can show progress while the model thinks. 
+          // Streamed live so the REPL can show progress while the model
+          // thinks. Also accumulated so a thinking-only response (no
+          // `content`) still produces a non-empty persisted assistant
+          // turn — see the fallback at the end of this loop iteration.
+          accumulated.reasoning += evt.reasoningDelta;
           yield { type: 'thinking', delta: evt.reasoningDelta };
         }
         if (evt.toolCallDeltas !== undefined) {
@@ -331,10 +344,27 @@
           continue;
         }
 
+        // Fall back to reasoning when the model emitted only thinking
+        // output. Mirrors pickAssistantText() in the non-streaming path —
+        // same situation (thinking model + tight max_tokens, or a provider
+        // that bundles the answer into reasoning_content).
+        const persistedContent = pickAssistantText({
+          content: accumulated.content.length > 0 ? accumulated.content : null,
+          ...(accumulated.reasoning.length > 0 ? { reasoning: accumulated.reasoning } : {}),
+          finishReason,
+        });
+        // If we synthesized text from reasoning, yield it as a final `text`
+        // delta so the client's stdout matches what the thread persists.
+        // Without this, the REPL would show only `thinking` deltas (which
+        // the CLI writes to stderr) and stdout would be empty for any
+        // thinking-only response.
+        if (accumulated.content.length === 0 && persistedContent.length > 0) {
+          yield { type: 'text', delta: persistedContent };
+        }
         const finalMsg = await this.chatRepo.appendMessage({
           threadId: ctx.threadId,
           role: 'assistant',
-          content: accumulated.content,
+          content: persistedContent,
        });
         await this.chatRepo.touchThread(ctx.threadId);
         yield { type: 'final', threadId: ctx.threadId, turnIndex: finalMsg.turnIndex };
@@ -347,12 +377,130 @@
     }
   }
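The virtual branch of `streamInference` (next hunk) adapts `enqueueInferTask`'s callback-style `onChunk` to a `for await` loop. Here is the pattern in isolation, a generic callback-to-async-iterator bridge assuming only a subscribe function that returns an unsubscribe plus a completion promise:

```ts
// Turn an onChunk-callback producer into a pull-based async generator via a
// queue plus a one-shot wake promise. Pushes can only interleave while the
// generator is suspended (at `yield` or at the `await`), and every resume
// re-scans the queue, so no wakeup is ever lost.
async function* drain<T>(
  subscribe: (cb: (item: T) => void) => () => void,
  done: Promise<unknown>,
): AsyncGenerator<T> {
  const queue: T[] = [];
  let wake: (() => void) | null = null;
  let finished = false;
  let failure: Error | null = null;
  const unsubscribe = subscribe((item) => { queue.push(item); wake?.(); wake = null; });
  done.then(() => { finished = true; wake?.(); wake = null; })
      .catch((err: Error) => { failure = err; finished = true; wake?.(); wake = null; });
  try {
    while (true) {
      while (queue.length > 0) yield queue.shift()!;
      if (finished) {
        if (failure !== null) throw failure;
        return;
      }
      await new Promise<void>((resolve) => { wake = resolve; });
    }
  } finally {
    unsubscribe(); // stop the relay feed even if the consumer bails early
  }
}
```

The inline version below additionally returns as soon as it sees a `done`-flagged chunk, so a well-behaved publisher ends the stream without waiting for `ref.done` to settle.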
+  /**
+   * Streaming counterpart of runOneInference. Yields raw OpenAI-style
+   * SSE chunks ({data: string; done?: boolean}) regardless of whether
+   * we're hitting a public adapter or relaying through VirtualLlmService.
+   * The caller's `parseStreamingChunk` already speaks OpenAI shape, so
+   * downstream code doesn't need to know which path produced the chunks.
+   */
+  private async *streamInference(ctx: {
+    llmName: string;
+    llmType: string;
+    llmKind: 'public' | 'virtual';
+    modelOverride: string;
+    url: string;
+    apiKey: string;
+    extraConfig: Record<string, unknown>;
+    history: OpenAiMessage[];
+    systemBlock: string;
+    toolList: ChatTool[];
+    mergedParams: AgentChatParams;
+  }): AsyncGenerator<{ data: string; done?: boolean }> {
+    if (ctx.llmKind !== 'virtual') {
+      const adapter = this.adapters.get(ctx.llmType);
+      yield* adapter.stream({
+        body: { ...this.buildBody(ctx), stream: true },
+        modelOverride: ctx.modelOverride,
+        apiKey: ctx.apiKey,
+        url: ctx.url,
+        extraConfig: ctx.extraConfig,
+      });
+      return;
+    }
+    if (this.virtualLlms === undefined) {
+      throw new Error(
+        'virtualLlms dispatcher not wired into ChatService — cannot stream chat with kind=virtual Llm',
+      );
+    }
+    // Bridge VirtualLlmService's onChunk callback API to an async
+    // iterator. Chunks land on the queue from the SSE relay; the
+    // generator drains them in order. ref.done resolves when the
+    // publisher emits its `[DONE]` marker.
+    const ref = await this.virtualLlms.enqueueInferTask(
+      ctx.llmName,
+      { ...this.buildBody(ctx), stream: true },
+      true,
+    );
+    const queue: Array<{ data: string; done?: boolean }> = [];
+    let resolveTick: (() => void) | null = null;
+    const wake = (): void => {
+      const r = resolveTick;
+      resolveTick = null;
+      if (r !== null) r();
+    };
+    const unsubscribe = ref.onChunk((c) => { queue.push(c); wake(); });
+    let finished = false;
+    let failure: Error | null = null;
+    ref.done.then(() => { finished = true; wake(); }).catch((err: Error) => { failure = err; finished = true; wake(); });
+
+    try {
+      while (true) {
+        while (queue.length > 0) {
+          const c = queue.shift()!;
+          yield c;
+          if (c.done === true) return;
+        }
+        if (finished) {
+          if (failure !== null) throw failure;
+          return;
+        }
+        await new Promise<void>((r) => { resolveTick = r; });
+      }
+    } finally {
+      unsubscribe();
+    }
+  }
+
+  /**
+   * Run a single non-streaming inference iteration. Branches on
+   * ctx.llmKind: public goes through the existing adapter registry,
+   * virtual relays through VirtualLlmService.enqueueInferTask (mirrors
+   * the same branch in `routes/llm-infer.ts` from v1 Stage 3).
+   *
+   * Throws when virtualLlms isn't wired but the row is virtual — older
+   * test wirings hit this path.
+   */
+  private async runOneInference(ctx: {
+    llmName: string;
+    llmType: string;
+    llmKind: 'public' | 'virtual';
+    modelOverride: string;
+    url: string;
+    apiKey: string;
+    extraConfig: Record<string, unknown>;
+    history: OpenAiMessage[];
+    systemBlock: string;
+    toolList: ChatTool[];
+    mergedParams: AgentChatParams;
+  }): Promise<{ status: number; body: unknown }> {
+    if (ctx.llmKind === 'virtual') {
+      if (this.virtualLlms === undefined) {
+        throw new Error(
+          'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm',
+        );
+      }
+      const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false);
+      return ref.done;
+    }
+    const adapter = this.adapters.get(ctx.llmType);
+    return adapter.infer({
+      body: this.buildBody(ctx),
+      modelOverride: ctx.modelOverride,
+      apiKey: ctx.apiKey,
+      url: ctx.url,
+      extraConfig: ctx.extraConfig,
+    });
+  }
+
   private async prepareContext(args: ChatRequestArgs): Promise<{
     threadId: string;
     history: OpenAiMessage[];
     systemBlock: string;
     llmName: string;
     llmType: string;
+    /** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */
+    llmKind: 'public' | 'virtual';
     modelOverride: string;
     url: string;
     apiKey: string;
@@ -435,6 +583,7 @@
       systemBlock,
       llmName: llm.name,
       llmType: llm.type,
+      llmKind: llm.kind,
       modelOverride: llm.model,
       url: llm.url,
       apiKey,
@@ -568,6 +717,17 @@
 interface ExtractedChoice {
   content: string | null;
+  /**
+   * Reasoning text emitted by thinking models (qwen3-thinking,
+   * deepseek-reasoner, OpenAI o1 family). Different providers spell the
+   * field differently; the parser accepts every shape the streaming
+   * counterpart already accepts (see `parseStreamingChunk`). When `content`
+   * is null/empty, callers fall back to this so thinking models that
+   * exhaust their token budget on reasoning still produce a usable answer.
+   */
+  reasoning?: string;
+  /** OpenAI's stop reason — `'stop' | 'length' | 'tool_calls' | 'content_filter' | ...`. */
+  finishReason?: string | null;
   tool_calls?: Array<{ id: string; type: 'function'; function: { name: string; arguments: string } }>;
 }
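Concretely, `extractChoice` (next hunk) normalizes all of the following non-streaming bodies to the same `reasoning` value. The provider labels are illustrative, not a compatibility guarantee:

```ts
// reasoning_content beside content (deepseek-reasoner-style)
const a = { choices: [{ message: { content: null, reasoning_content: 'thought…' }, finish_reason: 'stop' }] };
// bare `reasoning` spelling
const b = { choices: [{ message: { content: null, reasoning: 'thought…' }, finish_reason: 'stop' }] };
// nested under provider_specific_fields (LiteLLM-style passthrough)
const c = {
  choices: [{
    message: { content: null, provider_specific_fields: { reasoning_content: 'thought…' } },
    finish_reason: 'stop',
  }],
};

// For each: extractChoice(x)?.reasoning === 'thought…', and since content is
// null, pickAssistantText surfaces the reasoning as the assistant text.
```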
@@ -575,17 +735,52 @@
 function extractChoice(body: unknown): ExtractedChoice | null {
   if (typeof body !== 'object' || body === null) return null;
   const choices = (body as { choices?: unknown }).choices;
   if (!Array.isArray(choices) || choices.length === 0) return null;
-  const first = choices[0] as { message?: { content?: unknown; tool_calls?: unknown } } | undefined;
+  const first = choices[0] as {
+    message?: {
+      content?: unknown;
+      reasoning_content?: unknown;
+      reasoning?: unknown;
+      provider_specific_fields?: { reasoning_content?: unknown; reasoning?: unknown };
+      tool_calls?: unknown;
+    };
+    finish_reason?: unknown;
+  } | undefined;
   if (first?.message === undefined) return null;
   const content = typeof first.message.content === 'string' ? first.message.content : null;
+  const m = first.message;
+  const reasoning =
+    (typeof m.reasoning_content === 'string' && m.reasoning_content.length > 0 ? m.reasoning_content : undefined)
+    ?? (typeof m.reasoning === 'string' && m.reasoning.length > 0 ? m.reasoning : undefined)
+    ?? (typeof m.provider_specific_fields?.reasoning_content === 'string' && m.provider_specific_fields.reasoning_content.length > 0 ? m.provider_specific_fields.reasoning_content : undefined)
+    ?? (typeof m.provider_specific_fields?.reasoning === 'string' && m.provider_specific_fields.reasoning.length > 0 ? m.provider_specific_fields.reasoning : undefined);
+  const finishReason = typeof first.finish_reason === 'string' ? first.finish_reason : null;
   const toolCalls = first.message.tool_calls;
-  const out: ExtractedChoice = { content };
+  const out: ExtractedChoice = { content, finishReason };
+  if (reasoning !== undefined) out.reasoning = reasoning;
   if (Array.isArray(toolCalls)) {
     out.tool_calls = toolCalls as NonNullable<ExtractedChoice['tool_calls']>;
   }
   return out;
 }
 
+/**
+ * Pick what text to surface (and persist) as the assistant's reply.
+ * Thinking models sometimes emit only `reasoning_content` and leave
+ * `content` null — typically when `max_tokens` is too small for the
+ * thinking budget, but also when the provider configuration just doesn't
+ * separate the two. In that case the reasoning IS the answer for this
+ * request, and the caller should see it. A `length` finish_reason marker
+ * makes truncation visible so users can fix their max_tokens config.
+ */
+function pickAssistantText(choice: ExtractedChoice): string {
+  if (choice.content !== null && choice.content.length > 0) return choice.content;
+  if (choice.reasoning !== undefined && choice.reasoning.length > 0) {
+    const truncated = choice.finishReason === 'length' ? '\n\n[response truncated by max_tokens]' : '';
+    return `${choice.reasoning}${truncated}`;
+  }
+  return '';
+}
+
 function safeParseJson(s: string): unknown {
   if (s === '') return {};
   try {
diff --git a/src/mcpd/src/services/llm/adapters/openai-passthrough.ts b/src/mcpd/src/services/llm/adapters/openai-passthrough.ts
index ddad8e2..574b57f 100644
--- a/src/mcpd/src/services/llm/adapters/openai-passthrough.ts
+++ b/src/mcpd/src/services/llm/adapters/openai-passthrough.ts
@@ -123,7 +123,15 @@ export class OpenAiPassthroughAdapter implements LlmAdapter {
   }
 
   private endpointUrl(url: string): string {
-    if (url !== '') return url.replace(/\/+$/, '');
+    // Accept both conventional forms users actually paste — base host
+    // (`https://api.openai.com`) and base + version (`https://api.openai.com/v1`).
+    // Every OpenAI-compat provider documents their endpoint with the `/v1`
+    // suffix, so users naturally include it; the adapter then re-appends
+    // `/v1/chat/completions`, producing a doubled-`/v1` 404 against LiteLLM
+    // and others. Strip a trailing `/v1` (with or without a trailing slash)
+    // so both shapes resolve to the same canonical base. A more specific
+    // suffix like `/v1beta` is preserved.
+    if (url !== '') return url.replace(/\/+$/, '').replace(/\/v1$/, '');
     const def = DEFAULT_URLS[this.kind];
     if (def === undefined) {
       throw new Error(`${this.kind}: url is required (no default endpoint for this provider)`);
     }
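The normalization by example (the method is private, shown here as if callable; expected values follow directly from the two `replace` calls):

```ts
endpointUrl('https://api.openai.com');          // → 'https://api.openai.com'
endpointUrl('https://api.openai.com/v1');       // → 'https://api.openai.com'
endpointUrl('https://api.openai.com/v1/');      // → 'https://api.openai.com' (trailing slashes stripped first)
endpointUrl('http://litellm.internal:4000/v1'); // → 'http://litellm.internal:4000'
endpointUrl('https://gemini.example/v1beta');   // → 'https://gemini.example/v1beta' (only a bare /v1 is stripped)
```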
diff --git a/src/mcpd/src/services/virtual-llm.service.ts b/src/mcpd/src/services/virtual-llm.service.ts
index af12bfa..fe70e9f 100644
--- a/src/mcpd/src/services/virtual-llm.service.ts
+++ b/src/mcpd/src/services/virtual-llm.service.ts
@@ -28,6 +28,7 @@ import { randomUUID } from 'node:crypto';
 import type { ILlmRepository } from '../repositories/llm.repository.js';
 import type { OpenAiChatRequest } from './llm/types.js';
 import { NotFoundError } from './mcp-server.service.js';
+import type { AgentService } from './agent.service.js';
 
 /** A virtual provider's announcement at registration time. */
 export interface RegisterProviderInput {
@@ -119,7 +120,16 @@
    */
   private readonly wakeInFlight = new Map<string, Promise<void>>();
 
-  constructor(private readonly repo: ILlmRepository) {}
+  constructor(
+    private readonly repo: ILlmRepository,
+    /**
+     * Optional. v3 wires AgentService here so the lifecycle cascades:
+     * heartbeat → bump owned agents; disconnect → mark agents inactive;
+     * gcSweep → sweep agents first, then delete pinned-to-Llm cascade
+     * before deleting the Llm itself (Agent.llmId is Restrict).
+     */
+    private readonly agents?: AgentService,
+  ) {}
 
   async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise {
     const sessionId = input.providerSessionId ?? randomUUID();
@@ -184,7 +194,6 @@
   async heartbeat(providerSessionId: string): Promise<void> {
     const owned = await this.repo.findBySessionId(providerSessionId);
-    if (owned.length === 0) return;
     const now = new Date();
     for (const row of owned) {
       // Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a
       // missed heartbeat) revive it in the same update.
       await this.repo.update(row.id, {
         lastHeartbeatAt: now,
         ...(row.status === 'inactive'
           ? { status: 'active', inactiveSince: null }
           : {}),
       });
     }
+    // Cascade to virtual agents owned by the same session — same heartbeat
+    // covers them. No-op if AgentService isn't wired (older test configs).
+    if (this.agents !== undefined) {
+      await this.agents.heartbeatVirtualAgents(providerSessionId);
+    }
   }
 
   bindSession(providerSessionId: string, handle: VirtualSessionHandle): void {
@@ -214,6 +228,10 @@
         await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
       }
     }
+    // Cascade to virtual agents owned by the same session.
+    if (this.agents !== undefined) {
+      await this.agents.markVirtualAgentsInactiveBySession(providerSessionId);
+    }
     // Reject any in-flight tasks for this session — the relay can't deliver
     // a result POST anymore.
for (const t of this.tasksById.values()) { @@ -405,6 +423,16 @@ export class VirtualLlmService implements IVirtualLlmService { let markedInactive = 0; let deleted = 0; + // v3: sweep virtual agents FIRST so any Llm-pinned agent that's + // about to be cascaded (because its Llm is also expiring) is gone + // before we attempt to delete the Llm. Agent.llmId is Restrict and + // would otherwise block. + if (this.agents !== undefined) { + const agentSweep = await this.agents.gcSweepVirtualAgents(now); + markedInactive += agentSweep.markedInactive; + deleted += agentSweep.deleted; + } + const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS); const stale = await this.repo.findStaleVirtuals(heartbeatCutoff); for (const row of stale) { @@ -415,6 +443,13 @@ export class VirtualLlmService implements IVirtualLlmService { const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS); const expired = await this.repo.findExpiredInactives(deletionCutoff); for (const row of expired) { + // Final defensive cascade: drop any virtual agents still pinned + // to this Llm (e.g. their lastHeartbeatAt happens to lag the + // Llm's by a few seconds and they didn't make this round's + // 4-h cutoff). Without this we'd hit a Restrict FK error. + if (this.agents !== undefined) { + await this.agents.deleteVirtualAgentsForLlm(row.id); + } await this.repo.delete(row.id); deleted += 1; } diff --git a/src/mcpd/tests/chat-service-virtual-llm.test.ts b/src/mcpd/tests/chat-service-virtual-llm.test.ts new file mode 100644 index 0000000..44f44d2 --- /dev/null +++ b/src/mcpd/tests/chat-service-virtual-llm.test.ts @@ -0,0 +1,251 @@ +import { describe, it, expect, vi } from 'vitest'; +import { ChatService, type ChatToolDispatcher } from '../src/services/chat.service.js'; +import type { AgentService } from '../src/services/agent.service.js'; +import type { LlmService } from '../src/services/llm.service.js'; +import type { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js'; +import type { IChatRepository } from '../src/repositories/chat.repository.js'; +import type { IPromptRepository } from '../src/repositories/prompt.repository.js'; +import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js'; +import type { ChatMessage, ChatThread, Prompt } from '@prisma/client'; + +const NOW = new Date(); + +/** + * Tests targeting v3 Stage 1's chat.service kind=virtual branch. + * Mirror the existing chat-service.test.ts patterns but isolate the + * adapter-vs-relay dispatch decision. + */ + +function mockChatRepo(): IChatRepository { + const msgs: ChatMessage[] = []; + const threads: ChatThread[] = []; + let idCounter = 1; + return { + createThread: vi.fn(async ({ agentId, ownerId, title }) => { + const t: ChatThread = { + id: `thread-${String(idCounter++)}`, agentId, ownerId, + title: title ?? '', lastTurnAt: NOW, createdAt: NOW, updatedAt: NOW, + }; + threads.push(t); + return t; + }), + findThread: vi.fn(async (id: string) => threads.find((t) => t.id === id) ?? null), + listThreadsByAgent: vi.fn(async () => []), + listMessages: vi.fn(async () => []), + appendMessage: vi.fn(async (input) => { + const m: ChatMessage = { + id: `msg-${String(idCounter++)}`, + threadId: input.threadId, + turnIndex: input.turnIndex ?? msgs.filter((x) => x.threadId === input.threadId).length, + role: input.role, + content: input.content, + toolCalls: (input.toolCalls ?? null) as ChatMessage['toolCalls'], + toolCallId: input.toolCallId ?? null, + status: input.status ?? 
'complete', + createdAt: NOW, + }; + msgs.push(m); + return m; + }), + updateStatus: vi.fn(async (_id, _s) => ({ } as ChatMessage)), + markPendingAsError: vi.fn(async () => 0), + touchThread: vi.fn(async () => undefined), + nextTurnIndex: vi.fn(async () => msgs.length), + }; +} + +function mockAgents(): AgentService { + return { + getByName: vi.fn(async (name: string) => ({ + id: `agent-${name}`, name, description: '', + systemPrompt: 'You are a helpful agent.', + llm: { id: 'llm-1', name: 'vllm-local' }, + project: null, + defaultPersonality: null, + proxyModelName: null, + defaultParams: {}, + extras: {}, + ownerId: 'owner-1', version: 1, createdAt: NOW, updatedAt: NOW, + })), + } as unknown as AgentService; +} + +function mockLlmsVirtual(): LlmService { + return { + getByName: vi.fn(async (name: string) => ({ + id: 'llm-1', name, type: 'openai', model: 'fake', + url: '', tier: 'fast', description: '', + apiKeyRef: null, extraConfig: {}, + kind: 'virtual', + status: 'active', + lastHeartbeatAt: NOW, + inactiveSince: null, + version: 1, createdAt: NOW, updatedAt: NOW, + })), + resolveApiKey: vi.fn(async () => ''), + } as unknown as LlmService; +} + +function mockPromptRepo(): IPromptRepository { + return { + findAll: vi.fn(async () => []), + findGlobal: vi.fn(async () => []), + findByAgent: vi.fn(async () => []), + findById: vi.fn(async () => null), + findByNameAndProject: vi.fn(async () => null), + findByNameAndAgent: vi.fn(async () => null), + create: vi.fn(), + update: vi.fn(), + delete: vi.fn(), + }; +} + +function mockTools(): ChatToolDispatcher { + return { listTools: vi.fn(async () => []), callTool: vi.fn(async () => ({ ok: true })) }; +} + +function emptyAdapterRegistry(): LlmAdapterRegistry { + return { + get: () => { throw new Error('adapter should not be used for kind=virtual'); }, + } as unknown as LlmAdapterRegistry; +} + +function mockVirtualLlms(opts: { + reply?: string; + rejectWith?: Error; + streamingChunks?: string[]; +}): IVirtualLlmService { + const enqueueInferTask = vi.fn(async (_name: string, _body: unknown, streaming: boolean) => { + if (opts.rejectWith !== undefined) { + return { + taskId: 't-1', + done: Promise.reject(opts.rejectWith), + onChunk: () => () => undefined, + }; + } + if (!streaming) { + const body = { + choices: [{ message: { content: opts.reply ?? 'hi from relay' }, finish_reason: 'stop' }], + }; + return { + taskId: 't-1', + done: Promise.resolve({ status: 200, body }), + onChunk: () => () => undefined, + }; + } + // Streaming path: collect subscribers, push the configured chunks + // synchronously, then resolve done. + const subs = new Set<(c: { data: string; done?: boolean }) => void>(); + const chunks = opts.streamingChunks ?? ['{"choices":[{"delta":{"content":"hi from relay"}}]}']; + return { + taskId: 't-1', + done: (async (): Promise<{ status: number; body: unknown }> => { + // Wait long enough for the caller to register subscribers + // before fanning chunks. Promise.resolve() isn't enough — the + // microtask running this IIFE is queued ahead of the caller's + // await on enqueueInferTask, so subs would still be empty. 
+ await new Promise((r) => setTimeout(r, 0)); + for (const c of chunks) for (const s of subs) s({ data: c }); + for (const s of subs) s({ data: '[DONE]', done: true }); + return { status: 200, body: null }; + })(), + onChunk: (cb): (() => void) => { subs.add(cb); return () => subs.delete(cb); }, + }; + }); + return { + register: vi.fn(), + heartbeat: vi.fn(), + bindSession: vi.fn(), + unbindSession: vi.fn(), + enqueueInferTask: enqueueInferTask as unknown as IVirtualLlmService['enqueueInferTask'], + completeTask: vi.fn(), + pushTaskChunk: vi.fn(), + failTask: vi.fn(), + gcSweep: vi.fn(), + }; +} + +describe('ChatService — kind=virtual branch (v3 Stage 1)', () => { + it('non-streaming relays through VirtualLlmService.enqueueInferTask', async () => { + const chatRepo = mockChatRepo(); + const virtual = mockVirtualLlms({ reply: 'hello back from local' }); + const svc = new ChatService( + mockAgents(), + mockLlmsVirtual(), + emptyAdapterRegistry(), + chatRepo, + mockPromptRepo(), + mockTools(), + undefined, + virtual, + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('hello back from local'); + expect(virtual.enqueueInferTask).toHaveBeenCalledWith( + 'vllm-local', + expect.objectContaining({ messages: expect.any(Array) }), + false, + ); + }); + + it('streaming relays through VirtualLlmService and emits the same text deltas', async () => { + const chatRepo = mockChatRepo(); + const virtual = mockVirtualLlms({ + streamingChunks: [ + '{"choices":[{"delta":{"content":"hello "}}]}', + '{"choices":[{"delta":{"content":"world"}}]}', + ], + }); + const svc = new ChatService( + mockAgents(), + mockLlmsVirtual(), + emptyAdapterRegistry(), + chatRepo, + mockPromptRepo(), + mockTools(), + undefined, + virtual, + ); + const deltas: string[] = []; + for await (const evt of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) { + if (evt.type === 'text') deltas.push(evt.delta); + if (evt.type === 'final') break; + } + expect(deltas.join('')).toBe('hello world'); + expect(virtual.enqueueInferTask).toHaveBeenCalledWith( + 'vllm-local', + expect.objectContaining({ messages: expect.any(Array), stream: true }), + true, + ); + }); + + it('non-streaming throws a clear error when virtualLlms isn\'t wired but the row is virtual', async () => { + const svc = new ChatService( + mockAgents(), + mockLlmsVirtual(), + emptyAdapterRegistry(), + mockChatRepo(), + mockPromptRepo(), + mockTools(), + // no personalities, no virtualLlms + ); + await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) + .rejects.toThrow(/virtualLlms dispatcher not wired/); + }); + + it('non-streaming surfaces the relay\'s rejection (e.g. 
publisher offline) up to the caller', async () => { + const virtual = mockVirtualLlms({ rejectWith: Object.assign(new Error('publisher offline'), { statusCode: 503 }) }); + const svc = new ChatService( + mockAgents(), + mockLlmsVirtual(), + emptyAdapterRegistry(), + mockChatRepo(), + mockPromptRepo(), + mockTools(), + undefined, + virtual, + ); + await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) + .rejects.toThrow(/publisher offline/); + }); +}); diff --git a/src/mcpd/tests/chat-service.test.ts b/src/mcpd/tests/chat-service.test.ts index fe62c3c..11bdb07 100644 --- a/src/mcpd/tests/chat-service.test.ts +++ b/src/mcpd/tests/chat-service.test.ts @@ -118,12 +118,16 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } | } as unknown as AgentService; } -function mockLlms(): LlmService { +function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService { return { getByName: vi.fn(async (name: string) => ({ id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking', url: '', tier: 'fast', description: '', apiKeyRef: null, extraConfig: {}, + kind: opts.kind ?? 'public', + status: 'active', + lastHeartbeatAt: null, + inactiveSince: null, version: 1, createdAt: NOW, updatedAt: NOW, })), resolveApiKey: vi.fn(async () => 'fake-key'), @@ -457,6 +461,121 @@ describe('ChatService', () => { expect(assistantTurn?.content).not.toContain('Let me think'); }); + // Regression: thinking models with a tight max_tokens budget produce + // `reasoning_content` only and leave `content` null. Without falling back + // to reasoning, the assistant turn was empty and the smoke test saw an + // empty stdout. This covers BOTH chat() (non-streaming) and chatStream() + // (synthetic final text frame so the CLI's stdout matches what's + // persisted to the thread). 
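+  // A minimal sketch of the fallback these tests pin down (hypothetical
+  // shape; the real ChatService internals may differ):
+  //
+  //   const msg = body.choices[0]?.message;
+  //   let text = msg?.content ?? '';
+  //   if (text === '' && typeof msg?.reasoning_content === 'string') {
+  //     text = msg.reasoning_content; // fall back to reasoning
+  //   }
+  //   if (body.choices[0]?.finish_reason === 'length') {
+  //     text += ' [response truncated by max_tokens]';
+  //   }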
+ it('chat falls back to reasoning_content when content is null', async () => { + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'thinking-truncated', + infer: vi.fn(async () => ({ + status: 200, + body: { + id: 'cmpl-1', + object: 'chat.completion', + choices: [{ + index: 0, + message: { role: 'assistant', content: null, reasoning_content: 'Thinking out loud about the answer' }, + finish_reason: 'stop', + }], + }, + })), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), mockLlms(), adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('Thinking out loud about the answer'); + const stored = chatRepo._msgs.find((m) => m.role === 'assistant'); + expect(stored?.content).toBe('Thinking out loud about the answer'); + }); + + it('chat appends [response truncated by max_tokens] when finish_reason is "length"', async () => { + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'thinking-clipped', + infer: vi.fn(async () => ({ + status: 200, + body: { + choices: [{ + index: 0, + message: { role: 'assistant', content: null, reasoning_content: 'partial reasoning that ran out of' }, + finish_reason: 'length', + }], + }, + })), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), mockLlms(), adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toContain('partial reasoning that ran out of'); + expect(result.assistant).toContain('[response truncated by max_tokens]'); + }); + + it('chat prefers content when both content and reasoning_content are present', async () => { + // Thinking models that DO produce content shouldn't see the reasoning + // bleed into the response — that's what the streaming path's + // text/thinking split is for, and the non-streaming path should match. + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'thinking-with-content', + infer: vi.fn(async () => ({ + status: 200, + body: { + choices: [{ + index: 0, + message: { role: 'assistant', content: 'real answer', reasoning_content: 'background thinking' }, + finish_reason: 'stop', + }], + }, + })), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), mockLlms(), adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('real answer'); + expect(result.assistant).not.toContain('background thinking'); + }); + + it('chatStream emits a synthetic text frame and persists reasoning when content is empty', async () => { + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'thinking-only-stream', + infer: vi.fn(), + stream: async function*() { + yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'thinking ' }, finish_reason: null }] }) }; + yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'more.' 
}, finish_reason: 'stop' }] }) }; + yield { data: '[DONE]', done: true }; + }, + }; + const svc = new ChatService( + mockAgents(), mockLlms(), adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const chunks: Array<{ type: string; delta?: string }> = []; + for await (const c of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) { + chunks.push({ type: c.type, delta: c.delta }); + } + // 2 thinking deltas (live), 1 synthesized text frame, 1 final. + expect(chunks.filter((c) => c.type === 'thinking').map((c) => c.delta)).toEqual(['thinking ', 'more.']); + expect(chunks.filter((c) => c.type === 'text').map((c) => c.delta)).toEqual(['thinking more.']); + // The thread message captures the synthesized text so resumed chats see + // a coherent assistant turn (rather than blank). + const stored = chatRepo._msgs.find((m) => m.role === 'assistant'); + expect(stored?.content).toBe('thinking more.'); + }); + // Regression: provider_specific_fields.reasoning_content shape (LiteLLM // passthrough from vLLM) is also recognized. it('chatStream recognizes LiteLLM provider_specific_fields.reasoning_content', async () => { diff --git a/src/mcpd/tests/llm-adapters.test.ts b/src/mcpd/tests/llm-adapters.test.ts index 045ac88..5b16f18 100644 --- a/src/mcpd/tests/llm-adapters.test.ts +++ b/src/mcpd/tests/llm-adapters.test.ts @@ -71,6 +71,36 @@ describe('OpenAiPassthroughAdapter', () => { await expect(adapter.infer(makeCtx())).rejects.toThrow(/no default endpoint/); }); + it('infer: strips a trailing /v1 from the configured URL', async () => { + // Users naturally paste the OpenAI-style base URL with /v1 because + // every provider documents it that way (https://api.openai.com/v1, + // https://llm.example.com/v1). The adapter then re-appends + // /v1/chat/completions; without normalization this would produce a + // doubled-/v1 404 against LiteLLM and friends. + const fetchFn = mockFetch([{ match: /\/v1\/chat\/completions$/, status: 200, body: {} }]); + const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch }); + await adapter.infer(makeCtx({ url: 'https://llm.example.com/v1' })); + const [url1] = fetchFn.mock.calls[0] as [string]; + expect(url1).toBe('https://llm.example.com/v1/chat/completions'); + + // Trailing slash + /v1 should also normalize correctly. + const fetchFn2 = mockFetch([{ match: /\/v1\/chat\/completions$/, status: 200, body: {} }]); + const adapter2 = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn2 as unknown as typeof fetch }); + await adapter2.infer(makeCtx({ url: 'https://llm.example.com/v1/' })); + const [url2] = fetchFn2.mock.calls[0] as [string]; + expect(url2).toBe('https://llm.example.com/v1/chat/completions'); + }); + + it('infer: preserves a trailing /v1beta suffix (only exact /v1 is stripped)', async () => { + // Some providers expose `/v1beta` as a parallel API surface — don't + // accidentally rewrite that to `/v1` or strip it. 
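+    // One normalization consistent with this test and the /v1 test above
+    // (a sketch; the adapter's actual code may differ): strip only an
+    // exact trailing `/v1`, with or without a trailing slash, before
+    // appending the endpoint path:
+    //   const base = url.replace(/\/v1\/?$/, '');
+    //   const endpoint = `${base}/v1/chat/completions`;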
+    const fetchFn = mockFetch([{ match: /\/v1beta\/v1\/chat\/completions$/, status: 200, body: {} }]);
+    const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
+    await adapter.infer(makeCtx({ url: 'https://api.example.com/v1beta' }));
+    const [url] = fetchFn.mock.calls[0] as [string];
+    expect(url).toBe('https://api.example.com/v1beta/v1/chat/completions');
+  });
+
   it('infer: omits Authorization when apiKey is empty', async () => {
     const fetchFn = mockFetch([{ match: /ollama/, status: 200, body: {} }]);
     const adapter = new OpenAiPassthroughAdapter('ollama', { fetch: fetchFn as unknown as typeof fetch });
diff --git a/src/mcpd/tests/virtual-agent-service.test.ts b/src/mcpd/tests/virtual-agent-service.test.ts
new file mode 100644
index 0000000..d5c8989
--- /dev/null
+++ b/src/mcpd/tests/virtual-agent-service.test.ts
@@ -0,0 +1,376 @@
+import { describe, it, expect, vi } from 'vitest';
+import { AgentService, type VirtualAgentInput } from '../src/services/agent.service.js';
+import { VirtualLlmService } from '../src/services/virtual-llm.service.js';
+import type { IAgentRepository } from '../src/repositories/agent.repository.js';
+import type { ILlmRepository } from '../src/repositories/llm.repository.js';
+import type { LlmService } from '../src/services/llm.service.js';
+import type { ProjectService } from '../src/services/project.service.js';
+import type { Agent, Llm } from '@prisma/client';
+
+/**
+ * v3 Stage 2 — virtual-agent lifecycle methods on AgentService and the
+ * cascade callbacks wired into VirtualLlmService.gcSweep / heartbeat /
+ * unbindSession. Mirrors the shape of virtual-llm-service.test.ts but
+ * focused on the agent-side state machine + the Llm→Agent cascade.
+ */
+
+const NOW = new Date();
+
+function makeAgent(overrides: Partial<Agent> = {}): Agent {
+  return {
+    id: `agent-${Math.random().toString(36).slice(2, 8)}`,
+    name: 'fake-agent',
+    description: '',
+    systemPrompt: '',
+    llmId: 'llm-1',
+    projectId: null,
+    defaultPersonalityId: null,
+    proxyModelName: null,
+    defaultParams: {} as Agent['defaultParams'],
+    extras: {} as Agent['extras'],
+    kind: 'virtual',
+    providerSessionId: 'sess-1',
+    lastHeartbeatAt: NOW,
+    status: 'active',
+    inactiveSince: null,
+    ownerId: 'owner-1',
+    version: 1,
+    createdAt: NOW,
+    updatedAt: NOW,
+    ...overrides,
+  };
+}
+
+function makeLlm(overrides: Partial<Llm> = {}): Llm {
+  return {
+    id: `llm-${Math.random().toString(36).slice(2, 8)}`,
+    name: 'vllm-local',
+    type: 'openai',
+    model: 'm',
+    url: '',
+    tier: 'fast',
+    description: '',
+    apiKeySecretId: null,
+    apiKeySecretKey: null,
+    extraConfig: {} as Llm['extraConfig'],
+    kind: 'virtual',
+    providerSessionId: 'sess-1',
+    lastHeartbeatAt: NOW,
+    status: 'active',
+    inactiveSince: null,
+    version: 1,
+    createdAt: NOW,
+    updatedAt: NOW,
+    ...overrides,
+  };
+}
+
+function mockAgentRepo(initial: Agent[] = []): IAgentRepository {
+  const rows = new Map<string, Agent>(initial.map((r) => [r.id, r]));
+  let counter = rows.size;
+  return {
+    findAll: vi.fn(async () => [...rows.values()]),
+    findById: vi.fn(async (id: string) => rows.get(id) ??
null), + findByName: vi.fn(async (name: string) => { + for (const r of rows.values()) if (r.name === name) return r; + return null; + }), + findByProjectId: vi.fn(async () => []), + findBySessionId: vi.fn(async (sid: string) => + [...rows.values()].filter((r) => r.providerSessionId === sid)), + findByLlmId: vi.fn(async (llmId: string) => + [...rows.values()].filter((r) => r.llmId === llmId)), + findStaleVirtuals: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((r) => + r.kind === 'virtual' + && r.status === 'active' + && r.lastHeartbeatAt !== null + && r.lastHeartbeatAt < cutoff)), + findExpiredInactives: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((r) => + r.kind === 'virtual' + && r.status === 'inactive' + && r.inactiveSince !== null + && r.inactiveSince < cutoff)), + create: vi.fn(async (data) => { + counter += 1; + const row = makeAgent({ + id: `agent-${String(counter)}`, + name: data.name, + description: data.description ?? '', + systemPrompt: data.systemPrompt ?? '', + llmId: data.llmId, + projectId: data.projectId ?? null, + kind: data.kind ?? 'public', + providerSessionId: data.providerSessionId ?? null, + status: data.status ?? 'active', + lastHeartbeatAt: data.lastHeartbeatAt ?? null, + inactiveSince: data.inactiveSince ?? null, + ownerId: data.ownerId, + }); + rows.set(row.id, row); + return row; + }), + update: vi.fn(async (id, data) => { + const existing = rows.get(id); + if (!existing) throw new Error('not found'); + const next: Agent = { + ...existing, + ...(data.description !== undefined ? { description: data.description } : {}), + ...(data.systemPrompt !== undefined ? { systemPrompt: data.systemPrompt } : {}), + ...(data.llmId !== undefined ? { llmId: data.llmId } : {}), + ...(data.projectId !== undefined ? { projectId: data.projectId } : {}), + ...(data.kind !== undefined ? { kind: data.kind } : {}), + ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), + ...(data.status !== undefined ? { status: data.status } : {}), + ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}), + ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}), + }; + rows.set(id, next); + return next; + }), + delete: vi.fn(async (id: string) => { rows.delete(id); }), + }; +} + +function mockLlms(): LlmService { + return { + getById: vi.fn(async (id: string) => ({ id, name: 'vllm-local', type: 'openai', model: 'm', kind: 'virtual', status: 'active' })), + getByName: vi.fn(async (name: string) => ({ id: 'llm-1', name, type: 'openai', model: 'm', kind: 'virtual', status: 'active' })), + } as unknown as LlmService; +} + +function mockProjects(): ProjectService { + return { + getById: vi.fn(async (id: string) => ({ id, name: 'mcpctl-dev' })), + resolveAndGet: vi.fn(async (idOrName: string) => ({ + id: idOrName === 'mcpctl-dev' ? 
'proj-1' : 'proj-other', + name: idOrName, + })), + } as unknown as ProjectService; +} + +describe('AgentService — virtual-agent lifecycle (v3 Stage 2)', () => { + it('registerVirtualAgents inserts new rows with kind=virtual / status=active', async () => { + const repo = mockAgentRepo(); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + const inputs: VirtualAgentInput[] = [ + { name: 'local-coder', llmName: 'vllm-local', description: 'd', systemPrompt: 's' }, + ]; + const out = await svc.registerVirtualAgents('sess-1', inputs, 'owner-1'); + expect(out).toHaveLength(1); + expect(out[0]!.kind).toBe('virtual'); + expect(out[0]!.status).toBe('active'); + }); + + it('registerVirtualAgents reuses an existing row from the same session (sticky reconnect)', async () => { + const existing = makeAgent({ name: 'local-coder', providerSessionId: 'sess-1', status: 'inactive', inactiveSince: NOW }); + const repo = mockAgentRepo([existing]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + const out = await svc.registerVirtualAgents( + 'sess-1', + [{ name: 'local-coder', llmName: 'vllm-local' }], + 'owner-1', + ); + expect(out[0]!.id).toBe(existing.id); + expect(out[0]!.status).toBe('active'); + }); + + it('registerVirtualAgents adopts an inactive virtual from a different session', async () => { + const existing = makeAgent({ + name: 'local-coder', providerSessionId: 'old-session', + status: 'inactive', inactiveSince: NOW, + }); + const repo = mockAgentRepo([existing]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + const out = await svc.registerVirtualAgents( + 'new-session', + [{ name: 'local-coder', llmName: 'vllm-local' }], + 'owner-1', + ); + expect(out[0]!.id).toBe(existing.id); + expect(out[0]!.status).toBe('active'); + }); + + it('registerVirtualAgents refuses to overwrite a public agent (409)', async () => { + const repo = mockAgentRepo([makeAgent({ name: 'reviewer', kind: 'public', providerSessionId: null })]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + await expect(svc.registerVirtualAgents( + 'sess-x', + [{ name: 'reviewer', llmName: 'vllm-local' }], + 'owner-1', + )).rejects.toThrow(/Cannot publish over public Agent/); + }); + + it('registerVirtualAgents refuses if another active session owns the name', async () => { + const repo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'other', status: 'active' })]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + await expect(svc.registerVirtualAgents( + 'mine', + [{ name: 'local-coder', llmName: 'vllm-local' }], + 'owner-1', + )).rejects.toThrow(/already active under a different session/); + }); + + it('heartbeatVirtualAgents bumps + revives inactive', async () => { + const past = new Date(Date.now() - 5_000); + const a = makeAgent({ name: 'a', providerSessionId: 'sess', status: 'inactive', lastHeartbeatAt: past, inactiveSince: past }); + const repo = mockAgentRepo([a]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + await svc.heartbeatVirtualAgents('sess'); + const row = await repo.findByName('a'); + expect(row?.status).toBe('active'); + expect(row?.inactiveSince).toBeNull(); + expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime()); + }); + + it('markVirtualAgentsInactiveBySession flips owned actives to inactive', async () => { + const repo = mockAgentRepo([ + makeAgent({ name: 'a', providerSessionId: 'sess' }), + makeAgent({ name: 'b', providerSessionId: 'sess' }), + makeAgent({ name: 'c', 
providerSessionId: 'other' }),
+    ]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    await svc.markVirtualAgentsInactiveBySession('sess');
+    expect((await repo.findByName('a'))?.status).toBe('inactive');
+    expect((await repo.findByName('b'))?.status).toBe('inactive');
+    expect((await repo.findByName('c'))?.status).toBe('active');
+  });
+
+  it('deleteVirtualAgentsForLlm deletes only virtuals pinned to that Llm', async () => {
+    const repo = mockAgentRepo([
+      makeAgent({ name: 'v-1', llmId: 'doomed', kind: 'virtual' }),
+      makeAgent({ name: 'v-2', llmId: 'doomed', kind: 'virtual' }),
+      makeAgent({ name: 'pub-1', llmId: 'doomed', kind: 'public', providerSessionId: null }),
+      makeAgent({ name: 'v-other', llmId: 'safe', kind: 'virtual' }),
+    ]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    const deleted = await svc.deleteVirtualAgentsForLlm('doomed');
+    expect(deleted).toBe(2);
+    expect(await repo.findByName('v-1')).toBeNull();
+    expect(await repo.findByName('v-2')).toBeNull();
+    expect(await repo.findByName('pub-1')).not.toBeNull();
+    expect(await repo.findByName('v-other')).not.toBeNull();
+  });
+
+  it('gcSweepVirtualAgents flips heartbeat-stale + deletes 4h-old inactive', async () => {
+    const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago, past 90s cutoff
+    const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago, past 4h cutoff
+    const repo = mockAgentRepo([
+      makeAgent({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }),
+      makeAgent({ name: 'old', providerSessionId: 'b', status: 'inactive', inactiveSince: ancient }),
+      makeAgent({ name: 'pub', providerSessionId: null, kind: 'public' }),
+    ]);
+    const svc = new AgentService(repo, mockLlms(), mockProjects());
+    const r = await svc.gcSweepVirtualAgents();
+    expect(r.markedInactive).toBe(1);
+    expect(r.deleted).toBe(1);
+    expect((await repo.findByName('stale'))?.status).toBe('inactive');
+    expect(await repo.findByName('old')).toBeNull();
+    expect(await repo.findByName('pub')).not.toBeNull();
+  });
+});
+
+describe('VirtualLlmService cascade through AgentService (v3 Stage 2)', () => {
+  function mockLlmRepo(initial: Llm[] = []): ILlmRepository {
+    const rows = new Map<string, Llm>(initial.map((r) => [r.id, r]));
+    let counter = rows.size;
+    return {
+      findAll: vi.fn(async () => [...rows.values()]),
+      findById: vi.fn(async (id: string) => rows.get(id) ??
null), + findByName: vi.fn(async (name: string) => { + for (const r of rows.values()) if (r.name === name) return r; + return null; + }), + findByTier: vi.fn(async () => []), + findBySessionId: vi.fn(async (sid: string) => + [...rows.values()].filter((r) => r.providerSessionId === sid)), + findStaleVirtuals: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((r) => + r.kind === 'virtual' + && r.status === 'active' + && r.lastHeartbeatAt !== null + && r.lastHeartbeatAt < cutoff)), + findExpiredInactives: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((r) => + r.kind === 'virtual' + && r.status === 'inactive' + && r.inactiveSince !== null + && r.inactiveSince < cutoff)), + create: vi.fn(async (data) => { + counter += 1; + const row = makeLlm({ id: `llm-${String(counter)}`, name: data.name, type: data.type }); + rows.set(row.id, row); + return row; + }), + update: vi.fn(async (id, data) => { + const existing = rows.get(id); + if (!existing) throw new Error('not found'); + const next: Llm = { ...existing, ...data } as Llm; + rows.set(id, next); + return next; + }), + delete: vi.fn(async (id: string) => { rows.delete(id); }), + }; + } + + it('unbindSession cascades to mark virtual agents inactive', async () => { + const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const agentRepo = mockAgentRepo([ + makeAgent({ name: 'local-coder', providerSessionId: 'sess' }), + ]); + const agents = new AgentService(agentRepo, mockLlms(), mockProjects()); + const svc = new VirtualLlmService(llmRepo, agents); + await svc.unbindSession('sess'); + expect((await agentRepo.findByName('local-coder'))?.status).toBe('inactive'); + }); + + it('gcSweep deletes virtual agents BEFORE their pinned virtual Llm', async () => { + const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); + const llmRepo = mockLlmRepo([makeLlm({ + id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess', + status: 'inactive', inactiveSince: ancient, + })]); + const agentRepo = mockAgentRepo([ + makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: ancient }), + ]); + const agents = new AgentService(agentRepo, mockLlms(), mockProjects()); + const svc = new VirtualLlmService(llmRepo, agents); + const r = await svc.gcSweep(); + expect(r.deleted).toBeGreaterThanOrEqual(2); // 1 agent + 1 llm + expect(await llmRepo.findByName('vllm-local')).toBeNull(); + expect(await agentRepo.findByName('pinned')).toBeNull(); + }); + + it('gcSweep defensive cascade: still drops the agent when its heartbeat lagged the Llm', async () => { + // The Llm is past the 4h cutoff. The agent is inactive but only + // 1h old — wouldn't be GC'd by gcSweepVirtualAgents on its own. + // The defensive cascade in gcSweep deletes it anyway because the + // Restrict FK would otherwise block the Llm delete. 
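+    // Sweep order this test implies (illustrative, not the verbatim
+    // service code; deleteVirtualAgentsForLlm is the real method name):
+    //   for (const llm of expiredInactiveLlms) {
+    //     await agents.deleteVirtualAgentsForLlm(llm.id); // children first
+    //     await llmRepo.delete(llm.id);                   // FK now clear
+    //   }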
+    const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
+    const recent = new Date(Date.now() - 1 * 60 * 60 * 1000);
+    const llmRepo = mockLlmRepo([makeLlm({
+      id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
+      status: 'inactive', inactiveSince: ancient,
+    })]);
+    const agentRepo = mockAgentRepo([
+      makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: recent }),
+    ]);
+    const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
+    const svc = new VirtualLlmService(llmRepo, agents);
+    await svc.gcSweep();
+    expect(await llmRepo.findByName('vllm-local')).toBeNull();
+    expect(await agentRepo.findByName('pinned')).toBeNull();
+  });
+
+  it('heartbeat cascades to bump owned virtual agents', async () => {
+    const past = new Date(Date.now() - 10_000);
+    const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', lastHeartbeatAt: past })]);
+    const agentRepo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'sess', lastHeartbeatAt: past })]);
+    const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
+    const svc = new VirtualLlmService(llmRepo, agents);
+    await svc.heartbeat('sess');
+    const a = await agentRepo.findByName('local-coder');
+    expect(a!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
+  });
+});
diff --git a/src/mcplocal/src/http/config.ts b/src/mcplocal/src/http/config.ts
index 1d4a9c3..9e9cd5e 100644
--- a/src/mcplocal/src/http/config.ts
+++ b/src/mcplocal/src/http/config.ts
@@ -108,8 +108,27 @@ interface LlmMultiFileConfig {
   providers: LlmProviderFileEntry[];
 }
 
+/**
+ * Local agent declaration (v3). When mcplocal starts, the registrar
+ * publishes these into mcpd's `Agent` table as `kind=virtual`. They show
+ * up under `mcpctl get agent` and become chat-able via `mcpctl chat <name>`.
+ *
+ * `llm` references a published provider's name from the `llm.providers`
+ * array; the registrar forwards it as `llmName` and mcpd resolves it
+ * server-side.
+ */
+export interface AgentFileEntry {
+  name: string;
+  llm: string;
+  description?: string;
+  systemPrompt?: string;
+  project?: string;
+  defaultParams?: Record<string, unknown>;
+  extras?: Record<string, unknown>;
+}
+
 interface McpctlConfig {
   llm?: LlmFileConfig | LlmMultiFileConfig;
+  agents?: AgentFileEntry[];
   projects?: Record<string, { llm?: ProjectLlmOverride }>;
 }
 
@@ -190,6 +209,15 @@ export function loadProjectLlmOverride(projectName: string): ProjectLlmOverride
   return config.projects?.[projectName]?.llm;
 }
 
+/**
+ * Load locally-declared agents from ~/.mcpctl/config.json (v3 virtual
+ * agents). Returns empty array if no agents block is configured.
+ */
+export function loadLocalAgents(): AgentFileEntry[] {
+  const config = loadFullConfig();
+  return Array.isArray(config.agents) ? config.agents : [];
+}
+
 /** Reset cached config (for testing). */
 export function resetConfigCache(): void {
   cachedConfig = null;
diff --git a/src/mcplocal/src/main.ts b/src/mcplocal/src/main.ts
index 670ba04..9300a20 100644
--- a/src/mcplocal/src/main.ts
+++ b/src/mcplocal/src/main.ts
@@ -7,12 +7,12 @@ import { StdioProxyServer } from './server.js';
 import { StdioUpstream } from './upstream/stdio.js';
 import { HttpUpstream } from './upstream/http.js';
 import { createHttpServer } from './http/server.js';
-import { loadHttpConfig, loadLlmProviders } from './http/config.js';
-import type { HttpConfig, LlmProviderFileEntry } from './http/config.js';
+import { loadHttpConfig, loadLlmProviders, loadLocalAgents } from './http/config.js';
+import type { HttpConfig, LlmProviderFileEntry, AgentFileEntry } from './http/config.js';
 import { createProvidersFromConfig } from './llm-config.js';
 import { createSecretStore } from '@mcpctl/shared';
 import type { ProviderRegistry } from './providers/registry.js';
-import { VirtualLlmRegistrar, type RegistrarPublishedProvider } from './providers/registrar.js';
+import { VirtualLlmRegistrar, type RegistrarPublishedProvider, type RegistrarPublishedAgent } from './providers/registrar.js';
 import { startWatchers, stopWatchers, reloadStages } from './proxymodel/watcher.js';
 import { existsSync, readFileSync as readFileSyncNs } from 'node:fs';
 import { homedir } from 'node:os';
@@ -151,7 +151,8 @@ export async function main(argv: string[] = process.argv): Promise<void> {
   // Virtual-LLM registrar: publish opted-in providers (`publish: true`)
   // into mcpd's Llm registry. Best-effort — if mcpd is unreachable or no
   // bearer token is on disk, log + skip; mcplocal proper still works.
-  const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries);
+  const localAgents = loadLocalAgents();
+  const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries, localAgents);
 
   // Graceful shutdown
   let shuttingDown = false;
@@ -198,9 +199,10 @@ if (isMain) {
 async function maybeStartVirtualLlmRegistrar(
   providerRegistry: ProviderRegistry,
   llmEntries: LlmProviderFileEntry[],
+  localAgents: AgentFileEntry[] = [],
 ): Promise<VirtualLlmRegistrar | null> {
   const opted = llmEntries.filter((e) => e.publish === true);
-  if (opted.length === 0) return null;
+  if (opted.length === 0 && localAgents.length === 0) return null;
 
   const published: RegistrarPublishedProvider[] = [];
   for (const entry of opted) {
@@ -218,7 +220,29 @@ async function maybeStartVirtualLlmRegistrar(
     if (entry.wake !== undefined) item.wake = entry.wake;
     published.push(item);
   }
-  if (published.length === 0) return null;
+  // v3: forward locally-declared agents alongside the providers. All
+  // declared agents are forwarded, including ones whose `llm` doesn't
+  // match a name we're publishing; mcpd validates llmName at register
+  // time, so a stale entry fails that one agent with a clear error
+  // instead of failing the whole registration.
+  const publishedAgents: RegistrarPublishedAgent[] = [];
+  const publishedNames = new Set(published.map((p) => p.provider.name));
+  for (const a of localAgents) {
+    if (!publishedNames.has(a.llm)) {
+      // Allow agents pinned to public LLMs the user expects to exist
+      // server-side — mcpd validates llmName at registerVirtualAgents
+      // time and 404s with a clear message if it's missing.
+      // We don't drop these client-side; just note it.
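+      // e.g. a config entry pinned to a server-side Llm (hypothetical
+      // values; mcpd resolves `llm` when it handles the register call):
+      //   { "name": "cloud-coder", "llm": "qwen3-thinking" }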
+    }
+    const item: RegistrarPublishedAgent = { name: a.name, llmName: a.llm };
+    if (a.description !== undefined) item.description = a.description;
+    if (a.systemPrompt !== undefined) item.systemPrompt = a.systemPrompt;
+    if (a.project !== undefined) item.project = a.project;
+    if (a.defaultParams !== undefined) item.defaultParams = a.defaultParams;
+    if (a.extras !== undefined) item.extras = a.extras;
+    publishedAgents.push(item);
+  }
+
+  if (published.length === 0 && publishedAgents.length === 0) return null;
 
   // Resolve mcpd URL + bearer. Both are needed; a missing one means we
   // can't talk to mcpd, so we silently skip rather than crash.
@@ -246,6 +270,7 @@
     mcpdUrl,
     token,
     publishedProviders: published,
+    ...(publishedAgents.length > 0 ? { publishedAgents } : {}),
     sessionFilePath: join(homedir(), '.mcpctl', 'provider-session'),
     log: {
       info: (msg) => process.stderr.write(`${msg}\n`),
diff --git a/src/mcplocal/src/providers/registrar.ts b/src/mcplocal/src/providers/registrar.ts
index e09392c..cd7e46d 100644
--- a/src/mcplocal/src/providers/registrar.ts
+++ b/src/mcplocal/src/providers/registrar.ts
@@ -56,10 +56,28 @@ export interface RegistrarPublishedProvider {
   wake?: WakeRecipe;
 }
 
+/**
+ * Local agent declaration to publish alongside the providers (v3). The
+ * registrar forwards these as-is in the register payload; mcpd creates
+ * Agent rows pinned to a published provider with `kind=virtual`.
+ */
+export interface RegistrarPublishedAgent {
+  name: string;
+  /** mcpd-side LLM name to pin the agent to (one of `publishedProviders`, or an existing public Llm). */
+  llmName: string;
+  description?: string;
+  systemPrompt?: string;
+  project?: string;
+  defaultParams?: Record<string, unknown>;
+  extras?: Record<string, unknown>;
+}
+
 export interface RegistrarOptions {
   mcpdUrl: string;
   token: string;
   publishedProviders: RegistrarPublishedProvider[];
+  /** Optional v3 — local agents to publish alongside the providers. */
+  publishedAgents?: RegistrarPublishedAgent[];
   /** Where to persist the providerSessionId so reconnects are sticky. */
   sessionFilePath: string;
   log: RegistrarLogger;
@@ -172,6 +190,20 @@ export class VirtualLlmRegistrar {
     }));
     const body: Record<string, unknown> = { providers };
     if (this.sessionId !== null) body['providerSessionId'] = this.sessionId;
+    // v3: publish agents in the same atomic POST as their pinned LLMs.
+    // Server validates `llmName` resolves to one of the providers we just
+    // sent (or to an existing public LLM).
+    if (this.opts.publishedAgents !== undefined && this.opts.publishedAgents.length > 0) {
+      body['agents'] = this.opts.publishedAgents.map((a) => ({
+        name: a.name,
+        llmName: a.llmName,
+        ...(a.description !== undefined ? { description: a.description } : {}),
+        ...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
+        ...(a.project !== undefined ? { project: a.project } : {}),
+        ...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
+        ...(a.extras !== undefined ? { extras: a.extras } : {}),
+      }));
+    }
 
     const res = await postJson(
       this.urlFor('/api/v1/llms/_provider-register'),
diff --git a/src/mcplocal/tests/smoke/agent-chat.smoke.test.ts b/src/mcplocal/tests/smoke/agent-chat.smoke.test.ts
index d96b03b..e445511 100644
--- a/src/mcplocal/tests/smoke/agent-chat.smoke.test.ts
+++ b/src/mcplocal/tests/smoke/agent-chat.smoke.test.ts
@@ -17,7 +17,7 @@
 import { describe, it, expect, beforeAll, afterAll } from 'vitest';
 import http from 'node:http';
 import https from 'node:https';
-import { execSync } from 'node:child_process';
+import { spawnSync, execSync } from 'node:child_process';
 
 const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu';
 const LLM_URL = process.env.MCPCTL_SMOKE_LLM_URL;
@@ -31,21 +31,37 @@ const AGENT_NAME = `smoke-chat-agent-${SUFFIX}`;
 interface CliResult { code: number; stdout: string; stderr: string }
 
 function run(args: string): CliResult {
-  try {
-    const stdout = execSync(`mcpctl --direct ${args}`, {
-      encoding: 'utf-8',
-      timeout: 60_000,
-      stdio: ['ignore', 'pipe', 'pipe'],
-    });
-    return { code: 0, stdout: stdout.trim(), stderr: '' };
-  } catch (err) {
-    const e = err as { status?: number; stdout?: Buffer | string; stderr?: Buffer | string };
-    return {
-      code: e.status ?? 1,
-      stdout: e.stdout ? (typeof e.stdout === 'string' ? e.stdout : e.stdout.toString('utf-8')) : '',
-      stderr: e.stderr ? (typeof e.stderr === 'string' ? e.stderr : e.stderr.toString('utf-8')) : '',
-    };
+  // spawnSync (not execSync) — execSync returns only stdout on success and
+  // discards stderr, which made any `thread:` assertion against a successful
+  // chat impossible to evaluate. Splitting the args correctly handles the
+  // few existing call sites that quote-wrap multi-word values like
+  // `--system-prompt "You are..."`.
+  const argv = splitArgs(args);
+  const res = spawnSync('mcpctl', ['--direct', ...argv], {
+    encoding: 'utf-8',
+    timeout: 60_000,
+  });
+  return {
+    code: res.status ?? 1,
+    stdout: (res.stdout ?? '').trim(),
+    stderr: (res.stderr ?? '').trim(),
+  };
+}
+
+/**
+ * Tokenize a shell-style argv string with simple double-quote support — just
+ * enough for the smoke test's call shapes. Not a full POSIX parser; we only
+ * need to keep `--system-prompt "You are a smoke test..."` together as one
+ * arg.
+ */
+function splitArgs(s: string): string[] {
+  const out: string[] = [];
+  const re = /"([^"]*)"|(\S+)/g;
+  let m: RegExpExecArray | null;
+  while ((m = re.exec(s)) !== null) {
+    out.push(m[1] !== undefined ? m[1] : (m[2] ?? ''));
   }
+  return out;
 }
 
 function healthz(url: string, timeoutMs = 5000): Promise<boolean> {
diff --git a/src/mcplocal/tests/smoke/virtual-agent.smoke.test.ts b/src/mcplocal/tests/smoke/virtual-agent.smoke.test.ts
new file mode 100644
index 0000000..2d5726b
--- /dev/null
+++ b/src/mcplocal/tests/smoke/virtual-agent.smoke.test.ts
@@ -0,0 +1,215 @@
+/**
+ * Smoke tests: v3 virtual agents — register a virtual Llm + a virtual
+ * Agent through the same `_provider-register` payload, then verify mcpd
+ * surfaces the agent as kind=virtual / status=active. Mirrors
+ * virtual-llm.smoke.test.ts's in-process registrar pattern so we don't
+ * need to mutate ~/.mcpctl/config.json or bounce systemd's mcplocal.
+ *
+ * Heartbeat-stale → inactive (90 s) and 4 h auto-deletion are covered by
+ * the unit suite (mcpd virtual-agent-service.test.ts); waiting > 90 s in
+ * smoke would balloon the suite duration.
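+ *
+ * The register payload exercised below is roughly (shape per the
+ * registrar's mapping; values illustrative):
+ *
+ *   POST /api/v1/llms/_provider-register
+ *   {
+ *     "providers": [{ "name": "smoke-vagent-llm-...", "type": "openai", ... }],
+ *     "agents": [{ "name": "smoke-vagent-...", "llmName": "smoke-vagent-llm-..." }]
+ *   }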
+ */
+import { describe, it, expect, beforeAll, afterAll } from 'vitest';
+import http from 'node:http';
+import https from 'node:https';
+import { mkdtempSync, rmSync, readFileSync, existsSync } from 'node:fs';
+import { tmpdir } from 'node:os';
+import { join } from 'node:path';
+import {
+  VirtualLlmRegistrar,
+  type RegistrarPublishedProvider,
+  type RegistrarPublishedAgent,
+} from '../../src/providers/registrar.js';
+import type { LlmProvider, CompletionResult } from '../../src/providers/types.js';
+
+const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu';
+const SUFFIX = Date.now().toString(36);
+const PROVIDER_NAME = `smoke-vagent-llm-${SUFFIX}`;
+const AGENT_NAME = `smoke-vagent-${SUFFIX}`;
+
+function makeFakeProvider(name: string, content: string): LlmProvider {
+  return {
+    name,
+    async complete(): Promise<CompletionResult> {
+      return {
+        content,
+        toolCalls: [],
+        usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 },
+        finishReason: 'stop',
+      };
+    },
+    async listModels() { return []; },
+    async isAvailable() { return true; },
+  };
+}
+
+function healthz(url: string, timeoutMs = 5000): Promise<boolean> {
+  return new Promise((resolve) => {
+    const parsed = new URL(`${url.replace(/\/$/, '')}/healthz`);
+    const driver = parsed.protocol === 'https:' ? https : http;
+    const req = driver.get({
+      hostname: parsed.hostname,
+      port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
+      path: parsed.pathname,
+      timeout: timeoutMs,
+    }, (res) => { resolve((res.statusCode ?? 500) < 500); res.resume(); });
+    req.on('error', () => resolve(false));
+    req.on('timeout', () => { req.destroy(); resolve(false); });
+  });
+}
+
+function readToken(): string | null {
+  try {
+    const path = join(process.env.HOME ?? '', '.mcpctl', 'credentials');
+    if (!existsSync(path)) return null;
+    const parsed = JSON.parse(readFileSync(path, 'utf-8')) as { token?: string };
+    return parsed.token ?? null;
+  } catch {
+    return null;
+  }
+}
+
+interface HttpResponse { status: number; body: string }
+
+function httpRequest(method: string, urlStr: string, body: unknown): Promise<HttpResponse> {
+  return new Promise((resolve, reject) => {
+    const tokenRaw = readToken();
+    const parsed = new URL(urlStr);
+    const driver = parsed.protocol === 'https:' ? https : http;
+    const headers: Record<string, string> = {
+      Accept: 'application/json',
+      ...(body !== undefined ? { 'Content-Type': 'application/json' } : {}),
+      ...(tokenRaw !== null ? { Authorization: `Bearer ${tokenRaw}` } : {}),
+    };
+    const req = driver.request({
+      hostname: parsed.hostname,
+      port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
+      path: parsed.pathname + parsed.search,
+      method,
+      headers,
+      timeout: 30_000,
+    }, (res) => {
+      const chunks: Buffer[] = [];
+      res.on('data', (c: Buffer) => chunks.push(c));
+      res.on('end', () => {
+        resolve({ status: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') });
+      });
+    });
+    req.on('error', reject);
+    req.on('timeout', () => { req.destroy(); reject(new Error(`httpRequest timeout: ${method} ${urlStr}`)); });
+    if (body !== undefined) req.write(JSON.stringify(body));
+    req.end();
+  });
+}
+
+interface AgentRow { id: string; name: string; kind?: string; status?: string; llm?: { name: string }; description?: string }
+
+let mcpdUp = false;
+let registrar: VirtualLlmRegistrar | null = null;
+let tempDir: string;
+
+describe('virtual-agent smoke (v3)', () => {
+  beforeAll(async () => {
+    mcpdUp = await healthz(MCPD_URL);
+    if (!mcpdUp) {
+      // eslint-disable-next-line no-console
+      console.warn(`\n ○ virtual-agent smoke: skipped — ${MCPD_URL}/healthz unreachable.\n`);
+      return;
+    }
+    if (readToken() === null) {
+      mcpdUp = false;
+      // eslint-disable-next-line no-console
+      console.warn('\n ○ virtual-agent smoke: skipped — no ~/.mcpctl/credentials.\n');
+      return;
+    }
+    tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-virtual-agent-smoke-'));
+  }, 20_000);
+
+  afterAll(async () => {
+    if (registrar !== null) registrar.stop();
+    if (tempDir !== undefined) rmSync(tempDir, { recursive: true, force: true });
+    // Defensive cleanup: agent first (the Agent.llmId FK is onDelete: Restrict), then Llm.
+    if (mcpdUp) {
+      const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined);
+      if (agents.status === 200) {
+        const rows = JSON.parse(agents.body) as Array<{ id: string; name: string }>;
+        const row = rows.find((r) => r.name === AGENT_NAME);
+        if (row !== undefined) {
+          await httpRequest('DELETE', `${MCPD_URL}/api/v1/agents/${row.id}`, undefined);
+        }
+      }
+      const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
+      if (llms.status === 200) {
+        const rows = JSON.parse(llms.body) as Array<{ id: string; name: string }>;
+        const row = rows.find((r) => r.name === PROVIDER_NAME);
+        if (row !== undefined) {
+          await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${row.id}`, undefined);
+        }
+      }
+    }
+  });
+
+  it('registrar publishes provider + agent in one round-trip and mcpd lists the agent kind=virtual / status=active', async () => {
+    if (!mcpdUp) return;
+    const token = readToken();
+    if (token === null) return;
+
+    const published: RegistrarPublishedProvider[] = [
+      { provider: makeFakeProvider(PROVIDER_NAME, 'hi from virtual agent'), type: 'openai', model: 'fake-vagent', tier: 'fast' },
+    ];
+    const publishedAgents: RegistrarPublishedAgent[] = [
+      {
+        name: AGENT_NAME,
+        llmName: PROVIDER_NAME,
+        description: 'v3 virtual agent smoke',
+        systemPrompt: 'You are a smoke test. Reply READY.',
+        defaultParams: { temperature: 0 },
+      },
+    ];
+    registrar = new VirtualLlmRegistrar({
+      mcpdUrl: MCPD_URL,
+      token,
+      publishedProviders: published,
+      publishedAgents,
+      sessionFilePath: join(tempDir, 'session'),
+      log: { info: () => {}, warn: () => {}, error: () => {} },
+      heartbeatIntervalMs: 60_000,
+    });
+    await registrar.start();
+    expect(registrar.getSessionId()).not.toBeNull();
+    // Give the SSE handshake + atomic register a moment to settle.
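+    // (A fixed sleep is a pragmatic smoke-test compromise; if this ever
+    // flakes, polling GET /api/v1/agents until the row appears would be
+    // the sturdier shape.)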
+ await new Promise((r) => setTimeout(r, 400)); + + const res = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined); + expect(res.status).toBe(200); + const rows = JSON.parse(res.body) as AgentRow[]; + const row = rows.find((r) => r.name === AGENT_NAME); + expect(row, `${AGENT_NAME} must be present`).toBeDefined(); + expect(row!.kind).toBe('virtual'); + expect(row!.status).toBe('active'); + expect(row!.llm?.name).toBe(PROVIDER_NAME); + expect(row!.description).toBe('v3 virtual agent smoke'); + }, 30_000); + + it('publisher disconnect flips the agent to status=inactive (paired with its Llm)', async () => { + if (!mcpdUp) return; + if (registrar !== null) { + registrar.stop(); + registrar = null; + } + // unbindSession runs synchronously on the SSE close handler; mcpd + // flips both the Llm and any agents owned by the session to + // inactive. A short wait covers the request round-trip. + await new Promise((r) => setTimeout(r, 400)); + + const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined); + expect(agents.status).toBe(200); + const agentRow = (JSON.parse(agents.body) as AgentRow[]).find((r) => r.name === AGENT_NAME); + expect(agentRow, `${AGENT_NAME} must still exist (deletion is GC-driven, not disconnect-driven)`).toBeDefined(); + expect(agentRow!.status).toBe('inactive'); + + const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined); + const llmRow = (JSON.parse(llms.body) as Array<{ name: string; status: string }>).find((r) => r.name === PROVIDER_NAME); + expect(llmRow!.status).toBe('inactive'); + }, 30_000); +});