
Commit 437c599

Merge pull request #2 from get-convex/mikec/update-example
Update the example
2 parents ca4ba44 + bea86f9

File tree: 13 files changed, +402 −446 lines

example/convex/_generated/api.d.ts

Lines changed: 4 additions & 0 deletions
@@ -10,6 +10,8 @@
 
 import type * as chat from "../chat.js";
 import type * as http from "../http.js";
+import type * as messages from "../messages.js";
+import type * as streaming from "../streaming.js";
 
 import type {
   ApiFromModules,
@@ -28,6 +30,8 @@ import type {
 declare const fullApi: ApiFromModules<{
   chat: typeof chat;
   http: typeof http;
+  messages: typeof messages;
+  streaming: typeof streaming;
 }>;
 declare const fullApiWithMounts: typeof fullApi;

example/convex/chat.ts

Lines changed: 38 additions & 136 deletions
@@ -1,148 +1,50 @@
-import {
-  query,
-  mutation,
-  httpAction,
-  ActionCtx,
-  internalQuery,
-} from "./_generated/server";
-import { api, components } from "./_generated/api";
-import {
-  PersistentTextStreaming,
-  StreamId,
-  StreamIdValidator,
-} from "@convex-dev/persistent-text-streaming";
-import { v } from "convex/values";
+import { httpAction } from "./_generated/server";
+import { internal } from "./_generated/api";
+import { StreamId } from "@convex-dev/persistent-text-streaming";
 import { OpenAI } from "openai";
+import { streamingComponent } from "./streaming";
 
-const persistentTextStreaming = new PersistentTextStreaming(
-  components.persistentTextStreaming
-);
-
-export const createChat = mutation({
-  args: {
-    prompt: v.string(),
-  },
-  handler: async (ctx, args) => {
-    const streamId = await persistentTextStreaming.createStream(ctx);
-    const chatId = await ctx.db.insert("chats", {
-      title: "...",
-      prompt: args.prompt,
-      stream: streamId,
-    });
-    return chatId;
-  },
-});
-
-export const getChatBody = query({
-  args: {
-    streamId: StreamIdValidator,
-  },
-  handler: async (ctx, args) => {
-    return await persistentTextStreaming.getStreamBody(
-      ctx,
-      args.streamId as StreamId
-    );
-  },
-});
-
-export const getChatForStream = internalQuery({
-  args: {
-    streamId: StreamIdValidator,
-  },
-  handler: async (ctx, args) => {
-    return await ctx.db
-      .query("chats")
-      .withIndex("by_stream", (q) => q.eq("stream", args.streamId))
-      .first();
-  },
-});
-
-type StreamChatRequestBody = {
-  streamId: string;
-};
-
-async function gptStreamer(
-  ctx: ActionCtx,
-  request: Request,
-  streamId: StreamId,
-  chunkAppender: (text: string) => Promise<void>
-) {
-  let chats = await ctx.runQuery(api.chat.getChats);
-  const prompt = chats[chats.length - 1].prompt;
-  chats = chats.slice(0, -1);
-
-  // Let's pass along the prior context.
-  const context = [];
-
-  const bodies = await Promise.all(
-    chats.map(async (chat) => {
-      const body = await ctx.runQuery(api.chat.getChatBody, {
-        streamId: chat.stream,
-      });
-      return body;
-    })
-  );
-
-  for (let i = 0; i < bodies.length; i++) {
-    const body = bodies[i];
-    const chat = chats[i];
-    context.push({
-      userMessage: chat.prompt,
-      assistantMessage: body,
-    });
-  }
-
-  const openai = new OpenAI();
-  const stream = await openai.chat.completions.create({
-    model: "gpt-4o-mini",
-    messages: [
-      {
-        role: "system",
-        content: `You are a helpful assistant that can answer questions and help with tasks.
-Please provide your response in markdown format.
-
-You are continuing a conversation. The conversation so far is found in the following JSON-formatted value:
-
-${JSON.stringify(context)}`,
-      },
-      {
-        role: "user",
-        content: prompt,
-      },
-    ],
-    stream: true,
-  });
-
-  for await (const part of stream) {
-    const text = part.choices[0]?.delta?.content || "";
-    await chunkAppender(text);
-  }
-}
+const openai = new OpenAI();
 
 export const streamChat = httpAction(async (ctx, request) => {
-  const body = (await request.json()) as StreamChatRequestBody;
-  const response = await persistentTextStreaming.stream(
+  const body = (await request.json()) as {
+    streamId: string;
+  };
+
+  // Start streaming and persisting at the same time while
+  // we immediately return a streaming response to the client
+  const response = await streamingComponent.stream(
     ctx,
     request,
    body.streamId as StreamId,
-    gptStreamer
+    async (ctx, request, streamId, append) => {
+      // Let's grab the history up to now so that the AI has some context
+      const history = await ctx.runQuery(internal.messages.getHistory);
+
+      // Let's kick off a stream request to OpenAI
+      const stream = await openai.chat.completions.create({
+        model: "gpt-4.1-mini",
+        messages: [
+          {
+            role: "system",
+            content: `You are a helpful assistant that can answer questions and help with tasks.
+        Please provide your response in markdown format.
+
+        You are continuing a conversation. The conversation so far is found in the following JSON-formatted value:`,
+          },
+          ...history,
+        ],
+        stream: true,
+      });
+
+      // Append each chunk to the persistent stream as it comes in from OpenAI
+      for await (const part of stream)
+        await append(part.choices[0]?.delta?.content || "");
+    }
   );
+
   response.headers.set("Access-Control-Allow-Origin", "*");
   response.headers.set("Vary", "Origin");
-  return response;
-});
 
-export const getChats = query({
-  args: {},
-  handler: async (ctx) => {
-    return await ctx.db.query("chats").collect();
-  },
-});
-
-export const clearChat = mutation({
-  args: {},
-  handler: async (ctx) => {
-    const chats = await ctx.db.query("chats").collect();
-    await Promise.all(chats.map((chat) => ctx.db.delete(chat._id)));
-  },
+  return response;
 });
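
For reference, a client consumes this endpoint by POSTing the stream ID and reading the response body incrementally. A minimal sketch, assuming a "/chat-stream" route registered in convex/http.ts (that file is not shown in this commit view) and the deployment's site URL:

async function readChatStream(convexSiteUrl: string, streamId: string) {
  // POST the stream ID to the HTTP action (the route name is an assumption)
  const response = await fetch(`${convexSiteUrl}/chat-stream`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ streamId }),
  });
  if (!response.body) throw new Error("No response body to read");

  // Decode chunks as they arrive from the streaming response
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let text = "";
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    text += decoder.decode(value, { stream: true });
  }
  return text;
}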

example/convex/messages.ts

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+import { query, mutation, internalQuery } from "./_generated/server";
+import { StreamId } from "@convex-dev/persistent-text-streaming";
+import { v } from "convex/values";
+import { streamingComponent } from "./streaming";
+
+export const listMessages = query({
+  args: {},
+  handler: async (ctx) => {
+    return await ctx.db.query("userMessages").collect();
+  },
+});
+
+export const clearMessages = mutation({
+  args: {},
+  handler: async (ctx) => {
+    const chats = await ctx.db.query("userMessages").collect();
+    await Promise.all(chats.map((chat) => ctx.db.delete(chat._id)));
+  },
+});
+
+export const sendMessage = mutation({
+  args: {
+    prompt: v.string(),
+  },
+  handler: async (ctx, args) => {
+    const responseStreamId = await streamingComponent.createStream(ctx);
+    const chatId = await ctx.db.insert("userMessages", {
+      prompt: args.prompt,
+      responseStreamId,
+    });
+    return chatId;
+  },
+});
+
+export const getHistory = internalQuery({
+  args: {},
+  handler: async (ctx) => {
+    // Grab all the user messages
+    const allMessages = await ctx.db.query("userMessages").collect();
+
+    // Let's join the user messages with the assistant messages
+    const joinedResponses = await Promise.all(
+      allMessages.map(async (userMessage) => {
+        return {
+          userMessage,
+          responseMessage: await streamingComponent.getStreamBody(
+            ctx,
+            userMessage.responseStreamId as StreamId
+          ),
+        };
+      })
+    );
+
+    return joinedResponses.flatMap((joined) => {
+      const user = {
+        role: "user" as const,
+        content: joined.userMessage.prompt,
+      };
+
+      const assistant = {
+        role: "assistant" as const,
+        content: joined.responseMessage.text,
+      };
+
+      // If the assistant message is empty, it's probably because we have not
+      // started streaming yet, so let's not include it in the history
+      if (!assistant.content) return [user];
+
+      return [user, assistant];
+    });
+  },
+});
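
On the client, these functions are reached through the standard Convex React hooks. A minimal sketch of assumed usage (the real UI lives in the ChatWindow component, which is not part of this diff):

import { useMutation, useQuery } from "convex/react";
import { api } from "../convex/_generated/api";

export function Composer() {
  // Reactive list of user messages; re-renders as rows are inserted
  const messages = useQuery(api.messages.listMessages) ?? [];
  const sendMessage = useMutation(api.messages.sendMessage);

  return (
    <div>
      {messages.map((m) => (
        <p key={m._id}>{m.prompt}</p>
      ))}
      <button onClick={() => void sendMessage({ prompt: "Hello!" })}>
        Send
      </button>
    </div>
  );
}

sendMessage both inserts the userMessages row and creates its response stream, so each prompt is born with the stream that will hold the assistant's reply.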

example/convex/schema.ts

Lines changed: 3 additions & 4 deletions
@@ -3,9 +3,8 @@ import { StreamIdValidator } from "@convex-dev/persistent-text-streaming";
 import { v } from "convex/values";
 
 export default defineSchema({
-  chats: defineTable({
-    title: v.string(),
+  userMessages: defineTable({
     prompt: v.string(),
-    stream: StreamIdValidator,
-  }).index("by_stream", ["stream"]),
+    responseStreamId: StreamIdValidator,
+  }).index("by_stream", ["responseStreamId"]),
 });
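
The retained by_stream index keeps it possible to map a stream back to the message that owns it, the lookup the removed getChatForStream used to do for chats. A hypothetical helper in the new naming, if that lookup were needed again (not part of this commit):

import { internalQuery } from "./_generated/server";
import { StreamIdValidator } from "@convex-dev/persistent-text-streaming";

// Hypothetical: resolve a stream back to its owning userMessages row
// via the by_stream index.
export const getMessageByStream = internalQuery({
  args: { streamId: StreamIdValidator },
  handler: async (ctx, args) => {
    return await ctx.db
      .query("userMessages")
      .withIndex("by_stream", (q) => q.eq("responseStreamId", args.streamId))
      .first();
  },
});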

example/convex/streaming.ts

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+import {
+  PersistentTextStreaming,
+  StreamId,
+  StreamIdValidator,
+} from "@convex-dev/persistent-text-streaming";
+import { components } from "./_generated/api";
+import { query } from "./_generated/server";
+
+export const streamingComponent = new PersistentTextStreaming(
+  components.persistentTextStreaming
+);
+
+export const getStreamBody = query({
+  args: {
+    streamId: StreamIdValidator,
+  },
+  handler: async (ctx, args) => {
+    return await streamingComponent.getStreamBody(
+      ctx,
+      args.streamId as StreamId
+    );
+  },
+});
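
Because getStreamBody is a plain Convex query, a client that is not driving the HTTP stream can still render the reply reactively: the query re-runs as persisted chunks land. A sketch of that fallback, assuming the stream body exposes the same text field that getHistory reads above:

import { useQuery } from "convex/react";
import { api } from "../convex/_generated/api";

export function StreamedReply({ streamId }: { streamId: string }) {
  // Subscribes to the persisted body; updates as new chunks are stored
  const body = useQuery(api.streaming.getStreamBody, { streamId });
  return <p>{body?.text ?? ""}</p>;
}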

example/src/App.tsx

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 import "./App.css";
-import ChatWindow from "./components/chat-window";
+import ChatWindow from "./components/ChatWindow";
 
 function App() {
   return (
