42 | 42 | GROUP_CHUNK_SIZE = 100 |
43 | 43 | EVENT_CHUNK_SIZE = 10000 |
44 | 44 | GROUP_HASH_ITERATIONS = 10000 |
| 45 | +GROUP_HASH_METADATA_ITERATIONS = 10000 |
45 | 46 |
|
46 | 47 | # Group models that relate only to groups and not to events. These models are |
47 | 48 | # transferred during reprocessing operations because they represent group-level |
@@ -267,83 +268,46 @@ def update_group_hash_metadata_in_batches(hash_ids: Sequence[int]) -> None: |
267 | 268 |
|
268 | 269 | This function performs the update in smaller batches to reduce lock |
269 | 270 | contention and prevent statement timeouts when many rows need updating. |
270 | | - Uses cursor-based pagination with the primary key to avoid loading all |
271 | | - IDs into memory and to avoid growing NOT IN clauses. |
| 271 | + Includes a maximum iteration limit as a safeguard against potential |
| 272 | + infinite loops. |
272 | 273 | """ |
273 | | - option_batch_size = options.get("deletions.group-hash-metadata.batch-size", 1000) |
| 274 | + option_batch_size = options.get("deletions.group-hash-metadata.batch-size") |
274 | 275 | batch_size = max(1, option_batch_size) |
275 | 276 |
|
276 | | - # Use cursor-based pagination with the primary key to efficiently |
277 | | - # process large datasets without loading all IDs into memory or |
278 | | - # creating large NOT IN clauses. We fetch IDs without ORDER BY to avoid |
279 | | - # database sorting overhead, then sort the small batch in Python. |
280 | | - last_max_id = 0 |
281 | | - while True: |
| 277 | + # Process rows in batches with a maximum iteration limit to prevent |
| 278 | + # infinite loops while still allowing processing of large datasets. |
| 279 | + updated_rows = 0 |
| 280 | + iteration_count = 0 |
| 281 | + while iteration_count < GROUP_HASH_METADATA_ITERATIONS: |
| 282 | + iteration_count += 1 |
282 | 283 | # Note: hash_ids is bounded to ~100 items (deletions.group-hashes-batch-size) |
283 | 284 | # from the caller, so this IN clause is intentionally not batched |
284 | 285 | batch_metadata_ids = list( |
285 | | - GroupHashMetadata.objects.filter( |
286 | | - seer_matched_grouphash_id__in=hash_ids, id__gt=last_max_id |
287 | | - ).values_list("id", flat=True)[:batch_size] |
| 286 | + GroupHashMetadata.objects.filter(seer_matched_grouphash_id__in=hash_ids).values_list( |
| 287 | + "id", flat=True |
| 288 | + )[:batch_size] |
288 | 289 | ) |
289 | 290 | if not batch_metadata_ids: |
290 | 291 | break |
291 | 292 |
|
292 | | - # Sort in Python to ensure we process lowest IDs first and can safely |
293 | | - # advance the cursor. Sorting a small batch (e.g., 1000 items) in Python |
294 | | - # is trivial and avoids database ORDER BY overhead. |
295 | | - batch_metadata_ids.sort() |
296 | | - |
297 | 293 | updated = GroupHashMetadata.objects.filter(id__in=batch_metadata_ids).update( |
298 | 294 | seer_matched_grouphash=None |
299 | 295 | ) |
| 296 | + updated_rows += updated |
300 | 297 | metrics.incr("deletions.group_hash_metadata.rows_updated", amount=updated, sample_rate=1.0) |
| 298 | + # If nothing was updated, we may be retrying the same rows over and over,
| 299 | + # so break out of the loop rather than spin.
| 300 | + if updated == 0: |
| 301 | + break |
301 | 302 |
|
302 | | - last_max_id = batch_metadata_ids[-1] # Last element after sorting |
303 | | - |
304 | | - |
305 | | -def update_group_hash_metadata_in_batches_old(hash_ids: Sequence[int]) -> int: |
306 | | - """ |
307 | | - Update seer_matched_grouphash to None for GroupHashMetadata rows |
308 | | - that reference the given hash_ids, in batches to avoid timeouts. |
309 | | -
|
310 | | - This function performs the update in smaller batches to reduce lock |
311 | | - contention and prevent statement timeouts when many rows need updating. |
312 | | -
|
313 | | - Returns the total number of rows updated. |
314 | | - """ |
315 | | - # First, get all the IDs that need updating |
316 | | - metadata_ids = list( |
317 | | - GroupHashMetadata.objects.filter(seer_matched_grouphash_id__in=hash_ids).values_list( |
318 | | - "id", flat=True |
| 303 | + # We will retry these hash_ids on the next run of the cleanup script.
| 304 | + # This is a safeguard to prevent infinite loops. |
| 305 | + if iteration_count >= GROUP_HASH_METADATA_ITERATIONS: |
| 306 | + logger.warning( |
| 307 | + "update_group_hash_metadata_in_batches.max_iterations_reached", |
| 308 | + extra={"updated_rows": updated_rows}, |
319 | 309 | ) |
320 | | - ) |
321 | | - |
322 | | - if not metadata_ids: |
323 | | - return 0 |
324 | | - |
325 | | - option_batch_size = options.get("deletions.group-hash-metadata.batch-size", 1000) |
326 | | - batch_size = max(1, option_batch_size) |
327 | | - total_updated = 0 |
328 | | - for i in range(0, len(metadata_ids), batch_size): |
329 | | - batch = metadata_ids[i : i + batch_size] |
330 | | - updated = GroupHashMetadata.objects.filter(id__in=batch).update(seer_matched_grouphash=None) |
331 | | - total_updated += updated |
332 | | - |
333 | | - metrics.incr( |
334 | | - "deletions.group_hash_metadata.rows_updated", |
335 | | - amount=total_updated, |
336 | | - sample_rate=1.0, |
337 | | - ) |
338 | | - logger.info( |
339 | | - "update_group_hash_metadata_in_batches.complete", |
340 | | - extra={ |
341 | | - "hash_ids_count": len(hash_ids), |
342 | | - "total_updated": total_updated, |
343 | | - }, |
344 | | - ) |
345 | | - |
346 | | - return total_updated |
| 310 | + metrics.incr("deletions.group_hash_metadata.max_iterations_reached", sample_rate=1.0) |
347 | 311 |
|
348 | 312 |
|
349 | 313 | def delete_group_hashes( |
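For reference, the control flow of the new bounded loop can be summarized outside the ORM. The sketch below is a minimal illustration of the same pattern, not the module's actual code: `fetch_batch_ids` and `clear_seer_match` are hypothetical stand-ins for the `GroupHashMetadata` queryset calls above, and `MAX_ITERATIONS` plays the role of `GROUP_HASH_METADATA_ITERATIONS`.

```python
from collections.abc import Callable, Sequence

MAX_ITERATIONS = 10_000  # stands in for GROUP_HASH_METADATA_ITERATIONS


def bounded_batch_update(
    hash_ids: Sequence[int],
    fetch_batch_ids: Callable[[Sequence[int], int], list[int]],  # hypothetical: returns up to batch_size row ids
    clear_seer_match: Callable[[list[int]], int],  # hypothetical: nulls the reference, returns rows updated
    batch_size: int = 1000,
) -> int:
    """Clear references in fixed-size batches, capped at MAX_ITERATIONS passes."""
    updated_rows = 0
    for _ in range(MAX_ITERATIONS):
        batch = fetch_batch_ids(hash_ids, batch_size)
        if not batch:
            break  # nothing left to process: the normal exit
        updated = clear_seer_match(batch)
        updated_rows += updated
        if updated == 0:
            break  # the same rows keep coming back unchanged; stop rather than spin
    return updated_rows
```

Any rows left over when the cap is hit are simply picked up on the next cleanup run, which is why the real function only logs a warning and emits a metric instead of raising.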
@@ -381,10 +345,7 @@ def delete_group_hashes( |
381 | 345 | # 2. Delete the GroupHashMetadata rows entirely (they'll be deleted anyway) |
382 | 346 | # If we update the columns first, the deletion of the grouphash metadata rows will have less work to do, |
383 | 347 | # thus, improving the performance of the deletion. |
384 | | - if options.get("deletions.group-hash-metadata.use-old-update-method"): |
385 | | - update_group_hash_metadata_in_batches_old(hash_ids) |
386 | | - else: |
387 | | - update_group_hash_metadata_in_batches(hash_ids) |
| 348 | + update_group_hash_metadata_in_batches(hash_ids) |
388 | 349 | GroupHashMetadata.objects.filter(grouphash_id__in=hash_ids).delete() |
389 | 350 | GroupHash.objects.filter(id__in=hash_ids).delete() |
390 | 351 |
|
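The ordering inside `delete_group_hashes` can likewise be illustrated with a small self-contained sketch. The dictionaries below are in-memory stand-ins for the two tables, purely illustrative rather than the real Django models; the point is the three-step order: clear the `seer_matched_grouphash` back-references, drop the metadata rows, then drop the hashes.

```python
# In-memory stand-ins for the two tables; names and data are illustrative only.
group_hash_metadata = {
    1: {"grouphash_id": 10, "seer_matched_grouphash_id": 11},
    2: {"grouphash_id": 11, "seer_matched_grouphash_id": None},
}
group_hashes = {10: "hash-a", 11: "hash-b"}


def delete_group_hashes_sketch(hash_ids: list[int]) -> None:
    # 1. Null out seer_matched_grouphash references pointing at the doomed hashes
    #    (done in batches in the real code via update_group_hash_metadata_in_batches).
    for row in group_hash_metadata.values():
        if row["seer_matched_grouphash_id"] in hash_ids:
            row["seer_matched_grouphash_id"] = None
    # 2. Drop the metadata rows owned by those hashes, which now have fewer
    #    live references to clean up.
    doomed_metadata = [k for k, v in group_hash_metadata.items() if v["grouphash_id"] in hash_ids]
    for md_id in doomed_metadata:
        del group_hash_metadata[md_id]
    # 3. Finally delete the hashes themselves.
    for hash_id in hash_ids:
        group_hashes.pop(hash_id, None)


delete_group_hashes_sketch([10, 11])
assert not group_hash_metadata and not group_hashes
```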
|