Merge pull request 'fix: wire STDIO attach for docker-image MCP servers' (#49) from feat/k8s-operator into main
Some checks failed
Some checks failed
Reviewed-on: #49
This commit was merged in pull request #49.
This commit is contained in:
@@ -5,7 +5,7 @@ import { NotFoundError } from './mcp-server.service.js';
|
||||
import { InvalidStateError } from './instance.service.js';
|
||||
import { sendViaSse } from './transport/sse-client.js';
|
||||
import { sendViaStdio } from './transport/stdio-client.js';
|
||||
import { PersistentStdioClient } from './transport/persistent-stdio.js';
|
||||
import { PersistentStdioClient, type StdioMode } from './transport/persistent-stdio.js';
|
||||
|
||||
/**
|
||||
* Build the spawn command for a runtime inside its runner container.
|
||||
@@ -35,6 +35,18 @@ export interface McpProxyResponse {
|
||||
error?: { code: number; message: string; data?: unknown };
|
||||
}
|
||||
|
||||
function formatError(err: unknown): string {
|
||||
if (err instanceof Error) return err.message || err.toString();
|
||||
if (err && typeof err === 'object') {
|
||||
try {
|
||||
return JSON.stringify(err);
|
||||
} catch {
|
||||
return Object.prototype.toString.call(err);
|
||||
}
|
||||
}
|
||||
return String(err);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a streamable-http SSE response body to extract the JSON-RPC payload.
|
||||
* Streamable-http returns `event: message\ndata: {...}\n\n` format.
|
||||
@@ -140,28 +152,48 @@ export class McpProxyService {
|
||||
}
|
||||
const packageName = server.packageName as string | null;
|
||||
const command = server.command as string[] | null;
|
||||
const dockerImage = server.dockerImage as string | null;
|
||||
|
||||
if (!packageName && (!command || command.length === 0)) {
|
||||
// Decide STDIO mode:
|
||||
// - packageName set → exec via runtime runner (npx/uvx).
|
||||
// - command set → exec the given command in the container.
|
||||
// - dockerImage only → attach to PID 1 (image entrypoint IS the MCP server).
|
||||
// - nothing → unreachable, reject.
|
||||
const runtime = (server.runtime as string | null) ?? 'node';
|
||||
let mode: StdioMode;
|
||||
if (command && command.length > 0) {
|
||||
mode = { kind: 'exec', command };
|
||||
} else if (packageName) {
|
||||
mode = { kind: 'exec', command: buildRuntimeSpawnCmd(runtime, packageName) };
|
||||
} else if (dockerImage) {
|
||||
mode = { kind: 'attach' };
|
||||
} else {
|
||||
throw new InvalidStateError(
|
||||
`Server '${server.name}' (${server.id}) uses STDIO transport with a docker image ` +
|
||||
`but has no command. Set 'command' to the image's entrypoint ` +
|
||||
`(e.g. mcpctl edit server ${server.name} --command node --command build/index.js)`
|
||||
`Server '${server.name}' (${server.id}) uses STDIO transport but has no ` +
|
||||
`packageName, command, or dockerImage. Configure one of these.`,
|
||||
);
|
||||
}
|
||||
|
||||
// Build the spawn command based on runtime
|
||||
const runtime = (server.runtime as string | null) ?? 'node';
|
||||
const spawnCmd = command && command.length > 0
|
||||
? command
|
||||
: buildRuntimeSpawnCmd(runtime, packageName!);
|
||||
|
||||
// Try persistent connection first
|
||||
try {
|
||||
return await this.sendViaPersistentStdio(instance.containerId, spawnCmd, method, params);
|
||||
} catch {
|
||||
// Persistent failed — fall back to one-shot
|
||||
return await this.sendViaPersistentStdio(instance.containerId, mode, method, params);
|
||||
} catch (err) {
|
||||
this.removeClient(instance.containerId);
|
||||
return sendViaStdio(this.orchestrator, instance.containerId, packageName, method, params, 120_000, command, runtime);
|
||||
// Fall back to one-shot exec when we have a command to run.
|
||||
// Attach mode has no equivalent one-shot fallback — surface the error.
|
||||
if (mode.kind === 'exec') {
|
||||
return sendViaStdio(this.orchestrator, instance.containerId, packageName, method, params, 120_000, command, runtime);
|
||||
}
|
||||
const detail = formatError(err);
|
||||
console.error(`[mcp-proxy] attach to ${instance.containerId} failed:`, err);
|
||||
return {
|
||||
jsonrpc: '2.0',
|
||||
id: 1,
|
||||
error: {
|
||||
code: -32000,
|
||||
message: `STDIO attach to '${instance.containerId}' failed: ${detail}`,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -178,16 +210,17 @@ export class McpProxyService {
|
||||
|
||||
/**
|
||||
* Send via a persistent STDIO connection (reused across calls).
|
||||
* Mode is exec (run a command in the container) or attach (talk to PID 1).
|
||||
*/
|
||||
private async sendViaPersistentStdio(
|
||||
containerId: string,
|
||||
command: string[],
|
||||
mode: StdioMode,
|
||||
method: string,
|
||||
params?: Record<string, unknown>,
|
||||
): Promise<McpProxyResponse> {
|
||||
let client = this.stdioClients.get(containerId);
|
||||
if (!client) {
|
||||
client = new PersistentStdioClient(this.orchestrator!, containerId, command);
|
||||
client = new PersistentStdioClient(this.orchestrator!, containerId, mode);
|
||||
this.stdioClients.set(containerId, client);
|
||||
}
|
||||
return client.send(method, params);
|
||||
|
||||
@@ -1,14 +1,24 @@
|
||||
import type { McpOrchestrator, InteractiveExec } from '../orchestrator.js';
|
||||
import type { McpProxyResponse } from '../mcp-proxy-service.js';
|
||||
|
||||
export type StdioMode =
|
||||
| { kind: 'exec'; command: string[] }
|
||||
| { kind: 'attach' };
|
||||
|
||||
/**
|
||||
* Persistent STDIO connection to an MCP server running inside a Docker container.
|
||||
* Persistent STDIO connection to an MCP server running inside a container.
|
||||
*
|
||||
* Instead of cold-starting a new process per call (docker exec one-shot), this keeps
|
||||
* a long-running `docker exec -i <cmd>` session alive. The MCP init handshake runs
|
||||
* once, then tool calls are multiplexed over the same stdin/stdout pipe.
|
||||
* Two modes:
|
||||
* exec — start a new process in the container (`docker exec -i <cmd>` /
|
||||
* `kubectl exec -i`) and speak MCP to it. Used for runner-image
|
||||
* servers where mcpctl launches the MCP binary itself.
|
||||
* attach — attach to the container's PID 1 stdin/stdout. Used for
|
||||
* docker-image servers whose entrypoint IS the MCP server
|
||||
* (e.g. gitea-mcp-server, docmost-mcp).
|
||||
*
|
||||
* Falls back gracefully: if the process dies, the next call will reconnect.
|
||||
* In both modes the MCP init handshake runs once; subsequent tool calls
|
||||
* are multiplexed over the same pipe. If the session dies, the next call
|
||||
* will reconnect.
|
||||
*/
|
||||
export class PersistentStdioClient {
|
||||
private exec: InteractiveExec | null = null;
|
||||
@@ -25,7 +35,7 @@ export class PersistentStdioClient {
|
||||
constructor(
|
||||
private readonly orchestrator: McpOrchestrator,
|
||||
private readonly containerId: string,
|
||||
private readonly command: string[],
|
||||
private readonly mode: StdioMode,
|
||||
private readonly timeoutMs = 120_000,
|
||||
) {}
|
||||
|
||||
@@ -90,11 +100,18 @@ export class PersistentStdioClient {
|
||||
private async connect(): Promise<void> {
|
||||
this.close();
|
||||
|
||||
if (!this.orchestrator.execInteractive) {
|
||||
throw new Error('Orchestrator does not support interactive exec');
|
||||
let exec: InteractiveExec;
|
||||
if (this.mode.kind === 'attach') {
|
||||
if (!this.orchestrator.attachInteractive) {
|
||||
throw new Error('Orchestrator does not support attach');
|
||||
}
|
||||
exec = await this.orchestrator.attachInteractive(this.containerId);
|
||||
} else {
|
||||
if (!this.orchestrator.execInteractive) {
|
||||
throw new Error('Orchestrator does not support interactive exec');
|
||||
}
|
||||
exec = await this.orchestrator.execInteractive(this.containerId, this.mode.command);
|
||||
}
|
||||
|
||||
const exec = await this.orchestrator.execInteractive(this.containerId, this.command);
|
||||
this.exec = exec;
|
||||
this.buffer = '';
|
||||
|
||||
|
||||
111
src/mcpd/tests/persistent-stdio.test.ts
Normal file
111
src/mcpd/tests/persistent-stdio.test.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
import { describe, it, expect, vi } from 'vitest';
|
||||
import { PassThrough } from 'node:stream';
|
||||
import { PersistentStdioClient } from '../src/services/transport/persistent-stdio.js';
|
||||
import type { InteractiveExec, McpOrchestrator } from '../src/services/orchestrator.js';
|
||||
|
||||
function makeFakeExec(): {
|
||||
iexec: InteractiveExec;
|
||||
written: string[];
|
||||
emit: (line: unknown) => void;
|
||||
} {
|
||||
const stdout = new PassThrough();
|
||||
const written: string[] = [];
|
||||
const iexec: InteractiveExec = {
|
||||
stdout,
|
||||
write(data) { written.push(data); },
|
||||
close() { stdout.destroy(); },
|
||||
};
|
||||
const emit = (msg: unknown) => {
|
||||
stdout.write(JSON.stringify(msg) + '\n');
|
||||
};
|
||||
return { iexec, written, emit };
|
||||
}
|
||||
|
||||
function makeOrchestrator(overrides: Partial<McpOrchestrator> = {}): McpOrchestrator {
|
||||
return {
|
||||
pullImage: vi.fn(),
|
||||
createContainer: vi.fn(),
|
||||
stopContainer: vi.fn(),
|
||||
removeContainer: vi.fn(),
|
||||
inspectContainer: vi.fn(),
|
||||
getContainerLogs: vi.fn(),
|
||||
execInContainer: vi.fn(),
|
||||
ping: vi.fn(),
|
||||
...overrides,
|
||||
} as McpOrchestrator;
|
||||
}
|
||||
|
||||
describe('PersistentStdioClient', () => {
|
||||
it('exec mode calls execInteractive with the command', async () => {
|
||||
const fake = makeFakeExec();
|
||||
const execInteractive = vi.fn(async () => fake.iexec);
|
||||
const orch = makeOrchestrator({ execInteractive });
|
||||
|
||||
const client = new PersistentStdioClient(
|
||||
orch,
|
||||
'container-1',
|
||||
{ kind: 'exec', command: ['node', 'index.js'] },
|
||||
);
|
||||
|
||||
// Drive the handshake: respond to the first init request (id=1)
|
||||
// then to the subsequent tools/list request (id=2).
|
||||
const sendPromise = client.send('tools/list');
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const init = JSON.parse(fake.written[0]!);
|
||||
expect(init.method).toBe('initialize');
|
||||
fake.emit({ jsonrpc: '2.0', id: init.id, result: { capabilities: {} } });
|
||||
await new Promise((r) => setTimeout(r, 150));
|
||||
|
||||
// Second written msg is notifications/initialized; third is tools/list
|
||||
const toolsReq = JSON.parse(fake.written[2]!);
|
||||
expect(toolsReq.method).toBe('tools/list');
|
||||
fake.emit({ jsonrpc: '2.0', id: toolsReq.id, result: { tools: [] } });
|
||||
|
||||
const res = await sendPromise;
|
||||
expect(res.result).toEqual({ tools: [] });
|
||||
expect(execInteractive).toHaveBeenCalledWith('container-1', ['node', 'index.js']);
|
||||
client.close();
|
||||
});
|
||||
|
||||
it('attach mode calls attachInteractive and never execInteractive', async () => {
|
||||
const fake = makeFakeExec();
|
||||
const attachInteractive = vi.fn(async () => fake.iexec);
|
||||
const execInteractive = vi.fn();
|
||||
const orch = makeOrchestrator({ attachInteractive, execInteractive });
|
||||
|
||||
const client = new PersistentStdioClient(
|
||||
orch,
|
||||
'container-gitea',
|
||||
{ kind: 'attach' },
|
||||
);
|
||||
|
||||
const sendPromise = client.send('tools/list');
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const init = JSON.parse(fake.written[0]!);
|
||||
fake.emit({ jsonrpc: '2.0', id: init.id, result: { capabilities: {} } });
|
||||
await new Promise((r) => setTimeout(r, 150));
|
||||
|
||||
const req = JSON.parse(fake.written[2]!);
|
||||
fake.emit({ jsonrpc: '2.0', id: req.id, result: { tools: [{ name: 'list_repos' }] } });
|
||||
|
||||
const res = await sendPromise;
|
||||
expect((res.result as { tools: unknown[] }).tools).toHaveLength(1);
|
||||
expect(attachInteractive).toHaveBeenCalledWith('container-gitea');
|
||||
expect(execInteractive).not.toHaveBeenCalled();
|
||||
client.close();
|
||||
});
|
||||
|
||||
it('attach mode throws if orchestrator does not support attach', async () => {
|
||||
const orch = makeOrchestrator({}); // no attachInteractive
|
||||
const client = new PersistentStdioClient(orch, 'c', { kind: 'attach' });
|
||||
await expect(client.send('tools/list')).rejects.toThrow(/attach/i);
|
||||
});
|
||||
|
||||
it('exec mode throws if orchestrator does not support execInteractive', async () => {
|
||||
const orch = makeOrchestrator({}); // no execInteractive
|
||||
const client = new PersistentStdioClient(orch, 'c', { kind: 'exec', command: ['x'] });
|
||||
await expect(client.send('tools/list')).rejects.toThrow(/interactive exec/i);
|
||||
});
|
||||
});
|
||||
@@ -4,12 +4,10 @@ description: Gitea MCP server for repositories, issues, PRs, and code management
|
||||
dockerImage: "docker.gitea.com/gitea-mcp-server:latest"
|
||||
transport: STDIO
|
||||
repositoryUrl: https://gitea.com/gitea/gitea-mcp
|
||||
command:
|
||||
- /app/gitea-mcp
|
||||
- -t
|
||||
- stdio
|
||||
# Health check disabled: STDIO health probe requires packageName (npm-based servers).
|
||||
# This server uses a custom dockerImage. Probe support for dockerImage STDIO servers is TODO.
|
||||
# No command: the image's entrypoint IS the MCP server. mcpd attaches to PID 1
|
||||
# stdin/stdout (attach mode) rather than exec-ing a new process. The image is
|
||||
# distroless and has no node/shell, so exec-based STDIO would fail.
|
||||
# Health check disabled: STDIO health probe requires node in the container.
|
||||
env:
|
||||
- name: GITEA_HOST
|
||||
description: Gitea instance URL (e.g. https://gitea.example.com)
|
||||
|
||||
Reference in New Issue
Block a user