diff --git a/src/mcpd/src/main.ts b/src/mcpd/src/main.ts
index fc05068..03ce6fd 100644
--- a/src/mcpd/src/main.ts
+++ b/src/mcpd/src/main.ts
@@ -40,6 +40,8 @@ import { ChatToolDispatcherImpl } from './services/chat-tool-dispatcher.js';
 import { LlmAdapterRegistry } from './services/llm/dispatcher.js';
 import { registerLlmRoutes } from './routes/llms.js';
 import { registerLlmInferRoutes } from './routes/llm-infer.js';
+import { registerVirtualLlmRoutes } from './routes/virtual-llms.js';
+import { VirtualLlmService } from './services/virtual-llm.service.js';
 import { registerAgentRoutes } from './routes/agents.js';
 import { registerAgentChatRoutes } from './routes/agent-chat.js';
 import { PromptRepository } from './repositories/prompt.repository.js';
@@ -433,6 +435,10 @@ async function main(): Promise<void> {
     adapters: llmAdapters,
     log: { warn: (msg) => app.log.warn(msg) },
   });
+  // Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker
+  // is started below after `app.listen` so it doesn't fire before the
+  // server is accepting traffic.
+  const virtualLlmService = new VirtualLlmService(llmRepo);
   // AgentService + ChatService get fully wired below once projectService and
   // mcpProxyService are constructed (ChatService needs them via the
   // ChatToolDispatcher bridge).
@@ -606,6 +612,7 @@ async function main(): Promise<void> {
   registerLlmInferRoutes(app, {
     llmService,
     adapters: llmAdapters,
+    virtualLlms: virtualLlmService,
     onInferenceEvent: (event) => {
       app.log.info({
         event: 'llm_inference_call',
@@ -620,6 +627,7 @@ async function main(): Promise<void> {
       });
     },
   });
+  registerVirtualLlmRoutes(app, virtualLlmService);
   registerInstanceRoutes(app, instanceService);
   registerProjectRoutes(app, projectService);
   registerAuditLogRoutes(app, auditLogService);
@@ -753,6 +761,21 @@ async function main(): Promise<void> {
     }
   }, RECONCILE_INTERVAL_MS);
 
+  // Virtual-LLM GC sweep — flips heartbeat-stale rows to inactive (90 s
+  // cutoff) and deletes inactives past the 4 h retention window. Runs
+  // every 60 s; cheap (two indexed queries) when there are no virtuals.
+  const VIRTUAL_LLM_GC_INTERVAL_MS = 60_000;
+  const virtualLlmGcTimer = setInterval(async () => {
+    try {
+      const { markedInactive, deleted } = await virtualLlmService.gcSweep();
+      if (markedInactive > 0 || deleted > 0) {
+        app.log.info(`[virtual-llm gc] markedInactive=${String(markedInactive)} deleted=${String(deleted)}`);
+      }
+    } catch (err) {
+      app.log.error({ err }, 'Virtual LLM GC sweep failed');
+    }
+  }, VIRTUAL_LLM_GC_INTERVAL_MS);
+
   // Health probe runner — periodic MCP probes (like k8s livenessProbe).
   // Without explicit healthCheck.tool, probes send tools/list through
   // McpProxyService so they traverse the exact production call path.
@@ -787,6 +810,7 @@ async function main(): Promise<void> {
   setupGracefulShutdown(app, {
     disconnectDb: async () => {
       clearInterval(reconcileTimer);
+      clearInterval(virtualLlmGcTimer);
       healthProbeRunner.stop();
       secretBackendRotatorLoop.stop();
       gitBackup.stop();
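
For review context, the GC timer above relies only on a narrow contract from `VirtualLlmService.gcSweep`. Below is a minimal sketch of that contract, assuming the service fronts the Llm table through a repository; the method names `markInactiveWhereStale` and `deleteInactiveBefore` are hypothetical stand-ins for the "two indexed queries" the comment mentions, not code from this patch.

// Illustrative sketch only. The return shape matches the destructuring in
// main.ts; the repository methods are assumed, not part of this patch.
const HEARTBEAT_CUTOFF_MS = 90_000;        // 90 s heartbeat cutoff
const RETENTION_MS = 4 * 60 * 60 * 1000;   // 4 h inactive retention

interface GcResult {
  markedInactive: number;
  deleted: number;
}

async function gcSweepSketch(repo: {
  markInactiveWhereStale(cutoff: Date): Promise<number>; // indexed on lastHeartbeatAt
  deleteInactiveBefore(cutoff: Date): Promise<number>;   // indexed on inactiveSince
}): Promise<GcResult> {
  const now = Date.now();
  // Query 1: active virtual rows with a heartbeat older than 90 s become inactive.
  const markedInactive = await repo.markInactiveWhereStale(new Date(now - HEARTBEAT_CUTOFF_MS));
  // Query 2: inactive virtual rows past the 4 h retention window are deleted.
  const deleted = await repo.deleteInactiveBefore(new Date(now - RETENTION_MS));
  return { markedInactive, deleted };
}
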
diff --git a/src/mcpd/src/routes/llm-infer.ts b/src/mcpd/src/routes/llm-infer.ts
index 6c93cc9..38a84f7 100644
--- a/src/mcpd/src/routes/llm-infer.ts
+++ b/src/mcpd/src/routes/llm-infer.ts
@@ -15,12 +15,20 @@
 import type { FastifyInstance, FastifyReply } from 'fastify';
 import type { LlmService } from '../services/llm.service.js';
 import type { LlmAdapterRegistry } from '../services/llm/dispatcher.js';
+import type { VirtualLlmService } from '../services/virtual-llm.service.js';
 import { NotFoundError } from '../services/mcp-server.service.js';
 import type { OpenAiChatRequest, InferContext } from '../services/llm/types.js';
 
 export interface LlmInferDeps {
   llmService: LlmService;
   adapters: LlmAdapterRegistry;
+  /**
+   * Optional. When provided, requests for `kind=virtual` Llm rows are
+   * fanned through the SSE control channel rather than calling an
+   * upstream URL directly. Required for v1 of the virtual-LLM feature;
+   * absent in older test configurations.
+   */
+  virtualLlms?: VirtualLlmService;
   /** Optional hook to emit audit events — consumer may ignore. */
   onInferenceEvent?: (event: InferenceAuditEvent) => void;
 }
@@ -62,6 +70,73 @@ export function registerLlmInferRoutes(
       return { error: 'messages is required' };
     }
 
+    const streaming = body.stream === true;
+
+    const audit = (status: number): void => {
+      if (deps.onInferenceEvent === undefined) return;
+      deps.onInferenceEvent({
+        kind: 'llm_inference_call',
+        llmName: llm.name,
+        model: llm.model,
+        type: llm.type,
+        userId: request.userId,
+        tokenSha: request.mcpToken?.tokenSha,
+        streaming,
+        durationMs: Date.now() - started,
+        status,
+      });
+    };
+
+    // ── Virtual-provider branch ──
+    // For kind=virtual rows there is no upstream URL — inference is fanned
+    // through the SSE control channel back to the publishing mcplocal.
+    // VirtualLlmService.enqueueInferTask handles the routing.
+    if (llm.kind === 'virtual') {
+      if (deps.virtualLlms === undefined) {
+        reply.code(500);
+        audit(500);
+        return { error: 'virtual LLM dispatch unavailable (server misconfiguration)' };
+      }
+      try {
+        if (!streaming) {
+          const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, false);
+          const result = await ref.done;
+          reply.code(result.status);
+          audit(result.status);
+          return result.body;
+        }
+        // Streaming: enqueue before opening the SSE response so a dead
+        // publisher still yields a clean JSON error with a proper status.
+        const ref = await deps.virtualLlms.enqueueInferTask(llm.name, body, true);
+        reply.raw.writeHead(200, {
+          'Content-Type': 'text/event-stream',
+          'Cache-Control': 'no-cache',
+          Connection: 'keep-alive',
+          'X-Accel-Buffering': 'no',
+        });
+        const unsubscribe = ref.onChunk((chunk) => writeSseChunk(reply, chunk.data));
+        try {
+          await ref.done;
+          audit(200);
+        } catch (err) {
+          const payload = JSON.stringify({ error: err instanceof Error ? err.message : String(err) });
+          writeSseChunk(reply, payload);
+          audit(502);
+        } finally {
+          unsubscribe();
+          if (!reply.raw.writableEnded) reply.raw.end();
+        }
+        return reply;
+      } catch (err) {
+        const status = (err as { statusCode?: number }).statusCode ?? 502;
+        reply.code(status);
+        audit(status);
+        return { error: err instanceof Error ? err.message : String(err) };
+      }
+    }
+
+    // ── Public-provider branch (existing behavior) ──
+
     // Resolve API key (may be empty string for providers that don't take one).
     let apiKey = '';
     if (llm.apiKeyRef !== null) {
@@ -82,22 +157,6 @@ export function registerLlmInferRoutes(
     };
 
     const adapter = deps.adapters.get(llm.type);
-    const streaming = body.stream === true;
-
-    const audit = (status: number): void => {
-      if (deps.onInferenceEvent === undefined) return;
-      deps.onInferenceEvent({
-        kind: 'llm_inference_call',
-        llmName: llm.name,
-        model: llm.model,
-        type: llm.type,
-        userId: request.userId,
-        tokenSha: request.mcpToken?.tokenSha,
-        streaming,
-        durationMs: Date.now() - started,
-        status,
-      });
-    };
 
     if (!streaming) {
       try {
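
The virtual branch above leans on a small task-handle contract from `enqueueInferTask`. The real type lives in virtual-llm.service.ts, outside this diff; the sketch below is inferred purely from how the route and its tests use the returned ref, so treat the names as descriptive rather than authoritative.

// Inferred from usage above; illustrative only.
interface VirtualTaskRefSketch {
  taskId: string;                                   // the tests use 't-1' etc.
  done: Promise<{ status: number; body: unknown }>; // settles when mcplocal posts { status, body } or an error
  // Subscribe to streamed chunks (delivered via POST /_provider-task/:taskId/result
  // with a { chunk: { data, done? } } body); returns an unsubscribe function.
  onChunk(listener: (chunk: { data: string; done?: boolean }) => void): () => void;
}

// Non-streaming consumption mirrors the route: await `done`, relay status and body.
async function relayOnce(ref: VirtualTaskRefSketch): Promise<{ status: number; body: unknown }> {
  return ref.done;
}
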
diff --git a/src/mcpd/src/routes/virtual-llms.ts b/src/mcpd/src/routes/virtual-llms.ts
new file mode 100644
index 0000000..59664cb
--- /dev/null
+++ b/src/mcpd/src/routes/virtual-llms.ts
@@ -0,0 +1,174 @@
+/**
+ * Routes for the virtual-LLM control plane (`kind=virtual` Llm rows).
+ *
+ * POST /api/v1/llms/_provider-register        — register/refresh, returns sessionId
+ * GET  /api/v1/llms/_provider-stream          — SSE channel; mcpd → mcplocal task fan-out
+ * POST /api/v1/llms/_provider-heartbeat       — keep-alive (every 30 s from mcplocal)
+ * POST /api/v1/llms/_provider-task/:id/result — mcplocal pushes result/chunks back
+ *
+ * RBAC: these all live under `/api/v1/llms/...` so the existing
+ * `mapUrlToPermission` in main.ts maps them to the `llms` resource —
+ * POST = create:llms, GET = view:llms. That's appropriate: publishing
+ * a virtual LLM is morally the same as creating one.
+ *
+ * Inference for virtual rows still lands on `/api/v1/llms/:name/infer`
+ * (unchanged URL); that route gains a `kind=virtual` branch in this stage
+ * and delegates here via VirtualLlmService.
+ */
+import type { FastifyInstance, FastifyReply } from 'fastify';
+import type { VirtualLlmService, VirtualSessionHandle, VirtualTaskFrame } from '../services/virtual-llm.service.js';
+
+const SSE_PING_MS = 20_000;
+const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session';
+
+export function registerVirtualLlmRoutes(
+  app: FastifyInstance,
+  service: VirtualLlmService,
+): void {
+  app.post<{ Body: { providerSessionId?: string; providers?: unknown[] } }>(
+    '/api/v1/llms/_provider-register',
+    async (request, reply) => {
+      const body = (request.body ?? {});
+      const providers = Array.isArray(body.providers) ? body.providers : null;
+      if (providers === null || providers.length === 0) {
+        reply.code(400);
+        return { error: '`providers` array is required and must be non-empty' };
+      }
+
+      try {
+        const result = await service.register({
+          providerSessionId: body.providerSessionId ?? null,
+          providers: providers.map(coerceProviderInput),
+        });
+        reply.code(201);
+        return result;
+      } catch (err) {
+        const status = (err as { statusCode?: number }).statusCode ?? 500;
+        reply.code(status);
+        return { error: (err as Error).message };
+      }
+    },
+  );
+
+  app.get('/api/v1/llms/_provider-stream', (request, reply): FastifyReply => {
+    const sessionHeader = request.headers[PROVIDER_SESSION_HEADER];
+    const sessionId = typeof sessionHeader === 'string' ? sessionHeader : null;
+    if (sessionId === null || sessionId === '') {
+      reply.code(400);
+      void reply.send({ error: `${PROVIDER_SESSION_HEADER} header is required (call /_provider-register first)` });
+      return reply;
+    }
+
+    reply.raw.writeHead(200, {
+      'Content-Type': 'text/event-stream',
+      'Cache-Control': 'no-cache',
+      Connection: 'keep-alive',
+      'X-Accel-Buffering': 'no',
+    });
+
+    const handle: VirtualSessionHandle = {
+      pushTask(task: VirtualTaskFrame): void {
+        if (reply.raw.destroyed || reply.raw.writableEnded) return;
+        reply.raw.write(`event: task\ndata: ${JSON.stringify(task)}\n\n`);
+      },
+      get alive(): boolean {
+        return !reply.raw.destroyed && !reply.raw.writableEnded;
+      },
+    };
+
+    service.bindSession(sessionId, handle);
+    reply.raw.write(`event: hello\ndata: ${JSON.stringify({ sessionId })}\n\n`);
+
+    // Keep-alive comment lines so proxies (Cilium, k8s ingress) don't time
+    // out an idle SSE connection.
+    const pingTimer = setInterval(() => {
+      if (reply.raw.destroyed || reply.raw.writableEnded) return;
+      reply.raw.write(`: ping\n\n`);
+    }, SSE_PING_MS);
+
+    request.raw.on('close', () => {
+      clearInterval(pingTimer);
+      service.unbindSession(sessionId).catch((err: unknown) => {
+        app.log.warn({ err, sessionId }, 'unbindSession failed');
+      });
+    });
+
+    return reply;
+  });
+
+  app.post<{ Body: { providerSessionId?: string } }>(
+    '/api/v1/llms/_provider-heartbeat',
+    async (request, reply) => {
+      const sessionId = request.body?.providerSessionId;
+      if (typeof sessionId !== 'string' || sessionId === '') {
+        reply.code(400);
+        return { error: 'providerSessionId required' };
+      }
+      await service.heartbeat(sessionId);
+      return { ok: true };
+    },
+  );
+
+  app.post<{
+    Params: { taskId: string };
+    Body: {
+      status?: number;
+      body?: unknown;
+      chunk?: { data: string; done?: boolean };
+      error?: string;
+    };
+  }>(
+    '/api/v1/llms/_provider-task/:taskId/result',
+    async (request, reply) => {
+      const { taskId } = request.params;
+      const body = request.body ?? {};
+
+      if (typeof body.error === 'string' && body.error !== '') {
+        const ok = service.failTask(taskId, new Error(body.error));
+        return { ok };
+      }
+      if (body.chunk !== undefined && typeof body.chunk.data === 'string') {
+        const ok = service.pushTaskChunk(taskId, body.chunk);
+        return { ok };
+      }
+      if (typeof body.status === 'number') {
+        const ok = service.completeTask(taskId, { status: body.status, body: body.body });
+        return { ok };
+      }
+
+      reply.code(400);
+      return { error: 'body must contain one of: { error }, { chunk: { data, done? } }, { status, body }' };
+    },
+  );
+}
+
+/** Narrow an unknown providers array element into the service's input shape. */
+function coerceProviderInput(raw: unknown): {
+  name: string;
+  type: string;
+  model: string;
+  tier?: string;
+  description?: string;
+  extraConfig?: Record<string, unknown>;
+} {
+  if (raw === null || typeof raw !== 'object') {
+    throw Object.assign(new Error('provider entry must be an object'), { statusCode: 400 });
+  }
+  const o = raw as Record<string, unknown>;
+  const name = o['name'];
+  const type = o['type'];
+  const model = o['model'];
+  if (typeof name !== 'string' || typeof type !== 'string' || typeof model !== 'string') {
+    throw Object.assign(
+      new Error('provider entry requires string `name`, `type`, `model`'),
+      { statusCode: 400 },
+    );
+  }
+  const out: ReturnType<typeof coerceProviderInput> = { name, type, model };
+  if (typeof o['tier'] === 'string') out.tier = o['tier'];
+  if (typeof o['description'] === 'string') out.description = o['description'];
+  if (o['extraConfig'] !== null && typeof o['extraConfig'] === 'object') {
+    out.extraConfig = o['extraConfig'] as Record<string, unknown>;
+  }
+  return out;
+}
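
To make the control-plane round trip concrete, here is roughly what the publishing side could look like. The endpoint paths, the `x-mcpctl-provider-session` header, and the 30 s heartbeat cadence come straight from the routes above; the base URL, payload contents, and the fetch-based SSE parsing are assumptions for illustration, not actual mcplocal code.

// Illustrative publisher-side loop (assumed base URL and payloads).
const BASE = 'http://127.0.0.1:8080';
const SESSION_HEADER = 'x-mcpctl-provider-session';

async function publishAndServe(): Promise<void> {
  // 1. Register: 201 → { providerSessionId, ... }.
  const reg = await fetch(`${BASE}/api/v1/llms/_provider-register`, {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify({ providers: [{ name: 'vllm-local', type: 'openai', model: 'm' }] }),
  });
  const { providerSessionId } = (await reg.json()) as { providerSessionId: string };

  // 2. Heartbeat every 30 s so the GC sweep never flips this session to inactive.
  setInterval(() => {
    void fetch(`${BASE}/api/v1/llms/_provider-heartbeat`, {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify({ providerSessionId }),
    });
  }, 30_000);

  // 3. Hold the SSE stream open and answer `event: task` frames.
  const res = await fetch(`${BASE}/api/v1/llms/_provider-stream`, {
    headers: { [SESSION_HEADER]: providerSessionId },
  });
  const reader = res.body!.pipeThrough(new TextDecoderStream()).getReader();
  let buf = '';
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    buf += value;
    // Naive SSE framing: frames are separated by a blank line.
    for (let i = buf.indexOf('\n\n'); i !== -1; i = buf.indexOf('\n\n')) {
      const frame = buf.slice(0, i);
      buf = buf.slice(i + 2);
      const data = frame.split('\n').find((l) => l.startsWith('data: '));
      if (!frame.startsWith('event: task') || data === undefined) continue; // hello/ping frames
      const task = JSON.parse(data.slice('data: '.length)) as { taskId: string };
      // 4. Run inference locally, then post the result (or { chunk } frames) back.
      void fetch(`${BASE}/api/v1/llms/_provider-task/${task.taskId}/result`, {
        method: 'POST',
        headers: { 'content-type': 'application/json' },
        body: JSON.stringify({ status: 200, body: { choices: [] } }),
      });
    }
  }
}
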
diff --git a/src/mcpd/src/services/llm.service.ts b/src/mcpd/src/services/llm.service.ts
index 6dd3932..92c5edd 100644
--- a/src/mcpd/src/services/llm.service.ts
+++ b/src/mcpd/src/services/llm.service.ts
@@ -50,6 +50,11 @@ export interface LlmView {
   description: string;
   apiKeyRef: ApiKeyRef | null;
   extraConfig: Record<string, unknown>;
+  // Virtual-provider lifecycle (kind defaults to 'public' for legacy rows).
+  kind: 'public' | 'virtual';
+  status: 'active' | 'inactive' | 'hibernating';
+  lastHeartbeatAt: Date | null;
+  inactiveSince: Date | null;
   version: number;
   createdAt: Date;
   updatedAt: Date;
@@ -275,6 +280,10 @@ export class LlmService {
       description: row.description,
       apiKeyRef,
       extraConfig: row.extraConfig as Record<string, unknown>,
+      kind: row.kind,
+      status: row.status,
+      lastHeartbeatAt: row.lastHeartbeatAt,
+      inactiveSince: row.inactiveSince,
       version: row.version,
       createdAt: row.createdAt,
       updatedAt: row.updatedAt,
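
For reviewers, a hedged reading of the new lifecycle fields: the predicates below are illustrative only (this patch adds no such helpers), with the 90 s / 4 h thresholds quoted from the GC comment in main.ts.

// Illustrative only. Field names match the LlmView additions above; the
// thresholds mirror the GC comment in main.ts (90 s cutoff, 4 h retention).
type VirtualLifecycle = {
  kind: 'public' | 'virtual';
  status: 'active' | 'inactive' | 'hibernating';
  lastHeartbeatAt: Date | null;
  inactiveSince: Date | null;
};

const HEARTBEAT_CUTOFF_MS = 90_000;
const RETENTION_MS = 4 * 60 * 60 * 1000;

// An active virtual row whose heartbeat is missing or older than 90 s is stale.
function isHeartbeatStale(llm: VirtualLifecycle, now = Date.now()): boolean {
  if (llm.kind !== 'virtual' || llm.status !== 'active') return false;
  return llm.lastHeartbeatAt === null || now - llm.lastHeartbeatAt.getTime() > HEARTBEAT_CUTOFF_MS;
}

// An inactive virtual row past the 4 h retention window is eligible for deletion.
function isPastRetention(llm: VirtualLifecycle, now = Date.now()): boolean {
  return llm.kind === 'virtual'
    && llm.status === 'inactive'
    && llm.inactiveSince !== null
    && now - llm.inactiveSince.getTime() > RETENTION_MS;
}
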
diff --git a/src/mcpd/tests/llm-infer-route.test.ts b/src/mcpd/tests/llm-infer-route.test.ts
index 20e5339..33279e3 100644
--- a/src/mcpd/tests/llm-infer-route.test.ts
+++ b/src/mcpd/tests/llm-infer-route.test.ts
@@ -20,6 +20,10 @@ function makeLlmView(overrides: Partial<LlmView> = {}): LlmView {
     description: '',
     apiKeyRef: { name: 'anthropic-key', key: 'token' },
     extraConfig: {},
+    kind: 'public',
+    status: 'active',
+    lastHeartbeatAt: null,
+    inactiveSince: null,
     version: 1,
     createdAt: new Date(),
     updatedAt: new Date(),
@@ -205,4 +209,87 @@ describe('POST /api/v1/llms/:name/infer', () => {
     expect(res.statusCode).toBe(502);
     expect(res.json<{ error: string }>().error).toMatch(/upstream down/);
   });
+
+  // ── Virtual-provider branch (kind=virtual) ──
+
+  it('routes kind=virtual non-streaming through VirtualLlmService.enqueueInferTask', async () => {
+    const svc: LlmServiceLike = {
+      getByName: async () => makeLlmView({ kind: 'virtual', type: 'openai', apiKeyRef: null }),
+      resolveApiKey: async () => '',
+    };
+    const enqueue = vi.fn(async () => ({
+      taskId: 't-1',
+      done: Promise.resolve({ status: 200, body: { choices: [{ message: { content: 'hello from relay' } }] } }),
+      onChunk: () => () => undefined,
+    }));
+    app = Fastify({ logger: false });
+    app.setErrorHandler(errorHandler);
+    registerLlmInferRoutes(app, {
+      // eslint-disable-next-line @typescript-eslint/no-explicit-any
+      llmService: svc as any,
+      adapters: new LlmAdapterRegistry(),
+      // eslint-disable-next-line @typescript-eslint/no-explicit-any
+      virtualLlms: { enqueueInferTask: enqueue } as any,
+    });
+    await app.ready();
+
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/vllm-local/infer',
+      payload: { messages: [{ role: 'user', content: 'hi' }] },
+    });
+    expect(res.statusCode).toBe(200);
+    expect(res.json<{ choices: Array<{ message: { content: string } }> }>().choices[0]!.message.content).toBe('hello from relay');
+    expect(enqueue).toHaveBeenCalledWith(
+      'claude',
+      expect.objectContaining({ messages: expect.any(Array) }),
+      false,
+    );
+  });
+
+  it('returns 503 when the publisher is offline (VirtualLlmService throws)', async () => {
+    const svc: LlmServiceLike = {
+      getByName: async () => makeLlmView({ kind: 'virtual', apiKeyRef: null, type: 'openai' }),
+      resolveApiKey: async () => '',
+    };
+    const enqueue = vi.fn(async () => {
+      throw Object.assign(new Error('no live SSE session; publisher offline'), { statusCode: 503 });
+    });
+    app = Fastify({ logger: false });
+    app.setErrorHandler(errorHandler);
+    registerLlmInferRoutes(app, {
+      // eslint-disable-next-line @typescript-eslint/no-explicit-any
+      llmService: svc as any,
+      adapters: new LlmAdapterRegistry(),
+      // eslint-disable-next-line @typescript-eslint/no-explicit-any
+      virtualLlms: { enqueueInferTask: enqueue } as any,
+    });
+    await app.ready();
+
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/vllm-local/infer',
+      payload: { messages: [{ role: 'user', content: 'hi' }] },
+    });
+    expect(res.statusCode).toBe(503);
+    expect(res.json<{ error: string }>().error).toMatch(/publisher offline/);
+  });
+
+  it('returns 500 when virtualLlms dep is missing but the row is kind=virtual', async () => {
+    // Defensive: prior test configurations may not pass virtualLlms. We
+    // surface a clear server-misconfiguration error rather than calling
+    // the public-adapter path, which would try to hit an empty URL.
+    const svc: LlmServiceLike = {
+      getByName: async () => makeLlmView({ kind: 'virtual', apiKeyRef: null, type: 'openai' }),
+      resolveApiKey: async () => '',
+    };
+    await setupApp(svc, new LlmAdapterRegistry()); // no virtualLlms
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/vllm-local/infer',
+      payload: { messages: [{ role: 'user', content: 'hi' }] },
+    });
+    expect(res.statusCode).toBe(500);
+    expect(res.json<{ error: string }>().error).toMatch(/virtual LLM dispatch unavailable/);
+  });
 });
diff --git a/src/mcpd/tests/virtual-llm-routes.test.ts b/src/mcpd/tests/virtual-llm-routes.test.ts
new file mode 100644
index 0000000..a4477bc
--- /dev/null
+++ b/src/mcpd/tests/virtual-llm-routes.test.ts
@@ -0,0 +1,184 @@
+import { describe, it, expect, vi, afterEach } from 'vitest';
+import Fastify from 'fastify';
+import type { FastifyInstance } from 'fastify';
+import { registerVirtualLlmRoutes } from '../src/routes/virtual-llms.js';
+import type {
+  VirtualLlmService,
+  VirtualSessionHandle,
+} from '../src/services/virtual-llm.service.js';
+
+let app: FastifyInstance;
+
+afterEach(async () => {
+  if (app) await app.close();
+});
+
+function fakeService(overrides: Partial<VirtualLlmService> = {}): VirtualLlmService {
+  return {
+    register: vi.fn(async (input) => ({
+      providerSessionId: input.providerSessionId ?? 'sess-generated',
+      llms: [],
+    })),
+    heartbeat: vi.fn(async () => undefined),
+    bindSession: vi.fn(),
+    unbindSession: vi.fn(async () => undefined),
+    enqueueInferTask: vi.fn(),
+    completeTask: vi.fn(() => true),
+    pushTaskChunk: vi.fn(() => true),
+    failTask: vi.fn(() => true),
+    gcSweep: vi.fn(),
+    ...overrides,
+  } as unknown as VirtualLlmService;
+}
+
+async function setupApp(svc: VirtualLlmService): Promise<FastifyInstance> {
+  app = Fastify({ logger: false });
+  registerVirtualLlmRoutes(app, svc);
+  await app.ready();
+  return app;
+}
+
+describe('POST /api/v1/llms/_provider-register', () => {
+  it('returns 400 when providers is missing or empty', async () => {
+    await setupApp(fakeService());
+    const a = await app.inject({ method: 'POST', url: '/api/v1/llms/_provider-register', payload: {} });
+    expect(a.statusCode).toBe(400);
+    const b = await app.inject({ method: 'POST', url: '/api/v1/llms/_provider-register', payload: { providers: [] } });
+    expect(b.statusCode).toBe(400);
+  });
+
+  it('returns 400 when a provider entry is missing required fields', async () => {
+    await setupApp(fakeService());
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/_provider-register',
+      payload: { providers: [{ name: 'incomplete' }] },
+    });
+    expect(res.statusCode).toBe(400);
+  });
+
+  it('forwards a valid registration to the service and returns 201', async () => {
+    const register = vi.fn(async () => ({
+      providerSessionId: 'sess-xyz',
+      llms: [{ id: 'l1' }],
+    }));
+    await setupApp(fakeService({ register: register as unknown as VirtualLlmService['register'] }));
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/_provider-register',
+      payload: {
+        providerSessionId: 'sess-xyz',
+        providers: [{ name: 'vllm-local', type: 'openai', model: 'm', tier: 'fast', extraConfig: { gpu: 1 } }],
+      },
+    });
+    expect(res.statusCode).toBe(201);
+    expect(register).toHaveBeenCalledWith({
+      providerSessionId: 'sess-xyz',
+      providers: [{ name: 'vllm-local', type: 'openai', model: 'm', tier: 'fast', extraConfig: { gpu: 1 } }],
+    });
+    expect(res.json()).toMatchObject({ providerSessionId: 'sess-xyz' });
+  });
+
+  it('surfaces service errors with their declared status code (e.g. 409 conflict)', async () => {
+    const register = vi.fn(async () => {
+      throw Object.assign(new Error('Cannot publish over public LLM: dup'), { statusCode: 409 });
+    });
+    await setupApp(fakeService({ register: register as unknown as VirtualLlmService['register'] }));
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/_provider-register',
+      payload: { providers: [{ name: 'dup', type: 'openai', model: 'm' }] },
+    });
+    expect(res.statusCode).toBe(409);
+    expect(res.json()).toMatchObject({ error: expect.stringMatching(/public LLM/) });
+  });
+});
+
+describe('POST /api/v1/llms/_provider-heartbeat', () => {
+  it('returns 400 without providerSessionId', async () => {
+    await setupApp(fakeService());
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/_provider-heartbeat',
+      payload: {},
+    });
+    expect(res.statusCode).toBe(400);
+  });
+
+  it('forwards the sessionId to service.heartbeat', async () => {
+    const heartbeat = vi.fn(async () => undefined);
+    await setupApp(fakeService({ heartbeat }));
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/_provider-heartbeat',
+      payload: { providerSessionId: 'sess-abc' },
+    });
+    expect(res.statusCode).toBe(200);
+    expect(heartbeat).toHaveBeenCalledWith('sess-abc');
+  });
+});
+
+describe('POST /api/v1/llms/_provider-task/:taskId/result', () => {
+  it('forwards { error } to service.failTask', async () => {
+    const failTask = vi.fn(() => true);
+    await setupApp(fakeService({ failTask }));
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/_provider-task/t-1/result',
+      payload: { error: 'upstream blew up' },
+    });
+    expect(res.statusCode).toBe(200);
+    expect(failTask).toHaveBeenCalledWith('t-1', expect.objectContaining({ message: 'upstream blew up' }));
+  });
+
+  it('forwards { chunk } to service.pushTaskChunk', async () => {
+    const pushTaskChunk = vi.fn(() => true);
+    await setupApp(fakeService({ pushTaskChunk }));
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/_provider-task/t-2/result',
+      payload: { chunk: { data: 'hello' } },
+    });
+    expect(res.statusCode).toBe(200);
+    expect(pushTaskChunk).toHaveBeenCalledWith('t-2', { data: 'hello' });
+  });
+
+  it('forwards { status, body } to service.completeTask', async () => {
+    const completeTask = vi.fn(() => true);
+    await setupApp(fakeService({ completeTask }));
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/_provider-task/t-3/result',
+      payload: { status: 200, body: { ok: true } },
+    });
+    expect(res.statusCode).toBe(200);
+    expect(completeTask).toHaveBeenCalledWith('t-3', { status: 200, body: { ok: true } });
+  });
+
+  it('returns 400 for an empty/unrecognised result body', async () => {
+    await setupApp(fakeService());
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/v1/llms/_provider-task/t-4/result',
+      payload: {},
+    });
+    expect(res.statusCode).toBe(400);
+  });
+});
+
+describe('GET /api/v1/llms/_provider-stream', () => {
+  it('returns 400 without the x-mcpctl-provider-session header', async () => {
+    await setupApp(fakeService());
+    const res = await app.inject({
+      method: 'GET',
+      url: '/api/v1/llms/_provider-stream',
+    });
+    expect(res.statusCode).toBe(400);
+  });
+
+  // Note: a full SSE handshake test would require a real HTTP listen
+  // because `app.inject` holds the response open and never returns under
+  // the `text/event-stream` keep-alive. The smoke test in Stage 6 spins
+  // up a real listener and exercises the open → bind → task → close
+  // round-trip end to end.
+});
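
As the note above says, `app.inject` cannot complete an SSE handshake. A real-listener variant might look like the following sketch of the Stage 6 approach (reusing `fakeService` from this file; the actual smoke test remains the authoritative version).

// Sketch only: bind to an ephemeral port, open the stream with a raw fetch,
// and assert that the `event: hello` frame arrives.
async function sseHandshakeSmoke(): Promise<void> {
  const server = Fastify({ logger: false });
  registerVirtualLlmRoutes(server, fakeService());
  await server.listen({ port: 0, host: '127.0.0.1' });
  const { port } = server.server.address() as { port: number };

  const res = await fetch(`http://127.0.0.1:${port}/api/v1/llms/_provider-stream`, {
    headers: { 'x-mcpctl-provider-session': 'sess-abc' },
  });
  const reader = res.body!.pipeThrough(new TextDecoderStream()).getReader();
  const { value } = await reader.read();
  if (value?.includes('event: hello') !== true) throw new Error('no hello frame');

  await reader.cancel(); // triggers the route's close handler → unbindSession
  await server.close();
}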