feat(chat): surface reasoning_content as thinking chunks; fix --no-stream timeout

Reasoning models (qwen3-thinking, deepseek-reasoner, OpenAI o1 family) emit
their scratchpad as `delta.reasoning_content` (or `delta.reasoning`,
or `delta.provider_specific_fields.reasoning_content` when LiteLLM passes
through from vLLM) — separate from `delta.content`. Before this commit,
mcpd's parseStreamingChunk watched only `content`, so the model's 30-90s
reasoning phase looked like dead air to the REPL: streaming connection
open, no chunks, no progress. Caught during the agents-feature shakedown
when qwen3-thinking sat silent for 90s on a docmost__list_pages call.
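
On the wire the two shapes look roughly like this (illustrative chunks,
values invented):

  vendor-native (qwen3-thinking, deepseek-reasoner):
    {"choices":[{"delta":{"reasoning_content":"Let me think..."},"finish_reason":null}]}
  LiteLLM passthrough from vLLM:
    {"choices":[{"delta":{"provider_specific_fields":{"reasoning_content":"..."}},"finish_reason":null}]}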

mcpd
====
chat.service.ts
  - parseStreamingChunk extracts a `reasoningDelta` from the chunk body,
    accepting all four spellings (reasoning_content / reasoning /
    provider_specific_fields.{reasoning_content,reasoning}). Future
    providers can add their own field names by extending the
    fallback chain.
  - chatStream yields `{ type: 'thinking', delta }` chunks as reasoning
    arrives, alongside the existing `{ type: 'text', delta }` for content
    (see the consumer sketch after this list).
  - Reasoning is intentionally NOT persisted to the thread. It's the
    model's scratchpad, not part of the conversation. Subsequent turns
    don't see it.
  - Adds 'thinking' to the ChatStreamChunk.type union.
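
  A minimal consumer sketch (chatStream call shape as in the tests below;
  showProgress/appendToAnswer are hypothetical placeholders):

    for await (const chunk of svc.chatStream({ agentName, userMessage, ownerId })) {
      if (chunk.type === 'thinking') showProgress(chunk.delta);      // scratchpad: display only
      else if (chunk.type === 'text') appendToAnswer(chunk.delta);   // persisted assistant content
      // 'final' carries threadId/turnIndex; tool_call/tool_result/error handling omitted here
    }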

CLI
===
chat.ts
  - streamOnce handles 'thinking' chunks: writes them dim+italic to
    stderr (ANSI 2;3m) so the model's reasoning reads like a quote block
    while the final answer streams to stdout. Falls back to plain text
    when stderr isn't a TTY, so no escape codes leak into piped output
    (usage note after this list).
  - chatRequestNonStream replaces the shared ApiClient.post() for the
    --no-stream path. ApiClient defaults to a 10s timeout, way too tight
    for any chat that calls a tool: LLM round + tool dispatch + LLM
    summary easily exceeds 10s. The new helper uses the same 600s timeout
    the streaming path has been using all along.
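
  Net effect of the stdout/stderr split: something like
  `<cli> chat <agent> "..." 2>/dev/null` (invocation shape illustrative,
  not the CLI's documented syntax) captures only the final answer, while
  an interactive TTY still shows the dim reasoning stream.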

Tests:
  chat-service.test.ts (+2):
    - reasoning_content deltas surface as `thinking` chunks (not text);
      reasoning is NOT persisted to the assistant turn's content.
    - LiteLLM's provider_specific_fields.reasoning_content shape parses
      identically to the vendor-native shape.

mcpd 777/777, cli 430/430.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

chat.ts (CLI)
=============

@@ -141,10 +141,7 @@ async function runOneShot(
if (stream === false) {
const body: Record<string, unknown> = { message, ...overrides };
if (threadId !== undefined) body.threadId = threadId;
-const res = await deps.client.post<{ assistant: string; threadId: string; turnIndex: number }>(
-`/api/v1/agents/${encodeURIComponent(agent)}/chat`,
-body,
-);
+const res = await chatRequestNonStream(deps, agent, body);
process.stdout.write(`${res.assistant}\n`);
process.stderr.write(`(thread: ${res.threadId})\n`);
return;
@@ -188,10 +185,7 @@ async function runRepl(
if (stream === false) {
const body: Record<string, unknown> = { message: line, ...overrides };
if (threadId !== undefined) body.threadId = threadId;
-const res = await deps.client.post<{ assistant: string; threadId: string }>(
-`/api/v1/agents/${encodeURIComponent(agent)}/chat`,
-body,
-);
+const res = await chatRequestNonStream(deps, agent, body);
threadId = res.threadId;
process.stdout.write(`${res.assistant}\n`);
} else {
@@ -306,6 +300,60 @@ function applySetCommand(o: Overrides, key: string, valueRaw: string): void {
}
}
/**
* Non-streaming POST to the chat endpoint. Uses the SAME 10-minute timeout
* as the streaming path — `deps.client.post` (the shared ApiClient) defaults
* to 10s, which is too tight for any chat that calls a tool. Returns the
* parsed JSON body on 2xx, throws on 4xx/5xx with the response body.
*/
async function chatRequestNonStream(
deps: ChatCommandDeps,
agent: string,
body: Record<string, unknown>,
): Promise<{ assistant: string; threadId: string; turnIndex: number }> {
const url = new URL(`${deps.baseUrl}/api/v1/agents/${encodeURIComponent(agent)}/chat`);
const payload = JSON.stringify(body);
return new Promise((resolve, reject) => {
const driver = url.protocol === 'https:' ? https : http;
const req = driver.request({
hostname: url.hostname,
port: url.port || (url.protocol === 'https:' ? 443 : 80),
path: url.pathname + url.search,
method: 'POST',
timeout: STREAM_TIMEOUT_MS,
headers: {
'Content-Type': 'application/json',
Accept: 'application/json',
...(deps.token !== undefined ? { Authorization: `Bearer ${deps.token}` } : {}),
},
}, (res) => {
const status = res.statusCode ?? 0;
const chunks: Buffer[] = [];
res.on('data', (c: Buffer) => chunks.push(c));
res.on('end', () => {
const raw = Buffer.concat(chunks).toString('utf-8');
if (status >= 400) {
reject(new Error(`HTTP ${String(status)}: ${raw}`));
return;
}
try {
resolve(JSON.parse(raw) as { assistant: string; threadId: string; turnIndex: number });
} catch (err) {
reject(new Error(`malformed response: ${(err as Error).message}`));
}
});
res.on('error', reject);
});
req.on('error', reject);
req.on('timeout', () => {
req.destroy();
reject(new Error('chat request timed out'));
});
req.write(payload);
req.end();
});
}
/** Stream a single chat call. Returns the resolved threadId. */
async function streamOnce(
deps: ChatCommandDeps,
@@ -356,6 +404,13 @@ async function streamOnce(
case 'text':
if (typeof evt.delta === 'string') process.stdout.write(evt.delta);
break;
case 'thinking':
// Reasoning models (qwen3-thinking, deepseek-reasoner, o1
// family) emit this for tens of seconds before producing
// any content delta. Show it dim+italic on stderr so the
// final answer (stdout) stays clean for grepping/redirect.
if (typeof evt.delta === 'string') process.stderr.write(styleThinking(evt.delta));
break;
case 'tool_call':
process.stderr.write(`\n[tool_call: ${evt.toolName ?? ''}]\n`);
break;
@@ -389,7 +444,7 @@ async function streamOnce(
}
interface ChatStreamFrame {
-type: 'text' | 'tool_call' | 'tool_result' | 'final' | 'error';
+type: 'text' | 'thinking' | 'tool_call' | 'tool_result' | 'final' | 'error';
delta?: string;
toolName?: string;
ok?: boolean;
@@ -398,6 +453,16 @@ interface ChatStreamFrame {
message?: string;
}
// ANSI codes for the reasoning sidebar. Dim + italic visually separates
// reasoning ("the model is thinking") from final assistant content. We only
// emit the codes when stderr is a TTY — piping to a file should stay clean.
const ANSI_DIM_ITALIC = '\x1b[2;3m';
const ANSI_RESET = '\x1b[0m';
const STDERR_IS_TTY = process.stderr.isTTY === true;
function styleThinking(s: string): string {
return STDERR_IS_TTY ? `${ANSI_DIM_ITALIC}${s}${ANSI_RESET}` : s;
}
function collect(value: string, prev: string[]): string[] {
return [...prev, value];
}

chat.service.ts (mcpd)
======================

@@ -78,7 +78,18 @@ export interface ChatToolDispatcher {
}
export interface ChatStreamChunk {
-type: 'text' | 'tool_call' | 'tool_result' | 'final' | 'error';
/**
* Chunk type:
* - text: assistant text delta
* - thinking: reasoning_content delta (qwen3-thinking, o1, deepseek-reasoner
* etc. emit reasoning before content; surface it so the REPL can show
* "the model is thinking" instead of going silent for 30-90s)
* - tool_call: model decided to call a tool
* - tool_result: tool dispatch outcome
* - final: terminal turn (carries threadId/turnIndex)
* - error: fatal error in the loop
*/
+type: 'text' | 'thinking' | 'tool_call' | 'tool_result' | 'final' | 'error';
delta?: string;
toolName?: string;
args?: Record<string, unknown>;
@@ -241,6 +252,12 @@ export class ChatService {
accumulated.content += evt.contentDelta;
yield { type: 'text', delta: evt.contentDelta };
}
if (evt.reasoningDelta !== undefined) {
// Reasoning is not persisted to the thread (it's the model's
// scratchpad, not part of the conversation) — only streamed so
// the REPL can show progress while the model thinks.
yield { type: 'thinking', delta: evt.reasoningDelta };
}
if (evt.toolCallDeltas !== undefined) {
for (const td of evt.toolCallDeltas) {
const slot = (accumulated.toolCalls[td.index] ??= { id: '', name: '', argumentsJson: '' });
@@ -520,6 +537,14 @@ function safeParseJson(s: string): unknown {
interface ParsedStreamEvent {
contentDelta?: string;
/**
* Reasoning text emitted by thinking models (qwen3-thinking,
* deepseek-reasoner, OpenAI o1 family). Different providers spell the
* field differently — we accept `reasoning_content` (qwen, deepseek),
* `reasoning` (some o1 variants), and the nested
* `provider_specific_fields.{reasoning_content,reasoning}` shapes that
* LiteLLM passes through from vLLM.
*/
reasoningDelta?: string;
toolCallDeltas?: Array<{ index: number; id?: string; name?: string; argumentsDelta?: string }>;
finishReason?: string | null;
}
@@ -535,13 +560,31 @@ function parseStreamingChunk(data: string): ParsedStreamEvent | null {
if (typeof json !== 'object' || json === null) return null;
const choices = (json as { choices?: unknown }).choices;
if (!Array.isArray(choices) || choices.length === 0) return null;
-const c = choices[0] as { delta?: { content?: unknown; tool_calls?: unknown }; finish_reason?: unknown };
+const c = choices[0] as {
+delta?: {
+content?: unknown;
+reasoning_content?: unknown;
+reasoning?: unknown;
+tool_calls?: unknown;
+provider_specific_fields?: { reasoning_content?: unknown; reasoning?: unknown };
+};
+finish_reason?: unknown;
+};
const evt: ParsedStreamEvent = {};
const delta = c.delta;
if (delta !== undefined) {
if (typeof delta.content === 'string' && delta.content.length > 0) {
evt.contentDelta = delta.content;
}
// Try the standard fields first, then the LiteLLM passthrough shape.
const reasoning =
(typeof delta.reasoning_content === 'string' && delta.reasoning_content.length > 0 ? delta.reasoning_content : undefined)
?? (typeof delta.reasoning === 'string' && delta.reasoning.length > 0 ? delta.reasoning : undefined)
?? (typeof delta.provider_specific_fields?.reasoning_content === 'string' && delta.provider_specific_fields.reasoning_content.length > 0 ? delta.provider_specific_fields.reasoning_content : undefined)
?? (typeof delta.provider_specific_fields?.reasoning === 'string' && delta.provider_specific_fields.reasoning.length > 0 ? delta.provider_specific_fields.reasoning : undefined);
if (reasoning !== undefined) {
evt.reasoningDelta = reasoning;
}
if (Array.isArray(delta.tool_calls)) {
evt.toolCallDeltas = (delta.tool_calls as Array<{
index: number;

chat-service.test.ts (mcpd)
===========================

@@ -411,6 +411,76 @@ describe('ChatService', () => {
expect(ctx.body.tools?.[0]?.function.name).toBe(`s1${TOOL_NAME_SEPARATOR}a`);
});
// Regression: reasoning_content (qwen3-thinking, deepseek-reasoner, o1)
// streams as `thinking` chunks, separate from `text`.
// Without this, the model's 30-90s reasoning phase looks like dead air to
// the REPL — caught by user feedback during the agents-feature shakedown.
it('chatStream surfaces reasoning_content deltas as `thinking` chunks', async () => {
const chatRepo = mockChatRepo();
// Adapter that yields a sequence of openai-format chunks: 2 reasoning
// deltas, then 1 content delta, then [DONE].
const adapter: LlmAdapter = {
kind: 'scripted-thinking',
infer: vi.fn(),
stream: async function*() {
yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'Let me think... ' }, finish_reason: null }] }) };
yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'OK, ready.' }, finish_reason: null }] }) };
yield { data: JSON.stringify({ choices: [{ delta: { content: 'DONE' }, finish_reason: 'stop' }] }) };
yield { data: '[DONE]', done: true };
},
};
const svc = new ChatService(
mockAgents(), mockLlms(), adapterRegistry(adapter),
chatRepo, mockPromptRepo(), mockTools(),
);
const chunks: Array<{ type: string; delta?: string }> = [];
for await (const chunk of svc.chatStream({
agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1',
})) {
chunks.push({ type: chunk.type, delta: chunk.delta });
}
// Expect: 2 thinking + 1 text + 1 final
expect(chunks.filter((c) => c.type === 'thinking').map((c) => c.delta))
.toEqual(['Let me think... ', 'OK, ready.']);
expect(chunks.filter((c) => c.type === 'text').map((c) => c.delta)).toEqual(['DONE']);
expect(chunks.find((c) => c.type === 'final')).toBeDefined();
// Reasoning is NOT persisted to the thread — only assistant content.
const assistantTurn = chatRepo._msgs.find((m) => m.role === 'assistant');
expect(assistantTurn?.content).toBe('DONE');
expect(assistantTurn?.content).not.toContain('Let me think');
});
// Regression: provider_specific_fields.reasoning_content shape (LiteLLM
// passthrough from vLLM) is also recognized.
it('chatStream recognizes LiteLLM provider_specific_fields.reasoning_content', async () => {
const chatRepo = mockChatRepo();
const adapter: LlmAdapter = {
kind: 'scripted-litellm',
infer: vi.fn(),
stream: async function*() {
yield { data: JSON.stringify({ choices: [{ delta: { provider_specific_fields: { reasoning_content: 'thinking via litellm...' } }, finish_reason: null }] }) };
yield { data: JSON.stringify({ choices: [{ delta: { content: 'ok' }, finish_reason: 'stop' }] }) };
yield { data: '[DONE]', done: true };
},
};
const svc = new ChatService(
mockAgents(), mockLlms(), adapterRegistry(adapter),
chatRepo, mockPromptRepo(), mockTools(),
);
const chunks: Array<{ type: string; delta?: string }> = [];
for await (const chunk of svc.chatStream({
agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1',
})) {
chunks.push({ type: chunk.type, delta: chunk.delta });
}
expect(chunks.filter((c) => c.type === 'thinking').map((c) => c.delta))
.toEqual(['thinking via litellm...']);
});
// Regression: per-agent maxIterations override + clamp.
// Found by /gstack-review on 2026-04-25.
// Without the clamp, a hostile agent definition with `extras.maxIterations:1000000`