Merge pull request 'feat(mcpd): LLM inference proxy + OpenAI/Anthropic adapters' (#53) from feat/llm-infer into main
Some checks failed
CI/CD / lint (push) Has started running
CI/CD / typecheck (push) Has started running
CI/CD / test (push) Has been cancelled
CI/CD / smoke (push) Has been cancelled
CI/CD / build (push) Has been cancelled
CI/CD / publish (push) Has been cancelled

This commit was merged in pull request #53.
This commit is contained in:
2026-04-19 21:39:39 +00:00
8 changed files with 1079 additions and 0 deletions

View File

@@ -28,7 +28,9 @@ import { registerSecretBackendRoutes } from './routes/secret-backends.js';
import { registerSecretMigrateRoutes } from './routes/secret-migrate.js';
import { LlmRepository } from './repositories/llm.repository.js';
import { LlmService } from './services/llm.service.js';
import { LlmAdapterRegistry } from './services/llm/dispatcher.js';
import { registerLlmRoutes } from './routes/llms.js';
import { registerLlmInferRoutes } from './routes/llm-infer.js';
import { PromptRepository } from './repositories/prompt.repository.js';
import { PromptRequestRepository } from './repositories/prompt-request.repository.js';
import { bootstrapSystemProject } from './bootstrap/system-project.js';
@@ -105,6 +107,12 @@ function mapUrlToPermission(method: string, url: string): PermissionCheck {
// /api/v1/secrets/migrate is a bulk cross-backend operation — treat as op, not a plain secret write.
if (url.startsWith('/api/v1/secrets/migrate')) return { kind: 'operation', operation: 'migrate-secrets' };
// /api/v1/llms/:name/infer → `run:llms:<name>` (not the default create:llms).
const inferMatch = url.match(/^\/api\/v1\/llms\/([^/?]+)\/infer/);
if (inferMatch?.[1]) {
return { kind: 'resource', resource: 'llms', action: 'run', resourceName: inferMatch[1] };
}
const resourceMap: Record<string, string | undefined> = {
'servers': 'servers',
'instances': 'instances',
@@ -334,6 +342,7 @@ async function main(): Promise<void> {
const secretService = new SecretService(secretRepo, secretBackendService);
const secretMigrateService = new SecretMigrateService(secretRepo, secretBackendService);
const llmService = new LlmService(llmRepo, secretService);
const llmAdapters = new LlmAdapterRegistry();
const instanceService = new InstanceService(instanceRepo, serverRepo, orchestrator, secretService);
serverService.setInstanceService(instanceService);
const projectService = new ProjectService(projectRepo, serverRepo);
@@ -475,6 +484,23 @@ async function main(): Promise<void> {
registerSecretBackendRoutes(app, secretBackendService);
registerSecretMigrateRoutes(app, secretMigrateService);
registerLlmRoutes(app, llmService);
registerLlmInferRoutes(app, {
llmService,
adapters: llmAdapters,
onInferenceEvent: (event) => {
app.log.info({
event: 'llm_inference_call',
llm: event.llmName,
model: event.model,
type: event.type,
userId: event.userId,
tokenSha: event.tokenSha,
streaming: event.streaming,
durationMs: event.durationMs,
status: event.status,
});
},
});
registerInstanceRoutes(app, instanceService);
registerProjectRoutes(app, projectService);
registerAuditLogRoutes(app, auditLogService);

View File

@@ -0,0 +1,145 @@
/**
* POST /api/v1/llms/:name/infer
*
* OpenAI-compatible chat completions endpoint. The RBAC check runs in the
* global hook — this URL maps to `run:llms:<name>`, not the default
* `create:llms`. See `main.ts:mapUrlToPermission`.
*
* Non-streaming: resolves the Llm, dispatches to the right provider adapter,
* returns the OpenAI chat.completion JSON.
*
* Streaming (`stream: true`): pipes adapter-emitted chunks back as
* `text/event-stream`. Adapters translate provider-native SSE into OpenAI
* `chat.completion.chunk`s so clients can use any OpenAI SDK unchanged.
*/
import type { FastifyInstance, FastifyReply } from 'fastify';
import type { LlmService } from '../services/llm.service.js';
import type { LlmAdapterRegistry } from '../services/llm/dispatcher.js';
import { NotFoundError } from '../services/mcp-server.service.js';
import type { OpenAiChatRequest, InferContext } from '../services/llm/types.js';
export interface LlmInferDeps {
llmService: LlmService;
adapters: LlmAdapterRegistry;
/** Optional hook to emit audit events — consumer may ignore. */
onInferenceEvent?: (event: InferenceAuditEvent) => void;
}
export interface InferenceAuditEvent {
kind: 'llm_inference_call';
llmName: string;
model: string;
type: string;
userId?: string | undefined;
tokenSha?: string | undefined;
streaming: boolean;
durationMs: number;
status: number;
}
export function registerLlmInferRoutes(
app: FastifyInstance,
deps: LlmInferDeps,
): void {
app.post<{ Params: { name: string }; Body: OpenAiChatRequest }>(
'/api/v1/llms/:name/infer',
async (request, reply) => {
const started = Date.now();
let llm;
try {
llm = await deps.llmService.getByName(request.params.name);
} catch (err) {
if (err instanceof NotFoundError) {
reply.code(404);
return { error: err.message };
}
throw err;
}
const body = (request.body ?? {}) as OpenAiChatRequest;
if (!body.messages || body.messages.length === 0) {
reply.code(400);
return { error: 'messages is required' };
}
// Resolve API key (may be empty string for providers that don't take one).
let apiKey = '';
if (llm.apiKeyRef !== null) {
try {
apiKey = await deps.llmService.resolveApiKey(llm.name);
} catch (err) {
reply.code(500);
return { error: `Failed to resolve API key: ${err instanceof Error ? err.message : String(err)}` };
}
}
const ctx: InferContext = {
body,
modelOverride: llm.model,
apiKey,
url: llm.url,
extraConfig: llm.extraConfig,
};
const adapter = deps.adapters.get(llm.type);
const streaming = body.stream === true;
const audit = (status: number): void => {
if (deps.onInferenceEvent === undefined) return;
deps.onInferenceEvent({
kind: 'llm_inference_call',
llmName: llm.name,
model: llm.model,
type: llm.type,
userId: request.userId,
tokenSha: request.mcpToken?.tokenSha,
streaming,
durationMs: Date.now() - started,
status,
});
};
if (!streaming) {
try {
const result = await adapter.infer(ctx);
reply.code(result.status);
audit(result.status);
return result.body;
} catch (err) {
audit(502);
reply.code(502);
return { error: err instanceof Error ? err.message : String(err) };
}
}
// Streaming path — set SSE headers and pipe chunks.
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
});
try {
for await (const chunk of adapter.stream(ctx)) {
writeSseChunk(reply, chunk.data);
if (chunk.done === true) break;
}
audit(200);
} catch (err) {
const payload = JSON.stringify({
error: { message: err instanceof Error ? err.message : String(err) },
});
writeSseChunk(reply, payload);
writeSseChunk(reply, '[DONE]');
audit(502);
} finally {
reply.raw.end();
}
return reply;
},
);
}
function writeSseChunk(reply: FastifyReply, data: string): void {
reply.raw.write(`data: ${data}\n\n`);
}

View File

@@ -0,0 +1,256 @@
/**
* Anthropic adapter — translates between OpenAI chat/completions format and
* the Anthropic Messages API (`POST /v1/messages`).
*
* Key differences we translate:
* - OpenAI `role: 'system'` messages become a top-level `system` string.
* - Anthropic returns `content: [{ type: 'text', text }]` — we join into
* OpenAI's `content: "…"` string.
* - Streaming: Anthropic emits a sequence of
* `message_start / content_block_{start,delta,stop} / message_delta /
* message_stop` events. We translate those to OpenAI
* `chat.completion.chunk` deltas.
*
* This adapter implements the subset needed for plain-text chat — tool-use
* translation is intentionally left out for this phase; agents that need tool
* calling should target an OpenAI-compatible provider until the translator
* covers it.
*/
import type {
LlmAdapter,
InferContext,
NonStreamingResult,
StreamingChunk,
AdapterDeps,
OpenAiMessage,
} from '../types.js';
const DEFAULT_ANTHROPIC_URL = 'https://api.anthropic.com';
const ANTHROPIC_VERSION = '2023-06-01';
interface AnthropicMessageResponse {
id: string;
model: string;
role: 'assistant';
content: Array<{ type: 'text'; text: string } | { type: string; [k: string]: unknown }>;
stop_reason?: string;
usage?: { input_tokens: number; output_tokens: number };
}
export class AnthropicAdapter implements LlmAdapter {
readonly kind = 'anthropic';
private readonly fetchImpl: typeof globalThis.fetch;
constructor(deps: AdapterDeps = {}) {
this.fetchImpl = deps.fetch ?? globalThis.fetch;
}
async infer(ctx: InferContext): Promise<NonStreamingResult> {
const url = (ctx.url !== '' ? ctx.url : DEFAULT_ANTHROPIC_URL).replace(/\/+$/, '');
const body = this.toAnthropicRequest(ctx, false);
const res = await this.fetchImpl(`${url}/v1/messages`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
if (!res.ok) {
const text = await res.text().catch(() => '');
return {
status: res.status,
body: { error: { message: `anthropic: HTTP ${String(res.status)} ${text}` } },
};
}
const anth = await res.json() as AnthropicMessageResponse;
return { status: 200, body: this.toOpenAiResponse(anth) };
}
async *stream(ctx: InferContext): AsyncGenerator<StreamingChunk> {
const url = (ctx.url !== '' ? ctx.url : DEFAULT_ANTHROPIC_URL).replace(/\/+$/, '');
const body = this.toAnthropicRequest(ctx, true);
const res = await this.fetchImpl(`${url}/v1/messages`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
if (!res.ok || res.body === null) {
const text = await res.text().catch(() => '');
throw new Error(`anthropic stream: HTTP ${String(res.status)} ${text}`);
}
const id = `chatcmpl-${cryptoNonce()}`;
const model = body.model;
const created = Math.floor(Date.now() / 1000);
// Parse Anthropic SSE. Each event is `event: <name>\ndata: <json>\n\n`.
const decoder = new TextDecoder();
let buf = '';
const reader = res.body.getReader();
let emittedFirst = false;
const baseChunk = (delta: Record<string, unknown>, finishReason?: string): string => {
const chunk = {
id,
object: 'chat.completion.chunk',
created,
model,
choices: [{
index: 0,
delta,
finish_reason: finishReason ?? null,
}],
};
return JSON.stringify(chunk);
};
try {
// eslint-disable-next-line no-constant-condition
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buf.indexOf('\n\n')) !== -1) {
const rawEvent = buf.slice(0, idx);
buf = buf.slice(idx + 2);
const parsed = parseSseEvent(rawEvent);
if (parsed === null) continue;
const { event, data } = parsed;
if (event === 'content_block_delta') {
const textDelta = (data as { delta?: { type?: string; text?: string } }).delta;
if (textDelta?.type === 'text_delta' && typeof textDelta.text === 'string') {
if (!emittedFirst) {
yield { data: baseChunk({ role: 'assistant', content: '' }) };
emittedFirst = true;
}
yield { data: baseChunk({ content: textDelta.text }) };
}
} else if (event === 'message_delta') {
const stopReason = (data as { delta?: { stop_reason?: string } }).delta?.stop_reason;
if (typeof stopReason === 'string') {
yield { data: baseChunk({}, mapStopReason(stopReason)) };
}
} else if (event === 'message_stop') {
yield { data: '[DONE]', done: true };
return;
} else if (event === 'error') {
throw new Error(`anthropic stream error: ${JSON.stringify(data)}`);
}
}
}
} finally {
reader.releaseLock();
}
// Anthropic closed without message_stop — give consumer a clean end.
yield { data: '[DONE]', done: true };
}
private headers(ctx: InferContext): Record<string, string> {
return {
'Content-Type': 'application/json',
'x-api-key': ctx.apiKey,
'anthropic-version': ANTHROPIC_VERSION,
};
}
/** Translate the OpenAI request to the Anthropic Messages shape. */
private toAnthropicRequest(ctx: InferContext, stream: boolean): {
model: string;
max_tokens: number;
messages: Array<{ role: 'user' | 'assistant'; content: string }>;
system?: string;
stream?: boolean;
temperature?: number;
top_p?: number;
stop_sequences?: string[];
} {
const { body } = ctx;
const systemParts: string[] = [];
const messages: Array<{ role: 'user' | 'assistant'; content: string }> = [];
for (const msg of body.messages) {
const text = normaliseContent(msg);
if (msg.role === 'system') {
systemParts.push(text);
} else if (msg.role === 'user' || msg.role === 'assistant') {
messages.push({ role: msg.role, content: text });
}
// `tool` role messages are dropped — tool translation is out of scope
// for this phase.
}
const out: ReturnType<typeof this.toAnthropicRequest> = {
model: body.model !== '' ? body.model : ctx.modelOverride,
max_tokens: typeof body.max_tokens === 'number' ? body.max_tokens : 1024,
messages,
};
if (systemParts.length > 0) out.system = systemParts.join('\n\n');
if (stream) out.stream = true;
if (typeof body.temperature === 'number') out.temperature = body.temperature;
if (typeof body.top_p === 'number') out.top_p = body.top_p;
if (body.stop !== undefined) {
out.stop_sequences = Array.isArray(body.stop) ? body.stop : [body.stop];
}
return out;
}
private toOpenAiResponse(anth: AnthropicMessageResponse): Record<string, unknown> {
const text = anth.content
.map((c) => (c.type === 'text' && typeof (c as { text?: unknown }).text === 'string'
? (c as { text: string }).text
: ''))
.join('');
return {
id: `chatcmpl-${anth.id}`,
object: 'chat.completion',
created: Math.floor(Date.now() / 1000),
model: anth.model,
choices: [{
index: 0,
message: { role: 'assistant', content: text },
finish_reason: mapStopReason(anth.stop_reason ?? 'end_turn'),
}],
usage: anth.usage ? {
prompt_tokens: anth.usage.input_tokens,
completion_tokens: anth.usage.output_tokens,
total_tokens: anth.usage.input_tokens + anth.usage.output_tokens,
} : undefined,
};
}
}
function normaliseContent(msg: OpenAiMessage): string {
if (typeof msg.content === 'string') return msg.content;
return msg.content
.map((part) => (typeof part.text === 'string' ? part.text : ''))
.join('');
}
function mapStopReason(r: string): string {
// Anthropic → OpenAI finish_reason
if (r === 'end_turn' || r === 'stop_sequence') return 'stop';
if (r === 'max_tokens') return 'length';
if (r === 'tool_use') return 'tool_calls';
return r;
}
function parseSseEvent(raw: string): { event: string; data: unknown } | null {
let event = '';
let dataLine = '';
for (const line of raw.split('\n')) {
if (line.startsWith('event:')) event = line.slice(6).trim();
else if (line.startsWith('data:')) dataLine += line.slice(5).trim();
}
if (dataLine === '') return null;
try {
return { event, data: JSON.parse(dataLine) as unknown };
} catch {
return null;
}
}
function cryptoNonce(): string {
// Not security-sensitive — just a short randomish id.
return Math.random().toString(36).slice(2, 10);
}

View File

@@ -0,0 +1,112 @@
/**
* OpenAI-passthrough adapter.
*
* Covers any provider that already speaks OpenAI chat/completions on the
* wire: `openai`, `vllm`, `deepseek`, `ollama` (with their openai-compatible
* endpoint enabled). The adapter forwards the request body verbatim and
* streams the response straight through — no wire translation.
*
* Defaults when `url` is empty:
* - openai → https://api.openai.com
* - deepseek → https://api.deepseek.com
* - vllm/ollama → must be configured; these have no canonical public URL.
*/
import type { LlmAdapter, InferContext, NonStreamingResult, StreamingChunk, AdapterDeps } from '../types.js';
const DEFAULT_URLS: Record<string, string> = {
openai: 'https://api.openai.com',
deepseek: 'https://api.deepseek.com',
};
export class OpenAiPassthroughAdapter implements LlmAdapter {
readonly kind: string;
private readonly fetchImpl: typeof globalThis.fetch;
constructor(kind: 'openai' | 'vllm' | 'deepseek' | 'ollama', deps: AdapterDeps = {}) {
this.kind = kind;
this.fetchImpl = deps.fetch ?? globalThis.fetch;
}
async infer(ctx: InferContext): Promise<NonStreamingResult> {
const url = this.endpointUrl(ctx.url);
const body = this.prepareBody(ctx, false);
const res = await this.fetchImpl(`${url}/v1/chat/completions`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
const json = await res.json() as unknown;
return { status: res.status, body: json };
}
async *stream(ctx: InferContext): AsyncGenerator<StreamingChunk> {
const url = this.endpointUrl(ctx.url);
const body = this.prepareBody(ctx, true);
const res = await this.fetchImpl(`${url}/v1/chat/completions`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
if (!res.ok || res.body === null) {
const text = await res.text().catch(() => '');
throw new Error(`${this.kind} stream: HTTP ${String(res.status)} ${text}`);
}
// Re-frame the provider's SSE stream into our `StreamingChunk` shape.
// OpenAI-compat providers already emit `data: {...}` + `data: [DONE]` —
// we just unwrap the `data: ` prefix, forward payloads, and emit a
// single terminal `done` chunk so the consumer always gets one.
const decoder = new TextDecoder();
let buf = '';
const reader = res.body.getReader();
try {
// eslint-disable-next-line no-constant-condition
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buf.indexOf('\n\n')) !== -1) {
const event = buf.slice(0, idx);
buf = buf.slice(idx + 2);
for (const line of event.split('\n')) {
if (!line.startsWith('data:')) continue;
const payload = line.slice(5).trim();
if (payload === '') continue;
if (payload === '[DONE]') {
yield { data: '[DONE]', done: true };
return;
}
yield { data: payload };
}
}
}
} finally {
reader.releaseLock();
}
// Provider closed without emitting [DONE] — give the consumer a clean end.
yield { data: '[DONE]', done: true };
}
private endpointUrl(url: string): string {
if (url !== '') return url.replace(/\/+$/, '');
const def = DEFAULT_URLS[this.kind];
if (def === undefined) {
throw new Error(`${this.kind}: url is required (no default endpoint for this provider)`);
}
return def;
}
private headers(ctx: InferContext): Record<string, string> {
const headers: Record<string, string> = { 'Content-Type': 'application/json' };
if (ctx.apiKey !== '') headers['Authorization'] = `Bearer ${ctx.apiKey}`;
return headers;
}
private prepareBody(ctx: InferContext, stream: boolean): Record<string, unknown> {
const out: Record<string, unknown> = { ...ctx.body };
if (out.model === undefined || out.model === '') out.model = ctx.modelOverride;
out.stream = stream;
return out;
}
}

View File

@@ -0,0 +1,52 @@
/**
* Adapter dispatcher for the inference proxy.
*
* `getAdapter(type)` returns the right adapter instance for an Llm's `type`
* column. Adapters are cached per-type — they carry no per-request state.
* The caller (the infer route) supplies the resolved API key + request body
* through `InferContext`, so a single adapter instance serves every Llm of
* that type.
*/
import type { LlmAdapter, AdapterDeps } from './types.js';
import { OpenAiPassthroughAdapter } from './adapters/openai-passthrough.js';
import { AnthropicAdapter } from './adapters/anthropic.js';
export class UnsupportedProviderError extends Error {
constructor(type: string) {
super(`Unsupported LLM provider: ${type}`);
this.name = 'UnsupportedProviderError';
}
}
export class LlmAdapterRegistry {
private readonly cache = new Map<string, LlmAdapter>();
constructor(private readonly deps: AdapterDeps = {}) {}
get(type: string): LlmAdapter {
const cached = this.cache.get(type);
if (cached !== undefined) return cached;
const adapter = this.build(type);
this.cache.set(type, adapter);
return adapter;
}
private build(type: string): LlmAdapter {
switch (type) {
case 'openai':
case 'vllm':
case 'deepseek':
case 'ollama':
return new OpenAiPassthroughAdapter(type, this.deps);
case 'anthropic':
return new AnthropicAdapter(this.deps);
case 'gemini-cli':
// Intentionally deferred — gemini-cli requires the binary on the mcpd
// pod filesystem and subprocess lifecycle management. Flagged as
// homelab-only in the plan; not landing in this phase.
throw new UnsupportedProviderError(`${type} (subprocess providers are not supported in the proxy yet)`);
default:
throw new UnsupportedProviderError(type);
}
}
}

View File

@@ -0,0 +1,70 @@
/**
* Shared types for the LLM inference proxy.
*
* The wire format on the mcpctl side is OpenAI's chat/completions v1 — it's
* the de-facto lingua franca and every client library already speaks it.
* Provider-specific adapters translate to/from that shape.
*/
export interface OpenAiMessage {
role: 'system' | 'user' | 'assistant' | 'tool';
content: string | Array<{ type: string; text?: string; [k: string]: unknown }>;
name?: string;
tool_call_id?: string;
tool_calls?: Array<{ id: string; type: 'function'; function: { name: string; arguments: string } }>;
}
export interface OpenAiChatRequest {
model: string;
messages: OpenAiMessage[];
stream?: boolean;
temperature?: number;
max_tokens?: number;
top_p?: number;
stop?: string | string[];
tools?: Array<{ type: 'function'; function: { name: string; description?: string; parameters?: Record<string, unknown> } }>;
tool_choice?: unknown;
// Passthrough: unknown extras forwarded as-is.
[k: string]: unknown;
}
export interface InferContext {
/** Normalised OpenAI-format body. Adapters read/transform from here. */
body: OpenAiChatRequest;
/** The Llm row's `model` field, used when the request body has an empty model. */
modelOverride: string;
/** The resolved API key, or empty string for providers that don't take one. */
apiKey: string;
/** Target URL from the Llm row (may be empty for provider-default). */
url: string;
/** Arbitrary config from the Llm row (e.g. vllm gpu settings). */
extraConfig: Record<string, unknown>;
}
export interface NonStreamingResult {
status: number;
/** OpenAI chat.completion response body. */
body: unknown;
}
export interface StreamingChunk {
/** Raw SSE data payload. Consumer emits `data: <payload>\n\n`. */
data: string;
/** Mark the end of stream — consumer emits `data: [DONE]\n\n`. */
done?: boolean;
}
export interface LlmAdapter {
readonly kind: string;
/** Non-streaming request. Returns the final chat.completion body. */
infer(ctx: InferContext): Promise<NonStreamingResult>;
/**
* Streaming request. Yields OpenAI-format SSE chunks. Adapters translate
* provider-native stream formats into OpenAI `chat.completion.chunk`s.
*/
stream(ctx: InferContext): AsyncGenerator<StreamingChunk>;
}
export interface AdapterDeps {
fetch?: typeof globalThis.fetch;
}

View File

@@ -0,0 +1,210 @@
import { describe, it, expect, vi } from 'vitest';
import { OpenAiPassthroughAdapter } from '../src/services/llm/adapters/openai-passthrough.js';
import { AnthropicAdapter } from '../src/services/llm/adapters/anthropic.js';
import { LlmAdapterRegistry, UnsupportedProviderError } from '../src/services/llm/dispatcher.js';
import type { InferContext } from '../src/services/llm/types.js';
function mockFetch(responses: Array<{ match: RegExp; status: number; body?: unknown; text?: string }>): ReturnType<typeof vi.fn> {
return vi.fn(async (input: string | URL, _init?: RequestInit) => {
const url = String(input);
const match = responses.find((r) => r.match.test(url));
if (!match) throw new Error(`unexpected fetch: ${url}`);
const body = match.body !== undefined ? JSON.stringify(match.body) : (match.text ?? '');
return new Response(body, { status: match.status, headers: { 'Content-Type': 'application/json' } });
});
}
function makeCtx(overrides: Partial<InferContext> = {}): InferContext {
return {
body: { model: '', messages: [{ role: 'user', content: 'hello' }] },
modelOverride: 'default-model',
apiKey: 'test-key',
url: '',
extraConfig: {},
...overrides,
};
}
// Helper to build a streaming Response from SSE lines.
function sseResponse(events: string[]): Response {
const body = events.join('\n\n') + '\n\n';
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new TextEncoder().encode(body));
controller.close();
},
});
return new Response(stream, { status: 200, headers: { 'Content-Type': 'text/event-stream' } });
}
describe('OpenAiPassthroughAdapter', () => {
it('infer: POSTs to <url>/v1/chat/completions with Authorization + body', async () => {
const fetchFn = mockFetch([{
match: /\/v1\/chat\/completions$/,
status: 200,
body: { id: 'x', choices: [{ message: { role: 'assistant', content: 'hi' } }] },
}]);
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
const ctx = makeCtx({ url: 'https://api.example.com' });
const res = await adapter.infer(ctx);
expect(res.status).toBe(200);
const [url, init] = fetchFn.mock.calls[0] as [string, RequestInit];
expect(url).toBe('https://api.example.com/v1/chat/completions');
expect(init.method).toBe('POST');
const headers = init.headers as Record<string, string>;
expect(headers['Authorization']).toBe('Bearer test-key');
const sent = JSON.parse(init.body as string) as { model: string; stream: boolean };
expect(sent.model).toBe('default-model'); // filled from modelOverride
expect(sent.stream).toBe(false);
});
it('infer: uses default URL for openai when url is empty', async () => {
const fetchFn = mockFetch([{ match: /api\.openai\.com/, status: 200, body: {} }]);
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
await adapter.infer(makeCtx());
const [url] = fetchFn.mock.calls[0] as [string, RequestInit];
expect(url).toBe('https://api.openai.com/v1/chat/completions');
});
it('infer: throws for vllm when url is empty (no default)', async () => {
const adapter = new OpenAiPassthroughAdapter('vllm', { fetch: vi.fn() as unknown as typeof fetch });
await expect(adapter.infer(makeCtx())).rejects.toThrow(/no default endpoint/);
});
it('infer: omits Authorization when apiKey is empty', async () => {
const fetchFn = mockFetch([{ match: /ollama/, status: 200, body: {} }]);
const adapter = new OpenAiPassthroughAdapter('ollama', { fetch: fetchFn as unknown as typeof fetch });
await adapter.infer(makeCtx({ url: 'http://ollama:11434', apiKey: '' }));
const [, init] = fetchFn.mock.calls[0] as [string, RequestInit];
const headers = init.headers as Record<string, string>;
expect(headers['Authorization']).toBeUndefined();
});
it('stream: forwards SSE chunks and emits terminal [DONE]', async () => {
const fetchFn = vi.fn(async () => sseResponse([
'data: {"choices":[{"delta":{"content":"hi"}}]}',
'data: {"choices":[{"delta":{"content":"!"}}]}',
'data: [DONE]',
]));
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
const ctx = makeCtx({ url: 'http://example', body: { model: '', messages: [], stream: true } });
const chunks: { data: string; done?: boolean }[] = [];
for await (const c of adapter.stream(ctx)) chunks.push(c);
expect(chunks).toHaveLength(3);
expect(chunks[2]?.done).toBe(true);
});
});
describe('AnthropicAdapter', () => {
it('infer: translates system+user messages, posts to /v1/messages', async () => {
const fetchFn = mockFetch([{
match: /\/v1\/messages$/,
status: 200,
body: {
id: 'msg_01', model: 'claude-3-5-sonnet-20241022', role: 'assistant',
content: [{ type: 'text', text: 'howdy' }],
stop_reason: 'end_turn',
usage: { input_tokens: 5, output_tokens: 2 },
},
}]);
const adapter = new AnthropicAdapter({ fetch: fetchFn as unknown as typeof fetch });
const ctx = makeCtx({
body: {
model: '', messages: [
{ role: 'system', content: 'be nice' },
{ role: 'user', content: 'hi' },
],
},
modelOverride: 'claude-3-5-sonnet-20241022',
});
const res = await adapter.infer(ctx);
expect(res.status).toBe(200);
const [url, init] = fetchFn.mock.calls[0] as [string, RequestInit];
expect(url).toBe('https://api.anthropic.com/v1/messages');
const headers = init.headers as Record<string, string>;
expect(headers['x-api-key']).toBe('test-key');
expect(headers['anthropic-version']).toBeDefined();
const sent = JSON.parse(init.body as string) as {
model: string; system: string; messages: Array<{ role: string; content: string }>; max_tokens: number;
};
expect(sent.model).toBe('claude-3-5-sonnet-20241022');
expect(sent.system).toBe('be nice');
expect(sent.messages).toEqual([{ role: 'user', content: 'hi' }]);
expect(sent.max_tokens).toBe(1024); // default
// Response shape: OpenAI chat.completion
const body = res.body as { choices: Array<{ message: { content: string }; finish_reason: string }>; usage: { total_tokens: number } };
expect(body.choices[0]!.message.content).toBe('howdy');
expect(body.choices[0]!.finish_reason).toBe('stop');
expect(body.usage.total_tokens).toBe(7);
});
it('infer: returns a synthetic error body on non-2xx', async () => {
const fetchFn = vi.fn(async () => new Response('boom', { status: 500 }));
const adapter = new AnthropicAdapter({ fetch: fetchFn as unknown as typeof fetch });
const res = await adapter.infer(makeCtx({ body: { model: '', messages: [{ role: 'user', content: 'x' }] } }));
expect(res.status).toBe(500);
const body = res.body as { error: { message: string } };
expect(body.error.message).toMatch(/HTTP 500/);
});
it('stream: translates anthropic event stream into OpenAI chunks', async () => {
const events = [
'event: message_start\ndata: {"type":"message_start","message":{"id":"m","content":[]}}',
'event: content_block_delta\ndata: {"type":"content_block_delta","delta":{"type":"text_delta","text":"he"}}',
'event: content_block_delta\ndata: {"type":"content_block_delta","delta":{"type":"text_delta","text":"llo"}}',
'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"end_turn"}}',
'event: message_stop\ndata: {"type":"message_stop"}',
];
const fetchFn = vi.fn(async () => sseResponse(events));
const adapter = new AnthropicAdapter({ fetch: fetchFn as unknown as typeof fetch });
const ctx = makeCtx({ body: { model: '', messages: [{ role: 'user', content: 'hi' }], stream: true } });
const chunks: { data: string; done?: boolean }[] = [];
for await (const c of adapter.stream(ctx)) chunks.push(c);
// Expect: role-prime, two text deltas, finish-reason, [DONE]
expect(chunks[chunks.length - 1]?.data).toBe('[DONE]');
expect(chunks[chunks.length - 1]?.done).toBe(true);
// First chunk is the role-prime (role: assistant, content: '').
const first = JSON.parse(chunks[0]!.data) as { choices: [{ delta: { role: string; content: string } }] };
expect(first.choices[0]!.delta.role).toBe('assistant');
// Next two chunks carry the text.
const d1 = JSON.parse(chunks[1]!.data) as { choices: [{ delta: { content: string } }] };
const d2 = JSON.parse(chunks[2]!.data) as { choices: [{ delta: { content: string } }] };
expect(d1.choices[0]!.delta.content).toBe('he');
expect(d2.choices[0]!.delta.content).toBe('llo');
// Finish-reason chunk.
const stopped = JSON.parse(chunks[3]!.data) as { choices: [{ finish_reason: string }] };
expect(stopped.choices[0]!.finish_reason).toBe('stop');
});
});
describe('LlmAdapterRegistry', () => {
it('returns the right adapter kind for each type', () => {
const reg = new LlmAdapterRegistry();
expect(reg.get('openai').kind).toBe('openai');
expect(reg.get('vllm').kind).toBe('vllm');
expect(reg.get('deepseek').kind).toBe('deepseek');
expect(reg.get('ollama').kind).toBe('ollama');
expect(reg.get('anthropic').kind).toBe('anthropic');
});
it('caches adapters between calls', () => {
const reg = new LlmAdapterRegistry();
const a = reg.get('openai');
const b = reg.get('openai');
expect(a).toBe(b);
});
it('rejects unsupported providers (gemini-cli is deferred)', () => {
const reg = new LlmAdapterRegistry();
expect(() => reg.get('gemini-cli')).toThrow(UnsupportedProviderError);
expect(() => reg.get('bogus')).toThrow(UnsupportedProviderError);
});
});

