Skip to content

Commit cc64299

Browse files
authored
🤖 fix: clear todos on stream end with smart reconnection handling (#498)
## Problem **TODOs were not being cleared when streams ended**, causing stale todos to persist in the UI until the next user message. Additionally, on page reload, TODOs were incorrectly reconstructed from history even for completed streams. The challenge: We need **different behavior for two reload scenarios**: - **Reconnection during active stream**: Should reconstruct TODOs (work in progress) - **Reload after completed stream**: Should NOT reconstruct TODOs (clean slate) ## Solution ### 1. Clear TODOs on stream end Modified `cleanupStreamState()` to clear `currentTodos` when streams complete (end/abort/error). TODOs are now truly stream-scoped. ### 2. Smart reconstruction on reload **Key insight**: Check buffered events for `stream-start` to detect active streams. - `loadHistoricalMessages()` now accepts `hasActiveStream` parameter - `WorkspaceStore` checks buffered events before loading history - **Active stream** (hasActiveStream=true) → Reconstruct TODOs ✅ - **Completed stream** (hasActiveStream=false) → Don't reconstruct TODOs ✅ - **agentStatus** always reconstructed (persists across sessions) ✅ ### 3. Improved separation of concerns Centralized tool persistence logic in `processToolResult`: - Added `context` parameter: `"streaming" | "historical"` - `loadHistoricalMessages` no longer knows about specific tool behaviors - Each tool declares its own persistence policy in one place ## Implementation **WorkspaceStore checks for active streams:** ```typescript const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? []; const hasActiveStream = pendingEvents.some( (event) => "type" in event && event.type === "stream-start" ); aggregator.loadHistoricalMessages(historicalMsgs, hasActiveStream); ``` **StreamingMessageAggregator handles context:** ```typescript loadHistoricalMessages(messages: CmuxMessage[], hasActiveStream = false) { const context = hasActiveStream ? "streaming" : "historical"; // Process tool results with context this.processToolResult(toolName, input, output, context); } ``` **processToolResult decides based on tool + context:** ```typescript private processToolResult(toolName, input, output, context: "streaming" | "historical") { // TODOs: stream-scoped (only during streaming) if (toolName === "todo_write" && context === "streaming") { this.currentTodos = args.todos; } // agentStatus: persistent (always reconstruct) if (toolName === "status_set") { this.agentStatus = { emoji, message, url }; } } ``` ## Testing Comprehensive test coverage for the full todo lifecycle: ✅ Clear todos on stream end ✅ Clear todos on stream abort ✅ Reconstruct todos when `hasActiveStream=true` (reconnection) ✅ Don't reconstruct todos when `hasActiveStream=false` (completed) ✅ Always reconstruct agentStatus (persists across sessions) ✅ Clear todos on new user message All 146 message-related tests pass. ## Behavior Matrix | Scenario | TODOs | agentStatus | |----------|-------|-------------| | During streaming | ✅ Visible | ✅ Visible | | Stream ends | ❌ Cleared | ✅ Persists | | Reload (active stream) | ✅ Reconstructed | ✅ Reconstructed | | Reload (completed) | ❌ Not reconstructed | ✅ Reconstructed | | New user message | ❌ Cleared | ❌ Cleared | ## Key Insight The **reconnection scenario** is the critical edge case: when a user reloads during an active stream, historical messages contain completed tool calls from the *current* stream, and buffered events contain `stream-start`. In this case, we *should* reconstruct TODOs to show work in progress. This is fundamentally different from reloading after stream completion, where TODOs should remain cleared. --- _Generated with `cmux`_
1 parent 0f870a6 commit cc64299

File tree

4 files changed

+309
-34
lines changed

4 files changed

+309
-34
lines changed

src/components/PinnedTodoList.tsx

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ interface PinnedTodoListProps {
1010

1111
/**
1212
* Pinned TODO list displayed at bottom of chat (before StreamingBarrier).
13-
* Shows current TODOs from active stream only.
13+
* Shows current TODOs from active stream only - automatically cleared when stream ends.
1414
* Reuses TodoList component for consistent styling.
1515
*
1616
* Relies on natural reference stability from MapStore + Aggregator architecture:
1717
* - Aggregator.getCurrentTodos() returns direct reference (not a copy)
1818
* - Reference only changes when todos are actually modified
1919
* - MapStore caches WorkspaceState per version, avoiding unnecessary recomputation
20+
* - Todos are cleared by StreamingMessageAggregator when stream completes
2021
*/
2122
export const PinnedTodoList: React.FC<PinnedTodoListProps> = ({ workspaceId }) => {
2223
const [expanded, setExpanded] = usePersistedState("pinnedTodoExpanded", true);
@@ -27,17 +28,8 @@ export const PinnedTodoList: React.FC<PinnedTodoListProps> = ({ workspaceId }) =
2728
() => workspaceStore.getWorkspaceState(workspaceId).todos
2829
);
2930

30-
// Get streaming state
31-
const canInterrupt = useSyncExternalStore(
32-
(callback) => workspaceStore.subscribeKey(workspaceId, callback),
33-
() => workspaceStore.getWorkspaceState(workspaceId).canInterrupt
34-
);
35-
36-
// When idle (not streaming), only show completed todos for clean summary
37-
// When streaming, show all todos so user can see active work
38-
const displayTodos = canInterrupt ? todos : todos.filter((todo) => todo.status === "completed");
39-
40-
if (displayTodos.length === 0) {
31+
// Todos are cleared when stream ends, so if there are todos they're from an active stream
32+
if (todos.length === 0) {
4133
return null;
4234
}
4335

@@ -57,7 +49,7 @@ export const PinnedTodoList: React.FC<PinnedTodoListProps> = ({ workspaceId }) =
5749
</span>
5850
TODO{expanded ? ":" : ""}
5951
</div>
60-
{expanded && <TodoList todos={displayTodos} />}
52+
{expanded && <TodoList todos={todos} />}
6153
</div>
6254
);
6355
};

src/stores/WorkspaceStore.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -910,17 +910,22 @@ export class WorkspaceStore {
910910
const historicalMsgs = this.historicalMessages.get(workspaceId) ?? [];
911911

912912
if (isCaughtUpMessage(data)) {
913+
// Check if there's an active stream in buffered events (reconnection scenario)
914+
const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? [];
915+
const hasActiveStream = pendingEvents.some(
916+
(event) => "type" in event && event.type === "stream-start"
917+
);
918+
913919
// Load historical messages first
914920
if (historicalMsgs.length > 0) {
915-
aggregator.loadHistoricalMessages(historicalMsgs);
921+
aggregator.loadHistoricalMessages(historicalMsgs, hasActiveStream);
916922
this.historicalMessages.set(workspaceId, []);
917923
}
918924

919925
// Mark that we're replaying buffered history (prevents O(N) scheduling)
920926
this.replayingHistory.add(workspaceId);
921927

922928
// Process buffered stream events now that history is loaded
923-
const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? [];
924929
for (const event of pendingEvents) {
925930
this.processStreamEvent(workspaceId, aggregator, event);
926931
}

src/utils/messages/StreamingMessageAggregator.test.ts

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,246 @@ describe("StreamingMessageAggregator", () => {
137137
expect(messages1).toBe(messages2);
138138
});
139139
});
140+
141+
describe("todo lifecycle", () => {
142+
test("should clear todos when stream ends", () => {
143+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
144+
145+
// Start a stream
146+
aggregator.handleStreamStart({
147+
type: "stream-start",
148+
workspaceId: "test-workspace",
149+
messageId: "msg1",
150+
historySequence: 1,
151+
model: "claude-3-5-sonnet-20241022",
152+
});
153+
154+
// Simulate todo_write tool call
155+
aggregator.handleToolCallStart({
156+
messageId: "msg1",
157+
toolCallId: "tool1",
158+
toolName: "todo_write",
159+
args: {
160+
todos: [
161+
{ content: "Do task 1", status: "in_progress" },
162+
{ content: "Do task 2", status: "pending" },
163+
],
164+
},
165+
tokens: 10,
166+
timestamp: Date.now(),
167+
type: "tool-call-start",
168+
workspaceId: "test-workspace",
169+
});
170+
171+
aggregator.handleToolCallEnd({
172+
type: "tool-call-end",
173+
workspaceId: "test-workspace",
174+
messageId: "msg1",
175+
toolCallId: "tool1",
176+
toolName: "todo_write",
177+
result: { success: true },
178+
});
179+
180+
// Verify todos are set
181+
expect(aggregator.getCurrentTodos()).toHaveLength(2);
182+
expect(aggregator.getCurrentTodos()[0].content).toBe("Do task 1");
183+
184+
// End the stream
185+
aggregator.handleStreamEnd({
186+
type: "stream-end",
187+
workspaceId: "test-workspace",
188+
messageId: "msg1",
189+
metadata: {
190+
historySequence: 1,
191+
timestamp: Date.now(),
192+
model: "claude-3-5-sonnet-20241022",
193+
},
194+
parts: [],
195+
});
196+
197+
// Todos should be cleared
198+
expect(aggregator.getCurrentTodos()).toHaveLength(0);
199+
});
200+
201+
test("should clear todos when stream aborts", () => {
202+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
203+
204+
aggregator.handleStreamStart({
205+
type: "stream-start",
206+
workspaceId: "test-workspace",
207+
messageId: "msg1",
208+
historySequence: 1,
209+
model: "claude-3-5-sonnet-20241022",
210+
});
211+
212+
// Simulate todo_write
213+
aggregator.handleToolCallStart({
214+
messageId: "msg1",
215+
toolCallId: "tool1",
216+
toolName: "todo_write",
217+
args: {
218+
todos: [{ content: "Task", status: "in_progress" }],
219+
},
220+
tokens: 10,
221+
timestamp: Date.now(),
222+
type: "tool-call-start",
223+
workspaceId: "test-workspace",
224+
});
225+
226+
aggregator.handleToolCallEnd({
227+
type: "tool-call-end",
228+
workspaceId: "test-workspace",
229+
messageId: "msg1",
230+
toolCallId: "tool1",
231+
toolName: "todo_write",
232+
result: { success: true },
233+
});
234+
235+
expect(aggregator.getCurrentTodos()).toHaveLength(1);
236+
237+
// Abort the stream
238+
aggregator.handleStreamAbort({
239+
type: "stream-abort",
240+
workspaceId: "test-workspace",
241+
messageId: "msg1",
242+
metadata: {},
243+
});
244+
245+
// Todos should be cleared
246+
expect(aggregator.getCurrentTodos()).toHaveLength(0);
247+
});
248+
249+
test("should reconstruct todos on reload ONLY when reconnecting to active stream", () => {
250+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
251+
252+
const historicalMessage = {
253+
id: "msg1",
254+
role: "assistant" as const,
255+
parts: [
256+
{
257+
type: "dynamic-tool" as const,
258+
toolCallId: "tool1",
259+
toolName: "todo_write",
260+
state: "output-available" as const,
261+
input: {
262+
todos: [
263+
{ content: "Historical task 1", status: "completed" },
264+
{ content: "Historical task 2", status: "completed" },
265+
],
266+
},
267+
output: { success: true },
268+
},
269+
],
270+
metadata: {
271+
historySequence: 1,
272+
timestamp: Date.now(),
273+
model: "claude-3-5-sonnet-20241022",
274+
},
275+
};
276+
277+
// Scenario 1: Reload with active stream (hasActiveStream = true)
278+
aggregator.loadHistoricalMessages([historicalMessage], true);
279+
expect(aggregator.getCurrentTodos()).toHaveLength(2);
280+
expect(aggregator.getCurrentTodos()[0].content).toBe("Historical task 1");
281+
282+
// Reset for next scenario
283+
const aggregator2 = new StreamingMessageAggregator(TEST_CREATED_AT);
284+
285+
// Scenario 2: Reload without active stream (hasActiveStream = false)
286+
aggregator2.loadHistoricalMessages([historicalMessage], false);
287+
expect(aggregator2.getCurrentTodos()).toHaveLength(0);
288+
});
289+
290+
test("should reconstruct agentStatus but NOT todos when no active stream", () => {
291+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
292+
293+
const historicalMessage = {
294+
id: "msg1",
295+
role: "assistant" as const,
296+
parts: [
297+
{
298+
type: "dynamic-tool" as const,
299+
toolCallId: "tool1",
300+
toolName: "todo_write",
301+
state: "output-available" as const,
302+
input: {
303+
todos: [{ content: "Task 1", status: "completed" }],
304+
},
305+
output: { success: true },
306+
},
307+
{
308+
type: "dynamic-tool" as const,
309+
toolCallId: "tool2",
310+
toolName: "status_set",
311+
state: "output-available" as const,
312+
input: { emoji: "🔧", message: "Working on it" },
313+
output: { success: true, emoji: "🔧", message: "Working on it" },
314+
},
315+
],
316+
metadata: {
317+
historySequence: 1,
318+
timestamp: Date.now(),
319+
model: "claude-3-5-sonnet-20241022",
320+
},
321+
};
322+
323+
// Load without active stream
324+
aggregator.loadHistoricalMessages([historicalMessage], false);
325+
326+
// agentStatus should be reconstructed (persists across sessions)
327+
expect(aggregator.getAgentStatus()).toEqual({ emoji: "🔧", message: "Working on it" });
328+
329+
// TODOs should NOT be reconstructed (stream-scoped)
330+
expect(aggregator.getCurrentTodos()).toHaveLength(0);
331+
});
332+
333+
test("should clear todos when new user message arrives during active stream", () => {
334+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
335+
336+
// Simulate an active stream with todos
337+
aggregator.handleStreamStart({
338+
type: "stream-start",
339+
workspaceId: "test-workspace",
340+
messageId: "msg1",
341+
historySequence: 1,
342+
model: "claude-3-5-sonnet-20241022",
343+
});
344+
345+
aggregator.handleToolCallStart({
346+
messageId: "msg1",
347+
toolCallId: "tool1",
348+
toolName: "todo_write",
349+
args: {
350+
todos: [{ content: "Task", status: "completed" }],
351+
},
352+
tokens: 10,
353+
timestamp: Date.now(),
354+
type: "tool-call-start",
355+
workspaceId: "test-workspace",
356+
});
357+
358+
aggregator.handleToolCallEnd({
359+
type: "tool-call-end",
360+
workspaceId: "test-workspace",
361+
messageId: "msg1",
362+
toolCallId: "tool1",
363+
toolName: "todo_write",
364+
result: { success: true },
365+
});
366+
367+
// TODOs should be set
368+
expect(aggregator.getCurrentTodos()).toHaveLength(1);
369+
370+
// Add new user message (simulating user sending a new message)
371+
aggregator.handleMessage({
372+
id: "msg2",
373+
role: "user",
374+
parts: [{ type: "text", text: "Hello" }],
375+
metadata: { historySequence: 2, timestamp: Date.now() },
376+
});
377+
378+
// Todos should be cleared when new user message arrives
379+
expect(aggregator.getCurrentTodos()).toHaveLength(0);
380+
});
381+
});
140382
});

0 commit comments

Comments
 (0)