File tree Expand file tree Collapse file tree 4 files changed +39
-32
lines changed Expand file tree Collapse file tree 4 files changed +39
-32
lines changed Original file line number Diff line number Diff line change @@ -119,10 +119,13 @@ func (s *Consumer) Queue(task QueuedMessage) error {
119119
120120func (s * Consumer ) Request () (QueuedMessage , error ) {
121121 select {
122- case task := <- s .taskQueue :
122+ case task , ok := <- s .taskQueue :
123+ if ! ok {
124+ return nil , ErrQueueHasBeenClosed
125+ }
123126 return task , nil
124127 default :
125- return nil , errors . New ( "no task in queue" )
128+ return nil , ErrNoTaskInQueue
126129 }
127130}
128131
Original file line number Diff line number Diff line change @@ -148,11 +148,10 @@ func TestGoroutineLeak(t *testing.T) {
148148 for {
149149 select {
150150 case <- ctx .Done ():
151- log .Println ("get data:" , string (m .Bytes ()))
152151 if errors .Is (ctx .Err (), context .Canceled ) {
153- log .Println ("queue has been shutdown and cancel the job" )
152+ log .Println ("queue has been shutdown and cancel the job: " + string ( m . Bytes ()) )
154153 } else if errors .Is (ctx .Err (), context .DeadlineExceeded ) {
155- log .Println ("job deadline exceeded" )
154+ log .Println ("job deadline exceeded: " + string ( m . Bytes ()) )
156155 }
157156 return nil
158157 default :
Original file line number Diff line number Diff line change 1+ package queue
2+
3+ import "errors"
4+
5+ var (
6+ // ErrNoTaskInQueue there is nothing in the queue
7+ ErrNoTaskInQueue = errors .New ("no task in queue" )
8+ // ErrQueueHasBeenClosed the current queue is closed
9+ ErrQueueHasBeenClosed = errors .New ("queue has been closed" )
10+ )
Original file line number Diff line number Diff line change @@ -246,46 +246,41 @@ func (q *Queue) start() {
246246 tasks := make (chan QueuedMessage , 1 )
247247
248248 for {
249- var task QueuedMessage
250-
251249 // request task from queue in background
252250 q .routineGroup .Run (func () {
253251 for {
254- select {
255- case <- q .quit :
256- return
257- default :
258- t , err := q .worker .Request ()
259- if t == nil || err != nil {
260- if err != nil {
261- select {
262- case <- q .quit :
252+ t , err := q .worker .Request ()
253+ if t == nil || err != nil {
254+ if err != nil {
255+ select {
256+ case <- q .quit :
257+ if ! errors .Is (err , ErrNoTaskInQueue ) {
258+ close (tasks )
263259 return
264- case <- time .After (time .Second ):
265- // sleep 1 second to fetch new task
266260 }
261+ case <- time .After (time .Second ):
262+ // sleep 1 second to fetch new task
267263 }
268264 }
269- if t != nil {
270- tasks <- t
265+ }
266+ if t != nil {
267+ tasks <- t
268+ return
269+ }
270+
271+ select {
272+ case <- q .quit :
273+ if ! errors .Is (err , ErrNoTaskInQueue ) {
274+ close (tasks )
271275 return
272276 }
277+ default :
273278 }
274279 }
275280 })
276281
277- // read task
278- select {
279- case task = <- tasks :
280- case <- q .quit :
281- select {
282- case task = <- tasks :
283- // queue task before shutdown the service
284- if err := q .worker .Queue (task ); err != nil {
285- q .logger .Errorf ("can't re-queue task: %v" , err )
286- }
287- default :
288- }
282+ task , ok := <- tasks
283+ if ! ok {
289284 return
290285 }
291286
You can’t perform that action at this time.
0 commit comments