Skip to content

Commit a95e8c9

Browse files
committed
Move basic Agent files
1 parent de51dfa commit a95e8c9

11 files changed

+904
-0
lines changed

Sources/LiveKit/Agent/Agent.swift

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
import LiveKit
19+
20+
/// Main-actor-bound, observable view of a participant's agent state and media tracks.
///
/// The published properties are refreshed each time `participant.changes` emits a value.
@MainActor
open class Agent: ObservableObject {
    /// Agent state last copied from the participant; starts as `.idle`.
    @Published public private(set) var state: AgentState = .idle

    /// Track of the participant's first audio publication whose source is `.microphone`, if any.
    @Published public private(set) var audioTrack: (any AudioTrack)?
    /// First camera video track reported by the participant's avatar worker, if any.
    @Published public private(set) var avatarVideoTrack: (any VideoTrack)?

    /// The participant this agent mirrors.
    public let participant: Participant

    /// Creates an agent and starts observing `participant` for changes.
    /// - Parameter participant: The participant whose state and tracks are mirrored.
    public init(participant: Participant) {
        self.participant = participant
        observe(participant)
    }

    /// Starts a task that refreshes the published properties on every change of `source`.
    ///
    /// `self` is held weakly; the loop exits on the first change received after
    /// the agent has been deallocated.
    private func observe(_ source: Participant) {
        Task { [weak self] in
            for await _ in source.changes {
                guard let agent = self else { return }
                agent.refresh(from: source)
            }
        }
    }

    /// Copies the agent state and the current tracks from `source`.
    private func refresh(from source: Participant) {
        state = source.agentState

        let microphonePublication = source.audioTracks.first { $0.source == .microphone }
        audioTrack = microphonePublication?.track as? AudioTrack
        avatarVideoTrack = source.avatarWorker?.firstCameraVideoTrack
    }
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
19+
/// A message received from the agent.
///
/// Instances are value types identified by `id`; two messages compare equal
/// only when `id`, `timestamp` and `content` all match.
public struct ReceivedMessage: Identifiable, Equatable, Codable, Sendable {
    /// The payload carried by a received message.
    public enum Content: Equatable, Codable, Sendable {
        /// Transcript text attributed to the agent.
        case agentTranscript(String)
        /// Transcript text attributed to the user.
        case userTranscript(String)
        /// Text carried as user input.
        case userInput(String)
    }

    /// Identifier of the message.
    public let id: String
    /// Timestamp associated with the message.
    public let timestamp: Date
    /// The message payload.
    public let content: Content
}
31+
32+
/// A message sent to the agent.
///
/// Instances are value types identified by `id`; two messages compare equal
/// only when `id`, `timestamp` and `content` all match.
public struct SentMessage: Identifiable, Equatable, Codable, Sendable {
    /// The payload carried by a sent message.
    public enum Content: Equatable, Codable, Sendable {
        /// Text carried as user input.
        case userInput(String)
    }

