Skip to content

Commit caa59f6

Browse files
authored
fix(core): don't unregister stream handler on disconnect (#1188)
1 parent c734b92 commit caa59f6

File tree

2 files changed

+61
-47
lines changed

2 files changed

+61
-47
lines changed

.changeset/selfish-dryers-count.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@livekit/components-core": patch
3+
---
4+
5+
fix(core): don't unregister stream handler on disconnect

packages/core/src/components/textStream.ts

Lines changed: 56 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { RoomEvent, type Room } from 'livekit-client';
22
import type { TextStreamInfo } from 'livekit-client/dist/src/room/types';
33
import { from, scan, Subject, type Observable } from 'rxjs';
4-
import { share } from 'rxjs/operators';
4+
import { share, tap } from 'rxjs/operators';
55

66
export interface TextStreamData {
77
text: string;
@@ -55,61 +55,70 @@ export function setupTextStream(room: Room, topic: string): Observable<TextStrea
5555
}
5656

5757
const textStreamsSubject = new Subject<TextStreamData[]>();
58-
const textStreams: TextStreamData[] = [];
58+
let textStreams: TextStreamData[] = [];
5959

6060
const segmentAttribute = 'lk.segment_id';
6161

62-
room.registerTextStreamHandler(topic, async (reader, participantInfo) => {
63-
// Create an observable from the reader
64-
const streamObservable = from(reader).pipe(
65-
scan((acc: string, chunk: string) => {
66-
return acc + chunk;
67-
}, ''),
68-
);
69-
70-
const isTranscription = !!reader.info.attributes?.[segmentAttribute];
71-
72-
// Subscribe to the stream and update our array when new chunks arrive
73-
streamObservable.subscribe((accumulatedText) => {
74-
// Find and update the stream in our array
75-
const index = textStreams.findIndex(
76-
(stream) =>
77-
stream.streamInfo.id === reader.info.id ||
78-
(isTranscription &&
79-
stream.streamInfo.attributes?.[segmentAttribute] ===
80-
reader.info.attributes?.[segmentAttribute]),
81-
);
82-
if (index !== -1) {
83-
textStreams[index] = {
84-
...textStreams[index],
85-
text: accumulatedText,
86-
};
87-
88-
// Emit the updated array
89-
textStreamsSubject.next([...textStreams]);
90-
} else {
91-
// Handle case where stream ID wasn't found (new stream)
92-
textStreams.push({
93-
text: accumulatedText,
94-
participantInfo,
95-
streamInfo: reader.info,
62+
// Create shared observable and store in cache
63+
const sharedObservable = textStreamsSubject.pipe(
64+
tap({
65+
subscribe: () => {
66+
room.registerTextStreamHandler(topic, async (reader, participantInfo) => {
67+
// Create an observable from the reader
68+
const streamObservable = from(reader).pipe(
69+
scan((acc: string, chunk: string) => {
70+
return acc + chunk;
71+
}, ''),
72+
);
73+
74+
const isTranscription = !!reader.info.attributes?.[segmentAttribute];
75+
76+
// Subscribe to the stream and update our array when new chunks arrive
77+
streamObservable.subscribe((accumulatedText) => {
78+
// Find and update the stream in our array
79+
const index = textStreams.findIndex(
80+
(stream) =>
81+
stream.streamInfo.id === reader.info.id ||
82+
(isTranscription &&
83+
stream.streamInfo.attributes?.[segmentAttribute] ===
84+
reader.info.attributes?.[segmentAttribute]),
85+
);
86+
if (index !== -1) {
87+
textStreams[index] = {
88+
...textStreams[index],
89+
text: accumulatedText,
90+
};
91+
92+
// Emit the updated array
93+
textStreamsSubject.next([...textStreams]);
94+
} else {
95+
// Handle case where stream ID wasn't found (new stream)
96+
textStreams.push({
97+
text: accumulatedText,
98+
participantInfo,
99+
streamInfo: reader.info,
100+
});
101+
102+
// Emit the updated array with the new stream
103+
textStreamsSubject.next([...textStreams]);
104+
}
105+
});
96106
});
107+
},
108+
finalize: () => {
109+
room.unregisterTextStreamHandler(topic);
110+
},
111+
}),
112+
share(),
113+
);
97114

98-
// Emit the updated array with the new stream
99-
textStreamsSubject.next([...textStreams]);
100-
}
101-
});
102-
});
103-
104-
// Create shared observable and store in cache
105-
const sharedObservable = textStreamsSubject.asObservable().pipe(share());
106115
observableCache.set(cacheKey, sharedObservable);
107116

108117
// Add cleanup when room is disconnected
109-
room.once(RoomEvent.Disconnected, () => {
110-
room.unregisterTextStreamHandler(topic);
111-
textStreamsSubject.complete();
118+
room.on(RoomEvent.Disconnected, () => {
112119
getObservableCache().delete(cacheKey);
120+
textStreams = [];
121+
textStreamsSubject.next([]);
113122
});
114123

115124
return sharedObservable;

0 commit comments

Comments
 (0)