@@ -142,6 +142,8 @@ func (s streamStatus) SafeFormat(w redact.SafePrinter, _ rune) {
142142 w .Printf ("%s" , redact .SafeString (s .String ()))
143143}
144144
145+ var errNoSuchStream = errors .New ("stream already encountered an error or has not be added to buffered sender" )
146+
145147func NewBufferedSender (
146148 sender ServerStreamSender , settings * cluster.Settings , bsMetrics * BufferedSenderMetrics ,
147149) * BufferedSender {
@@ -161,7 +163,8 @@ func NewBufferedSender(
161163//
162164// It errors in the case of a stopped sender of if the registration has exceeded
163165// its capacity. sendBuffered with rangefeed events for streams that have
164- // already encountered an error will be dropped without error.
166+ // already enqueued an error event or have not been added via addStream will
167+ // return an error.
165168func (bs * BufferedSender ) sendBuffered (
166169 ev * kvpb.MuxRangeFeedEvent , alloc * SharedBudgetAllocation ,
167170) error {
@@ -177,25 +180,21 @@ func (bs *BufferedSender) sendBuffered(
177180 // stream.
178181 status , ok := bs .queueMu .byStream [ev .StreamID ]
179182 if ! ok {
180- // We don't error if the stream status is not found as this may be an
181- // event for an already closed stream. Such events are possible while the
182- // registration publishes the catch up scan buffer.
183- //
184- // The client ignores such events, but we drop it here regardless.
185- return nil
186- }
187- nextState , shouldAdmit , err := bs .nextPerQueueStateLocked (status , ev )
188- status .state = nextState
189- if shouldAdmit {
190- status .queueItems ++
183+ return errNoSuchStream
191184 }
185+ nextState , err := bs .nextPerQueueStateLocked (status , ev )
192186 if nextState == streamErrored {
187+ // We will be admitting this event but no events after this.
188+ assertTrue (err == nil , "expected error event to be admitted" )
193189 delete (bs .queueMu .byStream , ev .StreamID )
194190 } else {
191+ if err == nil {
192+ status .queueItems ++
193+ }
194+ status .state = nextState
195195 bs .queueMu .byStream [ev .StreamID ] = status
196196 }
197-
198- if err != nil || ! shouldAdmit {
197+ if err != nil {
199198 return err
200199 }
201200
@@ -211,42 +210,43 @@ func (bs *BufferedSender) sendBuffered(
211210}
212211
213212// nextPerQueueStateLocked returns the next state that should be stored for the
214- // stream related to the given rangefeed event and a bool that indicates whether
215- // the event should be admitted to the buffer. Any error returned should
216- // returned to the caller of sendBuffered .
213+ // stream related to the given rangefeed event. If an error is returned, the
214+ // event should not be admitted and the given error should be returned to the
215+ // client .
217216func (bs * BufferedSender ) nextPerQueueStateLocked (
218217 status streamStatus , ev * kvpb.MuxRangeFeedEvent ,
219- ) (streamState , bool , error ) {
218+ ) (streamState , error ) {
220219 // An error should always put us into stateErrored, so let's do that first.
221220 if ev .Error != nil {
222221 if status .state == streamErrored {
223222 assumedUnreachable ("unexpected buffered event on stream in state streamErrored" )
224223 }
225- return streamErrored , true , nil
224+ return streamErrored , nil
226225 }
227226
228227 switch status .state {
229228 case streamActive :
230229 if bs .queueMu .perStreamCapacity > 0 && status .queueItems == bs .queueMu .perStreamCapacity {
231230 // This stream is at capacity, return an error to the registration that it
232231 // should send back to us after cleaning up.
233- return streamOverflowing , false , newRetryErrBufferCapacityExceeded ()
232+ return streamOverflowing , newRetryErrBufferCapacityExceeded ()
234233 }
235234 // Happy path.
236- return streamActive , true , nil
235+ return streamActive , nil
237236 case streamOverflowing :
238237 // The only place we do concurrent buffered sends is during catch-up scan
239238 // publishing which may be concurrent with a disconnect. The catch-up scan
240239 // will stop publishing if it receives an error and try to send an error
241- // back. A disconnect only sends an error. This path exclusively handles non
242- // errors.
240+ // back. A disconnect only sends an error. This path exclusively handles
241+ // non- errors.
243242 assumedUnreachable ("unexpected buffered event on stream in state streamOverflowing" )
244- return streamOverflowing , false , nil
243+ return streamOverflowing , newRetryErrBufferCapacityExceeded ()
245244 case streamErrored :
246245 // This is unexpected because streamErrored streams are removed from the
247- // status map and future sends are ignored.
246+ // status map and thus should be handled in sendBuffered before this
247+ // function is called.
248248 assumedUnreachable ("unexpected buffered event on stream in state streamErrored" )
249- return streamErrored , false , nil
249+ return streamErrored , errNoSuchStream
250250 default :
251251 panic (fmt .Sprintf ("unhandled stream state: %v" , status .state ))
252252 }
0 commit comments