@@ -428,11 +428,11 @@ private bool TryPerformAutomaticRecovery()
428428 // 2. Recover queues
429429 // 3. Recover bindings
430430 // 4. Recover consumers
431- using ( var recoveryModel = _delegate . CreateModel ( ) )
431+ using ( var recoveryModelFactory = new RecoveryModelFactory ( _delegate ) )
432432 {
433- RecoverExchanges ( recoveryModel ) ;
434- RecoverQueues ( recoveryModel ) ;
435- RecoverBindings ( recoveryModel ) ;
433+ RecoverExchanges ( recoveryModelFactory ) ;
434+ RecoverQueues ( recoveryModelFactory ) ;
435+ RecoverBindings ( recoveryModelFactory ) ;
436436 }
437437 }
438438
@@ -985,7 +985,7 @@ private void PropagateQueueNameChangeToConsumers(string oldName, string newName)
985985 }
986986 }
987987
988- private void RecoverBindings ( IModel model )
988+ private void RecoverBindings ( RecoveryModelFactory recoveryModelFactory )
989989 {
990990 Dictionary < RecordedBinding , byte > recordedBindingsCopy ;
991991 lock ( _recordedEntitiesLock )
@@ -997,13 +997,21 @@ private void RecoverBindings(IModel model)
997997 {
998998 try
999999 {
1000- b . Recover ( model ) ;
1000+ b . Recover ( recoveryModelFactory . RecoveryModel ) ;
10011001 }
10021002 catch ( Exception cause )
10031003 {
1004- string s = string . Format ( "Caught an exception while recovering binding between {0} and {1}: {2}" ,
1005- b . Source , b . Destination , cause . Message ) ;
1006- HandleTopologyRecoveryException ( new TopologyRecoveryException ( s , cause ) ) ;
1004+ if ( _factory . TopologyRecoveryExceptionHandler . BindingRecoveryExceptionHandler != null
1005+ && _factory . TopologyRecoveryExceptionHandler . BindingRecoveryExceptionCondition ( b , cause ) )
1006+ {
1007+ _factory . TopologyRecoveryExceptionHandler . BindingRecoveryExceptionHandler ( b , cause ) ;
1008+ }
1009+ else
1010+ {
1011+ string s = string . Format ( "Caught an exception while recovering binding between {0} and {1}: {2}" ,
1012+ b . Source , b . Destination , cause . Message ) ;
1013+ HandleTopologyRecoveryException ( new TopologyRecoveryException ( s , cause ) ) ;
1014+ }
10071015 }
10081016 }
10091017 }
@@ -1139,14 +1147,23 @@ internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channe
11391147 }
11401148 catch ( Exception cause )
11411149 {
1142- string s = string . Format ( "Caught an exception while recovering consumer {0} on queue {1}: {2}" ,
1143- tag , cons . Queue , cause . Message ) ;
1144- HandleTopologyRecoveryException ( new TopologyRecoveryException ( s , cause ) ) ;
1150+ if ( channelToUse . IsOpen
1151+ && _factory . TopologyRecoveryExceptionHandler . ConsumerRecoveryExceptionHandler != null
1152+ && _factory . TopologyRecoveryExceptionHandler . ConsumerRecoveryExceptionCondition ( cons , cause ) )
1153+ {
1154+ _factory . TopologyRecoveryExceptionHandler . ConsumerRecoveryExceptionHandler ( cons , cause ) ;
1155+ }
1156+ else
1157+ {
1158+ string s = string . Format ( "Caught an exception while recovering consumer {0} on queue {1}: {2}" ,
1159+ tag , cons . Queue , cause . Message ) ;
1160+ HandleTopologyRecoveryException ( new TopologyRecoveryException ( s , cause ) ) ;
1161+ }
11451162 }
11461163 }
11471164 }
11481165
1149- private void RecoverExchanges ( IModel model )
1166+ private void RecoverExchanges ( RecoveryModelFactory recoveryModelFactory )
11501167 {
11511168 Dictionary < string , RecordedExchange > recordedExchangesCopy ;
11521169 lock ( _recordedEntitiesLock )
@@ -1158,13 +1175,21 @@ private void RecoverExchanges(IModel model)
11581175 {
11591176 try
11601177 {
1161- rx . Recover ( model ) ;
1178+ rx . Recover ( recoveryModelFactory . RecoveryModel ) ;
11621179 }
11631180 catch ( Exception cause )
11641181 {
1165- string s = string . Format ( "Caught an exception while recovering exchange {0}: {1}" ,
1166- rx . Name , cause . Message ) ;
1167- HandleTopologyRecoveryException ( new TopologyRecoveryException ( s , cause ) ) ;
1182+ if ( _factory . TopologyRecoveryExceptionHandler . ExchangeRecoveryExceptionHandler != null
1183+ && _factory . TopologyRecoveryExceptionHandler . ExchangeRecoveryExceptionCondition ( rx , cause ) )
1184+ {
1185+ _factory . TopologyRecoveryExceptionHandler . ExchangeRecoveryExceptionHandler ( rx , cause ) ;
1186+ }
1187+ else
1188+ {
1189+ string s = string . Format ( "Caught an exception while recovering exchange {0}: {1}" ,
1190+ rx . Name , cause . Message ) ;
1191+ HandleTopologyRecoveryException ( new TopologyRecoveryException ( s , cause ) ) ;
1192+ }
11681193 }
11691194 }
11701195 }
@@ -1180,7 +1205,7 @@ private void RecoverModelsAndItsConsumers()
11801205 }
11811206 }
11821207
1183- private void RecoverQueues ( IModel model )
1208+ private void RecoverQueues ( RecoveryModelFactory recoveryModelFactory )
11841209 {
11851210 Dictionary < string , RecordedQueue > recordedQueuesCopy ;
11861211 lock ( _recordedEntitiesLock )
@@ -1195,7 +1220,7 @@ private void RecoverQueues(IModel model)
11951220
11961221 try
11971222 {
1198- rq . Recover ( model ) ;
1223+ rq . Recover ( recoveryModelFactory . RecoveryModel ) ;
11991224 string newName = rq . Name ;
12001225
12011226 if ( ! oldName . Equals ( newName ) )
@@ -1232,9 +1257,17 @@ private void RecoverQueues(IModel model)
12321257 }
12331258 catch ( Exception cause )
12341259 {
1235- string s = string . Format ( "Caught an exception while recovering queue {0}: {1}" ,
1236- oldName , cause . Message ) ;
1237- HandleTopologyRecoveryException ( new TopologyRecoveryException ( s , cause ) ) ;
1260+ if ( _factory . TopologyRecoveryExceptionHandler . QueueRecoveryExceptionHandler != null
1261+ && _factory . TopologyRecoveryExceptionHandler . QueueRecoveryExceptionCondition ( rq , cause ) )
1262+ {
1263+ _factory . TopologyRecoveryExceptionHandler . QueueRecoveryExceptionHandler ( rq , cause ) ;
1264+ }
1265+ else
1266+ {
1267+ string s = string . Format ( "Caught an exception while recovering queue {0}: {1}" ,
1268+ oldName , cause . Message ) ;
1269+ HandleTopologyRecoveryException ( new TopologyRecoveryException ( s , cause ) ) ;
1270+ }
12381271 }
12391272 }
12401273 }
@@ -1295,6 +1328,45 @@ private enum RecoveryConnectionState
12951328 Recovering
12961329 }
12971330
1331+ private sealed class RecoveryModelFactory : IDisposable
1332+ {
1333+ private readonly IConnection _connection ;
1334+ private IModel _recoveryModel ;
1335+
1336+ public RecoveryModelFactory ( IConnection connection )
1337+ {
1338+ _connection = connection ;
1339+ }
1340+
1341+ public IModel RecoveryModel
1342+ {
1343+ get
1344+ {
1345+ if ( _recoveryModel == null )
1346+ {
1347+ _recoveryModel = _connection . CreateModel ( ) ;
1348+ }
1349+
1350+ if ( _recoveryModel . IsClosed )
1351+ {
1352+ _recoveryModel . Dispose ( ) ;
1353+ _recoveryModel = _connection . CreateModel ( ) ;
1354+ }
1355+
1356+ return _recoveryModel ;
1357+ }
1358+ }
1359+
1360+ public void Dispose ( )
1361+ {
1362+ if ( _recoveryModel != null )
1363+ {
1364+ _recoveryModel . Close ( ) ;
1365+ _recoveryModel . Dispose ( ) ;
1366+ }
1367+ }
1368+ }
1369+
12981370 private Task _recoveryTask ;
12991371 private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState . Connected ;
13001372
0 commit comments