feat: implement v2 3-tier architecture (mcpctl → mcplocal → mcpd)
Some checks failed
CI / lint (pull_request) Has been cancelled
CI / typecheck (pull_request) Has been cancelled
CI / test (pull_request) Has been cancelled
CI / build (pull_request) Has been cancelled
CI / package (pull_request) Has been cancelled

- Rename local-proxy to mcplocal with HTTP server, LLM pipeline, mcpd discovery
- Add LLM pre-processing: token estimation, filter cache, metrics, Gemini CLI + DeepSeek providers
- Add mcpd auth (login/logout) and MCP proxy endpoints
- Update CLI: dual URLs (mcplocalUrl/mcpdUrl), auth commands, --direct flag
- Add tiered health monitoring, shell completions, e2e integration tests
- 57 test files, 597 tests passing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Michal
2026-02-22 11:42:06 +00:00
parent a4fe5fdbe2
commit b8c5cf718a
82 changed files with 5832 additions and 123 deletions

25
src/mcplocal/package.json Normal file
View File

@@ -0,0 +1,25 @@
{
"name": "@mcpctl/mcplocal",
"version": "0.1.0",
"private": true,
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"build": "tsc --build",
"clean": "rimraf dist",
"dev": "tsx watch src/index.ts",
"start": "node dist/index.js",
"test": "vitest",
"test:run": "vitest run"
},
"dependencies": {
"@fastify/cors": "^10.0.0",
"@mcpctl/shared": "workspace:*",
"@modelcontextprotocol/sdk": "^1.0.0",
"fastify": "^5.0.0"
},
"devDependencies": {
"@types/node": "^25.3.0"
}
}

View File

@@ -0,0 +1,39 @@
import type { McpdClient } from './http/mcpd-client.js';
import type { McpRouter } from './router.js';
import { McpdUpstream } from './upstream/mcpd.js';
interface McpdServer {
id: string;
name: string;
transport: string;
status?: string;
}
/**
* Discovers MCP servers from mcpd and registers them as upstreams in the router.
* Called periodically or on demand to keep the router in sync with mcpd.
*/
export async function refreshUpstreams(router: McpRouter, mcpdClient: McpdClient): Promise<string[]> {
const servers = await mcpdClient.get<McpdServer[]>('/api/v1/servers');
const registered: string[] = [];
// Remove stale upstreams
const currentNames = new Set(router.getUpstreamNames());
const serverNames = new Set(servers.map((s) => s.name));
for (const name of currentNames) {
if (!serverNames.has(name)) {
router.removeUpstream(name);
}
}
// Add/update upstreams for each server
for (const server of servers) {
if (!currentNames.has(server.name)) {
const upstream = new McpdUpstream(server.id, server.name, mcpdClient);
router.addUpstream(upstream);
}
registered.push(server.name);
}
return registered;
}

126
src/mcplocal/src/health.ts Normal file
View File

@@ -0,0 +1,126 @@
import { EventEmitter } from 'node:events';
import type { UpstreamConnection, JsonRpcResponse } from './types.js';
/** Health classification for a single upstream connection. */
export type HealthState = 'healthy' | 'degraded' | 'disconnected';
/** Point-in-time health snapshot for one tracked upstream. */
export interface HealthStatus {
  /** Upstream name (the key the monitor tracks it under). */
  name: string;
  /** Current health classification. */
  state: HealthState;
  /** Epoch-ms timestamp of the most recent check (or of track()). */
  lastCheck: number;
  /** Consecutive failed pings; reset to 0 on a successful ping. */
  consecutiveFailures: number;
}
export interface HealthMonitorOptions {
  /** Interval between health checks in ms (default: 30000) */
  intervalMs?: number;
  /** Number of failures before marking disconnected (default: 3) */
  failureThreshold?: number;
}
/**
 * Monitors upstream connection health with periodic pings.
 *
 * Emits 'change' events (payload: { name, oldState, newState }) whenever an
 * upstream transitions between health states.
 */
export class HealthMonitor extends EventEmitter {
  private statuses = new Map<string, HealthStatus>();
  private upstreams = new Map<string, UpstreamConnection>();
  private timer: ReturnType<typeof setInterval> | null = null;
  private readonly intervalMs: number;
  private readonly failureThreshold: number;

  constructor(opts?: HealthMonitorOptions) {
    super();
    this.intervalMs = opts?.intervalMs ?? 30000;
    this.failureThreshold = opts?.failureThreshold ?? 3;
  }

  /** Begin tracking an upstream; its initial state mirrors isAlive(). */
  track(upstream: UpstreamConnection): void {
    const initial: HealthStatus = {
      name: upstream.name,
      state: upstream.isAlive() ? 'healthy' : 'disconnected',
      lastCheck: Date.now(),
      consecutiveFailures: 0,
    };
    this.upstreams.set(upstream.name, upstream);
    this.statuses.set(upstream.name, initial);
  }

  /** Stop tracking an upstream and discard its status. */
  untrack(name: string): void {
    this.upstreams.delete(name);
    this.statuses.delete(name);
  }

  /** Start the periodic check loop (no-op if already running). */
  start(): void {
    if (this.timer !== null) return;
    this.timer = setInterval(() => void this.checkAll(), this.intervalMs);
  }

  /** Stop the periodic check loop (no-op if not running). */
  stop(): void {
    if (this.timer === null) return;
    clearInterval(this.timer);
    this.timer = null;
  }

  getStatus(name: string): HealthStatus | undefined {
    return this.statuses.get(name);
  }

  getAllStatuses(): HealthStatus[] {
    return Array.from(this.statuses.values());
  }

  isHealthy(name: string): boolean {
    return this.statuses.get(name)?.state === 'healthy';
  }

  /** Run one health check against every tracked upstream, in parallel. */
  async checkAll(): Promise<void> {
    const pending: Promise<void>[] = [];
    for (const [name, upstream] of this.upstreams) {
      pending.push(this.checkOne(name, upstream));
    }
    await Promise.allSettled(pending);
  }

  private async checkOne(name: string, upstream: UpstreamConnection): Promise<void> {
    const status = this.statuses.get(name);
    if (!status) return;
    const previous = status.state;
    // A dead transport is immediately disconnected — no ping needed.
    if (!upstream.isAlive()) {
      status.consecutiveFailures = this.failureThreshold;
      status.state = 'disconnected';
      status.lastCheck = Date.now();
      if (previous !== 'disconnected') {
        this.emit('change', { name, oldState: previous, newState: 'disconnected' });
      }
      return;
    }
    try {
      const response: JsonRpcResponse = await upstream.send({
        jsonrpc: '2.0',
        id: `health-${name}-${Date.now()}`,
        method: 'ping',
      });
      // Any reply (even an error for an unknown method) proves the server is alive.
      if (response) {
        status.consecutiveFailures = 0;
        status.state = 'healthy';
      }
    } catch {
      status.consecutiveFailures += 1;
      status.state =
        status.consecutiveFailures >= this.failureThreshold ? 'disconnected' : 'degraded';
    }
    status.lastCheck = Date.now();
    if (previous !== status.state) {
      this.emit('change', { name, oldState: previous, newState: status.state });
    }
  }
}

View File

@@ -0,0 +1,8 @@
// Barrel exports for the tiered health monitoring module.
export { TieredHealthMonitor } from './tiered.js';
export type {
  TieredHealthStatus,
  TieredHealthMonitorDeps,
  McplocalHealth,
  McpdHealth,
  InstanceHealth,
} from './tiered.js';

View File

@@ -0,0 +1,98 @@
import type { McpdClient } from '../http/mcpd-client.js';
import type { ProviderRegistry } from '../providers/registry.js';
/** Health of the mcplocal process itself. */
export interface McplocalHealth {
  status: 'healthy' | 'degraded';
  /** Seconds since the monitor was constructed. */
  uptime: number;
  /** Name of the active LLM provider, or null when none is active. */
  llmProvider: string | null;
}
/** Reachability of the mcpd daemon. */
export interface McpdHealth {
  status: 'connected' | 'disconnected';
  /** The mcpd base URL that was probed. */
  url: string;
}
/** Status of a single MCP server instance as reported by mcpd. */
export interface InstanceHealth {
  name: string;
  status: string;
}
/** Aggregate health across all three tiers. */
export interface TieredHealthStatus {
  mcplocal: McplocalHealth;
  mcpd: McpdHealth;
  instances: InstanceHealth[];
}
/** Dependencies for TieredHealthMonitor; mcpdClient may be null when mcpd is not configured. */
export interface TieredHealthMonitorDeps {
  mcpdClient: McpdClient | null;
  providerRegistry: ProviderRegistry;
  mcpdUrl: string;
}
/**
 * Monitors health across all tiers: mcplocal itself, the mcpd daemon, and MCP server instances.
 * Aggregates status from multiple sources into a single TieredHealthStatus.
 */
export class TieredHealthMonitor {
  private readonly mcpdClient: McpdClient | null;
  private readonly providerRegistry: ProviderRegistry;
  private readonly mcpdUrl: string;
  private readonly startTime: number;

  constructor(deps: TieredHealthMonitorDeps) {
    const { mcpdClient, providerRegistry, mcpdUrl } = deps;
    this.mcpdClient = mcpdClient;
    this.providerRegistry = providerRegistry;
    this.mcpdUrl = mcpdUrl;
    this.startTime = Date.now();
  }

  /** Collect health for every tier; mcpd and instance probes run in parallel. */
  async checkHealth(): Promise<TieredHealthStatus> {
    const [mcpd, instances] = await Promise.all([this.probeMcpd(), this.listInstances()]);
    return {
      mcplocal: this.selfHealth(),
      mcpd,
      instances,
    };
  }

  /** mcplocal's own status: uptime plus the currently active LLM provider. */
  private selfHealth(): McplocalHealth {
    return {
      status: 'healthy',
      uptime: (Date.now() - this.startTime) / 1000,
      llmProvider: this.providerRegistry.getActive()?.name ?? null,
    };
  }

  /** Probe mcpd's /health endpoint; any failure counts as disconnected. */
  private async probeMcpd(): Promise<McpdHealth> {
    if (!this.mcpdClient) {
      return { status: 'disconnected', url: this.mcpdUrl };
    }
    try {
      await this.mcpdClient.get<unknown>('/health');
      return { status: 'connected', url: this.mcpdUrl };
    } catch {
      return { status: 'disconnected', url: this.mcpdUrl };
    }
  }

  /** Fetch MCP server instance statuses; empty list when mcpd is unavailable. */
  private async listInstances(): Promise<InstanceHealth[]> {
    if (!this.mcpdClient) {
      return [];
    }
    try {
      const res = await this.mcpdClient.get<{ instances: InstanceHealth[] }>('/instances');
      return res.instances;
    } catch {
      return [];
    }
  }
}

View File

@@ -0,0 +1,32 @@
/** Configuration for the mcplocal HTTP server. */
export interface HttpConfig {
  /** Port for the HTTP server (default: 3200) */
  httpPort: number;
  /** Host to bind to (default: 127.0.0.1) */
  httpHost: string;
  /** URL of the mcpd daemon (default: http://localhost:3100) */
  mcpdUrl: string;
  /** Bearer token for authenticating with mcpd */
  mcpdToken: string;
  /** Log level (default: info) */
  logLevel: 'fatal' | 'error' | 'warn' | 'info' | 'debug' | 'trace';
}
const DEFAULT_HTTP_PORT = 3200;
const DEFAULT_HTTP_HOST = '127.0.0.1';
const DEFAULT_MCPD_URL = 'http://localhost:3100';
const DEFAULT_MCPD_TOKEN = '';
const DEFAULT_LOG_LEVEL = 'info';
/** Valid values for HttpConfig.logLevel, used to validate the env override. */
const LOG_LEVELS: readonly HttpConfig['logLevel'][] = ['fatal', 'error', 'warn', 'info', 'debug', 'trace'];
/**
 * Load HTTP server configuration from environment variables.
 *
 * Recognized variables: MCPLOCAL_HTTP_PORT, MCPLOCAL_HTTP_HOST,
 * MCPLOCAL_MCPD_URL, MCPLOCAL_MCPD_TOKEN, MCPLOCAL_LOG_LEVEL.
 * Invalid values (non-numeric or out-of-range port, unknown log level)
 * fall back to the defaults instead of propagating bad config.
 *
 * @param env - Environment map to read from (defaults to process.env).
 */
export function loadHttpConfig(env: Record<string, string | undefined> = process.env): HttpConfig {
  const portStr = env['MCPLOCAL_HTTP_PORT'];
  const port = portStr !== undefined ? parseInt(portStr, 10) : DEFAULT_HTTP_PORT;
  // Reject NaN and out-of-range TCP ports (previously only NaN was rejected).
  const portValid = Number.isInteger(port) && port >= 0 && port <= 65535;
  // Previously the env value was blindly cast to the logLevel union; validate
  // it so a typo like "verbose" cannot reach the logger.
  const levelStr = env['MCPLOCAL_LOG_LEVEL'];
  const logLevel = (LOG_LEVELS as readonly string[]).includes(levelStr ?? '')
    ? (levelStr as HttpConfig['logLevel'])
    : DEFAULT_LOG_LEVEL;
  return {
    httpPort: portValid ? port : DEFAULT_HTTP_PORT,
    httpHost: env['MCPLOCAL_HTTP_HOST'] ?? DEFAULT_HTTP_HOST,
    mcpdUrl: env['MCPLOCAL_MCPD_URL'] ?? DEFAULT_MCPD_URL,
    mcpdToken: env['MCPLOCAL_MCPD_TOKEN'] ?? DEFAULT_MCPD_TOKEN,
    logLevel,
  };
}

View File

@@ -0,0 +1,6 @@
// Barrel exports for the mcplocal HTTP layer (server, config, mcpd client, proxy routes).
export { createHttpServer } from './server.js';
export type { HttpServerDeps } from './server.js';
export { loadHttpConfig } from './config.js';
export type { HttpConfig } from './config.js';
export { McpdClient, AuthenticationError, ConnectionError } from './mcpd-client.js';
export { registerProxyRoutes } from './routes/proxy.js';

View File

@@ -0,0 +1,105 @@
/**
 * HTTP client for communicating with the mcpd daemon.
 * Wraps fetch calls with auth headers and error handling.
 */
/**
 * Thrown when mcpd returns a 401 Unauthorized response.
 *
 * `name` is set explicitly so the error identifies itself as
 * `AuthenticationError` in logs and serialized output.
 */
export class AuthenticationError extends Error {
  constructor(message = 'Authentication failed: invalid or expired token') {
    super(message);
    this.name = 'AuthenticationError';
  }
}
/**
 * Thrown when mcpd is unreachable (connection refused, DNS failure, etc.).
 * The underlying cause's message, when available, is appended to the text.
 */
export class ConnectionError extends Error {
  constructor(url: string, cause?: unknown) {
    const base = `Cannot connect to mcpd at ${url}`;
    const detail = cause instanceof Error ? `: ${cause.message}` : '';
    super(base + detail);
    this.name = 'ConnectionError';
  }
}
/** Authenticated HTTP client for the mcpd daemon's JSON API. */
export class McpdClient {
  private readonly baseUrl: string;
  private readonly token: string;

  constructor(baseUrl: string, token: string) {
    // Strip trailing slashes so URL joining never produces "//".
    this.baseUrl = baseUrl.replace(/\/+$/, '');
    this.token = token;
  }

  async get<T>(path: string): Promise<T> {
    return this.requestJson<T>('GET', path);
  }

  async post<T>(path: string, body?: unknown): Promise<T> {
    return this.requestJson<T>('POST', path, body);
  }

  async put<T>(path: string, body?: unknown): Promise<T> {
    return this.requestJson<T>('PUT', path, body);
  }

  async delete(path: string): Promise<void> {
    await this.requestJson<unknown>('DELETE', path);
  }

  /**
   * Forward a raw request to mcpd. Returns the status code and body
   * so the proxy route can relay them directly.
   *
   * @throws AuthenticationError on a 401 response.
   * @throws ConnectionError when the fetch itself fails.
   */
  async forward(
    method: string,
    path: string,
    query: string,
    body: unknown | undefined,
  ): Promise<{ status: number; body: unknown }> {
    const url = query ? `${this.baseUrl}${path}?${query}` : `${this.baseUrl}${path}`;
    const headers: Record<string, string> = {
      'Authorization': `Bearer ${this.token}`,
      'Accept': 'application/json',
    };
    const init: RequestInit = { method, headers };
    const sendBody = body !== undefined && body !== null && method !== 'GET' && method !== 'HEAD';
    if (sendBody) {
      headers['Content-Type'] = 'application/json';
      init.body = JSON.stringify(body);
    }
    let res: Response;
    try {
      res = await fetch(url, init);
    } catch (err: unknown) {
      throw new ConnectionError(this.baseUrl, err);
    }
    if (res.status === 401) {
      throw new AuthenticationError();
    }
    // Relay JSON when the body parses; otherwise pass the raw text through.
    const text = await res.text();
    let parsed: unknown;
    try {
      parsed = JSON.parse(text);
    } catch {
      parsed = text;
    }
    return { status: res.status, body: parsed };
  }

  /** Internal JSON helper: forwards the call and throws on any >= 400 status. */
  private async requestJson<T>(method: string, path: string, body?: unknown): Promise<T> {
    const { status, body: payload } = await this.forward(method, path, '', body);
    if (status >= 400) {
      const detail =
        payload !== null && typeof payload === 'object' ? JSON.stringify(payload) : String(payload);
      throw new Error(`mcpd returned ${String(status)}: ${detail}`);
    }
    return payload as T;
  }
}

View File

@@ -0,0 +1,38 @@
/**
* Catch-all proxy route that forwards /api/v1/* requests to mcpd.
*/
import type { FastifyInstance } from 'fastify';
import { AuthenticationError, ConnectionError } from '../mcpd-client.js';
import type { McpdClient } from '../mcpd-client.js';
/**
 * Register a catch-all route that relays /api/v1/* requests to mcpd,
 * translating client errors into friendly 401/503 JSON responses.
 */
export function registerProxyRoutes(app: FastifyInstance, client: McpdClient): void {
  app.all('/api/v1/*', async (request, reply) => {
    // Split the incoming URL into path and raw querystring.
    const qIndex = request.url.indexOf('?');
    const path = qIndex === -1 ? request.url : request.url.slice(0, qIndex);
    const querystring = qIndex === -1 ? '' : request.url.slice(qIndex + 1);
    // GET/HEAD requests carry no body worth relaying.
    const relayBody = request.method !== 'GET' && request.method !== 'HEAD';
    const body = relayBody ? (request.body as unknown) : undefined;
    try {
      const result = await client.forward(request.method, path, querystring, body);
      return reply.code(result.status).send(result.body);
    } catch (err: unknown) {
      if (err instanceof AuthenticationError) {
        return reply.code(401).send({
          error: 'unauthorized',
          message: 'Authentication with mcpd failed. Run `mcpctl login` to refresh your token.',
        });
      }
      if (err instanceof ConnectionError) {
        return reply.code(503).send({
          error: 'service_unavailable',
          message: 'Cannot reach mcpd daemon. Is it running?',
        });
      }
      throw err;
    }
  });
}

View File

