Skip to content

Commit ad8e250

Browse files
committed
feat: modify retry mechanism; add DHT guard rails; improve block cancellation handling
1 parent cdab172 commit ad8e250

File tree

3 files changed

+102
-33
lines changed

3 files changed

+102
-33
lines changed

codex/blockexchange/engine/engine.nim

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ const
7373
DefaultMaxBlocksPerMessage = 500
7474
DefaultTaskQueueSize = 100
7575
DefaultConcurrentTasks = 10
76+
# Don't issue more than one discovery request per `DiscoveryRateLimit` interval.
77+
DiscoveryRateLimit = 1.seconds
78+
DefaultPeerActivityTimeout = 1.minutes
7679

7780
type
7881
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
@@ -94,6 +97,7 @@ type
9497
pricing*: ?Pricing # Optional bandwidth pricing
9598
discovery*: DiscoveryEngine
9699
advertiser*: Advertiser
100+
lastDiscRequest: Moment # time of last discovery request
97101

98102
Pricing* = object
99103
address*: EthAddress
@@ -193,6 +197,14 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
193197
# efficient about it.
194198
await self.refreshBlockKnowledge(peer)
195199

200+
proc searchForNewPeers(self: BlockExcEngine, cid: Cid) =
  ## Queues a discovery request for `cid`, rate-limited by `DiscoveryRateLimit`.
  ##
  ## If the previous request was stamped no more than `DiscoveryRateLimit` ago,
  ## nothing is queued and only a trace message is emitted.
  let rateLimitedUntil = self.lastDiscRequest + DiscoveryRateLimit
  if Moment.now() <= rateLimitedUntil:
    trace "Not searching for new peers, rate limit not expired", cid = cid
    return

  trace "Searching for new peers for", cid = cid
  # Stamp the request time before handing the request over to discovery, so the
  # rate-limit window is already in effect for any subsequent caller.
  self.lastDiscRequest = Moment.now()
  self.discovery.queueFindBlocksReq(@[cid])
207+
196208
proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
  ## Returns one entry of `peers`, chosen via `sample` on the shared `Rng`
  ## instance.
  ## NOTE(review): presumably expects `peers` to be non-empty - confirm
  ## against `sample`.
  sample(Rng.instance, peers)
198210

