@@ -11,6 +11,7 @@ import (
1111
1212 "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1313 "github.com/cockroachdb/cockroach/pkg/roachpb"
14+ "github.com/cockroachdb/cockroach/pkg/util/buildutil"
1415 "github.com/cockroachdb/cockroach/pkg/util/hlc"
1516 "github.com/cockroachdb/cockroach/pkg/util/log"
1617 "github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -20,14 +21,14 @@ import (
2021)
2122
2223// unbufferedRegistration is similar to bufferedRegistration but does not
23- // internally buffer events delivered via publish(). Rather, when caught up, it
24- // forwards events directly to the BufferedStream. It assumes that
25- // BufferedStream's SendBuffered method is non-blocking. As a result, it does
26- // not require a long-lived goroutine to move event between an internal buffer
27- // and the sender.
24+ // internally buffer events delivered via publish() after its initial catch-up
25+ // scan. Rather, it forwards events directly to the BufferedStream. It assumes
26+ // that BufferedStream's SendBuffered method is non-blocking. As a result, it
27+ // does not require a long-lived goroutine to move events between an internal
28+ // buffer and the sender.
2829//
2930// Note that UnbufferedRegistration needs to ensure that events sent to
30- // BufferedStream from the processor during a running catchup scan are sent to
31+ // BufferedStream from the processor during a running catch-up scan are sent to
3132// the client in order. To achieve this, events from catch-up scans are sent
3233// using UnbufferedSend. While the catch-up scan is ongoing, live updates
3334// delivered via publish() are temporarily buffered in catchUpBuf to hold them
@@ -38,7 +39,7 @@ import (
3839// the buffered sender directly. unbufferedRegistration is responsible for
3940// correctly handling memory reservations for data stored in the catchUpBuf.
4041//
41- // All errors are delivered to this registration via Disonnect () which is
42+ // All errors are delivered to this registration via Disconnect() which is
4243// non-blocking. Disconnect sends an error to the stream and invokes any
4344// necessary cleanup.
4445//
@@ -81,10 +82,6 @@ type unbufferedRegistration struct {
8182 // created. It is set to nil by output loop for processing and closed when
8283 // done.
8384 catchUpIter * CatchUpIterator
84-
85- // Used for testing only. Indicates that all events in catchUpBuf have been
86- // sent to BufferedStream.
87- caughtUp bool
8885 }
8986}
9087
@@ -119,7 +116,6 @@ func newUnbufferedRegistration(
119116 stream : stream ,
120117 }
121118 br .mu .catchUpIter = catchUpIter
122- br .mu .caughtUp = true
123119 if br .mu .catchUpIter != nil {
124120 // A nil catchUpIter indicates we don't need a catch-up scan. We avoid
125121 // initializing catchUpBuf in this case, which will result in publish()
@@ -132,8 +128,7 @@ func newUnbufferedRegistration(
132128// publish sends a single event to this registration. It is called by the
133129// processor if the event overlaps the span this registration is interested in.
134130// Events are either stored in catchUpBuf or sent to BufferedStream directly,
135- // depending on whether catch-up scan is done. publish is responsible for using
136- // and releasing alloc.
131+ // depending on whether catch-up scan is done.
137132func (ubr * unbufferedRegistration ) publish (
138133 ctx context.Context , event * kvpb.RangeFeedEvent , alloc * SharedBudgetAllocation ,
139134) {
@@ -159,7 +154,6 @@ func (ubr *unbufferedRegistration) publish(
159154 alloc .Use (ctx )
160155 select {
161156 case ubr .mu .catchUpBuf <- e :
162- ubr .mu .caughtUp = false
163157 default :
164158 // catchUpBuf exceeded and we are dropping this event. A catch up scan is
165159 // needed later.
@@ -258,33 +252,43 @@ func (ubr *unbufferedRegistration) runOutputLoop(ctx context.Context, forStacks
258252}
259253
260254// drainAllocations drains catchUpBuf and release all memory held by the events
261- // in catch-up buffer without publishing them. It is safe to assume that
262- // catch-up buffer is empty and nil after this.
255+ // in catch-up buffer without publishing them. It should only be called in the
256+ // case of a catch-up scan failure, in which case the registration should have
257+ // already been disconnected.
258+ //
259+ // It is safe to assume that catch-up buffer is empty and nil after this.
263260func (ubr * unbufferedRegistration ) drainAllocations (ctx context.Context ) {
264- ubr .mu .Lock ()
265- defer ubr .mu .Unlock ()
266- // TODO(wenyihu6): Check if we can just discard without holding the lock first
267- // ? We shouldn't be reading from the buffer at the same time
268- if ubr .mu .catchUpBuf == nil {
269- // Already drained.
261+ if ! ubr .drainAllocationsRequired () {
270262 return
271263 }
272264
273- func () {
274- for {
275- select {
276- case e := <- ubr .mu .catchUpBuf :
277- e .alloc .Release (ctx )
278- putPooledSharedEvent (e )
279- default :
280- // Done.
281- return
282- }
265+ for {
266+ select {
267+ case e := <- ubr .mu .catchUpBuf :
268+ e .alloc .Release (ctx )
269+ putPooledSharedEvent (e )
270+ default :
271+ // Buffer is empty, unset catchUpBuf for good measure. But we should never
272+ // be here on a connected registration.
273+ ubr .mu .Lock ()
274+ bufLen := len (ubr .mu .catchUpBuf ) // nolint:deferunlockcheck
275+ ubr .mu .catchUpBuf = nil
276+ ubr .mu .Unlock ()
277+ assertTrue (bufLen == 0 , "non-empty catch up buffer after drainAllocation" )
278+
279+ return
283280 }
284- }()
281+ }
282+ }
285283
286- ubr .mu .catchUpBuf = nil
287- ubr .mu .caughtUp = true
284+ func (ubr * unbufferedRegistration ) drainAllocationsRequired () bool {
285+ ubr .mu .Lock ()
286+ defer ubr .mu .Unlock ()
287+ if ubr .mu .catchUpBuf == nil {
288+ return false
289+ }
290+ assertTrue (ubr .mu .disconnected , "drainAllocations called on connected registration without a non-nil catchUpBuf" )
291+ return true
288292}
289293
290294// publishCatchUpBuffer drains catchUpBuf and release all memory held by the
@@ -380,7 +384,6 @@ func (ubr *unbufferedRegistration) publishCatchUpBuffer(ctx context.Context) err
380384
381385 // Success.
382386 ubr .mu .catchUpBuf = nil
383- ubr .mu .caughtUp = true
384387 return nil
385388}
386389
@@ -440,7 +443,7 @@ func (ubr *unbufferedRegistration) waitForCaughtUp(ctx context.Context) error {
440443 }
441444 for re := retry .StartWithCtx (ctx , opts ); re .Next (); {
442445 ubr .mu .Lock ()
443- caughtUp := len ( ubr .mu .catchUpBuf ) == 0 && ubr . mu . caughtUp
446+ caughtUp := ubr .mu .catchUpBuf == nil
444447 ubr .mu .Unlock ()
445448 if caughtUp {
446449 return nil
@@ -449,5 +452,11 @@ func (ubr *unbufferedRegistration) waitForCaughtUp(ctx context.Context) error {
449452 if err := ctx .Err (); err != nil {
450453 return err
451454 }
452- return errors .Errorf ("unbufferedRegistration %v failed to empty in time" , ubr .Range ())
455+ return errors .Errorf ("unbufferedRegistration for %v failed to drain its catch-up buffer in time" , ubr .Range ())
456+ }
457+
458+ func assertTrue (cond bool , msg string ) {
459+ if buildutil .CrdbTestBuild && ! cond {
460+ panic (msg )
461+ }
453462}
0 commit comments