@@ -102,6 +102,12 @@ var WaitForInitialVersion = settings.RegisterBoolSetting(settings.ApplicationLev
102102 "enables waiting for the initial version of a descriptor" ,
103103 true )
104104
105+ var LockedLeaseTimestamp = settings .RegisterBoolSetting (settings .ApplicationLevel ,
106+ "sql.catalog.descriptor_lease.use_locked_timestamps.enabled" ,
107+ "guarantees transactional version consistency for descriptors used by the lease manager," +
108+ "descriptors used can be intentionally older to support this" ,
109+ false )
110+
105111// WaitForNoVersion returns once there are no unexpired leases left
106112// for any version of the descriptor.
107113func (m * Manager ) WaitForNoVersion (
@@ -1265,7 +1271,7 @@ func (m *Manager) purgeOldVersions(
12651271 // Acquire a refcount on the descriptor on the latest version to maintain an
12661272 // active lease, so that it doesn't get released when removeInactives()
12671273 // is called below. Release this lease after calling removeInactives().
1268- desc , _ , err = t .findForTimestamp (ctx , m .storage .clock .Now ())
1274+ desc , _ , err = t .findForTimestamp (ctx , TimestampToReadTimestamp ( m .storage .clock .Now () ))
12691275 if err == nil || ! errors .Is (err , errRenewLease ) {
12701276 break
12711277 }
@@ -1600,7 +1606,7 @@ func (m *Manager) SetRegionPrefix(val []byte) {
16001606// id and fails because the id has been dropped by the TRUNCATE.
16011607func (m * Manager ) AcquireByName (
16021608 ctx context.Context ,
1603- timestamp hlc. Timestamp ,
1609+ timestamp ReadTimestamp ,
16041610 parentID descpb.ID ,
16051611 parentSchemaID descpb.ID ,
16061612 name string ,
@@ -1622,9 +1628,9 @@ func (m *Manager) AcquireByName(
16221628 return desc , nil
16231629 }
16241630 // Check if we have cached an ID for this name.
1625- descVersion , _ := m .names .get (ctx , parentID , parentSchemaID , name , timestamp )
1631+ descVersion , _ := m .names .get (ctx , parentID , parentSchemaID , name , timestamp . GetTimestamp () )
16261632 if descVersion != nil {
1627- if descVersion .GetModificationTime ().LessEq (timestamp ) {
1633+ if descVersion .GetModificationTime ().LessEq (timestamp . GetTimestamp () ) {
16281634 return validateDescriptorForReturn (descVersion )
16291635 }
16301636 // m.names.get() incremented the refcount, we decrement it to get a new
@@ -1643,7 +1649,7 @@ func (m *Manager) AcquireByName(
16431649 // lease with at least a bit of lifetime left in it. So, we do it the hard
16441650 // way: look in the database to resolve the name, then acquire a new lease.
16451651 var err error
1646- id , err := m .resolveName (ctx , timestamp , parentID , parentSchemaID , name )
1652+ id , err := m .resolveName (ctx , timestamp . GetTimestamp () , parentID , parentSchemaID , name )
16471653 if err != nil {
16481654 return nil , err
16491655 }
@@ -1777,7 +1783,7 @@ type LeasedDescriptor interface {
17771783// can only return an older version of a descriptor if the latest version
17781784// can be leased; as it stands a dropped descriptor cannot be leased.
17791785func (m * Manager ) Acquire (
1780- ctx context.Context , timestamp hlc. Timestamp , id descpb.ID ,
1786+ ctx context.Context , timestamp ReadTimestamp , id descpb.ID ,
17811787) (LeasedDescriptor , error ) {
17821788 for {
17831789 if m .IsDraining () {
@@ -1802,7 +1808,7 @@ func (m *Manager) Acquire(
18021808
18031809 case errors .Is (err , errReadOlderVersion ):
18041810 // Read old versions from the store. This can block while reading.
1805- versions , errRead := m .readOlderVersionForTimestamp (ctx , id , timestamp )
1811+ versions , errRead := m .readOlderVersionForTimestamp (ctx , id , timestamp . GetTimestamp () )
18061812 if errRead != nil {
18071813 return nil , errRead
18081814 }
@@ -2135,6 +2141,9 @@ func (m *Manager) watchForUpdates(ctx context.Context) {
21352141 }
21362142
21372143 handleCheckpoint := func (ctx context.Context , checkpoint * kvpb.RangeFeedCheckpoint ) {
2144+ if m .testingKnobs .TestingOnRangeFeedCheckPoint != nil {
2145+ m .testingKnobs .TestingOnRangeFeedCheckPoint ()
2146+ }
21382147 // Track checkpoints that occur from the rangefeed to make sure progress
21392148 // is always made.
21402149 m .mu .Lock ()
@@ -2908,6 +2917,23 @@ func (m *Manager) deleteOrphanedLeasesWithSameInstanceID(
29082917 instanceID , releasedCount .Load (), totalLeases )
29092918}
29102919
2920+ // GetReadTimestamp returns a locked timestamp to use for lease management.
2921+ func (m * Manager ) GetReadTimestamp (timestamp hlc.Timestamp ) ReadTimestamp {
2922+ if LockedLeaseTimestamp .Get (& m .settings .SV ) {
2923+ replicationTS := m .GetSafeReplicationTS ()
2924+ if ! replicationTS .IsEmpty () && replicationTS .Less (timestamp ) {
2925+ return LeaseTimestamp {
2926+ ReadTimestamp : timestamp ,
2927+ LeaseTimestamp : replicationTS ,
2928+ }
2929+ }
2930+ }
2931+ // Fallback to existing behavior with timestamps.
2932+ return LeaseTimestamp {
2933+ ReadTimestamp : timestamp ,
2934+ }
2935+ }
2936+
29112937// TestingGetBoundAccount returns the bound account used by the lease manager.
29122938func (m * Manager ) TestingGetBoundAccount () * mon.ConcurrentBoundAccount {
29132939 return m .boundAccount
0 commit comments