feat(cli+docs+smoke): inference-task CLI + GC ticker + smoke + docs (v5 Stage 4)

CLI surface for the durable queue:

- `mcpctl get tasks` — table view (ID, STATUS, POOL, LLM, MODEL,
  STREAM, AGE, WORKER). Aliases `task`, `tasks`, `inference-task`,
  `inference-tasks` all normalize to the canonical plural so URL
  construction works uniformly. RESOURCE_ALIASES + completions
  generator updated.
- `mcpctl chat-llm <name> --async -m <msg>` — enqueue and exit. stdout
  is just the task id (pipeable into `xargs mcpctl get task`); stderr
  carries human-readable status. REPL mode is rejected for --async
  (fire-and-forget doesn't make sense without -m).

GC ticker in mcpd: 5-min interval. Pending tasks older than the 1 h
queue timeout flip to error with a clear message; terminal tasks
older than the 7 d retention window are deleted. Both sweep queries
are index-backed.
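
A minimal sketch of the sweep shape, assuming a Prisma model named
`inferenceTask` (the CLI diff below confirms Prisma is in the stack);
the constants, error text, and logging here are illustrative, with
field names mirroring the InferenceTaskRow shape further down:

    import { PrismaClient } from '@prisma/client';

    const prisma = new PrismaClient();

    const SWEEP_INTERVAL_MS = 5 * 60 * 1000;      // 5-min ticker
    const QUEUE_TIMEOUT_MS = 60 * 60 * 1000;      // 1 h pending -> error
    const RETENTION_MS = 7 * 24 * 60 * 60 * 1000; // 7 d terminal -> delete

    setInterval(() => {
      void (async () => {
        const cutoff = (ms: number): Date => new Date(Date.now() - ms);
        // Index-backed: filters on (status, createdAt).
        await prisma.inferenceTask.updateMany({
          where: { status: 'pending', createdAt: { lt: cutoff(QUEUE_TIMEOUT_MS) } },
          data: { status: 'error', errorMessage: 'queue timeout: no worker claimed this task within 1h' },
        });
        // Index-backed: filters on (status, completedAt).
        await prisma.inferenceTask.deleteMany({
          where: {
            status: { in: ['completed', 'error', 'cancelled'] },
            completedAt: { lt: cutoff(RETENTION_MS) },
          },
        });
      })().catch((err: unknown) => { console.error('gc sweep failed', err); });
    }, SWEEP_INTERVAL_MS);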

Crash fix uncovered by the smoke: the async route doesn't await
ref.done, so a later cancel/error rejected the in-flight Promise
with no handler attached and crashed mcpd with an unhandled
rejection. The route now attaches a no-op `.catch` so the legacy
`done` semantic still works for sync callers (chat, direct infer)
without taking out the process for async ones. EnqueueInferOptions
also gained an explicit `ownerId` field so the async API can stamp
the authenticated user on the row instead of inheriting 'system'
from the constructor's resolveOwner; without this, every GET/DELETE
from the original caller would 404 on the foreign-owner mismatch.
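
A self-contained sketch of the failure mode and the guard; the names
(enqueueInfer, InferRef) are illustrative, not mcpd's real API. Only
the no-op `.catch` pattern is the point:

    interface InferRef {
      id: string;
      done: Promise<string>; // resolves with the reply; rejects on cancel/error
    }

    function enqueueInfer(): InferRef {
      let reject!: (err: Error) => void;
      const done = new Promise<string>((_resolve, rej) => {
        reject = rej;
      });
      // Simulate a cancel that lands after the async route has returned.
      setTimeout(() => { reject(new Error('cancelled')); }, 10);
      return { id: 'task-1', done };
    }

    const ref = enqueueInfer();
    // Without the next line, Node exits with ERR_UNHANDLED_REJECTION
    // when the timer fires. With it, sync callers elsewhere can still
    // `await ref.done` and observe the rejection; an extra no-op
    // handler doesn't swallow it for them.
    void ref.done.catch(() => { /* status stays visible on the task row */ });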

Smoke (tests/smoke/inference-task.smoke.test.ts), raw HTTP shape
sketched after the steps:

  1. POST /inference-tasks while no worker bound → row=pending.
  2. Bring a registrar online → bindSession drain claims and
     dispatches → worker complete()s → row=completed → GET returns
     the assistant body.
  3. Stop worker, enqueue, DELETE → row=cancelled, persisted.
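
A sketch of the HTTP surface the smoke drives, using global fetch;
the base URL, LLM name, and inline JSON are assumptions, and the
real test goes through the project's client helpers and worker
fixtures rather than raw fetch:

    // Step 1: enqueue while no worker is bound -> row parks as 'pending'.
    const BASE = 'http://127.0.0.1:8080/api/v1'; // hypothetical address
    const created = (await fetch(`${BASE}/inference-tasks`, {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify({
        llmName: 'my-llm', // hypothetical virtual LLM
        request: { messages: [{ role: 'user', content: 'hi' }] },
        streaming: false,
      }),
    }).then((r) => r.json())) as { id: string; status: string };

    // Step 3: cancel while unclaimed, then re-fetch to confirm the row
    // persisted with the terminal status.
    await fetch(`${BASE}/inference-tasks/${created.id}`, { method: 'DELETE' });
    const row = (await fetch(`${BASE}/inference-tasks/${created.id}`)
      .then((r) => r.json())) as { status: string };
    // row.status === 'cancelled'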

docs/inference-tasks.md (new): full data model, lifecycle diagram,
async API reference, CLI examples, RBAC table, GC defaults, and the
v5 limitations / v6 roadmap. Cross-linked from virtual-llms.md and
agents.md.

Tests + smoke: mcpd 893/893, mcplocal 723/723, cli 437/437, full
smoke 146/146 (was 144, +2 new task smoke). Live mcpd verified via
manual curl: enqueue → cancel → re-fetch — no crash, owner scoping
returns 404 on foreign ids, GC ticker logs at info when it sweeps.

v5 complete: durable queue (Stage 1) + VirtualLlmService rewire
(Stage 2) + async API & RBAC (Stage 3) + CLI/GC/smoke/docs (Stage 4).
Michal
2026-04-28 15:25:09 +01:00
parent 1dcfdc8b05
commit 7320b50dac
14 changed files with 654 additions and 27 deletions

View File

@@ -46,7 +46,12 @@ export function createChatLlmCommand(deps: ChatLlmCommandDeps): Command {
    .option('--temperature <n>', 'Sampling temperature (0..2)', parseFloat)
    .option('--max-tokens <n>', 'Maximum tokens in the assistant reply', parseFloatInt)
    .option('--no-stream', 'Disable SSE streaming (single JSON response)')
    .option('--async', 'Enqueue as a durable inference task and print the task id (does not wait for completion). Virtual LLMs only. Poll with `mcpctl get task <id>`.')
    .action(async (name: string, opts: ChatLlmOpts) => {
      if (opts.async === true) {
        await runAsync(deps, name, opts);
        return;
      }
      await printHeader(deps, name, opts.system);
      if (opts.message !== undefined) {
        await runOneShot(deps, name, opts);
@@ -62,6 +67,38 @@ interface ChatLlmOpts {
  temperature?: number;
  maxTokens?: number;
  stream?: boolean;
  async?: boolean;
}

/**
 * v5: enqueue a durable inference task and print the id. The caller
 * exits without waiting; they can poll with `mcpctl get task <id>` or
 * watch live with the SSE `/stream` endpoint. --async requires either
 * -m or stdin (REPL doesn't make sense in fire-and-forget mode).
 */
async function runAsync(deps: ChatLlmCommandDeps, name: string, opts: ChatLlmOpts): Promise<void> {
  const message = opts.message;
  if (message === undefined || message === '') {
    process.stderr.write('--async requires -m/--message (REPL mode is not supported for async tasks)\n');
    process.exitCode = 1;
    return;
  }
  const messages: Array<{ role: 'system' | 'user'; content: string }> = [];
  if (opts.system !== undefined && opts.system !== '') {
    messages.push({ role: 'system', content: opts.system });
  }
  messages.push({ role: 'user', content: message });
  const request: Record<string, unknown> = { messages };
  if (opts.temperature !== undefined) request.temperature = opts.temperature;
  if (opts.maxTokens !== undefined) request.max_tokens = opts.maxTokens;
  const created = await deps.client.post<{ id: string; status: string; poolName: string; llmName: string }>(
    '/api/v1/inference-tasks',
    { llmName: name, request, streaming: opts.stream !== false },
  );
  // stdout is JUST the id so it's pipeable into other commands.
  // Metadata goes to stderr so `mcpctl chat-llm X --async -m hi | xargs mcpctl describe task` works.
  process.stdout.write(`${created.id}\n`);
  process.stderr.write(`(task ${created.id} enqueued for pool '${created.poolName}', status=${created.status})\n`);
}

interface LlmInfo {

View File

@@ -155,6 +155,54 @@ const llmColumns: Column<LlmRow>[] = [
  { header: 'ID', key: 'id' },
];

/**
 * v5 InferenceTask row shape from `GET /api/v1/inference-tasks`. We
 * don't import the Prisma type to keep the CLI build independent of
 * @prisma/client; the field set here mirrors what the API actually
 * returns.
 */
interface InferenceTaskRow {
  id: string;
  status: 'pending' | 'claimed' | 'running' | 'completed' | 'error' | 'cancelled';
  poolName: string;
  llmName: string;
  model: string;
  tier: string | null;
  claimedBy: string | null;
  streaming: boolean;
  createdAt: string;
  claimedAt: string | null;
  completedAt: string | null;
  errorMessage: string | null;
  ownerId: string;
  agentId: string | null;
}

const inferenceTaskColumns: Column<InferenceTaskRow>[] = [
  { header: 'ID', key: 'id' },
  { header: 'STATUS', key: 'status', width: 10 },
  { header: 'POOL', key: 'poolName', width: 18 },
  { header: 'LLM', key: 'llmName', width: 20 },
  { header: 'MODEL', key: 'model', width: 24 },
  { header: 'STREAM', key: (r) => r.streaming ? 'yes' : '-', width: 7 },
  // AGE is more useful than createdAt at the table level — operators
  // are usually scanning for "what's stuck" rather than absolute time.
  { header: 'AGE', key: (r) => formatAge(r.createdAt), width: 8 },
  { header: 'WORKER', key: (r) => r.claimedBy ?? '-', width: 16 },
];

function formatAge(iso: string): string {
  const ms = Date.now() - new Date(iso).getTime();
  if (!Number.isFinite(ms) || ms < 0) return '-';
  const sec = Math.floor(ms / 1000);
  if (sec < 60) return `${String(sec)}s`;
  const min = Math.floor(sec / 60);
  if (min < 60) return `${String(min)}m`;
  const hr = Math.floor(min / 60);
  if (hr < 48) return `${String(hr)}h`;
  return `${String(Math.floor(hr / 24))}d`;
}

interface AgentRow {
  id: string;
  name: string;
@@ -385,6 +433,8 @@ function getColumnsForResource(resource: string): Column<Record<string, unknown>
      return agentColumns as unknown as Column<Record<string, unknown>>[];
    case 'personalities':
      return personalityColumns as unknown as Column<Record<string, unknown>>[];
    case 'inference-tasks':
      return inferenceTaskColumns as unknown as Column<Record<string, unknown>>[];
    default:
      return [
        { header: 'ID', key: 'id' as keyof Record<string, unknown> },

View File

@@ -42,6 +42,14 @@ export const RESOURCE_ALIASES: Record<string, string> = {
  personalities: 'personalities',
  thread: 'threads',
  threads: 'threads',
  // v5: durable inference task queue. URL prefix is `/api/v1/inference-tasks`
  // (multi-word for clarity); the operator typically wants short
  // forms like `mcpctl get tasks`. All variants normalize to the
  // canonical plural so URL construction works uniformly.
  task: 'inference-tasks',
  tasks: 'inference-tasks',
  'inference-task': 'inference-tasks',
  'inference-tasks': 'inference-tasks',
  all: 'all',
};
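
For illustration, the URL-construction contract the alias table
provides, as a hypothetical helper (the import path and function
name are assumptions, not the CLI's real code):

    import { RESOURCE_ALIASES } from './resource-aliases'; // path is an assumption

    export function resolveResourceUrl(input: string): string {
      const canonical = RESOURCE_ALIASES[input.toLowerCase()];
      if (canonical === undefined) {
        throw new Error(`unknown resource: ${input}`);
      }
      // 'task', 'tasks', and 'inference-task' all land on the same URL.
      return `/api/v1/${canonical}`;
    }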