@@ -716,3 +716,157 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) {
716716 })
717717 }
718718}
719+
720+ func TestParquetQueryableFallbackDisabled (t * testing.T ) {
721+ block1 := ulid .MustNew (1 , nil )
722+ block2 := ulid .MustNew (2 , nil )
723+ minT := int64 (10 )
724+ maxT := util .TimeToMillis (time .Now ())
725+
726+ createStore := func () * blocksStoreSetMock {
727+ return & blocksStoreSetMock {mockedResponses : []interface {}{
728+ map [BlocksStoreClient ][]ulid.ULID {
729+ & storeGatewayClientMock {remoteAddr : "1.1.1.1" ,
730+ mockedSeriesResponses : []* storepb.SeriesResponse {
731+ mockSeriesResponse (labels.Labels {{Name : labels .MetricName , Value : "fromSg" }}, []cortexpb.Sample {{Value : 1 , TimestampMs : minT }, {Value : 2 , TimestampMs : minT + 1 }}, nil , nil ),
732+ mockHintsResponse (block1 , block2 ),
733+ },
734+ mockedLabelNamesResponse : & storepb.LabelNamesResponse {
735+ Names : namesFromSeries (labels .FromMap (map [string ]string {labels .MetricName : "fromSg" , "fromSg" : "fromSg" })),
736+ Warnings : []string {},
737+ Hints : mockNamesHints (block1 , block2 ),
738+ },
739+ mockedLabelValuesResponse : & storepb.LabelValuesResponse {
740+ Values : valuesFromSeries (labels .MetricName , labels .FromMap (map [string ]string {labels .MetricName : "fromSg" , "fromSg" : "fromSg" })),
741+ Warnings : []string {},
742+ Hints : mockValuesHints (block1 , block2 ),
743+ },
744+ }: {block1 , block2 }},
745+ },
746+ }
747+ }
748+
749+ matchers := []* labels.Matcher {
750+ labels .MustNewMatcher (labels .MatchEqual , labels .MetricName , "fromSg" ),
751+ }
752+ ctx := user .InjectOrgID (context .Background (), "user-1" )
753+
754+ t .Run ("should return consistency check errors when fallback disabled and some blocks not available as parquet" , func (t * testing.T ) {
755+ finder := & blocksFinderMock {}
756+ stores := createStore ()
757+
758+ q := & blocksStoreQuerier {
759+ minT : minT ,
760+ maxT : maxT ,
761+ finder : finder ,
762+ stores : stores ,
763+ consistency : NewBlocksConsistencyChecker (0 , 0 , log .NewNopLogger (), nil ),
764+ logger : log .NewNopLogger (),
765+ metrics : newBlocksStoreQueryableMetrics (prometheus .NewPedanticRegistry ()),
766+ limits : & blocksStoreLimitsMock {},
767+
768+ storeGatewayConsistencyCheckMaxAttempts : 3 ,
769+ }
770+
771+ mParquetQuerier := & mockParquetQuerier {}
772+ pq := & parquetQuerierWithFallback {
773+ minT : minT ,
774+ maxT : maxT ,
775+ finder : finder ,
776+ blocksStoreQuerier : q ,
777+ parquetQuerier : mParquetQuerier ,
778+ queryStoreAfter : time .Hour ,
779+ metrics : newParquetQueryableFallbackMetrics (prometheus .NewRegistry ()),
780+ limits : defaultOverrides (t , 0 ),
781+ logger : log .NewNopLogger (),
782+ defaultBlockStoreType : parquetBlockStore ,
783+ fallbackDisabled : true , // Disable fallback
784+ }
785+
786+ // Set up blocks where block1 has parquet metadata but block2 doesn't
787+ finder .On ("GetBlocks" , mock .Anything , "user-1" , minT , mock .Anything ).Return (bucketindex.Blocks {
788+ & bucketindex.Block {ID : block1 , Parquet : & parquet.ConverterMarkMeta {Version : 1 }}, // Available as parquet
789+ & bucketindex.Block {ID : block2 }, // Not available as parquet
790+ }, map [ulid.ULID ]* bucketindex.BlockDeletionMark (nil ), nil )
791+
792+ expectedError := fmt .Sprintf ("consistency check failed because some blocks were not available as parquet files: %s" , block2 .String ())
793+
794+ t .Run ("select should return consistency check error" , func (t * testing.T ) {
795+ ss := pq .Select (ctx , true , nil , matchers ... )
796+ require .Error (t , ss .Err ())
797+ require .Contains (t , ss .Err ().Error (), expectedError )
798+ })
799+
800+ t .Run ("labelNames should return consistency check error" , func (t * testing.T ) {
801+ _ , _ , err := pq .LabelNames (ctx , nil , matchers ... )
802+ require .Error (t , err )
803+ require .Contains (t , err .Error (), expectedError )
804+ })
805+
806+ t .Run ("labelValues should return consistency check error" , func (t * testing.T ) {
807+ _ , _ , err := pq .LabelValues (ctx , labels .MetricName , nil , matchers ... )
808+ require .Error (t , err )
809+ require .Contains (t , err .Error (), expectedError )
810+ })
811+ })
812+
813+ t .Run ("should work normally when all blocks are available as parquet and fallback disabled" , func (t * testing.T ) {
814+ finder := & blocksFinderMock {}
815+ stores := createStore ()
816+
817+ q := & blocksStoreQuerier {
818+ minT : minT ,
819+ maxT : maxT ,
820+ finder : finder ,
821+ stores : stores ,
822+ consistency : NewBlocksConsistencyChecker (0 , 0 , log .NewNopLogger (), nil ),
823+ logger : log .NewNopLogger (),
824+ metrics : newBlocksStoreQueryableMetrics (prometheus .NewPedanticRegistry ()),
825+ limits : & blocksStoreLimitsMock {},
826+
827+ storeGatewayConsistencyCheckMaxAttempts : 3 ,
828+ }
829+
830+ mParquetQuerier := & mockParquetQuerier {}
831+ pq := & parquetQuerierWithFallback {
832+ minT : minT ,
833+ maxT : maxT ,
834+ finder : finder ,
835+ blocksStoreQuerier : q ,
836+ parquetQuerier : mParquetQuerier ,
837+ queryStoreAfter : time .Hour ,
838+ metrics : newParquetQueryableFallbackMetrics (prometheus .NewRegistry ()),
839+ limits : defaultOverrides (t , 0 ),
840+ logger : log .NewNopLogger (),
841+ defaultBlockStoreType : parquetBlockStore ,
842+ fallbackDisabled : true , // Disable fallback
843+ }
844+
845+ // Set up blocks where both blocks have parquet metadata
846+ finder .On ("GetBlocks" , mock .Anything , "user-1" , minT , mock .Anything ).Return (bucketindex.Blocks {
847+ & bucketindex.Block {ID : block1 , Parquet : & parquet.ConverterMarkMeta {Version : 1 }}, // Available as parquet
848+ & bucketindex.Block {ID : block2 , Parquet : & parquet.ConverterMarkMeta {Version : 1 }}, // Available as parquet
849+ }, map [ulid.ULID ]* bucketindex.BlockDeletionMark (nil ), nil )
850+
851+ t .Run ("select should work without error" , func (t * testing.T ) {
852+ mParquetQuerier .Reset ()
853+ ss := pq .Select (ctx , true , nil , matchers ... )
854+ require .NoError (t , ss .Err ())
855+ require .Len (t , mParquetQuerier .queriedBlocks , 2 )
856+ })
857+
858+ t .Run ("labelNames should work without error" , func (t * testing.T ) {
859+ mParquetQuerier .Reset ()
860+ _ , _ , err := pq .LabelNames (ctx , nil , matchers ... )
861+ require .NoError (t , err )
862+ require .Len (t , mParquetQuerier .queriedBlocks , 2 )
863+ })
864+
865+ t .Run ("labelValues should work without error" , func (t * testing.T ) {
866+ mParquetQuerier .Reset ()
867+ _ , _ , err := pq .LabelValues (ctx , labels .MetricName , nil , matchers ... )
868+ require .NoError (t , err )
869+ require .Len (t , mParquetQuerier .queriedBlocks , 2 )
870+ })
871+ })
872+ }
0 commit comments