@@ -321,3 +321,66 @@ func TestTaskJobComplete(t *testing.T) {
321321 }
322322 assert .Equal (t , context .DeadlineExceeded , w .handle (job ))
323323}
324+
325+ func TestIncreaseWorkerCount (t * testing.T ) {
326+ w := NewConsumer (
327+ WithLogger (NewEmptyLogger ()),
328+ WithFn (func (ctx context.Context , m QueuedMessage ) error {
329+ time .Sleep (500 * time .Millisecond )
330+ return nil
331+ }),
332+ )
333+ q , err := NewQueue (
334+ WithLogger (NewLogger ()),
335+ WithWorker (w ),
336+ WithWorkerCount (5 ),
337+ )
338+ assert .NoError (t , err )
339+
340+ for i := 1 ; i <= 10 ; i ++ {
341+ m := mockMessage {
342+ message : fmt .Sprintf ("new message: %d" , i ),
343+ }
344+ assert .NoError (t , q .Queue (m ))
345+ }
346+
347+ q .Start ()
348+ time .Sleep (100 * time .Millisecond )
349+ assert .Equal (t , 5 , q .BusyWorkers ())
350+ q .UpdateWorkerCount (10 )
351+ time .Sleep (100 * time .Millisecond )
352+ assert .Equal (t , 10 , q .BusyWorkers ())
353+ q .Release ()
354+ }
355+
356+ func TestDecreaseWorkerCount (t * testing.T ) {
357+ w := NewConsumer (
358+ WithFn (func (ctx context.Context , m QueuedMessage ) error {
359+ time .Sleep (100 * time .Millisecond )
360+ return nil
361+ }),
362+ )
363+ q , err := NewQueue (
364+ WithLogger (NewLogger ()),
365+ WithWorker (w ),
366+ WithWorkerCount (5 ),
367+ )
368+ assert .NoError (t , err )
369+
370+ for i := 1 ; i <= 10 ; i ++ {
371+ m := mockMessage {
372+ message : fmt .Sprintf ("test message: %d" , i ),
373+ }
374+ assert .NoError (t , q .Queue (m ))
375+ }
376+
377+ q .Start ()
378+ time .Sleep (20 * time .Millisecond )
379+ assert .Equal (t , 5 , q .BusyWorkers ())
380+ q .UpdateWorkerCount (3 )
381+ time .Sleep (100 * time .Millisecond )
382+ assert .Equal (t , 3 , q .BusyWorkers ())
383+ time .Sleep (100 * time .Millisecond )
384+ assert .Equal (t , 2 , q .BusyWorkers ())
385+ q .Release ()
386+ }
0 commit comments