feat: add external MCP server support with streamable-http proxy

Support non-containerized MCP servers via externalUrl field and add
streamable-http session management for HA MCP proof of concept.

- Add externalUrl, command, containerPort fields to McpServer schema
- Skip Docker orchestration for external servers (virtual instances)
- Implement streamable-http proxy with Mcp-Session-Id session management
- Parse SSE-framed responses from streamable-http endpoints
- Add command passthrough to Docker container creation
- Create HA MCP example manifest (examples/ha-mcp.yaml)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Michal
2026-02-22 12:21:25 +00:00
parent 46e07e4515
commit 5d13a0c562
12 changed files with 417 additions and 8 deletions

View File

@@ -69,7 +69,7 @@ async function main(): Promise<void> {
const backupService = new BackupService(serverRepo, profileRepo, projectRepo);
const restoreService = new RestoreService(serverRepo, profileRepo, projectRepo);
const authService = new AuthService(prisma);
const mcpProxyService = new McpProxyService(instanceRepo);
const mcpProxyService = new McpProxyService(instanceRepo, serverRepo);
// Server
const app = await createServer(config, {

View File

@@ -1,4 +1,4 @@
import type { PrismaClient, McpServer } from '@prisma/client';
import { type PrismaClient, type McpServer, Prisma } from '@prisma/client';
import type { IMcpServerRepository } from './interfaces.js';
import type { CreateMcpServerInput, UpdateMcpServerInput } from '../validation/mcp-server.schema.js';
@@ -26,6 +26,9 @@ export class McpServerRepository implements IMcpServerRepository {
dockerImage: data.dockerImage ?? null,
transport: data.transport,
repositoryUrl: data.repositoryUrl ?? null,
externalUrl: data.externalUrl ?? null,
command: data.command ?? Prisma.DbNull,
containerPort: data.containerPort ?? null,
envTemplate: data.envTemplate,
},
});
@@ -38,6 +41,9 @@ export class McpServerRepository implements IMcpServerRepository {
if (data.dockerImage !== undefined) updateData['dockerImage'] = data.dockerImage;
if (data.transport !== undefined) updateData['transport'] = data.transport;
if (data.repositoryUrl !== undefined) updateData['repositoryUrl'] = data.repositoryUrl;
if (data.externalUrl !== undefined) updateData['externalUrl'] = data.externalUrl;
if (data.command !== undefined) updateData['command'] = data.command;
if (data.containerPort !== undefined) updateData['containerPort'] = data.containerPort;
if (data.envTemplate !== undefined) updateData['envTemplate'] = data.envTemplate;
return this.prisma.mcpServer.update({ where: { id }, data: updateData });

View File

@@ -74,7 +74,7 @@ export class DockerContainerManager implements McpOrchestrator {
? Object.entries(spec.env).map(([k, v]) => `${k}=${v}`)
: undefined;
const container = await this.docker.createContainer({
const createOpts: Docker.ContainerCreateOptions = {
Image: spec.image,
name: spec.name,
Env: envArr,
@@ -86,7 +86,12 @@ export class DockerContainerManager implements McpOrchestrator {
NanoCpus: nanoCpus,
NetworkMode: spec.network ?? 'bridge',
},
});
};
if (spec.command) {
createOpts.Cmd = spec.command;
}
const container = await this.docker.createContainer(createOpts);
await container.start();

View File

@@ -32,6 +32,15 @@ export class InstanceService {
const server = await this.serverRepo.findById(serverId);
if (!server) throw new NotFoundError(`McpServer '${serverId}' not found`);
// External servers don't need container management
if (server.externalUrl) {
return this.instanceRepo.create({
serverId,
status: 'RUNNING',
metadata: { external: true, url: server.externalUrl },
});
}
const image = server.dockerImage ?? server.packageName ?? server.name;
// Create DB record first in STARTING state
@@ -51,7 +60,11 @@ export class InstanceService {
},
};
if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') {
spec.containerPort = 3000;
spec.containerPort = server.containerPort ?? 3000;
}
const command = server.command as string[] | null;
if (command) {
spec.command = command;
}
if (opts?.env) {
spec.env = opts.env;

View File

@@ -1,5 +1,5 @@
import type { McpInstance } from '@prisma/client';
import type { IMcpInstanceRepository } from '../repositories/interfaces.js';
import type { IMcpInstanceRepository, IMcpServerRepository } from '../repositories/interfaces.js';
import { NotFoundError } from './mcp-server.service.js';
import { InvalidStateError } from './instance.service.js';
@@ -16,11 +16,39 @@ export interface McpProxyResponse {
error?: { code: number; message: string; data?: unknown };
}
/**
* Parses a streamable-http SSE response body to extract the JSON-RPC payload.
* Streamable-http returns `event: message\ndata: {...}\n\n` format.
*/
function parseStreamableResponse(body: string): McpProxyResponse {
for (const line of body.split('\n')) {
const trimmed = line.trim();
if (trimmed.startsWith('data: ')) {
return JSON.parse(trimmed.slice(6)) as McpProxyResponse;
}
}
// If body is plain JSON (no SSE framing), parse directly
return JSON.parse(body) as McpProxyResponse;
}
export class McpProxyService {
constructor(private readonly instanceRepo: IMcpInstanceRepository) {}
/** Session IDs per server for streamable-http protocol */
private sessions = new Map<string, string>();
constructor(
private readonly instanceRepo: IMcpInstanceRepository,
private readonly serverRepo: IMcpServerRepository,
) {}
async execute(request: McpProxyRequest): Promise<McpProxyResponse> {
// Find a running instance for this server
const server = await this.serverRepo.findById(request.serverId);
// External server: proxy directly to externalUrl
if (server?.externalUrl) {
return this.sendToExternal(server.id, server.externalUrl, request.method, request.params);
}
// Managed server: find running instance
const instances = await this.instanceRepo.findAll(request.serverId);
const running = instances.find((i) => i.status === 'RUNNING');
@@ -37,6 +65,116 @@ export class McpProxyService {
return this.sendJsonRpc(running, request.method, request.params);
}
/**
* Send a JSON-RPC request to an external MCP server.
* Handles streamable-http protocol (session management + SSE response parsing).
*/
private async sendToExternal(
serverId: string,
url: string,
method: string,
params?: Record<string, unknown>,
): Promise<McpProxyResponse> {
// Ensure we have a session (initialize on first call)
if (!this.sessions.has(serverId)) {
await this.initSession(serverId, url);
}
const sessionId = this.sessions.get(serverId);
const body: Record<string, unknown> = {
jsonrpc: '2.0',
id: 1,
method,
};
if (params !== undefined) {
body.params = params;
}
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream',
};
if (sessionId) {
headers['Mcp-Session-Id'] = sessionId;
}
const response = await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(body),
});
if (!response.ok) {
// Session expired? Clear and retry once
if (response.status === 400 || response.status === 404) {
this.sessions.delete(serverId);
return this.sendToExternal(serverId, url, method, params);
}
return {
jsonrpc: '2.0',
id: 1,
error: {
code: -32000,
message: `External MCP server returned HTTP ${response.status}: ${response.statusText}`,
},
};
}
const text = await response.text();
return parseStreamableResponse(text);
}
/**
* Initialize a streamable-http session with an external server.
* Sends `initialize` and `notifications/initialized`, caches the session ID.
*/
private async initSession(serverId: string, url: string): Promise<void> {
const initBody = {
jsonrpc: '2.0',
id: 1,
method: 'initialize',
params: {
protocolVersion: '2025-03-26',
capabilities: {},
clientInfo: { name: 'mcpctl', version: '0.1.0' },
},
};
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream',
},
body: JSON.stringify(initBody),
});
if (!response.ok) {
throw new Error(`Failed to initialize session: HTTP ${response.status}`);
}
const sessionId = response.headers.get('mcp-session-id');
if (sessionId) {
this.sessions.set(serverId, sessionId);
}
// Send notifications/initialized
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream',
};
if (sessionId) {
headers['Mcp-Session-Id'] = sessionId;
}
await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }),
});
}
private async sendJsonRpc(
instance: McpInstance,
method: string,

View File

@@ -7,6 +7,8 @@ export interface ContainerSpec {
image: string;
/** Human-readable name (used as container name prefix) */
name: string;
/** Custom command to run (overrides image CMD) */
command?: string[];
/** Environment variables */
env?: Record<string, string>;
/** Host port to bind (null = auto-assign) */

View File

@@ -14,6 +14,9 @@ export const CreateMcpServerSchema = z.object({
dockerImage: z.string().max(200).optional(),
transport: z.enum(['STDIO', 'SSE', 'STREAMABLE_HTTP']).default('STDIO'),
repositoryUrl: z.string().url().optional(),
externalUrl: z.string().url().optional(),
command: z.array(z.string()).optional(),
containerPort: z.number().int().min(1).max(65535).optional(),
envTemplate: z.array(EnvTemplateEntrySchema).default([]),
});
@@ -23,6 +26,9 @@ export const UpdateMcpServerSchema = z.object({
dockerImage: z.string().max(200).nullable().optional(),
transport: z.enum(['STDIO', 'SSE', 'STREAMABLE_HTTP']).optional(),
repositoryUrl: z.string().url().nullable().optional(),
externalUrl: z.string().url().nullable().optional(),
command: z.array(z.string()).nullable().optional(),
containerPort: z.number().int().min(1).max(65535).nullable().optional(),
envTemplate: z.array(EnvTemplateEntrySchema).optional(),
});