Skip to content

Commit eed1662

Browse files
committed
feat(reactive-rpc): 🎸 integrate WebSocket into RPC server
1 parent 72ac248 commit eed1662

File tree

5 files changed

+199
-80
lines changed

5 files changed

+199
-80
lines changed

src/reactive-rpc/server/http1/Http1Server.ts

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import {Router} from '../../../util/router';
88
import {Printable} from '../../../util/print/types';
99
import {printTree} from '../../../util/print/printTree';
1010
import {PayloadTooLarge} from './errors';
11-
import {findTokenInText} from './util';
12-
import {Http1ConnectionContext} from './context';
11+
import {findTokenInText, setCodecs} from './util';
12+
import {Http1ConnectionContext, WsConnectionContext} from './context';
1313
import {RpcCodecs} from '../../common/codec/RpcCodecs';
1414
import {Codecs} from '../../../json-pack/codecs/Codecs';
1515
import {RpcMessageCodecs} from '../../common/codec/RpcMessageCodecs';
@@ -43,9 +43,10 @@ export interface Http1EndpointDefinition {
4343

4444
export interface WsEndpointDefinition {
4545
path: string;
46-
maxPayload?: number;
46+
maxIncomingMessage?: number;
47+
maxOutgoingBackpressure?: number;
4748
onUpgrade?(req: http.IncomingMessage, connection: WsServerConnection): void;
48-
onConnect(connection: WsServerConnection, req: http.IncomingMessage): void;
49+
onConnect(ctx: WsConnectionContext, req: http.IncomingMessage): void;
4950
}
5051

5152
export interface Http1ServerOpts {
@@ -150,7 +151,7 @@ export class Http1Server implements Printable {
150151
);
151152
const headers = req.headers;
152153
const contentType = headers['content-type'];
153-
if (typeof contentType === 'string') ctx.setCodecs(contentType, codecs);
154+
if (typeof contentType === 'string') setCodecs(ctx, contentType, codecs);
154155
const handler = match.data.handler;
155156
await handler(ctx);
156157
} catch (error) {
@@ -165,7 +166,15 @@ export class Http1Server implements Printable {
165166
protected wsMatcher: RouteMatcher<WsEndpointDefinition> = () => undefined;
166167

167168
private readonly onWsUpgrade = (req: http.IncomingMessage, socket: net.Socket, head: Buffer) => {
168-
const route = req.url || '';
169+
const url = req.url ?? '';
170+
const queryStartIndex = url.indexOf('?');
171+
let path = url;
172+
let query = '';
173+
if (queryStartIndex >= 0) {
174+
path = url.slice(0, queryStartIndex);
175+
query = url.slice(queryStartIndex + 1);
176+
}
177+
const route = (req.method || '') + path;
169178
const match = this.wsMatcher(route);
170179
if (!match) {
171180
socket.end();
@@ -174,14 +183,37 @@ export class Http1Server implements Printable {
174183
const def = match.data;
175184
const headers = req.headers;
176185
const connection = new WsServerConnection(this.wsEncoder, socket as net.Socket, head);
186+
connection.maxIncomingMessage = def.maxIncomingMessage ?? 2 * 1024 * 1024;
187+
connection.maxBackpressure = def.maxOutgoingBackpressure ?? 2 * 1024 * 1024;
177188
if (def.onUpgrade) def.onUpgrade(req, connection);
178189
else {
179190
const secWebSocketKey = headers['sec-websocket-key'] ?? '';
180191
const secWebSocketProtocol = headers['sec-websocket-protocol'] ?? '';
181192
const secWebSocketExtensions = headers['sec-websocket-extensions'] ?? '';
182193
connection.upgrade(secWebSocketKey, secWebSocketProtocol, secWebSocketExtensions);
183194
}
184-
def.onConnect(connection, req);
195+
const codecs = this.codecs;
196+
const ip = this.findIp(req);
197+
const token = this.findToken(req);
198+
const ctx = new WsConnectionContext(
199+
connection,
200+
path,
201+
query,
202+
ip,
203+
token,
204+
match.params,
205+
new NullObject(),
206+
codecs.value.json,
207+
codecs.value.json,
208+
codecs.messages.compact,
209+
);
210+
const contentType = headers['content-type'];
211+
if (typeof contentType === 'string') setCodecs(ctx, contentType, codecs);
212+
else {
213+
const secWebSocketProtocol = headers['sec-websocket-protocol'] ?? '';
214+
if (typeof secWebSocketProtocol === 'string') setCodecs(ctx, secWebSocketProtocol, codecs);
215+
}
216+
def.onConnect(ctx, req);
185217
};
186218

187219
public ws(def: WsEndpointDefinition): void {
@@ -209,23 +241,19 @@ export class Http1Server implements Printable {
209241
*/
210242
public findToken(req: http.IncomingMessage): string {
211243
let token: string = '';
212-
let text: string = '';
213244
const headers = req.headers;
214245
let header: string | string[] | undefined;
215246
header = headers['authorization'];
216-
text = typeof header === 'string' ? header : header?.[0] ?? '';
217-
if (text) token = findTokenInText(text);
247+
if (typeof header === 'string') token = findTokenInText(header);
218248
if (token) return token;
219-
text = req.url || '';
220-
if (text) token = findTokenInText(text);
249+
const url = req.url;
250+
if (typeof url === 'string') token = findTokenInText(url);
221251
if (token) return token;
222252
header = headers['cookie'];
223-
text = typeof header === 'string' ? header : header?.[0] ?? '';
224-
if (text) token = findTokenInText(text);
253+
if (typeof header === 'string') token = findTokenInText(header);
225254
if (token) return token;
226255
header = headers['sec-websocket-protocol'];
227-
text = typeof header === 'string' ? header : header?.[0] ?? '';
228-
if (text) token = findTokenInText(text);
256+
if (typeof header === 'string') token = findTokenInText(header);
229257
return token;
230258
}
231259

src/reactive-rpc/server/http1/RpcServer.ts

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import {Printable} from '../../../util/print/types';
33
import {printTree} from '../../../util/print/printTree';
44
import {Http1Server} from './Http1Server';
55
import {RpcError} from '../../common/rpc/caller';
6-
import {IncomingBatchMessage, RpcMessageBatchProcessor} from '../../common';
7-
import {ConnectionContext} from './context';
6+
import {IncomingBatchMessage, ReactiveRpcClientMessage, ReactiveRpcMessage, RpcMessageBatchProcessor, RpcMessageStreamProcessor} from '../../common';
7+
import {ConnectionContext, WsConnectionContext} from './context';
88
import type {RpcCaller} from '../../common/rpc/caller/RpcCaller';
99
import type {ServerLogger} from './types';
1010

@@ -105,6 +105,61 @@ export class RpcServer implements Printable {
105105
});
106106
}
107107

108+
public enableWsRpc(path: string = '/rpc'): void {
109+
const opts = this.opts;
110+
const logger = opts.logger ?? console;
111+
const caller = opts.caller;
112+
this.http1.ws({
113+
path,
114+
maxIncomingMessage: 2 * 1024 * 1024,
115+
maxOutgoingBackpressure: 2 * 1024 * 1024,
116+
onConnect: (ctx: WsConnectionContext, req: http.IncomingMessage) => {
117+
const connection = ctx.connection;
118+
const reqCodec = ctx.reqCodec;
119+
const resCodec = ctx.resCodec;
120+
const msgCodec = ctx.msgCodec;
121+
const encoder = resCodec.encoder;
122+
const rpc = new RpcMessageStreamProcessor({
123+
caller,
124+
send: (messages: ReactiveRpcMessage[]) => {
125+
try {
126+
const writer = encoder.writer;
127+
writer.reset();
128+
msgCodec.encodeBatch(resCodec, messages);
129+
const encoded = writer.flush();
130+
connection.sendBinMsg(encoded);
131+
} catch (error) {
132+
logger.error('WS_SEND', error, {messages});
133+
connection.close();
134+
}
135+
},
136+
bufferSize: 1,
137+
bufferTime: 0,
138+
});
139+
connection.onmessage = (uint8: Uint8Array, isUtf8: boolean) => {
140+
let messages: ReactiveRpcClientMessage[];
141+
try {
142+
messages = msgCodec.decodeBatch(reqCodec, uint8) as ReactiveRpcClientMessage[];
143+
} catch (error) {
144+
logger.error('RX_RPC_DECODING', error, {codec: reqCodec.id, buf: Buffer.from(uint8).toString('base64')});
145+
connection.close();
146+
return;
147+
}
148+
try {
149+
rpc.onMessages(messages, ctx);
150+
} catch (error) {
151+
logger.error('RX_RPC_PROCESSING', error, messages!);
152+
connection.close();
153+
return;
154+
}
155+
};
156+
connection.onclose = (code: number, reason: string) => {
157+
rpc.stop();
158+
};
159+
},
160+
});
161+
}
162+
108163
public enableDefaults(): void {
109164
// this.enableCors();
110165
this.enableHttpPing();

src/reactive-rpc/server/http1/context.ts

Lines changed: 16 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ import {listToUint8} from '../../../util/buffers/concat';
33
import type * as http from 'http';
44
import type {JsonValueCodec} from '../../../json-pack/codecs/types';
55
import type {RpcMessageCodec} from '../../common/codec/types';
6-
import type {RpcCodecs} from '../../common/codec/RpcCodecs';
7-
8-
const REGEX_CODECS_SPECIFIER = /rpc\.(\w{0,32})\.(\w{0,32})\.(\w{0,32})(?:\-(\w{0,32}))?/;
6+
import type {WsServerConnection} from '../ws/server/WsServerConnection';
97

108
export interface ConnectionContext<Meta = Record<string, unknown>> {
119
path: string;
@@ -34,67 +32,24 @@ export class Http1ConnectionContext<Meta = Record<string, unknown>> implements C
3432
public msgCodec: RpcMessageCodec,
3533
) {}
3634

37-
/**
38-
* @param specifier A string which may contain a codec specifier. For example:
39-
* - `rpc.rx.compact.cbor` for Rx-RPC with compact messages and CBOR values.
40-
* - `rpc.json2.verbose.json` for JSON-RPC 2.0 with verbose messages encoded as JSON.
41-
*/
42-
public setCodecs(specifier: string, codecs: RpcCodecs): void {
43-
const match = REGEX_CODECS_SPECIFIER.exec(specifier);
44-
if (!match) return;
45-
const [, protocol, messageFormat, request, response] = match;
46-
switch (protocol) {
47-
case 'rx': {
48-
switch (messageFormat) {
49-
case 'compact': {
50-
this.msgCodec = codecs.messages.compact;
51-
break;
52-
}
53-
case 'binary': {
54-
this.msgCodec = codecs.messages.binary;
55-
break;
56-
}
57-
}
58-
break;
59-
}
60-
case 'json2': {
61-
this.msgCodec = codecs.messages.jsonRpc2;
62-
break;
63-
}
64-
}
65-
switch (request) {
66-
case 'cbor': {
67-
this.resCodec = this.reqCodec = codecs.value.cbor;
68-
break;
69-
}
70-
case 'json': {
71-
this.resCodec = this.reqCodec = codecs.value.json;
72-
break;
73-
}
74-
case 'msgpack': {
75-
this.resCodec = this.reqCodec = codecs.value.msgpack;
76-
break;
77-
}
78-
}
79-
switch (response) {
80-
case 'cbor': {
81-
this.resCodec = codecs.value.cbor;
82-
break;
83-
}
84-
case 'json': {
85-
this.resCodec = codecs.value.json;
86-
break;
87-
}
88-
case 'msgpack': {
89-
this.resCodec = codecs.value.msgpack;
90-
break;
91-
}
92-
}
93-
}
94-
9535
public async body(maxPayload: number): Promise<Uint8Array> {
9636
const list = await getBody(this.req, maxPayload);
9737
const bodyUint8 = listToUint8(list);
9838
return bodyUint8;
9939
}
10040
}
41+
42+
export class WsConnectionContext<Meta = Record<string, unknown>> implements ConnectionContext<Meta> {
43+
constructor(
44+
public readonly connection: WsServerConnection,
45+
public path: string,
46+
public query: string,
47+
public readonly ip: string,
48+
public token: string,
49+
public readonly params: string[] | null,
50+
public readonly meta: Meta,
51+
public reqCodec: JsonValueCodec,
52+
public resCodec: JsonValueCodec,
53+
public msgCodec: RpcMessageCodec,
54+
) {}
55+
}

src/reactive-rpc/server/http1/util.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import {PayloadTooLarge} from './errors';
2+
import type {ConnectionContext} from './context';
23
import type * as http from 'http';
4+
import type {RpcCodecs} from '../../common/codec/RpcCodecs';
35

46
export const getBody = (request: http.IncomingMessage, max: number): Promise<Buffer[]> => {
57
return new Promise<Buffer[]>((resolve, reject) => {
@@ -32,3 +34,64 @@ export const findTokenInText = (text: string): string => {
3234
if (!match) return '';
3335
return match[1] || '';
3436
};
37+
38+
39+
const REGEX_CODECS_SPECIFIER = /rpc\.(\w{0,32})\.(\w{0,32})\.(\w{0,32})(?:\-(\w{0,32}))?/;
40+
41+
/**
42+
* @param specifier A string which may contain a codec specifier. For example:
43+
* - `rpc.rx.compact.cbor` for Rx-RPC with compact messages and CBOR values.
44+
* - `rpc.json2.verbose.json` for JSON-RPC 2.0 with verbose messages encoded as JSON.
45+
*/
46+
export const setCodecs = (ctx: ConnectionContext, specifier: string, codecs: RpcCodecs): void => {
47+
const match = REGEX_CODECS_SPECIFIER.exec(specifier);
48+
if (!match) return;
49+
const [, protocol, messageFormat, request, response] = match;
50+
switch (protocol) {
51+
case 'rx': {
52+
switch (messageFormat) {
53+
case 'compact': {
54+
ctx.msgCodec = codecs.messages.compact;
55+
break;
56+
}
57+
case 'binary': {
58+
ctx.msgCodec = codecs.messages.binary;
59+
break;
60+
}
61+
}
62+
break;
63+
}
64+
case 'json2': {
65+
ctx.msgCodec = codecs.messages.jsonRpc2;
66+
break;
67+
}
68+
}
69+
switch (request) {
70+
case 'cbor': {
71+
ctx.resCodec = ctx.reqCodec = codecs.value.cbor;
72+
break;
73+
}
74+
case 'json': {
75+
ctx.resCodec = ctx.reqCodec = codecs.value.json;
76+
break;
77+
}
78+
case 'msgpack': {
79+
ctx.resCodec = ctx.reqCodec = codecs.value.msgpack;
80+
break;
81+
}
82+
}
83+
switch (response) {
84+
case 'cbor': {
85+
ctx.resCodec = codecs.value.cbor;
86+
break;
87+
}
88+
case 'json': {
89+
ctx.resCodec = codecs.value.json;
90+
break;
91+
}
92+
case 'msgpack': {
93+
ctx.resCodec = codecs.value.msgpack;
94+
break;
95+
}
96+
}
97+
};

0 commit comments

Comments
 (0)