@@ -88,6 +88,12 @@ type Converter struct {
8888 fetcherMetrics * block.FetcherMetrics
8989
9090 baseConverterOptions []convert.ConvertOption
91+
92+ metrics * metrics
93+
94+ // Keep track of the last owned users.
95+ // This is not thread safe now.
96+ lastOwnedUsers map [string ]struct {}
9197}
9298
9399func (cfg * Config ) RegisterFlags (f * flag.FlagSet ) {
@@ -125,6 +131,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
125131 pool : chunkenc .NewPool (),
126132 blockRanges : blockRanges ,
127133 fetcherMetrics : block .NewFetcherMetrics (registerer , nil , nil ),
134+ metrics : newMetrics (registerer ),
128135 bkt : bkt ,
129136 baseConverterOptions : []convert.ConvertOption {
130137 convert .WithSortBy (labels .MetricName ),
@@ -194,6 +201,9 @@ func (c *Converter) running(ctx context.Context) error {
194201
195202 for _ , userID := range users {
196203 if ! c .limits .ParquetConverterEnabled (userID ) {
204+ // It is possible that parquet is disabled for the userID so we
205+ // need to check if the user was owned last time.
206+ c .cleanupMetricsForNotOwnedUser (userID )
197207 continue
198208 }
199209
@@ -211,13 +221,15 @@ func (c *Converter) running(ctx context.Context) error {
211221 continue
212222 }
213223 if ! owned {
224+ c .cleanupMetricsForNotOwnedUser (userID )
214225 continue
215226 }
216227
217228 if markedForDeletion , err := cortex_tsdb .TenantDeletionMarkExists (ctx , c .bkt , userID ); err != nil {
218229 level .Warn (userLogger ).Log ("msg" , "unable to check if user is marked for deletion" , "user" , userID , "err" , err )
219230 continue
220231 } else if markedForDeletion {
232+ c .metrics .deleteMetricsForTenant (userID )
221233 level .Info (userLogger ).Log ("msg" , "skipping user because it is marked for deletion" , "user" , userID )
222234 continue
223235 }
@@ -229,6 +241,8 @@ func (c *Converter) running(ctx context.Context) error {
229241 level .Error (userLogger ).Log ("msg" , "failed to convert user" , "err" , err )
230242 }
231243 }
244+ c .lastOwnedUsers = ownedUsers
245+ c .metrics .ownedUsers .Set (float64 (len (ownedUsers )))
232246
233247 // Delete local files for unowned tenants, if there are any. This cleans up
234248 // leftover local files for tenants that belong to different converter now,
@@ -269,8 +283,15 @@ func (c *Converter) stopping(_ error) error {
269283}
270284
271285func (c * Converter ) discoverUsers (ctx context.Context ) ([]string , error ) {
272- // Only active users are considered.
273- active , _ , _ , err := c .usersScanner .ScanUsers (ctx )
286+ // Only active users are considered for conversion.
287+ // We still check deleting and deleted users just to clean up metrics.
288+ active , deleting , deleted , err := c .usersScanner .ScanUsers (ctx )
289+ for _ , userID := range deleting {
290+ c .cleanupMetricsForNotOwnedUser (userID )
291+ }
292+ for _ , userID := range deleted {
293+ c .cleanupMetricsForNotOwnedUser (userID )
294+ }
274295 return active , err
275296}
276297
@@ -378,6 +399,7 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
378399 }
379400
380401 level .Info (logger ).Log ("msg" , "converting block" , "block" , b .ULID .String (), "dir" , bdir )
402+ start := time .Now ()
381403
382404 converterOpts := append (c .baseConverterOptions , convert .WithName (b .ULID .String ()))
383405
@@ -397,14 +419,18 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
397419 _ = tsdbBlock .Close ()
398420
399421 if err != nil {
400- level .Error (logger ).Log ("msg" , "Error converting block" , "err" , err )
422+ level .Error (logger ).Log ("msg" , "Error converting block" , "block" , b . ULID . String (), " err" , err )
401423 continue
402424 }
425+ duration := time .Since (start )
426+ c .metrics .convertBlockDuration .WithLabelValues (userID ).Set (duration .Seconds ())
427+ level .Info (logger ).Log ("msg" , "successfully converted block" , "block" , b .ULID .String (), "duration" , duration )
403428
404- err = cortex_parquet .WriteConverterMark (ctx , b .ULID , uBucket )
405- if err != nil {
406- level . Error ( logger ). Log ( "msg" , "Error writing block" , "err" , err )
429+ if err = cortex_parquet .WriteConverterMark (ctx , b .ULID , uBucket ); err != nil {
430+ level . Error ( logger ). Log ( "msg" , "Error writing block" , "block" , b . ULID . String (), "err" , err )
431+ continue
407432 }
433+ c .metrics .convertedBlocks .WithLabelValues (userID ).Inc ()
408434 }
409435
410436 return nil
@@ -442,6 +468,12 @@ func (c *Converter) ownBlock(ring ring.ReadRing, blockId string) (bool, error) {
442468 return rs .Instances [0 ].Addr == c .ringLifecycler .Addr , nil
443469}
444470
471+ func (c * Converter ) cleanupMetricsForNotOwnedUser (userID string ) {
472+ if _ , ok := c .lastOwnedUsers [userID ]; ok {
473+ c .metrics .deleteMetricsForTenant (userID )
474+ }
475+ }
476+
445477func (c * Converter ) compactRootDir () string {
446478 return filepath .Join (c .cfg .DataDir , "compact" )
447479}
0 commit comments