@@ -345,19 +345,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
345345
346346 spans , err := ca .setupSpansAndFrontier ()
347347 if err != nil {
348- if log .V (2 ) {
349- log .Dev .Infof (ca .Ctx (), "change aggregator moving to draining due to error setting up spans and frontier: %v" , err )
350- }
348+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error setting up spans and frontier: %v" , err )
351349 ca .MoveToDraining (err )
352350 ca .cancel ()
353351 return
354352 }
355353
356354 feed , err := makeChangefeedConfigFromJobDetails (ctx , ca .spec .Feed , ca .FlowCtx .Cfg .ExecutorConfig .(* sql.ExecutorConfig ))
357355 if err != nil {
358- if log .V (2 ) {
359- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error making changefeed config: %v" , err )
360- }
356+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error making changefeed config: %v" , err )
361357 ca .MoveToDraining (err )
362358 ca .cancel ()
363359 return
@@ -380,19 +376,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
380376 scope , _ := opts .GetMetricScope ()
381377 ca .sliMetrics , err = ca .metrics .getSLIMetrics (scope )
382378 if err != nil {
383- if log .V (2 ) {
384- log .Dev .Infof (ca .Ctx (), "change aggregator moving to draining due to error getting sli metrics: %v" , err )
385- }
379+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error getting sli metrics: %v" , err )
386380 ca .MoveToDraining (err )
387381 ca .cancel ()
388382 return
389383 }
390384 ca .sliMetricsID = ca .sliMetrics .claimId ()
391385 ca .targets , err = AllTargets (ctx , ca .spec .Feed , ca .FlowCtx .Cfg .ExecutorConfig .(* sql.ExecutorConfig ))
392386 if err != nil {
393- if log .V (2 ) {
394- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error getting targets: %v" , err )
395- }
387+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error getting targets: %v" , err )
396388 ca .MoveToDraining (err )
397389 ca .cancel ()
398390 return
@@ -402,9 +394,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
402394 recorder , err = ca .wrapMetricsRecorderWithTelemetry (ctx , recorder , ca .targets )
403395
404396 if err != nil {
405- if log .V (2 ) {
406- log .Dev .Infof (ca .Ctx (), "change aggregator moving to draining due to error wrapping metrics controller: %v" , err )
407- }
397+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error wrapping metrics controller: %v" , err )
408398 ca .MoveToDraining (err )
409399 ca .cancel ()
410400 }
@@ -413,9 +403,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
413403 ca .spec .User (), ca .spec .JobID , recorder , ca .targets )
414404 if err != nil {
415405 err = changefeedbase .MarkRetryableError (err )
416- if log .V (2 ) {
417- log .Dev .Infof (ca .Ctx (), "change aggregator moving to draining due to error getting sink: %v" , err )
418- }
406+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error getting sink: %v" , err )
419407 ca .MoveToDraining (err )
420408 ca .cancel ()
421409 return
@@ -447,9 +435,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
447435 limit := changefeedbase .PerChangefeedMemLimit .Get (& ca .FlowCtx .Cfg .Settings .SV )
448436 ca .eventProducer , ca .kvFeedDoneCh , ca .errCh , err = ca .startKVFeed (ctx , spans , kvFeedHighWater , needsInitialScan , feed , pool , limit , opts )
449437 if err != nil {
450- if log .V (2 ) {
451- log .Dev .Infof (ca .Ctx (), "change aggregator moving to draining due to error starting kv feed: %v" , err )
452- }
438+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error starting kv feed: %v" , err )
453439 ca .MoveToDraining (err )
454440 ca .cancel ()
455441 return
@@ -459,9 +445,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
459445 ctx , ca .FlowCtx .Cfg , ca .spec , feed , ca .frontier , kvFeedHighWater ,
460446 ca .sink , ca .metrics , ca .sliMetrics , ca .knobs )
461447 if err != nil {
462- if log .V (2 ) {
463- log .Dev .Infof (ca .Ctx (), "change aggregator moving to draining due to error creating event consumer: %v" , err )
464- }
448+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error creating event consumer: %v" , err )
465449 ca .MoveToDraining (err )
466450 ca .cancel ()
467451 return
@@ -661,7 +645,7 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
661645 // Checkpointed spans are spans that were above the highwater mark, and we
662646 // must preserve that information in the frontier for future checkpointing.
663647 if err := checkpoint .Restore (ca .frontier , ca .spec .SpanLevelCheckpoint ); err != nil {
664- return nil , err
648+ return nil , errors . Wrapf ( err , "failed to restore span-level checkpoint" )
665649 }
666650
667651 return spans , nil
@@ -777,9 +761,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
777761 // NB: we do not invoke ca.cancel here -- just merely moving
778762 // to drain state so that the trailing metadata callback
779763 // has a chance to produce shutdown checkpoint.
780- if log .V (2 ) {
781- log .Dev .Infof (ca .Ctx (), "change aggregator moving to draining due to error while checking for node drain: %v" , err )
782- }
764+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error while checking for node drain: %v" , err )
783765 ca .MoveToDraining (err )
784766 break
785767 }
@@ -810,9 +792,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
810792 }
811793 // Shut down the poller if it wasn't already.
812794 ca .cancel ()
813- if log .V (2 ) {
814- log .Dev .Infof (ca .Ctx (), "change aggregator moving to draining due to error from tick: %v" , err )
815- }
795+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error from tick: %v" , err )
816796 ca .MoveToDraining (err )
817797 break
818798 }
@@ -1378,9 +1358,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13781358 scope := cf .spec .Feed .Opts [changefeedbase .OptMetricsScope ]
13791359 sli , err := cf .metrics .getSLIMetrics (scope )
13801360 if err != nil {
1381- if log .V (2 ) {
1382- log .Dev .Infof (cf .Ctx (), "change frontier moving to draining due to error getting sli metrics: %v" , err )
1383- }
1361+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to error getting sli metrics: %v" , err )
13841362 cf .MoveToDraining (err )
13851363 return
13861364 }
@@ -1389,9 +1367,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13891367 cf .spec .User (), cf .spec .JobID , sli , cf .targets )
13901368 if err != nil {
13911369 err = changefeedbase .MarkRetryableError (err )
1392- if log .V (2 ) {
1393- log .Dev .Infof (cf .Ctx (), "change frontier moving to draining due to error getting sink: %v" , err )
1394- }
1370+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to error getting sink: %v" , err )
13951371 cf .MoveToDraining (err )
13961372 return
13971373 }
@@ -1404,9 +1380,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14041380
14051381 cf .highWaterAtStart = cf .spec .Feed .StatementTime
14061382 if cf .evalCtx .ChangefeedState == nil {
1407- if log .V (2 ) {
1408- log .Dev .Infof (cf .Ctx (), "change frontier moving to draining due to missing changefeed state" )
1409- }
1383+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to missing changefeed state" )
14101384 cf .MoveToDraining (errors .AssertionFailedf ("expected initialized local state" ))
14111385 return
14121386 }
@@ -1418,9 +1392,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14181392 if cf .spec .JobID != 0 {
14191393 job , err := cf .FlowCtx .Cfg .JobRegistry .LoadClaimedJob (ctx , cf .spec .JobID )
14201394 if err != nil {
1421- if log .V (2 ) {
1422- log .Dev .Infof (cf .Ctx (), "change frontier moving to draining due to error loading claimed job: %v" , err )
1423- }
1395+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to error loading claimed job: %v" , err )
14241396 cf .MoveToDraining (err )
14251397 return
14261398 }
@@ -1472,15 +1444,16 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14721444 perTableTracking ,
14731445 cf .spec .TrackedSpans ... )
14741446 if err != nil {
1475- log .Dev .Infof (cf .Ctx (), "change frontier moving to draining due to error setting up frontier: %v" , err )
1447+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to error setting up frontier: %v" , err )
14761448 cf .MoveToDraining (err )
14771449 return
14781450 }
14791451
14801452 if err := checkpoint .Restore (cf .frontier , cf .spec .SpanLevelCheckpoint ); err != nil {
1481- if log .V (2 ) {
1482- log .Dev .Infof (cf .Ctx (), "change frontier encountered error on checkpoint restore: %v" , err )
1483- }
1453+ log .Dev .Warningf (cf .Ctx (),
1454+ "moving to draining due to error restoring span-level checkpoint: %v" , err )
1455+ cf .MoveToDraining (err )
1456+ return
14841457 }
14851458
14861459 if cf .knobs .AfterCoordinatorFrontierRestore != nil {
@@ -1637,39 +1610,31 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
16371610 }
16381611 }
16391612
1640- if log .V (2 ) {
1641- log .Dev .Infof (cf .Ctx (),
1642- "change frontier moving to draining after reaching resolved span boundary (%s): %v" ,
1643- boundaryType , err )
1644- }
1613+ log .Dev .Warningf (cf .Ctx (),
1614+ "moving to draining after reaching resolved span boundary (%s): %v" ,
1615+ boundaryType , err )
16451616 cf .MoveToDraining (err )
16461617 break
16471618 }
16481619
16491620 row , meta := cf .input .Next ()
16501621 if meta != nil {
16511622 if meta .Err != nil {
1652- if log .V (2 ) {
1653- log .Dev .Infof (cf .Ctx (), "change frontier moving to draining after getting error from aggregator: %v" , meta .Err )
1654- }
1623+ log .Dev .Warningf (cf .Ctx (), "moving to draining after getting error from aggregator: %v" , meta .Err )
16551624 cf .MoveToDraining (nil /* err */ )
16561625 }
16571626 if meta .Changefeed != nil && meta .Changefeed .DrainInfo != nil {
16581627 // Seeing changefeed drain info metadata from the aggregator means
16591628 // that the aggregator exited due to node shutdown. Transition to
16601629 // draining so that the remaining aggregators will shut down and
16611630 // transmit their up-to-date frontier.
1662- if log .V (2 ) {
1663- log .Dev .Infof (cf .Ctx (), "change frontier moving to draining due to aggregator shutdown: %s" , meta .Changefeed )
1664- }
1631+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to aggregator shutdown: %s" , meta .Changefeed )
16651632 cf .MoveToDraining (changefeedbase .ErrNodeDraining )
16661633 }
16671634 return nil , meta
16681635 }
16691636 if row == nil {
1670- if log .V (2 ) {
1671- log .Dev .Infof (cf .Ctx (), "change frontier moving to draining after getting nil row from aggregator" )
1672- }
1637+ log .Dev .Warningf (cf .Ctx (), "moving to draining after getting nil row from aggregator" )
16731638 cf .MoveToDraining (nil /* err */ )
16741639 break
16751640 }
@@ -1684,9 +1649,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
16841649 }
16851650
16861651 if err := cf .noteAggregatorProgress (cf .Ctx (), row [0 ]); err != nil {
1687- if log .V (2 ) {
1688- log .Dev .Infof (cf .Ctx (), "change frontier moving to draining after error while processing aggregator progress: %v" , err )
1689- }
1652+ log .Dev .Warningf (cf .Ctx (), "moving to draining after error while processing aggregator progress: %v" , err )
16901653 cf .MoveToDraining (err )
16911654 break
16921655 }
0 commit comments