@@ -0,0 +1,85 @@
import Fastify from 'fastify';
import type { FastifyInstance } from 'fastify';
import cors from '@fastify/cors';
import { APP_VERSION } from '@mcpctl/shared';
import type { HttpConfig } from './config.js';
import { McpdClient } from './mcpd-client.js';
import { registerProxyRoutes } from './routes/proxy.js';
import type { McpRouter } from '../router.js';
import type { HealthMonitor } from '../health.js';
import type { TieredHealthMonitor } from '../health/tiered.js';
/** Dependencies injected into the mcplocal HTTP server. */
export interface HttpServerDeps {
  /** Router holding the currently registered MCP upstreams. */
  router: McpRouter;
  /** Optional per-upstream ping monitor; its statuses are included in /health. */
  healthMonitor?: HealthMonitor | undefined;
  /** Optional cross-tier monitor; enables /health/detailed and the tiered summary. */
  tieredHealthMonitor?: TieredHealthMonitor | undefined;
}
export async function createHttpServer(
config: HttpConfig,
deps: HttpServerDeps,
): Promise<FastifyInstance> {
const app = Fastify({
logger: {
level: config.logLevel,
},
});
await app.register(cors, {
origin: true,
methods: ['GET', 'POST', 'PUT', 'DELETE', 'PATCH'],
});
// Health endpoint
app.get('/health', async (_request, reply) => {
const upstreams = deps.router.getUpstreamNames();
const healthStatuses = deps.healthMonitor
? deps.healthMonitor.getAllStatuses()
: undefined;
// Include tiered summary if available
let tieredSummary: { mcpd: string; llmProvider: string | null } | undefined;
if (deps.tieredHealthMonitor) {
const tiered = await deps.tieredHealthMonitor.checkHealth();
tieredSummary = {
mcpd: tiered.mcpd.status,
llmProvider: tiered.mcplocal.llmProvider,
};
}
reply.code(200).send({
status: 'healthy',
version: APP_VERSION,
uptime: process.uptime(),
timestamp: new Date().toISOString(),
upstreams: upstreams.length,
mcpdUrl: config.mcpdUrl,
...(healthStatuses !== undefined ? { health: healthStatuses } : {}),
...(tieredSummary !== undefined ? { tiered: tieredSummary } : {}),
});
});
// Detailed tiered health endpoint
app.get('/health/detailed', async (_request, reply) => {
if (!deps.tieredHealthMonitor) {
reply.code(503).send({
error: 'Tiered health monitor not configured',
});
return;
}
const status = await deps.tieredHealthMonitor.checkHealth();
reply.code(200).send(status);
});
// Liveness probe
app.get('/healthz', async (_request, reply) => {
reply.code(200).send({ status: 'ok' });
});
// Proxy management routes to mcpd
const mcpdClient = new McpdClient(config.mcpdUrl, config.mcpdToken);
registerProxyRoutes(app, mcpdClient);
return app;
}

25
src/mcplocal/src/index.ts Normal file
View File

@@ -0,0 +1,25 @@
// Local LLM proxy - aggregates multiple MCP servers behind a single STDIO endpoint
// Core routing and server entry points
export { McpRouter } from './router.js';
export { StdioProxyServer } from './server.js';
export { StdioUpstream, HttpUpstream } from './upstream/index.js';
// Health monitoring (per-upstream pings and cross-tier aggregation)
export { HealthMonitor } from './health.js';
export type { HealthState, HealthStatus, HealthMonitorOptions } from './health.js';
export { TieredHealthMonitor } from './health/index.js';
export type { TieredHealthStatus, TieredHealthMonitorDeps, McplocalHealth, McpdHealth, InstanceHealth } from './health/index.js';
// CLI entry point
export { main } from './main.js';
export type { MainResult } from './main.js';
// LLM providers used by the pre-processing pipeline
export { ProviderRegistry } from './providers/index.js';
export type { LlmProvider, CompletionOptions, CompletionResult, ChatMessage } from './providers/index.js';
export { OpenAiProvider, AnthropicProvider, OllamaProvider, GeminiCliProvider, DeepSeekProvider } from './providers/index.js';
// HTTP server and mcpd client
export { createHttpServer, loadHttpConfig, McpdClient, AuthenticationError, ConnectionError, registerProxyRoutes } from './http/index.js';
export type { HttpConfig, HttpServerDeps } from './http/index.js';
// JSON-RPC and proxy configuration types
export type {
  JsonRpcRequest,
  JsonRpcResponse,
  JsonRpcNotification,
  JsonRpcError,
  JsonRpcMessage,
  UpstreamConfig,
  UpstreamConnection,
  ProxyConfig,
} from './types.js';

View File

@@ -0,0 +1,96 @@
/**
 * LRU cache for filter decisions.
 *
 * Caches whether a given tool name + response size combination should be
 * filtered by the LLM pipeline. Avoids redundant LLM calls for repeated
 * queries that produce similar-sized responses.
 */
export interface FilterCacheConfig {
  /** Maximum number of entries in the cache (default 256) */
  maxEntries: number;
  /** TTL in milliseconds for cache entries (default 3_600_000 = 1 hour) */
  ttlMs: number;
}
export const DEFAULT_FILTER_CACHE_CONFIG: FilterCacheConfig = {
  maxEntries: 256,
  ttlMs: 3_600_000,
};
/** Internal record pairing a cached decision with its creation timestamp. */
interface CacheEntry {
  shouldFilter: boolean;
  createdAt: number;
}
/**
 * Simple LRU cache for filter decisions keyed by tool name.
 *
 * Relies on Map's insertion-order iteration: the first key is always the
 * least recently used, so eviction pops the front and refreshes re-append.
 * No external dependencies.
 */
export class FilterCache {
  private cache = new Map<string, CacheEntry>();
  private readonly config: FilterCacheConfig;

  constructor(config: Partial<FilterCacheConfig> = {}) {
    this.config = { ...DEFAULT_FILTER_CACHE_CONFIG, ...config };
  }

  /**
   * Look up a cached filter decision.
   *
   * @param toolName - The MCP tool name.
   * @returns `true`/`false` if a cached decision exists, or `null` if no valid entry.
   */
  shouldFilter(toolName: string): boolean | null {
    const entry = this.cache.get(toolName);
    if (entry === undefined) {
      return null;
    }
    const expired = Date.now() - entry.createdAt > this.config.ttlMs;
    if (expired) {
      this.cache.delete(toolName);
      return null;
    }
    this.refresh(toolName, entry);
    return entry.shouldFilter;
  }

  /**
   * Record a filter decision in the cache.
   *
   * @param toolName - The MCP tool name.
   * @param shouldFilter - Whether the response should be filtered.
   */
  recordDecision(toolName: string, shouldFilter: boolean): void {
    // Delete first so a re-recorded key moves to the most-recent position.
    this.cache.delete(toolName);
    if (this.cache.size >= this.config.maxEntries) {
      this.evictOldest();
    }
    this.cache.set(toolName, { shouldFilter, createdAt: Date.now() });
  }

  /** Clear all cached entries. */
  clear(): void {
    this.cache.clear();
  }

  /** Number of entries currently in the cache. */
  get size(): number {
    return this.cache.size;
  }

  /** Re-insert an entry so it becomes the most recently used. */
  private refresh(toolName: string, entry: CacheEntry): void {
    this.cache.delete(toolName);
    this.cache.set(toolName, entry);
  }

  /** Drop the least-recently-used entry (the Map's first key). */
  private evictOldest(): void {
    const first = this.cache.keys().next();
    if (!first.done) {
      this.cache.delete(first.value);
    }
  }
}

View File

@@ -0,0 +1,8 @@
export { LlmProcessor, DEFAULT_PROCESSOR_CONFIG } from './processor.js';
export type { LlmProcessorConfig, ProcessedRequest, FilteredResponse } from './processor.js';
export { RESPONSE_FILTER_SYSTEM_PROMPT, REQUEST_OPTIMIZATION_SYSTEM_PROMPT } from './prompts.js';
export { estimateTokens } from './token-counter.js';
export { FilterCache, DEFAULT_FILTER_CACHE_CONFIG } from './filter-cache.js';
export type { FilterCacheConfig } from './filter-cache.js';
export { FilterMetrics } from './metrics.js';
export type { FilterMetricsSnapshot } from './metrics.js';

View File

@@ -0,0 +1,83 @@
/**
 * Metrics tracking for the LLM filter pipeline.
 *
 * Records token savings, cache efficiency, and filter latency to enable
 * observability of the smart context optimization layer.
 */
export interface FilterMetricsSnapshot {
  /** Total estimated tokens that entered the filter pipeline */
  totalTokensProcessed: number;
  /** Estimated tokens saved by filtering */
  tokensSaved: number;
  /** Number of cache hits (filter decision reused) */
  cacheHits: number;
  /** Number of cache misses (required fresh decision) */
  cacheMisses: number;
  /** Number of filter operations performed */
  filterCount: number;
  /** Average filter latency in milliseconds (0 if no operations) */
  averageFilterLatencyMs: number;
}
/**
 * Accumulates metrics for the LLM filter pipeline.
 *
 * Safe for single-threaded Node.js usage. Call `getStats()` to retrieve
 * a snapshot of current metrics.
 */
export class FilterMetrics {
  private tokensIn = 0;
  private tokensSavedTotal = 0;
  private hits = 0;
  private misses = 0;
  private operations = 0;
  private latencySumMs = 0;

  /**
   * Record a single filter operation.
   *
   * @param originalTokens - Estimated tokens before filtering.
   * @param filteredTokens - Estimated tokens after filtering.
   * @param latencyMs - Time taken for the filter operation in ms.
   */
  recordFilter(originalTokens: number, filteredTokens: number, latencyMs: number): void {
    const saved = originalTokens - filteredTokens;
    this.tokensIn += originalTokens;
    // Filtering can occasionally grow a payload; never count negative savings.
    this.tokensSavedTotal += saved > 0 ? saved : 0;
    this.operations += 1;
    this.latencySumMs += latencyMs;
  }

  /** Record a cache hit. */
  recordCacheHit(): void {
    this.hits += 1;
  }

  /** Record a cache miss. */
  recordCacheMiss(): void {
    this.misses += 1;
  }

  /** Return a snapshot of all accumulated metrics. */
  getStats(): FilterMetricsSnapshot {
    const average = this.operations === 0 ? 0 : this.latencySumMs / this.operations;
    return {
      totalTokensProcessed: this.tokensIn,
      tokensSaved: this.tokensSavedTotal,
      cacheHits: this.hits,
      cacheMisses: this.misses,
      filterCount: this.operations,
      averageFilterLatencyMs: average,
    };
  }

  /** Reset all metrics to zero. */
  reset(): void {
    this.tokensIn = 0;
    this.tokensSavedTotal = 0;
    this.hits = 0;
    this.misses = 0;
    this.operations = 0;
    this.latencySumMs = 0;
  }
}

View File

@@ -0,0 +1,231 @@
import type { ProviderRegistry } from '../providers/registry.js';
import type { JsonRpcResponse } from '../types.js';
import { RESPONSE_FILTER_SYSTEM_PROMPT, REQUEST_OPTIMIZATION_SYSTEM_PROMPT } from './prompts.js';
import { estimateTokens } from './token-counter.js';
import { FilterCache } from './filter-cache.js';
import type { FilterCacheConfig } from './filter-cache.js';
import { FilterMetrics } from './metrics.js';
/** Configuration for the LLM pre-processing pipeline. */
export interface LlmProcessorConfig {
  /** Enable request preprocessing */
  enablePreprocessing: boolean;
  /** Enable response filtering */
  enableFiltering: boolean;
  /** Tool name patterns to skip (matched against namespaced name) */
  excludeTools: string[];
  /** Max tokens for LLM calls */
  maxTokens: number;
  /** Token threshold below which responses skip LLM filtering (default 250 tokens ~ 1000 chars) */
  tokenThreshold: number;
  /** Filter cache configuration (optional; omit to use defaults) */
  filterCache?: FilterCacheConfig | undefined;
}
/** Conservative defaults: response filtering on, request preprocessing off. */
export const DEFAULT_PROCESSOR_CONFIG: LlmProcessorConfig = {
  enablePreprocessing: false,
  enableFiltering: true,
  excludeTools: [],
  maxTokens: 1024,
  tokenThreshold: 250,
};
/** Result of request preprocessing. */
export interface ProcessedRequest {
  /** Whether the LLM actually rewrote the parameters. */
  optimized: boolean;
  /** Parameters to forward upstream (the originals when not optimized). */
  params: Record<string, unknown>;
}
/** Result of response filtering. */
export interface FilteredResponse {
  /** Whether the LLM produced a filtered replacement for the result. */
  filtered: boolean;
  /** The (possibly filtered) result payload. */
  result: unknown;
  /** JSON-serialized size of the original result, in characters. */
  originalSize: number;
  /** JSON-serialized size of the returned result, in characters. */
  filteredSize: number;
}
/**
 * LLM pre-processing pipeline. Intercepts MCP tool calls and uses a local
 * LLM to optimize requests and filter responses, reducing token usage for
 * the upstream Claude model.
 *
 * Includes smart context optimization:
 * - Token-based thresholds to skip filtering small responses
 * - LRU cache for filter decisions on repeated tool calls
 * - Metrics tracking for observability
 */
export class LlmProcessor {
  private readonly filterCache: FilterCache;
  private readonly metrics: FilterMetrics;

  constructor(
    private providers: ProviderRegistry,
    private config: LlmProcessorConfig = DEFAULT_PROCESSOR_CONFIG,
  ) {
    this.filterCache = new FilterCache(config.filterCache);
    this.metrics = new FilterMetrics();
  }

  /** Methods that should never be preprocessed (protocol-level or simple CRUD) */
  private static readonly BYPASS_METHODS = new Set([
    'initialize',
    'tools/list',
    'resources/list',
    'prompts/list',
    'prompts/get',
    'resources/subscribe',
    'resources/unsubscribe',
  ]);

  /** Simple operations that don't benefit from preprocessing */
  private static readonly SIMPLE_OPERATIONS = new Set([
    'create', 'delete', 'remove', 'subscribe', 'unsubscribe',
  ]);

  /**
   * Decide whether a call is a candidate for LLM processing.
   *
   * Protocol-level methods, tools matching the exclude list, and simple
   * CRUD-style operations (matched by the tool name's last path segment)
   * are skipped.
   */
  shouldProcess(method: string, toolName?: string): boolean {
    if (LlmProcessor.BYPASS_METHODS.has(method)) return false;
    if (!toolName) return false;
    // Check exclude list
    if (this.config.excludeTools.some((pattern) => toolName.includes(pattern))) {
      return false;
    }
    // Skip simple CRUD operations
    const baseName = toolName.includes('/') ? toolName.split('/').pop()! : toolName;
    for (const op of LlmProcessor.SIMPLE_OPERATIONS) {
      if (baseName.startsWith(op)) return false;
    }
    return true;
  }

  /**
   * Optimize request parameters using the active LLM provider.
   * Falls back to original params if LLM is unavailable or fails.
   */
  async preprocessRequest(toolName: string, params: Record<string, unknown>): Promise<ProcessedRequest> {
    if (!this.config.enablePreprocessing) {
      return { optimized: false, params };
    }
    const provider = this.providers.getActive();
    if (!provider) {
      return { optimized: false, params };
    }
    try {
      const result = await provider.complete({
        messages: [
          { role: 'system', content: REQUEST_OPTIMIZATION_SYSTEM_PROMPT },
          { role: 'user', content: `Tool: ${toolName}\nParameters: ${JSON.stringify(params)}` },
        ],
        maxTokens: this.config.maxTokens,
        temperature: 0,
      });
      const optimized = JSON.parse(result.content) as Record<string, unknown>;
      return { optimized: true, params: optimized };
    } catch {
      // LLM failed, fall through to original params
      return { optimized: false, params };
    }
  }

  /**
   * Filter/summarize a tool response using the active LLM provider.
   * Falls back to original response if LLM is unavailable or fails.
   *
   * Uses token-based thresholds and an LRU filter cache to skip unnecessary
   * LLM calls. Records metrics for every filter operation.
   */
  async filterResponse(toolName: string, response: JsonRpcResponse): Promise<FilteredResponse> {
    // BUG FIX: JSON.stringify(undefined) returns undefined (not a string), so
    // a response with an absent `result` would previously crash on `.length`
    // in the pass-through paths below; default to the empty string.
    const raw = JSON.stringify(response.result) ?? '';
    if (!this.config.enableFiltering) {
      return this.passthrough(response, raw);
    }
    const provider = this.providers.getActive();
    if (!provider) {
      return this.passthrough(response, raw);
    }
    // Don't filter error responses
    if (response.error) {
      return { filtered: false, result: response.result, originalSize: 0, filteredSize: 0 };
    }
    const tokens = estimateTokens(raw);
    // Skip small responses below the token threshold
    if (tokens < this.config.tokenThreshold) {
      return this.passthrough(response, raw);
    }
    // Check filter cache for a prior decision on this tool
    const cachedDecision = this.filterCache.shouldFilter(toolName);
    if (cachedDecision !== null) {
      this.metrics.recordCacheHit();
      if (!cachedDecision) {
        // Previously decided not to filter this tool's responses
        return this.passthrough(response, raw);
      }
    } else {
      this.metrics.recordCacheMiss();
    }
    const startTime = performance.now();
    try {
      const result = await provider.complete({
        messages: [
          { role: 'system', content: RESPONSE_FILTER_SYSTEM_PROMPT },
          { role: 'user', content: `Tool: ${toolName}\nResponse (${raw.length} chars):\n${raw}` },
        ],
        maxTokens: this.config.maxTokens,
        temperature: 0,
      });
      const filtered = JSON.parse(result.content) as unknown;
      const filteredStr = JSON.stringify(filtered) ?? '';
      const filteredTokens = estimateTokens(filteredStr);
      this.metrics.recordFilter(tokens, filteredTokens, performance.now() - startTime);
      // Cache the decision: if filtering actually saved tokens, remember to filter
      this.filterCache.recordDecision(toolName, filteredStr.length < raw.length);
      return {
        filtered: true,
        result: filtered,
        originalSize: raw.length,
        filteredSize: filteredStr.length,
      };
    } catch {
      this.metrics.recordFilter(tokens, tokens, performance.now() - startTime);
      // LLM failed — cache as "don't filter" to avoid repeated failures
      this.filterCache.recordDecision(toolName, false);
      // LLM failed, return original
      return this.passthrough(response, raw);
    }
  }

  /** Build a pass-through result that reports the original payload unchanged. */
  private passthrough(response: JsonRpcResponse, raw: string): FilteredResponse {
    return { filtered: false, result: response.result, originalSize: raw.length, filteredSize: raw.length };
  }

  /** Return a snapshot of filter pipeline metrics. */
  getMetrics(): ReturnType<FilterMetrics['getStats']> {
    return this.metrics.getStats();
  }

  /** Reset all metrics. */
  resetMetrics(): void {
    this.metrics.reset();
  }

  /** Clear the filter decision cache. */
  clearFilterCache(): void {
    this.filterCache.clear();
  }
}

View File

@@ -0,0 +1,21 @@
/**
 * System prompts for the LLM pre-processing pipeline.
 */
/**
 * Used by LlmProcessor.filterResponse: instructs the local LLM to strip
 * noise from an MCP tool response while keeping essential data and emitting
 * valid JSON only.
 */
export const RESPONSE_FILTER_SYSTEM_PROMPT = `You are a data filtering assistant. Your job is to extract only the relevant information from MCP tool responses.
Rules:
- Remove redundant or verbose fields that aren't useful to the user's query
- Keep essential identifiers, names, statuses, and key metrics
- Preserve error messages and warnings in full
- If the response is already concise, return it unchanged
- Output valid JSON only, no markdown or explanations
- If you cannot parse the input, return it unchanged`;
/**
 * Used by LlmProcessor.preprocessRequest: instructs the local LLM to tighten
 * tool-call parameters without changing the request's intent.
 */
export const REQUEST_OPTIMIZATION_SYSTEM_PROMPT = `You are a query optimization assistant. Your job is to optimize MCP tool call parameters.
Rules:
- Add appropriate filters or limits if the query is too broad
- Keep the original intent of the request
- Output valid JSON with the optimized parameters only, no markdown or explanations
- If no optimization is needed, return the original parameters unchanged`;

View File

@@ -0,0 +1,18 @@
/**
* Simple token estimation utility.
*
* Uses a heuristic of ~4 characters per token, which is a reasonable
* approximation for English text and JSON payloads. For more accurate
* counting, a tokenizer like tiktoken could be used instead.
*/
/**
 * Estimate the number of tokens in a text string.
 *
 * Heuristic: roughly 4 characters per token (a reasonable approximation for
 * English text and JSON payloads); rounds up, so any non-empty text counts
 * as at least one token.
 *
 * @param text - The input text to estimate tokens for.
 * @returns Estimated token count (minimum 0).
 */
export function estimateTokens(text: string): number {
  const CHARS_PER_TOKEN = 4;
  return text.length > 0 ? Math.ceil(text.length / CHARS_PER_TOKEN) : 0;
}

150
src/mcplocal/src/main.ts Normal file
View File

@@ -0,0 +1,150 @@
#!/usr/bin/env node
import { readFileSync } from 'node:fs';
import type { FastifyInstance } from 'fastify';
import type { ProxyConfig, UpstreamConfig } from './types.js';
import { McpRouter } from './router.js';
import { StdioProxyServer } from './server.js';
import { StdioUpstream } from './upstream/stdio.js';
import { HttpUpstream } from './upstream/http.js';
import { createHttpServer } from './http/server.js';
import { loadHttpConfig } from './http/config.js';
import type { HttpConfig } from './http/config.js';
interface ParsedArgs {
  configPath: string | undefined;
  upstreams: string[];
  noHttp: boolean;
}

