Skip to content

Commit 4bc1da4

Browse files
authored
make reorg handling backward looking if it has caught up with the chain (#143)
### TL;DR Improved reorg handler block range calculation to support both forward and backward scanning based on the current sync state. ### What changed? - Added `getReorgCheckRange` function to dynamically determine the block range for reorg checks - When catching up (large gap between latest checked and committed blocks), the handler scans forward - When caught up (small gap), the handler looks backward from the latest committed block - Added protection to prevent negative block numbers when scanning backward - Added early exit when the latest checked block equals the target block ### How to test? - Run the new test cases: - `TestReorgHandlerRangeIsForwardLookingWhenItIsCatchingUp` - `TestReorgHandlerRangeIsBackwardLookingWhenItIsCaughtUp` - `TestReorgHandlerRangeStartIs0WhenRangeIsLargerThanProcessedBlocks` - Verify the reorg handler correctly determines scan ranges in different sync states ### Why make this change? The previous implementation always scanned forward, which could miss reorgs when the system was caught up with the chain head. This change ensures more thorough reorg detection by adapting the scan direction based on the current sync state, improving the reliability of reorg detection.
2 parents a910fc5 + 0c2d50b commit 4bc1da4

File tree

2 files changed

+132
-8
lines changed

2 files changed

+132
-8
lines changed

internal/orchestrator/reorg_handler.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,15 @@ func (rh *ReorgHandler) Start() {
9292
select {}
9393
}
9494

95-
func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
96-
toBlock := new(big.Int).Add(fromBlock, big.NewInt(int64(rh.blocksPerScan)))
95+
func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
96+
fromBlock, toBlock, err := rh.getReorgCheckRange(latestCheckedBlock)
97+
if err != nil {
98+
return nil, err
99+
}
100+
if toBlock.Cmp(latestCheckedBlock) == 0 {
101+
log.Debug().Msgf("Most recent (%s) and last checked (%s) block numbers are equal, skipping reorg check", toBlock.String(), latestCheckedBlock.String())
102+
return nil, nil
103+
}
97104
log.Debug().Msgf("Checking for reorgs from block %s to %s", fromBlock.String(), toBlock.String())
98105
blockHeaders, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fromBlock, toBlock)
99106
if err != nil {
@@ -104,11 +111,6 @@ func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.
104111
return nil, nil
105112
}
106113
mostRecentBlockHeader := blockHeaders[0]
107-
lastBlockHeader := blockHeaders[len(blockHeaders)-1]
108-
if mostRecentBlockHeader.Number.Cmp(lastBlockHeader.Number) == 0 {
109-
log.Debug().Msgf("Most recent (%s) and last checked (%s) block numbers are equal, skipping reorg check", mostRecentBlockHeader.Number.String(), lastBlockHeader.Number.String())
110-
return nil, nil
111-
}
112114

113115
firstMismatchIndex, err := findIndexOfFirstHashMismatch(blockHeaders)
114116
if err != nil {
@@ -138,6 +140,27 @@ func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.
138140
return mostRecentBlockHeader.Number, nil
139141
}
140142

