feat: v5 durable inference task queue #70

Merged
michal merged 4 commits from feat/inference-task-queue into main 2026-04-28 23:33:38 +00:00

Summary

v5 makes inference tasks first-class persisted entities. mcpd's
in-memory request map is gone; every infer call lands as an
InferenceTask row that survives restart, worker disconnect, and
"no worker is online right now". Workers drain queued work when
they bind. Two API surfaces, one queue underneath:

  • Sync (existing path, unchanged externally): POST /llms/<name>/infer
    + POST /agents/<name>/chat. Caller's HTTP request waits, exactly
    like pre-v5; internally the row carries the work.
  • Async (new): POST /api/v1/inference-tasks. Caller gets a
    task id immediately and polls / streams / cancels separately.

Four stages

  • Stage 1 (ed21ad1): InferenceTask schema + repository +
    service with full state machine. EventEmitter-backed in-process
    signal channel for blocked HTTP handlers; the data model is
    multi-instance-ready (pg LISTEN/NOTIFY swap is just the wakeup
    layer in v6). 19 new tests.
  • Stage 2 (7b18bb6): VirtualLlmService rewired through the
    queue. In-memory tasksById map removed; wake tasks keep their
    own small in-memory map. Worker disconnect reverts claimed/
    running rows back to pending; worker bind drains pending. New
    failFast option preserves the v1-v4 fast-fail semantic for
    callers that need it (chat.service pool failover, direct infer).
  • Stage 3 (1dcfdc8): Async API endpoints + RBAC tasks resource.
    POST/GET/DELETE/list + /stream SSE. Owner-scoped at the route
    layer; cross-user requires view:tasks resource grant; foreign-
    owner ids return 404 to prevent enumeration.
  • Stage 4 (<this>): CLI (mcpctl get tasks, chat-llm --async), GC
    ticker (1h pending timeout, 7d terminal retention), live smoke,
    full docs (docs/inference-tasks.md).

Test plan

  • mcpd unit: 893/893 (was 868; +25 across all stages)
  • mcplocal unit: 723/723
  • cli unit: 437/437
  • db schema: +6 inference-task-schema tests
  • full smoke: 146/146 (was 144; +2 inference-task smoke)
  • Live verified end-to-end against the deployed queue-backed mcpd:
    enqueue → bind worker → drain → completed; cancel pending → row
    persists as cancelled; GET on foreign owner returns 404.
michal added 4 commits 2026-04-28 14:25:35 +00:00
The persistence + signaling layer for v5. No integration with the
existing in-flight inference path yet — that's Stage 2. This commit
just lands the durable queue underneath, with a state machine that
mcpd's HTTP handlers, the worker result-POST route, and the GC sweep
will all build on.

Schema (src/db/prisma/schema.prisma + migration):

- New `InferenceTask` model + `InferenceTaskStatus` enum
  (pending|claimed|running|completed|error|cancelled).
- Routing fields stored at enqueue time so a later rename of
  `Llm.poolName` doesn't reroute already-queued work: `poolName`
  (effective pool key), `llmName` (pinned target), `model`, `tier`.
- Worker tracking: `claimedBy` (providerSessionId) + `claimedAt`,
  cleared on revert.
- Bodies as `Json`: requestBody (always set), responseBody (set at
  completion). Streaming chunks are NOT persisted — too expensive at
  delta granularity. The final assembled body lands once per task.
- Lifecycle timestamps: createdAt, claimedAt, streamStartedAt,
  completedAt. Plus ownerId (RBAC + audit) and agentId (null for
  direct chat-llm calls).
- Indexes for the hot paths: (status, poolName) for the dispatcher's
  drain query, claimedBy for the disconnect revert, completedAt for
  the GC retention sweep, owner/agent for the async API listing.
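
A hedged sketch of what the model plausibly looks like, reconstructed
from the bullets above — field optionality, defaults, and index layout
are assumptions, not the actual schema.prisma:

```prisma
enum InferenceTaskStatus {
  pending
  claimed
  running
  completed
  error
  cancelled
}

model InferenceTask {
  id              String              @id @default(uuid())
  status          InferenceTaskStatus @default(pending)
  poolName        String    // effective pool key, frozen at enqueue
  llmName         String?   // pinned target
  model           String?
  tier            String?
  claimedBy       String?   // providerSessionId; cleared on revert
  claimedAt       DateTime?
  requestBody     Json      // always set
  responseBody    Json?     // set once at completion (no per-delta rows)
  createdAt       DateTime  @default(now())
  streamStartedAt DateTime?
  completedAt     DateTime?
  ownerId         String    // RBAC + audit
  agentId         String?   // null for direct chat-llm calls

  @@index([status, poolName]) // dispatcher drain
  @@index([claimedBy])        // disconnect revert
  @@index([completedAt])      // GC retention sweep
  @@index([ownerId, agentId]) // async API listing
}
```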

Repository (src/mcpd/src/repositories/inference-task.repository.ts):

- CRUD + state transitions as conditional CAS via `updateMany`. Two
  workers racing to claim the same row both run the UPDATE; whichever
  the DB serializes first sees affected=1 and gets the row, the loser
  sees 0 and falls through to the next candidate. No application-
  level locking required.
- findPendingForPools(poolNames[]) for the worker drain on bind.
- findHeldBy(claimedBy) for the unbindSession revert.
- findStalePending + findExpiredTerminal for the GC sweep.
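
A minimal in-memory sketch of the conditional-CAS claim described
above. Prisma's `updateMany` returns an affected-row count; this
stand-in mimics the `WHERE status = 'pending'` guard. Names here are
illustrative, not the actual repository API:

```typescript
// Row shape mirrors the schema fields named above (simplified).
type Status = "pending" | "claimed" | "running" | "completed" | "error" | "cancelled";
interface TaskRow { id: string; status: Status; claimedBy: string | null }

const rows = new Map<string, TaskRow>([
  ["t1", { id: "t1", status: "pending", claimedBy: null }],
]);

// Mimics: UPDATE "InferenceTask" SET status='claimed', "claimedBy"=$2
//           WHERE id=$1 AND status='pending'
// The affected-row count decides the winner; no application-level lock.
function tryClaim(id: string, worker: string): boolean {
  const row = rows.get(id);
  if (!row || row.status !== "pending") return false; // affected = 0: lost the race
  row.status = "claimed";
  row.claimedBy = worker;
  return true; // affected = 1: this session owns the row
}

console.log(tryClaim("t1", "session-a")); // first claim wins
console.log(tryClaim("t1", "session-b")); // loser falls through to the next candidate
```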

Service (src/mcpd/src/services/inference-task.service.ts):

- Owns the in-process EventEmitter that wakes blocked HTTP handlers
  when a worker POSTs results. The DB row is the source of truth for
  *state*; the EventEmitter just signals "go re-read row X" so we
  don't have to poll. Single-instance assumption for v5; pg
  LISTEN/NOTIFY is the v6 swap when scaling horizontally — no schema
  change needed, just replace the emitter wakeup.
- waitFor(taskId, timeoutMs) returns { done, chunks }: the terminal
  promise + an async iterator of streaming deltas. Throws on cancel
  (clear message) or error (worker's errorMessage propagates) or
  timeout. Polls the row once at subscribe time so an already-
  terminal task resolves immediately without waiting for an event
  that's never coming.
- gcSweep flips stale pending rows to error (with a clear message
  about the timeout) and deletes terminal rows past retention.
  Defaults: 1h pending timeout, 7d terminal retention; both
  configurable.
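
A stripped-down sketch of the waitFor wakeup pattern: the row is the
source of truth, the emitter only signals "re-read row X". The store,
event names, and simplified return shape (no chunk iterator) are
assumptions for illustration:

```typescript
import { EventEmitter } from "node:events";

type Status = "pending" | "claimed" | "running" | "completed" | "error" | "cancelled";
const store = new Map<string, { status: Status; responseBody?: unknown }>();
const signals = new EventEmitter(); // in-process wakeup channel

function waitFor(taskId: string, timeoutMs: number): Promise<unknown> {
  return new Promise((resolve, reject) => {
    // Settle exactly once: detach the listener and timer, then act.
    const finish = (act: () => void) => {
      clearTimeout(timer);
      signals.off(taskId, check);
      act();
    };
    // Re-read the row; only terminal states settle the promise.
    const check = () => {
      const row = store.get(taskId);
      if (!row) return finish(() => reject(new Error("task not found")));
      if (row.status === "completed") finish(() => resolve(row.responseBody));
      else if (row.status === "error" || row.status === "cancelled")
        finish(() => reject(new Error(`task ${row.status}`)));
    };
    const timer = setTimeout(() => finish(() => reject(new Error("timeout"))), timeoutMs);
    signals.on(taskId, check);
    check(); // poll once at subscribe: an already-terminal task resolves immediately
  });
}

// Simulate the result-route: update the row first, then emit the wakeup.
store.set("t1", { status: "running" });
setTimeout(() => {
  store.set("t1", { status: "completed", responseBody: { text: "hi" } });
  signals.emit("t1");
}, 20);

waitFor("t1", 1000).then((body) => console.log("resolved:", JSON.stringify(body)));
```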

Tests:
- 6 db-level schema tests (defaults, json roundtrip, drain query
  shape, claimedBy filter, GC predicate, agentId nullable).
- 13 service tests covering enqueue, the CAS race on tryClaim,
  complete/fail/cancel, idempotent terminal transitions, revertHeldBy
  on disconnect, and the full waitFor signal lifecycle (immediate
  resolve, wake on event, chunk streaming, cancel/error/timeout
  paths). Plus a gcSweep test with a fixed clock.

mcpd 881/881 (was 868; +13). db pool-schema 14/14, +6 new
inference-task-schema. Pre-existing failures in models.test.ts
(Secret FK fixture issue, also fails on main HEAD) are unrelated.

Stage 2 (next): VirtualLlmService rewires through this — remove the
in-memory pendingTasks map; enqueue creates a row, dispatch picks an
active session, the result-route updates the row + emits the wakeup.
Worker disconnect reverts; worker bind drains.
The in-memory `tasksById` map for inference tasks is gone. Every
inference call lands as a row in `InferenceTask`; the result POST
updates the row + emits a wakeup; the in-flight HTTP handler unblocks
on the wake. mcpd surviving a restart no longer drops in-flight tasks,
and a worker disconnecting mid-task no longer fails the caller — the
row reverts to pending and a sibling worker on the same pool drains it.

Wake tasks (publisher control messages, not inference) keep their own
small in-memory map (`wakeTasks`). They're millisecond-scoped and
don't benefit from durability — a missed wake on restart just means
the next infer fires a fresh wake.

Behavioral changes worth flagging:

- Worker disconnect mid-task: WAS reject ref.done with "publisher
  disconnected"; NOW revert claimed/running rows to pending. Original
  caller's ref.done keeps waiting up to INFER_AWAIT_TIMEOUT_MS (10
  min); whichever worker delivers the result fulfills it.

- bindSession drains pending tasks for the session's pool keys. So
  tasks queued while no worker was up automatically get dispatched
  when one shows up. The drain matches by *effective pool key*
  (poolName ?? name) — tasks queued against vllm-alice get drained
  by any session whose owned Llms share alice's pool.

- New `failFast: true` option on enqueueInferTask (default: false).
  Existing callers that NEED fast-fail get it explicitly:
    - Direct `/api/v1/llms/<name>/infer` route: caller pinned a
      specific Llm and wants 503 immediately if the publisher is
      offline; silently queueing for an unknown future worker would
      be surprising.
    - chat.service pool failover loop: it iterates pool candidates
      and needs each candidate's transport failure to surface fast.
      Without failFast, a downed pool member would absorb the call
      into the queue and the loop would wait 10 min before trying
      the next.
  The async API route (Stage 3) leaves failFast=false — that's the
  whole point of the durable queue path.

- VirtualLlmService now requires an InferenceTaskService dep at
  construction. Older test wirings that didn't pass it get a clear
  "InferenceTaskService not wired" error from enqueueInferTask
  rather than a confusing in-memory stub.
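
The effective-pool-key match behind the drain-on-bind rule can be
sketched as follows (Llm shapes and names are illustrative, not the
actual service types):

```typescript
// Effective pool key = poolName ?? name, per the drain rule above.
interface Llm { name: string; poolName: string | null }
const effectiveKey = (l: Llm) => l.poolName ?? l.name;

// A task was queued against vllm-alice, whose effective key is "alice-pool".
const queuedPoolKey = "alice-pool";

// A later session owns vllm-bob, which shares alice's pool…
const sessionLlms: Llm[] = [{ name: "vllm-bob", poolName: "alice-pool" }];

// …so the bind-time drain matches and this session picks up the pending row.
const drains = sessionLlms.some((l) => effectiveKey(l) === queuedPoolKey);
console.log(drains); // true
```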

Tests:

- 12 existing virtual-llm-service tests updated for the new
  semantics: "rejects when no session" → "queues durably"; "rejects
  when row inactive" → "still queues (pool may have a sibling)";
  "unbindSession rejects in-flight tasks" → "reverts to pending".
  Wake-task probing now uses `wakeTasks` instead of `tasksById`.

- 3 new v5-specific tests: drain-on-bind matches by effective pool
  key (not just name); enqueue without a session keeps the row
  pending; completeTask via the result-route updates the DB and
  emits the wakeup that resolves ref.done.

- chat-service-virtual-llm + llm-infer-route assertions updated to
  expect the new {failFast: true} option arg.

mcpd 884/884 (was 881; +3 v5 cases). mcplocal 723/723. Full smoke
suite 144/144 against the deployed queue-backed mcpd.

Stage 3 (next): expose the durable queue via async API endpoints.
POST /api/v1/inference-tasks (enqueue with failFast=false), GET
/api/v1/inference-tasks/:id (poll), GET /api/v1/inference-tasks/:id/stream
(SSE), DELETE /api/v1/inference-tasks/:id (cancel). New `tasks` RBAC
resource.
Exposes the durable queue (Stage 1+2) as a first-class API so callers
can enqueue work, get a task id immediately, and poll/stream/cancel
without holding open the original HTTP connection.

New endpoints (`/api/v1/inference-tasks`):

  POST   /                  → enqueue, return task id (201 + row).
                              failFast:false — task stays pending if no
                              worker is up; future bindSession drains.
                              Rejects 400 for public Llms (the existing
                              /llms/<name>/infer is the right tool there)
                              and 404 for missing Llms.
  GET    /                  → list owner's tasks. Optional ?status,
                              ?poolName, ?agentId, ?limit query.
                              Owner-scoped at the route layer; cross-
                              user listing requires resource-wide grant.
  GET    /:id               → poll one task. 404 (not 403) on a
                              foreign-owner id to prevent enumeration.
  DELETE /:id               → cancel a non-terminal task. Already-
                              terminal rows return 200 + current shape
                              (no-op). 404 on foreign owner.
  GET    /:id/stream        → SSE feed of `chunk` and `terminal` events.
                              Re-fetches the row at subscribe time so
                              already-completed tasks emit one terminal
                              event and close immediately.

RBAC:

- New `tasks` resource added to RBAC_RESOURCES + the URL→permission
  map in main.ts. Default action mapping: GET=view, POST=create,
  DELETE=delete. The route layer enforces owner-scoping ON TOP of
  the hook (404 on foreign owner) — without this, anyone with
  `view:tasks` could list/peek every user's queued work.
- Singular alias `task` and the multi-word `inference-task` /
  `inference-tasks` all normalize to `tasks` so users can write
  `mcpctl create rbac-binding --resource task --role view ...` or
  any of the variants and have it map correctly.
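
The normalization step amounts to something like this hypothetical
sketch — the real map lives in the RBAC resource layer and these
names are assumptions:

```typescript
// Every task-flavored alias collapses to the canonical plural so URL
// construction and permission lookups see one resource name.
const TASK_ALIASES = new Set(["task", "tasks", "inference-task", "inference-tasks"]);

function normalizeResource(resource: string): string {
  return TASK_ALIASES.has(resource) ? "tasks" : resource;
}

console.log(normalizeResource("inference-task")); // "tasks"
console.log(normalizeResource("agents"));         // "agents" (untouched)
```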

Tests: 9 new route tests covering the wire shapes, owner scoping
(matching/foreign), public-Llm rejection, missing-Llm 404, list
filter, and cancel semantics (pending→cancelled, terminal→no-op).
mcpd 893/893 (was 884, +9). Live smoke: POST against a public Llm
returns the documented 400, POST against missing returns 404, GET
list returns [] cleanly.

Stage 4 (next): CLI surface (`mcpctl get tasks`, `--async` flag on
chat-llm), GC ticker, smoke test (enqueue → connect worker →
drain), docs.
feat(cli+docs+smoke): inference-task CLI + GC ticker + smoke + docs (v5 Stage 4)
Some checks failed
CI/CD / lint (pull_request) Successful in 55s
CI/CD / test (pull_request) Successful in 1m12s
CI/CD / typecheck (pull_request) Successful in 2m46s
CI/CD / smoke (pull_request) Failing after 1m44s
CI/CD / build (pull_request) Failing after 7m0s
CI/CD / publish (pull_request) Has been skipped
7320b50dac
CLI surface for the durable queue:

- `mcpctl get tasks` — table view (ID, STATUS, POOL, LLM, MODEL,
  STREAM, AGE, WORKER). Aliases `task`, `tasks`, `inference-task`,
  `inference-tasks` all normalize to the canonical plural so URL
  construction works uniformly. RESOURCE_ALIASES + completions
  generator updated.
- `mcpctl chat-llm <name> --async -m <msg>` — enqueue and exit. stdout
  is just the task id (pipeable into `xargs mcpctl get task`); stderr
  carries human-readable status. REPL mode is rejected for --async
  (fire-and-forget doesn't make sense without -m).

GC ticker in mcpd: 5-min interval. Pending tasks past the 1h queue
timeout flip to error with a clear message; terminal tasks past 7d
retention get deleted. Both queries are index-backed.
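
The two sweep predicates under the stated defaults, sketched with a
fixed clock (constant names and field shapes are assumptions for
illustration):

```typescript
const HOUR = 3_600_000;
const DAY = 86_400_000;
const PENDING_TIMEOUT_MS = 1 * HOUR;   // stale pending → flipped to error
const TERMINAL_RETENTION_MS = 7 * DAY; // old terminal → deleted

type Status = "pending" | "claimed" | "running" | "completed" | "error" | "cancelled";
const TERMINAL: Status[] = ["completed", "error", "cancelled"];

// Backed by the (status, poolName) / completedAt indexes described in Stage 1.
const isStalePending = (status: Status, createdAt: number, now: number) =>
  status === "pending" && now - createdAt > PENDING_TIMEOUT_MS;

const isExpiredTerminal = (status: Status, completedAt: number | null, now: number) =>
  TERMINAL.includes(status) && completedAt !== null &&
  now - completedAt > TERMINAL_RETENTION_MS;

const now = Date.UTC(2026, 3, 28); // fixed clock, as in the Stage 1 gcSweep test
console.log(isStalePending("pending", now - 2 * HOUR, now));     // true  — flip to error
console.log(isExpiredTerminal("completed", now - 8 * DAY, now)); // true  — delete
console.log(isExpiredTerminal("completed", now - 1 * DAY, now)); // false — retain
```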

Crash fix uncovered by the smoke: when the async route doesn't await
ref.done, a later cancel/error rejected the in-flight Promise as
unhandled and crashed mcpd. The route now attaches a no-op `.catch`
so the legacy `done` semantic still works for sync callers (chat,
direct infer) without taking out the process for async ones. The
EnqueueInferOptions also gained an explicit `ownerId` field so the
async API can stamp the authenticated user on the row instead of
inheriting 'system' from the constructor's resolveOwner — without
this, every GET/DELETE from the original caller would 404 due to
foreign-owner mismatch.
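
The crash mechanics: in Node, a rejected Promise with no handler
raises an unhandledRejection and (by default) kills the process. A
minimal sketch of the detach pattern, with the ref shape assumed:

```typescript
// Stand-in for the task's terminal promise; a later cancel rejects it.
function makeDone(): Promise<string> {
  return new Promise((_resolve, reject) =>
    setTimeout(() => reject(new Error("cancelled")), 10));
}

const ref = { done: makeDone() };

// Async route: return the task id immediately and never await ref.done —
// but attach a no-op catch so a later cancel/error can't crash mcpd.
ref.done.catch(() => {});

// Sync callers still await/chain the same promise and observe the
// rejection normally; the detached handler doesn't swallow it for them.
ref.done.catch((e) => console.log("sync caller observed:", e.message));
```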

Smoke (tests/smoke/inference-task.smoke.test.ts):

  1. POST /inference-tasks while no worker bound → row=pending.
  2. Bring a registrar online → bindSession drain claims and
     dispatches → worker complete()s → row=completed → GET returns
     the assistant body.
  3. Stop worker, enqueue, DELETE → row=cancelled, persisted.

docs/inference-tasks.md (new): full data model, lifecycle diagram,
async API reference, CLI examples, RBAC table, GC defaults, and the
v5 limitations / v6 roadmap. Cross-linked from virtual-llms.md and
agents.md.

Tests + smoke: mcpd 893/893, mcplocal 723/723, cli 437/437, full
smoke 146/146 (was 144, +2 new task smoke). Live mcpd verified via
manual curl: enqueue → cancel → re-fetch — no crash, owner scoping
returns 404 on foreign ids, GC ticker logs at info when it sweeps.

v5 complete: durable queue (Stage 1) + VirtualLlmService rewire
(Stage 2) + async API & RBAC (Stage 3) + CLI/GC/smoke/docs (Stage 4).
michal merged commit 46697f4f63 into main 2026-04-28 23:33:38 +00:00
Reference: michal/mcpctl#70