This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 35358b9

Merge branch 'hotfix/check-msg-with-nil-values'

Author: João Assunção
2 parents: acc8876 + 3bb57d4

12 files changed: +188 −37 lines

README.md

Lines changed: 3 additions & 1 deletion

@@ -39,6 +39,7 @@ To create new injectors for your topics, you should create a new kubernetes depl
 - `ES_TIME_SUFFIX` Indicates what time unit to append to index names on Elasticsearch. Supported values are `day` and `hour`. Default value is `day` **OPTIONAL**
 - `KAFKA_CONSUMER_RECORD_TYPE` Kafka record type. Should be set to "avro" or "json". Defaults to avro. **OPTIONAL**
 - `KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL` The interval at which the app updates the exported metrics, in the format of golang's `time.ParseDuration`. Defaults to 30s. **OPTIONAL**
+- `KAFKA_CONSUMER_INCLUDE_KEY` Determines whether to include the Kafka key in the Elasticsearch message (as the "key" field). Defaults to false. **OPTIONAL**

 ### Important note about Elasticsearch mappings and types

@@ -159,7 +160,8 @@ To build the project from source, run from project root:
 go build cmd/injector.go
 ```

-To run tests, run `docker-compose up -d zookeeper kafka schema-registry elasticsearch` and run `make test`.
+To run tests, first add kafka to your /etc/hosts file: `echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts`,
+and then run `docker-compose up -d zookeeper kafka schema-registry elasticsearch` followed by `make test`.

 ### Versioning

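To illustrate the new flag, here is a minimal sketch (not from the repository) of the document shape the injector produces when `KAFKA_CONSUMER_INCLUDE_KEY=true`: the decoder always injects `@timestamp`, and with the flag enabled it adds the decoded Kafka key under `key`. The field values below are hypothetical.

```go
package main

import "fmt"

func main() {
	// Shape of a decoded record as indexed into Elasticsearch.
	// "@timestamp" is always injected; "key" only appears when
	// KAFKA_CONSUMER_INCLUDE_KEY=true. Values are illustrative.
	doc := map[string]interface{}{
		"some_field": "some value",         // field from the message value (hypothetical)
		"@timestamp": int64(1536969600000), // Kafka message timestamp, in milliseconds
		"key": map[string]interface{}{ // decoded Kafka key (hypothetical contents)
			"id": "user-42",
		},
	}
	fmt.Println(doc)
}
```
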
cmd/injector.go

Lines changed: 1 addition & 0 deletions

@@ -45,6 +45,7 @@ func main() {
 		BufferSize:            os.Getenv("KAFKA_CONSUMER_BUFFER_SIZE"),
 		MetricsUpdateInterval: os.Getenv("KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL"),
 		RecordType:            os.Getenv("KAFKA_CONSUMER_RECORD_TYPE"),
+		IncludeKey:            os.Getenv("KAFKA_CONSUMER_INCLUDE_KEY"),
 	}
 	metricsPublisher := metrics.NewMetricsPublisher()
 	service := injector.NewService(logger, metricsPublisher)

src/elasticsearch/elasticsearch.go

Lines changed: 3 additions & 0 deletions

@@ -89,6 +89,7 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
 		return &InsertResponse{AlreadyExists: nil, Retry: records, Backoff: true}, nil
 	}
 	if err != nil {
+		_ = level.Debug(d.logger).Log("message", "something went wrong with elasticsearch", "err", err)
 		return nil, err
 	}
 	if res.Errors {
@@ -112,10 +113,12 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
 	}
 	for _, f := range failed {
 		if f.Status == http.StatusBadRequest {
+			_ = level.Debug(d.logger).Log("message", "elasticsearch bad requests", "err", f)
 			d.metricsPublisher.ElasticsearchBadRequests(1)
 			continue
 		}
 		if f.Status == http.StatusConflict {
+			_ = level.Debug(d.logger).Log("message", "elasticsearch conflicts", "err", f)
 			d.metricsPublisher.ElasticsearchConflicts(1)
 			continue
 		}

src/errors/ErrNilMessage.go

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+package errors
+
+import "errors"
+
+var ErrNilMessage = errors.New("message value is nil")
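
A nil `msg.Value` is typically how Kafka represents a deleted record (a tombstone) on compacted topics, and this sentinel lets callers recognize that case with `errors.Is`, as the consumer change further down does. A minimal standalone sketch of the pattern (the `decode` helper is hypothetical, standing in for the injector's Decoder):

```go
package main

import (
	"errors"
	"fmt"
)

var ErrNilMessage = errors.New("message value is nil")

// decode is a stand-in for the injector's Decoder: it rejects nil values.
func decode(value []byte) (string, error) {
	if value == nil {
		return "", ErrNilMessage
	}
	return string(value), nil
}

func main() {
	if _, err := decode(nil); errors.Is(err, ErrNilMessage) {
		fmt.Println("skipping tombstone message") // mirrors the consumer's continue
	}
}
```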

src/injector/injector.go

Lines changed: 12 additions & 2 deletions

@@ -5,10 +5,10 @@ import (

 	"time"

-	"github.com/inloco/kafka-elasticsearch-injector/src/kafka"
-	"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/inloco/kafka-elasticsearch-injector/src/kafka"
+	"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
 )

 func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *schema_registry.SchemaRegistry, kafkaConfig *kafka.Config) (kafka.Consumer, error) {
@@ -37,6 +37,15 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
 		SchemaRegistry: schemaRegistry,
 	}

+	includeKey, err := strconv.ParseBool(kafkaConfig.IncludeKey)
+	if err != nil {
+		err = level.Warn(logger).Log("err", err, "message", "failed to get consumer include key configuration flag")
+		if err != nil {
+			panic(err)
+		}
+		includeKey = false
+	}
+
 	return kafka.Consumer{
 		Topics: kafkaConfig.Topics,
 		Group:  kafkaConfig.ConsumerGroup,
@@ -47,5 +56,6 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
 		BatchSize:             batchSize,
 		MetricsUpdateInterval: metricsUpdateInterval,
 		BufferSize:            bufferSize,
+		IncludeKey:            includeKey,
 	}, nil
 }
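
One behavior worth noting in the block above: `strconv.ParseBool` accepts only "1", "t", "T", "TRUE", "true", "True", "0", "f", "F", "FALSE", "false", "False" and returns an error for anything else, including the empty string. Since `os.Getenv` returns "" for an unset variable, leaving `KAFKA_CONSUMER_INCLUDE_KEY` unset takes the warning path and the flag falls back to false. A quick standalone illustration:

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	// "" is what os.Getenv returns for an unset variable.
	for _, v := range []string{"true", "false", "", "yes"} {
		b, err := strconv.ParseBool(v)
		fmt.Printf("%q -> %v (err: %v)\n", v, b, err)
	}
	// "true" -> true (err: <nil>)
	// "false" -> false (err: <nil>)
	// "" -> false (err: strconv.ParseBool: parsing "": invalid syntax)
	// "yes" -> false (err: strconv.ParseBool: parsing "yes": invalid syntax)
}
```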

src/injector/store/store.go

Lines changed: 4 additions & 0 deletions

@@ -21,6 +21,10 @@ type basicStore struct {
 }

 func (s basicStore) Insert(records []*models.Record) error {
+	if len(records) == 0 {
+		return nil
+	}
+
 	elasticRecords, err := s.codec.EncodeElasticRecords(records)
 	if err != nil {
 		return err

src/kafka/config.go

Lines changed: 1 addition & 0 deletions

@@ -13,4 +13,5 @@ type Config struct {
 	MetricsUpdateInterval string
 	BufferSize            string
 	RecordType            string
+	IncludeKey            string
 }

src/kafka/consumer.go

Lines changed: 9 additions & 2 deletions

@@ -3,16 +3,18 @@ package kafka
 import (
 	"context"
 	"os"
+	"errors"

 	"time"

 	"github.com/Shopify/sarama"
-	"github.com/bsm/sarama-cluster"
+	cluster "github.com/bsm/sarama-cluster"
 	"github.com/go-kit/kit/endpoint"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
 	"github.com/inloco/kafka-elasticsearch-injector/src/models"
+	e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
 )

 type Notification int32
@@ -41,6 +43,7 @@ type Consumer struct {
 	BatchSize             int
 	MetricsUpdateInterval time.Duration
 	BufferSize            int
+	IncludeKey            bool
 }

 type topicPartitionOffset struct {
@@ -142,8 +145,12 @@ func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications c
 	for idx == buffSize {
 		if decoded == nil {
 			for _, msg := range buf {
-				req, err := k.consumer.Decoder(nil, msg)
+				req, err := k.consumer.Decoder(nil, msg, k.consumer.IncludeKey)
 				if err != nil {
+					if errors.Is(err, e.ErrNilMessage) {
+						continue
+					}
+
 					level.Error(k.consumer.Logger).Log(
 						"message", "Error decoding message",
 						"err", err.Error(),

src/kafka/consumer_test.go

Lines changed: 8 additions & 2 deletions

@@ -30,6 +30,10 @@ type fixtureService struct {
 }

 func (s fixtureService) Insert(records []*models.Record) error {
+	if len(records) == 0 {
+		return nil
+	}
+
 	elasticRecords, err := s.codec.EncodeElasticRecords(records)
 	if err != nil {
 		return err
@@ -63,8 +67,11 @@ var (
 	service   = fixtureService{db, codec}
 	endpoints = &fixtureEndpoints{
 		func(ctx context.Context, request interface{}) (response interface{}, err error) {
-			records := request.([]*models.Record)
+			if request == nil {
+				return nil, nil
+			}

+			records := request.([]*models.Record)
 			return nil, service.Insert(records)
 		},
 	}
@@ -111,7 +118,6 @@ func TestKafka_Start(t *testing.T) {
 	rec := fixtures.NewFixtureRecord()
 	var msg *sarama.ProducerMessage
 	if assert.NoError(t, err) {
-
 		err = producer.Publish(rec)
 		if assert.NoError(t, err) {
 			msg = <-producer.GetSuccesses()

src/kafka/encode_decode.go

Lines changed: 57 additions & 25 deletions

@@ -11,18 +11,20 @@ import (
 	"sync"

 	"github.com/Shopify/sarama"
-	"github.com/linkedin/goavro/v2"
+	e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
 	"github.com/inloco/kafka-elasticsearch-injector/src/models"
 	"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
+	"github.com/linkedin/goavro/v2"
 )

 // DecodeMessageFunc extracts a user-domain request object from a Kafka
 // message object. It's designed to be used in Kafka consumers.
 // One straightforward DecodeMessageFunc could be something that
 // Avro decodes the message body to the concrete response type.
-type DecodeMessageFunc func(context.Context, *sarama.ConsumerMessage) (record *models.Record, err error)
+type DecodeMessageFunc func(context.Context, *sarama.ConsumerMessage, bool) (record *models.Record, err error)

 const kafkaTimestampKey = "@timestamp"
+const keyField = "key"

 type Decoder struct {
 	SchemaRegistry *schema_registry.SchemaRegistry
@@ -37,28 +39,12 @@ func (d *Decoder) DeserializerFor(recordType string) DecodeMessageFunc {
 	}
 }

-func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) {
-	schemaId := getSchemaId(msg)
-	avroRecord := msg.Value[5:]
-	schema, err := d.SchemaRegistry.GetSchema(schemaId)
-	if err != nil {
-		return nil, err
-	}
-	var codec *goavro.Codec
-	if codecI, ok := d.CodecCache.Load(schemaId); ok {
-		codec, ok = codecI.(*goavro.Codec)
-	}
-
-	if codec == nil {
-		codec, err = goavro.NewCodec(schema)
-		if err != nil {
-			return nil, err
-		}
-
-		d.CodecCache.Store(schemaId, codec)
+func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, includeKey bool) (*models.Record, error) {
+	if msg.Value == nil {
+		return nil, e.ErrNilMessage
 	}

-	native, _, err := codec.NativeFromBinary(avroRecord)
+	native, err := d.nativeFromBinary(msg.Value)
 	if err != nil {
 		return nil, err
 	}
@@ -77,6 +63,14 @@ func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.Consu

 	parsedNative[kafkaTimestampKey] = makeTimestamp(msg.Timestamp)

+	if includeKey && msg.Key != nil {
+		nativeKey, err := d.nativeFromBinary(msg.Key)
+		if err != nil {
+			return nil, err
+		}
+		parsedNative[keyField] = nativeKey
+	}
+
 	return &models.Record{
 		Topic:     msg.Topic,
 		Partition: msg.Partition,
@@ -90,8 +84,9 @@ func makeTimestamp(timestamp time.Time) int64 {
 	return timestamp.UnixNano() / int64(time.Millisecond)
 }

-func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) {
+func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, includeKey bool) (*models.Record, error) {
 	var jsonValue map[string]interface{}
+	var jsonKey map[string]interface{}
 	err := json.Unmarshal(msg.Value, &jsonValue)

 	if err != nil {
@@ -100,6 +95,14 @@ func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.Consu

 	jsonValue[kafkaTimestampKey] = makeTimestamp(msg.Timestamp)

+	if includeKey && msg.Key != nil {
+		err := json.Unmarshal(msg.Key, &jsonKey)
+		if err != nil {
+			return nil, err
+		}
+		jsonValue[keyField] = jsonKey
+	}
+
 	return &models.Record{
 		Topic:     msg.Topic,
 		Partition: msg.Partition,
@@ -109,7 +112,36 @@ func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.Consu
 	}, nil
 }

-func getSchemaId(msg *sarama.ConsumerMessage) int32 {
-	schemaIdBytes := msg.Value[1:5]
+func (d *Decoder) nativeFromBinary(value []byte) (interface{}, error) {
+	schemaId := getSchemaId(value)
+	avroRecord := value[5:]
+	schema, err := d.SchemaRegistry.GetSchema(schemaId)
+	if err != nil {
+		return nil, err
+	}
+	var codec *goavro.Codec
+	if codecI, ok := d.CodecCache.Load(schemaId); ok {
+		codec, _ = codecI.(*goavro.Codec)
+	}
+
+	if codec == nil {
+		codec, err = goavro.NewCodec(schema)
+		if err != nil {
+			return nil, err
+		}
+
+		d.CodecCache.Store(schemaId, codec)
+	}
+
+	native, _, err := codec.NativeFromBinary(avroRecord)
+	if err != nil {
+		return nil, err
+	}
+
+	return native, nil
+}
+
+func getSchemaId(value []byte) int32 {
+	schemaIdBytes := value[1:5]
 	return int32(schemaIdBytes[0])<<24 | int32(schemaIdBytes[1])<<16 | int32(schemaIdBytes[2])<<8 | int32(schemaIdBytes[3])
 }
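
The last two functions decode the Confluent Schema Registry wire format: byte 0 is a magic byte, bytes 1-4 are the schema ID as a big-endian 32-bit integer, and the Avro payload starts at byte 5. The hand-rolled shift in `getSchemaId` is equivalent to big-endian decoding via the standard `encoding/binary` package; a minimal sketch verifying that (the sample bytes are hypothetical):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Same logic as the commit's getSchemaId.
func getSchemaId(value []byte) int32 {
	schemaIdBytes := value[1:5]
	return int32(schemaIdBytes[0])<<24 | int32(schemaIdBytes[1])<<16 | int32(schemaIdBytes[2])<<8 | int32(schemaIdBytes[3])
}

func main() {
	// Magic byte 0x00, schema ID 42 in big-endian, then the Avro payload.
	msg := []byte{0x00, 0x00, 0x00, 0x00, 0x2a, 0xde, 0xad}

	a := getSchemaId(msg)
	b := int32(binary.BigEndian.Uint32(msg[1:5]))
	fmt.Println(a, b, a == b) // 42 42 true
}
```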
