// Package queue provides a high-performance, extensible message queue implementation
// supporting multiple workers, job retries, dynamic scaling, and graceful shutdown.
package queue

36import (
47 "context"
58 "errors"
@@ -13,45 +16,52 @@ import (
1316 "github.com/jpillora/backoff"
1417)
1518
// ErrQueueShutdown is returned by the enqueue path (Queue, QueueTask) once
// Shutdown has set the queue's stop flag; no further jobs are accepted after
// that point.
var ErrQueueShutdown = errors.New("queue has been closed and released")
1824
type (
	// Queue couples a core.Worker with the bookkeeping needed to run its jobs
	// concurrently: a configurable worker count, task metrics, and shutdown
	// signalling.
	Queue struct {
		sync.Mutex                  // guards workerCount; taken in Start, UpdateWorkerCount and schedule
		metric        *metric       // counters behind BusyWorkers, SubmittedTasks, SuccessTasks, etc.
		logger        Logger        // receives the retry progress messages emitted by handle
		workerCount   int64         // configured number of workers; accessed under the mutex
		routineGroup  *routineGroup // runs the queue's goroutines; Wait blocks on it
		quit          chan struct{} // shutdown signal observed by start and handle
		ready         chan struct{} // capacity 1; start blocks on it before requesting a task
		notify        chan struct{} // capacity 1; queue performs a non-blocking send after accepting a job
		worker        core.Worker   // backend whose Request and Run methods fetch and execute jobs
		stopOnce      sync.Once     // NOTE(review): presumably guards one-time teardown in Shutdown; confirm
		stopFlag      int32         // accessed atomically; 1 once Shutdown has been called
		afterFn       func()        // callback taken from the options; NOTE(review): call site not visible here
		retryInterval time.Duration // period of the ticker created in start
	}
)
3744
// ErrMissingWorker reports that a queue was configured without a worker.
// NewQueue checks the worker for nil after applying its options.
// NOTE(review): the statement returning this error is outside the visible
// part of NewQueue; confirm it is the only producer.
var ErrMissingWorker = errors.New("missing worker module")
4049
41- // NewQueue returns a Queue.
50+ // NewQueue creates and returns a new Queue instance with the provided options.
51+ // Returns an error if no worker is specified.
4252func NewQueue (opts ... Option ) (* Queue , error ) {
4353 o := NewOptions (opts ... )
4454 q := & Queue {
45- routineGroup : newRoutineGroup (),
46- quit : make (chan struct {}),
47- ready : make (chan struct {}, 1 ),
48- notify : make (chan struct {}, 1 ),
49- workerCount : o .workerCount ,
50- logger : o .logger ,
51- worker : o .worker ,
52- metric : & metric {},
53- afterFn : o .afterFn ,
54- retryInterval : o .retryInterval ,
55+ routineGroup : newRoutineGroup (), // Manages all goroutines spawned by the queue
56+ quit : make (chan struct {}), // Signals shutdown to all goroutines
57+ ready : make (chan struct {}, 1 ), // Signals when a worker is ready to process a job
58+ notify : make (chan struct {}, 1 ), // Notifies workers of new jobs
59+ workerCount : o .workerCount , // Number of worker goroutines
60+ logger : o .logger , // Logger for queue events
61+ worker : o .worker , // Worker implementation
62+ metric : & metric {}, // Metrics collector
63+ afterFn : o .afterFn , // Optional post-job callback
64+ retryInterval : o .retryInterval , // Interval for retrying job requests
5565 }
5666
5767 if q .worker == nil {
@@ -61,7 +71,8 @@ func NewQueue(opts ...Option) (*Queue, error) {
6171 return q , nil
6272}
6373
64- // Start to enable all worker
74+ // Start launches all worker goroutines and begins processing jobs.
75+ // If workerCount is zero, Start is a no-op.
6576func (q * Queue ) Start () {
6677 q .Lock ()
6778 count := q .workerCount
@@ -74,7 +85,9 @@ func (q *Queue) Start() {
7485 })
7586}
7687
77- // Shutdown stops all queues.
88+ // Shutdown initiates a graceful shutdown of the queue.
89+ // It signals all goroutines to stop, shuts down the worker, and closes the quit channel.
90+ // Shutdown is idempotent and safe to call multiple times.
7891func (q * Queue ) Shutdown () {
7992 if ! atomic .CompareAndSwapInt32 (& q .stopFlag , 0 , 1 ) {
8093 return
@@ -92,55 +105,59 @@ func (q *Queue) Shutdown() {
92105 })
93106}
94107
// Release shuts the queue down and then blocks until the routine group's
// goroutines have finished. It is Shutdown followed by Wait, in that order.
func (q *Queue) Release() {
	q.Shutdown()
	q.Wait()
}
100113
// BusyWorkers returns the busy-worker counter kept by the queue's metric.
// The counter is incremented just before a task goroutine is started and
// decremented when that task's work call finishes.
func (q *Queue) BusyWorkers() int64 {
	return q.metric.BusyWorkers()
}
105118
// SuccessTasks returns the number of tasks that finished with neither an
// error nor a recovered panic.
func (q *Queue) SuccessTasks() uint64 {
	return q.metric.SuccessTasks()
}
110123
// FailureTasks returns the failure counter kept by the queue's metric.
// NOTE(review): presumably incremented when a task returns an error or
// panics; the incrementing statement is not visible here, confirm.
func (q *Queue) FailureTasks() uint64 {
	return q.metric.FailureTasks()
}
115128
// SubmittedTasks returns the number of tasks accepted by the enqueue path.
// The counter is incremented in queue after a job passes the shutdown check.
func (q *Queue) SubmittedTasks() uint64 {
	return q.metric.SubmittedTasks()
}
120133
// CompletedTasks returns the completed-task count reported by the queue's
// metric.
// NOTE(review): presumably successes plus failures; the metric
// implementation is not visible here, confirm.
func (q *Queue) CompletedTasks() uint64 {
	return q.metric.CompletedTasks()
}
125138
// Wait blocks until every goroutine started through the queue's routine
// group has returned. It does not itself signal shutdown; see Release.
func (q *Queue) Wait() {
	q.routineGroup.Wait()
}
130143
131- // Queue to queue single job with binary
144+ // Queue enqueues a single job (core.QueuedMessage) into the queue.
145+ // Accepts job options for customization.
132146func (q * Queue ) Queue (message core.QueuedMessage , opts ... job.AllowOption ) error {
133147 data := job .NewMessage (message , opts ... )
134148
135149 return q .queue (& data )
136150}
137151
138- // QueueTask to queue single task
152+ // QueueTask enqueues a single task function into the queue.
153+ // Accepts job options for customization.
139154func (q * Queue ) QueueTask (task job.TaskFunc , opts ... job.AllowOption ) error {
140155 data := job .NewTask (task , opts ... )
141156 return q .queue (& data )
142157}
143158
159+ // queue is an internal helper to enqueue a job.Message into the worker.
160+ // It increments the submitted task metric and notifies workers if possible.
144161func (q * Queue ) queue (m * job.Message ) error {
145162 if atomic .LoadInt32 (& q .stopFlag ) == 1 {
146163 return ErrQueueShutdown
@@ -151,9 +168,8 @@ func (q *Queue) queue(m *job.Message) error {
151168 }
152169
153170 q .metric .IncSubmittedTask ()
154- // notify worker
155- // if the channel is full, it means that the worker is busy
156- // and we don't want to block the main thread
171+ // Notify a worker that a new job is available.
172+ // If the notify channel is full, the worker is busy and we avoid blocking.
157173 select {
158174 case q .notify <- struct {}{}:
159175 default :
@@ -162,10 +178,11 @@ func (q *Queue) queue(m *job.Message) error {
162178 return nil
163179}
164180
181+ // work executes a single task, handling panics and updating metrics accordingly.
182+ // After execution, it schedules the next worker if needed.
165183func (q * Queue ) work (task core.TaskMessage ) {
166184 var err error
167- // to handle panic cases from inside the worker
168- // in such case, we start a new goroutine
185+ // Defer block to handle panics, update metrics, and run afterFn callback.
169186 defer func () {
170187 q .metric .DecBusyWorker ()
171188 e := recover ()
@@ -174,7 +191,7 @@ func (q *Queue) work(task core.TaskMessage) {
174191 }
175192 q .schedule ()
176193
177- // increase success or failure number
194+ // Update success or failure metrics based on execution result.
178195 if err == nil && e == nil {
179196 q .metric .IncSuccessTask ()
180197 } else {
@@ -190,6 +207,8 @@ func (q *Queue) work(task core.TaskMessage) {
190207 }
191208}
192209
210+ // run dispatches the task to the appropriate handler based on its type.
211+ // Returns an error if the task type is invalid.
193212func (q * Queue ) run (task core.TaskMessage ) error {
194213 switch t := task .(type ) {
195214 case * job.Message :
@@ -199,8 +218,11 @@ func (q *Queue) run(task core.TaskMessage) error {
199218 }
200219}
201220
221+ // handle executes a job.Message, supporting retries, timeouts, and panic recovery.
222+ // Returns an error if the job fails or times out.
202223func (q * Queue ) handle (m * job.Message ) error {
203- // create channel with buffer size 1 to avoid goroutine leak
224+ // done: receives the result of the job execution
225+ // panicChan: receives any panic that occurs in the job goroutine
204226 done := make (chan error , 1 )
205227 panicChan := make (chan any , 1 )
206228 startTime := time .Now ()
@@ -209,18 +231,18 @@ func (q *Queue) handle(m *job.Message) error {
209231 cancel ()
210232 }()
211233
212- // run the job
234+ // Run the job in a separate goroutine to support timeout and panic recovery.
213235 go func () {
214- // handle panic issue
236+ // Defer block to catch panics and send to panicChan
215237 defer func () {
216238 if p := recover (); p != nil {
217239 panicChan <- p
218240 }
219241 }()
220242
221- // run custom process function
222243 var err error
223244
245+ // Set up backoff for retry logic
224246 b := & backoff.Backoff {
225247 Min : m .RetryMin ,
226248 Max : m .RetryMax ,
@@ -230,26 +252,28 @@ func (q *Queue) handle(m *job.Message) error {
230252 delay := m .RetryDelay
231253 loop:
232254 for {
255+ // If a custom Task function is provided, use it; otherwise, use the worker's Run method.
233256 if m .Task != nil {
234257 err = m .Task (ctx )
235258 } else {
236259 err = q .worker .Run (ctx , m )
237260 }
238261
239- // check error and retry count
262+ // If no error or no retries left, exit loop.
240263 if err == nil || m .RetryCount == 0 {
241264 break
242265 }
243266 m .RetryCount --
244267
268+ // If no fixed retry delay, use backoff.
245269 if m .RetryDelay == 0 {
246270 delay = b .Duration ()
247271 }
248272
249273 select {
250- case <- time .After (delay ): // retry delay
274+ case <- time .After (delay ): // Wait before retrying
251275 q .logger .Infof ("retry remaining times: %d, delay time: %s" , m .RetryCount , delay )
252- case <- ctx .Done (): // timeout reached
276+ case <- ctx .Done (): // Timeout reached
253277 err = ctx .Err ()
254278 break loop
255279 }
@@ -261,36 +285,36 @@ func (q *Queue) handle(m *job.Message) error {
261285 select {
262286 case p := <- panicChan :
263287 panic (p )
264- case <- ctx .Done (): // timeout reached
288+ case <- ctx .Done (): // Timeout reached
265289 return ctx .Err ()
266- case <- q .quit : // shutdown service
267- // cancel job
290+ case <- q .quit : // Queue is shutting down
291+ // Cancel job and wait for remaining time or job completion
268292 cancel ()
269-
270293 leftTime := m .Timeout - time .Since (startTime )
271- // wait job
272294 select {
273295 case <- time .After (leftTime ):
274296 return context .DeadlineExceeded
275- case err := <- done : // job finish
297+ case err := <- done : // Job finished
276298 return err
277299 case p := <- panicChan :
278300 panic (p )
279301 }
280- case err := <- done : // job finish
302+ case err := <- done : // Job finished
281303 return err
282304 }
283305}
284306
285- // UpdateWorkerCount to update worker number dynamically.
307+ // UpdateWorkerCount dynamically updates the number of worker goroutines.
308+ // Triggers scheduling to adjust to the new worker count.
286309func (q * Queue ) UpdateWorkerCount (num int64 ) {
287310 q .Lock ()
288311 q .workerCount = num
289312 q .Unlock ()
290313 q .schedule ()
291314}
292315
293- // schedule to check worker number
316+ // schedule checks if more workers can be started based on the current busy count.
317+ // If so, it signals readiness to start a new worker.
294318func (q * Queue ) schedule () {
295319 q .Lock ()
296320 defer q .Unlock ()
@@ -304,24 +328,30 @@ func (q *Queue) schedule() {
304328 }
305329}
306330
307- // start to start all worker
331+ /*
332+ start launches the main worker loop, which manages job scheduling and execution.
333+
334+ - It uses a ticker to periodically retry job requests if the queue is empty.
335+ - For each available worker slot, it requests a new task from the worker.
336+ - If a task is available, it is sent to the tasks channel and processed by a new goroutine.
337+ - The loop exits when the quit channel is closed.
338+ */
308339func (q * Queue ) start () {
309340 tasks := make (chan core.TaskMessage , 1 )
310341 ticker := time .NewTicker (q .retryInterval )
311342 defer ticker .Stop ()
312343
313344 for {
314- // check worker number
345+ // Ensure the number of busy workers does not exceed the configured worker count.
315346 q .schedule ()
316347
317348 select {
318- // wait worker ready
319- case <- q .ready :
320- case <- q .quit :
349+ case <- q .ready : // Wait for a worker slot to become available
350+ case <- q .quit : // Shutdown signal received
321351 return
322352 }
323353
324- // request task from queue in background
354+ // Request a task from the worker in a background goroutine.
325355 q .routineGroup .Run (func () {
326356 for {
327357 t , err := q .worker .Request ()
@@ -359,7 +389,7 @@ func (q *Queue) start() {
359389 return
360390 }
361391
362- // start new task
392+ // Start processing the new task in a separate goroutine.
363393 q .metric .IncBusyWorker ()
364394 q .routineGroup .Run (func () {
365395 q .work (task )
0 commit comments