@@ -1999,7 +1999,19 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19991999 }()
20002000
20012001 if cf .spec .ProgressConfig != nil && cf .spec .ProgressConfig .PerTableProtectedTimestamps {
2002- return cf .managePerTableProtectedTimestamps (ctx , txn , & ptsEntries , highwater )
2002+ updatedPerTablePTS , err :=
2003+ cf .managePerTableProtectedTimestamps (ctx , txn , & ptsEntries , highwater , pts )
2004+ if err != nil {
2005+ return false , err
2006+ }
2007+
2008+ updatedSystemTablesPTS , err :=
2009+ cf .advanceSystemTablesProtectedTimestamp (ctx , txn , & ptsEntries , highwater , pts )
2010+ if err != nil {
2011+ return false , err
2012+ }
2013+
2014+ return updatedPerTablePTS || updatedSystemTablesPTS , nil
20032015 }
20042016
20052017 return cf .advanceProtectedTimestamp (ctx , progress , pts , highwater )
@@ -2010,8 +2022,8 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20102022 txn isql.Txn ,
20112023 ptsEntries * cdcprogresspb.ProtectedTimestampRecords ,
20122024 highwater hlc.Timestamp ,
2025+ pts protectedts.Storage ,
20132026) (updatedPerTablePTS bool , err error ) {
2014- pts := cf .FlowCtx .Cfg .ProtectedTimestampProvider .WithTxn (txn )
20152027 tableIDsToCreate := make (map [descpb.ID ]hlc.Timestamp )
20162028 for tableID , frontier := range cf .frontier .Frontiers () {
20172029 tableHighWater := func () hlc.Timestamp {
@@ -2023,14 +2035,13 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20232035 return frontier .Frontier ()
20242036 }()
20252037
2026- if ptsEntries .ProtectedTimestampRecords [tableID ] != uuid .Nil {
2038+ if ptsEntries .UserTables [tableID ] != uuid .Nil {
20272039 if updated , err := cf .advancePerTableProtectedTimestampRecord (ctx , ptsEntries , tableID , tableHighWater , pts ); err != nil {
20282040 return false , err
20292041 } else if updated {
20302042 updatedPerTablePTS = true
20312043 }
20322044 } else {
2033- // TODO(#152448): Do not include system table protections in these records.
20342045 // TODO(#153894): Newly added/dropped tables should be caught and
20352046 // protected when starting the frontier, not here.
20362047 tableIDsToCreate [tableID ] = tableHighWater
@@ -2061,7 +2072,7 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20612072 tableHighWater hlc.Timestamp ,
20622073 pts protectedts.Storage ,
20632074) (updated bool , err error ) {
2064- rec , err := pts .GetRecord (ctx , ptsEntries .ProtectedTimestampRecords [tableID ])
2075+ rec , err := pts .GetRecord (ctx , ptsEntries .UserTables [tableID ])
20652076 if err != nil {
20662077 return false , err
20672078 }
@@ -2071,7 +2082,7 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20712082 return false , nil
20722083 }
20732084
2074- if err := pts .UpdateTimestamp (ctx , ptsEntries .ProtectedTimestampRecords [tableID ], tableHighWater ); err != nil {
2085+ if err := pts .UpdateTimestamp (ctx , ptsEntries .UserTables [tableID ], tableHighWater ); err != nil {
20752086 return false , err
20762087 }
20772088 return true , nil
@@ -2083,19 +2094,19 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
20832094 tableIDsToCreate map [descpb.ID ]hlc.Timestamp ,
20842095 pts protectedts.Storage ,
20852096) error {
2086- if ptsEntries .ProtectedTimestampRecords == nil {
2087- ptsEntries .ProtectedTimestampRecords = make (map [descpb.ID ]uuid.UUID )
2097+ if ptsEntries .UserTables == nil {
2098+ ptsEntries .UserTables = make (map [descpb.ID ]uuid.UUID )
20882099 }
20892100 for tableID , tableHighWater := range tableIDsToCreate {
20902101 targets , err := cf .createPerTablePTSTargets (tableID )
20912102 if err != nil {
20922103 return err
20932104 }
2094- ptr := createProtectedTimestampRecord (
2105+ ptr := createUserTablesProtectedTimestampRecord (
20952106 ctx , cf .FlowCtx .Codec (), cf .spec .JobID , targets , tableHighWater ,
20962107 )
20972108 uuid := ptr .ID .GetUUID ()
2098- ptsEntries .ProtectedTimestampRecords [tableID ] = uuid
2109+ ptsEntries .UserTables [tableID ] = uuid
20992110 if err := pts .Protect (ctx , ptr ); err != nil {
21002111 return err
21012112 }
@@ -2124,14 +2135,63 @@ func (cf *changeFrontier) createPerTablePTSTargets(
21242135 return targets , nil
21252136}
21262137
2138+ func (cf * changeFrontier ) advanceSystemTablesProtectedTimestamp (
2139+ ctx context.Context ,
2140+ txn isql.Txn ,
2141+ ptsEntries * cdcprogresspb.ProtectedTimestampRecords ,
2142+ timestamp hlc.Timestamp ,
2143+ pts protectedts.Storage ,
2144+ ) (updated bool , err error ) {
2145+ if ptsEntries .SystemTables == uuid .Nil {
2146+ // All changefeeds using per-table PTS records should have a system tables
2147+ // PTS record. If they are missing one, it should be made when starting the
2148+ // changefeed.
2149+ return false , errors .AssertionFailedf ("expected system tables PTS record to be present" )
2150+ }
2151+
2152+ rec , err := pts .GetRecord (ctx , ptsEntries .SystemTables )
2153+ if err != nil {
2154+ return false , err
2155+ }
2156+
2157+ if ! makeSystemTablesTargetToProtect ().Equal (rec .Target ) {
2158+ if cf .knobs .PreservePTSTargets != nil && cf .knobs .PreservePTSTargets () {
2159+ return false , nil
2160+ }
2161+ if err := cf .remakeSystemTablesPTSRecord (ctx , txn , pts , ptsEntries , timestamp ); err != nil {
2162+ return false , err
2163+ }
2164+ log .VEventf (
2165+ ctx , 2 , "remade system tables PTS record %v to include all targets" ,
2166+ ptsEntries .SystemTables ,
2167+ )
2168+ return true , nil
2169+ }
2170+
2171+ ptsUpdateLag := changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
2172+ if rec .Timestamp .AddDuration (ptsUpdateLag ).After (timestamp ) {
2173+ return false , nil
2174+ }
2175+
2176+ if err := pts .UpdateTimestamp (ctx , ptsEntries .SystemTables , timestamp ); err != nil {
2177+ return false , err
2178+ }
2179+ return true , nil
2180+ }
2181+
2182+ // advanceProtectedTimestamp advances the single PTS record for changefeeds that
2183+ // are not using per-table protected timestamps.
21272184func (cf * changeFrontier ) advanceProtectedTimestamp (
21282185 ctx context.Context ,
21292186 progress * jobspb.ChangefeedProgress ,
21302187 pts protectedts.Storage ,
21312188 timestamp hlc.Timestamp ,
21322189) (updated bool , err error ) {
21332190 if progress .ProtectedTimestampRecord == uuid .Nil {
2134- ptr := createProtectedTimestampRecord (
2191+ // For changefeeds not using per-table PTS, system tables are protected
2192+ // in the single PTS record for the changefeed with all other targets
2193+ // in a combined record.
2194+ ptr := createCombinedProtectedTimestampRecord (
21352195 ctx , cf .FlowCtx .Codec (), cf .spec .JobID , cf .targets , timestamp ,
21362196 )
21372197 progress .ProtectedTimestampRecord = ptr .ID .GetUUID ()
@@ -2159,7 +2219,7 @@ func (cf *changeFrontier) advanceProtectedTimestamp(
21592219 // If we've identified more tables that need to be protected since this
21602220 // changefeed was created, it will be missing here. If so, we "migrate" it
21612221 // to include all the appropriate targets.
2162- if ! makeTargetToProtect (cf .targets ).Equal (rec .Target ) {
2222+ if ! makeCombinedTargetToProtect (cf .targets ).Equal (rec .Target ) {
21632223 if preservePTSTargets := cf .knobs .PreservePTSTargets != nil && cf .knobs .PreservePTSTargets (); preservePTSTargets {
21642224 return false , nil
21652225 }
@@ -2190,7 +2250,7 @@ func (cf *changeFrontier) remakePTSRecord(
21902250 resolved hlc.Timestamp ,
21912251) error {
21922252 prevRecordId := progress .ProtectedTimestampRecord
2193- ptr := createProtectedTimestampRecord (
2253+ ptr := createCombinedProtectedTimestampRecord (
21942254 ctx , cf .FlowCtx .Codec (), cf .spec .JobID , cf .targets , resolved ,
21952255 )
21962256 if err := pts .Protect (ctx , ptr ); err != nil {
@@ -2207,6 +2267,29 @@ func (cf *changeFrontier) remakePTSRecord(
22072267 return nil
22082268}
22092269
2270+ func (cf * changeFrontier ) remakeSystemTablesPTSRecord (
2271+ ctx context.Context ,
2272+ txn isql.Txn ,
2273+ pts protectedts.Storage ,
2274+ ptsEntries * cdcprogresspb.ProtectedTimestampRecords ,
2275+ resolved hlc.Timestamp ,
2276+ ) error {
2277+ ptr := createSystemTablesProtectedTimestampRecord (
2278+ ctx , cf .FlowCtx .Codec (), cf .spec .JobID , resolved ,
2279+ )
2280+ if err := pts .Protect (ctx , ptr ); err != nil {
2281+ return err
2282+ }
2283+ prevRecordId := ptsEntries .SystemTables
2284+ if err := pts .Release (ctx , prevRecordId ); err != nil {
2285+ return err
2286+ }
2287+ ptsEntries .SystemTables = ptr .ID .GetUUID ()
2288+ log .Eventf (ctx , "created new system tables pts record %v to replace old pts record %v at %v" ,
2289+ ptsEntries .SystemTables , prevRecordId , resolved )
2290+ return writeChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf .spec .JobID )
2291+ }
2292+
22102293func (cf * changeFrontier ) maybeEmitResolved (ctx context.Context , newResolved hlc.Timestamp ) error {
22112294 if cf .freqEmitResolved == emitNoResolved || newResolved .IsEmpty () {
22122295 return nil
0 commit comments