@@ -897,6 +897,36 @@ func TestReaderConsumerGroup(t *testing.T) {
897897 }
898898}
899899
900+ func TestReaderConsumerGroup2 (t * testing.T ) {
901+ // It appears that some of the tests depend on all these tests being
902+ // run concurrently to pass... this is brittle and should be fixed
903+ // at some point.
904+ t .Parallel ()
905+
906+ topic := makeTopic ()
907+ createTopic (t , topic , 2 )
908+ defer deleteTopic (t , topic )
909+
910+ groupID := makeGroupID ()
911+ r := NewReader (ReaderConfig {
912+ Brokers : []string {"localhost:9092" },
913+ Topic : topic ,
914+ GroupID : groupID ,
915+ HeartbeatInterval : 2 * time .Second ,
916+ CommitInterval : 0 ,
917+ RebalanceTimeout : 2 * time .Second ,
918+ RetentionTime : time .Hour ,
919+ MinBytes : 1 ,
920+ MaxBytes : 1e6 ,
921+ })
922+ defer r .Close ()
923+
924+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
925+ defer cancel ()
926+
927+ testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions (t , ctx , r )
928+ }
929+
900930func testReaderConsumerGroupHandshake (t * testing.T , ctx context.Context , r * Reader ) {
901931 prepareReader (t , context .Background (), r , makeTestSequence (5 )... )
902932
@@ -1228,27 +1258,7 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12281258 }
12291259
12301260 require .NoError (t , secondReader .CommitMessages (ctx , msgsForSecondReader [len (msgsForSecondReader )- 1 ]))
1231- require .NoError (t , firstReader .CommitMessages (ctx , msgsForSecondReader [0 ]))
1232- resp , err := client .OffsetFetch (
1233- ctx ,
1234- & OffsetFetchRequest {
1235- GroupID : firstReader .config .GroupID ,
1236- Topics : map [string ][]int {firstReader .config .Topic : {0 , 1 }},
1237- },
1238- )
1239- require .NoError (t , err )
1240- require .NotNil (t , resp )
1241- if topics , ok := resp .Topics [firstReader .config .Topic ]; ! ok {
1242- require .True (t , ok , "Topic not found" )
1243- } else {
1244- for _ , topic := range topics {
1245- if offset , ok := topicsToCommit [topic .Partition ]; ok {
1246- assert .Equal (t , offset + 1 , topic .CommittedOffset , "committed partition %d had committed offset %d instead of %d" , topic .Partition , topic .CommittedOffset , offset )
1247- } else {
1248- assert .Equal (t , int64 (- 1 ), topic .CommittedOffset , "not-committed partition %d had committed offset %d instead of 0" , topic .Partition , topic .CommittedOffset )
1249- }
1250- }
1251- }
1261+ require .ErrorIs (t , errInvalidWritePartition , firstReader .CommitMessages (ctx , msgsForSecondReader [0 ]))
12521262}
12531263
12541264func TestOffsetStash (t * testing.T ) {
@@ -1272,16 +1282,16 @@ func TestOffsetStash(t *testing.T) {
12721282 Given : offsetStash {},
12731283 Messages : []Message {newMessage (0 , 0 )},
12741284 Expected : offsetStash {
1275- topic : {0 : 1 },
1285+ topic : {0 : { 1 , 1 } },
12761286 },
12771287 },
12781288 "ignores earlier offsets" : {
12791289 Given : offsetStash {
1280- topic : {0 : 2 },
1290+ topic : {0 : { 2 , 1 } },
12811291 },
12821292 Messages : []Message {newMessage (0 , 0 )},
12831293 Expected : offsetStash {
1284- topic : {0 : 2 },
1294+ topic : {0 : { 2 , 1 } },
12851295 },
12861296 },
12871297 "uses latest offset" : {
@@ -1292,7 +1302,7 @@ func TestOffsetStash(t *testing.T) {
12921302 newMessage (0 , 1 ),
12931303 },
12941304 Expected : offsetStash {
1295- topic : {0 : 4 },
1305+ topic : {0 : { 4 , 1 } },
12961306 },
12971307 },
12981308 "uses latest offset, across multiple topics" : {
@@ -1306,8 +1316,8 @@ func TestOffsetStash(t *testing.T) {
13061316 },
13071317 Expected : offsetStash {
13081318 topic : {
1309- 0 : 4 ,
1310- 1 : 7 ,
1319+ 0 : { 4 , 1 } ,
1320+ 1 : { 7 , 1 } ,
13111321 },
13121322 },
13131323 },
@@ -1359,10 +1369,11 @@ func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
13591369 return offsetCommitResponseV2 {}, nil
13601370 },
13611371 },
1362- done : make (chan struct {}),
1363- log : func (func (Logger )) {},
1364- logError : func (func (Logger )) {},
1365- joined : make (chan struct {}),
1372+ done : make (chan struct {}),
1373+ log : func (func (Logger )) {},
1374+ logError : func (func (Logger )) {},
1375+ joined : make (chan struct {}),
1376+ Assignments : map [string ][]PartitionAssignment {"topic" : {{0 , 1 }}},
13661377 }
13671378
13681379 // initialize commits so that the commitLoopImmediate select statement blocks
@@ -1396,7 +1407,7 @@ func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
13961407}
13971408
13981409func TestCommitOffsetsWithRetry (t * testing.T ) {
1399- offsets := offsetStash {"topic" : {0 : 0 }}
1410+ offsets := offsetStash {"topic" : {0 : { 0 , 1 } }}
14001411
14011412 tests := map [string ]struct {
14021413 Fails int
@@ -1430,9 +1441,10 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
14301441 return offsetCommitResponseV2 {}, nil
14311442 },
14321443 },
1433- done : make (chan struct {}),
1434- log : func (func (Logger )) {},
1435- logError : func (func (Logger )) {},
1444+ done : make (chan struct {}),
1445+ log : func (func (Logger )) {},
1446+ logError : func (func (Logger )) {},
1447+ Assignments : map [string ][]PartitionAssignment {"topic" : {{0 , 1 }}},
14361448 }
14371449
14381450 r := & Reader {stctx : context .Background ()}
@@ -1636,7 +1648,7 @@ func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) {
16361648 }
16371649}
16381650
1639- func TestConsumerGroupWithGroupTopicsMultple (t * testing.T ) {
1651+ func TestConsumerGroupWithGroupTopicsMultiple (t * testing.T ) {
16401652 ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
16411653 defer cancel ()
16421654
0 commit comments