@@ -11,8 +11,8 @@ import (
1111 "github.com/opentracing/opentracing-go"
1212 "github.com/parquet-go/parquet-go"
1313 "github.com/pkg/errors"
14+ "github.com/prometheus-community/parquet-common/queryable"
1415 "github.com/prometheus-community/parquet-common/schema"
15- "github.com/prometheus-community/parquet-common/search"
1616 parquet_storage "github.com/prometheus-community/parquet-common/storage"
1717 "github.com/prometheus/client_golang/prometheus"
1818 "github.com/prometheus/client_golang/prometheus/promauto"
@@ -125,14 +125,14 @@ func NewParquetQueryable(
125125 return nil , err
126126 }
127127
128- cache , err := newCache [* parquet_storage.ParquetShard ]("parquet-shards" , config .ParquetQueryableShardCacheSize , newCacheMetrics (reg ))
128+ cache , err := newCache [parquet_storage.ParquetShard ]("parquet-shards" , config .ParquetQueryableShardCacheSize , newCacheMetrics (reg ))
129129 if err != nil {
130130 return nil , err
131131 }
132132
133133 cDecoder := schema .NewPrometheusParquetChunksDecoder (chunkenc .NewPool ())
134134
135- parquetQueryable , err := search .NewParquetQueryable (cDecoder , func (ctx context.Context , mint , maxt int64 ) ([]* parquet_storage.ParquetShard , error ) {
135+ parquetQueryable , err := queryable .NewParquetQueryable (cDecoder , func (ctx context.Context , mint , maxt int64 ) ([]parquet_storage.ParquetShard , error ) {
136136 userID , err := tenant .TenantID (ctx )
137137 if err != nil {
138138 return nil , err
@@ -143,8 +143,8 @@ func NewParquetQueryable(
143143 return nil , errors .Errorf ("failed to extract blocks from context" )
144144 }
145145 userBkt := bucket .NewUserBucketClient (userID , bucketClient , limits )
146-
147- shards := make ([]* parquet_storage.ParquetShard , len (blocks ))
146+ bucketOpener := parquet_storage . NewParquetBucketOpener ( userBkt )
147+ shards := make ([]parquet_storage.ParquetShard , len (blocks ))
148148 errGroup := & errgroup.Group {}
149149
150150 span , ctx := opentracing .StartSpanFromContext (ctx , "parquetQuerierWithFallback.OpenShards" )
@@ -157,16 +157,18 @@ func NewParquetQueryable(
157157 if shard == nil {
158158 // we always only have 1 shard - shard 0
159159 // Use context.Background() here as the file can be cached and live after the request ends.
160- shard , err = parquet_storage .OpenParquetShard ( context . WithoutCancel ( ctx ),
161- userBkt ,
160+ shard , err = parquet_storage .NewParquetShardOpener (
161+ context . WithoutCancel ( ctx ) ,
162162 block .ID .String (),
163+ bucketOpener ,
164+ bucketOpener ,
163165 0 ,
164166 parquet_storage .WithFileOptions (
165167 parquet .SkipMagicBytes (true ),
166168 parquet .ReadBufferSize (100 * 1024 ),
167169 parquet .SkipBloomFilters (true ),
170+ parquet .OptimisticRead (true ),
168171 ),
169- parquet_storage .WithOptimisticReader (true ),
170172 )
171173 if err != nil {
172174 return errors .Wrapf (err , "failed to open parquet shard. block: %v" , block .ID .String ())
0 commit comments