Skip to content

Commit 4feb960

Browse files
authored
feat(core/state): async trie prefetching (#76)
## Why this should be merged

Performs trie prefetching concurrently, which is required for performance equivalent to the `coreth` / `subnet-evm` implementations.

## How this works

`StateDB.StartPrefetcher()` accepts variadic options (for backwards compatibility of function signatures). An option to specify a `WorkerPool` is provided; if present, the pool is used to call `Trie.Get{Account,Storage}()`. The pool is responsible for concurrency but does not need to be able to wait on the work, as waiting is handled by this change.

## How this was tested

A unit test demonstrates hand-off of work to a `WorkerPool` as well as the API-guaranteed ordering of events.
1 parent 44068c8 commit 4feb960

File tree

5 files changed

+307
-17
lines changed

5 files changed

+307
-17
lines changed

core/state/statedb.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,13 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
175175
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
176176
// state trie concurrently while the state is mutated so that when we reach the
177177
// commit phase, most of the needed data is already hot.
178-
func (s *StateDB) StartPrefetcher(namespace string) {
178+
func (s *StateDB) StartPrefetcher(namespace string, opts ...PrefetcherOption) {
179179
if s.prefetcher != nil {
180180
s.prefetcher.close()
181181
s.prefetcher = nil
182182
}
183183
if s.snap != nil {
184-
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
184+
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, opts...)
185185
}
186186
}
187187

core/state/trie_prefetcher.go

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"sync"
2121

2222
"github.com/ava-labs/libevm/common"
23+
"github.com/ava-labs/libevm/libevm/options"
2324
"github.com/ava-labs/libevm/log"
2425
"github.com/ava-labs/libevm/metrics"
2526
)
@@ -49,9 +50,11 @@ type triePrefetcher struct {
4950
storageDupMeter metrics.Meter
5051
storageSkipMeter metrics.Meter
5152
storageWasteMeter metrics.Meter
53+
54+
options []PrefetcherOption
5255
}
5356

