From d293df738a3925ef167674f4335d8cb4d58da360 Mon Sep 17 00:00:00 2001 From: Michal Date: Wed, 8 Apr 2026 19:00:19 +0100 Subject: [PATCH] feat: automatic reconciliation loop for MCP server instances mcpd now runs a periodic reconcileAll() every 30s that: - Detects crashed/missing containers (syncStatus) - Cleans up ERROR instances - Creates replacement pods to match desired replica count This replaces the old syncStatus-only timer. Servers migrated from another deployment or recovering from node failures will automatically get their instances recreated. 6 new tests for reconcileAll covering: missing instances, skip replicas=0, already-at-count, ERROR cleanup, multi-server, error isolation. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/mcpd/src/main.ts | 22 ++++-- src/mcpd/src/services/instance.service.ts | 43 ++++++++++ src/mcpd/tests/instance-service.test.ts | 95 +++++++++++++++++++++++ 3 files changed, 153 insertions(+), 7 deletions(-) diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index 7524207..a7c212d 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -487,15 +487,23 @@ async function main(): Promise { await app.listen({ port: config.port, host: config.host }); app.log.info(`mcpd listening on ${config.host}:${config.port}`); - // Periodic container liveness sync — detect crashed containers - const SYNC_INTERVAL_MS = 30_000; // 30s - const syncTimer = setInterval(async () => { + // Periodic reconciliation loop — the operator's heartbeat. + // Detects crashed/missing containers, cleans up ERROR instances, + // and starts replacements to match desired replica counts. + const RECONCILE_INTERVAL_MS = 30_000; // 30s + const reconcileTimer = setInterval(async () => { try { - await instanceService.syncStatus(); + const { reconciled, errors } = await instanceService.reconcileAll(); + if (reconciled > 0) { + app.log.info(`[reconcile] ${reconciled} server(s) reconciled`); + } + for (const err of errors) { + app.log.error(`[reconcile] ${err}`); + } } catch (err) { - app.log.error({ err }, 'Container status sync failed'); + app.log.error({ err }, 'Reconciliation loop failed'); } - }, SYNC_INTERVAL_MS); + }, RECONCILE_INTERVAL_MS); // Health probe runner — periodic MCP tool-call probes (like k8s livenessProbe) const healthProbeRunner = new HealthProbeRunner( @@ -509,7 +517,7 @@ async function main(): Promise { // Graceful shutdown setupGracefulShutdown(app, { disconnectDb: async () => { - clearInterval(syncTimer); + clearInterval(reconcileTimer); healthProbeRunner.stop(); gitBackup.stop(); await prisma.$disconnect(); diff --git a/src/mcpd/src/services/instance.service.ts b/src/mcpd/src/services/instance.service.ts index d5509f7..7129f67 100644 --- a/src/mcpd/src/services/instance.service.ts +++ b/src/mcpd/src/services/instance.service.ts @@ -107,6 +107,49 @@ export class InstanceService { return this.instanceRepo.findAll(serverId); } + /** + * Reconcile ALL servers — the operator loop. + * + * For every server with replicas > 0, ensures the correct number of + * healthy instances exist. Cleans up ERROR instances and starts + * replacements. This is the core self-healing mechanism. + */ + async reconcileAll(): Promise<{ reconciled: number; errors: string[] }> { + await this.syncStatus(); + + const servers = await this.serverRepo.findAll(); + let reconciled = 0; + const errors: string[] = []; + + for (const server of servers) { + if (server.replicas <= 0) continue; + + try { + const instances = await this.instanceRepo.findAll(server.id); + const active = instances.filter((i) => i.status === 'RUNNING' || i.status === 'STARTING'); + const errored = instances.filter((i) => i.status === 'ERROR'); + + // Clean up ERROR instances so they don't accumulate + for (const inst of errored) { + await this.removeOne(inst); + } + + // Scale up if needed + const toStart = server.replicas - active.length; + if (toStart > 0) { + for (let i = 0; i < toStart; i++) { + await this.startOne(server.id); + } + reconciled++; + } + } catch (err) { + errors.push(`${server.name}: ${err instanceof Error ? err.message : String(err)}`); + } + } + + return { reconciled, errors }; + } + /** * Remove an instance (stop container + delete DB record). * Does NOT reconcile — caller should reconcile after if needed. diff --git a/src/mcpd/tests/instance-service.test.ts b/src/mcpd/tests/instance-service.test.ts index db4a4d7..88fc73e 100644 --- a/src/mcpd/tests/instance-service.test.ts +++ b/src/mcpd/tests/instance-service.test.ts @@ -294,4 +294,99 @@ describe('InstanceService', () => { expect(result.stdout).toBe('log output'); }); }); + + describe('reconcileAll', () => { + it('creates missing instances for servers with replicas > 0', async () => { + const server = makeServer({ id: 'srv-1', name: 'grafana', replicas: 1 }); + vi.mocked(serverRepo.findAll).mockResolvedValue([server]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + // No instances exist + vi.mocked(instanceRepo.findAll).mockResolvedValue([]); + + const result = await service.reconcileAll(); + + expect(result.reconciled).toBe(1); + expect(result.errors).toHaveLength(0); + expect(instanceRepo.create).toHaveBeenCalled(); + }); + + it('skips servers with replicas = 0', async () => { + const server = makeServer({ id: 'srv-1', replicas: 0 }); + vi.mocked(serverRepo.findAll).mockResolvedValue([server]); + vi.mocked(instanceRepo.findAll).mockResolvedValue([]); + + const result = await service.reconcileAll(); + + expect(result.reconciled).toBe(0); + expect(instanceRepo.create).not.toHaveBeenCalled(); + }); + + it('does not create instances when already at desired count', async () => { + const server = makeServer({ id: 'srv-1', replicas: 1 }); + vi.mocked(serverRepo.findAll).mockResolvedValue([server]); + vi.mocked(instanceRepo.findAll).mockResolvedValue([ + makeInstance({ id: 'inst-1', serverId: 'srv-1', status: 'RUNNING' }), + ]); + + const result = await service.reconcileAll(); + + expect(result.reconciled).toBe(0); + expect(instanceRepo.create).not.toHaveBeenCalled(); + }); + + it('cleans up ERROR instances and creates replacements', async () => { + const server = makeServer({ id: 'srv-1', replicas: 1 }); + vi.mocked(serverRepo.findAll).mockResolvedValue([server]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + vi.mocked(instanceRepo.findAll).mockResolvedValue([ + makeInstance({ id: 'inst-dead', serverId: 'srv-1', status: 'ERROR', containerId: 'ctr-dead' }), + ]); + + const result = await service.reconcileAll(); + + // Should delete ERROR instance and create a new one + expect(result.reconciled).toBe(1); + expect(instanceRepo.delete).toHaveBeenCalledWith('inst-dead'); + expect(instanceRepo.create).toHaveBeenCalled(); + }); + + it('reconciles multiple servers independently', async () => { + const srv1 = makeServer({ id: 'srv-1', name: 'grafana', replicas: 1, dockerImage: 'grafana:latest' }); + const srv2 = makeServer({ id: 'srv-2', name: 'node-red', replicas: 1, dockerImage: 'nodered:latest' }); + vi.mocked(serverRepo.findAll).mockResolvedValue([srv1, srv2]); + vi.mocked(serverRepo.findById).mockImplementation(async (id) => { + if (id === 'srv-1') return srv1; + if (id === 'srv-2') return srv2; + return null; + }); + // srv-1 has a running instance, srv-2 has none + vi.mocked(instanceRepo.findAll).mockImplementation(async (serverId) => { + if (serverId === 'srv-1') return [makeInstance({ serverId: 'srv-1', status: 'RUNNING' })]; + return []; + }); + + const result = await service.reconcileAll(); + + // Only srv-2 needed reconciliation + expect(result.reconciled).toBe(1); + }); + + it('collects errors without stopping other servers', async () => { + const srv1 = makeServer({ id: 'srv-1', name: 'broken', replicas: 1 }); + const srv2 = makeServer({ id: 'srv-2', name: 'healthy', replicas: 1, dockerImage: 'img:latest' }); + vi.mocked(serverRepo.findAll).mockResolvedValue([srv1, srv2]); + vi.mocked(serverRepo.findById).mockImplementation(async (id) => { + if (id === 'srv-2') return srv2; + return null; // srv-1 can't be found → will error + }); + vi.mocked(instanceRepo.findAll).mockResolvedValue([]); + + const result = await service.reconcileAll(); + + // srv-1 errored, srv-2 reconciled + expect(result.errors).toHaveLength(1); + expect(result.errors[0]).toContain('broken'); + expect(result.reconciled).toBe(1); + }); + }); });