feat: MCP health probe runner — periodic tool-call probes for instances
Implements Kubernetes-style liveness probes that call MCP tools defined in server healthCheck configs. For STDIO servers, it uses docker exec to spawn a disposable MCP client that sends initialize + tool call. For HTTP/SSE servers, it sends JSON-RPC directly.

- HealthProbeRunner service with configurable interval/threshold/timeout
- execInContainer added to orchestrator interface + Docker implementation
- Instance findById now includes server relation (fixes describe showing IDs)
- Events appended to instance (last 50), healthStatus tracked as healthy/degraded/unhealthy
- 12 unit tests covering probing, thresholds, intervals, cleanup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
355
src/mcpd/tests/services/health-probe.test.ts
Normal file
355
src/mcpd/tests/services/health-probe.test.ts
Normal file
@@ -0,0 +1,355 @@
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import { HealthProbeRunner } from '../../src/services/health-probe.service.js';
|
||||
import type { HealthCheckSpec } from '../../src/services/health-probe.service.js';
|
||||
import type { IMcpInstanceRepository, IMcpServerRepository } from '../../src/repositories/interfaces.js';
|
||||
import type { McpOrchestrator, ExecResult } from '../../src/services/orchestrator.js';
|
||||
import type { McpInstance, McpServer } from '@prisma/client';
|
||||
|
||||
/**
 * Builds an McpInstance fixture for the tests.
 *
 * The defaults describe a RUNNING instance of server `srv-1` in container
 * `container-abc`, with no health data and an empty event list.
 *
 * @param overrides - Fields that replace the defaults.
 * @returns The defaults merged with `overrides`.
 */
function makeInstance(overrides: Partial<McpInstance> = {}): McpInstance {
  const defaults = {
    id: 'inst-1',
    serverId: 'srv-1',
    status: 'RUNNING',
    containerId: 'container-abc',
    port: null,
    healthStatus: null,
    lastHealthCheck: null,
    events: [],
    metadata: {},
    version: 1,
    createdAt: new Date(),
    updatedAt: new Date(),
  } as McpInstance;

  // Overrides are spread last so they win over every default.
  return { ...defaults, ...overrides };
}
|
||||
|
||||
/**
 * Builds an McpServer fixture for the tests.
 *
 * The defaults describe a STDIO server named `my-grafana` whose healthCheck
 * calls the `list_datasources` tool (60s interval, 10s timeout, threshold 3).
 *
 * @param overrides - Fields that replace the defaults, e.g. `{ healthCheck: null }`.
 * @returns The defaults merged with `overrides`.
 */
function makeServer(overrides: Partial<McpServer> = {}): McpServer {
  // Created per call so no two fixtures share the same healthCheck object.
  const defaultHealthCheck = {
    tool: 'list_datasources',
    arguments: {},
    intervalSeconds: 60,
    timeoutSeconds: 10,
    failureThreshold: 3,
  };

  const defaults = {
    id: 'srv-1',
    name: 'my-grafana',
    transport: 'STDIO',
    packageName: '@leval/mcp-grafana',
    dockerImage: null,
    externalUrl: null,
    containerPort: null,
    repositoryUrl: null,
    description: null,
    command: null,
    env: [],
    replicas: 1,
    projectId: null,
    healthCheck: defaultHealthCheck,
    version: 1,
    createdAt: new Date(),
    updatedAt: new Date(),
  } as McpServer;

  // Overrides are spread last so they win over every default.
  return { ...defaults, ...overrides };
}
|
||||
|
||||
/**
 * Creates an instance repository in which every method is a vitest mock.
 *
 * Lookups resolve to "nothing found" (`[]` / `null`); `create` and
 * `updateStatus` resolve to a fixture built from their arguments.
 */
function mockInstanceRepo(): IMcpInstanceRepository {
  const repo: IMcpInstanceRepository = {
    // Queries: empty by default, tests override them with mockResolvedValue.
    findAll: vi.fn(async () => []),
    findById: vi.fn(async () => null),
    findByContainerId: vi.fn(async () => null),

    // Writes: echo the input back as a fixture instance.
    create: vi.fn(async (data) => makeInstance(data)),
    updateStatus: vi.fn(async (id, status, fields) => makeInstance({ id, status, ...fields })),
    delete: vi.fn(async () => {}),
  };

  return repo;
}
|
||||
|
||||
/**
 * Creates a server repository in which every method is a vitest mock.
 *
 * Lookups resolve to "nothing found" (`[]` / `null`); `create` and `update`
 * resolve to the default server fixture.
 */
