fix: MCP proxy resilience — timeouts, parallel discovery, error propagation #48
@@ -132,6 +132,15 @@ export async function runMcpBridge(opts: McpBridgeOptions): Promise<void> {
|
|||||||
const trimmed = line.trim();
|
const trimmed = line.trim();
|
||||||
if (!trimmed) continue;
|
if (!trimmed) continue;
|
||||||
|
|
||||||
|
// Parse request ID for error responses
|
||||||
|
let requestId: unknown = null;
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(trimmed) as Record<string, unknown>;
|
||||||
|
requestId = parsed.id ?? null;
|
||||||
|
} catch {
|
||||||
|
// Non-JSON or notification — no id to respond to
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const result = await postJsonRpc(endpointUrl, trimmed, sessionId, token);
|
const result = await postJsonRpc(endpointUrl, trimmed, sessionId, token);
|
||||||
|
|
||||||
@@ -156,7 +165,18 @@ export async function runMcpBridge(opts: McpBridgeOptions): Promise<void> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
stderr.write(`MCP bridge error: ${err instanceof Error ? err.message : String(err)}\n`);
|
const errMsg = err instanceof Error ? err.message : String(err);
|
||||||
|
stderr.write(`MCP bridge error: ${errMsg}\n`);
|
||||||
|
|
||||||
|
// Send JSON-RPC error response so the client doesn't hang
|
||||||
|
if (requestId !== null) {
|
||||||
|
const errorResponse = JSON.stringify({
|
||||||
|
jsonrpc: '2.0',
|
||||||
|
id: requestId,
|
||||||
|
error: { code: -32603, message: `Bridge error: ${errMsg}` },
|
||||||
|
});
|
||||||
|
stdout.write(errorResponse + '\n');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -347,7 +347,7 @@ describe('MCP STDIO Bridge', () => {
|
|||||||
expect(recorded.filter((r) => r.method === 'DELETE')).toHaveLength(0);
|
expect(recorded.filter((r) => r.method === 'DELETE')).toHaveLength(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('writes errors to stderr, not stdout', async () => {
|
it('writes errors to stderr and sends JSON-RPC error to stdout', async () => {
|
||||||
recorded.length = 0;
|
recorded.length = 0;
|
||||||
const stdin = new Readable({ read() {} });
|
const stdin = new Readable({ read() {} });
|
||||||
const { stdout, stdoutChunks, stderr, stderrChunks } = createMockStreams();
|
const { stdout, stdoutChunks, stderr, stderrChunks } = createMockStreams();
|
||||||
@@ -364,8 +364,12 @@ describe('MCP STDIO Bridge', () => {
|
|||||||
|
|
||||||
// Error should be on stderr
|
// Error should be on stderr
|
||||||
expect(stderrChunks.join('')).toContain('MCP bridge error');
|
expect(stderrChunks.join('')).toContain('MCP bridge error');
|
||||||
// stdout should be empty (no corrupted output)
|
// stdout should contain a JSON-RPC error response so the client doesn't hang
|
||||||
expect(stdoutChunks.join('')).toBe('');
|
const out = stdoutChunks.join('');
|
||||||
|
const parsed = JSON.parse(out.trim()) as { id: number; error: { code: number; message: string } };
|
||||||
|
expect(parsed.id).toBe(1);
|
||||||
|
expect(parsed.error.code).toBe(-32603);
|
||||||
|
expect(parsed.error.message).toContain('Bridge error');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('skips blank lines in stdin', async () => {
|
it('skips blank lines in stdin', async () => {
|
||||||
|
|||||||
@@ -20,24 +20,29 @@ export class ConnectionError extends Error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Default timeout for mcpd requests (ms). Prevents indefinite hangs on slow upstream tool calls. */
|
||||||
|
const DEFAULT_TIMEOUT_MS = 30_000;
|
||||||
|
|
||||||
export class McpdClient {
|
export class McpdClient {
|
||||||
private readonly baseUrl: string;
|
private readonly baseUrl: string;
|
||||||
private readonly token: string;
|
private readonly token: string;
|
||||||
private readonly extraHeaders: Record<string, string>;
|
private readonly extraHeaders: Record<string, string>;
|
||||||
|
private readonly timeoutMs: number;
|
||||||
|
|
||||||
constructor(baseUrl: string, token: string, extraHeaders?: Record<string, string>) {
|
constructor(baseUrl: string, token: string, extraHeaders?: Record<string, string>, timeoutMs?: number) {
|
||||||
// Strip trailing slash for consistent URL joining
|
// Strip trailing slash for consistent URL joining
|
||||||
this.baseUrl = baseUrl.replace(/\/+$/, '');
|
this.baseUrl = baseUrl.replace(/\/+$/, '');
|
||||||
this.token = token;
|
this.token = token;
|
||||||
this.extraHeaders = extraHeaders ?? {};
|
this.extraHeaders = extraHeaders ?? {};
|
||||||
|
this.timeoutMs = timeoutMs ?? DEFAULT_TIMEOUT_MS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new client with additional default headers.
|
* Create a new client with additional default headers.
|
||||||
* Inherits base URL and token from the current client.
|
* Inherits base URL, token, and timeout from the current client.
|
||||||
*/
|
*/
|
||||||
withHeaders(headers: Record<string, string>): McpdClient {
|
withHeaders(headers: Record<string, string>): McpdClient {
|
||||||
return new McpdClient(this.baseUrl, this.token, { ...this.extraHeaders, ...headers });
|
return new McpdClient(this.baseUrl, this.token, { ...this.extraHeaders, ...headers }, this.timeoutMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
async get<T>(path: string): Promise<T> {
|
async get<T>(path: string): Promise<T> {
|
||||||
@@ -77,7 +82,11 @@ export class McpdClient {
|
|||||||
'Accept': 'application/json',
|
'Accept': 'application/json',
|
||||||
};
|
};
|
||||||
|
|
||||||
const init: RequestInit = { method, headers };
|
const init: RequestInit = {
|
||||||
|
method,
|
||||||
|
headers,
|
||||||
|
signal: AbortSignal.timeout(this.timeoutMs),
|
||||||
|
};
|
||||||
if (body !== undefined && body !== null && method !== 'GET' && method !== 'HEAD') {
|
if (body !== undefined && body !== null && method !== 'GET' && method !== 'HEAD') {
|
||||||
headers['Content-Type'] = 'application/json';
|
headers['Content-Type'] = 'application/json';
|
||||||
init.body = JSON.stringify(body);
|
init.body = JSON.stringify(body);
|
||||||
@@ -87,6 +96,9 @@ export class McpdClient {
|
|||||||
try {
|
try {
|
||||||
res = await fetch(url, init);
|
res = await fetch(url, init);
|
||||||
} catch (err: unknown) {
|
} catch (err: unknown) {
|
||||||
|
if (err instanceof DOMException && err.name === 'TimeoutError') {
|
||||||
|
throw new ConnectionError(this.baseUrl, new Error(`Request timed out after ${this.timeoutMs}ms`));
|
||||||
|
}
|
||||||
throw new ConnectionError(this.baseUrl, err);
|
throw new ConnectionError(this.baseUrl, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -37,6 +37,8 @@ export interface ManagedVllmStatus {
|
|||||||
|
|
||||||
const POLL_INTERVAL_MS = 2000;
|
const POLL_INTERVAL_MS = 2000;
|
||||||
const STARTUP_TIMEOUT_MS = 120_000;
|
const STARTUP_TIMEOUT_MS = 120_000;
|
||||||
|
/** After entering error state, wait this long before retrying startup. */
|
||||||
|
const ERROR_COOLDOWN_MS = 60_000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Managed vLLM provider — spawns and manages a local vLLM process.
|
* Managed vLLM provider — spawns and manages a local vLLM process.
|
||||||
@@ -54,6 +56,7 @@ export class ManagedVllmProvider implements LlmProvider {
|
|||||||
private lastError: string | null = null;
|
private lastError: string | null = null;
|
||||||
private lastUsed = 0;
|
private lastUsed = 0;
|
||||||
private startedAt = 0;
|
private startedAt = 0;
|
||||||
|
private errorAt = 0;
|
||||||
private idleTimer: ReturnType<typeof setInterval> | null = null;
|
private idleTimer: ReturnType<typeof setInterval> | null = null;
|
||||||
private startPromise: Promise<void> | null = null;
|
private startPromise: Promise<void> | null = null;
|
||||||
|
|
||||||
@@ -140,6 +143,11 @@ export class ManagedVllmProvider implements LlmProvider {
|
|||||||
return this.startPromise;
|
return this.startPromise;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fast-fail if we recently errored — don't retry startup on every call
|
||||||
|
if (this.state === 'error' && (Date.now() - this.errorAt) < ERROR_COOLDOWN_MS) {
|
||||||
|
throw new Error(this.lastError ?? 'vLLM in error state (cooldown active)');
|
||||||
|
}
|
||||||
|
|
||||||
this.startPromise = this.doStart();
|
this.startPromise = this.doStart();
|
||||||
try {
|
try {
|
||||||
await this.startPromise;
|
await this.startPromise;
|
||||||
@@ -215,6 +223,7 @@ export class ManagedVllmProvider implements LlmProvider {
|
|||||||
}
|
}
|
||||||
this.killProcess();
|
this.killProcess();
|
||||||
this.state = 'error';
|
this.state = 'error';
|
||||||
|
this.errorAt = Date.now();
|
||||||
throw new Error(this.lastError);
|
throw new Error(this.lastError);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -243,6 +252,7 @@ export class ManagedVllmProvider implements LlmProvider {
|
|||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (this.state === 'starting') {
|
if (this.state === 'starting') {
|
||||||
this.state = 'error';
|
this.state = 'error';
|
||||||
|
this.errorAt = Date.now();
|
||||||
this.lastError = (err as Error).message;
|
this.lastError = (err as Error).message;
|
||||||
}
|
}
|
||||||
throw err;
|
throw err;
|
||||||
|
|||||||
@@ -252,8 +252,10 @@ export class McpRouter {
|
|||||||
async discoverTools(correlationId?: string): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> {
|
async discoverTools(correlationId?: string): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> {
|
||||||
const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = [];
|
const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = [];
|
||||||
|
|
||||||
for (const [serverName, upstream] of this.upstreams) {
|
// Discover tools from all servers in parallel so one slow server doesn't block the rest
|
||||||
try {
|
const entries = [...this.upstreams.entries()];
|
||||||
|
const results = await Promise.allSettled(
|
||||||
|
entries.map(async ([serverName, upstream]) => {
|
||||||
const req = {
|
const req = {
|
||||||
jsonrpc: '2.0' as const,
|
jsonrpc: '2.0' as const,
|
||||||
id: `discover-tools-${serverName}`,
|
id: `discover-tools-${serverName}`,
|
||||||
@@ -268,6 +270,16 @@ export class McpRouter {
|
|||||||
} else {
|
} else {
|
||||||
response = await upstream.send(req);
|
response = await upstream.send(req);
|
||||||
}
|
}
|
||||||
|
return { serverName, upstream, response };
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const result of results) {
|
||||||
|
if (result.status === 'rejected') {
|
||||||
|
console.warn(`[discoverTools] ${(result.reason as Error).message ?? 'unknown error'}`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const { serverName, upstream, response } = result.value;
|
||||||
|
|
||||||
if (response.error) {
|
if (response.error) {
|
||||||
console.warn(`[discoverTools] ${serverName}: ${(response.error as { message?: string }).message ?? 'unknown error'}`);
|
console.warn(`[discoverTools] ${serverName}: ${(response.error as { message?: string }).message ?? 'unknown error'}`);
|
||||||
@@ -290,9 +302,6 @@ export class McpRouter {
|
|||||||
allTools.push(entry);
|
allTools.push(entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (err) {
|
|
||||||
console.warn(`[discoverTools] ${serverName}: ${err instanceof Error ? err.message : err}`);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return allTools;
|
return allTools;
|
||||||
@@ -304,8 +313,10 @@ export class McpRouter {
|
|||||||
async discoverResources(correlationId?: string): Promise<Array<{ uri: string; name?: string; description?: string; mimeType?: string }>> {
|
async discoverResources(correlationId?: string): Promise<Array<{ uri: string; name?: string; description?: string; mimeType?: string }>> {
|
||||||
const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = [];
|
const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = [];
|
||||||
|
|
||||||
for (const [serverName, upstream] of this.upstreams) {
|
// Discover resources from all servers in parallel
|
||||||
try {
|
const entries = [...this.upstreams.entries()];
|
||||||
|
const results = await Promise.allSettled(
|
||||||
|
entries.map(async ([serverName, upstream]) => {
|
||||||
const req = {
|
const req = {
|
||||||
jsonrpc: '2.0' as const,
|
jsonrpc: '2.0' as const,
|
||||||
id: `discover-resources-${serverName}`,
|
id: `discover-resources-${serverName}`,
|
||||||
@@ -320,6 +331,13 @@ export class McpRouter {
|
|||||||
} else {
|
} else {
|
||||||
response = await upstream.send(req);
|
response = await upstream.send(req);
|
||||||
}
|
}
|
||||||
|
return { serverName, response };
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const result of results) {
|
||||||
|
if (result.status === 'rejected') continue;
|
||||||
|
const { serverName, response } = result.value;
|
||||||
|
|
||||||
if (response.result && typeof response.result === 'object' && 'resources' in response.result) {
|
if (response.result && typeof response.result === 'object' && 'resources' in response.result) {
|
||||||
const resources = (response.result as { resources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> }).resources;
|
const resources = (response.result as { resources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> }).resources;
|
||||||
@@ -332,9 +350,6 @@ export class McpRouter {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch {
|
|
||||||
// Server may be unavailable; skip its resources
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return allResources;
|
return allResources;
|
||||||
|
|||||||
168
src/mcplocal/tests/mcpd-client.test.ts
Normal file
168
src/mcplocal/tests/mcpd-client.test.ts
Normal file
@@ -0,0 +1,168 @@
|
|||||||
|
import { describe, it, expect, afterAll, afterEach } from 'vitest';
|
||||||
|
import http from 'node:http';
|
||||||
|
import { McpdClient, ConnectionError } from '../src/http/mcpd-client.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a local HTTP server for testing McpdClient behavior.
|
||||||
|
* Returns the server and its URL.
|
||||||
|
*/
|
||||||
|
function createTestServer(
|
||||||
|
handler: (req: http.IncomingMessage, res: http.ServerResponse) => void,
|
||||||
|
): Promise<{ server: http.Server; url: string }> {
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
const server = http.createServer(handler);
|
||||||
|
server.listen(0, '127.0.0.1', () => {
|
||||||
|
const addr = server.address() as { port: number };
|
||||||
|
resolve({ server, url: `http://127.0.0.1:${addr.port}` });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('McpdClient', () => {
|
||||||
|
const servers: http.Server[] = [];
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
for (const s of servers) s.close();
|
||||||
|
servers.length = 0;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(() => {
|
||||||
|
for (const s of servers) s.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('makes GET requests with auth header', async () => {
|
||||||
|
let capturedAuth = '';
|
||||||
|
const { server, url } = await createTestServer((req, res) => {
|
||||||
|
capturedAuth = req.headers['authorization'] ?? '';
|
||||||
|
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||||
|
res.end(JSON.stringify({ ok: true }));
|
||||||
|
});
|
||||||
|
servers.push(server);
|
||||||
|
|
||||||
|
const client = new McpdClient(url, 'my-token');
|
||||||
|
const result = await client.get<{ ok: boolean }>('/api/v1/test');
|
||||||
|
|
||||||
|
expect(result).toEqual({ ok: true });
|
||||||
|
expect(capturedAuth).toBe('Bearer my-token');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('makes POST requests with JSON body', async () => {
|
||||||
|
let capturedBody = '';
|
||||||
|
const { server, url } = await createTestServer((req, res) => {
|
||||||
|
const chunks: Buffer[] = [];
|
||||||
|
req.on('data', (c: Buffer) => chunks.push(c));
|
||||||
|
req.on('end', () => {
|
||||||
|
capturedBody = Buffer.concat(chunks).toString();
|
||||||
|
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||||
|
res.end(JSON.stringify({ received: true }));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
servers.push(server);
|
||||||
|
|
||||||
|
const client = new McpdClient(url, 'tok');
|
||||||
|
const result = await client.post<{ received: boolean }>('/api/v1/proxy', { serverId: 's1' });
|
||||||
|
|
||||||
|
expect(result).toEqual({ received: true });
|
||||||
|
expect(JSON.parse(capturedBody)).toEqual({ serverId: 's1' });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('throws ConnectionError on connection refused', async () => {
|
||||||
|
const client = new McpdClient('http://127.0.0.1:1', 'tok');
|
||||||
|
|
||||||
|
await expect(client.get('/test')).rejects.toThrow(ConnectionError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('throws on 4xx/5xx responses', async () => {
|
||||||
|
const { server, url } = await createTestServer((_req, res) => {
|
||||||
|
res.writeHead(500, { 'Content-Type': 'application/json' });
|
||||||
|
res.end(JSON.stringify({ error: 'internal' }));
|
||||||
|
});
|
||||||
|
servers.push(server);
|
||||||
|
|
||||||
|
const client = new McpdClient(url, 'tok');
|
||||||
|
await expect(client.get('/test')).rejects.toThrow(/mcpd returned 500/);
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Timeout behavior ──
|
||||||
|
|
||||||
|
it('times out on slow responses and throws ConnectionError', async () => {
|
||||||
|
const { server, url } = await createTestServer((_req, _res) => {
|
||||||
|
// Never respond — simulates a hanging upstream tool call
|
||||||
|
});
|
||||||
|
servers.push(server);
|
||||||
|
|
||||||
|
// Use a very short timeout for the test
|
||||||
|
const client = new McpdClient(url, 'tok', undefined, 500);
|
||||||
|
|
||||||
|
const start = Date.now();
|
||||||
|
await expect(client.post('/api/v1/mcp/proxy', { serverId: 's1' })).rejects.toThrow(
|
||||||
|
/timed out/,
|
||||||
|
);
|
||||||
|
const elapsed = Date.now() - start;
|
||||||
|
|
||||||
|
// Should have timed out around 500ms, not hung for seconds
|
||||||
|
expect(elapsed).toBeGreaterThanOrEqual(450);
|
||||||
|
expect(elapsed).toBeLessThan(3000);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('timeout error is a ConnectionError with descriptive message', async () => {
|
||||||
|
const { server, url } = await createTestServer((_req, _res) => {
|
||||||
|
// Never respond
|
||||||
|
});
|
||||||
|
servers.push(server);
|
||||||
|
|
||||||
|
const client = new McpdClient(url, 'tok', undefined, 200);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await client.get('/test');
|
||||||
|
expect.unreachable('Should have thrown');
|
||||||
|
} catch (err) {
|
||||||
|
expect(err).toBeInstanceOf(ConnectionError);
|
||||||
|
expect((err as Error).message).toContain('Request timed out after 200ms');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('fast responses succeed within the timeout window', async () => {
|
||||||
|
const { server, url } = await createTestServer((_req, res) => {
|
||||||
|
// Respond immediately
|
||||||
|
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||||
|
res.end(JSON.stringify({ fast: true }));
|
||||||
|
});
|
||||||
|
servers.push(server);
|
||||||
|
|
||||||
|
// Short timeout, but response is immediate — should work
|
||||||
|
const client = new McpdClient(url, 'tok', undefined, 500);
|
||||||
|
const result = await client.get<{ fast: boolean }>('/test');
|
||||||
|
expect(result).toEqual({ fast: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('withHeaders preserves timeout', async () => {
|
||||||
|
const { server, url } = await createTestServer((_req, _res) => {
|
||||||
|
// Never respond
|
||||||
|
});
|
||||||
|
servers.push(server);
|
||||||
|
|
||||||
|
const client = new McpdClient(url, 'tok', undefined, 300);
|
||||||
|
const derived = client.withHeaders({ 'X-Custom': 'val' });
|
||||||
|
|
||||||
|
const start = Date.now();
|
||||||
|
await expect(derived.get('/test')).rejects.toThrow(/timed out/);
|
||||||
|
const elapsed = Date.now() - start;
|
||||||
|
expect(elapsed).toBeLessThan(2000);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('default timeout is 30 seconds', async () => {
|
||||||
|
// We can't wait 30s in a test, but we can verify the error message format
|
||||||
|
// when a custom timeout is not set. Use a fast-failing server instead.
|
||||||
|
const { server, url } = await createTestServer((_req, res) => {
|
||||||
|
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||||
|
res.end(JSON.stringify({ ok: true }));
|
||||||
|
});
|
||||||
|
servers.push(server);
|
||||||
|
|
||||||
|
// Default constructor — should work for fast responses
|
||||||
|
const client = new McpdClient(url, 'tok');
|
||||||
|
const result = await client.get<{ ok: boolean }>('/test');
|
||||||
|
expect(result).toEqual({ ok: true });
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -157,6 +157,45 @@ describe('McpRouter', () => {
|
|||||||
expect(result.tools).toHaveLength(1);
|
expect(result.tools).toHaveLength(1);
|
||||||
expect(result.tools[0]?.name).toBe('working/do_thing');
|
expect(result.tools[0]?.name).toBe('working/do_thing');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('slow upstream does not block fast upstreams (parallel discovery)', async () => {
|
||||||
|
// Simulate a server that takes 5s to respond to tools/list
|
||||||
|
const slowUpstream = mockUpstream('slow-server', {
|
||||||
|
tools: [{ name: 'slow_tool' }],
|
||||||
|
});
|
||||||
|
vi.mocked(slowUpstream.send).mockImplementation(
|
||||||
|
() => new Promise((resolve) => setTimeout(() => resolve({
|
||||||
|
jsonrpc: '2.0' as const,
|
||||||
|
id: 'delayed',
|
||||||
|
result: { tools: [{ name: 'slow_tool' }] },
|
||||||
|
}), 5000)),
|
||||||
|
);
|
||||||
|
|
||||||
|
const fastUpstream = mockUpstream('fast-server', {
|
||||||
|
tools: [{ name: 'fast_tool', description: 'Responds instantly' }],
|
||||||
|
});
|
||||||
|
|
||||||
|
router.addUpstream(slowUpstream);
|
||||||
|
router.addUpstream(fastUpstream);
|
||||||
|
|
||||||
|
const start = Date.now();
|
||||||
|
const res = await router.route({
|
||||||
|
jsonrpc: '2.0',
|
||||||
|
id: 1,
|
||||||
|
method: 'tools/list',
|
||||||
|
});
|
||||||
|
const elapsed = Date.now() - start;
|
||||||
|
|
||||||
|
const result = res.result as { tools: Array<{ name: string }> };
|
||||||
|
// Both servers' tools should be present (parallel, not sequential)
|
||||||
|
expect(result.tools).toHaveLength(2);
|
||||||
|
expect(result.tools.map((t) => t.name)).toContain('fast-server/fast_tool');
|
||||||
|
expect(result.tools.map((t) => t.name)).toContain('slow-server/slow_tool');
|
||||||
|
// Should complete in ~5s (parallel), not ~5s + fast (sequential wouldn't matter here)
|
||||||
|
// but critically, if this were sequential with a truly hanging server, it would never complete.
|
||||||
|
// The key assertion: it took roughly the slow server's time, not slow + fast.
|
||||||
|
expect(elapsed).toBeLessThan(7000);
|
||||||
|
}, 10_000);
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('tools/call', () => {
|
describe('tools/call', () => {
|
||||||
|
|||||||
289
src/mcplocal/tests/smoke/backup-and-servers.test.ts
Normal file
289
src/mcplocal/tests/smoke/backup-and-servers.test.ts
Normal file
@@ -0,0 +1,289 @@
|
|||||||
|
/**
|
||||||
|
* Smoke tests: Backup completeness + server type coverage.
|
||||||
|
*
|
||||||
|
* These tests verify that:
|
||||||
|
* 1. Backup includes ALL fields (runtime, command, containerPort, prompts, templates)
|
||||||
|
* 2. All server types work via MCP proxy (STDIO, SSE, docker-image)
|
||||||
|
* 3. Instance status reflects actual container state
|
||||||
|
*
|
||||||
|
* Prerequisites:
|
||||||
|
* - mcplocal running on localhost:3200
|
||||||
|
* - mcpd running (k8s or Portainer)
|
||||||
|
* - At least one server of each type deployed
|
||||||
|
*/
|
||||||
|
import { describe, it, expect, beforeAll } from 'vitest';
|
||||||
|
import http from 'node:http';
|
||||||
|
import https from 'node:https';
|
||||||
|
import { existsSync, readFileSync } from 'node:fs';
|
||||||
|
import { join } from 'node:path';
|
||||||
|
import { homedir } from 'node:os';
|
||||||
|
|
||||||
|
// Load mcpd URL and token from config
|
||||||
|
const CONFIG_PATH = join(homedir(), '.mcpctl', 'config.json');
|
||||||
|
const CREDS_PATH = join(homedir(), '.mcpctl', 'credentials');
|
||||||
|
|
||||||
|
function loadConfig(): { mcpdUrl: string; token: string } {
|
||||||
|
let mcpdUrl = 'http://localhost:3100';
|
||||||
|
let token = '';
|
||||||
|
try {
|
||||||
|
if (existsSync(CONFIG_PATH)) {
|
||||||
|
const cfg = JSON.parse(readFileSync(CONFIG_PATH, 'utf-8')) as { mcpdUrl?: string };
|
||||||
|
if (cfg.mcpdUrl) mcpdUrl = cfg.mcpdUrl;
|
||||||
|
}
|
||||||
|
if (existsSync(CREDS_PATH)) {
|
||||||
|
const creds = JSON.parse(readFileSync(CREDS_PATH, 'utf-8')) as { token?: string };
|
||||||
|
if (creds.token) token = creds.token;
|
||||||
|
}
|
||||||
|
} catch { /* use defaults */ }
|
||||||
|
return { mcpdUrl, token };
|
||||||
|
}
|
||||||
|
|
||||||
|
const { mcpdUrl, token } = loadConfig();
|
||||||
|
|
||||||
|
function mcpdRequest<T>(method: string, path: string, body?: unknown): Promise<{ status: number; data: T }> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const url = new URL(path, mcpdUrl);
|
||||||
|
const isHttps = url.protocol === 'https:';
|
||||||
|
const transport = isHttps ? https : http;
|
||||||
|
|
||||||
|
const headers: Record<string, string> = { Accept: 'application/json' };
|
||||||
|
if (body !== undefined) headers['Content-Type'] = 'application/json';
|
||||||
|
if (token) headers['Authorization'] = `Bearer ${token}`;
|
||||||
|
const bodyStr = body !== undefined ? JSON.stringify(body) : undefined;
|
||||||
|
if (bodyStr) headers['Content-Length'] = String(Buffer.byteLength(bodyStr));
|
||||||
|
|
||||||
|
const req = transport.request(url, {
|
||||||
|
method,
|
||||||
|
timeout: 30_000,
|
||||||
|
headers,
|
||||||
|
rejectUnauthorized: false,
|
||||||
|
}, (res) => {
|
||||||
|
const chunks: Buffer[] = [];
|
||||||
|
res.on('data', (chunk: Buffer) => chunks.push(chunk));
|
||||||
|
res.on('end', () => {
|
||||||
|
const raw = Buffer.concat(chunks).toString();
|
||||||
|
try {
|
||||||
|
resolve({ status: res.statusCode ?? 500, data: raw ? JSON.parse(raw) as T : (undefined as T) });
|
||||||
|
} catch {
|
||||||
|
resolve({ status: res.statusCode ?? 500, data: raw as unknown as T });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
req.on('error', reject);
|
||||||
|
req.on('timeout', () => { req.destroy(); reject(new Error('Request timeout')); });
|
||||||
|
if (bodyStr) req.write(bodyStr);
|
||||||
|
req.end();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
interface BackupBundle {
|
||||||
|
servers: Array<{
|
||||||
|
name: string;
|
||||||
|
runtime: string | null;
|
||||||
|
command: unknown;
|
||||||
|
containerPort: number | null;
|
||||||
|
replicas: number;
|
||||||
|
transport: string;
|
||||||
|
dockerImage: string | null;
|
||||||
|
packageName: string | null;
|
||||||
|
env: unknown;
|
||||||
|
healthCheck: unknown;
|
||||||
|
externalUrl: string | null;
|
||||||
|
}>;
|
||||||
|
prompts: Array<{ name: string; projectName: string | null; content: string }>;
|
||||||
|
templates: Array<{ name: string; transport: string }>;
|
||||||
|
secrets: unknown[];
|
||||||
|
projects: unknown[];
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Server {
|
||||||
|
id: string;
|
||||||
|
name: string;
|
||||||
|
transport: string;
|
||||||
|
dockerImage: string | null;
|
||||||
|
packageName: string | null;
|
||||||
|
runtime: string | null;
|
||||||
|
command: string[] | null;
|
||||||
|
containerPort: number | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Instance {
|
||||||
|
id: string;
|
||||||
|
serverId: string;
|
||||||
|
containerId: string | null;
|
||||||
|
status: string;
|
||||||
|
server: { name: string };
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ProxyResult {
|
||||||
|
result?: { tools?: Array<{ name: string }> };
|
||||||
|
error?: { code: number; message: string };
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('Smoke: Backup completeness', () => {
|
||||||
|
let available = false;
|
||||||
|
let bundle: BackupBundle;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
try {
|
||||||
|
const res = await mcpdRequest<{ status: string }>('GET', '/healthz');
|
||||||
|
available = res.status === 200;
|
||||||
|
} catch {
|
||||||
|
available = false;
|
||||||
|
}
|
||||||
|
if (!available) return;
|
||||||
|
|
||||||
|
const res = await mcpdRequest<BackupBundle>('POST', '/api/v1/backup', {});
|
||||||
|
bundle = res.data;
|
||||||
|
}, 30_000);
|
||||||
|
|
||||||
|
it('skips if mcpd not reachable', () => {
|
||||||
|
if (!available) console.log('SKIP: mcpd not reachable');
|
||||||
|
expect(true).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('backup includes prompts', () => {
|
||||||
|
if (!available) return;
|
||||||
|
expect(bundle.prompts).toBeDefined();
|
||||||
|
expect(bundle.prompts.length).toBeGreaterThan(0);
|
||||||
|
console.log(` ${bundle.prompts.length} prompts in backup`);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('backup includes templates', () => {
|
||||||
|
if (!available) return;
|
||||||
|
expect(bundle.templates).toBeDefined();
|
||||||
|
expect(bundle.templates.length).toBeGreaterThan(0);
|
||||||
|
console.log(` ${bundle.templates.length} templates in backup`);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('backup servers have runtime field', () => {
|
||||||
|
if (!available) return;
|
||||||
|
// Python servers must have runtime=python
|
||||||
|
const pythonServers = bundle.servers.filter((s) =>
|
||||||
|
s.packageName?.includes('aws-documentation') || s.packageName?.includes('awslabs'),
|
||||||
|
);
|
||||||
|
for (const s of pythonServers) {
|
||||||
|
expect(s.runtime, `${s.name} should have runtime=python`).toBe('python');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('backup servers have command field for docker-image STDIO servers', () => {
|
||||||
|
if (!available) return;
|
||||||
|
const dockerStdio = bundle.servers.filter((s) => s.dockerImage && s.transport === 'STDIO');
|
||||||
|
for (const s of dockerStdio) {
|
||||||
|
expect(s.command, `${s.name} (dockerImage STDIO) should have command`).toBeTruthy();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('backup SSE servers have containerPort', () => {
|
||||||
|
if (!available) return;
|
||||||
|
const sseServers = bundle.servers.filter((s) => s.transport === 'SSE');
|
||||||
|
for (const s of sseServers) {
|
||||||
|
expect(s.containerPort, `${s.name} (SSE) should have containerPort`).toBeGreaterThan(0);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('backup servers have replicas field', () => {
|
||||||
|
if (!available) return;
|
||||||
|
for (const s of bundle.servers) {
|
||||||
|
expect(typeof s.replicas, `${s.name} should have numeric replicas`).toBe('number');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('Smoke: Server type proxy coverage', () => {
|
||||||
|
let available = false;
|
||||||
|
let servers: Server[];
|
||||||
|
let instances: Instance[];
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
try {
|
||||||
|
const res = await mcpdRequest<{ status: string }>('GET', '/healthz');
|
||||||
|
available = res.status === 200;
|
||||||
|
} catch {
|
||||||
|
available = false;
|
||||||
|
}
|
||||||
|
if (!available) return;
|
||||||
|
|
||||||
|
servers = (await mcpdRequest<Server[]>('GET', '/api/v1/servers')).data;
|
||||||
|
instances = (await mcpdRequest<Instance[]>('GET', '/api/v1/instances')).data;
|
||||||
|
}, 30_000);
|
||||||
|
|
||||||
|
it('skips if mcpd not reachable', () => {
|
||||||
|
if (!available) console.log('SKIP: mcpd not reachable');
|
||||||
|
expect(true).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('SSE server returns tools via proxy', async () => {
|
||||||
|
if (!available) return;
|
||||||
|
const sseServer = servers.find((s) => s.transport === 'SSE');
|
||||||
|
if (!sseServer) { console.log(' SKIP: no SSE server'); return; }
|
||||||
|
|
||||||
|
const running = instances.find((i) => i.serverId === sseServer.id && (i.status === 'RUNNING' || i.status === 'STARTING'));
|
||||||
|
if (!running) { console.log(` SKIP: ${sseServer.name} has no running instance`); return; }
|
||||||
|
|
||||||
|
const res = await mcpdRequest<ProxyResult>('POST', '/api/v1/mcp/proxy', {
|
||||||
|
serverId: sseServer.id,
|
||||||
|
method: 'tools/list',
|
||||||
|
});
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
expect(res.data.result?.tools?.length, `${sseServer.name} should have tools`).toBeGreaterThan(0);
|
||||||
|
console.log(` ${sseServer.name} (SSE): ${res.data.result?.tools?.length} tools`);
|
||||||
|
}, 30_000);
|
||||||
|
|
||||||
|
it('docker-image STDIO server returns tools via proxy', async () => {
|
||||||
|
if (!available) return;
|
||||||
|
const dockerStdio = servers.find((s) => s.transport === 'STDIO' && s.dockerImage && !s.packageName);
|
||||||
|
if (!dockerStdio) { console.log(' SKIP: no docker-image STDIO server'); return; }
|
||||||
|
|
||||||
|
const running = instances.find((i) => i.serverId === dockerStdio.id && (i.status === 'RUNNING' || i.status === 'STARTING'));
|
||||||
|
if (!running) { console.log(` SKIP: ${dockerStdio.name} has no running instance`); return; }
|
||||||
|
|
||||||
|
const res = await mcpdRequest<ProxyResult>('POST', '/api/v1/mcp/proxy', {
|
||||||
|
serverId: dockerStdio.id,
|
||||||
|
method: 'tools/list',
|
||||||
|
});
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
expect(res.data.result?.tools?.length, `${dockerStdio.name} should have tools`).toBeGreaterThan(0);
|
||||||
|
console.log(` ${dockerStdio.name} (docker STDIO): ${res.data.result?.tools?.length} tools`);
|
||||||
|
}, 60_000);
|
||||||
|
|
||||||
|
it('package STDIO server returns tools via proxy', async () => {
|
||||||
|
if (!available) return;
|
||||||
|
const pkgStdio = servers.find((s) => s.transport === 'STDIO' && s.packageName && !s.dockerImage);
|
||||||
|
if (!pkgStdio) { console.log(' SKIP: no package STDIO server'); return; }
|
||||||
|
|
||||||
|
const running = instances.find((i) => i.serverId === pkgStdio.id && (i.status === 'RUNNING' || i.status === 'STARTING'));
|
||||||
|
if (!running) { console.log(` SKIP: ${pkgStdio.name} has no running instance`); return; }
|
||||||
|
|
||||||
|
const res = await mcpdRequest<ProxyResult>('POST', '/api/v1/mcp/proxy', {
|
||||||
|
serverId: pkgStdio.id,
|
||||||
|
method: 'tools/list',
|
||||||
|
});
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
expect(res.data.result?.tools?.length, `${pkgStdio.name} should have tools`).toBeGreaterThan(0);
|
||||||
|
console.log(` ${pkgStdio.name} (package STDIO): ${res.data.result?.tools?.length} tools`);
|
||||||
|
}, 60_000);
|
||||||
|
|
||||||
|
it('all running instances have actual running containers', async () => {
|
||||||
|
if (!available) return;
|
||||||
|
const runningInstances = instances.filter((i) => i.status === 'RUNNING' && i.containerId);
|
||||||
|
expect(runningInstances.length).toBeGreaterThan(0);
|
||||||
|
|
||||||
|
for (const inst of runningInstances) {
|
||||||
|
// Verify the proxy can actually reach the container
|
||||||
|
const server = servers.find((s) => s.id === inst.serverId);
|
||||||
|
if (!server) continue;
|
||||||
|
|
||||||
|
// Quick health check: try tools/list (should not 500)
|
||||||
|
const res = await mcpdRequest<ProxyResult>('POST', '/api/v1/mcp/proxy', {
|
||||||
|
serverId: server.id,
|
||||||
|
method: 'tools/list',
|
||||||
|
});
|
||||||
|
expect(
|
||||||
|
res.status,
|
||||||
|
`${server.name} instance claims RUNNING but proxy returned ${res.status}`,
|
||||||
|
).not.toBe(500);
|
||||||
|
}
|
||||||
|
}, 120_000);
|
||||||
|
});
|
||||||
@@ -294,4 +294,40 @@ describe('ManagedVllmProvider', () => {
|
|||||||
provider.dispose();
|
provider.dispose();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('error cooldown', () => {
|
||||||
|
it('fast-fails during cooldown instead of retrying startup', async () => {
|
||||||
|
const { provider, fakeProcess, healthCheckFn, spawnFn } = createProvider();
|
||||||
|
healthCheckFn.mockResolvedValue(false);
|
||||||
|
|
||||||
|
// First attempt: triggers startup, process exits with error
|
||||||
|
const p1 = (provider as unknown as { ensureRunning(): Promise<void> }).ensureRunning();
|
||||||
|
p1.catch(() => {});
|
||||||
|
(fakeProcess as Record<string, unknown>).exitCode = 1;
|
||||||
|
fakeProcess._emit('exit', 1);
|
||||||
|
await vi.advanceTimersByTimeAsync(2100);
|
||||||
|
await expect(p1).rejects.toThrow();
|
||||||
|
expect(provider.getStatus().state).toBe('error');
|
||||||
|
expect(spawnFn).toHaveBeenCalledOnce();
|
||||||
|
|
||||||
|
// Second attempt within cooldown: should throw immediately without spawning
|
||||||
|
await expect(
|
||||||
|
(provider as unknown as { ensureRunning(): Promise<void> }).ensureRunning(),
|
||||||
|
).rejects.toThrow();
|
||||||
|
expect(spawnFn).toHaveBeenCalledOnce(); // no extra spawn
|
||||||
|
|
||||||
|
// After cooldown (60s): should retry
|
||||||
|
const newProc = createFakeProcess();
|
||||||
|
spawnFn.mockReturnValue(newProc);
|
||||||
|
healthCheckFn.mockResolvedValue(true);
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(60_000);
|
||||||
|
const p3 = (provider as unknown as { ensureRunning(): Promise<void> }).ensureRunning();
|
||||||
|
await vi.advanceTimersByTimeAsync(2100);
|
||||||
|
await p3;
|
||||||
|
|
||||||
|
expect(spawnFn).toHaveBeenCalledTimes(2);
|
||||||
|
expect(provider.getStatus().state).toBe('running');
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user