Intercepts oversized tool responses (>80K chars), caches them, and returns a page index. LLM can fetch specific pages via _resultId/_page params. Supports LLM-generated smart summaries with simple fallback. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
542 lines
18 KiB
TypeScript
542 lines
18 KiB
TypeScript
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification } from './types.js';
|
|
import type { LlmProcessor } from './llm/processor.js';
|
|
import { ResponsePaginator } from './llm/pagination.js';
|
|
import type { McpdClient } from './http/mcpd-client.js';
|
|
|
|
export interface RouteContext {
|
|
sessionId?: string;
|
|
}
|
|
|
|
/**
|
|
* Routes MCP requests to the appropriate upstream server.
|
|
*
|
|
* The proxy presents a unified MCP interface to clients. Tools, resources,
|
|
* and prompts from all upstreams are merged under namespaced prefixes
|
|
* (e.g., "slack/send_message").
|
|
*
|
|
* Routing is done by name prefix: "servername/toolname" -> upstream "servername".
|
|
*/
|
|
export class McpRouter {
|
|
private upstreams = new Map<string, UpstreamConnection>();
|
|
private toolToServer = new Map<string, string>();
|
|
private resourceToServer = new Map<string, string>();
|
|
private promptToServer = new Map<string, string>();
|
|
private notificationHandler: ((notification: JsonRpcNotification) => void) | null = null;
|
|
private llmProcessor: LlmProcessor | null = null;
|
|
private instructions: string | null = null;
|
|
private mcpdClient: McpdClient | null = null;
|
|
private projectName: string | null = null;
|
|
private mcpctlResourceContents = new Map<string, string>();
|
|
private paginator: ResponsePaginator | null = null;
|
|
|
|
setPaginator(paginator: ResponsePaginator): void {
|
|
this.paginator = paginator;
|
|
}
|
|
|
|
setLlmProcessor(processor: LlmProcessor): void {
|
|
this.llmProcessor = processor;
|
|
}
|
|
|
|
setInstructions(instructions: string): void {
|
|
this.instructions = instructions;
|
|
}
|
|
|
|
setPromptConfig(mcpdClient: McpdClient, projectName: string): void {
|
|
this.mcpdClient = mcpdClient;
|
|
this.projectName = projectName;
|
|
}
|
|
|
|
addUpstream(connection: UpstreamConnection): void {
|
|
this.upstreams.set(connection.name, connection);
|
|
if (this.notificationHandler && connection.onNotification) {
|
|
const serverName = connection.name;
|
|
const handler = this.notificationHandler;
|
|
connection.onNotification((notification) => {
|
|
handler({
|
|
...notification,
|
|
params: {
|
|
...notification.params,
|
|
_source: serverName,
|
|
},
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
removeUpstream(name: string): void {
|
|
this.upstreams.delete(name);
|
|
for (const map of [this.toolToServer, this.resourceToServer, this.promptToServer]) {
|
|
for (const [key, server] of map) {
|
|
if (server === name) {
|
|
map.delete(key);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
setNotificationHandler(handler: (notification: JsonRpcNotification) => void): void {
|
|
this.notificationHandler = handler;
|
|
// Wire to all existing upstreams
|
|
for (const [serverName, upstream] of this.upstreams) {
|
|
if (upstream.onNotification) {
|
|
upstream.onNotification((notification) => {
|
|
handler({
|
|
...notification,
|
|
params: {
|
|
...notification.params,
|
|
_source: serverName,
|
|
},
|
|
});
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Discover tools from all upstreams by calling tools/list on each.
|
|
*/
|
|
async discoverTools(): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> {
|
|
const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = [];
|
|
|
|
for (const [serverName, upstream] of this.upstreams) {
|
|
try {
|
|
const response = await upstream.send({
|
|
jsonrpc: '2.0',
|
|
id: `discover-tools-${serverName}`,
|
|
method: 'tools/list',
|
|
});
|
|
|
|
if (response.result && typeof response.result === 'object' && 'tools' in response.result) {
|
|
const tools = (response.result as { tools: Array<{ name: string; description?: string; inputSchema?: unknown }> }).tools;
|
|
for (const tool of tools) {
|
|
const namespacedName = `${serverName}/${tool.name}`;
|
|
this.toolToServer.set(namespacedName, serverName);
|
|
// Enrich description with server context if available
|
|
const entry: { name: string; description?: string; inputSchema?: unknown } = {
|
|
...tool,
|
|
name: namespacedName,
|
|
};
|
|
if (upstream.description && tool.description) {
|
|
entry.description = `[${upstream.description}] ${tool.description}`;
|
|
} else if (upstream.description) {
|
|
entry.description = `[${upstream.description}]`;
|
|
}
|
|
// If neither upstream.description nor tool.description, keep tool.description (may be undefined — that's fine, just don't set it)
|
|
allTools.push(entry);
|
|
}
|
|
}
|
|
} catch {
|
|
// Server may be unavailable; skip its tools
|
|
}
|
|
}
|
|
|
|
return allTools;
|
|
}
|
|
|
|
/**
|
|
* Discover resources from all upstreams by calling resources/list on each.
|
|
*/
|
|
async discoverResources(): Promise<Array<{ uri: string; name?: string; description?: string; mimeType?: string }>> {
|
|
const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = [];
|
|
|
|
for (const [serverName, upstream] of this.upstreams) {
|
|
try {
|
|
const response = await upstream.send({
|
|
jsonrpc: '2.0',
|
|
id: `discover-resources-${serverName}`,
|
|
method: 'resources/list',
|
|
});
|
|
|
|
if (response.result && typeof response.result === 'object' && 'resources' in response.result) {
|
|
const resources = (response.result as { resources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> }).resources;
|
|
for (const resource of resources) {
|
|
const namespacedUri = `${serverName}://${resource.uri}`;
|
|
this.resourceToServer.set(namespacedUri, serverName);
|
|
allResources.push({
|
|
...resource,
|
|
uri: namespacedUri,
|
|
});
|
|
}
|
|
}
|
|
} catch {
|
|
// Server may be unavailable; skip its resources
|
|
}
|
|
}
|
|
|
|
return allResources;
|
|
}
|
|
|
|
/**
|
|
* Discover prompts from all upstreams by calling prompts/list on each.
|
|
*/
|
|
async discoverPrompts(): Promise<Array<{ name: string; description?: string; arguments?: unknown[] }>> {
|
|
const allPrompts: Array<{ name: string; description?: string; arguments?: unknown[] }> = [];
|
|
|
|
for (const [serverName, upstream] of this.upstreams) {
|
|
try {
|
|
const response = await upstream.send({
|
|
jsonrpc: '2.0',
|
|
id: `discover-prompts-${serverName}`,
|
|
method: 'prompts/list',
|
|
});
|
|
|
|
if (response.result && typeof response.result === 'object' && 'prompts' in response.result) {
|
|
const prompts = (response.result as { prompts: Array<{ name: string; description?: string; arguments?: unknown[] }> }).prompts;
|
|
for (const prompt of prompts) {
|
|
const namespacedName = `${serverName}/${prompt.name}`;
|
|
this.promptToServer.set(namespacedName, serverName);
|
|
allPrompts.push({
|
|
...prompt,
|
|
name: namespacedName,
|
|
});
|
|
}
|
|
}
|
|
} catch {
|
|
// Server may be unavailable; skip its prompts
|
|
}
|
|
}
|
|
|
|
return allPrompts;
|
|
}
|
|
|
|
/**
|
|
* Route a namespaced call to the correct upstream, stripping the namespace prefix.
|
|
*/
|
|
private async routeNamespacedCall(
|
|
request: JsonRpcRequest,
|
|
nameField: string,
|
|
routingMap: Map<string, string>,
|
|
): Promise<JsonRpcResponse> {
|
|
const params = request.params as Record<string, unknown> | undefined;
|
|
const name = params?.[nameField] as string | undefined;
|
|
if (!name) {
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: { code: -32602, message: `Missing ${nameField} in params` },
|
|
};
|
|
}
|
|
|
|
const serverName = routingMap.get(name);
|
|
if (!serverName) {
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: { code: -32601, message: `Unknown ${nameField}: ${name}` },
|
|
};
|
|
}
|
|
|
|
const upstream = this.upstreams.get(serverName);
|
|
if (!upstream || !upstream.isAlive()) {
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: { code: -32603, message: `Upstream '${serverName}' is not available` },
|
|
};
|
|
}
|
|
|
|
// Strip the namespace prefix
|
|
const originalName = nameField === 'uri'
|
|
? name.slice(`${serverName}://`.length)
|
|
: name.slice(serverName.length + 1);
|
|
|
|
const upstreamRequest: JsonRpcRequest = {
|
|
...request,
|
|
params: {
|
|
...params,
|
|
[nameField]: originalName,
|
|
},
|
|
};
|
|
|
|
return upstream.send(upstreamRequest);
|
|
}
|
|
|
|
/**
|
|
* Route a generic request. Handles protocol-level methods locally,
|
|
* delegates tool/resource/prompt calls to upstreams.
|
|
*/
|
|
async route(request: JsonRpcRequest, context?: RouteContext): Promise<JsonRpcResponse> {
|
|
switch (request.method) {
|
|
case 'initialize':
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
result: {
|
|
protocolVersion: '2024-11-05',
|
|
serverInfo: {
|
|
name: 'mcpctl-proxy',
|
|
version: '0.1.0',
|
|
},
|
|
capabilities: {
|
|
tools: {},
|
|
resources: {},
|
|
prompts: {},
|
|
},
|
|
...(this.instructions ? { instructions: this.instructions } : {}),
|
|
},
|
|
};
|
|
|
|
case 'tools/list': {
|
|
const tools = await this.discoverTools();
|
|
// Append propose_prompt tool if prompt config is set
|
|
if (this.mcpdClient && this.projectName) {
|
|
tools.push({
|
|
name: 'propose_prompt',
|
|
description: 'Propose a new prompt for this project. Creates a pending request that must be approved by a user before becoming active.',
|
|
inputSchema: {
|
|
type: 'object',
|
|
properties: {
|
|
name: { type: 'string', description: 'Prompt name (lowercase alphanumeric with hyphens, e.g. "debug-guide")' },
|
|
content: { type: 'string', description: 'Prompt content text' },
|
|
},
|
|
required: ['name', 'content'],
|
|
},
|
|
});
|
|
}
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
result: { tools },
|
|
};
|
|
}
|
|
|
|
case 'tools/call':
|
|
return this.routeToolCall(request, context);
|
|
|
|
case 'resources/list': {
|
|
const resources = await this.discoverResources();
|
|
// Append mcpctl prompt resources
|
|
if (this.mcpdClient && this.projectName) {
|
|
try {
|
|
const sessionParam = context?.sessionId ? `?session=${encodeURIComponent(context.sessionId)}` : '';
|
|
const visible = await this.mcpdClient.get<Array<{ name: string; content: string; type: string }>>(
|
|
`/api/v1/projects/${encodeURIComponent(this.projectName)}/prompts/visible${sessionParam}`,
|
|
);
|
|
this.mcpctlResourceContents.clear();
|
|
for (const p of visible) {
|
|
const uri = `mcpctl://prompts/${p.name}`;
|
|
resources.push({
|
|
uri,
|
|
name: p.name,
|
|
description: p.type === 'promptrequest' ? `[Pending proposal] ${p.name}` : `[Approved prompt] ${p.name}`,
|
|
mimeType: 'text/plain',
|
|
});
|
|
this.mcpctlResourceContents.set(uri, p.content);
|
|
}
|
|
} catch {
|
|
// Prompt resources are optional — don't fail discovery
|
|
}
|
|
}
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
result: { resources },
|
|
};
|
|
}
|
|
|
|
case 'resources/read': {
|
|
const params = request.params as Record<string, unknown> | undefined;
|
|
const uri = params?.['uri'] as string | undefined;
|
|
if (uri?.startsWith('mcpctl://')) {
|
|
const content = this.mcpctlResourceContents.get(uri);
|
|
if (content !== undefined) {
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
result: {
|
|
contents: [{ uri, mimeType: 'text/plain', text: content }],
|
|
},
|
|
};
|
|
}
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: { code: -32602, message: `Resource not found: ${uri}` },
|
|
};
|
|
}
|
|
return this.routeNamespacedCall(request, 'uri', this.resourceToServer);
|
|
}
|
|
|
|
case 'resources/subscribe':
|
|
case 'resources/unsubscribe':
|
|
return this.routeNamespacedCall(request, 'uri', this.resourceToServer);
|
|
|
|
case 'prompts/list': {
|
|
const prompts = await this.discoverPrompts();
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
result: { prompts },
|
|
};
|
|
}
|
|
|
|
case 'prompts/get':
|
|
return this.routeNamespacedCall(request, 'name', this.promptToServer);
|
|
|
|
// Handle MCP notifications (no response expected, but return empty result if called as request)
|
|
case 'notifications/initialized':
|
|
case 'notifications/cancelled':
|
|
case 'notifications/progress':
|
|
case 'notifications/roots/list_changed':
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
result: {},
|
|
};
|
|
|
|
default:
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: { code: -32601, message: `Method not found: ${request.method}` },
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Route a tools/call request, optionally applying LLM pre/post-processing.
|
|
*/
|
|
private async routeToolCall(request: JsonRpcRequest, context?: RouteContext): Promise<JsonRpcResponse> {
|
|
const params = request.params as Record<string, unknown> | undefined;
|
|
const toolName = params?.['name'] as string | undefined;
|
|
|
|
// Handle built-in propose_prompt tool
|
|
if (toolName === 'propose_prompt') {
|
|
return this.handleProposePrompt(request, context);
|
|
}
|
|
|
|
// Intercept pagination page requests before routing to upstream
|
|
const toolArgs = (params?.['arguments'] ?? {}) as Record<string, unknown>;
|
|
if (this.paginator) {
|
|
const paginationReq = ResponsePaginator.extractPaginationParams(toolArgs);
|
|
if (paginationReq) {
|
|
const pageResult = this.paginator.getPage(paginationReq.resultId, paginationReq.page);
|
|
if (pageResult) {
|
|
return { jsonrpc: '2.0', id: request.id, result: pageResult };
|
|
}
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
result: {
|
|
content: [{
|
|
type: 'text',
|
|
text: 'Cached result not found (expired or invalid _resultId). Please re-call the tool without _resultId/_page to get a fresh result.',
|
|
}],
|
|
},
|
|
};
|
|
}
|
|
}
|
|
|
|
// If no processor or tool shouldn't be processed, route directly
|
|
if (!this.llmProcessor || !toolName || !this.llmProcessor.shouldProcess('tools/call', toolName)) {
|
|
const response = await this.routeNamespacedCall(request, 'name', this.toolToServer);
|
|
return this.maybePaginate(toolName, response);
|
|
}
|
|
|
|
// Preprocess request params
|
|
const processed = await this.llmProcessor.preprocessRequest(toolName, toolArgs);
|
|
const processedRequest: JsonRpcRequest = processed.optimized
|
|
? { ...request, params: { ...params, arguments: processed.params } }
|
|
: request;
|
|
|
|
// Route to upstream
|
|
const response = await this.routeNamespacedCall(processedRequest, 'name', this.toolToServer);
|
|
|
|
// Paginate if response is large (skip LLM filtering for paginated responses)
|
|
const paginated = await this.maybePaginate(toolName, response);
|
|
if (paginated !== response) return paginated;
|
|
|
|
// Filter response
|
|
if (response.error) return response;
|
|
const filtered = await this.llmProcessor.filterResponse(toolName, response);
|
|
if (filtered.filtered) {
|
|
return { ...response, result: filtered.result };
|
|
}
|
|
return response;
|
|
}
|
|
|
|
/**
|
|
* If the response is large enough, paginate it and return the index instead.
|
|
*/
|
|
private async maybePaginate(toolName: string | undefined, response: JsonRpcResponse): Promise<JsonRpcResponse> {
|
|
if (!this.paginator || !toolName || response.error) return response;
|
|
|
|
const raw = JSON.stringify(response.result);
|
|
if (!this.paginator.shouldPaginate(raw)) return response;
|
|
|
|
const paginated = await this.paginator.paginate(toolName, raw);
|
|
if (!paginated) return response;
|
|
|
|
return { jsonrpc: '2.0', id: response.id, result: paginated };
|
|
}
|
|
|
|
private async handleProposePrompt(request: JsonRpcRequest, context?: RouteContext): Promise<JsonRpcResponse> {
|
|
if (!this.mcpdClient || !this.projectName) {
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: { code: -32603, message: 'Prompt config not set — propose_prompt unavailable' },
|
|
};
|
|
}
|
|
|
|
const params = request.params as Record<string, unknown> | undefined;
|
|
const args = (params?.['arguments'] ?? {}) as Record<string, unknown>;
|
|
const name = args['name'] as string | undefined;
|
|
const content = args['content'] as string | undefined;
|
|
|
|
if (!name || !content) {
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: { code: -32602, message: 'Missing required arguments: name and content' },
|
|
};
|
|
}
|
|
|
|
try {
|
|
const body: Record<string, unknown> = { name, content };
|
|
if (context?.sessionId) {
|
|
body['createdBySession'] = context.sessionId;
|
|
}
|
|
await this.mcpdClient.post(
|
|
`/api/v1/projects/${encodeURIComponent(this.projectName)}/promptrequests`,
|
|
body,
|
|
);
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
result: {
|
|
content: [
|
|
{
|
|
type: 'text',
|
|
text: `Prompt request "${name}" created successfully. It will be visible to you as a resource at mcpctl://prompts/${name}. A user must approve it before it becomes permanent.`,
|
|
},
|
|
],
|
|
},
|
|
};
|
|
} catch (err) {
|
|
return {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: {
|
|
code: -32603,
|
|
message: `Failed to propose prompt: ${err instanceof Error ? err.message : String(err)}`,
|
|
},
|
|
};
|
|
}
|
|
}
|
|
|
|
getUpstreamNames(): string[] {
|
|
return [...this.upstreams.keys()];
|
|
}
|
|
|
|
async closeAll(): Promise<void> {
|
|
for (const upstream of this.upstreams.values()) {
|
|
await upstream.close();
|
|
}
|
|
this.upstreams.clear();
|
|
this.toolToServer.clear();
|
|
this.resourceToServer.clear();
|
|
this.promptToServer.clear();
|
|
}
|
|
}
|