feat: implement local LLM proxy architecture with MCP routing

Add STDIO and HTTP upstream transports, McpRouter with tool namespacing
and discovery, and StdioProxyServer for aggregating multiple MCP servers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Michal
2026-02-21 05:00:20 +00:00
parent 1b8b886995
commit 4b67a9cc15
12 changed files with 722 additions and 15 deletions

View File

@@ -307,7 +307,7 @@
"dependencies": [
"4"
],
"status": "pending",
"status": "done",
"subtasks": [
{
"id": 1,
@@ -367,7 +367,8 @@
"testStrategy": "Run full integration test suite. Verify coverage >85% for project-related files.",
"parentId": "undefined"
}
]
],
"updatedAt": "2026-02-21T04:30:43.622Z"
},
{
"id": "6",
@@ -380,7 +381,7 @@
"3",
"4"
],
"status": "pending",
"status": "done",
"subtasks": [
{
"id": 1,
@@ -465,7 +466,8 @@
"testStrategy": "Unit tests for getLogs. Integration test: run container, tail logs, verify output.",
"parentId": "undefined"
}
]
],
"updatedAt": "2026-02-21T04:52:51.544Z"
},
{
"id": "7",
@@ -514,8 +516,9 @@
"dependencies": [
"7"
],
"status": "pending",
"subtasks": []
"status": "done",
"subtasks": [],
"updatedAt": "2026-02-21T04:55:53.675Z"
},
{
"id": "9",
@@ -555,8 +558,9 @@
"dependencies": [
"1"
],
"status": "pending",
"subtasks": []
"status": "in-progress",
"subtasks": [],
"updatedAt": "2026-02-21T04:56:01.658Z"
},
{
"id": "12",
@@ -732,9 +736,9 @@
],
"metadata": {
"version": "1.0.0",
"lastModified": "2026-02-21T04:26:06.239Z",
"lastModified": "2026-02-21T04:56:01.659Z",
"taskCount": 24,
"completedCount": 5,
"completedCount": 8,
"tags": [
"master"
]

4
pnpm-lock.yaml generated
View File

@@ -91,6 +91,10 @@ importers:
'@modelcontextprotocol/sdk':
specifier: ^1.0.0
version: 1.26.0(zod@3.25.76)
devDependencies:
'@types/node':
specifier: ^25.3.0
version: 25.3.0
src/mcpd:
dependencies:

View File

@@ -14,7 +14,10 @@
"test:run": "vitest run"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0",
"@mcpctl/shared": "workspace:*"
"@mcpctl/shared": "workspace:*",
"@modelcontextprotocol/sdk": "^1.0.0"
},
"devDependencies": {
"@types/node": "^25.3.0"
}
}

View File

@@ -1,2 +1,14 @@
// Local LLM proxy entry point
// Will be implemented in Task 11
// Local LLM proxy - aggregates multiple MCP servers behind a single STDIO endpoint
export { McpRouter } from './router.js';
export { StdioProxyServer } from './server.js';
export { StdioUpstream, HttpUpstream } from './upstream/index.js';
export type {
JsonRpcRequest,
JsonRpcResponse,
JsonRpcNotification,
JsonRpcError,
JsonRpcMessage,
UpstreamConfig,
UpstreamConnection,
ProxyConfig,
} from './types.js';

View File

