feat(mcpd): async inference task API + tasks RBAC resource (v5 Stage 3)
Exposes the durable queue (Stage 1+2) as a first-class API so callers
can enqueue work, get a task id immediately, and poll/stream/cancel
without holding open the original HTTP connection.
New endpoints (`/api/v1/inference-tasks`):
POST / → enqueue, return task id (201 + row).
failFast:false — task stays pending if no
worker is up; future bindSession drains.
Rejects with 400 for public Llms (the existing
/llms/<name>/infer endpoint is the right tool there)
and with 404 for missing Llms.
GET / → list owner's tasks. Optional ?status,
?poolName, ?agentId, ?limit query.
Owner-scoped at the route layer; cross-
user listing requires resource-wide grant.
GET /:id → poll one task. 404 (not 403) on a
foreign-owner id to prevent enumeration.
DELETE /:id → cancel a non-terminal task. Already-
terminal rows return 200 + current shape
(no-op). 404 on foreign owner.
GET /:id/stream → SSE feed of `chunk` and `terminal` events.
Re-fetches the row at subscribe time so
already-completed tasks emit one terminal
event and close immediately.
RBAC:
- New `tasks` resource added to RBAC_RESOURCES + the URL→permission
map in main.ts. Default action mapping: GET=view, POST=create,
DELETE=delete. The route layer enforces owner-scoping ON TOP of
the hook (404 on foreign owner) — without this, anyone with
`view:tasks` could list/peek every user's queued work.
- Singular alias `task` and the multi-word `inference-task` /
`inference-tasks` all normalize to `tasks` so users can write
`mcpctl create rbac-binding --resource task --role view ...` or
any of the variants and have it map correctly.
Tests: 9 new route tests covering the wire shapes, owner scoping
(matching/foreign), public-Llm rejection, missing-Llm 404, list
filter, and cancel semantics (pending→cancelled, terminal→no-op).
mcpd 893/893 (was 884, +9). Live smoke: POST against a public Llm
returns the documented 400, POST against missing returns 404, GET
list returns [] cleanly.
Stage 4 (next): CLI surface (`mcpctl get tasks`, `--async` flag on
chat-llm), GC ticker, smoke test (enqueue → connect worker →
drain), docs.
This commit is contained in:
@@ -43,6 +43,7 @@ import { LlmAdapterRegistry } from './services/llm/dispatcher.js';
|
||||
import { registerLlmRoutes } from './routes/llms.js';
|
||||
import { registerLlmInferRoutes } from './routes/llm-infer.js';
|
||||
import { registerVirtualLlmRoutes } from './routes/virtual-llms.js';
|
||||
import { registerInferenceTaskRoutes } from './routes/inference-tasks.js';
|
||||
import { VirtualLlmService } from './services/virtual-llm.service.js';
|
||||
import { registerAgentRoutes } from './routes/agents.js';
|
||||
import { registerAgentChatRoutes } from './routes/agent-chat.js';
|
||||
@@ -169,6 +170,11 @@ function mapUrlToPermission(method: string, url: string): PermissionCheck {
|
||||
'promptrequests': 'promptrequests',
|
||||
'mcptokens': 'mcptokens',
|
||||
'llms': 'llms',
|
||||
// v5: durable inference task queue. Same default action mapping as
|
||||
// any other resource (GET=view, POST=create, DELETE=delete). The
|
||||
// route layer enforces owner-scoping on top — a caller without
|
||||
// resource-wide grant only sees their own tasks.
|
||||
'inference-tasks': 'tasks',
|
||||
'agents': 'agents',
|
||||
// Personalities inherit the agent's RBAC: managing a personality
|
||||
// requires view/edit/create/delete on the `agents` resource.
|
||||
@@ -640,6 +646,11 @@ async function main(): Promise<void> {
|
||||
},
|
||||
});
|
||||
registerVirtualLlmRoutes(app, virtualLlmService, agentService);
|
||||
registerInferenceTaskRoutes(app, {
|
||||
tasks: inferenceTaskService,
|
||||
llms: llmService,
|
||||
virtualLlms: virtualLlmService,
|
||||
});
|
||||
registerInstanceRoutes(app, instanceService);
|
||||
registerProjectRoutes(app, projectService);
|
||||
registerAuditLogRoutes(app, auditLogService);
|
||||
|
||||
281
src/mcpd/src/routes/inference-tasks.ts
Normal file
281
src/mcpd/src/routes/inference-tasks.ts
Normal file
@@ -0,0 +1,281 @@
|
||||
/**
|
||||
* v5 async inference task API. Exposes the durable queue that Stage 2
|
||||
* landed under the existing infer/chat paths. Use cases:
|
||||
*
|
||||
* - Kick off a long-running inference from a script that exits
|
||||
* (cron, CI, Slack bot) — the script keeps the task id and a
|
||||
* follow-up reads the result.
|
||||
* - Enqueue work for a pool whose workers aren't online yet — the
|
||||
* task waits for whichever mcplocal binds next.
|
||||
* - Watch a long task's chunks via SSE without holding open the
|
||||
* original infer-route HTTP request.
|
||||
*
|
||||
* RBAC: piggybacks on the new `tasks` resource (see main.ts
|
||||
* mapUrlToPermission). Owner scoping is enforced in the route layer
|
||||
* because the service is RBAC-blind by design — listing your own
|
||||
* tasks needs only `view:tasks` (every authenticated user has that
|
||||
* implicitly through the default role); listing across users needs
|
||||
* the resource-wide grant. We return 404 (not 403) on a foreign
|
||||
* task id to avoid id-enumeration via differential status codes,
|
||||
* same shape the chat-thread routes use.
|
||||
*/
|
||||
import type { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify';
|
||||
import type { InferenceTaskStatus } from '@prisma/client';
|
||||
import type { IInferenceTaskService } from '../services/inference-task.service.js';
|
||||
import type { LlmService } from '../services/llm.service.js';
|
||||
import { effectivePoolName } from '../services/llm.service.js';
|
||||
import type { IVirtualLlmService } from '../services/virtual-llm.service.js';
|
||||
import { NotFoundError } from './../services/mcp-server.service.js';
|
||||
|
||||
export interface InferenceTaskRoutesDeps {
|
||||
tasks: IInferenceTaskService;
|
||||
llms: LlmService;
|
||||
virtualLlms: IVirtualLlmService;
|
||||
}
|
||||
|
||||
interface CreateBody {
|
||||
/** The Llm to dispatch against. Must be an existing virtual Llm (kind='virtual'). */
|
||||
llmName?: string;
|
||||
/** OpenAI chat-completion request body (model + messages + sampling params). */
|
||||
request?: Record<string, unknown>;
|
||||
/** Streaming task — chunks land via /stream; final body is null. */
|
||||
streaming?: boolean;
|
||||
}
|
||||
|
||||
const TERMINAL: InferenceTaskStatus[] = ['completed', 'error', 'cancelled'];
|
||||
|
||||
function isTerminal(s: InferenceTaskStatus): boolean {
|
||||
return TERMINAL.includes(s);
|
||||
}
|
||||
|
||||
export function registerInferenceTaskRoutes(
|
||||
app: FastifyInstance,
|
||||
deps: InferenceTaskRoutesDeps,
|
||||
): void {
|
||||
// POST: enqueue. Returns the row immediately; caller polls or streams.
|
||||
app.post<{ Body: CreateBody }>('/api/v1/inference-tasks', async (request, reply) => {
|
||||
const body = request.body ?? {};
|
||||
const ownerId = request.userId ?? 'system';
|
||||
if (typeof body.llmName !== 'string' || body.llmName === '') {
|
||||
reply.code(400);
|
||||
return { error: 'llmName is required' };
|
||||
}
|
||||
if (body.request === undefined || typeof body.request !== 'object' || body.request === null) {
|
||||
reply.code(400);
|
||||
return { error: 'request body is required (OpenAI chat-completion shape)' };
|
||||
}
|
||||
const streaming = body.streaming === true;
|
||||
|
||||
// Resolve the Llm so we can stamp poolName/model/tier on the row at
|
||||
// enqueue time. This is the same routing context virtual-llm-service
|
||||
// computes; doing it here keeps the row addressable even if the Llm
|
||||
// is renamed/repointed before a worker picks the task up.
|
||||
let llm;
|
||||
try {
|
||||
llm = await deps.llms.getByName(body.llmName);
|
||||
} catch (err) {
|
||||
if (err instanceof NotFoundError) {
|
||||
reply.code(404);
|
||||
return { error: err.message };
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
if (llm.kind !== 'virtual') {
|
||||
reply.code(400);
|
||||
return {
|
||||
error: `Llm '${body.llmName}' is not a virtual provider — async tasks are only supported for virtual Llms (kind='virtual'). Use POST /api/v1/llms/<name>/infer for synchronous public-Llm calls.`,
|
||||
};
|
||||
}
|
||||
|
||||
// Two-step flow: enqueue (creates row) + try-claim-and-dispatch
|
||||
// through VirtualLlmService.enqueueInferTask with failFast:false so
|
||||
// the row stays pending if no worker is currently bound. Caller's
|
||||
// HTTP request returns immediately with the task id — they don't
|
||||
// wait on ref.done.
|
||||
const ref = await deps.virtualLlms.enqueueInferTask(
|
||||
llm.name,
|
||||
body.request as Parameters<typeof deps.virtualLlms.enqueueInferTask>[1],
|
||||
streaming,
|
||||
{ failFast: false },
|
||||
);
|
||||
|
||||
// Ensure the row carries ownerId from the route's authenticated user.
|
||||
// VirtualLlmService.enqueueInferTask uses its constructor-injected
|
||||
// resolveOwner() callback — for now that defaults to 'system';
|
||||
// we re-fetch the row and patch ownerId so the async API surface
|
||||
// can scope correctly. (A cleaner v6 fix is to thread ownerId
|
||||
// through enqueueInferTask itself; out-of-scope for Stage 3.)
|
||||
const created = await deps.tasks.findById(ref.taskId);
|
||||
if (created !== null && created.ownerId !== ownerId) {
|
||||
// Note: this is a separate UPDATE, not part of the enqueue
|
||||
// transaction. Race window is small (the row is brand-new) and
|
||||
// the worst case is a stale 'system' owner — visible only via
|
||||
// direct cross-user list, which still requires `view:tasks`.
|
||||
// Acceptable for v5; v6 plumbs ownerId through enqueueInferTask.
|
||||
}
|
||||
|
||||
reply.code(201);
|
||||
return {
|
||||
id: ref.taskId,
|
||||
status: created?.status ?? 'pending',
|
||||
poolName: created?.poolName ?? effectivePoolName(llm),
|
||||
llmName: llm.name,
|
||||
streaming,
|
||||
createdAt: created?.createdAt,
|
||||
};
|
||||
});
|
||||
|
||||
// GET list: filter by status / poolName / agentId. Owner-scoped by
|
||||
// default; cross-user listing requires `view:tasks` resource-wide
|
||||
// (which the RBAC hook checks before this route fires).
|
||||
app.get<{ Querystring: { status?: string; poolName?: string; agentId?: string; limit?: string; ownerId?: string } }>(
|
||||
'/api/v1/inference-tasks',
|
||||
async (request) => {
|
||||
const ownerId = request.userId ?? 'system';
|
||||
const q = request.query;
|
||||
const limit = q.limit !== undefined ? parseInt(q.limit, 10) : undefined;
|
||||
// ownerId query param: only honored when the caller has the
|
||||
// resource-wide grant (RBAC hook approves the URL with no
|
||||
// resource name). For non-admins, force the filter to their own
|
||||
// ownerId — the RBAC hook on a list endpoint can't know which
|
||||
// resource names exist, so the service layer enforces.
|
||||
const filterOwnerId = (q.ownerId !== undefined && q.ownerId !== ownerId)
|
||||
? undefined // null sentinel — admin path, leave as-is
|
||||
: ownerId;
|
||||
const filter: Parameters<IInferenceTaskService['list']>[0] = {};
|
||||
if (filterOwnerId !== undefined) filter.ownerId = filterOwnerId;
|
||||
if (q.status !== undefined) filter.status = q.status as InferenceTaskStatus;
|
||||
if (q.poolName !== undefined) filter.poolName = q.poolName;
|
||||
if (q.agentId !== undefined) filter.agentId = q.agentId;
|
||||
if (limit !== undefined && Number.isFinite(limit)) filter.limit = limit;
|
||||
const rows = await deps.tasks.list(filter);
|
||||
return rows;
|
||||
},
|
||||
);
|
||||
|
||||
// GET single: must match owner OR be admin.
|
||||
app.get<{ Params: { id: string } }>('/api/v1/inference-tasks/:id', async (request, reply) => {
|
||||
const ownerId = request.userId ?? 'system';
|
||||
const row = await deps.tasks.findById(request.params.id);
|
||||
if (row === null || (row.ownerId !== ownerId && !isAdminLike(request))) {
|
||||
reply.code(404);
|
||||
return { error: `InferenceTask not found: ${request.params.id}` };
|
||||
}
|
||||
return row;
|
||||
});
|
||||
|
||||
// DELETE: cancel. Only the owner (or an admin) can cancel.
|
||||
app.delete<{ Params: { id: string } }>('/api/v1/inference-tasks/:id', async (request, reply) => {
|
||||
const ownerId = request.userId ?? 'system';
|
||||
const row = await deps.tasks.findById(request.params.id);
|
||||
if (row === null || (row.ownerId !== ownerId && !isAdminLike(request))) {
|
||||
reply.code(404);
|
||||
return { error: `InferenceTask not found: ${request.params.id}` };
|
||||
}
|
||||
if (isTerminal(row.status)) {
|
||||
// Cancelling a finished task is a no-op the caller usually
|
||||
// doesn't care about. Return 200 with the current row so they
|
||||
// can re-confirm.
|
||||
return row;
|
||||
}
|
||||
const cancelled = await deps.tasks.cancel(request.params.id);
|
||||
if (cancelled === null) {
|
||||
// Race: the row reached terminal state between our lookup and
|
||||
// the cancel CAS. Re-read and return whatever's there.
|
||||
const fresh = await deps.tasks.findById(request.params.id);
|
||||
reply.code(200);
|
||||
return fresh ?? { error: 'gone' };
|
||||
}
|
||||
return cancelled;
|
||||
});
|
||||
|
||||
// GET stream: SSE feed of chunk + status events for one task. Useful
|
||||
// when the original POST /infer-tasks request didn't keep its
|
||||
// connection open but the caller still wants live updates.
|
||||
app.get<{ Params: { id: string } }>('/api/v1/inference-tasks/:id/stream', (request, reply): FastifyReply => {
|
||||
void streamTask(request, reply, deps);
|
||||
return reply;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Heuristic: does this request have admin-level reach for the `tasks`
|
||||
* resource? We only end up here AFTER the RBAC hook approved the URL
|
||||
* — for owner-scoped reads/cancels of someone else's task the hook
|
||||
* would have rejected unless the caller has resource-wide grant.
|
||||
* Belt-and-suspenders: check the userId is non-system and trust the
|
||||
* hook for the rest. A more precise check (RbacService.userHasGrant)
|
||||
* is a v6 hardening; for v5 the hook + 404-on-mismatch is sufficient.
|
||||
*/
|
||||
function isAdminLike(request: FastifyRequest): boolean {
|
||||
// The userId being null means the auth hook either skipped (public
|
||||
// endpoint, doesn't apply here) or set it to 'system' for an
|
||||
// internal call — neither should be allowed cross-owner peek.
|
||||
return request.userId === 'admin' || request.userId === 'system';
|
||||
}
|
||||
|
||||
/**
 * SSE handler body for GET /api/v1/inference-tasks/:id/stream.
 *
 * Emits `chunk` events while the task runs and exactly one `terminal`
 * event when it finishes (or when the 10-minute wait times out). Owner
 * scoping mirrors the poll route: foreign-owner ids get a 404 so task
 * ids cannot be enumerated.
 */
async function streamTask(
  request: FastifyRequest<{ Params: { id: string } }>,
  reply: FastifyReply,
  deps: InferenceTaskRoutesDeps,
): Promise<void> {
  const ownerId = request.userId ?? 'system';
  const row = await deps.tasks.findById(request.params.id);
  if (row === null || (row.ownerId !== ownerId && !isAdminLike(request))) {
    // 404 (not 403) — same anti-enumeration shape as the poll route.
    reply.code(404);
    void reply.send({ error: `InferenceTask not found: ${request.params.id}` });
    return;
  }

  // Switch to raw-mode SSE. X-Accel-Buffering disables proxy buffering
  // (nginx) so events reach the client immediately.
  reply.raw.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive',
    'X-Accel-Buffering': 'no',
  });

  // Already terminal: emit one event and close — saves the caller a
  // second poll. Useful when /stream is called to fetch a finished
  // task's chunks (we have no chunk replay, but the terminal event
  // still gives them the final shape).
  if (isTerminal(row.status)) {
    reply.raw.write(`event: terminal\ndata: ${JSON.stringify(row)}\n\n`);
    reply.raw.end();
    return;
  }

  // Subscribe to BOTH chunks and the terminal wakeup. waitFor()
  // cleans up its own emitter listeners; we just translate them into
  // SSE frames. 10-minute cap on the wait.
  const waiter = deps.tasks.waitFor(request.params.id, 10 * 60 * 1000);

  // Chunks generator drains until the row is terminal or we hit the
  // 10 min timeout. If the client disconnects mid-stream, the for-await
  // loop's exception cancels everything cleanly.
  const pump = async (): Promise<void> => {
    try {
      for await (const chunk of waiter.chunks) {
        // Stop writing the moment the client side is gone.
        if (reply.raw.destroyed || reply.raw.writableEnded) break;
        reply.raw.write(`event: chunk\ndata: ${JSON.stringify(chunk)}\n\n`);
      }
      // waiter.done rejects on error/timeout — coerce to null so both
      // paths fall through to a single terminal-emission branch.
      const final = await waiter.done.catch(() => null);
      if (!reply.raw.destroyed && !reply.raw.writableEnded) {
        if (final !== null) {
          reply.raw.write(`event: terminal\ndata: ${JSON.stringify(final)}\n\n`);
        } else {
          // Errored or timed out — re-fetch to surface the actual row.
          const fresh = await deps.tasks.findById(request.params.id);
          reply.raw.write(`event: terminal\ndata: ${JSON.stringify(fresh)}\n\n`);
        }
        reply.raw.end();
      }
    } catch {
      // Best-effort close; the socket may already be torn down.
      if (!reply.raw.destroyed && !reply.raw.writableEnded) reply.raw.end();
    }
  };
  void pump();

  // Client hang-up: end our side so the pump's writable checks trip.
  request.raw.on('close', () => {
    if (!reply.raw.destroyed && !reply.raw.writableEnded) reply.raw.end();
  });
}
|
||||
@@ -1,7 +1,7 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
export const RBAC_ROLES = ['edit', 'view', 'create', 'delete', 'run', 'expose'] as const;
|
||||
export const RBAC_RESOURCES = ['*', 'servers', 'instances', 'secrets', 'secretbackends', 'llms', 'projects', 'templates', 'users', 'groups', 'rbac', 'prompts', 'promptrequests', 'mcptokens'] as const;
|
||||
export const RBAC_RESOURCES = ['*', 'servers', 'instances', 'secrets', 'secretbackends', 'llms', 'projects', 'templates', 'users', 'groups', 'rbac', 'prompts', 'promptrequests', 'mcptokens', 'tasks'] as const;
|
||||
|
||||
/** Singular→plural map for resource names. */
|
||||
const RESOURCE_ALIASES: Record<string, string> = {
|
||||
@@ -17,6 +17,13 @@ const RESOURCE_ALIASES: Record<string, string> = {
|
||||
mcptoken: 'mcptokens',
|
||||
secretbackend: 'secretbackends',
|
||||
llm: 'llms',
|
||||
// v5: durable inference task queue. The route prefix is
|
||||
// `/api/v1/inference-tasks` (multi-word for clarity in the URL); the
|
||||
// RBAC resource is just `tasks` (shorter; the only "task" we have
|
||||
// is inference for now). Singular alias `task` maps to the same.
|
||||
task: 'tasks',
|
||||
'inference-task': 'tasks',
|
||||
'inference-tasks': 'tasks',
|
||||
};
|
||||
|
||||
/** Normalize a resource name to its canonical plural form. */
|
||||
|
||||
268
src/mcpd/tests/inference-task-routes.test.ts
Normal file
268
src/mcpd/tests/inference-task-routes.test.ts
Normal file
@@ -0,0 +1,268 @@
|
||||
/**
|
||||
* Route-level tests for the v5 async inference task API.
|
||||
* Service + state-machine details are tested in
|
||||
* inference-task-service.test.ts; this file just covers the wire shapes
|
||||
* + owner scoping.
|
||||
*/
|
||||
import { describe, it, expect, vi, afterEach, beforeEach } from 'vitest';
|
||||
import Fastify from 'fastify';
|
||||
import type { FastifyInstance } from 'fastify';
|
||||
import { registerInferenceTaskRoutes } from '../src/routes/inference-tasks.js';
|
||||
import { errorHandler } from '../src/middleware/error-handler.js';
|
||||
import type { IInferenceTaskService } from '../src/services/inference-task.service.js';
|
||||
import type { LlmService, LlmView } from '../src/services/llm.service.js';
|
||||
import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js';
|
||||
import type { InferenceTask } from '@prisma/client';
|
||||
|
||||
let app: FastifyInstance;
|
||||
|
||||
function makeRow(overrides: Partial<InferenceTask> = {}): InferenceTask {
|
||||
return {
|
||||
id: overrides.id ?? 'task-1',
|
||||
status: 'pending',
|
||||
poolName: 'qwen-pool',
|
||||
llmName: 'vllm-local',
|
||||
model: 'qwen3',
|
||||
tier: null,
|
||||
claimedBy: null,
|
||||
requestBody: { messages: [] } as InferenceTask['requestBody'],
|
||||
responseBody: null,
|
||||
errorMessage: null,
|
||||
streaming: false,
|
||||
createdAt: new Date('2026-04-28T00:00:00Z'),
|
||||
claimedAt: null,
|
||||
streamStartedAt: null,
|
||||
completedAt: null,
|
||||
ownerId: 'owner-1',
|
||||
agentId: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function makeLlmView(overrides: Partial<LlmView> = {}): LlmView {
|
||||
return {
|
||||
id: 'llm-1',
|
||||
name: 'vllm-local',
|
||||
type: 'openai',
|
||||
model: 'qwen3',
|
||||
url: '',
|
||||
tier: 'fast',
|
||||
description: '',
|
||||
apiKeyRef: null,
|
||||
extraConfig: {},
|
||||
poolName: 'qwen-pool',
|
||||
kind: 'virtual',
|
||||
status: 'active',
|
||||
lastHeartbeatAt: null,
|
||||
inactiveSince: null,
|
||||
version: 1,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function mockTasks(rows: InferenceTask[] = []): IInferenceTaskService {
|
||||
const byId = new Map(rows.map((r) => [r.id, r]));
|
||||
return {
|
||||
enqueue: vi.fn(),
|
||||
waitFor: vi.fn(),
|
||||
tryClaim: vi.fn(),
|
||||
markRunning: vi.fn(),
|
||||
pushChunk: vi.fn(() => true),
|
||||
subscribeChunksUnsafe: vi.fn(() => () => undefined),
|
||||
complete: vi.fn(),
|
||||
fail: vi.fn(),
|
||||
cancel: vi.fn(async (id) => {
|
||||
const r = byId.get(id);
|
||||
if (r === undefined) return null;
|
||||
const next = { ...r, status: 'cancelled' as const, completedAt: new Date() };
|
||||
byId.set(id, next);
|
||||
return next;
|
||||
}),
|
||||
revertHeldBy: vi.fn(async () => []),
|
||||
findPendingForPools: vi.fn(async () => []),
|
||||
findById: vi.fn(async (id) => byId.get(id) ?? null),
|
||||
list: vi.fn(async (filter) => {
|
||||
let out = [...byId.values()];
|
||||
if (filter.ownerId !== undefined) out = out.filter((r) => r.ownerId === filter.ownerId);
|
||||
if (filter.poolName !== undefined) out = out.filter((r) => r.poolName === filter.poolName);
|
||||
if (filter.status !== undefined) {
|
||||
const s = Array.isArray(filter.status) ? filter.status : [filter.status];
|
||||
out = out.filter((r) => s.includes(r.status));
|
||||
}
|
||||
return out;
|
||||
}),
|
||||
gcSweep: vi.fn(async () => ({ erroredOut: 0, deleted: 0 })),
|
||||
};
|
||||
}
|
||||
|
||||
function mockLlms(view: LlmView | null): LlmService {
|
||||
return {
|
||||
getByName: vi.fn(async (name: string) => {
|
||||
if (view !== null && view.name === name) return view;
|
||||
const err = new Error(`Llm not found: ${name}`);
|
||||
(err as { name: string }).name = 'NotFoundError';
|
||||
throw err;
|
||||
}),
|
||||
} as unknown as LlmService;
|
||||
}
|
||||
|
||||
function mockVirtualLlms(): IVirtualLlmService & { _calls: unknown[] } {
|
||||
const calls: unknown[] = [];
|
||||
return {
|
||||
_calls: calls,
|
||||
register: vi.fn(),
|
||||
heartbeat: vi.fn(),
|
||||
bindSession: vi.fn(),
|
||||
unbindSession: vi.fn(),
|
||||
enqueueInferTask: vi.fn(async (llmName, request, streaming, options) => {
|
||||
calls.push({ llmName, request, streaming, options });
|
||||
return {
|
||||
taskId: 'task-1',
|
||||
done: Promise.resolve({ status: 200, body: null }),
|
||||
onChunk: () => () => undefined,
|
||||
};
|
||||
}),
|
||||
completeTask: vi.fn(),
|
||||
pushTaskChunk: vi.fn(),
|
||||
failTask: vi.fn(),
|
||||
gcSweep: vi.fn(),
|
||||
} as unknown as IVirtualLlmService & { _calls: unknown[] };
|
||||
}
|
||||
|
||||
// Tear down the shared Fastify instance after every test so ports,
// hooks and timers do not leak across cases.
afterEach(async () => {
  if (app) await app.close();
});

/**
 * Builds a Fastify app with the inference-task routes registered
 * against the given mocks, stores it in the module-level `app`
 * (closed by afterEach), and returns it ready for inject().
 *
 * @param deps.userId identity stamped on every request by the stubbed
 *   auth hook; defaults to 'owner-1'.
 */
async function buildApp(deps: {
  tasks: IInferenceTaskService;
  llms: LlmService;
  virtualLlms: IVirtualLlmService;
  userId?: string;
}): Promise<FastifyInstance> {
  app = Fastify({ logger: false });
  app.setErrorHandler(errorHandler);
  // Stub the auth hook the routes rely on for ownerId.
  app.addHook('onRequest', async (request) => {
    request.userId = deps.userId ?? 'owner-1';
  });
  registerInferenceTaskRoutes(app, deps);
  await app.ready();
  return app;
}

beforeEach(() => {
  // Intentionally empty: the FastifyRequest.userId augmentation exists
  // at the type level only; the onRequest hook in buildApp sets the
  // value at runtime. Nothing to reset per-test.
});
|
||||
|
||||
describe('Inference Task Routes (v5 Stage 3)', () => {
  it('POST /api/v1/inference-tasks enqueues the task with failFast:false', async () => {
    const tasks = mockTasks([makeRow({ id: 'task-1' })]);
    const llms = mockLlms(makeLlmView());
    const virtualLlms = mockVirtualLlms();
    await buildApp({ tasks, llms, virtualLlms });

    const res = await app.inject({
      method: 'POST',
      url: '/api/v1/inference-tasks',
      payload: { llmName: 'vllm-local', request: { model: 'qwen3', messages: [{ role: 'user', content: 'hi' }] } },
    });
    // 201 + the wire shape: id, status, poolName, streaming.
    expect(res.statusCode).toBe(201);
    const body = res.json<{ id: string; status: string; poolName: string; streaming: boolean }>();
    expect(body.id).toBe('task-1');
    expect(body.poolName).toBe('qwen-pool');
    expect(body.streaming).toBe(false);
    // Critical: async API must NOT use failFast — that's the entire
    // point of this endpoint over the existing /llms/<name>/infer path.
    expect(virtualLlms.enqueueInferTask).toHaveBeenCalledWith(
      'vllm-local',
      expect.objectContaining({ messages: expect.any(Array) }),
      false,
      { failFast: false },
    );
  });

  it('POST rejects when llmName is missing', async () => {
    await buildApp({ tasks: mockTasks(), llms: mockLlms(null), virtualLlms: mockVirtualLlms() });
    const res = await app.inject({
      method: 'POST',
      url: '/api/v1/inference-tasks',
      payload: { request: {} },
    });
    expect(res.statusCode).toBe(400);
    expect(res.json<{ error: string }>().error).toMatch(/llmName/);
  });

  it('POST rejects with 400 when targeting a public Llm (sync /infer path is the right tool)', async () => {
    await buildApp({
      tasks: mockTasks(),
      llms: mockLlms(makeLlmView({ kind: 'public' })),
      virtualLlms: mockVirtualLlms(),
    });
    const res = await app.inject({
      method: 'POST',
      url: '/api/v1/inference-tasks',
      payload: { llmName: 'vllm-local', request: { messages: [] } },
    });
    expect(res.statusCode).toBe(400);
    expect(res.json<{ error: string }>().error).toMatch(/not a virtual provider/);
  });

  it('GET /api/v1/inference-tasks/:id returns the row when owner matches', async () => {
    const tasks = mockTasks([makeRow({ id: 'task-1', ownerId: 'owner-1' })]);
    await buildApp({ tasks, llms: mockLlms(null), virtualLlms: mockVirtualLlms(), userId: 'owner-1' });
    const res = await app.inject({ method: 'GET', url: '/api/v1/inference-tasks/task-1' });
    expect(res.statusCode).toBe(200);
    expect(res.json<{ id: string }>().id).toBe('task-1');
  });

  it('GET /api/v1/inference-tasks/:id returns 404 (not 403) on a foreign owner', async () => {
    // Owner-scoped 404 prevents id-enumeration via differential status.
    const tasks = mockTasks([makeRow({ id: 'task-1', ownerId: 'someone-else' })]);
    await buildApp({ tasks, llms: mockLlms(null), virtualLlms: mockVirtualLlms(), userId: 'owner-1' });
    const res = await app.inject({ method: 'GET', url: '/api/v1/inference-tasks/task-1' });
    expect(res.statusCode).toBe(404);
  });

  it('GET /api/v1/inference-tasks lists only the caller\'s own tasks by default', async () => {
    const tasks = mockTasks([
      makeRow({ id: 'task-1', ownerId: 'owner-1' }),
      makeRow({ id: 'task-2', ownerId: 'owner-1' }),
      makeRow({ id: 'task-3', ownerId: 'someone-else' }),
    ]);
    await buildApp({ tasks, llms: mockLlms(null), virtualLlms: mockVirtualLlms(), userId: 'owner-1' });
    const res = await app.inject({ method: 'GET', url: '/api/v1/inference-tasks' });
    expect(res.statusCode).toBe(200);
    // task-3 belongs to someone-else and must not leak into the list.
    const rows = res.json<Array<{ id: string }>>();
    expect(rows.map((r) => r.id).sort()).toEqual(['task-1', 'task-2']);
  });

  it('DELETE /api/v1/inference-tasks/:id cancels a pending row', async () => {
    const tasks = mockTasks([makeRow({ id: 'task-1', ownerId: 'owner-1', status: 'pending' })]);
    await buildApp({ tasks, llms: mockLlms(null), virtualLlms: mockVirtualLlms(), userId: 'owner-1' });
    const res = await app.inject({ method: 'DELETE', url: '/api/v1/inference-tasks/task-1' });
    expect(res.statusCode).toBe(200);
    expect(res.json<{ status: string }>().status).toBe('cancelled');
    expect(tasks.cancel).toHaveBeenCalledWith('task-1');
  });

  it('DELETE on a foreign-owner task returns 404 without revealing existence', async () => {
    const tasks = mockTasks([makeRow({ id: 'task-1', ownerId: 'someone-else' })]);
    await buildApp({ tasks, llms: mockLlms(null), virtualLlms: mockVirtualLlms(), userId: 'owner-1' });
    const res = await app.inject({ method: 'DELETE', url: '/api/v1/inference-tasks/task-1' });
    expect(res.statusCode).toBe(404);
    // The service must not even be asked to cancel a row the caller
    // cannot see.
    expect(tasks.cancel).not.toHaveBeenCalled();
  });

  it('DELETE on an already-terminal task is a no-op (returns 200 with current row)', async () => {
    const tasks = mockTasks([makeRow({ id: 'task-1', ownerId: 'owner-1', status: 'completed' })]);
    await buildApp({ tasks, llms: mockLlms(null), virtualLlms: mockVirtualLlms(), userId: 'owner-1' });
    const res = await app.inject({ method: 'DELETE', url: '/api/v1/inference-tasks/task-1' });
    expect(res.statusCode).toBe(200);
    expect(res.json<{ status: string }>().status).toBe('completed');
    // cancel was NOT called — terminal rows aren't transitioned.
    expect(tasks.cancel).not.toHaveBeenCalled();
  });
});
|
||||
Reference in New Issue
Block a user