feat: HTTP-mode mcplocal container + mcpctl test mcp + token-auth preHandler

Delivers the final piece of the mcptoken stack: a containerized,
network-accessible mcplocal that serves Streamable-HTTP MCP to off-host
clients (the vLLM use case), authenticated by project-scoped McpTokens.

New binary (same package, new entry):
  - src/mcplocal/src/serve.ts — HTTP-only entry. Reads MCPLOCAL_MCPD_URL,
    MCPLOCAL_MCPD_TOKEN, MCPLOCAL_HTTP_HOST/PORT, MCPLOCAL_CACHE_DIR from
    env. No StdioProxyServer, no --upstream.
  - src/mcplocal/src/http/token-auth.ts — Fastify preHandler that
    validates mcpctl_pat_ bearers via mcpd's /api/v1/mcptokens/introspect.
    30s positive / 5s negative TTL. Rejects wrong-project with 403.

Shared HTTP MCP client:
  - src/shared/src/mcp-http/ — reusable McpHttpSession with initialize,
    listTools, callTool, close. Handles http+https, SSE, id correlation,
    distinct McpProtocolError / McpTransportError. Plus mcpHealthCheck
    and deriveBaseUrl helpers.

New CLI verb `mcpctl test mcp <url>`:
  - Flags: --token (also $MCPCTL_TOKEN), --tool, --args (JSON),
    --expect-tools, --timeout, -o text|json, --no-health.
  - Exit codes: 0 PASS, 1 TRANSPORT/AUTH FAIL, 2 CONTRACT FAIL.

Container + deploy:
  - deploy/Dockerfile.mcplocal (Node 20 alpine, multi-stage, pnpm
    workspace, CMD node src/mcplocal/dist/serve.js, VOLUME
    /var/lib/mcplocal/cache, HEALTHCHECK on :3200/healthz).
  - scripts/build-mcplocal.sh mirrors build-mcpd.sh.
  - fulldeploy.sh is now a 4-step pipeline that also builds + rolls out
    mcplocal (gated on `kubectl get deployment/mcplocal` so the script
    stays green before the Pulumi stack lands).

Audit + cache:
  - project-mcp-endpoint.ts passes MCPLOCAL_CACHE_DIR into FileCache at
    both construction sites and, when request.mcpToken is present, calls
    collector.setSessionMcpToken(id, ...) so audit events carry the
    tokenName/tokenSha.

Tests:
  - 9 unit cases on `mcpctl test mcp` (happy path, health miss,
    expect-tools hit/miss, transport throw, tool isError, json report,
    $MCPCTL_TOKEN env fallback, invalid --args).
  - Smoke test src/mcplocal/tests/smoke/mcptoken.smoke.test.ts —
    gated on healthz($MCPGW_URL), skipped cleanly when unreachable.
    Covers happy path, wrong-project 403, --expect-tools contract
    failure, and revocation 401 within the negative-cache window.

1773/1773 workspace tests pass. Pulumi resources (Deployment, Service,
Ingress, PVC, Secret, NetworkPolicy) still need to land in
../kubernetes-deployment before the smoke gate flips on.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Michal
2026-04-17 01:21:42 +01:00
parent a151b2e756
commit 2127b41d9f
16 changed files with 1202 additions and 9 deletions

View File

@@ -4,3 +4,4 @@ export * from './constants/index.js';
export * from './utils/index.js';
export * from './secrets/index.js';
export * from './tokens/index.js';
export * from './mcp-http/index.js';

View File

