Skip to content

Commit 8e5c0a6

Browse files
craig[bot]stevendanna
andcommitted
Merge #154828
154828: rangefeed: explicitly initialize stream status r=wenyihu8 a=stevendanna Since we do have a window after an error where the unbufferedRegistration may send events, here we move to explicit initialization of the stream status to avoid leaking any streamStatus structs. Epic: none Release note: None Co-authored-by: Steven Danna <danna@cockroachlabs.com>
2 parents 9bfbb28 + abc1282 commit 8e5c0a6

File tree

7 files changed

+93
-49
lines changed

7 files changed

+93
-49
lines changed

pkg/kv/kvserver/rangefeed/buffered_sender.go

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
1515
"github.com/cockroachdb/cockroach/pkg/settings"
1616
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
17+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
1718
"github.com/cockroachdb/cockroach/pkg/util/retry"
1819
"github.com/cockroachdb/cockroach/pkg/util/stop"
1920
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -147,47 +148,50 @@ func (bs *BufferedSender) sendBuffered(
147148
// registration. This error should be the next event that is sent to
148149
// stream.
149150
//
150-
// NB: The zero-value of streamStatus is the valid state of a newly seen
151-
// stream.
152-
status := bs.queueMu.byStream[ev.StreamID]
153-
switch status.state {
154-
case streamActive:
155-
if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
151+
// NB: We don't error if the stream status is not found as this may be an
152+
// event for an already closed stream. Such events are possible while the
153+
// registration publishes the catch up scan buffer.
154+
status, ok := bs.queueMu.byStream[ev.StreamID]
155+
if ok {
156+
switch status.state {
157+
case streamActive:
158+
if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
159+
if ev.Error != nil {
160+
// If _this_ event is an error, no use sending another error. This stream
161+
// is going down. Admit this error and mark the stream as overflowed.
162+
status.state = streamOverflowed
163+
} else {
164+
// This stream is at capacity, return an error to the registration that it
165+
// should send back to us after cleaning up.
166+
status.state = streamOverflowing
167+
return newRetryErrBufferCapacityExceeded()
168+
}
169+
}
170+
case streamOverflowing:
171+
// The unbufferedRegistration is the only component that sends non-error
172+
// events to our stream. In response to the error we return when moving to
173+
// stateOverflowing, it should immediately send us an error and mark itself
174+
// as disconnected.
175+
//
176+
// The only unfortunate exception is if we get disconnected while flushing
177+
// the catch-up scan buffer.
156178
if ev.Error != nil {
157-
// If _this_ event is an error, no use sending another error. This stream
158-
// is going down. Admit this error and mark the stream as overflowed.
159179
status.state = streamOverflowed
160-
} else {
161-
// This stream is at capacity, return an error to the registration that it
162-
// should send back to us after cleaning up.
163-
status.state = streamOverflowing
164-
return newRetryErrBufferCapacityExceeded()
165180
}
181+
case streamOverflowed:
182+
// If we are overflowed, we don't expect any further events because the
183+
// registration should have disconnected in response to the error.
184+
//
185+
// TODO(ssd): Consider adding an assertion here.
186+
return nil
187+
default:
188+
panic(fmt.Sprintf("unhandled stream state: %v", status.state))
166189
}
167-
case streamOverflowing:
168-
// The unbufferedRegistration is the only component that sends non-error
169-
// events to our stream. In response to the error we return when moving to
170-
// stateOverflowing, it should immediately send us an error and mark itself
171-
// as disconnected.
172-
//
173-
// The only unfortunate exception is if we get disconnected while flushing
174-
// the catch-up scan buffer.
175-
if ev.Error != nil {
176-
status.state = streamOverflowed
177-
}
178-
case streamOverflowed:
179-
// If we are overflowed, we don't expect any further events because the
180-
// registration should have disconnected in response to the error.
181-
//
182-
// TODO(ssd): Consider adding an assertion here.
183-
return nil
184-
default:
185-
panic(fmt.Sprintf("unhandled stream state: %v", status.state))
186-
}
190+
// We are admitting this event.
191+
status.queueItems++
192+
bs.queueMu.byStream[ev.StreamID] = status
187193

188-
// We are admitting this event.
189-
status.queueItems++
190-
bs.queueMu.byStream[ev.StreamID] = status
194+
}
191195

