Skip to content

Commit 3a3f271

Browse files
thomasballingerConvex, Inc.
authored andcommitted
Split transition messages over 5MB into multiple TransitionChunk messages (#41607)
To prevent large Transition messages on slow network connections from making the server appear unresponsive while they download, split Transition messages into chunks. This currently requires npm version 1.27.5 (unreleased). GitOrigin-RevId: 376609422e5496f62c07f3913a0f476733c84b43
1 parent 54021a0 commit 3a3f271

File tree

7 files changed

+192
-16
lines changed

7 files changed

+192
-16
lines changed

src/browser/sync/client.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import {
2323
MutationRequest,
2424
QueryId,
2525
QueryJournal,
26-
RequestId,
2726
ServerMessage,
27+
RequestId,
2828
TS,
2929
UserIdentityAttributes,
3030
} from "./protocol.js";
@@ -485,8 +485,6 @@ export class BaseConvexClient {
485485
void this.webSocketManager.terminate();
486486
throw error;
487487
}
488-
case "Ping":
489-
break; // do nothing
490488
default: {
491489
serverMessage satisfies never;
492490
}

src/browser/sync/client_node.test.ts

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
MutationRequest,
1010
parseServerMessage,
1111
RequestId,
12-
ServerMessage,
12+
WireServerMessage,
1313
} from "./protocol.js";
1414
import {
1515
encodeServerMessage,
@@ -62,7 +62,7 @@ test("BaseConvexClient closes cleanly", () => {
6262
});
6363

6464
test("Tests can encode longs in server messages", () => {
65-
const orig: ServerMessage = {
65+
const orig: WireServerMessage = {
6666
type: "Transition",
6767
startVersion: { querySet: 0, identity: 0, ts: Long.fromNumber(0) },
6868
endVersion: { querySet: 1, identity: 0, ts: Long.fromNumber(1) },
@@ -103,7 +103,7 @@ test("Actions can be called immediately", async () => {
103103
});
104104
});
105105

106-
function actionSuccess(requestId: RequestId): ServerMessage {
106+
function actionSuccess(requestId: RequestId): WireServerMessage {
107107
return {
108108
type: "ActionResponse",
109109
requestId: requestId,
@@ -779,3 +779,74 @@ test("Query unsubscription triggers empty transition for listeners", async () =>
779779
await client.close();
780780
}, true);
781781
});
782+
783+
test("TransitionChunk messages are assembled into a Transition", async () => {
784+
await withInMemoryWebSocket(async ({ address, receive, send }) => {
785+
const client = new BaseConvexClient(address, () => null, {
786+
webSocketConstructor: nodeWebSocket,
787+
unsavedChangesWarning: false,
788+
});
789+
expect((await receive()).type).toEqual("Connect");
790+
expect((await receive()).type).toEqual("ModifyQuerySet");
791+
792+
const fullTransition: WireServerMessage = {
793+
type: "Transition",
794+
startVersion: {
795+
querySet: 0,
796+
ts: Long.fromNumber(0),
797+
identity: 0,
798+
},
799+
endVersion: {
800+
querySet: 1,
801+
ts: Long.fromNumber(1000),
802+
identity: 0,
803+
},
804+
modifications: [
805+
{
806+
type: "QueryUpdated",
807+
queryId: 0,
808+
value: { result: "chunk test data" },
809+
logLines: [],
810+
journal: null,
811+
},
812+
],
813+
};
814+
815+
const fullJson = encodeServerMessage(fullTransition);
816+
const midpoint = Math.floor(fullJson.length / 2);
817+
const chunk1 = fullJson.substring(0, midpoint);
818+
const chunk2 = fullJson.substring(midpoint);
819+
820+
send({
821+
type: "TransitionChunk",
822+
chunk: chunk1,
823+
partNumber: 0,
824+
totalParts: 2,
825+
messageLength: fullJson.length,
826+
});
827+
828+
expect(client.getMaxObservedTimestamp()).toBeUndefined();
829+
830+
send({
831+
type: "TransitionChunk",
832+
chunk: chunk2,
833+
partNumber: 1,
834+
totalParts: 2,
835+
messageLength: fullJson.length,
836+
});
837+
838+
// synchronously, it's still undefined
839+
expect(client.getMaxObservedTimestamp()).toEqual(undefined);
840+
841+
for (let i = 0; i < 10; i++) {
842+
if (client.getMaxObservedTimestamp()) {
843+
break;
844+
}
845+
await new Promise((resolve) => setTimeout(resolve, 100));
846+
}
847+
848+
expect(client.getMaxObservedTimestamp()).toEqual(Long.fromNumber(1000));
849+
850+
await client.close();
851+
});
852+
});

src/browser/sync/client_node_test_helpers.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ import WebSocket, { WebSocketServer } from "ws";
77
// Let's pretend this ws WebSocket is a browser WebSocket (it's very close)
88
export const nodeWebSocket = WebSocket as unknown as typeof window.WebSocket;
99

10-
import { ClientMessage, ServerMessage } from "./protocol.js";
10+
import { ClientMessage, WireServerMessage } from "./protocol.js";
1111
import { QueryToken } from "./udf_path_utils.js";
1212
import { BaseConvexClient } from "./client.js";
1313

1414
export type InMemoryWebSocketTest = (args: {
1515
address: string;
1616
socket: () => WebSocket;
1717
receive: () => Promise<ClientMessage>;
18-
send: (message: ServerMessage) => void;
18+
send: (message: WireServerMessage) => void;
1919
close: () => void;
2020
}) => Promise<void>;
2121

@@ -66,7 +66,7 @@ export async function withInMemoryWebSocket(
6666
const structured = JSON.parse(text);
6767
return structured;
6868
}
69-
function send(message: ServerMessage) {
69+
function send(message: WireServerMessage) {
7070
// eslint-disable-next-line no-console
7171
if (debug) console.debug(` <--${message.type}-- server`);
7272
socket!.send(encodeServerMessage(message));
@@ -97,7 +97,7 @@ export async function withInMemoryWebSocket(
9797
}
9898
}
9999

100-
export function encodeServerMessage(message: ServerMessage): string {
100+
export function encodeServerMessage(message: WireServerMessage): string {
101101
function replacer(_key: string, value: any) {
102102
if (Long.isLong(value)) {
103103
return encodeLong(value);

src/browser/sync/protocol.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ export function longToU64(raw: U64): EncodedU64 {
1919

2020
export function parseServerMessage(
2121
encoded: EncodedServerMessage,
22-
): ServerMessage {
22+
): WireServerMessage {
2323
switch (encoded.type) {
2424
case "FatalError":
2525
case "AuthError":
2626
case "ActionResponse":
27+
case "TransitionChunk":
2728
case "Ping": {
2829
return { ...encoded };
2930
}
@@ -262,6 +263,14 @@ export type Transition = {
262263
serverTs?: number;
263264
};
264265

266+
export type TransitionChunk = {
267+
type: "TransitionChunk";
268+
chunk: string;
269+
partNumber: number;
270+
totalParts: number;
271+
messageLength: number;
272+
};
273+
265274
type MutationSuccess = {
266275
type: "MutationResponse";
267276
requestId: RequestId;
@@ -312,11 +321,20 @@ type Ping = {
312321
type: "Ping";
313322
};
314323

324+
// Server Messages without the messages only visible to WebSocketManager
315325
export type ServerMessage =
316326
| Transition
317327
| MutationResponse
318328
| ActionResponse
319329
| FatalError
330+
| AuthError;
331+
332+
export type WireServerMessage =
333+
| Transition
334+
| TransitionChunk
335+
| MutationResponse
336+
| ActionResponse
337+
| FatalError
320338
| AuthError
321339
| Ping;
322340

@@ -329,6 +347,7 @@ type EncodedMutationResponse = MutationFailed | EncodedMutationSuccess;
329347

330348
type EncodedServerMessage =
331349
| EncodedTransition
350+
| TransitionChunk
332351
| EncodedMutationResponse
333352
| ActionResponse
334353
| FatalError

src/browser/sync/web_socket_manager.ts

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
parseServerMessage,
66
ServerMessage,
77
Transition,
8+
TransitionChunk,
89
} from "./protocol.js";
910

1011
const CLOSE_NORMAL = 1000;
@@ -162,6 +163,13 @@ export class WebSocketManager {
162163
| (string & {}) // a full serverErrorReason (not just the prefix) or a new one
163164
| null;
164165

166+
// State for assembling the split-up Transition currently being received.
167+
private transitionChunkBuffer: {
168+
chunks: string[];
169+
totalParts: number;
170+
messageLength: number;
171+
} | null = null;
172+
165173
/** Upon HTTPS/WSS failure, the first jittered backoff duration, in ms. */
166174
private readonly defaultInitialBackoff: number;
167175

@@ -239,6 +247,60 @@ export class WebSocketManager {
239247
this.markConnectionStateDirty();
240248
}
241249

250+
private assembleTransition(chunk: TransitionChunk): Transition | null {
251+
if (
252+
chunk.partNumber < 0 ||
253+
chunk.partNumber >= chunk.totalParts ||
254+
chunk.totalParts === 0 ||
255+
(this.transitionChunkBuffer &&
256+
(this.transitionChunkBuffer.totalParts !== chunk.totalParts ||
257+
this.transitionChunkBuffer.messageLength !== chunk.messageLength))
258+
) {
259+
// Throwing an error doesn't crash the client, so clear the buffer.
260+
this.transitionChunkBuffer = null;
261+
throw new Error("Invalid TransitionChunk");
262+
}
263+
264+
if (this.transitionChunkBuffer === null) {
265+
this.transitionChunkBuffer = {
266+
chunks: [],
267+
totalParts: chunk.totalParts,
268+
messageLength: chunk.messageLength,
269+
};
270+
}
271+
272+
if (chunk.partNumber !== this.transitionChunkBuffer.chunks.length) {
273+
// Throwing an error doesn't crash the client, so clear the buffer.
274+
const expectedLength = this.transitionChunkBuffer.chunks.length;
275+
this.transitionChunkBuffer = null;
276+
throw new Error(
277+
`TransitionChunk received out of order: expected part ${expectedLength}, got ${chunk.partNumber}`,
278+
);
279+
}
280+
281+
this.transitionChunkBuffer.chunks.push(chunk.chunk);
282+
283+
if (this.transitionChunkBuffer.chunks.length === chunk.totalParts) {
284+
const fullJson = this.transitionChunkBuffer.chunks.join("");
285+
this.transitionChunkBuffer = null;
286+
287+
if (fullJson.length !== chunk.messageLength) {
288+
throw new Error(
289+
`Assembled Transition length mismatch: expected ${chunk.messageLength}, got ${fullJson.length}`,
290+
);
291+
}
292+
const transition = parseServerMessage(JSON.parse(fullJson));
293+
if (transition.type !== "Transition") {
294+
throw new Error(
295+
`Expected Transition, got ${transition.type} after assembling chunks`,
296+
);
297+
}
298+
return transition;
299+
}
300+
301+
return null;
302+
}
303+
242304
private connect() {
243305
if (this.socket.state === "terminated") {
244306
return;
@@ -304,6 +366,7 @@ export class WebSocketManager {
304366
};
305367
// NB: The WebSocket API calls `onclose` even if connection fails, so we can route all error paths through `onclose`.
306368
ws.onerror = (error) => {
369+
this.transitionChunkBuffer = null;
307370
const message = (error as ErrorEvent).message;
308371
if (message) {
309372
this.logger.log(`WebSocket error message: ${message}`);
@@ -312,8 +375,32 @@ export class WebSocketManager {
312375
ws.onmessage = (message) => {
313376
this.resetServerInactivityTimeout();
314377
const messageLength = message.data.length;
315-
const serverMessage = parseServerMessage(JSON.parse(message.data));
378+
let serverMessage = parseServerMessage(JSON.parse(message.data));
316379
this._logVerbose(`received ws message with type ${serverMessage.type}`);
380+
381+
// Ping's only purpose is to reset the server inactivity timer.
382+
if (serverMessage.type === "Ping") {
383+
return;
384+
}
385+
386+
// TransitionChunks never reach the main client logic.
387+
if (serverMessage.type === "TransitionChunk") {
388+
const transition = this.assembleTransition(serverMessage);
389+
if (!transition) {
390+
return;
391+
}
392+
serverMessage = transition;
393+
this._logVerbose(
394+
`assembled full ws message of type ${serverMessage.type}`,
395+
);
396+
}
397+
398+
if (this.transitionChunkBuffer !== null) {
399+
throw new Error(
400+
`Received unexpected ${serverMessage.type} while buffering TransitionChunks`,
401+
);
402+
}
403+
317404
if (serverMessage.type === "Transition") {
318405
this.reportLargeTransition({
319406
messageLength,
@@ -329,6 +416,7 @@ export class WebSocketManager {
329416
};
330417
ws.onclose = (event) => {
331418
this._logVerbose("begin ws.onclose");
419+
this.transitionChunkBuffer = null;
332420
if (this.lastCloseReason === null) {
333421
// event.reason is often an empty string
334422
this.lastCloseReason = event.reason || `closed with code ${event.code}`;

src/react/auth_websocket.test.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { Long } from "../vendor/long.js";
1414
import {
1515
AuthError,
1616
ClientMessage,
17-
ServerMessage,
17+
WireServerMessage,
1818
} from "../browser/sync/protocol.js";
1919

2020
const testReactClient = (address: string, options?: ConvexReactClientOptions) =>
@@ -1152,7 +1152,7 @@ async function assertReconnectWithAuth(
11521152
}
11531153

11541154
async function simulateAuthError(args: {
1155-
send: (message: ServerMessage) => void;
1155+
send: (message: WireServerMessage) => void;
11561156
close: () => void;
11571157
authError: AuthError;
11581158
}) {

src/react/react_node.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { ConvexReactClient } from "./client.js";
55
import {
66
ClientMessage,
77
QuerySetModification,
8-
ServerMessage,
8+
WireServerMessage,
99
} from "../browser/sync/protocol.js";
1010
import {
1111
nodeWebSocket,
@@ -60,7 +60,7 @@ const expectQuerySetModification = (
6060
return message;
6161
};
6262

63-
function transition(): ServerMessage {
63+
function transition(): WireServerMessage {
6464
return {
6565
type: "Transition",
6666
startVersion: { querySet: 0, identity: 0, ts: Long.fromNumber(0) },

0 commit comments

Comments
 (0)