Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit b6af16a

Browse files
committed
test(consumer_test.go): add tests for empty record value
1 parent 1a6169f commit b6af16a

File tree

4 files changed

+53
-1
lines changed

4 files changed

+53
-1
lines changed

src/injector/store/store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ type basicStore struct {
2121
}
2222

2323
func (s basicStore) Insert(records []*models.Record) error {
24+
if len(records) == 0 {
25+
return nil
26+
}
27+
2428
elasticRecords, err := s.codec.EncodeElasticRecords(records)
2529
if err != nil {
2630
return err

src/kafka/consumer_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,47 @@ func TestKafka_Start(t *testing.T) {
139139
db.GetClient().DeleteByQuery(esIndex).Query(elastic.MatchAllQuery{}).Do(context.Background())
140140
db.CloseClient()
141141
}
142+
143+
func TestKafka_Start_WithNilValue(t *testing.T) {
144+
signals := make(chan os.Signal, 1)
145+
notifications := make(chan Notification, 1)
146+
go k.Start(signals, notifications)
147+
config := sarama.NewConfig()
148+
config.Producer.Return.Successes = true
149+
config.Producer.MaxMessageBytes = 20 * 1024 * 1024 // 20mb
150+
config.Producer.Flush.Frequency = 1 * time.Millisecond
151+
config.Version = sarama.V0_10_0_0 // This version is the same as in production
152+
<-notifications
153+
producer, err := fixtures.NewProducer("localhost:9092", config, schemaRegistry)
154+
expectedTimestamp := time.Now().UnixNano() / int64(time.Millisecond)
155+
rec := fixtures.NewEmptyFixtureRecord()
156+
var msg *sarama.ProducerMessage
157+
if assert.NoError(t, err) {
158+
159+
err = producer.Publish(rec)
160+
if assert.NoError(t, err) {
161+
msg = <-producer.GetSuccesses()
162+
} else {
163+
fmt.Println(err.Error())
164+
}
165+
}
166+
<-notifications
167+
esIndex := fmt.Sprintf("%s-%s", msg.Topic, time.Now().Format("2006-01-02"))
168+
esId := fmt.Sprintf("%d:%d", msg.Partition, msg.Offset)
169+
_, err = db.GetClient().Refresh(esIndex).Do(context.Background())
170+
if assert.NoError(t, err) {
171+
res, err := db.GetClient().Get().Index(esIndex).Id(esId).Do(context.Background())
172+
var r fixtures.FixtureRecord
173+
if assert.NoError(t, err) {
174+
assert.True(t, res.Found)
175+
err = json.Unmarshal(res.Source, &r)
176+
if assert.NoError(t, err) {
177+
assert.Equal(t, rec.Id, r.Id)
178+
assert.InDelta(t, expectedTimestamp, r.Timestamp, 5000.0)
179+
}
180+
}
181+
signals <- os.Interrupt
182+
}
183+
db.GetClient().DeleteByQuery(esIndex).Query(elastic.MatchAllQuery{}).Do(context.Background())
184+
db.CloseClient()
185+
}

src/kafka/encode_decode_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ func TestDecoder_AvroMessageToRecord_NilMessageValue(t *testing.T) {
5757
record, err := d.AvroMessageToRecord(nil, &sarama.ConsumerMessage{Value: nil})
5858
isErrNilMessage := errors.Is(err, e.ErrNilMessage)
5959
assert.Nil(t, record)
60-
assert.Equal(t, isErrNilMessage, true)
60+
assert.True(t, isErrNilMessage)
6161
}

src/kafka/fixtures/fixtures.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ func NewFixtureRecord() *FixtureRecord {
3838
return &FixtureRecord{Id: rand.Int31()}
3939
}
4040

41+
func NewEmptyFixtureRecord() *FixtureRecord {
42+
return &FixtureRecord{}
43+
}
44+
4145
func NewRecord(ts time.Time) (*models.Record, int32, int32) {
4246
id := rand.Int31()
4347
value := rand.Int31()

0 commit comments

Comments
 (0)