@@ -39,10 +39,10 @@ internal sealed class MultiServerCluster : Cluster
3939 private readonly CancellationTokenSource _monitorServersCancellationTokenSource ;
4040 private volatile ElectionInfo _maxElectionInfo ;
4141 private volatile string _replicaSetName ;
42- private readonly AsyncQueue < ServerDescriptionChangedEventArgs > _serverDescriptionChangedQueue ;
4342 private readonly List < IClusterableServer > _servers ;
4443 private readonly object _serversLock = new object ( ) ;
4544 private readonly InterlockedInt32 _state ;
45+ private readonly object _updateClusterDescriptionLock = new object ( ) ;
4646
4747 private readonly Action < ClusterClosingEvent > _closingEventHandler ;
4848 private readonly Action < ClusterClosedEvent > _closedEventHandler ;
@@ -69,7 +69,6 @@ public MultiServerCluster(ClusterSettings settings, IClusterableServerFactory se
6969 }
7070
7171 _monitorServersCancellationTokenSource = new CancellationTokenSource ( ) ;
72- _serverDescriptionChangedQueue = new AsyncQueue < ServerDescriptionChangedEventArgs > ( ) ;
7372 _servers = new List < IClusterableServer > ( ) ;
7473 _state = new InterlockedInt32 ( State . Initial ) ;
7574 _replicaSetName = settings . ReplicaSetName ;
@@ -132,24 +131,27 @@ public override void Initialize()
132131 }
133132
134133 var stopwatch = Stopwatch . StartNew ( ) ;
135- MonitorServersAsync ( ) . ConfigureAwait ( false ) ;
136- // We lock here even though AddServer locks. Monitors
137- // are re-entrant such that this won't cause problems,
138- // but could prevent issues of conflicting reports
139- // from servers that are quick to respond.
140- var clusterDescription = Description . WithType ( Settings . ConnectionMode . ToClusterType ( ) ) ;
134+
141135 var newServers = new List < IClusterableServer > ( ) ;
142- lock ( _serversLock )
136+ lock ( _updateClusterDescriptionLock )
143137 {
144- foreach ( var endPoint in Settings . EndPoints )
138+ // We lock here even though AddServer locks. Monitors
139+ // are re-entrant such that this won't cause problems,
140+ // but could prevent issues of conflicting reports
141+ // from servers that are quick to respond.
142+ var clusterDescription = Description . WithType ( Settings . ConnectionMode . ToClusterType ( ) ) ;
143+ lock ( _serversLock )
145144 {
146- clusterDescription = EnsureServer ( clusterDescription , endPoint , newServers ) ;
145+ foreach ( var endPoint in Settings . EndPoints )
146+ {
147+ clusterDescription = EnsureServer ( clusterDescription , endPoint , newServers ) ;
148+ }
147149 }
148- }
149150
150- stopwatch . Stop ( ) ;
151+ stopwatch . Stop ( ) ;
151152
152- UpdateClusterDescription ( clusterDescription ) ;
153+ UpdateClusterDescription ( clusterDescription ) ;
154+ }
153155
154156 foreach ( var server in newServers )
155157 {
@@ -217,95 +219,84 @@ protected override void RequestHeartbeat()
217219 }
218220 }
219221
220- private async Task MonitorServersAsync ( )
222+ private void ServerDescriptionChangedHandler ( object sender , ServerDescriptionChangedEventArgs args )
221223 {
222- var monitorServersCancellationToken = _monitorServersCancellationTokenSource . Token ;
223- while ( ! monitorServersCancellationToken . IsCancellationRequested )
224+ try
224225 {
225- try
226- {
227- var eventArgs = await _serverDescriptionChangedQueue . DequeueAsync ( monitorServersCancellationToken ) . ConfigureAwait ( false ) ; // TODO: add timeout and cancellationToken to DequeueAsync
228- ProcessServerDescriptionChanged ( eventArgs ) ;
229- }
230- catch ( OperationCanceledException ) when ( monitorServersCancellationToken . IsCancellationRequested )
231- {
232- // ignore OperationCanceledException when monitor servers cancellation is requested
233- }
234- catch ( Exception unexpectedException )
226+ ProcessServerDescriptionChanged ( args ) ;
227+ }
228+ catch ( Exception unexpectedException )
229+ {
230+ // if we catch an exception here it's because of a bug in the driver
231+ var handler = _sdamInformationEventHandler ;
232+ if ( handler != null )
235233 {
236- // if we catch an exception here it's because of a bug in the driver
237-
238- var handler = _sdamInformationEventHandler ;
239- if ( handler != null )
234+ try
240235 {
241- try
242- {
243- handler . Invoke ( new SdamInformationEvent ( ( ) =>
244- string . Format (
245- "Unexpected exception in MultiServerCluster.MonitorServersAsync: {0}" ,
246- unexpectedException . ToString ( ) ) ) ) ;
247- }
248- catch
249- {
250- // ignore any exceptions thrown by the handler (note: event handlers aren't supposed to throw exceptions)
251- }
236+ handler . Invoke ( new SdamInformationEvent ( ( ) =>
237+ string . Format (
238+ "Unexpected exception in MultiServerCluster.ServerDescriptionChangedHandler: {0}" ,
239+ unexpectedException . ToString ( ) ) ) ) ;
240+ }
241+ catch
242+ {
243+ // ignore any exceptions thrown by the handler (note: event handlers aren't supposed to throw exceptions)
252244 }
253-
254- // TODO: should we reset the cluster state in some way? (the state is undefined since an unexpected exception was thrown)
255245 }
246+ // TODO: should we reset the cluster state in some way? (the state is undefined since an unexpected exception was thrown)
256247 }
257248 }
258249
259- private void ServerDescriptionChangedHandler ( object sender , ServerDescriptionChangedEventArgs args )
260- {
261- _serverDescriptionChangedQueue . Enqueue ( args ) ;
262- }
263-
264250 private void ProcessServerDescriptionChanged ( ServerDescriptionChangedEventArgs args )
265251 {
266- var newServerDescription = args . NewServerDescription ;
267- var newClusterDescription = Description ;
268-
269- if ( ! _servers . Any ( x => EndPointHelper . Equals ( x . EndPoint , newServerDescription . EndPoint ) ) )
270- {
271- return ;
272- }
273-
274252 var newServers = new List < IClusterableServer > ( ) ;
275- if ( newServerDescription . State == ServerState . Disconnected )
253+ lock ( _updateClusterDescriptionLock )
276254 {
277- newClusterDescription = newClusterDescription . WithServerDescription ( newServerDescription ) ;
278- }
279- else
280- {
281- if ( IsServerValidForCluster ( newClusterDescription . Type , Settings . ConnectionMode , newServerDescription . Type ) )
255+ var newServerDescription = args . NewServerDescription ;
256+ var newClusterDescription = Description ;
257+
258+ if ( ! _servers . Any ( x => EndPointHelper . Equals ( x . EndPoint , newServerDescription . EndPoint ) ) )
282259 {
283- if ( newClusterDescription . Type == ClusterType . Unknown )
284- {
285- newClusterDescription = newClusterDescription . WithType ( newServerDescription . Type . ToClusterType ( ) ) ;
286- }
260+ return ;
261+ }
287262
288- switch ( newClusterDescription . Type )
263+ if ( newServerDescription . State == ServerState . Disconnected )
264+ {
265+ newClusterDescription = newClusterDescription . WithServerDescription ( newServerDescription ) ;
266+ }
267+ else
268+ {
269+ if ( IsServerValidForCluster ( newClusterDescription . Type , Settings . ConnectionMode , newServerDescription . Type ) )
289270 {
290- case ClusterType . ReplicaSet :
291- newClusterDescription = ProcessReplicaSetChange ( newClusterDescription , args , newServers ) ;
292- break ;
271+ if ( newClusterDescription . Type == ClusterType . Unknown )
272+ {
273+ newClusterDescription = newClusterDescription . WithType ( newServerDescription . Type . ToClusterType ( ) ) ;
274+ }
275+
276+ switch ( newClusterDescription . Type )
277+ {
293278
294- case ClusterType . Sharded :
295- newClusterDescription = ProcessShardedChange ( newClusterDescription , args ) ;
296- break ;
297279
298- default :
299- throw new MongoInternalException ( "Unexpected cluster type." ) ;
280+ case ClusterType . ReplicaSet :
281+ newClusterDescription = ProcessReplicaSetChange ( newClusterDescription , args , newServers ) ;
282+ break ;
283+
284+ case ClusterType . Sharded :
285+ newClusterDescription = ProcessShardedChange ( newClusterDescription , args ) ;
286+ break ;
287+
288+ default :
289+ throw new MongoInternalException ( "Unexpected cluster type." ) ;
290+ }
291+ }
292+ else
293+ {
294+ newClusterDescription = newClusterDescription . WithoutServerDescription ( newServerDescription . EndPoint ) ;
300295 }
301296 }
302- else
303- {
304- newClusterDescription = newClusterDescription . WithoutServerDescription ( newServerDescription . EndPoint ) ;
305- }
306- }
307297
308- UpdateClusterDescription ( newClusterDescription ) ;
298+ UpdateClusterDescription ( newClusterDescription ) ;
299+ }
309300
310301 foreach ( var server in newServers )
311302 {
0 commit comments