33using System . Threading . Tasks ;
44using InEngine . Core . Exceptions ;
55using Newtonsoft . Json ;
6- using RedisBoost ;
6+ using StackExchange . Redis ;
77
88namespace InEngine . Core . Queue
99{
@@ -15,16 +15,16 @@ public class Broker
1515 public string SecondaryWaitingQueueName { get { return QueueBaseName + ":SecondaryWaiting" ; } }
1616 public string SecondaryProcessingQueueName { get { return QueueBaseName + ":SecondaryProcessing" ; } }
1717
18- public IRedisClient _redis ;
19- public IRedisClient Redis
18+ public ConnectionMultiplexer _redis ;
19+ public ConnectionMultiplexer Redis
2020 {
2121 get
2222 {
2323 if ( _redis == null )
2424 {
25- var connectionTask = RedisClient . ConnectAsync ( RedisHost , RedisPort , RedisDb ) ;
26- connectionTask . Wait ( ) ;
27- _redis = connectionTask . Result ;
25+ var redisConfig = ConfigurationOptions . Parse ( $ " { RedisHost } : { RedisPort } " ) ;
26+ redisConfig . Password = string . IsNullOrWhiteSpace ( RedisPassword ) ? null : RedisPassword ;
27+ _redis = ConnectionMultiplexer . Connect ( redisConfig ) ;
2828 }
2929 return _redis ;
3030 }
@@ -49,22 +49,26 @@ public static Broker Make()
4949
5050 public void Publish ( ICommand command , bool useSecondaryQueue = false )
5151 {
52- Redis . LPushAsync ( useSecondaryQueue ? SecondaryWaitingQueueName : PrimaryWaitingQueueName ,
53- new Message ( )
54- {
52+ Redis . GetDatabase ( RedisDb ) . ListLeftPush (
53+ useSecondaryQueue ? SecondaryWaitingQueueName : PrimaryWaitingQueueName ,
54+ new Message ( ) {
5555 CommandClassName = command . GetType ( ) . FullName ,
5656 CommandAssemblyName = command . GetType ( ) . Assembly . GetName ( ) . Name + ".dll" ,
5757 SerializedCommand = JsonConvert . SerializeObject ( command )
58- } ) ;
58+ } . SerializeToJson ( )
59+ ) ;
5960 }
6061
6162 public bool Consume ( bool useSecondaryQueue = false )
6263 {
6364 var waitingQueueName = useSecondaryQueue ? SecondaryWaitingQueueName : PrimaryWaitingQueueName ;
6465 var processingQueueName = useSecondaryQueue ? SecondaryProcessingQueueName : PrimaryProcessingQueueName ;
65- var stageMessageTask = Redis . RPopLPushAsync ( waitingQueueName , processingQueueName ) ;
66- stageMessageTask . Wait ( ) ;
67- var message = stageMessageTask . Result . As < Message > ( ) ;
66+
67+ var stageMessageTask = Redis . GetDatabase ( RedisDb ) . ListRightPopLeftPush ( waitingQueueName , processingQueueName ) ;
68+ var serializedMessage = stageMessageTask . ToString ( ) ;
69+ if ( serializedMessage == null )
70+ return false ;
71+ var message = serializedMessage . DeserializeFromJson < Message > ( ) ;
6872 if ( message == null )
6973 return false ;
7074
@@ -76,7 +80,7 @@ public bool Consume(bool useSecondaryQueue = false)
7680 try
7781 {
7882 commandInstance . Run ( ) ;
79- Redis . LRemAsync ( processingQueueName , 1 , message ) . Wait ( ) ;
83+ Redis . GetDatabase ( RedisDb ) . ListRemove ( processingQueueName , serializedMessage , 1 ) ;
8084 }
8185 catch ( Exception exception )
8286 {
@@ -88,52 +92,45 @@ public bool Consume(bool useSecondaryQueue = false)
8892 #region Primary Queue Management Methods
8993 public long GetPrimaryWaitingQueueLength ( )
9094 {
91- return WaitAndReturnResult ( Redis . LLenAsync ( PrimaryWaitingQueueName ) ) ;
95+ return Redis . GetDatabase ( RedisDb ) . ListLength ( PrimaryWaitingQueueName ) ;
9296 }
9397
9498 public long GetPrimaryProcessingQueueLength ( )
9599 {
96- return WaitAndReturnResult ( Redis . LLenAsync ( PrimaryProcessingQueueName ) ) ;
100+ return Redis . GetDatabase ( RedisDb ) . ListLength ( PrimaryProcessingQueueName ) ;
97101 }
98102
99- public long ClearPrimaryWaitingQueue ( )
103+ public bool ClearPrimaryWaitingQueue ( )
100104 {
101- return WaitAndReturnResult ( Redis . DelAsync ( SecondaryWaitingQueueName ) ) ;
105+ return Redis . GetDatabase ( RedisDb ) . KeyDelete ( PrimaryWaitingQueueName ) ;
102106 }
103107
104- public long ClearPrimaryProcessingQueue ( )
108+ public bool ClearPrimaryProcessingQueue ( )
105109 {
106- return WaitAndReturnResult ( Redis . DelAsync ( SecondaryProcessingQueueName ) ) ;
110+ return Redis . GetDatabase ( RedisDb ) . KeyDelete ( PrimaryProcessingQueueName ) ;
107111 }
108112 #endregion
109113
110114 #region Secondary Queue Management Methods
111115 public long GetSecondaryWaitingQueueLength ( )
112116 {
113- return WaitAndReturnResult ( Redis . LLenAsync ( PrimaryWaitingQueueName ) ) ;
117+ return Redis . GetDatabase ( RedisDb ) . ListLength ( SecondaryWaitingQueueName ) ;
114118 }
115119
116120 public long GetSecondaryProcessingQueueLength ( )
117121 {
118- return WaitAndReturnResult ( Redis . LLenAsync ( PrimaryProcessingQueueName ) ) ;
122+ return Redis . GetDatabase ( RedisDb ) . ListLength ( SecondaryProcessingQueueName ) ;
119123 }
120124
121- public long ClearSecondaryWaitingQueue ( )
125+ public bool ClearSecondaryWaitingQueue ( )
122126 {
123- return WaitAndReturnResult ( Redis . DelAsync ( SecondaryWaitingQueueName ) ) ;
127+ return Redis . GetDatabase ( RedisDb ) . KeyDelete ( SecondaryWaitingQueueName ) ;
124128 }
125129
126- public long ClearSecondaryProcessingQueue ( )
130+ public bool ClearSecondaryProcessingQueue ( )
127131 {
128- return WaitAndReturnResult ( Redis . DelAsync ( SecondaryProcessingQueueName ) ) ;
132+ return Redis . GetDatabase ( RedisDb ) . KeyDelete ( SecondaryProcessingQueueName ) ;
129133 }
130134 #endregion
131-
132-
133- public long WaitAndReturnResult ( Task < long > task )
134- {
135- task . Wait ( ) ;
136- return task . Result ;
137- }
138135 }
139136}
0 commit comments