@@ -25,8 +25,10 @@ import (
2525
2626 "github.com/ava-labs/libevm/common"
2727 "github.com/ava-labs/libevm/core"
28+ "github.com/ava-labs/libevm/core/state"
2829 "github.com/ava-labs/libevm/core/types"
2930 "github.com/ava-labs/libevm/core/vm"
31+ "github.com/ava-labs/libevm/libevm"
3032 "github.com/ava-labs/libevm/params"
3133)
3234
@@ -44,16 +46,17 @@ import (
4446type Handler [Result any ] interface {
4547 BeforeBlock (* types.Header )
4648 Gas (* types.Transaction ) (gas uint64 , process bool )
47- Process (index int , tx * types.Transaction ) Result
49+ Process (index int , tx * types.Transaction , sdb libevm. StateReader ) Result
4850}
4951
5052// A Processor orchestrates dispatch and collection of results from a [Handler].
5153type Processor [R any ] struct {
52- handler Handler [R ]
53- workers sync.WaitGroup
54- work chan * job
55- results [](chan result [R ])
56- txGas map [common.Hash ]uint64
54+ handler Handler [R ]
55+ workers sync.WaitGroup
56+ work chan * job
57+ results [](chan result [R ])
58+ txGas map [common.Hash ]uint64
59+ stateShare stateDBSharer
5760}
5861
5962type job struct {
@@ -66,36 +69,75 @@ type result[T any] struct {
6669 val * T
6770}
6871
72+ // A stateDBSharer allows concurrent workers to make copies of a primary
73+ // database. When the `nextAvailable` channel is closed, all workers call
74+ // [state.StateDB.Copy] then signal completion on the [sync.WaitGroup]. The
75+ // channel is replaced for each round of distribution.
76+ type stateDBSharer struct {
77+ nextAvailable chan struct {}
78+ primary * state.StateDB
79+ mu sync.Mutex
80+ workers int
81+ wg sync.WaitGroup
82+ }
83+
6984// New constructs a new [Processor] with the specified number of concurrent
7085// workers. [Processor.Close] must be called after the final call to
7186// [Processor.FinishBlock] to avoid leaking goroutines.
7287func New [R any ](h Handler [R ], workers int ) * Processor [R ] {
88+ workers = max (workers , 1 )
89+
7390 p := & Processor [R ]{
7491 handler : h ,
7592 work : make (chan * job ),
7693 txGas : make (map [common.Hash ]uint64 ),
94+ stateShare : stateDBSharer {
95+ workers : workers ,
96+ nextAvailable : make (chan struct {}),
97+ },
7798 }
7899
79- workers = max (workers , 1 )
80- p .workers . Add (workers )
100+ p . workers . Add (workers ) // for shutdown via [Processor.Close]
101+ p .stateShare . wg . Add (workers ) // for readiness of [Processor.worker] loops
81102 for range workers {
82103 go p .worker ()
83104 }
105+ p .stateShare .wg .Wait ()
106+
84107 return p
85108}
86109
87110func (p * Processor [R ]) worker () {
88111 defer p .workers .Done ()
112+
113+ var sdb * state.StateDB
114+ share := & p .stateShare
115+ stateAvailable := share .nextAvailable
116+ // Without this signal of readiness, a premature call to
117+ // [Processor.StartBlock] could replace `share.nextAvailable` before we've
118+ // copied it.
119+ share .wg .Done ()
120+
89121 for {
90- w , ok := <- p .work
91- if ! ok {
92- return
93- }
122+ select {
123+ case <- stateAvailable : // guaranteed at the beginning of each block
124+ share .mu .Lock ()
125+ sdb = share .primary .Copy ()
126+ share .mu .Unlock ()
94127
95- r := p .handler .Process (w .index , w .tx )
96- p .results [w .index ] <- result [R ]{
97- tx : w .tx .Hash (),
98- val : & r ,
128+ stateAvailable = share .nextAvailable
129+ share .wg .Done ()
130+
131+ case w , ok := <- p .work :
132+ if ! ok {
133+ return
134+ }
135+
136+ r := p .handler .Process (w .index , w .tx , sdb )
137+ p .results [w .index ] <- result [R ]{
138+ tx : w .tx .Hash (),
139+ val : & r ,
140+ }
99141 }
100142 }
101143}
@@ -109,7 +151,8 @@ func (p *Processor[R]) Close() {
109151// StartBlock dispatches transactions to the [Handler] and returns immediately.
110152// It MUST be paired with a call to [Processor.FinishBlock], without overlap of
111153// blocks.
112- func (p * Processor [R ]) StartBlock (b * types.Block , rules params.Rules ) error {
154+ func (p * Processor [R ]) StartBlock (b * types.Block , rules params.Rules , sdb * state.StateDB ) error {
155+ p .stateShare .distribute (sdb )
113156 p .handler .BeforeBlock (types .CopyHeader (b .Header ()))
114157 txs := b .Transactions ()
115158 jobs := make ([]* job , 0 , len (txs ))
@@ -149,6 +192,17 @@ func (p *Processor[R]) StartBlock(b *types.Block, rules params.Rules) error {
149192 return nil
150193}
151194
195+ func (s * stateDBSharer ) distribute (sdb * state.StateDB ) {
196+ s .primary = sdb // no need to Copy() as each worker does it
197+
198+ ch := s .nextAvailable
199+ s .nextAvailable = make (chan struct {}) // already copied by each worker
200+
201+ s .wg .Add (s .workers )
202+ close (ch )
203+ s .wg .Wait ()
204+ }
205+
152206// FinishBlock returns the [Processor] to a state ready for the next block. A
153207// return from FinishBlock guarantees that all dispatched work from the
154208// respective call to [Processor.StartBlock] has been completed.
0 commit comments