function mockServerRepo(): IMcpServerRepository {
  const repo: IMcpServerRepository = {
    // Queries: empty by default, tests override them with mockResolvedValue.
    findAll: vi.fn(async () => []),
    findById: vi.fn(async () => null),
    findByName: vi.fn(async () => null),

    // Writes: always resolve to the default fixture.
    create: vi.fn(async () => makeServer()),
    update: vi.fn(async () => makeServer()),
    delete: vi.fn(async () => {}),
  };

  return repo;
}
|
||||
|
||||
/**
 * Creates an orchestrator in which every method is a vitest mock.
 *
 * By default `execInContainer` resolves to a successful exec (`exitCode: 0`,
 * stdout `OK`), `ping` resolves to `true`, and container create/inspect
 * resolve to a running container `c1`.
 */
function mockOrchestrator(): McpOrchestrator {
  // Shared shape for create/inspect; built per call so each result has its own Date.
  const runningContainer = () => ({
    containerId: 'c1',
    name: 'test',
    state: 'running' as const,
    createdAt: new Date(),
  });

  const orchestrator: McpOrchestrator = {
    pullImage: vi.fn(async () => {}),
    createContainer: vi.fn(async () => runningContainer()),
    stopContainer: vi.fn(async () => {}),
    removeContainer: vi.fn(async () => {}),
    inspectContainer: vi.fn(async () => runningContainer()),
    getContainerLogs: vi.fn(async () => ({ stdout: '', stderr: '' })),
    execInContainer: vi.fn(async () => ({ exitCode: 0, stdout: 'OK', stderr: '' })),
    ping: vi.fn(async () => true),
  };

  return orchestrator;
}
|
||||
|
||||
describe('HealthProbeRunner', () => {
|
||||
// Runner under test and its mocked collaborators. All four are rebuilt before
// every test, so each test starts with fresh mocks and a fresh runner.
let runner: HealthProbeRunner;
let orchestrator: McpOrchestrator;
let serverRepo: IMcpServerRepository;
let instanceRepo: IMcpInstanceRepository;

beforeEach(() => {
  orchestrator = mockOrchestrator();
  serverRepo = mockServerRepo();
  instanceRepo = mockInstanceRepo();
  runner = new HealthProbeRunner(instanceRepo, serverRepo, orchestrator);
});
|
||||
|
||||
// A server with `healthCheck: null` must lead to no exec and no status write.
it('skips instances without healthCheck config', async () => {
  vi.mocked(instanceRepo.findAll).mockResolvedValue([makeInstance()]);
  vi.mocked(serverRepo.findById).mockResolvedValue(makeServer({ healthCheck: null }));

  await runner.tick();

  expect(orchestrator.execInContainer).not.toHaveBeenCalled();
  expect(instanceRepo.updateStatus).not.toHaveBeenCalled();
});
|
||||
|
||||
// An instance in ERROR state must be dropped before its server is even looked up.
it('skips non-RUNNING instances', async () => {
  const erroredInstance = makeInstance({ status: 'ERROR' });
  vi.mocked(instanceRepo.findAll).mockResolvedValue([erroredInstance]);

  await runner.tick();

  expect(serverRepo.findById).not.toHaveBeenCalled();
});
|
||||
|
||||
// Happy path for a STDIO server: the probe is run via exec inside the instance's
// container, and a zero exit code is recorded as `healthy` with a Normal event.
it('probes STDIO instance with exec and marks healthy on success', async () => {
  vi.mocked(instanceRepo.findAll).mockResolvedValue([makeInstance()]);
  vi.mocked(serverRepo.findById).mockResolvedValue(makeServer());
  vi.mocked(orchestrator.execInContainer).mockResolvedValue({ exitCode: 0, stdout: 'OK', stderr: '' });

  await runner.tick();

  // The fixture's timeoutSeconds of 10 must reach the exec call as 10000 ms.
  const nodeCommand = expect.arrayContaining(['node', '-e']);
  const execOptions = expect.objectContaining({ timeoutMs: 10000 });
  expect(orchestrator.execInContainer).toHaveBeenCalledWith('container-abc', nodeCommand, execOptions);

  const passedEvent = expect.objectContaining({
    type: 'Normal',
    message: expect.stringContaining('passed'),
  });
  const healthyFields = expect.objectContaining({
    healthStatus: 'healthy',
    lastHealthCheck: expect.any(Date),
    events: expect.arrayContaining([passedEvent]),
  });
  expect(instanceRepo.updateStatus).toHaveBeenCalledWith('inst-1', 'RUNNING', healthyFields);
});
|
||||
|
||||
// With failureThreshold = 2, the first failed probe is reported as `degraded`
// and the second consecutive failure as `unhealthy`.
it('marks unhealthy after failureThreshold consecutive failures', async () => {
  const instance = makeInstance();
  const healthCheck: HealthCheckSpec = {
    tool: 'list_datasources',
    arguments: {},
    intervalSeconds: 0, // always due
    failureThreshold: 2,
  };
  // Cast to the real column type. The previous `as unknown as undefined` claimed
  // the spec was `undefined`, which is false and is rejected for optional
  // properties under `exactOptionalPropertyTypes`.
  const server = makeServer({ healthCheck: healthCheck as unknown as McpServer['healthCheck'] });

  vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
  vi.mocked(serverRepo.findById).mockResolvedValue(server);
  vi.mocked(orchestrator.execInContainer).mockResolvedValue({
    exitCode: 1,
    stdout: 'ERROR:connection refused',
    stderr: '',
  });

  // First failure → degraded
  await runner.tick();
  expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
    'inst-1',
    'RUNNING',
    expect.objectContaining({ healthStatus: 'degraded' }),
  );

  // Second failure → unhealthy (threshold = 2)
  await runner.tick();
  expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
    'inst-1',
    'RUNNING',
    expect.objectContaining({ healthStatus: 'unhealthy' }),
  );
});
|
||||
|
||||
// Two failures (below the threshold of 3) followed by one success: the last
// status write must report `healthy` again.
it('resets failure count on success', async () => {
  const instance = makeInstance();
  const healthCheck: HealthCheckSpec = {
    tool: 'list_datasources',
    arguments: {},
    intervalSeconds: 0, // always due
    failureThreshold: 3,
  };
  // Cast to the real column type. The previous `as unknown as undefined` claimed
  // the spec was `undefined`, which is false and is rejected for optional
  // properties under `exactOptionalPropertyTypes`.
  const server = makeServer({ healthCheck: healthCheck as unknown as McpServer['healthCheck'] });

  vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
  vi.mocked(serverRepo.findById).mockResolvedValue(server);

  // Two failures
  vi.mocked(orchestrator.execInContainer).mockResolvedValue({
    exitCode: 1, stdout: 'ERROR:fail', stderr: '',
  });
  await runner.tick();
  await runner.tick();

  // Then success — should reset to healthy
  vi.mocked(orchestrator.execInContainer).mockResolvedValue({
    exitCode: 0, stdout: 'OK', stderr: '',
  });
  await runner.tick();

  const lastCall = vi.mocked(instanceRepo.updateStatus).mock.calls.at(-1);
  expect(lastCall?.[2]).toEqual(expect.objectContaining({ healthStatus: 'healthy' }));
});
|
||||
|
||||
// A rejected exec (here: a timeout error) must be recorded as a failed probe,
// not thrown out of tick(): status `degraded` plus a Warning event with the error text.
it('handles exec timeout as failure', async () => {
  const timeoutError = new Error('Exec timed out after 10000ms');

  vi.mocked(instanceRepo.findAll).mockResolvedValue([makeInstance()]);
  vi.mocked(serverRepo.findById).mockResolvedValue(makeServer());
  vi.mocked(orchestrator.execInContainer).mockRejectedValue(timeoutError);

  await runner.tick();

  const timeoutEvent = expect.objectContaining({
    type: 'Warning',
    message: expect.stringContaining('timed out'),
  });
  const degradedFields = expect.objectContaining({
    healthStatus: 'degraded',
    events: expect.arrayContaining([timeoutEvent]),
  });
  expect(instanceRepo.updateStatus).toHaveBeenCalledWith('inst-1', 'RUNNING', degradedFields);
});
|
||||
|
||||
// An instance that already carries one event must end up with two after a
// successful probe: the old one first, the new "passed" event after it.
it('appends events without losing history', async () => {
  const instance = makeInstance({
    events: [{ timestamp: '2025-01-01T00:00:00Z', type: 'Normal', message: 'old event' }],
  });
  const server = makeServer({
    healthCheck: { tool: 'test', intervalSeconds: 0 } as McpServer['healthCheck'],
  });

  vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
  vi.mocked(serverRepo.findById).mockResolvedValue(server);
  vi.mocked(orchestrator.execInContainer).mockResolvedValue({ exitCode: 0, stdout: 'OK', stderr: '' });

  await runner.tick();

  // Inspect the `events` field of the first status write.
  const written = vi.mocked(instanceRepo.updateStatus).mock.calls[0]?.[2]?.events as unknown[];
  expect(written).toHaveLength(2);

  const messages = written.map((event) => (event as { message: string }).message);
  expect(messages[0]).toBe('old event');
  expect(messages[1]).toContain('passed');
});
|
||||
|
||||
// With a 300s interval, two back-to-back ticks must produce exactly one probe.
it('respects interval — skips probing if not due', async () => {
  const slowCheck = { tool: 'test', intervalSeconds: 300 } as McpServer['healthCheck'];

  vi.mocked(instanceRepo.findAll).mockResolvedValue([makeInstance()]);
  vi.mocked(serverRepo.findById).mockResolvedValue(makeServer({ healthCheck: slowCheck }));
  vi.mocked(orchestrator.execInContainer).mockResolvedValue({ exitCode: 0, stdout: 'OK', stderr: '' });

  // First tick: should probe
  await runner.tick();
  expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);

  // Second tick immediately: should skip (300s interval not elapsed)
  await runner.tick();
  expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
});
|
||||
|
||||
// An instance that disappears from the repository and later comes back must be
// probed again immediately, i.e. its earlier probe state must have been dropped.
it('cleans up probe states for removed instances', async () => {
  const instance = makeInstance();
  const server = makeServer({
    // Deliberately a long interval. With `intervalSeconds: 0` the instance is
    // always due, so the final tick would probe even if stale state survived and
    // the test could never fail. With 300s, the second probe below can only
    // happen if the state recorded by the first probe was removed.
    healthCheck: { tool: 'test', intervalSeconds: 300 } as McpServer['healthCheck'],
  });

  vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
  vi.mocked(serverRepo.findById).mockResolvedValue(server);

  await runner.tick();
  expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);

  // Instance removed
  vi.mocked(instanceRepo.findAll).mockResolvedValue([]);
  await runner.tick();

  // Re-add same instance — should probe again (state was cleaned)
  vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
  await runner.tick();
  expect(orchestrator.execInContainer).toHaveBeenCalledTimes(2);
});
|
||||
|
||||
// A RUNNING instance with no container cannot be probed: the server must not be
// looked up and nothing must be exec'd.
it('skips STDIO instances without containerId', async () => {
  // containerId is null, but status is RUNNING — shouldn't be probed
  const instance = makeInstance({ containerId: null });
  vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);

  await runner.tick();

  expect(serverRepo.findById).not.toHaveBeenCalled();
  expect(orchestrator.execInContainer).not.toHaveBeenCalled();
});
|
||||
|
||||
// probeInstance can be called on its own (without tick) and reports the outcome
// of a successful exec as { healthy: true, message: 'ok' } plus a latency.
it('probeInstance returns result directly', async () => {
  const spec: HealthCheckSpec = { tool: 'list_datasources', arguments: {} };
  vi.mocked(orchestrator.execInContainer).mockResolvedValue({ exitCode: 0, stdout: 'OK', stderr: '' });

  const outcome = await runner.probeInstance(makeInstance(), makeServer(), spec);

  expect(outcome.healthy).toBe(true);
  expect(outcome.latencyMs).toBeGreaterThanOrEqual(0);
  expect(outcome.message).toBe('ok');
});
|
||||
|
||||
// A non-zero exit whose stdout is `ERROR:<text>` must yield an unhealthy result
// whose message is `<text>` with the `ERROR:` prefix removed.
it('handles STDIO exec failure with error message', async () => {
  const spec: HealthCheckSpec = { tool: 'list_datasources', arguments: {} };
  vi.mocked(orchestrator.execInContainer).mockResolvedValue({
    exitCode: 1,
    stdout: 'ERROR:ECONNREFUSED 10.0.0.1:3000',
    stderr: '',
  });

  const outcome = await runner.probeInstance(makeInstance(), makeServer(), spec);

  expect(outcome.healthy).toBe(false);
  expect(outcome.message).toBe('ECONNREFUSED 10.0.0.1:3000');
});
|
||||
});
|
||||
Reference in New Issue
Block a user