Skip to content

Commit 27d7693

Browse files
[WIP] recording: state broadcasting, updated tests
1 parent 358ce98 commit 27d7693

File tree

12 files changed

+165
-65
lines changed

12 files changed

+165
-65
lines changed

package-lock.json

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"@typescript-eslint/eslint-plugin": "8.32.1",
3838
"@typescript-eslint/parser": "8.32.1",
3939
"eslint": "8.57.1",
40+
"eslint-config-prettier": "^10.1.8",
4041
"eslint-plugin-import": "^2.25.3",
4142
"eslint-plugin-jest": "28.11.0",
4243
"eslint-plugin-node": "^11.1.0",

src/client.ts

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@ import {
1818
SERVER_REQUEST,
1919
WS_CLOSE_CODE
2020
} from "#src/shared/enums.ts";
21-
import type { JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types";
21+
import type {
22+
JSONSerializable,
23+
StreamType,
24+
BusMessage,
25+
AvailableFeatures,
26+
StartupData
27+
} from "#src/shared/types";
2228
import type { TransportConfig, SessionId, SessionInfo } from "#src/models/session";
2329

2430
interface Consumers {
@@ -55,11 +61,13 @@ export enum CLIENT_UPDATE {
5561
/** A session has left the channel */
5662
DISCONNECT = "disconnect",
5763
/** Session info has changed */
58-
INFO_CHANGE = "info_change"
64+
INFO_CHANGE = "info_change",
65+
CHANNEL_INFO_CHANGE = "channel_info_change"
5966
}
6067
type ClientUpdatePayload =
6168
| { senderId: SessionId; message: JSONSerializable }
6269
| { sessionId: SessionId }
70+
| { isRecording: boolean }
6371
| Record<SessionId, SessionInfo>
6472
| {
6573
type: StreamType;
@@ -142,9 +150,10 @@ export class SfuClient extends EventTarget {
142150
/** Connection errors encountered */
143151
public errors: Error[] = [];
144152
public availableFeatures: AvailableFeatures = {
145-
"rtc": false,
146-
"recording": false,
153+
rtc: false,
154+
recording: false
147155
};
156+
public isRecording: boolean = false;
148157
/** Current client state */
149158
private _state: SfuClientState = SfuClientState.DISCONNECTED;
150159
/** Communication bus */
@@ -266,7 +275,7 @@ export class SfuClient extends EventTarget {
266275
}
267276
return this._bus?.request(
268277
{
269-
name: CLIENT_REQUEST.START_RECORDING,
278+
name: CLIENT_REQUEST.START_RECORDING
270279
},
271280
{ batch: true }
272281
);
@@ -278,7 +287,7 @@ export class SfuClient extends EventTarget {
278287
}
279288
return this._bus?.request(
280289
{
281-
name: CLIENT_REQUEST.STOP_RECORDING,
290+
name: CLIENT_REQUEST.STOP_RECORDING
282291
},
283292
{ batch: true }
284293
);
@@ -473,7 +482,13 @@ export class SfuClient extends EventTarget {
473482
webSocket.addEventListener(
474483
"message",
475484
(message) => {
476-
this.availableFeatures = JSON.parse(message.data) as AvailableFeatures;
485+
if (message.data) {
486+
const { availableFeatures, isRecording } = JSON.parse(
487+
message.data
488+
) as StartupData;
489+
this.availableFeatures = availableFeatures;
490+
this.isRecording = isRecording;
491+
}
477492
resolve(new Bus(webSocket));
478493
},
479494
{ once: true }
@@ -604,6 +619,10 @@ export class SfuClient extends EventTarget {
604619
case SERVER_MESSAGE.INFO_CHANGE:
605620
this._updateClient(CLIENT_UPDATE.INFO_CHANGE, payload);
606621
break;
622+
case SERVER_MESSAGE.CHANNEL_INFO_CHANGE:
623+
this.isRecording = payload.isRecording;
624+
this._updateClient(CLIENT_UPDATE.CHANNEL_INFO_CHANGE, payload);
625+
break;
607626
}
608627
}
609628

src/config.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import type { ProducerOptions } from "mediasoup-client/lib/Producer";
1111
const FALSY_INPUT = new Set(["disable", "false", "none", "no", "0"]);
1212
type LogLevel = "none" | "error" | "warn" | "info" | "debug" | "verbose";
1313
type WorkerLogLevel = "none" | "error" | "warn" | "debug";
14+
const testingMode = Boolean(process.env.JEST_WORKER_ID);
1415

1516
// ------------------------------------------------------------
1617
// ------------------ ENV VARIABLES -----------------------
@@ -22,7 +23,7 @@ type WorkerLogLevel = "none" | "error" | "warn" | "debug";
2223
* e.g: AUTH_KEY=u6bsUQEWrHdKIuYplirRnbBmLbrKV5PxKG7DtA71mng=
2324
*/
2425
export const AUTH_KEY: string = process.env.AUTH_KEY!;
25-
if (!AUTH_KEY && !process.env.JEST_WORKER_ID) {
26+
if (!AUTH_KEY && !testingMode) {
2627
throw new Error(
2728
"AUTH_KEY env variable is required, it is not possible to authenticate requests without it"
2829
);
@@ -34,7 +35,7 @@ if (!AUTH_KEY && !process.env.JEST_WORKER_ID) {
3435
* e.g: PUBLIC_IP=190.165.1.70
3536
*/
3637
export const PUBLIC_IP: string = process.env.PUBLIC_IP!;
37-
if (!PUBLIC_IP && !process.env.JEST_WORKER_ID) {
38+
if (!PUBLIC_IP && !testingMode) {
3839
throw new Error(
3940
"PUBLIC_IP env variable is required, clients cannot establish webRTC connections without it"
4041
);
@@ -67,7 +68,7 @@ export const PORT: number = Number(process.env.PORT) || 8070;
6768
/**
6869
* Whether the recording feature is enabled, false by default.
6970
*/
70-
export const RECORDING: boolean = Boolean(process.env.RECORDING);
71+
export const RECORDING: boolean = Boolean(process.env.RECORDING) || testingMode;
7172

7273
/**
7374
* The number of workers to spawn (up to core limits) to manage RTC servers.
@@ -199,7 +200,7 @@ export const timeouts: TimeoutConfig = Object.freeze({
199200
// how long before a channel is closed after the last session leaves
200201
channel: 60 * 60_000,
201202
// how long to wait to gather messages before sending through the bus
202-
busBatch: process.env.JEST_WORKER_ID ? 10 : 300
203+
busBatch: testingMode ? 10 : 300
203204
});
204205

205206
export const recording = Object.freeze({
@@ -212,16 +213,15 @@ export const recording = Object.freeze({
212213
audioCodec: "aac",
213214
audioLimit: 20,
214215
cameraLimit: 4, // how many camera can be merged into one recording
215-
screenLimit: 1,
216+
screenLimit: 1
216217
});
217218

218219
// TODO: This should probably be env variable, and at least documented so that deployment can open these ports.
219220
export const dynamicPorts = Object.freeze({
220221
min: 50000,
221-
max: 59999,
222+
max: 59999
222223
});
223224

224-
225225
// how many errors can occur before the session is closed, recovery attempts will be made until this limit is reached
226226
export const maxSessionErrors: number = 6;
227227

src/models/channel.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
} from "#src/models/session.ts";
1515
import { Recorder } from "#src/models/recorder.ts";
1616
import { getWorker, type RtcWorker } from "#src/services/resources.ts";
17+
import { SERVER_MESSAGE } from "#src/shared/enums.ts";
1718

1819
const logger = new Logger("CHANNEL");
1920

@@ -187,7 +188,11 @@ export class Channel extends EventEmitter {
187188
const now = new Date();
188189
this.createDate = now.toISOString();
189190
this.remoteAddress = remoteAddress;
190-
this.recorder = config.recording.enabled && options.recordingAddress ? new Recorder(this, options.recordingAddress) : undefined;
191+
this.recorder =
192+
config.recording.enabled && options.recordingAddress
193+
? new Recorder(this, options.recordingAddress)
194+
: undefined;
195+
this.recorder?.on("stateChange", () => this._broadcastState());
191196
this.key = key ? Buffer.from(key, "base64") : undefined;
192197
this.uuid = crypto.randomUUID();
193198
this.name = `${remoteAddress}*${this.uuid.slice(-5)}`;
@@ -315,6 +320,28 @@ export class Channel extends EventEmitter {
315320
this.emit("close", this.uuid);
316321
}
317322

323+
/**
324+
* Broadcast the state of this channel to all its participants
325+
*/
326+
private _broadcastState() {
327+
for (const session of this.sessions.values()) {
328+
// TODO maybe the following should be on session and some can be made in common with the startupData getter.
329+
if (!session.bus) {
330+
logger.warn(`tried to broadcast state to session ${session.id}, but had no Bus`);
331+
continue;
332+
}
333+
session.bus.send(
334+
{
335+
name: SERVER_MESSAGE.CHANNEL_INFO_CHANGE,
336+
payload: {
337+
isRecording: Boolean(this.recorder?.isRecording)
338+
}
339+
},
340+
{ batch: true }
341+
);
342+
}
343+
}
344+
318345
/**
319346
* @param event - Close event with session ID
320347
* @fires Channel#sessionLeave

src/models/recorder.ts

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
import { EventEmitter } from "node:events";
2-
import type { Channel } from "./channel";
3-
import { getFolder } from "#src/services/resources.ts";
4-
import type { Folder } from "#src/services/resources.ts";
2+
import { getFolder, type Folder } from "#src/services/resources.ts";
53
import { Logger } from "#src/utils/utils.ts";
64

5+
import type { Channel } from "./channel";
6+
77
export enum RECORDER_STATE {
88
STARTED = "started",
9-
STOPPED = "stopped",
9+
STOPPED = "stopped"
1010
}
1111
const logger = new Logger("RECORDER");
1212

1313
export class Recorder extends EventEmitter {
1414
channel: Channel;
15-
state: RECORDER_STATE = RECORDER_STATE.STOPPED;
1615
folder: Folder | undefined;
1716
ffmpeg = null;
1817
/** Path to which the final recording will be uploaded to */
1918
recordingAddress: string;
19+
private _state: RECORDER_STATE = RECORDER_STATE.STOPPED;
2020

2121
constructor(channel: Channel, recordingAddress: string) {
2222
super();
@@ -26,10 +26,10 @@ export class Recorder extends EventEmitter {
2626

2727
async start() {
2828
if (this.state === RECORDER_STATE.STOPPED) {
29-
this.folder = getFolder();
29+
this.folder = getFolder();
3030
this.state = RECORDER_STATE.STARTED;
31-
logger.trace("TO IMPLEMENT");
32-
// TODO ffmpeg instance creation for recording to folder.path with proper name, start, build timestamps object
31+
logger.trace("TO IMPLEMENT");
32+
// TODO ffmpeg instance creation for recording to folder.path with proper name, start, build timestamps object
3333
}
3434
this._record();
3535
return { state: this.state };
@@ -38,7 +38,11 @@ export class Recorder extends EventEmitter {
3838
async stop() {
3939
if (this.state === RECORDER_STATE.STARTED) {
4040
logger.trace("TO IMPLEMENT");
41-
await this.folder!.seal("test-name");
41+
try {
42+
await this.folder!.seal("test-name");
43+
} catch {
44+
logger.verbose("failed to save the recording"); // TODO maybe warn and give more info
45+
}
4246
this.folder = undefined;
4347
// TODO ffmpeg instance stop, cleanup,
4448
// only resolve promise and switch state when completely ready to start a new recording.
@@ -47,11 +51,24 @@ export class Recorder extends EventEmitter {
4751
return { state: this.state };
4852
}
4953

54+
get isRecording(): boolean {
55+
return this.state === RECORDER_STATE.STARTED;
56+
}
57+
58+
get state(): RECORDER_STATE {
59+
return this._state;
60+
}
61+
62+
set state(state: RECORDER_STATE) {
63+
this._state = state;
64+
this.emit("stateChange", state);
65+
}
66+
5067
/**
5168
* @param video whether we want to record videos or not (will always record audio)
5269
*/
5370
_record(video: boolean = false) {
54-
console.trace(`TO IMPLEMENT: recording channel ${this.channel.name}, video: ${video}`);
71+
logger.trace(`TO IMPLEMENT: recording channel ${this.channel.name}, video: ${video}`);
5572
// iterate all producers on all sessions of the channel, create a ffmpeg for each,
5673
// save them on a map by session id+type.
5774
// check if recording for that session id+type is already in progress

src/models/session.ts

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import { EventEmitter } from "node:events";
22

33
import type {
4-
IceParameters,
5-
IceCandidate,
6-
DtlsParameters,
7-
SctpParameters,
84
Consumer,
5+
DtlsParameters,
6+
IceCandidate,
7+
IceParameters,
98
Producer,
10-
WebRtcTransport,
11-
RtpCapabilities
9+
RtpCapabilities,
10+
SctpParameters,
11+
WebRtcTransport
1212
} from "mediasoup/node/lib/types";
1313

1414
import * as config from "#src/config.ts";
@@ -20,9 +20,10 @@ import {
2020
SERVER_REQUEST,
2121
STREAM_TYPE
2222
} from "#src/shared/enums.ts";
23-
import type {JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types";
23+
import type { BusMessage, JSONSerializable, StartupData, StreamType } from "#src/shared/types";
2424
import type { Bus } from "#src/shared/bus.ts";
2525
import type { Channel } from "#src/models/channel.ts";
26+
import { RECORDER_STATE } from "#src/models/recorder.ts";
2627

2728
export type SessionId = number | string;
2829
export type SessionInfo = {
@@ -168,11 +169,16 @@ export class Session extends EventEmitter {
168169
this.setMaxListeners(config.CHANNEL_SIZE * 2);
169170
}
170171

171-
get availableFeatures(): AvailableFeatures {
172+
get startupData(): StartupData {
172173
return {
173-
"rtc": Boolean(this._channel.router),
174-
"recording": Boolean(this._channel.router && this._channel.recorder && this.permissions.recording)
175-
}
174+
availableFeatures: {
175+
rtc: Boolean(this._channel.router),
176+
recording: Boolean(
177+
this._channel.router && this._channel.recorder && this.permissions.recording
178+
)
179+
},
180+
isRecording: this._channel.recorder?.state === RECORDER_STATE.STARTED
181+
};
176182
}
177183

178184
get name(): string {

src/services/ws.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ function connect(webSocket: WebSocket, credentials: Credentials): Session {
131131
const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch });
132132
const { session } = Channel.join(channel.uuid, session_id);
133133
session.updatePermissions(permissions);
134-
webSocket.send(JSON.stringify(session.availableFeatures)); // client can start using ws after this message.
134+
webSocket.send(JSON.stringify(session.startupData)); // client can start using ws after this message.
135135
session.once("close", ({ code }: { code: string }) => {
136136
let wsCloseCode = WS_CLOSE_CODE.CLEAN;
137137
switch (code) {

src/shared/enums.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ export enum SERVER_MESSAGE {
2424
/** Signals the clients that one of the session in their channel has left */
2525
SESSION_LEAVE = "SESSION_LEAVE",
2626
/** Signals the clients that the info (talking, mute,...) of one of the session in their channel has changed */
27-
INFO_CHANGE = "S_INFO_CHANGE"
27+
INFO_CHANGE = "S_INFO_CHANGE",
28+
/** Signals the clients that the info of the channel (isRecording,...) has changed */
29+
CHANNEL_INFO_CHANGE = "C_INFO_CHANGE"
2830
}
2931

3032
export enum CLIENT_REQUEST {

0 commit comments

Comments
 (0)