3838using System . Threading . Tasks ;
3939using RabbitMQ . Client ;
4040
41+ const ushort MAX_OUTSTANDING_CONFIRMS = 256 ;
42+
4143const int MESSAGE_COUNT = 50_000 ;
4244bool debug = false ;
4345
46+ var channelOpts = new CreateChannelOptions
47+ {
48+ PublisherConfirmationsEnabled = true ,
49+ PublisherConfirmationTrackingEnabled = true ,
50+ MaxOutstandingPublisherConfirmations = MAX_OUTSTANDING_CONFIRMS
51+ } ;
52+
4453#pragma warning disable CS8321 // Local function is declared but never used
4554
4655await PublishMessagesIndividuallyAsync ( ) ;
@@ -53,12 +62,12 @@ static Task<IConnection> CreateConnectionAsync()
5362 return factory . CreateConnectionAsync ( ) ;
5463}
5564
56- static async Task PublishMessagesIndividuallyAsync ( )
65+ async Task PublishMessagesIndividuallyAsync ( )
5766{
5867 Console . WriteLine ( $ "{ DateTime . Now } [INFO] publishing { MESSAGE_COUNT : N0} messages and handling confirms per-message") ;
5968
6069 await using IConnection connection = await CreateConnectionAsync ( ) ;
61- await using IChannel channel = await connection . CreateChannelAsync ( new CreateChannelOptions { PublisherConfirmationsEnabled = true , PublisherConfirmationTrackingEnabled = true } ) ;
70+ await using IChannel channel = await connection . CreateChannelAsync ( channelOpts ) ;
6271
6372 // declare a server-named queue
6473 QueueDeclareOk queueDeclareResult = await channel . QueueDeclareAsync ( ) ;
@@ -85,18 +94,18 @@ static async Task PublishMessagesIndividuallyAsync()
8594 Console . WriteLine ( $ "{ DateTime . Now } [INFO] published { MESSAGE_COUNT : N0} messages individually in { sw . ElapsedMilliseconds : N0} ms") ;
8695}
8796
88- static async Task PublishMessagesInBatchAsync ( )
97+ async Task PublishMessagesInBatchAsync ( )
8998{
9099 Console . WriteLine ( $ "{ DateTime . Now } [INFO] publishing { MESSAGE_COUNT : N0} messages and handling confirms in batches") ;
91100
92101 await using IConnection connection = await CreateConnectionAsync ( ) ;
93- await using IChannel channel = await connection . CreateChannelAsync ( new CreateChannelOptions { PublisherConfirmationsEnabled = true , PublisherConfirmationTrackingEnabled = true } ) ;
102+ await using IChannel channel = await connection . CreateChannelAsync ( channelOpts ) ;
94103
95104 // declare a server-named queue
96105 QueueDeclareOk queueDeclareResult = await channel . QueueDeclareAsync ( ) ;
97106 string queueName = queueDeclareResult . QueueName ;
98107
99- int batchSize = 1000 ;
108+ int batchSize = MAX_OUTSTANDING_CONFIRMS ;
100109 int outstandingMessageCount = 0 ;
101110
102111 var sw = new Stopwatch ( ) ;
@@ -154,12 +163,8 @@ async Task HandlePublishConfirmsAsynchronously()
154163
155164 await using IConnection connection = await CreateConnectionAsync ( ) ;
156165
157- var channelOptions = new CreateChannelOptions
158- {
159- PublisherConfirmationsEnabled = true ,
160- PublisherConfirmationTrackingEnabled = false
161- } ;
162- await using IChannel channel = await connection . CreateChannelAsync ( channelOptions ) ;
166+ channelOpts . PublisherConfirmationTrackingEnabled = false ;
167+ await using IChannel channel = await connection . CreateChannelAsync ( channelOpts ) ;
163168
164169 // declare a server-named queue
165170 QueueDeclareOk queueDeclareResult = await channel . QueueDeclareAsync ( ) ;
0 commit comments