Skip to content

Commit 0750845

Browse files
committed
make uimessages from scratch each time
1 parent 405dcdc commit 0750845

File tree

4 files changed

+62
-69
lines changed

4 files changed

+62
-69
lines changed

example/ui/main.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import FilesImages from "./files/FilesImages";
99
import RateLimiting from "./rate_limiting/RateLimiting";
1010
import { WeatherFashion } from "./workflows/WeatherFashion";
1111
import RagBasic from "./rag/RagBasic";
12+
import StreamArray from "./objects/StreamArray";
1213

1314
const convex = new ConvexReactClient(import.meta.env.VITE_CONVEX_URL as string);
1415

src/deltas.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,6 @@ export function getParts<T extends StreamDelta["parts"][number]>(
188188
}
189189
if (cursor !== delta.start) {
190190
if (cursor >= delta.end) {
191-
console.debug(
192-
`Got duplicate delta for stream ${delta.streamId} at ${delta.start}`,
193-
);
194191
continue;
195192
} else if (cursor < delta.start) {
196193
console.warn(

src/react/useDeltaStreams.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
"use client";
22

33
import type { StreamQuery, StreamQueryArgs } from "./types.js";
4+
import type { SyncStreamsReturnValue } from "../client/types.js";
45
import type { FunctionArgs } from "convex/server";
56
import type { StreamArgs, StreamDelta, StreamMessage } from "../validators.js";
6-
import type { SyncStreamsReturnValue } from "@convex-dev/agent";
7+
import { sorted } from "../shared.js";
78
import { useQuery } from "convex/react";
89
import { useState } from "react";
910
import { assert } from "convex-helpers";
@@ -75,10 +76,12 @@ export function useDeltaStreams<
7576
? undefined
7677
: !streamList
7778
? state.deltaStreams?.map(({ streamMessage }) => streamMessage)
78-
: streamList.streams.messages.filter(
79-
({ streamId, order }) =>
80-
!options?.skipStreamIds?.includes(streamId) &&
81-
(!options?.startOrder || order >= options.startOrder),
79+
: sorted(
80+
streamList.streams.messages.filter(
81+
({ streamId, order }) =>
82+
!options?.skipStreamIds?.includes(streamId) &&
83+
(!options?.startOrder || order >= options.startOrder),
84+
),
8285
);
8386

8487
// Get the deltas for all the active streams, if any.

src/react/useStreamingUIMessages.ts

Lines changed: 53 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ import { type UIMessage } from "../UIMessages.js";
66
import {
77
blankUIMessage,
88
getParts,
9-
statusFromStreamStatus,
109
updateFromUIMessageChunks,
11-
updateFromTextStreamParts,
10+
deriveUIMessagesFromTextStreamParts,
1211
} from "../deltas.js";
1312
import { useDeltaStreams } from "./useDeltaStreams.js";
1413

@@ -66,75 +65,68 @@ export function useStreamingUIMessages<
6665

6766
useEffect(() => {
6867
if (!streams) return;
68+
// return if there are no new deltas beyond the cursors
69+
let noNewDeltas = true;
70+
for (const stream of streams) {
71+
const lastDelta = stream.deltas.at(-1);
72+
const cursor = messageState[stream.streamMessage.streamId]?.cursor;
73+
if (!cursor) {
74+
noNewDeltas = false;
75+
break;
76+
}
77+
if (lastDelta && lastDelta.start >= cursor) {
78+
noNewDeltas = false;
79+
break;
80+
}
81+
}
82+
if (noNewDeltas) {
83+
return;
84+
}
6985
const abortController = new AbortController();
7086
void (async () => {
71-
let changed = false;
7287
const newMessageState: Record<
7388
string,
7489
{
7590
uiMessage: UIMessage<METADATA, DATA_PARTS, TOOLS>;
7691
cursor: number;
7792
}
78-
> = {};
79-
for (const stream of streams) {
80-
if (abortController.signal.aborted) return;
81-
const oldState = messageState[stream.streamMessage.streamId];
82-
let uiMessage = oldState?.uiMessage;
83-
if (!oldState) {
84-
changed = true;
85-
uiMessage = blankUIMessage(
86-
stream.streamMessage,
87-
threadId,
88-
) as UIMessage<METADATA, DATA_PARTS, TOOLS>;
89-
}
90-
const { parts, cursor } = getParts<UIMessageChunk>(
91-
stream.deltas,
92-
oldState?.cursor,
93-
);
94-
if (parts.length) {
95-
changed = true;
96-
if (stream.streamMessage.format === "UIMessageChunk") {
97-
uiMessage = (await updateFromUIMessageChunks(
98-
uiMessage,
99-
parts,
100-
)) as UIMessage<METADATA, DATA_PARTS, TOOLS>;
101-
if (
102-
uiMessage.status !== "failed" &&
103-
uiMessage.status !== "success"
104-
) {
105-
uiMessage.status = statusFromStreamStatus(
106-
stream.streamMessage.status,
93+
> = Object.fromEntries(
94+
await Promise.all(
95+
streams.map(async ({ deltas, streamMessage }) => {
96+
const { parts, cursor } = getParts<UIMessageChunk>(deltas, 0);
97+
if (streamMessage.format === "UIMessageChunk") {
98+
// Unfortunately this can't handle resuming from a UIMessage and
99+
// adding more chunks, so we re-create it from scratch each time.
100+
const uiMessage = await updateFromUIMessageChunks(
101+
blankUIMessage(streamMessage, threadId),
102+
parts,
107103
);
104+
return [
105+
streamMessage.streamId,
106+
{
107+
uiMessage,
108+
cursor,
109+
},
110+
];
111+
} else {
112+
const [uiMessages] = deriveUIMessagesFromTextStreamParts(
113+
threadId,
114+
[streamMessage],
115+
[],
116+
deltas,
117+
);
118+
return [
119+
streamMessage.streamId,
120+
{
121+
uiMessage: uiMessages[0],
122+
cursor,
123+
},
124+
];
108125
}
109-
} else if (
110-
stream.streamMessage.format === "TextStreamPart" ||
111-
!stream.streamMessage.format
112-
) {
113-
const updated = updateFromTextStreamParts(
114-
threadId,
115-
stream.streamMessage,
116-
{
117-
streamId: stream.streamMessage.streamId,
118-
cursor,
119-
message: uiMessage,
120-
},
121-
stream.deltas,
122-
)[0];
123-
uiMessage = updated.message as UIMessage<
124-
METADATA,
125-
DATA_PARTS,
126-
TOOLS
127-
>;
128-
} else {
129-
console.error("Unknown format", stream.streamMessage.format);
130-
}
131-
}
132-
newMessageState[stream.streamMessage.streamId] = {
133-
uiMessage,
134-
cursor,
135-
};
136-
}
137-
if (!changed || abortController.signal.aborted) return;
126+
}),
127+
),
128+
);
129+
if (abortController.signal.aborted) return;
138130
setMessageState(newMessageState);
139131
})();
140132
return () => {

0 commit comments

Comments
 (0)