    /// Identifier of the message.
    public let id: String
    /// Timestamp associated with the message.
    public let timestamp: Date
    /// The message payload.
    public let content: Content
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
19+
/// A source of messages coming from the agent.
///
/// Conforming types create a stream of ``ReceivedMessage`` values, which is
/// used to receive messages from the agent and update the message feed.
///
/// - SeeAlso: ``ReceivedMessage``
public protocol MessageReceiver: Sendable {
    /// Creates a stream of messages received from the agent.
    /// - Returns: An `AsyncStream` that yields ``ReceivedMessage`` values.
    /// - Throws: An error if the stream cannot be set up.
    func messages() async throws -> AsyncStream<ReceivedMessage>
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
import LiveKit
19+
20+
/// An actor that receives transcription messages from the room and yields them as messages.
21+
///
22+
/// Room delegate methods are called multiple times for each message, with a stable message ID
23+
/// that can be directly used for diffing.
24+
///
25+
/// Example:
26+
/// ```
27+
/// { id: "1", content: "Hello" }
28+
/// { id: "1", content: "Hello world!" }
29+
/// ```
30+
@available(*, deprecated, message: "Use TranscriptionStreamReceiver compatible with livekit-agents 1.0")
actor TranscriptionDelegateReceiver: MessageReceiver, RoomDelegate {
    private let room: Room
    /// Continuation of the most recently created stream; `nil` until `messages()` is first called.
    private var continuation: AsyncStream<ReceivedMessage>.Continuation?

    /// Creates the receiver and registers it as a delegate of `room`.
    init(room: Room) {
        self.room = room
        room.add(delegate: self)
    }

    deinit {
        room.remove(delegate: self)
        // Finish explicitly so a consumer still iterating the stream is released
        // instead of staying suspended after the receiver is gone.
        continuation?.finish()
    }

    /// Creates a new message stream for the transcription delegate receiver.
    ///
    /// Only the most recent stream is fed; a stream returned by an earlier call
    /// is finished here so that its consumer does not wait forever for values
    /// that can no longer arrive.
    func messages() -> AsyncStream<ReceivedMessage> {
        continuation?.finish()

        let (stream, continuation) = AsyncStream.makeStream(of: ReceivedMessage.self)
        self.continuation = continuation
        return stream
    }

    /// Converts the non-empty transcription segments into messages and forwards them to the current stream.
    ///
    /// Segments are attributed to the agent when `participant.isAgent` is true, otherwise to the user.
    nonisolated func room(_: Room, participant: Participant, trackPublication _: TrackPublication, didReceiveTranscriptionSegments segments: [TranscriptionSegment]) {
        let isAgent = participant.isAgent
        let messages = segments
            .filter { !$0.text.isEmpty }
            .map { segment in
                ReceivedMessage(
                    id: segment.id,
                    timestamp: segment.lastReceivedTime,
                    content: isAgent ? .agentTranscript(segment.text) : .userTranscript(segment.text)
                )
            }
        guard !messages.isEmpty else { return }

        // A single task per callback keeps the segments of this callback in their
        // original order; one task per segment gives no ordering guarantee.
        Task {
            await yield(messages)
        }
    }

    /// Yields `messages`, in order, to the current stream (no-op when no stream exists).
    private func yield(_ messages: [ReceivedMessage]) {
        for message in messages {
            continuation?.yield(message)
        }
    }
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
import LiveKit
19+
20+
/// An actor that converts raw text streams from the LiveKit `Room` into `ReceivedMessage` objects.
21+
/// - Note: Streams are supported by `livekit-agents` >= 1.0.0.
22+
/// - SeeAlso: ``TranscriptionDelegateReceiver``
23+
///
24+
/// For agent messages, a new text stream is emitted for each message, and the stream is closed when the message is finalized.
25+
/// Each agent message is delivered in chunks that are accumulated and published into the message stream.
26+
///
27+
/// For user messages, the full transcription is sent each time, but may be updated until finalized.
28+
///
29+
/// The ID of the segment is stable and unique across the lifetime of the message.
30+
/// This ID can be used directly for `Identifiable` conformance.
31+
///
32+
/// Example text stream for agent messages:
33+
/// ```
34+
/// { segment_id: "1", content: "Hello" }
35+
/// { segment_id: "1", content: " world" }
36+
/// { segment_id: "1", content: "!" }
37+
/// { segment_id: "2", content: "Hello" }
38+
/// { segment_id: "2", content: " Apple" }
39+
/// { segment_id: "2", content: "!" }
40+
/// ```
41+
///
42+
/// Example text stream for user messages:
43+
/// ```
44+
/// { segment_id: "3", content: "Hello" }
45+
/// { segment_id: "3", content: "Hello world!" }
46+
/// { segment_id: "4", content: "Hello" }
47+
/// { segment_id: "4", content: "Hello Apple!" }
48+
/// ```
49+
///
50+
/// Example output:
51+
/// ```
52+
/// ReceivedMessage(id: "1", timestamp: 2025-01-01 12:00:00 +0000, content: .agentTranscript("Hello world!"))
53+
/// ReceivedMessage(id: "2", timestamp: 2025-01-01 12:00:10 +0000, content: .agentTranscript("Hello Apple!"))
54+
/// ReceivedMessage(id: "3", timestamp: 2025-01-01 12:00:20 +0000, content: .userTranscript("Hello world!"))
55+
/// ReceivedMessage(id: "4", timestamp: 2025-01-01 12:00:30 +0000, content: .userTranscript("Hello Apple!"))
56+
/// ```
57+
///
58+
actor TranscriptionStreamReceiver: MessageReceiver {
    /// Dictionary key of an in-progress message: one segment of one participant.
    private struct PartialMessageID: Hashable {
        let segmentID: String
        let participantID: Participant.Identity
    }

    /// Text accumulated so far for a segment, together with the stream it was last fed from.
    private struct PartialMessage {
        var content: String
        let timestamp: Date
        var streamID: String

        /// Adds `newContent` to the end of the accumulated text.
        mutating func appendContent(_ newContent: String) {
            content.append(newContent)
        }

        /// Discards the accumulated text in favour of `newContent` and records the new stream.
        mutating func replaceContent(_ newContent: String, streamID: String) {
            self.streamID = streamID
            content = newContent
        }
    }

    /// Keys of the stream attributes read by this receiver.
    private enum TranscriptionAttributes: String {
        case final = "lk.transcription_final"
        case segment = "lk.segment_id"
    }

    private let transcriptionTopic = "lk.transcription"

    private let room: Room

    /// Messages that are still being assembled, keyed by segment and participant.
    private var partialMessages: [PartialMessageID: PartialMessage] = [:]

    init(room: Room) {
        self.room = room
    }

    /// Creates a new message stream fed by the text streams of the transcription topic.
    ///
    /// Registers a text stream handler on the room; the handler is unregistered
    /// when the returned stream terminates.
    /// - Throws: Whatever `registerTextStreamHandler` throws.
    func messages() async throws -> AsyncStream<ReceivedMessage> {
        let (stream, continuation) = AsyncStream.makeStream(of: ReceivedMessage.self)

        try await room.registerTextStreamHandler(for: transcriptionTopic) { [weak self] reader, participantIdentity in
            guard let self else { return }
            for try await chunk in reader {
                // Empty chunks carry no text and are skipped.
                if chunk.isEmpty { continue }
                let update = await processIncoming(partialMessage: chunk, reader: reader, participantIdentity: participantIdentity)
                continuation.yield(update)
            }
        }

        continuation.onTermination = { [weak self] _ in
            Task {
                guard let self else { return }
                await self.room.unregisterTextStreamHandler(for: self.transcriptionTopic)
            }
        }

        return stream
    }

    /// Folds an incoming chunk into the matching partial message and returns the resulting snapshot.
    ///
    /// - A chunk from the stream that last fed the segment is appended to the stored text.
    /// - A chunk from a different stream for the same segment replaces the stored text.
    /// - A chunk for an unknown segment starts a new entry and purges the participant's other entries.
    ///
    /// - Note: An entry is removed once the stream's final attribute is `"true"`, which limits memory usage.
    private func processIncoming(partialMessage message: String, reader: TextStreamReader, participantIdentity: Participant.Identity) -> ReceivedMessage {
        let info = reader.info
        let streamID = info.id
        // Fall back to the stream ID when the segment attribute is absent.
        let segmentID = info.attributes[TranscriptionAttributes.segment.rawValue] ?? streamID
        let key = PartialMessageID(segmentID: segmentID, participantID: participantIdentity)

        let text: String
        let timestamp: Date

        if var partial = partialMessages[key] {
            if partial.streamID == streamID {
                partial.appendContent(message)
            } else {
                partial.replaceContent(message, streamID: streamID)
            }
            partialMessages[key] = partial
            text = partial.content
            // The timestamp of the first chunk is kept for the whole lifetime of the message.
            timestamp = partial.timestamp
        } else {
            text = message
            timestamp = info.timestamp
            partialMessages[key] = PartialMessage(content: message, timestamp: timestamp, streamID: streamID)
            cleanupPreviousTurn(participantIdentity, exceptSegmentID: segmentID)
        }

        if info.attributes[TranscriptionAttributes.final.rawValue] == "true" {
            partialMessages[key] = nil
        }

        // Text from the local participant is the user's; everything else is attributed to the agent.
        let content: ReceivedMessage.Content
        if participantIdentity == room.localParticipant.identity {
            content = .userTranscript(text)
        } else {
            content = .agentTranscript(text)
        }

        return ReceivedMessage(id: segmentID, timestamp: timestamp, content: content)
    }

    /// Drops every partial message of `participantID` except the one for `exceptSegmentID`.
    private func cleanupPreviousTurn(_ participantID: Participant.Identity, exceptSegmentID: String) {
        partialMessages = partialMessages.filter { key, _ in
            key.participantID != participantID || key.segmentID == exceptSegmentID
        }
    }
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
19+
/// A destination for messages going to the agent.
///
/// Conforming types deliver ``SentMessage`` values to the agent; the protocol
/// is used to send messages to the agent and update the message feed.
///
/// - SeeAlso: ``SentMessage``
public protocol MessageSender: Sendable {
    /// Sends a message to the agent.
    /// - Parameter message: The message to send.
    /// - Throws: An error if the message cannot be sent.
    func send(_ message: SentMessage) async throws
}

0 commit comments

Comments
 (0)