Skip to content

Commit 2f4892d

Browse files
committed
routing: allow mission control manager to start up despite errors
We now allow the mission control manager to skip over deserialization errors. We cannot repair these results, so we simply skip over them in order to start up properly.

When fetchAll() encounters entries that fail to deserialize, in addition to skipping them, it now also:
- Deletes the corrupted entries from the database
- Removes them from the in-memory keysMap and keys tracking structures

This prevents corrupted entries from:
- Being counted toward maxRecords, which would cause valid entries to be pruned prematurely
- Persisting in the database indefinitely
- Causing inaccurate entry counts in startup logs
1 parent 0a2a5b2 commit 2f4892d

File tree

2 files changed

+171
-2
lines changed

2 files changed

+171
-2
lines changed

routing/missioncontrol_store.go

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,30 +133,94 @@ func (b *missionControlStore) clear() error {
133133
}
134134

135135
// fetchAll returns all results currently stored in the database.
136+
// It also removes any corrupted entries that fail to deserialize from both
137+
// the database and the in-memory tracking structures.
136138
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
137139
var results []*paymentResult
140+
var corruptedKeys [][]byte
138141

142+
// Read all results and identify corrupted entries.
139143
err := b.db.view(func(resultBucket kvdb.RBucket) error {
140144
results = make([]*paymentResult, 0)
145+
corruptedKeys = make([][]byte, 0)
141146

142-
return resultBucket.ForEach(func(k, v []byte) error {
147+
err := resultBucket.ForEach(func(k, v []byte) error {
143148
result, err := deserializeResult(k, v)
149+
150+
// In case of an error, track the key for removal.
144151
if err != nil {
145-
return err
152+
log.Warnf("Failed to deserialize mission "+
153+
"control entry (key=%x): %v", k, err)
154+
155+
// Make a copy of the key since ForEach reuses
156+
// the slice.
157+
keyCopy := make([]byte, len(k))
158+
copy(keyCopy, k)
159+
corruptedKeys = append(corruptedKeys, keyCopy)
160+
161+
return nil
146162
}
147163

148164
results = append(results, result)
149165

150166
return nil
151167
})
168+
if err != nil {
169+
return err
170+
}
152171

172+
return nil
153173
}, func() {
154174
results = nil
175+
corruptedKeys = nil
155176
})
156177
if err != nil {
157178
return nil, err
158179
}
159180

181+
// Delete corrupted entries from the database which were identified
182+
// when loading the results from the database.
183+
//
184+
// TODO: This code part should eventually be removed once we move the
185+
// mission control store to a native sql database and have to do a
186+
// full migration of the data.
187+
if len(corruptedKeys) > 0 {
188+
err = b.db.update(func(resultBucket kvdb.RwBucket) error {
189+
for _, key := range corruptedKeys {
190+
if err := resultBucket.Delete(key); err != nil {
191+
return fmt.Errorf("failed to delete "+
192+
"corrupted entry: %w", err)
193+
}
194+
}
195+
196+
return nil
197+
}, func() {})
198+
if err != nil {
199+
return nil, err
200+
}
201+
202+
// Remove corrupted keys from in-memory tracking.
203+
for _, key := range corruptedKeys {
204+
keyStr := string(key)
205+
delete(b.keysMap, keyStr)
206+
207+
// Remove from the keys list.
208+
for e := b.keys.Front(); e != nil; e = e.Next() {
209+
keyVal, ok := e.Value.(string)
210+
if !ok {
211+
continue
212+
}
213+
if keyVal == keyStr {
214+
b.keys.Remove(e)
215+
break
216+
}
217+
}
218+
}
219+
220+
log.Infof("Removed %d corrupted mission control entries",
221+
len(corruptedKeys))
222+
}
223+
160224
return results, nil
161225
}
162226

routing/missioncontrol_store_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,3 +332,108 @@ func BenchmarkMissionControlStoreFlushing(b *testing.B) {
332332
})
333333
}
334334
}
335+
336+
// TestMissionControlStoreDeletesCorruptedEntries tests that fetchAll() skips
337+
// entries that fail to deserialize, deletes them from the database, and
338+
// removes them from the in-memory tracking structures.
339+
func TestMissionControlStoreDeletesCorruptedEntries(t *testing.T) {
340+
h := newMCStoreTestHarness(t, testMaxRecords, time.Second)
341+
store := h.store
342+
343+
failureSourceIdx := 1
344+
345+
// Create two valid results.
346+
result1 := newPaymentResult(
347+
1, mcStoreTestRoute, testTime, testTime,
348+
fn.Some(newPaymentFailure(
349+
&failureSourceIdx,
350+
lnwire.NewFailIncorrectDetails(100, 1000),
351+
)),
352+
)
353+
354+
result2 := newPaymentResult(
355+
2, mcStoreTestRoute, testTime.Add(time.Hour),
356+
testTime.Add(time.Hour),
357+
fn.Some(newPaymentFailure(
358+
&failureSourceIdx,
359+
lnwire.NewFailIncorrectDetails(100, 1000),
360+
)),
361+
)
362+
363+
// Store both results.
364+
store.AddResult(result1)
365+
store.AddResult(result2)
366+
require.NoError(t, store.storeResults())
367+
368+
// Insert a corrupted entry into the database.
369+
var corruptedKey [8 + 8 + 33]byte
370+
byteOrder.PutUint64(corruptedKey[:], uint64(testTime.Add(
371+
30*time.Minute).UnixNano()),
372+
)
373+
byteOrder.PutUint64(corruptedKey[8:], 99) // Unique ID.
374+
copy(corruptedKey[16:], result1.route.Val.sourcePubKey.Val[:])
375+
376+
err := store.db.update(func(bucket kvdb.RwBucket) error {
377+
// Insert corrupted/invalid TLV data that will fail to
378+
// deserialize.
379+
corruptedValue := []byte{0xFF, 0xFF, 0xFF, 0xFF}
380+
381+
return bucket.Put(corruptedKey[:], corruptedValue)
382+
}, func() {})
383+
require.NoError(t, err)
384+
385+
// Add the corrupted key to in-memory tracking to simulate it being
386+
// loaded at startup (newMissionControlStore populates keysMap from
387+
// all DB keys).
388+
corruptedKeyStr := string(corruptedKey[:])
389+
store.keysMap[corruptedKeyStr] = struct{}{}
390+
store.keys.PushBack(corruptedKeyStr)
391+
392+
// Verify the corrupted key is in the in-memory tracking.
393+
_, exists := store.keysMap[corruptedKeyStr]
394+
require.True(t, exists, "corrupted key should be in keysMap")
395+
396+
// Verify we have 3 entries in the database before fetchAll.
397+
var dbEntryCountBefore int
398+
err = store.db.view(func(bucket kvdb.RBucket) error {
399+
return bucket.ForEach(func(k, v []byte) error {
400+
dbEntryCountBefore++
401+
return nil
402+
})
403+
}, func() {
404+
dbEntryCountBefore = 0
405+
})
406+
require.NoError(t, err)
407+
require.Equal(t, 3, dbEntryCountBefore, "should have 3 entries "+
408+
"in the database before cleanup")
409+
410+
// Now fetch all results. The corrupted entry should be skipped,
411+
// deleted from the DB, and removed from in-memory tracking.
412+
results, err := store.fetchAll()
413+
require.NoError(t, err, "fetchAll should not return an error "+
414+
"even when encountering corrupted entries")
415+
require.Len(t, results, 2, "should skip the corrupted entry and "+
416+
"return only valid results")
417+
418+
// Verify we still have the correct results.
419+
require.Equal(t, result1, results[0])
420+
require.Equal(t, result2, results[1])
421+
422+
// Verify the corrupted entry was removed from in-memory tracking.
423+
_, exists = store.keysMap[corruptedKeyStr]
424+
require.False(t, exists, "corrupted key should not exist in keysMap")
425+
426+
// Verify the corrupted entry was deleted from the database.
427+
var dbEntryCountAfter int
428+
err = store.db.view(func(bucket kvdb.RBucket) error {
429+
return bucket.ForEach(func(k, v []byte) error {
430+
dbEntryCountAfter++
431+
return nil
432+
})
433+
}, func() {
434+
dbEntryCountAfter = 0
435+
})
436+
require.NoError(t, err)
437+
require.Equal(t, 2, dbEntryCountAfter, "corrupted entry should be "+
438+
"deleted from the database")
439+
}

0 commit comments

Comments
 (0)