Compare commits

...

7 Commits

Author SHA1 Message Date
Michal
0591c62845 feat(mcpd): AgentService virtual methods + GC cascade (v3 Stage 2)
State machine for kind=virtual Agent rows. Mirrors what
VirtualLlmService did for Llms in v1, then wires both lifecycles
together so disconnect/heartbeat/GC cascade through both at once.

AgentRepository:
- create/update accept the new lifecycle fields (kind, providerSessionId,
  status, lastHeartbeatAt, inactiveSince).
- Adds findBySessionId, findByLlmId, findStaleVirtuals, findExpiredInactives.

AgentService — new virtual-agent methods:
- registerVirtualAgents(sessionId, inputs, ownerId) — sticky upsert.
  New names insert as kind=virtual/status=active. Existing virtuals
  owned by the same session reactivate; existing inactive virtuals
  from a foreign session can be adopted (sticky reconnect). Refuses
  to overwrite a public agent or a foreign session's still-active
  virtual (HTTP 409). Pinned LLM is resolved via LlmService — caller
  posts Llms first.
- heartbeatVirtualAgents(sessionId) — bumps owned agents on a session
  heartbeat; revives inactive rows.
- markVirtualAgentsInactiveBySession(sessionId) — disconnect cascade.
- deleteVirtualAgentsForLlm(llmId) — defensive cascade for the GC's
  Llm-delete step (Agent.llmId is Restrict).
- gcSweepVirtualAgents() — same shape as VirtualLlmService.gcSweep
  (90s heartbeat-stale → inactive, 4h inactive → delete).

VirtualLlmService:
- Optional AgentService dependency. heartbeat() now also bumps owned
  agents; unbindSession() flips them inactive. gcSweep() runs the
  agent sweep FIRST (so any agent that would block an Llm delete via
  Restrict is already gone), and adds a defensive
  deleteVirtualAgentsForLlm step right before each Llm delete in case
  an agent's heartbeat lagged its Llm's just enough to escape this
  round's 4h cutoff.

main.ts:
- VirtualLlmService construction moves below AgentService so it can
  receive the cascade dependency.

