@@ -177,6 +177,77 @@ func TestMatcherCache(t *testing.T) {
177177 ` , callPerMatcher * numberOfDifferentMatchers - numberOfDifferentMatchers , cfg .MatchersCacheMaxItems , callPerMatcher * numberOfDifferentMatchers )), "ingester_matchers_cache_requests_total" , "ingester_matchers_cache_hits_total" , "ingester_matchers_cache_items" , "ingester_matchers_cache_max_items" , "ingester_matchers_cache_evicted_total" ))
178178}
179179
180+ func TestIngesterDeletionRace (t * testing.T ) {
181+ registry := prometheus .NewRegistry ()
182+ limits := defaultLimitsTestConfig ()
183+ tenantLimits := newMockTenantLimits (map [string ]* validation.Limits {userID : & limits })
184+ cfg := defaultIngesterTestConfig (t )
185+ cfg .BlocksStorageConfig .TSDB .PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig {
186+ Head : cortex_tsdb.PostingsCacheConfig {
187+ Enabled : true ,
188+ Ttl : time .Hour ,
189+ MaxBytes : 1024 * 1024 * 1024 ,
190+ },
191+ Blocks : cortex_tsdb.PostingsCacheConfig {
192+ Enabled : true ,
193+ Ttl : time .Hour ,
194+ MaxBytes : 1024 * 1024 * 1024 ,
195+ },
196+ }
197+
198+ dir := t .TempDir ()
199+ chunksDir := filepath .Join (dir , "chunks" )
200+ blocksDir := filepath .Join (dir , "blocks" )
201+ require .NoError (t , os .Mkdir (chunksDir , os .ModePerm ))
202+ require .NoError (t , os .Mkdir (blocksDir , os .ModePerm ))
203+
204+ ing , err := prepareIngesterWithBlocksStorageAndLimits (t , cfg , limits , tenantLimits , blocksDir , registry , false )
205+ require .NoError (t , err )
206+ require .NoError (t , services .StartAndAwaitRunning (context .Background (), ing ))
207+ defer services .StopAndAwaitTerminated (context .Background (), ing ) //nolint:errcheck
208+ // Wait until it's ACTIVE
209+ test .Poll (t , time .Second , ring .ACTIVE , func () interface {} {
210+ return ing .lifecycler .GetState ()
211+ })
212+
213+ numberOfTenants := 50
214+ wg := sync.WaitGroup {}
215+ wg .Add (numberOfTenants )
216+
217+ for i := 0 ; i < numberOfTenants ; i ++ {
218+ go func () {
219+ defer wg .Done ()
220+ u := fmt .Sprintf ("userId_%v" , i )
221+ ctx := user .InjectOrgID (context .Background (), u )
222+ samples := []cortexpb.Sample {{Value : 2 , TimestampMs : 10 }}
223+ _ , err := ing .Push (ctx , cortexpb .ToWriteRequest ([]labels.Labels {labels .FromStrings (labels .MetricName , "name" )}, samples , nil , nil , cortexpb .API ))
224+ require .NoError (t , err )
225+ ing .getTSDB (u ).postingCache = & wrappedExpandedPostingsCache {ExpandedPostingsCache : ing .getTSDB (u ).postingCache , purgeDelay : 10 * time .Millisecond }
226+ ing .getTSDB (u ).deletionMarkFound .Store (true ) // lets force close the tenant
227+ }()
228+ }
229+
230+ wg .Wait ()
231+
232+ ctx , c := context .WithCancel (context .Background ())
233+ defer c ()
234+
235+ wg .Add (1 )
236+ go func () {
237+ wg .Done ()
238+ ing .expirePostingsCache (ctx ) //nolint:errcheck
239+ }()
240+
241+ go func () {
242+ wg .Wait () // make sure we clean after we started the purge go routine
243+ ing .closeAndDeleteIdleUserTSDBs (ctx ) //nolint:errcheck
244+ }()
245+
246+ test .Poll (t , 5 * time .Second , 0 , func () interface {} {
247+ return len (ing .getTSDBUsers ())
248+ })
249+ }
250+
180251func TestIngesterPerLabelsetLimitExceeded (t * testing.T ) {
181252 limits := defaultLimitsTestConfig ()
182253 userID := "1"
@@ -3528,6 +3599,17 @@ func (m *mockMetricsForLabelMatchersStreamServer) Context() context.Context {
35283599 return m .ctx
35293600}
35303601
3602+ type wrappedExpandedPostingsCache struct {
3603+ cortex_tsdb.ExpandedPostingsCache
3604+
3605+ purgeDelay time.Duration
3606+ }
3607+
3608+ func (w * wrappedExpandedPostingsCache ) PurgeExpiredItems () {
3609+ time .Sleep (w .purgeDelay )
3610+ w .ExpandedPostingsCache .PurgeExpiredItems ()
3611+ }
3612+
35313613type mockQueryStreamServer struct {
35323614 grpc.ServerStream
35333615 ctx context.Context
0 commit comments