feat: add proxy entry point, resource/prompt forwarding, notifications, and health monitoring

Adds main.ts for config-driven proxy startup, extends router with
resources/list, resources/read, prompts/list, prompts/get forwarding,
notification pass-through from upstreams, and HealthMonitor for
connection state tracking with event-driven state changes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Michal
2026-02-21 05:05:41 +00:00
parent 4b67a9cc15
commit 09675f020f
9 changed files with 846 additions and 49 deletions

View File

@@ -0,0 +1,126 @@
import { EventEmitter } from 'node:events';
import type { UpstreamConnection, JsonRpcResponse } from './types.js';
export type HealthState = 'healthy' | 'degraded' | 'disconnected';
export interface HealthStatus {
name: string;
state: HealthState;
lastCheck: number;
consecutiveFailures: number;
}
export interface HealthMonitorOptions {
/** Interval between health checks in ms (default: 30000) */
intervalMs?: number;
/** Number of failures before marking disconnected (default: 3) */
failureThreshold?: number;
}
/**
* Monitors upstream connection health with periodic pings.
* Emits 'change' events when an upstream's health state changes.
*/
export class HealthMonitor extends EventEmitter {
private statuses = new Map<string, HealthStatus>();
private upstreams = new Map<string, UpstreamConnection>();
private timer: ReturnType<typeof setInterval> | null = null;
private readonly intervalMs: number;
private readonly failureThreshold: number;
constructor(opts?: HealthMonitorOptions) {
super();
this.intervalMs = opts?.intervalMs ?? 30000;
this.failureThreshold = opts?.failureThreshold ?? 3;
}
track(upstream: UpstreamConnection): void {
this.upstreams.set(upstream.name, upstream);
this.statuses.set(upstream.name, {
name: upstream.name,
state: upstream.isAlive() ? 'healthy' : 'disconnected',
lastCheck: Date.now(),
consecutiveFailures: 0,
});
}
untrack(name: string): void {
this.upstreams.delete(name);
this.statuses.delete(name);
}
start(): void {
if (this.timer) return;
this.timer = setInterval(() => void this.checkAll(), this.intervalMs);
}
stop(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}
getStatus(name: string): HealthStatus | undefined {
return this.statuses.get(name);
}
getAllStatuses(): HealthStatus[] {
return [...this.statuses.values()];
}
isHealthy(name: string): boolean {
const status = this.statuses.get(name);
return status?.state === 'healthy';
}
async checkAll(): Promise<void> {
const checks = [...this.upstreams.entries()].map(([name, upstream]) =>
this.checkOne(name, upstream),
);
await Promise.allSettled(checks);
}
private async checkOne(name: string, upstream: UpstreamConnection): Promise<void> {
const status = this.statuses.get(name);
if (!status) return;
const oldState = status.state;
if (!upstream.isAlive()) {
status.consecutiveFailures = this.failureThreshold;
status.state = 'disconnected';
status.lastCheck = Date.now();
if (oldState !== 'disconnected') {
this.emit('change', { name, oldState, newState: 'disconnected' });
}
return;
}
try {
const response: JsonRpcResponse = await upstream.send({
jsonrpc: '2.0',
id: `health-${name}-${Date.now()}`,
method: 'ping',
});
// A response (even an error for unknown method) means the server is alive
if (response) {
status.consecutiveFailures = 0;
status.state = 'healthy';
}
} catch {
status.consecutiveFailures++;
if (status.consecutiveFailures >= this.failureThreshold) {
status.state = 'disconnected';
} else {
status.state = 'degraded';
}
}
status.lastCheck = Date.now();
if (oldState !== status.state) {
this.emit('change', { name, oldState, newState: status.state });
}
}
}

View File

