@@ -13,7 +13,10 @@ import (
1313 "time"
1414
1515 "github.com/cockroachdb/cockroach/pkg/base"
16+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
17+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
1618 "github.com/cockroachdb/cockroach/pkg/sql"
19+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
1720 "github.com/cockroachdb/cockroach/pkg/testutils"
1821 "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
1922 "github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -145,3 +148,159 @@ func TestInspectJobImplicitTxnSemantics(t *testing.T) {
145148 })
146149 }
147150}
151+
152+ // TestInspectJobProtectedTimestamp verifies that INSPECT jobs properly create
153+ // and clean up protected timestamp records when using AS OF SYSTEM TIME.
154+ func TestInspectJobProtectedTimestamp (t * testing.T ) {
155+ defer leaktest .AfterTest (t )()
156+ defer log .Scope (t ).Close (t )
157+
158+ for _ , tc := range []struct {
159+ desc string
160+ forceJobFailure bool
161+ expectedJobStatus string
162+ expectError bool
163+ }{
164+ {
165+ desc : "job success with cleanup" ,
166+ forceJobFailure : false ,
167+ expectedJobStatus : "succeeded" ,
168+ expectError : false ,
169+ },
170+ {
171+ desc : "job failure with cleanup" ,
172+ forceJobFailure : true ,
173+ expectedJobStatus : "failed" ,
174+ expectError : true ,
175+ },
176+ } {
177+ t .Run (tc .desc , func (t * testing.T ) {
178+ var blockInspectExecution atomic.Bool
179+ var protectedTimestampCreated atomic.Bool
180+
181+ ctx := context .Background ()
182+ s , db , _ := serverutils .StartServer (t , base.TestServerArgs {
183+ Knobs : base.TestingKnobs {
184+ Inspect : & sql.InspectTestingKnobs {
185+ OnInspectAfterProtectedTimestamp : func () error {
186+ protectedTimestampCreated .Store (true )
187+ // Block execution until we've verified the protected timestamp
188+ for blockInspectExecution .Load () {
189+ time .Sleep (10 * time .Millisecond )
190+ }
191+ if tc .forceJobFailure {
192+ return errors .New ("forced job failure for testing" )
193+ }
194+ return nil
195+ },
196+ },
197+ },
198+ })
199+ defer s .Stopper ().Stop (ctx )
200+
201+ runner := sqlutils .MakeSQLRunner (db )
202+ runner .Exec (t , `
203+ CREATE DATABASE db;
204+ SET enable_scrub_job = true;
205+ CREATE TABLE db.t (
206+ id INT PRIMARY KEY,
207+ val INT
208+ );
209+ CREATE INDEX i1 on db.t (val);
210+ INSERT INTO db.t VALUES (1, 2), (2, 3);` )
211+
212+ // Start blocking inspection execution
213+ blockInspectExecution .Store (true )
214+
215+ // Start INSPECT job with AS OF timestamp in a goroutine
216+ errCh := make (chan error , 1 )
217+ go func () {
218+ _ , err := db .Exec ("EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1us'" )
219+ errCh <- err
220+ }()
221+
222+ // Wait for the protected timestamp hook to be called
223+ testutils .SucceedsSoon (t , func () error {
224+ if ! protectedTimestampCreated .Load () {
225+ return errors .New ("protected timestamp hook not called yet" )
226+ }
227+ return nil
228+ })
229+
230+ // Get the job ID
231+ var jobID int64
232+ runner .QueryRow (t , `
233+ SELECT id
234+ FROM crdb_internal.system_jobs
235+ WHERE job_type = 'INSPECT' AND status = 'running'
236+ ORDER BY created DESC
237+ LIMIT 1
238+ ` ).Scan (& jobID )
239+
240+ // Load the job and get protected timestamp record
241+ execCfg := s .ApplicationLayer ().ExecutorConfig ().(sql.ExecutorConfig )
242+ job , err := execCfg .JobRegistry .LoadJob (ctx , jobspb .JobID (jobID ))
243+ require .NoError (t , err )
244+
245+ details := job .Details ().(jobspb.InspectDetails )
246+ require .NotNil (t , details .ProtectedTimestampRecord , "protected timestamp record should be set" )
247+ protectedTSID := * details .ProtectedTimestampRecord
248+
249+ // Check that the protected timestamp record actually exists in the system
250+ var recordExists bool
251+ require .NoError (t , execCfg .InternalDB .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
252+ pts := execCfg .ProtectedTimestampProvider .WithTxn (txn )
253+ _ , err := pts .GetRecord (ctx , protectedTSID )
254+ if err != nil {
255+ if errors .Is (err , protectedts .ErrNotExists ) {
256+ recordExists = false
257+ return nil
258+ }
259+ return err
260+ }
261+ recordExists = true
262+ return nil
263+ }))
264+ require .True (t , recordExists , "protected timestamp record should exist in the system" )
265+
266+ // Allow the job to complete
267+ blockInspectExecution .Store (false )
268+
269+ // Wait for job to complete
270+ select {
271+ case err := <- errCh :
272+ if tc .expectError {
273+ require .Error (t , err , "INSPECT job should fail due to forced error" )
274+ } else {
275+ require .NoError (t , err , "INSPECT job should complete successfully" )
276+ }
277+ case <- time .After (30 * time .Second ):
278+ t .Fatal ("INSPECT job did not complete within timeout" )
279+ }
280+
281+ // Verify job status
282+ var status string
283+ runner .QueryRow (t , `
284+ SELECT status
285+ FROM crdb_internal.system_jobs
286+ WHERE id = $1
287+ ` , jobID ).Scan (& status )
288+ require .Equal (t , tc .expectedJobStatus , status , "job should have expected status" )
289+
290+ // Verify protected timestamp record is cleaned up
291+ testutils .SucceedsSoon (t , func () error {
292+ return execCfg .InternalDB .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
293+ pts := execCfg .ProtectedTimestampProvider .WithTxn (txn )
294+ _ , err := pts .GetRecord (ctx , protectedTSID )
295+ if err != nil {
296+ if errors .Is (err , protectedts .ErrNotExists ) {
297+ return nil // This is what we want
298+ }
299+ return err
300+ }
301+ return errors .New ("protected timestamp record still exists" )
302+ })
303+ })
304+ })
305+ }
306+ }
0 commit comments