@@ -15,6 +15,7 @@ import (
1515 "testing"
1616 "time"
1717
18+ "github.com/stretchr/testify/assert"
1819 "github.com/stretchr/testify/require"
1920)
2021
@@ -855,6 +856,12 @@ func TestReaderConsumerGroup(t *testing.T) {
855856 partitions : 1 ,
856857 function : testConsumerGroupSimple ,
857858 },
859+
860+ {
861+ scenario : "Do not commit not assigned messages after rebalance" ,
862+ partitions : 2 ,
863+ function : testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions ,
864+ },
858865 }
859866
860867 for _ , test := range tests {
@@ -1174,6 +1181,76 @@ func testReaderConsumerGroupRebalanceAcrossManyPartitionsAndConsumers(t *testing
11741181 }
11751182}
11761183
1184+ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions (t * testing.T , ctx context.Context , firstReader * Reader ) {
1185+ client , shutdown := newLocalClient ()
1186+ defer shutdown ()
1187+
1188+ // write messages across both partitions
1189+ writer := & Writer {
1190+ Addr : TCP (firstReader .config .Brokers ... ),
1191+ Topic : firstReader .config .Topic ,
1192+ Balancer : & RoundRobin {},
1193+ BatchSize : 1 ,
1194+ Transport : client .Transport ,
1195+ }
1196+ messageCount := 4
1197+ if err := writer .WriteMessages (ctx , makeTestSequence (messageCount )... ); err != nil {
1198+ t .Fatalf ("bad write messages: %v" , err )
1199+ }
1200+ if err := writer .Close (); err != nil {
1201+ t .Fatalf ("bad write err: %v" , err )
1202+ }
1203+
1204+ secondReader := NewReader (firstReader .config )
1205+ defer secondReader .Close ()
1206+
1207+ require .Eventually (t , func () bool {
1208+ resp , err := client .DescribeGroups (
1209+ ctx ,
1210+ & DescribeGroupsRequest {
1211+ GroupIDs : []string {firstReader .config .GroupID },
1212+ },
1213+ )
1214+ assert .NoError (t , err )
1215+ assert .NotNil (t , resp )
1216+ return len (resp .Groups [0 ].Members ) == 2
1217+ }, 10 * time .Second , 100 * time .Millisecond )
1218+
1219+ topicsToCommit := make (map [int ]int64 )
1220+ msgsForSecondReader := make ([]Message , 0 , messageCount )
1221+ for i := 0 ; i < messageCount / 2 ; i ++ {
1222+ if msg , err := secondReader .FetchMessage (ctx ); err != nil {
1223+ t .Errorf ("reader %v expected to read 1 message" , i )
1224+ } else {
1225+ msgsForSecondReader = append (msgsForSecondReader , msg )
1226+ topicsToCommit [msg .Partition ] = msg .Offset
1227+ }
1228+ }
1229+
1230+ require .NoError (t , secondReader .CommitMessages (ctx , msgsForSecondReader [len (msgsForSecondReader )- 1 ]))
1231+ require .NoError (t , firstReader .CommitMessages (ctx , msgsForSecondReader [0 ]))
1232+ resp , err := client .OffsetFetch (
1233+ ctx ,
1234+ & OffsetFetchRequest {
1235+ GroupID : firstReader .config .GroupID ,
1236+ Topics : map [string ][]int {firstReader .config .Topic : {0 , 1 }},
1237+ },
1238+ )
1239+ require .NoError (t , err )
1240+ require .NotNil (t , resp )
1241+ if topics , ok := resp .Topics [firstReader .config .Topic ]; ! ok {
1242+ require .True (t , ok , "Topic not found" )
1243+ } else {
1244+ for _ , topic := range topics {
1245+ if offset , ok := topicsToCommit [topic .Partition ]; ok {
1246+ assert .Equal (t , offset + 1 , topic .CommittedOffset , "committed partition %d had committed offset %d instead of %d" , topic .Partition , topic .CommittedOffset , offset )
1247+ } else {
1248+ assert .Equal (t , int64 (- 1 ), topic .CommittedOffset , "not-committed partition %d had committed offset %d instead of 0" , topic .Partition , topic .CommittedOffset )
1249+ }
1250+ }
1251+ }
1252+ }
1253+
11771254func TestOffsetStash (t * testing.T ) {
11781255 const topic = "topic"
11791256
0 commit comments