@@ -0,0 +1,161 @@
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from './types.js';
/**
* Routes MCP requests to the appropriate upstream server.
*
* The proxy presents a unified MCP interface to clients. Tools from
* all upstreams are merged under namespaced prefixes (e.g., "slack/send_message").
*
* Routing is done by tool name prefix: "servername/toolname" -> upstream "servername".
*/
export class McpRouter {
private upstreams = new Map<string, UpstreamConnection>();
private toolToServer = new Map<string, string>();
addUpstream(connection: UpstreamConnection): void {
this.upstreams.set(connection.name, connection);
}
removeUpstream(name: string): void {
this.upstreams.delete(name);
// Remove tool mappings for this server
for (const [tool, server] of this.toolToServer) {
if (server === name) {
this.toolToServer.delete(tool);
}
}
}
/**
* Initialize tools from all upstreams by calling tools/list on each.
*/
async discoverTools(): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> {
const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = [];
for (const [serverName, upstream] of this.upstreams) {
try {
const response = await upstream.send({
jsonrpc: '2.0',
id: `discover-${serverName}`,
method: 'tools/list',
});
if (response.result && typeof response.result === 'object' && 'tools' in response.result) {
const tools = (response.result as { tools: Array<{ name: string; description?: string; inputSchema?: unknown }> }).tools;
for (const tool of tools) {
const namespacedName = `${serverName}/${tool.name}`;
this.toolToServer.set(namespacedName, serverName);
allTools.push({
...tool,
name: namespacedName,
});
}
}
} catch {
// Server may be unavailable; skip its tools
}
}
return allTools;
}
/**
* Route a tools/call request to the correct upstream.
*/
async routeToolCall(request: JsonRpcRequest): Promise<JsonRpcResponse> {
const params = request.params as { name?: string; arguments?: unknown } | undefined;
const toolName = params?.name;
if (!toolName) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32602, message: 'Missing tool name in params' },
};
}
const serverName = this.toolToServer.get(toolName);
if (!serverName) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32601, message: `Unknown tool: ${toolName}` },
};
}
const upstream = this.upstreams.get(serverName);
if (!upstream || !upstream.isAlive()) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32603, message: `Upstream '${serverName}' is not available` },
};
}
// Strip the namespace prefix for the upstream call
const originalToolName = toolName.slice(serverName.length + 1);
const upstreamRequest: JsonRpcRequest = {
...request,
params: {
...params,
name: originalToolName,
},
};
return upstream.send(upstreamRequest);
}
/**
* Route a generic request. Handles protocol-level methods locally,
* delegates tool calls to upstreams.
*/
async route(request: JsonRpcRequest): Promise<JsonRpcResponse> {
switch (request.method) {
case 'initialize':
return {
jsonrpc: '2.0',
id: request.id,
result: {
protocolVersion: '2024-11-05',
serverInfo: {
name: 'mcpctl-proxy',
version: '0.1.0',
},
capabilities: {
tools: {},
},
},
};
case 'tools/list': {
const tools = await this.discoverTools();
return {
jsonrpc: '2.0',
id: request.id,
result: { tools },
};
}
case 'tools/call':
return this.routeToolCall(request);
default:
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32601, message: `Method not found: ${request.method}` },
};
}
}
getUpstreamNames(): string[] {
return [...this.upstreams.keys()];
}
async closeAll(): Promise<void> {
for (const upstream of this.upstreams.values()) {
await upstream.close();
}
this.upstreams.clear();
this.toolToServer.clear();
}
}

View File

@@ -0,0 +1,60 @@
import { createInterface } from 'node:readline';
import type { JsonRpcRequest, JsonRpcResponse, JsonRpcMessage } from './types.js';
import type { McpRouter } from './router.js';
/**
* STDIO-based MCP proxy server.
*
* Reads JSON-RPC messages from stdin and writes responses to stdout.
* This is the transport that Claude Code uses to communicate with MCP servers.
*/
export class StdioProxyServer {
private running = false;
constructor(private router: McpRouter) {}
start(): void {
this.running = true;
const rl = createInterface({ input: process.stdin });
rl.on('line', (line) => {
if (!this.running) return;
this.handleLine(line).catch((err) => {
process.stderr.write(`Proxy error: ${err}\n`);
});
});
rl.on('close', () => {
this.running = false;
});
}
private async handleLine(line: string): Promise<void> {
let msg: JsonRpcMessage;
try {
msg = JSON.parse(line) as JsonRpcMessage;
} catch {
return; // Skip invalid JSON
}
// Only handle requests (messages with an id)
if (!('id' in msg) || msg.id === undefined) {
// Notification - no response needed
return;
}
const request = msg as JsonRpcRequest;
const response = await this.router.route(request);
this.sendResponse(response);
}
private sendResponse(response: JsonRpcResponse): void {
process.stdout.write(JSON.stringify(response) + '\n');
}
stop(): void {
this.running = false;
}
}

