Skip to content

Commit ade467f

Browse files
committed
changefeedccl: update timestamp on event descriptor cache hits
Before, in some cases we could replan a CDC query with an old timestamp from the event descriptor cache, which would try to read the database descriptor at an old, potentially garbage-collected timestamp. Now, we make sure the event descriptor cache always gives event descriptors with an up-to-date timestamp. Epic: none Fixes: #156091 Release note (bug fix): A bug where changefeeds using CDC queries could sometimes unexpectedly fail after a schema change with a descriptor retrieval error has been fixed.
1 parent f3ad3f9 commit ade467f

File tree

6 files changed

+249
-13
lines changed

6 files changed

+249
-13
lines changed

pkg/ccl/changefeedccl/cdcevent/event.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -521,9 +521,14 @@ func getEventDescriptorCached(
521521
idVer := CacheKey{ID: desc.GetID(), Version: desc.GetVersion(), FamilyID: family.ID}
522522

523523
if v, ok := cache.Get(idVer); ok {
524-
ed := v.(*EventDescriptor)
525-
if catalog.UserDefinedTypeColsHaveSameVersion(ed.td, desc) {
526-
return ed, nil
524+
cached := v.(*EventDescriptor)
525+
if catalog.UserDefinedTypeColsHaveSameVersion(cached.td, desc) {
526+
// Make a shallow copy to avoid modifying the cached value. The cached
527+
// EventDescriptor is shared across changefeed operations and may be
528+
// referenced concurrently.
529+
ed := *cached
530+
ed.SchemaTS = schemaTS
531+
return &ed, nil
527532
}
528533
}
529534

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6785,7 +6785,7 @@ func TestChangefeedDataTTL(t *testing.T) {
67856785

67866786
// Force a GC of the table. This should cause both
67876787
// versions of the table to be deleted.
6788-
forceTableGC(t, s.SystemServer, sqlDB, "d", "foo")
6788+
forceTableGC(t, s.SystemServer, "d", "foo")
67896789

67906790
// Resume our changefeed normally.
67916791
atomic.StoreInt32(&shouldWait, 0)
@@ -6842,7 +6842,7 @@ func TestChangefeedOutdatedCursor(t *testing.T) {
68426842
sqlDB.Exec(t, `CREATE TABLE f (a INT PRIMARY KEY)`)
68436843
outdatedTS := s.Server.Clock().Now().AsOfSystemTime()
68446844
sqlDB.Exec(t, `INSERT INTO f VALUES (1)`)
6845-
forceTableGC(t, s.SystemServer, sqlDB, "system", "descriptor")
6845+
forceTableGC(t, s.SystemServer, "system", "descriptor")
68466846
createChangefeed :=
68476847
fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE f with cursor = '%s'`, outdatedTS)
68486848
expectedErrorSubstring :=
@@ -6968,7 +6968,7 @@ func TestChangefeedSchemaTTL(t *testing.T) {
69686968

69696969
// Force a GC of the table. This should cause both older versions of the
69706970
// table to be deleted, with the middle version being lost to the changefeed.
6971-
forceTableGC(t, s.SystemServer, sqlDB, "system", "descriptor")
6971+
forceTableGC(t, s.SystemServer, "system", "descriptor")
69726972

69736973
// Do an unnecessary version bump on the descriptor, which will purge old
69746974
// versions of the descriptor that the lease manager may have cached for

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,14 +1720,21 @@ func maybeUseExternalConnection(
17201720
}
17211721
}
17221722

1723-
func forceTableGC(
1723+
func forceTableGC(t testing.TB, tsi serverutils.TestServerInterface, database, table string) {
1724+
t.Helper()
1725+
if err := tsi.ForceTableGC(context.Background(), database, table, tsi.Clock().Now()); err != nil {
1726+
t.Fatal(err)
1727+
}
1728+
}
1729+
1730+
func forceTableGCAtTimestamp(
17241731
t testing.TB,
17251732
tsi serverutils.TestServerInterface,
1726-
sqlDB *sqlutils.SQLRunner,
17271733
database, table string,
1734+
timestamp hlc.Timestamp,
17281735
) {
17291736
t.Helper()
1730-
if err := tsi.ForceTableGC(context.Background(), database, table, tsi.Clock().Now()); err != nil {
1737+
if err := tsi.ForceTableGC(context.Background(), database, table, timestamp); err != nil {
17311738
t.Fatal(err)
17321739
}
17331740
}
@@ -2002,12 +2009,12 @@ func runWithAndWithoutRegression141453(
20022009
var blockPop atomic.Bool
20032010
popCh := make(chan struct{})
20042011
return kvevent.BlockingBufferTestingKnobs{
2005-
BeforeAdd: func(ctx context.Context, e kvevent.Event) (context.Context, kvevent.Event) {
2012+
BeforeAdd: func(ctx context.Context, e kvevent.Event) (_ context.Context, _ kvevent.Event, shouldAdd bool) {
20062013
if e.Type() == kvevent.TypeResolved &&
20072014
e.Resolved().BoundaryType == jobspb.ResolvedSpan_RESTART {
20082015
blockPop.Store(true)
20092016
}
2010-
return ctx, e
2017+
return ctx, e, true
20112018
},
20122019
BeforePop: func() {
20132020
if blockPop.Load() {

pkg/ccl/changefeedccl/kvevent/blocking_buffer.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,11 @@ func (b *blockingBuffer) AcquireMemory(ctx context.Context, n int64) (alloc Allo
279279
// Add implements Writer interface.
280280
func (b *blockingBuffer) Add(ctx context.Context, e Event) error {
281281
if b.knobs.BeforeAdd != nil {
282-
ctx, e = b.knobs.BeforeAdd(ctx, e)
282+
var shouldAdd bool
283+
ctx, e, shouldAdd = b.knobs.BeforeAdd(ctx, e)
284+
if !shouldAdd {
285+
return nil
286+
}
283287
}
284288

285289
if log.V(2) {

pkg/ccl/changefeedccl/kvevent/testing_knobs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import "context"
99

1010
// BlockingBufferTestingKnobs are testing knobs for blocking buffers.
1111
type BlockingBufferTestingKnobs struct {
12-
BeforeAdd func(ctx context.Context, e Event) (context.Context, Event)
12+
// BeforeAdd is called before adding a KV event to the buffer. If the function
13+
// returns false in the third return value, the event is skipped.
14+
BeforeAdd func(ctx context.Context, e Event) (_ context.Context, _ Event, shouldAdd bool)
1315
BeforePop func()
1416
BeforeDrain func(ctx context.Context) context.Context
1517
AfterDrain func(err error)

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
1818
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
1919
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
20+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
2021
"github.com/cockroachdb/cockroach/pkg/jobs"
2122
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2223
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -1206,6 +1207,223 @@ func TestChangefeedPerTableProtectedTimestampProgression(t *testing.T) {
12061207
cdcTest(t, testFn, feedTestEnterpriseSinks)
12071208
}
12081209

1210+
// TestCachedEventDescriptorGivesUpdatedTimestamp is a regression test for
1211+
// #156091. It tests that when a changefeed with a cdc query receives events
1212+
// from the KVFeed out of order, even across table descriptor versions, the
1213+
// query will be replanned at a timestamp we know has not been garbage collected.
1214+
// Previously, we would get an old timestamp from the cached event descriptor
1215+
// and if the db descriptor version had changed and been GC'd, this replan would
1216+
// fail, failing the changefeed.
1217+
func TestCachedEventDescriptorGivesUpdatedTimestamp(t *testing.T) {
1218+
defer leaktest.AfterTest(t)()
1219+
defer log.Scope(t).Close(t)
1220+
1221+
testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
1222+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1223+
1224+
// Making sure there's only one worker so that there is a single event
1225+
// descriptor cache. This means that the later events will use the
1226+
// cached event descriptor and its outdated timestamp, to properly
1227+
// reproduce the issue.
1228+
changefeedbase.EventConsumerWorkers.Override(
1229+
context.Background(), &s.Server.ClusterSettings().SV, 1)
1230+
1231+
/*
1232+
The situation from issue #156091 that we are trying to reproduce here
1233+
happens when a changefeed is replanning a CDC query for a table descriptor
1234+
version it has seen before. In that case, it replans the query at the
1235+
timestamp (stored in the cache) of the first event it saw for that table
1236+
descriptor version.
1237+
1238+
That's problematic because that could be well before the highwater for
1239+
the feed and therefore not protected by PTS. Even though our protected
1240+
timestamp system ensures that the relevant table descriptor version has
1241+
not been GC'd (since it is still used by the event we're processing),
1242+
there is no such guarantee that the *DB* descriptor version from that
1243+
time (which wasn't around at the time of that event) has not been GC'd.
1244+
If we try to fetch the DB descriptor version (as we do when replanning
1245+
a CDC query) and it has already been GC'd, the CDC query replan will
1246+
fail, and the changefeed with it.
1247+
1248+
So, in order to reproduce this we require that
1249+
a) KV events must come out of order so that we are both doing a CDC
1250+
query replan AND that the timestamp for that replan comes from the
1251+
EventDescriptor cache.
1252+
b) the DB descriptor version has changed between the event that seeded
1253+
the cache and the later event that shares that table descriptor version
1254+
and finally
1255+
c) that the old DB descriptor version has been garbage collected.
1256+
1257+
Ultimately the series of events will be
1258+
1. We see event 1 with table descriptor version 1 and DB descriptor
1259+
version 1. This is processed by the changefeed, seeding the event
1260+
descriptor cache at T_0.
1261+
2. We update the DB descriptor version to version 2.
1262+
3. We garbage collect the descriptor table through time T_0 and with it
1263+
DB descriptor version 1.
1264+
4. We make an update causing event 2 with the same table descriptor version
1265+
as event 1 (table descriptor version 1) but whose kv event will only
1266+
come through after event 3's.
1267+
5. We update the table descriptor version (to version 2) and make an
1268+
update (event 3).
1269+
6. The KV event for event 3 comes in first, causing the changefeed's
1270+
current table version to be table version 2.
1271+
7. The KV event for event 2 comes in out of order causing a replan of
1272+
the CDC query to happen (back to table descriptor version 1).
1273+
1274+
If we return the timestamp from the first event we saw on table descriptor
1275+
version 1, this replan will try to fetch the DB descriptor at time T_0
1276+
(when event 1 happened) which has been garbage collected and would fail
1277+
the feed.
1278+
*/
1279+
var dbDescTS atomic.Value
1280+
dbDescTS.Store(hlc.Timestamp{})
1281+
var hasGCdDBDesc atomic.Bool
1282+
var kvEvents []kvevent.Event
1283+
var hasProcessedAllEvents atomic.Bool
1284+
beforeAddKnob := func(ctx context.Context, e kvevent.Event) (_ context.Context, _ kvevent.Event, shouldAdd bool) {
1285+
// Since we are going to be ignoring some KV events, we don't send
1286+
// resolved events to avoid violating changefeed guarantees.
1287+
// Since this test also depends on specific GC behavior, we handle GC
1288+
// ourselves.
1289+
if e.Type() == kvevent.TypeResolved {
1290+
resolvedTimestamp := e.Timestamp()
1291+
1292+
// We need to wait for the resolved timestamp to move past the
1293+
// first kv event so that we know it's safe to GC the first database
1294+
// descriptor version.
1295+
if !hasGCdDBDesc.Load() {
1296+
dbDescTSVal := dbDescTS.Load().(hlc.Timestamp)
1297+
if !dbDescTSVal.IsEmpty() && dbDescTSVal.Less(resolvedTimestamp) {
1298+
t.Logf("GCing database descriptor table at timestamp: %s", dbDescTSVal)
1299+
forceTableGCAtTimestamp(t, s.SystemServer, "system", "descriptor", dbDescTSVal)
1300+
hasGCdDBDesc.Store(true)
1301+
}
1302+
}
1303+
1304+
// We use the resolved events to know when we can stop the test.
1305+
if len(kvEvents) > 2 && resolvedTimestamp.After(kvEvents[2].Timestamp()) {
1306+
hasProcessedAllEvents.Store(true)
1307+
}
1308+
1309+
// Do not send any of the resolved events.
1310+
return ctx, e, false
1311+
}
1312+
1313+
if e.Type() == kvevent.TypeKV {
1314+
if len(kvEvents) > 0 && e.Timestamp() == kvEvents[0].Timestamp() {
1315+
// Ignore duplicates of the first kv event which may come while
1316+
// we're waiting to GC the first database descriptor version.
1317+
return ctx, e, false
1318+
}
1319+
1320+
kvEvents = append(kvEvents, e)
1321+
switch len(kvEvents) {
1322+
case 1:
1323+
// Event 1 is sent as normal to seed the event descriptor cache.
1324+
t.Logf("Event 1 timestamp: %s", kvEvents[0].Timestamp())
1325+
return ctx, kvEvents[0], true
1326+
case 2:
1327+
// Event 2 is stored in kvEvents and we will send it later.
1328+
// Sending it after event 3, which has a different table
1329+
// descriptor version, will cause a CDC query replan.
1330+
return ctx, e, false
1331+
case 3:
1332+
// Event 3 is sent as normal to replan the CDC query with the
1333+
// new table descriptor version.
1334+
t.Logf("Event 3 timestamp: %s", kvEvents[2].Timestamp())
1335+
return ctx, kvEvents[2], true
1336+
case 4:
1337+
// Now we send event 2 *after* we've sent event 3. This should
1338+
// cause a CDC query replan with a cached event descriptor,
1339+
// since the table version is the same as event 1. If we use
1340+
// the timestamp of event 1, that replan will fail to fetch
1341+
// the GC'd DB descriptor version, failing the changefeed.
1342+
// This is what we saw in issue #156091.
1343+
t.Logf("Event 2 timestamp: %s", kvEvents[1].Timestamp())
1344+
return ctx, kvEvents[1], true
1345+
default:
1346+
// We do not need to send any more events after events
1347+
// 1, 2 and 3 have been processed.
1348+
return ctx, e, false
1349+
}
1350+
}
1351+
1352+
return ctx, e, true
1353+
}
1354+
1355+
knobs := s.TestingKnobs.
1356+
DistSQL.(*execinfra.TestingKnobs).
1357+
Changefeed.(*TestingKnobs)
1358+
1359+
knobs.MakeKVFeedToAggregatorBufferKnobs = func() kvevent.BlockingBufferTestingKnobs {
1360+
return kvevent.BlockingBufferTestingKnobs{
1361+
BeforeAdd: beforeAddKnob,
1362+
}
1363+
}
1364+
1365+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
1366+
foo := feed(t, f, `CREATE CHANGEFEED WITH resolved = '10ms' AS SELECT * FROM foo`)
1367+
defer closeFeed(t, foo)
1368+
1369+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
1370+
1371+
// Change the database descriptor version by granting permission to a user.
1372+
sqlDB.Exec(t, `CREATE USER testuser`)
1373+
sqlDB.Exec(t, `GRANT CREATE ON DATABASE d TO testuser`)
1374+
1375+
// Fetch the cluster logical timestamp so that we can make sure the
1376+
// resolved timestamp has moved past it and it's safe to GC the
1377+
// descriptor table (specifically the first database descriptor version).
1378+
var dbDescTSString string
1379+
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&dbDescTSString)
1380+
dbDescTSParsed, err := hlc.ParseHLC(dbDescTSString)
1381+
require.NoError(t, err)
1382+
dbDescTS.Store(dbDescTSParsed)
1383+
t.Logf("Timestamp after DB descriptor version change: %s", dbDescTSParsed)
1384+
1385+
testutils.SucceedsSoon(t, func() error {
1386+
if !hasGCdDBDesc.Load() {
1387+
return errors.New("database descriptor table not GCed")
1388+
}
1389+
return nil
1390+
})
1391+
1392+
// This event will be delayed by the KVFeed until after event 3 has been sent.
1393+
// Instead of sending it out, we GC the descriptor table including the
1394+
// old database descriptor version.
1395+
sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`)
1396+
1397+
// Change the table descriptor version by granting permission to a user.
1398+
sqlDB.Exec(t, `GRANT CREATE ON TABLE foo TO testuser`)
1399+
1400+
sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`)
1401+
1402+
// Since we skip processing the KV event for event 2, we will replace
1403+
// the KV event for event 4 with the stored one for event 2. This event is
1404+
// not itself relevant to the test, but helps us send the KV events out
1405+
// of order.
1406+
sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`)
1407+
1408+
// Wait for changefeed events 1, 2 and 3 to be processed. If the feed
1409+
// has failed, stop waiting and fail the test immediately.
1410+
testutils.SucceedsSoon(t, func() error {
1411+
var errorStr string
1412+
sqlDB.QueryRow(t, `SELECT error FROM [SHOW CHANGEFEED JOBS] WHERE job_id = $1`, foo.(cdctest.EnterpriseTestFeed).JobID()).Scan(&errorStr)
1413+
if errorStr != "" {
1414+
t.Fatalf("changefeed error: %s", errorStr)
1415+
return nil
1416+
}
1417+
if !hasProcessedAllEvents.Load() {
1418+
return errors.New("events not processed")
1419+
}
1420+
return nil
1421+
})
1422+
}
1423+
1424+
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)
1425+
}
1426+
12091427
func fetchRoleMembers(
12101428
ctx context.Context, execCfg *sql.ExecutorConfig, ts hlc.Timestamp,
12111429
) ([][]string, error) {

0 commit comments

Comments
 (0)