@@ -13,15 +13,25 @@ import
   chronicles,
   metrics,
   chronos/timer,
-  std/[strformat, strutils],
+  chronos,
+  std/[strformat, strutils, os],
   stew/io2,
   beacon_chain/era_db,
   beacon_chain/networking/network_metadata,
   ./config,
   ./common/common,
   ./core/chain,
   ./db/era1_db,
-  ./utils/era_helpers
+  ./utils/era_helpers,
+  eth/common/keys, # rng
+  eth/net/nat, # setupAddress
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  eth/p2p/discoveryv5/routing_table,
+  eth/p2p/discoveryv5/enr,
+  ../fluffy/portal_node,
+  ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
+  ../fluffy/network_metadata,
+  ../fluffy/version
 
 declareGauge nec_import_block_number, "Latest imported block number"
 
@@ -87,7 +97,192 @@ template boolFlag(flags, b): PersistBlockFlags =
   else:
     {}
 
-proc importBlocks*(conf: NimbusConf, com: CommonRef) =
+proc run(config: NimbusConf): PortalNode {.
+    raises: [CatchableError]
+.} =
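+  # Sets up discovery v5 and a history-network Portal node, writes the local
+  # ENR to disk and returns the already started node.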
+  let rng = newRng()
+
+  ## Network configuration
+  let
+    bindIp = config.listenAddress
+    udpPort = Port(config.udpPort)
+    # TODO: allow for no TCP port mapping!
+    (extIp, _, extUdpPort) =
+      try:
+        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
+      except CatchableError as exc:
+        raiseAssert exc.msg
+        # raise exc # TODO: Ideally we don't have the Exception here
+      except Exception as exc:
+        raiseAssert exc.msg
+    (netkey, newNetKey) =
+      # if config.netKey.isSome():
+      #   (config.netKey.get(), true)
+      # else:
+      getPersistentNetKey(rng[], config.dataDir / "netkey")
+
+    enrFilePath = config.dataDir / "nimbus_portal_node.enr"
+    previousEnr =
+      if not newNetKey:
+        getPersistentEnr(enrFilePath)
+      else:
+        Opt.none(enr.Record)
+
+  var bootstrapRecords: seq[Record]
+  # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
+  # bootstrapRecords.add(config.bootstrapNodes)
+
+  # case config.network
+  # of PortalNetwork.none:
+  #   discard # don't connect to any network bootstrap nodes
+  # of PortalNetwork.mainnet:
+  #   for enrURI in mainnetBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+  # of PortalNetwork.angelfood:
+  #   for enrURI in angelfoodBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+
+  # Only mainnet
+  for enrURI in mainnetBootstrapNodes:
+    let res = enr.Record.fromURI(enrURI)
+    if res.isOk():
+      bootstrapRecords.add(res.value)
+
+  ## Discovery v5 protocol setup
+  let
+    discoveryConfig =
+      DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
+    d = newProtocol(
+      netkey,
+      extIp,
+      Opt.none(Port),
+      extUdpPort,
+      # Note: The addition of default clientInfo to the ENR is a temporary
+      # measure to easily identify & debug the clients used in the testnet.
+      # Might make this into a, default off, cli option.
+      localEnrFields = {"c": enrClientInfoShort},
+      bootstrapRecords = bootstrapRecords,
+      previousRecord = previousEnr,
+      bindIp = bindIp,
+      bindPort = udpPort,
+      enrAutoUpdate = true,
+      config = discoveryConfig,
+      rng = rng,
+    )
+
+  d.open()
+
+  ## Portal node setup
+  let
+    portalProtocolConfig = PortalProtocolConfig.init(
+      DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha,
+      RadiusConfig(kind: Static, logRadius: 249),
+      defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
+      defaultDisableContentCache, defaultMaxConcurrentOffers, defaultDisableBanNodes,
+    )
+
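+    # Local content storage is disabled (storageCapacity: 0) and state root
+    # validation is skipped, since this node is only used to fetch blocks for import.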
+    portalNodeConfig = PortalNodeConfig(
+      accumulatorFile: Opt.none(string),
+      disableStateRootValidation: true,
+      trustedBlockRoot: Opt.none(Digest),
+      portalConfig: portalProtocolConfig,
+      dataDir: string config.dataDir,
+      storageCapacity: 0,
+      contentRequestRetries: 1
+    )
+
+    node = PortalNode.new(
+      PortalNetwork.mainnet,
+      portalNodeConfig,
+      d,
+      {PortalSubnetwork.history},
+      bootstrapRecords = bootstrapRecords,
+      rng = rng,
+    )
+
+  let enrFile = config.dataDir / "nimbus_portal_node.enr"
+  if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
+    fatal "Failed to write the enr file", file = enrFile
+    quit 1
+
+  ## Start the Portal node.
+  node.start()
+
+  node
+
+proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[EthBlock], startBlock: uint64, portalWorkers: int): Future[void] {.async.} =
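+  # Downloads blocks from the Portal history network with `portalWorkers`
+  # concurrent workers into a fixed-size ring buffer, and hands them to
+  # `blockQueue` in block-number order starting at `startBlock`.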
+  const bufferSize = 8192
+
+  let historyNetwork = node.historyNetwork.value()
+  let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
+
+  var blockNumber = startBlock
+  var blocks: seq[EthBlock] = newSeq[EthBlock](bufferSize)
+  var count = 0
+  var failureCount = 0
+
+  # Note: Could make these stuint bitmasks
+  var downloadFinished: array[bufferSize, bool]
+  var downloadStarted: array[bufferSize, bool]
+
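+  # Each worker pops a (base block number, buffer index) pair from
+  # blockNumberQueue and retries the Portal block lookup until it succeeds or
+  # the per-block failure limit is hit.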
+  proc blockWorker(node: PortalNode): Future[void] {.async.} =
+    while true:
+      let (blockNumber, i) = await blockNumberQueue.popFirst()
+      var currentBlockFailures = 0
+      while true:
+        let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
+          currentBlockFailures.inc()
+          if currentBlockFailures > 10:
+            fatal "Block download failed too many times", blockNumber = blockNumber + i, currentBlockFailures
+            quit(QuitFailure)
+
+          debug "Failed to get block", blockNumber = blockNumber + i, currentBlockFailures
+          failureCount.inc()
+          continue
+
+        blocks[i] = init(EthBlock, header, body)
+        downloadFinished[i] = true
+        count.inc()
+
+        break
+
+  var workers: seq[Future[void]] = @[]
+  for i in 0 ..< portalWorkers:
+    workers.add node.blockWorker()
+
+  info "Start downloading blocks", startBlock = blockNumber
+  var i = 0'u64
+  var nextDownloadedIndex = 0
+  let t0 = Moment.now()
+
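+  # Main loop: flush finished buffer slots to blockQueue in order, schedule new
+  # downloads for free slots, and yield briefly while waiting on in-flight blocks.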
+  while true:
+    while downloadFinished[nextDownloadedIndex]:
+      debug "Adding block to the processing queue", blockNumber = nextDownloadedIndex.uint64 + blockNumber
+      await blockQueue.addLast(blocks[nextDownloadedIndex])
+      downloadFinished[nextDownloadedIndex] = false
+      downloadStarted[nextDownloadedIndex] = false
+      nextDownloadedIndex = (nextDownloadedIndex + 1) mod bufferSize
+
+    # TODO: can use the read pointer nextDownloadedIndex instead and get rid of downloadStarted
+    if not downloadStarted[i]:
+      debug "Adding block to the download queue", blockNumber = blockNumber + i
+      await blockNumberQueue.addLast((blockNumber, i))
+      downloadStarted[i] = true
+      # TODO: clean this up by directly using blockNumber with modulo calc
+      if i == bufferSize.uint64 - 1:
+        blockNumber += bufferSize.uint64
+        let t1 = Moment.now()
+        let diff = (t1 - t0).nanoseconds().float / 1000000000
+        let avgBps = count.float / diff
+        info "Total blocks downloaded", count = count, failureCount = failureCount, failureRate = failureCount.float / count.float, avgBps = avgBps
+      i = (i + 1'u64) mod bufferSize.uint64
+    else:
+      await sleepAsync(1.nanoseconds)
+
+proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[EthBlock]) {.async.} =
   proc controlCHandler() {.noconv.} =
     when defined(windows):
       # workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -118,7 +313,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       boolFlag(NoPersistBodies, not conf.storeBodies) +
       boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
       boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
-    blk: Block
+    blk: blocks.Block
     persister = Persister.init(com, flags)
     cstats: PersistStats # stats at start of chunk
 
@@ -292,11 +487,16 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
 
   while running and persister.stats.blocks.uint64 < conf.maxBlocks and
       blockNumber <= lastEra1Block:
-    if not loadEraBlock(blockNumber):
-      notice "No more `era1` blocks to import", blockNumber, slot
-      break
-    persistBlock()
-    checkpoint()
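+    # When conf.usePortal is set, blocks come from the Portal download queue
+    # instead of the local era1 files.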
+    if not conf.usePortal:
+      if not loadEraBlock(blockNumber):
+        notice "No more `era1` blocks to import", blockNumber, slot
+        break
+      persistBlock()
+      checkpoint()
+    else:
+      blk = await blockQueue.popFirst()
+      persistBlock()
+      checkpoint()
 
   block era1Import:
     if blockNumber > lastEra1Block:
@@ -368,3 +568,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
     blocks = persister.stats.blocks,
     txs = persister.stats.txs,
     mgas = f(persister.stats.gas.float / 1000000)
+
+proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
+    raises: [CatchableError]
+.} =
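+  # Spawns the Portal download loop (when enabled) and the import loop, and
+  # drives the chronos event loop while the importer is running.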
+  let
+    portalNode = run(conf)
+    blockQueue = newAsyncQueue[EthBlock]()
+    start = com.db.baseTxFrame().getSavedStateBlockNumber() + 1
+
+  if conf.usePortal:
+    asyncSpawn portalNode.getBlockLoop(blockQueue, start, conf.portalWorkers)
+
+  asyncSpawn importBlocks(conf, com, portalNode, blockQueue)
+
+  while running:
+    try:
+      poll()
+    except CatchableError as e:
+      warn "Exception in poll()", exc = e.name, err = e.msg