mcpctl/src/mcpd/src/services/chat.service.ts
Michal · faef1e732d · feat(mcpd): personality routes + chat system block overlay (Stage 3)
End-to-end backend wiring for the agents-feature evolution. After
this stage you can curl all the endpoints; CLI + Web UI follow.

Routes (new):
  GET    /api/v1/agents/:agentName/personalities
  POST   /api/v1/agents/:agentName/personalities
  GET    /api/v1/personalities/:id
  PUT    /api/v1/personalities/:id
  DELETE /api/v1/personalities/:id
  GET    /api/v1/personalities/:id/prompts
  POST   /api/v1/personalities/:id/prompts
  DELETE /api/v1/personalities/:id/prompts/:promptId
  GET    /api/v1/agents/:agentName/prompts            (agent-direct)

Routes (extended):
  POST /api/v1/prompts now resolves `agent: <name>` like `project: <name>`
  POST /api/v1/agents/:name/chat accepts `personality: <name>`
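
Smoke-test sketch for the extended chat route (illustrative: host, agent
name, and personality name are placeholders, and the user-turn field
mirrors ChatRequestArgs; agent.schema.ts is the canonical body shape):

  const base = 'http://localhost:8080/api/v1';
  const res = await fetch(`${base}/agents/helper/chat`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ userMessage: 'hello', personality: 'terse' }),
  });
  console.log((await res.json()).assistant);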

RBAC: `personalities` segment maps to the `agents` resource so
view/edit/create/delete on the parent agent governs personality access.
No new RBAC roles — piggybacking keeps the surface flat.

System block (chat.service.ts):
  agent.systemPrompt
  + agent-direct prompts   (Prompt.agentId === agent.id, priority desc)
  + project prompts        (existing behavior, priority desc)
  + personality prompts    (PersonalityPrompt[chosen], priority desc)
  + systemAppend
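
Concretely (made-up prompt contents), an agent whose systemPrompt is
"You are a helper.", with one prompt in each remaining bucket, yields a
block joined by blank lines in exactly that order:

  You are a helper.

  Prefer short answers.          <- agent-direct

  Project X uses TypeScript.     <- project

  Answer like a pirate.          <- personality

  Today is Friday.               <- systemAppend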

Personality is selected by request body `personality: <name>`, falling
back to `agent.defaultPersonalityId` if unset. A typo'd personality name
throws 404 rather than silently dropping back to no overlay — failing
loudly on misconfiguration is the only way users learn the overlay
didn't apply.

Backwards-compatible by construction: when no agent-direct prompts
exist and no personality is selected, the resulting block is byte-
identical to the old layout (verified by a regression test).

Tests: 5 new chat-service.test cases cover ordering, default-
personality fallback, missing-personality 404, and the regression
guard. mcpd suite: 801/801 (was 796). Typecheck clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 19:27:59 +01:00

/**
 * ChatService — orchestrates an agent's chat turn end-to-end.
 *
 * For one inbound chat call:
 * 1. Resolve the agent → its Llm and (optional) Project.
 * 2. Build messages: merged system block (agent.systemPrompt + agent-direct
 *    prompts + project prompts + personality prompts, each group ordered by
 *    priority desc, plus systemAppend) + persisted thread history + new
 *    user turn. Persist the user turn (status:complete) up front.
 * 3. Enumerate tools from the project's MCP servers via the injected
 *    ToolDispatcher and translate to OpenAI function-tool format.
 * 4. Loop (cap = MAX_ITERATIONS) calling the adapter:
 *    - if the model returns text → persist as assistant (complete), end.
 *    - if it returns tool_calls → persist assistant turn (pending) with
 *      the tool_calls JSON; for each call, dispatch through the
 *      ToolDispatcher; persist a tool turn with the result; flip the
 *      assistant turn to complete; loop.
 * 5. On any exception, mark all `pending` rows in the thread as `error`
 *    and surface the error to the caller. No big DB transaction wraps the
 *    loop because tool calls can take minutes.
 *
 * Per-call params merge resolution: request body → agent.defaultParams →
 * adapter default. `extra` is forwarded as-is for provider-specific knobs.
 */
import type { ChatMessage } from '@prisma/client';
import type { AgentService } from './agent.service.js';
import type { LlmService } from './llm.service.js';
import type { LlmAdapterRegistry } from './llm/dispatcher.js';
import type {
  IChatRepository,
  ChatRole,
} from '../repositories/chat.repository.js';
import type { IPromptRepository } from '../repositories/prompt.repository.js';
import type { IPersonalityRepository } from '../repositories/personality.repository.js';
import type { OpenAiChatRequest, OpenAiMessage } from './llm/types.js';
import type { AgentChatParams } from '../validation/agent.schema.js';
import { NotFoundError } from './mcp-server.service.js';
export const TOOL_NAME_SEPARATOR = '__';
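// Example (illustrative names): the wire name 'github__create_issue' resolves
// to serverName 'github' and toolName 'create_issue'. dispatchTool splits on
// the FIRST separator, so tool names may themselves contain '__', while
// server names must not.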
/** Default tool-loop cap. Per-agent override via `agent.extras.maxIterations`, clamped to MIN..MAX. */
export const MAX_ITERATIONS = 12;
const MIN_ITERATIONS_CAP = 1;
const MAX_ITERATIONS_CAP = 50;
/**
 * Resolve the loop cap for this turn:
 * agent.extras.maxIterations → clamp(MIN_ITERATIONS_CAP, MAX_ITERATIONS_CAP) →
 * fallback to MAX_ITERATIONS default.
 *
 * The clamp is the soft-DoS guard: a hostile agent definition can't pick a
 * thousand-iteration cap, even with `create:agents` permission.
 */
function resolveMaxIterations(extras: Record<string, unknown> | null | undefined): number {
  const raw = extras?.['maxIterations'];
  if (typeof raw !== 'number' || !Number.isFinite(raw)) return MAX_ITERATIONS;
  const truncated = Math.trunc(raw);
  if (truncated < MIN_ITERATIONS_CAP) return MIN_ITERATIONS_CAP;
  if (truncated > MAX_ITERATIONS_CAP) return MAX_ITERATIONS_CAP;
  return truncated;
}
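// e.g. (illustrative values): extras = { maxIterations: 3 } caps the loop at
// 3; { maxIterations: 999 } clamps to MAX_ITERATIONS_CAP (50); a missing or
// non-numeric value falls back to MAX_ITERATIONS (12).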
/** Project-scoped tool surface the chat loop calls into. Stub-friendly. */
export interface ChatTool {
  /** Wire format: `<serverName>${TOOL_NAME_SEPARATOR}<toolName>`. */
  name: string;
  description: string;
  parameters: Record<string, unknown>;
}
export interface ChatToolDispatcher {
  /** List tools available to an agent's project. Empty if no project. */
  listTools(projectId: string | null): Promise<ChatTool[]>;
  /** Execute a tool call. Throws on error. */
  callTool(args: {
    projectId: string;
    serverName: string;
    toolName: string;
    args: Record<string, unknown>;
  }): Promise<unknown>;
}
export interface ChatStreamChunk {
  /**
   * Chunk type:
   * - text: assistant text delta
   * - thinking: reasoning_content delta (qwen3-thinking, o1, deepseek-reasoner
   *   etc. emit reasoning before content; surface it so the REPL can show
   *   "the model is thinking" instead of going silent for 30-90s)
   * - tool_call: model decided to call a tool
   * - tool_result: tool dispatch outcome
   * - final: terminal turn (carries threadId/turnIndex)
   * - error: fatal error in the loop
   */
  type: 'text' | 'thinking' | 'tool_call' | 'tool_result' | 'final' | 'error';
  delta?: string;
  toolName?: string;
  args?: Record<string, unknown>;
  ok?: boolean;
  threadId?: string;
  turnIndex?: number;
  message?: string;
}
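// Typical chunk order for one streamed turn (illustrative): zero or more
// `thinking` and `text` deltas, then per tool round one `tool_call` followed
// by one `tool_result`, ending in exactly one `final` chunk, or `error` on
// failure.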
export interface ChatRequestArgs {
  agentName: string;
  threadId?: string;
  userMessage?: string;
  /** Optional full-history override; if set, threadId history is ignored. */
  messagesOverride?: OpenAiMessage[];
  ownerId: string;
  params?: AgentChatParams;
  /**
   * Personality overlay for this turn. If set, the personality's bound
   * prompts are appended to the system block (additive). If unset, falls
   * back to `agent.defaultPersonalityId`. If neither is present, today's
   * behavior (no personality overlay) holds.
   */
  personalityName?: string;
}
export interface ChatResult {
  threadId: string;
  assistant: string;
  turnIndex: number;
}
export class ChatService {
  constructor(
    private readonly agents: AgentService,
    private readonly llms: LlmService,
    private readonly adapters: LlmAdapterRegistry,
    private readonly chatRepo: IChatRepository,
    private readonly promptRepo: IPromptRepository,
    private readonly tools: ChatToolDispatcher,
    private readonly personalities?: IPersonalityRepository,
  ) {}
  async createThread(agentName: string, ownerId: string, title?: string): Promise<{ id: string }> {
    const agent = await this.agents.getByName(agentName);
    const thread = await this.chatRepo.createThread({
      agentId: agent.id,
      ownerId,
      ...(title !== undefined ? { title } : {}),
    });
    return { id: thread.id };
  }
  async listThreads(agentName: string, ownerId?: string): Promise<Array<{ id: string; title: string; lastTurnAt: Date; createdAt: Date }>> {
    const agent = await this.agents.getByName(agentName);
    const rows = await this.chatRepo.listThreadsByAgent(agent.id, ownerId);
    return rows.map((r) => ({ id: r.id, title: r.title, lastTurnAt: r.lastTurnAt, createdAt: r.createdAt }));
  }
  async listMessages(threadId: string, ownerId: string): Promise<ChatMessage[]> {
    // Owner check guards `view:agents` from leaking another user's thread by
    // ID. Thread IDs are CUIDs (hard to guess) but they leak through SSE
    // `final` chunks, the agents-plugin tool _meta, and several response
    // bodies, so id-knowledge is not a security boundary on its own. Return
    // 404 (not 403) on mismatch to avoid id-enumeration via differential
    // status codes.
    const thread = await this.chatRepo.findThread(threadId);
    if (thread === null) throw new NotFoundError(`Thread not found: ${threadId}`);
    if (thread.ownerId !== ownerId) throw new NotFoundError(`Thread not found: ${threadId}`);
    return this.chatRepo.listMessages(threadId);
  }
  /** Non-streaming chat. Persists rows + returns the final assistant text. */
  async chat(args: ChatRequestArgs): Promise<ChatResult> {
    const ctx = await this.prepareContext(args);
    let assistantFinal = '';
    let lastTurnIndex = ctx.startingTurnIndex;
    try {
      for (let i = 0; i < ctx.maxIterations; i += 1) {
        const adapter = this.adapters.get(ctx.llmType);
        const result = await adapter.infer({
          body: this.buildBody(ctx),
          modelOverride: ctx.modelOverride,
          apiKey: ctx.apiKey,
          url: ctx.url,
          extraConfig: ctx.extraConfig,
        });
        const choice = extractChoice(result.body);
        if (choice === null) {
          throw new Error(`Adapter returned no choice (status ${String(result.status)})`);
        }
        if (choice.tool_calls !== undefined && choice.tool_calls.length > 0) {
          const assistantTurn = await this.chatRepo.appendMessage({
            threadId: ctx.threadId,
            role: 'assistant',
            content: choice.content ?? '',
            toolCalls: choice.tool_calls.map((c) => ({
              id: c.id,
              name: c.function.name,
              arguments: safeParseJson(c.function.arguments),
            })),
            status: 'pending',
          });
          ctx.history.push({
            role: 'assistant',
            content: choice.content ?? '',
            tool_calls: choice.tool_calls,
          });
          for (const call of choice.tool_calls) {
            const toolResult = await this.dispatchTool(call.function.name, call.function.arguments, ctx.projectId);
            const resultMsg = await this.chatRepo.appendMessage({
              threadId: ctx.threadId,
              role: 'tool',
              content: typeof toolResult === 'string' ? toolResult : JSON.stringify(toolResult),
              toolCallId: call.id,
            });
            lastTurnIndex = resultMsg.turnIndex;
            ctx.history.push({
              role: 'tool',
              content: typeof toolResult === 'string' ? toolResult : JSON.stringify(toolResult),
              tool_call_id: call.id,
            });
          }
          await this.chatRepo.updateStatus(assistantTurn.id, 'complete');
          continue;
        }
        // Terminal text turn.
        const finalMsg = await this.chatRepo.appendMessage({
          threadId: ctx.threadId,
          role: 'assistant',
          content: choice.content ?? '',
        });
        assistantFinal = choice.content ?? '';
        lastTurnIndex = finalMsg.turnIndex;
        await this.chatRepo.touchThread(ctx.threadId);
        return { threadId: ctx.threadId, assistant: assistantFinal, turnIndex: lastTurnIndex };
      }
      throw new Error(`Chat loop exceeded ${String(ctx.maxIterations)} iterations without a terminal turn`);
    } catch (err) {
      await this.chatRepo.markPendingAsError(ctx.threadId);
      throw err;
    }
  }
  /** Streaming chat. Yields text deltas + tool events. Persists rows in lockstep. */
  async *chatStream(args: ChatRequestArgs): AsyncGenerator<ChatStreamChunk> {
    const ctx = await this.prepareContext(args);
    try {
      for (let i = 0; i < ctx.maxIterations; i += 1) {
        const adapter = this.adapters.get(ctx.llmType);
        const accumulated: { content: string; toolCalls: Array<{ id: string; name: string; argumentsJson: string }> } = {
          content: '',
          toolCalls: [],
        };
        let finishReason: string | null = null;
        for await (const chunk of adapter.stream({
          body: { ...this.buildBody(ctx), stream: true },
          modelOverride: ctx.modelOverride,
          apiKey: ctx.apiKey,
          url: ctx.url,
          extraConfig: ctx.extraConfig,
        })) {
          if (chunk.done === true) break;
          if (chunk.data === '[DONE]') break;
          const evt = parseStreamingChunk(chunk.data);
          if (evt === null) continue;
          if (evt.contentDelta !== undefined) {
            accumulated.content += evt.contentDelta;
            yield { type: 'text', delta: evt.contentDelta };
          }
          if (evt.reasoningDelta !== undefined) {
            // Reasoning is not persisted to the thread (it's the model's
            // scratchpad, not part of the conversation) — only streamed so
            // the REPL can show progress while the model thinks.
            yield { type: 'thinking', delta: evt.reasoningDelta };
          }
          if (evt.toolCallDeltas !== undefined) {
            for (const td of evt.toolCallDeltas) {
              const slot = (accumulated.toolCalls[td.index] ??= { id: '', name: '', argumentsJson: '' });
              if (td.id !== undefined) slot.id = td.id;
              if (td.name !== undefined) slot.name = td.name;
              if (td.argumentsDelta !== undefined) slot.argumentsJson += td.argumentsDelta;
            }
          }
          if (evt.finishReason !== null && evt.finishReason !== undefined) {
            finishReason = evt.finishReason;
          }
        }
        if (accumulated.toolCalls.length > 0 && finishReason === 'tool_calls') {
          const assistantTurn = await this.chatRepo.appendMessage({
            threadId: ctx.threadId,
            role: 'assistant',
            content: accumulated.content,
            toolCalls: accumulated.toolCalls.map((c) => ({
              id: c.id,
              name: c.name,
              arguments: safeParseJson(c.argumentsJson),
            })),
            status: 'pending',
          });
          ctx.history.push({
            role: 'assistant',
            content: accumulated.content,
            tool_calls: accumulated.toolCalls.map((c) => ({
              id: c.id,
              type: 'function',
              function: { name: c.name, arguments: c.argumentsJson },
            })),
          });
          for (const call of accumulated.toolCalls) {
            yield { type: 'tool_call', toolName: call.name, args: safeParseJson(call.argumentsJson) as Record<string, unknown> };
            try {
              const result = await this.dispatchTool(call.name, call.argumentsJson, ctx.projectId);
              const resultStr = typeof result === 'string' ? result : JSON.stringify(result);
              await this.chatRepo.appendMessage({
                threadId: ctx.threadId,
                role: 'tool',
                content: resultStr,
                toolCallId: call.id,
              });
              ctx.history.push({ role: 'tool', content: resultStr, tool_call_id: call.id });
              yield { type: 'tool_result', toolName: call.name, ok: true };
            } catch (toolErr) {
              const errMsg = (toolErr as Error).message;
              await this.chatRepo.appendMessage({
                threadId: ctx.threadId,
                role: 'tool',
                content: `error: ${errMsg}`,
                toolCallId: call.id,
                status: 'error',
              });
              ctx.history.push({ role: 'tool', content: `error: ${errMsg}`, tool_call_id: call.id });
              yield { type: 'tool_result', toolName: call.name, ok: false };
            }
          }
          await this.chatRepo.updateStatus(assistantTurn.id, 'complete');
          continue;
        }
        const finalMsg = await this.chatRepo.appendMessage({
          threadId: ctx.threadId,
          role: 'assistant',
          content: accumulated.content,
        });
        await this.chatRepo.touchThread(ctx.threadId);
        yield { type: 'final', threadId: ctx.threadId, turnIndex: finalMsg.turnIndex };
        return;
      }
      throw new Error(`Chat loop exceeded ${String(ctx.maxIterations)} iterations without a terminal turn`);
    } catch (err) {
      await this.chatRepo.markPendingAsError(ctx.threadId);
      yield { type: 'error', message: (err as Error).message };
    }
  }
  private async prepareContext(args: ChatRequestArgs): Promise<{
    threadId: string;
    history: OpenAiMessage[];
    systemBlock: string;
    llmName: string;
    llmType: string;
    modelOverride: string;
    url: string;
    apiKey: string;
    extraConfig: Record<string, unknown>;
    mergedParams: AgentChatParams;
    toolList: ChatTool[];
    projectId: string | null;
    startingTurnIndex: number;
    maxIterations: number;
  }> {
    const agent = await this.agents.getByName(args.agentName);
    const llm = await this.llms.getByName(agent.llm.name);
    const apiKey = await this.llms.resolveApiKey(agent.llm.name).catch(() => '');
    const threadId = await this.resolveThreadId(args, agent.id);
    const projectId = agent.project?.id ?? null;
    // Project prompts (existing): only those whose projectId actually matches
    // the agent's project — `findAll(projectId)` also returns globals, which
    // we exclude here so they don't double-count if a future change adds an
    // explicit "global" injection step.
    const projectPrompts = projectId !== null
      ? await this.promptRepo.findAll(projectId)
      : [];
    const sortedProjectPrompts = [...projectPrompts]
      .filter((p) => p.projectId === projectId)
      .sort((a, b) => b.priority - a.priority);
    // Agent-direct prompts: always-on overlay scoped to this specific agent.
    // Ordered after agent.systemPrompt and BEFORE project prompts so
    // agent-specific tone/guardrails win over project-wide context.
    const agentDirectPrompts = (await this.promptRepo.findByAgent(agent.id))
      .sort((a, b) => b.priority - a.priority);
    // Personality overlay: chosen by request-supplied name first, falling
    // back to the agent's defaultPersonalityId. Without a personality this
    // path is a no-op and the resulting block matches today's behavior.
    const personalityPromptContents = await this.resolvePersonalityPrompts(args, agent);
    const mergedParams: AgentChatParams = {
      ...(agent.defaultParams ?? {}),
      ...(args.params ?? {}),
    };
    const baseSystem = mergedParams.systemOverride ?? agent.systemPrompt;
    const systemBlock = [
      baseSystem,
      ...agentDirectPrompts.map((p) => p.content),
      ...sortedProjectPrompts.map((p) => p.content),
      ...personalityPromptContents,
      mergedParams.systemAppend ?? '',
    ]
      .filter((s) => s.length > 0)
      .join('\n\n');
    const history = args.messagesOverride !== undefined
      ? [...args.messagesOverride]
      : await this.loadHistory(threadId);
    let startingTurnIndex = await this.chatRepo.nextTurnIndex(threadId);
    if (args.userMessage !== undefined && args.messagesOverride === undefined) {
      const userTurn = await this.chatRepo.appendMessage({
        threadId,
        role: 'user',
        content: args.userMessage,
      });
      startingTurnIndex = userTurn.turnIndex;
      history.push({ role: 'user', content: args.userMessage });
    }
    const toolList = await this.tools.listTools(projectId);
    const allowed = mergedParams.tools_allowlist;
    const filteredTools = allowed === undefined
      ? toolList
      : toolList.filter((t) => allowed.includes(t.name));
    return {
      threadId,
      history,
      systemBlock,
      llmName: llm.name,
      llmType: llm.type,
      modelOverride: llm.model,
      url: llm.url,
      apiKey,
      extraConfig: llm.extraConfig,
      mergedParams,
      toolList: filteredTools,
      projectId,
      startingTurnIndex,
      maxIterations: resolveMaxIterations(agent.extras),
    };
  }
  /**
   * Resolves a personality (request override → agent default) and returns
   * its bound prompt contents in `PersonalityPrompt.priority` desc order.
   * Returns `[]` when no personality is selected or when the personality
   * repository is not wired (legacy callers). A named personality that
   * doesn't exist on this agent throws instead — typos in a CLI flag should
   * fail loudly, not silently fall back to no overlay.
   */
  private async resolvePersonalityPrompts(
    args: ChatRequestArgs,
    agent: Awaited<ReturnType<AgentService['getByName']>>,
  ): Promise<string[]> {
    if (this.personalities === undefined) return [];
    let personalityId: string | null = null;
    if (args.personalityName !== undefined && args.personalityName !== '') {
      const named = await this.personalities.findByNameAndAgent(args.personalityName, agent.id);
      if (named === null) {
        throw new NotFoundError(
          `Personality not found on agent ${agent.name}: ${args.personalityName}`,
        );
      }
      personalityId = named.id;
    } else if (agent.defaultPersonality !== null) {
      personalityId = agent.defaultPersonality.id;
    }
    if (personalityId === null) return [];
    const bindings = await this.personalities.listPrompts(personalityId);
    return [...bindings]
      .sort((a, b) => b.priority - a.priority)
      .map((b) => b.prompt.content);
  }
  private async resolveThreadId(args: ChatRequestArgs, agentId: string): Promise<string> {
    if (args.threadId !== undefined) {
      const existing = await this.chatRepo.findThread(args.threadId);
      if (existing === null) throw new NotFoundError(`Thread not found: ${args.threadId}`);
      return existing.id;
    }
    const created = await this.chatRepo.createThread({ agentId, ownerId: args.ownerId });
    return created.id;
  }
  private async loadHistory(threadId: string): Promise<OpenAiMessage[]> {
    const rows = await this.chatRepo.listMessages(threadId);
    return rows
      .filter((r) => r.status !== 'error')
      .map<OpenAiMessage>((r) => {
        const msg: OpenAiMessage = { role: r.role as ChatRole, content: r.content };
        if (r.toolCallId !== null) msg.tool_call_id = r.toolCallId;
        if (r.toolCalls !== null && Array.isArray(r.toolCalls)) {
          const calls = r.toolCalls as Array<{ id: string; name: string; arguments: unknown }>;
          msg.tool_calls = calls.map((c) => ({
            id: c.id,
            type: 'function' as const,
            function: { name: c.name, arguments: typeof c.arguments === 'string' ? c.arguments : JSON.stringify(c.arguments) },
          }));
        }
        return msg;
      });
  }
  private buildBody(ctx: {
    history: OpenAiMessage[];
    systemBlock: string;
    modelOverride: string;
    mergedParams: AgentChatParams;
    toolList: ChatTool[];
  }): OpenAiChatRequest {
    const messages: OpenAiMessage[] = [];
    if (ctx.systemBlock.length > 0) {
      messages.push({ role: 'system', content: ctx.systemBlock });
    }
    messages.push(...ctx.history);
    const body: OpenAiChatRequest = {
      model: ctx.modelOverride,
      messages,
    };
    const p = ctx.mergedParams;
    if (p.temperature !== undefined) body.temperature = p.temperature;
    if (p.top_p !== undefined) body.top_p = p.top_p;
    if (p.top_k !== undefined) (body as Record<string, unknown>)['top_k'] = p.top_k;
    if (p.max_tokens !== undefined) body.max_tokens = p.max_tokens;
    if (p.stop !== undefined) body.stop = p.stop;
    if (p.presence_penalty !== undefined) (body as Record<string, unknown>)['presence_penalty'] = p.presence_penalty;
    if (p.frequency_penalty !== undefined) (body as Record<string, unknown>)['frequency_penalty'] = p.frequency_penalty;
    if (p.seed !== undefined) (body as Record<string, unknown>)['seed'] = p.seed;
    if (p.response_format !== undefined) (body as Record<string, unknown>)['response_format'] = p.response_format;
    if (p.tool_choice !== undefined) body.tool_choice = p.tool_choice;
    if (ctx.toolList.length > 0) {
      body.tools = ctx.toolList.map((t) => ({
        type: 'function' as const,
        function: { name: t.name, description: t.description, parameters: t.parameters },
      }));
    }
    if (p.extra !== undefined) {
      for (const [k, v] of Object.entries(p.extra)) {
        (body as Record<string, unknown>)[k] = v;
      }
    }
    return body;
  }
  private async dispatchTool(toolWireName: string, argsJson: string, projectId: string | null): Promise<unknown> {
    if (projectId === null) {
      throw new Error('Tool calls require an agent attached to a Project');
    }
    const sep = toolWireName.indexOf(TOOL_NAME_SEPARATOR);
    if (sep === -1) {
      throw new Error(`Tool name '${toolWireName}' missing '${TOOL_NAME_SEPARATOR}' separator`);
    }
    const serverName = toolWireName.slice(0, sep);
    const toolName = toolWireName.slice(sep + TOOL_NAME_SEPARATOR.length);
    const parsed = safeParseJson(argsJson) as Record<string, unknown>;
    return this.tools.callTool({ projectId, serverName, toolName, args: parsed });
  }
}
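// Usage sketch (illustrative; assumes a DI layer has constructed the concrete
// repositories, adapter registry, and tool dispatcher named below):
//
//   const svc = new ChatService(agents, llms, adapters, chatRepo, promptRepo, tools, personalityRepo);
//   for await (const chunk of svc.chatStream({ agentName: 'helper', userMessage: 'hi', ownerId: user.id })) {
//     if (chunk.type === 'text') process.stdout.write(chunk.delta ?? '');
//   }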
interface ExtractedChoice {
  content: string | null;
  tool_calls?: Array<{ id: string; type: 'function'; function: { name: string; arguments: string } }>;
}
function extractChoice(body: unknown): ExtractedChoice | null {
  if (typeof body !== 'object' || body === null) return null;
  const choices = (body as { choices?: unknown }).choices;
  if (!Array.isArray(choices) || choices.length === 0) return null;
  const first = choices[0] as { message?: { content?: unknown; tool_calls?: unknown } } | undefined;
  if (first?.message === undefined) return null;
  const content = typeof first.message.content === 'string' ? first.message.content : null;
  const toolCalls = first.message.tool_calls;
  const out: ExtractedChoice = { content };
  if (Array.isArray(toolCalls)) {
    out.tool_calls = toolCalls as NonNullable<ExtractedChoice['tool_calls']>;
  }
  return out;
}
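// Tolerant parse for model-supplied JSON: streamed tool-call arguments can
// arrive empty or truncated, so a failed parse degrades to `{}` (treated as a
// no-argument call) instead of throwing mid-loop.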
function safeParseJson(s: string): unknown {
  if (s === '') return {};
  try {
    return JSON.parse(s);
  } catch {
    return {};
  }
}
interface ParsedStreamEvent {
  contentDelta?: string;
  /**
   * Reasoning text emitted by thinking models (qwen3-thinking,
   * deepseek-reasoner, OpenAI o1 family). Different providers spell the
   * field differently — we accept `reasoning_content` (qwen, deepseek),
   * `reasoning` (some o1 variants), and the older
   * `provider_specific_fields` shape that LiteLLM passes through from vLLM.
   */
  reasoningDelta?: string;
  toolCallDeltas?: Array<{ index: number; id?: string; name?: string; argumentsDelta?: string }>;
  finishReason?: string | null;
}
function parseStreamingChunk(data: string): ParsedStreamEvent | null {
  if (data === '' || data === '[DONE]') return null;
  let json: unknown;
  try {
    json = JSON.parse(data);
  } catch {
    return null;
  }
  if (typeof json !== 'object' || json === null) return null;
  const choices = (json as { choices?: unknown }).choices;
  if (!Array.isArray(choices) || choices.length === 0) return null;
  const c = choices[0] as {
    delta?: {
      content?: unknown;
      reasoning_content?: unknown;
      reasoning?: unknown;
      tool_calls?: unknown;
      provider_specific_fields?: { reasoning_content?: unknown; reasoning?: unknown };
    };
    finish_reason?: unknown;
  };
  const evt: ParsedStreamEvent = {};
  const delta = c.delta;
  if (delta !== undefined) {
    if (typeof delta.content === 'string' && delta.content.length > 0) {
      evt.contentDelta = delta.content;
    }
    // Try the standard fields first, then the LiteLLM passthrough shape.
    const reasoning =
      (typeof delta.reasoning_content === 'string' && delta.reasoning_content.length > 0 ? delta.reasoning_content : undefined)
      ?? (typeof delta.reasoning === 'string' && delta.reasoning.length > 0 ? delta.reasoning : undefined)
      ?? (typeof delta.provider_specific_fields?.reasoning_content === 'string' && delta.provider_specific_fields.reasoning_content.length > 0 ? delta.provider_specific_fields.reasoning_content : undefined)
      ?? (typeof delta.provider_specific_fields?.reasoning === 'string' && delta.provider_specific_fields.reasoning.length > 0 ? delta.provider_specific_fields.reasoning : undefined);
    if (reasoning !== undefined) {
      evt.reasoningDelta = reasoning;
    }
    if (Array.isArray(delta.tool_calls)) {
      evt.toolCallDeltas = (delta.tool_calls as Array<{
        index: number;
        id?: string;
        function?: { name?: string; arguments?: string };
      }>).map((t) => {
        const td: { index: number; id?: string; name?: string; argumentsDelta?: string } = { index: t.index };
        if (t.id !== undefined) td.id = t.id;
        if (t.function?.name !== undefined) td.name = t.function.name;
        if (t.function?.arguments !== undefined) td.argumentsDelta = t.function.arguments;
        return td;
      });
    }
  }
  if (c.finish_reason !== undefined) {
    evt.finishReason = (c.finish_reason as string | null);
  }
  return evt;
}