@@ -2,6 +2,9 @@
export { McpRouter } from './router.js'; export { McpRouter } from './router.js';
export { StdioProxyServer } from './server.js'; export { StdioProxyServer } from './server.js';
export { StdioUpstream, HttpUpstream } from './upstream/index.js'; export { StdioUpstream, HttpUpstream } from './upstream/index.js';
export { HealthMonitor } from './health.js';
export type { HealthState, HealthStatus, HealthMonitorOptions } from './health.js';
export { main } from './main.js';
export type { export type {
JsonRpcRequest, JsonRpcRequest,
JsonRpcResponse, JsonRpcResponse,

112
src/local-proxy/src/main.ts Normal file
View File

@@ -0,0 +1,112 @@
#!/usr/bin/env node
import { readFileSync } from 'node:fs';
import type { ProxyConfig, UpstreamConfig } from './types.js';
import { McpRouter } from './router.js';
import { StdioProxyServer } from './server.js';
import { StdioUpstream } from './upstream/stdio.js';
import { HttpUpstream } from './upstream/http.js';
function parseArgs(argv: string[]): { configPath: string | undefined; upstreams: string[] } {
let configPath: string | undefined;
const upstreams: string[] = [];
for (let i = 2; i < argv.length; i++) {
const arg = argv[i];
if (arg === '--config' && i + 1 < argv.length) {
configPath = argv[++i];
} else if (arg?.startsWith('--config=')) {
configPath = arg.slice('--config='.length);
} else if (arg === '--upstream' && i + 1 < argv.length) {
upstreams.push(argv[++i]!);
} else if (arg?.startsWith('--upstream=')) {
upstreams.push(arg.slice('--upstream='.length));
}
}
return { configPath, upstreams };
}
function loadConfig(configPath: string): ProxyConfig {
const raw = readFileSync(configPath, 'utf-8');
return JSON.parse(raw) as ProxyConfig;
}
function createUpstream(config: UpstreamConfig) {
if (config.transport === 'stdio') {
return new StdioUpstream(config);
}
return new HttpUpstream(config);
}
export async function main(argv: string[] = process.argv): Promise<{ router: McpRouter; server: StdioProxyServer }> {
const args = parseArgs(argv);
let upstreamConfigs: UpstreamConfig[] = [];
if (args.configPath) {
const config = loadConfig(args.configPath);
upstreamConfigs = config.upstreams;
}
// --upstream flags: "name:command arg1 arg2" for STDIO or "name:http://url" for HTTP
for (const spec of args.upstreams) {
const colonIdx = spec.indexOf(':');
if (colonIdx === -1) continue;
const name = spec.slice(0, colonIdx);
const rest = spec.slice(colonIdx + 1);
if (rest.startsWith('http://') || rest.startsWith('https://')) {
upstreamConfigs.push({
serverId: name,
name,
transport: 'streamable-http',
url: rest,
});
} else {
const parts = rest.split(' ').filter(Boolean);
const command = parts[0];
if (!command) continue;
const config: UpstreamConfig = {
serverId: name,
name,
transport: 'stdio',
command,
args: parts.slice(1),
};
upstreamConfigs.push(config);
}
}
const router = new McpRouter();
for (const config of upstreamConfigs) {
const upstream = createUpstream(config);
if (upstream instanceof StdioUpstream) {
await upstream.start();
}
router.addUpstream(upstream);
}
const server = new StdioProxyServer(router);
const shutdown = async () => {
server.stop();
await router.closeAll();
process.exit(0);
};
process.on('SIGTERM', () => void shutdown());
process.on('SIGINT', () => void shutdown());
server.start();
process.stderr.write(`mcpctl-proxy started with ${upstreamConfigs.length} upstream(s)\n`);
return { router, server };
}
// Run when executed directly
const isMain = process.argv[1]?.endsWith('main.js') || process.argv[1]?.endsWith('main.ts');
if (isMain) {
main().catch((err) => {
process.stderr.write(`Fatal: ${err}\n`);
process.exit(1);
});
}

View File

@@ -1,33 +1,69 @@
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from './types.js'; import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification } from './types.js';
/** /**
* Routes MCP requests to the appropriate upstream server. * Routes MCP requests to the appropriate upstream server.
* *
* The proxy presents a unified MCP interface to clients. Tools from * The proxy presents a unified MCP interface to clients. Tools, resources,
* all upstreams are merged under namespaced prefixes (e.g., "slack/send_message"). * and prompts from all upstreams are merged under namespaced prefixes
* (e.g., "slack/send_message").
* *
* Routing is done by tool name prefix: "servername/toolname" -> upstream "servername". * Routing is done by name prefix: "servername/toolname" -> upstream "servername".
*/ */
export class McpRouter { export class McpRouter {
private upstreams = new Map<string, UpstreamConnection>(); private upstreams = new Map<string, UpstreamConnection>();
private toolToServer = new Map<string, string>(); private toolToServer = new Map<string, string>();
private resourceToServer = new Map<string, string>();
private promptToServer = new Map<string, string>();
private notificationHandler: ((notification: JsonRpcNotification) => void) | null = null;
addUpstream(connection: UpstreamConnection): void { addUpstream(connection: UpstreamConnection): void {
this.upstreams.set(connection.name, connection); this.upstreams.set(connection.name, connection);
if (this.notificationHandler && connection.onNotification) {
const serverName = connection.name;
const handler = this.notificationHandler;
connection.onNotification((notification) => {
handler({
...notification,
params: {
...notification.params,
_source: serverName,
},
});
});
}
} }
removeUpstream(name: string): void { removeUpstream(name: string): void {
this.upstreams.delete(name); this.upstreams.delete(name);
// Remove tool mappings for this server for (const map of [this.toolToServer, this.resourceToServer, this.promptToServer]) {
for (const [tool, server] of this.toolToServer) { for (const [key, server] of map) {
if (server === name) { if (server === name) {
this.toolToServer.delete(tool); map.delete(key);
}
}
}
}
setNotificationHandler(handler: (notification: JsonRpcNotification) => void): void {
this.notificationHandler = handler;
// Wire to all existing upstreams
for (const [serverName, upstream] of this.upstreams) {
if (upstream.onNotification) {
upstream.onNotification((notification) => {
handler({
...notification,
params: {
...notification.params,
_source: serverName,
},
});
});
} }
} }
} }
/** /**
* Initialize tools from all upstreams by calling tools/list on each. * Discover tools from all upstreams by calling tools/list on each.
*/ */
async discoverTools(): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> { async discoverTools(): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> {
const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = []; const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = [];
@@ -36,7 +72,7 @@ export class McpRouter {
try { try {
const response = await upstream.send({ const response = await upstream.send({
jsonrpc: '2.0', jsonrpc: '2.0',
id: `discover-${serverName}`, id: `discover-tools-${serverName}`,
method: 'tools/list', method: 'tools/list',
}); });
@@ -60,25 +96,95 @@ export class McpRouter {
} }
/** /**
* Route a tools/call request to the correct upstream. * Discover resources from all upstreams by calling resources/list on each.
*/ */
async routeToolCall(request: JsonRpcRequest): Promise<JsonRpcResponse> { async discoverResources(): Promise<Array<{ uri: string; name?: string; description?: string; mimeType?: string }>> {
const params = request.params as { name?: string; arguments?: unknown } | undefined; const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = [];
const toolName = params?.name;
if (!toolName) { for (const [serverName, upstream] of this.upstreams) {
try {
const response = await upstream.send({
jsonrpc: '2.0',
id: `discover-resources-${serverName}`,
method: 'resources/list',
});
if (response.result && typeof response.result === 'object' && 'resources' in response.result) {
const resources = (response.result as { resources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> }).resources;
for (const resource of resources) {
const namespacedUri = `${serverName}://${resource.uri}`;
this.resourceToServer.set(namespacedUri, serverName);
allResources.push({
...resource,
uri: namespacedUri,
});
}
}
} catch {
// Server may be unavailable; skip its resources
}
}
return allResources;
}
/**
* Discover prompts from all upstreams by calling prompts/list on each.
*/
async discoverPrompts(): Promise<Array<{ name: string; description?: string; arguments?: unknown[] }>> {
const allPrompts: Array<{ name: string; description?: string; arguments?: unknown[] }> = [];
for (const [serverName, upstream] of this.upstreams) {
try {
const response = await upstream.send({
jsonrpc: '2.0',
id: `discover-prompts-${serverName}`,
method: 'prompts/list',
});
if (response.result && typeof response.result === 'object' && 'prompts' in response.result) {
const prompts = (response.result as { prompts: Array<{ name: string; description?: string; arguments?: unknown[] }> }).prompts;
for (const prompt of prompts) {
const namespacedName = `${serverName}/${prompt.name}`;
this.promptToServer.set(namespacedName, serverName);
allPrompts.push({
...prompt,
name: namespacedName,
});
}
}
} catch {
// Server may be unavailable; skip its prompts
}
}
return allPrompts;
}
/**
* Route a namespaced call to the correct upstream, stripping the namespace prefix.
*/
private async routeNamespacedCall(
request: JsonRpcRequest,
nameField: string,
routingMap: Map<string, string>,
): Promise<JsonRpcResponse> {
const params = request.params as Record<string, unknown> | undefined;
const name = params?.[nameField] as string | undefined;
if (!name) {
return { return {
jsonrpc: '2.0', jsonrpc: '2.0',
id: request.id, id: request.id,
error: { code: -32602, message: 'Missing tool name in params' }, error: { code: -32602, message: `Missing ${nameField} in params` },
}; };
} }
const serverName = this.toolToServer.get(toolName); const serverName = routingMap.get(name);
if (!serverName) { if (!serverName) {
return { return {
jsonrpc: '2.0', jsonrpc: '2.0',
id: request.id, id: request.id,
error: { code: -32601, message: `Unknown tool: ${toolName}` }, error: { code: -32601, message: `Unknown ${nameField}: ${name}` },
}; };
} }
@@ -91,13 +197,16 @@ export class McpRouter {
}; };
} }
// Strip the namespace prefix for the upstream call // Strip the namespace prefix
const originalToolName = toolName.slice(serverName.length + 1); const originalName = nameField === 'uri'
? name.slice(`${serverName}://`.length)
: name.slice(serverName.length + 1);
const upstreamRequest: JsonRpcRequest = { const upstreamRequest: JsonRpcRequest = {
...request, ...request,
params: { params: {
...params, ...params,
name: originalToolName, [nameField]: originalName,
}, },
}; };
@@ -106,7 +215,7 @@ export class McpRouter {
/** /**
* Route a generic request. Handles protocol-level methods locally, * Route a generic request. Handles protocol-level methods locally,
* delegates tool calls to upstreams. * delegates tool/resource/prompt calls to upstreams.
*/ */
async route(request: JsonRpcRequest): Promise<JsonRpcResponse> { async route(request: JsonRpcRequest): Promise<JsonRpcResponse> {
switch (request.method) { switch (request.method) {
@@ -122,6 +231,8 @@ export class McpRouter {
}, },
capabilities: { capabilities: {
tools: {}, tools: {},
resources: {},
prompts: {},
}, },
}, },
}; };
@@ -136,7 +247,35 @@ export class McpRouter {
} }
case 'tools/call': case 'tools/call':
return this.routeToolCall(request); return this.routeNamespacedCall(request, 'name', this.toolToServer);
case 'resources/list': {
const resources = await this.discoverResources();
return {
jsonrpc: '2.0',
id: request.id,
result: { resources },
};
}
case 'resources/read':
return this.routeNamespacedCall(request, 'uri', this.resourceToServer);
case 'resources/subscribe':
case 'resources/unsubscribe':
return this.routeNamespacedCall(request, 'uri', this.resourceToServer);
case 'prompts/list': {
const prompts = await this.discoverPrompts();
return {
jsonrpc: '2.0',
id: request.id,
result: { prompts },
};
}
case 'prompts/get':
return this.routeNamespacedCall(request, 'name', this.promptToServer);
default: default:
return { return {
@@ -157,5 +296,7 @@ export class McpRouter {
} }
this.upstreams.clear(); this.upstreams.clear();
this.toolToServer.clear(); this.toolToServer.clear();
this.resourceToServer.clear();
this.promptToServer.clear();
} }
} }

View File

@@ -1,5 +1,5 @@
import { createInterface } from 'node:readline'; import { createInterface } from 'node:readline';
import type { JsonRpcRequest, JsonRpcResponse, JsonRpcMessage } from './types.js'; import type { JsonRpcRequest, JsonRpcResponse, JsonRpcNotification, JsonRpcMessage } from './types.js';
import type { McpRouter } from './router.js'; import type { McpRouter } from './router.js';
/** /**
@@ -16,6 +16,13 @@ export class StdioProxyServer {
start(): void { start(): void {
this.running = true; this.running = true;
// Forward notifications from upstreams to client
this.router.setNotificationHandler((notification) => {
if (this.running) {
this.sendNotification(notification);
}
});
const rl = createInterface({ input: process.stdin }); const rl = createInterface({ input: process.stdin });
rl.on('line', (line) => { rl.on('line', (line) => {
@@ -54,6 +61,10 @@ export class StdioProxyServer {
process.stdout.write(JSON.stringify(response) + '\n'); process.stdout.write(JSON.stringify(response) + '\n');
} }
private sendNotification(notification: JsonRpcNotification): void {
process.stdout.write(JSON.stringify(notification) + '\n');
}
stop(): void { stop(): void {
this.running = false; this.running = false;
} }

View File

@@ -69,4 +69,6 @@ export interface UpstreamConnection {
close(): Promise<void>; close(): Promise<void>;
/** Whether the connection is alive */ /** Whether the connection is alive */
isAlive(): boolean; isAlive(): boolean;
/** Register a handler for notifications from this upstream */
onNotification?(handler: (notification: JsonRpcNotification) => void): void;
} }

