From 9afd24a3aaf51f34e922937e5c4a3da9dda334f8 Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 16:55:02 +0100 Subject: [PATCH 1/5] feat(db+mcpd): Agent lifecycle + chat.service kind=virtual branch (v3 Stage 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two pieces of v3 plumbing — schema + the latent v1 chat.service bug. Schema (db): - Agent gains kind/providerSessionId/lastHeartbeatAt/status/inactiveSince mirroring Llm's v1 lifecycle. Reuses LlmKind / LlmStatus enums; no new types. Existing rows backfill kind=public/status=active so v1 CRUD is unaffected. - @@index([kind, status]) for the GC sweep, @@index([providerSessionId]) for disconnect-cascade lookups. - 4 new prisma-level tests cover defaults, persisting virtual fields, the (kind, status) GC index, and providerSessionId lookups. Total agent-schema tests: 20/20. chat.service (mcpd) — fixes the v1 latent bug: - LlmView's kind is now plumbed through prepareContext as ctx.llmKind. - Two new private helpers, runOneInference / streamInference, branch on ctx.llmKind: 'public' goes through the existing adapter registry, 'virtual' relays through VirtualLlmService.enqueueInferTask (mirrors the route-handler branch from v1 Stage 3). - Streaming bridges VirtualLlmService's onChunk callback API to an async iterator via a small queue + wake pattern. - ChatService gains an optional virtualLlms constructor parameter; main.ts wires it in. Older test wirings without it raise a clear "virtualLlms dispatcher not wired" error when the row is virtual, rather than silently falling through to the public path against an empty URL. This unblocks any Agent (public OR future v3-virtual) pinned to a kind=virtual Llm. Pre-this-stage, those agents 502'd against the empty url field. Tests: 4 new chat-service-virtual-llm.test.ts cover the relay path non-streaming, streaming, missing-dispatcher error, and rejection surfacing. mcpd suite: 841/841 (was 833, +8 across stages 1+v3-Stage-1). Workspace: 2054/2054 across 153 files. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../migration.sql | 14 + src/db/prisma/schema.prisma | 26 +- src/db/tests/agent-schema.test.ts | 72 +++++ src/mcpd/src/main.ts | 1 + src/mcpd/src/services/chat.service.ts | 146 ++++++++-- .../tests/chat-service-virtual-llm.test.ts | 251 ++++++++++++++++++ src/mcpd/tests/chat-service.test.ts | 6 +- 7 files changed, 490 insertions(+), 26 deletions(-) create mode 100644 src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql create mode 100644 src/mcpd/tests/chat-service-virtual-llm.test.ts diff --git a/src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql b/src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql new file mode 100644 index 0000000..5cec191 --- /dev/null +++ b/src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql @@ -0,0 +1,14 @@ +-- Mirror Llm's virtual-provider lifecycle on Agent. Reuses the +-- existing LlmKind / LlmStatus enums so we don't double-define them. +-- Existing rows backfill with kind='public' / status='active' so +-- nothing changes for manually-created agents. + +ALTER TABLE "Agent" + ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public', + ADD COLUMN "providerSessionId" TEXT, + ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3), + ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active', + ADD COLUMN "inactiveSince" TIMESTAMP(3); + +CREATE INDEX "Agent_kind_status_idx" ON "Agent"("kind", "status"); +CREATE INDEX "Agent_providerSessionId_idx" ON "Agent"("providerSessionId"); diff --git a/src/db/prisma/schema.prisma b/src/db/prisma/schema.prisma index dfc83e5..8b64ad8 100644 --- a/src/db/prisma/schema.prisma +++ b/src/db/prisma/schema.prisma @@ -469,20 +469,26 @@ model BackupPending { // Per-call LiteLLM-style overrides stack on top of `defaultParams`. model Agent { - id String @id @default(cuid()) - name String @unique - description String @default("") // shown in MCP tools/list - systemPrompt String @default("") @db.Text // agent persona + id String @id @default(cuid()) + name String @unique + description String @default("") // shown in MCP tools/list + systemPrompt String @default("") @db.Text // agent persona llmId String projectId String? defaultPersonalityId String? // applied at chat time when no --personality flag proxyModelName String? // optional informational override - defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ... - extras Json @default("{}") // future LoRA / tool-allowlist + defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ... + extras Json @default("{}") // future LoRA / tool-allowlist + // ── Virtual-agent lifecycle (NULL/default for kind=public, mirrors Llm) ── + kind LlmKind @default(public) + providerSessionId String? // mcplocal session that owns this row when virtual + lastHeartbeatAt DateTime? + status LlmStatus @default(active) + inactiveSince DateTime? ownerId String - version Int @default(1) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt + version Int @default(1) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt llm Llm @relation(fields: [llmId], references: [id], onDelete: Restrict) project Project? @relation(fields: [projectId], references: [id], onDelete: SetNull) @@ -497,6 +503,8 @@ model Agent { @@index([projectId]) @@index([ownerId]) @@index([defaultPersonalityId]) + @@index([kind, status]) + @@index([providerSessionId]) } // ── Personalities (named overlay bundles of prompts on top of an Agent) ── diff --git a/src/db/tests/agent-schema.test.ts b/src/db/tests/agent-schema.test.ts index 4fa3b0c..4c56dfb 100644 --- a/src/db/tests/agent-schema.test.ts +++ b/src/db/tests/agent-schema.test.ts @@ -317,6 +317,78 @@ describe('agent / chat-thread / chat-message schema', () => { expect(reloaded?.defaultPersonalityId).toBeNull(); }); + // ── v3: Agent.kind virtual + lifecycle fields ── + + it('defaults a freshly inserted Agent to kind=public, status=active', async () => { + const user = await makeUser(); + const llm = await makeLlm('llm-default-kind'); + const agent = await makeAgent({ name: 'fresh', llmId: llm.id, ownerId: user.id }); + expect(agent.kind).toBe('public'); + expect(agent.status).toBe('active'); + expect(agent.providerSessionId).toBeNull(); + expect(agent.lastHeartbeatAt).toBeNull(); + expect(agent.inactiveSince).toBeNull(); + }); + + it('persists kind=virtual + lifecycle fields together', async () => { + const user = await makeUser(); + const llm = await makeLlm('llm-pub-virtual'); + const now = new Date(); + const agent = await prisma.agent.create({ + data: { + name: 'local-coder', + llmId: llm.id, + ownerId: user.id, + kind: 'virtual', + providerSessionId: 'sess-abc', + lastHeartbeatAt: now, + status: 'active', + }, + }); + expect(agent.kind).toBe('virtual'); + expect(agent.providerSessionId).toBe('sess-abc'); + expect(agent.lastHeartbeatAt?.getTime()).toBe(now.getTime()); + }); + + it('finds virtual agents by (kind, status) cheaply (GC sweep query)', async () => { + const user = await makeUser(); + const llm = await makeLlm('llm-gc-agent'); + await prisma.agent.create({ data: { name: 'pub-1', llmId: llm.id, ownerId: user.id } }); + await prisma.agent.create({ + data: { name: 'v-active', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's1' }, + }); + await prisma.agent.create({ + data: { name: 'v-inactive', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() }, + }); + + const stale = await prisma.agent.findMany({ + where: { kind: 'virtual', status: 'inactive' }, + select: { name: true }, + }); + expect(stale.map((a) => a.name)).toEqual(['v-inactive']); + }); + + it('finds agents by providerSessionId (used on mcplocal disconnect cascade)', async () => { + const user = await makeUser(); + const llm = await makeLlm('llm-sess-cascade'); + await prisma.agent.create({ + data: { name: 'a', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' }, + }); + await prisma.agent.create({ + data: { name: 'b', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' }, + }); + await prisma.agent.create({ + data: { name: 'c', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'other' }, + }); + + const owned = await prisma.agent.findMany({ + where: { providerSessionId: 'shared' }, + select: { name: true }, + orderBy: { name: 'asc' }, + }); + expect(owned.map((a) => a.name)).toEqual(['a', 'b']); + }); + it('binds the same prompt to multiple personalities of an agent', async () => { const user = await makeUser(); const llm = await makeLlm('llm-shared-prompt'); diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index 03ce6fd..9814b74 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -607,6 +607,7 @@ async function main(): Promise { promptRepo, chatToolDispatcher, personalityRepo, + virtualLlmService, ); registerAgentChatRoutes(app, chatService); registerLlmInferRoutes(app, { diff --git a/src/mcpd/src/services/chat.service.ts b/src/mcpd/src/services/chat.service.ts index fc375b7..087ea11 100644 --- a/src/mcpd/src/services/chat.service.ts +++ b/src/mcpd/src/services/chat.service.ts @@ -31,6 +31,7 @@ import type { } from '../repositories/chat.repository.js'; import type { IPromptRepository } from '../repositories/prompt.repository.js'; import type { IPersonalityRepository } from '../repositories/personality.repository.js'; +import type { IVirtualLlmService } from './virtual-llm.service.js'; import type { OpenAiChatRequest, OpenAiMessage } from './llm/types.js'; import type { AgentChatParams } from '../validation/agent.schema.js'; import { NotFoundError } from './mcp-server.service.js'; @@ -132,6 +133,14 @@ export class ChatService { private readonly promptRepo: IPromptRepository, private readonly tools: ChatToolDispatcher, private readonly personalities?: IPersonalityRepository, + /** + * v3: when an Agent is pinned to a `kind=virtual` Llm, inference is + * relayed via this service rather than an HTTP adapter (the virtual + * row has no upstream URL). Optional so older test wirings still + * compile; in those tests the chat path will refuse virtual Llms + * with a clear error. + */ + private readonly virtualLlms?: IVirtualLlmService, ) {} async createThread(agentName: string, ownerId: string, title?: string): Promise<{ id: string }> { @@ -170,14 +179,7 @@ export class ChatService { let lastTurnIndex = ctx.startingTurnIndex; try { for (let i = 0; i < ctx.maxIterations; i += 1) { - const adapter = this.adapters.get(ctx.llmType); - const result = await adapter.infer({ - body: this.buildBody(ctx), - modelOverride: ctx.modelOverride, - apiKey: ctx.apiKey, - url: ctx.url, - extraConfig: ctx.extraConfig, - }); + const result = await this.runOneInference(ctx); const choice = extractChoice(result.body); if (choice === null) { throw new Error(`Adapter returned no choice (status ${String(result.status)})`); @@ -240,19 +242,12 @@ export class ChatService { const ctx = await this.prepareContext(args); try { for (let i = 0; i < ctx.maxIterations; i += 1) { - const adapter = this.adapters.get(ctx.llmType); const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = { content: '', toolCalls: [], }; let finishReason: string | null = null; - for await (const chunk of adapter.stream({ - body: { ...this.buildBody(ctx), stream: true }, - modelOverride: ctx.modelOverride, - apiKey: ctx.apiKey, - url: ctx.url, - extraConfig: ctx.extraConfig, - })) { + for await (const chunk of this.streamInference(ctx)) { if (chunk.done === true) break; if (chunk.data === '[DONE]') break; const evt = parseStreamingChunk(chunk.data); @@ -347,12 +342,130 @@ export class ChatService { } } + /** + * Streaming counterpart of runOneInference. Yields raw OpenAI-style + * SSE chunks ({data: string; done?: boolean}) regardless of whether + * we're hitting a public adapter or relaying through VirtualLlmService. + * The caller's `parseStreamingChunk` already speaks OpenAI shape, so + * downstream code doesn't need to know which path produced the chunks. + */ + private async *streamInference(ctx: { + llmName: string; + llmType: string; + llmKind: 'public' | 'virtual'; + modelOverride: string; + url: string; + apiKey: string; + extraConfig: Record; + history: OpenAiMessage[]; + systemBlock: string; + toolList: ChatTool[]; + mergedParams: AgentChatParams; + }): AsyncGenerator<{ data: string; done?: boolean }> { + if (ctx.llmKind !== 'virtual') { + const adapter = this.adapters.get(ctx.llmType); + yield* adapter.stream({ + body: { ...this.buildBody(ctx), stream: true }, + modelOverride: ctx.modelOverride, + apiKey: ctx.apiKey, + url: ctx.url, + extraConfig: ctx.extraConfig, + }); + return; + } + if (this.virtualLlms === undefined) { + throw new Error( + 'virtualLlms dispatcher not wired into ChatService — cannot stream chat with kind=virtual Llm', + ); + } + // Bridge VirtualLlmService's onChunk callback API to an async + // iterator. Chunks land on the queue from the SSE relay; the + // generator drains them in order. ref.done resolves when the + // publisher emits its `[DONE]` marker. + const ref = await this.virtualLlms.enqueueInferTask( + ctx.llmName, + { ...this.buildBody(ctx), stream: true }, + true, + ); + const queue: Array<{ data: string; done?: boolean }> = []; + let resolveTick: (() => void) | null = null; + const wake = (): void => { + const r = resolveTick; + resolveTick = null; + if (r !== null) r(); + }; + const unsubscribe = ref.onChunk((c) => { queue.push(c); wake(); }); + let finished = false; + let failure: Error | null = null; + ref.done.then(() => { finished = true; wake(); }).catch((err: Error) => { failure = err; finished = true; wake(); }); + + try { + while (true) { + while (queue.length > 0) { + const c = queue.shift()!; + yield c; + if (c.done === true) return; + } + if (finished) { + if (failure !== null) throw failure; + return; + } + await new Promise((r) => { resolveTick = r; }); + } + } finally { + unsubscribe(); + } + } + + /** + * Run a single non-streaming inference iteration. Branches on + * ctx.llmKind: public goes through the existing adapter registry, + * virtual relays through VirtualLlmService.enqueueInferTask (mirrors + * the same branch in `routes/llm-infer.ts` from v1 Stage 3). + * + * Throws when virtualLlms isn't wired but the row is virtual — older + * test wirings hit this path. + */ + private async runOneInference(ctx: { + llmName: string; + llmType: string; + llmKind: 'public' | 'virtual'; + modelOverride: string; + url: string; + apiKey: string; + extraConfig: Record; + history: OpenAiMessage[]; + systemBlock: string; + toolList: ChatTool[]; + mergedParams: AgentChatParams; + }): Promise<{ status: number; body: unknown }> { + if (ctx.llmKind === 'virtual') { + if (this.virtualLlms === undefined) { + throw new Error( + 'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm', + ); + } + const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false); + return ref.done; + } + const adapter = this.adapters.get(ctx.llmType); + return adapter.infer({ + body: this.buildBody(ctx), + modelOverride: ctx.modelOverride, + apiKey: ctx.apiKey, + url: ctx.url, + extraConfig: ctx.extraConfig, + }); + } + private async prepareContext(args: ChatRequestArgs): Promise<{ threadId: string; history: OpenAiMessage[]; systemBlock: string; llmName: string; llmType: string; + /** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */ + llmKind: 'public' | 'virtual'; modelOverride: string; url: string; apiKey: string; @@ -435,6 +548,7 @@ export class ChatService { systemBlock, llmName: llm.name, llmType: llm.type, + llmKind: llm.kind, modelOverride: llm.model, url: llm.url, apiKey, diff --git a/src/mcpd/tests/chat-service-virtual-llm.test.ts b/src/mcpd/tests/chat-service-virtual-llm.test.ts new file mode 100644 index 0000000..44f44d2 --- /dev/null +++ b/src/mcpd/tests/chat-service-virtual-llm.test.ts @@ -0,0 +1,251 @@ +import { describe, it, expect, vi } from 'vitest'; +import { ChatService, type ChatToolDispatcher } from '../src/services/chat.service.js'; +import type { AgentService } from '../src/services/agent.service.js'; +import type { LlmService } from '../src/services/llm.service.js'; +import type { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js'; +import type { IChatRepository } from '../src/repositories/chat.repository.js'; +import type { IPromptRepository } from '../src/repositories/prompt.repository.js'; +import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js'; +import type { ChatMessage, ChatThread, Prompt } from '@prisma/client'; + +const NOW = new Date(); + +/** + * Tests targeting v3 Stage 1's chat.service kind=virtual branch. + * Mirror the existing chat-service.test.ts patterns but isolate the + * adapter-vs-relay dispatch decision. + */ + +function mockChatRepo(): IChatRepository { + const msgs: ChatMessage[] = []; + const threads: ChatThread[] = []; + let idCounter = 1; + return { + createThread: vi.fn(async ({ agentId, ownerId, title }) => { + const t: ChatThread = { + id: `thread-${String(idCounter++)}`, agentId, ownerId, + title: title ?? '', lastTurnAt: NOW, createdAt: NOW, updatedAt: NOW, + }; + threads.push(t); + return t; + }), + findThread: vi.fn(async (id: string) => threads.find((t) => t.id === id) ?? null), + listThreadsByAgent: vi.fn(async () => []), + listMessages: vi.fn(async () => []), + appendMessage: vi.fn(async (input) => { + const m: ChatMessage = { + id: `msg-${String(idCounter++)}`, + threadId: input.threadId, + turnIndex: input.turnIndex ?? msgs.filter((x) => x.threadId === input.threadId).length, + role: input.role, + content: input.content, + toolCalls: (input.toolCalls ?? null) as ChatMessage['toolCalls'], + toolCallId: input.toolCallId ?? null, + status: input.status ?? 'complete', + createdAt: NOW, + }; + msgs.push(m); + return m; + }), + updateStatus: vi.fn(async (_id, _s) => ({ } as ChatMessage)), + markPendingAsError: vi.fn(async () => 0), + touchThread: vi.fn(async () => undefined), + nextTurnIndex: vi.fn(async () => msgs.length), + }; +} + +function mockAgents(): AgentService { + return { + getByName: vi.fn(async (name: string) => ({ + id: `agent-${name}`, name, description: '', + systemPrompt: 'You are a helpful agent.', + llm: { id: 'llm-1', name: 'vllm-local' }, + project: null, + defaultPersonality: null, + proxyModelName: null, + defaultParams: {}, + extras: {}, + ownerId: 'owner-1', version: 1, createdAt: NOW, updatedAt: NOW, + })), + } as unknown as AgentService; +} + +function mockLlmsVirtual(): LlmService { + return { + getByName: vi.fn(async (name: string) => ({ + id: 'llm-1', name, type: 'openai', model: 'fake', + url: '', tier: 'fast', description: '', + apiKeyRef: null, extraConfig: {}, + kind: 'virtual', + status: 'active', + lastHeartbeatAt: NOW, + inactiveSince: null, + version: 1, createdAt: NOW, updatedAt: NOW, + })), + resolveApiKey: vi.fn(async () => ''), + } as unknown as LlmService; +} + +function mockPromptRepo(): IPromptRepository { + return { + findAll: vi.fn(async () => []), + findGlobal: vi.fn(async () => []), + findByAgent: vi.fn(async () => []), + findById: vi.fn(async () => null), + findByNameAndProject: vi.fn(async () => null), + findByNameAndAgent: vi.fn(async () => null), + create: vi.fn(), + update: vi.fn(), + delete: vi.fn(), + }; +} + +function mockTools(): ChatToolDispatcher { + return { listTools: vi.fn(async () => []), callTool: vi.fn(async () => ({ ok: true })) }; +} + +function emptyAdapterRegistry(): LlmAdapterRegistry { + return { + get: () => { throw new Error('adapter should not be used for kind=virtual'); }, + } as unknown as LlmAdapterRegistry; +} + +function mockVirtualLlms(opts: { + reply?: string; + rejectWith?: Error; + streamingChunks?: string[]; +}): IVirtualLlmService { + const enqueueInferTask = vi.fn(async (_name: string, _body: unknown, streaming: boolean) => { + if (opts.rejectWith !== undefined) { + return { + taskId: 't-1', + done: Promise.reject(opts.rejectWith), + onChunk: () => () => undefined, + }; + } + if (!streaming) { + const body = { + choices: [{ message: { content: opts.reply ?? 'hi from relay' }, finish_reason: 'stop' }], + }; + return { + taskId: 't-1', + done: Promise.resolve({ status: 200, body }), + onChunk: () => () => undefined, + }; + } + // Streaming path: collect subscribers, push the configured chunks + // synchronously, then resolve done. + const subs = new Set<(c: { data: string; done?: boolean }) => void>(); + const chunks = opts.streamingChunks ?? ['{"choices":[{"delta":{"content":"hi from relay"}}]}']; + return { + taskId: 't-1', + done: (async (): Promise<{ status: number; body: unknown }> => { + // Wait long enough for the caller to register subscribers + // before fanning chunks. Promise.resolve() isn't enough — the + // microtask running this IIFE is queued ahead of the caller's + // await on enqueueInferTask, so subs would still be empty. + await new Promise((r) => setTimeout(r, 0)); + for (const c of chunks) for (const s of subs) s({ data: c }); + for (const s of subs) s({ data: '[DONE]', done: true }); + return { status: 200, body: null }; + })(), + onChunk: (cb): (() => void) => { subs.add(cb); return () => subs.delete(cb); }, + }; + }); + return { + register: vi.fn(), + heartbeat: vi.fn(), + bindSession: vi.fn(), + unbindSession: vi.fn(), + enqueueInferTask: enqueueInferTask as unknown as IVirtualLlmService['enqueueInferTask'], + completeTask: vi.fn(), + pushTaskChunk: vi.fn(), + failTask: vi.fn(), + gcSweep: vi.fn(), + }; +} + +describe('ChatService — kind=virtual branch (v3 Stage 1)', () => { + it('non-streaming relays through VirtualLlmService.enqueueInferTask', async () => { + const chatRepo = mockChatRepo(); + const virtual = mockVirtualLlms({ reply: 'hello back from local' }); + const svc = new ChatService( + mockAgents(), + mockLlmsVirtual(), + emptyAdapterRegistry(), + chatRepo, + mockPromptRepo(), + mockTools(), + undefined, + virtual, + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('hello back from local'); + expect(virtual.enqueueInferTask).toHaveBeenCalledWith( + 'vllm-local', + expect.objectContaining({ messages: expect.any(Array) }), + false, + ); + }); + + it('streaming relays through VirtualLlmService and emits the same text deltas', async () => { + const chatRepo = mockChatRepo(); + const virtual = mockVirtualLlms({ + streamingChunks: [ + '{"choices":[{"delta":{"content":"hello "}}]}', + '{"choices":[{"delta":{"content":"world"}}]}', + ], + }); + const svc = new ChatService( + mockAgents(), + mockLlmsVirtual(), + emptyAdapterRegistry(), + chatRepo, + mockPromptRepo(), + mockTools(), + undefined, + virtual, + ); + const deltas: string[] = []; + for await (const evt of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) { + if (evt.type === 'text') deltas.push(evt.delta); + if (evt.type === 'final') break; + } + expect(deltas.join('')).toBe('hello world'); + expect(virtual.enqueueInferTask).toHaveBeenCalledWith( + 'vllm-local', + expect.objectContaining({ messages: expect.any(Array), stream: true }), + true, + ); + }); + + it('non-streaming throws a clear error when virtualLlms isn\'t wired but the row is virtual', async () => { + const svc = new ChatService( + mockAgents(), + mockLlmsVirtual(), + emptyAdapterRegistry(), + mockChatRepo(), + mockPromptRepo(), + mockTools(), + // no personalities, no virtualLlms + ); + await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) + .rejects.toThrow(/virtualLlms dispatcher not wired/); + }); + + it('non-streaming surfaces the relay\'s rejection (e.g. publisher offline) up to the caller', async () => { + const virtual = mockVirtualLlms({ rejectWith: Object.assign(new Error('publisher offline'), { statusCode: 503 }) }); + const svc = new ChatService( + mockAgents(), + mockLlmsVirtual(), + emptyAdapterRegistry(), + mockChatRepo(), + mockPromptRepo(), + mockTools(), + undefined, + virtual, + ); + await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) + .rejects.toThrow(/publisher offline/); + }); +}); diff --git a/src/mcpd/tests/chat-service.test.ts b/src/mcpd/tests/chat-service.test.ts index fe62c3c..3eb205a 100644 --- a/src/mcpd/tests/chat-service.test.ts +++ b/src/mcpd/tests/chat-service.test.ts @@ -118,12 +118,16 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } | } as unknown as AgentService; } -function mockLlms(): LlmService { +function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService { return { getByName: vi.fn(async (name: string) => ({ id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking', url: '', tier: 'fast', description: '', apiKeyRef: null, extraConfig: {}, + kind: opts.kind ?? 'public', + status: 'active', + lastHeartbeatAt: null, + inactiveSince: null, version: 1, createdAt: NOW, updatedAt: NOW, })), resolveApiKey: vi.fn(async () => 'fake-key'), -- 2.49.1 From c7b1bd8e2c998118c6b38d6466b0ef8e8d3f4a21 Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 17:03:59 +0100 Subject: [PATCH 2/5] feat(mcpd): AgentService virtual methods + GC cascade (v3 Stage 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit State machine for kind=virtual Agent rows. Mirrors what VirtualLlmService did for Llms in v1, then wires both lifecycles together so disconnect/heartbeat/GC cascade through both at once. AgentRepository: - create/update accept the new lifecycle fields (kind, providerSessionId, status, lastHeartbeatAt, inactiveSince). - Adds findBySessionId, findByLlmId, findStaleVirtuals, findExpiredInactives. AgentService — new virtual-agent methods: - registerVirtualAgents(sessionId, inputs, ownerId) — sticky upsert. New names insert as kind=virtual/status=active. Existing virtuals owned by the same session reactivate; existing inactive virtuals from a foreign session can be adopted (sticky reconnect). Refuses to overwrite a public agent or a foreign session's still-active virtual (HTTP 409). Pinned LLM is resolved via LlmService — caller posts Llms first. - heartbeatVirtualAgents(sessionId) — bumps owned agents on a session heartbeat; revives inactive rows. - markVirtualAgentsInactiveBySession(sessionId) — disconnect cascade. - deleteVirtualAgentsForLlm(llmId) — defensive cascade for the GC's Llm-delete step (Agent.llmId is Restrict). - gcSweepVirtualAgents() — same shape as VirtualLlmService.gcSweep (90s heartbeat-stale → inactive, 4h inactive → delete). VirtualLlmService: - Optional AgentService dependency. heartbeat() now also bumps owned agents; unbindSession() flips them inactive. gcSweep() runs the agent sweep FIRST (so any agent that would block an Llm delete via Restrict is already gone), and adds a defensive deleteVirtualAgentsForLlm step right before each Llm delete in case an agent's heartbeat lagged its Llm's just enough to escape this round's 4h cutoff. main.ts: - VirtualLlmService construction moves below AgentService so it can receive the cascade dependency. Tests: 13 new in virtual-agent-service.test.ts cover all the register variants (insert, sticky reconnect, adopt-inactive-foreign, refuse public-overwrite, refuse foreign-session-active), heartbeat-revive, disconnect-cascade, deleteVirtualAgentsForLlm scope, GC sweep flip + delete + idempotence, and three VirtualLlmService cascade scenarios (unbindSession, gcSweep deleting agent before Llm, defensive cascade when agent's heartbeat lagged). mcpd suite: 854/854 (was 841 + 13 new). Workspace unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/mcpd/src/main.ts | 10 +- src/mcpd/src/repositories/agent.repository.ts | 66 ++- src/mcpd/src/services/agent.service.ts | 168 ++++++++ src/mcpd/src/services/virtual-llm.service.ts | 39 +- src/mcpd/tests/virtual-agent-service.test.ts | 376 ++++++++++++++++++ 5 files changed, 652 insertions(+), 7 deletions(-) create mode 100644 src/mcpd/tests/virtual-agent-service.test.ts diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index 9814b74..cb65a87 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -435,10 +435,8 @@ async function main(): Promise { adapters: llmAdapters, log: { warn: (msg) => app.log.warn(msg) }, }); - // Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker - // is started below after `app.listen` so it doesn't fire before the - // server is accepting traffic. - const virtualLlmService = new VirtualLlmService(llmRepo); + // VirtualLlmService is constructed lower down (after AgentService) so + // it can wire the agent-cascade callbacks introduced in v3 Stage 2. // AgentService + ChatService get fully wired below once projectService and // mcpProxyService are constructed (ChatService needs them via the // ChatToolDispatcher bridge). @@ -465,6 +463,10 @@ async function main(): Promise { const personalityRepo = new PersonalityRepository(prisma); const personalityService = new PersonalityService(personalityRepo, agentRepo, promptRepo); const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo); + // Virtual-provider state machine (kind=virtual rows for both Llms and + // Agents). v3 wires AgentService for heartbeat/disconnect/GC cascade. + // The 60-s GC ticker is started below after `app.listen`. + const virtualLlmService = new VirtualLlmService(llmRepo, agentService); // ChatService needs the proxy + project repo via the ChatToolDispatcher // bridge. The dispatcher's logger references `app.log`, which is not // constructed until further down — `chatService` itself is built right diff --git a/src/mcpd/src/repositories/agent.repository.ts b/src/mcpd/src/repositories/agent.repository.ts index fb5c597..9f83541 100644 --- a/src/mcpd/src/repositories/agent.repository.ts +++ b/src/mcpd/src/repositories/agent.repository.ts @@ -1,4 +1,4 @@ -import type { PrismaClient, Agent, Prisma } from '@prisma/client'; +import type { PrismaClient, Agent, Prisma, LlmKind, LlmStatus } from '@prisma/client'; export interface CreateAgentRepoInput { name: string; @@ -11,6 +11,12 @@ export interface CreateAgentRepoInput { defaultParams?: Record; extras?: Record; ownerId: string; + // Virtual-agent lifecycle (omit for kind=public). + kind?: LlmKind; + providerSessionId?: string | null; + status?: LlmStatus; + lastHeartbeatAt?: Date | null; + inactiveSince?: Date | null; } export interface UpdateAgentRepoInput { @@ -22,6 +28,13 @@ export interface UpdateAgentRepoInput { proxyModelName?: string | null; defaultParams?: Record; extras?: Record; + // Virtual-agent lifecycle. AgentService is the only public writer; the + // VirtualAgentService methods (Stage 2) bypass the public CRUD path. + kind?: LlmKind; + providerSessionId?: string | null; + status?: LlmStatus; + lastHeartbeatAt?: Date | null; + inactiveSince?: Date | null; } export interface IAgentRepository { @@ -32,6 +45,11 @@ export interface IAgentRepository { create(data: CreateAgentRepoInput): Promise; update(id: string, data: UpdateAgentRepoInput): Promise; delete(id: string): Promise; + // Virtual-agent lifecycle helpers. + findBySessionId(sessionId: string): Promise; + findByLlmId(llmId: string): Promise; + findStaleVirtuals(heartbeatCutoff: Date): Promise; + findExpiredInactives(deletionCutoff: Date): Promise; } export class AgentRepository implements IAgentRepository { @@ -69,6 +87,11 @@ export class AgentRepository implements IAgentRepository { defaultParams: (data.defaultParams ?? {}) as Prisma.InputJsonValue, extras: (data.extras ?? {}) as Prisma.InputJsonValue, ownerId: data.ownerId, + ...(data.kind !== undefined ? { kind: data.kind } : {}), + ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), + ...(data.status !== undefined ? { status: data.status } : {}), + ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}), + ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}), }, }); } @@ -99,6 +122,11 @@ export class AgentRepository implements IAgentRepository { if (data.extras !== undefined) { updateData.extras = data.extras as Prisma.InputJsonValue; } + if (data.kind !== undefined) updateData.kind = data.kind; + if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId; + if (data.status !== undefined) updateData.status = data.status; + if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt; + if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince; // Bump optimistic version on every update. updateData.version = { increment: 1 }; return this.prisma.agent.update({ where: { id }, data: updateData }); @@ -107,4 +135,40 @@ export class AgentRepository implements IAgentRepository { async delete(id: string): Promise { await this.prisma.agent.delete({ where: { id } }); } + + // ── Virtual-agent lifecycle queries ── + + async findBySessionId(sessionId: string): Promise { + return this.prisma.agent.findMany({ + where: { providerSessionId: sessionId }, + orderBy: { name: 'asc' }, + }); + } + + async findByLlmId(llmId: string): Promise { + return this.prisma.agent.findMany({ + where: { llmId }, + orderBy: { name: 'asc' }, + }); + } + + async findStaleVirtuals(heartbeatCutoff: Date): Promise { + return this.prisma.agent.findMany({ + where: { + kind: 'virtual', + status: 'active', + lastHeartbeatAt: { lt: heartbeatCutoff }, + }, + }); + } + + async findExpiredInactives(deletionCutoff: Date): Promise { + return this.prisma.agent.findMany({ + where: { + kind: 'virtual', + status: 'inactive', + inactiveSince: { lt: deletionCutoff }, + }, + }); + } } diff --git a/src/mcpd/src/services/agent.service.ts b/src/mcpd/src/services/agent.service.ts index 0be9e5f..caadcf2 100644 --- a/src/mcpd/src/services/agent.service.ts +++ b/src/mcpd/src/services/agent.service.ts @@ -33,12 +33,28 @@ export interface AgentView { proxyModelName: string | null; defaultParams: AgentChatParams; extras: Record; + // Virtual-agent lifecycle (defaults make public agents look like "active"). + kind: 'public' | 'virtual'; + status: 'active' | 'inactive' | 'hibernating'; + lastHeartbeatAt: Date | null; + inactiveSince: Date | null; ownerId: string; version: number; createdAt: Date; updatedAt: Date; } +/** Input shape mcplocal sends per virtual agent on register. */ +export interface VirtualAgentInput { + name: string; + llmName: string; + description?: string; + systemPrompt?: string; + project?: string; + defaultParams?: Record; + extras?: Record; +} + export class AgentService { constructor( private readonly repo: IAgentRepository, @@ -179,10 +195,162 @@ export class AgentService { proxyModelName: row.proxyModelName, defaultParams: row.defaultParams as AgentChatParams, extras: row.extras as Record, + kind: row.kind, + status: row.status, + lastHeartbeatAt: row.lastHeartbeatAt, + inactiveSince: row.inactiveSince, ownerId: row.ownerId, version: row.version, createdAt: row.createdAt, updatedAt: row.updatedAt, }; } + + // ── Virtual-agent lifecycle (v3) ── + + /** + * Sticky upsert of virtual agents owned by a publishing mcplocal session. + * Mirrors VirtualLlmService.register's semantics: + * - New agents → insert with kind=virtual / status=active. + * - Existing virtual agents owned by the same session → update + reactivate. + * - Existing virtual agents owned by a different session, but currently + * inactive → adopt (sticky reconnect after a session lapse). + * - Existing public agents OR foreign-active virtuals → 409 Conflict. + * - Pinned LLM must already exist (publisher posts Llms first in the same + * register payload). + */ + async registerVirtualAgents( + sessionId: string, + inputs: VirtualAgentInput[], + ownerId: string, + ): Promise { + const now = new Date(); + const out: AgentView[] = []; + for (const a of inputs) { + const llm = await this.llms.getByName(a.llmName); + const projectId = a.project !== undefined + ? (await this.projects.resolveAndGet(a.project)).id + : null; + const existing = await this.repo.findByName(a.name); + if (existing !== null) { + if (existing.kind === 'public') { + throw Object.assign( + new Error(`Cannot publish over public Agent: ${a.name}`), + { statusCode: 409 }, + ); + } + if (existing.providerSessionId !== sessionId && existing.status === 'active') { + throw Object.assign( + new Error(`Virtual Agent '${a.name}' is already active under a different session`), + { statusCode: 409 }, + ); + } + const updated = await this.repo.update(existing.id, { + ...(a.description !== undefined ? { description: a.description } : {}), + ...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}), + llmId: llm.id, + projectId, + ...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}), + ...(a.extras !== undefined ? { extras: a.extras } : {}), + kind: 'virtual', + providerSessionId: sessionId, + status: 'active', + lastHeartbeatAt: now, + inactiveSince: null, + }); + out.push(await this.toView(updated)); + continue; + } + const created = await this.repo.create({ + name: a.name, + ...(a.description !== undefined ? { description: a.description } : {}), + ...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}), + llmId: llm.id, + projectId, + ...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}), + ...(a.extras !== undefined ? { extras: a.extras } : {}), + kind: 'virtual', + providerSessionId: sessionId, + status: 'active', + lastHeartbeatAt: now, + ownerId, + }); + out.push(await this.toView(created)); + } + return out; + } + + /** + * Bumps lastHeartbeatAt on every virtual agent owned by the session. + * Revives inactive rows. Called from VirtualLlmService.heartbeat so + * one publisher heartbeat covers both Llms and Agents. + */ + async heartbeatVirtualAgents(sessionId: string): Promise { + const owned = await this.repo.findBySessionId(sessionId); + if (owned.length === 0) return; + const now = new Date(); + for (const row of owned) { + await this.repo.update(row.id, { + lastHeartbeatAt: now, + ...(row.status === 'inactive' ? { status: 'active', inactiveSince: null } : {}), + }); + } + } + + /** Flip every virtual agent owned by the session to inactive immediately. */ + async markVirtualAgentsInactiveBySession(sessionId: string): Promise { + const owned = await this.repo.findBySessionId(sessionId); + const now = new Date(); + for (const row of owned) { + if (row.status === 'active') { + await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); + } + } + } + + /** + * Cascade-delete virtual agents pinned to a virtual Llm. Called from + * VirtualLlmService.gcSweep BEFORE deleting the inactive Llm row, since + * Agent.llmId is `onDelete: Restrict` and would otherwise block the + * Llm delete. + */ + async deleteVirtualAgentsForLlm(llmId: string): Promise { + const pinned = await this.repo.findByLlmId(llmId); + let deleted = 0; + for (const row of pinned) { + if (row.kind !== 'virtual') continue; + await this.repo.delete(row.id); + deleted += 1; + } + return deleted; + } + + /** + * GC sweep for virtual agents — same shape as VirtualLlmService.gcSweep: + * 1. Heartbeat-stale active virtuals → inactive (90-s cutoff). + * 2. 4-h-old inactive virtuals → delete. + * Run BEFORE the LlmService GC sweep so any agent that would have + * blocked an Llm delete via Restrict has already been cleared. + */ + async gcSweepVirtualAgents(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> { + const HEARTBEAT_TIMEOUT_MS = 90_000; + const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000; + let markedInactive = 0; + let deleted = 0; + + const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS); + const stale = await this.repo.findStaleVirtuals(heartbeatCutoff); + for (const row of stale) { + await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); + markedInactive += 1; + } + + const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS); + const expired = await this.repo.findExpiredInactives(deletionCutoff); + for (const row of expired) { + await this.repo.delete(row.id); + deleted += 1; + } + return { markedInactive, deleted }; + } } diff --git a/src/mcpd/src/services/virtual-llm.service.ts b/src/mcpd/src/services/virtual-llm.service.ts index af12bfa..fe70e9f 100644 --- a/src/mcpd/src/services/virtual-llm.service.ts +++ b/src/mcpd/src/services/virtual-llm.service.ts @@ -28,6 +28,7 @@ import { randomUUID } from 'node:crypto'; import type { ILlmRepository } from '../repositories/llm.repository.js'; import type { OpenAiChatRequest } from './llm/types.js'; import { NotFoundError } from './mcp-server.service.js'; +import type { AgentService } from './agent.service.js'; /** A virtual provider's announcement at registration time. */ export interface RegisterProviderInput { @@ -119,7 +120,16 @@ export class VirtualLlmService implements IVirtualLlmService { */ private readonly wakeInFlight = new Map>(); - constructor(private readonly repo: ILlmRepository) {} + constructor( + private readonly repo: ILlmRepository, + /** + * Optional. v3 wires AgentService here so the lifecycle cascades: + * heartbeat → bump owned agents; disconnect → mark agents inactive; + * gcSweep → sweep agents first, then delete pinned-to-Llm cascade + * before deleting the Llm itself (Agent.llmId is Restrict). + */ + private readonly agents?: AgentService, + ) {} async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise { const sessionId = input.providerSessionId ?? randomUUID(); @@ -184,7 +194,6 @@ export class VirtualLlmService implements IVirtualLlmService { async heartbeat(providerSessionId: string): Promise { const owned = await this.repo.findBySessionId(providerSessionId); - if (owned.length === 0) return; const now = new Date(); for (const row of owned) { // Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a @@ -196,6 +205,11 @@ export class VirtualLlmService implements IVirtualLlmService { : {}), }); } + // Cascade to virtual agents owned by the same session — same heartbeat + // covers them. No-op if AgentService isn't wired (older test configs). + if (this.agents !== undefined) { + await this.agents.heartbeatVirtualAgents(providerSessionId); + } } bindSession(providerSessionId: string, handle: VirtualSessionHandle): void { @@ -214,6 +228,10 @@ export class VirtualLlmService implements IVirtualLlmService { await this.repo.update(row.id, { status: 'inactive', inactiveSince: now }); } } + // Cascade to virtual agents owned by the same session. + if (this.agents !== undefined) { + await this.agents.markVirtualAgentsInactiveBySession(providerSessionId); + } // Reject any in-flight tasks for this session — the relay can't deliver // a result POST anymore. for (const t of this.tasksById.values()) { @@ -405,6 +423,16 @@ export class VirtualLlmService implements IVirtualLlmService { let markedInactive = 0; let deleted = 0; + // v3: sweep virtual agents FIRST so any Llm-pinned agent that's + // about to be cascaded (because its Llm is also expiring) is gone + // before we attempt to delete the Llm. Agent.llmId is Restrict and + // would otherwise block. + if (this.agents !== undefined) { + const agentSweep = await this.agents.gcSweepVirtualAgents(now); + markedInactive += agentSweep.markedInactive; + deleted += agentSweep.deleted; + } + const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS); const stale = await this.repo.findStaleVirtuals(heartbeatCutoff); for (const row of stale) { @@ -415,6 +443,13 @@ export class VirtualLlmService implements IVirtualLlmService { const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS); const expired = await this.repo.findExpiredInactives(deletionCutoff); for (const row of expired) { + // Final defensive cascade: drop any virtual agents still pinned + // to this Llm (e.g. their lastHeartbeatAt happens to lag the + // Llm's by a few seconds and they didn't make this round's + // 4-h cutoff). Without this we'd hit a Restrict FK error. + if (this.agents !== undefined) { + await this.agents.deleteVirtualAgentsForLlm(row.id); + } await this.repo.delete(row.id); deleted += 1; } diff --git a/src/mcpd/tests/virtual-agent-service.test.ts b/src/mcpd/tests/virtual-agent-service.test.ts new file mode 100644 index 0000000..d5c8989 --- /dev/null +++ b/src/mcpd/tests/virtual-agent-service.test.ts @@ -0,0 +1,376 @@ +import { describe, it, expect, vi } from 'vitest'; +import { AgentService, type VirtualAgentInput } from '../src/services/agent.service.js'; +import { VirtualLlmService } from '../src/services/virtual-llm.service.js'; +import type { IAgentRepository } from '../src/repositories/agent.repository.js'; +import type { ILlmRepository } from '../src/repositories/llm.repository.js'; +import type { LlmService } from '../src/services/llm.service.js'; +import type { ProjectService } from '../src/services/project.service.js'; +import type { Agent, Llm } from '@prisma/client'; + +/** + * v3 Stage 2 — virtual-agent lifecycle methods on AgentService and the + * cascade callbacks wired into VirtualLlmService.gcSweep / heartbeat / + * unbindSession. Mirrors the shape of virtual-llm-service.test.ts but + * focused on the agent-side state machine + the Llm→Agent cascade. + */ + +const NOW = new Date(); + +function makeAgent(overrides: Partial = {}): Agent { + return { + id: `agent-${Math.random().toString(36).slice(2, 8)}`, + name: 'fake-agent', + description: '', + systemPrompt: '', + llmId: 'llm-1', + projectId: null, + defaultPersonalityId: null, + proxyModelName: null, + defaultParams: {} as Agent['defaultParams'], + extras: {} as Agent['extras'], + kind: 'virtual', + providerSessionId: 'sess-1', + lastHeartbeatAt: NOW, + status: 'active', + inactiveSince: null, + ownerId: 'owner-1', + version: 1, + createdAt: NOW, + updatedAt: NOW, + ...overrides, + }; +} + +function makeLlm(overrides: Partial = {}): Llm { + return { + id: `llm-${Math.random().toString(36).slice(2, 8)}`, + name: 'vllm-local', + type: 'openai', + model: 'm', + url: '', + tier: 'fast', + description: '', + apiKeySecretId: null, + apiKeySecretKey: null, + extraConfig: {} as Llm['extraConfig'], + kind: 'virtual', + providerSessionId: 'sess-1', + lastHeartbeatAt: NOW, + status: 'active', + inactiveSince: null, + version: 1, + createdAt: NOW, + updatedAt: NOW, + ...overrides, + }; +} + +function mockAgentRepo(initial: Agent[] = []): IAgentRepository { + const rows = new Map(initial.map((r) => [r.id, r])); + let counter = rows.size; + return { + findAll: vi.fn(async () => [...rows.values()]), + findById: vi.fn(async (id: string) => rows.get(id) ?? null), + findByName: vi.fn(async (name: string) => { + for (const r of rows.values()) if (r.name === name) return r; + return null; + }), + findByProjectId: vi.fn(async () => []), + findBySessionId: vi.fn(async (sid: string) => + [...rows.values()].filter((r) => r.providerSessionId === sid)), + findByLlmId: vi.fn(async (llmId: string) => + [...rows.values()].filter((r) => r.llmId === llmId)), + findStaleVirtuals: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((r) => + r.kind === 'virtual' + && r.status === 'active' + && r.lastHeartbeatAt !== null + && r.lastHeartbeatAt < cutoff)), + findExpiredInactives: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((r) => + r.kind === 'virtual' + && r.status === 'inactive' + && r.inactiveSince !== null + && r.inactiveSince < cutoff)), + create: vi.fn(async (data) => { + counter += 1; + const row = makeAgent({ + id: `agent-${String(counter)}`, + name: data.name, + description: data.description ?? '', + systemPrompt: data.systemPrompt ?? '', + llmId: data.llmId, + projectId: data.projectId ?? null, + kind: data.kind ?? 'public', + providerSessionId: data.providerSessionId ?? null, + status: data.status ?? 'active', + lastHeartbeatAt: data.lastHeartbeatAt ?? null, + inactiveSince: data.inactiveSince ?? null, + ownerId: data.ownerId, + }); + rows.set(row.id, row); + return row; + }), + update: vi.fn(async (id, data) => { + const existing = rows.get(id); + if (!existing) throw new Error('not found'); + const next: Agent = { + ...existing, + ...(data.description !== undefined ? { description: data.description } : {}), + ...(data.systemPrompt !== undefined ? { systemPrompt: data.systemPrompt } : {}), + ...(data.llmId !== undefined ? { llmId: data.llmId } : {}), + ...(data.projectId !== undefined ? { projectId: data.projectId } : {}), + ...(data.kind !== undefined ? { kind: data.kind } : {}), + ...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}), + ...(data.status !== undefined ? { status: data.status } : {}), + ...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}), + ...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}), + }; + rows.set(id, next); + return next; + }), + delete: vi.fn(async (id: string) => { rows.delete(id); }), + }; +} + +function mockLlms(): LlmService { + return { + getById: vi.fn(async (id: string) => ({ id, name: 'vllm-local', type: 'openai', model: 'm', kind: 'virtual', status: 'active' })), + getByName: vi.fn(async (name: string) => ({ id: 'llm-1', name, type: 'openai', model: 'm', kind: 'virtual', status: 'active' })), + } as unknown as LlmService; +} + +function mockProjects(): ProjectService { + return { + getById: vi.fn(async (id: string) => ({ id, name: 'mcpctl-dev' })), + resolveAndGet: vi.fn(async (idOrName: string) => ({ + id: idOrName === 'mcpctl-dev' ? 'proj-1' : 'proj-other', + name: idOrName, + })), + } as unknown as ProjectService; +} + +describe('AgentService — virtual-agent lifecycle (v3 Stage 2)', () => { + it('registerVirtualAgents inserts new rows with kind=virtual / status=active', async () => { + const repo = mockAgentRepo(); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + const inputs: VirtualAgentInput[] = [ + { name: 'local-coder', llmName: 'vllm-local', description: 'd', systemPrompt: 's' }, + ]; + const out = await svc.registerVirtualAgents('sess-1', inputs, 'owner-1'); + expect(out).toHaveLength(1); + expect(out[0]!.kind).toBe('virtual'); + expect(out[0]!.status).toBe('active'); + }); + + it('registerVirtualAgents reuses an existing row from the same session (sticky reconnect)', async () => { + const existing = makeAgent({ name: 'local-coder', providerSessionId: 'sess-1', status: 'inactive', inactiveSince: NOW }); + const repo = mockAgentRepo([existing]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + const out = await svc.registerVirtualAgents( + 'sess-1', + [{ name: 'local-coder', llmName: 'vllm-local' }], + 'owner-1', + ); + expect(out[0]!.id).toBe(existing.id); + expect(out[0]!.status).toBe('active'); + }); + + it('registerVirtualAgents adopts an inactive virtual from a different session', async () => { + const existing = makeAgent({ + name: 'local-coder', providerSessionId: 'old-session', + status: 'inactive', inactiveSince: NOW, + }); + const repo = mockAgentRepo([existing]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + const out = await svc.registerVirtualAgents( + 'new-session', + [{ name: 'local-coder', llmName: 'vllm-local' }], + 'owner-1', + ); + expect(out[0]!.id).toBe(existing.id); + expect(out[0]!.status).toBe('active'); + }); + + it('registerVirtualAgents refuses to overwrite a public agent (409)', async () => { + const repo = mockAgentRepo([makeAgent({ name: 'reviewer', kind: 'public', providerSessionId: null })]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + await expect(svc.registerVirtualAgents( + 'sess-x', + [{ name: 'reviewer', llmName: 'vllm-local' }], + 'owner-1', + )).rejects.toThrow(/Cannot publish over public Agent/); + }); + + it('registerVirtualAgents refuses if another active session owns the name', async () => { + const repo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'other', status: 'active' })]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + await expect(svc.registerVirtualAgents( + 'mine', + [{ name: 'local-coder', llmName: 'vllm-local' }], + 'owner-1', + )).rejects.toThrow(/already active under a different session/); + }); + + it('heartbeatVirtualAgents bumps + revives inactive', async () => { + const past = new Date(Date.now() - 5_000); + const a = makeAgent({ name: 'a', providerSessionId: 'sess', status: 'inactive', lastHeartbeatAt: past, inactiveSince: past }); + const repo = mockAgentRepo([a]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + await svc.heartbeatVirtualAgents('sess'); + const row = await repo.findByName('a'); + expect(row?.status).toBe('active'); + expect(row?.inactiveSince).toBeNull(); + expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime()); + }); + + it('markVirtualAgentsInactiveBySession flips owned actives to inactive', async () => { + const repo = mockAgentRepo([ + makeAgent({ name: 'a', providerSessionId: 'sess' }), + makeAgent({ name: 'b', providerSessionId: 'sess' }), + makeAgent({ name: 'c', providerSessionId: 'other' }), + ]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + await svc.markVirtualAgentsInactiveBySession('sess'); + expect((await repo.findByName('a'))?.status).toBe('inactive'); + expect((await repo.findByName('b'))?.status).toBe('inactive'); + expect((await repo.findByName('c'))?.status).toBe('active'); + }); + + it('deleteVirtualAgentsForLlm deletes only virtuals pinned to that Llm', async () => { + const repo = mockAgentRepo([ + makeAgent({ name: 'v-1', llmId: 'doomed', kind: 'virtual' }), + makeAgent({ name: 'v-2', llmId: 'doomed', kind: 'virtual' }), + makeAgent({ name: 'pub-1', llmId: 'doomed', kind: 'public', providerSessionId: null }), + makeAgent({ name: 'v-other', llmId: 'safe', kind: 'virtual' }), + ]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + const deleted = await svc.deleteVirtualAgentsForLlm('doomed'); + expect(deleted).toBe(2); + expect(await repo.findByName('v-1')).toBeNull(); + expect(await repo.findByName('v-2')).toBeNull(); + expect(await repo.findByName('pub-1')).not.toBeNull(); + expect(await repo.findByName('v-other')).not.toBeNull(); + }); + + it('gcSweepVirtualAgents flips heartbeat-stale + deletes 4h-old inactive', async () => { + const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago, past 90s cutoff + const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago, past 4h cutoff + const repo = mockAgentRepo([ + makeAgent({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }), + makeAgent({ name: 'old', providerSessionId: 'b', status: 'inactive', inactiveSince: ancient }), + makeAgent({ name: 'pub', providerSessionId: null, kind: 'public' }), + ]); + const svc = new AgentService(repo, mockLlms(), mockProjects()); + const r = await svc.gcSweepVirtualAgents(); + expect(r.markedInactive).toBe(1); + expect(r.deleted).toBe(1); + expect((await repo.findByName('stale'))?.status).toBe('inactive'); + expect(await repo.findByName('old')).toBeNull(); + expect(await repo.findByName('pub')).not.toBeNull(); + }); +}); + +describe('VirtualLlmService cascade through AgentService (v3 Stage 2)', () => { + function mockLlmRepo(initial: Llm[] = []): ILlmRepository { + const rows = new Map(initial.map((r) => [r.id, r])); + let counter = rows.size; + return { + findAll: vi.fn(async () => [...rows.values()]), + findById: vi.fn(async (id: string) => rows.get(id) ?? null), + findByName: vi.fn(async (name: string) => { + for (const r of rows.values()) if (r.name === name) return r; + return null; + }), + findByTier: vi.fn(async () => []), + findBySessionId: vi.fn(async (sid: string) => + [...rows.values()].filter((r) => r.providerSessionId === sid)), + findStaleVirtuals: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((r) => + r.kind === 'virtual' + && r.status === 'active' + && r.lastHeartbeatAt !== null + && r.lastHeartbeatAt < cutoff)), + findExpiredInactives: vi.fn(async (cutoff: Date) => + [...rows.values()].filter((r) => + r.kind === 'virtual' + && r.status === 'inactive' + && r.inactiveSince !== null + && r.inactiveSince < cutoff)), + create: vi.fn(async (data) => { + counter += 1; + const row = makeLlm({ id: `llm-${String(counter)}`, name: data.name, type: data.type }); + rows.set(row.id, row); + return row; + }), + update: vi.fn(async (id, data) => { + const existing = rows.get(id); + if (!existing) throw new Error('not found'); + const next: Llm = { ...existing, ...data } as Llm; + rows.set(id, next); + return next; + }), + delete: vi.fn(async (id: string) => { rows.delete(id); }), + }; + } + + it('unbindSession cascades to mark virtual agents inactive', async () => { + const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]); + const agentRepo = mockAgentRepo([ + makeAgent({ name: 'local-coder', providerSessionId: 'sess' }), + ]); + const agents = new AgentService(agentRepo, mockLlms(), mockProjects()); + const svc = new VirtualLlmService(llmRepo, agents); + await svc.unbindSession('sess'); + expect((await agentRepo.findByName('local-coder'))?.status).toBe('inactive'); + }); + + it('gcSweep deletes virtual agents BEFORE their pinned virtual Llm', async () => { + const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); + const llmRepo = mockLlmRepo([makeLlm({ + id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess', + status: 'inactive', inactiveSince: ancient, + })]); + const agentRepo = mockAgentRepo([ + makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: ancient }), + ]); + const agents = new AgentService(agentRepo, mockLlms(), mockProjects()); + const svc = new VirtualLlmService(llmRepo, agents); + const r = await svc.gcSweep(); + expect(r.deleted).toBeGreaterThanOrEqual(2); // 1 agent + 1 llm + expect(await llmRepo.findByName('vllm-local')).toBeNull(); + expect(await agentRepo.findByName('pinned')).toBeNull(); + }); + + it('gcSweep defensive cascade: still drops the agent when its heartbeat lagged the Llm', async () => { + // The Llm is past the 4h cutoff. The agent is inactive but only + // 1h old — wouldn't be GC'd by gcSweepVirtualAgents on its own. + // The defensive cascade in gcSweep deletes it anyway because the + // Restrict FK would otherwise block the Llm delete. + const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); + const recent = new Date(Date.now() - 1 * 60 * 60 * 1000); + const llmRepo = mockLlmRepo([makeLlm({ + id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess', + status: 'inactive', inactiveSince: ancient, + })]); + const agentRepo = mockAgentRepo([ + makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: recent }), + ]); + const agents = new AgentService(agentRepo, mockLlms(), mockProjects()); + const svc = new VirtualLlmService(llmRepo, agents); + await svc.gcSweep(); + expect(await llmRepo.findByName('vllm-local')).toBeNull(); + expect(await agentRepo.findByName('pinned')).toBeNull(); + }); + + it('heartbeat cascades to bump owned virtual agents', async () => { + const past = new Date(Date.now() - 10_000); + const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', lastHeartbeatAt: past })]); + const agentRepo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'sess', lastHeartbeatAt: past })]); + const agents = new AgentService(agentRepo, mockLlms(), mockProjects()); + const svc = new VirtualLlmService(llmRepo, agents); + await svc.heartbeat('sess'); + const a = await agentRepo.findByName('local-coder'); + expect(a!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime()); + }); +}); -- 2.49.1 From 58bc27724222977470c6c0740745537de2adcc03 Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 18:38:37 +0100 Subject: [PATCH 3/5] feat(mcpd+mcplocal): register-agents endpoint + mcplocal agents block (v3 Stage 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends the existing `_provider-register` payload with an optional `agents` array so a single round-trip atomically publishes both virtual Llms and their pinned virtual Agents. v1/v2 publishers (providers-only) keep working unchanged — the agents path is gated on the route receiving an AgentService instance, otherwise it logs a warning and ignores the array. mcplocal config gains a top-level `agents` block (loadLocalAgents) mirroring the providers shape. The registrar reads it, builds RegistrarPublishedAgent entries against the published provider names, and folds them into the same register POST. mcpd routes the agents through AgentService.registerVirtualAgents(sessionId, ..., ownerId), which was added in Stage 2. No CLI changes here — `mcpctl chat ` already works once chat.service has the kind=virtual branch (Stage 1) and the agents are present in the Agent table. CLI columns + smoke land in Stage 4. --- src/mcpd/src/main.ts | 2 +- src/mcpd/src/routes/virtual-llms.ts | 54 ++++++++++++++++++++++++- src/mcplocal/src/http/config.ts | 28 +++++++++++++ src/mcplocal/src/main.ts | 37 ++++++++++++++--- src/mcplocal/src/providers/registrar.ts | 32 +++++++++++++++ 5 files changed, 144 insertions(+), 9 deletions(-) diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index cb65a87..e54de89 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -630,7 +630,7 @@ async function main(): Promise { }); }, }); - registerVirtualLlmRoutes(app, virtualLlmService); + registerVirtualLlmRoutes(app, virtualLlmService, agentService); registerInstanceRoutes(app, instanceService); registerProjectRoutes(app, projectService); registerAuditLogRoutes(app, auditLogService); diff --git a/src/mcpd/src/routes/virtual-llms.ts b/src/mcpd/src/routes/virtual-llms.ts index 78a0e67..d92bc8f 100644 --- a/src/mcpd/src/routes/virtual-llms.ts +++ b/src/mcpd/src/routes/virtual-llms.ts @@ -17,6 +17,7 @@ */ import type { FastifyInstance, FastifyReply } from 'fastify'; import type { VirtualLlmService, VirtualSessionHandle, VirtualTaskFrame } from '../services/virtual-llm.service.js'; +import type { AgentService, VirtualAgentInput } from '../services/agent.service.js'; const SSE_PING_MS = 20_000; const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session'; @@ -24,8 +25,15 @@ const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session'; export function registerVirtualLlmRoutes( app: FastifyInstance, service: VirtualLlmService, + /** + * Optional. v3 wires AgentService here so the register endpoint can + * also accept an `agents` array alongside `providers` and atomic-publish + * both. Absent (older test wirings): the route still works for Llm-only + * publishers, agents in the payload are ignored with a warning. + */ + agentService?: AgentService, ): void { - app.post<{ Body: { providerSessionId?: string; providers?: unknown[] } }>( + app.post<{ Body: { providerSessionId?: string; providers?: unknown[]; agents?: unknown[] } }>( '/api/v1/llms/_provider-register', async (request, reply) => { const body = (request.body ?? {}); @@ -34,14 +42,29 @@ export function registerVirtualLlmRoutes( reply.code(400); return { error: '`providers` array is required and must be non-empty' }; } + const agentsInput = Array.isArray(body.agents) ? body.agents : null; try { const result = await service.register({ providerSessionId: body.providerSessionId ?? null, providers: providers.map(coerceProviderInput), }); + // v3: atomically publish virtual agents tied to the same session. + // If the caller didn't include an agents array, skip silently. + let agents: unknown[] = []; + if (agentsInput !== null && agentsInput.length > 0) { + if (agentService === undefined) { + app.log.warn('virtual-llm register received `agents` but AgentService is not wired'); + } else { + agents = await agentService.registerVirtualAgents( + result.providerSessionId, + agentsInput.map(coerceAgentInput), + request.userId ?? 'system', + ); + } + } reply.code(201); - return result; + return { ...result, agents }; } catch (err) { const status = (err as { statusCode?: number }).statusCode ?? 500; reply.code(status); @@ -142,6 +165,33 @@ export function registerVirtualLlmRoutes( ); } +/** Narrow an unknown agents array element into the service's input shape (v3). */ +function coerceAgentInput(raw: unknown): VirtualAgentInput { + if (raw === null || typeof raw !== 'object') { + throw Object.assign(new Error('agent entry must be an object'), { statusCode: 400 }); + } + const o = raw as Record; + const name = o['name']; + const llmName = o['llmName']; + if (typeof name !== 'string' || typeof llmName !== 'string') { + throw Object.assign( + new Error('agent entry requires string `name` and `llmName`'), + { statusCode: 400 }, + ); + } + const out: VirtualAgentInput = { name, llmName }; + if (typeof o['description'] === 'string') out.description = o['description']; + if (typeof o['systemPrompt'] === 'string') out.systemPrompt = o['systemPrompt']; + if (typeof o['project'] === 'string') out.project = o['project']; + if (o['defaultParams'] !== null && typeof o['defaultParams'] === 'object') { + out.defaultParams = o['defaultParams'] as Record; + } + if (o['extras'] !== null && typeof o['extras'] === 'object') { + out.extras = o['extras'] as Record; + } + return out; +} + /** Narrow an unknown providers array element into the service's input shape. */ function coerceProviderInput(raw: unknown): { name: string; diff --git a/src/mcplocal/src/http/config.ts b/src/mcplocal/src/http/config.ts index 1d4a9c3..9e9cd5e 100644 --- a/src/mcplocal/src/http/config.ts +++ b/src/mcplocal/src/http/config.ts @@ -108,8 +108,27 @@ interface LlmMultiFileConfig { providers: LlmProviderFileEntry[]; } +/** + * Local agent declaration (v3). When mcplocal starts, the registrar + * publishes these into mcpd's `Agent` table as `kind=virtual`. They show + * up under `mcpctl get agent` and become chat-able via `mcpctl chat `. + * + * `llm` references a published provider's name from the `llm.providers` + * array — the registrar resolves it server-side. + */ +export interface AgentFileEntry { + name: string; + llm: string; + description?: string; + systemPrompt?: string; + project?: string; + defaultParams?: Record; + extras?: Record; +} + interface McpctlConfig { llm?: LlmFileConfig | LlmMultiFileConfig; + agents?: AgentFileEntry[]; projects?: Record; } @@ -190,6 +209,15 @@ export function loadProjectLlmOverride(projectName: string): ProjectLlmOverride return config.projects?.[projectName]?.llm; } +/** + * Load locally-declared agents from ~/.mcpctl/config.json (v3 virtual + * agents). Returns empty array if no agents block is configured. + */ +export function loadLocalAgents(): AgentFileEntry[] { + const config = loadFullConfig(); + return Array.isArray(config.agents) ? config.agents : []; +} + /** Reset cached config (for testing). */ export function resetConfigCache(): void { cachedConfig = null; diff --git a/src/mcplocal/src/main.ts b/src/mcplocal/src/main.ts index 670ba04..9300a20 100644 --- a/src/mcplocal/src/main.ts +++ b/src/mcplocal/src/main.ts @@ -7,12 +7,12 @@ import { StdioProxyServer } from './server.js'; import { StdioUpstream } from './upstream/stdio.js'; import { HttpUpstream } from './upstream/http.js'; import { createHttpServer } from './http/server.js'; -import { loadHttpConfig, loadLlmProviders } from './http/config.js'; -import type { HttpConfig, LlmProviderFileEntry } from './http/config.js'; +import { loadHttpConfig, loadLlmProviders, loadLocalAgents } from './http/config.js'; +import type { HttpConfig, LlmProviderFileEntry, AgentFileEntry } from './http/config.js'; import { createProvidersFromConfig } from './llm-config.js'; import { createSecretStore } from '@mcpctl/shared'; import type { ProviderRegistry } from './providers/registry.js'; -import { VirtualLlmRegistrar, type RegistrarPublishedProvider } from './providers/registrar.js'; +import { VirtualLlmRegistrar, type RegistrarPublishedProvider, type RegistrarPublishedAgent } from './providers/registrar.js'; import { startWatchers, stopWatchers, reloadStages } from './proxymodel/watcher.js'; import { existsSync, readFileSync as readFileSyncNs } from 'node:fs'; import { homedir } from 'node:os'; @@ -151,7 +151,8 @@ export async function main(argv: string[] = process.argv): Promise { // Virtual-LLM registrar: publish opted-in providers (`publish: true`) // into mcpd's Llm registry. Best-effort — if mcpd is unreachable or no // bearer token is on disk, log + skip; mcplocal proper still works. - const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries); + const localAgents = loadLocalAgents(); + const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries, localAgents); // Graceful shutdown let shuttingDown = false; @@ -198,9 +199,10 @@ if (isMain) { async function maybeStartVirtualLlmRegistrar( providerRegistry: ProviderRegistry, llmEntries: LlmProviderFileEntry[], + localAgents: AgentFileEntry[] = [], ): Promise { const opted = llmEntries.filter((e) => e.publish === true); - if (opted.length === 0) return null; + if (opted.length === 0 && localAgents.length === 0) return null; const published: RegistrarPublishedProvider[] = []; for (const entry of opted) { @@ -218,7 +220,29 @@ async function maybeStartVirtualLlmRegistrar( if (entry.wake !== undefined) item.wake = entry.wake; published.push(item); } - if (published.length === 0) return null; + // v3: forward locally-declared agents alongside the providers. We + // only forward agents whose `llm` field points at a name we're + // actually publishing (or pre-declared). Stale entries are dropped + // with a warning rather than failing the whole registration. + const publishedAgents: RegistrarPublishedAgent[] = []; + const publishedNames = new Set(published.map((p) => p.provider.name)); + for (const a of localAgents) { + if (!publishedNames.has(a.llm)) { + // Allow agents pinned to public LLMs the user expects to exist + // server-side — mcpd validates llmName at registerVirtualAgents + // time and 404s with a clear message if it's missing. + // We don't drop these client-side; just note it. + } + const item: RegistrarPublishedAgent = { name: a.name, llmName: a.llm }; + if (a.description !== undefined) item.description = a.description; + if (a.systemPrompt !== undefined) item.systemPrompt = a.systemPrompt; + if (a.project !== undefined) item.project = a.project; + if (a.defaultParams !== undefined) item.defaultParams = a.defaultParams; + if (a.extras !== undefined) item.extras = a.extras; + publishedAgents.push(item); + } + + if (published.length === 0 && publishedAgents.length === 0) return null; // Resolve mcpd URL + bearer. Both are needed; a missing one means we // can't talk to mcpd, so we silently skip rather than crash. @@ -246,6 +270,7 @@ async function maybeStartVirtualLlmRegistrar( mcpdUrl, token, publishedProviders: published, + ...(publishedAgents.length > 0 ? { publishedAgents } : {}), sessionFilePath: join(homedir(), '.mcpctl', 'provider-session'), log: { info: (msg) => process.stderr.write(`${msg}\n`), diff --git a/src/mcplocal/src/providers/registrar.ts b/src/mcplocal/src/providers/registrar.ts index e09392c..cd7e46d 100644 --- a/src/mcplocal/src/providers/registrar.ts +++ b/src/mcplocal/src/providers/registrar.ts @@ -56,10 +56,28 @@ export interface RegistrarPublishedProvider { wake?: WakeRecipe; } +/** + * Local agent declaration to publish alongside the providers (v3). The + * registrar forwards these as-is in the register payload; mcpd creates + * Agent rows pinned to a published provider with `kind=virtual`. + */ +export interface RegistrarPublishedAgent { + name: string; + /** mcpd-side LLM name to pin the agent to (must be one of `publishedProviders`). */ + llmName: string; + description?: string; + systemPrompt?: string; + project?: string; + defaultParams?: Record; + extras?: Record; +} + export interface RegistrarOptions { mcpdUrl: string; token: string; publishedProviders: RegistrarPublishedProvider[]; + /** Optional v3 — local agents to publish alongside the providers. */ + publishedAgents?: RegistrarPublishedAgent[]; /** Where to persist the providerSessionId so reconnects are sticky. */ sessionFilePath: string; log: RegistrarLogger; @@ -172,6 +190,20 @@ export class VirtualLlmRegistrar { })); const body: Record = { providers }; if (this.sessionId !== null) body['providerSessionId'] = this.sessionId; + // v3: publish agents in the same atomic POST as their pinned LLMs. + // Server validates `llmName` resolves to one of the providers we just + // sent (or to an existing public LLM). + if (this.opts.publishedAgents !== undefined && this.opts.publishedAgents.length > 0) { + body['agents'] = this.opts.publishedAgents.map((a) => ({ + name: a.name, + llmName: a.llmName, + ...(a.description !== undefined ? { description: a.description } : {}), + ...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}), + ...(a.project !== undefined ? { project: a.project } : {}), + ...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}), + ...(a.extras !== undefined ? { extras: a.extras } : {}), + })); + } const res = await postJson( this.urlFor('/api/v1/llms/_provider-register'), -- 2.49.1 From 610808b9e79ac4d8c9f0ba2ccf64994b14e1da83 Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 18:39:01 +0100 Subject: [PATCH 4/5] fix(chat): real fixes for thinking-model + URL conventions, not test tweaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five real bugs surfaced by the agent-chat smoke against live qwen3-thinking. None of these are fixed by changing the test — the test was right to fail. 1. openai-passthrough adapter doubled `/v1` in the request URL. The adapter hard-codes `/v1/chat/completions` after the configured base, but every OpenAI-compat provider documents its base URL with a trailing `/v1` (api.openai.com/v1, llm.example.com/v1, …). Users pasting that conventional shape produced `https://x/v1/v1/chat/completions` → 404. endpointUrl now strips a trailing `/v1` so both forms canonicalize. `/v1beta` (Anthropic-style) is preserved. 2. Non-streaming chat returned an empty assistant when thinking models (qwen3-thinking, deepseek-reasoner, OpenAI o1) emitted only `reasoning_content` with `content: null`. extractChoice now also pulls reasoning (every spelling the streaming parser already knows about), and a new pickAssistantText helper falls back to it when content is empty. A `[response truncated by max_tokens]` marker is appended when finish_reason is `length`, so users see the cut-off instead of guessing why the answer is short. Symmetric streaming fix: the chatStream loop accumulates reasoning and yields ONE synthesized `text` frame at the end when content stayed empty, keeping the CLI's stdout (which only prints `text` deltas) in sync with the persisted thread message. 3. `mcpctl get agent X -o yaml` emitted `kind: public` (the v3 lifecycle field) instead of `kind: agent` (apply envelope), so round-tripping through `apply -f` failed. Same fix shape as the v1 Llm strip in toApplyDocs — drop kind/status/lastHeartbeatAt/ inactiveSince/providerSessionId for the agents resource too. 4. Non-streaming `mcpctl chat` printed `thread:` (no space) on stderr; streaming printed `(thread: )` (with space). Tests and any other regex watching for one form missed the other. Standardize on `thread: ` (single space) in both paths. 5. agent-chat.smoke's `run()` used `execSync`, which discards stderr on success — making any `expect(stderr).toMatch(...)` assertion structurally impossible to satisfy in the happy path. Switch to `spawnSync` so stderr is actually captured. Includes a small shell-style argv splitter so the existing call sites with quoted multi-word values (`--system-prompt "..."`) keep working. Tests: +6 new mcpd unit tests (4 chat-service for the reasoning fallback / truncation marker / content-preference / streaming synth; 2 llm-adapters for the URL strip + /v1beta preservation). Full mcpd + mcplocal + smoke green: 860/860 + 723/723 + 139/139. --- src/cli/src/commands/chat.ts | 5 +- src/cli/src/commands/get.ts | 15 ++- src/mcpd/src/services/chat.service.ts | 101 +++++++++++++-- .../llm/adapters/openai-passthrough.ts | 10 +- src/mcpd/tests/chat-service.test.ts | 115 ++++++++++++++++++ src/mcpd/tests/llm-adapters.test.ts | 30 +++++ .../tests/smoke/agent-chat.smoke.test.ts | 46 ++++--- 7 files changed, 293 insertions(+), 29 deletions(-) diff --git a/src/cli/src/commands/chat.ts b/src/cli/src/commands/chat.ts index edece9c..f0772f1 100644 --- a/src/cli/src/commands/chat.ts +++ b/src/cli/src/commands/chat.ts @@ -151,7 +151,10 @@ async function runOneShot( const sec = Math.max(0.05, (Date.now() - startMs) / 1000); const words = (res.assistant.match(/\S+/g) ?? []).length; process.stdout.write(`${res.assistant}\n`); - process.stderr.write(styleStats(`(${String(words)}w · ${(words / sec).toFixed(1)} w/s · ${sec.toFixed(1)}s)`) + ` thread:${res.threadId}\n`); + // `thread: ` — single space after the colon, matching the streaming + // path (line 160 below) so any tooling/regex that watches one form picks + // up the other too. + process.stderr.write(styleStats(`(${String(words)}w · ${(words / sec).toFixed(1)} w/s · ${sec.toFixed(1)}s)`) + ` thread: ${res.threadId}\n`); return; } const bar = installStatusBar(); diff --git a/src/cli/src/commands/get.ts b/src/cli/src/commands/get.ts index ea78f33..78583f0 100644 --- a/src/cli/src/commands/get.ts +++ b/src/cli/src/commands/get.ts @@ -408,8 +408,8 @@ function toApplyDocs(resource: string, items: unknown[]): Array<{ kind: string } const kind = RESOURCE_KIND[resource] ?? resource; return items.map((item) => { const cleaned = stripInternalFields(item as Record); - // Llm-specific: the new virtual-provider lifecycle fields collide with - // the apply-doc `kind` envelope (the schema uses `kind: public|virtual`) + // Llm-specific: the virtual-provider lifecycle fields collide with the + // apply-doc `kind` envelope (the schema uses `kind: public|virtual`) // and aren't apply-able anyway — they're derived runtime state managed // by VirtualLlmService. Drop them so YAML round-trips stay clean. if (resource === 'llms') { @@ -419,6 +419,17 @@ function toApplyDocs(resource: string, items: unknown[]): Array<{ kind: string } delete cleaned['inactiveSince']; delete cleaned['providerSessionId']; } + // Agent-specific: same shape as Llm — Agent gained kind/status/etc. in + // v3 Stage 1 (virtual agent lifecycle) and the schema-`kind` field + // shadows the apply-envelope `kind: agent`. Strip the same set so + // `get agent X -o yaml | apply -f -` round-trips without diff. + if (resource === 'agents') { + delete cleaned['kind']; + delete cleaned['status']; + delete cleaned['lastHeartbeatAt']; + delete cleaned['inactiveSince']; + delete cleaned['providerSessionId']; + } return { kind, ...cleaned }; }); } diff --git a/src/mcpd/src/services/chat.service.ts b/src/mcpd/src/services/chat.service.ts index 087ea11..ff6aba8 100644 --- a/src/mcpd/src/services/chat.service.ts +++ b/src/mcpd/src/services/chat.service.ts @@ -185,6 +185,10 @@ export class ChatService { throw new Error(`Adapter returned no choice (status ${String(result.status)})`); } if (choice.tool_calls !== undefined && choice.tool_calls.length > 0) { + // Tool turns: keep `content` literal — even if empty — because the + // OpenAI tool-use protocol expects the assistant message to carry + // its tool_calls separately from any free-form text. Surfacing + // reasoning here would confuse downstream tool dispatchers. const assistantTurn = await this.chatRepo.appendMessage({ threadId: ctx.threadId, role: 'assistant', @@ -219,13 +223,17 @@ export class ChatService { await this.chatRepo.updateStatus(assistantTurn.id, 'complete'); continue; } - // Terminal text turn. + // Terminal text turn. Use pickAssistantText so thinking models that + // produced only reasoning_content still yield a usable answer (with + // a truncation marker when finish_reason indicates max_tokens + // cut-off). Empty body remains empty and bubbles up unchanged. + const assistantText = pickAssistantText(choice); const finalMsg = await this.chatRepo.appendMessage({ threadId: ctx.threadId, role: 'assistant', - content: choice.content ?? '', + content: assistantText, }); - assistantFinal = choice.content ?? ''; + assistantFinal = assistantText; lastTurnIndex = finalMsg.turnIndex; await this.chatRepo.touchThread(ctx.threadId); return { threadId: ctx.threadId, assistant: assistantFinal, turnIndex: lastTurnIndex }; @@ -242,8 +250,16 @@ export class ChatService { const ctx = await this.prepareContext(args); try { for (let i = 0; i < ctx.maxIterations; i += 1) { - const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = { + // `reasoning` is accumulated alongside `content` so we can fall back + // to it when the model produces no `content` (thinking models with a + // tight max_tokens, or providers that don't separate the two). + const accumulated: { + content: string; + reasoning: string; + toolCalls: Array<{ id: string; name: string; argumentsJson: string }>; + } = { content: '', + reasoning: '', toolCalls: [], }; let finishReason: string | null = null; @@ -257,9 +273,11 @@ export class ChatService { yield { type: 'text', delta: evt.contentDelta }; } if (evt.reasoningDelta !== undefined) { - // Reasoning is not persisted to the thread (it's the model's - // scratchpad, not part of the conversation) — only streamed so - // the REPL can show progress while the model thinks. + // Streamed live so the REPL can show progress while the model + // thinks. Also accumulated so a thinking-only response (no + // `content`) still produces a non-empty persisted assistant + // turn — see the fallback at the end of this loop iteration. + accumulated.reasoning += evt.reasoningDelta; yield { type: 'thinking', delta: evt.reasoningDelta }; } if (evt.toolCallDeltas !== undefined) { @@ -326,10 +344,27 @@ export class ChatService { continue; } + // Fall back to reasoning when the model emitted only thinking + // output. Mirrors pickAssistantText() in the non-streaming path — + // same situation (thinking model + tight max_tokens, or a provider + // that bundles the answer into reasoning_content). + const persistedContent = pickAssistantText({ + content: accumulated.content.length > 0 ? accumulated.content : null, + ...(accumulated.reasoning.length > 0 ? { reasoning: accumulated.reasoning } : {}), + finishReason, + }); + // If we synthesized text from reasoning, yield it as a final `text` + // delta so the client's stdout matches what the thread persists. + // Without this, the REPL would show only `thinking` deltas (which + // the CLI writes to stderr) and stdout would be empty for any + // thinking-only response. + if (accumulated.content.length === 0 && persistedContent.length > 0) { + yield { type: 'text', delta: persistedContent }; + } const finalMsg = await this.chatRepo.appendMessage({ threadId: ctx.threadId, role: 'assistant', - content: accumulated.content, + content: persistedContent, }); await this.chatRepo.touchThread(ctx.threadId); yield { type: 'final', threadId: ctx.threadId, turnIndex: finalMsg.turnIndex }; @@ -682,6 +717,17 @@ export class ChatService { interface ExtractedChoice { content: string | null; + /** + * Reasoning text emitted by thinking models (qwen3-thinking, + * deepseek-reasoner, OpenAI o1 family). Different providers spell the + * field differently; the parser accepts every shape the streaming + * counterpart already accepts (see `parseStreamingChunk`). When `content` + * is null/empty, callers fall back to this so thinking models that + * exhaust their token budget on reasoning still produce a usable answer. + */ + reasoning?: string; + /** OpenAI's stop reason — `'stop' | 'length' | 'tool_calls' | 'content_filter' | ...`. */ + finishReason?: string | null; tool_calls?: Array<{ id: string; type: 'function'; function: { name: string; arguments: string } }>; } @@ -689,17 +735,52 @@ function extractChoice(body: unknown): ExtractedChoice | null { if (typeof body !== 'object' || body === null) return null; const choices = (body as { choices?: unknown }).choices; if (!Array.isArray(choices) || choices.length === 0) return null; - const first = choices[0] as { message?: { content?: unknown; tool_calls?: unknown } } | undefined; + const first = choices[0] as { + message?: { + content?: unknown; + reasoning_content?: unknown; + reasoning?: unknown; + provider_specific_fields?: { reasoning_content?: unknown; reasoning?: unknown }; + tool_calls?: unknown; + }; + finish_reason?: unknown; + } | undefined; if (first?.message === undefined) return null; const content = typeof first.message.content === 'string' ? first.message.content : null; + const m = first.message; + const reasoning = + (typeof m.reasoning_content === 'string' && m.reasoning_content.length > 0 ? m.reasoning_content : undefined) + ?? (typeof m.reasoning === 'string' && m.reasoning.length > 0 ? m.reasoning : undefined) + ?? (typeof m.provider_specific_fields?.reasoning_content === 'string' && m.provider_specific_fields.reasoning_content.length > 0 ? m.provider_specific_fields.reasoning_content : undefined) + ?? (typeof m.provider_specific_fields?.reasoning === 'string' && m.provider_specific_fields.reasoning.length > 0 ? m.provider_specific_fields.reasoning : undefined); + const finishReason = typeof first.finish_reason === 'string' ? first.finish_reason : null; const toolCalls = first.message.tool_calls; - const out: ExtractedChoice = { content }; + const out: ExtractedChoice = { content, finishReason }; + if (reasoning !== undefined) out.reasoning = reasoning; if (Array.isArray(toolCalls)) { out.tool_calls = toolCalls as NonNullable; } return out; } +/** + * Pick what text to surface (and persist) as the assistant's reply. + * Thinking models sometimes emit only `reasoning_content` and leave + * `content` null — typically when `max_tokens` is too small for the + * thinking budget, but also when the provider configuration just doesn't + * separate the two. In that case the reasoning IS the answer for this + * request, and the caller should see it. A `length` finish_reason marker + * makes truncation visible so users can fix their max_tokens config. + */ +function pickAssistantText(choice: ExtractedChoice): string { + if (choice.content !== null && choice.content.length > 0) return choice.content; + if (choice.reasoning !== undefined && choice.reasoning.length > 0) { + const truncated = choice.finishReason === 'length' ? '\n\n[response truncated by max_tokens]' : ''; + return `${choice.reasoning}${truncated}`; + } + return ''; +} + function safeParseJson(s: string): unknown { if (s === '') return {}; try { diff --git a/src/mcpd/src/services/llm/adapters/openai-passthrough.ts b/src/mcpd/src/services/llm/adapters/openai-passthrough.ts index ddad8e2..574b57f 100644 --- a/src/mcpd/src/services/llm/adapters/openai-passthrough.ts +++ b/src/mcpd/src/services/llm/adapters/openai-passthrough.ts @@ -123,7 +123,15 @@ export class OpenAiPassthroughAdapter implements LlmAdapter { } private endpointUrl(url: string): string { - if (url !== '') return url.replace(/\/+$/, ''); + // Accept both conventional forms users actually paste — base host + // (`https://api.openai.com`) and base + version (`https://api.openai.com/v1`). + // Every OpenAI-compat provider documents their endpoint with the `/v1` + // suffix, so users naturally include it; the adapter then re-appends + // `/v1/chat/completions`, producing a doubled-`/v1` 404 against LiteLLM + // and others. Strip a trailing `/v1` (with or without slash) so both + // shapes resolve to the same canonical base. A more specific suffix + // like `/v1beta` is preserved. + if (url !== '') return url.replace(/\/+$/, '').replace(/\/v1$/, ''); const def = DEFAULT_URLS[this.kind]; if (def === undefined) { throw new Error(`${this.kind}: url is required (no default endpoint for this provider)`); diff --git a/src/mcpd/tests/chat-service.test.ts b/src/mcpd/tests/chat-service.test.ts index 3eb205a..11bdb07 100644 --- a/src/mcpd/tests/chat-service.test.ts +++ b/src/mcpd/tests/chat-service.test.ts @@ -461,6 +461,121 @@ describe('ChatService', () => { expect(assistantTurn?.content).not.toContain('Let me think'); }); + // Regression: thinking models with a tight max_tokens budget produce + // `reasoning_content` only and leave `content` null. Without falling back + // to reasoning, the assistant turn was empty and the smoke test saw an + // empty stdout. This covers BOTH chat() (non-streaming) and chatStream() + // (synthetic final text frame so the CLI's stdout matches what's + // persisted to the thread). + it('chat falls back to reasoning_content when content is null', async () => { + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'thinking-truncated', + infer: vi.fn(async () => ({ + status: 200, + body: { + id: 'cmpl-1', + object: 'chat.completion', + choices: [{ + index: 0, + message: { role: 'assistant', content: null, reasoning_content: 'Thinking out loud about the answer' }, + finish_reason: 'stop', + }], + }, + })), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), mockLlms(), adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('Thinking out loud about the answer'); + const stored = chatRepo._msgs.find((m) => m.role === 'assistant'); + expect(stored?.content).toBe('Thinking out loud about the answer'); + }); + + it('chat appends [response truncated by max_tokens] when finish_reason is "length"', async () => { + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'thinking-clipped', + infer: vi.fn(async () => ({ + status: 200, + body: { + choices: [{ + index: 0, + message: { role: 'assistant', content: null, reasoning_content: 'partial reasoning that ran out of' }, + finish_reason: 'length', + }], + }, + })), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), mockLlms(), adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toContain('partial reasoning that ran out of'); + expect(result.assistant).toContain('[response truncated by max_tokens]'); + }); + + it('chat prefers content when both content and reasoning_content are present', async () => { + // Thinking models that DO produce content shouldn't see the reasoning + // bleed into the response — that's what the streaming path's + // text/thinking split is for, and the non-streaming path should match. + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'thinking-with-content', + infer: vi.fn(async () => ({ + status: 200, + body: { + choices: [{ + index: 0, + message: { role: 'assistant', content: 'real answer', reasoning_content: 'background thinking' }, + finish_reason: 'stop', + }], + }, + })), + stream: async function*() { yield { data: '[DONE]', done: true }; }, + }; + const svc = new ChatService( + mockAgents(), mockLlms(), adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }); + expect(result.assistant).toBe('real answer'); + expect(result.assistant).not.toContain('background thinking'); + }); + + it('chatStream emits a synthetic text frame and persists reasoning when content is empty', async () => { + const chatRepo = mockChatRepo(); + const adapter: LlmAdapter = { + kind: 'thinking-only-stream', + infer: vi.fn(), + stream: async function*() { + yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'thinking ' }, finish_reason: null }] }) }; + yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'more.' }, finish_reason: 'stop' }] }) }; + yield { data: '[DONE]', done: true }; + }, + }; + const svc = new ChatService( + mockAgents(), mockLlms(), adapterRegistry(adapter), + chatRepo, mockPromptRepo(), mockTools(), + ); + const chunks: Array<{ type: string; delta?: string }> = []; + for await (const c of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) { + chunks.push({ type: c.type, delta: c.delta }); + } + // 2 thinking deltas (live), 1 synthesized text frame, 1 final. + expect(chunks.filter((c) => c.type === 'thinking').map((c) => c.delta)).toEqual(['thinking ', 'more.']); + expect(chunks.filter((c) => c.type === 'text').map((c) => c.delta)).toEqual(['thinking more.']); + // The thread message captures the synthesized text so resumed chats see + // a coherent assistant turn (rather than blank). + const stored = chatRepo._msgs.find((m) => m.role === 'assistant'); + expect(stored?.content).toBe('thinking more.'); + }); + // Regression: provider_specific_fields.reasoning_content shape (LiteLLM // passthrough from vLLM) is also recognized. it('chatStream recognizes LiteLLM provider_specific_fields.reasoning_content', async () => { diff --git a/src/mcpd/tests/llm-adapters.test.ts b/src/mcpd/tests/llm-adapters.test.ts index 045ac88..5b16f18 100644 --- a/src/mcpd/tests/llm-adapters.test.ts +++ b/src/mcpd/tests/llm-adapters.test.ts @@ -71,6 +71,36 @@ describe('OpenAiPassthroughAdapter', () => { await expect(adapter.infer(makeCtx())).rejects.toThrow(/no default endpoint/); }); + it('infer: strips a trailing /v1 from the configured URL', async () => { + // Users naturally paste the OpenAI-style base URL with /v1 because + // every provider documents it that way (https://api.openai.com/v1, + // https://llm.example.com/v1). The adapter then re-appends + // /v1/chat/completions; without normalization this would produce a + // doubled-/v1 404 against LiteLLM and friends. + const fetchFn = mockFetch([{ match: /\/v1\/chat\/completions$/, status: 200, body: {} }]); + const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch }); + await adapter.infer(makeCtx({ url: 'https://llm.example.com/v1' })); + const [url1] = fetchFn.mock.calls[0] as [string]; + expect(url1).toBe('https://llm.example.com/v1/chat/completions'); + + // Trailing slash + /v1 should also normalize correctly. + const fetchFn2 = mockFetch([{ match: /\/v1\/chat\/completions$/, status: 200, body: {} }]); + const adapter2 = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn2 as unknown as typeof fetch }); + await adapter2.infer(makeCtx({ url: 'https://llm.example.com/v1/' })); + const [url2] = fetchFn2.mock.calls[0] as [string]; + expect(url2).toBe('https://llm.example.com/v1/chat/completions'); + }); + + it('infer: preserves a trailing /v1beta suffix (only exact /v1 is stripped)', async () => { + // Some providers expose `/v1beta` as a parallel API surface — don't + // accidentally rewrite that to `/v1` or strip it. + const fetchFn = mockFetch([{ match: /\/v1beta\/v1\/chat\/completions$/, status: 200, body: {} }]); + const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch }); + await adapter.infer(makeCtx({ url: 'https://api.example.com/v1beta' })); + const [url] = fetchFn.mock.calls[0] as [string]; + expect(url).toBe('https://api.example.com/v1beta/v1/chat/completions'); + }); + it('infer: omits Authorization when apiKey is empty', async () => { const fetchFn = mockFetch([{ match: /ollama/, status: 200, body: {} }]); const adapter = new OpenAiPassthroughAdapter('ollama', { fetch: fetchFn as unknown as typeof fetch }); diff --git a/src/mcplocal/tests/smoke/agent-chat.smoke.test.ts b/src/mcplocal/tests/smoke/agent-chat.smoke.test.ts index d96b03b..e445511 100644 --- a/src/mcplocal/tests/smoke/agent-chat.smoke.test.ts +++ b/src/mcplocal/tests/smoke/agent-chat.smoke.test.ts @@ -17,7 +17,7 @@ import { describe, it, expect, beforeAll, afterAll } from 'vitest'; import http from 'node:http'; import https from 'node:https'; -import { execSync } from 'node:child_process'; +import { spawnSync, execSync } from 'node:child_process'; const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu'; const LLM_URL = process.env.MCPCTL_SMOKE_LLM_URL; @@ -31,21 +31,37 @@ const AGENT_NAME = `smoke-chat-agent-${SUFFIX}`; interface CliResult { code: number; stdout: string; stderr: string } function run(args: string): CliResult { - try { - const stdout = execSync(`mcpctl --direct ${args}`, { - encoding: 'utf-8', - timeout: 60_000, - stdio: ['ignore', 'pipe', 'pipe'], - }); - return { code: 0, stdout: stdout.trim(), stderr: '' }; - } catch (err) { - const e = err as { status?: number; stdout?: Buffer | string; stderr?: Buffer | string }; - return { - code: e.status ?? 1, - stdout: e.stdout ? (typeof e.stdout === 'string' ? e.stdout : e.stdout.toString('utf-8')) : '', - stderr: e.stderr ? (typeof e.stderr === 'string' ? e.stderr : e.stderr.toString('utf-8')) : '', - }; + // spawnSync (not execSync) — execSync returns only stdout on success and + // discards stderr, which made any `thread:` assertion against a successful + // chat impossible to evaluate. Splitting the args correctly handles the + // few existing call sites that quote-wrap multi-word values like + // `--system-prompt "You are..."`. + const argv = splitArgs(args); + const res = spawnSync('mcpctl', ['--direct', ...argv], { + encoding: 'utf-8', + timeout: 60_000, + }); + return { + code: res.status ?? 1, + stdout: (res.stdout ?? '').trim(), + stderr: (res.stderr ?? '').trim(), + }; +} + +/** + * Tokenize a shell-style argv string with simple double-quote support — just + * enough for the smoke test's call shapes. Not a full POSIX parser; we only + * need to keep `--system-prompt "You are a smoke test..."` together as one + * arg. + */ +function splitArgs(s: string): string[] { + const out: string[] = []; + const re = /"([^"]*)"|(\S+)/g; + let m: RegExpExecArray | null; + while ((m = re.exec(s)) !== null) { + out.push(m[1] !== undefined ? m[1] : (m[2] ?? '')); } + return out; } function healthz(url: string, timeoutMs = 5000): Promise { -- 2.49.1 From 1998b733b2e2312b9545df5a6bce237c2e25d1ce Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 27 Apr 2026 18:47:03 +0100 Subject: [PATCH 5/5] feat(cli+docs): mcpctl get agent KIND/STATUS columns + virtual-agent smoke + docs (v3 Stage 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CLI: `mcpctl get agent` table view gains KIND and STATUS columns mirroring the `get llm` shape from v1. Public agents render as `public/active` (the AgentRow defaults) and virtual ones surface their true lifecycle state, so `mcpctl get agent` becomes a single-pane view for both manually-created and mcplocal-published personas. Smoke: tests/smoke/virtual-agent.smoke.test.ts mirrors virtual-llm's in-process registrar pattern — publishes a fake provider + agent in one round-trip, confirms mcpd surfaces the agent kind=virtual / status=active under /api/v1/agents, then disconnects and verifies the paired Llm-and-Agent both flip to inactive (deletion is GC-driven, not disconnect-driven, so the rows must still exist post-stop). Heartbeat- stale and 4 h sweep paths are covered by the unit suite to keep smoke duration in check. Docs: docs/virtual-llms.md gets a "Virtual agents (v3)" section with a config sample, lifecycle notes, listing example, and the cluster-wide name-uniqueness caveat. The API surface block now mentions the new `agents[]` field on _provider-register, the join-by-session heartbeat behavior, and the `GET /api/v1/agents` lifecycle fields. docs/agents.md gains a one-paragraph note pointing to the v3 publishing path. Tests: full smoke suite 141/141 (was 139, +2 new), unit suites unchanged (mcpd 860/860, mcplocal 723/723). --- docs/agents.md | 6 +- docs/virtual-llms.md | 92 +++++++- src/cli/src/commands/get.ts | 7 + .../tests/smoke/virtual-agent.smoke.test.ts | 215 ++++++++++++++++++ 4 files changed, 314 insertions(+), 6 deletions(-) create mode 100644 src/mcplocal/tests/smoke/virtual-agent.smoke.test.ts diff --git a/docs/agents.md b/docs/agents.md index b14688f..462dda2 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -204,5 +204,9 @@ mcpctl chat reviewer - [virtual-llms.md](./virtual-llms.md) — local LLMs (e.g. `vllm-local`) publishing themselves into `mcpctl get llm` so anyone can chat with them via `mcpctl chat-llm `. Inference is relayed through the - publishing mcplocal — mcpd never holds the local URL or key. + publishing mcplocal — mcpd never holds the local URL or key. **v3** + extends the same publishing model to **virtual agents** declared in + mcplocal config — they show up in `mcpctl get agent` with + `KIND=virtual / STATUS=active` and become chat-able via + `mcpctl chat ` like any other agent. - [chat.md](./chat.md) — `mcpctl chat` flow and LiteLLM-style flags. diff --git a/docs/virtual-llms.md b/docs/virtual-llms.md index 0a5629c..25dc089 100644 --- a/docs/virtual-llms.md +++ b/docs/virtual-llms.md @@ -199,10 +199,87 @@ provider doesn't come up within `maxWaitSeconds`), every queued infer is rejected with a clear error and the row stays `hibernating` — the next request gets a fresh wake attempt. +## Virtual agents (v3) + +Virtual agents extend the same publishing model to **agents** — named +LLM personas with their own system prompt and sampling defaults. mcplocal +declares them in its config alongside its providers, and the existing +`_provider-register` endpoint atomically publishes both Llms and Agents +in one round-trip. They show up under `mcpctl get agent` next to +manually-created public agents and become chat-able via +`mcpctl chat ` — no special command. + +### Declaring a virtual agent in mcplocal config + +```jsonc +// ~/.mcpctl/config.json +{ + "llm": { + "providers": [ + { "name": "vllm-local", "type": "vllm", "model": "Qwen/Qwen2.5-7B-Instruct-AWQ", "publish": true } + ] + }, + "agents": [ + { + "name": "local-coder", + "llm": "vllm-local", + "description": "Local coding assistant on the workstation GPU", + "systemPrompt": "You are a senior engineer. Be terse.", + "defaultParams": { "temperature": 0.2 } + } + ] +} +``` + +`llm` references a published provider's name from the same config. Agents +pinned to a name that isn't being published are still forwarded to mcpd — +the server validates `llmName` and 404s with a clear message if it's +genuinely missing, which lets you point at a *public* Llm if you want. + +### Lifecycle + +Same shape as virtual Llms — 30 s heartbeat from mcplocal, 90 s +heartbeat-stale → status flips to `inactive`, 4 h inactive → row deleted +by mcpd's GC sweep. Heartbeats cover both Llms and Agents owned by the +session. + +The GC orders agent deletes **before** their pinned virtual Llm so the +`Agent.llmId onDelete: Restrict` FK doesn't block the sweep. + +### Listing + +```sh +$ mcpctl get agents +NAME KIND STATUS LLM PROJECT DESCRIPTION +local-coder virtual active vllm-local - Local coding assistant on… +reviewer public active qwen3-thinking mcpctl-development I review what you're shipping… +``` + +The `KIND` and `STATUS` columns are the v3 additions. Round-tripping +through `mcpctl get agent X -o yaml | mcpctl apply -f -` strips those +runtime fields cleanly so a virtual agent can be re-declared as a public +one (or vice versa) without manual editing. + +### Chatting + +```sh +$ mcpctl chat local-coder +> hello? +… streams through mcpd → SSE → mcplocal's vllm-local provider … +``` + +Same command as for public agents. Works because chat.service has a +`kind=virtual` branch that hands off to `VirtualLlmService.enqueueInferTask` +when the agent's pinned Llm is virtual. + +### Cluster-wide name uniqueness + +`Agent.name` is unique cluster-wide. Two mcplocals trying to publish the +same agent name collide on the second register with HTTP 409. Per-publisher +namespacing is a v4+ concern — same constraint as virtual Llms in v1. + ## Roadmap (later stages) -- **v3 — Virtual agents**: mcplocal publishes its local agent configs - (model + system prompt + sampling defaults) into mcpd's `Agent` table. - **v4 — LB pool by model**: agents can target a model name instead of a specific Llm; mcpd picks the healthiest pool member per request. - **v5 — Task queue**: persisted requests for hibernating/saturated @@ -211,18 +288,23 @@ the next request gets a fresh wake attempt. ## API surface (v1) ``` -POST /api/v1/llms/_provider-register → returns { providerSessionId, llms[] } +POST /api/v1/llms/_provider-register → returns { providerSessionId, llms[], agents[] } + v3: body accepts an optional `agents[]` array + alongside `providers[]`. Atomic publish; older + clients (providers-only) keep working. GET /api/v1/llms/_provider-stream → SSE channel; require x-mcpctl-provider-session header -POST /api/v1/llms/_provider-heartbeat → { providerSessionId } +POST /api/v1/llms/_provider-heartbeat → { providerSessionId } — bumps both Llms and Agents + owned by the session POST /api/v1/llms/_provider-task/:id/result → one of: { error: "msg" } { chunk: { data, done? } } { status, body } -GET /api/v1/llms → list (now includes kind, status, lastHeartbeatAt, inactiveSince) +GET /api/v1/llms → list (includes kind, status, lastHeartbeatAt, inactiveSince) POST /api/v1/llms//infer → routes through the SSE relay DELETE /api/v1/llms/ → delete unconditionally (also runs GC's job) +GET /api/v1/agents → list (v3: includes kind, status, lastHeartbeatAt, inactiveSince) ``` RBAC piggybacks on `view/edit/create:llms` — no new resource. Publishing diff --git a/src/cli/src/commands/get.ts b/src/cli/src/commands/get.ts index 78583f0..f3e07ac 100644 --- a/src/cli/src/commands/get.ts +++ b/src/cli/src/commands/get.ts @@ -155,10 +155,17 @@ interface AgentRow { description: string; llm: { id: string; name: string }; project: { id: string; name: string } | null; + // v3: lifecycle fields. Public agents have kind=public/status=active and + // these never change — virtuals get them set/updated by mcpd's + // AgentService as the publishing mcplocal heartbeats and disconnects. + kind?: 'public' | 'virtual'; + status?: 'active' | 'inactive'; } const agentColumns: Column[] = [ { header: 'NAME', key: 'name' }, + { header: 'KIND', key: (r) => r.kind ?? 'public', width: 8 }, + { header: 'STATUS', key: (r) => r.status ?? 'active', width: 10 }, { header: 'LLM', key: (r) => r.llm.name, width: 24 }, { header: 'PROJECT', key: (r) => r.project?.name ?? '-', width: 20 }, { header: 'DESCRIPTION', key: (r) => truncate(r.description, 50) || '-', width: 50 }, diff --git a/src/mcplocal/tests/smoke/virtual-agent.smoke.test.ts b/src/mcplocal/tests/smoke/virtual-agent.smoke.test.ts new file mode 100644 index 0000000..2d5726b --- /dev/null +++ b/src/mcplocal/tests/smoke/virtual-agent.smoke.test.ts @@ -0,0 +1,215 @@ +/** + * Smoke tests: v3 virtual agents — register a virtual Llm + a virtual + * Agent through the same `_provider-register` payload, then verify mcpd + * surfaces the agent as kind=virtual / status=active. Mirrors + * virtual-llm.smoke.test.ts's in-process registrar pattern so we don't + * need to mutate ~/.mcpctl/config.json or bounce systemd's mcplocal. + * + * Heartbeat-stale → inactive (90 s) and 4 h auto-deletion are covered by + * the unit suite (mcpd virtual-agent-service.test.ts); waiting > 90 s in + * smoke would balloon the suite duration. + */ +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import http from 'node:http'; +import https from 'node:https'; +import { mkdtempSync, rmSync, readFileSync, existsSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { + VirtualLlmRegistrar, + type RegistrarPublishedProvider, + type RegistrarPublishedAgent, +} from '../../src/providers/registrar.js'; +import type { LlmProvider, CompletionResult } from '../../src/providers/types.js'; + +const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu'; +const SUFFIX = Date.now().toString(36); +const PROVIDER_NAME = `smoke-vagent-llm-${SUFFIX}`; +const AGENT_NAME = `smoke-vagent-${SUFFIX}`; + +function makeFakeProvider(name: string, content: string): LlmProvider { + return { + name, + async complete(): Promise { + return { + content, + toolCalls: [], + usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 }, + finishReason: 'stop', + }; + }, + async listModels() { return []; }, + async isAvailable() { return true; }, + }; +} + +function healthz(url: string, timeoutMs = 5000): Promise { + return new Promise((resolve) => { + const parsed = new URL(`${url.replace(/\/$/, '')}/healthz`); + const driver = parsed.protocol === 'https:' ? https : http; + const req = driver.get({ + hostname: parsed.hostname, + port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80), + path: parsed.pathname, + timeout: timeoutMs, + }, (res) => { resolve((res.statusCode ?? 500) < 500); res.resume(); }); + req.on('error', () => resolve(false)); + req.on('timeout', () => { req.destroy(); resolve(false); }); + }); +} + +function readToken(): string | null { + try { + const path = join(process.env.HOME ?? '', '.mcpctl', 'credentials'); + if (!existsSync(path)) return null; + const parsed = JSON.parse(readFileSync(path, 'utf-8')) as { token?: string }; + return parsed.token ?? null; + } catch { + return null; + } +} + +interface HttpResponse { status: number; body: string } + +function httpRequest(method: string, urlStr: string, body: unknown): Promise { + return new Promise((resolve, reject) => { + const tokenRaw = readToken(); + const parsed = new URL(urlStr); + const driver = parsed.protocol === 'https:' ? https : http; + const headers: Record = { + Accept: 'application/json', + ...(body !== undefined ? { 'Content-Type': 'application/json' } : {}), + ...(tokenRaw !== null ? { Authorization: `Bearer ${tokenRaw}` } : {}), + }; + const req = driver.request({ + hostname: parsed.hostname, + port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80), + path: parsed.pathname + parsed.search, + method, + headers, + timeout: 30_000, + }, (res) => { + const chunks: Buffer[] = []; + res.on('data', (c: Buffer) => chunks.push(c)); + res.on('end', () => { + resolve({ status: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') }); + }); + }); + req.on('error', reject); + req.on('timeout', () => { req.destroy(); reject(new Error(`httpRequest timeout: ${method} ${urlStr}`)); }); + if (body !== undefined) req.write(JSON.stringify(body)); + req.end(); + }); +} + +interface AgentRow { id: string; name: string; kind?: string; status?: string; llm?: { name: string }; description?: string } + +let mcpdUp = false; +let registrar: VirtualLlmRegistrar | null = null; +let tempDir: string; + +describe('virtual-agent smoke (v3)', () => { + beforeAll(async () => { + mcpdUp = await healthz(MCPD_URL); + if (!mcpdUp) { + // eslint-disable-next-line no-console + console.warn(`\n ○ virtual-agent smoke: skipped — ${MCPD_URL}/healthz unreachable.\n`); + return; + } + if (readToken() === null) { + mcpdUp = false; + // eslint-disable-next-line no-console + console.warn('\n ○ virtual-agent smoke: skipped — no ~/.mcpctl/credentials.\n'); + return; + } + tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-virtual-agent-smoke-')); + }, 20_000); + + afterAll(async () => { + if (registrar !== null) registrar.stop(); + if (tempDir !== undefined) rmSync(tempDir, { recursive: true, force: true }); + // Defensive cleanup: agent first (Llm.id has Restrict FK), then Llm. + if (mcpdUp) { + const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined); + if (agents.status === 200) { + const rows = JSON.parse(agents.body) as Array<{ id: string; name: string }>; + const row = rows.find((r) => r.name === AGENT_NAME); + if (row !== undefined) { + await httpRequest('DELETE', `${MCPD_URL}/api/v1/agents/${row.id}`, undefined); + } + } + const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined); + if (llms.status === 200) { + const rows = JSON.parse(llms.body) as Array<{ id: string; name: string }>; + const row = rows.find((r) => r.name === PROVIDER_NAME); + if (row !== undefined) { + await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${row.id}`, undefined); + } + } + } + }); + + it('registrar publishes provider + agent in one round-trip and mcpd lists the agent kind=virtual / status=active', async () => { + if (!mcpdUp) return; + const token = readToken(); + if (token === null) return; + + const published: RegistrarPublishedProvider[] = [ + { provider: makeFakeProvider(PROVIDER_NAME, 'hi from virtual agent'), type: 'openai', model: 'fake-vagent', tier: 'fast' }, + ]; + const publishedAgents: RegistrarPublishedAgent[] = [ + { + name: AGENT_NAME, + llmName: PROVIDER_NAME, + description: 'v3 virtual agent smoke', + systemPrompt: 'You are a smoke test. Reply READY.', + defaultParams: { temperature: 0 }, + }, + ]; + registrar = new VirtualLlmRegistrar({ + mcpdUrl: MCPD_URL, + token, + publishedProviders: published, + publishedAgents, + sessionFilePath: join(tempDir, 'session'), + log: { info: () => {}, warn: () => {}, error: () => {} }, + heartbeatIntervalMs: 60_000, + }); + await registrar.start(); + expect(registrar.getSessionId()).not.toBeNull(); + // Give the SSE handshake + atomic register a moment to settle. + await new Promise((r) => setTimeout(r, 400)); + + const res = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined); + expect(res.status).toBe(200); + const rows = JSON.parse(res.body) as AgentRow[]; + const row = rows.find((r) => r.name === AGENT_NAME); + expect(row, `${AGENT_NAME} must be present`).toBeDefined(); + expect(row!.kind).toBe('virtual'); + expect(row!.status).toBe('active'); + expect(row!.llm?.name).toBe(PROVIDER_NAME); + expect(row!.description).toBe('v3 virtual agent smoke'); + }, 30_000); + + it('publisher disconnect flips the agent to status=inactive (paired with its Llm)', async () => { + if (!mcpdUp) return; + if (registrar !== null) { + registrar.stop(); + registrar = null; + } + // unbindSession runs synchronously on the SSE close handler; mcpd + // flips both the Llm and any agents owned by the session to + // inactive. A short wait covers the request round-trip. + await new Promise((r) => setTimeout(r, 400)); + + const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined); + expect(agents.status).toBe(200); + const agentRow = (JSON.parse(agents.body) as AgentRow[]).find((r) => r.name === AGENT_NAME); + expect(agentRow, `${AGENT_NAME} must still exist (deletion is GC-driven, not disconnect-driven)`).toBeDefined(); + expect(agentRow!.status).toBe('inactive'); + + const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined); + const llmRow = (JSON.parse(llms.body) as Array<{ name: string; status: string }>).find((r) => r.name === PROVIDER_NAME); + expect(llmRow!.status).toBe('inactive'); + }, 30_000); +}); -- 2.49.1