@@ -40,33 +40,32 @@ type registration interface {
4040 // registration implementation to decide how to handle the event and how to
4141 // prevent missing events.
4242 publish (ctx context.Context , event * kvpb.RangeFeedEvent , alloc * SharedBudgetAllocation )
43+
44+ // shouldPublishLogicalOp return true if a logical operation at the given
45+ // timestamp and with the given logicalOpMetadata should be published to this
46+ // registration.
47+ shouldPublishLogicalOp (hlc.Timestamp , logicalOpMetadata ) bool
48+
4349 // runOutputLoop runs the output loop for the registration. The output loop is
4450 // meant to be run in a separate goroutine.
4551 runOutputLoop (ctx context.Context , forStacks roachpb.RangeID )
4652 // drainAllocations drains all pending allocations when being unregistered
4753 // from processor if any.
4854 drainAllocations (ctx context.Context )
49- // waitForCaughtUp waits for the registration to forward all buffered events
50- // if any.
51- waitForCaughtUp (ctx context.Context ) error
52- // setID sets the id field of the registration.
55+
56+ // Span returns the span of the registration.
57+ Span () roachpb.Span
58+
59+ // WithDiff return true if the registration consumer requires the previous
60+ // value.
61+ WithDiff () bool
62+
63+ // registrations are stored in an interval tree by the processor.
64+ //
65+ // setID sets the id field of the registration. This is used by the processor
66+ // to assign the ID that will be used by the interval tree.
5367 setID (int64 )
54- // setSpanAsKeys sets the keys field to the span of the registration.
55- setSpanAsKeys ()
56- // getSpan returns the span of the registration.
57- getSpan () roachpb.Span
58- // getCatchUpTimestamp returns the catchUpTimestamp of the registration.
59- getCatchUpTimestamp () hlc.Timestamp
60- // getWithDiff returns the withDiff field of the registration.
61- getWithDiff () bool
62- // getWithFiltering returns the withFiltering field of the registration.
63- getWithFiltering () bool
64- // getWithOmitRemote returns the withOmitRemote field of the registration.
65- getWithOmitRemote () bool
66- // Range returns the keys field of the registration.
67- Range () interval.Range
68- // ID returns the id field of the registration as a uintptr.
69- ID () uintptr
68+ interval.Interface
7069
7170 // shouldUnregister returns true if this registration should be unregistered
7271 // by unregisterMarkedRegistrations. UnregisterMarkedRegistrations is called
@@ -76,27 +75,60 @@ type registration interface {
7675 // setShouldUnregister sets shouldUnregister to true. Used by the rangefeed
7776 // processor in response to an unregister request.
7877 setShouldUnregister ()
78+
79+ // waitForCaughtUp waits for the registration to forward all buffered events
80+ // if any.
81+ waitForCaughtUp (ctx context.Context ) error
7982}
8083
8184// baseRegistration is a common base for all registration types. It is intended
8285// to be embedded in an actual registration struct.
8386type baseRegistration struct {
84- streamCtx context.Context
85- span roachpb.Span
86- withDiff bool
87- withFiltering bool
88- withOmitRemote bool
89- bulkDelivery int
87+ streamCtx context.Context
88+
89+ // Set during construction.
90+ span roachpb.Span
91+ keys interval.Range // Constructed from span.
92+ withDiff bool
93+ withFiltering bool
94+ withOmitRemote bool
95+ bulkDelivery int
96+ catchUpTimestamp hlc.Timestamp // exclusive
9097 // removeRegFromProcessor is called to remove the registration from its
9198 // processor. This is provided by the creator of the registration and called
9299 // during disconnect(). Since it is called during disconnect it must be
93100 // non-blocking.
94101 removeRegFromProcessor func (registration )
95102
96- catchUpTimestamp hlc.Timestamp // exclusive
97- id int64 // internal
98- keys interval.Range
99- shouldUnreg atomic.Bool
103+ // Set via accessors.
104+ id int64
105+ shouldUnreg atomic.Bool
106+ }
107+
108+ // newBaseRegistration returns a baseRegistration. Note that a baseRegistration
109+ // does not implement registration. Rather this is used by the buffered and
110+ // unbuffered registrations to share some basic methods.
111+ func newBaseRegistration (
112+ streamCtx context.Context ,
113+ span roachpb.Span ,
114+ startTS hlc.Timestamp ,
115+ withDiff bool ,
116+ withFiltering bool ,
117+ withOmitRemote bool ,
118+ bulkDeliverySize int ,
119+ removeRegFromProcessor func (registration ),
120+ ) baseRegistration {
121+ return baseRegistration {
122+ streamCtx : streamCtx ,
123+ span : span ,
124+ keys : span .AsRange (),
125+ catchUpTimestamp : startTS ,
126+ withDiff : withDiff ,
127+ withFiltering : withFiltering ,
128+ withOmitRemote : withOmitRemote ,
129+ bulkDelivery : bulkDeliverySize ,
130+ removeRegFromProcessor : removeRegFromProcessor ,
131+ }
100132}
101133
102134// ID implements interval.Interface.
@@ -117,24 +149,29 @@ func (r *baseRegistration) setID(id int64) {
117149 r .id = id
118150}
119151
120- func (r * baseRegistration ) setSpanAsKeys () {
121- r .keys = r .span .AsRange ()
122- }
123-
124- func (r * baseRegistration ) getSpan () roachpb.Span {
152+ func (r * baseRegistration ) Span () roachpb.Span {
125153 return r .span
126154}
127155
128- func (r * baseRegistration ) getCatchUpTimestamp () hlc.Timestamp {
129- return r .catchUpTimestamp
130- }
131-
132- func (r * baseRegistration ) getWithFiltering () bool {
133- return r .withFiltering
134- }
135-
136- func (r * baseRegistration ) getWithOmitRemote () bool {
137- return r .withOmitRemote
156+ //gcassert:inline
157+ func (r * baseRegistration ) shouldPublishLogicalOp (
158+ minTS hlc.Timestamp , valueMetadata logicalOpMetadata ,
159+ ) bool {
160+ // Don't publish events if they:
161+ // 1. are equal to or less than the registration's starting timestamp, or
162+ // 2. have OmitInRangefeeds = true and this registration has opted into filtering, or
163+ // 3. have OmitRemote = true and this value is from a remote cluster.
164+ if ! r .catchUpTimestamp .Less (minTS ) {
165+ return false
166+ }
167+ if r .withFiltering && valueMetadata .omitInRangefeeds {
168+ return false
169+ }
170+ isRemoteEvent := valueMetadata .originID != 0
171+ if r .withOmitRemote && isRemoteEvent {
172+ return false
173+ }
174+ return true
138175}
139176
140177func (r * baseRegistration ) shouldUnregister () bool {
@@ -145,7 +182,7 @@ func (r *baseRegistration) setShouldUnregister() {
145182 r .shouldUnreg .Store (true )
146183}
147184
148- func (r * baseRegistration ) getWithDiff () bool {
185+ func (r * baseRegistration ) WithDiff () bool {
149186 return r .withDiff
150187}
151188
@@ -325,7 +362,6 @@ func (reg *registry) updateMetricsOnUnregistration(r registration) {
325362func (reg * registry ) Register (ctx context.Context , r registration ) {
326363 reg .updateMetricsOnRegistration (r )
327364 r .setID (reg .nextID ())
328- r .setSpanAsKeys ()
329365 if err := reg .tree .Insert (r , false /* fast */ ); err != nil {
330366 // TODO(erikgrinaker): these errors should arguably be returned.
331367 log .KvDistribution .Fatalf (ctx , "%v" , err )
@@ -368,11 +404,7 @@ func (reg *registry) PublishToOverlapping(
368404 }
369405
370406 reg .forOverlappingRegs (ctx , span , func (r registration ) (bool , * kvpb.Error ) {
371- // Don't publish events if they:
372- // 1. are equal to or less than the registration's starting timestamp, or
373- // 2. have OmitInRangefeeds = true and this registration has opted into filtering, or
374- // 3. have OmitRemote = true and this value is from a remote cluster.
375- if r .getCatchUpTimestamp ().Less (minTS ) && ! (r .getWithFiltering () && valueMetadata .omitInRangefeeds ) && (! r .getWithOmitRemote () || valueMetadata .originID == 0 ) {
407+ if r .shouldPublishLogicalOp (minTS , valueMetadata ) {
376408 r .publish (ctx , event , alloc )
377409 }
378410 return false , nil
0 commit comments