Compare commits

...

9 Commits

Author SHA1 Message Date
c0b4dc89f3 Merge pull request 'chore: fulldeploy uses bao-backed pulumi wrapper for drift check' (#68) from chore/fulldeploy-pulumi-wrapper into main
Some checks failed
CI/CD / lint (push) Successful in 54s
CI/CD / test (push) Successful in 1m8s
CI/CD / typecheck (push) Successful in 2m23s
CI/CD / smoke (push) Failing after 1m42s
CI/CD / build (push) Successful in 5m46s
CI/CD / publish (push) Has been skipped
Reviewed-on: #68
2026-04-27 20:21:33 +00:00
Michal
7f49294b36 chore(fulldeploy): use kubernetes-deployment/scripts/pulumi.sh wrapper
Some checks failed
CI/CD / lint (pull_request) Successful in 2m22s
CI/CD / typecheck (pull_request) Successful in 2m57s
CI/CD / test (pull_request) Failing after 4m36s
CI/CD / smoke (pull_request) Has been skipped
CI/CD / build (pull_request) Has been skipped
CI/CD / publish (pull_request) Has been skipped
The pre-flight drift check now calls the bao-backed pulumi wrapper
that landed with the litellm key persistence work, so deploys no
longer need PULUMI_CONFIG_PASSPHRASE in .env or shell env. The
passphrase is fetched from OpenBao at runtime by the wrapper and
exec-passed to pulumi only — never touches the parent shell's
state.

Falls back to a clear warning if the wrapper isn't present (older
clone of kubernetes-deployment) instead of pretending to skip the
check silently.
2026-04-27 19:14:36 +01:00
f5bdeea8e7 Merge pull request 'feat: virtual agents v3 (Stages 1-3) + real fixes for chat/adapter/CLI thread format' (#67) from feat/virtual-agent-v3 into main
Some checks failed
CI/CD / typecheck (push) Successful in 55s
CI/CD / test (push) Successful in 1m10s
CI/CD / lint (push) Successful in 2m32s
CI/CD / smoke (push) Failing after 1m44s
CI/CD / build (push) Successful in 5m4s
CI/CD / publish (push) Has been skipped
Reviewed-on: #67
2026-04-27 18:06:59 +00:00
Michal
1998b733b2 feat(cli+docs): mcpctl get agent KIND/STATUS columns + virtual-agent smoke + docs (v3 Stage 4)
Some checks failed
CI/CD / lint (pull_request) Successful in 55s
CI/CD / test (pull_request) Successful in 1m10s
CI/CD / typecheck (pull_request) Successful in 2m30s
CI/CD / build (pull_request) Successful in 2m36s
CI/CD / smoke (pull_request) Failing after 5m56s
CI/CD / publish (pull_request) Has been skipped
CLI: `mcpctl get agent` table view gains KIND and STATUS columns
mirroring the `get llm` shape from v1. Public agents render as
`public/active` (the AgentRow defaults) and virtual ones surface their
true lifecycle state, so `mcpctl get agent` becomes a single-pane view
for both manually-created and mcplocal-published personas.

Smoke: tests/smoke/virtual-agent.smoke.test.ts mirrors virtual-llm's
in-process registrar pattern — publishes a fake provider + agent in
one round-trip, confirms mcpd surfaces the agent kind=virtual /
status=active under /api/v1/agents, then disconnects and verifies the
paired Llm-and-Agent both flip to inactive (deletion is GC-driven, not
disconnect-driven, so the rows must still exist post-stop). Heartbeat-
stale and 4 h sweep paths are covered by the unit suite to keep smoke
duration in check.

Docs: docs/virtual-llms.md gets a "Virtual agents (v3)" section with a
config sample, lifecycle notes, listing example, and the cluster-wide
name-uniqueness caveat. The API surface block now mentions the new
`agents[]` field on _provider-register, the join-by-session heartbeat
behavior, and the `GET /api/v1/agents` lifecycle fields. docs/agents.md
gains a one-paragraph note pointing to the v3 publishing path.

Tests: full smoke suite 141/141 (was 139, +2 new), unit suites
unchanged (mcpd 860/860, mcplocal 723/723).
2026-04-27 18:47:03 +01:00
Michal
610808b9e7 fix(chat): real fixes for thinking-model + URL conventions, not test tweaks
Some checks failed
CI/CD / lint (pull_request) Successful in 54s
CI/CD / test (pull_request) Successful in 1m7s
CI/CD / typecheck (pull_request) Successful in 2m37s
CI/CD / smoke (pull_request) Failing after 1m43s
CI/CD / build (pull_request) Successful in 5m42s
CI/CD / publish (pull_request) Has been skipped
Five real bugs surfaced by the agent-chat smoke against live
qwen3-thinking. None of these are fixed by changing the test — the
test was right to fail.

1. openai-passthrough adapter doubled `/v1` in the request URL. The
   adapter hard-codes `/v1/chat/completions` after the configured base,
   but every OpenAI-compat provider documents its base URL with a
   trailing `/v1` (api.openai.com/v1, llm.example.com/v1, …). Users
   pasting that conventional shape produced
   `https://x/v1/v1/chat/completions` → 404. endpointUrl now strips a
   trailing `/v1` so both forms canonicalize. `/v1beta` (Anthropic-style)
   is preserved.

2. Non-streaming chat returned an empty assistant when thinking models
   (qwen3-thinking, deepseek-reasoner, OpenAI o1) emitted only
   `reasoning_content` with `content: null`. extractChoice now also
   pulls reasoning (every spelling the streaming parser already knows
   about), and a new pickAssistantText helper falls back to it when
   content is empty. A `[response truncated by max_tokens]` marker is
   appended when finish_reason is `length`, so users see the cut-off
   instead of guessing why the answer is short. Symmetric streaming
   fix: the chatStream loop accumulates reasoning and yields ONE
   synthesized `text` frame at the end when content stayed empty,
   keeping the CLI's stdout (which only prints `text` deltas) in sync
   with the persisted thread message.

3. `mcpctl get agent X -o yaml` emitted `kind: public` (the v3
   lifecycle field) instead of `kind: agent` (apply envelope), so
   round-tripping through `apply -f` failed. Same fix shape as the v1
   Llm strip in toApplyDocs — drop kind/status/lastHeartbeatAt/
   inactiveSince/providerSessionId for the agents resource too.

4. Non-streaming `mcpctl chat` printed `thread:<cuid>` (no space) on
   stderr; streaming printed `(thread: <cuid>)` (with space). Tests
   and any other regex watching for one form missed the other.
   Standardize on `thread: <cuid>` (single space) in both paths.

5. agent-chat.smoke's `run()` used `execSync`, which discards stderr on
   success — making any `expect(stderr).toMatch(...)` assertion
   structurally impossible to satisfy in the happy path. Switch to
   `spawnSync` so stderr is actually captured. Includes a small
   shell-style argv splitter so the existing call sites with quoted
   multi-word values (`--system-prompt "..."`) keep working.

Tests: +6 new mcpd unit tests (4 chat-service for the reasoning
fallback / truncation marker / content-preference / streaming synth;
2 llm-adapters for the URL strip + /v1beta preservation). Full mcpd
+ mcplocal + smoke green: 860/860 + 723/723 + 139/139.
2026-04-27 18:39:01 +01:00
Michal
58bc277242 feat(mcpd+mcplocal): register-agents endpoint + mcplocal agents block (v3 Stage 3)
Extends the existing `_provider-register` payload with an optional `agents`
array so a single round-trip atomically publishes both virtual Llms and
their pinned virtual Agents. v1/v2 publishers (providers-only) keep
working unchanged — the agents path is gated on the route receiving an
AgentService instance, otherwise it logs a warning and ignores the array.

mcplocal config gains a top-level `agents` block (loadLocalAgents)
mirroring the providers shape. The registrar reads it, builds
RegistrarPublishedAgent entries against the published provider names,
and folds them into the same register POST. mcpd routes the agents
through AgentService.registerVirtualAgents(sessionId, ..., ownerId),
which was added in Stage 2.

No CLI changes here — `mcpctl chat <virtual-agent>` already works once
chat.service has the kind=virtual branch (Stage 1) and the agents are
present in the Agent table. CLI columns + smoke land in Stage 4.
2026-04-27 18:38:37 +01:00
Michal
c7b1bd8e2c feat(mcpd): AgentService virtual methods + GC cascade (v3 Stage 2)
State machine for kind=virtual Agent rows. Mirrors what
VirtualLlmService did for Llms in v1, then wires both lifecycles
together so disconnect/heartbeat/GC cascade through both at once.

AgentRepository:
- create/update accept the new lifecycle fields (kind, providerSessionId,
  status, lastHeartbeatAt, inactiveSince).
- Adds findBySessionId, findByLlmId, findStaleVirtuals, findExpiredInactives.

AgentService — new virtual-agent methods:
- registerVirtualAgents(sessionId, inputs, ownerId) — sticky upsert.
  New names insert as kind=virtual/status=active. Existing virtuals
  owned by the same session reactivate; existing inactive virtuals
  from a foreign session can be adopted (sticky reconnect). Refuses
  to overwrite a public agent or a foreign session's still-active
  virtual (HTTP 409). Pinned LLM is resolved via LlmService — caller
  posts Llms first.
- heartbeatVirtualAgents(sessionId) — bumps owned agents on a session
  heartbeat; revives inactive rows.
- markVirtualAgentsInactiveBySession(sessionId) — disconnect cascade.
- deleteVirtualAgentsForLlm(llmId) — defensive cascade for the GC's
  Llm-delete step (Agent.llmId is Restrict).
- gcSweepVirtualAgents() — same shape as VirtualLlmService.gcSweep
  (90s heartbeat-stale → inactive, 4h inactive → delete).

VirtualLlmService:
- Optional AgentService dependency. heartbeat() now also bumps owned
  agents; unbindSession() flips them inactive. gcSweep() runs the
  agent sweep FIRST (so any agent that would block an Llm delete via
  Restrict is already gone), and adds a defensive
  deleteVirtualAgentsForLlm step right before each Llm delete in case
  an agent's heartbeat lagged its Llm's just enough to escape this
  round's 4h cutoff.

main.ts:
- VirtualLlmService construction moves below AgentService so it can
  receive the cascade dependency.

Tests: 13 new in virtual-agent-service.test.ts cover all the register
variants (insert, sticky reconnect, adopt-inactive-foreign, refuse
public-overwrite, refuse foreign-session-active), heartbeat-revive,
disconnect-cascade, deleteVirtualAgentsForLlm scope, GC sweep flip
+ delete + idempotence, and three VirtualLlmService cascade scenarios
(unbindSession, gcSweep deleting agent before Llm, defensive cascade
when agent's heartbeat lagged).

mcpd suite: 854/854 (was 841 + 13 new). Workspace unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:07:23 +01:00
Michal
9afd24a3aa feat(db+mcpd): Agent lifecycle + chat.service kind=virtual branch (v3 Stage 1)
Two pieces of v3 plumbing — schema + the latent v1 chat.service bug.

Schema (db):
- Agent gains kind/providerSessionId/lastHeartbeatAt/status/inactiveSince
  mirroring Llm's v1 lifecycle. Reuses LlmKind / LlmStatus enums; no
  new types. Existing rows backfill kind=public/status=active so v1
  CRUD is unaffected.
- @@index([kind, status]) for the GC sweep, @@index([providerSessionId])
  for disconnect-cascade lookups.
- 4 new prisma-level tests cover defaults, persisting virtual fields,
  the (kind, status) GC index, and providerSessionId lookups.
  Total agent-schema tests: 20/20.

chat.service (mcpd) — fixes the v1 latent bug:
- LlmView's kind is now plumbed through prepareContext as ctx.llmKind.
- Two new private helpers, runOneInference / streamInference, branch
  on ctx.llmKind: 'public' goes through the existing adapter
  registry, 'virtual' relays through VirtualLlmService.enqueueInferTask
  (mirrors the route-handler branch from v1 Stage 3).
- Streaming bridges VirtualLlmService's onChunk callback API to an
  async iterator via a small queue + wake pattern.
- ChatService gains an optional virtualLlms constructor parameter;
  main.ts wires it in. Older test wirings without it raise a clear
  "virtualLlms dispatcher not wired" error when the row is virtual,
  rather than silently falling through to the public path against an
  empty URL.

This unblocks any Agent (public OR future v3-virtual) pinned to a
kind=virtual Llm. Pre-this-stage, those agents 502'd against the
empty url field.

Tests: 4 new chat-service-virtual-llm.test.ts cover the relay path
non-streaming, streaming, missing-dispatcher error, and rejection
surfacing. mcpd suite: 841/841 (was 833, +8 across stages 1+v3-Stage-1).
Workspace: 2054/2054 across 153 files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:07:23 +01:00
9374a2652b perf: vitest threads pool + Dockerfile pnpm cache mount (#66)
Some checks failed
CI/CD / lint (push) Successful in 58s
CI/CD / test (push) Successful in 1m11s
CI/CD / typecheck (push) Successful in 2m35s
CI/CD / smoke (push) Failing after 1m43s
CI/CD / build (push) Successful in 2m21s
CI/CD / publish (push) Has been skipped
2026-04-27 16:07:05 +00:00
24 changed files with 1903 additions and 85 deletions

View File

@@ -204,5 +204,9 @@ mcpctl chat reviewer
- [virtual-llms.md](./virtual-llms.md) — local LLMs (e.g. `vllm-local`)
publishing themselves into `mcpctl get llm` so anyone can chat with
them via `mcpctl chat-llm <name>`. Inference is relayed through the
publishing mcplocal — mcpd never holds the local URL or key.
publishing mcplocal — mcpd never holds the local URL or key. **v3**
extends the same publishing model to **virtual agents** declared in
mcplocal config — they show up in `mcpctl get agent` with
`KIND=virtual / STATUS=active` and become chat-able via
`mcpctl chat <name>` like any other agent.
- [chat.md](./chat.md) — `mcpctl chat` flow and LiteLLM-style flags.

View File

@@ -199,10 +199,87 @@ provider doesn't come up within `maxWaitSeconds`), every queued infer
is rejected with a clear error and the row stays `hibernating`
the next request gets a fresh wake attempt.
## Virtual agents (v3)
Virtual agents extend the same publishing model to **agents** — named
LLM personas with their own system prompt and sampling defaults. mcplocal
declares them in its config alongside its providers, and the existing
`_provider-register` endpoint atomically publishes both Llms and Agents
in one round-trip. They show up under `mcpctl get agent` next to
manually-created public agents and become chat-able via
`mcpctl chat <agent>` — no special command.
### Declaring a virtual agent in mcplocal config
```jsonc
// ~/.mcpctl/config.json
{
"llm": {
"providers": [
{ "name": "vllm-local", "type": "vllm", "model": "Qwen/Qwen2.5-7B-Instruct-AWQ", "publish": true }
]
},
"agents": [
{
"name": "local-coder",
"llm": "vllm-local",
"description": "Local coding assistant on the workstation GPU",
"systemPrompt": "You are a senior engineer. Be terse.",
"defaultParams": { "temperature": 0.2 }
}
]
}
```
`llm` references a published provider's name from the same config. Agents
pinned to a name that isn't being published are still forwarded to mcpd —
the server validates `llmName` and 404s with a clear message if it's
genuinely missing, which lets you point at a *public* Llm if you want.
### Lifecycle
Same shape as virtual Llms — 30 s heartbeat from mcplocal, 90 s
heartbeat-stale → status flips to `inactive`, 4 h inactive → row deleted
by mcpd's GC sweep. Heartbeats cover both Llms and Agents owned by the
session.
The GC orders agent deletes **before** their pinned virtual Llm so the
`Agent.llmId onDelete: Restrict` FK doesn't block the sweep.
### Listing
```sh
$ mcpctl get agents
NAME KIND STATUS LLM PROJECT DESCRIPTION
local-coder virtual active vllm-local - Local coding assistant on…
reviewer public active qwen3-thinking mcpctl-development I review what you're shipping…
```
The `KIND` and `STATUS` columns are the v3 additions. Round-tripping
through `mcpctl get agent X -o yaml | mcpctl apply -f -` strips those
runtime fields cleanly so a virtual agent can be re-declared as a public
one (or vice versa) without manual editing.
### Chatting
```sh
$ mcpctl chat local-coder
> hello?
… streams through mcpd → SSE → mcplocal's vllm-local provider …
```
Same command as for public agents. Works because chat.service has a
`kind=virtual` branch that hands off to `VirtualLlmService.enqueueInferTask`
when the agent's pinned Llm is virtual.
### Cluster-wide name uniqueness
`Agent.name` is unique cluster-wide. Two mcplocals trying to publish the
same agent name collide on the second register with HTTP 409. Per-publisher
namespacing is a v4+ concern — same constraint as virtual Llms in v1.
## Roadmap (later stages)
- **v3 — Virtual agents**: mcplocal publishes its local agent configs
(model + system prompt + sampling defaults) into mcpd's `Agent` table.
- **v4 — LB pool by model**: agents can target a model name instead of
a specific Llm; mcpd picks the healthiest pool member per request.
- **v5 — Task queue**: persisted requests for hibernating/saturated
@@ -211,18 +288,23 @@ the next request gets a fresh wake attempt.
## API surface (v1)
```
POST /api/v1/llms/_provider-register → returns { providerSessionId, llms[] }
POST /api/v1/llms/_provider-register → returns { providerSessionId, llms[], agents[] }
v3: body accepts an optional `agents[]` array
alongside `providers[]`. Atomic publish; older
clients (providers-only) keep working.
GET /api/v1/llms/_provider-stream → SSE channel; require x-mcpctl-provider-session header
POST /api/v1/llms/_provider-heartbeat → { providerSessionId }
POST /api/v1/llms/_provider-heartbeat → { providerSessionId } — bumps both Llms and Agents
owned by the session
POST /api/v1/llms/_provider-task/:id/result
→ one of:
{ error: "msg" }
{ chunk: { data, done? } }
{ status, body }
GET /api/v1/llms → list (now includes kind, status, lastHeartbeatAt, inactiveSince)
GET /api/v1/llms → list (includes kind, status, lastHeartbeatAt, inactiveSince)
POST /api/v1/llms/<virtual>/infer → routes through the SSE relay
DELETE /api/v1/llms/<virtual> → delete unconditionally (also runs GC's job)
GET /api/v1/agents → list (v3: includes kind, status, lastHeartbeatAt, inactiveSince)
```
RBAC piggybacks on `view/edit/create:llms` — no new resource. Publishing

View File

@@ -29,25 +29,27 @@ echo " mcpctl Full Deploy"
echo "========================================"
# --- Pre-flight: Pulumi drift check ---
# Uses the kubernetes-deployment/scripts/pulumi.sh wrapper which pulls
# PULUMI_CONFIG_PASSPHRASE from OpenBao at runtime, so the passphrase
# never needs to live in .env or shell history. Falls back to a warning
# if the wrapper isn't present (older clone of kubernetes-deployment).
echo ""
echo ">>> Pre-flight: checking for Pulumi infra drift"
echo ""
if [ -d "$PULUMI_DIR" ]; then
if [ -z "$PULUMI_CONFIG_PASSPHRASE" ]; then
echo " WARNING: PULUMI_CONFIG_PASSPHRASE not set — skipping drift check."
echo " Set it in .env or export it to enable."
else
preview_output=$(cd "$PULUMI_DIR" && pulumi preview --stack "$PULUMI_STACK" --non-interactive --diff 2>&1) || true
if [ -d "$PULUMI_DIR" ] && [ -x "$PULUMI_DIR/scripts/pulumi.sh" ]; then
preview_output=$("$PULUMI_DIR/scripts/pulumi.sh" preview --stack "$PULUMI_STACK" --non-interactive --diff 2>&1) || true
if echo "$preview_output" | grep -qE '^\s+[-+~]'; then
echo "$preview_output"
echo ""
echo "ERROR: Pulumi detected infra changes that have not been applied."
echo " Run: cd $PULUMI_DIR && pulumi up -s $PULUMI_STACK"
echo " Run: $PULUMI_DIR/scripts/pulumi.sh up -s $PULUMI_STACK"
echo " Then re-run this script."
exit 1
fi
echo " No drift — infra is in sync."
fi # passphrase check
elif [ -d "$PULUMI_DIR" ]; then
echo " WARNING: $PULUMI_DIR/scripts/pulumi.sh not found or not executable —"
echo " skipping drift check. Pull latest kubernetes-deployment."
else
echo " WARNING: Pulumi repo not found at $PULUMI_DIR — skipping drift check."
fi

View File

@@ -151,6 +151,9 @@ async function runOneShot(
const sec = Math.max(0.05, (Date.now() - startMs) / 1000);
const words = (res.assistant.match(/\S+/g) ?? []).length;
process.stdout.write(`${res.assistant}\n`);
// `thread: <id>` — single space after the colon, matching the streaming
// path (line 160 below) so any tooling/regex that watches one form picks
// up the other too.
process.stderr.write(styleStats(`(${String(words)}w · ${(words / sec).toFixed(1)} w/s · ${sec.toFixed(1)}s)`) + ` thread: ${res.threadId}\n`);
return;
}

View File

@@ -155,10 +155,17 @@ interface AgentRow {
description: string;
llm: { id: string; name: string };
project: { id: string; name: string } | null;
// v3: lifecycle fields. Public agents have kind=public/status=active and
// these never change — virtuals get them set/updated by mcpd's
// AgentService as the publishing mcplocal heartbeats and disconnects.
kind?: 'public' | 'virtual';
status?: 'active' | 'inactive';
}
const agentColumns: Column<AgentRow>[] = [
{ header: 'NAME', key: 'name' },
{ header: 'KIND', key: (r) => r.kind ?? 'public', width: 8 },
{ header: 'STATUS', key: (r) => r.status ?? 'active', width: 10 },
{ header: 'LLM', key: (r) => r.llm.name, width: 24 },
{ header: 'PROJECT', key: (r) => r.project?.name ?? '-', width: 20 },
{ header: 'DESCRIPTION', key: (r) => truncate(r.description, 50) || '-', width: 50 },
@@ -408,8 +415,8 @@ function toApplyDocs(resource: string, items: unknown[]): Array<{ kind: string }
const kind = RESOURCE_KIND[resource] ?? resource;
return items.map((item) => {
const cleaned = stripInternalFields(item as Record<string, unknown>);
// Llm-specific: the new virtual-provider lifecycle fields collide with
// the apply-doc `kind` envelope (the schema uses `kind: public|virtual`)
// Llm-specific: the virtual-provider lifecycle fields collide with the
// apply-doc `kind` envelope (the schema uses `kind: public|virtual`)
// and aren't apply-able anyway — they're derived runtime state managed
// by VirtualLlmService. Drop them so YAML round-trips stay clean.
if (resource === 'llms') {
@@ -419,6 +426,17 @@ function toApplyDocs(resource: string, items: unknown[]): Array<{ kind: string }
delete cleaned['inactiveSince'];
delete cleaned['providerSessionId'];
}
// Agent-specific: same shape as Llm — Agent gained kind/status/etc. in
// v3 Stage 1 (virtual agent lifecycle) and the schema-`kind` field
// shadows the apply-envelope `kind: agent`. Strip the same set so
// `get agent X -o yaml | apply -f -` round-trips without diff.
if (resource === 'agents') {
delete cleaned['kind'];
delete cleaned['status'];
delete cleaned['lastHeartbeatAt'];
delete cleaned['inactiveSince'];
delete cleaned['providerSessionId'];
}
return { kind, ...cleaned };
});
}

View File

@@ -0,0 +1,14 @@
-- Mirror Llm's virtual-provider lifecycle on Agent. Reuses the
-- existing LlmKind / LlmStatus enums so we don't double-define them.
-- Existing rows backfill with kind='public' / status='active' so
-- nothing changes for manually-created agents.
ALTER TABLE "Agent"
ADD COLUMN "kind" "LlmKind" NOT NULL DEFAULT 'public',
ADD COLUMN "providerSessionId" TEXT,
ADD COLUMN "lastHeartbeatAt" TIMESTAMP(3),
ADD COLUMN "status" "LlmStatus" NOT NULL DEFAULT 'active',
ADD COLUMN "inactiveSince" TIMESTAMP(3);
CREATE INDEX "Agent_kind_status_idx" ON "Agent"("kind", "status");
CREATE INDEX "Agent_providerSessionId_idx" ON "Agent"("providerSessionId");

View File

@@ -479,6 +479,12 @@ model Agent {
proxyModelName String? // optional informational override
defaultParams Json @default("{}") // LiteLLM-style: temperature, top_p, top_k, max_tokens, stop, ...
extras Json @default("{}") // future LoRA / tool-allowlist
// ── Virtual-agent lifecycle (NULL/default for kind=public, mirrors Llm) ──
kind LlmKind @default(public)
providerSessionId String? // mcplocal session that owns this row when virtual
lastHeartbeatAt DateTime?
status LlmStatus @default(active)
inactiveSince DateTime?
ownerId String
version Int @default(1)
createdAt DateTime @default(now())
@@ -497,6 +503,8 @@ model Agent {
@@index([projectId])
@@index([ownerId])
@@index([defaultPersonalityId])
@@index([kind, status])
@@index([providerSessionId])
}
// ── Personalities (named overlay bundles of prompts on top of an Agent) ──

View File

@@ -317,6 +317,78 @@ describe('agent / chat-thread / chat-message schema', () => {
expect(reloaded?.defaultPersonalityId).toBeNull();
});
// ── v3: Agent.kind virtual + lifecycle fields ──
it('defaults a freshly inserted Agent to kind=public, status=active', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-default-kind');
const agent = await makeAgent({ name: 'fresh', llmId: llm.id, ownerId: user.id });
expect(agent.kind).toBe('public');
expect(agent.status).toBe('active');
expect(agent.providerSessionId).toBeNull();
expect(agent.lastHeartbeatAt).toBeNull();
expect(agent.inactiveSince).toBeNull();
});
it('persists kind=virtual + lifecycle fields together', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-pub-virtual');
const now = new Date();
const agent = await prisma.agent.create({
data: {
name: 'local-coder',
llmId: llm.id,
ownerId: user.id,
kind: 'virtual',
providerSessionId: 'sess-abc',
lastHeartbeatAt: now,
status: 'active',
},
});
expect(agent.kind).toBe('virtual');
expect(agent.providerSessionId).toBe('sess-abc');
expect(agent.lastHeartbeatAt?.getTime()).toBe(now.getTime());
});
it('finds virtual agents by (kind, status) cheaply (GC sweep query)', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-gc-agent');
await prisma.agent.create({ data: { name: 'pub-1', llmId: llm.id, ownerId: user.id } });
await prisma.agent.create({
data: { name: 'v-active', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's1' },
});
await prisma.agent.create({
data: { name: 'v-inactive', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 's2', status: 'inactive', inactiveSince: new Date() },
});
const stale = await prisma.agent.findMany({
where: { kind: 'virtual', status: 'inactive' },
select: { name: true },
});
expect(stale.map((a) => a.name)).toEqual(['v-inactive']);
});
it('finds agents by providerSessionId (used on mcplocal disconnect cascade)', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-sess-cascade');
await prisma.agent.create({
data: { name: 'a', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
});
await prisma.agent.create({
data: { name: 'b', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'shared' },
});
await prisma.agent.create({
data: { name: 'c', llmId: llm.id, ownerId: user.id, kind: 'virtual', providerSessionId: 'other' },
});
const owned = await prisma.agent.findMany({
where: { providerSessionId: 'shared' },
select: { name: true },
orderBy: { name: 'asc' },
});
expect(owned.map((a) => a.name)).toEqual(['a', 'b']);
});
it('binds the same prompt to multiple personalities of an agent', async () => {
const user = await makeUser();
const llm = await makeLlm('llm-shared-prompt');

View File

@@ -435,10 +435,8 @@ async function main(): Promise<void> {
adapters: llmAdapters,
log: { warn: (msg) => app.log.warn(msg) },
});
// Virtual-provider state machine (kind=virtual rows). The 60-s GC ticker
// is started below after `app.listen` so it doesn't fire before the
// server is accepting traffic.
const virtualLlmService = new VirtualLlmService(llmRepo);
// VirtualLlmService is constructed lower down (after AgentService) so
// it can wire the agent-cascade callbacks introduced in v3 Stage 2.
// AgentService + ChatService get fully wired below once projectService and
// mcpProxyService are constructed (ChatService needs them via the
// ChatToolDispatcher bridge).
@@ -465,6 +463,10 @@ async function main(): Promise<void> {
const personalityRepo = new PersonalityRepository(prisma);
const personalityService = new PersonalityService(personalityRepo, agentRepo, promptRepo);
const agentService = new AgentService(agentRepo, llmService, projectService, personalityRepo);
// Virtual-provider state machine (kind=virtual rows for both Llms and
// Agents). v3 wires AgentService for heartbeat/disconnect/GC cascade.
// The 60-s GC ticker is started below after `app.listen`.
const virtualLlmService = new VirtualLlmService(llmRepo, agentService);
// ChatService needs the proxy + project repo via the ChatToolDispatcher
// bridge. The dispatcher's logger references `app.log`, which is not
// constructed until further down — `chatService` itself is built right
@@ -607,6 +609,7 @@ async function main(): Promise<void> {
promptRepo,
chatToolDispatcher,
personalityRepo,
virtualLlmService,
);
registerAgentChatRoutes(app, chatService);
registerLlmInferRoutes(app, {
@@ -627,7 +630,7 @@ async function main(): Promise<void> {
});
},
});
registerVirtualLlmRoutes(app, virtualLlmService);
registerVirtualLlmRoutes(app, virtualLlmService, agentService);
registerInstanceRoutes(app, instanceService);
registerProjectRoutes(app, projectService);
registerAuditLogRoutes(app, auditLogService);

View File

@@ -1,4 +1,4 @@
import type { PrismaClient, Agent, Prisma } from '@prisma/client';
import type { PrismaClient, Agent, Prisma, LlmKind, LlmStatus } from '@prisma/client';
export interface CreateAgentRepoInput {
name: string;
@@ -11,6 +11,12 @@ export interface CreateAgentRepoInput {
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
ownerId: string;
// Virtual-agent lifecycle (omit for kind=public).
kind?: LlmKind;
providerSessionId?: string | null;
status?: LlmStatus;
lastHeartbeatAt?: Date | null;
inactiveSince?: Date | null;
}
export interface UpdateAgentRepoInput {
@@ -22,6 +28,13 @@ export interface UpdateAgentRepoInput {
proxyModelName?: string | null;
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
// Virtual-agent lifecycle. AgentService is the only public writer; the
// VirtualAgentService methods (Stage 2) bypass the public CRUD path.
kind?: LlmKind;
providerSessionId?: string | null;
status?: LlmStatus;
lastHeartbeatAt?: Date | null;
inactiveSince?: Date | null;
}
export interface IAgentRepository {
@@ -32,6 +45,11 @@ export interface IAgentRepository {
create(data: CreateAgentRepoInput): Promise<Agent>;
update(id: string, data: UpdateAgentRepoInput): Promise<Agent>;
delete(id: string): Promise<void>;
// Virtual-agent lifecycle helpers.
findBySessionId(sessionId: string): Promise<Agent[]>;
findByLlmId(llmId: string): Promise<Agent[]>;
findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]>;
findExpiredInactives(deletionCutoff: Date): Promise<Agent[]>;
}
export class AgentRepository implements IAgentRepository {
@@ -69,6 +87,11 @@ export class AgentRepository implements IAgentRepository {
defaultParams: (data.defaultParams ?? {}) as Prisma.InputJsonValue,
extras: (data.extras ?? {}) as Prisma.InputJsonValue,
ownerId: data.ownerId,
...(data.kind !== undefined ? { kind: data.kind } : {}),
...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
...(data.status !== undefined ? { status: data.status } : {}),
...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
},
});
}
@@ -99,6 +122,11 @@ export class AgentRepository implements IAgentRepository {
if (data.extras !== undefined) {
updateData.extras = data.extras as Prisma.InputJsonValue;
}
if (data.kind !== undefined) updateData.kind = data.kind;
if (data.providerSessionId !== undefined) updateData.providerSessionId = data.providerSessionId;
if (data.status !== undefined) updateData.status = data.status;
if (data.lastHeartbeatAt !== undefined) updateData.lastHeartbeatAt = data.lastHeartbeatAt;
if (data.inactiveSince !== undefined) updateData.inactiveSince = data.inactiveSince;
// Bump optimistic version on every update.
updateData.version = { increment: 1 };
return this.prisma.agent.update({ where: { id }, data: updateData });
@@ -107,4 +135,40 @@ export class AgentRepository implements IAgentRepository {
async delete(id: string): Promise<void> {
await this.prisma.agent.delete({ where: { id } });
}
// ── Virtual-agent lifecycle queries ──
async findBySessionId(sessionId: string): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: { providerSessionId: sessionId },
orderBy: { name: 'asc' },
});
}
async findByLlmId(llmId: string): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: { llmId },
orderBy: { name: 'asc' },
});
}
async findStaleVirtuals(heartbeatCutoff: Date): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: {
kind: 'virtual',
status: 'active',
lastHeartbeatAt: { lt: heartbeatCutoff },
},
});
}
async findExpiredInactives(deletionCutoff: Date): Promise<Agent[]> {
return this.prisma.agent.findMany({
where: {
kind: 'virtual',
status: 'inactive',
inactiveSince: { lt: deletionCutoff },
},
});
}
}

