File tree Expand file tree Collapse file tree 2 files changed +36
-0
lines changed Expand file tree Collapse file tree 2 files changed +36
-0
lines changed Original file line number Diff line number Diff line change @@ -128,6 +128,10 @@ func (q *Queue) Wait() {
128128
129129// Queue to queue all job
130130func (q * Queue ) Queue (job QueuedMessage ) error {
131+ if atomic .LoadInt32 (& q .stopFlag ) == 1 {
132+ return ErrQueueShutdown
133+ }
134+
131135 return q .worker .Queue (Job {
132136 Timeout : q .timeout ,
133137 Body : job .Bytes (),
@@ -136,6 +140,10 @@ func (q *Queue) Queue(job QueuedMessage) error {
136140
137141// Queue to queue all job
138142func (q * Queue ) QueueWithTimeout (timeout time.Duration , job QueuedMessage ) error {
143+ if atomic .LoadInt32 (& q .stopFlag ) == 1 {
144+ return ErrQueueShutdown
145+ }
146+
139147 return q .worker .Queue (Job {
140148 Timeout : timeout ,
141149 Body : job .Bytes (),
Original file line number Diff line number Diff line change @@ -146,3 +146,31 @@ func TestCapacityReached(t *testing.T) {
146146 message : "foobar" ,
147147 }))
148148}
149+
150+ func TestCloseQueueAfterShutdown (t * testing.T ) {
151+ w := & queueWorker {
152+ messages : make (chan QueuedMessage , 10 ),
153+ }
154+ q , err := NewQueue (
155+ WithWorker (w ),
156+ WithWorkerCount (5 ),
157+ WithLogger (NewEmptyLogger ()),
158+ )
159+ assert .NoError (t , err )
160+ assert .NotNil (t , q )
161+
162+ assert .NoError (t , q .Queue (mockMessage {
163+ message : "foobar" ,
164+ }))
165+ q .Shutdown ()
166+ err = q .Queue (mockMessage {
167+ message : "foobar" ,
168+ })
169+ assert .Error (t , err )
170+ assert .Equal (t , ErrQueueShutdown , err )
171+ err = q .QueueWithTimeout (10 * time .Millisecond , mockMessage {
172+ message : "foobar" ,
173+ })
174+ assert .Error (t , err )
175+ assert .Equal (t , ErrQueueShutdown , err )
176+ }
You can’t perform that action at this time.
0 commit comments