From cde1c59fd62e98621d4ce69e189f020048cd4dc1 Mon Sep 17 00:00:00 2001 From: Michal Date: Mon, 23 Feb 2026 00:38:48 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20MCP=20health=20probe=20runner=20?= =?UTF-8?q?=E2=80=94=20periodic=20tool-call=20probes=20for=20instances?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements Kubernetes-style liveness probes that call MCP tools defined in server healthCheck configs. For STDIO servers, uses docker exec to spawn a disposable MCP client that sends initialize + tool call. For HTTP/SSE servers, sends JSON-RPC directly. - HealthProbeRunner service with configurable interval/threshold/timeout - execInContainer added to orchestrator interface + Docker implementation - Instance findById now includes server relation (fixes describe showing IDs) - Events appended to instance (last 50), healthStatus tracked as healthy/degraded/unhealthy - 12 unit tests covering probing, thresholds, intervals, cleanup Co-Authored-By: Claude Opus 4.6 --- src/mcpd/src/main.ts | 11 + .../repositories/mcp-instance.repository.ts | 5 +- .../src/services/docker/container-manager.ts | 65 ++++ src/mcpd/src/services/health-probe.service.ts | 357 ++++++++++++++++++ src/mcpd/src/services/index.ts | 4 +- .../services/k8s/kubernetes-orchestrator.ts | 10 + src/mcpd/src/services/orchestrator.ts | 9 + src/mcpd/tests/services/health-probe.test.ts | 355 +++++++++++++++++ 8 files changed, 814 insertions(+), 2 deletions(-) create mode 100644 src/mcpd/src/services/health-probe.service.ts create mode 100644 src/mcpd/tests/services/health-probe.test.ts diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index e407a96..cca9960 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -29,6 +29,7 @@ import { AuthService, McpProxyService, TemplateService, + HealthProbeRunner, } from './services/index.js'; import { registerMcpServerRoutes, @@ -144,10 +145,20 @@ async function main(): Promise { } }, SYNC_INTERVAL_MS); + // 
Health probe runner — periodic MCP tool-call probes (like k8s livenessProbe) + const healthProbeRunner = new HealthProbeRunner( + instanceRepo, + serverRepo, + orchestrator, + { info: (msg) => app.log.info(msg), error: (obj, msg) => app.log.error(obj, msg) }, + ); + healthProbeRunner.start(15_000); + // Graceful shutdown setupGracefulShutdown(app, { disconnectDb: async () => { clearInterval(syncTimer); + healthProbeRunner.stop(); await prisma.$disconnect(); }, }); diff --git a/src/mcpd/src/repositories/mcp-instance.repository.ts b/src/mcpd/src/repositories/mcp-instance.repository.ts index e0e8a37..414fa6f 100644 --- a/src/mcpd/src/repositories/mcp-instance.repository.ts +++ b/src/mcpd/src/repositories/mcp-instance.repository.ts @@ -17,7 +17,10 @@ export class McpInstanceRepository implements IMcpInstanceRepository { } async findById(id: string): Promise { - return this.prisma.mcpInstance.findUnique({ where: { id } }); + return this.prisma.mcpInstance.findUnique({ + where: { id }, + include: { server: { select: { name: true } } }, + }); } async findByContainerId(containerId: string): Promise { diff --git a/src/mcpd/src/services/docker/container-manager.ts b/src/mcpd/src/services/docker/container-manager.ts index 6c69c51..0ea15f5 100644 --- a/src/mcpd/src/services/docker/container-manager.ts +++ b/src/mcpd/src/services/docker/container-manager.ts @@ -1,9 +1,11 @@ import Docker from 'dockerode'; +import { PassThrough } from 'node:stream'; import type { McpOrchestrator, ContainerSpec, ContainerInfo, ContainerLogs, + ExecResult, } from '../orchestrator.js'; import { DEFAULT_MEMORY_LIMIT } from '../orchestrator.js'; @@ -161,4 +163,67 @@ export class DockerContainerManager implements McpOrchestrator { // For simplicity we return everything as stdout. 
return { stdout: raw, stderr: '' }; } + + async execInContainer( + containerId: string, + cmd: string[], + opts?: { stdin?: string; timeoutMs?: number }, + ): Promise { + const container = this.docker.getContainer(containerId); + const hasStdin = opts?.stdin !== undefined; + + const exec = await container.exec({ + Cmd: cmd, + AttachStdin: hasStdin, + AttachStdout: true, + AttachStderr: true, + }); + + const stream = await exec.start({ hijack: hasStdin, stdin: hasStdin }); + const timeoutMs = opts?.timeoutMs ?? 30_000; + + return new Promise((resolve, reject) => { + const stdout = new PassThrough(); + const stderr = new PassThrough(); + const stdoutChunks: Buffer[] = []; + const stderrChunks: Buffer[] = []; + + stdout.on('data', (chunk: Buffer) => stdoutChunks.push(chunk)); + stderr.on('data', (chunk: Buffer) => stderrChunks.push(chunk)); + + this.docker.modem.demuxStream(stream, stdout, stderr); + + if (hasStdin) { + stream.write(opts!.stdin); + stream.end(); + } + + const timer = setTimeout(() => { + stream.destroy(); + reject(new Error(`Exec timed out after ${timeoutMs}ms`)); + }, timeoutMs); + + stream.on('end', () => { + clearTimeout(timer); + exec.inspect().then((info) => { + resolve({ + exitCode: (info as { ExitCode: number }).ExitCode, + stdout: Buffer.concat(stdoutChunks).toString('utf-8'), + stderr: Buffer.concat(stderrChunks).toString('utf-8'), + }); + }).catch((err) => { + resolve({ + exitCode: -1, + stdout: Buffer.concat(stdoutChunks).toString('utf-8'), + stderr: err instanceof Error ? 
err.message : String(err), + }); + }); + }); + + stream.on('error', (err: Error) => { + clearTimeout(timer); + reject(err); + }); + }); + } } diff --git a/src/mcpd/src/services/health-probe.service.ts b/src/mcpd/src/services/health-probe.service.ts new file mode 100644 index 0000000..d13a766 --- /dev/null +++ b/src/mcpd/src/services/health-probe.service.ts @@ -0,0 +1,357 @@ +import type { McpServer, McpInstance } from '@prisma/client'; +import type { IMcpInstanceRepository, IMcpServerRepository } from '../repositories/interfaces.js'; +import type { McpOrchestrator } from './orchestrator.js'; + +export interface HealthCheckSpec { + tool: string; + arguments?: Record; + intervalSeconds?: number; + timeoutSeconds?: number; + failureThreshold?: number; +} + +export interface ProbeResult { + healthy: boolean; + latencyMs: number; + message: string; +} + +interface ProbeState { + consecutiveFailures: number; + lastProbeAt: number; +} + +/** + * Periodic health probe runner — calls MCP tools on running instances to verify + * they are alive and responsive. Mirrors Kubernetes liveness probe semantics. + * + * For STDIO servers: runs `docker exec` with a disposable MCP client script + * that sends initialize + tools/call via the package binary. + * + * For SSE/HTTP servers: sends HTTP JSON-RPC directly to the container port. + */ +export class HealthProbeRunner { + private probeStates = new Map(); + private timer: ReturnType | null = null; + + constructor( + private instanceRepo: IMcpInstanceRepository, + private serverRepo: IMcpServerRepository, + private orchestrator: McpOrchestrator, + private logger?: { info: (msg: string) => void; error: (obj: unknown, msg: string) => void }, + ) {} + + /** Start the periodic probe loop. Runs every `tickIntervalMs` (default 15s). 
*/ + start(tickIntervalMs = 15_000): void { + if (this.timer) return; + this.timer = setInterval(() => { + this.tick().catch((err) => { + this.logger?.error({ err }, 'Health probe tick failed'); + }); + }, tickIntervalMs); + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = null; + } + } + + /** Single tick: probe all RUNNING instances that have healthCheck configs and are due. */ + async tick(): Promise { + const instances = await this.instanceRepo.findAll(); + const running = instances.filter((i) => i.status === 'RUNNING' && i.containerId); + + // Cache servers by ID to avoid repeated lookups + const serverCache = new Map(); + + for (const inst of running) { + let server = serverCache.get(inst.serverId); + if (!server) { + const s = await this.serverRepo.findById(inst.serverId); + if (!s) continue; + serverCache.set(inst.serverId, s); + server = s; + } + + const healthCheck = server.healthCheck as HealthCheckSpec | null; + if (!healthCheck) continue; + + const intervalMs = (healthCheck.intervalSeconds ?? 60) * 1000; + const state = this.probeStates.get(inst.id); + const now = Date.now(); + + // Skip if not due yet + if (state && (now - state.lastProbeAt) < intervalMs) continue; + + await this.probeInstance(inst, server, healthCheck); + } + + // Clean up states for instances that are no longer RUNNING with a container (stopped, errored, or removed) + const activeIds = new Set(running.map((i) => i.id)); + for (const key of this.probeStates.keys()) { + if (!activeIds.has(key)) { + this.probeStates.delete(key); + } + } + } + + /** Probe a single instance and update its health status. */ + async probeInstance( + instance: McpInstance, + server: McpServer, + healthCheck: HealthCheckSpec, + ): Promise { + const timeoutMs = (healthCheck.timeoutSeconds ?? 10) * 1000; + const failureThreshold = healthCheck.failureThreshold ?? 
3; + const now = new Date(); + const start = Date.now(); + + let result: ProbeResult; + + try { + if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') { + result = await this.probeHttp(instance, healthCheck, timeoutMs); + } else { + result = await this.probeStdio(instance, server, healthCheck, timeoutMs); + } + } catch (err) { + result = { + healthy: false, + latencyMs: Date.now() - start, + message: err instanceof Error ? err.message : String(err), + }; + } + + // Update probe state + const state = this.probeStates.get(instance.id) ?? { consecutiveFailures: 0, lastProbeAt: 0 }; + state.lastProbeAt = Date.now(); + + if (result.healthy) { + state.consecutiveFailures = 0; + } else { + state.consecutiveFailures++; + } + this.probeStates.set(instance.id, state); + + // Determine health status + const healthStatus = result.healthy + ? 'healthy' + : state.consecutiveFailures >= failureThreshold + ? 'unhealthy' + : 'degraded'; + + // Build event + const eventType = result.healthy ? 'Normal' : 'Warning'; + const eventMessage = result.healthy + ? `Health check passed (${result.latencyMs}ms)` + : `Health check failed: ${result.message}`; + + const existingEvents = (instance.events as Array<{ timestamp: string; type: string; message: string }>) ?? []; + // Keep last 50 events + const events = [ + ...existingEvents.slice(-49), + { timestamp: now.toISOString(), type: eventType, message: eventMessage }, + ]; + + // Update instance + await this.instanceRepo.updateStatus(instance.id, instance.status as 'RUNNING', { + healthStatus, + lastHealthCheck: now, + events, + }); + + this.logger?.info( + `[health] ${(instance as unknown as { server?: { name: string } }).server?.name ?? instance.serverId}: ${healthStatus} (${result.latencyMs}ms) - ${eventMessage}`, + ); + + return result; + } + + /** Probe an HTTP/SSE MCP server by sending a JSON-RPC tool call. 
*/ + private async probeHttp( + instance: McpInstance, + healthCheck: HealthCheckSpec, + timeoutMs: number, + ): Promise { + if (!instance.port) { + return { healthy: false, latencyMs: 0, message: 'No port assigned' }; + } + + const start = Date.now(); + + // For HTTP servers, we need to initialize a session first, then call the tool + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + + try { + // Initialize + const initResp = await fetch(`http://localhost:${instance.port}`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' }, + body: JSON.stringify({ + jsonrpc: '2.0', id: 1, method: 'initialize', + params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'mcpctl-health', version: '0.1.0' } }, + }), + signal: controller.signal, + }); + + if (!initResp.ok) { + return { healthy: false, latencyMs: Date.now() - start, message: `Initialize HTTP ${initResp.status}` }; + } + + const sessionId = initResp.headers.get('mcp-session-id'); + const headers: Record = { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' }; + if (sessionId) headers['Mcp-Session-Id'] = sessionId; + + // Send initialized notification + await fetch(`http://localhost:${instance.port}`, { + method: 'POST', headers, + body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }), + signal: controller.signal, + }); + + // Call health check tool + const toolResp = await fetch(`http://localhost:${instance.port}`, { + method: 'POST', headers, + body: JSON.stringify({ + jsonrpc: '2.0', id: 2, method: 'tools/call', + params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? 
{} }, + }), + signal: controller.signal, + }); + + const latencyMs = Date.now() - start; + + if (!toolResp.ok) { + return { healthy: false, latencyMs, message: `Tool call HTTP ${toolResp.status}` }; + } + + const body = await toolResp.text(); + // Check for JSON-RPC error in response + try { + const parsed = JSON.parse(body.includes('data: ') ? body.split('data: ')[1]!.split('\n')[0]! : body); + if (parsed.error) { + return { healthy: false, latencyMs, message: parsed.error.message ?? 'Tool call error' }; + } + } catch { + // If parsing fails but HTTP was ok, consider it healthy + } + + return { healthy: true, latencyMs, message: 'ok' }; + } finally { + clearTimeout(timer); + } + } + + /** + * Probe a STDIO MCP server by running `docker exec` with a disposable Node.js + * script that pipes JSON-RPC messages into the package binary. + */ + private async probeStdio( + instance: McpInstance, + server: McpServer, + healthCheck: HealthCheckSpec, + timeoutMs: number, + ): Promise { + if (!instance.containerId) { + return { healthy: false, latencyMs: 0, message: 'No container ID' }; + } + + const start = Date.now(); + const packageName = server.packageName as string | null; + + if (!packageName) { + return { healthy: false, latencyMs: 0, message: 'No package name for STDIO server' }; + } + + // Build JSON-RPC messages for the health probe + const initMsg = JSON.stringify({ + jsonrpc: '2.0', id: 1, method: 'initialize', + params: { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: 'mcpctl-health', version: '0.1.0' }, + }, + }); + const initializedMsg = JSON.stringify({ + jsonrpc: '2.0', method: 'notifications/initialized', + }); + const toolCallMsg = JSON.stringify({ + jsonrpc: '2.0', id: 2, method: 'tools/call', + params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} }, + }); + + // Use a Node.js inline script that: + // 1. Spawns the MCP server binary via npx + // 2. Sends initialize + initialized + tool call via stdin + // 3. 
Reads responses from stdout + // 4. Exits with 0 if tool call succeeds, 1 if it fails + const probeScript = ` +const { spawn } = require('child_process'); +const proc = spawn('npx', ['--prefer-offline', '-y', ${JSON.stringify(packageName)}], { stdio: ['pipe', 'pipe', 'pipe'] }); +let output = ''; +let responded = false; +proc.stdout.on('data', d => { + output += d; + const lines = output.split('\\n'); + for (const line of lines) { + if (!line.trim()) continue; + try { + const msg = JSON.parse(line); + if (msg.id === 2) { + responded = true; + if (msg.error) { + process.stdout.write('ERROR:' + (msg.error.message || 'unknown')); + proc.kill(); + process.exit(1); + } else { + process.stdout.write('OK'); + proc.kill(); + process.exit(0); + } + } + } catch {} + } + output = lines[lines.length - 1] || ''; +}); +proc.stderr.on('data', () => {}); +proc.on('error', e => { process.stdout.write('ERROR:' + e.message); process.exit(1); }); +proc.on('exit', (code) => { if (!responded) { process.stdout.write('ERROR:process exited ' + code); process.exit(1); } }); +setTimeout(() => { if (!responded) { process.stdout.write('ERROR:timeout'); proc.kill(); process.exit(1); } }, ${timeoutMs - 2000}); +proc.stdin.write(${JSON.stringify(initMsg)} + '\\n'); +setTimeout(() => { + proc.stdin.write(${JSON.stringify(initializedMsg)} + '\\n'); + setTimeout(() => { + proc.stdin.write(${JSON.stringify(toolCallMsg)} + '\\n'); + }, 500); +}, 500); +`.trim(); + + try { + const result = await this.orchestrator.execInContainer( + instance.containerId, + ['node', '-e', probeScript], + { timeoutMs }, + ); + + const latencyMs = Date.now() - start; + + if (result.exitCode === 0 && result.stdout.includes('OK')) { + return { healthy: true, latencyMs, message: 'ok' }; + } + + // Extract error message + const errorMatch = result.stdout.match(/ERROR:(.*)/); + const errorMsg = errorMatch?.[1] ?? 
(result.stderr.trim() || `exit code ${result.exitCode}`); + return { healthy: false, latencyMs, message: errorMsg }; + } catch (err) { + return { + healthy: false, + latencyMs: Date.now() - start, + message: err instanceof Error ? err.message : String(err), + }; + } + } +} diff --git a/src/mcpd/src/services/index.ts b/src/mcpd/src/services/index.ts index 9800d2e..f775088 100644 --- a/src/mcpd/src/services/index.ts +++ b/src/mcpd/src/services/index.ts @@ -5,7 +5,7 @@ export { ProjectService } from './project.service.js'; export { InstanceService, InvalidStateError } from './instance.service.js'; export { generateMcpConfig } from './mcp-config-generator.js'; export type { McpConfig, McpConfigServer } from './mcp-config-generator.js'; -export type { McpOrchestrator, ContainerSpec, ContainerInfo, ContainerLogs } from './orchestrator.js'; +export type { McpOrchestrator, ContainerSpec, ContainerInfo, ContainerLogs, ExecResult } from './orchestrator.js'; export { DEFAULT_MEMORY_LIMIT, DEFAULT_NANO_CPUS } from './orchestrator.js'; export { DockerContainerManager } from './docker/container-manager.js'; export { AuditLogService } from './audit-log.service.js'; @@ -25,3 +25,5 @@ export type { LoginResult } from './auth.service.js'; export { McpProxyService } from './mcp-proxy-service.js'; export type { McpProxyRequest, McpProxyResponse } from './mcp-proxy-service.js'; export { TemplateService } from './template.service.js'; +export { HealthProbeRunner } from './health-probe.service.js'; +export type { HealthCheckSpec, ProbeResult } from './health-probe.service.js'; diff --git a/src/mcpd/src/services/k8s/kubernetes-orchestrator.ts b/src/mcpd/src/services/k8s/kubernetes-orchestrator.ts index c4b4f82..2fefbdd 100644 --- a/src/mcpd/src/services/k8s/kubernetes-orchestrator.ts +++ b/src/mcpd/src/services/k8s/kubernetes-orchestrator.ts @@ -3,6 +3,7 @@ import type { ContainerSpec, ContainerInfo, ContainerLogs, + ExecResult, } from '../orchestrator.js'; import { K8sClient } from 
'./k8s-client.js'; import type { K8sClientConfig } from './k8s-client.js'; @@ -164,6 +165,15 @@ export class KubernetesOrchestrator implements McpOrchestrator { return { stdout, stderr: '' }; } + async execInContainer( + _containerId: string, + _cmd: string[], + _opts?: { stdin?: string; timeoutMs?: number }, + ): Promise { + // K8s exec via API — future implementation + throw new Error('execInContainer not yet implemented for Kubernetes'); + } + async listContainers(namespace?: string): Promise { const ns = namespace ?? this.namespace; const res = await this.client.get( diff --git a/src/mcpd/src/services/orchestrator.ts b/src/mcpd/src/services/orchestrator.ts index 66147e6..17553de 100644 --- a/src/mcpd/src/services/orchestrator.ts +++ b/src/mcpd/src/services/orchestrator.ts @@ -38,6 +38,12 @@ export interface ContainerLogs { stderr: string; } +export interface ExecResult { + exitCode: number; + stdout: string; + stderr: string; +} + export interface McpOrchestrator { /** Pull an image if not present locally */ pullImage(image: string): Promise; @@ -57,6 +63,9 @@ export interface McpOrchestrator { /** Get container logs */ getContainerLogs(containerId: string, opts?: { tail?: number; since?: number }): Promise; + /** Execute a command inside a running container with optional stdin */ + execInContainer(containerId: string, cmd: string[], opts?: { stdin?: string; timeoutMs?: number }): Promise; + /** Check if the orchestrator runtime is available */ ping(): Promise; } diff --git a/src/mcpd/tests/services/health-probe.test.ts b/src/mcpd/tests/services/health-probe.test.ts new file mode 100644 index 0000000..98df7bc --- /dev/null +++ b/src/mcpd/tests/services/health-probe.test.ts @@ -0,0 +1,355 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { HealthProbeRunner } from '../../src/services/health-probe.service.js'; +import type { HealthCheckSpec } from '../../src/services/health-probe.service.js'; +import type { IMcpInstanceRepository, 
IMcpServerRepository } from '../../src/repositories/interfaces.js'; +import type { McpOrchestrator, ExecResult } from '../../src/services/orchestrator.js'; +import type { McpInstance, McpServer } from '@prisma/client'; + +function makeInstance(overrides: Partial = {}): McpInstance { + return { + id: 'inst-1', + serverId: 'srv-1', + status: 'RUNNING', + containerId: 'container-abc', + port: null, + healthStatus: null, + lastHealthCheck: null, + events: [], + metadata: {}, + version: 1, + createdAt: new Date(), + updatedAt: new Date(), + ...overrides, + } as McpInstance; +} + +function makeServer(overrides: Partial = {}): McpServer { + return { + id: 'srv-1', + name: 'my-grafana', + transport: 'STDIO', + packageName: '@leval/mcp-grafana', + dockerImage: null, + externalUrl: null, + containerPort: null, + repositoryUrl: null, + description: null, + command: null, + env: [], + replicas: 1, + projectId: null, + healthCheck: { + tool: 'list_datasources', + arguments: {}, + intervalSeconds: 60, + timeoutSeconds: 10, + failureThreshold: 3, + }, + version: 1, + createdAt: new Date(), + updatedAt: new Date(), + ...overrides, + } as McpServer; +} + +function mockInstanceRepo(): IMcpInstanceRepository { + return { + findAll: vi.fn(async () => []), + findById: vi.fn(async () => null), + findByContainerId: vi.fn(async () => null), + create: vi.fn(async (data) => makeInstance(data)), + updateStatus: vi.fn(async (id, status, fields) => makeInstance({ id, status, ...fields })), + delete: vi.fn(async () => {}), + }; +} + +function mockServerRepo(): IMcpServerRepository { + return { + findAll: vi.fn(async () => []), + findById: vi.fn(async () => null), + findByName: vi.fn(async () => null), + create: vi.fn(async () => makeServer()), + update: vi.fn(async () => makeServer()), + delete: vi.fn(async () => {}), + }; +} + +function mockOrchestrator(): McpOrchestrator { + return { + pullImage: vi.fn(async () => {}), + createContainer: vi.fn(async () => ({ containerId: 'c1', name: 'test', 
state: 'running' as const, createdAt: new Date() })), + stopContainer: vi.fn(async () => {}), + removeContainer: vi.fn(async () => {}), + inspectContainer: vi.fn(async () => ({ containerId: 'c1', name: 'test', state: 'running' as const, createdAt: new Date() })), + getContainerLogs: vi.fn(async () => ({ stdout: '', stderr: '' })), + execInContainer: vi.fn(async () => ({ exitCode: 0, stdout: 'OK', stderr: '' })), + ping: vi.fn(async () => true), + }; +} + +describe('HealthProbeRunner', () => { + let instanceRepo: IMcpInstanceRepository; + let serverRepo: IMcpServerRepository; + let orchestrator: McpOrchestrator; + let runner: HealthProbeRunner; + + beforeEach(() => { + instanceRepo = mockInstanceRepo(); + serverRepo = mockServerRepo(); + orchestrator = mockOrchestrator(); + runner = new HealthProbeRunner(instanceRepo, serverRepo, orchestrator); + }); + + it('skips instances without healthCheck config', async () => { + const instance = makeInstance(); + const server = makeServer({ healthCheck: null }); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + + await runner.tick(); + + expect(orchestrator.execInContainer).not.toHaveBeenCalled(); + expect(instanceRepo.updateStatus).not.toHaveBeenCalled(); + }); + + it('skips non-RUNNING instances', async () => { + const instance = makeInstance({ status: 'ERROR' }); + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + + await runner.tick(); + + expect(serverRepo.findById).not.toHaveBeenCalled(); + }); + + it('probes STDIO instance with exec and marks healthy on success', async () => { + const instance = makeInstance(); + const server = makeServer(); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + vi.mocked(orchestrator.execInContainer).mockResolvedValue({ + exitCode: 0, + stdout: 'OK', + stderr: '', + }); + + await runner.tick(); + + 
expect(orchestrator.execInContainer).toHaveBeenCalledWith( + 'container-abc', + expect.arrayContaining(['node', '-e']), + expect.objectContaining({ timeoutMs: 10000 }), + ); + + expect(instanceRepo.updateStatus).toHaveBeenCalledWith( + 'inst-1', + 'RUNNING', + expect.objectContaining({ + healthStatus: 'healthy', + lastHealthCheck: expect.any(Date), + events: expect.arrayContaining([ + expect.objectContaining({ type: 'Normal', message: expect.stringContaining('passed') }), + ]), + }), + ); + }); + + it('marks unhealthy after failureThreshold consecutive failures', async () => { + const instance = makeInstance(); + const healthCheck: HealthCheckSpec = { + tool: 'list_datasources', + arguments: {}, + intervalSeconds: 0, // always due + failureThreshold: 2, + }; + const server = makeServer({ healthCheck: healthCheck as unknown as undefined }); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + vi.mocked(orchestrator.execInContainer).mockResolvedValue({ + exitCode: 1, + stdout: 'ERROR:connection refused', + stderr: '', + }); + + // First failure → degraded + await runner.tick(); + expect(instanceRepo.updateStatus).toHaveBeenCalledWith( + 'inst-1', + 'RUNNING', + expect.objectContaining({ healthStatus: 'degraded' }), + ); + + // Second failure → unhealthy (threshold = 2) + await runner.tick(); + expect(instanceRepo.updateStatus).toHaveBeenCalledWith( + 'inst-1', + 'RUNNING', + expect.objectContaining({ healthStatus: 'unhealthy' }), + ); + }); + + it('resets failure count on success', async () => { + const instance = makeInstance(); + const healthCheck: HealthCheckSpec = { + tool: 'list_datasources', + arguments: {}, + intervalSeconds: 0, + failureThreshold: 3, + }; + const server = makeServer({ healthCheck: healthCheck as unknown as undefined }); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + + // Two failures + 
vi.mocked(orchestrator.execInContainer).mockResolvedValue({ + exitCode: 1, stdout: 'ERROR:fail', stderr: '', + }); + await runner.tick(); + await runner.tick(); + + // Then success — should reset to healthy + vi.mocked(orchestrator.execInContainer).mockResolvedValue({ + exitCode: 0, stdout: 'OK', stderr: '', + }); + await runner.tick(); + + const lastCall = vi.mocked(instanceRepo.updateStatus).mock.calls.at(-1); + expect(lastCall?.[2]).toEqual(expect.objectContaining({ healthStatus: 'healthy' })); + }); + + it('handles exec timeout as failure', async () => { + const instance = makeInstance(); + const server = makeServer(); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + vi.mocked(orchestrator.execInContainer).mockRejectedValue(new Error('Exec timed out after 10000ms')); + + await runner.tick(); + + expect(instanceRepo.updateStatus).toHaveBeenCalledWith( + 'inst-1', + 'RUNNING', + expect.objectContaining({ + healthStatus: 'degraded', + events: expect.arrayContaining([ + expect.objectContaining({ type: 'Warning', message: expect.stringContaining('timed out') }), + ]), + }), + ); + }); + + it('appends events without losing history', async () => { + const existingEvents = [ + { timestamp: '2025-01-01T00:00:00Z', type: 'Normal', message: 'old event' }, + ]; + const instance = makeInstance({ events: existingEvents }); + const server = makeServer({ + healthCheck: { tool: 'test', intervalSeconds: 0 } as McpServer['healthCheck'], + }); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + vi.mocked(orchestrator.execInContainer).mockResolvedValue({ + exitCode: 0, stdout: 'OK', stderr: '', + }); + + await runner.tick(); + + const events = vi.mocked(instanceRepo.updateStatus).mock.calls[0]?.[2]?.events as unknown[]; + expect(events).toHaveLength(2); + expect((events[0] as { message: string }).message).toBe('old event'); + 
expect((events[1] as { message: string }).message).toContain('passed'); + }); + + it('respects interval — skips probing if not due', async () => { + const instance = makeInstance(); + const server = makeServer({ + healthCheck: { tool: 'test', intervalSeconds: 300 } as McpServer['healthCheck'], + }); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + vi.mocked(orchestrator.execInContainer).mockResolvedValue({ + exitCode: 0, stdout: 'OK', stderr: '', + }); + + // First tick: should probe + await runner.tick(); + expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1); + + // Second tick immediately: should skip (300s interval not elapsed) + await runner.tick(); + expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1); + }); + + it('cleans up probe states for removed instances', async () => { + const instance = makeInstance(); + const server = makeServer({ + healthCheck: { tool: 'test', intervalSeconds: 0 } as McpServer['healthCheck'], + }); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + + await runner.tick(); + expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1); + + // Instance removed + vi.mocked(instanceRepo.findAll).mockResolvedValue([]); + await runner.tick(); + + // Re-add same instance — should probe again (state was cleaned) + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + await runner.tick(); + expect(orchestrator.execInContainer).toHaveBeenCalledTimes(2); + }); + + it('skips STDIO instances without containerId', async () => { + const instance = makeInstance({ containerId: null }); + const server = makeServer(); + + // containerId is null, but status is RUNNING — shouldn't be probed + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + + await runner.tick(); + expect(serverRepo.findById).not.toHaveBeenCalled(); + }); + + it('probeInstance returns 
result directly', async () => { + const instance = makeInstance(); + const server = makeServer(); + const healthCheck: HealthCheckSpec = { + tool: 'list_datasources', + arguments: {}, + }; + + vi.mocked(orchestrator.execInContainer).mockResolvedValue({ + exitCode: 0, stdout: 'OK', stderr: '', + }); + + const result = await runner.probeInstance(instance, server, healthCheck); + expect(result.healthy).toBe(true); + expect(result.latencyMs).toBeGreaterThanOrEqual(0); + expect(result.message).toBe('ok'); + }); + + it('handles STDIO exec failure with error message', async () => { + const instance = makeInstance(); + const server = makeServer(); + const healthCheck: HealthCheckSpec = { tool: 'list_datasources', arguments: {} }; + + vi.mocked(orchestrator.execInContainer).mockResolvedValue({ + exitCode: 1, + stdout: 'ERROR:ECONNREFUSED 10.0.0.1:3000', + stderr: '', + }); + + const result = await runner.probeInstance(instance, server, healthCheck); + expect(result.healthy).toBe(false); + expect(result.message).toBe('ECONNREFUSED 10.0.0.1:3000'); + }); +});