feat: install logging, error trapping, PXE/ISO integration tests
Some checks failed
CI/CD / lint (pull_request) Failing after 13s
CI/CD / test (pull_request) Failing after 10s
CI/CD / typecheck (pull_request) Failing after 36s
CI/CD / build (pull_request) Has been skipped
CI/CD / publish-rpm (pull_request) Has been skipped
CI/CD / publish-deb (pull_request) Has been skipped

Kickstart installs on real hardware failed silently — no error reporting,
only 3 progress callbacks, zero log streaming. This overhaul makes every
install fully observable.

Kickstart improvements:
- Error trapping in %pre and %post (trap ERR sends failure details to bastion)
- 12+ granular progress stages (was 3): SSH, hostname, k3s prep, EFI boot, metadata
- Background log streamer: tails %post output and batch-sends to /api/log
- bastion_log() function for explicit log lines from kickstart scripts

Bastion API:
- POST /api/log — receives raw log lines from kickstart (single or batch)
- InstallLogBuffer — per-MAC ring buffer (2000 lines) + file persistence
- GET /api/logs/:mac — now returns log_lines + log_total alongside stages
- SSE /api/logs/:mac/follow — uses named events (event: stage vs event: log)
- Progress events forwarded to labd via bastion-progress WebSocket message
- Post-provision k3s logs routed through progressBus (was console-only)

dnsmasq fixes found during VM testing:
- HTTP Boot filename: ipxe-real.efi → ipxe.efi (leftover from old 2-stage approach)
- pxe-service directives: only in proxy mode (breaks OVMF PXE in full mode)
- PXEClient vendor class echo for UEFI firmware compatibility

Integration tests:
- PXE boot test: blank UEFI VM → dnsmasq → HTTP Boot → iPXE → bastion → install
- ISO boot test: blank VM boots from bastion-generated ISO → same flow
- Shared helpers: pxe-network (no DHCP, nftables fix), pxe-vm (UEFI + ISO boot)
- test-provision.sh: runs both PXE + ISO tests with prerequisite checks
- 250GB sparse QCOW2 disk (LVM layout needs ~204GB)

201 unit tests passing (11 new).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Michal
2026-03-26 22:26:33 +00:00
parent ffc4a782d2
commit 46b017d77e
189 changed files with 16241 additions and 432 deletions

View File

@@ -0,0 +1,24 @@
{
"name": "@lab/agent",
"version": "0.1.0",
"private": true,
"type": "module",
"main": "./dist/main.js",
"types": "./dist/main.d.ts",
"scripts": {
"build": "tsc --build",
"clean": "rimraf dist"
},
"dependencies": {
"@lab/shared": "workspace:*",
"winston": "^3.17.0",
"winston-daily-rotate-file": "^5.0.0",
"ws": "^8.19.0"
},
"devDependencies": {
"@types/node": "^22.14.1",
"@types/ws": "^8.18.1",
"rimraf": "^6.1.3",
"typescript": "^5.9.3"
}
}

View File

@@ -0,0 +1,10 @@
/**
* @lab/agent — Lab agent daemon entry point.
*
* For now this module re-exports the command executor and the labd agent
* connection so they can be consumed by other packages in the monorepo.
*/
export { CommandExecutor } from "./services/executor.js";
export type { ExecOptions, ExecResult } from "./services/executor.js";
export { AgentConnection, type ConnectionConfig, type ConnectionState, DEFAULT_CONNECTION_CONFIG } from "./services/connection.js";

View File

@@ -0,0 +1,157 @@
// Agent WebSocket connection to labd with heartbeat and reconnection.
import { EventEmitter } from "node:events";
import { hostname } from "node:os";
import { readFileSync } from "node:fs";
import WebSocket from "ws";
import type { AgentMessage, ServerMessage } from "@lab/shared";
import { parseServerMessage } from "@lab/shared";
/** Lifecycle states an agent connection can report. */
export type ConnectionState = "disconnected" | "connecting" | "connected" | "reconnecting";

/** Settings required to open and maintain the agent's WebSocket link to labd. */
export interface ConnectionConfig {
  /** Base http(s) URL of labd; it is rewritten to ws(s) and suffixed with `/ws/agent`. */
  labdUrl: string;
  /** Path to the client certificate file presented to labd. */
  certPath: string;
  /** Path to the private key file that belongs to the client certificate. */
  keyPath: string;
  /** Optional path to a CA bundle; when omitted no custom CA is passed to the socket. */
  caPath?: string;
  /** Interval between heartbeat messages while connected (ms). */
  heartbeatIntervalMs: number;
  /** Delay before the first reconnect attempt; doubled on every further attempt (ms). */
  reconnectBaseDelayMs: number;
  /** Upper bound for the reconnect delay (ms). */
  reconnectMaxDelayMs: number;
}

