Skip to content

Commit 803401b

Browse files
committed
feat(reactive-rpc): 🎸 support different request/response codecs in Websocket rpc
1 parent fe690d8 commit 803401b

File tree

4 files changed

+31
-26
lines changed

4 files changed

+31
-26
lines changed

‎src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts‎

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,32 +21,32 @@ if (process.env.TEST_E2E) {
2121
['rpc.rx.compact.json', compact, json, json],
2222
['rpc.rx.compact.cbor', compact, cbor, cbor],
2323
['rpc.rx.compact.msgpack', compact, msgpack, msgpack],
24-
// ['rpc.rx.compact.json-cbor', compact, json, cbor],
25-
// ['rpc.rx.compact.json-msgpack', compact, json, msgpack],
26-
// ['rpc.rx.compact.cbor-json', compact, cbor, json],
27-
// ['rpc.rx.compact.cbor-msgpack', compact, cbor, msgpack],
28-
// ['rpc.rx.compact.msgpack-json', compact, msgpack, json],
29-
// ['rpc.rx.compact.msgpack-cbor', compact, msgpack, cbor],
24+
['rpc.rx.compact.json-cbor', compact, json, cbor],
25+
['rpc.rx.compact.json-msgpack', compact, json, msgpack],
26+
['rpc.rx.compact.cbor-json', compact, cbor, json],
27+
['rpc.rx.compact.cbor-msgpack', compact, cbor, msgpack],
28+
['rpc.rx.compact.msgpack-json', compact, msgpack, json],
29+
['rpc.rx.compact.msgpack-cbor', compact, msgpack, cbor],
3030

3131
['rpc.rx.binary.cbor', binary, cbor, cbor],
3232
['rpc.rx.binary.msgpack', binary, msgpack, msgpack],
3333
['rpc.rx.binary.json', binary, json, json],
34-
// ['rpc.rx.binary.json-cbor', binary, json, cbor],
35-
// ['rpc.rx.binary.json-msgpack', binary, json, msgpack],
36-
// ['rpc.rx.binary.cbor-json', binary, cbor, json],
37-
// ['rpc.rx.binary.cbor-msgpack', binary, cbor, msgpack],
38-
// ['rpc.rx.binary.msgpack-json', binary, msgpack, json],
39-
// ['rpc.rx.binary.msgpack-cbor', binary, msgpack, cbor],
34+
['rpc.rx.binary.json-cbor', binary, json, cbor],
35+
['rpc.rx.binary.json-msgpack', binary, json, msgpack],
36+
['rpc.rx.binary.cbor-json', binary, cbor, json],
37+
['rpc.rx.binary.cbor-msgpack', binary, cbor, msgpack],
38+
['rpc.rx.binary.msgpack-json', binary, msgpack, json],
39+
['rpc.rx.binary.msgpack-cbor', binary, msgpack, cbor],
4040

4141
['rpc.json2.verbose.json', jsonRpc2, json, json],
4242
['rpc.json2.verbose.cbor', jsonRpc2, cbor, cbor],
4343
['rpc.json2.verbose.msgpack', jsonRpc2, msgpack, msgpack],
44-
// ['rpc.json2.verbose.json-cbor', jsonRpc2, json, cbor],
45-
// ['rpc.json2.verbose.json-msgpack', jsonRpc2, json, msgpack],
46-
// ['rpc.json2.verbose.cbor-json', jsonRpc2, cbor, json],
47-
// ['rpc.json2.verbose.cbor-msgpack', jsonRpc2, cbor, msgpack],
48-
// ['rpc.json2.verbose.msgpack-json', jsonRpc2, msgpack, json],
49-
// ['rpc.json2.verbose.msgpack-cbor', jsonRpc2, msgpack, cbor],
44+
['rpc.json2.verbose.json-cbor', jsonRpc2, json, cbor],
45+
['rpc.json2.verbose.json-msgpack', jsonRpc2, json, msgpack],
46+
['rpc.json2.verbose.cbor-json', jsonRpc2, cbor, json],
47+
['rpc.json2.verbose.cbor-msgpack', jsonRpc2, cbor, msgpack],
48+
['rpc.json2.verbose.msgpack-json', jsonRpc2, msgpack, json],
49+
['rpc.json2.verbose.msgpack-cbor', jsonRpc2, msgpack, cbor],
5050
];
5151

