fix: MCP proxy resilience — timeouts, parallel discovery, error propagation #48

Merged
michal merged 2 commits from feat/k8s-operator into main 2026-04-10 17:29:34 +00:00
8 changed files with 349 additions and 45 deletions
Showing only changes of commit 857f8c72ae - Show all commits

View File

@@ -132,6 +132,15 @@ export async function runMcpBridge(opts: McpBridgeOptions): Promise<void> {
const trimmed = line.trim(); const trimmed = line.trim();
if (!trimmed) continue; if (!trimmed) continue;
// Parse request ID for error responses
let requestId: unknown = null;
try {
const parsed = JSON.parse(trimmed) as Record<string, unknown>;
requestId = parsed.id ?? null;
} catch {
// Non-JSON or notification — no id to respond to
}
try { try {
const result = await postJsonRpc(endpointUrl, trimmed, sessionId, token); const result = await postJsonRpc(endpointUrl, trimmed, sessionId, token);
@@ -156,7 +165,18 @@ export async function runMcpBridge(opts: McpBridgeOptions): Promise<void> {
} }
} }
} catch (err) { } catch (err) {
stderr.write(`MCP bridge error: ${err instanceof Error ? err.message : String(err)}\n`); const errMsg = err instanceof Error ? err.message : String(err);
stderr.write(`MCP bridge error: ${errMsg}\n`);
// Send JSON-RPC error response so the client doesn't hang
if (requestId !== null) {
const errorResponse = JSON.stringify({
jsonrpc: '2.0',
id: requestId,
error: { code: -32603, message: `Bridge error: ${errMsg}` },
});
stdout.write(errorResponse + '\n');
}
} }
} }

View File

