@@ -1985,6 +1985,7 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
19851985
19861986 pts := cf .FlowCtx .Cfg .ProtectedTimestampProvider .WithTxn (txn )
19871987 tableIDsToRelease := make ([]descpb.ID , 0 )
1988+ tableIDsToCreate := make (map [descpb.ID ]hlc.Timestamp )
19881989 for tableID , frontier := range cf .frontier .Frontiers () {
19891990 tableHighWater := func () hlc.Timestamp {
19901991 // If this table has not yet finished its initial scan, we use the highwater
@@ -2016,10 +2017,7 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20162017 }
20172018 } else {
20182019 // TODO(#152448): Do not include system table protections in these records.
2019- if err := cf .createPerTableProtectedTimestampRecord (ctx , txn , ptsEntries , tableID , tableHighWater , pts ); err != nil {
2020- return hlc.Timestamp {}, false , err
2021- }
2022- updatedPerTablePTS = true
2020+ tableIDsToCreate [tableID ] = tableHighWater
20232021 }
20242022 }
20252023
@@ -2030,6 +2028,13 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20302028 updatedPerTablePTS = true
20312029 }
20322030
2031+ if len (tableIDsToCreate ) > 0 {
2032+ if err := cf .createPerTableProtectedTimestampRecords (ctx , txn , ptsEntries , tableIDsToCreate , pts ); err != nil {
2033+ return hlc.Timestamp {}, false , err
2034+ }
2035+ updatedPerTablePTS = true
2036+ }
2037+
20332038 return newPTS , updatedPerTablePTS , nil
20342039}
20352040
@@ -2072,40 +2077,51 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20722077 return true , nil
20732078}
20742079
2075- func (cf * changeFrontier ) createPerTableProtectedTimestampRecord (
2080+ func (cf * changeFrontier ) createPerTableProtectedTimestampRecords (
20762081 ctx context.Context ,
20772082 txn isql.Txn ,
20782083 ptsEntries * cdcprogresspb.ProtectedTimestampRecords ,
2079- tableID descpb.ID ,
2080- tableHighWater hlc.Timestamp ,
2084+ tableIDsToCreate map [descpb.ID ]hlc.Timestamp ,
20812085 pts protectedts.Storage ,
20822086) error {
2083- // If the table is lagging and doesn't have a per table PTS record,
2084- // we create a new one.
2085- targets := changefeedbase.Targets {}
2086- if cf .targets .Size > 0 {
2087- err := cf .targets .EachTarget (func (target changefeedbase.Target ) error {
2088- if target .DescID == tableID {
2089- targets .Add (target )
2090- }
2091- return nil
2092- })
2087+ if ptsEntries .ProtectedTimestampRecords == nil {
2088+ ptsEntries .ProtectedTimestampRecords = make (map [descpb.ID ]* uuid.UUID )
2089+ }
2090+ for tableID , tableHighWater := range tableIDsToCreate {
2091+ targets , err := cf .createPerTablePTSTarget (tableID )
20932092 if err != nil {
20942093 return err
20952094 }
2095+ ptr := createProtectedTimestampRecord (
2096+ ctx , cf .FlowCtx .Codec (), cf .spec .JobID , targets , tableHighWater ,
2097+ )
2098+ uuid := ptr .ID .GetUUID ()
2099+ ptsEntries .ProtectedTimestampRecords [tableID ] = & uuid
2100+ if err := pts .Protect (ctx , ptr ); err != nil {
2101+ return err
2102+ }
20962103 }
2097- ptr := createProtectedTimestampRecord (
2098- ctx , cf .FlowCtx .Codec (), cf .spec .JobID , targets , tableHighWater ,
2099- )
2100- if ptsEntries .ProtectedTimestampRecords == nil {
2101- ptsEntries .ProtectedTimestampRecords = make (map [descpb.ID ]* uuid.UUID )
2104+ return writeChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf .spec .JobID )
2105+ }
2106+
2107+ func (cf * changeFrontier ) createPerTablePTSTarget (
2108+ tableID descpb.ID ,
2109+ ) (changefeedbase.Targets , error ) {
2110+ targets := changefeedbase.Targets {}
2111+ if cf .targets .Size > 0 {
2112+ if found , err := cf .targets .EachHavingTableID (tableID , func (target changefeedbase.Target ) error {
2113+ targets .Add (target )
2114+ return nil
2115+ }); err != nil {
2116+ return changefeedbase.Targets {}, err
2117+ } else if ! found {
2118+ return changefeedbase.Targets {}, errors .AssertionFailedf ("attempted to create a per-table PTS record for table %d, but no target was found" , tableID )
2119+ }
21022120 }
2103- uuid := ptr .ID .GetUUID ()
2104- ptsEntries .ProtectedTimestampRecords [tableID ] = & uuid
2105- if err := pts .Protect (ctx , ptr ); err != nil {
2106- return err
2121+ if targets .Size != 1 {
2122+ return changefeedbase.Targets {}, errors .AssertionFailedf ("expected 1 target, got %d" , targets .Size )
21072123 }
2108- return writeChangefeedJobInfo ( ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf . spec . JobID )
2124+ return targets , nil
21092125}
21102126
21112127func (cf * changeFrontier ) advanceProtectedTimestamp (
0 commit comments