@@ -792,10 +792,11 @@ func TestExtractTopics(t *testing.T) {
792792
793793func TestReaderConsumerGroup (t * testing.T ) {
794794 tests := []struct {
795- scenario string
796- partitions int
797- commitInterval time.Duration
798- function func (* testing.T , context.Context , * Reader )
795+ scenario string
796+ partitions int
797+ commitInterval time.Duration
798+ errorOnWrongGeneration bool
799+ function func (* testing.T , context.Context , * Reader )
799800 }{
800801 {
801802 scenario : "basic handshake" ,
@@ -856,11 +857,13 @@ func TestReaderConsumerGroup(t *testing.T) {
856857 partitions : 1 ,
857858 function : testConsumerGroupSimple ,
858859 },
860+
859861 {
860- scenario : "Do not commit not assigned messages after rebalance" ,
861- partitions : 2 ,
862- function : testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions ,
863- commitInterval : 0 ,
862+ scenario : "Do not commit not assigned messages after rebalance" ,
863+ partitions : 2 ,
864+ function : testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions ,
865+ commitInterval : 0 ,
866+ errorOnWrongGeneration : true ,
864867 },
865868 }
866869
@@ -877,15 +880,16 @@ func TestReaderConsumerGroup(t *testing.T) {
877880
878881 groupID := makeGroupID ()
879882 r := NewReader (ReaderConfig {
880- Brokers : []string {"localhost:9092" },
881- Topic : topic ,
882- GroupID : groupID ,
883- HeartbeatInterval : 2 * time .Second ,
884- CommitInterval : test .commitInterval ,
885- RebalanceTimeout : 2 * time .Second ,
886- RetentionTime : time .Hour ,
887- MinBytes : 1 ,
888- MaxBytes : 1e6 ,
883+ Brokers : []string {"localhost:9092" },
884+ Topic : topic ,
885+ GroupID : groupID ,
886+ HeartbeatInterval : 2 * time .Second ,
887+ CommitInterval : test .commitInterval ,
888+ RebalanceTimeout : 2 * time .Second ,
889+ RetentionTime : time .Hour ,
890+ MinBytes : 1 ,
891+ MaxBytes : 1e6 ,
892+ ErrorOnWrongGenerationCommit : test .errorOnWrongGeneration ,
889893 })
890894 defer r .Close ()
891895
@@ -1193,6 +1197,8 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
11931197 BatchSize : 1 ,
11941198 Transport : client .Transport ,
11951199 }
1200+
1201+ // Write 4 messages and ensure that they go the each one of the partitions
11961202 messageCount := 4
11971203 if err := writer .WriteMessages (ctx , makeTestSequence (messageCount )... ); err != nil {
11981204 t .Fatalf ("bad write messages: %v" , err )
@@ -1220,9 +1226,12 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12201226 }
12211227 }
12221228 require .Equal (t , messageCount , totalEvents )
1229+
1230+ // create a second reader
12231231 secondReader := NewReader (firstReader .config )
12241232 defer secondReader .Close ()
12251233
1234+ // wait until the group has 2 members
12261235 require .Eventually (t , func () bool {
12271236 resp , err := client .DescribeGroups (
12281237 ctx ,
@@ -1247,28 +1256,14 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12471256 }
12481257 partitionAssignedToFirstConsumer := (partitionAssignedToSecondConsumer + 1 ) % 2
12491258
1250- // commit all messages for the second reader and wait until commits reach the server
1259+ // commit all messages for the second reader (no need to wait until commits reach the server
1260+ // because CommitInterval is set to 0)
12511261 require .NoError (t , secondReader .CommitMessages (ctx , msgsForSecondReader ... ))
1252- require .Eventually (t , func () bool {
1253- resp , err := client .OffsetFetch (ctx , & OffsetFetchRequest {
1254- GroupID : firstReader .config .GroupID ,
1255- Topics : map [string ][]int {firstReader .config .Topic : {partitionAssignedToSecondConsumer }},
1256- })
1257- require .NoError (t , err )
1258- require .NotNil (t , resp )
12591262
1260- for _ , topicOffsets := range resp .Topics {
1261- for _ , offsetPartition := range topicOffsets {
1262- if offsetPartition .Partition == partitionAssignedToSecondConsumer {
1263- return msgsForSecondReader [len (msgsForSecondReader )- 1 ].Offset + 1 == offsetPartition .CommittedOffset
1264- }
1265- }
1266- }
1267- return false
1268- }, 5 * time .Second , 100 * time .Millisecond , "Offsets were never committed" )
1269-
1270- // commit all messages the first reader received
1263+ // commit all messages the first reader received, we expect an error
12711264 require .ErrorIs (t , IllegalGeneration , firstReader .CommitMessages (ctx , allMessages ... ))
1265+
1266+ // verify that no offsets have been altered
12721267 require .Eventually (t , func () bool {
12731268 resp , err := client .OffsetFetch (ctx , & OffsetFetchRequest {
12741269 GroupID : firstReader .config .GroupID ,
@@ -1287,7 +1282,9 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12871282 return false
12881283 }, 5 * time .Second , 100 * time .Millisecond , "Offsets were altered" )
12891284
1290- // commit the messages it can actually commit and verify it works
1285+ // we can read the messages again because generation changes
1286+ // cause uncommitted offsets to be lost
1287+ totalEvents = 0
12911288 for i := 0 ; i < len (msgsForFirstReader ); i ++ {
12921289 if msg , err := firstReader .FetchMessage (ctx ); err != nil {
12931290 t .Errorf ("reader %v expected to read 1 message" , i )
@@ -1301,24 +1298,11 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
13011298 totalEvents ++
13021299 }
13031300 }
1304- require .NoError (t , firstReader .CommitMessages (ctx , msgsForFirstReader [partitionAssignedToFirstConsumer ][len (msgsForFirstReader [partitionAssignedToFirstConsumer ])- 1 ]))
1305- require .Eventually (t , func () bool {
1306- resp , err := client .OffsetFetch (ctx , & OffsetFetchRequest {
1307- GroupID : firstReader .config .GroupID ,
1308- Topics : map [string ][]int {firstReader .config .Topic : {partitionAssignedToFirstConsumer }},
1309- })
1310- require .NoError (t , err )
1311- require .NotNil (t , resp )
1301+ require .Equal (t , 2 , totalEvents )
13121302
1313- for _ , topicOffsets := range resp .Topics {
1314- for _ , offsetPartition := range topicOffsets {
1315- if offsetPartition .Partition == partitionAssignedToFirstConsumer {
1316- return msgsForFirstReader [partitionAssignedToFirstConsumer ][len (msgsForSecondReader )- 1 ].Offset + 1 == offsetPartition .CommittedOffset
1317- }
1318- }
1319- }
1320- return false
1321- }, 5 * time .Second , 100 * time .Millisecond , "Could not commit the new element" )
1303+ // commit the messages it can actually commit and verify it works
1304+ // no need to wait because CommitInterval is 0
1305+ require .NoError (t , firstReader .CommitMessages (ctx , msgsForFirstReader [partitionAssignedToFirstConsumer ][len (msgsForFirstReader [partitionAssignedToFirstConsumer ])- 1 ]))
13221306}
13231307
13241308func TestOffsetStash (t * testing.T ) {
0 commit comments