11package queue
22
3- import (
4- "context"
5- "errors"
6- "time"
7- )
8-
93// Worker interface
104type Worker interface {
115 // BeforeRun is called before starting the worker
@@ -28,83 +22,3 @@ type Worker interface {
2822type QueuedMessage interface {
2923 Bytes () []byte
3024}
31-
32- var (
33- _ Worker = (* emptyWorker )(nil )
34- _ Worker = (* messageWorker )(nil )
35- )
36-
37- type emptyWorker struct {}
38-
39- func (w * emptyWorker ) BeforeRun () error { return nil }
40- func (w * emptyWorker ) AfterRun () error { return nil }
41- func (w * emptyWorker ) Run () error { return nil }
42- func (w * emptyWorker ) Shutdown () error { return nil }
43- func (w * emptyWorker ) Queue (job QueuedMessage ) error { return nil }
44- func (w * emptyWorker ) Capacity () int { return 0 }
45- func (w * emptyWorker ) Usage () int { return 0 }
46-
47- type messageWorker struct {
48- messages chan QueuedMessage
49- }
50-
51- func (w * messageWorker ) BeforeRun () error { return nil }
52- func (w * messageWorker ) AfterRun () error { return nil }
53- func (w * messageWorker ) Run () error {
54- for msg := range w .messages {
55- if string (msg .Bytes ()) == "panic" {
56- panic ("show panic" )
57- }
58- time .Sleep (20 * time .Millisecond )
59- }
60- return nil
61- }
62-
63- func (w * messageWorker ) Shutdown () error {
64- close (w .messages )
65- return nil
66- }
67-
68- func (w * messageWorker ) Queue (job QueuedMessage ) error {
69- select {
70- case w .messages <- job :
71- return nil
72- default :
73- return errors .New ("max capacity reached" )
74- }
75- }
76- func (w * messageWorker ) Capacity () int { return cap (w .messages ) }
77- func (w * messageWorker ) Usage () int { return len (w .messages ) }
78-
79- type taskWorker struct {
80- messages chan QueuedMessage
81- }
82-
83- func (w * taskWorker ) BeforeRun () error { return nil }
84- func (w * taskWorker ) AfterRun () error { return nil }
85- func (w * taskWorker ) Run () error {
86- for msg := range w .messages {
87- if v , ok := msg .(Job ); ok {
88- if v .Task != nil {
89- _ = v .Task (context .Background ())
90- }
91- }
92- }
93- return nil
94- }
95-
96- func (w * taskWorker ) Shutdown () error {
97- close (w .messages )
98- return nil
99- }
100-
101- func (w * taskWorker ) Queue (job QueuedMessage ) error {
102- select {
103- case w .messages <- job :
104- return nil
105- default :
106- return errors .New ("max capacity reached" )
107- }
108- }
109- func (w * taskWorker ) Capacity () int { return cap (w .messages ) }
110- func (w * taskWorker ) Usage () int { return len (w .messages ) }
0 commit comments