Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 25965fe

Browse files
author
João Assunção
committed
refactor(all): changes withKeyAndValue to includeKey also adds a test for when this parameter is true
1 parent f7aa42b commit 25965fe

File tree

5 files changed

+62
-16
lines changed

5 files changed

+62
-16
lines changed

src/injector/injector.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55

66
"time"
77

8-
"github.com/inloco/kafka-elasticsearch-injector/src/kafka"
9-
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
108
"github.com/go-kit/kit/log"
119
"github.com/go-kit/kit/log/level"
10+
"github.com/inloco/kafka-elasticsearch-injector/src/kafka"
11+
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
1212
)
1313

1414
func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *schema_registry.SchemaRegistry, kafkaConfig *kafka.Config) (kafka.Consumer, error) {
@@ -37,10 +37,10 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
3737
SchemaRegistry: schemaRegistry,
3838
}
3939

40-
withKeyAndValue, err := strconv.ParseBool(kafkaConfig.WithKeyAndValue)
40+
includeKey, err := strconv.ParseBool(kafkaConfig.IncludeKey)
4141
if err != nil {
4242
level.Warn(logger).Log("err", err, "message", "failed to get consumer with key and value")
43-
withKeyAndValue = false
43+
includeKey = false
4444
}
4545

4646
return kafka.Consumer{
@@ -53,6 +53,6 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
5353
BatchSize: batchSize,
5454
MetricsUpdateInterval: metricsUpdateInterval,
5555
BufferSize: bufferSize,
56-
WithKeyAndValue: withKeyAndValue,
56+
IncludeKey: includeKey,
5757
}, nil
5858
}

src/kafka/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ type Config struct {
1313
MetricsUpdateInterval string
1414
BufferSize string
1515
RecordType string
16-
WithKeyAndValue string
16+
IncludeKey string
1717
}

src/kafka/consumer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"time"
99

1010
"github.com/Shopify/sarama"
11-
"github.com/bsm/sarama-cluster"
11+
cluster "github.com/bsm/sarama-cluster"
1212
"github.com/go-kit/kit/endpoint"
1313
"github.com/go-kit/kit/log"
1414
"github.com/go-kit/kit/log/level"
@@ -43,7 +43,7 @@ type Consumer struct {
4343
BatchSize int
4444
MetricsUpdateInterval time.Duration
4545
BufferSize int
46-
WithKeyAndValue bool
46+
IncludeKey bool
4747
}
4848

