@@ -150,54 +150,53 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
150150 backoffDelayMax = 5 * time .Second
151151 )
152152
153- messagesToSend := make (map [string ]map [int ]int64 )
153+ messagesToSendForGeneration := make (map [ int32 ] map [string ]map [int ]int64 )
154154 for topic , partitionsInfo := range offsetStash {
155- msgsForTopic := make (map [int ]int64 )
156155 for partition , commitInfo := range partitionsInfo {
157- // if there is a generation and it is different than the current
158- // it means there was a rebalance
159- if commitInfo .generationId != undefinedGenerationId {
160- if gen .ID == commitInfo .generationId {
161- msgsForTopic [partition ] = commitInfo .offset
162- } else {
163- if assignments , ok := gen .Assignments [topic ]; ok {
164- for _ , assignment := range assignments {
165- if assignment .ID == partition && commitInfo .generationId > gen .ID {
166- msgsForTopic [partition ] = commitInfo .offset
167- }
168- }
169- }
170- r .withErrorLogger (func (l Logger ) {
171- l .Printf ("Discarding commint for %s - %d: %d . Current generation is %d, commit generation is %d" , topic , partition , commitInfo .offset , gen .ID , commitInfo .generationId )
172- })
173- }
174- } else {
175- msgsForTopic [partition ] = commitInfo .offset
156+ if _ , ok := messagesToSendForGeneration [commitInfo .generationID ]; ! ok {
157+ messagesToSendForGeneration [commitInfo .generationID ] = make (map [string ]map [int ]int64 )
176158 }
177- }
178- if len (msgsForTopic ) > 0 {
179- messagesToSend [topic ] = msgsForTopic
159+ msgsForTopic := messagesToSendForGeneration [commitInfo .generationID ]
160+ if _ , ok := msgsForTopic [topic ]; ! ok {
161+ msgsForTopic [topic ] = make (map [int ]int64 )
162+ }
163+ msgsForPartition := msgsForTopic [topic ]
164+ msgsForPartition [partition ] = commitInfo .offset
180165 }
181166 }
182- for attempt := 0 ; attempt < retries ; attempt ++ {
183- if attempt != 0 {
184- if ! sleep (r .stctx , backoff (attempt , backoffDelayMin , backoffDelayMax )) {
185- return
167+ var illegalGenerationErr error
168+ for generationID , messages := range messagesToSendForGeneration {
169+ for attempt := 0 ; attempt < retries ; attempt ++ {
170+ if attempt != 0 {
171+ if ! sleep (r .stctx , backoff (attempt , backoffDelayMin , backoffDelayMax )) {
172+ continue
173+ }
186174 }
187- }
188175
189- if err = gen .CommitOffsets (messagesToSend ); err == nil {
190- return
176+ if err = gen .CommitOffsetsForGenID (generationID , messages ); err == nil {
177+ continue
178+ }
179+ // IllegalGeneration error is not retriable, but we should attempt to
180+ // perform the remaining commits
181+ if err == IllegalGeneration {
182+ illegalGenerationErr = err
183+ // we prevent useless retries and we will attempt to
184+ // commit the remaining generations.
185+ err = nil
186+ offsetStash .removeGenerationID (generationID )
187+ }
191188 }
192189 }
193-
190+ if illegalGenerationErr != nil {
191+ err = illegalGenerationErr
192+ }
194193 return // err will not be nil
195194}
196195
197196// offsetStash holds offsets by topic => partition => offsetEntry.
198197type offsetEntry struct {
199198 offset int64
200- generationId int32
199+ generationID int32
201200}
202201type offsetStash map [string ]map [int ]offsetEntry
203202
@@ -213,7 +212,7 @@ func (o offsetStash) merge(commits []commit) {
213212 if offset , ok := offsetsByPartition [c .partition ]; ! ok || c .offset > offset .offset {
214213 offsetsByPartition [c .partition ] = offsetEntry {
215214 offset : c .offset ,
216- generationId : c .generationId ,
215+ generationID : c .generationId ,
217216 }
218217 }
219218 }
@@ -226,6 +225,16 @@ func (o offsetStash) reset() {
226225 }
227226}
228227
228+ func (o offsetStash ) removeGenerationID (genID int32 ) {
229+ for _ , offsetsForTopic := range o {
230+ for partition , offsetsForPartition := range offsetsForTopic {
231+ if offsetsForPartition .generationID == genID {
232+ delete (offsetsForTopic , partition )
233+ }
234+ }
235+ }
236+ }
237+
229238// commitLoopImmediate handles each commit synchronously.
230239func (r * Reader ) commitLoopImmediate (ctx context.Context , gen * Generation ) {
231240 offsets := offsetStash {}
0 commit comments