@@ -54,7 +54,7 @@ import Foundation
5454/// Message(id: "4", timestamp: 2025-01-01 12:00:30 +0000, content: .userTranscript("Hello Apple!"))
5555/// ```
5656///
57- actor TranscriptionStreamReceiver : MessageReceiver {
57+ actor TranscriptionStreamReceiver : MessageReceiver , Loggable {
5858 private struct PartialMessageID : Hashable {
5959 let segmentID : String
6060 let participantID : Participant . Identity
@@ -75,25 +75,21 @@ actor TranscriptionStreamReceiver: MessageReceiver {
7575 }
7676 }
7777
78- private let transcriptionTopic = " lk.transcription "
79- private enum TranscriptionAttributes : String {
80- case final = " lk.transcription_final "
81- case segment = " lk.segment_id "
82- }
83-
8478 private let room : Room
79+ private let topic : String
8580
8681 private lazy var partialMessages : [ PartialMessageID : PartialMessage ] = [ : ]
8782
88- init ( room: Room ) {
83+ init ( room: Room , topic : String = " lk.transcription " ) {
8984 self . room = room
85+ self . topic = topic
9086 }
9187
9288 /// Creates a new message stream for the chat topic.
9389 func messages( ) async throws -> AsyncStream < ReceivedMessage > {
9490 let ( stream, continuation) = AsyncStream . makeStream ( of: ReceivedMessage . self)
9591
96- try await room. registerTextStreamHandler ( for: transcriptionTopic ) { [ weak self] reader, participantIdentity in
92+ try await room. registerTextStreamHandler ( for: topic ) { [ weak self] reader, participantIdentity in
9793 guard let self else { return }
9894 for try await message in reader where !message. isEmpty {
9995 await continuation. yield ( processIncoming ( partialMessage: message, reader: reader, participantIdentity: participantIdentity) )
@@ -103,7 +99,7 @@ actor TranscriptionStreamReceiver: MessageReceiver {
10399 continuation. onTermination = { [ weak self] _ in
104100 Task {
105101 guard let self else { return }
106- await self . room. unregisterTextStreamHandler ( for: self . transcriptionTopic )
102+ await self . room. unregisterTextStreamHandler ( for: self . topic )
107103 }
108104 }
109105
@@ -113,7 +109,12 @@ actor TranscriptionStreamReceiver: MessageReceiver {
113109 /// Aggregates the incoming text into a message, storing the partial content in the `partialMessages` dictionary.
114110 /// - Note: When the message is finalized, or a new message is started, the dictionary is purged to limit memory usage.
115111 private func processIncoming( partialMessage message: String , reader: TextStreamReader , participantIdentity: Participant . Identity ) -> ReceivedMessage {
116- let segmentID = reader. info. attributes [ TranscriptionAttributes . segment. rawValue] ?? reader. info. id
112+ let attributes = reader. info. attributes. mapped ( to: TranscriptionAttributes . self)
113+ if attributes == nil {
114+ log ( " Unable to read message attributes from \( reader. info. attributes) " , . error)
115+ }
116+
117+ let segmentID = attributes? . lkSegmentID ?? reader. info. id
117118 let participantID = participantIdentity
118119 let partialID = PartialMessageID ( segmentID: segmentID, participantID: participantID)
119120
@@ -146,7 +147,7 @@ actor TranscriptionStreamReceiver: MessageReceiver {
146147 cleanupPreviousTurn ( participantIdentity, exceptSegmentID: segmentID)
147148 }
148149
149- let isFinal = reader . info . attributes [ TranscriptionAttributes . final . rawValue ] == " true "
150+ let isFinal = attributes? . lkTranscriptionFinal ?? false
150151 if isFinal {
151152 partialMessages [ partialID] = nil
152153 }
0 commit comments