From 53c44d0fece8494a53638102e7e34a970f030c6d Mon Sep 17 00:00:00 2001 From: Honza Dvorsky Date: Wed, 13 Aug 2025 20:21:18 +0200 Subject: [PATCH 1/7] [WIP] Add combineLatestMany --- .../AsyncCombineLatestManySequence.swift | 86 +++ .../CombineLatestManyStateMachine.swift | 605 ++++++++++++++++++ .../CombineLatestManyStorage.swift | 229 +++++++ .../TestCombineLatestMany.swift | 106 +++ 4 files changed, 1026 insertions(+) create mode 100644 Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift create mode 100644 Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift create mode 100644 Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift create mode 100644 Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift diff --git a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift new file mode 100644 index 00000000..38bde13c --- /dev/null +++ b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift @@ -0,0 +1,86 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// Creates an asynchronous sequence that combines the latest values from many `AsyncSequence` types +/// by emitting a tuple of the values. ``combineLatestMany(_:)`` only emits a value whenever any of the base `AsyncSequence`s +/// emit a value (so long as each of the bases have emitted at least one value). +/// +/// Finishes: +/// ``combineLatestMany(_:)`` finishes when one of the bases finishes before emitting any value or +/// when all bases finished. +/// +/// Throws: +/// ``combineLatestMany(_:)`` throws when one of the bases throws. If one of the bases threw any buffered and not yet consumed +/// values will be dropped. +@available(AsyncAlgorithms 1.1, *) +public func combineLatestMany(_ bases: [any CombineLatestManyBase]) -> AsyncCombineLatestManySequence +{ + AsyncCombineLatestManySequence(bases) +} + +// TODO: Can we get rid of this typealias? +@available(AsyncAlgorithms 1.1, *) +public typealias CombineLatestManyBase = AsyncSequence & Sendable + +/// An `AsyncSequence` that combines the latest values produced from many asynchronous sequences into an asynchronous sequence of tuples. +@available(AsyncAlgorithms 1.1, *) +public struct AsyncCombineLatestManySequence: AsyncSequence, Sendable { + public typealias AsyncIterator = Iterator + + typealias Base = AsyncSequence & Sendable + let bases: [any Base] + + init(_ bases: [any Base]) { + self.bases = bases + } + + public func makeAsyncIterator() -> AsyncIterator { + Iterator( + storage: .init(self.bases) + ) + } + + public struct Iterator: AsyncIteratorProtocol { + final class InternalClass { + private let storage: CombineLatestManyStorage + + fileprivate init(storage: CombineLatestManyStorage) { + self.storage = storage + } + + deinit { + self.storage.iteratorDeinitialized() + } + + func next() async throws -> [Element]? { + guard let element = try await self.storage.next() else { + return nil + } + + // This force unwrap is safe since there must be a third element. + return element + } + } + + let internalClass: InternalClass + + fileprivate init(storage: CombineLatestManyStorage) { + self.internalClass = InternalClass(storage: storage) + } + + public mutating func next() async throws -> [Element]? { + try await self.internalClass.next() + } + } +} + +@available(*, unavailable) +extension AsyncCombineLatestManySequence.Iterator: Sendable {} diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift new file mode 100644 index 00000000..621f0aec --- /dev/null +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift @@ -0,0 +1,605 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +import DequeModule + +// TODO: Do we need to add Failure: Error generic here? + +/// State machine for combine latest +@available(AsyncAlgorithms 1.1, *) +struct CombineLatestManyStateMachine: Sendable { + typealias DownstreamContinuation = UnsafeContinuation< + Result<[Element]?, Error>, Never + > + typealias Base = AsyncSequence & Sendable + + private enum State: Sendable { + /// Small wrapper for the state of an upstream sequence. + struct Upstream: Sendable { + /// The upstream continuation. + var continuation: UnsafeContinuation? + /// The produced upstream element. + var element: Element? + /// Indicates wether the upstream finished/threw already + var isFinished: Bool + } + + /// The initial state before a call to `next` happened. + case initial([any Base]) + + /// The state while we are waiting for downstream demand. + case waitingForDemand( + task: Task, + upstreams: ([Upstream]), + buffer: Deque<[Element]> + ) + + /// The state while we are consuming the upstream and waiting until we get a result from all upstreams. + case combining( + task: Task, + upstreams: ([Upstream]), + downstreamContinuation: DownstreamContinuation, + buffer: Deque<[Element]> + ) + + case upstreamsFinished( + buffer: Deque<[Element]> + ) + + case upstreamThrew( + error: Error + ) + + /// The state once the downstream consumer stopped, i.e. by dropping all references + /// or by getting their `Task` cancelled. + case finished + + /// Internal state to avoid CoW. + case modifying + } + + private var state: State + + private let numberOfUpstreamSequences: Int + + /// Initializes a new `StateMachine`. + init(bases: [any Base]) { + self.state = .initial(bases) + self.numberOfUpstreamSequences = bases.count + } + + /// Actions returned by `iteratorDeinitialized()`. + enum IteratorDeinitializedAction { + /// Indicates that the `Task` needs to be cancelled and + /// the upstream continuations need to be resumed with a `CancellationError`. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { + switch self.state { + case .initial: + // Nothing to do here. No demand was signalled until now + return .none + + case .combining: + // An iterator was deinitialized while we have a suspended continuation. + preconditionFailure( + "Internal inconsistency current state \(self.state) and received iteratorDeinitialized()" + ) + + case .waitingForDemand(let task, let upstreams, _): + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now and need to clean everything up. + self.state = .finished + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map { $0.continuation } + .compactMap { $0 } + ) + + case .upstreamThrew, .upstreamsFinished: + // The iterator was dropped so we can transition to finished now. + self.state = .finished + + return .none + + case .finished: + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + mutating func taskIsStarted( + task: Task, + downstreamContinuation: DownstreamContinuation + ) { + switch self.state { + case .initial: + // The user called `next` and we are starting the `Task` + // to consume the upstream sequences + self.state = .combining( + task: task, + upstreams: Array(repeating: .init(isFinished: false), count: self.numberOfUpstreamSequences), + downstreamContinuation: downstreamContinuation, + buffer: .init() + ) + + case .combining, .waitingForDemand, .upstreamThrew, .upstreamsFinished, .finished: + // We only allow a single task to be created so this must never happen. + preconditionFailure("Internal inconsistency current state \(self.state) and received taskStarted()") + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `childTaskSuspended()`. + enum ChildTaskSuspendedAction { + /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream. + case resumeContinuation( + upstreamContinuation: UnsafeContinuation + ) + /// Indicates that the continuation should be resumed with an Error because another upstream sequence threw. + case resumeContinuationWithError( + upstreamContinuation: UnsafeContinuation, + error: Error + ) + } + + mutating func childTaskSuspended( + baseIndex: Int, + continuation: UnsafeContinuation + ) -> ChildTaskSuspendedAction? { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `zipping` + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(let task, var upstreams, let buffer): + self.state = .modifying + upstreams[baseIndex].continuation = continuation + + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .none + + case .combining: + // We are currently combining and need to resume any upstream until we transition to waitingForDemand + + return .resumeContinuation(upstreamContinuation: continuation) + + case .upstreamThrew, .finished: + // Since cancellation is cooperative it might be that child tasks are still getting + // suspended even though we already cancelled them. We must tolerate this and just resume + // the continuation with an error. + return .resumeContinuationWithError( + upstreamContinuation: continuation, + error: CancellationError() + ) + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `elementProduced()`. + enum ElementProducedAction { + /// Indicates that the downstream continuation should be resumed with the element. + case resumeContinuation( + downstreamContinuation: DownstreamContinuation, + result: Result<[Element]?, Error> + ) + } + + mutating func elementProduced(value: Element, atBaseIndex baseIndex: Int) -> ElementProducedAction? { + switch self.state { + case .initial: + // Child tasks that are producing elements are only created after we transitioned to `zipping` + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(let task, var upstreams, var buffer): + // We got an element in late. This can happen since we race the upstreams. + // We have to store the new tuple in our buffer and remember the upstream states. + + var upstreamValues = upstreams.compactMap { $0.element } + guard upstreamValues.count == self.numberOfUpstreamSequences else { + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + } + + self.state = .modifying + + upstreamValues[baseIndex] = value + buffer.append(upstreamValues) + upstreams[baseIndex].element = value + + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .none + + case .combining(let task, var upstreams, let downstreamContinuation, let buffer): + precondition( + buffer.isEmpty, + "Internal inconsistency current state \(self.state) and the buffer is not empty" + ) + self.state = .modifying + upstreams[baseIndex].element = value + + let nonNilElements = upstreams.compactMap(\.element) + if nonNilElements.count == self.numberOfUpstreamSequences { + // We got an element from each upstream so we can resume the downstream now + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .resumeContinuation( + downstreamContinuation: downstreamContinuation, + result: .success(nonNilElements) + ) + } else { + // We are still waiting for one of the upstreams to produce an element + self.state = .combining( + task: task, + upstreams: upstreams, + downstreamContinuation: downstreamContinuation, + buffer: buffer + ) + + return .none + } + + case .upstreamThrew, .finished: + // Since cancellation is cooperative it might be that child tasks + // are still producing elements after we finished. + // We are just going to drop them since there is nothing we can do + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `upstreamFinished()`. + enum UpstreamFinishedAction { + /// Indicates the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with `nil` and + /// the task and the upstream continuations should be cancelled. + case resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func upstreamFinished(baseIndex: Int) -> UpstreamFinishedAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(let task, var upstreams, let buffer): + // One of the upstreams finished. + + self.state = .modifying + upstreams[0].isFinished = true + + if upstreams.allSatisfy(\.isFinished) { + // All upstreams finished we can transition to either finished or upstreamsFinished now + if buffer.isEmpty { + self.state = .finished + } else { + self.state = .upstreamsFinished(buffer: buffer) + } + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + } else { + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + return .none + } + + case .combining(let task, var upstreams, let downstreamContinuation, let buffer): + // One of the upstreams finished. + + self.state = .modifying + + // We need to track if an empty upstream finished. + // If that happens we can transition to finish right away. + let emptyUpstreamFinished = upstreams[baseIndex].element == nil + upstreams[baseIndex].isFinished = true + + // Implementing this for the two arities without variadic generics is a bit awkward sadly. + if emptyUpstreamFinished { + // All upstreams finished + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + } else if upstreams.allSatisfy(\.isFinished) { + // All upstreams finished + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + } else { + self.state = .combining( + task: task, + upstreams: upstreams, + downstreamContinuation: downstreamContinuation, + buffer: buffer + ) + return .none + } + + case .upstreamThrew, .finished: + // This is just everything finishing up, nothing to do here + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `upstreamThrew()`. + enum UpstreamThrewAction { + /// Indicates the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with the `error` and + /// the task and the upstream continuations should be cancelled. + case resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + error: Error, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .upstreamsFinished: + // We need to tolerate multiple upstreams failing + return .none + + case .waitingForDemand(let task, let upstreams, _): + // An upstream threw. We can cancel everything now and transition to finished. + // We just need to store the error for the next downstream demand + self.state = .upstreamThrew( + error: error + ) + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .combining(let task, let upstreams, let downstreamContinuation, _): + // One of our upstreams threw. We need to transition to finished ourselves now + // and resume the downstream continuation with the error. Furthermore, we need to cancel all of + // the upstream work. + self.state = .finished + + return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .upstreamThrew, .finished: + // This is just everything finishing up, nothing to do here + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `cancelled()`. + enum CancelledAction { + /// Indicates that the downstream continuation needs to be resumed and + /// task and the upstream continuations should be cancelled. + case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func cancelled() -> CancelledAction? { + switch self.state { + case .initial: + state = .finished + + return .none + + case .waitingForDemand(let task, let upstreams, _): + // The downstream task got cancelled so we need to cancel our upstream Task + // and resume all continuations. We can also transition to finished. + self.state = .finished + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .combining(let task, let upstreams, let downstreamContinuation, _): + // The downstream Task got cancelled so we need to cancel our upstream Task + // and resume all continuations. We can also transition to finished. + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .upstreamsFinished: + // We can transition to finished now + self.state = .finished + + return .none + + case .upstreamThrew, .finished: + // We are already finished so nothing to do here: + + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `next()`. + enum NextAction { + /// Indicates that a new `Task` should be created that consumes the sequence. + case startTask([any Base]) + /// Indicates that all upstream continuations should be resumed. + case resumeUpstreamContinuations( + upstreamContinuation: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with the result. + case resumeContinuation( + downstreamContinuation: DownstreamContinuation, + result: Result<[Element]?, Error> + ) + /// Indicates that the downstream continuation should be resumed with `nil`. + case resumeDownstreamContinuationWithNil(DownstreamContinuation) + } + + mutating func next(for continuation: DownstreamContinuation) -> NextAction { + switch self.state { + case .initial(let bases): + // This is the first time we get demand singalled so we have to start the task + // The transition to the next state is done in the taskStarted method + return .startTask(bases) + + case .combining: + // We already got demand signalled and have suspended the downstream task + // Getting a second next calls means the iterator was transferred across Tasks which is not allowed + preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + + case .waitingForDemand(let task, var upstreams, var buffer): + // We got demand signalled now we have to check if there is anything buffered. + // If not we have to transition to combining and need to resume all upstream continuations now + self.state = .modifying + + guard let element = buffer.popFirst() else { + let upstreamContinuations = upstreams.map(\.continuation).compactMap { $0 } + for index in 0..: Sendable { + typealias StateMachine = CombineLatestManyStateMachine + + private let stateMachine: ManagedCriticalState + + init(_ bases: [any StateMachine.Base]) { + self.stateMachine = .init(.init(bases: bases)) + } + + func iteratorDeinitialized() { + let action = self.stateMachine.withCriticalRegion { $0.iteratorDeinitialized() } + + switch action { + case .cancelTaskAndUpstreamContinuations( + let task, + let upstreamContinuation + ): + upstreamContinuation.forEach { $0.resume(throwing: CancellationError()) } + + task.cancel() + + case .none: + break + } + } + + func next() async throws -> [Element]? { + try await withTaskCancellationHandler { + let result = await withUnsafeContinuation { continuation in + let action: StateMachine.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.next(for: continuation) + switch action { + case .startTask(let bases): + // first iteration, we start one child task per base to iterate over them + self.startTask( + stateMachine: &stateMachine, + bases: bases, + downstreamContinuation: continuation + ) + return nil + + case .resumeContinuation: + return action + + case .resumeUpstreamContinuations: + return action + + case .resumeDownstreamContinuationWithNil: + return action + } + } + + switch action { + case .startTask: + // We are handling the startTask in the lock already because we want to avoid + // other inputs interleaving while starting the task + fatalError("Internal inconsistency") + + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .resumeUpstreamContinuations(let upstreamContinuations): + // bases can be iterated over for 1 iteration so their next value can be retrieved + upstreamContinuations.forEach { $0.resume() } + + case .resumeDownstreamContinuationWithNil(let continuation): + // the async sequence is already finished, immediately resuming + continuation.resume(returning: .success(nil)) + + case .none: + break + } + } + + return try result._rethrowGet() + + } onCancel: { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.cancelled() + } + + switch action { + case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + downstreamContinuation.resume(returning: .success(nil)) + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + case .none: + break + } + } + } + + private func startTask( + stateMachine: inout StateMachine, + bases: [any CombineLatestManyStateMachine.Base], + downstreamContinuation: StateMachine.DownstreamContinuation + ) { + // This creates a new `Task` that is iterating the upstream + // sequences. We must store it to cancel it at the right times. + let task = Task { + await withThrowingTaskGroup(of: Void.self) { group in + // For each upstream sequence we are adding a child task that + // is consuming the upstream sequence + for (baseIndex, base) in bases.enumerated() { + group.addTask { + var baseIterator = base.makeAsyncIterator() + + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand. + try await withUnsafeThrowingContinuation { continuation in + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.childTaskSuspended(baseIndex: baseIndex, continuation: continuation) + } + + switch action { + case .resumeContinuation(let upstreamContinuation): + upstreamContinuation.resume() + + case .resumeContinuationWithError(let upstreamContinuation, let error): + upstreamContinuation.resume(throwing: error) + + case .none: + break + } + } + + if let element = try await baseIterator.next() { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.elementProduced(value: element, atBaseIndex: baseIndex) + } + + switch action { + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .none: + break + } + } else { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.upstreamFinished(baseIndex: baseIndex) + } + + switch action { + case .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + downstreamContinuation.resume(returning: .success(nil)) + break loop + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + break loop + + case .none: + break loop + } + } + } + } + } + + while !group.isEmpty { + do { + try await group.next() + } catch { + // One of the upstream sequences threw an error + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.upstreamThrew(error) + } + + switch action { + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let error, + let task, + let upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + downstreamContinuation.resume(returning: .failure(error)) + case .none: + break + } + + group.cancelAll() + } + } + } + } + + stateMachine.taskIsStarted(task: task, downstreamContinuation: downstreamContinuation) + } +} diff --git a/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift new file mode 100644 index 00000000..6224db7d --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift @@ -0,0 +1,106 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +import XCTest +import AsyncAlgorithms + +@available(AsyncAlgorithms 1.1, *) +final class TestCombineLatestMany: XCTestCase { + func test_combineLatest() async throws { + let a = [1, 2, 3].async.mappedFailureToError + let b = [4, 5, 6].async.mappedFailureToError + let c = [7, 8, 9].async.mappedFailureToError + let sequence = combineLatestMany([a, b, c]) + let actual = try await Array(sequence) + XCTAssertGreaterThanOrEqual(actual.count, 3) + } + + func test_ordering1() async { + var a = GatedSequence([1, 2, 3]).mappedFailureToError + var b = GatedSequence([4, 5, 6]).mappedFailureToError + var c = GatedSequence([7, 8, 9]).mappedFailureToError + let finished = expectation(description: "finished") + let sequence = combineLatestMany([a, b, c]) + let validator = Validator<[Int]>() + validator.test(sequence) { iterator in + let pastEnd = await iterator.next() + XCTAssertNil(pastEnd) + finished.fulfill() + } + var value = await validator.validate() + XCTAssertEqual(value, []) + a.advance() + value = validator.current + XCTAssertEqual(value, []) + b.advance() + value = validator.current + XCTAssertEqual(value, []) + c.advance() + + value = await validator.validate() + XCTAssertEqual(value, [(1, "a", 4)]) + a.advance() + + value = await validator.validate() + XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4)]) + b.advance() + + value = await validator.validate() + XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4), (2, "b", 4)]) + c.advance() + + value = await validator.validate() + XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5)]) + a.advance() + + value = await validator.validate() + XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5), (3, "b", 5)]) + b.advance() + + value = await validator.validate() + XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5), (3, "b", 5), (3, "c", 5)]) + c.advance() + + value = await validator.validate() + XCTAssertEqual( + value, + [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5), (3, "b", 5), (3, "c", 5), (3, "c", 6)] + ) + + await fulfillment(of: [finished], timeout: 1.0) + value = validator.current + XCTAssertEqual( + value, + [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5), (3, "b", 5), (3, "c", 5), (3, "c", 6)] + ) + } +} + +@available(AsyncAlgorithms 1.1, *) +private struct MappingErrorAsyncSequence: AsyncSequence, Sendable where Upstream.Failure == Never { + var upstream: Upstream + func makeAsyncIterator() -> Iterator { + Iterator(upstream: upstream.makeAsyncIterator()) + } + struct Iterator: AsyncIteratorProtocol { + var upstream: Upstream.AsyncIterator + mutating func next(isolation actor: isolated (any Actor)?) async throws -> Upstream.Element? { + await upstream.next(isolation: actor) + } + } +} + +@available(AsyncAlgorithms 1.1, *) +extension AsyncSequence where Failure == Never, Self: Sendable { + var mappedFailureToError: some AsyncSequence & Sendable { + MappingErrorAsyncSequence(upstream: self) + } +} From 67a279576fdcc2731039b2483a423eda9daa05f6 Mon Sep 17 00:00:00 2001 From: Honza Dvorsky Date: Thu, 14 Aug 2025 13:21:09 +0200 Subject: [PATCH 2/7] Remove the mapping error util --- .../CombineLatestManyStateMachine.swift | 2 -- .../TestCombineLatestMany.swift | 33 ++++--------------- 2 files changed, 6 insertions(+), 29 deletions(-) diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift index 621f0aec..f7d6632a 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift @@ -11,8 +11,6 @@ import DequeModule -// TODO: Do we need to add Failure: Error generic here? - /// State machine for combine latest @available(AsyncAlgorithms 1.1, *) struct CombineLatestManyStateMachine: Sendable { diff --git a/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift index 6224db7d..eecf0872 100644 --- a/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift +++ b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift @@ -15,18 +15,18 @@ import AsyncAlgorithms @available(AsyncAlgorithms 1.1, *) final class TestCombineLatestMany: XCTestCase { func test_combineLatest() async throws { - let a = [1, 2, 3].async.mappedFailureToError - let b = [4, 5, 6].async.mappedFailureToError - let c = [7, 8, 9].async.mappedFailureToError + let a = [1, 2, 3].async + let b = [4, 5, 6].async + let c = [7, 8, 9].async let sequence = combineLatestMany([a, b, c]) let actual = try await Array(sequence) XCTAssertGreaterThanOrEqual(actual.count, 3) } func test_ordering1() async { - var a = GatedSequence([1, 2, 3]).mappedFailureToError - var b = GatedSequence([4, 5, 6]).mappedFailureToError - var c = GatedSequence([7, 8, 9]).mappedFailureToError + var a = GatedSequence([1, 2, 3]) + var b = GatedSequence([4, 5, 6]) + var c = GatedSequence([7, 8, 9]) let finished = expectation(description: "finished") let sequence = combineLatestMany([a, b, c]) let validator = Validator<[Int]>() @@ -83,24 +83,3 @@ final class TestCombineLatestMany: XCTestCase { ) } } - -@available(AsyncAlgorithms 1.1, *) -private struct MappingErrorAsyncSequence: AsyncSequence, Sendable where Upstream.Failure == Never { - var upstream: Upstream - func makeAsyncIterator() -> Iterator { - Iterator(upstream: upstream.makeAsyncIterator()) - } - struct Iterator: AsyncIteratorProtocol { - var upstream: Upstream.AsyncIterator - mutating func next(isolation actor: isolated (any Actor)?) async throws -> Upstream.Element? { - await upstream.next(isolation: actor) - } - } -} - -@available(AsyncAlgorithms 1.1, *) -extension AsyncSequence where Failure == Never, Self: Sendable { - var mappedFailureToError: some AsyncSequence & Sendable { - MappingErrorAsyncSequence(upstream: self) - } -} From 5a4946b3d963affae0f0c73c68c86e573be70054 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Thu, 14 Aug 2025 14:12:32 +0200 Subject: [PATCH 3/7] Fun --- .../AsyncCombineLatestManySequence.swift | 38 +++++++------ .../CombineLatestManyStateMachine.swift | 54 +++++++++---------- .../CombineLatestManyStorage.swift | 53 ++++++++++-------- .../Support/GatedSequence.swift | 1 + .../TestCombineLatestMany.swift | 4 +- 5 files changed, 75 insertions(+), 75 deletions(-) diff --git a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift index 38bde13c..b9feb812 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift @@ -21,21 +21,18 @@ /// ``combineLatestMany(_:)`` throws when one of the bases throws. If one of the bases threw any buffered and not yet consumed /// values will be dropped. @available(AsyncAlgorithms 1.1, *) -public func combineLatestMany(_ bases: [any CombineLatestManyBase]) -> AsyncCombineLatestManySequence -{ - AsyncCombineLatestManySequence(bases) +public func combineLatestMany( + _ bases: [any (AsyncSequence & Sendable)] +) -> some AsyncSequence<[Element], Failure> & Sendable { + AsyncCombineLatestManySequence(bases) } -// TODO: Can we get rid of this typealias? -@available(AsyncAlgorithms 1.1, *) -public typealias CombineLatestManyBase = AsyncSequence & Sendable - /// An `AsyncSequence` that combines the latest values produced from many asynchronous sequences into an asynchronous sequence of tuples. @available(AsyncAlgorithms 1.1, *) -public struct AsyncCombineLatestManySequence: AsyncSequence, Sendable { +public struct AsyncCombineLatestManySequence: AsyncSequence, Sendable { public typealias AsyncIterator = Iterator - typealias Base = AsyncSequence & Sendable + typealias Base = AsyncSequence & Sendable let bases: [any Base] init(_ bases: [any Base]) { @@ -50,9 +47,9 @@ public struct AsyncCombineLatestManySequence: AsyncSequence, public struct Iterator: AsyncIteratorProtocol { final class InternalClass { - private let storage: CombineLatestManyStorage + private let storage: CombineLatestManyStorage - fileprivate init(storage: CombineLatestManyStorage) { + fileprivate init(storage: CombineLatestManyStorage) { self.storage = storage } @@ -60,23 +57,24 @@ public struct AsyncCombineLatestManySequence: AsyncSequence, self.storage.iteratorDeinitialized() } - func next() async throws -> [Element]? { - guard let element = try await self.storage.next() else { - return nil - } - - // This force unwrap is safe since there must be a third element. - return element + func next() async throws(Failure) -> [Element]? { + fatalError() +// guard let element = try await self.storage.next() else { +// return nil +// } +// +// // This force unwrap is safe since there must be a third element. +// return element } } let internalClass: InternalClass - fileprivate init(storage: CombineLatestManyStorage) { + fileprivate init(storage: CombineLatestManyStorage) { self.internalClass = InternalClass(storage: storage) } - public mutating func next() async throws -> [Element]? { + public mutating func next() async throws(Failure) -> [Element]? { try await self.internalClass.next() } } diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift index f7d6632a..4ca05b34 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift @@ -13,17 +13,17 @@ import DequeModule /// State machine for combine latest @available(AsyncAlgorithms 1.1, *) -struct CombineLatestManyStateMachine: Sendable { +struct CombineLatestManyStateMachine: Sendable { typealias DownstreamContinuation = UnsafeContinuation< - Result<[Element]?, Error>, Never + Result<[Element]?, Failure>, Never > - typealias Base = AsyncSequence & Sendable + typealias Base = AsyncSequence & Sendable private enum State: Sendable { /// Small wrapper for the state of an upstream sequence. struct Upstream: Sendable { /// The upstream continuation. - var continuation: UnsafeContinuation? + var continuation: UnsafeContinuation? /// The produced upstream element. var element: Element? /// Indicates wether the upstream finished/threw already @@ -53,7 +53,7 @@ struct CombineLatestManyStateMachine: Sendable { ) case upstreamThrew( - error: Error + error: Failure ) /// The state once the downstream consumer stopped, i.e. by dropping all references @@ -77,10 +77,10 @@ struct CombineLatestManyStateMachine: Sendable { /// Actions returned by `iteratorDeinitialized()`. enum IteratorDeinitializedAction { /// Indicates that the `Task` needs to be cancelled and - /// the upstream continuations need to be resumed with a `CancellationError`. + /// the upstream continuations need to be resumed with a `CancellationFailure`. case cancelTaskAndUpstreamContinuations( task: Task, - upstreamContinuations: [UnsafeContinuation] + upstreamContinuations: [UnsafeContinuation] ) } @@ -151,18 +151,13 @@ struct CombineLatestManyStateMachine: Sendable { enum ChildTaskSuspendedAction { /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream. case resumeContinuation( - upstreamContinuation: UnsafeContinuation - ) - /// Indicates that the continuation should be resumed with an Error because another upstream sequence threw. - case resumeContinuationWithError( - upstreamContinuation: UnsafeContinuation, - error: Error + upstreamContinuation: UnsafeContinuation ) } mutating func childTaskSuspended( baseIndex: Int, - continuation: UnsafeContinuation + continuation: UnsafeContinuation ) -> ChildTaskSuspendedAction? { switch self.state { case .initial: @@ -193,9 +188,8 @@ struct CombineLatestManyStateMachine: Sendable { // Since cancellation is cooperative it might be that child tasks are still getting // suspended even though we already cancelled them. We must tolerate this and just resume // the continuation with an error. - return .resumeContinuationWithError( - upstreamContinuation: continuation, - error: CancellationError() + return .resumeContinuation( + upstreamContinuation: continuation ) case .modifying: @@ -208,7 +202,7 @@ struct CombineLatestManyStateMachine: Sendable { /// Indicates that the downstream continuation should be resumed with the element. case resumeContinuation( downstreamContinuation: DownstreamContinuation, - result: Result<[Element]?, Error> + result: Result<[Element]?, Failure> ) } @@ -293,14 +287,14 @@ struct CombineLatestManyStateMachine: Sendable { /// Indicates the task and the upstream continuations should be cancelled. case cancelTaskAndUpstreamContinuations( task: Task, - upstreamContinuations: [UnsafeContinuation] + upstreamContinuations: [UnsafeContinuation] ) /// Indicates that the downstream continuation should be resumed with `nil` and /// the task and the upstream continuations should be cancelled. case resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( downstreamContinuation: DownstreamContinuation, task: Task, - upstreamContinuations: [UnsafeContinuation] + upstreamContinuations: [UnsafeContinuation] ) } @@ -394,19 +388,19 @@ struct CombineLatestManyStateMachine: Sendable { /// Indicates the task and the upstream continuations should be cancelled. case cancelTaskAndUpstreamContinuations( task: Task, - upstreamContinuations: [UnsafeContinuation] + upstreamContinuations: [UnsafeContinuation] ) /// Indicates that the downstream continuation should be resumed with the `error` and /// the task and the upstream continuations should be cancelled. - case resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + case resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations( downstreamContinuation: DownstreamContinuation, - error: Error, + error: Failure, task: Task, - upstreamContinuations: [UnsafeContinuation] + upstreamContinuations: [UnsafeContinuation] ) } - mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction? { + mutating func upstreamThrew(_ error: Failure) -> UpstreamThrewAction? { switch self.state { case .initial: preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") @@ -433,7 +427,7 @@ struct CombineLatestManyStateMachine: Sendable { // the upstream work. self.state = .finished - return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + return .resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations( downstreamContinuation: downstreamContinuation, error: error, task: task, @@ -456,12 +450,12 @@ struct CombineLatestManyStateMachine: Sendable { case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( downstreamContinuation: DownstreamContinuation, task: Task, - upstreamContinuations: [UnsafeContinuation] + upstreamContinuations: [UnsafeContinuation] ) /// Indicates that the task and the upstream continuations should be cancelled. case cancelTaskAndUpstreamContinuations( task: Task, - upstreamContinuations: [UnsafeContinuation] + upstreamContinuations: [UnsafeContinuation] ) } @@ -515,12 +509,12 @@ struct CombineLatestManyStateMachine: Sendable { case startTask([any Base]) /// Indicates that all upstream continuations should be resumed. case resumeUpstreamContinuations( - upstreamContinuation: [UnsafeContinuation] + upstreamContinuation: [UnsafeContinuation] ) /// Indicates that the downstream continuation should be resumed with the result. case resumeContinuation( downstreamContinuation: DownstreamContinuation, - result: Result<[Element]?, Error> + result: Result<[Element]?, Failure> ) /// Indicates that the downstream continuation should be resumed with `nil`. case resumeDownstreamContinuationWithNil(DownstreamContinuation) diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift index a828f5e1..4a7cb094 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift @@ -10,8 +10,8 @@ //===----------------------------------------------------------------------===// @available(AsyncAlgorithms 1.1, *) -final class CombineLatestManyStorage: Sendable { - typealias StateMachine = CombineLatestManyStateMachine +final class CombineLatestManyStorage: Sendable { + typealias StateMachine = CombineLatestManyStateMachine private let stateMachine: ManagedCriticalState @@ -27,9 +27,8 @@ final class CombineLatestManyStorage: Sendable { let task, let upstreamContinuation ): - upstreamContinuation.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() + upstreamContinuation.forEach { $0.resume() } case .none: break @@ -97,14 +96,14 @@ final class CombineLatestManyStorage: Sendable { let task, let upstreamContinuations ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() + task.cancel() + upstreamContinuations.forEach { $0.resume() } downstreamContinuation.resume(returning: .success(nil)) case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() + task.cancel() + upstreamContinuations.forEach { $0.resume() } case .none: break @@ -114,13 +113,13 @@ final class CombineLatestManyStorage: Sendable { private func startTask( stateMachine: inout StateMachine, - bases: [any CombineLatestManyStateMachine.Base], + bases: [any (AsyncSequence & Sendable)], downstreamContinuation: StateMachine.DownstreamContinuation ) { // This creates a new `Task` that is iterating the upstream // sequences. We must store it to cancel it at the right times. let task = Task { - await withThrowingTaskGroup(of: Void.self) { group in + await withTaskGroup(of: Result.self) { group in // For each upstream sequence we are adding a child task that // is consuming the upstream sequence for (baseIndex, base) in bases.enumerated() { @@ -131,7 +130,7 @@ final class CombineLatestManyStorage: Sendable { // We are creating a continuation before requesting the next // element from upstream. This continuation is only resumed // if the downstream consumer called `next` to signal his demand. - try await withUnsafeThrowingContinuation { continuation in + await withUnsafeContinuation { continuation in let action = self.stateMachine.withCriticalRegion { stateMachine in stateMachine.childTaskSuspended(baseIndex: baseIndex, continuation: continuation) } @@ -140,15 +139,19 @@ final class CombineLatestManyStorage: Sendable { case .resumeContinuation(let upstreamContinuation): upstreamContinuation.resume() - case .resumeContinuationWithError(let upstreamContinuation, let error): - upstreamContinuation.resume(throwing: error) - case .none: break } } + + let element: Element? + do { + element = try await baseIterator.next(isolation: nil) + } catch { + return .failure(error as! Failure) // Looks like a compiler bug + } - if let element = try await baseIterator.next() { + if let element = element { let action = self.stateMachine.withCriticalRegion { stateMachine in stateMachine.elementProduced(value: element, atBaseIndex: baseIndex) } @@ -172,15 +175,15 @@ final class CombineLatestManyStorage: Sendable { let upstreamContinuations ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } task.cancel() + upstreamContinuations.forEach { $0.resume() } downstreamContinuation.resume(returning: .success(nil)) break loop case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } task.cancel() + upstreamContinuations.forEach { $0.resume() } break loop @@ -189,13 +192,17 @@ final class CombineLatestManyStorage: Sendable { } } } + return .success(()) } } while !group.isEmpty { - do { - try await group.next() - } catch { + let result = await group.next() + + switch result { + case .success, .none: + break + case .failure(let error): // One of the upstream sequences threw an error let action = self.stateMachine.withCriticalRegion { stateMachine in stateMachine.upstreamThrew(error) @@ -203,16 +210,16 @@ final class CombineLatestManyStorage: Sendable { switch action { case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } task.cancel() - case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + upstreamContinuations.forEach { $0.resume() } + case .resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations( let downstreamContinuation, let error, let task, let upstreamContinuations ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } task.cancel() + upstreamContinuations.forEach { $0.resume() } downstreamContinuation.resume(returning: .failure(error)) case .none: break diff --git a/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift b/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift index 9167a4c1..bab4cf4f 100644 --- a/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift +++ b/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift @@ -10,6 +10,7 @@ //===----------------------------------------------------------------------===// public struct GatedSequence { + typealias Failure = Never let elements: [Element] let gates: [Gate] var index = 0 diff --git a/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift index eecf0872..023e1d5d 100644 --- a/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift +++ b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift @@ -19,7 +19,7 @@ final class TestCombineLatestMany: XCTestCase { let b = [4, 5, 6].async let c = [7, 8, 9].async let sequence = combineLatestMany([a, b, c]) - let actual = try await Array(sequence) + let actual = await Array(sequence) XCTAssertGreaterThanOrEqual(actual.count, 3) } @@ -31,7 +31,7 @@ final class TestCombineLatestMany: XCTestCase { let sequence = combineLatestMany([a, b, c]) let validator = Validator<[Int]>() validator.test(sequence) { iterator in - let pastEnd = await iterator.next() + let pastEnd = await iterator.next(isolation: nil) XCTAssertNil(pastEnd) finished.fulfill() } From f429665614439043f21e4c98462777877d3481c7 Mon Sep 17 00:00:00 2001 From: Honza Dvorsky Date: Thu, 14 Aug 2025 14:53:05 +0200 Subject: [PATCH 4/7] Got the combineLatestMany algorithm working, using Swift 6.2+ --- .../AsyncCombineLatestManySequence.swift | 17 +++++++++------- .../CombineLatestManyStateMachine.swift | 4 ++++ .../CombineLatestManyStorage.swift | 18 +++++++++-------- .../TestCombineLatestMany.swift | 20 +++++++++++-------- 4 files changed, 36 insertions(+), 23 deletions(-) diff --git a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift index b9feb812..1850e7c6 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift @@ -9,6 +9,8 @@ // //===----------------------------------------------------------------------===// +#if compiler(>=6.2) + /// Creates an asynchronous sequence that combines the latest values from many `AsyncSequence` types /// by emitting a tuple of the values. ``combineLatestMany(_:)`` only emits a value whenever any of the base `AsyncSequence`s /// emit a value (so long as each of the bases have emitted at least one value). @@ -58,13 +60,12 @@ public struct AsyncCombineLatestManySequence: } func next() async throws(Failure) -> [Element]? { - fatalError() -// guard let element = try await self.storage.next() else { -// return nil -// } -// -// // This force unwrap is safe since there must be a third element. -// return element + guard let element = try await self.storage.next() else { + return nil + } + + // This force unwrap is safe since there must be a third element. + return element } } @@ -82,3 +83,5 @@ public struct AsyncCombineLatestManySequence: @available(*, unavailable) extension AsyncCombineLatestManySequence.Iterator: Sendable {} + +#endif diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift index 4ca05b34..ef9bfd04 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift @@ -9,6 +9,8 @@ // //===----------------------------------------------------------------------===// +#if compiler(>=6.2) + import DequeModule /// State machine for combine latest @@ -595,3 +597,5 @@ struct CombineLatestManyStateMachine: Sendabl } } } + +#endif diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift index 4a7cb094..008b12a9 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift @@ -9,6 +9,8 @@ // //===----------------------------------------------------------------------===// +#if compiler(>=6.2) + @available(AsyncAlgorithms 1.1, *) final class CombineLatestManyStorage: Sendable { typealias StateMachine = CombineLatestManyStateMachine @@ -35,9 +37,9 @@ final class CombineLatestManyStorage: Sendabl } } - func next() async throws -> [Element]? { - try await withTaskCancellationHandler { - let result = await withUnsafeContinuation { continuation in + func next() async throws(Failure) -> [Element]? { + let result = await withTaskCancellationHandler { + await withUnsafeContinuation { continuation in let action: StateMachine.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in let action = stateMachine.next(for: continuation) switch action { @@ -82,9 +84,6 @@ final class CombineLatestManyStorage: Sendabl break } } - - return try result._rethrowGet() - } onCancel: { let action = self.stateMachine.withCriticalRegion { stateMachine in stateMachine.cancelled() @@ -96,19 +95,20 @@ final class CombineLatestManyStorage: Sendabl let task, let upstreamContinuations ): - task.cancel() + task.cancel() upstreamContinuations.forEach { $0.resume() } downstreamContinuation.resume(returning: .success(nil)) case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): - task.cancel() + task.cancel() upstreamContinuations.forEach { $0.resume() } case .none: break } } + return try result.get() } private func startTask( @@ -234,3 +234,5 @@ final class CombineLatestManyStorage: Sendabl stateMachine.taskIsStarted(task: task, downstreamContinuation: downstreamContinuation) } } + +#endif diff --git a/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift index 023e1d5d..09c7e5fc 100644 --- a/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift +++ b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift @@ -9,6 +9,8 @@ // //===----------------------------------------------------------------------===// +#if compiler(>=6.2) + import XCTest import AsyncAlgorithms @@ -46,40 +48,42 @@ final class TestCombineLatestMany: XCTestCase { c.advance() value = await validator.validate() - XCTAssertEqual(value, [(1, "a", 4)]) + XCTAssertEqual(value, [[1, 4, 7]]) a.advance() value = await validator.validate() - XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4)]) + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7]]) b.advance() value = await validator.validate() - XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4), (2, "b", 4)]) + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7], [2, 5, 7]]) c.advance() value = await validator.validate() - XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5)]) + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8]]) a.advance() value = await validator.validate() - XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5), (3, "b", 5)]) + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8]]) b.advance() value = await validator.validate() - XCTAssertEqual(value, [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5), (3, "b", 5), (3, "c", 5)]) + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8], [3, 6, 8]]) c.advance() value = await validator.validate() XCTAssertEqual( value, - [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5), (3, "b", 5), (3, "c", 5), (3, "c", 6)] + [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8], [3, 6, 8], [3, 6, 9]] ) await fulfillment(of: [finished], timeout: 1.0) value = validator.current XCTAssertEqual( value, - [(1, "a", 4), (2, "a", 4), (2, "b", 4), (2, "b", 5), (3, "b", 5), (3, "c", 5), (3, "c", 6)] + [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8], [3, 6, 8], [3, 6, 9]] ) } } + +#endif From f1f576240a2792af270e517320b900a0e58321c3 Mon Sep 17 00:00:00 2001 From: Honza Dvorsky Date: Thu, 14 Aug 2025 14:54:44 +0200 Subject: [PATCH 5/7] Revert an unnecessary change --- Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift b/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift index bab4cf4f..9167a4c1 100644 --- a/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift +++ b/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift @@ -10,7 +10,6 @@ //===----------------------------------------------------------------------===// public struct GatedSequence { - typealias Failure = Never let elements: [Element] let gates: [Gate] var index = 0 From 1b2cd9659ab3b633a553ded694345fce23d64114 Mon Sep 17 00:00:00 2001 From: Honza Dvorsky Date: Tue, 4 Nov 2025 12:21:37 +0100 Subject: [PATCH 6/7] Fix an index bug --- .../CombineLatest/CombineLatestManyStateMachine.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift index ef9bfd04..1a03f605 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift @@ -312,7 +312,7 @@ struct CombineLatestManyStateMachine: Sendabl // One of the upstreams finished. self.state = .modifying - upstreams[0].isFinished = true + upstreams[baseIndex].isFinished = true if upstreams.allSatisfy(\.isFinished) { // All upstreams finished we can transition to either finished or upstreamsFinished now From cd2e0e8e04a58c10b661a72556976859d88b0d8d Mon Sep 17 00:00:00 2001 From: Honza Dvorsky Date: Tue, 4 Nov 2025 12:24:32 +0100 Subject: [PATCH 7/7] More cleanup --- .../CombineLatest/AsyncCombineLatestManySequence.swift | 2 -- .../CombineLatest/CombineLatestManyStateMachine.swift | 1 - 2 files changed, 3 deletions(-) diff --git a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift index 1850e7c6..5a1b4d72 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift @@ -63,8 +63,6 @@ public struct AsyncCombineLatestManySequence: guard let element = try await self.storage.next() else { return nil } - - // This force unwrap is safe since there must be a third element. return element } } diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift index 1a03f605..2b279040 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift @@ -345,7 +345,6 @@ struct CombineLatestManyStateMachine: Sendabl let emptyUpstreamFinished = upstreams[baseIndex].element == nil upstreams[baseIndex].isFinished = true - // Implementing this for the two arities without variadic generics is a bit awkward sadly. if emptyUpstreamFinished { // All upstreams finished self.state = .finished