Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 3bb57d4

Browse files
author
João Assunção
authored
Merge pull request #51 from inloco/feature/add-key-to-json
Feature/add key to json
2 parents df68394 + b31d617 commit 3bb57d4

File tree

7 files changed

+147
-39
lines changed

7 files changed

+147
-39
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ To create new injectors for your topics, you should create a new kubernetes depl
3939
- `ES_TIME_SUFFIX` Indicates what time unit to append to index names on Elasticsearch. Supported values are `day` and `hour`. Default value is `day` **OPTIONAL**
4040
- `KAFKA_CONSUMER_RECORD_TYPE` Kafka record type. Should be set to "avro" or "json". Defaults to avro. **OPTIONAL**
4141
- `KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL` The interval which the app updates the exported metrics in the format of golang's `time.ParseDuration`. Defaults to 30s. **OPTIONAL**
42+
- `KAFKA_CONSUMER_INCLUDE_KEY` Determines whether to include the Kafka key in the Elasticsearch message (as the "key" field). Defaults to false. **OPTIONAL**
4243

4344
### Important note about Elasticsearch mappings and types
4445

@@ -159,7 +160,8 @@ To build the project from source, run from project root:
159160
go build cmd/injector.go
160161
```
161162

162-
To run tests, run `docker-compose up -d zookeeper kafka schema-registry elasticsearch` and run `make test`.
163+
To run tests, first add kafka to your /etc/hosts file: `echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts`
164+
and then run `docker-compose up -d zookeeper kafka schema-registry elasticsearch` and run `make test`.
163165

164166
### Versioning
165167

cmd/injector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func main() {
4545
BufferSize: os.Getenv("KAFKA_CONSUMER_BUFFER_SIZE"),
4646
MetricsUpdateInterval: os.Getenv("KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL"),
4747
RecordType: os.Getenv("KAFKA_CONSUMER_RECORD_TYPE"),
48+
IncludeKey: os.Getenv("KAFKA_CONSUMER_INCLUDE_KEY"),
4849
}
4950
metricsPublisher := metrics.NewMetricsPublisher()
5051
service := injector.NewService(logger, metricsPublisher)

src/injector/injector.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55

66
"time"
77

8-
"github.com/inloco/kafka-elasticsearch-injector/src/kafka"
9-
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
108
"github.com/go-kit/kit/log"
119
"github.com/go-kit/kit/log/level"
10+
"github.com/inloco/kafka-elasticsearch-injector/src/kafka"
11+
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
1212
)
1313

1414
func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *schema_registry.SchemaRegistry, kafkaConfig *kafka.Config) (kafka.Consumer, error) {
@@ -37,6 +37,15 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
3737
SchemaRegistry: schemaRegistry,
3838
}
3939

40+
includeKey, err := strconv.ParseBool(kafkaConfig.IncludeKey)
41+
if err != nil {
42+
err = level.Warn(logger).Log("err", err, "message", "failed to get consumer include key configuration flag")
43+
if err != nil {
44+
panic(err)
45+
}
46+
includeKey = false
47+
}
48+
4049
return kafka.Consumer{
4150
Topics: kafkaConfig.Topics,
4251
Group: kafkaConfig.ConsumerGroup,
@@ -47,5 +56,6 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
4756
BatchSize: batchSize,
4857
MetricsUpdateInterval: metricsUpdateInterval,
4958
BufferSize: bufferSize,
59+
IncludeKey: includeKey,
5060
}, nil
5161
}

src/kafka/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ type Config struct {
1313
MetricsUpdateInterval string
1414
BufferSize string
1515
RecordType string
16+
IncludeKey string
1617
}

src/kafka/consumer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"time"
99

1010
"github.com/Shopify/sarama"
11-
"github.com/bsm/sarama-cluster"
11+
cluster "github.com/bsm/sarama-cluster"
1212
"github.com/go-kit/kit/endpoint"
1313
"github.com/go-kit/kit/log"
1414
"github.com/go-kit/kit/log/level"
@@ -43,6 +43,7 @@ type Consumer struct {
4343
BatchSize int
4444
MetricsUpdateInterval time.Duration
4545
BufferSize int
46+
IncludeKey bool
4647
}
4748

4849
type topicPartitionOffset struct {
@@ -144,7 +145,7 @@ func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications c
144145
for idx == buffSize {
145146
if decoded == nil {
146147
for _, msg := range buf {
147-
req, err := k.consumer.Decoder(nil, msg)
148+
req, err := k.consumer.Decoder(nil, msg, k.consumer.IncludeKey)
148149
if err != nil {
149150
if errors.Is(err, e.ErrNilMessage) {
150151
continue

src/kafka/encode_decode.go

Lines changed: 55 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,20 @@ import (
1111
"sync"
1212

1313
"github.com/Shopify/sarama"
14-
"github.com/linkedin/goavro/v2"
15-
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1614
e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
15+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1716
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
17+
"github.com/linkedin/goavro/v2"
1818
)
1919

2020
// DecodeMessageFunc extracts a user-domain request object from an Kafka
2121
// message object. It's designed to be used in Kafka consumers.
2222
// One straightforward DecodeMessageFunc could be something that
2323
// Avro decodes the message body to the concrete response type.
24-
type DecodeMessageFunc func(context.Context, *sarama.ConsumerMessage) (record *models.Record, err error)
24+
type DecodeMessageFunc func(context.Context, *sarama.ConsumerMessage, bool) (record *models.Record, err error)
2525

2626
const kafkaTimestampKey = "@timestamp"
27+
const keyField = "key"
2728

2829
type Decoder struct {
2930
SchemaRegistry *schema_registry.SchemaRegistry
@@ -38,32 +39,12 @@ func (d *Decoder) DeserializerFor(recordType string) DecodeMessageFunc {
3839
}
3940
}
4041

41-
func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) {
42+
func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, includeKey bool) (*models.Record, error) {
4243
if msg.Value == nil {
4344
return nil, e.ErrNilMessage
4445
}
4546

46-
schemaId := getSchemaId(msg)
47-
avroRecord := msg.Value[5:]
48-
schema, err := d.SchemaRegistry.GetSchema(schemaId)
49-
if err != nil {
50-
return nil, err
51-
}
52-
var codec *goavro.Codec
53-
if codecI, ok := d.CodecCache.Load(schemaId); ok {
54-
codec, ok = codecI.(*goavro.Codec)
55-
}
56-
57-
if codec == nil {
58-
codec, err = goavro.NewCodec(schema)
59-
if err != nil {
60-
return nil, err
61-
}
62-
63-
d.CodecCache.Store(schemaId, codec)
64-
}
65-
66-
native, _, err := codec.NativeFromBinary(avroRecord)
47+
native, err := d.nativeFromBinary(msg.Value)
6748
if err != nil {
6849
return nil, err
6950
}
@@ -82,6 +63,14 @@ func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.Consu
8263

8364
parsedNative[kafkaTimestampKey] = makeTimestamp(msg.Timestamp)
8465

66+
if includeKey && msg.Key != nil {
67+
nativeKey, err := d.nativeFromBinary(msg.Key)
68+
if err != nil {
69+
return nil, err
70+
}
71+
parsedNative[keyField] = nativeKey
72+
}
73+
8574
return &models.Record{
8675
Topic: msg.Topic,
8776
Partition: msg.Partition,
@@ -95,8 +84,9 @@ func makeTimestamp(timestamp time.Time) int64 {
9584
return timestamp.UnixNano() / int64(time.Millisecond)
9685
}
9786

98-
func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) {
87+
func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, includeKey bool) (*models.Record, error) {
9988
var jsonValue map[string]interface{}
89+
var jsonKey map[string]interface{}
10090
err := json.Unmarshal(msg.Value, &jsonValue)
10191

10292
if err != nil {
@@ -105,6 +95,14 @@ func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.Consu
10595

10696
jsonValue[kafkaTimestampKey] = makeTimestamp(msg.Timestamp)
10797

98+
if includeKey && msg.Key != nil {
99+
err := json.Unmarshal(msg.Key, &jsonKey)
100+
if err != nil {
101+
return nil, err
102+
}
103+
jsonValue[keyField] = jsonKey
104+
}
105+
108106
return &models.Record{
109107
Topic: msg.Topic,
110108
Partition: msg.Partition,
@@ -114,7 +112,36 @@ func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.Consu
114112
}, nil
115113
}
116114

117-
func getSchemaId(msg *sarama.ConsumerMessage) int32 {
118-
schemaIdBytes := msg.Value[1:5]
115+
func (d *Decoder) nativeFromBinary(value []byte) (interface{}, error) {
116+
schemaId := getSchemaId(value)
117+
avroRecord := value[5:]
118+
schema, err := d.SchemaRegistry.GetSchema(schemaId)
119+
if err != nil {
120+
return nil, err
121+
}
122+
var codec *goavro.Codec
123+
if codecI, ok := d.CodecCache.Load(schemaId); ok {
124+
codec, _ = codecI.(*goavro.Codec)
125+
}
126+
127+
if codec == nil {
128+
codec, err = goavro.NewCodec(schema)
129+
if err != nil {
130+
return nil, err
131+
}
132+
133+
d.CodecCache.Store(schemaId, codec)
134+
}
135+
136+
native, _, err := codec.NativeFromBinary(avroRecord)
137+
if err != nil {
138+
return nil, err
139+
}
140+
141+
return native, nil
142+
}
143+
144+
func getSchemaId(value []byte) int32 {
145+
schemaIdBytes := value[1:5]
119146
return int32(schemaIdBytes[0])<<24 | int32(schemaIdBytes[1])<<16 | int32(schemaIdBytes[2])<<8 | int32(schemaIdBytes[3])
120147
}

src/kafka/encode_decode_test.go

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,36 @@ import (
1313
"github.com/stretchr/testify/assert"
1414
)
1515

16-
type dummy struct {
16+
type dummyValue struct {
1717
Id string `json:"id"`
1818
Timestamp int64 `json:"timestamp"`
1919
}
2020

21+
type dummyKey struct {
22+
Id string `json:"id"`
23+
}
24+
25+
type dummyIncludeKey struct {
26+
Id string `json:"id"`
27+
Timestamp int64 `json:"timestamp"`
28+
Key dummyKey `json:"key"`
29+
}
30+
2131
func TestDecoder_JsonMessageToRecord(t *testing.T) {
2232
d := &Decoder{CodecCache: sync.Map{}}
23-
val := dummy{"alo", 60}
33+
val := dummyValue{"alo", 60}
2434
jsonBytes, err := json.Marshal(val)
2535
record, err := d.JsonMessageToRecord(context.Background(), &sarama.ConsumerMessage{
2636
Value: jsonBytes,
2737
Topic: "test",
2838
Partition: 1,
2939
Offset: 54,
3040
Timestamp: time.Now(),
31-
})
41+
}, false)
3242
assert.Nil(t, err)
3343
returnedJsonBytes, err := json.Marshal(record.Json)
3444
assert.Nil(t, err)
35-
var returnedVal dummy
45+
var returnedVal dummyValue
3646
err = json.Unmarshal(returnedJsonBytes, &returnedVal)
3747
assert.Nil(t, err)
3848
assert.Equal(t, val, returnedVal)
@@ -47,15 +57,71 @@ func TestDecoder_JsonMessageToRecord_MalformedJson(t *testing.T) {
4757
Partition: 1,
4858
Offset: 54,
4959
Timestamp: time.Now(),
50-
})
60+
}, false)
5161
assert.Nil(t, record)
5262
assert.NotNil(t, err)
5363
}
5464

5565
func TestDecoder_AvroMessageToRecord_NilMessageValue(t *testing.T) {
5666
d := &Decoder{CodecCache: sync.Map{}}
57-
record, err := d.AvroMessageToRecord(context.Background(), &sarama.ConsumerMessage{Value: nil, Topic: "test", Partition: 1, Offset: 54, Timestamp: time.Now()})
67+
record, err := d.AvroMessageToRecord(context.Background(), &sarama.ConsumerMessage{
68+
Value: nil,
69+
Topic: "test",
70+
Partition: 1, Offset: 54,
71+
Timestamp: time.Now()},
72+
false)
5873
isErrNilMessage := errors.Is(err, e.ErrNilMessage)
5974
assert.Nil(t, record)
6075
assert.True(t, isErrNilMessage)
6176
}
77+
78+
func TestDecoder_AvroMessageToRecord_NilMessageValue_IncludeKey(t *testing.T) {
79+
d := &Decoder{CodecCache: sync.Map{}}
80+
key := dummyKey{"marco"}
81+
jsonBytesKey, _ := json.Marshal(key)
82+
record, err := d.AvroMessageToRecord(context.Background(), &sarama.ConsumerMessage{
83+
Value: nil,
84+
Key: jsonBytesKey,
85+
Topic: "test",
86+
Partition: 1, Offset: 54,
87+
Timestamp: time.Now()},
88+
true)
89+
isErrNilMessage := errors.Is(err, e.ErrNilMessage)
90+
assert.Nil(t, record)
91+
assert.True(t, isErrNilMessage)
92+
}
93+
94+
func TestDecoder_JsonMessageToRecord_IncludeKey(t *testing.T) {
95+
d := &Decoder{CodecCache: sync.Map{}}
96+
97+
key := dummyKey{"marco"}
98+
jsonBytesKey, _ := json.Marshal(key)
99+
100+
val := dummyValue{"pop", 60}
101+
jsonBytesValue, _ := json.Marshal(val)
102+
103+
expected := dummyIncludeKey{"pop", 60, dummyKey{"marco"}}
104+
105+
record, err := d.JsonMessageToRecord(context.Background(), &sarama.ConsumerMessage{
106+
Key: jsonBytesKey,
107+
Value: jsonBytesValue,
108+
Topic: "test",
109+
Partition: 1,
110+
Offset: 54,
111+
Timestamp: time.Now(),
112+
}, true)
113+
assert.Nil(t, err)
114+
115+
returnedJsonBytes, err := json.Marshal(record.Json)
116+
assert.Nil(t, err)
117+
118+
var returnedVal dummyValue
119+
err = json.Unmarshal(returnedJsonBytes, &returnedVal)
120+
assert.Nil(t, err)
121+
assert.Equal(t, val, returnedVal)
122+
123+
var returnedKeyIncluded dummyIncludeKey
124+
err = json.Unmarshal(returnedJsonBytes, &returnedKeyIncluded)
125+
assert.Nil(t, err)
126+
assert.Equal(t, expected, returnedKeyIncluded)
127+
}

0 commit comments

Comments
 (0)