diff --git a/src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql b/src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql
new file mode 100644
index 0000000..5cec191
--- /dev/null
+++ b/src/db/prisma/migrations/20260427154803_add_virtual_agent_lifecycle/migration.sql
@@ -0,0 +1,14 @@
+-- Mirror Llm's virtual-provider lifecycle on Agent. Reuses the
+-- existing LlmKind / LlmStatus enums so we don't double-define them.
+-- Existing rows backfill with kind='public' / status='active' so
+-- nothing changes for manually-created agents.
+
+ALTER TABLE "Agent"
+  ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public',
+  ADD COLUMN "providerSessionId" TEXT,
+  ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3),
+  ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active',
+  ADD COLUMN "inactiveSince" TIMESTAMP(3);
+
+CREATE INDEX "Agent_kind_status_idx" ON "Agent"("kind", "status");
+CREATE INDEX "Agent_providerSessionId_idx" ON "Agent"("providerSessionId");
diff --git a/src/db/prisma/schema.prisma b/src/db/prisma/schema.prisma
index dfc83e5..8b64ad8 100644
--- a/src/db/prisma/schema.prisma
+++ b/src/db/prisma/schema.prisma
@@ -469,20 +469,26 @@ model BackupPending {
 
 // Per-call LiteLLM-style overrides stack on top of `defaultParams`.
 model Agent {
-  id                   String   @id @default(cuid())
-  name                 String   @unique
-  description          String   @default("") // shown in MCP tools/list
-  systemPrompt         String   @default("") @db.Text // agent persona
+  id                   String    @id @default(cuid())
+  name                 String    @unique
+  description          String    @default("") // shown in MCP tools/list
+  systemPrompt         String    @default("") @db.Text // agent persona
   llmId                String
   projectId            String?
   defaultPersonalityId String? // applied at chat time when no --personality flag
   proxyModelName       String? // optional informational override
-  defaultParams        Json     @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ...
-  extras               Json     @default("{}") // future LoRA / tool-allowlist
+  defaultParams        Json      @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ...
+  extras               Json      @default("{}") // future LoRA / tool-allowlist
+  // ── Virtual-agent lifecycle (NULL/default for kind=public, mirrors Llm) ──
+  kind                 LlmKind   @default(public)
+  providerSessionId    String? // mcplocal session that owns this row when virtual
+  lastHeartbeatAt      DateTime?
+  status               LlmStatus @default(active)
+  inactiveSince        DateTime?
   ownerId              String
-  version              Int      @default(1)
-  createdAt            DateTime @default(now())
-  updatedAt            DateTime @updatedAt
+  version              Int       @default(1)
+  createdAt            DateTime  @default(now())
+  updatedAt            DateTime  @updatedAt
 
   llm     Llm      @relation(fields: [llmId], references: [id], onDelete: Restrict)
   project Project? @relation(fields: [projectId], references: [id], onDelete: SetNull)
@@ -497,6 +503,8 @@ model Agent {
   @@index([projectId])
   @@index([ownerId])
   @@index([defaultPersonalityId])
+  @@index([kind, status])
+  @@index([providerSessionId])
 }
 
 // ── Personalities (named overlay bundles of prompts on top of an Agent) ──
diff --git a/src/db/tests/agent-schema.test.ts b/src/db/tests/agent-schema.test.ts
index 4fa3b0c..4c56dfb 100644
--- a/src/db/tests/agent-schema.test.ts
+++ b/src/db/tests/agent-schema.test.ts
@@ -317,6 +317,78 @@ describe('agent / chat-thread / chat-message schema', () => {
     expect(reloaded?.defaultPersonalityId).toBeNull();
   });
 
+  // ── v3: Agent.kind virtual + lifecycle fields ──
+
+  it('defaults a freshly inserted Agent to kind=public, status=active', async () => {
+    const user = await makeUser();
+    const llm = await makeLlm('llm-default-kind');
+    const agent = await makeAgent({ name: 'fresh', llmId: llm.id, ownerId: user.id });
+    expect(agent.kind).toBe('public');
+    expect(agent.status).toBe('active');
+    expect(agent.providerSessionId).toBeNull();
+    expect(agent.lastHeartbeatAt).toBeNull();
+    expect(agent.inactiveSince).toBeNull();
+  });
+
+  it('persists kind=virtual + lifecycle fields together', async () => {
+    const user = await makeUser();
+    const llm = await makeLlm('llm-pub-virtual');
+    const now = new Date();
+    const agent = await prisma.agent.create({
+      data: {
+        name: 'local-coder',
+        llmId: llm.id,
+        ownerId: user.id,
+        kind: 'virtual',
+        providerSessionId: 'sess-abc',
+        lastHeartbeatAt: now,
+        status: 'active',
+      },
+    });
+    expect(agent.kind).toBe('virtual');
+    expect(agent.providerSessionId).toBe('sess-abc');
+    expect(agent.lastHeartbeatAt?.getTime()).toBe(now.getTime());
+  });
+
+  it('finds virtual agents by (kind, status) cheaply (GC sweep query)', async () => {
+    const user = await makeUser();
+    const llm = await makeLlm('llm-gc-agent');
+    await prisma.agent.create({ data: { name: 'pub-1', llmId: llm.id, ownerId: user.id } });
+    await prisma.agent.create({
+      data: { name: 'v-active', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's1' },
+    });
+    await prisma.agent.create({
+      data: { name: 'v-inactive', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() },
+    });
+
+    const stale = await prisma.agent.findMany({
+      where: { kind: 'virtual', status: 'inactive' },
+      select: { name: true },
+    });
+    expect(stale.map((a) => a.name)).toEqual(['v-inactive']);
+  });
+
+  it('finds agents by providerSessionId (used on mcplocal disconnect cascade)', async () => {
+    const user = await makeUser();
+    const llm = await makeLlm('llm-sess-cascade');
+    await prisma.agent.create({
+      data: { name: 'a', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
+    });
+    await prisma.agent.create({
+      data: { name: 'b', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
+    });
+    await prisma.agent.create({
+      data: { name: 'c', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'other' },
+    });
+
+    const owned = await prisma.agent.findMany({
+      where: { providerSessionId: 'shared' },
+      select: { name: true },
+      orderBy: { name: 'asc' },
+    });
+    expect(owned.map((a) => a.name)).toEqual(['a', 'b']);
+  });
+
   it('binds the same prompt to multiple personalities of an agent', async () => {
     const user = await makeUser();
     const llm = await makeLlm('llm-shared-prompt');
diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts
index 03ce6fd..9814b74 100644
--- a/src/mcpd/src/main.ts
+++ b/src/mcpd/src/main.ts
@@ -607,6 +607,7 @@ async function main(): Promise<void> {
     promptRepo,
     chatToolDispatcher,
     personalityRepo,
+    virtualLlmService,
   );
   registerAgentChatRoutes(app, chatService);
   registerLlmInferRoutes(app, {
diff --git a/src/mcpd/src/services/chat.service.ts b/src/mcpd/src/services/chat.service.ts
index fc375b7..087ea11 100644
--- a/src/mcpd/src/services/chat.service.ts
+++ b/src/mcpd/src/services/chat.service.ts
@@ -31,6 +31,7 @@ import type {
 } from '../repositories/chat.repository.js';
 import type { IPromptRepository } from '../repositories/prompt.repository.js';
 import type { IPersonalityRepository } from '../repositories/personality.repository.js';
+import type { IVirtualLlmService } from './virtual-llm.service.js';
 import type { OpenAiChatRequest, OpenAiMessage } from './llm/types.js';
 import type { AgentChatParams } from '../validation/agent.schema.js';
 import { NotFoundError } from './mcp-server.service.js';
@@ -132,6 +133,14 @@ export class ChatService {
     private readonly promptRepo: IPromptRepository,
     private readonly tools: ChatToolDispatcher,
     private readonly personalities?: IPersonalityRepository,
+    /**
+     * v3: when an Agent is pinned to a `kind=virtual` Llm, inference is
+     * relayed via this service rather than an HTTP adapter (the virtual
+     * row has no upstream URL). Optional so older test wirings still
+     * compile; in those tests the chat path will refuse virtual Llms
+     * with a clear error.
+     */
+    private readonly virtualLlms?: IVirtualLlmService,
   ) {}
 
   async createThread(agentName: string, ownerId: string, title?: string): Promise<{ id: string }> {
@@ -170,14 +179,7 @@
     let lastTurnIndex = ctx.startingTurnIndex;
     try {
       for (let i = 0; i < ctx.maxIterations; i += 1) {
-        const adapter = this.adapters.get(ctx.llmType);
-        const result = await adapter.infer({
-          body: this.buildBody(ctx),
-          modelOverride: ctx.modelOverride,
-          apiKey: ctx.apiKey,
-          url: ctx.url,
-          extraConfig: ctx.extraConfig,
-        });
+        const result = await this.runOneInference(ctx);
         const choice = extractChoice(result.body);
         if (choice === null) {
           throw new Error(`Adapter returned no choice (status ${String(result.status)})`);
@@ -240,19 +242,12 @@
     const ctx = await this.prepareContext(args);
     try {
       for (let i = 0; i < ctx.maxIterations; i += 1) {
-        const adapter = this.adapters.get(ctx.llmType);
         const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = {
           content: '',
           toolCalls: [],
         };
         let finishReason: string | null = null;
-        for await (const chunk of adapter.stream({
-          body: { ...this.buildBody(ctx), stream: true },
-          modelOverride: ctx.modelOverride,
-          apiKey: ctx.apiKey,
-          url: ctx.url,
-          extraConfig: ctx.extraConfig,
-        })) {
+        for await (const chunk of this.streamInference(ctx)) {
           if (chunk.done === true) break;
           if (chunk.data === '[DONE]') break;
           const evt = parseStreamingChunk(chunk.data);
@@ -347,12 +342,130 @@
     }
   }
 
+  /**
+   * Streaming counterpart of runOneInference. Yields raw OpenAI-style
+   * SSE chunks ({data: string; done?: boolean}) regardless of whether
+   * we're hitting a public adapter or relaying through VirtualLlmService.
+   * The caller's `parseStreamingChunk` already speaks OpenAI shape, so
+   * downstream code doesn't need to know which path produced the chunks.
+   */
+  private async *streamInference(ctx: {
+    llmName: string;
+    llmType: string;
+    llmKind: 'public' | 'virtual';
+    modelOverride: string;
+    url: string;
+    apiKey: string;
+    extraConfig: Record<string, unknown>;
+    history: OpenAiMessage[];
+    systemBlock: string;
+    toolList: ChatTool[];
+    mergedParams: AgentChatParams;
+  }): AsyncGenerator<{ data: string; done?: boolean }> {
+    if (ctx.llmKind !== 'virtual') {
+      const adapter = this.adapters.get(ctx.llmType);
+      yield* adapter.stream({
+        body: { ...this.buildBody(ctx), stream: true },
+        modelOverride: ctx.modelOverride,
+        apiKey: ctx.apiKey,
+        url: ctx.url,
+        extraConfig: ctx.extraConfig,
+      });
+      return;
+    }
+    if (this.virtualLlms === undefined) {
+      throw new Error(
+        'virtualLlms dispatcher not wired into ChatService — cannot stream chat with kind=virtual Llm',
+      );
+    }
+    // Bridge VirtualLlmService's onChunk callback API to an async
+    // iterator. Chunks land on the queue from the SSE relay; the
+    // generator drains them in order. ref.done resolves when the
+    // publisher emits its `[DONE]` marker.
+    const ref = await this.virtualLlms.enqueueInferTask(
+      ctx.llmName,
+      { ...this.buildBody(ctx), stream: true },
+      true,
+    );
+    const queue: Array<{ data: string; done?: boolean }> = [];
+    let resolveTick: (() => void) | null = null;
+    const wake = (): void => {
+      const r = resolveTick;
+      resolveTick = null;
+      if (r !== null) r();
+    };
+    const unsubscribe = ref.onChunk((c) => { queue.push(c); wake(); });
+    let finished = false;
+    let failure: Error | null = null;
+    ref.done.then(() => { finished = true; wake(); }).catch((err: Error) => { failure = err; finished = true; wake(); });
+
+    try {
+      while (true) {
+        while (queue.length > 0) {
+          const c = queue.shift()!;
+          yield c;
+          if (c.done === true) return;
+        }
+        if (finished) {
+          if (failure !== null) throw failure;
+          return;
+        }
+        await new Promise<void>((r) => { resolveTick = r; });
+      }
+    } finally {
+      unsubscribe();
+    }
+  }
+
+  /**
+   * Run a single non-streaming inference iteration. Branches on
+   * ctx.llmKind: public goes through the existing adapter registry,
+   * virtual relays through VirtualLlmService.enqueueInferTask (mirrors
+   * the same branch in `routes/llm-infer.ts` from v1 Stage 3).
+   *
+   * Throws when virtualLlms isn't wired but the row is virtual — older
+   * test wirings hit this path.
+   */
+  private async runOneInference(ctx: {
+    llmName: string;
+    llmType: string;
+    llmKind: 'public' | 'virtual';
+    modelOverride: string;
+    url: string;
+    apiKey: string;
+    extraConfig: Record<string, unknown>;
+    history: OpenAiMessage[];
+    systemBlock: string;
+    toolList: ChatTool[];
+    mergedParams: AgentChatParams;
+  }): Promise<{ status: number; body: unknown }> {
+    if (ctx.llmKind === 'virtual') {
+      if (this.virtualLlms === undefined) {
+        throw new Error(
+          'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm',
+        );
+      }
+      const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false);
+      return ref.done;
+    }
+    const adapter = this.adapters.get(ctx.llmType);
+    return adapter.infer({
+      body: this.buildBody(ctx),
+      modelOverride: ctx.modelOverride,
+      apiKey: ctx.apiKey,
+      url: ctx.url,
+      extraConfig: ctx.extraConfig,
+    });
+  }
+
   private async prepareContext(args: ChatRequestArgs): Promise<{
     threadId: string;
     history: OpenAiMessage[];
     systemBlock: string;
     llmName: string;
     llmType: string;
+    /** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */
+    llmKind: 'public' | 'virtual';
     modelOverride: string;
     url: string;
     apiKey: string;
@@ -435,6 +548,7 @@
       systemBlock,
       llmName: llm.name,
       llmType: llm.type,
+      llmKind: llm.kind,
       modelOverride: llm.model,
       url: llm.url,
       apiKey,
diff --git a/src/mcpd/tests/chat-service-virtual-llm.test.ts b/src/mcpd/tests/chat-service-virtual-llm.test.ts
new file mode 100644
index 0000000..44f44d2
--- /dev/null
+++ b/src/mcpd/tests/chat-service-virtual-llm.test.ts
@@ -0,0 +1,251 @@
+import { describe, it, expect, vi } from 'vitest';
+import { ChatService, type ChatToolDispatcher } from '../src/services/chat.service.js';
+import type { AgentService } from '../src/services/agent.service.js';
+import type { LlmService } from '../src/services/llm.service.js';
+import type { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
+import type { IChatRepository } from '../src/repositories/chat.repository.js';
+import type { IPromptRepository } from '../src/repositories/prompt.repository.js';
+import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js';
+import type { ChatMessage, ChatThread, Prompt } from '@prisma/client';
+
+const NOW = new Date();
+
+/**
+ * Tests targeting v3 Stage 1's chat.service kind=virtual branch.
+ * Mirror the existing chat-service.test.ts patterns but isolate the
+ * adapter-vs-relay dispatch decision.
+ */
+
+function mockChatRepo(): IChatRepository {
+  const msgs: ChatMessage[] = [];
+  const threads: ChatThread[] = [];
+  let idCounter = 1;
+  return {
+    createThread: vi.fn(async ({ agentId, ownerId, title }) => {
+      const t: ChatThread = {
+        id: `thread-${String(idCounter++)}`, agentId, ownerId,
+        title: title ?? '', lastTurnAt: NOW, createdAt: NOW, updatedAt: NOW,
+      };
+      threads.push(t);
+      return t;
+    }),
+    findThread: vi.fn(async (id: string) => threads.find((t) => t.id === id) ?? null),
+    listThreadsByAgent: vi.fn(async () => []),
+    listMessages: vi.fn(async () => []),
+    appendMessage: vi.fn(async (input) => {
+      const m: ChatMessage = {
+        id: `msg-${String(idCounter++)}`,
+        threadId: input.threadId,
+        turnIndex: input.turnIndex ?? msgs.filter((x) => x.threadId === input.threadId).length,
+        role: input.role,
+        content: input.content,
+        toolCalls: (input.toolCalls ?? null) as ChatMessage['toolCalls'],
+        toolCallId: input.toolCallId ?? null,
+        status: input.status ?? 'complete',
+        createdAt: NOW,
+      };
+      msgs.push(m);
+      return m;
+    }),
+    updateStatus: vi.fn(async (_id, _s) => ({ } as ChatMessage)),
+    markPendingAsError: vi.fn(async () => 0),
+    touchThread: vi.fn(async () => undefined),
+    nextTurnIndex: vi.fn(async () => msgs.length),
+  };
+}
+
+function mockAgents(): AgentService {
+  return {
+    getByName: vi.fn(async (name: string) => ({
+      id: `agent-${name}`, name, description: '',
+      systemPrompt: 'You are a helpful agent.',
+      llm: { id: 'llm-1', name: 'vllm-local' },
+      project: null,
+      defaultPersonality: null,
+      proxyModelName: null,
+      defaultParams: {},
+      extras: {},
+      ownerId: 'owner-1', version: 1, createdAt: NOW, updatedAt: NOW,
+    })),
+  } as unknown as AgentService;
+}
+
+function mockLlmsVirtual(): LlmService {
+  return {
+    getByName: vi.fn(async (name: string) => ({
+      id: 'llm-1', name, type: 'openai', model: 'fake',
+      url: '', tier: 'fast', description: '',
+      apiKeyRef: null, extraConfig: {},
+      kind: 'virtual',
+      status: 'active',
+      lastHeartbeatAt: NOW,
+      inactiveSince: null,
+      version: 1, createdAt: NOW, updatedAt: NOW,
+    })),
+    resolveApiKey: vi.fn(async () => ''),
+  } as unknown as LlmService;
+}
+
+function mockPromptRepo(): IPromptRepository {
+  return {
+    findAll: vi.fn(async () => []),
+    findGlobal: vi.fn(async () => []),
+    findByAgent: vi.fn(async () => []),
+    findById: vi.fn(async () => null),
+    findByNameAndProject: vi.fn(async () => null),
+    findByNameAndAgent: vi.fn(async () => null),
+    create: vi.fn(),
+    update: vi.fn(),
+    delete: vi.fn(),
+  };
+}
+
+function mockTools(): ChatToolDispatcher {
+  return { listTools: vi.fn(async () => []), callTool: vi.fn(async () => ({ ok: true })) };
+}
+
+function emptyAdapterRegistry(): LlmAdapterRegistry {
+  return {
+    get: () => { throw new Error('adapter should not be used for kind=virtual'); },
+  } as unknown as LlmAdapterRegistry;
+}
+
+function mockVirtualLlms(opts: {
+  reply?: string;
+  rejectWith?: Error;
+  streamingChunks?: string[];
+}): IVirtualLlmService {
+  const enqueueInferTask = vi.fn(async (_name: string, _body: unknown, streaming: boolean) => {
+    if (opts.rejectWith !== undefined) {
+      return {
+        taskId: 't-1',
+        done: Promise.reject(opts.rejectWith),
+        onChunk: () => () => undefined,
+      };
+    }
+    if (!streaming) {
+      const body = {
+        choices: [{ message: { content: opts.reply ?? 'hi from relay' }, finish_reason: 'stop' }],
+      };
+      return {
+        taskId: 't-1',
+        done: Promise.resolve({ status: 200, body }),
+        onChunk: () => () => undefined,
+      };
+    }
+    // Streaming path: collect subscribers, push the configured chunks
+    // synchronously, then resolve done.
+    const subs = new Set<(c: { data: string; done?: boolean }) => void>();
+    const chunks = opts.streamingChunks ?? ['{"choices":[{"delta":{"content":"hi from relay"}}]}'];
+    return {
+      taskId: 't-1',
+      done: (async (): Promise<{ status: number; body: unknown }> => {
+        // Wait long enough for the caller to register subscribers
+        // before fanning chunks. Promise.resolve() isn't enough — the
+        // microtask running this IIFE is queued ahead of the caller's
+        // await on enqueueInferTask, so subs would still be empty.
+        await new Promise((r) => setTimeout(r, 0));
+        for (const c of chunks) for (const s of subs) s({ data: c });
+        for (const s of subs) s({ data: '[DONE]', done: true });
+        return { status: 200, body: null };
+      })(),
+      onChunk: (cb): (() => void) => { subs.add(cb); return () => subs.delete(cb); },
+    };
+  });
+  return {
+    register: vi.fn(),
+    heartbeat: vi.fn(),
+    bindSession: vi.fn(),
+    unbindSession: vi.fn(),
+    enqueueInferTask: enqueueInferTask as unknown as IVirtualLlmService['enqueueInferTask'],
+    completeTask: vi.fn(),
+    pushTaskChunk: vi.fn(),
+    failTask: vi.fn(),
+    gcSweep: vi.fn(),
+  };
+}
+
+describe('ChatService — kind=virtual branch (v3 Stage 1)', () => {
+  it('non-streaming relays through VirtualLlmService.enqueueInferTask', async () => {
+    const chatRepo = mockChatRepo();
+    const virtual = mockVirtualLlms({ reply: 'hello back from local' });
+    const svc = new ChatService(
+      mockAgents(),
+      mockLlmsVirtual(),
+      emptyAdapterRegistry(),
+      chatRepo,
+      mockPromptRepo(),
+      mockTools(),
+      undefined,
+      virtual,
+    );
+    const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
+    expect(result.assistant).toBe('hello back from local');
+    expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
+      'vllm-local',
+      expect.objectContaining({ messages: expect.any(Array) }),
+      false,
+    );
+  });
+
+  it('streaming relays through VirtualLlmService and emits the same text deltas', async () => {
+    const chatRepo = mockChatRepo();
+    const virtual = mockVirtualLlms({
+      streamingChunks: [
+        '{"choices":[{"delta":{"content":"hello "}}]}',
+        '{"choices":[{"delta":{"content":"world"}}]}',
+      ],
+    });
+    const svc = new ChatService(
+      mockAgents(),
+      mockLlmsVirtual(),
+      emptyAdapterRegistry(),
+      chatRepo,
+      mockPromptRepo(),
+      mockTools(),
+      undefined,
+      virtual,
+    );
+    const deltas: string[] = [];
+    for await (const evt of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) {
+      if (evt.type === 'text') deltas.push(evt.delta);
+      if (evt.type === 'final') break;
+    }
+    expect(deltas.join('')).toBe('hello world');
+    expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
+      'vllm-local',
+      expect.objectContaining({ messages: expect.any(Array), stream: true }),
+      true,
+    );
+  });
+
+  it('non-streaming throws a clear error when virtualLlms isn\'t wired but the row is virtual', async () => {
+    const svc = new ChatService(
+      mockAgents(),
+      mockLlmsVirtual(),
+      emptyAdapterRegistry(),
+      mockChatRepo(),
+      mockPromptRepo(),
+      mockTools(),
+      // no personalities, no virtualLlms
+    );
+    await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
+      .rejects.toThrow(/virtualLlms dispatcher not wired/);
+  });
+
+  it('non-streaming surfaces the relay\'s rejection (e.g. publisher offline) up to the caller', async () => {
+    const virtual = mockVirtualLlms({ rejectWith: Object.assign(new Error('publisher offline'), { statusCode: 503 }) });
+    const svc = new ChatService(
+      mockAgents(),
+      mockLlmsVirtual(),
+      emptyAdapterRegistry(),
+      mockChatRepo(),
+      mockPromptRepo(),
+      mockTools(),
+      undefined,
+      virtual,
+    );
+    await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
+      .rejects.toThrow(/publisher offline/);
+  });
+});
diff --git a/src/mcpd/tests/chat-service.test.ts b/src/mcpd/tests/chat-service.test.ts
index fe62c3c..3eb205a 100644
--- a/src/mcpd/tests/chat-service.test.ts
+++ b/src/mcpd/tests/chat-service.test.ts
@@ -118,12 +118,16 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } |
   } as unknown as AgentService;
 }
 
-function mockLlms(): LlmService {
+function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService {
   return {
     getByName: vi.fn(async (name: string) => ({
      id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking',
      url: '', tier: 'fast', description: '',
      apiKeyRef: null, extraConfig: {},
+      kind: opts.kind ?? 'public',
+      status: 'active',
+      lastHeartbeatAt: null,
+      inactiveSince: null,
      version: 1, createdAt: NOW, updatedAt: NOW,
    })),
    resolveApiKey: vi.fn(async () => 'fake-key'),