@@ -215,34 +227,56 @@ proc downloadInternal(
215227
handle.fail(newException(RetriesExhaustedError, "Error retries exhausted"))
216228
break
217229

218-
trace "Running retry handle"
219230
let peers = self.peers.getPeersForBlock(address)
220231
logScope:
221232
peersWith = peers.with.len
222233
peersWithout = peers.without.len
223234

224-
trace "Peers for block"
225-
if peers.with.len > 0:
226-
self.pendingBlocks.setInFlight(address, true)
227-
await self.sendWantBlock(@[address], peers.with.randomPeer)
228-
else:
229-
self.pendingBlocks.setInFlight(address, false)
235+
if peers.with.len == 0:
236+
# We know of no peers that have the block.
230237
if peers.without.len > 0:
231-
# We have peers connected, but none of them have the block. This
238+
# If we have peers connected but none of them have the block, this
232239
# could be because our knowledge about what they have has run stale.
233240
# Tries to refresh it.
234241
await self.refreshBlockKnowledge()
235-
self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
242+
# Also tries to look for new peers for good measure.
243+
# TODO: in the future, peer search and knowledge maintenance should
244+
# be completely decoupled from one another. It is very hard to
245+
# control what happens and how many neighbors we get like this.
246+
self.searchForNewPeers(address.cidOrTreeCid)
247+
248+
# We wait for a bit and then retry. Since we've shipped our wantlists to
249+
# connected peers, they might reply and we might request the block in the
250+
# meantime as part of `blockPresenceHandler`.
251+
await handle or sleepAsync(self.pendingBlocks.retryInterval)
252+
# If we already got the block, we're done. Otherwise, we'll go for another
253+
# cycle, potentially refreshing knowledge on peers again, and looking up
254+
# the DHT again.
255+
if handle.finished:
256+
break
257+
trace "No peers for block, will retry shortly"
258+
continue
259+
260+
let scheduledPeer = peers.with.randomPeer
261+
self.pendingBlocks.setInFlight(address, true)
262+
scheduledPeer.blockRequested(address)
263+
await self.sendWantBlock(@[address], scheduledPeer)
264+
265+
let activityTimer = scheduledPeer.activityTimer()
266+
await handle or activityTimer # TODO: or peerDropped
267+
activityTimer.cancel()
236268

237-
# FIXME: blocks should not blindly reschedule themselves. Instead,
238-
# we should only reschedule a block if the peer drops, or we are
239-
# in endgame mode.
240-
await (handle or sleepAsync(self.pendingBlocks.retryInterval))
269+
# XXX: we should probably not have this. Blocks should be retried
270+
# to infinity unless cancelled by the client.
241271
self.pendingBlocks.decRetries(address)
242272

243273
if handle.finished:
244274
trace "Handle for block finished", failed = handle.failed
245275
break
276+
else:
277+
# If the peer timed out, retries immediately.
278+
trace "Dropping timed out peer.", peer = scheduledPeer.id
279+
# TODO: disconnect peer
246280
except CancelledError as exc:
247281
trace "Block download cancelled"
248282
if not handle.finished:
@@ -347,6 +381,7 @@ proc blockPresenceHandler*(
347381
for address in ourWantCids:
348382
self.pendingBlocks.setInFlight(address, true)
349383
self.pendingBlocks.decRetries(address)
384+
peerCtx.blockRequested(address)
350385

351386
if ourWantCids.len > 0:
352387
trace "Peer has blocks in our wantList", peer, wants = ourWantCids
@@ -401,15 +436,13 @@ proc cancelBlocks(
401436
return entry.peerId
402437

403438
try:
404-
# Does the peer have any of the blocks we're canceling?
405439
for peerCtx in self.peers.peers.values:
406-
let intersection = peerCtx.peerHave.intersection(addrSet)
440+
# Have we requested any of the blocks we're cancelling from this peer?
441+
let intersection = peerCtx.blocksRequested.intersection(addrSet)
407442
if intersection.len > 0:
408443
pendingCancellations[peerCtx.id] = intersection
409444

410445
# If so, dispatches cancellations.
411-
# FIXME: we're still spamming peers - the fact that the peer has the block does
412-
# not mean we've requested it.
413446
let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId](
414447
toSeq(pendingCancellations.pairs).map(processPeer)
415448
)
@@ -418,6 +451,8 @@ proc cancelBlocks(
418451
let ctx = self.peers.get(peerId)
419452
if not ctx.isNil:
420453
ctx.cleanPresence(addrs)
454+
for address in pendingCancellations[peerId]:
455+
ctx.blockRequestCancelled(address)
421456

422457
if failedFuts.len > 0:
423458
warn "Failed to send block request cancellations to peers", peers = failedFuts.len
@@ -492,6 +527,8 @@ proc blocksDeliveryHandler*(
492527
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address))
493528

494529
var validatedBlocksDelivery: seq[BlockDelivery]
530+
let peerCtx = self.peers.get(peer)
531+
495532
for bd in blocksDelivery:
496533
logScope:
497534
peer = peer
@@ -517,6 +554,9 @@ proc blocksDeliveryHandler*(
517554
).errorOption:
518555
warn "Unable to store proof and cid for a block"
519556
continue
557+
558+
if peerCtx != nil:
559+
peerCtx.blockReceived(bd.address)
520560
except CatchableError as exc:
521561
warn "Error handling block delivery", error = exc.msg
522562
continue
@@ -525,7 +565,6 @@ proc blocksDeliveryHandler*(
525565

526566
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
527567

528-
let peerCtx = self.peers.get(peer)
529568
if peerCtx != nil:
530569
if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption:
531570
warn "Error paying for blocks", err = err.msg
@@ -652,7 +691,7 @@ proc setupPeer*(
652691
trace "Setting up peer", peer
653692

654693
if peer notin self.peers:
655-
let peerCtx = BlockExcPeerCtx(id: peer)
694+
let peerCtx = BlockExcPeerCtx(id: peer, activityTimeout: DefaultPeerActivityTimeout)
656695
trace "Setting up new peer", peer
657696
self.peers.add(peerCtx)
658697
trace "Added peer", peers = self.peers.len
@@ -701,14 +740,14 @@ proc taskHandler*(
701740
# Send to the peer blocks he wants to get,
702741
# if they are present in our local store
703742

704-
# Blocks that are in flight have already been picked up by other tasks and
743+
# Blocks that have been sent have already been picked up by other tasks and
705744
# should not be re-sent.
706745
var
707-
wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isInFlight(it))
746+
wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isBlockSent(it))
708747
sent: HashSet[BlockAddress]
709748

710749
for wantedBlock in wantedBlocks:
711-
peerCtx.addInFlight(wantedBlock)
750+
peerCtx.markBlockAsSent(wantedBlock)
712751

713752
try:
714753
for batch in wantedBlocks.toSeq.splitBatches(self.maxBlocksPerMessage):
@@ -718,7 +757,7 @@ proc taskHandler*(
718757
without blockDelivery =? await self.localLookup(wantedBlock), err:
719758
error "Error getting block from local store",
720759
err = err.msg, address = wantedBlock
721-
peerCtx.removeInFlight(wantedBlock)
760+
peerCtx.markBlockAsNotSent(wantedBlock)
722761
continue
723762
blockDeliveries.add(blockDelivery)
724763
sent.incl(wantedBlock)
@@ -737,7 +776,7 @@ proc taskHandler*(
737776
finally:
738777
# Better safe than sorry: if an exception does happen, we don't want to keep
739778
# those marked as sent, as it'll effectively prevent the blocks from ever being sent.
740-
peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks)
779+
peerCtx.blocksSent.keepItIf(it notin wantedBlocks)
741780

742781
proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
743782
## process tasks

codex/blockexchange/engine/pendingblocks.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ declareGauge(
3434

3535
const
3636
DefaultBlockRetries* = 3000
37-
DefaultRetryInterval* = 180.seconds
37+
DefaultRetryInterval* = 5.seconds
3838

3939
type
4040
RetriesExhaustedError* = object of CatchableError

codex/blockexchange/peers/peercontext.nim

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,25 @@ type BlockExcPeerCtx* = ref object of RootObj
3030
blocks*: Table[BlockAddress, Presence] # remote peer have list including price
3131
wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants
3232
exchanged*: int # times peer has exchanged with us
33-
lastExchange*: Moment # last time peer has exchanged with us
3433
lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
3534
account*: ?Account # ethereum account of this peer
3635
paymentChannel*: ?ChannelId # payment channel id
37-
blocksInFlight*: HashSet[BlockAddress] # blocks in flight towards peer
36+
blocksSent*: HashSet[BlockAddress] # blocks sent to peer
37+
blocksRequested*: HashSet[BlockAddress] # pending block requests to this peer
38+
lastExchange*: Moment # last time peer has sent us a block
39+
activityTimeout*: Duration
3840

3941
proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
  ## True once more than 5 minutes have passed since `lastRefresh`.
  let staleAfter = self.lastRefresh + 5.minutes
  staleAfter < Moment.now()
4143

42-
proc isInFlight*(self: BlockExcPeerCtx, address: BlockAddress): bool =
43-
address in self.blocksInFlight
44+
proc isBlockSent*(self: BlockExcPeerCtx, address: BlockAddress): bool =
  ## Whether `address` is currently a member of this peer's `blocksSent` set.
  self.blocksSent.contains(address)
4446

45-
proc addInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
46-
self.blocksInFlight.incl(address)
47+
proc markBlockAsSent*(self: BlockExcPeerCtx, address: BlockAddress) =
  ## Adds `address` to this peer's `blocksSent` set.
  incl(self.blocksSent, address)
4749

48-
proc removeInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
49-
self.blocksInFlight.excl(address)
50+
proc markBlockAsNotSent*(self: BlockExcPeerCtx, address: BlockAddress) =
  ## Removes `address` from this peer's `blocksSent` set.
  excl(self.blocksSent, address)
5052

5153
proc refreshed*(self: BlockExcPeerCtx) =
5254
self.lastRefresh = Moment.now()
@@ -77,3 +79,31 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 =
7779
price += precense[].price
7880

7981
price
82+
83+
proc blockRequested*(self: BlockExcPeerCtx, address: BlockAddress) =
  ## Records `address` as a pending block request to this peer.
  ##
  ## When there were no pending requests beforehand, `lastExchange` is reset to
  ## the current time, so that the inactivity timeout is counted from the first
  ## outstanding request.
  let hadNoPendingRequests = self.blocksRequested.len == 0
  if hadNoPendingRequests:
    self.lastExchange = Moment.now()

  incl(self.blocksRequested, address)
88+
89+
proc blockRequestCancelled*(self: BlockExcPeerCtx, address: BlockAddress) =
  ## Removes `address` from this peer's set of pending block requests.
  ## `lastExchange` is left untouched.
  excl(self.blocksRequested, address)
91+
92+
proc blockReceived*(self: BlockExcPeerCtx, address: BlockAddress) =
  ## Removes `address` from this peer's set of pending block requests and
  ## stamps `lastExchange` with the current time.
  excl(self.blocksRequested, address)
  self.lastExchange = Moment.now()
95+
96+
proc activityTimer*(
    self: BlockExcPeerCtx
): Future[void] {.async: (raises: [CancelledError]).} =
  ## Completes once more than `activityTimeout` has elapsed since
  ## `lastExchange`.
  ##
  ## The idle time is re-evaluated after every sleep, so any update to
  ## `lastExchange` made in the meantime (for instance by `blockReceived`, for
  ## ANY block this peer delivers) pushes completion further out.
  ##
  ## Callers can race this future against a block handle to detect a peer that
  ## has gone quiet.
  var idleTime = Moment.now() - self.lastExchange
  while idleTime <= self.activityTimeout:
    # Sleep only for the remainder of the timeout window, then look again.
    await sleepAsync(self.activityTimeout - idleTime)
    idleTime = Moment.now() - self.lastExchange

0 commit comments

Comments
 (0)