5252
for (const [protocolSpecifier, msgCodec, reqCodec, resCodec] of cases) {

‎src/reactive-rpc/common/codec/RpcCodec.ts‎

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,15 @@ export class RpcCodec {
1111
return specifier as RpcSpecifier;
1212
}
1313

14-
public encode(messages: ReactiveRpcMessage[]): Uint8Array {
15-
const valueCodec = this.res;
14+
public encode(messages: ReactiveRpcMessage[], valueCodec: JsonValueCodec): Uint8Array {
1615
const encoder = valueCodec.encoder;
1716
const writer = encoder.writer;
1817
writer.reset();
1918
this.msg.encodeBatch(valueCodec, messages);
2019
return writer.flush();
2120
}
2221

23-
public decode(data: Uint8Array): ReactiveRpcMessage[] {
24-
const valueCodec = this.req;
22+
public decode(data: Uint8Array, valueCodec: JsonValueCodec): ReactiveRpcMessage[] {
2523
const decoder = valueCodec.decoder;
2624
const reader = decoder.reader;
2725
reader.reset(data);

‎src/reactive-rpc/common/rpc/RpcPersistentClient.ts‎

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ export interface RpcPersistentClientParams<Ctx = unknown> {
1212
channel: PersistentChannelParams;
1313
codec: RpcCodec;
1414
client?: Omit<StreamingRpcClientOptions, 'send'>;
15+
16+
/**
17+
* @todo Remove this option. Remove server from here.
18+
*/
1519
server?: Omit<RpcMessageStreamProcessorOptions<Ctx>, 'send'>;
1620

1721
/**
@@ -28,6 +32,9 @@ export interface RpcPersistentClientParams<Ctx = unknown> {
2832
pingMethod?: string;
2933
}
3034

35+
/**
36+
* RPC client which automatically reconnects if disconnected.
37+
*/
3138
export class RpcPersistentClient<Ctx = unknown> {
3239
public channel: PersistentChannel;
3340
public rpc?: RpcDuplex<Ctx>;
@@ -45,7 +52,7 @@ export class RpcPersistentClient<Ctx = unknown> {
4552
client: new StreamingRpcClient({
4653
...(params.client || {}),
4754
send: (messages: msg.ReactiveRpcClientMessage[]): void => {
48-
const encoded = codec.encode(messages);
55+
const encoded = codec.encode(messages, codec.req);
4956
this.channel.send$(encoded).subscribe();
5057
},
5158
}),
@@ -57,15 +64,15 @@ export class RpcPersistentClient<Ctx = unknown> {
5764
onNotification: () => {},
5865
}),
5966
send: (messages: (msg.ReactiveRpcServerMessage | msg.NotificationMessage)[]): void => {
60-
const encoded = codec.encode(messages);
67+
const encoded = codec.encode(messages, codec.req);
6168
this.channel.send$(encoded).subscribe();
6269
},
6370
}),
6471
});
6572

6673
this.channel.message$.pipe(takeUntil(close$)).subscribe((data) => {
6774
const encoded = typeof data === 'string' ? textEncoder.encode(data) : new Uint8Array(data);
68-
const messages = codec.decode(encoded);
75+
const messages = codec.decode(encoded, codec.res);
6976
duplex.onMessages((messages instanceof Array ? messages : [messages]) as msg.ReactiveRpcMessage[], {} as Ctx);
7077
});
7178

‎src/reactive-rpc/common/rpc/__tests__/RpcPersistentClient.spec.ts‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ test('on remote method execution, sends message over WebSocket only once', async
3636
await until(() => onSend.mock.calls.length === 1);
3737
expect(onSend).toHaveBeenCalledTimes(1);
3838
const message = onSend.mock.calls[0][0];
39-
const decoded = codec.decode(message);
39+
const decoded = codec.decode(message, codec.req);
4040
const messageDecoded = decoded[0];
4141
expect(messageDecoded).toBeInstanceOf(RequestCompleteMessage);
4242
expect(messageDecoded).toMatchObject(new RequestCompleteMessage(1, 'foo.bar', new Value({foo: 'bar'}, undefined)));

0 commit comments

Comments
 (0)