fix: SSE health probe uses proper SSE protocol (GET /sse + POST /messages)
SSE-transport MCP servers (like ha-mcp) use a different protocol flow: GET /sse to establish event stream, read endpoint event, then POST JSON-RPC messages to /messages?session_id=... URL. Previously was POSTing to root which returned 404. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -185,24 +185,35 @@ export class HealthProbeRunner {
|
|||||||
const containerInfo = await this.orchestrator.inspectContainer(instance.containerId);
|
const containerInfo = await this.orchestrator.inspectContainer(instance.containerId);
|
||||||
const containerPort = (server.containerPort as number | null) ?? 3000;
|
const containerPort = (server.containerPort as number | null) ?? 3000;
|
||||||
|
|
||||||
let url: string;
|
let baseUrl: string;
|
||||||
if (containerInfo.ip) {
|
if (containerInfo.ip) {
|
||||||
url = `http://${containerInfo.ip}:${containerPort}`;
|
baseUrl = `http://${containerInfo.ip}:${containerPort}`;
|
||||||
} else if (instance.port) {
|
} else if (instance.port) {
|
||||||
url = `http://localhost:${instance.port}`;
|
baseUrl = `http://localhost:${instance.port}`;
|
||||||
} else {
|
} else {
|
||||||
return { healthy: false, latencyMs: 0, message: 'No container IP or port' };
|
return { healthy: false, latencyMs: 0, message: 'No container IP or port' };
|
||||||
}
|
}
|
||||||
|
|
||||||
const start = Date.now();
|
if (server.transport === 'SSE') {
|
||||||
|
return this.probeSse(baseUrl, healthCheck, timeoutMs);
|
||||||
|
}
|
||||||
|
return this.probeStreamableHttp(baseUrl, healthCheck, timeoutMs);
|
||||||
|
}
|
||||||
|
|
||||||
// For HTTP servers, we need to initialize a session first, then call the tool
|
/**
|
||||||
|
* Probe a streamable-http MCP server (POST to root endpoint).
|
||||||
|
*/
|
||||||
|
private async probeStreamableHttp(
|
||||||
|
baseUrl: string,
|
||||||
|
healthCheck: HealthCheckSpec,
|
||||||
|
timeoutMs: number,
|
||||||
|
): Promise<ProbeResult> {
|
||||||
|
const start = Date.now();
|
||||||
const controller = new AbortController();
|
const controller = new AbortController();
|
||||||
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Initialize
|
const initResp = await fetch(baseUrl, {
|
||||||
const initResp = await fetch(url, {
|
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' },
|
headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' },
|
||||||
body: JSON.stringify({
|
body: JSON.stringify({
|
||||||
@@ -220,15 +231,13 @@ export class HealthProbeRunner {
|
|||||||
const headers: Record<string, string> = { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' };
|
const headers: Record<string, string> = { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' };
|
||||||
if (sessionId) headers['Mcp-Session-Id'] = sessionId;
|
if (sessionId) headers['Mcp-Session-Id'] = sessionId;
|
||||||
|
|
||||||
// Send initialized notification
|
await fetch(baseUrl, {
|
||||||
await fetch(url, {
|
|
||||||
method: 'POST', headers,
|
method: 'POST', headers,
|
||||||
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
|
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
|
||||||
signal: controller.signal,
|
signal: controller.signal,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Call health check tool
|
const toolResp = await fetch(baseUrl, {
|
||||||
const toolResp = await fetch(url, {
|
|
||||||
method: 'POST', headers,
|
method: 'POST', headers,
|
||||||
body: JSON.stringify({
|
body: JSON.stringify({
|
||||||
jsonrpc: '2.0', id: 2, method: 'tools/call',
|
jsonrpc: '2.0', id: 2, method: 'tools/call',
|
||||||
@@ -244,7 +253,6 @@ export class HealthProbeRunner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const body = await toolResp.text();
|
const body = await toolResp.text();
|
||||||
// Check for JSON-RPC error in response
|
|
||||||
try {
|
try {
|
||||||
const parsed = JSON.parse(body.includes('data: ') ? body.split('data: ')[1]!.split('\n')[0]! : body);
|
const parsed = JSON.parse(body.includes('data: ') ? body.split('data: ')[1]!.split('\n')[0]! : body);
|
||||||
if (parsed.error) {
|
if (parsed.error) {
|
||||||
@@ -260,6 +268,146 @@ export class HealthProbeRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Probe an SSE-transport MCP server.
|
||||||
|
* SSE protocol: GET /sse → endpoint event → POST /messages?session_id=...
|
||||||
|
*/
|
||||||
|
private async probeSse(
|
||||||
|
baseUrl: string,
|
||||||
|
healthCheck: HealthCheckSpec,
|
||||||
|
timeoutMs: number,
|
||||||
|
): Promise<ProbeResult> {
|
||||||
|
const start = Date.now();
|
||||||
|
const controller = new AbortController();
|
||||||
|
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 1. Connect to SSE endpoint to get the message URL
|
||||||
|
const sseResp = await fetch(`${baseUrl}/sse`, {
|
||||||
|
method: 'GET',
|
||||||
|
headers: { 'Accept': 'text/event-stream' },
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!sseResp.ok) {
|
||||||
|
return { healthy: false, latencyMs: Date.now() - start, message: `SSE connect HTTP ${sseResp.status}` };
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Read the SSE stream to find the endpoint event
|
||||||
|
const reader = sseResp.body?.getReader();
|
||||||
|
if (!reader) {
|
||||||
|
return { healthy: false, latencyMs: Date.now() - start, message: 'No SSE stream body' };
|
||||||
|
}
|
||||||
|
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
let buffer = '';
|
||||||
|
let messagesUrl = '';
|
||||||
|
|
||||||
|
// Read until we get the endpoint event
|
||||||
|
while (!messagesUrl) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
buffer += decoder.decode(value, { stream: true });
|
||||||
|
|
||||||
|
for (const line of buffer.split('\n')) {
|
||||||
|
if (line.startsWith('data: ') && buffer.includes('event: endpoint')) {
|
||||||
|
const endpoint = line.slice(6).trim();
|
||||||
|
// Endpoint may be relative (e.g., /messages?session_id=...) or absolute
|
||||||
|
messagesUrl = endpoint.startsWith('http') ? endpoint : `${baseUrl}${endpoint}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Keep only the last incomplete line
|
||||||
|
const lines = buffer.split('\n');
|
||||||
|
buffer = lines[lines.length - 1] ?? '';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!messagesUrl) {
|
||||||
|
reader.cancel();
|
||||||
|
return { healthy: false, latencyMs: Date.now() - start, message: 'No endpoint event from SSE' };
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Initialize via the messages endpoint
|
||||||
|
const postHeaders = { 'Content-Type': 'application/json' };
|
||||||
|
|
||||||
|
const initResp = await fetch(messagesUrl, {
|
||||||
|
method: 'POST', headers: postHeaders,
|
||||||
|
body: JSON.stringify({
|
||||||
|
jsonrpc: '2.0', id: 1, method: 'initialize',
|
||||||
|
params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'mcpctl-health', version: '0.1.0' } },
|
||||||
|
}),
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!initResp.ok) {
|
||||||
|
reader.cancel();
|
||||||
|
return { healthy: false, latencyMs: Date.now() - start, message: `Initialize HTTP ${initResp.status}` };
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Send initialized notification
|
||||||
|
await fetch(messagesUrl, {
|
||||||
|
method: 'POST', headers: postHeaders,
|
||||||
|
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
// 5. Call health check tool
|
||||||
|
const toolResp = await fetch(messagesUrl, {
|
||||||
|
method: 'POST', headers: postHeaders,
|
||||||
|
body: JSON.stringify({
|
||||||
|
jsonrpc: '2.0', id: 2, method: 'tools/call',
|
||||||
|
params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} },
|
||||||
|
}),
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
const latencyMs = Date.now() - start;
|
||||||
|
|
||||||
|
// 6. Read tool response from SSE stream
|
||||||
|
// The response comes back on the SSE stream, not the POST response
|
||||||
|
let responseBuffer = '';
|
||||||
|
const readTimeout = setTimeout(() => reader.cancel(), 5000);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
responseBuffer += decoder.decode(value, { stream: true });
|
||||||
|
|
||||||
|
// Look for data lines containing our response (id: 2)
|
||||||
|
for (const line of responseBuffer.split('\n')) {
|
||||||
|
if (line.startsWith('data: ')) {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(line.slice(6));
|
||||||
|
if (parsed.id === 2) {
|
||||||
|
clearTimeout(readTimeout);
|
||||||
|
reader.cancel();
|
||||||
|
if (parsed.error) {
|
||||||
|
return { healthy: false, latencyMs, message: parsed.error.message ?? 'Tool call error' };
|
||||||
|
}
|
||||||
|
return { healthy: true, latencyMs, message: 'ok' };
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Not valid JSON, skip
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const respLines = responseBuffer.split('\n');
|
||||||
|
responseBuffer = respLines[respLines.length - 1] ?? '';
|
||||||
|
}
|
||||||
|
|
||||||
|
clearTimeout(readTimeout);
|
||||||
|
reader.cancel();
|
||||||
|
|
||||||
|
// If POST response itself was ok (202 for SSE), consider it healthy
|
||||||
|
if (toolResp.ok) {
|
||||||
|
return { healthy: true, latencyMs, message: 'ok' };
|
||||||
|
}
|
||||||
|
|
||||||
|
return { healthy: false, latencyMs, message: `Tool call HTTP ${toolResp.status}` };
|
||||||
|
} finally {
|
||||||
|
clearTimeout(timer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Probe a STDIO MCP server by running `docker exec` with a disposable Node.js
|
* Probe a STDIO MCP server by running `docker exec` with a disposable Node.js
|
||||||
* script that pipes JSON-RPC messages into the package binary.
|
* script that pipes JSON-RPC messages into the package binary.
|
||||||
|
|||||||
Reference in New Issue
Block a user