Files
mcpctl/src/mcpd/tests/chat-service-virtual-llm.test.ts
Michal 9afd24a3aa feat(db+mcpd): Agent lifecycle + chat.service kind=virtual branch (v3 Stage 1)
Two pieces of v3 plumbing — schema + the latent v1 chat.service bug.

Schema (db):
- Agent gains kind/providerSessionId/lastHeartbeatAt/status/inactiveSince
  mirroring Llm's v1 lifecycle. Reuses LlmKind / LlmStatus enums; no
  new types. Existing rows backfill kind=public/status=active so v1
  CRUD is unaffected.
- @@index([kind, status]) for the GC sweep, @@index([providerSessionId])
  for disconnect-cascade lookups.
- 4 new prisma-level tests cover defaults, persisting virtual fields,
  the (kind, status) GC index, and providerSessionId lookups.
  Total agent-schema tests: 20/20.

chat.service (mcpd) — fixes the v1 latent bug:
- LlmView's kind is now plumbed through prepareContext as ctx.llmKind.
- Two new private helpers, runOneInference / streamInference, branch
  on ctx.llmKind: 'public' goes through the existing adapter
  registry, 'virtual' relays through VirtualLlmService.enqueueInferTask
  (mirrors the route-handler branch from v1 Stage 3).
- Streaming bridges VirtualLlmService's onChunk callback API to an
  async iterator via a small queue + wake pattern.
- ChatService gains an optional virtualLlms constructor parameter;
  main.ts wires it in. Older test wirings without it raise a clear
  "virtualLlms dispatcher not wired" error when the row is virtual,
  rather than silently falling through to the public path against an
  empty URL.

This unblocks any Agent (public OR future v3-virtual) pinned to a
kind=virtual Llm. Pre-this-stage, those agents 502'd against the
empty url field.

Tests: 4 new chat-service-virtual-llm.test.ts cover the relay path
non-streaming, streaming, missing-dispatcher error, and rejection
surfacing. mcpd suite: 841/841 (was 833, +8 across stages 1+v3-Stage-1).
Workspace: 2054/2054 across 153 files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:07:23 +01:00

252 lines
8.6 KiB
TypeScript

