File tree Expand file tree Collapse file tree 3 files changed +51
-10
lines changed Expand file tree Collapse file tree 3 files changed +51
-10
lines changed Original file line number Diff line number Diff line change @@ -15,25 +15,25 @@ var errMaxCapacity = errors.New("max capacity reached")
1515
1616// Worker for simple queue using channel
1717type Consumer struct {
18- taskQueue chan QueuedMessage
19- runFunc func (context.Context , QueuedMessage ) error
20- stop chan struct {}
21- logger Logger
22- stopOnce sync.Once
23- stopFlag int32
24- busyWorkers uint64
18+ taskQueue chan QueuedMessage
19+ runFunc func (context.Context , QueuedMessage ) error
20+ stop chan struct {}
21+ logger Logger
22+ stopOnce sync.Once
23+ stopFlag int32
24+ metric Metric
2525}
2626
2727func (s * Consumer ) incBusyWorker () {
28- atomic . AddUint64 ( & s . busyWorkers , 1 )
28+ s . metric . IncBusyWorker ( )
2929}
3030
3131func (s * Consumer ) decBusyWorker () {
32- atomic . AddUint64 ( & s . busyWorkers , ^ uint64 ( 0 ) )
32+ s . metric . DecBusyWorker ( )
3333}
3434
3535func (s * Consumer ) BusyWorkers () uint64 {
36- return atomic . LoadUint64 ( & s . busyWorkers )
36+ return s . metric . BusyWorkers ( )
3737}
3838
3939// BeforeRun run script before start worker
@@ -168,6 +168,7 @@ func NewConsumer(opts ...Option) *Consumer {
168168 stop : make (chan struct {}),
169169 logger : o .logger ,
170170 runFunc : o .fn ,
171+ metric : o .metric ,
171172 }
172173
173174 return w
Original file line number Diff line number Diff line change 1+ package queue
2+
3+ import "sync/atomic"
4+
5+ // Metric interface
6+ type Metric interface {
7+ IncBusyWorker ()
8+ DecBusyWorker ()
9+ BusyWorkers () uint64
10+ }
11+
12+ type metric struct {
13+ busyWorkers uint64
14+ }
15+
16+ func newMetric () Metric {
17+ return & metric {}
18+ }
19+
20+ func (m * metric ) IncBusyWorker () {
21+ atomic .AddUint64 (& m .busyWorkers , 1 )
22+ }
23+
24+ func (m * metric ) DecBusyWorker () {
25+ atomic .AddUint64 (& m .busyWorkers , ^ uint64 (0 ))
26+ }
27+
28+ func (m * metric ) BusyWorkers () uint64 {
29+ return atomic .LoadUint64 (& m .busyWorkers )
30+ }
Original file line number Diff line number Diff line change 1212 defaultTimeout = 60 * time .Minute
1313 defaultNewLogger = NewLogger ()
1414 defaultFn = func (context.Context , QueuedMessage ) error { return nil }
15+ defaultMetric = newMetric ()
1516)
1617
1718// Option for queue system
@@ -38,6 +39,13 @@ func WithLogger(l Logger) Option {
3839 }
3940}
4041
42+ // WithMetric set custom Metric
43+ func WithMetric (m Metric ) Option {
44+ return func (q * Options ) {
45+ q .metric = m
46+ }
47+ }
48+
4149// WithWorker set custom worker
4250func WithWorker (w Worker ) Option {
4351 return func (q * Options ) {
@@ -66,6 +74,7 @@ type Options struct {
6674 queueSize int
6775 worker Worker
6876 fn func (context.Context , QueuedMessage ) error
77+ metric Metric
6978}
7079
7180func NewOptions (opts ... Option ) * Options {
@@ -76,6 +85,7 @@ func NewOptions(opts ...Option) *Options {
7685 logger : defaultNewLogger ,
7786 worker : nil ,
7887 fn : defaultFn ,
88+ metric : defaultMetric ,
7989 }
8090
8191 // Loop through each option
You can’t perform that action at this time.
0 commit comments