66using System . Threading ;
77using System . Threading . Tasks ;
88using Microsoft . Extensions . Logging ;
9- using RabbitMQ . Stream . Client . Reconnect ;
109
1110namespace RabbitMQ . Stream . Client . Reliable ;
1211
@@ -50,6 +49,11 @@ public abstract class ReliableBase
5049 private readonly object _lock = new ( ) ;
5150 protected ReliableEntityStatus _status = ReliableEntityStatus . Initialization ;
5251
52+ protected static async Task RandomWait ( )
53+ {
54+ await Task . Delay ( Consts . RandomMid ( ) ) . ConfigureAwait ( false ) ;
55+ }
56+
5357 protected void UpdateStatus ( ReliableEntityStatus status )
5458 {
5559 lock ( _lock )
@@ -112,7 +116,6 @@ private async Task MaybeInit(bool boot)
112116
113117 reconnect = true ;
114118 LogException ( e ) ;
115-
116119 }
117120
118121 if ( reconnect )
@@ -130,6 +133,7 @@ private async Task Init(bool boot)
130133 BaseLogger . LogDebug ( "{Identity} is already closed. The init will be skipped" , ToString ( ) ) ;
131134 return ;
132135 }
136+
133137 // each time that the client is initialized, we need to reset the status
134138 // if we hare here it means that the entity is not open for some reason like:
135139 // first time initialization or reconnect due of a IsAKnownException
@@ -156,7 +160,8 @@ private async Task Init(bool boot)
156160 /// <summary>
157161 /// When the clients receives a meta data update, it doesn't know
158162 /// If the stream exists or not. It just knows that the stream topology has changed.
159- /// the method CheckIfStreamIsAvailable checks if the stream exists.
163+ /// the method CheckIfStreamIsAvailable checks if the stream exists
164+ /// and if the leader is available.
160165 /// </summary>
161166 /// <param name="stream">stream name</param>
162167 /// <param name="system">stream system</param>
@@ -172,7 +177,16 @@ private async Task<bool> CheckIfStreamIsAvailable(string stream, StreamSystem sy
172177 {
173178 exists = await system . StreamExists ( stream ) . ConfigureAwait ( false ) ;
174179 var available = exists ? "available" : "not available" ;
175- await _resourceAvailableReconnectStrategy . WhenConnected ( $ "{ stream } is { available } ")
180+ if ( exists )
181+ {
182+ // It is not enough to check if the stream exists
183+ // we need to check if the stream has the leader
184+ var streamInfo = await system . StreamInfo ( stream ) . ConfigureAwait ( false ) ;
185+ ClientExceptions . CheckLeader ( streamInfo ) ;
186+ available += " and has a valid leader" ;
187+ }
188+
189+ await _resourceAvailableReconnectStrategy . WhenConnected ( $ "{ stream } for { ToString ( ) } is { available } ")
176190 . ConfigureAwait ( false ) ;
177191 break ;
178192 }
@@ -229,7 +243,8 @@ private async Task MaybeReconnect()
229243 }
230244 }
231245
232- private async Task MaybeReconnectPartition ( StreamInfo streamInfo , string info , Func < StreamInfo , Task > reconnectPartitionFunc )
246+ private async Task MaybeReconnectPartition ( StreamInfo streamInfo , string info ,
247+ Func < StreamInfo , Task > reconnectPartitionFunc )
233248 {
234249 var reconnect = await _reconnectStrategy
235250 . WhenDisconnected ( $ "Super Stream partition: { streamInfo . Stream } for { info } ") . ConfigureAwait ( false ) ;
@@ -285,7 +300,8 @@ private void LogException(Exception exception)
285300 /// <param name="system">Stream System</param>
286301 /// <param name="stream">Partition Stream</param>
287302 /// <param name="reconnectPartitionFunc">Function to reconnect the partition</param>
288- internal async Task OnEntityClosed ( StreamSystem system , string stream , Func < StreamInfo , Task > reconnectPartitionFunc )
303+ internal async Task OnEntityClosed ( StreamSystem system , string stream ,
304+ Func < StreamInfo , Task > reconnectPartitionFunc )
289305 {
290306 var streamExists = false ;
291307 await SemaphoreSlim . WaitAsync ( ) . ConfigureAwait ( false ) ;
0 commit comments