@@ -143,3 +143,256 @@ func main() {
143143 }
144144}
145145```
146+
147+ ## Using NSQ as Queue
148+
149+ See the [ NSQ documentation] ( https://github.com/golang-queue/nsq ) .
150+
151+ ``` go
152+ package main
153+
154+ import (
155+ " context"
156+ " encoding/json"
157+ " fmt"
158+ " log"
159+ " time"
160+
161+ " github.com/golang-queue/nsq"
162+ " github.com/golang-queue/queue"
163+ )
164+
165+ type job struct {
166+ Message string
167+ }
168+
169+ func (j *job ) Bytes () []byte {
170+ b , err := json.Marshal (j)
171+ if err != nil {
172+ panic (err)
173+ }
174+ return b
175+ }
176+
177+ func main () {
178+ taskN := 100
179+ rets := make (chan string , taskN)
180+
181+ // define the worker
182+ w := nsq.NewWorker (
183+ nsq.WithAddr (" 127.0.0.1:4150" ),
184+ nsq.WithTopic (" example" ),
185+ nsq.WithChannel (" foobar" ),
186+ // concurrent job number
187+ nsq.WithMaxInFlight (10 ),
188+ nsq.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
189+ v , ok := m.(*job)
190+ if !ok {
191+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
192+ return err
193+ }
194+ }
195+
196+ rets <- v.Message
197+ return nil
198+ }),
199+ )
200+
201+ // define the queue
202+ q , err := queue.NewQueue (
203+ queue.WithWorkerCount (10 ),
204+ queue.WithWorker (w),
205+ )
206+ if err != nil {
207+ log.Fatal (err)
208+ }
209+
210+ // start the five worker
211+ q.Start ()
212+
213+ // assign tasks in queue
214+ for i := 0 ; i < taskN; i++ {
215+ go func (i int ) {
216+ q.Queue (&job{
217+ Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
218+ })
219+ }(i)
220+ }
221+
222+ // wait until all tasks done
223+ for i := 0 ; i < taskN; i++ {
224+ fmt.Println (" message:" , <- rets)
225+ time.Sleep (50 * time.Millisecond )
226+ }
227+
228+ // shutdown the service and notify all the worker
229+ q.Release ()
230+ }
231+ ```
232+
233+ ## Using NATs as Queue
234+
235+ See the [ NATs documentation] ( https://github.com/golang-queue/nats )
236+
237+ ``` go
238+ package main
239+
240+ import (
241+ " context"
242+ " encoding/json"
243+ " fmt"
244+ " log"
245+ " time"
246+
247+ " github.com/golang-queue/nats"
248+ " github.com/golang-queue/queue"
249+ )
250+
251+ type job struct {
252+ Message string
253+ }
254+
255+ func (j *job ) Bytes () []byte {
256+ b , err := json.Marshal (j)
257+ if err != nil {
258+ panic (err)
259+ }
260+ return b
261+ }
262+
263+ func main () {
264+ taskN := 100
265+ rets := make (chan string , taskN)
266+
267+ // define the worker
268+ w := nats.NewWorker (
269+ nats.WithAddr (" 127.0.0.1:4222" ),
270+ nats.WithSubj (" example" ),
271+ nats.WithQueue (" foobar" ),
272+ nats.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
273+ v , ok := m.(*job)
274+ if !ok {
275+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
276+ return err
277+ }
278+ }
279+
280+ rets <- v.Message
281+ return nil
282+ }),
283+ )
284+
285+ // define the queue
286+ q , err := queue.NewQueue (
287+ queue.WithWorkerCount (10 ),
288+ queue.WithWorker (w),
289+ )
290+ if err != nil {
291+ log.Fatal (err)
292+ }
293+
294+ // start the five worker
295+ q.Start ()
296+
297+ // assign tasks in queue
298+ for i := 0 ; i < taskN; i++ {
299+ go func (i int ) {
300+ q.Queue (&job{
301+ Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
302+ })
303+ }(i)
304+ }
305+
306+ // wait until all tasks done
307+ for i := 0 ; i < taskN; i++ {
308+ fmt.Println (" message:" , <- rets)
309+ time.Sleep (50 * time.Millisecond )
310+ }
311+
312+ // shutdown the service and notify all the worker
313+ q.Release ()
314+ }
315+ ```
316+
317+ ## Using Redis(Pub/Sub) as Queue
318+
319+ See the [ redis documentation] ( https://github.com/golang-queue/redisdb )
320+
321+ ``` go
322+ package main
323+
324+ import (
325+ " context"
326+ " encoding/json"
327+ " fmt"
328+ " log"
329+ " time"
330+
331+ " github.com/golang-queue/queue"
332+ " github.com/golang-queue/redisdb"
333+ )
334+
335+ type job struct {
336+ Message string
337+ }
338+
339+ func (j *job ) Bytes () []byte {
340+ b , err := json.Marshal (j)
341+ if err != nil {
342+ panic (err)
343+ }
344+ return b
345+ }
346+
347+ func main () {
348+ taskN := 100
349+ rets := make (chan string , taskN)
350+
351+ // define the worker
352+ w := redisdb.NewWorker (
353+ redisdb.WithAddr (" 127.0.0.1:6379" ),
354+ redisdb.WithChannel (" foobar" ),
355+ redisdb.WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
356+ v , ok := m.(*job)
357+ if !ok {
358+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
359+ return err
360+ }
361+ }
362+
363+ rets <- v.Message
364+ return nil
365+ }),
366+ )
367+
368+ // define the queue
369+ q , err := queue.NewQueue (
370+ queue.WithWorkerCount (10 ),
371+ queue.WithWorker (w),
372+ )
373+ if err != nil {
374+ log.Fatal (err)
375+ }
376+
377+ // start the five worker
378+ q.Start ()
379+
380+ // assign tasks in queue
381+ for i := 0 ; i < taskN; i++ {
382+ go func (i int ) {
383+ q.Queue (&job{
384+ Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
385+ })
386+ }(i)
387+ }
388+
389+ // wait until all tasks done
390+ for i := 0 ; i < taskN; i++ {
391+ fmt.Println (" message:" , <- rets)
392+ time.Sleep (50 * time.Millisecond )
393+ }
394+
395+ // shutdown the service and notify all the worker
396+ q.Release ()
397+ }
398+ ```
0 commit comments