11package nats
22
33import (
4+ "context"
5+ "encoding/json"
46 "sync"
7+ "sync/atomic"
8+ "time"
59
610 "github.com/appleboy/queue"
711
@@ -13,16 +17,6 @@ var _ queue.Worker = (*Worker)(nil)
1317// Option for queue system
1418type Option func (* Worker )
1519
16- // Job with NSQ message
17- type Job struct {
18- Body []byte
19- }
20-
21- // Bytes get bytes format
22- func (j * Job ) Bytes () []byte {
23- return j .Body
24- }
25-
2620// Worker for NSQ
2721type Worker struct {
2822 addr string
@@ -31,7 +25,9 @@ type Worker struct {
3125 client * nats.Conn
3226 stop chan struct {}
3327 stopOnce sync.Once
34- runFunc func (queue.QueuedMessage , <- chan struct {}) error
28+ runFunc func (context.Context , queue.QueuedMessage ) error
29+ logger queue.Logger
30+ stopFlag int32
3531}
3632
3733// WithAddr setup the addr of NATS
@@ -56,12 +52,19 @@ func WithQueue(queue string) Option {
5652}
5753
5854// WithRunFunc setup the run func of queue
59- func WithRunFunc (fn func (queue. QueuedMessage , <- chan struct {} ) error ) Option {
55+ func WithRunFunc (fn func (context. Context , queue. QueuedMessage ) error ) Option {
6056 return func (w * Worker ) {
6157 w .runFunc = fn
6258 }
6359}
6460
61+ // WithLogger set custom logger
62+ func WithLogger (l queue.Logger ) Option {
63+ return func (w * Worker ) {
64+ w .logger = l
65+ }
66+ }
67+
6568// NewWorker for struc
6669func NewWorker (opts ... Option ) * Worker {
6770 var err error
@@ -70,7 +73,7 @@ func NewWorker(opts ...Option) *Worker {
7073 subj : "foobar" ,
7174 queue : "foobar" ,
7275 stop : make (chan struct {}),
73- runFunc : func (queue. QueuedMessage , <- chan struct {} ) error {
76+ runFunc : func (context. Context , queue. QueuedMessage ) error {
7477 return nil
7578 },
7679 }
@@ -99,26 +102,81 @@ func (s *Worker) AfterRun() error {
99102 return nil
100103}
101104
105+ func (s * Worker ) handle (job queue.Job ) error {
106+ // create channel with buffer size 1 to avoid goroutine leak
107+ done := make (chan error , 1 )
108+ panicChan := make (chan interface {}, 1 )
109+ startTime := time .Now ()
110+ ctx , cancel := context .WithTimeout (context .Background (), job .Timeout )
111+ defer cancel ()
112+
113+ // run the job
114+ go func () {
115+ // handle panic issue
116+ defer func () {
117+ if p := recover (); p != nil {
118+ panicChan <- p
119+ }
120+ }()
121+
122+ // run custom process function
123+ done <- s .runFunc (ctx , job )
124+ }()
125+
126+ select {
127+ case p := <- panicChan :
128+ panic (p )
129+ case <- ctx .Done (): // timeout reached
130+ return ctx .Err ()
131+ case <- s .stop : // shutdown service
132+ // cancel job
133+ cancel ()
134+
135+ leftTime := job .Timeout - time .Since (startTime )
136+ // wait job
137+ select {
138+ case <- time .After (leftTime ):
139+ return context .DeadlineExceeded
140+ case err := <- done : // job finish
141+ return err
142+ case p := <- panicChan :
143+ panic (p )
144+ }
145+ case err := <- done : // job finish
146+ return err
147+ }
148+ }
149+
102150// Run start the worker
103151func (s * Worker ) Run () error {
104152 wg := & sync.WaitGroup {}
105-
153+ panicChan := make ( chan interface {}, 1 )
106154 _ , err := s .client .QueueSubscribe (s .subj , s .queue , func (m * nats.Msg ) {
107155 wg .Add (1 )
108- defer wg .Done ()
109- job := & Job {
110- Body : m .Data ,
156+ defer func () {
157+ wg .Done ()
158+ if p := recover (); p != nil {
159+ panicChan <- p
160+ }
161+ }()
162+
163+ var data queue.Job
164+ _ = json .Unmarshal (m .Data , & data )
165+
166+ if err := s .handle (data ); err != nil {
167+ s .logger .Error (err )
111168 }
112-
113- // run custom process function
114- _ = s .runFunc (job , s .stop )
115169 })
116170 if err != nil {
117171 return err
118172 }
119173
120174 // wait close signal
121- <- s .stop
175+ select {
176+ case <- s .stop :
177+ case err := <- panicChan :
178+ s .logger .Error (err )
179+ }
122180
123181 // wait job completed
124182 wg .Wait ()
@@ -128,9 +186,13 @@ func (s *Worker) Run() error {
128186
129187// Shutdown worker
130188func (s * Worker ) Shutdown () error {
189+ if ! atomic .CompareAndSwapInt32 (& s .stopFlag , 0 , 1 ) {
190+ return queue .ErrQueueShutdown
191+ }
192+
131193 s .stopOnce .Do (func () {
132- close (s .stop )
133194 s .client .Close ()
195+ close (s .stop )
134196 })
135197 return nil
136198}
@@ -147,6 +209,10 @@ func (s *Worker) Usage() int {
147209
148210// Queue send notification to queue
149211func (s * Worker ) Queue (job queue.QueuedMessage ) error {
212+ if atomic .LoadInt32 (& s .stopFlag ) == 1 {
213+ return queue .ErrQueueShutdown
214+ }
215+
150216 err := s .client .Publish (s .subj , job .Bytes ())
151217 if err != nil {
152218 return err
0 commit comments