From 09675f020f0569dc2f5e27cdfa05973e2ed81e92 Mon Sep 17 00:00:00 2001 From: Michal Date: Sat, 21 Feb 2026 05:05:41 +0000 Subject: [PATCH] feat: add proxy entry point, resource/prompt forwarding, notifications, and health monitoring Adds main.ts for config-driven proxy startup, extends router with resources/list, resources/read, prompts/list, prompts/get forwarding, notification pass-through from upstreams, and HealthMonitor for connection state tracking with event-driven state changes. Co-Authored-By: Claude Opus 4.6 --- src/local-proxy/src/health.ts | 126 ++++++++++++ src/local-proxy/src/index.ts | 3 + src/local-proxy/src/main.ts | 112 +++++++++++ src/local-proxy/src/router.ts | 187 +++++++++++++++--- src/local-proxy/src/server.ts | 13 +- src/local-proxy/src/types.ts | 2 + src/local-proxy/src/upstream/stdio.ts | 28 ++- src/local-proxy/tests/health.test.ts | 153 +++++++++++++++ src/local-proxy/tests/router.test.ts | 271 ++++++++++++++++++++++++-- 9 files changed, 846 insertions(+), 49 deletions(-) create mode 100644 src/local-proxy/src/health.ts create mode 100644 src/local-proxy/src/main.ts create mode 100644 src/local-proxy/tests/health.test.ts diff --git a/src/local-proxy/src/health.ts b/src/local-proxy/src/health.ts new file mode 100644 index 0000000..4bfdcce --- /dev/null +++ b/src/local-proxy/src/health.ts @@ -0,0 +1,126 @@ +import { EventEmitter } from 'node:events'; +import type { UpstreamConnection, JsonRpcResponse } from './types.js'; + +export type HealthState = 'healthy' | 'degraded' | 'disconnected'; + +export interface HealthStatus { + name: string; + state: HealthState; + lastCheck: number; + consecutiveFailures: number; +} + +export interface HealthMonitorOptions { + /** Interval between health checks in ms (default: 30000) */ + intervalMs?: number; + /** Number of failures before marking disconnected (default: 3) */ + failureThreshold?: number; +} + +/** + * Monitors upstream connection health with periodic pings. + * Emits 'change' events when an upstream's health state changes. + */ +export class HealthMonitor extends EventEmitter { + private statuses = new Map(); + private upstreams = new Map(); + private timer: ReturnType | null = null; + private readonly intervalMs: number; + private readonly failureThreshold: number; + + constructor(opts?: HealthMonitorOptions) { + super(); + this.intervalMs = opts?.intervalMs ?? 30000; + this.failureThreshold = opts?.failureThreshold ?? 3; + } + + track(upstream: UpstreamConnection): void { + this.upstreams.set(upstream.name, upstream); + this.statuses.set(upstream.name, { + name: upstream.name, + state: upstream.isAlive() ? 'healthy' : 'disconnected', + lastCheck: Date.now(), + consecutiveFailures: 0, + }); + } + + untrack(name: string): void { + this.upstreams.delete(name); + this.statuses.delete(name); + } + + start(): void { + if (this.timer) return; + this.timer = setInterval(() => void this.checkAll(), this.intervalMs); + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = null; + } + } + + getStatus(name: string): HealthStatus | undefined { + return this.statuses.get(name); + } + + getAllStatuses(): HealthStatus[] { + return [...this.statuses.values()]; + } + + isHealthy(name: string): boolean { + const status = this.statuses.get(name); + return status?.state === 'healthy'; + } + + async checkAll(): Promise { + const checks = [...this.upstreams.entries()].map(([name, upstream]) => + this.checkOne(name, upstream), + ); + await Promise.allSettled(checks); + } + + private async checkOne(name: string, upstream: UpstreamConnection): Promise { + const status = this.statuses.get(name); + if (!status) return; + + const oldState = status.state; + + if (!upstream.isAlive()) { + status.consecutiveFailures = this.failureThreshold; + status.state = 'disconnected'; + status.lastCheck = Date.now(); + if (oldState !== 'disconnected') { + this.emit('change', { name, oldState, newState: 'disconnected' }); + } + return; + } + + try { + const response: JsonRpcResponse = await upstream.send({ + jsonrpc: '2.0', + id: `health-${name}-${Date.now()}`, + method: 'ping', + }); + + // A response (even an error for unknown method) means the server is alive + if (response) { + status.consecutiveFailures = 0; + status.state = 'healthy'; + } + } catch { + status.consecutiveFailures++; + if (status.consecutiveFailures >= this.failureThreshold) { + status.state = 'disconnected'; + } else { + status.state = 'degraded'; + } + } + + status.lastCheck = Date.now(); + if (oldState !== status.state) { + this.emit('change', { name, oldState, newState: status.state }); + } + } +} diff --git a/src/local-proxy/src/index.ts b/src/local-proxy/src/index.ts index 5a35dca..800ef88 100644 --- a/src/local-proxy/src/index.ts +++ b/src/local-proxy/src/index.ts @@ -2,6 +2,9 @@ export { McpRouter } from './router.js'; export { StdioProxyServer } from './server.js'; export { StdioUpstream, HttpUpstream } from './upstream/index.js'; +export { HealthMonitor } from './health.js'; +export type { HealthState, HealthStatus, HealthMonitorOptions } from './health.js'; +export { main } from './main.js'; export type { JsonRpcRequest, JsonRpcResponse, diff --git a/src/local-proxy/src/main.ts b/src/local-proxy/src/main.ts new file mode 100644 index 0000000..16d26bb --- /dev/null +++ b/src/local-proxy/src/main.ts @@ -0,0 +1,112 @@ +#!/usr/bin/env node +import { readFileSync } from 'node:fs'; +import type { ProxyConfig, UpstreamConfig } from './types.js'; +import { McpRouter } from './router.js'; +import { StdioProxyServer } from './server.js'; +import { StdioUpstream } from './upstream/stdio.js'; +import { HttpUpstream } from './upstream/http.js'; + +function parseArgs(argv: string[]): { configPath: string | undefined; upstreams: string[] } { + let configPath: string | undefined; + const upstreams: string[] = []; + for (let i = 2; i < argv.length; i++) { + const arg = argv[i]; + if (arg === '--config' && i + 1 < argv.length) { + configPath = argv[++i]; + } else if (arg?.startsWith('--config=')) { + configPath = arg.slice('--config='.length); + } else if (arg === '--upstream' && i + 1 < argv.length) { + upstreams.push(argv[++i]!); + } else if (arg?.startsWith('--upstream=')) { + upstreams.push(arg.slice('--upstream='.length)); + } + } + return { configPath, upstreams }; +} + +function loadConfig(configPath: string): ProxyConfig { + const raw = readFileSync(configPath, 'utf-8'); + return JSON.parse(raw) as ProxyConfig; +} + +function createUpstream(config: UpstreamConfig) { + if (config.transport === 'stdio') { + return new StdioUpstream(config); + } + return new HttpUpstream(config); +} + +export async function main(argv: string[] = process.argv): Promise<{ router: McpRouter; server: StdioProxyServer }> { + const args = parseArgs(argv); + + let upstreamConfigs: UpstreamConfig[] = []; + + if (args.configPath) { + const config = loadConfig(args.configPath); + upstreamConfigs = config.upstreams; + } + + // --upstream flags: "name:command arg1 arg2" for STDIO or "name:http://url" for HTTP + for (const spec of args.upstreams) { + const colonIdx = spec.indexOf(':'); + if (colonIdx === -1) continue; + const name = spec.slice(0, colonIdx); + const rest = spec.slice(colonIdx + 1); + + if (rest.startsWith('http://') || rest.startsWith('https://')) { + upstreamConfigs.push({ + serverId: name, + name, + transport: 'streamable-http', + url: rest, + }); + } else { + const parts = rest.split(' ').filter(Boolean); + const command = parts[0]; + if (!command) continue; + const config: UpstreamConfig = { + serverId: name, + name, + transport: 'stdio', + command, + args: parts.slice(1), + }; + upstreamConfigs.push(config); + } + } + + const router = new McpRouter(); + + for (const config of upstreamConfigs) { + const upstream = createUpstream(config); + if (upstream instanceof StdioUpstream) { + await upstream.start(); + } + router.addUpstream(upstream); + } + + const server = new StdioProxyServer(router); + + const shutdown = async () => { + server.stop(); + await router.closeAll(); + process.exit(0); + }; + + process.on('SIGTERM', () => void shutdown()); + process.on('SIGINT', () => void shutdown()); + + server.start(); + process.stderr.write(`mcpctl-proxy started with ${upstreamConfigs.length} upstream(s)\n`); + + return { router, server }; +} + +// Run when executed directly +const isMain = process.argv[1]?.endsWith('main.js') || process.argv[1]?.endsWith('main.ts'); +if (isMain) { + main().catch((err) => { + process.stderr.write(`Fatal: ${err}\n`); + process.exit(1); + }); +} diff --git a/src/local-proxy/src/router.ts b/src/local-proxy/src/router.ts index aa2e44b..72664ab 100644 --- a/src/local-proxy/src/router.ts +++ b/src/local-proxy/src/router.ts @@ -1,33 +1,69 @@ -import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from './types.js'; +import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification } from './types.js'; /** * Routes MCP requests to the appropriate upstream server. * - * The proxy presents a unified MCP interface to clients. Tools from - * all upstreams are merged under namespaced prefixes (e.g., "slack/send_message"). + * The proxy presents a unified MCP interface to clients. Tools, resources, + * and prompts from all upstreams are merged under namespaced prefixes + * (e.g., "slack/send_message"). * - * Routing is done by tool name prefix: "servername/toolname" -> upstream "servername". + * Routing is done by name prefix: "servername/toolname" -> upstream "servername". */ export class McpRouter { private upstreams = new Map(); private toolToServer = new Map(); + private resourceToServer = new Map(); + private promptToServer = new Map(); + private notificationHandler: ((notification: JsonRpcNotification) => void) | null = null; addUpstream(connection: UpstreamConnection): void { this.upstreams.set(connection.name, connection); + if (this.notificationHandler && connection.onNotification) { + const serverName = connection.name; + const handler = this.notificationHandler; + connection.onNotification((notification) => { + handler({ + ...notification, + params: { + ...notification.params, + _source: serverName, + }, + }); + }); + } } removeUpstream(name: string): void { this.upstreams.delete(name); - // Remove tool mappings for this server - for (const [tool, server] of this.toolToServer) { - if (server === name) { - this.toolToServer.delete(tool); + for (const map of [this.toolToServer, this.resourceToServer, this.promptToServer]) { + for (const [key, server] of map) { + if (server === name) { + map.delete(key); + } + } + } + } + + setNotificationHandler(handler: (notification: JsonRpcNotification) => void): void { + this.notificationHandler = handler; + // Wire to all existing upstreams + for (const [serverName, upstream] of this.upstreams) { + if (upstream.onNotification) { + upstream.onNotification((notification) => { + handler({ + ...notification, + params: { + ...notification.params, + _source: serverName, + }, + }); + }); } } } /** - * Initialize tools from all upstreams by calling tools/list on each. + * Discover tools from all upstreams by calling tools/list on each. */ async discoverTools(): Promise> { const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = []; @@ -36,7 +72,7 @@ export class McpRouter { try { const response = await upstream.send({ jsonrpc: '2.0', - id: `discover-${serverName}`, + id: `discover-tools-${serverName}`, method: 'tools/list', }); @@ -60,25 +96,95 @@ export class McpRouter { } /** - * Route a tools/call request to the correct upstream. + * Discover resources from all upstreams by calling resources/list on each. */ - async routeToolCall(request: JsonRpcRequest): Promise { - const params = request.params as { name?: string; arguments?: unknown } | undefined; - const toolName = params?.name; - if (!toolName) { + async discoverResources(): Promise> { + const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = []; + + for (const [serverName, upstream] of this.upstreams) { + try { + const response = await upstream.send({ + jsonrpc: '2.0', + id: `discover-resources-${serverName}`, + method: 'resources/list', + }); + + if (response.result && typeof response.result === 'object' && 'resources' in response.result) { + const resources = (response.result as { resources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> }).resources; + for (const resource of resources) { + const namespacedUri = `${serverName}://${resource.uri}`; + this.resourceToServer.set(namespacedUri, serverName); + allResources.push({ + ...resource, + uri: namespacedUri, + }); + } + } + } catch { + // Server may be unavailable; skip its resources + } + } + + return allResources; + } + + /** + * Discover prompts from all upstreams by calling prompts/list on each. + */ + async discoverPrompts(): Promise> { + const allPrompts: Array<{ name: string; description?: string; arguments?: unknown[] }> = []; + + for (const [serverName, upstream] of this.upstreams) { + try { + const response = await upstream.send({ + jsonrpc: '2.0', + id: `discover-prompts-${serverName}`, + method: 'prompts/list', + }); + + if (response.result && typeof response.result === 'object' && 'prompts' in response.result) { + const prompts = (response.result as { prompts: Array<{ name: string; description?: string; arguments?: unknown[] }> }).prompts; + for (const prompt of prompts) { + const namespacedName = `${serverName}/${prompt.name}`; + this.promptToServer.set(namespacedName, serverName); + allPrompts.push({ + ...prompt, + name: namespacedName, + }); + } + } + } catch { + // Server may be unavailable; skip its prompts + } + } + + return allPrompts; + } + + /** + * Route a namespaced call to the correct upstream, stripping the namespace prefix. + */ + private async routeNamespacedCall( + request: JsonRpcRequest, + nameField: string, + routingMap: Map, + ): Promise { + const params = request.params as Record | undefined; + const name = params?.[nameField] as string | undefined; + if (!name) { return { jsonrpc: '2.0', id: request.id, - error: { code: -32602, message: 'Missing tool name in params' }, + error: { code: -32602, message: `Missing ${nameField} in params` }, }; } - const serverName = this.toolToServer.get(toolName); + const serverName = routingMap.get(name); if (!serverName) { return { jsonrpc: '2.0', id: request.id, - error: { code: -32601, message: `Unknown tool: ${toolName}` }, + error: { code: -32601, message: `Unknown ${nameField}: ${name}` }, }; } @@ -91,13 +197,16 @@ export class McpRouter { }; } - // Strip the namespace prefix for the upstream call - const originalToolName = toolName.slice(serverName.length + 1); + // Strip the namespace prefix + const originalName = nameField === 'uri' + ? name.slice(`${serverName}://`.length) + : name.slice(serverName.length + 1); + const upstreamRequest: JsonRpcRequest = { ...request, params: { ...params, - name: originalToolName, + [nameField]: originalName, }, }; @@ -106,7 +215,7 @@ export class McpRouter { /** * Route a generic request. Handles protocol-level methods locally, - * delegates tool calls to upstreams. + * delegates tool/resource/prompt calls to upstreams. */ async route(request: JsonRpcRequest): Promise { switch (request.method) { @@ -122,6 +231,8 @@ export class McpRouter { }, capabilities: { tools: {}, + resources: {}, + prompts: {}, }, }, }; @@ -136,7 +247,35 @@ export class McpRouter { } case 'tools/call': - return this.routeToolCall(request); + return this.routeNamespacedCall(request, 'name', this.toolToServer); + + case 'resources/list': { + const resources = await this.discoverResources(); + return { + jsonrpc: '2.0', + id: request.id, + result: { resources }, + }; + } + + case 'resources/read': + return this.routeNamespacedCall(request, 'uri', this.resourceToServer); + + case 'resources/subscribe': + case 'resources/unsubscribe': + return this.routeNamespacedCall(request, 'uri', this.resourceToServer); + + case 'prompts/list': { + const prompts = await this.discoverPrompts(); + return { + jsonrpc: '2.0', + id: request.id, + result: { prompts }, + }; + } + + case 'prompts/get': + return this.routeNamespacedCall(request, 'name', this.promptToServer); default: return { @@ -157,5 +296,7 @@ export class McpRouter { } this.upstreams.clear(); this.toolToServer.clear(); + this.resourceToServer.clear(); + this.promptToServer.clear(); } } diff --git a/src/local-proxy/src/server.ts b/src/local-proxy/src/server.ts index 47cc0ae..7bdc551 100644 --- a/src/local-proxy/src/server.ts +++ b/src/local-proxy/src/server.ts @@ -1,5 +1,5 @@ import { createInterface } from 'node:readline'; -import type { JsonRpcRequest, JsonRpcResponse, JsonRpcMessage } from './types.js'; +import type { JsonRpcRequest, JsonRpcResponse, JsonRpcNotification, JsonRpcMessage } from './types.js'; import type { McpRouter } from './router.js'; /** @@ -16,6 +16,13 @@ export class StdioProxyServer { start(): void { this.running = true; + // Forward notifications from upstreams to client + this.router.setNotificationHandler((notification) => { + if (this.running) { + this.sendNotification(notification); + } + }); + const rl = createInterface({ input: process.stdin }); rl.on('line', (line) => { @@ -54,6 +61,10 @@ export class StdioProxyServer { process.stdout.write(JSON.stringify(response) + '\n'); } + private sendNotification(notification: JsonRpcNotification): void { + process.stdout.write(JSON.stringify(notification) + '\n'); + } + stop(): void { this.running = false; } diff --git a/src/local-proxy/src/types.ts b/src/local-proxy/src/types.ts index cae3870..785a210 100644 --- a/src/local-proxy/src/types.ts +++ b/src/local-proxy/src/types.ts @@ -69,4 +69,6 @@ export interface UpstreamConnection { close(): Promise; /** Whether the connection is alive */ isAlive(): boolean; + /** Register a handler for notifications from this upstream */ + onNotification?(handler: (notification: JsonRpcNotification) => void): void; } diff --git a/src/local-proxy/src/upstream/stdio.ts b/src/local-proxy/src/upstream/stdio.ts index 489de37..64953cf 100644 --- a/src/local-proxy/src/upstream/stdio.ts +++ b/src/local-proxy/src/upstream/stdio.ts @@ -1,6 +1,6 @@ import { spawn, type ChildProcess } from 'node:child_process'; import { createInterface } from 'node:readline'; -import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, UpstreamConfig } from '../types.js'; +import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification, UpstreamConfig } from '../types.js'; /** * Connects to an MCP server over STDIO (spawn a process, write JSON-RPC to stdin, read from stdout). @@ -13,6 +13,7 @@ export class StdioUpstream implements UpstreamConnection { reject: (err: Error) => void; }>(); private alive = false; + private notificationHandlers: Array<(notification: JsonRpcNotification) => void> = []; constructor(private config: UpstreamConfig) { this.name = config.name; @@ -43,12 +44,25 @@ export class StdioUpstream implements UpstreamConnection { const rl = createInterface({ input: this.process.stdout }); rl.on('line', (line) => { try { - const msg = JSON.parse(line) as JsonRpcResponse; + const msg = JSON.parse(line) as Record; if ('id' in msg && msg.id !== undefined) { - const pending = this.pendingRequests.get(msg.id); + // Response to a pending request + const pending = this.pendingRequests.get(msg.id as string | number); if (pending) { - this.pendingRequests.delete(msg.id); - pending.resolve(msg); + this.pendingRequests.delete(msg.id as string | number); + pending.resolve(msg as unknown as JsonRpcResponse); + } + } else if ('method' in msg) { + // Notification from upstream + const notification: JsonRpcNotification = { + jsonrpc: '2.0', + method: msg.method as string, + }; + if (msg.params) { + notification.params = msg.params as Record; + } + for (const handler of this.notificationHandlers) { + handler(notification); } } } catch { @@ -97,4 +111,8 @@ export class StdioUpstream implements UpstreamConnection { isAlive(): boolean { return this.alive; } + + onNotification(handler: (notification: JsonRpcNotification) => void): void { + this.notificationHandlers.push(handler); + } } diff --git a/src/local-proxy/tests/health.test.ts b/src/local-proxy/tests/health.test.ts new file mode 100644 index 0000000..6e2ca35 --- /dev/null +++ b/src/local-proxy/tests/health.test.ts @@ -0,0 +1,153 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { HealthMonitor } from '../src/health.js'; +import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from '../src/types.js'; + +function mockUpstream(name: string, alive = true): UpstreamConnection { + return { + name, + isAlive: vi.fn(() => alive), + close: vi.fn(async () => {}), + send: vi.fn(async (req: JsonRpcRequest): Promise => ({ + jsonrpc: '2.0', + id: req.id, + error: { code: -32601, message: 'Method not found' }, + })), + }; +} + +describe('HealthMonitor', () => { + let monitor: HealthMonitor; + + beforeEach(() => { + monitor = new HealthMonitor({ intervalMs: 100, failureThreshold: 2 }); + }); + + afterEach(() => { + monitor.stop(); + }); + + it('tracks upstream and reports initial healthy state', () => { + const upstream = mockUpstream('slack'); + monitor.track(upstream); + + const status = monitor.getStatus('slack'); + expect(status).toBeDefined(); + expect(status?.state).toBe('healthy'); + expect(status?.consecutiveFailures).toBe(0); + }); + + it('reports disconnected for dead upstream on track', () => { + const upstream = mockUpstream('slack', false); + monitor.track(upstream); + + expect(monitor.getStatus('slack')?.state).toBe('disconnected'); + }); + + it('marks upstream healthy when ping succeeds', async () => { + const upstream = mockUpstream('slack'); + monitor.track(upstream); + + await monitor.checkAll(); + + expect(monitor.isHealthy('slack')).toBe(true); + expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(0); + }); + + it('marks upstream degraded after one failure', async () => { + const upstream = mockUpstream('slack'); + vi.mocked(upstream.send).mockRejectedValue(new Error('timeout')); + monitor.track(upstream); + + await monitor.checkAll(); + + expect(monitor.getStatus('slack')?.state).toBe('degraded'); + expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(1); + }); + + it('marks upstream disconnected after threshold failures', async () => { + const upstream = mockUpstream('slack'); + vi.mocked(upstream.send).mockRejectedValue(new Error('timeout')); + monitor.track(upstream); + + await monitor.checkAll(); + await monitor.checkAll(); + + expect(monitor.getStatus('slack')?.state).toBe('disconnected'); + expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(2); + }); + + it('recovers from degraded to healthy', async () => { + const upstream = mockUpstream('slack'); + vi.mocked(upstream.send).mockRejectedValueOnce(new Error('timeout')); + monitor.track(upstream); + + await monitor.checkAll(); + expect(monitor.getStatus('slack')?.state).toBe('degraded'); + + // Next check succeeds + await monitor.checkAll(); + expect(monitor.getStatus('slack')?.state).toBe('healthy'); + expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(0); + }); + + it('emits change events on state transitions', async () => { + const upstream = mockUpstream('slack'); + vi.mocked(upstream.send).mockRejectedValue(new Error('timeout')); + monitor.track(upstream); + + const changes: Array<{ name: string; oldState: string; newState: string }> = []; + monitor.on('change', (change) => changes.push(change)); + + await monitor.checkAll(); + expect(changes).toHaveLength(1); + expect(changes[0]).toEqual({ name: 'slack', oldState: 'healthy', newState: 'degraded' }); + + await monitor.checkAll(); + expect(changes).toHaveLength(2); + expect(changes[1]).toEqual({ name: 'slack', oldState: 'degraded', newState: 'disconnected' }); + }); + + it('does not emit when state stays the same', async () => { + const upstream = mockUpstream('slack'); + monitor.track(upstream); + + const changes: unknown[] = []; + monitor.on('change', (change) => changes.push(change)); + + await monitor.checkAll(); + await monitor.checkAll(); + + expect(changes).toHaveLength(0); + }); + + it('reports disconnected when upstream is not alive', async () => { + const upstream = mockUpstream('slack'); + monitor.track(upstream); + + vi.mocked(upstream.isAlive).mockReturnValue(false); + + const changes: Array<{ name: string; oldState: string; newState: string }> = []; + monitor.on('change', (change) => changes.push(change)); + + await monitor.checkAll(); + expect(monitor.getStatus('slack')?.state).toBe('disconnected'); + expect(changes).toHaveLength(1); + }); + + it('returns all statuses', () => { + monitor.track(mockUpstream('slack')); + monitor.track(mockUpstream('github')); + + const statuses = monitor.getAllStatuses(); + expect(statuses).toHaveLength(2); + expect(statuses.map((s) => s.name)).toEqual(['slack', 'github']); + }); + + it('untracks upstream', () => { + monitor.track(mockUpstream('slack')); + monitor.untrack('slack'); + + expect(monitor.getStatus('slack')).toBeUndefined(); + expect(monitor.getAllStatuses()).toHaveLength(0); + }); +}); diff --git a/src/local-proxy/tests/router.test.ts b/src/local-proxy/tests/router.test.ts index af8264f..a56ceed 100644 --- a/src/local-proxy/tests/router.test.ts +++ b/src/local-proxy/tests/router.test.ts @@ -1,18 +1,43 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; import { McpRouter } from '../src/router.js'; -import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from '../src/types.js'; +import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification } from '../src/types.js'; -function mockUpstream(name: string, tools: Array<{ name: string; description?: string }> = []): UpstreamConnection { +function mockUpstream( + name: string, + opts: { + tools?: Array<{ name: string; description?: string }>; + resources?: Array<{ uri: string; name?: string; description?: string }>; + prompts?: Array<{ name: string; description?: string }>; + } = {}, +): UpstreamConnection { + const notificationHandlers: Array<(n: JsonRpcNotification) => void> = []; return { name, isAlive: vi.fn(() => true), close: vi.fn(async () => {}), + onNotification: vi.fn((handler: (n: JsonRpcNotification) => void) => { + notificationHandlers.push(handler); + }), send: vi.fn(async (req: JsonRpcRequest): Promise => { if (req.method === 'tools/list') { return { jsonrpc: '2.0', id: req.id, - result: { tools }, + result: { tools: opts.tools ?? [] }, + }; + } + if (req.method === 'resources/list') { + return { + jsonrpc: '2.0', + id: req.id, + result: { resources: opts.resources ?? [] }, + }; + } + if (req.method === 'prompts/list') { + return { + jsonrpc: '2.0', + id: req.id, + result: { prompts: opts.prompts ?? [] }, }; } if (req.method === 'tools/call') { @@ -24,9 +49,29 @@ function mockUpstream(name: string, tools: Array<{ name: string; description?: s }, }; } + if (req.method === 'resources/read') { + return { + jsonrpc: '2.0', + id: req.id, + result: { + contents: [{ uri: (req.params as Record)?.uri, text: 'resource content' }], + }, + }; + } + if (req.method === 'prompts/get') { + return { + jsonrpc: '2.0', + id: req.id, + result: { + messages: [{ role: 'user', content: { type: 'text', text: 'prompt content' } }], + }, + }; + } return { jsonrpc: '2.0', id: req.id, error: { code: -32601, message: 'Not found' } }; }), - }; + // expose for tests + _notificationHandlers: notificationHandlers, + } as UpstreamConnection & { _notificationHandlers: Array<(n: JsonRpcNotification) => void> }; } describe('McpRouter', () => { @@ -37,7 +82,7 @@ describe('McpRouter', () => { }); describe('initialize', () => { - it('responds with server info and capabilities', async () => { + it('responds with server info and capabilities including resources and prompts', async () => { const res = await router.route({ jsonrpc: '2.0', id: 1, @@ -48,6 +93,10 @@ describe('McpRouter', () => { const result = res.result as Record; expect(result['protocolVersion']).toBe('2024-11-05'); expect((result['serverInfo'] as Record)['name']).toBe('mcpctl-proxy'); + const capabilities = result['capabilities'] as Record; + expect(capabilities['tools']).toBeDefined(); + expect(capabilities['resources']).toBeDefined(); + expect(capabilities['prompts']).toBeDefined(); }); }); @@ -64,13 +113,17 @@ describe('McpRouter', () => { }); it('discovers and namespaces tools from upstreams', async () => { - router.addUpstream(mockUpstream('slack', [ - { name: 'send_message', description: 'Send a message' }, - { name: 'list_channels', description: 'List channels' }, - ])); - router.addUpstream(mockUpstream('github', [ - { name: 'create_issue', description: 'Create an issue' }, - ])); + router.addUpstream(mockUpstream('slack', { + tools: [ + { name: 'send_message', description: 'Send a message' }, + { name: 'list_channels', description: 'List channels' }, + ], + })); + router.addUpstream(mockUpstream('github', { + tools: [ + { name: 'create_issue', description: 'Create an issue' }, + ], + })); const res = await router.route({ jsonrpc: '2.0', @@ -90,9 +143,9 @@ describe('McpRouter', () => { vi.mocked(failingUpstream.send).mockRejectedValue(new Error('Connection refused')); router.addUpstream(failingUpstream); - router.addUpstream(mockUpstream('working', [ - { name: 'do_thing', description: 'Does a thing' }, - ])); + router.addUpstream(mockUpstream('working', { + tools: [{ name: 'do_thing', description: 'Does a thing' }], + })); const res = await router.route({ jsonrpc: '2.0', @@ -108,7 +161,7 @@ describe('McpRouter', () => { describe('tools/call', () => { it('routes call to correct upstream', async () => { - const slack = mockUpstream('slack', [{ name: 'send_message' }]); + const slack = mockUpstream('slack', { tools: [{ name: 'send_message' }] }); router.addUpstream(slack); await router.discoverTools(); @@ -154,7 +207,7 @@ describe('McpRouter', () => { }); it('returns error when upstream is dead', async () => { - const slack = mockUpstream('slack', [{ name: 'send_message' }]); + const slack = mockUpstream('slack', { tools: [{ name: 'send_message' }] }); router.addUpstream(slack); await router.discoverTools(); @@ -172,12 +225,177 @@ describe('McpRouter', () => { }); }); + describe('resources/list', () => { + it('returns empty resources when no upstreams', async () => { + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'resources/list', + }); + + const result = res.result as { resources: unknown[] }; + expect(result.resources).toEqual([]); + }); + + it('discovers and namespaces resources from upstreams', async () => { + router.addUpstream(mockUpstream('files', { + resources: [ + { uri: 'file:///docs/readme.md', name: 'README', description: 'Project readme' }, + ], + })); + router.addUpstream(mockUpstream('db', { + resources: [ + { uri: 'db://users', name: 'Users table' }, + ], + })); + + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'resources/list', + }); + + const result = res.result as { resources: Array<{ uri: string }> }; + expect(result.resources).toHaveLength(2); + expect(result.resources.map((r) => r.uri)).toContain('files://file:///docs/readme.md'); + expect(result.resources.map((r) => r.uri)).toContain('db://db://users'); + }); + }); + + describe('resources/read', () => { + it('routes read to correct upstream', async () => { + const files = mockUpstream('files', { + resources: [{ uri: 'file:///docs/readme.md', name: 'README' }], + }); + router.addUpstream(files); + await router.discoverResources(); + + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'resources/read', + params: { uri: 'files://file:///docs/readme.md' }, + }); + + expect(res.result).toBeDefined(); + expect(vi.mocked(files.send)).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'resources/read', + params: expect.objectContaining({ uri: 'file:///docs/readme.md' }), + }), + ); + }); + + it('returns error for unknown resource', async () => { + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'resources/read', + params: { uri: 'unknown://resource' }, + }); + + expect(res.error).toBeDefined(); + expect(res.error?.code).toBe(-32601); + }); + }); + + describe('prompts/list', () => { + it('returns empty prompts when no upstreams', async () => { + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'prompts/list', + }); + + const result = res.result as { prompts: unknown[] }; + expect(result.prompts).toEqual([]); + }); + + it('discovers and namespaces prompts from upstreams', async () => { + router.addUpstream(mockUpstream('code', { + prompts: [ + { name: 'review', description: 'Code review' }, + { name: 'explain', description: 'Explain code' }, + ], + })); + + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'prompts/list', + }); + + const result = res.result as { prompts: Array<{ name: string }> }; + expect(result.prompts).toHaveLength(2); + expect(result.prompts.map((p) => p.name)).toContain('code/review'); + expect(result.prompts.map((p) => p.name)).toContain('code/explain'); + }); + }); + + describe('prompts/get', () => { + it('routes get to correct upstream', async () => { + const code = mockUpstream('code', { + prompts: [{ name: 'review', description: 'Code review' }], + }); + router.addUpstream(code); + await router.discoverPrompts(); + + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'prompts/get', + params: { name: 'code/review' }, + }); + + expect(res.result).toBeDefined(); + expect(vi.mocked(code.send)).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'prompts/get', + params: expect.objectContaining({ name: 'review' }), + }), + ); + }); + + it('returns error for unknown prompt', async () => { + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'prompts/get', + params: { name: 'unknown/prompt' }, + }); + + expect(res.error).toBeDefined(); + expect(res.error?.code).toBe(-32601); + }); + }); + + describe('notifications', () => { + it('forwards notifications from upstreams with source tag', () => { + const received: JsonRpcNotification[] = []; + router.setNotificationHandler((n) => received.push(n)); + + const slack = mockUpstream('slack', { tools: [{ name: 'send_message' }] }) as UpstreamConnection & { + _notificationHandlers: Array<(n: JsonRpcNotification) => void>; + }; + router.addUpstream(slack); + + // Simulate upstream sending a notification + for (const handler of slack._notificationHandlers) { + handler({ jsonrpc: '2.0', method: 'notifications/progress', params: { progress: 50 } }); + } + + expect(received).toHaveLength(1); + expect(received[0]?.method).toBe('notifications/progress'); + expect(received[0]?.params?._source).toBe('slack'); + }); + }); + describe('unknown methods', () => { it('returns method not found error', async () => { const res = await router.route({ jsonrpc: '2.0', id: 1, - method: 'resources/list', + method: 'completions/complete', }); expect(res.error).toBeDefined(); @@ -192,13 +410,26 @@ describe('McpRouter', () => { expect(router.getUpstreamNames()).toEqual(['slack', 'github']); }); - it('removes upstream', async () => { - const slack = mockUpstream('slack', [{ name: 'send_message' }]); + it('removes upstream and cleans up all mappings', async () => { + const slack = mockUpstream('slack', { + tools: [{ name: 'send_message' }], + resources: [{ uri: 'slack://channels' }], + prompts: [{ name: 'compose' }], + }); router.addUpstream(slack); await router.discoverTools(); + await router.discoverResources(); + await router.discoverPrompts(); router.removeUpstream('slack'); expect(router.getUpstreamNames()).toEqual([]); + + // Verify tool/resource/prompt mappings are cleaned + const toolRes = await router.route({ + jsonrpc: '2.0', id: 1, method: 'tools/call', + params: { name: 'slack/send_message' }, + }); + expect(toolRes.error?.code).toBe(-32601); }); it('closes all upstreams', async () => {