View File

@@ -17,6 +17,7 @@
*/
import type { FastifyInstance, FastifyReply } from 'fastify';
import type { VirtualLlmService, VirtualSessionHandle, VirtualTaskFrame } from '../services/virtual-llm.service.js';
import type { AgentService, VirtualAgentInput } from '../services/agent.service.js';
const SSE_PING_MS = 20_000;
const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session';
@@ -24,8 +25,15 @@ const PROVIDER_SESSION_HEADER = 'x-mcpctl-provider-session';
export function registerVirtualLlmRoutes(
app: FastifyInstance,
service: VirtualLlmService,
/**
* Optional. v3 wires AgentService here so the register endpoint can
* also accept an `agents` array alongside `providers` and atomic-publish
* both. Absent (older test wirings): the route still works for Llm-only
* publishers, agents in the payload are ignored with a warning.
*/
agentService?: AgentService,
): void {
app.post<{ Body: { providerSessionId?: string; providers?: unknown[] } }>(
app.post<{ Body: { providerSessionId?: string; providers?: unknown[]; agents?: unknown[] } }>(
'/api/v1/llms/_provider-register',
async (request, reply) => {
const body = (request.body ?? {});
@@ -34,14 +42,29 @@ export function registerVirtualLlmRoutes(
reply.code(400);
return { error: '`providers` array is required and must be non-empty' };
}
const agentsInput = Array.isArray(body.agents) ? body.agents : null;
try {
const result = await service.register({
providerSessionId: body.providerSessionId ?? null,
providers: providers.map(coerceProviderInput),
});
// v3: atomically publish virtual agents tied to the same session.
// If the caller didn't include an agents array, skip silently.
let agents: unknown[] = [];
if (agentsInput !== null && agentsInput.length > 0) {
if (agentService === undefined) {
app.log.warn('virtual-llm register received `agents` but AgentService is not wired');
} else {
agents = await agentService.registerVirtualAgents(
result.providerSessionId,
agentsInput.map(coerceAgentInput),
request.userId ?? 'system',
);
}
}
reply.code(201);
return result;
return { ...result, agents };
} catch (err) {
const status = (err as { statusCode?: number }).statusCode ?? 500;
reply.code(status);
@@ -142,6 +165,33 @@ export function registerVirtualLlmRoutes(
);
}
/** Narrow an unknown agents array element into the service's input shape (v3). */
function coerceAgentInput(raw: unknown): VirtualAgentInput {
if (raw === null || typeof raw !== 'object') {
throw Object.assign(new Error('agent entry must be an object'), { statusCode: 400 });
}
const o = raw as Record<string, unknown>;
const name = o['name'];
const llmName = o['llmName'];
if (typeof name !== 'string' || typeof llmName !== 'string') {
throw Object.assign(
new Error('agent entry requires string `name` and `llmName`'),
{ statusCode: 400 },
);
}
const out: VirtualAgentInput = { name, llmName };
if (typeof o['description'] === 'string') out.description = o['description'];
if (typeof o['systemPrompt'] === 'string') out.systemPrompt = o['systemPrompt'];
if (typeof o['project'] === 'string') out.project = o['project'];
if (o['defaultParams'] !== null && typeof o['defaultParams'] === 'object') {
out.defaultParams = o['defaultParams'] as Record<string, unknown>;
}
if (o['extras'] !== null && typeof o['extras'] === 'object') {
out.extras = o['extras'] as Record<string, unknown>;
}
return out;
}
/** Narrow an unknown providers array element into the service's input shape. */
function coerceProviderInput(raw: unknown): {
name: string;

View File

@@ -33,12 +33,28 @@ export interface AgentView {
proxyModelName: string | null;
defaultParams: AgentChatParams;
extras: Record<string, unknown>;
// Virtual-agent lifecycle (defaults make public agents look like "active").
kind: 'public' | 'virtual';
status: 'active' | 'inactive' | 'hibernating';
lastHeartbeatAt: Date | null;
inactiveSince: Date | null;
ownerId: string;
version: number;
createdAt: Date;
updatedAt: Date;
}
/** Input shape mcplocal sends per virtual agent on register. */
export interface VirtualAgentInput {
name: string;
llmName: string;
description?: string;
systemPrompt?: string;
project?: string;
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
}
export class AgentService {
constructor(
private readonly repo: IAgentRepository,
@@ -179,10 +195,162 @@ export class AgentService {
proxyModelName: row.proxyModelName,
defaultParams: row.defaultParams as AgentChatParams,
extras: row.extras as Record<string, unknown>,
kind: row.kind,
status: row.status,
lastHeartbeatAt: row.lastHeartbeatAt,
inactiveSince: row.inactiveSince,
ownerId: row.ownerId,
version: row.version,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
};
}
// ── Virtual-agent lifecycle (v3) ──
/**
* Sticky upsert of virtual agents owned by a publishing mcplocal session.
* Mirrors VirtualLlmService.register's semantics:
* - New agents → insert with kind=virtual / status=active.
* - Existing virtual agents owned by the same session → update + reactivate.
* - Existing virtual agents owned by a different session, but currently
* inactive → adopt (sticky reconnect after a session lapse).
* - Existing public agents OR foreign-active virtuals → 409 Conflict.
* - Pinned LLM must already exist (publisher posts Llms first in the same
* register payload).
*/
async registerVirtualAgents(
sessionId: string,
inputs: VirtualAgentInput[],
ownerId: string,
): Promise<AgentView[]> {
const now = new Date();
const out: AgentView[] = [];
for (const a of inputs) {
const llm = await this.llms.getByName(a.llmName);
const projectId = a.project !== undefined
? (await this.projects.resolveAndGet(a.project)).id
: null;
const existing = await this.repo.findByName(a.name);
if (existing !== null) {
if (existing.kind === 'public') {
throw Object.assign(
new Error(`Cannot publish over public Agent: ${a.name}`),
{ statusCode: 409 },
);
}
if (existing.providerSessionId !== sessionId && existing.status === 'active') {
throw Object.assign(
new Error(`Virtual Agent '${a.name}' is already active under a different session`),
{ statusCode: 409 },
);
}
const updated = await this.repo.update(existing.id, {
...(a.description !== undefined ? { description: a.description } : {}),
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
llmId: llm.id,
projectId,
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
...(a.extras !== undefined ? { extras: a.extras } : {}),
kind: 'virtual',
providerSessionId: sessionId,
status: 'active',
lastHeartbeatAt: now,
inactiveSince: null,
});
out.push(await this.toView(updated));
continue;
}
const created = await this.repo.create({
name: a.name,
...(a.description !== undefined ? { description: a.description } : {}),
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
llmId: llm.id,
projectId,
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
...(a.extras !== undefined ? { extras: a.extras } : {}),
kind: 'virtual',
providerSessionId: sessionId,
status: 'active',
lastHeartbeatAt: now,
ownerId,
});
out.push(await this.toView(created));
}
return out;
}
/**
* Bumps lastHeartbeatAt on every virtual agent owned by the session.
* Revives inactive rows. Called from VirtualLlmService.heartbeat so
* one publisher heartbeat covers both Llms and Agents.
*/
async heartbeatVirtualAgents(sessionId: string): Promise<void> {
const owned = await this.repo.findBySessionId(sessionId);
if (owned.length === 0) return;
const now = new Date();
for (const row of owned) {
await this.repo.update(row.id, {
lastHeartbeatAt: now,
...(row.status === 'inactive' ? { status: 'active', inactiveSince: null } : {}),
});
}
}
/** Flip every virtual agent owned by the session to inactive immediately. */
async markVirtualAgentsInactiveBySession(sessionId: string): Promise<void> {
const owned = await this.repo.findBySessionId(sessionId);
const now = new Date();
for (const row of owned) {
if (row.status === 'active') {
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
}
}
}
/**
* Cascade-delete virtual agents pinned to a virtual Llm. Called from
* VirtualLlmService.gcSweep BEFORE deleting the inactive Llm row, since
* Agent.llmId is `onDelete: Restrict` and would otherwise block the
* Llm delete.
*/
async deleteVirtualAgentsForLlm(llmId: string): Promise<number> {
const pinned = await this.repo.findByLlmId(llmId);
let deleted = 0;
for (const row of pinned) {
if (row.kind !== 'virtual') continue;
await this.repo.delete(row.id);
deleted += 1;
}
return deleted;
}
/**
* GC sweep for virtual agents — same shape as VirtualLlmService.gcSweep:
* 1. Heartbeat-stale active virtuals → inactive (90-s cutoff).
* 2. 4-h-old inactive virtuals → delete.
* Run BEFORE the LlmService GC sweep so any agent that would have
* blocked an Llm delete via Restrict has already been cleared.
*/
async gcSweepVirtualAgents(now: Date = new Date()): Promise<{ markedInactive: number; deleted: number }> {
const HEARTBEAT_TIMEOUT_MS = 90_000;
const INACTIVE_RETENTION_MS = 4 * 60 * 60 * 1000;
let markedInactive = 0;
let deleted = 0;
const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
for (const row of stale) {
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
markedInactive += 1;
}
const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
const expired = await this.repo.findExpiredInactives(deletionCutoff);
for (const row of expired) {
await this.repo.delete(row.id);
deleted += 1;
}
return { markedInactive, deleted };
}
}

View File

@@ -31,6 +31,7 @@ import type {
} from '../repositories/chat.repository.js';
import type { IPromptRepository } from '../repositories/prompt.repository.js';
import type { IPersonalityRepository } from '../repositories/personality.repository.js';
import type { IVirtualLlmService } from './virtual-llm.service.js';
import type { OpenAiChatRequest, OpenAiMessage } from './llm/types.js';
import type { AgentChatParams } from '../validation/agent.schema.js';
import { NotFoundError } from './mcp-server.service.js';
@@ -132,6 +133,14 @@ export class ChatService {
private readonly promptRepo: IPromptRepository,
private readonly tools: ChatToolDispatcher,
private readonly personalities?: IPersonalityRepository,
/**
* v3: when an Agent is pinned to a `kind=virtual` Llm, inference is
* relayed via this service rather than an HTTP adapter (the virtual
* row has no upstream URL). Optional so older test wirings still
* compile; in those tests the chat path will refuse virtual Llms
* with a clear error.
*/
private readonly virtualLlms?: IVirtualLlmService,
) {}
async createThread(agentName: string, ownerId: string, title?: string): Promise<{ id: string }> {
@@ -170,19 +179,16 @@ export class ChatService {
let lastTurnIndex = ctx.startingTurnIndex;
try {
for (let i = 0; i < ctx.maxIterations; i += 1) {
const adapter = this.adapters.get(ctx.llmType);
const result = await adapter.infer({
body: this.buildBody(ctx),
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
const result = await this.runOneInference(ctx);
const choice = extractChoice(result.body);
if (choice === null) {
throw new Error(`Adapter returned no choice (status ${String(result.status)})`);
}
if (choice.tool_calls !== undefined && choice.tool_calls.length > 0) {
// Tool turns: keep `content` literal — even if empty — because the
// OpenAI tool-use protocol expects the assistant message to carry
// its tool_calls separately from any free-form text. Surfacing
// reasoning here would confuse downstream tool dispatchers.
const assistantTurn = await this.chatRepo.appendMessage({
threadId: ctx.threadId,
role: 'assistant',
@@ -217,13 +223,17 @@ export class ChatService {
await this.chatRepo.updateStatus(assistantTurn.id, 'complete');
continue;
}
// Terminal text turn.
// Terminal text turn. Use pickAssistantText so thinking models that
// produced only reasoning_content still yield a usable answer (with
// a truncation marker when finish_reason indicates max_tokens
// cut-off). Empty body remains empty and bubbles up unchanged.
const assistantText = pickAssistantText(choice);
const finalMsg = await this.chatRepo.appendMessage({
threadId: ctx.threadId,
role: 'assistant',
content: choice.content ?? '',
content: assistantText,
});
assistantFinal = choice.content ?? '';
assistantFinal = assistantText;
lastTurnIndex = finalMsg.turnIndex;
await this.chatRepo.touchThread(ctx.threadId);
return { threadId: ctx.threadId, assistant: assistantFinal, turnIndex: lastTurnIndex };
@@ -240,19 +250,20 @@ export class ChatService {
const ctx = await this.prepareContext(args);
try {
for (let i = 0; i < ctx.maxIterations; i += 1) {
const adapter = this.adapters.get(ctx.llmType);
const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = {
// `reasoning` is accumulated alongside `content` so we can fall back
// to it when the model produces no `content` (thinking models with a
// tight max_tokens, or providers that don't separate the two).
const accumulated: {
content: string;
reasoning: string;
toolCalls: Array<{ id: string; name: string; argumentsJson: string }>;
} = {
content: '',
reasoning: '',
toolCalls: [],
};
let finishReason: string | null = null;
for await (const chunk of adapter.stream({
body: { ...this.buildBody(ctx), stream: true },
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
})) {
for await (const chunk of this.streamInference(ctx)) {
if (chunk.done === true) break;
if (chunk.data === '[DONE]') break;
const evt = parseStreamingChunk(chunk.data);
@@ -262,9 +273,11 @@ export class ChatService {
yield { type: 'text', delta: evt.contentDelta };
}
if (evt.reasoningDelta !== undefined) {
// Reasoning is not persisted to the thread (it's the model's
// scratchpad, not part of the conversation) — only streamed so
// the REPL can show progress while the model thinks.
// Streamed live so the REPL can show progress while the model
// thinks. Also accumulated so a thinking-only response (no
// `content`) still produces a non-empty persisted assistant
// turn — see the fallback at the end of this loop iteration.
accumulated.reasoning += evt.reasoningDelta;
yield { type: 'thinking', delta: evt.reasoningDelta };
}
if (evt.toolCallDeltas !== undefined) {
@@ -331,10 +344,27 @@ export class ChatService {
continue;
}
// Fall back to reasoning when the model emitted only thinking
// output. Mirrors pickAssistantText() in the non-streaming path —
// same situation (thinking model + tight max_tokens, or a provider
// that bundles the answer into reasoning_content).
const persistedContent = pickAssistantText({
content: accumulated.content.length > 0 ? accumulated.content : null,
...(accumulated.reasoning.length > 0 ? { reasoning: accumulated.reasoning } : {}),
finishReason,
});
// If we synthesized text from reasoning, yield it as a final `text`
// delta so the client's stdout matches what the thread persists.
// Without this, the REPL would show only `thinking` deltas (which
// the CLI writes to stderr) and stdout would be empty for any
// thinking-only response.
if (accumulated.content.length === 0 && persistedContent.length > 0) {
yield { type: 'text', delta: persistedContent };
}
const finalMsg = await this.chatRepo.appendMessage({
threadId: ctx.threadId,
role: 'assistant',
content: accumulated.content,
content: persistedContent,
});
await this.chatRepo.touchThread(ctx.threadId);
yield { type: 'final', threadId: ctx.threadId, turnIndex: finalMsg.turnIndex };
@@ -347,12 +377,130 @@ export class ChatService {
}
}
/**
* Streaming counterpart of runOneInference. Yields raw OpenAI-style
* SSE chunks ({data: string; done?: boolean}) regardless of whether
* we're hitting a public adapter or relaying through VirtualLlmService.
* The caller's `parseStreamingChunk` already speaks OpenAI shape, so
* downstream code doesn't need to know which path produced the chunks.
*/
private async *streamInference(ctx: {
llmName: string;
llmType: string;
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
extraConfig: Record<string, unknown>;
history: OpenAiMessage[];
systemBlock: string;
toolList: ChatTool[];
mergedParams: AgentChatParams;
}): AsyncGenerator<{ data: string; done?: boolean }> {
if (ctx.llmKind !== 'virtual') {
const adapter = this.adapters.get(ctx.llmType);
yield* adapter.stream({
body: { ...this.buildBody(ctx), stream: true },
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
return;
}
if (this.virtualLlms === undefined) {
throw new Error(
'virtualLlms dispatcher not wired into ChatService — cannot stream chat with kind=virtual Llm',
);
}
// Bridge VirtualLlmService's onChunk callback API to an async
// iterator. Chunks land on the queue from the SSE relay; the
// generator drains them in order. ref.done resolves when the
// publisher emits its `[DONE]` marker.
const ref = await this.virtualLlms.enqueueInferTask(
ctx.llmName,
{ ...this.buildBody(ctx), stream: true },
true,
);
const queue: Array<{ data: string; done?: boolean }> = [];
let resolveTick: (() => void) | null = null;
const wake = (): void => {
const r = resolveTick;
resolveTick = null;
if (r !== null) r();
};
const unsubscribe = ref.onChunk((c) => { queue.push(c); wake(); });
let finished = false;
let failure: Error | null = null;
ref.done.then(() => { finished = true; wake(); }).catch((err: Error) => { failure = err; finished = true; wake(); });
try {
while (true) {
while (queue.length > 0) {
const c = queue.shift()!;
yield c;
if (c.done === true) return;
}
if (finished) {
if (failure !== null) throw failure;
return;
}
await new Promise<void>((r) => { resolveTick = r; });
}
} finally {
unsubscribe();
}
}
/**
* Run a single non-streaming inference iteration. Branches on
* ctx.llmKind: public goes through the existing adapter registry,
* virtual relays through VirtualLlmService.enqueueInferTask (mirrors
* the same branch in `routes/llm-infer.ts` from v1 Stage 3).
*
* Throws when virtualLlms isn't wired but the row is virtual — older
* test wirings hit this path.
*/
private async runOneInference(ctx: {
llmName: string;
llmType: string;
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
extraConfig: Record<string, unknown>;
history: OpenAiMessage[];
systemBlock: string;
toolList: ChatTool[];
mergedParams: AgentChatParams;
}): Promise<{ status: number; body: unknown }> {
if (ctx.llmKind === 'virtual') {
if (this.virtualLlms === undefined) {
throw new Error(
'virtualLlms dispatcher not wired into ChatService — cannot chat with kind=virtual Llm',
);
}
const ref = await this.virtualLlms.enqueueInferTask(ctx.llmName, this.buildBody(ctx), false);
return ref.done;
}
const adapter = this.adapters.get(ctx.llmType);
return adapter.infer({
body: this.buildBody(ctx),
modelOverride: ctx.modelOverride,
apiKey: ctx.apiKey,
url: ctx.url,
extraConfig: ctx.extraConfig,
});
}
private async prepareContext(args: ChatRequestArgs): Promise<{
threadId: string;
history: OpenAiMessage[];
systemBlock: string;
llmName: string;
llmType: string;
/** v3: 'virtual' means infer is relayed via VirtualLlmService instead of an HTTP adapter. */
llmKind: 'public' | 'virtual';
modelOverride: string;
url: string;
apiKey: string;
@@ -435,6 +583,7 @@ export class ChatService {
systemBlock,
llmName: llm.name,
llmType: llm.type,
llmKind: llm.kind,
modelOverride: llm.model,
url: llm.url,
apiKey,
@@ -568,6 +717,17 @@ export class ChatService {
interface ExtractedChoice {
content: string | null;
/**
* Reasoning text emitted by thinking models (qwen3-thinking,
* deepseek-reasoner, OpenAI o1 family). Different providers spell the
* field differently; the parser accepts every shape the streaming
* counterpart already accepts (see `parseStreamingChunk`). When `content`
* is null/empty, callers fall back to this so thinking models that
* exhaust their token budget on reasoning still produce a usable answer.
*/
reasoning?: string;
/** OpenAI's stop reason — `'stop' | 'length' | 'tool_calls' | 'content_filter' | ...`. */
finishReason?: string | null;
tool_calls?: Array<{ id: string; type: 'function'; function: { name: string; arguments: string } }>;
}
@@ -575,17 +735,52 @@ function extractChoice(body: unknown): ExtractedChoice | null {
if (typeof body !== 'object' || body === null) return null;
const choices = (body as { choices?: unknown }).choices;
if (!Array.isArray(choices) || choices.length === 0) return null;
const first = choices[0] as { message?: { content?: unknown; tool_calls?: unknown } } | undefined;
const first = choices[0] as {
message?: {
content?: unknown;
reasoning_content?: unknown;
reasoning?: unknown;
provider_specific_fields?: { reasoning_content?: unknown; reasoning?: unknown };
tool_calls?: unknown;
};
finish_reason?: unknown;
} | undefined;
if (first?.message === undefined) return null;
const content = typeof first.message.content === 'string' ? first.message.content : null;
const m = first.message;
const reasoning =
(typeof m.reasoning_content === 'string' && m.reasoning_content.length > 0 ? m.reasoning_content : undefined)
?? (typeof m.reasoning === 'string' && m.reasoning.length > 0 ? m.reasoning : undefined)
?? (typeof m.provider_specific_fields?.reasoning_content === 'string' && m.provider_specific_fields.reasoning_content.length > 0 ? m.provider_specific_fields.reasoning_content : undefined)
?? (typeof m.provider_specific_fields?.reasoning === 'string' && m.provider_specific_fields.reasoning.length > 0 ? m.provider_specific_fields.reasoning : undefined);
const finishReason = typeof first.finish_reason === 'string' ? first.finish_reason : null;
const toolCalls = first.message.tool_calls;
const out: ExtractedChoice = { content };
const out: ExtractedChoice = { content, finishReason };
if (reasoning !== undefined) out.reasoning = reasoning;
if (Array.isArray(toolCalls)) {
out.tool_calls = toolCalls as NonNullable<ExtractedChoice['tool_calls']>;
}
return out;
}
/**
* Pick what text to surface (and persist) as the assistant's reply.
* Thinking models sometimes emit only `reasoning_content` and leave
* `content` null — typically when `max_tokens` is too small for the
* thinking budget, but also when the provider configuration just doesn't
* separate the two. In that case the reasoning IS the answer for this
* request, and the caller should see it. A `length` finish_reason marker
* makes truncation visible so users can fix their max_tokens config.
*/
function pickAssistantText(choice: ExtractedChoice): string {
if (choice.content !== null && choice.content.length > 0) return choice.content;
if (choice.reasoning !== undefined && choice.reasoning.length > 0) {
const truncated = choice.finishReason === 'length' ? '\n\n[response truncated by max_tokens]' : '';
return `${choice.reasoning}${truncated}`;
}
return '';
}
function safeParseJson(s: string): unknown {
if (s === '') return {};
try {

View File

@@ -123,7 +123,15 @@ export class OpenAiPassthroughAdapter implements LlmAdapter {
}
private endpointUrl(url: string): string {
if (url !== '') return url.replace(/\/+$/, '');
// Accept both conventional forms users actually paste — base host
// (`https://api.openai.com`) and base + version (`https://api.openai.com/v1`).
// Every OpenAI-compat provider documents their endpoint with the `/v1`
// suffix, so users naturally include it; the adapter then re-appends
// `/v1/chat/completions`, producing a doubled-`/v1` 404 against LiteLLM
// and others. Strip a trailing `/v1` (with or without slash) so both
// shapes resolve to the same canonical base. A more specific suffix
// like `/v1beta` is preserved.
if (url !== '') return url.replace(/\/+$/, '').replace(/\/v1$/, '');
const def = DEFAULT_URLS[this.kind];
if (def === undefined) {
throw new Error(`${this.kind}: url is required (no default endpoint for this provider)`);

View File

@@ -28,6 +28,7 @@ import { randomUUID } from 'node:crypto';
import type { ILlmRepository } from '../repositories/llm.repository.js';
import type { OpenAiChatRequest } from './llm/types.js';
import { NotFoundError } from './mcp-server.service.js';
import type { AgentService } from './agent.service.js';
/** A virtual provider's announcement at registration time. */
export interface RegisterProviderInput {
@@ -119,7 +120,16 @@ export class VirtualLlmService implements IVirtualLlmService {
*/
private readonly wakeInFlight = new Map<string, Promise<void>>();
constructor(private readonly repo: ILlmRepository) {}
constructor(
private readonly repo: ILlmRepository,
/**
* Optional. v3 wires AgentService here so the lifecycle cascades:
* heartbeat → bump owned agents; disconnect → mark agents inactive;
* gcSweep → sweep agents first, then delete pinned-to-Llm cascade
* before deleting the Llm itself (Agent.llmId is Restrict).
*/
private readonly agents?: AgentService,
) {}
async register(input: { providerSessionId?: string | null; providers: RegisterProviderInput[] }): Promise<RegisterResult> {
const sessionId = input.providerSessionId ?? randomUUID();
@@ -184,7 +194,6 @@ export class VirtualLlmService implements IVirtualLlmService {
async heartbeat(providerSessionId: string): Promise<void> {
const owned = await this.repo.findBySessionId(providerSessionId);
if (owned.length === 0) return;
const now = new Date();
for (const row of owned) {
// Bump lastHeartbeatAt; if the row was already inactive (e.g. due to a
@@ -196,6 +205,11 @@ export class VirtualLlmService implements IVirtualLlmService {
: {}),
});
}
// Cascade to virtual agents owned by the same session — same heartbeat
// covers them. No-op if AgentService isn't wired (older test configs).
if (this.agents !== undefined) {
await this.agents.heartbeatVirtualAgents(providerSessionId);
}
}
bindSession(providerSessionId: string, handle: VirtualSessionHandle): void {
@@ -214,6 +228,10 @@ export class VirtualLlmService implements IVirtualLlmService {
await this.repo.update(row.id, { status: 'inactive', inactiveSince: now });
}
}
// Cascade to virtual agents owned by the same session.
if (this.agents !== undefined) {
await this.agents.markVirtualAgentsInactiveBySession(providerSessionId);
}
// Reject any in-flight tasks for this session — the relay can't deliver
// a result POST anymore.
for (const t of this.tasksById.values()) {
@@ -405,6 +423,16 @@ export class VirtualLlmService implements IVirtualLlmService {
let markedInactive = 0;
let deleted = 0;
// v3: sweep virtual agents FIRST so any Llm-pinned agent that's
// about to be cascaded (because its Llm is also expiring) is gone
// before we attempt to delete the Llm. Agent.llmId is Restrict and
// would otherwise block.
if (this.agents !== undefined) {
const agentSweep = await this.agents.gcSweepVirtualAgents(now);
markedInactive += agentSweep.markedInactive;
deleted += agentSweep.deleted;
}
const heartbeatCutoff = new Date(now.getTime() - HEARTBEAT_TIMEOUT_MS);
const stale = await this.repo.findStaleVirtuals(heartbeatCutoff);
for (const row of stale) {
@@ -415,6 +443,13 @@ export class VirtualLlmService implements IVirtualLlmService {
const deletionCutoff = new Date(now.getTime() - INACTIVE_RETENTION_MS);
const expired = await this.repo.findExpiredInactives(deletionCutoff);
for (const row of expired) {
// Final defensive cascade: drop any virtual agents still pinned
// to this Llm (e.g. their lastHeartbeatAt happens to lag the
// Llm's by a few seconds and they didn't make this round's
// 4-h cutoff). Without this we'd hit a Restrict FK error.
if (this.agents !== undefined) {
await this.agents.deleteVirtualAgentsForLlm(row.id);
}
await this.repo.delete(row.id);
deleted += 1;
}

View File

@@ -0,0 +1,251 @@
import { describe, it, expect, vi } from 'vitest';
import { ChatService, type ChatToolDispatcher } from '../src/services/chat.service.js';
import type { AgentService } from '../src/services/agent.service.js';
import type { LlmService } from '../src/services/llm.service.js';
import type { LlmAdapterRegistry } from '../src/services/llm/dispatcher.js';
import type { IChatRepository } from '../src/repositories/chat.repository.js';
import type { IPromptRepository } from '../src/repositories/prompt.repository.js';
import type { IVirtualLlmService } from '../src/services/virtual-llm.service.js';
import type { ChatMessage, ChatThread, Prompt } from '@prisma/client';
const NOW = new Date();
/**
* Tests targeting v3 Stage 1's chat.service kind=virtual branch.
* Mirror the existing chat-service.test.ts patterns but isolate the
* adapter-vs-relay dispatch decision.
*/
function mockChatRepo(): IChatRepository {
const msgs: ChatMessage[] = [];
const threads: ChatThread[] = [];
let idCounter = 1;
return {
createThread: vi.fn(async ({ agentId, ownerId, title }) => {
const t: ChatThread = {
id: `thread-${String(idCounter++)}`, agentId, ownerId,
title: title ?? '', lastTurnAt: NOW, createdAt: NOW, updatedAt: NOW,
};
threads.push(t);
return t;
}),
findThread: vi.fn(async (id: string) => threads.find((t) => t.id === id) ?? null),
listThreadsByAgent: vi.fn(async () => []),
listMessages: vi.fn(async () => []),
appendMessage: vi.fn(async (input) => {
const m: ChatMessage = {
id: `msg-${String(idCounter++)}`,
threadId: input.threadId,
turnIndex: input.turnIndex ?? msgs.filter((x) => x.threadId === input.threadId).length,
role: input.role,
content: input.content,
toolCalls: (input.toolCalls ?? null) as ChatMessage['toolCalls'],
toolCallId: input.toolCallId ?? null,
status: input.status ?? 'complete',
createdAt: NOW,
};
msgs.push(m);
return m;
}),
updateStatus: vi.fn(async (_id, _s) => ({ } as ChatMessage)),
markPendingAsError: vi.fn(async () => 0),
touchThread: vi.fn(async () => undefined),
nextTurnIndex: vi.fn(async () => msgs.length),
};
}
function mockAgents(): AgentService {
return {
getByName: vi.fn(async (name: string) => ({
id: `agent-${name}`, name, description: '',
systemPrompt: 'You are a helpful agent.',
llm: { id: 'llm-1', name: 'vllm-local' },
project: null,
defaultPersonality: null,
proxyModelName: null,
defaultParams: {},
extras: {},
ownerId: 'owner-1', version: 1, createdAt: NOW, updatedAt: NOW,
})),
} as unknown as AgentService;
}
function mockLlmsVirtual(): LlmService {
return {
getByName: vi.fn(async (name: string) => ({
id: 'llm-1', name, type: 'openai', model: 'fake',
url: '', tier: 'fast', description: '',
apiKeyRef: null, extraConfig: {},
kind: 'virtual',
status: 'active',
lastHeartbeatAt: NOW,
inactiveSince: null,
version: 1, createdAt: NOW, updatedAt: NOW,
})),
resolveApiKey: vi.fn(async () => ''),
} as unknown as LlmService;
}
function mockPromptRepo(): IPromptRepository {
return {
findAll: vi.fn(async () => []),
findGlobal: vi.fn(async () => []),
findByAgent: vi.fn(async () => []),
findById: vi.fn(async () => null),
findByNameAndProject: vi.fn(async () => null),
findByNameAndAgent: vi.fn(async () => null),
create: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
};
}
function mockTools(): ChatToolDispatcher {
return { listTools: vi.fn(async () => []), callTool: vi.fn(async () => ({ ok: true })) };
}
function emptyAdapterRegistry(): LlmAdapterRegistry {
return {
get: () => { throw new Error('adapter should not be used for kind=virtual'); },
} as unknown as LlmAdapterRegistry;
}
function mockVirtualLlms(opts: {
reply?: string;
rejectWith?: Error;
streamingChunks?: string[];
}): IVirtualLlmService {
const enqueueInferTask = vi.fn(async (_name: string, _body: unknown, streaming: boolean) => {
if (opts.rejectWith !== undefined) {
return {
taskId: 't-1',
done: Promise.reject(opts.rejectWith),
onChunk: () => () => undefined,
};
}
if (!streaming) {
const body = {
choices: [{ message: { content: opts.reply ?? 'hi from relay' }, finish_reason: 'stop' }],
};
return {
taskId: 't-1',
done: Promise.resolve({ status: 200, body }),
onChunk: () => () => undefined,
};
}
// Streaming path: collect subscribers, push the configured chunks
// synchronously, then resolve done.
const subs = new Set<(c: { data: string; done?: boolean }) => void>();
const chunks = opts.streamingChunks ?? ['{"choices":[{"delta":{"content":"hi from relay"}}]}'];
return {
taskId: 't-1',
done: (async (): Promise<{ status: number; body: unknown }> => {
// Wait long enough for the caller to register subscribers
// before fanning chunks. Promise.resolve() isn't enough — the
// microtask running this IIFE is queued ahead of the caller's
// await on enqueueInferTask, so subs would still be empty.
await new Promise((r) => setTimeout(r, 0));
for (const c of chunks) for (const s of subs) s({ data: c });
for (const s of subs) s({ data: '[DONE]', done: true });
return { status: 200, body: null };
})(),
onChunk: (cb): (() => void) => { subs.add(cb); return () => subs.delete(cb); },
};
});
return {
register: vi.fn(),
heartbeat: vi.fn(),
bindSession: vi.fn(),
unbindSession: vi.fn(),
enqueueInferTask: enqueueInferTask as unknown as IVirtualLlmService['enqueueInferTask'],
completeTask: vi.fn(),
pushTaskChunk: vi.fn(),
failTask: vi.fn(),
gcSweep: vi.fn(),
};
}
describe('ChatService — kind=virtual branch (v3 Stage 1)', () => {
it('non-streaming relays through VirtualLlmService.enqueueInferTask', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({ reply: 'hello back from local' });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
expect(result.assistant).toBe('hello back from local');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array) }),
false,
);
});
it('streaming relays through VirtualLlmService and emits the same text deltas', async () => {
const chatRepo = mockChatRepo();
const virtual = mockVirtualLlms({
streamingChunks: [
'{"choices":[{"delta":{"content":"hello "}}]}',
'{"choices":[{"delta":{"content":"world"}}]}',
],
});
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
chatRepo,
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
const deltas: string[] = [];
for await (const evt of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) {
if (evt.type === 'text') deltas.push(evt.delta);
if (evt.type === 'final') break;
}
expect(deltas.join('')).toBe('hello world');
expect(virtual.enqueueInferTask).toHaveBeenCalledWith(
'vllm-local',
expect.objectContaining({ messages: expect.any(Array), stream: true }),
true,
);
});
it('non-streaming throws a clear error when virtualLlms isn\'t wired but the row is virtual', async () => {
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
// no personalities, no virtualLlms
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/virtualLlms dispatcher not wired/);
});
it('non-streaming surfaces the relay\'s rejection (e.g. publisher offline) up to the caller', async () => {
const virtual = mockVirtualLlms({ rejectWith: Object.assign(new Error('publisher offline'), { statusCode: 503 }) });
const svc = new ChatService(
mockAgents(),
mockLlmsVirtual(),
emptyAdapterRegistry(),
mockChatRepo(),
mockPromptRepo(),
mockTools(),
undefined,
virtual,
);
await expect(svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' }))
.rejects.toThrow(/publisher offline/);
});
});

View File

@@ -118,12 +118,16 @@ function mockAgents(opts: { defaultPersonality?: { id: string; name: string } |
} as unknown as AgentService;
}
function mockLlms(): LlmService {
function mockLlms(opts: { kind?: 'public' | 'virtual' } = {}): LlmService {
return {
getByName: vi.fn(async (name: string) => ({
id: 'llm-1', name, type: 'openai', model: 'qwen3-thinking',
url: '', tier: 'fast', description: '',
apiKeyRef: null, extraConfig: {},
kind: opts.kind ?? 'public',
status: 'active',
lastHeartbeatAt: null,
inactiveSince: null,
version: 1, createdAt: NOW, updatedAt: NOW,
})),
resolveApiKey: vi.fn(async () => 'fake-key'),
@@ -457,6 +461,121 @@ describe('ChatService', () => {
expect(assistantTurn?.content).not.toContain('Let me think');
});
// Regression: thinking models with a tight max_tokens budget produce
// `reasoning_content` only and leave `content` null. Without falling back
// to reasoning, the assistant turn was empty and the smoke test saw an
// empty stdout. This covers BOTH chat() (non-streaming) and chatStream()
// (synthetic final text frame so the CLI's stdout matches what's
// persisted to the thread).
it('chat falls back to reasoning_content when content is null', async () => {
const chatRepo = mockChatRepo();
const adapter: LlmAdapter = {
kind: 'thinking-truncated',
infer: vi.fn(async () => ({
status: 200,
body: {
id: 'cmpl-1',
object: 'chat.completion',
choices: [{
index: 0,
message: { role: 'assistant', content: null, reasoning_content: 'Thinking out loud about the answer' },
finish_reason: 'stop',
}],
},
})),
stream: async function*() { yield { data: '[DONE]', done: true }; },
};
const svc = new ChatService(
mockAgents(), mockLlms(), adapterRegistry(adapter),
chatRepo, mockPromptRepo(), mockTools(),
);
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
expect(result.assistant).toBe('Thinking out loud about the answer');
const stored = chatRepo._msgs.find((m) => m.role === 'assistant');
expect(stored?.content).toBe('Thinking out loud about the answer');
});
it('chat appends [response truncated by max_tokens] when finish_reason is "length"', async () => {
const chatRepo = mockChatRepo();
const adapter: LlmAdapter = {
kind: 'thinking-clipped',
infer: vi.fn(async () => ({
status: 200,
body: {
choices: [{
index: 0,
message: { role: 'assistant', content: null, reasoning_content: 'partial reasoning that ran out of' },
finish_reason: 'length',
}],
},
})),
stream: async function*() { yield { data: '[DONE]', done: true }; },
};
const svc = new ChatService(
mockAgents(), mockLlms(), adapterRegistry(adapter),
chatRepo, mockPromptRepo(), mockTools(),
);
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
expect(result.assistant).toContain('partial reasoning that ran out of');
expect(result.assistant).toContain('[response truncated by max_tokens]');
});
it('chat prefers content when both content and reasoning_content are present', async () => {
// Thinking models that DO produce content shouldn't see the reasoning
// bleed into the response — that's what the streaming path's
// text/thinking split is for, and the non-streaming path should match.
const chatRepo = mockChatRepo();
const adapter: LlmAdapter = {
kind: 'thinking-with-content',
infer: vi.fn(async () => ({
status: 200,
body: {
choices: [{
index: 0,
message: { role: 'assistant', content: 'real answer', reasoning_content: 'background thinking' },
finish_reason: 'stop',
}],
},
})),
stream: async function*() { yield { data: '[DONE]', done: true }; },
};
const svc = new ChatService(
mockAgents(), mockLlms(), adapterRegistry(adapter),
chatRepo, mockPromptRepo(), mockTools(),
);
const result = await svc.chat({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' });
expect(result.assistant).toBe('real answer');
expect(result.assistant).not.toContain('background thinking');
});
it('chatStream emits a synthetic text frame and persists reasoning when content is empty', async () => {
const chatRepo = mockChatRepo();
const adapter: LlmAdapter = {
kind: 'thinking-only-stream',
infer: vi.fn(),
stream: async function*() {
yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'thinking ' }, finish_reason: null }] }) };
yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'more.' }, finish_reason: 'stop' }] }) };
yield { data: '[DONE]', done: true };
},
};
const svc = new ChatService(
mockAgents(), mockLlms(), adapterRegistry(adapter),
chatRepo, mockPromptRepo(), mockTools(),
);
const chunks: Array<{ type: string; delta?: string }> = [];
for await (const c of svc.chatStream({ agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1' })) {
chunks.push({ type: c.type, delta: c.delta });
}
// 2 thinking deltas (live), 1 synthesized text frame, 1 final.
expect(chunks.filter((c) => c.type === 'thinking').map((c) => c.delta)).toEqual(['thinking ', 'more.']);
expect(chunks.filter((c) => c.type === 'text').map((c) => c.delta)).toEqual(['thinking more.']);
// The thread message captures the synthesized text so resumed chats see
// a coherent assistant turn (rather than blank).
const stored = chatRepo._msgs.find((m) => m.role === 'assistant');
expect(stored?.content).toBe('thinking more.');
});
// Regression: provider_specific_fields.reasoning_content shape (LiteLLM
// passthrough from vLLM) is also recognized.
it('chatStream recognizes LiteLLM provider_specific_fields.reasoning_content', async () => {

View File

@@ -71,6 +71,36 @@ describe('OpenAiPassthroughAdapter', () => {
await expect(adapter.infer(makeCtx())).rejects.toThrow(/no default endpoint/);
});
it('infer: strips a trailing /v1 from the configured URL', async () => {
// Users naturally paste the OpenAI-style base URL with /v1 because
// every provider documents it that way (https://api.openai.com/v1,
// https://llm.example.com/v1). The adapter then re-appends
// /v1/chat/completions; without normalization this would produce a
// doubled-/v1 404 against LiteLLM and friends.
const fetchFn = mockFetch([{ match: /\/v1\/chat\/completions$/, status: 200, body: {} }]);
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
await adapter.infer(makeCtx({ url: 'https://llm.example.com/v1' }));
const [url1] = fetchFn.mock.calls[0] as [string];
expect(url1).toBe('https://llm.example.com/v1/chat/completions');
// Trailing slash + /v1 should also normalize correctly.
const fetchFn2 = mockFetch([{ match: /\/v1\/chat\/completions$/, status: 200, body: {} }]);
const adapter2 = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn2 as unknown as typeof fetch });
await adapter2.infer(makeCtx({ url: 'https://llm.example.com/v1/' }));
const [url2] = fetchFn2.mock.calls[0] as [string];
expect(url2).toBe('https://llm.example.com/v1/chat/completions');
});
it('infer: preserves a trailing /v1beta suffix (only exact /v1 is stripped)', async () => {
// Some providers expose `/v1beta` as a parallel API surface — don't
// accidentally rewrite that to `/v1` or strip it.
const fetchFn = mockFetch([{ match: /\/v1beta\/v1\/chat\/completions$/, status: 200, body: {} }]);
const adapter = new OpenAiPassthroughAdapter('openai', { fetch: fetchFn as unknown as typeof fetch });
await adapter.infer(makeCtx({ url: 'https://api.example.com/v1beta' }));
const [url] = fetchFn.mock.calls[0] as [string];
expect(url).toBe('https://api.example.com/v1beta/v1/chat/completions');
});
it('infer: omits Authorization when apiKey is empty', async () => {
const fetchFn = mockFetch([{ match: /ollama/, status: 200, body: {} }]);
const adapter = new OpenAiPassthroughAdapter('ollama', { fetch: fetchFn as unknown as typeof fetch });

View File

@@ -0,0 +1,376 @@
import { describe, it, expect, vi } from 'vitest';
import { AgentService, type VirtualAgentInput } from '../src/services/agent.service.js';
import { VirtualLlmService } from '../src/services/virtual-llm.service.js';
import type { IAgentRepository } from '../src/repositories/agent.repository.js';
import type { ILlmRepository } from '../src/repositories/llm.repository.js';
import type { LlmService } from '../src/services/llm.service.js';
import type { ProjectService } from '../src/services/project.service.js';
import type { Agent, Llm } from '@prisma/client';
/**
* v3 Stage 2 — virtual-agent lifecycle methods on AgentService and the
* cascade callbacks wired into VirtualLlmService.gcSweep / heartbeat /
* unbindSession. Mirrors the shape of virtual-llm-service.test.ts but
* focused on the agent-side state machine + the Llm→Agent cascade.
*/
const NOW = new Date();
function makeAgent(overrides: Partial<Agent> = {}): Agent {
return {
id: `agent-${Math.random().toString(36).slice(2, 8)}`,
name: 'fake-agent',
description: '',
systemPrompt: '',
llmId: 'llm-1',
projectId: null,
defaultPersonalityId: null,
proxyModelName: null,
defaultParams: {} as Agent['defaultParams'],
extras: {} as Agent['extras'],
kind: 'virtual',
providerSessionId: 'sess-1',
lastHeartbeatAt: NOW,
status: 'active',
inactiveSince: null,
ownerId: 'owner-1',
version: 1,
createdAt: NOW,
updatedAt: NOW,
...overrides,
};
}
function makeLlm(overrides: Partial<Llm> = {}): Llm {
return {
id: `llm-${Math.random().toString(36).slice(2, 8)}`,
name: 'vllm-local',
type: 'openai',
model: 'm',
url: '',
tier: 'fast',
description: '',
apiKeySecretId: null,
apiKeySecretKey: null,
extraConfig: {} as Llm['extraConfig'],
kind: 'virtual',
providerSessionId: 'sess-1',
lastHeartbeatAt: NOW,
status: 'active',
inactiveSince: null,
version: 1,
createdAt: NOW,
updatedAt: NOW,
...overrides,
};
}
function mockAgentRepo(initial: Agent[] = []): IAgentRepository {
const rows = new Map<string, Agent>(initial.map((r) => [r.id, r]));
let counter = rows.size;
return {
findAll: vi.fn(async () => [...rows.values()]),
findById: vi.fn(async (id: string) => rows.get(id) ?? null),
findByName: vi.fn(async (name: string) => {
for (const r of rows.values()) if (r.name === name) return r;
return null;
}),
findByProjectId: vi.fn(async () => []),
findBySessionId: vi.fn(async (sid: string) =>
[...rows.values()].filter((r) => r.providerSessionId === sid)),
findByLlmId: vi.fn(async (llmId: string) =>
[...rows.values()].filter((r) => r.llmId === llmId)),
findStaleVirtuals: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'active'
&& r.lastHeartbeatAt !== null
&& r.lastHeartbeatAt < cutoff)),
findExpiredInactives: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'inactive'
&& r.inactiveSince !== null
&& r.inactiveSince < cutoff)),
create: vi.fn(async (data) => {
counter += 1;
const row = makeAgent({
id: `agent-${String(counter)}`,
name: data.name,
description: data.description ?? '',
systemPrompt: data.systemPrompt ?? '',
llmId: data.llmId,
projectId: data.projectId ?? null,
kind: data.kind ?? 'public',
providerSessionId: data.providerSessionId ?? null,
status: data.status ?? 'active',
lastHeartbeatAt: data.lastHeartbeatAt ?? null,
inactiveSince: data.inactiveSince ?? null,
ownerId: data.ownerId,
});
rows.set(row.id, row);
return row;
}),
update: vi.fn(async (id, data) => {
const existing = rows.get(id);
if (!existing) throw new Error('not found');
const next: Agent = {
...existing,
...(data.description !== undefined ? { description: data.description } : {}),
...(data.systemPrompt !== undefined ? { systemPrompt: data.systemPrompt } : {}),
...(data.llmId !== undefined ? { llmId: data.llmId } : {}),
...(data.projectId !== undefined ? { projectId: data.projectId } : {}),
...(data.kind !== undefined ? { kind: data.kind } : {}),
...(data.providerSessionId !== undefined ? { providerSessionId: data.providerSessionId } : {}),
...(data.status !== undefined ? { status: data.status } : {}),
...(data.lastHeartbeatAt !== undefined ? { lastHeartbeatAt: data.lastHeartbeatAt } : {}),
...(data.inactiveSince !== undefined ? { inactiveSince: data.inactiveSince } : {}),
};
rows.set(id, next);
return next;
}),
delete: vi.fn(async (id: string) => { rows.delete(id); }),
};
}
function mockLlms(): LlmService {
return {
getById: vi.fn(async (id: string) => ({ id, name: 'vllm-local', type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
getByName: vi.fn(async (name: string) => ({ id: 'llm-1', name, type: 'openai', model: 'm', kind: 'virtual', status: 'active' })),
} as unknown as LlmService;
}
function mockProjects(): ProjectService {
return {
getById: vi.fn(async (id: string) => ({ id, name: 'mcpctl-dev' })),
resolveAndGet: vi.fn(async (idOrName: string) => ({
id: idOrName === 'mcpctl-dev' ? 'proj-1' : 'proj-other',
name: idOrName,
})),
} as unknown as ProjectService;
}
describe('AgentService — virtual-agent lifecycle (v3 Stage 2)', () => {
it('registerVirtualAgents inserts new rows with kind=virtual / status=active', async () => {
const repo = mockAgentRepo();
const svc = new AgentService(repo, mockLlms(), mockProjects());
const inputs: VirtualAgentInput[] = [
{ name: 'local-coder', llmName: 'vllm-local', description: 'd', systemPrompt: 's' },
];
const out = await svc.registerVirtualAgents('sess-1', inputs, 'owner-1');
expect(out).toHaveLength(1);
expect(out[0]!.kind).toBe('virtual');
expect(out[0]!.status).toBe('active');
});
it('registerVirtualAgents reuses an existing row from the same session (sticky reconnect)', async () => {
const existing = makeAgent({ name: 'local-coder', providerSessionId: 'sess-1', status: 'inactive', inactiveSince: NOW });
const repo = mockAgentRepo([existing]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const out = await svc.registerVirtualAgents(
'sess-1',
[{ name: 'local-coder', llmName: 'vllm-local' }],
'owner-1',
);
expect(out[0]!.id).toBe(existing.id);
expect(out[0]!.status).toBe('active');
});
it('registerVirtualAgents adopts an inactive virtual from a different session', async () => {
const existing = makeAgent({
name: 'local-coder', providerSessionId: 'old-session',
status: 'inactive', inactiveSince: NOW,
});
const repo = mockAgentRepo([existing]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const out = await svc.registerVirtualAgents(
'new-session',
[{ name: 'local-coder', llmName: 'vllm-local' }],
'owner-1',
);
expect(out[0]!.id).toBe(existing.id);
expect(out[0]!.status).toBe('active');
});
it('registerVirtualAgents refuses to overwrite a public agent (409)', async () => {
const repo = mockAgentRepo([makeAgent({ name: 'reviewer', kind: 'public', providerSessionId: null })]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await expect(svc.registerVirtualAgents(
'sess-x',
[{ name: 'reviewer', llmName: 'vllm-local' }],
'owner-1',
)).rejects.toThrow(/Cannot publish over public Agent/);
});
it('registerVirtualAgents refuses if another active session owns the name', async () => {
const repo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'other', status: 'active' })]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await expect(svc.registerVirtualAgents(
'mine',
[{ name: 'local-coder', llmName: 'vllm-local' }],
'owner-1',
)).rejects.toThrow(/already active under a different session/);
});
it('heartbeatVirtualAgents bumps + revives inactive', async () => {
const past = new Date(Date.now() - 5_000);
const a = makeAgent({ name: 'a', providerSessionId: 'sess', status: 'inactive', lastHeartbeatAt: past, inactiveSince: past });
const repo = mockAgentRepo([a]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await svc.heartbeatVirtualAgents('sess');
const row = await repo.findByName('a');
expect(row?.status).toBe('active');
expect(row?.inactiveSince).toBeNull();
expect(row!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
});
it('markVirtualAgentsInactiveBySession flips owned actives to inactive', async () => {
const repo = mockAgentRepo([
makeAgent({ name: 'a', providerSessionId: 'sess' }),
makeAgent({ name: 'b', providerSessionId: 'sess' }),
makeAgent({ name: 'c', providerSessionId: 'other' }),
]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
await svc.markVirtualAgentsInactiveBySession('sess');
expect((await repo.findByName('a'))?.status).toBe('inactive');
expect((await repo.findByName('b'))?.status).toBe('inactive');
expect((await repo.findByName('c'))?.status).toBe('active');
});
it('deleteVirtualAgentsForLlm deletes only virtuals pinned to that Llm', async () => {
const repo = mockAgentRepo([
makeAgent({ name: 'v-1', llmId: 'doomed', kind: 'virtual' }),
makeAgent({ name: 'v-2', llmId: 'doomed', kind: 'virtual' }),
makeAgent({ name: 'pub-1', llmId: 'doomed', kind: 'public', providerSessionId: null }),
makeAgent({ name: 'v-other', llmId: 'safe', kind: 'virtual' }),
]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const deleted = await svc.deleteVirtualAgentsForLlm('doomed');
expect(deleted).toBe(2);
expect(await repo.findByName('v-1')).toBeNull();
expect(await repo.findByName('v-2')).toBeNull();
expect(await repo.findByName('pub-1')).not.toBeNull();
expect(await repo.findByName('v-other')).not.toBeNull();
});
it('gcSweepVirtualAgents flips heartbeat-stale + deletes 4h-old inactive', async () => {
const long = new Date(Date.now() - 5 * 60 * 1000); // 5 min ago, past 90s cutoff
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 h ago, past 4h cutoff
const repo = mockAgentRepo([
makeAgent({ name: 'stale', providerSessionId: 'a', status: 'active', lastHeartbeatAt: long }),
makeAgent({ name: 'old', providerSessionId: 'b', status: 'inactive', inactiveSince: ancient }),
makeAgent({ name: 'pub', providerSessionId: null, kind: 'public' }),
]);
const svc = new AgentService(repo, mockLlms(), mockProjects());
const r = await svc.gcSweepVirtualAgents();
expect(r.markedInactive).toBe(1);
expect(r.deleted).toBe(1);
expect((await repo.findByName('stale'))?.status).toBe('inactive');
expect(await repo.findByName('old')).toBeNull();
expect(await repo.findByName('pub')).not.toBeNull();
});
});
describe('VirtualLlmService cascade through AgentService (v3 Stage 2)', () => {
function mockLlmRepo(initial: Llm[] = []): ILlmRepository {
const rows = new Map<string, Llm>(initial.map((r) => [r.id, r]));
let counter = rows.size;
return {
findAll: vi.fn(async () => [...rows.values()]),
findById: vi.fn(async (id: string) => rows.get(id) ?? null),
findByName: vi.fn(async (name: string) => {
for (const r of rows.values()) if (r.name === name) return r;
return null;
}),
findByTier: vi.fn(async () => []),
findBySessionId: vi.fn(async (sid: string) =>
[...rows.values()].filter((r) => r.providerSessionId === sid)),
findStaleVirtuals: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'active'
&& r.lastHeartbeatAt !== null
&& r.lastHeartbeatAt < cutoff)),
findExpiredInactives: vi.fn(async (cutoff: Date) =>
[...rows.values()].filter((r) =>
r.kind === 'virtual'
&& r.status === 'inactive'
&& r.inactiveSince !== null
&& r.inactiveSince < cutoff)),
create: vi.fn(async (data) => {
counter += 1;
const row = makeLlm({ id: `llm-${String(counter)}`, name: data.name, type: data.type });
rows.set(row.id, row);
return row;
}),
update: vi.fn(async (id, data) => {
const existing = rows.get(id);
if (!existing) throw new Error('not found');
const next: Llm = { ...existing, ...data } as Llm;
rows.set(id, next);
return next;
}),
delete: vi.fn(async (id: string) => { rows.delete(id); }),
};
}
it('unbindSession cascades to mark virtual agents inactive', async () => {
const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess' })]);
const agentRepo = mockAgentRepo([
makeAgent({ name: 'local-coder', providerSessionId: 'sess' }),
]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
await svc.unbindSession('sess');
expect((await agentRepo.findByName('local-coder'))?.status).toBe('inactive');
});
it('gcSweep deletes virtual agents BEFORE their pinned virtual Llm', async () => {
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
const llmRepo = mockLlmRepo([makeLlm({
id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
status: 'inactive', inactiveSince: ancient,
})]);
const agentRepo = mockAgentRepo([
makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: ancient }),
]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
const r = await svc.gcSweep();
expect(r.deleted).toBeGreaterThanOrEqual(2); // 1 agent + 1 llm
expect(await llmRepo.findByName('vllm-local')).toBeNull();
expect(await agentRepo.findByName('pinned')).toBeNull();
});
it('gcSweep defensive cascade: still drops the agent when its heartbeat lagged the Llm', async () => {
// The Llm is past the 4h cutoff. The agent is inactive but only
// 1h old — wouldn't be GC'd by gcSweepVirtualAgents on its own.
// The defensive cascade in gcSweep deletes it anyway because the
// Restrict FK would otherwise block the Llm delete.
const ancient = new Date(Date.now() - 5 * 60 * 60 * 1000);
const recent = new Date(Date.now() - 1 * 60 * 60 * 1000);
const llmRepo = mockLlmRepo([makeLlm({
id: 'doomed-llm', name: 'vllm-local', providerSessionId: 'sess',
status: 'inactive', inactiveSince: ancient,
})]);
const agentRepo = mockAgentRepo([
makeAgent({ name: 'pinned', providerSessionId: 'sess', llmId: 'doomed-llm', status: 'inactive', inactiveSince: recent }),
]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
await svc.gcSweep();
expect(await llmRepo.findByName('vllm-local')).toBeNull();
expect(await agentRepo.findByName('pinned')).toBeNull();
});
it('heartbeat cascades to bump owned virtual agents', async () => {
const past = new Date(Date.now() - 10_000);
const llmRepo = mockLlmRepo([makeLlm({ name: 'vllm-local', providerSessionId: 'sess', lastHeartbeatAt: past })]);
const agentRepo = mockAgentRepo([makeAgent({ name: 'local-coder', providerSessionId: 'sess', lastHeartbeatAt: past })]);
const agents = new AgentService(agentRepo, mockLlms(), mockProjects());
const svc = new VirtualLlmService(llmRepo, agents);
await svc.heartbeat('sess');
const a = await agentRepo.findByName('local-coder');
expect(a!.lastHeartbeatAt!.getTime()).toBeGreaterThan(past.getTime());
});
});

View File

@@ -108,8 +108,27 @@ interface LlmMultiFileConfig {
providers: LlmProviderFileEntry[];
}
/**
* Local agent declaration (v3). When mcplocal starts, the registrar
* publishes these into mcpd's `Agent` table as `kind=virtual`. They show
* up under `mcpctl get agent` and become chat-able via `mcpctl chat <name>`.
*
* `llm` references a published provider's name from the `llm.providers`
* array — the registrar resolves it server-side.
*/
export interface AgentFileEntry {
name: string;
llm: string;
description?: string;
systemPrompt?: string;
project?: string;
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
}
interface McpctlConfig {
llm?: LlmFileConfig | LlmMultiFileConfig;
agents?: AgentFileEntry[];
projects?: Record<string, { llm?: ProjectLlmOverride }>;
}
@@ -190,6 +209,15 @@ export function loadProjectLlmOverride(projectName: string): ProjectLlmOverride
return config.projects?.[projectName]?.llm;
}
/**
* Load locally-declared agents from ~/.mcpctl/config.json (v3 virtual
* agents). Returns empty array if no agents block is configured.
*/
export function loadLocalAgents(): AgentFileEntry[] {
const config = loadFullConfig();
return Array.isArray(config.agents) ? config.agents : [];
}
/** Reset cached config (for testing). */
export function resetConfigCache(): void {
cachedConfig = null;

View File

@@ -7,12 +7,12 @@ import { StdioProxyServer } from './server.js';
import { StdioUpstream } from './upstream/stdio.js';
import { HttpUpstream } from './upstream/http.js';
import { createHttpServer } from './http/server.js';
import { loadHttpConfig, loadLlmProviders } from './http/config.js';
import type { HttpConfig, LlmProviderFileEntry } from './http/config.js';
import { loadHttpConfig, loadLlmProviders, loadLocalAgents } from './http/config.js';
import type { HttpConfig, LlmProviderFileEntry, AgentFileEntry } from './http/config.js';
import { createProvidersFromConfig } from './llm-config.js';
import { createSecretStore } from '@mcpctl/shared';
import type { ProviderRegistry } from './providers/registry.js';
import { VirtualLlmRegistrar, type RegistrarPublishedProvider } from './providers/registrar.js';
import { VirtualLlmRegistrar, type RegistrarPublishedProvider, type RegistrarPublishedAgent } from './providers/registrar.js';
import { startWatchers, stopWatchers, reloadStages } from './proxymodel/watcher.js';
import { existsSync, readFileSync as readFileSyncNs } from 'node:fs';
import { homedir } from 'node:os';
@@ -151,7 +151,8 @@ export async function main(argv: string[] = process.argv): Promise<MainResult> {
// Virtual-LLM registrar: publish opted-in providers (`publish: true`)
// into mcpd's Llm registry. Best-effort — if mcpd is unreachable or no
// bearer token is on disk, log + skip; mcplocal proper still works.
const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries);
const localAgents = loadLocalAgents();
const registrar = await maybeStartVirtualLlmRegistrar(providerRegistry, llmEntries, localAgents);
// Graceful shutdown
let shuttingDown = false;
@@ -198,9 +199,10 @@ if (isMain) {
async function maybeStartVirtualLlmRegistrar(
providerRegistry: ProviderRegistry,
llmEntries: LlmProviderFileEntry[],
localAgents: AgentFileEntry[] = [],
): Promise<VirtualLlmRegistrar | null> {
const opted = llmEntries.filter((e) => e.publish === true);
if (opted.length === 0) return null;
if (opted.length === 0 && localAgents.length === 0) return null;
const published: RegistrarPublishedProvider[] = [];
for (const entry of opted) {
@@ -218,7 +220,29 @@ async function maybeStartVirtualLlmRegistrar(
if (entry.wake !== undefined) item.wake = entry.wake;
published.push(item);
}
if (published.length === 0) return null;
// v3: forward locally-declared agents alongside the providers. We
// only forward agents whose `llm` field points at a name we're
// actually publishing (or pre-declared). Stale entries are dropped
// with a warning rather than failing the whole registration.
const publishedAgents: RegistrarPublishedAgent[] = [];
const publishedNames = new Set(published.map((p) => p.provider.name));
for (const a of localAgents) {
if (!publishedNames.has(a.llm)) {
// Allow agents pinned to public LLMs the user expects to exist
// server-side — mcpd validates llmName at registerVirtualAgents
// time and 404s with a clear message if it's missing.
// We don't drop these client-side; just note it.
}
const item: RegistrarPublishedAgent = { name: a.name, llmName: a.llm };
if (a.description !== undefined) item.description = a.description;
if (a.systemPrompt !== undefined) item.systemPrompt = a.systemPrompt;
if (a.project !== undefined) item.project = a.project;
if (a.defaultParams !== undefined) item.defaultParams = a.defaultParams;
if (a.extras !== undefined) item.extras = a.extras;
publishedAgents.push(item);
}
if (published.length === 0 && publishedAgents.length === 0) return null;
// Resolve mcpd URL + bearer. Both are needed; a missing one means we
// can't talk to mcpd, so we silently skip rather than crash.
@@ -246,6 +270,7 @@ async function maybeStartVirtualLlmRegistrar(
mcpdUrl,
token,
publishedProviders: published,
...(publishedAgents.length > 0 ? { publishedAgents } : {}),
sessionFilePath: join(homedir(), '.mcpctl', 'provider-session'),
log: {
info: (msg) => process.stderr.write(`${msg}\n`),

View File

@@ -56,10 +56,28 @@ export interface RegistrarPublishedProvider {
wake?: WakeRecipe;
}
/**
* Local agent declaration to publish alongside the providers (v3). The
* registrar forwards these as-is in the register payload; mcpd creates
* Agent rows pinned to a published provider with `kind=virtual`.
*/
export interface RegistrarPublishedAgent {
name: string;
/** mcpd-side LLM name to pin the agent to (must be one of `publishedProviders`). */
llmName: string;
description?: string;
systemPrompt?: string;
project?: string;
defaultParams?: Record<string, unknown>;
extras?: Record<string, unknown>;
}
export interface RegistrarOptions {
mcpdUrl: string;
token: string;
publishedProviders: RegistrarPublishedProvider[];
/** Optional v3 — local agents to publish alongside the providers. */
publishedAgents?: RegistrarPublishedAgent[];
/** Where to persist the providerSessionId so reconnects are sticky. */
sessionFilePath: string;
log: RegistrarLogger;
@@ -172,6 +190,20 @@ export class VirtualLlmRegistrar {
}));
const body: Record<string, unknown> = { providers };
if (this.sessionId !== null) body['providerSessionId'] = this.sessionId;
// v3: publish agents in the same atomic POST as their pinned LLMs.
// Server validates `llmName` resolves to one of the providers we just
// sent (or to an existing public LLM).
if (this.opts.publishedAgents !== undefined && this.opts.publishedAgents.length > 0) {
body['agents'] = this.opts.publishedAgents.map((a) => ({
name: a.name,
llmName: a.llmName,
...(a.description !== undefined ? { description: a.description } : {}),
...(a.systemPrompt !== undefined ? { systemPrompt: a.systemPrompt } : {}),
...(a.project !== undefined ? { project: a.project } : {}),
...(a.defaultParams !== undefined ? { defaultParams: a.defaultParams } : {}),
...(a.extras !== undefined ? { extras: a.extras } : {}),
}));
}
const res = await postJson(
this.urlFor('/api/v1/llms/_provider-register'),

View File

@@ -17,7 +17,7 @@
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import http from 'node:http';
import https from 'node:https';
import { execSync } from 'node:child_process';
import { spawnSync, execSync } from 'node:child_process';
const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu';
const LLM_URL = process.env.MCPCTL_SMOKE_LLM_URL;
@@ -31,21 +31,37 @@ const AGENT_NAME = `smoke-chat-agent-${SUFFIX}`;
interface CliResult { code: number; stdout: string; stderr: string }
function run(args: string): CliResult {
try {
const stdout = execSync(`mcpctl --direct ${args}`, {
// spawnSync (not execSync) — execSync returns only stdout on success and
// discards stderr, which made any `thread:` assertion against a successful
// chat impossible to evaluate. Splitting the args correctly handles the
// few existing call sites that quote-wrap multi-word values like
// `--system-prompt "You are..."`.
const argv = splitArgs(args);
const res = spawnSync('mcpctl', ['--direct', ...argv], {
encoding: 'utf-8',
timeout: 60_000,
stdio: ['ignore', 'pipe', 'pipe'],
});
return { code: 0, stdout: stdout.trim(), stderr: '' };
} catch (err) {
const e = err as { status?: number; stdout?: Buffer | string; stderr?: Buffer | string };
return {
code: e.status ?? 1,
stdout: e.stdout ? (typeof e.stdout === 'string' ? e.stdout : e.stdout.toString('utf-8')) : '',
stderr: e.stderr ? (typeof e.stderr === 'string' ? e.stderr : e.stderr.toString('utf-8')) : '',
code: res.status ?? 1,
stdout: (res.stdout ?? '').trim(),
stderr: (res.stderr ?? '').trim(),
};
}
/**
* Tokenize a shell-style argv string with simple double-quote support — just
* enough for the smoke test's call shapes. Not a full POSIX parser; we only
* need to keep `--system-prompt "You are a smoke test..."` together as one
* arg.
*/
function splitArgs(s: string): string[] {
const out: string[] = [];
const re = /"([^"]*)"|(\S+)/g;
let m: RegExpExecArray | null;
while ((m = re.exec(s)) !== null) {
out.push(m[1] !== undefined ? m[1] : (m[2] ?? ''));
}
return out;
}
function healthz(url: string, timeoutMs = 5000): Promise<boolean> {

View File

@@ -0,0 +1,215 @@
/**
* Smoke tests: v3 virtual agents — register a virtual Llm + a virtual
* Agent through the same `_provider-register` payload, then verify mcpd
* surfaces the agent as kind=virtual / status=active. Mirrors
* virtual-llm.smoke.test.ts's in-process registrar pattern so we don't
* need to mutate ~/.mcpctl/config.json or bounce systemd's mcplocal.
*
* Heartbeat-stale → inactive (90 s) and 4 h auto-deletion are covered by
* the unit suite (mcpd virtual-agent-service.test.ts); waiting > 90 s in
* smoke would balloon the suite duration.
*/
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import http from 'node:http';
import https from 'node:https';
import { mkdtempSync, rmSync, readFileSync, existsSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import {
VirtualLlmRegistrar,
type RegistrarPublishedProvider,
type RegistrarPublishedAgent,
} from '../../src/providers/registrar.js';
import type { LlmProvider, CompletionResult } from '../../src/providers/types.js';
const MCPD_URL = process.env.MCPD_URL ?? 'https://mcpctl.ad.itaz.eu';
const SUFFIX = Date.now().toString(36);
const PROVIDER_NAME = `smoke-vagent-llm-${SUFFIX}`;
const AGENT_NAME = `smoke-vagent-${SUFFIX}`;
function makeFakeProvider(name: string, content: string): LlmProvider {
return {
name,
async complete(): Promise<CompletionResult> {
return {
content,
toolCalls: [],
usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 },
finishReason: 'stop',
};
},
async listModels() { return []; },
async isAvailable() { return true; },
};
}
function healthz(url: string, timeoutMs = 5000): Promise<boolean> {
return new Promise((resolve) => {
const parsed = new URL(`${url.replace(/\/$/, '')}/healthz`);
const driver = parsed.protocol === 'https:' ? https : http;
const req = driver.get({
hostname: parsed.hostname,
port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
path: parsed.pathname,
timeout: timeoutMs,
}, (res) => { resolve((res.statusCode ?? 500) < 500); res.resume(); });
req.on('error', () => resolve(false));
req.on('timeout', () => { req.destroy(); resolve(false); });
});
}
function readToken(): string | null {
try {
const path = join(process.env.HOME ?? '', '.mcpctl', 'credentials');
if (!existsSync(path)) return null;
const parsed = JSON.parse(readFileSync(path, 'utf-8')) as { token?: string };
return parsed.token ?? null;
} catch {
return null;
}
}
interface HttpResponse { status: number; body: string }
function httpRequest(method: string, urlStr: string, body: unknown): Promise<HttpResponse> {
return new Promise((resolve, reject) => {
const tokenRaw = readToken();
const parsed = new URL(urlStr);
const driver = parsed.protocol === 'https:' ? https : http;
const headers: Record<string, string> = {
Accept: 'application/json',
...(body !== undefined ? { 'Content-Type': 'application/json' } : {}),
...(tokenRaw !== null ? { Authorization: `Bearer ${tokenRaw}` } : {}),
};
const req = driver.request({
hostname: parsed.hostname,
port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80),
path: parsed.pathname + parsed.search,
method,
headers,
timeout: 30_000,
}, (res) => {
const chunks: Buffer[] = [];
res.on('data', (c: Buffer) => chunks.push(c));
res.on('end', () => {
resolve({ status: res.statusCode ?? 0, body: Buffer.concat(chunks).toString('utf-8') });
});
});
req.on('error', reject);
req.on('timeout', () => { req.destroy(); reject(new Error(`httpRequest timeout: ${method} ${urlStr}`)); });
if (body !== undefined) req.write(JSON.stringify(body));
req.end();
});
}
interface AgentRow { id: string; name: string; kind?: string; status?: string; llm?: { name: string }; description?: string }
let mcpdUp = false;
let registrar: VirtualLlmRegistrar | null = null;
let tempDir: string;
describe('virtual-agent smoke (v3)', () => {
beforeAll(async () => {
mcpdUp = await healthz(MCPD_URL);
if (!mcpdUp) {
// eslint-disable-next-line no-console
console.warn(`\n ○ virtual-agent smoke: skipped — ${MCPD_URL}/healthz unreachable.\n`);
return;
}
if (readToken() === null) {
mcpdUp = false;
// eslint-disable-next-line no-console
console.warn('\n ○ virtual-agent smoke: skipped — no ~/.mcpctl/credentials.\n');
return;
}
tempDir = mkdtempSync(join(tmpdir(), 'mcpctl-virtual-agent-smoke-'));
}, 20_000);
afterAll(async () => {
if (registrar !== null) registrar.stop();
if (tempDir !== undefined) rmSync(tempDir, { recursive: true, force: true });
// Defensive cleanup: agent first (Llm.id has Restrict FK), then Llm.
if (mcpdUp) {
const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined);
if (agents.status === 200) {
const rows = JSON.parse(agents.body) as Array<{ id: string; name: string }>;
const row = rows.find((r) => r.name === AGENT_NAME);
if (row !== undefined) {
await httpRequest('DELETE', `${MCPD_URL}/api/v1/agents/${row.id}`, undefined);
}
}
const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
if (llms.status === 200) {
const rows = JSON.parse(llms.body) as Array<{ id: string; name: string }>;
const row = rows.find((r) => r.name === PROVIDER_NAME);
if (row !== undefined) {
await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${row.id}`, undefined);
}
}
}
});
it('registrar publishes provider + agent in one round-trip and mcpd lists the agent kind=virtual / status=active', async () => {
if (!mcpdUp) return;
const token = readToken();
if (token === null) return;
const published: RegistrarPublishedProvider[] = [
{ provider: makeFakeProvider(PROVIDER_NAME, 'hi from virtual agent'), type: 'openai', model: 'fake-vagent', tier: 'fast' },
];
const publishedAgents: RegistrarPublishedAgent[] = [
{
name: AGENT_NAME,
llmName: PROVIDER_NAME,
description: 'v3 virtual agent smoke',
systemPrompt: 'You are a smoke test. Reply READY.',
defaultParams: { temperature: 0 },
},
];
registrar = new VirtualLlmRegistrar({
mcpdUrl: MCPD_URL,
token,
publishedProviders: published,
publishedAgents,
sessionFilePath: join(tempDir, 'session'),
log: { info: () => {}, warn: () => {}, error: () => {} },
heartbeatIntervalMs: 60_000,
});
await registrar.start();
expect(registrar.getSessionId()).not.toBeNull();
// Give the SSE handshake + atomic register a moment to settle.
await new Promise((r) => setTimeout(r, 400));
const res = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined);
expect(res.status).toBe(200);
const rows = JSON.parse(res.body) as AgentRow[];
const row = rows.find((r) => r.name === AGENT_NAME);
expect(row, `${AGENT_NAME} must be present`).toBeDefined();
expect(row!.kind).toBe('virtual');
expect(row!.status).toBe('active');
expect(row!.llm?.name).toBe(PROVIDER_NAME);
expect(row!.description).toBe('v3 virtual agent smoke');
}, 30_000);
it('publisher disconnect flips the agent to status=inactive (paired with its Llm)', async () => {
if (!mcpdUp) return;
if (registrar !== null) {
registrar.stop();
registrar = null;
}
// unbindSession runs synchronously on the SSE close handler; mcpd
// flips both the Llm and any agents owned by the session to
// inactive. A short wait covers the request round-trip.
await new Promise((r) => setTimeout(r, 400));
const agents = await httpRequest('GET', `${MCPD_URL}/api/v1/agents`, undefined);
expect(agents.status).toBe(200);
const agentRow = (JSON.parse(agents.body) as AgentRow[]).find((r) => r.name === AGENT_NAME);
expect(agentRow, `${AGENT_NAME} must still exist (deletion is GC-driven, not disconnect-driven)`).toBeDefined();
expect(agentRow!.status).toBe('inactive');
const llms = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
const llmRow = (JSON.parse(llms.body) as Array<{ name: string; status: string }>).find((r) => r.name === PROVIDER_NAME);
expect(llmRow!.status).toBe('inactive');
}, 30_000);
});