143+
func (rh *ReorgHandler) getReorgCheckRange(latestCheckedBlock *big.Int) (*big.Int, *big.Int, error) {
144+
latestCommittedBlock, err := rh.storage.MainStorage.GetMaxBlockNumber(rh.rpc.GetChainID())
145+
if err != nil {
146+
return nil, nil, fmt.Errorf("error getting latest committed block: %w", err)
147+
}
148+
if new(big.Int).Sub(latestCommittedBlock, latestCheckedBlock).Cmp(big.NewInt(int64(rh.blocksPerScan))) < 0 {
149+
// diff between latest committed and latest checked is less than blocksPerScan, so we will look back from the latest committed block
150+
fromBlock := new(big.Int).Sub(latestCommittedBlock, big.NewInt(int64(rh.blocksPerScan)))
151+
if fromBlock.Cmp(big.NewInt(0)) < 0 {
152+
fromBlock = big.NewInt(0)
153+
}
154+
toBlock := new(big.Int).Set(latestCommittedBlock)
155+
return fromBlock, toBlock, nil
156+
} else {
157+
// diff between latest committed and latest checked is greater or equal to blocksPerScan, so we will look forward from the latest checked block
158+
fromBlock := new(big.Int).Set(latestCheckedBlock)
159+
toBlock := new(big.Int).Add(fromBlock, big.NewInt(int64(rh.blocksPerScan)))
160+
return fromBlock, toBlock, nil
161+
}
162+
}
163+
141164
func findIndexOfFirstHashMismatch(blockHeadersDescending []common.BlockHeader) (int, error) {
142165
for i := 0; i < len(blockHeadersDescending)-1; i++ {
143166
currentBlock := blockHeadersDescending[i]

internal/orchestrator/reorg_handler_test.go

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,77 @@ func TestNewReorgHandlerStartsFromConfiguredBlock(t *testing.T) {
7979
assert.Equal(t, big.NewInt(1000), handler.lastCheckedBlock)
8080
}
8181

82+
func TestReorgHandlerRangeIsForwardLookingWhenItIsCatchingUp(t *testing.T) {
83+
defer func() { config.Cfg = config.Config{} }()
84+
config.Cfg.ReorgHandler.BlocksPerScan = 50
85+
86+
mockRPC := mocks.NewMockIRPCClient(t)
87+
mockMainStorage := mocks.NewMockIMainStorage(t)
88+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
89+
90+
mockStorage := storage.IStorage{
91+
MainStorage: mockMainStorage,
92+
OrchestratorStorage: mockOrchestratorStorage,
93+
}
94+
95+
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
96+
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(0), nil)
97+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)
98+
handler := NewReorgHandler(mockRPC, mockStorage)
99+
100+
fromBlock, toBlock, err := handler.getReorgCheckRange(big.NewInt(100))
101+
assert.NoError(t, err)
102+
assert.Equal(t, big.NewInt(100), fromBlock)
103+
assert.Equal(t, big.NewInt(150), toBlock)
104+
}
105+
func TestReorgHandlerRangeIsBackwardLookingWhenItIsCaughtUp(t *testing.T) {
106+
defer func() { config.Cfg = config.Config{} }()
107+
config.Cfg.ReorgHandler.BlocksPerScan = 50
108+
109+
mockRPC := mocks.NewMockIRPCClient(t)
110+
mockMainStorage := mocks.NewMockIMainStorage(t)
111+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
112+
113+
mockStorage := storage.IStorage{
114+
MainStorage: mockMainStorage,
115+
OrchestratorStorage: mockOrchestratorStorage,
116+
}
117+
118+
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
119+
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(0), nil)
120+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)
121+
handler := NewReorgHandler(mockRPC, mockStorage)
122+
123+
fromBlock, toBlock, err := handler.getReorgCheckRange(big.NewInt(990))
124+
assert.NoError(t, err)
125+
assert.Equal(t, big.NewInt(950), fromBlock)
126+
assert.Equal(t, big.NewInt(1000), toBlock)
127+
}
128+
129+
func TestReorgHandlerRangeStartIs0WhenRangeIsLargerThanProcessedBlocks(t *testing.T) {
130+
defer func() { config.Cfg = config.Config{} }()
131+
config.Cfg.ReorgHandler.BlocksPerScan = 50
132+
133+
mockRPC := mocks.NewMockIRPCClient(t)
134+
mockMainStorage := mocks.NewMockIMainStorage(t)
135+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
136+
137+
mockStorage := storage.IStorage{
138+
MainStorage: mockMainStorage,
139+
OrchestratorStorage: mockOrchestratorStorage,
140+
}
141+
142+
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
143+
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(0), nil)
144+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(10), nil)
145+
handler := NewReorgHandler(mockRPC, mockStorage)
146+
147+
fromBlock, toBlock, err := handler.getReorgCheckRange(big.NewInt(10))
148+
assert.NoError(t, err)
149+
assert.Equal(t, big.NewInt(0), fromBlock)
150+
assert.Equal(t, big.NewInt(10), toBlock)
151+
}
152+
82153
func TestFindReorgEndIndex(t *testing.T) {
83154
tests := []struct {
84155
name string
@@ -451,8 +522,9 @@ func TestStartReorgHandler(t *testing.T) {
451522
OrchestratorStorage: mockOrchestratorStorage,
452523
}
453524

454-
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1)).Times(5)
525+
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1)).Times(7)
455526
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(2000), nil).Times(1)
527+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(100000), nil)
456528
handler := NewReorgHandler(mockRPC, mockStorage)
457529
handler.triggerInterval = 100 // Set a short interval for testing
458530

