diff --git a/fulldeploy.sh b/fulldeploy.sh index 5e8e4d8..1471494 100755 --- a/fulldeploy.sh +++ b/fulldeploy.sh @@ -1,5 +1,13 @@ #!/bin/bash -# Full deployment: Docker image → Portainer stack → RPM build/publish/install +# Full deployment: mcpd image → k8s rollout → RPM build/publish/install +# +# Production runtime is Kubernetes (context: worker0-k8s0, namespace: mcpctl). +# The docker-compose stack under stack/ + deploy/ is kept for local/VM testing +# only and is no longer invoked from here. +# +# Infra (Deployment shape, env, RBAC, NetworkPolicies) is managed by Pulumi +# in ../kubernetes-deployment. This script runs `pulumi preview` before the +# rollout; if there is infra drift it halts so you can `pulumi up` first. set -e SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" @@ -10,19 +18,50 @@ if [ -f .env ]; then set -a; source .env; set +a fi +KUBE_CONTEXT="${KUBE_CONTEXT:-worker0-k8s0}" +KUBE_NAMESPACE="${KUBE_NAMESPACE:-mcpctl}" +KUBE_DEPLOYMENT="${KUBE_DEPLOYMENT:-mcpd}" +PULUMI_DIR="${PULUMI_DIR:-$SCRIPT_DIR/../kubernetes-deployment}" +PULUMI_STACK="${PULUMI_STACK:-homelab}" + echo "========================================" echo " mcpctl Full Deploy" echo "========================================" +# --- Pre-flight: Pulumi drift check --- +echo "" +echo ">>> Pre-flight: checking for Pulumi infra drift" +echo "" +if [ -d "$PULUMI_DIR" ]; then + if [ -z "$PULUMI_CONFIG_PASSPHRASE" ]; then + echo " WARNING: PULUMI_CONFIG_PASSPHRASE not set — skipping drift check." + echo " Set it in .env or export it to enable." + else + preview_output=$(cd "$PULUMI_DIR" && pulumi preview --stack "$PULUMI_STACK" --non-interactive --diff 2>&1) || true + if echo "$preview_output" | grep -qE '^\s+[-+~]'; then + echo "$preview_output" + echo "" + echo "ERROR: Pulumi detected infra changes that have not been applied." + echo " Run: cd $PULUMI_DIR && pulumi up -s $PULUMI_STACK" + echo " Then re-run this script." + exit 1 + fi + echo " No drift — infra is in sync." + fi # passphrase check +else + echo " WARNING: Pulumi repo not found at $PULUMI_DIR — skipping drift check." +fi + echo "" echo ">>> Step 1/3: Build & push mcpd Docker image" echo "" bash scripts/build-mcpd.sh "$@" echo "" -echo ">>> Step 2/3: Deploy stack to production" +echo ">>> Step 2/3: Roll out mcpd on k8s ($KUBE_CONTEXT / $KUBE_NAMESPACE)" echo "" -bash deploy.sh +kubectl --context "$KUBE_CONTEXT" -n "$KUBE_NAMESPACE" rollout restart "deployment/$KUBE_DEPLOYMENT" +kubectl --context "$KUBE_CONTEXT" -n "$KUBE_NAMESPACE" rollout status "deployment/$KUBE_DEPLOYMENT" --timeout=3m echo "" echo ">>> Step 3/3: Build, publish & install RPM" diff --git a/scripts/release.sh b/scripts/release.sh index dfa5b70..fc6cfc8 100755 --- a/scripts/release.sh +++ b/scripts/release.sh @@ -54,7 +54,7 @@ if command -v dpkg &>/dev/null && ! command -v dnf &>/dev/null; then sudo dpkg -i "$DEB_FILE" || sudo apt-get install -f -y else # RPM filenames use x86_64/aarch64, not amd64/arm64 - local rpm_arch + rpm_arch="" case "$NATIVE_ARCH" in amd64) rpm_arch="x86_64" ;; arm64) rpm_arch="aarch64" ;; *) rpm_arch="$NATIVE_ARCH" ;; esac RPM_FILE=$(ls dist/mcpctl-*.rpm 2>/dev/null | grep -E "[._]${rpm_arch}[._]" | head -1) sudo rpm -U --force "$RPM_FILE" diff --git a/src/cli/src/commands/get.ts b/src/cli/src/commands/get.ts index af82a06..30c38cd 100644 --- a/src/cli/src/commands/get.ts +++ b/src/cli/src/commands/get.ts @@ -174,7 +174,7 @@ const promptRequestColumns: Column[] = [ const instanceColumns: Column[] = [ { header: 'NAME', key: (r) => r.server?.name ?? '-', width: 20 }, { header: 'STATUS', key: 'status', width: 10 }, - { header: 'HEALTH', key: (r) => r.healthStatus ?? '-', width: 10 }, + { header: 'HEALTH', key: (r) => r.healthStatus ?? 'unknown', width: 10 }, { header: 'PORT', key: (r) => r.port != null ? String(r.port) : '-', width: 6 }, { header: 'CONTAINER', key: (r) => r.containerId ? r.containerId.slice(0, 12) : '-', width: 14 }, { header: 'ID', key: 'id' }, diff --git a/src/db/src/seed/index.ts b/src/db/src/seed/index.ts index 6af9046..e8872e3 100644 --- a/src/db/src/seed/index.ts +++ b/src/db/src/seed/index.ts @@ -8,7 +8,8 @@ export interface TemplateEnvEntry { } export interface HealthCheckSpec { - tool: string; + /** When set, probe sends initialize + tools/call (readiness). When omitted, probe sends tools/list only (liveness). */ + tool?: string; arguments?: Record; intervalSeconds?: number; timeoutSeconds?: number; diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts index aac4493..7afef23 100644 --- a/src/mcpd/src/main.ts +++ b/src/mcpd/src/main.ts @@ -505,12 +505,15 @@ async function main(): Promise { } }, RECONCILE_INTERVAL_MS); - // Health probe runner — periodic MCP tool-call probes (like k8s livenessProbe) + // Health probe runner — periodic MCP probes (like k8s livenessProbe). + // Without explicit healthCheck.tool, probes send tools/list through + // McpProxyService so they traverse the exact production call path. const healthProbeRunner = new HealthProbeRunner( instanceRepo, serverRepo, orchestrator, { info: (msg) => app.log.info(msg), error: (obj, msg) => app.log.error(obj, msg) }, + mcpProxyService, ); healthProbeRunner.start(15_000); diff --git a/src/mcpd/src/services/health-probe.service.ts b/src/mcpd/src/services/health-probe.service.ts index 2218106..199c718 100644 --- a/src/mcpd/src/services/health-probe.service.ts +++ b/src/mcpd/src/services/health-probe.service.ts @@ -1,15 +1,24 @@ import type { McpServer, McpInstance } from '@prisma/client'; import type { IMcpInstanceRepository, IMcpServerRepository } from '../repositories/interfaces.js'; import type { McpOrchestrator } from './orchestrator.js'; +import type { McpProxyService } from './mcp-proxy-service.js'; export interface HealthCheckSpec { - tool: string; + /** When set, probe sends initialize + tools/call (readiness). When omitted, probe sends tools/list only (liveness). */ + tool?: string; arguments?: Record; intervalSeconds?: number; timeoutSeconds?: number; failureThreshold?: number; } +/** Default liveness probe applied to any RUNNING instance whose server has no explicit healthCheck. */ +export const DEFAULT_HEALTH_CHECK: HealthCheckSpec = { + intervalSeconds: 30, + timeoutSeconds: 8, + failureThreshold: 3, +}; + export interface ProbeResult { healthy: boolean; latencyMs: number; @@ -39,6 +48,8 @@ export class HealthProbeRunner { private serverRepo: IMcpServerRepository, private orchestrator: McpOrchestrator, private logger?: { info: (msg: string) => void; error: (obj: unknown, msg: string) => void }, + /** Used for liveness probes (no explicit tool) — routes tools/list through the real production path. */ + private mcpProxyService?: McpProxyService, ) {} /** Start the periodic probe loop. Runs every `tickIntervalMs` (default 15s). */ @@ -75,8 +86,8 @@ export class HealthProbeRunner { server = s; } - const healthCheck = server.healthCheck as HealthCheckSpec | null; - if (!healthCheck) continue; + // Any server without an explicit healthCheck gets the default liveness probe. + const healthCheck: HealthCheckSpec = (server.healthCheck as HealthCheckSpec | null) ?? DEFAULT_HEALTH_CHECK; const intervalMs = (healthCheck.intervalSeconds ?? 60) * 1000; const state = this.probeStates.get(inst.id); @@ -111,10 +122,18 @@ export class HealthProbeRunner { let result: ProbeResult; try { - if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') { - result = await this.probeHttp(instance, server, healthCheck, timeoutMs); + if (healthCheck.tool === undefined) { + // Liveness probe: send tools/list through the real production path. + // Mirrors exactly what mcplocal/client calls do, so synthetic and real + // failures converge on the same signal. + result = await this.probeLiveness(server, timeoutMs); } else { - result = await this.probeStdio(instance, server, healthCheck, timeoutMs); + const readinessCheck = healthCheck as HealthCheckSpec & { tool: string }; + if (server.transport === 'SSE' || server.transport === 'STREAMABLE_HTTP') { + result = await this.probeHttp(instance, server, readinessCheck, timeoutMs); + } else { + result = await this.probeStdio(instance, server, readinessCheck, timeoutMs); + } } } catch (err) { result = { @@ -169,11 +188,47 @@ export class HealthProbeRunner { return result; } + /** + * Liveness probe — sends tools/list via McpProxyService so the probe traverses + * the exact code path production clients use. Works uniformly across every + * transport (STDIO exec/attach, SSE, Streamable HTTP, external). + */ + private async probeLiveness(server: McpServer, timeoutMs: number): Promise { + const start = Date.now(); + if (!this.mcpProxyService) { + return { healthy: false, latencyMs: 0, message: 'mcpProxyService not wired — cannot run default liveness probe' }; + } + + const deadline = new Promise((resolve) => { + setTimeout(() => resolve({ + healthy: false, + latencyMs: timeoutMs, + message: `Liveness probe timed out after ${timeoutMs}ms`, + }), timeoutMs); + }); + + const probe = this.mcpProxyService.execute({ serverId: server.id, method: 'tools/list' }) + .then((response): ProbeResult => { + const latencyMs = Date.now() - start; + if (response.error) { + return { healthy: false, latencyMs, message: response.error.message ?? 'tools/list error' }; + } + return { healthy: true, latencyMs, message: 'ok' }; + }) + .catch((err: unknown): ProbeResult => ({ + healthy: false, + latencyMs: Date.now() - start, + message: err instanceof Error ? err.message : String(err), + })); + + return Promise.race([probe, deadline]); + } + /** Probe an HTTP/SSE MCP server by sending a JSON-RPC tool call. */ private async probeHttp( instance: McpInstance, server: McpServer, - healthCheck: HealthCheckSpec, + healthCheck: HealthCheckSpec & { tool: string }, timeoutMs: number, ): Promise { if (!instance.containerId) { @@ -205,7 +260,7 @@ export class HealthProbeRunner { */ private async probeStreamableHttp( baseUrl: string, - healthCheck: HealthCheckSpec, + healthCheck: HealthCheckSpec & { tool: string }, timeoutMs: number, ): Promise { const start = Date.now(); @@ -274,7 +329,7 @@ export class HealthProbeRunner { */ private async probeSse( baseUrl: string, - healthCheck: HealthCheckSpec, + healthCheck: HealthCheckSpec & { tool: string }, timeoutMs: number, ): Promise { const start = Date.now(); @@ -415,7 +470,7 @@ export class HealthProbeRunner { private async probeStdio( instance: McpInstance, server: McpServer, - healthCheck: HealthCheckSpec, + healthCheck: HealthCheckSpec & { tool: string }, timeoutMs: number, ): Promise { if (!instance.containerId) { diff --git a/src/mcpd/tests/services/health-probe.test.ts b/src/mcpd/tests/services/health-probe.test.ts index 98df7bc..9c00b10 100644 --- a/src/mcpd/tests/services/health-probe.test.ts +++ b/src/mcpd/tests/services/health-probe.test.ts @@ -1,8 +1,9 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; -import { HealthProbeRunner } from '../../src/services/health-probe.service.js'; +import { HealthProbeRunner, DEFAULT_HEALTH_CHECK } from '../../src/services/health-probe.service.js'; import type { HealthCheckSpec } from '../../src/services/health-probe.service.js'; import type { IMcpInstanceRepository, IMcpServerRepository } from '../../src/repositories/interfaces.js'; -import type { McpOrchestrator, ExecResult } from '../../src/services/orchestrator.js'; +import type { McpOrchestrator } from '../../src/services/orchestrator.js'; +import type { McpProxyService, McpProxyResponse } from '../../src/services/mcp-proxy-service.js'; import type { McpInstance, McpServer } from '@prisma/client'; function makeInstance(overrides: Partial = {}): McpInstance { @@ -87,20 +88,30 @@ function mockOrchestrator(): McpOrchestrator { }; } +function mockMcpProxyService(): McpProxyService { + return { + execute: vi.fn(async (): Promise => ({ jsonrpc: '2.0', id: 1, result: { tools: [] } })), + closeAll: vi.fn(), + removeClient: vi.fn(), + } as unknown as McpProxyService; +} + describe('HealthProbeRunner', () => { let instanceRepo: IMcpInstanceRepository; let serverRepo: IMcpServerRepository; let orchestrator: McpOrchestrator; + let mcpProxyService: McpProxyService; let runner: HealthProbeRunner; beforeEach(() => { instanceRepo = mockInstanceRepo(); serverRepo = mockServerRepo(); orchestrator = mockOrchestrator(); - runner = new HealthProbeRunner(instanceRepo, serverRepo, orchestrator); + mcpProxyService = mockMcpProxyService(); + runner = new HealthProbeRunner(instanceRepo, serverRepo, orchestrator, undefined, mcpProxyService); }); - it('skips instances without healthCheck config', async () => { + it('applies default liveness probe when server has no healthCheck config', async () => { const instance = makeInstance(); const server = makeServer({ healthCheck: null }); @@ -109,8 +120,67 @@ describe('HealthProbeRunner', () => { await runner.tick(); + // No exec fallback — liveness goes through mcpProxyService expect(orchestrator.execInContainer).not.toHaveBeenCalled(); - expect(instanceRepo.updateStatus).not.toHaveBeenCalled(); + expect(mcpProxyService.execute).toHaveBeenCalledWith({ serverId: 'srv-1', method: 'tools/list' }); + expect(instanceRepo.updateStatus).toHaveBeenCalledWith( + 'inst-1', + 'RUNNING', + expect.objectContaining({ healthStatus: 'healthy' }), + ); + }); + + it('default liveness probe marks unhealthy when tools/list returns JSON-RPC error', async () => { + const instance = makeInstance(); + const server = makeServer({ + healthCheck: { intervalSeconds: 0, failureThreshold: 1 } as unknown as McpServer['healthCheck'], + }); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + vi.mocked(mcpProxyService.execute).mockResolvedValue({ + jsonrpc: '2.0', + id: 1, + error: { code: -32603, message: 'Cannot connect to upstream' }, + }); + + await runner.tick(); + + expect(instanceRepo.updateStatus).toHaveBeenCalledWith( + 'inst-1', + 'RUNNING', + expect.objectContaining({ + healthStatus: 'unhealthy', + events: expect.arrayContaining([ + expect.objectContaining({ type: 'Warning', message: expect.stringContaining('Cannot connect to upstream') }), + ]), + }), + ); + }); + + it('default liveness probe marks unhealthy when mcpProxyService throws', async () => { + const instance = makeInstance(); + const server = makeServer({ + healthCheck: { intervalSeconds: 0, failureThreshold: 1 } as unknown as McpServer['healthCheck'], + }); + + vi.mocked(instanceRepo.findAll).mockResolvedValue([instance]); + vi.mocked(serverRepo.findById).mockResolvedValue(server); + vi.mocked(mcpProxyService.execute).mockRejectedValue(new Error('no running instance')); + + await runner.tick(); + + expect(instanceRepo.updateStatus).toHaveBeenCalledWith( + 'inst-1', + 'RUNNING', + expect.objectContaining({ healthStatus: 'unhealthy' }), + ); + }); + + it('DEFAULT_HEALTH_CHECK has no tool set so it acts as liveness', () => { + expect(DEFAULT_HEALTH_CHECK.tool).toBeUndefined(); + expect(DEFAULT_HEALTH_CHECK.intervalSeconds).toBe(30); + expect(DEFAULT_HEALTH_CHECK.failureThreshold).toBe(3); }); it('skips non-RUNNING instances', async () => { diff --git a/src/mcplocal/src/discovery.ts b/src/mcplocal/src/discovery.ts index 7043f25..2eb99b1 100644 --- a/src/mcplocal/src/discovery.ts +++ b/src/mcplocal/src/discovery.ts @@ -1,4 +1,5 @@ import type { McpdClient } from './http/mcpd-client.js'; +import { DISCOVERY_TIMEOUT_MS } from './http/mcpd-client.js'; import type { McpRouter } from './router.js'; import { McpdUpstream } from './upstream/mcpd.js'; @@ -96,6 +97,10 @@ export async function fetchProjectLlmConfig( function syncUpstreams(router: McpRouter, mcpdClient: McpdClient, servers: McpdServer[]): string[] { const registered: string[] = []; + // Discovery-class calls (`*\/list`) go through a short-timeout client so a single + // unreachable upstream cannot stall session init for the full tool-call window. + const discoveryClient = mcpdClient.withTimeout(DISCOVERY_TIMEOUT_MS); + // Remove stale upstreams const currentNames = new Set(router.getUpstreamNames()); const serverNames = new Set(servers.map((s) => s.name)); @@ -108,7 +113,7 @@ function syncUpstreams(router: McpRouter, mcpdClient: McpdClient, servers: McpdS // Add/update upstreams for each server for (const server of servers) { if (!currentNames.has(server.name)) { - const upstream = new McpdUpstream(server.id, server.name, mcpdClient, server.description); + const upstream = new McpdUpstream(server.id, server.name, mcpdClient, server.description, discoveryClient); router.addUpstream(upstream); } registered.push(server.name); diff --git a/src/mcplocal/src/http/mcpd-client.ts b/src/mcplocal/src/http/mcpd-client.ts index 2b206a2..b4541a3 100644 --- a/src/mcplocal/src/http/mcpd-client.ts +++ b/src/mcplocal/src/http/mcpd-client.ts @@ -21,7 +21,14 @@ export class ConnectionError extends Error { } /** Default timeout for mcpd requests (ms). Prevents indefinite hangs on slow upstream tool calls. */ -const DEFAULT_TIMEOUT_MS = 30_000; +export const DEFAULT_TIMEOUT_MS = 30_000; + +/** + * Discovery-class operations (tools/list, resources/list, prompts/list) should not share + * the full tool-call timeout budget — a single dead upstream would stall session init for + * the entire window. Override via `MCPLOCAL_DISCOVERY_TIMEOUT_MS`. + */ +export const DISCOVERY_TIMEOUT_MS = Number(process.env['MCPLOCAL_DISCOVERY_TIMEOUT_MS']) || 8_000; export class McpdClient { private readonly baseUrl: string; @@ -45,6 +52,14 @@ export class McpdClient { return new McpdClient(this.baseUrl, this.token, { ...this.extraHeaders, ...headers }, this.timeoutMs); } + /** + * Create a new client with a different per-request timeout. Used by mcplocal's + * discovery path to avoid sharing the slow tool-call budget. + */ + withTimeout(timeoutMs: number): McpdClient { + return new McpdClient(this.baseUrl, this.token, { ...this.extraHeaders }, timeoutMs); + } + async get(path: string): Promise { return this.request('GET', path); } diff --git a/src/mcplocal/src/router.ts b/src/mcplocal/src/router.ts index 53269c8..e5c80d0 100644 --- a/src/mcplocal/src/router.ts +++ b/src/mcplocal/src/router.ts @@ -18,6 +18,10 @@ export interface RouteContext { correlationId?: string; } +type ListCacheEntry = + | { kind: 'ok'; result: unknown; fetchedAt: number } + | { kind: 'err'; message: string; fetchedAt: number }; + /** * Routes MCP requests to the appropriate upstream server. * @@ -64,6 +68,13 @@ export class McpRouter { private plugin: ProxyModelPlugin | null = null; private pluginContexts = new Map(); + // Per-server discovery cache. Keyed `${serverName}:${method}`. Prevents every client + // `tools/list` from re-hitting slow/dead upstreams and absorbs negative results so one + // dead server only stalls the first POST, not every subsequent one. + private listCache = new Map(); + private readonly LIST_CACHE_POSITIVE_TTL_MS = 30_000; + private readonly LIST_CACHE_NEGATIVE_TTL_MS = 30_000; + /** Optional callback for traffic inspection — called after each upstream call completes. */ onUpstreamCall: ((info: { upstream: string; method?: string; request: unknown; response: unknown; durationMs: number; correlationId?: string }) => void) | null = null; @@ -202,6 +213,7 @@ export class McpRouter { addUpstream(connection: UpstreamConnection): void { this.upstreams.set(connection.name, connection); + this.invalidateListCache(connection.name); if (this.notificationHandler && connection.onNotification) { const serverName = connection.name; const handler = this.notificationHandler; @@ -219,6 +231,7 @@ export class McpRouter { removeUpstream(name: string): void { this.upstreams.delete(name); + this.invalidateListCache(name); for (const map of [this.toolToServer, this.resourceToServer, this.promptToServer]) { for (const [key, server] of map) { if (server === name) { @@ -228,6 +241,26 @@ export class McpRouter { } } + /** Drop all discovery-cache entries for a server (called on register / remove). */ + private invalidateListCache(serverName: string): void { + const prefix = `${serverName}:`; + for (const key of this.listCache.keys()) { + if (key.startsWith(prefix)) this.listCache.delete(key); + } + } + + private getListCacheEntry(serverName: string, method: string): ListCacheEntry | null { + const entry = this.listCache.get(`${serverName}:${method}`); + if (!entry) return null; + const ttl = entry.kind === 'ok' ? this.LIST_CACHE_POSITIVE_TTL_MS : this.LIST_CACHE_NEGATIVE_TTL_MS; + if (Date.now() - entry.fetchedAt >= ttl) return null; + return entry; + } + + private setListCacheEntry(serverName: string, method: string, entry: ListCacheEntry): void { + this.listCache.set(`${serverName}:${method}`, entry); + } + setNotificationHandler(handler: (notification: JsonRpcNotification) => void): void { this.notificationHandler = handler; // Wire to all existing upstreams @@ -248,14 +281,24 @@ export class McpRouter { /** * Discover tools from all upstreams by calling tools/list on each. + * Per-server results are cached (positive + negative) to absorb slow upstreams + * and prevent repeated 30s timeouts on every client `tools/list`. */ async discoverTools(correlationId?: string): Promise> { const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = []; + const started = Date.now(); + let cachedCount = 0; + let freshCount = 0; + const failed: string[] = []; // Discover tools from all servers in parallel so one slow server doesn't block the rest const entries = [...this.upstreams.entries()]; const results = await Promise.allSettled( entries.map(async ([serverName, upstream]) => { + const cached = this.getListCacheEntry(serverName, 'tools/list'); + if (cached) { + return { serverName, upstream, cached }; + } const req = { jsonrpc: '2.0' as const, id: `discover-tools-${serverName}`, @@ -279,11 +322,34 @@ export class McpRouter { console.warn(`[discoverTools] ${(result.reason as Error).message ?? 'unknown error'}`); continue; } - const { serverName, upstream, response } = result.value; + const { serverName, upstream } = result.value; - if (response.error) { - console.warn(`[discoverTools] ${serverName}: ${(response.error as { message?: string }).message ?? 'unknown error'}`); - } else if (response.result && typeof response.result === 'object' && 'tools' in response.result) { + let response: JsonRpcResponse | null = null; + if ('cached' in result.value) { + const cached = result.value.cached; + if (cached.kind === 'err') { + cachedCount++; + failed.push(serverName); + continue; + } + response = { jsonrpc: '2.0', id: `cached-${serverName}`, result: cached.result }; + cachedCount++; + } else { + response = result.value.response; + freshCount++; + if (response.error) { + const message = (response.error as { message?: string }).message ?? 'unknown error'; + this.setListCacheEntry(serverName, 'tools/list', { kind: 'err', message, fetchedAt: Date.now() }); + console.warn(`[discoverTools] ${serverName}: ${message}`); + failed.push(serverName); + continue; + } + if (response.result !== undefined) { + this.setListCacheEntry(serverName, 'tools/list', { kind: 'ok', result: response.result, fetchedAt: Date.now() }); + } + } + + if (response.result && typeof response.result === 'object' && 'tools' in response.result) { const tools = (response.result as { tools: Array<{ name: string; description?: string; inputSchema?: unknown }> }).tools; for (const tool of tools) { const namespacedName = `${serverName}/${tool.name}`; @@ -304,11 +370,19 @@ export class McpRouter { } } + if (entries.length > 0) { + const elapsed = Date.now() - started; + const project = this.projectName ? ` project=${this.projectName}` : ''; + const failedStr = failed.length > 0 ? ` failed=[${failed.join(',')}]` : ''; + console.info(`[discoverTools]${project} fresh=${freshCount} cached=${cachedCount}${failedStr} elapsed=${elapsed}ms`); + } + return allTools; } /** * Discover resources from all upstreams by calling resources/list on each. + * Shares the per-server list cache with `discoverTools`. */ async discoverResources(correlationId?: string): Promise> { const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = []; @@ -317,6 +391,8 @@ export class McpRouter { const entries = [...this.upstreams.entries()]; const results = await Promise.allSettled( entries.map(async ([serverName, upstream]) => { + const cached = this.getListCacheEntry(serverName, 'resources/list'); + if (cached) return { serverName, cached }; const req = { jsonrpc: '2.0' as const, id: `discover-resources-${serverName}`, @@ -337,7 +413,24 @@ export class McpRouter { for (const result of results) { if (result.status === 'rejected') continue; - const { serverName, response } = result.value; + const { serverName } = result.value; + + let response: JsonRpcResponse | null = null; + if ('cached' in result.value) { + const cached = result.value.cached; + if (cached.kind === 'err') continue; + response = { jsonrpc: '2.0', id: `cached-${serverName}`, result: cached.result }; + } else { + response = result.value.response; + if (response.error) { + const message = (response.error as { message?: string }).message ?? 'unknown error'; + this.setListCacheEntry(serverName, 'resources/list', { kind: 'err', message, fetchedAt: Date.now() }); + continue; + } + if (response.result !== undefined) { + this.setListCacheEntry(serverName, 'resources/list', { kind: 'ok', result: response.result, fetchedAt: Date.now() }); + } + } if (response.result && typeof response.result === 'object' && 'resources' in response.result) { const resources = (response.result as { resources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> }).resources; diff --git a/src/mcplocal/src/upstream/mcpd.ts b/src/mcplocal/src/upstream/mcpd.ts index e17b732..bc8faf7 100644 --- a/src/mcplocal/src/upstream/mcpd.ts +++ b/src/mcplocal/src/upstream/mcpd.ts @@ -12,6 +12,9 @@ interface McpdProxyResponse { error?: { code: number; message: string; data?: unknown }; } +/** Discovery-class methods routed through the short-timeout client when one is provided. */ +const LIST_METHOD_SUFFIX = '/list'; + /** * An upstream that routes MCP requests through mcpd's /api/v1/mcp/proxy endpoint. * mcpd holds the credentials and manages the actual MCP server connections. @@ -26,6 +29,8 @@ export class McpdUpstream implements UpstreamConnection { serverName: string, private mcpdClient: McpdClient, serverDescription?: string, + /** Short-timeout client used for `*\/list` methods; falls back to mcpdClient when absent. */ + private discoveryClient?: McpdClient, ) { this.name = serverName; if (serverDescription !== undefined) this.description = serverDescription; @@ -46,8 +51,12 @@ export class McpdUpstream implements UpstreamConnection { params: request.params, }; + const client = request.method.endsWith(LIST_METHOD_SUFFIX) && this.discoveryClient + ? this.discoveryClient + : this.mcpdClient; + try { - const result = await this.mcpdClient.post('/api/v1/mcp/proxy', proxyRequest); + const result = await client.post('/api/v1/mcp/proxy', proxyRequest); if (result.error) { return { jsonrpc: '2.0', id: request.id, error: result.error }; } diff --git a/src/mcplocal/tests/discovery.test.ts b/src/mcplocal/tests/discovery.test.ts index 873e0f7..1c010aa 100644 --- a/src/mcplocal/tests/discovery.test.ts +++ b/src/mcplocal/tests/discovery.test.ts @@ -3,7 +3,7 @@ import { refreshUpstreams } from '../src/discovery.js'; import { McpRouter } from '../src/router.js'; function mockMcpdClient(servers: Array<{ id: string; name: string; transport: string }>) { - return { + const client = { baseUrl: 'http://test:3100', token: 'test-token', get: vi.fn(async () => servers), @@ -11,7 +11,10 @@ function mockMcpdClient(servers: Array<{ id: string; name: string; transport: st put: vi.fn(), delete: vi.fn(), forward: vi.fn(), + withTimeout: vi.fn(() => client), + withHeaders: vi.fn(() => client), }; + return client; } describe('refreshUpstreams', () => { diff --git a/src/mcplocal/tests/mcpd-upstream.test.ts b/src/mcplocal/tests/mcpd-upstream.test.ts index ddd668d..9d19255 100644 --- a/src/mcplocal/tests/mcpd-upstream.test.ts +++ b/src/mcplocal/tests/mcpd-upstream.test.ts @@ -107,4 +107,38 @@ describe('McpdUpstream', () => { const response = await upstream.send(request); expect(response.error).toEqual({ code: -32601, message: 'Tool not found' }); }); + + it('routes */list methods through discoveryClient when provided', async () => { + const mainClient = mockMcpdClient(); + const discoveryClient = mockMcpdClient(new Map([ + ['srv-1:tools/list', { result: { tools: [] } }], + ['srv-1:resources/list', { result: { resources: [] } }], + ['srv-1:prompts/list', { result: { prompts: [] } }], + ])); + + const upstream = new McpdUpstream('srv-1', 'slack', mainClient as any, undefined, discoveryClient as any); + + await upstream.send({ jsonrpc: '2.0', id: '1', method: 'tools/list' }); + await upstream.send({ jsonrpc: '2.0', id: '2', method: 'resources/list' }); + await upstream.send({ jsonrpc: '2.0', id: '3', method: 'prompts/list' }); + + expect(discoveryClient.post).toHaveBeenCalledTimes(3); + expect(mainClient.post).not.toHaveBeenCalled(); + }); + + it('routes tools/call through mainClient even when discoveryClient is set', async () => { + const mainClient = mockMcpdClient(new Map([ + ['srv-1:tools/call', { result: { ok: true } }], + ])); + const discoveryClient = mockMcpdClient(); + + const upstream = new McpdUpstream('srv-1', 'slack', mainClient as any, undefined, discoveryClient as any); + await upstream.send({ + jsonrpc: '2.0', id: '1', method: 'tools/call', + params: { name: 'noop', arguments: {} }, + }); + + expect(mainClient.post).toHaveBeenCalledTimes(1); + expect(discoveryClient.post).not.toHaveBeenCalled(); + }); }); diff --git a/src/mcplocal/tests/project-discovery.test.ts b/src/mcplocal/tests/project-discovery.test.ts index 68a6aee..a9b506c 100644 --- a/src/mcplocal/tests/project-discovery.test.ts +++ b/src/mcplocal/tests/project-discovery.test.ts @@ -3,7 +3,7 @@ import { refreshProjectUpstreams } from '../src/discovery.js'; import { McpRouter } from '../src/router.js'; function mockMcpdClient(servers: Array<{ id: string; name: string; transport: string }>) { - return { + const client = { baseUrl: 'http://test:3100', token: 'test-token', get: vi.fn(async () => servers), @@ -11,7 +11,10 @@ function mockMcpdClient(servers: Array<{ id: string; name: string; transport: st put: vi.fn(), delete: vi.fn(), forward: vi.fn(async () => ({ status: 200, body: servers })), + withTimeout: vi.fn(() => client), + withHeaders: vi.fn(() => client), }; + return client; } describe('refreshProjectUpstreams', () => { diff --git a/src/mcplocal/tests/router-discovery-cache.test.ts b/src/mcplocal/tests/router-discovery-cache.test.ts new file mode 100644 index 0000000..9feb465 --- /dev/null +++ b/src/mcplocal/tests/router-discovery-cache.test.ts @@ -0,0 +1,137 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { McpRouter } from '../src/router.js'; +import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from '../src/types.js'; + +function mockUpstream(name: string, opts: { tools?: Array<{ name: string }>; resources?: Array<{ uri: string }>; err?: string } = {}): UpstreamConnection { + return { + name, + isAlive: () => true, + close: async () => {}, + send: vi.fn(async (req: JsonRpcRequest): Promise => { + if (opts.err) { + return { jsonrpc: '2.0', id: req.id, error: { code: -32603, message: opts.err } }; + } + if (req.method === 'tools/list') { + return { jsonrpc: '2.0', id: req.id, result: { tools: opts.tools ?? [] } }; + } + if (req.method === 'resources/list') { + return { jsonrpc: '2.0', id: req.id, result: { resources: opts.resources ?? [] } }; + } + return { jsonrpc: '2.0', id: req.id, error: { code: -32601, message: 'not handled' } }; + }), + } as UpstreamConnection; +} + +describe('McpRouter discovery cache', () => { + let router: McpRouter; + + beforeEach(() => { + router = new McpRouter(); + vi.useFakeTimers(); + vi.setSystemTime(new Date('2026-04-15T12:00:00Z')); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('serves tools/list from cache on the second call within TTL', async () => { + const upstream = mockUpstream('slack', { tools: [{ name: 'search' }] }); + router.addUpstream(upstream); + + await router.discoverTools(); + await router.discoverTools(); + + expect(upstream.send).toHaveBeenCalledTimes(1); + }); + + it('re-fetches after positive TTL expires', async () => { + const upstream = mockUpstream('slack', { tools: [{ name: 'search' }] }); + router.addUpstream(upstream); + + await router.discoverTools(); + vi.advanceTimersByTime(31_000); + await router.discoverTools(); + + expect(upstream.send).toHaveBeenCalledTimes(2); + }); + + it('negative cache prevents repeated calls to a failing upstream', async () => { + const upstream = mockUpstream('broken', { err: 'mcpd proxy error: timeout' }); + router.addUpstream(upstream); + + await router.discoverTools(); + await router.discoverTools(); + await router.discoverTools(); + + expect(upstream.send).toHaveBeenCalledTimes(1); + }); + + it('negative cache expires after negative TTL', async () => { + const upstream = mockUpstream('broken', { err: 'mcpd proxy error: timeout' }); + router.addUpstream(upstream); + + await router.discoverTools(); + vi.advanceTimersByTime(31_000); + await router.discoverTools(); + + expect(upstream.send).toHaveBeenCalledTimes(2); + }); + + it('re-registering a server invalidates its cache entry', async () => { + const upstream1 = mockUpstream('slack', { tools: [{ name: 'v1' }] }); + router.addUpstream(upstream1); + await router.discoverTools(); + expect(upstream1.send).toHaveBeenCalledTimes(1); + + const upstream2 = mockUpstream('slack', { tools: [{ name: 'v2' }] }); + router.addUpstream(upstream2); + const tools = await router.discoverTools(); + + expect(upstream2.send).toHaveBeenCalledTimes(1); + expect(tools.map((t) => t.name)).toEqual(['slack/v2']); + }); + + it('removeUpstream clears cache so follow-up add re-fetches', async () => { + const upstream1 = mockUpstream('slack', { tools: [{ name: 'v1' }] }); + router.addUpstream(upstream1); + await router.discoverTools(); + + router.removeUpstream('slack'); + + const upstream2 = mockUpstream('slack', { tools: [{ name: 'v2' }] }); + router.addUpstream(upstream2); + await router.discoverTools(); + + expect(upstream2.send).toHaveBeenCalledTimes(1); + }); + + it('one dead server does not block cached results for others', async () => { + const broken = mockUpstream('broken', { err: 'timeout' }); + const healthy = mockUpstream('healthy', { tools: [{ name: 'ping' }] }); + router.addUpstream(broken); + router.addUpstream(healthy); + + const first = await router.discoverTools(); + expect(first.map((t) => t.name)).toEqual(['healthy/ping']); + + // Second call: both come from cache. + const second = await router.discoverTools(); + expect(second.map((t) => t.name)).toEqual(['healthy/ping']); + expect(broken.send).toHaveBeenCalledTimes(1); + expect(healthy.send).toHaveBeenCalledTimes(1); + }); + + it('discoverResources uses its own cache key independent of tools/list', async () => { + const upstream = mockUpstream('docs', { tools: [{ name: 'search' }], resources: [{ uri: 'doc://1' }] }); + router.addUpstream(upstream); + + await router.discoverTools(); + await router.discoverResources(); + await router.discoverTools(); + await router.discoverResources(); + + // Each method cached separately → exactly one call per method. + expect(upstream.send).toHaveBeenCalledTimes(2); + }); +});