Compare commits
15 Commits
feat/virtu
...
f5bdeea8e7
| Author | SHA1 | Date | |
|---|---|---|---|
| f5bdeea8e7 | |||
|
|
1998b733b2 | ||
|
|
610808b9e7 | ||
|
|
58bc277242 | ||
|
|
c7b1bd8e2c | ||
|
|
9afd24a3aa | ||
| 9374a2652b | |||
|
|
18245be0c1 | ||
| 45c7737ee1 | |||
|
|
e0cfe0ba4d | ||
|
|
db839afc57 | ||
|
|
af0fabd84f | ||
| 700d1683c2 | |||
|
|
2a44f60785 | ||
| 65b6b265d9 |
@@ -1,3 +1,9 @@
|
||||
# syntax=docker/dockerfile:1.6
|
||||
# `# syntax=...` enables BuildKit's --mount feature on the builder so we can
|
||||
# share the pnpm content-addressed store across image builds. Without it the
|
||||
# next two RUN steps fall back to plain mode and the cache mount is ignored
|
||||
# (build still works, just slower).
|
||||
|
||||
# Stage 1: Build TypeScript
|
||||
FROM node:20-alpine AS builder
|
||||
|
||||
@@ -12,8 +18,12 @@ COPY src/db/package.json src/db/tsconfig.json src/db/
|
||||
COPY src/shared/package.json src/shared/tsconfig.json src/shared/
|
||||
COPY src/web/package.json src/web/tsconfig.json src/web/
|
||||
|
||||
# Install all dependencies
|
||||
RUN pnpm install --frozen-lockfile
|
||||
# Install all dependencies. The cache mount keeps pnpm's CAS store warm
|
||||
# across builds: only newly-changed packages get downloaded; everything
|
||||
# else hardlinks from the cache. Drops install from ~60s to <5s on a
|
||||
# warm cache. `--frozen-lockfile` still guarantees lockfile fidelity.
|
||||
RUN --mount=type=cache,id=pnpm-store-mcpd-builder,target=/root/.local/share/pnpm/store \
|
||||
pnpm install --frozen-lockfile
|
||||
|
||||
# Copy source code
|
||||
COPY src/mcpd/src/ src/mcpd/src/
|
||||
@@ -42,8 +52,11 @@ COPY src/mcpd/package.json src/mcpd/
|
||||
COPY src/db/package.json src/db/
|
||||
COPY src/shared/package.json src/shared/
|
||||
|
||||
# Install all deps (prisma CLI needed at runtime for db push)
|
||||
RUN pnpm install --frozen-lockfile
|
||||
# Install all deps (prisma CLI needed at runtime for db push). Same
|
||||
# cache-mount trick as the builder stage; separate cache id so the two
|
||||
# stages don't compete for the same lock.
|
||||
RUN --mount=type=cache,id=pnpm-store-mcpd-runtime,target=/root/.local/share/pnpm/store \
|
||||
pnpm install --frozen-lockfile
|
||||
|
||||
# Copy prisma schema and generate client
|
||||
COPY src/db/prisma/ src/db/prisma/
|
||||
|
||||
@@ -204,5 +204,9 @@ mcpctl chat reviewer
|
||||
- [virtual-llms.md](./virtual-llms.md) — local LLMs (e.g. `vllm-local`)
|
||||
publishing themselves into `mcpctl get llm` so anyone can chat with
|
||||
them via `mcpctl chat-llm <name>`. Inference is relayed through the
|
||||
publishing mcplocal — mcpd never holds the local URL or key.
|
||||
publishing mcplocal — mcpd never holds the local URL or key. **v3**
|
||||
extends the same publishing model to **virtual agents** declared in
|
||||
mcplocal config — they show up in `mcpctl get agent` with
|
||||
`KIND=virtual / STATUS=active` and become chat-able via
|
||||
`mcpctl chat <name>` like any other agent.
|
||||
- [chat.md](./chat.md) — `mcpctl chat` flow and LiteLLM-style flags.
|
||||
|
||||
@@ -97,11 +97,11 @@ route branches on it server-side.
|
||||
|
||||
## Lifecycle in detail
|
||||
|
||||
| State | What it means |
|
||||
|----------------|-----------------------------------------------------------------------|
|
||||
| `active` | Heartbeat received within the last 90 s and the SSE channel is open. |
|
||||
| State | What it means |
|
||||
|----------------|---------------------------------------------------------------------------------|
|
||||
| `active` | Heartbeat received within the last 90 s and the SSE channel is open. |
|
||||
| `inactive` | Either the SSE closed or the heartbeat watchdog tripped. Inference returns 503. |
|
||||
| `hibernating` | Reserved for v2 (wake-on-demand). v1 never writes this state. |
|
||||
| `hibernating` | Publisher is online but the backend is asleep; the next inference triggers a `wake` task before relaying. |
|
||||
|
||||
Two timers on mcpd run the GC sweep:
|
||||
|
||||
@@ -132,12 +132,154 @@ a finalized `CompletionResult`, not a token stream. Streaming requests
|
||||
therefore arrive at the caller as a single delta + `[DONE]`. Real
|
||||
per-token streaming is a v2 concern.
|
||||
|
||||
## Wake-on-demand (v2)
|
||||
|
||||
A provider whose backend hibernates (a vLLM instance that suspends
|
||||
when idle, an Ollama daemon that exits when nothing's connected, …)
|
||||
can declare a **wake recipe** in mcplocal config. When that provider's
|
||||
`isAvailable()` returns false at registrar startup, the row is
|
||||
published as `status=hibernating`. The next inference request that
|
||||
hits the row triggers the recipe and waits for the backend to come up
|
||||
before relaying.
|
||||
|
||||
Two recipe types:
|
||||
|
||||
```jsonc
|
||||
// HTTP — POST to a "wake controller" that starts the backend out of band.
|
||||
{
|
||||
"name": "vllm-local",
|
||||
"type": "openai",
|
||||
"model": "...",
|
||||
"publish": true,
|
||||
"wake": {
|
||||
"type": "http",
|
||||
"url": "http://10.0.0.50:9090/wake/vllm",
|
||||
"method": "POST",
|
||||
"headers": { "Authorization": "Bearer ..." },
|
||||
"maxWaitSeconds": 60
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```jsonc
|
||||
// command — spawn a local process (systemd, wakeonlan, custom script).
|
||||
{
|
||||
"name": "vllm-local",
|
||||
"type": "openai",
|
||||
"model": "...",
|
||||
"publish": true,
|
||||
"wake": {
|
||||
"type": "command",
|
||||
"command": "/usr/local/bin/start-vllm",
|
||||
"args": ["--profile", "qwen3"],
|
||||
"maxWaitSeconds": 120
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
How a request flows when the row is `hibernating`:
|
||||
|
||||
```
|
||||
client → mcpd POST /api/v1/llms/<name>/infer
|
||||
mcpd: status === hibernating → push wake task on SSE
|
||||
mcplocal: receive wake task → run recipe → poll isAvailable()
|
||||
→ heartbeat each tick → POST { ok: true } back
|
||||
mcpd: flip row → active, push the original infer task
|
||||
mcplocal: run inference → POST result back
|
||||
mcpd → client (forwards the inference result)
|
||||
```
|
||||
|
||||
Concurrent infers for the same hibernating Llm share a single wake
|
||||
task — only the first request triggers the recipe; later ones await
|
||||
the same in-flight wake promise. After the wake settles, every queued
|
||||
infer dispatches in order.
|
||||
|
||||
If the recipe fails (HTTP non-2xx, command exits non-zero, or the
|
||||
provider doesn't come up within `maxWaitSeconds`), every queued infer
|
||||
is rejected with a clear error and the row stays `hibernating` —
|
||||
the next request gets a fresh wake attempt.
|
||||
|
||||
## Virtual agents (v3)
|
||||
|
||||
Virtual agents extend the same publishing model to **agents** — named
|
||||
LLM personas with their own system prompt and sampling defaults. mcplocal
|
||||
declares them in its config alongside its providers, and the existing
|
||||
`_provider-register` endpoint atomically publishes both Llms and Agents
|
||||
in one round-trip. They show up under `mcpctl get agent` next to
|
||||
manually-created public agents and become chat-able via
|
||||
`mcpctl chat <agent>` — no special command.
|
||||
|
||||
### Declaring a virtual agent in mcplocal config
|
||||
|
||||
```jsonc
|
||||
// ~/.mcpctl/config.json
|
||||
{
|
||||
"llm": {
|
||||
"providers": [
|
||||
{ "name": "vllm-local", "type": "vllm", "model": "Qwen/Qwen2.5-7B-Instruct-AWQ", "publish": true }
|
||||
]
|
||||
},
|
||||
"agents": [
|
||||
{
|
||||
"name": "local-coder",
|
||||
"llm": "vllm-local",
|
||||
"description": "Local coding assistant on the workstation GPU",
|
||||
"systemPrompt": "You are a senior engineer. Be terse.",
|
||||
"defaultParams": { "temperature": 0.2 }
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
`llm` references a published provider's name from the same config. Agents
|
||||
pinned to a name that isn't being published are still forwarded to mcpd —
|
||||
the server validates `llmName` and 404s with a clear message if it's
|
||||
genuinely missing, which lets you point at a *public* Llm if you want.
|
||||
|
||||
### Lifecycle
|
||||
|
||||
Same shape as virtual Llms — 30 s heartbeat from mcplocal, 90 s
|
||||
heartbeat-stale → status flips to `inactive`, 4 h inactive → row deleted
|
||||
by mcpd's GC sweep. Heartbeats cover both Llms and Agents owned by the
|
||||
session.
|
||||
|
||||
The GC orders agent deletes **before** their pinned virtual Llm so the
|
||||
`Agent.llmId onDelete: Restrict` FK doesn't block the sweep.
|
||||
|
||||
### Listing
|
||||
|
||||
```sh
|
||||
$ mcpctl get agents
|
||||
NAME KIND STATUS LLM PROJECT DESCRIPTION
|
||||
local-coder virtual active vllm-local - Local coding assistant on…
|
||||
reviewer public active qwen3-thinking mcpctl-development I review what you're shipping…
|
||||
```
|
||||
|
||||
The `KIND` and `STATUS` columns are the v3 additions. Round-tripping
|
||||
through `mcpctl get agent X -o yaml | mcpctl apply -f -` strips those
|
||||
runtime fields cleanly so a virtual agent can be re-declared as a public
|
||||
one (or vice versa) without manual editing.
|
||||
|
||||
### Chatting
|
||||
|
||||
```sh
|
||||
$ mcpctl chat local-coder
|
||||
> hello?
|
||||
… streams through mcpd → SSE → mcplocal's vllm-local provider …
|
||||
```
|
||||
|
||||
Same command as for public agents. Works because chat.service has a
|
||||
`kind=virtual` branch that hands off to `VirtualLlmService.enqueueInferTask`
|
||||
when the agent's pinned Llm is virtual.
|
||||
|
||||
### Cluster-wide name uniqueness
|
||||
|
||||
`Agent.name` is unique cluster-wide. Two mcplocals trying to publish the
|
||||
same agent name collide on the second register with HTTP 409. Per-publisher
|
||||
namespacing is a v4+ concern — same constraint as virtual Llms in v1.
|
||||
|
||||
## Roadmap (later stages)
|
||||
|
||||
- **v2 — Wake-on-demand**: Secret-stored "wake recipe" so mcpd can ask
|
||||
mcplocal to start a hibernating backend before sending inference.
|
||||
- **v3 — Virtual agents**: mcplocal publishes its local agent configs
|
||||
(model + system prompt + sampling defaults) into mcpd's `Agent` table.
|
||||
- **v4 — LB pool by model**: agents can target a model name instead of
|
||||
a specific Llm; mcpd picks the healthiest pool member per request.
|
||||
- **v5 — Task queue**: persisted requests for hibernating/saturated
|
||||
@@ -146,18 +288,23 @@ per-token streaming is a v2 concern.
|
||||
## API surface (v1)
|
||||
|
||||
```
|
||||
POST /api/v1/llms/_provider-register → returns { providerSessionId, llms[] }
|
||||
POST /api/v1/llms/_provider-register → returns { providerSessionId, llms[], agents[] }
|
||||
v3: body accepts an optional `agents[]` array
|
||||
alongside `providers[]`. Atomic publish; older
|
||||
clients (providers-only) keep working.
|
||||
GET /api/v1/llms/_provider-stream → SSE channel; require x-mcpctl-provider-session header
|
||||
POST /api/v1/llms/_provider-heartbeat → { providerSessionId }
|
||||
POST /api/v1/llms/_provider-heartbeat → { providerSessionId } — bumps both Llms and Agents
|
||||
owned by the session
|
||||
POST /api/v1/llms/_provider-task/:id/result
|
||||
→ one of:
|
||||
{ error: "msg" }
|
||||
{ chunk: { data, done? } }
|
||||
{ status, body }
|
||||
|
||||
GET /api/v1/llms → list (now includes kind, status, lastHeartbeatAt, inactiveSince)
|
||||
GET /api/v1/llms → list (includes kind, status, lastHeartbeatAt, inactiveSince)
|
||||
POST /api/v1/llms/<virtual>/infer → routes through the SSE relay
|
||||
DELETE /api/v1/llms/<virtual> → delete unconditionally (also runs GC's job)
|
||||
GET /api/v1/agents → list (v3: includes kind, status, lastHeartbeatAt, inactiveSince)
|
||||
```
|
||||
|
||||
RBAC piggybacks on `view/edit/create:llms` — no new resource. Publishing
|
||||
|
||||
@@ -151,7 +151,10 @@ async function runOneShot(
|
||||
const sec = Math.max(0.05, (Date.now() - startMs) / 1000);
|
||||
const words = (res.assistant.match(/\S+/g) ?? []).length;
|
||||
process.stdout.write(`${res.assistant}\n`);
|
||||
process.stderr.write(styleStats(`(${String(words)}w · ${(words / sec).toFixed(1)} w/s · ${sec.toFixed(1)}s)`) + ` thread:${res.threadId}\n`);
|
||||
// `thread: <id>` — single space after the colon, matching the streaming
|
||||
// path (line 160 below) so any tooling/regex that watches one form picks
|
||||
// up the other too.
|
||||
process.stderr.write(styleStats(`(${String(words)}w · ${(words / sec).toFixed(1)} w/s · ${sec.toFixed(1)}s)`) + ` thread: ${res.threadId}\n`);
|
||||
return;
|
||||
}
|
||||
const bar = installStatusBar();
|
||||
|
||||
@@ -155,10 +155,17 @@ interface AgentRow {
|
||||
description: string;
|
||||
llm: { id: string; name: string };
|
||||
project: { id: string; name: string } | null;
|
||||
// v3: lifecycle fields. Public agents have kind=public/status=active and
|
||||
// these never change — virtuals get them set/updated by mcpd's
|
||||
// AgentService as the publishing mcplocal heartbeats and disconnects.
|
||||
kind?: 'public' | 'virtual';
|
||||
status?: 'active' | 'inactive';
|
||||
}
|
||||
|
||||
const agentColumns: Column<AgentRow>[] = [
|
||||
{ header: 'NAME', key: 'name' },
|
||||
{ header: 'KIND', key: (r) => r.kind ?? 'public', width: 8 },
|
||||
{ header: 'STATUS', key: (r) => r.status ?? 'active', width: 10 },
|
||||
{ header: 'LLM', key: (r) => r.llm.name, width: 24 },
|
||||
{ header: 'PROJECT', key: (r) => r.project?.name ?? '-', width: 20 },
|
||||
{ header: 'DESCRIPTION', key: (r) => truncate(r.description, 50) || '-', width: 50 },
|
||||
@@ -408,6 +415,28 @@ function toApplyDocs(resource: string, items: unknown[]): Array<{ kind: string }
|
||||
const kind = RESOURCE_KIND[resource] ?? resource;
|
||||
return items.map((item) => {
|
||||
const cleaned = stripInternalFields(item as Record<string, unknown>);
|
||||
// Llm-specific: the virtual-provider lifecycle fields collide with the
|
||||
// apply-doc `kind` envelope (the schema uses `kind: public|virtual`)
|
||||
// and aren't apply-able anyway — they're derived runtime state managed
|
||||
// by VirtualLlmService. Drop them so YAML round-trips stay clean.
|
||||
if (resource === 'llms') {
|
||||
delete cleaned['kind'];
|
||||
delete cleaned['status'];
|
||||
delete cleaned['lastHeartbeatAt'];
|
||||
delete cleaned['inactiveSince'];
|
||||
delete cleaned['providerSessionId'];
|
||||
}
|
||||
// Agent-specific: same shape as Llm — Agent gained kind/status/etc. in
|
||||
// v3 Stage 1 (virtual agent lifecycle) and the schema-`kind` field
|
||||
// shadows the apply-envelope `kind: agent`. Strip the same set so
|
||||
// `get agent X -o yaml | apply -f -` round-trips without diff.
|
||||
if (resource === 'agents') {
|
||||
delete cleaned['kind'];
|
||||
delete cleaned['status'];
|
||||
delete cleaned['lastHeartbeatAt'];
|
||||
delete cleaned['inactiveSince'];
|
||||
delete cleaned['providerSessionId'];
|
||||
}
|
||||
return { kind, ...cleaned };
|
||||
});
|
||||
}
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
-- Mirror Llm's virtual-provider lifecycle on Agent. Reuses the
|
||||
-- existing LlmKind / LlmStatus enums so we don't double-define them.
|
||||
-- Existing rows backfill with kind='public' / status='active' so
|
||||
-- nothing changes for manually-created agents.
|
||||
|
||||
ALTER TABLE "Agent"
|
||||
ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public',
|
||||
ADD COLUMN "providerSessionId" TEXT,
|
||||
ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3),
|
||||
ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active',
|
||||
ADD COLUMN "inactiveSince" TIMESTAMP(3);
|
||||
|
||||
CREATE INDEX "Agent_kind_status_idx" ON "Agent"("kind", "status");
|
||||
CREATE INDEX "Agent_providerSessionId_idx" ON "Agent"("providerSessionId");
|
||||
@@ -469,20 +469,26 @@ model BackupPending {
|
||||
// Per-call LiteLLM-style overrides stack on top of `defaultParams`.
|
||||
|
||||
model Agent {
|
||||
id String @id @default(cuid())
|
||||
name String @unique
|
||||
description String @default("") // shown in MCP tools/list
|
||||
systemPrompt String @default("") @db.Text // agent persona
|
||||
id String @id @default(cuid())
|
||||
name String @unique
|
||||
description String @default("") // shown in MCP tools/list
|
||||
systemPrompt String @default("") @db.Text // agent persona
|
||||
llmId String
|
||||
projectId String?
|
||||
defaultPersonalityId String? // applied at chat time when no --personality flag
|
||||
proxyModelName String? // optional informational override
|
||||
defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ...
|
||||
extras Json @default("{}") // future LoRA / tool-allowlist
|
||||
defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ...
|
||||
extras Json @default("{}") // future LoRA / tool-allowlist
|
||||
// ── Virtual-agent lifecycle (NULL/default for kind=public, mirrors Llm) ──
|
||||
kind LlmKind @default(public)
|
||||
providerSessionId String? // mcplocal session that owns this row when virtual
|
||||
lastHeartbeatAt DateTime?
|
||||
status LlmStatus @default(active)
|
||||
inactiveSince DateTime?
|
||||
ownerId String
|
||||
version Int @default(1)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
version Int @default(1)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
llm Llm @relation(fields: [llmId], references: [id], onDelete: Restrict)
|
||||
project Project? @relation(fields: [projectId], references: [id], onDelete: SetNull)
|
||||
@@ -497,6 +503,8 @@ model Agent {
|
||||
@@index([projectId])
|
||||
@@index([ownerId])
|
||||
@@index([defaultPersonalityId])
|
||||
@@index([kind, status])
|
||||
@@index([providerSessionId])
|
||||
}
|
||||
|
||||
// ── Personalities (named overlay bundles of prompts on top of an Agent) ──
|
||||
|
||||
@@ -317,6 +317,78 @@ describe('agent / chat-thread / chat-message schema', () => {
|
||||
expect(reloaded?.defaultPersonalityId).toBeNull();
|
||||
});
|
||||
|
||||
// ── v3: Agent.kind virtual + lifecycle fields ──
|
||||
|
||||
it('defaults a freshly inserted Agent to kind=public, status=active', async () => {
|
||||
const user = await makeUser();
|
||||
const llm = await makeLlm('llm-default-kind');
|
||||
const agent = await makeAgent({ name: 'fresh', llmId: llm.id, ownerId: user.id });
|
||||
expect(agent.kind).toBe('public');
|
||||
expect(agent.status).toBe('active');
|
||||
expect(agent.providerSessionId).toBeNull();
|
||||
expect(agent.lastHeartbeatAt).toBeNull();
|
||||
expect(agent.inactiveSince).toBeNull();
|
||||
});
|
||||
|
||||
it('persists kind=virtual + lifecycle fields together', async () => {
|
||||
const user = await makeUser();
|
||||
const llm = await makeLlm('llm-pub-virtual');
|
||||
const now = new Date();
|
||||
const agent = await prisma.agent.create({
|
||||
data: {
|
||||
name: 'local-coder',
|
||||
llmId: llm.id,
|
||||
ownerId: user.id,
|
||||
kind: 'virtual',
|
||||
providerSessionId: 'sess-abc',
|
||||
lastHeartbeatAt: now,
|
||||
status: 'active',
|
||||
},
|
||||
});
|
||||
expect(agent.kind).toBe('virtual');
|
||||
expect(agent.providerSessionId).toBe('sess-abc');
|
||||
expect(agent.lastHeartbeatAt?.getTime()).toBe(now.getTime());
|
||||
});
|
||||
|
||||
it('finds virtual agents by (kind, status) cheaply (GC sweep query)', async () => {
|
||||
const user = await makeUser();
|
||||
const llm = await makeLlm('llm-gc-agent');
|
||||
await prisma.agent.create({ data: { name: 'pub-1', llmId: llm.id, ownerId: user.id } });
|
||||
await prisma.agent.create({
|
||||
data: { name: 'v-active', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's1' },
|
||||
});
|
||||
await prisma.agent.create({
|
||||
data: { name: 'v-inactive', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() },
|
||||
});
|
||||
|
||||
const stale = await prisma.agent.findMany({
|
||||
where: { kind: 'virtual', status: 'inactive' },
|
||||
select: { name: true },
|
||||
});
|
||||
expect(stale.map((a) => a.name)).toEqual(['v-inactive']);
|
||||
});
|
||||
|
||||
it('finds agents by providerSessionId (used on mcplocal disconnect cascade)', async () => {
|
||||
const user = await makeUser();
|
||||
const llm = await makeLlm('llm-sess-cascade');
|
||||
await prisma.agent.create({
|
||||
data: { name: 'a', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
|
||||
});
|
||||
await prisma.agent.create({
|
||||
data: { name: 'b', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
|
||||
});
|
||||
await prisma.agent.create({
|
||||
data: { name: 'c', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'other' },
|
||||
});
|
||||
|
||||
const owned = await prisma.agent.findMany({
|
||||
where: { providerSessionId: 'shared' },
|
||||
select: { name: true },
|
||||
orderBy: { name: 'asc' },
|
||||
});
|
||||
expect(owned.map((a) => a.name)).toEqual(['a', 'b']);
|
||||
});
|
||||
|
||||
it('binds the same prompt to multiple personalities of an agent', async () => {
|
||||
const user = await makeUser();
|
||||
const llm = await makeLlm('llm-shared-prompt');
|
||||
|
||||
@@ -435,10 +435,8 @@ async function main(): Promise<void> {
|
||||
adapters: llmAdapters,
|
||||
log: { warn: (msg) => app.log.warn(msg) },
|
||||
});
|
||||
// Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker
|
||||
// is started below after `app.listen` so it doesn't fire before the
|
||||
// server is accepting traffic.
|
||||
const virtualLlmService = new VirtualLlmService(llmRepo);
|
||||
// VirtualLlmService is constructed lower down (after AgentService) so
|
||||
// it can wire the agent-cascade callbacks introduced in v3 Stage 2.
|
||||
// AgentService + ChatService get fully wired below once projectService and
|
||||
// mcpProxyService are constructed (ChatService needs them via the
|
||||
// ChatToolDispatcher bridge).
|
||||
@@ -465,6 +463,10 @@ async function main(): Promise<void> {
|
||||
const personalityRepo = new PersonalityRepository(prisma);
|
||||
const personalityService = new PersonalityService(personalityRepo, agentRepo, promptRepo);
|
||||
const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo);
|
||||
// Virtual-provider state machine (kind=virtual rows for both Llms and
|
||||
// Agents). v3 wires AgentService for heartbeat/disconnect/GC cascade.
|
||||
// The 60-s GC ticker is started below after `app.listen`.
|
||||
const virtualLlmService = new VirtualLlmService(llmRepo, agentService);
|
||||
// ChatService needs the proxy + project repo via the ChatToolDispatcher
|
||||
// bridge. The dispatcher's logger references `app.log`, which is not
|
||||
// constructed until further down — `chatService` itself is built right
|
||||
@@ -607,6 +609,7 @@ async function main(): Promise<void> {
|
||||
promptRepo,
|
||||
chatToolDispatcher,
|
||||
personalityRepo,
|
||||
virtualLlmService,
|
||||
);
|
||||
registerAgentChatRoutes(app, chatService);
|
||||
registerLlmInferRoutes(app, {
|
||||
@@ -627,7 +630,7 @@ async function main(): Promise<void> {
|
||||
});
|
||||
},
|
||||
});
|
||||
registerVirtualLlmRoutes(app, virtualLlmService);
|
||||
registerVirtualLlmRoutes(app, virtualLlmService, agentService);
|
||||
registerInstanceRoutes(app, instanceService);
|
||||
registerProjectRoutes(app, projectService);
|
||||
registerAuditLogRoutes(app, auditLogService);
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { PrismaClient, Agent, Prisma } from '@prisma/client';
|
||||
import type { PrismaClient, Agent, Prisma, LlmKind, LlmStatus } from '@prisma/client';
|
||||
|
||||
export interface CreateAgentRepoInput {
|
||||
name: string;
|
||||
@@ -11,6 +11,12 @@ export interface CreateAgentRepoInput {
|
||||
defaultParams?: Record<string, unknown>;
|
||||
extras?: Record<string, unknown>;
|
||||
ownerId: string;
|
||||
// Virtual-agent lifecycle (omit for kind=public).
|
||||
kind?: LlmKind;
|
||||
providerSessionId?: string | null;
|
||||
status?: LlmStatus;
|
||||
lastHeartbeatAt?: Date | null;
|
||||
inactiveSince?: Date | null;
|
||||
}
|
||||
|
||||
export interface UpdateAgentRepoInput {
|
||||
@@ -22,6 +28,13 @@ export interface UpdateAgentRepoInput {
|
||||
proxyModelName?: string | null;
|
||||
defaultParams?: Record<string, unknown>;
|
||||
extras?: Record<string, unknown>;
|
||||
// Virtual-agent lifecycle. AgentService is the only public writer; the
|
||||
// VirtualAgentService methods (Stage 2) bypass the public CRUD path.
|
||||
kind?: LlmKind;
|
||||
providerSessionId?: string | null;
|
||||
status?: LlmStatus;
|
||||
lastHeartbeatAt?: Date | null;
|
||||
inactiveSince?: Date | null;
|
||||
}
|
||||
|
||||
export interface IAgentRepository {
|
||||
@@ -32,6 +45,11 @@ export interface IAgentRepository {
|
||||
create(data: CreateAgentRepoInput): Promise<Agent>;
|
||||
update(id: string, data: UpdateAgentRepoInput): Promise<Agent>;
|
||||
delete(id: string): Promise<void>;
|
||||
// Virtual-agent lifecycle helpers.
|
||||
findBySessionId(sessionId: string): Promise<Agent[]>;
|
||||
findByLlmId(llmId: string): Promise<Agent[]>;
|
||||
findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]>;
|
||||
findExpiredInactives(deletionCutoff: Date): Promise<Agent[]>;
|
||||
}
|
||||
|
||||
export class AgentRepository implements IAgentRepository {
|
||||
@@ -69,6 +87,11 @@ export class AgentRepository implements IAgentRepository {
|
||||
defaultParams: (data.defaultParams ?? {}) as Prisma.InputJsonValue,
|
||||
extras: (data.extras ?? {}) as Prisma.InputJsonValue,
|
||||
ownerId: data.ownerId,
|
||||
...(data.kind !== undefined ? { kind: data.kind } : {}),
|
||||
...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
|
||||
...(data.status !== undefined ? { status: data.status } : {}),
|
||||
...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
|
||||
...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -99,6 +122,11 @@ export class AgentRepository implements IAgentRepository {
|
||||
if (data.extras !== undefined) {
|
||||
updateData.extras = data.extras as Prisma.InputJsonValue;
|
||||
}
|
||||
if (data.kind !== undefined) updateData.kind = data.kind;
|
||||
if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId;
|
||||
if (data.status !== undefined) updateData.status = data.status;
|
||||
if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt;
|
||||
if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince;
|
||||
// Bump optimistic version on every update.
|
||||
updateData.version = { increment: 1 };
|
||||
return this.prisma.agent.update({ where: { id }, data: updateData });
|
||||
@@ -107,4 +135,40 @@ export class AgentRepository implements IAgentRepository {
|
||||
async delete(id: string): Promise<void> {
|
||||
await this.prisma.agent.delete({ where: { id } });
|
||||
}
|
||||
|
||||
// ── Virtual-agent lifecycle queries ──
|
||||
|
||||
async findBySessionId(sessionId: string): Promise<Agent[]> {
|
||||
return this.prisma.agent.findMany({
|
||||
where: { providerSessionId: sessionId },
|
||||
orderBy: { name: 'asc' },
|
||||
});
|
||||
}
|
||||
|
||||
async findByLlmId(llmId: string): Promise<Agent[]> {
|
||||
return this.prisma.agent.findMany({
|
||||
where: { llmId },
|
||||
orderBy: { name: 'asc' },
|
||||
});
|
||||
}
|
||||
|
||||
async findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]> {
|
||||
return this.prisma.agent.findMany({
|
||||
where: {
|
||||
kind: 'virtual',
|
||||
status: 'active',
|
||||
lastHeartbeatAt: { lt: heartbeatCutoff },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async findExpiredInactives(deletionCutoff: Date): Promise<Agent[]> {
|
||||
return this.prisma.agent.findMany({
|
||||
where: {
|
||||
kind: 'virtual',
|
||||
status: 'inactive',
|
||||
inactiveSince: { lt: deletionCutoff },
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
*/
|
||||
import type { FastifyInstance, FastifyReply } from 'fastify';
|
||||
import type { VirtualLlmService, VirtualSessionHandle, VirtualTaskFrame } from '../services/virtual-llm.service.js';
|
||||
import type { AgentService, VirtualAgentInput } from '../services/agent.service.js';
|
||||
|
||||
const SSE_PING_MS = 20_000;
|
||||
const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session';
|
||||
@@ -24,8 +25,15 @@ const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session';
|
||||
export function registerVirtualLlmRoutes(
|
||||
app: FastifyInstance,
|
||||
service: VirtualLlmService,
|
||||
/**
|
||||
* Optional. v3 wires AgentService here so the register endpoint can
|
||||
* also accept an `agents` array alongside `providers` and atomic-publish
|
||||
* both. Absent (older test wirings): the route still works for Llm-only
|
||||
* publishers, agents in the payload are ignored with a warning.
|
||||
*/
|
||||
agentService?: AgentService,
|
||||
): void {
|
||||
app.post<{ Body: { providerSessionId?: string; providers?: unknown[] } }>(
|
||||
app.post<{ Body: { providerSessionId?: string; providers?: unknown[]; agents?: unknown[] } }>(
|
||||
'/api/v1/llms/_provider-register',
|
||||
async (request, reply) => {
|
||||
const body = (request.body ?? {});
|
||||
@@ -34,14 +42,29 @@ export function registerVirtualLlmRoutes(
|
||||
reply.code(400);
|
||||
return { error: '`providers` array is required and must be non-empty' };
|
||||
}
|
||||
const agentsInput = Array.isArray(body.agents) ? body.agents : null;
|
||||
|
||||
try {
|
||||
const result = await service.register({
|
||||
providerSessionId: body.providerSessionId ?? null,
|
||||
providers: providers.map(coerceProviderInput),
|
||||
});
|
||||
// v3: atomically publish virtual agents tied to the same session.
|
||||
// If the caller didn't include an agents array, skip silently.
|
||||
let agents: unknown[] = [];
|
||||
if (agentsInput !== null && agentsInput.length > 0) {
|
||||
if (agentService === undefined) {
|
||||
app.log.warn('virtual-llm register received `agents` but AgentService is not wired');
|
||||
} else {
|
||||
agents = await agentService.registerVirtualAgents(
|
||||
result.providerSessionId,
|
||||
agentsInput.map(coerceAgentInput),
|
||||
request.userId ?? 'system',
|
||||
);
|
||||
}
|
||||
}
|
||||
reply.code(201);
|
||||
return result;
|
||||
return { ...result, agents };
|
||||
} catch (err) {
|
||||
const status = (err as { statusCode?: number }).statusCode ?? 500;
|
||||
reply.code(status);
|
||||
@@ -142,6 +165,33 @@ export function registerVirtualLlmRoutes(
|
||||
);
|
||||
}
|
||||
|
||||
/** Narrow an unknown agents array element into the service's input shape (v3). */
|
||||
function coerceAgentInput(raw: unknown): VirtualAgentInput {
|
||||
if (raw === null || typeof raw !== 'object') {
|
||||
throw Object.assign(new Error('agent entry must be an object'), { statusCode: 400 });
|
||||
}
|
||||
const o = raw as Record<string, unknown>;
|
||||
const name = o['name'];
|
||||
const llmName = o['llmName'];
|
||||
if (typeof name !== 'string' || typeof llmName !== 'string') {
|
||||
throw Object.assign(
|
||||
new Error('agent entry requires string `name` and `llmName`'),
|
||||
{ statusCode: 400 },
|
||||
);
|
||||
}
|
||||
const out: VirtualAgentInput = { name, llmName };
|
||||
if (typeof o['description'] === 'string') out.description = o['description'];
|
||||
if (typeof o['systemPrompt'] === 'string') out.systemPrompt = o['systemPrompt'];
|
||||
if (typeof o['project'] === 'string') out.project = o['project'];
|
||||
if (o['defaultParams'] !== null && typeof o['defaultParams'] === 'object') {
|
||||
out.defaultParams = o['defaultParams'] as Record<string, unknown>;
|
||||
}
|
||||
if (o['extras'] !== null && typeof o['extras'] === 'object') {
|
||||
out.extras = o['extras'] as Record<string, unknown>;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/** Narrow an unknown providers array element into the service's input shape. */
|
||||
function coerceProviderInput(raw: unknown): {
|
||||
name: string;
|
||||
@@ -150,6 +200,7 @@ function coerceProviderInput(raw: unknown): {
|
||||
tier?: string;
|
||||
description?: string;
|
||||
extraConfig?: Record<string, unknown>;
|
||||
initialStatus?: 'active' | 'hibernating';
|
||||
} {
|
||||
if (raw === null || typeof raw !== 'object') {
|
||||
throw Object.assign(new Error('provider entry must be an object'), { statusCode: 400 });
|
||||
@@ -170,5 +221,11 @@ function coerceProviderInput(raw: unknown): {
|
||||
if (o['extraConfig'] !== null && typeof o['extraConfig'] === 'object') {
|
||||
out.extraConfig = o['extraConfig'] as Record<string, unknown>;
|
||||
}
|
||||
// Only accept the two values v2 actually defines. Anything else falls
|
||||
// through to the service default (active) — matches v1 publishers that
|
||||
// don't know about this field.
|
||||
if (o['initialStatus'] === 'active' || o['initialStatus'] === 'hibernating') {
|
||||
out.initialStatus = o['initialStatus'];
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
@@ -33,12 +33,28 @@ export interface AgentView {
|
||||
proxyModelName: string | null;
|
||||
defaultParams: AgentChatParams;
|
||||
extras: Record<string, unknown>;
|
||||
// Virtual-agent lifecycle (defaults make public agents look like "active").
|
||||
kind: 'public' | 'virtual';
|
||||
status: 'active' | 'inactive' | 'hibernating';
|
||||
lastHeartbeatAt: Date | null;
|
||||
inactiveSince: Date | null;
|
||||
ownerId: string;
|
||||
version: number;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
}
|
||||
|
||||
/** Input shape mcplocal sends per virtual agent on register. */
|
||||
export interface VirtualAgentInput {
|
||||
name: string;
|
||||
llmName: string;
|
||||
description?: string;
|
||||
systemPrompt?: string;
|
||||
project?: string;
|
||||
defaultParams?: Record<string, unknown>;
|
||||
extras?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export class AgentService {
|
||||
constructor(
|
||||
private readonly repo: IAgentRepository,
|
||||
@@ -179,10 +195,162 @@ export class AgentService {
|
||||
proxyModelName: row.proxyModelName,
|
||||
defaultParams: row.defaultParams as AgentChatParams,
|
||||
extras: row.extras as Record<string, unknown>,
|
||||
kind: row.kind,
|
||||
status: row.status,
|
||||
lastHeartbeatAt: row.lastHeartbeatAt,
|
||||
inactiveSince: row.inactiveSince,
|
||||
ownerId: row.ownerId,
|
||||
version: row.version,
|
||||
createdAt: row.createdAt,
|
||||
updatedAt: row.updatedAt,
|
||||
};
|
||||
}
|
||||
|
||||
// ── Virtual-agent lifecycle (v3) ──
|
||||
|
||||
/**
|
||||
* Sticky upsert of virtual agents owned by a publishing mcplocal session.
|
||||
* Mirrors VirtualLlmService.register's semantics:
|
||||
* - New agents → insert with kind=virtual / status=active.
|
||||
* - Existing virtual agents owned by the same session → update + reactivate.
|
||||
* - Existing virtual agents owned by a different session, but currently
|
||||
* inactive → adopt (sticky reconnect after a session lapse).
|
||||
* - Existing public agents OR foreign-active virtuals → 409 Conflict.
|
||||
* - Pinned LLM must already exist (publisher posts Llms first in the same
|
||||
* register payload).
|
||||
*/
|
||||
async registerVirtualAgents(
|
||||
sessionId: string,
|
||||
inputs: VirtualAgentInput[],
|
||||
ownerId: string,
|
||||
): Promise<AgentView[]> {
|
||||
const now = new Date();
|
||||
const out: AgentView[] = [];
|
||||
for (const a of inputs) {
|
||||
const llm = await this.llms.getByName(a.llmName);
|
||||
const projectId = a.project !== undefined
|
||||
? (await this.projects.resolveAndGet(a.project)).id
|
||||
: null;
|
||||
const existing = await this.repo.findByName(a.name);
|
||||
if (existing !== null) {
|
||||
if (existing.kind === 'public') {
|
||||
throw Object.assign(
|
||||
new Error(`Cannot publish over public Agent: ${a.name}`),
|
||||
{ statusCode: 409 },
|
||||
);
|
||||
}
|
||||
if (existing.providerSessionId !== sessionId && existing.status === 'active') {
|
||||
throw Object.assign(
|
||||
new Error(`Virtual Agent '${a.name}' is already active under a different session`),
|
||||
{ statusCode: 409 },
|
||||
);
|
||||
}
|
||||
const updated = await this.repo.update(existing.id, {
|
||||
...(a.description !== undefined ? { description: a.description } : {}),
|
||||
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
|
||||
llmId: llm.id,
|
||||
projectId,
|
||||
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
|
||||
...(a.extras !== undefined ? { extras: a.extras } : {}),
|
||||
kind: 'virtual',
|
||||
providerSessionId: sessionId,
|
||||
status: 'active',
|
||||
lastHeartbeatAt: now,
|
||||
inactiveSince: null,
|
||||
});
|
||||
out.push(await this.toView(updated));
|
||||
continue;
|
||||
}
|
||||
const created = await this.repo.create({
|
||||
name: a.name,
|
||||
...(a.description !== undefined ? { description: a.description } : {}),
|
||||
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
|
||||
llmId: llm.id,
|
||||
projectId,
|
||||
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
|
||||
...(a.extras !== undefined ? { extras: a.extras } : {}),
|
||||
kind: 'virtual',
|
||||
providerSessionId: sessionId,
|
||||
status: 'active',
|
||||
lastHeartbeatAt: now,
|
||||
ownerId,
|
||||
});
|
||||
out.push(await this.toView(created));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Bumps lastHeartbeatAt on every virtual agent owned by the session.
|
||||
* Revives inactive rows. Called from VirtualLlmService.heartbeat so
|
||||
* one publisher heartbeat covers both Llms and Agents.
|
||||
*/
|
||||
async heartbeatVirtualAgents(sessionId: string): Promise<void> {
|
||||
const owned = await this.repo.findBySessionId(sessionId);
|
||||
if (owned.length === 0) return;
|
||||
const now = new Date();
|
||||
for (const row of owned) {
|
||||
await this.repo.update(row.id, {
|
||||
lastHeartbeatAt: now,
|
||||
...(row.status === 'inactive' ? { status: 'active', inactiveSince: null } : {}),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/** Flip every virtual agent owned by the session to inactive immediately. */
|
||||
async markVirtualAgentsInactiveBySession(sessionId: string): Promise<void> {
|
||||
const owned = await this.repo.findBySessionId(sessionId);
|
||||
const now = new Date();
|
||||
for (const row of owned) {
|
||||
if (row.status === 'active') {
|
||||
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cascade-delete virtual agents pinned to a virtual Llm. Called from
|
||||
* VirtualLlmService.gcSweep BEFORE deleting the inactive Llm row, since
|
||||
* Agent.llmId is `onDelete: Restrict` and would otherwise block the
|
||||
* Llm delete.
|
||||
*/
|
||||
async deleteVirtualAgentsForLlm(llmId: string): Promise<number> {
|
||||
const pinned = await this.repo.findByLlmId(llmId);
|
||||
let deleted = 0;
|
||||
for (const row of pinned) {
|
||||
if (row.kind !== 'virtual') continue;
|
||||
await this.repo.delete(row.id);
|
||||
deleted += 1;
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
||||
/**
|
||||
* GC sweep for virtual agents — same shape as VirtualLlmService.gcSweep:
|
||||
* 1. Heartbeat-stale active virtuals → inactive (90-s cutoff).
|
||||
* 2. 4-h-old inactive virtuals → delete.
|
||||
* Run BEFORE the LlmService GC sweep so any agent that would have
|
||||
* blocked an Llm delete via Restrict has already been cleared.
|
||||
*/
|
||||
async gcSweepVirtualAgents(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> {
|
||||
const HEARTBEAT_TIMEOUT_MS = 90_000;
|
||||
const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000;
|
||||
let markedInactive = 0;
|
||||
let deleted = 0;
|
||||
|
||||
const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
|
||||
const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
|
||||
for (const row of stale) {
|
||||
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
|
||||
markedInactive += 1;
|
||||
}
|
||||
|
||||
const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
|
||||
const expired = await this.repo.findExpiredInactives(deletionCutoff);
|
||||
for (const row of expired) {
|
||||
await this.repo.delete(row.id);
|
||||
deleted += 1;
|
||||
}
|
||||
return { markedInactive, deleted };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import type {
|
||||
} from '../repositories/chat.repository.js';
|
||||
import type { IPromptRepository } from '../repositories/prompt.repository.js';
|
||||
import type { IPersonalityRepository } from '../repositories/personality.repository.js';
|
||||
import type { IVirtualLlmService } from './virtual-llm.service.js';
|
||||
import type { OpenAiChatRequest, OpenAiMessage } from './llm/types.js';
|
||||
import type { AgentChatParams } from '../validation/agent.schema.js';
|
||||
import { NotFoundError } from './mcp-server.service.js';
|
||||
@@ -132,6 +133,14 @@ export class ChatService {
|
||||
private readonly promptRepo: IPromptRepository,
|
||||
private readonly tools: ChatToolDispatcher,
|
||||
private readonly personalities?: IPersonalityRepository,
|
||||
/**
|
||||
* v3: when an Agent is pinned to a `kind=virtual` Llm, inference is
|
||||
* relayed via this service rather than an HTTP adapter (the virtual
|
||||
* row has no upstream URL). Optional so older test wirings still
|
||||
* compile; in those tests the chat path will refuse virtual Llms
|
||||
* with a clear error.
|
||||
*/
|
||||
private readonly virtualLlms?: IVirtualLlmService,
|
||||
) {}
|
||||
|
||||
async createThread(agentName: string, ownerId: string, title?: string): Promise<{ id: string }> {
|
||||
@@ -170,19 +179,16 @@ export class ChatService {
|
||||
let lastTurnIndex = ctx.startingTurnIndex;
|
||||
try {
|
||||
for (let i = 0; i < ctx.maxIterations; i += 1) {
|
||||
const adapter = this.adapters.get(ctx.llmType);
|
||||
const result = await adapter.infer({
|
||||
body: this.buildBody(ctx),
|
||||
modelOverride: ctx.modelOverride,
|
||||
apiKey: ctx.apiKey,
|
||||
url: ctx.url,
|
||||
extraConfig: ctx.extraConfig,
|
||||
});
|
||||
const result = await this.runOneInference(ctx);
|
||||
const choice = extractChoice(result.body);
|
||||
if (choice === null) {
|
||||
throw new Error(`Adapter returned no choice (status ${String(result.status)})`);
|
||||
}
|
||||
if (choice.tool_calls !== undefined && choice.tool_calls.length > 0) {
|
||||
// Tool turns: keep `content` literal — even if empty — because the
|
||||
// OpenAI tool-use protocol expects the assistant message to carry
|
||||
// its tool_calls separately from any free-form text. Surfacing
|
||||
// reasoning here would confuse downstream tool dispatchers.
|
||||
const assistantTurn = await this.chatRepo.appendMessage({
|
||||
threadId: ctx.threadId,
|
||||
role: 'assistant',
|
||||
@@ -217,13 +223,17 @@ export class ChatService {
|
||||
await this.chatRepo.updateStatus(assistantTurn.id, 'complete');
|
||||
continue;
|
||||
}
|
||||
// Terminal text turn.
|
||||
// Terminal text turn. Use pickAssistantText so thinking models that
|
||||
// produced only reasoning_content still yield a usable answer (with
|
||||
// a truncation marker when finish_reason indicates max_tokens
|
||||
// cut-off). Empty body remains empty and bubbles up unchanged.
|
||||
const assistantText = pickAssistantText(choice);
|
||||
const finalMsg = await this.chatRepo.appendMessage({
|
||||
threadId: ctx.threadId,
|
||||
role: 'assistant',
|
||||
content: choice.content ?? '',
|
||||
content: assistantText,
|
||||
});
|
||||
assistantFinal = choice.content ?? '';
|
||||
assistantFinal = assistantText;
|
||||
lastTurnIndex = finalMsg.turnIndex;
|
||||
await this.chatRepo.touchThread(ctx.threadId);
|
||||
return { threadId: ctx.threadId, assistant: assistantFinal, turnIndex: lastTurnIndex };
|
||||
@@ -240,19 +250,20 @@ export class ChatService {
|
||||
const ctx = await this.prepareContext(args);
|
||||
try {
|
||||
for (let i = 0; i < ctx.maxIterations; i += 1) {
|
||||
const adapter = this.adapters.get(ctx.llmType);
|
||||
const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = {
|
||||
// `reasoning` is accumulated alongside `content` so we can fall back
|
||||
// to it when the model produces no `content` (thinking models with a
|
||||
// tight max_tokens, or providers that don't separate the two).
|
||||
const accumulated: {
|
||||
content: string;
|
||||
reasoning: string;
|
||||
toolCalls: Array<{ id: string; name: string; argumentsJson: string }>;
|
||||
} = {
|
||||
content: '',
|
||||
reasoning: '',
|
||||
toolCalls: [],
|
||||
};
|
||||
let finishReason: string | null = null;
|
||||
for await (const chunk of adapter.stream({
|
||||
body: { ...this.buildBody(ctx), stream: true },
|
||||
modelOverride: ctx.modelOverride,
|
||||
apiKey: ctx.apiKey,
|
||||
url: ctx.url,
|
||||
extraConfig: ctx.extraConfig,
|
||||
})) {
|
||||
for await (const chunk of this.streamInference(ctx)) {
|
||||
if (chunk.done === true) break;
|
||||
if (chunk.data === '[DONE]') break;
|
||||
const evt = parseStreamingChunk(chunk.data);
|
||||
@@ -262,9 +273,11 @@ export class ChatService {
|
||||
yield { type: 'text', delta: evt.contentDelta };
|
||||
}
|
||||
if (evt.reasoningDelta !== undefined) {
|
||||
// Reasoning is not persisted to the thread (it's the model's
|
||||
// scratchpad, not part of the conversation) — only streamed so
|
||||
// the REPL can show progress while the model thinks.
|
||||
// Streamed live so the REPL can show progress while the model
|
||||
// thinks. Also accumulated so a thinking-only response (no
|
||||
// `content`) still produces a non-empty persisted assistant
|
||||
// turn — see the fallback at the end of this loop iteration.
|
||||
accumulated.reasoning += evt.reasoningDelta;
|
||||
yield { type: 'thinking', delta: evt.reasoningDelta };
|
||||
}
|
||||
if (evt.toolCallDeltas !== undefined) {
|
||||
@@ -331,10 +344,27 @@ export class ChatService {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Fall back to reasoning when the model emitted only thinking
|
||||
// output. Mirrors pickAssistantText() in the non-streaming path —
|
||||
// same situation (thinking model + tight max_tokens, or a provider
|
||||
// that bundles the answer into reasoning_content).
|
||||
const persistedContent = pickAssistantText({
|
||||
content: accumulated.content.length > 0 ? accumulated.content : null,
|
||||
...(accumulated.reasoning.length > 0 ? { reasoning: accumulated.reasoning } : {}),
|
||||
finishReason,
|
||||
});
|
||||
// If we synthesized text from reasoning, yield it as a final `text`
|
||||
// delta so the client's stdout matches what the thread persists.
|
||||
// Without this, the REPL would show only `thinking` deltas (which
|
||||
// the CLI writes to stderr) and stdout would be empty for any
|
||||
// thinking-only response.
|
||||
if (accumulated.content.length === 0 && persistedContent.length > 0) {
|
||||
yield { type: 'text', delta: persistedContent };
|
||||
}
|
||||
const finalMsg = await this.chatRepo.appendMessage({
|
||||
threadId: ctx.threadId,
|
||||
role: 'assistant',
|
||||
content: accumulated.content,
|
||||
content: persistedContent,
|
||||
});
|
||||
await this.chatRepo.touchThread(ctx.threadId);
|
||||
yield { type: 'final', threadId: ctx.threadId, turnIndex: finalMsg.turnIndex };
|
||||
@@ -347,12 +377,130 @@ export class ChatService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Streaming counterpart of runOneInference. Yields raw OpenAI-style
|
||||
* SSE chunks ({data: string; done?: boolean}) regardless of whether
|
||||
* we're hitting a public adapter or relaying through VirtualLlmService.
|
||||
* The caller's `parseStreamingChunk` already speaks OpenAI shape, so
|
||||
* downstream code doesn't need to know which path produced the chunks.
|
||||
*/
|
||||
private async *streamInference(ctx: {
|
||||
llmName: string;
|
||||
llmType: string;
|
||||
llmKind: 'public' | 'virtual';
|
||||
modelOverride: string;
|
||||
url: string;
|
||||
apiKey: string;
|
||||
extraConfig: Record<string, unknown>;
|
||||
history: OpenAiMessage[];
|
||||
systemBlock: string;
|
||||
toolList: ChatTool[];
|
||||
mergedParams: AgentChatParams;
|
||||
}): AsyncGenerator<{ data: string; done?: boolean }> {
|
||||
if (ctx.llmKind !== 'virtual') {
|
||||
const adapter = this.adapters.get(ctx.llmType);
|
||||
yield* adapter.stream({
|
||||
body: { ...this.buildBody(ctx), stream: true },
|
||||
modelOverride: ctx.modelOverride,
|
||||
apiKey: ctx.apiKey,
|
||||
url: ctx.url,
|
||||
extraConfig: ctx.extraConfig,
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (this.virtualLlms === undefined) {
|
||||
throw new Error(
|
||||
'virtualLlms dispatcher not wired into ChatService — cannot stream chat with kind=virtual Llm',
|
||||
);
|
||||
}
|
||||
// Bridge VirtualLlmService's onChunk callback API to an async
|
||||
// iterator. Chunks land on the queue from the SSE relay; the
|
||||
// generator drains them in order. ref.done resolves when the
|
||||
// publisher emits its `[DONE]` marker.
|
||||
const ref = await this.virtualLlms.enqueueInferTask(
|
||||
ctx.llmName,
|
||||
{ ...this.buildBody(ctx), stream: true },
|
||||
true,
|
||||
);
|
||||
const queue: Array<{ data: string; done?: boolean }> = [];
|
||||
let resolveTick: (() => void) | null = null;
|
||||
const wake = (): void => {
|
||||
const r = resolveTick;
|
||||
resolveTick = null;
|
||||
if (r !== null) r();
|
||||
};
|
||||
const unsubscribe = ref.onChunk((c) => { queue.push(c); wake(); });
|
||||
let finished = false;
|
||||
let failure: Error | null = null;
|
||||
ref.done.then(() => { finished = true; wake(); }).catch((err: Error) => { failure = err; finished = true; wake(); });
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
while (queue.length > 0) {
|
||||
const c = queue.shift()!;
|
||||
yield c;
|
||||
if (c.done === true) return;
|
||||
}
|
||||
if (finished) {
|
||||
if (failure !== null) throw failure;
|
||||
return;
|
||||
}
|
||||
await new Promise<void>((r) => { resolveTick = r; });
|
||||
}
|
||||
} finally {
|
||||
unsubscribe();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a single non-streaming inference iteration. Branches on
|
||||
* ctx.llmKind: public goes through the existing adapter registry,
|
||||
* virtual relays through VirtualLlmService.enqueueInferTask (mirrors
|
||||
* the same branch in `routes/llm-infer.ts` from v1 Stage 3).
|
||||
*
|
||||
* Throws when virtualLlms isn't wired but the row is virtual — older
|
||||
* test wirings hit this path.
|
||||
*/
|
||||
private async runOneInference(ctx: {
|
||||
llmName: string;
|
||||
llmType: string;
|
||||
llmKind: 'public' | 'virtual';
|
||||
modelOverride: string;
|
||||
url: string;
|
||||
apiKey: string;
|
||||
extraConfig: Record<string, unknown>;
|
||||
history: OpenAiMessage[];
|
||||
systemBlock: string;
|
||||
toolList: ChatTool[];
|
||||
mergedParams: AgentChatParams;
|
||||
}): Promise<{ status: number; body: unknown }> {
|
||||
if (ctx.llmKind === 'virtual') {
|
||||
if (this.virtualLlms === undefined) {
|
||||
throw new Error(
|
||||
'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm',
|
||||
);
|
||||
}
|
||||
const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false);
|
||||
return ref.done;
|
||||
}
|
||||
const adapter = this.adapters.get(ctx.llmType);
|
||||
return adapter.infer({
|
||||
body: this.buildBody(ctx),
|
||||
modelOverride: ctx.modelOverride,
|
||||
apiKey: ctx.apiKey,
|
||||
url: ctx.url,
|
||||
extraConfig: ctx.extraConfig,
|
||||
});
|
||||
}
|
||||
|
||||
private async prepareContext(args: ChatRequestArgs): Promise<{
|
||||
threadId: string;
|
||||
history: OpenAiMessage[];
|
||||
systemBlock: string;
|
||||
llmName: string;
|
||||
llmType: string;
|
||||
/** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */
|
||||
llmKind: 'public' | 'virtual';
|
||||
modelOverride: string;
|
||||
url: string;
|
||||
apiKey: string;
|
||||
@@ -435,6 +583,7 @@ export class ChatService {
|
||||
systemBlock,
|
||||
llmName: llm.name,
|
||||
llmType: llm.type,
|
||||
llmKind: llm.kind,
|
||||
modelOverride: llm.model,
|
||||
url: llm.url,
|
||||
apiKey,
|
||||
@@ -568,6 +717,17 @@ export class ChatService {
|
||||
|
||||
interface ExtractedChoice {
|
||||
content: string | null;
|
||||
/**
|
||||
* Reasoning text emitted by thinking models (qwen3-thinking,
|
||||
* deepseek-reasoner, OpenAI o1 family). Different providers spell the
|
||||
* field differently; the parser accepts every shape the streaming
|
||||
* counterpart already accepts (see `parseStreamingChunk`). When `content`
|
||||
* is null/empty, callers fall back to this so thinking models that
|
||||
* exhaust their token budget on reasoning still produce a usable answer.
|
||||
*/
|
||||
reasoning?: string;
|
||||
/** OpenAI's stop reason — `'stop' | 'length' | 'tool_calls' | 'content_filter' | ...`. */
|
||||
finishReason?: string | null;
|
||||
tool_calls?: Array<{ id: string; type: 'function'; function: { name: string; arguments: string } }>;
|
||||
}
|
||||
|
||||
@@ -575,17 +735,52 @@ function extractChoice(body: unknown): ExtractedChoice | null {
|
||||
if (typeof body !== 'object' || body === null) return null;
|
||||
const choices = (body as { choices?: unknown }).choices;
|
||||
if (!Array.isArray(choices) || choices.length === 0) return null;
|
||||
const first = choices[0] as { message?: { content?: unknown; tool_calls?: unknown } } | undefined;
|
||||
const first = choices[0] as {
|
||||
message?: {
|
||||
content?: unknown;
|
||||
reasoning_content?: unknown;
|
||||
reasoning?: unknown;
|
||||
provider_specific_fields?: { reasoning_content?: unknown; reasoning?: unknown };
|
||||
tool_calls?: unknown;
|
||||
};
|
||||
finish_reason?: unknown;
|
||||
} | undefined;
|
||||
if (first?.message === undefined) return null;
|
||||
const content = typeof first.message.content === 'string' ? first.message.content : null;
|
||||
const m = first.message;
|
||||
const reasoning =
|
||||
(typeof m.reasoning_content === 'string' && m.reasoning_content.length > 0 ? m.reasoning_content : undefined)
|
||||
?? (typeof m.reasoning === 'string' && m.reasoning.length > 0 ? m.reasoning : undefined)
|
||||
?? (typeof m.provider_specific_fields?.reasoning_content === 'string' && m.provider_specific_fields.reasoning_content.length > 0 ? m.provider_specific_fields.reasoning_content : undefined)
|
||||
?? (typeof m.provider_specific_fields?.reasoning === 'string' && m.provider_specific_fields.reasoning.length > 0 ? m.provider_specific_fields.reasoning : undefined);
|
||||
const finishReason = typeof first.finish_reason === 'string' ? first.finish_reason : null;
|
||||
const toolCalls = first.message.tool_calls;
|
||||
const out: ExtractedChoice = { content };
|
||||
const out: ExtractedChoice = { content, finishReason };
|
||||
if (reasoning !== undefined) out.reasoning = reasoning;
|
||||
if (Array.isArray(toolCalls)) {
|
||||
out.tool_calls = toolCalls as NonNullable<ExtractedChoice['tool_calls']>;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick what text to surface (and persist) as the assistant's reply.
|
||||
* Thinking models sometimes emit only `reasoning_content` and leave
|
||||
* `content` null — typically when `max_tokens` is too small for the
|
||||
* thinking budget, but also when the provider configuration just doesn't
|
||||
* separate the two. In that case the reasoning IS the answer for this
|
||||
* request, and the caller should see it. A `length` finish_reason marker
|
||||
* makes truncation visible so users can fix their max_tokens config.
|
||||
*/
|
||||
function pickAssistantText(choice: ExtractedChoice): string {
|
||||
if (choice.content !== null && choice.content.length > 0) return choice.content;
|
||||
if (choice.reasoning !== undefined && choice.reasoning.length > 0) {
|
||||
const truncated = choice.finishReason === 'length' ? '\n\n[response truncated by max_tokens]' : '';
|
||||
return `${choice.reasoning}${truncated}`;
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
function safeParseJson(s: string): unknown {
|
||||
if (s === '') return {};
|
||||
try {
|
||||
|
||||
@@ -123,7 +123,15 @@ export class OpenAiPassthroughAdapter implements LlmAdapter {
|
||||
}
|
||||
|
||||
private endpointUrl(url: string): string {
|
||||
if (url !== '') return url.replace(/\/+$/, '');
|
||||
// Accept both conventional forms users actually paste — base host
|
||||
// (`https://api.openai.com`) and base + version (`https://api.openai.com/v1`).
|
||||
// Every OpenAI-compat provider documents their endpoint with the `/v1`
|
||||
// suffix, so users naturally include it; the adapter then re-appends
|
||||
// `/v1/chat/completions`, producing a doubled-`/v1` 404 against LiteLLM
|
||||
// and others. Strip a trailing `/v1` (with or without slash) so both
|
||||
// shapes resolve to the same canonical base. A more specific suffix
|
||||
// like `/v1beta` is preserved.
|
||||
if (url !== '') return url.replace(/\/+$/, '').replace(/\/v1$/, '');
|
||||
const def = DEFAULT_URLS[this.kind];
|
||||
if (def === undefined) {
|
||||
throw new Error(`${this.kind}: url is required (no default endpoint for this provider)`);
|
||||
|
||||
@@ -28,6 +28,7 @@ import { randomUUID } from 'node:crypto';
|
||||
import type { ILlmRepository } from '../repositories/llm.repository.js';
|
||||
import type { OpenAiChatRequest } from './llm/types.js';
|
||||
import { NotFoundError } from './mcp-server.service.js';
|
||||
import type { AgentService } from './agent.service.js';
|
||||
|
||||
/** A virtual provider's announcement at registration time. */
|
||||
export interface RegisterProviderInput {
|
||||
@@ -37,6 +38,15 @@ export interface RegisterProviderInput {
|
||||
tier?: string;
|
||||
description?: string;
|
||||
extraConfig?: Record<string, unknown>;
|
||||
/**
|
||||
* Optional. Lets the publisher hint that the underlying backend is
|
||||
* asleep — mcpd records the row as `hibernating` and will dispatch a
|
||||
* `wake` task before any inference. Defaults to `active` (today's
|
||||
* behavior). v2 publishers (mcplocal with a configured wake recipe)
|
||||
* pass 'hibernating' when `LlmProvider.isAvailable()` returns false at
|
||||
* publish time.
|
||||
*/
|
||||
initialStatus?: 'active' | 'hibernating';
|
||||
}
|
||||
|
||||
export interface RegisterResult {
|
||||
@@ -103,8 +113,23 @@ export interface PendingTaskRef {
|
||||
export class VirtualLlmService implements IVirtualLlmService {
|
||||
private readonly sessions = new Map<string, VirtualSessionHandle>();
|
||||
private readonly tasksById = new Map<string, PendingTask>();
|
||||
/**
|
||||
* Dedupe concurrent wake requests for the same Llm. The first request
|
||||
* starts the wake; later requests for the same name await the same
|
||||
* promise. Cleared as soon as the wake settles (success or failure).
|
||||
*/
|
||||
private readonly wakeInFlight = new Map<string, Promise<void>>();
|
||||
|
||||
constructor(private readonly repo: ILlmRepository) {}
|
||||
constructor(
|
||||
private readonly repo: ILlmRepository,
|
||||
/**
|
||||
* Optional. v3 wires AgentService here so the lifecycle cascades:
|
||||
* heartbeat → bump owned agents; disconnect → mark agents inactive;
|
||||
* gcSweep → sweep agents first, then delete pinned-to-Llm cascade
|
||||
* before deleting the Llm itself (Agent.llmId is Restrict).
|
||||
*/
|
||||
private readonly agents?: AgentService,
|
||||
) {}
|
||||
|
||||
async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult> {
|
||||
const sessionId = input.providerSessionId ?? randomUUID();
|
||||
@@ -112,6 +137,7 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
const llms: Llm[] = [];
|
||||
|
||||
for (const p of input.providers) {
|
||||
const initialStatus = p.initialStatus ?? 'active';
|
||||
const existing = await this.repo.findByName(p.name);
|
||||
if (existing === null) {
|
||||
const created = await this.repo.create({
|
||||
@@ -123,7 +149,7 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
|
||||
kind: 'virtual',
|
||||
providerSessionId: sessionId,
|
||||
status: 'active',
|
||||
status: initialStatus,
|
||||
lastHeartbeatAt: now,
|
||||
inactiveSince: null,
|
||||
});
|
||||
@@ -156,7 +182,7 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
|
||||
kind: 'virtual',
|
||||
providerSessionId: sessionId,
|
||||
status: 'active',
|
||||
status: initialStatus,
|
||||
lastHeartbeatAt: now,
|
||||
inactiveSince: null,
|
||||
});
|
||||
@@ -168,7 +194,6 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
|
||||
async heartbeat(providerSessionId: string): Promise<void> {
|
||||
const owned = await this.repo.findBySessionId(providerSessionId);
|
||||
if (owned.length === 0) return;
|
||||
const now = new Date();
|
||||
for (const row of owned) {
|
||||
// Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a
|
||||
@@ -180,6 +205,11 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
// Cascade to virtual agents owned by the same session — same heartbeat
|
||||
// covers them. No-op if AgentService isn't wired (older test configs).
|
||||
if (this.agents !== undefined) {
|
||||
await this.agents.heartbeatVirtualAgents(providerSessionId);
|
||||
}
|
||||
}
|
||||
|
||||
bindSession(providerSessionId: string, handle: VirtualSessionHandle): void {
|
||||
@@ -198,6 +228,10 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
|
||||
}
|
||||
}
|
||||
// Cascade to virtual agents owned by the same session.
|
||||
if (this.agents !== undefined) {
|
||||
await this.agents.markVirtualAgentsInactiveBySession(providerSessionId);
|
||||
}
|
||||
// Reject any in-flight tasks for this session — the relay can't deliver
|
||||
// a result POST anymore.
|
||||
for (const t of this.tasksById.values()) {
|
||||
@@ -220,9 +254,9 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
{ statusCode: 500 },
|
||||
);
|
||||
}
|
||||
if (llm.status !== 'active') {
|
||||
if (llm.status === 'inactive') {
|
||||
throw Object.assign(
|
||||
new Error(`Virtual Llm '${llmName}' is ${llm.status}; publisher offline`),
|
||||
new Error(`Virtual Llm '${llmName}' is inactive; publisher offline`),
|
||||
{ statusCode: 503 },
|
||||
);
|
||||
}
|
||||
@@ -234,6 +268,16 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
);
|
||||
}
|
||||
|
||||
// ── Wake-on-demand (v2) ──
|
||||
// Status=hibernating means the publisher told us at register time
|
||||
// (or via a later status update) that the backend is asleep. Fire a
|
||||
// wake task and wait for the publisher to confirm readiness before
|
||||
// dispatching the actual inference. Concurrent infers for the same
|
||||
// Llm share a single wake promise.
|
||||
if (llm.status === 'hibernating') {
|
||||
await this.ensureAwake(llm.id, llm.name, llm.providerSessionId, handle);
|
||||
}
|
||||
|
||||
const taskId = randomUUID();
|
||||
const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>();
|
||||
|
||||
@@ -275,6 +319,77 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Drive the publisher to wake the backend. Concurrent callers for the
|
||||
* same Llm name share the in-flight promise — we only ever ask the
|
||||
* publisher once. Throws on timeout or recipe failure; on success the
|
||||
* row is flipped to active and subsequent infer calls proceed.
|
||||
*/
|
||||
private async ensureAwake(
|
||||
llmId: string,
|
||||
llmName: string,
|
||||
sessionId: string,
|
||||
handle: VirtualSessionHandle,
|
||||
): Promise<void> {
|
||||
const existing = this.wakeInFlight.get(llmName);
|
||||
if (existing !== undefined) {
|
||||
await existing;
|
||||
return;
|
||||
}
|
||||
const promise = this.runWake(llmId, llmName, sessionId, handle);
|
||||
this.wakeInFlight.set(llmName, promise);
|
||||
try {
|
||||
await promise;
|
||||
} finally {
|
||||
this.wakeInFlight.delete(llmName);
|
||||
}
|
||||
}
|
||||
|
||||
private async runWake(
|
||||
llmId: string,
|
||||
llmName: string,
|
||||
sessionId: string,
|
||||
handle: VirtualSessionHandle,
|
||||
): Promise<void> {
|
||||
const taskId = randomUUID();
|
||||
let resolveDone!: () => void;
|
||||
let rejectDone!: (err: Error) => void;
|
||||
const done = new Promise<void>((resolve, reject) => {
|
||||
resolveDone = resolve;
|
||||
rejectDone = reject;
|
||||
});
|
||||
|
||||
const pending: PendingTask = {
|
||||
taskId,
|
||||
sessionId,
|
||||
llmName,
|
||||
streaming: false,
|
||||
// Wake tasks return { ok: true } on success or never resolve at
|
||||
// all if the publisher dies; the rejectNonStreaming path covers
|
||||
// the disconnect-mid-wake case via unbindSession.
|
||||
resolveNonStreaming: (_body, status) => {
|
||||
if (status >= 200 && status < 300) resolveDone();
|
||||
else rejectDone(new Error(`wake task returned status ${String(status)}`));
|
||||
},
|
||||
rejectNonStreaming: rejectDone,
|
||||
pushChunk: null,
|
||||
};
|
||||
this.tasksById.set(taskId, pending);
|
||||
|
||||
handle.pushTask({ kind: 'wake', taskId, llmName });
|
||||
|
||||
await done;
|
||||
|
||||
// Flip the row to active so subsequent infer calls go through the
|
||||
// normal active path. The publisher's own heartbeat will keep the
|
||||
// row alive from this point.
|
||||
await this.repo.update(llmId, {
|
||||
status: 'active',
|
||||
lastHeartbeatAt: new Date(),
|
||||
inactiveSince: null,
|
||||
});
|
||||
}
|
||||
|
||||
completeTask(taskId: string, result: { status: number; body: unknown }): boolean {
|
||||
const t = this.tasksById.get(taskId);
|
||||
if (t === undefined) return false;
|
||||
@@ -308,6 +423,16 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
let markedInactive = 0;
|
||||
let deleted = 0;
|
||||
|
||||
// v3: sweep virtual agents FIRST so any Llm-pinned agent that's
|
||||
// about to be cascaded (because its Llm is also expiring) is gone
|
||||
// before we attempt to delete the Llm. Agent.llmId is Restrict and
|
||||
// would otherwise block.
|
||||
if (this.agents !== undefined) {
|
||||
const agentSweep = await this.agents.gcSweepVirtualAgents(now);
|
||||
markedInactive += agentSweep.markedInactive;
|
||||
deleted += agentSweep.deleted;
|
||||
}
|
||||
|
||||
const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
|
||||
const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
|
||||
for (const row of stale) {
|
||||
@@ -318,6 +443,13 @@ export class VirtualLlmService implements IVirtualLlmService {
|
||||
const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
|
||||
const expired = await this.repo.findExpiredInactives(deletionCutoff);
|
||||
for (const row of expired) {
|
||||
// Final defensive cascade: drop any virtual agents still pinned
|
||||
// to this Llm (e.g. their lastHeartbeatAt happens to lag the
|
||||
// Llm's by a few seconds and they didn't make this round's
|
||||
// 4-h cutoff). Without this we'd hit a Restrict FK error.
|
||||
if (this.agents !== undefined) {
|
||||
await this.agents.deleteVirtualAgentsForLlm(row.id);
|
||||
}
|
||||
await this.repo.delete(row.id);
|
||||
deleted += 1;
|
||||
}
|
||||
|
||||
251
src/mcpd/tests/chat-service-virtual-llm.test.ts
Normal file
251
src/mcpd/tests/chat-service-virtual-llm.test.ts
Normal file
@@ -0,0 +1,251 @@
|
||||
import { describe, it, expect, vi } from 'vitest';
|
||||
import { ChatService, type ChatToolDispatcher } from '../src/services/chat.service.js';
|
||||
import type { AgentService } from '../src/services/agent.service.js';
|
||||
import type { LlmService } from '../src/services/llm.service.js';
|
||||
import type { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
|
||||
import type { IChatRepository } from '../src/repositories/chat.repository.js';
|
||||
import type { IPromptRepository } from '../src/repositories/prompt.repository.js';
|
||||
import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js';
|
||||
import type { ChatMessage, ChatThread, Prompt } from '@prisma/client';
|
||||
|
||||
const NOW = new Date();
|
||||
|
||||
/**
|
||||
* Tests targeting v3 Stage 1's chat.service kind=virtual branch.
|
||||
* Mirror the existing chat-service.test.ts patterns but isolate the
|
||||
* adapter-vs-relay dispatch decision.
|
||||
*/
|
||||
|
||||
function mockChatRepo(): IChatRepository {
|
||||
const msgs: ChatMessage[] = [];
|
||||
const threads: ChatThread[] = [];
|
||||
let idCounter = 1;
|
||||
return {
|
||||
createThread: vi.fn(async ({ agentId, ownerId, title }) => {
|
||||
const t: ChatThread = {
|
||||
id: `thread-${String(idCounter++)}`, agentId, ownerId,
|
||||
title: title ?? '', lastTurnAt: NOW, createdAt: NOW, updatedAt: NOW,
|
||||
};
|
||||
threads.push(t);
|
||||
return t;
|
||||
}),
|
||||
findThread: vi.fn(async (id: string) => threads.find((t) => t.id === id) ?? null),
|
||||
listThreadsByAgent: vi.fn(async () => []),
|
||||
listMessages: vi.fn(async () => []),
|
||||
appendMessage: vi.fn(async (input) => {
|
||||
const m: ChatMessage = {
|
||||
id: `msg-${String(idCounter++)}`,
|
||||
threadId: input.threadId,
|
||||
turnIndex: input.turnIndex ?? msgs.filter((x) => x.threadId === input.threadId).length,
|
||||
role: input.role,
|
||||
content: input.content,
|
||||
toolCalls: (input.toolCalls ?? null) as ChatMessage['toolCalls'],
|
||||
toolCallId: input.toolCallId ?? null,
|
||||
status: input.status ?? 'complete',
|
||||
createdAt: NOW,
|
||||
};
|
||||
msgs.push(m);
|
||||
return m;
|
||||
}),
|
||||
updateStatus: vi.fn(async (_id, _s) => ({ } as ChatMessage)),
|
||||
markPendingAsError: vi.fn(async () => 0),
|
||||
touchThread: vi.fn(async () => undefined),
|
||||
nextTurnIndex: vi.fn(async () => msgs.length),
|
||||
};
|
||||
}
|
||||
|
||||
function mockAgents(): AgentService {
|
||||
return {
|
||||
getByName: vi.fn(async (name: string) => ({
|
||||
id: `agent-${name}`, name, description: '',
|
||||
systemPrompt: 'You are a helpful agent.',
|
||||
llm: { id: 'llm-1', name: 'vllm-local' },
|
||||
project: null,
|
||||
defaultPersonality: null,
|
||||
proxyModelName: null,
|
||||
defaultParams: {},
|
||||
extras: {},
|
||||
ownerId: 'owner-1', version: 1, createdAt: NOW, updatedAt: NOW,
|
||||
})),
|
||||
} as unknown as AgentService;
|
||||
}
|
||||
|
||||
function mockLlmsVirtual(): LlmService {
|
||||
return {
|
||||
getByName: vi.fn(async (name: string) => ({
|
||||
id: 'llm-1', name, type: 'openai', model: 'fake',
|
||||
url: '', tier: 'fast', description: '',
|
||||
apiKeyRef: null, extraConfig: {},
|
||||
kind: 'virtual',
|
||||
status: 'active',
|
||||
lastHeartbeatAt: NOW,
|
||||
inactiveSince: null,
|
||||
version: 1, createdAt: NOW, updatedAt: NOW,
|
||||
})),
|
||||
resolveApiKey: vi.fn(async () => ''),
|
||||
} as unknown as LlmService;
|
||||
}
|
||||
|
||||
function mockPromptRepo(): IPromptRepository {
|
||||
return {
|
||||
findAll: vi.fn(async () => []),
|
||||
findGlobal: vi.fn(async () => []),
|
||||
findByAgent: vi.fn(async () => []),
|
||||
findById: vi.fn(async () => null),
|
||||
findByNameAndProject: vi.fn(async () => null),
|
||||
findByNameAndAgent: vi.fn(async () => null),
|
||||
create: vi.fn(),
|
||||
update: vi.fn(),
|
||||
delete: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
function mockTools(): ChatToolDispatcher {
|
||||
return { listTools: vi.fn(async () => []), callTool: vi.fn(async () => ({ ok: true })) };
|
||||
}
|
||||
|
||||
function emptyAdapterRegistry(): LlmAdapterRegistry {
|
||||
return {
|
||||
get: () => { throw new Error('adapter should not be used for kind=virtual'); },
|
||||
} as unknown as LlmAdapterRegistry;
|
||||
}
|
||||
|
||||
function mockVirtualLlms(opts: {
|
||||
reply?: string;
|
||||
rejectWith?: Error;
|
||||
streamingChunks?: string[];
|
||||
}): IVirtualLlmService {
|
||||
const enqueueInferTask = vi.fn(async (_name: string, _body: unknown, streaming: boolean) => {
|
||||
if (opts.rejectWith !== undefined) {
|
||||
return {
|
||||
taskId: 't-1',
|
||||
done: Promise.reject(opts.rejectWith),
|
||||
onChunk: () => () => undefined,
|
||||
};
|
||||
}
|
||||
if (!streaming) {
|
||||
const body = {
|
||||
choices: [{ message: { content: opts.reply ?? 'hi from relay' }, finish_reason: 'stop' }],
|
||||
};
|
||||
return {
|
||||
taskId: 't-1',
|
||||
done: Promise.resolve({ status: 200, body }),
|
||||
onChunk: () => () => undefined,
|
||||
};
|
||||
}
|
||||
// Streaming path: collect subscribers, push the configured chunks
|
||||
// synchronously, then resolve done.
|
||||
const subs = new Set<(c: { data: string; done?: boolean }) => void>();
|
||||
const chunks = opts.streamingChunks ?? ['{"choices":[{"delta":{"content":"hi from relay"}}]}'];
|
||||
return {
|
||||
taskId: 't-1',
|
||||
done: (async (): Promise<{ status: number; body: unknown }> => {
|
||||
// Wait long enough for the caller to register subscribers
|
||||
// before fanning chunks. Promise.resolve() isn't enough — the
|
||||
// microtask running this IIFE is queued ahead of the caller's
|
||||
// await on enqueueInferTask, so subs would still be empty.
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
for (const c of chunks) for (const s of subs) s({ data: c });
|
||||
for (const s of subs) s({ data: '[DONE]', done: true });
|
||||
return { status: 200, body: null };
|
||||
})(),
|
||||
onChunk: (cb): (() => void) => { subs.add(cb); return () => subs.delete(cb); },
|
||||
};
|
||||
});
|
||||
return {
|
||||
register: vi.fn(),
|
||||
heartbeat: vi.fn(),
|
||||
bindSession: vi.fn(),
|
||||
unbindSession: vi.fn(),
|
||||
enqueueInferTask: enqueueInferTask as unknown as IVirtualLlmService['enqueueInferTask'],
|
||||
completeTask: vi.fn(),
|
||||
pushTaskChunk: vi.fn(),
|
||||
failTask: vi.fn(),
|
||||
gcSweep: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
describe('ChatService — kind=virtual branch (v3 Stage 1)', () => {
|
||||
it('non-streaming relays through VirtualLlmService.enqueueInferTask', async () => {
|
||||
const chatRepo = mockChatRepo();
|
||||
const virtual = mockVirtualLlms({ reply: 'hello back from local' });
|
||||
const svc = new ChatService(
|
||||
mockAgents(),
|
||||
mockLlmsVirtual(),
|
||||
emptyAdapterRegistry(),
|
||||
chatRepo,
|
||||
mockPromptRepo(),
|
||||
mockTools(),
|
||||
undefined,
|
||||
virtual,
|
||||
);
|
||||
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
|
||||
expect(result.assistant).toBe('hello back from local');
|
||||
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
|
||||
'vllm-local',
|
||||
expect.objectContaining({ messages: expect.any(Array) }),
|
||||
false,
|
||||
);
|
||||
});
|
||||
|
||||
it('streaming relays through VirtualLlmService and emits the same text deltas', async () => {
|
||||
const chatRepo = mockChatRepo();
|
||||
const virtual = mockVirtualLlms({
|
||||
streamingChunks: [
|
||||
'{"choices":[{"delta":{"content":"hello "}}]}',
|
||||
'{"choices":[{"delta":{"content":"world"}}]}',
|
||||
],
|
||||
});
|
||||
const svc = new ChatService(
|
||||
mockAgents(),
|
||||
mockLlmsVirtual(),
|
||||
emptyAdapterRegistry(),
|
||||
chatRepo,
|
||||
mockPromptRepo(),
|
||||
mockTools(),
|
||||
undefined,
|
||||
virtual,
|
||||
);
|
||||
const deltas: string[] = [];
|
||||
for await (const evt of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) {
|
||||
if (evt.type === 'text') deltas.push(evt.delta);
|
||||
if (evt.type === 'final') break;
|
||||
}
|
||||
expect(deltas.join('')).toBe('hello world');
|
||||
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
|
||||
'vllm-local',
|
||||
expect.objectContaining({ messages: expect.any(Array), stream: true }),
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
||||
it('non-streaming throws a clear error when virtualLlms isn\'t wired but the row is virtual', async () => {
|
||||
const svc = new ChatService(
|
||||
mockAgents(),
|
||||
mockLlmsVirtual(),
|
||||
emptyAdapterRegistry(),
|
||||
mockChatRepo(),
|
||||
mockPromptRepo(),
|
||||
mockTools(),
|
||||
// no personalities, no virtualLlms
|
||||
);
|
||||
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
|
||||
.rejects.toThrow(/virtualLlms dispatcher not wired/);
|
||||
});
|
||||
|
||||
it('non-streaming surfaces the relay\'s rejection (e.g. publisher offline) up to the caller', async () => {
|
||||
const virtual = mockVirtualLlms({ rejectWith: Object.assign(new Error('publisher offline'), { statusCode: 503 }) });
|
||||
const svc = new ChatService(
|
||||
mockAgents(),
|
||||
mockLlmsVirtual(),
|
||||
emptyAdapterRegistry(),
|
||||
mockChatRepo(),
|
||||
mockPromptRepo(),
|
||||
mockTools(),
|
||||
undefined,
|
||||
virtual,
|
||||
);
|
||||
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
|
||||
.rejects.toThrow(/publisher offline/);
|
||||
});
|
||||
});
|
||||
@@ -118,12 +118,16 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } |
|
||||
} as unknown as AgentService;
|
||||
}
|
||||
|
||||
function mockLlms(): LlmService {
|
||||
function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService {
|
||||
return {
|
||||
getByName: vi.fn(async (name: string) => ({
|
||||
id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking',
|
||||
url: '', tier: 'fast', description: '',
|
||||
apiKeyRef: null, extraConfig: {},
|
||||
kind: opts.kind ?? 'public',
|
||||
status: 'active',
|
||||
lastHeartbeatAt: null,
|
||||
inactiveSince: null,
|
||||
version: 1, createdAt: NOW, updatedAt: NOW,
|
||||
})),
|
||||
resolveApiKey: vi.fn(async () => 'fake-key'),
|
||||
@@ -457,6 +461,121 @@ describe('ChatService', () => {
|
||||
expect(assistantTurn?.content).not.toContain('Let me think');
|
||||
});
|
||||
|
||||
// Regression: thinking models with a tight max_tokens budget produce
|
||||
// `reasoning_content` only and leave `content` null. Without falling back
|
||||
// to reasoning, the assistant turn was empty and the smoke test saw an
|
||||
// empty stdout. This covers BOTH chat() (non-streaming) and chatStream()
|
||||
// (synthetic final text frame so the CLI's stdout matches what's
|
||||
// persisted to the thread).
|
||||
it('chat falls back to reasoning_content when content is null', async () => {
|
||||
const chatRepo = mockChatRepo();
|
||||
const adapter: LlmAdapter = {
|
||||
kind: 'thinking-truncated',
|
||||
infer: vi.fn(async () => ({
|
||||
status: 200,
|
||||
body: {
|
||||
id: 'cmpl-1',
|
||||
object: 'chat.completion',
|
||||
choices: [{
|
||||
index: 0,
|
||||
message: { role: 'assistant', content: null, reasoning_content: 'Thinking out loud about the answer' },
|
||||
finish_reason: 'stop',
|
||||
}],
|
||||
},
|
||||
})),
|
||||
stream: async function*() { yield { data: '[DONE]', done: true }; },
|
||||
};
|
||||
const svc = new ChatService(
|
||||
mockAgents(), mockLlms(), adapterRegistry(adapter),
|
||||
chatRepo, mockPromptRepo(), mockTools(),
|
||||
);
|
||||
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
|
||||
expect(result.assistant).toBe('Thinking out loud about the answer');
|
||||
const stored = chatRepo._msgs.find((m) => m.role === 'assistant');
|
||||
expect(stored?.content).toBe('Thinking out loud about the answer');
|
||||
});
|
||||
|
||||
it('chat appends [response truncated by max_tokens] when finish_reason is "length"', async () => {
|
||||
const chatRepo = mockChatRepo();
|
||||
const adapter: LlmAdapter = {
|
||||
kind: 'thinking-clipped',
|
||||
infer: vi.fn(async () => ({
|
||||
status: 200,
|
||||
body: {
|
||||
choices: [{
|
||||
index: 0,
|
||||
message: { role: 'assistant', content: null, reasoning_content: 'partial reasoning that ran out of' },
|
||||
finish_reason: 'length',
|
||||
}],
|
||||
},
|
||||
})),
|
||||
stream: async function*() { yield { data: '[DONE]', done: true }; },
|
||||
};
|
||||
const svc = new ChatService(
|
||||
mockAgents(), mockLlms(), adapterRegistry(adapter),
|
||||
chatRepo, mockPromptRepo(), mockTools(),
|
||||
);
|
||||
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
|
||||
expect(result.assistant).toContain('partial reasoning that ran out of');
|
||||
expect(result.assistant).toContain('[response truncated by max_tokens]');
|
||||
});
|
||||
|
||||
it('chat prefers content when both content and reasoning_content are present', async () => {
|
||||
// Thinking models that DO produce content shouldn't see the reasoning
|
||||
// bleed into the response — that's what the streaming path's
|
||||
// text/thinking split is for, and the non-streaming path should match.
|
||||
const chatRepo = mockChatRepo();
|
||||
const adapter: LlmAdapter = {
|
||||
kind: 'thinking-with-content',
|
||||
infer: vi.fn(async () => ({
|
||||
status: 200,
|
||||
body: {
|
||||
choices: [{
|
||||
index: 0,
|
||||
message: { role: 'assistant', content: 'real answer', reasoning_content: 'background thinking' },
|
||||
finish_reason: 'stop',
|
||||
}],
|
||||
},
|
||||
})),
|
||||
stream: async function*() { yield { data: '[DONE]', done: true }; },
|
||||
};
|
||||
const svc = new ChatService(
|
||||
mockAgents(), mockLlms(), adapterRegistry(adapter),
|
||||
chatRepo, mockPromptRepo(), mockTools(),
|
||||
);
|
||||
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
|
||||
expect(result.assistant).toBe('real answer');
|
||||
expect(result.assistant).not.toContain('background thinking');
|
||||
});
|
||||
|
||||
it('chatStream emits a synthetic text frame and persists reasoning when content is empty', async () => {
|
||||
const chatRepo = mockChatRepo();
|
||||
const adapter: LlmAdapter = {
|
||||
kind: 'thinking-only-stream',
|
||||
infer: vi.fn(),
|
||||
stream: async function*() {
|
||||
yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'thinking ' }, finish_reason: null }] }) };
|
||||
yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'more.' }, finish_reason: 'stop' }] }) };
|
||||
yield { data: '[DONE]', done: true };
|
||||
},
|
||||
};
|
||||
const svc = new ChatService(
|
||||
mockAgents(), mockLlms(), adapterRegistry(adapter),
|
||||
chatRepo, mockPromptRepo(), mockTools(),
|
||||
);
|
||||
const chunks: Array<{ type: string; delta?: string }> = [];
|
||||
for await (const c of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) {
|
||||
chunks.push({ type: c.type, delta: c.delta });
|
||||
}
|
||||
// 2 thinking deltas (live), 1 synthesized text frame, 1 final.
|
||||
expect(chunks.filter((c) => c.type === 'thinking').map((c) => c.delta)).toEqual(['thinking ', 'more.']);
|
||||
expect(chunks.filter((c) => c.type === 'text').map((c) => c.delta)).toEqual(['thinking more.']);
|
||||
// The thread message captures the synthesized text so resumed chats see
|
||||
// a coherent assistant turn (rather than blank).
|
||||
const stored = chatRepo._msgs.find((m) => m.role === 'assistant');
|
||||
expect(stored?.content).toBe('thinking more.');
|
||||
});
|
||||
|
||||
// Regression: provider_specific_fields.reasoning_content shape (LiteLLM
|
||||
// passthrough from vLLM) is also recognized.
|
||||
it('chatStream recognizes LiteLLM provider_specific_fields.reasoning_content', async () => {
|
||||
|
||||
@@ -71,6 +71,36 @@ describe('OpenAiPassthroughAdapter', () => {
|
||||
await expect(adapter.infer(makeCtx())).rejects.toThrow(/no default endpoint/);
|
||||
});
|
||||
|
||||
it('infer: strips a trailing /v1 from the configured URL', async () => {
|
||||
// Users naturally paste the OpenAI-style base URL with /v1 because
|
||||
// every provider documents it that way (https://api.openai.com/v1,
|
||||
// https://llm.example.com/v1). The adapter then re-appends
|
||||
// /v1/chat/completions; without normalization this would produce a
|
||||
// doubled-/v1 404 against LiteLLM and friends.
|
||||
const fetchFn = mockFetch([{ match: /\/v1\/chat\/completions$/, status: 200, body: {} }]);
|
||||
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
|
||||
await adapter.infer(makeCtx({ url: 'https://llm.example.com/v1' }));
|
||||
const [url1] = fetchFn.mock.calls[0] as [string];
|
||||
expect(url1).toBe('https://llm.example.com/v1/chat/completions');
|
||||
|
||||
// Trailing slash + /v1 should also normalize correctly.
|
||||
const fetchFn2 = mockFetch([{ match: /\/v1\/chat\/completions$/, status: 200, body: {} }]);
|
||||
const adapter2 = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn2 as unknown as typeof fetch });
|
||||
await adapter2.infer(makeCtx({ url: 'https://llm.example.com/v1/' }));
|
||||
const [url2] = fetchFn2.mock.calls[0] as [string];
|
||||
expect(url2).toBe('https://llm.example.com/v1/chat/completions');
|
||||
});
|
||||
|
||||
it('infer: preserves a trailing /v1beta suffix (only exact /v1 is stripped)', async () => {
|
||||
// Some providers expose `/v1beta` as a parallel API surface — don't
|
||||
// accidentally rewrite that to `/v1` or strip it.
|
||||
const fetchFn = mockFetch([{ match: /\/v1beta\/v1\/chat\/completions$/, status: 200, body: {} }]);
|
||||
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
|
||||
await adapter.infer(makeCtx({ url: 'https://api.example.com/v1beta' }));
|
||||
const [url] = fetchFn.mock.calls[0] as [string];
|
||||
expect(url).toBe('https://api.example.com/v1beta/v1/chat/completions');
|
||||
});
|
||||
|
||||
it('infer: omits Authorization when apiKey is empty', async () => {
|
||||
const fetchFn = mockFetch([{ match: /ollama/, status: 200, body: {} }]);
|
||||
const adapter = new OpenAiPassthroughAdapter('ollama', { fetch: fetchFn as unknown as typeof fetch });
|
||||
|
||||
376
src/mcpd/tests/virtual-agent-service.test.ts
Normal file
376
src/mcpd/tests/virtual-agent-service.test.ts
Normal file
@@ -0,0 +1,376 @@
|
||||
import { describe, it, expect, vi } from 'vitest';
|
||||
import { AgentService, type VirtualAgentInput } from '../src/services/agent.service.js';
|
||||
import { VirtualLlmService } from '../src/services/virtual-llm.service.js';
|
||||
import type { IAgentRepository } from '../src/repositories/agent.repository.js';
|
||||
import type { ILlmRepository } from '../src/repositories/llm.repository.js';
|
||||
import type { LlmService } from '../src/services/llm.service.js';
|
||||
import type { ProjectService } from '../src/services/project.service.js';
|
||||
import type { Agent, Llm } from '@prisma/client';
|
||||
|
||||
/**
|
||||
* v3 Stage 2 — virtual-agent lifecycle methods on AgentService and the
|
||||
* cascade callbacks wired into VirtualLlmService.gcSweep / heartbeat /
|
||||
* unbindSession. Mirrors the shape of virtual-llm-service.test.ts but
|
||||
* focused on the agent-side state machine + the Llm→Agent cascade.
|
||||
*/
|
||||
|
||||
const NOW = new Date();
|
||||
|
||||
function makeAgent(overrides: Partial<Agent> = {}): Agent {
|
||||
return {
|
||||
id: `agent-${Math.random().toString(36).slice(2, 8)}`,
|
||||
name: 'fake-agent',
|
||||
description: '',
|
||||
systemPrompt: '',
|
||||
llmId: 'llm-1',
|
||||
projectId: null,
|
||||
defaultPersonalityId: null,
|
||||
proxyModelName: null,
|
||||
defaultParams: {} as Agent['defaultParams'],
|
||||
extras: {} as Agent['extras'],
|
||||
kind: 'virtual',
|
||||
providerSessionId: 'sess-1',
|
||||
lastHeartbeatAt: NOW,
|
||||
status: 'active',
|
||||
inactiveSince: null,
|
||||
ownerId: 'owner-1',
|
||||
version: 1,
|
||||
createdAt: NOW,
|
||||
updatedAt: NOW,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function makeLlm(overrides: Partial<Llm> = {}): Llm {
|
||||
return {
|
||||
id: `llm-${Math.random().toString(36).slice(2, 8)}`,
|
||||
name: 'vllm-local',
|
||||
type: 'openai',
|
||||
model: 'm',
|
||||
url: '',
|
||||
tier: 'fast',
|
||||
description: '',
|
||||
apiKeySecretId: null,
|
||||
apiKeySecretKey: null,
|
||||
extraConfig: {} as Llm['extraConfig'],
|
||||
kind: 'virtual',
|
||||
providerSessionId: 'sess-1',
|
||||
lastHeartbeatAt: NOW,
|
||||
status: 'active',
|
||||
inactiveSince: null,
|
||||
version: 1,
|
||||
createdAt: NOW,
|
||||
updatedAt: NOW,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function mockAgentRepo(initial: Agent[] = []): IAgentRepository {
|
||||
const rows = new Map<string, Agent>(initial.map((r) => [r.id, r]));
|
||||
let counter = rows.size;
|
||||
return {
|
||||
findAll: vi.fn(async () => [...rows.values()]),
|
||||
findById: vi.fn(async (id: string) => rows.get(id) ?? null),
|
||||
findByName: vi.fn(async (name: string) => {
|
||||
for (const r of rows.values()) if (r.name === name) return r;
|
||||
return null;
|
||||
}),
|
||||
findByProjectId: vi.fn(async () => []),
|
||||
findBySessionId: vi.fn(async (sid: string) =>
|
||||
[...rows.values()].filter((r) => r.providerSessionId === sid)),
|
||||
findByLlmId: vi.fn(async (llmId: string) =>
|
||||
[...rows.values()].filter((r) => r.llmId === llmId)),
|
||||
findStaleVirtuals: vi.fn(async (cutoff: Date) =>
|
||||
[...rows.values()].filter((r) =>
|
||||
r.kind === 'virtual'
|
||||
&& r.status === 'active'
|
||||
&& r.lastHeartbeatAt !== null
|
||||
&& r.lastHeartbeatAt < cutoff)),
|
||||
findExpiredInactives: vi.fn(async (cutoff: Date) =>
|
||||
[...rows.values()].filter((r) =>
|
||||
r.kind === 'virtual'
|
||||
&& r.status === 'inactive'
|
||||
&& r.inactiveSince !== null
|
||||
&& r.inactiveSince < cutoff)),
|
||||
create: vi.fn(async (data) => {
|
||||
counter += 1;
|
||||
const row = makeAgent({
|
||||
id: `agent-${String(counter)}`,
|
||||
name: data.name,
|
||||
description: data.description ?? '',
|
||||
systemPrompt: data.systemPrompt ?? '',
|
||||
llmId: data.llmId,
|
||||
projectId: data.projectId ?? null,
|
||||
kind: data.kind ?? 'public',
|
||||
providerSessionId: data.providerSessionId ?? null,
|
||||
status: data.status ?? 'active',
|
||||
lastHeartbeatAt: data.lastHeartbeatAt ?? null,
|
||||
inactiveSince: data.inactiveSince ?? null,
|
||||
ownerId: data.ownerId,
|
||||
});
|
||||
rows.set(row.id, row);
|
||||
return row;
|
||||
}),
|
||||
update: vi.fn(async (id, data) => {
|
||||
const existing = rows.get(id);
|
||||
if (!existing) throw new Error('not found');
|
||||
const next: Agent = {
|
||||
...existing,
|
||||
...(data.description !== undefined ? { description: data.description } : {}),
|
||||
...(data.systemPrompt !== undefined ? { systemPrompt: data.systemPrompt } : {}),
|
||||
...(data.llmId !== undefined ? { llmId: data.llmId } : {}),
|
||||
...(data.projectId !== undefined ? { projectId: data.projectId } : {}),
|
||||
...(data.kind !== undefined ? { kind: data.kind } : {}),
|
||||
...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
|
||||
...(data.status !== undefined ? { status: data.status } : {}),
|
||||
...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
|
||||
...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
|
||||
};
|
||||
rows.set(id, next);
|
||||
return next;
|
||||
}),
|
||||
delete: vi.fn(async (id: string) => { rows.delete(id); }),
|
||||
};
|
||||
}
|
||||
|
||||
function mockLlms(): LlmService {
|
||||
return {
|
||||
getById: vi.fn(async (id: string) => ({ id, name: 'vllm-local', type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
|
||||
getByName: vi.fn(async (name: string) => ({ id: 'llm-1', name, type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
|
||||
} as unknown as LlmService;
|
||||
}
|
||||
|
||||
function mockProjects(): ProjectService {
|
||||
return {
|
||||
getById: vi.fn(async (id: string) => ({ id, name: 'mcpctl-dev' })),
|
||||
resolveAndGet: vi.fn(async (idOrName: string) => ({
|
||||
id: idOrName === 'mcpctl-dev' ? 'proj-1' : 'proj-other',
|
||||
name: idOrName,
|
||||
})),
|
||||
} as unknown as ProjectService;
|
||||
}
|
||||
|
||||
describe('AgentService — virtual-agent lifecycle (v3 Stage 2)', () => {
|
||||
it('registerVirtualAgents inserts new rows with kind=virtual / status=active', async () => {
|
||||
const repo = mockAgentRepo();
|
||||
const svc = new AgentService(repo, mockLlms(), mockProjects());
|
||||
const inputs: VirtualAgentInput[] = [
|
||||
{ name: 'local-coder', llmName: 'vllm-local', description: 'd', systemPrompt: 's' },
|
||||
];
|
||||
const out = await svc.registerVirtualAgents('sess-1', inputs, 'owner-1');
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0]!.kind).toBe('virtual');
|
||||
expect(out[0]!.status).toBe('active');
|
||||
});
|
||||
|
||||
it('registerVirtualAgents reuses an existing row from the same session (sticky reconnect)', async () => {
|
||||
const existing = makeAgent({ name: 'local-coder', providerSessionId: 'sess-1', status: 'inactive', inactiveSince: NOW });
|
||||
const repo = mockAgentRepo([existing]);
|
||||
const svc = new AgentService(repo, mockLlms(), mockProjects());
|
||||
const out = await svc.registerVirtualAgents(
|
||||
'sess-1',
|
||||
[{ name: 'local-coder', llmName: 'vllm-local' }],
|
||||
'owner-1',
|
||||
);
|
||||
expect(out[0]!.id).toBe(existing.id);
|
||||
expect(out[0]!.status).toBe('active');
|
||||
});
|
||||
|
||||
it('registerVirtualAgents adopts an inactive virtual from a different session', async () => {
|
||||
const existing = makeAgent({
|
||||
name: 'local-coder', providerSessionId: 'old-session',
|
||||
status: 'inactive', inactiveSince: NOW,
|
||||
});
|
||||
const repo = mockAgentRepo([existing]);
|
||||
const svc = new AgentService(repo, mockLlms(), mockProjects());
|
||||
const out = await svc.registerVirtualAgents(
|
||||
'new-session',
|
||||
[{ name: 'local-coder', llmName: 'vllm-local' }],
|
||||
'owner-1',
|
||||
);
|
||||
expect(out[0]!.id).toBe(existing.id);
|
||||
expect(out[0]!.status).toBe('active');
|
||||
});
|
||||
|
||||
it('registerVirtualAgents refuses to overwrite a public agent (409)', async () => {
|
||||
const repo = mockAgentRepo([makeAgent({ name: 'reviewer', kind: 'public', providerSessionId: null })]);
|
||||
const svc = new AgentService(repo, mockLlms(), mockProjects());
|
||||
await expect(svc.registerVirtualAgents(
|
||||
'sess-x',
|
||||
[{ name: 'reviewer', llmName: 'vllm-local' }],
|
||||
'owner-1',
|
||||
)).rejects.toThrow(/Cannot publish over public Agent/);
|
||||
});
|
||||
|
||||
it('registerVirtualAgents refuses if another active session owns the name', async () => {
|
||||
const repo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'other', status: 'active' })]);
|
||||
const svc = new AgentService(repo, mockLlms(), mockProjects());
|
||||
await expect(svc.registerVirtualAgents(
|
||||
'mine',
|
||||
[{ name: 'local-coder', llmName: 'vllm-local' }],
|
||||
'owner-1',
|
||||
)).rejects.toThrow(/already active under a different session/);
|
||||
});
|
||||
|
||||
it('heartbeatVirtualAgents bumps + revives inactive', async () => {
|
||||
const past = new Date(Date.now() - 5_000);
|
||||
const a = makeAgent({ name: 'a', providerSessionId: 'sess', status: 'inactive', lastHeartbeatAt: past, inactiveSince: past });
|
||||
const repo = mockAgentRepo([a]);
|
||||
const svc = new AgentService(repo, mockLlms(), mockProjects());
|
||||
await svc.heartbeatVirtualAgents('sess');
|
||||
const row = await repo.findByName('a');
|
||||
expect(row?.status).toBe('active');
|
||||
expect(row?.inactiveSince).toBeNull();
|
||||
expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
|
||||
});
|
||||
|
||||
it('markVirtualAgentsInactiveBySession flips owned actives to inactive', async () => {
|
||||
const repo = mockAgentRepo([
|
||||
makeAgent({ name: 'a', providerSessionId: 'sess' }),
|
||||
makeAgent({ name: 'b', providerSessionId: 'sess' }),
|
||||
makeAgent({ name: 'c', providerSessionId: 'other' }),
|
||||
]);
|
||||
const svc = new AgentService(repo, mockLlms(), mockProjects());
|
||||
await svc.markVirtualAgentsInactiveBySession('sess');
|
||||
expect((await repo.findByName('a'))?.status).toBe('inactive');
|
||||
expect((await repo.findByName('b'))?.status).toBe('inactive');
|
||||
expect((await repo.findByName('c'))?.status).toBe('active');
|
||||
});
|
||||
|
||||
it('deleteVirtualAgentsForLlm deletes only virtuals pinned to that Llm', async () => {
|
||||
const repo = mockAgentRepo([
|
||||
makeAgent({ name: 'v-1', llmId: 'doomed', kind: 'virtual' }),
|
||||
makeAgent({ name: 'v-2', llmId: 'doomed', kind: 'virtual' }),
|
||||
makeAgent({ name: 'pub-1', llmId: 'doomed', kind: 'public', providerSessionId: null }),
|
||||
makeAgent({ name: 'v-other', llmId: 'safe', kind: 'virtual' }),
|
||||
]);
|
||||
const svc = new AgentService(repo, mockLlms(), mockProjects());
|
||||
const deleted = await svc.deleteVirtualAgentsForLlm('doomed');
|
||||
expect(deleted).toBe(2);
|
||||
expect(await repo.findByName('v-1')).toBeNull();
|
||||
expect(await repo.findByName('v-2')).toBeNull();
|
||||
expect(await repo.findByName('pub-1')).not.toBeNull();
|
||||
expect(await repo.findByName('v-other')).not.toBeNull();
|
||||
});
|
||||
|
||||
it('gcSweepVirtualAgents flips heartbeat-stale + deletes 4h-old inactive', async () => {
|
||||
const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago, past 90s cutoff
|
||||
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago, past 4h cutoff
|
||||
const repo = mockAgentRepo([
|
||||
makeAgent({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }),
|
||||
makeAgent({ name: 'old', providerSessionId: 'b', status: 'inactive', inactiveSince: ancient }),
|
||||
makeAgent({ name: 'pub', providerSessionId: null, kind: 'public' }),
|
||||
]);
|
||||
const svc = new AgentService(repo, mockLlms(), mockProjects());
|
||||
const r = await svc.gcSweepVirtualAgents();
|
||||
expect(r.markedInactive).toBe(1);
|
||||
expect(r.deleted).toBe(1);
|
||||
expect((await repo.findByName('stale'))?.status).toBe('inactive');
|
||||
expect(await repo.findByName('old')).toBeNull();
|
||||
expect(await repo.findByName('pub')).not.toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('VirtualLlmService cascade through AgentService (v3 Stage 2)', () => {
|
||||
function mockLlmRepo(initial: Llm[] = []): ILlmRepository {
|
||||
const rows = new Map<string, Llm>(initial.map((r) => [r.id, r]));
|
||||
let counter = rows.size;
|
||||
return {
|
||||
findAll: vi.fn(async () => [...rows.values()]),
|
||||
findById: vi.fn(async (id: string) => rows.get(id) ?? null),
|
||||
findByName: vi.fn(async (name: string) => {
|
||||
for (const r of rows.values()) if (r.name === name) return r;
|
||||
return null;
|
||||
}),
|
||||
findByTier: vi.fn(async () => []),
|
||||
findBySessionId: vi.fn(async (sid: string) =>
|
||||
[...rows.values()].filter((r) => r.providerSessionId === sid)),
|
||||
findStaleVirtuals: vi.fn(async (cutoff: Date) =>
|
||||
[...rows.values()].filter((r) =>
|
||||
r.kind === 'virtual'
|
||||
&& r.status === 'active'
|
||||
&& r.lastHeartbeatAt !== null
|
||||
&& r.lastHeartbeatAt < cutoff)),
|
||||
findExpiredInactives: vi.fn(async (cutoff: Date) =>
|
||||
[...rows.values()].filter((r) =>
|
||||
r.kind === 'virtual'
|
||||
&& r.status === 'inactive'
|
||||
&& r.inactiveSince !== null
|
||||
&& r.inactiveSince < cutoff)),
|
||||
create: vi.fn(async (data) => {
|
||||
counter += 1;
|
||||
const row = makeLlm({ id: `llm-${String(counter)}`, name: data.name, type: data.type });
|
||||
rows.set(row.id, row);
|
||||
return row;
|
||||
}),
|
||||
update: vi.fn(async (id, data) => {
|
||||
const existing = rows.get(id);
|
||||
if (!existing) throw new Error('not found');
|
||||
const next: Llm = { ...existing, ...data } as Llm;
|
||||
rows.set(id, next);
|
||||
return next;
|
||||
}),
|
||||
delete: vi.fn(async (id: string) => { rows.delete(id); }),
|
||||
};
|
||||
}
|
||||
|
||||
it('unbindSession cascades to mark virtual agents inactive', async () => {
|
||||
const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
|
||||
const agentRepo = mockAgentRepo([
|
||||
makeAgent({ name: 'local-coder', providerSessionId: 'sess' }),
|
||||
]);
|
||||
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
|
||||
const svc = new VirtualLlmService(llmRepo, agents);
|
||||
await svc.unbindSession('sess');
|
||||
expect((await agentRepo.findByName('local-coder'))?.status).toBe('inactive');
|
||||
});
|
||||
|
||||
it('gcSweep deletes virtual agents BEFORE their pinned virtual Llm', async () => {
|
||||
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
|
||||
const llmRepo = mockLlmRepo([makeLlm({
|
||||
id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
|
||||
status: 'inactive', inactiveSince: ancient,
|
||||
})]);
|
||||
const agentRepo = mockAgentRepo([
|
||||
makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: ancient }),
|
||||
]);
|
||||
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
|
||||
const svc = new VirtualLlmService(llmRepo, agents);
|
||||
const r = await svc.gcSweep();
|
||||
expect(r.deleted).toBeGreaterThanOrEqual(2); // 1 agent + 1 llm
|
||||
expect(await llmRepo.findByName('vllm-local')).toBeNull();
|
||||
expect(await agentRepo.findByName('pinned')).toBeNull();
|
||||
});
|
||||
|
||||
it('gcSweep defensive cascade: still drops the agent when its heartbeat lagged the Llm', async () => {
|
||||
// The Llm is past the 4h cutoff. The agent is inactive but only
|
||||
// 1h old — wouldn't be GC'd by gcSweepVirtualAgents on its own.
|
||||
// The defensive cascade in gcSweep deletes it anyway because the
|
||||
// Restrict FK would otherwise block the Llm delete.
|
||||
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
|
||||
const recent = new Date(Date.now() - 1 * 60 * 60 * 1000);
|
||||
const llmRepo = mockLlmRepo([makeLlm({
|
||||
id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
|
||||
status: 'inactive', inactiveSince: ancient,
|
||||
})]);
|
||||
const agentRepo = mockAgentRepo([
|
||||
makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: recent }),
|
||||
]);
|
||||
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
|
||||
const svc = new VirtualLlmService(llmRepo, agents);
|
||||
await svc.gcSweep();
|
||||
expect(await llmRepo.findByName('vllm-local')).toBeNull();
|
||||
expect(await agentRepo.findByName('pinned')).toBeNull();
|
||||
});
|
||||
|
||||
it('heartbeat cascades to bump owned virtual agents', async () => {
|
||||
const past = new Date(Date.now() - 10_000);
|
||||
const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', lastHeartbeatAt: past })]);
|
||||
const agentRepo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'sess', lastHeartbeatAt: past })]);
|
||||
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
|
||||
const svc = new VirtualLlmService(llmRepo, agents);
|
||||
await svc.heartbeat('sess');
|
||||
const a = await agentRepo.findByName('local-coder');
|
||||
expect(a!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
|
||||
});
|
||||
});
|
||||
@@ -332,6 +332,108 @@ describe('VirtualLlmService', () => {
|
||||
expect(await repo.findByName('public-survivor')).not.toBeNull();
|
||||
});
|
||||
|
||||
// ── v2: wake-before-infer ──
|
||||
|
||||
it('hibernating: dispatches a wake task first and waits for it to complete before infer', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const session = fakeSession();
|
||||
svc.bindSession('sess', session);
|
||||
|
||||
// Kick off enqueueInferTask. It blocks on the wake task.
|
||||
const inferPromise = svc.enqueueInferTask(
|
||||
'sleeping',
|
||||
{ model: 'm', messages: [{ role: 'user', content: 'hi' }] },
|
||||
false,
|
||||
);
|
||||
|
||||
// Wait a tick so the wake task gets pushed.
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
expect(session.tasks).toHaveLength(1);
|
||||
const wakeTask = session.tasks[0] as { kind: string; taskId: string; llmName: string };
|
||||
expect(wakeTask.kind).toBe('wake');
|
||||
expect(wakeTask.llmName).toBe('sleeping');
|
||||
|
||||
// Resolve the wake task — service flips the row to active, then
|
||||
// pushes the infer task on the same session.
|
||||
expect(svc.completeTask(wakeTask.taskId, { status: 200, body: { ok: true } })).toBe(true);
|
||||
const ref = await inferPromise;
|
||||
expect(session.tasks).toHaveLength(2);
|
||||
const inferTask = session.tasks[1] as { kind: string; taskId: string };
|
||||
expect(inferTask.kind).toBe('infer');
|
||||
expect(inferTask.taskId).toBe(ref.taskId);
|
||||
|
||||
// The row should be active now — concurrent callers won't trigger another wake.
|
||||
const row = await repo.findByName('sleeping');
|
||||
expect(row?.status).toBe('active');
|
||||
});
|
||||
|
||||
it('hibernating: concurrent infer requests share a single wake task', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
const session = fakeSession();
|
||||
svc.bindSession('sess', session);
|
||||
|
||||
// Fire 3 concurrent infer requests against the same hibernating LLM.
|
||||
const reqs = [
|
||||
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
|
||||
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
|
||||
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
|
||||
];
|
||||
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
// Exactly one wake task pushed, regardless of concurrent infers.
|
||||
const wakeTasks = (session.tasks as Array<{ kind: string }>).filter((t) => t.kind === 'wake');
|
||||
expect(wakeTasks).toHaveLength(1);
|
||||
|
||||
const wakeTaskId = (session.tasks[0] as { taskId: string }).taskId;
|
||||
expect(svc.completeTask(wakeTaskId, { status: 200, body: { ok: true } })).toBe(true);
|
||||
|
||||
const refs = await Promise.all(reqs);
|
||||
// After wake, all 3 infer tasks pushed — total session tasks = 1 wake + 3 infer.
|
||||
const inferTasks = (session.tasks as Array<{ kind: string }>).filter((t) => t.kind === 'infer');
|
||||
expect(inferTasks).toHaveLength(3);
|
||||
expect(refs.map((r) => r.taskId).sort()).toEqual(refs.map((r) => r.taskId).sort());
|
||||
});
|
||||
|
||||
it('hibernating: rejects when the wake task fails', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'broken', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
svc.bindSession('sess', fakeSession());
|
||||
|
||||
const inferPromise = svc.enqueueInferTask(
|
||||
'broken',
|
||||
{ model: 'm', messages: [] },
|
||||
false,
|
||||
);
|
||||
await new Promise((r) => setTimeout(r, 0));
|
||||
|
||||
// Get the wake task id from the in-flight tasks map (its only entry).
|
||||
// We test the failure path via failTask which is part of the public
|
||||
// surface used by the result-POST route handler.
|
||||
const taskIds: string[] = [];
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
for (const id of (svc as any).tasksById.keys()) taskIds.push(id);
|
||||
expect(taskIds).toHaveLength(1);
|
||||
expect(svc.failTask(taskIds[0]!, new Error('wake recipe failed'))).toBe(true);
|
||||
|
||||
await expect(inferPromise).rejects.toThrow(/wake recipe failed/);
|
||||
|
||||
// Row stayed hibernating — the next request will get another wake try.
|
||||
const row = await repo.findByName('broken');
|
||||
expect(row?.status).toBe('hibernating');
|
||||
});
|
||||
|
||||
it('inactive: still rejects with 503 (publisher offline) — wake path only fires for hibernating', async () => {
|
||||
const repo = mockRepo([makeLlm({ name: 'gone', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]);
|
||||
const svc = new VirtualLlmService(repo);
|
||||
svc.bindSession('sess', fakeSession());
|
||||
|
||||
await expect(
|
||||
svc.enqueueInferTask('gone', { model: 'm', messages: [] }, false),
|
||||
).rejects.toThrow(/inactive|publisher offline/);
|
||||
});
|
||||
|
||||
it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => {
|
||||
const long = new Date(Date.now() - 5 * 60 * 1000);
|
||||
const repo = mockRepo([
|
||||
|
||||
@@ -80,8 +80,25 @@ export interface LlmProviderFileEntry {
|
||||
* Default: false — existing setups don't change behavior.
|
||||
*/
|
||||
publish?: boolean;
|
||||
/**
|
||||
* Optional wake recipe for hibernating backends. When set, a provider
|
||||
* whose `isAvailable()` returns false at registrar start time is
|
||||
* published as `status=hibernating`. The next inference request that
|
||||
* lands on the row triggers this recipe; mcplocal polls
|
||||
* `isAvailable()` until it returns true (or times out) and then flips
|
||||
* the row to active so mcpd can dispatch the queued inference.
|
||||
*
|
||||
* Two recipe types:
|
||||
* - `http`: POST to a URL (e.g. an external sleep/wake controller)
|
||||
* - `command`: spawn a shell command (e.g. `systemctl --user start vllm`)
|
||||
*/
|
||||
wake?: WakeRecipe;
|
||||
}
|
||||
|
||||
export type WakeRecipe =
|
||||
| { type: 'http'; url: string; method?: 'GET' | 'POST'; headers?: Record<string, string>; body?: string; maxWaitSeconds?: number }
|
||||
| { type: 'command'; command: string; args?: string[]; maxWaitSeconds?: number };
|
||||
|
||||
export interface ProjectLlmOverride {
|
||||
model?: string;
|
||||
provider?: string;
|
||||
@@ -91,8 +108,27 @@ interface LlmMultiFileConfig {
|
||||
providers: LlmProviderFileEntry[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Local agent declaration (v3). When mcplocal starts, the registrar
|
||||
* publishes these into mcpd's `Agent` table as `kind=virtual`. They show
|
||||
* up under `mcpctl get agent` and become chat-able via `mcpctl chat <name>`.
|
||||
*
|
||||
* `llm` references a published provider's name from the `llm.providers`
|
||||
* array — the registrar resolves it server-side.
|
||||
*/
|
||||
export interface AgentFileEntry {
|
||||
name: string;
|
||||
llm: string;
|
||||
description?: string;
|
||||
systemPrompt?: string;
|
||||
project?: string;
|
||||
defaultParams?: Record<string, unknown>;
|
||||
extras?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
interface McpctlConfig {
|
||||
llm?: LlmFileConfig | LlmMultiFileConfig;
|
||||
agents?: AgentFileEntry[];
|
||||
projects?: Record<string, { llm?: ProjectLlmOverride }>;
|
||||
}
|
||||
|
||||
@@ -173,6 +209,15 @@ export function loadProjectLlmOverride(projectName: string): ProjectLlmOverride
|
||||
return config.projects?.[projectName]?.llm;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load locally-declared agents from ~/.mcpctl/config.json (v3 virtual
|
||||
* agents). Returns empty array if no agents block is configured.
|
||||
*/
|
||||
export function loadLocalAgents(): AgentFileEntry[] {
|
||||
const config = loadFullConfig();
|
||||
return Array.isArray(config.agents) ? config.agents : [];
|
||||
}
|
||||
|
||||
/** Reset cached config (for testing). */
|
||||
export function resetConfigCache(): void {
|
||||
cachedConfig = null;
|
||||
|
||||
@@ -7,12 +7,12 @@ import { StdioProxyServer } from './server.js';
|
||||
import { StdioUpstream } from './upstream/stdio.js';
|
||||
import { HttpUpstream } from './upstream/http.js';
|
||||
import { createHttpServer } from './http/server.js';
|
||||
import { loadHttpConfig, loadLlmProviders } from './http/config.js';
|
||||
import type { HttpConfig, LlmProviderFileEntry } from './http/config.js';
|
||||
import { loadHttpConfig, loadLlmProviders, loadLocalAgents } from './http/config.js';
|
||||
import type { HttpConfig, LlmProviderFileEntry, AgentFileEntry } from './http/config.js';
|
||||
import { createProvidersFromConfig } from './llm-config.js';
|
||||
import { createSecretStore } from '@mcpctl/shared';
|
||||
import type { ProviderRegistry } from './providers/registry.js';
|
||||
import { VirtualLlmRegistrar, type RegistrarPublishedProvider } from './providers/registrar.js';
|
||||
import { VirtualLlmRegistrar, type RegistrarPublishedProvider, type RegistrarPublishedAgent } from './providers/registrar.js';
|
||||
import { startWatchers, stopWatchers, reloadStages } from './proxymodel/watcher.js';
|
||||
import { existsSync, readFileSync as readFileSyncNs } from 'node:fs';
|
||||
import { homedir } from 'node:os';
|
||||
@@ -151,7 +151,8 @@ export async function main(argv: string[] = process.argv): Promise<MainResult> {
|
||||
// Virtual-LLM registrar: publish opted-in providers (`publish: true`)
|
||||
// into mcpd's Llm registry. Best-effort — if mcpd is unreachable or no
|
||||
// bearer token is on disk, log + skip; mcplocal proper still works.
|
||||
const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries);
|
||||
const localAgents = loadLocalAgents();
|
||||
const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries, localAgents);
|
||||
|
||||
// Graceful shutdown
|
||||
let shuttingDown = false;
|
||||
@@ -198,9 +199,10 @@ if (isMain) {
|
||||
async function maybeStartVirtualLlmRegistrar(
|
||||
providerRegistry: ProviderRegistry,
|
||||
llmEntries: LlmProviderFileEntry[],
|
||||
localAgents: AgentFileEntry[] = [],
|
||||
): Promise<VirtualLlmRegistrar | null> {
|
||||
const opted = llmEntries.filter((e) => e.publish === true);
|
||||
if (opted.length === 0) return null;
|
||||
if (opted.length === 0 && localAgents.length === 0) return null;
|
||||
|
||||
const published: RegistrarPublishedProvider[] = [];
|
||||
for (const entry of opted) {
|
||||
@@ -215,9 +217,32 @@ async function maybeStartVirtualLlmRegistrar(
|
||||
model: entry.model ?? entry.name,
|
||||
};
|
||||
if (entry.tier !== undefined) item.tier = entry.tier;
|
||||
if (entry.wake !== undefined) item.wake = entry.wake;
|
||||
published.push(item);
|
||||
}
|
||||
if (published.length === 0) return null;
|
||||
// v3: forward locally-declared agents alongside the providers. We
|
||||
// only forward agents whose `llm` field points at a name we're
|
||||
// actually publishing (or pre-declared). Stale entries are dropped
|
||||
// with a warning rather than failing the whole registration.
|
||||
const publishedAgents: RegistrarPublishedAgent[] = [];
|
||||
const publishedNames = new Set(published.map((p) => p.provider.name));
|
||||
for (const a of localAgents) {
|
||||
if (!publishedNames.has(a.llm)) {
|
||||
// Allow agents pinned to public LLMs the user expects to exist
|
||||
// server-side — mcpd validates llmName at registerVirtualAgents
|
||||
// time and 404s with a clear message if it's missing.
|
||||
// We don't drop these client-side; just note it.
|
||||
}
|
||||
const item: RegistrarPublishedAgent = { name: a.name, llmName: a.llm };
|
||||
if (a.description !== undefined) item.description = a.description;
|
||||
if (a.systemPrompt !== undefined) item.systemPrompt = a.systemPrompt;
|
||||
if (a.project !== undefined) item.project = a.project;
|
||||
if (a.defaultParams !== undefined) item.defaultParams = a.defaultParams;
|
||||
if (a.extras !== undefined) item.extras = a.extras;
|
||||
publishedAgents.push(item);
|
||||
}
|
||||
|
||||
if (published.length === 0 && publishedAgents.length === 0) return null;
|
||||
|
||||
// Resolve mcpd URL + bearer. Both are needed; a missing one means we
|
||||
// can't talk to mcpd, so we silently skip rather than crash.
|
||||
@@ -245,6 +270,7 @@ async function maybeStartVirtualLlmRegistrar(
|
||||
mcpdUrl,
|
||||
token,
|
||||
publishedProviders: published,
|
||||
...(publishedAgents.length > 0 ? { publishedAgents } : {}),
|
||||
sessionFilePath: join(homedir(), '.mcpctl', 'provider-session'),
|
||||
log: {
|
||||
info: (msg) => process.stderr.write(`${msg}\n`),
|
||||
|
||||
@@ -27,7 +27,9 @@ import http from 'node:http';
|
||||
import https from 'node:https';
|
||||
import { promises as fs } from 'node:fs';
|
||||
import { dirname } from 'node:path';
|
||||
import { spawn } from 'node:child_process';
|
||||
import type { LlmProvider, CompletionOptions } from './types.js';
|
||||
import type { WakeRecipe } from '../http/config.js';
|
||||
|
||||
export interface RegistrarLogger {
|
||||
info: (msg: string) => void;
|
||||
@@ -45,12 +47,37 @@ export interface RegistrarPublishedProvider {
|
||||
tier?: 'fast' | 'heavy';
|
||||
/** Optional human-readable description for `mcpctl get llm`. */
|
||||
description?: string;
|
||||
/**
|
||||
* Optional wake recipe for backends that hibernate. When provided AND
|
||||
* `provider.isAvailable()` returns false at registrar start, the row is
|
||||
* published with status=hibernating; on the first incoming `wake` task
|
||||
* the registrar runs this recipe and waits for the backend to come up.
|
||||
*/
|
||||
wake?: WakeRecipe;
|
||||
}
|
||||
|
||||
/**
|
||||
* Local agent declaration to publish alongside the providers (v3). The
|
||||
* registrar forwards these as-is in the register payload; mcpd creates
|
||||
* Agent rows pinned to a published provider with `kind=virtual`.
|
||||
*/
|
||||
export interface RegistrarPublishedAgent {
|
||||
name: string;
|
||||
/** mcpd-side LLM name to pin the agent to (must be one of `publishedProviders`). */
|
||||
llmName: string;
|
||||
description?: string;
|
||||
systemPrompt?: string;
|
||||
project?: string;
|
||||
defaultParams?: Record<string, unknown>;
|
||||
extras?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface RegistrarOptions {
|
||||
mcpdUrl: string;
|
||||
token: string;
|
||||
publishedProviders: RegistrarPublishedProvider[];
|
||||
/** Optional v3 — local agents to publish alongside the providers. */
|
||||
publishedAgents?: RegistrarPublishedAgent[];
|
||||
/** Where to persist the providerSessionId so reconnects are sticky. */
|
||||
sessionFilePath: string;
|
||||
log: RegistrarLogger;
|
||||
@@ -140,16 +167,43 @@ export class VirtualLlmRegistrar {
|
||||
}
|
||||
|
||||
private async register(): Promise<void> {
|
||||
const body: Record<string, unknown> = {
|
||||
providers: this.opts.publishedProviders.map((p) => ({
|
||||
// Decide initial status per provider. A provider with a wake recipe
|
||||
// that's NOT currently available comes up as hibernating; otherwise
|
||||
// active (today's behavior). isAvailable() is forgiving — any
|
||||
// unexpected throw is treated as "not available" so a transient
|
||||
// network blip during boot doesn't crash the registrar.
|
||||
const providers = await Promise.all(this.opts.publishedProviders.map(async (p) => {
|
||||
let initialStatus: 'active' | 'hibernating' = 'active';
|
||||
if (p.wake !== undefined) {
|
||||
let alive = false;
|
||||
try { alive = await p.provider.isAvailable(); } catch { alive = false; }
|
||||
if (!alive) initialStatus = 'hibernating';
|
||||
}
|
||||
return {
|
||||
name: p.provider.name,
|
||||
type: p.type,
|
||||
model: p.model,
|
||||
...(p.tier !== undefined ? { tier: p.tier } : {}),
|
||||
...(p.description !== undefined ? { description: p.description } : {}),
|
||||
})),
|
||||
};
|
||||
initialStatus,
|
||||
};
|
||||
}));
|
||||
const body: Record<string, unknown> = { providers };
|
||||
if (this.sessionId !== null) body['providerSessionId'] = this.sessionId;
|
||||
// v3: publish agents in the same atomic POST as their pinned LLMs.
|
||||
// Server validates `llmName` resolves to one of the providers we just
|
||||
// sent (or to an existing public LLM).
|
||||
if (this.opts.publishedAgents !== undefined && this.opts.publishedAgents.length > 0) {
|
||||
body['agents'] = this.opts.publishedAgents.map((a) => ({
|
||||
name: a.name,
|
||||
llmName: a.llmName,
|
||||
...(a.description !== undefined ? { description: a.description } : {}),
|
||||
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
|
||||
...(a.project !== undefined ? { project: a.project } : {}),
|
||||
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
|
||||
...(a.extras !== undefined ? { extras: a.extras } : {}),
|
||||
}));
|
||||
}
|
||||
|
||||
const res = await postJson(
|
||||
this.urlFor('/api/v1/llms/_provider-register'),
|
||||
@@ -276,9 +330,51 @@ export class VirtualLlmRegistrar {
|
||||
void this.handleInferTask(task);
|
||||
return;
|
||||
}
|
||||
// Wake tasks are reserved for v2 — acknowledge with an error so mcpd
|
||||
// surfaces a clean failure rather than waiting forever.
|
||||
void this.postResult(task.taskId, { error: 'wake task type not implemented in this client (v2)' });
|
||||
if (task.kind === 'wake') {
|
||||
void this.handleWakeTask(task);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the configured wake recipe and poll the provider until it comes
|
||||
* up. Sends a `{ status: 200, body: { ok: true } }` result on success;
|
||||
* `{ error }` on timeout or recipe failure. While waiting, also bumps
|
||||
* the heartbeat so mcpd's GC sweep doesn't decide we're stale mid-wake.
|
||||
*/
|
||||
private async handleWakeTask(task: { kind: 'wake'; taskId: string; llmName: string }): Promise<void> {
|
||||
const published = this.opts.publishedProviders.find((p) => p.provider.name === task.llmName);
|
||||
if (published === undefined) {
|
||||
await this.postResult(task.taskId, { error: `provider '${task.llmName}' not registered locally` });
|
||||
return;
|
||||
}
|
||||
if (published.wake === undefined) {
|
||||
await this.postResult(task.taskId, { error: `provider '${task.llmName}' has no wake recipe configured` });
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await runWakeRecipe(published.wake);
|
||||
// Poll isAvailable() until it comes up (or timeout). Heartbeat
|
||||
// every poll tick so mcpd doesn't time us out while we're waiting
|
||||
// on a slow boot.
|
||||
const maxWaitMs = (published.wake.maxWaitSeconds ?? 60) * 1000;
|
||||
const started = Date.now();
|
||||
while (Date.now() - started < maxWaitMs) {
|
||||
let alive = false;
|
||||
try { alive = await published.provider.isAvailable(); } catch { alive = false; }
|
||||
if (alive) {
|
||||
await this.heartbeatOnce();
|
||||
await this.postResult(task.taskId, { status: 200, body: { ok: true, ms: Date.now() - started } });
|
||||
return;
|
||||
}
|
||||
await this.heartbeatOnce();
|
||||
await new Promise((r) => setTimeout(r, 1500));
|
||||
}
|
||||
await this.postResult(task.taskId, { error: `provider '${task.llmName}' did not come up within ${String(maxWaitMs)}ms` });
|
||||
} catch (err) {
|
||||
await this.postResult(task.taskId, { error: `wake recipe failed: ${(err as Error).message}` });
|
||||
}
|
||||
}
|
||||
|
||||
private async handleInferTask(task: InferTask): Promise<void> {
|
||||
@@ -373,6 +469,68 @@ function openAiStreamChunk(
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a wake recipe. Returns when the recipe completes; throws if it
|
||||
* fails. Doesn't itself poll for provider readiness — that's the caller's
|
||||
* job (handleWakeTask polls isAvailable() with its own timeout).
|
||||
*
|
||||
* `http`: fires the configured request and considers any 2xx a success.
|
||||
* The remote service is expected to be a "wake controller" that returns
|
||||
* quickly; if the underlying boot takes minutes, the controller should
|
||||
* return 202 and the readiness poll catches up.
|
||||
*
|
||||
* `command`: spawns the binary with args, waits for exit. Non-zero exit
|
||||
* is treated as failure. stdout/stderr are discarded — the recipe's job
|
||||
* is to *trigger* a wake, not to produce output.
|
||||
*/
|
||||
async function runWakeRecipe(recipe: WakeRecipe): Promise<void> {
|
||||
if (recipe.type === 'http') {
|
||||
const u = new URL(recipe.url);
|
||||
const driver = u.protocol === 'https:' ? https : http;
|
||||
const method = recipe.method ?? 'POST';
|
||||
const headers: Record<string, string> = { ...(recipe.headers ?? {}) };
|
||||
const body = recipe.body;
|
||||
if (body !== undefined) {
|
||||
headers['Content-Length'] = String(Buffer.byteLength(body));
|
||||
}
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const req = driver.request({
|
||||
hostname: u.hostname,
|
||||
port: u.port || (u.protocol === 'https:' ? 443 : 80),
|
||||
path: u.pathname + u.search,
|
||||
method,
|
||||
headers,
|
||||
timeout: 30_000,
|
||||
}, (res) => {
|
||||
const status = res.statusCode ?? 0;
|
||||
// Drain so the socket can be reused/freed.
|
||||
res.resume();
|
||||
if (status >= 200 && status < 300) resolve();
|
||||
else reject(new Error(`wake HTTP returned ${String(status)}`));
|
||||
});
|
||||
req.on('error', reject);
|
||||
req.on('timeout', () => { req.destroy(); reject(new Error('wake HTTP timed out')); });
|
||||
if (body !== undefined) req.write(body);
|
||||
req.end();
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (recipe.type === 'command') {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const child = spawn(recipe.command, recipe.args ?? [], {
|
||||
stdio: 'ignore',
|
||||
});
|
||||
child.on('error', reject);
|
||||
child.on('exit', (code) => {
|
||||
if (code === 0) resolve();
|
||||
else reject(new Error(`wake command exited with code ${String(code)}`));
|
||||
});
|
||||
});
|
||||
return;
|
||||
}
|
||||
throw new Error(`unknown wake recipe type`);
|
||||
}
|
||||
|
||||
interface PostResponse { statusCode: number; body: string }
|
||||
|
||||
/** Tiny JSON POST helper used by all of the registrar's mcpd calls. */
|
||||
|
||||
@@ -142,13 +142,17 @@ describe('VirtualLlmRegistrar', () => {
|
||||
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
|
||||
expect(registerCall).toBeDefined();
|
||||
expect(registerCall!.method).toBe('POST');
|
||||
const body = JSON.parse(registerCall!.body) as { providers: Array<{ name: string; type: string; model: string; tier: string }> };
|
||||
expect(body.providers).toEqual([{
|
||||
const body = JSON.parse(registerCall!.body) as { providers: Array<Record<string, unknown>> };
|
||||
expect(body.providers).toHaveLength(1);
|
||||
expect(body.providers[0]).toMatchObject({
|
||||
name: 'vllm-local',
|
||||
type: 'openai',
|
||||
model: 'qwen',
|
||||
tier: 'fast',
|
||||
}]);
|
||||
// v2 always sends initialStatus; defaults to 'active' when no
|
||||
// wake recipe is configured.
|
||||
initialStatus: 'active',
|
||||
});
|
||||
expect(registerCall!.headers['authorization']).toBe('Bearer tok-abc');
|
||||
|
||||
// Sticky session id persisted.
|
||||
@@ -219,6 +223,107 @@ describe('VirtualLlmRegistrar', () => {
|
||||
}
|
||||
});
|
||||
|
||||
// ── v2: hibernating + wake recipe ──
|
||||
|
||||
it('publishes initialStatus=hibernating when provider is unavailable AND wake is configured', async () => {
|
||||
const fake = await startFakeServer();
|
||||
try {
|
||||
const sleeping: LlmProvider = {
|
||||
name: 'vllm-local',
|
||||
async complete() { throw new Error('not running'); },
|
||||
async listModels() { return []; },
|
||||
async isAvailable() { return false; },
|
||||
};
|
||||
const registrar = new VirtualLlmRegistrar({
|
||||
mcpdUrl: fake.url,
|
||||
token: 't',
|
||||
publishedProviders: [{
|
||||
provider: sleeping,
|
||||
type: 'openai',
|
||||
model: 'm',
|
||||
wake: { type: 'http', url: 'http://localhost:9999/wake', maxWaitSeconds: 1 },
|
||||
}],
|
||||
sessionFilePath: join(tempDir, 'provider-session'),
|
||||
log: silentLog(),
|
||||
heartbeatIntervalMs: 60_000,
|
||||
});
|
||||
await registrar.start();
|
||||
await new Promise((r) => setTimeout(r, 20));
|
||||
|
||||
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
|
||||
const body = JSON.parse(registerCall!.body) as { providers: Array<{ initialStatus?: string }> };
|
||||
expect(body.providers[0]!.initialStatus).toBe('hibernating');
|
||||
registrar.stop();
|
||||
} finally {
|
||||
await fake.close();
|
||||
}
|
||||
});
|
||||
|
||||
it('publishes initialStatus=active when provider is available even with a wake recipe', async () => {
|
||||
const fake = await startFakeServer();
|
||||
try {
|
||||
const awake: LlmProvider = {
|
||||
name: 'vllm-local',
|
||||
async complete() { throw new Error('not used'); },
|
||||
async listModels() { return []; },
|
||||
async isAvailable() { return true; },
|
||||
};
|
||||
const registrar = new VirtualLlmRegistrar({
|
||||
mcpdUrl: fake.url,
|
||||
token: 't',
|
||||
publishedProviders: [{
|
||||
provider: awake,
|
||||
type: 'openai',
|
||||
model: 'm',
|
||||
wake: { type: 'http', url: 'http://localhost:9999/wake' },
|
||||
}],
|
||||
sessionFilePath: join(tempDir, 'provider-session'),
|
||||
log: silentLog(),
|
||||
heartbeatIntervalMs: 60_000,
|
||||
});
|
||||
await registrar.start();
|
||||
await new Promise((r) => setTimeout(r, 20));
|
||||
|
||||
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
|
||||
const body = JSON.parse(registerCall!.body) as { providers: Array<{ initialStatus?: string }> };
|
||||
expect(body.providers[0]!.initialStatus).toBe('active');
|
||||
registrar.stop();
|
||||
} finally {
|
||||
await fake.close();
|
||||
}
|
||||
});
|
||||
|
||||
it('publishes initialStatus=active when no wake recipe is configured (legacy path)', async () => {
|
||||
const fake = await startFakeServer();
|
||||
try {
|
||||
// Provider intentionally returns false but has no wake recipe →
|
||||
// legacy v1 publishers don't get hibernation behavior.
|
||||
const sleeping: LlmProvider = {
|
||||
name: 'vllm-local',
|
||||
async complete() { return { content: '', toolCalls: [], usage: { promptTokens: 0, completionTokens: 0, totalTokens: 0 }, finishReason: 'stop' }; },
|
||||
async listModels() { return []; },
|
||||
async isAvailable() { return false; },
|
||||
};
|
||||
const registrar = new VirtualLlmRegistrar({
|
||||
mcpdUrl: fake.url,
|
||||
token: 't',
|
||||
publishedProviders: [{ provider: sleeping, type: 'openai', model: 'm' }],
|
||||
sessionFilePath: join(tempDir, 'provider-session'),
|
||||
log: silentLog(),
|
||||
heartbeatIntervalMs: 60_000,
|
||||
});
|
||||
await registrar.start();
|
||||
await new Promise((r) => setTimeout(r, 20));
|
||||
|
||||
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
|
||||
const body = JSON.parse(registerCall!.body) as { providers: Array<{ initialStatus?: string }> };
|
||||
expect(body.providers[0]!.initialStatus).toBe('active');
|
||||
registrar.stop();
|
||||
} finally {
|
||||
await fake.close();
|
||||
}
|
||||
});
|
||||
|
||||
it('throws when mcpd returns non-201 from /_provider-register', async () => {
|
||||
const fake = await startFakeServer();
|
||||
fake.handler = (_req, res, _body) => {
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
|
||||
import http from 'node:http';
|
||||
import https from 'node:https';
|
||||
import { execSync } from 'node:child_process';
|
||||
import { spawnSync, execSync } from 'node:child_process';
|
||||
|
||||
const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu';
|
||||
const LLM_URL = process.env.MCPCTL_SMOKE_LLM_URL;
|
||||
@@ -31,21 +31,37 @@ const AGENT_NAME = `smoke-chat-agent-${SUFFIX}`;
|
||||
interface CliResult { code: number; stdout: string; stderr: string }
|
||||
|
||||
function run(args: string): CliResult {
|
||||
try {
|
||||
const stdout = execSync(`mcpctl --direct ${args}`, {
|
||||
encoding: 'utf-8',
|
||||
timeout: 60_000,
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
});
|
||||
return { code: 0, stdout: stdout.trim(), stderr: '' };
|
||||
} catch (err) {
|
||||
const e = err as { status?: number; stdout?: Buffer | string; stderr?: Buffer | string };
|
||||
return {
|
||||
code: e.status ?? 1,
|
||||
stdout: e.stdout ? (typeof e.stdout === 'string' ? e.stdout : e.stdout.toString('utf-8')) : '',
|
||||
stderr: e.stderr ? (typeof e.stderr === 'string' ? e.stderr : e.stderr.toString('utf-8')) : '',
|
||||
};
|
||||
// spawnSync (not execSync) — execSync returns only stdout on success and
|
||||
// discards stderr, which made any `thread:` assertion against a successful
|
||||
// chat impossible to evaluate. Splitting the args correctly handles the
|
||||
// few existing call sites that quote-wrap multi-word values like
|
||||
// `--system-prompt "You are..."`.
|
||||
const argv = splitArgs(args);
|
||||
const res = spawnSync('mcpctl', ['--direct', ...argv], {
|
||||
encoding: 'utf-8',
|
||||
timeout: 60_000,
|
||||
});
|
||||
return {
|
||||
code: res.status ?? 1,
|
||||
stdout: (res.stdout ?? '').trim(),
|
||||
stderr: (res.stderr ?? '').trim(),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Tokenize a shell-style argv string with simple double-quote support — just
|
||||
* enough for the smoke test's call shapes. Not a full POSIX parser; we only
|
||||
* need to keep `--system-prompt "You are a smoke test..."` together as one
|
||||
* arg.
|
||||
*/
|
||||
function splitArgs(s: string): string[] {
|
||||
const out: string[] = [];
|
||||
const re = /"([^"]*)"|(\S+)/g;
|
||||
let m: RegExpExecArray | null;
|
||||
while ((m = re.exec(s)) !== null) {
|
||||
out.push(m[1] !== undefined ? m[1] : (m[2] ?? ''));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function healthz(url: string, timeoutMs = 5000): Promise<boolean> {
|
||||
|
||||
215
src/mcplocal/tests/smoke/virtual-agent.smoke.test.ts
Normal file
215
src/mcplocal/tests/smoke/virtual-agent.smoke.test.ts
Normal file
@@ -0,0 +1,215 @@
|
||||
/**
|
||||
* Smoke tests: v3 virtual agents — register a virtual Llm + a virtual
|
||||
* Agent through the same `_provider-register` payload, then verify mcpd
|
||||
* surfaces the agent as kind=virtual / status=active. Mirrors
|
||||
* virtual-llm.smoke.test.ts's in-process registrar pattern so we don't
|
||||
* need to mutate ~/.mcpctl/config.json or bounce systemd's mcplocal.
|
||||
*
|
||||
* Heartbeat-stale → inactive (90 s) and 4 h auto-deletion are covered by
|
||||
* the unit suite (mcpd virtual-agent-service.test.ts); waiting > 90 s in
|
||||
* smoke would balloon the suite duration.
|
||||
*/
|
||||
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
|
||||
import http from 'node:http';
|
||||
import https from 'node:https';
|
||||
import { mkdtempSync, rmSync, readFileSync, existsSync } from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import {
|
||||
VirtualLlmRegistrar,
|
||||
type RegistrarPublishedProvider,
|
||||
type RegistrarPublishedAgent,
|
||||
} from '../../src/providers/registrar.js';
|
||||
import type { LlmProvider, CompletionResult } from '../../src/providers/types.js';
|
||||
|
||||
const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu';
|
||||
const SUFFIX = Date.now().toString(36);
|
||||
const PROVIDER_NAME = `smoke-vagent-llm-${SUFFIX}`;
|
||||
const AGENT_NAME = `smoke-vagent-${SUFFIX}`;
|
||||
|
||||
function makeFakeProvider(name: string, content: string): LlmProvider {
|
||||
return {
|
||||
name,
|
||||
async complete(): Promise<CompletionResult> {
|
||||
return {
|
||||
content,
|
||||
toolCalls: [],
|
||||
usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 },
|
||||
finishReason: 'stop',
|
||||
};
|
||||
},
|
||||
async listModels() { return []; },
|
||||
async isAvailable() { return true; },
|
||||
};
|
||||
}
|
||||
|
||||
function healthz(url: string, timeoutMs = 5000): Promise<boolean> {
|
||||
return new Promise((resolve) => {
|
||||
const parsed = new URL(`${url.replace(/\/$/, '')}/healthz`);
|
||||
const driver = parsed.protocol === 'https:' ? https : http;
|
||||
const req = driver.get({
|
||||
hostname: parsed.hostname,
|
||||
port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
|
||||
path: parsed.pathname,
|
||||
timeout: timeoutMs,
|
||||
}, (res) => { resolve((res.statusCode ?? 500) < 500); res.resume(); });
|
||||
req.on('error', () => resolve(false));
|
||||
req.on('timeout', () => { req.destroy(); resolve(false); });
|
||||
});
|
||||
}
|
||||
|
||||
function readToken(): string | null {
|
||||
try {
|
||||
const path = join(process.env.HOME ?? '', '.mcpctl', 'credentials');
|
||||
if (!existsSync(path)) return null;
|
||||
const parsed = JSON.parse(readFileSync(path, 'utf-8')) as { token?: string };
|
||||
return parsed.token ?? null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
interface HttpResponse { status: number; body: string }
|
||||
|
||||
function httpRequest(method: string, urlStr: string, body: unknown): Promise<HttpResponse> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const tokenRaw = readToken();
|
||||
const parsed = new URL(urlStr);
|
||||
const driver = parsed.protocol === 'https:' ? https : http;
|
||||
const headers: Record<string, string> = {
|
||||
Accept: 'application/json',
|
||||
...(body !== undefined ? { 'Content-Type': 'application/json' } : {}),
|
||||
...(tokenRaw !== null ? { Authorization: `Bearer ${tokenRaw}` } : {}),
|
||||
};
|
||||
const req = driver.request({
|
||||
hostname: parsed.hostname,
|
||||
port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
|
||||
path: parsed.pathname + parsed.search,
|
||||
method,
|
||||
headers,
|
||||
timeout: 30_000,
|
||||
}, (res) => {
|
||||
const chunks: Buffer[] = [];
|
||||
res.on('data', (c: Buffer) => chunks.push(c));
|
||||
res.on('end', () => {
|
||||
resolve({ status: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') });
|
||||
});
|
||||
});
|
||||
req.on('error', reject);
|
||||
req.on('timeout', () => { req.destroy(); reject(new Error(`httpRequest timeout: ${method} ${urlStr}`)); });
|
||||
if (body !== undefined) req.write(JSON.stringify(body));
|
||||
req.end();
|
||||
});
|
||||
}
|
||||
|
||||
interface AgentRow { id: string; name: string; kind?: string; status?: string; llm?: { name: string }; description?: string }
|
||||
|
||||
let mcpdUp = false;
|
||||
let registrar: VirtualLlmRegistrar | null = null;
|
||||
let tempDir: string;
|
||||
|
||||
describe('virtual-agent smoke (v3)', () => {
|
||||
beforeAll(async () => {
|
||||
mcpdUp = await healthz(MCPD_URL);
|
||||
if (!mcpdUp) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn(`\n ○ virtual-agent smoke: skipped — ${MCPD_URL}/healthz unreachable.\n`);
|
||||
return;
|
||||
}
|
||||
if (readToken() === null) {
|
||||
mcpdUp = false;
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn('\n ○ virtual-agent smoke: skipped — no ~/.mcpctl/credentials.\n');
|
||||
return;
|
||||
}
|
||||
tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-virtual-agent-smoke-'));
|
||||
}, 20_000);
|
||||
|
||||
afterAll(async () => {
|
||||
if (registrar !== null) registrar.stop();
|
||||
if (tempDir !== undefined) rmSync(tempDir, { recursive: true, force: true });
|
||||
// Defensive cleanup: agent first (Llm.id has Restrict FK), then Llm.
|
||||
if (mcpdUp) {
|
||||
const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined);
|
||||
if (agents.status === 200) {
|
||||
const rows = JSON.parse(agents.body) as Array<{ id: string; name: string }>;
|
||||
const row = rows.find((r) => r.name === AGENT_NAME);
|
||||
if (row !== undefined) {
|
||||
await httpRequest('DELETE', `${MCPD_URL}/api/v1/agents/${row.id}`, undefined);
|
||||
}
|
||||
}
|
||||
const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
|
||||
if (llms.status === 200) {
|
||||
const rows = JSON.parse(llms.body) as Array<{ id: string; name: string }>;
|
||||
const row = rows.find((r) => r.name === PROVIDER_NAME);
|
||||
if (row !== undefined) {
|
||||
await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${row.id}`, undefined);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it('registrar publishes provider + agent in one round-trip and mcpd lists the agent kind=virtual / status=active', async () => {
|
||||
if (!mcpdUp) return;
|
||||
const token = readToken();
|
||||
if (token === null) return;
|
||||
|
||||
const published: RegistrarPublishedProvider[] = [
|
||||
{ provider: makeFakeProvider(PROVIDER_NAME, 'hi from virtual agent'), type: 'openai', model: 'fake-vagent', tier: 'fast' },
|
||||
];
|
||||
const publishedAgents: RegistrarPublishedAgent[] = [
|
||||
{
|
||||
name: AGENT_NAME,
|
||||
llmName: PROVIDER_NAME,
|
||||
description: 'v3 virtual agent smoke',
|
||||
systemPrompt: 'You are a smoke test. Reply READY.',
|
||||
defaultParams: { temperature: 0 },
|
||||
},
|
||||
];
|
||||
registrar = new VirtualLlmRegistrar({
|
||||
mcpdUrl: MCPD_URL,
|
||||
token,
|
||||
publishedProviders: published,
|
||||
publishedAgents,
|
||||
sessionFilePath: join(tempDir, 'session'),
|
||||
log: { info: () => {}, warn: () => {}, error: () => {} },
|
||||
heartbeatIntervalMs: 60_000,
|
||||
});
|
||||
await registrar.start();
|
||||
expect(registrar.getSessionId()).not.toBeNull();
|
||||
// Give the SSE handshake + atomic register a moment to settle.
|
||||
await new Promise((r) => setTimeout(r, 400));
|
||||
|
||||
const res = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined);
|
||||
expect(res.status).toBe(200);
|
||||
const rows = JSON.parse(res.body) as AgentRow[];
|
||||
const row = rows.find((r) => r.name === AGENT_NAME);
|
||||
expect(row, `${AGENT_NAME} must be present`).toBeDefined();
|
||||
expect(row!.kind).toBe('virtual');
|
||||
expect(row!.status).toBe('active');
|
||||
expect(row!.llm?.name).toBe(PROVIDER_NAME);
|
||||
expect(row!.description).toBe('v3 virtual agent smoke');
|
||||
}, 30_000);
|
||||
|
||||
it('publisher disconnect flips the agent to status=inactive (paired with its Llm)', async () => {
|
||||
if (!mcpdUp) return;
|
||||
if (registrar !== null) {
|
||||
registrar.stop();
|
||||
registrar = null;
|
||||
}
|
||||
// unbindSession runs synchronously on the SSE close handler; mcpd
|
||||
// flips both the Llm and any agents owned by the session to
|
||||
// inactive. A short wait covers the request round-trip.
|
||||
await new Promise((r) => setTimeout(r, 400));
|
||||
|
||||
const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined);
|
||||
expect(agents.status).toBe(200);
|
||||
const agentRow = (JSON.parse(agents.body) as AgentRow[]).find((r) => r.name === AGENT_NAME);
|
||||
expect(agentRow, `${AGENT_NAME} must still exist (deletion is GC-driven, not disconnect-driven)`).toBeDefined();
|
||||
expect(agentRow!.status).toBe('inactive');
|
||||
|
||||
const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
|
||||
const llmRow = (JSON.parse(llms.body) as Array<{ name: string; status: string }>).find((r) => r.name === PROVIDER_NAME);
|
||||
expect(llmRow!.status).toBe('inactive');
|
||||
}, 30_000);
|
||||
});
|
||||
@@ -207,3 +207,137 @@ describe('virtual-LLM smoke', () => {
|
||||
expect(res.body).toMatch(/publisher offline|inactive/);
|
||||
}, 30_000);
|
||||
});
|
||||
|
||||
// ── v2: hibernating + wake-on-demand ──
|
||||
|
||||
const HIBERNATING_NAME = `smoke-virtual-hib-${SUFFIX}`;
|
||||
let hibernatingRegistrar: VirtualLlmRegistrar | null = null;
|
||||
|
||||
/**
|
||||
* Provider that's "asleep" until \`wakeFn()\` is called. Used to drive
|
||||
* the wake-on-demand smoke without standing up an actual sleep/wake
|
||||
* controller — we flip the bool from inside the wake recipe.
|
||||
*/
|
||||
function makeSleepingProvider(name: string, content: string): {
|
||||
provider: LlmProvider;
|
||||
wakeFn: () => void;
|
||||
wakeCount: () => number;
|
||||
} {
|
||||
let awake = false;
|
||||
let count = 0;
|
||||
const provider: LlmProvider = {
|
||||
name,
|
||||
async complete(): Promise<CompletionResult> {
|
||||
if (!awake) throw new Error('provider not awake');
|
||||
return {
|
||||
content,
|
||||
toolCalls: [],
|
||||
usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 },
|
||||
finishReason: 'stop',
|
||||
};
|
||||
},
|
||||
async listModels() { return []; },
|
||||
async isAvailable() { return awake; },
|
||||
};
|
||||
return {
|
||||
provider,
|
||||
wakeFn: () => { awake = true; count += 1; },
|
||||
wakeCount: () => count,
|
||||
};
|
||||
}
|
||||
|
||||
describe('virtual-LLM smoke — wake-on-demand', () => {
|
||||
let wakeServerUrl: string;
|
||||
let wakeServer: http.Server;
|
||||
let wakeFn: (() => void) | null = null;
|
||||
|
||||
beforeAll(async () => {
|
||||
if (!mcpdUp) return;
|
||||
// Tiny in-process "wake controller" — receives the http wake recipe
|
||||
// POST and flips the local provider's `awake` bool.
|
||||
await new Promise<void>((resolve) => {
|
||||
wakeServer = http.createServer((req, res) => {
|
||||
if (req.url === '/wake' && wakeFn !== null) {
|
||||
wakeFn();
|
||||
res.writeHead(200);
|
||||
res.end('woken');
|
||||
return;
|
||||
}
|
||||
res.writeHead(404);
|
||||
res.end();
|
||||
});
|
||||
wakeServer.listen(0, '127.0.0.1', () => {
|
||||
const addr = wakeServer.address();
|
||||
if (addr === null || typeof addr === 'string') throw new Error('listen failed');
|
||||
wakeServerUrl = `http://127.0.0.1:${String(addr.port)}/wake`;
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
if (hibernatingRegistrar !== null) hibernatingRegistrar.stop();
|
||||
if (wakeServer) await new Promise<void>((r) => wakeServer.close(() => r()));
|
||||
if (mcpdUp) {
|
||||
const list = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
|
||||
if (list.status === 200) {
|
||||
const rows = JSON.parse(list.body) as Array<{ id: string; name: string }>;
|
||||
const row = rows.find((r) => r.name === HIBERNATING_NAME);
|
||||
if (row !== undefined) {
|
||||
await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${row.id}`, undefined);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it('publishes a sleeping provider as kind=virtual / status=hibernating', async () => {
|
||||
if (!mcpdUp) return;
|
||||
const token = readToken();
|
||||
if (token === null) return;
|
||||
const sleeping = makeSleepingProvider(HIBERNATING_NAME, 'awake now');
|
||||
wakeFn = sleeping.wakeFn;
|
||||
|
||||
const published: RegistrarPublishedProvider[] = [{
|
||||
provider: sleeping.provider,
|
||||
type: 'openai',
|
||||
model: 'fake-hibernating',
|
||||
tier: 'fast',
|
||||
wake: { type: 'http', url: wakeServerUrl, method: 'POST', maxWaitSeconds: 5 },
|
||||
}];
|
||||
hibernatingRegistrar = new VirtualLlmRegistrar({
|
||||
mcpdUrl: MCPD_URL,
|
||||
token,
|
||||
publishedProviders: published,
|
||||
sessionFilePath: join(tempDir, 'hib-session'),
|
||||
log: { info: () => {}, warn: () => {}, error: () => {} },
|
||||
heartbeatIntervalMs: 60_000,
|
||||
});
|
||||
await hibernatingRegistrar.start();
|
||||
await new Promise((r) => setTimeout(r, 400));
|
||||
|
||||
const res = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
|
||||
expect(res.status).toBe(200);
|
||||
const rows = JSON.parse(res.body) as Array<{ name: string; kind: string; status: string }>;
|
||||
const row = rows.find((r) => r.name === HIBERNATING_NAME);
|
||||
expect(row, `${HIBERNATING_NAME} must be present`).toBeDefined();
|
||||
expect(row!.kind).toBe('virtual');
|
||||
expect(row!.status).toBe('hibernating');
|
||||
}, 30_000);
|
||||
|
||||
it('first inference triggers the wake recipe and then completes', async () => {
|
||||
if (!mcpdUp) return;
|
||||
// wakeFn was set in the previous test; it flips the provider's
|
||||
// `awake` bool when the wake POST lands.
|
||||
const res = await httpRequest('POST', `${MCPD_URL}/api/v1/llms/${HIBERNATING_NAME}/infer`, {
|
||||
messages: [{ role: 'user', content: 'wake then say hello' }],
|
||||
});
|
||||
expect(res.status).toBe(200);
|
||||
const body = JSON.parse(res.body) as { choices?: Array<{ message?: { content?: string } }> };
|
||||
expect(body.choices?.[0]?.message?.content).toBe('awake now');
|
||||
|
||||
// After the wake, the row should now be active.
|
||||
const list = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
|
||||
const rows = JSON.parse(list.body) as Array<{ name: string; status: string }>;
|
||||
expect(rows.find((r) => r.name === HIBERNATING_NAME)?.status).toBe('active');
|
||||
}, 30_000);
|
||||
});
|
||||
|
||||
@@ -1,8 +1,21 @@
|
||||
import { defineConfig } from 'vitest/config';
|
||||
import { availableParallelism } from 'node:os';
|
||||
|
||||
// Default vitest's pool to ~half the CPU threads we have. The previous
|
||||
// implicit default left this 64-thread workstation at ~10% utilization
|
||||
// during `pnpm test:run`. Half is a soft cap that stays kind to laptops
|
||||
// (8-thread → 4 workers) while letting beefy hosts push closer to the
|
||||
// box's actual capacity. Override at run time with VITEST_MAX_THREADS.
|
||||
const cores = availableParallelism();
|
||||
const maxThreads = Number(process.env['VITEST_MAX_THREADS'] ?? Math.max(2, Math.floor(cores / 2)));
|
||||
|
||||
export default defineConfig({
|
||||
test: {
|
||||
globals: true,
|
||||
pool: 'threads',
|
||||
poolOptions: {
|
||||
threads: { maxThreads, minThreads: 1 },
|
||||
},
|
||||
coverage: {
|
||||
provider: 'v8',
|
||||
reporter: ['text', 'json', 'html'],
|
||||
|
||||
Reference in New Issue
Block a user