Skip to content

Commit c9e186c

Browse files
committed
refactor(reactive-rpc): 💡 remove server part from RpcPersistentClient
1 parent 32f1791 commit c9e186c

File tree

3 files changed

+27
-50
lines changed

3 files changed

+27
-50
lines changed

src/reactive-rpc/browser/createBinaryWsRpcClient.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import {CborJsonValueCodec} from "../../json-pack/codecs/cbor";
2-
import {Writer} from "../../util/buffers/Writer";
3-
import {RpcPersistentClient, WebSocketChannel} from "../common";
4-
import {RpcCodec} from "../common/codec/RpcCodec";
5-
import {BinaryRpcMessageCodec} from "../common/codec/binary";
1+
import {CborJsonValueCodec} from '../../json-pack/codecs/cbor';
2+
import {Writer} from '../../util/buffers/Writer';
3+
import {RpcPersistentClient, WebSocketChannel} from '../common';
4+
import {RpcCodec} from '../common/codec/RpcCodec';
5+
import {BinaryRpcMessageCodec} from '../common/codec/binary';
66

77
export const createBinaryWsRpcClient = (url: string) => {
88
const writer = new Writer(1024 * 4);

src/reactive-rpc/browser/createJsonWsRpcClient.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import {JsonJsonValueCodec} from "../../json-pack/codecs/json";
2-
import {Writer} from "../../util/buffers/Writer";
3-
import {RpcPersistentClient, WebSocketChannel} from "../common";
4-
import {RpcCodec} from "../common/codec/RpcCodec";
5-
import {CompactRpcMessageCodec} from "../common/codec/compact";
1+
import {JsonJsonValueCodec} from '../../json-pack/codecs/json';
2+
import {Writer} from '../../util/buffers/Writer';
3+
import {RpcPersistentClient, WebSocketChannel} from '../common';
4+
import {RpcCodec} from '../common/codec/RpcCodec';
5+
import {CompactRpcMessageCodec} from '../common/codec/compact';
66

77
export const createJsonWsRpcClient = (url: string) => {
88
const writer = new Writer(1024 * 4);

src/reactive-rpc/common/rpc/RpcPersistentClient.ts

Lines changed: 17 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,15 @@
1+
import * as msg from '../messages';
12
import {firstValueFrom, Observable, ReplaySubject, timer} from 'rxjs';
23
import {filter, first, share, switchMap, takeUntil} from 'rxjs/operators';
3-
import * as msg from '../messages';
44
import {StreamingRpcClient, StreamingRpcClientOptions} from './client/StreamingRpcClient';
5-
import {ApiRpcCaller} from './caller/ApiRpcCaller';
6-
import {RpcDuplex} from '../rpc/RpcDuplex';
7-
import {RpcMessageStreamProcessor, RpcMessageStreamProcessorOptions} from './RpcMessageStreamProcessor';
85
import {PersistentChannel, PersistentChannelParams} from '../channel';
9-
import {RpcCodec} from '../codec/RpcCodec';
6+
import type {RpcCodec} from '../codec/RpcCodec';
107

11-
export interface RpcPersistentClientParams<Ctx = unknown> {
8+
export interface RpcPersistentClientParams {
129
channel: PersistentChannelParams;
1310
codec: RpcCodec;
1411
client?: Omit<StreamingRpcClientOptions, 'send'>;
1512

16-
/**
17-
* @todo Remove this option. Remove server from here.
18-
*/
19-
server?: Omit<RpcMessageStreamProcessorOptions<Ctx>, 'send'>;
20-
2113
/**
2214
* Number of milliseconds to periodically send keep-alive ".ping" notification
2315
* messages. If not specified, will default to 15,000 (15 seconds). If 0, will
@@ -35,59 +27,44 @@ export interface RpcPersistentClientParams<Ctx = unknown> {
3527
/**
3628
* RPC client which automatically reconnects if disconnected.
3729
*/
38-
export class RpcPersistentClient<Ctx = unknown> {
30+
export class RpcPersistentClient {
3931
public channel: PersistentChannel;
40-
public rpc?: RpcDuplex<Ctx>;
41-
public readonly rpc$ = new ReplaySubject<RpcDuplex<Ctx>>(1);
32+
public rpc?: StreamingRpcClient;
33+
public readonly rpc$ = new ReplaySubject<StreamingRpcClient>(1);
4234

43-
constructor(params: RpcPersistentClientParams<Ctx>) {
35+
constructor(params: RpcPersistentClientParams) {
4436
const ping = params.ping ?? 15000;
4537
const codec = params.codec;
4638
const textEncoder = new TextEncoder();
4739
this.channel = new PersistentChannel(params.channel);
4840
this.channel.open$.pipe(filter((open) => open)).subscribe(() => {
4941
const close$ = this.channel.open$.pipe(filter((open) => !open));
50-
51-
const duplex = new RpcDuplex<Ctx>({
52-
client: new StreamingRpcClient({
53-
...(params.client || {}),
54-
send: (messages: msg.ReactiveRpcClientMessage[]): void => {
55-
const encoded = codec.encode(messages, codec.req);
56-
this.channel.send$(encoded).subscribe();
57-
},
58-
}),
59-
server: new RpcMessageStreamProcessor<Ctx>({
60-
...(params.server || {
61-
caller: new ApiRpcCaller({
62-
api: {},
63-
}),
64-
onNotification: () => {},
65-
}),
66-
send: (messages: (msg.ReactiveRpcServerMessage | msg.NotificationMessage)[]): void => {
67-
const encoded = codec.encode(messages, codec.req);
68-
this.channel.send$(encoded).subscribe();
69-
},
70-
}),
42+
const client = new StreamingRpcClient({
43+
...(params.client || {}),
44+
send: (messages: msg.ReactiveRpcClientMessage[]): void => {
45+
const encoded = codec.encode(messages, codec.req);
46+
this.channel.send$(encoded).subscribe();
47+
},
7148
});
7249

7350
this.channel.message$.pipe(takeUntil(close$)).subscribe((data) => {
7451
const encoded = typeof data === 'string' ? textEncoder.encode(data) : new Uint8Array(data);
7552
const messages = codec.decode(encoded, codec.res);
76-
duplex.onMessages((messages instanceof Array ? messages : [messages]) as msg.ReactiveRpcMessage[], {} as Ctx);
53+
client.onMessages((messages instanceof Array ? messages : [messages]) as msg.ReactiveRpcServerMessage[]);
7754
});
7855

7956
// Send ping notifications to keep the connection alive.
8057
if (ping) {
8158
timer(ping, ping)
8259
.pipe(takeUntil(close$))
8360
.subscribe(() => {
84-
duplex.notify(params.pingMethod || '.ping', undefined);
61+
client.notify(params.pingMethod || '.ping', undefined);
8562
});
8663
}
8764

8865
if (this.rpc) this.rpc.disconnect();
89-
this.rpc = duplex;
90-
this.rpc$.next(duplex);
66+
this.rpc = client;
67+
this.rpc$.next(client);
9168
});
9269
}
9370

0 commit comments

Comments
 (0)