Compare commits
8 Commits
feat/virtu
...
perf/vites
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
18245be0c1 | ||
| 45c7737ee1 | |||
|
|
e0cfe0ba4d | ||
|
|
db839afc57 | ||
|
|
af0fabd84f | ||
| 700d1683c2 | |||
|
|
2a44f60785 | ||
| 65b6b265d9 |
@@ -1,3 +1,9 @@
|
|||||||
|
# syntax=docker/dockerfile:1.6
|
||||||
|
# `# syntax=...` enables BuildKit's --mount feature on the builder so we can
|
||||||
|
# share the pnpm content-addressed store across image builds. Without it the
|
||||||
|
# next two RUN steps fall back to plain mode and the cache mount is ignored
|
||||||
|
# (build still works, just slower).
|
||||||
|
|
||||||
# Stage 1: Build TypeScript
|
# Stage 1: Build TypeScript
|
||||||
FROM node:20-alpine AS builder
|
FROM node:20-alpine AS builder
|
||||||
|
|
||||||
@@ -12,8 +18,12 @@ COPY src/db/package.json src/db/tsconfig.json src/db/
|
|||||||
COPY src/shared/package.json src/shared/tsconfig.json src/shared/
|
COPY src/shared/package.json src/shared/tsconfig.json src/shared/
|
||||||
COPY src/web/package.json src/web/tsconfig.json src/web/
|
COPY src/web/package.json src/web/tsconfig.json src/web/
|
||||||
|
|
||||||
# Install all dependencies
|
# Install all dependencies. The cache mount keeps pnpm's CAS store warm
|
||||||
RUN pnpm install --frozen-lockfile
|
# across builds: only newly-changed packages get downloaded; everything
|
||||||
|
# else hardlinks from the cache. Drops install from ~60s to <5s on a
|
||||||
|
# warm cache. `--frozen-lockfile` still guarantees lockfile fidelity.
|
||||||
|
RUN --mount=type=cache,id=pnpm-store-mcpd-builder,target=/root/.local/share/pnpm/store \
|
||||||
|
pnpm install --frozen-lockfile
|
||||||
|
|
||||||
# Copy source code
|
# Copy source code
|
||||||
COPY src/mcpd/src/ src/mcpd/src/
|
COPY src/mcpd/src/ src/mcpd/src/
|
||||||
@@ -42,8 +52,11 @@ COPY src/mcpd/package.json src/mcpd/
|
|||||||
COPY src/db/package.json src/db/
|
COPY src/db/package.json src/db/
|
||||||
COPY src/shared/package.json src/shared/
|
COPY src/shared/package.json src/shared/
|
||||||
|
|
||||||
# Install all deps (prisma CLI needed at runtime for db push)
|
# Install all deps (prisma CLI needed at runtime for db push). Same
|
||||||
RUN pnpm install --frozen-lockfile
|
# cache-mount trick as the builder stage; separate cache id so the two
|
||||||
|
# stages don't compete for the same lock.
|
||||||
|
RUN --mount=type=cache,id=pnpm-store-mcpd-runtime,target=/root/.local/share/pnpm/store \
|
||||||
|
pnpm install --frozen-lockfile
|
||||||
|
|
||||||
# Copy prisma schema and generate client
|
# Copy prisma schema and generate client
|
||||||
COPY src/db/prisma/ src/db/prisma/
|
COPY src/db/prisma/ src/db/prisma/
|
||||||
|
|||||||
@@ -97,11 +97,11 @@ route branches on it server-side.
|
|||||||
|
|
||||||
## Lifecycle in detail
|
## Lifecycle in detail
|
||||||
|
|
||||||
| State | What it means |
|
| State | What it means |
|
||||||
|----------------|-----------------------------------------------------------------------|
|
|----------------|---------------------------------------------------------------------------------|
|
||||||
| `active` | Heartbeat received within the last 90 s and the SSE channel is open. |
|
| `active` | Heartbeat received within the last 90 s and the SSE channel is open. |
|
||||||
| `inactive` | Either the SSE closed or the heartbeat watchdog tripped. Inference returns 503. |
|
| `inactive` | Either the SSE closed or the heartbeat watchdog tripped. Inference returns 503. |
|
||||||
| `hibernating` | Reserved for v2 (wake-on-demand). v1 never writes this state. |
|
| `hibernating` | Publisher is online but the backend is asleep; the next inference triggers a `wake` task before relaying. |
|
||||||
|
|
||||||
Two timers on mcpd run the GC sweep:
|
Two timers on mcpd run the GC sweep:
|
||||||
|
|
||||||
@@ -132,10 +132,75 @@ a finalized `CompletionResult`, not a token stream. Streaming requests
|
|||||||
therefore arrive at the caller as a single delta + `[DONE]`. Real
|
therefore arrive at the caller as a single delta + `[DONE]`. Real
|
||||||
per-token streaming is a v2 concern.
|
per-token streaming is a v2 concern.
|
||||||
|
|
||||||
|
## Wake-on-demand (v2)
|
||||||
|
|
||||||
|
A provider whose backend hibernates (a vLLM instance that suspends
|
||||||
|
when idle, an Ollama daemon that exits when nothing's connected, …)
|
||||||
|
can declare a **wake recipe** in mcplocal config. When that provider's
|
||||||
|
`isAvailable()` returns false at registrar startup, the row is
|
||||||
|
published as `status=hibernating`. The next inference request that
|
||||||
|
hits the row triggers the recipe and waits for the backend to come up
|
||||||
|
before relaying.
|
||||||
|
|
||||||
|
Two recipe types:
|
||||||
|
|
||||||
|
```jsonc
|
||||||
|
// HTTP — POST to a "wake controller" that starts the backend out of band.
|
||||||
|
{
|
||||||
|
"name": "vllm-local",
|
||||||
|
"type": "openai",
|
||||||
|
"model": "...",
|
||||||
|
"publish": true,
|
||||||
|
"wake": {
|
||||||
|
"type": "http",
|
||||||
|
"url": "http://10.0.0.50:9090/wake/vllm",
|
||||||
|
"method": "POST",
|
||||||
|
"headers": { "Authorization": "Bearer ..." },
|
||||||
|
"maxWaitSeconds": 60
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
```jsonc
|
||||||
|
// command — spawn a local process (systemd, wakeonlan, custom script).
|
||||||
|
{
|
||||||
|
"name": "vllm-local",
|
||||||
|
"type": "openai",
|
||||||
|
"model": "...",
|
||||||
|
"publish": true,
|
||||||
|
"wake": {
|
||||||
|
"type": "command",
|
||||||
|
"command": "/usr/local/bin/start-vllm",
|
||||||
|
"args": ["--profile", "qwen3"],
|
||||||
|
"maxWaitSeconds": 120
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
How a request flows when the row is `hibernating`:
|
||||||
|
|
||||||
|
```
|
||||||
|
client → mcpd POST /api/v1/llms/<name>/infer
|
||||||
|
mcpd: status === hibernating → push wake task on SSE
|
||||||
|
mcplocal: receive wake task → run recipe → poll isAvailable()
|
||||||
|
→ heartbeat each tick → POST { ok: true } back
|
||||||
|
mcpd: flip row → active, push the original infer task
|
||||||
|
mcplocal: run inference → POST result back
|
||||||
|
mcpd → client (forwards the inference result)
|
||||||
|
```
|
||||||
|
|
||||||
|
Concurrent infers for the same hibernating Llm share a single wake
|
||||||
|
task — only the first request triggers the recipe; later ones await
|
||||||
|
the same in-flight wake promise. After the wake settles, every queued
|
||||||
|
infer dispatches in order.
|
||||||
|
|
||||||
|
If the recipe fails (HTTP non-2xx, command exits non-zero, or the
|
||||||
|
provider doesn't come up within `maxWaitSeconds`), every queued infer
|
||||||
|
is rejected with a clear error and the row stays `hibernating` —
|
||||||
|
the next request gets a fresh wake attempt.
|
||||||
|
|
||||||
## Roadmap (later stages)
|
## Roadmap (later stages)
|
||||||
|
|
||||||
- **v2 — Wake-on-demand**: Secret-stored "wake recipe" so mcpd can ask
|
|
||||||
mcplocal to start a hibernating backend before sending inference.
|
|
||||||
- **v3 — Virtual agents**: mcplocal publishes its local agent configs
|
- **v3 — Virtual agents**: mcplocal publishes its local agent configs
|
||||||
(model + system prompt + sampling defaults) into mcpd's `Agent` table.
|
(model + system prompt + sampling defaults) into mcpd's `Agent` table.
|
||||||
- **v4 — LB pool by model**: agents can target a model name instead of
|
- **v4 — LB pool by model**: agents can target a model name instead of
|
||||||
|
|||||||
@@ -408,6 +408,17 @@ function toApplyDocs(resource: string, items: unknown[]): Array<{ kind: string }
|
|||||||
const kind = RESOURCE_KIND[resource] ?? resource;
|
const kind = RESOURCE_KIND[resource] ?? resource;
|
||||||
return items.map((item) => {
|
return items.map((item) => {
|
||||||
const cleaned = stripInternalFields(item as Record<string, unknown>);
|
const cleaned = stripInternalFields(item as Record<string, unknown>);
|
||||||
|
// Llm-specific: the new virtual-provider lifecycle fields collide with
|
||||||
|
// the apply-doc `kind` envelope (the schema uses `kind: public|virtual`)
|
||||||
|
// and aren't apply-able anyway — they're derived runtime state managed
|
||||||
|
// by VirtualLlmService. Drop them so YAML round-trips stay clean.
|
||||||
|
if (resource === 'llms') {
|
||||||
|
delete cleaned['kind'];
|
||||||
|
delete cleaned['status'];
|
||||||
|
delete cleaned['lastHeartbeatAt'];
|
||||||
|
delete cleaned['inactiveSince'];
|
||||||
|
delete cleaned['providerSessionId'];
|
||||||
|
}
|
||||||
return { kind, ...cleaned };
|
return { kind, ...cleaned };
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -150,6 +150,7 @@ function coerceProviderInput(raw: unknown): {
|
|||||||
tier?: string;
|
tier?: string;
|
||||||
description?: string;
|
description?: string;
|
||||||
extraConfig?: Record<string, unknown>;
|
extraConfig?: Record<string, unknown>;
|
||||||
|
initialStatus?: 'active' | 'hibernating';
|
||||||
} {
|
} {
|
||||||
if (raw === null || typeof raw !== 'object') {
|
if (raw === null || typeof raw !== 'object') {
|
||||||
throw Object.assign(new Error('provider entry must be an object'), { statusCode: 400 });
|
throw Object.assign(new Error('provider entry must be an object'), { statusCode: 400 });
|
||||||
@@ -170,5 +171,11 @@ function coerceProviderInput(raw: unknown): {
|
|||||||
if (o['extraConfig'] !== null && typeof o['extraConfig'] === 'object') {
|
if (o['extraConfig'] !== null && typeof o['extraConfig'] === 'object') {
|
||||||
out.extraConfig = o['extraConfig'] as Record<string, unknown>;
|
out.extraConfig = o['extraConfig'] as Record<string, unknown>;
|
||||||
}
|
}
|
||||||
|
// Only accept the two values v2 actually defines. Anything else falls
|
||||||
|
// through to the service default (active) — matches v1 publishers that
|
||||||
|
// don't know about this field.
|
||||||
|
if (o['initialStatus'] === 'active' || o['initialStatus'] === 'hibernating') {
|
||||||
|
out.initialStatus = o['initialStatus'];
|
||||||
|
}
|
||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,15 @@ export interface RegisterProviderInput {
|
|||||||
tier?: string;
|
tier?: string;
|
||||||
description?: string;
|
description?: string;
|
||||||
extraConfig?: Record<string, unknown>;
|
extraConfig?: Record<string, unknown>;
|
||||||
|
/**
|
||||||
|
* Optional. Lets the publisher hint that the underlying backend is
|
||||||
|
* asleep — mcpd records the row as `hibernating` and will dispatch a
|
||||||
|
* `wake` task before any inference. Defaults to `active` (today's
|
||||||
|
* behavior). v2 publishers (mcplocal with a configured wake recipe)
|
||||||
|
* pass 'hibernating' when `LlmProvider.isAvailable()` returns false at
|
||||||
|
* publish time.
|
||||||
|
*/
|
||||||
|
initialStatus?: 'active' | 'hibernating';
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface RegisterResult {
|
export interface RegisterResult {
|
||||||
@@ -103,6 +112,12 @@ export interface PendingTaskRef {
|
|||||||
export class VirtualLlmService implements IVirtualLlmService {
|
export class VirtualLlmService implements IVirtualLlmService {
|
||||||
private readonly sessions = new Map<string, VirtualSessionHandle>();
|
private readonly sessions = new Map<string, VirtualSessionHandle>();
|
||||||
private readonly tasksById = new Map<string, PendingTask>();
|
private readonly tasksById = new Map<string, PendingTask>();
|
||||||
|
/**
|
||||||
|
* Dedupe concurrent wake requests for the same Llm. The first request
|
||||||
|
* starts the wake; later requests for the same name await the same
|
||||||
|
* promise. Cleared as soon as the wake settles (success or failure).
|
||||||
|
*/
|
||||||
|
private readonly wakeInFlight = new Map<string, Promise<void>>();
|
||||||
|
|
||||||
constructor(private readonly repo: ILlmRepository) {}
|
constructor(private readonly repo: ILlmRepository) {}
|
||||||
|
|
||||||
@@ -112,6 +127,7 @@ export class VirtualLlmService implements IVirtualLlmService {
|
|||||||
const llms: Llm[] = [];
|
const llms: Llm[] = [];
|
||||||
|
|
||||||
for (const p of input.providers) {
|
for (const p of input.providers) {
|
||||||
|
const initialStatus = p.initialStatus ?? 'active';
|
||||||
const existing = await this.repo.findByName(p.name);
|
const existing = await this.repo.findByName(p.name);
|
||||||
if (existing === null) {
|
if (existing === null) {
|
||||||
const created = await this.repo.create({
|
const created = await this.repo.create({
|
||||||
@@ -123,7 +139,7 @@ export class VirtualLlmService implements IVirtualLlmService {
|
|||||||
...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
|
...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
|
||||||
kind: 'virtual',
|
kind: 'virtual',
|
||||||
providerSessionId: sessionId,
|
providerSessionId: sessionId,
|
||||||
status: 'active',
|
status: initialStatus,
|
||||||
lastHeartbeatAt: now,
|
lastHeartbeatAt: now,
|
||||||
inactiveSince: null,
|
inactiveSince: null,
|
||||||
});
|
});
|
||||||
@@ -156,7 +172,7 @@ export class VirtualLlmService implements IVirtualLlmService {
|
|||||||
...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
|
...(p.extraConfig !== undefined ? { extraConfig: p.extraConfig } : {}),
|
||||||
kind: 'virtual',
|
kind: 'virtual',
|
||||||
providerSessionId: sessionId,
|
providerSessionId: sessionId,
|
||||||
status: 'active',
|
status: initialStatus,
|
||||||
lastHeartbeatAt: now,
|
lastHeartbeatAt: now,
|
||||||
inactiveSince: null,
|
inactiveSince: null,
|
||||||
});
|
});
|
||||||
@@ -220,9 +236,9 @@ export class VirtualLlmService implements IVirtualLlmService {
|
|||||||
{ statusCode: 500 },
|
{ statusCode: 500 },
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if (llm.status !== 'active') {
|
if (llm.status === 'inactive') {
|
||||||
throw Object.assign(
|
throw Object.assign(
|
||||||
new Error(`Virtual Llm '${llmName}' is ${llm.status}; publisher offline`),
|
new Error(`Virtual Llm '${llmName}' is inactive; publisher offline`),
|
||||||
{ statusCode: 503 },
|
{ statusCode: 503 },
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -234,6 +250,16 @@ export class VirtualLlmService implements IVirtualLlmService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Wake-on-demand (v2) ──
|
||||||
|
// Status=hibernating means the publisher told us at register time
|
||||||
|
// (or via a later status update) that the backend is asleep. Fire a
|
||||||
|
// wake task and wait for the publisher to confirm readiness before
|
||||||
|
// dispatching the actual inference. Concurrent infers for the same
|
||||||
|
// Llm share a single wake promise.
|
||||||
|
if (llm.status === 'hibernating') {
|
||||||
|
await this.ensureAwake(llm.id, llm.name, llm.providerSessionId, handle);
|
||||||
|
}
|
||||||
|
|
||||||
const taskId = randomUUID();
|
const taskId = randomUUID();
|
||||||
const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>();
|
const chunkSubscribers = new Set<(chunk: { data: string; done?: boolean }) => void>();
|
||||||
|
|
||||||
@@ -275,6 +301,77 @@ export class VirtualLlmService implements IVirtualLlmService {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Drive the publisher to wake the backend. Concurrent callers for the
|
||||||
|
* same Llm name share the in-flight promise — we only ever ask the
|
||||||
|
* publisher once. Throws on timeout or recipe failure; on success the
|
||||||
|
* row is flipped to active and subsequent infer calls proceed.
|
||||||
|
*/
|
||||||
|
private async ensureAwake(
|
||||||
|
llmId: string,
|
||||||
|
llmName: string,
|
||||||
|
sessionId: string,
|
||||||
|
handle: VirtualSessionHandle,
|
||||||
|
): Promise<void> {
|
||||||
|
const existing = this.wakeInFlight.get(llmName);
|
||||||
|
if (existing !== undefined) {
|
||||||
|
await existing;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const promise = this.runWake(llmId, llmName, sessionId, handle);
|
||||||
|
this.wakeInFlight.set(llmName, promise);
|
||||||
|
try {
|
||||||
|
await promise;
|
||||||
|
} finally {
|
||||||
|
this.wakeInFlight.delete(llmName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async runWake(
|
||||||
|
llmId: string,
|
||||||
|
llmName: string,
|
||||||
|
sessionId: string,
|
||||||
|
handle: VirtualSessionHandle,
|
||||||
|
): Promise<void> {
|
||||||
|
const taskId = randomUUID();
|
||||||
|
let resolveDone!: () => void;
|
||||||
|
let rejectDone!: (err: Error) => void;
|
||||||
|
const done = new Promise<void>((resolve, reject) => {
|
||||||
|
resolveDone = resolve;
|
||||||
|
rejectDone = reject;
|
||||||
|
});
|
||||||
|
|
||||||
|
const pending: PendingTask = {
|
||||||
|
taskId,
|
||||||
|
sessionId,
|
||||||
|
llmName,
|
||||||
|
streaming: false,
|
||||||
|
// Wake tasks return { ok: true } on success or never resolve at
|
||||||
|
// all if the publisher dies; the rejectNonStreaming path covers
|
||||||
|
// the disconnect-mid-wake case via unbindSession.
|
||||||
|
resolveNonStreaming: (_body, status) => {
|
||||||
|
if (status >= 200 && status < 300) resolveDone();
|
||||||
|
else rejectDone(new Error(`wake task returned status ${String(status)}`));
|
||||||
|
},
|
||||||
|
rejectNonStreaming: rejectDone,
|
||||||
|
pushChunk: null,
|
||||||
|
};
|
||||||
|
this.tasksById.set(taskId, pending);
|
||||||
|
|
||||||
|
handle.pushTask({ kind: 'wake', taskId, llmName });
|
||||||
|
|
||||||
|
await done;
|
||||||
|
|
||||||
|
// Flip the row to active so subsequent infer calls go through the
|
||||||
|
// normal active path. The publisher's own heartbeat will keep the
|
||||||
|
// row alive from this point.
|
||||||
|
await this.repo.update(llmId, {
|
||||||
|
status: 'active',
|
||||||
|
lastHeartbeatAt: new Date(),
|
||||||
|
inactiveSince: null,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
completeTask(taskId: string, result: { status: number; body: unknown }): boolean {
|
completeTask(taskId: string, result: { status: number; body: unknown }): boolean {
|
||||||
const t = this.tasksById.get(taskId);
|
const t = this.tasksById.get(taskId);
|
||||||
if (t === undefined) return false;
|
if (t === undefined) return false;
|
||||||
|
|||||||
@@ -332,6 +332,108 @@ describe('VirtualLlmService', () => {
|
|||||||
expect(await repo.findByName('public-survivor')).not.toBeNull();
|
expect(await repo.findByName('public-survivor')).not.toBeNull();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ── v2: wake-before-infer ──
|
||||||
|
|
||||||
|
it('hibernating: dispatches a wake task first and waits for it to complete before infer', async () => {
|
||||||
|
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||||
|
const svc = new VirtualLlmService(repo);
|
||||||
|
const session = fakeSession();
|
||||||
|
svc.bindSession('sess', session);
|
||||||
|
|
||||||
|
// Kick off enqueueInferTask. It blocks on the wake task.
|
||||||
|
const inferPromise = svc.enqueueInferTask(
|
||||||
|
'sleeping',
|
||||||
|
{ model: 'm', messages: [{ role: 'user', content: 'hi' }] },
|
||||||
|
false,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait a tick so the wake task gets pushed.
|
||||||
|
await new Promise((r) => setTimeout(r, 0));
|
||||||
|
expect(session.tasks).toHaveLength(1);
|
||||||
|
const wakeTask = session.tasks[0] as { kind: string; taskId: string; llmName: string };
|
||||||
|
expect(wakeTask.kind).toBe('wake');
|
||||||
|
expect(wakeTask.llmName).toBe('sleeping');
|
||||||
|
|
||||||
|
// Resolve the wake task — service flips the row to active, then
|
||||||
|
// pushes the infer task on the same session.
|
||||||
|
expect(svc.completeTask(wakeTask.taskId, { status: 200, body: { ok: true } })).toBe(true);
|
||||||
|
const ref = await inferPromise;
|
||||||
|
expect(session.tasks).toHaveLength(2);
|
||||||
|
const inferTask = session.tasks[1] as { kind: string; taskId: string };
|
||||||
|
expect(inferTask.kind).toBe('infer');
|
||||||
|
expect(inferTask.taskId).toBe(ref.taskId);
|
||||||
|
|
||||||
|
// The row should be active now — concurrent callers won't trigger another wake.
|
||||||
|
const row = await repo.findByName('sleeping');
|
||||||
|
expect(row?.status).toBe('active');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('hibernating: concurrent infer requests share a single wake task', async () => {
|
||||||
|
const repo = mockRepo([makeLlm({ name: 'sleeping', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||||
|
const svc = new VirtualLlmService(repo);
|
||||||
|
const session = fakeSession();
|
||||||
|
svc.bindSession('sess', session);
|
||||||
|
|
||||||
|
// Fire 3 concurrent infer requests against the same hibernating LLM.
|
||||||
|
const reqs = [
|
||||||
|
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
|
||||||
|
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
|
||||||
|
svc.enqueueInferTask('sleeping', { model: 'm', messages: [] }, false),
|
||||||
|
];
|
||||||
|
|
||||||
|
await new Promise((r) => setTimeout(r, 0));
|
||||||
|
// Exactly one wake task pushed, regardless of concurrent infers.
|
||||||
|
const wakeTasks = (session.tasks as Array<{ kind: string }>).filter((t) => t.kind === 'wake');
|
||||||
|
expect(wakeTasks).toHaveLength(1);
|
||||||
|
|
||||||
|
const wakeTaskId = (session.tasks[0] as { taskId: string }).taskId;
|
||||||
|
expect(svc.completeTask(wakeTaskId, { status: 200, body: { ok: true } })).toBe(true);
|
||||||
|
|
||||||
|
const refs = await Promise.all(reqs);
|
||||||
|
// After wake, all 3 infer tasks pushed — total session tasks = 1 wake + 3 infer.
|
||||||
|
const inferTasks = (session.tasks as Array<{ kind: string }>).filter((t) => t.kind === 'infer');
|
||||||
|
expect(inferTasks).toHaveLength(3);
|
||||||
|
expect(refs.map((r) => r.taskId).sort()).toEqual(refs.map((r) => r.taskId).sort());
|
||||||
|
});
|
||||||
|
|
||||||
|
it('hibernating: rejects when the wake task fails', async () => {
|
||||||
|
const repo = mockRepo([makeLlm({ name: 'broken', providerSessionId: 'sess', status: 'hibernating' })]);
|
||||||
|
const svc = new VirtualLlmService(repo);
|
||||||
|
svc.bindSession('sess', fakeSession());
|
||||||
|
|
||||||
|
const inferPromise = svc.enqueueInferTask(
|
||||||
|
'broken',
|
||||||
|
{ model: 'm', messages: [] },
|
||||||
|
false,
|
||||||
|
);
|
||||||
|
await new Promise((r) => setTimeout(r, 0));
|
||||||
|
|
||||||
|
// Get the wake task id from the in-flight tasks map (its only entry).
|
||||||
|
// We test the failure path via failTask which is part of the public
|
||||||
|
// surface used by the result-POST route handler.
|
||||||
|
const taskIds: string[] = [];
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
for (const id of (svc as any).tasksById.keys()) taskIds.push(id);
|
||||||
|
expect(taskIds).toHaveLength(1);
|
||||||
|
expect(svc.failTask(taskIds[0]!, new Error('wake recipe failed'))).toBe(true);
|
||||||
|
|
||||||
|
await expect(inferPromise).rejects.toThrow(/wake recipe failed/);
|
||||||
|
|
||||||
|
// Row stayed hibernating — the next request will get another wake try.
|
||||||
|
const row = await repo.findByName('broken');
|
||||||
|
expect(row?.status).toBe('hibernating');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('inactive: still rejects with 503 (publisher offline) — wake path only fires for hibernating', async () => {
|
||||||
|
const repo = mockRepo([makeLlm({ name: 'gone', providerSessionId: 'sess', status: 'inactive', inactiveSince: new Date() })]);
|
||||||
|
const svc = new VirtualLlmService(repo);
|
||||||
|
svc.bindSession('sess', fakeSession());
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
svc.enqueueInferTask('gone', { model: 'm', messages: [] }, false),
|
||||||
|
).rejects.toThrow(/inactive|publisher offline/);
|
||||||
|
});
|
||||||
|
|
||||||
it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => {
|
it('gcSweep is idempotent — running twice in a row is a no-op the second time', async () => {
|
||||||
const long = new Date(Date.now() - 5 * 60 * 1000);
|
const long = new Date(Date.now() - 5 * 60 * 1000);
|
||||||
const repo = mockRepo([
|
const repo = mockRepo([
|
||||||
|
|||||||
@@ -80,8 +80,25 @@ export interface LlmProviderFileEntry {
|
|||||||
* Default: false — existing setups don't change behavior.
|
* Default: false — existing setups don't change behavior.
|
||||||
*/
|
*/
|
||||||
publish?: boolean;
|
publish?: boolean;
|
||||||
|
/**
|
||||||
|
* Optional wake recipe for hibernating backends. When set, a provider
|
||||||
|
* whose `isAvailable()` returns false at registrar start time is
|
||||||
|
* published as `status=hibernating`. The next inference request that
|
||||||
|
* lands on the row triggers this recipe; mcplocal polls
|
||||||
|
* `isAvailable()` until it returns true (or times out) and then flips
|
||||||
|
* the row to active so mcpd can dispatch the queued inference.
|
||||||
|
*
|
||||||
|
* Two recipe types:
|
||||||
|
* - `http`: POST to a URL (e.g. an external sleep/wake controller)
|
||||||
|
* - `command`: spawn a shell command (e.g. `systemctl --user start vllm`)
|
||||||
|
*/
|
||||||
|
wake?: WakeRecipe;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type WakeRecipe =
|
||||||
|
| { type: 'http'; url: string; method?: 'GET' | 'POST'; headers?: Record<string, string>; body?: string; maxWaitSeconds?: number }
|
||||||
|
| { type: 'command'; command: string; args?: string[]; maxWaitSeconds?: number };
|
||||||
|
|
||||||
export interface ProjectLlmOverride {
|
export interface ProjectLlmOverride {
|
||||||
model?: string;
|
model?: string;
|
||||||
provider?: string;
|
provider?: string;
|
||||||
|
|||||||
@@ -215,6 +215,7 @@ async function maybeStartVirtualLlmRegistrar(
|
|||||||
model: entry.model ?? entry.name,
|
model: entry.model ?? entry.name,
|
||||||
};
|
};
|
||||||
if (entry.tier !== undefined) item.tier = entry.tier;
|
if (entry.tier !== undefined) item.tier = entry.tier;
|
||||||
|
if (entry.wake !== undefined) item.wake = entry.wake;
|
||||||
published.push(item);
|
published.push(item);
|
||||||
}
|
}
|
||||||
if (published.length === 0) return null;
|
if (published.length === 0) return null;
|
||||||
|
|||||||
@@ -27,7 +27,9 @@ import http from 'node:http';
|
|||||||
import https from 'node:https';
|
import https from 'node:https';
|
||||||
import { promises as fs } from 'node:fs';
|
import { promises as fs } from 'node:fs';
|
||||||
import { dirname } from 'node:path';
|
import { dirname } from 'node:path';
|
||||||
|
import { spawn } from 'node:child_process';
|
||||||
import type { LlmProvider, CompletionOptions } from './types.js';
|
import type { LlmProvider, CompletionOptions } from './types.js';
|
||||||
|
import type { WakeRecipe } from '../http/config.js';
|
||||||
|
|
||||||
export interface RegistrarLogger {
|
export interface RegistrarLogger {
|
||||||
info: (msg: string) => void;
|
info: (msg: string) => void;
|
||||||
@@ -45,6 +47,13 @@ export interface RegistrarPublishedProvider {
|
|||||||
tier?: 'fast' | 'heavy';
|
tier?: 'fast' | 'heavy';
|
||||||
/** Optional human-readable description for `mcpctl get llm`. */
|
/** Optional human-readable description for `mcpctl get llm`. */
|
||||||
description?: string;
|
description?: string;
|
||||||
|
/**
|
||||||
|
* Optional wake recipe for backends that hibernate. When provided AND
|
||||||
|
* `provider.isAvailable()` returns false at registrar start, the row is
|
||||||
|
* published with status=hibernating; on the first incoming `wake` task
|
||||||
|
* the registrar runs this recipe and waits for the backend to come up.
|
||||||
|
*/
|
||||||
|
wake?: WakeRecipe;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface RegistrarOptions {
|
export interface RegistrarOptions {
|
||||||
@@ -140,15 +149,28 @@ export class VirtualLlmRegistrar {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async register(): Promise<void> {
|
private async register(): Promise<void> {
|
||||||
const body: Record<string, unknown> = {
|
// Decide initial status per provider. A provider with a wake recipe
|
||||||
providers: this.opts.publishedProviders.map((p) => ({
|
// that's NOT currently available comes up as hibernating; otherwise
|
||||||
|
// active (today's behavior). isAvailable() is forgiving — any
|
||||||
|
// unexpected throw is treated as "not available" so a transient
|
||||||
|
// network blip during boot doesn't crash the registrar.
|
||||||
|
const providers = await Promise.all(this.opts.publishedProviders.map(async (p) => {
|
||||||
|
let initialStatus: 'active' | 'hibernating' = 'active';
|
||||||
|
if (p.wake !== undefined) {
|
||||||
|
let alive = false;
|
||||||
|
try { alive = await p.provider.isAvailable(); } catch { alive = false; }
|
||||||
|
if (!alive) initialStatus = 'hibernating';
|
||||||
|
}
|
||||||
|
return {
|
||||||
name: p.provider.name,
|
name: p.provider.name,
|
||||||
type: p.type,
|
type: p.type,
|
||||||
model: p.model,
|
model: p.model,
|
||||||
...(p.tier !== undefined ? { tier: p.tier } : {}),
|
...(p.tier !== undefined ? { tier: p.tier } : {}),
|
||||||
...(p.description !== undefined ? { description: p.description } : {}),
|
...(p.description !== undefined ? { description: p.description } : {}),
|
||||||
})),
|
initialStatus,
|
||||||
};
|
};
|
||||||
|
}));
|
||||||
|
const body: Record<string, unknown> = { providers };
|
||||||
if (this.sessionId !== null) body['providerSessionId'] = this.sessionId;
|
if (this.sessionId !== null) body['providerSessionId'] = this.sessionId;
|
||||||
|
|
||||||
const res = await postJson(
|
const res = await postJson(
|
||||||
@@ -276,9 +298,51 @@ export class VirtualLlmRegistrar {
|
|||||||
void this.handleInferTask(task);
|
void this.handleInferTask(task);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Wake tasks are reserved for v2 — acknowledge with an error so mcpd
|
if (task.kind === 'wake') {
|
||||||
// surfaces a clean failure rather than waiting forever.
|
void this.handleWakeTask(task);
|
||||||
void this.postResult(task.taskId, { error: 'wake task type not implemented in this client (v2)' });
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run the configured wake recipe and poll the provider until it comes
|
||||||
|
* up. Sends a `{ status: 200, body: { ok: true } }` result on success;
|
||||||
|
* `{ error }` on timeout or recipe failure. While waiting, also bumps
|
||||||
|
* the heartbeat so mcpd's GC sweep doesn't decide we're stale mid-wake.
|
||||||
|
*/
|
||||||
|
private async handleWakeTask(task: { kind: 'wake'; taskId: string; llmName: string }): Promise<void> {
|
||||||
|
const published = this.opts.publishedProviders.find((p) => p.provider.name === task.llmName);
|
||||||
|
if (published === undefined) {
|
||||||
|
await this.postResult(task.taskId, { error: `provider '${task.llmName}' not registered locally` });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (published.wake === undefined) {
|
||||||
|
await this.postResult(task.taskId, { error: `provider '${task.llmName}' has no wake recipe configured` });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await runWakeRecipe(published.wake);
|
||||||
|
// Poll isAvailable() until it comes up (or timeout). Heartbeat
|
||||||
|
// every poll tick so mcpd doesn't time us out while we're waiting
|
||||||
|
// on a slow boot.
|
||||||
|
const maxWaitMs = (published.wake.maxWaitSeconds ?? 60) * 1000;
|
||||||
|
const started = Date.now();
|
||||||
|
while (Date.now() - started < maxWaitMs) {
|
||||||
|
let alive = false;
|
||||||
|
try { alive = await published.provider.isAvailable(); } catch { alive = false; }
|
||||||
|
if (alive) {
|
||||||
|
await this.heartbeatOnce();
|
||||||
|
await this.postResult(task.taskId, { status: 200, body: { ok: true, ms: Date.now() - started } });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await this.heartbeatOnce();
|
||||||
|
await new Promise((r) => setTimeout(r, 1500));
|
||||||
|
}
|
||||||
|
await this.postResult(task.taskId, { error: `provider '${task.llmName}' did not come up within ${String(maxWaitMs)}ms` });
|
||||||
|
} catch (err) {
|
||||||
|
await this.postResult(task.taskId, { error: `wake recipe failed: ${(err as Error).message}` });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async handleInferTask(task: InferTask): Promise<void> {
|
private async handleInferTask(task: InferTask): Promise<void> {
|
||||||
@@ -373,6 +437,68 @@ function openAiStreamChunk(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a wake recipe. Returns when the recipe completes; throws if it
|
||||||
|
* fails. Doesn't itself poll for provider readiness — that's the caller's
|
||||||
|
* job (handleWakeTask polls isAvailable() with its own timeout).
|
||||||
|
*
|
||||||
|
* `http`: fires the configured request and considers any 2xx a success.
|
||||||
|
* The remote service is expected to be a "wake controller" that returns
|
||||||
|
* quickly; if the underlying boot takes minutes, the controller should
|
||||||
|
* return 202 and the readiness poll catches up.
|
||||||
|
*
|
||||||
|
* `command`: spawns the binary with args, waits for exit. Non-zero exit
|
||||||
|
* is treated as failure. stdout/stderr are discarded — the recipe's job
|
||||||
|
* is to *trigger* a wake, not to produce output.
|
||||||
|
*/
|
||||||
|
async function runWakeRecipe(recipe: WakeRecipe): Promise<void> {
|
||||||
|
if (recipe.type === 'http') {
|
||||||
|
const u = new URL(recipe.url);
|
||||||
|
const driver = u.protocol === 'https:' ? https : http;
|
||||||
|
const method = recipe.method ?? 'POST';
|
||||||
|
const headers: Record<string, string> = { ...(recipe.headers ?? {}) };
|
||||||
|
const body = recipe.body;
|
||||||
|
if (body !== undefined) {
|
||||||
|
headers['Content-Length'] = String(Buffer.byteLength(body));
|
||||||
|
}
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
const req = driver.request({
|
||||||
|
hostname: u.hostname,
|
||||||
|
port: u.port || (u.protocol === 'https:' ? 443 : 80),
|
||||||
|
path: u.pathname + u.search,
|
||||||
|
method,
|
||||||
|
headers,
|
||||||
|
timeout: 30_000,
|
||||||
|
}, (res) => {
|
||||||
|
const status = res.statusCode ?? 0;
|
||||||
|
// Drain so the socket can be reused/freed.
|
||||||
|
res.resume();
|
||||||
|
if (status >= 200 && status < 300) resolve();
|
||||||
|
else reject(new Error(`wake HTTP returned ${String(status)}`));
|
||||||
|
});
|
||||||
|
req.on('error', reject);
|
||||||
|
req.on('timeout', () => { req.destroy(); reject(new Error('wake HTTP timed out')); });
|
||||||
|
if (body !== undefined) req.write(body);
|
||||||
|
req.end();
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (recipe.type === 'command') {
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
const child = spawn(recipe.command, recipe.args ?? [], {
|
||||||
|
stdio: 'ignore',
|
||||||
|
});
|
||||||
|
child.on('error', reject);
|
||||||
|
child.on('exit', (code) => {
|
||||||
|
if (code === 0) resolve();
|
||||||
|
else reject(new Error(`wake command exited with code ${String(code)}`));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw new Error(`unknown wake recipe type`);
|
||||||
|
}
|
||||||
|
|
||||||
interface PostResponse { statusCode: number; body: string }
|
interface PostResponse { statusCode: number; body: string }
|
||||||
|
|
||||||
/** Tiny JSON POST helper used by all of the registrar's mcpd calls. */
|
/** Tiny JSON POST helper used by all of the registrar's mcpd calls. */
|
||||||
|
|||||||
@@ -142,13 +142,17 @@ describe('VirtualLlmRegistrar', () => {
|
|||||||
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
|
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
|
||||||
expect(registerCall).toBeDefined();
|
expect(registerCall).toBeDefined();
|
||||||
expect(registerCall!.method).toBe('POST');
|
expect(registerCall!.method).toBe('POST');
|
||||||
const body = JSON.parse(registerCall!.body) as { providers: Array<{ name: string; type: string; model: string; tier: string }> };
|
const body = JSON.parse(registerCall!.body) as { providers: Array<Record<string, unknown>> };
|
||||||
expect(body.providers).toEqual([{
|
expect(body.providers).toHaveLength(1);
|
||||||
|
expect(body.providers[0]).toMatchObject({
|
||||||
name: 'vllm-local',
|
name: 'vllm-local',
|
||||||
type: 'openai',
|
type: 'openai',
|
||||||
model: 'qwen',
|
model: 'qwen',
|
||||||
tier: 'fast',
|
tier: 'fast',
|
||||||
}]);
|
// v2 always sends initialStatus; defaults to 'active' when no
|
||||||
|
// wake recipe is configured.
|
||||||
|
initialStatus: 'active',
|
||||||
|
});
|
||||||
expect(registerCall!.headers['authorization']).toBe('Bearer tok-abc');
|
expect(registerCall!.headers['authorization']).toBe('Bearer tok-abc');
|
||||||
|
|
||||||
// Sticky session id persisted.
|
// Sticky session id persisted.
|
||||||
@@ -219,6 +223,107 @@ describe('VirtualLlmRegistrar', () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ── v2: hibernating + wake recipe ──
|
||||||
|
|
||||||
|
it('publishes initialStatus=hibernating when provider is unavailable AND wake is configured', async () => {
|
||||||
|
const fake = await startFakeServer();
|
||||||
|
try {
|
||||||
|
const sleeping: LlmProvider = {
|
||||||
|
name: 'vllm-local',
|
||||||
|
async complete() { throw new Error('not running'); },
|
||||||
|
async listModels() { return []; },
|
||||||
|
async isAvailable() { return false; },
|
||||||
|
};
|
||||||
|
const registrar = new VirtualLlmRegistrar({
|
||||||
|
mcpdUrl: fake.url,
|
||||||
|
token: 't',
|
||||||
|
publishedProviders: [{
|
||||||
|
provider: sleeping,
|
||||||
|
type: 'openai',
|
||||||
|
model: 'm',
|
||||||
|
wake: { type: 'http', url: 'http://localhost:9999/wake', maxWaitSeconds: 1 },
|
||||||
|
}],
|
||||||
|
sessionFilePath: join(tempDir, 'provider-session'),
|
||||||
|
log: silentLog(),
|
||||||
|
heartbeatIntervalMs: 60_000,
|
||||||
|
});
|
||||||
|
await registrar.start();
|
||||||
|
await new Promise((r) => setTimeout(r, 20));
|
||||||
|
|
||||||
|
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
|
||||||
|
const body = JSON.parse(registerCall!.body) as { providers: Array<{ initialStatus?: string }> };
|
||||||
|
expect(body.providers[0]!.initialStatus).toBe('hibernating');
|
||||||
|
registrar.stop();
|
||||||
|
} finally {
|
||||||
|
await fake.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('publishes initialStatus=active when provider is available even with a wake recipe', async () => {
|
||||||
|
const fake = await startFakeServer();
|
||||||
|
try {
|
||||||
|
const awake: LlmProvider = {
|
||||||
|
name: 'vllm-local',
|
||||||
|
async complete() { throw new Error('not used'); },
|
||||||
|
async listModels() { return []; },
|
||||||
|
async isAvailable() { return true; },
|
||||||
|
};
|
||||||
|
const registrar = new VirtualLlmRegistrar({
|
||||||
|
mcpdUrl: fake.url,
|
||||||
|
token: 't',
|
||||||
|
publishedProviders: [{
|
||||||
|
provider: awake,
|
||||||
|
type: 'openai',
|
||||||
|
model: 'm',
|
||||||
|
wake: { type: 'http', url: 'http://localhost:9999/wake' },
|
||||||
|
}],
|
||||||
|
sessionFilePath: join(tempDir, 'provider-session'),
|
||||||
|
log: silentLog(),
|
||||||
|
heartbeatIntervalMs: 60_000,
|
||||||
|
});
|
||||||
|
await registrar.start();
|
||||||
|
await new Promise((r) => setTimeout(r, 20));
|
||||||
|
|
||||||
|
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
|
||||||
|
const body = JSON.parse(registerCall!.body) as { providers: Array<{ initialStatus?: string }> };
|
||||||
|
expect(body.providers[0]!.initialStatus).toBe('active');
|
||||||
|
registrar.stop();
|
||||||
|
} finally {
|
||||||
|
await fake.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('publishes initialStatus=active when no wake recipe is configured (legacy path)', async () => {
|
||||||
|
const fake = await startFakeServer();
|
||||||
|
try {
|
||||||
|
// Provider intentionally returns false but has no wake recipe →
|
||||||
|
// legacy v1 publishers don't get hibernation behavior.
|
||||||
|
const sleeping: LlmProvider = {
|
||||||
|
name: 'vllm-local',
|
||||||
|
async complete() { return { content: '', toolCalls: [], usage: { promptTokens: 0, completionTokens: 0, totalTokens: 0 }, finishReason: 'stop' }; },
|
||||||
|
async listModels() { return []; },
|
||||||
|
async isAvailable() { return false; },
|
||||||
|
};
|
||||||
|
const registrar = new VirtualLlmRegistrar({
|
||||||
|
mcpdUrl: fake.url,
|
||||||
|
token: 't',
|
||||||
|
publishedProviders: [{ provider: sleeping, type: 'openai', model: 'm' }],
|
||||||
|
sessionFilePath: join(tempDir, 'provider-session'),
|
||||||
|
log: silentLog(),
|
||||||
|
heartbeatIntervalMs: 60_000,
|
||||||
|
});
|
||||||
|
await registrar.start();
|
||||||
|
await new Promise((r) => setTimeout(r, 20));
|
||||||
|
|
||||||
|
const registerCall = fake.calls.find((c) => c.path === '/api/v1/llms/_provider-register');
|
||||||
|
const body = JSON.parse(registerCall!.body) as { providers: Array<{ initialStatus?: string }> };
|
||||||
|
expect(body.providers[0]!.initialStatus).toBe('active');
|
||||||
|
registrar.stop();
|
||||||
|
} finally {
|
||||||
|
await fake.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
it('throws when mcpd returns non-201 from /_provider-register', async () => {
|
it('throws when mcpd returns non-201 from /_provider-register', async () => {
|
||||||
const fake = await startFakeServer();
|
const fake = await startFakeServer();
|
||||||
fake.handler = (_req, res, _body) => {
|
fake.handler = (_req, res, _body) => {
|
||||||
|
|||||||
@@ -207,3 +207,137 @@ describe('virtual-LLM smoke', () => {
|
|||||||
expect(res.body).toMatch(/publisher offline|inactive/);
|
expect(res.body).toMatch(/publisher offline|inactive/);
|
||||||
}, 30_000);
|
}, 30_000);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ── v2: hibernating + wake-on-demand ──
|
||||||
|
|
||||||
|
const HIBERNATING_NAME = `smoke-virtual-hib-${SUFFIX}`;
|
||||||
|
let hibernatingRegistrar: VirtualLlmRegistrar | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provider that's "asleep" until \`wakeFn()\` is called. Used to drive
|
||||||
|
* the wake-on-demand smoke without standing up an actual sleep/wake
|
||||||
|
* controller — we flip the bool from inside the wake recipe.
|
||||||
|
*/
|
||||||
|
function makeSleepingProvider(name: string, content: string): {
|
||||||
|
provider: LlmProvider;
|
||||||
|
wakeFn: () => void;
|
||||||
|
wakeCount: () => number;
|
||||||
|
} {
|
||||||
|
let awake = false;
|
||||||
|
let count = 0;
|
||||||
|
const provider: LlmProvider = {
|
||||||
|
name,
|
||||||
|
async complete(): Promise<CompletionResult> {
|
||||||
|
if (!awake) throw new Error('provider not awake');
|
||||||
|
return {
|
||||||
|
content,
|
||||||
|
toolCalls: [],
|
||||||
|
usage: { promptTokens: 1, completionTokens: 4, totalTokens: 5 },
|
||||||
|
finishReason: 'stop',
|
||||||
|
};
|
||||||
|
},
|
||||||
|
async listModels() { return []; },
|
||||||
|
async isAvailable() { return awake; },
|
||||||
|
};
|
||||||
|
return {
|
||||||
|
provider,
|
||||||
|
wakeFn: () => { awake = true; count += 1; },
|
||||||
|
wakeCount: () => count,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('virtual-LLM smoke — wake-on-demand', () => {
|
||||||
|
let wakeServerUrl: string;
|
||||||
|
let wakeServer: http.Server;
|
||||||
|
let wakeFn: (() => void) | null = null;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
if (!mcpdUp) return;
|
||||||
|
// Tiny in-process "wake controller" — receives the http wake recipe
|
||||||
|
// POST and flips the local provider's `awake` bool.
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
wakeServer = http.createServer((req, res) => {
|
||||||
|
if (req.url === '/wake' && wakeFn !== null) {
|
||||||
|
wakeFn();
|
||||||
|
res.writeHead(200);
|
||||||
|
res.end('woken');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
res.writeHead(404);
|
||||||
|
res.end();
|
||||||
|
});
|
||||||
|
wakeServer.listen(0, '127.0.0.1', () => {
|
||||||
|
const addr = wakeServer.address();
|
||||||
|
if (addr === null || typeof addr === 'string') throw new Error('listen failed');
|
||||||
|
wakeServerUrl = `http://127.0.0.1:${String(addr.port)}/wake`;
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
if (hibernatingRegistrar !== null) hibernatingRegistrar.stop();
|
||||||
|
if (wakeServer) await new Promise<void>((r) => wakeServer.close(() => r()));
|
||||||
|
if (mcpdUp) {
|
||||||
|
const list = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
|
||||||
|
if (list.status === 200) {
|
||||||
|
const rows = JSON.parse(list.body) as Array<{ id: string; name: string }>;
|
||||||
|
const row = rows.find((r) => r.name === HIBERNATING_NAME);
|
||||||
|
if (row !== undefined) {
|
||||||
|
await httpRequest('DELETE', `${MCPD_URL}/api/v1/llms/${row.id}`, undefined);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('publishes a sleeping provider as kind=virtual / status=hibernating', async () => {
|
||||||
|
if (!mcpdUp) return;
|
||||||
|
const token = readToken();
|
||||||
|
if (token === null) return;
|
||||||
|
const sleeping = makeSleepingProvider(HIBERNATING_NAME, 'awake now');
|
||||||
|
wakeFn = sleeping.wakeFn;
|
||||||
|
|
||||||
|
const published: RegistrarPublishedProvider[] = [{
|
||||||
|
provider: sleeping.provider,
|
||||||
|
type: 'openai',
|
||||||
|
model: 'fake-hibernating',
|
||||||
|
tier: 'fast',
|
||||||
|
wake: { type: 'http', url: wakeServerUrl, method: 'POST', maxWaitSeconds: 5 },
|
||||||
|
}];
|
||||||
|
hibernatingRegistrar = new VirtualLlmRegistrar({
|
||||||
|
mcpdUrl: MCPD_URL,
|
||||||
|
token,
|
||||||
|
publishedProviders: published,
|
||||||
|
sessionFilePath: join(tempDir, 'hib-session'),
|
||||||
|
log: { info: () => {}, warn: () => {}, error: () => {} },
|
||||||
|
heartbeatIntervalMs: 60_000,
|
||||||
|
});
|
||||||
|
await hibernatingRegistrar.start();
|
||||||
|
await new Promise((r) => setTimeout(r, 400));
|
||||||
|
|
||||||
|
const res = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
const rows = JSON.parse(res.body) as Array<{ name: string; kind: string; status: string }>;
|
||||||
|
const row = rows.find((r) => r.name === HIBERNATING_NAME);
|
||||||
|
expect(row, `${HIBERNATING_NAME} must be present`).toBeDefined();
|
||||||
|
expect(row!.kind).toBe('virtual');
|
||||||
|
expect(row!.status).toBe('hibernating');
|
||||||
|
}, 30_000);
|
||||||
|
|
||||||
|
it('first inference triggers the wake recipe and then completes', async () => {
|
||||||
|
if (!mcpdUp) return;
|
||||||
|
// wakeFn was set in the previous test; it flips the provider's
|
||||||
|
// `awake` bool when the wake POST lands.
|
||||||
|
const res = await httpRequest('POST', `${MCPD_URL}/api/v1/llms/${HIBERNATING_NAME}/infer`, {
|
||||||
|
messages: [{ role: 'user', content: 'wake then say hello' }],
|
||||||
|
});
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
const body = JSON.parse(res.body) as { choices?: Array<{ message?: { content?: string } }> };
|
||||||
|
expect(body.choices?.[0]?.message?.content).toBe('awake now');
|
||||||
|
|
||||||
|
// After the wake, the row should now be active.
|
||||||
|
const list = await httpRequest('GET', `${MCPD_URL}/api/v1/llms`, undefined);
|
||||||
|
const rows = JSON.parse(list.body) as Array<{ name: string; status: string }>;
|
||||||
|
expect(rows.find((r) => r.name === HIBERNATING_NAME)?.status).toBe('active');
|
||||||
|
}, 30_000);
|
||||||
|
});
|
||||||
|
|||||||
@@ -1,8 +1,21 @@
|
|||||||
import { defineConfig } from 'vitest/config';
|
import { defineConfig } from 'vitest/config';
|
||||||
|
import { availableParallelism } from 'node:os';
|
||||||
|
|
||||||
|
// Default vitest's pool to ~half the CPU threads we have. The previous
|
||||||
|
// implicit default left this 64-thread workstation at ~10% utilization
|
||||||
|
// during `pnpm test:run`. Half is a soft cap that stays kind to laptops
|
||||||
|
// (8-thread → 4 workers) while letting beefy hosts push closer to the
|
||||||
|
// box's actual capacity. Override at run time with VITEST_MAX_THREADS.
|
||||||
|
const cores = availableParallelism();
|
||||||
|
const maxThreads = Number(process.env['VITEST_MAX_THREADS'] ?? Math.max(2, Math.floor(cores / 2)));
|
||||||
|
|
||||||
export default defineConfig({
|
export default defineConfig({
|
||||||
test: {
|
test: {
|
||||||
globals: true,
|
globals: true,
|
||||||
|
pool: 'threads',
|
||||||
|
poolOptions: {
|
||||||
|
threads: { maxThreads, minThreads: 1 },
|
||||||
|
},
|
||||||
coverage: {
|
coverage: {
|
||||||
provider: 'v8',
|
provider: 'v8',
|
||||||
reporter: ['text', 'json', 'html'],
|
reporter: ['text', 'json', 'html'],
|
||||||
|
|||||||
Reference in New Issue
Block a user