feat: Kubernetes operator for MCP server management #47
@@ -487,15 +487,23 @@ async function main(): Promise<void> {
|
||||
await app.listen({ port: config.port, host: config.host });
|
||||
app.log.info(`mcpd listening on ${config.host}:${config.port}`);
|
||||
|
||||
// Periodic container liveness sync — detect crashed containers
|
||||
const SYNC_INTERVAL_MS = 30_000; // 30s
|
||||
const syncTimer = setInterval(async () => {
|
||||
// Periodic reconciliation loop — the operator's heartbeat.
|
||||
// Detects crashed/missing containers, cleans up ERROR instances,
|
||||
// and starts replacements to match desired replica counts.
|
||||
const RECONCILE_INTERVAL_MS = 30_000; // 30s
|
||||
const reconcileTimer = setInterval(async () => {
|
||||
try {
|
||||
await instanceService.syncStatus();
|
||||
} catch (err) {
|
||||
app.log.error({ err }, 'Container status sync failed');
|
||||
const { reconciled, errors } = await instanceService.reconcileAll();
|
||||
if (reconciled > 0) {
|
||||
app.log.info(`[reconcile] ${reconciled} server(s) reconciled`);
|
||||
}
|
||||
}, SYNC_INTERVAL_MS);
|
||||
for (const err of errors) {
|
||||
app.log.error(`[reconcile] ${err}`);
|
||||
}
|
||||
} catch (err) {
|
||||
app.log.error({ err }, 'Reconciliation loop failed');
|
||||
}
|
||||
}, RECONCILE_INTERVAL_MS);
|
||||
|
||||
// Health probe runner — periodic MCP tool-call probes (like k8s livenessProbe)
|
||||
const healthProbeRunner = new HealthProbeRunner(
|
||||
@@ -509,7 +517,7 @@ async function main(): Promise<void> {
|
||||
// Graceful shutdown
|
||||
setupGracefulShutdown(app, {
|
||||
disconnectDb: async () => {
|
||||
clearInterval(syncTimer);
|
||||
clearInterval(reconcileTimer);
|
||||
healthProbeRunner.stop();
|
||||
gitBackup.stop();
|
||||
await prisma.$disconnect();
|
||||
|
||||
@@ -107,6 +107,49 @@ export class InstanceService {
|
||||
return this.instanceRepo.findAll(serverId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconcile ALL servers — the operator loop.
|
||||
*
|
||||
* For every server with replicas > 0, ensures the correct number of
|
||||
* healthy instances exist. Cleans up ERROR instances and starts
|
||||
* replacements. This is the core self-healing mechanism.
|
||||
*/
|
||||
async reconcileAll(): Promise<{ reconciled: number; errors: string[] }> {
|
||||
await this.syncStatus();
|
||||
|
||||
const servers = await this.serverRepo.findAll();
|
||||
let reconciled = 0;
|
||||
const errors: string[] = [];
|
||||
|
||||
for (const server of servers) {
|
||||
if (server.replicas <= 0) continue;
|
||||
|
||||
try {
|
||||
const instances = await this.instanceRepo.findAll(server.id);
|
||||
const active = instances.filter((i) => i.status === 'RUNNING' || i.status === 'STARTING');
|
||||
const errored = instances.filter((i) => i.status === 'ERROR');
|
||||
|
||||
// Clean up ERROR instances so they don't accumulate
|
||||
for (const inst of errored) {
|
||||
await this.removeOne(inst);
|
||||
}
|
||||
|
||||
// Scale up if needed
|
||||
const toStart = server.replicas - active.length;
|
||||
if (toStart > 0) {
|
||||
for (let i = 0; i < toStart; i++) {
|
||||
await this.startOne(server.id);
|
||||
}
|
||||
reconciled++;
|
||||
}
|
||||
} catch (err) {
|
||||
errors.push(`${server.name}: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
return { reconciled, errors };
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove an instance (stop container + delete DB record).
|
||||
* Does NOT reconcile — caller should reconcile after if needed.
|
||||
|
||||
@@ -294,4 +294,99 @@ describe('InstanceService', () => {
|
||||
expect(result.stdout).toBe('log output');
|
||||
});
|
||||
});
|
||||
|
||||
describe('reconcileAll', () => {
|
||||
it('creates missing instances for servers with replicas > 0', async () => {
|
||||
const server = makeServer({ id: 'srv-1', name: 'grafana', replicas: 1 });
|
||||
vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
|
||||
vi.mocked(serverRepo.findById).mockResolvedValue(server);
|
||||
// No instances exist
|
||||
vi.mocked(instanceRepo.findAll).mockResolvedValue([]);
|
||||
|
||||
const result = await service.reconcileAll();
|
||||
|
||||
expect(result.reconciled).toBe(1);
|
||||
expect(result.errors).toHaveLength(0);
|
||||
expect(instanceRepo.create).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('skips servers with replicas = 0', async () => {
|
||||
const server = makeServer({ id: 'srv-1', replicas: 0 });
|
||||
vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
|
||||
vi.mocked(instanceRepo.findAll).mockResolvedValue([]);
|
||||
|
||||
const result = await service.reconcileAll();
|
||||
|
||||
expect(result.reconciled).toBe(0);
|
||||
expect(instanceRepo.create).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('does not create instances when already at desired count', async () => {
|
||||
const server = makeServer({ id: 'srv-1', replicas: 1 });
|
||||
vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
|
||||
vi.mocked(instanceRepo.findAll).mockResolvedValue([
|
||||
makeInstance({ id: 'inst-1', serverId: 'srv-1', status: 'RUNNING' }),
|
||||
]);
|
||||
|
||||
const result = await service.reconcileAll();
|
||||
|
||||
expect(result.reconciled).toBe(0);
|
||||
expect(instanceRepo.create).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('cleans up ERROR instances and creates replacements', async () => {
|
||||
const server = makeServer({ id: 'srv-1', replicas: 1 });
|
||||
vi.mocked(serverRepo.findAll).mockResolvedValue([server]);
|
||||
vi.mocked(serverRepo.findById).mockResolvedValue(server);
|
||||
vi.mocked(instanceRepo.findAll).mockResolvedValue([
|
||||
makeInstance({ id: 'inst-dead', serverId: 'srv-1', status: 'ERROR', containerId: 'ctr-dead' }),
|
||||
]);
|
||||
|
||||
const result = await service.reconcileAll();
|
||||
|
||||
// Should delete ERROR instance and create a new one
|
||||
expect(result.reconciled).toBe(1);
|
||||
expect(instanceRepo.delete).toHaveBeenCalledWith('inst-dead');
|
||||
expect(instanceRepo.create).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('reconciles multiple servers independently', async () => {
|
||||
const srv1 = makeServer({ id: 'srv-1', name: 'grafana', replicas: 1, dockerImage: 'grafana:latest' });
|
||||
const srv2 = makeServer({ id: 'srv-2', name: 'node-red', replicas: 1, dockerImage: 'nodered:latest' });
|
||||
vi.mocked(serverRepo.findAll).mockResolvedValue([srv1, srv2]);
|
||||
vi.mocked(serverRepo.findById).mockImplementation(async (id) => {
|
||||
if (id === 'srv-1') return srv1;
|
||||
if (id === 'srv-2') return srv2;
|
||||
return null;
|
||||
});
|
||||
// srv-1 has a running instance, srv-2 has none
|
||||
vi.mocked(instanceRepo.findAll).mockImplementation(async (serverId) => {
|
||||
if (serverId === 'srv-1') return [makeInstance({ serverId: 'srv-1', status: 'RUNNING' })];
|
||||
return [];
|
||||
});
|
||||
|
||||
const result = await service.reconcileAll();
|
||||
|
||||
// Only srv-2 needed reconciliation
|
||||
expect(result.reconciled).toBe(1);
|
||||
});
|
||||
|
||||
it('collects errors without stopping other servers', async () => {
|
||||
const srv1 = makeServer({ id: 'srv-1', name: 'broken', replicas: 1 });
|
||||
const srv2 = makeServer({ id: 'srv-2', name: 'healthy', replicas: 1, dockerImage: 'img:latest' });
|
||||
vi.mocked(serverRepo.findAll).mockResolvedValue([srv1, srv2]);
|
||||
vi.mocked(serverRepo.findById).mockImplementation(async (id) => {
|
||||
if (id === 'srv-2') return srv2;
|
||||
return null; // srv-1 can't be found → will error
|
||||
});
|
||||
vi.mocked(instanceRepo.findAll).mockResolvedValue([]);
|
||||
|
||||
const result = await service.reconcileAll();
|
||||
|
||||
// srv-1 errored, srv-2 reconciled
|
||||
expect(result.errors).toHaveLength(1);
|
||||
expect(result.errors[0]).toContain('broken');
|
||||
expect(result.reconciled).toBe(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user