/**
 * Default timing values for a connection.
 *
 * Typed as a `Pick` of exactly the keys it provides (rather than
 * `Partial<ConnectionConfig>`) so that
 * `{ ...DEFAULT_CONNECTION_CONFIG, labdUrl, certPath, keyPath }` type-checks as
 * a complete `ConnectionConfig` without casts. The value remains assignable to
 * `Partial<ConnectionConfig>`.
 */
export const DEFAULT_CONNECTION_CONFIG: Pick<
  ConnectionConfig,
  "heartbeatIntervalMs" | "reconnectBaseDelayMs" | "reconnectMaxDelayMs"
> = {
  heartbeatIntervalMs: 10_000,
  reconnectBaseDelayMs: 1_000,
  reconnectMaxDelayMs: 30_000,
};
/**
 * WebSocket client that links this agent to labd.
 *
 * Opens a connection authenticated with a client certificate and key (plus an
 * optional CA), sends periodic heartbeats while connected, and reconnects with
 * exponential back-off after the socket closes.
 *
 * Emitted events:
 * - `stateChange` (state) — whenever the connection state changes
 * - `connected` / `disconnected` — the socket opened / closed
 * - `message` (message) — every successfully parsed server message
 * - `shutdown` (reconnectAfter) — labd announced a shutdown; automatic
 *   reconnection is disabled from then on
 */
export class AgentConnection extends EventEmitter {
  private ws: WebSocket | null = null;
  private heartbeatTimer: NodeJS.Timeout | null = null;
  /** Pending reconnect timer, tracked so close() can cancel it. */
  private reconnectTimer: NodeJS.Timeout | null = null;
  private reconnectAttempts = 0;
  private isClosing = false;
  private _state: ConnectionState = "disconnected";

  constructor(private config: ConnectionConfig) {
    super();
  }

  /** Current connection state. */
  get state(): ConnectionState {
    return this._state;
  }

  /** @returns `true` only while the socket is open. */
  isConnected(): boolean {
    return this._state === "connected";
  }

  /**
   * Open the WebSocket to labd.
   *
   * Does nothing once the connection has been closed or labd announced a
   * shutdown. Failures while reading the certificate files or constructing
   * the socket do not throw; they schedule a reconnect attempt instead.
   */
  async connect(): Promise<void> {
    if (this.isClosing) return;
    // A manual connect() supersedes any reconnect that is still pending.
    this.clearReconnectTimer();
    this.setState(this.reconnectAttempts > 0 ? "reconnecting" : "connecting");
    const wsUrl = this.buildWsUrl();
    try {
      this.ws = new WebSocket(wsUrl, {
        cert: readFileSync(this.config.certPath),
        key: readFileSync(this.config.keyPath),
        ca: this.config.caPath ? readFileSync(this.config.caPath) : undefined,
        rejectUnauthorized: true,
      });
      this.ws.on("open", () => {
        this.reconnectAttempts = 0;
        this.setState("connected");
        this.startHeartbeat();
        this.emit("connected");
      });
      this.ws.on("message", (data: Buffer) => {
        try {
          const message = parseServerMessage(data.toString());
          this.handleMessage(message);
          this.emit("message", message);
        } catch {
          // Ignore unparseable messages
        }
      });
      this.ws.on("close", (_code: number, _reason: Buffer) => {
        this.stopHeartbeat();
        this.setState("disconnected");
        this.emit("disconnected");
        this.scheduleReconnect();
      });
      this.ws.on("error", (_error: Error) => {
        // Error is followed by close event, so reconnect happens there.
        // The listener must exist: an "error" event without one would throw.
      });
    } catch {
      this.scheduleReconnect();
    }
  }

