feat(mcpd): inference proxy — POST /api/v1/llms/:name/infer

Why: the point of the Llm resource (Phase 1) is that credentials never leave
the server. This lands the proxy: clients POST OpenAI chat/completions to
mcpd, mcpd attaches the provider API key server-side, and the response
streams back as OpenAI-format SSE.

Design:
- Wire format client-side is always OpenAI chat/completions — every existing
  SDK speaks it. Adapters translate on the provider side.
- `openai | vllm | deepseek | ollama` → pure passthrough (they already speak
  OpenAI). `anthropic` → translator to/from Anthropic Messages API
  (system-string extraction, content-block flattening, SSE event remap).
- Plain fetch; no @anthropic-ai/sdk dep. Consistent with the OpenBao driver
  shape and keeps the proxy layer thin.
- `gemini-cli` intentionally rejected — subprocess providers need extra
  lifecycle plumbing; deferred to a follow-up.
- Streaming: adapters yield `StreamingChunk`s; the route frames them as
  `data: <json>\n\n` + terminal `data: [DONE]\n\n` so any OpenAI client
  works unchanged.

RBAC:
- New URL special-case in mapUrlToPermission: `POST /api/v1/llms/:name/infer`
  → `run:llms:<name>` (not the default create:llms). Users need an explicit
  `{role: 'run', resource: 'llms', [name: X]}` binding to call infer.
- Possession of `edit:llms` does NOT imply `run` — keeps catalogue
  management separate from spend.

Audit: route emits an `llm_inference_call` event per request (llm name,
model, user/tokenSha, streaming, duration, status). main.ts wires it to the
structured logger for now; hook is in place for a richer audit sink later.

Tests:
- 11 adapter tests (passthrough POST shape + default URLs + no-auth ollama +
  SSE forwarding; anthropic translate request/response + non-2xx wrap + SSE
  event translation; registry dispatch + caching + unsupported-provider).
- 7 route tests (404, 400, non-streaming dispatch + audit, apiKey failure,
  null apiKeyRef path, streaming SSE output, 502 on adapter error).
