Skip to content

Commit df442be

Browse files
Crypt-iQcfromknecht
authored andcommitted
persistlog: persistent GC, cleaned tests, batched writes
This commit introduces changes to the garbage collector that cause it to batch all writes to the backend boltdb. The garbage collector's code is much cleaner now and all the logic is contained in the Batch call. Additionally, persistence was added to the garbage collector so that even on DecayedLog shutdown, the garbage collector will still delete expired entries based on their CLTV. Tests were cleaned up and a new test for GC persistence was added.
1 parent 22d2642 commit df442be

File tree

2 files changed

+205
-179
lines changed

2 files changed

+205
-179
lines changed

persistlog/decayedlog.go

Lines changed: 71 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@ import (
44
"crypto/sha256"
55
"encoding/binary"
66
"fmt"
7+
"math"
8+
"sync"
9+
710
"github.com/boltdb/bolt"
811
"github.com/lightningnetwork/lnd/chainntnfs"
912
"github.com/lightningnetwork/lnd/channeldb"
10-
"math"
11-
"sync"
1213
)
1314

1415
const (
1516
// defaultDbDirectory is the default directory where our decayed log
16-
// will store our (sharedHash, CLTV expiry height) key-value pairs.
17-
defaultDbDirectory = "sharedsecret"
17+
// will store our (sharedHash, CLTV) key-value pairs.
18+
defaultDbDirectory = "sharedhashes"
1819

1920
// sharedHashSize is the size in bytes of the keys we will be storing
2021
// in the DecayedLog. It represents the first 20 bytes of a truncated
@@ -26,18 +27,18 @@ const (
2627
)
2728

2829
var (
29-
// sharedHashBucket is a bucket which houses all the first sharedHashSize
30-
// bytes of a received HTLC's hashed shared secret and the HTLC's
31-
// expiry block height.
30+
// sharedHashBucket is a bucket which houses the first sharedHashSize
31+
// bytes of a received HTLC's hashed shared secret as the key and the HTLC's
32+
// CLTV expiry as the value.
3233
sharedHashBucket = []byte("shared-hash")
3334
)
3435

3536
// DecayedLog implements the PersistLog interface. It stores the first
3637
// sharedHashSize bytes of a sha256-hashed shared secret along with a node's
3738
// CLTV value. It is a decaying log meaning there will be a garbage collector
3839
// to collect entries which are expired according to their stored CLTV value
39-
// and the current block height. DecayedLog wraps channeldb for simplicity, but
40-
// must batch writes to the database to decrease write contention.
40+
// and the current block height. DecayedLog wraps channeldb for simplicity and
41+
// batches writes to the database to decrease write contention.
4142
type DecayedLog struct {
4243
db *channeldb.DB
4344
wg sync.WaitGroup
@@ -60,15 +61,13 @@ func (d *DecayedLog) garbageCollector() error {
6061
outer:
6162
for {
6263
select {
63-
case _, ok := <-epochClient.Epochs:
64+
case epoch, ok := <-epochClient.Epochs:
6465
if !ok {
6566
return fmt.Errorf("Epoch client shutting " +
6667
"down")
6768
}
6869

69-
var expiredCltv [][]byte
70-
validCltv := make(map[string]uint32)
71-
err := d.db.View(func(tx *bolt.Tx) error {
70+
err := d.db.Batch(func(tx *bolt.Tx) error {
7271
// Grab the shared hash bucket
7372
sharedHashes := tx.Bucket(sharedHashBucket)
7473
if sharedHashes == nil {
@@ -77,15 +76,53 @@ outer:
7776
}
7877

7978
sharedHashes.ForEach(func(k, v []byte) error {
80-
cltv := uint32(binary.BigEndian.Uint32(v))
79+
// The CLTV value in question.
80+
cltv := uint32(binary.BigEndian.Uint32(v[:4]))
81+
// The last recorded block height that
82+
// the garbage collector received.
83+
lastHeight := uint32(binary.BigEndian.Uint32(v[4:8]))
84+
8185
if cltv == 0 {
82-
// Store expired hash in array
83-
expiredCltv = append(expiredCltv, k)
86+
// This CLTV just expired. We
87+
// must delete it from the db.
88+
err := sharedHashes.Delete(k)
89+
if err != nil {
90+
return err
91+
}
92+
} else if lastHeight != 0 && uint32(epoch.Height) - lastHeight > cltv {
93+
// This CLTV just expired or
94+
// expired in the past but the
95+
// garbage collector was not
96+
// running and therefore could
97+
// not handle it. We delete it
98+
// from the db now.
99+
err := sharedHashes.Delete(k)
100+
if err != nil {
101+
return err
102+
}
84103
} else {
104+
// The CLTV is still valid. We
105+
// decrement the CLTV value and
106+
// store the new CLTV value along
107+
// with the current block height.
108+
var scratch [8]byte
109+
110+
// Store decremented CLTV in
111+
// scratch[:4]
85112
cltv--
86-
// Store valid <hash, cltv> in map
87-
validCltv[string(k)] = cltv
113+
binary.BigEndian.PutUint32(scratch[:4], cltv)
114+
115+
// Store current blockheight in
116+
// scratch[4:8]
117+
binary.BigEndian.PutUint32(scratch[4:8], uint32(epoch.Height))
118+
119+
// Store <hash, CLTV + blockheight>
120+
err := sharedHashes.Put(k, scratch[:])
121+
if err != nil {
122+
return err
123+
}
88124
}
125+
89126
return nil
90127
})
91128

@@ -96,24 +133,6 @@ outer:
96133
"%v", err)
97134
}
98135

99-
// Delete every item in array
100-
for _, hash := range expiredCltv {
101-
err = d.Delete(hash)
102-
if err != nil {
103-
return fmt.Errorf("Unable to delete "+
104-
"expired secret: %v", err)
105-
}
106-
}
107-
108-
// Update decremented CLTV's via validCltv
109-
for hash, cltv := range validCltv {
110-
err = d.Put([]byte(hash), cltv)
111-
if err != nil {
112-
return fmt.Errorf("Unable to decrement "+
113-
"cltv value: %v", err)
114-
}
115-
}
116-
117136
case <-d.quit:
118137
break outer
119138
}
@@ -140,10 +159,10 @@ func HashSharedSecret(sharedSecret [sharedSecretSize]byte) [sharedHashSize]byte
140159
return sharedHash
141160
}
142161

143-
// Delete removes a <shared secret hash, CLTV value> key-pair from the
162+
// Delete removes a <shared secret hash, CLTV> key-pair from the
144163
// sharedHashBucket.
145164
func (d *DecayedLog) Delete(hash []byte) error {
146-
return d.db.Update(func(tx *bolt.Tx) error {
165+
return d.db.Batch(func(tx *bolt.Tx) error {
147166
sharedHashes, err := tx.CreateBucketIfNotExists(sharedHashBucket)
148167
if err != nil {
149168
return fmt.Errorf("Unable to created sharedHashes bucket:"+
@@ -154,29 +173,30 @@ func (d *DecayedLog) Delete(hash []byte) error {
154173
})
155174
}
156175

157-
// Get retrieves the CLTV value of a processed HTLC given the first 20 bytes
158-
// of the Sha-256 hash of the shared secret used during sphinx processing.
176+
// Get retrieves the CLTV of a processed HTLC given the first 20 bytes of the
177+
// Sha-256 hash of the shared secret.
159178
func (d *DecayedLog) Get(hash []byte) (uint32, error) {
160179
// math.MaxUint32 is returned when Get did not retrieve a value.
180+
// This was chosen because it's not feasible for a CLTV to be this high.
161181
var value uint32 = math.MaxUint32
162182

163183
err := d.db.View(func(tx *bolt.Tx) error {
164184
// Grab the shared hash bucket which stores the mapping from
165-
// truncated sha-256 hashes of shared secrets to CLTV values.
185+
// truncated sha-256 hashes of shared secrets to CLTV's.
166186
sharedHashes := tx.Bucket(sharedHashBucket)
167187
if sharedHashes == nil {
168188
return fmt.Errorf("sharedHashes is nil, could " +
169189
"not retrieve CLTV value")
170190
}
171191

172-
// If the sharedHash is found, we use it to find the associated
173-
// CLTV in the sharedHashBucket.
192+
// Retrieve the bytes which represents the CLTV + blockheight.
174193
valueBytes := sharedHashes.Get(hash)
175194
if valueBytes == nil {
176195
return nil
177196
}
178197

179-
value = uint32(binary.BigEndian.Uint32(valueBytes))
198+
// The first 4 bytes represent the CLTV, store it in value.
199+
value = uint32(binary.BigEndian.Uint32(valueBytes[:4]))
180200

181201
return nil
182202
})
@@ -187,14 +207,14 @@ func (d *DecayedLog) Get(hash []byte) (uint32, error) {
187207
return value, nil
188208
}
189209

190-
// Put stores a shared secret hash as the key and a slice consisting of the
191-
// current blockheight and the outgoing CLTV value
192-
func (d *DecayedLog) Put(hash []byte, value uint32) error {
193-
194-
var scratch [4]byte
210+
// Put stores a shared secret hash as the key and the CLTV as the value.
211+
func (d *DecayedLog) Put(hash []byte, cltv uint32) error {
212+
// The CLTV will be stored into scratch and then stored into the
213+
// sharedHashBucket.
214+
var scratch [8]byte
195215

196216
// Store value into scratch
197-
binary.BigEndian.PutUint32(scratch[:], value)
217+
binary.BigEndian.PutUint32(scratch[:4], cltv)
198218

199219
return d.db.Batch(func(tx *bolt.Tx) error {
200220
sharedHashes, err := tx.CreateBucketIfNotExists(sharedHashBucket)
@@ -236,7 +256,7 @@ func (d *DecayedLog) Start(dbDir string) error {
236256
return nil
237257
})
238258
if err != nil {
239-
return fmt.Errorf("Could not create sharedHashes")
259+
return err
240260
}
241261

242262
// Start garbage collector.

0 commit comments

Comments
 (0)