Compare commits
9 Commits
feat/conta
...
fix/db-tes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3a6e58274c | ||
|
|
c819b65175 | ||
|
|
c3ef5a664f | ||
|
|
4c2927a16e | ||
| 79dd6e723d | |||
|
|
cde1c59fd6 | ||
| daa5860ed2 | |||
|
|
ecbf48dd49 | ||
| d38b5aac60 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -37,3 +37,4 @@ pgdata/
|
|||||||
|
|
||||||
# Prisma
|
# Prisma
|
||||||
src/db/prisma/migrations/*.sql.backup
|
src/db/prisma/migrations/*.sql.backup
|
||||||
|
logs.sh
|
||||||
|
|||||||
@@ -74,9 +74,10 @@ function formatServerDetail(server: Record<string, unknown>): string {
|
|||||||
|
|
||||||
function formatInstanceDetail(instance: Record<string, unknown>, inspect?: Record<string, unknown>): string {
|
function formatInstanceDetail(instance: Record<string, unknown>, inspect?: Record<string, unknown>): string {
|
||||||
const lines: string[] = [];
|
const lines: string[] = [];
|
||||||
lines.push(`=== Instance: ${instance.id} ===`);
|
const server = instance.server as { name: string } | undefined;
|
||||||
|
lines.push(`=== Instance: ${server?.name ?? instance.id} ===`);
|
||||||
lines.push(`${pad('Status:')}${instance.status}`);
|
lines.push(`${pad('Status:')}${instance.status}`);
|
||||||
lines.push(`${pad('Server ID:')}${instance.serverId}`);
|
lines.push(`${pad('Server:')}${server?.name ?? String(instance.serverId)}`);
|
||||||
lines.push(`${pad('Container ID:')}${instance.containerId ?? '-'}`);
|
lines.push(`${pad('Container ID:')}${instance.containerId ?? '-'}`);
|
||||||
lines.push(`${pad('Port:')}${instance.port ?? '-'}`);
|
lines.push(`${pad('Port:')}${instance.port ?? '-'}`);
|
||||||
|
|
||||||
@@ -277,11 +278,33 @@ export function createDescribeCommand(deps: DescribeCommandDeps): Command {
|
|||||||
|
|
||||||
// Resolve name → ID
|
// Resolve name → ID
|
||||||
let id: string;
|
let id: string;
|
||||||
|
if (resource === 'instances') {
|
||||||
|
// Instances: accept instance ID or server name (resolve to first running instance)
|
||||||
|
try {
|
||||||
|
id = await resolveNameOrId(deps.client, resource, idOrName);
|
||||||
|
} catch {
|
||||||
|
// Not an instance ID — try as server name
|
||||||
|
const servers = await deps.client.get<Array<{ id: string; name: string }>>('/api/v1/servers');
|
||||||
|
const server = servers.find((s) => s.name === idOrName || s.id === idOrName);
|
||||||
|
if (server) {
|
||||||
|
const instances = await deps.client.get<Array<{ id: string; status: string }>>(`/api/v1/instances?serverId=${server.id}`);
|
||||||
|
const running = instances.find((i) => i.status === 'RUNNING') ?? instances[0];
|
||||||
|
if (running) {
|
||||||
|
id = running.id;
|
||||||
|
} else {
|
||||||
|
throw new Error(`No instances found for server '${idOrName}'`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
id = idOrName;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
try {
|
try {
|
||||||
id = await resolveNameOrId(deps.client, resource, idOrName);
|
id = await resolveNameOrId(deps.client, resource, idOrName);
|
||||||
} catch {
|
} catch {
|
||||||
id = idOrName;
|
id = idOrName;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const item = await deps.fetchResource(resource, id) as Record<string, unknown>;
|
const item = await deps.fetchResource(resource, id) as Record<string, unknown>;
|
||||||
|
|
||||||
|
|||||||
@@ -139,4 +139,152 @@ describe('describe command', () => {
|
|||||||
expect(text).toContain('RUNNING');
|
expect(text).toContain('RUNNING');
|
||||||
expect(text).toContain('abc123');
|
expect(text).toContain('abc123');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('resolves server name to instance for describe instance', async () => {
|
||||||
|
const deps = makeDeps({
|
||||||
|
id: 'inst-1',
|
||||||
|
serverId: 'srv-1',
|
||||||
|
server: { name: 'my-grafana' },
|
||||||
|
status: 'RUNNING',
|
||||||
|
containerId: 'abc123',
|
||||||
|
port: 3000,
|
||||||
|
});
|
||||||
|
// resolveNameOrId will throw (not a CUID, name won't match instances)
|
||||||
|
vi.mocked(deps.client.get)
|
||||||
|
.mockResolvedValueOnce([] as never) // instances list (no name match)
|
||||||
|
.mockResolvedValueOnce([{ id: 'srv-1', name: 'my-grafana' }] as never) // servers list
|
||||||
|
.mockResolvedValueOnce([{ id: 'inst-1', status: 'RUNNING' }] as never); // instances for server
|
||||||
|
|
||||||
|
const cmd = createDescribeCommand(deps);
|
||||||
|
await cmd.parseAsync(['node', 'test', 'instance', 'my-grafana']);
|
||||||
|
|
||||||
|
expect(deps.fetchResource).toHaveBeenCalledWith('instances', 'inst-1');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('resolves server name and picks running instance over stopped', async () => {
|
||||||
|
const deps = makeDeps({
|
||||||
|
id: 'inst-2',
|
||||||
|
serverId: 'srv-1',
|
||||||
|
server: { name: 'my-ha' },
|
||||||
|
status: 'RUNNING',
|
||||||
|
containerId: 'def456',
|
||||||
|
});
|
||||||
|
vi.mocked(deps.client.get)
|
||||||
|
.mockResolvedValueOnce([] as never) // instances list
|
||||||
|
.mockResolvedValueOnce([{ id: 'srv-1', name: 'my-ha' }] as never)
|
||||||
|
.mockResolvedValueOnce([
|
||||||
|
{ id: 'inst-1', status: 'ERROR' },
|
||||||
|
{ id: 'inst-2', status: 'RUNNING' },
|
||||||
|
] as never);
|
||||||
|
|
||||||
|
const cmd = createDescribeCommand(deps);
|
||||||
|
await cmd.parseAsync(['node', 'test', 'instance', 'my-ha']);
|
||||||
|
|
||||||
|
expect(deps.fetchResource).toHaveBeenCalledWith('instances', 'inst-2');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('throws when no instances found for server name', async () => {
|
||||||
|
const deps = makeDeps();
|
||||||
|
vi.mocked(deps.client.get)
|
||||||
|
.mockResolvedValueOnce([] as never) // instances list
|
||||||
|
.mockResolvedValueOnce([{ id: 'srv-1', name: 'my-server' }] as never)
|
||||||
|
.mockResolvedValueOnce([] as never); // no instances
|
||||||
|
|
||||||
|
const cmd = createDescribeCommand(deps);
|
||||||
|
await expect(cmd.parseAsync(['node', 'test', 'instance', 'my-server'])).rejects.toThrow(
|
||||||
|
/No instances found/,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('shows instance with server name in header', async () => {
|
||||||
|
const deps = makeDeps({
|
||||||
|
id: 'inst-1',
|
||||||
|
serverId: 'srv-1',
|
||||||
|
server: { name: 'my-grafana' },
|
||||||
|
status: 'RUNNING',
|
||||||
|
containerId: 'abc123',
|
||||||
|
port: 3000,
|
||||||
|
});
|
||||||
|
const cmd = createDescribeCommand(deps);
|
||||||
|
await cmd.parseAsync(['node', 'test', 'instance', 'inst-1']);
|
||||||
|
|
||||||
|
const text = deps.output.join('\n');
|
||||||
|
expect(text).toContain('=== Instance: my-grafana ===');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('shows instance health and events', async () => {
|
||||||
|
const deps = makeDeps({
|
||||||
|
id: 'inst-1',
|
||||||
|
serverId: 'srv-1',
|
||||||
|
server: { name: 'my-grafana' },
|
||||||
|
status: 'RUNNING',
|
||||||
|
containerId: 'abc123',
|
||||||
|
healthStatus: 'healthy',
|
||||||
|
lastHealthCheck: '2025-01-15T10:30:00Z',
|
||||||
|
events: [
|
||||||
|
{ timestamp: '2025-01-15T10:30:00Z', type: 'Normal', message: 'Health check passed (45ms)' },
|
||||||
|
],
|
||||||
|
});
|
||||||
|
const cmd = createDescribeCommand(deps);
|
||||||
|
await cmd.parseAsync(['node', 'test', 'instance', 'inst-1']);
|
||||||
|
|
||||||
|
const text = deps.output.join('\n');
|
||||||
|
expect(text).toContain('Health:');
|
||||||
|
expect(text).toContain('healthy');
|
||||||
|
expect(text).toContain('Events:');
|
||||||
|
expect(text).toContain('Health check passed');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('shows server healthCheck section', async () => {
|
||||||
|
const deps = makeDeps({
|
||||||
|
id: 'srv-1',
|
||||||
|
name: 'my-grafana',
|
||||||
|
transport: 'STDIO',
|
||||||
|
healthCheck: {
|
||||||
|
tool: 'list_datasources',
|
||||||
|
arguments: {},
|
||||||
|
intervalSeconds: 60,
|
||||||
|
timeoutSeconds: 10,
|
||||||
|
failureThreshold: 3,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const cmd = createDescribeCommand(deps);
|
||||||
|
await cmd.parseAsync(['node', 'test', 'server', 'srv-1']);
|
||||||
|
|
||||||
|
const text = deps.output.join('\n');
|
||||||
|
expect(text).toContain('Health Check:');
|
||||||
|
expect(text).toContain('list_datasources');
|
||||||
|
expect(text).toContain('60s');
|
||||||
|
expect(text).toContain('Failure Threshold:');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('shows template detail with healthCheck and usage', async () => {
|
||||||
|
const deps = makeDeps({
|
||||||
|
id: 'tpl-1',
|
||||||
|
name: 'grafana',
|
||||||
|
transport: 'STDIO',
|
||||||
|
version: '1.0.0',
|
||||||
|
packageName: '@leval/mcp-grafana',
|
||||||
|
env: [
|
||||||
|
{ name: 'GRAFANA_URL', required: true, description: 'Grafana instance URL' },
|
||||||
|
],
|
||||||
|
healthCheck: {
|
||||||
|
tool: 'list_datasources',
|
||||||
|
arguments: {},
|
||||||
|
intervalSeconds: 60,
|
||||||
|
timeoutSeconds: 10,
|
||||||
|
failureThreshold: 3,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const cmd = createDescribeCommand(deps);
|
||||||
|
await cmd.parseAsync(['node', 'test', 'template', 'tpl-1']);
|
||||||
|
|
||||||
|
const text = deps.output.join('\n');
|
||||||
|
expect(text).toContain('=== Template: grafana ===');
|
||||||
|
expect(text).toContain('@leval/mcp-grafana');
|
||||||
|
expect(text).toContain('GRAFANA_URL');
|
||||||
|
expect(text).toContain('Health Check:');
|
||||||
|
expect(text).toContain('list_datasources');
|
||||||
|
expect(text).toContain('mcpctl create server my-grafana --from-template=grafana');
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ async function createUser(overrides: { email?: string; name?: string; role?: 'US
|
|||||||
data: {
|
data: {
|
||||||
email: overrides.email ?? `test-${Date.now()}@example.com`,
|
email: overrides.email ?? `test-${Date.now()}@example.com`,
|
||||||
name: overrides.name ?? 'Test User',
|
name: overrides.name ?? 'Test User',
|
||||||
|
passwordHash: '$2b$10$test-hash-placeholder',
|
||||||
role: overrides.role ?? 'USER',
|
role: overrides.role ?? 'USER',
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ import {
|
|||||||
AuthService,
|
AuthService,
|
||||||
McpProxyService,
|
McpProxyService,
|
||||||
TemplateService,
|
TemplateService,
|
||||||
|
HealthProbeRunner,
|
||||||
} from './services/index.js';
|
} from './services/index.js';
|
||||||
import {
|
import {
|
||||||
registerMcpServerRoutes,
|
registerMcpServerRoutes,
|
||||||
@@ -144,10 +145,20 @@ async function main(): Promise<void> {
|
|||||||
}
|
}
|
||||||
}, SYNC_INTERVAL_MS);
|
}, SYNC_INTERVAL_MS);
|
||||||
|
|
||||||
|
// Health probe runner — periodic MCP tool-call probes (like k8s livenessProbe)
|
||||||
|
const healthProbeRunner = new HealthProbeRunner(
|
||||||
|
instanceRepo,
|
||||||
|
serverRepo,
|
||||||
|
orchestrator,
|
||||||
|
{ info: (msg) => app.log.info(msg), error: (obj, msg) => app.log.error(obj, msg) },
|
||||||
|
);
|
||||||
|
healthProbeRunner.start(15_000);
|
||||||
|
|
||||||
// Graceful shutdown
|
// Graceful shutdown
|
||||||
setupGracefulShutdown(app, {
|
setupGracefulShutdown(app, {
|
||||||
disconnectDb: async () => {
|
disconnectDb: async () => {
|
||||||
clearInterval(syncTimer);
|
clearInterval(syncTimer);
|
||||||
|
healthProbeRunner.stop();
|
||||||
await prisma.$disconnect();
|
await prisma.$disconnect();
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -17,7 +17,10 @@ export class McpInstanceRepository implements IMcpInstanceRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async findById(id: string): Promise<McpInstance | null> {
|
async findById(id: string): Promise<McpInstance | null> {
|
||||||
return this.prisma.mcpInstance.findUnique({ where: { id } });
|
return this.prisma.mcpInstance.findUnique({
|
||||||
|
where: { id },
|
||||||
|
include: { server: { select: { name: true } } },
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async findByContainerId(containerId: string): Promise<McpInstance | null> {
|
async findByContainerId(containerId: string): Promise<McpInstance | null> {
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
import Docker from 'dockerode';
|
import Docker from 'dockerode';
|
||||||
|
import { PassThrough } from 'node:stream';
|
||||||
import type {
|
import type {
|
||||||
McpOrchestrator,
|
McpOrchestrator,
|
||||||
ContainerSpec,
|
ContainerSpec,
|
||||||
ContainerInfo,
|
ContainerInfo,
|
||||||
ContainerLogs,
|
ContainerLogs,
|
||||||
|
ExecResult,
|
||||||
} from '../orchestrator.js';
|
} from '../orchestrator.js';
|
||||||
import { DEFAULT_MEMORY_LIMIT } from '../orchestrator.js';
|
import { DEFAULT_MEMORY_LIMIT } from '../orchestrator.js';
|
||||||
|
|
||||||
@@ -80,6 +82,9 @@ export class DockerContainerManager implements McpOrchestrator {
|
|||||||
Env: envArr,
|
Env: envArr,
|
||||||
ExposedPorts: exposedPorts,
|
ExposedPorts: exposedPorts,
|
||||||
Labels: labels,
|
Labels: labels,
|
||||||
|
// Keep stdin open for STDIO MCP servers (they read from stdin)
|
||||||
|
OpenStdin: true,
|
||||||
|
StdinOnce: false,
|
||||||
HostConfig: {
|
HostConfig: {
|
||||||
PortBindings: portBindings,
|
PortBindings: portBindings,
|
||||||
Memory: memoryLimit,
|
Memory: memoryLimit,
|
||||||
@@ -133,6 +138,19 @@ export class DockerContainerManager implements McpOrchestrator {
|
|||||||
if (port !== undefined) {
|
if (port !== undefined) {
|
||||||
result.port = port;
|
result.port = port;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extract container IP from first non-default network
|
||||||
|
const networks = info.NetworkSettings?.Networks;
|
||||||
|
if (networks) {
|
||||||
|
for (const [, net] of Object.entries(networks)) {
|
||||||
|
const netInfo = net as { IPAddress?: string };
|
||||||
|
if (netInfo.IPAddress) {
|
||||||
|
result.ip = netInfo.IPAddress;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -158,4 +176,67 @@ export class DockerContainerManager implements McpOrchestrator {
|
|||||||
// For simplicity we return everything as stdout.
|
// For simplicity we return everything as stdout.
|
||||||
return { stdout: raw, stderr: '' };
|
return { stdout: raw, stderr: '' };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async execInContainer(
|
||||||
|
containerId: string,
|
||||||
|
cmd: string[],
|
||||||
|
opts?: { stdin?: string; timeoutMs?: number },
|
||||||
|
): Promise<ExecResult> {
|
||||||
|
const container = this.docker.getContainer(containerId);
|
||||||
|
const hasStdin = opts?.stdin !== undefined;
|
||||||
|
|
||||||
|
const exec = await container.exec({
|
||||||
|
Cmd: cmd,
|
||||||
|
AttachStdin: hasStdin,
|
||||||
|
AttachStdout: true,
|
||||||
|
AttachStderr: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
const stream = await exec.start({ hijack: hasStdin, stdin: hasStdin });
|
||||||
|
const timeoutMs = opts?.timeoutMs ?? 30_000;
|
||||||
|
|
||||||
|
return new Promise<ExecResult>((resolve, reject) => {
|
||||||
|
const stdout = new PassThrough();
|
||||||
|
const stderr = new PassThrough();
|
||||||
|
const stdoutChunks: Buffer[] = [];
|
||||||
|
const stderrChunks: Buffer[] = [];
|
||||||
|
|
||||||
|
stdout.on('data', (chunk: Buffer) => stdoutChunks.push(chunk));
|
||||||
|
stderr.on('data', (chunk: Buffer) => stderrChunks.push(chunk));
|
||||||
|
|
||||||
|
this.docker.modem.demuxStream(stream, stdout, stderr);
|
||||||
|
|
||||||
|
if (hasStdin) {
|
||||||
|
stream.write(opts!.stdin);
|
||||||
|
stream.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
stream.destroy();
|
||||||
|
reject(new Error(`Exec timed out after ${timeoutMs}ms`));
|
||||||
|
}, timeoutMs);
|
||||||
|
|
||||||
|
stream.on('end', () => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
exec.inspect().then((info) => {
|
||||||
|
resolve({
|
||||||
|
exitCode: (info as { ExitCode: number }).ExitCode,
|
||||||
|
stdout: Buffer.concat(stdoutChunks).toString('utf-8'),
|
||||||
|
stderr: Buffer.concat(stderrChunks).toString('utf-8'),
|
||||||
|
});
|
||||||
|
}).catch((err) => {
|
||||||
|
resolve({
|
||||||
|
exitCode: -1,
|
||||||
|
stdout: Buffer.concat(stdoutChunks).toString('utf-8'),
|
||||||
|
stderr: err instanceof Error ? err.message : String(err),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
stream.on('error', (err: Error) => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
reject(err);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
520
src/mcpd/src/services/health-probe.service.ts
Normal file
520
src/mcpd/src/services/health-probe.service.ts
Normal file
@@ -0,0 +1,520 @@
|
|||||||
|
import type { McpServer, McpInstance } from '@prisma/client';
|
||||||
|
import type { IMcpInstanceRepository, IMcpServerRepository } from '../repositories/interfaces.js';
|
||||||
|
import type { McpOrchestrator } from './orchestrator.js';
|
||||||
|
|
||||||
|
export interface HealthCheckSpec {
|
||||||
|
tool: string;
|
||||||
|
arguments?: Record<string, unknown>;
|
||||||
|
intervalSeconds?: number;
|
||||||
|
timeoutSeconds?: number;
|
||||||
|
failureThreshold?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ProbeResult {
|
||||||
|
healthy: boolean;
|
||||||
|
latencyMs: number;
|
||||||
|
message: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ProbeState {
|
||||||
|
consecutiveFailures: number;
|
||||||
|
lastProbeAt: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Periodic health probe runner — calls MCP tools on running instances to verify
|
||||||
|
* they are alive and responsive. Mirrors Kubernetes liveness probe semantics.
|
||||||
|
*
|
||||||
|
* For STDIO servers: runs `docker exec` with a disposable MCP client script
|
||||||
|
* that sends initialize + tool/call via the package binary.
|
||||||
|
*
|
||||||
|
* For SSE/HTTP servers: sends HTTP JSON-RPC directly to the container port.
|
||||||
|
*/
|
||||||
|
export class HealthProbeRunner {
|
||||||
|
private probeStates = new Map<string, ProbeState>();
|
||||||
|
private timer: ReturnType<typeof setInterval> | null = null;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private instanceRepo: IMcpInstanceRepository,
|
||||||
|
private serverRepo: IMcpServerRepository,
|
||||||
|
private orchestrator: McpOrchestrator,
|
||||||
|
private logger?: { info: (msg: string) => void; error: (obj: unknown, msg: string) => void },
|
||||||
|
) {}
|
||||||
|
|
||||||
|
/** Start the periodic probe loop. Runs every `tickIntervalMs` (default 15s). */
|
||||||
|
start(tickIntervalMs = 15_000): void {
|
||||||
|
if (this.timer) return;
|
||||||
|
this.timer = setInterval(() => {
|
||||||
|
this.tick().catch((err) => {
|
||||||
|
this.logger?.error({ err }, 'Health probe tick failed');
|
||||||
|
});
|
||||||
|
}, tickIntervalMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
stop(): void {
|
||||||
|
if (this.timer) {
|
||||||
|
clearInterval(this.timer);
|
||||||
|
this.timer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Single tick: probe all RUNNING instances that have healthCheck configs and are due. */
|
||||||
|
async tick(): Promise<void> {
|
||||||
|
const instances = await this.instanceRepo.findAll();
|
||||||
|
const running = instances.filter((i) => i.status === 'RUNNING' && i.containerId);
|
||||||
|
|
||||||
|
// Cache servers by ID to avoid repeated lookups
|
||||||
|
const serverCache = new Map<string, McpServer>();
|
||||||
|
|
||||||
|
for (const inst of running) {
|
||||||
|
let server = serverCache.get(inst.serverId);
|
||||||
|
if (!server) {
|
||||||
|
const s = await this.serverRepo.findById(inst.serverId);
|
||||||
|
if (!s) continue;
|
||||||
|
serverCache.set(inst.serverId, s);
|
||||||
|
server = s;
|
||||||
|
}
|
||||||
|
|
||||||
|
const healthCheck = server.healthCheck as HealthCheckSpec | null;
|
||||||
|
if (!healthCheck) continue;
|
||||||
|
|
||||||
|
const intervalMs = (healthCheck.intervalSeconds ?? 60) * 1000;
|
||||||
|
const state = this.probeStates.get(inst.id);
|
||||||
|
const now = Date.now();
|
||||||
|
|
||||||
|
// Skip if not due yet
|
||||||
|
if (state && (now - state.lastProbeAt) < intervalMs) continue;
|
||||||
|
|
||||||
|
await this.probeInstance(inst, server, healthCheck);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up states for instances that no longer exist
|
||||||
|
const activeIds = new Set(running.map((i) => i.id));
|
||||||
|
for (const key of this.probeStates.keys()) {
|
||||||
|
if (!activeIds.has(key)) {
|
||||||
|
this.probeStates.delete(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Probe a single instance and update its health status. */
|
||||||
|
async probeInstance(
|
||||||
|
instance: McpInstance,
|
||||||
|
server: McpServer,
|
||||||
|
healthCheck: HealthCheckSpec,
|
||||||
|
): Promise<ProbeResult> {
|
||||||
|
const timeoutMs = (healthCheck.timeoutSeconds ?? 10) * 1000;
|
||||||
|
const failureThreshold = healthCheck.failureThreshold ?? 3;
|
||||||
|
const now = new Date();
|
||||||
|
const start = Date.now();
|
||||||
|
|
||||||
|
let result: ProbeResult;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') {
|
||||||
|
result = await this.probeHttp(instance, server, healthCheck, timeoutMs);
|
||||||
|
} else {
|
||||||
|
result = await this.probeStdio(instance, server, healthCheck, timeoutMs);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
result = {
|
||||||
|
healthy: false,
|
||||||
|
latencyMs: Date.now() - start,
|
||||||
|
message: err instanceof Error ? err.message : String(err),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update probe state
|
||||||
|
const state = this.probeStates.get(instance.id) ?? { consecutiveFailures: 0, lastProbeAt: 0 };
|
||||||
|
state.lastProbeAt = Date.now();
|
||||||
|
|
||||||
|
if (result.healthy) {
|
||||||
|
state.consecutiveFailures = 0;
|
||||||
|
} else {
|
||||||
|
state.consecutiveFailures++;
|
||||||
|
}
|
||||||
|
this.probeStates.set(instance.id, state);
|
||||||
|
|
||||||
|
// Determine health status
|
||||||
|
const healthStatus = result.healthy
|
||||||
|
? 'healthy'
|
||||||
|
: state.consecutiveFailures >= failureThreshold
|
||||||
|
? 'unhealthy'
|
||||||
|
: 'degraded';
|
||||||
|
|
||||||
|
// Build event
|
||||||
|
const eventType = result.healthy ? 'Normal' : 'Warning';
|
||||||
|
const eventMessage = result.healthy
|
||||||
|
? `Health check passed (${result.latencyMs}ms)`
|
||||||
|
: `Health check failed: ${result.message}`;
|
||||||
|
|
||||||
|
const existingEvents = (instance.events as Array<{ timestamp: string; type: string; message: string }>) ?? [];
|
||||||
|
// Keep last 50 events
|
||||||
|
const events = [
|
||||||
|
...existingEvents.slice(-49),
|
||||||
|
{ timestamp: now.toISOString(), type: eventType, message: eventMessage },
|
||||||
|
];
|
||||||
|
|
||||||
|
// Update instance
|
||||||
|
await this.instanceRepo.updateStatus(instance.id, instance.status as 'RUNNING', {
|
||||||
|
healthStatus,
|
||||||
|
lastHealthCheck: now,
|
||||||
|
events,
|
||||||
|
});
|
||||||
|
|
||||||
|
this.logger?.info(
|
||||||
|
`[health] ${(instance as unknown as { server?: { name: string } }).server?.name ?? instance.serverId}: ${healthStatus} (${result.latencyMs}ms) - ${eventMessage}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Probe an HTTP/SSE MCP server by sending a JSON-RPC tool call. */
|
||||||
|
private async probeHttp(
|
||||||
|
instance: McpInstance,
|
||||||
|
server: McpServer,
|
||||||
|
healthCheck: HealthCheckSpec,
|
||||||
|
timeoutMs: number,
|
||||||
|
): Promise<ProbeResult> {
|
||||||
|
if (!instance.containerId) {
|
||||||
|
return { healthy: false, latencyMs: 0, message: 'No container ID' };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get container IP for internal network communication
|
||||||
|
// (mcpd and MCP containers share the mcp-servers network)
|
||||||
|
const containerInfo = await this.orchestrator.inspectContainer(instance.containerId);
|
||||||
|
const containerPort = (server.containerPort as number | null) ?? 3000;
|
||||||
|
|
||||||
|
let baseUrl: string;
|
||||||
|
if (containerInfo.ip) {
|
||||||
|
baseUrl = `http://${containerInfo.ip}:${containerPort}`;
|
||||||
|
} else if (instance.port) {
|
||||||
|
baseUrl = `http://localhost:${instance.port}`;
|
||||||
|
} else {
|
||||||
|
return { healthy: false, latencyMs: 0, message: 'No container IP or port' };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (server.transport === 'SSE') {
|
||||||
|
return this.probeSse(baseUrl, healthCheck, timeoutMs);
|
||||||
|
}
|
||||||
|
return this.probeStreamableHttp(baseUrl, healthCheck, timeoutMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Probe a streamable-http MCP server (POST to root endpoint).
|
||||||
|
*/
|
||||||
|
private async probeStreamableHttp(
|
||||||
|
baseUrl: string,
|
||||||
|
healthCheck: HealthCheckSpec,
|
||||||
|
timeoutMs: number,
|
||||||
|
): Promise<ProbeResult> {
|
||||||
|
const start = Date.now();
|
||||||
|
const controller = new AbortController();
|
||||||
|
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const initResp = await fetch(baseUrl, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' },
|
||||||
|
body: JSON.stringify({
|
||||||
|
jsonrpc: '2.0', id: 1, method: 'initialize',
|
||||||
|
params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'mcpctl-health', version: '0.1.0' } },
|
||||||
|
}),
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!initResp.ok) {
|
||||||
|
return { healthy: false, latencyMs: Date.now() - start, message: `Initialize HTTP ${initResp.status}` };
|
||||||
|
}
|
||||||
|
|
||||||
|
const sessionId = initResp.headers.get('mcp-session-id');
|
||||||
|
const headers: Record<string, string> = { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' };
|
||||||
|
if (sessionId) headers['Mcp-Session-Id'] = sessionId;
|
||||||
|
|
||||||
|
await fetch(baseUrl, {
|
||||||
|
method: 'POST', headers,
|
||||||
|
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
const toolResp = await fetch(baseUrl, {
|
||||||
|
method: 'POST', headers,
|
||||||
|
body: JSON.stringify({
|
||||||
|
jsonrpc: '2.0', id: 2, method: 'tools/call',
|
||||||
|
params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
|
||||||
|
}),
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
const latencyMs = Date.now() - start;
|
||||||
|
|
||||||
|
if (!toolResp.ok) {
|
||||||
|
return { healthy: false, latencyMs, message: `Tool call HTTP ${toolResp.status}` };
|
||||||
|
}
|
||||||
|
|
||||||
|
const body = await toolResp.text();
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(body.includes('data: ') ? body.split('data: ')[1]!.split('\n')[0]! : body);
|
||||||
|
if (parsed.error) {
|
||||||
|
return { healthy: false, latencyMs, message: parsed.error.message ?? 'Tool call error' };
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// If parsing fails but HTTP was ok, consider it healthy
|
||||||
|
}
|
||||||
|
|
||||||
|
return { healthy: true, latencyMs, message: 'ok' };
|
||||||
|
} finally {
|
||||||
|
clearTimeout(timer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Probe an SSE-transport MCP server.
|
||||||
|
* SSE protocol: GET /sse → endpoint event → POST /messages?session_id=...
|
||||||
|
*/
|
||||||
|
private async probeSse(
|
||||||
|
baseUrl: string,
|
||||||
|
healthCheck: HealthCheckSpec,
|
||||||
|
timeoutMs: number,
|
||||||
|
): Promise<ProbeResult> {
|
||||||
|
const start = Date.now();
|
||||||
|
const controller = new AbortController();
|
||||||
|
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 1. Connect to SSE endpoint to get the message URL
|
||||||
|
const sseResp = await fetch(`${baseUrl}/sse`, {
|
||||||
|
method: 'GET',
|
||||||
|
headers: { 'Accept': 'text/event-stream' },
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!sseResp.ok) {
|
||||||
|
return { healthy: false, latencyMs: Date.now() - start, message: `SSE connect HTTP ${sseResp.status}` };
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Read the SSE stream to find the endpoint event
|
||||||
|
const reader = sseResp.body?.getReader();
|
||||||
|
if (!reader) {
|
||||||
|
return { healthy: false, latencyMs: Date.now() - start, message: 'No SSE stream body' };
|
||||||
|
}
|
||||||
|
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
let buffer = '';
|
||||||
|
let messagesUrl = '';
|
||||||
|
|
||||||
|
// Read until we get the endpoint event
|
||||||
|
while (!messagesUrl) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
buffer += decoder.decode(value, { stream: true });
|
||||||
|
|
||||||
|
for (const line of buffer.split('\n')) {
|
||||||
|
if (line.startsWith('data: ') && buffer.includes('event: endpoint')) {
|
||||||
|
const endpoint = line.slice(6).trim();
|
||||||
|
// Endpoint may be relative (e.g., /messages?session_id=...) or absolute
|
||||||
|
messagesUrl = endpoint.startsWith('http') ? endpoint : `${baseUrl}${endpoint}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Keep only the last incomplete line
|
||||||
|
const lines = buffer.split('\n');
|
||||||
|
buffer = lines[lines.length - 1] ?? '';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!messagesUrl) {
|
||||||
|
reader.cancel();
|
||||||
|
return { healthy: false, latencyMs: Date.now() - start, message: 'No endpoint event from SSE' };
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Initialize via the messages endpoint
|
||||||
|
const postHeaders = { 'Content-Type': 'application/json' };
|
||||||
|
|
||||||
|
const initResp = await fetch(messagesUrl, {
|
||||||
|
method: 'POST', headers: postHeaders,
|
||||||
|
body: JSON.stringify({
|
||||||
|
jsonrpc: '2.0', id: 1, method: 'initialize',
|
||||||
|
params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'mcpctl-health', version: '0.1.0' } },
|
||||||
|
}),
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!initResp.ok) {
|
||||||
|
reader.cancel();
|
||||||
|
return { healthy: false, latencyMs: Date.now() - start, message: `Initialize HTTP ${initResp.status}` };
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Send initialized notification
|
||||||
|
await fetch(messagesUrl, {
|
||||||
|
method: 'POST', headers: postHeaders,
|
||||||
|
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
// 5. Call health check tool
|
||||||
|
const toolResp = await fetch(messagesUrl, {
|
||||||
|
method: 'POST', headers: postHeaders,
|
||||||
|
body: JSON.stringify({
|
||||||
|
jsonrpc: '2.0', id: 2, method: 'tools/call',
|
||||||
|
params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
|
||||||
|
}),
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
const latencyMs = Date.now() - start;
|
||||||
|
|
||||||
|
// 6. Read tool response from SSE stream
|
||||||
|
// The response comes back on the SSE stream, not the POST response
|
||||||
|
let responseBuffer = '';
|
||||||
|
const readTimeout = setTimeout(() => reader.cancel(), 5000);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
responseBuffer += decoder.decode(value, { stream: true });
|
||||||
|
|
||||||
|
// Look for data lines containing our response (id: 2)
|
||||||
|
for (const line of responseBuffer.split('\n')) {
|
||||||
|
if (line.startsWith('data: ')) {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(line.slice(6));
|
||||||
|
if (parsed.id === 2) {
|
||||||
|
clearTimeout(readTimeout);
|
||||||
|
reader.cancel();
|
||||||
|
if (parsed.error) {
|
||||||
|
return { healthy: false, latencyMs, message: parsed.error.message ?? 'Tool call error' };
|
||||||
|
}
|
||||||
|
return { healthy: true, latencyMs, message: 'ok' };
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Not valid JSON, skip
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const respLines = responseBuffer.split('\n');
|
||||||
|
responseBuffer = respLines[respLines.length - 1] ?? '';
|
||||||
|
}
|
||||||
|
|
||||||
|
clearTimeout(readTimeout);
|
||||||
|
reader.cancel();
|
||||||
|
|
||||||
|
// If POST response itself was ok (202 for SSE), consider it healthy
|
||||||
|
if (toolResp.ok) {
|
||||||
|
return { healthy: true, latencyMs, message: 'ok' };
|
||||||
|
}
|
||||||
|
|
||||||
|
return { healthy: false, latencyMs, message: `Tool call HTTP ${toolResp.status}` };
|
||||||
|
} finally {
|
||||||
|
clearTimeout(timer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Probe a STDIO MCP server by running `docker exec` with a disposable Node.js
|
||||||
|
* script that pipes JSON-RPC messages into the package binary.
|
||||||
|
*/
|
||||||
|
private async probeStdio(
|
||||||
|
instance: McpInstance,
|
||||||
|
server: McpServer,
|
||||||
|
healthCheck: HealthCheckSpec,
|
||||||
|
timeoutMs: number,
|
||||||
|
): Promise<ProbeResult> {
|
||||||
|
if (!instance.containerId) {
|
||||||
|
return { healthy: false, latencyMs: 0, message: 'No container ID' };
|
||||||
|
}
|
||||||
|
|
||||||
|
const start = Date.now();
|
||||||
|
const packageName = server.packageName as string | null;
|
||||||
|
|
||||||
|
if (!packageName) {
|
||||||
|
return { healthy: false, latencyMs: 0, message: 'No package name for STDIO server' };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build JSON-RPC messages for the health probe
|
||||||
|
const initMsg = JSON.stringify({
|
||||||
|
jsonrpc: '2.0', id: 1, method: 'initialize',
|
||||||
|
params: {
|
||||||
|
protocolVersion: '2024-11-05',
|
||||||
|
capabilities: {},
|
||||||
|
clientInfo: { name: 'mcpctl-health', version: '0.1.0' },
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const initializedMsg = JSON.stringify({
|
||||||
|
jsonrpc: '2.0', method: 'notifications/initialized',
|
||||||
|
});
|
||||||
|
const toolCallMsg = JSON.stringify({
|
||||||
|
jsonrpc: '2.0', id: 2, method: 'tools/call',
|
||||||
|
params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
|
||||||
|
});
|
||||||
|
|
||||||
|
// Use a Node.js inline script that:
|
||||||
|
// 1. Spawns the MCP server binary via npx
|
||||||
|
// 2. Sends initialize + initialized + tool call via stdin
|
||||||
|
// 3. Reads responses from stdout
|
||||||
|
// 4. Exits with 0 if tool call succeeds, 1 if it fails
|
||||||
|
const probeScript = `
|
||||||
|
const { spawn } = require('child_process');
|
||||||
|
const proc = spawn('npx', ['--prefer-offline', '-y', ${JSON.stringify(packageName)}], { stdio: ['pipe', 'pipe', 'pipe'] });
|
||||||
|
let output = '';
|
||||||
|
let responded = false;
|
||||||
|
proc.stdout.on('data', d => {
|
||||||
|
output += d;
|
||||||
|
const lines = output.split('\\n');
|
||||||
|
for (const line of lines) {
|
||||||
|
if (!line.trim()) continue;
|
||||||
|
try {
|
||||||
|
const msg = JSON.parse(line);
|
||||||
|
if (msg.id === 2) {
|
||||||
|
responded = true;
|
||||||
|
if (msg.error) {
|
||||||
|
process.stdout.write('ERROR:' + (msg.error.message || 'unknown'));
|
||||||
|
proc.kill();
|
||||||
|
process.exit(1);
|
||||||
|
} else {
|
||||||
|
process.stdout.write('OK');
|
||||||
|
proc.kill();
|
||||||
|
process.exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch {}
|
||||||
|
}
|
||||||
|
output = lines[lines.length - 1] || '';
|
||||||
|
});
|
||||||
|
proc.stderr.on('data', () => {});
|
||||||
|
proc.on('error', e => { process.stdout.write('ERROR:' + e.message); process.exit(1); });
|
||||||
|
proc.on('exit', (code) => { if (!responded) { process.stdout.write('ERROR:process exited ' + code); process.exit(1); } });
|
||||||
|
setTimeout(() => { if (!responded) { process.stdout.write('ERROR:timeout'); proc.kill(); process.exit(1); } }, ${timeoutMs - 2000});
|
||||||
|
proc.stdin.write(${JSON.stringify(initMsg)} + '\\n');
|
||||||
|
setTimeout(() => {
|
||||||
|
proc.stdin.write(${JSON.stringify(initializedMsg)} + '\\n');
|
||||||
|
setTimeout(() => {
|
||||||
|
proc.stdin.write(${JSON.stringify(toolCallMsg)} + '\\n');
|
||||||
|
}, 500);
|
||||||
|
}, 500);
|
||||||
|
`.trim();
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result = await this.orchestrator.execInContainer(
|
||||||
|
instance.containerId,
|
||||||
|
['node', '-e', probeScript],
|
||||||
|
{ timeoutMs },
|
||||||
|
);
|
||||||
|
|
||||||
|
const latencyMs = Date.now() - start;
|
||||||
|
|
||||||
|
if (result.exitCode === 0 && result.stdout.includes('OK')) {
|
||||||
|
return { healthy: true, latencyMs, message: 'ok' };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract error message
|
||||||
|
const errorMatch = result.stdout.match(/ERROR:(.*)/);
|
||||||
|
const errorMsg = errorMatch?.[1] ?? (result.stderr.trim() || `exit code ${result.exitCode}`);
|
||||||
|
return { healthy: false, latencyMs, message: errorMsg };
|
||||||
|
} catch (err) {
|
||||||
|
return {
|
||||||
|
healthy: false,
|
||||||
|
latencyMs: Date.now() - start,
|
||||||
|
message: err instanceof Error ? err.message : String(err),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,7 +5,7 @@ export { ProjectService } from './project.service.js';
|
|||||||
export { InstanceService, InvalidStateError } from './instance.service.js';
|
export { InstanceService, InvalidStateError } from './instance.service.js';
|
||||||
export { generateMcpConfig } from './mcp-config-generator.js';
|
export { generateMcpConfig } from './mcp-config-generator.js';
|
||||||
export type { McpConfig, McpConfigServer } from './mcp-config-generator.js';
|
export type { McpConfig, McpConfigServer } from './mcp-config-generator.js';
|
||||||
export type { McpOrchestrator, ContainerSpec, ContainerInfo, ContainerLogs } from './orchestrator.js';
|
export type { McpOrchestrator, ContainerSpec, ContainerInfo, ContainerLogs, ExecResult } from './orchestrator.js';
|
||||||
export { DEFAULT_MEMORY_LIMIT, DEFAULT_NANO_CPUS } from './orchestrator.js';
|
export { DEFAULT_MEMORY_LIMIT, DEFAULT_NANO_CPUS } from './orchestrator.js';
|
||||||
export { DockerContainerManager } from './docker/container-manager.js';
|
export { DockerContainerManager } from './docker/container-manager.js';
|
||||||
export { AuditLogService } from './audit-log.service.js';
|
export { AuditLogService } from './audit-log.service.js';
|
||||||
@@ -25,3 +25,5 @@ export type { LoginResult } from './auth.service.js';
|
|||||||
export { McpProxyService } from './mcp-proxy-service.js';
|
export { McpProxyService } from './mcp-proxy-service.js';
|
||||||
export type { McpProxyRequest, McpProxyResponse } from './mcp-proxy-service.js';
|
export type { McpProxyRequest, McpProxyResponse } from './mcp-proxy-service.js';
|
||||||
export { TemplateService } from './template.service.js';
|
export { TemplateService } from './template.service.js';
|
||||||
|
export { HealthProbeRunner } from './health-probe.service.js';
|
||||||
|
export type { HealthCheckSpec, ProbeResult } from './health-probe.service.js';
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import type {
|
|||||||
ContainerSpec,
|
ContainerSpec,
|
||||||
ContainerInfo,
|
ContainerInfo,
|
||||||
ContainerLogs,
|
ContainerLogs,
|
||||||
|
ExecResult,
|
||||||
} from '../orchestrator.js';
|
} from '../orchestrator.js';
|
||||||
import { K8sClient } from './k8s-client.js';
|
import { K8sClient } from './k8s-client.js';
|
||||||
import type { K8sClientConfig } from './k8s-client.js';
|
import type { K8sClientConfig } from './k8s-client.js';
|
||||||
@@ -164,6 +165,15 @@ export class KubernetesOrchestrator implements McpOrchestrator {
|
|||||||
return { stdout, stderr: '' };
|
return { stdout, stderr: '' };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async execInContainer(
|
||||||
|
_containerId: string,
|
||||||
|
_cmd: string[],
|
||||||
|
_opts?: { stdin?: string; timeoutMs?: number },
|
||||||
|
): Promise<ExecResult> {
|
||||||
|
// K8s exec via API — future implementation
|
||||||
|
throw new Error('execInContainer not yet implemented for Kubernetes');
|
||||||
|
}
|
||||||
|
|
||||||
async listContainers(namespace?: string): Promise<ContainerInfo[]> {
|
async listContainers(namespace?: string): Promise<ContainerInfo[]> {
|
||||||
const ns = namespace ?? this.namespace;
|
const ns = namespace ?? this.namespace;
|
||||||
const res = await this.client.get<K8sPodList>(
|
const res = await this.client.get<K8sPodList>(
|
||||||
|
|||||||
@@ -30,6 +30,8 @@ export interface ContainerInfo {
|
|||||||
name: string;
|
name: string;
|
||||||
state: 'running' | 'stopped' | 'starting' | 'error' | 'unknown';
|
state: 'running' | 'stopped' | 'starting' | 'error' | 'unknown';
|
||||||
port?: number;
|
port?: number;
|
||||||
|
/** Container IP on the first non-default network (for internal communication) */
|
||||||
|
ip?: string;
|
||||||
createdAt: Date;
|
createdAt: Date;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -38,6 +40,12 @@ export interface ContainerLogs {
|
|||||||
stderr: string;
|
stderr: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Result of a command executed inside a running container (see McpOrchestrator.execInContainer). */
export interface ExecResult {
  /** Process exit code; 0 indicates success. */
  exitCode: number;
  /** Captured standard output of the command. */
  stdout: string;
  /** Captured standard error of the command. */
  stderr: string;
}
|
||||||
|
|
||||||
export interface McpOrchestrator {
|
export interface McpOrchestrator {
|
||||||
/** Pull an image if not present locally */
|
/** Pull an image if not present locally */
|
||||||
pullImage(image: string): Promise<void>;
|
pullImage(image: string): Promise<void>;
|
||||||
@@ -57,6 +65,9 @@ export interface McpOrchestrator {
|
|||||||
/** Get container logs */
|
/** Get container logs */
|
||||||
getContainerLogs(containerId: string, opts?: { tail?: number; since?: number }): Promise<ContainerLogs>;
|
getContainerLogs(containerId: string, opts?: { tail?: number; since?: number }): Promise<ContainerLogs>;
|
||||||
|
|
||||||
|
/** Execute a command inside a running container with optional stdin */
|
||||||
|
execInContainer(containerId: string, cmd: string[], opts?: { stdin?: string; timeoutMs?: number }): Promise<ExecResult>;
|
||||||
|
|
||||||
/** Check if the orchestrator runtime is available */
|
/** Check if the orchestrator runtime is available */
|
||||||
ping(): Promise<boolean>;
|
ping(): Promise<boolean>;
|
||||||
}
|
}
|
||||||
|
|||||||
355
src/mcpd/tests/services/health-probe.test.ts
Normal file
355
src/mcpd/tests/services/health-probe.test.ts
Normal file
@@ -0,0 +1,355 @@
|
|||||||
|
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||||
|
import { HealthProbeRunner } from '../../src/services/health-probe.service.js';
|
||||||
|
import type { HealthCheckSpec } from '../../src/services/health-probe.service.js';
|
||||||
|
import type { IMcpInstanceRepository, IMcpServerRepository } from '../../src/repositories/interfaces.js';
|
||||||
|
import type { McpOrchestrator, ExecResult } from '../../src/services/orchestrator.js';
|
||||||
|
import type { McpInstance, McpServer } from '@prisma/client';
|
||||||
|
|
||||||
|
function makeInstance(overrides: Partial<McpInstance> = {}): McpInstance {
|
||||||
|
return {
|
||||||
|
id: 'inst-1',
|
||||||
|
serverId: 'srv-1',
|
||||||
|
status: 'RUNNING',
|
||||||
|
containerId: 'container-abc',
|
||||||
|
port: null,
|
||||||
|
healthStatus: null,
|
||||||
|
lastHealthCheck: null,
|
||||||
|
events: [],
|
||||||
|
metadata: {},
|
||||||
|
version: 1,
|
||||||
|
createdAt: new Date(),
|
||||||
|
updatedAt: new Date(),
|
||||||
|
...overrides,
|
||||||
|
} as McpInstance;
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeServer(overrides: Partial<McpServer> = {}): McpServer {
|
||||||
|
return {
|
||||||
|
id: 'srv-1',
|
||||||
|
name: 'my-grafana',
|
||||||
|
transport: 'STDIO',
|
||||||
|
packageName: '@leval/mcp-grafana',
|
||||||
|
dockerImage: null,
|
||||||
|
externalUrl: null,
|
||||||
|
containerPort: null,
|
||||||
|
repositoryUrl: null,
|
||||||
|
description: null,
|
||||||
|
command: null,
|
||||||
|
env: [],
|
||||||
|
replicas: 1,
|
||||||
|
projectId: null,
|
||||||
|
healthCheck: {
|
||||||
|
tool: 'list_datasources',
|
||||||
|
arguments: {},
|
||||||
|
intervalSeconds: 60,
|
||||||
|
timeoutSeconds: 10,
|
||||||
|
failureThreshold: 3,
|
||||||
|
},
|
||||||
|
version: 1,
|
||||||
|
createdAt: new Date(),
|
||||||
|
updatedAt: new Date(),
|
||||||
|
...overrides,
|
||||||
|
} as McpServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Vitest stub of IMcpInstanceRepository: every method is a vi.fn spy.
 * Queries default to empty/null; create/updateStatus echo a fixture
 * built from their inputs so callers get shape-compatible results.
 */
function mockInstanceRepo(): IMcpInstanceRepository {
  return {
    findAll: vi.fn(async () => []),
    findById: vi.fn(async () => null),
    findByContainerId: vi.fn(async () => null),
    create: vi.fn(async (data) => makeInstance(data)),
    // Reflects id/status plus any extra fields back into the fixture.
    updateStatus: vi.fn(async (id, status, fields) => makeInstance({ id, status, ...fields })),
    delete: vi.fn(async () => {}),
  };
}
|
||||||
|
|
||||||
|
/**
 * Vitest stub of IMcpServerRepository: every method is a vi.fn spy.
 * Queries default to empty/null; create/update return the default
 * makeServer() fixture regardless of input.
 */
function mockServerRepo(): IMcpServerRepository {
  return {
    findAll: vi.fn(async () => []),
    findById: vi.fn(async () => null),
    findByName: vi.fn(async () => null),
    create: vi.fn(async () => makeServer()),
    update: vi.fn(async () => makeServer()),
    delete: vi.fn(async () => {}),
  };
}
|
||||||
|
|
||||||
|
/**
 * Vitest stub of McpOrchestrator: every method is a vi.fn spy.
 * Containers report as running, logs are empty, and execInContainer
 * succeeds with stdout 'OK' (the probe script's success marker).
 */
function mockOrchestrator(): McpOrchestrator {
  return {
    pullImage: vi.fn(async () => {}),
    createContainer: vi.fn(async () => ({ containerId: 'c1', name: 'test', state: 'running' as const, createdAt: new Date() })),
    stopContainer: vi.fn(async () => {}),
    removeContainer: vi.fn(async () => {}),
    inspectContainer: vi.fn(async () => ({ containerId: 'c1', name: 'test', state: 'running' as const, createdAt: new Date() })),
    getContainerLogs: vi.fn(async () => ({ stdout: '', stderr: '' })),
    // Default: healthy probe result; individual tests override per-case.
    execInContainer: vi.fn(async () => ({ exitCode: 0, stdout: 'OK', stderr: '' })),
    ping: vi.fn(async () => true),
  };
}
|
||||||
|
|
||||||
|
/**
 * Unit tests for HealthProbeRunner.tick()/probeInstance() against fully
 * mocked repositories and orchestrator. Covers: skip conditions, the
 * healthy path, failure-threshold escalation (healthy → degraded →
 * unhealthy), failure-count reset, timeout handling, event-history
 * append semantics, interval gating, and per-instance state cleanup.
 */
describe('HealthProbeRunner', () => {
  let instanceRepo: IMcpInstanceRepository;
  let serverRepo: IMcpServerRepository;
  let orchestrator: McpOrchestrator;
  let runner: HealthProbeRunner;

  beforeEach(() => {
    // Fresh mocks per test so call counts and probe state never leak.
    instanceRepo = mockInstanceRepo();
    serverRepo = mockServerRepo();
    orchestrator = mockOrchestrator();
    runner = new HealthProbeRunner(instanceRepo, serverRepo, orchestrator);
  });

  it('skips instances without healthCheck config', async () => {
    const instance = makeInstance();
    const server = makeServer({ healthCheck: null });

    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
    vi.mocked(serverRepo.findById).mockResolvedValue(server);

    await runner.tick();

    // No healthCheck spec → neither probed nor status-updated.
    expect(orchestrator.execInContainer).not.toHaveBeenCalled();
    expect(instanceRepo.updateStatus).not.toHaveBeenCalled();
  });

  it('skips non-RUNNING instances', async () => {
    const instance = makeInstance({ status: 'ERROR' });
    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);

    await runner.tick();

    expect(serverRepo.findById).not.toHaveBeenCalled();
  });

  it('probes STDIO instance with exec and marks healthy on success', async () => {
    const instance = makeInstance();
    const server = makeServer();

    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
    vi.mocked(serverRepo.findById).mockResolvedValue(server);
    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
      exitCode: 0,
      stdout: 'OK',
      stderr: '',
    });

    await runner.tick();

    // timeoutSeconds: 10 from the default fixture → 10000ms exec timeout.
    expect(orchestrator.execInContainer).toHaveBeenCalledWith(
      'container-abc',
      expect.arrayContaining(['node', '-e']),
      expect.objectContaining({ timeoutMs: 10000 }),
    );

    expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
      'inst-1',
      'RUNNING',
      expect.objectContaining({
        healthStatus: 'healthy',
        lastHealthCheck: expect.any(Date),
        events: expect.arrayContaining([
          expect.objectContaining({ type: 'Normal', message: expect.stringContaining('passed') }),
        ]),
      }),
    );
  });

  it('marks unhealthy after failureThreshold consecutive failures', async () => {
    const instance = makeInstance();
    const healthCheck: HealthCheckSpec = {
      tool: 'list_datasources',
      arguments: {},
      intervalSeconds: 0, // always due
      failureThreshold: 2,
    };
    const server = makeServer({ healthCheck: healthCheck as unknown as undefined });

    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
    vi.mocked(serverRepo.findById).mockResolvedValue(server);
    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
      exitCode: 1,
      stdout: 'ERROR:connection refused',
      stderr: '',
    });

    // First failure → degraded
    await runner.tick();
    expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
      'inst-1',
      'RUNNING',
      expect.objectContaining({ healthStatus: 'degraded' }),
    );

    // Second failure → unhealthy (threshold = 2)
    await runner.tick();
    expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
      'inst-1',
      'RUNNING',
      expect.objectContaining({ healthStatus: 'unhealthy' }),
    );
  });

  it('resets failure count on success', async () => {
    const instance = makeInstance();
    const healthCheck: HealthCheckSpec = {
      tool: 'list_datasources',
      arguments: {},
      intervalSeconds: 0,
      failureThreshold: 3,
    };
    const server = makeServer({ healthCheck: healthCheck as unknown as undefined });

    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
    vi.mocked(serverRepo.findById).mockResolvedValue(server);

    // Two failures
    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
      exitCode: 1, stdout: 'ERROR:fail', stderr: '',
    });
    await runner.tick();
    await runner.tick();

    // Then success — should reset to healthy
    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
      exitCode: 0, stdout: 'OK', stderr: '',
    });
    await runner.tick();

    const lastCall = vi.mocked(instanceRepo.updateStatus).mock.calls.at(-1);
    expect(lastCall?.[2]).toEqual(expect.objectContaining({ healthStatus: 'healthy' }));
  });

  it('handles exec timeout as failure', async () => {
    const instance = makeInstance();
    const server = makeServer();

    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
    vi.mocked(serverRepo.findById).mockResolvedValue(server);
    vi.mocked(orchestrator.execInContainer).mockRejectedValue(new Error('Exec timed out after 10000ms'));

    await runner.tick();

    // A rejected exec counts as one failure (degraded) with a Warning event.
    expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
      'inst-1',
      'RUNNING',
      expect.objectContaining({
        healthStatus: 'degraded',
        events: expect.arrayContaining([
          expect.objectContaining({ type: 'Warning', message: expect.stringContaining('timed out') }),
        ]),
      }),
    );
  });

  it('appends events without losing history', async () => {
    const existingEvents = [
      { timestamp: '2025-01-01T00:00:00Z', type: 'Normal', message: 'old event' },
    ];
    const instance = makeInstance({ events: existingEvents });
    const server = makeServer({
      healthCheck: { tool: 'test', intervalSeconds: 0 } as McpServer['healthCheck'],
    });

    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
    vi.mocked(serverRepo.findById).mockResolvedValue(server);
    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
      exitCode: 0, stdout: 'OK', stderr: '',
    });

    await runner.tick();

    const events = vi.mocked(instanceRepo.updateStatus).mock.calls[0]?.[2]?.events as unknown[];
    expect(events).toHaveLength(2);
    expect((events[0] as { message: string }).message).toBe('old event');
    expect((events[1] as { message: string }).message).toContain('passed');
  });

  it('respects interval — skips probing if not due', async () => {
    const instance = makeInstance();
    const server = makeServer({
      healthCheck: { tool: 'test', intervalSeconds: 300 } as McpServer['healthCheck'],
    });

    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
    vi.mocked(serverRepo.findById).mockResolvedValue(server);
    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
      exitCode: 0, stdout: 'OK', stderr: '',
    });

    // First tick: should probe
    await runner.tick();
    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);

    // Second tick immediately: should skip (300s interval not elapsed)
    await runner.tick();
    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
  });

  it('cleans up probe states for removed instances', async () => {
    const instance = makeInstance();
    const server = makeServer({
      healthCheck: { tool: 'test', intervalSeconds: 0 } as McpServer['healthCheck'],
    });

    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
    vi.mocked(serverRepo.findById).mockResolvedValue(server);

    await runner.tick();
    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);

    // Instance removed
    vi.mocked(instanceRepo.findAll).mockResolvedValue([]);
    await runner.tick();

    // Re-add same instance — should probe again (state was cleaned)
    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
    await runner.tick();
    expect(orchestrator.execInContainer).toHaveBeenCalledTimes(2);
  });

  it('skips STDIO instances without containerId', async () => {
    const instance = makeInstance({ containerId: null });
    const server = makeServer();

    // containerId is null, but status is RUNNING — shouldn't be probed
    vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);

    await runner.tick();
    expect(serverRepo.findById).not.toHaveBeenCalled();
  });

  it('probeInstance returns result directly', async () => {
    const instance = makeInstance();
    const server = makeServer();
    const healthCheck: HealthCheckSpec = {
      tool: 'list_datasources',
      arguments: {},
    };

    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
      exitCode: 0, stdout: 'OK', stderr: '',
    });

    const result = await runner.probeInstance(instance, server, healthCheck);
    expect(result.healthy).toBe(true);
    expect(result.latencyMs).toBeGreaterThanOrEqual(0);
    expect(result.message).toBe('ok');
  });

  it('handles STDIO exec failure with error message', async () => {
    const instance = makeInstance();
    const server = makeServer();
    const healthCheck: HealthCheckSpec = { tool: 'list_datasources', arguments: {} };

    vi.mocked(orchestrator.execInContainer).mockResolvedValue({
      exitCode: 1,
      stdout: 'ERROR:ECONNREFUSED 10.0.0.1:3000',
      stderr: '',
    });

    // The 'ERROR:' prefix is stripped by the probe's error extraction.
    const result = await runner.probeInstance(instance, server, healthCheck);
    expect(result.healthy).toBe(false);
    expect(result.message).toBe('ECONNREFUSED 10.0.0.1:3000');
  });
});
|
||||||
@@ -1,16 +1,22 @@
|
|||||||
# Manifest for the Home Assistant MCP server (containerized, SSE transport).
name: home-assistant
version: "1.0.0"
description: Home Assistant MCP server for smart home control and entity management
dockerImage: "ghcr.io/homeassistant-ai/ha-mcp:latest"
transport: SSE
containerPort: 8086
repositoryUrl: https://github.com/homeassistant-ai/ha-mcp
# Launch the SSE server in-process; port must match containerPort above.
command:
  - python
  - -c
  - "from ha_mcp.server import HomeAssistantSmartMCPServer; s = HomeAssistantSmartMCPServer(); s.mcp.run(transport='sse', host='0.0.0.0', port=8086)"
# Probe: call ha_search_entities with query "light".
healthCheck:
  tool: ha_search_entities
  arguments:
    query: "light"
# Both variables are required; supplied at instance creation time.
env:
  - name: HOMEASSISTANT_URL
    description: Home Assistant instance URL (e.g. http://homeassistant.local:8123)
    required: true
  - name: HOMEASSISTANT_TOKEN
    description: Home Assistant long-lived access token
    required: true
||||||
|
|||||||
Reference in New Issue
Block a user