4949
type topicPartitionOffset struct {
@@ -145,7 +145,7 @@ func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications c
145145
for idx == buffSize {
146146
if decoded == nil {
147147
for _, msg := range buf {
148-
req, err := k.consumer.Decoder(nil, msg, k.consumer.WithKeyAndValue)
148+
req, err := k.consumer.Decoder(nil, msg, k.consumer.IncludeKey)
149149
if err != nil {
150150
if errors.Is(err, e.ErrNilMessage) {
151151
continue

src/kafka/encode_decode.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ import (
1111
"sync"
1212

1313
"github.com/Shopify/sarama"
14-
"github.com/linkedin/goavro/v2"
1514
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1615
e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
1716
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
17+
"github.com/linkedin/goavro/v2"
1818
)
1919

2020
// DecodeMessageFunc extracts a user-domain request object from an Kafka
@@ -39,7 +39,7 @@ func (d *Decoder) DeserializerFor(recordType string) DecodeMessageFunc {
3939
}
4040
}
4141

42-
func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, withKeyAndValue bool) (*models.Record, error) {
42+
func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, includeKey bool) (*models.Record, error) {
4343
if msg.Value == nil {
4444
return nil, e.ErrNilMessage
4545
}
@@ -63,7 +63,7 @@ func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.Consu
6363

6464
parsedNative[kafkaTimestampKey] = makeTimestamp(msg.Timestamp)
6565

66-
if withKeyAndValue && msg.Key != nil {
66+
if includeKey && msg.Key != nil {
6767
nativeKey, err := d.nativeFromBinary(msg.Key)
6868
if err != nil {
6969
return nil, err
@@ -84,8 +84,9 @@ func makeTimestamp(timestamp time.Time) int64 {
8484
return timestamp.UnixNano() / int64(time.Millisecond)
8585
}
8686

87-
func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, withKeyAndValue bool) (*models.Record, error) {
87+
func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, includeKey bool) (*models.Record, error) {
8888
var jsonValue map[string]interface{}
89+
var jsonKey map[string]interface{}
8990
err := json.Unmarshal(msg.Value, &jsonValue)
9091

9192
if err != nil {
@@ -94,6 +95,14 @@ func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.Consu
9495

9596
jsonValue[kafkaTimestampKey] = makeTimestamp(msg.Timestamp)
9697

98+
if includeKey && msg.Key != nil {
99+
err := json.Unmarshal(msg.Key, &jsonKey)
100+
if err != nil {
101+
return nil, err
102+
}
103+
jsonValue[keyField] = jsonKey
104+
}
105+
97106
return &models.Record{
98107
Topic: msg.Topic,
99108
Partition: msg.Partition,

src/kafka/encode_decode_test.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,24 @@ import (
1313
"github.com/stretchr/testify/assert"
1414
)
1515

16-
type dummy struct {
16+
type dummyValue struct {
1717
Id string `json:"id"`
1818
Timestamp int64 `json:"timestamp"`
1919
}
2020

21+
type dummyKey struct {
22+
Id string `json:"id"`
23+
}
24+
25+
type dummyIncludeKey struct {
26+
Id string `json:"id"`
27+
Timestamp int64 `json:"timestamp"`
28+
Key dummyKey `json:"key"`
29+
}
30+
2131
func TestDecoder_JsonMessageToRecord(t *testing.T) {
2232
d := &Decoder{CodecCache: sync.Map{}}
23-
val := dummy{"alo", 60}
33+
val := dummyValue{"alo", 60}
2434
jsonBytes, err := json.Marshal(val)
2535
record, err := d.JsonMessageToRecord(context.Background(), &sarama.ConsumerMessage{
2636
Value: jsonBytes,
@@ -32,7 +42,7 @@ func TestDecoder_JsonMessageToRecord(t *testing.T) {
3242
assert.Nil(t, err)
3343
returnedJsonBytes, err := json.Marshal(record.Json)
3444
assert.Nil(t, err)
35-
var returnedVal dummy
45+
var returnedVal dummyValue
3646
err = json.Unmarshal(returnedJsonBytes, &returnedVal)
3747
assert.Nil(t, err)
3848
assert.Equal(t, val, returnedVal)
@@ -59,3 +69,30 @@ func TestDecoder_AvroMessageToRecord_NilMessageValue(t *testing.T) {
5969
assert.Nil(t, record)
6070
assert.True(t, isErrNilMessage)
6171
}
72+
73+
func TestDecoder_JsonMessageToRecord_IncludeKey(t *testing.T) {
74+
d := &Decoder{CodecCache: sync.Map{}}
75+
key := dummyKey{"marco"}
76+
jsonBytesKey, err := json.Marshal(key)
77+
val := dummyValue{"pop", 60}
78+
jsonBytesValue, err := json.Marshal(val)
79+
expected := dummyIncludeKey{"pop", 60, dummyKey{"marco"}}
80+
record, err := d.JsonMessageToRecord(context.Background(), &sarama.ConsumerMessage{
81+
Key: jsonBytesKey,
82+
Value: jsonBytesValue,
83+
Topic: "test",
84+
Partition: 1,
85+
Offset: 54,
86+
Timestamp: time.Now(),
87+
}, true)
88+
assert.Nil(t, err)
89+
returnedJsonBytes, err := json.Marshal(record.Json)
90+
assert.Nil(t, err)
91+
var returnedVal dummyValue
92+
err = json.Unmarshal(returnedJsonBytes, &returnedVal)
93+
assert.Nil(t, err)
94+
assert.Equal(t, val, returnedVal)
95+
var returnedKeyIncluded dummyIncludeKey
96+
err = json.Unmarshal(returnedJsonBytes, &returnedKeyIncluded)
97+
assert.Equal(t, expected, returnedKeyIncluded)
98+
}

0 commit comments

Comments
 (0)