11package nsq
22
33import (
4+ "context"
5+ "encoding/json"
46 "runtime"
57 "sync"
8+ "sync/atomic"
69 "time"
710
811 "github.com/appleboy/queue"
@@ -15,16 +18,6 @@ var _ queue.Worker = (*Worker)(nil)
1518// Option for queue system
1619type Option func (* Worker )
1720
18- // Job with NSQ message
19- type Job struct {
20- Body []byte
21- }
22-
23- // Bytes get bytes format
24- func (j * Job ) Bytes () []byte {
25- return j .Body
26- }
27-
2821// Worker for NSQ
2922type Worker struct {
3023 q * nsq.Consumer
@@ -36,7 +29,10 @@ type Worker struct {
3629 addr string
3730 topic string
3831 channel string
39- runFunc func (queue.QueuedMessage , <- chan struct {}) error
32+ runFunc func (context.Context , queue.QueuedMessage ) error
33+ logger queue.Logger
34+ stopFlag int32
35+ startFlag int32
4036}
4137
4238// WithAddr setup the addr of NSQ
@@ -61,7 +57,7 @@ func WithChannel(channel string) Option {
6157}
6258
6359// WithRunFunc setup the run func of queue
64- func WithRunFunc (fn func (queue. QueuedMessage , <- chan struct {} ) error ) Option {
60+ func WithRunFunc (fn func (context. Context , queue. QueuedMessage ) error ) Option {
6561 return func (w * Worker ) {
6662 w .runFunc = fn
6763 }
@@ -74,6 +70,13 @@ func WithMaxInFlight(num int) Option {
7470 }
7571}
7672
73+ // WithLogger set custom logger
74+ func WithLogger (l queue.Logger ) Option {
75+ return func (w * Worker ) {
76+ w .logger = l
77+ }
78+ }
79+
7780// NewWorker for struc
7881func NewWorker (opts ... Option ) * Worker {
7982 var err error
@@ -83,7 +86,8 @@ func NewWorker(opts ...Option) *Worker {
8386 channel : "ch" ,
8487 maxInFlight : runtime .NumCPU (),
8588 stop : make (chan struct {}),
86- runFunc : func (queue.QueuedMessage , <- chan struct {}) error {
89+ logger : queue .NewLogger (),
90+ runFunc : func (context.Context , queue.QueuedMessage ) error {
8791 return nil
8892 },
8993 }
@@ -122,33 +126,87 @@ func (s *Worker) AfterRun() error {
122126 if err != nil {
123127 panic ("Could not connect nsq server: " + err .Error ())
124128 }
129+
130+ atomic .CompareAndSwapInt32 (& s .startFlag , 0 , 1 )
125131 })
126132
127133 return nil
128134}
129135
136+ func (s * Worker ) handle (job queue.Job ) error {
137+ // create channel with buffer size 1 to avoid goroutine leak
138+ done := make (chan error , 1 )
139+ panicChan := make (chan interface {}, 1 )
140+ startTime := time .Now ()
141+ ctx , cancel := context .WithTimeout (context .Background (), job .Timeout )
142+ defer cancel ()
143+
144+ // run the job
145+ go func () {
146+ // handle panic issue
147+ defer func () {
148+ if p := recover (); p != nil {
149+ panicChan <- p
150+ }
151+ }()
152+
153+ // run custom process function
154+ done <- s .runFunc (ctx , job )
155+ }()
156+
157+ select {
158+ case p := <- panicChan :
159+ panic (p )
160+ case <- ctx .Done (): // timeout reached
161+ return ctx .Err ()
162+ case <- s .stop : // shutdown service
163+ // cancel job
164+ cancel ()
165+
166+ leftTime := job .Timeout - time .Since (startTime )
167+ // wait job
168+ select {
169+ case <- time .After (leftTime ):
170+ return context .DeadlineExceeded
171+ case err := <- done : // job finish
172+ return err
173+ case p := <- panicChan :
174+ panic (p )
175+ }
176+ case err := <- done : // job finish
177+ return err
178+ }
179+ }
180+
130181// Run start the worker
131182func (s * Worker ) Run () error {
132183 wg := & sync.WaitGroup {}
184+ panicChan := make (chan interface {}, 1 )
133185 s .q .AddHandler (nsq .HandlerFunc (func (msg * nsq.Message ) error {
134186 wg .Add (1 )
135- defer wg .Done ()
187+ defer func () {
188+ wg .Done ()
189+ if p := recover (); p != nil {
190+ panicChan <- p
191+ }
192+ }()
136193 if len (msg .Body ) == 0 {
137194 // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
138195 // In this case, a message with an empty body is simply ignored/discarded.
139196 return nil
140197 }
141198
142- job := & Job {
143- Body : msg .Body ,
144- }
145-
146- // run custom process function
147- return s .runFunc (job , s .stop )
199+ var data queue.Job
200+ _ = json .Unmarshal (msg .Body , & data )
201+ return s .handle (data )
148202 }))
149203
150204 // wait close signal
151- <- s .stop
205+ select {
206+ case <- s .stop :
207+ case err := <- panicChan :
208+ s .logger .Error (err )
209+ }
152210
153211 // wait job completed
154212 wg .Wait ()
@@ -158,9 +216,16 @@ func (s *Worker) Run() error {
158216
159217// Shutdown worker
160218func (s * Worker ) Shutdown () error {
219+ if ! atomic .CompareAndSwapInt32 (& s .stopFlag , 0 , 1 ) {
220+ return queue .ErrQueueShutdown
221+ }
222+
161223 s .stopOnce .Do (func () {
162- s .q .Stop ()
163- s .p .Stop ()
224+ if atomic .LoadInt32 (& s .startFlag ) == 1 {
225+ s .q .Stop ()
226+ s .p .Stop ()
227+ }
228+
164229 close (s .stop )
165230 })
166231 return nil
@@ -178,6 +243,10 @@ func (s *Worker) Usage() int {
178243
179244// Queue send notification to queue
180245func (s * Worker ) Queue (job queue.QueuedMessage ) error {
246+ if atomic .LoadInt32 (& s .stopFlag ) == 1 {
247+ return queue .ErrQueueShutdown
248+ }
249+
181250 err := s .p .Publish (s .topic , job .Bytes ())
182251 if err != nil {
183252 return err
0 commit comments