/**
 * Parse CLI arguments for the proxy.
 *
 * Recognized flags: `--config <path>` / `--config=<path>`, repeatable
 * `--upstream <spec>` / `--upstream=<spec>`, and `--no-http`.
 * Unrecognized arguments are ignored. Scanning starts at index 2
 * (skipping the node binary and script path).
 */
function parseArgs(argv: string[]): ParsedArgs {
  const parsed: ParsedArgs = { configPath: undefined, upstreams: [], noHttp: false };
  let i = 2;
  while (i < argv.length) {
    const arg = argv[i];
    if (arg === '--config' && i + 1 < argv.length) {
      i += 1;
      parsed.configPath = argv[i];
    } else if (arg?.startsWith('--config=')) {
      parsed.configPath = arg.slice('--config='.length);
    } else if (arg === '--upstream' && i + 1 < argv.length) {
      i += 1;
      parsed.upstreams.push(argv[i]!);
    } else if (arg?.startsWith('--upstream=')) {
      parsed.upstreams.push(arg.slice('--upstream='.length));
    } else if (arg === '--no-http') {
      parsed.noHttp = true;
    }
    i += 1;
  }
  return parsed;
}
/**
 * Read and parse a JSON proxy configuration file from disk.
 * Throws on missing files or malformed JSON.
 */
function loadConfig(configPath: string): ProxyConfig {
  const contents = readFileSync(configPath, 'utf-8');
  return JSON.parse(contents) as ProxyConfig;
}
/** Instantiate the upstream implementation matching the configured transport. */
function createUpstream(config: UpstreamConfig) {
  return config.transport === 'stdio'
    ? new StdioUpstream(config)
    : new HttpUpstream(config);
}
// Handles returned by main(), primarily so tests can drive the servers.
export interface MainResult {
router: McpRouter;
server: StdioProxyServer;
httpServer: FastifyInstance | undefined;
httpConfig: HttpConfig;
}
/**
 * Process entry point.
 *
 * Builds the upstream list from an optional --config file plus any
 * --upstream flags, starts STDIO upstreams, wires everything into a
 * McpRouter, starts the stdio proxy server and (unless --no-http) the
 * HTTP server, and installs SIGINT/SIGTERM handlers for graceful shutdown.
 *
 * @param argv - Raw process argument vector (defaults to process.argv).
 * @returns Handles to the router, servers, and effective HTTP config.
 */
export async function main(argv: string[] = process.argv): Promise<MainResult> {
const args = parseArgs(argv);
const httpConfig = loadHttpConfig();
let upstreamConfigs: UpstreamConfig[] = [];
if (args.configPath) {
const config = loadConfig(args.configPath);
upstreamConfigs = config.upstreams;
}
// --upstream flags: "name:command arg1 arg2" for STDIO or "name:http://url" for HTTP
for (const spec of args.upstreams) {
const colonIdx = spec.indexOf(':');
// Specs without a "name:" separator are silently ignored.
if (colonIdx === -1) continue;
const name = spec.slice(0, colonIdx);
const rest = spec.slice(colonIdx + 1);
if (rest.startsWith('http://') || rest.startsWith('https://')) {
upstreamConfigs.push({
serverId: name,
name,
transport: 'streamable-http',
url: rest,
});
} else {
// Remainder is treated as a command line, split on single spaces
// (no quoting support).
const parts = rest.split(' ').filter(Boolean);
const command = parts[0];
if (!command) continue;
const config: UpstreamConfig = {
serverId: name,
name,
transport: 'stdio',
command,
args: parts.slice(1),
};
upstreamConfigs.push(config);
}
}
const router = new McpRouter();
for (const config of upstreamConfigs) {
const upstream = createUpstream(config);
// Only STDIO upstreams are started eagerly here. NOTE(review): HTTP
// upstreams presumably connect on demand — confirm in HttpUpstream.
if (upstream instanceof StdioUpstream) {
await upstream.start();
}
router.addUpstream(upstream);
}
// Start stdio proxy server
const server = new StdioProxyServer(router);
server.start();
process.stderr.write(`mcpctl-proxy started with ${upstreamConfigs.length} upstream(s)\n`);
// Start HTTP server unless disabled
let httpServer: FastifyInstance | undefined;
if (!args.noHttp) {
httpServer = await createHttpServer(httpConfig, { router });
await httpServer.listen({ port: httpConfig.httpPort, host: httpConfig.httpHost });
process.stderr.write(`mcpctl-proxy HTTP server listening on ${httpConfig.httpHost}:${httpConfig.httpPort}\n`);
}
// Graceful shutdown
// The guard makes a second signal a no-op while shutdown is in flight.
let shuttingDown = false;
const shutdown = async () => {
if (shuttingDown) return;
shuttingDown = true;
server.stop();
if (httpServer) {
await httpServer.close();
}
await router.closeAll();
process.exit(0);
};
process.on('SIGTERM', () => void shutdown());
process.on('SIGINT', () => void shutdown());
return { router, server, httpServer, httpConfig };
}
// Run when executed directly
// Heuristic: the entry script ends in main.js (compiled) or main.ts (tsx).
// NOTE(review): this also matches any other script named main.js — confirm
// that is acceptable for this package layout.
const isMain = process.argv[1]?.endsWith('main.js') || process.argv[1]?.endsWith('main.ts');
if (isMain) {
main().catch((err) => {
process.stderr.write(`Fatal: ${err}\n`);
process.exit(1);
});
}

View File

@@ -0,0 +1,179 @@
import https from 'node:https';
import type { LlmProvider, CompletionOptions, CompletionResult, ChatMessage, ToolCall } from './types.js';
export interface AnthropicConfig {
  apiKey: string;
  defaultModel?: string;
}

/**
 * Anthropic Claude provider using the Messages API.
 *
 * Talks to api.anthropic.com directly over node:https; no SDK dependency.
 */
export class AnthropicProvider implements LlmProvider {
  readonly name = 'anthropic';
  private apiKey: string;
  private defaultModel: string;

  constructor(config: AnthropicConfig) {
    this.apiKey = config.apiKey;
    this.defaultModel = config.defaultModel ?? 'claude-sonnet-4-20250514';
  }

  /**
   * Create a chat completion via POST /v1/messages.
   * System messages are lifted into the top-level `system` field, as the
   * Anthropic API requires.
   */
  async complete(options: CompletionOptions): Promise<CompletionResult> {
    const model = options.model ?? this.defaultModel;
    // Separate system message from conversation
    const systemMessages = options.messages.filter((m) => m.role === 'system');
    const conversationMessages = options.messages.filter((m) => m.role !== 'system');
    const body: Record<string, unknown> = {
      model,
      max_tokens: options.maxTokens ?? 4096,
      messages: conversationMessages.map(toAnthropicMessage),
    };
    if (systemMessages.length > 0) {
      body.system = systemMessages.map((m) => m.content).join('\n');
    }
    if (options.temperature !== undefined) body.temperature = options.temperature;
    if (options.tools && options.tools.length > 0) {
      body.tools = options.tools.map((t) => ({
        name: t.name,
        description: t.description,
        input_schema: t.inputSchema,
      }));
    }
    const response = await this.request(body);
    return parseAnthropicResponse(response);
  }

  async listModels(): Promise<string[]> {
    // Anthropic doesn't have a models listing endpoint; return known models
    return [
      'claude-opus-4-20250514',
      'claude-sonnet-4-20250514',
      'claude-haiku-3-5-20241022',
    ];
  }

  /** True when the API accepts a minimal 1-token probe request. */
  async isAvailable(): Promise<boolean> {
    try {
      // Send a minimal request to check API key
      await this.complete({
        messages: [{ role: 'user', content: 'hi' }],
        maxTokens: 1,
      });
      return true;
    } catch {
      return false;
    }
  }

  /**
   * POST `body` to /v1/messages and resolve the parsed JSON.
   * Rejects on network errors, timeouts, non-2xx status codes, and
   * unparseable bodies.
   */
  private request(body: unknown): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const payload = JSON.stringify(body);
      const opts = {
        hostname: 'api.anthropic.com',
        port: 443,
        path: '/v1/messages',
        method: 'POST',
        timeout: 120000,
        headers: {
          'x-api-key': this.apiKey,
          'anthropic-version': '2023-06-01',
          'Content-Type': 'application/json',
          'Content-Length': Buffer.byteLength(payload),
        },
      };
      const req = https.request(opts, (res) => {
        const chunks: Buffer[] = [];
        res.on('data', (chunk: Buffer) => chunks.push(chunk));
        res.on('end', () => {
          const raw = Buffer.concat(chunks).toString('utf-8');
          // Fix: reject API error payloads instead of resolving them.
          // Previously an invalid key's error JSON parsed as an empty
          // completion, so isAvailable() reported true for bad credentials.
          const status = res.statusCode ?? 0;
          if (status < 200 || status >= 300) {
            reject(new Error(`Anthropic API error ${status}: ${raw.slice(0, 200)}`));
            return;
          }
          try {
            resolve(JSON.parse(raw));
          } catch {
            reject(new Error(`Invalid JSON response: ${raw.slice(0, 200)}`));
          }
        });
      });
      req.on('error', reject);
      req.on('timeout', () => {
        req.destroy();
        reject(new Error('Request timed out'));
      });
      req.write(payload);
      req.end();
    });
  }
}
/**
 * Convert a provider-neutral chat message into Anthropic's message shape.
 * Tool results become a user message carrying a tool_result content block;
 * every other role maps onto user/assistant.
 */
function toAnthropicMessage(msg: ChatMessage): Record<string, unknown> {
  if (msg.role === 'tool') {
    const toolResultBlock = {
      type: 'tool_result',
      tool_use_id: msg.toolCallId,
      content: msg.content,
    };
    return { role: 'user', content: [toolResultBlock] };
  }
  const role = msg.role === 'user' ? 'user' : 'assistant';
  return { role, content: msg.content };
}
/**
 * Normalize a raw Anthropic Messages API response into a CompletionResult.
 * Text blocks are concatenated in order; tool_use blocks become ToolCall
 * entries. Unknown stop reasons map to 'stop'.
 */
function parseAnthropicResponse(raw: unknown): CompletionResult {
  const data = raw as {
    content?: Array<{
      type: string;
      text?: string;
      id?: string;
      name?: string;
      input?: Record<string, unknown>;
    }>;
    stop_reason?: string;
    usage?: { input_tokens?: number; output_tokens?: number };
  };

  const textParts: string[] = [];
  const toolCalls: ToolCall[] = [];
  for (const block of data.content ?? []) {
    if (block.type === 'text' && block.text) {
      textParts.push(block.text);
    } else if (block.type === 'tool_use' && block.id && block.name) {
      toolCalls.push({ id: block.id, name: block.name, arguments: block.input ?? {} });
    }
  }

  const promptTokens = data.usage?.input_tokens ?? 0;
  const completionTokens = data.usage?.output_tokens ?? 0;

  // Map Anthropic's stop reasons onto the shared finishReason vocabulary.
  let finishReason: 'stop' | 'tool_calls' | 'length';
  switch (data.stop_reason) {
    case 'tool_use':
      finishReason = 'tool_calls';
      break;
    case 'max_tokens':
      finishReason = 'length';
      break;
    default:
      finishReason = 'stop';
  }

  return {
    content: textParts.join(''),
    toolCalls,
    usage: { promptTokens, completionTokens, totalTokens: promptTokens + completionTokens },
    finishReason,
  };
}

View File

@@ -0,0 +1,191 @@
import https from 'node:https';
import type { LlmProvider, CompletionOptions, CompletionResult, ChatMessage, ToolCall } from './types.js';
export interface DeepSeekConfig {
  apiKey: string;
  baseUrl?: string;
  defaultModel?: string;
}

interface DeepSeekMessage {
  role: string;
  content: string | null;
  tool_calls?: Array<{
    id: string;
    type: 'function';
    function: { name: string; arguments: string };
  }>;
  tool_call_id?: string;
  name?: string;
}

/**
 * DeepSeek provider using the OpenAI-compatible chat completions API.
 * Endpoint: https://api.deepseek.com/v1/chat/completions
 */
export class DeepSeekProvider implements LlmProvider {
  readonly name = 'deepseek';
  private apiKey: string;
  private baseUrl: string;
  private defaultModel: string;

  constructor(config: DeepSeekConfig) {
    this.apiKey = config.apiKey;
    // Strip one trailing slash so URL joins stay predictable.
    this.baseUrl = (config.baseUrl ?? 'https://api.deepseek.com').replace(/\/$/, '');
    this.defaultModel = config.defaultModel ?? 'deepseek-chat';
  }

  /** Create a chat completion via POST /v1/chat/completions. */
  async complete(options: CompletionOptions): Promise<CompletionResult> {
    const model = options.model ?? this.defaultModel;
    const body: Record<string, unknown> = {
      model,
      messages: options.messages.map(toDeepSeekMessage),
    };
    if (options.temperature !== undefined) body.temperature = options.temperature;
    if (options.maxTokens !== undefined) body.max_tokens = options.maxTokens;
    if (options.tools && options.tools.length > 0) {
      body.tools = options.tools.map((t) => ({
        type: 'function',
        function: {
          name: t.name,
          description: t.description,
          parameters: t.inputSchema,
        },
      }));
    }
    const response = await this.request('/v1/chat/completions', body);
    return parseResponse(response);
  }

  async listModels(): Promise<string[]> {
    // DeepSeek doesn't have a public models listing endpoint;
    // return well-known models.
    return [
      'deepseek-chat',
      'deepseek-reasoner',
    ];
  }

  /** False immediately when no key is configured; otherwise probes with a 1-token request. */
  async isAvailable(): Promise<boolean> {
    if (!this.apiKey) return false;
    try {
      // Send a minimal request to verify the API key
      await this.complete({
        messages: [{ role: 'user', content: 'hi' }],
        maxTokens: 1,
      });
      return true;
    } catch {
      return false;
    }
  }

  /**
   * Issue an HTTPS request against the configured base URL and resolve the
   * parsed JSON body. Rejects on network errors, timeouts, rate limiting
   * (429, with Retry-After surfaced), any other non-2xx status, and
   * unparseable bodies.
   */
  private request(path: string, body: unknown, method = 'POST'): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const url = new URL(path, this.baseUrl);
      const payload = body !== undefined ? JSON.stringify(body) : undefined;
      const opts = {
        hostname: url.hostname,
        port: url.port || 443,
        // Fix: keep the query string (url.search) instead of dropping it.
        path: url.pathname + url.search,
        method,
        timeout: 120000,
        headers: {
          'Authorization': `Bearer ${this.apiKey}`,
          'Content-Type': 'application/json',
          ...(payload ? { 'Content-Length': Buffer.byteLength(payload) } : {}),
        },
      };
      const req = https.request(opts, (res) => {
        const chunks: Buffer[] = [];
        res.on('data', (chunk: Buffer) => chunks.push(chunk));
        res.on('end', () => {
          const raw = Buffer.concat(chunks).toString('utf-8');
          const status = res.statusCode ?? 0;
          // Handle rate limiting
          if (status === 429) {
            const retryAfter = res.headers['retry-after'];
            reject(new Error(`DeepSeek rate limit exceeded${retryAfter ? `. Retry after ${retryAfter}s` : ''}`));
            return;
          }
          // Fix: reject all other non-2xx responses too; previously error
          // payloads (e.g. invalid key) resolved as empty completions, so
          // isAvailable() could report true for bad credentials.
          if (status < 200 || status >= 300) {
            reject(new Error(`DeepSeek API error ${status} from ${path}: ${raw.slice(0, 200)}`));
            return;
          }
          try {
            resolve(JSON.parse(raw));
          } catch {
            reject(new Error(`Invalid JSON response from DeepSeek ${path}: ${raw.slice(0, 200)}`));
          }
        });
      });
      req.on('error', reject);
      req.on('timeout', () => {
        req.destroy();
        reject(new Error('DeepSeek request timed out'));
      });
      if (payload) req.write(payload);
      req.end();
    });
  }
}
/** Map a provider-neutral chat message onto DeepSeek's OpenAI-style wire shape. */
function toDeepSeekMessage(msg: ChatMessage): DeepSeekMessage {
  const wireMessage: DeepSeekMessage = { role: msg.role, content: msg.content };
  if (msg.toolCallId !== undefined) {
    wireMessage.tool_call_id = msg.toolCallId;
  }
  if (msg.name !== undefined) {
    wireMessage.name = msg.name;
  }
  return wireMessage;
}
/**
 * Normalize a raw OpenAI-compatible chat completion payload into a
 * CompletionResult. Only the first choice is considered; malformed
 * tool-call argument JSON degrades to an empty argument object.
 */
function parseResponse(raw: unknown): CompletionResult {
  const data = raw as {
    choices?: Array<{
      message?: {
        content?: string | null;
        tool_calls?: Array<{ id: string; function: { name: string; arguments: string } }>;
      };
      finish_reason?: string;
    }>;
    usage?: { prompt_tokens?: number; completion_tokens?: number; total_tokens?: number };
  };

  const firstChoice = data.choices?.[0];
  const toolCalls: ToolCall[] = (firstChoice?.message?.tool_calls ?? []).map((call) => ({
    id: call.id,
    name: call.function.name,
    arguments: safeParse(call.function.arguments),
  }));

  // Map the upstream finish_reason onto the shared vocabulary.
  let finishReason: 'stop' | 'tool_calls' | 'length';
  switch (firstChoice?.finish_reason) {
    case 'tool_calls':
      finishReason = 'tool_calls';
      break;
    case 'length':
      finishReason = 'length';
      break;
    default:
      finishReason = 'stop';
  }

  return {
    content: firstChoice?.message?.content ?? '',
    toolCalls,
    usage: {
      promptTokens: data.usage?.prompt_tokens ?? 0,
      completionTokens: data.usage?.completion_tokens ?? 0,
      totalTokens: data.usage?.total_tokens ?? 0,
    },
    finishReason,
  };
}

/** Parse a JSON object string, falling back to {} on invalid input. */
function safeParse(json: string): Record<string, unknown> {
  try {
    return JSON.parse(json) as Record<string, unknown>;
  } catch {
    return {};
  }
}

View File

@@ -0,0 +1,113 @@
import { spawn } from 'node:child_process';
import { execFile } from 'node:child_process';
import { promisify } from 'node:util';
import type { LlmProvider, CompletionOptions, CompletionResult } from './types.js';
const execFileAsync = promisify(execFile);

export interface GeminiCliConfig {
  binaryPath?: string;
  defaultModel?: string;
  timeoutMs?: number;
}

/**
 * Gemini CLI provider. Spawns the `gemini` binary in non-interactive mode
 * using the -p (--prompt) flag and captures stdout.
 *
 * Note: This provider does not support tool calls since the CLI interface
 * only returns text output. toolCalls will always be an empty array, and
 * token usage is reported as zeros.
 */
export class GeminiCliProvider implements LlmProvider {
  readonly name = 'gemini-cli';
  private binaryPath: string;
  private defaultModel: string;
  private timeoutMs: number;

  constructor(config?: GeminiCliConfig) {
    this.binaryPath = config?.binaryPath ?? 'gemini';
    this.defaultModel = config?.defaultModel ?? 'gemini-2.5-flash';
    this.timeoutMs = config?.timeoutMs ?? 30000;
  }

  /**
   * Run one completion. All messages are flattened into a single text
   * prompt ("System:"/"Assistant:" prefixes); temperature and tools are
   * not representable in the CLI interface and are ignored.
   */
  async complete(options: CompletionOptions): Promise<CompletionResult> {
    const model = options.model ?? this.defaultModel;
    // Build prompt from messages
    const prompt = options.messages
      .map((m) => {
        if (m.role === 'system') return `System: ${m.content}`;
        if (m.role === 'user') return m.content;
        if (m.role === 'assistant') return `Assistant: ${m.content}`;
        return m.content;
      })
      .join('\n\n');
    const args = ['-p', prompt, '-m', model, '-o', 'text'];
    const content = await this.spawn(args);
    return {
      content: content.trim(),
      toolCalls: [],
      usage: {
        promptTokens: 0,
        completionTokens: 0,
        totalTokens: 0,
      },
      finishReason: 'stop',
    };
  }