@@ -470,6 +542,30 @@ func TestStartReorgHandler(t *testing.T) {
470542
time.Sleep(250 * time.Millisecond)
471543
}
472544

545+
func TestReorgHandlingIsSkippedIfMostRecentAndLastCheckedBlockAreSame(t *testing.T) {
546+
defer func() { config.Cfg = config.Config{} }()
547+
config.Cfg.ReorgHandler.BlocksPerScan = 10
548+
549+
mockRPC := mocks.NewMockIRPCClient(t)
550+
mockMainStorage := mocks.NewMockIMainStorage(t)
551+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
552+
553+
mockStorage := storage.IStorage{
554+
MainStorage: mockMainStorage,
555+
OrchestratorStorage: mockOrchestratorStorage,
556+
}
557+
558+
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
559+
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
560+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
561+
562+
handler := NewReorgHandler(mockRPC, mockStorage)
563+
mostRecentBlockChecked, err := handler.RunFromBlock(big.NewInt(100))
564+
565+
assert.NoError(t, err)
566+
assert.Nil(t, mostRecentBlockChecked)
567+
}
568+
473569
func TestHandleReorgWithSingleBlockReorg(t *testing.T) {
474570
defer func() { config.Cfg = config.Config{} }()
475571
config.Cfg.ReorgHandler.BlocksPerScan = 10
@@ -486,6 +582,7 @@ func TestHandleReorgWithSingleBlockReorg(t *testing.T) {
486582
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
487583
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
488584
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
585+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)
489586

490587
mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(99), big.NewInt(109)).Return([]common.BlockHeader{
491588
{Number: big.NewInt(109), Hash: "hash109", ParentHash: "hash108"},
@@ -543,6 +640,7 @@ func TestHandleReorgWithLatestBlockReorged(t *testing.T) {
543640
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
544641
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
545642
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
643+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)
546644

547645
mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(99), big.NewInt(109)).Return([]common.BlockHeader{
548646
{Number: big.NewInt(109), Hash: "hash109", ParentHash: "hash108"}, // <-- fork starts here
@@ -610,6 +708,7 @@ func TestHandleReorgWithManyBlocks(t *testing.T) {
610708
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
611709
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
612710
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
711+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)
613712

614713
mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(99), big.NewInt(109)).Return([]common.BlockHeader{
615714
{Number: big.NewInt(109), Hash: "hash109", ParentHash: "hash108"},
@@ -672,6 +771,7 @@ func TestHandleReorgWithDuplicateBlocks(t *testing.T) {
672771

673772
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
674773
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(6268164), nil)
774+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(10000000), nil)
675775

676776
mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(6268162), big.NewInt(6268172)).Return([]common.BlockHeader{
677777
{Number: big.NewInt(6268172), Hash: "0x69d2044d27d2879c309fd885eb0c7d915c9aeed9b28df460d3b52cb4ccf888d8", ParentHash: "0xbf44d12afe40ef30effa32ed45c8d26d854ffba1c8ad781117117e7d18ca157f"},
@@ -708,6 +808,7 @@ func TestNothingIsDoneForCorrectBlocks(t *testing.T) {
708808

709809
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
710810
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(6268164), nil)
811+
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(10000000), nil)
711812

712813
mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(6268163), big.NewInt(6268173)).Return([]common.BlockHeader{
713814
{Number: big.NewInt(6268173), Hash: "0xa281ed679e6f7d0ede5fffdd3528348f303bc456d8d83e6bbe7ad0708f8f9b10", ParentHash: "0x69d2044d27d2879c309fd885eb0c7d915c9aeed9b28df460d3b52cb4ccf888d8"},

0 commit comments

Comments
 (0)