Skip to content

Commit 1126267

Browse files
committed
fix(rivetkit): skip sending RivetKit messages to conns that do not support it
1 parent dba206d commit 1126267

File tree

5 files changed

+131
-100
lines changed

5 files changed

+131
-100
lines changed

rivetkit-typescript/packages/rivetkit/src/actor/conn/driver.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import type { AnyConn } from "@/actor/conn/mod";
22
import type { AnyActorInstance } from "@/actor/instance/mod";
33
import type { CachedSerializer } from "@/actor/protocol/serde";
4-
import type * as protocol from "@/schemas/client-protocol/mod";
54

65
export enum DriverReadyState {
76
UNKNOWN = -1,
@@ -15,6 +14,22 @@ export interface ConnDriver {
1514
/** The type of driver. Used for debug purposes only. */
1615
type: string;
1716

17+
/**
18+
* If defined, this connection driver talks the RivetKit client driver (see
19+
* schemas/client-protocol/).
20+
*
21+
* If enabled, events like `Init`, subscription events, etc. will be sent
22+
* to this connection.
23+
*/
24+
rivetKitProtocol?: {
25+
/** Sends a RivetKit client message. */
26+
sendMessage(
27+
actor: AnyActorInstance,
28+
conn: AnyConn,
29+
message: CachedSerializer<any, any, any>,
30+
): void;
31+
};
32+
1833
/**
1934
* Unique request ID provided by the underlying provider. If none is
2035
* available for this conn driver, a random UUID is generated.
@@ -29,12 +44,6 @@ export interface ConnDriver {
2944
**/
3045
hibernatable: boolean;
3146

32-
sendMessage?(
33-
actor: AnyActorInstance,
34-
conn: AnyConn,
35-
message: CachedSerializer<any, any, any>,
36-
): void;
37-
3847
/**
3948
* This returns a promise since we commonly disconnect at the end of a program, and not waiting will cause the socket to not close cleanly.
4049
*/

rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/websocket.ts

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -23,69 +23,71 @@ export function createWebSocketSocket(
2323
requestId,
2424
requestIdBuf,
2525
hibernatable,
26-
sendMessage: (
27-
actor: AnyActorInstance,
28-
conn: AnyConn,
29-
message: CachedSerializer<any, any, any>,
30-
) => {
31-
if (!websocket) {
32-
actor.rLog.warn({
33-
msg: "websocket not open",
34-
connId: conn.id,
35-
});
36-
return;
37-
}
38-
if (websocket.readyState !== DriverReadyState.OPEN) {
39-
actor.rLog.warn({
40-
msg: "attempting to send message to closed websocket, this is likely a bug in RivetKit",
41-
connId: conn.id,
42-
wsReadyState: websocket.readyState,
43-
});
44-
return;
45-
}
26+
rivetKitProtocol: {
27+
sendMessage: (
28+
actor: AnyActorInstance,
29+
conn: AnyConn,
30+
message: CachedSerializer<any, any, any>,
31+
) => {
32+
if (!websocket) {
33+
actor.rLog.warn({
34+
msg: "websocket not open",
35+
connId: conn.id,
36+
});
37+
return;
38+
}
39+
if (websocket.readyState !== DriverReadyState.OPEN) {
40+
actor.rLog.warn({
41+
msg: "attempting to send message to closed websocket, this is likely a bug in RivetKit",
42+
connId: conn.id,
43+
wsReadyState: websocket.readyState,
44+
});
45+
return;
46+
}
4647

47-
const serialized = message.serialize(encoding);
48+
const serialized = message.serialize(encoding);
4849

49-
actor.rLog.debug({
50-
msg: "sending websocket message",
51-
encoding: encoding,
52-
dataType: typeof serialized,
53-
isUint8Array: serialized instanceof Uint8Array,
54-
isArrayBuffer: serialized instanceof ArrayBuffer,
55-
dataLength:
56-
(serialized as any).byteLength ||
57-
(serialized as any).length,
58-
});
50+
actor.rLog.debug({
51+
msg: "sending websocket message",
52+
encoding: encoding,
53+
dataType: typeof serialized,
54+
isUint8Array: serialized instanceof Uint8Array,
55+
isArrayBuffer: serialized instanceof ArrayBuffer,
56+
dataLength:
57+
(serialized as any).byteLength ||
58+
(serialized as any).length,
59+
});
5960

60-
// Convert Uint8Array to ArrayBuffer for proper transmission
61-
if (serialized instanceof Uint8Array) {
62-
const buffer = serialized.buffer.slice(
63-
serialized.byteOffset,
64-
serialized.byteOffset + serialized.byteLength,
65-
);
66-
// Handle SharedArrayBuffer case
67-
if (buffer instanceof SharedArrayBuffer) {
68-
const arrayBuffer = new ArrayBuffer(buffer.byteLength);
69-
new Uint8Array(arrayBuffer).set(new Uint8Array(buffer));
70-
actor.rLog.debug({
71-
msg: "converted SharedArrayBuffer to ArrayBuffer",
72-
byteLength: arrayBuffer.byteLength,
73-
});
74-
websocket.send(arrayBuffer);
61+
// Convert Uint8Array to ArrayBuffer for proper transmission
62+
if (serialized instanceof Uint8Array) {
63+
const buffer = serialized.buffer.slice(
64+
serialized.byteOffset,
65+
serialized.byteOffset + serialized.byteLength,
66+
);
67+
// Handle SharedArrayBuffer case
68+
if (buffer instanceof SharedArrayBuffer) {
69+
const arrayBuffer = new ArrayBuffer(buffer.byteLength);
70+
new Uint8Array(arrayBuffer).set(new Uint8Array(buffer));
71+
actor.rLog.debug({
72+
msg: "converted SharedArrayBuffer to ArrayBuffer",
73+
byteLength: arrayBuffer.byteLength,
74+
});
75+
websocket.send(arrayBuffer);
76+
} else {
77+
actor.rLog.debug({
78+
msg: "sending ArrayBuffer",
79+
byteLength: buffer.byteLength,
80+
});
81+
websocket.send(buffer);
82+
}
7583
} else {
7684
actor.rLog.debug({
77-
msg: "sending ArrayBuffer",
78-
byteLength: buffer.byteLength,
85+
msg: "sending string data",
86+
length: (serialized as string).length,
7987
});
80-
websocket.send(buffer);
88+
websocket.send(serialized);
8189
}
82-
} else {
83-
actor.rLog.debug({
84-
msg: "sending string data",
85-
length: (serialized as string).length,
86-
});
87-
websocket.send(serialized);
88-
}
90+
},
8991
},
9092

9193
disconnect: async (

rivetkit-typescript/packages/rivetkit/src/actor/conn/mod.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export type ConnId = string;
2323
export type AnyConn = Conn<any, any, any, any, any, any>;
2424

2525
export const CONN_CONNECTED_SYMBOL = Symbol("connected");
26+
export const CONN_SPEAKS_RIVETKIT_SYMBOL = Symbol("speaksRivetKit");
2627
export const CONN_PERSIST_SYMBOL = Symbol("persist");
2728
export const CONN_DRIVER_SYMBOL = Symbol("driver");
2829
export const CONN_ACTOR_SYMBOL = Symbol("actor");
@@ -62,6 +63,10 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
6263
/** Connections exist before being connected to an actor. If true, this connection has been connected. */
6364
[CONN_CONNECTED_SYMBOL] = false;
6465

66+
[CONN_SPEAKS_RIVETKIT_SYMBOL](): boolean {
67+
return this[CONN_DRIVER_SYMBOL]?.rivetKitProtocol !== undefined;
68+
}
69+
6570
#assertConnected() {
6671
if (!this[CONN_CONNECTED_SYMBOL])
6772
throw new InternalError(
@@ -174,11 +179,12 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
174179
[CONN_SEND_MESSAGE_SYMBOL](message: CachedSerializer<any, any, any>) {
175180
if (this[CONN_DRIVER_SYMBOL]) {
176181
const driver = this[CONN_DRIVER_SYMBOL];
177-
if (driver.sendMessage) {
178-
driver.sendMessage(this.#actor, this, message);
182+
183+
if (driver.rivetKitProtocol) {
184+
driver.rivetKitProtocol.sendMessage(this.#actor, this, message);
179185
} else {
180186
this.#actor.rLog.debug({
181-
msg: "conn driver does not support sending messages",
187+
msg: "attempting to send RivetKit protocol message to connection that does not support it",
182188
conn: this.id,
183189
});
184190
}
@@ -199,6 +205,13 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
199205
*/
200206
send(eventName: string, ...args: unknown[]) {
201207
this.#assertConnected();
208+
if (!this[CONN_SPEAKS_RIVETKIT_SYMBOL]) {
209+
this.#actor.rLog.warn({
210+
msg: "cannot send messages to this connection type",
211+
connId: this.id,
212+
connType: this[CONN_DRIVER_SYMBOL]?.type,
213+
});
214+
}
202215

203216
this.#actor.inspector.emitter.emit("eventFired", {
204217
type: "event",

rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
CONN_PERSIST_RAW_SYMBOL,
1212
CONN_PERSIST_SYMBOL,
1313
CONN_SEND_MESSAGE_SYMBOL,
14+
CONN_SPEAKS_RIVETKIT_SYMBOL,
1415
Conn,
1516
type ConnId,
1617
} from "../conn/mod";
@@ -155,30 +156,31 @@ export class ConnectionManager<
155156

156157
conn[CONN_CONNECTED_SYMBOL] = true;
157158

158-
// TODO: Only do this for action messages
159159
// Send init message
160-
const initData = { actorId: this.#actor.id, connectionId: conn.id };
161-
conn[CONN_SEND_MESSAGE_SYMBOL](
162-
new CachedSerializer(
163-
initData,
164-
TO_CLIENT_VERSIONED,
165-
ToClientSchema,
166-
// JSON: identity conversion (no nested data to encode)
167-
(value) => ({
168-
body: {
169-
tag: "Init" as const,
170-
val: value,
171-
},
172-
}),
173-
// BARE/CBOR: identity conversion (no nested data to encode)
174-
(value) => ({
175-
body: {
176-
tag: "Init" as const,
177-
val: value,
178-
},
179-
}),
180-
),
181-
);
160+
if (conn[CONN_SPEAKS_RIVETKIT_SYMBOL]) {
161+
const initData = { actorId: this.#actor.id, connectionId: conn.id };
162+
conn[CONN_SEND_MESSAGE_SYMBOL](
163+
new CachedSerializer(
164+
initData,
165+
TO_CLIENT_VERSIONED,
166+
ToClientSchema,
167+
// JSON: identity conversion (no nested data to encode)
168+
(value) => ({
169+
body: {
170+
tag: "Init" as const,
171+
val: value,
172+
},
173+
}),
174+
// BARE/CBOR: identity conversion (no nested data to encode)
175+
(value) => ({
176+
body: {
177+
tag: "Init" as const,
178+
val: value,
179+
},
180+
}),
181+
),
182+
);
183+
}
182184
}
183185

184186
/**

rivetkit-typescript/packages/rivetkit/src/actor/instance/event-manager.ts

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { bufferToArrayBuffer } from "@/utils";
99
import {
1010
CONN_PERSIST_SYMBOL,
1111
CONN_SEND_MESSAGE_SYMBOL,
12+
CONN_SPEAKS_RIVETKIT_SYMBOL,
1213
type Conn,
1314
} from "../conn/mod";
1415
import type { AnyDatabaseProvider } from "../database";
@@ -215,17 +216,21 @@ export class EventManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
215216
// Send to all subscribers
216217
let sentCount = 0;
217218
for (const connection of subscribers) {
218-
try {
219-
connection[CONN_SEND_MESSAGE_SYMBOL](toClientSerializer);
220-
sentCount++;
221-
} catch (error) {
222-
this.#actor.rLog.error({
223-
msg: "failed to send event to connection",
224-
eventName: name,
225-
connId: connection.id,
226-
error:
227-
error instanceof Error ? error.message : String(error),
228-
});
219+
if (connection[CONN_SPEAKS_RIVETKIT_SYMBOL]) {
220+
try {
221+
connection[CONN_SEND_MESSAGE_SYMBOL](toClientSerializer);
222+
sentCount++;
223+
} catch (error) {
224+
this.#actor.rLog.error({
225+
msg: "failed to send event to connection",
226+
eventName: name,
227+
connId: connection.id,
228+
error:
229+
error instanceof Error
230+
? error.message
231+
: String(error),
232+
});
233+
}
229234
}
230235
}
231236

0 commit comments

Comments
 (0)