From 71baee5d571e4ba6fb69bca63606deb1f1e58565 Mon Sep 17 00:00:00 2001 From: Dragos Daian Date: Thu, 27 Feb 2025 22:58:36 +0200 Subject: [PATCH] notify mechanism fix for new task added --- queue.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/queue.go b/queue.go index a502821..a1f008e 100644 --- a/queue.go +++ b/queue.go @@ -26,6 +26,7 @@ type ( routineGroup *routineGroup quit chan struct{} ready chan struct{} + newTaskAdded chan struct{} worker core.Worker stopOnce sync.Once stopFlag int32 @@ -43,6 +44,7 @@ func NewQueue(opts ...Option) (*Queue, error) { routineGroup: newRoutineGroup(), quit: make(chan struct{}), ready: make(chan struct{}, 1), + newTaskAdded: make(chan struct{}), workerCount: o.workerCount, logger: o.logger, worker: o.worker, @@ -147,6 +149,7 @@ func (q *Queue) queue(m *job.Message) error { } q.metric.IncSubmittedTask() + q.newTaskAdded <- struct{}{} return nil } @@ -320,8 +323,8 @@ func (q *Queue) start() { close(tasks) return } - case <-time.After(time.Second): - // sleep 1 second to fetch new task + case <-q.newTaskAdded: + // New task added } } }