fix(mcpd): fail-loud on env resolution + retry/backoff + readiness via proxy

Fixes three connected issues in how instances came up and got reported
as healthy while their secret backend was unreachable. The motivating
case: gitea-mcp-server starts even though mcpd can't read the
gitea-creds secret from OpenBao, runs with an empty
GITEA_ACCESS_TOKEN, replies fine to tools/list (so liveness passes),
but every authed call fails with "token is required" — and
`mcpctl get instances` cheerfully reports the instance as healthy.

## What changed

### 1. Env resolution failures are now fatal for the start attempt

`src/mcpd/src/services/instance.service.ts`

The previous behaviour swallowed `resolveServerEnv` failures and let
the container start anyway with whatever env survived ("non-fatal —
container may still work if env vars are optional"). That's the bug:
the gitea container started with no token, ran for weeks, and was
reported healthy.

The catch now calls `markInstanceError(instance, "secret resolution
failed: <reason>")` and returns. Optional/missing env vars should be
modelled as `value: ""` entries on the server, not as silent
secret-resolution failures.
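
For illustration, here is the intended convention as a hedged sketch (the
env-entry schema on the server record isn't shown in this diff, so the
field names below are assumptions, not the real shape):

```ts
// Hypothetical env declarations on a server record, for illustration only.
const env = [
  // Required secret: if the resolver can't fetch it, the start attempt now
  // fails loudly instead of booting the container with a hole in its env.
  { name: 'GITEA_ACCESS_TOKEN', secretRef: { store: 'openbao', path: 'gitea-creds' } },
  // Genuinely optional var: declare it inline with an empty value so its
  // absence never masquerades as a secret-resolution failure.
  { name: 'GITEA_DEBUG', value: '' },
];
```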

### 2. ERROR instances retry with backoff, not blind churn

Adds Kubernetes-style escalation: 30 s × 5 attempts, then 5 min
pauses thereafter. Retry state lives on `McpInstance.metadata` (no
schema migration) — `attemptCount`, `lastAttemptAt`, `nextRetryAt`,
`error`.
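
Once a start attempt fails, the row carries state shaped like this (field
names are the ones the PR writes; the values are illustrative):

```ts
// What markInstanceError leaves on McpInstance.metadata after a third
// consecutive failure (timestamps invented for the example).
const metadata = {
  error: 'secret resolution failed: OpenBao unreachable',
  attemptCount: 3,                          // preserved across in-place retries
  lastAttemptAt: '2026-05-07T17:55:00.000Z',
  nextRetryAt: '2026-05-07T17:55:30.000Z',  // +30 s, still in the fast window
};
```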

The reconciler no longer tears down ERROR instances and creates
fresh replacements (which would reset attemptCount and effectively
loop at 30 s forever). Instead:

- ERROR rows whose `nextRetryAt` is in the future are LEFT ALONE
  and counted against the replica budget — preventing tight create-
  fail-create churn while a previous attempt is in its backoff window.
- ERROR rows whose `nextRetryAt` has elapsed are retried IN-PLACE
  via a new `retryInstance` method, which preserves attemptCount on
  the same row so the schedule actually escalates.
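
A worked instance of the replica-budget arithmetic (numbers invented for
illustration):

```ts
// One healthy replica plus one ERROR replica still inside its backoff window.
const replicas = 2;
const active = 1;       // RUNNING or STARTING
const stillWaiting = 1; // ERROR, nextRetryAt in the future
const dueForRetry = 0;  // ERROR, backoff elapsed (retried in-place this tick)
const toStart = replicas - active - stillWaiting - dueForRetry; // 0: no churn
```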

The work is now factored into `startOne` (create the row + initial
attempt), `attemptStart` (env + container), `retryInstance` (re-attempt
the same row), and `markInstanceError` (write retry metadata).
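
If every attempt keeps failing, the schedule plays out as below (a
runnable sketch using the same constants the diff introduces):

```ts
// Offsets of each retry from the first failure: 30s × 5, then 5min steps.
const FAST_RETRY_MS = 30_000;
const SLOW_RETRY_MS = 5 * 60_000;
const MAX_FAST_RETRIES = 5;

let t = 0;
for (let attemptCount = 1; attemptCount <= 7; attemptCount++) {
  t += attemptCount <= MAX_FAST_RETRIES ? FAST_RETRY_MS : SLOW_RETRY_MS;
  console.log(`after failure #${attemptCount}, next retry at +${t / 1000}s`);
}
// +30s, +60s, +90s, +120s, +150s, then +450s, +750s, …
```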

### 3. STDIO readiness probe goes through mcpProxyService

`src/mcpd/src/services/health-probe.service.ts`

The legacy `probeStdio` (a `docker exec node -e '... spawn(packageName)
...'` invocation) only worked for packageName-based servers. Image-
based STDIO servers like gitea-mcp-server fell through with "No
packageName or command for STDIO server" and were reported unhealthy
for the WRONG reason — they have no packageName because they are an
image, not because anything's wrong.

New `probeReadinessViaProxy`: sends `tools/call` through the live
running container via `mcpProxyService.execute`. Same code path as
production traffic, so probe failures match real failures. Picks up:

- JSON-RPC errors (e.g. "token is required" when env is empty).
- Tool-level errors expressed as `result.isError: true`.
- Connection failures wrapped as exceptions.
- Hard timeouts via the deadline race.
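
For reference, the two failure shapes as JSON-RPC payloads (modeled on the
new tests; the gitea messages are the ones from the motivating bug):

```ts
// Shape 1: a JSON-RPC error object, e.g. gitea with an empty token.
const rpcError = {
  jsonrpc: '2.0', id: 1,
  error: { code: -32603, message: 'token is required' },
};

// Shape 2: a "successful" response whose result carries a tool-level failure.
const toolError = {
  jsonrpc: '2.0', id: 1,
  result: { isError: true, content: [{ type: 'text', text: 'auth failed: token is required' }] },
};
// probeReadinessViaProxy marks the instance unhealthy in both cases and
// surfaces the upstream message verbatim.
```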

After this PR, configuring `gitea` with
`healthCheck: { tool: get_me, intervalSeconds: 60 }` makes
`mcpctl get instances` report it as `unhealthy` whenever the auth
token is missing or wrong — which is honest.

The dead `probeStdio` (~120 LOC) is removed; HTTP/SSE bespoke probe
paths are kept for now (they work and the diff stays minimal).

## Tests

`src/mcpd/tests/instance-service.test.ts`:
- Replaces "cleans up ERROR instances and creates replacements" with
  "retries ERROR instances in-place when their backoff has elapsed".
- Adds "leaves ERROR instances alone while their nextRetryAt is in
  the future" and "escalates the backoff: attemptCount + nextRetryAt
  persist on retry failures".

`src/mcpd/tests/services/health-probe.test.ts`:
- Swaps STDIO probe mocks from `orchestrator.execInContainer` →
  `mcpProxyService.execute`.
- Adds "marks unhealthy when proxy returns a JSON-RPC error
  (e.g. broken-secret auth failure)" — explicitly the gitea case.
- Adds "marks unhealthy when proxy returns a tool-level error in
  result.isError" — covers servers that report tool failures as
  isError instead of as JSON-RPC errors.
- Renames "handles exec timeout" → "handles probe timeout" and
  exercises the deadline race rather than an exec rejection.

Full suite: 162 test files / 2161 tests green (+4 new).

## Manual verification step (post-deploy)

```bash
mcpctl edit server gitea
# → add healthCheck:
#     tool: get_me
#     intervalSeconds: 60
#     timeoutSeconds: 10
#     failureThreshold: 3
```

If OpenBao is still down: the gitea instance enters ERROR, with
attemptCount + nextRetryAt visible in `mcpctl describe instance`.
Otherwise: the gitea env resolves at the next start, the probe passes,
and the instance is honestly healthy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## Diff

`src/mcpd/src/services/health-probe.service.ts`

```diff
@@ -129,10 +129,18 @@ export class HealthProbeRunner {
         result = await this.probeLiveness(server, timeoutMs);
       } else {
         const readinessCheck = healthCheck as HealthCheckSpec & { tool: string };
-        if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') {
-          result = await this.probeHttp(instance, server, readinessCheck, timeoutMs);
+        if (server.transport === 'STDIO') {
+          // Route STDIO readiness through the proxy so probes hit the live
+          // running container rather than spawning a fresh process inside
+          // it. The legacy `probeStdio` (docker-exec a synthetic Node script
+          // that re-spawns the package binary) only worked for
+          // packageName-based servers — image-based STDIO servers (gitea,
+          // docmost) returned a fake-unhealthy "No packageName or command"
+          // before they even tried the tool. Going through mcpProxyService
+          // also means readiness failures match production failures exactly.
+          result = await this.probeReadinessViaProxy(server, readinessCheck, timeoutMs);
         } else {
-          result = await this.probeStdio(instance, server, readinessCheck, timeoutMs);
+          result = await this.probeHttp(instance, server, readinessCheck, timeoutMs);
         }
       }
     } catch (err) {
@@ -188,6 +196,71 @@ export class HealthProbeRunner {
     return result;
   }
 
+  /**
+   * Readiness probe via McpProxyService — sends `tools/call` against the
+   * configured probe tool through the live running instance. Used by
+   * STDIO servers; HTTP/SSE servers go through the bespoke `probeHttp`
+   * paths that connect directly to the container's IP+port (those work
+   * fine and are kept as-is to minimise the diff in this PR).
+   *
+   * If the tool returns a JSON-RPC `error` (e.g. gitea-mcp-server's
+   * "token is required" when GITEA_ACCESS_TOKEN didn't resolve), we mark
+   * the instance unhealthy with the upstream error message. That's how
+   * we catch broken-by-empty-secret cases that liveness (`tools/list`)
+   * would otherwise pass.
+   */
+  private async probeReadinessViaProxy(
+    server: McpServer,
+    healthCheck: HealthCheckSpec & { tool: string },
+    timeoutMs: number,
+  ): Promise<ProbeResult> {
+    const start = Date.now();
+    if (!this.mcpProxyService) {
+      return { healthy: false, latencyMs: 0, message: 'mcpProxyService not wired — cannot run readiness probe' };
+    }
+    const deadline = new Promise<ProbeResult>((resolve) => {
+      setTimeout(() => resolve({
+        healthy: false,
+        latencyMs: timeoutMs,
+        message: `Readiness probe timed out after ${timeoutMs}ms`,
+      }), timeoutMs);
+    });
+    const probe = this.mcpProxyService
+      .execute({
+        serverId: server.id,
+        method: 'tools/call',
+        params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
+      })
+      .then((response): ProbeResult => {
+        const latencyMs = Date.now() - start;
+        if (response.error) {
+          return {
+            healthy: false,
+            latencyMs,
+            message: response.error.message ?? `tools/call ${healthCheck.tool} returned error`,
+          };
+        }
+        // Some servers report tool-level failures inside the result body
+        // (`{ isError: true, content: [...] }`) rather than as JSON-RPC
+        // errors. Treat that as unhealthy too.
+        const result = response.result as { isError?: boolean; content?: Array<{ text?: string }> } | undefined;
+        if (result?.isError) {
+          const text = result.content?.[0]?.text ?? `${healthCheck.tool} returned isError`;
+          return { healthy: false, latencyMs, message: text };
+        }
+        return { healthy: true, latencyMs, message: 'ok' };
+      })
+      .catch((err: unknown): ProbeResult => ({
+        healthy: false,
+        latencyMs: Date.now() - start,
+        message: err instanceof Error ? err.message : String(err),
+      }));
+    return Promise.race([probe, deadline]);
+  }
+
   /**
    * Liveness probe — sends tools/list via McpProxyService so the probe traverses
    * the exact code path production clients use. Works uniformly across every
@@ -463,122 +536,14 @@ export class HealthProbeRunner {
     }
   }
 
-  /**
-   * Probe a STDIO MCP server by running `docker exec` with a disposable Node.js
-   * script that pipes JSON-RPC messages into the package binary.
-   */
-  private async probeStdio(
-    instance: McpInstance,
-    server: McpServer,
-    healthCheck: HealthCheckSpec & { tool: string },
-    timeoutMs: number,
-  ): Promise<ProbeResult> {
-    if (!instance.containerId) {
-      return { healthy: false, latencyMs: 0, message: 'No container ID' };
-    }
-    const start = Date.now();
-    const packageName = server.packageName as string | null;
-    const command = server.command as string[] | null;
-
-    // Determine how to spawn the MCP server inside the container
-    let spawnCmd: string[];
-    if (packageName) {
-      spawnCmd = ['npx', '--prefer-offline', '-y', packageName];
-    } else if (command && command.length > 0) {
-      spawnCmd = command;
-    } else {
-      return { healthy: false, latencyMs: 0, message: 'No packageName or command for STDIO server' };
-    }
-
-    // Build JSON-RPC messages for the health probe
-    const initMsg = JSON.stringify({
-      jsonrpc: '2.0', id: 1, method: 'initialize',
-      params: {
-        protocolVersion: '2024-11-05',
-        capabilities: {},
-        clientInfo: { name: 'mcpctl-health', version: '0.0.1' },
-      },
-    });
-    const initializedMsg = JSON.stringify({
-      jsonrpc: '2.0', method: 'notifications/initialized',
-    });
-    const toolCallMsg = JSON.stringify({
-      jsonrpc: '2.0', id: 2, method: 'tools/call',
-      params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
-    });
-
-    // Use a Node.js inline script that:
-    // 1. Spawns the MCP server binary
-    // 2. Sends initialize + initialized + tool call via stdin
-    // 3. Reads responses from stdout
-    // 4. Exits with 0 if tool call succeeds, 1 if it fails
-    const spawnArgs = JSON.stringify(spawnCmd);
-    const probeScript = `
-      const { spawn } = require('child_process');
-      const args = ${spawnArgs};
-      const proc = spawn(args[0], args.slice(1), { stdio: ['pipe', 'pipe', 'pipe'] });
-      let output = '';
-      let responded = false;
-      proc.stdout.on('data', d => {
-        output += d;
-        const lines = output.split('\\n');
-        for (const line of lines) {
-          if (!line.trim()) continue;
-          try {
-            const msg = JSON.parse(line);
-            if (msg.id === 2) {
-              responded = true;
-              if (msg.error) {
-                process.stdout.write('ERROR:' + (msg.error.message || 'unknown'));
-                proc.kill();
-                process.exit(1);
-              } else {
-                process.stdout.write('OK');
-                proc.kill();
-                process.exit(0);
-              }
-            }
-          } catch {}
-        }
-        output = lines[lines.length - 1] || '';
-      });
-      proc.stderr.on('data', () => {});
-      proc.on('error', e => { process.stdout.write('ERROR:' + e.message); process.exit(1); });
-      proc.on('exit', (code) => { if (!responded) { process.stdout.write('ERROR:process exited ' + code); process.exit(1); } });
-      setTimeout(() => { if (!responded) { process.stdout.write('ERROR:timeout'); proc.kill(); process.exit(1); } }, ${timeoutMs - 2000});
-      proc.stdin.write(${JSON.stringify(initMsg)} + '\\n');
-      setTimeout(() => {
-        proc.stdin.write(${JSON.stringify(initializedMsg)} + '\\n');
-        setTimeout(() => {
-          proc.stdin.write(${JSON.stringify(toolCallMsg)} + '\\n');
-        }, 500);
-      }, 500);
-    `.trim();
-
-    try {
-      const result = await this.orchestrator.execInContainer(
-        instance.containerId,
-        ['node', '-e', probeScript],
-        { timeoutMs },
-      );
-      const latencyMs = Date.now() - start;
-      if (result.exitCode === 0 && result.stdout.includes('OK')) {
-        return { healthy: true, latencyMs, message: 'ok' };
-      }
-      // Extract error message
-      const errorMatch = result.stdout.match(/ERROR:(.*)/);
-      const errorMsg = errorMatch?.[1] ?? (result.stderr.trim() || `exit code ${result.exitCode}`);
-      return { healthy: false, latencyMs, message: errorMsg };
-    } catch (err) {
-      return {
-        healthy: false,
-        latencyMs: Date.now() - start,
-        message: err instanceof Error ? err.message : String(err),
-      };
-    }
-  }
+  // Note: a previous `probeStdio` implementation existed here that ran a
+  // disposable Node script inside the container via `docker exec`,
+  // re-spawning the package binary and piping JSON-RPC into it. It only
+  // worked for packageName-based servers (the spawn step required an
+  // npx-compatible package); image-based STDIO servers like
+  // gitea-mcp-server fell through with "No packageName or command" and
+  // were always reported unhealthy for the wrong reason. STDIO readiness
+  // now goes through `probeReadinessViaProxy` which calls the live
+  // running container — same code path as production traffic — and
  // surfaces the upstream error verbatim.
 }
```

`src/mcpd/src/services/instance.service.ts`

```diff
@@ -1,4 +1,4 @@
-import type { McpInstance } from '@prisma/client';
+import type { McpInstance, McpServer } from '@prisma/client';
 import type { IMcpInstanceRepository, IMcpServerRepository } from '../repositories/interfaces.js';
 import type { McpOrchestrator, ContainerSpec, ContainerInfo } from './orchestrator.js';
 import { NotFoundError } from './mcp-server.service.js';
@@ -13,6 +13,36 @@ const RUNNER_IMAGES: Record<string, string> = {
 /** Network for MCP server containers (matches docker-compose mcp-servers network). */
 const MCP_SERVERS_NETWORK = process.env['MCPD_MCP_NETWORK'] ?? 'mcp-servers';
 
+/**
+ * Backoff schedule for instance startup failures (env resolution, container
+ * creation, etc). Mirrors Kubernetes-style escalation: fast retries for
+ * transient hiccups, then a longer pause once it's clear something is
+ * persistently wrong.
+ *
+ * The retry state lives on `McpInstance.metadata` (no schema migration
+ * needed) and is preserved across reconcile cycles by the in-place
+ * `retryInstance` path so attemptCount actually accumulates.
+ */
+const FAST_RETRY_MS = 30_000;     // first 5 attempts: 30s apart
+const SLOW_RETRY_MS = 5 * 60_000; // afterwards: 5 minutes
+const MAX_FAST_RETRIES = 5;
+
+interface RetryMetadata {
+  error?: string;
+  attemptCount?: number;
+  lastAttemptAt?: string;
+  nextRetryAt?: string;
+  [k: string]: unknown;
+}
+
+function readRetryMeta(instance: McpInstance): RetryMetadata {
+  return (instance.metadata ?? {}) as RetryMetadata;
+}
+
+function nextDelayMs(attemptCount: number): number {
+  return attemptCount <= MAX_FAST_RETRIES ? FAST_RETRY_MS : SLOW_RETRY_MS;
+}
+
 export class InvalidStateError extends Error {
   readonly statusCode = 409;
   constructor(message: string) {
@@ -118,8 +148,12 @@ export class InstanceService {
    * Reconcile ALL servers — the operator loop.
    *
    * For every server with replicas > 0, ensures the correct number of
-   * healthy instances exist. Cleans up ERROR instances and starts
-   * replacements. This is the core self-healing mechanism.
+   * healthy instances exist. ERROR instances are not blindly recreated:
+   * within their `nextRetryAt` window they're left alone (and counted
+   * against the replica budget so we don't churn replacements while one
+   * is in backoff); past their window they're retried in-place via
+   * `retryInstance` so attemptCount accumulates and backoff escalates
+   * correctly.
    */
   async reconcileAll(): Promise<{ reconciled: number; errors: string[] }> {
     await this.syncStatus();
@@ -128,6 +162,8 @@ export class InstanceService {
     let reconciled = 0;
     const errors: string[] = [];
 
+    const now = Date.now();
+
     for (const server of servers) {
       if (server.replicas <= 0) continue;
@@ -136,17 +172,38 @@ export class InstanceService {
         const active = instances.filter((i) => i.status === 'RUNNING' || i.status === 'STARTING');
         const errored = instances.filter((i) => i.status === 'ERROR');
 
-        // Clean up ERROR instances so they don't accumulate
+        // Partition ERROR instances by whether their backoff window has elapsed.
+        const dueForRetry: McpInstance[] = [];
+        const stillWaiting: McpInstance[] = [];
         for (const inst of errored) {
-          await this.removeOne(inst);
+          const meta = readRetryMeta(inst);
+          const ts = meta.nextRetryAt ? Date.parse(meta.nextRetryAt) : 0;
+          if (Number.isNaN(ts) || ts <= now) {
+            dueForRetry.push(inst);
+          } else {
+            stillWaiting.push(inst);
+          }
         }
 
-        // Scale up if needed
-        const toStart = server.replicas - active.length;
+        // Retry elapsed ones in-place. This preserves attemptCount across
+        // attempts so the 30s × 5 → 5min schedule actually escalates.
+        for (const inst of dueForRetry) {
+          await this.retryInstance(inst);
+        }
+
+        // Scale up only if we don't already have enough live attempts.
+        // Live attempts = currently-running OR -starting + still-waiting
+        // (in backoff) + just-retried (now STARTING via retryInstance).
+        // Counting waiting + retried against the budget prevents tight
+        // create-fail-create churn while previous attempts work through
+        // their backoff schedule.
+        const toStart = server.replicas - active.length - stillWaiting.length - dueForRetry.length;
         if (toStart > 0) {
           for (let i = 0; i < toStart; i++) {
             await this.startOne(server.id);
           }
+        }
+
+        if (toStart > 0 || dueForRetry.length > 0) {
           reconciled++;
         }
       } catch (err) {
@@ -220,7 +277,12 @@ export class InstanceService {
     return this.orchestrator.getContainerLogs(instance.containerId, opts);
   }
 
-  /** Start a single instance for a server. */
+  /**
+   * Start a single instance for a server. Creates a fresh `STARTING` row
+   * and hands off to `attemptStart` for the env+container work. On
+   * failure, `attemptStart` marks the row `ERROR` with a backoff-aware
+   * `nextRetryAt`; the reconciler picks it up later via `retryInstance`.
+   */
   private async startOne(serverId: string): Promise<McpInstance> {
     const server = await this.serverRepo.findById(serverId);
     if (!server) throw new NotFoundError(`McpServer '${serverId}' not found`);
@@ -234,6 +296,49 @@ export class InstanceService {
       });
     }
 
+    const instance = await this.instanceRepo.create({
+      serverId,
+      status: 'STARTING',
+    });
+    return this.attemptStart(instance, server);
+  }
+
+  /**
+   * Re-attempt a previously-errored instance in place, preserving its
+   * `attemptCount` so the backoff schedule escalates correctly. Called
+   * by `reconcileAll` for ERROR instances whose `nextRetryAt` has elapsed.
+   */
+  private async retryInstance(instance: McpInstance): Promise<McpInstance> {
+    const server = await this.serverRepo.findById(instance.serverId);
+    if (!server) {
+      // Server was deleted underneath us — nothing to retry against.
+      return this.markInstanceError(instance, 'Server no longer exists');
+    }
+    if (server.externalUrl) {
+      // External servers don't need a container; the URL is the contract.
+      return this.instanceRepo.updateStatus(instance.id, 'RUNNING', {
+        metadata: { external: true, url: server.externalUrl },
+      });
+    }
+    // Reset transient fields but keep retry counters via the metadata
+    // passed through `attemptStart` → `markInstanceError`.
+    await this.instanceRepo.updateStatus(instance.id, 'STARTING', {});
+    const refreshed = (await this.instanceRepo.findById(instance.id)) ?? instance;
+    return this.attemptStart(refreshed, server);
+  }
+
+  /**
+   * Run the env-resolution + container-creation steps for a STARTING
+   * instance. On any failure, mark the instance `ERROR` with structured
+   * retry metadata. Used by both initial start (`startOne`) and retry
+   * (`retryInstance`).
+   */
+  private async attemptStart(
+    instance: McpInstance,
+    server: McpServer,
+  ): Promise<McpInstance> {
     // Determine image + command based on server config:
     // 1. Explicit dockerImage → use as-is
     // 2. packageName → use runtime-specific runner image (node/python/go/...)
@@ -253,11 +358,6 @@ export class InstanceService {
       image = server.name;
     }
 
-    let instance = await this.instanceRepo.create({
-      serverId,
-      status: 'STARTING',
-    });
-
     try {
       const spec: ContainerSpec = {
         image,
@@ -265,7 +365,7 @@ export class InstanceService {
         hostPort: null,
         network: MCP_SERVERS_NETWORK,
         labels: {
-          'mcpctl.server-id': serverId,
+          'mcpctl.server-id': server.id,
          'mcpctl.instance-id': instance.id,
         },
       };
@@ -283,7 +383,17 @@ export class InstanceService {
       }
     }
 
-    // Resolve env vars from inline values and secret refs
+    // Resolve env vars from inline values and secret refs.
+    //
+    // Failure here is FATAL for the start attempt: a container that
+    // boots without its declared secrets will silently mis-behave (we
+    // saw this with gitea-mcp-server starting up with an empty
+    // GITEA_ACCESS_TOKEN when OpenBao was unreachable, then reporting
+    // "healthy" while every authed call failed). Marking the instance
+    // ERROR with a backoff-aware nextRetryAt is honest; the reconciler
+    // will retry it in-place on the next tick whose nextRetryAt has
+    // elapsed. Optional/missing env vars should be modeled as `value: ""`
+    // entries on the server, not as silent secret-resolution failures.
     if (this.secretResolver) {
       try {
        const resolvedEnv = await resolveServerEnv(server, this.secretResolver);
@@ -291,8 +401,8 @@ export class InstanceService {
           spec.env = resolvedEnv;
         }
       } catch (envErr) {
-        // Log but don't prevent startup — env resolution failures are non-fatal
-        // The container may still work if env vars are optional
+        const msg = envErr instanceof Error ? envErr.message : String(envErr);
+        return this.markInstanceError(instance, `secret resolution failed: ${msg}`);
       }
     }
@@ -313,14 +423,39 @@ export class InstanceService {
       }
 
       // Set STARTING — syncStatus will promote to RUNNING once the container is actually ready
-      instance = await this.instanceRepo.updateStatus(instance.id, 'STARTING', updateFields);
+      return this.instanceRepo.updateStatus(instance.id, 'STARTING', updateFields);
     } catch (err) {
-      instance = await this.instanceRepo.updateStatus(instance.id, 'ERROR', {
-        metadata: { error: err instanceof Error ? err.message : String(err) },
-      });
+      return this.markInstanceError(
+        instance,
+        err instanceof Error ? err.message : String(err),
+      );
     }
+  }
 
-    return instance;
+  /**
+   * Mark an instance ERROR with a backoff-aware retry schedule. The
+   * `attemptCount` accumulates across retries (preserved by
+   * `retryInstance` which reuses the same row), so the schedule
+   * actually escalates: 30s × 5 → 5min thereafter.
+   */
+  private async markInstanceError(
+    instance: McpInstance,
+    error: string,
+  ): Promise<McpInstance> {
+    const meta = readRetryMeta(instance);
+    const attemptCount = (typeof meta.attemptCount === 'number' ? meta.attemptCount : 0) + 1;
+    const delayMs = nextDelayMs(attemptCount);
+    const now = new Date();
+    const nextRetryAt = new Date(now.getTime() + delayMs).toISOString();
+    return this.instanceRepo.updateStatus(instance.id, 'ERROR', {
+      metadata: {
+        ...meta,
+        error,
+        attemptCount,
+        lastAttemptAt: now.toISOString(),
+        nextRetryAt,
+      },
+    });
   }
 
   /** Stop and remove a single instance. */
```

`src/mcpd/tests/instance-service.test.ts`

```diff
@@ -334,20 +334,93 @@ describe('InstanceService', () => {
     expect(instanceRepo.create).not.toHaveBeenCalled();
   });
 
-  it('cleans up ERROR instances and creates replacements', async () => {
+  it('retries ERROR instances in-place when their backoff has elapsed (no delete, no new row)', async () => {
     const server = makeServer({ id: 'srv-1', replicas: 1 });
     vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
 
+    // ERROR instance with no nextRetryAt → retry is due immediately.
     vi.mocked(instanceRepo.findAll).mockResolvedValue([
       makeInstance({ id: 'inst-dead', serverId: 'srv-1', status: 'ERROR', containerId: 'ctr-dead' }),
     ]);
 
     const result = await service.reconcileAll();
 
-    // Should delete ERROR instance and create a new one
+    // Retry-in-place semantics: don't delete the row, don't create a
+    // replacement. attemptCount needs to live on the same row so the
+    // backoff schedule can actually escalate.
+    expect(instanceRepo.delete).not.toHaveBeenCalled();
+    expect(instanceRepo.create).not.toHaveBeenCalled();
+    // retryInstance flips the row STARTING before attemptStart runs.
+    expect(instanceRepo.updateStatus).toHaveBeenCalledWith('inst-dead', 'STARTING', expect.anything());
     expect(result.reconciled).toBe(1);
-    expect(instanceRepo.delete).toHaveBeenCalledWith('inst-dead');
-    expect(instanceRepo.create).toHaveBeenCalled();
+  });
+
+  it('leaves ERROR instances alone while their nextRetryAt is in the future', async () => {
+    const server = makeServer({ id: 'srv-1', replicas: 1 });
+    vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
+    vi.mocked(serverRepo.findById).mockResolvedValue(server);
+
+    const futureRetry = new Date(Date.now() + 60_000).toISOString();
+    vi.mocked(instanceRepo.findAll).mockResolvedValue([
+      makeInstance({
+        id: 'inst-waiting',
+        serverId: 'srv-1',
+        status: 'ERROR',
+        metadata: { nextRetryAt: futureRetry, attemptCount: 2 },
+      }),
+    ]);
+
+    const result = await service.reconcileAll();
+
+    // Within the backoff window the reconciler must not delete the row,
+    // not retry it, and not spawn a replacement (counting it against
+    // the replica budget is what prevents tight create-fail-create churn).
+    expect(instanceRepo.delete).not.toHaveBeenCalled();
+    expect(instanceRepo.create).not.toHaveBeenCalled();
+    expect(orchestrator.createContainer).not.toHaveBeenCalled();
+    expect(result.reconciled).toBe(0);
+  });
+
+  it('escalates the backoff: attemptCount + nextRetryAt persist on retry failures', async () => {
+    const server = makeServer({ id: 'srv-1', replicas: 1 });
+    vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
+    vi.mocked(serverRepo.findById).mockResolvedValue(server);
+
+    // Fail container creation so attemptStart goes down the markInstanceError path.
+    vi.mocked(orchestrator.createContainer).mockRejectedValue(new Error('boom'));
+
+    // Existing ERROR instance with attemptCount=2 (so the next failure
+    // produces attemptCount=3, still inside the fast-retry window).
+    vi.mocked(instanceRepo.findAll).mockResolvedValue([
+      makeInstance({
+        id: 'inst-1',
+        serverId: 'srv-1',
+        status: 'ERROR',
+        metadata: { error: 'previous failure', attemptCount: 2, nextRetryAt: new Date(Date.now() - 1000).toISOString() },
+      }),
+    ]);
+    // retryInstance refreshes via findById; let it return the same row.
+    vi.mocked(instanceRepo.findById).mockImplementation(async () => makeInstance({
+      id: 'inst-1',
+      serverId: 'srv-1',
+      status: 'STARTING',
+      metadata: { error: 'previous failure', attemptCount: 2, nextRetryAt: new Date(Date.now() - 1000).toISOString() },
+    }));
+
+    await service.reconcileAll();
+
+    // Look at the last updateStatus call — it should be the ERROR transition
+    // with attemptCount bumped to 3.
+    const errorCalls = vi.mocked(instanceRepo.updateStatus).mock.calls.filter(
+      (c) => c[1] === 'ERROR',
+    );
+    expect(errorCalls.length).toBeGreaterThan(0);
+    const lastErrorCall = errorCalls[errorCalls.length - 1]!;
+    const meta = (lastErrorCall[2] as { metadata?: Record<string, unknown> } | undefined)?.metadata;
+    expect(meta).toBeDefined();
+    expect((meta as Record<string, unknown>)['attemptCount']).toBe(3);
+    expect((meta as Record<string, unknown>)['nextRetryAt']).toBeTypeOf('string');
+    // Reason should reference the boom we threw.
+    expect(String((meta as Record<string, unknown>)['error'])).toContain('boom');
   });
 
   it('reconciles multiple servers independently', async () => {
```

`src/mcpd/tests/services/health-probe.test.ts`

```diff
@@ -192,25 +192,28 @@ describe('HealthProbeRunner', () => {
     expect(serverRepo.findById).not.toHaveBeenCalled();
   });
 
-  it('probes STDIO instance with exec and marks healthy on success', async () => {
+  it('probes STDIO instance via mcpProxyService and marks healthy on success', async () => {
     const instance = makeInstance();
     const server = makeServer();
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0,
-      stdout: 'OK',
-      stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      result: { content: [{ type: 'text', text: 'ok' }] },
     });
 
     await runner.tick();
 
-    expect(orchestrator.execInContainer).toHaveBeenCalledWith(
-      'container-abc',
-      expect.arrayContaining(['node', '-e']),
-      expect.objectContaining({ timeoutMs: 10000 }),
-    );
+    // STDIO readiness now goes through the proxy (the live container),
+    // not via docker-exec into a synthetic spawn — see comment on
+    // probeReadinessViaProxy for why.
+    expect(orchestrator.execInContainer).not.toHaveBeenCalled();
+    expect(mcpProxyService.execute).toHaveBeenCalledWith({
+      serverId: 'srv-1',
+      method: 'tools/call',
+      params: { name: 'list_datasources', arguments: {} },
+    });
     expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
       'inst-1',
@@ -225,6 +228,57 @@ describe('HealthProbeRunner', () => {
     );
   });
 
+  it('marks unhealthy when proxy returns a JSON-RPC error (e.g. broken-secret auth failure)', async () => {
+    const instance = makeInstance();
+    const server = makeServer({
+      healthCheck: { tool: 'get_me', intervalSeconds: 0, failureThreshold: 1 } as McpServer['healthCheck'],
+    });
+    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
+    vi.mocked(serverRepo.findById).mockResolvedValue(server);
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      error: { code: -32603, message: 'token is required' },
+    });
+
+    await runner.tick();
+
+    expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
+      'inst-1',
+      'RUNNING',
+      expect.objectContaining({
+        healthStatus: 'unhealthy',
+        events: expect.arrayContaining([
+          expect.objectContaining({ type: 'Warning', message: expect.stringContaining('token is required') }),
+        ]),
+      }),
+    );
+  });
+
+  it('marks unhealthy when proxy returns a tool-level error in result.isError', async () => {
+    const instance = makeInstance();
+    const server = makeServer({
+      healthCheck: { tool: 'get_me', intervalSeconds: 0, failureThreshold: 1 } as McpServer['healthCheck'],
+    });
+    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
+    vi.mocked(serverRepo.findById).mockResolvedValue(server);
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      result: { isError: true, content: [{ type: 'text', text: 'auth failed: token is required' }] },
+    });
+
+    await runner.tick();
+
+    const events = vi.mocked(instanceRepo.updateStatus).mock.calls[0]?.[2]?.events as Array<{ message: string }> | undefined;
+    expect(events?.[events.length - 1]?.message).toContain('auth failed');
+    expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
+      'inst-1',
+      'RUNNING',
+      expect.objectContaining({ healthStatus: 'unhealthy' }),
+    );
+  });
+
   it('marks unhealthy after failureThreshold consecutive failures', async () => {
     const instance = makeInstance();
     const healthCheck: HealthCheckSpec = {
@@ -237,10 +291,9 @@ describe('HealthProbeRunner', () => {
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 1,
-      stdout: 'ERROR:connection refused',
-      stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      error: { code: -32603, message: 'connection refused' },
     });
 
     // First failure → degraded
@@ -274,15 +327,15 @@ describe('HealthProbeRunner', () => {
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
 
     // Two failures
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 1, stdout: 'ERROR:fail', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, error: { code: -32603, message: 'fail' },
     });
     await runner.tick();
     await runner.tick();
 
     // Then success — should reset to healthy
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0, stdout: 'OK', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
     });
     await runner.tick();
@@ -290,13 +343,16 @@ describe('HealthProbeRunner', () => {
     expect(lastCall?.[2]).toEqual(expect.objectContaining({ healthStatus: 'healthy' }));
   });
 
-  it('handles exec timeout as failure', async () => {
+  it('handles probe timeout as failure', async () => {
     const instance = makeInstance();
-    const server = makeServer();
+    const server = makeServer({
+      healthCheck: { tool: 'list_datasources', intervalSeconds: 0, timeoutSeconds: 0.05, failureThreshold: 3 } as unknown as McpServer['healthCheck'],
+    });
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockRejectedValue(new Error('Exec timed out after 10000ms'));
+    // Hang forever — the probe's internal deadline should fire instead.
+    vi.mocked(mcpProxyService.execute).mockImplementation(() => new Promise(() => { /* never resolves */ }));
 
     await runner.tick();
@@ -323,8 +379,8 @@ describe('HealthProbeRunner', () => {
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0, stdout: 'OK', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
     });
 
     await runner.tick();
@@ -343,17 +399,17 @@ describe('HealthProbeRunner', () => {
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0, stdout: 'OK', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
     });
 
     // First tick: should probe
     await runner.tick();
-    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
+    expect(mcpProxyService.execute).toHaveBeenCalledTimes(1);
 
     // Second tick immediately: should skip (300s interval not elapsed)
     await runner.tick();
-    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
+    expect(mcpProxyService.execute).toHaveBeenCalledTimes(1);
   });
 
   it('cleans up probe states for removed instances', async () => {
@@ -364,9 +420,12 @@ describe('HealthProbeRunner', () => {
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     vi.mocked(serverRepo.findById).mockResolvedValue(server);
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
+    });
 
     await runner.tick();
-    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
+    expect(mcpProxyService.execute).toHaveBeenCalledTimes(1);
 
     // Instance removed
     vi.mocked(instanceRepo.findAll).mockResolvedValue([]);
@@ -375,7 +434,7 @@ describe('HealthProbeRunner', () => {
     // Re-add same instance — should probe again (state was cleaned)
     vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
     await runner.tick();
-    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(2);
+    expect(mcpProxyService.execute).toHaveBeenCalledTimes(2);
   });
 
   it('skips STDIO instances without containerId', async () => {
@@ -397,8 +456,8 @@ describe('HealthProbeRunner', () => {
       arguments: {},
     };
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 0, stdout: 'OK', stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1, result: {},
     });
 
     const result = await runner.probeInstance(instance, server, healthCheck);
@@ -407,15 +466,14 @@ describe('HealthProbeRunner', () => {
     expect(result.message).toBe('ok');
   });
 
-  it('handles STDIO exec failure with error message', async () => {
+  it('surfaces upstream JSON-RPC error message verbatim', async () => {
     const instance = makeInstance();
     const server = makeServer();
     const healthCheck: HealthCheckSpec = { tool: 'list_datasources', arguments: {} };
-    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
-      exitCode: 1,
-      stdout: 'ERROR:ECONNREFUSED 10.0.0.1:3000',
-      stderr: '',
+    vi.mocked(mcpProxyService.execute).mockResolvedValue({
+      jsonrpc: '2.0', id: 1,
+      error: { code: -32603, message: 'ECONNREFUSED 10.0.0.1:3000' },
     });
 
     const result = await runner.probeInstance(instance, server, healthCheck);
```