Skip to content

Commit a910fc5

Browse files
authored
find all reorgs in current batch (#142)
### TL;DR Improved reorg detection and handling logic to identify and process all reorged blocks in the current range handled by ReorgHandler. ### What changed? - Renamed and enhanced the reorg detection function to explicitly find the first hash mismatch - Added validation for sequential block headers during reorg detection - Implemented parallel block fetching with chunking for better performance - Improved error messages and logging for better debugging - Changed the reorg handling approach to work with specific reorged block numbers instead of a range - Added sequential block number validation to prevent incorrect reorg detection ### How to test? 1. Run the system during a chain reorganization event 2. Verify that reorged blocks are correctly identified and processed 3. Check logs for detailed reorg detection information 4. Monitor metrics to confirm reorg counter increments appropriately 5. Verify parallel block fetching works with different chunk sizes ### Why make this change? The previous implementation had potential gaps in reorg detection and could miss some edge cases. This update provides more robust and efficient reorg handling, with improved error detection and parallel processing capabilities. The changes also make the system more maintainable and easier to debug when issues occur.
2 parents d5cb6b4 + aff1caa commit a910fc5

File tree

2 files changed

+271
-87
lines changed

2 files changed

+271
-87
lines changed

internal/orchestrator/reorg_handler.go

Lines changed: 86 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package orchestrator
33
import (
44
"fmt"
55
"math/big"
6+
"sort"
7+
"sync"
68
"time"
79

810
"github.com/rs/zerolog/log"
@@ -73,7 +75,7 @@ func (rh *ReorgHandler) Start() {
7375
for range ticker.C {
7476
mostRecentBlockChecked, err := rh.RunFromBlock(rh.lastCheckedBlock)
7577
if err != nil {
76-
log.Error().Err(err).Msg("Error during reorg handling")
78+
log.Error().Err(err).Msgf("Error during reorg handling: %s", err.Error())
7779
continue
7880
}
7981
if mostRecentBlockChecked == nil {
@@ -107,98 +109,129 @@ func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.
107109
log.Debug().Msgf("Most recent (%s) and last checked (%s) block numbers are equal, skipping reorg check", mostRecentBlockHeader.Number.String(), lastBlockHeader.Number.String())
108110
return nil, nil
109111
}
110-
reorgEndIndex := findReorgEndIndex(blockHeaders)
111-
if reorgEndIndex == -1 {
112+
113+
firstMismatchIndex, err := findIndexOfFirstHashMismatch(blockHeaders)
114+
if err != nil {
115+
return nil, fmt.Errorf("error detecting reorgs: %w", err)
116+
}
117+
if firstMismatchIndex == -1 {
118+
log.Debug().Msgf("No reorg detected, most recent block number checked: %s", mostRecentBlockHeader.Number.String())
112119
return mostRecentBlockHeader.Number, nil
113120
}
121+
114122
metrics.ReorgCounter.Inc()
115-
forkPoint, err := rh.findFirstForkedBlockNumber(blockHeaders[reorgEndIndex:])
123+
reorgedBlockNumbers := make([]*big.Int, 0)
124+
err = rh.findReorgedBlockNumbers(blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
116125
if err != nil {
117-
return nil, fmt.Errorf("error while finding fork point: %w", err)
126+
return nil, fmt.Errorf("error finding reorged block numbers: %w", err)
127+
}
128+
129+
if len(reorgedBlockNumbers) == 0 {
130+
log.Debug().Msgf("Reorg was detected, but no reorged block numbers found, most recent block number checked: %s", mostRecentBlockHeader.Number.String())
131+
return mostRecentBlockHeader.Number, nil
118132
}
119-
reorgEndBlock := blockHeaders[reorgEndIndex].Number
120-
err = rh.handleReorg(forkPoint, reorgEndBlock)
133+
134+
err = rh.handleReorg(reorgedBlockNumbers)
121135
if err != nil {
122136
return nil, fmt.Errorf("error while handling reorg: %w", err)
123137
}
124138
return mostRecentBlockHeader.Number, nil
125139
}
126140

127-
func findReorgEndIndex(blockHeadersDescending []common.BlockHeader) (index int) {
141+
func findIndexOfFirstHashMismatch(blockHeadersDescending []common.BlockHeader) (int, error) {
128142
for i := 0; i < len(blockHeadersDescending)-1; i++ {
129143
currentBlock := blockHeadersDescending[i]
130144
previousBlockInChain := blockHeadersDescending[i+1]
131-
132145
if currentBlock.Number.Cmp(previousBlockInChain.Number) == 0 { // unmerged block
133146
continue
134147
}
148+
if currentBlock.Number.Cmp(new(big.Int).Add(previousBlockInChain.Number, big.NewInt(1))) != 0 {
149+
return -1, fmt.Errorf("block headers are not sequential - cannot proceed with detecting reorgs. Comparing blocks: %s and %s", currentBlock.Number.String(), previousBlockInChain.Number.String())
150+
}
135151
if currentBlock.ParentHash != previousBlockInChain.Hash {
136-
log.Debug().
137-
Str("currentBlockNumber", currentBlock.Number.String()).
138-
Str("currentBlockHash", currentBlock.Hash).
139-
Str("currentBlockParentHash", currentBlock.ParentHash).
140-
Str("previousBlockNumber", previousBlockInChain.Number.String()).
141-
Str("previousBlockHash", previousBlockInChain.Hash).
142-
Msg("Reorg detected: parent hash mismatch")
143-
return i + 1
152+
return i + 1, nil
144153
}
145154
}
146-
return -1
155+
return -1, nil
147156
}
148157

149-
func (rh *ReorgHandler) findFirstForkedBlockNumber(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
150-
newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
158+
func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
159+
newBlocksByNumber, err := rh.getNewBlocksByNumber(blockHeadersDescending)
151160
if err != nil {
152-
return nil, err
161+
return err
153162
}
154-
155-
for i := 0; i < len(reversedBlockHeaders); i++ {
156-
blockHeader := reversedBlockHeaders[i]
157-
block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
163+
continueCheckingForReorgs := false
164+
for i := 0; i < len(blockHeadersDescending); i++ {
165+
blockHeader := blockHeadersDescending[i]
166+
fetchedBlock, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
158167
if !ok {
159-
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
168+
return fmt.Errorf("block not found: %s", blockHeader.Number.String())
160169
}
161-
if blockHeader.ParentHash == block.ParentHash && blockHeader.Hash == block.Hash {
162-
if i == 0 {
163-
return nil, fmt.Errorf("unable to find reorg fork point due to block %s being first in the array", blockHeader.Number.String())
170+
if blockHeader.ParentHash != fetchedBlock.ParentHash || blockHeader.Hash != fetchedBlock.Hash {
171+
*reorgedBlockNumbers = append(*reorgedBlockNumbers, blockHeader.Number)
172+
if i == len(blockHeadersDescending)-1 {
173+
continueCheckingForReorgs = true // if last block in range is reorged, we should continue checking
164174
}
165-
previousBlock := reversedBlockHeaders[i-1]
166-
return previousBlock.Number, nil
167175
}
168176
}
169-
fetchUntilBlock := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
170-
fetchFromBlock := new(big.Int).Sub(fetchUntilBlock, big.NewInt(int64(rh.blocksPerScan)))
171-
nextHeadersBatch, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fetchFromBlock, fetchUntilBlock)
172-
if err != nil {
173-
return nil, fmt.Errorf("error getting next headers batch: %w", err)
177+
if continueCheckingForReorgs {
178+
fetchUntilBlock := blockHeadersDescending[len(blockHeadersDescending)-1].Number
179+
fetchFromBlock := new(big.Int).Sub(fetchUntilBlock, big.NewInt(int64(rh.blocksPerScan)))
180+
nextHeadersBatch, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fetchFromBlock, new(big.Int).Sub(fetchUntilBlock, big.NewInt(1))) // we sub 1 to not check the last block again
181+
if err != nil {
182+
return fmt.Errorf("error getting next headers batch: %w", err)
183+
}
184+
sort.Slice(nextHeadersBatch, func(i, j int) bool {
185+
return nextHeadersBatch[i].Number.Cmp(nextHeadersBatch[j].Number) > 0
186+
})
187+
return rh.findReorgedBlockNumbers(nextHeadersBatch, reorgedBlockNumbers)
174188
}
175-
return rh.findFirstForkedBlockNumber(nextHeadersBatch)
189+
return nil
176190
}
177191

178-
func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
179-
blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders))
180-
for _, header := range reversedBlockHeaders {
192+
func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
193+
blockNumbers := make([]*big.Int, 0, len(blockHeaders))
194+
for _, header := range blockHeaders {
181195
blockNumbers = append(blockNumbers, header.Number)
182196
}
183-
blockResults := rh.rpc.GetBlocks(blockNumbers)
197+
blockCount := len(blockNumbers)
198+
chunks := common.BigIntSliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks)
199+
200+
var wg sync.WaitGroup
201+
resultsCh := make(chan []rpc.GetBlocksResult, len(chunks))
202+
203+
// TODO: move batching to rpc
204+
log.Debug().Msgf("Reorg handler fetching %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), rh.rpc.GetBlocksPerRequest().Blocks)
205+
for _, chunk := range chunks {
206+
wg.Add(1)
207+
go func(chunk []*big.Int) {
208+
defer wg.Done()
209+
resultsCh <- rh.rpc.GetBlocks(chunk)
210+
if config.Cfg.RPC.Blocks.BatchDelay > 0 {
211+
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
212+
}
213+
}(chunk)
214+
}
215+
go func() {
216+
wg.Wait()
217+
close(resultsCh)
218+
}()
219+
184220
fetchedBlocksByNumber := make(map[string]common.Block)
185-
for _, blockResult := range blockResults {
186-
if blockResult.Error != nil {
187-
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
221+
for batchResults := range resultsCh {
222+
for _, blockResult := range batchResults {
223+
if blockResult.Error != nil {
224+
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
225+
}
226+
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
188227
}
189-
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
190228
}
191229
return &fetchedBlocksByNumber, nil
192230
}
193231

194-
func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
195-
log.Debug().Msgf("Handling reorg from block %s to %s", reorgStart.String(), reorgEnd.String())
196-
blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64())
197-
for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
198-
blockRange = append(blockRange, new(big.Int).Set(i))
199-
}
200-
201-
results := rh.worker.Run(blockRange)
232+
func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
233+
log.Debug().Msgf("Handling reorg for blocks %v", reorgedBlockNumbers)
234+
results := rh.worker.Run(reorgedBlockNumbers)
202235
data := make([]common.BlockData, 0, len(results))
203236
blocksToDelete := make([]*big.Int, 0, len(results))
204237
for _, result := range results {

0 commit comments

Comments
 (0)