diff --git a/examples/ha-mcp.yaml b/examples/ha-mcp.yaml new file mode 100644 index 0000000..c4a03ad --- /dev/null +++ b/examples/ha-mcp.yaml @@ -0,0 +1,26 @@ +servers: + - name: ha-mcp + description: "Home Assistant MCP - smart home control via MCP" + dockerImage: "ghcr.io/homeassistant-ai/ha-mcp:2.4" + transport: STREAMABLE_HTTP + containerPort: 3000 + # For mcpd-managed containers: + command: + - python + - "-c" + - "from ha_mcp.server import HomeAssistantSmartMCPServer; s = HomeAssistantSmartMCPServer(); s.mcp.run(transport='sse', host='0.0.0.0', port=3000)" + # For connecting to an already-running instance (host.containers.internal for container-to-host): + externalUrl: "http://host.containers.internal:8086/mcp" + envTemplate: + - name: HOMEASSISTANT_URL + description: "Home Assistant instance URL (e.g. https://ha.example.com)" + - name: HOMEASSISTANT_TOKEN + description: "Home Assistant long-lived access token" + isSecret: true + +profiles: + - name: production + server: ha-mcp + envOverrides: + HOMEASSISTANT_URL: "https://ha.itaz.eu" + HOMEASSISTANT_TOKEN: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiIyNjFlZTRhOWI2MGM0YTllOGJkNTIxN2Q3YmVmZDkzNSIsImlhdCI6MTc3MDA3NjYzOCwiZXhwIjoyMDg1NDM2NjM4fQ.17mAQxIrCBrQx3ogqAUetwEt-cngRmJiH-e7sLt-3FY" diff --git a/src/cli/src/commands/apply.ts b/src/cli/src/commands/apply.ts index 57fb1b4..8a7007f 100644 --- a/src/cli/src/commands/apply.ts +++ b/src/cli/src/commands/apply.ts @@ -11,6 +11,9 @@ const ServerSpecSchema = z.object({ dockerImage: z.string().optional(), transport: z.enum(['STDIO', 'SSE', 'STREAMABLE_HTTP']).default('STDIO'), repositoryUrl: z.string().url().optional(), + externalUrl: z.string().url().optional(), + command: z.array(z.string()).optional(), + containerPort: z.number().int().min(1).max(65535).optional(), envTemplate: z.array(z.object({ name: z.string(), description: z.string().default(''), diff --git a/src/db/prisma/migrations/20260222121228_add_external_url_command_port/migration.sql b/src/db/prisma/migrations/20260222121228_add_external_url_command_port/migration.sql new file mode 100644 index 0000000..33de3ce --- /dev/null +++ b/src/db/prisma/migrations/20260222121228_add_external_url_command_port/migration.sql @@ -0,0 +1,204 @@ +-- CreateEnum +CREATE TYPE "Role" AS ENUM ('USER', 'ADMIN'); + +-- CreateEnum +CREATE TYPE "Transport" AS ENUM ('STDIO', 'SSE', 'STREAMABLE_HTTP'); + +-- CreateEnum +CREATE TYPE "InstanceStatus" AS ENUM ('STARTING', 'RUNNING', 'STOPPING', 'STOPPED', 'ERROR'); + +-- CreateTable +CREATE TABLE "User" ( + "id" TEXT NOT NULL, + "email" TEXT NOT NULL, + "name" TEXT, + "passwordHash" TEXT NOT NULL, + "role" "Role" NOT NULL DEFAULT 'USER', + "version" INTEGER NOT NULL DEFAULT 1, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "User_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "Session" ( + "id" TEXT NOT NULL, + "token" TEXT NOT NULL, + "userId" TEXT NOT NULL, + "expiresAt" TIMESTAMP(3) NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "Session_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "McpServer" ( + "id" TEXT NOT NULL, + "name" TEXT NOT NULL, + "description" TEXT NOT NULL DEFAULT '', + "packageName" TEXT, + "dockerImage" TEXT, + "transport" "Transport" NOT NULL DEFAULT 'STDIO', + "repositoryUrl" TEXT, + "externalUrl" TEXT, + "command" JSONB, + "containerPort" INTEGER, + "envTemplate" JSONB NOT NULL DEFAULT '[]', + "version" INTEGER NOT NULL DEFAULT 1, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "McpServer_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "McpProfile" ( + "id" TEXT NOT NULL, + "name" TEXT NOT NULL, + "serverId" TEXT NOT NULL, + "permissions" JSONB NOT NULL DEFAULT '[]', + "envOverrides" JSONB NOT NULL DEFAULT '{}', + "version" INTEGER NOT NULL DEFAULT 1, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "McpProfile_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "Project" ( + "id" TEXT NOT NULL, + "name" TEXT NOT NULL, + "description" TEXT NOT NULL DEFAULT '', + "ownerId" TEXT NOT NULL, + "version" INTEGER NOT NULL DEFAULT 1, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "Project_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "ProjectMcpProfile" ( + "id" TEXT NOT NULL, + "projectId" TEXT NOT NULL, + "profileId" TEXT NOT NULL, + + CONSTRAINT "ProjectMcpProfile_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "McpInstance" ( + "id" TEXT NOT NULL, + "serverId" TEXT NOT NULL, + "containerId" TEXT, + "status" "InstanceStatus" NOT NULL DEFAULT 'STOPPED', + "port" INTEGER, + "metadata" JSONB NOT NULL DEFAULT '{}', + "version" INTEGER NOT NULL DEFAULT 1, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "McpInstance_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "AuditLog" ( + "id" TEXT NOT NULL, + "userId" TEXT NOT NULL, + "action" TEXT NOT NULL, + "resource" TEXT NOT NULL, + "resourceId" TEXT, + "details" JSONB NOT NULL DEFAULT '{}', + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "AuditLog_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "User_email_key" ON "User"("email"); + +-- CreateIndex +CREATE INDEX "User_email_idx" ON "User"("email"); + +-- CreateIndex +CREATE UNIQUE INDEX "Session_token_key" ON "Session"("token"); + +-- CreateIndex +CREATE INDEX "Session_token_idx" ON "Session"("token"); + +-- CreateIndex +CREATE INDEX "Session_userId_idx" ON "Session"("userId"); + +-- CreateIndex +CREATE INDEX "Session_expiresAt_idx" ON "Session"("expiresAt"); + +-- CreateIndex +CREATE UNIQUE INDEX "McpServer_name_key" ON "McpServer"("name"); + +-- CreateIndex +CREATE INDEX "McpServer_name_idx" ON "McpServer"("name"); + +-- CreateIndex +CREATE INDEX "McpProfile_serverId_idx" ON "McpProfile"("serverId"); + +-- CreateIndex +CREATE UNIQUE INDEX "McpProfile_name_serverId_key" ON "McpProfile"("name", "serverId"); + +-- CreateIndex +CREATE UNIQUE INDEX "Project_name_key" ON "Project"("name"); + +-- CreateIndex +CREATE INDEX "Project_name_idx" ON "Project"("name"); + +-- CreateIndex +CREATE INDEX "Project_ownerId_idx" ON "Project"("ownerId"); + +-- CreateIndex +CREATE INDEX "ProjectMcpProfile_projectId_idx" ON "ProjectMcpProfile"("projectId"); + +-- CreateIndex +CREATE INDEX "ProjectMcpProfile_profileId_idx" ON "ProjectMcpProfile"("profileId"); + +-- CreateIndex +CREATE UNIQUE INDEX "ProjectMcpProfile_projectId_profileId_key" ON "ProjectMcpProfile"("projectId", "profileId"); + +-- CreateIndex +CREATE INDEX "McpInstance_serverId_idx" ON "McpInstance"("serverId"); + +-- CreateIndex +CREATE INDEX "McpInstance_status_idx" ON "McpInstance"("status"); + +-- CreateIndex +CREATE INDEX "AuditLog_userId_idx" ON "AuditLog"("userId"); + +-- CreateIndex +CREATE INDEX "AuditLog_action_idx" ON "AuditLog"("action"); + +-- CreateIndex +CREATE INDEX "AuditLog_resource_idx" ON "AuditLog"("resource"); + +-- CreateIndex +CREATE INDEX "AuditLog_createdAt_idx" ON "AuditLog"("createdAt"); + +-- AddForeignKey +ALTER TABLE "Session" ADD CONSTRAINT "Session_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "McpProfile" ADD CONSTRAINT "McpProfile_serverId_fkey" FOREIGN KEY ("serverId") REFERENCES "McpServer"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "Project" ADD CONSTRAINT "Project_ownerId_fkey" FOREIGN KEY ("ownerId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "ProjectMcpProfile" ADD CONSTRAINT "ProjectMcpProfile_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "Project"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "ProjectMcpProfile" ADD CONSTRAINT "ProjectMcpProfile_profileId_fkey" FOREIGN KEY ("profileId") REFERENCES "McpProfile"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "McpInstance" ADD CONSTRAINT "McpInstance_serverId_fkey" FOREIGN KEY ("serverId") REFERENCES "McpServer"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "AuditLog" ADD CONSTRAINT "AuditLog_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/src/db/prisma/migrations/migration_lock.toml b/src/db/prisma/migrations/migration_lock.toml new file mode 100644 index 0000000..044d57c --- /dev/null +++ b/src/db/prisma/migrations/migration_lock.toml @@ -0,0 +1,3 @@ +# Please do not edit this file manually +# It should be added in your version-control system (e.g., Git) +provider = "postgresql" diff --git a/src/db/prisma/schema.prisma b/src/db/prisma/schema.prisma index 1c79b6a..e5f5657 100644 --- a/src/db/prisma/schema.prisma +++ b/src/db/prisma/schema.prisma @@ -57,6 +57,9 @@ model McpServer { dockerImage String? transport Transport @default(STDIO) repositoryUrl String? + externalUrl String? + command Json? + containerPort Int? envTemplate Json @default("[]") version Int @default(1) createdAt DateTime @default(now()) diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index 8ec1eee..5fc53a7 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -69,7 +69,7 @@ async function main(): Promise { const backupService = new BackupService(serverRepo, profileRepo, projectRepo); const restoreService = new RestoreService(serverRepo, profileRepo, projectRepo); const authService = new AuthService(prisma); - const mcpProxyService = new McpProxyService(instanceRepo); + const mcpProxyService = new McpProxyService(instanceRepo, serverRepo); // Server const app = await createServer(config, { diff --git a/src/mcpd/src/repositories/mcp-server.repository.ts b/src/mcpd/src/repositories/mcp-server.repository.ts index 92a031a..893e0a0 100644 --- a/src/mcpd/src/repositories/mcp-server.repository.ts +++ b/src/mcpd/src/repositories/mcp-server.repository.ts @@ -1,4 +1,4 @@ -import type { PrismaClient, McpServer } from '@prisma/client'; +import { type PrismaClient, type McpServer, Prisma } from '@prisma/client'; import type { IMcpServerRepository } from './interfaces.js'; import type { CreateMcpServerInput, UpdateMcpServerInput } from '../validation/mcp-server.schema.js'; @@ -26,6 +26,9 @@ export class McpServerRepository implements IMcpServerRepository { dockerImage: data.dockerImage ?? null, transport: data.transport, repositoryUrl: data.repositoryUrl ?? null, + externalUrl: data.externalUrl ?? null, + command: data.command ?? Prisma.DbNull, + containerPort: data.containerPort ?? null, envTemplate: data.envTemplate, }, }); @@ -38,6 +41,9 @@ export class McpServerRepository implements IMcpServerRepository { if (data.dockerImage !== undefined) updateData['dockerImage'] = data.dockerImage; if (data.transport !== undefined) updateData['transport'] = data.transport; if (data.repositoryUrl !== undefined) updateData['repositoryUrl'] = data.repositoryUrl; + if (data.externalUrl !== undefined) updateData['externalUrl'] = data.externalUrl; + if (data.command !== undefined) updateData['command'] = data.command; + if (data.containerPort !== undefined) updateData['containerPort'] = data.containerPort; if (data.envTemplate !== undefined) updateData['envTemplate'] = data.envTemplate; return this.prisma.mcpServer.update({ where: { id }, data: updateData }); diff --git a/src/mcpd/src/services/docker/container-manager.ts b/src/mcpd/src/services/docker/container-manager.ts index 28443ab..76797e8 100644 --- a/src/mcpd/src/services/docker/container-manager.ts +++ b/src/mcpd/src/services/docker/container-manager.ts @@ -74,7 +74,7 @@ export class DockerContainerManager implements McpOrchestrator { ? Object.entries(spec.env).map(([k, v]) => `${k}=${v}`) : undefined; - const container = await this.docker.createContainer({ + const createOpts: Docker.ContainerCreateOptions = { Image: spec.image, name: spec.name, Env: envArr, @@ -86,7 +86,12 @@ export class DockerContainerManager implements McpOrchestrator { NanoCpus: nanoCpus, NetworkMode: spec.network ?? 'bridge', }, - }); + }; + if (spec.command) { + createOpts.Cmd = spec.command; + } + + const container = await this.docker.createContainer(createOpts); await container.start(); diff --git a/src/mcpd/src/services/instance.service.ts b/src/mcpd/src/services/instance.service.ts index 3eb25b4..f156b4b 100644 --- a/src/mcpd/src/services/instance.service.ts +++ b/src/mcpd/src/services/instance.service.ts @@ -32,6 +32,15 @@ export class InstanceService { const server = await this.serverRepo.findById(serverId); if (!server) throw new NotFoundError(`McpServer '${serverId}' not found`); + // External servers don't need container management + if (server.externalUrl) { + return this.instanceRepo.create({ + serverId, + status: 'RUNNING', + metadata: { external: true, url: server.externalUrl }, + }); + } + const image = server.dockerImage ?? server.packageName ?? server.name; // Create DB record first in STARTING state @@ -51,7 +60,11 @@ export class InstanceService { }, }; if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') { - spec.containerPort = 3000; + spec.containerPort = server.containerPort ?? 3000; + } + const command = server.command as string[] | null; + if (command) { + spec.command = command; } if (opts?.env) { spec.env = opts.env; diff --git a/src/mcpd/src/services/mcp-proxy-service.ts b/src/mcpd/src/services/mcp-proxy-service.ts index a3d93af..dfee93d 100644 --- a/src/mcpd/src/services/mcp-proxy-service.ts +++ b/src/mcpd/src/services/mcp-proxy-service.ts @@ -1,5 +1,5 @@ import type { McpInstance } from '@prisma/client'; -import type { IMcpInstanceRepository } from '../repositories/interfaces.js'; +import type { IMcpInstanceRepository, IMcpServerRepository } from '../repositories/interfaces.js'; import { NotFoundError } from './mcp-server.service.js'; import { InvalidStateError } from './instance.service.js'; @@ -16,11 +16,39 @@ export interface McpProxyResponse { error?: { code: number; message: string; data?: unknown }; } +/** + * Parses a streamable-http SSE response body to extract the JSON-RPC payload. + * Streamable-http returns `event: message\ndata: {...}\n\n` format. + */ +function parseStreamableResponse(body: string): McpProxyResponse { + for (const line of body.split('\n')) { + const trimmed = line.trim(); + if (trimmed.startsWith('data: ')) { + return JSON.parse(trimmed.slice(6)) as McpProxyResponse; + } + } + // If body is plain JSON (no SSE framing), parse directly + return JSON.parse(body) as McpProxyResponse; +} + export class McpProxyService { - constructor(private readonly instanceRepo: IMcpInstanceRepository) {} + /** Session IDs per server for streamable-http protocol */ + private sessions = new Map(); + + constructor( + private readonly instanceRepo: IMcpInstanceRepository, + private readonly serverRepo: IMcpServerRepository, + ) {} async execute(request: McpProxyRequest): Promise { - // Find a running instance for this server + const server = await this.serverRepo.findById(request.serverId); + + // External server: proxy directly to externalUrl + if (server?.externalUrl) { + return this.sendToExternal(server.id, server.externalUrl, request.method, request.params); + } + + // Managed server: find running instance const instances = await this.instanceRepo.findAll(request.serverId); const running = instances.find((i) => i.status === 'RUNNING'); @@ -37,6 +65,116 @@ export class McpProxyService { return this.sendJsonRpc(running, request.method, request.params); } + /** + * Send a JSON-RPC request to an external MCP server. + * Handles streamable-http protocol (session management + SSE response parsing). + */ + private async sendToExternal( + serverId: string, + url: string, + method: string, + params?: Record, + ): Promise { + // Ensure we have a session (initialize on first call) + if (!this.sessions.has(serverId)) { + await this.initSession(serverId, url); + } + + const sessionId = this.sessions.get(serverId); + + const body: Record = { + jsonrpc: '2.0', + id: 1, + method, + }; + if (params !== undefined) { + body.params = params; + } + + const headers: Record = { + 'Content-Type': 'application/json', + 'Accept': 'application/json, text/event-stream', + }; + if (sessionId) { + headers['Mcp-Session-Id'] = sessionId; + } + + const response = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + + if (!response.ok) { + // Session expired? Clear and retry once + if (response.status === 400 || response.status === 404) { + this.sessions.delete(serverId); + return this.sendToExternal(serverId, url, method, params); + } + return { + jsonrpc: '2.0', + id: 1, + error: { + code: -32000, + message: `External MCP server returned HTTP ${response.status}: ${response.statusText}`, + }, + }; + } + + const text = await response.text(); + return parseStreamableResponse(text); + } + + /** + * Initialize a streamable-http session with an external server. + * Sends `initialize` and `notifications/initialized`, caches the session ID. + */ + private async initSession(serverId: string, url: string): Promise { + const initBody = { + jsonrpc: '2.0', + id: 1, + method: 'initialize', + params: { + protocolVersion: '2025-03-26', + capabilities: {}, + clientInfo: { name: 'mcpctl', version: '0.1.0' }, + }, + }; + + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json, text/event-stream', + }, + body: JSON.stringify(initBody), + }); + + if (!response.ok) { + throw new Error(`Failed to initialize session: HTTP ${response.status}`); + } + + const sessionId = response.headers.get('mcp-session-id'); + if (sessionId) { + this.sessions.set(serverId, sessionId); + } + + // Send notifications/initialized + const headers: Record = { + 'Content-Type': 'application/json', + 'Accept': 'application/json, text/event-stream', + }; + if (sessionId) { + headers['Mcp-Session-Id'] = sessionId; + } + + await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify({ jsonrpc: '2.0', method: 'notifications/initialized' }), + }); + } + private async sendJsonRpc( instance: McpInstance, method: string, diff --git a/src/mcpd/src/services/orchestrator.ts b/src/mcpd/src/services/orchestrator.ts index ef906a8..66147e6 100644 --- a/src/mcpd/src/services/orchestrator.ts +++ b/src/mcpd/src/services/orchestrator.ts @@ -7,6 +7,8 @@ export interface ContainerSpec { image: string; /** Human-readable name (used as container name prefix) */ name: string; + /** Custom command to run (overrides image CMD) */ + command?: string[]; /** Environment variables */ env?: Record; /** Host port to bind (null = auto-assign) */ diff --git a/src/mcpd/src/validation/mcp-server.schema.ts b/src/mcpd/src/validation/mcp-server.schema.ts index 1a2e217..f6c87cc 100644 --- a/src/mcpd/src/validation/mcp-server.schema.ts +++ b/src/mcpd/src/validation/mcp-server.schema.ts @@ -14,6 +14,9 @@ export const CreateMcpServerSchema = z.object({ dockerImage: z.string().max(200).optional(), transport: z.enum(['STDIO', 'SSE', 'STREAMABLE_HTTP']).default('STDIO'), repositoryUrl: z.string().url().optional(), + externalUrl: z.string().url().optional(), + command: z.array(z.string()).optional(), + containerPort: z.number().int().min(1).max(65535).optional(), envTemplate: z.array(EnvTemplateEntrySchema).default([]), }); @@ -23,6 +26,9 @@ export const UpdateMcpServerSchema = z.object({ dockerImage: z.string().max(200).nullable().optional(), transport: z.enum(['STDIO', 'SSE', 'STREAMABLE_HTTP']).optional(), repositoryUrl: z.string().url().nullable().optional(), + externalUrl: z.string().url().nullable().optional(), + command: z.array(z.string()).nullable().optional(), + containerPort: z.number().int().min(1).max(65535).nullable().optional(), envTemplate: z.array(EnvTemplateEntrySchema).optional(), }); diff --git a/src/mcpd/tests/mcp-server-flow.test.ts b/src/mcpd/tests/mcp-server-flow.test.ts new file mode 100644 index 0000000..0ae6d11 --- /dev/null +++ b/src/mcpd/tests/mcp-server-flow.test.ts @@ -0,0 +1,753 @@ +import { describe, it, expect, vi, beforeAll, afterAll, beforeEach } from 'vitest'; +import Fastify from 'fastify'; +import type { FastifyInstance } from 'fastify'; +import http from 'node:http'; +import { McpServerService } from '../src/services/mcp-server.service.js'; +import { InstanceService } from '../src/services/instance.service.js'; +import { McpProxyService } from '../src/services/mcp-proxy-service.js'; +import { AuditLogService } from '../src/services/audit-log.service.js'; +import { errorHandler } from '../src/middleware/error-handler.js'; +import { registerMcpServerRoutes } from '../src/routes/mcp-servers.js'; +import { registerInstanceRoutes } from '../src/routes/instances.js'; +import { registerMcpProxyRoutes } from '../src/routes/mcp-proxy.js'; +import type { + IMcpServerRepository, + IMcpInstanceRepository, + IAuditLogRepository, +} from '../src/repositories/interfaces.js'; +import type { McpOrchestrator } from '../src/services/orchestrator.js'; +import type { McpServer, McpInstance, InstanceStatus } from '@prisma/client'; + +// --------------------------------------------------------------------------- +// In-memory repository implementations (stateful mocks) +// --------------------------------------------------------------------------- + +function createInMemoryServerRepo(): IMcpServerRepository { + const servers = new Map(); + let nextId = 1; + + return { + findAll: vi.fn(async () => [...servers.values()]), + findById: vi.fn(async (id: string) => servers.get(id) ?? null), + findByName: vi.fn(async (name: string) => [...servers.values()].find((s) => s.name === name) ?? null), + create: vi.fn(async (data) => { + const id = `srv-${nextId++}`; + const server = { + id, + name: data.name, + description: data.description ?? '', + packageName: data.packageName ?? null, + dockerImage: data.dockerImage ?? null, + transport: data.transport ?? 'STDIO', + repositoryUrl: data.repositoryUrl ?? null, + externalUrl: data.externalUrl ?? null, + command: data.command ?? null, + containerPort: data.containerPort ?? null, + envTemplate: data.envTemplate ?? [], + version: 1, + createdAt: new Date(), + updatedAt: new Date(), + } as McpServer; + servers.set(id, server); + return server; + }), + update: vi.fn(async (id: string, data) => { + const existing = servers.get(id); + if (!existing) throw new Error(`Server ${id} not found`); + const updated = { ...existing, ...data, updatedAt: new Date() } as McpServer; + servers.set(id, updated); + return updated; + }), + delete: vi.fn(async (id: string) => { + servers.delete(id); + }), + }; +} + +function createInMemoryInstanceRepo(): IMcpInstanceRepository { + const instances = new Map(); + let nextId = 1; + + return { + findAll: vi.fn(async (serverId?: string) => { + const all = [...instances.values()]; + return serverId ? all.filter((i) => i.serverId === serverId) : all; + }), + findById: vi.fn(async (id: string) => instances.get(id) ?? null), + findByContainerId: vi.fn(async (containerId: string) => + [...instances.values()].find((i) => i.containerId === containerId) ?? null, + ), + create: vi.fn(async (data) => { + const id = `inst-${nextId++}`; + const instance = { + id, + serverId: data.serverId, + containerId: data.containerId ?? null, + status: (data.status ?? 'STOPPED') as InstanceStatus, + port: data.port ?? null, + metadata: data.metadata ?? {}, + version: 1, + createdAt: new Date(), + updatedAt: new Date(), + } as McpInstance; + instances.set(id, instance); + return instance; + }), + updateStatus: vi.fn(async (id: string, status: InstanceStatus, fields?) => { + const existing = instances.get(id); + if (!existing) throw new Error(`Instance ${id} not found`); + const updated = { + ...existing, + status, + ...(fields?.containerId !== undefined ? { containerId: fields.containerId } : {}), + ...(fields?.port !== undefined ? { port: fields.port } : {}), + ...(fields?.metadata !== undefined ? { metadata: fields.metadata } : {}), + version: existing.version + 1, + updatedAt: new Date(), + } as McpInstance; + instances.set(id, updated); + return updated; + }), + delete: vi.fn(async (id: string) => { + instances.delete(id); + }), + }; +} + +function createInMemoryAuditLogRepo(): IAuditLogRepository { + const logs: Array<{ id: string; userId: string; action: string; resource: string; resourceId: string | null; details: Record; createdAt: Date }> = []; + let nextId = 1; + + return { + findAll: vi.fn(async () => logs as never[]), + findById: vi.fn(async (id: string) => (logs.find((l) => l.id === id) as never) ?? null), + create: vi.fn(async (data) => { + const log = { + id: `log-${nextId++}`, + userId: data.userId, + action: data.action, + resource: data.resource, + resourceId: data.resourceId ?? null, + details: data.details ?? {}, + createdAt: new Date(), + }; + logs.push(log); + return log as never; + }), + count: vi.fn(async () => logs.length), + deleteOlderThan: vi.fn(async () => 0), + }; +} + +function createMockOrchestrator(): McpOrchestrator { + let containerPort = 40000; + return { + ping: vi.fn(async () => true), + pullImage: vi.fn(async () => {}), + createContainer: vi.fn(async (spec) => ({ + containerId: `ctr-${spec.name}`, + name: spec.name, + state: 'running' as const, + port: spec.containerPort ?? ++containerPort, + createdAt: new Date(), + })), + stopContainer: vi.fn(async () => {}), + removeContainer: vi.fn(async () => {}), + inspectContainer: vi.fn(async (id) => ({ + containerId: id, + name: 'test', + state: 'running' as const, + createdAt: new Date(), + })), + getContainerLogs: vi.fn(async () => ({ stdout: '', stderr: '' })), + }; +} + +// --------------------------------------------------------------------------- +// Fake MCP server (streamable-http) +// --------------------------------------------------------------------------- + +function createFakeMcpServer(): { server: http.Server; getPort: () => number; requests: Array<{ method: string; body: unknown }> } { + const requests: Array<{ method: string; body: unknown }> = []; + let sessionCounter = 0; + + const server = http.createServer((req, res) => { + let body = ''; + req.on('data', (chunk) => (body += chunk)); + req.on('end', () => { + let parsed: { method?: string; id?: number; params?: unknown } = {}; + try { + parsed = JSON.parse(body); + } catch { + // notifications may not have id + } + + requests.push({ method: parsed.method ?? 'unknown', body: parsed }); + + if (parsed.method === 'initialize') { + const sessionId = `session-${++sessionCounter}`; + const response = { + jsonrpc: '2.0', + id: parsed.id, + result: { + protocolVersion: '2025-03-26', + capabilities: { tools: {} }, + serverInfo: { name: 'fake-mcp', version: '1.0.0' }, + }, + }; + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Mcp-Session-Id': sessionId, + }); + res.end(`event: message\ndata: ${JSON.stringify(response)}\n\n`); + return; + } + + if (parsed.method === 'notifications/initialized') { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(''); + return; + } + + if (parsed.method === 'tools/list') { + const response = { + jsonrpc: '2.0', + id: parsed.id, + result: { + tools: [ + { name: 'ha_get_overview', description: 'Get Home Assistant overview', inputSchema: { type: 'object', properties: {} } }, + { name: 'ha_search_entities', description: 'Search HA entities', inputSchema: { type: 'object', properties: { query: { type: 'string' } } } }, + ], + }, + }; + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.end(`event: message\ndata: ${JSON.stringify(response)}\n\n`); + return; + } + + if (parsed.method === 'tools/call') { + const toolName = (parsed.params as { name?: string })?.name; + const response = { + jsonrpc: '2.0', + id: parsed.id, + result: { + content: [{ type: 'text', text: `Result from ${toolName}` }], + }, + }; + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.end(`event: message\ndata: ${JSON.stringify(response)}\n\n`); + return; + } + + // Default: echo back + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ jsonrpc: '2.0', id: parsed.id, result: {} })); + }); + }); + + let port = 0; + return { + server, + getPort: () => port, + requests, + ...{ + listen: () => + new Promise((resolve) => { + server.listen(0, () => { + const addr = server.address(); + if (addr && typeof addr === 'object') port = addr.port; + resolve(); + }); + }), + close: () => new Promise((resolve) => server.close(() => resolve())), + }, + } as ReturnType & { listen: () => Promise; close: () => Promise }; +} + +// --------------------------------------------------------------------------- +// Test app builder +// --------------------------------------------------------------------------- + +async function buildTestApp(deps: { + serverRepo: IMcpServerRepository; + instanceRepo: IMcpInstanceRepository; + auditLogRepo: IAuditLogRepository; + orchestrator: McpOrchestrator; +}): Promise { + const app = Fastify({ logger: false }); + app.setErrorHandler(errorHandler); + + const serverService = new McpServerService(deps.serverRepo); + const instanceService = new InstanceService(deps.instanceRepo, deps.serverRepo, deps.orchestrator); + const proxyService = new McpProxyService(deps.instanceRepo, deps.serverRepo); + const auditLogService = new AuditLogService(deps.auditLogRepo); + + registerMcpServerRoutes(app, serverService); + registerInstanceRoutes(app, instanceService); + registerMcpProxyRoutes(app, { + mcpProxyService: proxyService, + auditLogService, + authDeps: { + findSession: async () => ({ userId: 'test-user', expiresAt: new Date(Date.now() + 3600_000) }), + }, + }); + + await app.ready(); + return app; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('MCP server full flow', () => { + let fakeMcp: ReturnType & { listen: () => Promise; close: () => Promise }; + let fakeMcpPort: number; + + beforeAll(async () => { + fakeMcp = createFakeMcpServer() as typeof fakeMcp; + await fakeMcp.listen(); + fakeMcpPort = fakeMcp.getPort(); + }); + + afterAll(async () => { + await fakeMcp.close(); + }); + + describe('external server flow (externalUrl)', () => { + let app: FastifyInstance; + let serverRepo: IMcpServerRepository; + let instanceRepo: IMcpInstanceRepository; + + beforeEach(async () => { + serverRepo = createInMemoryServerRepo(); + instanceRepo = createInMemoryInstanceRepo(); + app = await buildTestApp({ + serverRepo, + instanceRepo, + auditLogRepo: createInMemoryAuditLogRepo(), + orchestrator: createMockOrchestrator(), + }); + }); + + afterAll(async () => { + if (app) await app.close(); + }); + + it('registers server, starts virtual instance, and proxies tools/list', async () => { + // 1. Register external MCP server + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/servers', + payload: { + name: 'ha-mcp', + description: 'Home Assistant MCP', + transport: 'STREAMABLE_HTTP', + externalUrl: `http://localhost:${fakeMcpPort}`, + containerPort: 3000, + envTemplate: [ + { name: 'HOMEASSISTANT_TOKEN', description: 'HA token', isSecret: true }, + ], + }, + }); + + expect(createRes.statusCode).toBe(201); + const server = createRes.json<{ id: string; name: string; externalUrl: string }>(); + expect(server.name).toBe('ha-mcp'); + expect(server.externalUrl).toBe(`http://localhost:${fakeMcpPort}`); + + // 2. Verify server is listed + const listRes = await app.inject({ method: 'GET', url: '/api/v1/servers' }); + expect(listRes.statusCode).toBe(200); + const servers = listRes.json>(); + expect(servers).toHaveLength(1); + expect(servers[0]!.name).toBe('ha-mcp'); + + // 3. Start a virtual instance (external server — no Docker) + const startRes = await app.inject({ + method: 'POST', + url: '/api/v1/instances', + payload: { serverId: server.id }, + }); + + expect(startRes.statusCode).toBe(201); + const instance = startRes.json<{ id: string; status: string; containerId: string | null }>(); + expect(instance.status).toBe('RUNNING'); + expect(instance.containerId).toBeNull(); + + // 4. Proxy tools/list to the fake MCP server + const proxyRes = await app.inject({ + method: 'POST', + url: '/api/v1/mcp/proxy', + headers: { authorization: 'Bearer test-token' }, + payload: { + serverId: server.id, + method: 'tools/list', + }, + }); + + expect(proxyRes.statusCode).toBe(200); + const proxyBody = proxyRes.json<{ jsonrpc: string; result: { tools: Array<{ name: string }> } }>(); + expect(proxyBody.jsonrpc).toBe('2.0'); + expect(proxyBody.result.tools).toHaveLength(2); + expect(proxyBody.result.tools.map((t) => t.name)).toContain('ha_get_overview'); + expect(proxyBody.result.tools.map((t) => t.name)).toContain('ha_search_entities'); + + // 5. Verify the fake server received the protocol handshake + tools/list + const methods = fakeMcp.requests.map((r) => r.method); + expect(methods).toContain('initialize'); + expect(methods).toContain('notifications/initialized'); + expect(methods).toContain('tools/list'); + }); + + it('proxies tools/call with parameters', async () => { + // Register + start + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/servers', + payload: { + name: 'ha-mcp-call', + description: 'HA MCP for call test', + transport: 'STREAMABLE_HTTP', + externalUrl: `http://localhost:${fakeMcpPort}`, + }, + }); + const server = createRes.json<{ id: string }>(); + + await app.inject({ + method: 'POST', + url: '/api/v1/instances', + payload: { serverId: server.id }, + }); + + // Proxy tools/call + const proxyRes = await app.inject({ + method: 'POST', + url: '/api/v1/mcp/proxy', + headers: { authorization: 'Bearer test-token' }, + payload: { + serverId: server.id, + method: 'tools/call', + params: { name: 'ha_get_overview' }, + }, + }); + + expect(proxyRes.statusCode).toBe(200); + const body = proxyRes.json<{ result: { content: Array<{ text: string }> } }>(); + expect(body.result.content[0]!.text).toBe('Result from ha_get_overview'); + }); + }); + + describe('managed server flow (Docker)', () => { + let app: FastifyInstance; + let orchestrator: ReturnType; + + beforeEach(async () => { + orchestrator = createMockOrchestrator(); + app = await buildTestApp({ + serverRepo: createInMemoryServerRepo(), + instanceRepo: createInMemoryInstanceRepo(), + auditLogRepo: createInMemoryAuditLogRepo(), + orchestrator, + }); + }); + + afterAll(async () => { + if (app) await app.close(); + }); + + it('registers server with dockerImage, starts container, and creates instance', async () => { + // 1. Register managed server + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/servers', + payload: { + name: 'ha-mcp-docker', + description: 'HA MCP managed by Docker', + dockerImage: 'ghcr.io/homeassistant-ai/ha-mcp:2.4', + transport: 'STREAMABLE_HTTP', + containerPort: 3000, + command: ['python', '-c', 'print("hello")'], + envTemplate: [ + { name: 'HOMEASSISTANT_URL', description: 'HA URL' }, + { name: 'HOMEASSISTANT_TOKEN', description: 'HA token', isSecret: true }, + ], + }, + }); + + expect(createRes.statusCode).toBe(201); + const server = createRes.json<{ id: string; name: string; dockerImage: string; command: string[] }>(); + expect(server.name).toBe('ha-mcp-docker'); + expect(server.dockerImage).toBe('ghcr.io/homeassistant-ai/ha-mcp:2.4'); + expect(server.command).toEqual(['python', '-c', 'print("hello")']); + + // 2. Start container instance with env + const startRes = await app.inject({ + method: 'POST', + url: '/api/v1/instances', + payload: { + serverId: server.id, + env: { HOMEASSISTANT_URL: 'https://ha.example.com', HOMEASSISTANT_TOKEN: 'secret' }, + }, + }); + + expect(startRes.statusCode).toBe(201); + const instance = startRes.json<{ id: string; status: string; containerId: string }>(); + expect(instance.status).toBe('RUNNING'); + expect(instance.containerId).toBeTruthy(); + + // 3. Verify orchestrator was called with correct spec + expect(orchestrator.createContainer).toHaveBeenCalledTimes(1); + const spec = vi.mocked(orchestrator.createContainer).mock.calls[0]![0]; + expect(spec.image).toBe('ghcr.io/homeassistant-ai/ha-mcp:2.4'); + expect(spec.containerPort).toBe(3000); + expect(spec.command).toEqual(['python', '-c', 'print("hello")']); + expect(spec.env).toEqual({ + HOMEASSISTANT_URL: 'https://ha.example.com', + HOMEASSISTANT_TOKEN: 'secret', + }); + }); + + it('marks instance as ERROR when Docker fails', async () => { + vi.mocked(orchestrator.createContainer).mockRejectedValueOnce(new Error('Docker socket unavailable')); + + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/servers', + payload: { + name: 'failing-server', + description: 'Will fail to start', + dockerImage: 'some-image:latest', + transport: 'STDIO', + }, + }); + const server = createRes.json<{ id: string }>(); + + const startRes = await app.inject({ + method: 'POST', + url: '/api/v1/instances', + payload: { serverId: server.id }, + }); + + expect(startRes.statusCode).toBe(201); + const instance = startRes.json<{ id: string; status: string }>(); + expect(instance.status).toBe('ERROR'); + }); + }); + + describe('full lifecycle', () => { + let app: FastifyInstance; + let orchestrator: ReturnType; + + beforeEach(async () => { + orchestrator = createMockOrchestrator(); + app = await buildTestApp({ + serverRepo: createInMemoryServerRepo(), + instanceRepo: createInMemoryInstanceRepo(), + auditLogRepo: createInMemoryAuditLogRepo(), + orchestrator, + }); + }); + + afterAll(async () => { + if (app) await app.close(); + }); + + it('register → start → list → stop → remove', async () => { + // Register + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/servers', + payload: { + name: 'lifecycle-test', + description: 'Full lifecycle', + dockerImage: 'test:latest', + transport: 'SSE', + containerPort: 8080, + }, + }); + expect(createRes.statusCode).toBe(201); + const server = createRes.json<{ id: string }>(); + + // Start + const startRes = await app.inject({ + method: 'POST', + url: '/api/v1/instances', + payload: { serverId: server.id }, + }); + expect(startRes.statusCode).toBe(201); + const instance = startRes.json<{ id: string; status: string }>(); + expect(instance.status).toBe('RUNNING'); + + // List instances + const listRes = await app.inject({ + method: 'GET', + url: `/api/v1/instances?serverId=${server.id}`, + }); + expect(listRes.statusCode).toBe(200); + const instances = listRes.json>(); + expect(instances).toHaveLength(1); + + // Stop + const stopRes = await app.inject({ + method: 'POST', + url: `/api/v1/instances/${instance.id}/stop`, + }); + expect(stopRes.statusCode).toBe(200); + expect(stopRes.json<{ status: string }>().status).toBe('STOPPED'); + + // Remove + const removeRes = await app.inject({ + method: 'DELETE', + url: `/api/v1/instances/${instance.id}`, + }); + expect(removeRes.statusCode).toBe(204); + + // Verify instance is gone + const listAfter = await app.inject({ + method: 'GET', + url: `/api/v1/instances?serverId=${server.id}`, + }); + expect(listAfter.json()).toHaveLength(0); + + // Delete server + const deleteRes = await app.inject({ + method: 'DELETE', + url: `/api/v1/servers/${server.id}`, + }); + expect(deleteRes.statusCode).toBe(204); + + // Verify server is gone + const serversAfter = await app.inject({ method: 'GET', url: '/api/v1/servers' }); + expect(serversAfter.json()).toHaveLength(0); + }); + + it('external server lifecycle: register → start → proxy → stop → cleanup', async () => { + // Register external + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/servers', + payload: { + name: 'external-lifecycle', + transport: 'STREAMABLE_HTTP', + externalUrl: `http://localhost:${fakeMcpPort}`, + }, + }); + const server = createRes.json<{ id: string }>(); + + // Start (virtual instance) + const startRes = await app.inject({ + method: 'POST', + url: '/api/v1/instances', + payload: { serverId: server.id }, + }); + const instance = startRes.json<{ id: string; status: string; containerId: string | null }>(); + expect(instance.status).toBe('RUNNING'); + expect(instance.containerId).toBeNull(); + + // Proxy tools/list + const proxyRes = await app.inject({ + method: 'POST', + url: '/api/v1/mcp/proxy', + headers: { authorization: 'Bearer test-token' }, + payload: { serverId: server.id, method: 'tools/list' }, + }); + expect(proxyRes.statusCode).toBe(200); + expect(proxyRes.json<{ result: { tools: unknown[] } }>().result.tools.length).toBeGreaterThan(0); + + // Stop (no container to stop) + const stopRes = await app.inject({ + method: 'POST', + url: `/api/v1/instances/${instance.id}/stop`, + }); + expect(stopRes.statusCode).toBe(200); + expect(stopRes.json<{ status: string }>().status).toBe('STOPPED'); + + // Docker orchestrator should NOT have been called + expect(orchestrator.createContainer).not.toHaveBeenCalled(); + expect(orchestrator.stopContainer).not.toHaveBeenCalled(); + }); + }); + + describe('proxy authentication', () => { + let app: FastifyInstance; + + beforeEach(async () => { + app = await buildTestApp({ + serverRepo: createInMemoryServerRepo(), + instanceRepo: createInMemoryInstanceRepo(), + auditLogRepo: createInMemoryAuditLogRepo(), + orchestrator: createMockOrchestrator(), + }); + }); + + afterAll(async () => { + if (app) await app.close(); + }); + + it('rejects proxy calls without auth header', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/mcp/proxy', + payload: { serverId: 'srv-1', method: 'tools/list' }, + }); + // Auth middleware rejects with 401 (no Bearer token) + expect(res.statusCode).toBe(401); + }); + }); + + describe('server update flow', () => { + let app: FastifyInstance; + + beforeEach(async () => { + app = await buildTestApp({ + serverRepo: createInMemoryServerRepo(), + instanceRepo: createInMemoryInstanceRepo(), + auditLogRepo: createInMemoryAuditLogRepo(), + orchestrator: createMockOrchestrator(), + }); + }); + + afterAll(async () => { + if (app) await app.close(); + }); + + it('creates and updates server fields', async () => { + // Create + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/servers', + payload: { + name: 'updatable', + description: 'Original desc', + transport: 'STDIO', + }, + }); + const server = createRes.json<{ id: string; description: string }>(); + expect(server.description).toBe('Original desc'); + + // Update + const updateRes = await app.inject({ + method: 'PUT', + url: `/api/v1/servers/${server.id}`, + payload: { + description: 'Updated desc', + externalUrl: `http://localhost:${fakeMcpPort}`, + transport: 'STREAMABLE_HTTP', + }, + }); + expect(updateRes.statusCode).toBe(200); + const updated = updateRes.json<{ description: string; externalUrl: string; transport: string }>(); + expect(updated.description).toBe('Updated desc'); + expect(updated.externalUrl).toBe(`http://localhost:${fakeMcpPort}`); + expect(updated.transport).toBe('STREAMABLE_HTTP'); + + // Fetch to verify persistence + const getRes = await app.inject({ + method: 'GET', + url: `/api/v1/servers/${server.id}`, + }); + expect(getRes.json<{ description: string }>().description).toBe('Updated desc'); + }); + }); +});