@@ -40,33 +40,32 @@ type registration interface {
4040 // registration implementation to decide how to handle the event and how to
4141 // prevent missing events.
4242 publish (ctx context.Context , event * kvpb.RangeFeedEvent , alloc * SharedBudgetAllocation )
43+
44+ // shouldPublishLogicalOp return true if a logical operation at the given
45+ // timestamp and with the given logicalOpMetadata should be published to this
46+ // registration.
47+ shouldPublishLogicalOp (hlc.Timestamp , logicalOpMetadata ) bool
48+
4349 // runOutputLoop runs the output loop for the registration. The output loop is
4450 // meant to be run in a separate goroutine.
4551 runOutputLoop (ctx context.Context , forStacks roachpb.RangeID )
4652 // drainAllocations drains all pending allocations when being unregistered
4753 // from processor if any.
4854 drainAllocations (ctx context.Context )
49- // waitForCaughtUp waits for the registration to forward all buffered events
50- // if any.
51- waitForCaughtUp (ctx context.Context ) error
52- // setID sets the id field of the registration.
55+
56+ // Span returns the span of the registration.
57+ Span () roachpb.Span
58+
59+ // WithDiff return true if the registration consumer requires the previous
60+ // value.
61+ WithDiff () bool
62+
63+ // registrations are stored in an interval tree by the processor.
64+ //
65+ // setID sets the id field of the registration. This is used by the processor
66+ // to assign the ID that will be used by the interval tree.
5367 setID (int64 )
54- // setSpanAsKeys sets the keys field to the span of the registration.
55- setSpanAsKeys ()
56- // getSpan returns the span of the registration.
57- getSpan () roachpb.Span
58- // getCatchUpTimestamp returns the catchUpTimestamp of the registration.
59- getCatchUpTimestamp () hlc.Timestamp
60- // getWithDiff returns the withDiff field of the registration.
61- getWithDiff () bool
62- // getWithFiltering returns the withFiltering field of the registration.
63- getWithFiltering () bool
64- // getWithOmitRemote returns the withOmitRemote field of the registration.
65- getWithOmitRemote () bool
66- // Range returns the keys field of the registration.
67- Range () interval.Range
68- // ID returns the id field of the registration as a uintptr.
69- ID () uintptr
68+ interval.Interface
7069
7170 // shouldUnregister returns true if this registration should be unregistered
7271 // by unregisterMarkedRegistrations. UnregisterMarkedRegistrations is called
@@ -76,27 +75,34 @@ type registration interface {
7675 // setShouldUnregister sets shouldUnregister to true. Used by the rangefeed
7776 // processor in response to an unregister request.
7877 setShouldUnregister ()
78+
79+ // waitForCaughtUp waits for the registration to forward all buffered events
80+ // if any.
81+ waitForCaughtUp (ctx context.Context ) error
7982}
8083
8184// baseRegistration is a common base for all registration types. It is intended
8285// to be embedded in an actual registration struct.
8386type baseRegistration struct {
84- streamCtx context.Context
85- span roachpb.Span
86- withDiff bool
87- withFiltering bool
88- withOmitRemote bool
89- bulkDelivery int
87+ streamCtx context.Context
88+
89+ // Set during construction.
90+ span roachpb.Span
91+ keys interval.Range // Constructed from span.
92+ withDiff bool
93+ withFiltering bool
94+ withOmitRemote bool
95+ bulkDelivery int
96+ catchUpTimestamp hlc.Timestamp // exclusive
9097 // removeRegFromProcessor is called to remove the registration from its
9198 // processor. This is provided by the creator of the registration and called
9299 // during disconnect(). Since it is called during disconnect it must be
93100 // non-blocking.
94101 removeRegFromProcessor func (registration )
95102
96- catchUpTimestamp hlc.Timestamp // exclusive
97- id int64 // internal
98- keys interval.Range
99- shouldUnreg atomic.Bool
103+ // Set via accessors.
104+ id int64
105+ shouldUnreg atomic.Bool
100106}
101107
102108// ID implements interval.Interface.
@@ -117,24 +123,29 @@ func (r *baseRegistration) setID(id int64) {
117123 r .id = id
118124}
119125
120- func (r * baseRegistration ) setSpanAsKeys () {
121- r .keys = r .span .AsRange ()
122- }
123-
124- func (r * baseRegistration ) getSpan () roachpb.Span {
126+ func (r * baseRegistration ) Span () roachpb.Span {
125127 return r .span
126128}
127129
128- func (r * baseRegistration ) getCatchUpTimestamp () hlc.Timestamp {
129- return r .catchUpTimestamp
130- }
131-
132- func (r * baseRegistration ) getWithFiltering () bool {
133- return r .withFiltering
134- }
135-
136- func (r * baseRegistration ) getWithOmitRemote () bool {
137- return r .withOmitRemote
130+ //gcassert:inline
131+ func (r * baseRegistration ) shouldPublishLogicalOp (
132+ minTS hlc.Timestamp , valueMetadata logicalOpMetadata ,
133+ ) bool {
134+ // Don't publish events if they:
135+ // 1. are equal to or less than the registration's starting timestamp, or
136+ // 2. have OmitInRangefeeds = true and this registration has opted into filtering, or
137+ // 3. have OmitRemote = true and this value is from a remote cluster.
138+ if ! r .catchUpTimestamp .Less (minTS ) {
139+ return false
140+ }
141+ if r .withFiltering && valueMetadata .omitInRangefeeds {
142+ return false
143+ }
144+ isRemoteEvent := valueMetadata .originID != 0
145+ if r .withOmitRemote && isRemoteEvent {
146+ return false
147+ }
148+ return true
138149}
139150
140151func (r * baseRegistration ) shouldUnregister () bool {
@@ -145,7 +156,7 @@ func (r *baseRegistration) setShouldUnregister() {
145156 r .shouldUnreg .Store (true )
146157}
147158
148- func (r * baseRegistration ) getWithDiff () bool {
159+ func (r * baseRegistration ) WithDiff () bool {
149160 return r .withDiff
150161}
151162
@@ -325,7 +336,6 @@ func (reg *registry) updateMetricsOnUnregistration(r registration) {
325336func (reg * registry ) Register (ctx context.Context , r registration ) {
326337 reg .updateMetricsOnRegistration (r )
327338 r .setID (reg .nextID ())
328- r .setSpanAsKeys ()
329339 if err := reg .tree .Insert (r , false /* fast */ ); err != nil {
330340 // TODO(erikgrinaker): these errors should arguably be returned.
331341 log .KvDistribution .Fatalf (ctx , "%v" , err )
@@ -368,11 +378,7 @@ func (reg *registry) PublishToOverlapping(
368378 }
369379
370380 reg .forOverlappingRegs (ctx , span , func (r registration ) (bool , * kvpb.Error ) {
371- // Don't publish events if they:
372- // 1. are equal to or less than the registration's starting timestamp, or
373- // 2. have OmitInRangefeeds = true and this registration has opted into filtering, or
374- // 3. have OmitRemote = true and this value is from a remote cluster.
375- if r .getCatchUpTimestamp ().Less (minTS ) && ! (r .getWithFiltering () && valueMetadata .omitInRangefeeds ) && (! r .getWithOmitRemote () || valueMetadata .originID == 0 ) {
381+ if r .shouldPublishLogicalOp (minTS , valueMetadata ) {
376382 r .publish (ctx , event , alloc )
377383 }
378384 return false , nil
0 commit comments