View File

@@ -0,0 +1,208 @@
import { describe, it, expect, vi, afterEach } from 'vitest';
import Fastify from 'fastify';
import type { FastifyInstance } from 'fastify';
import { registerLlmInferRoutes } from '../src/routes/llm-infer.js';
import { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
import { errorHandler } from '../src/middleware/error-handler.js';
import type { LlmView } from '../src/services/llm.service.js';
import { NotFoundError } from '../src/services/mcp-server.service.js';
let app: FastifyInstance;
function makeLlmView(overrides: Partial<LlmView> = {}): LlmView {
return {
id: 'llm-1',
name: 'claude',
type: 'anthropic',
model: 'claude-3-5-sonnet-20241022',
url: '',
tier: 'heavy',
description: '',
apiKeyRef: { name: 'anthropic-key', key: 'token' },
extraConfig: {},
version: 1,
createdAt: new Date(),
updatedAt: new Date(),
...overrides,
};
}
afterEach(async () => {
if (app) await app.close();
});
function sseResponse(events: string[]): Response {
const body = events.join('\n\n') + '\n\n';
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new TextEncoder().encode(body));
controller.close();
},
});
return new Response(stream, { status: 200 });
}
interface LlmServiceLike {
getByName: (name: string) => Promise<LlmView>;
resolveApiKey: (name: string) => Promise<string>;
}
async function setupApp(
llmService: LlmServiceLike,
adapters: LlmAdapterRegistry,
onInferenceEvent?: Parameters<typeof registerLlmInferRoutes>[1]['onInferenceEvent'],
): Promise<FastifyInstance> {
app = Fastify({ logger: false });
app.setErrorHandler(errorHandler);
const deps: Parameters<typeof registerLlmInferRoutes>[1] = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
llmService: llmService as any,
adapters,
};
if (onInferenceEvent !== undefined) deps.onInferenceEvent = onInferenceEvent;
registerLlmInferRoutes(app, deps);
await app.ready();
return app;
}
describe('POST /api/v1/llms/:name/infer', () => {
it('returns 404 when the Llm does not exist', async () => {
const svc: LlmServiceLike = {
getByName: async () => { throw new NotFoundError('Llm not found: missing'); },
resolveApiKey: async () => '',
};
await setupApp(svc, new LlmAdapterRegistry());
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/missing/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(404);
});
it('returns 400 when messages is missing', async () => {
const svc: LlmServiceLike = {
getByName: async () => makeLlmView({ apiKeyRef: null }),
resolveApiKey: async () => '',
};
await setupApp(svc, new LlmAdapterRegistry());
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/claude/infer',
payload: {},
});
expect(res.statusCode).toBe(400);
});
it('dispatches non-streaming to the adapter and returns its JSON', async () => {
const fetchFn = vi.fn(async () => new Response(JSON.stringify({
id: 'msg_1', model: 'claude-3-5-sonnet-20241022', role: 'assistant',
content: [{ type: 'text', text: 'hello' }],
stop_reason: 'end_turn',
usage: { input_tokens: 1, output_tokens: 1 },
}), { status: 200 }));
const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch });
const svc: LlmServiceLike = {
getByName: async () => makeLlmView(),
resolveApiKey: async () => 'sk-ant-xyz',
};
const events: unknown[] = [];
await setupApp(svc, adapters, (e) => events.push(e));
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/claude/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(200);
const body = res.json<{ choices: Array<{ message: { content: string } }> }>();
expect(body.choices[0]!.message.content).toBe('hello');
// Audit event emitted
expect(events).toHaveLength(1);
expect((events[0] as { kind: string; llmName: string; status: number }).kind).toBe('llm_inference_call');
expect((events[0] as { llmName: string }).llmName).toBe('claude');
expect((events[0] as { streaming: boolean }).streaming).toBe(false);
expect((events[0] as { status: number }).status).toBe(200);
});
it('500s when apiKey resolution fails', async () => {
const adapters = new LlmAdapterRegistry();
const svc: LlmServiceLike = {
getByName: async () => makeLlmView(),
resolveApiKey: async () => { throw new Error('secret not found'); },
};
await setupApp(svc, adapters);
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/claude/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(500);
expect(res.json<{ error: string }>().error).toMatch(/secret not found/);
});
it('skips apiKey resolution when the Llm has no apiKeyRef', async () => {
const fetchFn = vi.fn(async () => new Response(JSON.stringify({ id: 'x', choices: [] }), { status: 200 }));
const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch });
const resolveSpy = vi.fn();
const svc: LlmServiceLike = {
getByName: async () => makeLlmView({ type: 'ollama', url: 'http://ollama:11434', apiKeyRef: null }),
resolveApiKey: resolveSpy as unknown as LlmServiceLike['resolveApiKey'],
};
await setupApp(svc, adapters);
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/ollama-local/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(200);
expect(resolveSpy).not.toHaveBeenCalled();
});
it('streams SSE chunks for stream: true', async () => {
const fetchFn = vi.fn(async () => sseResponse([
'event: content_block_delta\ndata: {"type":"content_block_delta","delta":{"type":"text_delta","text":"hi"}}',
'event: message_stop\ndata: {"type":"message_stop"}',
]));
const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch });
const svc: LlmServiceLike = {
getByName: async () => makeLlmView(),
resolveApiKey: async () => 'sk-ant-xyz',
};
const events: Array<{ streaming: boolean; status: number }> = [];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
await setupApp(svc, adapters, ((e: any) => events.push(e)) as any);
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/claude/infer',
payload: { messages: [{ role: 'user', content: 'hi' }], stream: true },
});
expect(res.statusCode).toBe(200);
expect(res.body).toContain('data:');
expect(res.body).toContain('[DONE]');
expect(events).toHaveLength(1);
expect(events[0]!.streaming).toBe(true);
});
it('502s on adapter errors (non-streaming)', async () => {
const fetchFn = vi.fn(async () => { throw new Error('upstream down'); });
const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch });
const svc: LlmServiceLike = {
getByName: async () => makeLlmView({ type: 'openai', url: 'http://example', apiKeyRef: null }),
resolveApiKey: async () => '',
};
await setupApp(svc, adapters);
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/x/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(502);
expect(res.json<{ error: string }>().error).toMatch(/upstream down/);
});
});