@@ -106,9 +106,11 @@ type changeAggregator struct {
 	// eventConsumer consumes the event.
 	eventConsumer eventConsumer
 
-	nextHighWaterFlush time.Time     // next time high watermark may be flushed.
-	flushFrequency     time.Duration // how often high watermark can be checkpointed.
-	lastSpanFlush      time.Time     // last time expensive, span based checkpoint was written.
+	flushFrequency time.Duration // how often high watermark can be checkpointed.
+
+	// frontierFlushLimiter is a rate limiter for flushing the span frontier
+	// to the coordinator.
+	frontierFlushLimiter *saveRateLimiter
 
 	// frontier keeps track of resolved timestamps for spans along with schema change
 	// boundary information.
@@ -281,6 +283,22 @@ func newChangeAggregatorProcessor(
 		ca.flushFrequency = changefeedbase.DefaultMinCheckpointFrequency
 	}
 
+	ca.frontierFlushLimiter, err = newSaveRateLimiter(saveRateConfig{
+		name: "frontier",
+		intervalName: func() redact.SafeValue {
+			return redact.SafeString(changefeedbase.OptMinCheckpointFrequency)
+		},
+		interval: func() time.Duration {
+			return ca.flushFrequency
+		},
+		jitter: func() float64 {
+			return aggregatorFlushJitter.Get(&ca.FlowCtx.Cfg.Settings.SV)
+		},
+	}, timeutil.DefaultTimeSource{})
+	if err != nil {
+		return nil, err
+	}
+
 	return ca, nil
 }
 
@@ -461,9 +479,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 
 	// Init heartbeat timer.
 	ca.lastPush = timeutil.Now()
-
-	// Generate expensive checkpoint only after we ran for a while.
-	ca.lastSpanFlush = timeutil.Now()
 }
 
 func (ca *changeAggregator) startKVFeed(
@@ -730,18 +745,6 @@ var aggregatorFlushJitter = settings.RegisterFloatSetting(
 	settings.WithPublic,
 )
 
-func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) (time.Time, error) {
-	if j < 0 || d < 0 {
-		return s.Now(), errors.AssertionFailedf("invalid jitter value: %f, duration: %s", j, d)
-	}
-	maxJitter := int64(j * float64(d))
-	if maxJitter == 0 {
-		return s.Now().Add(d), nil
-	}
-	nextFlush := d + time.Duration(rand.Int63n(maxJitter))
-	return s.Now().Add(nextFlush), nil
-}
-
 // Next is part of the RowSource interface.
 func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 	shouldEmitHeartBeat := func() bool {
@@ -897,7 +900,7 @@ func (ca *changeAggregator) flushBufferedEvents(ctx context.Context) error {
 // noteResolvedSpan periodically flushes Frontier progress from the current
 // changeAggregator node to the changeFrontier node to allow the changeFrontier
 // to persist the overall changefeed's progress
-func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (returnErr error) {
+func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
 	ctx, sp := tracing.ChildSpan(ca.Ctx(), "changefeed.aggregator.note_resolved_span")
 	defer sp.Finish()
 
@@ -935,32 +938,17 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
 	// TODO(yevgeniy): Consider doing something similar to how job checkpointing
 	// works in the frontier where if we missed the window to checkpoint, we will attempt
 	// the checkpoint at the next opportune moment.
-	checkpointFrontier := advanced &&
-		(forceFlush || timeutil.Now().After(ca.nextHighWaterFlush))
+	checkpointFrontier := (advanced && forceFlush) || ca.frontierFlushLimiter.canSave(ctx)
 
 	if checkpointFrontier {
-		defer func() {
-			ca.nextHighWaterFlush, err = nextFlushWithJitter(
-				timeutil.DefaultTimeSource{}, ca.flushFrequency, aggregatorFlushJitter.Get(sv))
-			if err != nil {
-				returnErr = errors.CombineErrors(returnErr, err)
-			}
-		}()
-		return ca.flushFrontier(ctx)
+		now := timeutil.Now()
+		if err := ca.flushFrontier(ctx); err != nil {
+			return err
+		}
+		ca.frontierFlushLimiter.doneSave(timeutil.Since(now))
 	}
 
-	// At a lower frequency, we checkpoint specific spans in the job progress
-	// either in backfills or if the highwater mark is excessively lagging behind.
-	checkpointSpans := (ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
-		canCheckpointSpans(sv, ca.lastSpanFlush)
-
-	if checkpointSpans {
-		defer func() {
-			ca.lastSpanFlush = timeutil.Now()
-		}()
-		return ca.flushFrontier(ctx)
-	}
-	return returnErr
+	return nil
 }
 
 // flushFrontier flushes sink and emits resolved spans to the change frontier.
@@ -1313,8 +1301,18 @@ func newChangeFrontierProcessor(
 		cf.freqEmitResolved = emitNoResolved
 	}
 
-	cf.frontierPersistenceLimiter = newSaveRateLimiter(
-		"frontier" /* name */, changefeedbase.FrontierPersistenceInterval)
+	cf.frontierPersistenceLimiter, err = newSaveRateLimiter(saveRateConfig{
+		name: "frontier",
+		intervalName: func() redact.SafeValue {
+			return changefeedbase.FrontierPersistenceInterval.Name()
+		},
+		interval: func() time.Duration {
+			return changefeedbase.FrontierPersistenceInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
+		},
+	}, timeutil.DefaultTimeSource{})
+	if err != nil {
+		return nil, err
+	}
 
 	encodingOpts, err := opts.GetEncodingOptions()
 	if err != nil {
@@ -1919,7 +1917,7 @@ func (cf *changeFrontier) maybePersistFrontier(ctx context.Context) error {
 
 	if cf.spec.JobID == 0 ||
 		!cf.evalCtx.Settings.Version.IsActive(ctx, clusterversion.V25_4) ||
-		!cf.frontierPersistenceLimiter.canSave(ctx, &cf.FlowCtx.Cfg.Settings.SV) {
+		!cf.frontierPersistenceLimiter.canSave(ctx) {
 		return nil
 	}
 
@@ -2349,41 +2347,62 @@ func shouldCountUsageError(err error) bool {
 		status.Code(err) != codes.Canceled
 }
 
-// durationSetting is a duration cluster setting.
-type durationSetting interface {
-	Name() settings.SettingName
-	Get(sv *settings.Values) time.Duration
+// saveRateConfig is the config for a saveRateLimiter.
+type saveRateConfig struct {
+	name         redact.SafeString
+	intervalName func() redact.SafeValue
+	interval     func() time.Duration
+	jitter       func() float64 // optional
 }
 
 // saveRateLimiter is a rate limiter for saving a piece of progress.
 // It uses a duration setting as the minimum interval between saves.
 // It also limits saving to not be more frequent than the average
 // duration it takes to save progress.
 type saveRateLimiter struct {
-	name         redact.SafeString
-	saveInterval durationSetting
-	warnEveryN   util.EveryN
+	config     saveRateConfig
+	warnEveryN util.EveryN
+
+	clock timeutil.TimeSource
 
 	lastSave        time.Time
 	avgSaveDuration time.Duration
 }
 
 // newSaveRateLimiter returns a new saveRateLimiter.
-func newSaveRateLimiter(name redact.SafeString, saveInterval durationSetting) *saveRateLimiter {
-	return &saveRateLimiter{
-		name:         name,
-		saveInterval: saveInterval,
-		warnEveryN:   util.Every(time.Minute),
+func newSaveRateLimiter(
+	config saveRateConfig, clock timeutil.TimeSource,
+) (*saveRateLimiter, error) {
+	if len(config.name) == 0 {
+		return nil, errors.AssertionFailedf("name is required")
+	}
+	if config.intervalName == nil {
+		return nil, errors.AssertionFailedf("interval name is required")
+	}
+	if config.interval == nil {
+		return nil, errors.AssertionFailedf("interval is required")
 	}
+	return &saveRateLimiter{
+		config:     config,
+		warnEveryN: util.Every(time.Minute),
+		clock:      clock,
+	}, nil
 }
 
 // canSave returns whether enough time has passed to save progress again.
-func (l *saveRateLimiter) canSave(ctx context.Context, sv *settings.Values) bool {
-	interval := l.saveInterval.Get(sv)
-	if interval == 0 {
+func (l *saveRateLimiter) canSave(ctx context.Context) bool {
+	interval := l.config.interval()
+	if interval <= 0 {
 		return false
 	}
-	now := timeutil.Now()
+	if l.config.jitter != nil {
+		if jitter := l.config.jitter(); jitter > 0 {
+			if maxJitter := time.Duration(jitter * float64(interval)); maxJitter > 0 {
+				interval += time.Duration(rand.Int63n(int64(maxJitter) + 1))
+			}
+		}
+	}
+	now := l.clock.Now()
 	elapsed := now.Sub(l.lastSave)
 	if elapsed < interval {
 		return false
@@ -2393,8 +2412,7 @@ func (l *saveRateLimiter) canSave(ctx context.Context, sv *settings.Values) bool
 			log.Changefeed.Warningf(ctx, "cannot save %s even though %s has elapsed "+
 				"since last save and %s is set to %s because average duration to save was %s "+
 				"and further saving is disabled until that much time elapses",
-				l.name, elapsed, l.saveInterval.Name(),
-				interval, l.avgSaveDuration)
+				l.config.name, elapsed, l.config.intervalName(), interval, l.avgSaveDuration)
 		}
 		return false
 	}
@@ -2404,7 +2422,7 @@ func (l *saveRateLimiter) canSave(ctx context.Context, sv *settings.Values) bool
 // doneSave must be called after each save is completed with the duration
 // it took to save progress.
 func (l *saveRateLimiter) doneSave(saveDuration time.Duration) {
-	l.lastSave = timeutil.Now()
+	l.lastSave = l.clock.Now()
 
 	// Update the average save duration using an exponential moving average.
 	if l.avgSaveDuration == 0 {
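
As a worked example of the jitter handling in canSave above (hypothetical numbers): if interval() returns 30s and jitter() returns 0.5, then maxJitter is 15s and the call compares the elapsed time against an effective interval drawn uniformly from [30s, 45s]. When jitter is nil, returns 0, or produces a zero maxJitter, the configured interval is used unchanged.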
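To make the new call pattern easier to follow outside the diff, here is a minimal, standalone sketch of the canSave/doneSave flow that the aggregator and frontier now drive (check the limiter, time the save, report the duration back). The type, names, and EMA weighting below are hypothetical simplifications for illustration only, not the actual changefeedccl implementation, which additionally applies jitter and rate-limited warning logs:

```go
package main

import (
	"fmt"
	"time"
)

// simpleSaveLimiter is a stripped-down illustration of the saveRateLimiter
// pattern above: a minimum interval between saves, plus a back-off based on
// the average time a save has been taking.
type simpleSaveLimiter struct {
	interval        func() time.Duration // minimum time between saves
	lastSave        time.Time
	avgSaveDuration time.Duration
}

func (l *simpleSaveLimiter) canSave() bool {
	interval := l.interval()
	if interval <= 0 {
		return false
	}
	elapsed := time.Since(l.lastSave)
	if elapsed < interval {
		return false
	}
	// Don't save more often than the average save duration (mirrors the
	// avgSaveDuration back-off described in the warning message above).
	if l.avgSaveDuration > 0 && elapsed < l.avgSaveDuration {
		return false
	}
	return true
}

func (l *simpleSaveLimiter) doneSave(d time.Duration) {
	l.lastSave = time.Now()
	// Exponential moving average of save duration; the weighting here is
	// illustrative only.
	if l.avgSaveDuration == 0 {
		l.avgSaveDuration = d
	} else {
		l.avgSaveDuration = (l.avgSaveDuration + d) / 2
	}
}

func main() {
	limiter := &simpleSaveLimiter{
		interval: func() time.Duration { return time.Second },
	}
	for i := 0; i < 3; i++ {
		if limiter.canSave() {
			start := time.Now()
			// ... persist progress here, e.g. flush the frontier ...
			limiter.doneSave(time.Since(start))
			fmt.Println("saved; avg save duration:", limiter.avgSaveDuration)
		} else {
			fmt.Println("skipped save")
		}
		time.Sleep(400 * time.Millisecond)
	}
}
```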