File tree Expand file tree Collapse file tree 5 files changed +171
-0
lines changed Expand file tree Collapse file tree 5 files changed +171
-0
lines changed Original file line number Diff line number Diff line change 1+ # Example with server and client
2+
3+ Please refer the following steps to build server and client.
4+
5+ ## Build server
6+
7+ ``` sh
8+ go build -o app server/main.go
9+ ```
10+
11+ ## Build client
12+
13+ ``` sh
14+ go build -o agent client/main.go
15+ ```
16+
17+ ## Usage
18+
19+ Run the multiple agent. (open two console in the same terminal)
20+
21+ ``` sh
22+ ./agent
23+ ```
24+
25+ Publish the message.
26+
27+ ``` sh
28+ ./app
29+ ```
Original file line number Diff line number Diff line change 1+ package main
2+
3+ import (
4+ "context"
5+ "encoding/json"
6+ "fmt"
7+ "time"
8+
9+ "github.com/golang-queue/nsq"
10+ "github.com/golang-queue/queue"
11+ )
12+
13+ type job struct {
14+ Message string
15+ }
16+
17+ func (j * job ) Bytes () []byte {
18+ b , err := json .Marshal (j )
19+ if err != nil {
20+ panic (err )
21+ }
22+ return b
23+ }
24+
25+ func main () {
26+ taskN := 10000
27+ rets := make (chan string , taskN )
28+
29+ // define the worker
30+ w := nsq .NewWorker (
31+ nsq .WithAddr ("127.0.0.1:4150" ),
32+ nsq .WithTopic ("example" ),
33+ nsq .WithChannel ("foobar" ),
34+ // concurrent job number
35+ nsq .WithMaxInFlight (10 ),
36+ nsq .WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
37+ var v * job
38+ if err := json .Unmarshal (m .Bytes (), & v ); err != nil {
39+ return err
40+ }
41+ rets <- v .Message
42+ return nil
43+ }),
44+ )
45+
46+ // define the queue
47+ q := queue .NewPool (
48+ 5 ,
49+ queue .WithWorker (w ),
50+ )
51+
52+ // wait until all tasks done
53+ for i := 0 ; i < taskN ; i ++ {
54+ fmt .Println ("message:" , <- rets )
55+ time .Sleep (50 * time .Millisecond )
56+ }
57+
58+ // shutdown the service and notify all the worker
59+ q .Release ()
60+ }
Original file line number Diff line number Diff line change 1+ module example
2+
3+ go 1.16
4+
5+ require (
6+ github.com/golang-queue/nsq v0.0.3-0.20210907001930-28fb526f914f
7+ github.com/golang-queue/queue v0.0.8-0.20210905095503-cc99dff8fdc3
8+ )
Original file line number Diff line number Diff line change 1+ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 =
2+ github.com/davecgh/go-spew v1.1.0 /go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38 =
3+ github.com/golang-queue/nsq v0.0.3-0.20210907001930-28fb526f914f h1:h8Ni0qyAdBWOeOerAq/M/HUDdcqtLuxHABWTdNXKeRY =
4+ github.com/golang-queue/nsq v0.0.3-0.20210907001930-28fb526f914f /go.mod h1:I2JzRTV5lLsgu/R6c4SXjPt8ZhyX/lsV8bhjTZP9yu0 =
5+ github.com/golang-queue/queue v0.0.8-0.20210905085819-3cd1dfe014e2 /go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms =
6+ github.com/golang-queue/queue v0.0.8-0.20210905095503-cc99dff8fdc3 h1:ka4/BRgVndDi92gaeOpjnnfuUXrWr1y87x/Go9M3x3Y =
7+ github.com/golang-queue/queue v0.0.8-0.20210905095503-cc99dff8fdc3 /go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms =
8+ github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4 =
9+ github.com/golang/snappy v0.0.1 /go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q =
10+ github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk =
11+ github.com/nsqio/go-nsq v1.0.8 /go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY =
12+ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM =
13+ github.com/pmezard/go-difflib v1.0.0 /go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4 =
14+ github.com/stretchr/objx v0.1.0 /go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME =
15+ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY =
16+ github.com/stretchr/testify v1.7.0 /go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg =
17+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 /go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0 =
18+ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo =
19+ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c /go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM =
Original file line number Diff line number Diff line change 1+ package main
2+
3+ import (
4+ "encoding/json"
5+ "fmt"
6+ "log"
7+ "time"
8+
9+ "github.com/golang-queue/nsq"
10+ "github.com/golang-queue/queue"
11+ )
12+
13+ type job struct {
14+ Message string
15+ }
16+
17+ func (j * job ) Bytes () []byte {
18+ b , err := json .Marshal (j )
19+ if err != nil {
20+ panic (err )
21+ }
22+ return b
23+ }
24+
25+ func main () {
26+ taskN := 100
27+
28+ // define the worker
29+ w := nsq .NewWorker (
30+ nsq .WithAddr ("127.0.0.1:4150" ),
31+ nsq .WithTopic ("example" ),
32+ nsq .WithChannel ("foobar" ),
33+ )
34+
35+ // define the queue
36+ q := queue .NewPool (
37+ 0 ,
38+ queue .WithWorker (w ),
39+ )
40+
41+ // assign tasks in queue
42+ for i := 0 ; i < taskN ; i ++ {
43+ go func (i int ) {
44+ if err := q .Queue (& job {
45+ Message : fmt .Sprintf ("handle the job: %d" , i + 1 ),
46+ }); err != nil {
47+ log .Fatal (err )
48+ }
49+ }(i )
50+ }
51+
52+ time .Sleep (1 * time .Second )
53+ // shutdown the service and notify all the worker
54+ q .Release ()
55+ }
You can’t perform that action at this time.
0 commit comments