11package queue
22
33import (
4+ "context"
45 "testing"
56 "time"
67
@@ -33,7 +34,7 @@ func TestNewQueue(t *testing.T) {
3334}
3435
3536func TestWorkerNum (t * testing.T ) {
36- w := & queueWorker {
37+ w := & messageWorker {
3738 messages : make (chan QueuedMessage , 100 ),
3839 }
3940 q , err := NewQueue (
@@ -52,7 +53,7 @@ func TestWorkerNum(t *testing.T) {
5253}
5354
5455func TestShtdonwOnce (t * testing.T ) {
55- w := & queueWorker {
56+ w := & messageWorker {
5657 messages : make (chan QueuedMessage , 100 ),
5758 }
5859 q , err := NewQueue (
@@ -76,7 +77,7 @@ func TestWorkerStatus(t *testing.T) {
7677 m := mockMessage {
7778 message : "foobar" ,
7879 }
79- w := & queueWorker {
80+ w := & messageWorker {
8081 messages : make (chan QueuedMessage , 100 ),
8182 }
8283 q , err := NewQueue (
@@ -99,7 +100,7 @@ func TestWorkerStatus(t *testing.T) {
99100}
100101
101102func TestWorkerPanic (t * testing.T ) {
102- w := & queueWorker {
103+ w := & messageWorker {
103104 messages : make (chan QueuedMessage , 10 ),
104105 }
105106 q , err := NewQueue (
@@ -127,7 +128,7 @@ func TestWorkerPanic(t *testing.T) {
127128}
128129
129130func TestCapacityReached (t * testing.T ) {
130- w := & queueWorker {
131+ w := & messageWorker {
131132 messages : make (chan QueuedMessage , 1 ),
132133 }
133134 q , err := NewQueue (
@@ -148,7 +149,7 @@ func TestCapacityReached(t *testing.T) {
148149}
149150
150151func TestCloseQueueAfterShutdown (t * testing.T ) {
151- w := & queueWorker {
152+ w := & messageWorker {
152153 messages : make (chan QueuedMessage , 10 ),
153154 }
154155 q , err := NewQueue (
@@ -174,3 +175,37 @@ func TestCloseQueueAfterShutdown(t *testing.T) {
174175 assert .Error (t , err )
175176 assert .Equal (t , ErrQueueShutdown , err )
176177}
178+
179+ func TestQueueTaskJob (t * testing.T ) {
180+ w := & taskWorker {
181+ messages : make (chan QueuedMessage , 10 ),
182+ }
183+ q , err := NewQueue (
184+ WithWorker (w ),
185+ WithWorkerCount (5 ),
186+ WithLogger (NewLogger ()),
187+ )
188+ assert .NoError (t , err )
189+ assert .NotNil (t , q )
190+ q .Start ()
191+ assert .NoError (t , q .QueueTask (func (ctx context.Context ) error {
192+ time .Sleep (120 * time .Millisecond )
193+ q .logger .Info ("Add new task 1" )
194+ return nil
195+ }))
196+ assert .NoError (t , q .QueueTask (func (ctx context.Context ) error {
197+ time .Sleep (100 * time .Millisecond )
198+ q .logger .Info ("Add new task 2" )
199+ return nil
200+ }))
201+ assert .NoError (t , q .QueueTaskWithTimeout (50 * time .Millisecond , func (ctx context.Context ) error {
202+ time .Sleep (80 * time .Millisecond )
203+ return nil
204+ }))
205+ time .Sleep (50 * time .Millisecond )
206+ q .Shutdown ()
207+ assert .Equal (t , ErrQueueShutdown , q .QueueTask (func (ctx context.Context ) error {
208+ return nil
209+ }))
210+ q .Wait ()
211+ }
0 commit comments