  /** Send a message to labd; silently dropped when the socket is not open. */
  send(message: AgentMessage): void {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    }
  }

  /** Close the connection for good and cancel heartbeat and pending reconnects. */
  close(): void {
    this.isClosing = true;
    this.stopHeartbeat();
    this.clearReconnectTimer();
    this.ws?.close();
    this.setState("disconnected");
  }

  /**
   * Derive the WebSocket endpoint from the configured labd URL.
   *
   * Only a leading `http:` / `https:` scheme is rewritten, so an "http:"
   * occurring later in the URL is left alone. Trailing slashes are stripped
   * so the result never contains `//ws/agent`.
   */
  private buildWsUrl(): string {
    const base = this.config.labdUrl
      .replace(/^http(s?):/, "ws$1:")
      .replace(/\/+$/, "");
    return `${base}/ws/agent`;
  }

  /** React to protocol-level server messages before listeners see them. */
  private handleMessage(message: ServerMessage): void {
    if (message.type === "server-shutdown") {
      this.isClosing = true; // Don't reconnect
      this.emit("shutdown", message.reconnectAfter);
    }
  }

  /** (Re)start the periodic heartbeat. */
  private startHeartbeat(): void {
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      this.send({
        type: "heartbeat",
        hostname: hostname(),
        uptime: process.uptime(),
        version: process.env["npm_package_version"] ?? "0.0.0",
        memUsage: process.memoryUsage().heapUsed,
        cpuUsage: 0, // Placeholder — CPU usage is not measured yet
      });
    }, this.config.heartbeatIntervalMs);
  }

  private stopHeartbeat(): void {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  private clearReconnectTimer(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Schedule the next connect() with exponential back-off:
   * base * 2^attempts, capped at the configured maximum.
   */
  private scheduleReconnect(): void {
    if (this.isClosing) return;
    const delay = Math.min(
      this.config.reconnectBaseDelayMs * Math.pow(2, this.reconnectAttempts),
      this.config.reconnectMaxDelayMs,
    );
    this.reconnectAttempts++;
    this.setState("reconnecting");
    // Never keep more than one reconnect pending.
    this.clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      void this.connect();
    }, delay);
  }

  /** Update the state and emit `stateChange` only on an actual transition. */
  private setState(state: ConnectionState): void {
    if (this._state !== state) {
      this._state = state;
      this.emit("stateChange", state);
    }
  }
}

View File

@@ -0,0 +1,161 @@
import { EventEmitter } from "node:events";
import { spawn, type ChildProcess } from "node:child_process";
/** Options for executing a command. */
export interface ExecOptions {
/**
* The command and its arguments, e.g. ["ls", "-la"].
* The first element is the program to spawn; an empty array is rejected
* with exit code 1 and stderr "Empty command".
*/
command: string[];
/**
* Maximum execution time in milliseconds. When it elapses the process is
* sent SIGTERM, followed by SIGKILL if it has not exited after a grace period.
*/
timeout: number;
/**
* Whether to allocate a pseudo-TTY.
* NOTE: currently has no effect — the executor always spawns with
* pipe-based stdio regardless of this flag.
*/
tty: boolean;
/** Optional environment variables (merged with process.env). */
env?: Record<string, string>;
/** Optional working directory. */
cwd?: string;
}
/** Result returned after a command finishes. */
export interface ExecResult {
/** Exit code of the process; 1 when no exit code is available or the command could not be run. */
exitCode: number;
/** Everything the process wrote to stdout. */
stdout: string;
/** Everything the process wrote to stderr, or the error message when the process emitted an error. */
stderr: string;
/** `true` when the timeout fired before the process closed. */
timedOut: boolean;
/** Signal reported when the process closed, if any. */
signal?: string | undefined;
}
/** Argument tuples of the events emitted by CommandExecutor while a command runs. */
export interface CommandExecutorEvents {
/** A raw chunk read from the stdout of the process started for `requestId`. */
stdout: [requestId: string, chunk: Buffer];
/** A raw chunk read from the stderr of the process started for `requestId`. */
stderr: [requestId: string, chunk: Buffer];
}
/**
 * Executes commands in a child process with timeout handling and streaming
 * output via events.
 *
 * Running processes are tracked by request ID so callers can signal them or
 * write to their stdin while they run.
 */
export class CommandExecutor extends EventEmitter<CommandExecutorEvents> {
  private readonly processes = new Map<string, ChildProcess>();

  /** Grace period between SIGTERM and SIGKILL when a timeout fires (ms). */
  private static readonly KILL_GRACE_MS = 5_000;

