Skip to content

Commit 01bdd3a

Browse files
[IMP] discuss: better bus types
1 parent 010f54e commit 01bdd3a

File tree

5 files changed

+62
-27
lines changed

5 files changed

+62
-27
lines changed

src/client.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import type {
2525
AvailableFeatures,
2626
StartupData
2727
} from "#src/shared/types";
28+
import type { RequestMessage } from "#src/shared/bus-types";
2829
import type { TransportConfig, SessionId, SessionInfo } from "#src/models/session";
2930

3031
interface Consumers {
@@ -273,24 +274,24 @@ export class SfuClient extends EventTarget {
273274
if (this.state !== SfuClientState.CONNECTED) {
274275
throw new Error("Cannot start recording when not connected");
275276
}
276-
return this._bus?.request(
277+
return this._bus!.request(
277278
{
278279
name: CLIENT_REQUEST.START_RECORDING
279280
},
280281
{ batch: true }
281-
) as Promise<boolean>;
282+
);
282283
}
283284

284285
async stopRecording(): Promise<boolean> {
285286
if (this.state !== SfuClientState.CONNECTED) {
286287
throw new Error("Cannot stop recording when not connected");
287288
}
288-
return this._bus?.request(
289+
return this._bus!.request(
289290
{
290291
name: CLIENT_REQUEST.STOP_RECORDING
291292
},
292293
{ batch: true }
293-
) as Promise<boolean>;
294+
);
294295
}
295296

296297
/**
@@ -531,10 +532,10 @@ export class SfuClient extends EventTarget {
531532
});
532533
transport.on("produce", async ({ kind, rtpParameters, appData }, callback, errback) => {
533534
try {
534-
const result = (await this._bus!.request({
535+
const result = await this._bus!.request({
535536
name: CLIENT_REQUEST.INIT_PRODUCER,
536537
payload: { type: appData.type as StreamType, kind, rtpParameters }
537-
})) as { id: string };
538+
});
538539
callback({ id: result.id });
539540
} catch (error) {
540541
errback(error as Error);
@@ -626,7 +627,10 @@ export class SfuClient extends EventTarget {
626627
}
627628
}
628629

629-
private async _handleRequest({ name, payload }: BusMessage): Promise<JSONSerializable | void> {
630+
private async _handleRequest({
631+
name,
632+
payload
633+
}: RequestMessage): Promise<JSONSerializable | void> {
630634
switch (name) {
631635
case SERVER_REQUEST.INIT_CONSUMER: {
632636
const { id, kind, producerId, rtpParameters, sessionId, type, active } = payload;

src/models/session.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
STREAM_TYPE
2222
} from "#src/shared/enums.ts";
2323
import type { BusMessage, JSONSerializable, StartupData, StreamType } from "#src/shared/types";
24+
import type { RequestMessage } from "#src/shared/bus-types";
2425
import type { Bus } from "#src/shared/bus.ts";
2526
import type { Channel } from "#src/models/channel.ts";
2627
import { RECORDER_STATE } from "#src/models/recorder.ts";
@@ -361,15 +362,15 @@ export class Session extends EventEmitter {
361362
this._ctsTransport?.close();
362363
this._stcTransport?.close();
363364
});
364-
this._clientCapabilities = (await this.bus!.request({
365+
this._clientCapabilities = await this.bus!.request({
365366
name: SERVER_REQUEST.INIT_TRANSPORTS,
366367
payload: {
367368
capabilities: this._channel.router!.rtpCapabilities,
368-
stcConfig: this._createTransportConfig(this._stcTransport),
369-
ctsConfig: this._createTransportConfig(this._ctsTransport),
369+
stcConfig: this._createTransportConfig(this._stcTransport!),
370+
ctsConfig: this._createTransportConfig(this._ctsTransport!),
370371
producerOptionsByKind: config.rtc.producerOptionsByKind
371372
}
372-
})) as RtpCapabilities;
373+
});
373374
await Promise.all([
374375
this._ctsTransport.setMaxIncomingBitrate(config.MAX_BITRATE_IN),
375376
this._stcTransport.setMaxOutgoingBitrate(config.MAX_BITRATE_OUT)
@@ -628,7 +629,10 @@ export class Session extends EventEmitter {
628629
}
629630
}
630631

631-
private async _handleRequest({ name, payload }: BusMessage): Promise<JSONSerializable | void> {
632+
private async _handleRequest({
633+
name,
634+
payload
635+
}: RequestMessage): Promise<JSONSerializable | void> {
632636
switch (name) {
633637
case CLIENT_REQUEST.CONNECT_STC_TRANSPORT: {
634638
const { dtlsParameters } = payload;

src/shared/bus-types.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// eslint-disable-next-line node/no-unpublished-import
2+
import type { RtpCapabilities } from "mediasoup-client/lib/types";
3+
import type { CLIENT_REQUEST, SERVER_REQUEST } from "./enums";
4+
import type { BusMessage } from "./types";
5+
6+
export interface RequestMap {
7+
[CLIENT_REQUEST.CONNECT_CTS_TRANSPORT]: void;
8+
[CLIENT_REQUEST.CONNECT_STC_TRANSPORT]: void;
9+
[CLIENT_REQUEST.INIT_PRODUCER]: { id: string };
10+
[CLIENT_REQUEST.START_RECORDING]: boolean;
11+
[CLIENT_REQUEST.STOP_RECORDING]: boolean;
12+
[SERVER_REQUEST.INIT_CONSUMER]: void;
13+
[SERVER_REQUEST.INIT_TRANSPORTS]: RtpCapabilities;
14+
[SERVER_REQUEST.PING]: void;
15+
}
16+
17+
export type RequestName = keyof RequestMap;
18+
19+
export type RequestMessage<T extends RequestName = RequestName> = Extract<BusMessage, { name: T }>;
20+
21+
export type ResponseFrom<T extends RequestName> = RequestMap[T];

src/shared/bus.ts

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import type { WebSocket as NodeWebSocket } from "ws";
22

33
import type { JSONSerializable, BusMessage } from "./types";
4+
import type { RequestMessage, RequestName, ResponseFrom } from "./bus-types";
5+
46
export interface Payload {
57
/** The actual message content */
6-
message: BusMessage;
8+
message: BusMessage | JSONSerializable;
79
/** Request ID if this message expects a response */
810
needResponse?: string;
911
/** Response ID if this message is responding to a request */
@@ -46,11 +48,9 @@ export class Bus {
4648
/** Global ID counter for Bus instances */
4749
private static _idCount = 0;
4850
/** Message handler for incoming messages */
49-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
5051
public onMessage?: (message: BusMessage) => void;
5152
/** Request handler for incoming requests */
52-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
53-
public onRequest?: (request: BusMessage) => Promise<any | void>;
53+
public onRequest?: (request: RequestMessage) => Promise<JSONSerializable | void>;
5454
/** Unique bus instance identifier */
5555
public readonly id: number = Bus._idCount++;
5656
/** Request counter for generating unique request IDs */
@@ -96,16 +96,22 @@ export class Bus {
9696
/**
9797
* Sends a request and waits for a response
9898
*/
99-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
100-
request(message: BusMessage, options: RequestOptions = {}): Promise<JSONSerializable> {
99+
request<T extends RequestName>(
100+
message: RequestMessage<T>,
101+
options: RequestOptions = {}
102+
): Promise<ResponseFrom<T>> {
101103
const { timeout = 5000, batch } = options;
102104
const requestId = this._getNextRequestId();
103105
return new Promise((resolve, reject) => {
104106
const timeoutId = setTimeout(() => {
105107
reject(new Error("bus request timed out"));
106108
this._pendingRequests.delete(requestId);
107109
}, timeout);
108-
this._pendingRequests.set(requestId, { resolve, reject, timeout: timeoutId });
110+
this._pendingRequests.set(requestId, {
111+
resolve,
112+
reject,
113+
timeout: timeoutId
114+
});
109115
this._sendPayload(message, { needResponse: requestId, batch });
110116
});
111117
}
@@ -138,8 +144,7 @@ export class Bus {
138144
}
139145

140146
private _sendPayload(
141-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
142-
message: BusMessage,
147+
message: BusMessage | JSONSerializable,
143148
options: {
144149
needResponse?: string;
145150
responseTo?: string;
@@ -212,11 +217,11 @@ export class Bus {
212217
}
213218
} else if (needResponse) {
214219
// This is a request that expects a response
215-
const response = await this.onRequest?.(message);
216-
this._sendPayload(response!, { responseTo: needResponse });
220+
const response = await this.onRequest?.(message as RequestMessage);
221+
this._sendPayload(response ?? {}, { responseTo: needResponse });
217222
} else {
218223
// This is a plain message
219-
this.onMessage?.(message);
224+
this.onMessage?.(message as BusMessage);
220225
}
221226
}
222227
}

tests/bus.test.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { expect, describe, jest } from "@jest/globals";
44

55
import { Bus } from "#src/shared/bus";
66
import type { JSONSerializable, BusMessage } from "#src/shared/types";
7+
import { RequestMessage } from "#src/shared/bus-types.ts";
78

89
class MockTargetWebSocket extends EventTarget {
910
send(message: JSONSerializable) {
@@ -74,14 +75,14 @@ describe("Bus API", () => {
7475
return "pong";
7576
}
7677
};
77-
const response = await aliceBus.request("ping" as unknown as BusMessage);
78+
const response = await aliceBus.request("ping" as unknown as RequestMessage);
7879
expect(response).toBe("pong");
7980
});
8081
test("promises are rejected when the bus is closed", async () => {
8182
const { aliceSocket } = mockSocketPair();
8283
const aliceBus = new Bus(aliceSocket as unknown as WebSocket);
8384
let rejected = false;
84-
const promise = aliceBus.request("ping" as unknown as BusMessage);
85+
const promise = aliceBus.request("ping" as unknown as RequestMessage);
8586
aliceBus.close();
8687
try {
8788
await promise;
@@ -96,7 +97,7 @@ describe("Bus API", () => {
9697
const { aliceSocket } = mockSocketPair();
9798
const aliceBus = new Bus(aliceSocket as unknown as WebSocket);
9899
const timeout = 500;
99-
const promise = aliceBus.request("hello" as unknown as BusMessage, { timeout });
100+
const promise = aliceBus.request("hello" as unknown as RequestMessage, { timeout });
100101
jest.advanceTimersByTime(timeout);
101102
await expect(promise).rejects.toThrow();
102103
jest.useRealTimers();

0 commit comments

Comments
 (0)