3737 stateRoot: Hash32
3838 stateDiffs: seq [TransactionDiff ]
3939
40- BlockOffersRef = ref object
40+ BlockOffers = ref object
4141 blockNumber: uint64
4242 accountTrieOffers: seq [AccountTrieOfferWithKey ]
4343 contractTrieOffers: seq [ContractTrieOfferWithKey ]
4444 contractCodeOffers: seq [ContractCodeOfferWithKey ]
4545
4646 PortalStateGossipWorker = ref object
4747 id: int
48- portalClient: RpcClient
49- portalUrl: JsonRpcUrl
50- nodeId: NodeId
51- blockOffersQueue: AsyncQueue [BlockOffersRef ]
48+ portalClients: OrderedTable [NodeId , RpcClient ]
49+ portalEndpoints: seq [(JsonRpcUrl , NodeId )]
50+ blockOffersQueue: AsyncQueue [BlockOffers ]
5251 gossipBlockOffersLoop: Future [void ]
5352
5453 PortalStateBridge * = ref object
5554 web3Client: RpcClient
5655 web3Url: JsonRpcUrl
5756 db: DatabaseRef
5857 blockDataQueue: AsyncQueue [BlockData ]
59- blockOffersQueue: AsyncQueue [BlockOffersRef ]
58+ blockOffersQueue: AsyncQueue [BlockOffers ]
6059 gossipWorkers: seq [PortalStateGossipWorker ]
6160 collectBlockDataLoop: Future [void ]
6261 buildBlockOffersLoop: Future [void ]
@@ -92,27 +91,6 @@ proc putLastPersistedBlockNumber(db: DatabaseRef, blockNumber: uint64) {.inline.
9291 if blockNumber > db.getLastPersistedBlockNumber ().valueOr (0 ):
9392 db.put (rlp.encode (" lastPersistedBlockNumber" ), rlp.encode (blockNumber))
9493
95- proc collectOffer (
96- offersMap: OrderedTableRef [seq [byte ], seq [byte ]],
97- offerWithKey:
98- AccountTrieOfferWithKey | ContractTrieOfferWithKey | ContractCodeOfferWithKey ,
99- ) {.inline .} =
100- let keyBytes = offerWithKey.key.toContentKey ().encode ().asSeq ()
101- offersMap[keyBytes] = offerWithKey.offer.encode ()
102-
103- proc recursiveCollectOffer (
104- offersMap: OrderedTableRef [seq [byte ], seq [byte ]],
105- offerWithKey: AccountTrieOfferWithKey | ContractTrieOfferWithKey ,
106- ) =
107- offersMap.collectOffer (offerWithKey)
108-
109- # root node, recursive collect is finished
110- if offerWithKey.key.path.unpackNibbles ().len () == 0 :
111- return
112-
113- # continue the recursive collect
114- offersMap.recursiveCollectOffer (offerWithKey.getParent ())
115-
11694proc runCollectBlockDataLoop (
11795 bridge: PortalStateBridge , startBlockNumber: uint64
11896) {.async : (raises: []).} =
@@ -237,7 +215,7 @@ proc runBuildBlockOffersLoop(
237215 builder.buildBlockOffers ()
238216
239217 await bridge.blockOffersQueue.addLast (
240- BlockOffersRef (
218+ BlockOffers (
241219 blockNumber: 0 .uint64 ,
242220 accountTrieOffers: builder.getAccountTrieOffers (),
243221 contractTrieOffers: builder.getContractTrieOffers (),
@@ -284,7 +262,7 @@ proc runBuildBlockOffersLoop(
284262 builder.buildBlockOffers ()
285263
286264 await bridge.blockOffersQueue.addLast (
287- BlockOffersRef (
265+ BlockOffers (
288266 blockNumber: blockData.blockNumber,
289267 accountTrieOffers: builder.getAccountTrieOffers (),
290268 contractTrieOffers: builder.getContractTrieOffers (),
@@ -299,7 +277,107 @@ proc runBuildBlockOffersLoop(
299277 except CancelledError :
300278 trace " buildBlockOffersLoop canceled"
301279
302- proc runGossipBlockOffersLoop (
280+ proc collectOffer (
281+ offersMap: OrderedTableRef [seq [byte ], seq [byte ]],
282+ offerWithKey:
283+ AccountTrieOfferWithKey | ContractTrieOfferWithKey | ContractCodeOfferWithKey ,
284+ ) {.inline .} =
285+ let keyBytes = offerWithKey.key.toContentKey ().encode ().asSeq ()
286+ offersMap[keyBytes] = offerWithKey.offer.encode ()
287+
288+ proc recursiveCollectOffer (
289+ offersMap: OrderedTableRef [seq [byte ], seq [byte ]],
290+ offerWithKey: AccountTrieOfferWithKey | ContractTrieOfferWithKey ,
291+ ) =
292+ offersMap.collectOffer (offerWithKey)
293+
294+ # root node, recursive collect is finished
295+ if offerWithKey.key.path.unpackNibbles ().len () == 0 :
296+ return
297+
298+ # continue the recursive collect
299+ offersMap.recursiveCollectOffer (offerWithKey.getParent ())
300+
301+ func buildOffersMap (blockOffers: BlockOffers ): auto =
302+ let offersMap = newOrderedTable [seq [byte ], seq [byte ]]()
303+
304+ for offerWithKey in blockOffers.accountTrieOffers:
305+ offersMap.recursiveCollectOffer (offerWithKey)
306+ for offerWithKey in blockOffers.contractTrieOffers:
307+ offersMap.recursiveCollectOffer (offerWithKey)
308+ for offerWithKey in blockOffers.contractCodeOffers:
309+ offersMap.collectOffer (offerWithKey)
310+
311+ offersMap
312+
313+ proc orderPortalClientsByDistanceFromContent (
314+ worker: PortalStateGossipWorker , contentKey: seq [byte ]
315+ ) =
316+ let contentId = ContentKeyByteList .init (contentKey).toContentId ()
317+
318+ # Closure to sort the portal clients using their nodeIds
319+ # and comparing them to the contentId to be gossipped
320+ proc portalClientsCmp (x, y: (NodeId , RpcClient )): int =
321+ let
322+ xDistance = contentId xor x[0 ]
323+ yDistance = contentId xor y[0 ]
324+
325+ if xDistance == yDistance:
326+ 0
327+ elif xDistance > yDistance:
328+ 1
329+ else :
330+ - 1
331+
332+ # Sort the portalClients based on distance from the content so that
333+ # we gossip each piece of content to the closest node first
334+ worker.portalClients.sort (portalClientsCmp)
335+
336+ proc contentFoundInNetwork (
337+ worker: PortalStateGossipWorker , contentKey: seq [byte ]
338+ ): Future [bool ] {.async : (raises: [CancelledError ]).} =
339+ for nodeId, client in worker.portalClients:
340+ try :
341+ let contentInfo = await client.portal_stateGetContent (contentKey.to0xHex ())
342+ if contentInfo.content.len () > 0 :
343+ trace " Found existing content in network" ,
344+ contentKey = contentKey.to0xHex (), nodeId, workerId = worker.id
345+ return true
346+ except CancelledError as e:
347+ raise e
348+ except CatchableError as e:
349+ debug " Unable to find existing content in network" ,
350+ contentKey = contentKey.to0xHex (), nodeId, error = e.msg, workerId = worker.id
351+ return false
352+
353+ proc gossipContentIntoNetwork (
354+ worker: PortalStateGossipWorker ,
355+ minGossipPeers: int ,
356+ contentKey: seq [byte ],
357+ contentOffer: seq [byte ],
358+ ): Future [bool ] {.async : (raises: [CancelledError ]).} =
359+ for nodeId, client in worker.portalClients:
360+ try :
361+ let
362+ putContentResult = await client.portal_statePutContent (
363+ contentKey.to0xHex (), contentOffer.to0xHex ()
364+ )
365+ numPeers = putContentResult.peerCount
366+ if numPeers >= minGossipPeers:
367+ trace " Offer successfully gossipped to peers" ,
368+ contentKey = contentKey.to0xHex (), nodeId, numPeers, workerId = worker.id
369+ return true
370+ else :
371+ warn " Offer not gossiped to enough peers" ,
372+ contentKey = contentKey.to0xHex (), nodeId, numPeers, workerId = worker.id
373+ except CancelledError as e:
374+ raise e
375+ except CatchableError as e:
376+ error " Failed to gossip offer to peers" ,
377+ contentKey = contentKey.to0xHex (), nodeId, error = e.msg, workerId = worker.id
378+ return false
379+
380+ proc runGossipLoop (
303381 worker: PortalStateGossipWorker ,
304382 verifyGossip: bool ,
305383 skipGossipForExisting: bool ,
@@ -308,106 +386,52 @@ proc runGossipBlockOffersLoop(
308386 debug " Starting gossip block offers loop" , workerId = worker.id
309387
310388 try :
311- # Create one client per worker in order to improve performance.
389+ # Create separate clients in each worker in order to improve performance.
312390 # WebSocket connections don't perform well when shared by many
313391 # concurrent workers.
314- worker.portalClient = newRpcClientConnect (worker.portalUrl)
392+ for (rpcUrl, nodeId) in worker.portalEndpoints:
393+ worker.portalClients[nodeId] = newRpcClientConnect (rpcUrl)
315394
316- var blockOffers = await worker.blockOffersQueue.popFirst ()
317-
318- while true :
395+ var
396+ blockOffers = await worker.blockOffersQueue.popFirst ()
319397 # A table of offer key, value pairs is used to filter out duplicates so
320398 # that we don't gossip the same offer multiple times.
321- let offersMap = newOrderedTable [seq [byte ], seq [byte ]]()
322-
323- for offerWithKey in blockOffers.accountTrieOffers:
324- offersMap.recursiveCollectOffer (offerWithKey)
325- for offerWithKey in blockOffers.contractTrieOffers:
326- offersMap.recursiveCollectOffer (offerWithKey)
327- for offerWithKey in blockOffers.contractCodeOffers:
328- offersMap.collectOffer (offerWithKey)
329-
330- # We need to use a closure here because nodeId is required to calculate the
331- # distance of each content id from the node
332- proc offersMapCmp (x, y: (seq [byte ], seq [byte ])): int =
333- let
334- xId = ContentKeyByteList .init (x[0 ]).toContentId ()
335- yId = ContentKeyByteList .init (y[0 ]).toContentId ()
336- xDistance = worker.nodeId xor xId
337- yDistance = worker.nodeId xor yId
338-
339- if xDistance == yDistance:
340- 0
341- elif xDistance > yDistance:
342- 1
343- else :
344- - 1
345-
346- # Sort the offers based on the distance from the node so that we will gossip
347- # content that is closest to the node first
348- offersMap.sort (offersMapCmp)
399+ offersMap = buildOffersMap (blockOffers)
349400
401+ while true :
350402 var retryGossip = false
351- for k, v in offersMap:
403+
404+ for contentKey, contentOffer in offersMap:
405+ worker.orderPortalClientsByDistanceFromContent (contentKey)
406+
352407 # Check if we need to gossip the content
353- var gossipContent = true
354-
355- if skipGossipForExisting:
356- try :
357- let contentInfo =
358- await worker.portalClient.portal_stateGetContent (k.to0xHex ())
359- if contentInfo.content.len () > 0 :
360- gossipContent = false
361- except CancelledError as e:
362- raise e
363- except CatchableError as e:
364- debug " Unable to find existing content. Will attempt to gossip content: " ,
365- contentKey = k.to0xHex (), error = e.msg, workerId = worker.id
408+ if skipGossipForExisting and (await worker.contentFoundInNetwork (contentKey)):
409+ continue # move on to the next content key
366410
367411 # Gossip the content into the network
368- if gossipContent:
369- try :
370- let
371- putContentResult = await worker.portalClient.portal_statePutContent (
372- k.to0xHex (), v.to0xHex ()
373- )
374- numPeers = putContentResult.peerCount
375- if numPeers >= minGossipPeers:
376- debug " Offer successfully gossipped to peers" ,
377- numPeers, workerId = worker.id
378- else :
379- warn " Offer not gossiped to enough peers" , numPeers, workerId = worker.id
380- retryGossip = true
381- break
382- except CancelledError as e:
383- raise e
384- except CatchableError as e:
385- error " Failed to gossip offer to peers" , error = e.msg, workerId = worker.id
386- retryGossip = true
387- break
412+ let gossipCompleted = await worker.gossipContentIntoNetwork (
413+ minGossipPeers, contentKey, contentOffer
414+ )
415+ if not gossipCompleted:
416+ # Retry gossip of this block
417+ retryGossip = true
418+ break
388419
389420 # Check if the content can be found in the network
390421 var foundContentKeys = newSeq [seq [byte ]]()
391422 if verifyGossip and not retryGossip:
392- # wait for the peers to be updated
423+ # Wait for the peers to be updated.
424+ # Wait time is proportional to the number of offers
393425 let waitTimeMs = 200 + (offersMap.len () * 20 )
394426 await sleepAsync (waitTimeMs.milliseconds)
395- # wait time is proportional to the number of offers
396-
397- for k, _ in offersMap:
398- try :
399- let contentInfo =
400- await worker.portalClient.portal_stateGetContent (k.to0xHex ())
401- if contentInfo.content.len () == 0 :
402- error " Found empty contentValue" , workerId = worker.id
403- retryGossip = true
404- break
405- foundContentKeys.add (k)
406- except CancelledError as e:
407- raise e
408- except CatchableError as e:
409- warn " Unable to find content with key. Will retry gossipping content:" ,
410- contentKey = k.to0xHex (), error = e.msg, workerId = worker.id
427+
428+ for contentKey, _ in offersMap:
429+ worker.orderPortalClientsByDistanceFromContent (contentKey)
430+
431+ if await worker.contentFoundInNetwork (contentKey):
432+ foundContentKeys.add (contentKey)
433+ else :
434+ # Retry gossip of this block
411435 retryGossip = true
412436 break
413437
@@ -418,13 +442,17 @@ proc runGossipBlockOffersLoop(
418442 # Don't retry gossip for content that was found in the network
419443 for key in foundContentKeys:
420444 offersMap.del (key)
445+
421446 warn " Retrying state gossip for block: " ,
422447 blockNumber = blockOffers.blockNumber,
423448 remainingOffers = offersMap.len (),
424449 workerId = worker.id
425450
426- # We might need to reconnect if using a WebSocket client
427- await worker.portalClient.tryReconnect (worker.portalUrl)
451+ # We might need to reconnect if using WebSocket clients
452+ for (rpcUrl, nodeId) in worker.portalEndpoints:
453+ await worker.portalClients.getOrDefault (nodeId).tryReconnect (rpcUrl)
454+
455+ # Jump back to the top of while loop to retry processing the current block
428456 continue
429457
430458 if blockOffers.blockNumber mod 1000 == 0 :
@@ -439,6 +467,7 @@ proc runGossipBlockOffersLoop(
439467 workerId = worker.id
440468
441469 blockOffers = await worker.blockOffersQueue.popFirst ()
470+ offersMap = buildOffersMap (blockOffers)
442471 except CancelledError :
443472 trace " gossipBlockOffersLoop canceled"
444473
@@ -528,7 +557,7 @@ proc start*(bridge: PortalStateBridge, config: PortalBridgeConf) =
528557 info " Starting concurrent gossip workers" , workerCount = bridge.gossipWorkers.len ()
529558
530559 for worker in bridge.gossipWorkers:
531- worker.gossipBlockOffersLoop = worker.runGossipBlockOffersLoop (
560+ worker.gossipBlockOffersLoop = worker.runGossipLoop (
532561 config.verifyGossip, config.skipGossipForExisting, config.minGossipPeers.int
533562 )
534563
@@ -580,19 +609,16 @@ proc runState*(
580609 web3Url: config.web3RpcUrl,
581610 db: db,
582611 blockDataQueue: newAsyncQueue [BlockData ](queueSize),
583- blockOffersQueue: newAsyncQueue [BlockOffersRef ](queueSize),
612+ blockOffersQueue: newAsyncQueue [BlockOffers ](queueSize),
584613 gossipWorkers: newSeq [PortalStateGossipWorker ](),
585614 )
586615
587616 for i in 0 ..< config.gossipWorkers.int :
588- let
589- (rpcUrl, nodeId) = portalEndpoints[i mod config.portalRpcEndpoints.int ]
590- worker = PortalStateGossipWorker (
591- id: i + 1 ,
592- portalUrl: rpcUrl,
593- nodeId: nodeId,
594- blockOffersQueue: bridge.blockOffersQueue,
595- )
617+ let worker = PortalStateGossipWorker (
618+ id: i + 1 ,
619+ portalEndpoints: portalEndpoints,
620+ blockOffersQueue: bridge.blockOffersQueue,
621+ )
596622 bridge.gossipWorkers.add (worker)
597623
598624 bridge.start (config)
0 commit comments