Compare commits

...

2 Commits

Author SHA1 Message Date
Michal
4d8ee23d0e feat(mcplocal): RBAC-bounded vllm-managed failover + name-based llm lookup
Why: when mcpd's inference proxy is unreachable, clients with a local
vllm-managed provider should be able to substitute — but only if they still
have view permission on the centralized Llm. Otherwise revoking an Llm
wouldn't actually stop a misbehaving client.

Infrastructure (the agent + mcplocal HTTP-mode wire-up will land separately
when those clients pivot to mcpd's proxy):

- LlmProviderFileEntry gains optional `failoverFor: <central llm name>`. The
  entry is otherwise the same local provider it always was; the new field
  just declares which central Llm it can substitute for.
- ProviderRegistry tracks a failover map (registerFailover / getFailoverFor /
  listFailovers). Unregister removes any failover entry pointing at the
  removed provider so we don't end up with dangling references.
- New FailoverRouter wraps a primary inference call. On primary failure: if
  a local provider is registered for the Llm, HEAD-probe `mcpd /api/v1/llms/
  :name` with the caller's bearer to verify view permission, then either
  invoke the local provider (allowed) or re-throw the primary error (403,
  401, network unreachable, anything else — all fail-closed).
- Server: GET /api/v1/llms/:idOrName accepts both CUID and human name. Lets
  FailoverRouter probe by name without a separate id-resolution call. HEAD
  derives automatically from GET in Fastify, which runs the same RBAC hook
  and drops the body — exactly what the probe needs.

Tests: 11 failover unit tests (registry map, decision flow, fail-closed for
forbidden + unreachable, checkAuth status mapping) + 4 new route tests
(name lookup, HEAD existing/missing). Full suite 1844/1844 (+14 from Phase
2's 1830). TypeScript clean across mcpd + mcplocal.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 13:05:43 +01:00
Michal
23f53a0798 feat(mcpd): inference proxy — POST /api/v1/llms/:name/infer
Why: the point of the Llm resource (Phase 1) is that credentials never leave
the server. This lands the proxy: clients POST OpenAI chat/completions to
mcpd, mcpd attaches the provider API key server-side, and the response
streams back as OpenAI-format SSE.

Design:
- Wire format client-side is always OpenAI chat/completions — every existing
  SDK speaks it. Adapters translate on the provider side.
- `openai | vllm | deepseek | ollama` → pure passthrough (they already speak
  OpenAI). `anthropic` → translator to/from Anthropic Messages API
  (system-string extraction, content-block flattening, SSE event remap).
- Plain fetch; no @anthropic-ai/sdk dep. Consistent with the OpenBao driver
  shape and keeps the proxy layer thin.
- `gemini-cli` intentionally rejected — subprocess providers need extra
  lifecycle plumbing; deferred to a follow-up.
- Streaming: adapters yield `StreamingChunk`s; the route frames them as
  `data: <json>\n\n` + terminal `data: [DONE]\n\n` so any OpenAI client
  works unchanged.

RBAC:
- New URL special-case in mapUrlToPermission: `POST /api/v1/llms/:name/infer`
  → `run:llms:<name>` (not the default create:llms). Users need an explicit
  `{role: 'run', resource: 'llms', [name: X]}` binding to call infer.
- Possession of `edit:llms` does NOT imply `run` — keeps catalogue
  management separate from spend.

Audit: route emits an `llm_inference_call` event per request (llm name,
model, user/tokenSha, streaming, duration, status). main.ts wires it to the
structured logger for now; hook is in place for a richer audit sink later.

Tests:
- 11 adapter tests (passthrough POST shape + default URLs + no-auth ollama +
  SSE forwarding; anthropic translate request/response + non-2xx wrap + SSE
  event translation; registry dispatch + caching + unsupported-provider).
- 7 route tests (404, 400, non-streaming dispatch + audit, apiKey failure,
  null apiKeyRef path, streaming SSE output, 502 on adapter error).
- Full suite 1830/1830 (+18 from Phase 1's 1812). TypeScript clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 22:43:55 +01:00
15 changed files with 1434 additions and 1 deletions

View File

@@ -28,7 +28,9 @@ import { registerSecretBackendRoutes } from './routes/secret-backends.js';
import { registerSecretMigrateRoutes } from './routes/secret-migrate.js';
import { LlmRepository } from './repositories/llm.repository.js';
import { LlmService } from './services/llm.service.js';
import { LlmAdapterRegistry } from './services/llm/dispatcher.js';
import { registerLlmRoutes } from './routes/llms.js';
import { registerLlmInferRoutes } from './routes/llm-infer.js';
import { PromptRepository } from './repositories/prompt.repository.js';
import { PromptRequestRepository } from './repositories/prompt-request.repository.js';
import { bootstrapSystemProject } from './bootstrap/system-project.js';
@@ -105,6 +107,12 @@ function mapUrlToPermission(method: string, url: string): PermissionCheck {
// /api/v1/secrets/migrate is a bulk cross-backend operation — treat as op, not a plain secret write.
if (url.startsWith('/api/v1/secrets/migrate')) return { kind: 'operation', operation: 'migrate-secrets' };
// /api/v1/llms/:name/infer → `run:llms:<name>` (not the default create:llms).
const inferMatch = url.match(/^\/api\/v1\/llms\/([^/?]+)\/infer/);
if (inferMatch?.[1]) {
return { kind: 'resource', resource: 'llms', action: 'run', resourceName: inferMatch[1] };
}
const resourceMap: Record<string, string | undefined> = {
'servers': 'servers',
'instances': 'instances',
@@ -334,6 +342,7 @@ async function main(): Promise<void> {
const secretService = new SecretService(secretRepo, secretBackendService);
const secretMigrateService = new SecretMigrateService(secretRepo, secretBackendService);
const llmService = new LlmService(llmRepo, secretService);
const llmAdapters = new LlmAdapterRegistry();
const instanceService = new InstanceService(instanceRepo, serverRepo, orchestrator, secretService);
serverService.setInstanceService(instanceService);
const projectService = new ProjectService(projectRepo, serverRepo);
@@ -475,6 +484,23 @@ async function main(): Promise<void> {
registerSecretBackendRoutes(app, secretBackendService);
registerSecretMigrateRoutes(app, secretMigrateService);
registerLlmRoutes(app, llmService);
registerLlmInferRoutes(app, {
llmService,
adapters: llmAdapters,
onInferenceEvent: (event) => {
app.log.info({
event: 'llm_inference_call',
llm: event.llmName,
model: event.model,
type: event.type,
userId: event.userId,
tokenSha: event.tokenSha,
streaming: event.streaming,
durationMs: event.durationMs,
status: event.status,
});
},
});
registerInstanceRoutes(app, instanceService);
registerProjectRoutes(app, projectService);
registerAuditLogRoutes(app, auditLogService);

View File

@@ -0,0 +1,145 @@
/**
* POST /api/v1/llms/:name/infer
*
* OpenAI-compatible chat completions endpoint. The RBAC check runs in the
* global hook — this URL maps to `run:llms:<name>`, not the default
* `create:llms`. See `main.ts:mapUrlToPermission`.
*
* Non-streaming: resolves the Llm, dispatches to the right provider adapter,
* returns the OpenAI chat.completion JSON.
*
* Streaming (`stream: true`): pipes adapter-emitted chunks back as
* `text/event-stream`. Adapters translate provider-native SSE into OpenAI
* `chat.completion.chunk`s so clients can use any OpenAI SDK unchanged.
*/
import type { FastifyInstance, FastifyReply } from 'fastify';
import type { LlmService } from '../services/llm.service.js';
import type { LlmAdapterRegistry } from '../services/llm/dispatcher.js';
import { NotFoundError } from '../services/mcp-server.service.js';
import type { OpenAiChatRequest, InferContext } from '../services/llm/types.js';
export interface LlmInferDeps {
llmService: LlmService;
adapters: LlmAdapterRegistry;
/** Optional hook to emit audit events — consumer may ignore. */
onInferenceEvent?: (event: InferenceAuditEvent) => void;
}
export interface InferenceAuditEvent {
kind: 'llm_inference_call';
llmName: string;
model: string;
type: string;
userId?: string | undefined;
tokenSha?: string | undefined;
streaming: boolean;
durationMs: number;
status: number;
}
export function registerLlmInferRoutes(
app: FastifyInstance,
deps: LlmInferDeps,
): void {
app.post<{ Params: { name: string }; Body: OpenAiChatRequest }>(
'/api/v1/llms/:name/infer',
async (request, reply) => {
const started = Date.now();
let llm;
try {
llm = await deps.llmService.getByName(request.params.name);
} catch (err) {
if (err instanceof NotFoundError) {
reply.code(404);
return { error: err.message };
}
throw err;
}
const body = (request.body ?? {}) as OpenAiChatRequest;
if (!body.messages || body.messages.length === 0) {
reply.code(400);
return { error: 'messages is required' };
}
// Resolve API key (may be empty string for providers that don't take one).
let apiKey = '';
if (llm.apiKeyRef !== null) {
try {
apiKey = await deps.llmService.resolveApiKey(llm.name);
} catch (err) {
reply.code(500);
return { error: `Failed to resolve API key: ${err instanceof Error ? err.message : String(err)}` };
}
}
const ctx: InferContext = {
body,
modelOverride: llm.model,
apiKey,
url: llm.url,
extraConfig: llm.extraConfig,
};
const adapter = deps.adapters.get(llm.type);
const streaming = body.stream === true;
const audit = (status: number): void => {
if (deps.onInferenceEvent === undefined) return;
deps.onInferenceEvent({
kind: 'llm_inference_call',
llmName: llm.name,
model: llm.model,
type: llm.type,
userId: request.userId,
tokenSha: request.mcpToken?.tokenSha,
streaming,
durationMs: Date.now() - started,
status,
});
};
if (!streaming) {
try {
const result = await adapter.infer(ctx);
reply.code(result.status);
audit(result.status);
return result.body;
} catch (err) {
audit(502);
reply.code(502);
return { error: err instanceof Error ? err.message : String(err) };
}
}
// Streaming path — set SSE headers and pipe chunks.
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
});
try {
for await (const chunk of adapter.stream(ctx)) {
writeSseChunk(reply, chunk.data);
if (chunk.done === true) break;
}
audit(200);
} catch (err) {
const payload = JSON.stringify({
error: { message: err instanceof Error ? err.message : String(err) },
});
writeSseChunk(reply, payload);
writeSseChunk(reply, '[DONE]');
audit(502);
} finally {
reply.raw.end();
}
return reply;
},
);
}
function writeSseChunk(reply: FastifyReply, data: string): void {
reply.raw.write(`data: ${data}\n\n`);
}

View File

@@ -10,9 +10,12 @@ export function registerLlmRoutes(
return service.list();
});
// Accepts either CUID or human name. Used both by the CLI (which usually
// resolves to CUID first) and by FailoverRouter's RBAC pre-check (which
// hands over the user-facing name to avoid an extra round-trip).
app.get<{ Params: { id: string } }>('/api/v1/llms/:id', async (request, reply) => {
try {
return await service.getById(request.params.id);
return await getByIdOrName(service, request.params.id);
} catch (err) {
if (err instanceof NotFoundError) {
reply.code(404);
@@ -22,6 +25,10 @@ export function registerLlmRoutes(
}
});
// No explicit HEAD handler: Fastify auto-derives HEAD from GET, which runs
// the same RBAC hook + lookup and drops the body. That's exactly what
// FailoverRouter wants for its "can the caller still view this Llm?" probe.
app.post('/api/v1/llms', async (request, reply) => {
try {
const row = await service.create(request.body);
@@ -62,3 +69,17 @@ export function registerLlmRoutes(
}
});
}
const CUID_RE = /^c[a-z0-9]{24}/i;
/**
* Look up by CUID first; if the input doesn't look like one, fall back to
* findByName. Lets the same URL serve both `mcpctl describe llm <name>` and
* the FailoverRouter's name-based RBAC check.
*/
async function getByIdOrName(service: LlmService, idOrName: string) {
if (CUID_RE.test(idOrName)) {
return service.getById(idOrName);
}
return service.getByName(idOrName);
}

View File

@@ -0,0 +1,256 @@
/**
* Anthropic adapter — translates between OpenAI chat/completions format and
* the Anthropic Messages API (`POST /v1/messages`).
*
* Key differences we translate:
* - OpenAI `role: 'system'` messages become a top-level `system` string.
* - Anthropic returns `content: [{ type: 'text', text }]` — we join into
* OpenAI's `content: "…"` string.
* - Streaming: Anthropic emits a sequence of
* `message_start / content_block_{start,delta,stop} / message_delta /
* message_stop` events. We translate those to OpenAI
* `chat.completion.chunk` deltas.
*
* This adapter implements the subset needed for plain-text chat — tool-use
* translation is intentionally left out for this phase; agents that need tool
* calling should target an OpenAI-compatible provider until the translator
* covers it.
*/
import type {
LlmAdapter,
InferContext,
NonStreamingResult,
StreamingChunk,
AdapterDeps,
OpenAiMessage,
} from '../types.js';
const DEFAULT_ANTHROPIC_URL = 'https://api.anthropic.com';
const ANTHROPIC_VERSION = '2023-06-01';
interface AnthropicMessageResponse {
id: string;
model: string;
role: 'assistant';
content: Array<{ type: 'text'; text: string } | { type: string; [k: string]: unknown }>;
stop_reason?: string;
usage?: { input_tokens: number; output_tokens: number };
}
export class AnthropicAdapter implements LlmAdapter {
readonly kind = 'anthropic';
private readonly fetchImpl: typeof globalThis.fetch;
constructor(deps: AdapterDeps = {}) {
this.fetchImpl = deps.fetch ?? globalThis.fetch;
}
async infer(ctx: InferContext): Promise<NonStreamingResult> {
const url = (ctx.url !== '' ? ctx.url : DEFAULT_ANTHROPIC_URL).replace(/\/+$/, '');
const body = this.toAnthropicRequest(ctx, false);
const res = await this.fetchImpl(`${url}/v1/messages`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
if (!res.ok) {
const text = await res.text().catch(() => '');
return {
status: res.status,
body: { error: { message: `anthropic: HTTP ${String(res.status)} ${text}` } },
};
}
const anth = await res.json() as AnthropicMessageResponse;
return { status: 200, body: this.toOpenAiResponse(anth) };
}
async *stream(ctx: InferContext): AsyncGenerator<StreamingChunk> {
const url = (ctx.url !== '' ? ctx.url : DEFAULT_ANTHROPIC_URL).replace(/\/+$/, '');
const body = this.toAnthropicRequest(ctx, true);
const res = await this.fetchImpl(`${url}/v1/messages`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
if (!res.ok || res.body === null) {
const text = await res.text().catch(() => '');
throw new Error(`anthropic stream: HTTP ${String(res.status)} ${text}`);
}
const id = `chatcmpl-${cryptoNonce()}`;
const model = body.model;
const created = Math.floor(Date.now() / 1000);
// Parse Anthropic SSE. Each event is `event: <name>\ndata: <json>\n\n`.
const decoder = new TextDecoder();
let buf = '';
const reader = res.body.getReader();
let emittedFirst = false;
const baseChunk = (delta: Record<string, unknown>, finishReason?: string): string => {
const chunk = {
id,
object: 'chat.completion.chunk',
created,
model,
choices: [{
index: 0,
delta,
finish_reason: finishReason ?? null,
}],
};
return JSON.stringify(chunk);
};
try {
// eslint-disable-next-line no-constant-condition
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buf.indexOf('\n\n')) !== -1) {
const rawEvent = buf.slice(0, idx);
buf = buf.slice(idx + 2);
const parsed = parseSseEvent(rawEvent);
if (parsed === null) continue;
const { event, data } = parsed;
if (event === 'content_block_delta') {
const textDelta = (data as { delta?: { type?: string; text?: string } }).delta;
if (textDelta?.type === 'text_delta' && typeof textDelta.text === 'string') {
if (!emittedFirst) {
yield { data: baseChunk({ role: 'assistant', content: '' }) };
emittedFirst = true;
}
yield { data: baseChunk({ content: textDelta.text }) };
}
} else if (event === 'message_delta') {
const stopReason = (data as { delta?: { stop_reason?: string } }).delta?.stop_reason;
if (typeof stopReason === 'string') {
yield { data: baseChunk({}, mapStopReason(stopReason)) };
}
} else if (event === 'message_stop') {
yield { data: '[DONE]', done: true };
return;
} else if (event === 'error') {
throw new Error(`anthropic stream error: ${JSON.stringify(data)}`);
}
}
}
} finally {
reader.releaseLock();
}
// Anthropic closed without message_stop — give consumer a clean end.
yield { data: '[DONE]', done: true };
}
private headers(ctx: InferContext): Record<string, string> {
return {
'Content-Type': 'application/json',
'x-api-key': ctx.apiKey,
'anthropic-version': ANTHROPIC_VERSION,
};
}
/** Translate the OpenAI request to the Anthropic Messages shape. */
private toAnthropicRequest(ctx: InferContext, stream: boolean): {
model: string;
max_tokens: number;
messages: Array<{ role: 'user' | 'assistant'; content: string }>;
system?: string;
stream?: boolean;
temperature?: number;
top_p?: number;
stop_sequences?: string[];
} {
const { body } = ctx;
const systemParts: string[] = [];
const messages: Array<{ role: 'user' | 'assistant'; content: string }> = [];
for (const msg of body.messages) {
const text = normaliseContent(msg);
if (msg.role === 'system') {
systemParts.push(text);
} else if (msg.role === 'user' || msg.role === 'assistant') {
messages.push({ role: msg.role, content: text });
}
// `tool` role messages are dropped — tool translation is out of scope
// for this phase.
}
const out: ReturnType<typeof this.toAnthropicRequest> = {
model: body.model !== '' ? body.model : ctx.modelOverride,
max_tokens: typeof body.max_tokens === 'number' ? body.max_tokens : 1024,
messages,
};
if (systemParts.length > 0) out.system = systemParts.join('\n\n');
if (stream) out.stream = true;
if (typeof body.temperature === 'number') out.temperature = body.temperature;
if (typeof body.top_p === 'number') out.top_p = body.top_p;
if (body.stop !== undefined) {
out.stop_sequences = Array.isArray(body.stop) ? body.stop : [body.stop];
}
return out;
}
private toOpenAiResponse(anth: AnthropicMessageResponse): Record<string, unknown> {
const text = anth.content
.map((c) => (c.type === 'text' && typeof (c as { text?: unknown }).text === 'string'
? (c as { text: string }).text
: ''))
.join('');
return {
id: `chatcmpl-${anth.id}`,
object: 'chat.completion',
created: Math.floor(Date.now() / 1000),
model: anth.model,
choices: [{
index: 0,
message: { role: 'assistant', content: text },
finish_reason: mapStopReason(anth.stop_reason ?? 'end_turn'),
}],
usage: anth.usage ? {
prompt_tokens: anth.usage.input_tokens,
completion_tokens: anth.usage.output_tokens,
total_tokens: anth.usage.input_tokens + anth.usage.output_tokens,
} : undefined,
};
}
}
function normaliseContent(msg: OpenAiMessage): string {
if (typeof msg.content === 'string') return msg.content;
return msg.content
.map((part) => (typeof part.text === 'string' ? part.text : ''))
.join('');
}
function mapStopReason(r: string): string {
// Anthropic → OpenAI finish_reason
if (r === 'end_turn' || r === 'stop_sequence') return 'stop';
if (r === 'max_tokens') return 'length';
if (r === 'tool_use') return 'tool_calls';
return r;
}
function parseSseEvent(raw: string): { event: string; data: unknown } | null {
let event = '';
let dataLine = '';
for (const line of raw.split('\n')) {
if (line.startsWith('event:')) event = line.slice(6).trim();
else if (line.startsWith('data:')) dataLine += line.slice(5).trim();
}
if (dataLine === '') return null;
try {
return { event, data: JSON.parse(dataLine) as unknown };
} catch {
return null;
}
}
function cryptoNonce(): string {
// Not security-sensitive — just a short randomish id.
return Math.random().toString(36).slice(2, 10);
}

View File

@@ -0,0 +1,112 @@
/**
* OpenAI-passthrough adapter.
*
* Covers any provider that already speaks OpenAI chat/completions on the
* wire: `openai`, `vllm`, `deepseek`, `ollama` (with their openai-compatible
* endpoint enabled). The adapter forwards the request body verbatim and
* streams the response straight through — no wire translation.
*
* Defaults when `url` is empty:
* - openai → https://api.openai.com
* - deepseek → https://api.deepseek.com
* - vllm/ollama → must be configured; these have no canonical public URL.
*/
import type { LlmAdapter, InferContext, NonStreamingResult, StreamingChunk, AdapterDeps } from '../types.js';
const DEFAULT_URLS: Record<string, string> = {
openai: 'https://api.openai.com',
deepseek: 'https://api.deepseek.com',
};
export class OpenAiPassthroughAdapter implements LlmAdapter {
readonly kind: string;
private readonly fetchImpl: typeof globalThis.fetch;
constructor(kind: 'openai' | 'vllm' | 'deepseek' | 'ollama', deps: AdapterDeps = {}) {
this.kind = kind;
this.fetchImpl = deps.fetch ?? globalThis.fetch;
}
async infer(ctx: InferContext): Promise<NonStreamingResult> {
const url = this.endpointUrl(ctx.url);
const body = this.prepareBody(ctx, false);
const res = await this.fetchImpl(`${url}/v1/chat/completions`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
const json = await res.json() as unknown;
return { status: res.status, body: json };
}
async *stream(ctx: InferContext): AsyncGenerator<StreamingChunk> {
const url = this.endpointUrl(ctx.url);
const body = this.prepareBody(ctx, true);
const res = await this.fetchImpl(`${url}/v1/chat/completions`, {
method: 'POST',
headers: this.headers(ctx),
body: JSON.stringify(body),
});
if (!res.ok || res.body === null) {
const text = await res.text().catch(() => '');
throw new Error(`${this.kind} stream: HTTP ${String(res.status)} ${text}`);
}
// Re-frame the provider's SSE stream into our `StreamingChunk` shape.
// OpenAI-compat providers already emit `data: {...}` + `data: [DONE]` —
// we just unwrap the `data: ` prefix, forward payloads, and emit a
// single terminal `done` chunk so the consumer always gets one.
const decoder = new TextDecoder();
let buf = '';
const reader = res.body.getReader();
try {
// eslint-disable-next-line no-constant-condition
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buf.indexOf('\n\n')) !== -1) {
const event = buf.slice(0, idx);
buf = buf.slice(idx + 2);
for (const line of event.split('\n')) {
if (!line.startsWith('data:')) continue;
const payload = line.slice(5).trim();
if (payload === '') continue;
if (payload === '[DONE]') {
yield { data: '[DONE]', done: true };
return;
}
yield { data: payload };
}
}
}
} finally {
reader.releaseLock();
}
// Provider closed without emitting [DONE] — give the consumer a clean end.
yield { data: '[DONE]', done: true };
}
private endpointUrl(url: string): string {
if (url !== '') return url.replace(/\/+$/, '');
const def = DEFAULT_URLS[this.kind];
if (def === undefined) {
throw new Error(`${this.kind}: url is required (no default endpoint for this provider)`);
}
return def;
}
private headers(ctx: InferContext): Record<string, string> {
const headers: Record<string, string> = { 'Content-Type': 'application/json' };
if (ctx.apiKey !== '') headers['Authorization'] = `Bearer ${ctx.apiKey}`;
return headers;
}
private prepareBody(ctx: InferContext, stream: boolean): Record<string, unknown> {
const out: Record<string, unknown> = { ...ctx.body };
if (out.model === undefined || out.model === '') out.model = ctx.modelOverride;
out.stream = stream;
return out;
}
}

View File

@@ -0,0 +1,52 @@
/**
* Adapter dispatcher for the inference proxy.
*
* `getAdapter(type)` returns the right adapter instance for an Llm's `type`
* column. Adapters are cached per-type — they carry no per-request state.
* The caller (the infer route) supplies the resolved API key + request body
* through `InferContext`, so a single adapter instance serves every Llm of
* that type.
*/
import type { LlmAdapter, AdapterDeps } from './types.js';
import { OpenAiPassthroughAdapter } from './adapters/openai-passthrough.js';
import { AnthropicAdapter } from './adapters/anthropic.js';
export class UnsupportedProviderError extends Error {
constructor(type: string) {
super(`Unsupported LLM provider: ${type}`);
this.name = 'UnsupportedProviderError';
}
}
export class LlmAdapterRegistry {
private readonly cache = new Map<string, LlmAdapter>();
constructor(private readonly deps: AdapterDeps = {}) {}
get(type: string): LlmAdapter {
const cached = this.cache.get(type);
if (cached !== undefined) return cached;
const adapter = this.build(type);
this.cache.set(type, adapter);
return adapter;
}
private build(type: string): LlmAdapter {
switch (type) {
case 'openai':
case 'vllm':
case 'deepseek':
case 'ollama':
return new OpenAiPassthroughAdapter(type, this.deps);
case 'anthropic':
return new AnthropicAdapter(this.deps);
case 'gemini-cli':
// Intentionally deferred — gemini-cli requires the binary on the mcpd
// pod filesystem and subprocess lifecycle management. Flagged as
// homelab-only in the plan; not landing in this phase.
throw new UnsupportedProviderError(`${type} (subprocess providers are not supported in the proxy yet)`);
default:
throw new UnsupportedProviderError(type);
}
}
}

View File

@@ -0,0 +1,70 @@
/**
* Shared types for the LLM inference proxy.
*
* The wire format on the mcpctl side is OpenAI's chat/completions v1 — it's
* the de-facto lingua franca and every client library already speaks it.
* Provider-specific adapters translate to/from that shape.
*/
export interface OpenAiMessage {
role: 'system' | 'user' | 'assistant' | 'tool';
content: string | Array<{ type: string; text?: string; [k: string]: unknown }>;
name?: string;
tool_call_id?: string;
tool_calls?: Array<{ id: string; type: 'function'; function: { name: string; arguments: string } }>;
}
export interface OpenAiChatRequest {
model: string;
messages: OpenAiMessage[];
stream?: boolean;
temperature?: number;
max_tokens?: number;
top_p?: number;
stop?: string | string[];
tools?: Array<{ type: 'function'; function: { name: string; description?: string; parameters?: Record<string, unknown> } }>;
tool_choice?: unknown;
// Passthrough: unknown extras forwarded as-is.
[k: string]: unknown;
}
export interface InferContext {
/** Normalised OpenAI-format body. Adapters read/transform from here. */
body: OpenAiChatRequest;
/** The Llm row's `model` field, used when the request body has an empty model. */
modelOverride: string;
/** The resolved API key, or empty string for providers that don't take one. */
apiKey: string;
/** Target URL from the Llm row (may be empty for provider-default). */
url: string;
/** Arbitrary config from the Llm row (e.g. vllm gpu settings). */
extraConfig: Record<string, unknown>;
}
export interface NonStreamingResult {
status: number;
/** OpenAI chat.completion response body. */
body: unknown;
}
export interface StreamingChunk {
/** Raw SSE data payload. Consumer emits `data: <payload>\n\n`. */
data: string;
/** Mark the end of stream — consumer emits `data: [DONE]\n\n`. */
done?: boolean;
}
export interface LlmAdapter {
readonly kind: string;
/** Non-streaming request. Returns the final chat.completion body. */
infer(ctx: InferContext): Promise<NonStreamingResult>;
/**
* Streaming request. Yields OpenAI-format SSE chunks. Adapters translate
* provider-native stream formats into OpenAI `chat.completion.chunk`s.
*/
stream(ctx: InferContext): AsyncGenerator<StreamingChunk>;
}
export interface AdapterDeps {
fetch?: typeof globalThis.fetch;
}

View File

@@ -0,0 +1,210 @@
import { describe, it, expect, vi } from 'vitest';
import { OpenAiPassthroughAdapter } from '../src/services/llm/adapters/openai-passthrough.js';
import { AnthropicAdapter } from '../src/services/llm/adapters/anthropic.js';
import { LlmAdapterRegistry, UnsupportedProviderError } from '../src/services/llm/dispatcher.js';
import type { InferContext } from '../src/services/llm/types.js';
function mockFetch(responses: Array<{ match: RegExp; status: number; body?: unknown; text?: string }>): ReturnType<typeof vi.fn> {
return vi.fn(async (input: string | URL, _init?: RequestInit) => {
const url = String(input);
const match = responses.find((r) => r.match.test(url));
if (!match) throw new Error(`unexpected fetch: ${url}`);
const body = match.body !== undefined ? JSON.stringify(match.body) : (match.text ?? '');
return new Response(body, { status: match.status, headers: { 'Content-Type': 'application/json' } });
});
}
function makeCtx(overrides: Partial<InferContext> = {}): InferContext {
return {
body: { model: '', messages: [{ role: 'user', content: 'hello' }] },
modelOverride: 'default-model',
apiKey: 'test-key',
url: '',
extraConfig: {},
...overrides,
};
}
// Helper to build a streaming Response from SSE lines.
function sseResponse(events: string[]): Response {
const body = events.join('\n\n') + '\n\n';
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new TextEncoder().encode(body));
controller.close();
},
});
return new Response(stream, { status: 200, headers: { 'Content-Type': 'text/event-stream' } });
}
describe('OpenAiPassthroughAdapter', () => {
it('infer: POSTs to <url>/v1/chat/completions with Authorization + body', async () => {
const fetchFn = mockFetch([{
match: /\/v1\/chat\/completions$/,
status: 200,
body: { id: 'x', choices: [{ message: { role: 'assistant', content: 'hi' } }] },
}]);
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
const ctx = makeCtx({ url: 'https://api.example.com' });
const res = await adapter.infer(ctx);
expect(res.status).toBe(200);
const [url, init] = fetchFn.mock.calls[0] as [string, RequestInit];
expect(url).toBe('https://api.example.com/v1/chat/completions');
expect(init.method).toBe('POST');
const headers = init.headers as Record<string, string>;
expect(headers['Authorization']).toBe('Bearer test-key');
const sent = JSON.parse(init.body as string) as { model: string; stream: boolean };
expect(sent.model).toBe('default-model'); // filled from modelOverride
expect(sent.stream).toBe(false);
});
it('infer: uses default URL for openai when url is empty', async () => {
const fetchFn = mockFetch([{ match: /api\.openai\.com/, status: 200, body: {} }]);
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
await adapter.infer(makeCtx());
const [url] = fetchFn.mock.calls[0] as [string, RequestInit];
expect(url).toBe('https://api.openai.com/v1/chat/completions');
});
it('infer: throws for vllm when url is empty (no default)', async () => {
const adapter = new OpenAiPassthroughAdapter('vllm', { fetch: vi.fn() as unknown as typeof fetch });
await expect(adapter.infer(makeCtx())).rejects.toThrow(/no default endpoint/);
});
it('infer: omits Authorization when apiKey is empty', async () => {
const fetchFn = mockFetch([{ match: /ollama/, status: 200, body: {} }]);
const adapter = new OpenAiPassthroughAdapter('ollama', { fetch: fetchFn as unknown as typeof fetch });
await adapter.infer(makeCtx({ url: 'http://ollama:11434', apiKey: '' }));
const [, init] = fetchFn.mock.calls[0] as [string, RequestInit];
const headers = init.headers as Record<string, string>;
expect(headers['Authorization']).toBeUndefined();
});
it('stream: forwards SSE chunks and emits terminal [DONE]', async () => {
const fetchFn = vi.fn(async () => sseResponse([
'data: {"choices":[{"delta":{"content":"hi"}}]}',
'data: {"choices":[{"delta":{"content":"!"}}]}',
'data: [DONE]',
]));
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
const ctx = makeCtx({ url: 'http://example', body: { model: '', messages: [], stream: true } });
const chunks: { data: string; done?: boolean }[] = [];
for await (const c of adapter.stream(ctx)) chunks.push(c);
expect(chunks).toHaveLength(3);
expect(chunks[2]?.done).toBe(true);
});
});
describe('AnthropicAdapter', () => {
it('infer: translates system+user messages, posts to /v1/messages', async () => {
const fetchFn = mockFetch([{
match: /\/v1\/messages$/,
status: 200,
body: {
id: 'msg_01', model: 'claude-3-5-sonnet-20241022', role: 'assistant',
content: [{ type: 'text', text: 'howdy' }],
stop_reason: 'end_turn',
usage: { input_tokens: 5, output_tokens: 2 },
},
}]);
const adapter = new AnthropicAdapter({ fetch: fetchFn as unknown as typeof fetch });
const ctx = makeCtx({
body: {
model: '', messages: [
{ role: 'system', content: 'be nice' },
{ role: 'user', content: 'hi' },
],
},
modelOverride: 'claude-3-5-sonnet-20241022',
});
const res = await adapter.infer(ctx);
expect(res.status).toBe(200);
const [url, init] = fetchFn.mock.calls[0] as [string, RequestInit];
expect(url).toBe('https://api.anthropic.com/v1/messages');
const headers = init.headers as Record<string, string>;
expect(headers['x-api-key']).toBe('test-key');
expect(headers['anthropic-version']).toBeDefined();
const sent = JSON.parse(init.body as string) as {
model: string; system: string; messages: Array<{ role: string; content: string }>; max_tokens: number;
};
expect(sent.model).toBe('claude-3-5-sonnet-20241022');
expect(sent.system).toBe('be nice');
expect(sent.messages).toEqual([{ role: 'user', content: 'hi' }]);
expect(sent.max_tokens).toBe(1024); // default
// Response shape: OpenAI chat.completion
const body = res.body as { choices: Array<{ message: { content: string }; finish_reason: string }>; usage: { total_tokens: number } };
expect(body.choices[0]!.message.content).toBe('howdy');
expect(body.choices[0]!.finish_reason).toBe('stop');
expect(body.usage.total_tokens).toBe(7);
});
it('infer: returns a synthetic error body on non-2xx', async () => {
const fetchFn = vi.fn(async () => new Response('boom', { status: 500 }));
const adapter = new AnthropicAdapter({ fetch: fetchFn as unknown as typeof fetch });
const res = await adapter.infer(makeCtx({ body: { model: '', messages: [{ role: 'user', content: 'x' }] } }));
expect(res.status).toBe(500);
const body = res.body as { error: { message: string } };
expect(body.error.message).toMatch(/HTTP 500/);
});
it('stream: translates anthropic event stream into OpenAI chunks', async () => {
const events = [
'event: message_start\ndata: {"type":"message_start","message":{"id":"m","content":[]}}',
'event: content_block_delta\ndata: {"type":"content_block_delta","delta":{"type":"text_delta","text":"he"}}',
'event: content_block_delta\ndata: {"type":"content_block_delta","delta":{"type":"text_delta","text":"llo"}}',
'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"end_turn"}}',
'event: message_stop\ndata: {"type":"message_stop"}',
];
const fetchFn = vi.fn(async () => sseResponse(events));
const adapter = new AnthropicAdapter({ fetch: fetchFn as unknown as typeof fetch });
const ctx = makeCtx({ body: { model: '', messages: [{ role: 'user', content: 'hi' }], stream: true } });
const chunks: { data: string; done?: boolean }[] = [];
for await (const c of adapter.stream(ctx)) chunks.push(c);
// Expect: role-prime, two text deltas, finish-reason, [DONE]
expect(chunks[chunks.length - 1]?.data).toBe('[DONE]');
expect(chunks[chunks.length - 1]?.done).toBe(true);
// First chunk is the role-prime (role: assistant, content: '').
const first = JSON.parse(chunks[0]!.data) as { choices: [{ delta: { role: string; content: string } }] };
expect(first.choices[0]!.delta.role).toBe('assistant');
// Next two chunks carry the text.
const d1 = JSON.parse(chunks[1]!.data) as { choices: [{ delta: { content: string } }] };
const d2 = JSON.parse(chunks[2]!.data) as { choices: [{ delta: { content: string } }] };
expect(d1.choices[0]!.delta.content).toBe('he');
expect(d2.choices[0]!.delta.content).toBe('llo');
// Finish-reason chunk.
const stopped = JSON.parse(chunks[3]!.data) as { choices: [{ finish_reason: string }] };
expect(stopped.choices[0]!.finish_reason).toBe('stop');
});
});
describe('LlmAdapterRegistry', () => {
it('returns the right adapter kind for each type', () => {
const reg = new LlmAdapterRegistry();
expect(reg.get('openai').kind).toBe('openai');
expect(reg.get('vllm').kind).toBe('vllm');
expect(reg.get('deepseek').kind).toBe('deepseek');
expect(reg.get('ollama').kind).toBe('ollama');
expect(reg.get('anthropic').kind).toBe('anthropic');
});
it('caches adapters between calls', () => {
const reg = new LlmAdapterRegistry();
const a = reg.get('openai');
const b = reg.get('openai');
expect(a).toBe(b);
});
it('rejects unsupported providers (gemini-cli is deferred)', () => {
const reg = new LlmAdapterRegistry();
expect(() => reg.get('gemini-cli')).toThrow(UnsupportedProviderError);
expect(() => reg.get('bogus')).toThrow(UnsupportedProviderError);
});
});