192196
// TODO(wenyihu6): pass an actual context here
193197
alloc.Use(context.Background())
@@ -259,6 +263,18 @@ func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
259263
return event, ok
260264
}
261265

266+
func (bs *BufferedSender) addStream(streamID int64) {
267+
bs.queueMu.Lock()
268+
defer bs.queueMu.Unlock()
269+
if _, ok := bs.queueMu.byStream[streamID]; !ok {
270+
bs.queueMu.byStream[streamID] = streamStatus{}
271+
} else {
272+
if buildutil.CrdbTestBuild {
273+
panic(fmt.Sprintf("stream %d already exists in buffered sender", streamID))
274+
}
275+
}
276+
}
277+
262278
func (bs *BufferedSender) removeStream(streamID int64) {
263279
bs.queueMu.Lock()
264280
defer bs.queueMu.Unlock()

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,17 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
182182
st := cluster.MakeTestingClusterSettings()
183183

184184
queueCap := int64(24)
185+
streamID := int64(1)
186+
185187
RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
186188
bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
189+
bs.addStream(streamID)
187190
require.Equal(t, queueCap, bs.queueMu.perStreamCapacity)
188191

189192
val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
190193
ev1 := new(kvpb.RangeFeedEvent)
191194
ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
192-
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: 1}
195+
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID}
193196

194197
for range queueCap {
195198
require.NoError(t, bs.sendBuffered(muxEv, nil))
@@ -235,12 +238,13 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
235238
p, h, pStopper := newTestProcessor(t, withRangefeedTestType(scheduledProcessorWithBufferedSender))
236239
defer pStopper.Stop(ctx)
237240

238-
streamID := int64(42)
241+
streamID1 := int64(42)
242+
streamID2 := streamID1 + 1
239243

240244
val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
241245
ev1 := new(kvpb.RangeFeedEvent)
242246
ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
243-
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID}
247+
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID1}
244248

245249
// Block the stream so that we can overflow later.
246250
unblock := testServerStream.BlockSend()
@@ -256,18 +260,20 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
256260
}
257261

258262
// Add our stream to the stream manager.
263+
sm.RegisteringStream(streamID1)
259264
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
260265
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
261-
sm.NewStream(streamID, 1 /*rangeID*/))
266+
sm.NewStream(streamID1, 1 /*rangeID*/))
262267
require.True(t, registered)
263-
sm.AddStream(streamID, d)
268+
sm.AddStream(streamID1, d)
264269

265270
// Add a second stream to the stream manager.
271+
sm.RegisteringStream(streamID2)
266272
registered, d, _ = p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
267273
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
268-
sm.NewStream(streamID+1, 1 /*rangeID*/))
274+
sm.NewStream(streamID2, 1 /*rangeID*/))
269275
require.True(t, registered)
270-
sm.AddStream(streamID+1, d)
276+
sm.AddStream(streamID2, d)
271277

272278
// At this point we actually have sent 2 events, one for each checkpoint sent
273279
// by the registrations. One of these should get pulled off the queue and block.
@@ -283,7 +289,7 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
283289
require.EqualError(t, err, capExceededErrStr)
284290

285291
// A write to a different stream should be fine
286-
muxEv.StreamID = streamID + 2
292+
muxEv.StreamID = streamID2
287293
err = sm.sender.sendBuffered(muxEv, nil)
288294
require.NoError(t, err)
289295

pkg/kv/kvserver/rangefeed/stream_manager.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,13 @@ type sender interface {
7676
// all streams in StreamManager.
7777
run(ctx context.Context, stopper *stop.Stopper, onError func(int64)) error
7878

79-
// Remove stream is called when an individual stream is being removed.
79+
// TODO(ssd): These two methods call into question whether StreamManager and
80+
// sender can really be separate. We might consider combining the two for
81+
// simplicity.
82+
//
83+
// addStream is called when an individual stream is being added.
84+
addStream(streamID int64)
85+
// removeStream is called when an individual stream is being removed.
8086
removeStream(streamID int64)
8187

8288
// cleanup is called when the sender is stopped. It is expected to clean up
@@ -139,6 +145,12 @@ func (sm *StreamManager) DisconnectStream(streamID int64, err *kvpb.Error) {
139145
}
140146
}
141147

