1- package simple
1+ package queue
22
33import (
44 "context"
@@ -7,40 +7,38 @@ import (
77 "sync"
88 "sync/atomic"
99 "time"
10-
11- "github.com/golang-queue/queue"
1210)
1311
1412const defaultQueueSize = 4096
1513
16- var _ queue. Worker = (* Worker )(nil )
14+ var _ Worker = (* Consumer )(nil )
1715
18- // Option for queue system
19- type Option func (* Worker )
16+ // ConsumerOption for queue system
17+ type ConsumerOption func (* Consumer )
2018
2119var errMaxCapacity = errors .New ("max capacity reached" )
2220
2321// Worker for simple queue using channel
24- type Worker struct {
25- taskQueue chan queue. QueuedMessage
26- runFunc func (context.Context , queue. QueuedMessage ) error
22+ type Consumer struct {
23+ taskQueue chan QueuedMessage
24+ runFunc func (context.Context , QueuedMessage ) error
2725 stop chan struct {}
28- logger queue. Logger
26+ logger Logger
2927 stopOnce sync.Once
3028 stopFlag int32
3129}
3230
3331// BeforeRun run script before start worker
34- func (s * Worker ) BeforeRun () error {
32+ func (s * Consumer ) BeforeRun () error {
3533 return nil
3634}
3735
3836// AfterRun run script after start worker
39- func (s * Worker ) AfterRun () error {
37+ func (s * Consumer ) AfterRun () error {
4038 return nil
4139}
4240
43- func (s * Worker ) handle (job queue. Job ) error {
41+ func (s * Consumer ) handle (job Job ) error {
4442 // create channel with buffer size 1 to avoid goroutine leak
4543 done := make (chan error , 1 )
4644 panicChan := make (chan interface {}, 1 )
@@ -90,18 +88,18 @@ func (s *Worker) handle(job queue.Job) error {
9088}
9189
9290// Run start the worker
93- func (s * Worker ) Run () error {
91+ func (s * Consumer ) Run () error {
9492 // check queue status
9593 select {
9694 case <- s .stop :
97- return queue . ErrQueueShutdown
95+ return ErrQueueShutdown
9896 default :
9997 }
10098
10199 for task := range s .taskQueue {
102- var data queue. Job
100+ var data Job
103101 _ = json .Unmarshal (task .Bytes (), & data )
104- if v , ok := task .(queue. Job ); ok {
102+ if v , ok := task .(Job ); ok {
105103 if v .Task != nil {
106104 data .Task = v .Task
107105 }
@@ -114,9 +112,9 @@ func (s *Worker) Run() error {
114112}
115113
116114// Shutdown worker
117- func (s * Worker ) Shutdown () error {
115+ func (s * Consumer ) Shutdown () error {
118116 if ! atomic .CompareAndSwapInt32 (& s .stopFlag , 0 , 1 ) {
119- return queue . ErrQueueShutdown
117+ return ErrQueueShutdown
120118 }
121119
122120 s .stopOnce .Do (func () {
@@ -127,19 +125,19 @@ func (s *Worker) Shutdown() error {
127125}
128126
129127// Capacity for channel
130- func (s * Worker ) Capacity () int {
128+ func (s * Consumer ) Capacity () int {
131129 return cap (s .taskQueue )
132130}
133131
134132// Usage for count of channel usage
135- func (s * Worker ) Usage () int {
133+ func (s * Consumer ) Usage () int {
136134 return len (s .taskQueue )
137135}
138136
139137// Queue send notification to queue
140- func (s * Worker ) Queue (job queue. QueuedMessage ) error {
138+ func (s * Consumer ) Queue (job QueuedMessage ) error {
141139 if atomic .LoadInt32 (& s .stopFlag ) == 1 {
142- return queue . ErrQueueShutdown
140+ return ErrQueueShutdown
143141 }
144142
145143 select {
@@ -151,33 +149,33 @@ func (s *Worker) Queue(job queue.QueuedMessage) error {
151149}
152150
153151// WithQueueNum setup the capcity of queue
154- func WithQueueNum (num int ) Option {
155- return func (w * Worker ) {
156- w .taskQueue = make (chan queue. QueuedMessage , num )
152+ func WithConsumerQueueNum (num int ) ConsumerOption {
153+ return func (w * Consumer ) {
154+ w .taskQueue = make (chan QueuedMessage , num )
157155 }
158156}
159157
160158// WithRunFunc setup the run func of queue
161- func WithRunFunc (fn func (context.Context , queue. QueuedMessage ) error ) Option {
162- return func (w * Worker ) {
159+ func WithConsumerRunFunc (fn func (context.Context , QueuedMessage ) error ) ConsumerOption {
160+ return func (w * Consumer ) {
163161 w .runFunc = fn
164162 }
165163}
166164
167- // WithLogger set custom logger
168- func WithLogger (l queue. Logger ) Option {
169- return func (w * Worker ) {
165+ // WithConsumerLogger set custom logger
166+ func WithConsumerLogger (l Logger ) ConsumerOption {
167+ return func (w * Consumer ) {
170168 w .logger = l
171169 }
172170}
173171
174- // NewWorker for struc
175- func NewWorker (opts ... Option ) * Worker {
176- w := & Worker {
177- taskQueue : make (chan queue. QueuedMessage , defaultQueueSize ),
172+ // NewConsumer for struc
173+ func NewConsumer (opts ... ConsumerOption ) * Consumer {
174+ w := & Consumer {
175+ taskQueue : make (chan QueuedMessage , defaultQueueSize ),
178176 stop : make (chan struct {}),
179- logger : queue . NewLogger (),
180- runFunc : func (context.Context , queue. QueuedMessage ) error {
177+ logger : NewLogger (),
178+ runFunc : func (context.Context , QueuedMessage ) error {
181179 return nil
182180 },
183181 }
0 commit comments