View File

@@ -1,6 +1,6 @@
import { spawn, type ChildProcess } from 'node:child_process'; import { spawn, type ChildProcess } from 'node:child_process';
import { createInterface } from 'node:readline'; import { createInterface } from 'node:readline';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, UpstreamConfig } from '../types.js'; import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification, UpstreamConfig } from '../types.js';
/** /**
* Connects to an MCP server over STDIO (spawn a process, write JSON-RPC to stdin, read from stdout). * Connects to an MCP server over STDIO (spawn a process, write JSON-RPC to stdin, read from stdout).
@@ -13,6 +13,7 @@ export class StdioUpstream implements UpstreamConnection {
reject: (err: Error) => void; reject: (err: Error) => void;
}>(); }>();
private alive = false; private alive = false;
private notificationHandlers: Array<(notification: JsonRpcNotification) => void> = [];
constructor(private config: UpstreamConfig) { constructor(private config: UpstreamConfig) {
this.name = config.name; this.name = config.name;
@@ -43,12 +44,25 @@ export class StdioUpstream implements UpstreamConnection {
const rl = createInterface({ input: this.process.stdout }); const rl = createInterface({ input: this.process.stdout });
rl.on('line', (line) => { rl.on('line', (line) => {
try { try {
const msg = JSON.parse(line) as JsonRpcResponse; const msg = JSON.parse(line) as Record<string, unknown>;
if ('id' in msg && msg.id !== undefined) { if ('id' in msg && msg.id !== undefined) {
const pending = this.pendingRequests.get(msg.id); // Response to a pending request
const pending = this.pendingRequests.get(msg.id as string | number);
if (pending) { if (pending) {
this.pendingRequests.delete(msg.id); this.pendingRequests.delete(msg.id as string | number);
pending.resolve(msg); pending.resolve(msg as unknown as JsonRpcResponse);
}
} else if ('method' in msg) {
// Notification from upstream
const notification: JsonRpcNotification = {
jsonrpc: '2.0',
method: msg.method as string,
};
if (msg.params) {
notification.params = msg.params as Record<string, unknown>;
}
for (const handler of this.notificationHandlers) {
handler(notification);
} }
} }
} catch { } catch {
@@ -97,4 +111,8 @@ export class StdioUpstream implements UpstreamConnection {
isAlive(): boolean { isAlive(): boolean {
return this.alive; return this.alive;
} }
onNotification(handler: (notification: JsonRpcNotification) => void): void {
this.notificationHandlers.push(handler);
}
} }

View File

@@ -0,0 +1,153 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { HealthMonitor } from '../src/health.js';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from '../src/types.js';
function mockUpstream(name: string, alive = true): UpstreamConnection {
return {
name,
isAlive: vi.fn(() => alive),
close: vi.fn(async () => {}),
send: vi.fn(async (req: JsonRpcRequest): Promise<JsonRpcResponse> => ({
jsonrpc: '2.0',
id: req.id,
error: { code: -32601, message: 'Method not found' },
})),
};
}
describe('HealthMonitor', () => {
let monitor: HealthMonitor;
beforeEach(() => {
monitor = new HealthMonitor({ intervalMs: 100, failureThreshold: 2 });
});
afterEach(() => {
monitor.stop();
});
it('tracks upstream and reports initial healthy state', () => {
const upstream = mockUpstream('slack');
monitor.track(upstream);
const status = monitor.getStatus('slack');
expect(status).toBeDefined();
expect(status?.state).toBe('healthy');
expect(status?.consecutiveFailures).toBe(0);
});
it('reports disconnected for dead upstream on track', () => {
const upstream = mockUpstream('slack', false);
monitor.track(upstream);
expect(monitor.getStatus('slack')?.state).toBe('disconnected');
});
it('marks upstream healthy when ping succeeds', async () => {
const upstream = mockUpstream('slack');
monitor.track(upstream);
await monitor.checkAll();
expect(monitor.isHealthy('slack')).toBe(true);
expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(0);
});
it('marks upstream degraded after one failure', async () => {
const upstream = mockUpstream('slack');
vi.mocked(upstream.send).mockRejectedValue(new Error('timeout'));
monitor.track(upstream);
await monitor.checkAll();
expect(monitor.getStatus('slack')?.state).toBe('degraded');
expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(1);
});
it('marks upstream disconnected after threshold failures', async () => {
const upstream = mockUpstream('slack');
vi.mocked(upstream.send).mockRejectedValue(new Error('timeout'));
monitor.track(upstream);
await monitor.checkAll();
await monitor.checkAll();
expect(monitor.getStatus('slack')?.state).toBe('disconnected');
expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(2);
});
it('recovers from degraded to healthy', async () => {
const upstream = mockUpstream('slack');
vi.mocked(upstream.send).mockRejectedValueOnce(new Error('timeout'));
monitor.track(upstream);
await monitor.checkAll();
expect(monitor.getStatus('slack')?.state).toBe('degraded');
// Next check succeeds
await monitor.checkAll();
expect(monitor.getStatus('slack')?.state).toBe('healthy');
expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(0);
});
it('emits change events on state transitions', async () => {
const upstream = mockUpstream('slack');
vi.mocked(upstream.send).mockRejectedValue(new Error('timeout'));
monitor.track(upstream);
const changes: Array<{ name: string; oldState: string; newState: string }> = [];
monitor.on('change', (change) => changes.push(change));
await monitor.checkAll();
expect(changes).toHaveLength(1);
expect(changes[0]).toEqual({ name: 'slack', oldState: 'healthy', newState: 'degraded' });
await monitor.checkAll();
expect(changes).toHaveLength(2);
expect(changes[1]).toEqual({ name: 'slack', oldState: 'degraded', newState: 'disconnected' });
});
it('does not emit when state stays the same', async () => {
const upstream = mockUpstream('slack');
monitor.track(upstream);
const changes: unknown[] = [];
monitor.on('change', (change) => changes.push(change));
await monitor.checkAll();
await monitor.checkAll();
expect(changes).toHaveLength(0);
});
it('reports disconnected when upstream is not alive', async () => {
const upstream = mockUpstream('slack');
monitor.track(upstream);
vi.mocked(upstream.isAlive).mockReturnValue(false);
const changes: Array<{ name: string; oldState: string; newState: string }> = [];
monitor.on('change', (change) => changes.push(change));
await monitor.checkAll();
expect(monitor.getStatus('slack')?.state).toBe('disconnected');
expect(changes).toHaveLength(1);
});
it('returns all statuses', () => {
monitor.track(mockUpstream('slack'));
monitor.track(mockUpstream('github'));
const statuses = monitor.getAllStatuses();
expect(statuses).toHaveLength(2);
expect(statuses.map((s) => s.name)).toEqual(['slack', 'github']);
});
it('untracks upstream', () => {
monitor.track(mockUpstream('slack'));
monitor.untrack('slack');
expect(monitor.getStatus('slack')).toBeUndefined();
expect(monitor.getAllStatuses()).toHaveLength(0);
});
});

View File

@@ -1,18 +1,43 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'; import { describe, it, expect, vi, beforeEach } from 'vitest';
import { McpRouter } from '../src/router.js'; import { McpRouter } from '../src/router.js';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from '../src/types.js'; import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification } from '../src/types.js';
function mockUpstream(name: string, tools: Array<{ name: string; description?: string }> = []): UpstreamConnection { function mockUpstream(
name: string,
opts: {
tools?: Array<{ name: string; description?: string }>;
resources?: Array<{ uri: string; name?: string; description?: string }>;
prompts?: Array<{ name: string; description?: string }>;
} = {},
): UpstreamConnection {
const notificationHandlers: Array<(n: JsonRpcNotification) => void> = [];
return { return {
name, name,
isAlive: vi.fn(() => true), isAlive: vi.fn(() => true),
close: vi.fn(async () => {}), close: vi.fn(async () => {}),
onNotification: vi.fn((handler: (n: JsonRpcNotification) => void) => {
notificationHandlers.push(handler);
}),
send: vi.fn(async (req: JsonRpcRequest): Promise<JsonRpcResponse> => { send: vi.fn(async (req: JsonRpcRequest): Promise<JsonRpcResponse> => {
if (req.method === 'tools/list') { if (req.method === 'tools/list') {
return { return {
jsonrpc: '2.0', jsonrpc: '2.0',
id: req.id, id: req.id,
result: { tools }, result: { tools: opts.tools ?? [] },
};
}
if (req.method === 'resources/list') {
return {
jsonrpc: '2.0',
id: req.id,
result: { resources: opts.resources ?? [] },
};
}
if (req.method === 'prompts/list') {
return {
jsonrpc: '2.0',
id: req.id,
result: { prompts: opts.prompts ?? [] },
}; };
} }
if (req.method === 'tools/call') { if (req.method === 'tools/call') {
@@ -24,9 +49,29 @@ function mockUpstream(name: string, tools: Array<{ name: string; description?: s
}, },
}; };
} }
if (req.method === 'resources/read') {
return {
jsonrpc: '2.0',
id: req.id,
result: {
contents: [{ uri: (req.params as Record<string, unknown>)?.uri, text: 'resource content' }],
},
};
}
if (req.method === 'prompts/get') {
return {
jsonrpc: '2.0',
id: req.id,
result: {
messages: [{ role: 'user', content: { type: 'text', text: 'prompt content' } }],
},
};
}
return { jsonrpc: '2.0', id: req.id, error: { code: -32601, message: 'Not found' } }; return { jsonrpc: '2.0', id: req.id, error: { code: -32601, message: 'Not found' } };
}), }),
}; // expose for tests
_notificationHandlers: notificationHandlers,
} as UpstreamConnection & { _notificationHandlers: Array<(n: JsonRpcNotification) => void> };
} }
describe('McpRouter', () => { describe('McpRouter', () => {
@@ -37,7 +82,7 @@ describe('McpRouter', () => {
}); });
describe('initialize', () => { describe('initialize', () => {
it('responds with server info and capabilities', async () => { it('responds with server info and capabilities including resources and prompts', async () => {
const res = await router.route({ const res = await router.route({
jsonrpc: '2.0', jsonrpc: '2.0',
id: 1, id: 1,
@@ -48,6 +93,10 @@ describe('McpRouter', () => {
const result = res.result as Record<string, unknown>; const result = res.result as Record<string, unknown>;
expect(result['protocolVersion']).toBe('2024-11-05'); expect(result['protocolVersion']).toBe('2024-11-05');
expect((result['serverInfo'] as Record<string, unknown>)['name']).toBe('mcpctl-proxy'); expect((result['serverInfo'] as Record<string, unknown>)['name']).toBe('mcpctl-proxy');
const capabilities = result['capabilities'] as Record<string, unknown>;
expect(capabilities['tools']).toBeDefined();
expect(capabilities['resources']).toBeDefined();
expect(capabilities['prompts']).toBeDefined();
}); });
}); });
@@ -64,13 +113,17 @@ describe('McpRouter', () => {
}); });
it('discovers and namespaces tools from upstreams', async () => { it('discovers and namespaces tools from upstreams', async () => {
router.addUpstream(mockUpstream('slack', [ router.addUpstream(mockUpstream('slack', {
tools: [
{ name: 'send_message', description: 'Send a message' }, { name: 'send_message', description: 'Send a message' },
{ name: 'list_channels', description: 'List channels' }, { name: 'list_channels', description: 'List channels' },
])); ],
router.addUpstream(mockUpstream('github', [ }));
router.addUpstream(mockUpstream('github', {
tools: [
{ name: 'create_issue', description: 'Create an issue' }, { name: 'create_issue', description: 'Create an issue' },
])); ],
}));
const res = await router.route({ const res = await router.route({
jsonrpc: '2.0', jsonrpc: '2.0',
@@ -90,9 +143,9 @@ describe('McpRouter', () => {
vi.mocked(failingUpstream.send).mockRejectedValue(new Error('Connection refused')); vi.mocked(failingUpstream.send).mockRejectedValue(new Error('Connection refused'));
router.addUpstream(failingUpstream); router.addUpstream(failingUpstream);
router.addUpstream(mockUpstream('working', [ router.addUpstream(mockUpstream('working', {
{ name: 'do_thing', description: 'Does a thing' }, tools: [{ name: 'do_thing', description: 'Does a thing' }],
])); }));
const res = await router.route({ const res = await router.route({
jsonrpc: '2.0', jsonrpc: '2.0',
@@ -108,7 +161,7 @@ describe('McpRouter', () => {
describe('tools/call', () => { describe('tools/call', () => {
it('routes call to correct upstream', async () => { it('routes call to correct upstream', async () => {
const slack = mockUpstream('slack', [{ name: 'send_message' }]); const slack = mockUpstream('slack', { tools: [{ name: 'send_message' }] });
router.addUpstream(slack); router.addUpstream(slack);
await router.discoverTools(); await router.discoverTools();
@@ -154,7 +207,7 @@ describe('McpRouter', () => {
}); });
it('returns error when upstream is dead', async () => { it('returns error when upstream is dead', async () => {
const slack = mockUpstream('slack', [{ name: 'send_message' }]); const slack = mockUpstream('slack', { tools: [{ name: 'send_message' }] });
router.addUpstream(slack); router.addUpstream(slack);
await router.discoverTools(); await router.discoverTools();
@@ -172,12 +225,177 @@ describe('McpRouter', () => {
}); });
}); });
describe('resources/list', () => {
it('returns empty resources when no upstreams', async () => {
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'resources/list',
});
const result = res.result as { resources: unknown[] };
expect(result.resources).toEqual([]);
});
it('discovers and namespaces resources from upstreams', async () => {
router.addUpstream(mockUpstream('files', {
resources: [
{ uri: 'file:///docs/readme.md', name: 'README', description: 'Project readme' },
],
}));
router.addUpstream(mockUpstream('db', {
resources: [
{ uri: 'db://users', name: 'Users table' },
],
}));
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'resources/list',
});
const result = res.result as { resources: Array<{ uri: string }> };
expect(result.resources).toHaveLength(2);
expect(result.resources.map((r) => r.uri)).toContain('files://file:///docs/readme.md');
expect(result.resources.map((r) => r.uri)).toContain('db://db://users');
});
});
describe('resources/read', () => {
it('routes read to correct upstream', async () => {
const files = mockUpstream('files', {
resources: [{ uri: 'file:///docs/readme.md', name: 'README' }],
});
router.addUpstream(files);
await router.discoverResources();
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'resources/read',
params: { uri: 'files://file:///docs/readme.md' },
});
expect(res.result).toBeDefined();
expect(vi.mocked(files.send)).toHaveBeenCalledWith(
expect.objectContaining({
method: 'resources/read',
params: expect.objectContaining({ uri: 'file:///docs/readme.md' }),
}),
);
});
it('returns error for unknown resource', async () => {
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'resources/read',
params: { uri: 'unknown://resource' },
});
expect(res.error).toBeDefined();
expect(res.error?.code).toBe(-32601);
});
});
describe('prompts/list', () => {
it('returns empty prompts when no upstreams', async () => {
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'prompts/list',
});
const result = res.result as { prompts: unknown[] };
expect(result.prompts).toEqual([]);
});
it('discovers and namespaces prompts from upstreams', async () => {
router.addUpstream(mockUpstream('code', {
prompts: [
{ name: 'review', description: 'Code review' },
{ name: 'explain', description: 'Explain code' },
],
}));
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'prompts/list',
});
const result = res.result as { prompts: Array<{ name: string }> };
expect(result.prompts).toHaveLength(2);
expect(result.prompts.map((p) => p.name)).toContain('code/review');
expect(result.prompts.map((p) => p.name)).toContain('code/explain');
});
});
describe('prompts/get', () => {
it('routes get to correct upstream', async () => {
const code = mockUpstream('code', {
prompts: [{ name: 'review', description: 'Code review' }],
});
router.addUpstream(code);
await router.discoverPrompts();
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'prompts/get',
params: { name: 'code/review' },
});
expect(res.result).toBeDefined();
expect(vi.mocked(code.send)).toHaveBeenCalledWith(
expect.objectContaining({
method: 'prompts/get',
params: expect.objectContaining({ name: 'review' }),
}),
);
});
it('returns error for unknown prompt', async () => {
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'prompts/get',
params: { name: 'unknown/prompt' },
});
expect(res.error).toBeDefined();
expect(res.error?.code).toBe(-32601);
});
});
describe('notifications', () => {
it('forwards notifications from upstreams with source tag', () => {
const received: JsonRpcNotification[] = [];
router.setNotificationHandler((n) => received.push(n));
const slack = mockUpstream('slack', { tools: [{ name: 'send_message' }] }) as UpstreamConnection & {
_notificationHandlers: Array<(n: JsonRpcNotification) => void>;
};
router.addUpstream(slack);
// Simulate upstream sending a notification
for (const handler of slack._notificationHandlers) {
handler({ jsonrpc: '2.0', method: 'notifications/progress', params: { progress: 50 } });
}
expect(received).toHaveLength(1);
expect(received[0]?.method).toBe('notifications/progress');
expect(received[0]?.params?._source).toBe('slack');
});
});
describe('unknown methods', () => { describe('unknown methods', () => {
it('returns method not found error', async () => { it('returns method not found error', async () => {
const res = await router.route({ const res = await router.route({
jsonrpc: '2.0', jsonrpc: '2.0',
id: 1, id: 1,
method: 'resources/list', method: 'completions/complete',
}); });
expect(res.error).toBeDefined(); expect(res.error).toBeDefined();
@@ -192,13 +410,26 @@ describe('McpRouter', () => {
expect(router.getUpstreamNames()).toEqual(['slack', 'github']); expect(router.getUpstreamNames()).toEqual(['slack', 'github']);
}); });
it('removes upstream', async () => { it('removes upstream and cleans up all mappings', async () => {
const slack = mockUpstream('slack', [{ name: 'send_message' }]); const slack = mockUpstream('slack', {
tools: [{ name: 'send_message' }],
resources: [{ uri: 'slack://channels' }],
prompts: [{ name: 'compose' }],
});
router.addUpstream(slack); router.addUpstream(slack);
await router.discoverTools(); await router.discoverTools();
await router.discoverResources();
await router.discoverPrompts();
router.removeUpstream('slack'); router.removeUpstream('slack');
expect(router.getUpstreamNames()).toEqual([]); expect(router.getUpstreamNames()).toEqual([]);
// Verify tool/resource/prompt mappings are cleaned
const toolRes = await router.route({
jsonrpc: '2.0', id: 1, method: 'tools/call',
params: { name: 'slack/send_message' },
});
expect(toolRes.error?.code).toBe(-32601);
}); });
it('closes all upstreams', async () => { it('closes all upstreams', async () => {