Skip to content

Commit 98dc90e

Browse files
committed
rangefeed: add newBaseRegistration
Epic: none Release note: None
1 parent e010259 commit 98dc90e

File tree

3 files changed

+45
-23
lines changed

3 files changed

+45
-23
lines changed

pkg/kv/kvserver/rangefeed/buffered_registration.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,25 +78,23 @@ func newBufferedRegistration(
7878
withDiff bool,
7979
withFiltering bool,
8080
withOmitRemote bool,
81-
withBulkDelivery int,
81+
bulkDeliverySize int,
8282
bufferSz int,
8383
blockWhenFull bool,
8484
metrics *Metrics,
8585
stream Stream,
8686
removeRegFromProcessor func(registration),
8787
) *bufferedRegistration {
8888
br := &bufferedRegistration{
89-
baseRegistration: baseRegistration{
90-
streamCtx: streamCtx,
91-
span: span,
92-
keys: span.AsRange(),
93-
catchUpTimestamp: startTS,
94-
withDiff: withDiff,
95-
withFiltering: withFiltering,
96-
withOmitRemote: withOmitRemote,
97-
removeRegFromProcessor: removeRegFromProcessor,
98-
bulkDelivery: withBulkDelivery,
99-
},
89+
baseRegistration: newBaseRegistration(
90+
streamCtx,
91+
span,
92+
startTS,
93+
withDiff,
94+
withFiltering,
95+
withOmitRemote,
96+
bulkDeliverySize,
97+
removeRegFromProcessor),
10098
metrics: metrics,
10199
stream: stream,
102100
buf: make(chan *sharedEvent, bufferSz),

pkg/kv/kvserver/rangefeed/registry.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,32 @@ type baseRegistration struct {
105105
shouldUnreg atomic.Bool
106106
}
107107

108+
// newBaseRegistration returns a baseRegistration. Note that a baseRegistration
109+
// does not implement registration. Rather this is used by the buffered and
110+
// unbuffered registrations to share some basic methods.
111+
func newBaseRegistration(
112+
streamCtx context.Context,
113+
span roachpb.Span,
114+
startTS hlc.Timestamp,
115+
withDiff bool,
116+
withFiltering bool,
117+
withOmitRemote bool,
118+
bulkDeliverySize int,
119+
removeRegFromProcessor func(registration),
120+
) baseRegistration {
121+
return baseRegistration{
122+
streamCtx: streamCtx,
123+
span: span,
124+
keys: span.AsRange(),
125+
catchUpTimestamp: startTS,
126+
withDiff: withDiff,
127+
withFiltering: withFiltering,
128+
withOmitRemote: withOmitRemote,
129+
bulkDelivery: bulkDeliverySize,
130+
removeRegFromProcessor: removeRegFromProcessor,
131+
}
132+
}
133+
108134
// ID implements interval.Interface.
109135
func (r *baseRegistration) ID() uintptr {
110136
return uintptr(r.id)

pkg/kv/kvserver/rangefeed/unbuffered_registration.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,15 @@ func newUnbufferedRegistration(
102102
removeRegFromProcessor func(registration),
103103
) *unbufferedRegistration {
104104
br := &unbufferedRegistration{
105-
baseRegistration: baseRegistration{
106-
streamCtx: streamCtx,
107-
span: span,
108-
keys: span.AsRange(),
109-
catchUpTimestamp: startTS,
110-
withDiff: withDiff,
111-
withFiltering: withFiltering,
112-
withOmitRemote: withOmitRemote,
113-
removeRegFromProcessor: removeRegFromProcessor,
114-
bulkDelivery: bulkDeliverySize,
115-
},
105+
baseRegistration: newBaseRegistration(
106+
streamCtx,
107+
span,
108+
startTS,
109+
withDiff,
110+
withFiltering,
111+
withOmitRemote,
112+
bulkDeliverySize,
113+
removeRegFromProcessor),
116114
metrics: metrics,
117115
stream: stream,
118116
}

0 commit comments

Comments
 (0)