Skip to content

Commit f15c9ac

Browse files
committed
fix(reactive-rpc): 🐛 decode close frame payload
1 parent 00e9f8d commit f15c9ac

File tree

2 files changed

+141
-12
lines changed

2 files changed

+141
-12
lines changed

src/reactive-rpc/server/ws/server/WsServerConnection.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ export class WsServerConnection {
4949
if (!frame) break;
5050
else if (frame instanceof WsPingFrame) this.onping(frame.data);
5151
else if (frame instanceof WsPongFrame) this.onpong(frame.data);
52-
else if (frame instanceof WsCloseFrame) this.onClose(frame.code, frame.reason);
53-
else if (frame instanceof WsFrameHeader) {
52+
else if (frame instanceof WsCloseFrame) {
53+
decoder.readCloseFrameData(frame);
54+
this.onClose(frame.code, frame.reason);
55+
} else if (frame instanceof WsFrameHeader) {
5456
if (this.stream) {
5557
if (frame.opcode !== WsFrameOpcode.CONTINUE) {
5658
this.onClose(1002, 'DATA');

src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts

Lines changed: 137 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import * as stream from 'stream';
22
import {WsServerConnection} from '../WsServerConnection';
33
import {WsFrameEncoder} from '../../codec/WsFrameEncoder';
44
import {until} from 'thingies';
5+
import {WsFrameOpcode} from '../../codec';
6+
import {bufferToUint8Array} from '../../../../../util/buffers/bufferToUint8Array';
57

68
const setup = () => {
79
const socket = new stream.PassThrough();
@@ -10,14 +12,139 @@ const setup = () => {
1012
return {socket, encoder, connection};
1113
};
1214

13-
test('can parse PING frame', async () => {
14-
const {socket, encoder, connection} = setup();
15-
const pings: Uint8Array[] = [];
16-
connection.onping = (data: Uint8Array | null): void => {
17-
if (data) pings.push(data);
18-
};
19-
const pingFrame = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03]));
20-
socket.write(pingFrame);
21-
await until(() => pings.length === 1);
22-
expect(pings[0]).toEqual(new Uint8Array([0x01, 0x02, 0x03]));
15+
describe('.onping', () => {
16+
test('can parse PING frame', async () => {
17+
const {socket, encoder, connection} = setup();
18+
const pings: Uint8Array[] = [];
19+
connection.onping = (data: Uint8Array | null): void => {
20+
if (data) pings.push(data);
21+
};
22+
const pingFrame = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03]));
23+
socket.write(pingFrame);
24+
await until(() => pings.length === 1);
25+
expect(pings[0]).toEqual(new Uint8Array([0x01, 0x02, 0x03]));
26+
});
27+
28+
test('can parse empty PING frame', async () => {
29+
const {socket, encoder, connection} = setup();
30+
const pings: Uint8Array[] = [];
31+
connection.onping = (data: Uint8Array | null): void => {
32+
if (data) pings.push(data);
33+
};
34+
const pingFrame = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03]));
35+
socket.write(pingFrame);
36+
const pingFrame2 = encoder.encodePing(Buffer.from([]));
37+
socket.write(pingFrame2);
38+
await until(() => pings.length === 2);
39+
expect(pings[0]).toEqual(new Uint8Array([0x01, 0x02, 0x03]));
40+
expect(pings[1]).toEqual(new Uint8Array([]));
41+
});
42+
});
43+
44+
describe('.onping', () => {
45+
test('can parse PONG frame', async () => {
46+
const {socket, encoder, connection} = setup();
47+
const pongs: Uint8Array[] = [];
48+
connection.onpong = (data: Uint8Array | null): void => {
49+
if (data) pongs.push(data);
50+
};
51+
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
52+
socket.write(pingFrame);
53+
await until(() => pongs.length === 1);
54+
expect(pongs[0]).toEqual(new Uint8Array([0x01, 0x02, 0x03]));
55+
});
56+
});
57+
58+
describe('.onclose', () => {
59+
test('can parse CLOSE frame', async () => {
60+
const {socket, encoder, connection} = setup();
61+
const closes: [code: number, reason: string][] = [];
62+
connection.onclose = (code: number, reason: string): void => {
63+
closes.push([code, reason]);
64+
};
65+
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
66+
socket.write(pingFrame);
67+
const closeFrame = encoder.encodeClose('OK', 1000);
68+
socket.write(closeFrame);
69+
await until(() => closes.length === 1);
70+
expect(closes[0]).toEqual([1000, 'OK']);
71+
});
72+
});
73+
74+
describe('.onmessage', () => {
75+
describe('un-masked', () => {
76+
test('binary data frame', async () => {
77+
const {socket, encoder, connection} = setup();
78+
const messages: [data: Uint8Array, isUtf8: boolean][] = [];
79+
connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => {
80+
messages.push([data, isUtf8]);
81+
};
82+
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
83+
const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0);
84+
encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3);
85+
const payload = encoder.writer.flush();
86+
socket.write(pingFrame);
87+
socket.write(closeFrame);
88+
socket.write(payload);
89+
await until(() => messages.length === 1);
90+
expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]);
91+
});
92+
93+
test('text frame', async () => {
94+
const {socket, encoder, connection} = setup();
95+
const messages: [data: Uint8Array, isUtf8: boolean][] = [];
96+
connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => {
97+
messages.push([data, isUtf8]);
98+
};
99+
const pingFrame1 = encoder.encodePing(Buffer.from([0x01]));
100+
const pingFrame2 = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03]));
101+
const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.TEXT, 4, 0);
102+
encoder.writer.buf(Buffer.from('asdf'), 4);
103+
const payload = encoder.writer.flush();
104+
socket.write(pingFrame1);
105+
socket.write(pingFrame2);
106+
socket.write(closeFrame);
107+
socket.write(payload);
108+
await until(() => messages.length === 1);
109+
expect(messages[0]).toEqual([bufferToUint8Array(Buffer.from('asdf')), true]);
110+
});
111+
});
112+
113+
describe('masked', () => {
114+
test('binary data frame', async () => {
115+
const {socket, encoder, connection} = setup();
116+
const messages: [data: Uint8Array, isUtf8: boolean][] = [];
117+
connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => {
118+
messages.push([data, isUtf8]);
119+
};
120+
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
121+
const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0x12345678);
122+
encoder.writeBufXor(Buffer.from([0x01, 0x02, 0x03]), 0x12345678);
123+
const payload = encoder.writer.flush();
124+
socket.write(pingFrame);
125+
socket.write(closeFrame);
126+
socket.write(payload);
127+
await until(() => messages.length === 1);
128+
expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]);
129+
});
130+
131+
test('text frame', async () => {
132+
const {socket, encoder, connection} = setup();
133+
const messages: [data: Uint8Array, isUtf8: boolean][] = [];
134+
connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => {
135+
messages.push([data, isUtf8]);
136+
};
137+
const pingFrame1 = encoder.encodePing(Buffer.from([0x01]));
138+
const pingFrame2 = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03]));
139+
const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.TEXT, 4, 0x12345678);
140+
encoder.writeBufXor(Buffer.from('asdf'), 0x12345678);
141+
const payload = encoder.writer.flush();
142+
socket.write(pingFrame1);
143+
socket.write(pingFrame2);
144+
socket.write(closeFrame);
145+
socket.write(payload);
146+
await until(() => messages.length === 1);
147+
expect(messages[0]).toEqual([bufferToUint8Array(Buffer.from('asdf')), true]);
148+
});
149+
});
23150
});

0 commit comments

Comments
 (0)