feat: v5 durable inference task queue #70
Summary
v5 makes inference tasks first-class persisted entities. mcpd's
in-memory request map is gone; every infer call lands as an
`InferenceTask` row that survives restart, worker disconnect, and
"no worker is online right now". Workers drain queued work when
they bind. Two API surfaces, one queue underneath:

- `POST /llms/<name>/infer` and `POST /agents/<name>/chat`. The caller's HTTP request waits, exactly like pre-v5; internally the row carries the work.
- `POST /api/v1/inference-tasks`. The caller gets a task id immediately and polls / streams / cancels separately.
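The async surface's enqueue-then-poll loop might look like the following sketch. Only the endpoint paths and status values come from this PR; `runAsyncInfer`, the `Task` shape, and the `Http` shim are illustrative stand-ins for a real HTTP client:

```typescript
// Hypothetical client loop for the async API: enqueue, then poll to terminal.
type Task = { id: string; status: string; responseBody?: unknown };
type Http = (method: string, path: string, body?: unknown) => Promise<Task>;

const TERMINAL = new Set(["completed", "error", "cancelled"]);

async function runAsyncInfer(http: Http, requestBody: unknown): Promise<Task> {
  // POST returns the row immediately; the task may sit pending until a worker binds.
  const created = await http("POST", "/api/v1/inference-tasks", requestBody);
  let task = created;
  while (!TERMINAL.has(task.status)) {
    await new Promise((r) => setTimeout(r, 50)); // poll interval; tune for real use
    task = await http("GET", `/api/v1/inference-tasks/${created.id}`);
  }
  return task;
}
```

The same id also works against the `/stream` SSE endpoint or `DELETE` for cancel, so the caller never has to hold the original connection open.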
Four stages

- Stage 1 (ed21ad1): `InferenceTask` schema + repository + service with full state machine. EventEmitter-backed in-process signal channel for blocked HTTP handlers; the data model is multi-instance-ready (the pg LISTEN/NOTIFY swap is just the wakeup layer in v6). 19 new tests.
- Stage 2 (7b18bb6): `VirtualLlmService` rewired through the queue. The in-memory `tasksById` map is removed; wake tasks keep their own small in-memory map. Worker disconnect reverts claimed/running rows back to pending; worker bind drains pending. A new `failFast` option preserves the v1-v4 fast-fail semantic for callers that need it (chat.service pool failover, direct infer).
- Stage 3 (1dcfdc8): Async API endpoints + RBAC `tasks` resource. POST/GET/DELETE/list + `/stream` SSE. Owner-scoped at the route layer; cross-user access requires a `view:tasks` resource grant; foreign-owner ids return 404 to prevent enumeration.
- Stage 4 (<this>): CLI (`mcpctl get tasks`, `chat-llm --async`), GC ticker (1h pending timeout, 7d terminal retention), live smoke, full docs (docs/inference-tasks.md).

Test plan
enqueue → bind worker → drain → completed; cancel pending → row
persists as cancelled; GET on foreign owner returns 404.
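The lifecycle the test plan exercises can be encoded as a small transition table. The statuses are from this PR; the exact legal-move set is inferred from the stage descriptions, not copied from the real InferenceTaskService:

```typescript
// Sketch of the v5 task state machine. Terminal states are absorbing;
// claimed/running can revert to pending on worker disconnect.
type Status = "pending" | "claimed" | "running" | "completed" | "error" | "cancelled";

const TRANSITIONS: Record<Status, Status[]> = {
  pending:   ["claimed", "error", "cancelled"],            // claim, GC timeout, user cancel
  claimed:   ["running", "pending", "error", "cancelled"], // start, disconnect revert, fail, cancel
  running:   ["completed", "pending", "error", "cancelled"],
  completed: [], // terminal: further transitions are idempotent no-ops
  error:     [],
  cancelled: [],
};

function canTransition(from: Status, to: Status): boolean {
  return TRANSITIONS[from].includes(to);
}
```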
Stage 1 (ed21ad1): durable queue

The persistence + signaling layer for v5. No integration with the existing in-flight inference path yet — that's Stage 2. This commit just lands the durable queue underneath, with a state machine that mcpd's HTTP handlers, the worker result-POST route, and the GC sweep will all build on.

Schema (src/db/prisma/schema.prisma + migration):
- New `InferenceTask` model + `InferenceTaskStatus` enum (pending|claimed|running|completed|error|cancelled).
- Routing fields stored at enqueue time so a later rename of `Llm.poolName` doesn't reroute already-queued work: `poolName` (effective pool key), `llmName` (pinned target), `model`, `tier`.
- Worker tracking: `claimedBy` (providerSessionId) + `claimedAt`, cleared on revert.
- Bodies as `Json`: requestBody (always set), responseBody (set at completion). Streaming chunks are NOT persisted — too expensive at delta granularity. The final assembled body lands once per task.
- Lifecycle timestamps: createdAt, claimedAt, streamStartedAt, completedAt. Plus ownerId (RBAC + audit) and agentId (null for direct chat-llm calls).
- Indexes for the hot paths: (status, poolName) for the dispatcher's drain query, claimedBy for the disconnect revert, completedAt for the GC retention sweep, owner/agent for the async API listing.

Repository (src/mcpd/src/repositories/inference-task.repository.ts):
- CRUD + state transitions as conditional CAS via `updateMany`. Two workers racing to claim the same row both run the UPDATE; whichever the DB serializes first sees affected=1 and gets the row, the loser sees 0 and falls through to the next candidate. No application-level locking required.
- findPendingForPools(poolNames[]) for the worker drain on bind.
- findHeldBy(claimedBy) for the unbindSession revert.
- findStalePending + findExpiredTerminal for the GC sweep.

Service (src/mcpd/src/services/inference-task.service.ts):
- Owns the in-process EventEmitter that wakes blocked HTTP handlers when a worker POSTs results.
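The claim race can be illustrated in miniature. The real repository issues a conditional UPDATE through Prisma's `updateMany` and inspects the affected count; this in-memory stand-in (hypothetical names) shows the same affected=1/affected=0 contract:

```typescript
// Real code would be roughly:
//   updateMany({ where: { id, status: "pending" },
//                data: { status: "claimed", claimedBy, claimedAt } })
// and treat count === 1 as "won the claim". This models that in memory.
type Row = { id: string; status: string; claimedBy?: string };

function tryClaim(rows: Map<string, Row>, id: string, sessionId: string): boolean {
  const row = rows.get(id);
  // Guard mirrors the SQL WHERE clause: only a pending row is claimable.
  if (!row || row.status !== "pending") return false; // affected = 0: lost the race
  row.status = "claimed";
  row.claimedBy = sessionId;
  return true; // affected = 1: this worker owns the row
}
```

Because the DB serializes the two UPDATEs, exactly one racer wins and the other falls through to the next pending candidate, with no application-level lock.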
  The DB row is the source of truth for *state*; the EventEmitter just signals "go re-read row X" so we don't have to poll. Single-instance assumption for v5; pg LISTEN/NOTIFY is the v6 swap when scaling horizontally — no schema change needed, just replace the emitter wakeup.
- waitFor(taskId, timeoutMs) returns { done, chunks }: the terminal promise + an async iterator of streaming deltas. Throws on cancel (clear message), on error (the worker's errorMessage propagates), or on timeout. Polls the row once at subscribe time so an already-terminal task resolves immediately without waiting for an event that's never coming.
- gcSweep flips stale pending rows to error (with a clear message about the timeout) and deletes terminal rows past retention. Defaults: 1h pending timeout, 7d terminal retention; both configurable.

Tests:
- 6 db-level schema tests (defaults, json roundtrip, drain query shape, claimedBy filter, GC predicate, agentId nullable).
- 13 service tests covering enqueue, the CAS race on tryClaim, complete/fail/cancel, idempotent terminal transitions, revertHeldBy on disconnect, and the full waitFor signal lifecycle (immediate resolve, wake on event, chunk streaming, cancel/error/timeout paths). Plus a gcSweep test with a fixed clock.

mcpd 881/881 (was 868; +13). db pool-schema 14/14, +6 new inference-task-schema. Pre-existing failures in models.test.ts (Secret FK fixture issue, also fails on main HEAD) are unrelated.

Stage 2 (next): VirtualLlmService rewires through this — remove the in-memory pendingTasks map; enqueue creates a row, dispatch picks an active session, the result-route updates the row + emits the wakeup. Worker disconnect reverts; worker bind drains.

Stage 2 (7b18bb6): VirtualLlmService rewired through the queue

The in-memory `tasksById` map for inference tasks is gone. Every inference call lands as a row in `InferenceTask`; the result POST updates the row + emits a wakeup; the in-flight HTTP handler unblocks on the wake.
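The wakeup contract (row as source of truth, emitter as a hint to re-read) can be sketched as below. The shapes are illustrative; the real waitFor also yields streaming chunks and distinguishes cancel from error:

```typescript
import { EventEmitter } from "node:events";

type Row = { id: string; status: string };

// Subscribe *before* the initial read, then read once: a task that goes
// terminal between the read and the listener attach still resolves, and an
// already-terminal task resolves immediately with no event ever arriving.
function waitFor(
  emitter: EventEmitter,
  readRow: (id: string) => Row,
  id: string,
  timeoutMs: number,
): Promise<Row> {
  return new Promise((resolve, reject) => {
    const check = () => {
      const row = readRow(id); // the row, not the event, carries the state
      if (["completed", "error", "cancelled"].includes(row.status)) {
        cleanup();
        resolve(row);
      }
    };
    const timer = setTimeout(() => { cleanup(); reject(new Error("timeout")); }, timeoutMs);
    const cleanup = () => { clearTimeout(timer); emitter.off(`task:${id}`, check); };
    emitter.on(`task:${id}`, check);
    check(); // initial poll: already-terminal tasks short-circuit here
  });
}
```

Because the event payload is just "re-read row X", swapping the in-process emitter for pg LISTEN/NOTIFY in v6 changes only the delivery mechanism, not this logic.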
mcpd surviving a restart no longer drops in-flight tasks, and a worker disconnecting mid-task no longer fails the caller — the row reverts to pending and a sibling worker on the same pool drains it.

Wake tasks (publisher control messages, not inference) keep their own small in-memory map (`wakeTasks`). They're millisecond-scoped and don't benefit from durability — a missed wake on restart just means the next infer fires a fresh wake.

Behavioral changes worth flagging:
- Worker disconnect mid-task: WAS reject ref.done with "publisher disconnected"; NOW revert claimed/running rows to pending. The original caller's ref.done keeps waiting up to INFER_AWAIT_TIMEOUT_MS (10 min); whichever worker delivers the result fulfills it.
- bindSession drains pending tasks for the session's pool keys, so tasks queued while no worker was up automatically get dispatched when one shows up. The drain matches by *effective pool key* (poolName ?? name) — tasks queued against vllm-alice get drained by any session whose owned Llms share alice's pool.
- New `failFast: true` option on enqueueInferTask (default: false). Existing callers that NEED fast-fail get it explicitly:
  - The direct `/api/v1/llms/<name>/infer` route: the caller pinned a specific Llm and wants a 503 immediately if the publisher is offline; queueing for an unknown future worker would surprise.
  - The chat.service pool failover loop: it iterates pool candidates and needs each candidate's transport failure to surface fast. Without failFast, a downed pool member would absorb the call into the queue and the loop would wait 10 min before trying the next.
  The async API route (Stage 3) leaves failFast=false — that's the whole point of the durable queue path.
- VirtualLlmService now requires an InferenceTaskService dep at construction. Older test wirings that didn't pass it get a clear "InferenceTaskService not wired" error from enqueueInferTask rather than a confusing in-memory stub.
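The effective-pool-key matching can be shown in a few lines. Only the `poolName ?? name` rule comes from this PR; the helper names and the sample Llms are illustrative:

```typescript
type Llm = { name: string; poolName?: string };

// Effective pool key: an explicit poolName groups Llms into a shared pool;
// otherwise the Llm is its own single-member pool keyed by name.
const effectivePoolKey = (llm: Llm): string => llm.poolName ?? llm.name;

// On bindSession, the drain picks up any pending task whose stored poolName
// matches one of the keys covered by the session's owned Llms.
function shouldDrain(taskPoolName: string, ownedLlms: Llm[]): boolean {
  const keys = new Set(ownedLlms.map(effectivePoolKey));
  return keys.has(taskPoolName);
}
```

This is why a task queued against one pool member can be completed by a sibling: the match is on the pool key stored at enqueue time, not on the individual Llm name.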
Tests:
- 12 existing virtual-llm-service tests updated for the new semantics: "rejects when no session" → "queues durably"; "rejects when row inactive" → "still queues (pool may have a sibling)"; "unbindSession rejects in-flight tasks" → "reverts to pending". Wake-task probing now uses `wakeTasks` instead of `tasksById`.
- 3 new v5-specific tests: drain-on-bind matches by effective pool key (not just name); enqueue without a session keeps the row pending; completeTask via the result-route updates the DB and emits the wakeup that resolves ref.done.
- chat-service-virtual-llm + llm-infer-route assertions updated to expect the new {failFast: true} option arg.

mcpd 884/884 (was 881; +3 v5 cases). mcplocal 723/723. Full smoke suite 144/144 against the deployed queue-backed mcpd.

Stage 3 (next): expose the durable queue via async API endpoints. POST /api/v1/inference-tasks (enqueue with failFast=false), GET /api/v1/inference-tasks/:id (poll), GET /api/v1/inference-tasks/:id/stream (SSE), DELETE /api/v1/inference-tasks/:id (cancel). New `tasks` RBAC resource.

Stage 3 (1dcfdc8): async API + RBAC

Exposes the durable queue (Stage 1+2) as a first-class API so callers can enqueue work, get a task id immediately, and poll/stream/cancel without holding open the original HTTP connection.

New endpoints (`/api/v1/inference-tasks`):
- POST / → enqueue, return task id (201 + row). failFast:false — the task stays pending if no worker is up; a future bindSession drains it. Rejects with 400 for public Llms (the existing /llms/<name>/infer is the right tool there) and 404 for missing Llms.
- GET / → list the owner's tasks. Optional ?status, ?poolName, ?agentId, ?limit query. Owner-scoped at the route layer; cross-user listing requires a resource-wide grant.
- GET /:id → poll one task. 404 (not 403) on a foreign-owner id to prevent enumeration.
- DELETE /:id → cancel a non-terminal task. Already-terminal rows return 200 + the current shape (no-op). 404 on foreign owner.
- GET /:id/stream → SSE feed of `chunk` and `terminal` events.
  Re-fetches the row at subscribe time so already-completed tasks emit one terminal event and close immediately.

RBAC:
- New `tasks` resource added to RBAC_RESOURCES + the URL→permission map in main.ts. Default action mapping: GET=view, POST=create, DELETE=delete. The route layer enforces owner-scoping ON TOP of the hook (404 on foreign owner) — without this, anyone with `view:tasks` could list/peek every user's queued work.
- The singular alias `task` and the multi-word `inference-task` / `inference-tasks` all normalize to `tasks`, so users can write `mcpctl create rbac-binding --resource task --role view ...` or any of the variants and have it map correctly.

Tests: 9 new route tests covering the wire shapes, owner scoping (matching/foreign), public-Llm rejection, missing-Llm 404, list filter, and cancel semantics (pending→cancelled, terminal→no-op). mcpd 893/893 (was 884; +9). Live smoke: POST against a public Llm returns the documented 400, POST against a missing one returns 404, GET list returns [] cleanly.

Stage 4 (next): CLI surface (`mcpctl get tasks`, `--async` flag on chat-llm), GC ticker, smoke test (enqueue → connect worker → drain), docs.

Stage 4 (<this>): CLI, GC, smoke, docs

CLI surface for the durable queue:
- `mcpctl get tasks` — table view (ID, STATUS, POOL, LLM, MODEL, STREAM, AGE, WORKER). Aliases `task`, `tasks`, `inference-task`, `inference-tasks` all normalize to the canonical plural so URL construction works uniformly. RESOURCE_ALIASES + completions generator updated.
- `mcpctl chat-llm <name> --async -m <msg>` — enqueue and exit. stdout is just the task id (pipeable into `xargs mcpctl get task`); stderr carries human-readable status. REPL mode is rejected for --async (fire-and-forget doesn't make sense without -m).

GC ticker in mcpd: 5-min interval. Pending tasks past the 1 h queue timeout flip to error with a clear message; terminal tasks past 7 d retention get deleted. Both queries are index-backed.
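The GC predicates, with the stated defaults, might look like the following sketch. Field and helper names are illustrative; only the 1 h / 7 d defaults and the stale-pending / expired-terminal split come from this PR:

```typescript
type TaskRow = { status: string; createdAt: number; completedAt?: number };

const HOUR = 3_600_000;
const DAY = 86_400_000;
const TERMINAL = new Set(["completed", "error", "cancelled"]);

// Pending too long with no worker ever claiming it: flip to error.
function isStalePending(row: TaskRow, now: number, pendingTimeoutMs = HOUR): boolean {
  return row.status === "pending" && now - row.createdAt > pendingTimeoutMs;
}

// Terminal and past retention: delete the row outright.
function isExpiredTerminal(row: TaskRow, now: number, retentionMs = 7 * DAY): boolean {
  return TERMINAL.has(row.status)
    && row.completedAt !== undefined
    && now - row.completedAt > retentionMs;
}
```

In the real sweep these run as two DB queries backed by the (status, poolName) and completedAt indexes rather than a row-by-row scan.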
Crash fix uncovered by the smoke: when the async route doesn't await ref.done, a later cancel/error rejected the in-flight Promise as unhandled and crashed mcpd. The route now attaches a no-op `.catch` so the legacy `done` semantic still works for sync callers (chat, direct infer) without taking out the process for async ones. EnqueueInferOptions also gained an explicit `ownerId` field so the async API can stamp the authenticated user on the row instead of inheriting 'system' from the constructor's resolveOwner — without this, every GET/DELETE from the original caller would 404 due to a foreign-owner mismatch.

Smoke (tests/smoke/inference-task.smoke.test.ts):
1. POST /inference-tasks while no worker is bound → row=pending.
2. Bring a registrar online → bindSession drain claims and dispatches → worker complete()s → row=completed → GET returns the assistant body.
3. Stop the worker, enqueue, DELETE → row=cancelled, persisted.

docs/inference-tasks.md (new): full data model, lifecycle diagram, async API reference, CLI examples, RBAC table, GC defaults, and the v5 limitations / v6 roadmap. Cross-linked from virtual-llms.md and agents.md.

Tests + smoke: mcpd 893/893, mcplocal 723/723, cli 437/437, full smoke 146/146 (was 144; +2 new task smoke). Live mcpd verified via manual curl: enqueue → cancel → re-fetch — no crash, owner scoping returns 404 on foreign ids, the GC ticker logs at info when it sweeps.

v5 complete: durable queue (Stage 1) + VirtualLlmService rewire (Stage 2) + async API & RBAC (Stage 3) + CLI/GC/smoke/docs (Stage 4).
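The no-op `.catch` crash fix is worth seeing in miniature. `makeTaskRef` is a hypothetical stand-in for the route's task ref; the pattern is the general one for a promise that may never be awaited:

```typescript
// If nobody awaits `done` and the task later fails, the rejection would hit
// Node's unhandled-rejection policy and can crash the process. A no-op
// .catch marks the rejection as observed, while callers who DO await `done`
// still receive the original error (a promise can have many handlers).
function makeTaskRef() {
  let resolve!: (v: unknown) => void;
  let reject!: (err: Error) => void;
  const done = new Promise((res, rej) => { resolve = res; reject = rej; });
  done.catch(() => {}); // swallow nothing for awaiting callers; just mark handled
  return { done, resolve, reject };
}
```

Sync callers (chat, direct infer) await `ref.done` and see the real rejection; the async route simply never awaits, and the process stays up.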