@@ -13,15 +13,25 @@ import
1313 chronicles,
1414 metrics,
1515 chronos/ timer,
16- std/ [strformat, strutils],
16+ chronos,
17+ std/ [strformat, strutils, os],
1718 stew/ io2,
1819 beacon_chain/ era_db,
1920 beacon_chain/ networking/ network_metadata,
2021 ./ config,
2122 ./ common/ common,
2223 ./ core/ chain,
2324 ./ db/ era1_db,
24- ./ utils/ era_helpers
25+ ./ utils/ era_helpers,
26+ eth/ common/ keys, # rng
27+ eth/ net/ nat, # setupAddress
28+ eth/ p2p/ discoveryv5/ protocol as discv5_protocol,
29+ eth/ p2p/ discoveryv5/ routing_table,
30+ eth/ p2p/ discoveryv5/ enr,
31+ ../ fluffy/ portal_node,
32+ ../ fluffy/ common/ common_utils, # getPersistentNetKey, getPersistentEnr
33+ ../ fluffy/ network_metadata,
34+ ../ fluffy/ version
2535
2636declareGauge nec_import_block_number, " Latest imported block number"
2737
@@ -33,7 +43,165 @@ declareCounter nec_imported_gas, "Gas processed during import"
3343
3444var running {.volatile .} = true
3545
36- proc importBlocks * (conf: NimbusConf , com: CommonRef ) =
46+ proc runPortalNode (config: NimbusConf ): PortalNode {.
47+ raises : [CatchableError ]
48+ .} =
49+ let rng = newRng ()
50+
51+ # # Network configuration
52+ let
53+ bindIp = config.listenAddress
54+ udpPort = Port (config.udpPort)
55+ # TODO : allow for no TCP port mapping!
56+ (extIp, _, extUdpPort) =
57+ try :
58+ setupAddress (config.nat, config.listenAddress, udpPort, udpPort, " portal" )
59+ except CatchableError as exc:
60+ raiseAssert exc.msg
61+ # raise exc # TODO: Ideally we don't have the Exception here
62+ except Exception as exc:
63+ raiseAssert exc.msg
64+ (netkey, newNetKey) =
65+ # if config.netKey.isSome():
66+ # (config.netKey.get(), true)
67+ # else:
68+ getPersistentNetKey (rng[], config.dataDir / " netkey" )
69+
70+ enrFilePath = config.dataDir / " nimbus_portal_node.enr"
71+ previousEnr =
72+ if not newNetKey:
73+ getPersistentEnr (enrFilePath)
74+ else :
75+ Opt .none (enr.Record )
76+
77+ var bootstrapRecords: seq [Record ]
78+ # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
79+ # bootstrapRecords.add(config.bootstrapNodes)
80+
81+ # case config.network
82+ # of PortalNetwork.none:
83+ # discard # don't connect to any network bootstrap nodes
84+ # of PortalNetwork.mainnet:
85+ # for enrURI in mainnetBootstrapNodes:
86+ # let res = enr.Record.fromURI(enrURI)
87+ # if res.isOk():
88+ # bootstrapRecords.add(res.value)
89+ # of PortalNetwork.angelfood:
90+ # for enrURI in angelfoodBootstrapNodes:
91+ # let res = enr.Record.fromURI(enrURI)
92+ # if res.isOk():
93+ # bootstrapRecords.add(res.value)
94+
95+ # Only mainnet
96+ for enrURI in mainnetBootstrapNodes:
97+ let res = enr.Record .fromURI (enrURI)
98+ if res.isOk ():
99+ bootstrapRecords.add (res.value)
100+
101+ # # Discovery v5 protocol setup
102+ let
103+ discoveryConfig =
104+ DiscoveryConfig .init (DefaultTableIpLimit , DefaultBucketIpLimit , DefaultBitsPerHop )
105+ d = newProtocol (
106+ netkey,
107+ extIp,
108+ Opt .none (Port ),
109+ extUdpPort,
110+ # Note: The addition of default clientInfo to the ENR is a temporary
111+ # measure to easily identify & debug the clients used in the testnet.
112+ # Might make this into a, default off, cli option.
113+ localEnrFields = {" c" : enrClientInfoShort},
114+ bootstrapRecords = bootstrapRecords,
115+ previousRecord = previousEnr,
116+ bindIp = bindIp,
117+ bindPort = udpPort,
118+ enrAutoUpdate = true ,
119+ config = discoveryConfig,
120+ rng = rng,
121+ )
122+
123+ d.open ()
124+
125+ # # Portal node setup
126+ let
127+ portalProtocolConfig = PortalProtocolConfig .init (
128+ DefaultTableIpLimit , DefaultBucketIpLimit , DefaultBitsPerHop , defaultAlpha, RadiusConfig (kind: Static , logRadius: 249 ),
129+ defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
130+ defaultDisableContentCache, defaultMaxConcurrentOffers
131+ )
132+
133+ portalNodeConfig = PortalNodeConfig (
134+ accumulatorFile: Opt .none (string ),
135+ disableStateRootValidation: true ,
136+ trustedBlockRoot: Opt .none (Digest ),
137+ portalConfig: portalProtocolConfig,
138+ dataDir: string config.dataDir,
139+ storageCapacity: 0 ,
140+ contentRequestRetries: 1
141+ )
142+
143+ node = PortalNode .new (
144+ PortalNetwork .mainnet,
145+ portalNodeConfig,
146+ d,
147+ {PortalSubnetwork .history},
148+ bootstrapRecords = bootstrapRecords,
149+ rng = rng,
150+ )
151+
152+ let enrFile = config.dataDir / " nimbus_portal_node.enr"
153+ if io2.writeFile (enrFile, d.localNode.record.toURI ()).isErr:
154+ fatal " Failed to write the enr file" , file = enrFile
155+ quit 1
156+
157+ # # Start the Portal node.
158+ node.start ()
159+
160+ node
161+
162+
163+ proc getBlockLoop (node: PortalNode , blockQueue: AsyncQueue [seq [EthBlock ]], startBlock: uint64 ): Future [void ] {.async .} =
164+ let historyNetwork = node.historyNetwork.value ()
165+ var blockNumber = startBlock
166+
167+ let blockNumberQueue = newAsyncQueue [(uint64 , uint64 )](2048 )
168+ var blocks: seq [EthBlock ] = newSeq [EthBlock ](8192 )
169+ var count = 0
170+
171+ proc blockWorker (node: PortalNode ): Future [void ] {.async .} =
172+ while true :
173+ let (blockNumber, i) = await blockNumberQueue.popFirst ()
174+ while true :
175+ let (header, body) = (await historyNetwork.getBlock (blockNumber + i)).valueOr:
176+ error " Failed to get block" , blockNumber = blockNumber + i
177+ # Note: loop will get stuck here if a block is not available
178+ continue
179+
180+ blocks[i] = init (EthBlock , header, body)
181+ count.inc ()
182+
183+ break
184+
185+ var workers: seq [Future [void ]] = @ []
186+ for i in 0 ..< 512 :
187+ workers.add node.blockWorker ()
188+
189+ while true :
190+ blocks = newSeq [EthBlock ](8192 )
191+ count = 0
192+ info " Downloading 8192 blocks" , startBlock = blockNumber
193+ for i in 0 .. 8191 'u64 :
194+ await blockNumberQueue.addLast ((blockNumber, i))
195+
196+ # Not great :/
197+ while count != 8192 :
198+ await sleepAsync (10 .milliseconds)
199+ info " Adding 8192 blocks" , startBlock = blockNumber
200+ await blockQueue.addLast (blocks)
201+
202+ blockNumber += 8192
203+
204+ proc importBlocks * (conf: NimbusConf , com: CommonRef , blockQueue: AsyncQueue [seq [EthBlock ]]) {.async : (raises: [CancelledError ]).} =
37205 proc controlCHandler () {.noconv .} =
38206 when defined (windows):
39207 # workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -278,14 +446,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
278446 true
279447
280448 while running and imported < conf.maxBlocks and blockNumber <= lastEra1Block:
281- if not loadEraBlock (blockNumber):
282- notice " No more era1 blocks to import" , blockNumber
283- break
449+ if not conf.usePortal:
450+ if not loadEraBlock (blockNumber):
451+ notice " No more era1 blocks to import" , blockNumber
452+ break
284453
285- imported += 1
454+ imported += 1
455+ else :
456+ let blockSeq = await blockQueue.popFirst ()
457+ blocks.add (blockSeq)
458+ info " Loaded 8192 blocks" , startBlock = blockNumber
459+
460+ imported += 8192
286461
287462 if blocks.lenu64 mod conf.chunkSize == 0 :
288463 process ()
464+ info " Processed chunk of blocks"
289465
290466 if blocks.len > 0 :
291467 process () # last chunk, if any
@@ -341,3 +517,21 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
341517
342518 if blocks.len > 0 :
343519 process ()
520+
521+ proc importBlocks * (conf: NimbusConf , com: CommonRef ) {.
522+ raises : [CatchableError ]
523+ .} =
524+ let blockQueue = newAsyncQueue [seq [EthBlock ]](4 )
525+
526+ if conf.usePortal:
527+ let portalNode = runPortalNode (conf)
528+ let start = com.db.getSavedStateBlockNumber () + 1
529+ asyncSpawn portalNode.getBlockLoop (blockQueue, start)
530+
531+ asyncSpawn importBlocks (conf, com, blockQueue)
532+
533+ while running:
534+ try :
535+ poll ()
536+ except CatchableError as e:
537+ warn " Exception in poll()" , exc = e.name, err = e.msg
0 commit comments