Tests: 13 new in virtual-agent-service.test.ts cover all the register
variants (insert, sticky reconnect, adopt-inactive-foreign, refuse
public-overwrite, refuse foreign-session-active), heartbeat-revive,
disconnect-cascade, deleteVirtualAgentsForLlm scope, GC sweep flip
+ delete + idempotence, and three VirtualLlmService cascade scenarios
(unbindSession, gcSweep deleting agent before Llm, defensive cascade
when agent's heartbeat lagged).

mcpd suite: 854/854 (was 841 + 13 new). Workspace unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:03:59 +01:00
Michal
8d59b0bf2c feat(db+mcpd): Agent lifecycle + chat.service kind=virtual branch (v3 Stage 1)
Two pieces of v3 plumbing — schema + the latent v1 chat.service bug.

Schema (db):
- Agent gains kind/providerSessionId/lastHeartbeatAt/status/inactiveSince
  mirroring Llm's v1 lifecycle. Reuses LlmKind / LlmStatus enums; no
  new types. Existing rows backfill kind=public/status=active so v1
  CRUD is unaffected.
- @@index([kind, status]) for the GC sweep, @@index([providerSessionId])
  for disconnect-cascade lookups.
- 4 new prisma-level tests cover defaults, persisting virtual fields,
  the (kind, status) GC index, and providerSessionId lookups.
  Total agent-schema tests: 20/20.

chat.service (mcpd) — fixes the v1 latent bug:
- LlmView's kind is now plumbed through prepareContext as ctx.llmKind.
- Two new private helpers, runOneInference / streamInference, branch
  on ctx.llmKind: 'public' goes through the existing adapter
  registry, 'virtual' relays through VirtualLlmService.enqueueInferTask
  (mirrors the route-handler branch from v1 Stage 3).
- Streaming bridges VirtualLlmService's onChunk callback API to an
  async iterator via a small queue + wake pattern.
- ChatService gains an optional virtualLlms constructor parameter;
  main.ts wires it in. Older test wirings without it raise a clear
  "virtualLlms dispatcher not wired" error when the row is virtual,
  rather than silently falling through to the public path against an
  empty URL.

This unblocks any Agent (public OR future v3-virtual) pinned to a
kind=virtual Llm. Pre-this-stage, those agents 502'd against the
empty url field.

Tests: 4 new chat-service-virtual-llm.test.ts cover the relay path
non-streaming, streaming, missing-dispatcher error, and rejection
surfacing. mcpd suite: 841/841 (was 833, +8 across stages 1+v3-Stage-1).
Workspace: 2054/2054 across 153 files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 16:55:02 +01:00
45c7737ee1 feat: virtual LLMs v2 (wake-on-demand) (#65)
Some checks failed
CI/CD / lint (push) Successful in 54s
CI/CD / test (push) Successful in 1m12s
CI/CD / typecheck (push) Successful in 2m42s
CI/CD / smoke (push) Failing after 1m43s
CI/CD / build (push) Successful in 2m33s
CI/CD / publish (push) Has been skipped
2026-04-27 14:20:59 +00:00
Michal
e0cfe0ba4d feat: virtual-LLM v2 smoke + docs (v2 Stage 3)
Some checks failed
CI/CD / lint (pull_request) Successful in 55s
CI/CD / test (pull_request) Successful in 1m8s
CI/CD / typecheck (pull_request) Successful in 2m43s
CI/CD / smoke (pull_request) Failing after 1m44s
CI/CD / build (pull_request) Successful in 5m28s
CI/CD / publish (pull_request) Has been skipped
Closes v2 (wake-on-demand). Same shape as v1's stage 6: smoke
exercises the live-cluster path, docs lose the "v2 reserved" caveat
and gain a full wake-recipe section.

Smoke (virtual-llm.smoke.test.ts):
- New "wake-on-demand" describe block runs alongside the v1 tests.
- Spins a tiny in-process HTTP "wake controller"; the published
  provider's isAvailable() returns false until the wake POST flips
  the bool. Asserts:
    1. Provider publishes as kind=virtual / status=hibernating.
    2. First inference triggers the wake recipe, the recipe POSTs
       to the controller, the provider becomes available, mcpd
       relays the inference, and the row settles to active.
- Cleans up the row + wake server in afterAll.

Docs (docs/virtual-llms.md):
- Lifecycle table updates the `hibernating` description from
  "reserved for v2" to the actual v2 semantics.
- New "Wake-on-demand (v2)" section: configuration shapes for both
  recipe types (http + command), the wake-then-infer flow diagram,
  concurrent-infer dedup, failure semantics.
- Roadmap drops v2; v3-v5 still listed.

Workspace: 2050/2050 (smoke runs separately; the new SSE-based wake
test runs only against a live cluster, not under \`pnpm test:run\`).

v2 closes. v3 = virtual agents, v4 = LB pool by model, v5 = queue.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:20:18 +01:00
Michal
db839afc57 feat(mcpd): wake-before-infer for hibernating virtual LLMs (v2 Stage 2)
Second half of v2. mcpd now dispatches a \`wake\` task on the SSE
control channel when an inference request hits a row whose
status=hibernating, waits for the publisher to confirm readiness,
then proceeds with the infer task. Concurrent infers for the same
hibernating Llm share a single wake task — \`wakeInFlight\` map
dedupes by Llm name.

State machine in enqueueInferTask:
  active        → push infer task immediately (existing path).
  inactive      → 503, publisher offline (existing path).
  hibernating   → ensureAwake() → push infer task (new in v2).

ensureAwake/runWake (private):
- Allocates a fresh taskId on the existing PendingTask plumbing.
- Pushes \`{ kind: "wake", taskId, llmName }\` on the SSE handle.
- Awaits the publisher's result POST. On 2xx, flips the row to
  active + bumps lastHeartbeatAt, so all queued + future infers
  hit the active path. On non-2xx or service.failTask, the row
  stays hibernating (next request retries).

Tests: 4 new in virtual-llm-service.test.ts cover happy path
(wake → infer in order), concurrent dedup (3 parallel infers, 1
wake task), wake failure surfaces to all queued infers and leaves
the row hibernating, inactive ≠ hibernating (still rejects with 503,
no wake attempt). 22/22 service tests, 2050/2050 workspace.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:18:24 +01:00
Michal
af0fabd84f feat(mcplocal+mcpd): wake-recipe config + wake-task execution (v2 Stage 1)
First half of v2 — mcplocal can now declare a hibernating backend and
respond to a `wake` task by running a configured recipe. v2 Stage 2
will wire mcpd to dispatch the wake task before relaying inference.

Config (LlmProviderFileEntry):
- New \`wake\` block on a published provider:
    wake:
      type: http        # or: command
      url: ...           # http only
      method: POST       # http only, default POST
      headers: {...}     # http only
      body: ...          # http only
      command: ...       # command only
      args: [...]        # command only
      maxWaitSeconds: 60 # how long to poll isAvailable() after wake fires

Registrar (mcplocal):
- At publish time, providers with a wake recipe whose isAvailable()
  returns false report initialStatus=hibernating to mcpd. Without a
  wake recipe (legacy v1) or when already up, status stays active.
- handleWakeTask: runs the recipe (HTTP request OR child-process
  spawn), then polls isAvailable() up to maxWaitSeconds, sending a
  heartbeat each loop so mcpd's GC sweep doesn't time us out
  mid-boot. Reports { ok, ms } on success or { error } on
  timeout/recipe failure via the existing _provider-task/:id/result.
- Replaces the v1 stub that rejected wake tasks with "not implemented".

mcpd VirtualLlmService:
- RegisterProviderInput gains optional initialStatus ('active' |
  'hibernating'). The register/upsert path uses it for both new and
  reconnecting rows. Defaults to 'active' so v1 publishers still
  work unchanged.
- Provider-register route's coercer accepts the new field.

Tests: 3 new in registrar.test.ts cover initialStatus selection
(hibernating when wake configured + unavailable, active otherwise,
active when no wake even if unavailable). 8/8 registrar tests, 833/833
mcpd unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:15:46 +01:00
700d1683c2 fix(cli): strip virtual-LLM lifecycle fields from llm apply-doc YAML (#64)
Some checks failed
CI/CD / lint (push) Successful in 56s
CI/CD / test (push) Successful in 1m11s
CI/CD / typecheck (push) Successful in 2m49s
CI/CD / smoke (push) Failing after 1m42s
CI/CD / build (push) Successful in 3m10s
CI/CD / publish (push) Has been skipped
2026-04-27 13:47:18 +00:00
19 changed files with 1816 additions and 53 deletions

View File

@@ -97,11 +97,11 @@ route branches on it server-side.
## Lifecycle in detail
| State | What it means |
|----------------|-----------------------------------------------------------------------|
| `active` | Heartbeat received within the last 90 s and the SSE channel is open. |
| State | What it means |
|----------------|---------------------------------------------------------------------------------|
| `active` | Heartbeat received within the last 90 s and the SSE channel is open. |
| `inactive` | Either the SSE closed or the heartbeat watchdog tripped. Inference returns 503. |
| `hibernating` | Reserved for v2 (wake-on-demand). v1 never writes this state. |
| `hibernating` | Publisher is online but the backend is asleep; the next inference triggers a `wake` task before relaying. |
Two timers on mcpd run the GC sweep:
@@ -132,10 +132,75 @@ a finalized `CompletionResult`, not a token stream. Streaming requests
therefore arrive at the caller as a single delta + `[DONE]`. Real
per-token streaming is a v2 concern.
## Wake-on-demand (v2)
A provider whose backend hibernates (a vLLM instance that suspends
when idle, an Ollama daemon that exits when nothing's connected, …)
can declare a **wake recipe** in mcplocal config. When that provider's
`isAvailable()` returns false at registrar startup, the row is
published as `status=hibernating`. The next inference request that
hits the row triggers the recipe and waits for the backend to come up
before relaying.
Two recipe types:
```jsonc
// HTTP — POST to a "wake controller" that starts the backend out of band.
{
"name": "vllm-local",
"type": "openai",
"model": "...",
"publish": true,
"wake": {
"type": "http",
"url": "http://10.0.0.50:9090/wake/vllm",
"method": "POST",
"headers": { "Authorization": "Bearer ..." },
"maxWaitSeconds": 60
}
}
```
```jsonc
// command — spawn a local process (systemd, wakeonlan, custom script).
{
"name": "vllm-local",
"type": "openai",
"model": "...",
"publish": true,
"wake": {
"type": "command",
"command": "/usr/local/bin/start-vllm",
"args": ["--profile", "qwen3"],
"maxWaitSeconds": 120
}
}
```
How a request flows when the row is `hibernating`:
```
client → mcpd POST /api/v1/llms/<name>/infer
mcpd: status === hibernating → push wake task on SSE
mcplocal: receive wake task → run recipe → poll isAvailable()
→ heartbeat each tick → POST { ok: true } back
mcpd: flip row → active, push the original infer task
mcplocal: run inference → POST result back
mcpd → client (forwards the inference result)
```
Concurrent infers for the same hibernating Llm share a single wake
task — only the first request triggers the recipe; later ones await
the same in-flight wake promise. After the wake settles, every queued
infer dispatches in order.
If the recipe fails (HTTP non-2xx, command exits non-zero, or the
provider doesn't come up within `maxWaitSeconds`), every queued infer
is rejected with a clear error and the row stays `hibernating`
the next request gets a fresh wake attempt.
## Roadmap (later stages)
- **v2 — Wake-on-demand**: Secret-stored "wake recipe" so mcpd can ask
mcplocal to start a hibernating backend before sending inference.
- **v3 — Virtual agents**: mcplocal publishes its local agent configs
(model + system prompt + sampling defaults) into mcpd's `Agent` table.
- **v4 — LB pool by model**: agents can target a model name instead of

View File

@@ -0,0 +1,14 @@
-- Mirror Llm's virtual-provider lifecycle on Agent. Reuses the
-- existing LlmKind / LlmStatus enums so we don't double-define them.
-- Existing rows backfill with kind='public' / status='active' so
-- nothing changes for manually-created agents.
ALTER TABLE "Agent"
ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public',
ADD COLUMN "providerSessionId" TEXT,
ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3),
ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active',
ADD COLUMN "inactiveSince" TIMESTAMP(3);
CREATE INDEX "Agent_kind_status_idx" ON "Agent"("kind", "status");
CREATE INDEX "Agent_providerSessionId_idx" ON "Agent"("providerSessionId");

View File

@@ -469,20 +469,26 @@ model BackupPending {
// Per-call LiteLLM-style overrides stack on top of `defaultParams`.
model Agent {
id String @id @default(cuid())
name String @unique
description String @default("") // shown in MCP tools/list
systemPrompt String @default("") @db.Text // agent persona
id String @id @default(cuid())
name String @unique
description String @default("") // shown in MCP tools/list
systemPrompt String @default("") @db.Text // agent persona
llmId String
projectId String?
defaultPersonalityId String? // applied at chat time when no --personality flag
proxyModelName String? // optional informational override
defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ...
extras Json @default("{}") // future LoRA / tool-allowlist
defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ...
extras Json @default("{}") // future LoRA / tool-allowlist
// ── Virtual-agent lifecycle (NULL/default for kind=public, mirrors Llm) ──
kind LlmKind @default(public)
providerSessionId String? // mcplocal session that owns this row when virtual
lastHeartbeatAt DateTime?
status LlmStatus @default(active)
inactiveSince DateTime?
ownerId String
version Int @default(1)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
version Int @default(1)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
llm Llm @relation(fields: [llmId], references: [id], onDelete: Restrict)
project Project? @relation(fields: [projectId], references: [id], onDelete: SetNull)
@@ -497,6 +503,8 @@ model Agent {
@@index([projectId])
@@index([ownerId])
@@index([defaultPersonalityId])
@@index([kind, status])
@@index([providerSessionId])
}
// ── Personalities (named overlay bundles of prompts on top of an Agent) ──

View File

@@ -317,6 +317,78 @@ describe('agent / chat-thread / chat-message schema', () => {
expect(reloaded?.defaultPersonalityId).toBeNull();
});
// ── v3: Agent.kind virtual + lifecycle fields ──
it('defaults a freshly inserted Agent to kind=public, status=active', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-default-kind');
const agent = await makeAgent({ name: 'fresh', llmId: llm.id, ownerId: user.id });
expect(agent.kind).toBe('public');
expect(agent.status).toBe('active');
expect(agent.providerSessionId).toBeNull();
expect(agent.lastHeartbeatAt).toBeNull();
expect(agent.inactiveSince).toBeNull();
});
it('persists kind=virtual + lifecycle fields together', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-pub-virtual');
const now = new Date();
const agent = await prisma.agent.create({
data: {
name: 'local-coder',
llmId: llm.id,
ownerId: user.id,
kind: 'virtual',
providerSessionId: 'sess-abc',
lastHeartbeatAt: now,
status: 'active',
},
});
expect(agent.kind).toBe('virtual');
expect(agent.providerSessionId).toBe('sess-abc');
expect(agent.lastHeartbeatAt?.getTime()).toBe(now.getTime());
});
it('finds virtual agents by (kind, status) cheaply (GC sweep query)', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-gc-agent');
await prisma.agent.create({ data: { name: 'pub-1', llmId: llm.id, ownerId: user.id } });
await prisma.agent.create({
data: { name: 'v-active', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's1' },
});
await prisma.agent.create({
data: { name: 'v-inactive', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() },
});
const stale = await prisma.agent.findMany({
where: { kind: 'virtual', status: 'inactive' },
select: { name: true },
});
expect(stale.map((a) => a.name)).toEqual(['v-inactive']);
});
it('finds agents by providerSessionId (used on mcplocal disconnect cascade)', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-sess-cascade');
await prisma.agent.create({
data: { name: 'a', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
});
await prisma.agent.create({
data: { name: 'b', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
});
await prisma.agent.create({
data: { name: 'c', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'other' },
});
const owned = await prisma.agent.findMany({
where: { providerSessionId: 'shared' },
select: { name: true },
orderBy: { name: 'asc' },
});
expect(owned.map((a) => a.name)).toEqual(['a', 'b']);
});
it('binds the same prompt to multiple personalities of an agent', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-shared-prompt');

View File

@@ -435,10 +435,8 @@ async function main(): Promise<void> {
adapters: llmAdapters,
log: { warn: (msg) => app.log.warn(msg) },
});
// Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker
// is started below after `app.listen` so it doesn't fire before the
// server is accepting traffic.
const virtualLlmService = new VirtualLlmService(llmRepo);
// VirtualLlmService is constructed lower down (after AgentService) so
// it can wire the agent-cascade callbacks introduced in v3 Stage 2.
// AgentService + ChatService get fully wired below once projectService and
// mcpProxyService are constructed (ChatService needs them via the
// ChatToolDispatcher bridge).
@@ -465,6 +463,10 @@ async function main(): Promise<void> {
const personalityRepo = new PersonalityRepository(prisma);
const personalityService = new PersonalityService(personalityRepo, agentRepo, promptRepo);
const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo);
// Virtual-provider state machine (kind=virtual rows for both Llms and
// Agents). v3 wires AgentService for heartbeat/disconnect/GC cascade.
// The 60-s GC ticker is started below after `app.listen`.
const virtualLlmService = new VirtualLlmService(llmRepo, agentService);
// ChatService needs the proxy + project repo via the ChatToolDispatcher
// bridge. The dispatcher's logger references `app.log`, which is not
// constructed until further down — `chatService` itself is built right
@@ -607,6 +609,7 @@ async function main(): Promise<void> {
promptRepo,
chatToolDispatcher,
personalityRepo,
virtualLlmService,
);
registerAgentChatRoutes(app, chatService);
registerLlmInferRoutes(app, {

View File

@@ -1,4 +1,4 @@
import type { PrismaClient, Agent, Prisma } from '@prisma/client';
import type { PrismaClient, Agent, Prisma, LlmKind, LlmStatus } from '@prisma/client';
export interface CreateAgentRepoInput {
name: string;
@@ -11,6 +11,12 @@ export interface CreateAgentRepoInput {
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
ownerId: string;
// Virtual-agent lifecycle (omit for kind=public).
kind?: LlmKind;
providerSessionId?: string | null;
status?: LlmStatus;
lastHeartbeatAt?: Date | null;
inactiveSince?: Date | null;
}
export interface UpdateAgentRepoInput {
@@ -22,6 +28,13 @@ export interface UpdateAgentRepoInput {
proxyModelName?: string | null;
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
// Virtual-agent lifecycle. AgentService is the only public writer; the
// VirtualAgentService methods (Stage 2) bypass the public CRUD path.
kind?: LlmKind;
providerSessionId?: string | null;
status?: LlmStatus;
lastHeartbeatAt?: Date | null;
inactiveSince?: Date | null;
}
export interface IAgentRepository {
@@ -32,6 +45,11 @@ export interface IAgentRepository {
create(data: CreateAgentRepoInput): Promise<Agent>;
update(id: string, data: UpdateAgentRepoInput): Promise<Agent>;
delete(id: string): Promise<void>;
// Virtual-agent lifecycle helpers.
findBySessionId(sessionId: string): Promise<Agent[]>;
findByLlmId(llmId: string): Promise<Agent[]>;
findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]>;
findExpiredInactives(deletionCutoff: Date): Promise<Agent[]>;
}
export class AgentRepository implements IAgentRepository {
@@ -69,6 +87,11 @@ export class AgentRepository implements IAgentRepository {
defaultParams: (data.defaultParams ?? {}) as Prisma.InputJsonValue,
extras: (data.extras ?? {}) as Prisma.InputJsonValue,
ownerId: data.ownerId,
...(data.kind !== undefined ? { kind: data.kind } : {}),
...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
...(data.status !== undefined ? { status: data.status } : {}),
...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
},
});
}
@@ -99,6 +122,11 @@ export class AgentRepository implements IAgentRepository {
if (data.extras !== undefined) {
updateData.extras = data.extras as Prisma.InputJsonValue;
}
if (data.kind !== undefined) updateData.kind = data.kind;
if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId;
if (data.status !== undefined) updateData.status = data.status;
if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt;
if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince;
// Bump optimistic version on every update.
updateData.version = { increment: 1 };
return this.prisma.agent.update({ where: { id }, data: updateData });
@@ -107,4 +135,40 @@ export class AgentRepository implements IAgentRepository {
async delete(id: string): Promise<void> {
await this.prisma.agent.delete({ where: { id } });
}
// ── Virtual-agent lifecycle queries ──
async findBySessionId(sessionId: string): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: { providerSessionId: sessionId },
orderBy: { name: 'asc' },
});
}
async findByLlmId(llmId: string): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: { llmId },
orderBy: { name: 'asc' },
});
}
async findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: {
kind: 'virtual',
status: 'active',
lastHeartbeatAt: { lt: heartbeatCutoff },
},
});
}
async findExpiredInactives(deletionCutoff: Date): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: {
kind: 'virtual',
status: 'inactive',
inactiveSince: { lt: deletionCutoff },
},
});
}
}

View File

@@ -150,6 +150,7 @@ function coerceProviderInput(raw: unknown): {
tier?: string;
description?: string;
extraConfig?: Record<string, unknown>;
initialStatus?: 'active' | 'hibernating';
} {
if (raw === null || typeof raw !== 'object') {
throw Object.assign(new Error('provider entry must be an object'), { statusCode: 400 });
@@ -170,5 +171,11 @@ function coerceProviderInput(raw: unknown): {
if (o['extraConfig'] !== null && typeof o['extraConfig'] === 'object') {
out.extraConfig = o['extraConfig'] as Record<string, unknown>;
}
// Only accept the two values v2 actually defines. Anything else falls
// through to the service default (active) — matches v1 publishers that
// don't know about this field.
if (o['initialStatus'] === 'active' || o['initialStatus'] === 'hibernating') {
out.initialStatus = o['initialStatus'];
}
return out;
}

View File

@@ -33,12 +33,28 @@ export interface AgentView {
proxyModelName: string | null;
defaultParams: AgentChatParams;
extras: Record<string, unknown>;
// Virtual-agent lifecycle (defaults make public agents look like "active").
kind: 'public' | 'virtual';
status: 'active' | 'inactive' | 'hibernating';
lastHeartbeatAt: Date | null;
inactiveSince: Date | null;
ownerId: string;
version: number;
createdAt: Date;
updatedAt: Date;
}
/** Input shape mcplocal sends per virtual agent on register. */
export interface VirtualAgentInput {
name: string;
llmName: string;
description?: string;
systemPrompt?: string;
project?: string;
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
}
export class AgentService {
constructor(
private readonly repo: IAgentRepository,
@@ -179,10 +195,162 @@ export class AgentService {
proxyModelName: row.proxyModelName,
defaultParams: row.defaultParams as AgentChatParams,
extras: row.extras as Record<string, unknown>,
kind: row.kind,
status: row.status,
lastHeartbeatAt: row.lastHeartbeatAt,
inactiveSince: row.inactiveSince,
ownerId: row.ownerId,
version: row.version,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
};
}
// ── Virtual-agent lifecycle (v3) ──
/**
* Sticky upsert of virtual agents owned by a publishing mcplocal session.
* Mirrors VirtualLlmService.register's semantics:
* - New agents → insert with kind=virtual / status=active.
* - Existing virtual agents owned by the same session → update + reactivate.
* - Existing virtual agents owned by a different session, but currently
* inactive → adopt (sticky reconnect after a session lapse).
* - Existing public agents OR foreign-active virtuals → 409 Conflict.
* - Pinned LLM must already exist (publisher posts Llms first in the same
* register payload).
*/
async registerVirtualAgents(
sessionId: string,
inputs: VirtualAgentInput[],
ownerId: string,
): Promise<AgentView[]> {
const now = new Date();
const out: AgentView[] = [];
for (const a of inputs) {
const llm = await this.llms.getByName(a.llmName);
const projectId = a.project !== undefined
? (await this.projects.resolveAndGet(a.project)).id
: null;
const existing = await this.repo.findByName(a.name);
if (existing !== null) {
if (existing.kind === 'public') {
throw Object.assign(
new Error(`Cannot publish over public Agent: ${a.name}`),
{ statusCode: 409 },
);
}
if (existing.providerSessionId !== sessionId && existing.status === 'active') {
throw Object.assign(
new Error(`Virtual Agent '${a.name}' is already active under a different session`),
{ statusCode: 409 },
);
}
const updated = await this.repo.update(existing.id, {
...(a.description !== undefined ? { description: a.description } : {}),
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
llmId: llm.id,
projectId,
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
...(a.extras !== undefined ? { extras: a.extras } : {}),
kind: 'virtual',
providerSessionId: sessionId,
status: 'active',
lastHeartbeatAt: now,
inactiveSince: null,
});
out.push(await this.toView(updated));
continue;
}
const created = await this.repo.create({
name: a.name,
...(a.description !== undefined ? { description: a.description } : {}),
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
llmId: llm.id,
projectId,
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
...(a.extras !== undefined ? { extras: a.extras } : {}),
kind: 'virtual',
providerSessionId: sessionId,
status: 'active',
lastHeartbeatAt: now,
ownerId,
});
out.push(await this.toView(created));
}
return out;
}
/**
* Bumps lastHeartbeatAt on every virtual agent owned by the session.
* Revives inactive rows. Called from VirtualLlmService.heartbeat so
* one publisher heartbeat covers both Llms and Agents.
*/
async heartbeatVirtualAgents(sessionId: string): Promise<void> {
const owned = await this.repo.findBySessionId(sessionId);
if (owned.length === 0) return;
const now = new Date();
for (const row of owned) {
await this.repo.update(row.id, {
lastHeartbeatAt: now,
...(row.status === 'inactive' ? { status: 'active', inactiveSince: null } : {}),
});
}
}
/** Flip every virtual agent owned by the session to inactive immediately. */
async markVirtualAgentsInactiveBySession(sessionId: string): Promise<void> {
const owned = await this.repo.findBySessionId(sessionId);
const now = new Date();
for (const row of owned) {
if (row.status === 'active') {
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
}
}
}
/**
* Cascade-delete virtual agents pinned to a virtual Llm. Called from
* VirtualLlmService.gcSweep BEFORE deleting the inactive Llm row, since
* Agent.llmId is `onDelete: Restrict` and would otherwise block the
* Llm delete.
*/
async deleteVirtualAgentsForLlm(llmId: string): Promise<number> {
const pinned = await this.repo.findByLlmId(llmId);
let deleted = 0;
for (const row of pinned) {
if (row.kind !== 'virtual') continue;
await this.repo.delete(row.id);
deleted += 1;
}
return deleted;
}
/**
* GC sweep for virtual agents — same shape as VirtualLlmService.gcSweep:
* 1. Heartbeat-stale active virtuals → inactive (90-s cutoff).
* 2. 4-h-old inactive virtuals → delete.
* Run BEFORE the LlmService GC sweep so any agent that would have
* blocked an Llm delete via Restrict has already been cleared.
*/
async gcSweepVirtualAgents(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> {
const HEARTBEAT_TIMEOUT_MS = 90_000;
const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000;
let markedInactive = 0;
let deleted = 0;
const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
for (const row of stale) {
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
markedInactive += 1;
}
const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
const expired = await this.repo.findExpiredInactives(deletionCutoff);
for (const row of expired) {
await this.repo.delete(row.id);
deleted += 1;
}
return { markedInactive, deleted };
}
}

View File

@@ -31,6 +31,7 @@ import type {
} from '../repositories/chat.repository.js';
import type { IPromptRepository } from '../repositories/prompt.repository.js';
import type { IPersonalityRepository } from '../repositories/personality.repository.js';
import type { IVirtualLlmService } from './virtual-llm.service.js';
import type { OpenAiChatRequest, OpenAiMessage } from './llm/types.js';
import type { AgentChatParams } from '../validation/agent.schema.js';
import { NotFoundError } from './mcp-server.service.js';
@@ -132,6 +133,14 @@ export class ChatService {
private readonly promptRepo: IPromptRepository,
private readonly tools: ChatToolDispatcher,
private readonly personalities?: IPersonalityRepository,
/**
* v3: when an Agent is pinned to a `kind=virtual` Llm, inference is
* relayed via this service rather than an HTTP adapter (the virtual
* row has no upstream URL). Optional so older test wirings still
* compile; in those tests the chat path will refuse virtual Llms
* with a clear error.
*/
private readonly virtualLlms?: IVirtualLlmService,
) {}
async createThread(agentName: string, ownerId: string, title?: string): Promise<{ id: string }> {
@@ -170,14 +179,7 @@ export class ChatService {
let lastTurnIndex = ctx.startingTurnIndex;
try {
for (let i = 0; i < ctx.maxIterations; i += 1) {
const adapter = this.adapters.get(ctx.llmType);
const result = await adapter.infer({
body: this.buildBody(ctx),
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
const result = await this.runOneInference(ctx);
const choice = extractChoice(result.body);
if (choice === null) {
throw new Error(`Adapter returned no choice (status ${String(result.status)})`);
@@ -240,19 +242,12 @@ export class ChatService {
const ctx = await this.prepareContext(args);
try {
for (let i = 0; i < ctx.maxIterations; i += 1) {
const adapter = this.adapters.get(ctx.llmType);
const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = {
content: '',
toolCalls: [],
};
let finishReason: string | null = null;
for await (const chunk of adapter.stream({
body: { ...this.buildBody(ctx), stream: true },
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
})) {
for await (const chunk of this.streamInference(ctx)) {
if (chunk.done === true) break;
if (chunk.data === '[DONE]') break;
const evt = parseStreamingChunk(chunk.data);
@@ -347,12 +342,130 @@ export class ChatService {
}
}
/**
* Streaming counterpart of runOneInference. Yields raw OpenAI-style
* SSE chunks ({data: string; done?: boolean}) regardless of whether
* we're hitting a public adapter or relaying through VirtualLlmService.
* The caller's `parseStreamingChunk` already speaks OpenAI shape, so
* downstream code doesn't need to know which path produced the chunks.
*/
private async *streamInference(ctx: {
llmName: string;
llmType: string;
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
extraConfig: Record<string, unknown>;
history: OpenAiMessage[];
systemBlock: string;
toolList: ChatTool[];
mergedParams: AgentChatParams;
}): AsyncGenerator<{ data: string; done?: boolean }> {
if (ctx.llmKind !== 'virtual') {
const adapter = this.adapters.get(ctx.llmType);
yield* adapter.stream({
body: { ...this.buildBody(ctx), stream: true },
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
return;
}
if (this.virtualLlms === undefined) {
throw new Error(
'virtualLlms dispatcher not wired into ChatService — cannot stream chat with kind=virtual Llm',
);
}
// Bridge VirtualLlmService's onChunk callback API to an async
// iterator. Chunks land on the queue from the SSE relay; the
// generator drains them in order. ref.done resolves when the
// publisher emits its `[DONE]` marker.
const ref = await this.virtualLlms.enqueueInferTask(
ctx.llmName,
{ ...this.buildBody(ctx), stream: true },
true,
);
const queue: Array<{ data: string; done?: boolean }> = [];
let resolveTick: (() => void) | null = null;
const wake = (): void => {
const r = resolveTick;
resolveTick = null;
if (r !== null) r();
};
const unsubscribe = ref.onChunk((c) => { queue.push(c); wake(); });
let finished = false;
let failure: Error | null = null;
ref.done.then(() => { finished = true; wake(); }).catch((err: Error) => { failure = err; finished = true; wake(); });
try {
while (true) {
while (queue.length > 0) {
const c = queue.shift()!;
yield c;
if (c.done === true) return;
}
if (finished) {
if (failure !== null) throw failure;
return;
}
await new Promise<void>((r) => { resolveTick = r; });
}
} finally {
unsubscribe();
}
}
/**
* Run a single non-streaming inference iteration. Branches on
* ctx.llmKind: public goes through the existing adapter registry,
* virtual relays through VirtualLlmService.enqueueInferTask (mirrors
* the same branch in `routes/llm-infer.ts` from v1 Stage 3).
*
* Throws when virtualLlms isn't wired but the row is virtual — older
* test wirings hit this path.
*/
private async runOneInference(ctx: {
llmName: string;
llmType: string;
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
extraConfig: Record<string, unknown>;
history: OpenAiMessage[];
systemBlock: string;
toolList: ChatTool[];
mergedParams: AgentChatParams;
}): Promise<{ status: number; body: unknown }> {
if (ctx.llmKind === 'virtual') {
if (this.virtualLlms === undefined) {
throw new Error(
'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm',
);
}
const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false);
return ref.done;
}
const adapter = this.adapters.get(ctx.llmType);
return adapter.infer({
body: this.buildBody(ctx),
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
}
private async prepareContext(args: ChatRequestArgs): Promise<{
threadId: string;
history: OpenAiMessage[];
systemBlock: string;
llmName: string;
llmType: string;
/** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
@@ -435,6 +548,7 @@ export class ChatService {
systemBlock,
llmName: llm.name,
llmType: llm.type,
llmKind: llm.kind,
modelOverride: llm.model,
url: llm.url,
apiKey,

View File

@@ -28,6 +28,7 @@ import { randomUUID } from 'node:crypto';
import type { ILlmRepository } from '../repositories/llm.repository.js';
import type { OpenAiChatRequest } from './llm/types.js';
import { NotFoundError } from './mcp-server.service.js';
import type { AgentService } from './agent.service.js';
/** A virtual provider's announcement at registration time. */
export interface RegisterProviderInput {
@@ -37,6 +38,15 @@ export interface RegisterProviderInput {
tier?: string;
description?: string;
extraConfig?: Record<string, unknown>;
/**
* Optional. Lets the publisher hint that the underlying backend is
* asleep — mcpd records the row as `hibernating` and will dispatch a
* `wake` task before any inference. Defaults to `active` (today's
* behavior). v2 publishers (mcplocal with a configured wake recipe)
* pass 'hibernating' when `LlmProvider.isAvailable()` returns false at
* publish time.
*/
initialStatus?: 'active' | 'hibernating';
}
export interface RegisterResult {
@@ -103,8 +113,23 @@ export interface PendingTaskRef {
export class VirtualLlmService implements IVirtualLlmService {
private readonly sessions = new Map<string, VirtualSessionHandle>();
private readonly tasksById = new Map<string, PendingTask>();
/**
* Dedupe concurrent wake requests for the same Llm. The first request
* starts the wake; later requests for the same name await the same
* promise. Cleared as soon as the wake settles (success or failure).
*/
private readonly wakeInFlight = new Map<string, Promise<void>>();
constructor(private readonly repo: ILlmRepository) {}
constructor(
private readonly repo: ILlmRepository,
/**
* Optional. v3 wires AgentService here so the lifecycle cascades:
* heartbeat → bump owned agents; disconnect → mark agents inactive;
* gcSweep → sweep agents first, then delete pinned-to-Llm cascade
* before deleting the Llm itself (Agent.llmId is Restrict).
*/
private readonly agents?: AgentService,
) {}
async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult> {
const sessionId = input.providerSessionId ?? randomUUID();
@@ -112,6 +137,7 @@ export class VirtualLlmService implements IVirtualLlmService {
const llms: Llm[] = [];
for (const p of input.providers) {
const initialStatus = p.initialStatus ?? 'active';
const existing = await this.repo.findByName(p.name);
if (existing === null) {
const created = await this.repo.create({
@@ -123,7 +149,7 @@ export class VirtualLlmService implements IVirtualLlmService {
...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
kind: 'virtual',
providerSessionId: sessionId,
status: 'active',
status: initialStatus,
lastHeartbeatAt: now,
inactiveSince: null,
});
@@ -156,7 +182,7 @@ export class VirtualLlmService implements IVirtualLlmService {
...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
kind: 'virtual',
providerSessionId: sessionId,
status: 'active',
status: initialStatus,
lastHeartbeatAt: now,
inactiveSince: null,
});
@@ -168,7 +194,6 @@ export class VirtualLlmService implements IVirtualLlmService {
async heartbeat(providerSessionId: string): Promise<void> {
const owned = await this.repo.findBySessionId(providerSessionId);
if (owned.length === 0) return;
const now = new Date();
for (const row of owned) {
// Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a
@@ -180,6 +205,11 @@ export class VirtualLlmService implements IVirtualLlmService {
: {}),
});
}
// Cascade to virtual agents owned by the same session — same heartbeat
// covers them. No-op if AgentService isn't wired (older test configs).
if (this.agents !== undefined) {
await this.agents.heartbeatVirtualAgents(providerSessionId);
}
}
bindSession(providerSessionId: string, handle: VirtualSessionHandle): void {
@@ -198,6 +228,10 @@ export class VirtualLlmService implements IVirtualLlmService {
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
}
}
// Cascade to virtual agents owned by the same session.
if (this.agents !== undefined) {
await this.agents.markVirtualAgentsInactiveBySession(providerSessionId);
}
// Reject any in-flight tasks for this session — the relay can't deliver
// a result POST anymore.
for (const t of this.tasksById.values()) {
@@ -220,9 +254,9 @@ export class VirtualLlmService implements IVirtualLlmService {
{ statusCode: 500 },
);
}
if (llm.status !== 'active') {
if (llm.status === 'inactive') {
throw Object.assign(
new Error(`Virtual Llm '${llmName}' is ${llm.status}; publisher offline`),
new Error(`Virtual Llm '${llmName}' is inactive; publisher offline`),
{ statusCode: 503 },
);
}
@@ -234,6 +268,16 @@ export class VirtualLlmService implements IVirtualLlmService {
);
}
// ── Wake-on-demand (v2) ──
// Status=hibernating means the publisher told us at register time
// (or via a later status update) that the backend is asleep. Fire a
// wake task and wait for the publisher to confirm readiness before
// dispatching the actual inference. Concurrent infers for the same
// Llm share a single wake promise.
if (llm.status === 'hibernating') {
await this.ensureAwake(llm.id, llm.name, llm.providerSessionId, handle);
}
const taskId = randomUUID();
const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>();
@@ -275,6 +319,77 @@ export class VirtualLlmService implements IVirtualLlmService {
};
}
/**
* Drive the publisher to wake the backend. Concurrent callers for the
* same Llm name share the in-flight promise — we only ever ask the
* publisher once. Throws on timeout or recipe failure; on success the
* row is flipped to active and subsequent infer calls proceed.
*/
private async ensureAwake(
llmId: string,
llmName: string,
sessionId: string,
handle: VirtualSessionHandle,
): Promise<void> {
const existing = this.wakeInFlight.get(llmName);
if (existing !== undefined) {
await existing;
return;
}
const promise = this.runWake(llmId, llmName, sessionId, handle);
this.wakeInFlight.set(llmName, promise);
try {
await promise;
} finally {
this.wakeInFlight.delete(llmName);
}
}
private async runWake(
llmId: string,
llmName: string,
sessionId: string,
handle: VirtualSessionHandle,
): Promise<void> {
const taskId = randomUUID();
let resolveDone!: () => void;
let rejectDone!: (err: Error) => void;
const done = new Promise<void>((resolve, reject) => {
resolveDone = resolve;
rejectDone = reject;
});
const pending: PendingTask = {
taskId,
sessionId,
llmName,
streaming: false,
// Wake tasks return { ok: true } on success or never resolve at
// all if the publisher dies; the rejectNonStreaming path covers
// the disconnect-mid-wake case via unbindSession.
resolveNonStreaming: (_body, status) => {
if (status >= 200 && status < 300) resolveDone();
else rejectDone(new Error(`wake task returned status ${String(status)}`));
},
rejectNonStreaming: rejectDone,
pushChunk: null,
};
this.tasksById.set(taskId, pending);
handle.pushTask({ kind: 'wake', taskId, llmName });
await done;
// Flip the row to active so subsequent infer calls go through the
// normal active path. The publisher's own heartbeat will keep the
// row alive from this point.
await this.repo.update(llmId, {
status: 'active',
lastHeartbeatAt: new Date(),
inactiveSince: null,
});
}
completeTask(taskId: string, result: { status: number; body: unknown }): boolean {
const t = this.tasksById.get(taskId);
if (t === undefined) return false;
@@ -308,6 +423,16 @@ export class VirtualLlmService implements IVirtualLlmService {
let markedInactive = 0;
let deleted = 0;
// v3: sweep virtual agents FIRST so any Llm-pinned agent that's
// about to be cascaded (because its Llm is also expiring) is gone
// before we attempt to delete the Llm. Agent.llmId is Restrict and
// would otherwise block.
if (this.agents !== undefined) {
const agentSweep = await this.agents.gcSweepVirtualAgents(now);
markedInactive += agentSweep.markedInactive;
deleted += agentSweep.deleted;
}
const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
for (const row of stale) {
@@ -318,6 +443,13 @@ export class VirtualLlmService implements IVirtualLlmService {
const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
const expired = await this.repo.findExpiredInactives(deletionCutoff);
for (const row of expired) {
// Final defensive cascade: drop any virtual agents still pinned
// to this Llm (e.g. their lastHeartbeatAt happens to lag the
// Llm's by a few seconds and they didn't make this round's
// 4-h cutoff). Without this we'd hit a Restrict FK error.
if (this.agents !== undefined) {
await this.agents.deleteVirtualAgentsForLlm(row.id);
}
await this.repo.delete(row.id);
deleted += 1;
}

View File

@@ -0,0 +1,251 @@
import { describe, it, expect, vi } from 'vitest';
import { ChatService, type ChatToolDispatcher } from '../src/services/chat.service.js';
import type { AgentService } from '../src/services/agent.service.js';
import type { LlmService } from '../src/services/llm.service.js';
import type { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
import type { IChatRepository } from '../src/repositories/chat.repository.js';
import type { IPromptRepository } from '../src/repositories/prompt.repository.js';
import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js';
import type { ChatMessage, ChatThread, Prompt } from '@prisma/client';
const NOW = new Date();
/**
* Tests targeting v3 Stage 1's chat.service kind=virtual branch.
* Mirror the existing chat-service.test.ts patterns but isolate the
* adapter-vs-relay dispatch decision.
*/
function mockChatRepo(): IChatRepository {
const msgs: ChatMessage[] = [];
const threads: ChatThread[] = [];
let idCounter = 1;
return {
createThread: vi.fn(async ({ agentId, ownerId, title }) => {
const t: ChatThread = {
id: `thread-${String(idCounter++)}`, agentId, ownerId,
title: title ?? '', lastTurnAt: NOW, createdAt: NOW, updatedAt: NOW,
};
threads.push(t);
return t;
}),
findThread: vi.fn(async (id: string) => threads.find((t) => t.id === id) ?? null),
listThreadsByAgent: vi.fn(async () => []),
listMessages: vi.fn(async () => []),
appendMessage: vi.fn(async (input) => {
const m: ChatMessage = {
id: `msg-${String(idCounter++)}`,
threadId: input.threadId,
turnIndex: input.turnIndex ?? msgs.filter((x) => x.threadId === input.threadId).length,
role: input.role,
content: input.content,
toolCalls: (input.toolCalls ?? null) as ChatMessage['toolCalls'],
toolCallId: input.toolCallId ?? null,
status: input.status ?? 'complete',
createdAt: NOW,
};
msgs.push(m);
return m;
}),
updateStatus: vi.fn(async (_id, _s) => ({ } as ChatMessage)),
markPendingAsError: vi.fn(async () => 0),
touchThread: vi.fn(async () => undefined),
nextTurnIndex: vi.fn(async () => msgs.length),
};
}
function mockAgents(): AgentService {
return {
getByName: vi.fn(async (name: string) => ({
id: `agent-${name}`, name, description: '',
systemPrompt: 'You are a helpful agent.',
llm: { id: 'llm-1', name: 'vllm-local' },
project: null,
defaultPersonality: null,
proxyModelName: null,
defaultParams: {},
extras: {},
ownerId: 'owner-1', version: 1, createdAt: NOW, updatedAt: NOW,
})),
} as unknown as AgentService;
}
function mockLlmsVirtual(): LlmService {
return {
getByName: vi.fn(async (name: string) => ({
id: 'llm-1', name, type: 'openai', model: 'fake',
url: '', tier: 'fast', description: '',
apiKeyRef: null, extraConfig: {},
kind: 'virtual',
status: 'active',
lastHeartbeatAt: NOW,
inactiveSince: null,
version: 1, createdAt: NOW, updatedAt: NOW,
})),
resolveApiKey: vi.fn(async () => ''),
} as unknown as LlmService;
}
function mockPromptRepo(): IPromptRepository {
return {
findAll: vi.fn(async () => []),
findGlobal: vi.fn(async () => []),
findByAgent: vi.fn(async () => []),
findById: vi.fn(async () => null),
findByNameAndProject: vi.fn(async () => null),
findByNameAndAgent: vi.fn(async () => null),
create: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
};
}
function mockTools(): ChatToolDispatcher {
return { listTools: vi.fn(async () => []), callTool: vi.fn(async () => ({ ok: true })) };
}
function emptyAdapterRegistry(): LlmAdapterRegistry {
return {
get: () => { throw new Error('adapter should not be used for kind=virtual'); },
} as unknown as LlmAdapterRegistry;
}
function mockVirtualLlms(opts: {
reply?: string;
rejectWith?: Error;
streamingChunks?: string[];
}): IVirtualLlmService {
const enqueueInferTask = vi.fn(async (_name: string, _body: unknown, streaming: boolean) => {
if (opts.rejectWith !== undefined) {
return {
taskId: 't-1',
done: Promise.reject(opts.rejectWith),
onChunk: () => () => undefined,
};
}
if (!streaming) {
const body = {
choices: [{ message: { content: opts.reply ?? 'hi from relay' }, finish_reason: 'stop' }],
};
return {
taskId: 't-1',
done: Promise.resolve({ status: 200, body }),
onChunk: () => () => undefined,
};
}
// Streaming path: collect subscribers, push the configured chunks
// synchronously, then resolve done.
const subs = new Set<(c: { data: string; done?: boolean }) => void>();
const chunks = opts.streamingChunks ?? ['{"choices":[{"delta":{"content":"hi from relay"}}]}'];
return {
taskId: 't-1',
done: (async (): Promise<{ status: number; body: unknown }> => {
// Wait long enough for the caller to register subscribers
// before fanning chunks. Promise.resolve() isn't enough — the
// microtask running this IIFE is queued ahead of the caller's
// await on enqueueInferTask, so subs would still be empty.
await new Promise((r) => setTimeout(r, 0));
for (const c of chunks) for (const s of subs) s({ data: c });
for (const s of subs) s({ data: '[DONE]', done: true });
return { status: 200, body: null };
})(),
onChunk: (cb): (() => void) => { subs.add(cb); return () => subs.delete(cb); },
};
});
return {
register: vi.fn(),
heartbeat: vi.fn(),
bindSession: vi.fn(),
unbindSession: vi.fn(),
enqueueInferTask: enqueueInferTask as unknown as IVirtualLlmService['enqueueInferTask'],
completeTask: vi.fn(),
pushTaskChunk: vi.fn(),
failTask: vi.fn(),
gcSweep: vi.fn(),
};
}
describe('ChatService — kind=virtual branch (v3 Stage 1)', () => {
it('non-streaming relays through VirtualLlmService.enqueueInferTask', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({ reply: 'hello back from local' });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
expect(result.assistant).toBe('hello back from local');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array) }),
false,
);
});
it('streaming relays through VirtualLlmService and emits the same text deltas', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({
streamingChunks: [
'{"choices":[{"delta":{"content":"hello "}}]}',
'{"choices":[{"delta":{"content":"world"}}]}',
],
});
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const deltas: string[] = [];
for await (const evt of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) {
if (evt.type === 'text') deltas.push(evt.delta);
if (evt.type === 'final') break;
}
expect(deltas.join('')).toBe('hello world');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array), stream: true }),
true,
);
});
it('non-streaming throws a clear error when virtualLlms isn\'t wired but the row is virtual', async () => {
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
// no personalities, no virtualLlms
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/virtualLlms dispatcher not wired/);
});
it('non-streaming surfaces the relay\'s rejection (e.g. publisher offline) up to the caller', async () => {
const virtual = mockVirtualLlms({ rejectWith: Object.assign(new Error('publisher offline'), { statusCode: 503 }) });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/publisher offline/);
});
});

View File

@@ -118,12 +118,16 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } |
} as unknown as AgentService;
}
function mockLlms(): LlmService {
function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService {
return {
getByName: vi.fn(async (name: string) => ({
id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking',
url: '', tier: 'fast', description: '',
apiKeyRef: null, extraConfig: {},
kind: opts.kind ?? 'public',
status: 'active',
lastHeartbeatAt: null,
inactiveSince: null,
version: 1, createdAt: NOW, updatedAt: NOW,
})),
resolveApiKey: vi.fn(async () => 'fake-key'),

View File

@@ -0,0 +1,376 @@
import { describe, it, expect, vi } from 'vitest';
import { AgentService, type VirtualAgentInput } from '../src/services/agent.service.js';
import { VirtualLlmService } from '../src/services/virtual-llm.service.js';
import type { IAgentRepository } from '../src/repositories/agent.repository.js';
import type { ILlmRepository } from '../src/repositories/llm.repository.js';
import type { LlmService } from '../src/services/llm.service.js';
import type { ProjectService } from '../src/services/project.service.js';
import type { Agent, Llm } from '@prisma/client';
/**
* v3 Stage 2 — virtual-agent lifecycle methods on AgentService and the
* cascade callbacks wired into VirtualLlmService.gcSweep / heartbeat /
* unbindSession. Mirrors the shape of virtual-llm-service.test.ts but
* focused on the agent-side state machine + the Llm→Agent cascade.
*/
const NOW = new Date();
function makeAgent(overrides: Partial<Agent> = {}): Agent {
return {
id: `agent-${Math.random().toString(36).slice(2, 8)}`,
name: 'fake-agent',
description: '',
systemPrompt: '',
llmId: 'llm-1',
projectId: null,
defaultPersonalityId: null,
proxyModelName: null,
defaultParams: {} as Agent['defaultParams'],
extras: {} as Agent['extras'],
kind: 'virtual',
providerSessionId: 'sess-1',
lastHeartbeatAt: NOW,
status: 'active',
inactiveSince: null,
ownerId: 'owner-1',
version: 1,
createdAt: NOW,
updatedAt: NOW,
...overrides,
};
}
function makeLlm(overrides: Partial<Llm> = {}): Llm {
return {
id: `llm-${Math.random().toString(36).slice(2, 8)}`,
name: 'vllm-local',
type: 'openai',
model: 'm',
url: '',
tier: 'fast',
description: '',
apiKeySecretId: null,
apiKeySecretKey: null,
extraConfig: {} as Llm['extraConfig'],
kind: 'virtual',
providerSessionId: 'sess-1',
lastHeartbeatAt: NOW,
status: 'active',
inactiveSince: null,
version: 1,
createdAt: NOW,
updatedAt: NOW,
...overrides,
};
}
function mockAgentRepo(initial: Agent[] = []): IAgentRepository {
const rows = new Map<string, Agent>(initial.map((r) => [r.id, r]));
let counter = rows.size;
return {
findAll: vi.fn(async () => [...rows.values()]),
findById: vi.fn(async (id: string) => rows.get(id) ?? null),
findByName: vi.fn(async (name: string) => {
for (const r of rows.values()) if (r.name === name) return r;
return null;
}),
findByProjectId: vi.fn(async () => []),
findBySessionId: vi.fn(async (sid: string) =>
[...rows.values()].filter((r) => r.providerSessionId === sid)),
findByLlmId: vi.fn(async (llmId: string) =>
[...rows.values()].filter((r) => r.llmId === llmId)),
findStaleVirtuals: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'active'
&& r.lastHeartbeatAt !== null
&& r.lastHeartbeatAt < cutoff)),
findExpiredInactives: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'inactive'
&& r.inactiveSince !== null
&& r.inactiveSince < cutoff)),
create: vi.fn(async (data) => {
counter += 1;
const row = makeAgent({
id: `agent-${String(counter)}`,
name: data.name,
description: data.description ?? '',
systemPrompt: data.systemPrompt ?? '',
llmId: data.llmId,
projectId: data.projectId ?? null,
kind: data.kind ?? 'public',
providerSessionId: data.providerSessionId ?? null,
status: data.status ?? 'active',
lastHeartbeatAt: data.lastHeartbeatAt ?? null,
inactiveSince: data.inactiveSince ?? null,
ownerId: data.ownerId,
});
rows.set(row.id, row);
return row;
}),
update: vi.fn(async (id, data) => {
const existing = rows.get(id);
if (!existing) throw new Error('not found');
const next: Agent = {
...existing,
...(data.description !== undefined ? { description: data.description } : {}),
...(data.systemPrompt !== undefined ? { systemPrompt: data.systemPrompt } : {}),
...(data.llmId !== undefined ? { llmId: data.llmId } : {}),
...(data.projectId !== undefined ? { projectId: data.projectId } : {}),
...(data.kind !== undefined ? { kind: data.kind } : {}),
...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
...(data.status !== undefined ? { status: data.status } : {}),
...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
};
rows.set(id, next);
return next;
}),
delete: vi.fn(async (id: string) => { rows.delete(id); }),
};
}
function mockLlms(): LlmService {
return {
getById: vi.fn(async (id: string) => ({ id, name: 'vllm-local', type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
getByName: vi.fn(async (name: string) => ({ id: 'llm-1', name, type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
} as unknown as LlmService;
}
function mockProjects(): ProjectService {
return {
getById: vi.fn(async (id: string) => ({ id, name: 'mcpctl-dev' })),
resolveAndGet: vi.fn(async (idOrName: string) => ({
id: idOrName === 'mcpctl-dev' ? 'proj-1' : 'proj-other',
name: idOrName,
})),
} as unknown as ProjectService;
}
describe('AgentService — virtual-agent lifecycle (v3 Stage 2)', () => {
it('registerVirtualAgents inserts new rows with kind=virtual / status=active', async () => {
const repo = mockAgentRepo();
const svc = new AgentService(repo, mockLlms(), mockProjects());
const inputs: VirtualAgentInput[] = [
{ name: 'local-coder', llmName: 'vllm-local', description: 'd', systemPrompt: 's' },
];
const out = await svc.registerVirtualAgents('sess-1', inputs, 'owner-1');
expect(out).toHaveLength(1);
expect(out[0]!.kind).toBe('virtual');
expect(out[0]!.status).toBe('active');
});
it('registerVirtualAgents reuses an existing row from the same session (sticky reconnect)', async () => {
const existing = makeAgent({ name: 'local-coder', providerSessionId: 'sess-1', status: 'inactive', inactiveSince: NOW });
const repo = mockAgentRepo([existing]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const out = await svc.registerVirtualAgents(
'sess-1',
[{ name: 'local-coder', llmName: 'vllm-local' }],
'owner-1',
);
expect(out[0]!.id).toBe(existing.id);
expect(out[0]!.status).toBe('active');
});
it('registerVirtualAgents adopts an inactive virtual from a different session', async () => {
const existing = makeAgent({
name: 'local-coder', providerSessionId: 'old-session',
status: 'inactive', inactiveSince: NOW,
});
const repo = mockAgentRepo([existing]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const out = await svc.registerVirtualAgents(
'new-session',
[{ name: 'local-coder', llmName: 'vllm-local' }],
'owner-1',
);
expect(out[0]!.id).toBe(existing.id);
expect(out[0]!.status).toBe('active');
});
it('registerVirtualAgents refuses to overwrite a public agent (409)', async () => {
const repo = mockAgentRepo([makeAgent({ name: 'reviewer', kind: 'public', providerSessionId: null })]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await expect(svc.registerVirtualAgents(
'sess-x',
[{ name: 'reviewer', llmName: 'vllm-local' }],
'owner-1',
)).rejects.toThrow(/Cannot publish over public Agent/);
});
it('registerVirtualAgents refuses if another active session owns the name', async () => {
const repo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'other', status: 'active' })]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await expect(svc.registerVirtualAgents(
'mine',
[{ name: 'local-coder', llmName: 'vllm-local' }],
'owner-1',
)).rejects.toThrow(/already active under a different session/);
});
it('heartbeatVirtualAgents bumps + revives inactive', async () => {
const past = new Date(Date.now() - 5_000);
const a = makeAgent({ name: 'a', providerSessionId: 'sess', status: 'inactive', lastHeartbeatAt: past, inactiveSince: past });
const repo = mockAgentRepo([a]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await svc.heartbeatVirtualAgents('sess');
const row = await repo.findByName('a');
expect(row?.status).toBe('active');
expect(row?.inactiveSince).toBeNull();
expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
});
it('markVirtualAgentsInactiveBySession flips owned actives to inactive', async () => {
const repo = mockAgentRepo([
makeAgent({ name: 'a', providerSessionId: 'sess' }),
makeAgent({ name: 'b', providerSessionId: 'sess' }),
makeAgent({ name: 'c', providerSessionId: 'other' }),
]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await svc.markVirtualAgentsInactiveBySession('sess');
expect((await repo.findByName('a'))?.status).toBe('inactive');
expect((await repo.findByName('b'))?.status).toBe('inactive');
expect((await repo.findByName('c'))?.status).toBe('active');
});
it('deleteVirtualAgentsForLlm deletes only virtuals pinned to that Llm', async () => {
const repo = mockAgentRepo([
makeAgent({ name: 'v-1', llmId: 'doomed', kind: 'virtual' }),
makeAgent({ name: 'v-2', llmId: 'doomed', kind: 'virtual' }),
makeAgent({ name: 'pub-1', llmId: 'doomed', kind: 'public', providerSessionId: null }),
makeAgent({ name: 'v-other', llmId: 'safe', kind: 'virtual' }),
]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const deleted = await svc.deleteVirtualAgentsForLlm('doomed');
expect(deleted).toBe(2);
expect(await repo.findByName('v-1')).toBeNull();
expect(await repo.findByName('v-2')).toBeNull();
expect(await repo.findByName('pub-1')).not.toBeNull();
expect(await repo.findByName('v-other')).not.toBeNull();
});
it('gcSweepVirtualAgents flips heartbeat-stale + deletes 4h-old inactive', async () => {
const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago, past 90s cutoff
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago, past 4h cutoff
const repo = mockAgentRepo([
makeAgent({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }),
makeAgent({ name: 'old', providerSessionId: 'b', status: 'inactive', inactiveSince: ancient }),
makeAgent({ name: 'pub', providerSessionId: null, kind: 'public' }),
]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const r = await svc.gcSweepVirtualAgents();
expect(r.markedInactive).toBe(1);
expect(r.deleted).toBe(1);
expect((await repo.findByName('stale'))?.status).toBe('inactive');
expect(await repo.findByName('old')).toBeNull();
expect(await repo.findByName('pub')).not.toBeNull();
});
});
describe('VirtualLlmService cascade through AgentService (v3 Stage 2)', () => {
function mockLlmRepo(initial: Llm[] = []): ILlmRepository {
const rows = new Map<string, Llm>(initial.map((r) => [r.id, r]));
let counter = rows.size;
return {
findAll: vi.fn(async () => [...rows.values()]),
findById: vi.fn(async (id: string) => rows.get(id) ?? null),
findByName: vi.fn(async (name: string) => {
for (const r of rows.values()) if (r.name === name) return r;
return null;
}),
findByTier: vi.fn(async () => []),
findBySessionId: vi.fn(async (sid: string) =>
[...rows.values()].filter((r) => r.providerSessionId === sid)),
findStaleVirtuals: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'active'
&& r.lastHeartbeatAt !== null
&& r.lastHeartbeatAt < cutoff)),
findExpiredInactives: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'inactive'
&& r.inactiveSince !== null
&& r.inactiveSince < cutoff)),
create: vi.fn(async (data) => {
counter += 1;
const row = makeLlm({ id: `llm-${String(counter)}`, name: data.name, type: data.type });
rows.set(row.id, row);
return row;
}),
update: vi.fn(async (id, data) => {
const existing = rows.get(id);
if (!existing) throw new Error('not found');
const next: Llm = { ...existing, ...data } as Llm;
rows.set(id, next);
return next;
}),
delete: vi.fn(async (id: string) => { rows.delete(id); }),
};
}
it('unbindSession cascades to mark virtual agents inactive', async () => {
const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
const agentRepo = mockAgentRepo([
makeAgent({ name: 'local-coder', providerSessionId: 'sess' }),
]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
await svc.unbindSession('sess');
expect((await agentRepo.findByName('local-coder'))?.status).toBe('inactive');
});
it('gcSweep deletes virtual agents BEFORE their pinned virtual Llm', async () => {
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
const llmRepo = mockLlmRepo([makeLlm({
id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
status: 'inactive', inactiveSince: ancient,
})]);
const agentRepo = mockAgentRepo([
makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: ancient }),
]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
const r = await svc.gcSweep();
expect(r.deleted).toBeGreaterThanOrEqual(2); // 1 agent + 1 llm
expect(await llmRepo.findByName('vllm-local')).toBeNull();
expect(await agentRepo.findByName('pinned')).toBeNull();
});
it('gcSweep defensive cascade: still drops the agent when its heartbeat lagged the Llm', async () => {
// The Llm is past the 4h cutoff. The agent is inactive but only
// 1h old — wouldn't be GC'd by gcSweepVirtualAgents on its own.
// The defensive cascade in gcSweep deletes it anyway because the
// Restrict FK would otherwise block the Llm delete.
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
const recent = new Date(Date.now() - 1 * 60 * 60 * 1000);
const llmRepo = mockLlmRepo([makeLlm({
id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
status: 'inactive', inactiveSince: ancient,
})]);
const agentRepo = mockAgentRepo([
makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: recent }),
]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
await svc.gcSweep();
expect(await llmRepo.findByName('vllm-local')).toBeNull();
expect(await agentRepo.findByName('pinned')).toBeNull();
});
it('heartbeat cascades to bump owned virtual agents', async () => {
const past = new Date(Date.now() - 10_000);
const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', lastHeartbeatAt: past })]);
const agentRepo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'sess', lastHeartbeatAt: past })]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
await svc.heartbeat('sess');
const a = await agentRepo.findByName('local-coder');
expect(a!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
});
});

View File

@@ -332,6 +332,108 @@ describe('VirtualLlmService', () => {
expect(await repo.findByName('public-survivor')).not.toBeNull();
});
// ── v2: wake-before-infer ──
it('hibernating: dispatches a wake task first and waits for it to complete before infer', async () => {
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
const svc = new VirtualLlmService(repo);
const session = fakeSession();
svc.bindSession('sess', session);
// Kick off enqueueInferTask. It blocks on the wake task.
const inferPromise = svc.enqueueInferTask(
'sleeping',
{ model: 'm', messages: [{ role: 'user', content: 'hi' }] },
false,
);
// Wait a tick so the wake task gets pushed.
await new Promise((r) => setTimeout(r, 0));
expect(session.tasks).toHaveLength(1);
const wakeTask = session.tasks[0] as { kind: string; taskId: string; llmName: string };
expect(wakeTask.kind).toBe('wake');
expect(wakeTask.llmName).toBe('sleeping');
// Resolve the wake task — service flips the row to active, then
// pushes the infer task on the same session.
expect(svc.completeTask(wakeTask.taskId, { status: 200, body: { ok: true } })).toBe(true);
const ref = await inferPromise;
expect(session.tasks).toHaveLength(2);
const inferTask = session.tasks[1] as { kind: string; taskId: string };
expect(inferTask.kind).toBe('infer');
expect(inferTask.taskId).toBe(ref.taskId);
// The row should be active now — concurrent callers won't trigger another wake.
const row = await repo.findByName('sleeping');
expect(row?.status).toBe('active');
});
it('hibernating: concurrent infer requests share a single wake task', async () => {
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
const svc = new VirtualLlmService(repo);
const session = fakeSession();
svc.bindSession('sess', session);
// Fire 3 concurrent infer requests against the same hibernating LLM.
const reqs = [
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
];
await new Promise((r) => setTimeout(r, 0));
// Exactly one wake task pushed, regardless of concurrent infers.
const wakeTasks = (session.tasks as Array<{ kind: string }>).filter((t) => t.kind === 'wake');
expect(wakeTasks).toHaveLength(1);
const wakeTaskId = (session.tasks[0] as { taskId: string }).taskId;
expect(svc.completeTask(wakeTaskId, { status: 200, body: { ok: true } })).toBe(true);
const refs = await Promise.all(reqs);
// After wake, all 3 infer tasks pushed — total session tasks = 1 wake + 3 infer.
const inferTasks = (session.tasks as Array<{ kind: string }>).filter((t) => t.kind === 'infer');
expect(inferTasks).toHaveLength(3);
expect(refs.map((r) => r.taskId).sort()).toEqual(refs.map((r) => r.taskId).sort());
});
it('hibernating: rejects when the wake task fails', async () => {
const repo = mockRepo([makeLlm({ name: 'broken', providerSessionId: 'sess', status: 'hibernating' })]);
const svc = new VirtualLlmService(repo);
svc.bindSession('sess', fakeSession());
const inferPromise = svc.enqueueInferTask(
'broken',
{ model: 'm', messages: [] },
false,
);
await new Promise((r) => setTimeout(r, 0));
// Get the wake task id from the in-flight tasks map (its only entry).
// We test the failure path via failTask which is part of the public
// surface used by the result-POST route handler.
const taskIds: string[] = [];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
for (const id of (svc as any).tasksById.keys()) taskIds.push(id);
expect(taskIds).toHaveLength(1);
expect(svc.failTask(taskIds[0]!, new Error('wake recipe failed'))).toBe(true);
await expect(inferPromise).rejects.toThrow(/wake recipe failed/);
// Row stayed hibernating — the next request will get another wake try.
const row = await repo.findByName('broken');
expect(row?.status).toBe('hibernating');
});
it('inactive: still rejects with 503 (publisher offline) — wake path only fires for hibernating', async () => {
const repo = mockRepo([makeLlm({ name: 'gone', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]);
const svc = new VirtualLlmService(repo);
svc.bindSession('sess', fakeSession());
await expect(
svc.enqueueInferTask('gone', { model: 'm', messages: [] }, false),
).rejects.toThrow(/inactive|publisher offline/);
});
it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => {
const long = new Date(Date.now() - 5 * 60 * 1000);
const repo = mockRepo([

View File

@@ -80,8 +80,25 @@ export interface LlmProviderFileEntry {
* Default: false — existing setups don't change behavior.
*/
publish?: boolean;
/**
* Optional wake recipe for hibernating backends. When set, a provider
* whose `isAvailable()` returns false at registrar start time is
* published as `status=hibernating`. The next inference request that
* lands on the row triggers this recipe; mcplocal polls
* `isAvailable()` until it returns true (or times out) and then flips
* the row to active so mcpd can dispatch the queued inference.
*
* Two recipe types:
* - `http`: POST to a URL (e.g. an external sleep/wake controller)
* - `command`: spawn a shell command (e.g. `systemctl --user start vllm`)
*/
wake?: WakeRecipe;
}
export type WakeRecipe =
| { type: 'http'; url: string; method?: 'GET' | 'POST'; headers?: Record<string, string>; body?: string; maxWaitSeconds?: number }
| { type: 'command'; command: string; args?: string[]; maxWaitSeconds?: number };
export interface ProjectLlmOverride {
model?: string;
provider?: string;

View File

@@ -215,6 +215,7 @@ async function maybeStartVirtualLlmRegistrar(
model: entry.model ?? entry.name,
};
if (entry.tier !== undefined) item.tier = entry.tier;
if (entry.wake !== undefined) item.wake = entry.wake;
published.push(item);
}
if (published.length === 0) return null;

View File

@@ -27,7 +27,9 @@ import http from 'node:http';
import https from 'node:https';
import { promises as fs } from 'node:fs';
import { dirname } from 'node:path';
import { spawn } from 'node:child_process';
import type { LlmProvider, CompletionOptions } from './types.js';
import type { WakeRecipe } from '../http/config.js';
export interface RegistrarLogger {
info: (msg: string) => void;
@@ -45,6 +47,13 @@ export interface RegistrarPublishedProvider {
tier?: 'fast' | 'heavy';
/** Optional human-readable description for `mcpctl get llm`. */
description?: string;
/**
* Optional wake recipe for backends that hibernate. When provided AND
* `provider.isAvailable()` returns false at registrar start, the row is
* published with status=hibernating; on the first incoming `wake` task
* the registrar runs this recipe and waits for the backend to come up.
*/
wake?: WakeRecipe;
}
export interface RegistrarOptions {
@@ -140,15 +149,28 @@ export class VirtualLlmRegistrar {
}
private async register(): Promise<void> {
const body: Record<string, unknown> = {
providers: this.opts.publishedProviders.map((p) => ({
// Decide initial status per provider. A provider with a wake recipe
// that's NOT currently available comes up as hibernating; otherwise
// active (today's behavior). isAvailable() is forgiving — any
// unexpected throw is treated as "not available" so a transient
// network blip during boot doesn't crash the registrar.
const providers = await Promise.all(this.opts.publishedProviders.map(async (p) => {
let initialStatus: 'active' | 'hibernating' = 'active';
if (p.wake !== undefined) {
let alive = false;
try { alive = await p.provider.isAvailable(); } catch { alive = false; }
if (!alive) initialStatus = 'hibernating';
}
return {
name: p.provider.name,
type: p.type,
model: p.model,
...(p.tier !== undefined ? { tier: p.tier } : {}),
...(p.description !== undefined ? { description: p.description } : {}),
})),
};
initialStatus,
};
}));
const body: Record<string, unknown> = { providers };
if (this.sessionId !== null) body['providerSessionId'] = this.sessionId;
const res = await postJson(
@@ -276,9 +298,51 @@ export class VirtualLlmRegistrar {
void this.handleInferTask(task);
return;
}
// Wake tasks are reserved for v2 — acknowledge with an error so mcpd
// surfaces a clean failure rather than waiting forever.
void this.postResult(task.taskId, { error: 'wake task type not implemented in this client (v2)' });
if (task.kind === 'wake') {
void this.handleWakeTask(task);
return;
}
}
/**
* Run the configured wake recipe and poll the provider until it comes
* up. Sends a `{ status: 200, body: { ok: true } }` result on success;
* `{ error }` on timeout or recipe failure. While waiting, also bumps
* the heartbeat so mcpd's GC sweep doesn't decide we're stale mid-wake.
*/
private async handleWakeTask(task: { kind: 'wake'; taskId: string; llmName: string }): Promise<void> {
const published = this.opts.publishedProviders.find((p) => p.provider.name === task.llmName);
if (published === undefined) {
await this.postResult(task.taskId, { error: `provider '${task.llmName}' not registered locally` });
return;
}
if (published.wake === undefined) {
await this.postResult(task.taskId, { error: `provider '${task.llmName}' has no wake recipe configured` });
return;
}
try {
await runWakeRecipe(published.wake);
// Poll isAvailable() until it comes up (or timeout). Heartbeat
// every poll tick so mcpd doesn't time us out while we're waiting
// on a slow boot.
const maxWaitMs = (published.wake.maxWaitSeconds ?? 60) * 1000;
const started = Date.now();
while (Date.now() - started < maxWaitMs) {
let alive = false;
try { alive = await published.provider.isAvailable(); } catch { alive = false; }
if (alive) {
await this.heartbeatOnce();
await this.postResult(task.taskId, { status: 200, body: { ok: true, ms: Date.now() - started } });
return;
}
await this.heartbeatOnce();
await new Promise((r) => setTimeout(r, 1500));
}
await this.postResult(task.taskId, { error: `provider '${task.llmName}' did not come up within ${String(maxWaitMs)}ms` });
} catch (err) {
await this.postResult(task.taskId, { error: `wake recipe failed: ${(err as Error).message}` });
}
}
private async handleInferTask(task: InferTask): Promise<void> {
@@ -373,6 +437,68 @@ function openAiStreamChunk(
};
}
/**
* Execute a wake recipe. Returns when the recipe completes; throws if it
* fails. Doesn't itself poll for provider readiness — that's the caller's
* job (handleWakeTask polls isAvailable() with its own timeout).
*
* `http`: fires the configured request and considers any 2xx a success.
* The remote service is expected to be a "wake controller" that returns
* quickly; if the underlying boot takes minutes, the controller should
* return 202 and the readiness poll catches up.
*
* `command`: spawns the binary with args, waits for exit. Non-zero exit
* is treated as failure. stdout/stderr are discarded — the recipe's job
* is to *trigger* a wake, not to produce output.
*/
async function runWakeRecipe(recipe: WakeRecipe): Promise<void> {
if (recipe.type === 'http') {
const u = new URL(recipe.url);
const driver = u.protocol === 'https:' ? https : http;
const method = recipe.method ?? 'POST';
const headers: Record<string, string> = { ...(recipe.headers ?? {}) };
const body = recipe.body;
if (body !== undefined) {
headers['Content-Length'] = String(Buffer.byteLength(body));
}
await new Promise<void>((resolve, reject) => {
const req = driver.request({
hostname: u.hostname,
port: u.port || (u.protocol === 'https:' ? 443 : 80),
path: u.pathname + u.search,
method,
headers,
timeout: 30_000,
}, (res) => {
const status = res.statusCode ?? 0;
// Drain so the socket can be reused/freed.
res.resume();
if (status >= 200 && status < 300) resolve();
else reject(new Error(`wake HTTP returned ${String(status)}`));
});
req.on('error', reject);
req.on('timeout', () => { req.destroy(); reject(new Error('wake HTTP timed out')); });
if (body !== undefined) req.write(body);
req.end();
});
return;
}
if (recipe.type === 'command') {
await new Promise<void>((resolve, reject) => {
const child = spawn(recipe.command, recipe.args ?? [], {
stdio: 'ignore',
});
child.on('error', reject);
child.on('exit', (code) => {
if (code === 0) resolve();
else reject(new Error(`wake command exited with code ${String(code)}`));
});
});
return;
}
throw new Error(`unknown wake recipe type`);
}
interface PostResponse { statusCode: number; body: string }
/** Tiny JSON POST helper used by all of the registrar's mcpd calls. */

View File

@@ -142,13 +142,17 @@ describe('VirtualLlmRegistrar', () => {
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
expect(registerCall).toBeDefined();
expect(registerCall!.method).toBe('POST');
const body = JSON.parse(registerCall!.body) as { providers: Array<{ name: string; type: string; model: string; tier: string }> };
expect(body.providers).toEqual([{
const body = JSON.parse(registerCall!.body) as { providers: Array<Record<string, unknown>> };
expect(body.providers).toHaveLength(1);
expect(body.providers[0]).toMatchObject({
name: 'vllm-local',
type: 'openai',
model: 'qwen',
tier: 'fast',
}]);
// v2 always sends initialStatus; defaults to 'active' when no
// wake recipe is configured.
initialStatus: 'active',
});
expect(registerCall!.headers['authorization']).toBe('Bearer tok-abc');
// Sticky session id persisted.
@@ -219,6 +223,107 @@ describe('VirtualLlmRegistrar', () => {
}
});
// ── v2: hibernating + wake recipe ──
it('publishes initialStatus=hibernating when provider is unavailable AND wake is configured', async () => {
const fake = await startFakeServer();
try {
const sleeping: LlmProvider = {
name: 'vllm-local',
async complete() { throw new Error('not running'); },
async listModels() { return []; },
async isAvailable() { return false; },
};
const registrar = new VirtualLlmRegistrar({
mcpdUrl: fake.url,
token: 't',
publishedProviders: [{
provider: sleeping,
type: 'openai',
model: 'm',
wake: { type: 'http', url: 'http://localhost:9999/wake', maxWaitSeconds: 1 },
}],
sessionFilePath: join(tempDir, 'provider-session'),
log: silentLog(),
heartbeatIntervalMs: 60_000,
});
await registrar.start();
await new Promise((r) => setTimeout(r, 20));
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
const body = JSON.parse(registerCall!.body) as { providers: Array<{ initialStatus?: string }> };
expect(body.providers[0]!.initialStatus).toBe('hibernating');
registrar.stop();
} finally {
await fake.close();
}
});
it('publishes initialStatus=active when provider is available even with a wake recipe', async () => {
const fake = await startFakeServer();
try {
const awake: LlmProvider = {
name: 'vllm-local',
async complete() { throw new Error('not used'); },
async listModels() { return []; },
async isAvailable() { return true; },
};
const registrar = new VirtualLlmRegistrar({
mcpdUrl: fake.url,
token: 't',
publishedProviders: [{
provider: awake,
type: 'openai',
model: 'm',
wake: { type: 'http', url: 'http://localhost:9999/wake' },
}],
sessionFilePath: join(tempDir, 'provider-session'),
log: silentLog(),
heartbeatIntervalMs: 60_000,
});
await registrar.start();
await new Promise((r) => setTimeout(r, 20));
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
const body = JSON.parse(registerCall!.body) as { providers: Array<{ initialStatus?: string }> };
expect(body.providers[0]!.initialStatus).toBe('active');
registrar.stop();
} finally {
await fake.close();
}
});
it('publishes initialStatus=active when no wake recipe is configured (legacy path)', async () => {
const fake = await startFakeServer();
try {
// Provider intentionally returns false but has no wake recipe →
// legacy v1 publishers don't get hibernation behavior.
const sleeping: LlmProvider = {
name: 'vllm-local',
async complete() { return { content: '', toolCalls: [], usage: { promptTokens: 0, completionTokens: 0, totalTokens: 0 }, finishReason: 'stop' }; },
async listModels() { return []; },
async isAvailable() { return false; },
};
const registrar = new VirtualLlmRegistrar({
mcpdUrl: fake.url,
token: 't',
publishedProviders: [{ provider: sleeping, type: 'openai', model: 'm' }],
sessionFilePath: join(tempDir, 'provider-session'),
log: silentLog(),
heartbeatIntervalMs: 60_000,
});
await registrar.start();
await new Promise((r) => setTimeout(r, 20));
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
const body = JSON.parse(registerCall!.body) as { providers: Array<{ initialStatus?: string }> };
expect(body.providers[0]!.initialStatus).toBe('active');
registrar.stop();
} finally {
await fake.close();
}
});
it('throws when mcpd returns non-201 from /_provider-register', async () => {
const fake = await startFakeServer();
fake.handler = (_req, res, _body) => {

View File

@@ -207,3 +207,137 @@ describe('virtual-LLM smoke', () => {
expect(res.body).toMatch(/publisher offline|inactive/);
}, 30_000);
});
// ── v2: hibernating + wake-on-demand ──
const HIBERNATING_NAME = `smoke-virtual-hib-${SUFFIX}`;
let hibernatingRegistrar: VirtualLlmRegistrar | null = null;
/**
* Provider that's "asleep" until \`wakeFn()\` is called. Used to drive
* the wake-on-demand smoke without standing up an actual sleep/wake
* controller — we flip the bool from inside the wake recipe.
*/
function makeSleepingProvider(name: string, content: string): {
provider: LlmProvider;
wakeFn: () => void;
wakeCount: () => number;
} {
let awake = false;
let count = 0;
const provider: LlmProvider = {
name,
async complete(): Promise<CompletionResult> {
if (!awake) throw new Error('provider not awake');
return {
content,
toolCalls: [],
usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 },
finishReason: 'stop',
};
},
async listModels() { return []; },
async isAvailable() { return awake; },
};
return {
provider,
wakeFn: () => { awake = true; count += 1; },
wakeCount: () => count,
};
}
describe('virtual-LLM smoke — wake-on-demand', () => {
let wakeServerUrl: string;
let wakeServer: http.Server;
let wakeFn: (() => void) | null = null;
beforeAll(async () => {
if (!mcpdUp) return;
// Tiny in-process "wake controller" — receives the http wake recipe
// POST and flips the local provider's `awake` bool.
await new Promise<void>((resolve) => {
wakeServer = http.createServer((req, res) => {
if (req.url === '/wake' && wakeFn !== null) {
wakeFn();
res.writeHead(200);
res.end('woken');
return;
}
res.writeHead(404);
res.end();
});
wakeServer.listen(0, '127.0.0.1', () => {
const addr = wakeServer.address();
if (addr === null || typeof addr === 'string') throw new Error('listen failed');
wakeServerUrl = `http://127.0.0.1:${String(addr.port)}/wake`;
resolve();
});
});
});
afterAll(async () => {
if (hibernatingRegistrar !== null) hibernatingRegistrar.stop();
if (wakeServer) await new Promise<void>((r) => wakeServer.close(() => r()));
if (mcpdUp) {
const list = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
if (list.status === 200) {
const rows = JSON.parse(list.body) as Array<{ id: string; name: string }>;
const row = rows.find((r) => r.name === HIBERNATING_NAME);
if (row !== undefined) {
await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${row.id}`, undefined);
}
}
}
});
it('publishes a sleeping provider as kind=virtual / status=hibernating', async () => {
if (!mcpdUp) return;
const token = readToken();
if (token === null) return;
const sleeping = makeSleepingProvider(HIBERNATING_NAME, 'awake now');
wakeFn = sleeping.wakeFn;
const published: RegistrarPublishedProvider[] = [{
provider: sleeping.provider,
type: 'openai',
model: 'fake-hibernating',
tier: 'fast',
wake: { type: 'http', url: wakeServerUrl, method: 'POST', maxWaitSeconds: 5 },
}];
hibernatingRegistrar = new VirtualLlmRegistrar({
mcpdUrl: MCPD_URL,
token,
publishedProviders: published,
sessionFilePath: join(tempDir, 'hib-session'),
log: { info: () => {}, warn: () => {}, error: () => {} },
heartbeatIntervalMs: 60_000,
});
await hibernatingRegistrar.start();
await new Promise((r) => setTimeout(r, 400));
const res = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
expect(res.status).toBe(200);
const rows = JSON.parse(res.body) as Array<{ name: string; kind: string; status: string }>;
const row = rows.find((r) => r.name === HIBERNATING_NAME);
expect(row, `${HIBERNATING_NAME} must be present`).toBeDefined();
expect(row!.kind).toBe('virtual');
expect(row!.status).toBe('hibernating');
}, 30_000);
it('first inference triggers the wake recipe and then completes', async () => {
if (!mcpdUp) return;
// wakeFn was set in the previous test; it flips the provider's
// `awake` bool when the wake POST lands.
const res = await httpRequest('POST', `${MCPD_URL}/api/v1/llms/${HIBERNATING_NAME}/infer`, {
messages: [{ role: 'user', content: 'wake then say hello' }],
});
expect(res.status).toBe(200);
const body = JSON.parse(res.body) as { choices?: Array<{ message?: { content?: string } }> };
expect(body.choices?.[0]?.message?.content).toBe('awake now');
// After the wake, the row should now be active.
const list = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
const rows = JSON.parse(list.body) as Array<{ name: string; status: string }>;
expect(rows.find((r) => r.name === HIBERNATING_NAME)?.status).toBe('active');
}, 30_000);
});