View File

@@ -0,0 +1,72 @@
/**
* MCP JSON-RPC message types used by the proxy.
*/
export interface JsonRpcRequest {
jsonrpc: '2.0';
id: string | number;
method: string;
params?: Record<string, unknown>;
}
export interface JsonRpcResponse {
jsonrpc: '2.0';
id: string | number;
result?: unknown;
error?: JsonRpcError;
}
export interface JsonRpcNotification {
jsonrpc: '2.0';
method: string;
params?: Record<string, unknown>;
}
export interface JsonRpcError {
code: number;
message: string;
data?: unknown;
}
export type JsonRpcMessage = JsonRpcRequest | JsonRpcResponse | JsonRpcNotification;
/** Configuration for an upstream MCP server */
export interface UpstreamConfig {
/** Server ID from mcpd */
serverId: string;
/** Human-readable name */
name: string;
/** Transport type */
transport: 'stdio' | 'sse' | 'streamable-http';
/** For STDIO: command + args */
command?: string;
args?: string[];
/** For HTTP transports: URL */
url?: string;
/** Environment variables to pass */
env?: Record<string, string>;
}
/** Proxy server configuration */
export interface ProxyConfig {
/** Port for the proxy to listen on */
port: number;
/** Host to bind to */
host: string;
/** mcpd daemon URL for fetching server configs */
daemonUrl: string;
/** Upstream servers to proxy */
upstreams: UpstreamConfig[];
}
/** A running upstream connection */
export interface UpstreamConnection {
/** Server name */
name: string;
/** Send a JSON-RPC request and get a response */
send(request: JsonRpcRequest): Promise<JsonRpcResponse>;
/** Disconnect from the upstream */
close(): Promise<void>;
/** Whether the connection is alive */
isAlive(): boolean;
}

View File

@@ -0,0 +1,71 @@
import http from 'node:http';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, UpstreamConfig } from '../types.js';
/**
* Connects to an MCP server over HTTP (SSE or Streamable HTTP transport).
* Sends JSON-RPC requests via HTTP POST.
*/
export class HttpUpstream implements UpstreamConnection {
readonly name: string;
private url: string;
private alive = true;
constructor(config: UpstreamConfig) {
this.name = config.name;
if (!config.url) {
throw new Error(`HTTP upstream '${config.name}' has no URL configured`);
}
this.url = config.url;
}
async send(request: JsonRpcRequest): Promise<JsonRpcResponse> {
if (!this.alive) {
throw new Error(`Upstream '${this.name}' is closed`);
}
const body = JSON.stringify(request);
const parsed = new URL(this.url);
return new Promise((resolve, reject) => {
const opts: http.RequestOptions = {
hostname: parsed.hostname,
port: parsed.port,
path: parsed.pathname,
method: 'POST',
timeout: 30000,
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body),
},
};
const req = http.request(opts, (res) => {
const chunks: Buffer[] = [];
res.on('data', (chunk: Buffer) => chunks.push(chunk));
res.on('end', () => {
try {
const raw = Buffer.concat(chunks).toString('utf-8');
resolve(JSON.parse(raw) as JsonRpcResponse);
} catch (err) {
reject(new Error(`Invalid response from '${this.name}': ${err}`));
}
});
});
req.on('error', reject);
req.on('timeout', () => {
req.destroy();
reject(new Error(`Request to '${this.name}' timed out`));
});
req.write(body);
req.end();
});
}
async close(): Promise<void> {
this.alive = false;
}
isAlive(): boolean {
return this.alive;
}
}

View File

@@ -0,0 +1,2 @@
export { StdioUpstream } from './stdio.js';
export { HttpUpstream } from './http.js';

View File