  async listModels(): Promise<string[]> {
    // The Gemini CLI does not expose a model listing command;
    // return well-known models.
    return [
      'gemini-2.5-flash',
      'gemini-2.5-pro',
      'gemini-2.0-flash',
    ];
  }

  /** True when the configured binary exists and answers --version. */
  async isAvailable(): Promise<boolean> {
    try {
      await execFileAsync(this.binaryPath, ['--version'], { timeout: 5000 });
      return true;
    } catch {
      return false;
    }
  }

  /**
   * Spawn the CLI and collect stdout. Rejects with an actionable message
   * when the binary is missing (ENOENT), when the child was killed by a
   * signal (including the spawn timeout), or when it exits non-zero.
   */
  private spawn(args: string[]): Promise<string> {
    return new Promise((resolve, reject) => {
      const child = spawn(this.binaryPath, args, {
        stdio: ['ignore', 'pipe', 'pipe'],
        timeout: this.timeoutMs,
      });
      const stdoutChunks: Buffer[] = [];
      const stderrChunks: Buffer[] = [];
      child.stdout.on('data', (chunk: Buffer) => stdoutChunks.push(chunk));
      child.stderr.on('data', (chunk: Buffer) => stderrChunks.push(chunk));
      child.on('error', (err) => {
        if ((err as NodeJS.ErrnoException).code === 'ENOENT') {
          reject(new Error(`Gemini CLI binary not found at '${this.binaryPath}'. Install with: npm install -g @google/gemini-cli`));
        } else {
          reject(err);
        }
      });
      child.on('close', (code, signal) => {
        if (code === 0) {
          resolve(Buffer.concat(stdoutChunks).toString('utf-8'));
          return;
        }
        const stderr = Buffer.concat(stderrChunks).toString('utf-8');
        // Fix: when the spawn timeout fires, node kills the child with a
        // signal and `code` is null; the old message ("exited with code
        // null") hid the real cause.
        if (signal) {
          reject(new Error(`Gemini CLI was killed by ${signal} (timeout is ${this.timeoutMs}ms): ${stderr.slice(0, 500)}`));
        } else {
          reject(new Error(`Gemini CLI exited with code ${code}: ${stderr.slice(0, 500)}`));
        }
      });
    });
  }
}

View File

@@ -0,0 +1,12 @@
// Barrel module for the LLM layer: re-exports the shared provider
// interfaces, every provider implementation with its config type, and the
// runtime ProviderRegistry.
export type { LlmProvider, CompletionOptions, CompletionResult, ChatMessage, ToolDefinition, ToolCall } from './types.js';
export { OpenAiProvider } from './openai.js';
export type { OpenAiConfig } from './openai.js';
export { AnthropicProvider } from './anthropic.js';
export type { AnthropicConfig } from './anthropic.js';
export { OllamaProvider } from './ollama.js';
export type { OllamaConfig } from './ollama.js';
export { GeminiCliProvider } from './gemini-cli.js';
export type { GeminiCliConfig } from './gemini-cli.js';
export { DeepSeekProvider } from './deepseek.js';
export type { DeepSeekConfig } from './deepseek.js';
export { ProviderRegistry } from './registry.js';

View File

@@ -0,0 +1,138 @@
import http from 'node:http';
import type { LlmProvider, CompletionOptions, CompletionResult, ToolCall } from './types.js';
export interface OllamaConfig {
  baseUrl?: string;
  defaultModel?: string;
}

/**
 * Ollama provider for local model inference.
 * Uses the Ollama HTTP API at /api/chat.
 */
export class OllamaProvider implements LlmProvider {
  readonly name = 'ollama';
  private baseUrl: string;
  private defaultModel: string;

  constructor(config?: OllamaConfig) {
    // Strip one trailing slash so URL joins stay predictable.
    this.baseUrl = (config?.baseUrl ?? 'http://localhost:11434').replace(/\/$/, '');
    this.defaultModel = config?.defaultModel ?? 'llama3.2';
  }

  /** Create a chat completion via POST /api/chat (non-streaming). */
  async complete(options: CompletionOptions): Promise<CompletionResult> {
    const model = options.model ?? this.defaultModel;
    const body: Record<string, unknown> = {
      model,
      messages: options.messages.map((m) => ({
        // NOTE(review): 'tool' results are downgraded to 'assistant' here;
        // confirm whether the targeted Ollama version accepts role 'tool'.
        role: m.role === 'tool' ? 'assistant' : m.role,
        content: m.content,
      })),
      stream: false,
    };
    // Map generation settings onto Ollama's runtime options.
    // Fix: maxTokens was previously ignored; Ollama exposes it as num_predict.
    const runtimeOptions: Record<string, unknown> = {};
    if (options.temperature !== undefined) runtimeOptions.temperature = options.temperature;
    if (options.maxTokens !== undefined) runtimeOptions.num_predict = options.maxTokens;
    if (Object.keys(runtimeOptions).length > 0) {
      body.options = runtimeOptions;
    }
    if (options.tools && options.tools.length > 0) {
      body.tools = options.tools.map((t) => ({
        type: 'function',
        function: {
          name: t.name,
          description: t.description,
          parameters: t.inputSchema,
        },
      }));
    }
    const response = await this.request('/api/chat', body);
    return parseOllamaResponse(response);
  }

  /** Names of locally installed models, from GET /api/tags. */
  async listModels(): Promise<string[]> {
    const response = await this.request('/api/tags', undefined, 'GET') as {
      models?: Array<{ name: string }>;
    };
    return (response.models ?? []).map((m) => m.name);
  }

  /** True when the Ollama daemon answers /api/tags. */
  async isAvailable(): Promise<boolean> {
    try {
      await this.listModels();
      return true;
    } catch {
      return false;
    }
  }

  /**
   * Issue an HTTP request against the Ollama daemon and resolve the parsed
   * JSON body. Rejects on network errors, timeouts, non-2xx statuses, and
   * unparseable bodies.
   */
  private request(path: string, body?: unknown, method = 'POST'): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const url = new URL(path, this.baseUrl);
      const payload = body !== undefined ? JSON.stringify(body) : undefined;
      const opts: http.RequestOptions = {
        hostname: url.hostname,
        port: url.port || 11434,
        // Fix: keep the query string (url.search) instead of dropping it.
        path: url.pathname + url.search,
        method,
        timeout: 300000, // Ollama can be slow on first inference
        headers: {
          'Content-Type': 'application/json',
          ...(payload ? { 'Content-Length': Buffer.byteLength(payload) } : {}),
        },
      };
      const req = http.request(opts, (res) => {
        const chunks: Buffer[] = [];
        res.on('data', (chunk: Buffer) => chunks.push(chunk));
        res.on('end', () => {
          const raw = Buffer.concat(chunks).toString('utf-8');
          const status = res.statusCode ?? 0;
          // Fix: reject non-2xx responses; previously error payloads were
          // parsed as if they were successful completions.
          if (status < 200 || status >= 300) {
            reject(new Error(`Ollama API error ${status} from ${path}: ${raw.slice(0, 200)}`));
            return;
          }
          try {
            resolve(JSON.parse(raw));
          } catch {
            reject(new Error(`Invalid JSON from Ollama: ${raw.slice(0, 200)}`));
          }
        });
      });
      req.on('error', reject);
      req.on('timeout', () => {
        req.destroy();
        reject(new Error('Ollama request timed out'));
      });
      if (payload) req.write(payload);
      req.end();
    });
  }
}
/**
 * Normalize a raw Ollama /api/chat response into a CompletionResult.
 * Ollama does not assign tool-call ids, so synthetic `ollama-<index>` ids
 * are generated.
 */
function parseOllamaResponse(raw: unknown): CompletionResult {
  const data = raw as {
    message?: {
      content?: string;
      tool_calls?: Array<{ function: { name: string; arguments: Record<string, unknown> } }>;
    };
    prompt_eval_count?: number;
    eval_count?: number;
  };

  const toolCalls: ToolCall[] = (data.message?.tool_calls ?? []).map((call, index) => ({
    id: `ollama-${index}`,
    name: call.function.name,
    arguments: call.function.arguments,
  }));

  const promptTokens = data.prompt_eval_count ?? 0;
  const completionTokens = data.eval_count ?? 0;
  const finishReason = toolCalls.length > 0 ? 'tool_calls' as const : 'stop' as const;

  return {
    content: data.message?.content ?? '',
    toolCalls,
    usage: { promptTokens, completionTokens, totalTokens: promptTokens + completionTokens },
    finishReason,
  };
}

View File

@@ -0,0 +1,179 @@
import https from 'node:https';
import http from 'node:http';
import type { LlmProvider, CompletionOptions, CompletionResult, ChatMessage, ToolCall } from './types.js';
export interface OpenAiConfig {
  apiKey: string;
  baseUrl?: string;
  defaultModel?: string;
}

interface OpenAiMessage {
  role: string;
  content: string | null;
  tool_calls?: Array<{
    id: string;
    type: 'function';
    function: { name: string; arguments: string };
  }>;
  tool_call_id?: string;
  name?: string;
}

/**
 * OpenAI-compatible provider. Works with OpenAI API, Azure OpenAI,
 * and any service with an OpenAI-compatible chat completions endpoint.
 */
export class OpenAiProvider implements LlmProvider {
  readonly name = 'openai';
  private apiKey: string;
  private baseUrl: string;
  private defaultModel: string;

  constructor(config: OpenAiConfig) {
    this.apiKey = config.apiKey;
    // Strip one trailing slash so URL joins stay predictable.
    this.baseUrl = (config.baseUrl ?? 'https://api.openai.com').replace(/\/$/, '');
    this.defaultModel = config.defaultModel ?? 'gpt-4o';
  }

  /** Create a chat completion via POST /v1/chat/completions. */
  async complete(options: CompletionOptions): Promise<CompletionResult> {
    const model = options.model ?? this.defaultModel;
    const body: Record<string, unknown> = {
      model,
      messages: options.messages.map(toOpenAiMessage),
    };
    if (options.temperature !== undefined) body.temperature = options.temperature;
    if (options.maxTokens !== undefined) body.max_tokens = options.maxTokens;
    if (options.tools && options.tools.length > 0) {
      body.tools = options.tools.map((t) => ({
        type: 'function',
        function: {
          name: t.name,
          description: t.description,
          parameters: t.inputSchema,
        },
      }));
    }
    const response = await this.request('/v1/chat/completions', body);
    return parseResponse(response);
  }

  /** Model ids from GET /v1/models. */
  async listModels(): Promise<string[]> {
    const response = await this.request('/v1/models', undefined, 'GET');
    const data = response as { data?: Array<{ id: string }> };
    return (data.data ?? []).map((m) => m.id);
  }

  /** True when the endpoint is reachable and the key can list models. */
  async isAvailable(): Promise<boolean> {
    try {
      await this.listModels();
      return true;
    } catch {
      return false;
    }
  }

  /**
   * Issue an HTTP(S) request against the configured base URL and resolve
   * the parsed JSON body. Rejects on network errors, timeouts, non-2xx
   * statuses, and unparseable bodies.
   */
  private request(path: string, body: unknown, method = 'POST'): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const url = new URL(path, this.baseUrl);
      const isHttps = url.protocol === 'https:';
      const transport = isHttps ? https : http;
      const payload = body !== undefined ? JSON.stringify(body) : undefined;
      const opts = {
        hostname: url.hostname,
        port: url.port || (isHttps ? 443 : 80),
        // Fix: keep the query string (url.search) instead of dropping it.
        path: url.pathname + url.search,
        method,
        timeout: 120000,
        headers: {
          'Authorization': `Bearer ${this.apiKey}`,
          'Content-Type': 'application/json',
          ...(payload ? { 'Content-Length': Buffer.byteLength(payload) } : {}),
        },
      };
      const req = transport.request(opts, (res) => {
        const chunks: Buffer[] = [];
        res.on('data', (chunk: Buffer) => chunks.push(chunk));
        res.on('end', () => {
          const raw = Buffer.concat(chunks).toString('utf-8');
          const status = res.statusCode ?? 0;
          // Fix: reject non-2xx responses; previously error payloads (e.g.
          // invalid key) resolved and parsed as empty completions, so
          // failures were silently swallowed.
          if (status < 200 || status >= 300) {
            reject(new Error(`API error ${status} from ${path}: ${raw.slice(0, 200)}`));
            return;
          }
          try {
            resolve(JSON.parse(raw));
          } catch {
            reject(new Error(`Invalid JSON response from ${path}: ${raw.slice(0, 200)}`));
          }
        });
      });
      req.on('error', reject);
      req.on('timeout', () => {
        req.destroy();
        reject(new Error('Request timed out'));
      });
      if (payload) req.write(payload);
      req.end();
    });
  }
}
/** Map a provider-neutral chat message onto the OpenAI wire shape. */
function toOpenAiMessage(msg: ChatMessage): OpenAiMessage {
  const wireMessage: OpenAiMessage = { role: msg.role, content: msg.content };
  if (msg.toolCallId !== undefined) {
    wireMessage.tool_call_id = msg.toolCallId;
  }
  if (msg.name !== undefined) {
    wireMessage.name = msg.name;
  }
  return wireMessage;
}
/**
 * Normalize a raw OpenAI chat completion payload into a CompletionResult.
 * Only the first choice is considered; malformed tool-call argument JSON
 * degrades to an empty argument object.
 */
function parseResponse(raw: unknown): CompletionResult {
  const data = raw as {
    choices?: Array<{
      message?: {
        content?: string | null;
        tool_calls?: Array<{ id: string; function: { name: string; arguments: string } }>;
      };
      finish_reason?: string;
    }>;
    usage?: { prompt_tokens?: number; completion_tokens?: number; total_tokens?: number };
  };

  const firstChoice = data.choices?.[0];
  const toolCalls: ToolCall[] = (firstChoice?.message?.tool_calls ?? []).map((call) => ({
    id: call.id,
    name: call.function.name,
    arguments: safeParse(call.function.arguments),
  }));

  // Map the upstream finish_reason onto the shared vocabulary.
  let finishReason: 'stop' | 'tool_calls' | 'length';
  switch (firstChoice?.finish_reason) {
    case 'tool_calls':
      finishReason = 'tool_calls';
      break;
    case 'length':
      finishReason = 'length';
      break;
    default:
      finishReason = 'stop';
  }

  return {
    content: firstChoice?.message?.content ?? '',
    toolCalls,
    usage: {
      promptTokens: data.usage?.prompt_tokens ?? 0,
      completionTokens: data.usage?.completion_tokens ?? 0,
      totalTokens: data.usage?.total_tokens ?? 0,
    },
    finishReason,
  };
}

/** Parse a JSON object string, falling back to {} on invalid input. */
function safeParse(json: string): Record<string, unknown> {
  try {
    return JSON.parse(json) as Record<string, unknown>;
  } catch {
    return {};
  }
}

View File

@@ -0,0 +1,48 @@
import type { LlmProvider } from './types.js';
/**
 * Registry for LLM providers. Supports switching the active provider at
 * runtime; the first provider registered automatically becomes active.
 */
export class ProviderRegistry {
  private providers = new Map<string, LlmProvider>();
  private activeProvider: string | null = null;

  /** Add a provider; the first one registered becomes active. */
  register(provider: LlmProvider): void {
    this.providers.set(provider.name, provider);
    this.activeProvider ??= provider.name;
  }

  /** Remove a provider; if it was active, fall back to any remaining one. */
  unregister(name: string): void {
    this.providers.delete(name);
    if (this.activeProvider !== name) {
      return;
    }
    const remaining = this.providers.keys().next();
    this.activeProvider = remaining.done ? null : remaining.value;
  }

  /** Make a registered provider the active one. Throws if unknown. */
  setActive(name: string): void {
    if (!this.providers.has(name)) {
      throw new Error(`Provider '${name}' is not registered`);
    }
    this.activeProvider = name;
  }

  /** The currently active provider, or null when none is registered. */
  getActive(): LlmProvider | null {
    return this.activeProvider === null
      ? null
      : this.providers.get(this.activeProvider) ?? null;
  }

  /** Look up a provider by name. */
  get(name: string): LlmProvider | undefined {
    return this.providers.get(name);
  }

  /** Names of all registered providers, in registration order. */
  list(): string[] {
    return Array.from(this.providers.keys());
  }

  /** Name of the active provider, or null. */
  getActiveName(): string | null {
    return this.activeProvider;
  }
}

View File

@@ -0,0 +1,56 @@
/**
* LLM Provider abstraction for the local proxy.
*
* When the proxy intercepts tool calls, it can optionally route them
* through an LLM provider for autonomous tool use, summarization,
* or other processing before forwarding to the upstream MCP server.
*/
/** One turn in a chat conversation, provider-neutral. */
export interface ChatMessage {
role: 'system' | 'user' | 'assistant' | 'tool';
content: string;
// For role 'tool': id of the tool call this message responds to.
toolCallId?: string;
// Optional function/tool name associated with the message.
name?: string;
}
/** A tool the model may call, described with a JSON-schema input shape. */
export interface ToolDefinition {
name: string;
description: string;
inputSchema: Record<string, unknown>;
}
/** A tool invocation requested by the model. */
export interface ToolCall {
id: string;
name: string;
arguments: Record<string, unknown>;
}
/** Normalized completion result shared across all providers. */
export interface CompletionResult {
content: string;
toolCalls: ToolCall[];
// Token accounting; providers that cannot report usage (e.g. the Gemini
// CLI) return zeros.
usage: {
promptTokens: number;
completionTokens: number;
totalTokens: number;
};
finishReason: 'stop' | 'tool_calls' | 'length' | 'error';
}
/** Inputs to LlmProvider.complete(). */
export interface CompletionOptions {
messages: ChatMessage[];
tools?: ToolDefinition[];
temperature?: number;
maxTokens?: number;
// Overrides the provider's default model when set.
model?: string;
}
export interface LlmProvider {
/** Provider identifier (e.g., 'openai', 'anthropic', 'ollama') */
readonly name: string;
/** Create a chat completion */
complete(options: CompletionOptions): Promise<CompletionResult>;
/** List available models */
listModels(): Promise<string[]>;
/** Check if the provider is configured and reachable */
isAvailable(): Promise<boolean>;
}

339
src/mcplocal/src/router.ts Normal file
View File

@@ -0,0 +1,339 @@
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification } from './types.js';
import type { LlmProcessor } from './llm/processor.js';
/**
 * Routes MCP requests to the appropriate upstream server.
 *
 * The proxy presents a unified MCP interface to clients. Tools, resources,
 * and prompts from all upstreams are merged under namespaced prefixes
 * (e.g., "slack/send_message").
 *
 * Routing is done by name prefix: "servername/toolname" -> upstream "servername".
 */