- Full suite 1830/1830 (+18 from Phase 1's 1812). TypeScript clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Michal
2026-04-18 22:43:55 +01:00
parent 6ff90a8228
commit 23f53a0798
8 changed files with 1079 additions and 0 deletions

View File

@@ -0,0 +1,256 @@
/**
* Anthropic adapter — translates between OpenAI chat/completions format and
* the Anthropic Messages API (`POST /v1/messages`).
*
* Key differences we translate:
* - OpenAI `role: 'system'` messages become a top-level `system` string.
* - Anthropic returns `content: [{ type: 'text', text }]` — we join into
* OpenAI's `content: "…"` string.
* - Streaming: Anthropic emits a sequence of
* `message_start / content_block_{start,delta,stop} / message_delta /
* message_stop` events. We translate those to OpenAI
* `chat.completion.chunk` deltas.
*
* This adapter implements the subset needed for plain-text chat — tool-use
* translation is intentionally left out for this phase; agents that need tool
* calling should target an OpenAI-compatible provider until the translator
* covers it.
*/
import type {
LlmAdapter,
InferContext,
NonStreamingResult,
StreamingChunk,
AdapterDeps,
OpenAiMessage,
} from '../types.js';
const DEFAULT_ANTHROPIC_URL = 'https://api.anthropic.com';
const ANTHROPIC_VERSION = '2023-06-01';
interface AnthropicMessageResponse {
id: string;
model: string;
role: 'assistant';
content: Array<{ type: 'text'; text: string } | { type: string; [k: string]: unknown }>;
stop_reason?: string;
usage?: { input_tokens: number; output_tokens: number };
}
export class AnthropicAdapter implements LlmAdapter {
readonly kind = 'anthropic';
private readonly fetchImpl: typeof globalThis.fetch;
constructor(deps: AdapterDeps = {}) {
this.fetchImpl = deps.fetch ?? globalThis.fetch;
}
async infer(ctx: InferContext): Promise<NonStreamingResult> {
const url = (ctx.url !== '' ? ctx.url : DEFAULT_ANTHROPIC_URL).replace(/\/+$/, '');
const body = this.toAnthropicRequest(ctx, false);
const res = await this.fetchImpl(`${url}/v1/messages`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
if (!res.ok) {
const text = await res.text().catch(() => '');
return {
status: res.status,
body: { error: { message: `anthropic: HTTP ${String(res.status)} ${text}` } },
};
}
const anth = await res.json() as AnthropicMessageResponse;
return { status: 200, body: this.toOpenAiResponse(anth) };
}
async *stream(ctx: InferContext): AsyncGenerator<StreamingChunk> {
const url = (ctx.url !== '' ? ctx.url : DEFAULT_ANTHROPIC_URL).replace(/\/+$/, '');
const body = this.toAnthropicRequest(ctx, true);
const res = await this.fetchImpl(`${url}/v1/messages`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
if (!res.ok || res.body === null) {
const text = await res.text().catch(() => '');
throw new Error(`anthropic stream: HTTP ${String(res.status)} ${text}`);
}
const id = `chatcmpl-${cryptoNonce()}`;
const model = body.model;
const created = Math.floor(Date.now() / 1000);
// Parse Anthropic SSE. Each event is `event: <name>\ndata: <json>\n\n`.
const decoder = new TextDecoder();
let buf = '';
const reader = res.body.getReader();
let emittedFirst = false;
const baseChunk = (delta: Record<string, unknown>, finishReason?: string): string => {
const chunk = {
id,
object: 'chat.completion.chunk',
created,
model,
choices: [{
index: 0,
delta,
finish_reason: finishReason ?? null,
}],
};
return JSON.stringify(chunk);
};
try {
// eslint-disable-next-line no-constant-condition
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buf.indexOf('\n\n')) !== -1) {
const rawEvent = buf.slice(0, idx);
buf = buf.slice(idx + 2);
const parsed = parseSseEvent(rawEvent);
if (parsed === null) continue;
const { event, data } = parsed;
if (event === 'content_block_delta') {
const textDelta = (data as { delta?: { type?: string; text?: string } }).delta;
if (textDelta?.type === 'text_delta' && typeof textDelta.text === 'string') {
if (!emittedFirst) {
yield { data: baseChunk({ role: 'assistant', content: '' }) };
emittedFirst = true;
}
yield { data: baseChunk({ content: textDelta.text }) };
}
} else if (event === 'message_delta') {
const stopReason = (data as { delta?: { stop_reason?: string } }).delta?.stop_reason;
if (typeof stopReason === 'string') {
yield { data: baseChunk({}, mapStopReason(stopReason)) };
}
} else if (event === 'message_stop') {
yield { data: '[DONE]', done: true };
return;
} else if (event === 'error') {
throw new Error(`anthropic stream error: ${JSON.stringify(data)}`);
}
}
}
} finally {
reader.releaseLock();
}
// Anthropic closed without message_stop — give consumer a clean end.
yield { data: '[DONE]', done: true };
}
private headers(ctx: InferContext): Record<string, string> {
return {
'Content-Type': 'application/json',
'x-api-key': ctx.apiKey,
'anthropic-version': ANTHROPIC_VERSION,
};
}
/** Translate the OpenAI request to the Anthropic Messages shape. */
private toAnthropicRequest(ctx: InferContext, stream: boolean): {
model: string;
max_tokens: number;
messages: Array<{ role: 'user' | 'assistant'; content: string }>;
system?: string;
stream?: boolean;
temperature?: number;
top_p?: number;
stop_sequences?: string[];
} {
const { body } = ctx;
const systemParts: string[] = [];
const messages: Array<{ role: 'user' | 'assistant'; content: string }> = [];
for (const msg of body.messages) {
const text = normaliseContent(msg);
if (msg.role === 'system') {
systemParts.push(text);
} else if (msg.role === 'user' || msg.role === 'assistant') {
messages.push({ role: msg.role, content: text });
}
// `tool` role messages are dropped — tool translation is out of scope
// for this phase.
}
const out: ReturnType<typeof this.toAnthropicRequest> = {
model: body.model !== '' ? body.model : ctx.modelOverride,
max_tokens: typeof body.max_tokens === 'number' ? body.max_tokens : 1024,
messages,
};
if (systemParts.length > 0) out.system = systemParts.join('\n\n');
if (stream) out.stream = true;
if (typeof body.temperature === 'number') out.temperature = body.temperature;
if (typeof body.top_p === 'number') out.top_p = body.top_p;
if (body.stop !== undefined) {
out.stop_sequences = Array.isArray(body.stop) ? body.stop : [body.stop];
}
return out;
}
private toOpenAiResponse(anth: AnthropicMessageResponse): Record<string, unknown> {
const text = anth.content
.map((c) => (c.type === 'text' && typeof (c as { text?: unknown }).text === 'string'
? (c as { text: string }).text
: ''))
.join('');
return {
id: `chatcmpl-${anth.id}`,
object: 'chat.completion',
created: Math.floor(Date.now() / 1000),
model: anth.model,
choices: [{
index: 0,
message: { role: 'assistant', content: text },
finish_reason: mapStopReason(anth.stop_reason ?? 'end_turn'),
}],
usage: anth.usage ? {
prompt_tokens: anth.usage.input_tokens,
completion_tokens: anth.usage.output_tokens,
total_tokens: anth.usage.input_tokens + anth.usage.output_tokens,
} : undefined,
};
}
}
function normaliseContent(msg: OpenAiMessage): string {
if (typeof msg.content === 'string') return msg.content;
return msg.content
.map((part) => (typeof part.text === 'string' ? part.text : ''))
.join('');
}
function mapStopReason(r: string): string {
// Anthropic → OpenAI finish_reason
if (r === 'end_turn' || r === 'stop_sequence') return 'stop';
if (r === 'max_tokens') return 'length';
if (r === 'tool_use') return 'tool_calls';
return r;
}
function parseSseEvent(raw: string): { event: string; data: unknown } | null {
let event = '';
let dataLine = '';
for (const line of raw.split('\n')) {
if (line.startsWith('event:')) event = line.slice(6).trim();
else if (line.startsWith('data:')) dataLine += line.slice(5).trim();
}
if (dataLine === '') return null;
try {
return { event, data: JSON.parse(dataLine) as unknown };
} catch {
return null;
}
}
function cryptoNonce(): string {
// Not security-sensitive — just a short randomish id.
return Math.random().toString(36).slice(2, 10);
}