@@ -0,0 +1,100 @@
import { spawn, type ChildProcess } from 'node:child_process';
import { createInterface } from 'node:readline';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, UpstreamConfig } from '../types.js';
/**
* Connects to an MCP server over STDIO (spawn a process, write JSON-RPC to stdin, read from stdout).
*/
export class StdioUpstream implements UpstreamConnection {
readonly name: string;
private process: ChildProcess | null = null;
private pendingRequests = new Map<string | number, {
resolve: (res: JsonRpcResponse) => void;
reject: (err: Error) => void;
}>();
private alive = false;
constructor(private config: UpstreamConfig) {
this.name = config.name;
}
async start(): Promise<void> {
if (!this.config.command) {
throw new Error(`STDIO upstream '${this.name}' has no command configured`);
}
this.process = spawn(this.config.command, this.config.args ?? [], {
stdio: ['pipe', 'pipe', 'pipe'],
env: { ...process.env, ...this.config.env },
});
this.alive = true;
this.process.on('exit', () => {
this.alive = false;
// Reject any pending requests
for (const [, pending] of this.pendingRequests) {
pending.reject(new Error(`Upstream '${this.name}' process exited`));
}
this.pendingRequests.clear();
});
if (this.process.stdout) {
const rl = createInterface({ input: this.process.stdout });
rl.on('line', (line) => {
try {
const msg = JSON.parse(line) as JsonRpcResponse;
if ('id' in msg && msg.id !== undefined) {
const pending = this.pendingRequests.get(msg.id);
if (pending) {
this.pendingRequests.delete(msg.id);
pending.resolve(msg);
}
}
} catch {
// Skip non-JSON lines
}
});
}
}
async send(request: JsonRpcRequest): Promise<JsonRpcResponse> {
if (!this.process?.stdin || !this.alive) {
throw new Error(`Upstream '${this.name}' is not connected`);
}
return new Promise((resolve, reject) => {
this.pendingRequests.set(request.id, { resolve, reject });
const timeout = setTimeout(() => {
this.pendingRequests.delete(request.id);
reject(new Error(`Request to '${this.name}' timed out`));
}, 30000);
this.pendingRequests.set(request.id, {
resolve: (res) => {
clearTimeout(timeout);
resolve(res);
},
reject: (err) => {
clearTimeout(timeout);
reject(err);
},
});
this.process!.stdin!.write(JSON.stringify(request) + '\n');
});
}
async close(): Promise<void> {
if (this.process) {
this.process.kill('SIGTERM');
this.alive = false;
this.process = null;
}
}
isAlive(): boolean {
return this.alive;
}
}

View File

