@@ -124,15 +124,15 @@ extension MultiProducerSingleConsumerAsyncChannel {
124124 ) {
125125 self . _stateMachine = Mutex < _StateMachine > ( _StateMachine ( backpressureStrategy: backpressureStrategy) )
126126 }
127-
127+
128128 func setOnTerminationCallback(
129129 sourceID: UInt64 ,
130130 callback: ( @Sendable ( ) -> Void ) ?
131131 ) {
132132 let action = self . _stateMachine. withLock {
133133 $0. setOnTerminationCallback ( sourceID: sourceID, callback: callback)
134134 }
135-
135+
136136 switch action {
137137 case . callOnTermination( let onTermination) :
138138 onTermination ( )
@@ -540,7 +540,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage {
540540 init ( state: consuming _State ) {
541541 self . _state = state
542542 }
543-
543+
544544 /// Actions returned by `sourceDeinitialized()`.
545545 @usableFromInline
546546 enum SetOnTerminationCallback {
@@ -565,10 +565,10 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage {
565565 } else {
566566 channeling. onTerminations. removeAll ( where: { $0. 0 == sourceID } )
567567 }
568-
568+
569569 self = . init( state: . channeling( channeling) )
570570 return . none
571-
571+
572572 case . sourceFinished( var sourceFinished) :
573573 if let callback {
574574 if let index = sourceFinished. onTerminations. firstIndex ( where: { $0. 0 == sourceID } ) {
@@ -579,18 +579,17 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage {
579579 } else {
580580 sourceFinished. onTerminations. removeAll ( where: { $0. 0 == sourceID } )
581581 }
582-
582+
583583 self = . init( state: . sourceFinished( sourceFinished) )
584584 return . none
585585
586586 case . finished( let finished) :
587587 self = . init( state: . finished( finished) )
588-
589- if let callback {
590- return . callOnTermination( callback)
591- } else {
588+
589+ guard let callback else {
592590 return . none
593591 }
592+ return . callOnTermination( callback)
594593 }
595594 }
596595
@@ -610,7 +609,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage {
610609
611610 case . finished( let finished) :
612611 self = . init( state: . finished( finished) )
613- return . max // We use max to indicate that this is finished
612+ return . max // We use max to indicate that this is finished
614613 }
615614 }
616615
@@ -992,22 +991,20 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage {
992991 let callbackToken = shouldProduceMore ? nil : channeling. nextCallbackToken ( )
993992 self = . init( state: . channeling( channeling) )
994993
995- if let callbackToken {
996- return . returnEnqueue( callbackToken: callbackToken)
997- } else {
994+ guard let callbackToken else {
998995 return . returnProduceMore
999996 }
997+ return . returnEnqueue( callbackToken: callbackToken)
1000998 }
1001999 guard let element = channeling. buffer. popFirst ( ) else {
10021000 // We got a send of an empty sequence. We just tolerate this.
10031001 let callbackToken = shouldProduceMore ? nil : channeling. nextCallbackToken ( )
10041002 self = . init( state: . channeling( channeling) )
10051003
1006- if let callbackToken {
1007- return . returnEnqueue( callbackToken: callbackToken)
1008- } else {
1004+ guard let callbackToken else {
10091005 return . returnProduceMore
10101006 }
1007+ return . returnEnqueue( callbackToken: callbackToken)
10111008 }
10121009 // This is actually safe but we can't express it right now.
10131010 // The element is taken from the deque once we and initally send it into
@@ -1022,18 +1019,17 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage {
10221019 let callbackToken = shouldProduceMore ? nil : channeling. nextCallbackToken ( )
10231020 self = . init( state: . channeling( channeling) )
10241021
1025- if let callbackToken {
1026- return . resumeConsumerAndReturnEnqueue(
1027- continuation: consumerContinuation,
1028- element: disconnectedElement,
1029- callbackToken: callbackToken
1030- )
1031- } else {
1022+ guard let callbackToken else {
10321023 return . resumeConsumerAndReturnProduceMore(
1033- continuation: consumerContinuation,
1034- element: disconnectedElement
1035- )
1024+ continuation: consumerContinuation,
1025+ element: disconnectedElement
1026+ )
10361027 }
1028+ return . resumeConsumerAndReturnEnqueue(
1029+ continuation: consumerContinuation,
1030+ element: disconnectedElement,
1031+ callbackToken: callbackToken
1032+ )
10371033
10381034 case . sourceFinished ( let sourceFinished ) :
10391035 // If the source has finished we are dropping the elements.
@@ -1391,7 +1387,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage {
13911387
13921388 return . none
13931389 }
1394-
1390+
13951391 // We have an element to fulfil the demand right away.
13961392 let shouldProduceMore = channeling. backpressureStrategy. didConsume ( element: element)
13971393 channeling. hasOutstandingDemand = shouldProduceMore
@@ -1551,7 +1547,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage._StateMachine {
15511547 /// The next callback token.
15521548 @usableFromInline
15531549 var nextCallbackTokenID : UInt64
1554-
1550+
15551551 /// The source ID.
15561552 @usableFromInline
15571553 var _nextSourceID : UInt64
@@ -1596,7 +1592,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage._StateMachine {
15961592 self . nextCallbackTokenID += 1
15971593 return id
15981594 }
1599-
1595+
16001596 /// Generates the next source ID.
16011597 @inlinable
16021598 mutating func nextSourceID( ) -> UInt64 {
@@ -1627,7 +1623,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage._StateMachine {
16271623 /// The onTermination callbacks.
16281624 @usableFromInline
16291625 var onTerminations : _TinyArray < ( UInt64 , @Sendable ( ) -> Void ) >
1630-
1626+
16311627 /// The source ID.
16321628 @usableFromInline
16331629 var _nextSourceID : UInt64
@@ -1652,7 +1648,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage._StateMachine {
16521648 self . onTerminations = onTerminations
16531649 self . _nextSourceID = nextSourceID
16541650 }
1655-
1651+
16561652 /// Generates the next source ID.
16571653 @inlinable
16581654 mutating func nextSourceID( ) -> UInt64 {
@@ -1737,12 +1733,12 @@ extension Optional where Wrapped: ~Copyable {
17371733struct SendableConsumeOnceBox< Wrapped> {
17381734 @usableFromInline
17391735 var wrapped : Optional < Wrapped >
1740-
1736+
17411737 @inlinable
17421738 init ( wrapped: consuming sending Wrapped) {
17431739 self . wrapped = . some( wrapped)
17441740 }
1745-
1741+
17461742 @inlinable
17471743 consuming func take( ) -> sending Wrapped {
17481744 return self . wrapped. takeSending ( ) !
0 commit comments