@@ -31,21 +31,26 @@ private static async IAsyncEnumerable<TResponse> FullDuplexImpl<TRequest, TRespo
3131 [ EnumeratorCancellation ] CancellationToken cancellationToken )
3232 {
3333 using var allDone = CancellationTokenSource . CreateLinkedTokenSource ( context . CancellationToken , cancellationToken ) ;
34+ Task ? consumed = null ;
3435 try
3536 {
3637 context = new CallContext ( context , allDone . Token ) ;
37- var consumed = Task . Run ( ( ) => consumer ( source , context ) . AsTask ( ) , allDone . Token ) ; // note this shares a capture scope
38+ consumed = Task . Run ( ( ) => consumer ( source , context ) . AsTask ( ) , allDone . Token ) ; // note this shares a capture scope
3839
3940 await foreach ( var value in producer ( context ) . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) )
4041 {
4142 yield return value ;
4243 }
43- await consumed . ConfigureAwait ( false ) ;
4444 }
4545 finally
4646 {
4747 // stop the producer, in any exit scenario
4848 allDone . Cancel ( ) ;
49+
50+ if ( consumed != null )
51+ {
52+ await consumed . ConfigureAwait ( false ) ;
53+ }
4954 }
5055 }
5156
@@ -68,10 +73,11 @@ private static async IAsyncEnumerable<TResponse> FullDuplexImpl<TRequest, TRespo
6873 [ EnumeratorCancellation ] CancellationToken cancellationToken )
6974 {
7075 using var allDone = CancellationTokenSource . CreateLinkedTokenSource ( context . CancellationToken , cancellationToken ) ;
76+ Task ? consumed = null ;
7177 try
7278 {
7379 context = new CallContext ( context , allDone . Token ) ;
74- var consumed = Task . Run ( async ( ) =>
80+ consumed = Task . Run ( async ( ) =>
7581 { // note this shares a capture scope
7682 await foreach ( var value in source . WithCancellation ( allDone . Token ) . ConfigureAwait ( false ) )
7783 {
@@ -82,12 +88,16 @@ private static async IAsyncEnumerable<TResponse> FullDuplexImpl<TRequest, TRespo
8288 {
8389 yield return value ;
8490 }
85- await consumed . ConfigureAwait ( false ) ;
8691 }
8792 finally
8893 {
8994 // stop the producer, in any exit scenario
9095 allDone . Cancel ( ) ;
96+
97+ if ( consumed != null )
98+ {
99+ await consumed . ConfigureAwait ( false ) ;
100+ }
91101 }
92102 }
93103
0 commit comments