@@ -15,105 +15,40 @@ import (
1515
1616var _ queue.Worker = (* Worker )(nil )
1717
18- // Option for queue system
19- type Option func (* Worker )
20-
2118// Worker for NSQ
2219type Worker struct {
23- q * nsq.Consumer
24- p * nsq.Producer
25- startOnce sync.Once
26- stopOnce sync.Once
27- stop chan struct {}
28- maxInFlight int
29- addr string
30- topic string
31- channel string
32- runFunc func (context.Context , queue.QueuedMessage ) error
33- logger queue.Logger
34- stopFlag int32
35- startFlag int32
36- metric queue.Metric
37- disable bool
20+ q * nsq.Consumer
21+ p * nsq.Producer
22+ startOnce sync.Once
23+ stopOnce sync.Once
24+ stop chan struct {}
25+ stopFlag int32
26+ startFlag int32
27+ opts options
3828}
3929
4030func (w * Worker ) incBusyWorker () {
41- w .metric .IncBusyWorker ()
31+ w .opts . metric .IncBusyWorker ()
4232}
4333
4434func (w * Worker ) decBusyWorker () {
45- w .metric .DecBusyWorker ()
35+ w .opts . metric .DecBusyWorker ()
4636}
4737
4838// BusyWorkers return count of busy workers currently.
4939func (w * Worker ) BusyWorkers () uint64 {
50- return w .metric .BusyWorkers ()
51- }
52-
53- // WithAddr setup the addr of NSQ
54- func WithAddr (addr string ) Option {
55- return func (w * Worker ) {
56- w .addr = addr
57- }
58- }
59-
60- // WithTopic setup the topic of NSQ
61- func WithTopic (topic string ) Option {
62- return func (w * Worker ) {
63- w .topic = topic
64- }
65- }
66-
67- // WithChannel setup the channel of NSQ
68- func WithChannel (channel string ) Option {
69- return func (w * Worker ) {
70- w .channel = channel
71- }
72- }
73-
74- // WithRunFunc setup the run func of queue
75- func WithRunFunc (fn func (context.Context , queue.QueuedMessage ) error ) Option {
76- return func (w * Worker ) {
77- w .runFunc = fn
78- }
79- }
80-
81- // WithMaxInFlight Maximum number of messages to allow in flight (concurrency knob)
82- func WithMaxInFlight (num int ) Option {
83- return func (w * Worker ) {
84- w .maxInFlight = num
85- }
86- }
87-
88- // WithLogger set custom logger
89- func WithLogger (l queue.Logger ) Option {
90- return func (w * Worker ) {
91- w .logger = l
92- }
93- }
94-
95- // WithMetric set custom Metric
96- func WithMetric (m queue.Metric ) Option {
97- return func (w * Worker ) {
98- w .metric = m
99- }
100- }
101-
102- func withDisable () Option {
103- return func (w * Worker ) {
104- w .disable = true
105- }
40+ return w .opts .metric .BusyWorkers ()
10641}
10742
10843// NewWorker for struc
10944func NewWorker (opts ... Option ) * Worker {
110- w := & Worker {
45+ defaultOpts := options {
11146 addr : "127.0.0.1:4150" ,
11247 topic : "gorush" ,
11348 channel : "ch" ,
11449 maxInFlight : runtime .NumCPU (),
115- stop : make ( chan struct {}),
116- logger : queue .NewLogger (),
50+
51+ logger : queue .NewLogger (),
11752 runFunc : func (context.Context , queue.QueuedMessage ) error {
11853 return nil
11954 },
@@ -123,7 +58,12 @@ func NewWorker(opts ...Option) *Worker {
12358 // Loop through each option
12459 for _ , opt := range opts {
12560 // Call the option giving the instantiated
126- opt (w )
61+ opt (& defaultOpts )
62+ }
63+
64+ w := & Worker {
65+ opts : defaultOpts ,
66+ stop : make (chan struct {}),
12767 }
12868
12969 w .startProducerAndConsumer ()
@@ -132,19 +72,19 @@ func NewWorker(opts ...Option) *Worker {
13272}
13373
13474func (w * Worker ) startProducerAndConsumer () {
135- if w .disable {
75+ if w .opts . disable {
13676 return
13777 }
13878
13979 var err error
14080 cfg := nsq .NewConfig ()
141- cfg .MaxInFlight = w .maxInFlight
142- w .q , err = nsq .NewConsumer (w .topic , w .channel , cfg )
81+ cfg .MaxInFlight = w .opts . maxInFlight
82+ w .q , err = nsq .NewConsumer (w .opts . topic , w . opts .channel , cfg )
14383 if err != nil {
14484 panic (err )
14585 }
14686
147- w .p , err = nsq .NewProducer (w .addr , cfg )
87+ w .p , err = nsq .NewProducer (w .opts . addr , cfg )
14888 if err != nil {
14989 panic (err )
15090 }
@@ -159,7 +99,7 @@ func (w *Worker) BeforeRun() error {
15999func (w * Worker ) AfterRun () error {
160100 w .startOnce .Do (func () {
161101 time .Sleep (100 * time .Millisecond )
162- err := w .q .ConnectToNSQD (w .addr )
102+ err := w .q .ConnectToNSQD (w .opts . addr )
163103 if err != nil {
164104 panic ("Could not connect nsq server: " + err .Error ())
165105 }
@@ -192,7 +132,7 @@ func (w *Worker) handle(job queue.Job) error {
192132 }()
193133
194134 // run custom process function
195- done <- w .runFunc (ctx , job )
135+ done <- w .opts . runFunc (ctx , job )
196136 }()
197137
198138 select {
@@ -253,7 +193,7 @@ func (w *Worker) Run() error {
253193 select {
254194 case <- w .stop :
255195 case err := <- panicChan :
256- w .logger .Error (err )
196+ w .opts . logger .Error (err )
257197 }
258198
259199 // wait job completed
@@ -297,7 +237,7 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
297237 return queue .ErrQueueShutdown
298238 }
299239
300- err := w .p .Publish (w .topic , job .Bytes ())
240+ err := w .p .Publish (w .opts . topic , job .Bytes ())
301241 if err != nil {
302242 return err
303243 }
0 commit comments