@@ -243,6 +243,15 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
243243 go func () {
244244 c .runDeleteUserCleanup (ctx , deleteChan )
245245 }()
246+ var metricsChan chan * cleanerJob
247+ if c .cfg .ShardingStrategy == util .ShardingStrategyShuffle &&
248+ c .cfg .CompactionStrategy == util .CompactionStrategyPartitioning {
249+ metricsChan = make (chan * cleanerJob )
250+ defer close (metricsChan )
251+ go func () {
252+ c .runEmitPartitionMetricsWorker (ctx , metricsChan )
253+ }()
254+ }
246255
247256 for {
248257 select {
@@ -276,6 +285,17 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
276285 c .enqueueJobFailed .WithLabelValues (deletedStatus ).Inc ()
277286 }
278287
288+ if metricsChan != nil {
289+ select {
290+ case metricsChan <- & cleanerJob {
291+ users : activeUsers ,
292+ timestamp : cleanJobTimestamp ,
293+ }:
294+ default :
295+ level .Warn (c .logger ).Log ("msg" , "unable to push metrics job to metricsChan" )
296+ }
297+ }
298+
279299 case <- ctx .Done ():
280300 return nil
281301 }
@@ -295,10 +315,25 @@ func (c *BlocksCleaner) checkRunError(runType string, err error) {
295315 }
296316}
297317
298- func (c * BlocksCleaner ) runActiveUserCleanup (ctx context.Context , jobChan chan * cleanerJob ) {
318+ func (c * BlocksCleaner ) runEmitPartitionMetricsWorker (ctx context.Context , jobChan <- chan * cleanerJob ) {
319+ for job := range jobChan {
320+ err := concurrency .ForEachUser (ctx , job .users , c .cfg .CleanupConcurrency , func (ctx context.Context , userID string ) error {
321+ userLogger := util_log .WithUserID (userID , c .logger )
322+ userBucket := bucket .NewUserBucketClient (userID , c .bucketClient , c .cfgProvider )
323+ c .emitUserParititionMetrics (ctx , userLogger , userBucket , userID )
324+ return nil
325+ })
326+
327+ if err != nil {
328+ level .Error (c .logger ).Log ("msg" , "emit metrics failed" , "err" , err .Error ())
329+ }
330+ }
331+ }
332+
333+ func (c * BlocksCleaner ) runActiveUserCleanup (ctx context.Context , jobChan <- chan * cleanerJob ) {
299334 for job := range jobChan {
300335 if job .timestamp < time .Now ().Add (- c .cfg .CleanupInterval ).Unix () {
301- level .Warn (c .logger ).Log ("Active user cleaner job too old. Ignoring to get recent data" )
336+ level .Warn (c .logger ).Log ("msg" , " Active user cleaner job too old. Ignoring to get recent data" )
302337 continue
303338 }
304339 err := c .cleanUpActiveUsers (ctx , job .users , false )
@@ -746,59 +781,14 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool,
746781}
747782
748783func (c * BlocksCleaner ) cleanPartitionedGroupInfo (ctx context.Context , userBucket objstore.InstrumentedBucket , userLogger log.Logger , userID string ) {
749- existentPartitionedGroupInfo := make (map [* PartitionedGroupInfo ]struct {
750- path string
751- status PartitionedGroupStatus
752- })
753- err := userBucket .Iter (ctx , PartitionedGroupDirectory , func (file string ) error {
754- if strings .Contains (file , PartitionVisitMarkerDirectory ) {
755- return nil
756- }
757- partitionedGroupInfo , err := ReadPartitionedGroupInfoFile (ctx , userBucket , userLogger , file )
758- if err != nil {
759- level .Warn (userLogger ).Log ("msg" , "failed to read partitioned group info" , "partitioned_group_info" , file )
760- return nil
761- }
762-
763- status := partitionedGroupInfo .getPartitionedGroupStatus (ctx , userBucket , c .compactionVisitMarkerTimeout , userLogger )
764- level .Debug (userLogger ).Log ("msg" , "got partitioned group status" , "partitioned_group_status" , status .String ())
765- existentPartitionedGroupInfo [partitionedGroupInfo ] = struct {
766- path string
767- status PartitionedGroupStatus
768- }{
769- path : file ,
770- status : status ,
771- }
772- return nil
773- })
774-
784+ existentPartitionedGroupInfo , err := c .iterPartitionGroups (ctx , userBucket , userLogger )
775785 if err != nil {
776786 level .Warn (userLogger ).Log ("msg" , "error return when going through partitioned group directory" , "err" , err )
777787 }
778788
779- remainingCompactions := 0
780- inProgressCompactions := 0
781- var oldestPartitionGroup * PartitionedGroupInfo
782- defer func () {
783- c .remainingPlannedCompactions .WithLabelValues (userID ).Set (float64 (remainingCompactions ))
784- c .inProgressCompactions .WithLabelValues (userID ).Set (float64 (inProgressCompactions ))
785- if c .oldestPartitionGroupOffset != nil {
786- if oldestPartitionGroup != nil {
787- c .oldestPartitionGroupOffset .WithLabelValues (userID ).Set (float64 (time .Now ().Unix () - oldestPartitionGroup .CreationTime ))
788- level .Debug (userLogger ).Log ("msg" , "partition group info with oldest creation time" , "partitioned_group_id" , oldestPartitionGroup .PartitionedGroupID , "creation_time" , oldestPartitionGroup .CreationTime )
789- } else {
790- c .oldestPartitionGroupOffset .WithLabelValues (userID ).Set (0 )
791- }
792- }
793- }()
794789 for partitionedGroupInfo , extraInfo := range existentPartitionedGroupInfo {
795790 partitionedGroupInfoFile := extraInfo .path
796791
797- remainingCompactions += extraInfo .status .PendingPartitions
798- inProgressCompactions += extraInfo .status .InProgressPartitions
799- if oldestPartitionGroup == nil || partitionedGroupInfo .CreationTime < oldestPartitionGroup .CreationTime {
800- oldestPartitionGroup = partitionedGroupInfo
801- }
802792 if extraInfo .status .CanDelete {
803793 if extraInfo .status .IsCompleted {
804794 // Try to remove all blocks included in partitioned group info
@@ -829,6 +819,67 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
829819 }
830820}
831821
822+ func (c * BlocksCleaner ) emitUserParititionMetrics (ctx context.Context , userLogger log.Logger , userBucket objstore.InstrumentedBucket , userID string ) {
823+ existentPartitionedGroupInfo , err := c .iterPartitionGroups (ctx , userBucket , userLogger )
824+ if err != nil {
825+ level .Warn (userLogger ).Log ("msg" , "error listing partitioned group directory to emit metrics" , "err" , err )
826+ return
827+ }
828+
829+ remainingCompactions := 0
830+ inProgressCompactions := 0
831+ var oldestPartitionGroup * PartitionedGroupInfo
832+ defer func () {
833+ c .remainingPlannedCompactions .WithLabelValues (userID ).Set (float64 (remainingCompactions ))
834+ c .inProgressCompactions .WithLabelValues (userID ).Set (float64 (inProgressCompactions ))
835+ if oldestPartitionGroup != nil {
836+ c .oldestPartitionGroupOffset .WithLabelValues (userID ).Set (float64 (time .Now ().Unix () - oldestPartitionGroup .CreationTime ))
837+ level .Debug (userLogger ).Log ("msg" , "partition group info with oldest creation time" , "partitioned_group_id" , oldestPartitionGroup .PartitionedGroupID , "creation_time" , oldestPartitionGroup .CreationTime )
838+ } else {
839+ c .oldestPartitionGroupOffset .WithLabelValues (userID ).Set (0 )
840+ }
841+ }()
842+ for partitionedGroupInfo , extraInfo := range existentPartitionedGroupInfo {
843+ remainingCompactions += extraInfo .status .PendingPartitions
844+ inProgressCompactions += extraInfo .status .InProgressPartitions
845+ if oldestPartitionGroup == nil || partitionedGroupInfo .CreationTime < oldestPartitionGroup .CreationTime {
846+ oldestPartitionGroup = partitionedGroupInfo
847+ }
848+ }
849+ }
850+
851+ func (c * BlocksCleaner ) iterPartitionGroups (ctx context.Context , userBucket objstore.InstrumentedBucket , userLogger log.Logger ) (map [* PartitionedGroupInfo ]struct {
852+ path string
853+ status PartitionedGroupStatus
854+ }, error ) {
855+ existentPartitionedGroupInfo := make (map [* PartitionedGroupInfo ]struct {
856+ path string
857+ status PartitionedGroupStatus
858+ })
859+ err := userBucket .Iter (ctx , PartitionedGroupDirectory , func (file string ) error {
860+ if strings .Contains (file , PartitionVisitMarkerDirectory ) {
861+ return nil
862+ }
863+ partitionedGroupInfo , err := ReadPartitionedGroupInfoFile (ctx , userBucket , userLogger , file )
864+ if err != nil {
865+ level .Warn (userLogger ).Log ("msg" , "failed to read partitioned group info" , "partitioned_group_info" , file )
866+ return nil
867+ }
868+
869+ status := partitionedGroupInfo .getPartitionedGroupStatus (ctx , userBucket , c .compactionVisitMarkerTimeout , userLogger )
870+ level .Debug (userLogger ).Log ("msg" , "got partitioned group status" , "partitioned_group_status" , status .String ())
871+ existentPartitionedGroupInfo [partitionedGroupInfo ] = struct {
872+ path string
873+ status PartitionedGroupStatus
874+ }{
875+ path : file ,
876+ status : status ,
877+ }
878+ return nil
879+ })
880+ return existentPartitionedGroupInfo , err
881+ }
882+
832883// cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
833884// and index are updated accordingly.
834885func (c * BlocksCleaner ) cleanUserPartialBlocks (ctx context.Context , userID string , partials map [ulid.ULID ]error , idx * bucketindex.Index , userBucket objstore.InstrumentedBucket , userLogger log.Logger ) {
0 commit comments