diff --git a/core/vm/evm.go b/core/vm/evm.go index b9fd682b9a7..b9eca8cfb7f 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -187,7 +187,7 @@ func (evm *EVM) Interpreter() *EVMInterpreter { // parameters. It also handles any necessary value transfer required and takes // the necessary steps to create accounts and reverses the state in case of an // execution error or failed value transfer. -func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas uint64, value *uint256.Int) (ret []byte, leftOverGas uint64, err error) { +func (evm *EVM) call(caller ContractRef, addr common.Address, input []byte, gas uint64, value *uint256.Int) (ret []byte, leftOverGas uint64, err error) { // Fail if we're trying to execute above the call depth limit if evm.depth > int(params.CallCreateDepth) { return nil, gas, ErrDepth @@ -433,8 +433,8 @@ func (c *codeAndHash) Hash() common.Hash { return c.hash } -// create creates a new contract using code as deployment code. -func (evm *EVM) create(caller ContractRef, codeAndHash *codeAndHash, gas uint64, value *uint256.Int, address common.Address, typ OpCode) ([]byte, common.Address, uint64, error) { +// createCommon creates a new contract using code as deployment code. +func (evm *EVM) createCommon(caller ContractRef, codeAndHash *codeAndHash, gas uint64, value *uint256.Int, address common.Address, typ OpCode) ([]byte, common.Address, uint64, error) { // Depth check execution. Fail if we're trying to execute above the // limit. 
if evm.depth > int(params.CallCreateDepth) { diff --git a/core/vm/evm.libevm.go b/core/vm/evm.libevm.go index c2c807c1378..0e902bec427 100644 --- a/core/vm/evm.libevm.go +++ b/core/vm/evm.libevm.go @@ -17,6 +17,8 @@ package vm import ( + "github.com/holiman/uint256" + "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/libevm" "github.com/ava-labs/libevm/log" @@ -52,6 +54,42 @@ func (evm *EVM) canCreateContract(caller ContractRef, contractToCreate common.Ad return gas, err } +// Call executes the contract associated with the addr with the given input as +// parameters. It also handles any necessary value transfer required and takes +// the necessary steps to create accounts and reverses the state in case of an +// execution error or failed value transfer. +func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas uint64, value *uint256.Int) (ret []byte, leftOverGas uint64, err error) { + gas, err = evm.spendPreprocessingGas(gas) + if err != nil { + return nil, gas, err + } + return evm.call(caller, addr, input, gas, value) +} + +// create wraps the original geth method of the same name, now named +// [EVM.createCommon], first spending preprocessing gas. 
+func (evm *EVM) create(caller ContractRef, codeAndHash *codeAndHash, gas uint64, value *uint256.Int, address common.Address, typ OpCode) ([]byte, common.Address, uint64, error) { + gas, err := evm.spendPreprocessingGas(gas) + if err != nil { + return nil, common.Address{}, gas, err + } + return evm.createCommon(caller, codeAndHash, gas, value, address, typ) +} + +func (evm *EVM) spendPreprocessingGas(gas uint64) (uint64, error) { + if evm.depth > 0 || !libevmHooks.Registered() { + return gas, nil + } + c, err := libevmHooks.Get().PreprocessingGasCharge(evm.StateDB.TxHash()) + if err != nil { + return gas, err + } + if c > gas { + return 0, ErrOutOfGas + } + return gas - c, nil +} + // InvalidateExecution sets the error that will be returned by // [EVM.ExecutionInvalidated] for the length of the current transaction; i.e. // until [EVM.Reset] is called. This is honoured by state-transition logic to diff --git a/core/vm/evm.libevm_test.go b/core/vm/evm.libevm_test.go index c2e6708f725..910b0e63e2b 100644 --- a/core/vm/evm.libevm_test.go +++ b/core/vm/evm.libevm_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/libevm" "github.com/ava-labs/libevm/params" ) @@ -47,6 +48,10 @@ func (o *evmArgOverrider) OverrideEVMResetArgs(r params.Rules, _ *EVMResetArgs) } } +func (o *evmArgOverrider) PreprocessingGasCharge(common.Hash) (uint64, error) { + return 0, nil +} + func (o *evmArgOverrider) register(t *testing.T) { t.Helper() TestOnlyClearRegisteredHooks() diff --git a/core/vm/hooks.libevm.go b/core/vm/hooks.libevm.go index 9c4feb1339b..c6d047cfd2c 100644 --- a/core/vm/hooks.libevm.go +++ b/core/vm/hooks.libevm.go @@ -17,6 +17,7 @@ package vm import ( + "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/libevm" "github.com/ava-labs/libevm/libevm/register" "github.com/ava-labs/libevm/params" @@ -57,6 +58,14 @@ var libevmHooks 
register.AtMostOnce[Hooks] type Hooks interface { OverrideNewEVMArgs(*NewEVMArgs) *NewEVMArgs OverrideEVMResetArgs(params.Rules, *EVMResetArgs) *EVMResetArgs + Preprocessor +} + +// A Preprocessor performs computation on a transaction before the +// [EVMInterpreter] is invoked and reports its gas charge for spending at the +// beginning of [EVM.Call] or [EVM.Create]. +type Preprocessor interface { + PreprocessingGasCharge(tx common.Hash) (uint64, error) } // NewEVMArgs are the arguments received by [NewEVM], available for override @@ -97,3 +106,19 @@ func (evm *EVM) overrideEVMResetArgs(txCtx TxContext, statedb StateDB) (TxContex args := libevmHooks.Get().OverrideEVMResetArgs(evm.chainRules, &EVMResetArgs{txCtx, statedb}) return args.TxContext, args.StateDB } + +// NOOPHooks implements [Hooks] such that every method is a noop. +type NOOPHooks struct{} + +var _ Hooks = NOOPHooks{} + +// OverrideNewEVMArgs returns the args unchanged. +func (NOOPHooks) OverrideNewEVMArgs(a *NewEVMArgs) *NewEVMArgs { return a } + +// OverrideEVMResetArgs returns the args unchanged. +func (NOOPHooks) OverrideEVMResetArgs(_ params.Rules, a *EVMResetArgs) *EVMResetArgs { + return a +} + +// PreprocessingGasCharge returns (0, nil). +func (NOOPHooks) PreprocessingGasCharge(common.Hash) (uint64, error) { return 0, nil } diff --git a/core/vm/interface.go b/core/vm/interface.go index 4a9e15a6d3c..25ef393e863 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -82,6 +82,8 @@ type StateDB interface { AddLog(*types.Log) AddPreimage(common.Hash, []byte) + + StateDBRemainder } // CallContext provides a basic interface for the EVM calling conventions. The EVM diff --git a/core/vm/interface.libevm.go b/core/vm/interface.libevm.go new file mode 100644 index 00000000000..ee999fcc8c5 --- /dev/null +++ b/core/vm/interface.libevm.go @@ -0,0 +1,27 @@ +// Copyright 2025 the libevm authors. 
+// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// <http://www.gnu.org/licenses/>. + +package vm + +import "github.com/ava-labs/libevm/common" + +// StateDBRemainder defines methods not included in the geth definition of +// [StateDB] but present on the concrete type and exposed for libevm +// functionality. +type StateDBRemainder interface { + TxHash() common.Hash + TxIndex() int +} diff --git a/core/vm/preprocess.libevm_test.go b/core/vm/preprocess.libevm_test.go new file mode 100644 index 00000000000..509682668c0 --- /dev/null +++ b/core/vm/preprocess.libevm_test.go @@ -0,0 +1,193 @@ +// Copyright 2025 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// <http://www.gnu.org/licenses/>. 
+ +package vm_test + +import ( + "errors" + "fmt" + "math" + "math/big" + "testing" + + "github.com/holiman/uint256" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/core/vm" + "github.com/ava-labs/libevm/crypto" + "github.com/ava-labs/libevm/libevm/ethtest" + "github.com/ava-labs/libevm/params" +) + +type preprocessingCharger struct { + vm.NOOPHooks + charge map[common.Hash]uint64 +} + +var errUnknownTx = errors.New("unknown tx") + +func (p preprocessingCharger) PreprocessingGasCharge(tx common.Hash) (uint64, error) { + c, ok := p.charge[tx] + if !ok { + return 0, fmt.Errorf("%w: %v", errUnknownTx, tx) + } + return c, nil +} + +func TestChargePreprocessingGas(t *testing.T) { + tests := []struct { + name string + to *common.Address + charge uint64 + skipChargeRegistration bool + txGas uint64 + wantVMErr error + wantGasUsed uint64 + }{ + { + name: "standard create", + to: nil, + txGas: params.TxGas + params.CreateGas, + wantGasUsed: params.TxGas + params.CreateGas, + }, + { + name: "create with extra charge", + to: nil, + charge: 1234, + txGas: params.TxGas + params.CreateGas + 2000, + wantGasUsed: params.TxGas + params.CreateGas + 1234, + }, + { + name: "standard call", + to: &common.Address{}, + txGas: params.TxGas, + wantGasUsed: params.TxGas, + }, + { + name: "out of gas", + to: &common.Address{}, + charge: 1000, + txGas: params.TxGas + 999, + wantGasUsed: params.TxGas + 999, + wantVMErr: vm.ErrOutOfGas, + }, + { + name: "call with extra charge", + to: &common.Address{}, + charge: 13579, + txGas: params.TxGas + 20000, + wantGasUsed: params.TxGas + 13579, + }, + { + name: "error propagation", + to: &common.Address{}, + skipChargeRegistration: true, + txGas: params.TxGas, + wantGasUsed: params.TxGas, + wantVMErr: errUnknownTx, + }, + } + + config := params.AllDevChainProtocolChanges + key, 
err := crypto.GenerateKey() + require.NoError(t, err, "crypto.GenerateKey()") + eoa := crypto.PubkeyToAddress(key.PublicKey) + + header := &types.Header{ + Number: big.NewInt(0), + Difficulty: big.NewInt(0), + BaseFee: big.NewInt(0), + } + signer := types.MakeSigner(config, header.Number, header.Time) + + var txs types.Transactions + charge := make(map[common.Hash]uint64) + for i, tt := range tests { + tx := types.MustSignNewTx(key, signer, &types.LegacyTx{ + // Although nonces aren't strictly necessary, they guarantee a + // different tx hash for each one. + Nonce: uint64(i), + To: tt.to, + GasPrice: big.NewInt(1), + Gas: tt.txGas, + }) + txs = append(txs, tx) + if !tt.skipChargeRegistration { + charge[tx.Hash()] = tt.charge + } + } + + vm.RegisterHooks(&preprocessingCharger{ + charge: charge, + }) + t.Cleanup(vm.TestOnlyClearRegisteredHooks) + + for i, tt := range tests { + tx := txs[i] + + t.Run(tt.name, func(t *testing.T) { + t.Logf("Extra gas charge: %d", tt.charge) + + t.Run("ApplyTransaction", func(t *testing.T) { + _, _, sdb := ethtest.NewEmptyStateDB(t) + sdb.SetTxContext(tx.Hash(), i) + sdb.SetBalance(eoa, new(uint256.Int).SetAllOne()) + sdb.SetNonce(eoa, tx.Nonce()) + + var gotGasUsed uint64 + gp := core.GasPool(math.MaxUint64) + + receipt, err := core.ApplyTransaction( + config, ethtest.DummyChainContext(), &common.Address{}, + &gp, sdb, header, tx, &gotGasUsed, vm.Config{}, + ) + require.NoError(t, err, "core.ApplyTransaction(...)") + + wantStatus := types.ReceiptStatusSuccessful + if tt.wantVMErr != nil { + wantStatus = types.ReceiptStatusFailed + } + assert.Equalf(t, wantStatus, receipt.Status, "%T.Status", receipt) + + if got, want := gotGasUsed, tt.wantGasUsed; got != want { + t.Errorf("core.ApplyTransaction(..., &gotGasUsed, ...) got %d; want %d", got, want) + } + if got, want := receipt.GasUsed, tt.wantGasUsed; got != want { + t.Errorf("core.ApplyTransaction(...) 
-> %T.GasUsed = %d; want %d", receipt, got, want) + } + }) + + t.Run("VM_error", func(t *testing.T) { + sdb, evm := ethtest.NewZeroEVM(t, ethtest.WithChainConfig(config)) + sdb.SetTxContext(tx.Hash(), i) + sdb.SetBalance(eoa, new(uint256.Int).SetAllOne()) + sdb.SetNonce(eoa, tx.Nonce()) + + msg, err := core.TransactionToMessage(tx, signer, header.BaseFee) + require.NoError(t, err, "core.TransactionToMessage(...)") + + gp := core.GasPool(math.MaxUint64) + got, err := core.ApplyMessage(evm, msg, &gp) + require.NoError(t, err, "core.ApplyMessage(...)") + require.ErrorIsf(t, got.Err, tt.wantVMErr, "%T.Err", got) + }) + }) + } +} diff --git a/go.mod b/go.mod index f966a6849fd..0ae1145599b 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,7 @@ require ( github.com/tyler-smith/go-bip39 v1.1.0 github.com/urfave/cli/v2 v2.25.7 go.uber.org/automaxprocs v1.5.2 + go.uber.org/goleak v1.3.0 golang.org/x/crypto v0.43.0 golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa golang.org/x/mod v0.29.0 diff --git a/go.sum b/go.sum index 53786439a48..3f876903d96 100644 --- a/go.sum +++ b/go.sum @@ -622,6 +622,8 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME= go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/libevm/ethtest/dummy.go b/libevm/ethtest/dummy.go new file mode 
100644 index 00000000000..e800a513d27 --- /dev/null +++ b/libevm/ethtest/dummy.go @@ -0,0 +1,44 @@ +// Copyright 2025 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// <http://www.gnu.org/licenses/>. + +package ethtest + +import ( + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/consensus" + "github.com/ava-labs/libevm/core" + "github.com/ava-labs/libevm/core/types" +) + +// DummyChainContext returns a dummy that returns [DummyEngine] when its +// Engine() method is called, and panics when its GetHeader() method is called. +func DummyChainContext() core.ChainContext { + return chainContext{} +} + +// DummyEngine returns a dummy that panics when its Author() method is called. 
+func DummyEngine() consensus.Engine { + return engine{} +} + +type ( + chainContext struct{} + engine struct{ consensus.Engine } +) + +func (chainContext) Engine() consensus.Engine { return engine{} } +func (chainContext) GetHeader(common.Hash, uint64) *types.Header { panic("unimplemented") } +func (engine) Author(h *types.Header) (common.Address, error) { panic("unimplemented") } diff --git a/libevm/ethtest/evm.go b/libevm/ethtest/evm.go index 4e16c4e90bb..7a7b463295e 100644 --- a/libevm/ethtest/evm.go +++ b/libevm/ethtest/evm.go @@ -23,14 +23,28 @@ import ( "github.com/stretchr/testify/require" - "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/core" "github.com/ava-labs/libevm/core/rawdb" "github.com/ava-labs/libevm/core/state" + "github.com/ava-labs/libevm/core/types" "github.com/ava-labs/libevm/core/vm" + "github.com/ava-labs/libevm/ethdb" "github.com/ava-labs/libevm/params" ) +// NewEmptyStateDB returns a fresh database from [rawdb.NewMemoryDatabase], a +// [state.Database] wrapping it, and a [state.StateDB] wrapping that, opened to +// [types.EmptyRootHash]. 
+func NewEmptyStateDB(tb testing.TB) (ethdb.Database, state.Database, *state.StateDB) { + tb.Helper() + + db := rawdb.NewMemoryDatabase() + cache := state.NewDatabase(db) + sdb, err := state.New(types.EmptyRootHash, cache, nil) + require.NoError(tb, err, "state.New()") + return db, cache, sdb +} + // NewZeroEVM returns a new EVM backed by a [rawdb.NewMemoryDatabase]; all other // arguments to [vm.NewEVM] are the zero values of their respective types, // except for the use of [core.CanTransfer] and [core.Transfer] instead of nil @@ -38,8 +52,7 @@ import ( func NewZeroEVM(tb testing.TB, opts ...EVMOption) (*state.StateDB, *vm.EVM) { tb.Helper() - sdb, err := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - require.NoError(tb, err, "state.New()") + _, _, sdb := NewEmptyStateDB(tb) args := &evmConstructorArgs{ vm.BlockContext{ diff --git a/libevm/libevm.go b/libevm/libevm.go index a429d604879..4f9e3a363b7 100644 --- a/libevm/libevm.go +++ b/libevm/libevm.go @@ -56,6 +56,9 @@ type StateReader interface { AddressInAccessList(addr common.Address) bool SlotInAccessList(addr common.Address, slot common.Hash) (addressOk bool, slotOk bool) + + TxHash() common.Hash + TxIndex() int } // AddressContext carries addresses available to contexts such as calls and diff --git a/libevm/precompiles/parallel/parallel.go b/libevm/precompiles/parallel/parallel.go new file mode 100644 index 00000000000..007093146b1 --- /dev/null +++ b/libevm/precompiles/parallel/parallel.go @@ -0,0 +1,375 @@ +// Copyright 2025 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. 
+// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// <http://www.gnu.org/licenses/>. + +// Package parallel provides functionality for precompiled contracts that can +// pre-process their results in an embarrassingly parallel fashion. +package parallel + +import ( + "errors" + "fmt" + "iter" + "sync" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core" + "github.com/ava-labs/libevm/core/state" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/core/vm" + "github.com/ava-labs/libevm/libevm" + "github.com/ava-labs/libevm/libevm/stateconf" + "github.com/ava-labs/libevm/params" +) + +// A Handler is responsible for processing [types.Transactions] in an +// embarrassingly parallel fashion. It is the responsibility of the Handler to +// determine whether this is possible, typically only so if one of the following +// is true with respect to a precompile associated with the Handler: +// +// 1. The destination address is that of the precompile; or +// +// 2. At least one [types.AccessTuple] references the precompile's address. +// +// Scenario (2) allows precompile access to be determined through inspection of +// the [types.Transaction] alone, without the need for execution. +// +// All [libevm.StateReader] instances are opened to the state at the beginning +// of the block. The [StateDB] is the same one used to execute the block, +// before being committed, and MAY be written to. 
+type Handler[Data, Result, Aggregated any] interface { + BeforeBlock(libevm.StateReader, *types.Block) + Gas(*types.Transaction) (gas uint64, process bool) + Prefetch(sdb libevm.StateReader, index int, tx *types.Transaction) Data + Process(sdb libevm.StateReader, index int, tx *types.Transaction, data Data) Result + PostProcess(iter.Seq2[int, Result]) Aggregated + AfterBlock(StateDB, Aggregated, *types.Block, types.Receipts) +} + +// StateDB is the subset of [state.StateDB] methods that MAY be called by +// [Handler.AfterBlock]. +type StateDB interface { + libevm.StateReader + SetState(_ common.Address, key, val common.Hash, _ ...stateconf.StateDBStateOption) +} + +// A Processor orchestrates dispatch and collection of results from a [Handler]. +type Processor[D, R, A any] struct { + handler Handler[D, R, A] + workers sync.WaitGroup + + stateShare stateDBSharer + txGas map[common.Hash]uint64 + + prefetch, process chan *job + data [](chan D) + results [](chan result[R]) + aggregated chan A +} + +type job struct { + index int + tx *types.Transaction +} + +type result[T any] struct { + tx common.Hash + val *T +} + +// New constructs a new [Processor] with the specified number of concurrent +// workers. [Processor.Close] must be called after the final call to +// [Processor.FinishBlock] to avoid leaking goroutines. 
+func New[D, R, A any](h Handler[D, R, A], prefetchers, processors int) *Processor[D, R, A] { + prefetchers = max(prefetchers, 1) + processors = max(processors, 1) + workers := prefetchers + processors + + p := &Processor[D, R, A]{ + handler: h, + stateShare: stateDBSharer{ + workers: workers, + nextAvailable: make(chan struct{}), + }, + txGas: make(map[common.Hash]uint64), + prefetch: make(chan *job), + process: make(chan *job), + aggregated: make(chan A), + } + + p.workers.Add(workers) // for shutdown via [Processor.Close] + p.stateShare.wg.Add(workers) // for readiness of [Processor.worker] loops + for range prefetchers { + go p.worker(p.prefetch, nil) + } + for range processors { + go p.worker(nil, p.process) + } + p.stateShare.wg.Wait() + + return p +} + +// A stateDBSharer allows concurrent workers to make copies of a primary +// database. When the `nextAvailable` channel is closed, all workers call +// [state.StateDB.Copy] then signal completion on the [sync.WaitGroup]. The +// channel is replaced for each round of distribution. +type stateDBSharer struct { + nextAvailable chan struct{} + primary *state.StateDB + mu sync.Mutex + workers int + wg sync.WaitGroup +} + +func (s *stateDBSharer) distribute(sdb *state.StateDB) { + s.primary = sdb // no need to Copy() as each worker does it + + ch := s.nextAvailable // already copied by [Processor.worker], which is waiting for it to close + s.nextAvailable = make(chan struct{}) // will be copied, ready for the next distribution + + s.wg.Add(s.workers) + close(ch) + s.wg.Wait() +} + +func (p *Processor[D, R, A]) worker(prefetch, process chan *job) { + defer p.workers.Done() + + var sdb *state.StateDB + share := &p.stateShare + stateAvailable := share.nextAvailable + // Without this signal of readiness, a premature call to + // [Processor.StartBlock] could replace `share.nextAvailable` before we've + // copied it. 
+ share.wg.Done() + + for { + select { + case <-stateAvailable: // guaranteed at the beginning of each block + share.mu.Lock() + sdb = share.primary.Copy() + share.mu.Unlock() + + stateAvailable = share.nextAvailable + share.wg.Done() + + case job, ok := <-prefetch: + if !ok { + return + } + p.data[job.index] <- p.handler.Prefetch(sdb, job.index, job.tx) + + case job, ok := <-process: + if !ok { + return + } + + r := p.handler.Process(sdb, job.index, job.tx, <-p.data[job.index]) + p.results[job.index] <- result[R]{ + tx: job.tx.Hash(), + val: &r, + } + } + } +} + +// Close shuts down the [Processor], after which it can no longer be used. +func (p *Processor[D, R, A]) Close() { + close(p.prefetch) + close(p.process) + p.workers.Wait() +} + +// StartBlock dispatches transactions to the [Handler] and returns immediately. +// It MUST be paired with a call to [Processor.FinishBlock], without overlap of +// blocks. +func (p *Processor[D, R, A]) StartBlock(sdb *state.StateDB, rules params.Rules, b *types.Block) error { + // The distribution mechanism copies the StateDB so we don't need to do it + // here, but the [Handler] is called directly so we do copy. + p.stateShare.distribute(sdb) + p.handler.BeforeBlock( + sdb.Copy(), + types.NewBlockWithHeader( + b.Header(), + ).WithBody( + *b.Body(), + ), + ) + + txs := b.Transactions() + jobs := make([]*job, 0, len(txs)) + + // We can reuse the channels already in the data and results slices because + // they're emptied by [Processor.FinishBlock]. 
+ for i, n := len(p.results), len(txs); i < n; i++ { + p.data = append(p.data, make(chan D, 1)) + p.results = append(p.results, make(chan result[R], 1)) + } + + for i, tx := range txs { + switch do, err := p.shouldProcess(tx, rules); { + case err != nil: + return err + + case do: + jobs = append(jobs, &job{ + index: i, + tx: tx, + }) + + default: + p.results[i] <- result[R]{ + tx: tx.Hash(), + val: nil, + } + } + } + + go func() { + for _, j := range jobs { + p.prefetch <- j + } + }() + go func() { + for _, j := range jobs { + p.process <- j + } + }() + go func() { + n := len(b.Transactions()) + p.aggregated <- p.handler.PostProcess(p.resultIter(n)) + }() + return nil +} + +func (p *Processor[D, R, A]) resultIter(n int) iter.Seq2[int, R] { + return func(yield func(int, R) bool) { + for i := range n { + r, ok := p.Result(i) + if !ok { + continue + } + if !yield(i, r) { + return + } + } + } +} + +// FinishBlock returns the [Processor] to a state ready for the next block. A +// return from FinishBlock guarantees that all dispatched work from the +// respective call to [Processor.StartBlock] has been completed. +func (p *Processor[D, R, A]) FinishBlock(sdb vm.StateDB, b *types.Block, rs types.Receipts) { + p.handler.AfterBlock(sdb, <-p.aggregated, b, rs) + + for i := range len(b.Transactions()) { + // Every result channel is guaranteed to have some value in its buffer + // because [Processor.StartBlock] either sends a nil *R or it + // dispatches a job, which will send a non-nil *R. + tx := (<-p.results[i]).tx + delete(p.txGas, tx) + } +} + +// Result blocks until the i'th transaction passed to [Processor.StartBlock] has +// had its result processed, and then returns the value returned by the +// [Handler]. The returned boolean will be false if no processing occurred, +// either because the [Handler] indicated as such or because the transaction +// supplied insufficient gas. +// +// Multiple calls to Result with the same argument are allowed. 
Callers MUST NOT +// charge the gas price for preprocessing as this is handled by +// [Processor.PreprocessingGasCharge] if registered as a [vm.Preprocessor]. +// +// The same value will be returned by each call with the same argument, such +// that if R is a pointer then modifications will persist between calls. The +// caller does NOT have mutually exclusive access to R, which MUST carry a mutex +// if thread safety is required. +func (p *Processor[D, R, A]) Result(i int) (R, bool) { + ch := p.results[i] + r := <-ch + defer func() { + ch <- r + }() + + if r.val == nil { + // TODO(arr4n) if we're here then the implementoor might have a bug in + // their [Handler], so logging a warning is probably a good idea. + var zero R + return zero, false + } + return *r.val, true +} + +func (p *Processor[R, D, S]) shouldProcess(tx *types.Transaction, rules params.Rules) (process bool, retErr error) { + // An explicit 0 is necessary to avoid [Processor.PreprocessingGasCharge] + // returning [ErrTxUnknown]. + p.txGas[tx.Hash()] = 0 + + cost, ok := p.handler.Gas(tx) + if !ok { + return false, nil + } + defer func() { + if process && retErr == nil { + p.txGas[tx.Hash()] = cost + } + }() + + spent, err := txIntrinsicGas(tx, &rules) + if err != nil { + return false, fmt.Errorf("calculating intrinsic gas of %v: %v", tx.Hash(), err) + } + if spent > tx.Gas() { + // If this happens then consensus has a bug because the tx shouldn't + // have been included. We include the check, however, for completeness. 
+ return false, core.ErrIntrinsicGas + } + return tx.Gas()-spent >= cost, nil +} + +func txIntrinsicGas(tx *types.Transaction, rules *params.Rules) (uint64, error) { + return intrinsicGas(tx.Data(), tx.AccessList(), tx.To(), rules) +} + +func intrinsicGas(data []byte, access types.AccessList, txTo *common.Address, rules *params.Rules) (uint64, error) { + create := txTo == nil + return core.IntrinsicGas( + data, + access, + create, + rules.IsHomestead, + rules.IsIstanbul, // EIP-2028 + rules.IsShanghai, // EIP-3860 + ) +} + +// ErrTxUnknown is returned by [Processor.PreprocessingGasCharge] if it is +// called with a transaction hash that wasn't in the last block passed to +// [Processor.StartBlock]. +var ErrTxUnknown = errors.New("transaction unknown by parallel preprocessor") + +// PreprocessingGasCharge implements the [vm.Preprocessor] interface and MUST be +// registered via [vm.RegisterHooks] to ensure proper gas accounting. +func (p *Processor[R, D, S]) PreprocessingGasCharge(tx common.Hash) (uint64, error) { + g, ok := p.txGas[tx] + if !ok { + return 0, fmt.Errorf("%w: %v", ErrTxUnknown, tx) + } + return g, nil +} + +var _ vm.Preprocessor = (*Processor[any, any, any])(nil) diff --git a/libevm/precompiles/parallel/parallel_test.go b/libevm/precompiles/parallel/parallel_test.go new file mode 100644 index 00000000000..d949cbd68f8 --- /dev/null +++ b/libevm/precompiles/parallel/parallel_test.go @@ -0,0 +1,395 @@ +// Copyright 2025 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// <http://www.gnu.org/licenses/>. + +package parallel + +import ( + "encoding/binary" + "iter" + "maps" + "math" + "math/big" + "math/rand/v2" + "slices" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/core/vm" + "github.com/ava-labs/libevm/crypto" + "github.com/ava-labs/libevm/libevm" + "github.com/ava-labs/libevm/libevm/ethtest" + "github.com/ava-labs/libevm/libevm/hookstest" + "github.com/ava-labs/libevm/params" + "github.com/ava-labs/libevm/trie" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, goleak.IgnoreCurrent()) +} + +type recorder struct { + gas uint64 + addr common.Address + blockKey, prefetchKey, processKey common.Hash + + gotHeaderExtra []byte + gotBlockVal common.Hash + gotReceipts types.Receipts + gotPerTx map[int]recorded +} + +func (r *recorder) BeforeBlock(sdb libevm.StateReader, b *types.Block) { + r.gotHeaderExtra = slices.Clone(b.Header().Extra) + r.gotBlockVal = sdb.GetState(r.addr, r.blockKey) +} + +func (r *recorder) Gas(tx *types.Transaction) (uint64, bool) { + if to := tx.To(); to != nil && *to == r.addr { + return r.gas, true + } + return 0, false +} + +func (r *recorder) Prefetch(sdb libevm.StateReader, i int, tx *types.Transaction) common.Hash { + return sdb.GetState(r.addr, r.prefetchKey) +} + +type recorded struct { + HeaderExtra, TxData []byte + Block, Prefetch, Process common.Hash +} + +func (r *recorder) Process(sdb libevm.StateReader, i int, tx *types.Transaction, prefetched common.Hash) recorded { + return recorded{ + HeaderExtra: slices.Clone(r.gotHeaderExtra), + TxData: 
slices.Clone(tx.Data()), + Block: r.gotBlockVal, + Prefetch: prefetched, + Process: sdb.GetState(r.addr, r.processKey), + } +} + +func (r *recorded) asLog() *types.Log { + return &types.Log{ + Topics: []common.Hash{ + r.Block, r.Prefetch, r.Process, + }, + Data: slices.Concat(r.HeaderExtra, []byte("|"), r.TxData), + } +} + +func (r *recorder) PostProcess(results iter.Seq2[int, recorded]) map[int]recorded { + return maps.Collect(results) +} + +func (r *recorder) AfterBlock(_ StateDB, perTx map[int]recorded, _ *types.Block, rs types.Receipts) { + r.gotReceipts = slices.Clone(rs) + r.gotPerTx = perTx +} + +func asHash(s string) (h common.Hash) { + copy(h[:], []byte(s)) + return +} + +func TestProcessor(t *testing.T) { + handler := &recorder{ + addr: common.Address{'c', 'o', 'n', 'c', 'a', 't'}, + gas: 1e6, + blockKey: asHash("block"), + prefetchKey: asHash("prefetch"), + processKey: asHash("process"), + } + p := New(handler, 8, 8) + t.Cleanup(p.Close) + + type blockParams struct { + numTxs int + sendToAddrEvery, sufficientGasEvery int + } + + // Each set of params is effectively a test case, but they are all run on + // the same [Processor]. 
+ tests := []blockParams{ + { + numTxs: 0, + }, + { + numTxs: 500, + sendToAddrEvery: 7, + sufficientGasEvery: 5, + }, + { + numTxs: 1_000, + sendToAddrEvery: 7, + sufficientGasEvery: 5, + }, + { + numTxs: 1_000, + sendToAddrEvery: 11, + sufficientGasEvery: 3, + }, + { + numTxs: 100, + sendToAddrEvery: 1, + sufficientGasEvery: 1, + }, + { + numTxs: 0, + }, + } + + rng := rand.New(rand.NewPCG(0, 0)) //nolint:gosec // Reproducibility is useful for testing + for range 100 { + tests = append(tests, blockParams{ + numTxs: rng.IntN(1000), + sendToAddrEvery: 1 + rng.IntN(30), + sufficientGasEvery: 1 + rng.IntN(30), + }) + } + + _, _, sdb := ethtest.NewEmptyStateDB(t) + h := handler + blockVal := asHash("block_val") + sdb.SetState(h.addr, h.blockKey, blockVal) + prefetchVal := asHash("prefetch_val") + sdb.SetState(h.addr, h.prefetchKey, prefetchVal) + processVal := asHash("process_val") + sdb.SetState(h.addr, h.processKey, processVal) + + for _, tt := range tests { + t.Run("", func(t *testing.T) { + t.Logf("%+v", tt) + + var rules params.Rules + txs := make(types.Transactions, tt.numTxs) + wantProcessed := make([]bool, tt.numTxs) + for i := range len(txs) { + var ( + to common.Address + extraGas uint64 + ) + + wantProcessed[i] = true + if i%tt.sendToAddrEvery == 0 { + to = handler.addr + } else { + wantProcessed[i] = false + } + if i%tt.sufficientGasEvery == 0 { + extraGas = handler.gas + } else { + wantProcessed[i] = false + } + + data := binary.BigEndian.AppendUint64(nil, uint64(i)) + gas, err := intrinsicGas(data, types.AccessList{}, &handler.addr, &rules) + require.NoError(t, err, "core.IntrinsicGas(%#x, nil, false, ...)", data) + + txs[i] = types.NewTx(&types.LegacyTx{ + To: &to, + Data: data, + Gas: gas + extraGas, + }) + } + + extra := []byte("extra") + block := types.NewBlock(&types.Header{Extra: extra}, txs, nil, nil, trie.NewStackTrie(nil)) + require.NoError(t, p.StartBlock(sdb, rules, block), "StartBlock()") + + wantPerTx := make(map[int]recorded) + for i, tx := 
range txs { + wantOK := wantProcessed[i] + + var want recorded + if wantOK { + want = recorded{ + HeaderExtra: extra, + Block: blockVal, + Prefetch: prefetchVal, + Process: processVal, + TxData: tx.Data(), + } + wantPerTx[i] = want + } + + got, gotOK := p.Result(i) + if gotOK != wantOK { + t.Errorf("Result(%d) got ok %t; want %t", i, gotOK, wantOK) + continue + } + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Result(%d) diff (-want +got):\n%s", i, diff) + } + } + + p.FinishBlock(sdb, block, nil) + if diff := cmp.Diff(wantPerTx, h.gotPerTx); diff != "" { + t.Errorf("handler.PostProcess() argument diff (-want +got):\n%s", diff) + } + }) + + if t.Failed() { + break + } + } +} + +type vmHooks struct { + vm.Preprocessor // the [Processor] + vm.NOOPHooks +} + +func (h *vmHooks) PreprocessingGasCharge(tx common.Hash) (uint64, error) { + return h.Preprocessor.PreprocessingGasCharge(tx) +} + +func TestIntegration(t *testing.T) { + const handlerGas = 500 + handler := &recorder{ + addr: common.Address{'c', 'o', 'n', 'c', 'a', 't'}, + gas: handlerGas, + } + sut := New(handler, 8, 8) + t.Cleanup(sut.Close) + + vm.RegisterHooks(&vmHooks{Preprocessor: sut}) + t.Cleanup(vm.TestOnlyClearRegisteredHooks) + + stub := &hookstest.Stub{ + PrecompileOverrides: map[common.Address]libevm.PrecompiledContract{ + handler.addr: vm.NewStatefulPrecompile(func(env vm.PrecompileEnvironment, input []byte) (ret []byte, err error) { + sdb := env.StateDB() + txi, txh := sdb.TxIndex(), sdb.TxHash() + + // Precompiles MUST NOT charge gas for the preprocessing as it + // would then be double-counted. 
+ got, ok := sut.Result(txi) + if !ok { + t.Errorf("no result for tx[%d] %v", txi, txh) + } + sdb.AddLog(got.asLog()) + return nil, nil + }), + }, + } + stub.Register(t) + + key, err := crypto.GenerateKey() + require.NoErrorf(t, err, "crypto.GenerateKey()") + eoa := crypto.PubkeyToAddress(key.PublicKey) + + state, evm := ethtest.NewZeroEVM(t) + state.CreateAccount(eoa) + state.SetBalance(eoa, new(uint256.Int).SetAllOne()) + + var ( + txs types.Transactions + want types.Receipts + ) + ignore := cmp.Options{ + cmpopts.IgnoreFields( + types.Receipt{}, + "PostState", "CumulativeGasUsed", "BlockNumber", "BlockHash", "Bloom", + ), + cmpopts.IgnoreFields(types.Log{}, "BlockHash"), + } + + header := &types.Header{ + Number: big.NewInt(0), + BaseFee: big.NewInt(0), + } + config := evm.ChainConfig() + rules := config.Rules(header.Number, true, header.Time) + signer := types.MakeSigner(config, header.Number, header.Time) + + for i, addr := range []common.Address{ + {'o', 't', 'h', 'e', 'r'}, + handler.addr, + } { + ui := uint(i) + data := []byte("hello, world") + + gas, err := intrinsicGas(data, types.AccessList{}, &addr, &rules) + require.NoError(t, err, "core.IntrinsicGas(%#x, nil, false, ...)", data) + if addr == handler.addr { + gas += handlerGas + } + + tx := types.MustSignNewTx(key, signer, &types.LegacyTx{ + Nonce: uint64(ui), + To: &addr, + Data: data, + Gas: gas, + }) + txs = append(txs, tx) + + wantR := &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + TxHash: tx.Hash(), + GasUsed: gas, + TransactionIndex: ui, + } + if addr == handler.addr { + want := (&recorded{ + TxData: tx.Data(), + }).asLog() + + want.TxHash = tx.Hash() + want.TxIndex = ui + + wantR.Logs = []*types.Log{want} + } + want = append(want, wantR) + } + + block := types.NewBlock(header, txs, nil, nil, trie.NewStackTrie(nil)) + require.NoError(t, sut.StartBlock(state, rules, block), "StartBlock()") + + pool := core.GasPool(math.MaxUint64) + var receipts types.Receipts + for i, tx := range txs { 
+ state.SetTxContext(tx.Hash(), i) + + var usedGas uint64 + receipt, err := core.ApplyTransaction( + evm.ChainConfig(), + ethtest.DummyChainContext(), + &block.Header().Coinbase, + &pool, + state, + block.Header(), + tx, + &usedGas, + vm.Config{}, + ) + require.NoError(t, err, "ApplyTransaction([%d])", i) + receipts = append(receipts, receipt) + } + sut.FinishBlock(state, block, receipts) + + if diff := cmp.Diff(want, handler.gotReceipts, ignore); diff != "" { + t.Errorf("%T diff (-want +got):\n%s", receipts, diff) + } +}