View File

@@ -0,0 +1,208 @@
import { describe, it, expect, vi, afterEach } from 'vitest';
import Fastify from 'fastify';
import type { FastifyInstance } from 'fastify';
import { registerLlmInferRoutes } from '../src/routes/llm-infer.js';
import { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
import { errorHandler } from '../src/middleware/error-handler.js';
import type { LlmView } from '../src/services/llm.service.js';
import { NotFoundError } from '../src/services/mcp-server.service.js';
let app: FastifyInstance;
function makeLlmView(overrides: Partial<LlmView> = {}): LlmView {
return {
id: 'llm-1',
name: 'claude',
type: 'anthropic',
model: 'claude-3-5-sonnet-20241022',
url: '',
tier: 'heavy',
description: '',
apiKeyRef: { name: 'anthropic-key', key: 'token' },
extraConfig: {},
version: 1,
createdAt: new Date(),
updatedAt: new Date(),
...overrides,
};
}
afterEach(async () => {
if (app) await app.close();
});
function sseResponse(events: string[]): Response {
const body = events.join('\n\n') + '\n\n';
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new TextEncoder().encode(body));
controller.close();
},
});
return new Response(stream, { status: 200 });
}
interface LlmServiceLike {
getByName: (name: string) => Promise<LlmView>;
resolveApiKey: (name: string) => Promise<string>;
}
async function setupApp(
llmService: LlmServiceLike,
adapters: LlmAdapterRegistry,
onInferenceEvent?: Parameters<typeof registerLlmInferRoutes>[1]['onInferenceEvent'],
): Promise<FastifyInstance> {
app = Fastify({ logger: false });
app.setErrorHandler(errorHandler);
const deps: Parameters<typeof registerLlmInferRoutes>[1] = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
llmService: llmService as any,
adapters,
};
if (onInferenceEvent !== undefined) deps.onInferenceEvent = onInferenceEvent;
registerLlmInferRoutes(app, deps);
await app.ready();
return app;
}
describe('POST /api/v1/llms/:name/infer', () => {
it('returns 404 when the Llm does not exist', async () => {
const svc: LlmServiceLike = {
getByName: async () => { throw new NotFoundError('Llm not found: missing'); },
resolveApiKey: async () => '',
};
await setupApp(svc, new LlmAdapterRegistry());
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/missing/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(404);
});
it('returns 400 when messages is missing', async () => {
const svc: LlmServiceLike = {
getByName: async () => makeLlmView({ apiKeyRef: null }),
resolveApiKey: async () => '',
};
await setupApp(svc, new LlmAdapterRegistry());
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/claude/infer',
payload: {},
});
expect(res.statusCode).toBe(400);
});
it('dispatches non-streaming to the adapter and returns its JSON', async () => {
const fetchFn = vi.fn(async () => new Response(JSON.stringify({
id: 'msg_1', model: 'claude-3-5-sonnet-20241022', role: 'assistant',
content: [{ type: 'text', text: 'hello' }],
stop_reason: 'end_turn',
usage: { input_tokens: 1, output_tokens: 1 },
}), { status: 200 }));
const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch });
const svc: LlmServiceLike = {
getByName: async () => makeLlmView(),
resolveApiKey: async () => 'sk-ant-xyz',
};
const events: unknown[] = [];
await setupApp(svc, adapters, (e) => events.push(e));
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/claude/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(200);
const body = res.json<{ choices: Array<{ message: { content: string } }> }>();
expect(body.choices[0]!.message.content).toBe('hello');
// Audit event emitted
expect(events).toHaveLength(1);
expect((events[0] as { kind: string; llmName: string; status: number }).kind).toBe('llm_inference_call');
expect((events[0] as { llmName: string }).llmName).toBe('claude');
expect((events[0] as { streaming: boolean }).streaming).toBe(false);
expect((events[0] as { status: number }).status).toBe(200);
});
it('500s when apiKey resolution fails', async () => {
const adapters = new LlmAdapterRegistry();
const svc: LlmServiceLike = {
getByName: async () => makeLlmView(),
resolveApiKey: async () => { throw new Error('secret not found'); },
};
await setupApp(svc, adapters);
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/claude/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(500);
expect(res.json<{ error: string }>().error).toMatch(/secret not found/);
});
it('skips apiKey resolution when the Llm has no apiKeyRef', async () => {
const fetchFn = vi.fn(async () => new Response(JSON.stringify({ id: 'x', choices: [] }), { status: 200 }));
const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch });
const resolveSpy = vi.fn();
const svc: LlmServiceLike = {
getByName: async () => makeLlmView({ type: 'ollama', url: 'http://ollama:11434', apiKeyRef: null }),
resolveApiKey: resolveSpy as unknown as LlmServiceLike['resolveApiKey'],
};
await setupApp(svc, adapters);
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/ollama-local/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(200);
expect(resolveSpy).not.toHaveBeenCalled();
});
it('streams SSE chunks for stream: true', async () => {
const fetchFn = vi.fn(async () => sseResponse([
'event: content_block_delta\ndata: {"type":"content_block_delta","delta":{"type":"text_delta","text":"hi"}}',
'event: message_stop\ndata: {"type":"message_stop"}',
]));
const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch });
const svc: LlmServiceLike = {
getByName: async () => makeLlmView(),
resolveApiKey: async () => 'sk-ant-xyz',
};
const events: Array<{ streaming: boolean; status: number }> = [];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
await setupApp(svc, adapters, ((e: any) => events.push(e)) as any);
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/claude/infer',
payload: { messages: [{ role: 'user', content: 'hi' }], stream: true },
});
expect(res.statusCode).toBe(200);
expect(res.body).toContain('data:');
expect(res.body).toContain('[DONE]');
expect(events).toHaveLength(1);
expect(events[0]!.streaming).toBe(true);
});
it('502s on adapter errors (non-streaming)', async () => {
const fetchFn = vi.fn(async () => { throw new Error('upstream down'); });
const adapters = new LlmAdapterRegistry({ fetch: fetchFn as unknown as typeof fetch });
const svc: LlmServiceLike = {
getByName: async () => makeLlmView({ type: 'openai', url: 'http://example', apiKeyRef: null }),
resolveApiKey: async () => '',
};
await setupApp(svc, adapters);
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/x/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(502);
expect(res.json<{ error: string }>().error).toMatch(/upstream down/);
});
});

