fix: correlate upstream discovery events to client requests in console
Some checks failed
CI/CD / lint (push) Successful in 4m0s
CI/CD / typecheck (push) Successful in 2m38s
CI/CD / test (push) Successful in 3m52s
CI/CD / build (push) Successful in 5m22s
CI/CD / publish-rpm (push) Failing after 1m7s
CI/CD / publish-deb (push) Successful in 39s
CI/CD / smoke (push) Successful in 8m25s

Fan-out discovery methods (tools/list, prompts/list, resources/list)
used synthetic request IDs that couldn't be looked up in the
correlation map. This caused upstream_response events to have no
correlationId, making the console unable to find upstream content
for replay ("No content to replay").

Fix: pass correlationId through RouteContext → discovery methods →
onUpstreamCall callback, so the handler can use it directly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Michal
2026-03-10 15:21:05 +00:00
parent 6e84631d59
commit 588b2a9e65
2 changed files with 20 additions and 16 deletions

View File

@@ -223,9 +223,9 @@ export function registerProjectMcpEndpoint(app: FastifyInstance, mcpdClient: Mcp
if (trafficCapture) { if (trafficCapture) {
router.onUpstreamCall = (info) => { router.onUpstreamCall = (info) => {
const sid = transport.sessionId ?? 'unknown'; const sid = transport.sessionId ?? 'unknown';
// Recover the correlationId from the upstream request's id (preserved from client request) // Prefer correlationId passed by router (fan-out discovery), fall back to request ID lookup
const reqId = (info.request as { id?: string | number }).id; const reqId = (info.request as { id?: string | number }).id;
const corrId = reqId != null ? requestCorrelations.get(reqId) : undefined; const corrId = info.correlationId ?? (reqId != null ? requestCorrelations.get(reqId) : undefined);
trafficCapture.emit({ trafficCapture.emit({
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
projectName, projectName,
@@ -269,7 +269,7 @@ export function registerProjectMcpEndpoint(app: FastifyInstance, mcpdClient: Mcp
correlationId, correlationId,
}); });
const ctx = transport.sessionId ? { sessionId: transport.sessionId } : undefined; const ctx = transport.sessionId ? { sessionId: transport.sessionId, correlationId } : { correlationId };
const response = await router.route(message as unknown as JsonRpcRequest, ctx); const response = await router.route(message as unknown as JsonRpcRequest, ctx);
// Forward queued notifications BEFORE the response — the response send // Forward queued notifications BEFORE the response — the response send

View File

@@ -14,6 +14,8 @@ import { pauseQueue } from './proxymodel/pause-queue.js';
export interface RouteContext { export interface RouteContext {
sessionId?: string; sessionId?: string;
/** Correlation ID for traffic inspection (links upstream calls to client request) */
correlationId?: string;
} }
/** /**
@@ -63,7 +65,7 @@ export class McpRouter {
private pluginContexts = new Map<string, PluginContextImpl>(); private pluginContexts = new Map<string, PluginContextImpl>();
/** Optional callback for traffic inspection — called after each upstream call completes. */ /** Optional callback for traffic inspection — called after each upstream call completes. */
onUpstreamCall: ((info: { upstream: string; method?: string; request: unknown; response: unknown; durationMs: number }) => void) | null = null; onUpstreamCall: ((info: { upstream: string; method?: string; request: unknown; response: unknown; durationMs: number; correlationId?: string }) => void) | null = null;
setPaginator(paginator: ResponsePaginator): void { setPaginator(paginator: ResponsePaginator): void {
this.paginator = paginator; this.paginator = paginator;
@@ -247,7 +249,7 @@ export class McpRouter {
/** /**
* Discover tools from all upstreams by calling tools/list on each. * Discover tools from all upstreams by calling tools/list on each.
*/ */
async discoverTools(): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> { async discoverTools(correlationId?: string): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> {
const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = []; const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = [];
for (const [serverName, upstream] of this.upstreams) { for (const [serverName, upstream] of this.upstreams) {
@@ -262,7 +264,7 @@ export class McpRouter {
const start = performance.now(); const start = performance.now();
response = await upstream.send(req); response = await upstream.send(req);
const durationMs = Math.round(performance.now() - start); const durationMs = Math.round(performance.now() - start);
this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs }); this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs, ...(correlationId ? { correlationId } : {}) });
} else { } else {
response = await upstream.send(req); response = await upstream.send(req);
} }
@@ -299,7 +301,7 @@ export class McpRouter {
/** /**
* Discover resources from all upstreams by calling resources/list on each. * Discover resources from all upstreams by calling resources/list on each.
*/ */
async discoverResources(): Promise<Array<{ uri: string; name?: string; description?: string; mimeType?: string }>> { async discoverResources(correlationId?: string): Promise<Array<{ uri: string; name?: string; description?: string; mimeType?: string }>> {
const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = []; const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = [];
for (const [serverName, upstream] of this.upstreams) { for (const [serverName, upstream] of this.upstreams) {
@@ -314,7 +316,7 @@ export class McpRouter {
const start = performance.now(); const start = performance.now();
response = await upstream.send(req); response = await upstream.send(req);
const durationMs = Math.round(performance.now() - start); const durationMs = Math.round(performance.now() - start);
this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs }); this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs, ...(correlationId ? { correlationId } : {}) });
} else { } else {
response = await upstream.send(req); response = await upstream.send(req);
} }
@@ -341,7 +343,7 @@ export class McpRouter {
/** /**
* Discover prompts from all upstreams by calling prompts/list on each. * Discover prompts from all upstreams by calling prompts/list on each.
*/ */
async discoverPrompts(): Promise<Array<{ name: string; description?: string; arguments?: unknown[] }>> { async discoverPrompts(correlationId?: string): Promise<Array<{ name: string; description?: string; arguments?: unknown[] }>> {
const allPrompts: Array<{ name: string; description?: string; arguments?: unknown[] }> = []; const allPrompts: Array<{ name: string; description?: string; arguments?: unknown[] }> = [];
for (const [serverName, upstream] of this.upstreams) { for (const [serverName, upstream] of this.upstreams) {
@@ -356,7 +358,7 @@ export class McpRouter {
const start = performance.now(); const start = performance.now();
response = await upstream.send(req); response = await upstream.send(req);
const durationMs = Math.round(performance.now() - start); const durationMs = Math.round(performance.now() - start);
this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs }); this.onUpstreamCall({ upstream: serverName, method: req.method, request: req, response, durationMs, ...(correlationId ? { correlationId } : {}) });
} else { } else {
response = await upstream.send(req); response = await upstream.send(req);
} }
@@ -483,7 +485,7 @@ export class McpRouter {
case 'tools/list': { case 'tools/list': {
if (this.plugin && context?.sessionId) { if (this.plugin && context?.sessionId) {
const ctx = await this.getOrCreatePluginContext(context.sessionId); const ctx = await this.getOrCreatePluginContext(context.sessionId);
let tools = await this.discoverTools(); let tools = await this.discoverTools(context?.correlationId);
if (this.plugin.onToolsList) { if (this.plugin.onToolsList) {
tools = await this.plugin.onToolsList(tools, ctx); tools = await this.plugin.onToolsList(tools, ctx);
@@ -493,7 +495,7 @@ export class McpRouter {
} }
// No plugin: return upstream tools only // No plugin: return upstream tools only
const tools = await this.discoverTools(); const tools = await this.discoverTools(context?.correlationId);
return { jsonrpc: '2.0', id: request.id, result: { tools } }; return { jsonrpc: '2.0', id: request.id, result: { tools } };
} }
@@ -503,12 +505,12 @@ export class McpRouter {
case 'resources/list': { case 'resources/list': {
if (this.plugin?.onResourcesList && context?.sessionId) { if (this.plugin?.onResourcesList && context?.sessionId) {
const ctx = await this.getOrCreatePluginContext(context.sessionId); const ctx = await this.getOrCreatePluginContext(context.sessionId);
const resources = await this.discoverResources(); const resources = await this.discoverResources(context?.correlationId);
const filtered = await this.plugin.onResourcesList(resources, ctx); const filtered = await this.plugin.onResourcesList(resources, ctx);
return { jsonrpc: '2.0', id: request.id, result: { resources: filtered } }; return { jsonrpc: '2.0', id: request.id, result: { resources: filtered } };
} }
const resources = await this.discoverResources(); const resources = await this.discoverResources(context?.correlationId);
// Append mcpctl prompt resources // Append mcpctl prompt resources
const mcpdResources: Array<{ uri: string; name: string; description: string; mimeType: string }> = []; const mcpdResources: Array<{ uri: string; name: string; description: string; mimeType: string }> = [];
if (this.mcpdClient && this.projectName) { if (this.mcpdClient && this.projectName) {
@@ -543,6 +545,7 @@ export class McpRouter {
request: { jsonrpc: '2.0', id: request.id, method: 'resources/list' }, request: { jsonrpc: '2.0', id: request.id, method: 'resources/list' },
response: mcpdResponse, response: mcpdResponse,
durationMs: 0, durationMs: 0,
...(context?.correlationId ? { correlationId: context.correlationId } : {}),
}); });
} }
return { return {
@@ -620,12 +623,12 @@ export class McpRouter {
case 'prompts/list': { case 'prompts/list': {
if (this.plugin?.onPromptsList && context?.sessionId) { if (this.plugin?.onPromptsList && context?.sessionId) {
const ctx = await this.getOrCreatePluginContext(context.sessionId); const ctx = await this.getOrCreatePluginContext(context.sessionId);
const upstreamPrompts = await this.discoverPrompts(); const upstreamPrompts = await this.discoverPrompts(context?.correlationId);
const filtered = await this.plugin.onPromptsList(upstreamPrompts, ctx); const filtered = await this.plugin.onPromptsList(upstreamPrompts, ctx);
return { jsonrpc: '2.0', id: request.id, result: { prompts: filtered } }; return { jsonrpc: '2.0', id: request.id, result: { prompts: filtered } };
} }
const upstreamPrompts = await this.discoverPrompts(); const upstreamPrompts = await this.discoverPrompts(context?.correlationId);
// Include mcpctl-managed prompts from mcpd alongside upstream prompts // Include mcpctl-managed prompts from mcpd alongside upstream prompts
const managedIndex = await this.fetchPromptIndex(); const managedIndex = await this.fetchPromptIndex();
const managedPrompts = managedIndex.map((p) => ({ const managedPrompts = managedIndex.map((p) => ({
@@ -641,6 +644,7 @@ export class McpRouter {
request: { jsonrpc: '2.0', id: request.id, method: 'prompts/list' }, request: { jsonrpc: '2.0', id: request.id, method: 'prompts/list' },
response: mcpdResponse, response: mcpdResponse,
durationMs: 0, durationMs: 0,
...(context?.correlationId ? { correlationId: context.correlationId } : {}),
}); });
} }
return { return {