File tree Expand file tree Collapse file tree 2 files changed +24
-15
lines changed Expand file tree Collapse file tree 2 files changed +24
-15
lines changed Original file line number Diff line number Diff line change 11package queue
22
33import (
4+ "encoding/json"
45 "errors"
56 "runtime"
67 "sync"
@@ -37,6 +38,11 @@ func (j Job) Bytes() []byte {
3738 return j .Body
3839}
3940
41+ func (j Job ) Encode () []byte {
42+ b , _ := json .Marshal (j )
43+ return b
44+ }
45+
4046// Option for queue system
4147type Option func (* Queue )
4248
@@ -126,28 +132,29 @@ func (q *Queue) Wait() {
126132 q .routineGroup .Wait ()
127133}
128134
129- // Queue to queue all job
130- func (q * Queue ) Queue (job QueuedMessage ) error {
135+ func (q * Queue ) handleQueue (timeout time.Duration , job QueuedMessage ) error {
131136 if atomic .LoadInt32 (& q .stopFlag ) == 1 {
132137 return ErrQueueShutdown
133138 }
134139
135- return q . worker . Queue ( Job {
136- Timeout : q . timeout ,
140+ data := Job {
141+ Timeout : timeout ,
137142 Body : job .Bytes (),
143+ }
144+
145+ return q .worker .Queue (Job {
146+ Body : data .Encode (),
138147 })
139148}
140149
141150// Queue to queue all job
142- func (q * Queue ) QueueWithTimeout (timeout time.Duration , job QueuedMessage ) error {
143- if atomic .LoadInt32 (& q .stopFlag ) == 1 {
144- return ErrQueueShutdown
145- }
151+ func (q * Queue ) Queue (job QueuedMessage ) error {
152+ return q .handleQueue (q .timeout , job )
153+ }
146154
147- return q .worker .Queue (Job {
148- Timeout : timeout ,
149- Body : job .Bytes (),
150- })
155+ // Queue to queue all job
156+ func (q * Queue ) QueueWithTimeout (timeout time.Duration , job QueuedMessage ) error {
157+ return q .handleQueue (q .timeout , job )
151158}
152159
153160func (q * Queue ) work () {
Original file line number Diff line number Diff line change @@ -2,6 +2,7 @@ package simple
22
33import (
44 "context"
5+ "encoding/json"
56 "errors"
67 "sync"
78 "sync/atomic"
@@ -39,11 +40,10 @@ func (s *Worker) AfterRun() error {
3940 return nil
4041}
4142
42- func (s * Worker ) handle (m interface {} ) error {
43+ func (s * Worker ) handle (job queue. Job ) error {
4344 // create channel with buffer size 1 to avoid goroutine leak
4445 done := make (chan error , 1 )
4546 panicChan := make (chan interface {}, 1 )
46- job , _ := m .(queue.Job )
4747 startTime := time .Now ()
4848 ctx , cancel := context .WithTimeout (context .Background (), job .Timeout )
4949 defer cancel ()
@@ -95,7 +95,9 @@ func (s *Worker) Run() error {
9595 }
9696
9797 for task := range s .taskQueue {
98- if err := s .handle (task ); err != nil {
98+ var data queue.Job
99+ _ = json .Unmarshal (task .Bytes (), & data )
100+ if err := s .handle (data ); err != nil {
99101 s .logger .Error (err .Error ())
100102 }
101103 }
You can’t perform that action at this time.
0 commit comments