export class McpRouter {
// Connected upstreams, keyed by server name.
private upstreams = new Map<string, UpstreamConnection>();
// Routing tables: namespaced tool/resource/prompt identifier -> owning
// server name. Populated as a side effect of the discover*() calls.
private toolToServer = new Map<string, string>();
private resourceToServer = new Map<string, string>();
private promptToServer = new Map<string, string>();
// Single downstream sink for upstream notifications (tagged with _source).
private notificationHandler: ((notification: JsonRpcNotification) => void) | null = null;
// Optional LLM pre/post-processing hook used when routing tools/call.
private llmProcessor: LlmProcessor | null = null;
/** Install the LLM processor used to optimize/filter tool calls. */
setLlmProcessor(processor: LlmProcessor): void {
this.llmProcessor = processor;
}
/**
 * Register an upstream connection. If a notification handler is already
 * installed, the upstream's notifications are forwarded to it with a
 * `_source` param identifying the originating server.
 */
addUpstream(connection: UpstreamConnection): void {
this.upstreams.set(connection.name, connection);
if (this.notificationHandler && connection.onNotification) {
const serverName = connection.name;
const handler = this.notificationHandler;
connection.onNotification((notification) => {
handler({
...notification,
params: {
...notification.params,
_source: serverName,
},
});
});
}
}
/** Remove an upstream and purge its entries from all routing tables. */
removeUpstream(name: string): void {
this.upstreams.delete(name);
for (const map of [this.toolToServer, this.resourceToServer, this.promptToServer]) {
for (const [key, server] of map) {
if (server === name) {
map.delete(key);
}
}
}
}
/**
 * Set the sink for upstream notifications and wire it to every upstream
 * registered so far (later upstreams are wired by addUpstream).
 */
setNotificationHandler(handler: (notification: JsonRpcNotification) => void): void {
this.notificationHandler = handler;
// Wire to all existing upstreams
for (const [serverName, upstream] of this.upstreams) {
if (upstream.onNotification) {
upstream.onNotification((notification) => {
handler({
...notification,
params: {
...notification.params,
_source: serverName,
},
});
});
}
}
}
/**
 * Discover tools from all upstreams by calling tools/list on each.
 *
 * Side effect: records every namespaced tool in toolToServer so later
 * tools/call requests can be routed. Unreachable upstreams are skipped.
 */
async discoverTools(): Promise<Array<{ name: string; description?: string; inputSchema?: unknown }>> {
const allTools: Array<{ name: string; description?: string; inputSchema?: unknown }> = [];
for (const [serverName, upstream] of this.upstreams) {
try {
const response = await upstream.send({
jsonrpc: '2.0',
id: `discover-tools-${serverName}`,
method: 'tools/list',
});
if (response.result && typeof response.result === 'object' && 'tools' in response.result) {
const tools = (response.result as { tools: Array<{ name: string; description?: string; inputSchema?: unknown }> }).tools;
for (const tool of tools) {
// Namespace as "<server>/<tool>" to avoid cross-server name clashes.
const namespacedName = `${serverName}/${tool.name}`;
this.toolToServer.set(namespacedName, serverName);
allTools.push({
...tool,
name: namespacedName,
});
}
}
} catch {
// Server may be unavailable; skip its tools
}
}
return allTools;
}
/**
 * Discover resources from all upstreams by calling resources/list on each.
 *
 * Side effect: records every namespaced URI in resourceToServer.
 * Unreachable upstreams are skipped.
 */
async discoverResources(): Promise<Array<{ uri: string; name?: string; description?: string; mimeType?: string }>> {
const allResources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> = [];
for (const [serverName, upstream] of this.upstreams) {
try {
const response = await upstream.send({
jsonrpc: '2.0',
id: `discover-resources-${serverName}`,
method: 'resources/list',
});
if (response.result && typeof response.result === 'object' && 'resources' in response.result) {
const resources = (response.result as { resources: Array<{ uri: string; name?: string; description?: string; mimeType?: string }> }).resources;
for (const resource of resources) {
// URIs are namespaced as "<server>://<original-uri>".
const namespacedUri = `${serverName}://${resource.uri}`;
this.resourceToServer.set(namespacedUri, serverName);
allResources.push({
...resource,
uri: namespacedUri,
});
}
}
} catch {
// Server may be unavailable; skip its resources
}
}
return allResources;
}
/**
 * Discover prompts from all upstreams by calling prompts/list on each.
 *
 * Side effect: records every namespaced prompt in promptToServer.
 * Unreachable upstreams are skipped.
 */
async discoverPrompts(): Promise<Array<{ name: string; description?: string; arguments?: unknown[] }>> {
const allPrompts: Array<{ name: string; description?: string; arguments?: unknown[] }> = [];
for (const [serverName, upstream] of this.upstreams) {
try {
const response = await upstream.send({
jsonrpc: '2.0',
id: `discover-prompts-${serverName}`,
method: 'prompts/list',
});
if (response.result && typeof response.result === 'object' && 'prompts' in response.result) {
const prompts = (response.result as { prompts: Array<{ name: string; description?: string; arguments?: unknown[] }> }).prompts;
for (const prompt of prompts) {
// Namespace as "<server>/<prompt>".
const namespacedName = `${serverName}/${prompt.name}`;
this.promptToServer.set(namespacedName, serverName);
allPrompts.push({
...prompt,
name: namespacedName,
});
}
}
} catch {
// Server may be unavailable; skip its prompts
}
}
return allPrompts;
}
/**
 * Route a namespaced call to the correct upstream, stripping the namespace prefix.
 *
 * @param request - Incoming JSON-RPC request.
 * @param nameField - Param key holding the namespaced identifier ('name' or 'uri').
 * @param routingMap - Table from namespaced identifier to owning server.
 * @returns Upstream response, or a JSON-RPC error when the identifier is
 *          missing (-32602), unknown (-32601), or its upstream is down (-32603).
 */
private async routeNamespacedCall(
request: JsonRpcRequest,
nameField: string,
routingMap: Map<string, string>,
): Promise<JsonRpcResponse> {
const params = request.params as Record<string, unknown> | undefined;
const name = params?.[nameField] as string | undefined;
if (!name) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32602, message: `Missing ${nameField} in params` },
};
}
const serverName = routingMap.get(name);
if (!serverName) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32601, message: `Unknown ${nameField}: ${name}` },
};
}
const upstream = this.upstreams.get(serverName);
if (!upstream || !upstream.isAlive()) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32603, message: `Upstream '${serverName}' is not available` },
};
}
// Strip the namespace prefix
// URIs were namespaced as "<server>://<uri>", names as "<server>/<name>".
const originalName = nameField === 'uri'
? name.slice(`${serverName}://`.length)
: name.slice(serverName.length + 1);
const upstreamRequest: JsonRpcRequest = {
...request,
params: {
...params,
[nameField]: originalName,
},
};
return upstream.send(upstreamRequest);
}
/**
* Route a generic request. Handles protocol-level methods locally,
* delegates tool/resource/prompt calls to upstreams.
*/
async route(request: JsonRpcRequest): Promise<JsonRpcResponse> {
switch (request.method) {
case 'initialize':
return {
jsonrpc: '2.0',
id: request.id,
result: {
protocolVersion: '2024-11-05',
serverInfo: {
name: 'mcpctl-proxy',
version: '0.1.0',
},
capabilities: {
tools: {},
resources: {},
prompts: {},
},
},
};
case 'tools/list': {
const tools = await this.discoverTools();
return {
jsonrpc: '2.0',
id: request.id,
result: { tools },
};
}
case 'tools/call':
return this.routeToolCall(request);
case 'resources/list': {
const resources = await this.discoverResources();
return {
jsonrpc: '2.0',
id: request.id,
result: { resources },
};
}
case 'resources/read':
return this.routeNamespacedCall(request, 'uri', this.resourceToServer);
case 'resources/subscribe':
case 'resources/unsubscribe':
return this.routeNamespacedCall(request, 'uri', this.resourceToServer);
case 'prompts/list': {
const prompts = await this.discoverPrompts();
return {
jsonrpc: '2.0',
id: request.id,
result: { prompts },
};
}
case 'prompts/get':
return this.routeNamespacedCall(request, 'name', this.promptToServer);
default:
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32601, message: `Method not found: ${request.method}` },
};
}
}
/**
* Route a tools/call request, optionally applying LLM pre/post-processing.
*/
private async routeToolCall(request: JsonRpcRequest): Promise<JsonRpcResponse> {
const params = request.params as Record<string, unknown> | undefined;
const toolName = params?.['name'] as string | undefined;
// If no processor or tool shouldn't be processed, route directly
if (!this.llmProcessor || !toolName || !this.llmProcessor.shouldProcess('tools/call', toolName)) {
return this.routeNamespacedCall(request, 'name', this.toolToServer);
}
// Preprocess request params
const toolParams = (params?.['arguments'] ?? {}) as Record<string, unknown>;
const processed = await this.llmProcessor.preprocessRequest(toolName, toolParams);
const processedRequest: JsonRpcRequest = processed.optimized
? { ...request, params: { ...params, arguments: processed.params } }
: request;
// Route to upstream
const response = await this.routeNamespacedCall(processedRequest, 'name', this.toolToServer);
// Filter response
if (response.error) return response;
const filtered = await this.llmProcessor.filterResponse(toolName, response);
if (filtered.filtered) {
return { ...response, result: filtered.result };
}
return response;
}
getUpstreamNames(): string[] {
return [...this.upstreams.keys()];
}
async closeAll(): Promise<void> {
for (const upstream of this.upstreams.values()) {
await upstream.close();
}
this.upstreams.clear();
this.toolToServer.clear();
this.resourceToServer.clear();
this.promptToServer.clear();
}
}

View File

@@ -0,0 +1,71 @@
import { createInterface } from 'node:readline';
import type { JsonRpcRequest, JsonRpcResponse, JsonRpcNotification, JsonRpcMessage } from './types.js';
import type { McpRouter } from './router.js';
/**
* STDIO-based MCP proxy server.
*
* Reads JSON-RPC messages from stdin and writes responses to stdout.
* This is the transport that Claude Code uses to communicate with MCP servers.
*/
export class StdioProxyServer {
  // False once stop() is called or stdin closes; suppresses all output.
  private running = false;

  constructor(private router: McpRouter) {}

  /**
   * Begin reading JSON-RPC lines from stdin and writing responses and
   * upstream notifications to stdout. stderr is used for diagnostics so
   * the protocol stream on stdout is never corrupted.
   */
  start(): void {
    this.running = true;
    // Forward notifications from upstreams to client
    this.router.setNotificationHandler((notification) => {
      if (this.running) {
        this.sendNotification(notification);
      }
    });
    const rl = createInterface({ input: process.stdin });
    rl.on('line', (line) => {
      if (!this.running) return;
      this.handleLine(line).catch((err) => {
        process.stderr.write(`Proxy error: ${err}\n`);
      });
    });
    rl.on('close', () => {
      this.running = false;
    });
  }

  /**
   * Parse one stdin line as JSON-RPC and route it if it is a request.
   *
   * Only messages carrying BOTH a `method` and an `id` are requests.
   * Fix: the original routed anything with an `id`, so a response message
   * sent by the client (id but no method) fell through to the router and
   * produced a spurious "Method not found" reply addressed to a response.
   */
  private async handleLine(line: string): Promise<void> {
    let msg: JsonRpcMessage;
    try {
      msg = JSON.parse(line) as JsonRpcMessage;
    } catch {
      return; // Skip invalid JSON
    }
    if (!('method' in msg)) {
      return; // Response message from the client — nothing to route
    }
    if (!('id' in msg) || msg.id === undefined) {
      // Notification - no response needed
      return;
    }
    const request = msg as JsonRpcRequest;
    const response = await this.router.route(request);
    this.sendResponse(response);
  }

  /** Write a response as one newline-terminated JSON line on stdout. */
  private sendResponse(response: JsonRpcResponse): void {
    process.stdout.write(JSON.stringify(response) + '\n');
  }

  /** Write an upstream notification as one JSON line on stdout. */
  private sendNotification(notification: JsonRpcNotification): void {
    process.stdout.write(JSON.stringify(notification) + '\n');
  }

  /** Stop handling input; subsequent lines and notifications are dropped. */
  stop(): void {
    this.running = false;
  }
}

74
src/mcplocal/src/types.ts Normal file
View File

@@ -0,0 +1,74 @@
/**
 * MCP JSON-RPC message types used by the proxy.
 *
 * Only the subset of the JSON-RPC 2.0 wire format that the proxy actually
 * exchanges with clients and upstreams is modelled here.
 */
/** A call that expects a reply — distinguished from a notification by `id`. */
export interface JsonRpcRequest {
  jsonrpc: '2.0';
  id: string | number;
  method: string;
  params?: Record<string, unknown>;
}
/** Reply to a request; carries either `result` or `error`, matched by `id`. */
export interface JsonRpcResponse {
  jsonrpc: '2.0';
  id: string | number;
  result?: unknown;
  error?: JsonRpcError;
}
/** One-way message: has a `method` but no `id`, so no reply is expected. */
export interface JsonRpcNotification {
  jsonrpc: '2.0';
  method: string;
  params?: Record<string, unknown>;
}
/** JSON-RPC 2.0 error object embedded in a response. */
export interface JsonRpcError {
  code: number;
  message: string;
  data?: unknown;
}
/** Any message that can appear on the wire. */
export type JsonRpcMessage = JsonRpcRequest | JsonRpcResponse | JsonRpcNotification;
/** Configuration for an upstream MCP server */
export interface UpstreamConfig {
  /** Server ID from mcpd */
  serverId: string;
  /** Human-readable name */
  name: string;
  /** Transport type */
  transport: 'stdio' | 'sse' | 'streamable-http';
  /** For STDIO: command + args */
  command?: string;
  args?: string[];
  /** For HTTP transports: URL */
  url?: string;
  /** Environment variables merged into a spawned STDIO server's env */
  env?: Record<string, string>;
}
/** Proxy server configuration */
export interface ProxyConfig {
  /** Port for the proxy to listen on */
  port: number;
  /** Host to bind to */
  host: string;
  /** mcpd daemon URL for fetching server configs */
  daemonUrl: string;
  /** Upstream servers to proxy */
  upstreams: UpstreamConfig[];
}
/** A running upstream connection */
export interface UpstreamConnection {
  /** Server name */
  name: string;
  /** Send a JSON-RPC request and get a response */
  send(request: JsonRpcRequest): Promise<JsonRpcResponse>;
  /** Disconnect from the upstream */
  close(): Promise<void>;
  /** Whether the connection is alive */
  isAlive(): boolean;
  /** Register a handler for notifications from this upstream */
  onNotification?(handler: (notification: JsonRpcNotification) => void): void;
}

View File

@@ -0,0 +1,71 @@
import http from 'node:http';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, UpstreamConfig } from '../types.js';
/**
* Connects to an MCP server over HTTP (SSE or Streamable HTTP transport).
* Sends JSON-RPC requests via HTTP POST.
*/
export class HttpUpstream implements UpstreamConnection {
  readonly name: string;
  private url: string;
  private alive = true;

  /**
   * @param config Upstream description; `config.url` is required.
   * @throws Error when no URL is configured, the URL is unparseable, or
   *   the scheme is not `http:` — requests are made with node:http, so an
   *   `https:` URL would previously have been sent as plaintext and failed
   *   obscurely at response-parse time instead of here.
   */
  constructor(config: UpstreamConfig) {
    this.name = config.name;
    if (!config.url) {
      throw new Error(`HTTP upstream '${config.name}' has no URL configured`);
    }
    if (new URL(config.url).protocol !== 'http:') {
      throw new Error(`HTTP upstream '${config.name}' requires an http:// URL`);
    }
    this.url = config.url;
  }

  /**
   * POST the JSON-RPC request to the configured URL and parse the reply.
   *
   * @throws Error when the upstream is closed; rejects on network error,
   *   30s timeout, or a non-JSON response body.
   */
  async send(request: JsonRpcRequest): Promise<JsonRpcResponse> {
    if (!this.alive) {
      throw new Error(`Upstream '${this.name}' is closed`);
    }
    const body = JSON.stringify(request);
    const parsed = new URL(this.url);
    return new Promise((resolve, reject) => {
      const opts: http.RequestOptions = {
        hostname: parsed.hostname,
        // Fix: include the query string — `pathname` alone silently
        // dropped any `?key=value` part of the configured URL.
        path: parsed.pathname + parsed.search,
        method: 'POST',
        timeout: 30000,
        headers: {
          'Content-Type': 'application/json',
          'Content-Length': Buffer.byteLength(body),
        },
      };
      // URL.port is '' when no explicit port is given; only set the option
      // when present so node falls back to the protocol default.
      if (parsed.port) {
        opts.port = parsed.port;
      }
      const req = http.request(opts, (res) => {
        const chunks: Buffer[] = [];
        res.on('data', (chunk: Buffer) => chunks.push(chunk));
        res.on('end', () => {
          try {
            const raw = Buffer.concat(chunks).toString('utf-8');
            resolve(JSON.parse(raw) as JsonRpcResponse);
          } catch (err) {
            reject(new Error(`Invalid response from '${this.name}': ${err}`));
          }
        });
      });
      req.on('error', reject);
      req.on('timeout', () => {
        req.destroy();
        reject(new Error(`Request to '${this.name}' timed out`));
      });
      req.write(body);
      req.end();
    });
  }

  /** Mark the upstream closed; later send() calls throw. */
  async close(): Promise<void> {
    this.alive = false;
  }

  /** Whether close() has not yet been called. */
  isAlive(): boolean {
    return this.alive;
  }
}

View File

@@ -0,0 +1,3 @@
// Barrel file: re-exports every UpstreamConnection implementation
// (process-spawning STDIO, plain HTTP POST, and mcpd-proxied transports).
export { StdioUpstream } from './stdio.js';
export { HttpUpstream } from './http.js';
export { McpdUpstream } from './mcpd.js';

View File

@@ -0,0 +1,68 @@
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from '../types.js';
import type { McpdClient } from '../http/mcpd-client.js';
/** Body POSTed to mcpd's /api/v1/mcp/proxy endpoint. */
interface McpdProxyRequest {
  serverId: string;
  method: string;
  params?: Record<string, unknown> | undefined;
}
/** mcpd's proxy reply: `result` on success, JSON-RPC-style `error` otherwise. */
interface McpdProxyResponse {
  result?: unknown;
  error?: { code: number; message: string; data?: unknown };
}
/**
* An upstream that routes MCP requests through mcpd's /api/v1/mcp/proxy endpoint.
* mcpd holds the credentials and manages the actual MCP server connections.
*/
export class McpdUpstream implements UpstreamConnection {
readonly name: string;
private alive = true;
constructor(
private serverId: string,
serverName: string,
private mcpdClient: McpdClient,
) {
this.name = serverName;
}
async send(request: JsonRpcRequest): Promise<JsonRpcResponse> {
if (!this.alive) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32603, message: `Upstream '${this.name}' is closed` },
};
}
const proxyRequest: McpdProxyRequest = {
serverId: this.serverId,
method: request.method,
params: request.params,
};
try {
const result = await this.mcpdClient.post<McpdProxyResponse>('/api/v1/mcp/proxy', proxyRequest);
if (result.error) {
return { jsonrpc: '2.0', id: request.id, error: result.error };
}
return { jsonrpc: '2.0', id: request.id, result: result.result };
} catch (err) {
return {
jsonrpc: '2.0',
id: request.id,
error: { code: -32603, message: `mcpd proxy error: ${(err as Error).message}` },
};
}
}
async close(): Promise<void> {
this.alive = false;
}
isAlive(): boolean {
return this.alive;
}
}

View File

@@ -0,0 +1,118 @@
import { spawn, type ChildProcess } from 'node:child_process';
import { createInterface } from 'node:readline';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification, UpstreamConfig } from '../types.js';
/**
* Connects to an MCP server over STDIO (spawn a process, write JSON-RPC to stdin, read from stdout).
*/
export class StdioUpstream implements UpstreamConnection {
  readonly name: string;
  private process: ChildProcess | null = null;
  // One entry per in-flight request, keyed by JSON-RPC id. Both stored
  // callbacks clear the request's timeout before settling.
  private pendingRequests = new Map<string | number, {
    resolve: (res: JsonRpcResponse) => void;
    reject: (err: Error) => void;
  }>();
  private alive = false;
  private notificationHandlers: Array<(notification: JsonRpcNotification) => void> = [];

  constructor(private config: UpstreamConfig) {
    this.name = config.name;
  }

  /**
   * Spawn the configured command and start reading newline-delimited
   * JSON-RPC from its stdout: messages with an `id` settle the matching
   * pending request, messages with only a `method` fan out to notification
   * handlers, and unparseable lines are ignored. On process exit all
   * pending requests are rejected so callers do not hang.
   *
   * @throws Error when the config has no command.
   */
  async start(): Promise<void> {
    if (!this.config.command) {
      throw new Error(`STDIO upstream '${this.name}' has no command configured`);
    }
    this.process = spawn(this.config.command, this.config.args ?? [], {
      stdio: ['pipe', 'pipe', 'pipe'],
      env: { ...process.env, ...this.config.env },
    });
    this.alive = true;
    this.process.on('exit', () => {
      this.alive = false;
      // Reject any pending requests
      for (const [, pending] of this.pendingRequests) {
        pending.reject(new Error(`Upstream '${this.name}' process exited`));
      }
      this.pendingRequests.clear();
    });
    if (this.process.stdout) {
      const rl = createInterface({ input: this.process.stdout });
      rl.on('line', (line) => {
        try {
          const msg = JSON.parse(line) as Record<string, unknown>;
          if ('id' in msg && msg.id !== undefined) {
            // Response to a pending request
            const pending = this.pendingRequests.get(msg.id as string | number);
            if (pending) {
              this.pendingRequests.delete(msg.id as string | number);
              pending.resolve(msg as unknown as JsonRpcResponse);
            }
          } else if ('method' in msg) {
            // Notification from upstream
            const notification: JsonRpcNotification = {
              jsonrpc: '2.0',
              method: msg.method as string,
            };
            if (msg.params) {
              notification.params = msg.params as Record<string, unknown>;
            }
            for (const handler of this.notificationHandlers) {
              handler(notification);
            }
          }
        } catch {
          // Skip non-JSON lines
        }
      });
    }
  }

