Compare commits


13 Commits

Author SHA1 Message Date
Michal
3a6e58274c fix: add missing passwordHash to DB test user factory
Some checks failed
CI / lint (pull_request) Has been cancelled
CI / typecheck (pull_request) Has been cancelled
CI / test (pull_request) Has been cancelled
CI / build (pull_request) Has been cancelled
CI / package (pull_request) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 01:02:41 +00:00
Michal
c819b65175 fix: SSE health probe uses proper SSE protocol (GET /sse + POST /messages)
SSE-transport MCP servers (like ha-mcp) use a different protocol flow:
GET /sse to establish the event stream, read the endpoint event, then POST
JSON-RPC messages to the /messages?session_id=... URL. Previously the probe
POSTed to the root path, which returned 404.
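The handshake above hinges on parsing the server's `endpoint` event before any JSON-RPC can be sent. A minimal sketch of that parsing step (`parseEndpointEvent` is a hypothetical helper for illustration, not code from this diff):

```typescript
// Minimal sketch (hypothetical helper, not from this PR). SSE-transport
// MCP servers first emit an `endpoint` event whose data field carries the
// URL (typically /messages?session_id=...) that subsequent JSON-RPC POSTs
// must target.
function parseEndpointEvent(chunk: string): string | null {
  let eventName = '';
  for (const line of chunk.split('\n')) {
    if (line.startsWith('event:')) {
      eventName = line.slice('event:'.length).trim();
    } else if (line.startsWith('data:') && eventName === 'endpoint') {
      // The data payload is the relative URL to POST messages to.
      return line.slice('data:'.length).trim();
    }
  }
  return null;
}
```

A probe would GET /sse, feed the first event block through this, then POST its JSON-RPC `initialize` to the returned URL.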

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 00:55:25 +00:00
Michal
c3ef5a664f chore: remove accidentally committed logs.sh
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 00:52:31 +00:00
Michal
4c2927a16e fix: HTTP health probes use container IP for internal network communication
mcpd and MCP containers share the mcp-servers Docker network. HTTP probes
must use the container's internal IP + containerPort instead of localhost
+ host-mapped port. Also extracts container IP from Docker inspect.

Updated home-assistant template to use ghcr.io/homeassistant-ai/ha-mcp
Docker image (SSE transport) instead of broken npm package.
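The IP extraction described above can be sketched as a pure function over the `docker inspect` payload (shape per the Docker Engine API; the helper name is illustrative):

```typescript
// Sketch of pulling a container's internal IP out of `docker inspect`
// output. When mcpd and the MCP container share a Docker network, probes
// must target this IP + containerPort: localhost + the host-mapped port
// is not reachable from inside another container.
interface InspectNetworks {
  NetworkSettings?: { Networks?: Record<string, { IPAddress?: string }> };
}

function containerIp(info: InspectNetworks): string | undefined {
  const networks = info.NetworkSettings?.Networks ?? {};
  for (const net of Object.values(networks)) {
    if (net.IPAddress) return net.IPAddress; // first network with an address
  }
  return undefined;
}
```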

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 00:52:17 +00:00
79dd6e723d Merge pull request 'feat: MCP health probe runner with tool-call probes' (#17) from feat/health-probe-runner into main
2026-02-23 00:39:09 +00:00
Michal
cde1c59fd6 feat: MCP health probe runner — periodic tool-call probes for instances
Implements Kubernetes-style liveness probes that call MCP tools defined
in server healthCheck configs. For STDIO servers, uses docker exec to
spawn a disposable MCP client that sends initialize + tool call. For
HTTP/SSE servers, sends JSON-RPC directly.

- HealthProbeRunner service with configurable interval/threshold/timeout
- execInContainer added to orchestrator interface + Docker implementation
- Instance findById now includes server relation (fixes describe showing IDs)
- Events appended to instance (last 50), healthStatus tracked as
  healthy/degraded/unhealthy
- 12 unit tests covering probing, thresholds, intervals, cleanup
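The healthy/degraded/unhealthy transition described above follows Kubernetes-style threshold semantics. A condensed sketch (hypothetical standalone helper; the PR implements this state inside HealthProbeRunner):

```typescript
// A passing probe resets the failure counter; a failing probe marks the
// instance "degraded" until consecutive failures reach the threshold,
// at which point it becomes "unhealthy".
type HealthStatus = 'healthy' | 'degraded' | 'unhealthy';

function nextHealth(
  probePassed: boolean,
  consecutiveFailures: number, // failures recorded before this probe
  failureThreshold: number,
): { status: HealthStatus; consecutiveFailures: number } {
  if (probePassed) return { status: 'healthy', consecutiveFailures: 0 };
  const failures = consecutiveFailures + 1;
  return {
    status: failures >= failureThreshold ? 'unhealthy' : 'degraded',
    consecutiveFailures: failures,
  };
}
```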

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 00:38:48 +00:00
daa5860ed2 Merge pull request 'fix: stdin open for STDIO servers + describe instance resolution' (#16) from fix/stdin-describe-instance into main
2026-02-23 00:26:49 +00:00
Michal
ecbf48dd49 fix: keep stdin open for STDIO servers + describe instance resolves server names
STDIO MCP servers read from stdin and exit on EOF. Docker containers close
stdin by default, causing all STDIO servers to crash immediately. Added
OpenStdin: true to container creation.

Describe instance now resolves server names (like logs command), preferring
RUNNING instances. Added 7 new describe tests covering server name resolution,
healthcheck display, events section, and template detail.
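The stdin fix boils down to two fields on the container-create options (field names per the Docker Engine API; the surrounding object is an illustrative fragment, not the PR's full spec):

```typescript
// Config-fragment sketch: dockerode container-create options with stdin
// kept open. Without OpenStdin, stdin is closed when the container starts,
// the STDIO MCP server sees EOF on its read loop and exits immediately.
const createOpts = {
  Image: 'node:20-slim', // illustrative image
  OpenStdin: true,       // keep stdin open for the STDIO transport
  StdinOnce: false,      // don't close stdin after the first attach detaches
};
```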

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 00:26:28 +00:00
d38b5aac60 Merge pull request 'feat: container liveness sync + node-runner slim base' (#15) from feat/container-liveness-sync into main
2026-02-23 00:18:41 +00:00
Michal
d07d4d11dd feat: container liveness sync + node-runner slim base
- Add syncStatus() to InstanceService: detects crashed/stopped containers,
  marks them ERROR with last log line as context
- Reconcile now syncs container status first (detect dead before counting)
- Add 30s periodic sync loop in main.ts
- Switch node-runner from alpine to slim (Debian) for npm compatibility
  (fixes home-assistant-mcp-server binary not found on Alpine)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 00:18:28 +00:00
fa58c1b5ed Merge pull request 'fix: logs resolves server names + replica handling + tests' (#14) from fix/logs-resolve-and-tests into main
2026-02-23 00:12:50 +00:00
Michal
dd1dfc629d fix: logs command resolves server names, proper replica handling
- `mcpctl logs <server-name>` resolves to first RUNNING instance
- `mcpctl logs <server-name> -i <N>` selects specific replica
- Shows "instance N/M" hint when server has multiple replicas
- Added 5 proper tests: server name resolution, RUNNING preference,
  replica selection, out-of-range error, no instances error
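The selection rule listed above can be sketched as a pure function (hypothetical helper; the PR implements it inside the logs command): an explicit `--instance` index wins and is range-checked, otherwise the first RUNNING instance is preferred, falling back to the first instance of any status.

```typescript
interface Inst { id: string; status: string }

// Pick a replica: explicit index wins (with range check), otherwise
// prefer the first RUNNING instance, then fall back to the first one.
function pickReplica(instances: Inst[], index?: number): Inst {
  if (instances.length === 0) throw new Error('No instances found');
  if (index !== undefined) {
    if (index < 0 || index >= instances.length) {
      throw new Error(`Instance index ${index} out of range`);
    }
    return instances[index];
  }
  return instances.find((i) => i.status === 'RUNNING') ?? instances[0];
}
```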

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 00:12:39 +00:00
7b3dab142e Merge pull request 'fix: show server name in instances, logs by server name' (#13) from fix/instance-ux into main
2026-02-23 00:07:57 +00:00
17 changed files with 1358 additions and 32 deletions

.gitignore

@@ -37,3 +37,4 @@ pgdata/
 # Prisma
 src/db/prisma/migrations/*.sql.backup
+logs.sh


@@ -1,7 +1,8 @@
 # Base container for npm-based MCP servers (STDIO transport).
 # mcpd uses this image to run `npx -y <packageName>` when a server
 # has packageName but no dockerImage.
-FROM node:20-alpine
+# Using slim (Debian) instead of alpine for better npm package compatibility.
+FROM node:20-slim
 WORKDIR /mcp


@@ -74,9 +74,10 @@ function formatServerDetail(server: Record<string, unknown>): string {
 function formatInstanceDetail(instance: Record<string, unknown>, inspect?: Record<string, unknown>): string {
   const lines: string[] = [];
-  lines.push(`=== Instance: ${instance.id} ===`);
+  const server = instance.server as { name: string } | undefined;
+  lines.push(`=== Instance: ${server?.name ?? instance.id} ===`);
   lines.push(`${pad('Status:')}${instance.status}`);
-  lines.push(`${pad('Server ID:')}${instance.serverId}`);
+  lines.push(`${pad('Server:')}${server?.name ?? String(instance.serverId)}`);
   lines.push(`${pad('Container ID:')}${instance.containerId ?? '-'}`);
   lines.push(`${pad('Port:')}${instance.port ?? '-'}`);
@@ -277,11 +278,33 @@ export function createDescribeCommand(deps: DescribeCommandDeps): Command {
   // Resolve name → ID
   let id: string;
-  try {
-    id = await resolveNameOrId(deps.client, resource, idOrName);
-  } catch {
-    id = idOrName;
-  }
+  if (resource === 'instances') {
+    // Instances: accept instance ID or server name (resolve to first running instance)
+    try {
+      id = await resolveNameOrId(deps.client, resource, idOrName);
+    } catch {
+      // Not an instance ID — try as server name
+      const servers = await deps.client.get<Array<{ id: string; name: string }>>('/api/v1/servers');
+      const server = servers.find((s) => s.name === idOrName || s.id === idOrName);
+      if (server) {
+        const instances = await deps.client.get<Array<{ id: string; status: string }>>(`/api/v1/instances?serverId=${server.id}`);
+        const running = instances.find((i) => i.status === 'RUNNING') ?? instances[0];
+        if (running) {
+          id = running.id;
+        } else {
+          throw new Error(`No instances found for server '${idOrName}'`);
+        }
+      } else {
+        id = idOrName;
+      }
+    }
+  } else {
+    try {
+      id = await resolveNameOrId(deps.client, resource, idOrName);
+    } catch {
+      id = idOrName;
+    }
+  }
   const item = await deps.fetchResource(resource, id) as Record<string, unknown>;


@@ -6,31 +6,65 @@ export interface LogsCommandDeps {
   log: (...args: unknown[]) => void;
 }
+interface InstanceInfo {
+  id: string;
+  status: string;
+  containerId: string | null;
+}
 /**
  * Resolve a name/ID to an instance ID.
  * Accepts: instance ID, server name, or server ID.
- * For servers, picks the first RUNNING instance.
+ * For servers with multiple replicas, picks by --instance index or first RUNNING.
  */
-async function resolveInstanceId(client: ApiClient, nameOrId: string): Promise<string> {
+async function resolveInstance(
+  client: ApiClient,
+  nameOrId: string,
+  instanceIndex?: number,
+): Promise<{ instanceId: string; serverName?: string; replicaInfo?: string }> {
   // Try as instance ID first
   try {
     await client.get(`/api/v1/instances/${nameOrId}`);
-    return nameOrId;
+    return { instanceId: nameOrId };
   } catch {
     // Not a valid instance ID
   }
-  // Try as server name → find its instances
+  // Try as server name/ID → find its instances
   const servers = await client.get<Array<{ id: string; name: string }>>('/api/v1/servers');
   const server = servers.find((s) => s.name === nameOrId || s.id === nameOrId);
-  if (server) {
-    const instances = await client.get<Array<{ id: string; status: string }>>(`/api/v1/instances?serverId=${server.id}`);
-    const running = instances.find((i) => i.status === 'RUNNING') ?? instances[0];
-    if (running) return running.id;
-    throw new Error(`No instances found for server '${nameOrId}'`);
+  if (!server) {
+    throw new Error(`Instance or server '${nameOrId}' not found`);
   }
-  throw new Error(`Instance or server '${nameOrId}' not found`);
+  const instances = await client.get<InstanceInfo[]>(`/api/v1/instances?serverId=${server.id}`);
+  if (instances.length === 0) {
+    throw new Error(`No instances found for server '${server.name}'`);
+  }
+  // Select by index or pick first running
+  let selected: InstanceInfo | undefined;
+  if (instanceIndex !== undefined) {
+    if (instanceIndex < 0 || instanceIndex >= instances.length) {
+      throw new Error(`Instance index ${instanceIndex} out of range (server '${server.name}' has ${instances.length} instance${instances.length > 1 ? 's' : ''})`);
+    }
+    selected = instances[instanceIndex];
+  } else {
+    selected = instances.find((i) => i.status === 'RUNNING') ?? instances[0];
+  }
+  if (!selected) {
+    throw new Error(`No instances found for server '${server.name}'`);
+  }
+  const result: { instanceId: string; serverName?: string; replicaInfo?: string } = {
+    instanceId: selected.id,
+    serverName: server.name,
+  };
+  if (instances.length > 1) {
+    result.replicaInfo = `instance ${instances.indexOf(selected) + 1}/${instances.length}`;
+  }
+  return result;
 }
 export function createLogsCommand(deps: LogsCommandDeps): Command {
@@ -40,8 +74,15 @@ export function createLogsCommand(deps: LogsCommandDeps): Command {
     .description('Get logs from an MCP server instance')
     .argument('<name>', 'Server name, server ID, or instance ID')
     .option('-t, --tail <lines>', 'Number of lines to show')
-    .action(async (nameOrId: string, opts: { tail?: string }) => {
-      const instanceId = await resolveInstanceId(client, nameOrId);
+    .option('-i, --instance <index>', 'Instance/replica index (0-based, for servers with multiple replicas)')
+    .action(async (nameOrId: string, opts: { tail?: string; instance?: string }) => {
+      const instanceIndex = opts.instance !== undefined ? parseInt(opts.instance, 10) : undefined;
+      const { instanceId, serverName, replicaInfo } = await resolveInstance(client, nameOrId, instanceIndex);
+      if (replicaInfo) {
+        process.stderr.write(`Showing logs for ${serverName} (${replicaInfo})\n`);
+      }
       let url = `/api/v1/instances/${instanceId}/logs`;
       if (opts.tail) {
         url += `?tail=${opts.tail}`;


@@ -139,4 +139,152 @@ describe('describe command', () => {
     expect(text).toContain('RUNNING');
     expect(text).toContain('abc123');
   });
+  it('resolves server name to instance for describe instance', async () => {
+    const deps = makeDeps({
+      id: 'inst-1',
+      serverId: 'srv-1',
+      server: { name: 'my-grafana' },
+      status: 'RUNNING',
+      containerId: 'abc123',
+      port: 3000,
+    });
+    // resolveNameOrId will throw (not a CUID, name won't match instances)
+    vi.mocked(deps.client.get)
+      .mockResolvedValueOnce([] as never) // instances list (no name match)
+      .mockResolvedValueOnce([{ id: 'srv-1', name: 'my-grafana' }] as never) // servers list
+      .mockResolvedValueOnce([{ id: 'inst-1', status: 'RUNNING' }] as never); // instances for server
+    const cmd = createDescribeCommand(deps);
+    await cmd.parseAsync(['node', 'test', 'instance', 'my-grafana']);
+    expect(deps.fetchResource).toHaveBeenCalledWith('instances', 'inst-1');
+  });
+  it('resolves server name and picks running instance over stopped', async () => {
+    const deps = makeDeps({
+      id: 'inst-2',
+      serverId: 'srv-1',
+      server: { name: 'my-ha' },
+      status: 'RUNNING',
+      containerId: 'def456',
+    });
+    vi.mocked(deps.client.get)
+      .mockResolvedValueOnce([] as never) // instances list
+      .mockResolvedValueOnce([{ id: 'srv-1', name: 'my-ha' }] as never)
+      .mockResolvedValueOnce([
+        { id: 'inst-1', status: 'ERROR' },
+        { id: 'inst-2', status: 'RUNNING' },
+      ] as never);
+    const cmd = createDescribeCommand(deps);
+    await cmd.parseAsync(['node', 'test', 'instance', 'my-ha']);
+    expect(deps.fetchResource).toHaveBeenCalledWith('instances', 'inst-2');
+  });
+  it('throws when no instances found for server name', async () => {
+    const deps = makeDeps();
+    vi.mocked(deps.client.get)
+      .mockResolvedValueOnce([] as never) // instances list
+      .mockResolvedValueOnce([{ id: 'srv-1', name: 'my-server' }] as never)
+      .mockResolvedValueOnce([] as never); // no instances
+    const cmd = createDescribeCommand(deps);
+    await expect(cmd.parseAsync(['node', 'test', 'instance', 'my-server'])).rejects.toThrow(
+      /No instances found/,
+    );
+  });
+  it('shows instance with server name in header', async () => {
+    const deps = makeDeps({
+      id: 'inst-1',
+      serverId: 'srv-1',
+      server: { name: 'my-grafana' },
+      status: 'RUNNING',
+      containerId: 'abc123',
+      port: 3000,
+    });
+    const cmd = createDescribeCommand(deps);
+    await cmd.parseAsync(['node', 'test', 'instance', 'inst-1']);
+    const text = deps.output.join('\n');
+    expect(text).toContain('=== Instance: my-grafana ===');
+  });
+  it('shows instance health and events', async () => {
+    const deps = makeDeps({
+      id: 'inst-1',
+      serverId: 'srv-1',
+      server: { name: 'my-grafana' },
+      status: 'RUNNING',
+      containerId: 'abc123',
+      healthStatus: 'healthy',
+      lastHealthCheck: '2025-01-15T10:30:00Z',
+      events: [
+        { timestamp: '2025-01-15T10:30:00Z', type: 'Normal', message: 'Health check passed (45ms)' },
+      ],
+    });
+    const cmd = createDescribeCommand(deps);
+    await cmd.parseAsync(['node', 'test', 'instance', 'inst-1']);
+    const text = deps.output.join('\n');
+    expect(text).toContain('Health:');
+    expect(text).toContain('healthy');
+    expect(text).toContain('Events:');
+    expect(text).toContain('Health check passed');
+  });
+  it('shows server healthCheck section', async () => {
+    const deps = makeDeps({
+      id: 'srv-1',
+      name: 'my-grafana',
+      transport: 'STDIO',
+      healthCheck: {
+        tool: 'list_datasources',
+        arguments: {},
+        intervalSeconds: 60,
+        timeoutSeconds: 10,
+        failureThreshold: 3,
+      },
+    });
+    const cmd = createDescribeCommand(deps);
+    await cmd.parseAsync(['node', 'test', 'server', 'srv-1']);
+    const text = deps.output.join('\n');
+    expect(text).toContain('Health Check:');
+    expect(text).toContain('list_datasources');
+    expect(text).toContain('60s');
+    expect(text).toContain('Failure Threshold:');
+  });
+  it('shows template detail with healthCheck and usage', async () => {
+    const deps = makeDeps({
+      id: 'tpl-1',
+      name: 'grafana',
+      transport: 'STDIO',
+      version: '1.0.0',
+      packageName: '@leval/mcp-grafana',
+      env: [
+        { name: 'GRAFANA_URL', required: true, description: 'Grafana instance URL' },
+      ],
+      healthCheck: {
+        tool: 'list_datasources',
+        arguments: {},
+        intervalSeconds: 60,
+        timeoutSeconds: 10,
+        failureThreshold: 3,
+      },
+    });
+    const cmd = createDescribeCommand(deps);
+    await cmd.parseAsync(['node', 'test', 'template', 'tpl-1']);
+    const text = deps.output.join('\n');
+    expect(text).toContain('=== Template: grafana ===');
+    expect(text).toContain('@leval/mcp-grafana');
+    expect(text).toContain('GRAFANA_URL');
+    expect(text).toContain('Health Check:');
+    expect(text).toContain('list_datasources');
+    expect(text).toContain('mcpctl create server my-grafana --from-template=grafana');
+  });
 });


@@ -68,16 +68,79 @@ describe('logs command', () => {
     output = [];
   });
-  it('shows logs', async () => {
-    vi.mocked(client.get).mockResolvedValue({ stdout: 'hello world\n', stderr: '' });
+  it('shows logs by instance ID', async () => {
+    vi.mocked(client.get)
+      .mockResolvedValueOnce({ id: 'inst-1', status: 'RUNNING' } as never) // instance lookup
+      .mockResolvedValueOnce({ stdout: 'hello world\n', stderr: '' } as never); // logs
     const cmd = createLogsCommand({ client, log });
     await cmd.parseAsync(['inst-1'], { from: 'user' });
+    expect(client.get).toHaveBeenCalledWith('/api/v1/instances/inst-1');
     expect(client.get).toHaveBeenCalledWith('/api/v1/instances/inst-1/logs');
     expect(output.join('\n')).toContain('hello world');
   });
+  it('resolves server name to instance ID', async () => {
+    vi.mocked(client.get)
+      .mockRejectedValueOnce(new Error('not found')) // instance lookup fails
+      .mockResolvedValueOnce([{ id: 'srv-1', name: 'my-grafana' }] as never) // servers list
+      .mockResolvedValueOnce([{ id: 'inst-1', status: 'RUNNING', containerId: 'abc' }] as never) // instances for server
+      .mockResolvedValueOnce({ stdout: 'grafana logs\n', stderr: '' } as never); // logs
+    const cmd = createLogsCommand({ client, log });
+    await cmd.parseAsync(['my-grafana'], { from: 'user' });
+    expect(client.get).toHaveBeenCalledWith('/api/v1/instances/inst-1/logs');
+    expect(output.join('\n')).toContain('grafana logs');
+  });
+  it('picks RUNNING instance over others', async () => {
+    vi.mocked(client.get)
+      .mockRejectedValueOnce(new Error('not found'))
+      .mockResolvedValueOnce([{ id: 'srv-1', name: 'ha-mcp' }] as never)
+      .mockResolvedValueOnce([
+        { id: 'inst-err', status: 'ERROR', containerId: null },
+        { id: 'inst-ok', status: 'RUNNING', containerId: 'abc' },
+      ] as never)
+      .mockResolvedValueOnce({ stdout: 'running instance\n', stderr: '' } as never);
+    const cmd = createLogsCommand({ client, log });
+    await cmd.parseAsync(['ha-mcp'], { from: 'user' });
+    expect(client.get).toHaveBeenCalledWith('/api/v1/instances/inst-ok/logs');
+  });
+  it('selects specific replica with --instance', async () => {
+    vi.mocked(client.get)
+      .mockRejectedValueOnce(new Error('not found'))
+      .mockResolvedValueOnce([{ id: 'srv-1', name: 'ha-mcp' }] as never)
+      .mockResolvedValueOnce([
+        { id: 'inst-0', status: 'RUNNING', containerId: 'a' },
+        { id: 'inst-1', status: 'RUNNING', containerId: 'b' },
+      ] as never)
+      .mockResolvedValueOnce({ stdout: 'replica 1\n', stderr: '' } as never);
+    const cmd = createLogsCommand({ client, log });
+    await cmd.parseAsync(['ha-mcp', '-i', '1'], { from: 'user' });
+    expect(client.get).toHaveBeenCalledWith('/api/v1/instances/inst-1/logs');
+  });
+  it('throws on out-of-range --instance index', async () => {
+    vi.mocked(client.get)
+      .mockRejectedValueOnce(new Error('not found'))
+      .mockResolvedValueOnce([{ id: 'srv-1', name: 'ha-mcp' }] as never)
+      .mockResolvedValueOnce([{ id: 'inst-0', status: 'RUNNING' }] as never);
+    const cmd = createLogsCommand({ client, log });
+    await expect(cmd.parseAsync(['ha-mcp', '-i', '5'], { from: 'user' })).rejects.toThrow('out of range');
+  });
+  it('throws when server has no instances', async () => {
+    vi.mocked(client.get)
+      .mockRejectedValueOnce(new Error('not found'))
+      .mockResolvedValueOnce([{ id: 'srv-1', name: 'empty-srv' }] as never)
+      .mockResolvedValueOnce([] as never);
+    const cmd = createLogsCommand({ client, log });
+    await expect(cmd.parseAsync(['empty-srv'], { from: 'user' })).rejects.toThrow('No instances found');
+  });
   it('passes tail option', async () => {
-    vi.mocked(client.get).mockResolvedValue({ stdout: '', stderr: '' });
+    vi.mocked(client.get)
+      .mockResolvedValueOnce({ id: 'inst-1' } as never)
+      .mockResolvedValueOnce({ stdout: '', stderr: '' } as never);
     const cmd = createLogsCommand({ client, log });
     await cmd.parseAsync(['inst-1', '-t', '50'], { from: 'user' });
     expect(client.get).toHaveBeenCalledWith('/api/v1/instances/inst-1/logs?tail=50');


@@ -23,6 +23,7 @@ async function createUser(overrides: { email?: string; name?: string; role?: 'US
     data: {
       email: overrides.email ?? `test-${Date.now()}@example.com`,
       name: overrides.name ?? 'Test User',
+      passwordHash: '$2b$10$test-hash-placeholder',
       role: overrides.role ?? 'USER',
     },
   });


@@ -29,6 +29,7 @@ import {
   AuthService,
   McpProxyService,
   TemplateService,
+  HealthProbeRunner,
 } from './services/index.js';
 import {
   registerMcpServerRoutes,
@@ -134,9 +135,32 @@ async function main(): Promise<void> {
   await app.listen({ port: config.port, host: config.host });
   app.log.info(`mcpd listening on ${config.host}:${config.port}`);
+  // Periodic container liveness sync — detect crashed containers
+  const SYNC_INTERVAL_MS = 30_000; // 30s
+  const syncTimer = setInterval(async () => {
+    try {
+      await instanceService.syncStatus();
+    } catch (err) {
+      app.log.error({ err }, 'Container status sync failed');
+    }
+  }, SYNC_INTERVAL_MS);
+  // Health probe runner — periodic MCP tool-call probes (like k8s livenessProbe)
+  const healthProbeRunner = new HealthProbeRunner(
+    instanceRepo,
+    serverRepo,
+    orchestrator,
+    { info: (msg) => app.log.info(msg), error: (obj, msg) => app.log.error(obj, msg) },
+  );
+  healthProbeRunner.start(15_000);
   // Graceful shutdown
   setupGracefulShutdown(app, {
-    disconnectDb: () => prisma.$disconnect(),
+    disconnectDb: async () => {
+      clearInterval(syncTimer);
+      healthProbeRunner.stop();
+      await prisma.$disconnect();
+    },
   });
 }


@@ -17,7 +17,10 @@ export class McpInstanceRepository implements IMcpInstanceRepository {
   }
   async findById(id: string): Promise<McpInstance | null> {
-    return this.prisma.mcpInstance.findUnique({ where: { id } });
+    return this.prisma.mcpInstance.findUnique({
+      where: { id },
+      include: { server: { select: { name: true } } },
+    });
   }
   async findByContainerId(containerId: string): Promise<McpInstance | null> {


@@ -1,9 +1,11 @@
import Docker from 'dockerode'; import Docker from 'dockerode';
import { PassThrough } from 'node:stream';
import type { import type {
McpOrchestrator, McpOrchestrator,
ContainerSpec, ContainerSpec,
ContainerInfo, ContainerInfo,
ContainerLogs, ContainerLogs,
ExecResult,
} from '../orchestrator.js'; } from '../orchestrator.js';
import { DEFAULT_MEMORY_LIMIT } from '../orchestrator.js'; import { DEFAULT_MEMORY_LIMIT } from '../orchestrator.js';
@@ -80,6 +82,9 @@ export class DockerContainerManager implements McpOrchestrator {
Env: envArr, Env: envArr,
ExposedPorts: exposedPorts, ExposedPorts: exposedPorts,
Labels: labels, Labels: labels,
// Keep stdin open for STDIO MCP servers (they read from stdin)
OpenStdin: true,
StdinOnce: false,
HostConfig: { HostConfig: {
PortBindings: portBindings, PortBindings: portBindings,
Memory: memoryLimit, Memory: memoryLimit,
@@ -133,6 +138,19 @@ export class DockerContainerManager implements McpOrchestrator {
if (port !== undefined) { if (port !== undefined) {
result.port = port; result.port = port;
} }
// Extract container IP from first non-default network
const networks = info.NetworkSettings?.Networks;
if (networks) {
for (const [, net] of Object.entries(networks)) {
const netInfo = net as { IPAddress?: string };
if (netInfo.IPAddress) {
result.ip = netInfo.IPAddress;
break;
}
}
}
return result; return result;
} }
@@ -158,4 +176,67 @@ export class DockerContainerManager implements McpOrchestrator {
// For simplicity we return everything as stdout. // For simplicity we return everything as stdout.
return { stdout: raw, stderr: '' }; return { stdout: raw, stderr: '' };
} }
async execInContainer(
containerId: string,
cmd: string[],
opts?: { stdin?: string; timeoutMs?: number },
): Promise<ExecResult> {
const container = this.docker.getContainer(containerId);
const hasStdin = opts?.stdin !== undefined;
const exec = await container.exec({
Cmd: cmd,
AttachStdin: hasStdin,
AttachStdout: true,
AttachStderr: true,
});
const stream = await exec.start({ hijack: hasStdin, stdin: hasStdin });
const timeoutMs = opts?.timeoutMs ?? 30_000;
return new Promise<ExecResult>((resolve, reject) => {
const stdout = new PassThrough();
const stderr = new PassThrough();
const stdoutChunks: Buffer[] = [];
const stderrChunks: Buffer[] = [];
stdout.on('data', (chunk: Buffer) => stdoutChunks.push(chunk));
stderr.on('data', (chunk: Buffer) => stderrChunks.push(chunk));
this.docker.modem.demuxStream(stream, stdout, stderr);
if (hasStdin) {
stream.write(opts!.stdin);
stream.end();
}
const timer = setTimeout(() => {
stream.destroy();
reject(new Error(`Exec timed out after ${timeoutMs}ms`));
}, timeoutMs);
stream.on('end', () => {
clearTimeout(timer);
exec.inspect().then((info) => {
resolve({
exitCode: (info as { ExitCode: number }).ExitCode,
stdout: Buffer.concat(stdoutChunks).toString('utf-8'),
stderr: Buffer.concat(stderrChunks).toString('utf-8'),
});
}).catch((err) => {
resolve({
exitCode: -1,
stdout: Buffer.concat(stdoutChunks).toString('utf-8'),
stderr: err instanceof Error ? err.message : String(err),
});
});
});
stream.on('error', (err: Error) => {
clearTimeout(timer);
reject(err);
});
});
}
} }


@@ -0,0 +1,520 @@
import type { McpServer, McpInstance } from '@prisma/client';
import type { IMcpInstanceRepository, IMcpServerRepository } from '../repositories/interfaces.js';
import type { McpOrchestrator } from './orchestrator.js';
export interface HealthCheckSpec {
tool: string;
arguments?: Record<string, unknown>;
intervalSeconds?: number;
timeoutSeconds?: number;
failureThreshold?: number;
}
export interface ProbeResult {
healthy: boolean;
latencyMs: number;
message: string;
}
interface ProbeState {
consecutiveFailures: number;
lastProbeAt: number;
}
/**
* Periodic health probe runner — calls MCP tools on running instances to verify
* they are alive and responsive. Mirrors Kubernetes liveness probe semantics.
*
* For STDIO servers: runs `docker exec` with a disposable MCP client script
* that sends initialize + tool/call via the package binary.
*
* For SSE/HTTP servers: sends HTTP JSON-RPC directly to the container port.
*/
export class HealthProbeRunner {
private probeStates = new Map<string, ProbeState>();
private timer: ReturnType<typeof setInterval> | null = null;
constructor(
private instanceRepo: IMcpInstanceRepository,
private serverRepo: IMcpServerRepository,
private orchestrator: McpOrchestrator,
private logger?: { info: (msg: string) => void; error: (obj: unknown, msg: string) => void },
) {}
/** Start the periodic probe loop. Runs every `tickIntervalMs` (default 15s). */
start(tickIntervalMs = 15_000): void {
if (this.timer) return;
this.timer = setInterval(() => {
this.tick().catch((err) => {
this.logger?.error({ err }, 'Health probe tick failed');
});
}, tickIntervalMs);
}
stop(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}
/** Single tick: probe all RUNNING instances that have healthCheck configs and are due. */
async tick(): Promise<void> {
const instances = await this.instanceRepo.findAll();
const running = instances.filter((i) => i.status === 'RUNNING' && i.containerId);
// Cache servers by ID to avoid repeated lookups
const serverCache = new Map<string, McpServer>();
for (const inst of running) {
let server = serverCache.get(inst.serverId);
if (!server) {
const s = await this.serverRepo.findById(inst.serverId);
if (!s) continue;
serverCache.set(inst.serverId, s);
server = s;
}
const healthCheck = server.healthCheck as HealthCheckSpec | null;
if (!healthCheck) continue;
const intervalMs = (healthCheck.intervalSeconds ?? 60) * 1000;
const state = this.probeStates.get(inst.id);
const now = Date.now();
// Skip if not due yet
if (state && (now - state.lastProbeAt) < intervalMs) continue;
await this.probeInstance(inst, server, healthCheck);
}
// Clean up states for instances that no longer exist
const activeIds = new Set(running.map((i) => i.id));
for (const key of this.probeStates.keys()) {
if (!activeIds.has(key)) {
this.probeStates.delete(key);
}
}
}
/** Probe a single instance and update its health status. */
async probeInstance(
instance: McpInstance,
server: McpServer,
healthCheck: HealthCheckSpec,
): Promise<ProbeResult> {
const timeoutMs = (healthCheck.timeoutSeconds ?? 10) * 1000;
const failureThreshold = healthCheck.failureThreshold ?? 3;
const now = new Date();
const start = Date.now();
let result: ProbeResult;
try {
if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') {
result = await this.probeHttp(instance, server, healthCheck, timeoutMs);
} else {
result = await this.probeStdio(instance, server, healthCheck, timeoutMs);
}
} catch (err) {
result = {
healthy: false,
latencyMs: Date.now() - start,
message: err instanceof Error ? err.message : String(err),
};
}
// Update probe state
const state = this.probeStates.get(instance.id) ?? { consecutiveFailures: 0, lastProbeAt: 0 };
state.lastProbeAt = Date.now();
if (result.healthy) {
state.consecutiveFailures = 0;
} else {
state.consecutiveFailures++;
}
this.probeStates.set(instance.id, state);
// Determine health status
const healthStatus = result.healthy
? 'healthy'
: state.consecutiveFailures >= failureThreshold
? 'unhealthy'
: 'degraded';
// Build event
const eventType = result.healthy ? 'Normal' : 'Warning';
const eventMessage = result.healthy
? `Health check passed (${result.latencyMs}ms)`
: `Health check failed: ${result.message}`;
const existingEvents = (instance.events as Array<{ timestamp: string; type: string; message: string }>) ?? [];
// Keep last 50 events
const events = [
...existingEvents.slice(-49),
{ timestamp: now.toISOString(), type: eventType, message: eventMessage },
];
// Update instance
await this.instanceRepo.updateStatus(instance.id, instance.status as 'RUNNING', {
healthStatus,
lastHealthCheck: now,
events,
});
this.logger?.info(
`[health] ${(instance as unknown as { server?: { name: string } }).server?.name ?? instance.serverId}: ${healthStatus} (${result.latencyMs}ms) - ${eventMessage}`,
);
return result;
}
/** Probe an HTTP/SSE MCP server by sending a JSON-RPC tool call. */
private async probeHttp(
instance: McpInstance,
server: McpServer,
healthCheck: HealthCheckSpec,
timeoutMs: number,
): Promise<ProbeResult> {
if (!instance.containerId) {
return { healthy: false, latencyMs: 0, message: 'No container ID' };
}
// Get container IP for internal network communication
// (mcpd and MCP containers share the mcp-servers network)
const containerInfo = await this.orchestrator.inspectContainer(instance.containerId);
const containerPort = (server.containerPort as number | null) ?? 3000;
let baseUrl: string;
if (containerInfo.ip) {
baseUrl = `http://${containerInfo.ip}:${containerPort}`;
} else if (instance.port) {
baseUrl = `http://localhost:${instance.port}`;
} else {
return { healthy: false, latencyMs: 0, message: 'No container IP or port' };
}
if (server.transport === 'SSE') {
return this.probeSse(baseUrl, healthCheck, timeoutMs);
}
return this.probeStreamableHttp(baseUrl, healthCheck, timeoutMs);
}
/**
* Probe a streamable-http MCP server (POST to root endpoint).
*/
private async probeStreamableHttp(
baseUrl: string,
healthCheck: HealthCheckSpec,
timeoutMs: number,
): Promise<ProbeResult> {
const start = Date.now();
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
try {
const initResp = await fetch(baseUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' },
body: JSON.stringify({
jsonrpc: '2.0', id: 1, method: 'initialize',
params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'mcpctl-health', version: '0.1.0' } },
}),
signal: controller.signal,
});
if (!initResp.ok) {
return { healthy: false, latencyMs: Date.now() - start, message: `Initialize HTTP ${initResp.status}` };
}
const sessionId = initResp.headers.get('mcp-session-id');
const headers: Record<string, string> = { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' };
if (sessionId) headers['Mcp-Session-Id'] = sessionId;
await fetch(baseUrl, {
method: 'POST', headers,
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
signal: controller.signal,
});
const toolResp = await fetch(baseUrl, {
method: 'POST', headers,
body: JSON.stringify({
jsonrpc: '2.0', id: 2, method: 'tools/call',
params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
}),
signal: controller.signal,
});
const latencyMs = Date.now() - start;
if (!toolResp.ok) {
return { healthy: false, latencyMs, message: `Tool call HTTP ${toolResp.status}` };
}
const body = await toolResp.text();
try {
// Response may be plain JSON or an SSE frame ('data: {...}')
const dataLine = body.split('\n').find((l) => l.startsWith('data: '));
const parsed = JSON.parse(dataLine ? dataLine.slice(6) : body);
if (parsed.error) {
return { healthy: false, latencyMs, message: parsed.error.message ?? 'Tool call error' };
}
} catch {
// If parsing fails but HTTP was ok, consider it healthy
}
return { healthy: true, latencyMs, message: 'ok' };
} finally {
clearTimeout(timer);
}
}
/**
* Probe an SSE-transport MCP server.
* SSE protocol: GET /sse → endpoint event → POST /messages?session_id=...
*/
private async probeSse(
baseUrl: string,
healthCheck: HealthCheckSpec,
timeoutMs: number,
): Promise<ProbeResult> {
const start = Date.now();
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
try {
// 1. Connect to SSE endpoint to get the message URL
const sseResp = await fetch(`${baseUrl}/sse`, {
method: 'GET',
headers: { 'Accept': 'text/event-stream' },
signal: controller.signal,
});
if (!sseResp.ok) {
return { healthy: false, latencyMs: Date.now() - start, message: `SSE connect HTTP ${sseResp.status}` };
}
// 2. Read the SSE stream to find the endpoint event
const reader = sseResp.body?.getReader();
if (!reader) {
return { healthy: false, latencyMs: Date.now() - start, message: 'No SSE stream body' };
}
const decoder = new TextDecoder();
let buffer = '';
let messagesUrl = '';
// Read until we see the endpoint event. Track the preceding `event:` line so
// an unrelated `data:` frame is never mistaken for the endpoint, and only
// parse complete lines (the buffer's tail may be a partial line).
let pendingEvent = '';
while (!messagesUrl) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() ?? ''; // keep the incomplete tail for the next read
for (const line of lines) {
if (line.startsWith('event: ')) {
pendingEvent = line.slice(7).trim();
} else if (line.startsWith('data: ') && pendingEvent === 'endpoint') {
const endpoint = line.slice(6).trim();
// Endpoint may be relative (e.g., /messages?session_id=...) or absolute
messagesUrl = endpoint.startsWith('http') ? endpoint : `${baseUrl}${endpoint}`;
} else if (line.trim() === '') {
pendingEvent = ''; // blank line ends the SSE event block
}
}
}
if (!messagesUrl) {
reader.cancel();
return { healthy: false, latencyMs: Date.now() - start, message: 'No endpoint event from SSE' };
}
// 3. Initialize via the messages endpoint
const postHeaders = { 'Content-Type': 'application/json' };
const initResp = await fetch(messagesUrl, {
method: 'POST', headers: postHeaders,
body: JSON.stringify({
jsonrpc: '2.0', id: 1, method: 'initialize',
params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'mcpctl-health', version: '0.1.0' } },
}),
signal: controller.signal,
});
if (!initResp.ok) {
reader.cancel();
return { healthy: false, latencyMs: Date.now() - start, message: `Initialize HTTP ${initResp.status}` };
}
// 4. Send initialized notification
await fetch(messagesUrl, {
method: 'POST', headers: postHeaders,
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
signal: controller.signal,
});
// 5. Call health check tool
const toolResp = await fetch(messagesUrl, {
method: 'POST', headers: postHeaders,
body: JSON.stringify({
jsonrpc: '2.0', id: 2, method: 'tools/call',
params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
}),
signal: controller.signal,
});
const latencyMs = Date.now() - start;
// 6. Read the tool response from the SSE stream
// (the result arrives on the event stream, not in the POST response)
let responseBuffer = '';
const readTimeout = setTimeout(() => reader.cancel(), 5000);
while (true) {
const { done, value } = await reader.read();
if (done) break;
responseBuffer += decoder.decode(value, { stream: true });
const respLines = responseBuffer.split('\n');
// Keep the (possibly incomplete) last line for the next read
responseBuffer = respLines.pop() ?? '';
// Look for data lines containing our response (id: 2)
for (const line of respLines) {
if (!line.startsWith('data: ')) continue;
try {
const parsed = JSON.parse(line.slice(6));
if (parsed.id === 2) {
clearTimeout(readTimeout);
reader.cancel();
if (parsed.error) {
return { healthy: false, latencyMs, message: parsed.error.message ?? 'Tool call error' };
}
return { healthy: true, latencyMs, message: 'ok' };
}
} catch {
// Not valid JSON, skip
}
}
}
clearTimeout(readTimeout);
reader.cancel();
// If POST response itself was ok (202 for SSE), consider it healthy
if (toolResp.ok) {
return { healthy: true, latencyMs, message: 'ok' };
}
return { healthy: false, latencyMs, message: `Tool call HTTP ${toolResp.status}` };
} finally {
clearTimeout(timer);
}
}
/**
* Probe a STDIO MCP server by running `docker exec` with a disposable Node.js
* script that pipes JSON-RPC messages into the package binary.
*/
private async probeStdio(
instance: McpInstance,
server: McpServer,
healthCheck: HealthCheckSpec,
timeoutMs: number,
): Promise<ProbeResult> {
if (!instance.containerId) {
return { healthy: false, latencyMs: 0, message: 'No container ID' };
}
const start = Date.now();
const packageName = server.packageName as string | null;
if (!packageName) {
return { healthy: false, latencyMs: 0, message: 'No package name for STDIO server' };
}
// Build JSON-RPC messages for the health probe
const initMsg = JSON.stringify({
jsonrpc: '2.0', id: 1, method: 'initialize',
params: {
protocolVersion: '2024-11-05',
capabilities: {},
clientInfo: { name: 'mcpctl-health', version: '0.1.0' },
},
});
const initializedMsg = JSON.stringify({
jsonrpc: '2.0', method: 'notifications/initialized',
});
const toolCallMsg = JSON.stringify({
jsonrpc: '2.0', id: 2, method: 'tools/call',
params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
});
// Use a Node.js inline script that:
// 1. Spawns the MCP server binary via npx
// 2. Sends initialize + initialized + tool call via stdin
// 3. Reads responses from stdout
// 4. Exits with 0 if tool call succeeds, 1 if it fails
const probeScript = `
const { spawn } = require('child_process');
const proc = spawn('npx', ['--prefer-offline', '-y', ${JSON.stringify(packageName)}], { stdio: ['pipe', 'pipe', 'pipe'] });
let output = '';
let responded = false;
proc.stdout.on('data', d => {
output += d;
const lines = output.split('\\n');
for (const line of lines) {
if (!line.trim()) continue;
try {
const msg = JSON.parse(line);
if (msg.id === 2) {
responded = true;
if (msg.error) {
process.stdout.write('ERROR:' + (msg.error.message || 'unknown'));
proc.kill();
process.exit(1);
} else {
process.stdout.write('OK');
proc.kill();
process.exit(0);
}
}
} catch {}
}
output = lines[lines.length - 1] || '';
});
proc.stderr.on('data', () => {});
proc.on('error', e => { process.stdout.write('ERROR:' + e.message); process.exit(1); });
proc.on('exit', (code) => { if (!responded) { process.stdout.write('ERROR:process exited ' + code); process.exit(1); } });
setTimeout(() => { if (!responded) { process.stdout.write('ERROR:timeout'); proc.kill(); process.exit(1); } }, ${Math.max(timeoutMs - 2000, 1000)});
proc.stdin.write(${JSON.stringify(initMsg)} + '\\n');
setTimeout(() => {
proc.stdin.write(${JSON.stringify(initializedMsg)} + '\\n');
setTimeout(() => {
proc.stdin.write(${JSON.stringify(toolCallMsg)} + '\\n');
}, 500);
}, 500);
`.trim();
try {
const result = await this.orchestrator.execInContainer(
instance.containerId,
['node', '-e', probeScript],
{ timeoutMs },
);
const latencyMs = Date.now() - start;
if (result.exitCode === 0 && result.stdout.includes('OK')) {
return { healthy: true, latencyMs, message: 'ok' };
}
// Extract error message
const errorMatch = result.stdout.match(/ERROR:(.*)/);
const errorMsg = errorMatch?.[1] ?? (result.stderr.trim() || `exit code ${result.exitCode}`);
return { healthy: false, latencyMs, message: errorMsg };
} catch (err) {
return {
healthy: false,
latencyMs: Date.now() - start,
message: err instanceof Error ? err.message : String(err),
};
}
}
}
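The threshold logic in `probeInstance` (reset on success, `degraded` below `failureThreshold`, `unhealthy` at or above it) can be isolated into a small pure function. This is an illustrative sketch, not code from the service; the names are hypothetical:

```typescript
type Health = 'healthy' | 'degraded' | 'unhealthy';

// Mirror of the probeInstance state transition: a success resets the
// consecutive-failure counter; failures below the threshold report only
// "degraded", at or above the threshold the instance is "unhealthy".
function nextHealth(
  consecutiveFailures: number,
  probeHealthy: boolean,
  failureThreshold = 3,
): { health: Health; consecutiveFailures: number } {
  const failures = probeHealthy ? 0 : consecutiveFailures + 1;
  const health: Health = probeHealthy
    ? 'healthy'
    : failures >= failureThreshold
      ? 'unhealthy'
      : 'degraded';
  return { health, consecutiveFailures: failures };
}
```

With the default threshold of 3, two failed probes report `degraded` and the third flips to `unhealthy`, which is the behavior the test suite later in this diff asserts.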

View File

@@ -5,7 +5,7 @@ export { ProjectService } from './project.service.js';
 export { InstanceService, InvalidStateError } from './instance.service.js';
 export { generateMcpConfig } from './mcp-config-generator.js';
 export type { McpConfig, McpConfigServer } from './mcp-config-generator.js';
-export type { McpOrchestrator, ContainerSpec, ContainerInfo, ContainerLogs } from './orchestrator.js';
+export type { McpOrchestrator, ContainerSpec, ContainerInfo, ContainerLogs, ExecResult } from './orchestrator.js';
 export { DEFAULT_MEMORY_LIMIT, DEFAULT_NANO_CPUS } from './orchestrator.js';
 export { DockerContainerManager } from './docker/container-manager.js';
 export { AuditLogService } from './audit-log.service.js';
@@ -25,3 +25,5 @@ export type { LoginResult } from './auth.service.js';
 export { McpProxyService } from './mcp-proxy-service.js';
 export type { McpProxyRequest, McpProxyResponse } from './mcp-proxy-service.js';
 export { TemplateService } from './template.service.js';
+export { HealthProbeRunner } from './health-probe.service.js';
+export type { HealthCheckSpec, ProbeResult } from './health-probe.service.js';

View File

@@ -36,8 +36,41 @@ export class InstanceService {
     return instance;
   }

+  /**
+   * Sync instance statuses with actual container state.
+   * Detects crashed/stopped containers and marks them ERROR.
+   */
+  async syncStatus(): Promise<void> {
+    const instances = await this.instanceRepo.findAll();
+    for (const inst of instances) {
+      if ((inst.status === 'RUNNING' || inst.status === 'STARTING') && inst.containerId) {
+        try {
+          const info = await this.orchestrator.inspectContainer(inst.containerId);
+          if (info.state === 'stopped' || info.state === 'error') {
+            // Container died — get last logs for error context
+            let errorMsg = `Container ${info.state}`;
+            try {
+              const logs = await this.orchestrator.getContainerLogs(inst.containerId, { tail: 5 });
+              const lastLog = (logs.stdout || logs.stderr).trim().split('\n').pop();
+              if (lastLog) errorMsg = lastLog;
+            } catch { /* best-effort */ }
+            await this.instanceRepo.updateStatus(inst.id, 'ERROR', {
+              metadata: { error: errorMsg },
+            });
+          }
+        } catch {
+          // Container gone entirely
+          await this.instanceRepo.updateStatus(inst.id, 'ERROR', {
+            metadata: { error: 'Container not found' },
+          });
+        }
+      }
+    }
+  }
+
   /**
    * Reconcile instances for a server to match desired replica count.
+   * - Syncs container statuses first (detect crashed containers)
    * - If fewer running instances than replicas: start new ones
    * - If more running instances than replicas: remove excess (oldest first)
    */
@@ -45,6 +78,9 @@ export class InstanceService {
     const server = await this.serverRepo.findById(serverId);
     if (!server) throw new NotFoundError(`McpServer '${serverId}' not found`);

+    // Sync container statuses before counting active instances
+    await this.syncStatus();
+
     const instances = await this.instanceRepo.findAll(serverId);
     const active = instances.filter((i) => i.status === 'RUNNING' || i.status === 'STARTING');
     const desired = server.replicas;

View File

@@ -3,6 +3,7 @@ import type {
   ContainerSpec,
   ContainerInfo,
   ContainerLogs,
+  ExecResult,
 } from '../orchestrator.js';
 import { K8sClient } from './k8s-client.js';
 import type { K8sClientConfig } from './k8s-client.js';
@@ -164,6 +165,15 @@ export class KubernetesOrchestrator implements McpOrchestrator {
     return { stdout, stderr: '' };
   }

+  async execInContainer(
+    _containerId: string,
+    _cmd: string[],
+    _opts?: { stdin?: string; timeoutMs?: number },
+  ): Promise<ExecResult> {
+    // K8s exec via API — future implementation
+    throw new Error('execInContainer not yet implemented for Kubernetes');
+  }
+
   async listContainers(namespace?: string): Promise<ContainerInfo[]> {
     const ns = namespace ?? this.namespace;
     const res = await this.client.get<K8sPodList>(

View File

@@ -30,6 +30,8 @@ export interface ContainerInfo {
   name: string;
   state: 'running' | 'stopped' | 'starting' | 'error' | 'unknown';
   port?: number;
+  /** Container IP on the first non-default network (for internal communication) */
+  ip?: string;
   createdAt: Date;
 }
@@ -38,6 +40,12 @@ export interface ContainerLogs {
   stderr: string;
 }

+export interface ExecResult {
+  exitCode: number;
+  stdout: string;
+  stderr: string;
+}
+
 export interface McpOrchestrator {
   /** Pull an image if not present locally */
   pullImage(image: string): Promise<void>;
@@ -57,6 +65,9 @@ export interface McpOrchestrator {
   /** Get container logs */
   getContainerLogs(containerId: string, opts?: { tail?: number; since?: number }): Promise<ContainerLogs>;

+  /** Execute a command inside a running container with optional stdin */
+  execInContainer(containerId: string, cmd: string[], opts?: { stdin?: string; timeoutMs?: number }): Promise<ExecResult>;
+
   /** Check if the orchestrator runtime is available */
   ping(): Promise<boolean>;
 }
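The `execInContainer` contract added above leaves the Docker implementation to `DockerContainerManager`, which is not shown in this diff. One possible shape, sketched by shelling out to the `docker` CLI rather than the Engine API — purely illustrative, with hypothetical helper names:

```typescript
import { spawn } from 'node:child_process';

interface ExecResult {
  exitCode: number;
  stdout: string;
  stderr: string;
}

// Generic helper: spawn a process, optionally feed stdin, collect output,
// and enforce a timeout. Kills the child and rejects if the timeout fires.
function execProcess(
  bin: string,
  args: string[],
  opts?: { stdin?: string; timeoutMs?: number },
): Promise<ExecResult> {
  return new Promise((resolve, reject) => {
    const proc = spawn(bin, args);
    let stdout = '';
    let stderr = '';
    const timeoutMs = opts?.timeoutMs;
    const timer = timeoutMs
      ? setTimeout(() => {
          proc.kill();
          reject(new Error(`Exec timed out after ${timeoutMs}ms`));
        }, timeoutMs)
      : null;
    proc.stdout.on('data', (d: Buffer) => (stdout += d.toString()));
    proc.stderr.on('data', (d: Buffer) => (stderr += d.toString()));
    proc.on('error', (err) => {
      if (timer) clearTimeout(timer);
      reject(err);
    });
    proc.on('close', (code) => {
      if (timer) clearTimeout(timer);
      resolve({ exitCode: code ?? -1, stdout, stderr });
    });
    if (opts?.stdin) proc.stdin.write(opts.stdin);
    proc.stdin.end();
  });
}

// Illustrative wrapper satisfying the new interface method via `docker exec`.
function execInContainer(
  containerId: string,
  cmd: string[],
  opts?: { stdin?: string; timeoutMs?: number },
): Promise<ExecResult> {
  return execProcess('docker', ['exec', '-i', containerId, ...cmd], opts);
}
```

The `-i` flag keeps the exec'd process's stdin open, which the STDIO probe script relies on when piping JSON-RPC messages into the MCP binary.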

View File

@@ -0,0 +1,355 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { HealthProbeRunner } from '../../src/services/health-probe.service.js';
import type { HealthCheckSpec } from '../../src/services/health-probe.service.js';
import type { IMcpInstanceRepository, IMcpServerRepository } from '../../src/repositories/interfaces.js';
import type { McpOrchestrator, ExecResult } from '../../src/services/orchestrator.js';
import type { McpInstance, McpServer } from '@prisma/client';
function makeInstance(overrides: Partial<McpInstance> = {}): McpInstance {
return {
id: 'inst-1',
serverId: 'srv-1',
status: 'RUNNING',
containerId: 'container-abc',
port: null,
healthStatus: null,
lastHealthCheck: null,
events: [],
metadata: {},
version: 1,
createdAt: new Date(),
updatedAt: new Date(),
...overrides,
} as McpInstance;
}
function makeServer(overrides: Partial<McpServer> = {}): McpServer {
return {
id: 'srv-1',
name: 'my-grafana',
transport: 'STDIO',
packageName: '@leval/mcp-grafana',
dockerImage: null,
externalUrl: null,
containerPort: null,
repositoryUrl: null,
description: null,
command: null,
env: [],
replicas: 1,
projectId: null,
healthCheck: {
tool: 'list_datasources',
arguments: {},
intervalSeconds: 60,
timeoutSeconds: 10,
failureThreshold: 3,
},
version: 1,
createdAt: new Date(),
updatedAt: new Date(),
...overrides,
} as McpServer;
}
function mockInstanceRepo(): IMcpInstanceRepository {
return {
findAll: vi.fn(async () => []),
findById: vi.fn(async () => null),
findByContainerId: vi.fn(async () => null),
create: vi.fn(async (data) => makeInstance(data)),
updateStatus: vi.fn(async (id, status, fields) => makeInstance({ id, status, ...fields })),
delete: vi.fn(async () => {}),
};
}
function mockServerRepo(): IMcpServerRepository {
return {
findAll: vi.fn(async () => []),
findById: vi.fn(async () => null),
findByName: vi.fn(async () => null),
create: vi.fn(async () => makeServer()),
update: vi.fn(async () => makeServer()),
delete: vi.fn(async () => {}),
};
}
function mockOrchestrator(): McpOrchestrator {
return {
pullImage: vi.fn(async () => {}),
createContainer: vi.fn(async () => ({ containerId: 'c1', name: 'test', state: 'running' as const, createdAt: new Date() })),
stopContainer: vi.fn(async () => {}),
removeContainer: vi.fn(async () => {}),
inspectContainer: vi.fn(async () => ({ containerId: 'c1', name: 'test', state: 'running' as const, createdAt: new Date() })),
getContainerLogs: vi.fn(async () => ({ stdout: '', stderr: '' })),
execInContainer: vi.fn(async () => ({ exitCode: 0, stdout: 'OK', stderr: '' })),
ping: vi.fn(async () => true),
};
}
describe('HealthProbeRunner', () => {
let instanceRepo: IMcpInstanceRepository;
let serverRepo: IMcpServerRepository;
let orchestrator: McpOrchestrator;
let runner: HealthProbeRunner;
beforeEach(() => {
instanceRepo = mockInstanceRepo();
serverRepo = mockServerRepo();
orchestrator = mockOrchestrator();
runner = new HealthProbeRunner(instanceRepo, serverRepo, orchestrator);
});
it('skips instances without healthCheck config', async () => {
const instance = makeInstance();
const server = makeServer({ healthCheck: null });
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
vi.mocked(serverRepo.findById).mockResolvedValue(server);
await runner.tick();
expect(orchestrator.execInContainer).not.toHaveBeenCalled();
expect(instanceRepo.updateStatus).not.toHaveBeenCalled();
});
it('skips non-RUNNING instances', async () => {
const instance = makeInstance({ status: 'ERROR' });
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
await runner.tick();
expect(serverRepo.findById).not.toHaveBeenCalled();
});
it('probes STDIO instance with exec and marks healthy on success', async () => {
const instance = makeInstance();
const server = makeServer();
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
vi.mocked(serverRepo.findById).mockResolvedValue(server);
vi.mocked(orchestrator.execInContainer).mockResolvedValue({
exitCode: 0,
stdout: 'OK',
stderr: '',
});
await runner.tick();
expect(orchestrator.execInContainer).toHaveBeenCalledWith(
'container-abc',
expect.arrayContaining(['node', '-e']),
expect.objectContaining({ timeoutMs: 10000 }),
);
expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
'inst-1',
'RUNNING',
expect.objectContaining({
healthStatus: 'healthy',
lastHealthCheck: expect.any(Date),
events: expect.arrayContaining([
expect.objectContaining({ type: 'Normal', message: expect.stringContaining('passed') }),
]),
}),
);
});
it('marks unhealthy after failureThreshold consecutive failures', async () => {
const instance = makeInstance();
const healthCheck: HealthCheckSpec = {
tool: 'list_datasources',
arguments: {},
intervalSeconds: 0, // always due
failureThreshold: 2,
};
const server = makeServer({ healthCheck: healthCheck as unknown as undefined });
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
vi.mocked(serverRepo.findById).mockResolvedValue(server);
vi.mocked(orchestrator.execInContainer).mockResolvedValue({
exitCode: 1,
stdout: 'ERROR:connection refused',
stderr: '',
});
// First failure → degraded
await runner.tick();
expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
'inst-1',
'RUNNING',
expect.objectContaining({ healthStatus: 'degraded' }),
);
// Second failure → unhealthy (threshold = 2)
await runner.tick();
expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
'inst-1',
'RUNNING',
expect.objectContaining({ healthStatus: 'unhealthy' }),
);
});
it('resets failure count on success', async () => {
const instance = makeInstance();
const healthCheck: HealthCheckSpec = {
tool: 'list_datasources',
arguments: {},
intervalSeconds: 0,
failureThreshold: 3,
};
const server = makeServer({ healthCheck: healthCheck as unknown as undefined });
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
vi.mocked(serverRepo.findById).mockResolvedValue(server);
// Two failures
vi.mocked(orchestrator.execInContainer).mockResolvedValue({
exitCode: 1, stdout: 'ERROR:fail', stderr: '',
});
await runner.tick();
await runner.tick();
// Then success — should reset to healthy
vi.mocked(orchestrator.execInContainer).mockResolvedValue({
exitCode: 0, stdout: 'OK', stderr: '',
});
await runner.tick();
const lastCall = vi.mocked(instanceRepo.updateStatus).mock.calls.at(-1);
expect(lastCall?.[2]).toEqual(expect.objectContaining({ healthStatus: 'healthy' }));
});
it('handles exec timeout as failure', async () => {
const instance = makeInstance();
const server = makeServer();
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
vi.mocked(serverRepo.findById).mockResolvedValue(server);
vi.mocked(orchestrator.execInContainer).mockRejectedValue(new Error('Exec timed out after 10000ms'));
await runner.tick();
expect(instanceRepo.updateStatus).toHaveBeenCalledWith(
'inst-1',
'RUNNING',
expect.objectContaining({
healthStatus: 'degraded',
events: expect.arrayContaining([
expect.objectContaining({ type: 'Warning', message: expect.stringContaining('timed out') }),
]),
}),
);
});
it('appends events without losing history', async () => {
const existingEvents = [
{ timestamp: '2025-01-01T00:00:00Z', type: 'Normal', message: 'old event' },
];
const instance = makeInstance({ events: existingEvents });
const server = makeServer({
healthCheck: { tool: 'test', intervalSeconds: 0 } as McpServer['healthCheck'],
});
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
vi.mocked(serverRepo.findById).mockResolvedValue(server);
vi.mocked(orchestrator.execInContainer).mockResolvedValue({
exitCode: 0, stdout: 'OK', stderr: '',
});
await runner.tick();
const events = vi.mocked(instanceRepo.updateStatus).mock.calls[0]?.[2]?.events as unknown[];
expect(events).toHaveLength(2);
expect((events[0] as { message: string }).message).toBe('old event');
expect((events[1] as { message: string }).message).toContain('passed');
});
it('respects interval — skips probing if not due', async () => {
const instance = makeInstance();
const server = makeServer({
healthCheck: { tool: 'test', intervalSeconds: 300 } as McpServer['healthCheck'],
});
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
vi.mocked(serverRepo.findById).mockResolvedValue(server);
vi.mocked(orchestrator.execInContainer).mockResolvedValue({
exitCode: 0, stdout: 'OK', stderr: '',
});
// First tick: should probe
await runner.tick();
expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
// Second tick immediately: should skip (300s interval not elapsed)
await runner.tick();
expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
});
it('cleans up probe states for removed instances', async () => {
const instance = makeInstance();
const server = makeServer({
healthCheck: { tool: 'test', intervalSeconds: 0 } as McpServer['healthCheck'],
});
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
vi.mocked(serverRepo.findById).mockResolvedValue(server);
await runner.tick();
expect(orchestrator.execInContainer).toHaveBeenCalledTimes(1);
// Instance removed
vi.mocked(instanceRepo.findAll).mockResolvedValue([]);
await runner.tick();
// Re-add same instance — should probe again (state was cleaned)
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
await runner.tick();
expect(orchestrator.execInContainer).toHaveBeenCalledTimes(2);
});
it('skips STDIO instances without containerId', async () => {
const instance = makeInstance({ containerId: null });
const server = makeServer();
// containerId is null, but status is RUNNING — shouldn't be probed
vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]);
await runner.tick();
expect(serverRepo.findById).not.toHaveBeenCalled();
});
it('probeInstance returns result directly', async () => {
const instance = makeInstance();
const server = makeServer();
const healthCheck: HealthCheckSpec = {
tool: 'list_datasources',
arguments: {},
};
vi.mocked(orchestrator.execInContainer).mockResolvedValue({
exitCode: 0, stdout: 'OK', stderr: '',
});
const result = await runner.probeInstance(instance, server, healthCheck);
expect(result.healthy).toBe(true);
expect(result.latencyMs).toBeGreaterThanOrEqual(0);
expect(result.message).toBe('ok');
});
it('handles STDIO exec failure with error message', async () => {
const instance = makeInstance();
const server = makeServer();
const healthCheck: HealthCheckSpec = { tool: 'list_datasources', arguments: {} };
vi.mocked(orchestrator.execInContainer).mockResolvedValue({
exitCode: 1,
stdout: 'ERROR:ECONNREFUSED 10.0.0.1:3000',
stderr: '',
});
const result = await runner.probeInstance(instance, server, healthCheck);
expect(result.healthy).toBe(false);
expect(result.message).toBe('ECONNREFUSED 10.0.0.1:3000');
});
});

View File

@@ -1,16 +1,22 @@
 name: home-assistant
 version: "1.0.0"
 description: Home Assistant MCP server for smart home control and entity management
-packageName: "home-assistant-mcp-server"
-transport: STDIO
-repositoryUrl: https://github.com/tevonsb/homeassistant-mcp
+dockerImage: "ghcr.io/homeassistant-ai/ha-mcp:latest"
+transport: SSE
+containerPort: 8086
+repositoryUrl: https://github.com/homeassistant-ai/ha-mcp
+command:
+  - python
+  - -c
+  - "from ha_mcp.server import HomeAssistantSmartMCPServer; s = HomeAssistantSmartMCPServer(); s.mcp.run(transport='sse', host='0.0.0.0', port=8086)"
 healthCheck:
-  tool: get_entities
-  arguments: {}
+  tool: ha_search_entities
+  arguments:
+    query: "light"
 env:
-  - name: HASS_URL
+  - name: HOMEASSISTANT_URL
     description: Home Assistant instance URL (e.g. http://homeassistant.local:8123)
     required: true
-  - name: HASS_TOKEN
+  - name: HOMEASSISTANT_TOKEN
     description: Home Assistant long-lived access token
     required: true
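The SSE handshake this template now depends on (see `probeSse` above) starts by reading an `endpoint` event off the stream. A self-contained sketch of just that parsing step, assuming frames of the form `event: endpoint` followed by `data: <url>` — not the service's actual code:

```typescript
// Extract the messages URL from raw SSE text, e.g.:
//   event: endpoint
//   data: /messages?session_id=abc123
// Relative endpoints are resolved against baseUrl; unrelated events are ignored.
function parseEndpointEvent(sseText: string, baseUrl: string): string | null {
  let pendingEvent = '';
  for (const line of sseText.split('\n')) {
    if (line.startsWith('event: ')) {
      pendingEvent = line.slice(7).trim();
    } else if (line.startsWith('data: ') && pendingEvent === 'endpoint') {
      const endpoint = line.slice(6).trim();
      return endpoint.startsWith('http') ? endpoint : `${baseUrl}${endpoint}`;
    } else if (line.trim() === '') {
      pendingEvent = ''; // a blank line terminates an SSE event block
    }
  }
  return null;
}
```

Tracking the preceding `event:` line is what prevents an unrelated `data:` frame (a ping, a log message) from being mistaken for the endpoint — the bug class commit c819b65175 is guarding against.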