Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/release-notes/release-notes-0.20.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
* Fix a bug where [repeated network
addresses](https://github.com/lightningnetwork/lnd/pull/10341) were added to
the node announcement and `getinfo` output.

* [Fix a startup issue in LND when encountering a
deserialization issue](https://github.com/lightningnetwork/lnd/pull/10383)
in the mission control store. Now we skip over potential errors and also
delete them from the store.

# New Features

Expand Down
74 changes: 72 additions & 2 deletions routing/missioncontrol_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,30 +133,100 @@ func (b *missionControlStore) clear() error {
}

// fetchAll returns all results currently stored in the database.
// It also removes any corrupted entries that fail to deserialize from both
// the database and the in-memory tracking structures.
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
var results []*paymentResult
var corruptedKeys [][]byte

// Read all results and identify corrupted entries.
err := b.db.view(func(resultBucket kvdb.RBucket) error {
results = make([]*paymentResult, 0)
corruptedKeys = make([][]byte, 0)

return resultBucket.ForEach(func(k, v []byte) error {
err := resultBucket.ForEach(func(k, v []byte) error {
result, err := deserializeResult(k, v)

// In case of an error, track the key for removal.
if err != nil {
return err
log.Warnf("Failed to deserialize mission "+
"control entry (key=%x): %v", k, err)

// Make a copy of the key since ForEach reuses
// the slice.
keyCopy := make([]byte, len(k))
copy(keyCopy, k)
corruptedKeys = append(corruptedKeys, keyCopy)

return nil
}

results = append(results, result)

return nil
})
if err != nil {
return err
}

return nil
}, func() {
results = nil
corruptedKeys = nil
})
if err != nil {
return nil, err
}

// Delete corrupted entries from the database which were identified
// when loading the results from the database.
//
// TODO: This code part should eventually be removed once we move the
// mission control store to a native sql database and have to do a
// full migration of the data.
if len(corruptedKeys) > 0 {
err = b.db.update(func(resultBucket kvdb.RwBucket) error {
for _, key := range corruptedKeys {
if err := resultBucket.Delete(key); err != nil {
return fmt.Errorf("failed to delete "+
"corrupted entry: %w", err)
}
}

return nil
}, func() {})
if err != nil {
return nil, err
}

// Build a set of corrupted keys.
corruptedSet := make(map[string]struct{}, len(corruptedKeys))
for _, key := range corruptedKeys {
corruptedSet[string(key)] = struct{}{}
}

// Remove corrupted keys from in-memory map.
for keyStr := range corruptedSet {
delete(b.keysMap, keyStr)
}

// Remove from the keys list in a single pass.
for e := b.keys.Front(); e != nil; {
next := e.Next()
keyVal, ok := e.Value.(string)
if ok {
_, isCorrupted := corruptedSet[keyVal]
if isCorrupted {
b.keys.Remove(e)
}
}
e = next
}

log.Infof("Removed %d corrupted mission control entries",
len(corruptedKeys))
}

return results, nil
}

Expand Down
105 changes: 105 additions & 0 deletions routing/missioncontrol_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,108 @@ func BenchmarkMissionControlStoreFlushing(b *testing.B) {
})
}
}

// TestMissionControlStoreDeletesCorruptedEntries tests that fetchAll() skips
// entries that fail to deserialize, deletes them from the database, and
// removes them from the in-memory tracking structures.
func TestMissionControlStoreDeletesCorruptedEntries(t *testing.T) {
h := newMCStoreTestHarness(t, testMaxRecords, time.Second)
store := h.store

failureSourceIdx := 1

// Create two valid results.
result1 := newPaymentResult(
1, mcStoreTestRoute, testTime, testTime,
fn.Some(newPaymentFailure(
&failureSourceIdx,
lnwire.NewFailIncorrectDetails(100, 1000),
)),
)

result2 := newPaymentResult(
2, mcStoreTestRoute, testTime.Add(time.Hour),
testTime.Add(time.Hour),
fn.Some(newPaymentFailure(
&failureSourceIdx,
lnwire.NewFailIncorrectDetails(100, 1000),
)),
)

// Store both results.
store.AddResult(result1)
store.AddResult(result2)
require.NoError(t, store.storeResults())

// Insert a corrupted entry into the database.
var corruptedKey [8 + 8 + 33]byte
byteOrder.PutUint64(corruptedKey[:], uint64(testTime.Add(
30*time.Minute).UnixNano()),
)
byteOrder.PutUint64(corruptedKey[8:], 99) // Unique ID.
copy(corruptedKey[16:], result1.route.Val.sourcePubKey.Val[:])

err := store.db.update(func(bucket kvdb.RwBucket) error {
// Insert corrupted/invalid TLV data that will fail to
// deserialize.
corruptedValue := []byte{0xFF, 0xFF, 0xFF, 0xFF}

return bucket.Put(corruptedKey[:], corruptedValue)
}, func() {})
require.NoError(t, err)

// Add the corrupted key to in-memory tracking to simulate it being
// loaded at startup (newMissionControlStore populates keysMap from
// all DB keys).
corruptedKeyStr := string(corruptedKey[:])
store.keysMap[corruptedKeyStr] = struct{}{}
store.keys.PushBack(corruptedKeyStr)

// Verify the corrupted key is in the in-memory tracking.
_, exists := store.keysMap[corruptedKeyStr]
require.True(t, exists, "corrupted key should be in keysMap")

// Verify we have 3 entries in the database before fetchAll.
var dbEntryCountBefore int
err = store.db.view(func(bucket kvdb.RBucket) error {
return bucket.ForEach(func(k, v []byte) error {
dbEntryCountBefore++
return nil
})
}, func() {
dbEntryCountBefore = 0
})
require.NoError(t, err)
require.Equal(t, 3, dbEntryCountBefore, "should have 3 entries "+
"in the database before cleanup")

// Now fetch all results. The corrupted entry should be skipped,
// deleted from the DB, and removed from in-memory tracking.
results, err := store.fetchAll()
require.NoError(t, err, "fetchAll should not return an error "+
"even when encountering corrupted entries")
require.Len(t, results, 2, "should skip the corrupted entry and "+
"return only valid results")

// Verify we still have the correct results.
require.Equal(t, result1, results[0])
require.Equal(t, result2, results[1])

// Verify the corrupted entry was removed from in-memory tracking.
_, exists = store.keysMap[corruptedKeyStr]
require.False(t, exists, "corrupted key should not exist in keysMap")

// Verify the corrupted entry was deleted from the database.
var dbEntryCountAfter int
err = store.db.view(func(bucket kvdb.RBucket) error {
return bucket.ForEach(func(k, v []byte) error {
dbEntryCountAfter++
return nil
})
}, func() {
dbEntryCountAfter = 0
})
require.NoError(t, err)
require.Equal(t, 2, dbEntryCountAfter, "corrupted entry should be "+
"deleted from the database")
}
Loading