  /**
   * Write a request to the child's stdin and wait up to 30s for the
   * matching response line.
   *
   * Fix: the original registered the pending entry twice — first with the
   * raw resolve/reject pair, then immediately overwrote it with the
   * timeout-clearing wrappers. The first registration was dead code and is
   * removed; only the wrapped callbacks (which always clear the timeout)
   * are stored.
   *
   * @throws Error when the process is not running; rejects on timeout or
   *   process exit.
   */
  async send(request: JsonRpcRequest): Promise<JsonRpcResponse> {
    if (!this.process?.stdin || !this.alive) {
      throw new Error(`Upstream '${this.name}' is not connected`);
    }
    return new Promise((resolve, reject) => {
      const timeout = setTimeout(() => {
        this.pendingRequests.delete(request.id);
        reject(new Error(`Request to '${this.name}' timed out`));
      }, 30000);
      this.pendingRequests.set(request.id, {
        resolve: (res) => {
          clearTimeout(timeout);
          resolve(res);
        },
        reject: (err) => {
          clearTimeout(timeout);
          reject(err);
        },
      });
      this.process!.stdin!.write(JSON.stringify(request) + '\n');
    });
  }

  /** Terminate the child with SIGTERM and drop the handle. */
  async close(): Promise<void> {
    if (this.process) {
      this.process.kill('SIGTERM');
      this.alive = false;
      this.process = null;
    }
  }

  /** True while the spawned process is running. */
  isAlive(): boolean {
    return this.alive;
  }

  /** Register a callback for notifications emitted by this upstream. */
  onNotification(handler: (notification: JsonRpcNotification) => void): void {
    this.notificationHandlers.push(handler);
  }
}

View File

@@ -0,0 +1,68 @@
import { describe, it, expect, vi } from 'vitest';
import { refreshUpstreams } from '../src/discovery.js';
import { McpRouter } from '../src/router.js';
/**
 * Build a stubbed McpdClient whose GET always resolves to the supplied
 * server list; all other verbs are bare vi.fn() spies.
 */
function mockMcpdClient(servers: Array<{ id: string; name: string; transport: string }>) {
  const client = {
    baseUrl: 'http://test:3100',
    token: 'test-token',
    get: vi.fn(async () => servers),
    post: vi.fn(async () => ({ result: {} })),
    put: vi.fn(),
    delete: vi.fn(),
    forward: vi.fn(),
  };
  return client;
}
// Discovery contract: refreshUpstreams() keeps the router's upstream set in
// lockstep with the server list reported by mcpd — registering new servers,
// removing stale ones, and staying idempotent across repeated refreshes.
describe('refreshUpstreams', () => {
  it('registers mcpd servers as upstreams', async () => {
    const router = new McpRouter();
    const client = mockMcpdClient([
      { id: 'srv-1', name: 'slack', transport: 'stdio' },
      { id: 'srv-2', name: 'github', transport: 'stdio' },
    ]);
    const registered = await refreshUpstreams(router, client as any);
    expect(registered).toEqual(['slack', 'github']);
    expect(router.getUpstreamNames()).toContain('slack');
    expect(router.getUpstreamNames()).toContain('github');
  });
  it('removes stale upstreams', async () => {
    const router = new McpRouter();
    // First refresh: 2 servers
    const client1 = mockMcpdClient([
      { id: 'srv-1', name: 'slack', transport: 'stdio' },
      { id: 'srv-2', name: 'github', transport: 'stdio' },
    ]);
    await refreshUpstreams(router, client1 as any);
    expect(router.getUpstreamNames()).toHaveLength(2);
    // Second refresh: only 1 server
    const client2 = mockMcpdClient([
      { id: 'srv-1', name: 'slack', transport: 'stdio' },
    ]);
    await refreshUpstreams(router, client2 as any);
    expect(router.getUpstreamNames()).toEqual(['slack']);
  });
  it('does not duplicate existing upstreams', async () => {
    const router = new McpRouter();
    const client = mockMcpdClient([
      { id: 'srv-1', name: 'slack', transport: 'stdio' },
    ]);
    // Refreshing twice with the same list must be idempotent.
    await refreshUpstreams(router, client as any);
    await refreshUpstreams(router, client as any);
    expect(router.getUpstreamNames()).toEqual(['slack']);
  });
  it('handles empty server list', async () => {
    const router = new McpRouter();
    const client = mockMcpdClient([]);
    const registered = await refreshUpstreams(router, client as any);
    expect(registered).toEqual([]);
    expect(router.getUpstreamNames()).toHaveLength(0);
  });
});

View File

@@ -0,0 +1,112 @@
import { describe, it, expect, vi, afterEach } from 'vitest';
import { FilterCache, DEFAULT_FILTER_CACHE_CONFIG } from '../src/llm/filter-cache.js';
// FilterCache unit tests: decision storage, LRU capacity/eviction, and
// TTL-based expiry. Date.now() is stubbed in the TTL tests, so afterEach
// restores all mocks to keep tests isolated.
describe('FilterCache', () => {
  afterEach(() => {
    vi.restoreAllMocks();
  });
  it('returns null for unknown tool names', () => {
    const cache = new FilterCache();
    expect(cache.shouldFilter('unknown/tool')).toBeNull();
  });
  it('stores and retrieves filter decisions', () => {
    const cache = new FilterCache();
    cache.recordDecision('slack/search', true);
    expect(cache.shouldFilter('slack/search')).toBe(true);
    cache.recordDecision('github/list_repos', false);
    expect(cache.shouldFilter('github/list_repos')).toBe(false);
  });
  it('updates existing entries on re-record', () => {
    const cache = new FilterCache();
    cache.recordDecision('slack/search', true);
    expect(cache.shouldFilter('slack/search')).toBe(true);
    cache.recordDecision('slack/search', false);
    expect(cache.shouldFilter('slack/search')).toBe(false);
  });
  it('evicts oldest entry when at capacity', () => {
    const cache = new FilterCache({ maxEntries: 3 });
    cache.recordDecision('tool-a', true);
    cache.recordDecision('tool-b', false);
    cache.recordDecision('tool-c', true);
    expect(cache.size).toBe(3);
    // Adding a 4th should evict 'tool-a' (oldest)
    cache.recordDecision('tool-d', false);
    expect(cache.size).toBe(3);
    expect(cache.shouldFilter('tool-a')).toBeNull();
    expect(cache.shouldFilter('tool-b')).toBe(false);
    expect(cache.shouldFilter('tool-d')).toBe(false);
  });
  it('refreshes LRU position on access', () => {
    const cache = new FilterCache({ maxEntries: 3 });
    cache.recordDecision('tool-a', true);
    cache.recordDecision('tool-b', false);
    cache.recordDecision('tool-c', true);
    // Access tool-a to refresh it
    cache.shouldFilter('tool-a');
    // Now add tool-d — tool-b should be evicted (oldest unreferenced)
    cache.recordDecision('tool-d', false);
    expect(cache.shouldFilter('tool-a')).toBe(true);
    expect(cache.shouldFilter('tool-b')).toBeNull();
  });
  it('expires entries after TTL', () => {
    const now = Date.now();
    vi.spyOn(Date, 'now').mockReturnValue(now);
    const cache = new FilterCache({ ttlMs: 1000 });
    cache.recordDecision('slack/search', true);
    expect(cache.shouldFilter('slack/search')).toBe(true);
    // Advance time past TTL
    vi.spyOn(Date, 'now').mockReturnValue(now + 1001);
    expect(cache.shouldFilter('slack/search')).toBeNull();
    // Entry should be removed
    expect(cache.size).toBe(0);
  });
  it('does not expire entries within TTL', () => {
    const now = Date.now();
    vi.spyOn(Date, 'now').mockReturnValue(now);
    const cache = new FilterCache({ ttlMs: 1000 });
    cache.recordDecision('slack/search', true);
    // Advance time within TTL
    vi.spyOn(Date, 'now').mockReturnValue(now + 999);
    expect(cache.shouldFilter('slack/search')).toBe(true);
  });
  it('clears all entries', () => {
    const cache = new FilterCache();
    cache.recordDecision('tool-a', true);
    cache.recordDecision('tool-b', false);
    expect(cache.size).toBe(2);
    cache.clear();
    expect(cache.size).toBe(0);
    expect(cache.shouldFilter('tool-a')).toBeNull();
  });
  it('uses default config values', () => {
    const cache = new FilterCache();
    // Should support the default number of entries without issue
    for (let i = 0; i < DEFAULT_FILTER_CACHE_CONFIG.maxEntries; i++) {
      cache.recordDecision(`tool-${i}`, true);
    }
    expect(cache.size).toBe(DEFAULT_FILTER_CACHE_CONFIG.maxEntries);
    // One more should trigger eviction
    cache.recordDecision('extra-tool', true);
    expect(cache.size).toBe(DEFAULT_FILTER_CACHE_CONFIG.maxEntries);
  });
});

View File

@@ -0,0 +1,153 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { HealthMonitor } from '../src/health.js';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse } from '../src/types.js';
/**
 * Minimal UpstreamConnection stub: liveness is fixed by the `alive` flag
 * and every send() resolves to a "Method not found" JSON-RPC error, which
 * the monitor treats as a successful ping (the server answered).
 */
function mockUpstream(name: string, alive = true): UpstreamConnection {
  const errorReply = async (req: JsonRpcRequest): Promise<JsonRpcResponse> => ({
    jsonrpc: '2.0',
    id: req.id,
    error: { code: -32601, message: 'Method not found' },
  });
  return {
    name,
    isAlive: vi.fn(() => alive),
    close: vi.fn(async () => {}),
    send: vi.fn(errorReply),
  };
}
// HealthMonitor state-machine tests: healthy -> degraded after one failed
// ping, -> disconnected once consecutive failures hit failureThreshold (2
// here), recovery back to healthy, and 'change' events emitted only on
// actual state transitions.
describe('HealthMonitor', () => {
  let monitor: HealthMonitor;
  beforeEach(() => {
    monitor = new HealthMonitor({ intervalMs: 100, failureThreshold: 2 });
  });
  afterEach(() => {
    monitor.stop();
  });
  it('tracks upstream and reports initial healthy state', () => {
    const upstream = mockUpstream('slack');
    monitor.track(upstream);
    const status = monitor.getStatus('slack');
    expect(status).toBeDefined();
    expect(status?.state).toBe('healthy');
    expect(status?.consecutiveFailures).toBe(0);
  });
  it('reports disconnected for dead upstream on track', () => {
    const upstream = mockUpstream('slack', false);
    monitor.track(upstream);
    expect(monitor.getStatus('slack')?.state).toBe('disconnected');
  });
  it('marks upstream healthy when ping succeeds', async () => {
    const upstream = mockUpstream('slack');
    monitor.track(upstream);
    await monitor.checkAll();
    expect(monitor.isHealthy('slack')).toBe(true);
    expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(0);
  });
  it('marks upstream degraded after one failure', async () => {
    const upstream = mockUpstream('slack');
    vi.mocked(upstream.send).mockRejectedValue(new Error('timeout'));
    monitor.track(upstream);
    await monitor.checkAll();
    expect(monitor.getStatus('slack')?.state).toBe('degraded');
    expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(1);
  });
  it('marks upstream disconnected after threshold failures', async () => {
    const upstream = mockUpstream('slack');
    vi.mocked(upstream.send).mockRejectedValue(new Error('timeout'));
    monitor.track(upstream);
    await monitor.checkAll();
    await monitor.checkAll();
    expect(monitor.getStatus('slack')?.state).toBe('disconnected');
    expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(2);
  });
  it('recovers from degraded to healthy', async () => {
    const upstream = mockUpstream('slack');
    vi.mocked(upstream.send).mockRejectedValueOnce(new Error('timeout'));
    monitor.track(upstream);
    await monitor.checkAll();
    expect(monitor.getStatus('slack')?.state).toBe('degraded');
    // Next check succeeds
    await monitor.checkAll();
    expect(monitor.getStatus('slack')?.state).toBe('healthy');
    expect(monitor.getStatus('slack')?.consecutiveFailures).toBe(0);
  });
  it('emits change events on state transitions', async () => {
    const upstream = mockUpstream('slack');
    vi.mocked(upstream.send).mockRejectedValue(new Error('timeout'));
    monitor.track(upstream);
    const changes: Array<{ name: string; oldState: string; newState: string }> = [];
    monitor.on('change', (change) => changes.push(change));
    await monitor.checkAll();
    expect(changes).toHaveLength(1);
    expect(changes[0]).toEqual({ name: 'slack', oldState: 'healthy', newState: 'degraded' });
    await monitor.checkAll();
    expect(changes).toHaveLength(2);
    expect(changes[1]).toEqual({ name: 'slack', oldState: 'degraded', newState: 'disconnected' });
  });
  it('does not emit when state stays the same', async () => {
    const upstream = mockUpstream('slack');
    monitor.track(upstream);
    const changes: unknown[] = [];
    monitor.on('change', (change) => changes.push(change));
    await monitor.checkAll();
    await monitor.checkAll();
    expect(changes).toHaveLength(0);
  });
  it('reports disconnected when upstream is not alive', async () => {
    const upstream = mockUpstream('slack');
    monitor.track(upstream);
    vi.mocked(upstream.isAlive).mockReturnValue(false);
    const changes: Array<{ name: string; oldState: string; newState: string }> = [];
    monitor.on('change', (change) => changes.push(change));
    await monitor.checkAll();
    expect(monitor.getStatus('slack')?.state).toBe('disconnected');
    expect(changes).toHaveLength(1);
  });
  it('returns all statuses', () => {
    monitor.track(mockUpstream('slack'));
    monitor.track(mockUpstream('github'));
    const statuses = monitor.getAllStatuses();
    expect(statuses).toHaveLength(2);
    expect(statuses.map((s) => s.name)).toEqual(['slack', 'github']);
  });
  it('untracks upstream', () => {
    monitor.track(mockUpstream('slack'));
    monitor.untrack('slack');
    expect(monitor.getStatus('slack')).toBeUndefined();
    expect(monitor.getAllStatuses()).toHaveLength(0);
  });
});

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,283 @@
import { describe, it, expect, vi } from 'vitest';
import { LlmProcessor, DEFAULT_PROCESSOR_CONFIG } from '../src/llm/processor.js';
import { ProviderRegistry } from '../src/providers/registry.js';
import type { LlmProvider, CompletionResult } from '../src/providers/types.js';
/**
 * LLM provider stub that returns the given completion contents in order,
 * falling back to '{}' once the scripted responses are exhausted.
 */
