diff --git a/src/mcpd/src/services/health-probe.service.ts b/src/mcpd/src/services/health-probe.service.ts
index 199c718..8bd4192 100644
--- a/src/mcpd/src/services/health-probe.service.ts
+++ b/src/mcpd/src/services/health-probe.service.ts
@@ -129,10 +129,18 @@ export class HealthProbeRunner {
         result = await this.probeLiveness(server, timeoutMs);
       } else {
         const readinessCheck = healthCheck as HealthCheckSpec & { tool: string };
-        if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') {
-          result = await this.probeHttp(instance, server, readinessCheck, timeoutMs);
+        if (server.transport === 'STDIO') {
+          // Route STDIO readiness through the proxy so probes hit the live
+          // running container rather than spawning a fresh process inside
+          // it. The legacy `probeStdio` (docker-exec of a synthetic Node
+          // script that re-spawns the package binary) only worked for
+          // packageName-based servers — image-based STDIO servers (gitea,
+          // docmost) got a fake-unhealthy "No packageName or command"
+          // before the tool was even tried. Going through mcpProxyService
+          // also means readiness failures match production failures exactly.
+          result = await this.probeReadinessViaProxy(server, readinessCheck, timeoutMs);
         } else {
-          result = await this.probeStdio(instance, server, readinessCheck, timeoutMs);
+          result = await this.probeHttp(instance, server, readinessCheck, timeoutMs);
         }
       }
     } catch (err) {
@@ -188,6 +196,71 @@ export class HealthProbeRunner {
     return result;
   }
 
+  /**
+   * Readiness probe via McpProxyService — sends `tools/call` against the
+   * configured probe tool through the live running instance. Used by
+   * STDIO servers; HTTP/SSE servers go through the bespoke `probeHttp`
+   * paths that connect directly to the container's IP+port (those work
+   * fine and are kept as-is to minimise the diff in this PR).
+   *
+   * If the tool returns a JSON-RPC `error` (e.g. gitea-mcp-server's
+   * "token is required" when GITEA_ACCESS_TOKEN didn't resolve), we mark
+   * the instance unhealthy with the upstream error message. That's how
+   * we catch broken-by-empty-secret cases that a liveness `tools/list`
+   * would otherwise let pass.
+   */
+  private async probeReadinessViaProxy(
+    server: McpServer,
+    healthCheck: HealthCheckSpec & { tool: string },
+    timeoutMs: number,
+  ): Promise<ProbeResult> {
+    const start = Date.now();
+    if (!this.mcpProxyService) {
+      return { healthy: false, latencyMs: 0, message: 'mcpProxyService not wired — cannot run readiness probe' };
+    }
+
+    const deadline = new Promise<ProbeResult>((resolve) => {
+      setTimeout(() => resolve({
+        healthy: false,
+        latencyMs: timeoutMs,
+        message: `Readiness probe timed out after ${timeoutMs}ms`,
+      }), timeoutMs);
+    });
+
+    const probe = this.mcpProxyService
+      .execute({
+        serverId: server.id,
+        method: 'tools/call',
+        params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
+      })
+      .then((response): ProbeResult => {
+        const latencyMs = Date.now() - start;
+        if (response.error) {
+          return {
+            healthy: false,
+            latencyMs,
+            message: response.error.message ?? `tools/call ${healthCheck.tool} returned error`,
+          };
+        }
+        // Some servers report tool-level failures inside the result body
+        // (`{ isError: true, content: [...] }`) rather than as JSON-RPC
+        // errors. Treat that as unhealthy too.
+        const result = response.result as { isError?: boolean; content?: Array<{ text?: string }> } | undefined;
+        if (result?.isError) {
+          const text = result.content?.[0]?.text ?? `${healthCheck.tool} returned isError`;
+          return { healthy: false, latencyMs, message: text };
+        }
+        return { healthy: true, latencyMs, message: 'ok' };
+      })
+      .catch((err: unknown): ProbeResult => ({
+        healthy: false,
+        latencyMs: Date.now() - start,
+        message: err instanceof Error ? err.message : String(err),
+      }));
+
+    return Promise.race([probe, deadline]);
+  }
+
   /**
    * Liveness probe — sends tools/list via McpProxyService so the probe traverses
    * the exact code path production clients use. Works uniformly across every
@@ -463,122 +536,14 @@
     }
   }
 
-  /**
-   * Probe a STDIO MCP server by running `docker exec` with a disposable Node.js
-   * script that pipes JSON-RPC messages into the package binary.
-   */
-  private async probeStdio(
-    instance: McpInstance,
-    server: McpServer,
-    healthCheck: HealthCheckSpec & { tool: string },
-    timeoutMs: number,
-  ): Promise<ProbeResult> {
-    if (!instance.containerId) {
-      return { healthy: false, latencyMs: 0, message: 'No container ID' };
-    }
-
-    const start = Date.now();
-    const packageName = server.packageName as string | null;
-    const command = server.command as string[] | null;
-
-    // Determine how to spawn the MCP server inside the container
-    let spawnCmd: string[];
-    if (packageName) {
-      spawnCmd = ['npx', '--prefer-offline', '-y', packageName];
-    } else if (command && command.length > 0) {
-      spawnCmd = command;
-    } else {
-      return { healthy: false, latencyMs: 0, message: 'No packageName or command for STDIO server' };
-    }
-
-    // Build JSON-RPC messages for the health probe
-    const initMsg = JSON.stringify({
-      jsonrpc: '2.0', id: 1, method: 'initialize',
-      params: {
-        protocolVersion: '2024-11-05',
-        capabilities: {},
-        clientInfo: { name: 'mcpctl-health', version: '0.0.1' },
-      },
-    });
-    const initializedMsg = JSON.stringify({
-      jsonrpc: '2.0', method: 'notifications/initialized',
-    });
-    const toolCallMsg = JSON.stringify({
-      jsonrpc: '2.0', id: 2, method: 'tools/call',
-      params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
-    });
-
-    // Use a Node.js inline script that:
-    // 1. Spawns the MCP server binary
-    // 2. Sends initialize + initialized + tool call via stdin
-    // 3. Reads responses from stdout
-    // 4. Exits with 0 if the tool call succeeds, 1 if it fails
-    const spawnArgs = JSON.stringify(spawnCmd);
-    const probeScript = `
-const { spawn } = require('child_process');
-const args = ${spawnArgs};
-const proc = spawn(args[0], args.slice(1), { stdio: ['pipe', 'pipe', 'pipe'] });
-let output = '';
-let responded = false;
-proc.stdout.on('data', d => {
-  output += d;
-  const lines = output.split('\\n');
-  for (const line of lines) {
-    if (!line.trim()) continue;
-    try {
-      const msg = JSON.parse(line);
-      if (msg.id === 2) {
-        responded = true;
-        if (msg.error) {
-          process.stdout.write('ERROR:' + (msg.error.message || 'unknown'));
-          proc.kill();
-          process.exit(1);
-        } else {
-          process.stdout.write('OK');
-          proc.kill();
-          process.exit(0);
-        }
-      }
-    } catch {}
-  }
-  output = lines[lines.length - 1] || '';
-});
-proc.stderr.on('data', () => {});
-proc.on('error', e => { process.stdout.write('ERROR:' + e.message); process.exit(1); });
-proc.on('exit', (code) => { if (!responded) { process.stdout.write('ERROR:process exited ' + code); process.exit(1); } });
-setTimeout(() => { if (!responded) { process.stdout.write('ERROR:timeout'); proc.kill(); process.exit(1); } }, ${timeoutMs - 2000});
-proc.stdin.write(${JSON.stringify(initMsg)} + '\\n');
-setTimeout(() => {
-  proc.stdin.write(${JSON.stringify(initializedMsg)} + '\\n');
-  setTimeout(() => {
-    proc.stdin.write(${JSON.stringify(toolCallMsg)} + '\\n');
-  }, 500);
-}, 500);
-`.trim();
-
-    try {
-      const result = await this.orchestrator.execInContainer(
-        instance.containerId,
-        ['node', '-e', probeScript],
-        { timeoutMs },
-      );
-
-      const latencyMs = Date.now() - start;
-
-      if (result.exitCode === 0 && result.stdout.includes('OK')) {
-        return { healthy: true, latencyMs, message: 'ok' };
-      }
-
-      // Extract error message
-      const errorMatch = result.stdout.match(/ERROR:(.*)/);
-      const errorMsg = errorMatch?.[1] ?? (result.stderr.trim() || `exit code ${result.exitCode}`);
-      return { healthy: false, latencyMs, message: errorMsg };
-    } catch (err) {
-      return {
-        healthy: false,
-        latencyMs: Date.now() - start,
-        message: err instanceof Error ? err.message : String(err),
-      };
-    }
-  }
+  // Note: a previous `probeStdio` implementation existed here that ran a
+  // disposable Node script inside the container via `docker exec`,
+  // re-spawning the package binary and piping JSON-RPC into it. It only
+  // worked for packageName-based servers (the spawn step required an
+  // npx-compatible package); image-based STDIO servers like
+  // gitea-mcp-server fell through with "No packageName or command" and
+  // were always reported unhealthy for the wrong reason. STDIO readiness
+  // now goes through `probeReadinessViaProxy`, which calls the live
+  // running container — same code path as production traffic — and
+  // surfaces the upstream error verbatim.
 }
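
For reference, the timeout pattern `probeReadinessViaProxy` builds inline, reduced to a standalone sketch — the `withDeadline` helper and its caller names are illustrative, not part of this patch:

    // Race an async probe against a fixed deadline; whichever settles first
    // wins the race. As in the patch, the timer is not cleared when the
    // probe finishes first — harmless, since resolving an already-settled
    // promise is a no-op, but the timer keeps running until timeoutMs.
    function withDeadline<T>(probe: Promise<T>, timeoutMs: number, onTimeout: () => T): Promise<T> {
      const deadline = new Promise<T>((resolve) => {
        setTimeout(() => resolve(onTimeout()), timeoutMs);
      });
      return Promise.race([probe, deadline]);
    }

    // Usage mirroring the probe: resolve to an unhealthy result on timeout.
    // const result = await withDeadline(probe, timeoutMs, () => ({
    //   healthy: false, latencyMs: timeoutMs, message: `Readiness probe timed out after ${timeoutMs}ms`,
    // }));
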
diff --git a/src/mcpd/src/services/instance.service.ts b/src/mcpd/src/services/instance.service.ts
index ed0b1f9..39175ff 100644
--- a/src/mcpd/src/services/instance.service.ts
+++ b/src/mcpd/src/services/instance.service.ts
@@ -1,4 +1,4 @@
-import type { McpInstance } from '@prisma/client';
+import type { McpInstance, McpServer } from '@prisma/client';
 import type { IMcpInstanceRepository, IMcpServerRepository } from '../repositories/interfaces.js';
 import type { McpOrchestrator, ContainerSpec, ContainerInfo } from './orchestrator.js';
 import { NotFoundError } from './mcp-server.service.js';
@@ -13,6 +13,36 @@ const RUNNER_IMAGES: Record<string, string> = {
 /** Network for MCP server containers (matches docker-compose mcp-servers network). */
 const MCP_SERVERS_NETWORK = process.env['MCPD_MCP_NETWORK'] ?? 'mcp-servers';
 
+/**
+ * Backoff schedule for instance startup failures (env resolution, container
+ * creation, etc.). Mirrors Kubernetes-style escalation: fast retries for
+ * transient hiccups, then a longer pause once it's clear something is
+ * persistently wrong.
+ *
+ * The retry state lives on `McpInstance.metadata` (no schema migration
+ * needed) and is preserved across reconcile cycles by the in-place
+ * `retryInstance` path, so attemptCount actually accumulates.
+ */
+const FAST_RETRY_MS = 30_000; // first 5 attempts: 30s apart
+const SLOW_RETRY_MS = 5 * 60_000; // afterwards: 5 minutes
+const MAX_FAST_RETRIES = 5;
+
+interface RetryMetadata {
+  error?: string;
+  attemptCount?: number;
+  lastAttemptAt?: string;
+  nextRetryAt?: string;
+  [k: string]: unknown;
+}
+
+function readRetryMeta(instance: McpInstance): RetryMetadata {
+  return (instance.metadata ?? {}) as RetryMetadata;
+}
+
+function nextDelayMs(attemptCount: number): number {
+  return attemptCount <= MAX_FAST_RETRIES ? FAST_RETRY_MS : SLOW_RETRY_MS;
+}
+
 export class InvalidStateError extends Error {
   readonly statusCode = 409;
   constructor(message: string) {
@@ -118,8 +148,12 @@
   /**
    * Reconcile ALL servers — the operator loop.
    *
    * For every server with replicas > 0, ensures the correct number of
-   * healthy instances exist. Cleans up ERROR instances and starts
-   * replacements. This is the core self-healing mechanism.
+   * healthy instances exist. ERROR instances are not blindly recreated:
+   * within their `nextRetryAt` window they're left alone (and counted
+   * against the replica budget so we don't churn replacements while one
+   * is in backoff); past their window they're retried in-place via
+   * `retryInstance` so attemptCount accumulates and backoff escalates
+   * correctly.
    */
   async reconcileAll(): Promise<{ reconciled: number; errors: string[] }> {
     await this.syncStatus();
@@ -128,6 +162,8 @@
     let reconciled = 0;
     const errors: string[] = [];
 
+    const now = Date.now();
+
     for (const server of servers) {
       if (server.replicas <= 0) continue;
 
@@ -136,17 +172,38 @@
         const active = instances.filter((i) => i.status === 'RUNNING' || i.status === 'STARTING');
         const errored = instances.filter((i) => i.status === 'ERROR');
 
-        // Clean up ERROR instances so they don't accumulate
+        // Partition ERROR instances by whether their backoff window has elapsed.
+        const dueForRetry: McpInstance[] = [];
+        const stillWaiting: McpInstance[] = [];
         for (const inst of errored) {
-          await this.removeOne(inst);
+          const meta = readRetryMeta(inst);
+          const ts = meta.nextRetryAt ? Date.parse(meta.nextRetryAt) : 0;
+          if (Number.isNaN(ts) || ts <= now) {
+            dueForRetry.push(inst);
+          } else {
+            stillWaiting.push(inst);
+          }
         }
 
-        // Scale up if needed
-        const toStart = server.replicas - active.length;
+        // Retry elapsed ones in-place. This preserves attemptCount across
+        // attempts so the 30s × 5 → 5min schedule actually escalates.
+        for (const inst of dueForRetry) {
+          await this.retryInstance(inst);
+        }
+
+        // Scale up only if we don't already have enough live attempts.
+        // Live attempts = RUNNING/STARTING instances, plus still-waiting
+        // ones (in backoff), plus just-retried ones (now STARTING via
+        // retryInstance). Counting waiting + retried against the budget
+        // prevents tight create-fail-create churn while previous attempts
+        // work through their backoff schedule.
+        const toStart = server.replicas - active.length - stillWaiting.length - dueForRetry.length;
         if (toStart > 0) {
           for (let i = 0; i < toStart; i++) {
            await this.startOne(server.id);
          }
+        }
+        if (toStart > 0 || dueForRetry.length > 0) {
          reconciled++;
        }
      } catch (err) {
@@ -220,7 +277,12 @@
     return this.orchestrator.getContainerLogs(instance.containerId, opts);
   }
 
-  /** Start a single instance for a server. */
+  /**
+   * Start a single instance for a server. Creates a fresh `STARTING` row
+   * and hands off to `attemptStart` for the env + container work. On
+   * failure, `attemptStart` marks the row `ERROR` with a backoff-aware
+   * `nextRetryAt`; the reconciler picks it up later via `retryInstance`.
+   */
   private async startOne(serverId: string): Promise<McpInstance> {
     const server = await this.serverRepo.findById(serverId);
     if (!server) throw new NotFoundError(`McpServer '${serverId}' not found`);
@@ -234,6 +296,49 @@
       });
     }
 
+    const instance = await this.instanceRepo.create({
+      serverId,
+      status: 'STARTING',
+    });
+    return this.attemptStart(instance, server);
+  }
+
+  /**
+   * Re-attempt a previously-errored instance in place, preserving its
+   * `attemptCount` so the backoff schedule escalates correctly. Called
+   * by `reconcileAll` for ERROR instances whose `nextRetryAt` has elapsed.
+   */
+  private async retryInstance(instance: McpInstance): Promise<McpInstance> {
+    const server = await this.serverRepo.findById(instance.serverId);
+    if (!server) {
+      // Server was deleted underneath us — nothing to retry against.
+      return this.markInstanceError(instance, 'Server no longer exists');
+    }
+
+    if (server.externalUrl) {
+      // External servers don't need a container; the URL is the contract.
+      return this.instanceRepo.updateStatus(instance.id, 'RUNNING', {
+        metadata: { external: true, url: server.externalUrl },
+      });
+    }
+
+    // Reset transient fields but keep retry counters via the metadata
+    // passed through `attemptStart` → `markInstanceError`.
+    await this.instanceRepo.updateStatus(instance.id, 'STARTING', {});
+    const refreshed = (await this.instanceRepo.findById(instance.id)) ?? instance;
+    return this.attemptStart(refreshed, server);
+  }
+
+  /**
+   * Run the env-resolution + container-creation steps for a STARTING
+   * instance. On any failure, mark the instance `ERROR` with structured
+   * retry metadata. Used by both initial start (`startOne`) and retry
+   * (`retryInstance`).
+   */
+  private async attemptStart(
+    instance: McpInstance,
+    server: McpServer,
+  ): Promise<McpInstance> {
     // Determine image + command based on server config:
     // 1. Explicit dockerImage → use as-is
     // 2. packageName → use runtime-specific runner image (node/python/go/...)
@@ -253,11 +358,6 @@
       image = server.name;
     }
 
-    let instance = await this.instanceRepo.create({
-      serverId,
-      status: 'STARTING',
-    });
-
     try {
       const spec: ContainerSpec = {
         image,
@@ -265,7 +365,7 @@
         hostPort: null,
         network: MCP_SERVERS_NETWORK,
         labels: {
-          'mcpctl.server-id': serverId,
+          'mcpctl.server-id': server.id,
           'mcpctl.instance-id': instance.id,
         },
       };
@@ -283,7 +383,17 @@
         }
       }
 
-      // Resolve env vars from inline values and secret refs
+      // Resolve env vars from inline values and secret refs.
+      //
+      // Failure here is FATAL for the start attempt: a container that
+      // boots without its declared secrets will silently misbehave (we
+      // saw this with gitea-mcp-server starting up with an empty
+      // GITEA_ACCESS_TOKEN when OpenBao was unreachable, then reporting
+      // "healthy" while every authenticated call failed). Marking the
+      // instance ERROR with a backoff-aware nextRetryAt is honest; the
+      // reconciler retries it in-place on the first tick after nextRetryAt
+      // elapses. Optional/missing env vars should be modeled as `value: ""`
+      // entries on the server, not as silent secret-resolution failures.
       if (this.secretResolver) {
         try {
           const resolvedEnv = await resolveServerEnv(server, this.secretResolver);
@@ -291,8 +401,8 @@
           spec.env = resolvedEnv;
         }
       } catch (envErr) {
-        // Log but don't prevent startup — env resolution failures are non-fatal
-        // The container may still work if env vars are optional
+        const msg = envErr instanceof Error ? envErr.message : String(envErr);
+        return this.markInstanceError(instance, `secret resolution failed: ${msg}`);
       }
     }
 
@@ -313,14 +423,39 @@
       }
 
       // Set STARTING — syncStatus will promote to RUNNING once the container is actually ready
-      instance = await this.instanceRepo.updateStatus(instance.id, 'STARTING', updateFields);
+      return this.instanceRepo.updateStatus(instance.id, 'STARTING', updateFields);
     } catch (err) {
-      instance = await this.instanceRepo.updateStatus(instance.id, 'ERROR', {
-        metadata: { error: err instanceof Error ? err.message : String(err) },
-      });
+      return this.markInstanceError(
+        instance,
+        err instanceof Error ? err.message : String(err),
+      );
     }
+  }
 
-    return instance;
+  /**
+   * Mark an instance ERROR with a backoff-aware retry schedule. The
+   * `attemptCount` accumulates across retries (preserved by
+   * `retryInstance`, which reuses the same row), so the schedule
+   * actually escalates: 30s × 5 → 5min thereafter.
+   */
+  private async markInstanceError(
+    instance: McpInstance,
+    error: string,
+  ): Promise<McpInstance> {
+    const meta = readRetryMeta(instance);
+    const attemptCount = (typeof meta.attemptCount === 'number' ? meta.attemptCount : 0) + 1;
+    const delayMs = nextDelayMs(attemptCount);
+    const now = new Date();
+    const nextRetryAt = new Date(now.getTime() + delayMs).toISOString();
+    return this.instanceRepo.updateStatus(instance.id, 'ERROR', {
+      metadata: {
+        ...meta,
+        error,
+        attemptCount,
+        lastAttemptAt: now.toISOString(),
+        nextRetryAt,
+      },
+    });
   }
 
   /** Stop and remove a single instance. */
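
Traced attempt by attempt, the schedule those constants produce — a minimal sketch that reuses `nextDelayMs` exactly as defined in the patch (the trace loop itself is illustrative):

    const FAST_RETRY_MS = 30_000;
    const SLOW_RETRY_MS = 5 * 60_000;
    const MAX_FAST_RETRIES = 5;

    function nextDelayMs(attemptCount: number): number {
      return attemptCount <= MAX_FAST_RETRIES ? FAST_RETRY_MS : SLOW_RETRY_MS;
    }

    // Attempts 1-5 schedule the next retry 30s out; attempt 6 onwards waits 5min.
    for (let attempt = 1; attempt <= 7; attempt++) {
      console.log(`attempt ${attempt}: next retry in ${nextDelayMs(attempt) / 1000}s`);
    }
    // attempt 1: next retry in 30s ... attempt 5: next retry in 30s
    // attempt 6: next retry in 300s, attempt 7: next retry in 300s
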
diff --git a/src/mcpd/tests/instance-service.test.ts b/src/mcpd/tests/instance-service.test.ts
index 88fc73e..c19b4bd 100644
--- a/src/mcpd/tests/instance-service.test.ts
+++ b/src/mcpd/tests/instance-service.test.ts
@@ -334,20 +334,93 @@ describe('InstanceService', () => {
     expect(instanceRepo.create).not.toHaveBeenCalled();
   });
 
-  it('cleans up ERROR instances and creates replacements', async () => {
+  it('retries ERROR instances in-place when their backoff has elapsed (no delete, no new row)', async () => {
     const server = makeServer({ id: 'srv-1', replicas: 1 });
     vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
+    // ERROR instance with no nextRetryAt → retry is due immediately.
     vi.mocked(instanceRepo.findAll).mockResolvedValue([
       makeInstance({ id: 'inst-dead', serverId: 'srv-1', status: 'ERROR', containerId: 'ctr-dead' }),
     ]);
 
     const result = await service.reconcileAll();
 
-    // Should delete ERROR instance and create a new one
+    // Retry-in-place semantics: don't delete the row, don't create a
+    // replacement. attemptCount needs to live on the same row so the
+    // backoff schedule can actually escalate.
+    expect(instanceRepo.delete).not.toHaveBeenCalled();
+    expect(instanceRepo.create).not.toHaveBeenCalled();
+    // retryInstance flips the row to STARTING before attemptStart runs.
+    expect(instanceRepo.updateStatus).toHaveBeenCalledWith('inst-dead', 'STARTING', expect.anything());
     expect(result.reconciled).toBe(1);
-    expect(instanceRepo.delete).toHaveBeenCalledWith('inst-dead');
-    expect(instanceRepo.create).toHaveBeenCalled();
+  });
+
+  it('leaves ERROR instances alone while their nextRetryAt is in the future', async () => {
+    const server = makeServer({ id: 'srv-1', replicas: 1 });
+    vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
+    vi.mocked(serverRepo.findById).mockResolvedValue(server);
+    const futureRetry = new Date(Date.now() + 60_000).toISOString();
+    vi.mocked(instanceRepo.findAll).mockResolvedValue([
+      makeInstance({
+        id: 'inst-waiting',
+        serverId: 'srv-1',
+        status: 'ERROR',
+        metadata: { nextRetryAt: futureRetry, attemptCount: 2 },
+      }),
+    ]);
+
+    const result = await service.reconcileAll();
+
+    // Within the backoff window the reconciler must not delete the row,
+    // not retry it, and not spawn a replacement (counting it against
+    // the replica budget is what prevents tight create-fail-create churn).
+    expect(instanceRepo.delete).not.toHaveBeenCalled();
+    expect(instanceRepo.create).not.toHaveBeenCalled();
+    expect(orchestrator.createContainer).not.toHaveBeenCalled();
+    expect(result.reconciled).toBe(0);
+  });
+
+  it('escalates the backoff: attemptCount + nextRetryAt persist on retry failures', async () => {
+    const server = makeServer({ id: 'srv-1', replicas: 1 });
+    vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
+    vi.mocked(serverRepo.findById).mockResolvedValue(server);
+
+    // Fail container creation so attemptStart goes down the markInstanceError path.
+    vi.mocked(orchestrator.createContainer).mockRejectedValue(new Error('boom'));
+
+    // Existing ERROR instance with attemptCount=2 (so the next failure
+    // produces attemptCount=3, still inside the fast-retry window).
+    vi.mocked(instanceRepo.findAll).mockResolvedValue([
+      makeInstance({
+        id: 'inst-1',
+        serverId: 'srv-1',
+        status: 'ERROR',
+        metadata: { error: 'previous failure', attemptCount: 2, nextRetryAt: new Date(Date.now() - 1000).toISOString() },
+      }),
+    ]);
+    // retryInstance refreshes via findById; let it return the same row.
+    vi.mocked(instanceRepo.findById).mockImplementation(async () => makeInstance({
+      id: 'inst-1',
+      serverId: 'srv-1',
+      status: 'STARTING',
+      metadata: { error: 'previous failure', attemptCount: 2, nextRetryAt: new Date(Date.now() - 1000).toISOString() },
+    }));
+
+    await service.reconcileAll();
+
+    // Look at the last updateStatus call — it should be the ERROR transition
+    // with attemptCount bumped to 3.
+    const errorCalls = vi.mocked(instanceRepo.updateStatus).mock.calls.filter(
+      (c) => c[1] === 'ERROR',
+    );
+    expect(errorCalls.length).toBeGreaterThan(0);
+    const lastErrorCall = errorCalls[errorCalls.length - 1]!;
+    const meta = (lastErrorCall[2] as { metadata?: Record<string, unknown> } | undefined)?.metadata;
+    expect(meta).toBeDefined();
+    expect((meta as Record<string, unknown>)['attemptCount']).toBe(3);
+    expect((meta as Record<string, unknown>)['nextRetryAt']).toBeTypeOf('string');
+    // Reason should reference the boom we threw.
+    expect(String((meta as Record<string, unknown>)['error'])).toContain('boom');
   });
 
   it('reconciles multiple servers independently', async () => {
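
The due/waiting partition these tests pin down, extracted as a standalone predicate for reference — `isDueForRetry` is an illustrative name; the body matches the `reconcileAll` logic above, where a missing or unparseable `nextRetryAt` counts as due:

    // Date.parse returns NaN for malformed strings; treat that — and any
    // timestamp at or before `now` — as "backoff elapsed, retry now".
    function isDueForRetry(meta: { nextRetryAt?: string }, now: number): boolean {
      const ts = meta.nextRetryAt ? Date.parse(meta.nextRetryAt) : 0;
      return Number.isNaN(ts) || ts <= now;
    }

    // isDueForRetry({}, Date.now())                           → true  (due immediately)
    // isDueForRetry({ nextRetryAt: futureRetry }, Date.now()) → false (still waiting)
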
diff --git a/src/mcpd/tests/services/health-probe.test.ts b/src/mcpd/tests/services/health-probe.test.ts
index 9c00b10..072bef9 100644
--- a/src/mcpd/tests/services/health-probe.test.ts
+++ b/src/mcpd/tests/services/health-probe.test.ts
@@ -192,25 +192,28 @@ describe('HealthProbeRunner', () => {
     expect(serverRepo.findById).not.toHaveBeenCalled();
   });
 
-  it('probes STDIO instance with exec and marks healthy on success', async () => {
+  it('probes STDIO instance via mcpProxyService and marks healthy on success', async () => {
     const instance = makeInstance();
     const server = makeServer();
 
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0,
-      stdout: 'OK',
-      stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      result: { content: [{ type: 'text', text: 'ok' }] },
     });
 
     await runner.tick();
 
-    expect(orchestrator.execInContainer).toHaveBeenCalledWith(
-      'container-abc',
-      expect.arrayContaining(['node', '-e']),
-      expect.objectContaining({ timeoutMs: 10000 }),
-    );
+    // STDIO readiness now goes through the proxy (the live container),
+    // not via docker-exec into a synthetic spawn — see the comment on
+    // probeReadinessViaProxy for why.
+    expect(orchestrator.execInContainer).not.toHaveBeenCalled();
+    expect(mcpProxyService.execute).toHaveBeenCalledWith({
+      serverId: 'srv-1',
+      method: 'tools/call',
+      params: { name: 'list_datasources', arguments: {} },
+    });
 
     expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
       'inst-1',
@@ -225,6 +228,57 @@
     );
   });
 
+  it('marks unhealthy when proxy returns a JSON-RPC error (e.g. broken-secret auth failure)', async () => {
+    const instance = makeInstance();
+    const server = makeServer({
+      healthCheck: { tool: 'get_me', intervalSeconds: 0, failureThreshold: 1 } as McpServer['healthCheck'],
+    });
+
+    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
+    vi.mocked(serverRepo.findById).mockResolvedValue(server);
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      error: { code: -32603, message: 'token is required' },
+    });
+
+    await runner.tick();
+
+    expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
+      'inst-1',
+      'RUNNING',
+      expect.objectContaining({
+        healthStatus: 'unhealthy',
+        events: expect.arrayContaining([
+          expect.objectContaining({ type: 'Warning', message: expect.stringContaining('token is required') }),
+        ]),
+      }),
+    );
+  });
+
+  it('marks unhealthy when proxy returns a tool-level error in result.isError', async () => {
+    const instance = makeInstance();
+    const server = makeServer({
+      healthCheck: { tool: 'get_me', intervalSeconds: 0, failureThreshold: 1 } as McpServer['healthCheck'],
+    });
+
+    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
+    vi.mocked(serverRepo.findById).mockResolvedValue(server);
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      result: { isError: true, content: [{ type: 'text', text: 'auth failed: token is required' }] },
+    });
+
+    await runner.tick();
+
+    const events = vi.mocked(instanceRepo.updateStatus).mock.calls[0]?.[2]?.events as Array<{ message: string }> | undefined;
+    expect(events?.[events.length - 1]?.message).toContain('auth failed');
+    expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
+      'inst-1',
+      'RUNNING',
+      expect.objectContaining({ healthStatus: 'unhealthy' }),
+    );
+  });
+
   it('marks unhealthy after failureThreshold consecutive failures', async () => {
     const instance = makeInstance();
     const healthCheck: HealthCheckSpec = {
@@ -237,10 +291,9 @@
 
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 1,
-      stdout: 'ERROR:connection refused',
-      stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      error: { code: -32603, message: 'connection refused' },
     });
 
     // First failure → degraded
@@ -274,15 +327,15 @@
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
 
     // Two failures
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 1, stdout: 'ERROR:fail', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, error: { code: -32603, message: 'fail' },
     });
     await runner.tick();
     await runner.tick();
 
     // Then success — should reset to healthy
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0, stdout: 'OK', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
     });
     await runner.tick();
 
@@ -290,13 +343,16 @@
     expect(lastCall?.[2]).toEqual(expect.objectContaining({ healthStatus: 'healthy' }));
   });
 
-  it('handles exec timeout as failure', async () => {
+  it('handles probe timeout as failure', async () => {
     const instance = makeInstance();
-    const server = makeServer();
+    const server = makeServer({
+      healthCheck: { tool: 'list_datasources', intervalSeconds: 0, timeoutSeconds: 0.05, failureThreshold: 3 } as unknown as McpServer['healthCheck'],
+    });
 
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockRejectedValue(new Error('Exec timed out after 10000ms'));
+    // Hang forever — the probe's internal deadline should fire instead.
+    vi.mocked(mcpProxyService.execute).mockImplementation(() => new Promise(() => { /* never resolves */ }));
 
     await runner.tick();
 
@@ -323,8 +379,8 @@
 
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0, stdout: 'OK', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
     });
 
     await runner.tick();
@@ -343,17 +399,17 @@
 
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0, stdout: 'OK', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
    });
 
     // First tick: should probe
     await runner.tick();
-    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
+    expect(mcpProxyService.execute).toHaveBeenCalledTimes(1);
 
     // Second tick immediately: should skip (300s interval not elapsed)
     await runner.tick();
-    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
+    expect(mcpProxyService.execute).toHaveBeenCalledTimes(1);
   });
 
   it('cleans up probe states for removed instances', async () => {
@@ -364,9 +420,12 @@
 
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
+    });
 
     await runner.tick();
-    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
+    expect(mcpProxyService.execute).toHaveBeenCalledTimes(1);
 
     // Instance removed
     vi.mocked(instanceRepo.findAll).mockResolvedValue([]);
@@ -375,7 +434,7 @@
     // Re-add same instance — should probe again (state was cleaned)
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     await runner.tick();
-    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(2);
+    expect(mcpProxyService.execute).toHaveBeenCalledTimes(2);
   });
 
   it('skips STDIO instances without containerId', async () => {
@@ -397,8 +456,8 @@
       arguments: {},
     };
 
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0, stdout: 'OK', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
     });
 
     const result = await runner.probeInstance(instance, server, healthCheck);
@@ -407,15 +466,14 @@
     expect(result.message).toBe('ok');
   });
 
-  it('handles STDIO exec failure with error message', async () => {
+  it('surfaces upstream JSON-RPC error message verbatim', async () => {
     const instance = makeInstance();
     const server = makeServer();
     const healthCheck: HealthCheckSpec = { tool: 'list_datasources', arguments: {} };
 
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 1,
-      stdout: 'ERROR:ECONNREFUSED 10.0.0.1:3000',
-      stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      error: { code: -32603, message: 'ECONNREFUSED 10.0.0.1:3000' },
     });
 
     const result = await runner.probeInstance(instance, server, healthCheck);
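
For reference, the two failure shapes the health-probe tests above distinguish, side by side — payloads copied from the tests; `probeReadinessViaProxy` treats both as unhealthy:

    // JSON-RPC-level error: the call itself failed (transport, handler, auth).
    const rpcError = {
      jsonrpc: '2.0', id: 1,
      error: { code: -32603, message: 'token is required' },
    };

    // Tool-level error: the call succeeded, but the tool reports failure in
    // the result body; surfaced via result.isError plus the first content
    // item's text.
    const toolError = {
      jsonrpc: '2.0', id: 1,
      result: { isError: true, content: [{ type: 'text', text: 'auth failed: token is required' }] },
    };
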