Skip to content

Commit acb0447

Browse files
committed
feat(rivetkit): add json protocol support
1 parent cefe601 commit acb0447

File tree

15 files changed

+503
-169
lines changed

15 files changed

+503
-169
lines changed

rivetkit-typescript/packages/rivetkit/src/actor/conn/driver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export interface ConnDriver {
3232
sendMessage?(
3333
actor: AnyActorInstance,
3434
conn: AnyConn,
35-
message: CachedSerializer<protocol.ToClient>,
35+
message: CachedSerializer<any, any, any>,
3636
): void;
3737

3838
/**

rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/websocket.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export function createWebSocketSocket(
2323
sendMessage: (
2424
actor: AnyActorInstance,
2525
conn: AnyConn,
26-
message: CachedSerializer<protocol.ToClient>,
26+
message: CachedSerializer<any, any, any>,
2727
) => {
2828
if (websocket.readyState !== DriverReadyState.OPEN) {
2929
actor.rLog.warn({

rivetkit-typescript/packages/rivetkit/src/actor/conn/mod.ts

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import * as cbor from "cbor-x";
2-
import { ToClientSchema } from "@/actor/client-protocol-schema-json/mod";
32
import type * as protocol from "@/schemas/client-protocol/mod";
43
import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned";
4+
import {
5+
type ToClient as ToClientJson,
6+
ToClientSchema,
7+
} from "@/schemas/client-protocol-zod/mod";
58
import { arrayBuffersEqual, bufferToArrayBuffer } from "@/utils";
69
import type { AnyDatabaseProvider } from "../database";
710
import {
@@ -161,7 +164,7 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
161164
return this.#stateManager.persistRaw;
162165
}
163166

164-
[CONN_SEND_MESSAGE_SYMBOL](message: CachedSerializer<protocol.ToClient>) {
167+
[CONN_SEND_MESSAGE_SYMBOL](message: CachedSerializer<any, any, any>) {
165168
if (this[CONN_DRIVER_SYMBOL]) {
166169
const driver = this[CONN_DRIVER_SYMBOL];
167170
if (driver.sendMessage) {
@@ -194,19 +197,32 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
194197
args,
195198
connId: this.id,
196199
});
200+
const eventData = { name: eventName, args };
197201
this[CONN_SEND_MESSAGE_SYMBOL](
198-
new CachedSerializer<protocol.ToClient>(
199-
{
202+
new CachedSerializer(
203+
eventData,
204+
TO_CLIENT_VERSIONED,
205+
ToClientSchema,
206+
// JSON: args is the raw value (array of arguments)
207+
(value): ToClientJson => ({
200208
body: {
201-
tag: "Event",
209+
tag: "Event" as const,
202210
val: {
203-
name: eventName,
204-
args: bufferToArrayBuffer(cbor.encode(args)),
211+
name: value.name,
212+
args: value.args,
205213
},
206214
},
207-
},
208-
TO_CLIENT_VERSIONED,
209-
ToClientSchema,
215+
}),
216+
// BARE/CBOR: args needs to be CBOR-encoded to ArrayBuffer
217+
(value): protocol.ToClient => ({
218+
body: {
219+
tag: "Event" as const,
220+
val: {
221+
name: value.name,
222+
args: bufferToArrayBuffer(cbor.encode(value.args)),
223+
},
224+
},
225+
}),
210226
),
211227
);
212228
}

rivetkit-typescript/packages/rivetkit/src/actor/instance/event-manager.ts

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import * as cbor from "cbor-x";
2-
import { ToClientSchema } from "@/actor/client-protocol-schema-json/mod";
32
import type * as protocol from "@/schemas/client-protocol/mod";
43
import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned";
4+
import {
5+
type ToClient as ToClientJson,
6+
ToClientSchema,
7+
} from "@/schemas/client-protocol-zod/mod";
58
import { bufferToArrayBuffer } from "@/utils";
69
import {
710
CONN_PERSIST_SYMBOL,
@@ -180,18 +183,31 @@ export class EventManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
180183
}
181184

182185
// Create serialized message
183-
const toClientSerializer = new CachedSerializer<protocol.ToClient>(
184-
{
186+
const eventData = { name, args };
187+
const toClientSerializer = new CachedSerializer(
188+
eventData,
189+
TO_CLIENT_VERSIONED,
190+
ToClientSchema,
191+
// JSON: args is the raw value (array of arguments)
192+
(value): ToClientJson => ({
185193
body: {
186-
tag: "Event",
194+
tag: "Event" as const,
187195
val: {
188-
name,
189-
args: bufferToArrayBuffer(cbor.encode(args)),
196+
name: value.name,
197+
args: value.args,
190198
},
191199
},
192-
},
193-
TO_CLIENT_VERSIONED,
194-
ToClientSchema,
200+
}),
201+
// BARE/CBOR: args needs to be CBOR-encoded to ArrayBuffer
202+
(value): protocol.ToClient => ({
203+
body: {
204+
tag: "Event" as const,
205+
val: {
206+
name: value.name,
207+
args: bufferToArrayBuffer(cbor.encode(value.args)),
208+
},
209+
},
210+
}),
195211
);
196212

197213
// Send to all subscribers

rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import * as cbor from "cbor-x";
22
import invariant from "invariant";
3-
import { ToClientSchema } from "@/actor/client-protocol-schema-json/mod";
43
import type { ActorKey } from "@/actor/mod";
54
import type { Client } from "@/client/client";
65
import { getBaseLogger, getIncludeTarget, type Logger } from "@/common/log";
@@ -11,6 +10,7 @@ import type { Registry } from "@/mod";
1110
import { ACTOR_VERSIONED } from "@/schemas/actor-persist/versioned";
1211
import type * as protocol from "@/schemas/client-protocol/mod";
1312
import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned";
13+
import { ToClientSchema } from "@/schemas/client-protocol-zod/mod";
1414
import { EXTRA_ERROR_LOG, idToStr } from "@/utils";
1515
import type { ActorConfig, InitContext } from "../config";
1616
import type { ConnDriver } from "../conn/driver";
@@ -511,19 +511,26 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
511511
await this.saveState({ immediate: true });
512512

513513
// Send init message
514+
const initData = { actorId: this.id, connectionId: conn.id };
514515
conn[CONN_SEND_MESSAGE_SYMBOL](
515-
new CachedSerializer<protocol.ToClient>(
516-
{
517-
body: {
518-
tag: "Init",
519-
val: {
520-
actorId: this.id,
521-
connectionId: conn.id,
522-
},
523-
},
524-
},
516+
new CachedSerializer(
517+
initData,
525518
TO_CLIENT_VERSIONED,
526519
ToClientSchema,
520+
// JSON: identity conversion (no nested data to encode)
521+
(value) => ({
522+
body: {
523+
tag: "Init" as const,
524+
val: value,
525+
},
526+
}),
527+
// BARE/CBOR: identity conversion (no nested data to encode)
528+
(value) => ({
529+
body: {
530+
tag: "Init" as const,
531+
val: value,
532+
},
533+
}),
527534
),
528535
);
529536

@@ -532,7 +539,17 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
532539

533540
// MARK: - Message Processing
534541
async processMessage(
535-
message: protocol.ToServer,
542+
message: {
543+
body:
544+
| {
545+
tag: "ActionRequest";
546+
val: { id: bigint; name: string; args: unknown };
547+
}
548+
| {
549+
tag: "SubscriptionRequest";
550+
val: { eventName: string; subscribe: boolean };
551+
};
552+
},
536553
conn: Conn<S, CP, CS, V, I, DB>,
537554
) {
538555
await processMessage(message, this, conn, {

0 commit comments

Comments
 (0)