View File

@@ -104,6 +104,25 @@ describe('Llm Routes', () => {
expect(res.statusCode).toBe(404);
});
it('GET /api/v1/llms/:nameOrId resolves by human name when not a CUID', async () => {
await createApp(mockRepo([makeLlm({ id: 'llm-1', name: 'claude' })]));
const res = await app.inject({ method: 'GET', url: '/api/v1/llms/claude' });
expect(res.statusCode).toBe(200);
expect(res.json<{ name: string; id: string }>().name).toBe('claude');
});
it('HEAD /api/v1/llms/:name returns 200 for an existing Llm (failover RBAC pre-check)', async () => {
await createApp(mockRepo([makeLlm({ name: 'claude' })]));
const res = await app.inject({ method: 'HEAD', url: '/api/v1/llms/claude' });
expect(res.statusCode).toBe(200);
});
it('HEAD /api/v1/llms/:name returns 404 for a missing Llm', async () => {
await createApp(mockRepo());
const res = await app.inject({ method: 'HEAD', url: '/api/v1/llms/missing' });
expect(res.statusCode).toBe(404);
});
it('POST /api/v1/llms creates and returns 201', async () => {
await createApp(mockRepo());
const res = await app.inject({

View File

@@ -64,6 +64,14 @@ export interface LlmProviderFileEntry {
idleTimeoutMinutes?: number;
/** vllm-managed: extra args for `vllm serve` */
extraArgs?: string[];
/**
* If set, this local provider is allowed to substitute for the centralized
* Llm of this name when the mcpd inference proxy is unreachable.
* RBAC is still enforced — the caller must have view permission on the
* named Llm via mcpd before failover is permitted (fail-closed if mcpd
* itself can't be reached).
*/
failoverFor?: string;
}
export interface ProjectLlmOverride {

View File

@@ -173,6 +173,9 @@ export async function createProvidersFromConfig(
if (entry.tier) {
registry.assignTier(provider.name, entry.tier);
}
if (entry.failoverFor) {
registry.registerFailover(entry.failoverFor, provider.name);
}
}
return registry;

View File

@@ -0,0 +1,107 @@
/**
* FailoverRouter — orchestrates "try mcpd's centralized Llm, fall back to a
* local provider when authorized" for clients that consume the inference
* proxy.
*
* Decision flow on a centralized inference call:
*
* 1. Call the primary (the supplied `primary` callback, typically an HTTP
* POST to mcpd /api/v1/llms/:name/infer).
* 2. If that succeeds → done.
* 3. If it fails AND a local provider is registered as failover for this
* Llm name → call mcpd /api/v1/llms/:name (RBAC-gated) to verify the
* caller still has permission to view this Llm. mcpd unreachable →
* fail-closed (re-throw the original error). 403 → fail-closed.
* 4. 200 → invoke the local provider's `complete()` and tag the result
* as `failover: true` for client-side audit.
*
* The check call uses HEAD to avoid pulling the Llm body (and any
* description / extraConfig) over the wire — mcpd treats both methods the
* same in the RBAC hook because the URL maps to the same permission.
*/
import type { LlmProvider } from './types.js';
import type { ProviderRegistry } from './registry.js';
export interface FailoverDecision<T> {
result: T;
failover: boolean;
/** Name of the local provider used (only set when failover === true). */
via?: string;
}
export interface FailoverRouterDeps {
/** Injected fetch for the RBAC pre-check. Tests mock this. */
fetch?: typeof globalThis.fetch;
/** mcpd base URL (no trailing slash). */
mcpdUrl: string;
/** Bearer token to attach to the RBAC pre-check call. */
bearerToken?: string;
}
/** Outcome of the RBAC pre-check. Used internally + exposed for tests. */
export type AuthCheckOutcome = 'allowed' | 'forbidden' | 'unreachable';
export class FailoverRouter {
private readonly fetchImpl: typeof globalThis.fetch;
private readonly mcpdUrl: string;
private readonly bearer: string | undefined;
constructor(
private readonly registry: ProviderRegistry,
deps: FailoverRouterDeps,
) {
this.fetchImpl = deps.fetch ?? globalThis.fetch;
this.mcpdUrl = deps.mcpdUrl.replace(/\/+$/, '');
if (deps.bearerToken !== undefined) this.bearer = deps.bearerToken;
}
/**
* Run a primary inference attempt; on failure, fall back to the local
* provider if one is registered for this Llm AND the caller still has
* `view:llms:<llmName>` on mcpd.
*
* `primary` should reject (throw) when mcpd's proxy is unreachable or
* returns a 5xx — that's the signal to consider failover. 4xx errors that
* indicate a bad request are surfaced as-is; the router only retries on
* primary failure shapes that look like an upstream/network issue.
*/
async run<T>(
llmName: string,
primary: () => Promise<T>,
localCall: (provider: LlmProvider) => Promise<T>,
): Promise<FailoverDecision<T>> {
try {
const result = await primary();
return { result, failover: false };
} catch (primaryErr) {
const local = this.registry.getFailoverFor(llmName);
if (local === null) throw primaryErr;
const auth = await this.checkAuth(llmName);
if (auth !== 'allowed') {
// Fail-closed for forbidden AND unreachable.
throw primaryErr;
}
const result = await localCall(local);
return { result, failover: true, via: local.name };
}
}
/** RBAC pre-check exposed for tests / status-display callers. */
async checkAuth(llmName: string): Promise<AuthCheckOutcome> {
const url = `${this.mcpdUrl}/api/v1/llms/${encodeURIComponent(llmName)}`;
const headers: Record<string, string> = {};
if (this.bearer !== undefined) headers['Authorization'] = `Bearer ${this.bearer}`;
let res: Response;
try {
res = await this.fetchImpl(url, { method: 'HEAD', headers });
} catch {
return 'unreachable';
}
if (res.status === 200 || res.status === 204) return 'allowed';
if (res.status === 403 || res.status === 401) return 'forbidden';
// Anything else (404, 500…) — treat as unreachable for the failover flow.
return 'unreachable';
}
}

View File

@@ -8,6 +8,8 @@ export class ProviderRegistry {
private providers = new Map<string, LlmProvider>();
private activeProvider: string | null = null;
private tierProviders = new Map<Tier, string[]>();
/** Maps a centralized Llm name → local provider name that can substitute when mcpd is unreachable. */
private failoverMap = new Map<string, string>();
register(provider: LlmProvider): void {
this.providers.set(provider.name, provider);
@@ -31,6 +33,30 @@ export class ProviderRegistry {
this.tierProviders.set(tier, filtered);
}
}
// Remove from failover map (any entry whose local-provider value points at this name)
for (const [centralName, localName] of this.failoverMap) {
if (localName === name) this.failoverMap.delete(centralName);
}
}
/** Mark `localProviderName` as the failover for the centralized Llm named `centralLlmName`. */
registerFailover(centralLlmName: string, localProviderName: string): void {
if (!this.providers.has(localProviderName)) {
throw new Error(`Provider '${localProviderName}' is not registered`);
}
this.failoverMap.set(centralLlmName, localProviderName);
}
/** Look up the local provider that can substitute for a centralized Llm, if any. */
getFailoverFor(centralLlmName: string): LlmProvider | null {
const localName = this.failoverMap.get(centralLlmName);
if (localName === undefined) return null;
return this.providers.get(localName) ?? null;
}
/** Names of central Llms that have a local failover registered. Used in status output. */
listFailovers(): Array<{ centralLlmName: string; localProviderName: string }> {
return [...this.failoverMap.entries()].map(([centralLlmName, localProviderName]) => ({ centralLlmName, localProviderName }));
}
setActive(name: string): void {

View File

@@ -0,0 +1,170 @@
import { describe, it, expect, vi } from 'vitest';
import { ProviderRegistry } from '../src/providers/registry.js';
import { FailoverRouter } from '../src/providers/failover-router.js';
import type { LlmProvider, CompleteResponse } from '../src/providers/types.js';
function fakeProvider(name: string): LlmProvider {
const completeFn = vi.fn(async (): Promise<CompleteResponse> => ({
content: 'local response',
finishReason: 'stop',
}));
return {
name,
complete: completeFn,
listModels: vi.fn(async () => [name]),
isAvailable: vi.fn(async () => true),
};
}
function makeFetch(behaviour: { method: string; status?: number; throw?: boolean }): ReturnType<typeof vi.fn> {
return vi.fn(async (url: string | URL, init?: RequestInit) => {
if (behaviour.throw === true) throw new Error('connection refused');
expect(init?.method).toBe(behaviour.method);
expect(String(url)).toMatch(/\/api\/v1\/llms\//);
return new Response(null, { status: behaviour.status ?? 200 });
});
}
describe('ProviderRegistry — failover map', () => {
it('registerFailover maps a central name → local provider name', () => {
const reg = new ProviderRegistry();
const local = fakeProvider('vllm-local');
reg.register(local);
reg.registerFailover('claude', 'vllm-local');
const found = reg.getFailoverFor('claude');
expect(found?.name).toBe('vllm-local');
});
it('getFailoverFor returns null when no map entry exists', () => {
const reg = new ProviderRegistry();
reg.register(fakeProvider('vllm-local'));
expect(reg.getFailoverFor('claude')).toBeNull();
});
it('registerFailover throws when local provider is not registered', () => {
const reg = new ProviderRegistry();
expect(() => reg.registerFailover('claude', 'missing')).toThrow(/not registered/);
});
it('unregister removes failover entries that pointed at the removed provider', () => {
const reg = new ProviderRegistry();
reg.register(fakeProvider('vllm-local'));
reg.registerFailover('claude', 'vllm-local');
reg.unregister('vllm-local');
expect(reg.getFailoverFor('claude')).toBeNull();
expect(reg.listFailovers()).toEqual([]);
});
it('listFailovers reports the current map', () => {
const reg = new ProviderRegistry();
reg.register(fakeProvider('vllm-local'));
reg.registerFailover('claude', 'vllm-local');
reg.registerFailover('opus', 'vllm-local');
expect(reg.listFailovers()).toEqual([
{ centralLlmName: 'claude', localProviderName: 'vllm-local' },
{ centralLlmName: 'opus', localProviderName: 'vllm-local' },
]);
});
});
describe('FailoverRouter', () => {
it('returns primary result when primary succeeds', async () => {
const reg = new ProviderRegistry();
reg.register(fakeProvider('vllm-local'));
reg.registerFailover('claude', 'vllm-local');
const router = new FailoverRouter(reg, {
mcpdUrl: 'http://mcpd',
fetch: vi.fn() as unknown as typeof fetch,
});
const out = await router.run('claude', async () => 'central', async () => 'local');
expect(out.failover).toBe(false);
expect(out.result).toBe('central');
});
it('falls back to local when primary fails AND mcpd auth-checks 200', async () => {
const reg = new ProviderRegistry();
reg.register(fakeProvider('vllm-local'));
reg.registerFailover('claude', 'vllm-local');
const fetchFn = makeFetch({ method: 'HEAD', status: 200 });
const router = new FailoverRouter(reg, {
mcpdUrl: 'http://mcpd',
fetch: fetchFn as unknown as typeof fetch,
bearerToken: 'bearer-x',
});
const out = await router.run(
'claude',
async () => { throw new Error('upstream down'); },
async (provider) => `via:${provider.name}`,
);
expect(out.failover).toBe(true);
expect(out.via).toBe('vllm-local');
expect(out.result).toBe('via:vllm-local');
// Bearer was attached
const [, init] = fetchFn.mock.calls[0] as [string, RequestInit];
expect((init.headers as Record<string, string>)['Authorization']).toBe('Bearer bearer-x');
});
it('re-throws primary error when no local failover is registered', async () => {
const reg = new ProviderRegistry();
const router = new FailoverRouter(reg, {
mcpdUrl: 'http://mcpd',
fetch: vi.fn() as unknown as typeof fetch,
});
await expect(router.run(
'claude',
async () => { throw new Error('boom'); },
async () => 'never',
)).rejects.toThrow('boom');
});
it('re-throws (fail-closed) when mcpd returns 403 to the auth check', async () => {
const reg = new ProviderRegistry();
reg.register(fakeProvider('vllm-local'));
reg.registerFailover('claude', 'vllm-local');
const router = new FailoverRouter(reg, {
mcpdUrl: 'http://mcpd',
fetch: makeFetch({ method: 'HEAD', status: 403 }) as unknown as typeof fetch,
});
await expect(router.run(
'claude',
async () => { throw new Error('upstream down'); },
async () => 'never',
)).rejects.toThrow('upstream down');
});
it('re-throws (fail-closed) when mcpd itself is unreachable for the auth check', async () => {
const reg = new ProviderRegistry();
reg.register(fakeProvider('vllm-local'));
reg.registerFailover('claude', 'vllm-local');
const router = new FailoverRouter(reg, {
mcpdUrl: 'http://mcpd',
fetch: makeFetch({ method: 'HEAD', throw: true }) as unknown as typeof fetch,
});
await expect(router.run(
'claude',
async () => { throw new Error('upstream down'); },
async () => 'never',
)).rejects.toThrow('upstream down');
});
it('checkAuth maps responses correctly', async () => {
const reg = new ProviderRegistry();
const make = (status: number) => new FailoverRouter(reg, {
mcpdUrl: 'http://mcpd',
fetch: (async () => new Response(null, { status })) as unknown as typeof fetch,
});
expect(await make(200).checkAuth('claude')).toBe('allowed');
expect(await make(204).checkAuth('claude')).toBe('allowed');
expect(await make(401).checkAuth('claude')).toBe('forbidden');
expect(await make(403).checkAuth('claude')).toBe('forbidden');
expect(await make(404).checkAuth('claude')).toBe('unreachable');
expect(await make(500).checkAuth('claude')).toBe('unreachable');
});
});