Skip to content

Commit 5fdc01e

Browse files
craig[bot]stevendanna
andcommitted
Merge #155591
155591: rangefeed: extract TxnPushNotifier r=wenyihu6 a=stevendanna This addresses a testing TODO by extracting the txn push notification goroutine into a struct that can be tested without a store. This perhaps makes it a bit synthetic of a test since we don't have a test that ensures that the store starts the push notifier. But, we weren't testing anything at all before so this still feels like an improvement. Epic: none Release note: None Co-authored-by: Steven Danna <danna@cockroachlabs.com>
2 parents d1cef1a + 4a53388 commit 5fdc01e

File tree

5 files changed

+104
-45
lines changed

5 files changed

+104
-45
lines changed

pkg/kv/kvserver/rangefeed/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ go_library(
2020
"stream_manager.go",
2121
"task.go",
2222
"test_helpers.go",
23+
"txn_push_notifier.go",
2324
"unbuffered_registration.go",
2425
"unbuffered_sender.go",
2526
],

pkg/kv/kvserver/rangefeed/processor_helpers_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ type processorTestHelper struct {
211211
rts *resolvedTimestamp
212212
syncEventC func()
213213
sendSpanSync func(*roachpb.Span)
214+
rawScheduler *Scheduler
214215
scheduler *ClientScheduler
215216
toBufferedStreamIfNeeded func(s Stream) Stream
216217
}
@@ -348,7 +349,7 @@ func withSettings(st *cluster.Settings) option {
348349
}
349350
}
350351

