@@ -2022,14 +2022,19 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20222022 }
20232023
20242024 if len (tableIDsToRelease ) > 0 {
2025- if err := cf .releasePerTableProtectedTimestampRecords (ctx , txn , ptsEntries , tableIDsToRelease , pts ); err != nil {
2025+ if err := cf .releasePerTableProtectedTimestampRecords (ctx , ptsEntries , tableIDsToRelease , pts ); err != nil {
20262026 return hlc.Timestamp {}, false , err
20272027 }
2028- updatedPerTablePTS = true
20292028 }
20302029
20312030 if len (tableIDsToCreate ) > 0 {
2032- if err := cf .createPerTableProtectedTimestampRecords (ctx , txn , ptsEntries , tableIDsToCreate , pts ); err != nil {
2031+ if err := cf .createPerTableProtectedTimestampRecords (ctx , ptsEntries , tableIDsToCreate , pts ); err != nil {
2032+ return hlc.Timestamp {}, false , err
2033+ }
2034+ }
2035+
2036+ if len (tableIDsToRelease ) > 0 || len (tableIDsToCreate ) > 0 {
2037+ if err := writeChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf .spec .JobID ); err != nil {
20332038 return hlc.Timestamp {}, false , err
20342039 }
20352040 updatedPerTablePTS = true
@@ -2040,7 +2045,6 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20402045
20412046func (cf * changeFrontier ) releasePerTableProtectedTimestampRecords (
20422047 ctx context.Context ,
2043- txn isql.Txn ,
20442048 ptsEntries * cdcprogresspb.ProtectedTimestampRecords ,
20452049 tableIDs []descpb.ID ,
20462050 pts protectedts.Storage ,
@@ -2051,7 +2055,7 @@ func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
20512055 }
20522056 delete (ptsEntries .ProtectedTimestampRecords , tableID )
20532057 }
2054- return writeChangefeedJobInfo ( ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf . spec . JobID )
2058+ return nil
20552059}
20562060
20572061func (cf * changeFrontier ) advancePerTableProtectedTimestampRecord (
@@ -2079,7 +2083,6 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20792083
20802084func (cf * changeFrontier ) createPerTableProtectedTimestampRecords (
20812085 ctx context.Context ,
2082- txn isql.Txn ,
20832086 ptsEntries * cdcprogresspb.ProtectedTimestampRecords ,
20842087 tableIDsToCreate map [descpb.ID ]hlc.Timestamp ,
20852088 pts protectedts.Storage ,
@@ -2088,7 +2091,7 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
20882091 ptsEntries .ProtectedTimestampRecords = make (map [descpb.ID ]* uuid.UUID )
20892092 }
20902093 for tableID , tableHighWater := range tableIDsToCreate {
2091- targets , err := cf .createPerTablePTSTarget (tableID )
2094+ targets , err := cf .createPerTablePTSTargets (tableID )
20922095 if err != nil {
20932096 return err
20942097 }
@@ -2101,22 +2104,23 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
21012104 return err
21022105 }
21032106 }
2104- return writeChangefeedJobInfo ( ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf . spec . JobID )
2107+ return nil
21052108}
21062109
2107- func (cf * changeFrontier ) createPerTablePTSTarget (
2110+ func (cf * changeFrontier ) createPerTablePTSTargets (
21082111 tableID descpb.ID ,
21092112) (changefeedbase.Targets , error ) {
21102113 targets := changefeedbase.Targets {}
2111- if cf .targets .Size > 0 {
2112- if found , err := cf .targets .EachHavingTableID (tableID , func (target changefeedbase.Target ) error {
2113- targets .Add (target )
2114- return nil
2115- }); err != nil {
2116- return changefeedbase.Targets {}, err
2117- } else if ! found {
2118- return changefeedbase.Targets {}, errors .AssertionFailedf ("attempted to create a per-table PTS record for table %d, but no target was found" , tableID )
2119- }
2114+ if found , err := cf .targets .EachHavingTableID (tableID , func (target changefeedbase.Target ) error {
2115+ targets .Add (target )
2116+ return nil
2117+ }); err != nil {
2118+ return changefeedbase.Targets {}, err
2119+ } else if ! found {
2120+ return changefeedbase.Targets {}, errors .AssertionFailedf (
2121+ "attempted to create a per-table PTS record for table %d, but no target was found" ,
2122+ tableID ,
2123+ )
21202124 }
21212125 if targets .Size != 1 {
21222126 return changefeedbase.Targets {}, errors .AssertionFailedf ("expected 1 target, got %d" , targets .Size )
0 commit comments