@@ -0,0 +1,217 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { McpRouter } from '../src/router.js';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from '../src/types.js';
function mockUpstream(name: string, tools: Array<{ name: string; description?: string }> = []): UpstreamConnection {
return {
name,
isAlive: vi.fn(() => true),
close: vi.fn(async () => {}),
send: vi.fn(async (req: JsonRpcRequest): Promise<JsonRpcResponse> => {
if (req.method === 'tools/list') {
return {
jsonrpc: '2.0',
id: req.id,
result: { tools },
};
}
if (req.method === 'tools/call') {
return {
jsonrpc: '2.0',
id: req.id,
result: {
content: [{ type: 'text', text: `Called ${(req.params as Record<string, unknown>)?.name}` }],
},
};
}
return { jsonrpc: '2.0', id: req.id, error: { code: -32601, message: 'Not found' } };
}),
};
}
describe('McpRouter', () => {
let router: McpRouter;
beforeEach(() => {
router = new McpRouter();
});
describe('initialize', () => {
it('responds with server info and capabilities', async () => {
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'initialize',
});
expect(res.result).toBeDefined();
const result = res.result as Record<string, unknown>;
expect(result['protocolVersion']).toBe('2024-11-05');
expect((result['serverInfo'] as Record<string, unknown>)['name']).toBe('mcpctl-proxy');
});
});
describe('tools/list', () => {
it('returns empty tools when no upstreams', async () => {
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'tools/list',
});
const result = res.result as { tools: unknown[] };
expect(result.tools).toEqual([]);
});
it('discovers and namespaces tools from upstreams', async () => {
router.addUpstream(mockUpstream('slack', [
{ name: 'send_message', description: 'Send a message' },
{ name: 'list_channels', description: 'List channels' },
]));
router.addUpstream(mockUpstream('github', [
{ name: 'create_issue', description: 'Create an issue' },
]));
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'tools/list',
});
const result = res.result as { tools: Array<{ name: string }> };
expect(result.tools).toHaveLength(3);
expect(result.tools.map((t) => t.name)).toContain('slack/send_message');
expect(result.tools.map((t) => t.name)).toContain('slack/list_channels');
expect(result.tools.map((t) => t.name)).toContain('github/create_issue');
});
it('skips unavailable upstreams', async () => {
const failingUpstream = mockUpstream('failing');
vi.mocked(failingUpstream.send).mockRejectedValue(new Error('Connection refused'));
router.addUpstream(failingUpstream);
router.addUpstream(mockUpstream('working', [
{ name: 'do_thing', description: 'Does a thing' },
]));
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'tools/list',
});
const result = res.result as { tools: Array<{ name: string }> };
expect(result.tools).toHaveLength(1);
expect(result.tools[0]?.name).toBe('working/do_thing');
});
});
describe('tools/call', () => {
it('routes call to correct upstream', async () => {
const slack = mockUpstream('slack', [{ name: 'send_message' }]);
router.addUpstream(slack);
await router.discoverTools();
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'tools/call',
params: { name: 'slack/send_message', arguments: { channel: '#general', text: 'hello' } },
});
expect(res.result).toBeDefined();
// Verify the upstream received the call with de-namespaced tool name
expect(vi.mocked(slack.send)).toHaveBeenCalledWith(
expect.objectContaining({
method: 'tools/call',
params: expect.objectContaining({ name: 'send_message' }),
}),
);
});
it('returns error for unknown tool', async () => {
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'tools/call',
params: { name: 'unknown/tool' },
});
expect(res.error).toBeDefined();
expect(res.error?.code).toBe(-32601);
});
it('returns error when tool name is missing', async () => {
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'tools/call',
params: {},
});
expect(res.error).toBeDefined();
expect(res.error?.code).toBe(-32602);
});
it('returns error when upstream is dead', async () => {
const slack = mockUpstream('slack', [{ name: 'send_message' }]);
router.addUpstream(slack);
await router.discoverTools();
vi.mocked(slack.isAlive).mockReturnValue(false);
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'tools/call',
params: { name: 'slack/send_message' },
});
expect(res.error).toBeDefined();
expect(res.error?.code).toBe(-32603);
});
});
describe('unknown methods', () => {
it('returns method not found error', async () => {
const res = await router.route({
jsonrpc: '2.0',
id: 1,
method: 'resources/list',
});
expect(res.error).toBeDefined();
expect(res.error?.code).toBe(-32601);
});
});
describe('upstream management', () => {
it('lists upstream names', () => {
router.addUpstream(mockUpstream('slack'));
router.addUpstream(mockUpstream('github'));
expect(router.getUpstreamNames()).toEqual(['slack', 'github']);
});
it('removes upstream', async () => {
const slack = mockUpstream('slack', [{ name: 'send_message' }]);
router.addUpstream(slack);
await router.discoverTools();
router.removeUpstream('slack');
expect(router.getUpstreamNames()).toEqual([]);
});
it('closes all upstreams', async () => {
const slack = mockUpstream('slack');
const github = mockUpstream('github');
router.addUpstream(slack);
router.addUpstream(github);
await router.closeAll();
expect(slack.close).toHaveBeenCalled();
expect(github.close).toHaveBeenCalled();
expect(router.getUpstreamNames()).toEqual([]);
});
});
});

View File

@@ -2,7 +2,8 @@
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
"outDir": "dist",
"types": ["node"]
},
"include": ["src/**/*.ts"],
"references": [