@@ -897,36 +897,6 @@ func TestReaderConsumerGroup(t *testing.T) {
897897 }
898898}
899899
900- func TestReaderConsumerGroup2 (t * testing.T ) {
901- // It appears that some of the tests depend on all these tests being
902- // run concurrently to pass... this is brittle and should be fixed
903- // at some point.
904- t .Parallel ()
905-
906- topic := makeTopic ()
907- createTopic (t , topic , 2 )
908- defer deleteTopic (t , topic )
909-
910- groupID := makeGroupID ()
911- r := NewReader (ReaderConfig {
912- Brokers : []string {"localhost:9092" },
913- Topic : topic ,
914- GroupID : groupID ,
915- HeartbeatInterval : 2 * time .Second ,
916- CommitInterval : 0 ,
917- RebalanceTimeout : 2 * time .Second ,
918- RetentionTime : time .Hour ,
919- MinBytes : 1 ,
920- MaxBytes : 1e6 ,
921- })
922- defer r .Close ()
923-
924- ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
925- defer cancel ()
926-
927- testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions (t , ctx , r )
928- }
929-
930900func testReaderConsumerGroupHandshake (t * testing.T , ctx context.Context , r * Reader ) {
931901 prepareReader (t , context .Background (), r , makeTestSequence (5 )... )
932902
@@ -1231,6 +1201,23 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12311201 t .Fatalf ("bad write err: %v" , err )
12321202 }
12331203
1204+ // read all messages for the first reader
1205+ msgsForFirstReader := make (map [int ][]Message , 0 )
1206+ totalEvents := 0
1207+ for i := 0 ; i < messageCount ; i ++ {
1208+ if msg , err := firstReader .FetchMessage (ctx ); err != nil {
1209+ t .Errorf ("reader %v expected to read 1 message" , i )
1210+ } else {
1211+ msgs , ok := msgsForFirstReader [msg .Partition ]
1212+ if ! ok {
1213+ msgs = make ([]Message , 0 )
1214+ }
1215+ msgs = append (msgs , msg )
1216+ msgsForFirstReader [msg .Partition ] = msgs
1217+ totalEvents ++
1218+ }
1219+ }
1220+ require .Equal (t , messageCount , totalEvents )
12341221 secondReader := NewReader (firstReader .config )
12351222 defer secondReader .Close ()
12361223
@@ -1244,21 +1231,92 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12441231 assert .NoError (t , err )
12451232 assert .NotNil (t , resp )
12461233 return len (resp .Groups [0 ].Members ) == 2
1247- }, 10 * time .Second , 100 * time .Millisecond )
1234+ }, 10 * time .Second , 100 * time .Millisecond , "Group does not have 2 members" )
12481235
1249- topicsToCommit := make ( map [ int ] int64 )
1236+ var partitionAssignedToSecondConsumer int
12501237 msgsForSecondReader := make ([]Message , 0 , messageCount )
12511238 for i := 0 ; i < messageCount / 2 ; i ++ {
12521239 if msg , err := secondReader .FetchMessage (ctx ); err != nil {
12531240 t .Errorf ("reader %v expected to read 1 message" , i )
12541241 } else {
12551242 msgsForSecondReader = append (msgsForSecondReader , msg )
1256- topicsToCommit [msg .Partition ] = msg .Offset
1243+ partitionAssignedToSecondConsumer = msg .Partition
1244+ }
1245+ }
1246+ partitionAssignedToFirstConsumer := (partitionAssignedToSecondConsumer + 1 ) % 2
1247+
1248+ // commit all messages for the second reader and wait until commits reach the server
1249+ require .NoError (t , secondReader .CommitMessages (ctx , msgsForSecondReader ... ))
1250+ require .Eventually (t , func () bool {
1251+ resp , err := client .OffsetFetch (ctx , & OffsetFetchRequest {
1252+ GroupID : firstReader .config .GroupID ,
1253+ Topics : map [string ][]int {firstReader .config .Topic : []int {partitionAssignedToSecondConsumer }},
1254+ })
1255+ require .NoError (t , err )
1256+ require .NotNil (t , resp )
1257+
1258+ for _ , topicOffsets := range resp .Topics {
1259+ for _ , offsetPartition := range topicOffsets {
1260+ if offsetPartition .Partition == partitionAssignedToSecondConsumer {
1261+ return msgsForSecondReader [len (msgsForSecondReader )- 1 ].Offset + 1 == offsetPartition .CommittedOffset
1262+ }
1263+ }
1264+ }
1265+ return false
1266+ }, 5 * time .Second , 100 * time .Millisecond , "Offsets were never committed" )
1267+
1268+ // commit first message for the second reader on the first reader
1269+ require .NoError (t , firstReader .CommitMessages (ctx , msgsForFirstReader [partitionAssignedToSecondConsumer ][0 ]))
1270+ require .Eventually (t , func () bool {
1271+ resp , err := client .OffsetFetch (ctx , & OffsetFetchRequest {
1272+ GroupID : firstReader .config .GroupID ,
1273+ Topics : map [string ][]int {firstReader .config .Topic : []int {partitionAssignedToSecondConsumer }},
1274+ })
1275+ require .NoError (t , err )
1276+ require .NotNil (t , resp )
1277+
1278+ for _ , topicOffsets := range resp .Topics {
1279+ for _ , offsetPartition := range topicOffsets {
1280+ if offsetPartition .Partition == partitionAssignedToSecondConsumer {
1281+ return msgsForSecondReader [len (msgsForSecondReader )- 1 ].Offset + 1 == offsetPartition .CommittedOffset
1282+ }
1283+ }
1284+ }
1285+ return false
1286+ }, 5 * time .Second , 100 * time .Millisecond , "Offsets were altered" )
1287+
1288+ // commit the messages it can actually commit and verify it works
1289+ for i := 0 ; i < len (msgsForFirstReader ); i ++ {
1290+ if msg , err := firstReader .FetchMessage (ctx ); err != nil {
1291+ t .Errorf ("reader %v expected to read 1 message" , i )
1292+ } else {
1293+ msgs , ok := msgsForFirstReader [msg .Partition ]
1294+ if ! ok {
1295+ msgs = make ([]Message , 0 )
1296+ }
1297+ msgs = append (msgs , msg )
1298+ msgsForFirstReader [msg .Partition ] = msgs
1299+ totalEvents ++
12571300 }
12581301 }
1302+ require .NoError (t , firstReader .CommitMessages (ctx , msgsForFirstReader [partitionAssignedToFirstConsumer ][len (msgsForFirstReader [partitionAssignedToFirstConsumer ])- 1 ]))
1303+ require .Eventually (t , func () bool {
1304+ resp , err := client .OffsetFetch (ctx , & OffsetFetchRequest {
1305+ GroupID : firstReader .config .GroupID ,
1306+ Topics : map [string ][]int {firstReader .config .Topic : []int {partitionAssignedToFirstConsumer }},
1307+ })
1308+ require .NoError (t , err )
1309+ require .NotNil (t , resp )
12591310
1260- require .NoError (t , secondReader .CommitMessages (ctx , msgsForSecondReader [len (msgsForSecondReader )- 1 ]))
1261- require .ErrorIs (t , errInvalidWritePartition , firstReader .CommitMessages (ctx , msgsForSecondReader [0 ]))
1311+ for _ , topicOffsets := range resp .Topics {
1312+ for _ , offsetPartition := range topicOffsets {
1313+ if offsetPartition .Partition == partitionAssignedToFirstConsumer {
1314+ return msgsForFirstReader [partitionAssignedToFirstConsumer ][len (msgsForSecondReader )- 1 ].Offset + 1 == offsetPartition .CommittedOffset
1315+ }
1316+ }
1317+ }
1318+ return false
1319+ }, 5 * time .Second , 100 * time .Millisecond , "Could not commit the new element" )
12621320}
12631321
12641322func TestOffsetStash (t * testing.T ) {
0 commit comments