Files
mcpctl/src/mcpd/src/services/instance.service.ts
Michal 016f8abe68
All checks were successful
CI/CD / typecheck (pull_request) Successful in 52s
CI/CD / lint (pull_request) Successful in 1m53s
CI/CD / test (pull_request) Successful in 1m2s
CI/CD / build (pull_request) Successful in 4m0s
CI/CD / smoke (pull_request) Successful in 8m38s
CI/CD / publish-rpm (pull_request) Has been skipped
CI/CD / publish-deb (pull_request) Has been skipped
fix: accurate instance status — STARTING until pod is actually running
Instance status now reflects actual container state:
- startOne() sets STARTING (not RUNNING) after container creation
- syncStatus() promotes STARTING→RUNNING when pod is ready
- syncStatus() demotes RUNNING→STARTING if pod restarts (CrashLoop)
- External servers still get RUNNING immediately (no container)

Previously, CrashLooping pods showed as RUNNING in mcpctl get instances.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 23:45:10 +01:00

339 lines
12 KiB
TypeScript

import type { McpInstance } from '@prisma/client';
import type { IMcpInstanceRepository, IMcpServerRepository, ISecretRepository } from '../repositories/interfaces.js';
import type { McpOrchestrator, ContainerSpec, ContainerInfo } from './orchestrator.js';
import { NotFoundError } from './mcp-server.service.js';
import { resolveServerEnv } from './env-resolver.js';
/**
 * Runner images for package-based MCP servers, keyed by runtime name.
 * Each image can be overridden via its MCPD_*_RUNNER_IMAGE env var;
 * otherwise the registry default is used.
 */
const RUNNER_IMAGES: Record<string, string> = {
  node:
    process.env['MCPD_NODE_RUNNER_IMAGE'] ??
    'mysources.co.uk/michal/mcpctl-node-runner:latest',
  python:
    process.env['MCPD_PYTHON_RUNNER_IMAGE'] ??
    'mysources.co.uk/michal/mcpctl-python-runner:latest',
};

/**
 * Network for MCP server containers (matches docker-compose mcp-servers network).
 * Overridable via MCPD_MCP_NETWORK.
 */
const MCP_SERVERS_NETWORK =
  process.env['MCPD_MCP_NETWORK'] ?? 'mcp-servers';
/**
 * Error for operations attempted against an instance in the wrong state
 * (e.g. inspecting an instance that has no backing container).
 * Carries an HTTP 409 Conflict status code for the API layer.
 */
export class InvalidStateError extends Error {
  /** HTTP status code surfaced to API clients. */
  readonly statusCode = 409;

  constructor(message: string) {
    super(message);
    // Ensure the error reports its own class name rather than 'Error'.
    this.name = InvalidStateError.name;
  }
}
/**
 * Manages MCP server instances: lifecycle (start/stop/remove), status
 * synchronization against actual container state, and replica reconciliation
 * (the operator loop).
 */
export class InstanceService {
  constructor(
    private readonly instanceRepo: IMcpInstanceRepository,
    private readonly serverRepo: IMcpServerRepository,
    private readonly orchestrator: McpOrchestrator,
    private readonly secretRepo?: ISecretRepository,
  ) {}

  /** List all instances, optionally filtered to a single server. */
  async list(serverId?: string): Promise<McpInstance[]> {
    return this.instanceRepo.findAll(serverId);
  }

  /**
   * Look up an instance by id.
   * @throws NotFoundError when no instance with that id exists.
   */
  async getById(id: string): Promise<McpInstance> {
    const instance = await this.instanceRepo.findById(id);
    if (!instance) throw new NotFoundError(`Instance '${id}' not found`);
    return instance;
  }

  /**
   * Sync instance statuses with actual container state.
   *
   * For every RUNNING/STARTING instance with a container:
   * - container stopped/error → mark ERROR (last log line as context)
   * - container starting while instance RUNNING → demote to STARTING
   *   (e.g. CrashLoopBackOff restart)
   * - container running while instance STARTING → promote to RUNNING
   * - inspect fails (container gone entirely) → mark ERROR
   */
  async syncStatus(): Promise<void> {
    const instances = await this.instanceRepo.findAll();
    for (const inst of instances) {
      if ((inst.status === 'RUNNING' || inst.status === 'STARTING') && inst.containerId) {
        try {
          const info = await this.orchestrator.inspectContainer(inst.containerId);
          if (info.state === 'stopped' || info.state === 'error') {
            // Container died — get last logs for error context.
            // NOTE(review): stdout is preferred over stderr here; crash messages
            // often land on stderr only — confirm this ordering is intended.
            let errorMsg = `Container ${info.state}`;
            try {
              const logs = await this.orchestrator.getContainerLogs(inst.containerId, { tail: 5 });
              const lastLog = (logs.stdout || logs.stderr).trim().split('\n').pop();
              if (lastLog) errorMsg = lastLog;
            } catch { /* best-effort */ }
            await this.instanceRepo.updateStatus(inst.id, 'ERROR', {
              metadata: { error: errorMsg },
            });
          } else if (info.state === 'starting' && inst.status === 'RUNNING') {
            // Pod went back to starting (e.g. CrashLoopBackOff restart)
            await this.instanceRepo.updateStatus(inst.id, 'STARTING', {});
          } else if (info.state === 'running' && inst.status === 'STARTING') {
            // Pod became ready — promote to RUNNING
            await this.instanceRepo.updateStatus(inst.id, 'RUNNING', {});
          }
        } catch {
          // inspectContainer threw — container gone entirely
          await this.instanceRepo.updateStatus(inst.id, 'ERROR', {
            metadata: { error: 'Container not found' },
          });
        }
      }
    }
  }

