feat: virtual agents v3 (Stages 1-3) + real fixes for chat/adapter/CLI thread format #67

Merged
michal merged 5 commits from feat/virtual-agent-v3 into main 2026-04-27 18:06:59 +00:00
7 changed files with 490 additions and 26 deletions
Showing only changes of commit 9afd24a3aa

View File

@@ -0,0 +1,14 @@
-- Mirror Llm's virtual-provider lifecycle on Agent. Reuses the
-- existing LlmKind / LlmStatus enums so we don't double-define them.
-- Existing rows are backfilled with kind='public' / status='active' so
-- nothing changes for manually-created agents.
ALTER TABLE "Agent"
ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public',
ADD COLUMN "providerSessionId" TEXT,
ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3),
ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active',
ADD COLUMN "inactiveSince" TIMESTAMP(3);
CREATE INDEX "Agent_kind_status_idx" ON "Agent"("kind", "status");
CREATE INDEX "Agent_providerSessionId_idx" ON "Agent"("providerSessionId");
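
The new columns and the (kind, status) index line up with a heartbeat sweep analogous to the gcSweep that IVirtualLlmService already exposes for virtual Llm rows. As a rough illustration only (not part of this PR; HEARTBEAT_TTL_MS and agentGcSweep are assumed names), such a sweep over the new Agent columns could look like:

// Illustrative sketch, not from this PR: a heartbeat sweep over the new Agent
// lifecycle columns, assuming a TTL constant; the (kind, status) index can
// serve both lookups below.
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();
const HEARTBEAT_TTL_MS = 60_000; // assumed timeout, not defined in this migration

async function agentGcSweep(): Promise<void> {
  const cutoff = new Date(Date.now() - HEARTBEAT_TTL_MS);
  // Flip virtual agents whose publisher stopped heartbeating to inactive.
  await prisma.agent.updateMany({
    where: { kind: 'virtual', status: 'active', lastHeartbeatAt: { lt: cutoff } },
    data: { status: 'inactive', inactiveSince: new Date() },
  });
  // A later pass can pick up the inactive rows (e.g. to delete them for good).
  const stale = await prisma.agent.findMany({
    where: { kind: 'virtual', status: 'inactive' },
    select: { id: true, providerSessionId: true },
  });
  console.log(`stale virtual agents: ${String(stale.length)}`);
}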

View File

@@ -479,6 +479,12 @@ model Agent {
proxyModelName String? // optional informational override
defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ...
extras Json @default("{}") // future LoRA / tool-allowlist
// ── Virtual-agent lifecycle (NULL/default for kind=public, mirrors Llm) ──
kind LlmKind @default(public)
providerSessionId String? // mcplocal session that owns this row when virtual
lastHeartbeatAt DateTime?
status LlmStatus @default(active)
inactiveSince DateTime?
ownerId String
version Int @default(1)
createdAt DateTime @default(now())
@@ -497,6 +503,8 @@ model Agent {
@@index([projectId])
@@index([ownerId])
@@index([defaultPersonalityId])
@@index([kind, status])
@@index([providerSessionId])
}
// ── Personalities (named overlay bundles of prompts on top of an Agent) ──

View File

@@ -317,6 +317,78 @@ describe('agent / chat-thread / chat-message schema', () => {
expect(reloaded?.defaultPersonalityId).toBeNull();
});
// ── v3: Agent.kind virtual + lifecycle fields ──
it('defaults a freshly inserted Agent to kind=public, status=active', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-default-kind');
const agent = await makeAgent({ name: 'fresh', llmId: llm.id, ownerId: user.id });
expect(agent.kind).toBe('public');
expect(agent.status).toBe('active');
expect(agent.providerSessionId).toBeNull();
expect(agent.lastHeartbeatAt).toBeNull();
expect(agent.inactiveSince).toBeNull();
});
it('persists kind=virtual + lifecycle fields together', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-pub-virtual');
const now = new Date();
const agent = await prisma.agent.create({
data: {
name: 'local-coder',
llmId: llm.id,
ownerId: user.id,
kind: 'virtual',
providerSessionId: 'sess-abc',
lastHeartbeatAt: now,
status: 'active',
},
});
expect(agent.kind).toBe('virtual');
expect(agent.providerSessionId).toBe('sess-abc');
expect(agent.lastHeartbeatAt?.getTime()).toBe(now.getTime());
});
it('finds virtual agents by (kind, status) cheaply (GC sweep query)', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-gc-agent');
await prisma.agent.create({ data: { name: 'pub-1', llmId: llm.id, ownerId: user.id } });
await prisma.agent.create({
data: { name: 'v-active', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's1' },
});
await prisma.agent.create({
data: { name: 'v-inactive', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() },
});
const stale = await prisma.agent.findMany({
where: { kind: 'virtual', status: 'inactive' },
select: { name: true },
});
expect(stale.map((a) => a.name)).toEqual(['v-inactive']);
});
it('finds agents by providerSessionId (used on mcplocal disconnect cascade)', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-sess-cascade');
await prisma.agent.create({
data: { name: 'a', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
});
await prisma.agent.create({
data: { name: 'b', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
});
await prisma.agent.create({
data: { name: 'c', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'other' },
});
const owned = await prisma.agent.findMany({
where: { providerSessionId: 'shared' },
select: { name: true },
orderBy: { name: 'asc' },
});
expect(owned.map((a) => a.name)).toEqual(['a', 'b']);
});
it('binds the same prompt to multiple personalities of an agent', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-shared-prompt');

View File

@@ -607,6 +607,7 @@ async function main(): Promise<void> {
promptRepo,
chatToolDispatcher,
personalityRepo,
virtualLlmService,
);
registerAgentChatRoutes(app, chatService);
registerLlmInferRoutes(app, {

View File

@@ -31,6 +31,7 @@ import type {
} from '../repositories/chat.repository.js';
import type { IPromptRepository } from '../repositories/prompt.repository.js';
import type { IPersonalityRepository } from '../repositories/personality.repository.js';
import type { IVirtualLlmService } from './virtual-llm.service.js';
import type { OpenAiChatRequest, OpenAiMessage } from './llm/types.js';
import type { AgentChatParams } from '../validation/agent.schema.js';
import { NotFoundError } from './mcp-server.service.js';
@@ -132,6 +133,14 @@ export class ChatService {
private readonly promptRepo: IPromptRepository,
private readonly tools: ChatToolDispatcher,
private readonly personalities?: IPersonalityRepository,
/**
* v3: when an Agent is pinned to a `kind=virtual` Llm, inference is
* relayed via this service rather than an HTTP adapter (the virtual
* row has no upstream URL). Optional so older test wirings still
* compile; in those tests the chat path will refuse virtual Llms
* with a clear error.
*/
private readonly virtualLlms?: IVirtualLlmService,
) {}
async createThread(agentName: string, ownerId: string, title?: string): Promise<{ id: string }> {
@@ -170,14 +179,7 @@ export class ChatService {
let lastTurnIndex = ctx.startingTurnIndex;
try {
for (let i = 0; i < ctx.maxIterations; i += 1) {
-const adapter = this.adapters.get(ctx.llmType);
-const result = await adapter.infer({
-body: this.buildBody(ctx),
-modelOverride: ctx.modelOverride,
-apiKey: ctx.apiKey,
-url: ctx.url,
-extraConfig: ctx.extraConfig,
-});
+const result = await this.runOneInference(ctx);
const choice = extractChoice(result.body);
if (choice === null) {
throw new Error(`Adapter returned no choice (status ${String(result.status)})`);
@@ -240,19 +242,12 @@ export class ChatService {
const ctx = await this.prepareContext(args);
try {
for (let i = 0; i < ctx.maxIterations; i += 1) {
-const adapter = this.adapters.get(ctx.llmType);
const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = {
content: '',
toolCalls: [],
};
let finishReason: string | null = null;
-for await (const chunk of adapter.stream({
-body: { ...this.buildBody(ctx), stream: true },
-modelOverride: ctx.modelOverride,
-apiKey: ctx.apiKey,
-url: ctx.url,
-extraConfig: ctx.extraConfig,
-})) {
+for await (const chunk of this.streamInference(ctx)) {
if (chunk.done === true) break;
if (chunk.data === '[DONE]') break;
const evt = parseStreamingChunk(chunk.data);
@@ -347,12 +342,130 @@ export class ChatService {
}
}
/**
* Streaming counterpart of runOneInference. Yields raw OpenAI-style
* SSE chunks ({data: string; done?: boolean}) regardless of whether
* we're hitting a public adapter or relaying through VirtualLlmService.
* The caller's `parseStreamingChunk` already speaks OpenAI shape, so
* downstream code doesn't need to know which path produced the chunks.
*/
private async *streamInference(ctx: {
llmName: string;
llmType: string;
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
extraConfig: Record<string, unknown>;
history: OpenAiMessage[];
systemBlock: string;
toolList: ChatTool[];
mergedParams: AgentChatParams;
}): AsyncGenerator<{ data: string; done?: boolean }> {
if (ctx.llmKind !== 'virtual') {
const adapter = this.adapters.get(ctx.llmType);
yield* adapter.stream({
body: { ...this.buildBody(ctx), stream: true },
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
return;
}
if (this.virtualLlms === undefined) {
throw new Error(
'virtualLlms dispatcher not wired into ChatService — cannot stream chat with kind=virtual Llm',
);
}
// Bridge VirtualLlmService's onChunk callback API to an async
// iterator. Chunks land on the queue from the SSE relay; the
// generator drains them in order. ref.done resolves when the
// publisher emits its `[DONE]` marker.
const ref = await this.virtualLlms.enqueueInferTask(
ctx.llmName,
{ ...this.buildBody(ctx), stream: true },
true,
);
const queue: Array<{ data: string; done?: boolean }> = [];
let resolveTick: (() => void) | null = null;
const wake = (): void => {
const r = resolveTick;
resolveTick = null;
if (r !== null) r();
};
const unsubscribe = ref.onChunk((c) => { queue.push(c); wake(); });
let finished = false;
let failure: Error | null = null;
ref.done.then(() => { finished = true; wake(); }).catch((err: Error) => { failure = err; finished = true; wake(); });
try {
while (true) {
while (queue.length > 0) {
const c = queue.shift()!;
yield c;
if (c.done === true) return;
}
if (finished) {
if (failure !== null) throw failure;
return;
}
await new Promise<void>((r) => { resolveTick = r; });
}
} finally {
unsubscribe();
}
}
/**
* Run a single non-streaming inference iteration. Branches on
* ctx.llmKind: public goes through the existing adapter registry,
* virtual relays through VirtualLlmService.enqueueInferTask (mirrors
* the same branch in `routes/llm-infer.ts` from v1 Stage 3).
*
* Throws when virtualLlms isn't wired but the row is virtual — older
* test wirings hit this path.
*/
private async runOneInference(ctx: {
llmName: string;
llmType: string;
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
extraConfig: Record<string, unknown>;
history: OpenAiMessage[];
systemBlock: string;
toolList: ChatTool[];
mergedParams: AgentChatParams;
}): Promise<{ status: number; body: unknown }> {
if (ctx.llmKind === 'virtual') {
if (this.virtualLlms === undefined) {
throw new Error(
'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm',
);
}
const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false);
return ref.done;
}
const adapter = this.adapters.get(ctx.llmType);
return adapter.infer({
body: this.buildBody(ctx),
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
}
private async prepareContext(args: ChatRequestArgs): Promise<{
threadId: string;
history: OpenAiMessage[];
systemBlock: string;
llmName: string;
llmType: string;
/** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
@@ -435,6 +548,7 @@ export class ChatService {
systemBlock,
llmName: llm.name,
llmType: llm.type,
llmKind: llm.kind,
modelOverride: llm.model,
url: llm.url,
apiKey,

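Before the tests that mock it, it may help to spell out the task-ref contract the relay branch above leans on. The shape below is inferred from how ChatService consumes enqueueInferTask and from the mock in the next file; the canonical declarations live in virtual-llm.service.ts, and InferTaskRef / IVirtualLlmRelay are illustrative names, not the real ones.

// Inferred from usage in chat.service.ts and from the test mock below;
// illustrative names only, not the canonical declarations.
interface InferTaskRef {
  taskId: string;
  // Non-streaming: resolves with the upstream OpenAI-style response.
  // Streaming: resolves after the publisher's [DONE] marker; rejects if the
  // publisher is offline or fails the task.
  done: Promise<{ status: number; body: unknown }>;
  // Subscribe to raw SSE chunks; returns an unsubscribe function.
  onChunk(cb: (chunk: { data: string; done?: boolean }) => void): () => void;
}

interface IVirtualLlmRelay {
  enqueueInferTask(llmName: string, body: unknown, streaming: boolean): Promise<InferTaskRef>;
}

On the non-streaming path ChatService simply returns `done`; on the streaming path it subscribes to `onChunk` and also watches `done`, so a relay failure or a stream that ends without `[DONE]` still terminates the generator.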
View File

@@ -0,0 +1,251 @@
import { describe, it, expect, vi } from 'vitest';
import { ChatService, type ChatToolDispatcher } from '../src/services/chat.service.js';
import type { AgentService } from '../src/services/agent.service.js';
import type { LlmService } from '../src/services/llm.service.js';
import type { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
import type { IChatRepository } from '../src/repositories/chat.repository.js';
import type { IPromptRepository } from '../src/repositories/prompt.repository.js';
import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js';
import type { ChatMessage, ChatThread, Prompt } from '@prisma/client';
const NOW = new Date();
/**
* Tests targeting v3 Stage 1's chat.service kind=virtual branch.
* They mirror the existing chat-service.test.ts patterns but isolate the
* adapter-vs-relay dispatch decision.
*/
function mockChatRepo(): IChatRepository {
const msgs: ChatMessage[] = [];
const threads: ChatThread[] = [];
let idCounter = 1;
return {
createThread: vi.fn(async ({ agentId, ownerId, title }) => {
const t: ChatThread = {
id: `thread-${String(idCounter++)}`, agentId, ownerId,
title: title ?? '', lastTurnAt: NOW, createdAt: NOW, updatedAt: NOW,
};
threads.push(t);
return t;
}),
findThread: vi.fn(async (id: string) => threads.find((t) => t.id === id) ?? null),
listThreadsByAgent: vi.fn(async () => []),
listMessages: vi.fn(async () => []),
appendMessage: vi.fn(async (input) => {
const m: ChatMessage = {
id: `msg-${String(idCounter++)}`,
threadId: input.threadId,
turnIndex: input.turnIndex ?? msgs.filter((x) => x.threadId === input.threadId).length,
role: input.role,
content: input.content,
toolCalls: (input.toolCalls ?? null) as ChatMessage['toolCalls'],
toolCallId: input.toolCallId ?? null,
status: input.status ?? 'complete',
createdAt: NOW,
};
msgs.push(m);
return m;
}),
updateStatus: vi.fn(async (_id, _s) => ({ } as ChatMessage)),
markPendingAsError: vi.fn(async () => 0),
touchThread: vi.fn(async () => undefined),
nextTurnIndex: vi.fn(async () => msgs.length),
};
}
function mockAgents(): AgentService {
return {
getByName: vi.fn(async (name: string) => ({
id: `agent-${name}`, name, description: '',
systemPrompt: 'You are a helpful agent.',
llm: { id: 'llm-1', name: 'vllm-local' },
project: null,
defaultPersonality: null,
proxyModelName: null,
defaultParams: {},
extras: {},
ownerId: 'owner-1', version: 1, createdAt: NOW, updatedAt: NOW,
})),
} as unknown as AgentService;
}
function mockLlmsVirtual(): LlmService {
return {
getByName: vi.fn(async (name: string) => ({
id: 'llm-1', name, type: 'openai', model: 'fake',
url: '', tier: 'fast', description: '',
apiKeyRef: null, extraConfig: {},
kind: 'virtual',
status: 'active',
lastHeartbeatAt: NOW,
inactiveSince: null,
version: 1, createdAt: NOW, updatedAt: NOW,
})),
resolveApiKey: vi.fn(async () => ''),
} as unknown as LlmService;
}
function mockPromptRepo(): IPromptRepository {
return {
findAll: vi.fn(async () => []),
findGlobal: vi.fn(async () => []),
findByAgent: vi.fn(async () => []),
findById: vi.fn(async () => null),
findByNameAndProject: vi.fn(async () => null),
findByNameAndAgent: vi.fn(async () => null),
create: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
};
}
function mockTools(): ChatToolDispatcher {
return { listTools: vi.fn(async () => []), callTool: vi.fn(async () => ({ ok: true })) };
}
function emptyAdapterRegistry(): LlmAdapterRegistry {
return {
get: () => { throw new Error('adapter should not be used for kind=virtual'); },
} as unknown as LlmAdapterRegistry;
}
function mockVirtualLlms(opts: {
reply?: string;
rejectWith?: Error;
streamingChunks?: string[];
}): IVirtualLlmService {
const enqueueInferTask = vi.fn(async (_name: string, _body: unknown, streaming: boolean) => {
if (opts.rejectWith !== undefined) {
return {
taskId: 't-1',
done: Promise.reject(opts.rejectWith),
onChunk: () => () => undefined,
};
}
if (!streaming) {
const body = {
choices: [{ message: { content: opts.reply ?? 'hi from relay' }, finish_reason: 'stop' }],
};
return {
taskId: 't-1',
done: Promise.resolve({ status: 200, body }),
onChunk: () => () => undefined,
};
}
// Streaming path: collect subscribers, then push the configured chunks
// in order after one macrotask tick (see below) and resolve done.
const subs = new Set<(c: { data: string; done?: boolean }) => void>();
const chunks = opts.streamingChunks ?? ['{"choices":[{"delta":{"content":"hi from relay"}}]}'];
return {
taskId: 't-1',
done: (async (): Promise<{ status: number; body: unknown }> => {
// Wait long enough for the caller to register subscribers
// before fanning chunks. Promise.resolve() isn't enough — the
// microtask running this IIFE is queued ahead of the caller's
// await on enqueueInferTask, so subs would still be empty.
await new Promise((r) => setTimeout(r, 0));
for (const c of chunks) for (const s of subs) s({ data: c });
for (const s of subs) s({ data: '[DONE]', done: true });
return { status: 200, body: null };
})(),
onChunk: (cb): (() => void) => { subs.add(cb); return () => subs.delete(cb); },
};
});
return {
register: vi.fn(),
heartbeat: vi.fn(),
bindSession: vi.fn(),
unbindSession: vi.fn(),
enqueueInferTask: enqueueInferTask as unknown as IVirtualLlmService['enqueueInferTask'],
completeTask: vi.fn(),
pushTaskChunk: vi.fn(),
failTask: vi.fn(),
gcSweep: vi.fn(),
};
}
describe('ChatService — kind=virtual branch (v3 Stage 1)', () => {
it('non-streaming relays through VirtualLlmService.enqueueInferTask', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({ reply: 'hello back from local' });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
expect(result.assistant).toBe('hello back from local');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array) }),
false,
);
});
it('streaming relays through VirtualLlmService and emits the same text deltas', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({
streamingChunks: [
'{"choices":[{"delta":{"content":"hello "}}]}',
'{"choices":[{"delta":{"content":"world"}}]}',
],
});
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const deltas: string[] = [];
for await (const evt of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) {
if (evt.type === 'text') deltas.push(evt.delta);
if (evt.type === 'final') break;
}
expect(deltas.join('')).toBe('hello world');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array), stream: true }),
true,
);
});
it('non-streaming throws a clear error when virtualLlms isn\'t wired but the row is virtual', async () => {
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
// no personalities, no virtualLlms
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/virtualLlms dispatcher not wired/);
});
it('non-streaming surfaces the relay\'s rejection (e.g. publisher offline) up to the caller', async () => {
const virtual = mockVirtualLlms({ rejectWith: Object.assign(new Error('publisher offline'), { statusCode: 503 }) });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/publisher offline/);
});
});
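
One aside on the mock above: the setTimeout(0) before the chunk fan-out is doing real work, and the ordering argument in its comment can be reproduced in isolation. A minimal sketch with hypothetical names (demo, enqueue), not part of the PR:

// Why the streaming mock defers with setTimeout(0) instead of Promise.resolve().
async function demo(deferWithTimer: boolean): Promise<string[]> {
  const subs = new Set<(data: string) => void>();
  const seen: string[] = [];
  const enqueue = async () => {
    // `done` starts pumping chunks as soon as its first await yields control.
    const done = (async (): Promise<void> => {
      if (deferWithTimer) {
        await new Promise((r) => setTimeout(r, 0)); // macrotask: the caller subscribes first
      } else {
        await Promise.resolve(); // microtask: fires before the caller's await resumes
      }
      for (const s of subs) s('chunk');
    })();
    return { done, onChunk: (cb: (d: string) => void): void => { subs.add(cb); } };
  };
  const ref = await enqueue();       // caller resumes only after enqueue's body has run
  ref.onChunk((d) => seen.push(d));  // too late if the pump already fired on a microtask
  await ref.done;
  return seen;
}
// demo(false) resolves to []        (chunk lost: the failure mode the mock's comment describes)
// demo(true)  resolves to ['chunk'] (one timer tick lets the subscription land first)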

View File

@@ -118,12 +118,16 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } |
} as unknown as AgentService;
}
-function mockLlms(): LlmService {
+function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService {
return {
getByName: vi.fn(async (name: string) => ({
id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking',
url: '', tier: 'fast', description: '',
apiKeyRef: null, extraConfig: {},
+kind: opts.kind ?? 'public',
+status: 'active',
+lastHeartbeatAt: null,
+inactiveSince: null,
version: 1, createdAt: NOW, updatedAt: NOW,
})),
resolveApiKey: vi.fn(async () => 'fake-key'),