@@ -2454,7 +2454,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
24542454 // Connect rangefeeds to closed timestamp updates.
24552455 s .startRangefeedUpdater (ctx )
24562456
2457- s .startRangefeedTxnPushNotifier (ctx )
2457+ if err := s .startRangefeedTxnPushNotifier (ctx ); err != nil {
2458+ return err
2459+ }
24582460
24592461 if s .replicateQueue != nil {
24602462 s .storeRebalancer = NewStoreRebalancer (
@@ -2662,48 +2664,27 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) {
26622664// startRangefeedTxnPushNotifier starts a worker that would periodically
26632665// enqueue txn push event for rangefeed processors to let them push lagging
26642666// transactions.
2665- func (s * Store ) startRangefeedTxnPushNotifier (ctx context.Context ) {
2667+ func (s * Store ) startRangefeedTxnPushNotifier (ctx context.Context ) error {
26662668 interval := rangefeed .DefaultPushTxnsInterval
26672669 if i := s .TestingKnobs ().RangeFeedPushTxnsInterval ; i > 0 {
26682670 interval = i
26692671 }
2670-
2671- _ /* err */ = s .stopper .RunAsyncTaskEx (ctx , stop.TaskOpts {
2672- TaskName : "transaction-rangefeed-push-notifier" ,
2673- SpanOpt : stop .SterileRootSpan ,
2674- }, func (ctx context.Context ) {
2675- ctx , cancel := s .stopper .WithCancelOnQuiesce (ctx )
2676- defer cancel ()
2677-
2678- makeSchedulerBatch := func () * rangefeed.SchedulerBatch {
2679- batch := s .rangefeedScheduler .NewEnqueueBatch ()
2672+ tpn := rangefeed .NewTxnPushNotifier (
2673+ interval ,
2674+ s .ClusterSettings (),
2675+ s .rangefeedScheduler ,
2676+ func (f func (int64 )) {
26802677 s .rangefeedReplicas .Lock ()
2681- for _ , id := range s .rangefeedReplicas .m {
2682- if id != 0 {
2683- // Only process ranges that use scheduler.
2684- batch . Add ( id )
2678+ for _ , procID := range s .rangefeedReplicas .m {
2679+ // Only process ranges that use scheduler.
2680+ if procID != 0 {
2681+ f ( procID )
26852682 }
26862683 }
26872684 s .rangefeedReplicas .Unlock ()
2688- return batch
2689- }
2690-
2691- ticker := time .NewTicker (interval )
2692- for {
2693- select {
2694- case <- ticker .C :
2695- if ! rangefeed .PushTxnsEnabled .Get (& s .ClusterSettings ().SV ) {
2696- continue
2697- }
2698- batch := makeSchedulerBatch ()
2699- s .rangefeedScheduler .EnqueueBatch (batch , rangefeed .PushTxnQueued )
2700- batch .Close ()
2701- case <- ctx .Done ():
2702- ticker .Stop ()
2703- return
2704- }
2705- }
2706- })
2685+ },
2686+ )
2687+ return tpn .Start (ctx , s .stopper )
27072688}
27082689
27092690func (s * Store ) addReplicaWithRangefeed (rangeID roachpb.RangeID , schedulerID int64 ) {
0 commit comments