351-
func withPushTxnsIntervalAge(interval, age time.Duration) option {
352+
func withPushTxnsIntervalAge(age time.Duration) option {
352353
return func(config *testConfig) {
353354
config.PushTxnsAge = age
354355
}
@@ -451,6 +452,7 @@ func newTestProcessor(
451452
h.sendSpanSync = func(span *roachpb.Span) {
452453
p.syncSendAndWait(&syncEvent{c: make(chan struct{}), testRegCatchupSpan: span})
453454
}
455+
h.rawScheduler = sch
454456
h.scheduler = &p.scheduler
455457
switch cfg.feedType {
456458
case scheduledProcessorWithUnbufferedSender:

pkg/kv/kvserver/rangefeed/processor_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -927,8 +927,8 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
927927
})
928928
}
929929

930-
// TestProcessorTxnPushDisabled tests that processors don't attempt txn pushes
931-
// when disabled.
930+
// TestProcessorTxnPushDisabled tests that the TxnPushNotifier doesn't send txn
931+
// push notifications when disabled.
932932
func TestProcessorTxnPushDisabled(t *testing.T) {
933933
defer leaktest.AfterTest(t)()
934934

@@ -951,12 +951,6 @@ func TestProcessorTxnPushDisabled(t *testing.T) {
951951
PushTxnsEnabled.Override(ctx, &st.SV, false)
952952

953953
// Set up a txn pusher and processor that errors on any pushes.
954-
//
955-
// TODO(kv): We don't test the scheduled processor here, since the setting
956-
// instead controls the Store.startRangefeedTxnPushNotifier() loop which sits
957-
// outside of the processor and can't be tested with this test harness. Write
958-
// a new test when the legacy processor is removed and the scheduled processor
959-
// is used by default.
960954
var tp testTxnPusher
961955
tp.mockPushTxns(func(ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, bool, error) {
962956
err := errors.Errorf("unexpected txn push for txns=%v ts=%s", txns, ts)
@@ -965,9 +959,16 @@ func TestProcessorTxnPushDisabled(t *testing.T) {
965959
})
966960

967961
p, h, stopper := newTestProcessor(t, withSettings(st), withPusher(&tp),
968-
withPushTxnsIntervalAge(pushInterval, time.Millisecond))
962+
withPushTxnsIntervalAge(time.Millisecond))
969963
defer stopper.Stop(ctx)
970964

965+
notifier := NewTxnPushNotifier(
966+
pushInterval,
967+
st, h.rawScheduler,
968+
func(f func(i int64)) { f(p.ID()) },
969+
)
970+
require.NoError(t, notifier.Start(ctx, stopper))
971+
971972
// Move the resolved ts forward to just before the txn timestamp.
972973
rts := ts.Add(-1, 0)
973974
require.True(t, p.ForwardClosedTS(ctx, rts))
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package rangefeed
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
13+
"github.com/cockroachdb/cockroach/pkg/util/stop"
14+
)
15+
16+
// TxnPushNotifier enqueues a PushTxnEvent for all processors yielded by
17+
// visitProcessors every interval.
18+
type TxnPushNotifier struct {
19+
interval time.Duration
20+
settings *cluster.Settings
21+
scheduler *Scheduler
22+
visitProcessors func(func(int64))
23+
}
24+
25+
func NewTxnPushNotifier(
26+
interval time.Duration,
27+
settings *cluster.Settings,
28+
scheduler *Scheduler,
29+
visitProcessors func(func(int64)),
30+
) *TxnPushNotifier {
31+
return &TxnPushNotifier{
32+
interval: interval,
33+
settings: settings,
34+
scheduler: scheduler,
35+
visitProcessors: visitProcessors,
36+
}
37+
}
38+
39+
func (t *TxnPushNotifier) Start(ctx context.Context, stopper *stop.Stopper) error {
40+
taskOpts := stop.TaskOpts{
41+
TaskName: "rangefeed-txn-push-notifier",
42+
SpanOpt: stop.SterileRootSpan,
43+
}
44+
ctx, hdl, err := stopper.GetHandle(ctx, taskOpts)
45+
if err != nil {
46+
return err
47+
}
48+
go func(ctx context.Context, hdl *stop.Handle) {
49+
defer hdl.Activate(ctx).Release(ctx)
50+
51+
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
52+
defer cancel()
53+
54+
ticker := time.NewTicker(t.interval)
55+
defer ticker.Stop()
56+
57+
for {
58+
select {
59+
case <-ticker.C:
60+
if !PushTxnsEnabled.Get(&t.settings.SV) {
61+
continue
62+
}
63+
64+
batch := t.scheduler.NewEnqueueBatch()
65+
t.visitProcessors(batch.Add)
66+
t.scheduler.EnqueueBatch(batch, PushTxnQueued)
67+
batch.Close()
68+
case <-ctx.Done():
69+
return
70+
}
71+
}
72+
}(ctx, hdl)
73+
return nil
74+
}

pkg/kv/kvserver/store.go

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2454,7 +2454,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
24542454
// Connect rangefeeds to closed timestamp updates.
24552455
s.startRangefeedUpdater(ctx)
24562456

2457-
s.startRangefeedTxnPushNotifier(ctx)
2457+
if err := s.startRangefeedTxnPushNotifier(ctx); err != nil {
2458+
return err
2459+
}
24582460

24592461
if s.replicateQueue != nil {
24602462
s.storeRebalancer = NewStoreRebalancer(
@@ -2662,48 +2664,27 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) {
26622664
// startRangefeedTxnPushNotifier starts a worker that would periodically
26632665
// enqueue txn push event for rangefeed processors to let them push lagging
26642666
// transactions.
2665-
func (s *Store) startRangefeedTxnPushNotifier(ctx context.Context) {
2667+
func (s *Store) startRangefeedTxnPushNotifier(ctx context.Context) error {
26662668
interval := rangefeed.DefaultPushTxnsInterval
26672669
if i := s.TestingKnobs().RangeFeedPushTxnsInterval; i > 0 {
26682670
interval = i
26692671
}
2670-
2671-
_ /* err */ = s.stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
2672-
TaskName: "transaction-rangefeed-push-notifier",
2673-
SpanOpt: stop.SterileRootSpan,
2674-
}, func(ctx context.Context) {
2675-
ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
2676-
defer cancel()
2677-
2678-
makeSchedulerBatch := func() *rangefeed.SchedulerBatch {
2679-
batch := s.rangefeedScheduler.NewEnqueueBatch()
2672+
tpn := rangefeed.NewTxnPushNotifier(
2673+
interval,
2674+
s.ClusterSettings(),
2675+
s.rangefeedScheduler,
2676+
func(f func(int64)) {
26802677
s.rangefeedReplicas.Lock()
2681-
for _, id := range s.rangefeedReplicas.m {
2682-
if id != 0 {
2683-
// Only process ranges that use scheduler.
2684-
batch.Add(id)
2678+
for _, procID := range s.rangefeedReplicas.m {
2679+
// Only process ranges that use scheduler.
2680+
if procID != 0 {
2681+
f(procID)
26852682
}
26862683
}
26872684
s.rangefeedReplicas.Unlock()
2688-
return batch
2689-
}
2690-
2691-
ticker := time.NewTicker(interval)
2692-
for {
2693-
select {
2694-
case <-ticker.C:
2695-
if !rangefeed.PushTxnsEnabled.Get(&s.ClusterSettings().SV) {
2696-
continue
2697-
}
2698-
batch := makeSchedulerBatch()
2699-
s.rangefeedScheduler.EnqueueBatch(batch, rangefeed.PushTxnQueued)
2700-
batch.Close()
2701-
case <-ctx.Done():
2702-
ticker.Stop()
2703-
return
2704-
}
2705-
}
2706-
})
2685+
},
2686+
)
2687+
return tpn.Start(ctx, s.stopper)
27072688
}
27082689

27092690
func (s *Store) addReplicaWithRangefeed(rangeID roachpb.RangeID, schedulerID int64) {

0 commit comments

Comments
 (0)