@@ -221,74 +221,90 @@ proc run(config: NimbusConf): PortalNode {.
221221
222222 node
223223
224- proc getBlockLoop (node: PortalNode , blockQueue: AsyncQueue [EthBlock ], startBlock: uint64 , portalWorkers: int ): Future [void ] {.async .} =
224+ proc getBlockLoop (
225+ node: PortalNode ,
226+ blockQueue: AsyncQueue [EthBlock ],
227+ startBlock: uint64 ,
228+ portalWorkers: int ,
229+ ): Future [void ] {.async .} =
225230 const bufferSize = 8192
226231
227- let historyNetwork = node.historyNetwork.value ()
228- let blockNumberQueue = newAsyncQueue [(uint64 , uint64 )](2048 )
229-
230- var blockNumber = startBlock
231- var blocks: seq [EthBlock ] = newSeq [EthBlock ](bufferSize)
232- var count = 0
233- var failureCount = 0
232+ let
233+ historyNetwork = node.historyNetwork.value ()
234+ blockNumberQueue = newAsyncQueue [(uint64 , uint64 )](portalWorkers * 2 )
234235
235- # Note: Could make these stuint bitmasks
236- var downloadFinished: array [bufferSize, bool ]
237- var downloadStarted: array [bufferSize, bool ]
236+ var
237+ blocks: array [bufferSize, EthBlock ]
238+ # Note: Could make this stuint bitmask
239+ downloadFinished: array [bufferSize, bool ]
240+ # stats counters
241+ totalDownloadCount = 0
242+ totalFailureCount = 0
238243
239244 proc blockWorker (node: PortalNode ): Future [void ] {.async .} =
240245 while true :
241- let (blockNumber , i) = await blockNumberQueue.popFirst ()
242- var currentBlockFailures = 0
246+ let (blockNumberOffset , i) = await blockNumberQueue.popFirst ()
247+ var blockFailureCount = 0
243248 while true :
244- let (header, body) = (await historyNetwork.getBlock (blockNumber + i)).valueOr:
245- currentBlockFailures.inc ()
246- if currentBlockFailures > 10 :
247- fatal " Block download failed too many times" , blockNumber = blockNumber + i, currentBlockFailures
249+ let blockNumber = blockNumberOffset + i
250+ let (header, body) = (await historyNetwork.getBlock (blockNumber)).valueOr:
251+ blockFailureCount.inc ()
252+ totalFailureCount.inc ()
253+ debug " Failed to get block" , blockNumber, blockFailureCount
254+ if blockFailureCount > 10 :
255+ fatal " Block download failed too many times" , blockNumber, blockFailureCount
248256 quit (QuitFailure )
249257
250- debug " Failed to get block" , blockNumber = blockNumber + i, currentBlockFailures
251- failureCount.inc ()
252258 continue
253259
254260 blocks[i] = init (EthBlock , header, body)
255261 downloadFinished[i] = true
256- count .inc ()
262+ totalDownloadCount .inc ()
257263
258264 break
259265
260266 var workers: seq [Future [void ]] = @ []
261267 for i in 0 ..< portalWorkers:
262268 workers.add node.blockWorker ()
263269
264- info " Start downloading blocks" , startBlock = blockNumber
265- var i = 0 'u64
266- var nextDownloadedIndex = 0
270+ info " Start downloading blocks" , startBlock
271+ var
272+ blockNumberOffset = startBlock
273+ nextReadIndex = 0
274+ nextWriteIndex = 0
275+
267276 let t0 = Moment .now ()
268277
269278 while true :
270- while downloadFinished[nextDownloadedIndex]:
271- debug " Adding block to the processing queue" , blockNumber = nextDownloadedIndex.uint64 + blockNumber
272- await blockQueue.addLast (blocks[nextDownloadedIndex])
273- downloadFinished[nextDownloadedIndex] = false
274- downloadStarted[nextDownloadedIndex] = false
275- nextDownloadedIndex = (nextDownloadedIndex + 1 ) mod bufferSize
276-
277- # TODO : can use the read pointer nextDownloadedIndex instead and get rid of downloadStarted
278- if not downloadStarted[i]:
279- debug " Adding block to the download queue" , blockNumber = blockNumber + i
280- await blockNumberQueue.addLast ((blockNumber, i))
281- downloadStarted[i] = true
282- # TODO clean this up by directly using blocknumber with modulo calc
283- if i == bufferSize.uint64 - 1 :
284- blockNumber += bufferSize.uint64
279+ while downloadFinished[nextReadIndex]:
280+ debug " Adding block to the processing queue" ,
281+ blockNumber = blockNumberOffset + nextReadIndex.uint64
282+ await blockQueue.addLast (blocks[nextReadIndex])
283+ downloadFinished[nextReadIndex] = false
284+ nextReadIndex = (nextReadIndex + 1 ) mod bufferSize
285+ if nextReadIndex == 0 :
285286 let t1 = Moment .now ()
286287 let diff = (t1 - t0).nanoseconds ().float / 1000000000
287- let avgBps = count.float / diff
288- info " Total blocks downloaded" , count = count, failureCount = failureCount, failureRate = failureCount.float / count.float , avgBps = avgBps
289- i = (i + 1 'u64 ) mod bufferSize.uint64
288+ let avgBps = totalDownloadCount.float / diff
289+ info " Total blocks downloaded" ,
290+ totalDownloadCount,
291+ totalFailureCount,
292+ avgBps,
293+ failureRate = totalFailureCount.float / totalDownloadCount.float
294+
295+ if nextWriteIndex != (nextReadIndex + bufferSize - 1 ) mod bufferSize:
296+ debug " Adding block to the download queue" ,
297+ blockNumber = blockNumberOffset + nextWriteIndex.uint64
298+ await blockNumberQueue.addLast ((blockNumberOffset, nextWriteIndex.uint64 ))
299+ nextWriteIndex = (nextWriteIndex + 1 ) mod bufferSize
300+ if nextWriteIndex == 0 :
301+ blockNumberOffset += bufferSize.uint64
290302 else :
291- await sleepAsync (1 .nanoseconds)
303+ debug " Waiting to add block downloads" ,
304+ nextReadIndex,
305+ nextWriteIndex,
306+ blockNumber = blockNumberOffset + nextReadIndex.uint64
307+ await sleepAsync (1 .seconds)
292308
293309proc importBlocks * (conf: NimbusConf , com: CommonRef , node: PortalNode , blockQueue: AsyncQueue [EthBlock ]) {.async .} =
294310 proc controlCHandler () {.noconv .} =
0 commit comments