Skip to content

Commit 00680d7

Browse files
committed
feat(reactive-rpc): 🎸 implement server builder
1 parent 4e84b00 commit 00680d7

File tree

6 files changed

+144
-27
lines changed

6 files changed

+144
-27
lines changed

src/reactive-rpc/__demos__/ws.ts

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,10 @@
11
// npx ts-node src/reactive-rpc/__demos__/ws.ts
22

3-
import {Http1Server} from '../server/http1/Http1Server';
3+
import {createCaller} from '../common/rpc/__tests__/sample-api';
4+
import {RpcServer} from '../server/http1/RpcServer';
45

5-
const server = Http1Server.start();
6-
7-
server.ws({
8-
path: '/ws',
9-
onConnect: (connection) => {
10-
console.log('CONNECTED');
11-
connection.onmessage = (data, isUtf8) => {
12-
console.log('MESSAGE', data, isUtf8);
13-
};
14-
},
15-
});
16-
17-
server.route({
18-
path: '/hello',
19-
handler: ({res}) => {
20-
res.statusCode = 200;
21-
res.end('Hello World\n');
22-
},
6+
RpcServer.startWithDefaults({
7+
port: 3000,
8+
caller: createCaller(),
9+
logger: console,
2310
});
24-
25-
server.enableHttpPing();
26-
27-
server.start();
28-
29-
console.log(server + '');

src/reactive-rpc/common/rpc/caller/error/RpcError.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export enum RpcErrorCodes {
3434
export type RpcErrorValue = RpcValue<RpcError>;
3535

3636
export class RpcError extends Error implements IRpcError {
37-
public static from(error: unknown) {
37+
public static from(error: unknown): RpcError {
3838
if (error instanceof RpcError) return error;
3939
return RpcError.internal(error);
4040
}

src/reactive-rpc/server/http1/Http1Server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ export class Http1Server implements Printable {
135135
const codecs = this.codecs;
136136
const ip = this.findIp(req);
137137
const token = this.findToken(req);
138-
const ctx = new Http1ConnectionContext(res, path, query, ip, token, match.params, new NullObject(), codecs.value.json, codecs.value.json, codecs.messages.jsonRpc2);
138+
const ctx = new Http1ConnectionContext(req, res, path, query, ip, token, match.params, new NullObject(), codecs.value.json, codecs.value.json, codecs.messages.jsonRpc2);
139139
const handler = match.data.handler;
140140
handler(ctx);
141141
} catch (error) {
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import * as http from 'http';
2+
import {Printable} from '../../../util/print/types';
3+
import {printTree} from '../../../util/print/printTree';
4+
import {Http1Server} from './Http1Server';
5+
import {RpcError} from '../../common/rpc/caller';
6+
import {IncomingBatchMessage, RpcMessageBatchProcessor} from '../../common';
7+
import {ConnectionContext} from './context';
8+
import type {RpcCaller} from '../../common/rpc/caller/RpcCaller';
9+
import type {ServerLogger} from './types';
10+
11+
const DEFAULT_MAX_PAYLOAD = 4 * 1024 * 1024;
12+
13+
export interface RpcServerOpts {
14+
http1: Http1Server;
15+
caller: RpcCaller<any>;
16+
logger?: ServerLogger;
17+
}
18+
19+
export interface RpcServerStartOpts extends Omit<RpcServerOpts, 'http1'> {
20+
port?: number;
21+
server?: http.Server;
22+
}
23+
24+
export class RpcServer implements Printable {
25+
public static readonly create = (opts: RpcServerOpts) => {
26+
const server = new RpcServer(opts);
27+
opts.http1.enableHttpPing();
28+
return server;
29+
};
30+
31+
public static readonly startWithDefaults = (opts: RpcServerStartOpts) => {
32+
const port = opts.port ?? 8080;
33+
const logger = opts.logger ?? console;
34+
const server = http.createServer();
35+
const http1Server = new Http1Server({
36+
server,
37+
});
38+
const rpcServer = RpcServer.create({
39+
caller: opts.caller,
40+
http1: http1Server,
41+
logger,
42+
});
43+
rpcServer.enableDefaults();
44+
http1Server.start();
45+
server.listen(port, () => {
46+
let host = server.address() || 'localhost';
47+
if (typeof host === 'object') host = (host as any).address;
48+
logger.log({msg: 'SERVER_STARTED', host, port});
49+
});
50+
};
51+
52+
public readonly http1: Http1Server;
53+
protected readonly batchProcessor: RpcMessageBatchProcessor<ConnectionContext>;
54+
55+
constructor (protected readonly opts: RpcServerOpts) {
56+
const http1 = this.http1 = opts.http1;
57+
const onInternalError = http1.oninternalerror;
58+
http1.oninternalerror = (error, res, req) => {
59+
if (error instanceof RpcError) {
60+
res.statusCode = 400;
61+
const data = JSON.stringify(error.toJson());
62+
res.end(data);
63+
return;
64+
}
65+
onInternalError(error, res, req);
66+
};
67+
this.batchProcessor = new RpcMessageBatchProcessor<ConnectionContext>({caller: opts.caller});
68+
}
69+
70+
public enableHttpPing(): void {
71+
this.http1.enableHttpPing();
72+
}
73+
74+
public enableHttpRpc(path: string = '/rpc'): void {
75+
const batchProcessor = this.batchProcessor;
76+
const logger = this.opts.logger ?? console;
77+
this.http1.route({
78+
method: 'POST',
79+
path,
80+
handler: async (ctx) => {
81+
const res = ctx.res;
82+
const body = await ctx.body(DEFAULT_MAX_PAYLOAD);
83+
if (!res.socket) return;
84+
try {
85+
const messageCodec = ctx.msgCodec;
86+
const incomingMessages = messageCodec.decodeBatch(ctx.reqCodec, body);
87+
try {
88+
const outgoingMessages = await batchProcessor.onBatch(incomingMessages as IncomingBatchMessage[], ctx);
89+
if (!res.socket) return;
90+
const resCodec = ctx.resCodec;
91+
messageCodec.encodeBatch(resCodec, outgoingMessages);
92+
const buf = resCodec.encoder.writer.flush();
93+
if (!res.socket) return;
94+
res.end(buf);
95+
} catch (error) {
96+
logger.error('HTTP_RPC_PROCESSING', error, {messages: incomingMessages});
97+
throw RpcError.from(error);
98+
}
99+
} catch (error) {
100+
if (typeof error === 'object' && error)
101+
if ((error as any).message === 'Invalid JSON') throw RpcError.badRequest();
102+
throw RpcError.from(error);
103+
}
104+
},
105+
});
106+
}
107+
108+
public enableDefaults(): void {
109+
// this.enableCors();
110+
this.enableHttpPing();
111+
this.enableHttpRpc();
112+
// this.enableWsRpc();
113+
// this.startRouting();
114+
}
115+
116+
// ---------------------------------------------------------------- Printable
117+
118+
public toString(tab: string = ''): string {
119+
return `${this.constructor.name}` + printTree(tab, [
120+
(tab) => `HTTP/1.1 ${this.http1.toString(tab)}`,
121+
]);
122+
}
123+
}

src/reactive-rpc/server/http1/context.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import {getBody} from './util';
2+
import {listToUint8} from '../../../util/buffers/concat';
13
import type * as http from 'http';
24
import type {JsonValueCodec} from '../../../json-pack/codecs/types';
35
import type {RpcMessageCodec} from '../../common/codec/types';
@@ -19,6 +21,7 @@ export interface ConnectionContext<Meta = Record<string, unknown>> {
1921

2022
export class Http1ConnectionContext<Meta = Record<string, unknown>> implements ConnectionContext<Meta> {
2123
constructor(
24+
public readonly req: http.IncomingMessage,
2225
public readonly res: http.ServerResponse,
2326
public path: string,
2427
public query: string,
@@ -88,4 +91,10 @@ export class Http1ConnectionContext<Meta = Record<string, unknown>> implements C
8891
}
8992
}
9093
}
94+
95+
public async body(maxPayload: number): Promise<Uint8Array> {
96+
const list = await getBody(this.req, maxPayload);
97+
const bodyUint8 = listToUint8(list);
98+
return bodyUint8;
99+
}
91100
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export interface ServerLogger {
2+
log(msg: unknown): void;
3+
error(kind: string, error?: Error | unknown | null, meta?: unknown): void;
4+
}

0 commit comments

Comments
 (0)