@@ -27,125 +27,55 @@ go get github.com/golang-queue/queue@master
2727
2828## Usage
2929
30- The first step to create a new job as ` QueueMessage ` interface:
30+ ### Basic usage of Pool (use Task function)
3131
32- ``` go
33- type job struct {
34- Message string
35- }
36-
37- func (j *job ) Bytes () []byte {
38- b , err := json.Marshal (j)
39- if err != nil {
40- panic (err)
41- }
42- return b
43- }
44- ```
45-
46- The second step to create the new worker, use the buffered channel as an example.
32+ By calling ` QueueTask() ` method, it schedules the task executed by worker (goroutines) in the Pool.
4733
4834``` go
49- // define the worker
50- w := simple.NewWorker (
51- simple.WithQueueNum (taskN),
52- simple.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
53- v , ok := m.(*job)
54- if !ok {
55- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
56- return err
57- }
58- }
59-
60- rets <- v.Message
61- return nil
62- }),
63- )
64- ```
65-
66- or use the [ NSQ] ( https://nsq.io/ ) as backend, see the worker example:
67-
68- ``` go
69- // define the worker
70- w := nsq.NewWorker (
71- nsq.WithAddr (" 127.0.0.1:4150" ),
72- nsq.WithTopic (" example" ),
73- nsq.WithChannel (" foobar" ),
74- // concurrent job number
75- nsq.WithMaxInFlight (10 ),
76- nsq.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
77- v , ok := m.(*job)
78- if !ok {
79- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
80- return err
81- }
82- }
83-
84- rets <- v.Message
85- return nil
86- }),
87- )
88- ```
89-
90- or use the [ NATS] ( https://nats.io/ ) as backend, see the worker example:
35+ package main
9136
92- ``` go
93- w := nats.NewWorker (
94- nats.WithAddr (" 127.0.0.1:4222" ),
95- nats.WithSubj (" example" ),
96- nats.WithQueue (" foobar" ),
97- nats.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
98- v , ok := m.(*job)
99- if !ok {
100- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
101- return err
102- }
103- }
37+ import (
38+ " context"
39+ " fmt"
40+ " time"
10441
105- rets <- v.Message
106- return nil
107- }),
42+ " github.com/golang-queue/queue"
10843)
109- ```
11044
111- The third step to create a queue and initialize multiple workers, receive all job messages:
45+ func main () {
46+ taskN := 100
47+ rets := make (chan string , taskN)
11248
113- ``` go
114- // define the queue
115- q , err := queue.NewQueue (
116- queue.WithWorkerCount (5 ),
117- queue.WithWorker (w),
118- )
119- if err != nil {
120- log.Fatal (err)
121- }
49+ // initial queue pool
50+ q := queue.NewPool (5 )
51+ // shutdown the service and notify all the worker
52+ // wait all jobs are complete.
53+ defer q.Release ()
12254
123- // start the five worker
124- q.Start ()
125-
126- // assign tasks in queue
127- for i := 0 ; i < taskN; i++ {
128- go func (i int ) {
129- q.Queue (&job{
130- Name: " foobar" ,
131- Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
132- })
133- }(i)
134- }
55+ // assign tasks in queue
56+ for i := 0 ; i < taskN; i++ {
57+ go func (i int ) {
58+ if err := q.QueueTask (func (ctx context.Context ) error {
59+ rets <- fmt.Sprintf (" Hi Gopher, handle the job: %02d " , +i)
60+ return nil
61+ }); err != nil {
62+ panic (err)
63+ }
64+ }(i)
65+ }
13566
136- // wait until all tasks done
137- for i := 0 ; i < taskN; i++ {
138- fmt.Println (" message:" , <- rets)
139- time.Sleep (50 * time.Millisecond )
67+ // wait until all tasks done
68+ for i := 0 ; i < taskN; i++ {
69+ fmt.Println (" message:" , <- rets)
70+ time.Sleep (20 * time.Millisecond )
71+ }
14072}
141-
142- // shutdown the service and notify all the worker
143- q.Shutdown ()
144- // wait all jobs are complete.
145- q.Wait ()
14673```
14774
148- Full example code as below or [ try it in playground] ( https://play.golang.org/p/77PtkZRaPE- ) .
75+ ### Basic usage of Pool (use message queue)
76+
77+ Define the new message struct and implement the ` Bytes() ` func to encode message. Give the ` WithFn ` func
78+ to handle the message from Queue.
14979
15080``` go
15181package main
@@ -158,7 +88,6 @@ import (
15888 " time"
15989
16090 " github.com/golang-queue/queue"
161- " github.com/golang-queue/queue/simple"
16291)
16392
16493type job struct {
@@ -178,41 +107,31 @@ func main() {
178107 taskN := 100
179108 rets := make (chan string , taskN)
180109
181- // define the worker
182- w := simple.NewWorker (
183- simple.WithQueueNum (taskN),
184- simple.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
185- v , ok := m.(*job)
186- if !ok {
187- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
188- return err
189- }
110+ // initial queue pool
111+ q := queue.NewPool (5 , queue.WithFn (func (ctx context.Context , m queue.QueuedMessage ) error {
112+ v , ok := m.(*job)
113+ if !ok {
114+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
115+ return err
190116 }
117+ }
191118
192- rets <- " Hi, " + v.Name + " , " + v.Message
193- return nil
194- }),
195- )
196-
197- // define the queue
198- q , err := queue.NewQueue (
199- queue.WithWorkerCount (5 ),
200- queue.WithWorker (w),
201- )
202- if err != nil {
203- log.Fatal (err)
204- }
205-
206- // start the five worker
207- q.Start ()
119+ rets <- " Hi, " + v.Name + " , " + v.Message
120+ return nil
121+ }))
122+ // shutdown the service and notify all the worker
123+ // wait all jobs are complete.
124+ defer q.Release ()
208125
209126 // assign tasks in queue
210127 for i := 0 ; i < taskN; i++ {
211128 go func (i int ) {
212- q.Queue (&job{
213- Name: " foobar " ,
129+ if err := q.Queue (&job{
130+ Name: " Gopher " ,
214131 Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
215- })
132+ }); err != nil {
133+ log.Println (err)
134+ }
216135 }(i)
217136 }
218137
@@ -221,10 +140,5 @@ func main() {
221140 fmt.Println (" message:" , <- rets)
222141 time.Sleep (50 * time.Millisecond )
223142 }
224-
225- // shutdown the service and notify all the worker
226- q.Shutdown ()
227- // wait all jobs are complete.
228- q.Wait ()
229143}
230144```
0 commit comments