  /**
   * Reconcile instances for a server to match desired replica count.
   * - Syncs container statuses first (detect crashed containers)
   * - If fewer active instances than replicas: start new ones
   * - If more active instances than replicas: remove excess (oldest first)
   *
   * @returns the refreshed instance list for the server.
   * @throws NotFoundError when the server does not exist.
   */
  async reconcile(serverId: string): Promise<McpInstance[]> {
    const server = await this.serverRepo.findById(serverId);
    if (!server) throw new NotFoundError(`McpServer '${serverId}' not found`);
    // Sync container statuses before counting active instances
    await this.syncStatus();
    const instances = await this.instanceRepo.findAll(serverId);
    const active = instances.filter((i) => i.status === 'RUNNING' || i.status === 'STARTING');
    const desired = server.replicas;
    if (active.length < desired) {
      // Scale up
      const toStart = desired - active.length;
      for (let i = 0; i < toStart; i++) {
        await this.startOne(serverId);
      }
    } else if (active.length > desired) {
      // Scale down — remove oldest first. Copy before sorting so `active`
      // itself is not mutated by Array.prototype.sort.
      const excess = [...active]
        .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime())
        .slice(0, active.length - desired);
      for (const inst of excess) {
        await this.removeOne(inst);
      }
    }
    return this.instanceRepo.findAll(serverId);
  }

  /**
   * Reconcile ALL servers — the operator loop.
   *
   * For every server with replicas > 0, ensures the correct number of
   * healthy instances exist. Cleans up ERROR instances and starts
   * replacements. This is the core self-healing mechanism.
   * (Scale-down is handled by reconcile(), not here.)
   *
   * @returns count of servers scaled up, plus per-server error messages.
   */
  async reconcileAll(): Promise<{ reconciled: number; errors: string[] }> {
    await this.syncStatus();
    const servers = await this.serverRepo.findAll();
    let reconciled = 0;
    const errors: string[] = [];
    for (const server of servers) {
      if (server.replicas <= 0) continue;
      try {
        const instances = await this.instanceRepo.findAll(server.id);
        const active = instances.filter((i) => i.status === 'RUNNING' || i.status === 'STARTING');
        const errored = instances.filter((i) => i.status === 'ERROR');
        // Clean up ERROR instances so they don't accumulate
        for (const inst of errored) {
          await this.removeOne(inst);
        }
        // Scale up if needed
        const toStart = server.replicas - active.length;
        if (toStart > 0) {
          for (let i = 0; i < toStart; i++) {
            await this.startOne(server.id);
          }
          reconciled++;
        }
      } catch (err) {
        errors.push(`${server.name}: ${err instanceof Error ? err.message : String(err)}`);
      }
    }
    return { reconciled, errors };
  }

  /**
   * Remove an instance (stop container + delete DB record).
   * Does NOT reconcile — caller should reconcile after if needed.
   * @throws NotFoundError when the instance does not exist.
   */
  async remove(id: string): Promise<{ serverId: string }> {
    const instance = await this.getById(id);
    if (instance.containerId) {
      await this.teardownContainer(instance.containerId);
    }
    await this.instanceRepo.delete(id);
    return { serverId: instance.serverId };
  }

  /**
   * Remove all instances for a server (used before server deletion).
   * Stops all containers so Prisma cascade only cleans up DB records.
   */
  async removeAllForServer(serverId: string): Promise<void> {
    const instances = await this.instanceRepo.findAll(serverId);
    for (const inst of instances) {
      if (inst.containerId) {
        await this.teardownContainer(inst.containerId);
      }
    }
  }

  /**
   * Inspect the container backing an instance.
   * @throws InvalidStateError when the instance has no container
   *   (e.g. an external server).
   */
  async inspect(id: string): Promise<ContainerInfo> {
    const instance = await this.getById(id);
    if (!instance.containerId) {
      throw new InvalidStateError(`Instance '${id}' has no container`);
    }
    return this.orchestrator.inspectContainer(instance.containerId);
  }

  /** Fetch container logs; empty output when the instance has no container. */
  async getLogs(id: string, opts?: { tail?: number }): Promise<{ stdout: string; stderr: string }> {
    const instance = await this.getById(id);
    if (!instance.containerId) {
      return { stdout: '', stderr: '' };
    }
    return this.orchestrator.getContainerLogs(instance.containerId, opts);
  }

  /**
   * Start a single instance for a server.
   *
   * External servers get a RUNNING record immediately (no container).
   * Container-backed servers get a STARTING record; syncStatus() later
   * promotes it to RUNNING once the container is actually ready. On any
   * container-creation failure the record is marked ERROR instead.
   */
  private async startOne(serverId: string): Promise<McpInstance> {
    const server = await this.serverRepo.findById(serverId);
    if (!server) throw new NotFoundError(`McpServer '${serverId}' not found`);
    // External servers don't need container management
    if (server.externalUrl) {
      return this.instanceRepo.create({
        serverId,
        status: 'RUNNING',
        metadata: { external: true, url: server.externalUrl },
      });
    }
    // Determine image + command based on server config:
    // 1. Explicit dockerImage → use as-is
    // 2. packageName → use runtime-specific runner image (node/python/go/...)
    // 3. Fallback → server name (legacy)
    let image: string;
    let pkgCommand: string[] | undefined;
    if (server.dockerImage) {
      image = server.dockerImage;
    } else if (server.packageName) {
      const runtime = (server.runtime as string | null) ?? 'node';
      image = RUNNER_IMAGES[runtime] ?? RUNNER_IMAGES['node']!;
      // Runner entrypoint handles package execution (npx -y / uvx / go run)
      const serverCommand = server.command as string[] | null;
      pkgCommand = [server.packageName, ...(serverCommand ?? [])];
    } else {
      image = server.name;
    }
    let instance = await this.instanceRepo.create({
      serverId,
      status: 'STARTING',
    });
    try {
      const spec: ContainerSpec = {
        image,
        name: `mcpctl-${server.name}-${instance.id}`,
        hostPort: null,
        network: MCP_SERVERS_NETWORK,
        labels: {
          'mcpctl.server-id': serverId,
          'mcpctl.instance-id': instance.id,
        },
      };
      // HTTP-based transports need a container port exposed
      if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') {
        spec.containerPort = server.containerPort ?? 3000;
      }
      // Package-based servers: command = [packageName, ...args] (entrypoint handles execution)
      // Docker-image servers: use explicit command if provided
      if (pkgCommand) {
        spec.command = pkgCommand;
      } else {
        const command = server.command as string[] | null;
        if (command) {
          spec.command = command;
        }
      }
      // Resolve env vars from inline values and secret refs
      if (this.secretRepo) {
        try {
          const resolvedEnv = await resolveServerEnv(server, this.secretRepo);
          if (Object.keys(resolvedEnv).length > 0) {
            spec.env = resolvedEnv;
          }
        } catch (envErr) {
          // Env resolution failures are non-fatal — the container may still
          // work if the env vars are optional — but surface them rather than
          // swallowing silently.
          console.warn(
            `[InstanceService] env resolution failed for server '${server.name}': ` +
              (envErr instanceof Error ? envErr.message : String(envErr)),
          );
        }
      }
      // Pull image if not available locally
      try {
        await this.orchestrator.pullImage(image);
      } catch {
        // Image may already be available locally
      }
      const containerInfo = await this.orchestrator.createContainer(spec);
      const updateFields: { containerId: string; port?: number } = {
        containerId: containerInfo.containerId,
      };
      if (containerInfo.port !== undefined) {
        updateFields.port = containerInfo.port;
      }
      // Set STARTING — syncStatus will promote to RUNNING once the container is actually ready
      instance = await this.instanceRepo.updateStatus(instance.id, 'STARTING', updateFields);
    } catch (err) {
      instance = await this.instanceRepo.updateStatus(instance.id, 'ERROR', {
        metadata: { error: err instanceof Error ? err.message : String(err) },
      });
    }
    return instance;
  }

  /** Stop and remove a single instance (container teardown + DB record deletion). */
  private async removeOne(instance: McpInstance): Promise<void> {
    if (instance.containerId) {
      await this.teardownContainer(instance.containerId);
    }
    await this.instanceRepo.delete(instance.id);
  }

  /** Best-effort container teardown: stop, then force-remove. Never throws. */
  private async teardownContainer(containerId: string): Promise<void> {
    try {
      await this.orchestrator.stopContainer(containerId);
    } catch {
      // Container may already be stopped
    }
    try {
      await this.orchestrator.removeContainer(containerId, true);
    } catch {
      // Container may already be gone
    }
  }
}