fix: MCP proxy resilience — discovery cache, default liveness probes
Some checks failed
Some checks failed
Adds a per-server tools/list cache in McpRouter (positive and negative TTL) so that a slow or dead upstream stalls only the first discovery call, not every subsequent client request. The cache is invalidated when an upstream is added or removed.

Health probes now apply a default liveness spec (tools/list sent via the real production path) to any RUNNING instance whose server has no explicit healthCheck, so synthetic and real failures converge on the same signal.

Includes supporting updates in mcpd-client, discovery, upstream/mcpd, seeder, and the fulldeploy/release scripts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -505,12 +505,15 @@ async function main(): Promise<void> {
|
||||
}
|
||||
}, RECONCILE_INTERVAL_MS);
|
||||
|
||||
// Health probe runner — periodic MCP tool-call probes (like k8s livenessProbe)
|
||||
// Health probe runner — periodic MCP probes (like k8s livenessProbe).
|
||||
// Without explicit healthCheck.tool, probes send tools/list through
|
||||
// McpProxyService so they traverse the exact production call path.
|
||||
const healthProbeRunner = new HealthProbeRunner(
|
||||
instanceRepo,
|
||||
serverRepo,
|
||||
orchestrator,
|
||||
{ info: (msg) => app.log.info(msg), error: (obj, msg) => app.log.error(obj, msg) },
|
||||
mcpProxyService,
|
||||
);
|
||||
healthProbeRunner.start(15_000);
|
||||
|
||||
|
||||
@@ -1,15 +1,24 @@
|
||||
import type { McpServer, McpInstance } from '@prisma/client';
|
||||
import type { IMcpInstanceRepository, IMcpServerRepository } from '../repositories/interfaces.js';
|
||||
import type { McpOrchestrator } from './orchestrator.js';
|
||||
import type { McpProxyService } from './mcp-proxy-service.js';
|
||||
|
||||
/**
 * Per-server health probe configuration.
 *
 * Every field is optional: a spec without `tool` describes a liveness probe,
 * a spec with `tool` describes a readiness probe.
 */
export interface HealthCheckSpec {
  /** When set, probe sends initialize + tools/call (readiness). When omitted, probe sends tools/list only (liveness). */
  tool?: string;
  /** Arguments for the readiness tool call — presumably ignored when `tool` is omitted; TODO confirm. */
  arguments?: Record<string, unknown>;
  /** Seconds between probes of one instance; the runner falls back to 60 when omitted. */
  intervalSeconds?: number;
  /** Per-probe time budget in seconds. NOTE(review): fallback value is not visible here — confirm in the runner. */
  timeoutSeconds?: number;
  /** Number of failed probes tolerated before the instance is reported unhealthy — presumably consecutive; TODO confirm. */
  failureThreshold?: number;
}
|
||||
|
||||
/**
 * Fallback spec used for any RUNNING instance whose server defines no explicit
 * healthCheck. It deliberately sets no `tool`, which makes it a liveness probe.
 */
