diff --git a/.changes/agent-session b/.changes/agent-session new file mode 100644 index 000000000..dcadce57b --- /dev/null +++ b/.changes/agent-session @@ -0,0 +1 @@ +minor type="added" "Agent and Session APIs for creating agent-based apps" diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a537ee678..f4932fa18 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -61,11 +61,11 @@ jobs: # https://github.com/actions/runner-images/blob/main/images/macos/macos-26-arm64-Readme.md - os: macos-26 xcode: latest - platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.0" + platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.1" symbol-graph: true - os: macos-26 xcode: latest - platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.0" + platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.1" extension-api-only: true - os: macos-26 xcode: latest @@ -84,10 +84,10 @@ jobs: platform: "macOS,variant=Mac Catalyst" - os: macos-26 xcode: latest - platform: "visionOS Simulator,name=Apple Vision Pro,OS=26.0" + platform: "visionOS Simulator,name=Apple Vision Pro,OS=26.1" - os: macos-26 xcode: latest - platform: "tvOS Simulator,name=Apple TV,OS=26.0" + platform: "tvOS Simulator,name=Apple TV,OS=26.1" runs-on: ${{ matrix.os }} timeout-minutes: 60 diff --git a/Sources/LiveKit/Agent/Agent.swift b/Sources/LiveKit/Agent/Agent.swift new file mode 100644 index 000000000..27a9d2166 --- /dev/null +++ b/Sources/LiveKit/Agent/Agent.swift @@ -0,0 +1,151 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// Represents a LiveKit Agent. +/// +/// The ``Agent`` struct represents the state of a LiveKit agent within a ``Session``. +/// It provides information about the agent's connection status, its current state +/// (e.g., listening, thinking, speaking), and its media tracks. +/// +/// The ``Agent``'s properties are updated automatically by the ``Session`` as the agent's +/// state changes. This allows the application to react to the agent's +/// behavior, such as displaying its avatar video or indicating when it is speaking. +/// The ``agentState`` property is particularly useful for building UIs that reflect +/// the agent's current activity. +/// +/// - SeeAlso: [LiveKit SwiftUI Agent Starter](https://github.com/livekit-examples/agent-starter-swift). +/// - SeeAlso: [LiveKit Agents documentation](https://docs.livekit.io/agents/). +public struct Agent: Loggable { + // MARK: - Error + + public enum Error: LocalizedError { + case timeout + + public var errorDescription: String? { + switch self { + case .timeout: + "Agent did not connect" + } + } + } + + // MARK: - State + + private enum State { + case disconnected + case connecting(buffering: Bool) + case connected(agentState: AgentState, audioTrack: (any AudioTrack)?, avatarVideoTrack: (any VideoTrack)?) 
+ case failed(error: Error) + } + + private var state: State = .disconnected + + // MARK: - Transitions + + mutating func disconnected() { + log("Agent disconnected from \(state)") + // From any state + state = .disconnected + } + + mutating func failed(error: Error) { + log("Agent failed with error \(error) from \(state)") + // From any state + state = .failed(error: error) + } + + mutating func connecting(buffering: Bool) { + log("Agent connecting from \(state)") + switch state { + case .disconnected, .connecting: + state = .connecting(buffering: buffering) + default: + log("Invalid transition from \(state) to connecting", .warning) + } + } + + mutating func connected(participant: Participant) { + log("Agent connected to \(participant) from \(state)") + switch state { + case .connecting, .connected: + state = .connected(agentState: participant.agentState, + audioTrack: participant.agentAudioTrack, + avatarVideoTrack: participant.avatarVideoTrack) + default: + log("Invalid transition from \(state) to connected", .warning) + } + } + + // MARK: - Public + + /// A boolean value indicating whether the agent is connected. + public var isConnected: Bool { + switch state { + case .connected: true + default: false + } + } + + /// The current conversational state of the agent. + public var agentState: AgentState? { + switch state { + case let .connected(agentState, _, _): agentState + default: nil + } + } + + /// The agent's audio track. + public var audioTrack: (any AudioTrack)? { + switch state { + case let .connected(_, audioTrack, _): audioTrack + default: nil + } + } + + /// The agent's avatar video track. + public var avatarVideoTrack: (any VideoTrack)? { + switch state { + case let .connected(_, _, avatarVideoTrack): avatarVideoTrack + default: nil + } + } + + /// The last error that occurred. + public var error: Error? 
{ + switch state { + case let .failed(error): error + default: nil + } + } +} + +private extension Participant { + var agentAudioTrack: (any AudioTrack)? { + audioTracks.first(where: { $0.source == .microphone })?.track as? AudioTrack + } + + var avatarVideoTrack: (any VideoTrack)? { + avatarWorker?.firstCameraVideoTrack + } +} + +extension AgentState: CustomStringConvertible { + public var description: String { + rawValue.capitalized + } +} diff --git a/Sources/LiveKit/Agent/Chat/Message.swift b/Sources/LiveKit/Agent/Chat/Message.swift new file mode 100644 index 000000000..529728c1c --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Message.swift @@ -0,0 +1,41 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// A message received from the agent. +public struct ReceivedMessage: Identifiable, Equatable, Codable, Sendable { + public let id: String + public let timestamp: Date + public let content: Content + + public enum Content: Equatable, Codable, Sendable { + case agentTranscript(String) + case userTranscript(String) + case userInput(String) + } +} + +/// A message sent to the agent. 
+public struct SentMessage: Identifiable, Equatable, Codable, Sendable { + public let id: String + public let timestamp: Date + public let content: Content + + public enum Content: Equatable, Codable, Sendable { + case userInput(String) + } +} diff --git a/Sources/LiveKit/Agent/Chat/Receive/MessageReceiver.swift b/Sources/LiveKit/Agent/Chat/Receive/MessageReceiver.swift new file mode 100644 index 000000000..20b7edec4 --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Receive/MessageReceiver.swift @@ -0,0 +1,25 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// A protocol that defines a message receiver. +/// +/// A message receiver is responsible for creating a stream of messages from the agent. +/// It is used to receive messages from the agent and update the message feed. +public protocol MessageReceiver: Sendable { + func messages() async throws -> AsyncStream +} diff --git a/Sources/LiveKit/Agent/Chat/Receive/TranscriptionDelegateReceiver.swift b/Sources/LiveKit/Agent/Chat/Receive/TranscriptionDelegateReceiver.swift new file mode 100644 index 000000000..c9b24bb1d --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Receive/TranscriptionDelegateReceiver.swift @@ -0,0 +1,69 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// An actor that receives transcription messages from the room and yields them as messages. +/// +/// Room delegate methods are called multiple times for each message, with a stable message ID +/// that can be directly used for diffing. +/// +/// Example: +/// ``` +/// { id: "1", content: "Hello" } +/// { id: "1", content: "Hello world!" } +/// ``` +@available(*, deprecated, message: "Use TranscriptionStreamReceiver compatible with livekit-agents 1.0") +actor TranscriptionDelegateReceiver: MessageReceiver, RoomDelegate { + private let room: Room + private var continuation: AsyncStream.Continuation? + + init(room: Room) { + self.room = room + room.add(delegate: self) + } + + deinit { + continuation?.finish() + room.remove(delegate: self) + } + + /// Creates a new message stream for the transcription delegate receiver. + func messages() -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream(of: ReceivedMessage.self) + self.continuation = continuation + return stream + } + + nonisolated func room(_: Room, participant: Participant, trackPublication _: TrackPublication, didReceiveTranscriptionSegments segments: [TranscriptionSegment]) { + segments + .filter { !$0.text.isEmpty } + .forEach { segment in + let message = ReceivedMessage( + id: segment.id, + timestamp: segment.lastReceivedTime, + content: participant.isAgent ?
+ .agentTranscript(segment.text) : .userTranscript(segment.text) + ) + Task { + await yield(message) + } + } + } + + private func yield(_ message: ReceivedMessage) { + continuation?.yield(message) + } +} diff --git a/Sources/LiveKit/Agent/Chat/Receive/TranscriptionStreamReceiver.swift b/Sources/LiveKit/Agent/Chat/Receive/TranscriptionStreamReceiver.swift new file mode 100644 index 000000000..bec7674cc --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Receive/TranscriptionStreamReceiver.swift @@ -0,0 +1,174 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// An actor that converts raw text streams from the LiveKit `Room` into ``ReceivedMessage`` values. +/// - Note: Streams are supported by `livekit-agents` >= 1.0.0. +/// - SeeAlso: ``TranscriptionDelegateReceiver`` +/// +/// For agent messages, a new text stream is emitted for each message, and the stream is closed when the message is finalized. +/// Each agent message is delivered in chunks that are accumulated and published into the message stream. +/// +/// For user messages, the full transcription is sent each time, but may be updated until finalized. +/// +/// The ID of the segment is stable and unique across the lifetime of the message. +/// This ID can be used directly for `Identifiable` conformance.
+/// +/// Example text stream for agent messages: +/// ``` +/// { segment_id: "1", content: "Hello" } +/// { segment_id: "1", content: " world" } +/// { segment_id: "1", content: "!" } +/// { segment_id: "2", content: "Hello" } +/// { segment_id: "2", content: " Apple" } +/// { segment_id: "2", content: "!" } +/// ``` +/// +/// Example text stream for user messages: +/// ``` +/// { segment_id: "3", content: "Hello" } +/// { segment_id: "3", content: "Hello world!" } +/// { segment_id: "4", content: "Hello" } +/// { segment_id: "4", content: "Hello Apple!" } +/// ``` +/// +/// Example output: +/// ``` +/// Message(id: "1", timestamp: 2025-01-01 12:00:00 +0000, content: .agentTranscript("Hello world!")) +/// Message(id: "2", timestamp: 2025-01-01 12:00:10 +0000, content: .agentTranscript("Hello Apple!")) +/// Message(id: "3", timestamp: 2025-01-01 12:00:20 +0000, content: .userTranscript("Hello world!")) +/// Message(id: "4", timestamp: 2025-01-01 12:00:30 +0000, content: .userTranscript("Hello Apple!")) +/// ``` +/// +actor TranscriptionStreamReceiver: MessageReceiver, Loggable { + private struct PartialMessageID: Hashable { + let segmentID: String + let participantID: Participant.Identity + } + + private struct PartialMessage { + var content: String + let timestamp: Date + var streamID: String + + mutating func appendContent(_ newContent: String) { + content += newContent + } + + mutating func replaceContent(_ newContent: String, streamID: String) { + content = newContent + self.streamID = streamID + } + } + + private let room: Room + private let topic: String + + private lazy var partialMessages: [PartialMessageID: PartialMessage] = [:] + + init(room: Room, topic: String = "lk.transcription") { + self.room = room + self.topic = topic + } + + /// Creates a new message stream for the chat topic. 
+ func messages() async throws -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream(of: ReceivedMessage.self) + + let topic = topic + + try await room.registerTextStreamHandler(for: topic) { [weak self] reader, participantIdentity in + for try await message in reader where !message.isEmpty { + guard let self else { return } + await continuation.yield(processIncoming(partialMessage: message, reader: reader, participantIdentity: participantIdentity)) + } + } + + continuation.onTermination = { _ in + Task { [weak self] in + await self?.room.unregisterTextStreamHandler(for: topic) + } + } + + return stream + } + + /// Aggregates the incoming text into a message, storing the partial content in the `partialMessages` dictionary. + /// - Note: When the message is finalized, or a new message is started, the dictionary is purged to limit memory usage. + private func processIncoming(partialMessage message: String, reader: TextStreamReader, participantIdentity: Participant.Identity) -> ReceivedMessage { + let attributes = reader.info.attributes.mapped(to: TranscriptionAttributes.self) + if attributes == nil { + log("Unable to read message attributes from \(reader.info.attributes)", .error) + } + + let segmentID = attributes?.lkSegmentID ?? 
reader.info.id + let participantID = participantIdentity + let partialID = PartialMessageID(segmentID: segmentID, participantID: participantID) + + let currentStreamID = reader.info.id + + let timestamp: Date + let updatedContent: String + + if var existingMessage = partialMessages[partialID] { + // Update existing message + if existingMessage.streamID == currentStreamID { + // Same stream, append content + existingMessage.appendContent(message) + } else { + // Different stream for same segment, replace content + existingMessage.replaceContent(message, streamID: currentStreamID) + } + updatedContent = existingMessage.content + timestamp = existingMessage.timestamp + partialMessages[partialID] = existingMessage + } else { + // This is a new message + updatedContent = message + timestamp = reader.info.timestamp + partialMessages[partialID] = PartialMessage( + content: updatedContent, + timestamp: timestamp, + streamID: currentStreamID + ) + cleanupPreviousTurn(participantIdentity, exceptSegmentID: segmentID) + } + + let isFinal = attributes?.lkTranscriptionFinal ?? false + if isFinal { + partialMessages[partialID] = nil + } + + let newOrUpdatedMessage = ReceivedMessage( + id: segmentID, + timestamp: timestamp, + content: participantIdentity == room.localParticipant.identity ? 
.userTranscript(updatedContent) : .agentTranscript(updatedContent) + ) + + return newOrUpdatedMessage + } + + private func cleanupPreviousTurn(_ participantID: Participant.Identity, exceptSegmentID: String) { + let keysToRemove = partialMessages.keys.filter { + $0.participantID == participantID && $0.segmentID != exceptSegmentID + } + + for key in keysToRemove { + partialMessages[key] = nil + } + } +} diff --git a/Sources/LiveKit/Agent/AgentState+.swift b/Sources/LiveKit/Agent/Chat/Send/MessageSender.swift similarity index 65% rename from Sources/LiveKit/Agent/AgentState+.swift rename to Sources/LiveKit/Agent/Chat/Send/MessageSender.swift index 9bb45b096..a9bdc86fb 100644 --- a/Sources/LiveKit/Agent/AgentState+.swift +++ b/Sources/LiveKit/Agent/Chat/Send/MessageSender.swift @@ -14,8 +14,12 @@ * limitations under the License. */ -extension AgentState: CustomStringConvertible { - public var description: String { - rawValue.capitalized - } +import Foundation + +/// A protocol that defines a message sender. +/// +/// A message sender is responsible for sending messages to the agent. +/// It is used to send messages to the agent and update the message feed. +public protocol MessageSender: Sendable { + func send(_ message: SentMessage) async throws } diff --git a/Sources/LiveKit/Agent/Chat/Send/TextMessageSender.swift b/Sources/LiveKit/Agent/Chat/Send/TextMessageSender.swift new file mode 100644 index 000000000..d1c7e6a86 --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Send/TextMessageSender.swift @@ -0,0 +1,59 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// An actor that sends local messages to the agent. +/// Currently, it only supports sending text messages. +/// +/// It also serves as the loopback for the local messages, +/// so that they can be displayed in the message feed +/// without relying on the agent-side transcription. +actor TextMessageSender: MessageSender, MessageReceiver { + private let room: Room + private let topic: String + + private var messageContinuation: AsyncStream.Continuation? + + init(room: Room, topic: String = "lk.chat") { + self.room = room + self.topic = topic + } + + deinit { + messageContinuation?.finish() + } + + func send(_ message: SentMessage) async throws { + guard case let .userInput(text) = message.content else { return } + + try await room.localParticipant.sendText(text, for: topic) + + let loopbackMessage = ReceivedMessage( + id: message.id, + timestamp: message.timestamp, + content: .userInput(text) + ) + + messageContinuation?.yield(loopbackMessage) + } + + func messages() async throws -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream() + messageContinuation = continuation + return stream + } +} diff --git a/Sources/LiveKit/Agent/Session.swift b/Sources/LiveKit/Agent/Session.swift new file mode 100644 index 000000000..a5403879f --- /dev/null +++ b/Sources/LiveKit/Agent/Session.swift @@ -0,0 +1,324 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Combine +import Foundation +import OrderedCollections + +/// A ``Session`` represents a connection to a LiveKit Room that can contain an ``Agent``. +/// +/// ``Session`` is the main entry point for interacting with a LiveKit agent. It encapsulates +/// the connection to a LiveKit ``Room``, manages the agent's lifecycle, and handles +/// communication between the user and the agent. +/// +/// ``Session`` is created with a token source and optional configuration. The ``start()`` +/// method establishes the connection, and the ``end()`` method terminates it. The session's +/// state, including connection status and any errors, is published for observation, +/// making it suitable for use in SwiftUI applications. +/// +/// Communication with the agent is handled through messages. The ``send(text:)`` method +/// sends a user message, and the ``messages`` property provides an ordered history of the +/// conversation. The session can be configured with custom message senders and receivers +/// to support different communication channels, such as text messages or transcription streams. +/// +/// - SeeAlso: [LiveKit SwiftUI Agent Starter](https://github.com/livekit-examples/agent-starter-swift). +/// - SeeAlso: [LiveKit Agents documentation](https://docs.livekit.io/agents/). 
+@MainActor +open class Session: ObservableObject { + private static let agentNameAttribute = "lk.agent_name" + + // MARK: - Error + + public enum Error: LocalizedError { + case connection(Swift.Error) + case sender(Swift.Error) + case receiver(Swift.Error) + + public var errorDescription: String? { + switch self { + case let .connection(error): + "Connection failed: \(error.localizedDescription)" + case let .sender(error): + "Message sender failed: \(error.localizedDescription)" + case let .receiver(error): + "Message receiver failed: \(error.localizedDescription)" + } + } + } + + // MARK: - Published + + /// The last error that occurred. + @Published public private(set) var error: Error? + + /// The current connection state of the session. + @Published private var connectionState: ConnectionState = .disconnected + /// A boolean value indicating whether the session is connected. + public var isConnected: Bool { + switch connectionState { + case .connecting, .connected, .reconnecting: // pre-connect is connecting + true + default: + false + } + } + + /// The ``Agent`` associated with this session. + @Published public private(set) var agent = Agent() + + @Published private var messagesDict: OrderedDictionary = [:] + /// The ordered list of received messages. + public var messages: [ReceivedMessage] { messagesDict.values.elements } + + // MARK: - Dependencies + + /// The underlying ``Room`` object for the session. 
+ public let room: Room + + private enum TokenSourceConfiguration { + case fixed(any TokenSourceFixed) + case configurable(any TokenSourceConfigurable, TokenRequestOptions) + + func fetch() async throws -> TokenSourceResponse { + switch self { + case let .fixed(source): + try await source.fetch() + case let .configurable(source, options): + try await source.fetch(options) + } + } + } + + private let tokenSourceConfiguration: TokenSourceConfiguration + private var options: SessionOptions + + private let senders: [any MessageSender] + private let receivers: [any MessageReceiver] + + // MARK: - Internal state + + private var waitForAgentTask: Task? + + // MARK: - Init + + private init(tokenSourceConfiguration: TokenSourceConfiguration, + options: SessionOptions, + senders: [any MessageSender]?, + receivers: [any MessageReceiver]?) + { + self.tokenSourceConfiguration = tokenSourceConfiguration + self.options = options + room = options.room + + let textMessageSender = TextMessageSender(room: room) + let resolvedSenders = senders ?? [textMessageSender] + let resolvedReceivers = receivers ?? [textMessageSender, TranscriptionStreamReceiver(room: room)] + + self.senders = resolvedSenders + self.receivers = resolvedReceivers + + observe(room: room) + observe(receivers: resolvedReceivers) + } + + /// Initializes a new ``Session`` with a fixed token source. + /// - Parameters: + /// - tokenSource: A token source that provides a fixed token. + /// - options: The session options. + /// - senders: An array of message senders. + /// - receivers: An array of message receivers. + public convenience init(tokenSource: any TokenSourceFixed, + options: SessionOptions = .init(), + senders: [any MessageSender]? = nil, + receivers: [any MessageReceiver]? = nil) + { + self.init(tokenSourceConfiguration: .fixed(tokenSource), + options: options, + senders: senders, + receivers: receivers) + } + + /// Initializes a new ``Session`` with a configurable token source. 
+ /// - Parameters: + /// - tokenSource: A token source that can generate tokens with specific options. + /// - tokenOptions: The options for generating the token. + /// - options: The session options. + /// - senders: An array of message senders. + /// - receivers: An array of message receivers. + public convenience init(tokenSource: any TokenSourceConfigurable, + tokenOptions: TokenRequestOptions = .init(), + options: SessionOptions = .init(), + senders: [any MessageSender]? = nil, + receivers: [any MessageReceiver]? = nil) + { + self.init(tokenSourceConfiguration: .configurable(tokenSource, tokenOptions), + options: options, + senders: senders, + receivers: receivers) + } + + /// Creates a new ``Session`` configured for a specific agent. + /// - Parameters: + /// - agentName: The name of the agent to dispatch. + /// - agentMetadata: Metadata passed to the agent. + /// - tokenSource: A configurable token source. + /// - options: The session options. + /// - senders: An array of message senders. + /// - receivers: An array of message receivers. + /// - Returns: A new ``Session`` instance. + public static func withAgent(_ agentName: String, + agentMetadata: String? = nil, + tokenSource: any TokenSourceConfigurable, + options: SessionOptions = .init(), + senders: [any MessageSender]? = nil, + receivers: [any MessageReceiver]? 
= nil) -> Session + { + Session(tokenSource: tokenSource, + tokenOptions: .init(agentName: agentName, agentMetadata: agentMetadata), + options: options, + senders: senders, + receivers: receivers) + } + + deinit { + waitForAgentTask?.cancel() + } + + private func observe(room: Room) { + Task { [weak self] in + for try await _ in room.changes { + guard let self else { return } + updateAgent(in: room) + } + } + } + + private func updateAgent(in room: Room) { + connectionState = room.connectionState + + if connectionState == .disconnected { + agent.disconnected() + } else if let firstAgent = room.agentParticipants.values.first { + agent.connected(participant: firstAgent) + } else { + agent.connecting(buffering: options.preConnectAudio) + } + } + + private func observe(receivers: [any MessageReceiver]) { + for receiver in receivers { + Task { [weak self] in + do { + for await message in try await receiver.messages() { + guard let self else { return } + messagesDict.updateValue(message, forKey: message.id) + } + } catch { + self?.error = .receiver(error) + } + } + } + } + + // MARK: - Lifecycle + + /// Starts the session. 
+ public func start() async { + guard connectionState == .disconnected else { return } + + error = nil + waitForAgentTask?.cancel() + + let timeout = options.agentConnectTimeout + + let connect = { @Sendable in + let response = try await self.tokenSourceConfiguration.fetch() + try await self.room.connect(url: response.serverURL.absoluteString, + token: response.participantToken) + } + + do { + if options.preConnectAudio { + try await room.withPreConnectAudio(timeout: timeout) { + await MainActor.run { + self.connectionState = .connecting + self.agent.connecting(buffering: true) + } + try await connect() + } + } else { + connectionState = .connecting + agent.connecting(buffering: false) + try await connect() + try await room.localParticipant.setMicrophone(enabled: true) + } + + waitForAgentTask = Task { [weak self] in + try await Task.sleep(nanoseconds: UInt64(timeout * Double(NSEC_PER_SEC))) + try Task.checkCancellation() + guard let self else { return } + if isConnected, !agent.isConnected { + agent.failed(error: .timeout) + } + } + } catch { + self.error = .connection(error) + connectionState = .disconnected + agent.disconnected() + } + } + + /// Terminates the session. + public func end() async { + await room.disconnect() + } + + /// Resets the last error. + public func dismissError() { + error = nil + } + + // MARK: - Messages + + /// Sends a text message. + /// - Parameter text: The text to send. + /// - Returns: The ``SentMessage`` that was sent, or `nil` if the message failed to send. + @discardableResult + public func send(text: String) async -> SentMessage? { + let message = SentMessage(id: UUID().uuidString, timestamp: Date(), content: .userInput(text)) + do { + for sender in senders { + try await sender.send(message) + } + return message + } catch { + self.error = .sender(error) + return nil + } + } + + /// Gets the message history. + /// - Returns: An array of ``ReceivedMessage``. 
+ public func getMessageHistory() -> [ReceivedMessage] { + messages + } + + /// Restores the message history, ordered by timestamp. + /// - Parameter messages: An array of ``ReceivedMessage`` to restore; if several share an ID, the one sorted last by timestamp is kept. + public func restoreMessageHistory(_ messages: [ReceivedMessage]) { + messagesDict = .init(messages.sorted(by: { $0.timestamp < $1.timestamp }).map { ($0.id, $0) }, uniquingKeysWith: { _, latest in latest }) + } +} diff --git a/Sources/LiveKit/Agent/SessionOptions.swift b/Sources/LiveKit/Agent/SessionOptions.swift new file mode 100644 index 000000000..96bddf01c --- /dev/null +++ b/Sources/LiveKit/Agent/SessionOptions.swift @@ -0,0 +1,41 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// Options for creating a ``Session``. +public struct SessionOptions: Sendable { + /// The underlying ``Room`` object for the session. + public var room: Room + /// Whether to enable audio pre-connect with ``PreConnectAudioBuffer``. + /// If enabled, the microphone will be enabled before connecting to the room. + /// Use ``LocalMedia`` or ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` + /// to request microphone permissions early in the app lifecycle. + public var preConnectAudio: Bool + /// The timeout for the agent to connect, in seconds. + /// If exceeded, the ``Agent`` will transition to a failed state.
+ public var agentConnectTimeout: TimeInterval + + public init( + room: Room = .init(), + preConnectAudio: Bool = true, + agentConnectTimeout: TimeInterval = 20 + ) { + self.room = room + self.preConnectAudio = preConnectAudio + self.agentConnectTimeout = agentConnectTimeout + } +} diff --git a/Sources/LiveKit/Support/ObservableObject+.swift b/Sources/LiveKit/Support/ObservableObject+.swift new file mode 100644 index 000000000..8be4f1558 --- /dev/null +++ b/Sources/LiveKit/Support/ObservableObject+.swift @@ -0,0 +1,31 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@preconcurrency import Combine + +extension ObservableObject { + /// An async stream that emits the `objectWillChange` events. + var changes: AsyncStream { + AsyncStream { continuation in + let cancellable = objectWillChange.sink { _ in + continuation.yield() + } + continuation.onTermination = { _ in + cancellable.cancel() + } + } + } +} diff --git a/Sources/LiveKit/SwiftUI/LocalMedia.swift b/Sources/LiveKit/SwiftUI/LocalMedia.swift new file mode 100644 index 000000000..e637001f9 --- /dev/null +++ b/Sources/LiveKit/SwiftUI/LocalMedia.swift @@ -0,0 +1,221 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@preconcurrency import AVFoundation
+import Combine
+import Foundation
+
+/// An ``ObservableObject`` that can be used to control the local participant's media devices.
+///
+/// This class provides a convenient way to manage local media tracks, including enabling/disabling
+/// microphone and camera, and selecting audio and video devices. It is designed to be used
+/// in SwiftUI views.
+@MainActor
+open class LocalMedia: ObservableObject {
+    // MARK: - Error
+
+    /// Errors published through the `error` property of this object.
+    public enum Error: LocalizedError {
+        /// A media device operation failed; wraps the underlying error.
+        case mediaDevice(Swift.Error)
+
+        /// A description that embeds the wrapped error's `localizedDescription`.
+        public var errorDescription: String? {
+            switch self {
+            case let .mediaDevice(error):
+                "Media device error: \(error.localizedDescription)"
+            }
+        }
+    }
+
+    // MARK: - Devices
+
+    /// The last error that occurred.
+    @Published public private(set) var error: Error?
+
+    /// The local microphone track.
+    @Published public private(set) var microphoneTrack: (any AudioTrack)?
+    /// The local camera track.
+    @Published public private(set) var cameraTrack: (any VideoTrack)?
+    /// The local screen share track.
+    @Published public private(set) var screenShareTrack: (any VideoTrack)?
+
+    /// A boolean value indicating whether the microphone is enabled.
+    @Published public private(set) var isMicrophoneEnabled: Bool = false
+    /// A boolean value indicating whether the camera is enabled.
+    @Published public private(set) var isCameraEnabled: Bool = false
+    /// A boolean value indicating whether screen sharing is enabled.
+    @Published public private(set) var isScreenShareEnabled: Bool = false
+
+    /// The available audio input devices.
+    @Published public private(set) var audioDevices: [AudioDevice] = AudioManager.shared.inputDevices
+    /// The ID of the selected audio input device.
+    @Published public private(set) var selectedAudioDeviceID: String = AudioManager.shared.inputDevice.deviceId
+
+    /// The available video capture devices.
+    @Published public private(set) var videoDevices: [AVCaptureDevice] = []
+    /// The ID of the selected video capture device.
+    @Published public private(set) var selectedVideoDeviceID: String?
+
+    /// A boolean value indicating whether the camera position can be switched.
+    @Published public private(set) var canSwitchCamera = false
+
+    // MARK: - Dependencies
+
+    /// The participant whose tracks and enabled state this object mirrors and toggles.
+    private var localParticipant: LocalParticipant
+
+    // MARK: - Initialization
+
+    /// Initializes a new ``LocalMedia`` object.
+    /// - Parameter localParticipant: The ``LocalParticipant`` to control.
+    public init(localParticipant: LocalParticipant) {
+        self.localParticipant = localParticipant
+
+        // Start observing the participant and the media devices right away.
+        observe(localParticipant)
+        observeDevices()
+    }
+
+    /// Initializes a new ``LocalMedia`` object.
+    /// - Parameter room: The ``Room`` to control; its `localParticipant` is used.
+    public convenience init(room: Room) {
+        self.init(localParticipant: room.localParticipant)
+    }
+
+    /// Initializes a new ``LocalMedia`` object.
+    /// - Parameter session: The ``Session`` to control.
+    public convenience init(session: Session) {
+        self.init(room: session.room)
+    }
+
+    /// Copies the participant's tracks and enabled flags into the published properties
+    /// each time the participant emits a change.
+    private func observe(_ localParticipant: LocalParticipant) {
+        Task { [weak self] in
+            // `changes` is a non-throwing stream, so no `try` is needed.
+            for await _ in localParticipant.changes {
+                // Stop observing once this object has been deallocated.
+                guard let self else { return }
+
+                microphoneTrack = localParticipant.firstAudioTrack
+                cameraTrack = localParticipant.firstCameraVideoTrack
+                screenShareTrack = localParticipant.firstScreenShareVideoTrack
+
+                isMicrophoneEnabled = localParticipant.isMicrophoneEnabled()
+                isCameraEnabled = localParticipant.isCameraEnabled()
+                isScreenShareEnabled = localParticipant.isScreenShareEnabled()
+            }
+        }
+    }
+
+    /// Configures the shared ``AudioManager`` and keeps the published audio and video
+    /// device lists up to date.
+    private func observeDevices() {
+        try? AudioManager.shared.set(microphoneMuteMode: .inputMixer) // don't play mute sound effect
+        Task {
+            try await AudioManager.shared.setRecordingAlwaysPreparedMode(true)
+        }
+
+        // Capture `self` weakly in the handler itself. A `[weak self]` that appears only on the
+        // nested `Task` closure still makes the outer closure hold `self` strongly, so the shared
+        // `AudioManager` would keep this object alive and `deinit` would never run.
+        AudioManager.shared.onDeviceUpdate = { [weak self] _ in
+            Task { @MainActor [weak self] in
+                guard let self else { return }
+                audioDevices = AudioManager.shared.inputDevices
+                selectedAudioDeviceID = AudioManager.shared.defaultInputDevice.deviceId
+            }
+        }
+
+        Task {
+            canSwitchCamera = try await CameraCapturer.canSwitchPosition()
+            videoDevices = try await CameraCapturer.captureDevices()
+            selectedVideoDeviceID = videoDevices.first?.uniqueID
+        }
+    }
+
+    deinit {
+        // NOTE(review): this clears the shared handler unconditionally, so it would also remove
+        // a handler installed by another, newer ``LocalMedia`` instance - confirm single-instance use.
+        AudioManager.shared.onDeviceUpdate = nil
+    }
+
+    /// Resets the last error.
+    public func dismissError() {
+        error = nil
+    }
+
+    // MARK: - Toggle
+
+    /// Toggles the microphone on or off.
+    ///
+    /// Failures are published through ``error``.
+    public func toggleMicrophone() async {
+        do {
+            try await localParticipant.setMicrophone(enabled: !isMicrophoneEnabled)
+        } catch {
+            self.error = .mediaDevice(error)
+        }
+    }
+
+    /// Toggles the camera on or off.
+    /// - Parameter disableScreenShare: If `true`, screen sharing will be disabled when the camera is enabled.
+    public func toggleCamera(disableScreenShare: Bool = false) async {
+        let enable = !isCameraEnabled
+        do {
+            if enable, disableScreenShare, isScreenShareEnabled {
+                try await localParticipant.setScreenShare(enabled: false)
+            }
+
+            // Use the selected capture device if it is still listed; otherwise `device` is nil.
+            let device = try await CameraCapturer.captureDevices().first(where: { $0.uniqueID == selectedVideoDeviceID })
+            try await localParticipant.setCamera(enabled: enable, captureOptions: CameraCaptureOptions(device: device))
+        } catch {
+            self.error = .mediaDevice(error)
+        }
+    }
+
+    /// Toggles screen sharing on or off.
+    /// - Parameter disableCamera: If `true`, the camera will be disabled when screen sharing is enabled.
+    public func toggleScreenShare(disableCamera: Bool = false) async {
+        let enable = !isScreenShareEnabled
+        do {
+            if enable, disableCamera, isCameraEnabled {
+                try await localParticipant.setCamera(enabled: false)
+            }
+            try await localParticipant.setScreenShare(enabled: enable)
+        } catch {
+            self.error = .mediaDevice(error)
+        }
+    }
+
+    // MARK: - Select
+
+    /// Selects an audio input device.
+    ///
+    /// If the device is not among the currently available input devices, the default input
+    /// device is applied instead, and ``selectedAudioDeviceID`` reflects the device that was
+    /// actually applied.
+    /// - Parameter audioDevice: The ``AudioDevice`` to select.
+    public func select(audioDevice: AudioDevice) {
+        let device = AudioManager.shared.inputDevices.first(where: { $0.deviceId == audioDevice.deviceId }) ?? AudioManager.shared.defaultInputDevice
+        AudioManager.shared.inputDevice = device
+        // Publish the applied device's ID, not the requested one, so the two cannot diverge.
+        selectedAudioDeviceID = device.deviceId
+    }
+
+    /// Selects a video capture device.
+    ///
+    /// Does nothing when there is no local camera track backed by a ``CameraCapturer``.
+    /// - Parameter videoDevice: The ``AVCaptureDevice`` to select.
+    public func select(videoDevice: AVCaptureDevice) async {
+        guard let cameraCapturer = getCameraCapturer() else { return }
+        do {
+            try await cameraCapturer.set(options: .init(device: videoDevice))
+            selectedVideoDeviceID = videoDevice.uniqueID
+        } catch {
+            self.error = .mediaDevice(error)
+        }
+    }
+
+    /// Switches the camera position.
+    ///
+    /// Does nothing when there is no local camera track backed by a ``CameraCapturer``.
+    /// Failures are published through ``error``, like the other device operations.
+    public func switchCamera() async {
+        guard let cameraCapturer = getCameraCapturer() else { return }
+        do {
+            _ = try await cameraCapturer.switchCameraPosition()
+        } catch {
+            self.error = .mediaDevice(error)
+        }
+    }
+
+    // MARK: - Private
+
+    /// Returns the ``CameraCapturer`` of the first local camera track, if there is one.
+    private func getCameraCapturer() -> CameraCapturer? {
+        guard let cameraTrack = localParticipant.firstCameraVideoTrack as? LocalVideoTrack else { return nil }
+        return cameraTrack.capturer as? CameraCapturer
+    }
+}
diff --git a/Sources/LiveKit/Track/VideoTrack.swift b/Sources/LiveKit/Track/VideoTrack.swift
index 65ff33f33..6598567cf 100644
--- a/Sources/LiveKit/Track/VideoTrack.swift
+++ b/Sources/LiveKit/Track/VideoTrack.swift
@@ -71,3 +71,13 @@ extension VideoTrackProtocol where Self: Track {
         return missingCodecs
     }
 }