function mockProvider(responses: string[]): LlmProvider {
  const queue = [...responses];
  return {
    name: 'mock',
    async complete(): Promise<CompletionResult> {
      const next = queue.shift();
      return {
        content: next ?? '{}',
        toolCalls: [],
        usage: { promptTokens: 10, completionTokens: 5, totalTokens: 15 },
        finishReason: 'stop',
      };
    },
    async listModels() { return ['mock-1']; },
    async isAvailable() { return true; },
  };
}
/** Create a ProviderRegistry, optionally pre-registering a single provider. */
function makeRegistry(provider?: LlmProvider): ProviderRegistry {
  const registry = new ProviderRegistry();
  if (provider !== undefined) {
    registry.register(provider);
  }
  return registry;
}
// shouldProcess gate: protocol methods and excluded/CRUD-style tools bypass
// LLM processing entirely; only ordinary tools/call traffic is eligible.
describe('LlmProcessor.shouldProcess', () => {
  it('bypasses protocol-level methods', () => {
    const proc = new LlmProcessor(makeRegistry());
    expect(proc.shouldProcess('initialize')).toBe(false);
    expect(proc.shouldProcess('tools/list')).toBe(false);
    expect(proc.shouldProcess('resources/list')).toBe(false);
    expect(proc.shouldProcess('prompts/list')).toBe(false);
  });
  it('returns false when no tool name', () => {
    const proc = new LlmProcessor(makeRegistry());
    expect(proc.shouldProcess('tools/call')).toBe(false);
  });
  it('returns true for normal tool calls', () => {
    const proc = new LlmProcessor(makeRegistry());
    expect(proc.shouldProcess('tools/call', 'slack/search_messages')).toBe(true);
  });
  it('skips excluded tools', () => {
    const proc = new LlmProcessor(makeRegistry(), {
      ...DEFAULT_PROCESSOR_CONFIG,
      excludeTools: ['slack'],
    });
    expect(proc.shouldProcess('tools/call', 'slack/search_messages')).toBe(false);
    expect(proc.shouldProcess('tools/call', 'github/search')).toBe(true);
  });
  it('skips simple CRUD operations', () => {
    const proc = new LlmProcessor(makeRegistry());
    expect(proc.shouldProcess('tools/call', 'slack/create_channel')).toBe(false);
    expect(proc.shouldProcess('tools/call', 'slack/delete_message')).toBe(false);
    expect(proc.shouldProcess('tools/call', 'slack/remove_user')).toBe(false);
  });
});
// preprocessRequest: tool arguments are optimized via the LLM only when
// preprocessing is enabled AND a provider exists; any LLM failure falls back
// to the original params rather than breaking the call.
describe('LlmProcessor.preprocessRequest', () => {
  it('returns original params when preprocessing disabled', async () => {
    const proc = new LlmProcessor(makeRegistry(mockProvider(['{}'])), {
      ...DEFAULT_PROCESSOR_CONFIG,
      enablePreprocessing: false,
    });
    const result = await proc.preprocessRequest('slack/search', { query: 'test' });
    expect(result.optimized).toBe(false);
    expect(result.params).toEqual({ query: 'test' });
  });
  it('returns original params when no provider', async () => {
    const proc = new LlmProcessor(makeRegistry(), {
      ...DEFAULT_PROCESSOR_CONFIG,
      enablePreprocessing: true,
    });
    const result = await proc.preprocessRequest('slack/search', { query: 'test' });
    expect(result.optimized).toBe(false);
  });
  it('optimizes params with LLM', async () => {
    const provider = mockProvider([JSON.stringify({ query: 'test', limit: 10 })]);
    const proc = new LlmProcessor(makeRegistry(provider), {
      ...DEFAULT_PROCESSOR_CONFIG,
      enablePreprocessing: true,
    });
    const result = await proc.preprocessRequest('slack/search', { query: 'test' });
    expect(result.optimized).toBe(true);
    expect(result.params).toEqual({ query: 'test', limit: 10 });
  });
  it('falls back on LLM error', async () => {
    const badProvider: LlmProvider = {
      name: 'bad',
      async complete() { throw new Error('LLM down'); },
      async listModels() { return []; },
      async isAvailable() { return false; },
    };
    const proc = new LlmProcessor(makeRegistry(badProvider), {
      ...DEFAULT_PROCESSOR_CONFIG,
      enablePreprocessing: true,
    });
    const result = await proc.preprocessRequest('slack/search', { query: 'test' });
    expect(result.optimized).toBe(false);
    expect(result.params).toEqual({ query: 'test' });
  });
});
// filterResponse: large successful results are shrunk via the LLM; small
// results, errors, disabled filtering, and missing providers all short-
// circuit. The filter cache suppresses repeat LLM calls per tool, and the
// metrics counters track every filter attempt, including failed ones.
describe('LlmProcessor.filterResponse', () => {
  it('returns original when filtering disabled', async () => {
    const proc = new LlmProcessor(makeRegistry(mockProvider([])), {
      ...DEFAULT_PROCESSOR_CONFIG,
      enableFiltering: false,
    });
    const response = { jsonrpc: '2.0' as const, id: '1', result: { data: 'big' } };
    const result = await proc.filterResponse('slack/search', response);
    expect(result.filtered).toBe(false);
  });
  it('returns original when no provider', async () => {
    const proc = new LlmProcessor(makeRegistry());
    const response = { jsonrpc: '2.0' as const, id: '1', result: { data: 'x'.repeat(600) } };
    const result = await proc.filterResponse('slack/search', response);
    expect(result.filtered).toBe(false);
  });
  it('skips small responses below token threshold', async () => {
    const proc = new LlmProcessor(makeRegistry(mockProvider([])));
    // With default tokenThreshold=250, any response < 1000 chars (~250 tokens) is skipped
    const response = { jsonrpc: '2.0' as const, id: '1', result: { data: 'small' } };
    const result = await proc.filterResponse('slack/search', response);
    expect(result.filtered).toBe(false);
  });
  it('skips error responses', async () => {
    const proc = new LlmProcessor(makeRegistry(mockProvider([])));
    const response = { jsonrpc: '2.0' as const, id: '1', error: { code: -1, message: 'fail' } };
    const result = await proc.filterResponse('slack/search', response);
    expect(result.filtered).toBe(false);
  });
  it('filters large responses with LLM', async () => {
    const largeData = { items: Array.from({ length: 50 }, (_, i) => ({ id: i, name: `item-${i}`, extra: 'x'.repeat(20) })) };
    const filteredData = { items: [{ id: 0, name: 'item-0' }, { id: 1, name: 'item-1' }] };
    const provider = mockProvider([JSON.stringify(filteredData)]);
    const proc = new LlmProcessor(makeRegistry(provider));
    const response = { jsonrpc: '2.0' as const, id: '1', result: largeData };
    const result = await proc.filterResponse('slack/search', response);
    expect(result.filtered).toBe(true);
    expect(result.filteredSize).toBeLessThan(result.originalSize);
  });
  it('falls back on LLM error', async () => {
    const badProvider: LlmProvider = {
      name: 'bad',
      async complete() { throw new Error('LLM down'); },
      async listModels() { return []; },
      async isAvailable() { return false; },
    };
    const largeData = { items: Array.from({ length: 50 }, (_, i) => ({ id: i, extra: 'x'.repeat(20) })) };
    const proc = new LlmProcessor(makeRegistry(badProvider));
    const response = { jsonrpc: '2.0' as const, id: '1', result: largeData };
    const result = await proc.filterResponse('slack/search', response);
    expect(result.filtered).toBe(false);
    expect(result.result).toEqual(largeData);
  });
  it('respects custom tokenThreshold', async () => {
    // Set a very high threshold so that even "big" responses are skipped
    const proc = new LlmProcessor(makeRegistry(mockProvider([])), {
      ...DEFAULT_PROCESSOR_CONFIG,
      tokenThreshold: 10_000,
    });
    const largeData = { items: Array.from({ length: 50 }, (_, i) => ({ id: i, name: `item-${i}` })) };
    const response = { jsonrpc: '2.0' as const, id: '1', result: largeData };
    const result = await proc.filterResponse('slack/search', response);
    expect(result.filtered).toBe(false);
  });
  it('uses filter cache to skip repeated filtering', async () => {
    // First call: LLM returns same-size data => cache records shouldFilter=false
    const largeData = { items: Array.from({ length: 50 }, (_, i) => ({ id: i, extra: 'x'.repeat(20) })) };
    const raw = JSON.stringify(largeData);
    // Return something larger so the cache stores shouldFilter=false (filtered not smaller)
    const notSmaller = JSON.stringify(largeData);
    const provider = mockProvider([notSmaller]);
    const proc = new LlmProcessor(makeRegistry(provider));
    const response = { jsonrpc: '2.0' as const, id: '1', result: largeData };
    // First call goes to LLM
    await proc.filterResponse('slack/search', response);
    // Second call should hit cache (shouldFilter=false) and skip LLM
    const result2 = await proc.filterResponse('slack/search', response);
    expect(result2.filtered).toBe(false);
    const metrics = proc.getMetrics();
    expect(metrics.cacheHits).toBeGreaterThanOrEqual(1);
  });
  it('records metrics on filter operations', async () => {
    const largeData = { items: Array.from({ length: 50 }, (_, i) => ({ id: i, name: `item-${i}`, extra: 'x'.repeat(20) })) };
    const filteredData = { items: [{ id: 0, name: 'item-0' }] };
    const provider = mockProvider([JSON.stringify(filteredData)]);
    const proc = new LlmProcessor(makeRegistry(provider));
    const response = { jsonrpc: '2.0' as const, id: '1', result: largeData };
    await proc.filterResponse('slack/search', response);
    const metrics = proc.getMetrics();
    expect(metrics.filterCount).toBe(1);
    expect(metrics.totalTokensProcessed).toBeGreaterThan(0);
    expect(metrics.tokensSaved).toBeGreaterThan(0);
    expect(metrics.cacheMisses).toBe(1);
  });
  it('records metrics even on LLM failure', async () => {
    const badProvider: LlmProvider = {
      name: 'bad',
      async complete() { throw new Error('LLM down'); },
      async listModels() { return []; },
      async isAvailable() { return false; },
    };
    const largeData = { items: Array.from({ length: 50 }, (_, i) => ({ id: i, extra: 'x'.repeat(20) })) };
    const proc = new LlmProcessor(makeRegistry(badProvider));
    const response = { jsonrpc: '2.0' as const, id: '1', result: largeData };
    await proc.filterResponse('slack/search', response);
    const metrics = proc.getMetrics();
    expect(metrics.filterCount).toBe(1);
    expect(metrics.totalTokensProcessed).toBeGreaterThan(0);
    // No tokens saved because filter failed
    expect(metrics.tokensSaved).toBe(0);
  });
});
describe('LlmProcessor metrics and cache management', () => {
it('exposes metrics via getMetrics()', () => {
const proc = new LlmProcessor(makeRegistry());
const metrics = proc.getMetrics();
expect(metrics.totalTokensProcessed).toBe(0);
expect(metrics.filterCount).toBe(0);
});
it('resets metrics', async () => {
const largeData = { items: Array.from({ length: 50 }, (_, i) => ({ id: i, extra: 'x'.repeat(20) })) };
const provider = mockProvider([JSON.stringify({ summary: 'ok' })]);
const proc = new LlmProcessor(makeRegistry(provider));
const response = { jsonrpc: '2.0' as const, id: '1', result: largeData };
await proc.filterResponse('slack/search', response);
expect(proc.getMetrics().filterCount).toBe(1);
proc.resetMetrics();
expect(proc.getMetrics().filterCount).toBe(0);
});
it('clears filter cache', async () => {
const largeData = { items: Array.from({ length: 50 }, (_, i) => ({ id: i, extra: 'x'.repeat(20) })) };
const filteredData = { items: [{ id: 0 }] };
// Two responses needed: first call filters, second call after cache clear also filters
const provider = mockProvider([JSON.stringify(filteredData), JSON.stringify(filteredData)]);
const proc = new LlmProcessor(makeRegistry(provider));
const response = { jsonrpc: '2.0' as const, id: '1', result: largeData };
await proc.filterResponse('slack/search', response);
proc.clearFilterCache();
// After clearing cache, should get a cache miss again
proc.resetMetrics();
await proc.filterResponse('slack/search', response);
expect(proc.getMetrics().cacheMisses).toBe(1);
});
});

View File

@@ -0,0 +1,110 @@
import { describe, it, expect, vi } from 'vitest';
import { McpdUpstream } from '../src/upstream/mcpd.js';
import type { JsonRpcRequest } from '../src/types.js';
/**
 * Builds a fake mcpd HTTP client whose `post` resolves canned responses keyed
 * by `"<serverId>:<method>"`. Keys with no canned entry fall back to
 * `{ result: { ok: true } }`. All other verbs are bare spies.
 */
function mockMcpdClient(responses: Map<string, unknown> = new Map()) {
  const handlePost = async (_path: string, body: unknown) => {
    const { serverId, method } = body as { serverId: string; method: string };
    const key = `${serverId}:${method}`;
    return responses.has(key) ? responses.get(key) : { result: { ok: true } };
  };
  return {
    baseUrl: 'http://test:3100',
    token: 'test-token',
    get: vi.fn(),
    post: vi.fn(handlePost),
    put: vi.fn(),
    delete: vi.fn(),
    forward: vi.fn(),
  };
}
// McpdUpstream relays JSON-RPC traffic for a single managed server through
// mcpd's HTTP proxy endpoint. These tests cover the happy path, transport
// failures, lifecycle (close/isAlive), and pass-through of upstream errors.
describe('McpdUpstream', () => {
  it('sends tool calls via mcpd proxy', async () => {
    const client = mockMcpdClient(new Map([
      ['srv-1:tools/call', { result: { content: [{ type: 'text', text: 'hello' }] } }],
    ]));
    const upstream = new McpdUpstream('srv-1', 'slack', client as any);
    const request: JsonRpcRequest = {
      jsonrpc: '2.0',
      id: '1',
      method: 'tools/call',
      params: { name: 'search', arguments: { query: 'test' } },
    };
    const response = await upstream.send(request);
    expect(response.result).toEqual({ content: [{ type: 'text', text: 'hello' }] });
    // The request must target the proxy route with the server id attached.
    expect(client.post).toHaveBeenCalledWith('/api/v1/mcp/proxy', {
      serverId: 'srv-1',
      method: 'tools/call',
      params: { name: 'search', arguments: { query: 'test' } },
    });
  });
  it('sends tools/list via mcpd proxy', async () => {
    const client = mockMcpdClient(new Map([
      ['srv-1:tools/list', { result: { tools: [{ name: 'search', description: 'Search' }] } }],
    ]));
    const upstream = new McpdUpstream('srv-1', 'slack', client as any);
    const request: JsonRpcRequest = {
      jsonrpc: '2.0',
      id: '2',
      method: 'tools/list',
    };
    const response = await upstream.send(request);
    expect(response.result).toEqual({ tools: [{ name: 'search', description: 'Search' }] });
  });
  it('returns error when mcpd fails', async () => {
    const client = mockMcpdClient();
    // Simulate a network-level failure of the proxy call.
    client.post.mockRejectedValue(new Error('connection refused'));
    const upstream = new McpdUpstream('srv-1', 'slack', client as any);
    const request: JsonRpcRequest = { jsonrpc: '2.0', id: '3', method: 'tools/list' };
    const response = await upstream.send(request);
    // Transport failures surface as a JSON-RPC error response, not a throw.
    expect(response.error).toBeDefined();
    expect(response.error!.message).toContain('mcpd proxy error');
  });
  it('returns error when upstream is closed', async () => {
    const client = mockMcpdClient();
    const upstream = new McpdUpstream('srv-1', 'slack', client as any);
    await upstream.close();
    const request: JsonRpcRequest = { jsonrpc: '2.0', id: '4', method: 'tools/list' };
    const response = await upstream.send(request);
    // send() after close() yields an error response mentioning the closed state.
    expect(response.error).toBeDefined();
    expect(response.error!.message).toContain('closed');
  });
  it('reports alive status correctly', async () => {
    const client = mockMcpdClient();
    const upstream = new McpdUpstream('srv-1', 'slack', client as any);
    expect(upstream.isAlive()).toBe(true);
    await upstream.close();
    expect(upstream.isAlive()).toBe(false);
  });
  it('relays error responses from mcpd', async () => {
    const client = mockMcpdClient(new Map([
      ['srv-1:tools/call', { error: { code: -32601, message: 'Tool not found' } }],
    ]));
    const upstream = new McpdUpstream('srv-1', 'slack', client as any);
    const request: JsonRpcRequest = {
      jsonrpc: '2.0',
      id: '5',
      method: 'tools/call',
      params: { name: 'nonexistent' },
    };
    const response = await upstream.send(request);
    // Upstream JSON-RPC errors pass through unmodified.
    expect(response.error).toEqual({ code: -32601, message: 'Tool not found' });
  });
});

View File

@@ -0,0 +1,93 @@
import { describe, it, expect } from 'vitest';
import { FilterMetrics } from '../src/llm/metrics.js';
// Unit tests for FilterMetrics: an aggregate of counters tracking token
// throughput, savings, cache hit/miss counts, and average filter latency.
describe('FilterMetrics', () => {
  it('starts with zeroed stats', () => {
    const stats = new FilterMetrics().getStats();
    expect(stats.totalTokensProcessed).toBe(0);
    expect(stats.tokensSaved).toBe(0);
    expect(stats.cacheHits).toBe(0);
    expect(stats.cacheMisses).toBe(0);
    expect(stats.filterCount).toBe(0);
    expect(stats.averageFilterLatencyMs).toBe(0);
  });
  it('records filter operations and accumulates tokens', () => {
    const metrics = new FilterMetrics();
    metrics.recordFilter(500, 200, 50);
    metrics.recordFilter(300, 100, 30);
    const stats = metrics.getStats();
    expect(stats.totalTokensProcessed).toBe(800); // 500 + 300
    expect(stats.tokensSaved).toBe(500); // (500-200) + (300-100)
    expect(stats.filterCount).toBe(2);
    expect(stats.averageFilterLatencyMs).toBe(40); // (50+30)/2
  });
  it('does not allow negative token savings', () => {
    const metrics = new FilterMetrics();
    // Edge case: the "filtered" output grew instead of shrinking.
    metrics.recordFilter(100, 200, 10);
    const stats = metrics.getStats();
    expect(stats.totalTokensProcessed).toBe(100);
    expect(stats.tokensSaved).toBe(0); // clamped to 0, never negative
  });
  it('records cache hits and misses independently', () => {
    const metrics = new FilterMetrics();
    metrics.recordCacheHit();
    metrics.recordCacheHit();
    metrics.recordCacheMiss();
    const stats = metrics.getStats();
    expect(stats.cacheHits).toBe(2);
    expect(stats.cacheMisses).toBe(1);
  });
  it('computes average latency correctly', () => {
    const metrics = new FilterMetrics();
    for (const latency of [10, 20, 30]) {
      metrics.recordFilter(100, 50, latency);
    }
    expect(metrics.getStats().averageFilterLatencyMs).toBe(20);
  });
  it('returns 0 average latency when no filter operations', () => {
    const metrics = new FilterMetrics();
    // Cache-only activity must not produce a divide-by-zero average.
    metrics.recordCacheHit();
    expect(metrics.getStats().averageFilterLatencyMs).toBe(0);
  });
  it('resets all metrics to zero', () => {
    const metrics = new FilterMetrics();
    metrics.recordFilter(500, 200, 50);
    metrics.recordCacheHit();
    metrics.recordCacheMiss();
    metrics.reset();
    const stats = metrics.getStats();
    expect(stats.totalTokensProcessed).toBe(0);
    expect(stats.tokensSaved).toBe(0);
    expect(stats.cacheHits).toBe(0);
    expect(stats.cacheMisses).toBe(0);
    expect(stats.filterCount).toBe(0);
    expect(stats.averageFilterLatencyMs).toBe(0);
  });
  it('returns independent snapshots', () => {
    const metrics = new FilterMetrics();
    metrics.recordFilter(100, 50, 10);
    const before = metrics.getStats();
    metrics.recordFilter(200, 100, 20);
    const after = metrics.getStats();
    // The earlier snapshot must not be mutated by later recordings.
    expect(before.totalTokensProcessed).toBe(100);
    expect(after.totalTokensProcessed).toBe(300);
  });
});

View File

@@ -0,0 +1,118 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { ProviderRegistry } from '../src/providers/registry.js';
import type { LlmProvider, CompletionOptions, CompletionResult } from '../src/providers/types.js';
/**
 * Creates a stub LlmProvider named `name`. Completions, the model list, and
 * availability are deterministic, and every method is spy-wrapped so tests
 * can assert on calls.
 */
function mockProvider(name: string): LlmProvider {
  const makeCompletion = (): CompletionResult => ({
    content: `Response from ${name}`,
    toolCalls: [],
    usage: { promptTokens: 10, completionTokens: 20, totalTokens: 30 },
    finishReason: 'stop',
  });
  return {
    name,
    complete: vi.fn(async () => makeCompletion()),
    listModels: vi.fn(async () => [`${name}-model-1`, `${name}-model-2`]),
    isAvailable: vi.fn(async () => true),
  };
}
// ProviderRegistry holds named LLM providers and tracks which one is active.
// Key behavior under test: the first registration becomes active, unregistering
// the active provider fails over to the next one, and lookups are by name.
describe('ProviderRegistry', () => {
  let registry: ProviderRegistry;
  beforeEach(() => {
    registry = new ProviderRegistry();
  });
  it('starts with no providers', () => {
    expect(registry.list()).toEqual([]);
    expect(registry.getActive()).toBeNull();
    expect(registry.getActiveName()).toBeNull();
  });
  it('registers a provider and sets it as active', () => {
    const openai = mockProvider('openai');
    registry.register(openai);
    expect(registry.list()).toEqual(['openai']);
    expect(registry.getActive()).toBe(openai);
    expect(registry.getActiveName()).toBe('openai');
  });
  it('first registered provider becomes active', () => {
    registry.register(mockProvider('openai'));
    registry.register(mockProvider('anthropic'));
    // Registering a second provider does not displace the first as active.
    expect(registry.getActiveName()).toBe('openai');
    expect(registry.list()).toEqual(['openai', 'anthropic']);
  });
  it('switches active provider', () => {
    registry.register(mockProvider('openai'));
    registry.register(mockProvider('anthropic'));
    registry.setActive('anthropic');
    expect(registry.getActiveName()).toBe('anthropic');
  });
  it('throws when setting unknown provider as active', () => {
    expect(() => registry.setActive('unknown')).toThrow("Provider 'unknown' is not registered");
  });
  it('gets provider by name', () => {
    const openai = mockProvider('openai');
    registry.register(openai);
    expect(registry.get('openai')).toBe(openai);
    expect(registry.get('unknown')).toBeUndefined();
  });
  it('unregisters a provider', () => {
    registry.register(mockProvider('openai'));
    registry.register(mockProvider('anthropic'));
    registry.unregister('openai');
    expect(registry.list()).toEqual(['anthropic']);
    // Active should switch to remaining provider
    expect(registry.getActiveName()).toBe('anthropic');
  });
  it('unregistering active provider switches to next available', () => {
    registry.register(mockProvider('openai'));
    registry.register(mockProvider('anthropic'));
    registry.setActive('openai');
    registry.unregister('openai');
    expect(registry.getActiveName()).toBe('anthropic');
  });
  it('unregistering last provider clears active', () => {
    registry.register(mockProvider('openai'));
    registry.unregister('openai');
    expect(registry.getActive()).toBeNull();
    expect(registry.getActiveName()).toBeNull();
  });
  it('active provider can complete', async () => {
    const provider = mockProvider('openai');
    registry.register(provider);
    const active = registry.getActive()!;
    const result = await active.complete({
      messages: [{ role: 'user', content: 'Hello' }],
    });
    expect(result.content).toBe('Response from openai');
    expect(result.finishReason).toBe('stop');
    expect(provider.complete).toHaveBeenCalled();
  });
  it('active provider can list models', async () => {
    registry.register(mockProvider('anthropic'));
    const active = registry.getActive()!;
    const models = await active.listModels();
    expect(models).toEqual(['anthropic-model-1', 'anthropic-model-2']);
  });
});

View File

@@ -0,0 +1,448 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { McpRouter } from '../src/router.js';
import type { UpstreamConnection, JsonRpcRequest, JsonRpcResponse, JsonRpcNotification } from '../src/types.js';
/**
 * Builds a fake UpstreamConnection named `name` that answers the standard MCP
 * list/call/read/get methods from the supplied fixtures and rejects anything
 * else with a -32601 error. Registered notification handlers are exposed via
 * `_notificationHandlers` so tests can simulate upstream notifications.
 */
