From 23f53a0798b7ad4a4cd0960f8c04a128c2488bb3 Mon Sep 17 00:00:00 2001
From: Michal
Date: Sat, 18 Apr 2026 22:43:55 +0100
Subject: [PATCH] =?UTF-8?q?feat(mcpd):=20inference=20proxy=20=E2=80=94=20P?=
 =?UTF-8?q?OST=20/api/v1/llms/:name/infer?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Why: the point of the Llm resource (Phase 1) is that credentials never
leave the server. This lands the proxy: clients POST OpenAI
chat/completions to mcpd, mcpd attaches the provider API key server-side,
and the response streams back as OpenAI-format SSE.

Design:
- Wire format client-side is always OpenAI chat/completions — every
  existing SDK speaks it. Adapters translate on the provider side.
- `openai | vllm | deepseek | ollama` → pure passthrough (they already
  speak OpenAI). `anthropic` → translator to/from the Anthropic Messages
  API (system-string extraction, content-block flattening, SSE event
  remap).
- Plain fetch; no @anthropic-ai/sdk dep. Consistent with the OpenBao
  driver shape and keeps the proxy layer thin.
- `gemini-cli` intentionally rejected — subprocess providers need extra
  lifecycle plumbing; deferred to a follow-up.
- Streaming: adapters yield `StreamingChunk`s; the route frames them as
  `data: <chunk>\n\n` + terminal `data: [DONE]\n\n` so any OpenAI client
  works unchanged.

RBAC:
- New URL special-case in mapUrlToPermission:
  `POST /api/v1/llms/:name/infer` → `run:llms:<name>` (not the default
  create:llms). Users need an explicit `{role: 'run', resource: 'llms',
  [name: X]}` binding to call infer.
- Possession of `edit:llms` does NOT imply `run` — keeps catalogue
  management separate from spend.

Audit: the route emits an `llm_inference_call` event per request (llm
name, model, user/tokenSha, streaming, duration, status). main.ts wires
it to the structured logger for now; the hook is in place for a richer
audit sink later.

Tests:
- 11 adapter tests (passthrough POST shape + default URLs + no-auth
  ollama + SSE forwarding; anthropic translate request/response + non-2xx
  wrap + SSE event translation; registry dispatch + caching +
  unsupported-provider).
- 7 route tests (404, 400, non-streaming dispatch + audit, apiKey
  failure, null apiKeyRef path, streaming SSE output, 502 on adapter
  error).
- Full suite 1830/1830 (+18 from Phase 1's 1812). TypeScript clean.
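
Illustrative client call (sketch only; the host, llm name, and auth header
are placeholders, but the request/response shape matches the route and
tests in this patch):

    // Non-streaming: mcpd resolves and attaches the provider key server-side.
    const res = await fetch('https://mcpd.example/api/v1/llms/claude/infer', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        // Auth header/token format depends on the deployment's mcpd token setup.
        Authorization: `Bearer ${process.env.MCPD_TOKEN}`,
      },
      body: JSON.stringify({
        model: '', // empty -> the server substitutes the Llm row's model
        messages: [{ role: 'user', content: 'hello' }],
        // stream: true switches the response to OpenAI-format SSE.
      }),
    });
    const completion = await res.json(); // OpenAI chat.completion shape
    console.log(completion.choices[0].message.content);
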
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/mcpd/src/main.ts | 26 ++ src/mcpd/src/routes/llm-infer.ts | 145 ++++++++++ .../src/services/llm/adapters/anthropic.ts | 256 ++++++++++++++++++ .../llm/adapters/openai-passthrough.ts | 112 ++++++++ src/mcpd/src/services/llm/dispatcher.ts | 52 ++++ src/mcpd/src/services/llm/types.ts | 70 +++++ src/mcpd/tests/llm-adapters.test.ts | 210 ++++++++++++++ src/mcpd/tests/llm-infer-route.test.ts | 208 ++++++++++++++ 8 files changed, 1079 insertions(+) create mode 100644 src/mcpd/src/routes/llm-infer.ts create mode 100644 src/mcpd/src/services/llm/adapters/anthropic.ts create mode 100644 src/mcpd/src/services/llm/adapters/openai-passthrough.ts create mode 100644 src/mcpd/src/services/llm/dispatcher.ts create mode 100644 src/mcpd/src/services/llm/types.ts create mode 100644 src/mcpd/tests/llm-adapters.test.ts create mode 100644 src/mcpd/tests/llm-infer-route.test.ts diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index 0e3785c..eddafd4 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -28,7 +28,9 @@ import { registerSecretBackendRoutes } from './routes/secret-backends.js'; import { registerSecretMigrateRoutes } from './routes/secret-migrate.js'; import { LlmRepository } from './repositories/llm.repository.js'; import { LlmService } from './services/llm.service.js'; +import { LlmAdapterRegistry } from './services/llm/dispatcher.js'; import { registerLlmRoutes } from './routes/llms.js'; +import { registerLlmInferRoutes } from './routes/llm-infer.js'; import { PromptRepository } from './repositories/prompt.repository.js'; import { PromptRequestRepository } from './repositories/prompt-request.repository.js'; import { bootstrapSystemProject } from './bootstrap/system-project.js'; @@ -105,6 +107,12 @@ function mapUrlToPermission(method: string, url: string): PermissionCheck { // /api/v1/secrets/migrate is a bulk cross-backend operation — treat as op, not a plain secret write. if (url.startsWith('/api/v1/secrets/migrate')) return { kind: 'operation', operation: 'migrate-secrets' }; + // /api/v1/llms/:name/infer → `run:llms:` (not the default create:llms). 
+  const inferMatch = url.match(/^\/api\/v1\/llms\/([^/?]+)\/infer/);
+  if (inferMatch?.[1]) {
+    return { kind: 'resource', resource: 'llms', action: 'run', resourceName: inferMatch[1] };
+  }
+
   const resourceMap: Record<string, string> = {
     'servers': 'servers',
     'instances': 'instances',
@@ -334,6 +342,7 @@ async function main(): Promise<void> {
   const secretService = new SecretService(secretRepo, secretBackendService);
   const secretMigrateService = new SecretMigrateService(secretRepo, secretBackendService);
   const llmService = new LlmService(llmRepo, secretService);
+  const llmAdapters = new LlmAdapterRegistry();
   const instanceService = new InstanceService(instanceRepo, serverRepo, orchestrator, secretService);
   serverService.setInstanceService(instanceService);
   const projectService = new ProjectService(projectRepo, serverRepo);
@@ -475,6 +484,23 @@ async function main(): Promise<void> {
   registerSecretBackendRoutes(app, secretBackendService);
   registerSecretMigrateRoutes(app, secretMigrateService);
   registerLlmRoutes(app, llmService);
+  registerLlmInferRoutes(app, {
+    llmService,
+    adapters: llmAdapters,
+    onInferenceEvent: (event) => {
+      app.log.info({
+        event: 'llm_inference_call',
+        llm: event.llmName,
+        model: event.model,
+        type: event.type,
+        userId: event.userId,
+        tokenSha: event.tokenSha,
+        streaming: event.streaming,
+        durationMs: event.durationMs,
+        status: event.status,
+      });
+    },
+  });
   registerInstanceRoutes(app, instanceService);
   registerProjectRoutes(app, projectService);
   registerAuditLogRoutes(app, auditLogService);
diff --git a/src/mcpd/src/routes/llm-infer.ts b/src/mcpd/src/routes/llm-infer.ts
new file mode 100644
index 0000000..6c93cc9
--- /dev/null
+++ b/src/mcpd/src/routes/llm-infer.ts
@@ -0,0 +1,145 @@
+/**
+ * POST /api/v1/llms/:name/infer
+ *
+ * OpenAI-compatible chat completions endpoint. The RBAC check runs in the
+ * global hook — this URL maps to `run:llms:<name>`, not the default
+ * `create:llms`. See `main.ts:mapUrlToPermission`.
+ *
+ * Non-streaming: resolves the Llm, dispatches to the right provider adapter,
+ * returns the OpenAI chat.completion JSON.
+ *
+ * Streaming (`stream: true`): pipes adapter-emitted chunks back as
+ * `text/event-stream`. Adapters translate provider-native SSE into OpenAI
+ * `chat.completion.chunk`s so clients can use any OpenAI SDK unchanged.
+ */
+import type { FastifyInstance, FastifyReply } from 'fastify';
+import type { LlmService } from '../services/llm.service.js';
+import type { LlmAdapterRegistry } from '../services/llm/dispatcher.js';
+import { NotFoundError } from '../services/mcp-server.service.js';
+import type { OpenAiChatRequest, InferContext } from '../services/llm/types.js';
+
+export interface LlmInferDeps {
+  llmService: LlmService;
+  adapters: LlmAdapterRegistry;
+  /** Optional hook to emit audit events — consumer may ignore.
*/ + onInferenceEvent?: (event: InferenceAuditEvent) => void; +} + +export interface InferenceAuditEvent { + kind: 'llm_inference_call'; + llmName: string; + model: string; + type: string; + userId?: string | undefined; + tokenSha?: string | undefined; + streaming: boolean; + durationMs: number; + status: number; +} + +export function registerLlmInferRoutes( + app: FastifyInstance, + deps: LlmInferDeps, +): void { + app.post<{ Params: { name: string }; Body: OpenAiChatRequest }>( + '/api/v1/llms/:name/infer', + async (request, reply) => { + const started = Date.now(); + let llm; + try { + llm = await deps.llmService.getByName(request.params.name); + } catch (err) { + if (err instanceof NotFoundError) { + reply.code(404); + return { error: err.message }; + } + throw err; + } + + const body = (request.body ?? {}) as OpenAiChatRequest; + if (!body.messages || body.messages.length === 0) { + reply.code(400); + return { error: 'messages is required' }; + } + + // Resolve API key (may be empty string for providers that don't take one). + let apiKey = ''; + if (llm.apiKeyRef !== null) { + try { + apiKey = await deps.llmService.resolveApiKey(llm.name); + } catch (err) { + reply.code(500); + return { error: `Failed to resolve API key: ${err instanceof Error ? err.message : String(err)}` }; + } + } + + const ctx: InferContext = { + body, + modelOverride: llm.model, + apiKey, + url: llm.url, + extraConfig: llm.extraConfig, + }; + + const adapter = deps.adapters.get(llm.type); + const streaming = body.stream === true; + + const audit = (status: number): void => { + if (deps.onInferenceEvent === undefined) return; + deps.onInferenceEvent({ + kind: 'llm_inference_call', + llmName: llm.name, + model: llm.model, + type: llm.type, + userId: request.userId, + tokenSha: request.mcpToken?.tokenSha, + streaming, + durationMs: Date.now() - started, + status, + }); + }; + + if (!streaming) { + try { + const result = await adapter.infer(ctx); + reply.code(result.status); + audit(result.status); + return result.body; + } catch (err) { + audit(502); + reply.code(502); + return { error: err instanceof Error ? err.message : String(err) }; + } + } + + // Streaming path — set SSE headers and pipe chunks. + reply.raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + try { + for await (const chunk of adapter.stream(ctx)) { + writeSseChunk(reply, chunk.data); + if (chunk.done === true) break; + } + audit(200); + } catch (err) { + const payload = JSON.stringify({ + error: { message: err instanceof Error ? err.message : String(err) }, + }); + writeSseChunk(reply, payload); + writeSseChunk(reply, '[DONE]'); + audit(502); + } finally { + reply.raw.end(); + } + return reply; + }, + ); +} + +function writeSseChunk(reply: FastifyReply, data: string): void { + reply.raw.write(`data: ${data}\n\n`); +} diff --git a/src/mcpd/src/services/llm/adapters/anthropic.ts b/src/mcpd/src/services/llm/adapters/anthropic.ts new file mode 100644 index 0000000..18c2fef --- /dev/null +++ b/src/mcpd/src/services/llm/adapters/anthropic.ts @@ -0,0 +1,256 @@ +/** + * Anthropic adapter — translates between OpenAI chat/completions format and + * the Anthropic Messages API (`POST /v1/messages`). + * + * Key differences we translate: + * - OpenAI `role: 'system'` messages become a top-level `system` string. + * - Anthropic returns `content: [{ type: 'text', text }]` — we join into + * OpenAI's `content: "…"` string. 
+ * - Streaming: Anthropic emits a sequence of
+ *   `message_start / content_block_{start,delta,stop} / message_delta /
+ *   message_stop` events. We translate those to OpenAI
+ *   `chat.completion.chunk` deltas.
+ *
+ * This adapter implements the subset needed for plain-text chat — tool-use
+ * translation is intentionally left out for this phase; agents that need tool
+ * calling should target an OpenAI-compatible provider until the translator
+ * covers it.
+ */
+import type {
+  LlmAdapter,
+  InferContext,
+  NonStreamingResult,
+  StreamingChunk,
+  AdapterDeps,
+  OpenAiMessage,
+} from '../types.js';
+
+const DEFAULT_ANTHROPIC_URL = 'https://api.anthropic.com';
+const ANTHROPIC_VERSION = '2023-06-01';
+
+interface AnthropicMessageResponse {
+  id: string;
+  model: string;
+  role: 'assistant';
+  content: Array<{ type: 'text'; text: string } | { type: string; [k: string]: unknown }>;
+  stop_reason?: string;
+  usage?: { input_tokens: number; output_tokens: number };
+}
+
+export class AnthropicAdapter implements LlmAdapter {
+  readonly kind = 'anthropic';
+  private readonly fetchImpl: typeof globalThis.fetch;
+
+  constructor(deps: AdapterDeps = {}) {
+    this.fetchImpl = deps.fetch ?? globalThis.fetch;
+  }
+
+  async infer(ctx: InferContext): Promise<NonStreamingResult> {
+    const url = (ctx.url !== '' ? ctx.url : DEFAULT_ANTHROPIC_URL).replace(/\/+$/, '');
+    const body = this.toAnthropicRequest(ctx, false);
+    const res = await this.fetchImpl(`${url}/v1/messages`, {
+      method: 'POST',
+      headers: this.headers(ctx),
+      body: JSON.stringify(body),
+    });
+    if (!res.ok) {
+      const text = await res.text().catch(() => '');
+      return {
+        status: res.status,
+        body: { error: { message: `anthropic: HTTP ${String(res.status)} ${text}` } },
+      };
+    }
+    const anth = await res.json() as AnthropicMessageResponse;
+    return { status: 200, body: this.toOpenAiResponse(anth) };
+  }
+
+  async *stream(ctx: InferContext): AsyncGenerator<StreamingChunk> {
+    const url = (ctx.url !== '' ? ctx.url : DEFAULT_ANTHROPIC_URL).replace(/\/+$/, '');
+    const body = this.toAnthropicRequest(ctx, true);
+    const res = await this.fetchImpl(`${url}/v1/messages`, {
+      method: 'POST',
+      headers: this.headers(ctx),
+      body: JSON.stringify(body),
+    });
+    if (!res.ok || res.body === null) {
+      const text = await res.text().catch(() => '');
+      throw new Error(`anthropic stream: HTTP ${String(res.status)} ${text}`);
+    }
+
+    const id = `chatcmpl-${cryptoNonce()}`;
+    const model = body.model;
+    const created = Math.floor(Date.now() / 1000);
+
+    // Parse Anthropic SSE. Each event is `event: <name>\ndata: <json>\n\n`.
+    const decoder = new TextDecoder();
+    let buf = '';
+    const reader = res.body.getReader();
+    let emittedFirst = false;
+
+    const baseChunk = (delta: Record<string, unknown>, finishReason?: string): string => {
+      const chunk = {
+        id,
+        object: 'chat.completion.chunk',
+        created,
+        model,
+        choices: [{
+          index: 0,
+          delta,
+          finish_reason: finishReason ?? null,
+        }],
+      };
+      return JSON.stringify(chunk);
+    };
+
+    try {
+      // eslint-disable-next-line no-constant-condition
+      while (true) {
+        const { value, done } = await reader.read();
+        if (done) break;
+        buf += decoder.decode(value, { stream: true });
+
+        let idx: number;
+        while ((idx = buf.indexOf('\n\n')) !== -1) {
+          const rawEvent = buf.slice(0, idx);
+          buf = buf.slice(idx + 2);
+          const parsed = parseSseEvent(rawEvent);
+          if (parsed === null) continue;
+          const { event, data } = parsed;
+
+          if (event === 'content_block_delta') {
+            const textDelta = (data as { delta?: { type?: string; text?: string } }).delta;
+            if (textDelta?.type === 'text_delta' && typeof textDelta.text === 'string') {
+              if (!emittedFirst) {
+                yield { data: baseChunk({ role: 'assistant', content: '' }) };
+                emittedFirst = true;
+              }
+              yield { data: baseChunk({ content: textDelta.text }) };
+            }
+          } else if (event === 'message_delta') {
+            const stopReason = (data as { delta?: { stop_reason?: string } }).delta?.stop_reason;
+            if (typeof stopReason === 'string') {
+              yield { data: baseChunk({}, mapStopReason(stopReason)) };
+            }
+          } else if (event === 'message_stop') {
+            yield { data: '[DONE]', done: true };
+            return;
+          } else if (event === 'error') {
+            throw new Error(`anthropic stream error: ${JSON.stringify(data)}`);
+          }
+        }
+      }
+    } finally {
+      reader.releaseLock();
+    }
+    // Anthropic closed without message_stop — give consumer a clean end.
+    yield { data: '[DONE]', done: true };
+  }
+
+  private headers(ctx: InferContext): Record<string, string> {
+    return {
+      'Content-Type': 'application/json',
+      'x-api-key': ctx.apiKey,
+      'anthropic-version': ANTHROPIC_VERSION,
+    };
+  }
+
+  /** Translate the OpenAI request to the Anthropic Messages shape. */
+  private toAnthropicRequest(ctx: InferContext, stream: boolean): {
+    model: string;
+    max_tokens: number;
+    messages: Array<{ role: 'user' | 'assistant'; content: string }>;
+    system?: string;
+    stream?: boolean;
+    temperature?: number;
+    top_p?: number;
+    stop_sequences?: string[];
+  } {
+    const { body } = ctx;
+    const systemParts: string[] = [];
+    const messages: Array<{ role: 'user' | 'assistant'; content: string }> = [];
+
+    for (const msg of body.messages) {
+      const text = normaliseContent(msg);
+      if (msg.role === 'system') {
+        systemParts.push(text);
+      } else if (msg.role === 'user' || msg.role === 'assistant') {
+        messages.push({ role: msg.role, content: text });
+      }
+      // `tool` role messages are dropped — tool translation is out of scope
+      // for this phase.
+    }
+
+    const out: ReturnType<AnthropicAdapter['toAnthropicRequest']> = {
+      model: body.model !== '' ? body.model : ctx.modelOverride,
+      max_tokens: typeof body.max_tokens === 'number' ? body.max_tokens : 1024,
+      messages,
+    };
+    if (systemParts.length > 0) out.system = systemParts.join('\n\n');
+    if (stream) out.stream = true;
+    if (typeof body.temperature === 'number') out.temperature = body.temperature;
+    if (typeof body.top_p === 'number') out.top_p = body.top_p;
+    if (body.stop !== undefined) {
+      out.stop_sequences = Array.isArray(body.stop) ? body.stop : [body.stop];
+    }
+    return out;
+  }
+
+  private toOpenAiResponse(anth: AnthropicMessageResponse): Record<string, unknown> {
+    const text = anth.content
+      .map((c) => (c.type === 'text' && typeof (c as { text?: unknown }).text === 'string'
+        ? (c as { text: string }).text
+        : ''))
+      .join('');
+    return {
+      id: `chatcmpl-${anth.id}`,
+      object: 'chat.completion',
+      created: Math.floor(Date.now() / 1000),
+      model: anth.model,
+      choices: [{
+        index: 0,
+        message: { role: 'assistant', content: text },
+        finish_reason: mapStopReason(anth.stop_reason ?? 'end_turn'),
+      }],
+      usage: anth.usage ? {
+        prompt_tokens: anth.usage.input_tokens,
+        completion_tokens: anth.usage.output_tokens,
+        total_tokens: anth.usage.input_tokens + anth.usage.output_tokens,
+      } : undefined,
+    };
+  }
+}
+
+function normaliseContent(msg: OpenAiMessage): string {
+  if (typeof msg.content === 'string') return msg.content;
+  return msg.content
+    .map((part) => (typeof part.text === 'string' ? part.text : ''))
+    .join('');
+}
+
+function mapStopReason(r: string): string {
+  // Anthropic → OpenAI finish_reason
+  if (r === 'end_turn' || r === 'stop_sequence') return 'stop';
+  if (r === 'max_tokens') return 'length';
+  if (r === 'tool_use') return 'tool_calls';
+  return r;
+}
+
+function parseSseEvent(raw: string): { event: string; data: unknown } | null {
+  let event = '';
+  let dataLine = '';
+  for (const line of raw.split('\n')) {
+    if (line.startsWith('event:')) event = line.slice(6).trim();
+    else if (line.startsWith('data:')) dataLine += line.slice(5).trim();
+  }
+  if (dataLine === '') return null;
+  try {
+    return { event, data: JSON.parse(dataLine) as unknown };
+  } catch {
+    return null;
+  }
+}
+
+function cryptoNonce(): string {
+  // Not security-sensitive — just a short randomish id.
+  return Math.random().toString(36).slice(2, 10);
+}
diff --git a/src/mcpd/src/services/llm/adapters/openai-passthrough.ts b/src/mcpd/src/services/llm/adapters/openai-passthrough.ts
new file mode 100644
index 0000000..8d9c2dd
--- /dev/null
+++ b/src/mcpd/src/services/llm/adapters/openai-passthrough.ts
@@ -0,0 +1,112 @@
+/**
+ * OpenAI-passthrough adapter.
+ *
+ * Covers any provider that already speaks OpenAI chat/completions on the
+ * wire: `openai`, `vllm`, `deepseek`, `ollama` (with their openai-compatible
+ * endpoint enabled). The adapter forwards the request body verbatim and
+ * streams the response straight through — no wire translation.
+ *
+ * Defaults when `url` is empty:
+ * - openai → https://api.openai.com
+ * - deepseek → https://api.deepseek.com
+ * - vllm/ollama → must be configured; these have no canonical public URL.
+ */
+import type { LlmAdapter, InferContext, NonStreamingResult, StreamingChunk, AdapterDeps } from '../types.js';
+
+const DEFAULT_URLS: Record<string, string> = {
+  openai: 'https://api.openai.com',
+  deepseek: 'https://api.deepseek.com',
+};
+
+export class OpenAiPassthroughAdapter implements LlmAdapter {
+  readonly kind: string;
+  private readonly fetchImpl: typeof globalThis.fetch;
+
+  constructor(kind: 'openai' | 'vllm' | 'deepseek' | 'ollama', deps: AdapterDeps = {}) {
+    this.kind = kind;
+    this.fetchImpl = deps.fetch ?? globalThis.fetch;
+  }
+
+  async infer(ctx: InferContext): Promise<NonStreamingResult> {
+    const url = this.endpointUrl(ctx.url);
+    const body = this.prepareBody(ctx, false);
+    const res = await this.fetchImpl(`${url}/v1/chat/completions`, {
+      method: 'POST',
+      headers: this.headers(ctx),
+      body: JSON.stringify(body),
+    });
+    const json = await res.json() as unknown;
+    return { status: res.status, body: json };
+  }
+
+  async *stream(ctx: InferContext): AsyncGenerator<StreamingChunk> {
+    const url = this.endpointUrl(ctx.url);
+    const body = this.prepareBody(ctx, true);
+    const res = await this.fetchImpl(`${url}/v1/chat/completions`, {
+      method: 'POST',
+      headers: this.headers(ctx),
+      body: JSON.stringify(body),
+    });
+    if (!res.ok || res.body === null) {
+      const text = await res.text().catch(() => '');
+      throw new Error(`${this.kind} stream: HTTP ${String(res.status)} ${text}`);
+    }
+
+    // Re-frame the provider's SSE stream into our `StreamingChunk` shape.
+    // OpenAI-compat providers already emit `data: {...}` + `data: [DONE]` —
+    // we just unwrap the `data: ` prefix, forward payloads, and emit a
+    // single terminal `done` chunk so the consumer always gets one.
+    const decoder = new TextDecoder();
+    let buf = '';
+    const reader = res.body.getReader();
+    try {
+      // eslint-disable-next-line no-constant-condition
+      while (true) {
+        const { value, done } = await reader.read();
+        if (done) break;
+        buf += decoder.decode(value, { stream: true });
+        let idx: number;
+        while ((idx = buf.indexOf('\n\n')) !== -1) {
+          const event = buf.slice(0, idx);
+          buf = buf.slice(idx + 2);
+          for (const line of event.split('\n')) {
+            if (!line.startsWith('data:')) continue;
+            const payload = line.slice(5).trim();
+            if (payload === '') continue;
+            if (payload === '[DONE]') {
+              yield { data: '[DONE]', done: true };
+              return;
+            }
+            yield { data: payload };
+          }
+        }
+      }
+    } finally {
+      reader.releaseLock();
+    }
+    // Provider closed without emitting [DONE] — give the consumer a clean end.
+    yield { data: '[DONE]', done: true };
+  }
+
+  private endpointUrl(url: string): string {
+    if (url !== '') return url.replace(/\/+$/, '');
+    const def = DEFAULT_URLS[this.kind];
+    if (def === undefined) {
+      throw new Error(`${this.kind}: url is required (no default endpoint for this provider)`);
+    }
+    return def;
+  }
+
+  private headers(ctx: InferContext): Record<string, string> {
+    const headers: Record<string, string> = { 'Content-Type': 'application/json' };
+    if (ctx.apiKey !== '') headers['Authorization'] = `Bearer ${ctx.apiKey}`;
+    return headers;
+  }
+
+  private prepareBody(ctx: InferContext, stream: boolean): Record<string, unknown> {
+    const out: Record<string, unknown> = { ...ctx.body };
+    if (out.model === undefined || out.model === '') out.model = ctx.modelOverride;
+    out.stream = stream;
+    return out;
+  }
+}
diff --git a/src/mcpd/src/services/llm/dispatcher.ts b/src/mcpd/src/services/llm/dispatcher.ts
new file mode 100644
index 0000000..d462d46
--- /dev/null
+++ b/src/mcpd/src/services/llm/dispatcher.ts
@@ -0,0 +1,52 @@
+/**
+ * Adapter dispatcher for the inference proxy.
+ *
+ * `get(type)` returns the right adapter instance for an Llm's `type`
+ * column. Adapters are cached per-type — they carry no per-request state.
+ * The caller (the infer route) supplies the resolved API key + request body
+ * through `InferContext`, so a single adapter instance serves every Llm of
+ * that type.
+ */ +import type { LlmAdapter, AdapterDeps } from './types.js'; +import { OpenAiPassthroughAdapter } from './adapters/openai-passthrough.js'; +import { AnthropicAdapter } from './adapters/anthropic.js'; + +export class UnsupportedProviderError extends Error { + constructor(type: string) { + super(`Unsupported LLM provider: ${type}`); + this.name = 'UnsupportedProviderError'; + } +} + +export class LlmAdapterRegistry { + private readonly cache = new Map(); + + constructor(private readonly deps: AdapterDeps = {}) {} + + get(type: string): LlmAdapter { + const cached = this.cache.get(type); + if (cached !== undefined) return cached; + const adapter = this.build(type); + this.cache.set(type, adapter); + return adapter; + } + + private build(type: string): LlmAdapter { + switch (type) { + case 'openai': + case 'vllm': + case 'deepseek': + case 'ollama': + return new OpenAiPassthroughAdapter(type, this.deps); + case 'anthropic': + return new AnthropicAdapter(this.deps); + case 'gemini-cli': + // Intentionally deferred — gemini-cli requires the binary on the mcpd + // pod filesystem and subprocess lifecycle management. Flagged as + // homelab-only in the plan; not landing in this phase. + throw new UnsupportedProviderError(`${type} (subprocess providers are not supported in the proxy yet)`); + default: + throw new UnsupportedProviderError(type); + } + } +} diff --git a/src/mcpd/src/services/llm/types.ts b/src/mcpd/src/services/llm/types.ts new file mode 100644 index 0000000..2e78be7 --- /dev/null +++ b/src/mcpd/src/services/llm/types.ts @@ -0,0 +1,70 @@ +/** + * Shared types for the LLM inference proxy. + * + * The wire format on the mcpctl side is OpenAI's chat/completions v1 — it's + * the de-facto lingua franca and every client library already speaks it. + * Provider-specific adapters translate to/from that shape. + */ + +export interface OpenAiMessage { + role: 'system' | 'user' | 'assistant' | 'tool'; + content: string | Array<{ type: string; text?: string; [k: string]: unknown }>; + name?: string; + tool_call_id?: string; + tool_calls?: Array<{ id: string; type: 'function'; function: { name: string; arguments: string } }>; +} + +export interface OpenAiChatRequest { + model: string; + messages: OpenAiMessage[]; + stream?: boolean; + temperature?: number; + max_tokens?: number; + top_p?: number; + stop?: string | string[]; + tools?: Array<{ type: 'function'; function: { name: string; description?: string; parameters?: Record } }>; + tool_choice?: unknown; + // Passthrough: unknown extras forwarded as-is. + [k: string]: unknown; +} + +export interface InferContext { + /** Normalised OpenAI-format body. Adapters read/transform from here. */ + body: OpenAiChatRequest; + /** The Llm row's `model` field, used when the request body has an empty model. */ + modelOverride: string; + /** The resolved API key, or empty string for providers that don't take one. */ + apiKey: string; + /** Target URL from the Llm row (may be empty for provider-default). */ + url: string; + /** Arbitrary config from the Llm row (e.g. vllm gpu settings). */ + extraConfig: Record; +} + +export interface NonStreamingResult { + status: number; + /** OpenAI chat.completion response body. */ + body: unknown; +} + +export interface StreamingChunk { + /** Raw SSE data payload. Consumer emits `data: \n\n`. */ + data: string; + /** Mark the end of stream — consumer emits `data: [DONE]\n\n`. */ + done?: boolean; +} + +export interface LlmAdapter { + readonly kind: string; + /** Non-streaming request. 
+  infer(ctx: InferContext): Promise<NonStreamingResult>;
+  /**
+   * Streaming request. Yields OpenAI-format SSE chunks. Adapters translate
+   * provider-native stream formats into OpenAI `chat.completion.chunk`s.
+   */
+  stream(ctx: InferContext): AsyncGenerator<StreamingChunk>;
+}
+
+export interface AdapterDeps {
+  fetch?: typeof globalThis.fetch;
+}
diff --git a/src/mcpd/tests/llm-adapters.test.ts b/src/mcpd/tests/llm-adapters.test.ts
new file mode 100644
index 0000000..e80991b
--- /dev/null
+++ b/src/mcpd/tests/llm-adapters.test.ts
@@ -0,0 +1,210 @@
+import { describe, it, expect, vi } from 'vitest';
+import { OpenAiPassthroughAdapter } from '../src/services/llm/adapters/openai-passthrough.js';
+import { AnthropicAdapter } from '../src/services/llm/adapters/anthropic.js';
+import { LlmAdapterRegistry, UnsupportedProviderError } from '../src/services/llm/dispatcher.js';
+import type { InferContext } from '../src/services/llm/types.js';
+
+function mockFetch(responses: Array<{ match: RegExp; status: number; body?: unknown; text?: string }>): ReturnType<typeof vi.fn> {
+  return vi.fn(async (input: string | URL, _init?: RequestInit) => {
+    const url = String(input);
+    const match = responses.find((r) => r.match.test(url));
+    if (!match) throw new Error(`unexpected fetch: ${url}`);
+    const body = match.body !== undefined ? JSON.stringify(match.body) : (match.text ?? '');
+    return new Response(body, { status: match.status, headers: { 'Content-Type': 'application/json' } });
+  });
+}
+
+function makeCtx(overrides: Partial<InferContext> = {}): InferContext {
+  return {
+    body: { model: '', messages: [{ role: 'user', content: 'hello' }] },
+    modelOverride: 'default-model',
+    apiKey: 'test-key',
+    url: '',
+    extraConfig: {},
+    ...overrides,
+  };
+}
+
+// Helper to build a streaming Response from SSE lines.
+function sseResponse(events: string[]): Response { + const body = events.join('\n\n') + '\n\n'; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(body)); + controller.close(); + }, + }); + return new Response(stream, { status: 200, headers: { 'Content-Type': 'text/event-stream' } }); +} + +describe('OpenAiPassthroughAdapter', () => { + it('infer: POSTs to /v1/chat/completions with Authorization + body', async () => { + const fetchFn = mockFetch([{ + match: /\/v1\/chat\/completions$/, + status: 200, + body: { id: 'x', choices: [{ message: { role: 'assistant', content: 'hi' } }] }, + }]); + const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch }); + const ctx = makeCtx({ url: 'https://api.example.com' }); + const res = await adapter.infer(ctx); + expect(res.status).toBe(200); + const [url, init] = fetchFn.mock.calls[0] as [string, RequestInit]; + expect(url).toBe('https://api.example.com/v1/chat/completions'); + expect(init.method).toBe('POST'); + const headers = init.headers as Record; + expect(headers['Authorization']).toBe('Bearer test-key'); + const sent = JSON.parse(init.body as string) as { model: string; stream: boolean }; + expect(sent.model).toBe('default-model'); // filled from modelOverride + expect(sent.stream).toBe(false); + }); + + it('infer: uses default URL for openai when url is empty', async () => { + const fetchFn = mockFetch([{ match: /api\.openai\.com/, status: 200, body: {} }]); + const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch }); + await adapter.infer(makeCtx()); + const [url] = fetchFn.mock.calls[0] as [string, RequestInit]; + expect(url).toBe('https://api.openai.com/v1/chat/completions'); + }); + + it('infer: throws for vllm when url is empty (no default)', async () => { + const adapter = new OpenAiPassthroughAdapter('vllm', { fetch: vi.fn() as unknown as typeof fetch }); + await expect(adapter.infer(makeCtx())).rejects.toThrow(/no default endpoint/); + }); + + it('infer: omits Authorization when apiKey is empty', async () => { + const fetchFn = mockFetch([{ match: /ollama/, status: 200, body: {} }]); + const adapter = new OpenAiPassthroughAdapter('ollama', { fetch: fetchFn as unknown as typeof fetch }); + await adapter.infer(makeCtx({ url: 'http://ollama:11434', apiKey: '' })); + const [, init] = fetchFn.mock.calls[0] as [string, RequestInit]; + const headers = init.headers as Record; + expect(headers['Authorization']).toBeUndefined(); + }); + + it('stream: forwards SSE chunks and emits terminal [DONE]', async () => { + const fetchFn = vi.fn(async () => sseResponse([ + 'data: {"choices":[{"delta":{"content":"hi"}}]}', + 'data: {"choices":[{"delta":{"content":"!"}}]}', + 'data: [DONE]', + ])); + const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch }); + const ctx = makeCtx({ url: 'http://example', body: { model: '', messages: [], stream: true } }); + const chunks: { data: string; done?: boolean }[] = []; + for await (const c of adapter.stream(ctx)) chunks.push(c); + expect(chunks).toHaveLength(3); + expect(chunks[2]?.done).toBe(true); + }); +}); + +describe('AnthropicAdapter', () => { + it('infer: translates system+user messages, posts to /v1/messages', async () => { + const fetchFn = mockFetch([{ + match: /\/v1\/messages$/, + status: 200, + body: { + id: 'msg_01', model: 'claude-3-5-sonnet-20241022', role: 'assistant', + content: [{ type: 'text', text: 'howdy' }], + 
stop_reason: 'end_turn', + usage: { input_tokens: 5, output_tokens: 2 }, + }, + }]); + const adapter = new AnthropicAdapter({ fetch: fetchFn as unknown as typeof fetch }); + const ctx = makeCtx({ + body: { + model: '', messages: [ + { role: 'system', content: 'be nice' }, + { role: 'user', content: 'hi' }, + ], + }, + modelOverride: 'claude-3-5-sonnet-20241022', + }); + const res = await adapter.infer(ctx); + expect(res.status).toBe(200); + + const [url, init] = fetchFn.mock.calls[0] as [string, RequestInit]; + expect(url).toBe('https://api.anthropic.com/v1/messages'); + const headers = init.headers as Record; + expect(headers['x-api-key']).toBe('test-key'); + expect(headers['anthropic-version']).toBeDefined(); + + const sent = JSON.parse(init.body as string) as { + model: string; system: string; messages: Array<{ role: string; content: string }>; max_tokens: number; + }; + expect(sent.model).toBe('claude-3-5-sonnet-20241022'); + expect(sent.system).toBe('be nice'); + expect(sent.messages).toEqual([{ role: 'user', content: 'hi' }]); + expect(sent.max_tokens).toBe(1024); // default + + // Response shape: OpenAI chat.completion + const body = res.body as { choices: Array<{ message: { content: string }; finish_reason: string }>; usage: { total_tokens: number } }; + expect(body.choices[0]!.message.content).toBe('howdy'); + expect(body.choices[0]!.finish_reason).toBe('stop'); + expect(body.usage.total_tokens).toBe(7); + }); + + it('infer: returns a synthetic error body on non-2xx', async () => { + const fetchFn = vi.fn(async () => new Response('boom', { status: 500 })); + const adapter = new AnthropicAdapter({ fetch: fetchFn as unknown as typeof fetch }); + const res = await adapter.infer(makeCtx({ body: { model: '', messages: [{ role: 'user', content: 'x' }] } })); + expect(res.status).toBe(500); + const body = res.body as { error: { message: string } }; + expect(body.error.message).toMatch(/HTTP 500/); + }); + + it('stream: translates anthropic event stream into OpenAI chunks', async () => { + const events = [ + 'event: message_start\ndata: {"type":"message_start","message":{"id":"m","content":[]}}', + 'event: content_block_delta\ndata: {"type":"content_block_delta","delta":{"type":"text_delta","text":"he"}}', + 'event: content_block_delta\ndata: {"type":"content_block_delta","delta":{"type":"text_delta","text":"llo"}}', + 'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"end_turn"}}', + 'event: message_stop\ndata: {"type":"message_stop"}', + ]; + const fetchFn = vi.fn(async () => sseResponse(events)); + const adapter = new AnthropicAdapter({ fetch: fetchFn as unknown as typeof fetch }); + const ctx = makeCtx({ body: { model: '', messages: [{ role: 'user', content: 'hi' }], stream: true } }); + + const chunks: { data: string; done?: boolean }[] = []; + for await (const c of adapter.stream(ctx)) chunks.push(c); + + // Expect: role-prime, two text deltas, finish-reason, [DONE] + expect(chunks[chunks.length - 1]?.data).toBe('[DONE]'); + expect(chunks[chunks.length - 1]?.done).toBe(true); + + // First chunk is the role-prime (role: assistant, content: ''). + const first = JSON.parse(chunks[0]!.data) as { choices: [{ delta: { role: string; content: string } }] }; + expect(first.choices[0]!.delta.role).toBe('assistant'); + + // Next two chunks carry the text. 
+    const d1 = JSON.parse(chunks[1]!.data) as { choices: [{ delta: { content: string } }] };
+    const d2 = JSON.parse(chunks[2]!.data) as { choices: [{ delta: { content: string } }] };
+    expect(d1.choices[0]!.delta.content).toBe('he');
+    expect(d2.choices[0]!.delta.content).toBe('llo');
+
+    // Finish-reason chunk.
+    const stopped = JSON.parse(chunks[3]!.data) as { choices: [{ finish_reason: string }] };
+    expect(stopped.choices[0]!.finish_reason).toBe('stop');
+  });
+});
+
+describe('LlmAdapterRegistry', () => {
+  it('returns the right adapter kind for each type', () => {
+    const reg = new LlmAdapterRegistry();
+    expect(reg.get('openai').kind).toBe('openai');
+    expect(reg.get('vllm').kind).toBe('vllm');
+    expect(reg.get('deepseek').kind).toBe('deepseek');
+    expect(reg.get('ollama').kind).toBe('ollama');
+    expect(reg.get('anthropic').kind).toBe('anthropic');
+  });
+
+  it('caches adapters between calls', () => {
+    const reg = new LlmAdapterRegistry();
+    const a = reg.get('openai');
+    const b = reg.get('openai');
+    expect(a).toBe(b);
+  });
+
+  it('rejects unsupported providers (gemini-cli is deferred)', () => {
+    const reg = new LlmAdapterRegistry();
+    expect(() => reg.get('gemini-cli')).toThrow(UnsupportedProviderError);
+    expect(() => reg.get('bogus')).toThrow(UnsupportedProviderError);
+  });
+});
diff --git a/src/mcpd/tests/llm-infer-route.test.ts b/src/mcpd/tests/llm-infer-route.test.ts
new file mode 100644
index 0000000..20e5339
--- /dev/null
+++ b/src/mcpd/tests/llm-infer-route.test.ts
@@ -0,0 +1,208 @@
+import { describe, it, expect, vi, afterEach } from 'vitest';
+import Fastify from 'fastify';
+import type { FastifyInstance } from 'fastify';
+import { registerLlmInferRoutes } from '../src/routes/llm-infer.js';
+import { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
+import { errorHandler } from '../src/middleware/error-handler.js';
+import type { LlmView } from '../src/services/llm.service.js';
+import { NotFoundError } from '../src/services/mcp-server.service.js';
+
+let app: FastifyInstance;
+
+function makeLlmView(overrides: Partial<LlmView> = {}): LlmView {
+  return {
+    id: 'llm-1',
+    name: 'claude',
+    type: 'anthropic',
+    model: 'claude-3-5-sonnet-20241022',
+    url: '',
+    tier: 'heavy',
+    description: '',
+    apiKeyRef: { name: 'anthropic-key', key: 'token' },
+    extraConfig: {},
+    version: 1,
+    createdAt: new Date(),
+    updatedAt: new Date(),
+    ...overrides,
+  };
+}
+
+afterEach(async () => {
+  if (app) await app.close();
+});
+
+function sseResponse(events: string[]): Response {
+  const body = events.join('\n\n') + '\n\n';
+  const stream = new ReadableStream({
+    start(controller) {
+      controller.enqueue(new TextEncoder().encode(body));
+      controller.close();
+    },
+  });
+  return new Response(stream, { status: 200 });
+}
+
+interface LlmServiceLike {
+  getByName: (name: string) => Promise<LlmView>;
+  resolveApiKey: (name: string) => Promise<string>;
+}
+
+async function setupApp(
+  llmService: LlmServiceLike,
+  adapters: LlmAdapterRegistry,
+  onInferenceEvent?: Parameters<typeof registerLlmInferRoutes>[1]['onInferenceEvent'],
+): Promise<FastifyInstance> {
+  app = Fastify({ logger: false });
+  app.setErrorHandler(errorHandler);
+  const deps: Parameters<typeof registerLlmInferRoutes>[1] = {
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    llmService: llmService as any,
+    adapters,
+  };
+  if (onInferenceEvent !== undefined) deps.onInferenceEvent = onInferenceEvent;
+  registerLlmInferRoutes(app, deps);
+  await app.ready();
+  return app;
+}
+
+describe('POST /api/v1/llms/:name/infer', () => {
+  it('returns 404 when the Llm does not exist', async () => {
+    const
svc: LlmServiceLike = { + getByName: async () => { throw new NotFoundError('Llm not found: missing'); }, + resolveApiKey: async () => '', + }; + await setupApp(svc, new LlmAdapterRegistry()); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/missing/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(404); + }); + + it('returns 400 when messages is missing', async () => { + const svc: LlmServiceLike = { + getByName: async () => makeLlmView({ apiKeyRef: null }), + resolveApiKey: async () => '', + }; + await setupApp(svc, new LlmAdapterRegistry()); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/claude/infer', + payload: {}, + }); + expect(res.statusCode).toBe(400); + }); + + it('dispatches non-streaming to the adapter and returns its JSON', async () => { + const fetchFn = vi.fn(async () => new Response(JSON.stringify({ + id: 'msg_1', model: 'claude-3-5-sonnet-20241022', role: 'assistant', + content: [{ type: 'text', text: 'hello' }], + stop_reason: 'end_turn', + usage: { input_tokens: 1, output_tokens: 1 }, + }), { status: 200 })); + const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch }); + const svc: LlmServiceLike = { + getByName: async () => makeLlmView(), + resolveApiKey: async () => 'sk-ant-xyz', + }; + const events: unknown[] = []; + await setupApp(svc, adapters, (e) => events.push(e)); + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/claude/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(200); + const body = res.json<{ choices: Array<{ message: { content: string } }> }>(); + expect(body.choices[0]!.message.content).toBe('hello'); + + // Audit event emitted + expect(events).toHaveLength(1); + expect((events[0] as { kind: string; llmName: string; status: number }).kind).toBe('llm_inference_call'); + expect((events[0] as { llmName: string }).llmName).toBe('claude'); + expect((events[0] as { streaming: boolean }).streaming).toBe(false); + expect((events[0] as { status: number }).status).toBe(200); + }); + + it('500s when apiKey resolution fails', async () => { + const adapters = new LlmAdapterRegistry(); + const svc: LlmServiceLike = { + getByName: async () => makeLlmView(), + resolveApiKey: async () => { throw new Error('secret not found'); }, + }; + await setupApp(svc, adapters); + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/claude/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(500); + expect(res.json<{ error: string }>().error).toMatch(/secret not found/); + }); + + it('skips apiKey resolution when the Llm has no apiKeyRef', async () => { + const fetchFn = vi.fn(async () => new Response(JSON.stringify({ id: 'x', choices: [] }), { status: 200 })); + const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch }); + const resolveSpy = vi.fn(); + const svc: LlmServiceLike = { + getByName: async () => makeLlmView({ type: 'ollama', url: 'http://ollama:11434', apiKeyRef: null }), + resolveApiKey: resolveSpy as unknown as LlmServiceLike['resolveApiKey'], + }; + await setupApp(svc, adapters); + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/ollama-local/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(200); + expect(resolveSpy).not.toHaveBeenCalled(); + }); + + it('streams SSE chunks for stream: 
true', async () => { + const fetchFn = vi.fn(async () => sseResponse([ + 'event: content_block_delta\ndata: {"type":"content_block_delta","delta":{"type":"text_delta","text":"hi"}}', + 'event: message_stop\ndata: {"type":"message_stop"}', + ])); + const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch }); + const svc: LlmServiceLike = { + getByName: async () => makeLlmView(), + resolveApiKey: async () => 'sk-ant-xyz', + }; + const events: Array<{ streaming: boolean; status: number }> = []; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await setupApp(svc, adapters, ((e: any) => events.push(e)) as any); + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/claude/infer', + payload: { messages: [{ role: 'user', content: 'hi' }], stream: true }, + }); + expect(res.statusCode).toBe(200); + expect(res.body).toContain('data:'); + expect(res.body).toContain('[DONE]'); + expect(events).toHaveLength(1); + expect(events[0]!.streaming).toBe(true); + }); + + it('502s on adapter errors (non-streaming)', async () => { + const fetchFn = vi.fn(async () => { throw new Error('upstream down'); }); + const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch }); + const svc: LlmServiceLike = { + getByName: async () => makeLlmView({ type: 'openai', url: 'http://example', apiKeyRef: null }), + resolveApiKey: async () => '', + }; + await setupApp(svc, adapters); + + const res = await app.inject({ + method: 'POST', + url: '/api/v1/llms/x/infer', + payload: { messages: [{ role: 'user', content: 'hi' }] }, + }); + expect(res.statusCode).toBe(502); + expect(res.json<{ error: string }>().error).toMatch(/upstream down/); + }); +}); -- 2.49.1