@@ -13,15 +13,25 @@ import
   chronicles,
   metrics,
   chronos/timer,
-  std/[strformat, strutils],
+  chronos,
+  std/[strformat, strutils, os],
   stew/io2,
   beacon_chain/era_db,
   beacon_chain/networking/network_metadata,
   ./config,
   ./common/common,
   ./core/chain,
   ./db/era1_db,
-  ./utils/era_helpers
+  ./utils/era_helpers,
+  eth/common/keys, # rng
+  eth/net/nat, # setupAddress
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  eth/p2p/discoveryv5/routing_table,
+  eth/p2p/discoveryv5/enr,
+  ../fluffy/portal_node,
+  ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
+  ../fluffy/network_metadata,
+  ../fluffy/version

 declareGauge nec_import_block_number, "Latest imported block number"

@@ -95,7 +105,192 @@ template boolFlag(flags, b): PersistBlockFlags =
   else:
     {}

-proc importBlocks*(conf: NimbusConf, com: CommonRef) =
+proc run(config: NimbusConf): PortalNode {.
+    raises: [CatchableError]
+.} =
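+  ## Set up and start a Portal history-network node: load or create the
+  ## persistent network key and ENR, configure discovery v5 with the mainnet
+  ## bootstrap nodes, write the local ENR to disk and return the started node.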
+  let rng = newRng()
+
+  ## Network configuration
+  let
+    bindIp = config.listenAddress
+    udpPort = Port(config.udpPort)
+    # TODO: allow for no TCP port mapping!
+    (extIp, _, extUdpPort) =
+      try:
+        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
+      except CatchableError as exc:
+        raiseAssert exc.msg
+        # raise exc # TODO: Ideally we don't have the Exception here
+      except Exception as exc:
+        raiseAssert exc.msg
+    (netkey, newNetKey) =
+      # if config.netKey.isSome():
+      #   (config.netKey.get(), true)
+      # else:
+      getPersistentNetKey(rng[], config.dataDir / "netkey")
+
+    enrFilePath = config.dataDir / "nimbus_portal_node.enr"
+    previousEnr =
+      if not newNetKey:
+        getPersistentEnr(enrFilePath)
+      else:
+        Opt.none(enr.Record)
+
+  var bootstrapRecords: seq[Record]
+  # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
+  # bootstrapRecords.add(config.bootstrapNodes)
+
+  # case config.network
+  # of PortalNetwork.none:
+  #   discard # don't connect to any network bootstrap nodes
+  # of PortalNetwork.mainnet:
+  #   for enrURI in mainnetBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+  # of PortalNetwork.angelfood:
+  #   for enrURI in angelfoodBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+
+  # Only mainnet
+  for enrURI in mainnetBootstrapNodes:
+    let res = enr.Record.fromURI(enrURI)
+    if res.isOk():
+      bootstrapRecords.add(res.value)
+
+  ## Discovery v5 protocol setup
+  let
+    discoveryConfig =
+      DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
+    d = newProtocol(
+      netkey,
+      extIp,
+      Opt.none(Port),
+      extUdpPort,
+      # Note: The addition of default clientInfo to the ENR is a temporary
+      # measure to easily identify & debug the clients used in the testnet.
+      # Might make this into a, default off, cli option.
+      localEnrFields = {"c": enrClientInfoShort},
+      bootstrapRecords = bootstrapRecords,
+      previousRecord = previousEnr,
+      bindIp = bindIp,
+      bindPort = udpPort,
+      enrAutoUpdate = true,
+      config = discoveryConfig,
+      rng = rng,
+    )
+
+  d.open()
+
+  ## Portal node setup
+  let
+    portalProtocolConfig = PortalProtocolConfig.init(
+      DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249),
+      defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
+      defaultDisableContentCache, defaultMaxConcurrentOffers, defaultDisableBanNodes,
+    )
+
+    portalNodeConfig = PortalNodeConfig(
+      accumulatorFile: Opt.none(string),
+      disableStateRootValidation: true,
+      trustedBlockRoot: Opt.none(Digest),
+      portalConfig: portalProtocolConfig,
+      dataDir: string config.dataDir,
+      storageCapacity: 0,
+      contentRequestRetries: 1
+    )
+
+    node = PortalNode.new(
+      PortalNetwork.mainnet,
+      portalNodeConfig,
+      d,
+      {PortalSubnetwork.history},
+      bootstrapRecords = bootstrapRecords,
+      rng = rng,
+    )
+
+  let enrFile = config.dataDir / "nimbus_portal_node.enr"
+  if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
+    fatal "Failed to write the enr file", file = enrFile
+    quit 1
+
+  ## Start the Portal node.
+  node.start()
+
+  node
+
+proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[EthBlock], startBlock: uint64, portalWorkers: int): Future[void] {.async.} =
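+  ## Download blocks from the Portal history network with `portalWorkers`
+  ## concurrent workers, starting at `startBlock`, and push them to
+  ## `blockQueue` in block-number order for the import loop to persist.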
+  const bufferSize = 8192
+
+  let historyNetwork = node.historyNetwork.value()
+  let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
+
+  var blockNumber = startBlock
+  var blocks: seq[EthBlock] = newSeq[EthBlock](bufferSize)
+  var count = 0
+  var failureCount = 0
+
+  # Note: Could make these stuint bitmasks
+  var downloadFinished: array[bufferSize, bool]
+  var downloadStarted: array[bufferSize, bool]
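+  # `blocks`, `downloadStarted` and `downloadFinished` act as a ring buffer of
+  # `bufferSize` slots: workers fill slots out of order, while the loop below
+  # drains completed slots in order into `blockQueue`.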
+
+  proc blockWorker(node: PortalNode): Future[void] {.async.} =
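+    ## Pop a (base block number, buffer index) pair and retry the Portal
+    ## lookup for that block until it succeeds or the retry limit is hit.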
+    while true:
+      let (blockNumber, i) = await blockNumberQueue.popFirst()
+      var currentBlockFailures = 0
+      while true:
+        let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
+          currentBlockFailures.inc()
+          if currentBlockFailures > 10:
+            fatal "Block download failed too many times", blockNumber = blockNumber + i, currentBlockFailures
+            quit(QuitFailure)
+
+          debug "Failed to get block", blockNumber = blockNumber + i, currentBlockFailures
+          failureCount.inc()
+          continue
+
+        blocks[i] = init(EthBlock, header, body)
+        downloadFinished[i] = true
+        count.inc()
+
+        break
+
+  var workers: seq[Future[void]] = @[]
+  for i in 0 ..< portalWorkers:
+    workers.add node.blockWorker()
+
+  info "Start downloading blocks", startBlock = blockNumber
+  var i = 0'u64
+  var nextDownloadedIndex = 0
+  let t0 = Moment.now()
+
+  while true:
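+    # Hand over all consecutively finished slots, in order, to the import queue.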
+    while downloadFinished[nextDownloadedIndex]:
+      debug "Adding block to the processing queue", blockNumber = nextDownloadedIndex.uint64 + blockNumber
+      await blockQueue.addLast(blocks[nextDownloadedIndex])
+      downloadFinished[nextDownloadedIndex] = false
+      downloadStarted[nextDownloadedIndex] = false
+      nextDownloadedIndex = (nextDownloadedIndex + 1) mod bufferSize
+
+    # TODO: can use the read pointer nextDownloadedIndex instead and get rid of downloadStarted
+    if not downloadStarted[i]:
+      debug "Adding block to the download queue", blockNumber = blockNumber + i
+      await blockNumberQueue.addLast((blockNumber, i))
+      downloadStarted[i] = true
+      # TODO clean this up by directly using blocknumber with modulo calc
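+      # When the write index wraps around, advance the base block number by a
+      # full buffer and log the average download rate so far.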
+      if i == bufferSize.uint64 - 1:
+        blockNumber += bufferSize.uint64
+        let t1 = Moment.now()
+        let diff = (t1 - t0).nanoseconds().float / 1000000000
+        let avgBps = count.float / diff
+        info "Total blocks downloaded", count = count, failureCount = failureCount, failureRate = failureCount.float / count.float, avgBps = avgBps
+      i = (i + 1'u64) mod bufferSize.uint64
+    else:
+      await sleepAsync(1.nanoseconds)
+
+proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[EthBlock]) {.async.} =
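+  ## When `conf.usePortal` is set, pre-merge blocks are taken from `blockQueue`
+  ## (filled by `getBlockLoop`) instead of being loaded from era1 files.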
   proc controlCHandler() {.noconv.} =
     when defined(windows):
       # workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -126,7 +321,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       boolFlag(NoPersistBodies, not conf.storeBodies) +
       boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
       boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
-    blk: Block
+    blk: blocks.Block
     persister = Persister.init(com, flags)
     cstats: PersistStats # stats at start of chunk

@@ -299,11 +494,16 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =

   while running and persister.stats.blocks.uint64 < conf.maxBlocks and
       blockNumber <= lastEra1Block:
-    if not loadEraBlock(blockNumber):
-      notice "No more `era1` blocks to import", blockNumber, slot
-      break
-    persistBlock()
-    checkpoint()
+    if not conf.usePortal:
+      if not loadEraBlock(blockNumber):
+        notice "No more `era1` blocks to import", blockNumber, slot
+        break
+      persistBlock()
+      checkpoint()
+    else:
+      blk = await blockQueue.popFirst()
+      persistBlock()
+      checkpoint()

   block era1Import:
     if blockNumber > lastEra1Block:
@@ -375,3 +575,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
     blocks = persister.stats.blocks,
     txs = persister.stats.txs,
     mgas = f(persister.stats.gas.float / 1000000)
+
+proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
+    raises: [CatchableError]
+.} =
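+  ## Set up the Portal node, spawn the block download and import loops, and
+  ## keep polling the chronos event loop while `running` is true.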
+  let
+    portalNode = run(conf)
+    blockQueue = newAsyncQueue[EthBlock]()
+    start = com.db.baseTxFrame().getSavedStateBlockNumber() + 1
+
+  if conf.usePortal:
+    asyncSpawn portalNode.getBlockLoop(blockQueue, start, conf.portalWorkers)
+
+  asyncSpawn importBlocks(conf, com, portalNode, blockQueue)
+
+  while running:
+    try:
+      poll()
+    except CatchableError as e:
+      warn "Exception in poll()", exc = e.name, err = e.msg