@@ -11,14 +11,20 @@ import (
1111 "time"
1212
1313 "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
14+ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1415 "github.com/cockroachdb/cockroach/pkg/jobs"
1516 "github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
1617 "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
18+ "github.com/cockroachdb/cockroach/pkg/roachpb"
19+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
1720 "github.com/cockroachdb/cockroach/pkg/sql/isql"
21+ "github.com/cockroachdb/cockroach/pkg/sql/rowenc"
1822 "github.com/cockroachdb/cockroach/pkg/testutils"
1923 "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
24+ "github.com/cockroachdb/cockroach/pkg/util/encoding"
2025 "github.com/cockroachdb/cockroach/pkg/util/leaktest"
2126 "github.com/cockroachdb/cockroach/pkg/util/log"
27+ "github.com/cockroachdb/cockroach/pkg/util/span"
2228 "github.com/cockroachdb/errors"
2329 "github.com/stretchr/testify/require"
2430)
@@ -54,9 +60,9 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
5460
5561 // Make sure frontier gets persisted to job_info table.
5662 jobID := foo .(cdctest.EnterpriseTestFeed ).JobID ()
63+ var allSpans []jobspb.ResolvedSpan
5764 testutils .SucceedsSoon (t , func () error {
5865 var found bool
59- var allSpans []jobspb.ResolvedSpan
6066 if err := s .Server .InternalDB ().(isql.DB ).Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
6167 var err error
6268 allSpans , found , err = jobfrontier .GetAllResolvedSpans (ctx , txn , jobID )
@@ -74,6 +80,17 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
7480 return nil
7581 })
7682
83+ // Make sure the persisted spans cover the entire table.
84+ fooTableSpan := desctestutils .
85+ TestingGetPublicTableDescriptor (s .Server .DB (), s .Codec , "d" , "foo" ).
86+ PrimaryIndexSpan (s .Codec )
87+ var spanGroup roachpb.SpanGroup
88+ spanGroup .Add (fooTableSpan )
89+ for _ , rs := range allSpans {
90+ spanGroup .Sub (rs .Span )
91+ }
92+ require .Zero (t , spanGroup .Len ())
93+
7794 // Verify metric count and average latency have sensible values.
7895 testutils .SucceedsSoon (t , func () error {
7996 metricSnapshot := metric .CumulativeSnapshot ()
@@ -91,3 +108,74 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
91108
92109 cdcTest (t , testFn , feedTestEnterpriseSinks )
93110}
111+
112+ // TestChangefeedFrontierRestore verifies that changefeeds will correctly
113+ // restore progress from persisted span frontiers.
114+ func TestChangefeedFrontierRestore (t * testing.T ) {
115+ defer leaktest .AfterTest (t )()
116+ defer log .Scope (t ).Close (t )
117+
118+ testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
119+ sqlDB := sqlutils .MakeSQLRunner (s .DB )
120+ ctx := context .Background ()
121+
122+ // Disable span-level checkpointing.
123+ changefeedbase .SpanCheckpointInterval .Override (ctx , & s .Server .ClusterSettings ().SV , 0 )
124+
125+ // Create a table and a changefeed on it.
126+ sqlDB .Exec (t , "CREATE TABLE foo (a INT PRIMARY KEY)" )
127+ foo := feed (t , f , "CREATE CHANGEFEED FOR foo WITH initial_scan='no'" )
128+ defer closeFeed (t , foo )
129+ jobFeed := foo .(cdctest.EnterpriseTestFeed )
130+
131+ // Pause the changefeed.
132+ require .NoError (t , jobFeed .Pause ())
133+
134+ // Insert a few rows into the table and save the insert time.
135+ var tsStr string
136+ sqlDB .QueryRow (t , `INSERT INTO foo VALUES (1), (2), (3), (4), (5), (6)
137+ RETURNING cluster_logical_timestamp()` ).Scan (& tsStr )
138+ ts := parseTimeToHLC (t , tsStr )
139+
140+ // Make function to create spans for single rows in the table.
141+ codec := s .Server .Codec ()
142+ fooDesc := desctestutils .TestingGetPublicTableDescriptor (s .Server .DB (), codec , "d" , "foo" )
143+ rowSpan := func (key int64 ) roachpb.Span {
144+ keyPrefix := func () []byte {
145+ return rowenc .MakeIndexKeyPrefix (codec , fooDesc .GetID (), fooDesc .GetPrimaryIndexID ())
146+ }
147+ return roachpb.Span {
148+ Key : encoding .EncodeVarintAscending (keyPrefix (), key ),
149+ EndKey : encoding .EncodeVarintAscending (keyPrefix (), key + 1 ),
150+ }
151+ }
152+
153+ // Manually persist a span frontier that manually marks some of the
154+ // inserted rows as resolved already.
155+ hw , err := jobFeed .HighWaterMark ()
156+ require .NoError (t , err )
157+ frontier , err := span .MakeFrontierAt (hw , fooDesc .PrimaryIndexSpan (codec ))
158+ require .NoError (t , err )
159+ for _ , id := range []int64 {2 , 5 } {
160+ _ , err := frontier .Forward (rowSpan (id ), ts )
161+ require .NoError (t , err )
162+ }
163+ err = s .Server .InternalDB ().(isql.DB ).Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
164+ return jobfrontier .Store (ctx , txn , jobFeed .JobID (), "test frontier" , frontier )
165+ })
166+ require .NoError (t , err )
167+
168+ // Resume the changefeed.
169+ require .NoError (t , jobFeed .Resume ())
170+
171+ // We should receive rows 1, 3, 4, 6 (rows 2 and 5 were marked as resolved).
172+ assertPayloads (t , foo , []string {
173+ `foo: [1]->{"after": {"a": 1}}` ,
174+ `foo: [3]->{"after": {"a": 3}}` ,
175+ `foo: [4]->{"after": {"a": 4}}` ,
176+ `foo: [6]->{"after": {"a": 6}}` ,
177+ })
178+ }
179+
180+ cdcTest (t , testFn , feedTestEnterpriseSinks )
181+ }
0 commit comments