Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 3ed86f9

Browse files
luiz-nogueira and João Assunção
authored and committed
feat(injector): add WithKeyAndValue variable
1 parent 091ae77 commit 3ed86f9

File tree

6 files changed

+17
-7
lines changed

6 files changed

+17
-7
lines changed

cmd/injector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func main() {
4545
BufferSize: os.Getenv("KAFKA_CONSUMER_BUFFER_SIZE"),
4646
MetricsUpdateInterval: os.Getenv("KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL"),
4747
RecordType: os.Getenv("KAFKA_CONSUMER_RECORD_TYPE"),
48+
WithKeyAndValue: os.Getenv("KAFKA_CONSUMER_WITH_KEY_AND_VALUE"),
4849
}
4950
metricsPublisher := metrics.NewMetricsPublisher()
5051
service := injector.NewService(logger, metricsPublisher)

src/injector/injector.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
3737
SchemaRegistry: schemaRegistry,
3838
}
3939

40+
withKeyAndValue, err := strconv.ParseBool(kafkaConfig.WithKeyAndValue)
41+
if err != nil {
42+
level.Warn(logger).Log("err", err, "message", "failed to get consumer with key and value")
43+
withKeyAndValue = false
44+
}
45+
4046
return kafka.Consumer{
4147
Topics: kafkaConfig.Topics,
4248
Group: kafkaConfig.ConsumerGroup,
@@ -47,5 +53,6 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
4753
BatchSize: batchSize,
4854
MetricsUpdateInterval: metricsUpdateInterval,
4955
BufferSize: bufferSize,
56+
WithKeyAndValue: withKeyAndValue,
5057
}, nil
5158
}

src/kafka/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ type Config struct {
1313
MetricsUpdateInterval string
1414
BufferSize string
1515
RecordType string
16+
WithKeyAndValue string
1617
}

src/kafka/consumer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Consumer struct {
4343
BatchSize int
4444
MetricsUpdateInterval time.Duration
4545
BufferSize int
46+
WithKeyAndValue bool
4647
}
4748

4849
type topicPartitionOffset struct {
@@ -144,7 +145,7 @@ func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications c
144145
for idx == buffSize {
145146
if decoded == nil {
146147
for _, msg := range buf {
147-
req, err := k.consumer.Decoder(nil, msg)
148+
req, err := k.consumer.Decoder(nil, msg, k.consumer.WithKeyAndValue)
148149
if err != nil {
149150
if errors.Is(err, e.ErrNilMessage) {
150151
continue

src/kafka/encode_decode.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
// message object. It's designed to be used in Kafka consumers.
2222
// One straightforward DecodeMessageFunc could be something that
2323
// Avro decodes the message body to the concrete response type.
24-
type DecodeMessageFunc func(context.Context, *sarama.ConsumerMessage) (record *models.Record, err error)
24+
type DecodeMessageFunc func(context.Context, *sarama.ConsumerMessage, bool) (record *models.Record, err error)
2525

2626
const kafkaTimestampKey = "@timestamp"
2727
const keyField = "key"
@@ -39,7 +39,7 @@ func (d *Decoder) DeserializerFor(recordType string) DecodeMessageFunc {
3939
}
4040
}
4141

42-
func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) {
42+
func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, withKeyAndValue bool) (*models.Record, error) {
4343
if msg.Value == nil {
4444
return nil, e.ErrNilMessage
4545
}
@@ -63,7 +63,7 @@ func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.Consu
6363

6464
parsedNative[kafkaTimestampKey] = makeTimestamp(msg.Timestamp)
6565

66-
if msg.Key != nil {
66+
if withKeyAndValue && msg.Key != nil {
6767
nativeKey, err := d.nativeFromBinary(msg.Key)
6868
if err != nil {
6969
return nil, err
@@ -84,7 +84,7 @@ func makeTimestamp(timestamp time.Time) int64 {
8484
return timestamp.UnixNano() / int64(time.Millisecond)
8585
}
8686

87-
func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) {
87+
func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, withKeyAndValue bool) (*models.Record, error) {
8888
var jsonValue map[string]interface{}
8989
err := json.Unmarshal(msg.Value, &jsonValue)
9090

src/kafka/encode_decode_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestDecoder_JsonMessageToRecord(t *testing.T) {
2828
Partition: 1,
2929
Offset: 54,
3030
Timestamp: time.Now(),
31-
})
31+
}, false)
3232
assert.Nil(t, err)
3333
returnedJsonBytes, err := json.Marshal(record.Json)
3434
assert.Nil(t, err)
@@ -47,7 +47,7 @@ func TestDecoder_JsonMessageToRecord_MalformedJson(t *testing.T) {
4747
Partition: 1,
4848
Offset: 54,
4949
Timestamp: time.Now(),
50-
})
50+
}, false)
5151
assert.Nil(t, record)
5252
assert.NotNil(t, err)
5353
}

0 commit comments

Comments (0)