diff --git a/Package.swift b/Package.swift index 0a16837..f22459b 100644 --- a/Package.swift +++ b/Package.swift @@ -61,6 +61,7 @@ let enableAllTraitsExplicit = ProcessInfo.processInfo.environment["ENABLE_ALL_TR let enableAllTraits = spiGenerateDocs || previewDocs || enableAllTraitsExplicit let addDoccPlugin = previewDocs || spiGenerateDocs +let enableAllCIFlags = enableAllTraitsExplicit traits.insert( .default( @@ -77,6 +78,7 @@ let package = Package( traits: traits, dependencies: [ .package(url: "https://github.com/apple/swift-system", from: "1.5.0"), + .package(url: "https://github.com/apple/swift-collections", from: "1.3.0"), .package(url: "https://github.com/swift-server/swift-service-lifecycle", from: "2.7.0"), .package(url: "https://github.com/apple/swift-log", from: "1.6.3"), .package(url: "https://github.com/apple/swift-metrics", from: "2.7.0"), @@ -92,6 +94,10 @@ let package = Package( name: "SystemPackage", package: "swift-system" ), + .product( + name: "DequeModule", + package: "swift-collections" + ), .product( name: "Logging", package: "swift-log", @@ -179,8 +185,16 @@ for target in package.targets { // https://github.com/swiftlang/swift-evolution/blob/main/proposals/0409-access-level-on-imports.md settings.append(.enableUpcomingFeature("InternalImportsByDefault")) + // https://docs.swift.org/compiler/documentation/diagnostics/nonisolated-nonsending-by-default/ + settings.append(.enableUpcomingFeature("NonisolatedNonsendingByDefault")) + settings.append(.enableExperimentalFeature("AvailabilityMacro=Configuration 1.0:macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0")) + if enableAllCIFlags { + // Ensure all public types are explicitly annotated as Sendable or not Sendable. + settings.append(.unsafeFlags(["-Xfrontend", "-require-explicit-sendable"])) + } + target.swiftSettings = settings } diff --git a/Sources/Configuration/ConfigProviderHelpers.swift b/Sources/Configuration/ConfigProviderHelpers.swift index 3dbef54..95cb83c 100644 --- a/Sources/Configuration/ConfigProviderHelpers.swift +++ b/Sources/Configuration/ConfigProviderHelpers.swift @@ -42,13 +42,15 @@ extension ConfigProvider { /// - updatesHandler: The closure that processes the async sequence of value updates. /// - Returns: The value returned by the handler closure. /// - Throws: Provider-specific errors or errors thrown by the handler. - public func watchValueFromValue( - forKey key: AbsoluteConfigKey, - type: ConfigType, - updatesHandler: ( - ConfigUpdatesAsyncSequence, Never> + nonisolated(nonsending) + public func watchValueFromValue( + forKey key: AbsoluteConfigKey, + type: ConfigType, + updatesHandler: ( + ConfigUpdatesAsyncSequence, Never> + ) async throws -> Return ) async throws -> Return - ) async throws -> Return { + { let (stream, continuation) = AsyncStream> .makeStream(bufferingPolicy: .bufferingNewest(1)) let initialValue: Result @@ -83,9 +85,11 @@ extension ConfigProvider { /// - Parameter updatesHandler: The closure that processes the async sequence of snapshot updates. /// - Returns: The value returned by the handler closure. /// - Throws: Provider-specific errors or errors thrown by the handler. - public func watchSnapshotFromSnapshot( - updatesHandler: (ConfigUpdatesAsyncSequence) async throws -> Return - ) async throws -> Return { + nonisolated(nonsending) + public func watchSnapshotFromSnapshot( + updatesHandler: (ConfigUpdatesAsyncSequence) async throws -> Return + ) async throws -> Return + { let (stream, continuation) = AsyncStream .makeStream(bufferingPolicy: .bufferingNewest(1)) let initialValue = snapshot() diff --git a/Sources/Configuration/Documentation.docc/Reference/ConfigReader-Watch.md b/Sources/Configuration/Documentation.docc/Reference/ConfigReader-Watch.md index 686e2a7..6abb641 100644 --- a/Sources/Configuration/Documentation.docc/Reference/ConfigReader-Watch.md +++ b/Sources/Configuration/Documentation.docc/Reference/ConfigReader-Watch.md @@ -4,47 +4,47 @@ ### Watching string values - ``ConfigReader/watchString(forKey:isSecret:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchString(forKey:as:isSecret:fileID:line:updatesHandler:)-4q1c0`` -- ``ConfigReader/watchString(forKey:as:isSecret:fileID:line:updatesHandler:)-7lki4`` +- ``ConfigReader/watchString(forKey:as:isSecret:fileID:line:updatesHandler:)-7mxw1`` +- ``ConfigReader/watchString(forKey:as:isSecret:fileID:line:updatesHandler:)-818sy`` - ``ConfigReader/watchString(forKey:isSecret:default:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchString(forKey:as:isSecret:default:fileID:line:updatesHandler:)-4x6zt`` -- ``ConfigReader/watchString(forKey:as:isSecret:default:fileID:line:updatesHandler:)-1ncw1`` +- ``ConfigReader/watchString(forKey:as:isSecret:default:fileID:line:updatesHandler:)-6m0yu`` +- ``ConfigReader/watchString(forKey:as:isSecret:default:fileID:line:updatesHandler:)-6dpc3`` - ``ConfigReader/watchString(forKey:context:isSecret:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchString(forKey:context:as:isSecret:fileID:line:updatesHandler:)-1vua5`` -- ``ConfigReader/watchString(forKey:context:as:isSecret:fileID:line:updatesHandler:)-1s8wu`` +- ``ConfigReader/watchString(forKey:context:as:isSecret:fileID:line:updatesHandler:)-34wbx`` +- ``ConfigReader/watchString(forKey:context:as:isSecret:fileID:line:updatesHandler:)-549xr`` - ``ConfigReader/watchString(forKey:context:isSecret:default:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchString(forKey:context:as:isSecret:default:fileID:line:updatesHandler:)-3ppdh`` -- ``ConfigReader/watchString(forKey:context:as:isSecret:default:fileID:line:updatesHandler:)-80t2z`` +- ``ConfigReader/watchString(forKey:context:as:isSecret:default:fileID:line:updatesHandler:)-9u7vf`` +- ``ConfigReader/watchString(forKey:context:as:isSecret:default:fileID:line:updatesHandler:)-1ofiv`` ### Watching required string values - ``ConfigReader/watchRequiredString(forKey:isSecret:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchRequiredString(forKey:as:isSecret:fileID:line:updatesHandler:)-29xb0`` -- ``ConfigReader/watchRequiredString(forKey:as:isSecret:fileID:line:updatesHandler:)-3dox3`` +- ``ConfigReader/watchRequiredString(forKey:as:isSecret:fileID:line:updatesHandler:)-86ot1`` +- ``ConfigReader/watchRequiredString(forKey:as:isSecret:fileID:line:updatesHandler:)-3lrs7`` - ``ConfigReader/watchRequiredString(forKey:context:isSecret:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchRequiredString(forKey:context:as:isSecret:fileID:line:updatesHandler:)-6v7w5`` -- ``ConfigReader/watchRequiredString(forKey:context:as:isSecret:fileID:line:updatesHandler:)-76kbb`` +- ``ConfigReader/watchRequiredString(forKey:context:as:isSecret:fileID:line:updatesHandler:)-77978`` +- ``ConfigReader/watchRequiredString(forKey:context:as:isSecret:fileID:line:updatesHandler:)-138o2`` ### Watching lists of string values - ``ConfigReader/watchStringArray(forKey:isSecret:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchStringArray(forKey:as:isSecret:fileID:line:updatesHandler:)-5igvu`` -- ``ConfigReader/watchStringArray(forKey:as:isSecret:fileID:line:updatesHandler:)-38ruy`` +- ``ConfigReader/watchStringArray(forKey:as:isSecret:fileID:line:updatesHandler:)-8t4nb`` +- ``ConfigReader/watchStringArray(forKey:as:isSecret:fileID:line:updatesHandler:)-9cmju`` - ``ConfigReader/watchStringArray(forKey:isSecret:default:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchStringArray(forKey:as:isSecret:default:fileID:line:updatesHandler:)-7oi5b`` -- ``ConfigReader/watchStringArray(forKey:as:isSecret:default:fileID:line:updatesHandler:)-4rhx2`` +- ``ConfigReader/watchStringArray(forKey:as:isSecret:default:fileID:line:updatesHandler:)-59de`` +- ``ConfigReader/watchStringArray(forKey:as:isSecret:default:fileID:line:updatesHandler:)-8nsil`` - ``ConfigReader/watchStringArray(forKey:context:isSecret:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchStringArray(forKey:context:as:isSecret:fileID:line:updatesHandler:)-6gaip`` -- ``ConfigReader/watchStringArray(forKey:context:as:isSecret:fileID:line:updatesHandler:)-5dyyx`` +- ``ConfigReader/watchStringArray(forKey:context:as:isSecret:fileID:line:updatesHandler:)-5occx`` +- ``ConfigReader/watchStringArray(forKey:context:as:isSecret:fileID:line:updatesHandler:)-30hf0`` - ``ConfigReader/watchStringArray(forKey:context:isSecret:default:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchStringArray(forKey:context:as:isSecret:default:fileID:line:updatesHandler:)-7tbs9`` -- ``ConfigReader/watchStringArray(forKey:context:as:isSecret:default:fileID:line:updatesHandler:)-5yo2r`` +- ``ConfigReader/watchStringArray(forKey:context:as:isSecret:default:fileID:line:updatesHandler:)-4txm0`` +- ``ConfigReader/watchStringArray(forKey:context:as:isSecret:default:fileID:line:updatesHandler:)-3eipe`` ### Watching required lists of string values - ``ConfigReader/watchRequiredStringArray(forKey:isSecret:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchRequiredStringArray(forKey:as:isSecret:fileID:line:updatesHandler:)-1t82o`` -- ``ConfigReader/watchRequiredStringArray(forKey:as:isSecret:fileID:line:updatesHandler:)-7lk1k`` +- ``ConfigReader/watchRequiredStringArray(forKey:as:isSecret:fileID:line:updatesHandler:)-3whiy`` +- ``ConfigReader/watchRequiredStringArray(forKey:as:isSecret:fileID:line:updatesHandler:)-4zyyq`` - ``ConfigReader/watchRequiredStringArray(forKey:context:isSecret:fileID:line:updatesHandler:)`` -- ``ConfigReader/watchRequiredStringArray(forKey:context:as:isSecret:fileID:line:updatesHandler:)-5zo1e`` -- ``ConfigReader/watchRequiredStringArray(forKey:context:as:isSecret:fileID:line:updatesHandler:)-6kvcj`` +- ``ConfigReader/watchRequiredStringArray(forKey:context:as:isSecret:fileID:line:updatesHandler:)-97r4l`` +- ``ConfigReader/watchRequiredStringArray(forKey:context:as:isSecret:fileID:line:updatesHandler:)-4jcy3`` ### Watching Boolean values - ``ConfigReader/watchBool(forKey:isSecret:fileID:line:updatesHandler:)`` diff --git a/Sources/Configuration/MultiProvider.swift b/Sources/Configuration/MultiProvider.swift index 4770473..23bcd0c 100644 --- a/Sources/Configuration/MultiProvider.swift +++ b/Sources/Configuration/MultiProvider.swift @@ -195,28 +195,33 @@ extension MultiProvider { /// - Parameter body: A closure that receives an async sequence of ``MultiSnapshot`` updates. /// - Returns: The value returned by the body closure. /// - Throws: Any error thrown by the nested providers or the body closure. - func watchSnapshot( - _ body: (ConfigUpdatesAsyncSequence) async throws -> Return - ) async throws -> Return { + nonisolated(nonsending) + func watchSnapshot( + _ body: (ConfigUpdatesAsyncSequence) async throws -> Return + ) async throws -> Return + { let providers = storage.providers - let sources: - [@Sendable ( - (ConfigUpdatesAsyncSequence) async throws -> Void - ) async throws -> Void] = providers.map { $0.watchSnapshot } - return try await combineLatestOneOrMore( - elementType: (any ConfigSnapshotProtocol).self, - sources: sources, - updatesHandler: { updateArrays in - try await body( - ConfigUpdatesAsyncSequence( - updateArrays - .map { array in - MultiSnapshot(snapshots: array) - } - ) + typealias UpdatesSequence = any (AsyncSequence & Sendable) + var updateSequences: [UpdatesSequence] = [] + updateSequences.reserveCapacity(providers.count) + return try await withProvidersWatchingSnapshot( + providers: ArraySlice(providers), + updateSequences: &updateSequences, + ) { providerUpdateSequences in + let updateArrays = combineLatestMany( + elementType: (any ConfigSnapshotProtocol).self, + failureType: Never.self, + providerUpdateSequences + ) + return try await body( + ConfigUpdatesAsyncSequence( + updateArrays + .map { array in + MultiSnapshot(snapshots: array) + } ) - } - ) + ) + } } /// Asynchronously resolves a configuration value from nested providers. @@ -281,52 +286,97 @@ extension MultiProvider { /// - updatesHandler: A closure that receives an async sequence of combined updates from all providers. /// - Throws: Any error thrown by the nested providers or the handler closure. /// - Returns: The value returned by the handler. - func watchValue( - forKey key: AbsoluteConfigKey, - type: ConfigType, - updatesHandler: ( - ConfigUpdatesAsyncSequence<([AccessEvent.ProviderResult], Result), Never> + nonisolated(nonsending) + func watchValue( + forKey key: AbsoluteConfigKey, + type: ConfigType, + updatesHandler: ( + ConfigUpdatesAsyncSequence<([AccessEvent.ProviderResult], Result), Never> + ) async throws -> Return ) async throws -> Return - ) async throws -> Return { + { let providers = storage.providers let providerNames = providers.map(\.providerName) - let sources: - [@Sendable ( - ( - ConfigUpdatesAsyncSequence, Never> - ) async throws -> Void - ) async throws -> Void] = providers.map { provider in - { handler in - _ = try await provider.watchValue(forKey: key, type: type, updatesHandler: handler) - } - } - return try await combineLatestOneOrMore( - elementType: Result.self, - sources: sources, - updatesHandler: { updateArrays in - try await updatesHandler( - ConfigUpdatesAsyncSequence( - updateArrays - .map { array in - var results: [AccessEvent.ProviderResult] = [] - for (providerIndex, lookupResult) in array.enumerated() { - let providerName = providerNames[providerIndex] - results.append(.init(providerName: providerName, result: lookupResult)) - switch lookupResult { - case .success(let value) where value.value == nil: - // Got a success + nil from a nested provider, keep iterating. - continue - default: - // Got a success + non-nil or an error from a nested provider, propagate that up. - return (results, lookupResult.map { $0.value }) - } + typealias UpdatesSequence = any (AsyncSequence, Never> & Sendable) + var updateSequences: [UpdatesSequence] = [] + updateSequences.reserveCapacity(providers.count) + return try await withProvidersWatchingValue( + providers: ArraySlice(providers), + updateSequences: &updateSequences, + key: key, + configType: type, + ) { providerUpdateSequences in + let updateArrays = combineLatestMany( + elementType: Result.self, + failureType: Never.self, + providerUpdateSequences + ) + return try await updatesHandler( + ConfigUpdatesAsyncSequence( + updateArrays + .map { array in + var results: [AccessEvent.ProviderResult] = [] + for (providerIndex, lookupResult) in array.enumerated() { + let providerName = providerNames[providerIndex] + results.append(.init(providerName: providerName, result: lookupResult)) + switch lookupResult { + case .success(let value) where value.value == nil: + // Got a success + nil from a nested provider, keep iterating. + continue + default: + // Got a success + non-nil or an error from a nested provider, propagate that up. + return (results, lookupResult.map { $0.value }) } - // If all nested results were success + nil, return the same. - return (results, .success(nil)) } - ) + // If all nested results were success + nil, return the same. + return (results, .success(nil)) + } ) - } + ) + } + } +} + +@available(Configuration 1.0, *) +nonisolated(nonsending) private func withProvidersWatchingValue( + providers: ArraySlice, + updateSequences: inout [any (AsyncSequence, Never> & Sendable)], + key: AbsoluteConfigKey, + configType: ConfigType, + body: ([any (AsyncSequence, Never> & Sendable)]) async throws -> ReturnInner +) async throws -> ReturnInner { + guard let provider = providers.first else { + // Recursion termination, once we've collected all update sequences, execute the body. + return try await body(updateSequences) + } + return try await provider.watchValue(forKey: key, type: configType) { updates in + updateSequences.append(updates) + return try await withProvidersWatchingValue( + providers: providers.dropFirst(), + updateSequences: &updateSequences, + key: key, + configType: configType, + body: body + ) + } +} + +@available(Configuration 1.0, *) +nonisolated(nonsending) private func withProvidersWatchingSnapshot( + providers: ArraySlice, + updateSequences: inout [any (AsyncSequence & Sendable)], + body: ([any (AsyncSequence & Sendable)]) async throws -> ReturnInner +) async throws -> ReturnInner { + guard let provider = providers.first else { + // Recursion termination, once we've collected all update sequences, execute the body. + return try await body(updateSequences) + } + return try await provider.watchSnapshot { updates in + updateSequences.append(updates) + return try await withProvidersWatchingSnapshot( + providers: providers.dropFirst(), + updateSequences: &updateSequences, + body: body ) } } diff --git a/Sources/Configuration/Utilities/AsyncAlgos/AsyncCombineLatestManySequence.swift b/Sources/Configuration/Utilities/AsyncAlgos/AsyncCombineLatestManySequence.swift new file mode 100644 index 0000000..8ee9185 --- /dev/null +++ b/Sources/Configuration/Utilities/AsyncAlgos/AsyncCombineLatestManySequence.swift @@ -0,0 +1,102 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftConfiguration open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftConfiguration project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftConfiguration project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +// Vendored copy of https://github.com/apple/swift-async-algorithms/pull/360 + +/// Creates an asynchronous sequence that combines the latest values from many async sequences. +/// +/// ``combineLatestMany(_:)`` only emits a value whenever any of the base `AsyncSequence`s +/// emit a value (so long as each of the bases have emitted at least one value). +/// +/// Finishes: +/// ``combineLatestMany(_:)`` finishes when one of the bases finishes before emitting any value or +/// when all bases finished. +/// +/// Throws: +/// ``combineLatestMany(_:)`` throws when one of the bases throws. If one of the bases threw any buffered and not yet consumed +/// values will be dropped. +@available(Configuration 1.0, *) +internal func combineLatestMany( + elementType: Element.Type = Element.self, + failureType: Failure.Type = Failure.self, + _ bases: [any (AsyncSequence & Sendable)] +) -> some AsyncSequence<[Element], Failure> & Sendable { + AsyncCombineLatestManySequence(bases) +} + +/// An `AsyncSequence` that combines the latest values produced from many asynchronous sequences into an asynchronous sequence of tuples. +@available(Configuration 1.0, *) +internal struct AsyncCombineLatestManySequence: AsyncSequence, Sendable { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public typealias AsyncIterator = Iterator + + typealias Base = AsyncSequence & Sendable + let bases: [any Base] + + init(_ bases: [any Base]) { + self.bases = bases + } + + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public func makeAsyncIterator() -> AsyncIterator { + Iterator( + storage: .init(self.bases) + ) + } + + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public struct Iterator: AsyncIteratorProtocol { + final class InternalClass { + private let storage: CombineLatestManyStorage + + fileprivate init(storage: CombineLatestManyStorage) { + self.storage = storage + } + + deinit { + self.storage.iteratorDeinitialized() + } + + func next() async throws(Failure) -> [Element]? { + guard let element = try await self.storage.next() else { + return nil + } + return element + } + } + + let internalClass: InternalClass + + fileprivate init(storage: CombineLatestManyStorage) { + self.internalClass = InternalClass(storage: storage) + } + + public mutating func next() async throws(Failure) -> [Element]? { + try await self.internalClass.next() + } + } +} + +@available(*, unavailable) +extension AsyncCombineLatestManySequence.Iterator: Sendable {} diff --git a/Sources/Configuration/Utilities/AsyncAlgos/CombineLatestManyStateMachine.swift b/Sources/Configuration/Utilities/AsyncAlgos/CombineLatestManyStateMachine.swift new file mode 100644 index 0000000..beeec05 --- /dev/null +++ b/Sources/Configuration/Utilities/AsyncAlgos/CombineLatestManyStateMachine.swift @@ -0,0 +1,611 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftConfiguration open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftConfiguration project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftConfiguration project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +// Vendored copy of https://github.com/apple/swift-async-algorithms/pull/360 + +import DequeModule + +/// State machine for combine latest. +@available(Configuration 1.0, *) +struct CombineLatestManyStateMachine: Sendable { + typealias DownstreamContinuation = UnsafeContinuation< + Result<[Element]?, Failure>, Never + > + typealias Base = AsyncSequence & Sendable + + private enum State: Sendable { + /// Small wrapper for the state of an upstream sequence. + struct Upstream: Sendable { + /// The upstream continuation. + var continuation: UnsafeContinuation? + /// The produced upstream element. + var element: Element? + /// Indicates wether the upstream finished/threw already. + var isFinished: Bool + } + + /// The initial state before a call to `next` happened. + case initial([any Base]) + + /// The state while we are waiting for downstream demand. + case waitingForDemand( + task: Task, + upstreams: ([Upstream]), + buffer: Deque<[Element]> + ) + + /// The state while we are consuming the upstream and waiting until we get a result from all upstreams. + case combining( + task: Task, + upstreams: ([Upstream]), + downstreamContinuation: DownstreamContinuation, + buffer: Deque<[Element]> + ) + + case upstreamsFinished( + buffer: Deque<[Element]> + ) + + case upstreamThrew( + error: Failure + ) + + /// The state once the downstream consumer stopped, i.e. by dropping all references + /// or by getting their `Task` cancelled. + case finished + + /// Internal state to avoid CoW. + case modifying + } + + private var state: State + + private let numberOfUpstreamSequences: Int + + /// Initializes a new `StateMachine`. + init(bases: [any Base]) { + self.state = .initial(bases) + self.numberOfUpstreamSequences = bases.count + } + + /// Actions returned by `iteratorDeinitialized()`. + enum IteratorDeinitializedAction { + /// Indicates that the `Task` needs to be cancelled and + /// the upstream continuations need to be resumed with a `CancellationFailure`. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { + switch self.state { + case .initial: + // Nothing to do here. No demand was signalled until now + return .none + + case .combining: + // An iterator was deinitialized while we have a suspended continuation. + preconditionFailure( + "Internal inconsistency current state \(self.state) and received iteratorDeinitialized()" + ) + + case .waitingForDemand(let task, let upstreams, _): + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now and need to clean everything up. + self.state = .finished + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map { $0.continuation } + .compactMap { $0 } + ) + + case .upstreamThrew, .upstreamsFinished: + // The iterator was dropped so we can transition to finished now. + self.state = .finished + + return .none + + case .finished: + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + mutating func taskIsStarted( + task: Task, + downstreamContinuation: DownstreamContinuation + ) { + switch self.state { + case .initial: + // The user called `next` and we are starting the `Task` + // to consume the upstream sequences + self.state = .combining( + task: task, + upstreams: Array(repeating: .init(isFinished: false), count: self.numberOfUpstreamSequences), + downstreamContinuation: downstreamContinuation, + buffer: .init() + ) + + case .combining, .waitingForDemand, .upstreamThrew, .upstreamsFinished, .finished: + // We only allow a single task to be created so this must never happen. + preconditionFailure("Internal inconsistency current state \(self.state) and received taskStarted()") + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `childTaskSuspended()`. + enum ChildTaskSuspendedAction { + /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream. + case resumeContinuation( + upstreamContinuation: UnsafeContinuation + ) + } + + mutating func childTaskSuspended( + baseIndex: Int, + continuation: UnsafeContinuation + ) -> ChildTaskSuspendedAction? { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `zipping` + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(let task, var upstreams, let buffer): + self.state = .modifying + upstreams[baseIndex].continuation = continuation + + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .none + + case .combining: + // We are currently combining and need to resume any upstream until we transition to waitingForDemand + + return .resumeContinuation(upstreamContinuation: continuation) + + case .upstreamThrew, .finished: + // Since cancellation is cooperative it might be that child tasks are still getting + // suspended even though we already cancelled them. We must tolerate this and just resume + // the continuation with an error. + return .resumeContinuation( + upstreamContinuation: continuation + ) + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `elementProduced()`. + enum ElementProducedAction { + /// Indicates that the downstream continuation should be resumed with the element. + case resumeContinuation( + downstreamContinuation: DownstreamContinuation, + result: Result<[Element]?, Failure> + ) + } + + mutating func elementProduced(value: Element, atBaseIndex baseIndex: Int) -> ElementProducedAction? { + switch self.state { + case .initial: + // Child tasks that are producing elements are only created after we transitioned to `zipping` + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(let task, var upstreams, var buffer): + // We got an element in late. This can happen since we race the upstreams. + // We have to store the new tuple in our buffer and remember the upstream states. + + var upstreamValues = upstreams.compactMap { $0.element } + guard upstreamValues.count == self.numberOfUpstreamSequences else { + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + } + + self.state = .modifying + + upstreamValues[baseIndex] = value + buffer.append(upstreamValues) + upstreams[baseIndex].element = value + + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .none + + case .combining(let task, var upstreams, let downstreamContinuation, let buffer): + precondition( + buffer.isEmpty, + "Internal inconsistency current state \(self.state) and the buffer is not empty" + ) + self.state = .modifying + upstreams[baseIndex].element = value + + let nonNilElements = upstreams.compactMap(\.element) + if nonNilElements.count == self.numberOfUpstreamSequences { + // We got an element from each upstream so we can resume the downstream now + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .resumeContinuation( + downstreamContinuation: downstreamContinuation, + result: .success(nonNilElements) + ) + } else { + // We are still waiting for one of the upstreams to produce an element + self.state = .combining( + task: task, + upstreams: upstreams, + downstreamContinuation: downstreamContinuation, + buffer: buffer + ) + + return .none + } + + case .upstreamThrew, .finished: + // Since cancellation is cooperative it might be that child tasks + // are still producing elements after we finished. + // We are just going to drop them since there is nothing we can do + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `upstreamFinished()`. + enum UpstreamFinishedAction { + /// Indicates the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with `nil` and + /// the task and the upstream continuations should be cancelled. + case resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func upstreamFinished(baseIndex: Int) -> UpstreamFinishedAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(let task, var upstreams, let buffer): + // One of the upstreams finished. + + self.state = .modifying + upstreams[baseIndex].isFinished = true + + if upstreams.allSatisfy(\.isFinished) { + // All upstreams finished we can transition to either finished or upstreamsFinished now + if buffer.isEmpty { + self.state = .finished + } else { + self.state = .upstreamsFinished(buffer: buffer) + } + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + } else { + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + return .none + } + + case .combining(let task, var upstreams, let downstreamContinuation, let buffer): + // One of the upstreams finished. + + self.state = .modifying + + // We need to track if an empty upstream finished. + // If that happens we can transition to finish right away. + let emptyUpstreamFinished = upstreams[baseIndex].element == nil + upstreams[baseIndex].isFinished = true + + if emptyUpstreamFinished { + // All upstreams finished + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + } else if upstreams.allSatisfy(\.isFinished) { + // All upstreams finished + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + } else { + self.state = .combining( + task: task, + upstreams: upstreams, + downstreamContinuation: downstreamContinuation, + buffer: buffer + ) + return .none + } + + case .upstreamThrew, .finished: + // This is just everything finishing up, nothing to do here + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `upstreamThrew()`. + enum UpstreamThrewAction { + /// Indicates the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with the `error` and + /// the task and the upstream continuations should be cancelled. + case resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + error: Failure, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func upstreamThrew(_ error: Failure) -> UpstreamThrewAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .upstreamsFinished: + // We need to tolerate multiple upstreams failing + return .none + + case .waitingForDemand(let task, let upstreams, _): + // An upstream threw. We can cancel everything now and transition to finished. + // We just need to store the error for the next downstream demand + self.state = .upstreamThrew( + error: error + ) + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .combining(let task, let upstreams, let downstreamContinuation, _): + // One of our upstreams threw. We need to transition to finished ourselves now + // and resume the downstream continuation with the error. Furthermore, we need to cancel all of + // the upstream work. + self.state = .finished + + return .resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .upstreamThrew, .finished: + // This is just everything finishing up, nothing to do here + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `cancelled()`. + enum CancelledAction { + /// Indicates that the downstream continuation needs to be resumed and + /// task and the upstream continuations should be cancelled. + case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func cancelled() -> CancelledAction? { + switch self.state { + case .initial: + state = .finished + + return .none + + case .waitingForDemand(let task, let upstreams, _): + // The downstream task got cancelled so we need to cancel our upstream Task + // and resume all continuations. We can also transition to finished. + self.state = .finished + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .combining(let task, let upstreams, let downstreamContinuation, _): + // The downstream Task got cancelled so we need to cancel our upstream Task + // and resume all continuations. We can also transition to finished. + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .upstreamsFinished: + // We can transition to finished now + self.state = .finished + + return .none + + case .upstreamThrew, .finished: + // We are already finished so nothing to do here: + + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `next()`. + enum NextAction { + /// Indicates that a new `Task` should be created that consumes the sequence. + case startTask([any Base]) + /// Indicates that all upstream continuations should be resumed. + case resumeUpstreamContinuations( + upstreamContinuation: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with the result. + case resumeContinuation( + downstreamContinuation: DownstreamContinuation, + result: Result<[Element]?, Failure> + ) + /// Indicates that the downstream continuation should be resumed with `nil`. + case resumeDownstreamContinuationWithNil(DownstreamContinuation) + } + + mutating func next(for continuation: DownstreamContinuation) -> NextAction { + switch self.state { + case .initial(let bases): + // This is the first time we get demand singalled so we have to start the task + // The transition to the next state is done in the taskStarted method + return .startTask(bases) + + case .combining: + // We already got demand signalled and have suspended the downstream task + // Getting a second next calls means the iterator was transferred across Tasks which is not allowed + preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + + case .waitingForDemand(let task, var upstreams, var buffer): + // We got demand signalled now we have to check if there is anything buffered. + // If not we have to transition to combining and need to resume all upstream continuations now + self.state = .modifying + + guard let element = buffer.popFirst() else { + let upstreamContinuations = upstreams.map(\.continuation).compactMap { $0 } + for index in 0..: Sendable { + typealias StateMachine = CombineLatestManyStateMachine + + private let stateMachine: Mutex + + init(_ bases: [any StateMachine.Base]) { + self.stateMachine = .init(.init(bases: bases)) + } + + func iteratorDeinitialized() { + let action = self.stateMachine.withLock { $0.iteratorDeinitialized() } + + switch action { + case .cancelTaskAndUpstreamContinuations( + let task, + let upstreamContinuation + ): + task.cancel() + for item in upstreamContinuation { + item.resume() + } + + case .none: + break + } + } + + func next() async throws(Failure) -> [Element]? { + let result = await withTaskCancellationHandler { + await withUnsafeContinuation { continuation in + let action: StateMachine.NextAction? = self.stateMachine.withLock { stateMachine in + let action = stateMachine.next(for: continuation) + switch action { + case .startTask(let bases): + // first iteration, we start one child task per base to iterate over them + self.startTask( + stateMachine: &stateMachine, + bases: bases, + downstreamContinuation: continuation + ) + return nil + + case .resumeContinuation: + return action + + case .resumeUpstreamContinuations: + return action + + case .resumeDownstreamContinuationWithNil: + return action + } + } + + switch action { + case .startTask: + // We are handling the startTask in the lock already because we want to avoid + // other inputs interleaving while starting the task + fatalError("Internal inconsistency") + + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .resumeUpstreamContinuations(let upstreamContinuations): + // bases can be iterated over for 1 iteration so their next value can be retrieved + for item in upstreamContinuations { + item.resume() + } + + case .resumeDownstreamContinuationWithNil(let continuation): + // the async sequence is already finished, immediately resuming + continuation.resume(returning: .success(nil)) + + case .none: + break + } + } + } onCancel: { + let action = self.stateMachine.withLock { stateMachine in + stateMachine.cancelled() + } + + switch action { + case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + task.cancel() + for item in upstreamContinuations { + item.resume() + } + + downstreamContinuation.resume(returning: .success(nil)) + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + task.cancel() + for item in upstreamContinuations { + item.resume() + } + + case .none: + break + } + } + return try result.get() + } + + private func startTask( + stateMachine: inout StateMachine, + bases: [any (AsyncSequence & Sendable)], + downstreamContinuation: StateMachine.DownstreamContinuation + ) { + // This creates a new `Task` that is iterating the upstream + // sequences. We must store it to cancel it at the right times. + let task = Task { + await withTaskGroup(of: Result.self) { group in + // For each upstream sequence we are adding a child task that + // is consuming the upstream sequence + for (baseIndex, base) in bases.enumerated() { + group.addTask { + var baseIterator = base.makeAsyncIterator() + + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand. + await withUnsafeContinuation { continuation in + let action = self.stateMachine.withLock { stateMachine in + stateMachine.childTaskSuspended(baseIndex: baseIndex, continuation: continuation) + } + + switch action { + case .resumeContinuation(let upstreamContinuation): + upstreamContinuation.resume() + + case .none: + break + } + } + + let element: Element? + do { + element = try await baseIterator.next(isolation: nil) + } catch { + return .failure(error as! Failure) // Looks like a compiler bug + } + + if let element = element { + let action = self.stateMachine.withLock { stateMachine in + stateMachine.elementProduced(value: element, atBaseIndex: baseIndex) + } + + switch action { + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .none: + break + } + } else { + let action = self.stateMachine.withLock { stateMachine in + stateMachine.upstreamFinished(baseIndex: baseIndex) + } + + switch action { + case .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + + task.cancel() + for item in upstreamContinuations { + item.resume() + } + + downstreamContinuation.resume(returning: .success(nil)) + break loop + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + task.cancel() + for item in upstreamContinuations { + item.resume() + } + + break loop + + case .none: + break loop + } + } + } + return .success(()) + } + } + + while !group.isEmpty { + let result = await group.next() + + switch result { + case .success, .none: + break + case .failure(let error): + // One of the upstream sequences threw an error + let action = self.stateMachine.withLock { stateMachine in + stateMachine.upstreamThrew(error) + } + + switch action { + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + task.cancel() + for item in upstreamContinuations { + item.resume() + } + case .resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let error, + let task, + let upstreamContinuations + ): + task.cancel() + for item in upstreamContinuations { + item.resume() + } + downstreamContinuation.resume(returning: .failure(error)) + case .none: + break + } + + group.cancelAll() + } + } + } + } + + stateMachine.taskIsStarted(task: task, downstreamContinuation: downstreamContinuation) + } +} diff --git a/Sources/Configuration/Utilities/AsyncSequences.swift b/Sources/Configuration/Utilities/AsyncSequences.swift index 260a981..75b2b36 100644 --- a/Sources/Configuration/Utilities/AsyncSequences.swift +++ b/Sources/Configuration/Utilities/AsyncSequences.swift @@ -14,18 +14,18 @@ /// A concrete async sequence for delivering updated configuration values. @available(Configuration 1.0, *) -public struct ConfigUpdatesAsyncSequence { +public struct ConfigUpdatesAsyncSequence: Sendable { /// The upstream async sequence that this concrete sequence wraps. /// /// This property holds the async sequence that provides the actual elements. /// All operations on this concrete sequence are delegated to this upstream sequence. - private let upstream: any AsyncSequence + private let upstream: any AsyncSequence & Sendable /// Creates a new concrete async sequence wrapping the provided existential sequence. /// /// - Parameter upstream: The async sequence to wrap. - public init(_ upstream: some AsyncSequence) { + public init(_ upstream: some AsyncSequence & Sendable) { self.upstream = upstream } } @@ -60,7 +60,7 @@ extension ConfigUpdatesAsyncSequence: AsyncSequence { // MARK: - AsyncSequence extensions @available(Configuration 1.0, *) -extension AsyncSequence where Failure == Never { +extension AsyncSequence where Failure == Never, Self: Sendable { /// Maps each element of the sequence using a throwing transform, introducing a failure type. /// @@ -89,8 +89,8 @@ extension AsyncSequence where Failure == Never { /// } /// ``` func mapThrowing( - _ transform: @escaping (Element) throws(Failure) -> NewValue - ) -> some AsyncSequence { + _ transform: @escaping @Sendable (Element) throws(Failure) -> NewValue + ) -> some AsyncSequence & Sendable { MapThrowingAsyncSequence(upstream: self, transform: transform) } } @@ -110,13 +110,18 @@ extension AsyncSequence where Failure == Never { /// - `Value`: The input element type from the upstream sequence. /// - `Upstream`: The upstream async sequence type that never throws. @available(Configuration 1.0, *) -private struct MapThrowingAsyncSequence> { +private struct MapThrowingAsyncSequence< + Element, + Failure: Error, + Value, + Upstream: AsyncSequence & Sendable +>: Sendable { /// The upstream async sequence to transform. var upstream: Upstream /// The throwing transform function to apply to each element. - var transform: (Value) throws(Failure) -> Element + var transform: @Sendable (Value) throws(Failure) -> Element } @available(Configuration 1.0, *) diff --git a/Sources/Configuration/Utilities/combineLatestOneOrMore.swift b/Sources/Configuration/Utilities/combineLatestOneOrMore.swift deleted file mode 100644 index d4f6a00..0000000 --- a/Sources/Configuration/Utilities/combineLatestOneOrMore.swift +++ /dev/null @@ -1,130 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftConfiguration open source project -// -// Copyright (c) 2025 Apple Inc. and the SwiftConfiguration project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftConfiguration project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Synchronization - -/// A container that maintains the latest values from multiple async sequences. -/// -/// This class coordinates the "combine latest" operation by storing the most recent -/// value from each source sequence and emitting combined arrays only when all sources -/// have produced at least one value. -@available(Configuration 1.0, *) -private final class Combiner: Sendable { - - /// The internal state. - private struct State { - /// The current elements. - var elements: [Element?] - } - - /// The underlying mutex-protected storage. - private let storage: Mutex - - /// The continuation where to send values. - private let continuation: AsyncStream<[Element]>.Continuation - - /// The stream of combined arrays of elements. - let stream: AsyncStream<[Element]> - - /// Creates a new combiner for the specified number of async sequences. - /// - /// - Parameter count: The number of async sequences to combine. Must be at least 1. - /// - Precondition: `count >= 1` - init(count: Int) { - precondition(count >= 1, "Combiner requires the count of 1 or more") - self.storage = .init( - .init(elements: Array(repeating: nil, count: count)) - ) - (self.stream, self.continuation) = AsyncStream.makeStream(bufferingPolicy: .bufferingNewest(1)) - } - - /// Updates the value from a specific async sequence and emits a combined array if ready. - /// - /// This method atomically updates the value at the specified index and checks if - /// all sequences have produced at least one value. If so, it emits the current - /// snapshot of all latest values. - /// - /// - Parameters: - /// - value: The new value from the async sequence. - /// - index: The index of the async sequence that produced this value. - func updateValue(_ value: Element, at index: Int) { - let valueToEmit: [Element]? = storage.withLock { state in - state.elements[index] = value - let nonNilValues = state.elements.compactMap { $0 } - if nonNilValues.count == state.elements.count { - // All values have been emitted at least once, and something changed. - // Emit the latest snapshot now. - return nonNilValues - } else { - // Not all upstreams have emitted a value yet, don't emit a snapshot yet. - return nil - } - } - if let valueToEmit { - continuation.yield(valueToEmit) - } - } -} - -/// Combines multiple async sequences using a "combine latest" strategy. -/// -/// This function takes multiple async sequences and combines their latest values into -/// arrays. It only emits a combined array after all sequences have produced at least -/// one value, and then emits a new array whenever any sequence produces a new value. -/// -/// ## Behavior -/// -/// - **Initial emission**: No values are emitted until all sequences have produced at least one value -/// - **Subsequent emissions**: A new combined array is emitted whenever any sequence produces a new value -/// - **Order preservation**: Values in the combined arrays maintain the same order as the input sequences -/// - **Latest values**: Only the most recent value from each sequence is included in each emission -/// -/// ## Concurrency -/// -/// All input sequences are processed concurrently using a task group. If any sequence -/// throws an error or completes unexpectedly, the entire operation is cancelled. -/// -/// - Parameters: -/// - elementType: The type of elements in the input async sequences. -/// - sources: An array of closures that each iterate over an async sequence. -/// - updatesHandler: A closure that processes the combined sequence of arrays. -/// - Throws: When any source throws, when the handler throws, or when cancelled. -/// - Returns: The value returned by the handler. -/// - Precondition: `sources` must not be empty. -@available(Configuration 1.0, *) -func combineLatestOneOrMore( - elementType: Element.Type = Element.self, - sources: [@Sendable ((ConfigUpdatesAsyncSequence) async throws -> Void) async throws -> Void], - updatesHandler: (ConfigUpdatesAsyncSequence<[Element], Never>) async throws -> Return -) async throws -> Return { - precondition(!sources.isEmpty, "combineLatestTwoOrMore requires at least one source") - let combiner = Combiner(count: sources.count) - return try await withThrowingTaskGroup(of: Void.self, returning: Return.self) { group in - for (index, source) in sources.enumerated() { - group.addTask { - try await source { updates in - for await element in updates { - combiner.updateValue(element, at: index) - } - } - // TODO: Is this the right error to throw when a source returns prematurely? - throw CancellationError() - } - } - defer { - group.cancelAll() - } - return try await updatesHandler(.init(combiner.stream)) - } -} diff --git a/Sources/ConfigurationTestingInternal/AsyncSequence+first.swift b/Sources/ConfigurationTestingInternal/AsyncSequence+first.swift index 8c9cbe8..a14e4a9 100644 --- a/Sources/ConfigurationTestingInternal/AsyncSequence+first.swift +++ b/Sources/ConfigurationTestingInternal/AsyncSequence+first.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// @available(Configuration 1.0, *) -extension AsyncSequence where Failure == Never { +extension AsyncSequence where Failure == Never, Self: Sendable { /// Returns the first element of the async sequence, or nil if the sequence completes before emitting an element. package var first: Element? { get async { @@ -23,7 +23,7 @@ extension AsyncSequence where Failure == Never { } @available(Configuration 1.0, *) -extension AsyncSequence { +extension AsyncSequence where Self: Sendable { /// Returns the first element of the async sequence, or nil if the sequence completes before emitting an element. package var first: Element? { get async throws { @@ -36,7 +36,7 @@ extension AsyncSequence { /// - Parameter updates: The async sequence to get the first element from. /// - Returns: The first element, or nil if empty. @available(Configuration 1.0, *) -package func awaitFirst(updates: any AsyncSequence) async -> Value? { +package func awaitFirst(updates: any AsyncSequence & Sendable) async -> Value? { await updates.first } @@ -45,6 +45,7 @@ package func awaitFirst(updates: any AsyncSequence(updates: any AsyncSequence) async throws -> Value? { +package func awaitFirst(updates: any AsyncSequence & Sendable) async throws -> Value? +{ try await updates.first } diff --git a/Sources/ConfigurationTestingInternal/TestFuture.swift b/Sources/ConfigurationTestingInternal/TestFuture.swift index f1e7dd0..55741c4 100644 --- a/Sources/ConfigurationTestingInternal/TestFuture.swift +++ b/Sources/ConfigurationTestingInternal/TestFuture.swift @@ -12,8 +12,7 @@ // //===----------------------------------------------------------------------===// -// Needs full Foundation, NSLock is not available in FoundationEssentials. -import Foundation +import Synchronization /// A future implementation for testing asynchronous operations. /// @@ -38,7 +37,7 @@ import Foundation /// let result = await future.value /// ``` @available(Configuration 1.0, *) -package final class TestFuture: @unchecked Sendable /* lock + locked_state */ { +package final class TestFuture: @unchecked Sendable /* mutex */ { /// The internal state of the future. private enum State { @@ -48,11 +47,8 @@ package final class TestFuture: @unchecked Sendable /* lock + locke case fulfilled(T) } - /// Synchronizes access to the internal state. - private let lock: NSLock - /// The current state of the future. - private var locked_state: State + private let state: Mutex /// Optional name for debugging and logging purposes. private let name: String? @@ -82,9 +78,7 @@ package final class TestFuture: @unchecked Sendable /* lock + locke self.verbose = verbose self.file = file self.line = line - self.lock = NSLock() - self.lock.name = "TestFuture.lock" - self.locked_state = .waitingForFulfillment([]) + self.state = .init(.waitingForFulfillment([])) } /// Fulfills the future with the provided value. @@ -100,19 +94,27 @@ package final class TestFuture: @unchecked Sendable /* lock + locke if verbose { print("Fulfilling \(name ?? "unnamed") at \(file):\(line) with \(value)") } - let continuations: [CheckedContinuation] - lock.lock() - switch locked_state { - case .fulfilled: - fatalError("Fulfilled \(name ?? "unnamed") at \(file):\(line) twice") - case .waitingForFulfillment(let _continuations): - locked_state = .fulfilled(value) - continuations = _continuations + let continuations: [CheckedContinuation] = state.withLock { state in + switch state { + case .fulfilled: + fatalError("Fulfilled \(name ?? "unnamed") at \(file):\(line) twice") + case .waitingForFulfillment(let continuations): + if verbose { + print("Found \(continuations.count) waiting continuations for \(name ?? "unnamed")") + } + state = .fulfilled(value) + return continuations + } + } + if verbose { + print("Resuming \(continuations.count) continuations for \(name ?? "unnamed")") } - lock.unlock() for continuation in continuations { continuation.resume(returning: value) } + if verbose { + print("All continuations resumed for \(name ?? "unnamed")") + } } /// A result of getting the value from the internal storage. @@ -137,21 +139,34 @@ package final class TestFuture: @unchecked Sendable /* lock + locke print("Getting value from \(name ?? "unnamed") at \(file):\(line)") } return await withCheckedContinuation { continuation in - let result: GetValueResult - lock.lock() - switch locked_state { - case .fulfilled(let value): - result = .returnValue(value) - case .waitingForFulfillment(var continuations): - continuations.append(continuation) - locked_state = .waitingForFulfillment(continuations) - result = .appendedContinuation + let result: GetValueResult = state.withLock { state in + switch state { + case .fulfilled(let value): + if verbose { + print("\(name ?? "unnamed") already fulfilled, returning immediately") + } + return .returnValue(value) + case .waitingForFulfillment(var continuations): + if verbose { + print( + "\(name ?? "unnamed") not fulfilled, adding continuation (total: \(continuations.count + 1))" + ) + } + continuations.append(continuation) + state = .waitingForFulfillment(continuations) + return .appendedContinuation + } } - lock.unlock() switch result { case .appendedContinuation: + if verbose { + print("\(name ?? "unnamed") continuation stored, waiting for fulfill") + } break case .returnValue(let value): + if verbose { + print("\(name ?? "unnamed") resuming continuation immediately with \(value)") + } continuation.resume(returning: value) } } diff --git a/Tests/ConfigurationTests/CombineLatestTests.swift b/Tests/ConfigurationTests/CombineLatestTests.swift new file mode 100644 index 0000000..5ffe327 --- /dev/null +++ b/Tests/ConfigurationTests/CombineLatestTests.swift @@ -0,0 +1,219 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftConfiguration open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftConfiguration project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftConfiguration project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +// swift-format-ignore-file + +import Testing +@testable import Configuration +import AsyncAlgorithms +import Synchronization +import ConfigurationTestingInternal + +struct TestCombineLatestMany { + @Test + @available(Configuration 1.0, *) + func combineLatest() async throws { + let a = [1, 2, 3].async + let b = [4, 5, 6].async + let c = [7, 8, 9].async + let sequence = combineLatestMany([a, b, c]) + let actual = await Array(sequence) + #expect(actual.count >= 3) + } + + @Test + @available(Configuration 1.0, *) + func ordering1() async { + var a = GatedSequence([1, 2, 3]) + var b = GatedSequence([4, 5, 6]) + var c = GatedSequence([7, 8, 9]) + + let completion = TestFuture() + let sequence = combineLatestMany([a, b, c]) + let validator = Validator<[Int]>() + validator.test(sequence) { iterator in + let pastEnd = await iterator.next(isolation: nil) + #expect(pastEnd == nil) + completion.fulfill(()) + } + var value = await validator.validate() + #expect(value == []) + a.advance() + value = validator.current + #expect(value == []) + b.advance() + value = validator.current + #expect(value == []) + c.advance() + + value = await validator.validate() + #expect(value == [[1, 4, 7]]) + a.advance() + + value = await validator.validate() + #expect(value == [[1, 4, 7], [2, 4, 7]]) + b.advance() + + value = await validator.validate() + #expect(value == [[1, 4, 7], [2, 4, 7], [2, 5, 7]]) + c.advance() + + value = await validator.validate() + #expect(value == [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8]]) + a.advance() + + value = await validator.validate() + #expect(value == [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8]]) + b.advance() + + value = await validator.validate() + #expect(value == [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8], [3, 6, 8]]) + c.advance() + + value = await validator.validate() + #expect( + value == + [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8], [3, 6, 8], [3, 6, 9]] + ) + + await completion.value + + value = validator.current + #expect( + value == + [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8], [3, 6, 8], [3, 6, 9]] + ) + } +} + +@available(Configuration 1.0, *) +public final class Validator: Sendable { + private enum Ready { + case idle + case ready + case pending(UnsafeContinuation) + } + + private struct State: Sendable { + var collected: [Element] = [] + var failure: (any Error)? + var ready: Ready = .idle + } + + private struct Envelope: @unchecked Sendable { + var contents: Contents + } + + private let state = Mutex(State()) + + private func ready(_ apply: (inout State) -> Void) { + state.withLock { state -> UnsafeContinuation? in + apply(&state) + switch state.ready { + case .idle: + state.ready = .ready + return nil + case .pending(let continuation): + state.ready = .idle + return continuation + case .ready: + return nil + } + }? + .resume() + } + + internal func step() async { + await withUnsafeContinuation { (continuation: UnsafeContinuation) in + state.withLock { state -> UnsafeContinuation? in + switch state.ready { + case .ready: + state.ready = .idle + return continuation + case .idle: + state.ready = .pending(continuation) + return nil + case .pending: + fatalError() + } + }? + .resume() + } + } + + let onEvent: (@Sendable (Result) async -> Void)? + + init(onEvent: @Sendable @escaping (Result) async -> Void) { + + self.onEvent = onEvent + } + + public init() { + self.onEvent = nil + } + + public func test( + _ sequence: S, + onFinish: sending @escaping (inout S.AsyncIterator) async -> Void + ) where S.Element == Element { + let envelope = Envelope(contents: sequence) + Task { + var iterator = envelope.contents.makeAsyncIterator() + ready { _ in } + do { + while let item = try await iterator.next() { + await onEvent?(.success(item)) + ready { state in + state.collected.append(item) + } + } + await onEvent?(.success(nil)) + } catch { + await onEvent?(.failure(error)) + ready { state in + state.failure = error + } + } + ready { _ in } + await onFinish(&iterator) + } + } + + public func validate() async -> [Element] { + await step() + return current + } + + public var current: [Element] { + state.withLock { state in + state.collected + } + } + + public var failure: (any Error)? { + state.withLock { state in + state.failure + } + } +} diff --git a/Tests/ConfigurationTests/GatedSequence.swift b/Tests/ConfigurationTests/GatedSequence.swift new file mode 100644 index 0000000..a1331aa --- /dev/null +++ b/Tests/ConfigurationTests/GatedSequence.swift @@ -0,0 +1,136 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftConfiguration open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftConfiguration project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftConfiguration project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +// swift-format-ignore-file + +import Synchronization + +@available(Configuration 1.0, *) +public struct GatedSequence { + public typealias Failure = Never + let elements: [Element] + let gates: [Gate] + var index = 0 + + public mutating func advance() { + defer { index += 1 } + guard index < gates.count else { + return + } + gates[index].open() + } + + public init(_ elements: [Element]) { + self.elements = elements + self.gates = elements.map { _ in Gate() } + } +} + +@available(*, unavailable) +extension GatedSequence.Iterator: Sendable {} + +@available(Configuration 1.0, *) +extension GatedSequence: AsyncSequence { + public struct Iterator: AsyncIteratorProtocol { + var gatedElements: [(Element, Gate)] + + init(elements: [Element], gates: [Gate]) { + gatedElements = Array(zip(elements, gates)) + } + + public mutating func next() async -> Element? { + guard gatedElements.count > 0 else { + return nil + } + let (element, gate) = gatedElements.removeFirst() + await gate.enter() + return element + } + + public mutating func next(isolation actor: isolated (any Actor)?) async throws(Never) -> Element? { + guard gatedElements.count > 0 else { + return nil + } + let (element, gate) = gatedElements.removeFirst() + await gate.enter() + return element + } + } + + public func makeAsyncIterator() -> Iterator { + Iterator(elements: elements, gates: gates) + } +} + +@available(Configuration 1.0, *) +extension GatedSequence: Sendable where Element: Sendable {} + +@available(Configuration 1.0, *) +public final class Gate: Sendable { + enum State { + case closed + case open + case pending(UnsafeContinuation) + } + + let state = Mutex(State.closed) + + public func `open`() { + state.withLock { state -> UnsafeContinuation? in + switch state { + case .closed: + state = .open + return nil + case .open: + return nil + case .pending(let continuation): + state = .closed + return continuation + } + }? + .resume() + } + + public func enter() async { + var other: UnsafeContinuation? + await withUnsafeContinuation { (continuation: UnsafeContinuation) in + state.withLock { state -> UnsafeContinuation? in + switch state { + case .closed: + state = .pending(continuation) + return nil + case .open: + state = .closed + return continuation + case .pending(let existing): + other = existing + state = .pending(continuation) + return nil + } + }? + .resume() + } + other?.resume() + } +} diff --git a/Tests/ConfigurationTests/MultiProviderTests.swift b/Tests/ConfigurationTests/MultiProviderTests.swift index ae8ee69..ed42d21 100644 --- a/Tests/ConfigurationTests/MultiProviderTests.swift +++ b/Tests/ConfigurationTests/MultiProviderTests.swift @@ -174,6 +174,64 @@ struct MultiProviderTests { #expect(accessReporter.events.count == 3) } + + @available(Configuration 1.0, *) + @Test func watchingTwoUpstreams_handlerReturns() async throws { + let first = InMemoryProvider( + name: "first", + values: [ + "value": "First" + ] + ) + let second = InMemoryProvider( + name: "first", + values: [ + "value": "Second" + ] + ) + let accessReporter = TestAccessReporter() + let config = ConfigReader(providers: [first, second], accessReporter: accessReporter) + + try await config.watchString(forKey: "value", default: "default") { updates in + var iterator = updates.makeAsyncIterator() + let firstValue = try await iterator.next() + #expect(firstValue == "First") + // Return immediately + } + + #expect(accessReporter.events.count == 1) + } + + @available(Configuration 1.0, *) + @Test func watchingTwoUpstreams_handlerThrowsError() async throws { + let first = InMemoryProvider( + name: "first", + values: [ + "value": "First" + ] + ) + let second = InMemoryProvider( + name: "first", + values: [ + "value": "Second" + ] + ) + let accessReporter = TestAccessReporter() + let config = ConfigReader(providers: [first, second], accessReporter: accessReporter) + + struct HandlerError: Error {} + await #expect(throws: HandlerError.self) { + try await config.watchString(forKey: "value", default: "default") { updates in + var iterator = updates.makeAsyncIterator() + let firstValue = try await iterator.next() + #expect(firstValue == "First") + // Throws immediately + throw HandlerError() + } + } + + #expect(accessReporter.events.count == 1) + } } @available(Configuration 1.0, *)