Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit f667bdb

Browse files
committed
fix(encode_decode.go): ignore message with nil values
1 parent c3fe541 commit f667bdb

File tree

1 file changed

+10
-0
lines changed

1 file changed

+10
-0
lines changed

src/kafka/encode_decode.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ func (d *Decoder) DeserializerFor(recordType string) DecodeMessageFunc {
3838
}
3939

4040
func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) {
41+
if msg.Value == nil {
42+
return &models.Record{
43+
Topic: msg.Topic,
44+
Partition: msg.Partition,
45+
Offset: msg.Offset,
46+
Timestamp: msg.Timestamp,
47+
Json: nil,
48+
}, nil
49+
}
50+
4151
schemaId := getSchemaId(msg)
4252
avroRecord := msg.Value[5:]
4353
schema, err := d.SchemaRegistry.GetSchema(schemaId)

0 commit comments

Comments
 (0)