export const DEFAULT_HEALTH_CHECK: HealthCheckSpec = {
  // Probe cadence, in seconds.
  intervalSeconds: 30,
  // Time budget for a single probe, in seconds.
  timeoutSeconds: 8,
  // Failures tolerated before the instance is reported unhealthy.
  failureThreshold: 3,
};
|
||||
|
||||
export interface ProbeResult {
|
||||
healthy: boolean;
|
||||
latencyMs: number;
|
||||
@@ -39,6 +48,8 @@ export class HealthProbeRunner {
|
||||
private serverRepo: IMcpServerRepository,
|
||||
private orchestrator: McpOrchestrator,
|
||||
private logger?: { info: (msg: string) => void; error: (obj: unknown, msg: string) => void },
|
||||
/** Used for liveness probes (no explicit tool) — routes tools/list through the real production path. */
|
||||
private mcpProxyService?: McpProxyService,
|
||||
) {}
|
||||
|
||||
/** Start the periodic probe loop. Runs every `tickIntervalMs` (default 15s). */
|
||||
@@ -75,8 +86,8 @@ export class HealthProbeRunner {
|
||||
server = s;
|
||||
}
|
||||
|
||||
const healthCheck = server.healthCheck as HealthCheckSpec | null;
|
||||
if (!healthCheck) continue;
|
||||
// Any server without an explicit healthCheck gets the default liveness probe.
|
||||
const healthCheck: HealthCheckSpec = (server.healthCheck as HealthCheckSpec | null) ?? DEFAULT_HEALTH_CHECK;
|
||||
|
||||
const intervalMs = (healthCheck.intervalSeconds ?? 60) * 1000;
|
||||
const state = this.probeStates.get(inst.id);
|
||||
@@ -111,10 +122,18 @@ export class HealthProbeRunner {
|
||||
let result: ProbeResult;
|
||||
|
||||
try {
|
||||
if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') {
|
||||
result = await this.probeHttp(instance, server, healthCheck, timeoutMs);
|
||||
if (healthCheck.tool === undefined) {
|
||||
// Liveness probe: send tools/list through the real production path.
|
||||
// Mirrors exactly what mcplocal/client calls do, so synthetic and real
|
||||
// failures converge on the same signal.
|
||||
result = await this.probeLiveness(server, timeoutMs);
|
||||
} else {
|
||||
result = await this.probeStdio(instance, server, healthCheck, timeoutMs);
|
||||
const readinessCheck = healthCheck as HealthCheckSpec & { tool: string };
|
||||
if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') {
|
||||
result = await this.probeHttp(instance, server, readinessCheck, timeoutMs);
|
||||
} else {
|
||||
result = await this.probeStdio(instance, server, readinessCheck, timeoutMs);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
result = {
|
||||
@@ -169,11 +188,47 @@ export class HealthProbeRunner {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
 * Liveness probe — sends tools/list via McpProxyService so the probe traverses
 * the exact code path production clients use. Works uniformly across every
 * transport (STDIO exec/attach, SSE, Streamable HTTP, external).
 *
 * @param server    Server whose id is used to route the tools/list request.
 * @param timeoutMs Maximum time to wait for a response before reporting failure.
 * @returns A ProbeResult; never rejects for upstream failures — a JSON-RPC
 *          error, a rejected request, or a timeout all map to `healthy: false`.
 */
private async probeLiveness(server: McpServer, timeoutMs: number): Promise<ProbeResult> {
  const start = Date.now();
  if (!this.mcpProxyService) {
    return { healthy: false, latencyMs: 0, message: 'mcpProxyService not wired — cannot run default liveness probe' };
  }

  // Handle is kept so the deadline timer can be cancelled once the race settles;
  // otherwise every fast probe would leave a pending timer behind for timeoutMs.
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<ProbeResult>((resolve) => {
    timer = setTimeout(() => resolve({
      healthy: false,
      latencyMs: timeoutMs,
      message: `Liveness probe timed out after ${timeoutMs}ms`,
    }), timeoutMs);
  });

  try {
    const probe = this.mcpProxyService.execute({ serverId: server.id, method: 'tools/list' })
      .then((response): ProbeResult => {
        const latencyMs = Date.now() - start;
        if (response.error) {
          return { healthy: false, latencyMs, message: response.error.message ?? 'tools/list error' };
        }
        return { healthy: true, latencyMs, message: 'ok' };
      })
      .catch((err: unknown): ProbeResult => ({
        healthy: false,
        latencyMs: Date.now() - start,
        message: err instanceof Error ? err.message : String(err),
      }));

    // Note: losing the race does not cancel the in-flight request; its eventual
    // result is simply ignored.
    return await Promise.race([probe, deadline]);
  } finally {
    clearTimeout(timer);
  }
}
|
||||
|
||||
/** Probe an HTTP/SSE MCP server by sending a JSON-RPC tool call. */
|
||||
private async probeHttp(
|
||||
instance: McpInstance,
|
||||
server: McpServer,
|
||||
healthCheck: HealthCheckSpec,
|
||||
healthCheck: HealthCheckSpec & { tool: string },
|
||||
timeoutMs: number,
|
||||
): Promise<ProbeResult> {
|
||||
if (!instance.containerId) {
|
||||
@@ -205,7 +260,7 @@ export class HealthProbeRunner {
|
||||
*/
|
||||
private async probeStreamableHttp(
|
||||
baseUrl: string,
|
||||
healthCheck: HealthCheckSpec,
|
||||
healthCheck: HealthCheckSpec & { tool: string },
|
||||
timeoutMs: number,
|
||||
): Promise<ProbeResult> {
|
||||
const start = Date.now();
|
||||
@@ -274,7 +329,7 @@ export class HealthProbeRunner {
|
||||
*/
|
||||
private async probeSse(
|
||||
baseUrl: string,
|
||||
healthCheck: HealthCheckSpec,
|
||||
healthCheck: HealthCheckSpec & { tool: string },
|
||||
timeoutMs: number,
|
||||
): Promise<ProbeResult> {
|
||||
const start = Date.now();
|
||||
@@ -415,7 +470,7 @@ export class HealthProbeRunner {
|
||||
private async probeStdio(
|
||||
instance: McpInstance,
|
||||
server: McpServer,
|
||||
healthCheck: HealthCheckSpec,
|
||||
healthCheck: HealthCheckSpec & { tool: string },
|
||||
timeoutMs: number,
|
||||
): Promise<ProbeResult> {
|
||||
if (!instance.containerId) {
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import { HealthProbeRunner } from '../../src/services/health-probe.service.js';
|
||||
import { HealthProbeRunner, DEFAULT_HEALTH_CHECK } from '../../src/services/health-probe.service.js';
|
||||
import type { HealthCheckSpec } from '../../src/services/health-probe.service.js';
|
||||
import type { IMcpInstanceRepository, IMcpServerRepository } from '../../src/repositories/interfaces.js';
|
||||
import type { McpOrchestrator, ExecResult } from '../../src/services/orchestrator.js';
|
||||
import type { McpOrchestrator } from '../../src/services/orchestrator.js';
|
||||
import type { McpProxyService, McpProxyResponse } from '../../src/services/mcp-proxy-service.js';
|
||||
import type { McpInstance, McpServer } from '@prisma/client';
|
||||
|
||||
function makeInstance(overrides: Partial<McpInstance> = {}): McpInstance {
|
||||
@@ -87,20 +88,30 @@ function mockOrchestrator(): McpOrchestrator {
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Builds a stub McpProxyService for tests. `execute` resolves with a successful
 * JSON-RPC response carrying an empty tool list; `closeAll` and `removeClient`
 * are bare spies.
 */
function mockMcpProxyService(): McpProxyService {
  // A fresh response object is produced on every call.
  const emptyToolsList = async (): Promise<McpProxyResponse> => {
    return { jsonrpc: '2.0', id: 1, result: { tools: [] } };
  };

  const stub = {
    execute: vi.fn(emptyToolsList),
    closeAll: vi.fn(),
    removeClient: vi.fn(),
  };

  // Only the members above are stubbed, hence the double cast.
  return stub as unknown as McpProxyService;
}
|
||||
|
||||
describe('HealthProbeRunner', () => {
|
||||
let instanceRepo: IMcpInstanceRepository;
|
||||
let serverRepo: IMcpServerRepository;
|
||||
let orchestrator: McpOrchestrator;
|
||||
let mcpProxyService: McpProxyService;
|
||||
let runner: HealthProbeRunner;
|
||||
|
||||
beforeEach(() => {
|
||||
instanceRepo = mockInstanceRepo();
|
||||
serverRepo = mockServerRepo();
|
||||
orchestrator = mockOrchestrator();
|
||||
runner = new HealthProbeRunner(instanceRepo, serverRepo, orchestrator);
|
||||
mcpProxyService = mockMcpProxyService();
|
||||
runner = new HealthProbeRunner(instanceRepo, serverRepo, orchestrator, undefined, mcpProxyService);
|
||||
});
|
||||
|
||||
it('skips instances without healthCheck config', async () => {
|
||||
it('applies default liveness probe when server has no healthCheck config', async () => {
|
||||
const instance = makeInstance();
|
||||
const server = makeServer({ healthCheck: null });
|
||||
|
||||
@@ -109,8 +120,67 @@ describe('HealthProbeRunner', () => {
|
||||
|
||||
await runner.tick();
|
||||
|
||||
// No exec fallback — liveness goes through mcpProxyService
|
||||
expect(orchestrator.execInContainer).not.toHaveBeenCalled();
|
||||
expect(instanceRepo.updateStatus).not.toHaveBeenCalled();
|
||||
expect(mcpProxyService.execute).toHaveBeenCalledWith({ serverId: 'srv-1', method: 'tools/list' });
|
||||
expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
|
||||
'inst-1',
|
||||
'RUNNING',
|
||||
expect.objectContaining({ healthStatus: 'healthy' }),
|
||||
);
|
||||
});
|
||||
|
||||
it('default liveness probe marks unhealthy when tools/list returns JSON-RPC error', async () => {
  // A spec without `tool` selects the liveness path.
  const livenessSpec = { intervalSeconds: 0, failureThreshold: 1 } as unknown as McpServer['healthCheck'];
  const instance = makeInstance();
  const server = makeServer({ healthCheck: livenessSpec });

  vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
  vi.mocked(serverRepo.findById).mockResolvedValue(server);

  // The upstream answers, but with a JSON-RPC error payload.
  const upstreamError = { code: -32603, message: 'Cannot connect to upstream' };
  vi.mocked(mcpProxyService.execute).mockResolvedValue({ jsonrpc: '2.0', id: 1, error: upstreamError });

  await runner.tick();

  // The error text must surface in a Warning event alongside the unhealthy status.
  const warningEvent = expect.objectContaining({
    type: 'Warning',
    message: expect.stringContaining('Cannot connect to upstream'),
  });
  const expectedPatch = expect.objectContaining({
    healthStatus: 'unhealthy',
    events: expect.arrayContaining([warningEvent]),
  });
  expect(instanceRepo.updateStatus).toHaveBeenCalledWith('inst-1', 'RUNNING', expectedPatch);
});
|
||||
|
||||
it('default liveness probe marks unhealthy when mcpProxyService throws', async () => {
  // A spec without `tool` selects the liveness path.
  const livenessSpec = { intervalSeconds: 0, failureThreshold: 1 } as unknown as McpServer['healthCheck'];
  const instance = makeInstance();
  const server = makeServer({ healthCheck: livenessSpec });

  vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
  vi.mocked(serverRepo.findById).mockResolvedValue(server);

  // The proxy call itself rejects instead of returning a JSON-RPC error.
  const failure = new Error('no running instance');
  vi.mocked(mcpProxyService.execute).mockRejectedValue(failure);

  await runner.tick();

  const expectedPatch = expect.objectContaining({ healthStatus: 'unhealthy' });
  expect(instanceRepo.updateStatus).toHaveBeenCalledWith('inst-1', 'RUNNING', expectedPatch);
});
|
||||
|
||||
it('DEFAULT_HEALTH_CHECK has no tool set so it acts as liveness', () => {
  // No tool → the runner takes the tools/list liveness path.
  expect(DEFAULT_HEALTH_CHECK.tool).toBeUndefined();
  // Pin every numeric default so an accidental change is caught.
  expect(DEFAULT_HEALTH_CHECK.intervalSeconds).toBe(30);
  expect(DEFAULT_HEALTH_CHECK.timeoutSeconds).toBe(8);
  expect(DEFAULT_HEALTH_CHECK.failureThreshold).toBe(3);
});
|
||||
|
||||
it('skips non-RUNNING instances', async () => {
|
||||
|
||||
Reference in New Issue
Block a user