Skip to content

Commit a333b1a

Browse files
committed
routing: allow misson control manager to startup despite errors
We now allow the mission control manager to skip over deserializable errors. We cannot repair this these results but we just skip over it so we can startup properly. When fetchAll() encounters entries that fail to deserialize, in addition to skipping them, now also: - Delete the corrupted entries from the database - Remove them from the in-memory keysMap and keys tracking structures This prevents corrupted entries from: - Being counted toward maxRecords, which would cause valid entries to be pruned prematurely - Persisting in the database indefinitely - Causing inaccurate entry counts in startup logs
1 parent 8c8662c commit a333b1a

File tree

2 files changed

+159
-3
lines changed

2 files changed

+159
-3
lines changed

routing/missioncontrol_store.go

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,30 +133,81 @@ func (b *missionControlStore) clear() error {
133133
}
134134

135135
// fetchAll returns all results currently stored in the database.
136+
// It also removes any corrupted entries that fail to deserialize from both
137+
// the database and the in-memory tracking structures.
136138
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
137139
var results []*paymentResult
140+
var corruptedKeys [][]byte
138141

139-
err := b.db.view(func(resultBucket kvdb.RBucket) error {
142+
err := b.db.update(func(resultBucket kvdb.RwBucket) error {
140143
results = make([]*paymentResult, 0)
144+
corruptedKeys = make([][]byte, 0)
141145

142-
return resultBucket.ForEach(func(k, v []byte) error {
146+
err := resultBucket.ForEach(func(k, v []byte) error {
143147
result, err := deserializeResult(k, v)
148+
149+
// In case of an error, track the key for removal.
144150
if err != nil {
145-
return err
151+
log.Warnf("Failed to deserialize mission "+
152+
"control entry (key=%x): %v", k, err)
153+
154+
// Make a copy of the key since ForEach reuses
155+
// the slice.
156+
keyCopy := make([]byte, len(k))
157+
copy(keyCopy, k)
158+
corruptedKeys = append(corruptedKeys, keyCopy)
159+
160+
return nil
146161
}
147162

148163
results = append(results, result)
149164

150165
return nil
151166
})
167+
if err != nil {
168+
return err
169+
}
170+
171+
// Delete corrupted entries from the database.
172+
for _, key := range corruptedKeys {
173+
if err := resultBucket.Delete(key); err != nil {
174+
return fmt.Errorf("failed to delete corrupted "+
175+
"entry: %w", err)
176+
}
177+
}
152178

179+
return nil
153180
}, func() {
154181
results = nil
182+
corruptedKeys = nil
155183
})
156184
if err != nil {
157185
return nil, err
158186
}
159187

188+
// Remove corrupted keys from in-memory tracking.
189+
for _, key := range corruptedKeys {
190+
keyStr := string(key)
191+
delete(b.keysMap, keyStr)
192+
193+
// Remove from the keys list.
194+
for e := b.keys.Front(); e != nil; e = e.Next() {
195+
keyVal, ok := e.Value.(string)
196+
if !ok {
197+
continue
198+
}
199+
if keyVal == keyStr {
200+
b.keys.Remove(e)
201+
break
202+
}
203+
}
204+
}
205+
206+
if len(corruptedKeys) > 0 {
207+
log.Infof("Removed %d corrupted mission control entries",
208+
len(corruptedKeys))
209+
}
210+
160211
return results, nil
161212
}
162213

routing/missioncontrol_store_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,3 +332,108 @@ func BenchmarkMissionControlStoreFlushing(b *testing.B) {
332332
})
333333
}
334334
}
335+
336+
// TestMissionControlStoreDeletesCorruptedEntries tests that fetchAll() skips
337+
// entries that fail to deserialize, deletes them from the database, and
338+
// removes them from the in-memory tracking structures.
339+
func TestMissionControlStoreDeletesCorruptedEntries(t *testing.T) {
340+
h := newMCStoreTestHarness(t, testMaxRecords, time.Second)
341+
store := h.store
342+
343+
failureSourceIdx := 1
344+
345+
// Create two valid results.
346+
result1 := newPaymentResult(
347+
1, mcStoreTestRoute, testTime, testTime,
348+
fn.Some(newPaymentFailure(
349+
&failureSourceIdx,
350+
lnwire.NewFailIncorrectDetails(100, 1000),
351+
)),
352+
)
353+
354+
result2 := newPaymentResult(
355+
2, mcStoreTestRoute, testTime.Add(time.Hour),
356+
testTime.Add(time.Hour),
357+
fn.Some(newPaymentFailure(
358+
&failureSourceIdx,
359+
lnwire.NewFailIncorrectDetails(100, 1000),
360+
)),
361+
)
362+
363+
// Store both results.
364+
store.AddResult(result1)
365+
store.AddResult(result2)
366+
require.NoError(t, store.storeResults())
367+
368+
// Insert a corrupted entry into the database.
369+
var corruptedKey [8 + 8 + 33]byte
370+
byteOrder.PutUint64(corruptedKey[:], uint64(testTime.Add(
371+
30*time.Minute).UnixNano()),
372+
)
373+
byteOrder.PutUint64(corruptedKey[8:], 99) // Unique ID.
374+
copy(corruptedKey[16:], result1.route.Val.sourcePubKey.Val[:])
375+
376+
err := store.db.update(func(bucket kvdb.RwBucket) error {
377+
// Insert corrupted/invalid TLV data that will fail to
378+
// deserialize.
379+
corruptedValue := []byte{0xFF, 0xFF, 0xFF, 0xFF}
380+
381+
return bucket.Put(corruptedKey[:], corruptedValue)
382+
}, func() {})
383+
require.NoError(t, err)
384+
385+
// Add the corrupted key to in-memory tracking to simulate it being
386+
// loaded at startup (newMissionControlStore populates keysMap from
387+
// all DB keys).
388+
corruptedKeyStr := string(corruptedKey[:])
389+
store.keysMap[corruptedKeyStr] = struct{}{}
390+
store.keys.PushBack(corruptedKeyStr)
391+
392+
// Verify the corrupted key is in the in-memory tracking.
393+
_, exists := store.keysMap[corruptedKeyStr]
394+
require.True(t, exists, "corrupted key should be in keysMap")
395+
396+
// Verify we have 3 entries in the database before fetchAll.
397+
var dbEntryCountBefore int
398+
err = store.db.view(func(bucket kvdb.RBucket) error {
399+
return bucket.ForEach(func(k, v []byte) error {
400+
dbEntryCountBefore++
401+
return nil
402+
})
403+
}, func() {
404+
dbEntryCountBefore = 0
405+
})
406+
require.NoError(t, err)
407+
require.Equal(t, 3, dbEntryCountBefore, "should have 3 entries "+
408+
"in the database before cleanup")
409+
410+
// Now fetch all results. The corrupted entry should be skipped,
411+
// deleted from the DB, and removed from in-memory tracking.
412+
results, err := store.fetchAll()
413+
require.NoError(t, err, "fetchAll should not return an error "+
414+
"even when encountering corrupted entries")
415+
require.Len(t, results, 2, "should skip the corrupted entry and "+
416+
"return only valid results")
417+
418+
// Verify we still have the correct results.
419+
require.Equal(t, result1, results[0])
420+
require.Equal(t, result2, results[1])
421+
422+
// Verify the corrupted entry was removed from in-memory tracking.
423+
_, exists = store.keysMap[corruptedKeyStr]
424+
require.False(t, exists, "corrupted key should not exist in keysMap")
425+
426+
// Verify the corrupted entry was deleted from the database.
427+
var dbEntryCountAfter int
428+
err = store.db.view(func(bucket kvdb.RBucket) error {
429+
return bucket.ForEach(func(k, v []byte) error {
430+
dbEntryCountAfter++
431+
return nil
432+
})
433+
}, func() {
434+
dbEntryCountAfter = 0
435+
})
436+
require.NoError(t, err)
437+
require.Equal(t, 2, dbEntryCountAfter, "corrupted entry should be "+
438+
"deleted from the database")
439+
}

0 commit comments

Comments
 (0)