77using System . Collections . Generic ;
88using System . Diagnostics ;
99using System . Runtime . CompilerServices ;
10+ using System . Threading ;
1011using System . Threading . Channels ;
1112using System . Threading . Tasks ;
1213using Microsoft . Extensions . Logging ;
@@ -197,7 +198,8 @@ public async Task StoreOffset(ulong offset)
197198 await _client . StoreOffset ( _config . Reference , _config . Stream , offset ) . ConfigureAwait ( false ) ;
198199 }
199200
200- // It is needed to understand if the consumer is active or not
201+ ////// *********************
202+ // IsPromotedAsActive is needed to understand if the consumer is active or not
201203 // by default is active
202204 // in case of single active consumer can be not active
203205 // it is important to skip the messages in the chunk that
@@ -206,6 +208,36 @@ public async Task StoreOffset(ulong offset)
206208 // long task
207209 private bool IsPromotedAsActive { get ; set ; }
208210
211+ // PromotionLock avoids race conditions when the consumer is promoted as active
212+ // and the messages are dispatched in parallel.
213+ // The consumer can be promoted as active with the function ConsumerUpdateListener
214+ // It is needed when the consumer is single active consumer
215+ private SemaphoreSlim PromotionLock { get ; } = new ( 1 ) ;
216+
217+ /// <summary>
218+ /// MaybeLockDispatch locks the dispatch of the messages
219+ /// it is needed only when the consumer is single active consumer
220+ /// MaybeLockDispatch is an optimization to avoid to lock the dispatch
221+ /// when the consumer is not single active consumer
222+ /// </summary>
223+
224+ private async Task MaybeLockDispatch ( )
225+ {
226+ if ( _config . IsSingleActiveConsumer )
227+ await PromotionLock . WaitAsync ( Token ) . ConfigureAwait ( false ) ;
228+ }
229+
230+ /// <summary>
231+ /// MaybeReleaseLock releases the lock on the dispatch of the messages
232+ /// Following the MaybeLockDispatch method
233+ /// </summary>
234+ private void MaybeReleaseLock ( )
235+ {
236+ if ( _config . IsSingleActiveConsumer )
237+ PromotionLock . Release ( ) ;
238+ }
239+
240+ ////// *********************
209241 public static async Task < IConsumer > Create (
210242 ClientParameters clientParameters ,
211243 RawConsumerConfig config ,
@@ -389,7 +421,15 @@ await _config.MessageHandler(this,
389421 for ( ulong z = 0 ; z < subEntryChunk . NumRecordsInBatch ; z ++ )
390422 {
391423 var message = MessageFromSequence ( ref unCompressedData , ref compressOffset ) ;
392- await DispatchMessage ( message , messageOffset ++ ) . ConfigureAwait ( false ) ;
424+ await MaybeLockDispatch ( ) . ConfigureAwait ( false ) ;
425+ try
426+ {
427+ await DispatchMessage ( message , messageOffset ++ ) . ConfigureAwait ( false ) ;
428+ }
429+ finally
430+ {
431+ MaybeReleaseLock ( ) ;
432+ }
393433 }
394434
395435 numRecords -= subEntryChunk . NumRecordsInBatch ;
@@ -590,7 +630,7 @@ private async Task Init()
590630 {
591631 // in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener
592632 // since the user decided to override the default behavior
593-
633+ await MaybeLockDispatch ( ) . ConfigureAwait ( false ) ;
594634 try
595635 {
596636 _config . StoredOffsetSpec = await _config . ConsumerUpdateListener (
@@ -606,6 +646,10 @@ private async Task Init()
606646 // in this case the default behavior is to use the OffsetTypeNext
607647 _config . StoredOffsetSpec = new OffsetTypeNext ( ) ;
608648 }
649+ finally
650+ {
651+ MaybeReleaseLock ( ) ;
652+ }
609653 }
610654
611655 // Here we set the promotion status
0 commit comments