@@ -10,12 +10,13 @@ namespace InEngine.Core.Queue
1010 public class Broker
1111 {
1212 public string QueueBaseName { get ; set ; } = "InEngine:Queue" ;
13- public string PrimaryWaitingQueueName { get { return QueueBaseName + ":PrimaryWaiting" ; } }
14- public string PrimaryProcessingQueueName { get { return QueueBaseName + ":PrimaryProcessing" ; } }
15- public string SecondaryWaitingQueueName { get { return QueueBaseName + ":SecondaryWaiting" ; } }
16- public string SecondaryProcessingQueueName { get { return QueueBaseName + ":SecondaryProcessing" ; } }
17-
18- private static Lazy < ConnectionMultiplexer > lazyConnection = new Lazy < ConnectionMultiplexer > ( ( ) => {
13+ public string PrimaryWaitingQueueName { get { return QueueBaseName + ":Primary:Waiting" ; } }
14+ public string PrimaryProcessingQueueName { get { return QueueBaseName + ":Primary:Processing" ; } }
15+ public string PrimaryFailedQueueName { get { return QueueBaseName + ":Primary:Failed" ; } }
16+ public string SecondaryWaitingQueueName { get { return QueueBaseName + ":Secondary:Waiting" ; } }
17+ public string SecondaryProcessingQueueName { get { return QueueBaseName + ":Secondary:Processing" ; } }
18+ public string SecondaryFailedQueueName { get { return QueueBaseName + ":Secondary:Failed" ; } }
19+ public static Lazy < ConnectionMultiplexer > lazyConnection = new Lazy < ConnectionMultiplexer > ( ( ) => {
1920 var queueSettings = InEngineSettings . Make ( ) . Queue ;
2021 var redisConfig = ConfigurationOptions . Parse ( $ "{ queueSettings . RedisHost } :{ queueSettings . RedisPort } ") ;
2122 redisConfig . Password = string . IsNullOrWhiteSpace ( queueSettings . RedisPassword ) ?
@@ -24,9 +25,7 @@ public class Broker
2425 redisConfig . AbortOnConnectFail = false ;
2526 return ConnectionMultiplexer . Connect ( redisConfig ) ;
2627 } ) ;
27-
2828 public static ConnectionMultiplexer Connection { get { return lazyConnection . Value ; } }
29-
3029 public ConnectionMultiplexer _connectionMultiplexer ;
3130 public IDatabase Redis
3231 {
@@ -63,6 +62,7 @@ public bool Consume(bool useSecondaryQueue = false)
6362 {
6463 var waitingQueueName = useSecondaryQueue ? SecondaryWaitingQueueName : PrimaryWaitingQueueName ;
6564 var processingQueueName = useSecondaryQueue ? SecondaryProcessingQueueName : PrimaryProcessingQueueName ;
65+ var failedQueueName = useSecondaryQueue ? SecondaryFailedQueueName : PrimaryFailedQueueName ;
6666
6767 var stageMessageTask = Redis . ListRightPopLeftPush ( waitingQueueName , processingQueueName ) ;
6868 var serializedMessage = stageMessageTask . ToString ( ) ;
@@ -80,12 +80,26 @@ public bool Consume(bool useSecondaryQueue = false)
8080 try
8181 {
8282 commandInstance . Run ( ) ;
83- Redis . ListRemove ( processingQueueName , serializedMessage , 1 ) ;
8483 }
8584 catch ( Exception exception )
8685 {
86+ Redis . ListRemove ( processingQueueName , serializedMessage , 1 ) ;
87+ Redis . ListLeftPush (
88+ failedQueueName ,
89+ commandInstance . SerializeToJson ( )
90+ ) ;
8791 throw new CommandFailedException ( "Consumed command failed." , exception ) ;
8892 }
93+
94+ try
95+ {
96+ Redis . ListRemove ( processingQueueName , serializedMessage , 1 ) ;
97+ }
98+ catch ( Exception exception )
99+ {
100+ throw new CommandFailedException ( $ "Failed to remove completed message from queue: { processingQueueName } ", exception ) ;
101+ }
102+
89103 return true ;
90104 }
91105
0 commit comments