Skip to content

Commit 6e2470f

Browse files
committed
feat(reactive-rpc): 🎸 add support for fragmented ws messages
1 parent f15c9ac commit 6e2470f

File tree

2 files changed

+105
-38
lines changed

2 files changed

+105
-38
lines changed

src/reactive-rpc/server/ws/server/WsServerConnection.ts

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import * as crypto from 'crypto';
22
import * as stream from 'stream';
33
import {WsCloseFrame, WsFrameDecoder, WsFrameHeader, WsFrameOpcode, WsPingFrame, WsPongFrame} from '../codec';
44
import {utf8Size} from '../../../../util/strings/utf8';
5-
import {FanOut} from 'thingies/es2020/fanout';
65
import type {WsFrameEncoder} from '../codec/WsFrameEncoder';
76

87
export type WsServerConnectionSocket = stream.Duplex;
@@ -12,12 +11,6 @@ export class WsServerConnection {
1211
public maxIncomingMessage: number = 2 * 1024 * 1024;
1312
public maxBackpressure: number = 2 * 1024 * 1024;
1413

15-
/**
16-
* If this is not null, then the connection is receiving a stream: a sequence
17-
* of fragment frames.
18-
*/
19-
protected stream: FanOut<Uint8Array> | null = null;
20-
2114
public readonly defaultOnPing = (data: Uint8Array | null): void => {
2215
this.sendPong(data);
2316
};
@@ -30,49 +23,64 @@ export class WsServerConnection {
3023

3124
constructor(protected readonly encoder: WsFrameEncoder, public readonly socket: WsServerConnectionSocket) {
3225
const decoder = new WsFrameDecoder();
33-
let currentFrame: WsFrameHeader | null = null;
26+
let currentFrameHeader: WsFrameHeader | null = null;
27+
let fragmentStartFrameHeader: WsFrameHeader | null = null;
3428
const handleData = (data: Uint8Array): void => {
3529
try {
3630
decoder.push(data);
37-
if (currentFrame) {
38-
const length = currentFrame.length;
39-
if (length <= decoder.reader.size()) {
40-
const buf = new Uint8Array(length);
41-
decoder.copyFrameData(currentFrame, buf, 0);
42-
const isText = currentFrame.opcode === WsFrameOpcode.TEXT;
43-
currentFrame = null;
44-
this.onmessage(buf, isText);
31+
main: while (true) {
32+
if (currentFrameHeader instanceof WsFrameHeader) {
33+
const length = currentFrameHeader.length;
34+
if (length > this.maxIncomingMessage) {
35+
this.onClose(1009, 'TOO_LARGE');
36+
return;
37+
}
38+
if (length <= decoder.reader.size()) {
39+
const buf = new Uint8Array(length);
40+
decoder.copyFrameData(currentFrameHeader, buf, 0);
41+
if (fragmentStartFrameHeader instanceof WsFrameHeader) {
42+
const isText = fragmentStartFrameHeader.opcode === WsFrameOpcode.TEXT;
43+
const isLast = currentFrameHeader.fin === 1;
44+
currentFrameHeader = null;
45+
if (isLast) fragmentStartFrameHeader = null;
46+
this.onfragment(isLast, buf, isText)
47+
} else {
48+
const isText = currentFrameHeader.opcode === WsFrameOpcode.TEXT;
49+
currentFrameHeader = null;
50+
this.onmessage(buf, isText);
51+
}
52+
} else break;
4553
}
46-
}
47-
while (true) {
4854
const frame = decoder.readFrameHeader();
4955
if (!frame) break;
50-
else if (frame instanceof WsPingFrame) this.onping(frame.data);
51-
else if (frame instanceof WsPongFrame) this.onpong(frame.data);
52-
else if (frame instanceof WsCloseFrame) {
56+
if (frame instanceof WsPingFrame) {
57+
this.onping(frame.data);
58+
continue main;
59+
}
60+
if (frame instanceof WsPongFrame) {
61+
this.onpong(frame.data);
62+
continue main;
63+
}
64+
if (frame instanceof WsCloseFrame) {
5365
decoder.readCloseFrameData(frame);
5466
this.onClose(frame.code, frame.reason);
55-
} else if (frame instanceof WsFrameHeader) {
56-
if (this.stream) {
67+
continue main;
68+
}
69+
if (frame instanceof WsFrameHeader) {
70+
if (fragmentStartFrameHeader) {
5771
if (frame.opcode !== WsFrameOpcode.CONTINUE) {
5872
this.onClose(1002, 'DATA');
5973
return;
6074
}
61-
throw new Error('streaming not implemented');
75+
currentFrameHeader = frame;
6276
}
63-
const length = frame.length;
64-
if (length > this.maxIncomingMessage) {
65-
this.onClose(1009, 'TOO_LARGE');
66-
return;
67-
}
68-
if (length <= decoder.reader.size()) {
69-
const buf = new Uint8Array(length);
70-
decoder.copyFrameData(frame, buf, 0);
71-
const isText = frame.opcode === WsFrameOpcode.TEXT;
72-
this.onmessage(buf, isText);
73-
} else {
74-
currentFrame = frame;
77+
if (frame.fin === 0) {
78+
fragmentStartFrameHeader = frame;
79+
currentFrameHeader = frame;
80+
continue main;
7581
}
82+
currentFrameHeader = frame;
83+
continue main;
7684
}
7785
}
7886
} catch (error) {

src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {WsFrameEncoder} from '../../codec/WsFrameEncoder';
44
import {until} from 'thingies';
55
import {WsFrameOpcode} from '../../codec';
66
import {bufferToUint8Array} from '../../../../../util/buffers/bufferToUint8Array';
7+
import {listToUint8} from '../../../../../util/buffers/concat';
78

89
const setup = () => {
910
const socket = new stream.PassThrough();
@@ -80,15 +81,40 @@ describe('.onmessage', () => {
8081
messages.push([data, isUtf8]);
8182
};
8283
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
83-
const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0);
84+
const frame = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0);
8485
encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3);
8586
const payload = encoder.writer.flush();
8687
socket.write(pingFrame);
87-
socket.write(closeFrame);
88+
socket.write(frame);
8889
socket.write(payload);
8990
await until(() => messages.length === 1);
9091
expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]);
9192
});
93+
94+
test('two binary data frames', async () => {
95+
const {socket, encoder, connection} = setup();
96+
const messages: [data: Uint8Array, isUtf8: boolean][] = [];
97+
connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => {
98+
messages.push([data, isUtf8]);
99+
};
100+
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
101+
const frame1 = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0);
102+
encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3);
103+
const payload1 = encoder.writer.flush();
104+
const frame2 = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0);
105+
encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06]), 3);
106+
const payload2 = encoder.writer.flush();
107+
socket.write(pingFrame);
108+
socket.write(listToUint8([
109+
frame1,
110+
payload1,
111+
frame2,
112+
payload2,
113+
]));
114+
await until(() => messages.length === 2);
115+
expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]);
116+
expect(messages[1]).toEqual([new Uint8Array([0x04, 0x05, 0x06]), false]);
117+
});
92118

