feat(mcpd): virtual-LLM routes + GC ticker (v1 Stage 3)

End-to-end backend wiring. After this stage, an mcplocal client can
register a provider, hold the SSE channel open, heartbeat, and have
its inference requests fanned through the relay — all without
touching the agent layer or the public-LLM path.

Routes (new file: routes/virtual-llms.ts):
  POST /api/v1/llms/_provider-register    → returns { providerSessionId, llms[] }
  GET  /api/v1/llms/_provider-stream      → SSE channel keyed by
                                            x-mcpctl-provider-session header.
                                            Emits `event: hello` on open,
                                            `event: task` on inference fan-out,
                                            `: ping` every 20 s for proxies.
  POST /api/v1/llms/_provider-heartbeat   → bumps lastHeartbeatAt
  POST /api/v1/llms/_provider-task/:id/result
                                          → mcplocal pushes result back;
                                            body shape is one of:
                                              { error: 'msg' }
                                              { chunk: { data, done? } }
                                              { status, body }
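
For orientation, the full publisher lifecycle these four routes give an
mcplocal client, sketched below (hypothetical base URL and model name;
error handling and streaming chunks elided — the real client is not
part of this stage):

  // Sketch only — assumes Node 18+ global fetch / TextDecoderStream.
  const BASE = 'http://localhost:4000/api/v1/llms';

  async function publishAndServe(): Promise<void> {
    // 1. Register; the returned session id keys every later call.
    const reg = await fetch(`${BASE}/_provider-register`, {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify({
        providers: [{ name: 'vllm-local', type: 'openai', model: 'llama-3-8b' }],
      }),
    });
    const { providerSessionId } = (await reg.json()) as { providerSessionId: string };

    // 2. Heartbeat every 30 s — well inside the 90 s GC cutoff.
    setInterval(() => {
      void fetch(`${BASE}/_provider-heartbeat`, {
        method: 'POST',
        headers: { 'content-type': 'application/json' },
        body: JSON.stringify({ providerSessionId }),
      });
    }, 30_000);

    // 3. Hold the SSE channel open and answer `event: task` frames.
    const res = await fetch(`${BASE}/_provider-stream`, {
      headers: { 'x-mcpctl-provider-session': providerSessionId },
    });
    const reader = res.body!.pipeThrough(new TextDecoderStream()).getReader();
    let buf = '';
    for (;;) {
      const { value, done } = await reader.read();
      if (done) break;
      buf += value;
      const frames = buf.split('\n\n');
      buf = frames.pop() ?? ''; // keep any trailing partial frame
      for (const frame of frames) {
        if (!frame.startsWith('event: task')) continue; // skip hello / pings
        const task = JSON.parse(frame.slice(frame.indexOf('data: ') + 6)) as { taskId: string };
        // 4. Run local inference (elided), then push the result back.
        await fetch(`${BASE}/_provider-task/${task.taskId}/result`, {
          method: 'POST',
          headers: { 'content-type': 'application/json' },
          body: JSON.stringify({ status: 200, body: { choices: [] } }),
        });
      }
    }
  }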

LlmService:
- LlmView gains kind/status/lastHeartbeatAt/inactiveSince so route
  handlers + the upcoming `mcpctl get llm` columns can branch on
  kind without re-fetching the row.

llm-infer.ts:
- Detects llm.kind === 'virtual' and delegates to
  VirtualLlmService.enqueueInferTask. Streaming + non-streaming both
  supported; on 503 (publisher offline) the existing audit hook still
  fires with the right status code.
- Adds optional `virtualLlms: VirtualLlmService` to LlmInferDeps; when
  it's absent (as in older test fixtures), a kind=virtual request gets
  a 500 with a clear "server misconfiguration" message rather than
  silently falling through to the public path against an empty URL.
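
Caller-side nothing changes — a hedged sketch of the 503 path
(hypothetical base URL):

  // Same infer URL as public rows; only the server-side branch differs.
  async function callVirtual(): Promise<void> {
    const res = await fetch('http://localhost:4000/api/v1/llms/vllm-local/infer', {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify({ messages: [{ role: 'user', content: 'hi' }] }),
    });
    if (res.status === 503) {
      // Publisher offline: no live SSE session behind the virtual row.
      // The audit hook has already fired server-side with status 503.
      const { error } = (await res.json()) as { error: string };
      console.error(`retry later: ${error}`);
    }
  }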

main.ts:
- Constructs VirtualLlmService(llmRepo).
- Passes it to registerLlmInferRoutes.
- Calls registerVirtualLlmRoutes(app, virtualLlmService).
- 60-s GC ticker started after app.listen; clears on graceful
  shutdown alongside the existing reconcile timer.
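
The sweep body itself isn't in this diff; a minimal sketch of its
shape, with hypothetical repo helper names and the thresholds from the
GC comment (90 s stale cutoff, 4 h retention):

  // Sketch — the real method lives in virtual-llm.service.ts.
  interface VirtualLlmRepo {
    markVirtualInactive(staleBefore: Date): Promise<number>;  // indexed query 1
    deleteVirtualInactive(cutoff: Date): Promise<number>;     // indexed query 2
  }

  class VirtualLlmGcSketch {
    constructor(private readonly repo: VirtualLlmRepo) {}

    async gcSweep(): Promise<{ markedInactive: number; deleted: number }> {
      const now = Date.now();
      // active rows whose lastHeartbeatAt is older than 90 s → inactive
      const markedInactive = await this.repo.markVirtualInactive(new Date(now - 90_000));
      // inactive rows whose inactiveSince is past the 4 h retention → deleted
      const deleted = await this.repo.deleteVirtualInactive(new Date(now - 4 * 3_600_000));
      return { markedInactive, deleted };
    }
  }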

Tests: 11 new virtual-LLM route assertions (validation paths,
service plumbing for register/heartbeat/task-result) + 3 new
infer-route assertions (kind=virtual non-streaming relay, 503 path,
500 when virtualLlms dep missing). mcpd suite: 833/833 (was 819,
+14). Typecheck clean.

The full SSE handshake is exercised by the smoke test in Stage 6;
under app.inject the keep-alive holds the response open until the
connection closes, so unit-level SSE testing isn't worth the
complexity here.
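
For the record, the Stage 6 shape is roughly (illustrative, inside an
async test body):

  // app.listen hands back a live socket, so the hello frame actually
  // arrives instead of app.inject holding the response open forever.
  const addr = await app.listen({ port: 0 }); // ephemeral port
  const res = await fetch(`${addr}/api/v1/llms/_provider-stream`, {
    headers: { 'x-mcpctl-provider-session': 'sess-test' },
  });
  const reader = res.body!.pipeThrough(new TextDecoderStream()).getReader();
  const { value } = await reader.read();
  if (!value?.includes('event: hello')) throw new Error('no hello frame');
  await reader.cancel(); // client close → request close event → unbindSession
  await app.close();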

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Author: Michal
Date:   2026-04-27 14:15:18 +01:00
Commit: 192a3831df (parent 2215922618)
6 changed files, +553 −16

── src/main.ts ──

@@ -40,6 +40,8 @@ import { ChatToolDispatcherImpl } from './services/chat-tool-dispatcher.js';
import { LlmAdapterRegistry } from './services/llm/dispatcher.js';
import { registerLlmRoutes } from './routes/llms.js';
import { registerLlmInferRoutes } from './routes/llm-infer.js';
import { registerVirtualLlmRoutes } from './routes/virtual-llms.js';
import { VirtualLlmService } from './services/virtual-llm.service.js';
import { registerAgentRoutes } from './routes/agents.js';
import { registerAgentChatRoutes } from './routes/agent-chat.js';
import { PromptRepository } from './repositories/prompt.repository.js';
@@ -433,6 +435,10 @@ async function main(): Promise<void> {
adapters: llmAdapters,
log: { warn: (msg) => app.log.warn(msg) },
});
// Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker
// is started below after `app.listen` so it doesn't fire before the
// server is accepting traffic.
const virtualLlmService = new VirtualLlmService(llmRepo);
// AgentService + ChatService get fully wired below once projectService and
// mcpProxyService are constructed (ChatService needs them via the
// ChatToolDispatcher bridge).
@@ -606,6 +612,7 @@ async function main(): Promise<void> {
registerLlmInferRoutes(app, {
llmService,
adapters: llmAdapters,
virtualLlms: virtualLlmService,
onInferenceEvent: (event) => {
app.log.info({
event: 'llm_inference_call',
@@ -620,6 +627,7 @@ async function main(): Promise<void> {
});
},
});
registerVirtualLlmRoutes(app, virtualLlmService);
registerInstanceRoutes(app, instanceService);
registerProjectRoutes(app, projectService);
registerAuditLogRoutes(app, auditLogService);
@@ -753,6 +761,21 @@ async function main(): Promise<void> {
}
}, RECONCILE_INTERVAL_MS);
// Virtual-LLM GC sweep — flips heartbeat-stale rows to inactive (90 s
// cutoff) and deletes inactives past the 4 h retention window. Runs
// every 60 s; cheap (two indexed queries) when there are no virtuals.
const VIRTUAL_LLM_GC_INTERVAL_MS = 60_000;
const virtualLlmGcTimer = setInterval(async () => {
try {
const { markedInactive, deleted } = await virtualLlmService.gcSweep();
if (markedInactive > 0 || deleted > 0) {
app.log.info(`[virtual-llm gc] markedInactive=${String(markedInactive)} deleted=${String(deleted)}`);
}
} catch (err) {
app.log.error({ err }, 'Virtual LLM GC sweep failed');
}
}, VIRTUAL_LLM_GC_INTERVAL_MS);
// Health probe runner — periodic MCP probes (like k8s livenessProbe).
// Without explicit healthCheck.tool, probes send tools/list through
// McpProxyService so they traverse the exact production call path.
@@ -787,6 +810,7 @@ async function main(): Promise<void> {
setupGracefulShutdown(app, {
disconnectDb: async () => {
clearInterval(reconcileTimer);
clearInterval(virtualLlmGcTimer);
healthProbeRunner.stop();
secretBackendRotatorLoop.stop();
gitBackup.stop();

── src/routes/llm-infer.ts ──

@@ -15,12 +15,20 @@
import type { FastifyInstance, FastifyReply } from 'fastify';
import type { LlmService } from '../services/llm.service.js';
import type { LlmAdapterRegistry } from '../services/llm/dispatcher.js';
import type { VirtualLlmService } from '../services/virtual-llm.service.js';
import { NotFoundError } from '../services/mcp-server.service.js';
import type { OpenAiChatRequest, InferContext } from '../services/llm/types.js';
export interface LlmInferDeps {
llmService: LlmService;
adapters: LlmAdapterRegistry;
/**
* Optional. When provided, requests for `kind=virtual` Llm rows are
* fanned through the SSE control channel rather than calling an
* upstream URL directly. Required for v1 of the virtual-LLM feature;
* absent in older test configurations.
*/
virtualLlms?: VirtualLlmService;
/** Optional hook to emit audit events — consumer may ignore. */
onInferenceEvent?: (event: InferenceAuditEvent) => void;
}
@@ -62,6 +70,73 @@ export function registerLlmInferRoutes(
return { error: 'messages is required' };
}
const streaming = body.stream === true;
const audit = (status: number): void => {
if (deps.onInferenceEvent === undefined) return;
deps.onInferenceEvent({
kind: 'llm_inference_call',
llmName: llm.name,
model: llm.model,
type: llm.type,
userId: request.userId,
tokenSha: request.mcpToken?.tokenSha,
streaming,
durationMs: Date.now() - started,
status,
});
};
// ── Virtual-provider branch ──
// For kind=virtual rows there is no upstream URL — inference is fanned
// through the SSE control channel back to the publishing mcplocal.
// VirtualLlmService.enqueueInferTask handles the routing.
if (llm.kind === 'virtual') {
if (deps.virtualLlms === undefined) {
reply.code(500);
audit(500);
return { error: 'virtual LLM dispatch unavailable (server misconfiguration)' };
}
try {
if (!streaming) {
const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, false);
const result = await ref.done;
reply.code(result.status);
audit(result.status);
return result.body;
}
// Streaming: open SSE response, fan chunks from the result stream
// into outgoing SSE frames.
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
});
const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, true);
const unsubscribe = ref.onChunk((chunk) => writeSseChunk(reply, chunk.data));
try {
await ref.done;
audit(200);
} catch (err) {
const payload = JSON.stringify({ error: err instanceof Error ? err.message : String(err) });
writeSseChunk(reply, payload);
audit(502);
} finally {
unsubscribe();
if (!reply.raw.writableEnded) reply.raw.end();
}
return reply;
} catch (err) {
const status = (err as { statusCode?: number }).statusCode ?? 502;
reply.code(status);
audit(status);
return { error: err instanceof Error ? err.message : String(err) };
}
}
// ── Public-provider branch (existing behavior) ──
// Resolve API key (may be empty string for providers that don't take one).
let apiKey = '';
if (llm.apiKeyRef !== null) {
@@ -82,22 +157,6 @@ export function registerLlmInferRoutes(
};
const adapter = deps.adapters.get(llm.type);
const streaming = body.stream === true;
const audit = (status: number): void => {
if (deps.onInferenceEvent === undefined) return;
deps.onInferenceEvent({
kind: 'llm_inference_call',
llmName: llm.name,
model: llm.model,
type: llm.type,
userId: request.userId,
tokenSha: request.mcpToken?.tokenSha,
streaming,
durationMs: Date.now() - started,
status,
});
};
if (!streaming) {
try {

── src/routes/virtual-llms.ts (new file) ──

@@ -0,0 +1,174 @@
/**
* Routes for the virtual-LLM control plane (`kind=virtual` Llm rows).
*
* POST /api/v1/llms/_provider-register — register/refresh, returns sessionId
* GET /api/v1/llms/_provider-stream — SSE channel; mcpd → mcplocal task fan-out
* POST /api/v1/llms/_provider-heartbeat — keep-alive (every 30 s from mcplocal)
* POST /api/v1/llms/_provider-task/:id/result — mcplocal pushes result/chunks back
*
* RBAC: these all live under `/api/v1/llms/...` so the existing
* `mapUrlToPermission` in main.ts maps them to the `llms` resource —
* POST = create:llms, GET = view:llms. That's appropriate: publishing
* a virtual LLM is morally the same as creating one.
*
* Inference for virtual rows still lands on `/api/v1/llms/:name/infer`
* (unchanged URL); that route gains a `kind=virtual` branch in this stage
* and delegates here via VirtualLlmService.
*/
import type { FastifyInstance, FastifyReply } from 'fastify';
import type { VirtualLlmService, VirtualSessionHandle, VirtualTaskFrame } from '../services/virtual-llm.service.js';
const SSE_PING_MS = 20_000;
const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session';
export function registerVirtualLlmRoutes(
app: FastifyInstance,
service: VirtualLlmService,
): void {
app.post<{ Body: { providerSessionId?: string; providers?: unknown[] } }>(
'/api/v1/llms/_provider-register',
async (request, reply) => {
const body = (request.body ?? {});
const providers = Array.isArray(body.providers) ? body.providers : null;
if (providers === null || providers.length === 0) {
reply.code(400);
return { error: '`providers` array is required and must be non-empty' };
}
try {
const result = await service.register({
providerSessionId: body.providerSessionId ?? null,
providers: providers.map(coerceProviderInput),
});
reply.code(201);
return result;
} catch (err) {
const status = (err as { statusCode?: number }).statusCode ?? 500;
reply.code(status);
return { error: (err as Error).message };
}
},
);
app.get('/api/v1/llms/_provider-stream', (request, reply): FastifyReply => {
const sessionHeader = request.headers[PROVIDER_SESSION_HEADER];
const sessionId = typeof sessionHeader === 'string' ? sessionHeader : null;
if (sessionId === null || sessionId === '') {
reply.code(400);
void reply.send({ error: `${PROVIDER_SESSION_HEADER} header is required (call /_provider-register first)` });
return reply;
}
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
});
const handle: VirtualSessionHandle = {
pushTask(task: VirtualTaskFrame): void {
if (reply.raw.destroyed || reply.raw.writableEnded) return;
reply.raw.write(`event: task\ndata: ${JSON.stringify(task)}\n\n`);
},
get alive(): boolean {
return !reply.raw.destroyed && !reply.raw.writableEnded;
},
};
service.bindSession(sessionId, handle);
reply.raw.write(`event: hello\ndata: ${JSON.stringify({ sessionId })}\n\n`);
// Keep-alive comment lines so proxies (Cilium, k8s ingress) don't time
// out an idle SSE connection.
const pingTimer = setInterval(() => {
if (reply.raw.destroyed || reply.raw.writableEnded) return;
reply.raw.write(`: ping\n\n`);
}, SSE_PING_MS);
request.raw.on('close', () => {
clearInterval(pingTimer);
service.unbindSession(sessionId).catch((err: unknown) => {
app.log.warn({ err, sessionId }, 'unbindSession failed');
});
});
return reply;
});
app.post<{ Body: { providerSessionId?: string } }>(
'/api/v1/llms/_provider-heartbeat',
async (request, reply) => {
const sessionId = request.body?.providerSessionId;
if (typeof sessionId !== 'string' || sessionId === '') {
reply.code(400);
return { error: 'providerSessionId required' };
}
await service.heartbeat(sessionId);
return { ok: true };
},
);
app.post<{
Params: { taskId: string };
Body: {
status?: number;
body?: unknown;
chunk?: { data: string; done?: boolean };
error?: string;
};
}>(
'/api/v1/llms/_provider-task/:taskId/result',
async (request, reply) => {
const { taskId } = request.params;
const body = request.body ?? {};
if (typeof body.error === 'string' && body.error !== '') {
const ok = service.failTask(taskId, new Error(body.error));
return { ok };
}
if (body.chunk !== undefined && typeof body.chunk.data === 'string') {
const ok = service.pushTaskChunk(taskId, body.chunk);
return { ok };
}
if (typeof body.status === 'number') {
const ok = service.completeTask(taskId, { status: body.status, body: body.body });
return { ok };
}
reply.code(400);
return { error: 'body must contain one of: { error }, { chunk: { data, done? } }, { status, body }' };
},
);
}
/** Narrow an unknown providers array element into the service's input shape. */
function coerceProviderInput(raw: unknown): {
name: string;
type: string;
model: string;
tier?: string;
description?: string;
extraConfig?: Record<string, unknown>;
} {
if (raw === null || typeof raw !== 'object') {
throw Object.assign(new Error('provider entry must be an object'), { statusCode: 400 });
}
const o = raw as Record<string, unknown>;
const name = o['name'];
const type = o['type'];
const model = o['model'];
if (typeof name !== 'string' || typeof type !== 'string' || typeof model !== 'string') {
throw Object.assign(
new Error('provider entry requires string `name`, `type`, `model`'),
{ statusCode: 400 },
);
}
const out: ReturnType<typeof coerceProviderInput> = { name, type, model };
if (typeof o['tier'] === 'string') out.tier = o['tier'];
if (typeof o['description'] === 'string') out.description = o['description'];
if (o['extraConfig'] !== null && typeof o['extraConfig'] === 'object') {
out.extraConfig = o['extraConfig'] as Record<string, unknown>;
}
return out;
}

── src/services/llm.service.ts ──

@@ -50,6 +50,11 @@ export interface LlmView {
description: string;
apiKeyRef: ApiKeyRef | null;
extraConfig: Record<string, unknown>;
// Virtual-provider lifecycle (kind defaults to 'public' for legacy rows).
kind: 'public' | 'virtual';
status: 'active' | 'inactive' | 'hibernating';
lastHeartbeatAt: Date | null;
inactiveSince: Date | null;
version: number;
createdAt: Date;
updatedAt: Date;
@@ -275,6 +280,10 @@ export class LlmService {
description: row.description,
apiKeyRef,
extraConfig: row.extraConfig as Record<string, unknown>,
kind: row.kind,
status: row.status,
lastHeartbeatAt: row.lastHeartbeatAt,
inactiveSince: row.inactiveSince,
version: row.version,
createdAt: row.createdAt,
updatedAt: row.updatedAt,

── tests: llm-infer routes ──

@@ -20,6 +20,10 @@ function makeLlmView(overrides: Partial<LlmView> = {}): LlmView {
description: '',
apiKeyRef: { name: 'anthropic-key', key: 'token' },
extraConfig: {},
kind: 'public',
status: 'active',
lastHeartbeatAt: null,
inactiveSince: null,
version: 1,
createdAt: new Date(),
updatedAt: new Date(),
@@ -205,4 +209,87 @@ describe('POST /api/v1/llms/:name/infer', () => {
expect(res.statusCode).toBe(502);
expect(res.json<{ error: string }>().error).toMatch(/upstream down/);
});
// ── Virtual-provider branch (kind=virtual) ──
it('routes kind=virtual non-streaming through VirtualLlmService.enqueueInferTask', async () => {
const svc: LlmServiceLike = {
getByName: async () => makeLlmView({ kind: 'virtual', type: 'openai', apiKeyRef: null }),
resolveApiKey: async () => '',
};
const enqueue = vi.fn(async () => ({
taskId: 't-1',
done: Promise.resolve({ status: 200, body: { choices: [{ message: { content: 'hello from relay' } }] } }),
onChunk: () => () => undefined,
}));
app = Fastify({ logger: false });
app.setErrorHandler(errorHandler);
registerLlmInferRoutes(app, {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
llmService: svc as any,
adapters: new LlmAdapterRegistry(),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
virtualLlms: { enqueueInferTask: enqueue } as any,
});
await app.ready();
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/vllm-local/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(200);
expect(res.json<{ choices: Array<{ message: { content: string } }> }>().choices[0]!.message.content).toBe('hello from relay');
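// enqueue gets the row's name from getByName (makeLlmView's default,
// 'claude'), not the ':name' URL slug — the stub ignores the slug.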
expect(enqueue).toHaveBeenCalledWith(
'claude',
expect.objectContaining({ messages: expect.any(Array) }),
false,
);
});
it('returns 503 when the publisher is offline (VirtualLlmService throws)', async () => {
const svc: LlmServiceLike = {
getByName: async () => makeLlmView({ kind: 'virtual', apiKeyRef: null, type: 'openai' }),
resolveApiKey: async () => '',
};
const enqueue = vi.fn(async () => {
throw Object.assign(new Error('no live SSE session; publisher offline'), { statusCode: 503 });
});
app = Fastify({ logger: false });
app.setErrorHandler(errorHandler);
registerLlmInferRoutes(app, {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
llmService: svc as any,
adapters: new LlmAdapterRegistry(),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
virtualLlms: { enqueueInferTask: enqueue } as any,
});
await app.ready();
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/vllm-local/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(503);
expect(res.json<{ error: string }>().error).toMatch(/publisher offline/);
});
it('returns 500 when virtualLlms dep is missing but the row is kind=virtual', async () => {
// Defensive: prior test configurations may not pass virtualLlms. We
// surface a clear server-misconfiguration error rather than calling
// the public-adapter path, which would try to hit an empty URL.
const svc: LlmServiceLike = {
getByName: async () => makeLlmView({ kind: 'virtual', apiKeyRef: null, type: 'openai' }),
resolveApiKey: async () => '',
};
await setupApp(svc, new LlmAdapterRegistry()); // no virtualLlms
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/vllm-local/infer',
payload: { messages: [{ role: 'user', content: 'hi' }] },
});
expect(res.statusCode).toBe(500);
expect(res.json<{ error: string }>().error).toMatch(/virtual LLM dispatch unavailable/);
});
});

── tests: virtual-llm routes (new file) ──

@@ -0,0 +1,184 @@
import { describe, it, expect, vi, afterEach } from 'vitest';
import Fastify from 'fastify';
import type { FastifyInstance } from 'fastify';
import { registerVirtualLlmRoutes } from '../src/routes/virtual-llms.js';
import type {
VirtualLlmService,
VirtualSessionHandle,
} from '../src/services/virtual-llm.service.js';
let app: FastifyInstance;
afterEach(async () => {
if (app) await app.close();
});
function fakeService(overrides: Partial<VirtualLlmService> = {}): VirtualLlmService {
return {
register: vi.fn(async (input) => ({
providerSessionId: input.providerSessionId ?? 'sess-generated',
llms: [],
})),
heartbeat: vi.fn(async () => undefined),
bindSession: vi.fn(),
unbindSession: vi.fn(async () => undefined),
enqueueInferTask: vi.fn(),
completeTask: vi.fn(() => true),
pushTaskChunk: vi.fn(() => true),
failTask: vi.fn(() => true),
gcSweep: vi.fn(),
...overrides,
} as unknown as VirtualLlmService;
}
async function setupApp(svc: VirtualLlmService): Promise<FastifyInstance> {
app = Fastify({ logger: false });
registerVirtualLlmRoutes(app, svc);
await app.ready();
return app;
}
describe('POST /api/v1/llms/_provider-register', () => {
it('returns 400 when providers is missing or empty', async () => {
await setupApp(fakeService());
const a = await app.inject({ method: 'POST', url: '/api/v1/llms/_provider-register', payload: {} });
expect(a.statusCode).toBe(400);
const b = await app.inject({ method: 'POST', url: '/api/v1/llms/_provider-register', payload: { providers: [] } });
expect(b.statusCode).toBe(400);
});
it('returns 400 when a provider entry is missing required fields', async () => {
await setupApp(fakeService());
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/_provider-register',
payload: { providers: [{ name: 'incomplete' }] },
});
expect(res.statusCode).toBe(400);
});
it('forwards a valid registration to the service and returns 201', async () => {
const register = vi.fn(async () => ({
providerSessionId: 'sess-xyz',
llms: [{ id: 'l1' }],
}));
await setupApp(fakeService({ register: register as unknown as VirtualLlmService['register'] }));
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/_provider-register',
payload: {
providerSessionId: 'sess-xyz',
providers: [{ name: 'vllm-local', type: 'openai', model: 'm', tier: 'fast', extraConfig: { gpu: 1 } }],
},
});
expect(res.statusCode).toBe(201);
expect(register).toHaveBeenCalledWith({
providerSessionId: 'sess-xyz',
providers: [{ name: 'vllm-local', type: 'openai', model: 'm', tier: 'fast', extraConfig: { gpu: 1 } }],
});
expect(res.json()).toMatchObject({ providerSessionId: 'sess-xyz' });
});
it('surfaces service errors with their declared status code (e.g. 409 conflict)', async () => {
const register = vi.fn(async () => {
throw Object.assign(new Error('Cannot publish over public LLM: dup'), { statusCode: 409 });
});
await setupApp(fakeService({ register: register as unknown as VirtualLlmService['register'] }));
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/_provider-register',
payload: { providers: [{ name: 'dup', type: 'openai', model: 'm' }] },
});
expect(res.statusCode).toBe(409);
expect(res.json()).toMatchObject({ error: expect.stringMatching(/public LLM/) });
});
});
describe('POST /api/v1/llms/_provider-heartbeat', () => {
it('returns 400 without providerSessionId', async () => {
await setupApp(fakeService());
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/_provider-heartbeat',
payload: {},
});
expect(res.statusCode).toBe(400);
});
it('forwards the sessionId to service.heartbeat', async () => {
const heartbeat = vi.fn(async () => undefined);
await setupApp(fakeService({ heartbeat }));
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/_provider-heartbeat',
payload: { providerSessionId: 'sess-abc' },
});
expect(res.statusCode).toBe(200);
expect(heartbeat).toHaveBeenCalledWith('sess-abc');
});
});
describe('POST /api/v1/llms/_provider-task/:taskId/result', () => {
it('forwards { error } to service.failTask', async () => {
const failTask = vi.fn(() => true);
await setupApp(fakeService({ failTask }));
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/_provider-task/t-1/result',
payload: { error: 'upstream blew up' },
});
expect(res.statusCode).toBe(200);
expect(failTask).toHaveBeenCalledWith('t-1', expect.objectContaining({ message: 'upstream blew up' }));
});
it('forwards { chunk } to service.pushTaskChunk', async () => {
const pushTaskChunk = vi.fn(() => true);
await setupApp(fakeService({ pushTaskChunk }));
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/_provider-task/t-2/result',
payload: { chunk: { data: 'hello' } },
});
expect(res.statusCode).toBe(200);
expect(pushTaskChunk).toHaveBeenCalledWith('t-2', { data: 'hello' });
});
it('forwards { status, body } to service.completeTask', async () => {
const completeTask = vi.fn(() => true);
await setupApp(fakeService({ completeTask }));
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/_provider-task/t-3/result',
payload: { status: 200, body: { ok: true } },
});
expect(res.statusCode).toBe(200);
expect(completeTask).toHaveBeenCalledWith('t-3', { status: 200, body: { ok: true } });
});
it('returns 400 for an empty/unrecognised result body', async () => {
await setupApp(fakeService());
const res = await app.inject({
method: 'POST',
url: '/api/v1/llms/_provider-task/t-4/result',
payload: {},
});
expect(res.statusCode).toBe(400);
});
});
describe('GET /api/v1/llms/_provider-stream', () => {
it('returns 400 without the x-mcpctl-provider-session header', async () => {
await setupApp(fakeService());
const res = await app.inject({
method: 'GET',
url: '/api/v1/llms/_provider-stream',
});
expect(res.statusCode).toBe(400);
});
// Note: a full SSE handshake test would require a real HTTP listen
// because `app.inject` holds the response open and never returns under
// the `text/event-stream` keep-alive. The smoke test in Stage 6 spins
// up a real listener and exercises the open → bind → task → close
// round-trip end to end.
});