diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go
index 2cb50b91fe21..4a38d041ef65 100644
--- a/tests/reexecute/c/vm_reexecute_test.go
+++ b/tests/reexecute/c/vm_reexecute_test.go
@@ -4,15 +4,12 @@
 package vm

 import (
-	"context"
-	"encoding/binary"
 	"flag"
 	"fmt"
 	"maps"
 	"os"
 	"path/filepath"
 	"slices"
-	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -25,31 +22,12 @@ import (
 	"go.uber.org/zap"

 	"github.com/ava-labs/avalanchego/api/metrics"
-	"github.com/ava-labs/avalanchego/chains/atomic"
-	"github.com/ava-labs/avalanchego/database"
 	"github.com/ava-labs/avalanchego/database/leveldb"
-	"github.com/ava-labs/avalanchego/database/prefixdb"
 	"github.com/ava-labs/avalanchego/genesis"
 	"github.com/ava-labs/avalanchego/ids"
-	"github.com/ava-labs/avalanchego/snow"
-	"github.com/ava-labs/avalanchego/snow/engine/enginetest"
-	"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
-	"github.com/ava-labs/avalanchego/snow/validators/validatorstest"
 	"github.com/ava-labs/avalanchego/tests"
-	"github.com/ava-labs/avalanchego/tests/fixture/tmpnet"
-	"github.com/ava-labs/avalanchego/upgrade"
+	"github.com/ava-labs/avalanchego/tests/reexecute"
 	"github.com/ava-labs/avalanchego/utils/constants"
-	"github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner"
-	"github.com/ava-labs/avalanchego/utils/logging"
-	"github.com/ava-labs/avalanchego/utils/timer"
-	"github.com/ava-labs/avalanchego/utils/units"
-	"github.com/ava-labs/avalanchego/vms/platformvm/warp"
-)
-
-var (
-	mainnetXChainID    = ids.FromStringOrPanic("2oYMBNV4eNHyqk2fjjV5nVQLDbtmNJzq5s3qs3Lo6ftnC6FByM")
-	mainnetCChainID    = ids.FromStringOrPanic("2q9e4r6Mu3U68nU1fYjgbR6JvwrRx36CohpAX5UQxse55x1Q5")
-	mainnetAvaxAssetID = ids.FromStringOrPanic("FvwEAhmxKfeiG8SnEvq42hc6whRyY3EFYAvebMqDNDGCgxN5Z")
 )

 var (
@@ -187,18 +165,24 @@ func benchmarkReexecuteRange(
 	vmMultiGatherer := metrics.NewPrefixGatherer()
 	r.NoError(prefixGatherer.Register("avalanche_evm", vmMultiGatherer))

-	// consensusRegistry includes the chain="C" label and the prefix "avalanche_snowman".
-	// The consensus registry is passed to the executor to mimic a subset of consensus metrics.
-	consensusRegistry := prometheus.NewRegistry()
-	r.NoError(prefixGatherer.Register("avalanche_snowman", consensusRegistry))
-
 	log := tests.NewDefaultLogger("c-chain-reexecution")

 	if metricsServerEnabled {
-		serverAddr := startServer(b, log, prefixGatherer, metricsPort)
+		serverAddr := reexecute.StartServer(b, log, prefixGatherer, metricsPort)

 		if metricsCollectorEnabled {
-			startCollector(b, log, "c-chain-reexecution", labels, serverAddr)
+			// cChainDashboardPath is the Grafana dashboard path for the C-Chain
+			// metrics visualization.
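+			// StartCollector appends this path to tmpnet.DefaultBaseGrafanaURI
+			// when logging the Grafana link for the run.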
+			cChainDashboardPath := "d/Gl1I20mnk/c-chain"
+			reexecute.StartCollector(
+				b,
+				log,
+				"c-chain-reexecution",
+				labels,
+				serverAddr,
+				networkUUID,
+				cChainDashboardPath,
+			)
 		}
 	}
@@ -216,9 +200,6 @@ func benchmarkReexecuteRange(
 		zap.Int("chan-size", chanSize),
 	)

-	blockChan, err := createBlockChanFromLevelDB(b, blockDir, startBlock, endBlock, chanSize)
-	r.NoError(err)
-
 	dbLogger := tests.NewDefaultLogger("db")

 	db, err := leveldb.New(vmDBDir, nil, dbLogger, prometheus.NewRegistry())
@@ -228,12 +209,26 @@ func benchmarkReexecuteRange(
 		r.NoError(db.Close())
 	}()

-	vm, err := newMainnetCChainVM(
+	genesisConfig := genesis.GetConfig(constants.MainnetID)
+	vmParams := reexecute.VMParams{
+		GenesisBytes: []byte(genesisConfig.CChainGenesis),
+		ConfigBytes:  configBytes,
+		SubnetID:     constants.PrimaryNetworkID,
+		ChainID:      reexecute.MainnetCChainID,
+		ChainToSubnet: map[ids.ID]ids.ID{
+			reexecute.MainnetXChainID: constants.PrimaryNetworkID,
+			reexecute.MainnetCChainID: constants.PrimaryNetworkID,
+			ids.Empty:                 constants.PrimaryNetworkID,
+		},
+	}
+
+	vm, err := reexecute.NewMainnetVM(
 		ctx,
+		&factory.Factory{},
 		db,
 		chainDataDir,
-		configBytes,
 		vmMultiGatherer,
+		vmParams,
 	)
 	r.NoError(err)
 	defer func() {
@@ -241,415 +236,27 @@ func benchmarkReexecuteRange(
 		r.NoError(vm.Shutdown(ctx))
 	}()

-	config := vmExecutorConfig{
-		Log:              tests.NewDefaultLogger("vm-executor"),
-		Registry:         consensusRegistry,
-		ExecutionTimeout: executionTimeout,
-		StartBlock:       startBlock,
-		EndBlock:         endBlock,
-	}
-	executor, err := newVMExecutor(vm, config)
-	r.NoError(err)
+	executor := reexecute.NewVMExecutor(
+		b,
+		vm,
+		blockDir,
+		startBlock,
+		endBlock,
+		chanSize,
+		executionTimeout,
+		prefixGatherer,
+	)

 	start := time.Now()
-	r.NoError(executor.executeSequence(ctx, blockChan))
+	r.NoError(executor.Run(ctx))
 	elapsed := time.Since(start)

 	b.ReportMetric(0, "ns/op")                     // Set default ns/op to 0 to hide from the output
 	getTopLevelMetrics(b, prefixGatherer, elapsed) // Report the desired top-level metrics
 }

-func newMainnetCChainVM(
-	ctx context.Context,
-	vmAndSharedMemoryDB database.Database,
-	chainDataDir string,
-	configBytes []byte,
-	metricsGatherer metrics.MultiGatherer,
-) (block.ChainVM, error) {
-	factory := factory.Factory{}
-	vmIntf, err := factory.New(logging.NoLog{})
-	if err != nil {
-		return nil, fmt.Errorf("failed to create VM from factory: %w", err)
-	}
-	vm := vmIntf.(block.ChainVM)
-
-	blsKey, err := localsigner.New()
-	if err != nil {
-		return nil, fmt.Errorf("failed to create BLS key: %w", err)
-	}
-
-	blsPublicKey := blsKey.PublicKey()
-	warpSigner := warp.NewSigner(blsKey, constants.MainnetID, mainnetCChainID)
-
-	genesisConfig := genesis.GetConfig(constants.MainnetID)
-
-	sharedMemoryDB := prefixdb.New([]byte("sharedmemory"), vmAndSharedMemoryDB)
-	atomicMemory := atomic.NewMemory(sharedMemoryDB)
-
-	chainIDToSubnetID := map[ids.ID]ids.ID{
-		mainnetXChainID: constants.PrimaryNetworkID,
-		mainnetCChainID: constants.PrimaryNetworkID,
-		ids.Empty:       constants.PrimaryNetworkID,
-	}
-
-	if err := vm.Initialize(
-		ctx,
-		&snow.Context{
-			NetworkID:       constants.MainnetID,
-			SubnetID:        constants.PrimaryNetworkID,
-			ChainID:         mainnetCChainID,
-			NodeID:          ids.GenerateTestNodeID(),
-			PublicKey:       blsPublicKey,
-			NetworkUpgrades: upgrade.Mainnet,
-
-			XChainID:    mainnetXChainID,
-			CChainID:    mainnetCChainID,
-			AVAXAssetID: mainnetAvaxAssetID,
-
-			Log:          tests.NewDefaultLogger("mainnet-vm-reexecution"),
-			SharedMemory: atomicMemory.NewSharedMemory(mainnetCChainID),
-			BCLookup:     ids.NewAliaser(),
-			Metrics:      metricsGatherer,
-
-			WarpSigner: warpSigner,
-
-			ValidatorState: &validatorstest.State{
-				GetSubnetIDF: func(_ context.Context, chainID ids.ID) (ids.ID, error) {
-					subnetID, ok := chainIDToSubnetID[chainID]
-					if ok {
-						return subnetID, nil
-					}
-					return ids.Empty, fmt.Errorf("unknown chainID: %s", chainID)
-				},
-			},
-			ChainDataDir: chainDataDir,
-		},
-		prefixdb.New([]byte("vm"), vmAndSharedMemoryDB),
-		[]byte(genesisConfig.CChainGenesis),
-		nil,
-		configBytes,
-		nil,
-		&enginetest.Sender{},
-	); err != nil {
-		return nil, fmt.Errorf("failed to initialize VM: %w", err)
-	}
-
-	return vm, nil
-}
-
-type blockResult struct {
-	BlockBytes []byte
-	Height     uint64
-	Err        error
-}
-
-type vmExecutorConfig struct {
-	Log logging.Logger
-	// Registry is the registry to register the metrics with.
-	Registry prometheus.Registerer
-	// ExecutionTimeout is the maximum timeout to continue executing blocks.
-	// If 0, no timeout is applied. If non-zero, the executor will exit early
-	// WITHOUT error after hitting the timeout.
-	// This is useful to provide consistent duration benchmarks.
-	ExecutionTimeout time.Duration
-
-	// [StartBlock, EndBlock] defines the range (inclusive) of blocks to execute.
-	StartBlock, EndBlock uint64
-}
-
-type vmExecutor struct {
-	config     vmExecutorConfig
-	vm         block.ChainVM
-	metrics    *consensusMetrics
-	etaTracker *timer.EtaTracker
-}
-
-func newVMExecutor(vm block.ChainVM, config vmExecutorConfig) (*vmExecutor, error) {
-	metrics, err := newConsensusMetrics(config.Registry)
-	if err != nil {
-		return nil, fmt.Errorf("failed to create consensus metrics: %w", err)
-	}
-
-	return &vmExecutor{
-		vm:      vm,
-		metrics: metrics,
-		config:  config,
-		// ETA tracker uses a 10-sample moving window to smooth rate estimates,
-		// and a 1.2 slowdown factor to slightly pad ETA early in the run,
-		// tapering to 1.0 as progress approaches 100%.
-		etaTracker: timer.NewEtaTracker(10, 1.2),
-	}, nil
-}
-
-func (e *vmExecutor) execute(ctx context.Context, blockBytes []byte) error {
-	blk, err := e.vm.ParseBlock(ctx, blockBytes)
-	if err != nil {
-		return fmt.Errorf("failed to parse block: %w", err)
-	}
-	if err := blk.Verify(ctx); err != nil {
-		return fmt.Errorf("failed to verify block %s at height %d: %w", blk.ID(), blk.Height(), err)
-	}
-
-	if err := blk.Accept(ctx); err != nil {
-		return fmt.Errorf("failed to accept block %s at height %d: %w", blk.ID(), blk.Height(), err)
-	}
-	e.metrics.lastAcceptedHeight.Set(float64(blk.Height()))
-
-	return nil
-}
-
-func (e *vmExecutor) executeSequence(ctx context.Context, blkChan <-chan blockResult) error {
-	blkID, err := e.vm.LastAccepted(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to get last accepted block: %w", err)
-	}
-	blk, err := e.vm.GetBlock(ctx, blkID)
-	if err != nil {
-		return fmt.Errorf("failed to get last accepted block by blkID %s: %w", blkID, err)
-	}
-
-	start := time.Now()
-	e.config.Log.Info("last accepted block",
-		zap.Stringer("blkID", blkID),
-		zap.Uint64("height", blk.Height()),
-	)
-
-	// Initialize ETA tracking with a baseline sample at 0 progress
-	totalWork := e.config.EndBlock - e.config.StartBlock
-	e.etaTracker.AddSample(0, totalWork, start)
-
-	if e.config.ExecutionTimeout > 0 {
-		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, e.config.ExecutionTimeout)
-		defer cancel()
-	}
-
-	for blkResult := range blkChan {
-		if blkResult.Err != nil {
-			return blkResult.Err
-		}
-
-		if blkResult.Height%1000 == 0 {
-			completed := blkResult.Height - e.config.StartBlock
-			etaPtr, progressPercentage := e.etaTracker.AddSample(completed, totalWork, time.Now())
-			if etaPtr != nil {
-				e.config.Log.Info("executing block",
-					zap.Uint64("height", blkResult.Height),
-					zap.Float64("progress_pct", progressPercentage),
-					zap.Duration("eta", *etaPtr),
-				)
-			} else {
-				e.config.Log.Info("executing block",
-					zap.Uint64("height", blkResult.Height),
-					zap.Float64("progress_pct", progressPercentage),
-				)
-			}
-		}
-		if err := e.execute(ctx, blkResult.BlockBytes); err != nil {
-			return err
-		}
-
-		if err := ctx.Err(); err != nil {
-			e.config.Log.Info("exiting early due to context timeout",
-				zap.Duration("elapsed", time.Since(start)),
-				zap.Duration("execution-timeout", e.config.ExecutionTimeout),
-				zap.Error(ctx.Err()),
-			)
-			return nil
-		}
-	}
-	e.config.Log.Info("finished executing sequence")
-
-	return nil
-}
-
-func createBlockChanFromLevelDB(tb testing.TB, sourceDir string, startBlock, endBlock uint64, chanSize int) (<-chan blockResult, error) {
-	r := require.New(tb)
-	ch := make(chan blockResult, chanSize)
-
-	db, err := leveldb.New(sourceDir, nil, logging.NoLog{}, prometheus.NewRegistry())
-	if err != nil {
-		return nil, fmt.Errorf("failed to create leveldb database from %q: %w", sourceDir, err)
-	}
-	tb.Cleanup(func() {
-		r.NoError(db.Close())
-	})
-
-	go func() {
-		defer close(ch)
-
-		iter := db.NewIteratorWithStart(blockKey(startBlock))
-		defer iter.Release()
-
-		currentHeight := startBlock
-
-		for iter.Next() {
-			key := iter.Key()
-			if len(key) != database.Uint64Size {
-				ch <- blockResult{
-					BlockBytes: nil,
-					Err:        fmt.Errorf("expected key length %d while looking for block at height %d, got %d", database.Uint64Size, currentHeight, len(key)),
-				}
-				return
-			}
-			height := binary.BigEndian.Uint64(key)
-			if height != currentHeight {
-				ch <- blockResult{
-					BlockBytes: nil,
-					Err:        fmt.Errorf("expected next height %d, got %d", currentHeight, height),
-				}
-				return
-			}
-			ch <- blockResult{
-				BlockBytes: iter.Value(),
-				Height:     height,
-			}
-			currentHeight++
-			if currentHeight > endBlock {
-				break
-			}
-		}
-		if iter.Error() != nil {
-			ch <- blockResult{
-				BlockBytes: nil,
-				Err:        fmt.Errorf("failed to iterate over blocks at height %d: %w", currentHeight, iter.Error()),
-			}
-			return
-		}
-	}()
-
-	return ch, nil
-}
-
-func blockKey(height uint64) []byte {
-	return binary.BigEndian.AppendUint64(nil, height)
-}
-
 func TestExportBlockRange(t *testing.T) {
-	exportBlockRange(t, blockDirSrcArg, blockDirDstArg, startBlockArg, endBlockArg, chanSizeArg)
-}
-
-func exportBlockRange(tb testing.TB, blockDirSrc string, blockDirDst string, startBlock, endBlock uint64, chanSize int) {
-	r := require.New(tb)
-	blockChan, err := createBlockChanFromLevelDB(tb, blockDirSrc, startBlock, endBlock, chanSize)
-	r.NoError(err)
-
-	db, err := leveldb.New(blockDirDst, nil, logging.NoLog{}, prometheus.NewRegistry())
-	r.NoError(err)
-	tb.Cleanup(func() {
-		r.NoError(db.Close())
-	})
-
-	batch := db.NewBatch()
-	for blkResult := range blockChan {
-		r.NoError(batch.Put(blockKey(blkResult.Height), blkResult.BlockBytes))
-
-		if batch.Size() > 10*units.MiB {
-			r.NoError(batch.Write())
-			batch = db.NewBatch()
-		}
-	}
-
-	r.NoError(batch.Write())
-}
-
-type consensusMetrics struct {
-	lastAcceptedHeight prometheus.Gauge
-}
-
-// newConsensusMetrics creates a subset of the metrics from snowman consensus
-// [engine](../../snow/engine/snowman/metrics.go).
-//
-// The registry passed in is expected to be registered with the prefix
-// "avalanche_snowman" and the chain label (ex. chain="C") that would be handled
-// by the[chain manager](../../../chains/manager.go).
-func newConsensusMetrics(registry prometheus.Registerer) (*consensusMetrics, error) {
-	m := &consensusMetrics{
-		lastAcceptedHeight: prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: "last_accepted_height",
-			Help: "last height accepted",
-		}),
-	}
-	if err := registry.Register(m.lastAcceptedHeight); err != nil {
-		return nil, fmt.Errorf("failed to register last accepted height metric: %w", err)
-	}
-	return m, nil
-}
-
-// startServer starts a Prometheus server for the provided gatherer and returns
-// the server address.
-func startServer(
-	tb testing.TB,
-	log logging.Logger,
-	gatherer prometheus.Gatherer,
-	port uint64,
-) string {
-	r := require.New(tb)
-
-	server, err := tests.NewPrometheusServerWithPort(gatherer, port)
-	r.NoError(err)
-
-	log.Info("metrics endpoint available",
-		zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())),
-	)
-
-	tb.Cleanup(func() {
-		r.NoError(server.Stop())
-	})
-
-	return server.Address()
-}
-
-// startCollector starts a Prometheus collector configured to scrape the server
-// listening on serverAddr. startCollector also attaches the provided labels +
-// Github labels if available to the collected metrics.
-func startCollector(tb testing.TB, log logging.Logger, name string, labels map[string]string, serverAddr string) {
-	r := require.New(tb)
-
-	startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout)
-	defer cancel()
-
-	logger := tests.NewDefaultLogger("prometheus")
-	r.NoError(tmpnet.StartPrometheus(startPromCtx, logger))
-
-	var sdConfigFilePath string
-	tb.Cleanup(func() {
-		// Ensure a final metrics scrape.
-		// This default delay is set above the default scrape interval used by StartPrometheus.
-		time.Sleep(tmpnet.NetworkShutdownDelay)
-
-		r.NoError(func() error {
-			if sdConfigFilePath != "" {
-				return os.Remove(sdConfigFilePath)
-			}
-			return nil
-		}(),
-		)
-
-		//nolint:usetesting // t.Context() is already canceled inside the cleanup function
-		checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout)
-		defer cancel()
-		r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID))
-	})
-
-	sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(name, tmpnet.SDConfig{
-		Targets: []string{serverAddr},
-		Labels:  labels,
-	}, true /* withGitHubLabels */)
-	r.NoError(err)
-
-	var (
-		dashboardPath = "d/Gl1I20mnk/c-chain"
-		grafanaURI    = tmpnet.DefaultBaseGrafanaURI + dashboardPath
-		startTime     = strconv.FormatInt(time.Now().UnixMilli(), 10)
-	)
-
-	log.Info("metrics available via grafana",
-		zap.String(
-			"url",
-			tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI),
-		),
-	)
+	reexecute.ExportBlockRange(t, blockDirSrcArg, blockDirDstArg, startBlockArg, endBlockArg, chanSizeArg)
 }

 // parseCustomLabels parses a comma-separated list of key-value pairs into a map
diff --git a/tests/reexecute/metrics.go b/tests/reexecute/metrics.go
new file mode 100644
index 000000000000..22684a7f8bd1
--- /dev/null
+++ b/tests/reexecute/metrics.go
@@ -0,0 +1,104 @@
+// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package reexecute
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"github.com/ava-labs/avalanchego/tests"
+	"github.com/ava-labs/avalanchego/tests/fixture/tmpnet"
+	"github.com/ava-labs/avalanchego/utils/logging"
+)
+
+// StartServer starts a Prometheus server for the provided gatherer and returns
+// the server address.
+func StartServer(
+	tb testing.TB,
+	log logging.Logger,
+	gatherer prometheus.Gatherer,
+	port uint64,
+) string {
+	r := require.New(tb)
+
+	server, err := tests.NewPrometheusServerWithPort(gatherer, port)
+	r.NoError(err)
+
+	log.Info("metrics endpoint available",
+		zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())),
+	)
+
+	tb.Cleanup(func() {
+		r.NoError(server.Stop())
+	})
+
+	return server.Address()
+}
+
+// StartCollector starts a Prometheus collector configured to scrape the server
+// listening on serverAddr. StartCollector also attaches the provided labels,
+// plus GitHub labels when available, to the collected metrics.
+func StartCollector(
+	tb testing.TB,
+	log logging.Logger,
+	sdConfigName string,
+	labels map[string]string,
+	serverAddr string,
+	networkUUID string,
+	grafanaDashboardPath string,
+) {
+	r := require.New(tb)
+
+	startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout)
+	defer cancel()
+
+	logger := tests.NewDefaultLogger("prometheus")
+	r.NoError(tmpnet.StartPrometheus(startPromCtx, logger))
+
+	var sdConfigFilePath string
+	tb.Cleanup(func() {
+		// Ensure a final metrics scrape: this delay exceeds the default
+		// scrape interval used by StartPrometheus.
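+		// Sleeping for tmpnet.NetworkShutdownDelay lets Prometheus record the
+		// end of the run before the SD config file is removed below.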
+		time.Sleep(tmpnet.NetworkShutdownDelay)
+
+		r.NoError(func() error {
+			if sdConfigFilePath != "" {
+				return os.Remove(sdConfigFilePath)
+			}
+			return nil
+		}(),
+		)
+
+		//nolint:usetesting // t.Context() is already canceled inside the cleanup function
+		checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout)
+		defer cancel()
+		r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID))
+	})
+
+	sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(sdConfigName, tmpnet.SDConfig{
+		Targets: []string{serverAddr},
+		Labels:  labels,
+	}, true /* withGitHubLabels */)
+	r.NoError(err)
+
+	var (
+		grafanaURI = tmpnet.DefaultBaseGrafanaURI + grafanaDashboardPath
+		startTime  = strconv.FormatInt(time.Now().UnixMilli(), 10)
+	)
+
+	log.Info("metrics available via grafana",
+		zap.String(
+			"url",
+			tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI),
+		),
+	)
+}
diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go
new file mode 100644
index 000000000000..60c676f75b70
--- /dev/null
+++ b/tests/reexecute/vm_executor.go
@@ -0,0 +1,381 @@
+// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package reexecute
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"github.com/ava-labs/avalanchego/api/metrics"
+	"github.com/ava-labs/avalanchego/chains/atomic"
+	"github.com/ava-labs/avalanchego/database"
+	"github.com/ava-labs/avalanchego/database/leveldb"
+	"github.com/ava-labs/avalanchego/database/prefixdb"
+	"github.com/ava-labs/avalanchego/ids"
+	"github.com/ava-labs/avalanchego/snow"
+	"github.com/ava-labs/avalanchego/snow/engine/enginetest"
+	"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
+	"github.com/ava-labs/avalanchego/snow/validators/validatorstest"
+	"github.com/ava-labs/avalanchego/tests"
+	"github.com/ava-labs/avalanchego/upgrade"
+	"github.com/ava-labs/avalanchego/utils/constants"
+	"github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner"
+	"github.com/ava-labs/avalanchego/utils/logging"
+	"github.com/ava-labs/avalanchego/utils/timer"
+	"github.com/ava-labs/avalanchego/utils/units"
+	"github.com/ava-labs/avalanchego/vms"
+	"github.com/ava-labs/avalanchego/vms/platformvm/warp"
+)
+
+var (
+	MainnetCChainID = ids.FromStringOrPanic("2q9e4r6Mu3U68nU1fYjgbR6JvwrRx36CohpAX5UQxse55x1Q5")
+	MainnetXChainID = ids.FromStringOrPanic("2oYMBNV4eNHyqk2fjjV5nVQLDbtmNJzq5s3qs3Lo6ftnC6FByM")
+
+	mainnetAvaxAssetID = ids.FromStringOrPanic("FvwEAhmxKfeiG8SnEvq42hc6whRyY3EFYAvebMqDNDGCgxN5Z")
+)
+
+type VMExecutor struct {
+	log              logging.Logger
+	vm               block.ChainVM
+	metrics          *consensusMetrics
+	executionTimeout time.Duration
+	startBlock       uint64
+	endBlock         uint64
+	blkChan          <-chan blockResult
+	etaTracker       *timer.EtaTracker
+}
+
+// NewVMExecutor creates a VMExecutor that reexecutes blocks from startBlock to
+// endBlock. NewVMExecutor starts reading blocks from blockDir and sets up
+// metrics for the reexecution test.
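+//
+// Typical usage, mirroring the C-Chain benchmark in
+// tests/reexecute/c/vm_reexecute_test.go:
+//
+//	executor := reexecute.NewVMExecutor(b, vm, blockDir, startBlock, endBlock, chanSize, executionTimeout, prefixGatherer)
+//	r.NoError(executor.Run(ctx))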
+func NewVMExecutor(
+	tb testing.TB,
+	vm block.ChainVM,
+	blockDir string,
+	startBlock uint64,
+	endBlock uint64,
+	chanSize int,
+	executionTimeout time.Duration,
+	prefixGatherer metrics.MultiGatherer,
+) *VMExecutor {
+	r := require.New(tb)
+
+	blockChan := createBlockChanFromLevelDB(
+		tb,
+		blockDir,
+		startBlock,
+		endBlock,
+		chanSize,
+	)
+
+	consensusRegistry := prometheus.NewRegistry()
+	r.NoError(prefixGatherer.Register("avalanche_snowman", consensusRegistry))
+
+	metrics, err := newConsensusMetrics(consensusRegistry)
+	r.NoError(err)
+
+	return &VMExecutor{
+		log:              tests.NewDefaultLogger("vm-executor"),
+		vm:               vm,
+		metrics:          metrics,
+		executionTimeout: executionTimeout,
+		startBlock:       startBlock,
+		endBlock:         endBlock,
+		blkChan:          blockChan,
+		// ETA tracker uses a 10-sample moving window to smooth rate estimates,
+		// and a 1.2 slowdown factor to slightly pad ETA early in the run,
+		// tapering to 1.0 as progress approaches 100%.
+		etaTracker: timer.NewEtaTracker(10, 1.2),
+	}
+}
+
+// Run reexecutes the blocks against the VM. It parses, verifies, and accepts
+// each block while tracking progress and respecting the execution timeout if
+// configured.
+func (e *VMExecutor) Run(ctx context.Context) error {
+	blkID, err := e.vm.LastAccepted(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to get last accepted block: %w", err)
+	}
+	blk, err := e.vm.GetBlock(ctx, blkID)
+	if err != nil {
+		return fmt.Errorf("failed to get last accepted block by blkID %s: %w", blkID, err)
+	}
+
+	start := time.Now()
+	e.log.Info("last accepted block",
+		zap.Stringer("blkID", blkID),
+		zap.Uint64("height", blk.Height()),
+	)
+
+	// Initialize ETA tracking with a baseline sample at 0 progress
+	totalWork := e.endBlock - e.startBlock
+	e.etaTracker.AddSample(0, totalWork, start)
+
+	if e.executionTimeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, e.executionTimeout)
+		defer cancel()
+	}
+
+	for blkResult := range e.blkChan {
+		if blkResult.err != nil {
+			return blkResult.err
+		}
+
+		if blkResult.height%1000 == 0 {
+			completed := blkResult.height - e.startBlock
+			etaPtr, progressPercentage := e.etaTracker.AddSample(completed, totalWork, time.Now())
+			if etaPtr != nil {
+				e.log.Info("executing block",
+					zap.Uint64("height", blkResult.height),
+					zap.Float64("progress_pct", progressPercentage),
+					zap.Duration("eta", *etaPtr),
+				)
+			} else {
+				e.log.Info("executing block",
+					zap.Uint64("height", blkResult.height),
+					zap.Float64("progress_pct", progressPercentage),
+				)
+			}
+		}
+		if err := e.execute(ctx, blkResult.blockBytes); err != nil {
+			return err
+		}
+
+		if err := ctx.Err(); err != nil {
+			e.log.Info("exiting early due to context timeout",
+				zap.Duration("elapsed", time.Since(start)),
+				zap.Duration("execution-timeout", e.executionTimeout),
+				zap.Error(ctx.Err()),
+			)
+			return nil
+		}
+	}
+	e.log.Info("finished executing sequence")
+
+	return nil
+}
+
+func (e *VMExecutor) execute(ctx context.Context, blockBytes []byte) error {
+	blk, err := e.vm.ParseBlock(ctx, blockBytes)
+	if err != nil {
+		return fmt.Errorf("failed to parse block: %w", err)
+	}
+	if err := blk.Verify(ctx); err != nil {
+		return fmt.Errorf("failed to verify block %s at height %d: %w", blk.ID(), blk.Height(), err)
+	}
+
+	if err := blk.Accept(ctx); err != nil {
+		return fmt.Errorf("failed to accept block %s at height %d: %w", blk.ID(), blk.Height(), err)
+	}
+	e.metrics.lastAcceptedHeight.Set(float64(blk.Height()))
+
+	return nil
+}
+
+type VMParams struct {
+	GenesisBytes []byte
+	UpgradeBytes []byte
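+	// UpgradeBytes and ConfigBytes are passed through to vm.Initialize
+	// unmodified; the C-Chain benchmark leaves UpgradeBytes nil.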
+	ConfigBytes  []byte
+	SubnetID     ids.ID
+	ChainID      ids.ID
+	// ChainToSubnet maps chain IDs to their subnet IDs. This mapping is used by
+	// the VM to validate cross-chain operations and warp messages.
+	ChainToSubnet map[ids.ID]ids.ID
+}
+
+// NewMainnetVM creates and initializes a VM configured for mainnet block
+// reexecution tests. The VM is initialized with mainnet-specific settings
+// including the mainnet network ID, upgrade schedule, and chain configurations.
+// Both vmParams.SubnetID and vmParams.ChainID must correspond to subnets/chains
+// that exist on mainnet.
+func NewMainnetVM(
+	ctx context.Context,
+	factory vms.Factory,
+	db database.Database,
+	chainDataDir string,
+	metricsGatherer metrics.MultiGatherer,
+	vmParams VMParams,
+) (block.ChainVM, error) {
+	vmIntf, err := factory.New(logging.NoLog{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create VM from factory: %w", err)
+	}
+	vm := vmIntf.(block.ChainVM)
+
+	blsKey, err := localsigner.New()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create BLS key: %w", err)
+	}
+
+	blsPublicKey := blsKey.PublicKey()
+	warpSigner := warp.NewSigner(blsKey, constants.MainnetID, vmParams.ChainID)
+
+	sharedMemoryDB := prefixdb.New([]byte("sharedmemory"), db)
+	atomicMemory := atomic.NewMemory(sharedMemoryDB)
+
+	if err := vm.Initialize(
+		ctx,
+		&snow.Context{
+			NetworkID:       constants.MainnetID,
+			SubnetID:        vmParams.SubnetID,
+			ChainID:         vmParams.ChainID,
+			NodeID:          ids.GenerateTestNodeID(),
+			PublicKey:       blsPublicKey,
+			NetworkUpgrades: upgrade.Mainnet,
+
+			XChainID:    MainnetXChainID,
+			CChainID:    MainnetCChainID,
+			AVAXAssetID: mainnetAvaxAssetID,
+
+			Log:          tests.NewDefaultLogger("mainnet-vm-reexecution"),
+			SharedMemory: atomicMemory.NewSharedMemory(vmParams.ChainID),
+			BCLookup:     ids.NewAliaser(),
+			Metrics:      metricsGatherer,
+
+			WarpSigner: warpSigner,
+
+			ValidatorState: &validatorstest.State{
+				GetSubnetIDF: func(_ context.Context, chainID ids.ID) (ids.ID, error) {
+					subnetID, ok := vmParams.ChainToSubnet[chainID]
+					if ok {
+						return subnetID, nil
+					}
+					return ids.Empty, fmt.Errorf("unknown chainID: %s", chainID)
+				},
+			},
+			ChainDataDir: chainDataDir,
+		},
+		prefixdb.New([]byte("vm"), db),
+		vmParams.GenesisBytes,
+		vmParams.UpgradeBytes,
+		vmParams.ConfigBytes,
+		nil,
+		&enginetest.Sender{},
+	); err != nil {
+		return nil, fmt.Errorf("failed to initialize VM: %w", err)
+	}
+
+	return vm, nil
+}
+
+// ExportBlockRange copies blocks from a source LevelDB directory to a
+// destination LevelDB directory for the specified block range
+// [startBlock, endBlock].
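+//
+// Blocks are streamed through a channel of capacity chanSize and written in
+// batches of roughly 10 MiB, so the exported range never needs to be held in
+// memory at once.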
+func ExportBlockRange(tb testing.TB, blockDirSrc string, blockDirDst string, startBlock, endBlock uint64, chanSize int) {
+	r := require.New(tb)
+	blockChan := createBlockChanFromLevelDB(tb, blockDirSrc, startBlock, endBlock, chanSize)
+
+	db, err := leveldb.New(blockDirDst, nil, logging.NoLog{}, prometheus.NewRegistry())
+	r.NoError(err)
+	tb.Cleanup(func() {
+		r.NoError(db.Close())
+	})
+
+	batch := db.NewBatch()
+	for blkResult := range blockChan {
+		r.NoError(batch.Put(blockKey(blkResult.height), blkResult.blockBytes))
+
+		if batch.Size() > 10*units.MiB {
+			r.NoError(batch.Write())
+			batch = db.NewBatch()
+		}
+	}
+
+	r.NoError(batch.Write())
+}
+
+type blockResult struct {
+	blockBytes []byte
+	height     uint64
+	err        error
+}
+
+func createBlockChanFromLevelDB(tb testing.TB, sourceDir string, startBlock, endBlock uint64, chanSize int) <-chan blockResult {
+	r := require.New(tb)
+	ch := make(chan blockResult, chanSize)
+
+	db, err := leveldb.New(sourceDir, nil, logging.NoLog{}, prometheus.NewRegistry())
+	r.NoError(err, "failed to create leveldb database from %q", sourceDir)
+	tb.Cleanup(func() {
+		r.NoError(db.Close())
+	})
+
+	go func() {
+		defer close(ch)
+
+		iter := db.NewIteratorWithStart(blockKey(startBlock))
+		defer iter.Release()
+
+		currentHeight := startBlock
+
+		for iter.Next() {
+			key := iter.Key()
+			if len(key) != database.Uint64Size {
+				ch <- blockResult{
+					blockBytes: nil,
+					err:        fmt.Errorf("expected key length %d while looking for block at height %d, got %d", database.Uint64Size, currentHeight, len(key)),
+				}
+				return
+			}
+			height := binary.BigEndian.Uint64(key)
+			if height != currentHeight {
+				ch <- blockResult{
+					blockBytes: nil,
+					err:        fmt.Errorf("expected next height %d, got %d", currentHeight, height),
+				}
+				return
+			}
+			ch <- blockResult{
+				blockBytes: iter.Value(),
+				height:     height,
+			}
+			currentHeight++
+			if currentHeight > endBlock {
+				break
+			}
+		}
+		if iter.Error() != nil {
+			ch <- blockResult{
+				blockBytes: nil,
+				err:        fmt.Errorf("failed to iterate over blocks at height %d: %w", currentHeight, iter.Error()),
+			}
+			return
+		}
+	}()
+
+	return ch
+}
+
+func blockKey(height uint64) []byte {
+	return binary.BigEndian.AppendUint64(nil, height)
+}
+
+type consensusMetrics struct {
+	lastAcceptedHeight prometheus.Gauge
+}
+
+// newConsensusMetrics creates a subset of the metrics from snowman consensus
+// [engine](../../snow/engine/snowman/metrics.go).
+//
+// The registry passed in is expected to be registered with the prefix
+// "avalanche_snowman" and the chain label (ex. chain="C") that would be handled
+// by the [chain manager](../../chains/manager.go).
+func newConsensusMetrics(registry prometheus.Registerer) (*consensusMetrics, error) {
+	m := &consensusMetrics{
+		lastAcceptedHeight: prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "last_accepted_height",
+			Help: "last height accepted",
+		}),
+	}
+	if err := registry.Register(m.lastAcceptedHeight); err != nil {
+		return nil, fmt.Errorf("failed to register last accepted height metric: %w", err)
+	}
+	return m, nil
+}