@@ -81,7 +81,9 @@ func NewQueue(opts ...Option) (*Queue, error) {
8181
8282// Start to enable all worker
8383func (q * Queue ) Start () {
84- go q .start ()
84+ q .routineGroup .Run (func () {
85+ q .start ()
86+ })
8587}
8688
8789// Shutdown stops all queues.
@@ -220,6 +222,7 @@ func (q *Queue) work(task QueuedMessage) {
220222 }
221223}
222224
225+ // UpdateWorkerCount to update worker number dynamically.
223226func (q * Queue ) UpdateWorkerCount (num int ) {
224227 q .workerCount = num
225228 q .schedule ()
@@ -244,13 +247,9 @@ func (q *Queue) start() {
244247
245248 for {
246249 var task QueuedMessage
247- if atomic .LoadInt32 (& q .stopFlag ) == 1 {
248- return
249- }
250250
251251 // request task from queue in background
252252 q .routineGroup .Run (func () {
253- loop:
254253 for {
255254 select {
256255 case <- q .quit :
@@ -261,15 +260,15 @@ func (q *Queue) start() {
261260 if err != nil {
262261 select {
263262 case <- q .quit :
264- break loop
263+ return
265264 case <- time .After (time .Second ):
266265 // sleep 1 second to fetch new task
267266 }
268267 }
269268 }
270269 if t != nil {
271270 tasks <- t
272- break loop
271+ return
273272 }
274273 }
275274 }
@@ -292,24 +291,8 @@ func (q *Queue) start() {
292291
293292 // check worker number
294293 q .schedule ()
295-
296- // get worker to execute new task
297- select {
298- case <- q .quit :
299- if err := q .worker .Queue (task ); err != nil {
300- q .logger .Errorf ("can't re-queue task: %v" , err )
301- }
302- return
303- case <- q .ready :
304- select {
305- case <- q .quit :
306- if err := q .worker .Queue (task ); err != nil {
307- q .logger .Errorf ("can't re-queue task: %v" , err )
308- }
309- return
310- default :
311- }
312- }
294+ // wait worker ready
295+ <- q .ready
313296
314297 // start new task
315298 q .metric .IncBusyWorker ()
0 commit comments