Skip to content

Commit 398ae39

Browse files
authored
Merge pull request #156545 from aerfrei/blathers/backport-release-25.4-156190
release-25.4: changefeedccl: update timestamp on event descriptor cache hits
2 parents 617bcf8 + ade467f commit 398ae39

File tree

6 files changed

+249
-13
lines changed

6 files changed

+249
-13
lines changed

pkg/ccl/changefeedccl/cdcevent/event.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -521,9 +521,14 @@ func getEventDescriptorCached(
521521
idVer := CacheKey{ID: desc.GetID(), Version: desc.GetVersion(), FamilyID: family.ID}
522522

523523
if v, ok := cache.Get(idVer); ok {
524-
ed := v.(*EventDescriptor)
525-
if catalog.UserDefinedTypeColsHaveSameVersion(ed.td, desc) {
526-
return ed, nil
524+
cached := v.(*EventDescriptor)
525+
if catalog.UserDefinedTypeColsHaveSameVersion(cached.td, desc) {
526+
// Make a shallow copy to avoid modifying the cached value. The cached
527+
// EventDescriptor is shared across changefeed operations and may be
528+
// referenced concurrently.
529+
ed := *cached
530+
ed.SchemaTS = schemaTS
531+
return &ed, nil
527532
}
528533
}
529534

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6785,7 +6785,7 @@ func TestChangefeedDataTTL(t *testing.T) {
67856785

67866786
// Force a GC of the table. This should cause both
67876787
// versions of the table to be deleted.
6788-
forceTableGC(t, s.SystemServer, sqlDB, "d", "foo")
6788+
forceTableGC(t, s.SystemServer, "d", "foo")
67896789

67906790
// Resume our changefeed normally.
67916791
atomic.StoreInt32(&shouldWait, 0)
@@ -6842,7 +6842,7 @@ func TestChangefeedOutdatedCursor(t *testing.T) {
68426842
sqlDB.Exec(t, `CREATE TABLE f (a INT PRIMARY KEY)`)
68436843
outdatedTS := s.Server.Clock().Now().AsOfSystemTime()
68446844
sqlDB.Exec(t, `INSERT INTO f VALUES (1)`)
6845-
forceTableGC(t, s.SystemServer, sqlDB, "system", "descriptor")
6845+
forceTableGC(t, s.SystemServer, "system", "descriptor")
68466846
createChangefeed :=
68476847
fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE f with cursor = '%s'`, outdatedTS)
68486848
expectedErrorSubstring :=
@@ -6968,7 +6968,7 @@ func TestChangefeedSchemaTTL(t *testing.T) {
69686968

69696969
// Force a GC of the table. This should cause both older versions of the
69706970
// table to be deleted, with the middle version being lost to the changefeed.
6971-
forceTableGC(t, s.SystemServer, sqlDB, "system", "descriptor")
6971+
forceTableGC(t, s.SystemServer, "system", "descriptor")
69726972

69736973
// Do an unnecessary version bump on the descriptor, which will purge old
69746974
// versions of the descriptor that the lease manager may have cached for

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,14 +1720,21 @@ func maybeUseExternalConnection(
17201720
}
17211721
}
17221722

1723-
func forceTableGC(
1723+
func forceTableGC(t testing.TB, tsi serverutils.TestServerInterface, database, table string) {
1724+
t.Helper()
1725+
if err := tsi.ForceTableGC(context.Background(), database, table, tsi.Clock().Now()); err != nil {
1726+
t.Fatal(err)
1727+
}
1728+
}
1729+
1730+
func forceTableGCAtTimestamp(
17241731
t testing.TB,
17251732
tsi serverutils.TestServerInterface,
1726-
sqlDB *sqlutils.SQLRunner,
17271733
database, table string,
1734+
timestamp hlc.Timestamp,
17281735
) {
17291736
t.Helper()
1730-
if err := tsi.ForceTableGC(context.Background(), database, table, tsi.Clock().Now()); err != nil {
1737+
if err := tsi.ForceTableGC(context.Background(), database, table, timestamp); err != nil {
17311738
t.Fatal(err)
17321739
}
17331740
}
@@ -2002,12 +2009,12 @@ func runWithAndWithoutRegression141453(
20022009
var blockPop atomic.Bool
20032010
popCh := make(chan struct{})
20042011
return kvevent.BlockingBufferTestingKnobs{
2005-
BeforeAdd: func(ctx context.Context, e kvevent.Event) (context.Context, kvevent.Event) {
2012+
BeforeAdd: func(ctx context.Context, e kvevent.Event) (_ context.Context, _ kvevent.Event, shouldAdd bool) {
20062013
if e.Type() == kvevent.TypeResolved &&
20072014
e.Resolved().BoundaryType == jobspb.ResolvedSpan_RESTART {
20082015
blockPop.Store(true)
20092016
}
2010-
return ctx, e
2017+
return ctx, e, true
20112018
},
20122019
BeforePop: func() {
20132020
if blockPop.Load() {

pkg/ccl/changefeedccl/kvevent/blocking_buffer.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,11 @@ func (b *blockingBuffer) AcquireMemory(ctx context.Context, n int64) (alloc Allo
279279
// Add implements Writer interface.
280280
func (b *blockingBuffer) Add(ctx context.Context, e Event) error {
281281
if b.knobs.BeforeAdd != nil {
282-
ctx, e = b.knobs.BeforeAdd(ctx, e)
282+
var shouldAdd bool
283+
ctx, e, shouldAdd = b.knobs.BeforeAdd(ctx, e)
284+
if !shouldAdd {
285+
return nil
286+
}
283287
}
284288

285289
if log.V(2) {

pkg/ccl/changefeedccl/kvevent/testing_knobs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import "context"
99

1010
// BlockingBufferTestingKnobs are testing knobs for blocking buffers.
1111
type BlockingBufferTestingKnobs struct {
12-
BeforeAdd func(ctx context.Context, e Event) (context.Context, Event)
12+
// BeforeAdd is called before adding a KV event to the buffer. If the function
13+
// returns false in the third return value, the event is skipped.
14+
BeforeAdd func(ctx context.Context, e Event) (_ context.Context, _ Event, shouldAdd bool)
1315
BeforePop func()
1416
BeforeDrain func(ctx context.Context) context.Context
1517
AfterDrain func(err error)

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
1818
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
1919
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
20+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
2021
"github.com/cockroachdb/cockroach/pkg/jobs"
2122
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2223
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -1206,6 +1207,223 @@ func TestChangefeedPerTableProtectedTimestampProgression(t *testing.T) {
12061207
cdcTest(t, testFn, feedTestEnterpriseSinks)
12071208
}
12081209

1210+
// TestCachedEventDescriptorGivesUpdatedTimestamp is a regression test for
1211+
// #156091. It tests that when a changefeed with a cdc query receives events
1212+
// from the KVFeed out of order, even across table descriptor versions, the
1213+
// query will be replanned at a timestamp we know has not been garbage collected.
1214+
// Previously, we would get an old timestamp from the cached event descriptor
1215+
// and if the db descriptor version had changed and been GC'd, this replan would
1216+
// fail, failing the changefeed.
1217+
func TestCachedEventDescriptorGivesUpdatedTimestamp(t *testing.T) {
1218+
defer leaktest.AfterTest(t)()
1219+
defer log.Scope(t).Close(t)
1220+
1221+
testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
1222+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1223+
1224+
// Making sure there's only one worker so that there is a single event
1225+
// descriptor cache. This means that the later events will use the
1226+
// cached event descriptor and its outdated timestamp, to properly
1227+
// reproduce the issue.
1228+
changefeedbase.EventConsumerWorkers.Override(
1229+
context.Background(), &s.Server.ClusterSettings().SV, 1)
1230+
1231+
/*
1232+
The situation from issue #156091 that we are trying to reproduce here
1233+
happens when a changefeed is replanning a CDC query for a table descriptor
1234+
version it has seen before. In that case, it replans the query at the
1235+
timestamp (stored in the cache) of the first event it saw for that table
1236+
descriptor version.
1237+
1238+
That's problematic because that could be well before the highwater for
1239+
the feed and therefore not protected by PTS. Even though our protected
1240+
timestamp system ensures that the relevant table descriptor version has
1241+
not been GC'd (since it is still used by the event we're processing),
1242+
there is no such guarantee that the *DB* descriptor version from that
1243+
time (which wasn't around at the time of that event) has not been GC'd.
1244+
If we try to fetch the DB descriptor version (as we do when replanning
1245+
a CDC query) and it has already been GC'd, the CDC query replan will
1246+
fail, and the changefeed with it.
1247+
1248+
So, in order to reproduce this we require that
1249+
a) KV events must come out of order so that we are both doing a CDC
1250+
query replan AND that the timestamp for that replan comes from the
1251+
EventDescriptor cache.
1252+
b) the DB descriptor version has changed between the event that seeded
1253+
the cache and the later event that shares that table descriptor version
1254+
and finally
1255+
c) that the old DB descriptor version has been garbage collected.
1256+
1257+
Ultimately the series of events will be
1258+
1. We see event 1 with table descriptor version 1 and DB descriptor
1259+
version 1. This is processed by the changefeeed seeding the event
1260+
descriptor cache at T_0.
1261+
2. We update the DB descriptor version to version 2.
1262+
3. We garbage collect the descriptor table through time T_0 and with it
1263+
DB descriptor version 1.
1264+
4. We make an update causing event 2 with the same table descriptor version
1265+
as event 1 (table descriptor version 1) but whose kv event will only
1266+
come through after event 3's.
1267+
5. We update the table descriptor version (to version 2) and make an
1268+
update (event 3).
1269+
6. The KV event for event 3 comes in first, causing the changefeed's
1270+
current table version to be table version 2.
1271+
7. The KV event for event 2 comes in out of order causing a replan of
1272+
the CDC query to happen (back to table descriptor version 1).
1273+
1274+
If we return the timestamp from the first event we saw on table descriptor
1275+
version 1, this replan will try to fetch the DB descriptor at time T_0
1276+
(when event 1 happened) which has been garbage collected and would fail
1277+
the feed.
1278+
*/
1279+
var dbDescTS atomic.Value
1280+
dbDescTS.Store(hlc.Timestamp{})
1281+
var hasGCdDBDesc atomic.Bool
1282+
var kvEvents []kvevent.Event
1283+
var hasProcessedAllEvents atomic.Bool
1284+
beforeAddKnob := func(ctx context.Context, e kvevent.Event) (_ context.Context, _ kvevent.Event, shouldAdd bool) {
1285+
// Since we are going to be ignoring some KV events, we don't send
1286+
// resolved events to avoid violating changefeed guarantees.
1287+
// Since this test also depends on specific GC behavior, we handle GC
1288+
// ourselves.
1289+
if e.Type() == kvevent.TypeResolved {
1290+
resolvedTimestamp := e.Timestamp()
1291+
1292+
// We need to wait for the resolved timestamp to move past the
1293+
// first kv event so that we know it's safe to GC the first database
1294+
// descriptor version.
1295+
if !hasGCdDBDesc.Load() {
1296+
dbDescTSVal := dbDescTS.Load().(hlc.Timestamp)
1297+
if !dbDescTSVal.IsEmpty() && dbDescTSVal.Less(resolvedTimestamp) {
1298+
t.Logf("GCing database descriptor table at timestamp: %s", dbDescTSVal)
1299+
forceTableGCAtTimestamp(t, s.SystemServer, "system", "descriptor", dbDescTSVal)
1300+
hasGCdDBDesc.Store(true)
1301+
}
1302+
}
1303+
1304+
// We use the resolved events to know when we can stop the test.
1305+
if len(kvEvents) > 2 && resolvedTimestamp.After(kvEvents[2].Timestamp()) {
1306+
hasProcessedAllEvents.Store(true)
1307+
}
1308+
1309+
// Do not send any of the resolved events.
1310+
return ctx, e, false
1311+
}
1312+
1313+
if e.Type() == kvevent.TypeKV {
1314+
if len(kvEvents) > 0 && e.Timestamp() == kvEvents[0].Timestamp() {
1315+
// Ignore duplicates of the first kv event which may come while
1316+
// we're waiting to GC the first database descriptor version.
1317+
return ctx, e, false
1318+
}
1319+
1320+
kvEvents = append(kvEvents, e)
1321+
switch len(kvEvents) {
1322+
case 1:
1323+
// Event 1 is sent as normal to seed the event descriptor cache.
1324+
t.Logf("Event 1 timestamp: %s", kvEvents[0].Timestamp())
1325+
return ctx, kvEvents[0], true
1326+
case 2:
1327+
// Event 2 is stored in kvEvents and we will send it later.
1328+
// Sending it after event 3, which has a different table
1329+
// descriptor version, will cause CDC query replan.
1330+
return ctx, e, false
1331+
case 3:
1332+
// Event 3 is sent as normal to replan the CDC query with the
1333+
// new table descriptor version.
1334+
t.Logf("Event 3 timestamp: %s", kvEvents[2].Timestamp())
1335+
return ctx, kvEvents[2], true
1336+
case 4:
1337+
// Now we send event 2 *after* we've sent event 3. This should
1338+
// cause a CDC query replan with a cached event descriptor,
1339+
// since the table version is the same as event 1. If we use
1340+
// the timestamp of event 1, that replan will fail to fetch
1341+
// the GC'd DB descriptor version failing the changefeed.
1342+
// This is what we saw in issue #156091.
1343+
t.Logf("Event 2 timestamp: %s", kvEvents[1].Timestamp())
1344+
return ctx, kvEvents[1], true
1345+
default:
1346+
// We do not need to send any more events after events
1347+
// 1, 2 and 3 have been processed.
1348+
return ctx, e, false
1349+
}
1350+
}
1351+
1352+
return ctx, e, true
1353+
}
1354+
1355+
knobs := s.TestingKnobs.
1356+
DistSQL.(*execinfra.TestingKnobs).
1357+
Changefeed.(*TestingKnobs)
1358+
1359+
knobs.MakeKVFeedToAggregatorBufferKnobs = func() kvevent.BlockingBufferTestingKnobs {
1360+
return kvevent.BlockingBufferTestingKnobs{
1361+
BeforeAdd: beforeAddKnob,
1362+
}
1363+
}
1364+
1365+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
1366+
foo := feed(t, f, `CREATE CHANGEFEED WITH resolved = '10ms' AS SELECT * FROM foo`)
1367+
defer closeFeed(t, foo)
1368+
1369+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
1370+
1371+
// Change the database descriptor version by granting permission to a user.
1372+
sqlDB.Exec(t, `CREATE USER testuser`)
1373+
sqlDB.Exec(t, `GRANT CREATE ON DATABASE d TO testuser`)
1374+
1375+
// Fetch the cluster logical timestamp so that we can make sure the
1376+
// resolved timestamp has moved past it and it's safe to GC the
1377+
// descriptor table (specifically the first database descriptor version).
1378+
var dbDescTSString string
1379+
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&dbDescTSString)
1380+
dbDescTSParsed, err := hlc.ParseHLC(dbDescTSString)
1381+
require.NoError(t, err)
1382+
dbDescTS.Store(dbDescTSParsed)
1383+
t.Logf("Timestamp after DB descriptor version change: %s", dbDescTSParsed)
1384+
1385+
testutils.SucceedsSoon(t, func() error {
1386+
if !hasGCdDBDesc.Load() {
1387+
return errors.New("database descriptor table not GCed")
1388+
}
1389+
return nil
1390+
})
1391+
1392+
// This event will be delayed by the KVFeed until after event 3 has been sent.
1393+
// Instead of sending it out, we GC the descriptor table including the
1394+
// old database descriptor version.
1395+
sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`)
1396+
1397+
// Change the table descriptor version by granting permission to a user.
1398+
sqlDB.Exec(t, `GRANT CREATE ON TABLE foo TO testuser`)
1399+
1400+
sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`)
1401+
1402+
// Since we skip processing the KV event for event 2, we will replace
1403+
// the KV event for event 4 the stored one for event 2. This event is
1404+
// not itself relevant to the test, but helps us send the KV events out
1405+
// of order.
1406+
sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`)
1407+
1408+
// Wait for changefeed events 1, 2 and 3 to be processed. If the feed
1409+
// has failed, stop waiting and fail the test immediately.
1410+
testutils.SucceedsSoon(t, func() error {
1411+
var errorStr string
1412+
sqlDB.QueryRow(t, `SELECT error FROM [SHOW CHANGEFEED JOBS] WHERE job_id = $1`, foo.(cdctest.EnterpriseTestFeed).JobID()).Scan(&errorStr)
1413+
if errorStr != "" {
1414+
t.Fatalf("changefeed error: %s", errorStr)
1415+
return nil
1416+
}
1417+
if !hasProcessedAllEvents.Load() {
1418+
return errors.New("events not processed")
1419+
}
1420+
return nil
1421+
})
1422+
}
1423+
1424+
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)
1425+
}
1426+
12091427
func fetchRoleMembers(
12101428
ctx context.Context, execCfg *sql.ExecutorConfig, ts hlc.Timestamp,
12111429
) ([][]string, error) {

0 commit comments

Comments
 (0)