Skip to content

Commit e5aaa79

Browse files
[IMP] wip/poc
1 parent f22ceba commit e5aaa79

File tree

10 files changed

+106
-10
lines changed

10 files changed

+106
-10
lines changed

src/client.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,23 @@ export class SfuClient extends EventTarget {
260260
await Promise.all(proms);
261261
return stats;
262262
}
263+
async startRecording() {
264+
return this._bus?.request(
265+
{
266+
name: CLIENT_REQUEST.START_RECORDING,
267+
},
268+
{ batch: true }
269+
);
270+
}
271+
272+
async stopRecording() {
273+
return this._bus?.request(
274+
{
275+
name: CLIENT_REQUEST.STOP_RECORDING,
276+
},
277+
{ batch: true }
278+
);
279+
}
263280

264281
/**
265282
* Updates the server with the info of the session (isTalking, isCameraOn,...) so that it can broadcast it to the

src/config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ export const PORT: number = Number(process.env.PORT) || 8070;
6767
/**
6868
* Whether the recording feature is enabled, true by default.
6969
*/
70-
export const RECORDING: boolean = !FALSY_INPUT.has(process.env.LOG_TIMESTAMP!);
70+
export const RECORDING: boolean = !FALSY_INPUT.has(process.env.RECORDING!);
7171