import { describe, it, expect, vi } from 'vitest';
import { ChatService, type ChatToolDispatcher } from '../src/services/chat.service.js';
import type { AgentService } from '../src/services/agent.service.js';
import type { LlmService } from '../src/services/llm.service.js';
import type { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
import type { IChatRepository } from '../src/repositories/chat.repository.js';
import type { IPromptRepository } from '../src/repositories/prompt.repository.js';
import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js';
import type { ChatMessage, ChatThread, Prompt } from '@prisma/client';
const NOW = new Date();
/**
* Tests targeting v3 Stage 1's chat.service kind=virtual branch.
* Mirror the existing chat-service.test.ts patterns but isolate the
* adapter-vs-relay dispatch decision.
*/
function mockChatRepo(): IChatRepository {
const msgs: ChatMessage[] = [];
const threads: ChatThread[] = [];
let idCounter = 1;
return {
createThread: vi.fn(async ({ agentId, ownerId, title }) => {
const t: ChatThread = {
id: `thread-${String(idCounter++)}`, agentId, ownerId,
title: title ?? '', lastTurnAt: NOW, createdAt: NOW, updatedAt: NOW,
};
threads.push(t);
return t;
}),
findThread: vi.fn(async (id: string) => threads.find((t) => t.id === id) ?? null),
listThreadsByAgent: vi.fn(async () => []),
listMessages: vi.fn(async () => []),
appendMessage: vi.fn(async (input) => {
const m: ChatMessage = {
id: `msg-${String(idCounter++)}`,
threadId: input.threadId,
turnIndex: input.turnIndex ?? msgs.filter((x) => x.threadId === input.threadId).length,
role: input.role,
content: input.content,
toolCalls: (input.toolCalls ?? null) as ChatMessage['toolCalls'],
toolCallId: input.toolCallId ?? null,
status: input.status ?? 'complete',
createdAt: NOW,
};
msgs.push(m);
return m;
}),
updateStatus: vi.fn(async (_id, _s) => ({ } as ChatMessage)),
markPendingAsError: vi.fn(async () => 0),
touchThread: vi.fn(async () => undefined),
nextTurnIndex: vi.fn(async () => msgs.length),
};
}
function mockAgents(): AgentService {
return {
getByName: vi.fn(async (name: string) => ({
id: `agent-${name}`, name, description: '',
systemPrompt: 'You are a helpful agent.',
llm: { id: 'llm-1', name: 'vllm-local' },
project: null,
defaultPersonality: null,
proxyModelName: null,
defaultParams: {},
extras: {},
ownerId: 'owner-1', version: 1, createdAt: NOW, updatedAt: NOW,
})),
} as unknown as AgentService;
}
function mockLlmsVirtual(): LlmService {
return {
getByName: vi.fn(async (name: string) => ({
id: 'llm-1', name, type: 'openai', model: 'fake',
url: '', tier: 'fast', description: '',
apiKeyRef: null, extraConfig: {},
kind: 'virtual',
status: 'active',
lastHeartbeatAt: NOW,
inactiveSince: null,
version: 1, createdAt: NOW, updatedAt: NOW,
})),
resolveApiKey: vi.fn(async () => ''),
} as unknown as LlmService;
}
function mockPromptRepo(): IPromptRepository {
return {
findAll: vi.fn(async () => []),
findGlobal: vi.fn(async () => []),
findByAgent: vi.fn(async () => []),
findById: vi.fn(async () => null),
findByNameAndProject: vi.fn(async () => null),
findByNameAndAgent: vi.fn(async () => null),
create: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
};
}
function mockTools(): ChatToolDispatcher {
return { listTools: vi.fn(async () => []), callTool: vi.fn(async () => ({ ok: true })) };
}
function emptyAdapterRegistry(): LlmAdapterRegistry {
return {
get: () => { throw new Error('adapter should not be used for kind=virtual'); },
} as unknown as LlmAdapterRegistry;
}
function mockVirtualLlms(opts: {
reply?: string;
rejectWith?: Error;
streamingChunks?: string[];
}): IVirtualLlmService {
const enqueueInferTask = vi.fn(async (_name: string, _body: unknown, streaming: boolean) => {
if (opts.rejectWith !== undefined) {
return {
taskId: 't-1',
done: Promise.reject(opts.rejectWith),
onChunk: () => () => undefined,
};
}
if (!streaming) {
const body = {
choices: [{ message: { content: opts.reply ?? 'hi from relay' }, finish_reason: 'stop' }],
};
return {
taskId: 't-1',
done: Promise.resolve({ status: 200, body }),
onChunk: () => () => undefined,
};
}
// Streaming path: collect subscribers, push the configured chunks
// synchronously, then resolve done.
const subs = new Set<(c: { data: string; done?: boolean }) => void>();
const chunks = opts.streamingChunks ?? ['{"choices":[{"delta":{"content":"hi from relay"}}]}'];
return {
taskId: 't-1',
done: (async (): Promise<{ status: number; body: unknown }> => {
// Wait long enough for the caller to register subscribers
// before fanning chunks. Promise.resolve() isn't enough — the
// microtask running this IIFE is queued ahead of the caller's
// await on enqueueInferTask, so subs would still be empty.
await new Promise((r) => setTimeout(r, 0));
for (const c of chunks) for (const s of subs) s({ data: c });
for (const s of subs) s({ data: '[DONE]', done: true });
return { status: 200, body: null };
})(),
onChunk: (cb): (() => void) => { subs.add(cb); return () => subs.delete(cb); },
};
});
return {
register: vi.fn(),
heartbeat: vi.fn(),
bindSession: vi.fn(),
unbindSession: vi.fn(),
enqueueInferTask: enqueueInferTask as unknown as IVirtualLlmService['enqueueInferTask'],
completeTask: vi.fn(),
pushTaskChunk: vi.fn(),
failTask: vi.fn(),
gcSweep: vi.fn(),
};
}
describe('ChatService — kind=virtual branch (v3 Stage 1)', () => {
it('non-streaming relays through VirtualLlmService.enqueueInferTask', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({ reply: 'hello back from local' });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
expect(result.assistant).toBe('hello back from local');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array) }),
false,
);
});
it('streaming relays through VirtualLlmService and emits the same text deltas', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({
streamingChunks: [
'{"choices":[{"delta":{"content":"hello "}}]}',
'{"choices":[{"delta":{"content":"world"}}]}',
],
});
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const deltas: string[] = [];
for await (const evt of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) {
if (evt.type === 'text') deltas.push(evt.delta);
if (evt.type === 'final') break;
}
expect(deltas.join('')).toBe('hello world');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array), stream: true }),
true,
);
});
it('non-streaming throws a clear error when virtualLlms isn\'t wired but the row is virtual', async () => {
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
// no personalities, no virtualLlms
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/virtualLlms dispatcher not wired/);
});
it('non-streaming surfaces the relay\'s rejection (e.g. publisher offline) up to the caller', async () => {
const virtual = mockVirtualLlms({ rejectWith: Object.assign(new Error('publisher offline'), { statusCode: 503 }) });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/publisher offline/);
});
});