  /**
   * Execute a command and return its result once it exits.
   *
   * While the process is running, `stdout` and `stderr` events are emitted
   * with `(requestId, chunk)` so callers can stream output in real time.
   *
   * The returned promise never rejects: spawn failures and an empty command
   * are reported as a result with `exitCode: 1`.
   *
   * @param requestId Key under which the process is tracked while it runs.
   * @param options Command, timeout and optional environment / working directory.
   * @returns The exit code, the complete captured output and timeout/signal info.
   */
  execute(requestId: string, options: ExecOptions): Promise<ExecResult> {
    // `options.tty` is accepted but not used yet: when TTY support is needed
    // the caller should use node-pty or similar; for now stdio is always pipes.
    const { command, timeout, env, cwd } = options;
    const [cmd, ...args] = command;
    if (cmd === undefined) {
      return Promise.resolve({
        exitCode: 1,
        stdout: "",
        stderr: "Empty command",
        timedOut: false,
      });
    }

    return new Promise<ExecResult>((resolve) => {
      const child = spawn(cmd, args, {
        cwd,
        env: env ? { ...process.env, ...env } : undefined,
        stdio: ["pipe", "pipe", "pipe"],
      });
      this.processes.set(requestId, child);

      // Raw chunks are collected and decoded once at the end: decoding each
      // chunk on its own would corrupt a multi-byte UTF-8 character that
      // happens to be split across two chunks.
      const stdoutChunks: Buffer[] = [];
      const stderrChunks: Buffer[] = [];
      let timedOut = false;
      let settled = false;
      let killTimer: ReturnType<typeof setTimeout> | undefined;

      // -- Streaming output ------------------------------------------------
      child.stdout?.on("data", (chunk: Buffer) => {
        stdoutChunks.push(chunk);
        this.emit("stdout", requestId, chunk);
      });
      child.stderr?.on("data", (chunk: Buffer) => {
        stderrChunks.push(chunk);
        this.emit("stderr", requestId, chunk);
      });
      // Writing to a child that has already exited surfaces as an "error"
      // (e.g. EPIPE) on the stdin stream; without a listener that would be
      // thrown as an uncaught exception.
      child.stdin?.on("error", () => {
        // Intentionally ignored — the process outcome is reported via "close".
      });

      // -- Timeout handling -------------------------------------------------
      const timeoutTimer = setTimeout(() => {
        timedOut = true;
        // Graceful shutdown first.
        child.kill("SIGTERM");
        // If the process does not exit within the grace period, force-kill.
        killTimer = setTimeout(() => {
          child.kill("SIGKILL");
        }, CommandExecutor.KILL_GRACE_MS);
      }, timeout);

      // -- Completion -------------------------------------------------------
      // "error" and "close" can both fire for one process; only the first
      // of them determines the result.
      const settle = (result: ExecResult): void => {
        if (settled) return;
        settled = true;
        clearTimeout(timeoutTimer);
        if (killTimer !== undefined) {
          clearTimeout(killTimer);
        }
        // Only untrack our own process: a later execute() that reused the
        // request ID must stay reachable for sendSignal()/writeStdin().
        if (this.processes.get(requestId) === child) {
          this.processes.delete(requestId);
        }
        resolve(result);
      };

      child.on("close", (code, signal) => {
        settle({
          exitCode: code ?? 1,
          stdout: Buffer.concat(stdoutChunks).toString(),
          stderr: Buffer.concat(stderrChunks).toString(),
          timedOut,
          signal: signal ?? undefined,
        });
      });
      child.on("error", (err) => {
        settle({
          exitCode: 1,
          stdout: Buffer.concat(stdoutChunks).toString(),
          stderr: err.message,
          timedOut: false,
        });
      });
    });
  }

  /**
   * Send a signal to a running process.
   *
   * @returns `true` if the process was found and the signal was sent.
   */
  sendSignal(requestId: string, signal: NodeJS.Signals): boolean {
    const child = this.processes.get(requestId);
    if (!child) {
      return false;
    }
    return child.kill(signal);
  }

  /**
   * Write data to the stdin of a running process.
   *
   * @returns `true` if the process was found and stdin was writable.
   */
  writeStdin(requestId: string, data: string): boolean {
    const stdin = this.processes.get(requestId)?.stdin;
    if (!stdin || stdin.destroyed || !stdin.writable) {
      return false;
    }
    // write() returns false to signal back-pressure, not failure — the data
    // is still queued — so its return value must not be reported to callers.
    stdin.write(data);
    return true;
  }
}

View File

@@ -0,0 +1,38 @@
import winston from "winston";
import DailyRotateFile from "winston-daily-rotate-file";
// Directory for the rotated log files; overridable through the LOG_DIR
// environment variable.
const LOG_DIR = process.env["LOG_DIR"] ?? "/var/log/lab-agent";

// Minimum severity that is logged; overridable through LOG_LEVEL.
const rootLevel = process.env["LOG_LEVEL"] ?? "info";

// Default entry format: timestamped JSON. Transports without their own
// format (the rotating file below) use this one.
const rootFormat = winston.format.combine(
  winston.format.timestamp(),
  winston.format.json(),
);

// Console output uses a colorized single-line format instead of JSON.
const consoleTransport = new winston.transports.Console({
  format: winston.format.combine(
    winston.format.colorize(),
    winston.format.simple(),
  ),
});

