File tree Expand file tree Collapse file tree 3 files changed +61
-0
lines changed Expand file tree Collapse file tree 3 files changed +61
-0
lines changed Original file line number Diff line number Diff line change 1+ package queue
2+
3+ func NewPool (size int , opts ... Option ) * Queue {
4+ o := []Option {
5+ WithWorkerCount (size ),
6+ WithWorker (NewConsumer (opts ... )),
7+ }
8+ o = append (
9+ o ,
10+ opts ... ,
11+ )
12+
13+ q , err := NewQueue (o ... )
14+ if err != nil {
15+ panic (err )
16+ }
17+
18+ q .Start ()
19+
20+ return q
21+ }
Original file line number Diff line number Diff line change 1+ package queue
2+
3+ import (
4+ "context"
5+ "testing"
6+ "time"
7+
8+ "github.com/stretchr/testify/assert"
9+ )
10+
11+ func TestNewPoolWithQueueTask (t * testing.T ) {
12+ totalN := 5
13+ taskN := 100
14+ rets := make (chan struct {}, taskN )
15+
16+ p := NewPool (totalN )
17+ time .Sleep (time .Millisecond * 50 )
18+ assert .Equal (t , totalN , p .Workers ())
19+
20+ for i := 0 ; i < taskN ; i ++ {
21+ assert .NoError (t , p .QueueTask (func (context.Context ) error {
22+ rets <- struct {}{}
23+ return nil
24+ }))
25+ }
26+
27+ for i := 0 ; i < taskN ; i ++ {
28+ <- rets
29+ }
30+
31+ // shutdown all, and now running worker is 0
32+ p .Release ()
33+ assert .Equal (t , 0 , p .Workers ())
34+ }
Original file line number Diff line number Diff line change @@ -99,6 +99,12 @@ func (q *Queue) Shutdown() {
9999 })
100100}
101101
102+ // Workers returns the numbers of workers has been created.
103+ func (q * Queue ) Release () {
104+ q .Shutdown ()
105+ q .Wait ()
106+ }
107+
102108// Workers returns the numbers of workers has been created.
103109func (q * Queue ) Workers () int {
104110 return int (atomic .LoadInt32 (& q .runningWorkers ))
You can’t perform that action at this time.
0 commit comments