54-
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
57+
func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...PrefetcherOption) *triePrefetcher {
5558
prefix := triePrefetchMetricsPrefix + namespace
5659
p := &triePrefetcher{
5760
db: db,
@@ -67,6 +70,8 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
6770
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
6871
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
6972
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
73+
74+
options: opts,
7075
}
7176
return p
7277
}
@@ -99,6 +104,7 @@ func (p *triePrefetcher) close() {
99104
}
100105
}
101106
}
107+
p.releaseWorkerPools()
102108
// Clear out all fetchers (will crash on a second call, deliberate)
103109
p.fetchers = nil
104110
}
@@ -122,6 +128,8 @@ func (p *triePrefetcher) copy() *triePrefetcher {
122128
storageDupMeter: p.storageDupMeter,
123129
storageSkipMeter: p.storageSkipMeter,
124130
storageWasteMeter: p.storageWasteMeter,
131+
132+
options: p.options,
125133
}
126134
// If the prefetcher is already a copy, duplicate the data
127135
if p.fetches != nil {
@@ -150,7 +158,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
150158
id := p.trieID(owner, root)
151159
fetcher := p.fetchers[id]
152160
if fetcher == nil {
153-
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
161+
fetcher = newSubfetcher(p.db, p.root, owner, root, addr, p.options...)
154162
p.fetchers[id] = fetcher
155163
}
156164
fetcher.schedule(keys)
@@ -226,11 +234,13 @@ type subfetcher struct {
226234
seen map[string]struct{} // Tracks the entries already loaded
227235
dups int // Number of duplicate preload tasks
228236
used [][]byte // Tracks the entries used in the end
237+
238+
pool *subfetcherPool
229239
}
230240

231241
// newSubfetcher creates a goroutine to prefetch state items belonging to a
232242
// particular root hash.
233-
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
243+
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address, opts ...PrefetcherOption) *subfetcher {
234244
sf := &subfetcher{
235245
db: db,
236246
state: state,
@@ -243,6 +253,7 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo
243253
copy: make(chan chan Trie),
244254
seen: make(map[string]struct{}),
245255
}
256+
options.As[prefetcherConfig](opts...).applyTo(sf)
246257
go sf.loop()
247258
return sf
248259
}
@@ -294,7 +305,10 @@ func (sf *subfetcher) abort() {
294305
// out of tasks or its underlying trie is retrieved for committing.
295306
func (sf *subfetcher) loop() {
296307
// No matter how the loop stops, signal anyone waiting that it's terminated
297-
defer close(sf.term)
308+
defer func() {
309+
sf.pool.wait()
310+
close(sf.term)
311+
}()
298312

299313
// Start by opening the trie and stop processing if it fails
300314
if sf.owner == (common.Hash{}) {
@@ -325,14 +339,14 @@ func (sf *subfetcher) loop() {
325339
sf.lock.Unlock()
326340

327341
// Prefetch any tasks until the loop is interrupted
328-
for i, task := range tasks {
342+
for _, task := range tasks {
329343
select {
330-
case <-sf.stop:
331-
// If termination is requested, add any leftover back and return
332-
sf.lock.Lock()
333-
sf.tasks = append(sf.tasks, tasks[i:]...)
334-
sf.lock.Unlock()
335-
return
344+
//libevm:start
345+
//
346+
// The <-sf.stop case has been removed, in keeping with the equivalent change below. Future geth
347+
// versions also remove it so our modification here can be undone when merging upstream.
348+
//
349+
//libevm:end
336350

337351
case ch := <-sf.copy:
338352
// Somebody wants a copy of the current trie, grant them
@@ -344,9 +358,9 @@ func (sf *subfetcher) loop() {
344358
sf.dups++
345359
} else {
346360
if len(task) == common.AddressLength {
347-
sf.trie.GetAccount(common.BytesToAddress(task))
361+
sf.pool.GetAccount(common.BytesToAddress(task))
348362
} else {
349-
sf.trie.GetStorage(sf.addr, task)
363+
sf.pool.GetStorage(sf.addr, task)
350364
}
351365
sf.seen[string(task)] = struct{}{}
352366
}
@@ -358,8 +372,26 @@ func (sf *subfetcher) loop() {
358372
ch <- sf.db.CopyTrie(sf.trie)
359373

360374
case <-sf.stop:
361-
// Termination is requested, abort and leave remaining tasks
362-
return
375+
//libevm:start
376+
//
377+
// This is copied, with alteration, from ethereum/go-ethereum#29519
378+
// and can be deleted once we update to include that change.
379+
380+
// Termination is requested, abort if no more tasks are pending. If
381+
// there are some, exhaust them first.
382+
sf.lock.Lock()
383+
done := len(sf.tasks) == 0
384+
sf.lock.Unlock()
385+
386+
if done {
387+
return
388+
}
389+
390+
select {
391+
case sf.wake <- struct{}{}:
392+
default:
393+
}
394+
//libevm:end
363395
}
364396
}
365397
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright 2024 the libevm authors.
2+
//
3+
// The libevm additions to go-ethereum are free software: you can redistribute
4+
// them and/or modify them under the terms of the GNU Lesser General Public License
5+
// as published by the Free Software Foundation, either version 3 of the License,
6+
// or (at your option) any later version.
7+
//
8+
// The libevm additions are distributed in the hope that they will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
11+
// General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU Lesser General Public License
14+
// along with the go-ethereum library. If not, see
15+
// <http://www.gnu.org/licenses/>.
16+
17+
package state
18+
19+
import (
20+
"github.com/ava-labs/libevm/common"
21+
"github.com/ava-labs/libevm/libevm/options"
22+
"github.com/ava-labs/libevm/libevm/sync"
23+
"github.com/ava-labs/libevm/log"
24+
)
25+
26+
// A PrefetcherOption configures behaviour of trie prefetching.
27+
type PrefetcherOption = options.Option[prefetcherConfig]
28+
29+
type prefetcherConfig struct {
30+
newWorkers func() WorkerPool
31+
}
32+
33+
// A WorkerPool executes functions asynchronously. Done() is called to signal
34+
// that the pool is no longer needed and that Execute() is guaranteed to not be
35+
// called again.
36+
type WorkerPool interface {
37+
Execute(func())
38+
Done()
39+
}
40+
41+
// WithWorkerPools configures trie prefetching to execute asynchronously. The
42+
// provided constructor is called once for each trie being fetched but it MAY
43+
// return the same pool.
44+
func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption {
45+
return options.Func[prefetcherConfig](func(c *prefetcherConfig) {
46+
c.newWorkers = ctor
47+
})
48+
}
49+
50+
type subfetcherPool struct {
51+
workers WorkerPool
52+
tries sync.Pool[Trie]
53+
wg sync.WaitGroup
54+
}
55+
56+
// applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided
57+
// with a [PrefetcherOption].
58+
func (c *prefetcherConfig) applyTo(sf *subfetcher) {
59+
sf.pool = &subfetcherPool{
60+
tries: sync.Pool[Trie]{
61+
// Although the workers may be shared between all subfetchers, each
62+
// MUST have its own Trie pool.
63+
New: func() Trie {
64+
return sf.db.CopyTrie(sf.trie)
65+
},
66+
},
67+
}
68+
if c.newWorkers != nil {
69+
sf.pool.workers = c.newWorkers()
70+
}
71+
}
72+
73+
// releaseWorkerPools calls Done() on all [WorkerPool]s. This MUST only be
74+
// called after [subfetcher.abort] returns on ALL fetchers as a pool is allowed
75+
// to be shared between them. This is because we guarantee in the public API
76+
// that no further calls will be made to Execute() after a call to Done().
77+
func (p *triePrefetcher) releaseWorkerPools() {
78+
for _, f := range p.fetchers {
79+
if w := f.pool.workers; w != nil {
80+
w.Done()
81+
}
82+
}
83+
}
84+
85+
func (p *subfetcherPool) wait() {
86+
p.wg.Wait()
87+
}
88+
89+
// execute runs the provided function with a copy of the subfetcher's Trie.
90+
// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was
91+
// configured with a [WorkerPool] then it is used for function execution,
92+
// otherwise `fn` is just called directly.
93+
func (p *subfetcherPool) execute(fn func(Trie)) {
94+
p.wg.Add(1)
95+
do := func() {
96+
t := p.tries.Get()
97+
fn(t)
98+
p.tries.Put(t)
99+
p.wg.Done()
100+
}
101+
102+
if w := p.workers; w != nil {
103+
w.Execute(do)
104+
} else {
105+
do()
106+
}
107+
}
108+
109+
// GetAccount optimistically pre-fetches an account, dropping the returned value
110+
// and logging errors. See [subfetcherPool.execute] re worker pools.
111+
func (p *subfetcherPool) GetAccount(addr common.Address) {
112+
p.execute(func(t Trie) {
113+
if _, err := t.GetAccount(addr); err != nil {
114+
log.Error("account prefetching failed", "address", addr, "err", err)
115+
}
116+
})
117+
}
118+
119+
// GetStorage is the storage equivalent of [subfetcherPool.GetAccount].
120+
func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) {
121+
p.execute(func(t Trie) {
122+
if _, err := t.GetStorage(addr, key); err != nil {
123+
log.Error("storage prefetching failed", "address", addr, "key", key, "err", err)
124+
}
125+
})
126+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2024 the libevm authors.
2+
//
3+
// The libevm additions to go-ethereum are free software: you can redistribute
4+
// them and/or modify them under the terms of the GNU Lesser General Public License
5+
// as published by the Free Software Foundation, either version 3 of the License,
6+
// or (at your option) any later version.
7+
//
8+
// The libevm additions are distributed in the hope that they will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
11+
// General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU Lesser General Public License
14+
// along with the go-ethereum library. If not, see
15+
// <http://www.gnu.org/licenses/>.
16+
17+
package state
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
"github.com/stretchr/testify/assert"
24+
25+
"github.com/ava-labs/libevm/common"
26+
)
27+
28+
type synchronisingWorkerPool struct {
29+
t *testing.T
30+
executed, unblock chan struct{}
31+
done bool
32+
preconditionsToStopPrefetcher int
33+
}
34+
35+
var _ WorkerPool = (*synchronisingWorkerPool)(nil)
36+
37+
func (p *synchronisingWorkerPool) Execute(fn func()) {
38+
fn()
39+
select {
40+
case <-p.executed:
41+
default:
42+
close(p.executed)
43+
}
44+
45+
<-p.unblock
46+
assert.False(p.t, p.done, "Done() called before Execute() returns")
47+
p.preconditionsToStopPrefetcher++
48+
}
49+
50+
func (p *synchronisingWorkerPool) Done() {
51+
p.done = true
52+
p.preconditionsToStopPrefetcher++
53+
}
54+
55+
func TestStopPrefetcherWaitsOnWorkers(t *testing.T) {
56+
pool := &synchronisingWorkerPool{
57+
t: t,
58+
executed: make(chan struct{}),
59+
unblock: make(chan struct{}),
60+
}
61+
opt := WithWorkerPools(func() WorkerPool { return pool })
62+
63+
db := filledStateDB()
64+
db.prefetcher = newTriePrefetcher(db.db, db.originalRoot, "", opt)
65+
db.prefetcher.prefetch(common.Hash{}, common.Hash{}, common.Address{}, [][]byte{{}})
66+
67+
go func() {
68+
<-pool.executed
69+
// Sleep otherwise there is a small chance that we close pool.unblock
70+
// between db.StopPrefetcher() returning and the assertion.
71+
time.Sleep(time.Second)
72+
close(pool.unblock)
73+
}()
74+
75+
<-pool.executed
76+
db.StopPrefetcher()
77+
// If this errors then either Execute() hadn't returned or Done() wasn't
78+
// called.
79+
assert.Equalf(t, 2, pool.preconditionsToStopPrefetcher, "%T.StopPrefetcher() returned early", db)
80+
}

0 commit comments

Comments
 (0)