Skip to content

Commit d3cbbf7

Browse files
committed
changefeedccl: update timestamp on event descriptor cache hits
Before, in some cases we could replan a CDC query with an old timestamp from the event descriptor cache, which would try to read the database descriptor at an old potentially garbage collected timestamp. Now, we make sure the event descriptor cache always gives events descriptors with an up to date timestamp. Epic: none Fixes: #156091 Release note (bug fix): A bug where changefeeds using CDC queries could sometimes unexpectedly fail after a schema change with a descriptor retrieval error has been fixed.
1 parent 46c2126 commit d3cbbf7

File tree

6 files changed

+249
-13
lines changed

6 files changed

+249
-13
lines changed

pkg/ccl/changefeedccl/cdcevent/event.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -521,9 +521,14 @@ func getEventDescriptorCached(
521521
idVer := CacheKey{ID: desc.GetID(), Version: desc.GetVersion(), FamilyID: family.ID}
522522

523523
if v, ok := cache.Get(idVer); ok {
524-
ed := v.(*EventDescriptor)
525-
if catalog.UserDefinedTypeColsHaveSameVersion(ed.td, desc) {
526-
return ed, nil
524+
cached := v.(*EventDescriptor)
525+
if catalog.UserDefinedTypeColsHaveSameVersion(cached.td, desc) {
526+
// Make a shallow copy to avoid modifying the cached value. The cached
527+
// EventDescriptor is shared across changefeed operations and may be
528+
// referenced concurrently.
529+
ed := *cached
530+
ed.SchemaTS = schemaTS
531+
return &ed, nil
527532
}
528533
}
529534

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6890,7 +6890,7 @@ func TestChangefeedDataTTL(t *testing.T) {
68906890

68916891
// Force a GC of the table. This should cause both
68926892
// versions of the table to be deleted.
6893-
forceTableGC(t, s.SystemServer, sqlDB, "d", "foo")
6893+
forceTableGC(t, s.SystemServer, "d", "foo")
68946894

68956895
// Resume our changefeed normally.
68966896
atomic.StoreInt32(&shouldWait, 0)
@@ -6947,7 +6947,7 @@ func TestChangefeedOutdatedCursor(t *testing.T) {
69476947
sqlDB.Exec(t, `CREATE TABLE f (a INT PRIMARY KEY)`)
69486948
outdatedTS := s.Server.Clock().Now().AsOfSystemTime()
69496949
sqlDB.Exec(t, `INSERT INTO f VALUES (1)`)
6950-
forceTableGC(t, s.SystemServer, sqlDB, "system", "descriptor")
6950+
forceTableGC(t, s.SystemServer, "system", "descriptor")
69516951
createChangefeed :=
69526952
fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE f with cursor = '%s'`, outdatedTS)
69536953
expectedErrorSubstring :=
@@ -7072,7 +7072,7 @@ func TestChangefeedSchemaTTL(t *testing.T) {
70727072

70737073
// Force a GC of the table. This should cause both older versions of the
70747074
// table to be deleted, with the middle version being lost to the changefeed.
7075-
forceTableGC(t, s.SystemServer, sqlDB, "system", "descriptor")
7075+
forceTableGC(t, s.SystemServer, "system", "descriptor")
70767076

70777077
// Resume our changefeed normally.
70787078
atomic.StoreInt32(&shouldWait, 0)

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,14 +1720,21 @@ func maybeUseExternalConnection(
17201720
}
17211721
}
17221722

1723-
func forceTableGC(
1723+
func forceTableGC(t testing.TB, tsi serverutils.TestServerInterface, database, table string) {
1724+
t.Helper()
1725+
if err := tsi.ForceTableGC(context.Background(), database, table, tsi.Clock().Now()); err != nil {
1726+
t.Fatal(err)
1727+
}
1728+
}
1729+
1730+
func forceTableGCAtTimestamp(
17241731
t testing.TB,
17251732
tsi serverutils.TestServerInterface,
1726-
sqlDB *sqlutils.SQLRunner,
17271733
database, table string,
1734+
timestamp hlc.Timestamp,
17281735
) {
17291736
t.Helper()
1730-
if err := tsi.ForceTableGC(context.Background(), database, table, tsi.Clock().Now()); err != nil {
1737+
if err := tsi.ForceTableGC(context.Background(), database, table, timestamp); err != nil {
17311738
t.Fatal(err)
17321739
}
17331740
}
@@ -2002,12 +2009,12 @@ func runWithAndWithoutRegression141453(
20022009
var blockPop atomic.Bool
20032010
popCh := make(chan struct{})
20042011
return kvevent.BlockingBufferTestingKnobs{
2005-
BeforeAdd: func(ctx context.Context, e kvevent.Event) (context.Context, kvevent.Event) {
2012+
BeforeAdd: func(ctx context.Context, e kvevent.Event) (_ context.Context, _ kvevent.Event, shouldAdd bool) {
20062013
if e.Type() == kvevent.TypeResolved &&
20072014
e.Resolved().BoundaryType == jobspb.ResolvedSpan_RESTART {
20082015
blockPop.Store(true)
20092016
}
2010-
return ctx, e
2017+
return ctx, e, true
20112018
},
20122019
BeforePop: func() {
20132020
if blockPop.Load() {

pkg/ccl/changefeedccl/kvevent/blocking_buffer.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,11 @@ func (b *blockingBuffer) AcquireMemory(ctx context.Context, n int64) (alloc Allo
279279
// Add implements Writer interface.
280280
func (b *blockingBuffer) Add(ctx context.Context, e Event) error {
281281
if b.knobs.BeforeAdd != nil {
282-
ctx, e = b.knobs.BeforeAdd(ctx, e)
282+
var shouldAdd bool
283+
ctx, e, shouldAdd = b.knobs.BeforeAdd(ctx, e)
284+
if !shouldAdd {
285+
return nil
286+
}
283287
}
284288

285289
if log.V(2) {

pkg/ccl/changefeedccl/kvevent/testing_knobs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import "context"
99

1010
// BlockingBufferTestingKnobs are testing knobs for blocking buffers.
1111
type BlockingBufferTestingKnobs struct {
12-
BeforeAdd func(ctx context.Context, e Event) (context.Context, Event)
12+
// BeforeAdd is called before adding a KV event to the buffer. If the function
13+
// returns false in the third return value, the event is skipped.
14+
BeforeAdd func(ctx context.Context, e Event) (_ context.Context, _ Event, shouldAdd bool)
1315
BeforePop func()
1416
BeforeDrain func(ctx context.Context) context.Context
1517
AfterDrain func(err error)

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
1818
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
1919
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
20+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
2021
"github.com/cockroachdb/cockroach/pkg/jobs"
2122
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2223
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -1216,6 +1217,223 @@ func TestChangefeedPerTableProtectedTimestampProgression(t *testing.T) {
12161217
cdcTest(t, testFn, feedTestEnterpriseSinks)
12171218
}
12181219

1220+
// TestCachedEventDescriptorGivesUpdatedTimestamp is a regression test for
1221+
// #156091. It tests that when a changefeed with a cdc query receives events
1222+
// from the KVFeed out of order, even across table descriptor versions, the
1223+
// query will be replanned at a timestamp we know has not been garbage collected.
1224+
// Previously, we would get an old timestamp from the cached event descriptor
1225+
// and if the db descriptor version had changed and been GC'd, this replan would
1226+
// fail, failing the changefeed.
1227+
func TestCachedEventDescriptorGivesUpdatedTimestamp(t *testing.T) {
1228+
defer leaktest.AfterTest(t)()
1229+
defer log.Scope(t).Close(t)
1230+
1231+
testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
1232+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1233+
1234+
// Making sure there's only one worker so that there is a single event
1235+
// descriptor cache. This means that the later events will use the
1236+
// cached event descriptor and its outdated timestamp, to properly
1237+
// reproduce the issue.
1238+
changefeedbase.EventConsumerWorkers.Override(
1239+
context.Background(), &s.Server.ClusterSettings().SV, 1)
1240+
1241+
/*
1242+
The situation from issue #156091 that we are trying to reproduce here
1243+
happens when a changefeed is replanning a CDC query for a table descriptor
1244+
version it has seen before. In that case, it replans the query at the
1245+
timestamp (stored in the cache) of the first event it saw for that table
1246+
descriptor version.
1247+
1248+
That's problematic because that could be well before the highwater for
1249+
the feed and therefore not protected by PTS. Even though our protected
1250+
timestamp system ensures that the relevant table descriptor version has
1251+
not been GC'd (since it is still used by the event we're processing),
1252+
there is no such guarantee that the *DB* descriptor version from that
1253+
time (which wasn't around at the time of that event) has not been GC'd.
1254+
If we try to fetch the DB descriptor version (as we do when replanning
1255+
a CDC query) and it has already been GC'd, the CDC query replan will
1256+
fail, and the changefeed with it.
1257+
1258+
So, in order to reproduce this we require that
1259+
a) KV events must come out of order so that we are both doing a CDC
1260+
query replan AND that the timestamp for that replan comes from the
1261+
EventDescriptor cache.
1262+
b) the DB descriptor version has changed between the event that seeded
1263+
the cache and the later event that shares that table descriptor version
1264+
and finally
1265+
c) that the old DB descriptor version has been garbage collected.
1266+
1267+
Ultimately the series of events will be
1268+
1. We see event 1 with table descriptor version 1 and DB descriptor
1269+
version 1. This is processed by the changefeeed seeding the event
1270+
descriptor cache at T_0.
1271+
2. We update the DB descriptor version to version 2.
1272+
3. We garbage collect the descriptor table through time T_0 and with it
1273+
DB descriptor version 1.
1274+
4. We make an update causing event 2 with the same table descriptor version
1275+
as event 1 (table descriptor version 1) but whose kv event will only
1276+
come through after event 3's.
1277+
5. We update the table descriptor version (to version 2) and make an
1278+
update (event 3).
1279+
6. The KV event for event 3 comes in first, causing the changefeed's
1280+
current table version to be table version 2.
1281+
7. The KV event for event 2 comes in out of order causing a replan of
1282+
the CDC query to happen (back to table descriptor version 1).
1283+
1284+
If we return the timestamp from the first event we saw on table descriptor
1285+
version 1, this replan will try to fetch the DB descriptor at time T_0
1286+
(when event 1 happened) which has been garbage collected and would fail
1287+
the feed.
1288+
*/
1289+
var dbDescTS atomic.Value
1290+
dbDescTS.Store(hlc.Timestamp{})
1291+
var hasGCdDBDesc atomic.Bool
1292+
var kvEvents []kvevent.Event
1293+
var hasProcessedAllEvents atomic.Bool
1294+
beforeAddKnob := func(ctx context.Context, e kvevent.Event) (_ context.Context, _ kvevent.Event, shouldAdd bool) {
1295+
// Since we are going to be ignoring some KV events, we don't send
1296+
// resolved events to avoid violating changefeed guarantees.
1297+
// Since this test also depends on specific GC behavior, we handle GC
1298+
// ourselves.
1299+
if e.Type() == kvevent.TypeResolved {
1300+
resolvedTimestamp := e.Timestamp()
1301+
1302+
// We need to wait for the resolved timestamp to move past the
1303+
// first kv event so that we know it's safe to GC the first database
1304+
// descriptor version.
1305+
if !hasGCdDBDesc.Load() {
1306+
dbDescTSVal := dbDescTS.Load().(hlc.Timestamp)
1307+
if !dbDescTSVal.IsEmpty() && dbDescTSVal.Less(resolvedTimestamp) {
1308+
t.Logf("GCing database descriptor table at timestamp: %s", dbDescTSVal)
1309+
forceTableGCAtTimestamp(t, s.SystemServer, "system", "descriptor", dbDescTSVal)
1310+
hasGCdDBDesc.Store(true)
1311+
}
1312+
}
1313+
1314+
// We use the resolved events to know when we can stop the test.
1315+
if len(kvEvents) > 2 && resolvedTimestamp.After(kvEvents[2].Timestamp()) {
1316+
hasProcessedAllEvents.Store(true)
1317+
}
1318+
1319+
// Do not send any of the resolved events.
1320+
return ctx, e, false
1321+
}
1322+
1323+
if e.Type() == kvevent.TypeKV {
1324+
if len(kvEvents) > 0 && e.Timestamp() == kvEvents[0].Timestamp() {
1325+
// Ignore duplicates of the first kv event which may come while
1326+
// we're waiting to GC the first database descriptor version.
1327+
return ctx, e, false
1328+
}
1329+
1330+
kvEvents = append(kvEvents, e)
1331+
switch len(kvEvents) {
1332+
case 1:
1333+
// Event 1 is sent as normal to seed the event descriptor cache.
1334+
t.Logf("Event 1 timestamp: %s", kvEvents[0].Timestamp())
1335+
return ctx, kvEvents[0], true
1336+
case 2:
1337+
// Event 2 is stored in kvEvents and we will send it later.
1338+
// Sending it after event 3, which has a different table
1339+
// descriptor version, will cause CDC query replan.
1340+
return ctx, e, false
1341+
case 3:
1342+
// Event 3 is sent as normal to replan the CDC query with the
1343+
// new table descriptor version.
1344+
t.Logf("Event 3 timestamp: %s", kvEvents[2].Timestamp())
1345+
return ctx, kvEvents[2], true
1346+
case 4:
1347+
// Now we send event 2 *after* we've sent event 3. This should
1348+
// cause a CDC query replan with a cached event descriptor,
1349+
// since the table version is the same as event 1. If we use
1350+
// the timestamp of event 1, that replan will fail to fetch
1351+
// the GC'd DB descriptor version failing the changefeed.
1352+
// This is what we saw in issue #156091.
1353+
t.Logf("Event 2 timestamp: %s", kvEvents[1].Timestamp())
1354+
return ctx, kvEvents[1], true
1355+
default:
1356+
// We do not need to send any more events after events
1357+
// 1, 2 and 3 have been processed.
1358+
return ctx, e, false
1359+
}
1360+
}
1361+
1362+
return ctx, e, true
1363+
}
1364+
1365+
knobs := s.TestingKnobs.
1366+
DistSQL.(*execinfra.TestingKnobs).
1367+
Changefeed.(*TestingKnobs)
1368+
1369+
knobs.MakeKVFeedToAggregatorBufferKnobs = func() kvevent.BlockingBufferTestingKnobs {
1370+
return kvevent.BlockingBufferTestingKnobs{
1371+
BeforeAdd: beforeAddKnob,
1372+
}
1373+
}
1374+
1375+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
1376+
foo := feed(t, f, `CREATE CHANGEFEED WITH resolved = '10ms' AS SELECT * FROM foo`)
1377+
defer closeFeed(t, foo)
1378+
1379+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
1380+
1381+
// Change the database descriptor version by granting permission to a user.
1382+
sqlDB.Exec(t, `CREATE USER testuser`)
1383+
sqlDB.Exec(t, `GRANT CREATE ON DATABASE d TO testuser`)
1384+
1385+
// Fetch the cluster logical timestamp so that we can make sure the
1386+
// resolved timestamp has moved past it and it's safe to GC the
1387+
// descriptor table (specifically the first database descriptor version).
1388+
var dbDescTSString string
1389+
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&dbDescTSString)
1390+
dbDescTSParsed, err := hlc.ParseHLC(dbDescTSString)
1391+
require.NoError(t, err)
1392+
dbDescTS.Store(dbDescTSParsed)
1393+
t.Logf("Timestamp after DB descriptor version change: %s", dbDescTSParsed)
1394+
1395+
testutils.SucceedsSoon(t, func() error {
1396+
if !hasGCdDBDesc.Load() {
1397+
return errors.New("database descriptor table not GCed")
1398+
}
1399+
return nil
1400+
})
1401+
1402+
// This event will be delayed by the KVFeed until after event 3 has been sent.
1403+
// Instead of sending it out, we GC the descriptor table including the
1404+
// old database descriptor version.
1405+
sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`)
1406+
1407+
// Change the table descriptor version by granting permission to a user.
1408+
sqlDB.Exec(t, `GRANT CREATE ON TABLE foo TO testuser`)
1409+
1410+
sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`)
1411+
1412+
// Since we skip processing the KV event for event 2, we will replace
1413+
// the KV event for event 4 the stored one for event 2. This event is
1414+
// not itself relevant to the test, but helps us send the KV events out
1415+
// of order.
1416+
sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`)
1417+
1418+
// Wait for changefeed events 1, 2 and 3 to be processed. If the feed
1419+
// has failed, stop waiting and fail the test immediately.
1420+
testutils.SucceedsSoon(t, func() error {
1421+
var errorStr string
1422+
sqlDB.QueryRow(t, `SELECT error FROM [SHOW CHANGEFEED JOBS] WHERE job_id = $1`, foo.(cdctest.EnterpriseTestFeed).JobID()).Scan(&errorStr)
1423+
if errorStr != "" {
1424+
t.Fatalf("changefeed error: %s", errorStr)
1425+
return nil
1426+
}
1427+
if !hasProcessedAllEvents.Load() {
1428+
return errors.New("events not processed")
1429+
}
1430+
return nil
1431+
})
1432+
}
1433+
1434+
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)
1435+
}
1436+
12191437
func fetchRoleMembers(
12201438
ctx context.Context, execCfg *sql.ExecutorConfig, ts hlc.Timestamp,
12211439
) ([][]string, error) {

0 commit comments

Comments
 (0)