diff --git a/.taskmaster/tasks/tasks.json b/.taskmaster/tasks/tasks.json index 1c40700..6e07dfe 100644 --- a/.taskmaster/tasks/tasks.json +++ b/.taskmaster/tasks/tasks.json @@ -307,7 +307,7 @@ "dependencies": [ "4" ], - "status": "pending", + "status": "done", "subtasks": [ { "id": 1, @@ -367,7 +367,8 @@ "testStrategy": "Run full integration test suite. Verify coverage >85% for project-related files.", "parentId": "undefined" } - ] + ], + "updatedAt": "2026-02-21T04:30:43.622Z" }, { "id": "6", @@ -380,7 +381,7 @@ "3", "4" ], - "status": "pending", + "status": "done", "subtasks": [ { "id": 1, @@ -465,7 +466,8 @@ "testStrategy": "Unit tests for getLogs. Integration test: run container, tail logs, verify output.", "parentId": "undefined" } - ] + ], + "updatedAt": "2026-02-21T04:52:51.544Z" }, { "id": "7", @@ -514,8 +516,9 @@ "dependencies": [ "7" ], - "status": "pending", - "subtasks": [] + "status": "done", + "subtasks": [], + "updatedAt": "2026-02-21T04:55:53.675Z" }, { "id": "9", @@ -555,8 +558,9 @@ "dependencies": [ "1" ], - "status": "pending", - "subtasks": [] + "status": "in-progress", + "subtasks": [], + "updatedAt": "2026-02-21T04:56:01.658Z" }, { "id": "12", @@ -732,9 +736,9 @@ ], "metadata": { "version": "1.0.0", - "lastModified": "2026-02-21T04:26:06.239Z", + "lastModified": "2026-02-21T04:56:01.659Z", "taskCount": 24, - "completedCount": 5, + "completedCount": 8, "tags": [ "master" ] diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 226b8c3..2c8924c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -91,6 +91,10 @@ importers: '@modelcontextprotocol/sdk': specifier: ^1.0.0 version: 1.26.0(zod@3.25.76) + devDependencies: + '@types/node': + specifier: ^25.3.0 + version: 25.3.0 src/mcpd: dependencies: diff --git a/src/local-proxy/package.json b/src/local-proxy/package.json index 1060b69..7ae315e 100644 --- a/src/local-proxy/package.json +++ b/src/local-proxy/package.json @@ -14,7 +14,10 @@ "test:run": "vitest run" }, "dependencies": { - "@modelcontextprotocol/sdk": "^1.0.0", - "@mcpctl/shared": "workspace:*" + "@mcpctl/shared": "workspace:*", + "@modelcontextprotocol/sdk": "^1.0.0" + }, + "devDependencies": { + "@types/node": "^25.3.0" } } diff --git a/src/local-proxy/src/index.ts b/src/local-proxy/src/index.ts index 4d38b7f..5a35dca 100644 --- a/src/local-proxy/src/index.ts +++ b/src/local-proxy/src/index.ts @@ -1,2 +1,14 @@ -// Local LLM proxy entry point -// Will be implemented in Task 11 +// Local LLM proxy - aggregates multiple MCP servers behind a single STDIO endpoint +export { McpRouter } from './router.js'; +export { StdioProxyServer } from './server.js'; +export { StdioUpstream, HttpUpstream } from './upstream/index.js'; +export type { + JsonRpcRequest, + JsonRpcResponse, + JsonRpcNotification, + JsonRpcError, + JsonRpcMessage, + UpstreamConfig, + UpstreamConnection, + ProxyConfig, +} from './types.js'; diff --git a/src/local-proxy/src/router.ts b/src/local-proxy/src/router.ts new file mode 100644 index 0000000..aa2e44b --- /dev/null +++ b/src/local-proxy/src/router.ts @@ -0,0 +1,161 @@ +import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from './types.js'; + +/** + * Routes MCP requests to the appropriate upstream server. + * + * The proxy presents a unified MCP interface to clients. Tools from + * all upstreams are merged under namespaced prefixes (e.g., "slack/send_message"). + * + * Routing is done by tool name prefix: "servername/toolname" -> upstream "servername". + */ +export class McpRouter { + private upstreams = new Map(); + private toolToServer = new Map(); + + addUpstream(connection: UpstreamConnection): void { + this.upstreams.set(connection.name, connection); + } + + removeUpstream(name: string): void { + this.upstreams.delete(name); + // Remove tool mappings for this server + for (const [tool, server] of this.toolToServer) { + if (server === name) { + this.toolToServer.delete(tool); + } + } + } + + /** + * Initialize tools from all upstreams by calling tools/list on each. + */ + async discoverTools(): Promise> { + const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = []; + + for (const [serverName, upstream] of this.upstreams) { + try { + const response = await upstream.send({ + jsonrpc: '2.0', + id: `discover-${serverName}`, + method: 'tools/list', + }); + + if (response.result && typeof response.result === 'object' && 'tools' in response.result) { + const tools = (response.result as { tools: Array<{ name: string; description?: string; inputSchema?: unknown }> }).tools; + for (const tool of tools) { + const namespacedName = `${serverName}/${tool.name}`; + this.toolToServer.set(namespacedName, serverName); + allTools.push({ + ...tool, + name: namespacedName, + }); + } + } + } catch { + // Server may be unavailable; skip its tools + } + } + + return allTools; + } + + /** + * Route a tools/call request to the correct upstream. + */ + async routeToolCall(request: JsonRpcRequest): Promise { + const params = request.params as { name?: string; arguments?: unknown } | undefined; + const toolName = params?.name; + if (!toolName) { + return { + jsonrpc: '2.0', + id: request.id, + error: { code: -32602, message: 'Missing tool name in params' }, + }; + } + + const serverName = this.toolToServer.get(toolName); + if (!serverName) { + return { + jsonrpc: '2.0', + id: request.id, + error: { code: -32601, message: `Unknown tool: ${toolName}` }, + }; + } + + const upstream = this.upstreams.get(serverName); + if (!upstream || !upstream.isAlive()) { + return { + jsonrpc: '2.0', + id: request.id, + error: { code: -32603, message: `Upstream '${serverName}' is not available` }, + }; + } + + // Strip the namespace prefix for the upstream call + const originalToolName = toolName.slice(serverName.length + 1); + const upstreamRequest: JsonRpcRequest = { + ...request, + params: { + ...params, + name: originalToolName, + }, + }; + + return upstream.send(upstreamRequest); + } + + /** + * Route a generic request. Handles protocol-level methods locally, + * delegates tool calls to upstreams. + */ + async route(request: JsonRpcRequest): Promise { + switch (request.method) { + case 'initialize': + return { + jsonrpc: '2.0', + id: request.id, + result: { + protocolVersion: '2024-11-05', + serverInfo: { + name: 'mcpctl-proxy', + version: '0.1.0', + }, + capabilities: { + tools: {}, + }, + }, + }; + + case 'tools/list': { + const tools = await this.discoverTools(); + return { + jsonrpc: '2.0', + id: request.id, + result: { tools }, + }; + } + + case 'tools/call': + return this.routeToolCall(request); + + default: + return { + jsonrpc: '2.0', + id: request.id, + error: { code: -32601, message: `Method not found: ${request.method}` }, + }; + } + } + + getUpstreamNames(): string[] { + return [...this.upstreams.keys()]; + } + + async closeAll(): Promise { + for (const upstream of this.upstreams.values()) { + await upstream.close(); + } + this.upstreams.clear(); + this.toolToServer.clear(); + } +} diff --git a/src/local-proxy/src/server.ts b/src/local-proxy/src/server.ts new file mode 100644 index 0000000..47cc0ae --- /dev/null +++ b/src/local-proxy/src/server.ts @@ -0,0 +1,60 @@ +import { createInterface } from 'node:readline'; +import type { JsonRpcRequest, JsonRpcResponse, JsonRpcMessage } from './types.js'; +import type { McpRouter } from './router.js'; + +/** + * STDIO-based MCP proxy server. + * + * Reads JSON-RPC messages from stdin and writes responses to stdout. + * This is the transport that Claude Code uses to communicate with MCP servers. + */ +export class StdioProxyServer { + private running = false; + + constructor(private router: McpRouter) {} + + start(): void { + this.running = true; + + const rl = createInterface({ input: process.stdin }); + + rl.on('line', (line) => { + if (!this.running) return; + + this.handleLine(line).catch((err) => { + process.stderr.write(`Proxy error: ${err}\n`); + }); + }); + + rl.on('close', () => { + this.running = false; + }); + } + + private async handleLine(line: string): Promise { + let msg: JsonRpcMessage; + try { + msg = JSON.parse(line) as JsonRpcMessage; + } catch { + return; // Skip invalid JSON + } + + // Only handle requests (messages with an id) + if (!('id' in msg) || msg.id === undefined) { + // Notification - no response needed + return; + } + + const request = msg as JsonRpcRequest; + const response = await this.router.route(request); + this.sendResponse(response); + } + + private sendResponse(response: JsonRpcResponse): void { + process.stdout.write(JSON.stringify(response) + '\n'); + } + + stop(): void { + this.running = false; + } +} diff --git a/src/local-proxy/src/types.ts b/src/local-proxy/src/types.ts new file mode 100644 index 0000000..cae3870 --- /dev/null +++ b/src/local-proxy/src/types.ts @@ -0,0 +1,72 @@ +/** + * MCP JSON-RPC message types used by the proxy. + */ + +export interface JsonRpcRequest { + jsonrpc: '2.0'; + id: string | number; + method: string; + params?: Record; +} + +export interface JsonRpcResponse { + jsonrpc: '2.0'; + id: string | number; + result?: unknown; + error?: JsonRpcError; +} + +export interface JsonRpcNotification { + jsonrpc: '2.0'; + method: string; + params?: Record; +} + +export interface JsonRpcError { + code: number; + message: string; + data?: unknown; +} + +export type JsonRpcMessage = JsonRpcRequest | JsonRpcResponse | JsonRpcNotification; + +/** Configuration for an upstream MCP server */ +export interface UpstreamConfig { + /** Server ID from mcpd */ + serverId: string; + /** Human-readable name */ + name: string; + /** Transport type */ + transport: 'stdio' | 'sse' | 'streamable-http'; + /** For STDIO: command + args */ + command?: string; + args?: string[]; + /** For HTTP transports: URL */ + url?: string; + /** Environment variables to pass */ + env?: Record; +} + +/** Proxy server configuration */ +export interface ProxyConfig { + /** Port for the proxy to listen on */ + port: number; + /** Host to bind to */ + host: string; + /** mcpd daemon URL for fetching server configs */ + daemonUrl: string; + /** Upstream servers to proxy */ + upstreams: UpstreamConfig[]; +} + +/** A running upstream connection */ +export interface UpstreamConnection { + /** Server name */ + name: string; + /** Send a JSON-RPC request and get a response */ + send(request: JsonRpcRequest): Promise; + /** Disconnect from the upstream */ + close(): Promise; + /** Whether the connection is alive */ + isAlive(): boolean; +} diff --git a/src/local-proxy/src/upstream/http.ts b/src/local-proxy/src/upstream/http.ts new file mode 100644 index 0000000..a732a07 --- /dev/null +++ b/src/local-proxy/src/upstream/http.ts @@ -0,0 +1,71 @@ +import http from 'node:http'; +import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, UpstreamConfig } from '../types.js'; + +/** + * Connects to an MCP server over HTTP (SSE or Streamable HTTP transport). + * Sends JSON-RPC requests via HTTP POST. + */ +export class HttpUpstream implements UpstreamConnection { + readonly name: string; + private url: string; + private alive = true; + + constructor(config: UpstreamConfig) { + this.name = config.name; + if (!config.url) { + throw new Error(`HTTP upstream '${config.name}' has no URL configured`); + } + this.url = config.url; + } + + async send(request: JsonRpcRequest): Promise { + if (!this.alive) { + throw new Error(`Upstream '${this.name}' is closed`); + } + + const body = JSON.stringify(request); + const parsed = new URL(this.url); + + return new Promise((resolve, reject) => { + const opts: http.RequestOptions = { + hostname: parsed.hostname, + port: parsed.port, + path: parsed.pathname, + method: 'POST', + timeout: 30000, + headers: { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + }, + }; + + const req = http.request(opts, (res) => { + const chunks: Buffer[] = []; + res.on('data', (chunk: Buffer) => chunks.push(chunk)); + res.on('end', () => { + try { + const raw = Buffer.concat(chunks).toString('utf-8'); + resolve(JSON.parse(raw) as JsonRpcResponse); + } catch (err) { + reject(new Error(`Invalid response from '${this.name}': ${err}`)); + } + }); + }); + req.on('error', reject); + req.on('timeout', () => { + req.destroy(); + reject(new Error(`Request to '${this.name}' timed out`)); + }); + req.write(body); + req.end(); + }); + } + + async close(): Promise { + this.alive = false; + } + + isAlive(): boolean { + return this.alive; + } +} diff --git a/src/local-proxy/src/upstream/index.ts b/src/local-proxy/src/upstream/index.ts new file mode 100644 index 0000000..7715ad3 --- /dev/null +++ b/src/local-proxy/src/upstream/index.ts @@ -0,0 +1,2 @@ +export { StdioUpstream } from './stdio.js'; +export { HttpUpstream } from './http.js'; diff --git a/src/local-proxy/src/upstream/stdio.ts b/src/local-proxy/src/upstream/stdio.ts new file mode 100644 index 0000000..489de37 --- /dev/null +++ b/src/local-proxy/src/upstream/stdio.ts @@ -0,0 +1,100 @@ +import { spawn, type ChildProcess } from 'node:child_process'; +import { createInterface } from 'node:readline'; +import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, UpstreamConfig } from '../types.js'; + +/** + * Connects to an MCP server over STDIO (spawn a process, write JSON-RPC to stdin, read from stdout). + */ +export class StdioUpstream implements UpstreamConnection { + readonly name: string; + private process: ChildProcess | null = null; + private pendingRequests = new Map void; + reject: (err: Error) => void; + }>(); + private alive = false; + + constructor(private config: UpstreamConfig) { + this.name = config.name; + } + + async start(): Promise { + if (!this.config.command) { + throw new Error(`STDIO upstream '${this.name}' has no command configured`); + } + + this.process = spawn(this.config.command, this.config.args ?? [], { + stdio: ['pipe', 'pipe', 'pipe'], + env: { ...process.env, ...this.config.env }, + }); + + this.alive = true; + + this.process.on('exit', () => { + this.alive = false; + // Reject any pending requests + for (const [, pending] of this.pendingRequests) { + pending.reject(new Error(`Upstream '${this.name}' process exited`)); + } + this.pendingRequests.clear(); + }); + + if (this.process.stdout) { + const rl = createInterface({ input: this.process.stdout }); + rl.on('line', (line) => { + try { + const msg = JSON.parse(line) as JsonRpcResponse; + if ('id' in msg && msg.id !== undefined) { + const pending = this.pendingRequests.get(msg.id); + if (pending) { + this.pendingRequests.delete(msg.id); + pending.resolve(msg); + } + } + } catch { + // Skip non-JSON lines + } + }); + } + } + + async send(request: JsonRpcRequest): Promise { + if (!this.process?.stdin || !this.alive) { + throw new Error(`Upstream '${this.name}' is not connected`); + } + + return new Promise((resolve, reject) => { + this.pendingRequests.set(request.id, { resolve, reject }); + + const timeout = setTimeout(() => { + this.pendingRequests.delete(request.id); + reject(new Error(`Request to '${this.name}' timed out`)); + }, 30000); + + this.pendingRequests.set(request.id, { + resolve: (res) => { + clearTimeout(timeout); + resolve(res); + }, + reject: (err) => { + clearTimeout(timeout); + reject(err); + }, + }); + + this.process!.stdin!.write(JSON.stringify(request) + '\n'); + }); + } + + async close(): Promise { + if (this.process) { + this.process.kill('SIGTERM'); + this.alive = false; + this.process = null; + } + } + + isAlive(): boolean { + return this.alive; + } +} diff --git a/src/local-proxy/tests/router.test.ts b/src/local-proxy/tests/router.test.ts new file mode 100644 index 0000000..af8264f --- /dev/null +++ b/src/local-proxy/tests/router.test.ts @@ -0,0 +1,217 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { McpRouter } from '../src/router.js'; +import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from '../src/types.js'; + +function mockUpstream(name: string, tools: Array<{ name: string; description?: string }> = []): UpstreamConnection { + return { + name, + isAlive: vi.fn(() => true), + close: vi.fn(async () => {}), + send: vi.fn(async (req: JsonRpcRequest): Promise => { + if (req.method === 'tools/list') { + return { + jsonrpc: '2.0', + id: req.id, + result: { tools }, + }; + } + if (req.method === 'tools/call') { + return { + jsonrpc: '2.0', + id: req.id, + result: { + content: [{ type: 'text', text: `Called ${(req.params as Record)?.name}` }], + }, + }; + } + return { jsonrpc: '2.0', id: req.id, error: { code: -32601, message: 'Not found' } }; + }), + }; +} + +describe('McpRouter', () => { + let router: McpRouter; + + beforeEach(() => { + router = new McpRouter(); + }); + + describe('initialize', () => { + it('responds with server info and capabilities', async () => { + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'initialize', + }); + + expect(res.result).toBeDefined(); + const result = res.result as Record; + expect(result['protocolVersion']).toBe('2024-11-05'); + expect((result['serverInfo'] as Record)['name']).toBe('mcpctl-proxy'); + }); + }); + + describe('tools/list', () => { + it('returns empty tools when no upstreams', async () => { + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'tools/list', + }); + + const result = res.result as { tools: unknown[] }; + expect(result.tools).toEqual([]); + }); + + it('discovers and namespaces tools from upstreams', async () => { + router.addUpstream(mockUpstream('slack', [ + { name: 'send_message', description: 'Send a message' }, + { name: 'list_channels', description: 'List channels' }, + ])); + router.addUpstream(mockUpstream('github', [ + { name: 'create_issue', description: 'Create an issue' }, + ])); + + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'tools/list', + }); + + const result = res.result as { tools: Array<{ name: string }> }; + expect(result.tools).toHaveLength(3); + expect(result.tools.map((t) => t.name)).toContain('slack/send_message'); + expect(result.tools.map((t) => t.name)).toContain('slack/list_channels'); + expect(result.tools.map((t) => t.name)).toContain('github/create_issue'); + }); + + it('skips unavailable upstreams', async () => { + const failingUpstream = mockUpstream('failing'); + vi.mocked(failingUpstream.send).mockRejectedValue(new Error('Connection refused')); + router.addUpstream(failingUpstream); + + router.addUpstream(mockUpstream('working', [ + { name: 'do_thing', description: 'Does a thing' }, + ])); + + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'tools/list', + }); + + const result = res.result as { tools: Array<{ name: string }> }; + expect(result.tools).toHaveLength(1); + expect(result.tools[0]?.name).toBe('working/do_thing'); + }); + }); + + describe('tools/call', () => { + it('routes call to correct upstream', async () => { + const slack = mockUpstream('slack', [{ name: 'send_message' }]); + router.addUpstream(slack); + await router.discoverTools(); + + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'tools/call', + params: { name: 'slack/send_message', arguments: { channel: '#general', text: 'hello' } }, + }); + + expect(res.result).toBeDefined(); + // Verify the upstream received the call with de-namespaced tool name + expect(vi.mocked(slack.send)).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'tools/call', + params: expect.objectContaining({ name: 'send_message' }), + }), + ); + }); + + it('returns error for unknown tool', async () => { + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'tools/call', + params: { name: 'unknown/tool' }, + }); + + expect(res.error).toBeDefined(); + expect(res.error?.code).toBe(-32601); + }); + + it('returns error when tool name is missing', async () => { + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'tools/call', + params: {}, + }); + + expect(res.error).toBeDefined(); + expect(res.error?.code).toBe(-32602); + }); + + it('returns error when upstream is dead', async () => { + const slack = mockUpstream('slack', [{ name: 'send_message' }]); + router.addUpstream(slack); + await router.discoverTools(); + + vi.mocked(slack.isAlive).mockReturnValue(false); + + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'tools/call', + params: { name: 'slack/send_message' }, + }); + + expect(res.error).toBeDefined(); + expect(res.error?.code).toBe(-32603); + }); + }); + + describe('unknown methods', () => { + it('returns method not found error', async () => { + const res = await router.route({ + jsonrpc: '2.0', + id: 1, + method: 'resources/list', + }); + + expect(res.error).toBeDefined(); + expect(res.error?.code).toBe(-32601); + }); + }); + + describe('upstream management', () => { + it('lists upstream names', () => { + router.addUpstream(mockUpstream('slack')); + router.addUpstream(mockUpstream('github')); + expect(router.getUpstreamNames()).toEqual(['slack', 'github']); + }); + + it('removes upstream', async () => { + const slack = mockUpstream('slack', [{ name: 'send_message' }]); + router.addUpstream(slack); + await router.discoverTools(); + + router.removeUpstream('slack'); + expect(router.getUpstreamNames()).toEqual([]); + }); + + it('closes all upstreams', async () => { + const slack = mockUpstream('slack'); + const github = mockUpstream('github'); + router.addUpstream(slack); + router.addUpstream(github); + + await router.closeAll(); + + expect(slack.close).toHaveBeenCalled(); + expect(github.close).toHaveBeenCalled(); + expect(router.getUpstreamNames()).toEqual([]); + }); + }); +}); diff --git a/src/local-proxy/tsconfig.json b/src/local-proxy/tsconfig.json index 6d97847..4c4fbfc 100644 --- a/src/local-proxy/tsconfig.json +++ b/src/local-proxy/tsconfig.json @@ -2,7 +2,8 @@ "extends": "../../tsconfig.base.json", "compilerOptions": { "rootDir": "src", - "outDir": "dist" + "outDir": "dist", + "types": ["node"] }, "include": ["src/**/*.ts"], "references": [