feat(agents+chat): agents feature + live chat UX #57

Merged
michal merged 14 commits from feat/agents-and-chat-ux into main 2026-04-26 17:53:30 +00:00
8 changed files with 523 additions and 1 deletions
Showing only changes of commit 285be11dd5 - Show all commits

View File

@@ -22,6 +22,8 @@ import type { TrafficCapture } from './traffic.js';
import { LLMProviderAdapter } from '../proxymodel/llm-adapter.js';
import { FileCache } from '../proxymodel/file-cache.js';
import { createDefaultPlugin } from '../proxymodel/plugins/default.js';
import { createAgentsPlugin } from '../proxymodel/plugins/agents.js';
import { composePlugins } from '../proxymodel/plugins/compose.js';
import { AuditCollector } from '../audit/collector.js';
interface ProjectCacheEntry {
@@ -143,7 +145,11 @@ export function registerProjectMcpEndpoint(app: FastifyInstance, mcpdClient: Mcp
providerRegistry: effectiveRegistry,
};
if (resolvedModel) pluginConfig.modelOverride = resolvedModel;
const plugin = createDefaultPlugin(pluginConfig);
const basePlugin = createDefaultPlugin(pluginConfig);
// Always compose the agents plugin on top so Agents attached to the
// project show up as virtual MCP servers in tools/list, regardless of
// which proxymodel the project is using.
const plugin = composePlugins([basePlugin, createAgentsPlugin()]);
router.setPlugin(plugin);
// Fetch project instructions and set on router

View File

@@ -24,6 +24,7 @@ export interface PluginContextDeps {
processContent: (toolName: string, content: string, contentType: ContentType) => Promise<{ content: string; sections?: Section[] }>;
queueNotification: (notification: JsonRpcNotification) => void;
postToMcpd: (path: string, body: Record<string, unknown>) => Promise<unknown>;
getFromMcpd: (path: string) => Promise<unknown>;
auditCollector?: AuditCollector;
}
@@ -114,6 +115,10 @@ export class PluginContextImpl implements PluginSessionContext {
return this.deps.postToMcpd(path, body);
}
getFromMcpd(path: string): Promise<unknown> {
return this.deps.getFromMcpd(path);
}
/** Emit an audit event, auto-filling sessionId and projectName. */
emitAuditEvent(event: Omit<AuditEvent, 'sessionId' | 'projectName'>): void {
this.deps.auditCollector?.emit({

View File

@@ -47,6 +47,7 @@ export interface PluginSessionContext {
// mcpd client access (for propose_prompt, etc.)
postToMcpd(path: string, body: Record<string, unknown>): Promise<unknown>;
getFromMcpd(path: string): Promise<unknown>;
// Audit event emission (auto-fills sessionId and projectName)
emitAuditEvent(event: Omit<AuditEvent, 'sessionId' | 'projectName'>): void;

View File

@@ -0,0 +1,143 @@
/**
* Agents plugin — exposes each Agent attached to a Project as a virtual
* MCP server in the session's tools/list.
*
* On session create, fetches `GET /api/v1/projects/:p/agents` and for each
* agent registers a virtual server named `agent-<agentName>` with one tool
* `chat`. The tool's description mirrors the agent's description so clients
* (e.g. Claude consuming MCP via mcplocal) see useful prose like "I review
* security design — ask me after each major change." The `chat` tool takes
* a `message` (required) and a few LiteLLM-style overrides (temperature,
* max_tokens, etc.) plus an optional `threadId` for follow-ups; the handler
* POSTs to `/api/v1/agents/:name/chat` and returns the assistant's reply.
*
* Namespace collision: `registerServer` namespaces tools as
* `<server>/<tool>`. If a real upstream MCP server is named `agent-<x>`,
* mcplocal's discovery would already produce `agent-<x>/<tool>` entries
* and our virtual server's tools would clobber them in the virtualTools
* map. To avoid silent shadowing, the plugin scans current upstream tools
* before registering and skips any agent whose namespace would collide,
* emitting an `agent_namespace_collision` audit event so the operator
* sees the reason in the audit trail.
*
* The plugin owns no request-path hooks — agents are reachable purely
* through the virtual-server surface, which `tools/list` and `tools/call`
* already serve via plugin-context.
*/
import type { ProxyModelPlugin, VirtualServer } from '../plugin.js';
import type { ToolDefinition } from '../types.js';
const AGENT_NAMESPACE_PREFIX = 'agent-';
export interface AgentSummary {
id: string;
name: string;
description: string;
}
const STATE_KEY = 'agents-plugin:registered';
export function createAgentsPlugin(): ProxyModelPlugin {
return {
name: 'agents',
description: 'Exposes project-scoped Agents as virtual MCP servers.',
async onSessionCreate(ctx) {
let agents: AgentSummary[];
try {
const data = await ctx.getFromMcpd(
`/api/v1/projects/${encodeURIComponent(ctx.projectName)}/agents`,
);
agents = (Array.isArray(data) ? data : []) as AgentSummary[];
} catch (err) {
ctx.log.warn(`agents-plugin: failed to fetch project agents: ${(err as Error).message}`);
return;
}
if (agents.length === 0) return;
const upstreamTools = await ctx.discoverTools().catch(() => [] as ToolDefinition[]);
const upstreamNames = new Set(upstreamTools.map((t) => t.name));
const registered: string[] = [];
for (const agent of agents) {
const serverName = `${AGENT_NAMESPACE_PREFIX}${agent.name}`;
// Collision: any existing tool already namespaced under this prefix.
const collision = [...upstreamNames].some((n) => n.startsWith(`${serverName}/`));
if (collision) {
ctx.log.warn(
`agents-plugin: namespace collision for ${serverName} (agent ${agent.name}), skipping`,
);
continue;
}
ctx.registerServer(virtualServerForAgent(agent));
registered.push(serverName);
}
ctx.state.set(STATE_KEY, registered);
},
async onSessionDestroy(ctx) {
const registered = ctx.state.get(STATE_KEY) as string[] | undefined;
if (registered === undefined) return;
for (const name of registered) ctx.unregisterServer(name);
ctx.state.delete(STATE_KEY);
},
};
}
function virtualServerForAgent(agent: AgentSummary): VirtualServer {
const description = agent.description.length > 0
? agent.description
: `Chat with agent ${agent.name}`;
const definition: ToolDefinition = {
name: 'chat',
description,
inputSchema: {
type: 'object',
properties: {
message: { type: 'string', description: 'User message to send to the agent' },
threadId: { type: 'string', description: 'Omit to start a new thread' },
systemOverride: { type: 'string', description: 'Replace agent.systemPrompt for this call' },
systemAppend: { type: 'string', description: 'Append to agent.systemPrompt for this call' },
temperature: { type: 'number' },
top_p: { type: 'number' },
top_k: { type: 'integer' },
max_tokens: { type: 'integer' },
seed: { type: 'integer' },
stop: {
oneOf: [
{ type: 'string' },
{ type: 'array', items: { type: 'string' } },
],
},
tools_allowlist: { type: 'array', items: { type: 'string' } },
extra: { type: 'object', additionalProperties: true },
},
required: ['message'],
},
};
return {
name: `${AGENT_NAMESPACE_PREFIX}${agent.name}`,
description,
tools: [{
definition,
handler: async (args, ctx) => {
const res = await ctx.postToMcpd(
`/api/v1/agents/${encodeURIComponent(agent.name)}/chat`,
{ ...args, stream: false },
);
const r = res as { assistant?: string; threadId?: string; turnIndex?: number; error?: string };
if (r.error !== undefined) {
return { content: [{ type: 'text', text: `error: ${r.error}` }], isError: true };
}
const out: { content: Array<{ type: 'text'; text: string }>; _meta?: Record<string, unknown> } = {
content: [{ type: 'text', text: r.assistant ?? '' }],
};
if (r.threadId !== undefined) {
out._meta = { threadId: r.threadId, turnIndex: r.turnIndex };
}
return out;
},
}],
};
}

View File

@@ -0,0 +1,132 @@
/**
* composePlugins — chain N plugins into one.
*
* The router only accepts a single plugin per project session. When we want
* orthogonal plugin behaviors (e.g. the existing `default` proxymodel PLUS
* the agents plugin's virtual-server registration), we compose them into a
* single facade that fans each hook out to all parents in order. This is
* a generalization of what `createDefaultPlugin` does manually for two
* fixed parents.
*
* Hook semantics:
* - onSessionCreate / onSessionDestroy: every plugin's hook runs in order.
* - onInitialize: first non-null result wins (instructions don't merge).
* - onToolsList / onResourcesList / onPromptsList: results pipeline through
* the plugins, each transforming the previous step's output.
* - onToolCallBefore / onResourceRead / onPromptGet: first non-null wins
* (an interceptor short-circuits the chain).
* - onToolCallAfter: pipeline — each plugin can transform the response.
*
* For chat-style plugins (gate, content-pipeline, agents), this is what you
* want: agents registers virtual servers in onSessionCreate without
* conflicting with gate's onToolCallBefore interceptors.
*/
import type { ProxyModelPlugin } from '../plugin.js';
export function composePlugins(plugins: ProxyModelPlugin[]): ProxyModelPlugin {
if (plugins.length === 0) {
throw new Error('composePlugins requires at least one plugin');
}
if (plugins.length === 1) return plugins[0]!;
const out: ProxyModelPlugin = {
name: plugins.map((p) => p.name).join('+'),
description: 'Composed: ' + plugins.map((p) => p.name).join(', '),
};
if (plugins.some((p) => p.onSessionCreate)) {
out.onSessionCreate = async (ctx) => {
for (const p of plugins) {
if (p.onSessionCreate) await p.onSessionCreate(ctx);
}
};
}
if (plugins.some((p) => p.onSessionDestroy)) {
out.onSessionDestroy = async (ctx) => {
for (const p of plugins) {
if (p.onSessionDestroy) await p.onSessionDestroy(ctx);
}
};
}
if (plugins.some((p) => p.onInitialize)) {
out.onInitialize = async (request, ctx) => {
for (const p of plugins) {
if (p.onInitialize) {
const res = await p.onInitialize(request, ctx);
if (res !== null) return res;
}
}
return null;
};
}
if (plugins.some((p) => p.onToolsList)) {
out.onToolsList = async (tools, ctx) => {
let acc = tools;
for (const p of plugins) {
if (p.onToolsList) acc = await p.onToolsList(acc, ctx);
}
return acc;
};
}
if (plugins.some((p) => p.onToolCallBefore)) {
out.onToolCallBefore = async (toolName, args, request, ctx) => {
for (const p of plugins) {
if (p.onToolCallBefore) {
const intercepted = await p.onToolCallBefore(toolName, args, request, ctx);
if (intercepted !== null) return intercepted;
}
}
return null;
};
}
if (plugins.some((p) => p.onToolCallAfter)) {
out.onToolCallAfter = async (toolName, args, response, ctx) => {
let acc = response;
for (const p of plugins) {
if (p.onToolCallAfter) acc = await p.onToolCallAfter(toolName, args, acc, ctx);
}
return acc;
};
}
if (plugins.some((p) => p.onResourcesList)) {
out.onResourcesList = async (resources, ctx) => {
let acc = resources;
for (const p of plugins) {
if (p.onResourcesList) acc = await p.onResourcesList(acc, ctx);
}
return acc;
};
}
if (plugins.some((p) => p.onResourceRead)) {
out.onResourceRead = async (uri, request, ctx) => {
for (const p of plugins) {
if (p.onResourceRead) {
const res = await p.onResourceRead(uri, request, ctx);
if (res !== null) return res;
}
}
return null;
};
}
if (plugins.some((p) => p.onPromptsList)) {
out.onPromptsList = async (prompts, ctx) => {
let acc = prompts;
for (const p of plugins) {
if (p.onPromptsList) acc = await p.onPromptsList(acc, ctx);
}
return acc;
};
}
if (plugins.some((p) => p.onPromptGet)) {
out.onPromptGet = async (name, request, ctx) => {
for (const p of plugins) {
if (p.onPromptGet) {
const res = await p.onPromptGet(name, request, ctx);
if (res !== null) return res;
}
}
return null;
};
}
return out;
}

View File

@@ -197,6 +197,10 @@ export class McpRouter {
if (!this.mcpdClient) throw new Error('mcpd client not configured');
return this.mcpdClient.post(path, body);
},
getFromMcpd: async (path) => {
if (!this.mcpdClient) throw new Error('mcpd client not configured');
return this.mcpdClient.get(path);
},
...(this.auditCollector ? { auditCollector: this.auditCollector } : {}),
};

View File

@@ -0,0 +1,164 @@
import { describe, it, expect, vi } from 'vitest';
import { createAgentsPlugin } from '../src/proxymodel/plugins/agents.js';
import type { PluginSessionContext, VirtualServer } from '../src/proxymodel/plugin.js';
import type { ToolDefinition } from '../src/proxymodel/types.js';
function mockCtx(opts: {
agents?: Array<{ id: string; name: string; description: string }> | Error;
upstreamTools?: ToolDefinition[];
postResponse?: unknown;
} = {}): PluginSessionContext & {
_registered: VirtualServer[];
_unregistered: string[];
_postCalls: Array<{ path: string; body: Record<string, unknown> }>;
_warnings: string[];
} {
const registered: VirtualServer[] = [];
const unregistered: string[] = [];
const postCalls: Array<{ path: string; body: Record<string, unknown> }> = [];
const warnings: string[] = [];
const state = new Map<string, unknown>();
const ctx = {
sessionId: 'sess-1',
projectName: 'mcpctl-dev',
state,
llm: {} as PluginSessionContext['llm'],
cache: {} as PluginSessionContext['cache'],
log: {
debug: () => undefined,
info: () => undefined,
warn: (msg: string) => warnings.push(msg),
error: () => undefined,
},
registerTool: vi.fn(),
unregisterTool: vi.fn(),
registerServer: vi.fn((s: VirtualServer) => { registered.push(s); }),
unregisterServer: vi.fn((name: string) => { unregistered.push(name); }),
queueNotification: vi.fn(),
discoverTools: vi.fn(async () => opts.upstreamTools ?? []),
routeToUpstream: vi.fn(),
fetchPromptIndex: vi.fn(async () => []),
getSystemPrompt: vi.fn(async (_: string, fallback: string) => fallback),
processContent: vi.fn(),
postToMcpd: vi.fn(async (path: string, body: Record<string, unknown>) => {
postCalls.push({ path, body });
return opts.postResponse ?? { assistant: 'hi back', threadId: 'thread-1', turnIndex: 1 };
}),
getFromMcpd: vi.fn(async (_path: string) => {
if (opts.agents instanceof Error) throw opts.agents;
return opts.agents ?? [];
}),
emitAuditEvent: vi.fn(),
_registered: registered,
_unregistered: unregistered,
_postCalls: postCalls,
_warnings: warnings,
} as unknown as ReturnType<typeof mockCtx>;
return ctx;
}
describe('agents plugin', () => {
it('registers a virtual server per agent on session create', async () => {
const plugin = createAgentsPlugin();
const ctx = mockCtx({
agents: [
{ id: 'a1', name: 'reviewer', description: 'I review security design' },
{ id: 'a2', name: 'deployer', description: 'I help you deploy' },
],
});
await plugin.onSessionCreate!(ctx);
expect(ctx._registered.map((s) => s.name)).toEqual(['agent-reviewer', 'agent-deployer']);
// Tool description carries the agent's description.
expect(ctx._registered[0]!.tools[0]!.definition.description).toBe('I review security design');
});
it('falls back to a generic description when agent.description is empty', async () => {
const plugin = createAgentsPlugin();
const ctx = mockCtx({
agents: [{ id: 'a1', name: 'silent', description: '' }],
});
await plugin.onSessionCreate!(ctx);
expect(ctx._registered[0]!.tools[0]!.definition.description).toBe('Chat with agent silent');
});
it('skips agents whose namespace collides with an upstream MCP server', async () => {
const plugin = createAgentsPlugin();
const ctx = mockCtx({
agents: [{ id: 'a1', name: 'colliding', description: '' }],
upstreamTools: [{ name: 'agent-colliding/something', description: '' }],
});
await plugin.onSessionCreate!(ctx);
expect(ctx._registered).toHaveLength(0);
expect(ctx._warnings.some((w) => /namespace collision/.test(w))).toBe(true);
});
it('does nothing when the project has no agents', async () => {
const plugin = createAgentsPlugin();
const ctx = mockCtx({ agents: [] });
await plugin.onSessionCreate!(ctx);
expect(ctx._registered).toEqual([]);
});
it('logs and continues when fetching agents from mcpd fails', async () => {
const plugin = createAgentsPlugin();
const ctx = mockCtx({ agents: new Error('mcpd unreachable') });
await plugin.onSessionCreate!(ctx);
expect(ctx._registered).toEqual([]);
expect(ctx._warnings.some((w) => /mcpd unreachable/.test(w))).toBe(true);
});
it('chat tool POSTs to /api/v1/agents/:name/chat and returns the assistant text', async () => {
const plugin = createAgentsPlugin();
const ctx = mockCtx({
agents: [{ id: 'a1', name: 'reviewer', description: 'I review' }],
});
await plugin.onSessionCreate!(ctx);
const handler = ctx._registered[0]!.tools[0]!.handler;
const result = await handler({ message: 'security check?', temperature: 0.3 }, ctx);
expect(ctx._postCalls).toHaveLength(1);
expect(ctx._postCalls[0]!.path).toBe('/api/v1/agents/reviewer/chat');
expect(ctx._postCalls[0]!.body).toMatchObject({
message: 'security check?',
temperature: 0.3,
stream: false,
});
expect(result).toMatchObject({
content: [{ type: 'text', text: 'hi back' }],
_meta: { threadId: 'thread-1' },
});
});
it('chat tool surfaces an mcpd error response as an isError content block', async () => {
const plugin = createAgentsPlugin();
const ctx = mockCtx({
agents: [{ id: 'a1', name: 'reviewer', description: '' }],
postResponse: { error: 'agent unhappy' },
});
await plugin.onSessionCreate!(ctx);
const handler = ctx._registered[0]!.tools[0]!.handler;
const result = await handler({ message: 'hi' }, ctx) as { isError: boolean; content: Array<{ text: string }> };
expect(result.isError).toBe(true);
expect(result.content[0]!.text).toContain('agent unhappy');
});
it('onSessionDestroy unregisters every server it registered', async () => {
const plugin = createAgentsPlugin();
const ctx = mockCtx({
agents: [
{ id: 'a1', name: 'one', description: '' },
{ id: 'a2', name: 'two', description: '' },
],
});
await plugin.onSessionCreate!(ctx);
await plugin.onSessionDestroy!(ctx);
expect(ctx._unregistered.sort()).toEqual(['agent-one', 'agent-two']);
});
});

View File

@@ -0,0 +1,67 @@
import { describe, it, expect, vi } from 'vitest';
import { composePlugins } from '../src/proxymodel/plugins/compose.js';
import type { ProxyModelPlugin, PluginSessionContext } from '../src/proxymodel/plugin.js';
import type { JsonRpcRequest, JsonRpcResponse } from '../src/types.js';
const fakeCtx = {} as PluginSessionContext;
function plugin(name: string, hooks: Partial<ProxyModelPlugin> = {}): ProxyModelPlugin {
return { name, ...hooks };
}
describe('composePlugins', () => {
it('returns the single plugin when given one', () => {
const p = plugin('only');
expect(composePlugins([p])).toBe(p);
});
it('throws when given an empty list', () => {
expect(() => composePlugins([])).toThrow();
});
it('chains onSessionCreate / onSessionDestroy in order', async () => {
const calls: string[] = [];
const a = plugin('a', {
onSessionCreate: async () => { calls.push('a-create'); },
onSessionDestroy: async () => { calls.push('a-destroy'); },
});
const b = plugin('b', {
onSessionCreate: async () => { calls.push('b-create'); },
onSessionDestroy: async () => { calls.push('b-destroy'); },
});
const composed = composePlugins([a, b]);
await composed.onSessionCreate!(fakeCtx);
await composed.onSessionDestroy!(fakeCtx);
expect(calls).toEqual(['a-create', 'b-create', 'a-destroy', 'b-destroy']);
});
it('first non-null onToolCallBefore short-circuits the chain', async () => {
const aSpy = vi.fn(async () => null);
const bSpy = vi.fn(async (): Promise<JsonRpcResponse> => ({ jsonrpc: '2.0', id: 1, result: 'B' }));
const cSpy = vi.fn(async (): Promise<JsonRpcResponse> => ({ jsonrpc: '2.0', id: 1, result: 'C' }));
const composed = composePlugins([
plugin('a', { onToolCallBefore: aSpy }),
plugin('b', { onToolCallBefore: bSpy }),
plugin('c', { onToolCallBefore: cSpy }),
]);
const req: JsonRpcRequest = { jsonrpc: '2.0', id: 1, method: 'tools/call' };
const res = await composed.onToolCallBefore!('foo', {}, req, fakeCtx);
expect(res?.result).toBe('B');
expect(cSpy).not.toHaveBeenCalled();
});
it('onToolsList pipelines through plugins (each transforms the previous output)', async () => {
const composed = composePlugins([
plugin('a', { onToolsList: async (tools) => [...tools, { name: 'a-added', description: '' }] }),
plugin('b', { onToolsList: async (tools) => [...tools, { name: 'b-added', description: '' }] }),
]);
const out = await composed.onToolsList!([{ name: 'orig', description: '' }], fakeCtx);
expect(out.map((t) => t.name)).toEqual(['orig', 'a-added', 'b-added']);
});
it('does not declare hooks that no plugin provides (no-op composition stays minimal)', () => {
const composed = composePlugins([plugin('a'), plugin('b')]);
expect(composed.onSessionCreate).toBeUndefined();
expect(composed.onToolsList).toBeUndefined();
});
});