From 588b2a9e65591f4abdaaf7019d38f996aac3aab1 Mon Sep 17 00:00:00 2001 From: Michal Date: Tue, 10 Mar 2026 15:21:05 +0000 Subject: [PATCH] fix: correlate upstream discovery events to client requests in console MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fan-out discovery methods (tools/list, prompts/list, resources/list) used synthetic request IDs that couldn't be looked up in the correlation map. This caused upstream_response events to have no correlationId, making the console unable to find upstream content for replay ("No content to replay"). Fix: pass correlationId through RouteContext → discovery methods → onUpstreamCall callback, so the handler can use it directly. Co-Authored-By: Claude Opus 4.6 --- src/mcplocal/src/http/project-mcp-endpoint.ts | 6 ++-- src/mcplocal/src/router.ts | 30 +++++++++++-------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/src/mcplocal/src/http/project-mcp-endpoint.ts b/src/mcplocal/src/http/project-mcp-endpoint.ts index 1a29b61..cb20d7c 100644 --- a/src/mcplocal/src/http/project-mcp-endpoint.ts +++ b/src/mcplocal/src/http/project-mcp-endpoint.ts @@ -223,9 +223,9 @@ export function registerProjectMcpEndpoint(app: FastifyInstance, mcpdClient: Mcp if (trafficCapture) { router.onUpstreamCall = (info) => { const sid = transport.sessionId ?? 'unknown'; - // Recover the correlationId from the upstream request's id (preserved from client request) + // Prefer correlationId passed by router (fan-out discovery), fall back to request ID lookup const reqId = (info.request as { id?: string | number }).id; - const corrId = reqId != null ? requestCorrelations.get(reqId) : undefined; + const corrId = info.correlationId ?? (reqId != null ? requestCorrelations.get(reqId) : undefined); trafficCapture.emit({ timestamp: new Date().toISOString(), projectName, @@ -269,7 +269,7 @@ export function registerProjectMcpEndpoint(app: FastifyInstance, mcpdClient: Mcp correlationId, }); - const ctx = transport.sessionId ? { sessionId: transport.sessionId } : undefined; + const ctx = transport.sessionId ? { sessionId: transport.sessionId, correlationId } : { correlationId }; const response = await router.route(message as unknown as JsonRpcRequest, ctx); // Forward queued notifications BEFORE the response — the response send diff --git a/src/mcplocal/src/router.ts b/src/mcplocal/src/router.ts index 64080bd..887dc84 100644 --- a/src/mcplocal/src/router.ts +++ b/src/mcplocal/src/router.ts @@ -14,6 +14,8 @@ import { pauseQueue } from './proxymodel/pause-queue.js'; export interface RouteContext { sessionId?: string; + /** Correlation ID for traffic inspection (links upstream calls to client request) */ + correlationId?: string; } /** @@ -63,7 +65,7 @@ export class McpRouter { private pluginContexts = new Map(); /** Optional callback for traffic inspection — called after each upstream call completes. */ - onUpstreamCall: ((info: { upstream: string; method?: string; request: unknown; response: unknown; durationMs: number }) => void) | null = null; + onUpstreamCall: ((info: { upstream: string; method?: string; request: unknown; response: unknown; durationMs: number; correlationId?: string }) => void) | null = null; setPaginator(paginator: ResponsePaginator): void { this.paginator = paginator; @@ -247,7 +249,7 @@ export class McpRouter { /** * Discover tools from all upstreams by calling tools/list on each. */ - async discoverTools(): Promise> { + async discoverTools(correlationId?: string): Promise> { const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = []; for (const [serverName, upstream] of this.upstreams) { @@ -262,7 +264,7 @@ export class McpRouter { const start = performance.now(); response = await upstream.send(req); const durationMs = Math.round(performance.now() - start); - this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs }); + this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs, ...(correlationId ? { correlationId } : {}) }); } else { response = await upstream.send(req); } @@ -299,7 +301,7 @@ export class McpRouter { /** * Discover resources from all upstreams by calling resources/list on each. */ - async discoverResources(): Promise> { + async discoverResources(correlationId?: string): Promise> { const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = []; for (const [serverName, upstream] of this.upstreams) { @@ -314,7 +316,7 @@ export class McpRouter { const start = performance.now(); response = await upstream.send(req); const durationMs = Math.round(performance.now() - start); - this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs }); + this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs, ...(correlationId ? { correlationId } : {}) }); } else { response = await upstream.send(req); } @@ -341,7 +343,7 @@ export class McpRouter { /** * Discover prompts from all upstreams by calling prompts/list on each. */ - async discoverPrompts(): Promise> { + async discoverPrompts(correlationId?: string): Promise> { const allPrompts: Array<{ name: string; description?: string; arguments?: unknown[] }> = []; for (const [serverName, upstream] of this.upstreams) { @@ -356,7 +358,7 @@ export class McpRouter { const start = performance.now(); response = await upstream.send(req); const durationMs = Math.round(performance.now() - start); - this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs }); + this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs, ...(correlationId ? { correlationId } : {}) }); } else { response = await upstream.send(req); } @@ -483,7 +485,7 @@ export class McpRouter { case 'tools/list': { if (this.plugin && context?.sessionId) { const ctx = await this.getOrCreatePluginContext(context.sessionId); - let tools = await this.discoverTools(); + let tools = await this.discoverTools(context?.correlationId); if (this.plugin.onToolsList) { tools = await this.plugin.onToolsList(tools, ctx); @@ -493,7 +495,7 @@ export class McpRouter { } // No plugin: return upstream tools only - const tools = await this.discoverTools(); + const tools = await this.discoverTools(context?.correlationId); return { jsonrpc: '2.0', id: request.id, result: { tools } }; } @@ -503,12 +505,12 @@ export class McpRouter { case 'resources/list': { if (this.plugin?.onResourcesList && context?.sessionId) { const ctx = await this.getOrCreatePluginContext(context.sessionId); - const resources = await this.discoverResources(); + const resources = await this.discoverResources(context?.correlationId); const filtered = await this.plugin.onResourcesList(resources, ctx); return { jsonrpc: '2.0', id: request.id, result: { resources: filtered } }; } - const resources = await this.discoverResources(); + const resources = await this.discoverResources(context?.correlationId); // Append mcpctl prompt resources const mcpdResources: Array<{ uri: string; name: string; description: string; mimeType: string }> = []; if (this.mcpdClient && this.projectName) { @@ -543,6 +545,7 @@ export class McpRouter { request: { jsonrpc: '2.0', id: request.id, method: 'resources/list' }, response: mcpdResponse, durationMs: 0, + ...(context?.correlationId ? { correlationId: context.correlationId } : {}), }); } return { @@ -620,12 +623,12 @@ export class McpRouter { case 'prompts/list': { if (this.plugin?.onPromptsList && context?.sessionId) { const ctx = await this.getOrCreatePluginContext(context.sessionId); - const upstreamPrompts = await this.discoverPrompts(); + const upstreamPrompts = await this.discoverPrompts(context?.correlationId); const filtered = await this.plugin.onPromptsList(upstreamPrompts, ctx); return { jsonrpc: '2.0', id: request.id, result: { prompts: filtered } }; } - const upstreamPrompts = await this.discoverPrompts(); + const upstreamPrompts = await this.discoverPrompts(context?.correlationId); // Include mcpctl-managed prompts from mcpd alongside upstream prompts const managedIndex = await this.fetchPromptIndex(); const managedPrompts = managedIndex.map((p) => ({ @@ -641,6 +644,7 @@ export class McpRouter { request: { jsonrpc: '2.0', id: request.id, method: 'prompts/list' }, response: mcpdResponse, durationMs: 0, + ...(context?.correlationId ? { correlationId: context.correlationId } : {}), }); } return {