Skip to content

Commit 78e7dc1

Browse files
[wip] recording_task/transport lifecycle
1 parent ac61218 commit 78e7dc1

File tree

6 files changed

+176
-57
lines changed

6 files changed

+176
-57
lines changed

src/config.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import os from "node:os";
33
import type {
44
RtpCodecCapability,
55
WorkerSettings,
6-
WebRtcServerOptions
6+
WebRtcServerOptions,
7+
PlainTransportOptions
78
} from "mediasoup/node/lib/types";
89
// eslint-disable-next-line node/no-unpublished-import
910
import type { ProducerOptions } from "mediasoup-client/lib/Producer";
@@ -240,6 +241,7 @@ const baseProducerOptions: ProducerOptions = {
240241
export interface RtcConfig {
241242
readonly workerSettings: WorkerSettings;
242243
readonly rtcServerOptions: WebRtcServerOptions;
244+
readonly plainTransportOptions: PlainTransportOptions;
243245
readonly rtcTransportOptions: {
244246
readonly maxSctpMessageSize: number;
245247
readonly sctpSendBufferSize: number;
@@ -283,6 +285,11 @@ export const rtc: RtcConfig = Object.freeze({
283285
maxSctpMessageSize: MAX_BUF_IN,
284286
sctpSendBufferSize: MAX_BUF_OUT
285287
},
288+
plainTransportOptions: {
289+
listenIp: { ip: "0.0.0.0", announcedIp: PUBLIC_IP },
290+
rtcpMux: true,
291+
comedia: false
292+
},
286293
producerOptionsByKind: {
287294
/** Audio producer options */
288295
audio: baseProducerOptions,

src/models/ffmpeg.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,31 @@
11
/* eslint-disable prettier/prettier */
22
import { EventEmitter } from "node:events";
3+
import { Logger } from "#src/utils/utils";
4+
import type { STREAM_TYPE } from "#src/shared/enums";
5+
6+
const logger = new Logger("FFMPEG");
7+
8+
// TODO may need to give more or less stuff here, will know later.
9+
export type RtpData = {
10+
payloadType: number;
11+
clockRate: number;
12+
codec: string;
13+
channels: number | undefined;
14+
type: STREAM_TYPE;
15+
};
316

417
let currentId = 0;
518

619
export class FFMPEG extends EventEmitter {
720
readonly id: number;
8-
constructor() {
21+
private readonly rtp: RtpData;
22+
constructor(rtp: RtpData) {
923
super();
24+
this.rtp = rtp;
1025
this.id = currentId++;
26+
logger.trace(`creating FFMPEG for ${this.id} on ${this.rtp.type}`);
1127
}
1228

13-
async kill() {}
29+
async kill() {
30+
}
1431
}

src/models/recorder.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ const logger = new Logger("RECORDER");
3131
* accompanied with a metadata file describing the recording (timestamps, ids,...).
3232
*
3333
* These raw recordings can then be used for further processing (transcription, compilation,...).
34+
*
35+
* Recorder acts at the channel level, managing the creation and closure of sessions in that channel,
36+
* whereas the recording_task acts at the session level, managing the recording of an individual session
37+
* and following its producer lifecycle.
3438
*/
3539
export class Recorder extends EventEmitter {
3640
/**

src/models/recording_task.ts

Lines changed: 140 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
/* eslint-disable @typescript-eslint/no-unused-vars */
22
import { EventEmitter } from "node:events";
33

4+
import type { Producer, Consumer, PlainTransport } from "mediasoup/node/lib/types";
5+
46
import { Session } from "#src/models/session.ts";
57
import { Logger } from "#src/utils/utils.ts";
6-
import { FFMPEG } from "#src/models/ffmpeg.ts";
8+
import { FFMPEG, type RtpData } from "#src/models/ffmpeg.ts";
9+
import { rtc } from "#src/config";
10+
import { getPort, type DynamicPort } from "#src/services/resources";
711

8-
import type { PlainTransport } from "mediasoup/node/lib/PlainTransportTypes";
12+
import { STREAM_TYPE } from "#src/shared/enums";
913

1014
export type RecordingParameters = {
1115
audio: boolean;
@@ -14,14 +18,24 @@ export type RecordingParameters = {
1418
};
1519

1620
export enum RECORDING_TASK_EVENT {
17-
AUDIO_STARTED = "audio-started",
18-
AUDIO_STOPPED = "audio-stopped",
19-
CAMERA_STARTED = "camera-started",
20-
CAMERA_STOPPED = "camera-stopped",
21-
SCREEN_STARTED = "screen-started",
22-
SCREEN_STOPPED = "screen-stopped"
21+
UPDATE = "update"
2322
}
2423

24+
type RecordingData = {
25+
active: boolean; // active is different from boolean(ffmpeg) so we can flag synchronously and avoid race conditions
26+
transport?: PlainTransport;
27+
consumer?: Consumer;
28+
ffmpeg?: FFMPEG;
29+
port?: DynamicPort;
30+
type: STREAM_TYPE;
31+
};
32+
33+
type RecordingDataByStreamType = {
34+
[STREAM_TYPE.AUDIO]: RecordingData;
35+
[STREAM_TYPE.CAMERA]: RecordingData;
36+
[STREAM_TYPE.SCREEN]: RecordingData;
37+
};
38+
2539
const logger = new Logger("RECORDING_TASK");
2640

2741
export class RecordingTask extends EventEmitter {
@@ -30,72 +44,145 @@ export class RecordingTask extends EventEmitter {
3044
*/
3145
isStopped = false;
3246
private _session: Session;
33-
private _audio: boolean = false;
34-
private _camera: boolean = false;
35-
private _screen: boolean = false;
36-
private _audioRTP?: PlainTransport = undefined;
37-
private _cameraRTP?: PlainTransport = undefined;
38-
private _screenRTP?: PlainTransport = undefined;
39-
private _audioFFFMPEG?: FFMPEG = undefined;
40-
private _cameraFFMPEG?: FFMPEG = undefined;
41-
private _screenFFMPEG?: FFMPEG = undefined;
47+
private readonly recordingDataByStreamType: RecordingDataByStreamType = {
48+
[STREAM_TYPE.AUDIO]: {
49+
active: false,
50+
type: STREAM_TYPE.AUDIO
51+
},
52+
[STREAM_TYPE.CAMERA]: {
53+
active: false,
54+
type: STREAM_TYPE.CAMERA
55+
},
56+
[STREAM_TYPE.SCREEN]: {
57+
active: false,
58+
type: STREAM_TYPE.SCREEN
59+
}
60+
};
4261

4362
/**
4463
* TODO when set, start/stop recording process (create a RTP, create FFMPEG/Gstreamer process, pipe RTP to FFMPEG/Gstreamer)
4564
* The initialization process will likely be async and prone to race conditions, once the process has started, we should
4665
* remember to check if this.isStopped, and if so, stop the process.
4766
*/
4867
set audio(value: boolean) {
49-
if (value === this._audio || this.isStopped) {
68+
this._setRecording(STREAM_TYPE.AUDIO, value);
69+
}
70+
set camera(value: boolean) {
71+
this._setRecording(STREAM_TYPE.CAMERA, value);
72+
}
73+
set screen(value: boolean) {
74+
this._setRecording(STREAM_TYPE.SCREEN, value);
75+
}
76+
77+
constructor(session: Session, { audio, camera, screen }: RecordingParameters) {
78+
super();
79+
this._session = session;
80+
this._session.on("producer", this._onSessionProducer);
81+
this.audio = audio;
82+
this.camera = camera;
83+
this.screen = screen;
84+
}
85+
86+
private async _setRecording(type: STREAM_TYPE, state: boolean) {
87+
const data = this.recordingDataByStreamType[type];
88+
if (data.active === state) {
5089
return;
5190
}
52-
this._audio = value;
53-
logger.trace(
54-
`TO IMPLEMENT: recording task for session ${this._session.id} - audio: ${value}`
55-
);
56-
logger.debug(`rtp: ${this._audioRTP}, ffmpeg: ${this._audioFFFMPEG}`);
57-
if (this._audio) {
58-
this._audioFFFMPEG = new FFMPEG(); // should take RTP info as param
59-
this.emit(RECORDING_TASK_EVENT.AUDIO_STARTED, this._audioFFFMPEG.id);
60-
} else if (this._audioFFFMPEG) {
61-
this.emit(RECORDING_TASK_EVENT.AUDIO_STOPPED, this._audioFFFMPEG.id);
62-
this._audioFFFMPEG.kill();
63-
this._audioFFFMPEG = undefined;
91+
data.active = state;
92+
const producer = this._session.producers[type];
93+
if (!producer) {
94+
return; // will be handled later when the session starts producing
6495
}
96+
this._updateProcess(data, producer);
6597
}
66-
set camera(value: boolean) {
67-
if (value === this._camera || this.isStopped) {
98+
99+
private async _onSessionProducer({
100+
type,
101+
producer
102+
}: {
103+
type: STREAM_TYPE;
104+
producer: Producer;
105+
}) {
106+
const data = this.recordingDataByStreamType[type];
107+
if (!data.active) {
68108
return;
69109
}
70-
this._camera = value;
71-
logger.trace(
72-
`TO IMPLEMENT: recording task for session ${this._session.id} - camera: ${value}`
73-
);
74-
logger.debug(`rtp: ${this._cameraRTP}, ffmpeg: ${this._cameraFFMPEG}`);
110+
this._updateProcess(data, producer);
75111
}
76-
set screen(value: boolean) {
77-
if (value === this._screen || this.isStopped) {
78-
return;
112+
113+
private async _updateProcess(data: RecordingData, producer: Producer) {
114+
if (data.active) {
115+
if (data.ffmpeg) {
116+
return;
117+
}
118+
data.port = getPort();
119+
try {
120+
data.ffmpeg = new FFMPEG(await this._createRtp(producer, data));
121+
if (data.active) {
122+
if (data.ffmpeg) {
123+
// TODO emit starting
124+
}
125+
logger.verbose(
126+
`starting recording process for ${this._session.name} ${data.type}`
127+
);
128+
return;
129+
}
130+
return;
131+
} catch {
132+
logger.warn(
133+
`failed at starting the recording for ${this._session.name} ${data.type}`
134+
);
135+
}
79136
}
80-
this._screen = value;
81-
logger.trace(
82-
`TO IMPLEMENT: recording task for session ${this._session.id} - screen: ${value}`
83-
);
84-
logger.debug(`rtp: ${this._screenRTP}, ffmpeg: ${this._screenFFMPEG}`);
137+
// TODO emit ending
138+
this._clearData(data.type);
85139
}
86140

87-
constructor(session: Session, { audio, camera, screen }: RecordingParameters) {
88-
super();
89-
this._session = session;
90-
this._audio = audio ?? false;
91-
this._camera = camera ?? false;
92-
this._screen = screen ?? false;
93-
logger.trace(
94-
`TO IMPLEMENT: recording task for session ${this._session.id} - audio: ${this._audio}, camera: ${this._camera}, screen: ${this._screen}`
141+
async _createRtp(producer: Producer, data: RecordingData): Promise<RtpData> {
142+
const transport = await this._session.router?.createPlainTransport(
143+
rtc.plainTransportOptions
95144
);
145+
if (!transport) {
146+
throw new Error(`Failed at creating a plain transport for`);
147+
}
148+
transport.connect({
149+
ip: "0.0.0.0",
150+
port: data.port!.number
151+
});
152+
data.transport = transport;
153+
data.consumer = await transport.consume({
154+
producerId: producer.id,
155+
rtpCapabilities: this._session.router!.rtpCapabilities,
156+
paused: true
157+
});
158+
const codecData = producer.rtpParameters.codecs[0];
159+
return {
160+
payloadType: codecData.payloadType,
161+
clockRate: codecData.clockRate,
162+
codec: codecData.mimeType.replace(`${producer.kind}`, ""),
163+
channels: producer.kind === "audio" ? codecData.channels : undefined,
164+
type: data.type
165+
};
166+
}
167+
168+
private _clearData(type: STREAM_TYPE) {
169+
const data = this.recordingDataByStreamType[type];
170+
data.active = false;
171+
data.ffmpeg?.kill();
172+
data.ffmpeg = undefined;
173+
data.transport?.close();
174+
data.transport = undefined;
175+
data.consumer?.close();
176+
data.consumer = undefined;
177+
data.port?.release();
178+
data.port = undefined;
96179
}
97180

98181
async stop() {
99182
this.isStopped = true;
183+
this._session.off("producer", this._onSessionProducer);
184+
for (const type of Object.values(STREAM_TYPE)) {
185+
this._clearData(type);
186+
}
100187
}
101188
}

src/models/session.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ export class Session extends EventEmitter {
197197
return this._state;
198198
}
199199

200+
get router() {
201+
return this._channel.router;
202+
}
203+
200204
set state(state: SESSION_STATE) {
201205
this._state = state;
202206
/**

src/services/resources.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ export function getFolder(): Folder {
112112
return new Folder(`${tempDir}/${Date.now()}-${unique++}`);
113113
}
114114

115-
class DynamicPort {
115+
export class DynamicPort {
116116
number: number;
117117

118118
constructor(number: number) {

0 commit comments

Comments
 (0)