Compare commits
5 Commits
feat/healt
...
fix/db-tes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3a6e58274c | ||
|
|
c819b65175 | ||
|
|
c3ef5a664f | ||
|
|
4c2927a16e | ||
| 79dd6e723d |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -37,3 +37,4 @@ pgdata/
|
||||
|
||||
# Prisma
|
||||
src/db/prisma/migrations/*.sql.backup
|
||||
logs.sh
|
||||
|
||||
@@ -23,6 +23,7 @@ async function createUser(overrides: { email?: string; name?: string; role?: 'US
|
||||
data: {
|
||||
email: overrides.email ?? `test-${Date.now()}@example.com`,
|
||||
name: overrides.name ?? 'Test User',
|
||||
passwordHash: '$2b$10$test-hash-placeholder',
|
||||
role: overrides.role ?? 'USER',
|
||||
},
|
||||
});
|
||||
|
||||
@@ -138,6 +138,19 @@ export class DockerContainerManager implements McpOrchestrator {
|
||||
if (port !== undefined) {
|
||||
result.port = port;
|
||||
}
|
||||
|
||||
// Extract container IP from first non-default network
|
||||
const networks = info.NetworkSettings?.Networks;
|
||||
if (networks) {
|
||||
for (const [, net] of Object.entries(networks)) {
|
||||
const netInfo = net as { IPAddress?: string };
|
||||
if (netInfo.IPAddress) {
|
||||
result.ip = netInfo.IPAddress;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ export class HealthProbeRunner {
|
||||
|
||||
try {
|
||||
if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') {
|
||||
result = await this.probeHttp(instance, healthCheck, timeoutMs);
|
||||
result = await this.probeHttp(instance, server, healthCheck, timeoutMs);
|
||||
} else {
|
||||
result = await this.probeStdio(instance, server, healthCheck, timeoutMs);
|
||||
}
|
||||
@@ -172,22 +172,48 @@ export class HealthProbeRunner {
|
||||
/** Probe an HTTP/SSE MCP server by sending a JSON-RPC tool call. */
|
||||
private async probeHttp(
|
||||
instance: McpInstance,
|
||||
server: McpServer,
|
||||
healthCheck: HealthCheckSpec,
|
||||
timeoutMs: number,
|
||||
): Promise<ProbeResult> {
|
||||
if (!instance.port) {
|
||||
return { healthy: false, latencyMs: 0, message: 'No port assigned' };
|
||||
if (!instance.containerId) {
|
||||
return { healthy: false, latencyMs: 0, message: 'No container ID' };
|
||||
}
|
||||
|
||||
const start = Date.now();
|
||||
// Get container IP for internal network communication
|
||||
// (mcpd and MCP containers share the mcp-servers network)
|
||||
const containerInfo = await this.orchestrator.inspectContainer(instance.containerId);
|
||||
const containerPort = (server.containerPort as number | null) ?? 3000;
|
||||
|
||||
// For HTTP servers, we need to initialize a session first, then call the tool
|
||||
let baseUrl: string;
|
||||
if (containerInfo.ip) {
|
||||
baseUrl = `http://${containerInfo.ip}:${containerPort}`;
|
||||
} else if (instance.port) {
|
||||
baseUrl = `http://localhost:${instance.port}`;
|
||||
} else {
|
||||
return { healthy: false, latencyMs: 0, message: 'No container IP or port' };
|
||||
}
|
||||
|
||||
if (server.transport === 'SSE') {
|
||||
return this.probeSse(baseUrl, healthCheck, timeoutMs);
|
||||
}
|
||||
return this.probeStreamableHttp(baseUrl, healthCheck, timeoutMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Probe a streamable-http MCP server (POST to root endpoint).
|
||||
*/
|
||||
private async probeStreamableHttp(
|
||||
baseUrl: string,
|
||||
healthCheck: HealthCheckSpec,
|
||||
timeoutMs: number,
|
||||
): Promise<ProbeResult> {
|
||||
const start = Date.now();
|
||||
const controller = new AbortController();
|
||||
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||
|
||||
try {
|
||||
// Initialize
|
||||
const initResp = await fetch(`http://localhost:${instance.port}`, {
|
||||
const initResp = await fetch(baseUrl, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' },
|
||||
body: JSON.stringify({
|
||||
@@ -205,15 +231,13 @@ export class HealthProbeRunner {
|
||||
const headers: Record<string, string> = { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' };
|
||||
if (sessionId) headers['Mcp-Session-Id'] = sessionId;
|
||||
|
||||
// Send initialized notification
|
||||
await fetch(`http://localhost:${instance.port}`, {
|
||||
await fetch(baseUrl, {
|
||||
method: 'POST', headers,
|
||||
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
// Call health check tool
|
||||
const toolResp = await fetch(`http://localhost:${instance.port}`, {
|
||||
const toolResp = await fetch(baseUrl, {
|
||||
method: 'POST', headers,
|
||||
body: JSON.stringify({
|
||||
jsonrpc: '2.0', id: 2, method: 'tools/call',
|
||||
@@ -229,7 +253,6 @@ export class HealthProbeRunner {
|
||||
}
|
||||
|
||||
const body = await toolResp.text();
|
||||
// Check for JSON-RPC error in response
|
||||
try {
|
||||
const parsed = JSON.parse(body.includes('data: ') ? body.split('data: ')[1]!.split('\n')[0]! : body);
|
||||
if (parsed.error) {
|
||||
@@ -245,6 +268,146 @@ export class HealthProbeRunner {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Probe an SSE-transport MCP server.
|
||||
* SSE protocol: GET /sse → endpoint event → POST /messages?session_id=...
|
||||
*/
|
||||
private async probeSse(
|
||||
baseUrl: string,
|
||||
healthCheck: HealthCheckSpec,
|
||||
timeoutMs: number,
|
||||
): Promise<ProbeResult> {
|
||||
const start = Date.now();
|
||||
const controller = new AbortController();
|
||||
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||
|
||||
try {
|
||||
// 1. Connect to SSE endpoint to get the message URL
|
||||
const sseResp = await fetch(`${baseUrl}/sse`, {
|
||||
method: 'GET',
|
||||
headers: { 'Accept': 'text/event-stream' },
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
if (!sseResp.ok) {
|
||||
return { healthy: false, latencyMs: Date.now() - start, message: `SSE connect HTTP ${sseResp.status}` };
|
||||
}
|
||||
|
||||
// 2. Read the SSE stream to find the endpoint event
|
||||
const reader = sseResp.body?.getReader();
|
||||
if (!reader) {
|
||||
return { healthy: false, latencyMs: Date.now() - start, message: 'No SSE stream body' };
|
||||
}
|
||||
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = '';
|
||||
let messagesUrl = '';
|
||||
|
||||
// Read until we get the endpoint event
|
||||
while (!messagesUrl) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
|
||||
for (const line of buffer.split('\n')) {
|
||||
if (line.startsWith('data: ') && buffer.includes('event: endpoint')) {
|
||||
const endpoint = line.slice(6).trim();
|
||||
// Endpoint may be relative (e.g., /messages?session_id=...) or absolute
|
||||
messagesUrl = endpoint.startsWith('http') ? endpoint : `${baseUrl}${endpoint}`;
|
||||
}
|
||||
}
|
||||
// Keep only the last incomplete line
|
||||
const lines = buffer.split('\n');
|
||||
buffer = lines[lines.length - 1] ?? '';
|
||||
}
|
||||
|
||||
if (!messagesUrl) {
|
||||
reader.cancel();
|
||||
return { healthy: false, latencyMs: Date.now() - start, message: 'No endpoint event from SSE' };
|
||||
}
|
||||
|
||||
// 3. Initialize via the messages endpoint
|
||||
const postHeaders = { 'Content-Type': 'application/json' };
|
||||
|
||||
const initResp = await fetch(messagesUrl, {
|
||||
method: 'POST', headers: postHeaders,
|
||||
body: JSON.stringify({
|
||||
jsonrpc: '2.0', id: 1, method: 'initialize',
|
||||
params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'mcpctl-health', version: '0.1.0' } },
|
||||
}),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
if (!initResp.ok) {
|
||||
reader.cancel();
|
||||
return { healthy: false, latencyMs: Date.now() - start, message: `Initialize HTTP ${initResp.status}` };
|
||||
}
|
||||
|
||||
// 4. Send initialized notification
|
||||
await fetch(messagesUrl, {
|
||||
method: 'POST', headers: postHeaders,
|
||||
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
// 5. Call health check tool
|
||||
const toolResp = await fetch(messagesUrl, {
|
||||
method: 'POST', headers: postHeaders,
|
||||
body: JSON.stringify({
|
||||
jsonrpc: '2.0', id: 2, method: 'tools/call',
|
||||
params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
|
||||
}),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
const latencyMs = Date.now() - start;
|
||||
|
||||
// 6. Read tool response from SSE stream
|
||||
// The response comes back on the SSE stream, not the POST response
|
||||
let responseBuffer = '';
|
||||
const readTimeout = setTimeout(() => reader.cancel(), 5000);
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
responseBuffer += decoder.decode(value, { stream: true });
|
||||
|
||||
// Look for data lines containing our response (id: 2)
|
||||
for (const line of responseBuffer.split('\n')) {
|
||||
if (line.startsWith('data: ')) {
|
||||
try {
|
||||
const parsed = JSON.parse(line.slice(6));
|
||||
if (parsed.id === 2) {
|
||||
clearTimeout(readTimeout);
|
||||
reader.cancel();
|
||||
if (parsed.error) {
|
||||
return { healthy: false, latencyMs, message: parsed.error.message ?? 'Tool call error' };
|
||||
}
|
||||
return { healthy: true, latencyMs, message: 'ok' };
|
||||
}
|
||||
} catch {
|
||||
// Not valid JSON, skip
|
||||
}
|
||||
}
|
||||
}
|
||||
const respLines = responseBuffer.split('\n');
|
||||
responseBuffer = respLines[respLines.length - 1] ?? '';
|
||||
}
|
||||
|
||||
clearTimeout(readTimeout);
|
||||
reader.cancel();
|
||||
|
||||
// If POST response itself was ok (202 for SSE), consider it healthy
|
||||
if (toolResp.ok) {
|
||||
return { healthy: true, latencyMs, message: 'ok' };
|
||||
}
|
||||
|
||||
return { healthy: false, latencyMs, message: `Tool call HTTP ${toolResp.status}` };
|
||||
} finally {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Probe a STDIO MCP server by running `docker exec` with a disposable Node.js
|
||||
* script that pipes JSON-RPC messages into the package binary.
|
||||
|
||||
@@ -30,6 +30,8 @@ export interface ContainerInfo {
|
||||
name: string;
|
||||
state: 'running' | 'stopped' | 'starting' | 'error' | 'unknown';
|
||||
port?: number;
|
||||
/** Container IP on the first non-default network (for internal communication) */
|
||||
ip?: string;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,16 +1,22 @@
|
||||
name: home-assistant
|
||||
version: "1.0.0"
|
||||
description: Home Assistant MCP server for smart home control and entity management
|
||||
packageName: "home-assistant-mcp-server"
|
||||
transport: STDIO
|
||||
repositoryUrl: https://github.com/tevonsb/homeassistant-mcp
|
||||
dockerImage: "ghcr.io/homeassistant-ai/ha-mcp:latest"
|
||||
transport: SSE
|
||||
containerPort: 8086
|
||||
repositoryUrl: https://github.com/homeassistant-ai/ha-mcp
|
||||
command:
|
||||
- python
|
||||
- -c
|
||||
- "from ha_mcp.server import HomeAssistantSmartMCPServer; s = HomeAssistantSmartMCPServer(); s.mcp.run(transport='sse', host='0.0.0.0', port=8086)"
|
||||
healthCheck:
|
||||
tool: get_entities
|
||||
arguments: {}
|
||||
tool: ha_search_entities
|
||||
arguments:
|
||||
query: "light"
|
||||
env:
|
||||
- name: HASS_URL
|
||||
- name: HOMEASSISTANT_URL
|
||||
description: Home Assistant instance URL (e.g. http://homeassistant.local:8123)
|
||||
required: true
|
||||
- name: HASS_TOKEN
|
||||
- name: HOMEASSISTANT_TOKEN
|
||||
description: Home Assistant long-lived access token
|
||||
required: true
|
||||
|
||||
Reference in New Issue
Block a user