// Daily-rotated log file inside LOG_DIR, configured with a "20m" maximum
// size and a "14d" retention setting.
const rotatingFileTransport = new DailyRotateFile({
  dirname: LOG_DIR,
  filename: "agent-%DATE%.log",
  maxSize: "20m",
  maxFiles: "14d",
});

// Root logger of the agent; writes to the console and the rotating file.
const logger = winston.createLogger({
  level: rootLevel,
  format: rootFormat,
  transports: [consoleTransport, rotatingFileTransport],
});
/**
 * Build a logger dedicated to one component of the agent.
 *
 * The result is a child of the root logger, so it shares the root logger's
 * transports and configuration, and every entry it writes carries a
 * `component` metadata field set to the given name.
 *
 * @param component Name recorded in the `component` field of each entry.
 * @returns The component-scoped child logger.
 */
export function createChildLogger(component: string): winston.Logger {
  const scopedMetadata = { component };
  return logger.child(scopedMetadata);
}
export { logger };

View File

@@ -0,0 +1,111 @@
// Tests for CommandExecutor.
import { describe, it, expect } from "vitest";
import { CommandExecutor } from "../src/services/executor.js";
// Behavioural tests for CommandExecutor. They spawn real POSIX commands
// (echo, sh, sleep, pwd), so they need a Unix-like environment.
describe("CommandExecutor", () => {
  it("executes a simple command", async () => {
    const exec = new CommandExecutor();
    const result = await exec.execute("req-1", {
      command: ["echo", "hello"],
      timeout: 5000,
      tty: false,
    });
    expect(result.exitCode).toBe(0);
    expect(result.stdout.trim()).toBe("hello");
    expect(result.timedOut).toBe(false);
  });

  it("captures stderr", async () => {
    const exec = new CommandExecutor();
    const result = await exec.execute("req-2", {
      command: ["sh", "-c", "echo err >&2"],
      timeout: 5000,
      tty: false,
    });
    expect(result.exitCode).toBe(0);
    expect(result.stderr.trim()).toBe("err");
  });

  it("returns non-zero exit code", async () => {
    const exec = new CommandExecutor();
    const result = await exec.execute("req-3", {
      command: ["sh", "-c", "exit 42"],
      timeout: 5000,
      tty: false,
    });
    expect(result.exitCode).toBe(42);
  });

  it("times out long-running commands", async () => {
    const exec = new CommandExecutor();
    const result = await exec.execute("req-4", {
      command: ["sleep", "60"],
      timeout: 200,
      tty: false,
    });
    expect(result.timedOut).toBe(true);
  }, 10_000);

  it("emits stdout events for streaming", async () => {
    const exec = new CommandExecutor();
    const chunks: string[] = [];
    // The event payload is a Buffer (see CommandExecutorEvents); decode it
    // explicitly instead of relying on implicit coercion in join().
    exec.on("stdout", (_reqId: string, chunk: Buffer) => {
      chunks.push(chunk.toString());
    });
    await exec.execute("req-5", {
      command: ["echo", "streamed"],
      timeout: 5000,
      tty: false,
    });
    expect(chunks.join("").trim()).toBe("streamed");
  });

  it("sends signal to running process", async () => {
    const exec = new CommandExecutor();
    // Start a long process
    const promise = exec.execute("req-6", {
      command: ["sleep", "60"],
      timeout: 30000,
      tty: false,
    });
    // Give it time to start
    await new Promise((r) => setTimeout(r, 100));
    const sent = exec.sendSignal("req-6", "SIGTERM");
    expect(sent).toBe(true);
    const result = await promise;
    expect(result.exitCode).not.toBe(0);
  }, 10_000);

  it("sendSignal returns false for unknown request", () => {
    const exec = new CommandExecutor();
    expect(exec.sendSignal("nonexistent", "SIGTERM")).toBe(false);
  });

  it("uses custom cwd", async () => {
    const exec = new CommandExecutor();
    const result = await exec.execute("req-7", {
      command: ["pwd"],
      timeout: 5000,
      tty: false,
      cwd: "/tmp",
    });
    expect(result.stdout.trim()).toBe("/tmp");
  });

  it("uses custom env", async () => {
    const exec = new CommandExecutor();
    const result = await exec.execute("req-8", {
      command: ["sh", "-c", "echo $MY_VAR"],
      timeout: 5000,
      tty: false,
      env: { MY_VAR: "test_value" },
    });
    expect(result.stdout.trim()).toBe("test_value");
  });
});

View File

@@ -0,0 +1,12 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist",
"composite": true
},
"include": ["src/**/*.ts"],
"references": [
{ "path": "../shared" }
]
}