@@ -0,0 +1,246 @@
/**
* Reusable Streamable-HTTP MCP client.
*
* Handles:
* - Bearer auth (session tokens or McpToken PATs)
* - mcp-session-id round-trip
* - Both JSON and text/event-stream response bodies
* - JSON-RPC id correlation when a response is multiplexed with notifications
*
* Used by the smoke suite (`SmokeMcpSession` is a thin wrapper around this)
* and by `mcpctl test mcp <url>`.
*/
import http from 'node:http';
import https from 'node:https';
export interface McpHttpSessionOptions {
/** Bearer to send on every request. Accepts raw tokens (no "Bearer " prefix). */
bearer?: string;
/** Additional headers merged into every request. */
headers?: Record<string, string>;
/** Timeout per HTTP request in milliseconds. Defaults to 30_000. */
timeoutMs?: number;
}
export interface ToolInfo {
name: string;
description?: string;
inputSchema?: unknown;
}
export interface ToolCallResult {
content: Array<{ type: string; text?: string }>;
isError?: boolean;
}
interface HttpRequestArgs {
url: string;
method: string;
headers?: Record<string, string>;
body?: string;
timeoutMs?: number;
}
interface HttpRequestResult {
status: number;
headers: http.IncomingHttpHeaders;
body: string;
}
function rawHttpRequest(opts: HttpRequestArgs): Promise<HttpRequestResult> {
return new Promise((resolve, reject) => {
const parsed = new URL(opts.url);
const driver = parsed.protocol === 'https:' ? https : http;
const req = driver.request(
{
hostname: parsed.hostname,
port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
path: parsed.pathname + parsed.search,
method: opts.method,
headers: opts.headers,
timeout: opts.timeoutMs ?? 30_000,
},
(res) => {
const chunks: Buffer[] = [];
res.on('data', (chunk: Buffer) => chunks.push(chunk));
res.on('end', () => {
resolve({
status: res.statusCode ?? 0,
headers: res.headers,
body: Buffer.concat(chunks).toString('utf-8'),
});
});
},
);
req.on('error', reject);
req.on('timeout', () => {
req.destroy();
reject(new Error('Request timed out'));
});
if (opts.body) req.write(opts.body);
req.end();
});
}
function parseSse(body: string): unknown[] {
const messages: unknown[] = [];
for (const line of body.split('\n')) {
if (line.startsWith('data: ')) {
try {
messages.push(JSON.parse(line.slice(6)));
} catch {
// skip malformed SSE data line
}
}
}
return messages;
}
/** Thrown when the server returned a response JSON-RPC error payload. */
export class McpProtocolError extends Error {
constructor(public readonly code: number, message: string) {
super(`MCP error ${code}: ${message}`);
this.name = 'McpProtocolError';
}
}
/** Thrown when the HTTP layer rejected the request (auth, transport, 5xx). */
export class McpTransportError extends Error {
constructor(public readonly status: number, public readonly body: string, message?: string) {
super(message ?? `HTTP ${status}: ${body.slice(0, 200)}`);
this.name = 'McpTransportError';
}
}
export class McpHttpSession {
private sessionId: string | undefined;
private nextId = 1;
constructor(
/** Full URL of the MCP endpoint (e.g. `https://mcp.example.com/projects/foo/mcp`). */
public readonly url: string,
private readonly options: McpHttpSessionOptions = {},
) {}
private buildHeaders(extra: Record<string, string> = {}): Record<string, string> {
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream',
...(this.options.headers ?? {}),
...extra,
};
if (this.sessionId) headers['mcp-session-id'] = this.sessionId;
if (this.options.bearer) headers['Authorization'] = `Bearer ${this.options.bearer}`;
return headers;
}
/**
* Send a JSON-RPC request and wait for the response with a matching id.
* Handles both single JSON and multiplexed SSE bodies.
*/
async send(method: string, params: Record<string, unknown> = {}): Promise<unknown> {
const id = this.nextId++;
const request = { jsonrpc: '2.0', id, method, params };
const args: HttpRequestArgs = {
url: this.url,
method: 'POST',
headers: this.buildHeaders(),
body: JSON.stringify(request),
};
if (this.options.timeoutMs !== undefined) args.timeoutMs = this.options.timeoutMs;
const result = await rawHttpRequest(args);
if (!this.sessionId) {
const sid = result.headers['mcp-session-id'];
if (typeof sid === 'string') this.sessionId = sid;
}
if (result.status >= 400) {
let message = `HTTP ${result.status}`;
try {
const body = JSON.parse(result.body) as { error?: string | { message?: string } };
const errField = body.error;
if (typeof errField === 'string') message = errField;
else if (errField && typeof errField === 'object' && typeof errField.message === 'string') message = errField.message;
} catch {
message = `HTTP ${result.status}: ${result.body.slice(0, 200)}`;
}
throw new McpTransportError(result.status, result.body, message);
}
const messages = result.headers['content-type']?.includes('text/event-stream')
? parseSse(result.body)
: [JSON.parse(result.body)];
const matched = messages.find((m) => {
const msg = m as { id?: unknown };
return msg.id === id;
}) as { result?: unknown; error?: { code: number; message: string } } | undefined;
const parsed = matched ?? messages[0] as { result?: unknown; error?: { code: number; message: string } } | undefined;
if (!parsed) throw new Error(`No response for ${method}`);
if (parsed.error) throw new McpProtocolError(parsed.error.code, parsed.error.message);
return parsed.result;
}
async sendNotification(method: string, params: Record<string, unknown> = {}): Promise<void> {
const notification = { jsonrpc: '2.0', method, params };
const args: HttpRequestArgs = {
url: this.url,
method: 'POST',
headers: this.buildHeaders(),
body: JSON.stringify(notification),
};
if (this.options.timeoutMs !== undefined) args.timeoutMs = this.options.timeoutMs;
await rawHttpRequest(args).catch(() => { /* best-effort */ });
}
/** MCP `initialize` handshake. */
async initialize(): Promise<{ protocolVersion?: string; serverInfo?: { name?: string; version?: string }; capabilities?: unknown }> {
return await this.send('initialize', {
protocolVersion: '2024-11-05',
capabilities: {},
clientInfo: { name: 'mcpctl-mcp-http-client', version: '1.0.0' },
}) as { protocolVersion?: string; serverInfo?: { name?: string; version?: string }; capabilities?: unknown };
}
/** List tools exposed by the endpoint. */
async listTools(): Promise<ToolInfo[]> {
const result = await this.send('tools/list') as { tools?: ToolInfo[] };
return result.tools ?? [];
}
/** Call a tool and return its `content` payload. */
async callTool(name: string, args: Record<string, unknown> = {}): Promise<ToolCallResult> {
return await this.send('tools/call', { name, arguments: args }) as ToolCallResult;
}
/** Clean-close the session with a DELETE. Safe to call when no sessionId has been negotiated. */
async close(): Promise<void> {
if (this.sessionId === undefined) return;
await rawHttpRequest({
url: this.url,
method: 'DELETE',
headers: this.buildHeaders(),
timeoutMs: 5_000,
}).catch(() => { /* best-effort */ });
this.sessionId = undefined;
}
}
/** Best-effort healthcheck against `<base>/healthz`. */
export async function mcpHealthCheck(baseUrl: string, timeoutMs = 5_000): Promise<boolean> {
try {
const res = await rawHttpRequest({ url: `${baseUrl.replace(/\/$/, '')}/healthz`, method: 'GET', timeoutMs });
return res.status >= 200 && res.status < 500;
} catch {
return false;
}
}
/** Derive `<scheme>://<host>[:port]` from a full MCP endpoint URL (for healthcheck). */
export function deriveBaseUrl(mcpUrl: string): string {
const u = new URL(mcpUrl);
return `${u.protocol}//${u.host}`;
}