148+
// RegisteringStream is called once a stream will be registered. After this
149+
// point, the stream may start to see event.
150+
func (sm *StreamManager) RegisteringStream(streamID int64) {
151+
sm.sender.addStream(streamID)
152+
}
153+
142154
// AddStream adds a streamID with its disconnector to the StreamManager.
143155
// StreamManager can use the disconnector to shut down the rangefeed stream
144156
// later on.

pkg/kv/kvserver/rangefeed/stream_manager_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,10 +211,11 @@ func TestStreamManagerErrorHandling(t *testing.T) {
211211
t.Run("Fail to register rangefeed with the processor", func(t *testing.T) {
212212
p, _, stopper := newTestProcessor(t, withRangefeedTestType(rt))
213213
defer stopper.Stop(ctx)
214-
sm.NewStream(sID, rID)
215-
// We mock failed registration by not calling p.Register.
216-
// node.MuxRangefeed would call sendBuffered with error event.
217-
require.NoError(t, sm.sender.sendBuffered(makeMuxRangefeedErrorEvent(sID, rID, disconnectErr), nil))
214+
streamSink := sm.NewStream(sID, rID)
215+
// We mock failed registration by not calling p.Register and calling
216+
// SendError just as (*Node).muxRangeFeed does.
217+
sm.RegisteringStream(sID)
218+
streamSink.SendError(disconnectErr)
218219
expectErrorHandlingInvariance(p)
219220
testServerStream.reset()
220221
})
@@ -223,6 +224,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
223224
p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
224225
defer stopper.Stop(ctx)
225226
stream := sm.NewStream(sID, rID)
227+
sm.RegisteringStream(sID)
226228
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
227229
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
228230
stream)
@@ -237,6 +239,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
237239
stream := sm.NewStream(sID, rID)
238240
p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
239241
defer stopper.Stop(ctx)
242+
sm.RegisteringStream(sID)
240243
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
241244
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
242245
stream)
@@ -252,6 +255,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
252255
stream := sm.NewStream(sID, rID)
253256
p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
254257
defer stopper.Stop(ctx)
258+
sm.RegisteringStream(sID)
255259
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
256260
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
257261
stream)

pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func TestUnbufferedRegWithStreamManager(t *testing.T) {
4848
})
4949
t.Run("register 50 streams", func(t *testing.T) {
5050
for id := int64(0); id < 50; id++ {
51+
sm.RegisteringStream(id)
5152
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
5253
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
5354
sm.NewStream(id, r1))
@@ -142,6 +143,7 @@ func TestUnbufferedRegCorrectnessOnDisconnect(t *testing.T) {
142143
evErr.MustSetValue(&kvpb.RangeFeedError{Error: *discErr})
143144

144145
// Register one stream.
146+
sm.RegisteringStream(s1)
145147
registered, d, _ := p.Register(ctx, h.span, startTs,
146148
makeCatchUpIterator(catchUpIter, span, startTs), /* catchUpIter */
147149
true /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,

pkg/kv/kvserver/rangefeed/unbuffered_sender.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ func (ubs *UnbufferedSender) sendUnbuffered(event *kvpb.MuxRangeFeedEvent) error
132132
return ubs.sender.Send(event)
133133
}
134134

135+
// addStream implements sender.
136+
func (ubs *UnbufferedSender) addStream(int64) {}
137+
135138
// removeStream implements sender.
136139
func (ubs *UnbufferedSender) removeStream(int64) {}
137140

pkg/server/node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2307,6 +2307,7 @@ func (n *Node) muxRangeFeed(muxStream kvpb.RPCInternal_MuxRangeFeedStream) error
23072307
// Disconnector returned can be used to shut down rangefeed from the
23082308
// stream manager. If rangefeed disconnects with an error after being
23092309
// successfully registered, it calls streamSink.SendError.
2310+
sm.RegisteringStream(req.StreamID)
23102311
if disconnector, err := n.stores.RangeFeed(streamCtx, req, streamSink, limiter); err != nil {
23112312
// The rangefeed was not registered, so it should be safe to send this
23122313
// error directly to the stream rather than via the registration.

0 commit comments

Comments
 (0)