function mockUpstream(
  name: string,
  opts: {
    tools?: Array<{ name: string; description?: string }>;
    resources?: Array<{ uri: string; name?: string; description?: string }>;
    prompts?: Array<{ name: string; description?: string }>;
  } = {},
): UpstreamConnection {
  const notificationHandlers: Array<(n: JsonRpcNotification) => void> = [];
  // Single responder that maps each supported method to a canned response.
  const respond = async (req: JsonRpcRequest): Promise<JsonRpcResponse> => {
    const params = req.params as Record<string, unknown> | undefined;
    switch (req.method) {
      case 'tools/list':
        return { jsonrpc: '2.0', id: req.id, result: { tools: opts.tools ?? [] } };
      case 'resources/list':
        return { jsonrpc: '2.0', id: req.id, result: { resources: opts.resources ?? [] } };
      case 'prompts/list':
        return { jsonrpc: '2.0', id: req.id, result: { prompts: opts.prompts ?? [] } };
      case 'tools/call':
        return {
          jsonrpc: '2.0',
          id: req.id,
          result: { content: [{ type: 'text', text: `Called ${params?.name}` }] },
        };
      case 'resources/read':
        return {
          jsonrpc: '2.0',
          id: req.id,
          result: { contents: [{ uri: params?.uri, text: 'resource content' }] },
        };
      case 'prompts/get':
        return {
          jsonrpc: '2.0',
          id: req.id,
          result: { messages: [{ role: 'user', content: { type: 'text', text: 'prompt content' } }] },
        };
      default:
        return { jsonrpc: '2.0', id: req.id, error: { code: -32601, message: 'Not found' } };
    }
  };
  return {
    name,
    isAlive: vi.fn(() => true),
    close: vi.fn(async () => {}),
    onNotification: vi.fn((handler: (n: JsonRpcNotification) => void) => {
      notificationHandlers.push(handler);
    }),
    send: vi.fn(respond),
    // expose for tests
    _notificationHandlers: notificationHandlers,
  } as UpstreamConnection & { _notificationHandlers: Array<(n: JsonRpcNotification) => void> };
}
// McpRouter aggregates many upstream MCP servers behind a single JSON-RPC
// endpoint. Tools and prompts are namespaced as "<upstream>/<name>" and
// resources as "<upstream>://<uri>"; calls are de-namespaced before being
// forwarded to the owning upstream.
describe('McpRouter', () => {
  let router: McpRouter;
  beforeEach(() => {
    router = new McpRouter();
  });
  describe('initialize', () => {
    it('responds with server info and capabilities including resources and prompts', async () => {
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'initialize',
      });
      expect(res.result).toBeDefined();
      const result = res.result as Record<string, unknown>;
      expect(result['protocolVersion']).toBe('2024-11-05');
      expect((result['serverInfo'] as Record<string, unknown>)['name']).toBe('mcpctl-proxy');
      const capabilities = result['capabilities'] as Record<string, unknown>;
      expect(capabilities['tools']).toBeDefined();
      expect(capabilities['resources']).toBeDefined();
      expect(capabilities['prompts']).toBeDefined();
    });
  });
  describe('tools/list', () => {
    it('returns empty tools when no upstreams', async () => {
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'tools/list',
      });
      const result = res.result as { tools: unknown[] };
      expect(result.tools).toEqual([]);
    });
    it('discovers and namespaces tools from upstreams', async () => {
      router.addUpstream(mockUpstream('slack', {
        tools: [
          { name: 'send_message', description: 'Send a message' },
          { name: 'list_channels', description: 'List channels' },
        ],
      }));
      router.addUpstream(mockUpstream('github', {
        tools: [
          { name: 'create_issue', description: 'Create an issue' },
        ],
      }));
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'tools/list',
      });
      const result = res.result as { tools: Array<{ name: string }> };
      expect(result.tools).toHaveLength(3);
      expect(result.tools.map((t) => t.name)).toContain('slack/send_message');
      expect(result.tools.map((t) => t.name)).toContain('slack/list_channels');
      expect(result.tools.map((t) => t.name)).toContain('github/create_issue');
    });
    it('skips unavailable upstreams', async () => {
      // An upstream whose send() rejects must not poison the aggregate list.
      const failingUpstream = mockUpstream('failing');
      vi.mocked(failingUpstream.send).mockRejectedValue(new Error('Connection refused'));
      router.addUpstream(failingUpstream);
      router.addUpstream(mockUpstream('working', {
        tools: [{ name: 'do_thing', description: 'Does a thing' }],
      }));
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'tools/list',
      });
      const result = res.result as { tools: Array<{ name: string }> };
      expect(result.tools).toHaveLength(1);
      expect(result.tools[0]?.name).toBe('working/do_thing');
    });
  });
  // tools/call must strip the "<upstream>/" prefix before forwarding.
  describe('tools/call', () => {
    it('routes call to correct upstream', async () => {
      const slack = mockUpstream('slack', { tools: [{ name: 'send_message' }] });
      router.addUpstream(slack);
      await router.discoverTools();
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'tools/call',
        params: { name: 'slack/send_message', arguments: { channel: '#general', text: 'hello' } },
      });
      expect(res.result).toBeDefined();
      // Verify the upstream received the call with de-namespaced tool name
      expect(vi.mocked(slack.send)).toHaveBeenCalledWith(
        expect.objectContaining({
          method: 'tools/call',
          params: expect.objectContaining({ name: 'send_message' }),
        }),
      );
    });
    it('returns error for unknown tool', async () => {
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'tools/call',
        params: { name: 'unknown/tool' },
      });
      expect(res.error).toBeDefined();
      expect(res.error?.code).toBe(-32601);
    });
    it('returns error when tool name is missing', async () => {
      // Missing params.name is an invalid-params error (-32602), not not-found.
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'tools/call',
        params: {},
      });
      expect(res.error).toBeDefined();
      expect(res.error?.code).toBe(-32602);
    });
    it('returns error when upstream is dead', async () => {
      const slack = mockUpstream('slack', { tools: [{ name: 'send_message' }] });
      router.addUpstream(slack);
      await router.discoverTools();
      // Kill the upstream after discovery: the routing entry still exists.
      vi.mocked(slack.isAlive).mockReturnValue(false);
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'tools/call',
        params: { name: 'slack/send_message' },
      });
      expect(res.error).toBeDefined();
      expect(res.error?.code).toBe(-32603);
    });
  });
  describe('resources/list', () => {
    it('returns empty resources when no upstreams', async () => {
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'resources/list',
      });
      const result = res.result as { resources: unknown[] };
      expect(result.resources).toEqual([]);
    });
    it('discovers and namespaces resources from upstreams', async () => {
      router.addUpstream(mockUpstream('files', {
        resources: [
          { uri: 'file:///docs/readme.md', name: 'README', description: 'Project readme' },
        ],
      }));
      router.addUpstream(mockUpstream('db', {
        resources: [
          { uri: 'db://users', name: 'Users table' },
        ],
      }));
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'resources/list',
      });
      const result = res.result as { resources: Array<{ uri: string }> };
      expect(result.resources).toHaveLength(2);
      // Resource URIs are prefixed with "<upstream>://", keeping the full original URI.
      expect(result.resources.map((r) => r.uri)).toContain('files://file:///docs/readme.md');
      expect(result.resources.map((r) => r.uri)).toContain('db://db://users');
    });
  });
  describe('resources/read', () => {
    it('routes read to correct upstream', async () => {
      const files = mockUpstream('files', {
        resources: [{ uri: 'file:///docs/readme.md', name: 'README' }],
      });
      router.addUpstream(files);
      await router.discoverResources();
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'resources/read',
        params: { uri: 'files://file:///docs/readme.md' },
      });
      expect(res.result).toBeDefined();
      // The upstream sees the original (de-namespaced) URI.
      expect(vi.mocked(files.send)).toHaveBeenCalledWith(
        expect.objectContaining({
          method: 'resources/read',
          params: expect.objectContaining({ uri: 'file:///docs/readme.md' }),
        }),
      );
    });
    it('returns error for unknown resource', async () => {
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'resources/read',
        params: { uri: 'unknown://resource' },
      });
      expect(res.error).toBeDefined();
      expect(res.error?.code).toBe(-32601);
    });
  });
  describe('prompts/list', () => {
    it('returns empty prompts when no upstreams', async () => {
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'prompts/list',
      });
      const result = res.result as { prompts: unknown[] };
      expect(result.prompts).toEqual([]);
    });
    it('discovers and namespaces prompts from upstreams', async () => {
      router.addUpstream(mockUpstream('code', {
        prompts: [
          { name: 'review', description: 'Code review' },
          { name: 'explain', description: 'Explain code' },
        ],
      }));
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'prompts/list',
      });
      const result = res.result as { prompts: Array<{ name: string }> };
      expect(result.prompts).toHaveLength(2);
      expect(result.prompts.map((p) => p.name)).toContain('code/review');
      expect(result.prompts.map((p) => p.name)).toContain('code/explain');
    });
  });
  describe('prompts/get', () => {
    it('routes get to correct upstream', async () => {
      const code = mockUpstream('code', {
        prompts: [{ name: 'review', description: 'Code review' }],
      });
      router.addUpstream(code);
      await router.discoverPrompts();
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'prompts/get',
        params: { name: 'code/review' },
      });
      expect(res.result).toBeDefined();
      // The upstream sees the bare prompt name.
      expect(vi.mocked(code.send)).toHaveBeenCalledWith(
        expect.objectContaining({
          method: 'prompts/get',
          params: expect.objectContaining({ name: 'review' }),
        }),
      );
    });
    it('returns error for unknown prompt', async () => {
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'prompts/get',
        params: { name: 'unknown/prompt' },
      });
      expect(res.error).toBeDefined();
      expect(res.error?.code).toBe(-32601);
    });
  });
  describe('notifications', () => {
    it('forwards notifications from upstreams with source tag', () => {
      const received: JsonRpcNotification[] = [];
      router.setNotificationHandler((n) => received.push(n));
      const slack = mockUpstream('slack', { tools: [{ name: 'send_message' }] }) as UpstreamConnection & {
        _notificationHandlers: Array<(n: JsonRpcNotification) => void>;
      };
      router.addUpstream(slack);
      // Simulate upstream sending a notification
      for (const handler of slack._notificationHandlers) {
        handler({ jsonrpc: '2.0', method: 'notifications/progress', params: { progress: 50 } });
      }
      expect(received).toHaveLength(1);
      expect(received[0]?.method).toBe('notifications/progress');
      // The router injects the originating upstream name under params._source.
      expect(received[0]?.params?._source).toBe('slack');
    });
  });
  describe('unknown methods', () => {
    it('returns method not found error', async () => {
      const res = await router.route({
        jsonrpc: '2.0',
        id: 1,
        method: 'completions/complete',
      });
      expect(res.error).toBeDefined();
      expect(res.error?.code).toBe(-32601);
    });
  });
  describe('upstream management', () => {
    it('lists upstream names', () => {
      router.addUpstream(mockUpstream('slack'));
      router.addUpstream(mockUpstream('github'));
      expect(router.getUpstreamNames()).toEqual(['slack', 'github']);
    });
    it('removes upstream and cleans up all mappings', async () => {
      const slack = mockUpstream('slack', {
        tools: [{ name: 'send_message' }],
        resources: [{ uri: 'slack://channels' }],
        prompts: [{ name: 'compose' }],
      });
      router.addUpstream(slack);
      await router.discoverTools();
      await router.discoverResources();
      await router.discoverPrompts();
      router.removeUpstream('slack');
      expect(router.getUpstreamNames()).toEqual([]);
      // Verify tool/resource/prompt mappings are cleaned
      const toolRes = await router.route({
        jsonrpc: '2.0', id: 1, method: 'tools/call',
        params: { name: 'slack/send_message' },
      });
      expect(toolRes.error?.code).toBe(-32601);
    });
    it('closes all upstreams', async () => {
      const slack = mockUpstream('slack');
      const github = mockUpstream('github');
      router.addUpstream(slack);
      router.addUpstream(github);
      await router.closeAll();
      expect(slack.close).toHaveBeenCalled();
      expect(github.close).toHaveBeenCalled();
      expect(router.getUpstreamNames()).toEqual([]);
    });
  });
});

View File

@@ -0,0 +1,304 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { TieredHealthMonitor } from '../src/health/tiered.js';
import type { TieredHealthMonitorDeps } from '../src/health/tiered.js';
import type { McpdClient } from '../src/http/mcpd-client.js';
import { ProviderRegistry } from '../src/providers/registry.js';
import type { LlmProvider } from '../src/providers/types.js';
/**
 * Builds a fake McpdClient for health-monitor tests. GET /health and
 * GET /instances honour the supplied overrides (canned results or forced
 * failures); any other path resolves to an empty object.
 */
function mockMcpdClient(overrides?: {
  getResult?: unknown;
  getFails?: boolean;
  instancesResult?: { instances: Array<{ name: string; status: string }> };
  instancesFails?: boolean;
}): McpdClient {
  const handleGet = async (path: string): Promise<unknown> => {
    switch (path) {
      case '/health':
        if (overrides?.getFails) {
          throw new Error('Connection refused');
        }
        return overrides?.getResult ?? { status: 'ok' };
      case '/instances':
        if (overrides?.instancesFails) {
          throw new Error('Connection refused');
        }
        return overrides?.instancesResult ?? { instances: [] };
      default:
        return {};
    }
  };
  return {
    get: vi.fn(handleGet),
    post: vi.fn(),
    put: vi.fn(),
    delete: vi.fn(),
    forward: vi.fn(),
  } as unknown as McpdClient;
}
// Minimal spy-wrapped LlmProvider stub that reports itself as available
// and exposes an empty model list.
function mockLlmProvider(name: string): LlmProvider {
  const provider: LlmProvider = {
    name,
    complete: vi.fn(),
    listModels: vi.fn(async () => []),
    isAvailable: vi.fn(async () => true),
  };
  return provider;
}
// TieredHealthMonitor aggregates health across the three tiers of the
// architecture: mcplocal itself, the mcpd daemon, and the MCP server
// instances mcpd manages. A null mcpdClient models "mcpd unreachable".
describe('TieredHealthMonitor', () => {
  let providerRegistry: ProviderRegistry;
  beforeEach(() => {
    providerRegistry = new ProviderRegistry();
  });
  describe('mcplocal health', () => {
    it('reports healthy status with uptime', async () => {
      const monitor = new TieredHealthMonitor({
        mcpdClient: null,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.mcplocal.status).toBe('healthy');
      expect(result.mcplocal.uptime).toBeGreaterThanOrEqual(0);
    });
    it('reports null llmProvider when none registered', async () => {
      const monitor = new TieredHealthMonitor({
        mcpdClient: null,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.mcplocal.llmProvider).toBeNull();
    });
    it('reports active llmProvider name when one is registered', async () => {
      const provider = mockLlmProvider('openai');
      providerRegistry.register(provider);
      const monitor = new TieredHealthMonitor({
        mcpdClient: null,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.mcplocal.llmProvider).toBe('openai');
    });
    it('reports the currently active provider when multiple registered', async () => {
      providerRegistry.register(mockLlmProvider('openai'));
      providerRegistry.register(mockLlmProvider('anthropic'));
      providerRegistry.setActive('anthropic');
      const monitor = new TieredHealthMonitor({
        mcpdClient: null,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.mcplocal.llmProvider).toBe('anthropic');
    });
  });
  describe('mcpd health', () => {
    it('reports connected when mcpd /health responds successfully', async () => {
      const client = mockMcpdClient();
      const monitor = new TieredHealthMonitor({
        mcpdClient: client,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.mcpd.status).toBe('connected');
      expect(result.mcpd.url).toBe('http://localhost:3100');
    });
    it('reports disconnected when mcpd /health throws', async () => {
      const client = mockMcpdClient({ getFails: true });
      const monitor = new TieredHealthMonitor({
        mcpdClient: client,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.mcpd.status).toBe('disconnected');
      expect(result.mcpd.url).toBe('http://localhost:3100');
    });
    it('reports disconnected when mcpdClient is null', async () => {
      const monitor = new TieredHealthMonitor({
        mcpdClient: null,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.mcpd.status).toBe('disconnected');
      expect(result.mcpd.url).toBe('http://localhost:3100');
    });
    it('includes the configured mcpd URL in the response', async () => {
      // The URL is reported even when no client exists to reach it.
      const monitor = new TieredHealthMonitor({
        mcpdClient: null,
        providerRegistry,
        mcpdUrl: 'http://custom-host:9999',
      });
      const result = await monitor.checkHealth();
      expect(result.mcpd.url).toBe('http://custom-host:9999');
    });
  });
  describe('instances', () => {
    it('returns instances from mcpd /instances endpoint', async () => {
      const client = mockMcpdClient({
        instancesResult: {
          instances: [
            { name: 'slack', status: 'running' },
            { name: 'github', status: 'stopped' },
          ],
        },
      });
      const monitor = new TieredHealthMonitor({
        mcpdClient: client,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.instances).toHaveLength(2);
      expect(result.instances[0]).toEqual({ name: 'slack', status: 'running' });
      expect(result.instances[1]).toEqual({ name: 'github', status: 'stopped' });
    });
    it('returns empty array when mcpdClient is null', async () => {
      const monitor = new TieredHealthMonitor({
        mcpdClient: null,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.instances).toEqual([]);
    });
    it('returns empty array when /instances request fails', async () => {
      // Instance-listing failures degrade to an empty list rather than erroring.
      const client = mockMcpdClient({ instancesFails: true });
      const monitor = new TieredHealthMonitor({
        mcpdClient: client,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.instances).toEqual([]);
    });
    it('returns empty array when mcpd has no instances', async () => {
      const client = mockMcpdClient({
        instancesResult: { instances: [] },
      });
      const monitor = new TieredHealthMonitor({
        mcpdClient: client,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.instances).toEqual([]);
    });
  });
  describe('full integration', () => {
    it('returns complete tiered status with all sections', async () => {
      providerRegistry.register(mockLlmProvider('openai'));
      const client = mockMcpdClient({
        instancesResult: {
          instances: [
            { name: 'slack', status: 'running' },
          ],
        },
      });
      const monitor = new TieredHealthMonitor({
        mcpdClient: client,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      // Verify structure
      expect(result).toHaveProperty('mcplocal');
      expect(result).toHaveProperty('mcpd');
      expect(result).toHaveProperty('instances');
      // mcplocal
      expect(result.mcplocal.status).toBe('healthy');
      expect(typeof result.mcplocal.uptime).toBe('number');
      expect(result.mcplocal.llmProvider).toBe('openai');
      // mcpd
      expect(result.mcpd.status).toBe('connected');
      // instances
      expect(result.instances).toHaveLength(1);
      expect(result.instances[0]?.name).toBe('slack');
    });
    it('handles degraded scenario: no mcpd, no provider', async () => {
      // mcplocal stays healthy even when everything downstream is missing.
      const monitor = new TieredHealthMonitor({
        mcpdClient: null,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.mcplocal.status).toBe('healthy');
      expect(result.mcplocal.llmProvider).toBeNull();
      expect(result.mcpd.status).toBe('disconnected');
      expect(result.instances).toEqual([]);
    });
    it('handles mcpd connected but instances endpoint failing', async () => {
      const client = mockMcpdClient({ instancesFails: true });
      const monitor = new TieredHealthMonitor({
        mcpdClient: client,
        providerRegistry,
        mcpdUrl: 'http://localhost:3100',
      });
      const result = await monitor.checkHealth();
      expect(result.mcpd.status).toBe('connected');
      expect(result.instances).toEqual([]);
    });
  });
});

View File

@@ -0,0 +1,45 @@
import { describe, it, expect } from 'vitest';
import { estimateTokens } from '../src/llm/token-counter.js';
// estimateTokens approximates a token count as ceil(length / 4),
// measured over UTF-16 code units of the input string.
describe('estimateTokens', () => {
  it('returns 0 for empty string', () => {
    expect(estimateTokens('')).toBe(0);
  });
  it('returns 1 for strings of 1-4 characters', () => {
    for (const sample of ['a', 'ab', 'abc', 'abcd']) {
      expect(estimateTokens(sample)).toBe(1);
    }
  });
  it('returns 2 for strings of 5-8 characters', () => {
    expect(estimateTokens('abcde')).toBe(2);
    expect(estimateTokens('abcdefgh')).toBe(2);
  });
  it('estimates roughly 4 chars per token for longer text', () => {
    expect(estimateTokens('a'.repeat(1000))).toBe(250);
  });
  it('rounds up partial tokens', () => {
    expect(estimateTokens('abcdefg')).toBe(2); // 7 / 4 = 1.75 -> ceil = 2
    expect(estimateTokens('abcdefghi')).toBe(3); // 9 / 4 = 2.25 -> ceil = 3
  });
  it('handles JSON payloads', () => {
    const json = JSON.stringify({ key: 'value', nested: { a: 1, b: [1, 2, 3] } });
    expect(estimateTokens(json)).toBe(Math.ceil(json.length / 4));
  });
  it('handles unicode text', () => {
    // Estimation counts UTF-16 code units (each emoji is 2), not bytes or code points.
    const text = '\u{1F600}'.repeat(10);
    expect(estimateTokens(text)).toBe(Math.ceil(text.length / 4));
  });
});

View File

@@ -0,0 +1,12 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist",
"types": ["node"]
},
"include": ["src/**/*.ts"],
"references": [
{ "path": "../shared" }
]
}

View File

@@ -0,0 +1,8 @@
import { defineProject } from 'vitest/config';

// Vitest project configuration for the mcplocal workspace package:
// collects all files matching tests/**/*.test.ts under the "mcplocal" project name.
const mcplocalTestProject = defineProject({
  test: {
    name: 'mcplocal',
    include: ['tests/**/*.test.ts'],
  },
});

export default mcplocalTestProject;