@@ -118,51 +118,6 @@ func TestKafka_Start(t *testing.T) {
118118 rec := fixtures .NewFixtureRecord ()
119119 var msg * sarama.ProducerMessage
120120 if assert .NoError (t , err ) {
121-
122- err = producer .Publish (rec )
123- if assert .NoError (t , err ) {
124- msg = <- producer .GetSuccesses ()
125- } else {
126- fmt .Println (err .Error ())
127- }
128- }
129- <- notifications
130- esIndex := fmt .Sprintf ("%s-%s" , msg .Topic , time .Now ().Format ("2006-01-02" ))
131- esId := fmt .Sprintf ("%d:%d" , msg .Partition , msg .Offset )
132- _ , err = db .GetClient ().Refresh (esIndex ).Do (context .Background ())
133- if assert .NoError (t , err ) {
134- res , err := db .GetClient ().Get ().Index (esIndex ).Id (esId ).Do (context .Background ())
135- var r fixtures.FixtureRecord
136- if assert .NoError (t , err ) {
137- assert .True (t , res .Found )
138- err = json .Unmarshal (res .Source , & r )
139- if assert .NoError (t , err ) {
140- assert .Equal (t , rec .Id , r .Id )
141- assert .InDelta (t , expectedTimestamp , r .Timestamp , 5000.0 )
142- }
143- }
144- signals <- os .Interrupt
145- }
146- db .GetClient ().DeleteByQuery (esIndex ).Query (elastic.MatchAllQuery {}).Do (context .Background ())
147- db .CloseClient ()
148- }
149-
150- func TestKafka_Start_WithNilValue (t * testing.T ) {
151- signals := make (chan os.Signal , 1 )
152- notifications := make (chan Notification , 1 )
153- go k .Start (signals , notifications )
154- config := sarama .NewConfig ()
155- config .Producer .Return .Successes = true
156- config .Producer .MaxMessageBytes = 20 * 1024 * 1024 // 20mb
157- config .Producer .Flush .Frequency = 1 * time .Millisecond
158- config .Version = sarama .V0_10_0_0 // This version is the same as in production
159- <- notifications
160- producer , err := fixtures .NewProducer ("localhost:9092" , config , schemaRegistry )
161- expectedTimestamp := time .Now ().UnixNano () / int64 (time .Millisecond )
162- rec := fixtures .NewEmptyFixtureRecord ()
163- var msg * sarama.ProducerMessage
164- if assert .NoError (t , err ) {
165-
166121 err = producer .Publish (rec )
167122 if assert .NoError (t , err ) {
168123 msg = <- producer .GetSuccesses ()
0 commit comments