@@ -347,7 +347,7 @@ describe('MCP STDIO Bridge', () => {
expect(recorded.filter((r) => r.method === 'DELETE')).toHaveLength(0); expect(recorded.filter((r) => r.method === 'DELETE')).toHaveLength(0);
}); });
it('writes errors to stderr, not stdout', async () => { it('writes errors to stderr and sends JSON-RPC error to stdout', async () => {
recorded.length = 0; recorded.length = 0;
const stdin = new Readable({ read() {} }); const stdin = new Readable({ read() {} });
const { stdout, stdoutChunks, stderr, stderrChunks } = createMockStreams(); const { stdout, stdoutChunks, stderr, stderrChunks } = createMockStreams();
@@ -364,8 +364,12 @@ describe('MCP STDIO Bridge', () => {
// Error should be on stderr // Error should be on stderr
expect(stderrChunks.join('')).toContain('MCP bridge error'); expect(stderrChunks.join('')).toContain('MCP bridge error');
// stdout should be empty (no corrupted output) // stdout should contain a JSON-RPC error response so the client doesn't hang
expect(stdoutChunks.join('')).toBe(''); const out = stdoutChunks.join('');
const parsed = JSON.parse(out.trim()) as { id: number; error: { code: number; message: string } };
expect(parsed.id).toBe(1);
expect(parsed.error.code).toBe(-32603);
expect(parsed.error.message).toContain('Bridge error');
}); });
it('skips blank lines in stdin', async () => { it('skips blank lines in stdin', async () => {

View File

@@ -20,24 +20,29 @@ export class ConnectionError extends Error {
} }
} }
/** Default timeout for mcpd requests (ms). Prevents indefinite hangs on slow upstream tool calls. */
const DEFAULT_TIMEOUT_MS = 30_000;
export class McpdClient { export class McpdClient {
private readonly baseUrl: string; private readonly baseUrl: string;
private readonly token: string; private readonly token: string;
private readonly extraHeaders: Record<string, string>; private readonly extraHeaders: Record<string, string>;
private readonly timeoutMs: number;
constructor(baseUrl: string, token: string, extraHeaders?: Record<string, string>) { constructor(baseUrl: string, token: string, extraHeaders?: Record<string, string>, timeoutMs?: number) {
// Strip trailing slash for consistent URL joining // Strip trailing slash for consistent URL joining
this.baseUrl = baseUrl.replace(/\/+$/, ''); this.baseUrl = baseUrl.replace(/\/+$/, '');
this.token = token; this.token = token;
this.extraHeaders = extraHeaders ?? {}; this.extraHeaders = extraHeaders ?? {};
this.timeoutMs = timeoutMs ?? DEFAULT_TIMEOUT_MS;
} }
/** /**
* Create a new client with additional default headers. * Create a new client with additional default headers.
* Inherits base URL and token from the current client. * Inherits base URL, token, and timeout from the current client.
*/ */
withHeaders(headers: Record<string, string>): McpdClient { withHeaders(headers: Record<string, string>): McpdClient {
return new McpdClient(this.baseUrl, this.token, { ...this.extraHeaders, ...headers }); return new McpdClient(this.baseUrl, this.token, { ...this.extraHeaders, ...headers }, this.timeoutMs);
} }
async get<T>(path: string): Promise<T> { async get<T>(path: string): Promise<T> {
@@ -77,7 +82,11 @@ export class McpdClient {
'Accept': 'application/json', 'Accept': 'application/json',
}; };
const init: RequestInit = { method, headers }; const init: RequestInit = {
method,
headers,
signal: AbortSignal.timeout(this.timeoutMs),
};
if (body !== undefined && body !== null && method !== 'GET' && method !== 'HEAD') { if (body !== undefined && body !== null && method !== 'GET' && method !== 'HEAD') {
headers['Content-Type'] = 'application/json'; headers['Content-Type'] = 'application/json';
init.body = JSON.stringify(body); init.body = JSON.stringify(body);
@@ -87,6 +96,9 @@ export class McpdClient {
try { try {
res = await fetch(url, init); res = await fetch(url, init);
} catch (err: unknown) { } catch (err: unknown) {
if (err instanceof DOMException && err.name === 'TimeoutError') {
throw new ConnectionError(this.baseUrl, new Error(`Request timed out after ${this.timeoutMs}ms`));
}
throw new ConnectionError(this.baseUrl, err); throw new ConnectionError(this.baseUrl, err);
} }

View File

@@ -37,6 +37,8 @@ export interface ManagedVllmStatus {
const POLL_INTERVAL_MS = 2000; const POLL_INTERVAL_MS = 2000;
const STARTUP_TIMEOUT_MS = 120_000; const STARTUP_TIMEOUT_MS = 120_000;
/** After entering error state, wait this long before retrying startup. */
const ERROR_COOLDOWN_MS = 60_000;
/** /**
* Managed vLLM provider — spawns and manages a local vLLM process. * Managed vLLM provider — spawns and manages a local vLLM process.
@@ -54,6 +56,7 @@ export class ManagedVllmProvider implements LlmProvider {
private lastError: string | null = null; private lastError: string | null = null;
private lastUsed = 0; private lastUsed = 0;
private startedAt = 0; private startedAt = 0;
private errorAt = 0;
private idleTimer: ReturnType<typeof setInterval> | null = null; private idleTimer: ReturnType<typeof setInterval> | null = null;
private startPromise: Promise<void> | null = null; private startPromise: Promise<void> | null = null;
@@ -140,6 +143,11 @@ export class ManagedVllmProvider implements LlmProvider {
return this.startPromise; return this.startPromise;
} }
// Fast-fail if we recently errored — don't retry startup on every call
if (this.state === 'error' && (Date.now() - this.errorAt) < ERROR_COOLDOWN_MS) {
throw new Error(this.lastError ?? 'vLLM in error state (cooldown active)');
}
this.startPromise = this.doStart(); this.startPromise = this.doStart();
try { try {
await this.startPromise; await this.startPromise;
@@ -215,6 +223,7 @@ export class ManagedVllmProvider implements LlmProvider {
} }
this.killProcess(); this.killProcess();
this.state = 'error'; this.state = 'error';
this.errorAt = Date.now();
throw new Error(this.lastError); throw new Error(this.lastError);
} }
@@ -243,6 +252,7 @@ export class ManagedVllmProvider implements LlmProvider {
} catch (err) { } catch (err) {
if (this.state === 'starting') { if (this.state === 'starting') {
this.state = 'error'; this.state = 'error';
this.errorAt = Date.now();
this.lastError = (err as Error).message; this.lastError = (err as Error).message;
} }
throw err; throw err;

View File

@@ -252,8 +252,10 @@ export class McpRouter {
async discoverTools(correlationId?: string): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> { async discoverTools(correlationId?: string): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> {
const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = []; const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = [];
for (const [serverName, upstream] of this.upstreams) { // Discover tools from all servers in parallel so one slow server doesn't block the rest
try { const entries = [...this.upstreams.entries()];
const results = await Promise.allSettled(
entries.map(async ([serverName, upstream]) => {
const req = { const req = {
jsonrpc: '2.0' as const, jsonrpc: '2.0' as const,
id: `discover-tools-${serverName}`, id: `discover-tools-${serverName}`,
@@ -268,30 +270,37 @@ export class McpRouter {
} else { } else {
response = await upstream.send(req); response = await upstream.send(req);
} }
return { serverName, upstream, response };
}),
);
if (response.error) { for (const result of results) {
console.warn(`[discoverTools] ${serverName}: ${(response.error as { message?: string }).message ?? 'unknown error'}`); if (result.status === 'rejected') {
} else if (response.result && typeof response.result === 'object' && 'tools' in response.result) { console.warn(`[discoverTools] ${(result.reason as Error).message ?? 'unknown error'}`);
const tools = (response.result as { tools: Array<{ name: string; description?: string; inputSchema?: unknown }> }).tools; continue;
for (const tool of tools) { }
const namespacedName = `${serverName}/${tool.name}`; const { serverName, upstream, response } = result.value;
this.toolToServer.set(namespacedName, serverName);
// Enrich description with server context if available if (response.error) {
const entry: { name: string; description?: string; inputSchema?: unknown } = { console.warn(`[discoverTools] ${serverName}: ${(response.error as { message?: string }).message ?? 'unknown error'}`);
...tool, } else if (response.result && typeof response.result === 'object' && 'tools' in response.result) {
name: namespacedName, const tools = (response.result as { tools: Array<{ name: string; description?: string; inputSchema?: unknown }> }).tools;
}; for (const tool of tools) {
if (upstream.description && tool.description) { const namespacedName = `${serverName}/${tool.name}`;
entry.description = `[${upstream.description}] ${tool.description}`; this.toolToServer.set(namespacedName, serverName);
} else if (upstream.description) { // Enrich description with server context if available
entry.description = `[${upstream.description}]`; const entry: { name: string; description?: string; inputSchema?: unknown } = {
} ...tool,
// If neither upstream.description nor tool.description, keep tool.description (may be undefined — that's fine, just don't set it) name: namespacedName,
allTools.push(entry); };
if (upstream.description && tool.description) {
entry.description = `[${upstream.description}] ${tool.description}`;
} else if (upstream.description) {
entry.description = `[${upstream.description}]`;
} }
// If neither upstream.description nor tool.description, keep tool.description (may be undefined — that's fine, just don't set it)
allTools.push(entry);
} }
} catch (err) {
console.warn(`[discoverTools] ${serverName}: ${err instanceof Error ? err.message : err}`);
} }
} }
@@ -304,8 +313,10 @@ export class McpRouter {
async discoverResources(correlationId?: string): Promise<Array<{ uri: string; name?: string; description?: string; mimeType?: string }>> { async discoverResources(correlationId?: string): Promise<Array<{ uri: string; name?: string; description?: string; mimeType?: string }>> {
const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = []; const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = [];
for (const [serverName, upstream] of this.upstreams) { // Discover resources from all servers in parallel
try { const entries = [...this.upstreams.entries()];
const results = await Promise.allSettled(
entries.map(async ([serverName, upstream]) => {
const req = { const req = {
jsonrpc: '2.0' as const, jsonrpc: '2.0' as const,
id: `discover-resources-${serverName}`, id: `discover-resources-${serverName}`,
@@ -320,20 +331,24 @@ export class McpRouter {
} else { } else {
response = await upstream.send(req); response = await upstream.send(req);
} }
return { serverName, response };
}),
);
if (response.result && typeof response.result === 'object' && 'resources' in response.result) { for (const result of results) {
const resources = (response.result as { resources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> }).resources; if (result.status === 'rejected') continue;
for (const resource of resources) { const { serverName, response } = result.value;
const namespacedUri = `${serverName}://${resource.uri}`;
this.resourceToServer.set(namespacedUri, serverName); if (response.result && typeof response.result === 'object' && 'resources' in response.result) {
allResources.push({ const resources = (response.result as { resources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> }).resources;
...resource, for (const resource of resources) {
uri: namespacedUri, const namespacedUri = `${serverName}://${resource.uri}`;
}); this.resourceToServer.set(namespacedUri, serverName);
} allResources.push({
...resource,
uri: namespacedUri,
});
} }
} catch {
// Server may be unavailable; skip its resources
} }
} }

View File

@@ -0,0 +1,168 @@
import { describe, it, expect, afterAll, afterEach } from 'vitest';
import http from 'node:http';
import { McpdClient, ConnectionError } from '../src/http/mcpd-client.js';
/**
* Create a local HTTP server for testing McpdClient behavior.
* Returns the server and its URL.
*/
function createTestServer(
handler: (req: http.IncomingMessage, res: http.ServerResponse) => void,
): Promise<{ server: http.Server; url: string }> {
return new Promise((resolve) => {
const server = http.createServer(handler);
server.listen(0, '127.0.0.1', () => {
const addr = server.address() as { port: number };
resolve({ server, url: `http://127.0.0.1:${addr.port}` });
});
});
}
describe('McpdClient', () => {
const servers: http.Server[] = [];
afterEach(() => {
for (const s of servers) s.close();
servers.length = 0;
});
afterAll(() => {
for (const s of servers) s.close();
});
it('makes GET requests with auth header', async () => {
let capturedAuth = '';
const { server, url } = await createTestServer((req, res) => {
capturedAuth = req.headers['authorization'] ?? '';
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ ok: true }));
});
servers.push(server);
const client = new McpdClient(url, 'my-token');
const result = await client.get<{ ok: boolean }>('/api/v1/test');
expect(result).toEqual({ ok: true });
expect(capturedAuth).toBe('Bearer my-token');
});
it('makes POST requests with JSON body', async () => {
let capturedBody = '';
const { server, url } = await createTestServer((req, res) => {
const chunks: Buffer[] = [];
req.on('data', (c: Buffer) => chunks.push(c));
req.on('end', () => {
capturedBody = Buffer.concat(chunks).toString();
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ received: true }));
});
});
servers.push(server);
const client = new McpdClient(url, 'tok');
const result = await client.post<{ received: boolean }>('/api/v1/proxy', { serverId: 's1' });
expect(result).toEqual({ received: true });
expect(JSON.parse(capturedBody)).toEqual({ serverId: 's1' });
});
it('throws ConnectionError on connection refused', async () => {
const client = new McpdClient('http://127.0.0.1:1', 'tok');
await expect(client.get('/test')).rejects.toThrow(ConnectionError);
});
it('throws on 4xx/5xx responses', async () => {
const { server, url } = await createTestServer((_req, res) => {
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'internal' }));
});
servers.push(server);
const client = new McpdClient(url, 'tok');
await expect(client.get('/test')).rejects.toThrow(/mcpd returned 500/);
});
// ── Timeout behavior ──
it('times out on slow responses and throws ConnectionError', async () => {
const { server, url } = await createTestServer((_req, _res) => {
// Never respond — simulates a hanging upstream tool call
});
servers.push(server);
// Use a very short timeout for the test
const client = new McpdClient(url, 'tok', undefined, 500);
const start = Date.now();
await expect(client.post('/api/v1/mcp/proxy', { serverId: 's1' })).rejects.toThrow(
/timed out/,
);
const elapsed = Date.now() - start;
// Should have timed out around 500ms, not hung for seconds
expect(elapsed).toBeGreaterThanOrEqual(450);
expect(elapsed).toBeLessThan(3000);
});
it('timeout error is a ConnectionError with descriptive message', async () => {
const { server, url } = await createTestServer((_req, _res) => {
// Never respond
});
servers.push(server);
const client = new McpdClient(url, 'tok', undefined, 200);
try {
await client.get('/test');
expect.unreachable('Should have thrown');
} catch (err) {
expect(err).toBeInstanceOf(ConnectionError);
expect((err as Error).message).toContain('Request timed out after 200ms');
}
});
it('fast responses succeed within the timeout window', async () => {
const { server, url } = await createTestServer((_req, res) => {
// Respond immediately
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ fast: true }));
});
servers.push(server);
// Short timeout, but response is immediate — should work
const client = new McpdClient(url, 'tok', undefined, 500);
const result = await client.get<{ fast: boolean }>('/test');
expect(result).toEqual({ fast: true });
});
it('withHeaders preserves timeout', async () => {
const { server, url } = await createTestServer((_req, _res) => {
// Never respond
});
servers.push(server);
const client = new McpdClient(url, 'tok', undefined, 300);
const derived = client.withHeaders({ 'X-Custom': 'val' });
const start = Date.now();
await expect(derived.get('/test')).rejects.toThrow(/timed out/);
const elapsed = Date.now() - start;
expect(elapsed).toBeLessThan(2000);
});
it('default timeout is 30 seconds', async () => {
// We can't wait 30s in a test, but we can verify the error message format
// when a custom timeout is not set. Use a fast-failing server instead.
const { server, url } = await createTestServer((_req, res) => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ ok: true }));
});
servers.push(server);
// Default constructor — should work for fast responses
const client = new McpdClient(url, 'tok');
const result = await client.get<{ ok: boolean }>('/test');
expect(result).toEqual({ ok: true });
});
});

