@@ -50,14 +50,16 @@ func TestCustomFuncAndWait(t *testing.T) {
5050 q , err := NewQueue (
5151 WithWorker (w ),
5252 WithWorkerCount (2 ),
53+ WithLogger (NewLogger ()),
5354 )
5455 assert .NoError (t , err )
55- q .Start ()
56- time .Sleep (100 * time .Millisecond )
5756 assert .NoError (t , q .Queue (m ))
5857 assert .NoError (t , q .Queue (m ))
5958 assert .NoError (t , q .Queue (m ))
6059 assert .NoError (t , q .Queue (m ))
60+ q .Start ()
61+ time .Sleep (100 * time .Millisecond )
62+ assert .Equal (t , 2 , int (q .metric .BusyWorkers ()))
6163 time .Sleep (600 * time .Millisecond )
6264 q .Shutdown ()
6365 q .Wait ()
@@ -84,26 +86,6 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
8486 q .Wait ()
8587}
8688
87- func TestConsumerNumAfterShutdown (t * testing.T ) {
88- w := NewConsumer ()
89- q , err := NewQueue (
90- WithWorker (w ),
91- WithWorkerCount (2 ),
92- )
93- assert .NoError (t , err )
94- q .Start ()
95- q .Start ()
96- time .Sleep (50 * time .Millisecond )
97- assert .Equal (t , 4 , q .Workers ())
98- q .Shutdown ()
99- q .Wait ()
100- assert .Equal (t , 0 , q .Workers ())
101- // show queue has been shutdown meesgae
102- q .Start ()
103- q .Start ()
104- assert .Equal (t , 0 , q .Workers ())
105- }
106-
10789func TestJobReachTimeout (t * testing.T ) {
10890 m := mockMessage {
10991 message : "foo" ,
@@ -131,12 +113,10 @@ func TestJobReachTimeout(t *testing.T) {
131113 WithWorkerCount (2 ),
132114 )
133115 assert .NoError (t , err )
134- q .Start ()
135- time .Sleep (50 * time .Millisecond )
136116 assert .NoError (t , q .QueueWithTimeout (30 * time .Millisecond , m ))
117+ q .Start ()
137118 time .Sleep (50 * time .Millisecond )
138- q .Shutdown ()
139- q .Wait ()
119+ q .Release ()
140120}
141121
142122func TestCancelJobAfterShutdown (t * testing.T ) {
@@ -167,17 +147,15 @@ func TestCancelJobAfterShutdown(t *testing.T) {
167147 WithWorkerCount (2 ),
168148 )
169149 assert .NoError (t , err )
170- q .Start ()
171- time .Sleep (50 * time .Millisecond )
172150 assert .NoError (t , q .QueueWithTimeout (100 * time .Millisecond , m ))
173- q .Shutdown ()
174- q .Wait ()
151+ assert .NoError (t , q .QueueWithTimeout (100 * time .Millisecond , m ))
152+ q .Start ()
153+ time .Sleep (10 * time .Millisecond )
154+ assert .Equal (t , 2 , int (q .metric .busyWorkers ))
155+ q .Release ()
175156}
176157
177158func TestGoroutineLeak (t * testing.T ) {
178- m := mockMessage {
179- message : "foo" ,
180- }
181159 w := NewConsumer (
182160 WithLogger (NewEmptyLogger ()),
183161 WithFn (func (ctx context.Context , m QueuedMessage ) error {
@@ -200,20 +178,22 @@ func TestGoroutineLeak(t *testing.T) {
200178 }),
201179 )
202180 q , err := NewQueue (
203- WithLogger (NewEmptyLogger ()),
181+ WithLogger (NewLogger ()),
204182 WithWorker (w ),
205183 WithWorkerCount (10 ),
206184 )
207185 assert .NoError (t , err )
208- q .Start ()
209- time .Sleep (50 * time .Millisecond )
210- for i := 0 ; i < 500 ; i ++ {
211- m .message = fmt .Sprintf ("foobar: %d" , i + 1 )
186+ for i := 0 ; i < 400 ; i ++ {
187+ m := mockMessage {
188+ message : fmt .Sprintf ("new message: %d" , i + 1 ),
189+ }
190+
212191 assert .NoError (t , q .Queue (m ))
213192 }
193+
194+ q .Start ()
214195 time .Sleep (2 * time .Second )
215- q .Shutdown ()
216- q .Wait ()
196+ q .Release ()
217197 fmt .Println ("number of goroutines:" , runtime .NumGoroutine ())
218198}
219199
@@ -231,12 +211,10 @@ func TestGoroutinePanic(t *testing.T) {
231211 WithWorkerCount (2 ),
232212 )
233213 assert .NoError (t , err )
234- q .Start ()
235- time .Sleep (50 * time .Millisecond )
236214 assert .NoError (t , q .Queue (m ))
237- time . Sleep ( 50 * time . Millisecond )
238- q . Shutdown ( )
239- q .Wait ()
215+ q . Start ( )
216+ time . Sleep ( 10 * time . Millisecond )
217+ q .Release ()
240218}
241219
242220func TestHandleTimeout (t * testing.T ) {
0 commit comments