1111
1212namespace RabbitMQ . Stream . Client ;
1313
14+ public enum ConnectionClosePolicy
15+ {
16+ /// <summary>
17+ /// The connection is closed when the last consumer or producer is removed.
18+ /// </summary>
19+ CloseWhenEmpty ,
20+
21+ /// <summary>
22+ /// The connection is closed when the last consumer or producer is removed and the connection is not used for a certain time.
23+ /// </summary>
24+ CloseWhenEmptyAndIdle
25+ }
26+
27+ public class ConnectionCloseConfig
28+ {
29+ /// <summary>
30+ /// Policy to close the connection.
31+ /// </summary>
32+
33+ public ConnectionClosePolicy Policy { get ; set ; } = ConnectionClosePolicy . CloseWhenEmpty ;
34+
35+ /// <summary>
36+ /// The connection is closed when the last consumer or producer is removed and the connection is not used for a certain time.
37+ /// Idle time is valid only if the policy is CloseWhenEmptyAndIdle.
38+ /// </summary>
39+ public TimeSpan IdleTime { get ; set ; } = TimeSpan . FromMinutes ( 5 ) ;
40+
41+ /// <summary>
42+ /// Interval to check the idle time.
43+ /// Default is high because the check is done in a separate thread.
44+ /// The filed is internal to help the test.
45+ /// </summary>
46+ internal TimeSpan CheckIdleTime { get ; set ; } = TimeSpan . FromSeconds ( 60 ) ;
47+ }
48+
1449public class ConnectionPoolConfig
1550{
1651 /// <summary>
@@ -30,6 +65,11 @@ public class ConnectionPoolConfig
3065 /// but it is not the best for performance.
3166 /// </summary>
3267 public byte ProducersPerConnection { get ; set ; } = 1 ;
68+
69+ /// <summary>
70+ /// Define the connection close policy.
71+ /// </summary>
72+ public ConnectionCloseConfig ConnectionCloseConfig { get ; set ; } = new ConnectionCloseConfig ( ) ;
3373}
3474
3575public class LastSecret
@@ -87,9 +127,10 @@ public bool Available
87127/// subscriptionIds
88128/// publisherIds
89129/// </summary>
90- public class ConnectionsPool
130+ public class ConnectionsPool : IDisposable
91131{
92132 private static readonly object s_lock = new ( ) ;
133+ private bool _isRunning = false ;
93134
94135 internal static byte FindNextValidId ( List < byte > ids , byte nextId = 0 )
95136 {
@@ -127,16 +168,56 @@ internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
127168 private readonly byte _idsPerConnection ;
128169 private readonly SemaphoreSlim _semaphoreSlim = new ( 1 , 1 ) ;
129170 private readonly LastSecret _lastSecret = new ( ) ;
171+ private readonly Task _checkIdleConnectionTimeTask ;
130172
131173 /// <summary>
132174 /// Init the pool with the max connections and the max ids per connection
133175 /// </summary>
134176 /// <param name="maxConnections"> The max connections are allowed for session</param>
135177 /// <param name="idsPerConnection"> The max ids per Connection</param>
136- public ConnectionsPool ( int maxConnections , byte idsPerConnection )
178+ /// <param name="connectionCloseConfig"> Policy to close the connections in the pool</param>
179+ public ConnectionsPool ( int maxConnections , byte idsPerConnection , ConnectionCloseConfig connectionCloseConfig )
137180 {
138181 _maxConnections = maxConnections ;
139182 _idsPerConnection = idsPerConnection ;
183+ ConnectionPoolConfig = connectionCloseConfig ;
184+ _isRunning = true ;
185+ if ( ConnectionPoolConfig . Policy == ConnectionClosePolicy . CloseWhenEmptyAndIdle )
186+ {
187+ _checkIdleConnectionTimeTask = Task . Run ( CheckIdleConnectionTime ) ;
188+ }
189+ }
190+
191+ private ConnectionCloseConfig ConnectionPoolConfig { get ; }
192+
193+ private async Task CheckIdleConnectionTime ( )
194+ {
195+ while ( _isRunning )
196+ {
197+ await Task . Delay ( ConnectionPoolConfig . CheckIdleTime )
198+ . ConfigureAwait ( false ) ;
199+
200+ if ( ! _isRunning )
201+ {
202+ var now = DateTime . UtcNow ;
203+ var connectionItems = Connections . Values . ToList ( ) ;
204+ foreach ( var connectionItem in connectionItems . Where ( connectionItem =>
205+ connectionItem . EntitiesCount == 0 &&
206+ connectionItem . LastUsed . Add ( ConnectionPoolConfig . IdleTime ) < now ) )
207+ {
208+ CloseItemAndConnection ( "Idle connection" , connectionItem ) ;
209+ }
210+ }
211+ else
212+ {
213+ var connectionItems = Connections . Values . ToList ( ) ;
214+ foreach ( var connectionItem in connectionItems . Where (
215+ connectionItem => connectionItem . EntitiesCount == 0 ) )
216+ {
217+ CloseItemAndConnection ( "Idle connection" , connectionItem ) ;
218+ }
219+ }
220+ }
140221 }
141222
142223 /// <summary>
@@ -208,10 +289,7 @@ public bool TryMergeClientParameters(ClientParameters clientParameters, out Clie
208289 return false ;
209290 }
210291
211- cp = clientParameters with
212- {
213- Password = _lastSecret . Secret
214- } ;
292+ cp = clientParameters with { Password = _lastSecret . Secret } ;
215293 return true ;
216294 }
217295
@@ -264,20 +342,31 @@ public void MaybeClose(string clientId, string reason)
264342 return ;
265343 }
266344
267- // close the connection
268- connectionItem . Client . Close ( reason ) ;
345+ connectionItem . LastUsed = DateTime . UtcNow ;
269346
270- // remove the connection from the pool
271- // it means that the connection is closed
272- // we don't care if it is called two times for the same connection
273- Connections . TryRemove ( clientId , out _ ) ;
347+ if ( ConnectionPoolConfig . Policy == ConnectionClosePolicy . CloseWhenEmpty )
348+ {
349+ CloseItemAndConnection ( reason , connectionItem ) ;
350+ }
274351 }
275352 finally
276353 {
277354 _semaphoreSlim . Release ( ) ;
278355 }
279356 }
280357
358+ private void CloseItemAndConnection ( string reason , ConnectionItem connectionItem )
359+ {
360+ // close the connection
361+ connectionItem . Client . Close ( reason ) ;
362+ // remove the connection from the pool
363+ // it means that the connection is closed
364+ // we don't care if it is called two times for the same connection
365+ Connections . TryRemove ( connectionItem . Client . ClientId , out _ ) ;
366+ }
367+
368+ internal int PendingConnections => Connections . Values . Count ( x => x . EntitiesCount > 0 ) ;
369+
281370 /// <summary>
282371 /// Removes the consumer entity from the client.
283372 /// When the metadata update is called we need to remove the consumer entity from the client.
@@ -328,4 +417,33 @@ public void RemoveProducerEntityFromStream(string clientId, byte id, string stre
328417 }
329418
330419 public int ConnectionsCount => Connections . Count ;
420+
421+ public async Task Close ( )
422+ {
423+ // The pool can't be closed if there are pending connections with the policy: CloseWhenEmptyAndIdle
424+ // else there is no way to close the pending connections.
425+ // The user needs to close the pending connections before to close the pool.
426+ // At the moment when the pool is closed the pending connections are not closed with CloseWhenEmpty
427+ // because the pool is not strictly bound to the stream system.
428+ // The StreamSystem doesn't close the connections when it is closed. That was by design
429+ // We could consider (Version 2.0) to close all the Producers and Consumers and their connection when the StreamSystem is closed.
430+ // Other clients like Java and Golang close the connections when the Environment (alias StreamSystem) is closed.
431+ if ( PendingConnections > 0 && ConnectionPoolConfig . Policy == ConnectionClosePolicy . CloseWhenEmptyAndIdle )
432+ {
433+ throw new PendingConnectionsException (
434+ $ "There are { PendingConnections } pending connections. With the policy CloseWhenEmptyAndIdle you need to close them") ;
435+ }
436+
437+ _isRunning = false ;
438+ if ( _checkIdleConnectionTimeTask is not null )
439+ {
440+ await _checkIdleConnectionTimeTask . ConfigureAwait ( false ) ;
441+ }
442+ }
443+
444+ public void Dispose ( )
445+ {
446+ _semaphoreSlim . Dispose ( ) ;
447+ GC . SuppressFinalize ( this ) ;
448+ }
331449}
0 commit comments