Skip to content

Commit 606b4b8

Browse files
committed
backfill: split merge memory accounting mutex acquisitions
Previously, constructMergeBatch() would get the mutex on memory accounting once per batch to minimize the number of atomics needed to perform memory accounting. With the addition of vector indexing, we now need to perform KV operations in order to construct batches, which can lead to deadlocks between KV and the memory accounting mutex. This patch splits the mutex acquisition into two acquisitions, releasing the mutex in between. Memory used is accumulated as merged entries are built and then accounting is notified for the entire batch. This has the effect of making the accounting lag behind allocation a bit, but that seems preferable than paying the cost of a mutex acquisition for every row merged. Fixes: #155190 Release note (bug fix): A potential deadlock during vector index creation has been corrected.
1 parent 481e179 commit 606b4b8

File tree

1 file changed

+32
-20
lines changed

1 file changed

+32
-20
lines changed

pkg/sql/backfill/mvcc_index_merger.go

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -518,28 +518,32 @@ func (ibm *IndexBackfillMerger) constructMergeBatch(
518518
return nil, 0, 0, 0, err
519519
}
520520

521-
// We acquire the bound account lock for the entirety of the merge batch
522-
// construction so that we don't have to acquire the lock every time we want
523-
// to grow the account.
524521
var memUsedInMerge int64
525-
ibm.muBoundAccount.Lock()
526-
defer ibm.muBoundAccount.Unlock()
527-
for i := range rb.Results {
528-
// Since the source index is delete-preserving, reading the latest value for
529-
// a key that has existed in the past should always return a value.
530-
if rb.Results[i].Rows[0].Value == nil {
531-
return nil, 0, 0, 0, errors.AssertionFailedf("expected value to be present in temp index for key=%s", rb.Results[i].Rows[0].Key)
532-
}
533-
rowMem := int64(len(rb.Results[i].Rows[0].Key)) + int64(len(rb.Results[i].Rows[0].Value.RawBytes))
534-
if err := ibm.muBoundAccount.boundAccount.Grow(ctx, rowMem); err != nil {
535-
return nil, 0, 0, 0, errors.Wrap(err, "failed to allocate space to read latest keys from temp index")
522+
err := func() error {
523+
ibm.muBoundAccount.Lock()
524+
defer ibm.muBoundAccount.Unlock()
525+
for i := range rb.Results {
526+
// Since the source index is delete-preserving, reading the latest value for
527+
// a key that has existed in the past should always return a value.
528+
if rb.Results[i].Rows[0].Value == nil {
529+
return errors.AssertionFailedf("expected value to be present in temp index for key=%s", rb.Results[i].Rows[0].Key)
530+
}
531+
rowMem := int64(len(rb.Results[i].Rows[0].Key)) + int64(len(rb.Results[i].Rows[0].Value.RawBytes))
532+
if err := ibm.muBoundAccount.boundAccount.Grow(ctx, rowMem); err != nil {
533+
return errors.Wrap(err, "failed to allocate space to read latest keys from temp index")
534+
}
535+
memUsedInMerge += rowMem
536536
}
537-
memUsedInMerge += rowMem
537+
return nil
538+
}()
539+
if err != nil {
540+
return nil, 0, 0, 0, err
538541
}
539542

540543
prefixLen := len(sourcePrefix)
541544
destKey := make([]byte, len(destPrefix))
542545
var deletedCount int
546+
var totalEntryBytes int64
543547
wb := txn.NewBatch()
544548
resultOffset := 0
545549
for sourceOffset := range sourceKeys {
@@ -571,18 +575,26 @@ func (ibm *IndexBackfillMerger) constructMergeBatch(
571575
continue
572576
}
573577

574-
entryBytes := mergedEntryBytes(mergedEntry, deleted)
575-
if err := ibm.muBoundAccount.boundAccount.Grow(ctx, entryBytes); err != nil {
576-
return nil, 0, 0, 0, errors.Wrap(err, "failed to allocate space to merge entry from temp index")
577-
}
578-
memUsedInMerge += entryBytes
578+
totalEntryBytes += mergedEntryBytes(mergedEntry, deleted)
579579
if deleted {
580580
deletedCount++
581581
wb.Del(mergedEntry.Key)
582582
} else {
583583
wb.Put(mergedEntry.Key, mergedEntry.Value)
584584
}
585585
}
586+
err = func() error {
587+
ibm.muBoundAccount.Lock()
588+
defer ibm.muBoundAccount.Unlock()
589+
if err := ibm.muBoundAccount.boundAccount.Grow(ctx, totalEntryBytes); err != nil {
590+
return errors.Wrap(err, "failed to allocate space to merge entry from temp index")
591+
}
592+
return nil
593+
}()
594+
if err != nil {
595+
return nil, 0, 0, 0, err
596+
}
597+
memUsedInMerge += totalEntryBytes
586598

587599
return wb, memUsedInMerge, deletedCount, keysToSkip, nil
588600
}

0 commit comments

Comments
 (0)