diff --git a/src/cli/src/commands/chat.ts b/src/cli/src/commands/chat.ts
index 88b0901..bcd4a65 100644
--- a/src/cli/src/commands/chat.ts
+++ b/src/cli/src/commands/chat.ts
@@ -141,10 +141,7 @@ async function runOneShot(
   if (stream === false) {
     const body: Record<string, unknown> = { message, ...overrides };
     if (threadId !== undefined) body.threadId = threadId;
-    const res = await deps.client.post<{ assistant: string; threadId: string; turnIndex: number }>(
-      `/api/v1/agents/${encodeURIComponent(agent)}/chat`,
-      body,
-    );
+    const res = await chatRequestNonStream(deps, agent, body);
     process.stdout.write(`${res.assistant}\n`);
     process.stderr.write(`(thread: ${res.threadId})\n`);
     return;
@@ -188,10 +185,7 @@ async function runRepl(
     if (stream === false) {
       const body: Record<string, unknown> = { message: line, ...overrides };
       if (threadId !== undefined) body.threadId = threadId;
-      const res = await deps.client.post<{ assistant: string; threadId: string }>(
-        `/api/v1/agents/${encodeURIComponent(agent)}/chat`,
-        body,
-      );
+      const res = await chatRequestNonStream(deps, agent, body);
       threadId = res.threadId;
       process.stdout.write(`${res.assistant}\n`);
     } else {
@@ -306,6 +300,60 @@ function applySetCommand(o: Overrides, key: string, valueRaw: string): void {
   }
 }
 
+/**
+ * Non-streaming POST to the chat endpoint. Uses the SAME 10-minute timeout
+ * as the streaming path — `deps.client.post` (the shared ApiClient) defaults
+ * to 10s, which is too tight for any chat that calls a tool. Returns the
+ * parsed JSON body on 2xx, throws on 4xx/5xx with the response body.
+ */
+async function chatRequestNonStream(
+  deps: ChatCommandDeps,
+  agent: string,
+  body: Record<string, unknown>,
+): Promise<{ assistant: string; threadId: string; turnIndex: number }> {
+  const url = new URL(`${deps.baseUrl}/api/v1/agents/${encodeURIComponent(agent)}/chat`);
+  const payload = JSON.stringify(body);
+  return new Promise((resolve, reject) => {
+    const driver = url.protocol === 'https:' ? https : http;
+    const req = driver.request({
+      hostname: url.hostname,
+      port: url.port || (url.protocol === 'https:' ? 443 : 80),
+      path: url.pathname + url.search,
+      method: 'POST',
+      timeout: STREAM_TIMEOUT_MS,
+      headers: {
+        'Content-Type': 'application/json',
+        Accept: 'application/json',
+        ...(deps.token !== undefined ? { Authorization: `Bearer ${deps.token}` } : {}),
+      },
+    }, (res) => {
+      const status = res.statusCode ?? 0;
+      const chunks: Buffer[] = [];
+      res.on('data', (c: Buffer) => chunks.push(c));
+      res.on('end', () => {
+        const raw = Buffer.concat(chunks).toString('utf-8');
+        if (status >= 400) {
+          reject(new Error(`HTTP ${String(status)}: ${raw}`));
+          return;
+        }
+        try {
+          resolve(JSON.parse(raw) as { assistant: string; threadId: string; turnIndex: number });
+        } catch (err) {
+          reject(new Error(`malformed response: ${(err as Error).message}`));
+        }
+      });
+      res.on('error', reject);
+    });
+    req.on('error', reject);
+    req.on('timeout', () => {
+      req.destroy();
+      reject(new Error('chat request timed out'));
+    });
+    req.write(payload);
+    req.end();
+  });
+}
+
 /** Stream a single chat call. Returns the resolved threadId. */
 async function streamOnce(
   deps: ChatCommandDeps,
@@ -356,6 +404,13 @@ async function streamOnce(
       case 'text':
         if (typeof evt.delta === 'string') process.stdout.write(evt.delta);
         break;
+      case 'thinking':
+        // Reasoning models (qwen3-thinking, deepseek-reasoner, o1
+        // family) emit this for tens of seconds before producing
+        // any content delta. Show it dim+italic on stderr so the
+        // final answer (stdout) stays clean for grepping/redirect.
+        if (typeof evt.delta === 'string') process.stderr.write(styleThinking(evt.delta));
+        break;
       case 'tool_call':
         process.stderr.write(`\n[tool_call: ${evt.toolName ?? ''}]\n`);
         break;
@@ -389,7 +444,7 @@ async function streamOnce(
 }
 
 interface ChatStreamFrame {
-  type: 'text' | 'tool_call' | 'tool_result' | 'final' | 'error';
+  type: 'text' | 'thinking' | 'tool_call' | 'tool_result' | 'final' | 'error';
   delta?: string;
   toolName?: string;
   ok?: boolean;
@@ -398,6 +453,16 @@ interface ChatStreamFrame {
   message?: string;
 }
 
+// ANSI codes for the reasoning sidebar. Dim + italic visually separates
+// reasoning ("the model is thinking") from final assistant content. We only
+// emit the codes when stderr is a TTY — piping to a file should stay clean.
+const ANSI_DIM_ITALIC = '\x1b[2;3m';
+const ANSI_RESET = '\x1b[0m';
+const STDERR_IS_TTY = process.stderr.isTTY === true;
+function styleThinking(s: string): string {
+  return STDERR_IS_TTY ? `${ANSI_DIM_ITALIC}${s}${ANSI_RESET}` : s;
+}
+
 function collect(value: string, prev: string[]): string[] {
   return [...prev, value];
 }
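Illustrative aside, not part of the patch: the frame sequence the CLI sees for one thinking-model turn, using the ChatStreamFrame shape above. The delta text and tool name are invented; the stream routing follows the handlers in streamOnce:

  { type: 'thinking', delta: 'Check the thread history first...' }   -> stderr, dim+italic on a TTY
  { type: 'tool_call', toolName: 'threads.read' }                    -> stderr
  { type: 'text', delta: 'Here is the summary: ...' }                -> stdout
  { type: 'final', ... }                                             -> end of turn; the REPL reuses the resolved threadId

Because thinking frames go to stderr and text frames to stdout, redirecting stdout to a file keeps the reasoning visible in the terminal while the file receives only the final answer.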
diff --git a/src/mcpd/src/services/chat.service.ts b/src/mcpd/src/services/chat.service.ts
index 5c9bf7c..50a5015 100644
--- a/src/mcpd/src/services/chat.service.ts
+++ b/src/mcpd/src/services/chat.service.ts
@@ -78,7 +78,18 @@ export interface ChatToolDispatcher {
 }
 
 export interface ChatStreamChunk {
-  type: 'text' | 'tool_call' | 'tool_result' | 'final' | 'error';
+  /**
+   * Chunk type:
+   * - text: assistant text delta
+   * - thinking: reasoning_content delta (qwen3-thinking, o1, deepseek-reasoner,
+   *   etc. emit reasoning before content; surface it so the REPL can show
+   *   "the model is thinking" instead of going silent for 30-90s)
+   * - tool_call: model decided to call a tool
+   * - tool_result: tool dispatch outcome
+   * - final: terminal turn (carries threadId/turnIndex)
+   * - error: fatal error in the loop
+   */
+  type: 'text' | 'thinking' | 'tool_call' | 'tool_result' | 'final' | 'error';
   delta?: string;
   toolName?: string;
   args?: Record<string, unknown>;
@@ -241,6 +252,12 @@ export class ChatService {
         accumulated.content += evt.contentDelta;
         yield { type: 'text', delta: evt.contentDelta };
       }
+      if (evt.reasoningDelta !== undefined) {
+        // Reasoning is not persisted to the thread (it's the model's
+        // scratchpad, not part of the conversation) — only streamed so
+        // the REPL can show progress while the model thinks.
+        yield { type: 'thinking', delta: evt.reasoningDelta };
+      }
       if (evt.toolCallDeltas !== undefined) {
         for (const td of evt.toolCallDeltas) {
           const slot = (accumulated.toolCalls[td.index] ??= { id: '', name: '', argumentsJson: '' });
@@ -520,6 +537,14 @@ function safeParseJson(s: string): unknown {
 
 interface ParsedStreamEvent {
   contentDelta?: string;
+  /**
+   * Reasoning text emitted by thinking models (qwen3-thinking,
+   * deepseek-reasoner, OpenAI o1 family). Providers spell the field
+   * differently — we accept `reasoning_content` (qwen, deepseek),
+   * `reasoning` (some o1 variants), and the older `provider_specific_fields`
+   * shape (either spelling) that LiteLLM passes through from vLLM.
+   */
+  reasoningDelta?: string;
   toolCallDeltas?: Array<{ index: number; id?: string; name?: string; argumentsDelta?: string }>;
   finishReason?: string | null;
 }
@@ -535,13 +560,31 @@ function parseStreamingChunk(data: string): ParsedStreamEvent | null {
   if (typeof json !== 'object' || json === null) return null;
   const choices = (json as { choices?: unknown }).choices;
   if (!Array.isArray(choices) || choices.length === 0) return null;
-  const c = choices[0] as { delta?: { content?: unknown; tool_calls?: unknown }; finish_reason?: unknown };
+  const c = choices[0] as {
+    delta?: {
+      content?: unknown;
+      reasoning_content?: unknown;
+      reasoning?: unknown;
+      tool_calls?: unknown;
+      provider_specific_fields?: { reasoning_content?: unknown; reasoning?: unknown };
+    };
+    finish_reason?: unknown;
+  };
   const evt: ParsedStreamEvent = {};
   const delta = c.delta;
   if (delta !== undefined) {
     if (typeof delta.content === 'string' && delta.content.length > 0) {
       evt.contentDelta = delta.content;
     }
+    // Try the standard fields first, then the LiteLLM passthrough shape.
+    const reasoning =
+      (typeof delta.reasoning_content === 'string' && delta.reasoning_content.length > 0 ? delta.reasoning_content : undefined)
+      ?? (typeof delta.reasoning === 'string' && delta.reasoning.length > 0 ? delta.reasoning : undefined)
+      ?? (typeof delta.provider_specific_fields?.reasoning_content === 'string' && delta.provider_specific_fields.reasoning_content.length > 0 ? delta.provider_specific_fields.reasoning_content : undefined)
+      ?? (typeof delta.provider_specific_fields?.reasoning === 'string' && delta.provider_specific_fields.reasoning.length > 0 ? delta.provider_specific_fields.reasoning : undefined);
+    if (reasoning !== undefined) {
+      evt.reasoningDelta = reasoning;
+    }
     if (Array.isArray(delta.tool_calls)) {
       evt.toolCallDeltas = (delta.tool_calls as Array<{
         index: number;
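Illustrative aside, not part of the patch: the raw SSE `data:` payload shapes the parser above maps to reasoningDelta. Field names come from the code; the values are invented:

  {"choices":[{"delta":{"reasoning_content":"..."},"finish_reason":null}]}                               <- qwen3-thinking, deepseek-reasoner
  {"choices":[{"delta":{"reasoning":"..."},"finish_reason":null}]}                                       <- some o1 variants
  {"choices":[{"delta":{"provider_specific_fields":{"reasoning_content":"..."}},"finish_reason":null}]}  <- LiteLLM passthrough from vLLM
  {"choices":[{"delta":{"provider_specific_fields":{"reasoning":"..."}},"finish_reason":null}]}          <- LiteLLM passthrough, alternate spelling

Only non-empty strings are taken, and earlier spellings win: `reasoning_content` over `reasoning`, and top-level delta fields over the provider_specific_fields object.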
+ */ + reasoningDelta?: string; toolCallDeltas?: Array<{ index: number; id?: string; name?: string; argumentsDelta?: string }>; finishReason?: string | null; } @@ -535,13 +560,31 @@ function parseStreamingChunk(data: string): ParsedStreamEvent | null { if (typeof json !== 'object' || json === null) return null; const choices = (json as { choices?: unknown }).choices; if (!Array.isArray(choices) || choices.length === 0) return null; - const c = choices[0] as { delta?: { content?: unknown; tool_calls?: unknown }; finish_reason?: unknown }; + const c = choices[0] as { + delta?: { + content?: unknown; + reasoning_content?: unknown; + reasoning?: unknown; + tool_calls?: unknown; + provider_specific_fields?: { reasoning_content?: unknown; reasoning?: unknown }; + }; + finish_reason?: unknown; + }; const evt: ParsedStreamEvent = {}; const delta = c.delta; if (delta !== undefined) { if (typeof delta.content === 'string' && delta.content.length > 0) { evt.contentDelta = delta.content; } + // Try the standard fields first, then the LiteLLM passthrough shape. + const reasoning = + (typeof delta.reasoning_content === 'string' && delta.reasoning_content.length > 0 ? delta.reasoning_content : undefined) + ?? (typeof delta.reasoning === 'string' && delta.reasoning.length > 0 ? delta.reasoning : undefined) + ?? (typeof delta.provider_specific_fields?.reasoning_content === 'string' && delta.provider_specific_fields.reasoning_content.length > 0 ? delta.provider_specific_fields.reasoning_content : undefined) + ?? (typeof delta.provider_specific_fields?.reasoning === 'string' && delta.provider_specific_fields.reasoning.length > 0 ? delta.provider_specific_fields.reasoning : undefined); + if (reasoning !== undefined) { + evt.reasoningDelta = reasoning; + } if (Array.isArray(delta.tool_calls)) { evt.toolCallDeltas = (delta.tool_calls as Array<{ index: number; diff --git a/src/mcpd/tests/chat-service.test.ts b/src/mcpd/tests/chat-service.test.ts index 80fd58c..c7d8ee8 100644 --- a/src/mcpd/tests/chat-service.test.ts +++ b/src/mcpd/tests/chat-service.test.ts @@ -411,6 +411,76 @@ describe('ChatService', () => { expect(ctx.body.tools?.[0]?.function.name).toBe(`s1${TOOL_NAME_SEPARATOR}a`); }); + // Regression: reasoning_content (qwen3-thinking, deepseek-reasoner, o1) + // streams as `thinking` chunks, separate from `text`. + // Without this, the model's 30-90s reasoning phase looks like dead air to + // the REPL — caught by user feedback during the agents-feature shakedown. + it('chatStream surfaces reasoning_content deltas as `thinking` chunks', async () => { + const chatRepo = mockChatRepo(); + // Adapter that yields a sequence of openai-format chunks: 2 reasoning + // deltas, then 1 content delta, then [DONE]. + const adapter: LlmAdapter = { + kind: 'scripted-thinking', + infer: vi.fn(), + stream: async function*() { + yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'Let me think... ' }, finish_reason: null }] }) }; + yield { data: JSON.stringify({ choices: [{ delta: { reasoning_content: 'OK, ready.' 
+  // Regression: provider_specific_fields.reasoning_content shape (LiteLLM
+  // passthrough from vLLM) is also recognized.
+  it('chatStream recognizes LiteLLM provider_specific_fields.reasoning_content', async () => {
+    const chatRepo = mockChatRepo();
+    const adapter: LlmAdapter = {
+      kind: 'scripted-litellm',
+      infer: vi.fn(),
+      stream: async function*() {
+        yield { data: JSON.stringify({ choices: [{ delta: { provider_specific_fields: { reasoning_content: 'thinking via litellm...' } }, finish_reason: null }] }) };
+        yield { data: JSON.stringify({ choices: [{ delta: { content: 'ok' }, finish_reason: 'stop' }] }) };
+        yield { data: '[DONE]', done: true };
+      },
+    };
+    const svc = new ChatService(
+      mockAgents(), mockLlms(), adapterRegistry(adapter),
+      chatRepo, mockPromptRepo(), mockTools(),
+    );
+
+    const chunks: Array<{ type: string; delta?: string }> = [];
+    for await (const chunk of svc.chatStream({
+      agentName: 'reviewer', userMessage: 'hi', ownerId: 'owner-1',
+    })) {
+      chunks.push({ type: chunk.type, delta: chunk.delta });
+    }
+    expect(chunks.filter((c) => c.type === 'thinking').map((c) => c.delta))
+      .toEqual(['thinking via litellm...']);
+  });
+
   // Regression: per-agent maxIterations override + clamp.
   // Found by /gstack-review on 2026-04-25.
   // Without the clamp, a hostile agent definition with `extras.maxIterations:1000000`