@@ -9,55 +9,53 @@ import (
99 "context"
1010 "testing"
1111
12+ "github.com/cockroachdb/cockroach/pkg/base"
1213 "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
13- "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
14- "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
15- "github.com/cockroachdb/cockroach/pkg/jobs"
16- "github.com/cockroachdb/cockroach/pkg/sql"
14+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
15+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1716 "github.com/cockroachdb/cockroach/pkg/sql/isql"
18- "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils "
17+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils "
1918 "github.com/cockroachdb/cockroach/pkg/util/leaktest"
2019 "github.com/cockroachdb/cockroach/pkg/util/log"
20+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
2121 "github.com/stretchr/testify/require"
2222)
2323
24- func TestChangefeedJobInfoResolvedTables (t * testing.T ) {
24+ func TestChangefeedJobInfoRoundTrip (t * testing.T ) {
2525 defer leaktest .AfterTest (t )()
2626 defer log .Scope (t ).Close (t )
2727
28- testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
29- ctx := context .Background ()
30- sqlDB := sqlutils .MakeSQLRunner (s .DB )
31-
32- // Make sure per-table tracking is enabled.
33- changefeedbase .TrackPerTableProgress .Override (ctx , & s .Server .ClusterSettings ().SV , true )
34-
35- sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)` )
36- sqlDB .Exec (t , `CREATE TABLE bar (x INT PRIMARY KEY, y STRING)` )
37- feed := feed (t , f , `CREATE CHANGEFEED FOR foo, bar` )
38- defer closeFeed (t , feed )
39-
40- sqlDB .Exec (t , `INSERT INTO foo VALUES (1, 'one')` )
41- sqlDB .Exec (t , `INSERT INTO bar VALUES (10, 'ten')` )
42- assertPayloads (t , feed , []string {
43- `foo: [1]->{"after": {"a": 1, "b": "one"}}` ,
44- `bar: [10]->{"after": {"x": 10, "y": "ten"}}` ,
45- })
46-
47- // The ResolvedTables message should be persisted to the job_info table
48- // at the same time as the highwater being set.
49- enterpriseFeed := feed .(cdctest.EnterpriseTestFeed )
50- waitForHighwater (t , enterpriseFeed , s .Server .JobRegistry ().(* jobs.Registry ))
51-
52- // Make sure the ResolvedTables message was persisted and can be decoded.
53- var resolvedTables cdcprogresspb.ResolvedTables
54- execCfg := s .Server .ExecutorConfig ().(sql.ExecutorConfig )
55- err := execCfg .InternalDB .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
56- return readChangefeedJobInfo (ctx , resolvedTablesFilename , & resolvedTables , txn , enterpriseFeed .JobID ())
57- })
58- require .NoError (t , err )
59- require .Len (t , resolvedTables .Tables , 2 )
28+ ctx := context .Background ()
29+ srv := serverutils .StartServerOnly (t , base.TestServerArgs {})
30+ defer srv .Stopper ().Stop (ctx )
31+
32+ jobID := jobspb .JobID (123456 )
33+
34+ // Create a basic progress record.
35+ uuid1 := uuid .MakeV4 ()
36+ uuid2 := uuid .MakeV4 ()
37+ ptsRecords := cdcprogresspb.ProtectedTimestampRecords {
38+ ProtectedTimestampRecords : map [descpb.ID ]* uuid.UUID {
39+ descpb .ID (100 ): & uuid1 ,
40+ descpb .ID (200 ): & uuid2 ,
41+ },
6042 }
6143
62- cdcTest (t , testFn , feedTestEnterpriseSinks )
44+ // Write the progress record.
45+ err := srv .InternalDB ().(isql.DB ).Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
46+ return writeChangefeedJobInfo (ctx ,
47+ perTableProtectedTimestampsFilename , & ptsRecords , txn , jobID )
48+ })
49+ require .NoError (t , err )
50+
51+ // Read the record back.
52+ var readPTSRecords cdcprogresspb.ProtectedTimestampRecords
53+ err = srv .InternalDB ().(isql.DB ).Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
54+ return readChangefeedJobInfo (ctx ,
55+ perTableProtectedTimestampsFilename , & readPTSRecords , txn , jobID )
56+ })
57+ require .NoError (t , err )
58+
59+ // Verify the read data matches the written data.
60+ require .Equal (t , ptsRecords , readPTSRecords )
6361}
0 commit comments