Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 1a6169f

Browse files
committed
feat(errors): add ErrNilMessage
1 parent 2189c2b commit 1a6169f

File tree

5 files changed

+28
-11
lines changed

5 files changed

+28
-11
lines changed

src/elasticsearch/elasticsearch.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
8989
return &InsertResponse{AlreadyExists: nil, Retry: records, Backoff: true}, nil
9090
}
9191
if err != nil {
92-
level.Debug(d.logger).Log("message", "something went wrong with elasticsearch", "err", err)
92+
_ = level.Debug(d.logger).Log("message", "something went wrong with elasticsearch", "err", err)
9393
return nil, err
9494
}
9595
if res.Errors {
@@ -113,19 +113,18 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
113113
}
114114
for _, f := range failed {
115115
if f.Status == http.StatusBadRequest {
116-
level.Debug(d.logger).Log("message", "elasticsearch bad requests", "err", f)
116+
_ = level.Debug(d.logger).Log("message", "elasticsearch bad requests", "err", f)
117117
d.metricsPublisher.ElasticsearchBadRequests(1)
118118
continue
119119
}
120120
if f.Status == http.StatusConflict {
121-
level.Debug(d.logger).Log("message", "elasticsearch conflicts", "err", f)
121+
_ = level.Debug(d.logger).Log("message", "elasticsearch conflicts", "err", f)
122122
d.metricsPublisher.ElasticsearchConflicts(1)
123123
continue
124124
}
125125
retry = append(retry, recordMap[f.Id])
126126
if f.Status == http.StatusTooManyRequests {
127127
//es is overloaded, backoff
128-
level.Debug(d.logger).Log("message", "es is overloaded, backoff")
129128
overloaded = true
130129
}
131130
}

src/errors/ErrNilMessage.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package errors
2+
3+
type ErrNilMessage struct {}
4+
5+
func (err *ErrNilMessage) Error() string {
6+
return "message value is nil"
7+
}

src/kafka/consumer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"context"
55
"os"
6+
"errors"
67

78
"time"
89

@@ -13,6 +14,7 @@ import (
1314
"github.com/go-kit/kit/log/level"
1415
"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
1516
"github.com/inloco/kafka-elasticsearch-injector/src/models"
17+
e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
1618
)
1719

1820
type Notification int32
@@ -144,6 +146,10 @@ func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications c
144146
for _, msg := range buf {
145147
req, err := k.consumer.Decoder(nil, msg)
146148
if err != nil {
149+
if errors.Is(err, e.ErrNilMessage) {
150+
continue
151+
}
152+
147153
level.Error(k.consumer.Logger).Log(
148154
"message", "Error decoding message",
149155
"err", err.Error(),

src/kafka/encode_decode.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/Shopify/sarama"
1414
"github.com/linkedin/goavro/v2"
1515
"github.com/inloco/kafka-elasticsearch-injector/src/models"
16+
e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
1617
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
1718
)
1819

@@ -39,13 +40,7 @@ func (d *Decoder) DeserializerFor(recordType string) DecodeMessageFunc {
3940

4041
func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) {
4142
if msg.Value == nil {
42-
return &models.Record{
43-
Topic: msg.Topic,
44-
Partition: msg.Partition,
45-
Offset: msg.Offset,
46-
Timestamp: msg.Timestamp,
47-
Json: nil,
48-
}, nil
43+
return nil, new(e.ErrNilMessage)
4944
}
5045

5146
schemaId := getSchemaId(msg)

src/kafka/encode_decode_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"sync"
77
"testing"
88
"time"
9+
"errors"
910

1011
"github.com/Shopify/sarama"
1112
"github.com/stretchr/testify/assert"
13+
e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
1214
)
1315

1416
type dummy struct {
@@ -49,3 +51,11 @@ func TestDecoder_JsonMessageToRecord_MalformedJson(t *testing.T) {
4951
assert.Nil(t, record)
5052
assert.NotNil(t, err)
5153
}
54+
55+
func TestDecoder_AvroMessageToRecord_NilMessageValue(t *testing.T) {
56+
d := &Decoder{CodecCache: sync.Map{}}
57+
record, err := d.AvroMessageToRecord(nil, &sarama.ConsumerMessage{Value: nil})
58+
isErrNilMessage := errors.Is(err, e.ErrNilMessage)
59+
assert.Nil(t, record)
60+
assert.Equal(t, isErrNilMessage, true)
61+
}

0 commit comments

Comments
 (0)