@@ -160,6 +160,30 @@ var (
         }
         return compactor, plannerFactory, nil
     }
+
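+    // DefaultBlockDeletableCheckerFactory returns the default BlockDeletableChecker from the Thanos compact package.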
+    DefaultBlockDeletableCheckerFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger) compact.BlockDeletableChecker {
+        return compact.DefaultBlockDeletableChecker{}
+    }
+
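+    // PartitionCompactionBlockDeletableCheckerFactory returns the BlockDeletableChecker used by partitioning compaction.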
+    PartitionCompactionBlockDeletableCheckerFactory = func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger) compact.BlockDeletableChecker {
+        return NewPartitionCompactionBlockDeletableChecker()
+    }
+
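+    // DefaultCompactionLifecycleCallbackFactory returns the default CompactionLifecycleCallback from the Thanos compact package.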
+    DefaultCompactionLifecycleCallbackFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger, _ int, _ string, _ string, _ *compactorMetrics) compact.CompactionLifecycleCallback {
+        return compact.DefaultCompactionLifecycleCallback{}
+    }
+
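+    // ShardedCompactionLifecycleCallbackFactory returns the CompactionLifecycleCallback used by partitioning compaction,
+    // built per tenant from the user bucket, meta sync concurrency, compaction directory and compactor metrics.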
+    ShardedCompactionLifecycleCallbackFactory = func(ctx context.Context, userBucket objstore.InstrumentedBucket, logger log.Logger, metaSyncConcurrency int, compactDir string, userID string, compactorMetrics *compactorMetrics) compact.CompactionLifecycleCallback {
+        return NewShardedCompactionLifecycleCallback(
+            ctx,
+            userBucket,
+            logger,
+            metaSyncConcurrency,
+            compactDir,
+            userID,
+            compactorMetrics,
+        )
+    }
 )
 
 // BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
@@ -202,6 +226,22 @@ type PlannerFactory func(
     compactorMetrics *compactorMetrics,
 ) compact.Planner
 
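+// CompactionLifecycleCallbackFactory builds and returns the CompactionLifecycleCallback to use during a tenant's compaction.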
+type CompactionLifecycleCallbackFactory func(
+    ctx context.Context,
+    userBucket objstore.InstrumentedBucket,
+    logger log.Logger,
+    metaSyncConcurrency int,
+    compactDir string,
+    userID string,
+    compactorMetrics *compactorMetrics,
+) compact.CompactionLifecycleCallback
+
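+// BlockDeletableCheckerFactory builds and returns the BlockDeletableChecker used to decide whether a block can be deleted.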
+type BlockDeletableCheckerFactory func(
+    ctx context.Context,
+    bkt objstore.InstrumentedBucket,
+    logger log.Logger,
+) compact.BlockDeletableChecker
+
 // Limits defines limits used by the Compactor.
 type Limits interface {
     CompactorTenantShardSize(userID string) int
@@ -380,6 +420,10 @@ type Compactor struct {
 
     blocksPlannerFactory PlannerFactory
 
+    blockDeletableCheckerFactory BlockDeletableCheckerFactory
+
+    compactionLifecycleCallbackFactory CompactionLifecycleCallbackFactory
+
     // Client used to run operations on the bucket storing blocks.
     bucketClient objstore.InstrumentedBucket
 
@@ -436,11 +480,25 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi
         }
     }
 
+    var blockDeletableCheckerFactory BlockDeletableCheckerFactory
+    if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle && compactorCfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+        blockDeletableCheckerFactory = PartitionCompactionBlockDeletableCheckerFactory
+    } else {
+        blockDeletableCheckerFactory = DefaultBlockDeletableCheckerFactory
+    }
+
+    var compactionLifecycleCallbackFactory CompactionLifecycleCallbackFactory
+    if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle && compactorCfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+        compactionLifecycleCallbackFactory = ShardedCompactionLifecycleCallbackFactory
+    } else {
+        compactionLifecycleCallbackFactory = DefaultCompactionLifecycleCallbackFactory
+    }
+
     if ingestionReplicationFactor <= 0 {
         ingestionReplicationFactor = 1
     }
 
-    cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits, ingestionReplicationFactor)
+    cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, blockDeletableCheckerFactory, compactionLifecycleCallbackFactory, limits, ingestionReplicationFactor)
     if err != nil {
         return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
     }
@@ -456,6 +514,8 @@ func newCompactor(
     bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error),
     blocksGrouperFactory BlocksGrouperFactory,
     blocksCompactorFactory BlocksCompactorFactory,
+    blockDeletableCheckerFactory BlockDeletableCheckerFactory,
+    compactionLifecycleCallbackFactory CompactionLifecycleCallbackFactory,
     limits *validation.Overrides,
     ingestionReplicationFactor int,
 ) (*Compactor, error) {
@@ -466,15 +526,17 @@ func newCompactor(
         compactorMetrics = newDefaultCompactorMetrics(registerer)
     }
     c := &Compactor{
-        compactorCfg:           compactorCfg,
-        storageCfg:             storageCfg,
-        parentLogger:           logger,
-        logger:                 log.With(logger, "component", "compactor"),
-        registerer:             registerer,
-        bucketClientFactory:    bucketClientFactory,
-        blocksGrouperFactory:   blocksGrouperFactory,
-        blocksCompactorFactory: blocksCompactorFactory,
-        allowedTenants:         util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
+        compactorCfg:                       compactorCfg,
+        storageCfg:                         storageCfg,
+        parentLogger:                       logger,
+        logger:                             log.With(logger, "component", "compactor"),
+        registerer:                         registerer,
+        bucketClientFactory:                bucketClientFactory,
+        blocksGrouperFactory:               blocksGrouperFactory,
+        blocksCompactorFactory:             blocksCompactorFactory,
+        blockDeletableCheckerFactory:       blockDeletableCheckerFactory,
+        compactionLifecycleCallbackFactory: compactionLifecycleCallbackFactory,
+        allowedTenants:                     util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
 
         CompactorStartDurationSeconds: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
             Name: "cortex_compactor_start_duration_seconds",
@@ -662,12 +724,6 @@ func (c *Compactor) starting(ctx context.Context) error {
     }, c.bucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
         c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)
 
-    // Ensure an initial cleanup occurred before starting the compactor.
-    if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {
-        c.ringSubservices.StopAsync()
-        return errors.Wrap(err, "failed to start the blocks cleaner")
-    }
-
     if c.compactorCfg.CachingBucketEnabled {
         matchers := cortex_tsdb.NewMatchers()
         // Do not cache tenant deletion marker and block deletion marker for compactor
@@ -698,15 +754,30 @@ func (c *Compactor) stopping(_ error) error {
 }
 
 func (c *Compactor) running(ctx context.Context) error {
+    // Ensure an initial cleanup occurs as the first thing the compactor does when running.
+    if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {
+        c.ringSubservices.StopAsync()
+        return errors.Wrap(err, "failed to start the blocks cleaner")
+    }
+
     // Run an initial compaction before starting the interval.
+    // Insert jitter right before compaction starts, so that compactors starting at the same time do not run in sync.
+    select {
+    case <-ctx.Done():
+        return ctx.Err()
+    case <-time.After(time.Duration(rand.Int63n(int64(float64(c.compactorCfg.CompactionInterval) * 0.1)))):
+    }
     c.compactUsers(ctx)
 
-    ticker := time.NewTicker(util.DurationWithJitter(c.compactorCfg.CompactionInterval, 0.05))
+    ticker := time.NewTicker(c.compactorCfg.CompactionInterval)
     defer ticker.Stop()
 
     for {
         select {
         case <-ticker.C:
+            // Insert jitter right before compaction starts, so that there is always jitter
+            // even when a compaction takes longer than CompactionInterval.
+            time.Sleep(time.Duration(rand.Int63n(int64(float64(c.compactorCfg.CompactionInterval) * 0.1))))
             c.compactUsers(ctx)
         case <-ctx.Done():
             return nil
@@ -717,23 +788,19 @@ func (c *Compactor) running(ctx context.Context) error {
 }
 
 func (c *Compactor) compactUsers(ctx context.Context) {
-    failed := false
+    succeeded := false
     interrupted := false
+    compactionErrorCount := 0
 
     c.CompactionRunsStarted.Inc()
 
     defer func() {
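+        // A run is recorded as completed only if it finished and no tenant compaction returned an error;
+        // an interrupted run is tracked separately, and anything else counts as failed.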
-        // interruptions and successful runs are considered
-        // mutually exclusive but we consider a run failed if any
-        // tenant runs failed even if later runs are interrupted
-        if !interrupted && !failed {
+        if succeeded && compactionErrorCount == 0 {
             c.CompactionRunsCompleted.Inc()
             c.CompactionRunsLastSuccess.SetToCurrentTime()
-        }
-        if interrupted {
+        } else if interrupted {
             c.CompactionRunsInterrupted.Inc()
-        }
-        if failed {
+        } else {
             c.CompactionRunsFailed.Inc()
         }
 
@@ -747,7 +814,6 @@ func (c *Compactor) compactUsers(ctx context.Context) {
     level.Info(c.logger).Log("msg", "discovering users from bucket")
     users, err := c.discoverUsersWithRetries(ctx)
     if err != nil {
-        failed = true
         level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err)
         return
     }
@@ -816,7 +882,7 @@ func (c *Compactor) compactUsers(ctx context.Context) {
             }
 
             c.CompactionRunFailedTenants.Inc()
-            failed = true
+            compactionErrorCount++
             level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err)
             continue
         }
@@ -851,6 +917,7 @@ func (c *Compactor) compactUsers(ctx context.Context) {
             }
         }
     }
+    succeeded = true
 }
 
 func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) error {
@@ -885,6 +952,11 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
         retries.Wait()
     }
 
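+    // Do not report the compaction as failed when the planner stopped on a partition that was
+    // already completed or is already being visited.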
+    err := errors.Unwrap(errors.Cause(lastErr))
+    if errors.Is(err, plannerCompletedPartitionError) || errors.Is(err, plannerVisitedPartitionError) {
+        return nil
+    }
+
     return lastErr
 }
 
@@ -898,7 +970,12 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 
     // Filters out duplicate blocks that can be formed from two or more overlapping
     // blocks that fully submatch the source blocks of the older blocks.
-    deduplicateBlocksFilter := block.NewDeduplicateFilter(c.compactorCfg.BlockSyncConcurrency)
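+    // Partitioning compaction produces multiple result blocks from the same source blocks (one per
+    // partition), so deduplication must be disabled; see disabledDeduplicateFilter below.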
+    var deduplicateBlocksFilter CortexMetadataFilter
+    if c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle && c.compactorCfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+        deduplicateBlocksFilter = &disabledDeduplicateFilter{}
+    } else {
+        deduplicateBlocksFilter = block.NewDeduplicateFilter(c.compactorCfg.BlockSyncConcurrency)
+    }
 
     // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
     // No delay is used -- all blocks with deletion marker are ignored, and not considered for compaction.
@@ -966,12 +1043,14 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 
     currentCtx, cancel := context.WithCancel(ctx)
     defer cancel()
-    compactor, err := compact.NewBucketCompactor(
+    compactor, err := compact.NewBucketCompactorWithCheckerAndCallback(
         ulogger,
         syncer,
         c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter, c.ingestionReplicationFactor),
         c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics),
         c.blocksCompactor,
+        c.blockDeletableCheckerFactory(currentCtx, bucket, ulogger),
+        c.compactionLifecycleCallbackFactory(currentCtx, bucket, ulogger, c.compactorCfg.MetaSyncConcurrency, c.compactDirForUser(userID), userID, c.compactorMetrics),
         c.compactDirForUser(userID),
         bucket,
         c.compactorCfg.CompactionConcurrency,
@@ -982,6 +1061,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
     }
 
     if err := compactor.Compact(ctx); err != nil {
+        level.Warn(ulogger).Log("msg", "compaction failed with error", "err", err)
         return errors.Wrap(err, "compaction")
     }
 
@@ -1148,3 +1228,24 @@ func (c *Compactor) isPermissionDeniedErr(err error) bool {
     }
     return s.Code() == codes.PermissionDenied
 }
+
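+// CortexMetadataFilter combines the block.DeduplicateFilter and block.MetadataFilter interfaces.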
+type CortexMetadataFilter interface {
+    block.DeduplicateFilter
+    block.MetadataFilter
+}
+
+// disabledDeduplicateFilter is only used by partitioning compaction. Partitioning compaction always
+// produces multiple result blocks (one per partition) for the same time range, and those blocks share
+// the same source blocks. They must not be marked as duplicates when grouping for the next compaction
+// level, so the DeduplicateFilter is disabled.
+type disabledDeduplicateFilter struct {
+}
+
+func (f *disabledDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
+    // Don't do any deduplicate filtering.
+    return nil
+}
+
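+// DuplicateIDs always returns nil because no blocks are ever marked as duplicates.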
+func (f *disabledDeduplicateFilter) DuplicateIDs() []ulid.ULID {
+    return nil
+}