@@ -24,6 +24,7 @@ import (
2424type DecodeMessageFunc func (context.Context , * sarama.ConsumerMessage ) (record * models.Record , err error )
2525
2626const kafkaTimestampKey = "@timestamp"
27+ const keyField = "key"
2728
2829type Decoder struct {
2930 SchemaRegistry * schema_registry.SchemaRegistry
@@ -43,27 +44,7 @@ func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.Consu
4344 return nil , e .ErrNilMessage
4445 }
4546
46- schemaId := getSchemaId (msg )
47- avroRecord := msg .Value [5 :]
48- schema , err := d .SchemaRegistry .GetSchema (schemaId )
49- if err != nil {
50- return nil , err
51- }
52- var codec * goavro.Codec
53- if codecI , ok := d .CodecCache .Load (schemaId ); ok {
54- codec , ok = codecI .(* goavro.Codec )
55- }
56-
57- if codec == nil {
58- codec , err = goavro .NewCodec (schema )
59- if err != nil {
60- return nil , err
61- }
62-
63- d .CodecCache .Store (schemaId , codec )
64- }
65-
66- native , _ , err := codec .NativeFromBinary (avroRecord )
47+ native , err := d .nativeFromBinary (msg .Value )
6748 if err != nil {
6849 return nil , err
6950 }
@@ -82,6 +63,14 @@ func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.Consu
8263
8364 parsedNative [kafkaTimestampKey ] = makeTimestamp (msg .Timestamp )
8465
66+ if msg .Key != nil {
67+ nativeKey , err := d .nativeFromBinary (msg .Key )
68+ if err != nil {
69+ return nil , err
70+ }
71+ parsedNative [keyField ] = nativeKey
72+ }
73+
8574 return & models.Record {
8675 Topic : msg .Topic ,
8776 Partition : msg .Partition ,
@@ -114,7 +103,36 @@ func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.Consu
114103 }, nil
115104}
116105
117- func getSchemaId (msg * sarama.ConsumerMessage ) int32 {
118- schemaIdBytes := msg .Value [1 :5 ]
106+ func (d * Decoder ) nativeFromBinary (value []byte ) (interface {}, error ) {
107+ schemaId := getSchemaId (value )
108+ avroRecord := value [5 :]
109+ schema , err := d .SchemaRegistry .GetSchema (schemaId )
110+ if err != nil {
111+ return nil , err
112+ }
113+ var codec * goavro.Codec
114+ if codecI , ok := d .CodecCache .Load (schemaId ); ok {
115+ codec , ok = codecI .(* goavro.Codec )
116+ }
117+
118+ if codec == nil {
119+ codec , err = goavro .NewCodec (schema )
120+ if err != nil {
121+ return nil , err
122+ }
123+
124+ d .CodecCache .Store (schemaId , codec )
125+ }
126+
127+ native , _ , err := codec .NativeFromBinary (avroRecord )
128+ if err != nil {
129+ return nil , err
130+ }
131+
132+ return native , nil
133+ }
134+
135+ func getSchemaId (value []byte ) int32 {
136+ schemaIdBytes := value [1 :5 ]
119137 return int32 (schemaIdBytes [0 ])<< 24 | int32 (schemaIdBytes [1 ])<< 16 | int32 (schemaIdBytes [2 ])<< 8 | int32 (schemaIdBytes [3 ])
120138}
0 commit comments