@@ -7,12 +7,14 @@ package rangefeed
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -48,38 +50,20 @@ import (
 // BufferedPerRangeEventSink.Send BufferedPerRangeEventSink.SendError
 //
 
-// RangefeedSingleBufferedSenderQueueMaxSize is the maximum number of events
-// that the buffered sender will buffer before it starts returning capacity
-// exceeded errors. Updates to this setting are only applied to new
-// MuxRangefeedCalls. Existing streams will use the previous value until
+// RangefeedSingleBufferedSenderQueueMaxPerReg is the maximum number of events
+// that the buffered sender will buffer for a single registration (identified by
+// streamID). Existing MuxRangefeeds will use the previous value until
 // restarted.
 //
-// The default here has been arbitrarily chosen. Ideally,
-//
-// - We want to avoid capacity exceeded errors that wouldn't have occurred
-// when the buffered registrations were in use.
-//
-// - We don't want to drastically increase the amount of queueing allowed for a
-// single registration.
-//
-// A small buffer may be justified given that:
-//
-// - One buffered sender is feeding a single gRPC client, so scaling based on
-// registrations doesn't necessarily make sense. If the consumer is behind, it
-// is behind.
-//
-// - Events emitted during catchup scans have their own per-registration buffer
-// still.
-//
 // TODO(ssd): This is a bit of a stop-gap so that we have a knob to turn if we
 // need to. We probably want each buffered sender (or each consumerID) to be
 // able to hold up to some fraction of the total rangefeed budget. But we are
 // starting here for now.
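+//
+// As a system-only cluster setting it can be tuned at runtime from the system
+// tenant; the value below is a hypothetical example, not a recommendation:
+//
+//	SET CLUSTER SETTING kv.rangefeed.buffered_sender.per_registration_max_queue_size = 8192;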
-var RangefeedSingleBufferedSenderQueueMaxSize = settings.RegisterIntSetting(
+var RangefeedSingleBufferedSenderQueueMaxPerReg = settings.RegisterIntSetting(
 	settings.SystemOnly,
-	"kv.rangefeed.buffered_sender.queue_max_size",
-	"max size of a buffered senders event queue (0 for no max)",
-	kvserverbase.DefaultRangefeedEventCap*8,
+	"kv.rangefeed.buffered_sender.per_registration_max_queue_size",
+	"maximum number of events a single registration can have queued in the event queue (0 for no max)",
+	kvserverbase.DefaultRangefeedEventCap*2,
 )
 
 // BufferedSender is embedded in every rangefeed.BufferedPerRangeEventSink,
@@ -95,9 +79,10 @@ type BufferedSender struct {
 		syncutil.Mutex
 		stopped bool
 		buffer  *eventQueue
-		// capacity is the maximum number of events that can be buffered.
-		capacity   int64
-		overflowed bool
+		// perStreamCapacity is the maximum number of buffered events allowed
+		// per stream.
+		perStreamCapacity int64
+		byStream          map[int64]streamStatus
 	}
 
 	// notifyDataC is used to notify the BufferedSender.run goroutine that there
@@ -111,6 +96,25 @@ type BufferedSender struct {
 	metrics *BufferedSenderMetrics
 }
 
+type streamState int64
+
+const (
+	// streamActive is the default state of the stream.
+	streamActive streamState = iota
+	// streamOverflowing is the state we are in when the stream has reached its
+	// limit and is waiting to deliver an error.
+	streamOverflowing
+	// streamOverflowed means the stream has overflowed and the error has been
+	// placed in the queue.
+	streamOverflowed
+)
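+
+// Stream states only move forward. Summarizing the transitions that
+// sendBuffered performs:
+//
+//	streamActive      -> streamOverflowing: capacity hit by a non-error event;
+//	                     a capacity-exceeded error is returned to the registration.
+//	streamActive      -> streamOverflowed:  capacity hit by an error event; the
+//	                     error itself is admitted to the queue.
+//	streamOverflowing -> streamOverflowed:  the registration's error event has
+//	                     been admitted to the queue.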
+
+type streamStatus struct {
+	// queueItems is the number of items for a given stream in the event queue.
+	queueItems int64
+	state      streamState
+}
+
 func NewBufferedSender(
 	sender ServerStreamSender, settings *cluster.Settings, bsMetrics *BufferedSenderMetrics,
 ) *BufferedSender {
@@ -121,7 +125,8 @@ func NewBufferedSender(
 	bs.queueMu.buffer = newEventQueue()
 	bs.notifyDataC = make(chan struct{}, 1)
 	bs.queueMu.buffer = newEventQueue()
-	bs.queueMu.capacity = RangefeedSingleBufferedSenderQueueMaxSize.Get(&settings.SV)
+	bs.queueMu.perStreamCapacity = RangefeedSingleBufferedSenderQueueMaxPerReg.Get(&settings.SV)
+	bs.queueMu.byStream = make(map[int64]streamStatus)
 	return bs
 }
 
@@ -137,13 +142,52 @@ func (bs *BufferedSender) sendBuffered(
 	if bs.queueMu.stopped {
 		return errors.New("stream sender is stopped")
 	}
-	if bs.queueMu.overflowed {
-		return newRetryErrBufferCapacityExceeded()
-	}
-	if bs.queueMu.capacity > 0 && bs.queueMu.buffer.len() >= bs.queueMu.capacity {
-		bs.queueMu.overflowed = true
-		return newRetryErrBufferCapacityExceeded()
+
+	// Per-stream capacity limits. If the stream is already overflowed, we drop
+	// the event. If the stream has hit its limit, we return an error to the
+	// registration. That error should be the next event sent to the stream.
+	//
+	// NB: The zero value of streamStatus is the valid state of a newly seen
+	// stream.
+	status := bs.queueMu.byStream[ev.StreamID]
+	switch status.state {
+	case streamActive:
+		if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
+			if ev.Error != nil {
+				// If _this_ event is an error, there is no use sending another error.
+				// This stream is going down. Admit this error and mark the stream as
+				// overflowed.
+				status.state = streamOverflowed
+			} else {
+				// This stream is at capacity. Return an error to the registration,
+				// which it should send back to us after cleaning up.
+				status.state = streamOverflowing
+				return newRetryErrBufferCapacityExceeded()
+			}
+		}
+	case streamOverflowing:
+		// The unbufferedRegistration is the only component that sends non-error
+		// events to our stream. In response to the error we return when moving to
+		// streamOverflowing, it should immediately send us an error and mark itself
+		// as disconnected. As a result, no non-error events are expected.
+		if ev.Error == nil {
+			panic("only error events expected after stream has exceeded capacity")
+		}
+		status.state = streamOverflowed
+	case streamOverflowed:
+		// If we are overflowed, we don't expect any further events because the
+		// registration should have disconnected in response to the error.
+		//
+		// TODO(ssd): Consider adding an assertion here.
+		return nil
+	default:
+		panic(fmt.Sprintf("unhandled stream state: %v", status.state))
 	}
+
+	// We are admitting this event.
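+	// NB: streamStatus is stored in the map by value, so the updated copy must
+	// be written back for the accounting to take effect.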
+	status.queueItems++
+	bs.queueMu.byStream[ev.StreamID] = status
+
 	// TODO(wenyihu6): pass an actual context here
 	alloc.Use(context.Background())
 	bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
@@ -179,10 +223,11 @@ func (bs *BufferedSender) run(
 			return nil
 		case <-bs.notifyDataC:
 			for {
-				e, success, overflowed, remains := bs.popFront()
+				e, success := bs.popFront()
 				if !success {
 					break
 				}
+
 				bs.metrics.BufferedSenderQueueSize.Dec(1)
 				err := bs.sender.Send(e.ev)
 				e.alloc.Release(ctx)
@@ -192,26 +237,33 @@ func (bs *BufferedSender) run(
 				if err != nil {
 					return err
 				}
-				if overflowed && remains == int64(0) {
-					return newRetryErrBufferCapacityExceeded()
-				}
 			}
 		}
 	}
 }
 
 // popFront pops the front event from the buffer queue. It returns the event and
 // a boolean indicating if the event was successfully popped.
-func (bs *BufferedSender) popFront() (
-	e sharedMuxEvent,
-	success bool,
-	overflowed bool,
-	remains int64,
-) {
+func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
 	bs.queueMu.Lock()
 	defer bs.queueMu.Unlock()
 	event, ok := bs.queueMu.buffer.popFront()
-	return event, ok, bs.queueMu.overflowed, bs.queueMu.buffer.len()
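+	// Keep the per-stream accounting in sync with the queue contents.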
+	if ok {
+		state, streamFound := bs.queueMu.byStream[event.ev.StreamID]
+		if streamFound {
+			state.queueItems--
+			bs.queueMu.byStream[event.ev.StreamID] = state
+		} else {
+			assumedUnreachable("event found in queue with no state in byStream")
+		}
+	}
+	return event, ok
+}
+
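+// removeStream drops the per-stream accounting state for the given streamID.
+// NB (assumption): this is expected to be called once the stream is
+// disconnected; in test builds, popFront treats a queued event with no
+// byStream entry as unreachable.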
+func (bs *BufferedSender) removeStream(streamID int64) {
+	bs.queueMu.Lock()
+	defer bs.queueMu.Unlock()
+	delete(bs.queueMu.byStream, streamID)
 }
 
 // cleanup is called when the sender is stopped. It is expected to free up
@@ -222,6 +274,7 @@ func (bs *BufferedSender) cleanup(ctx context.Context) {
 	bs.queueMu.stopped = true
 	remaining := bs.queueMu.buffer.len()
 	bs.queueMu.buffer.drain(ctx)
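+	// Drop the per-stream accounting; the sender is stopped, so no further
+	// events will be admitted.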
+	bs.queueMu.byStream = nil
 	bs.metrics.BufferedSenderQueueSize.Dec(remaining)
 }
 
@@ -231,12 +284,6 @@ func (bs *BufferedSender) len() int {
 	return int(bs.queueMu.buffer.len())
 }
 
-func (bs *BufferedSender) overflowed() bool {
-	bs.queueMu.Lock()
-	defer bs.queueMu.Unlock()
-	return bs.queueMu.overflowed
-}
-
 // Used for testing only.
 func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
 	opts := retry.Options{
@@ -258,3 +305,9 @@ func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
 	}
 	return errors.New("buffered sender failed to send in time")
 }
+
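+// assumedUnreachable flags code paths that are believed to be unreachable: it
+// panics in test builds (buildutil.CrdbTestBuild) and is a no-op in production
+// builds.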
+func assumedUnreachable(msg string) {
+	if buildutil.CrdbTestBuild {
+		panic(fmt.Sprintf("assumed unreachable code reached: %v", msg))
+	}
+}