@@ -622,3 +622,228 @@ func TestJobProgressAndStatusAccessors(t *testing.T) {
622622 sql .CheckQueryResults (t , fmt .Sprintf ("SELECT status from system.job_status where job_id = %d" , job4 .ID ()), [][]string {{"c" }})
623623 })
624624}
625+
626+ func TestStorageRejectsInvalidJobID (t * testing.T ) {
627+ defer leaktest .AfterTest (t )()
628+ defer log .Scope (t ).Close (t )
629+
630+ ctx := context .Background ()
631+ s := serverutils .StartServerOnly (t , base.TestServerArgs {})
632+ defer s .Stopper ().Stop (ctx )
633+
634+ db := s .InternalDB ().(isql.DB )
635+
636+ t .Run ("ProgressStorage" , func (t * testing.T ) {
637+ progressStorage := jobs .ProgressStorage (jobspb .InvalidJobID )
638+
639+ t .Run ("Get" , func (t * testing.T ) {
640+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
641+ _ , _ , _ , err := progressStorage .Get (ctx , txn )
642+ require .Error (t , err )
643+ require .Contains (t , err .Error (), "invalid job ID" )
644+ return nil
645+ }))
646+ })
647+
648+ t .Run ("Set" , func (t * testing.T ) {
649+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
650+ err := progressStorage .Set (ctx , txn , 0.5 , hlc.Timestamp {})
651+ require .Error (t , err )
652+ require .Contains (t , err .Error (), "invalid job ID" )
653+ return nil
654+ }))
655+ })
656+ })
657+
658+ t .Run ("StatusStorage" , func (t * testing.T ) {
659+ statusStorage := jobs .StatusStorage (jobspb .InvalidJobID )
660+
661+ t .Run ("Get" , func (t * testing.T ) {
662+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
663+ _ , _ , err := statusStorage .Get (ctx , txn )
664+ require .Error (t , err )
665+ require .Contains (t , err .Error (), "invalid job ID" )
666+ return nil
667+ }))
668+ })
669+
670+ t .Run ("Set" , func (t * testing.T ) {
671+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
672+ err := statusStorage .Set (ctx , txn , "test status" )
673+ require .Error (t , err )
674+ require .Contains (t , err .Error (), "invalid job ID" )
675+ return nil
676+ }))
677+ })
678+
679+ t .Run ("Clear" , func (t * testing.T ) {
680+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
681+ err := statusStorage .Clear (ctx , txn )
682+ require .Error (t , err )
683+ require .Contains (t , err .Error (), "invalid job ID" )
684+ return nil
685+ }))
686+ })
687+ })
688+
689+ t .Run ("MessageStorage" , func (t * testing.T ) {
690+ messageStorage := jobs .MessageStorage (jobspb .InvalidJobID )
691+
692+ t .Run ("Record" , func (t * testing.T ) {
693+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
694+ err := messageStorage .Record (ctx , txn , "test" , "test message" )
695+ require .Error (t , err )
696+ require .Contains (t , err .Error (), "invalid job ID" )
697+ return nil
698+ }))
699+ })
700+
701+ t .Run ("Fetch" , func (t * testing.T ) {
702+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
703+ _ , err := messageStorage .Fetch (ctx , txn )
704+ require .Error (t , err )
705+ require .Contains (t , err .Error (), "invalid job ID" )
706+ return nil
707+ }))
708+ })
709+ })
710+
711+ t .Run ("InfoStorage" , func (t * testing.T ) {
712+ t .Run ("Get" , func (t * testing.T ) {
713+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
714+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
715+ _ , _ , err := infoStorage .Get (ctx , "test-op" , "test-key" )
716+ require .Error (t , err )
717+ require .Contains (t , err .Error (), "invalid job ID" )
718+ return nil
719+ }))
720+ })
721+
722+ t .Run ("Write" , func (t * testing.T ) {
723+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
724+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
725+ err := infoStorage .Write (ctx , "test-key" , []byte ("test-value" ))
726+ require .Error (t , err )
727+ require .Contains (t , err .Error (), "invalid job ID" )
728+ return nil
729+ }))
730+ })
731+
732+ t .Run ("WriteFirstKey" , func (t * testing.T ) {
733+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
734+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
735+ err := infoStorage .WriteFirstKey (ctx , "test-key" , []byte ("test-value" ))
736+ require .Error (t , err )
737+ require .Contains (t , err .Error (), "invalid job ID" )
738+ return nil
739+ }))
740+ })
741+
742+ t .Run ("Delete" , func (t * testing.T ) {
743+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
744+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
745+ err := infoStorage .Delete (ctx , "test-key" )
746+ require .Error (t , err )
747+ require .Contains (t , err .Error (), "invalid job ID" )
748+ return nil
749+ }))
750+ })
751+
752+ t .Run ("DeleteRange" , func (t * testing.T ) {
753+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
754+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
755+ err := infoStorage .DeleteRange (ctx , "start" , "end" , 0 )
756+ require .Error (t , err )
757+ require .Contains (t , err .Error (), "invalid job ID" )
758+ return nil
759+ }))
760+ })
761+
762+ t .Run ("Count" , func (t * testing.T ) {
763+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
764+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
765+ _ , err := infoStorage .Count (ctx , "start" , "end" )
766+ require .Error (t , err )
767+ require .Contains (t , err .Error (), "invalid job ID" )
768+ return nil
769+ }))
770+ })
771+
772+ t .Run ("Iterate" , func (t * testing.T ) {
773+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
774+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
775+ err := infoStorage .Iterate (ctx , "prefix" , func (key string , value []byte ) error {
776+ return nil
777+ })
778+ require .Error (t , err )
779+ require .Contains (t , err .Error (), "invalid job ID" )
780+ return nil
781+ }))
782+ })
783+
784+ t .Run ("GetLast" , func (t * testing.T ) {
785+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
786+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
787+ err := infoStorage .GetLast (ctx , "prefix" , func (key string , value []byte ) error {
788+ return nil
789+ })
790+ require .Error (t , err )
791+ require .Contains (t , err .Error (), "invalid job ID" )
792+ return nil
793+ }))
794+ })
795+
796+ t .Run ("GetLegacyPayload" , func (t * testing.T ) {
797+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
798+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
799+ _ , _ , err := infoStorage .GetLegacyPayload (ctx , "test-op" )
800+ require .Error (t , err )
801+ require .Contains (t , err .Error (), "invalid job ID" )
802+ return nil
803+ }))
804+ })
805+
806+ t .Run ("WriteLegacyPayload" , func (t * testing.T ) {
807+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
808+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
809+ err := infoStorage .WriteLegacyPayload (ctx , []byte ("test-payload" ))
810+ require .Error (t , err )
811+ require .Contains (t , err .Error (), "invalid job ID" )
812+ return nil
813+ }))
814+ })
815+
816+ t .Run ("GetLegacyProgress" , func (t * testing.T ) {
817+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
818+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
819+ _ , _ , err := infoStorage .GetLegacyProgress (ctx , "test-op" )
820+ require .Error (t , err )
821+ require .Contains (t , err .Error (), "invalid job ID" )
822+ return nil
823+ }))
824+ })
825+
826+ t .Run ("WriteLegacyProgress" , func (t * testing.T ) {
827+ require .NoError (t , db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
828+ infoStorage := jobs .InfoStorageForJob (txn , jobspb .InvalidJobID )
829+ err := infoStorage .WriteLegacyProgress (ctx , []byte ("test-progress" ))
830+ require .Error (t , err )
831+ require .Contains (t , err .Error (), "invalid job ID" )
832+ return nil
833+ }))
834+ })
835+ })
836+
837+ // Verify no rows were created in any of the job tables.
838+ sqlDB := sqlutils .MakeSQLRunner (s .SQLConn (t ))
839+ sqlDB .CheckQueryResults (t ,
840+ "SELECT count(*) FROM system.job_progress WHERE job_id = 0" , [][]string {{"0" }})
841+ sqlDB .CheckQueryResults (t ,
842+ "SELECT count(*) FROM system.job_progress_history WHERE job_id = 0" , [][]string {{"0" }})
843+ sqlDB .CheckQueryResults (t ,
844+ "SELECT count(*) FROM system.job_status WHERE job_id = 0" , [][]string {{"0" }})
845+ sqlDB .CheckQueryResults (t ,
846+ "SELECT count(*) FROM system.job_message WHERE job_id = 0" , [][]string {{"0" }})
847+ sqlDB .CheckQueryResults (t ,
848+ "SELECT count(*) FROM system.job_info WHERE job_id = 0" , [][]string {{"0" }})
849+ }
0 commit comments