Skip to content

Commit 94ff1c3

Browse files
committed
[pubsub] fix lint issues and improve errors model
1 parent eb5cd8b commit 94ff1c3

File tree

4 files changed

+34
-25
lines changed

4 files changed

+34
-25
lines changed

pkg/pubsub/memory.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"github.com/google/uuid"
88
)
99

10-
type memoryPubSub struct {
10+
type MemoryPubSub struct {
1111
bufferSize uint
1212

1313
wg sync.WaitGroup
@@ -21,15 +21,17 @@ type subscriber struct {
2121
ctx context.Context
2222
}
2323

24-
func NewMemory(opts ...Option) *memoryPubSub {
24+
func NewMemory(opts ...Option) *MemoryPubSub {
2525
o := options{
2626
bufferSize: 0,
2727
}
2828
o.apply(opts...)
2929

30-
return &memoryPubSub{
30+
return &MemoryPubSub{
3131
bufferSize: o.bufferSize,
3232

33+
wg: sync.WaitGroup{},
34+
mu: sync.RWMutex{},
3335
topics: make(map[string]map[string]subscriber),
3436
closeCh: make(chan struct{}),
3537
}
@@ -38,7 +40,7 @@ func NewMemory(opts ...Option) *memoryPubSub {
3840
// Publish sends a message to all subscribers of the given topic.
3941
// This method blocks until all subscribers have received the message
4042
// or until ctx is cancelled or the pubsub instance is closed.
41-
func (m *memoryPubSub) Publish(ctx context.Context, topic string, data []byte) error {
43+
func (m *MemoryPubSub) Publish(ctx context.Context, topic string, data []byte) error {
4244
select {
4345
case <-m.closeCh:
4446
return ErrPubSubClosed
@@ -82,7 +84,7 @@ func (m *memoryPubSub) Publish(ctx context.Context, topic string, data []byte) e
8284
return nil
8385
}
8486

85-
func (m *memoryPubSub) Subscribe(ctx context.Context, topic string) (*Subscription, error) {
87+
func (m *MemoryPubSub) Subscribe(ctx context.Context, topic string) (*Subscription, error) {
8688
select {
8789
case <-m.closeCh:
8890
return nil, ErrPubSubClosed
@@ -116,7 +118,7 @@ func (m *memoryPubSub) Subscribe(ctx context.Context, topic string) (*Subscripti
116118
return &Subscription{id: id, ctx: subCtx, cancel: cancel, ch: ch}, nil
117119
}
118120

119-
func (m *memoryPubSub) subscribe(id, topic string, sub subscriber) {
121+
func (m *MemoryPubSub) subscribe(id, topic string, sub subscriber) {
120122
m.mu.Lock()
121123
defer m.mu.Unlock()
122124

@@ -128,7 +130,7 @@ func (m *memoryPubSub) subscribe(id, topic string, sub subscriber) {
128130
subscriptions[id] = sub
129131
}
130132

131-
func (m *memoryPubSub) unsubscribe(id, topic string) {
133+
func (m *MemoryPubSub) unsubscribe(id, topic string) {
132134
m.mu.Lock()
133135
defer m.mu.Unlock()
134136

@@ -142,7 +144,7 @@ func (m *memoryPubSub) unsubscribe(id, topic string) {
142144
}
143145
}
144146

145-
func (m *memoryPubSub) Close() error {
147+
func (m *MemoryPubSub) Close() error {
146148
select {
147149
case <-m.closeCh:
148150
return nil
@@ -155,4 +157,4 @@ func (m *memoryPubSub) Close() error {
155157
return nil
156158
}
157159

158-
var _ PubSub = (*memoryPubSub)(nil)
160+
var _ PubSub = (*MemoryPubSub)(nil)

pkg/pubsub/options.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ type options struct {
66
bufferSize uint
77
}
88

9-
func (o *options) apply(opts ...Option) *options {
9+
func (o *options) apply(opts ...Option) {
1010
for _, opt := range opts {
1111
opt(o)
1212
}
13-
14-
return o
1513
}
1614

1715
func WithBufferSize(bufferSize uint) Option {

pkg/pubsub/pubsub.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import (
66
)
77

88
var (
9-
ErrPubSubClosed = errors.New("pubsub is closed")
10-
ErrInvalidTopic = errors.New("invalid topic name")
9+
ErrInvalidConfig = errors.New("invalid config")
10+
ErrPubSubClosed = errors.New("pubsub is closed")
11+
ErrInvalidTopic = errors.New("invalid topic name")
1112
)
1213

1314
type Message struct {

pkg/pubsub/redis.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type RedisConfig struct {
2626
Prefix string
2727
}
2828

29-
type redisPubSub struct {
29+
type RedisPubSub struct {
3030
prefix string
3131
bufferSize uint
3232

@@ -39,13 +39,13 @@ type redisPubSub struct {
3939
closeCh chan struct{}
4040
}
4141

42-
func NewRedis(config RedisConfig, opts ...Option) (*redisPubSub, error) {
42+
func NewRedis(config RedisConfig, opts ...Option) (*RedisPubSub, error) {
4343
if config.Prefix != "" && !strings.HasSuffix(config.Prefix, ":") {
4444
config.Prefix += ":"
4545
}
4646

4747
if config.Client == nil && config.URL == "" {
48-
return nil, fmt.Errorf("no redis client or url provided")
48+
return nil, fmt.Errorf("%w: no redis client or url provided", ErrInvalidConfig)
4949
}
5050

5151
client := config.Client
@@ -63,19 +63,21 @@ func NewRedis(config RedisConfig, opts ...Option) (*redisPubSub, error) {
6363
}
6464
o.apply(opts...)
6565

66-
return &redisPubSub{
66+
return &RedisPubSub{
6767
prefix: config.Prefix,
6868
bufferSize: o.bufferSize,
6969

7070
client: client,
7171
ownedClient: config.Client == nil,
7272

73+
wg: sync.WaitGroup{},
74+
mu: sync.Mutex{},
7375
subscribers: make(map[string]context.CancelFunc),
7476
closeCh: make(chan struct{}),
7577
}, nil
7678
}
7779

78-
func (r *redisPubSub) Publish(ctx context.Context, topic string, data []byte) error {
80+
func (r *RedisPubSub) Publish(ctx context.Context, topic string, data []byte) error {
7981
select {
8082
case <-r.closeCh:
8183
return ErrPubSubClosed
@@ -86,10 +88,14 @@ func (r *redisPubSub) Publish(ctx context.Context, topic string, data []byte) er
8688
return ErrInvalidTopic
8789
}
8890

89-
return r.client.Publish(ctx, r.prefix+topic, data).Err()
91+
if err := r.client.Publish(ctx, r.prefix+topic, data).Err(); err != nil {
92+
return fmt.Errorf("failed to publish message: %w", err)
93+
}
94+
95+
return nil
9096
}
9197

92-
func (r *redisPubSub) Subscribe(ctx context.Context, topic string) (*Subscription, error) {
98+
func (r *RedisPubSub) Subscribe(ctx context.Context, topic string) (*Subscription, error) {
9399
select {
94100
case <-r.closeCh:
95101
return nil, ErrPubSubClosed
@@ -104,7 +110,7 @@ func (r *redisPubSub) Subscribe(ctx context.Context, topic string) (*Subscriptio
104110
_, err := ps.Receive(ctx)
105111
if err != nil {
106112
closeErr := ps.Close()
107-
return nil, errors.Join(fmt.Errorf("can't subscribe: %w", err), closeErr)
113+
return nil, errors.Join(fmt.Errorf("failed to subscribe: %w", err), closeErr)
108114
}
109115

110116
id := uuid.NewString()
@@ -160,7 +166,7 @@ func (r *redisPubSub) Subscribe(ctx context.Context, topic string) (*Subscriptio
160166
return &Subscription{id: id, ctx: subCtx, cancel: cancel, ch: ch}, nil
161167
}
162168

163-
func (r *redisPubSub) Close() error {
169+
func (r *RedisPubSub) Close() error {
164170
select {
165171
case <-r.closeCh:
166172
return nil
@@ -171,10 +177,12 @@ func (r *redisPubSub) Close() error {
171177
r.wg.Wait()
172178

173179
if r.ownedClient {
174-
return r.client.Close()
180+
if err := r.client.Close(); err != nil {
181+
return fmt.Errorf("failed to close redis client: %w", err)
182+
}
175183
}
176184

177185
return nil
178186
}
179187

180-
var _ PubSub = (*redisPubSub)(nil)
188+
var _ PubSub = (*RedisPubSub)(nil)

0 commit comments

Comments
 (0)