+
+public extension Track {
+    /// The aspect ratio (width / height) of the video track, or 1 if the dimensions
+    /// are not available or have a non-positive width or height.
+    var aspectRatio: CGFloat {
+        // A zero height would yield inf/NaN, which is not a usable aspect ratio.
+        guard let dimensions, dimensions.width > 0, dimensions.height > 0 else { return 1 }
+        return CGFloat(dimensions.width) / CGFloat(dimensions.height)
+    }
+}
diff --git a/Sources/LiveKit/Types/Attributes/AttributeTypings.swift b/Sources/LiveKit/Types/Attributes/AttributeTypings.swift
index 35dbc8f0b..9fc56e609 100644
--- a/Sources/LiveKit/Types/Attributes/AttributeTypings.swift
+++ b/Sources/LiveKit/Types/Attributes/AttributeTypings.swift
@@ -20,6 +20,37 @@ import Foundation
 extension AgentAttributes: Hashable {}
 extension AgentAttributes: Equatable {}
 
+// Bool as String encoding
+extension TranscriptionAttributes {
+    /// Decodes the attributes, accepting `lkTranscriptionFinal` as either a `Bool` or a `String`.
+    init(from decoder: Decoder) throws {
+        let container = try decoder.container(keyedBy: CodingKeys.self)
+        lkSegmentID = try container.decodeIfPresent(String.self, forKey: .lkSegmentID)
+        lkTranscribedTrackID = try container.decodeIfPresent(String.self, forKey: .lkTranscribedTrackID)
+
+        // Decode as Bool first, fallback to String
+        if let boolValue = try? container.decodeIfPresent(Bool.self, forKey: .lkTranscriptionFinal) {
+            lkTranscriptionFinal = boolValue
+        } else if let stringValue = try? container.decodeIfPresent(String.self, forKey: .lkTranscriptionFinal) {
+            lkTranscriptionFinal = (stringValue as NSString).boolValue
+        } else {
+            lkTranscriptionFinal = nil
+        }
+    }
+
+    /// Encodes the attributes, writing `lkTranscriptionFinal` as the string `"true"` or `"false"`.
+    func encode(to encoder: Encoder) throws {
+        var container = encoder.container(keyedBy: CodingKeys.self)
+        try container.encodeIfPresent(lkSegmentID, forKey: .lkSegmentID)
+        try container.encodeIfPresent(lkTranscribedTrackID, forKey: .lkTranscribedTrackID)
+
+        // Always encode Bool as a string if it exists
+        if let boolValue = lkTranscriptionFinal {
+            try container.encode(boolValue ? "true" : "false", forKey: .lkTranscriptionFinal)
+        }
+    }
+}
+
 // MARK: - AgentAttributes
 
 struct AgentAttributes: Codable, Sendable {
diff --git a/Tests/LiveKitCoreTests/Agent/TranscriptionTests.swift b/Tests/LiveKitCoreTests/Agent/TranscriptionTests.swift
new file mode 100644
index 000000000..33713d152
--- /dev/null
+++ b/Tests/LiveKitCoreTests/Agent/TranscriptionTests.swift
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2025 LiveKit
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@testable import LiveKit
+import OrderedCollections
+#if canImport(LiveKitTestSupport)
+import LiveKitTestSupport
+#endif
+
+/// Collects every received message update, plus the latest message per ID in arrival order.
+actor MessageCollector {
+    private var updates: [ReceivedMessage] = []
+    private var messages: OrderedDictionary<String, ReceivedMessage> = [:]
+
+    /// Records `message` as an update and as the current value for its ID.
+    func add(_ message: ReceivedMessage) {
+        updates.append(message)
+        messages[message.id] = message
+    }
+
+    /// All updates in the order they were added.
+    func getUpdates() -> [ReceivedMessage] {
+        updates
+    }
+
+    /// The latest message for each ID.
+    func getMessages() -> OrderedDictionary<String, ReceivedMessage> {
+        messages
+    }
+}
+
+class TranscriptionTests: LKTestCase, @unchecked Sendable {
+    private var rooms: [Room] = []
+    private var receiver: TranscriptionStreamReceiver!
+    private var senderRoom: Room!
+    private var messageCollector: MessageCollector!
+    private var collectionTask: Task<Void, Never>!
+    private var messageExpectation: XCTestExpectation!
+
+    // Same segment, same stream
+    func testUpdates() async throws {
+        let segmentID = "test-segment"
+        let streamID = UUID().uuidString
+        let testChunks = ["Hey", " there!", " What's up?"]
+        let expectedContent = ["Hey", "Hey there!", "Hey there! What's up?"]
+
+        try await runTranscriptionTest(
+            chunks: testChunks,
+            segmentID: segmentID,
+            streamID: streamID,
+            expectedContent: expectedContent
+        )
+    }
+
+    // Same segment, different stream
+    func testReplace() async throws {
+        let segmentID = "test-segment"
+        let testChunks = ["Hey", "Hey there!", "Hey there! What's up?"]
+        let expectedContent = ["Hey", "Hey there!", "Hey there! What's up?"]
+
+        try await runTranscriptionTest(
+            chunks: testChunks,
+            segmentID: segmentID,
+            streamID: nil,
+            expectedContent: expectedContent
+        )
+    }
+
+    /// Creates the receiver on the first room and starts collecting its messages,
+    /// fulfilling the expectation once per received update.
+    private func setupTestEnvironment(expectedCount: Int) async throws {
+        messageExpectation = expectation(description: "Receives all message updates")
+        messageExpectation.expectedFulfillmentCount = expectedCount
+
+        receiver = TranscriptionStreamReceiver(room: rooms[0])
+        let messageStream = try await receiver.messages()
+        messageCollector = MessageCollector()
+        senderRoom = rooms[1]
+
+        collectionTask = Task { @Sendable in
+            var iterator = messageStream.makeAsyncIterator()
+            while let message = await iterator.next() {
+                await self.messageCollector.add(message)
+                self.messageExpectation.fulfill()
+            }
+        }
+    }
+
+    /// Sends each chunk as a text stream on the transcription topic; only the last chunk
+    /// is marked final. A fresh stream ID is generated per chunk when `streamID` is nil.
+    private func sendTranscriptionChunks(
+        chunks: [String],
+        segmentID: String,
+        streamID: String? = nil,
+        to room: Room
+    ) async throws {
+        let topic = "lk.transcription"
+
+        for (index, chunk) in chunks.enumerated() {
+            let isLast = index == chunks.count - 1
+
+            var attributes: [String: String] = [
+                "lk.segment_id": segmentID,
+                "lk.transcription_final": "false",
+            ]
+
+            if isLast {
+                attributes["lk.transcription_final"] = "true"
+            }
+
+            let options = StreamTextOptions(
+                topic: topic,
+                attributes: attributes,
+                id: streamID ?? UUID().uuidString
+            )
+
+            try await room.localParticipant.sendText(chunk, options: options)
+            try await Task.sleep(nanoseconds: 10_000_000)
+        }
+    }
+
+    /// Asserts that the updates match `expectedContent`, share one timestamp and ID,
+    /// and that exactly one final message remains.
+    private func validateTranscriptionResults(
+        updates: [ReceivedMessage],
+        messages: OrderedDictionary<String, ReceivedMessage>,
+        segmentID: String,
+        expectedContent: [String]
+    ) {
+        // Validate updates
+        XCTAssertEqual(updates.count, expectedContent.count)
+        // Compare pairwise so that a short `updates` array is reported by the count assertion
+        // above instead of trapping on an out-of-range index.
+        for (update, expected) in zip(updates, expectedContent) {
+            XCTAssertEqual(update.content, .agentTranscript(expected))
+            XCTAssertEqual(update.id, segmentID)
+        }
+
+        // Validate timestamps are consistent
+        guard let firstTimestamp = updates.first?.timestamp else {
+            XCTFail("No transcription updates were received")
+            return
+        }
+        for update in updates {
+            XCTAssertEqual(update.timestamp, firstTimestamp)
+        }
+
+        // Validate final message
+        XCTAssertEqual(messages.count, 1)
+        guard let finalID = messages.keys.first,
+              let finalMessage = messages.values.first,
+              let finalContent = expectedContent.last
+        else {
+            XCTFail("Expected a final message and non-empty expected content")
+            return
+        }
+        XCTAssertEqual(finalID, segmentID)
+        XCTAssertEqual(finalMessage.content, .agentTranscript(finalContent))
+        XCTAssertEqual(finalMessage.id, segmentID)
+        XCTAssertEqual(finalMessage.timestamp, firstTimestamp)
+    }
+
+    /// Connects a receiving and a sending room, sends the chunks, waits for all updates
+    /// and validates what was collected.
+    private func runTranscriptionTest(
+        chunks: [String],
+        segmentID: String,
+        streamID: String? = nil,
+        expectedContent: [String]
+    ) async throws {
+        try await withRooms([
+            RoomTestingOptions(canSubscribe: true),
+            RoomTestingOptions(canPublishData: true),
+        ]) { rooms in
+            self.rooms = rooms
+            try await self.setupTestEnvironment(expectedCount: expectedContent.count)
+            try await self.sendTranscriptionChunks(
+                chunks: chunks,
+                segmentID: segmentID,
+                streamID: streamID,
+                to: self.senderRoom
+            )
+
+            await self.fulfillment(of: [self.messageExpectation], timeout: 5)
+            self.collectionTask.cancel()
+
+            let updates = await self.messageCollector.getUpdates()
+            let messages = await self.messageCollector.getMessages()
+
+            self.validateTranscriptionResults(
+                updates: updates,
+                messages: messages,
+                segmentID: segmentID,
+                expectedContent: expectedContent
+            )
+        }
+    }
+}