File tree Expand file tree Collapse file tree 7 files changed +63
-11
lines changed Expand file tree Collapse file tree 7 files changed +63
-11
lines changed Original file line number Diff line number Diff line change @@ -15,12 +15,25 @@ var errMaxCapacity = errors.New("max capacity reached")
1515
1616// Worker for simple queue using channel
1717type Consumer struct {
18- taskQueue chan QueuedMessage
19- runFunc func (context.Context , QueuedMessage ) error
20- stop chan struct {}
21- logger Logger
22- stopOnce sync.Once
23- stopFlag int32
18+ taskQueue chan QueuedMessage
19+ runFunc func (context.Context , QueuedMessage ) error
20+ stop chan struct {}
21+ logger Logger
22+ stopOnce sync.Once
23+ stopFlag int32
24+ busyWorkers uint64
25+ }
26+
27+ func (s * Consumer ) incBusyWorker () {
28+ atomic .AddUint64 (& s .busyWorkers , 1 )
29+ }
30+
31+ func (s * Consumer ) decBusyWorker () {
32+ atomic .AddUint64 (& s .busyWorkers , ^ uint64 (0 ))
33+ }
34+
35+ func (s * Consumer ) BusyWorkers () uint64 {
36+ return atomic .LoadUint64 (& s .busyWorkers )
2437}
2538
2639// BeforeRun run script before start worker
@@ -39,7 +52,11 @@ func (s *Consumer) handle(job Job) error {
3952 panicChan := make (chan interface {}, 1 )
4053 startTime := time .Now ()
4154 ctx , cancel := context .WithTimeout (context .Background (), job .Timeout )
42- defer cancel ()
55+ s .incBusyWorker ()
56+ defer func () {
57+ cancel ()
58+ s .decBusyWorker ()
59+ }()
4360
4461 // run the job
4562 go func () {
Original file line number Diff line number Diff line change @@ -357,3 +357,30 @@ func TestTaskJobComplete(t *testing.T) {
357357 }
358358 assert .Equal (t , context .DeadlineExceeded , w .handle (job ))
359359}
360+
361+ func TestBusyWorkerCount (t * testing.T ) {
362+ job := Job {
363+ Timeout : 200 * time .Millisecond ,
364+ Task : func (ctx context.Context ) error {
365+ time .Sleep (100 * time .Millisecond )
366+ return nil
367+ },
368+ }
369+
370+ w := NewConsumer ()
371+
372+ assert .Equal (t , uint64 (0 ), w .BusyWorkers ())
373+ go func () {
374+ assert .NoError (t , w .handle (job ))
375+ }()
376+ go func () {
377+ assert .NoError (t , w .handle (job ))
378+ }()
379+
380+ time .Sleep (50 * time .Millisecond )
381+ assert .Equal (t , uint64 (2 ), w .BusyWorkers ())
382+ time .Sleep (100 * time .Millisecond )
383+ assert .Equal (t , uint64 (0 ), w .BusyWorkers ())
384+
385+ assert .NoError (t , w .Shutdown ())
386+ }
Original file line number Diff line number Diff line change @@ -29,6 +29,7 @@ func TestNewQueue(t *testing.T) {
2929 assert .NotNil (t , q )
3030
3131 q .Start ()
32+ assert .Equal (t , uint64 (0 ), w .BusyWorkers ())
3233 q .Shutdown ()
3334 q .Wait ()
3435}
@@ -48,6 +49,7 @@ func TestWorkerNum(t *testing.T) {
4849 q .Start ()
4950 time .Sleep (20 * time .Millisecond )
5051 assert .Equal (t , 4 , q .Workers ())
52+ assert .Equal (t , uint64 (0 ), w .BusyWorkers ())
5153 q .Shutdown ()
5254 q .Wait ()
5355}
@@ -203,6 +205,7 @@ func TestQueueTaskJob(t *testing.T) {
203205 return nil
204206 }))
205207 time .Sleep (50 * time .Millisecond )
208+ assert .Equal (t , uint64 (0 ), w .BusyWorkers ())
206209 q .Shutdown ()
207210 assert .Equal (t , ErrQueueShutdown , q .QueueTask (func (ctx context.Context ) error {
208211 return nil
Original file line number Diff line number Diff line change @@ -16,6 +16,8 @@ type Worker interface {
1616 Capacity () int
1717 // Usage is how many message in queue
1818 Usage () int
19+ // BusyWorkers return count of busy worker currently
20+ BusyWorkers () uint64
1921}
2022
2123// QueuedMessage ...
Original file line number Diff line number Diff line change @@ -12,3 +12,4 @@ func (w *emptyWorker) Shutdown() error { return nil }
1212func (w * emptyWorker ) Queue (job QueuedMessage ) error { return nil }
1313func (w * emptyWorker ) Capacity () int { return 0 }
1414func (w * emptyWorker ) Usage () int { return 0 }
15+ func (w * emptyWorker ) BusyWorkers () uint64 { return uint64 (0 ) }
Original file line number Diff line number Diff line change @@ -37,5 +37,6 @@ func (w *messageWorker) Queue(job QueuedMessage) error {
3737 return errors .New ("max capacity reached" )
3838 }
3939}
40- func (w * messageWorker ) Capacity () int { return cap (w .messages ) }
41- func (w * messageWorker ) Usage () int { return len (w .messages ) }
40+ func (w * messageWorker ) Capacity () int { return cap (w .messages ) }
41+ func (w * messageWorker ) Usage () int { return len (w .messages ) }
42+ func (w * messageWorker ) BusyWorkers () uint64 { return uint64 (0 ) }
Original file line number Diff line number Diff line change @@ -38,5 +38,6 @@ func (w *taskWorker) Queue(job QueuedMessage) error {
3838 return errors .New ("max capacity reached" )
3939 }
4040}
41- func (w * taskWorker ) Capacity () int { return cap (w .messages ) }
42- func (w * taskWorker ) Usage () int { return len (w .messages ) }
41+ func (w * taskWorker ) Capacity () int { return cap (w .messages ) }
42+ func (w * taskWorker ) Usage () int { return len (w .messages ) }
43+ func (w * taskWorker ) BusyWorkers () uint64 { return uint64 (0 ) }
You can’t perform that action at this time.
0 commit comments