7272
/**
7373
* The number of workers to spawn (up to core limits) to manage RTC servers.

src/models/channel.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ export class Channel extends EventEmitter {
188188
this.createDate = now.toISOString();
189189
this.remoteAddress = remoteAddress;
190190
this.recorder = config.recording.enabled && options.useRecording ? new Recorder(this) : undefined;
191-
this.recorder?.todo();
192191
this.key = key ? Buffer.from(key, "base64") : undefined;
193192
this.uuid = crypto.randomUUID();
194193
this.name = `${remoteAddress}*${this.uuid.slice(-5)}`;

src/models/recorder.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,25 @@ const logger = new Logger("RECORDER");
66

77
export class Recorder extends EventEmitter {
88
channel: Channel;
9+
state: "started" | "stopped" = "stopped";
10+
ffmpeg = null;
911

1012
constructor(channel: Channel) {
1113
super();
1214
this.channel = channel;
1315
}
1416

15-
todo() {
16-
logger.warn("TODO: Everything");
17+
async start() {
18+
this.state = "started";
19+
logger.trace("TO IMPLEMENT");
20+
// TODO ffmpeg instance creation, start
21+
return { state: this.state };
22+
}
23+
24+
async stop() {
25+
this.state = "stopped";
26+
logger.trace("TO IMPLEMENT");
27+
// TODO ffmpeg instance stop, cleanup, save,...
28+
return { state: this.state };
1729
}
1830
}

src/models/session.ts

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ const logger = new Logger("SESSION");
110110
*
111111
* @fires Session#stateChange - Emitted when session state changes
112112
* @fires Session#close - Emitted when session is closed
113+
* @fires Session#producer - Emitted when a new producer is created
113114
*/
114115
export class Session extends EventEmitter {
115116
/** Communication bus for WebSocket messaging */
@@ -138,9 +139,9 @@ export class Session extends EventEmitter {
138139
camera: null,
139140
screen: null
140141
};
141-
public permissions: SessionPermissions = {
142+
public readonly permissions: SessionPermissions = Object.seal({
142143
recording: false
143-
};
144+
});
144145
/** Parent channel containing this session */
145146
private readonly _channel: Channel;
146147
/** Recovery timeouts for failed consumers */
@@ -184,9 +185,26 @@ export class Session extends EventEmitter {
184185

185186
set state(state: SESSION_STATE) {
186187
this._state = state;
188+
/**
189+
* @event Session#stateChange
190+
* @type {{ state: SESSION_STATE }}
191+
*/
187192
this.emit("stateChange", state);
188193
}
189194

195+
updatePermissions(permissions: SessionPermissions | undefined): void {
196+
if (!permissions) {
197+
return;
198+
}
199+
for (const key of Object.keys(this.permissions) as (keyof SessionPermissions)[]) {
200+
const newVal = permissions[key];
201+
if (newVal === undefined) {
202+
continue;
203+
}
204+
this.permissions[key] = Boolean(permissions[key]);
205+
}
206+
}
207+
190208
async getProducerBitRates(): Promise<ProducerBitRates> {
191209
const bitRates: ProducerBitRates = {};
192210
const proms: Promise<void>[] = [];
@@ -643,8 +661,25 @@ export class Session extends EventEmitter {
643661
logger.debug(`[${this.name}] producing ${type}: ${codec?.mimeType}`);
644662
this._updateRemoteConsumers();
645663
this._broadcastInfo();
664+
/**
665+
* @event Session#producer
666+
* @type {{ type: StreamType, producer: Producer }}
667+
*/
668+
this.emit("producer", { type, producer });
646669
return { id: producer.id };
647670
}
671+
case CLIENT_REQUEST.START_RECORDING: {
672+
if (this.permissions.recording && this._channel.recorder) {
673+
return this._channel.recorder.start();
674+
}
675+
return;
676+
}
677+
case CLIENT_REQUEST.STOP_RECORDING: {
678+
if (this.permissions.recording && this._channel.recorder) {
679+
return this._channel.recorder.stop();
680+
}
681+
return;
682+
}
648683
default:
649684
logger.warn(`[${this.name}] Unknown request type: ${name}`);
650685
throw new Error(`Unknown request type: ${name}`);

src/services/ws.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,7 @@ function connect(webSocket: WebSocket, credentials: Credentials): Session {
130130
}
131131
const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch });
132132
const { session } = Channel.join(channel.uuid, session_id);
133-
if (permissions) {
134-
Object.assign(session.permissions, permissions);
135-
}
133+
session.updatePermissions(permissions);
136134
webSocket.send(JSON.stringify(session.availableFeatures)); // client can start using ws after this message.
137135
session.once("close", ({ code }: { code: string }) => {
138136
let wsCloseCode = WS_CLOSE_CODE.CLEAN;

src/shared/enums.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ export enum CLIENT_REQUEST {
3333
/** Requests the server to connect the server-to-client transport */
3434
CONNECT_STC_TRANSPORT = "CONNECT_STC_TRANSPORT",
3535
/** Requests the creation of a consumer that is used to upload a track to the server */
36-
INIT_PRODUCER = "INIT_PRODUCER"
36+
INIT_PRODUCER = "INIT_PRODUCER",
37+
/** Requests to start recording of the call */
38+
START_RECORDING = "START_RECORDING",
39+
/** Requests to stop recording of the call */
40+
STOP_RECORDING = "STOP_RECORDING"
3741
}
3842

3943
export enum CLIENT_MESSAGE {

src/shared/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ export type BusMessage =
5454
name: typeof CLIENT_REQUEST.INIT_PRODUCER;
5555
payload: { type: StreamType; kind: MediaKind; rtpParameters: RtpParameters };
5656
}
57+
| { name: typeof CLIENT_REQUEST.START_RECORDING; payload?: never }
58+
| { name: typeof CLIENT_REQUEST.STOP_RECORDING; payload?: never }
5759
| {
5860
name: typeof SERVER_MESSAGE.BROADCAST;
5961
payload: { senderId: SessionId; message: JSONSerializable };

src/utils/utils.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const ASCII = {
1212
green: "\x1b[32m",
1313
yellow: "\x1b[33m",
1414
white: "\x1b[37m",
15+
cyan: "\x1b[36m",
1516
default: "\x1b[0m"
1617
}
1718
} as const;
@@ -48,6 +49,19 @@ export interface ParseBodyOptions {
4849
json?: boolean;
4950
}
5051

52+
function getCallChain(depth: number = 8): string {
53+
const stack = new Error().stack?.split("\n").slice(2, depth + 2) ?? [];
54+
return stack
55+
.map(line => {
56+
const match = line.trim().match(/^at\s+(.*?)\s+\(/);
57+
return match ? match[1] : null;
58+
})
59+
.slice(1, depth + 1)
60+
.filter(Boolean)
61+
.reverse()
62+
.join(" > ");
63+
}
64+
5165
export class Logger {
5266
private readonly _name: string;
5367
private readonly _colorize: (text: string, color?: string) => string;
@@ -83,6 +97,9 @@ export class Logger {
8397
verbose(text: string): void {
8498
this._log(console.log, ":VERBOSE:", text, ASCII.color.white);
8599
}
100+
trace(message: string, { depth = 8 }: { depth?: number } = {}): void {
101+
this._log(console.log, ":TRACE:", `${getCallChain(depth)} ${message}`, ASCII.color.cyan);
102+
}
86103
private _generateTimeStamp(): string {
87104
const now = new Date();
88105
return now.toISOString() + " ";

tests/network.test.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,4 +284,16 @@ describe("Full network", () => {
284284
expect(event1.detail.payload.message).toBe(message);
285285
expect(event2.detail.payload.message).toBe(message);
286286
});
287+
test("POC RECORDING", async () => {
288+
const channelUUID = await network.getChannelUUID();
289+
const user1 = await network.connect(channelUUID, 1);
290+
await once(user1.session, "stateChange");
291+
const sender = await network.connect(channelUUID, 3);
292+
await once(sender.session, "stateChange");
293+
sender.session.updatePermissions({ recording: true });
294+
const startResult = await sender.sfuClient.startRecording() as { state: string };
295+
expect(startResult.state).toBe("started");
296+
const stopResult = await sender.sfuClient.stopRecording() as { state: string };
297+
expect(stopResult.state).toBe("stopped");
298+
});
287299
});

0 commit comments

Comments
 (0)