diff --git a/src/mcpd/src/services/health-probe.service.ts b/src/mcpd/src/services/health-probe.service.ts index 7ede791..ae68331 100644 --- a/src/mcpd/src/services/health-probe.service.ts +++ b/src/mcpd/src/services/health-probe.service.ts @@ -185,24 +185,35 @@ export class HealthProbeRunner { const containerInfo = await this.orchestrator.inspectContainer(instance.containerId); const containerPort = (server.containerPort as number | null) ?? 3000; - let url: string; + let baseUrl: string; if (containerInfo.ip) { - url = `http://${containerInfo.ip}:${containerPort}`; + baseUrl = `http://${containerInfo.ip}:${containerPort}`; } else if (instance.port) { - url = `http://localhost:${instance.port}`; + baseUrl = `http://localhost:${instance.port}`; } else { return { healthy: false, latencyMs: 0, message: 'No container IP or port' }; } - const start = Date.now(); + if (server.transport === 'SSE') { + return this.probeSse(baseUrl, healthCheck, timeoutMs); + } + return this.probeStreamableHttp(baseUrl, healthCheck, timeoutMs); + } - // For HTTP servers, we need to initialize a session first, then call the tool + /** + * Probe a streamable-http MCP server (POST to root endpoint). + */ + private async probeStreamableHttp( + baseUrl: string, + healthCheck: HealthCheckSpec, + timeoutMs: number, + ): Promise { + const start = Date.now(); const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), timeoutMs); try { - // Initialize - const initResp = await fetch(url, { + const initResp = await fetch(baseUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' }, body: JSON.stringify({ @@ -220,15 +231,13 @@ export class HealthProbeRunner { const headers: Record = { 'Content-Type': 'application/json', 'Accept': 'application/json, text/event-stream' }; if (sessionId) headers['Mcp-Session-Id'] = sessionId; - // Send initialized notification - await fetch(url, { + await fetch(baseUrl, { method: 'POST', headers, body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }), signal: controller.signal, }); - // Call health check tool - const toolResp = await fetch(url, { + const toolResp = await fetch(baseUrl, { method: 'POST', headers, body: JSON.stringify({ jsonrpc: '2.0', id: 2, method: 'tools/call', @@ -244,7 +253,6 @@ export class HealthProbeRunner { } const body = await toolResp.text(); - // Check for JSON-RPC error in response try { const parsed = JSON.parse(body.includes('data: ') ? body.split('data: ')[1]!.split('\n')[0]! : body); if (parsed.error) { @@ -260,6 +268,146 @@ export class HealthProbeRunner { } } + /** + * Probe an SSE-transport MCP server. + * SSE protocol: GET /sse → endpoint event → POST /messages?session_id=... + */ + private async probeSse( + baseUrl: string, + healthCheck: HealthCheckSpec, + timeoutMs: number, + ): Promise { + const start = Date.now(); + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + + try { + // 1. Connect to SSE endpoint to get the message URL + const sseResp = await fetch(`${baseUrl}/sse`, { + method: 'GET', + headers: { 'Accept': 'text/event-stream' }, + signal: controller.signal, + }); + + if (!sseResp.ok) { + return { healthy: false, latencyMs: Date.now() - start, message: `SSE connect HTTP ${sseResp.status}` }; + } + + // 2. Read the SSE stream to find the endpoint event + const reader = sseResp.body?.getReader(); + if (!reader) { + return { healthy: false, latencyMs: Date.now() - start, message: 'No SSE stream body' }; + } + + const decoder = new TextDecoder(); + let buffer = ''; + let messagesUrl = ''; + + // Read until we get the endpoint event + while (!messagesUrl) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + + for (const line of buffer.split('\n')) { + if (line.startsWith('data: ') && buffer.includes('event: endpoint')) { + const endpoint = line.slice(6).trim(); + // Endpoint may be relative (e.g., /messages?session_id=...) or absolute + messagesUrl = endpoint.startsWith('http') ? endpoint : `${baseUrl}${endpoint}`; + } + } + // Keep only the last incomplete line + const lines = buffer.split('\n'); + buffer = lines[lines.length - 1] ?? ''; + } + + if (!messagesUrl) { + reader.cancel(); + return { healthy: false, latencyMs: Date.now() - start, message: 'No endpoint event from SSE' }; + } + + // 3. Initialize via the messages endpoint + const postHeaders = { 'Content-Type': 'application/json' }; + + const initResp = await fetch(messagesUrl, { + method: 'POST', headers: postHeaders, + body: JSON.stringify({ + jsonrpc: '2.0', id: 1, method: 'initialize', + params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'mcpctl-health', version: '0.1.0' } }, + }), + signal: controller.signal, + }); + + if (!initResp.ok) { + reader.cancel(); + return { healthy: false, latencyMs: Date.now() - start, message: `Initialize HTTP ${initResp.status}` }; + } + + // 4. Send initialized notification + await fetch(messagesUrl, { + method: 'POST', headers: postHeaders, + body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }), + signal: controller.signal, + }); + + // 5. Call health check tool + const toolResp = await fetch(messagesUrl, { + method: 'POST', headers: postHeaders, + body: JSON.stringify({ + jsonrpc: '2.0', id: 2, method: 'tools/call', + params: { name: healthCheck.tool, arguments: healthCheck.arguments ?? {} }, + }), + signal: controller.signal, + }); + + const latencyMs = Date.now() - start; + + // 6. Read tool response from SSE stream + // The response comes back on the SSE stream, not the POST response + let responseBuffer = ''; + const readTimeout = setTimeout(() => reader.cancel(), 5000); + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + responseBuffer += decoder.decode(value, { stream: true }); + + // Look for data lines containing our response (id: 2) + for (const line of responseBuffer.split('\n')) { + if (line.startsWith('data: ')) { + try { + const parsed = JSON.parse(line.slice(6)); + if (parsed.id === 2) { + clearTimeout(readTimeout); + reader.cancel(); + if (parsed.error) { + return { healthy: false, latencyMs, message: parsed.error.message ?? 'Tool call error' }; + } + return { healthy: true, latencyMs, message: 'ok' }; + } + } catch { + // Not valid JSON, skip + } + } + } + const respLines = responseBuffer.split('\n'); + responseBuffer = respLines[respLines.length - 1] ?? ''; + } + + clearTimeout(readTimeout); + reader.cancel(); + + // If POST response itself was ok (202 for SSE), consider it healthy + if (toolResp.ok) { + return { healthy: true, latencyMs, message: 'ok' }; + } + + return { healthy: false, latencyMs, message: `Tool call HTTP ${toolResp.status}` }; + } finally { + clearTimeout(timer); + } + } + /** * Probe a STDIO MCP server by running `docker exec` with a disposable Node.js * script that pipes JSON-RPC messages into the package binary.