93119
test('text frame', async () => {
94120
const {socket, encoder, connection} = setup();
@@ -148,3 +174,36 @@ describe('.onmessage', () => {
148174
});
149175
});
150176
});
177+
178+
describe('.fragment', () => {
179+
test('parses out message fragments', async () => {
180+
const {socket, encoder, connection} = setup();
181+
const fragments: [isLast: boolean, data: Uint8Array, isUtf8: boolean][] = [];
182+
connection.onfragment = (isLast: boolean, data: Uint8Array, isUtf8: boolean): void => {
183+
fragments.push([isLast, data, isUtf8]);
184+
};
185+
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
186+
const buf1 = encoder.encodeHdr(0, WsFrameOpcode.BINARY, 3, 0);
187+
encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3);
188+
const buf2 = encoder.writer.flush();
189+
const buf3 = encoder.encodeHdr(0, WsFrameOpcode.CONTINUE, 3, 0);
190+
encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06]), 3);
191+
const buf4 = encoder.writer.flush();
192+
const buf5 = encoder.encodeHdr(1, WsFrameOpcode.CONTINUE, 3, 0);
193+
encoder.writer.buf(Buffer.from([0x07, 0x08, 0x09]), 3);
194+
const buf6 = encoder.writer.flush();
195+
socket.write(pingFrame);
196+
socket.write(buf1);
197+
socket.write(buf2);
198+
socket.write(buf3);
199+
socket.write(buf4);
200+
socket.write(buf5);
201+
socket.write(buf6);
202+
await until(() => fragments.length === 3);
203+
expect(fragments).toEqual([
204+
[false, new Uint8Array([0x01, 0x02, 0x03]), false],
205+
[false, new Uint8Array([0x04, 0x05, 0x06]), false],
206+
[true, new Uint8Array([0x07, 0x08, 0x09]), false],
207+
]);
208+
});
209+
});

0 commit comments

Comments
 (0)