View File

@@ -157,6 +157,45 @@ describe('McpRouter', () => {
expect(result.tools).toHaveLength(1); expect(result.tools).toHaveLength(1);
expect(result.tools[0]?.name).toBe('working/do_thing'); expect(result.tools[0]?.name).toBe('working/do_thing');
}); });
it('slow upstream does not block fast upstreams (parallel discovery)', async () => {
// Simulate a server that takes 5s to respond to tools/list
const slowUpstream = mockUpstream('slow-server', {
tools: [{ name: 'slow_tool' }],
});
vi.mocked(slowUpstream.send).mockImplementation(
() => new Promise((resolve) => setTimeout(() => resolve({
jsonrpc: '2.0' as const,
id: 'delayed',
result: { tools: [{ name: 'slow_tool' }] },
}), 5000)),
);
const fastUpstream = mockUpstream('fast-server', {
tools: [{ name: 'fast_tool', description: 'Responds instantly' }],
});
router.addUpstream(slowUpstream);
router.addUpstream(fastUpstream);
const start = Date.now();
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'tools/list',
});
const elapsed = Date.now() - start;
const result = res.result as { tools: Array<{ name: string }> };
// Both servers' tools should be present (parallel, not sequential)
expect(result.tools).toHaveLength(2);
expect(result.tools.map((t) => t.name)).toContain('fast-server/fast_tool');
expect(result.tools.map((t) => t.name)).toContain('slow-server/slow_tool');
// Should complete in ~5s (parallel), not ~5s + fast (sequential wouldn't matter here)
// but critically, if this were sequential with a truly hanging server, it would never complete.
// The key assertion: it took roughly the slow server's time, not slow + fast.
expect(elapsed).toBeLessThan(7000);
}, 10_000);
}); });
describe('tools/call', () => { describe('tools/call', () => {

View File

@@ -294,4 +294,40 @@ describe('ManagedVllmProvider', () => {
provider.dispose(); provider.dispose();
}); });
}); });
describe('error cooldown', () => {
it('fast-fails during cooldown instead of retrying startup', async () => {
const { provider, fakeProcess, healthCheckFn, spawnFn } = createProvider();
healthCheckFn.mockResolvedValue(false);
// First attempt: triggers startup, process exits with error
const p1 = (provider as unknown as { ensureRunning(): Promise<void> }).ensureRunning();
p1.catch(() => {});
(fakeProcess as Record<string, unknown>).exitCode = 1;
fakeProcess._emit('exit', 1);
await vi.advanceTimersByTimeAsync(2100);
await expect(p1).rejects.toThrow();
expect(provider.getStatus().state).toBe('error');
expect(spawnFn).toHaveBeenCalledOnce();
// Second attempt within cooldown: should throw immediately without spawning
await expect(
(provider as unknown as { ensureRunning(): Promise<void> }).ensureRunning(),
).rejects.toThrow();
expect(spawnFn).toHaveBeenCalledOnce(); // no extra spawn
// After cooldown (60s): should retry
const newProc = createFakeProcess();
spawnFn.mockReturnValue(newProc);
healthCheckFn.mockResolvedValue(true);
await vi.advanceTimersByTimeAsync(60_000);
const p3 = (provider as unknown as { ensureRunning(): Promise<void> }).ensureRunning();
await vi.advanceTimersByTimeAsync(2100);
await p3;
expect(spawnFn).toHaveBeenCalledTimes(2);
expect(provider.getStatus().state).toBe('running');
});
});
}); });