Skip to content

Commit c6ebb21

Browse files
committed
streams example
1 parent 1cc281d commit c6ebb21

File tree

5 files changed

+366
-0
lines changed

5 files changed

+366
-0
lines changed

convex/_generated/api.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import type * as relationshipsExample from "../relationshipsExample.js";
2121
import type * as retriesExample from "../retriesExample.js";
2222
import type * as rlsExample from "../rlsExample.js";
2323
import type * as sessionsExample from "../sessionsExample.js";
24+
import type * as streamsExample from "../streamsExample.js";
2425
import type * as testingFunctions from "../testingFunctions.js";
2526
import type * as triggersExample from "../triggersExample.js";
2627

@@ -41,6 +42,7 @@ declare const fullApi: ApiFromModules<{
4142
retriesExample: typeof retriesExample;
4243
rlsExample: typeof rlsExample;
4344
sessionsExample: typeof sessionsExample;
45+
streamsExample: typeof streamsExample;
4446
testingFunctions: typeof testingFunctions;
4547
triggersExample: typeof triggersExample;
4648
}>;

convex/schema.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,17 @@ export default defineSchema({
3232
sum_table: defineTable({ sum: v.number() }),
3333
notes: defineTable({ session: v.string(), note: v.string() }),
3434
migrations: migrationsTable,
35+
privateMessages: defineTable({
36+
from: v.string(),
37+
to: v.string(),
38+
message: v.string(),
39+
// we have creation time, but let's say we want to store it separately
40+
sentAt: v.number(),
41+
})
42+
// inbox
43+
.index("to", ["to", "sentAt"])
44+
// outbox
45+
.index("from", ["from", "sentAt"])
46+
// pairwise
47+
.index("from_to", ["from", "to", "sentAt"]),
3548
});

convex/streamsExample.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { v } from "convex/values";
2+
import { api, internal } from "./_generated/api.js";
3+
import type { Doc, Id } from "./_generated/dataModel.js";
4+
import {
5+
action,
6+
internalAction,
7+
internalMutation,
8+
internalQuery,
9+
mutation,
10+
query,
11+
} from "./_generated/server.js";
12+
import { stream, mergedStream } from "convex-helpers/server/stream";
13+
import schema from "./schema.js";
14+
import { paginationOptsValidator } from "convex/server";
15+
16+
export const getInbox = query({
17+
args: {
18+
id: v.string(),
19+
paginationOpts: paginationOptsValidator,
20+
},
21+
handler: async (ctx, args) => {
22+
const messages = await stream(ctx.db, schema)
23+
.query("privateMessages")
24+
.withIndex("to", (q) => q.eq("to", args.id))
25+
.order("desc")
26+
.paginate(args.paginationOpts);
27+
return messages;
28+
},
29+
});
30+
31+
export const getOutbox = query({
32+
args: {
33+
id: v.string(),
34+
paginationOpts: paginationOptsValidator,
35+
},
36+
handler: async (ctx, args) => {
37+
const messages = await ctx.db
38+
.query("privateMessages")
39+
.withIndex("from", (q) => q.eq("from", args.id))
40+
.order("desc")
41+
.paginate(args.paginationOpts);
42+
return messages;
43+
},
44+
});
45+
46+
export const getMessagesBetween = query({
47+
args: {
48+
a: v.string(),
49+
b: v.string(),
50+
paginationOpts: paginationOptsValidator,
51+
},
52+
handler: async (ctx, args) => {
53+
const aToB = stream(ctx.db, schema)
54+
.query("privateMessages")
55+
.withIndex("from_to", (q) => q.eq("from", args.a).eq("to", args.b))
56+
.order("desc");
57+
const bToA = stream(ctx.db, schema)
58+
.query("privateMessages")
59+
.withIndex("from_to", (q) => q.eq("from", args.b).eq("to", args.a))
60+
.order("desc");
61+
62+
// Both indexes have the "sentAt" field after the fields they're doing
63+
// equality on, so they're both sorted by "sentAt" descending, so they
64+
// can be merged together.
65+
const messages = await mergedStream([aToB, bToA], ["sentAt"]).paginate(
66+
args.paginationOpts,
67+
);
68+
return messages;
69+
},
70+
});
71+
72+
export const sendMessage = mutation({
73+
args: {
74+
from: v.string(),
75+
to: v.string(),
76+
message: v.string(),
77+
},
78+
handler: async (ctx, args) => {
79+
await ctx.db.insert("privateMessages", {
80+
from: args.from,
81+
to: args.to,
82+
message: args.message,
83+
sentAt: Date.now(),
84+
});
85+
},
86+
});

src/App.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import Counter from "./components/Counter";
22
import RelationshipExample from "./components/RelationshipExample";
33
import SessionsExample from "./components/SessionsExample";
44
import { HonoExample } from "./components/HonoExample";
5+
import { StreamsExample } from "./components/StreamsExample";
56
import { SessionProvider } from "convex-helpers/react/sessions";
67
import { CacheExample } from "./components/CacheExample";
78
import { ConvexQueryCacheProvider } from "convex-helpers/react/cache";
@@ -21,6 +22,7 @@ export default function App() {
2122
<SessionsExample />
2223
<HonoExample />
2324
<CacheExample />
25+
<StreamsExample />
2426
</ConvexQueryCacheProvider>
2527
</SessionProvider>
2628
</main>

0 commit comments

Comments
 (0)